/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF	(8)
#define XPRT_IDLE_TIMEOUT	(5*60*HZ)
#define XPRT_MAX_RESVPORT	(800)

static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_disconnect(struct rpc_xprt *);
static void	xprt_connect_status(struct rpc_task *task);
static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap,
						struct rpc_timeout *to);
static struct socket *xprt_create_socket(struct rpc_xprt *, int, int);
static void	xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static int	xprt_clear_backlog(struct rpc_xprt *xprt);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diprintk).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        u8 *buf = (u8 *) packet;
        int j;

        dprintk("RPC: %s\n", msg);
        for (j = 0; j < count && j < 128; j += 4) {
                if (!(j & 31)) {
                        if (j)
                                dprintk("\n");
                        dprintk("0x%04x ", j);
                }
                dprintk("%02x%02x%02x%02x ",
                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
        }
        dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        /* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
        return (struct rpc_xprt *) sk->sk_user_data;
}

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connects from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (xprt->nocong || __xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->sockstate);
        smp_mb__after_clear_bit();
out_sleep:
        dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->sock_lock);
        retval = __xprt_lock_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
        return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
                return;
        if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (xprt->nocong || __xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->sockstate);
        smp_mb__after_clear_bit();
}

/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt->snd_task = NULL;
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->sockstate);
                smp_mb__after_clear_bit();
                __xprt_lock_write_next(xprt);
        }
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        struct socket *sock = xprt->sock;
        struct xdr_buf *xdr = &req->rq_snd_buf;
        struct sockaddr *addr = NULL;
        int addrlen = 0;
        unsigned int skip;
        int result;

        if (!sock)
                return -ENOTCONN;

        xprt_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);

        /* For UDP, we need to provide an address */
        if (!xprt->stream) {
                addr = (struct sockaddr *) &xprt->addr;
                addrlen = sizeof(xprt->addr);
        }
        /* Don't repeat bytes */
        skip = req->rq_bytes_sent;

        clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
        result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);

        dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);

        if (result >= 0)
                return result;

        switch (result) {
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED.
                 */
        case -EAGAIN:
                break;
        case -ECONNRESET:
        case -ENOTCONN:
        case -EPIPE:
                /* connection broken */
                if (xprt->stream)
                        result = -ENOTCONN;
                break;
        default:
                printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
        }
        return result;
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}
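
/*
 * A note on units (a sketch, assuming RPC_CWNDSCALE is the fixed-point
 * scale factor from the sunrpc headers, e.g. 1 << 8): xprt->cong counts
 * the requests currently holding a congestion slot, scaled by
 * RPC_CWNDSCALE, and RPCXPRT_CONGESTED() compares it against xprt->cwnd
 * in the same units, so one "slot" of window admits one request to the
 * wire.
 */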

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
        unsigned long cwnd;

        cwnd = xprt->cwnd;
        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
}
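
/*
 * Worked example of the arithmetic above (a sketch, assuming
 * RPC_CWNDSCALE = 256): on a successful reply with cwnd = 512,
 * cwnd += (256*256 + 256) / 512 = 128, so the window grows by about
 * half a request per acknowledged round trip (additive increase);
 * on -ETIMEDOUT, cwnd >>= 1 halves the window (multiplicative
 * decrease), never dropping below RPC_CWNDSCALE, i.e. one request
 * in flight.
 */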

/*
 * Reset the major timeout value
 */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        struct rpc_timeout *to = &req->rq_xprt->timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}

/*
 * Adjust timeout values etc for next retransmit
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct rpc_timeout *to = &xprt->timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
                pprintk("RPC: %lu retrans\n", jiffies);
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->sock_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->sock_lock);
                pprintk("RPC: %lu timeout\n", jiffies);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}
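
/*
 * Example of the minor/major timeout interplay (a sketch, using the UDP
 * defaults set in xprt_default_timeout below: to_initval = to_increment
 * = 5s, to_retries = 5, linear backoff): rq_timeout grows 5s, 10s, 15s,
 * ... (capped at to_maxval) on each minor timeout while jiffies is still
 * before rq_majortimeo; once the major timeout passes, the timeout is
 * reset to to_initval, the RTT estimator is reinitialized, and
 * -ETIMEDOUT is returned so the caller can decide whether to retry.
 */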

/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
        struct socket *sock = xprt->sock;
        struct sock *sk = xprt->inet;

        if (!sk)
                return;

        write_lock_bh(&sk->sk_callback_lock);
        xprt->inet = NULL;
        xprt->sock = NULL;

        sk->sk_user_data    = NULL;
        sk->sk_data_ready   = xprt->old_data_ready;
        sk->sk_state_change = xprt->old_state_change;
        sk->sk_write_space  = xprt->old_write_space;
        write_unlock_bh(&sk->sk_callback_lock);

        sk->sk_no_check = 0;

        sock_release(sock);
}

static void
xprt_socket_autoclose(void *args)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;

        xprt_disconnect(xprt);
        xprt_close(xprt);
        xprt_release_write(xprt, NULL);
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC: disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->sock_lock);
        xprt_clear_connected(xprt);
        rpc_wake_up_status(&xprt->pending, -ENOTCONN);
        spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Used to allow disconnection when we've been idle
 */
static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->sock_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
                goto out_abort;
        spin_unlock(&xprt->sock_lock);
        /* Let keventd close the socket */
        if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0)
                xprt_release_write(xprt, NULL);
        else
                schedule_work(&xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->sock_lock);
}

static void xprt_socket_connect(void *args)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;
        struct socket *sock = xprt->sock;
        int status = -EIO;

        if (xprt->shutdown || xprt->addr.sin_port == 0)
                goto out;

        /*
         * Start by resetting any existing state
         */
        xprt_close(xprt);
        sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport);
        if (sock == NULL) {
                /* couldn't create socket or bind to reserved port;
                 * this is likely a permanent error, so cause an abort */
                goto out;
        }
        xprt_bind_socket(xprt, sock);
        xprt_sock_setbufsize(xprt);

        status = 0;
        if (!xprt->stream)
                goto out;

        /*
         * Tell the socket layer to start connecting...
         */
        status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
                        sizeof(xprt->addr), O_NONBLOCK);
        dprintk("RPC: %p connect status %d connected %d sock state %d\n",
                        xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
        if (status < 0) {
                switch (status) {
                case -EINPROGRESS:
                case -EALREADY:
                        goto out_clear;
                }
        }
out:
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
out_clear:
        smp_mb__before_clear_bit();
        clear_bit(XPRT_CONNECTING, &xprt->sockstate);
        smp_mb__after_clear_bit();
}

/*
 * Attempt to connect a TCP socket.
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (xprt->shutdown) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                goto out_write;

        task->tk_rqstp->rq_bytes_sent = 0;

        task->tk_timeout = RPC_CONNECT_TIMEOUT;
        rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
        if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
                /* Note: if we are here due to a dropped connection
                 *       we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
                 *       seconds
                 */
                if (xprt->sock != NULL)
                        schedule_delayed_work(&xprt->sock_connect,
                                        RPC_REESTABLISH_TIMEOUT);
                else
                        schedule_work(&xprt->sock_connect);
                if (!RPC_IS_ASYNC(task))
                        flush_scheduled_work();
        }
        return;
out_write:
        xprt_release_write(xprt, task);
}

/*
 * We arrive here when awoken from waiting on connection establishment.
 */
static void
xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status >= 0) {
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        /* if soft mounted, just cause this RPC to fail */
        if (RPC_IS_SOFT(task))
                task->tk_status = -EIO;

        switch (task->tk_status) {
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ENOTCONN:
                return;
        case -ETIMEDOUT:
                dprintk("RPC: %4d xprt_connect_status: timed out\n",
                                task->tk_pid);
                break;
        default:
                printk(KERN_ERR "RPC: error %d connecting to server %s\n",
                                -task->tk_status, task->tk_client->cl_server);
        }
        xprt_release_write(xprt, task);
        return;
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
        struct list_head *pos;
        struct rpc_rqst *req = NULL;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid) {
                        req = entry;
                        break;
                }
        }
        return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
        struct rpc_task *task = req->rq_task;
        struct rpc_clnt *clnt = task->tk_client;

        /* Adjust congestion window */
        if (!xprt->nocong) {
                unsigned timer = task->tk_msg.rpc_proc->p_timer;
                xprt_adjust_cwnd(xprt, copied);
                __xprt_put_cong(xprt, req);
                if (timer) {
                        if (req->rq_ntrans == 1)
                                rpc_update_rtt(clnt->cl_rtt, timer,
                                                (long)jiffies - req->rq_xtime);
                        rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
                }
        }

#ifdef RPC_PROFILE
        /* Profile only reads for now */
        if (copied > 1024) {
                static unsigned long nextstat;
                static unsigned long pkt_rtt, pkt_len, pkt_cnt;

                pkt_cnt++;
                pkt_len += req->rq_slen + copied;
                pkt_rtt += jiffies - req->rq_xtime;
                if (time_before(nextstat, jiffies)) {
                        printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
                        printk("RPC: %ld %ld %ld %ld stat\n",
                                        jiffies, pkt_cnt, pkt_len, pkt_rtt);
                        pkt_rtt = pkt_len = pkt_cnt = 0;
                        nextstat = jiffies + 5 * HZ;
                }
        }
#endif

        dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
        list_del_init(&req->rq_list);
        req->rq_received = req->rq_private_buf.len = copied;

        /* ... and wake up the process. */
        rpc_wake_up_task(task);
        return;
}

static size_t
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
{
        if (len > desc->count)
                len = desc->count;
        if (skb_copy_bits(desc->skb, desc->offset, to, len))
                return 0;
        desc->count -= len;
        desc->offset += len;
        return len;
}

static size_t
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
{
        unsigned int csum2, pos;

        if (len > desc->count)
                len = desc->count;
        pos = desc->offset;
        csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
        desc->csum = csum_block_add(desc->csum, csum2, pos);
        desc->count -= len;
        desc->offset += len;
        return len;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec. -DaveM
 */
static int
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
        skb_reader_t desc;

        desc.skb = skb;
        desc.offset = sizeof(struct udphdr);
        desc.count = skb->len - desc.offset;

        if (skb->ip_summed == CHECKSUM_UNNECESSARY)
                goto no_checksum;

        desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
        if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits) < 0)
                return -1;
        if (desc.offset != skb->len) {
                unsigned int csum2;
                csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
                desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
        }
        if ((unsigned short)csum_fold(desc.csum))
                return -1;
        return 0;
no_checksum:
        if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits) < 0)
                return -1;
        return 0;
}
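
/*
 * Why the csum_fold() test above works (a note, not in the original
 * comments): the UDP checksum is ones-complement, so folding the
 * running 32-bit sum over the header, the copied payload, and any
 * uncopied tail yields zero exactly when the datagram checksum is
 * valid; a non-zero fold therefore means the packet was corrupt and
 * the copy is rejected.
 */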

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
        struct rpc_task *task;
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff *skb;
        int err, repsize, copied;
        u32 _xid, *xp;

        read_lock(&sk->sk_callback_lock);
        dprintk("RPC: udp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk))) {
                printk("RPC: udp_data_ready request not found!\n");
                goto out;
        }

        dprintk("RPC: udp_data_ready client %p\n", xprt);

        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                goto out;

        if (xprt->shutdown)
                goto dropit;

        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
                printk("RPC: impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }

        /* Copy the XID from the skb... */
        xp = skb_header_pointer(skb, sizeof(struct udphdr),
                                sizeof(_xid), &_xid);
        if (xp == NULL)
                goto dropit;

        /* Look up and lock the request corresponding to the given XID */
        spin_lock(&xprt->sock_lock);
        rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
        task = rovr->rq_task;

        dprintk("RPC: %4d received reply\n", task->tk_pid);

        if ((copied = rovr->rq_private_buf.buflen) > repsize)
                copied = repsize;

        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
                goto out_unlock;

        /* Something worked... */
        dst_confirm(skb->dst);

        xprt_complete_rqst(xprt, rovr, copied);

out_unlock:
        spin_unlock(&xprt->sock_lock);
dropit:
        skb_free_datagram(sk, skb);
out:
        read_unlock(&sk->sk_callback_lock);
}

/*
 * Copy from an skb into memory and shrink the skb.
 */
static size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
        if (len > desc->count)
                len = desc->count;
        if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
                dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n",
                                len, desc->count);
                return 0;
        }
        desc->offset += len;
        desc->count -= len;
        dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n",
                        len, desc->count);
        return len;
}

/*
 * TCP read fragment marker
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
        len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
        used = tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
        xprt->tcp_reclen = ntohl(xprt->tcp_recm);
        if (xprt->tcp_reclen & 0x80000000)
                xprt->tcp_flags |= XPRT_LAST_FRAG;
        else
                xprt->tcp_flags &= ~XPRT_LAST_FRAG;
        xprt->tcp_reclen &= 0x7fffffff;
        xprt->tcp_flags &= ~XPRT_COPY_RECM;
        xprt->tcp_offset = 0;
        /* Sanity check of the record length */
        if (xprt->tcp_reclen < 4) {
                printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
                xprt_disconnect(xprt);
                return;
        }
        dprintk("RPC: reading TCP record fragment of length %d\n",
                        xprt->tcp_reclen);
}
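
/*
 * Record marker layout (per RFC 1831 record marking, for reference):
 * each fragment is preceded by a 32-bit big-endian word whose top bit
 * flags the last fragment of the record and whose low 31 bits give the
 * fragment length, which is why the code above tests 0x80000000 and
 * masks with 0x7fffffff.
 */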

static void
tcp_check_recm(struct rpc_xprt *xprt)
{
        dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
                        xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
        if (xprt->tcp_offset == xprt->tcp_reclen) {
                xprt->tcp_flags |= XPRT_COPY_RECM;
                xprt->tcp_offset = 0;
                if (xprt->tcp_flags & XPRT_LAST_FRAG) {
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
                        xprt->tcp_flags |= XPRT_COPY_XID;
                        xprt->tcp_copied = 0;
                }
        }
}

/*
 * TCP read xid
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len, used;
        char *p;

        len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
        dprintk("RPC: reading XID (%Zu bytes)\n", len);
        p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
        used = tcp_copy_data(desc, p, len);
        xprt->tcp_offset += used;
        if (used != len)
                return;
        xprt->tcp_flags &= ~XPRT_COPY_XID;
        xprt->tcp_flags |= XPRT_COPY_DATA;
        xprt->tcp_copied = 4;
        dprintk("RPC: reading reply for XID %08x\n",
                        ntohl(xprt->tcp_xid));
        tcp_check_recm(xprt);
}

/*
 * TCP read and complete request
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        struct rpc_rqst *req;
        struct xdr_buf *rcvbuf;
        size_t len;
        ssize_t r;

        /* Find and lock the request corresponding to this xid */
        spin_lock(&xprt->sock_lock);
        req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
        if (!req) {
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC: XID %08x request not found!\n",
                                ntohl(xprt->tcp_xid));
                spin_unlock(&xprt->sock_lock);
                return;
        }

        rcvbuf = &req->rq_private_buf;
        len = desc->count;
        if (len > xprt->tcp_reclen - xprt->tcp_offset) {
                skb_reader_t my_desc;

                len = xprt->tcp_reclen - xprt->tcp_offset;
                memcpy(&my_desc, desc, sizeof(my_desc));
                my_desc.count = len;
                r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                        &my_desc, tcp_copy_data);
                desc->count -= r;
                desc->offset += r;
        } else
                r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
                                        desc, tcp_copy_data);

        if (r > 0) {
                xprt->tcp_copied += r;
                xprt->tcp_offset += r;
        }
        if (r != len) {
                /* Error when copying to the receive buffer,
                 * usually because we weren't able to allocate
                 * additional buffer pages. All we can do now
                 * is turn off XPRT_COPY_DATA, so the request
                 * will not receive any additional updates,
                 * and time out.
                 * Any remaining data from this record will
                 * be discarded.
                 */
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC: XID %08x truncated request\n",
                                ntohl(xprt->tcp_xid));
                dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
                                xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
                goto out;
        }

        dprintk("RPC: XID %08x read %Zd bytes\n",
                        ntohl(xprt->tcp_xid), r);
        dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
                        xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);

        if (xprt->tcp_copied == req->rq_private_buf.buflen)
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
        else if (xprt->tcp_offset == xprt->tcp_reclen) {
                if (xprt->tcp_flags & XPRT_LAST_FRAG)
                        xprt->tcp_flags &= ~XPRT_COPY_DATA;
        }

out:
        if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
                dprintk("RPC: %4d received reply complete\n",
                                req->rq_task->tk_pid);
                xprt_complete_rqst(xprt, req, xprt->tcp_copied);
        }
        spin_unlock(&xprt->sock_lock);
        tcp_check_recm(xprt);
}

/*
 * TCP discard extra bytes from a short read
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
        size_t len;

        len = xprt->tcp_reclen - xprt->tcp_offset;
        if (len > desc->count)
                len = desc->count;
        desc->count -= len;
        desc->offset += len;
        xprt->tcp_offset += len;
        dprintk("RPC: discarded %Zu bytes\n", len);
        tcp_check_recm(xprt);
}

/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
                unsigned int offset, size_t len)
{
        struct rpc_xprt *xprt = rd_desc->arg.data;
        skb_reader_t desc = {
                .skb    = skb,
                .offset = offset,
                .count  = len,
                .csum   = 0
        };

        dprintk("RPC: tcp_data_recv\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (xprt->tcp_flags & XPRT_COPY_RECM) {
                        tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (xprt->tcp_flags & XPRT_COPY_XID) {
                        tcp_read_xid(xprt, &desc);
                        continue;
                }
                /* Read in the request data */
                if (xprt->tcp_flags & XPRT_COPY_DATA) {
                        tcp_read_request(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
                tcp_read_discard(xprt, &desc);
        } while (desc.count);
        dprintk("RPC: tcp_data_recv done\n");
        return len - desc.count;
}
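
/*
 * The receive path above is a small state machine driven by
 * xprt->tcp_flags: XPRT_COPY_RECM (read the 4-byte record marker),
 * then XPRT_COPY_XID (read the XID and look up the request), then
 * XPRT_COPY_DATA (copy payload into the request's receive buffer),
 * falling through to tcp_read_discard() for bytes nobody wants.
 */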

static void tcp_data_ready(struct sock *sk, int bytes)
{
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;

        read_lock(&sk->sk_callback_lock);
        dprintk("RPC: tcp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk))) {
                printk("RPC: tcp_data_ready socket info not found!\n");
                goto out;
        }
        if (xprt->shutdown)
                goto out;

        /* We use rd_desc to pass struct xprt to tcp_data_recv */
        rd_desc.arg.data = xprt;
        rd_desc.count = 65536;
        tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
        read_unlock(&sk->sk_callback_lock);
}

static void
tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC: tcp_state_change client %p...\n", xprt);
        dprintk("RPC: state %x conn %d dead %d zapped %d\n",
                        sk->sk_state, xprt_connected(xprt),
                        sock_flag(sk, SOCK_DEAD),
                        sock_flag(sk, SOCK_ZAPPED));

        switch (sk->sk_state) {
        case TCP_ESTABLISHED:
                spin_lock_bh(&xprt->sock_lock);
                if (!xprt_test_and_set_connected(xprt)) {
                        /* Reset TCP record info */
                        xprt->tcp_offset = 0;
                        xprt->tcp_reclen = 0;
                        xprt->tcp_copied = 0;
                        xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
                        rpc_wake_up(&xprt->pending);
                }
                spin_unlock_bh(&xprt->sock_lock);
                break;
        case TCP_SYN_SENT:
        case TCP_SYN_RECV:
                break;
        default:
                xprt_disconnect(xprt);
                break;
        }
out:
        read_unlock(&sk->sk_callback_lock);
}

/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void
xprt_write_space(struct sock *sk)
{
        struct rpc_xprt *xprt;
        struct socket *sock;

        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
                goto out;
        if (xprt->shutdown)
                goto out;

        /* Wait until we have enough socket memory */
        if (xprt->stream) {
                /* from net/core/stream.c:sk_stream_write_space */
                if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
                        goto out;
        } else {
                /* from net/core/sock.c:sock_def_write_space */
                if (!sock_writeable(sk))
                        goto out;
        }

        if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
                goto out;

        spin_lock_bh(&xprt->sock_lock);
        if (xprt->snd_task)
                rpc_wake_up_task(xprt->snd_task);
        spin_unlock_bh(&xprt->sock_lock);
out:
        read_unlock(&sk->sk_callback_lock);
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        spin_lock(&xprt->sock_lock);
        if (req->rq_received)
                goto out;

        xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
        __xprt_put_cong(xprt, req);

        dprintk("RPC: %4d xprt_timer (%s request)\n",
                task->tk_pid, req ? "pending" : "backlogged");

        task->tk_status = -ETIMEDOUT;
out:
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->sock_lock);
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
int
xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);

        if (xprt->shutdown)
                return -EIO;

        spin_lock_bh(&xprt->sock_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
        if (!__xprt_lock_write(xprt, task)) {
                err = -EAGAIN;
                goto out_unlock;
        }

        if (!xprt_connected(xprt)) {
                err = -ENOTCONN;
                goto out_unlock;
        }
out_unlock:
        spin_unlock_bh(&xprt->sock_lock);
        return err;
}

void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status, retry = 0;

        dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        /* set up everything as needed. */
        /* Write the record marker */
        if (xprt->stream) {
                u32 *marker = req->rq_svec[0].iov_base;

                *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
        }

        if (!req->rq_received) {
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->sock_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->sock_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
         * called xprt_sendmsg().
         */
        while (1) {
                req->rq_xtime = jiffies;
                status = xprt_sendmsg(xprt, req);

                if (status < 0)
                        break;

                if (xprt->stream) {
                        req->rq_bytes_sent += status;

                        /* If we've sent the entire packet, immediately
                         * reset the count of bytes sent. */
                        if (req->rq_bytes_sent >= req->rq_slen) {
                                req->rq_bytes_sent = 0;
                                goto out_receive;
                        }
                } else {
                        if (status >= req->rq_slen)
                                goto out_receive;
                        status = -EAGAIN;
                        break;
                }

                dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
                                task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                                req->rq_slen);

                status = -EAGAIN;
                if (retry++ > 50)
                        break;
        }

        /* Note: at this point, task->tk_sleeping has not yet been set,
         *       hence there is no danger of the waking up task being put on
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        task->tk_status = status;

        switch (status) {
        case -EAGAIN:
                if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
                        /* Protect against races with xprt_write_space */
                        spin_lock_bh(&xprt->sock_lock);
                        /* Don't race with disconnect */
                        if (!xprt_connected(xprt))
                                task->tk_status = -ENOTCONN;
                        else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
                                task->tk_timeout = req->rq_timeout;
                                rpc_sleep_on(&xprt->pending, task, NULL, NULL);
                        }
                        spin_unlock_bh(&xprt->sock_lock);
                        return;
                }
                /* Keep holding the socket if it is blocked */
                rpc_delay(task, HZ>>4);
                return;
        case -ECONNREFUSED:
                task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        case -ENOTCONN:
                return;
        default:
                if (xprt->stream)
                        xprt_disconnect(xprt);
        }
        xprt_release_write(xprt, task);
        return;
out_receive:
        dprintk("RPC: %4d xmit complete\n", task->tk_pid);
        /* Set the task's receive timeout value */
        spin_lock_bh(&xprt->sock_lock);
        if (!xprt->nocong) {
                int timer = task->tk_msg.rpc_proc->p_timer;
                task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
                task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;
                if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)
                        task->tk_timeout = xprt->timeout.to_maxval;
        } else
                task->tk_timeout = req->rq_timeout;
        /* Don't race with disconnect */
        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;
        else if (!req->rq_received)
                rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
        __xprt_release_write(xprt, task);
        spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Reserve an RPC call slot.
 */
static inline void
do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC: waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

void
xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        if (!xprt->shutdown) {
                spin_lock(&xprt->xprt_lock);
                do_xprt_reserve(task);
                spin_unlock(&xprt->xprt_lock);
        }
}

/*
 * Allocate a 'unique' XID
 */
static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        get_random_bytes(&xprt->xid, sizeof(xprt->xid));
}
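
/*
 * Rationale (a note, not in the original comments): the XID seed is
 * randomized once at transport setup and then simply incremented per
 * request; starting from a random value rather than zero makes it less
 * likely that a rebooted client reuses XIDs that a server's duplicate
 * request cache still remembers.
 */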

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_xid     = xprt_alloc_xid(xprt);
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->sock_lock);
        __xprt_release_write(xprt, task);
        __xprt_put_cong(xprt, req);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv) && !xprt->shutdown)
                mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT);
        spin_unlock_bh(&xprt->sock_lock);
        task->tk_rqstp = NULL;
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->xprt_lock);
        list_add(&req->rq_list, &xprt->free);
        xprt_clear_backlog(xprt);
        spin_unlock(&xprt->xprt_lock);
}

/*
 * Set default timeout parameters
 */
static void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
        if (proto == IPPROTO_UDP)
                xprt_set_timeout(to, 5, 5 * HZ);
        else
                xprt_set_timeout(to, 5, 60 * HZ);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_initval   =
        to->to_increment = incr;
        to->to_maxval    = incr * retr;
        to->to_retries   = retr;
        to->to_exponential = 0;
}
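
/*
 * Example (a sketch): the UDP default above, xprt_set_timeout(to, 5,
 * 5 * HZ), yields to_initval = to_increment = 5s, to_maxval = 25s and
 * to_retries = 5, i.e. a linear 5s, 10s, 15s, 20s, 25s backoff before
 * xprt_adjust_timeout() declares a major timeout.
 */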

unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;
        unsigned int entries;
        size_t slot_table_size;
        struct rpc_rqst *req;

        dprintk("RPC: setting up %s transport...\n",
                        proto == IPPROTO_UDP? "UDP" : "TCP");

        entries = (proto == IPPROTO_TCP)?
                xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries;

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return ERR_PTR(-ENOMEM);
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
        xprt->max_reqs = entries;
        slot_table_size = entries * sizeof(xprt->slot[0]);
        xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
        if (xprt->slot == NULL) {
                kfree(xprt);
                return ERR_PTR(-ENOMEM);
        }
        memset(xprt->slot, 0, slot_table_size);

        xprt->addr = *ap;
        xprt->prot = proto;
        xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
        if (xprt->stream) {
                xprt->cwnd = RPC_MAXCWND(xprt);
                xprt->nocong = 1;
                xprt->max_payload = (1U << 31) - 1;
        } else {
                xprt->cwnd = RPC_INITCWND;
                xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
        }
        spin_lock_init(&xprt->sock_lock);
        spin_lock_init(&xprt->xprt_lock);
        init_waitqueue_head(&xprt->cong_wait);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
        INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
        INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
        init_timer(&xprt->timer);
        xprt->timer.function = xprt_init_autodisconnect;
        xprt->timer.data = (unsigned long) xprt;
        xprt->last_used = jiffies;
        xprt->port = XPRT_MAX_RESVPORT;

        /* Set timeout parameters */
        if (to) {
                xprt->timeout = *to;
        } else
                xprt_default_timeout(&xprt->timeout, xprt->prot);

        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        /* Check whether we want to use a reserved port */
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;

        dprintk("RPC: created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);

        return xprt;
}

/*
 * Bind to a reserved port
 */
static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sockaddr_in myaddr = {
                .sin_family = AF_INET,
        };
        int err, port;

        /* Were we already bound to a given port? Try to reuse it */
        port = xprt->port;
        do {
                myaddr.sin_port = htons(port);
                err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
                                                sizeof(myaddr));
                if (err == 0) {
                        xprt->port = port;
                        return 0;
                }
                if (--port == 0)
                        port = XPRT_MAX_RESVPORT;
        } while (err == -EADDRINUSE && port != xprt->port);

        printk("RPC: Can't bind to reserved port (%d).\n", -err);
        return err;
}

static void
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sock *sk = sock->sk;

        if (xprt->inet)
                return;

        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_user_data = xprt;
        xprt->old_data_ready = sk->sk_data_ready;
        xprt->old_state_change = sk->sk_state_change;
        xprt->old_write_space = sk->sk_write_space;
        if (xprt->prot == IPPROTO_UDP) {
                sk->sk_data_ready = udp_data_ready;
                sk->sk_no_check = UDP_CSUM_NORCV;
                xprt_set_connected(xprt);
        } else {
                tcp_sk(sk)->nonagle = 1;        /* disable Nagle's algorithm */
                sk->sk_data_ready = tcp_data_ready;
                sk->sk_state_change = tcp_state_change;
                xprt_clear_connected(xprt);
        }
        sk->sk_write_space = xprt_write_space;

        /* Reset to new socket */
        xprt->sock = sock;
        xprt->inet = sk;
        write_unlock_bh(&sk->sk_callback_lock);

        return;
}

/*
 * Set socket buffer length
 */
void
xprt_sock_setbufsize(struct rpc_xprt *xprt)
{
        struct sock *sk = xprt->inet;

        if (xprt->stream)
                return;
        if (xprt->rcvsize) {
                sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
                sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2;
        }
        if (xprt->sndsize) {
                sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
                sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
                sk->sk_write_space(sk);
        }
}

/*
 * Datagram sockets are created here, but xprt_connect will create
 * and connect stream sockets.
 */
static struct socket *
xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport)
{
        struct socket *sock;
        int type, err;

        dprintk("RPC: xprt_create_socket(%s %d)\n",
                        (proto == IPPROTO_UDP)? "udp" : "tcp", proto);

        type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

        if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
                printk("RPC: can't create socket (%d).\n", -err);
                return NULL;
        }

        /* If the caller has the capability, bind to a reserved port */
        if (resvport && xprt_bindresvport(xprt, sock) < 0) {
                printk("RPC: can't bind to reserved port.\n");
                goto failed;
        }

        return sock;

failed:
        sock_release(sock);
        return NULL;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;

        xprt = xprt_setup(proto, sap, to);
        if (IS_ERR(xprt))
                dprintk("RPC: xprt_create_proto failed\n");
        else
                dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);
        return xprt;
}

/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
        xprt->shutdown = 1;
        rpc_wake_up(&xprt->sending);
        rpc_wake_up(&xprt->resend);
        rpc_wake_up(&xprt->pending);
        rpc_wake_up(&xprt->backlog);
        wake_up(&xprt->cong_wait);
        del_timer_sync(&xprt->timer);

        /* synchronously wait for connect worker to finish */
        cancel_delayed_work(&xprt->sock_connect);
        flush_scheduled_work();
}

/*
 * Clear the xprt backlog queue
 */
static int
xprt_clear_backlog(struct rpc_xprt *xprt) {
        rpc_wake_up_next(&xprt->backlog);
        wake_up(&xprt->cong_wait);
        return 1;
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC: destroying transport %p\n", xprt);
        xprt_shutdown(xprt);
        xprt_disconnect(xprt);
        xprt_close(xprt);
        kfree(xprt->slot);
        kfree(xprt);

        return 0;
}
);