]> git.proxmox.com Git - mirror_ubuntu-kernels.git/commitdiff
rxrpc: Move client call connection to the I/O thread
authorDavid Howells <dhowells@redhat.com>
Wed, 19 Oct 2022 08:45:43 +0000 (09:45 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:33 +0000 (09:43 +0000)
Move the connection setup of client calls to the I/O thread so that a whole
load of locking and barrierage can be eliminated.  This necessitates the
app thread waiting for connection to complete before it can begin
encrypting data.

This also completes the fix for a race that exists between call connection
and call disconnection whereby the data transmission code adds the call to
the peer error distribution list after the call has been disconnected (say
by the rxrpc socket getting closed).

The fix is to complete the process of moving call connection, data
transmission and call disconnection into the I/O thread and thus forcibly
serialising them.

Note that the issue may predate the overhaul to an I/O thread model that
were included in the merge window for v6.2, but the timing is very much
changed by the change given below.

Fixes: cf37b5987508 ("rxrpc: Move DATA transmission into call processor work item")
Reported-by: syzbot+c22650d2844392afdcfd@syzkaller.appspotmail.com
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

14 files changed:
include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_object.c
net/rxrpc/call_state.c
net/rxrpc/conn_client.c
net/rxrpc/conn_event.c
net/rxrpc/conn_object.c
net/rxrpc/conn_service.c
net/rxrpc/io_thread.c
net/rxrpc/local_object.c
net/rxrpc/proc.c
net/rxrpc/rxkad.c
net/rxrpc/security.c
net/rxrpc/sendmsg.c

index e2f6b79d551703c381bd9a0df9ef20c6de008358..283db0ea3db4b37edda6c81f3c84213dc79d577c 100644 (file)
        EM(rxrpc_conn_put_call,                 "PUT call    ") \
        EM(rxrpc_conn_put_call_input,           "PUT inp-call") \
        EM(rxrpc_conn_put_conn_input,           "PUT inp-conn") \
-       EM(rxrpc_conn_put_discard,              "PUT discard ") \
        EM(rxrpc_conn_put_discard_idle,         "PUT disc-idl") \
        EM(rxrpc_conn_put_local_dead,           "PUT loc-dead") \
        EM(rxrpc_conn_put_noreuse,              "PUT noreuse ") \
        EM(rxrpc_client_chan_activate,          "ChActv") \
        EM(rxrpc_client_chan_disconnect,        "ChDisc") \
        EM(rxrpc_client_chan_pass,              "ChPass") \
-       EM(rxrpc_client_chan_wait_failed,       "ChWtFl") \
        EM(rxrpc_client_cleanup,                "Clean ") \
        EM(rxrpc_client_discard,                "Discar") \
-       EM(rxrpc_client_duplicate,              "Duplic") \
        EM(rxrpc_client_exposed,                "Expose") \
        EM(rxrpc_client_replace,                "Replac") \
+       EM(rxrpc_client_queue_new_call,         "Q-Call") \
        EM(rxrpc_client_to_active,              "->Actv") \
        E_(rxrpc_client_to_idle,                "->Idle")
 
        EM(rxrpc_call_put_sendmsg,              "PUT sendmsg ") \
        EM(rxrpc_call_put_unnotify,             "PUT unnotify") \
        EM(rxrpc_call_put_userid_exists,        "PUT u-exists") \
+       EM(rxrpc_call_put_userid,               "PUT user-id ") \
        EM(rxrpc_call_see_accept,               "SEE accept  ") \
        EM(rxrpc_call_see_activate_client,      "SEE act-clnt") \
        EM(rxrpc_call_see_connect_failed,       "SEE con-fail") \
index de84061a54472b626f1f19caa093b5d3e1233467..007258538beebb3d20b42730c7ed05d55360de6a 100644 (file)
@@ -292,7 +292,6 @@ struct rxrpc_local {
        struct rb_root          client_bundles; /* Client connection bundles by socket params */
        spinlock_t              client_bundles_lock; /* Lock for client_bundles */
        bool                    kill_all_client_conns;
-       spinlock_t              client_conn_cache_lock; /* Lock for ->*_client_conns */
        struct list_head        idle_client_conns;
        struct timer_list       client_conn_reap_timer;
        unsigned long           client_conn_flags;
@@ -304,7 +303,8 @@ struct rxrpc_local {
        bool                    dead;
        bool                    service_closed; /* Service socket closed */
        struct idr              conn_ids;       /* List of connection IDs */
-       spinlock_t              conn_lock;      /* Lock for client connection pool */
+       struct list_head        new_client_calls; /* Newly created client calls need connection */
+       spinlock_t              client_call_lock; /* Lock for ->new_client_calls */
        struct sockaddr_rxrpc   srx;            /* local address */
 };
 
@@ -385,7 +385,6 @@ enum rxrpc_call_completion {
  * Bits in the connection flags.
  */
 enum rxrpc_conn_flag {
-       RXRPC_CONN_HAS_IDR,             /* Has a client conn ID assigned */
        RXRPC_CONN_IN_SERVICE_CONNS,    /* Conn is in peer->service_conns */
        RXRPC_CONN_DONT_REUSE,          /* Don't reuse this connection */
        RXRPC_CONN_PROBING_FOR_UPGRADE, /* Probing for service upgrade */
@@ -413,6 +412,7 @@ enum rxrpc_conn_event {
  */
 enum rxrpc_conn_proto_state {
        RXRPC_CONN_UNUSED,              /* Connection not yet attempted */
+       RXRPC_CONN_CLIENT_UNSECURED,    /* Client connection needs security init */
        RXRPC_CONN_CLIENT,              /* Client connection */
        RXRPC_CONN_SERVICE_PREALLOC,    /* Service connection preallocation */
        RXRPC_CONN_SERVICE_UNSECURED,   /* Service unsecured connection */
@@ -436,11 +436,9 @@ struct rxrpc_bundle {
        u32                     security_level; /* Security level selected */
        u16                     service_id;     /* Service ID for this connection */
        bool                    try_upgrade;    /* True if the bundle is attempting upgrade */
-       bool                    alloc_conn;     /* True if someone's getting a conn */
        bool                    exclusive;      /* T if conn is exclusive */
        bool                    upgrade;        /* T if service ID can be upgraded */
-       short                   alloc_error;    /* Error from last conn allocation */
-       spinlock_t              channel_lock;
+       unsigned short          alloc_error;    /* Error from last conn allocation */
        struct rb_node          local_node;     /* Node in local->client_conns */
        struct list_head        waiting_calls;  /* Calls waiting for channels */
        unsigned long           avail_chans;    /* Mask of available channels */
@@ -468,7 +466,7 @@ struct rxrpc_connection {
        unsigned char           act_chans;      /* Mask of active channels */
        struct rxrpc_channel {
                unsigned long           final_ack_at;   /* Time at which to issue final ACK */
-               struct rxrpc_call __rcu *call;          /* Active call */
+               struct rxrpc_call       *call;          /* Active call */
                unsigned int            call_debug_id;  /* call->debug_id */
                u32                     call_id;        /* ID of current call */
                u32                     call_counter;   /* Call ID counter */
@@ -489,6 +487,7 @@ struct rxrpc_connection {
        struct list_head        link;           /* link in master connection list */
        struct sk_buff_head     rx_queue;       /* received conn-level packets */
 
+       struct mutex            security_lock;  /* Lock for security management */
        const struct rxrpc_security *security;  /* applied security module */
        union {
                struct {
@@ -619,7 +618,7 @@ struct rxrpc_call {
        struct work_struct      destroyer;      /* In-process-context destroyer */
        rxrpc_notify_rx_t       notify_rx;      /* kernel service Rx notification function */
        struct list_head        link;           /* link in master call list */
-       struct list_head        chan_wait_link; /* Link in conn->bundle->waiting_calls */
+       struct list_head        wait_link;      /* Link in local->new_client_calls */
        struct hlist_node       error_link;     /* link in error distribution list */
        struct list_head        accept_link;    /* Link in rx->acceptq */
        struct list_head        recvmsg_link;   /* Link in rx->recvmsg_q */
@@ -866,6 +865,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
                                         struct sockaddr_rxrpc *,
                                         struct rxrpc_call_params *, gfp_t,
                                         unsigned int);
+void rxrpc_start_call_timer(struct rxrpc_call *call);
 void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
                         struct sk_buff *);
 void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
@@ -905,6 +905,7 @@ static inline void rxrpc_set_call_state(struct rxrpc_call *call,
 {
        /* Order write of completion info before write of ->state. */
        smp_store_release(&call->_state, state);
+       wake_up(&call->waitq);
 }
 
 static inline enum rxrpc_call_state __rxrpc_call_state(const struct rxrpc_call *call)
@@ -940,10 +941,11 @@ extern unsigned int rxrpc_reap_client_connections;
 extern unsigned long rxrpc_conn_idle_client_expiry;
 extern unsigned long rxrpc_conn_idle_client_fast_expiry;
 
-void rxrpc_destroy_client_conn_ids(struct rxrpc_local *local);
+void rxrpc_purge_client_connections(struct rxrpc_local *local);
 struct rxrpc_bundle *rxrpc_get_bundle(struct rxrpc_bundle *, enum rxrpc_bundle_trace);
 void rxrpc_put_bundle(struct rxrpc_bundle *, enum rxrpc_bundle_trace);
-int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp);
+int rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t gfp);
+void rxrpc_connect_client_calls(struct rxrpc_local *local);
 void rxrpc_expose_client_call(struct rxrpc_call *);
 void rxrpc_disconnect_client_call(struct rxrpc_bundle *, struct rxrpc_call *);
 void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle);
index c94161acf3c418a34d490bb900825451c033bc5a..3ded5a24627c55f6850392192b7e9e414c14547c 100644 (file)
@@ -150,7 +150,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
        timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
        INIT_WORK(&call->destroyer, rxrpc_destroy_call);
        INIT_LIST_HEAD(&call->link);
-       INIT_LIST_HEAD(&call->chan_wait_link);
+       INIT_LIST_HEAD(&call->wait_link);
        INIT_LIST_HEAD(&call->accept_link);
        INIT_LIST_HEAD(&call->recvmsg_link);
        INIT_LIST_HEAD(&call->sock_link);
@@ -242,7 +242,7 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
 /*
  * Initiate the call ack/resend/expiry timer.
  */
-static void rxrpc_start_call_timer(struct rxrpc_call *call)
+void rxrpc_start_call_timer(struct rxrpc_call *call)
 {
        unsigned long now = jiffies;
        unsigned long j = now + MAX_JIFFY_OFFSET;
@@ -286,6 +286,39 @@ static void rxrpc_put_call_slot(struct rxrpc_call *call)
        up(limiter);
 }
 
+/*
+ * Start the process of connecting a call.  We obtain a peer and a connection
+ * bundle, but the actual association of a call with a connection is offloaded
+ * to the I/O thread to simplify locking.
+ */
+static int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
+{
+       struct rxrpc_local *local = call->local;
+       int ret = 0;
+
+       _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
+
+       call->peer = rxrpc_lookup_peer(local, &call->dest_srx, gfp);
+       if (!call->peer)
+               goto error;
+
+       ret = rxrpc_look_up_bundle(call, gfp);
+       if (ret < 0)
+               goto error;
+
+       trace_rxrpc_client(NULL, -1, rxrpc_client_queue_new_call);
+       rxrpc_get_call(call, rxrpc_call_get_io_thread);
+       spin_lock(&local->client_call_lock);
+       list_add_tail(&call->wait_link, &local->new_client_calls);
+       spin_unlock(&local->client_call_lock);
+       rxrpc_wake_up_io_thread(local);
+       return 0;
+
+error:
+       __set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
+       return ret;
+}
+
 /*
  * Set up a call for the given parameters.
  * - Called with the socket lock held, which it must release.
@@ -369,10 +402,6 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
        if (ret < 0)
                goto error_attached_to_socket;
 
-       rxrpc_see_call(call, rxrpc_call_see_connected);
-
-       rxrpc_start_call_timer(call);
-
        _leave(" = %p [new]", call);
        return call;
 
@@ -387,22 +416,20 @@ error_dup_user_ID:
        rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, -EEXIST);
        trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0,
                         rxrpc_call_see_userid_exists);
-       rxrpc_release_call(rx, call);
        mutex_unlock(&call->user_mutex);
        rxrpc_put_call(call, rxrpc_call_put_userid_exists);
        _leave(" = -EEXIST");
        return ERR_PTR(-EEXIST);
 
        /* We got an error, but the call is attached to the socket and is in
-        * need of release.  However, we might now race with recvmsg() when
-        * completing the call queues it.  Return 0 from sys_sendmsg() and
+        * need of release.  However, we might now race with recvmsg() when it
+        * completion notifies the socket.  Return 0 from sys_sendmsg() and
         * leave the error to recvmsg() to deal with.
         */
 error_attached_to_socket:
        trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), ret,
                         rxrpc_call_see_connect_failed);
-       set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
-       rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, ret);
+       rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret);
        _leave(" = c=%08x [err]", call->debug_id);
        return call;
 }
@@ -460,7 +487,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
        chan = sp->hdr.cid & RXRPC_CHANNELMASK;
        conn->channels[chan].call_counter = call->call_id;
        conn->channels[chan].call_id = call->call_id;
-       rcu_assign_pointer(conn->channels[chan].call, call);
+       conn->channels[chan].call = call;
        spin_unlock(&conn->state_lock);
 
        spin_lock(&conn->peer->lock);
@@ -520,7 +547,7 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
 void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 {
        struct rxrpc_connection *conn = call->conn;
-       bool put = false;
+       bool put = false, putu = false;
 
        _enter("{%d,%d}", call->debug_id, refcount_read(&call->ref));
 
@@ -555,7 +582,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
        if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
                rb_erase(&call->sock_node, &rx->calls);
                memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
-               rxrpc_put_call(call, rxrpc_call_put_userid_exists);
+               putu = true;
        }
 
        list_del(&call->sock_link);
@@ -563,6 +590,9 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 
        _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
 
+       if (putu)
+               rxrpc_put_call(call, rxrpc_call_put_userid);
+
        _leave("");
 }
 
index 27dc1242b7125bf83a67b5a935a544229d266233..6afb54373ebbf9207889c44de4faac06a2157d52 100644 (file)
@@ -65,5 +65,5 @@ void rxrpc_prefail_call(struct rxrpc_call *call, enum rxrpc_call_completion comp
        call->completion        = compl;
        call->_state            = RXRPC_CALL_COMPLETE;
        trace_rxrpc_call_complete(call);
-       __set_bit(RXRPC_CALL_RELEASED, &call->flags);
+       WARN_ON_ONCE(__test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags));
 }
index ebb43f65ebc51388d7a9417e8ec374dc8bb9021e..981ca5b98bcb9096fa9bd6ee3561d2792a17782f 100644 (file)
@@ -39,61 +39,19 @@ static void rxrpc_activate_bundle(struct rxrpc_bundle *bundle)
        atomic_inc(&bundle->active);
 }
 
-/*
- * Get a connection ID and epoch for a client connection from the global pool.
- * The connection struct pointer is then recorded in the idr radix tree.  The
- * epoch doesn't change until the client is rebooted (or, at least, unless the
- * module is unloaded).
- */
-static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
-                                         gfp_t gfp)
-{
-       struct rxrpc_local *local = conn->local;
-       int id;
-
-       _enter("");
-
-       idr_preload(gfp);
-       spin_lock(&local->conn_lock);
-
-       id = idr_alloc_cyclic(&local->conn_ids, conn,
-                             1, 0x40000000, GFP_NOWAIT);
-       if (id < 0)
-               goto error;
-
-       spin_unlock(&local->conn_lock);
-       idr_preload_end();
-
-       conn->proto.epoch = local->rxnet->epoch;
-       conn->proto.cid = id << RXRPC_CIDSHIFT;
-       set_bit(RXRPC_CONN_HAS_IDR, &conn->flags);
-       _leave(" [CID %x]", conn->proto.cid);
-       return 0;
-
-error:
-       spin_unlock(&local->conn_lock);
-       idr_preload_end();
-       _leave(" = %d", id);
-       return id;
-}
-
 /*
  * Release a connection ID for a client connection.
  */
 static void rxrpc_put_client_connection_id(struct rxrpc_local *local,
                                           struct rxrpc_connection *conn)
 {
-       if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
-               spin_lock(&local->conn_lock);
-               idr_remove(&local->conn_ids, conn->proto.cid >> RXRPC_CIDSHIFT);
-               spin_unlock(&local->conn_lock);
-       }
+       idr_remove(&local->conn_ids, conn->proto.cid >> RXRPC_CIDSHIFT);
 }
 
 /*
  * Destroy the client connection ID tree.
  */
