// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */
#include <linux/highmem.h>

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

	/* Minimal Read chunk size */
	size += sizeof(__be32);	/* segment count */
	size += rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC:       %s: max call header size = %u\n",
		__func__, size);
	return size;
}
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message.  The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	size = sizeof(__be32);		/* segment count */
	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC:       %s: max reply header size = %u\n",
		__func__, size);
	return size;
}
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int maxsegs = ia->ri_max_segs;

	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				 rpcrdma_max_reply_header_size(maxsegs);
}
/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit.  If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	unsigned int count, remaining, offset;

	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
		return false;

	if (xdr->page_len) {
		remaining = xdr->page_len;
		offset = offset_in_page(xdr->page_base);
		count = RPCRDMA_MIN_SEND_SGES;
		while (remaining) {
			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
			offset = 0;
			if (++count > r_xprt->rx_ia.ri_max_send_sges)
				return false;
		}
	}

	return true;
}
/* The client can't know how large the actual reply will be.  Thus it
 * plans for the largest possible reply for that particular ULP
 * operation.  If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}
/* Split @vec on page boundaries into SGEs.  FMR registers pages, not
 * a byte range.  Other modes coalesce these SGEs into a single MR
 * when they can.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     unsigned int *n)
{
	u32 remaining, page_offset;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining) {
		seg->mr_page = NULL;
		seg->mr_offset = base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg->mr_len;
		base += seg->mr_len;
		++seg;
		++(*n);
		page_offset = 0;
	}
	return seg;
}
/* Convert @xdrbuf into SGEs no larger than a page each.  As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
 */
static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
		     unsigned int pos, enum rpcrdma_chunktype type,
		     struct rpcrdma_mr_seg *seg)
{
	unsigned long page_base;
	unsigned int len, n;
	struct page **ppages;

	n = 0;
	if (pos == 0)
		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdrbuf->page_base);
	while (len) {
		/* ACL likes to be lazy in allocating pages - ACLs
		 * are small by default but can get huge.
		 */
		if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
			if (!*ppages)
				*ppages = alloc_page(GFP_ATOMIC);
			if (!*ppages)
				return -ENOBUFS;
		}
		seg->mr_page = *ppages;
		seg->mr_offset = (char *)page_base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		len -= seg->mr_len;
		++ppages;
		++seg;
		++n;
		page_base = 0;
	}

	/* When encoding a Read chunk, the tail iovec contains an
	 * XDR pad and may be omitted.
	 */
	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
		goto out;

	/* When encoding a Write chunk, some servers need to see an
	 * extra segment for non-XDR-aligned Write chunks.  The upper
	 * layer provides space in the tail iovec that may be used
	 * for this purpose.
	 */
	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
		goto out;

	if (xdrbuf->tail[0].iov_len)
		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);

out:
	if (unlikely(n > RPCRDMA_MAX_SEGS))
		return -EIO;
	return n;
}
static inline int
encode_item_present(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p = xdr_one;
	return 0;
}

static inline int
encode_item_not_present(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p = xdr_zero;
	return 0;
}
static void
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
{
	*iptr++ = cpu_to_be32(mr->mr_handle);
	*iptr++ = cpu_to_be32(mr->mr_length);
	xdr_encode_hyper(iptr, mr->mr_offset);
}

static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	xdr_encode_rdma_segment(p, mr);
	return 0;
}

static int
encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
		    u32 position)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p++ = xdr_one;			/* Item present */
	*p++ = cpu_to_be32(position);
	xdr_encode_rdma_segment(p, mr);
	return 0;
}
/* Register and XDR encode the Read list.  Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static int
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	unsigned int pos;
	int nsegs;

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return nsegs;

	do {
		seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_mr_push(mr, &req->rl_registered);

		if (encode_read_segment(xdr, mr, pos) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
		r_xprt->rx_stats.read_chunk_count++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	return 0;
}
/* Register and XDR encode the Write list.  Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static int
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_mr_push(mr, &req->rl_registered);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}
/* Register and XDR encode the Reply chunk.  Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 */
static int
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_mr_push(mr, &req->rl_registered);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}
/**
 * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
 * @sc: sendctx containing SGEs to unmap
 *
 */
