]> git.proxmox.com Git - mirror_frr.git/blobdiff - lib/frr_zmq.c
Merge pull request #11003 from anlancs/bgpd-mh-trival-remove
[mirror_frr.git] / lib / frr_zmq.c
index ea9c828f7c00af80b564912fefa6a223ca7acca5..db5c4c91a256b71fcfc2d2e42ffa14c250fc075a 100644 (file)
@@ -19,7 +19,7 @@
 
 /*
  * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that
- * the test_zmq.c unit test is still working.  There are dependancies
+ * the test_zmq.c unit test is still working.  There are dependencies
  * between the two that are extremely fragile.  My understanding
  * is that there is specialized ownership of the cb pointer based
  * upon what is happening.  Those assumptions are supposed to be
@@ -56,7 +56,7 @@ void frrzmq_finish(void)
        }
 }
 
-static int frrzmq_read_msg(struct thread *t)
+static void frrzmq_read_msg(struct thread *t)
 {
        struct frrzmq_cb **cbp = THREAD_ARG(t);
        struct frrzmq_cb *cb;
@@ -67,10 +67,10 @@ static int frrzmq_read_msg(struct thread *t)
        size_t moresz;
 
        if (!cbp)
-               return 1;
+               return;
        cb = (*cbp);
        if (!cb || !cb->zmqsock)
-               return 1;
+               return;
 
        while (1) {
                zmq_pollitem_t polli = {.socket = cb->zmqsock,
@@ -84,7 +84,10 @@ static int frrzmq_read_msg(struct thread *t)
                        break;
 
                if (cb->read.cb_msg) {
+                       cb->in_cb = true;
                        cb->read.cb_msg(cb->read.arg, cb->zmqsock);
+                       cb->in_cb = false;
+
                        read = 1;
 
                        if (cb->read.cancelled) {
@@ -92,8 +95,9 @@ static int frrzmq_read_msg(struct thread *t)
                                                    ZMQ_POLLOUT);
                                cb->read.thread = NULL;
                                if (cb->write.cancelled && !cb->write.thread)
-                                       XFREE(MTYPE_ZEROMQ_CB, cb);
-                               return 0;
+                                       XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+                               return;
                        }
                        continue;
                }
@@ -112,16 +116,20 @@ static int frrzmq_read_msg(struct thread *t)
                        }
                        read = 1;
 
+                       cb->in_cb = true;
                        cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
                                         partno);
+                       cb->in_cb = false;
+
                        if (cb->read.cancelled) {
                                zmq_msg_close(&msg);
                                frrzmq_check_events(cbp, &cb->write,
                                                    ZMQ_POLLOUT);
                                cb->read.thread = NULL;
                                if (cb->write.cancelled && !cb->write.thread)
-                                       XFREE(MTYPE_ZEROMQ_CB, cb);
-                               return 0;
+                                       XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+                               return;
                        }
 
                        /* cb_part may have read additional parts of the
@@ -145,14 +153,13 @@ static int frrzmq_read_msg(struct thread *t)
 
        thread_add_read(t->master, frrzmq_read_msg, cbp,
                        cb->fd, &cb->read.thread);
-       return 0;
+       return;
 
 out_err:
        flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
                 errno);
        if (cb->read.cb_error)
                cb->read.cb_error(cb->read.arg, cb->zmqsock);
-       return 1;
 }
 
 int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
@@ -183,7 +190,6 @@ int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
                cb = *cbp;
        else {
                cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
-
                cb->write.cancelled = true;
                *cbp = cb;
        }
@@ -195,6 +201,7 @@ int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
        cb->read.cb_part = partfunc;
        cb->read.cb_error = errfunc;
        cb->read.cancelled = false;
+       cb->in_cb = false;
 
        if (events & ZMQ_POLLIN) {
                thread_cancel(&cb->read.thread);
@@ -207,7 +214,7 @@ int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
        return 0;
 }
 
-static int frrzmq_write_msg(struct thread *t)
+static void frrzmq_write_msg(struct thread *t)
 {
        struct frrzmq_cb **cbp = THREAD_ARG(t);
        struct frrzmq_cb *cb;
@@ -215,10 +222,10 @@ static int frrzmq_write_msg(struct thread *t)
        int ret;
 
        if (!cbp)
-               return 1;
+               return;
        cb = (*cbp);
        if (!cb || !cb->zmqsock)
-               return 1;
+               return;
 
        while (1) {
                zmq_pollitem_t polli = {.socket = cb->zmqsock,
@@ -232,15 +239,19 @@ static int frrzmq_write_msg(struct thread *t)
                        break;
 
                if (cb->write.cb_msg) {
+                       cb->in_cb = true;
                        cb->write.cb_msg(cb->write.arg, cb->zmqsock);
+                       cb->in_cb = false;
+
                        written = 1;
 
                        if (cb->write.cancelled) {
                                frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
                                cb->write.thread = NULL;
                                if (cb->read.cancelled && !cb->read.thread)
-                                       XFREE(MTYPE_ZEROMQ_CB, cb);
-                               return 0;
+                                       XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+                               return;
                        }
                        continue;
                }
@@ -251,14 +262,13 @@ static int frrzmq_write_msg(struct thread *t)
 
        thread_add_write(t->master, frrzmq_write_msg, cbp,
                         cb->fd, &cb->write.thread);
-       return 0;
+       return;
 
 out_err:
        flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
                 errno);
        if (cb->write.cb_error)
                cb->write.cb_error(cb->write.arg, cb->zmqsock);
-       return 1;
 }
 
 int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
@@ -286,7 +296,6 @@ int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
                cb = *cbp;
        else {
                cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
-
                cb->read.cancelled = true;
                *cbp = cb;
        }
@@ -298,6 +307,7 @@ int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
        cb->write.cb_part = NULL;
        cb->write.cb_error = errfunc;
        cb->write.cancelled = false;
+       cb->in_cb = false;
 
        if (events & ZMQ_POLLOUT) {
                thread_cancel(&cb->write.thread);
@@ -317,22 +327,15 @@ void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
        core->cancelled = true;
        thread_cancel(&core->thread);
 
-       /*
-        * Looking at this code one would assume that FRR
-        * would want a `!(*cb)->write.thread.  This was
-        * attempted in e08165def1c62beee0e87385 but this
-        * change caused `make check` to stop working
-        * which was not noticed because our CI system
-        * does not build with zeromq.  Put this back
-        * to the code as written in 2017.  e08165de..
-        * was introduced in 2021.  So someone was ok
-        * with frrzmq_thread_cancel for 4 years.  This will
-        * allow those people doing `make check` to continue
-        * working.  In the meantime if the people using
-        * this code see an issue they can fix it
+       /* If cancelled from within a callback, don't try to free memory
+        * in this path.
         */
+       if ((*cb)->in_cb)
+               return;
+
+       /* Ok to free the callback context if no more ... context. */
        if ((*cb)->read.cancelled && !(*cb)->read.thread
-           && (*cb)->write.cancelled && (*cb)->write.thread)
+           && (*cb)->write.cancelled && ((*cb)->write.thread == NULL))
                XFREE(MTYPE_ZEROMQ_CB, *cb);
 }