]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blobdiff - net/tipc/socket.c
sched/headers: Prepare to move signal wakeup & sigpending methods from <linux/sched...
[mirror_ubuntu-artful-kernel.git] / net / tipc / socket.c
index 800caaa699a1669f6efe228e51974e6e2bd19e60..43e4045e72bc00cfbc9db6c1bf987a46e272969b 100644 (file)
@@ -35,6 +35,8 @@
  */
 
 #include <linux/rhashtable.h>
+#include <linux/sched/signal.h>
+
 #include "core.h"
 #include "name_table.h"
 #include "node.h"
@@ -67,16 +69,19 @@ enum {
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
  * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
+ * #cong_links: list of congested links
  * @publications: list of publications for port
+ * @blocking_link: address of the congested link we are currently sleeping on
  * @pub_count: total # of publications port has made during its lifetime
  * @probing_state:
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
- * @link_cong: non-zero if owner must sleep because of link congestion
+ * @cong_link_cnt: number of congested links
  * @sent_unacked: # messages sent by socket, and not yet acked by peer
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
  * @peer: 'connected' peer for dgram/rdm
  * @node: hash table node
+ * @mc_method: cookie for use between socket and broadcast layer
  * @rcu: rcu struct for tipc_sock
  */
 struct tipc_sock {
@@ -87,13 +92,13 @@ struct tipc_sock {
        u32 max_pkt;
        u32 portid;
        struct tipc_msg phdr;
-       struct list_head sock_list;
+       struct list_head cong_links;
        struct list_head publications;
        u32 pub_count;
        uint conn_timeout;
        atomic_t dupl_rcvcnt;
        bool probe_unacked;
-       bool link_cong;
+       u16 cong_link_cnt;
        u16 snt_unacked;
        u16 snd_win;
        u16 peer_caps;
@@ -101,6 +106,7 @@ struct tipc_sock {
        u16 rcv_win;
        struct sockaddr_tipc peer;
        struct rhash_head node;
+       struct tipc_mc_method mc_method;
        struct rcu_head rcu;
 };
 
@@ -110,7 +116,6 @@ static void tipc_write_space(struct sock *sk);
 static void tipc_sock_destruct(struct sock *sk);
 static int tipc_release(struct socket *sock);
 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
-static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
 static void tipc_sk_timeout(unsigned long data);
 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
                           struct tipc_name_seq const *seq);
@@ -119,8 +124,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
 static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
-static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
-                             size_t dsz);
+static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
 
 static const struct proto_ops packet_ops;
@@ -334,6 +338,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)
        return res;
 }
 
+static int tipc_sk_sock_err(struct socket *sock, long *timeout)
+{
+       struct sock *sk = sock->sk;
+       int err = sock_error(sk);
+       int typ = sock->type;
+
+       if (err)
+               return err;
+       if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
+               if (sk->sk_state == TIPC_DISCONNECTING)
+                       return -EPIPE;
+               else if (!tipc_sk_connected(sk))
+                       return -ENOTCONN;
+       }
+       if (!*timeout)
+               return -EAGAIN;
+       if (signal_pending(current))
+               return sock_intr_errno(*timeout);
+
+       return 0;
+}
+
+#define tipc_wait_for_cond(sock_, timeout_, condition_)                        \
+({                                                                     \
+       int rc_ = 0;                                                    \
+       int done_ = 0;                                                  \
+                                                                       \
+       while (!(condition_) && !done_) {                               \
+               struct sock *sk_ = sock->sk;                            \
+               DEFINE_WAIT_FUNC(wait_, woken_wake_function);           \
+                                                                       \
+               rc_ = tipc_sk_sock_err(sock_, timeout_);                \
+               if (rc_)                                                \
+                       break;                                          \
+               prepare_to_wait(sk_sleep(sk_), &wait_,                  \
+                               TASK_INTERRUPTIBLE);                    \
+               done_ = sk_wait_event(sk_, timeout_,                    \
+                                     (condition_), &wait_);            \
+               remove_wait_queue(sk_sleep(sk_), &wait_);               \
+       }                                                               \
+       rc_;                                                            \
+})
+
 /**
  * tipc_sk_create - create a TIPC socket
  * @net: network namespace (must be default network)
@@ -382,10 +429,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        tsk = tipc_sk(sk);
        tsk->max_pkt = MAX_PKT_DEFAULT;
        INIT_LIST_HEAD(&tsk->publications);
+       INIT_LIST_HEAD(&tsk->cong_links);
        msg = &tsk->phdr;
        tn = net_generic(sock_net(sk), tipc_net_id);
-       tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
-                     NAMED_H_SIZE, 0);
 
        /* Finish initializing socket data structures */
        sock->ops = ops;