void
rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
	struct ib_sge *sge;
	unsigned int count;

	/* The first two SGEs contain the transport header and
	 * the inline buffer.  These are always left mapped so
	 * they can be cheaply re-used.
	 */
	sge = &sc->sc_sges[2];
	for (count = sc->sc_unmap_count; count; ++sge, --count)
		ib_dma_unmap_page(ia->ri_device,
				  sge->addr, sge->length, DMA_TO_DEVICE);

	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
		smp_mb__after_atomic();
		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
	}
}
/* Prepare an SGE for the RPC-over-RDMA transport header.
 */
static bool
rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			u32 len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = sc->sc_sges;

	if (!rpcrdma_dma_map_regbuf(ia, rb))
		goto out_regbuf;
	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
				      sge->length, DMA_TO_DEVICE);
	sc->sc_wr.num_sge++;
	return true;

out_regbuf:
	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
	return false;
}
/* Prepare the Send SGEs.  The head and tail iovec, and each entry
 * in the page list, gets its own SGE.
 */
static bool
rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	unsigned int sge_no, page_base, len, remaining;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct ib_device *device = ia->ri_device;
	struct ib_sge *sge = sc->sc_sges;
	u32 lkey = ia->ri_pd->local_dma_lkey;
	struct page *page, **ppages;

	/* The head iovec is straightforward, as it is already
	 * DMA-mapped.  Sync the content that has changed.
	 */
	if (!rpcrdma_dma_map_regbuf(ia, rb))
		goto out_regbuf;
	sge_no = 1;
	sge[sge_no].addr = rdmab_addr(rb);
	sge[sge_no].length = xdr->head[0].iov_len;
	sge[sge_no].lkey = rdmab_lkey(rb);
	ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
				      sge[sge_no].length, DMA_TO_DEVICE);

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here.  However, the
	 * tail iovec may include an XDR pad for the page list, as
	 * well as additional content, and may not reside in the
	 * same page as the head iovec.
	 */
	if (rtype == rpcrdma_readch) {
		len = xdr->tail[0].iov_len;

		/* Do not include the tail if it is only an XDR pad */
		if (len < 4)
			goto out;

		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = offset_in_page(xdr->tail[0].iov_base);

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() has added a pad at the beginning
		 * of the tail iovec.  Force the tail's non-pad content
		 * to land at the next XDR position in the Send message.
		 */
		page_base += len & 3;
		len -= len & 3;
		goto map_tail;
	}

	/* If there is a page list present, temporarily DMA map
	 * and prepare an SGE for each page to be sent.
	 */
	if (xdr->page_len) {
		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
		page_base = offset_in_page(xdr->page_base);
		remaining = xdr->page_len;
		while (remaining) {
			sge_no++;
			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
				goto out_mapping_overflow;
			len = min_t(u32, PAGE_SIZE - page_base, remaining);
			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
							   page_base, len,
							   DMA_TO_DEVICE);
			if (ib_dma_mapping_error(device, sge[sge_no].addr))
				goto out_mapping_err;
			sge[sge_no].length = len;
			sge[sge_no].lkey = lkey;

			sc->sc_unmap_count++;
			ppages++;
			remaining -= len;
			page_base = 0;
		}
	}

	/* The tail iovec is not always constructed in the same
	 * page where the head iovec resides (see, for example,
	 * gss_wrap_req_priv).  To neatly accommodate that case,
	 * DMA map it separately.
	 */
	if (xdr->tail[0].iov_len) {
		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = offset_in_page(xdr->tail[0].iov_base);
		len = xdr->tail[0].iov_len;

map_tail:
		sge_no++;
		sge[sge_no].addr = ib_dma_map_page(device, page,
						   page_base, len,
						   DMA_TO_DEVICE);
		if (ib_dma_mapping_error(device, sge[sge_no].addr))
			goto out_mapping_err;
		sge[sge_no].length = len;
		sge[sge_no].lkey = lkey;
		sc->sc_unmap_count++;
	}

