1 /* RxRPC individual remote procedure call handling
3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
14 #include <linux/slab.h>
15 #include <linux/module.h>
16 #include <linux/circ_buf.h>
17 #include <linux/spinlock_types.h>
19 #include <net/af_rxrpc.h>
20 #include "ar-internal.h"
23 * Maximum lifetime of a call (in jiffies).
25 unsigned int rxrpc_max_call_lifetime
= 60 * HZ
;
27 const char *const rxrpc_call_states
[NR__RXRPC_CALL_STATES
] = {
28 [RXRPC_CALL_UNINITIALISED
] = "Uninit ",
29 [RXRPC_CALL_CLIENT_AWAIT_CONN
] = "ClWtConn",
30 [RXRPC_CALL_CLIENT_SEND_REQUEST
] = "ClSndReq",
31 [RXRPC_CALL_CLIENT_AWAIT_REPLY
] = "ClAwtRpl",
32 [RXRPC_CALL_CLIENT_RECV_REPLY
] = "ClRcvRpl",
33 [RXRPC_CALL_SERVER_PREALLOC
] = "SvPrealc",
34 [RXRPC_CALL_SERVER_SECURING
] = "SvSecure",
35 [RXRPC_CALL_SERVER_ACCEPTING
] = "SvAccept",
36 [RXRPC_CALL_SERVER_RECV_REQUEST
] = "SvRcvReq",
37 [RXRPC_CALL_SERVER_ACK_REQUEST
] = "SvAckReq",
38 [RXRPC_CALL_SERVER_SEND_REPLY
] = "SvSndRpl",
39 [RXRPC_CALL_SERVER_AWAIT_ACK
] = "SvAwtACK",
40 [RXRPC_CALL_COMPLETE
] = "Complete",
43 const char *const rxrpc_call_completions
[NR__RXRPC_CALL_COMPLETIONS
] = {
44 [RXRPC_CALL_SUCCEEDED
] = "Complete",
45 [RXRPC_CALL_REMOTELY_ABORTED
] = "RmtAbort",
46 [RXRPC_CALL_LOCALLY_ABORTED
] = "LocAbort",
47 [RXRPC_CALL_LOCAL_ERROR
] = "LocError",
48 [RXRPC_CALL_NETWORK_ERROR
] = "NetError",
51 const char rxrpc_call_traces
[rxrpc_call__nr_trace
][4] = {
52 [rxrpc_call_new_client
] = "NWc",
53 [rxrpc_call_new_service
] = "NWs",
54 [rxrpc_call_queued
] = "QUE",
55 [rxrpc_call_queued_ref
] = "QUR",
56 [rxrpc_call_connected
] = "CON",
57 [rxrpc_call_release
] = "RLS",
58 [rxrpc_call_seen
] = "SEE",
59 [rxrpc_call_got
] = "GOT",
60 [rxrpc_call_got_userid
] = "Gus",
61 [rxrpc_call_got_kernel
] = "Gke",
62 [rxrpc_call_put
] = "PUT",
63 [rxrpc_call_put_userid
] = "Pus",
64 [rxrpc_call_put_kernel
] = "Pke",
65 [rxrpc_call_put_noqueue
] = "PNQ",
66 [rxrpc_call_error
] = "*E*",
69 struct kmem_cache
*rxrpc_call_jar
;
70 LIST_HEAD(rxrpc_calls
);
71 DEFINE_RWLOCK(rxrpc_call_lock
);
73 static void rxrpc_call_timer_expired(unsigned long _call
)
75 struct rxrpc_call
*call
= (struct rxrpc_call
*)_call
;
77 _enter("%d", call
->debug_id
);
79 if (call
->state
< RXRPC_CALL_COMPLETE
)
80 rxrpc_queue_call(call
);
84 * find an extant server call
85 * - called in process context with IRQs enabled
87 struct rxrpc_call
*rxrpc_find_call_by_user_ID(struct rxrpc_sock
*rx
,
88 unsigned long user_call_ID
)
90 struct rxrpc_call
*call
;
93 _enter("%p,%lx", rx
, user_call_ID
);
95 read_lock(&rx
->call_lock
);
97 p
= rx
->calls
.rb_node
;
99 call
= rb_entry(p
, struct rxrpc_call
, sock_node
);
101 if (user_call_ID
< call
->user_call_ID
)
103 else if (user_call_ID
> call
->user_call_ID
)
106 goto found_extant_call
;
109 read_unlock(&rx
->call_lock
);
114 rxrpc_get_call(call
, rxrpc_call_got
);
115 read_unlock(&rx
->call_lock
);
116 _leave(" = %p [%d]", call
, atomic_read(&call
->usage
));
121 * allocate a new call
123 struct rxrpc_call
*rxrpc_alloc_call(gfp_t gfp
)
125 struct rxrpc_call
*call
;
127 call
= kmem_cache_zalloc(rxrpc_call_jar
, gfp
);
131 call
->rxtx_buffer
= kcalloc(RXRPC_RXTX_BUFF_SIZE
,
132 sizeof(struct sk_buff
*),
134 if (!call
->rxtx_buffer
)
137 call
->rxtx_annotations
= kcalloc(RXRPC_RXTX_BUFF_SIZE
, sizeof(u8
), gfp
);
138 if (!call
->rxtx_annotations
)
141 setup_timer(&call
->timer
, rxrpc_call_timer_expired
,
142 (unsigned long)call
);
143 INIT_WORK(&call
->processor
, &rxrpc_process_call
);
144 INIT_LIST_HEAD(&call
->link
);
145 INIT_LIST_HEAD(&call
->chan_wait_link
);
146 INIT_LIST_HEAD(&call
->accept_link
);
147 INIT_LIST_HEAD(&call
->recvmsg_link
);
148 INIT_LIST_HEAD(&call
->sock_link
);
149 init_waitqueue_head(&call
->waitq
);
150 spin_lock_init(&call
->lock
);
151 rwlock_init(&call
->state_lock
);
152 atomic_set(&call
->usage
, 1);
153 call
->debug_id
= atomic_inc_return(&rxrpc_debug_id
);
155 memset(&call
->sock_node
, 0xed, sizeof(call
->sock_node
));
157 /* Leave space in the ring to handle a maxed-out jumbo packet */
158 call
->rx_winsize
= rxrpc_rx_window_size
;
159 call
->tx_winsize
= 16;
160 call
->rx_expect_next
= 1;
164 kfree(call
->rxtx_buffer
);
166 kmem_cache_free(rxrpc_call_jar
, call
);
171 * Allocate a new client call.
173 static struct rxrpc_call
*rxrpc_alloc_client_call(struct sockaddr_rxrpc
*srx
,
176 struct rxrpc_call
*call
;
180 call
= rxrpc_alloc_call(gfp
);
182 return ERR_PTR(-ENOMEM
);
183 call
->state
= RXRPC_CALL_CLIENT_AWAIT_CONN
;
184 call
->service_id
= srx
->srx_service
;
185 call
->tx_phase
= true;
187 _leave(" = %p", call
);
192 * Initiate the call ack/resend/expiry timer.
194 static void rxrpc_start_call_timer(struct rxrpc_call
*call
)
196 unsigned long expire_at
;
198 expire_at
= jiffies
+ rxrpc_max_call_lifetime
;
199 call
->expire_at
= expire_at
;
200 call
->ack_at
= expire_at
;
201 call
->resend_at
= expire_at
;
202 call
->timer
.expires
= expire_at
+ 1;
203 rxrpc_set_timer(call
);
207 * set up a call for the given data
208 * - called in process context with IRQs enabled
210 struct rxrpc_call
*rxrpc_new_client_call(struct rxrpc_sock
*rx
,
211 struct rxrpc_conn_parameters
*cp
,
212 struct sockaddr_rxrpc
*srx
,
213 unsigned long user_call_ID
,
216 struct rxrpc_call
*call
, *xcall
;
217 struct rb_node
*parent
, **pp
;
218 const void *here
= __builtin_return_address(0);
221 _enter("%p,%lx", rx
, user_call_ID
);
223 call
= rxrpc_alloc_client_call(srx
, gfp
);
225 _leave(" = %ld", PTR_ERR(call
));
229 trace_rxrpc_call(call
, rxrpc_call_new_client
, atomic_read(&call
->usage
),
230 here
, (const void *)user_call_ID
);
232 /* Publish the call, even though it is incompletely set up as yet */
233 write_lock(&rx
->call_lock
);
235 pp
= &rx
->calls
.rb_node
;
239 xcall
= rb_entry(parent
, struct rxrpc_call
, sock_node
);
241 if (user_call_ID
< xcall
->user_call_ID
)
242 pp
= &(*pp
)->rb_left
;
243 else if (user_call_ID
> xcall
->user_call_ID
)
244 pp
= &(*pp
)->rb_right
;
246 goto error_dup_user_ID
;
249 rcu_assign_pointer(call
->socket
, rx
);
250 call
->user_call_ID
= user_call_ID
;
251 __set_bit(RXRPC_CALL_HAS_USERID
, &call
->flags
);
252 rxrpc_get_call(call
, rxrpc_call_got_userid
);
253 rb_link_node(&call
->sock_node
, parent
, pp
);
254 rb_insert_color(&call
->sock_node
, &rx
->calls
);
255 list_add(&call
->sock_link
, &rx
->sock_calls
);
257 write_unlock(&rx
->call_lock
);
259 write_lock(&rxrpc_call_lock
);
260 list_add_tail(&call
->link
, &rxrpc_calls
);
261 write_unlock(&rxrpc_call_lock
);
263 /* Set up or get a connection record and set the protocol parameters,
264 * including channel number and call ID.
266 ret
= rxrpc_connect_call(call
, cp
, srx
, gfp
);
270 trace_rxrpc_call(call
, rxrpc_call_connected
, atomic_read(&call
->usage
),
273 spin_lock_bh(&call
->conn
->params
.peer
->lock
);
274 hlist_add_head(&call
->error_link
,
275 &call
->conn
->params
.peer
->error_targets
);
276 spin_unlock_bh(&call
->conn
->params
.peer
->lock
);
278 rxrpc_start_call_timer(call
);
280 _net("CALL new %d on CONN %d", call
->debug_id
, call
->conn
->debug_id
);
282 _leave(" = %p [new]", call
);
285 /* We unexpectedly found the user ID in the list after taking
286 * the call_lock. This shouldn't happen unless the user races
287 * with itself and tries to add the same user ID twice at the
288 * same time in different threads.
291 write_unlock(&rx
->call_lock
);
295 __rxrpc_set_call_completion(call
, RXRPC_CALL_LOCAL_ERROR
,
297 trace_rxrpc_call(call
, rxrpc_call_error
, atomic_read(&call
->usage
),
299 rxrpc_release_call(rx
, call
);
300 rxrpc_put_call(call
, rxrpc_call_put
);
301 _leave(" = %d", ret
);
306 * Set up an incoming call. call->conn points to the connection.
307 * This is called in BH context and isn't allowed to fail.
309 void rxrpc_incoming_call(struct rxrpc_sock
*rx
,
310 struct rxrpc_call
*call
,
313 struct rxrpc_connection
*conn
= call
->conn
;
314 struct rxrpc_skb_priv
*sp
= rxrpc_skb(skb
);
317 _enter(",%d", call
->conn
->debug_id
);
319 rcu_assign_pointer(call
->socket
, rx
);
320 call
->call_id
= sp
->hdr
.callNumber
;
321 call
->service_id
= sp
->hdr
.serviceId
;
322 call
->cid
= sp
->hdr
.cid
;
323 call
->state
= RXRPC_CALL_SERVER_ACCEPTING
;
324 if (sp
->hdr
.securityIndex
> 0)
325 call
->state
= RXRPC_CALL_SERVER_SECURING
;
327 /* Set the channel for this call. We don't get channel_lock as we're
328 * only defending against the data_ready handler (which we're called
329 * from) and the RESPONSE packet parser (which is only really
330 * interested in call_counter and can cope with a disagreement with the
333 chan
= sp
->hdr
.cid
& RXRPC_CHANNELMASK
;
334 conn
->channels
[chan
].call_counter
= call
->call_id
;
335 conn
->channels
[chan
].call_id
= call
->call_id
;
336 rcu_assign_pointer(conn
->channels
[chan
].call
, call
);
338 spin_lock(&conn
->params
.peer
->lock
);
339 hlist_add_head(&call
->error_link
, &conn
->params
.peer
->error_targets
);
340 spin_unlock(&conn
->params
.peer
->lock
);
342 _net("CALL incoming %d on CONN %d", call
->debug_id
, call
->conn
->debug_id
);
344 rxrpc_start_call_timer(call
);
349 * Queue a call's work processor, getting a ref to pass to the work queue.
351 bool rxrpc_queue_call(struct rxrpc_call
*call
)
353 const void *here
= __builtin_return_address(0);
354 int n
= __atomic_add_unless(&call
->usage
, 1, 0);
357 if (rxrpc_queue_work(&call
->processor
))
358 trace_rxrpc_call(call
, rxrpc_call_queued
, n
+ 1, here
, NULL
);
360 rxrpc_put_call(call
, rxrpc_call_put_noqueue
);
365 * Queue a call's work processor, passing the callers ref to the work queue.
367 bool __rxrpc_queue_call(struct rxrpc_call
*call
)
369 const void *here
= __builtin_return_address(0);
370 int n
= atomic_read(&call
->usage
);
372 if (rxrpc_queue_work(&call
->processor
))
373 trace_rxrpc_call(call
, rxrpc_call_queued_ref
, n
, here
, NULL
);
375 rxrpc_put_call(call
, rxrpc_call_put_noqueue
);
380 * Note the re-emergence of a call.
382 void rxrpc_see_call(struct rxrpc_call
*call
)
384 const void *here
= __builtin_return_address(0);
386 int n
= atomic_read(&call
->usage
);
388 trace_rxrpc_call(call
, rxrpc_call_seen
, n
, here
, NULL
);
393 * Note the addition of a ref on a call.
395 void rxrpc_get_call(struct rxrpc_call
*call
, enum rxrpc_call_trace op
)
397 const void *here
= __builtin_return_address(0);
398 int n
= atomic_inc_return(&call
->usage
);
400 trace_rxrpc_call(call
, op
, n
, here
, NULL
);
404 * Detach a call from its owning socket.
406 void rxrpc_release_call(struct rxrpc_sock
*rx
, struct rxrpc_call
*call
)
408 const void *here
= __builtin_return_address(0);
409 struct rxrpc_connection
*conn
= call
->conn
;
413 _enter("{%d,%d}", call
->debug_id
, atomic_read(&call
->usage
));
415 trace_rxrpc_call(call
, rxrpc_call_release
, atomic_read(&call
->usage
),
416 here
, (const void *)call
->flags
);
418 ASSERTCMP(call
->state
, ==, RXRPC_CALL_COMPLETE
);
420 spin_lock_bh(&call
->lock
);
421 if (test_and_set_bit(RXRPC_CALL_RELEASED
, &call
->flags
))
423 spin_unlock_bh(&call
->lock
);
425 del_timer_sync(&call
->timer
);
427 /* Make sure we don't get any more notifications */
428 write_lock_bh(&rx
->recvmsg_lock
);
430 if (!list_empty(&call
->recvmsg_link
)) {
431 _debug("unlinking once-pending call %p { e=%lx f=%lx }",
432 call
, call
->events
, call
->flags
);
433 list_del(&call
->recvmsg_link
);
437 /* list_empty() must return false in rxrpc_notify_socket() */
438 call
->recvmsg_link
.next
= NULL
;
439 call
->recvmsg_link
.prev
= NULL
;
441 write_unlock_bh(&rx
->recvmsg_lock
);
443 rxrpc_put_call(call
, rxrpc_call_put
);
445 write_lock(&rx
->call_lock
);
447 if (test_and_clear_bit(RXRPC_CALL_HAS_USERID
, &call
->flags
)) {
448 rb_erase(&call
->sock_node
, &rx
->calls
);
449 memset(&call
->sock_node
, 0xdd, sizeof(call
->sock_node
));
450 rxrpc_put_call(call
, rxrpc_call_put_userid
);
453 list_del(&call
->sock_link
);
454 write_unlock(&rx
->call_lock
);
456 _debug("RELEASE CALL %p (%d CONN %p)", call
, call
->debug_id
, conn
);
459 rxrpc_disconnect_call(call
);
461 for (i
= 0; i
< RXRPC_RXTX_BUFF_SIZE
; i
++) {
462 rxrpc_free_skb(call
->rxtx_buffer
[i
],
463 (call
->tx_phase
? rxrpc_skb_tx_cleaned
:
464 rxrpc_skb_rx_cleaned
));
465 call
->rxtx_buffer
[i
] = NULL
;
472 * release all the calls associated with a socket
474 void rxrpc_release_calls_on_socket(struct rxrpc_sock
*rx
)
476 struct rxrpc_call
*call
;
480 while (!list_empty(&rx
->to_be_accepted
)) {
481 call
= list_entry(rx
->to_be_accepted
.next
,
482 struct rxrpc_call
, accept_link
);
483 list_del(&call
->accept_link
);
484 rxrpc_abort_call("SKR", call
, 0, RX_CALL_DEAD
, ECONNRESET
);
485 rxrpc_put_call(call
, rxrpc_call_put
);
488 while (!list_empty(&rx
->sock_calls
)) {
489 call
= list_entry(rx
->sock_calls
.next
,
490 struct rxrpc_call
, sock_link
);
491 rxrpc_get_call(call
, rxrpc_call_got
);
492 rxrpc_abort_call("SKT", call
, 0, RX_CALL_DEAD
, ECONNRESET
);
493 rxrpc_send_call_packet(call
, RXRPC_PACKET_TYPE_ABORT
);
494 rxrpc_release_call(rx
, call
);
495 rxrpc_put_call(call
, rxrpc_call_put
);
504 void rxrpc_put_call(struct rxrpc_call
*call
, enum rxrpc_call_trace op
)
506 const void *here
= __builtin_return_address(0);
509 ASSERT(call
!= NULL
);
511 n
= atomic_dec_return(&call
->usage
);
512 trace_rxrpc_call(call
, op
, n
, here
, NULL
);
515 _debug("call %d dead", call
->debug_id
);
516 ASSERTCMP(call
->state
, ==, RXRPC_CALL_COMPLETE
);
518 write_lock(&rxrpc_call_lock
);
519 list_del_init(&call
->link
);
520 write_unlock(&rxrpc_call_lock
);
522 rxrpc_cleanup_call(call
);
527 * Final call destruction under RCU.
529 static void rxrpc_rcu_destroy_call(struct rcu_head
*rcu
)
531 struct rxrpc_call
*call
= container_of(rcu
, struct rxrpc_call
, rcu
);
533 rxrpc_put_peer(call
->peer
);
534 kfree(call
->rxtx_buffer
);
535 kfree(call
->rxtx_annotations
);
536 kmem_cache_free(rxrpc_call_jar
, call
);
542 void rxrpc_cleanup_call(struct rxrpc_call
*call
)
546 _net("DESTROY CALL %d", call
->debug_id
);
548 memset(&call
->sock_node
, 0xcd, sizeof(call
->sock_node
));
550 del_timer_sync(&call
->timer
);
552 ASSERTCMP(call
->state
, ==, RXRPC_CALL_COMPLETE
);
553 ASSERT(test_bit(RXRPC_CALL_RELEASED
, &call
->flags
));
554 ASSERTCMP(call
->conn
, ==, NULL
);
556 /* Clean up the Rx/Tx buffer */
557 for (i
= 0; i
< RXRPC_RXTX_BUFF_SIZE
; i
++)
558 rxrpc_free_skb(call
->rxtx_buffer
[i
],
559 (call
->tx_phase
? rxrpc_skb_tx_cleaned
:
560 rxrpc_skb_rx_cleaned
));
562 rxrpc_free_skb(call
->tx_pending
, rxrpc_skb_tx_cleaned
);
564 call_rcu(&call
->rcu
, rxrpc_rcu_destroy_call
);
568 * Make sure that all calls are gone.
570 void __exit
rxrpc_destroy_all_calls(void)
572 struct rxrpc_call
*call
;
576 if (list_empty(&rxrpc_calls
))
579 write_lock(&rxrpc_call_lock
);
581 while (!list_empty(&rxrpc_calls
)) {
582 call
= list_entry(rxrpc_calls
.next
, struct rxrpc_call
, link
);
583 _debug("Zapping call %p", call
);
585 rxrpc_see_call(call
);
586 list_del_init(&call
->link
);
588 pr_err("Call %p still in use (%d,%s,%lx,%lx)!\n",
589 call
, atomic_read(&call
->usage
),
590 rxrpc_call_states
[call
->state
],
591 call
->flags
, call
->events
);
593 write_unlock(&rxrpc_call_lock
);
595 write_lock(&rxrpc_call_lock
);
598 write_unlock(&rxrpc_call_lock
);