1 /* SPDX-License-Identifier: GPL-2.0-or-later */ 2 /* 3 * Definitions for the 'struct ptr_ring' datastructure. 4 * 5 * Author: 6 * Michael S. Tsirkin <mst@redhat.com> 7 * 8 * Copyright (C) 2016 Red Hat, Inc. 9 * 10 * This is a limited-size FIFO maintaining pointers in FIFO order, with 11 * one CPU producing entries and another consuming entries from a FIFO. 12 * 13 * This implementation tries to minimize cache-contention when there is a 14 * single producer and a single consumer CPU. 15 */ 16 17 #ifndef _LINUX_PTR_RING_H 18 #define _LINUX_PTR_RING_H 1 19 20 #ifdef __KERNEL__ 21 #include <linux/spinlock.h> 22 #include <linux/cache.h> 23 #include <linux/types.h> 24 #include <linux/compiler.h> 25 #include <linux/slab.h> 26 #include <linux/mm.h> 27 #include <asm/errno.h> 28 #endif 29 30 struct ptr_ring { 31 int producer ____cacheline_aligned_in_smp; 32 spinlock_t producer_lock; 33 int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */ 34 int consumer_tail; /* next entry to invalidate */ 35 spinlock_t consumer_lock; 36 /* Shared consumer/producer data */ 37 /* Read-only by both the producer and the consumer */ 38 int size ____cacheline_aligned_in_smp; /* max entries in queue */ 39 int batch; /* number of entries to consume in a batch */ 40 void **queue; 41 }; 42 43 /* Note: callers invoking this in a loop must use a compiler barrier, 44 * for example cpu_relax(). 45 * 46 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock: 47 * see e.g. ptr_ring_full. 48 */ 49 static inline bool __ptr_ring_full(struct ptr_ring *r) 50 { 51 return r->queue[r->producer]; 52 } 53 54 static inline bool ptr_ring_full(struct ptr_ring *r) 55 { 56 bool ret; 57 58 spin_lock(&r->producer_lock); 59 ret = __ptr_ring_full(r); 60 spin_unlock(&r->producer_lock); 61 62 return ret; 63 } 64 65 static inline bool ptr_ring_full_irq(struct ptr_ring *r) 66 { 67 bool ret; 68 69 spin_lock_irq(&r->producer_lock); 70 ret = __ptr_ring_full(r); 71 spin_unlock_irq(&r->producer_lock); 72 73 return ret; 74 } 75 76 static inline bool ptr_ring_full_any(struct ptr_ring *r) 77 { 78 unsigned long flags; 79 bool ret; 80 81 spin_lock_irqsave(&r->producer_lock, flags); 82 ret = __ptr_ring_full(r); 83 spin_unlock_irqrestore(&r->producer_lock, flags); 84 85 return ret; 86 } 87 88 static inline bool ptr_ring_full_bh(struct ptr_ring *r) 89 { 90 bool ret; 91 92 spin_lock_bh(&r->producer_lock); 93 ret = __ptr_ring_full(r); 94 spin_unlock_bh(&r->producer_lock); 95 96 return ret; 97 } 98 99 /* Note: callers invoking this in a loop must use a compiler barrier, 100 * for example cpu_relax(). Callers must hold producer_lock. 101 * Callers are responsible for making sure pointer that is being queued 102 * points to a valid data. 103 */ 104 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr) 105 { 106 if (unlikely(!r->size) || r->queue[r->producer]) 107 return -ENOSPC; 108 109 /* Make sure the pointer we are storing points to a valid data. */ 110 /* Pairs with the dependency ordering in __ptr_ring_consume. */ 111 smp_wmb(); 112 113 WRITE_ONCE(r->queue[r->producer++], ptr); 114 if (unlikely(r->producer >= r->size)) 115 r->producer = 0; 116 return 0; 117 } 118 119 /* 120 * Note: resize (below) nests producer lock within consumer lock, so if you 121 * consume in interrupt or BH context, you must disable interrupts/BH when 122 * calling this. 123 */ 124 static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr) 125 { 126 int ret; 127 128 spin_lock(&r->producer_lock); 129 ret = __ptr_ring_produce(r, ptr); 130 spin_unlock(&r->producer_lock); 131 132 return ret; 133 } 134 135 static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr) 136 { 137 int ret; 138 139 spin_lock_irq(&r->producer_lock); 140 ret = __ptr_ring_produce(r, ptr); 141 spin_unlock_irq(&r->producer_lock); 142 143 return ret; 144 } 145 146 static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr) 147 { 148 unsigned long flags; 149 int ret; 150 151 spin_lock_irqsave(&r->producer_lock, flags); 152 ret = __ptr_ring_produce(r, ptr); 153 spin_unlock_irqrestore(&r->producer_lock, flags); 154 155 return ret; 156 } 157 158 static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr) 159 { 160 int ret; 161 162 spin_lock_bh(&r->producer_lock); 163 ret = __ptr_ring_produce(r, ptr); 164 spin_unlock_bh(&r->producer_lock); 165 166 return ret; 167 } 168 169 static inline void *__ptr_ring_peek(struct ptr_ring *r) 170 { 171 if (likely(r->size)) 172 return READ_ONCE(r->queue[r->consumer_head]); 173 return NULL; 174 } 175 176 /* 177 * Test ring empty status without taking any locks. 178 * 179 * NB: This is only safe to call if ring is never resized. 180 * 181 * However, if some other CPU consumes ring entries at the same time, the value 182 * returned is not guaranteed to be correct. 183 * 184 * In this case - to avoid incorrectly detecting the ring 185 * as empty - the CPU consuming the ring entries is responsible 186 * for either consuming all ring entries until the ring is empty, 187 * or synchronizing with some other CPU and causing it to 188 * re-test __ptr_ring_empty and/or consume the ring enteries 189 * after the synchronization point. 190 * 191 * Note: callers invoking this in a loop must use a compiler barrier, 192 * for example cpu_relax(). 193 */ 194 static inline bool __ptr_ring_empty(struct ptr_ring *r) 195 { 196 if (likely(r->size)) 197 return !r->queue[READ_ONCE(r->consumer_head)]; 198 return true; 199 } 200 201 static inline bool ptr_ring_empty(struct ptr_ring *r) 202 { 203 bool ret; 204 205 spin_lock(&r->consumer_lock); 206 ret = __ptr_ring_empty(r); 207 spin_unlock(&r->consumer_lock); 208 209 return ret; 210 } 211 212 static inline bool ptr_ring_empty_irq(struct ptr_ring *r) 213 { 214 bool ret; 215 216 spin_lock_irq(&r->consumer_lock); 217 ret = __ptr_ring_empty(r); 218 spin_unlock_irq(&r->consumer_lock); 219 220 return ret; 221 } 222 223 static inline bool ptr_ring_empty_any(struct ptr_ring *r) 224 { 225 unsigned long flags; 226 bool ret; 227 228 spin_lock_irqsave(&r->consumer_lock, flags); 229 ret = __ptr_ring_empty(r); 230 spin_unlock_irqrestore(&r->consumer_lock, flags); 231 232 return ret; 233 } 234 235 static inline bool ptr_ring_empty_bh(struct ptr_ring *r) 236 { 237 bool ret; 238 239 spin_lock_bh(&r->consumer_lock); 240 ret = __ptr_ring_empty(r); 241 spin_unlock_bh(&r->consumer_lock); 242 243 return ret; 244 } 245 246 /* Must only be called after __ptr_ring_peek returned !NULL */ 247 static inline void __ptr_ring_discard_one(struct ptr_ring *r) 248 { 249 /* Fundamentally, what we want to do is update consumer 250 * index and zero out the entry so producer can reuse it. 251 * Doing it naively at each consume would be as simple as: 252 * consumer = r->consumer; 253 * r->queue[consumer++] = NULL; 254 * if (unlikely(consumer >= r->size)) 255 * consumer = 0; 256 * r->consumer = consumer; 257 * but that is suboptimal when the ring is full as producer is writing 258 * out new entries in the same cache line. Defer these updates until a 259 * batch of entries has been consumed. 260 */ 261 /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty 262 * to work correctly. 263 */ 264 int consumer_head = r->consumer_head; 265 int head = consumer_head++; 266 267 /* Once we have processed enough entries invalidate them in 268 * the ring all at once so producer can reuse their space in the ring. 269 * We also do this when we reach end of the ring - not mandatory 270 * but helps keep the implementation simple. 271 */ 272 if (unlikely(consumer_head - r->consumer_tail >= r->batch || 273 consumer_head >= r->size)) { 274 /* Zero out entries in the reverse order: this way we touch the 275 * cache line that producer might currently be reading the last; 276 * producer won't make progress and touch other cache lines 277 * besides the first one until we write out all entries. 278 */ 279 while (likely(head >= r->consumer_tail)) 280 r->queue[head--] = NULL; 281 r->consumer_tail = consumer_head; 282 } 283 if (unlikely(consumer_head >= r->size)) { 284 consumer_head = 0; 285 r->consumer_tail = 0; 286 } 287 /* matching READ_ONCE in __ptr_ring_empty for lockless tests */ 288 WRITE_ONCE(r->consumer_head, consumer_head); 289 } 290 291 static inline void *__ptr_ring_consume(struct ptr_ring *r) 292 { 293 void *ptr; 294 295 /* The READ_ONCE in __ptr_ring_peek guarantees that anyone 296 * accessing data through the pointer is up to date. Pairs 297 * with smp_wmb in __ptr_ring_produce. 298 */ 299 ptr = __ptr_ring_peek(r); 300 if (ptr) 301 __ptr_ring_discard_one(r); 302 303 return ptr; 304 } 305 306 static inline int __ptr_ring_consume_batched(struct ptr_ring *r, 307 void **array, int n) 308 { 309 void *ptr; 310 int i; 311 312 for (i = 0; i < n; i++) { 313 ptr = __ptr_ring_consume(r); 314 if (!ptr) 315 break; 316 array[i] = ptr; 317 } 318 319 return i; 320 } 321 322 /* 323 * Note: resize (below) nests producer lock within consumer lock, so if you 324 * call this in interrupt or BH context, you must disable interrupts/BH when 325 * producing. 326 */ 327 static inline void *ptr_ring_consume(struct ptr_ring *r) 328 { 329 void *ptr; 330 331 spin_lock(&r->consumer_lock); 332 ptr = __ptr_ring_consume(r); 333 spin_unlock(&r->consumer_lock); 334 335 return ptr; 336 } 337 338 static inline void *ptr_ring_consume_irq(struct ptr_ring *r) 339 { 340 void *ptr; 341 342 spin_lock_irq(&r->consumer_lock); 343 ptr = __ptr_ring_consume(r); 344 spin_unlock_irq(&r->consumer_lock); 345 346 return ptr; 347 } 348 349 static inline void *ptr_ring_consume_any(struct ptr_ring *r) 350 { 351 unsigned long flags; 352 void *ptr; 353 354 spin_lock_irqsave(&r->consumer_lock, flags); 355 ptr = __ptr_ring_consume(r); 356 spin_unlock_irqrestore(&r->consumer_lock, flags); 357 358 return ptr; 359 } 360 361 static inline void *ptr_ring_consume_bh(struct ptr_ring *r) 362 { 363 void *ptr; 364 365 spin_lock_bh(&r->consumer_lock); 366 ptr = __ptr_ring_consume(r); 367 spin_unlock_bh(&r->consumer_lock); 368 369 return ptr; 370 } 371 372 static inline int ptr_ring_consume_batched(struct ptr_ring *r, 373 void **array, int n) 374 { 375 int ret; 376 377 spin_lock(&r->consumer_lock); 378 ret = __ptr_ring_consume_batched(r, array, n); 379 spin_unlock(&r->consumer_lock); 380 381 return ret; 382 } 383 384 static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r, 385 void **array, int n) 386 { 387 int ret; 388 389 spin_lock_irq(&r->consumer_lock); 390 ret = __ptr_ring_consume_batched(r, array, n); 391 spin_unlock_irq(&r->consumer_lock); 392 393 return ret; 394 } 395 396 static inline int ptr_ring_consume_batched_any(struct ptr_ring *r, 397 void **array, int n) 398 { 399 unsigned long flags; 400 int ret; 401 402 spin_lock_irqsave(&r->consumer_lock, flags); 403 ret = __ptr_ring_consume_batched(r, array, n); 404 spin_unlock_irqrestore(&r->consumer_lock, flags); 405 406 return ret; 407 } 408 409 static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r, 410 void **array, int n) 411 { 412 int ret; 413 414 spin_lock_bh(&r->consumer_lock); 415 ret = __ptr_ring_consume_batched(r, array, n); 416 spin_unlock_bh(&r->consumer_lock); 417 418 return ret; 419 } 420 421 /* Cast to structure type and call a function without discarding from FIFO. 422 * Function must return a value. 423 * Callers must take consumer_lock. 424 */ 425 #define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r))) 426 427 #define PTR_RING_PEEK_CALL(r, f) ({ \ 428 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ 429 \ 430 spin_lock(&(r)->consumer_lock); \ 431 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ 432 spin_unlock(&(r)->consumer_lock); \ 433 __PTR_RING_PEEK_CALL_v; \ 434 }) 435 436 #define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \ 437 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ 438 \ 439 spin_lock_irq(&(r)->consumer_lock); \ 440 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ 441 spin_unlock_irq(&(r)->consumer_lock); \ 442 __PTR_RING_PEEK_CALL_v; \ 443 }) 444 445 #define PTR_RING_PEEK_CALL_BH(r, f) ({ \ 446 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ 447 \ 448 spin_lock_bh(&(r)->consumer_lock); \ 449 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ 450 spin_unlock_bh(&(r)->consumer_lock); \ 451 __PTR_RING_PEEK_CALL_v; \ 452 }) 453 454 #define PTR_RING_PEEK_CALL_ANY(r, f) ({ \ 455 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ 456 unsigned long __PTR_RING_PEEK_CALL_f;\ 457 \ 458 spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \ 459 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ 460 spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \ 461 __PTR_RING_PEEK_CALL_v; \ 462 }) 463 464 /* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See 465 * documentation for vmalloc for which of them are legal. 466 */ 467 static inline void **__ptr_ring_init_queue_alloc_noprof(unsigned int size, gfp_t gfp) 468 { 469 if (size > KMALLOC_MAX_SIZE / sizeof(void *)) 470 return NULL; 471 return kvmalloc_array_noprof(size, sizeof(void *), gfp | __GFP_ZERO); 472 } 473 474 static inline void __ptr_ring_set_size(struct ptr_ring *r, int size) 475 { 476 r->size = size; 477 r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue)); 478 /* We need to set batch at least to 1 to make logic 479 * in __ptr_ring_discard_one work correctly. 480 * Batching too much (because ring is small) would cause a lot of 481 * burstiness. Needs tuning, for now disable batching. 482 */ 483 if (r->batch > r->size / 2 || !r->batch) 484 r->batch = 1; 485 } 486 487 static inline int ptr_ring_init_noprof(struct ptr_ring *r, int size, gfp_t gfp) 488 { 489 r->queue = __ptr_ring_init_queue_alloc_noprof(size, gfp); 490 if (!r->queue) 491 return -ENOMEM; 492 493 __ptr_ring_set_size(r, size); 494 r->producer = r->consumer_head = r->consumer_tail = 0; 495 spin_lock_init(&r->producer_lock); 496 spin_lock_init(&r->consumer_lock); 497 498 return 0; 499 } 500 #define ptr_ring_init(...) alloc_hooks(ptr_ring_init_noprof(__VA_ARGS__)) 501 502 /* 503 * Return entries into ring. Destroy entries that don't fit. 504 * 505 * Note: this is expected to be a rare slow path operation. 506 * 507 * Note: producer lock is nested within consumer lock, so if you 508 * resize you must make sure all uses nest correctly. 509 * In particular if you consume ring in interrupt or BH context, you must 510 * disable interrupts/BH when doing so. 511 */ 512 static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n, 513 void (*destroy)(void *)) 514 { 515 unsigned long flags; 516 int head; 517 518 spin_lock_irqsave(&r->consumer_lock, flags); 519 spin_lock(&r->producer_lock); 520 521 if (!r->size) 522 goto done; 523 524 /* 525 * Clean out buffered entries (for simplicity). This way following code 526 * can test entries for NULL and if not assume they are valid. 527 */ 528 head = r->consumer_head - 1; 529 while (likely(head >= r->consumer_tail)) 530 r->queue[head--] = NULL; 531 r->consumer_tail = r->consumer_head; 532 533 /* 534 * Go over entries in batch, start moving head back and copy entries. 535 * Stop when we run into previously unconsumed entries. 536 */ 537 while (n) { 538 head = r->consumer_head - 1; 539 if (head < 0) 540 head = r->size - 1; 541 if (r->queue[head]) { 542 /* This batch entry will have to be destroyed. */ 543 goto done; 544 } 545 r->queue[head] = batch[--n]; 546 r->consumer_tail = head; 547 /* matching READ_ONCE in __ptr_ring_empty for lockless tests */ 548 WRITE_ONCE(r->consumer_head, head); 549 } 550 551 done: 552 /* Destroy all entries left in the batch. */ 553 while (n) 554 destroy(batch[--n]); 555 spin_unlock(&r->producer_lock); 556 spin_unlock_irqrestore(&r->consumer_lock, flags); 557 } 558 559 static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue, 560 int size, gfp_t gfp, 561 void (*destroy)(void *)) 562 { 563 int producer = 0; 564 void **old; 565 void *ptr; 566 567 while ((ptr = __ptr_ring_consume(r))) 568 if (producer < size) 569 queue[producer++] = ptr; 570 else if (destroy) 571 destroy(ptr); 572 573 if (producer >= size) 574 producer = 0; 575 __ptr_ring_set_size(r, size); 576 r->producer = producer; 577 r->consumer_head = 0; 578 r->consumer_tail = 0; 579 old = r->queue; 580 r->queue = queue; 581 582 return old; 583 } 584 585 /* 586 * Note: producer lock is nested within consumer lock, so if you 587 * resize you must make sure all uses nest correctly. 588 * In particular if you consume ring in interrupt or BH context, you must 589 * disable interrupts/BH when doing so. 590 */ 591 static inline int ptr_ring_resize_noprof(struct ptr_ring *r, int size, gfp_t gfp, 592 void (*destroy)(void *)) 593 { 594 unsigned long flags; 595 void **queue = __ptr_ring_init_queue_alloc_noprof(size, gfp); 596 void **old; 597 598 if (!queue) 599 return -ENOMEM; 600 601 spin_lock_irqsave(&(r)->consumer_lock, flags); 602 spin_lock(&(r)->producer_lock); 603 604 old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy); 605 606 spin_unlock(&(r)->producer_lock); 607 spin_unlock_irqrestore(&(r)->consumer_lock, flags); 608 609 kvfree(old); 610 611 return 0; 612 } 613 #define ptr_ring_resize(...) alloc_hooks(ptr_ring_resize_noprof(__VA_ARGS__)) 614 615 /* 616 * Note: producer lock is nested within consumer lock, so if you 617 * resize you must make sure all uses nest correctly. 618 * In particular if you consume ring in interrupt or BH context, you must 619 * disable interrupts/BH when doing so. 620 */ 621 static inline int ptr_ring_resize_multiple_noprof(struct ptr_ring **rings, 622 unsigned int nrings, 623 int size, 624 gfp_t gfp, void (*destroy)(void *)) 625 { 626 unsigned long flags; 627 void ***queues; 628 int i; 629 630 queues = kmalloc_array_noprof(nrings, sizeof(*queues), gfp); 631 if (!queues) 632 goto noqueues; 633 634 for (i = 0; i < nrings; ++i) { 635 queues[i] = __ptr_ring_init_queue_alloc_noprof(size, gfp); 636 if (!queues[i]) 637 goto nomem; 638 } 639 640 for (i = 0; i < nrings; ++i) { 641 spin_lock_irqsave(&(rings[i])->consumer_lock, flags); 642 spin_lock(&(rings[i])->producer_lock); 643 queues[i] = __ptr_ring_swap_queue(rings[i], queues[i], 644 size, gfp, destroy); 645 spin_unlock(&(rings[i])->producer_lock); 646 spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags); 647 } 648 649 for (i = 0; i < nrings; ++i) 650 kvfree(queues[i]); 651 652 kfree(queues); 653 654 return 0; 655 656 nomem: 657 while (--i >= 0) 658 kvfree(queues[i]); 659 660 kfree(queues); 661 662 noqueues: 663 return -ENOMEM; 664 } 665 #define ptr_ring_resize_multiple(...) \ 666 alloc_hooks(ptr_ring_resize_multiple_noprof(__VA_ARGS__)) 667 668 static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *)) 669 { 670 void *ptr; 671 672 if (destroy) 673 while ((ptr = ptr_ring_consume(r))) 674 destroy(ptr); 675 kvfree(r->queue); 676 } 677 678 #endif /* _LINUX_PTR_RING_H */ 679
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.