-void rxrpc_destroy_client_conn_ids(struct rxrpc_local *local)
+static void rxrpc_destroy_client_conn_ids(struct rxrpc_local *local)
 {
        struct rxrpc_connection *conn;
        int id;
@@ -129,7 +87,6 @@ static struct rxrpc_bundle *rxrpc_alloc_bundle(struct rxrpc_call *call,
                bundle->security_level  = call->security_level;
                refcount_set(&bundle->ref, 1);
                atomic_set(&bundle->active, 1);
-               spin_lock_init(&bundle->channel_lock);
                INIT_LIST_HEAD(&bundle->waiting_calls);
                trace_rxrpc_bundle(bundle->debug_id, 1, rxrpc_bundle_new);
        }
@@ -169,69 +126,68 @@ void rxrpc_put_bundle(struct rxrpc_bundle *bundle, enum rxrpc_bundle_trace why)
        }
 }
 
+/*
+ * Get rid of outstanding client connection preallocations when a local
+ * endpoint is destroyed.
+ */
+void rxrpc_purge_client_connections(struct rxrpc_local *local)
+{
+       rxrpc_destroy_client_conn_ids(local);
+}
+
 /*
  * Allocate a client connection.
  */
 static struct rxrpc_connection *
-rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle, gfp_t gfp)
+rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle)
 {
        struct rxrpc_connection *conn;
-       struct rxrpc_net *rxnet = bundle->local->rxnet;
-       int ret;
+       struct rxrpc_local *local = bundle->local;
+       struct rxrpc_net *rxnet = local->rxnet;
+       int id;
 
        _enter("");
 
-       conn = rxrpc_alloc_connection(rxnet, gfp);
-       if (!conn) {
-               _leave(" = -ENOMEM");
+       conn = rxrpc_alloc_connection(rxnet, GFP_ATOMIC | __GFP_NOWARN);
+       if (!conn)
                return ERR_PTR(-ENOMEM);
+
+       id = idr_alloc_cyclic(&local->conn_ids, conn, 1, 0x40000000,
+                             GFP_ATOMIC | __GFP_NOWARN);
+       if (id < 0) {
+               kfree(conn);
+               return ERR_PTR(id);
        }
 
        refcount_set(&conn->ref, 1);
-       conn->bundle            = bundle;
-       conn->local             = bundle->local;
-       conn->peer              = bundle->peer;
-       conn->key               = bundle->key;
+       conn->proto.cid         = id << RXRPC_CIDSHIFT;
+       conn->proto.epoch       = local->rxnet->epoch;
+       conn->out_clientflag    = RXRPC_CLIENT_INITIATED;
+       conn->bundle            = rxrpc_get_bundle(bundle, rxrpc_bundle_get_client_conn);
+       conn->local             = rxrpc_get_local(bundle->local, rxrpc_local_get_client_conn);
+       conn->peer              = rxrpc_get_peer(bundle->peer, rxrpc_peer_get_client_conn);
+       conn->key               = key_get(bundle->key);
+       conn->security          = bundle->security;
        conn->exclusive         = bundle->exclusive;
        conn->upgrade           = bundle->upgrade;
        conn->orig_service_id   = bundle->service_id;
        conn->security_level    = bundle->security_level;
-       conn->out_clientflag    = RXRPC_CLIENT_INITIATED;
-       conn->state             = RXRPC_CONN_CLIENT;
+       conn->state             = RXRPC_CONN_CLIENT_UNSECURED;
        conn->service_id        = conn->orig_service_id;
 
-       ret = rxrpc_get_client_connection_id(conn, gfp);
-       if (ret < 0)
-               goto error_0;
-
-       ret = rxrpc_init_client_conn_security(conn);
-       if (ret < 0)
-               goto error_1;
+       if (conn->security == &rxrpc_no_security)
+               conn->state     = RXRPC_CONN_CLIENT;
 
        atomic_inc(&rxnet->nr_conns);
        write_lock(&rxnet->conn_lock);
        list_add_tail(&conn->proc_link, &rxnet->conn_proc_list);
        write_unlock(&rxnet->conn_lock);
 
-       rxrpc_get_bundle(bundle, rxrpc_bundle_get_client_conn);
-       rxrpc_get_peer(conn->peer, rxrpc_peer_get_client_conn);
-       rxrpc_get_local(conn->local, rxrpc_local_get_client_conn);
-       key_get(conn->key);
-
-       trace_rxrpc_conn(conn->debug_id, refcount_read(&conn->ref),
-                        rxrpc_conn_new_client);
+       rxrpc_see_connection(conn, rxrpc_conn_new_client);
 
        atomic_inc(&rxnet->nr_client_conns);
        trace_rxrpc_client(conn, -1, rxrpc_client_alloc);
-       _leave(" = %p", conn);
        return conn;
-
-error_1:
-       rxrpc_put_client_connection_id(bundle->local, conn);
-error_0:
-       kfree(conn);
-       _leave(" = %d", ret);
-       return ERR_PTR(ret);
 }
 
 /*
@@ -249,7 +205,8 @@ static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
        if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
                goto dont_reuse;
 
-       if (conn->state != RXRPC_CONN_CLIENT ||
+       if ((conn->state != RXRPC_CONN_CLIENT_UNSECURED &&
+            conn->state != RXRPC_CONN_CLIENT) ||
            conn->proto.epoch != rxnet->epoch)
                goto mark_dont_reuse;
 
@@ -280,7 +237,7 @@ dont_reuse:
  * Look up the conn bundle that matches the connection parameters, adding it if
  * it doesn't yet exist.
  */
-static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t gfp)
+int rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t gfp)
 {
        static atomic_t rxrpc_bundle_id;
        struct rxrpc_bundle *bundle, *candidate;
@@ -295,7 +252,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t
 
        if (test_bit(RXRPC_CALL_EXCLUSIVE, &call->flags)) {
                call->bundle = rxrpc_alloc_bundle(call, gfp);
-               return call->bundle;
+               return call->bundle ? 0 : -ENOMEM;
        }
 
        /* First, see if the bundle is already there. */
@@ -324,7 +281,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t
        /* It wasn't.  We need to add one. */
        candidate = rxrpc_alloc_bundle(call, gfp);
        if (!candidate)
-               return ERR_PTR(-ENOMEM);
+               return -ENOMEM;
 
        _debug("search 2");
        spin_lock(&local->client_bundles_lock);
@@ -355,7 +312,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t
        call->bundle = rxrpc_get_bundle(candidate, rxrpc_bundle_get_client_call);
        spin_unlock(&local->client_bundles_lock);
        _leave(" = B=%u [new]", call->bundle->debug_id);
-       return call->bundle;
+       return 0;
 
 found_bundle_free:
        rxrpc_free_bundle(candidate);
@@ -364,160 +321,77 @@ found_bundle:
        rxrpc_activate_bundle(bundle);
        spin_unlock(&local->client_bundles_lock);
        _leave(" = B=%u [found]", call->bundle->debug_id);
-       return call->bundle;
-}
-
-/*
- * Create or find a client bundle to use for a call.
- *
- * If we return with a connection, the call will be on its waiting list.  It's
- * left to the caller to assign a channel and wake up the call.
- */
-static struct rxrpc_bundle *rxrpc_prep_call(struct rxrpc_call *call, gfp_t gfp)
-{
-       struct rxrpc_bundle *bundle;
-
-       _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
-
-       call->peer = rxrpc_lookup_peer(call->local, &call->dest_srx, gfp);
-       if (!call->peer)
-               goto error;
-
-       call->tx_last_sent = ktime_get_real();
-       call->cong_ssthresh = call->peer->cong_ssthresh;
-       if (call->cong_cwnd >= call->cong_ssthresh)
-               call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
-       else
-               call->cong_mode = RXRPC_CALL_SLOW_START;
-
-       /* Find the client connection bundle. */
-       bundle = rxrpc_look_up_bundle(call, gfp);
-       if (!bundle)
-               goto error;
-
-       /* Get this call queued.  Someone else may activate it whilst we're
-        * lining up a new connection, but that's fine.
-        */
-       spin_lock(&bundle->channel_lock);
-       list_add_tail(&call->chan_wait_link, &bundle->waiting_calls);
-       spin_unlock(&bundle->channel_lock);
-
-       _leave(" = [B=%x]", bundle->debug_id);
-       return bundle;
-
-error:
-       _leave(" = -ENOMEM");
-       return ERR_PTR(-ENOMEM);
+       return 0;
 }
 
 /*
  * Allocate a new connection and add it into a bundle.
  */
