]> git.proxmox.com Git - mirror_ubuntu-kernels.git/commitdiff
net/smc: use worker to process incoming llc messages
authorKarsten Graul <kgraul@linux.ibm.com>
Wed, 29 Apr 2020 15:10:46 +0000 (17:10 +0200)
committerDavid S. Miller <davem@davemloft.net>
Wed, 29 Apr 2020 19:26:32 +0000 (12:26 -0700)
Incoming llc messages are processed in irq tasklet context, and
a worker is used to send outgoing messages. The worker is needed
because getting a send buffer could result in a wait for a free buffer.

To make sure all incoming llc messages are processed in a serialized way
introduce an event queue and create a new queue entry for each message
which is queued to this event queue. A new worker processes the event
queue entries in order.
And remove the use of a separate worker to send outgoing llc messages
because the messages are processed in worker context already.
With this event queue the serialized llc_wq work queue is obsolete,
remove it.

Signed-off-by: Karsten Graul <kgraul@linux.ibm.com>
Reviewed-by: Ursula Braun <ubraun@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/smc/smc_core.c
net/smc/smc_core.h
net/smc/smc_llc.c
net/smc/smc_llc.h

index 78ccfbf6e4afbc34980360a860270b2013107d35..a1463da1461413ebb5e2d8d08f442f4ae255754f 100644 (file)
@@ -412,7 +412,8 @@ static int smc_lgr_create(struct smc_sock *smc, struct smc_init_info *ini)
                lgr->role = smc->listen_smc ? SMC_SERV : SMC_CLNT;
                memcpy(lgr->peer_systemid, ini->ib_lcl->id_for_peer,
                       SMC_SYSTEMID_LEN);
-
+               INIT_LIST_HEAD(&lgr->llc_event_q);
+               spin_lock_init(&lgr->llc_event_q_lock);
                link_idx = SMC_SINGLE_LINK;
                lnk = &lgr->lnk[link_idx];
                rc = smcr_link_init(lgr, lnk, link_idx, ini);
@@ -613,6 +614,7 @@ static void smc_lgr_free(struct smc_link_group *lgr)
                        if (lgr->lnk[i].state != SMC_LNK_UNUSED)
                                smcr_link_clear(&lgr->lnk[i]);
                }
+               smc_llc_event_flush(lgr);
                if (!atomic_dec_return(&lgr_cnt))
                        wake_up(&lgrs_deleted);
        }
index 2b1960c8c8ce08b18fac325a75b5a3922b1a967d..6548e9a06f7314772e2dce061d9fbdc27932ff90 100644 (file)
@@ -120,7 +120,6 @@ struct smc_link {
        struct smc_link_group   *lgr;           /* parent link group */
 
        enum smc_link_state     state;          /* state of link */
-       struct workqueue_struct *llc_wq;        /* single thread work queue */
        struct completion       llc_confirm;    /* wait for rx of conf link */
        struct completion       llc_confirm_resp; /* wait 4 rx of cnf lnk rsp */
        int                     llc_confirm_rc; /* rc from confirm link msg */
@@ -233,6 +232,12 @@ struct smc_link_group {
                        DECLARE_BITMAP(rtokens_used_mask, SMC_RMBS_PER_LGR_MAX);
                                                /* used rtoken elements */
                        u8                      next_link_id;
+                       struct list_head        llc_event_q;
+                                               /* queue for llc events */
+                       spinlock_t              llc_event_q_lock;
+                                               /* protects llc_event_q */
+                       struct work_struct      llc_event_work;
+                                               /* llc event worker */
                };
                struct { /* SMC-D */
                        u64                     peer_gid;
index 2f03131c85fdd550647896690735ce5c747cf9b1..be74876a36aefe74fd7793fc0aa509e1aa9f13d1 100644 (file)
@@ -134,6 +134,12 @@ union smc_llc_msg {
 
 #define SMC_LLC_FLAG_RESP              0x80
 
+struct smc_llc_qentry {
+       struct list_head list;
+       struct smc_link *link;
+       union smc_llc_msg msg;
+};
+
 /********************************** send *************************************/
 
 struct smc_llc_tx_pend {
@@ -356,46 +362,20 @@ static int smc_llc_send_test_link(struct smc_link *link, u8 user_data[16])
        return rc;
 }
 
-struct smc_llc_send_work {
-       struct work_struct work;
-       struct smc_link *link;
-       int llclen;
-       union smc_llc_msg llcbuf;
-};
-
-/* worker that sends a prepared message */
-static void smc_llc_send_message_work(struct work_struct *work)
+/* schedule an llc send on link, may wait for buffers */
+static int smc_llc_send_message(struct smc_link *link, void *llcbuf)
 {
-       struct smc_llc_send_work *llcwrk = container_of(work,
-                                               struct smc_llc_send_work, work);
        struct smc_wr_tx_pend_priv *pend;
        struct smc_wr_buf *wr_buf;
        int rc;
 
-       if (!smc_link_usable(llcwrk->link))
-               goto out;
-       rc = smc_llc_add_pending_send(llcwrk->link, &wr_buf, &pend);
+       if (!smc_link_usable(link))
+               return -ENOLINK;
+       rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
        if (rc)
-               goto out;
-       memcpy(wr_buf, &llcwrk->llcbuf, llcwrk->llclen);
-       smc_wr_tx_send(llcwrk->link, pend);
-out:
-       kfree(llcwrk);
-}
-
-/* copy llcbuf and schedule an llc send on link */
-static int smc_llc_send_message(struct smc_link *link, void *llcbuf, int llclen)
-{
-       struct smc_llc_send_work *wrk = kmalloc(sizeof(*wrk), GFP_ATOMIC);
-
-       if (!wrk)
-               return -ENOMEM;
-       INIT_WORK(&wrk->work, smc_llc_send_message_work);
-       wrk->link = link;
-       wrk->llclen = llclen;
-       memcpy(&wrk->llcbuf, llcbuf, llclen);
-       queue_work(link->llc_wq, &wrk->work);
-       return 0;
+               return rc;
+       memcpy(wr_buf, llcbuf, sizeof(union smc_llc_msg));
+       return smc_wr_tx_send(link, pend);
 }
 
 /********************************* receive ***********************************/
@@ -452,7 +432,7 @@ static void smc_llc_rx_add_link(struct smc_link *link,
                                        link->smcibdev->mac[link->ibport - 1],
                                        link->gid, SMC_LLC_RESP);
                }
-               smc_llc_send_message(link, llc, sizeof(*llc));
+               smc_llc_send_message(link, llc);
        }
 }
 
