TOMOYO Linux Cross Reference
Linux/io_uring/sqpoll.c

// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core logic for submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
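
/*
 * Userspace side, roughly (a sketch only; ring mmap, raw syscall wrappers
 * and error handling omitted): the application creates the ring with
 * IORING_SETUP_SQPOLL and only needs to enter the kernel once the poller
 * has gone idle and advertised IORING_SQ_NEED_WAKEUP in the SQ ring flags:
 *
 *	struct io_uring_params p = { 0 };
 *
 *	p.flags = IORING_SETUP_SQPOLL;
 *	p.sq_thread_idle = 2000;	// idle timeout in milliseconds
 *	fd = io_uring_setup(entries, &p);
 *	...
 *	// after filling SQEs and advancing the SQ tail:
 *	if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *		io_uring_enter(fd, to_submit, 0, IORING_ENTER_SQ_WAKEUP, NULL);
 */
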
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE     8

enum {
        IO_SQ_THREAD_SHOULD_STOP = 0,
        IO_SQ_THREAD_SHOULD_PARK,
};
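
/*
 * State-bit protocol used below: another task sets IO_SQ_THREAD_SHOULD_PARK
 * (and bumps park_pending) to make io_sq_thread() drop sqd->lock so the
 * parking task can take it and update the ctx list; IO_SQ_THREAD_SHOULD_STOP
 * asks the thread to exit. park_pending counts concurrent parkers so that
 * unparking doesn't clear the PARK bit while another parker still needs the
 * thread parked.
 */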

void io_sq_thread_unpark(struct io_sq_data *sqd)
        __releases(&sqd->lock)
{
        WARN_ON_ONCE(sqd->thread == current);

        /*
         * Clear the bit, then re-set it if other parkers remain. A
         * conditional clear_bit() would race with other threads incrementing
         * park_pending and setting the bit.
         */
        clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        if (atomic_dec_return(&sqd->park_pending))
                set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        mutex_unlock(&sqd->lock);
}

void io_sq_thread_park(struct io_sq_data *sqd)
        __acquires(&sqd->lock)
{
        WARN_ON_ONCE(sqd->thread == current);

        atomic_inc(&sqd->park_pending);
        set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        mutex_lock(&sqd->lock);
        if (sqd->thread)
                wake_up_process(sqd->thread);
}

void io_sq_thread_stop(struct io_sq_data *sqd)
{
        WARN_ON_ONCE(sqd->thread == current);
        WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

        set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
        mutex_lock(&sqd->lock);
        if (sqd->thread)
                wake_up_process(sqd->thread);
        mutex_unlock(&sqd->lock);
        wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
        if (refcount_dec_and_test(&sqd->refs)) {
                WARN_ON_ONCE(atomic_read(&sqd->park_pending));

                io_sq_thread_stop(sqd);
                kfree(sqd);
        }
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
        struct io_ring_ctx *ctx;
        unsigned sq_thread_idle = 0;

        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
        sqd->sq_thread_idle = sq_thread_idle;
}
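
/*
 * Called on ring teardown: park the (possibly shared) thread, unlink this
 * ctx from the sqd ctx list, recompute the idle timeout from the remaining
 * rings, unpark, and drop this ctx's reference to the sq_data.
 */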

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
        struct io_sq_data *sqd = ctx->sq_data;

        if (sqd) {
                io_sq_thread_park(sqd);
                list_del_init(&ctx->sqd_list);
                io_sqd_update_thread_idle(sqd);
                io_sq_thread_unpark(sqd);

                io_put_sq_data(sqd);
                ctx->sq_data = NULL;
        }
}
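
/*
 * IORING_SETUP_ATTACH_WQ path: look up an existing ring via p->wq_fd and
 * share its sq_data (and thus its SQPOLL thread). Only rings owned by the
 * same thread group may attach; -EPERM tells the caller to fall back to
 * setting up a private sq_data instead.
 */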

static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
        struct io_ring_ctx *ctx_attach;
        struct io_sq_data *sqd;
        struct fd f;

        f = fdget(p->wq_fd);
        if (!f.file)
                return ERR_PTR(-ENXIO);
        if (!io_is_uring_fops(f.file)) {
                fdput(f);
                return ERR_PTR(-EINVAL);
        }

        ctx_attach = f.file->private_data;
        sqd = ctx_attach->sq_data;
        if (!sqd) {
                fdput(f);
                return ERR_PTR(-EINVAL);
        }
        if (sqd->task_tgid != current->tgid) {
                fdput(f);
                return ERR_PTR(-EPERM);
        }

        refcount_inc(&sqd->refs);
        fdput(f);
        return sqd;
}

static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
                                         bool *attached)
{
        struct io_sq_data *sqd;

        *attached = false;
        if (p->flags & IORING_SETUP_ATTACH_WQ) {
                sqd = io_attach_sq_data(p);
                if (!IS_ERR(sqd)) {
                        *attached = true;
                        return sqd;
                }
                /* fall through for EPERM case, setup new sqd/task */
                if (PTR_ERR(sqd) != -EPERM)
                        return sqd;
        }

        sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
        if (!sqd)
                return ERR_PTR(-ENOMEM);

        atomic_set(&sqd->park_pending, 0);
        refcount_set(&sqd->refs, 1);
        INIT_LIST_HEAD(&sqd->ctx_list);
        mutex_init(&sqd->lock);
        init_waitqueue_head(&sqd->wait);
        init_completion(&sqd->exited);
        return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
        return READ_ONCE(sqd->state);
}
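
/*
 * Do one round of work for a single ring: run pending iopoll completions and
 * submit new SQEs under that ring's uring_lock, using the credentials the
 * ring was created with. When several rings share this thread, the per-round
 * submit count is capped so one busy ring cannot starve the others. The
 * return value feeds the main loop's "still busy" decision.
 */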

static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
        unsigned int to_submit;
        int ret = 0;

        to_submit = io_sqring_entries(ctx);
        /* if we're handling multiple rings, cap submit size for fairness */
        if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
                to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

        if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
                const struct cred *creds = NULL;

                if (ctx->sq_creds != current_cred())
                        creds = override_creds(ctx->sq_creds);

                mutex_lock(&ctx->uring_lock);
                if (!wq_list_empty(&ctx->iopoll_list))
                        io_do_iopoll(ctx, true);

                /*
                 * Don't submit if refs are dying. That's useful for
                 * io_uring_register(), and io_ring_exit_work() relies on it.
                 */
                if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
                    !(ctx->flags & IORING_SETUP_R_DISABLED))
                        ret = io_submit_sqes(ctx, to_submit);
                mutex_unlock(&ctx->uring_lock);

                if (io_napi(ctx))
                        ret += io_napi_sqpoll_busy_poll(ctx);

                if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
                        wake_up(&ctx->sqo_sq_wait);
                if (creds)
                        revert_creds(creds);
        }

        return ret;
}
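
/*
 * Handle a park request or a pending signal: temporarily drop sqd->lock so a
 * parking task can take it and do its update, pick up any signal, and return
 * true if the thread should exit, either because a signal was delivered or
 * because IO_SQ_THREAD_SHOULD_STOP is set.
 */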

static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
        bool did_sig = false;
        struct ksignal ksig;

        if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
            signal_pending(current)) {
                mutex_unlock(&sqd->lock);
                if (signal_pending(current))
                        did_sig = get_signal(&ksig);
                cond_resched();
                mutex_lock(&sqd->lock);
                sqd->sq_cpu = raw_smp_processor_id();
        }
        return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
        struct io_uring_task *tctx = current->io_uring;
        unsigned int count = 0;

        if (*retry_list) {
                *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
                if (count >= max_entries)
                        goto out;
                max_entries -= count;
        }
        *retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
        if (task_work_pending(current))
                task_work_run();
        return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
        struct io_uring_task *tctx = current->io_uring;

        return retry_list || !llist_empty(&tctx->task_list);
}
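
/*
 * Account the system CPU time spent in the busy part of the loop, in
 * microseconds; this feeds the SQPOLL work time statistic reported through
 * the ring's fdinfo.
 */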

static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
        struct rusage end;

        getrusage(current, RUSAGE_SELF, &end);
        end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
        end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

        sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}
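
/*
 * Main loop of the SQPOLL kernel thread. It binds to the requested CPU (if
 * any), then repeatedly: handles park/stop/signal events, does one round of
 * submission and iopoll for every ring attached to this sq_data, and runs a
 * capped amount of task_work. It keeps spinning while any of that produced
 * work, or until sq_thread_idle expires. Once idle, it sets
 * IORING_SQ_NEED_WAKEUP in each ring's sq_flags, re-checks for newly posted
 * SQEs to close the race against userspace, sleeps, and clears the flag
 * again after waking up.
 */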

static int io_sq_thread(void *data)
{
        struct llist_node *retry_list = NULL;
        struct io_sq_data *sqd = data;
        struct io_ring_ctx *ctx;
        struct rusage start;
        unsigned long timeout = 0;
        char buf[TASK_COMM_LEN];
        DEFINE_WAIT(wait);

        /* offload context creation failed, just exit */
        if (!current->io_uring)
                goto err_out;

        snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
        set_task_comm(current, buf);

        /* reset to our pid after we've set task_comm, for fdinfo */
        sqd->task_pid = current->pid;

        if (sqd->sq_cpu != -1) {
                set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
        } else {
                set_cpus_allowed_ptr(current, cpu_online_mask);
                sqd->sq_cpu = raw_smp_processor_id();
        }

        /*
         * Force audit context to get setup, in case we do prep side async
         * operations that would trigger an audit call before any issue side
         * audit has been done.
         */
        audit_uring_entry(IORING_OP_NOP);
        audit_uring_exit(true, 0);

        mutex_lock(&sqd->lock);
        while (1) {
                bool cap_entries, sqt_spin = false;

                if (io_sqd_events_pending(sqd) || signal_pending(current)) {
                        if (io_sqd_handle_event(sqd))
                                break;
                        timeout = jiffies + sqd->sq_thread_idle;
                }

                cap_entries = !list_is_singular(&sqd->ctx_list);
                getrusage(current, RUSAGE_SELF, &start);
                list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
                        int ret = __io_sq_thread(ctx, cap_entries);

                        if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
                                sqt_spin = true;
                }
                if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
                        sqt_spin = true;

                if (sqt_spin || !time_after(jiffies, timeout)) {
                        if (sqt_spin) {
                                io_sq_update_worktime(sqd, &start);
                                timeout = jiffies + sqd->sq_thread_idle;
                        }
                        if (unlikely(need_resched())) {
                                mutex_unlock(&sqd->lock);
                                cond_resched();
                                mutex_lock(&sqd->lock);
                                sqd->sq_cpu = raw_smp_processor_id();
                        }
                        continue;
                }

                prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
                if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
                        bool needs_sched = true;

                        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
                                atomic_or(IORING_SQ_NEED_WAKEUP,
                                                &ctx->rings->sq_flags);
                                if ((ctx->flags & IORING_SETUP_IOPOLL) &&
                                    !wq_list_empty(&ctx->iopoll_list)) {
                                        needs_sched = false;
                                        break;
                                }

                                /*
                                 * Ensure the store of the wakeup flag is not
                                 * reordered with the load of the SQ tail
                                 */
                                smp_mb__after_atomic();

                                if (io_sqring_entries(ctx)) {
                                        needs_sched = false;
                                        break;
                                }
                        }

                        if (needs_sched) {
                                mutex_unlock(&sqd->lock);
                                schedule();
                                mutex_lock(&sqd->lock);
                                sqd->sq_cpu = raw_smp_processor_id();
                        }
                        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                                atomic_andnot(IORING_SQ_NEED_WAKEUP,
                                                &ctx->rings->sq_flags);
                }

                finish_wait(&sqd->wait, &wait);
                timeout = jiffies + sqd->sq_thread_idle;
        }

        if (retry_list)
                io_sq_tw(&retry_list, UINT_MAX);

        io_uring_cancel_generic(true, sqd);
        sqd->thread = NULL;
        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
        io_run_task_work();
        mutex_unlock(&sqd->lock);
err_out:
        complete(&sqd->exited);
        do_exit(0);
}
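
/*
 * Wait for space in the SQ ring. Used on the io_uring_enter() side (the
 * IORING_ENTER_SQ_WAIT flag) when the application has filled the ring faster
 * than the poller drains it; returns once the ring is no longer full or a
 * signal is pending.
 */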

void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
        DEFINE_WAIT(wait);

        do {
                if (!io_sqring_full(ctx))
                        break;
                prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

                if (!io_sqring_full(ctx))
                        break;
                schedule();
        } while (!signal_pending(current));

        finish_wait(&ctx->sqo_sq_wait, &wait);
}
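
/*
 * Ring setup side. A second ring can share an existing SQPOLL ring's poller
 * by attaching to it; roughly (a sketch, error handling omitted, and the
 * target ring must itself use SQPOLL and belong to the same thread group):
 *
 *	struct io_uring_params p = { 0 };
 *
 *	p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_ATTACH_WQ;
 *	p.wq_fd = existing_ring_fd;
 *	fd2 = io_uring_setup(entries, &p);
 *
 * Otherwise a fresh sq_data and kernel thread are created here, optionally
 * pinned to p->sq_thread_cpu when IORING_SETUP_SQ_AFF is set.
 */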

__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
                                struct io_uring_params *p)
{
        int ret;

        /* Retain compatibility with failing for an invalid attach attempt */
        if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
                                IORING_SETUP_ATTACH_WQ) {
                struct fd f;

                f = fdget(p->wq_fd);
                if (!f.file)
                        return -ENXIO;
                if (!io_is_uring_fops(f.file)) {
                        fdput(f);
                        return -EINVAL;
                }
                fdput(f);
        }
        if (ctx->flags & IORING_SETUP_SQPOLL) {
                struct task_struct *tsk;
                struct io_sq_data *sqd;
                bool attached;

                ret = security_uring_sqpoll();
                if (ret)
                        return ret;

                sqd = io_get_sq_data(p, &attached);
                if (IS_ERR(sqd)) {
                        ret = PTR_ERR(sqd);
                        goto err;
                }

                ctx->sq_creds = get_current_cred();
                ctx->sq_data = sqd;
                ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
                if (!ctx->sq_thread_idle)
                        ctx->sq_thread_idle = HZ;

                io_sq_thread_park(sqd);
                list_add(&ctx->sqd_list, &sqd->ctx_list);
                io_sqd_update_thread_idle(sqd);
                /* don't attach to a dying SQPOLL thread, would be racy */
                ret = (attached && !sqd->thread) ? -ENXIO : 0;
                io_sq_thread_unpark(sqd);

                if (ret < 0)
                        goto err;
                if (attached)
                        return 0;

                if (p->flags & IORING_SETUP_SQ_AFF) {
                        int cpu = p->sq_thread_cpu;

                        ret = -EINVAL;
                        if (cpu >= nr_cpu_ids || !cpu_online(cpu))
                                goto err_sqpoll;
                        sqd->sq_cpu = cpu;
                } else {
                        sqd->sq_cpu = -1;
                }

                sqd->task_pid = current->pid;
                sqd->task_tgid = current->tgid;
                tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
                if (IS_ERR(tsk)) {
                        ret = PTR_ERR(tsk);
                        goto err_sqpoll;
                }

                sqd->thread = tsk;
                ret = io_uring_alloc_task_context(tsk, ctx);
                wake_up_new_task(tsk);
                if (ret)
                        goto err;
        } else if (p->flags & IORING_SETUP_SQ_AFF) {
                /* Can't have SQ_AFF without SQPOLL */
                ret = -EINVAL;
                goto err;
        }

        return 0;
err_sqpoll:
        complete(&ctx->sq_data->exited);
err:
        io_sq_thread_finish(ctx);
        return ret;
}
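
/*
 * io_uring_register(IORING_REGISTER_IOWQ_AFF) path for SQPOLL rings: the
 * affinity mask is applied to the io-wq workers owned by the SQPOLL thread
 * (not to the SQPOLL thread itself), with the thread parked so it cannot go
 * away underneath us.
 */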

__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
                                     cpumask_var_t mask)
{
        struct io_sq_data *sqd = ctx->sq_data;
        int ret = -EINVAL;

        if (sqd) {
                io_sq_thread_park(sqd);
                /* Don't set affinity for a dying thread */
                if (sqd->thread)
                        ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
                io_sq_thread_unpark(sqd);
        }

        return ret;
}