-static void rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle, gfp_t gfp)
-       __releases(bundle->channel_lock)
+static bool rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle,
+                                    unsigned int slot)
 {
-       struct rxrpc_connection *candidate = NULL, *old = NULL;
-       bool conflict;
-       int i;
-
-       _enter("");
-
-       conflict = bundle->alloc_conn;
-       if (!conflict)
-               bundle->alloc_conn = true;
-       spin_unlock(&bundle->channel_lock);
-       if (conflict) {
-               _leave(" [conf]");
-               return;
-       }
-
-       candidate = rxrpc_alloc_client_connection(bundle, gfp);
-
-       spin_lock(&bundle->channel_lock);
-       bundle->alloc_conn = false;
+       struct rxrpc_connection *conn, *old;
+       unsigned int shift = slot * RXRPC_MAXCALLS;
+       unsigned int i;
 
-       if (IS_ERR(candidate)) {
-               bundle->alloc_error = PTR_ERR(candidate);
-               spin_unlock(&bundle->channel_lock);
-               _leave(" [err %ld]", PTR_ERR(candidate));
-               return;
+       old = bundle->conns[slot];
+       if (old) {
+               bundle->conns[slot] = NULL;
+               trace_rxrpc_client(old, -1, rxrpc_client_replace);
+               rxrpc_put_connection(old, rxrpc_conn_put_noreuse);
        }
 
-       bundle->alloc_error = 0;
-
-       for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) {
-               unsigned int shift = i * RXRPC_MAXCALLS;
-               int j;
-
-               old = bundle->conns[i];
-               if (!rxrpc_may_reuse_conn(old)) {
-                       if (old)
-                               trace_rxrpc_client(old, -1, rxrpc_client_replace);
-                       candidate->bundle_shift = shift;
-                       rxrpc_activate_bundle(bundle);
-                       bundle->conns[i] = candidate;
-                       for (j = 0; j < RXRPC_MAXCALLS; j++)
-                               set_bit(shift + j, &bundle->avail_chans);
-                       candidate = NULL;
-                       break;
-               }
-
-               old = NULL;
+       conn = rxrpc_alloc_client_connection(bundle);
+       if (IS_ERR(conn)) {
+               bundle->alloc_error = PTR_ERR(conn);
+               return false;
        }
 
-       spin_unlock(&bundle->channel_lock);
-
-       if (candidate) {
-               _debug("discard C=%x", candidate->debug_id);
-               trace_rxrpc_client(candidate, -1, rxrpc_client_duplicate);
-               rxrpc_put_connection(candidate, rxrpc_conn_put_discard);
-       }
-
-       rxrpc_put_connection(old, rxrpc_conn_put_noreuse);
-       _leave("");
+       rxrpc_activate_bundle(bundle);
+       conn->bundle_shift = shift;
+       bundle->conns[slot] = conn;
+       for (i = 0; i < RXRPC_MAXCALLS; i++)
+               set_bit(shift + i, &bundle->avail_chans);
+       return true;
 }
 
 /*
  * Add a connection to a bundle if there are no usable connections or we have
  * connections waiting for extra capacity.
  */
-static void rxrpc_maybe_add_conn(struct rxrpc_bundle *bundle, gfp_t gfp)
+static bool rxrpc_bundle_has_space(struct rxrpc_bundle *bundle)
 {
-       struct rxrpc_call *call;
-       int i, usable;
+       int slot = -1, i, usable;
 
        _enter("");
 
-       spin_lock(&bundle->channel_lock);
+       bundle->alloc_error = 0;
 
        /* See if there are any usable connections. */
        usable = 0;
-       for (i = 0; i < ARRAY_SIZE(bundle->conns); i++)
+       for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) {
                if (rxrpc_may_reuse_conn(bundle->conns[i]))
                        usable++;
-
-       if (!usable && !list_empty(&bundle->waiting_calls)) {
-               call = list_first_entry(&bundle->waiting_calls,
-                                       struct rxrpc_call, chan_wait_link);
-               if (test_bit(RXRPC_CALL_UPGRADE, &call->flags))
-                       bundle->try_upgrade = true;
+               else if (slot == -1)
+                       slot = i;
        }
 
+       if (!usable && bundle->upgrade)
+               bundle->try_upgrade = true;
+
        if (!usable)
                goto alloc_conn;
 
        if (!bundle->avail_chans &&
            !bundle->try_upgrade &&
-           !list_empty(&bundle->waiting_calls) &&
            usable < ARRAY_SIZE(bundle->conns))
                goto alloc_conn;
 
-       spin_unlock(&bundle->channel_lock);
        _leave("");
-       return;
+       return usable;
 
 alloc_conn:
-       return rxrpc_add_conn_to_bundle(bundle, gfp);
+       return slot >= 0 ? rxrpc_add_conn_to_bundle(bundle, slot) : false;
 }
 
 /*
@@ -531,11 +405,13 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
        struct rxrpc_channel *chan = &conn->channels[channel];
        struct rxrpc_bundle *bundle = conn->bundle;
        struct rxrpc_call *call = list_entry(bundle->waiting_calls.next,
-                                            struct rxrpc_call, chan_wait_link);
+                                            struct rxrpc_call, wait_link);
        u32 call_id = chan->call_counter + 1;
 
        _enter("C=%x,%u", conn->debug_id, channel);
 
+       list_del_init(&call->wait_link);
+
        trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);
 
        /* Cancel the final ACK on the previous call if it hasn't been sent yet
@@ -545,65 +421,50 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
        clear_bit(conn->bundle_shift + channel, &bundle->avail_chans);
 
        rxrpc_see_call(call, rxrpc_call_see_activate_client);
-       list_del_init(&call->chan_wait_link);
        call->conn      = rxrpc_get_connection(conn, rxrpc_conn_get_activate_call);
        call->cid       = conn->proto.cid | channel;
        call->call_id   = call_id;
        call->dest_srx.srx_service = conn->service_id;
-
-       trace_rxrpc_connect_call(call);
-
-       rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_SEND_REQUEST);
-
-       /* Paired with the read barrier in rxrpc_connect_call().  This orders
-        * cid and epoch in the connection wrt to call_id without the need to
-        * take the channel_lock.
-        *
-        * We provisionally assign a callNumber at this point, but we don't
-        * confirm it until the call is about to be exposed.
-        *
-        * TODO: Pair with a barrier in the data_ready handler when that looks
-        * at the call ID through a connection channel.
-        */
-       smp_wmb();
+       call->cong_ssthresh = call->peer->cong_ssthresh;
+       if (call->cong_cwnd >= call->cong_ssthresh)
+               call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
+       else
+               call->cong_mode = RXRPC_CALL_SLOW_START;
 
        chan->call_id           = call_id;
        chan->call_debug_id     = call->debug_id;
-       rcu_assign_pointer(chan->call, call);
+       chan->call              = call;
+
+       rxrpc_see_call(call, rxrpc_call_see_connected);
+       trace_rxrpc_connect_call(call);
+       call->tx_last_sent = ktime_get_real();
+       rxrpc_start_call_timer(call);
+       rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_SEND_REQUEST);
        wake_up(&call->waitq);
 }
 
 /*
  * Remove a connection from the idle list if it's on it.
  */
