/* Local endpoint object management
*
- * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2016 Red Hat, Inc. All Rights Reserved.
* Written by David Howells (dhowells@redhat.com)
*
* This program is free software; you can redistribute it and/or
#include <linux/slab.h>
#include <linux/udp.h>
#include <linux/ip.h>
+#include <linux/hashtable.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"
-static LIST_HEAD(rxrpc_locals);
-DEFINE_RWLOCK(rxrpc_local_lock);
-static DECLARE_RWSEM(rxrpc_local_sem);
-static DECLARE_WAIT_QUEUE_HEAD(rxrpc_local_wq);
+static void rxrpc_local_processor(struct work_struct *);
+static void rxrpc_local_rcu(struct rcu_head *);
-static void rxrpc_destroy_local(struct work_struct *work);
+static DEFINE_MUTEX(rxrpc_local_mutex);
+static LIST_HEAD(rxrpc_local_endpoints);
/*
- * allocate a new local
+ * Compare a local to an address. Return -ve, 0 or +ve to indicate less than,
+ * same or greater than.
+ *
+ * We explicitly don't compare the RxRPC service ID as we want to reject
+ * conflicting uses by differing services. Further, we don't want to share
+ * addresses with different options (IPv6), so we don't compare those bits
+ * either.
*/
-static
-struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
+static long rxrpc_local_cmp_key(const struct rxrpc_local *local,
+ const struct sockaddr_rxrpc *srx)
+{
+ long diff;
+
+ diff = ((local->srx.transport_type - srx->transport_type) ?:
+ (local->srx.transport_len - srx->transport_len) ?:
+ (local->srx.transport.family - srx->transport.family));
+ if (diff != 0)
+ return diff;
+
+ switch (srx->transport.family) {
+ case AF_INET:
+ /* If the choice of UDP port is left up to the transport, then
+ * the endpoint record doesn't match.
+ */
+ return ((u16 __force)local->srx.transport.sin.sin_port -
+ (u16 __force)srx->transport.sin.sin_port) ?:
+ memcmp(&local->srx.transport.sin.sin_addr,
+ &srx->transport.sin.sin_addr,
+ sizeof(struct in_addr));
+ default:
+ BUG();
+ }
+}
+
+/*
+ * Allocate a new local endpoint.
+ */
+static struct rxrpc_local *rxrpc_alloc_local(const struct sockaddr_rxrpc *srx)
{
struct rxrpc_local *local;
local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
if (local) {
- INIT_WORK(&local->destroyer, &rxrpc_destroy_local);
- INIT_WORK(&local->acceptor, &rxrpc_accept_incoming_calls);
- INIT_WORK(&local->rejecter, &rxrpc_reject_packets);
- INIT_WORK(&local->event_processor, &rxrpc_process_local_events);
- INIT_LIST_HEAD(&local->services);
+ atomic_set(&local->usage, 1);
INIT_LIST_HEAD(&local->link);
+ INIT_WORK(&local->processor, rxrpc_local_processor);
+ INIT_LIST_HEAD(&local->services);
init_rwsem(&local->defrag_sem);
skb_queue_head_init(&local->accept_queue);
skb_queue_head_init(&local->reject_queue);
skb_queue_head_init(&local->event_queue);
+ mutex_init(&local->conn_lock);
spin_lock_init(&local->lock);
rwlock_init(&local->services_lock);
- atomic_set(&local->usage, 1);
local->debug_id = atomic_inc_return(&rxrpc_debug_id);
memcpy(&local->srx, srx, sizeof(*srx));
}
/*
* create the local socket
- * - must be called with rxrpc_local_sem writelocked
+ * - must be called with rxrpc_local_mutex locked
*/
-static int rxrpc_create_local(struct rxrpc_local *local)
+static int rxrpc_open_socket(struct rxrpc_local *local)
{
struct sock *sock;
int ret, opt;
if (local->srx.transport_len > sizeof(sa_family_t)) {
_debug("bind");
ret = kernel_bind(local->socket,
- (struct sockaddr *) &local->srx.transport,
+ (struct sockaddr *)&local->srx.transport,
local->srx.transport_len);
if (ret < 0) {
- _debug("bind failed");
+ _debug("bind failed %d", ret);
goto error;
}
}
goto error;
}
- write_lock_bh(&rxrpc_local_lock);
- list_add(&local->link, &rxrpc_locals);
- write_unlock_bh(&rxrpc_local_lock);
-
/* set the socket up */
sock = local->socket->sk;
sock->sk_user_data = local;
}
/*
- * create a new local endpoint using the specified UDP address
+ * Look up or create a new local endpoint using the specified local address.
*/
-struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *srx)
+struct rxrpc_local *rxrpc_lookup_local(const struct sockaddr_rxrpc *srx)
{
struct rxrpc_local *local;
+ struct list_head *cursor;
+ const char *age;
+ long diff;
int ret;
- _enter("{%d,%u,%pI4+%hu}",
- srx->transport_type,
- srx->transport.family,
- &srx->transport.sin.sin_addr,
- ntohs(srx->transport.sin.sin_port));
-
- down_write(&rxrpc_local_sem);
+ if (srx->transport.family == AF_INET) {
+ _enter("{%d,%u,%pI4+%hu}",
+ srx->transport_type,
+ srx->transport.family,
+ &srx->transport.sin.sin_addr,
+ ntohs(srx->transport.sin.sin_port));
+ } else {
+ _enter("{%d,%u}",
+ srx->transport_type,
+ srx->transport.family);
+ return ERR_PTR(-EAFNOSUPPORT);
+ }
- /* see if we have a suitable local local endpoint already */
- read_lock_bh(&rxrpc_local_lock);
+ mutex_lock(&rxrpc_local_mutex);
- list_for_each_entry(local, &rxrpc_locals, link) {
- _debug("CMP {%d,%u,%pI4+%hu}",
- local->srx.transport_type,
- local->srx.transport.family,
- &local->srx.transport.sin.sin_addr,
- ntohs(local->srx.transport.sin.sin_port));
+ for (cursor = rxrpc_local_endpoints.next;
+ cursor != &rxrpc_local_endpoints;
+ cursor = cursor->next) {
+ local = list_entry(cursor, struct rxrpc_local, link);
- if (local->srx.transport_type != srx->transport_type ||
- local->srx.transport.family != srx->transport.family)
+ diff = rxrpc_local_cmp_key(local, srx);
+ if (diff < 0)
continue;
+ if (diff > 0)
+ break;
+
+ /* Services aren't allowed to share transport sockets, so
+ * reject that here. It is possible that the object is dying -
+ * but it may also still have the local transport address that
+ * we want bound.
+ */
+ if (srx->srx_service) {
+ local = NULL;
+ goto addr_in_use;
+ }
- switch (srx->transport.family) {
- case AF_INET:
- if (local->srx.transport.sin.sin_port !=
- srx->transport.sin.sin_port)
- continue;
- if (memcmp(&local->srx.transport.sin.sin_addr,
- &srx->transport.sin.sin_addr,
- sizeof(struct in_addr)) != 0)
- continue;
- goto found_local;
-
- default:
- BUG();
+ /* Found a match. We replace a dying object. Attempting to
+ * bind the transport socket may still fail if we're attempting
+ * to use a local address that the dying object is still using.
+ */
+ if (!atomic_inc_not_zero(&local->usage)) {
+ cursor = cursor->next;
+ list_del_init(&local->link);
+ break;
}
- }
- read_unlock_bh(&rxrpc_local_lock);
+ age = "old";
+ goto found;
+ }
- /* we didn't find one, so we need to create one */
local = rxrpc_alloc_local(srx);
- if (!local) {
- up_write(&rxrpc_local_sem);
- return ERR_PTR(-ENOMEM);
- }
+ if (!local)
+ goto nomem;
- ret = rxrpc_create_local(local);
- if (ret < 0) {
- up_write(&rxrpc_local_sem);
- kfree(local);
- _leave(" = %d", ret);
- return ERR_PTR(ret);
- }
+ ret = rxrpc_open_socket(local);
+ if (ret < 0)
+ goto sock_error;
+
+ list_add_tail(&local->link, cursor);
+ age = "new";
- up_write(&rxrpc_local_sem);
+found:
+ mutex_unlock(&rxrpc_local_mutex);
- _net("LOCAL new %d {%d,%u,%pI4+%hu}",
+ _net("LOCAL %s %d {%d,%u,%pI4+%hu}",
+ age,
local->debug_id,
local->srx.transport_type,
local->srx.transport.family,
&local->srx.transport.sin.sin_addr,
ntohs(local->srx.transport.sin.sin_port));
- _leave(" = %p [new]", local);
+ _leave(" = %p", local);
return local;
-found_local:
- rxrpc_get_local(local);
- read_unlock_bh(&rxrpc_local_lock);
- up_write(&rxrpc_local_sem);
+nomem:
+ ret = -ENOMEM;
+sock_error:
+ mutex_unlock(&rxrpc_local_mutex);
+ kfree(local);
+ _leave(" = %d", ret);
+ return ERR_PTR(ret);
- _net("LOCAL old %d {%d,%u,%pI4+%hu}",
- local->debug_id,
- local->srx.transport_type,
- local->srx.transport.family,
- &local->srx.transport.sin.sin_addr,
- ntohs(local->srx.transport.sin.sin_port));
+addr_in_use:
+ mutex_unlock(&rxrpc_local_mutex);
+ _leave(" = -EADDRINUSE");
+ return ERR_PTR(-EADDRINUSE);
+}
- _leave(" = %p [reuse]", local);
- return local;
+/*
+ * A local endpoint reached its end of life.
+ */
+void __rxrpc_put_local(struct rxrpc_local *local)
+{
+ _enter("%d", local->debug_id);
+ rxrpc_queue_work(&local->processor);
}
/*
- * release a local endpoint
+ * Destroy a local endpoint's socket and then hand the record to RCU to dispose
+ * of.
+ *
+ * Closing the socket cannot be done from bottom half context or RCU callback
+ * context because it might sleep.
*/
-void rxrpc_put_local(struct rxrpc_local *local)
+static void rxrpc_local_destroyer(struct rxrpc_local *local)
{
- _enter("%p{u=%d}", local, atomic_read(&local->usage));
+ struct socket *socket = local->socket;
- ASSERTCMP(atomic_read(&local->usage), >, 0);
+ _enter("%d", local->debug_id);
- /* to prevent a race, the decrement and the dequeue must be effectively
- * atomic */
- write_lock_bh(&rxrpc_local_lock);
- if (unlikely(atomic_dec_and_test(&local->usage))) {
- _debug("destroy local");
- rxrpc_queue_work(&local->destroyer);
+ /* We can get a race between an incoming call packet queueing the
+ * processor again and the work processor starting the destruction
+ * process which will shut down the UDP socket.
+ */
+ if (local->dead) {
+ _leave(" [already dead]");
+ return;
}
- write_unlock_bh(&rxrpc_local_lock);
- _leave("");
+ local->dead = true;
+
+ mutex_lock(&rxrpc_local_mutex);
+ list_del_init(&local->link);
+ mutex_unlock(&rxrpc_local_mutex);
+
+ ASSERT(list_empty(&local->services));
+
+ if (socket) {
+ local->socket = NULL;
+ kernel_sock_shutdown(socket, SHUT_RDWR);
+ socket->sk->sk_user_data = NULL;
+ sock_release(socket);
+ }
+
+ /* At this point, there should be no more packets coming in to the
+ * local endpoint.
+ */
+ rxrpc_purge_queue(&local->accept_queue);
+ rxrpc_purge_queue(&local->reject_queue);
+ rxrpc_purge_queue(&local->event_queue);
+
+ _debug("rcu local %d", local->debug_id);
+ call_rcu(&local->rcu, rxrpc_local_rcu);
}
/*
- * destroy a local endpoint
+ * Process events on an endpoint
*/
-static void rxrpc_destroy_local(struct work_struct *work)
+static void rxrpc_local_processor(struct work_struct *work)
{
struct rxrpc_local *local =
- container_of(work, struct rxrpc_local, destroyer);
+ container_of(work, struct rxrpc_local, processor);
+ bool again;
- _enter("%p{%d}", local, atomic_read(&local->usage));
+ _enter("%d", local->debug_id);
- down_write(&rxrpc_local_sem);
+ do {
+ again = false;
+ if (atomic_read(&local->usage) == 0)
+ return rxrpc_local_destroyer(local);
- write_lock_bh(&rxrpc_local_lock);
- if (atomic_read(&local->usage) > 0) {
- write_unlock_bh(&rxrpc_local_lock);
- up_read(&rxrpc_local_sem);
- _leave(" [resurrected]");
- return;
- }
+ if (!skb_queue_empty(&local->accept_queue)) {
+ rxrpc_accept_incoming_calls(local);
+ again = true;
+ }
- list_del(&local->link);
- local->socket->sk->sk_user_data = NULL;
- write_unlock_bh(&rxrpc_local_lock);
+ if (!skb_queue_empty(&local->reject_queue)) {
+ rxrpc_reject_packets(local);
+ again = true;
+ }
- downgrade_write(&rxrpc_local_sem);
+ if (!skb_queue_empty(&local->event_queue)) {
+ rxrpc_process_local_events(local);
+ again = true;
+ }
+ } while (again);
+}
- ASSERT(list_empty(&local->services));
- ASSERT(!work_pending(&local->acceptor));
- ASSERT(!work_pending(&local->rejecter));
- ASSERT(!work_pending(&local->event_processor));
+/*
+ * Destroy a local endpoint after the RCU grace period expires.
+ */
+static void rxrpc_local_rcu(struct rcu_head *rcu)
+{
+ struct rxrpc_local *local = container_of(rcu, struct rxrpc_local, rcu);
- /* finish cleaning up the local descriptor */
- rxrpc_purge_queue(&local->accept_queue);
- rxrpc_purge_queue(&local->reject_queue);
- rxrpc_purge_queue(&local->event_queue);
- kernel_sock_shutdown(local->socket, SHUT_RDWR);
- sock_release(local->socket);
+ _enter("%d", local->debug_id);
- up_read(&rxrpc_local_sem);
+ ASSERT(!work_pending(&local->processor));
_net("DESTROY LOCAL %d", local->debug_id);
kfree(local);
-
- if (list_empty(&rxrpc_locals))
- wake_up_all(&rxrpc_local_wq);
-
_leave("");
}
/*
- * preemptively destroy all local local endpoint rather than waiting for
- * them to be destroyed
+ * Verify the local endpoint list is empty by this point.
*/
void __exit rxrpc_destroy_all_locals(void)
{
- DECLARE_WAITQUEUE(myself,current);
+ struct rxrpc_local *local;
_enter("");
- /* we simply have to wait for them to go away */
- if (!list_empty(&rxrpc_locals)) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- add_wait_queue(&rxrpc_local_wq, &myself);
-
- while (!list_empty(&rxrpc_locals)) {
- schedule();
- set_current_state(TASK_UNINTERRUPTIBLE);
- }
+ if (list_empty(&rxrpc_local_endpoints))
+ return;
- remove_wait_queue(&rxrpc_local_wq, &myself);
- set_current_state(TASK_RUNNING);
+ mutex_lock(&rxrpc_local_mutex);
+ list_for_each_entry(local, &rxrpc_local_endpoints, link) {
+ pr_err("AF_RXRPC: Leaked local %p {%d}\n",
+ local, atomic_read(&local->usage));
}
-
- _leave("");
+ mutex_unlock(&rxrpc_local_mutex);
+ BUG();
}