/*
 * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/sched/clock.h>
#include <linux/slab.h>
#include <linux/pci.h>
#include <linux/dma-mapping.h>
#include <rdma/rdma_cm.h>

#include "rds_single_path.h"
#include "rds.h"
#include "ib.h"

static struct kmem_cache *rds_ib_incoming_slab;
static struct kmem_cache *rds_ib_frag_slab;
static atomic_t rds_ib_allocation = ATOMIC_INIT(0);

void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
{
	struct rds_ib_recv_work *recv;
	u32 i;

	for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
		struct ib_sge *sge;

		recv->r_ibinc = NULL;
		recv->r_frag = NULL;

		recv->r_wr.next = NULL;
		recv->r_wr.wr_id = i;
		recv->r_wr.sg_list = recv->r_sge;
		recv->r_wr.num_sge = RDS_IB_RECV_SGE;

		sge = &recv->r_sge[0];
		sge->addr = ic->i_recv_hdrs_dma[i];
		sge->length = sizeof(struct rds_header);
		sge->lkey = ic->i_pd->local_dma_lkey;

		sge = &recv->r_sge[1];
		sge->addr = 0;
		sge->length = RDS_FRAG_SIZE;
		sge->lkey = ic->i_pd->local_dma_lkey;
	}
}

/*
 * The entire 'from' list, including the from element itself, is put on
 * to the tail of the 'to' list.
 */
static void list_splice_entire_tail(struct list_head *from,
				    struct list_head *to)
{
	struct list_head *from_last = from->prev;

	list_splice_tail(from_last, to);
	list_add_tail(from_last, to);
}

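/*
 * Move the list that rds_ib_recv_cache_put() has been building up in
 * cache->xfer over to cache->ready, where the refill path consumes it.
 */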
static void rds_ib_cache_xfer_to_ready(struct rds_ib_refill_cache *cache)
{
	struct list_head *tmp;

	tmp = xchg(&cache->xfer, NULL);
	if (tmp) {
		if (cache->ready)
			list_splice_entire_tail(tmp, cache->ready);
		else
			cache->ready = tmp;
	}
}

static int rds_ib_recv_alloc_cache(struct rds_ib_refill_cache *cache, gfp_t gfp)
{
	struct rds_ib_cache_head *head;
	int cpu;

	cache->percpu = alloc_percpu_gfp(struct rds_ib_cache_head, gfp);
	if (!cache->percpu)
		return -ENOMEM;

	for_each_possible_cpu(cpu) {
		head = per_cpu_ptr(cache->percpu, cpu);
		head->first = NULL;
		head->count = 0;
	}
	cache->xfer = NULL;
	cache->ready = NULL;

	return 0;
}

int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic, gfp_t gfp)
{
	int ret;

	ret = rds_ib_recv_alloc_cache(&ic->i_cache_incs, gfp);
	if (!ret) {
		ret = rds_ib_recv_alloc_cache(&ic->i_cache_frags, gfp);
		if (ret)
			free_percpu(ic->i_cache_incs.percpu);
	}

	return ret;
}

static void rds_ib_cache_splice_all_lists(struct rds_ib_refill_cache *cache,
					  struct list_head *caller_list)
{
	struct rds_ib_cache_head *head;
	int cpu;

	for_each_possible_cpu(cpu) {
		head = per_cpu_ptr(cache->percpu, cpu);
		if (head->first) {
			list_splice_entire_tail(head->first, caller_list);
			head->first = NULL;
		}
	}

	if (cache->ready) {
		list_splice_entire_tail(cache->ready, caller_list);
		cache->ready = NULL;
	}
}

void rds_ib_recv_free_caches(struct rds_ib_connection *ic)
{
	struct rds_ib_incoming *inc;
	struct rds_ib_incoming *inc_tmp;
	struct rds_page_frag *frag;
	struct rds_page_frag *frag_tmp;
	LIST_HEAD(list);

	rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
	rds_ib_cache_splice_all_lists(&ic->i_cache_incs, &list);
	free_percpu(ic->i_cache_incs.percpu);

	list_for_each_entry_safe(inc, inc_tmp, &list, ii_cache_entry) {
		list_del(&inc->ii_cache_entry);
		WARN_ON(!list_empty(&inc->ii_frags));
		kmem_cache_free(rds_ib_incoming_slab, inc);
		atomic_dec(&rds_ib_allocation);
	}

	rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
	rds_ib_cache_splice_all_lists(&ic->i_cache_frags, &list);
	free_percpu(ic->i_cache_frags.percpu);

	list_for_each_entry_safe(frag, frag_tmp, &list, f_cache_entry) {
		list_del(&frag->f_cache_entry);
		WARN_ON(!list_empty(&frag->f_item));
		kmem_cache_free(rds_ib_frag_slab, frag);
	}
}

/* fwd decl */
static void rds_ib_recv_cache_put(struct list_head *new_item,
				  struct rds_ib_refill_cache *cache);
static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache);


/* Recycle frag and attached recv buffer f_sg */
static void rds_ib_frag_free(struct rds_ib_connection *ic,
			     struct rds_page_frag *frag)
{
	rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));

	rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags);
	atomic_add(RDS_FRAG_SIZE / SZ_1K, &ic->i_cache_allocs);
	rds_ib_stats_add(s_ib_recv_added_to_cache, RDS_FRAG_SIZE);
}

/* Recycle inc after freeing attached frags */
void rds_ib_inc_free(struct rds_incoming *inc)
{
	struct rds_ib_incoming *ibinc;
	struct rds_page_frag *frag;
	struct rds_page_frag *pos;
	struct rds_ib_connection *ic = inc->i_conn->c_transport_data;

	ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);

	/* Free attached frags */
	list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
		list_del_init(&frag->f_item);
		rds_ib_frag_free(ic, frag);
	}
	BUG_ON(!list_empty(&ibinc->ii_frags));

	rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
	rds_ib_recv_cache_put(&ibinc->ii_cache_entry, &ic->i_cache_incs);
}

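/*
 * Release whatever is still attached to a recv work entry: drop the
 * reference on its inc, and unmap and recycle its data fragment.
 */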
static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
				  struct rds_ib_recv_work *recv)
{
	if (recv->r_ibinc) {
		rds_inc_put(&recv->r_ibinc->ii_inc);
		recv->r_ibinc = NULL;
	}
	if (recv->r_frag) {
		ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
		rds_ib_frag_free(ic, recv->r_frag);
		recv->r_frag = NULL;
	}
}

void rds_ib_recv_clear_ring(struct rds_ib_connection *ic)
{
	u32 i;

	for (i = 0; i < ic->i_recv_ring.w_nr; i++)
		rds_ib_recv_clear_one(ic, &ic->i_recvs[i]);
}

static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *ic,
						     gfp_t slab_mask)
{
	struct rds_ib_incoming *ibinc;
	struct list_head *cache_item;
	int avail_allocs;

	cache_item = rds_ib_recv_cache_get(&ic->i_cache_incs);
	if (cache_item) {
		ibinc = container_of(cache_item, struct rds_ib_incoming, ii_cache_entry);
	} else {
		avail_allocs = atomic_add_unless(&rds_ib_allocation,
						 1, rds_ib_sysctl_max_recv_allocation);
		if (!avail_allocs) {
			rds_ib_stats_inc(s_ib_rx_alloc_limit);
			return NULL;
		}
		ibinc = kmem_cache_alloc(rds_ib_incoming_slab, slab_mask);
		if (!ibinc) {
			atomic_dec(&rds_ib_allocation);
			return NULL;
		}
		rds_ib_stats_inc(s_ib_rx_total_incs);
	}
	INIT_LIST_HEAD(&ibinc->ii_frags);
	rds_inc_init(&ibinc->ii_inc, ic->conn, &ic->conn->c_faddr);

	return ibinc;
}

static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic,
						    gfp_t slab_mask, gfp_t page_mask)
{
	struct rds_page_frag *frag;
	struct list_head *cache_item;
	int ret;

	cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags);
	if (cache_item) {
		frag = container_of(cache_item, struct rds_page_frag, f_cache_entry);
		atomic_sub(RDS_FRAG_SIZE / SZ_1K, &ic->i_cache_allocs);
		rds_ib_stats_add(s_ib_recv_added_to_cache, RDS_FRAG_SIZE);
	} else {
		frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask);
		if (!frag)
			return NULL;

		sg_init_table(&frag->f_sg, 1);
		ret = rds_page_remainder_alloc(&frag->f_sg,
					       RDS_FRAG_SIZE, page_mask);
		if (ret) {
			kmem_cache_free(rds_ib_frag_slab, frag);
			return NULL;
		}
		rds_ib_stats_inc(s_ib_rx_total_frags);
	}

	INIT_LIST_HEAD(&frag->f_item);

	return frag;
}

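/*
 * Prepare one ring entry for posting: make sure it has an inc and a
 * DMA-mapped data fragment (taken from the per-connection caches when
 * possible), then point its two SGEs at the receive header and the
 * fragment.
 */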
static int rds_ib_recv_refill_one(struct rds_connection *conn,
				  struct rds_ib_recv_work *recv, gfp_t gfp)
{
	struct rds_ib_connection *ic = conn->c_transport_data;
	struct ib_sge *sge;
	int ret = -ENOMEM;
	gfp_t slab_mask = gfp;
	gfp_t page_mask = gfp;

	if (gfp & __GFP_DIRECT_RECLAIM) {
		slab_mask = GFP_KERNEL;
		page_mask = GFP_HIGHUSER;
	}

	if (!ic->i_cache_incs.ready)
		rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
	if (!ic->i_cache_frags.ready)
		rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);

	/*
	 * ibinc was taken from recv if recv contained the start of a message.
	 * recvs that were continuations will still have this allocated.
	 */
	if (!recv->r_ibinc) {
		recv->r_ibinc = rds_ib_refill_one_inc(ic, slab_mask);
		if (!recv->r_ibinc)
			goto out;
	}

	WARN_ON(recv->r_frag); /* leak! */
	recv->r_frag = rds_ib_refill_one_frag(ic, slab_mask, page_mask);
	if (!recv->r_frag)
		goto out;

	ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg,
			    1, DMA_FROM_DEVICE);
	WARN_ON(ret != 1);

	sge = &recv->r_sge[0];
	sge->addr = ic->i_recv_hdrs_dma[recv - ic->i_recvs];
	sge->length = sizeof(struct rds_header);

	sge = &recv->r_sge[1];
	sge->addr = sg_dma_address(&recv->r_frag->f_sg);
	sge->length = sg_dma_len(&recv->r_frag->f_sg);

	ret = 0;
out:
	return ret;
}

static int acquire_refill(struct rds_connection *conn)
{
	return test_and_set_bit(RDS_RECV_REFILL, &conn->c_flags) == 0;
}

static void release_refill(struct rds_connection *conn)
{
	clear_bit(RDS_RECV_REFILL, &conn->c_flags);
	smp_mb__after_atomic();

	/* We don't use wait_on_bit()/wake_up_bit() because our waking is in a
	 * hot path and finding waiters is very rare.  We don't want to walk
	 * the system-wide hashed waitqueue buckets in the fast path only to
	 * almost never find waiters.
	 */
	if (waitqueue_active(&conn->c_waitq))
		wake_up_all(&conn->c_waitq);
}

/*
 * This tries to allocate and post unused work requests after making sure that
 * they have all the allocations they need to queue received fragments into
 * sockets.
 */
void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
{
	struct rds_ib_connection *ic = conn->c_transport_data;
	struct rds_ib_recv_work *recv;
	unsigned int posted = 0;
	int ret = 0;
	bool can_wait = !!(gfp & __GFP_DIRECT_RECLAIM);
	bool must_wake = false;
	u32 pos;

	/* the goal here is to just make sure that someone, somewhere
	 * is posting buffers.  If we can't get the refill lock,
	 * let them do their thing
	 */
	if (!acquire_refill(conn))
		return;

	while ((prefill || rds_conn_up(conn)) &&
	       rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
		if (pos >= ic->i_recv_ring.w_nr) {
			printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
			       pos);
			break;
		}

		recv = &ic->i_recvs[pos];
		ret = rds_ib_recv_refill_one(conn, recv, gfp);
		if (ret) {
			must_wake = true;
			break;
		}

		rdsdebug("recv %p ibinc %p page %p addr %lu\n", recv,
			 recv->r_ibinc, sg_page(&recv->r_frag->f_sg),
			 (long)sg_dma_address(&recv->r_frag->f_sg));

		/* XXX when can this fail? */
		ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, NULL);
		if (ret) {
			rds_ib_conn_error(conn, "recv post on "
			       "%pI6c returned %d, disconnecting and "
			       "reconnecting\n", &conn->c_faddr,
			       ret);
			break;
		}

		posted++;

		if ((posted > 128 && need_resched()) || posted > 8192) {
			must_wake = true;
			break;
		}
	}

	/* We're doing flow control - update the window. */
	if (ic->i_flowctl && posted)
		rds_ib_advertise_credits(conn, posted);

	if (ret)
		rds_ib_ring_unalloc(&ic->i_recv_ring, 1);

	release_refill(conn);

	/* if we're called from the softirq handler, we'll be GFP_NOWAIT.
	 * in this case the ring being low is going to lead to more interrupts
	 * and we can safely let the softirq code take care of it unless the
	 * ring is completely empty.
	 *
	 * if we're called from krdsd, we'll be GFP_KERNEL.  In this case
	 * we might have raced with the softirq code while we had the refill
	 * lock held.  Use rds_ib_ring_low() instead of ring_empty to decide
	 * if we should requeue.
	 */
	if (rds_conn_up(conn) &&
	    (must_wake ||
	    (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
	    rds_ib_ring_empty(&ic->i_recv_ring))) {
		queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
	}
	if (can_wait)
		cond_resched();
}

/*
 * We want to recycle several types of recv allocations, like incs and frags.
 * To use this, the *_free() function passes in the ptr to a list_head within
 * the recyclee, as well as the cache to put it on.
 *
 * First, we put the memory on a percpu list.  When this reaches a certain size,
 * we move it to an intermediate non-percpu list in a lockless manner, with some
 * xchg/cmpxchg wizardry.
 *
 * N.B. Instead of a list_head as the anchor, we use a single pointer, which can
 * be NULL and xchg'd.  The list is actually empty when the pointer is NULL, and
 * list_empty() will return true when one element is actually present.
 */
static void rds_ib_recv_cache_put(struct list_head *new_item,
				  struct rds_ib_refill_cache *cache)
{
	unsigned long flags;
	struct list_head *old, *chpfirst;

	local_irq_save(flags);

	chpfirst = __this_cpu_read(cache->percpu->first);
	if (!chpfirst)
		INIT_LIST_HEAD(new_item);
	else /* put on front */
		list_add_tail(new_item, chpfirst);

	__this_cpu_write(cache->percpu->first, new_item);
	__this_cpu_inc(cache->percpu->count);

	if (__this_cpu_read(cache->percpu->count) < RDS_IB_RECYCLE_BATCH_COUNT)
		goto end;

	/*
	 * Return our per-cpu first list to the cache's xfer by atomically
	 * grabbing the current xfer list, appending it to our per-cpu list,
	 * and then atomically returning that entire list back to the
	 * cache's xfer list as long as it's still empty.
	 */
	do {
		old = xchg(&cache->xfer, NULL);
		if (old)
			list_splice_entire_tail(old, chpfirst);
		old = cmpxchg(&cache->xfer, NULL, chpfirst);
	} while (old);


	__this_cpu_write(cache->percpu->first, NULL);
	__this_cpu_write(cache->percpu->count, 0);
end:
	local_irq_restore(flags);
}

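/*
 * Take the first recycled element off the cache's ready list, if there is
 * one.  The refill path moves entries from the xfer list onto the ready
 * list with rds_ib_cache_xfer_to_ready() before calling this.
 */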
static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache)
{
	struct list_head *head = cache->ready;

	if (head) {
		if (!list_empty(head)) {
			cache->ready = head->next;
			list_del_init(head);
		} else
			cache->ready = NULL;
	}

	return head;
}

int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to)
{
	struct rds_ib_incoming *ibinc;
	struct rds_page_frag *frag;
	unsigned long to_copy;
	unsigned long frag_off = 0;
	int copied = 0;
	int ret;
	u32 len;

	ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
	frag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
	len = be32_to_cpu(inc->i_hdr.h_len);

	while (iov_iter_count(to) && copied < len) {
		if (frag_off == RDS_FRAG_SIZE) {
			frag = list_entry(frag->f_item.next,
					  struct rds_page_frag, f_item);
			frag_off = 0;
		}
		to_copy = min_t(unsigned long, iov_iter_count(to),
				RDS_FRAG_SIZE - frag_off);
		to_copy = min_t(unsigned long, to_copy, len - copied);

		/* XXX needs + offset for multiple recvs per page */
		rds_stats_add(s_copy_to_user, to_copy);
		ret = copy_page_to_iter(sg_page(&frag->f_sg),
					frag->f_sg.offset + frag_off,
					to_copy,
					to);
		if (ret != to_copy)
			return -EFAULT;

		frag_off += to_copy;
		copied += to_copy;
	}

	return copied;
}

/* ic starts out kzalloc()ed */
void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
{
	struct ib_send_wr *wr = &ic->i_ack_wr;
	struct ib_sge *sge = &ic->i_ack_sge;

	sge->addr = ic->i_ack_dma;
	sge->length = sizeof(struct rds_header);
	sge->lkey = ic->i_pd->local_dma_lkey;

	wr->sg_list = sge;
	wr->num_sge = 1;
	wr->opcode = IB_WR_SEND;
	wr->wr_id = RDS_IB_ACK_WR_ID;
	wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
}

/*
 * You'd think that with reliable IB connections you wouldn't need to ack
 * messages that have been received.  The problem is that IB hardware generates
 * an ack message before it has DMAed the message into memory.  This creates a
 * potential message loss if the HCA is disabled for any reason between when it
 * sends the ack and before the message is DMAed and processed.  This is only a
 * potential issue if another HCA is available for fail-over.
 *
 * When the remote host receives our ack they'll free the sent message from
 * their send queue.  To decrease the latency of this we always send an ack
 * immediately after we've received messages.
 *
 * For simplicity, we only have one ack in flight at a time.  This puts
 * pressure on senders to have deep enough send queues to absorb the latency of
 * a single ack frame being in flight.  This might not be good enough.
 *
 * This is implemented by having a long-lived send_wr and sge which point to a
 * statically allocated ack frame.  This ack wr does not fall under the ring
 * accounting that the tx and rx wrs do.  The QP attribute specifically makes
 * room for it beyond the ring size.  Send completion notices its special
 * wr_id and avoids working with the ring in that case.
 */
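/*
 * Platforms without a usable atomic64_t fall back to protecting i_ack_next
 * with i_ack_lock; the IB_ACK_REQUESTED flag uses atomic bitops either way.
 */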
#ifndef KERNEL_HAS_ATOMIC64
void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
{
	unsigned long flags;

	spin_lock_irqsave(&ic->i_ack_lock, flags);
	ic->i_ack_next = seq;
	if (ack_required)
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
	spin_unlock_irqrestore(&ic->i_ack_lock, flags);
}

static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
{
	unsigned long flags;
	u64 seq;

	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);

	spin_lock_irqsave(&ic->i_ack_lock, flags);
	seq = ic->i_ack_next;
	spin_unlock_irqrestore(&ic->i_ack_lock, flags);

	return seq;
}
#else
void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
{
	atomic64_set(&ic->i_ack_next, seq);
	if (ack_required) {
		smp_mb__before_atomic();
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
	}
}

static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
{
	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
	smp_mb__after_atomic();

	return atomic64_read(&ic->i_ack_next);
}
#endif


static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
{
	struct rds_header *hdr = ic->i_ack;
	u64 seq;
	int ret;

	seq = rds_ib_get_ack(ic);

	rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);

	ib_dma_sync_single_for_cpu(ic->rds_ibdev->dev, ic->i_ack_dma,
				   sizeof(*hdr), DMA_TO_DEVICE);
	rds_message_populate_header(hdr, 0, 0, 0);
	hdr->h_ack = cpu_to_be64(seq);
	hdr->h_credit = adv_credits;
	rds_message_make_checksum(hdr);
	ib_dma_sync_single_for_device(ic->rds_ibdev->dev, ic->i_ack_dma,
				      sizeof(*hdr), DMA_TO_DEVICE);

	ic->i_ack_queued = jiffies;

	ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, NULL);
	if (unlikely(ret)) {
		/* Failed to send. Release the WR, and
		 * force another ACK.
		 */
		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);

		rds_ib_stats_inc(s_ib_ack_send_failure);

		rds_ib_conn_error(ic->conn, "sending ack failed\n");
	} else
		rds_ib_stats_inc(s_ib_ack_sent);
}

/*
 * There are 3 ways of getting acknowledgements to the peer:
 *  1.	We call rds_ib_attempt_ack from the recv completion handler
 *	to send an ACK-only frame.
 *	However, there can be only one such frame in the send queue
 *	at any time, so we may have to postpone it.
 *  2.	When another (data) packet is transmitted while there's
 *	an ACK in the queue, we piggyback the ACK sequence number
 *	on the data packet.
 *  3.	If the ACK WR is done sending, we get called from the
 *	send queue completion handler, and check whether there's
 *	another ACK pending (postponed because the WR was on the
 *	queue).  If so, we transmit it.
 *
 * We maintain 2 variables:
 *  -	i_ack_flags, which keeps track of whether the ACK WR
 *	is currently in the send queue or not (IB_ACK_IN_FLIGHT)
 *  -	i_ack_next, which is the last sequence number we received
 *
 * Potentially, send queue and receive queue handlers can run concurrently.
 * It would be nice to not have to use a spinlock to synchronize things,
 * but the one problem that rules this out is that 64bit updates are
 * not atomic on all platforms.  Things would be a lot simpler if
 * we had atomic64 or maybe cmpxchg64 everywhere.
 *
 * Reconnecting complicates this picture just slightly.  When we
 * reconnect, we may be seeing duplicate packets.  The peer
 * is retransmitting them, because it hasn't seen an ACK for
 * them.  It is important that we ACK these.
 *
 * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
 * this flag set *MUST* be acknowledged immediately.
 */

/*
 * When we get here, we're called from the recv queue handler.
 * Check whether we ought to transmit an ACK.
 */
void rds_ib_attempt_ack(struct rds_ib_connection *ic)
{
	unsigned int adv_credits;

	if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
		return;

	if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
		rds_ib_stats_inc(s_ib_ack_send_delayed);
		return;
	}

	/* Can we get a send credit? */
	if (!rds_ib_send_grab_credits(ic, 1, &adv_credits, 0, RDS_MAX_ADV_CREDIT)) {
		rds_ib_stats_inc(s_ib_tx_throttle);
		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
		return;
	}

	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
	rds_ib_send_ack(ic, adv_credits);
}

/*
 * We get here from the send completion handler, when the
 * adapter tells us the ACK frame was sent.
 */
void rds_ib_ack_send_complete(struct rds_ib_connection *ic)
{
	clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
	rds_ib_attempt_ack(ic);
}

/*
 * This is called by the regular xmit code when it wants to piggyback
 * an ACK on an outgoing frame.
 */
u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic)
{
	if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
		rds_ib_stats_inc(s_ib_ack_send_piggybacked);
	return rds_ib_get_ack(ic);
}

/*
 * It's kind of lame that we're copying from the posted receive pages into
 * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
 * them.  But receiving new congestion bitmaps should be a *rare* event, so
 * hopefully we won't need to invest that complexity in making it more
 * efficient.  By copying we can share a simpler core with TCP which has to
 * copy.
 */
static void rds_ib_cong_recv(struct rds_connection *conn,
			     struct rds_ib_incoming *ibinc)
{
	struct rds_cong_map *map;
	unsigned int map_off;
	unsigned int map_page;
	struct rds_page_frag *frag;
	unsigned long frag_off;
	unsigned long to_copy;
	unsigned long copied;
	__le64 uncongested = 0;
	void *addr;

	/* catch completely corrupt packets */
	if (be32_to_cpu(ibinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
		return;

	map = conn->c_fcong;
	map_page = 0;
	map_off = 0;

	frag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
	frag_off = 0;

	copied = 0;

	while (copied < RDS_CONG_MAP_BYTES) {
		__le64 *src, *dst;
		unsigned int k;

		to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
		BUG_ON(to_copy & 7); /* Must be 64bit aligned. */

		addr = kmap_atomic(sg_page(&frag->f_sg));

		src = addr + frag->f_sg.offset + frag_off;
		dst = (void *)map->m_page_addrs[map_page] + map_off;
		for (k = 0; k < to_copy; k += 8) {
			/* Record ports that became uncongested, ie
			 * bits that changed from 0 to 1.
			 */
			uncongested |= ~(*src) & *dst;
			*dst++ = *src++;
		}
		kunmap_atomic(addr);

		copied += to_copy;

		map_off += to_copy;
		if (map_off == PAGE_SIZE) {
			map_off = 0;
			map_page++;
		}

		frag_off += to_copy;
		if (frag_off == RDS_FRAG_SIZE) {
			frag = list_entry(frag->f_item.next,
					  struct rds_page_frag, f_item);
			frag_off = 0;
		}
	}

	/* the congestion map is in little endian order */
	rds_cong_map_updated(map, le64_to_cpu(uncongested));
}

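/*
 * Demultiplex one completed receive: validate the RDS header, process the
 * piggybacked ACK and credit update, then either treat the fragment as an
 * ACK-only frame, start a new incoming message, or append the fragment to
 * the message currently being reassembled.  A completed message is handed
 * to rds_recv_incoming(), or to rds_ib_cong_recv() for congestion updates.
 */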
static void rds_ib_process_recv(struct rds_connection *conn,
				struct rds_ib_recv_work *recv, u32 data_len,
				struct rds_ib_ack_state *state)
{
	struct rds_ib_connection *ic = conn->c_transport_data;
	struct rds_ib_incoming *ibinc = ic->i_ibinc;
	struct rds_header *ihdr, *hdr;
	dma_addr_t dma_addr = ic->i_recv_hdrs_dma[recv - ic->i_recvs];

	/* XXX shut down the connection if port 0,0 are seen? */

	rdsdebug("ic %p ibinc %p recv %p byte len %u\n", ic, ibinc, recv,
		 data_len);

	if (data_len < sizeof(struct rds_header)) {
		rds_ib_conn_error(conn, "incoming message "
		       "from %pI6c didn't include a "
		       "header, disconnecting and "
		       "reconnecting\n",
		       &conn->c_faddr);
		return;
	}
	data_len -= sizeof(struct rds_header);

	ihdr = ic->i_recv_hdrs[recv - ic->i_recvs];

	ib_dma_sync_single_for_cpu(ic->rds_ibdev->dev, dma_addr,
				   sizeof(*ihdr), DMA_FROM_DEVICE);
	/* Validate the checksum. */
	if (!rds_message_verify_checksum(ihdr)) {
		rds_ib_conn_error(conn, "incoming message "
		       "from %pI6c has corrupted header - "
		       "forcing a reconnect\n",
		       &conn->c_faddr);
		rds_stats_inc(s_recv_drop_bad_checksum);
		goto done;
	}

	/* Process the ACK sequence which comes with every packet */
	state->ack_recv = be64_to_cpu(ihdr->h_ack);
	state->ack_recv_valid = 1;

	/* Process the credits update if there was one */
	if (ihdr->h_credit)
		rds_ib_send_add_credits(conn, ihdr->h_credit);

	if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
		/* This is an ACK-only packet.  It gets special treatment
		 * here because, historically, ACKs were rather special
		 * beasts.
		 */
		rds_ib_stats_inc(s_ib_ack_received);

		/*
		 * Usually the frags make their way on to incs and are then freed as
		 * the inc is freed.  We don't go that route, so we have to drop the
		 * page ref ourselves.  We can't just leave the page on the recv
		 * because that confuses the dma mapping of pages and each recv's use
		 * of a partial page.
		 *
		 * FIXME: Fold this into the code path below.
		 */
		rds_ib_frag_free(ic, recv->r_frag);
		recv->r_frag = NULL;
		goto done;
	}

	/*
	 * If we don't already have an inc on the connection then this
	 * fragment has a header and starts a message.  Copy its header
	 * into the inc and save the inc so we can hang upcoming fragments
	 * off its list.
	 */
	if (!ibinc) {
		ibinc = recv->r_ibinc;
		recv->r_ibinc = NULL;
		ic->i_ibinc = ibinc;

		hdr = &ibinc->ii_inc.i_hdr;
		ibinc->ii_inc.i_rx_lat_trace[RDS_MSG_RX_HDR] =
				local_clock();
		memcpy(hdr, ihdr, sizeof(*hdr));
		ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
		ibinc->ii_inc.i_rx_lat_trace[RDS_MSG_RX_START] =
				local_clock();

		rdsdebug("ic %p ibinc %p rem %u flag 0x%x\n", ic, ibinc,
			 ic->i_recv_data_rem, hdr->h_flags);
	} else {
		hdr = &ibinc->ii_inc.i_hdr;
		/* We can't just use memcmp here; fragments of a
		 * single message may carry different ACKs */
		if (hdr->h_sequence != ihdr->h_sequence ||
		    hdr->h_len != ihdr->h_len ||
		    hdr->h_sport != ihdr->h_sport ||
		    hdr->h_dport != ihdr->h_dport) {
			rds_ib_conn_error(conn,
					  "fragment header mismatch; forcing reconnect\n");
			goto done;
		}
	}

	list_add_tail(&recv->r_frag->f_item, &ibinc->ii_frags);
	recv->r_frag = NULL;

	if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
		ic->i_recv_data_rem -= RDS_FRAG_SIZE;
	else {
		ic->i_recv_data_rem = 0;
		ic->i_ibinc = NULL;

		if (ibinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP) {
			rds_ib_cong_recv(conn, ibinc);
		} else {
			rds_recv_incoming(conn, &conn->c_faddr, &conn->c_laddr,
					  &ibinc->ii_inc, GFP_ATOMIC);
			state->ack_next = be64_to_cpu(hdr->h_sequence);
			state->ack_next_valid = 1;
		}

		/* Evaluate the ACK_REQUIRED flag *after* we received
		 * the complete frame, and after bumping the next_rx
		 * sequence. */
		if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
			rds_stats_inc(s_recv_ack_required);
			state->ack_required = 1;
		}

		rds_inc_put(&ibinc->ii_inc);
	}
done:
	ib_dma_sync_single_for_device(ic->rds_ibdev->dev, dma_addr,
				      sizeof(*ihdr), DMA_FROM_DEVICE);
}

void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
			     struct ib_wc *wc,
			     struct rds_ib_ack_state *state)
{
	struct rds_connection *conn = ic->conn;
	struct rds_ib_recv_work *recv;

	rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
		 (unsigned long long)wc->wr_id, wc->status,
		 ib_wc_status_msg(wc->status), wc->byte_len,
		 be32_to_cpu(wc->ex.imm_data));

	rds_ib_stats_inc(s_ib_rx_cq_event);
	recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
	ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1,
			DMA_FROM_DEVICE);

	/* Also process recvs in connecting state because it is possible
	 * to get a recv completion _before_ the rdmacm ESTABLISHED
	 * event is processed.
	 */
	if (wc->status == IB_WC_SUCCESS) {
		rds_ib_process_recv(conn, recv, wc->byte_len, state);
	} else {
		/* We expect errors as the qp is drained during shutdown */
		if (rds_conn_up(conn) || rds_conn_connecting(conn))
			rds_ib_conn_error(conn, "recv completion on <%pI6c,%pI6c, %d> had status %u (%s), vendor err 0x%x, disconnecting and reconnecting\n",
					  &conn->c_laddr, &conn->c_faddr,
					  conn->c_tos, wc->status,
					  ib_wc_status_msg(wc->status),
					  wc->vendor_err);
	}

	/* rds_ib_process_recv() doesn't always consume the frag, and
	 * we might not have called it at all if the wc didn't indicate
	 * success.  We already unmapped the frag's pages, though, and
	 * the following rds_ib_ring_free() call tells the refill path
	 * that it will not find an allocated frag here.  Make sure we
	 * keep that promise by freeing a frag that's still on the ring.
	 */
	if (recv->r_frag) {
		rds_ib_frag_free(ic, recv->r_frag);
		recv->r_frag = NULL;
	}
	rds_ib_ring_free(&ic->i_recv_ring, 1);

	/* If we ever end up with a really empty receive ring, we're
	 * in deep trouble, as the sender will definitely see RNR
	 * timeouts. */
	if (rds_ib_ring_empty(&ic->i_recv_ring))
		rds_ib_stats_inc(s_ib_rx_ring_empty);

	if (rds_ib_ring_low(&ic->i_recv_ring)) {
		rds_ib_recv_refill(conn, 0, GFP_NOWAIT | __GFP_NOWARN);
		rds_ib_stats_inc(s_ib_rx_refill_from_cq);
	}
}

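/*
 * Transport recv_path hook, run from the connection's recv worker: send any
 * pending ACK and top the receive ring back up with GFP_KERNEL allocations.
 */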
int rds_ib_recv_path(struct rds_conn_path *cp)
{
	struct rds_connection *conn = cp->cp_conn;
	struct rds_ib_connection *ic = conn->c_transport_data;

	rdsdebug("conn %p\n", conn);
	if (rds_conn_up(conn)) {
		rds_ib_attempt_ack(ic);
		rds_ib_recv_refill(conn, 0, GFP_KERNEL);
		rds_ib_stats_inc(s_ib_rx_refill_from_thread);
	}

	return 0;
}

int rds_ib_recv_init(void)
{
	struct sysinfo si;
	int ret = -ENOMEM;

	/* Default to about one third of all available RAM for recv memory */
	si_meminfo(&si);
	rds_ib_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;

	rds_ib_incoming_slab =
		kmem_cache_create_usercopy("rds_ib_incoming",
					   sizeof(struct rds_ib_incoming),
					   0, SLAB_HWCACHE_ALIGN,
					   offsetof(struct rds_ib_incoming,
						    ii_inc.i_usercopy),
					   sizeof(struct rds_inc_usercopy),
					   NULL);
	if (!rds_ib_incoming_slab)
		goto out;

	rds_ib_frag_slab = kmem_cache_create("rds_ib_frag",
					     sizeof(struct rds_page_frag),
					     0, SLAB_HWCACHE_ALIGN, NULL);
	if (!rds_ib_frag_slab) {
		kmem_cache_destroy(rds_ib_incoming_slab);
		rds_ib_incoming_slab = NULL;
	} else
		ret = 0;
out:
	return ret;
}

void rds_ib_recv_exit(void)
{
	WARN_ON(atomic_read(&rds_ib_allocation));

	kmem_cache_destroy(rds_ib_incoming_slab);
	kmem_cache_destroy(rds_ib_frag_slab);
}