-static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn)
+static void rxrpc_unidle_conn(struct rxrpc_connection *conn)
 {
-       struct rxrpc_local *local = bundle->local;
-       bool drop_ref;
-
        if (!list_empty(&conn->cache_link)) {
-               drop_ref = false;
-               spin_lock(&local->client_conn_cache_lock);
-               if (!list_empty(&conn->cache_link)) {
-                       list_del_init(&conn->cache_link);
-                       drop_ref = true;
-               }
-               spin_unlock(&local->client_conn_cache_lock);
-               if (drop_ref)
-                       rxrpc_put_connection(conn, rxrpc_conn_put_unidle);
+               list_del_init(&conn->cache_link);
+               rxrpc_put_connection(conn, rxrpc_conn_put_unidle);
        }
 }
 
 /*
- * Assign channels and callNumbers to waiting calls with channel_lock
- * held by caller.
+ * Assign channels and callNumbers to waiting calls.
  */
-static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
+static void rxrpc_activate_channels(struct rxrpc_bundle *bundle)
 {
        struct rxrpc_connection *conn;
        unsigned long avail, mask;
        unsigned int channel, slot;
 
+       trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans);
+
        if (bundle->try_upgrade)
                mask = 1;
        else
@@ -623,7 +484,7 @@ static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
 
                if (bundle->try_upgrade)
                        set_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags);