@@ -395,6 +441,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
                pr_warn("Socket create failed; port number exhausted\n");
                return -EINVAL;
        }
+
+       /* Ensure tsk is visible before we read own_addr. */
+       smp_mb();
+
+       tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
+                     NAMED_H_SIZE, 0);
+
        msg_set_origport(msg, tsk->portid);
        setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
        sk->sk_shutdown = 0;
@@ -432,9 +485,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct net *net = sock_net(sk);
+       long timeout = CONN_TIMEOUT_DEFAULT;
        u32 dnode = tsk_peer_node(tsk);
        struct sk_buff *skb;
 
+       /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
+       tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
+                                           !tsk_conn_cong(tsk)));
+
        /* Reject all unreceived messages, except on an active connection
         * (which disconnects locally & sends a 'FIN+' to peer).
         */
@@ -505,7 +563,8 @@ static int tipc_release(struct socket *sock)
 
        /* Reject any messages that accumulated in backlog queue */
        release_sock(sk);
-
+       u32_list_purge(&tsk->cong_links);
+       tsk->cong_link_cnt = 0;
        call_rcu(&tsk->rcu, tipc_sk_callback);
        sock->sk = NULL;
 
@@ -648,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 
        switch (sk->sk_state) {
        case TIPC_ESTABLISHED:
-               if (!tsk->link_cong && !tsk_conn_cong(tsk))
+               if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
                        mask |= POLLOUT;
                /* fall thru' */
        case TIPC_LISTEN:
@@ -657,7 +716,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
                        mask |= (POLLIN | POLLRDNORM);
                break;
        case TIPC_OPEN:
-               if (!tsk->link_cong)
+               if (!tsk->cong_link_cnt)
                        mask |= POLLOUT;
                if (tipc_sk_type_connectionless(sk) &&
                    (!skb_queue_empty(&sk->sk_receive_queue)))
@@ -676,63 +735,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
  * @sock: socket structure
  * @seq: destination address
  * @msg: message to send
- * @dsz: total length of message data
- * @timeo: timeout to wait for wakeup
+ * @dlen: length of data to send
+ * @timeout: timeout to wait for wakeup
  *
  * Called from function tipc_sendmsg(), which has done all sanity checks
  * Returns the number of bytes sent on success, or errno
  */
 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
-                         struct msghdr *msg, size_t dsz, long timeo)
+                         struct msghdr *msg, size_t dlen, long timeout)
 {
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
+       struct tipc_msg *hdr = &tsk->phdr;
        struct net *net = sock_net(sk);
-       struct tipc_msg *mhdr = &tsk->phdr;
-       struct sk_buff_head pktchain;
-       struct iov_iter save = msg->msg_iter;
-       uint mtu;
+       int mtu = tipc_bcast_get_mtu(net);
+       struct tipc_mc_method *method = &tsk->mc_method;
+       u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
+       struct sk_buff_head pkts;
+       struct tipc_nlist dsts;
        int rc;
 
-       if (!timeo && tsk->link_cong)
-               return -ELINKCONG;
+       /* Block or return if any destination link is congested */
+       rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
+       if (unlikely(rc))
+               return rc;
 
-       msg_set_type(mhdr, TIPC_MCAST_MSG);
-       msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
-       msg_set_destport(mhdr, 0);
-       msg_set_destnode(mhdr, 0);
-       msg_set_nametype(mhdr, seq->type);
-       msg_set_namelower(mhdr, seq->lower);
-       msg_set_nameupper(mhdr, seq->upper);
-       msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
+       /* Lookup destination nodes */
+       tipc_nlist_init(&dsts, tipc_own_addr(net));
+       tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
+                                     seq->upper, domain, &dsts);
+       if (!dsts.local && !dsts.remote)
+               return -EHOSTUNREACH;
 