out:
	sc->sc_wr.num_sge += sge_no;
	if (sc->sc_unmap_count)
		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
	return true;

out_regbuf:
	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
	return false;

out_mapping_overflow:
	rpcrdma_unmap_sendctx(sc);
	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
	return false;

out_mapping_err:
	rpcrdma_unmap_sendctx(sc);
	trace_xprtrdma_dma_maperr(sge[sge_no].addr);
	return false;
}
/**
 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 * @r_xprt: controlling transport
 * @req: context of RPC Call being marshalled
 * @hdrlen: size of transport header, in bytes
 * @xdr: xdr_buf containing RPC Call
 * @rtype: chunk type being encoded
 *
 * Returns 0 on success; otherwise a negative errno is returned.
 */
int
rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
			  struct rpcrdma_req *req, u32 hdrlen,
			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
	if (!req->rl_sendctx)
		return -EAGAIN;
	req->rl_sendctx->sc_wr.num_sge = 0;
	req->rl_sendctx->sc_unmap_count = 0;
	req->rl_sendctx->sc_req = req;
	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);

	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
		return -EIO;

	if (rtype != rpcrdma_areadch)
		if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
			return -EIO;

	return 0;
}
/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *	%0 if the RPC was sent successfully,
 *	%-ENOTCONN if the connection was lost,
 *	%-EAGAIN if the caller should call again with the same arguments,
 *	%-ENOBUFS if the caller should call again after a delay,
 *	%-EMSGSIZE if the transport header is too small,
 *	%-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct xdr_stream *xdr = &req->rl_stream;
	enum rpcrdma_chunktype rtype, wtype;
	bool ddp_allowed;
	__be32 *p;
	int ret;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(xdr, &req->rl_hdrbuf,
			req->rl_rdmabuf->rg_base);

	/* Fixed header fields */
	ret = -EMSGSIZE;
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		goto out_err;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
			RPCAUTH_AUTH_DATATOUCH);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		*p++ = rdma_msg;
		rtype = rpcrdma_noch;
	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		*p++ = rdma_msg;
		rtype = rpcrdma_readch;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		*p++ = rdma_nomsg;
		rtype = rpcrdma_areadch;
	}

	/* If this is a retransmit, discard previously registered
	 * chunks.  Very likely the connection has been replaced,
	 * so these registrations are invalid and unusable.
	 */
	while (unlikely(!list_empty(&req->rl_registered))) {
		struct rpcrdma_mr *mr;

		mr = rpcrdma_mr_pop(&req->rl_registered);
		rpcrdma_mr_recycle(mr);
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list.  Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	if (rtype != rpcrdma_noch) {
		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
		if (ret)
			goto out_err;
	}
	ret = encode_item_not_present(xdr);
	if (ret)
		goto out_err;

	if (wtype == rpcrdma_writech) {
		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
		if (ret)
			goto out_err;
	}
	ret = encode_item_not_present(xdr);
	if (ret)
		goto out_err;

	if (wtype != rpcrdma_replych)
		ret = encode_item_not_present(xdr);
	else
		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;

	trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);

	ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
					&rqst->rq_snd_buf, rtype);
	if (ret)
		goto out_err;
	return 0;