-               rxrpc_unidle_conn(bundle, conn);
+               rxrpc_unidle_conn(conn);
 
                channel &= (RXRPC_MAXCALLS - 1);
                conn->act_chans |= 1 << channel;
@@ -632,125 +493,24 @@ static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
 }
 
 /*
- * Assign channels and callNumbers to waiting calls.
- */
-static void rxrpc_activate_channels(struct rxrpc_bundle *bundle)
-{
-       _enter("B=%x", bundle->debug_id);
-
-       trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans);
-
-       if (!bundle->avail_chans)
-               return;
-
-       spin_lock(&bundle->channel_lock);
-       rxrpc_activate_channels_locked(bundle);
-       spin_unlock(&bundle->channel_lock);
-       _leave("");
-}
-
-/*
- * Wait for a callNumber and a channel to be granted to a call.
- */
-static int rxrpc_wait_for_channel(struct rxrpc_bundle *bundle,
-                                 struct rxrpc_call *call, gfp_t gfp)
-{
-       DECLARE_WAITQUEUE(myself, current);
-       int ret = 0;
-
-       _enter("%d", call->debug_id);
-
-       if (!gfpflags_allow_blocking(gfp)) {
-               rxrpc_maybe_add_conn(bundle, gfp);
-               rxrpc_activate_channels(bundle);
-               ret = bundle->alloc_error ?: -EAGAIN;
-               goto out;
-       }
-
-       add_wait_queue_exclusive(&call->waitq, &myself);
-       for (;;) {
-               rxrpc_maybe_add_conn(bundle, gfp);
-               rxrpc_activate_channels(bundle);
-               ret = bundle->alloc_error;
-               if (ret < 0)
-                       break;
-
-               switch (call->interruptibility) {
-               case RXRPC_INTERRUPTIBLE:
-               case RXRPC_PREINTERRUPTIBLE:
-                       set_current_state(TASK_INTERRUPTIBLE);
-                       break;
-               case RXRPC_UNINTERRUPTIBLE:
-               default:
-                       set_current_state(TASK_UNINTERRUPTIBLE);
-                       break;
-               }
-               if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN)
-                       break;
-               if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
-                    call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
-                   signal_pending(current)) {
-                       ret = -ERESTARTSYS;
-                       break;
-               }
-               schedule();
-       }
-       remove_wait_queue(&call->waitq, &myself);
-       __set_current_state(TASK_RUNNING);
-
-out:
-       _leave(" = %d", ret);
-       return ret;
-}
-
-/*
- * find a connection for a call
- * - called in process context with IRQs enabled
+ * Connect waiting channels (called from the I/O thread).
  */
-int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
+void rxrpc_connect_client_calls(struct rxrpc_local *local)
 {
-       struct rxrpc_bundle *bundle;
-       int ret = 0;
-
-       _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
-
-       rxrpc_get_call(call, rxrpc_call_get_io_thread);
-
-       bundle = rxrpc_prep_call(call, gfp);
-       if (IS_ERR(bundle)) {
-               rxrpc_put_call(call, rxrpc_call_get_io_thread);
-               ret = PTR_ERR(bundle);
-               goto out;
-       }
-
-       if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_CONN) {
-               ret = rxrpc_wait_for_channel(bundle, call, gfp);
-               if (ret < 0)
-                       goto wait_failed;
-       }
-
-granted_channel:
-       /* Paired with the write barrier in rxrpc_activate_one_channel(). */
-       smp_rmb();
+       struct rxrpc_call *call;
 
-out:
-       _leave(" = %d", ret);
-       return ret;
+       while ((call = list_first_entry_or_null(&local->new_client_calls,
+                                               struct rxrpc_call, wait_link))
+              ) {
+               struct rxrpc_bundle *bundle = call->bundle;
 
-wait_failed:
-       spin_lock(&bundle->channel_lock);
-       list_del_init(&call->chan_wait_link);
-       spin_unlock(&bundle->channel_lock);
+               spin_lock(&local->client_call_lock);
+               list_move_tail(&call->wait_link, &bundle->waiting_calls);
+               spin_unlock(&local->client_call_lock);
 
-       if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) {
-               ret = 0;
-               goto granted_channel;
+               if (rxrpc_bundle_has_space(bundle))
+                       rxrpc_activate_channels(bundle);
        }
-
-       trace_rxrpc_client(call->conn, ret, rxrpc_client_chan_wait_failed);
-       rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret);
-       rxrpc_disconnect_client_call(bundle, call);
-       goto out;
 }
 
 /*
@@ -808,8 +568,6 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
 
        _enter("c=%x", call->debug_id);
 
-       spin_lock(&bundle->channel_lock);
-
        /* Calls that have never actually been assigned a channel can simply be
         * discarded.
         */