-       skb_queue_head_init(&pktchain);
+       /* Build message header */
+       msg_set_type(hdr, TIPC_MCAST_MSG);
+       msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+       msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
+       msg_set_destport(hdr, 0);
+       msg_set_destnode(hdr, 0);
+       msg_set_nametype(hdr, seq->type);
+       msg_set_namelower(hdr, seq->lower);
+       msg_set_nameupper(hdr, seq->upper);
 
-new_mtu:
-       mtu = tipc_bcast_get_mtu(net);
-       rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
-       if (unlikely(rc < 0))
-               return rc;
+       /* Build message as chain of buffers */
+       skb_queue_head_init(&pkts);
+       rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
-       do {
-               rc = tipc_bcast_xmit(net, &pktchain);
-               if (likely(!rc))
-                       return dsz;
-
-               if (rc == -ELINKCONG) {
-                       tsk->link_cong = 1;
-                       rc = tipc_wait_for_sndmsg(sock, &timeo);
-                       if (!rc)
-                               continue;
-               }
-               __skb_queue_purge(&pktchain);
-               if (rc == -EMSGSIZE) {
-                       msg->msg_iter = save;
-                       goto new_mtu;
-               }
-               break;
-       } while (1);
-       return rc;
+       /* Send message if build was successful */
+       if (unlikely(rc == dlen))
+               rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
+                                    &tsk->cong_link_cnt);
+
+       tipc_nlist_purge(&dsts);
+
+       return rc ? rc : dlen;
 }
 
 /**
@@ -746,7 +802,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
                       struct sk_buff_head *inputq)
 {
        struct tipc_msg *msg;
-       struct tipc_plist dports;
+       struct list_head dports;
        u32 portid;
        u32 scope = TIPC_CLUSTER_SCOPE;
        struct sk_buff_head tmpq;
@@ -754,7 +810,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
        struct sk_buff *skb, *_skb;
 
        __skb_queue_head_init(&tmpq);
-       tipc_plist_init(&dports);
+       INIT_LIST_HEAD(&dports);
 
        skb = tipc_skb_peek(arrvq, &inputq->lock);
        for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
@@ -768,8 +824,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
                tipc_nametbl_mc_translate(net,
                                          msg_nametype(msg), msg_namelower(msg),
                                          msg_nameupper(msg), scope, &dports);
-               portid = tipc_plist_pop(&dports);
-               for (; portid; portid = tipc_plist_pop(&dports)) {
+               portid = u32_pop(&dports);
+               for (; portid; portid = u32_pop(&dports)) {
                        _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
                        if (_skb) {
                                msg_set_destport(buf_msg(_skb), portid);
@@ -830,31 +886,6 @@ exit:
        kfree_skb(skb);
 }
 
-static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
-{
-       DEFINE_WAIT_FUNC(wait, woken_wake_function);
-       struct sock *sk = sock->sk;
-       struct tipc_sock *tsk = tipc_sk(sk);
-       int done;
-
-       do {
-               int err = sock_error(sk);
-               if (err)
-                       return err;
-               if (sk->sk_shutdown & SEND_SHUTDOWN)
-                       return -EPIPE;
-               if (!*timeo_p)
-                       return -EAGAIN;
-               if (signal_pending(current))
-                       return sock_intr_errno(*timeo_p);
-
-               add_wait_queue(sk_sleep(sk), &wait);
-               done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait);
-               remove_wait_queue(sk_sleep(sk), &wait);
-       } while (!done);
-       return 0;
-}
-
 /**
  * tipc_sendmsg - send message in connectionless manner
  * @sock: socket structure
@@ -881,35 +912,38 @@ static int tipc_sendmsg(struct socket *sock,
        return ret;
 }
 
-static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
+static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 {
-       DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
        struct sock *sk = sock->sk;
-       struct tipc_sock *tsk = tipc_sk(sk);
        struct net *net = sock_net(sk);
-       struct tipc_msg *mhdr = &tsk->phdr;
-       u32 dnode, dport;
-       struct sk_buff_head pktchain;
-       bool is_connectionless = tipc_sk_type_connectionless(sk);
-       struct sk_buff *skb;
+       struct tipc_sock *tsk = tipc_sk(sk);
+       DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+       long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+       struct list_head *clinks = &tsk->cong_links;
+       bool syn = !tipc_sk_type_connectionless(sk);
+       struct tipc_msg *hdr = &tsk->phdr;
        struct tipc_name_seq *seq;
-       struct iov_iter save;
-       u32 mtu;
-       long timeo;
-       int rc;
+       struct sk_buff_head pkts;
+       u32 type, inst, domain;
+       u32 dnode, dport;
+       int mtu, rc;
 
-       if (dsz > TIPC_MAX_USER_MSG_SIZE)
+       if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
                return -EMSGSIZE;
+
        if (unlikely(!dest)) {
-               if (is_connectionless && tsk->peer.family == AF_TIPC)
-                       dest = &tsk->peer;
-               else
+               dest = &tsk->peer;
+               if (!syn || dest->family != AF_TIPC)
                        return -EDESTADDRREQ;
-       } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
-                  dest->family != AF_TIPC) {
-               return -EINVAL;
        }
-       if (!is_connectionless) {
+
+       if (unlikely(m->msg_namelen < sizeof(*dest)))
+               return -EINVAL;
+
+       if (unlikely(dest->family != AF_TIPC))
+               return -EINVAL;
+
+       if (unlikely(syn)) {
                if (sk->sk_state == TIPC_LISTEN)
                        return -EPIPE;
                if (sk->sk_state != TIPC_OPEN)
@@ -921,102 +955,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
                        tsk->conn_instance = dest->addr.name.name.instance;
                }
        }
-       seq = &dest->addr.nameseq;
-       timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 
-       if (dest->addrtype == TIPC_ADDR_MCAST) {
-               return tipc_sendmcast(sock, seq, m, dsz, timeo);
-       } else if (dest->addrtype == TIPC_ADDR_NAME) {
-               u32 type = dest->addr.name.name.type;
-               u32 inst = dest->addr.name.name.instance;
-               u32 domain = dest->addr.name.domain;
+       seq = &dest->addr.nameseq;
+       if (dest->addrtype == TIPC_ADDR_MCAST)
+               return tipc_sendmcast(sock, seq, m, dlen, timeout);
 
+       if (dest->addrtype == TIPC_ADDR_NAME) {
+               type = dest->addr.name.name.type;
+               inst = dest->addr.name.name.instance;
+               domain = dest->addr.name.domain;
                dnode = domain;
-               msg_set_type(mhdr, TIPC_NAMED_MSG);
-               msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
-               msg_set_nametype(mhdr, type);
-               msg_set_nameinst(mhdr, inst);
-               msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
+               msg_set_type(hdr, TIPC_NAMED_MSG);
+               msg_set_hdr_sz(hdr, NAMED_H_SIZE);
+               msg_set_nametype(hdr, type);
+               msg_set_nameinst(hdr, inst);
+               msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
                dport = tipc_nametbl_translate(net, type, inst, &dnode);
-               msg_set_destnode(mhdr, dnode);
-               msg_set_destport(mhdr, dport);
+               msg_set_destnode(hdr, dnode);
+               msg_set_destport(hdr, dport);
                if (unlikely(!dport && !dnode))
                        return -EHOSTUNREACH;
+
        } else if (dest->addrtype == TIPC_ADDR_ID) {
                dnode = dest->addr.id.node;
-               msg_set_type(mhdr, TIPC_DIRECT_MSG);
-               msg_set_lookup_scope(mhdr, 0);
-               msg_set_destnode(mhdr, dnode);
-               msg_set_destport(mhdr, dest->addr.id.ref);
-               msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
+               msg_set_type(hdr, TIPC_DIRECT_MSG);
+               msg_set_lookup_scope(hdr, 0);
+               msg_set_destnode(hdr, dnode);
+               msg_set_destport(hdr, dest->addr.id.ref);
+               msg_set_hdr_sz(hdr, BASIC_H_SIZE);
        }
 
-       skb_queue_head_init(&pktchain);
-       save = m->msg_iter;
-new_mtu:
-       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
-       rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
-       if (rc < 0)
+       /* Block or return if destination link is congested */
+       rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
+       if (unlikely(rc))
                return rc;
 
