
TOMOYO Linux Cross Reference
Linux/net/sunrpc/xprtrdma/svc_rdma_rw.c


  1 // SPDX-License-Identifier: GPL-2.0
  2 /*
  3  * Copyright (c) 2016-2018 Oracle.  All rights reserved.
  4  *
  5  * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
  6  */
  7 
  8 #include <rdma/rw.h>
  9 
 10 #include <linux/sunrpc/xdr.h>
 11 #include <linux/sunrpc/rpc_rdma.h>
 12 #include <linux/sunrpc/svc_rdma.h>
 13 
 14 #include "xprt_rdma.h"
 15 #include <trace/events/rpcrdma.h>
 16 
 17 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
 18 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
 19 
 20 /* Each R/W context contains state for one chain of RDMA Read or
 21  * Write Work Requests.
 22  *
 23  * Each WR chain handles a single contiguous server-side buffer,
 24  * because scatterlist entries after the first have to start on
 25  * page alignment. xdr_buf iovecs cannot guarantee alignment.
 26  *
 27  * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 28  * from a client may contain a unique R_key, so each WR chain moves
 29  * up to one segment at a time.
 30  *
 31  * The scatterlist makes this data structure over 4KB in size. To
 32  * make it less likely to fail, and to handle the allocation for
 33  * smaller I/O requests without disabling bottom-halves, these
 34  * contexts are created on demand, but cached and reused until the
 35  * controlling svcxprt_rdma is destroyed.
 36  */
 37 struct svc_rdma_rw_ctxt {
 38         struct llist_node       rw_node;
 39         struct list_head        rw_list;
 40         struct rdma_rw_ctx      rw_ctx;
 41         unsigned int            rw_nents;
 42         unsigned int            rw_first_sgl_nents;
 43         struct sg_table         rw_sg_table;
 44         struct scatterlist      rw_first_sgl[];
 45 };
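/* Sizing note: svc_rdma_get_rw_ctxt() allocates this structure with
 * struct_size(ctxt, rw_first_sgl, dev->attrs.max_send_sge), so the first
 * SGL lives inside the context itself. As an illustration (device
 * attributes vary), a device advertising max_send_sge == 32 embeds a
 * 32-entry SGL here, and sg_alloc_table_chained() chains additional SGL
 * pages only for an I/O that needs more entries than that.
 */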
 46 
 47 static inline struct svc_rdma_rw_ctxt *
 48 svc_rdma_next_ctxt(struct list_head *list)
 49 {
 50         return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
 51                                         rw_list);
 52 }
 53 
 54 static struct svc_rdma_rw_ctxt *
 55 svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
 56 {
 57         struct ib_device *dev = rdma->sc_cm_id->device;
 58         unsigned int first_sgl_nents = dev->attrs.max_send_sge;
 59         struct svc_rdma_rw_ctxt *ctxt;
 60         struct llist_node *node;
 61 
 62         spin_lock(&rdma->sc_rw_ctxt_lock);
 63         node = llist_del_first(&rdma->sc_rw_ctxts);
 64         spin_unlock(&rdma->sc_rw_ctxt_lock);
 65         if (node) {
 66                 ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
 67         } else {
 68                 ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
 69                                     GFP_KERNEL, ibdev_to_node(dev));
 70                 if (!ctxt)
 71                         goto out_noctx;
 72 
 73                 INIT_LIST_HEAD(&ctxt->rw_list);
 74                 ctxt->rw_first_sgl_nents = first_sgl_nents;
 75         }
 76 
 77         ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
 78         if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
 79                                    ctxt->rw_sg_table.sgl,
 80                                    first_sgl_nents))
 81                 goto out_free;
 82         return ctxt;
 83 
 84 out_free:
 85         kfree(ctxt);
 86 out_noctx:
 87         trace_svcrdma_rwctx_empty(rdma, sges);
 88         return NULL;
 89 }
 90 
 91 static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
 92                                    struct llist_head *list)
 93 {
 94         sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
 95         llist_add(&ctxt->rw_node, list);
 96 }
 97 
 98 static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
 99                                  struct svc_rdma_rw_ctxt *ctxt)
100 {
101         __svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
102 }
103 
104 /**
105  * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
106  * @rdma: transport about to be destroyed
107  *
108  */
109 void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
110 {
111         struct svc_rdma_rw_ctxt *ctxt;
112         struct llist_node *node;
113 
114         while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
115                 ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
116                 kfree(ctxt);
117         }
118 }
119 
120 /**
121  * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
122  * @rdma: controlling transport instance
123  * @ctxt: R/W context to prepare
124  * @offset: RDMA offset
125  * @handle: RDMA tag/handle
126  * @direction: I/O direction
127  *
 128  * Returns, on success, the number of WQEs that will be needed
129  * on the workqueue, or a negative errno.
130  */
131 static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
132                                 struct svc_rdma_rw_ctxt *ctxt,
133                                 u64 offset, u32 handle,
134                                 enum dma_data_direction direction)
135 {
136         int ret;
137 
138         ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
139                                ctxt->rw_sg_table.sgl, ctxt->rw_nents,
140                                0, offset, handle, direction);
141         if (unlikely(ret < 0)) {
142                 trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
143                                              ctxt->rw_nents, ret);
144                 svc_rdma_put_rw_ctxt(rdma, ctxt);
145         }
146         return ret;
147 }
148 
149 /**
150  * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
151  * @rdma: controlling transport instance
152  * @cc: svc_rdma_chunk_ctxt to be initialized
153  */
154 void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
155                       struct svc_rdma_chunk_ctxt *cc)
156 {
157         struct rpc_rdma_cid *cid = &cc->cc_cid;
158 
159         if (unlikely(!cid->ci_completion_id))
160                 svc_rdma_send_cid_init(rdma, cid);
161 
162         INIT_LIST_HEAD(&cc->cc_rwctxts);
163         cc->cc_sqecount = 0;
164 }
165 
166 /**
167  * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
168  * @rdma: controlling transport instance
169  * @cc: svc_rdma_chunk_ctxt to be released
170  * @dir: DMA direction
171  */
172 void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
173                          struct svc_rdma_chunk_ctxt *cc,
174                          enum dma_data_direction dir)
175 {
176         struct llist_node *first, *last;
177         struct svc_rdma_rw_ctxt *ctxt;
178         LLIST_HEAD(free);
179 
180         trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);
181 
182         first = last = NULL;
183         while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
184                 list_del(&ctxt->rw_list);
185 
186                 rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
187                                     rdma->sc_port_num, ctxt->rw_sg_table.sgl,
188                                     ctxt->rw_nents, dir);
189                 __svc_rdma_put_rw_ctxt(ctxt, &free);
190 
191                 ctxt->rw_node.next = first;
192                 first = &ctxt->rw_node;
193                 if (!last)
194                         last = first;
195         }
196         if (first)
197                 llist_add_batch(first, last, &rdma->sc_rw_ctxts);
198 }
199 
200 static struct svc_rdma_write_info *
201 svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
202                           const struct svc_rdma_chunk *chunk)
203 {
204         struct svc_rdma_write_info *info;
205 
206         info = kzalloc_node(sizeof(*info), GFP_KERNEL,
207                             ibdev_to_node(rdma->sc_cm_id->device));
208         if (!info)
209                 return info;
210 
211         info->wi_rdma = rdma;
212         info->wi_chunk = chunk;
213         svc_rdma_cc_init(rdma, &info->wi_cc);
214         info->wi_cc.cc_cqe.done = svc_rdma_write_done;
215         return info;
216 }
217 
218 static void svc_rdma_write_info_free_async(struct work_struct *work)
219 {
220         struct svc_rdma_write_info *info;
221 
222         info = container_of(work, struct svc_rdma_write_info, wi_work);
223         svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
224         kfree(info);
225 }
226 
227 static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
228 {
229         INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
230         queue_work(svcrdma_wq, &info->wi_work);
231 }
232 
233 /**
234  * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
235  * @rdma: controlling transport
236  * @ctxt: Send context that is being released
237  */
238 void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
239                                   struct svc_rdma_send_ctxt *ctxt)
240 {
241         struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;
242 
243         if (!cc->cc_sqecount)
244                 return;
245         svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
246 }
247 
248 /**
249  * svc_rdma_reply_done - Reply chunk Write completion handler
250  * @cq: controlling Completion Queue
251  * @wc: Work Completion report
252  *
253  * Pages under I/O are released by a subsequent Send completion.
254  */
255 static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
256 {
257         struct ib_cqe *cqe = wc->wr_cqe;
258         struct svc_rdma_chunk_ctxt *cc =
259                         container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
260         struct svcxprt_rdma *rdma = cq->cq_context;
261 
262         switch (wc->status) {
263         case IB_WC_SUCCESS:
264                 trace_svcrdma_wc_reply(&cc->cc_cid);
265                 return;
266         case IB_WC_WR_FLUSH_ERR:
267                 trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
268                 break;
269         default:
270                 trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
271         }
272 
273         svc_xprt_deferred_close(&rdma->sc_xprt);
274 }
275 
276 /**
277  * svc_rdma_write_done - Write chunk completion
278  * @cq: controlling Completion Queue
279  * @wc: Work Completion
280  *
281  * Pages under I/O are freed by a subsequent Send completion.
282  */
283 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
284 {
285         struct svcxprt_rdma *rdma = cq->cq_context;
286         struct ib_cqe *cqe = wc->wr_cqe;
287         struct svc_rdma_chunk_ctxt *cc =
288                         container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
289         struct svc_rdma_write_info *info =
290                         container_of(cc, struct svc_rdma_write_info, wi_cc);
291 
292         switch (wc->status) {
293         case IB_WC_SUCCESS:
294                 trace_svcrdma_wc_write(&cc->cc_cid);
295                 break;
296         case IB_WC_WR_FLUSH_ERR:
297                 trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
298                 break;
299         default:
300                 trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
301         }
302 
303         svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
304 
305         if (unlikely(wc->status != IB_WC_SUCCESS))
306                 svc_xprt_deferred_close(&rdma->sc_xprt);
307 
308         svc_rdma_write_info_free(info);
309 }
310 
311 /**
312  * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
313  * @cq: controlling Completion Queue
314  * @wc: Work Completion
315  *
316  */
317 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
318 {
319         struct svcxprt_rdma *rdma = cq->cq_context;
320         struct ib_cqe *cqe = wc->wr_cqe;
321         struct svc_rdma_chunk_ctxt *cc =
322                         container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
323         struct svc_rdma_recv_ctxt *ctxt;
324 
325         svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
326 
327         ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
328         switch (wc->status) {
329         case IB_WC_SUCCESS:
330                 trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
331                                       cc->cc_posttime);
332 
333                 spin_lock(&rdma->sc_rq_dto_lock);
334                 list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
335                 /* the unlock pairs with the smp_rmb in svc_xprt_ready */
336                 set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
337                 spin_unlock(&rdma->sc_rq_dto_lock);
338                 svc_xprt_enqueue(&rdma->sc_xprt);
339                 return;
340         case IB_WC_WR_FLUSH_ERR:
341                 trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
342                 break;
343         default:
344                 trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
345         }
346 
347         /* The RDMA Read has flushed, so the incoming RPC message
348          * cannot be constructed and must be dropped. Signal the
349          * loss to the client by closing the connection.
350          */
351         svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
352         svc_rdma_recv_ctxt_put(rdma, ctxt);
353         svc_xprt_deferred_close(&rdma->sc_xprt);
354 }
355 
356 /*
357  * Assumptions:
358  * - If ib_post_send() succeeds, only one completion is expected,
359  *   even if one or more WRs are flushed. This is true when posting
360  *   an rdma_rw_ctx or when posting a single signaled WR.
361  */
362 static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
363                                     struct svc_rdma_chunk_ctxt *cc)
364 {
365         struct ib_send_wr *first_wr;
366         const struct ib_send_wr *bad_wr;
367         struct list_head *tmp;
368         struct ib_cqe *cqe;
369         int ret;
370 
371         might_sleep();
372 
373         if (cc->cc_sqecount > rdma->sc_sq_depth)
374                 return -EINVAL;
375 
376         first_wr = NULL;
377         cqe = &cc->cc_cqe;
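        /* cc->cc_cqe is handed to the R/W core only once: the first
         * rw_ctxt on the list gets it (rdma_rw_ctx_wrs() attaches the cqe
         * to the last WR of the chain it returns), and every later rw_ctxt
         * is chained in with a NULL cqe. The assembled WR chain therefore
         * produces the single completion assumed above.
         */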
378         list_for_each(tmp, &cc->cc_rwctxts) {
379                 struct svc_rdma_rw_ctxt *ctxt;
380 
381                 ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
382                 first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
383                                            rdma->sc_port_num, cqe, first_wr);
384                 cqe = NULL;
385         }
386 
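        /* Reserve cc_sqecount Send Queue entries up front. For example, if
         * sc_sq_avail is 8 and cc_sqecount is 10, atomic_sub_return()
         * yields -2, so the reservation is rolled back and the caller
         * sleeps on sc_send_wait until enough SQEs are free, then tries
         * again.
         */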
387         do {
388                 if (atomic_sub_return(cc->cc_sqecount,
389                                       &rdma->sc_sq_avail) > 0) {
390                         cc->cc_posttime = ktime_get();
391                         ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
392                         if (ret)
393                                 break;
394                         return 0;
395                 }
396 
397                 percpu_counter_inc(&svcrdma_stat_sq_starve);
398                 trace_svcrdma_sq_full(rdma, &cc->cc_cid);
399                 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
400                 wait_event(rdma->sc_send_wait,
401                            atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
402                 trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
403         } while (1);
404 
405         trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
406         svc_xprt_deferred_close(&rdma->sc_xprt);
407 
408         /* If even one was posted, there will be a completion. */
409         if (bad_wr != first_wr)
410                 return 0;
411 
412         atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
413         wake_up(&rdma->sc_send_wait);
414         return -ENOTCONN;
415 }
416 
417 /* Build and DMA-map an SGL that covers one kvec in an xdr_buf
418  */
419 static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
420                                unsigned int len,
421                                struct svc_rdma_rw_ctxt *ctxt)
422 {
423         struct scatterlist *sg = ctxt->rw_sg_table.sgl;
424 
425         sg_set_buf(&sg[0], info->wi_base, len);
426         info->wi_base += len;
427 
428         ctxt->rw_nents = 1;
429 }
430 
431 /* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
432  */
433 static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
434                                     unsigned int remaining,
435                                     struct svc_rdma_rw_ctxt *ctxt)
436 {
437         unsigned int sge_no, sge_bytes, page_off, page_no;
438         const struct xdr_buf *xdr = info->wi_xdr;
439         struct scatterlist *sg;
440         struct page **page;
441 
442         page_off = info->wi_next_off + xdr->page_base;
443         page_no = page_off >> PAGE_SHIFT;
444         page_off = offset_in_page(page_off);
445         page = xdr->pages + page_no;
446         info->wi_next_off += remaining;
447         sg = ctxt->rw_sg_table.sgl;
448         sge_no = 0;
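        /* Example, assuming 4KB pages: a 6000-byte portion that starts at
         * byte 3000 of its first page is split into three SGEs of 1096,
         * 4096 and 808 bytes, leaving rw_nents == 3.
         */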
449         do {
450                 sge_bytes = min_t(unsigned int, remaining,
451                                   PAGE_SIZE - page_off);
452                 sg_set_page(sg, *page, sge_bytes, page_off);
453 
454                 remaining -= sge_bytes;
455                 sg = sg_next(sg);
456                 page_off = 0;
457                 sge_no++;
458                 page++;
459         } while (remaining);
460 
461         ctxt->rw_nents = sge_no;
462 }
463 
464 /* Construct RDMA Write WRs to send a portion of an xdr_buf containing
465  * an RPC Reply.
466  */
467 static int
468 svc_rdma_build_writes(struct svc_rdma_write_info *info,
469                       void (*constructor)(struct svc_rdma_write_info *info,
470                                           unsigned int len,
471                                           struct svc_rdma_rw_ctxt *ctxt),
472                       unsigned int remaining)
473 {
474         struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
475         struct svcxprt_rdma *rdma = info->wi_rdma;
476         const struct svc_rdma_segment *seg;
477         struct svc_rdma_rw_ctxt *ctxt;
478         int ret;
479 
480         do {
481                 unsigned int write_len;
482                 u64 offset;
483 
484                 if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
485                         goto out_overflow;
486 
487                 seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
488                 write_len = min(remaining, seg->rs_length - info->wi_seg_off);
489                 if (!write_len)
490                         goto out_overflow;
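                /* SGE estimate: one entry per full page of payload, plus
                 * two more so that a payload beginning and ending mid-page
                 * is still covered. E.g. with 4KB pages, an 8192-byte
                 * write_len that starts mid-page touches three pages, and
                 * (8192 >> PAGE_SHIFT) + 2 = 4 entries is enough.
                 */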
491                 ctxt = svc_rdma_get_rw_ctxt(rdma,
492                                             (write_len >> PAGE_SHIFT) + 2);
493                 if (!ctxt)
494                         return -ENOMEM;
495 
496                 constructor(info, write_len, ctxt);
497                 offset = seg->rs_offset + info->wi_seg_off;
498                 ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
499                                            DMA_TO_DEVICE);
500                 if (ret < 0)
501                         return -EIO;
502                 percpu_counter_inc(&svcrdma_stat_write);
503 
504                 list_add(&ctxt->rw_list, &cc->cc_rwctxts);
505                 cc->cc_sqecount += ret;
506                 if (write_len == seg->rs_length - info->wi_seg_off) {
507                         info->wi_seg_no++;
508                         info->wi_seg_off = 0;
509                 } else {
510                         info->wi_seg_off += write_len;
511                 }
512                 remaining -= write_len;
513         } while (remaining);
514 
515         return 0;
516 
517 out_overflow:
518         trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
519                                      info->wi_chunk->ch_segcount);
520         return -E2BIG;
521 }
522 
523 /**
524  * svc_rdma_iov_write - Construct RDMA Writes from an iov
525  * @info: pointer to write arguments
526  * @iov: kvec to write
527  *
528  * Returns:
529  *   On success, returns zero
530  *   %-E2BIG if the client-provided Write chunk is too small
531  *   %-ENOMEM if a resource has been exhausted
532  *   %-EIO if an rdma-rw error occurred
533  */
534 static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
535                               const struct kvec *iov)
536 {
537         info->wi_base = iov->iov_base;
538         return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
539                                      iov->iov_len);
540 }
541 
542 /**
543  * svc_rdma_pages_write - Construct RDMA Writes from pages
544  * @info: pointer to write arguments
545  * @xdr: xdr_buf with pages to write
546  * @offset: offset into the content of @xdr
547  * @length: number of bytes to write
548  *
549  * Returns:
550  *   On success, returns zero
551  *   %-E2BIG if the client-provided Write chunk is too small
552  *   %-ENOMEM if a resource has been exhausted
553  *   %-EIO if an rdma-rw error occurred
554  */
555 static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
556                                 const struct xdr_buf *xdr,
557                                 unsigned int offset,
558                                 unsigned long length)
559 {
560         info->wi_xdr = xdr;
561         info->wi_next_off = offset - xdr->head[0].iov_len;
562         return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
563                                      length);
564 }
565 
566 /**
567  * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
568  * @xdr: xdr_buf to write
569  * @data: pointer to write arguments
570  *
571  * Returns:
572  *   On success, returns zero
573  *   %-E2BIG if the client-provided Write chunk is too small
574  *   %-ENOMEM if a resource has been exhausted
575  *   %-EIO if an rdma-rw error occurred
576  */
577 static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
578 {
579         struct svc_rdma_write_info *info = data;
580         int ret;
581 
582         if (xdr->head[0].iov_len) {
583                 ret = svc_rdma_iov_write(info, &xdr->head[0]);
584                 if (ret < 0)
585                         return ret;
586         }
587 
588         if (xdr->page_len) {
589                 ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
590                                            xdr->page_len);
591                 if (ret < 0)
592                         return ret;
593         }
594 
595         if (xdr->tail[0].iov_len) {
596                 ret = svc_rdma_iov_write(info, &xdr->tail[0]);
597                 if (ret < 0)
598                         return ret;
599         }
600 
601         return xdr->len;
602 }
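/* On success svc_rdma_xb_write() returns the number of bytes it consumed
 * rather than zero, which lets svc_rdma_send_write_chunk() below verify
 * that the whole payload was accounted for (ret != payload.len).
 */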
603 
604 static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
605                                      const struct svc_rdma_chunk *chunk,
606                                      const struct xdr_buf *xdr)
607 {
608         struct svc_rdma_write_info *info;
609         struct svc_rdma_chunk_ctxt *cc;
610         struct xdr_buf payload;
611         int ret;
612 
613         if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
614                                chunk->ch_payload_length))
615                 return -EMSGSIZE;
616 
617         info = svc_rdma_write_info_alloc(rdma, chunk);
618         if (!info)
619                 return -ENOMEM;
620         cc = &info->wi_cc;
621 
622         ret = svc_rdma_xb_write(&payload, info);
623         if (ret != payload.len)
624                 goto out_err;
625 
626         trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
627         ret = svc_rdma_post_chunk_ctxt(rdma, cc);
628         if (ret < 0)
629                 goto out_err;
630         return 0;
631 
632 out_err:
633         svc_rdma_write_info_free(info);
634         return ret;
635 }
636 
637 /**
638  * svc_rdma_send_write_list - Send all chunks on the Write list
639  * @rdma: controlling RDMA transport
640  * @rctxt: Write list provisioned by the client
641  * @xdr: xdr_buf containing an RPC Reply message
642  *
643  * Returns zero on success, or a negative errno if one or more
644  * Write chunks could not be sent.
645  */
646 int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
647                              const struct svc_rdma_recv_ctxt *rctxt,
648                              const struct xdr_buf *xdr)
649 {
650         struct svc_rdma_chunk *chunk;
651         int ret;
652 
653         pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
654                 if (!chunk->ch_payload_length)
655                         break;
656                 ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
657                 if (ret < 0)
658                         return ret;
659         }
660         return 0;
661 }
662 
663 /**
664  * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
665  * @rdma: controlling RDMA transport
666  * @write_pcl: Write chunk list provided by client
667  * @reply_pcl: Reply chunk provided by client
668  * @sctxt: Send WR resources
669  * @xdr: xdr_buf containing an RPC Reply
670  *
671  * Returns a non-negative number of bytes the chunk consumed, or
672  *      %-E2BIG if the payload was larger than the Reply chunk,
673  *      %-EINVAL if client provided too many segments,
674  *      %-ENOMEM if rdma_rw context pool was exhausted,
675  *      %-ENOTCONN if posting failed (connection is lost),
676  *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
677  */
678 int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
679                                  const struct svc_rdma_pcl *write_pcl,
680                                  const struct svc_rdma_pcl *reply_pcl,
681                                  struct svc_rdma_send_ctxt *sctxt,
682                                  const struct xdr_buf *xdr)
683 {
684         struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
685         struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
686         struct ib_send_wr *first_wr;
687         struct list_head *pos;
688         struct ib_cqe *cqe;
689         int ret;
690 
691         info->wi_rdma = rdma;
692         info->wi_chunk = pcl_first_chunk(reply_pcl);
693         info->wi_seg_off = 0;
694         info->wi_seg_no = 0;
695         info->wi_cc.cc_cqe.done = svc_rdma_reply_done;
696 
697         ret = pcl_process_nonpayloads(write_pcl, xdr,
698                                       svc_rdma_xb_write, info);
699         if (ret < 0)
700                 return ret;
701 
702         first_wr = sctxt->sc_wr_chain;
703         cqe = &cc->cc_cqe;
704         list_for_each(pos, &cc->cc_rwctxts) {
705                 struct svc_rdma_rw_ctxt *rwc;
706 
707                 rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
708                 first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
709                                            rdma->sc_port_num, cqe, first_wr);
710                 cqe = NULL;
711         }
712         sctxt->sc_wr_chain = first_wr;
713         sctxt->sc_sqecount += cc->cc_sqecount;
714 
715         trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
716         return xdr->len;
717 }
718 
719 /**
720  * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
721  * @rqstp: RPC transaction context
722  * @head: context for ongoing I/O
723  * @segment: co-ordinates of remote memory to be read
724  *
725  * Returns:
726  *   %0: the Read WR chain was constructed successfully
727  *   %-EINVAL: there were not enough rq_pages to finish
 728  *   %-ENOMEM: allocating local resources failed
729  *   %-EIO: a DMA mapping error occurred
730  */
731 static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
732                                        struct svc_rdma_recv_ctxt *head,
733                                        const struct svc_rdma_segment *segment)
734 {
735         struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
736         struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
737         unsigned int sge_no, seg_len, len;
738         struct svc_rdma_rw_ctxt *ctxt;
739         struct scatterlist *sg;
740         int ret;
741 
742         len = segment->rs_length;
743         sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
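        /* sge_no is the number of pages this segment will touch. E.g.,
         * assuming 4KB pages, rc_pageoff == 512 and rs_length == 5000 give
         * PAGE_ALIGN(5512) >> PAGE_SHIFT == 2: one SGE for the remaining
         * 3584 bytes of the current page and one for the final 1416 bytes.
         */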
744         ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no);
745         if (!ctxt)
746                 return -ENOMEM;
747         ctxt->rw_nents = sge_no;
748 
749         sg = ctxt->rw_sg_table.sgl;
750         for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
751                 seg_len = min_t(unsigned int, len,
752                                 PAGE_SIZE - head->rc_pageoff);
753 
754                 if (!head->rc_pageoff)
755                         head->rc_page_count++;
756 
757                 sg_set_page(sg, rqstp->rq_pages[head->rc_curpage],
758                             seg_len, head->rc_pageoff);
759                 sg = sg_next(sg);
760 
761                 head->rc_pageoff += seg_len;
762                 if (head->rc_pageoff == PAGE_SIZE) {
763                         head->rc_curpage++;
764                         head->rc_pageoff = 0;
765                 }
766                 len -= seg_len;
767 
768                 if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages)))
769                         goto out_overrun;
770         }
771 
772         ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
773                                    segment->rs_handle, DMA_FROM_DEVICE);
774         if (ret < 0)
775                 return -EIO;
776         percpu_counter_inc(&svcrdma_stat_read);
777 
778         list_add(&ctxt->rw_list, &cc->cc_rwctxts);
779         cc->cc_sqecount += ret;
780         return 0;
781 
782 out_overrun:
783         trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
784         return -EINVAL;
785 }
786 
787 /**
788  * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
789  * @rqstp: RPC transaction context
790  * @head: context for ongoing I/O
791  * @chunk: Read chunk to pull
792  *
793  * Return values:
794  *   %0: the Read WR chain was constructed successfully
795  *   %-EINVAL: there were not enough resources to finish
 796  *   %-ENOMEM: allocating local resources failed
797  *   %-EIO: a DMA mapping error occurred
798  */
799 static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
800                                      struct svc_rdma_recv_ctxt *head,
801                                      const struct svc_rdma_chunk *chunk)
802 {
803         const struct svc_rdma_segment *segment;
804         int ret;
805 
806         ret = -EINVAL;
807         pcl_for_each_segment(segment, chunk) {
808                 ret = svc_rdma_build_read_segment(rqstp, head, segment);
809                 if (ret < 0)
810                         break;
811                 head->rc_readbytes += segment->rs_length;
812         }
813         return ret;
814 }
815 
816 /**
817  * svc_rdma_copy_inline_range - Copy part of the inline content into pages
818  * @rqstp: RPC transaction context
819  * @head: context for ongoing I/O
820  * @offset: offset into the Receive buffer of region to copy
821  * @remaining: length of region to copy
822  *
823  * Take a page at a time from rqstp->rq_pages and copy the inline
824  * content from the Receive buffer into that page. Update
825  * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
826  * result will land contiguously with the copied content.
827  *
828  * Return values:
829  *   %0: Inline content was successfully copied
830  *   %-EINVAL: offset or length was incorrect
831  */
832 static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
833                                       struct svc_rdma_recv_ctxt *head,
834                                       unsigned int offset,
835                                       unsigned int remaining)
836 {
837         unsigned char *dst, *src = head->rc_recv_buf;
838         unsigned int page_no, numpages;
839 
840         numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
841         for (page_no = 0; page_no < numpages; page_no++) {
842                 unsigned int page_len;
843 
844                 page_len = min_t(unsigned int, remaining,
845                                  PAGE_SIZE - head->rc_pageoff);
846 
847                 if (!head->rc_pageoff)
848                         head->rc_page_count++;
849 
850                 dst = page_address(rqstp->rq_pages[head->rc_curpage]);
 851         memcpy(dst + head->rc_pageoff, src + offset, page_len);
852 
853                 head->rc_readbytes += page_len;
854                 head->rc_pageoff += page_len;
855                 if (head->rc_pageoff == PAGE_SIZE) {
856                         head->rc_curpage++;
857                         head->rc_pageoff = 0;
858                 }
859                 remaining -= page_len;
860                 offset += page_len;
861         }
862 
 863         return 0;
864 }
865 
866 /**
867  * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
868  * @rqstp: RPC transaction context
869  * @head: context for ongoing I/O
870  *
871  * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
872  * like an incoming TCP call.
873  *
874  * Return values:
875  *   %0: RDMA Read WQEs were successfully built
876  *   %-EINVAL: client provided too many chunks or segments,
877  *   %-ENOMEM: rdma_rw context pool was exhausted,
878  *   %-ENOTCONN: posting failed (connection is lost),
879  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
880  */
881 static noinline int
882 svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
883                               struct svc_rdma_recv_ctxt *head)
884 {
885         const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
886         struct svc_rdma_chunk *chunk, *next;
887         unsigned int start, length;
888         int ret;
889 
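        /* Assembly order: copy the inline bytes that precede the first
         * chunk's Position, pull that chunk with RDMA Reads, copy the
         * inline gap before the next chunk, and so on; the inline bytes
         * that follow the last chunk are copied at the end.
         * head->rc_readbytes tracks the running total so each piece lands
         * contiguously in rq_arg.
         */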
890         start = 0;
891         chunk = pcl_first_chunk(pcl);
892         length = chunk->ch_position;
893         ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
894         if (ret < 0)
895                 return ret;
896 
897         pcl_for_each_chunk(chunk, pcl) {
898                 ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
899                 if (ret < 0)
900                         return ret;
901 
902                 next = pcl_next_chunk(pcl, chunk);
903                 if (!next)
904                         break;
905 
906                 start += length;
907                 length = next->ch_position - head->rc_readbytes;
908                 ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
909                 if (ret < 0)
910                         return ret;
911         }
912 
913         start += length;
914         length = head->rc_byte_len - start;
915         return svc_rdma_copy_inline_range(rqstp, head, start, length);
916 }
917 
918 /**
919  * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
920  * @rqstp: RPC transaction context
921  * @head: context for ongoing I/O
922  *
923  * The chunk data lands in the page list of rqstp->rq_arg.pages.
924  *
925  * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
926  * Therefore, XDR round-up of the Read chunk and trailing
927  * inline content must both be added at the end of the pagelist.
928  *
929  * Return values:
930  *   %0: RDMA Read WQEs were successfully built
931  *   %-EINVAL: client provided too many chunks or segments,
932  *   %-ENOMEM: rdma_rw context pool was exhausted,
933  *   %-ENOTCONN: posting failed (connection is lost),
934  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
935  */
936 static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
937                                    struct svc_rdma_recv_ctxt *head)
938 {
939         return svc_rdma_build_read_chunk(rqstp, head,
940                                          pcl_first_chunk(&head->rc_read_pcl));
941 }
942 
943 /**
944  * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
945  * @rqstp: RPC transaction context
946  * @head: context for ongoing I/O
947  * @chunk: parsed Call chunk to pull
948  * @offset: offset of region to pull
949  * @length: length of region to pull
950  *
951  * Return values:
952  *   %0: RDMA Read WQEs were successfully built
953  *   %-EINVAL: there were not enough resources to finish
954  *   %-ENOMEM: rdma_rw context pool was exhausted,
955  *   %-ENOTCONN: posting failed (connection is lost),
956  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
957  */
958 static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
959                                      struct svc_rdma_recv_ctxt *head,
960                                      const struct svc_rdma_chunk *chunk,
961                                      unsigned int offset, unsigned int length)
962 {
963         const struct svc_rdma_segment *segment;
964         int ret;
965 
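        /* Walk the chunk's segments, skipping those wholly before @offset.
         * Example: for segments of 1000, 4000 and 4000 bytes, offset 1500
         * and length 6000, the first segment is skipped (offset becomes
         * 500), 3500 bytes are read from the second segment starting 500
         * bytes in, and 2500 bytes from the start of the third.
         */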
966         ret = -EINVAL;
967         pcl_for_each_segment(segment, chunk) {
968                 struct svc_rdma_segment dummy;
969 
970                 if (offset > segment->rs_length) {
971                         offset -= segment->rs_length;
972                         continue;
973                 }
974 
975                 dummy.rs_handle = segment->rs_handle;
976                 dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
977                 dummy.rs_offset = segment->rs_offset + offset;
978 
979                 ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
980                 if (ret < 0)
981                         break;
982 
983                 head->rc_readbytes += dummy.rs_length;
984                 length -= dummy.rs_length;
985                 offset = 0;
986         }
987         return ret;
988 }
989 
990 /**
991  * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
992  * @rqstp: RPC transaction context
993  * @head: context for ongoing I/O
994  *
995  * Return values:
996  *   %0: RDMA Read WQEs were successfully built
997  *   %-EINVAL: there were not enough resources to finish
998  *   %-ENOMEM: rdma_rw context pool was exhausted,
999  *   %-ENOTCONN: posting failed (connection is lost),
1000  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1001  */
1002 static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
1003                                     struct svc_rdma_recv_ctxt *head)
1004 {
1005         const struct svc_rdma_chunk *call_chunk =
1006                         pcl_first_chunk(&head->rc_call_pcl);
1007         const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
1008         struct svc_rdma_chunk *chunk, *next;
1009         unsigned int start, length;
1010         int ret;
1011 
1012         if (pcl_is_empty(pcl))
1013                 return svc_rdma_build_read_chunk(rqstp, head, call_chunk);
1014 
1015         start = 0;
1016         chunk = pcl_first_chunk(pcl);
1017         length = chunk->ch_position;
1018         ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1019                                         start, length);
1020         if (ret < 0)
1021                 return ret;
1022 
1023         pcl_for_each_chunk(chunk, pcl) {
1024                 ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
1025                 if (ret < 0)
1026                         return ret;
1027 
1028                 next = pcl_next_chunk(pcl, chunk);
1029                 if (!next)
1030                         break;
1031 
1032                 start += length;
1033                 length = next->ch_position - head->rc_readbytes;
1034                 ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1035                                                 start, length);
1036                 if (ret < 0)
1037                         return ret;
1038         }
1039 
1040         start += length;
1041         length = call_chunk->ch_length - start;
1042         return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1043                                          start, length);
1044 }
1045 
1046 /**
1047  * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
1048  * @rqstp: RPC transaction context
1049  * @head: context for ongoing I/O
1050  *
1051  * The start of the data lands in the first page just after the
1052  * Transport header, and the rest lands in rqstp->rq_arg.pages.
1053  *
1054  * Assumptions:
1055  *      - A PZRC is never sent in an RDMA_MSG message, though it's
1056  *        allowed by spec.
1057  *
1058  * Return values:
1059  *   %0: RDMA Read WQEs were successfully built
1060  *   %-EINVAL: client provided too many chunks or segments,
1061  *   %-ENOMEM: rdma_rw context pool was exhausted,
1062  *   %-ENOTCONN: posting failed (connection is lost),
1063  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1064  */
1065 static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
1066                                           struct svc_rdma_recv_ctxt *head)
1067 {
1068         return svc_rdma_read_call_chunk(rqstp, head);
1069 }
1070 
1071 /* Pages under I/O have been copied to head->rc_pages. Ensure that
1072  * svc_xprt_release() does not put them when svc_rdma_recvfrom()
1073  * returns. This has to be done after all Read WRs are constructed
1074  * to properly handle a page that happens to be part of I/O on behalf
1075  * of two different RDMA segments.
1076  *
1077  * Note: if the subsequent post_send fails, these pages have already
1078  * been moved to head->rc_pages and thus will be cleaned up by
1079  * svc_rdma_recv_ctxt_put().
1080  */
1081 static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
1082                                       struct svc_rdma_recv_ctxt *head)
1083 {
1084         unsigned int i;
1085 
1086         for (i = 0; i < head->rc_page_count; i++) {
1087                 head->rc_pages[i] = rqstp->rq_pages[i];
1088                 rqstp->rq_pages[i] = NULL;
1089         }
1090 }
1091 
1092 /**
1093  * svc_rdma_process_read_list - Pull list of Read chunks from the client
1094  * @rdma: controlling RDMA transport
1095  * @rqstp: set of pages to use as Read sink buffers
1096  * @head: pages under I/O collect here
1097  *
1098  * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
1099  * pull each Read chunk as they decode an incoming RPC message.
1100  *
1101  * On Linux, however, the server needs to have a fully-constructed RPC
1102  * message in rqstp->rq_arg when there is a positive return code from
1103  * ->xpo_recvfrom. So the Read list is safety-checked immediately when
1104  * it is received, then here the whole Read list is pulled all at once.
1105  * The ingress RPC message is fully reconstructed once all associated
1106  * RDMA Reads have completed.
1107  *
1108  * Return values:
1109  *   %1: all needed RDMA Reads were posted successfully,
1110  *   %-EINVAL: client provided too many chunks or segments,
1111  *   %-ENOMEM: rdma_rw context pool was exhausted,
1112  *   %-ENOTCONN: posting failed (connection is lost),
1113  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1114  */
1115 int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
1116                                struct svc_rqst *rqstp,
1117                                struct svc_rdma_recv_ctxt *head)
1118 {
1119         struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
1120         int ret;
1121 
1122         cc->cc_cqe.done = svc_rdma_wc_read_done;
1123         cc->cc_sqecount = 0;
1124         head->rc_pageoff = 0;
1125         head->rc_curpage = 0;
1126         head->rc_readbytes = 0;
1127 
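        /* Dispatch on what the client sent: no Read chunk at Position zero
         * and a single data-item chunk is the common case; several
         * data-item chunks require interleaving inline content between
         * them; a non-empty rc_call_pcl means the entire RPC Call was
         * conveyed in a Read chunk (a Long Message).
         */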
1128         if (pcl_is_empty(&head->rc_call_pcl)) {
1129                 if (head->rc_read_pcl.cl_count == 1)
1130                         ret = svc_rdma_read_data_item(rqstp, head);
1131                 else
1132                         ret = svc_rdma_read_multiple_chunks(rqstp, head);
1133         } else
1134                 ret = svc_rdma_read_special(rqstp, head);
1135         svc_rdma_clear_rqst_pages(rqstp, head);
1136         if (ret < 0)
1137                 return ret;
1138 
1139         trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
1140         ret = svc_rdma_post_chunk_ctxt(rdma, cc);
1141         return ret < 0 ? ret : 1;
1142 }
1143 