@@ -474,7 +454,7 @@ static void smc_llc_rx_delete_link(struct smc_link *link,
                        /* server requests to delete this link, send response */
                        smc_llc_prep_delete_link(llc, link, SMC_LLC_RESP, true);
                }
-               smc_llc_send_message(link, llc, sizeof(*llc));
+               smc_llc_send_message(link, llc);
                smc_lgr_terminate_sched(lgr);
        }
 }
@@ -487,7 +467,7 @@ static void smc_llc_rx_test_link(struct smc_link *link,
                        complete(&link->llc_testlink_resp);
        } else {
                llc->hd.flags |= SMC_LLC_FLAG_RESP;
-               smc_llc_send_message(link, llc, sizeof(*llc));
+               smc_llc_send_message(link, llc);
        }
 }
 
@@ -510,7 +490,7 @@ static void smc_llc_rx_confirm_rkey(struct smc_link *link,
                llc->hd.flags |= SMC_LLC_FLAG_RESP;
                if (rc < 0)
                        llc->hd.flags |= SMC_LLC_FLAG_RKEY_NEG;
-               smc_llc_send_message(link, llc, sizeof(*llc));
+               smc_llc_send_message(link, llc);
        }
 }
 
@@ -522,7 +502,7 @@ static void smc_llc_rx_confirm_rkey_cont(struct smc_link *link,
        } else {
                /* ignore rtokens for other links, we have only one link */
                llc->hd.flags |= SMC_LLC_FLAG_RESP;
-               smc_llc_send_message(link, llc, sizeof(*llc));
+               smc_llc_send_message(link, llc);
        }
 }
 
@@ -549,21 +529,30 @@ static void smc_llc_rx_delete_rkey(struct smc_link *link,
                }
 
                llc->hd.flags |= SMC_LLC_FLAG_RESP;
-               smc_llc_send_message(link, llc, sizeof(*llc));
+               smc_llc_send_message(link, llc);
        }
 }
 