@@ -818,8 +576,8 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
                _debug("call is waiting");
                ASSERTCMP(call->call_id, ==, 0);
                ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
-               list_del_init(&call->chan_wait_link);
-               goto out;
+               list_del_init(&call->wait_link);
+               return;
        }
 
        cid = call->cid;
@@ -827,10 +585,8 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
        chan = &conn->channels[channel];
        trace_rxrpc_client(conn, channel, rxrpc_client_chan_disconnect);
 
-       if (rcu_access_pointer(chan->call) != call) {
-               spin_unlock(&bundle->channel_lock);
-               BUG();
-       }
+       if (WARN_ON(chan->call != call))
+               return;
 
        may_reuse = rxrpc_may_reuse_conn(conn);
 
@@ -851,16 +607,15 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
                        trace_rxrpc_client(conn, channel, rxrpc_client_to_active);
                        bundle->try_upgrade = false;
                        if (may_reuse)
-                               rxrpc_activate_channels_locked(bundle);
+                               rxrpc_activate_channels(bundle);
                }
-
        }
 
        /* See if we can pass the channel directly to another call. */
        if (may_reuse && !list_empty(&bundle->waiting_calls)) {
                trace_rxrpc_client(conn, channel, rxrpc_client_chan_pass);
                rxrpc_activate_one_channel(conn, channel);
-               goto out;
+               return;
        }
 
        /* Schedule the final ACK to be transmitted in a short while so that it
@@ -878,7 +633,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
        }
 
        /* Deactivate the channel. */
-       rcu_assign_pointer(chan->call, NULL);
+       chan->call = NULL;
        set_bit(conn->bundle_shift + channel, &conn->bundle->avail_chans);
        conn->act_chans &= ~(1 << channel);
 
@@ -891,15 +646,10 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
                conn->idle_timestamp = jiffies;
 
                rxrpc_get_connection(conn, rxrpc_conn_get_idle);
-               spin_lock(&local->client_conn_cache_lock);
                list_move_tail(&conn->cache_link, &local->idle_client_conns);
-               spin_unlock(&local->client_conn_cache_lock);
 
                rxrpc_set_client_reap_timer(local);
        }
-
-out:
-       spin_unlock(&bundle->channel_lock);
 }
 
 /*
@@ -909,7 +659,6 @@ static void rxrpc_unbundle_conn(struct rxrpc_connection *conn)
 {
        struct rxrpc_bundle *bundle = conn->bundle;
        unsigned int bindex;
-       bool need_drop = false;
        int i;
 
        _enter("C=%x", conn->debug_id);
@@ -917,18 +666,13 @@ static void rxrpc_unbundle_conn(struct rxrpc_connection *conn)
        if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
                rxrpc_process_delayed_final_acks(conn, true);
 
-       spin_lock(&bundle->channel_lock);
        bindex = conn->bundle_shift / RXRPC_MAXCALLS;
        if (bundle->conns[bindex] == conn) {
                _debug("clear slot %u", bindex);
                bundle->conns[bindex] = NULL;
                for (i = 0; i < RXRPC_MAXCALLS; i++)
                        clear_bit(conn->bundle_shift + i, &bundle->avail_chans);
-               need_drop = true;
-       }
-       spin_unlock(&bundle->channel_lock);
-
-       if (need_drop) {
+               rxrpc_put_client_connection_id(bundle->local, conn);
                rxrpc_deactivate_bundle(bundle);
                rxrpc_put_connection(conn, rxrpc_conn_put_unbundle);
        }
@@ -990,24 +734,16 @@ void rxrpc_discard_expired_client_conns(struct rxrpc_local *local)
 
        _enter("");
 
-       if (list_empty(&local->idle_client_conns)) {
-               _leave(" [empty]");
-               return;
-       }
-
        /* We keep an estimate of what the number of conns ought to be after
         * we've discarded some so that we don't overdo the discarding.
         */
        nr_conns = atomic_read(&local->rxnet->nr_client_conns);
 
 next:
-       spin_lock(&local->client_conn_cache_lock);
-
-       if (list_empty(&local->idle_client_conns))
-               goto out;
-
-       conn = list_entry(local->idle_client_conns.next,
-                         struct rxrpc_connection, cache_link);
+       conn = list_first_entry_or_null(&local->idle_client_conns,
+                                       struct rxrpc_connection, cache_link);
+       if (!conn)
+               return;
 
        if (!local->kill_all_client_conns) {
                /* If the number of connections is over the reap limit, we
@@ -1032,8 +768,6 @@ next:
        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
        list_del_init(&conn->cache_link);
 
-       spin_unlock(&local->client_conn_cache_lock);
-
        rxrpc_unbundle_conn(conn);
        /* Drop the ->cache_link ref */
        rxrpc_put_connection(conn, rxrpc_conn_put_discard_idle);
@@ -1053,8 +787,6 @@ not_yet_expired:
        if (!local->kill_all_client_conns)
                timer_reduce(&local->client_conn_reap_timer, conn_expires_at);
 
-out:
-       spin_unlock(&local->client_conn_cache_lock);
        _leave("");
 }
 
@@ -1063,34 +795,19 @@ out:
  */
 void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
 {
-       struct rxrpc_connection *conn, *tmp;
-       LIST_HEAD(graveyard);
+       struct rxrpc_connection *conn;
 
        _enter("");
 
-       spin_lock(&local->client_conn_cache_lock);
        local->kill_all_client_conns = true;
-       spin_unlock(&local->client_conn_cache_lock);
 
        del_timer_sync(&local->client_conn_reap_timer);
 
-       spin_lock(&local->client_conn_cache_lock);
-
-       list_for_each_entry_safe(conn, tmp, &local->idle_client_conns,
-                                cache_link) {
-               if (conn->local == local) {
-                       atomic_dec(&conn->active);
-                       trace_rxrpc_client(conn, -1, rxrpc_client_discard);
-                       list_move(&conn->cache_link, &graveyard);
-               }
-       }
-
-       spin_unlock(&local->client_conn_cache_lock);
-
-       while (!list_empty(&graveyard)) {
-               conn = list_entry(graveyard.next,
-                                 struct rxrpc_connection, cache_link);
+       while ((conn = list_first_entry_or_null(&local->idle_client_conns,
+                                               struct rxrpc_connection, cache_link))) {
                list_del_init(&conn->cache_link);
+               atomic_dec(&conn->active);
+               trace_rxrpc_client(conn, -1, rxrpc_client_discard);
                rxrpc_unbundle_conn(conn);
                rxrpc_put_connection(conn, rxrpc_conn_put_local_dead);
        }
index 8d0b9ff0a5e154e50ddf4825c9ce3007ac1a20ac..44414e724415e907b05a65d1c51d322573148ba9 100644 (file)
@@ -100,9 +100,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
        /* If the last call got moved on whilst we were waiting to run, just
         * ignore this packet.
         */
-       call_id = READ_ONCE(chan->last_call);
-       /* Sync with __rxrpc_disconnect_call() */
-       smp_rmb();
+       call_id = chan->last_call;
        if (skb && call_id != sp->hdr.callNumber)
                return;
 
@@ -119,9 +117,12 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
        iov[2].iov_base = &ack_info;
        iov[2].iov_len  = sizeof(ack_info);
 
+       serial = atomic_inc_return(&conn->serial);
+
        pkt.whdr.epoch          = htonl(conn->proto.epoch);
        pkt.whdr.cid            = htonl(conn->proto.cid | channel);
        pkt.whdr.callNumber     = htonl(call_id);
+       pkt.whdr.serial         = htonl(serial);
        pkt.whdr.seq            = 0;
        pkt.whdr.type           = chan->last_type;
        pkt.whdr.flags          = conn->out_clientflag;
@@ -158,31 +159,15 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
                iov[0].iov_len += sizeof(pkt.ack);
                len += sizeof(pkt.ack) + 3 + sizeof(ack_info);
                ioc = 3;
-               break;
-
-       default:
-               return;
-       }
-
-       /* Resync with __rxrpc_disconnect_call() and check that the last call
-        * didn't get advanced whilst we were filling out the packets.
-        */
-       smp_rmb();
-       if (READ_ONCE(chan->last_call) != call_id)
-               return;
-
-       serial = atomic_inc_return(&conn->serial);
-       pkt.whdr.serial = htonl(serial);
 
-       switch (chan->last_type) {
-       case RXRPC_PACKET_TYPE_ABORT:
-               break;
-       case RXRPC_PACKET_TYPE_ACK:
                trace_rxrpc_tx_ack(chan->call_debug_id, serial,
                                   ntohl(pkt.ack.firstPacket),
                                   ntohl(pkt.ack.serial),
                                   pkt.ack.reason, 0);
                break;
+
+       default:
+               return;
        }
 
        ret = kernel_sendmsg(conn->local->socket, &msg, iov, ioc, len);
@@ -207,12 +192,8 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn)
 
        _enter("{%d},%x", conn->debug_id, conn->abort_code);
 