out_err:
	switch (ret) {
	case -EAGAIN:
		xprt_wait_for_buffer_space(rqst->rq_xprt);
		break;
	case -ENOBUFS:
		break;
	default:
		r_xprt->rx_stats.failed_marshal_count++;
	}
	return ret;
}
/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf.  These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	trace_xprtrdma_fixup(rqst, copy_len, curlen);
	srcp += curlen;
	copy_len -= curlen;

	ppages = rqst->rq_rcv_buf.pages +
		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			trace_xprtrdma_fixup_pg(rqst, i, srcp,
						copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);

			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec.  The upper layer ignores the content of
		 * the pad.  Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	return fixup_copy_count;
}
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists.  This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	if (rep->rr_proc != rdma_msg)
		return false;

	/* Peek at stream contents without advancing. */
	p = xdr_inline_decode(xdr, 0);

	/* Chunk lists */
	if (*p++ != xdr_zero)
		return false;
	if (*p++ != xdr_zero)
		return false;
	if (*p++ != xdr_zero)
		return false;

	/* RPC header */
	if (*p++ != rep->rr_xid)
		return false;
	if (*p != cpu_to_be32(RPC_CALL))
		return false;

	/* Now that we are sure this is a backchannel call,
	 * advance to the RPC header.
	 */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (unlikely(!p))
		goto out_short;

	rpcrdma_bc_receive_call(r_xprt, rep);
	return true;

out_short:
	pr_warn("RPC/RDMA short backward direction call\n");
	return true;
}
#else	/* CONFIG_SUNRPC_BACKCHANNEL */
{
	return false;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
{
	u32 handle;
	u64 offset;
	__be32 *p;

	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	handle = be32_to_cpup(p++);
	*length = be32_to_cpup(p++);
	xdr_decode_hyper(p, &offset);

	trace_xprtrdma_decode_seg(handle, *length, offset);
	return 0;
}

static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
{
	u32 segcount, seglength;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	segcount = be32_to_cpup(p);
	while (segcount--) {
		if (decode_rdma_segment(xdr, &seglength))
			return -EIO;
		*length += seglength;
	}

	return 0;
}
/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected.  This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	if (unlikely(*p != xdr_zero))
		return -EIO;
	return 0;
}

/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
	u32 chunklen;
	bool first;
	__be32 *p;

	*length = 0;
	first = true;
	do {
		p = xdr_inline_decode(xdr, sizeof(*p));
		if (unlikely(!p))
			return -EIO;
		if (*p == xdr_zero)
			break;
		if (!first)
			return -EIO;

		if (decode_write_chunk(xdr, &chunklen))
			return -EIO;
		*length += chunklen;
		first = false;
	} while (true);
	return 0;
}

static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	if (*p != xdr_zero)
		if (decode_write_chunk(xdr, length))
			return -EIO;
	return 0;
}
static int
rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		   struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk, rpclen;
	char *base;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_MSG sanity checks */
	if (unlikely(replychunk))
		return -EIO;

	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
	base = (char *)xdr_inline_decode(xdr, 0);
	rpclen = xdr_stream_remaining(xdr);
	r_xprt->rx_stats.fixup_copy_count +=
		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);

	r_xprt->rx_stats.total_rdma_reply += writelist;
	return rpclen + xdr_align_size(writelist);
}

static int
rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_NOMSG sanity checks */
	if (unlikely(writelist))
		return -EIO;
	if (unlikely(!replychunk))
		return -EIO;

	/* Reply chunk buffer already is the reply vector */
	r_xprt->rx_stats.total_rdma_reply += replychunk;
	return replychunk;
}
static int
rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		     struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	switch (*p) {
	case err_vers:
		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
		if (!p)
			break;
		dprintk("RPC:       %s: server reports "
			"version error (%u-%u), xid %08x\n", __func__,
			be32_to_cpup(p), be32_to_cpu(*(p + 1)),
			be32_to_cpu(rep->rr_xid));
		break;
	case err_chunk:
		dprintk("RPC:       %s: server reports "
			"header decoding error, xid %08x\n", __func__,
			be32_to_cpu(rep->rr_xid));
		break;
	default:
		dprintk("RPC:       %s: server reports "
			"unrecognized error %d, xid %08x\n", __func__,
			be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
	}

	r_xprt->rx_stats.bad_reply_count++;
	return -EIO;
}
/* Perform XID lookup, reconstruction of the RPC reply, and
 * RPC completion while holding the transport lock to ensure
 * the rep, rqst, and rq_task pointers remain stable.
 */
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	int status;

	xprt->reestablish_timeout = 0;

	switch (rep->rr_proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
		status = -EIO;
	}
	if (status < 0)
		goto out_badheader;