-static void smc_llc_rx_handler(struct ib_wc *wc, void *buf)
+/* flush the llc event queue */
+void smc_llc_event_flush(struct smc_link_group *lgr)
 {
-       struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
-       union smc_llc_msg *llc = buf;
+       struct smc_llc_qentry *qentry, *q;
+
+       spin_lock_bh(&lgr->llc_event_q_lock);
+       list_for_each_entry_safe(qentry, q, &lgr->llc_event_q, list) {
+               list_del_init(&qentry->list);
+               kfree(qentry);
+       }
+       spin_unlock_bh(&lgr->llc_event_q_lock);
+}
+
+static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
+{
+       union smc_llc_msg *llc = &qentry->msg;
+       struct smc_link *link = qentry->link;
 
-       if (wc->byte_len < sizeof(*llc))
-               return; /* short message */
-       if (llc->raw.hdr.length != sizeof(*llc))
-               return; /* invalid message */
        if (!smc_link_usable(link))
-               return; /* link not active, drop msg */
+               goto out;
 
        switch (llc->raw.hdr.common.type) {
        case SMC_LLC_TEST_LINK:
@@ -588,6 +577,54 @@ static void smc_llc_rx_handler(struct ib_wc *wc, void *buf)
                smc_llc_rx_delete_rkey(link, &llc->delete_rkey);
                break;
        }
+out:
+       kfree(qentry);
+}
+
+/* worker to process llc messages on the event queue */
+static void smc_llc_event_work(struct work_struct *work)
+{
+       struct smc_link_group *lgr = container_of(work, struct smc_link_group,
+                                                 llc_event_work);
+       struct smc_llc_qentry *qentry;
+
+again:
+       spin_lock_bh(&lgr->llc_event_q_lock);
+       if (!list_empty(&lgr->llc_event_q)) {
+               qentry = list_first_entry(&lgr->llc_event_q,
+                                         struct smc_llc_qentry, list);
+               list_del_init(&qentry->list);
+               spin_unlock_bh(&lgr->llc_event_q_lock);
+               smc_llc_event_handler(qentry);
+               goto again;
+       }
+       spin_unlock_bh(&lgr->llc_event_q_lock);
+}
+
+/* copy received msg and add it to the event queue */
+static void smc_llc_rx_handler(struct ib_wc *wc, void *buf)
+{
+       struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
+       struct smc_link_group *lgr = link->lgr;
+       struct smc_llc_qentry *qentry;
+       union smc_llc_msg *llc = buf;
+       unsigned long flags;
+
+       if (wc->byte_len < sizeof(*llc))
+               return; /* short message */
+       if (llc->raw.hdr.length != sizeof(*llc))
+               return; /* invalid message */
+
+       qentry = kmalloc(sizeof(*qentry), GFP_ATOMIC);
+       if (!qentry)
+               return;
+       qentry->link = link;
+       INIT_LIST_HEAD(&qentry->list);
+       memcpy(&qentry->msg, llc, sizeof(union smc_llc_msg));
+       spin_lock_irqsave(&lgr->llc_event_q_lock, flags);
+       list_add_tail(&qentry->list, &lgr->llc_event_q);
+       spin_unlock_irqrestore(&lgr->llc_event_q_lock, flags);
+       schedule_work(&link->lgr->llc_event_work);
 }
 
 /***************************** worker, utils *********************************/
@@ -626,12 +663,6 @@ out:
 
 int smc_llc_link_init(struct smc_link *link)
 {
-       struct smc_link_group *lgr = smc_get_lgr(link);
-       link->llc_wq = alloc_ordered_workqueue("llc_wq-%x:%x)", WQ_MEM_RECLAIM,
-                                              *((u32 *)lgr->id),
-                                              link->link_id);
-       if (!link->llc_wq)
-               return -ENOMEM;
        init_completion(&link->llc_confirm);
        init_completion(&link->llc_confirm_resp);
        init_completion(&link->llc_add);
@@ -640,6 +671,7 @@ int smc_llc_link_init(struct smc_link *link)
        init_completion(&link->llc_delete_rkey);
        mutex_init(&link->llc_delete_rkey_mutex);
        init_completion(&link->llc_testlink_resp);
+       INIT_WORK(&link->lgr->llc_event_work, smc_llc_event_work);
        INIT_DELAYED_WORK(&link->llc_testlink_wrk, smc_llc_testlink_work);
        return 0;
 }
@@ -663,8 +695,6 @@ void smc_llc_link_deleting(struct smc_link *link)
 /* called in worker context */
 void smc_llc_link_clear(struct smc_link *link)
 {
-       flush_workqueue(link->llc_wq);
-       destroy_workqueue(link->llc_wq);
        complete(&link->llc_testlink_resp);
        cancel_delayed_work_sync(&link->llc_testlink_wrk);
        smc_wr_wakeup_reg_wait(link);
index c2c9d48d079fad487edb3579de6498d83fed5b29..9de83495ad1482ce0e753c7e9eb2565140943fad 100644 (file)
@@ -61,6 +61,7 @@ int smc_llc_do_confirm_rkey(struct smc_link *link,
                            struct smc_buf_desc *rmb_desc);
 int smc_llc_do_delete_rkey(struct smc_link *link,
                           struct smc_buf_desc *rmb_desc);
+void smc_llc_event_flush(struct smc_link_group *lgr);
 int smc_llc_init(void) __init;
 
 #endif /* SMC_LLC_H */