-       spin_lock(&conn->bundle->channel_lock);
-
        for (i = 0; i < RXRPC_MAXCALLS; i++) {
-               call = rcu_dereference_protected(
-                       conn->channels[i].call,
-                       lockdep_is_held(&conn->bundle->channel_lock));
+               call = conn->channels[i].call;
                if (call)
                        rxrpc_set_call_completion(call,
                                                  conn->completion,
@@ -220,7 +201,6 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn)
                                                  conn->error);
        }
 
-       spin_unlock(&conn->bundle->channel_lock);
        _leave("");
 }
 
@@ -316,9 +296,7 @@ again:
                if (!test_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags))
                        continue;
 
-               smp_rmb(); /* vs rxrpc_disconnect_client_call */
-               ack_at = READ_ONCE(chan->final_ack_at);
-
+               ack_at = chan->final_ack_at;
                if (time_before(j, ack_at) && !force) {
                        if (time_before(ack_at, next_j)) {
                                next_j = ack_at;
@@ -446,15 +424,8 @@ void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb)
                if (conn->state != RXRPC_CONN_SERVICE)
                        break;
 
-               spin_lock(&conn->bundle->channel_lock);
-
                for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
-                       rxrpc_call_is_secure(
-                               rcu_dereference_protected(
-                                       conn->channels[loop].call,
-                                       lockdep_is_held(&conn->bundle->channel_lock)));
-
-               spin_unlock(&conn->bundle->channel_lock);
+                       rxrpc_call_is_secure(conn->channels[loop].call);
                break;
        }
 
index 3d8c1dc6a82ae6e76125d834f7775a70f78f541e..ac85d4644a3c3a047066129e69c40a626557f1c7 100644 (file)
@@ -67,6 +67,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet,
                INIT_WORK(&conn->destructor, rxrpc_clean_up_connection);
                INIT_LIST_HEAD(&conn->proc_link);
                INIT_LIST_HEAD(&conn->link);
+               mutex_init(&conn->security_lock);
                skb_queue_head_init(&conn->rx_queue);
                conn->rxnet = rxnet;
                conn->security = &rxrpc_no_security;
@@ -157,7 +158,7 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
 
        _enter("%d,%x", conn->debug_id, call->cid);
 
-       if (rcu_access_pointer(chan->call) == call) {
+       if (chan->call == call) {
                /* Save the result of the call so that we can repeat it if necessary
                 * through the channel, whilst disposing of the actual call record.
                 */
@@ -177,12 +178,9 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
                        break;
                }
 
-               /* Sync with rxrpc_conn_retransmit(). */
-               smp_wmb();
                chan->last_call = chan->call_id;
                chan->call_id = chan->call_counter;
-
-               rcu_assign_pointer(chan->call, NULL);
+               chan->call = NULL;
        }
 
        _leave("");