out:
	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(rqst->rq_task, status);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);
	return;

	/* If the incoming reply terminated a pending RPC, the next
	 * RPC call will post a replacement receive buffer as it is
	 * being marshaled.
	 */
out_badheader:
	trace_xprtrdma_reply_hdr(rep);
	r_xprt->rx_stats.bad_reply_count++;
	status = -EIO;
	goto out;
}
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	/* Invalidate and unmap the data payloads before waking
	 * the waiting application.  This guarantees the memory
	 * regions are properly fenced from the server before the
	 * application accesses the data.  It also ensures proper
	 * send flow control: waking the next RPC waits until this
	 * RPC has relinquished all its Send Queue entries.
	 */
	if (!list_empty(&req->rl_registered))
		frwr_unmap_sync(r_xprt, &req->rl_registered);

	/* Ensure that any DMA mapped pages associated with
	 * the Send of the RPC Call have been unmapped before
	 * allowing the RPC to complete.  This protects argument
	 * memory not controlled by the RPC client from being
	 * re-used before we're done with it.
	 */
	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		r_xprt->rx_stats.reply_waits_for_send++;
		out_of_line_wait_on_bit(&req->rl_flags,
					RPCRDMA_REQ_F_TX_RESOURCES,
					bit_wait,
					TASK_UNINTERRUPTIBLE);
	}
}
/* Reply handling runs in the poll worker thread.  Anything that
 * might wait is deferred to a separate workqueue.
 */
void rpcrdma_deferred_completion(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);
	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;

	trace_xprtrdma_defer_cmp(rep);
	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
		frwr_reminv(rep, &req->rl_registered);
	rpcrdma_release_rqst(r_xprt, req);
	rpcrdma_complete_rqst(rep);
}
/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	u32 credits;
	__be32 *p;

	/* Fixed transport header fields */
	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
			rep->rr_hdrbuf.head[0].iov_base);
	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
	if (unlikely(!p))
		goto out_shortreply;
	rep->rr_xid = *p++;
	rep->rr_vers = *p++;
	credits = be32_to_cpu(*p++);
	rep->rr_proc = *p++;

	if (rep->rr_vers != rpcrdma_version)
		goto out_badversion;

	if (rpcrdma_is_bcall(r_xprt, rep))
		return;

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock(&xprt->queue_lock);
	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
	if (!rqst)
		goto out_norqst;
	xprt_pin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);

	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buf->rb_max_requests)
		credits = buf->rb_max_requests;
	if (buf->rb_credits != credits) {
		spin_lock_bh(&xprt->transport_lock);
		buf->rb_credits = credits;
		xprt->cwnd = credits << RPC_CWNDSHIFT;
		spin_unlock_bh(&xprt->transport_lock);
	}

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply) {
		trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
		rpcrdma_recv_buffer_put(req->rl_reply);
	}
	req->rl_reply = rep;
	rep->rr_rqst = rqst;
	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);

	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
	queue_work(buf->rb_completion_wq, &rep->rr_work);
	return;

out_badversion:
	trace_xprtrdma_reply_vers(rep);
	goto out;

out_norqst:
	spin_unlock(&xprt->queue_lock);
	trace_xprtrdma_reply_rqst(rep);
	goto out;

out_shortreply:
	trace_xprtrdma_reply_short(rep);

out:
	rpcrdma_recv_buffer_put(rep);
}