TOMOYO Linux Cross Reference
Linux/io_uring/sqpoll.c

  1 // SPDX-License-Identifier: GPL-2.0
  2 /*
  3  * Contains the core code for submission-side polling of the SQ ring,
  4  * offloading submissions from the application to a kernel thread.
  5  */
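/*
 * Illustrative userspace sketch (not part of this file): how an application
 * might request SQPOLL offload at ring setup time. The flag and field names
 * come from <linux/io_uring.h>; the ring mmap and error handling are omitted,
 * and liburing's io_uring_queue_init_params() wraps the same steps.
 */
#if 0	/* example only, never compiled */
#include <linux/io_uring.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

static int setup_sqpoll_ring(unsigned int entries)
{
        struct io_uring_params p;

        memset(&p, 0, sizeof(p));
        p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
        p.sq_thread_idle = 2000;        /* park the SQ thread after 2s idle */
        p.sq_thread_cpu = 3;            /* pin it to CPU 3 (needs SQ_AFF) */

        /* returns the ring fd; SQ/CQ rings are then mmap()ed via p.sq_off/cq_off */
        return syscall(__NR_io_uring_setup, entries, &p);
}
#endif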
  6 #include <linux/kernel.h>
  7 #include <linux/errno.h>
  8 #include <linux/file.h>
  9 #include <linux/mm.h>
 10 #include <linux/slab.h>
 11 #include <linux/audit.h>
 12 #include <linux/security.h>
 13 #include <linux/cpuset.h>
 14 #include <linux/io_uring.h>
 15 
 16 #include <uapi/linux/io_uring.h>
 17 
 18 #include "io_uring.h"
 19 #include "napi.h"
 20 #include "sqpoll.h"
 21 
 22 #define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
 23 #define IORING_TW_CAP_ENTRIES_VALUE     8
 24 
 25 enum {
 26         IO_SQ_THREAD_SHOULD_STOP = 0,
 27         IO_SQ_THREAD_SHOULD_PARK,
 28 };
 29 
 30 void io_sq_thread_unpark(struct io_sq_data *sqd)
 31         __releases(&sqd->lock)
 32 {
 33         WARN_ON_ONCE(sqd->thread == current);
 34 
 35         /*
 36          * Do the dance, but not a conditional clear_bit(): that would race
 37          * with other threads incrementing park_pending and setting the bit.
 38          */
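        /*
         * Added note: with a conditional clear (e.g. clearing the bit only
         * when atomic_dec_return() hits zero), another task could call
         * io_sq_thread_park() between the decrement and the clear_bit(),
         * incrementing park_pending and setting the bit; the late clear_bit()
         * would then wipe out that fresh park request. Clearing first and
         * re-setting while park_pending is still non-zero errs toward a
         * spurious SHOULD_PARK instead, which the SQ thread copes with by
         * simply re-checking.
         */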
 39         clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
 40         if (atomic_dec_return(&sqd->park_pending))
 41                 set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
 42         mutex_unlock(&sqd->lock);
 43 }
 44 
 45 void io_sq_thread_park(struct io_sq_data *sqd)
 46         __acquires(&sqd->lock)
 47 {
 48         WARN_ON_ONCE(data_race(sqd->thread) == current);
 49 
 50         atomic_inc(&sqd->park_pending);
 51         set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
 52         mutex_lock(&sqd->lock);
 53         if (sqd->thread)
 54                 wake_up_process(sqd->thread);
 55 }
 56 
 57 void io_sq_thread_stop(struct io_sq_data *sqd)
 58 {
 59         WARN_ON_ONCE(sqd->thread == current);
 60         WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));
 61 
 62         set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
 63         mutex_lock(&sqd->lock);
 64         if (sqd->thread)
 65                 wake_up_process(sqd->thread);
 66         mutex_unlock(&sqd->lock);
 67         wait_for_completion(&sqd->exited);
 68 }
 69 
 70 void io_put_sq_data(struct io_sq_data *sqd)
 71 {
 72         if (refcount_dec_and_test(&sqd->refs)) {
 73                 WARN_ON_ONCE(atomic_read(&sqd->park_pending));
 74 
 75                 io_sq_thread_stop(sqd);
 76                 kfree(sqd);
 77         }
 78 }
 79 
 80 static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
 81 {
 82         struct io_ring_ctx *ctx;
 83         unsigned sq_thread_idle = 0;
 84 
 85         list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
 86                 sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
 87         sqd->sq_thread_idle = sq_thread_idle;
 88 }
 89 
 90 void io_sq_thread_finish(struct io_ring_ctx *ctx)
 91 {
 92         struct io_sq_data *sqd = ctx->sq_data;
 93 
 94         if (sqd) {
 95                 io_sq_thread_park(sqd);
 96                 list_del_init(&ctx->sqd_list);
 97                 io_sqd_update_thread_idle(sqd);
 98                 io_sq_thread_unpark(sqd);
 99 
100                 io_put_sq_data(sqd);
101                 ctx->sq_data = NULL;
102         }
103 }
104 
105 static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
106 {
107         struct io_ring_ctx *ctx_attach;
108         struct io_sq_data *sqd;
109         struct fd f;
110 
111         f = fdget(p->wq_fd);
112         if (!f.file)
113                 return ERR_PTR(-ENXIO);
114         if (!io_is_uring_fops(f.file)) {
115                 fdput(f);
116                 return ERR_PTR(-EINVAL);
117         }
118 
119         ctx_attach = f.file->private_data;
120         sqd = ctx_attach->sq_data;
121         if (!sqd) {
122                 fdput(f);
123                 return ERR_PTR(-EINVAL);
124         }
125         if (sqd->task_tgid != current->tgid) {
126                 fdput(f);
127                 return ERR_PTR(-EPERM);
128         }
129 
130         refcount_inc(&sqd->refs);
131         fdput(f);
132         return sqd;
133 }
134 
135 static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
136                                          bool *attached)
137 {
138         struct io_sq_data *sqd;
139 
140         *attached = false;
141         if (p->flags & IORING_SETUP_ATTACH_WQ) {
142                 sqd = io_attach_sq_data(p);
143                 if (!IS_ERR(sqd)) {
144                         *attached = true;
145                         return sqd;
146                 }
147                 /* fall through for the EPERM case, set up a new sqd/task */
148                 if (PTR_ERR(sqd) != -EPERM)
149                         return sqd;
150         }
151 
152         sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
153         if (!sqd)
154                 return ERR_PTR(-ENOMEM);
155 
156         atomic_set(&sqd->park_pending, 0);
157         refcount_set(&sqd->refs, 1);
158         INIT_LIST_HEAD(&sqd->ctx_list);
159         mutex_init(&sqd->lock);
160         init_waitqueue_head(&sqd->wait);
161         init_completion(&sqd->exited);
162         return sqd;
163 }
164 
165 static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
166 {
167         return READ_ONCE(sqd->state);
168 }
169 
170 static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
171 {
172         unsigned int to_submit;
173         int ret = 0;
174 
175         to_submit = io_sqring_entries(ctx);
176         /* if we're handling multiple rings, cap submit size for fairness */
177         if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
178                 to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;
179 
180         if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
181                 const struct cred *creds = NULL;
182 
183                 if (ctx->sq_creds != current_cred())
184                         creds = override_creds(ctx->sq_creds);
185 
186                 mutex_lock(&ctx->uring_lock);
187                 if (!wq_list_empty(&ctx->iopoll_list))
188                         io_do_iopoll(ctx, true);
189 
190                 /*
191                  * Don't submit if refs are dying: that matters for io_uring_register(),
192                  * and io_ring_exit_work() relies on it as well.
193                  */
194                 if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
195                     !(ctx->flags & IORING_SETUP_R_DISABLED))
196                         ret = io_submit_sqes(ctx, to_submit);
197                 mutex_unlock(&ctx->uring_lock);
198 
199                 if (io_napi(ctx))
200                         ret += io_napi_sqpoll_busy_poll(ctx);
201 
202                 if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
203                         wake_up(&ctx->sqo_sq_wait);
204                 if (creds)
205                         revert_creds(creds);
206         }
207 
208         return ret;
209 }
210 
211 static bool io_sqd_handle_event(struct io_sq_data *sqd)
212 {
213         bool did_sig = false;
214         struct ksignal ksig;
215 
216         if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
217             signal_pending(current)) {
218                 mutex_unlock(&sqd->lock);
219                 if (signal_pending(current))
220                         did_sig = get_signal(&ksig);
221                 cond_resched();
222                 mutex_lock(&sqd->lock);
223                 sqd->sq_cpu = raw_smp_processor_id();
224         }
225         return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
226 }
227 
228 /*
229  * Run task_work, processing the retry_list first. The retry_list holds
230  * entries that we passed on in the previous run, if we had more task_work
231  * than we were asked to process. Newly queued task_work isn't run until the
232  * retry list has been fully processed.
233  */
234 static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
235 {
236         struct io_uring_task *tctx = current->io_uring;
237         unsigned int count = 0;
238 
239         if (*retry_list) {
240                 *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
241                 if (count >= max_entries)
242                         goto out;
243                 max_entries -= count;
244         }
245         *retry_list = tctx_task_work_run(tctx, max_entries, &count);
246 out:
247         if (task_work_pending(current))
248                 task_work_run();
249         return count;
250 }
251 
252 static bool io_sq_tw_pending(struct llist_node *retry_list)
253 {
254         struct io_uring_task *tctx = current->io_uring;
255 
256         return retry_list || !llist_empty(&tctx->task_list);
257 }
258 
259 static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
260 {
261         struct rusage end;
262 
263         getrusage(current, RUSAGE_SELF, &end);
264         end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
265         end.ru_stime.tv_usec -= start->ru_stime.tv_usec;
266 
267         sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
268 }
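/*
 * Added note: work_time accumulates the SQ thread's system CPU time in
 * microseconds, charged to the kernel thread itself (RUSAGE_SELF on current),
 * not to the tasks that own the rings. On kernels with this code it is what
 * the ring fd's fdinfo reports as SqWorkTime.
 */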
269 
270 static int io_sq_thread(void *data)
271 {
272         struct llist_node *retry_list = NULL;
273         struct io_sq_data *sqd = data;
274         struct io_ring_ctx *ctx;
275         struct rusage start;
276         unsigned long timeout = 0;
277         char buf[TASK_COMM_LEN];
278         DEFINE_WAIT(wait);
279 
280         /* offload context creation failed, just exit */
281         if (!current->io_uring)
282                 goto err_out;
283 
284         snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
285         set_task_comm(current, buf);
286 
287         /* reset to our pid after we've set task_comm, for fdinfo */
288         sqd->task_pid = current->pid;
289 
290         if (sqd->sq_cpu != -1) {
291                 set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
292         } else {
293                 set_cpus_allowed_ptr(current, cpu_online_mask);
294                 sqd->sq_cpu = raw_smp_processor_id();
295         }
296 
297         /*
298          * Force the audit context to get set up, in case we do prep-side
299          * async operations that would trigger an audit call before any
300          * issue-side audit has been done.
301          */
302         audit_uring_entry(IORING_OP_NOP);
303         audit_uring_exit(true, 0);
304 
305         mutex_lock(&sqd->lock);
306         while (1) {
307                 bool cap_entries, sqt_spin = false;
308 
309                 if (io_sqd_events_pending(sqd) || signal_pending(current)) {
310                         if (io_sqd_handle_event(sqd))
311                                 break;
312                         timeout = jiffies + sqd->sq_thread_idle;
313                 }
314 
315                 cap_entries = !list_is_singular(&sqd->ctx_list);
316                 getrusage(current, RUSAGE_SELF, &start);
317                 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
318                         int ret = __io_sq_thread(ctx, cap_entries);
319 
320                         if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
321                                 sqt_spin = true;
322                 }
323                 if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
324                         sqt_spin = true;
325 
326                 if (sqt_spin || !time_after(jiffies, timeout)) {
327                         if (sqt_spin) {
328                                 io_sq_update_worktime(sqd, &start);
329                                 timeout = jiffies + sqd->sq_thread_idle;
330                         }
331                         if (unlikely(need_resched())) {
332                                 mutex_unlock(&sqd->lock);
333                                 cond_resched();
334                                 mutex_lock(&sqd->lock);
335                                 sqd->sq_cpu = raw_smp_processor_id();
336                         }
337                         continue;
338                 }
339 
340                 prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
341                 if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
342                         bool needs_sched = true;
343 
344                         list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
345                                 atomic_or(IORING_SQ_NEED_WAKEUP,
346                                                 &ctx->rings->sq_flags);
347                                 if ((ctx->flags & IORING_SETUP_IOPOLL) &&
348                                     !wq_list_empty(&ctx->iopoll_list)) {
349                                         needs_sched = false;
350                                         break;
351                                 }
352 
353                                 /*
354                                  * Ensure the store of the wakeup flag is not
355                                  * reordered with the load of the SQ tail
356                                  */
357                                 smp_mb__after_atomic();
358 
359                                 if (io_sqring_entries(ctx)) {
360                                         needs_sched = false;
361                                         break;
362                                 }
363                         }
364 
365                         if (needs_sched) {
366                                 mutex_unlock(&sqd->lock);
367                                 schedule();
368                                 mutex_lock(&sqd->lock);
369                                 sqd->sq_cpu = raw_smp_processor_id();
370                         }
371                         list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
372                                 atomic_andnot(IORING_SQ_NEED_WAKEUP,
373                                                 &ctx->rings->sq_flags);
374                 }
375 
376                 finish_wait(&sqd->wait, &wait);
377                 timeout = jiffies + sqd->sq_thread_idle;
378         }
379 
380         if (retry_list)
381                 io_sq_tw(&retry_list, UINT_MAX);
382 
383         io_uring_cancel_generic(true, sqd);
384         sqd->thread = NULL;
385         list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
386                 atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
387         io_run_task_work();
388         mutex_unlock(&sqd->lock);
389 err_out:
390         complete(&sqd->exited);
391         do_exit(0);
392 }
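/*
 * Illustrative userspace counterpart (not part of this file): once the loop
 * above goes idle it sets IORING_SQ_NEED_WAKEUP in the SQ ring flags, and the
 * application must kick the thread with io_uring_enter() and
 * IORING_ENTER_SQ_WAKEUP. "sq_flags" below stands for the application's
 * mmap()ed pointer at p.sq_off.flags; liburing hides this check inside
 * io_uring_submit().
 */
#if 0	/* example only, never compiled */
#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <unistd.h>

static void kick_sqpoll_if_needed(int ring_fd, const unsigned int *sq_flags)
{
        /*
         * A real submitter needs a full memory barrier between publishing the
         * new SQ tail and reading these flags (liburing issues one); a plain
         * acquire load is shown here for brevity.
         */
        unsigned int flags = __atomic_load_n(sq_flags, __ATOMIC_ACQUIRE);

        if (flags & IORING_SQ_NEED_WAKEUP)
                syscall(__NR_io_uring_enter, ring_fd, 0, 0,
                        IORING_ENTER_SQ_WAKEUP, NULL, 0);
}
#endif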
393 
394 void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
395 {
396         DEFINE_WAIT(wait);
397 
398         do {
399                 if (!io_sqring_full(ctx))
400                         break;
401                 prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);
402 
403                 if (!io_sqring_full(ctx))
404                         break;
405                 schedule();
406         } while (!signal_pending(current));
407 
408         finish_wait(&ctx->sqo_sq_wait, &wait);
409 }
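/*
 * Added note: this is the sleep side of IORING_ENTER_SQ_WAIT. When a SQPOLL
 * ring's SQ is full and the application passes that flag to io_uring_enter(),
 * it blocks here until __io_sq_thread() consumes entries and wakes
 * ctx->sqo_sq_wait, or until a signal arrives.
 */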
410 
411 __cold int io_sq_offload_create(struct io_ring_ctx *ctx,
412                                 struct io_uring_params *p)
413 {
414         int ret;
415 
416         /* Retain compatibility with failing for an invalid attach attempt */
417         if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
418                                 IORING_SETUP_ATTACH_WQ) {
419                 struct fd f;
420 
421                 f = fdget(p->wq_fd);
422                 if (!f.file)
423                         return -ENXIO;
424                 if (!io_is_uring_fops(f.file)) {
425                         fdput(f);
426                         return -EINVAL;
427                 }
428                 fdput(f);
429         }
430         if (ctx->flags & IORING_SETUP_SQPOLL) {
431                 struct task_struct *tsk;
432                 struct io_sq_data *sqd;
433                 bool attached;
434 
435                 ret = security_uring_sqpoll();
436                 if (ret)
437                         return ret;
438 
439                 sqd = io_get_sq_data(p, &attached);
440                 if (IS_ERR(sqd)) {
441                         ret = PTR_ERR(sqd);
442                         goto err;
443                 }
444 
445                 ctx->sq_creds = get_current_cred();
446                 ctx->sq_data = sqd;
447                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
448                 if (!ctx->sq_thread_idle)
449                         ctx->sq_thread_idle = HZ;
450 
451                 io_sq_thread_park(sqd);
452                 list_add(&ctx->sqd_list, &sqd->ctx_list);
453                 io_sqd_update_thread_idle(sqd);
454                 /* don't attach to a dying SQPOLL thread, would be racy */
455                 ret = (attached && !sqd->thread) ? -ENXIO : 0;
456                 io_sq_thread_unpark(sqd);
457 
458                 if (ret < 0)
459                         goto err;
460                 if (attached)
461                         return 0;
462 
463                 if (p->flags & IORING_SETUP_SQ_AFF) {
464                         cpumask_var_t allowed_mask;
465                         int cpu = p->sq_thread_cpu;
466 
467                         ret = -EINVAL;
468                         if (cpu >= nr_cpu_ids || !cpu_online(cpu))
469                                 goto err_sqpoll;
470                         ret = -ENOMEM;
471                         if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
472                                 goto err_sqpoll;
473                         ret = -EINVAL;
474                         cpuset_cpus_allowed(current, allowed_mask);
475                         if (!cpumask_test_cpu(cpu, allowed_mask)) {
476                                 free_cpumask_var(allowed_mask);
477                                 goto err_sqpoll;
478                         }
479                         free_cpumask_var(allowed_mask);
480                         sqd->sq_cpu = cpu;
481                 } else {
482                         sqd->sq_cpu = -1;
483                 }
484 
485                 sqd->task_pid = current->pid;
486                 sqd->task_tgid = current->tgid;
487                 tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
488                 if (IS_ERR(tsk)) {
489                         ret = PTR_ERR(tsk);
490                         goto err_sqpoll;
491                 }
492 
493                 sqd->thread = tsk;
494                 ret = io_uring_alloc_task_context(tsk, ctx);
495                 wake_up_new_task(tsk);
496                 if (ret)
497                         goto err;
498         } else if (p->flags & IORING_SETUP_SQ_AFF) {
499                 /* Can't have SQ_AFF without SQPOLL */
500                 ret = -EINVAL;
501                 goto err;
502         }
503 
504         return 0;
505 err_sqpoll:
506         complete(&ctx->sq_data->exited);
507 err:
508         io_sq_thread_finish(ctx);
509         return ret;
510 }
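/*
 * Illustrative userspace sketch (not part of this file): sharing one SQPOLL
 * thread between two rings via IORING_SETUP_ATTACH_WQ, matching the
 * io_attach_sq_data() path above. The attach only shares the sqd when both
 * rings belong to the same thread group (the task_tgid check); on EPERM the
 * kernel silently falls back to a private sqd. Error handling is omitted.
 */
#if 0	/* example only, never compiled */
#include <linux/io_uring.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

static int setup_shared_sqpoll(int *fd_a, int *fd_b)
{
        struct io_uring_params p;

        memset(&p, 0, sizeof(p));
        p.flags = IORING_SETUP_SQPOLL;
        *fd_a = syscall(__NR_io_uring_setup, 64, &p);

        memset(&p, 0, sizeof(p));
        p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_ATTACH_WQ;
        p.wq_fd = *fd_a;                /* attach to the first ring's sqd */
        *fd_b = syscall(__NR_io_uring_setup, 64, &p);

        return (*fd_a < 0 || *fd_b < 0) ? -1 : 0;
}
#endif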
511 
512 __cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
513                                      cpumask_var_t mask)
514 {
515         struct io_sq_data *sqd = ctx->sq_data;
516         int ret = -EINVAL;
517 
518         if (sqd) {
519                 io_sq_thread_park(sqd);
520                 /* Don't set affinity for a dying thread */
521                 if (sqd->thread)
522                         ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
523                 io_sq_thread_unpark(sqd);
524         }
525 
526         return ret;
527 }
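/*
 * Added note: for rings created with IORING_SETUP_SQPOLL this is the
 * SQPOLL-specific branch of the IORING_REGISTER_IOWQ_AFF registration,
 * re-pinning the io-wq workers that serve the shared SQ thread rather than
 * the caller's own io-wq.
 */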
528 