@@ -210,10 +208,7 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
        if (rxrpc_is_client_call(call)) {
                rxrpc_disconnect_client_call(call->bundle, call);
        } else {
-               spin_lock(&conn->bundle->channel_lock);
                __rxrpc_disconnect_call(conn, call);
-               spin_unlock(&conn->bundle->channel_lock);
-
                conn->idle_timestamp = jiffies;
                if (atomic_dec_and_test(&conn->active))
                        rxrpc_set_service_reap_timer(conn->rxnet,
@@ -316,10 +311,10 @@ static void rxrpc_clean_up_connection(struct work_struct *work)
                container_of(work, struct rxrpc_connection, destructor);
        struct rxrpc_net *rxnet = conn->rxnet;
 
-       ASSERT(!rcu_access_pointer(conn->channels[0].call) &&
-              !rcu_access_pointer(conn->channels[1].call) &&
-              !rcu_access_pointer(conn->channels[2].call) &&
-              !rcu_access_pointer(conn->channels[3].call));
+       ASSERT(!conn->channels[0].call &&
+              !conn->channels[1].call &&
+              !conn->channels[2].call &&
+              !conn->channels[3].call);
        ASSERT(list_empty(&conn->cache_link));
 
        del_timer_sync(&conn->timer);
index 2a55a88b2a5b7e5f07204395e39b96130324659c..f30323de82bd11873e7bef81399d3ca269df3fce 100644 (file)
@@ -11,7 +11,6 @@
 static struct rxrpc_bundle rxrpc_service_dummy_bundle = {
        .ref            = REFCOUNT_INIT(1),
        .debug_id       = UINT_MAX,
-       .channel_lock   = __SPIN_LOCK_UNLOCKED(&rxrpc_service_dummy_bundle.channel_lock),
 };
 
 /*
index a299cc34c1403aaa2c9e8fb24ec56b6a1f015469..9e9dfb2fc559be6c2615ecaf2e675f9e3c73d752 100644 (file)
@@ -369,10 +369,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
                return just_discard;
        }
 
-       rcu_read_lock();
-       call = rxrpc_try_get_call(rcu_dereference(chan->call),
-                                 rxrpc_call_get_input);
-       rcu_read_unlock();
+       call = rxrpc_try_get_call(chan->call, rxrpc_call_get_input);
 
        if (sp->hdr.callNumber > chan->call_id) {
                if (rxrpc_to_client(sp)) {
@@ -453,6 +450,9 @@ int rxrpc_io_thread(void *data)
                        continue;
                }
 
+               if (!list_empty(&local->new_client_calls))
+                       rxrpc_connect_client_calls(local);
+
                /* Process received packets and errors. */
                if ((skb = __skb_dequeue(&rx_queue))) {
                        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
@@ -492,7 +492,10 @@ int rxrpc_io_thread(void *data)
                should_stop = kthread_should_stop();
                if (!skb_queue_empty(&local->rx_queue) ||
                    !list_empty(&local->call_attend_q) ||
-                   !list_empty(&local->conn_attend_q)) {
+                   !list_empty(&local->conn_attend_q) ||
+                   !list_empty(&local->new_client_calls) ||
+                   test_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
+                            &local->client_conn_flags)) {
                        __set_current_state(TASK_RUNNING);
                        continue;
                }
index 9bc8d08ca12cf09136dfd9a6805bb947b72e296c..b8eaca5d9f221362517b26aeeea91739c18bba24 100644 (file)
@@ -117,7 +117,6 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
                local->client_bundles = RB_ROOT;
                spin_lock_init(&local->client_bundles_lock);
                local->kill_all_client_conns = false;
-               spin_lock_init(&local->client_conn_cache_lock);
                INIT_LIST_HEAD(&local->idle_client_conns);
                timer_setup(&local->client_conn_reap_timer,
                            rxrpc_client_conn_reap_timeout, 0);
@@ -133,7 +132,8 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
                if (tmp == 0)
                        tmp = 1;
                idr_set_cursor(&local->conn_ids, tmp);
-               spin_lock_init(&local->conn_lock);
+               INIT_LIST_HEAD(&local->new_client_calls);
+               spin_lock_init(&local->client_call_lock);
 
                trace_rxrpc_local(local->debug_id, rxrpc_local_new, 1, 1);
        }
@@ -435,7 +435,7 @@ void rxrpc_destroy_local(struct rxrpc_local *local)
         * local endpoint.
         */
        rxrpc_purge_queue(&local->rx_queue);
-       rxrpc_destroy_client_conn_ids(local);
+       rxrpc_purge_client_connections(local);
 }
 
 /*
index c39ef94602ed712ad07f1e8c7f0e9399fa2c5d6c..750158a085cdd1a6f8fe3bbe9cfec01f32ca3f3e 100644 (file)
@@ -12,6 +12,7 @@
 
 static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
        [RXRPC_CONN_UNUSED]                     = "Unused  ",
+       [RXRPC_CONN_CLIENT_UNSECURED]           = "ClUnsec ",
        [RXRPC_CONN_CLIENT]                     = "Client  ",
        [RXRPC_CONN_SERVICE_PREALLOC]           = "SvPrealc",
        [RXRPC_CONN_SERVICE_UNSECURED]          = "SvUnsec ",
index dfb01e7b90fbbc1240ec089eeaf6842355dda12c..1bf571a66e020d263ceb1d5a4489253b8fbf9728 100644 (file)
@@ -1122,36 +1122,31 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
                goto protocol_error_free;
        }
 
-       spin_lock(&conn->bundle->channel_lock);
        for (i = 0; i < RXRPC_MAXCALLS; i++) {
-               struct rxrpc_call *call;
                u32 call_id = ntohl(response->encrypted.call_id[i]);
+               u32 counter = READ_ONCE(conn->channels[i].call_counter);
 
                if (call_id > INT_MAX) {
                        rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO,
                                         rxkad_abort_resp_bad_callid);
-                       goto protocol_error_unlock;
+                       goto protocol_error_free;
                }
 
-               if (call_id < conn->channels[i].call_counter) {
+               if (call_id < counter) {
                        rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO,
                                         rxkad_abort_resp_call_ctr);
-                       goto protocol_error_unlock;
+                       goto protocol_error_free;
                }
 
-               if (call_id > conn->channels[i].call_counter) {
-                       call = rcu_dereference_protected(
-                               conn->channels[i].call,
-                               lockdep_is_held(&conn->bundle->channel_lock));
-                       if (call && !__rxrpc_call_is_complete(call)) {
+               if (call_id > counter) {
+                       if (conn->channels[i].call) {
                                rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO,
                                                 rxkad_abort_resp_call_state);
-                               goto protocol_error_unlock;
+                               goto protocol_error_free;
                        }
                        conn->channels[i].call_counter = call_id;
                }
        }
-       spin_unlock(&conn->bundle->channel_lock);
 
        if (ntohl(response->encrypted.inc_nonce) != conn->rxkad.nonce + 1) {
                rxrpc_abort_conn(conn, skb, RXKADOUTOFSEQUENCE, -EPROTO,
@@ -1179,8 +1174,6 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
        _leave(" = 0");
        return 0;
 
-protocol_error_unlock:
-       spin_unlock(&conn->bundle->channel_lock);
 protocol_error_free:
        kfree(ticket);
 protocol_error:
index 78af14694618d433b669e0276f47ccc6563a2f4d..cd66634dffe694fac055ccafd64118e4002ec984 100644 (file)
@@ -97,38 +97,31 @@ found:
  */
 int rxrpc_init_client_conn_security(struct rxrpc_connection *conn)
 {
-       const struct rxrpc_security *sec;
        struct rxrpc_key_token *token;
        struct key *key = conn->key;
-       int ret;
+       int ret = 0;
 
        _enter("{%d},{%x}", conn->debug_id, key_serial(key));
 
-       if (!key)
-               return 0;
-
-       ret = key_validate(key);
-       if (ret < 0)
-               return ret;
-
        for (token = key->payload.data[0]; token; token = token->next) {
-               sec = rxrpc_security_lookup(token->security_index);
-               if (sec)
+               if (token->security_index == conn->security->security_index)
                        goto found;
        }
        return -EKEYREJECTED;
 
 found:
-       conn->security = sec;
-
-       ret = conn->security->init_connection_security(conn, token);
-       if (ret < 0) {
-               conn->security = &rxrpc_no_security;
-               return ret;
+       mutex_lock(&conn->security_lock);
+       if (conn->state == RXRPC_CONN_CLIENT_UNSECURED) {
+               ret = conn->security->init_connection_security(conn, token);
+               if (ret == 0) {
+                       spin_lock(&conn->state_lock);
+                       if (conn->state == RXRPC_CONN_CLIENT_UNSECURED)
+                               conn->state = RXRPC_CONN_CLIENT;
+                       spin_unlock(&conn->state_lock);
+               }
        }
-
-       _leave(" = 0");
-       return 0;
+       mutex_unlock(&conn->security_lock);
+       return ret;
 }
 
 /*
index a5d0005b7ce527f826093ff00ede65190aff8105..da49fcf1c45674ce8299aa7bd927679a31bf2eb1 100644 (file)
@@ -38,6 +38,60 @@ bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
        return false;
 }
 
+/*
+ * Wait for a call to become connected.  Interruption here doesn't cause the
+ * call to be aborted.
+ */
+static int rxrpc_wait_to_be_connected(struct rxrpc_call *call, long *timeo)
+{
+       DECLARE_WAITQUEUE(myself, current);
+       int ret = 0;
+
+       _enter("%d", call->debug_id);
+
+       if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN)
+               return call->error;
+
+       add_wait_queue_exclusive(&call->waitq, &myself);
+
+       for (;;) {
+               ret = call->error;
+               if (ret < 0)
+                       break;
+
+               switch (call->interruptibility) {
+               case RXRPC_INTERRUPTIBLE:
+               case RXRPC_PREINTERRUPTIBLE:
+                       set_current_state(TASK_INTERRUPTIBLE);
+                       break;
+               case RXRPC_UNINTERRUPTIBLE:
+               default:
+                       set_current_state(TASK_UNINTERRUPTIBLE);
+                       break;
+               }
+               if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) {
+                       ret = call->error;
+                       break;
+               }
+               if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
+                    call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
+                   signal_pending(current)) {
+                       ret = sock_intr_errno(*timeo);
+                       break;
+               }
+               *timeo = schedule_timeout(*timeo);
+       }
+
+       remove_wait_queue(&call->waitq, &myself);
+       __set_current_state(TASK_RUNNING);
+
+       if (ret == 0 && rxrpc_call_is_complete(call))
+               ret = call->error;
+
+       _leave(" = %d", ret);
+       return ret;
+}
+
 /*
  * Return true if there's sufficient Tx queue space.
  */
@@ -239,6 +293,16 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 
        timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
 
+       ret = rxrpc_wait_to_be_connected(call, &timeo);
+       if (ret < 0)
+               return ret;
+
+       if (call->conn->state == RXRPC_CONN_CLIENT_UNSECURED) {
+               ret = rxrpc_init_client_conn_security(call->conn);
+               if (ret < 0)
+                       return ret;
+       }
+
        /* this should be in poll */
        sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);