-       do {
-               skb = skb_peek(&pktchain);
-               TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-               rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
-               if (likely(!rc)) {
-                       if (!is_connectionless)
-                               tipc_set_sk_state(sk, TIPC_CONNECTING);
-                       return dsz;
-               }
-               if (rc == -ELINKCONG) {
-                       tsk->link_cong = 1;
-                       rc = tipc_wait_for_sndmsg(sock, &timeo);
-                       if (!rc)
-                               continue;
-               }
-               __skb_queue_purge(&pktchain);
-               if (rc == -EMSGSIZE) {
-                       m->msg_iter = save;
-                       goto new_mtu;
-               }
-               break;
-       } while (1);
-
-       return rc;
-}
+       skb_queue_head_init(&pkts);
+       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+       rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
+       if (unlikely(rc != dlen))
+               return rc;
 
-static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
-{
-       DEFINE_WAIT_FUNC(wait, woken_wake_function);
-       struct sock *sk = sock->sk;
-       struct tipc_sock *tsk = tipc_sk(sk);
-       int done;
+       rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+       if (unlikely(rc == -ELINKCONG)) {
+               u32_push(clinks, dnode);
+               tsk->cong_link_cnt++;
+               rc = 0;
+       }
 
-       do {
-               int err = sock_error(sk);
-               if (err)
-                       return err;
-               if (sk->sk_state == TIPC_DISCONNECTING)
-                       return -EPIPE;
-               else if (!tipc_sk_connected(sk))
-                       return -ENOTCONN;
-               if (!*timeo_p)
-                       return -EAGAIN;
-               if (signal_pending(current))
-                       return sock_intr_errno(*timeo_p);
+       if (unlikely(syn && !rc))
+               tipc_set_sk_state(sk, TIPC_CONNECTING);
 
-               add_wait_queue(sk_sleep(sk), &wait);
-               done = sk_wait_event(sk, timeo_p,
-                                    (!tsk->link_cong &&
-                                     !tsk_conn_cong(tsk)) ||
-                                     !tipc_sk_connected(sk), &wait);
-               remove_wait_queue(sk_sleep(sk), &wait);
-       } while (!done);
-       return 0;
+       return rc ? rc : dlen;
 }
 
 /**
- * tipc_send_stream - send stream-oriented data
+ * tipc_sendstream - send stream-oriented data
  * @sock: socket structure
  * @m: data to send
  * @dsz: total length of data to be transmitted
@@ -1026,94 +1020,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
  * Returns the number of bytes sent on success (or partial success),
  * or errno if no data sent
  */
-static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
+static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
 {
        struct sock *sk = sock->sk;
        int ret;
 
        lock_sock(sk);
-       ret = __tipc_send_stream(sock, m, dsz);
+       ret = __tipc_sendstream(sock, m, dsz);
        release_sock(sk);
 
        return ret;
 }
 
-static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
+static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 {
        struct sock *sk = sock->sk;
-       struct net *net = sock_net(sk);
-       struct tipc_sock *tsk = tipc_sk(sk);
-       struct tipc_msg *mhdr = &tsk->phdr;
-       struct sk_buff_head pktchain;
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-       u32 portid = tsk->portid;
-       int rc = -EINVAL;
-       long timeo;
-       u32 dnode;
-       uint mtu, send, sent = 0;
-       struct iov_iter save;
-       int hlen = MIN_H_SIZE;
-
-       /* Handle implied connection establishment */
-       if (unlikely(dest)) {
-               rc = __tipc_sendmsg(sock, m, dsz);
-               hlen = msg_hdr_sz(mhdr);
-               if (dsz && (dsz == rc))
-                       tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
-               return rc;
-       }
-       if (dsz > (uint)INT_MAX)
-               return -EMSGSIZE;
-
-       if (unlikely(!tipc_sk_connected(sk))) {
-               if (sk->sk_state == TIPC_DISCONNECTING)
-                       return -EPIPE;
-               else
-                       return -ENOTCONN;
-       }
+       long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+       struct tipc_sock *tsk = tipc_sk(sk);
+       struct tipc_msg *hdr = &tsk->phdr;
+       struct net *net = sock_net(sk);
+       struct sk_buff_head pkts;
+       u32 dnode = tsk_peer_node(tsk);
+       int send, sent = 0;
+       int rc = 0;
 
-       timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
-       if (!timeo && tsk->link_cong)
-               return -ELINKCONG;
+       skb_queue_head_init(&pkts);
 
-       dnode = tsk_peer_node(tsk);
-       skb_queue_head_init(&pktchain);
+       if (unlikely(dlen > INT_MAX))
+               return -EMSGSIZE;
 
-next:
-       save = m->msg_iter;
-       mtu = tsk->max_pkt;
-       send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
-       rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
-       if (unlikely(rc < 0))
+       /* Handle implicit connection setup */
+       if (unlikely(dest)) {
+               rc = __tipc_sendmsg(sock, m, dlen);
+               if (dlen && (dlen == rc))
+                       tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
                return rc;
+       }
 
        do {
-               if (likely(!tsk_conn_cong(tsk))) {
-                       rc = tipc_node_xmit(net, &pktchain, dnode, portid);
-                       if (likely(!rc)) {
-                               tsk->snt_unacked += tsk_inc(tsk, send + hlen);
-                               sent += send;
-                               if (sent == dsz)
-                                       return dsz;
-                               goto next;
-                       }
-                       if (rc == -EMSGSIZE) {
-                               __skb_queue_purge(&pktchain);
-                               tsk->max_pkt = tipc_node_get_mtu(net, dnode,
-                                                                portid);
-                               m->msg_iter = save;
-                               goto next;
-                       }
-                       if (rc != -ELINKCONG)
-                               break;
+               rc = tipc_wait_for_cond(sock, &timeout,
+                                       (!tsk->cong_link_cnt &&
+                                        !tsk_conn_cong(tsk) &&
+                                        tipc_sk_connected(sk)));
+               if (unlikely(rc))
+                       break;
 
-                       tsk->link_cong = 1;
+               send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
+               rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
+               if (unlikely(rc != send))
+                       break;
+
+               rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+               if (unlikely(rc == -ELINKCONG)) {
+                       tsk->cong_link_cnt = 1;
+                       rc = 0;
                }
-               rc = tipc_wait_for_sndpkt(sock, &timeo);
-       } while (!rc);
+               if (likely(!rc)) {
+                       tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+                       sent += send;
+               }
+       } while (sent < dlen && !rc);
 
-       __skb_queue_purge(&pktchain);
-       return sent ? sent : rc;
+       return rc ? rc : sent;
 }
 
 /**
@@ -1131,7 +1100,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
        if (dsz > TIPC_MAX_USER_MSG_SIZE)
                return -EMSGSIZE;
 
-       return tipc_send_stream(sock, m, dsz);
+       return tipc_sendstream(sock, m, dsz);
 }
 
 /* tipc_sk_finish_conn - complete the setup of a connection
@@ -1698,6 +1667,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
        unsigned int limit = rcvbuf_limit(sk, skb);
        int err = TIPC_OK;
        int usr = msg_user(hdr);
+       u32 onode;
 
        if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
                tipc_sk_proto_rcv(tsk, skb, xmitq);
@@ -1705,8 +1675,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
        }
 
        if (unlikely(usr == SOCK_WAKEUP)) {
+               onode = msg_orignode(hdr);
                kfree_skb(skb);
-               tsk->link_cong = 0;
+               u32_del(&tsk->cong_links, onode);
+               tsk->cong_link_cnt--;
                sk->sk_write_space(sk);
                return false;
        }
@@ -2114,7 +2086,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
                struct msghdr m = {NULL,};
 
                tsk_advance_rx_queue(sk);
-               __tipc_send_stream(new_sock, &m, 0);
+               __tipc_sendstream(new_sock, &m, 0);
        } else {
                __skb_dequeue(&sk->sk_receive_queue);
                __skb_queue_head(&new_sk->sk_receive_queue, buf);
@@ -2269,24 +2241,27 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
 void tipc_sk_reinit(struct net *net)
 {
        struct tipc_net *tn = net_generic(net, tipc_net_id);
-       const struct bucket_table *tbl;
-       struct rhash_head *pos;
+       struct rhashtable_iter iter;
        struct tipc_sock *tsk;
        struct tipc_msg *msg;
-       int i;
 
-       rcu_read_lock();
-       tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
-       for (i = 0; i < tbl->size; i++) {
-               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+       rhashtable_walk_enter(&tn->sk_rht, &iter);
+
+       do {
+               tsk = ERR_PTR(rhashtable_walk_start(&iter));
+               if (tsk)
+                       continue;
+
+               while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
                        spin_lock_bh(&tsk->sk.sk_lock.slock);
                        msg = &tsk->phdr;
                        msg_set_prevnode(msg, tn->own_addr);
                        msg_set_orignode(msg, tn->own_addr);
                        spin_unlock_bh(&tsk->sk.sk_lock.slock);
                }
-       }
-       rcu_read_unlock();
+
+               rhashtable_walk_stop(&iter);
+       } while (tsk == ERR_PTR(-EAGAIN));
 }
 
 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
@@ -2382,18 +2357,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 {
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
-       u32 value;
-       int res;
+       u32 value = 0;
+       int res = 0;
 
        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
                return 0;
        if (lvl != SOL_TIPC)
                return -ENOPROTOOPT;
-       if (ol < sizeof(value))
-               return -EINVAL;
-       res = get_user(value, (u32 __user *)ov);
-       if (res)
-               return res;
+
+       switch (opt) {
+       case TIPC_IMPORTANCE:
+       case TIPC_SRC_DROPPABLE:
+       case TIPC_DEST_DROPPABLE:
+       case TIPC_CONN_TIMEOUT:
+               if (ol < sizeof(value))
+                       return -EINVAL;
+               res = get_user(value, (u32 __user *)ov);
+               if (res)
+                       return res;
+               break;
+       default:
+               if (ov || ol)
+                       return -EINVAL;
+       }
 
        lock_sock(sk);
 
@@ -2412,7 +2398,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
                break;
        case TIPC_CONN_TIMEOUT:
                tipc_sk(sk)->conn_timeout = value;
-               /* no need to set "res", since already 0 at this point */
+               break;
+       case TIPC_MCAST_BROADCAST:
+               tsk->mc_method.rcast = false;
+               tsk->mc_method.mandatory = true;
+               break;
+       case TIPC_MCAST_REPLICAST:
+               tsk->mc_method.rcast = true;
+               tsk->mc_method.mandatory = true;
                break;
        default:
                res = -EINVAL;
@@ -2575,7 +2568,7 @@ static const struct proto_ops stream_ops = {
        .shutdown       = tipc_shutdown,
        .setsockopt     = tipc_setsockopt,
        .getsockopt     = tipc_getsockopt,
-       .sendmsg        = tipc_send_stream,
+       .sendmsg        = tipc_sendstream,
        .recvmsg        = tipc_recv_stream,
        .mmap           = sock_no_mmap,
        .sendpage       = sock_no_sendpage