]> git.proxmox.com Git - mirror_frr.git/blobdiff - lib/frr_zmq.c
*: Convert thread_cancelXXX to event_cancelXXX
[mirror_frr.git] / lib / frr_zmq.c
index e297985f9457c5fbb95cfbde80658b5b79afc715..4a860fe253f589584730adefe5e9a20160e6ce36 100644 (file)
@@ -1,20 +1,7 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
 /*
  * libzebra ZeroMQ bindings
  * Copyright (C) 2015  David Lamparter
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the Free
- * Software Foundation; either version 2 of the License, or (at your option)
- * any later version.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; see the file COPYING; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
 /*
@@ -28,7 +15,7 @@
 #include <zebra.h>
 #include <zmq.h>
 
-#include "thread.h"
+#include "event.h"
 #include "memory.h"
 #include "frr_zmq.h"
 #include "log.h"
@@ -56,7 +43,7 @@ void frrzmq_finish(void)
        }
 }
 
-static int frrzmq_read_msg(struct thread *t)
+static void frrzmq_read_msg(struct event *t)
 {
        struct frrzmq_cb **cbp = THREAD_ARG(t);
        struct frrzmq_cb *cb;
@@ -67,10 +54,10 @@ static int frrzmq_read_msg(struct thread *t)
        size_t moresz;
 
        if (!cbp)
-               return 1;
+               return;
        cb = (*cbp);
        if (!cb || !cb->zmqsock)
-               return 1;
+               return;
 
        while (1) {
                zmq_pollitem_t polli = {.socket = cb->zmqsock,
@@ -84,7 +71,10 @@ static int frrzmq_read_msg(struct thread *t)
                        break;
 
                if (cb->read.cb_msg) {
+                       cb->in_cb = true;
                        cb->read.cb_msg(cb->read.arg, cb->zmqsock);
+                       cb->in_cb = false;
+
                        read = 1;
 
                        if (cb->read.cancelled) {
@@ -92,8 +82,9 @@ static int frrzmq_read_msg(struct thread *t)
                                                    ZMQ_POLLOUT);
                                cb->read.thread = NULL;
                                if (cb->write.cancelled && !cb->write.thread)
-                                       XFREE(MTYPE_ZEROMQ_CB, cb);
-                               return 0;
+                                       XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+                               return;
                        }
                        continue;
                }
@@ -112,16 +103,20 @@ static int frrzmq_read_msg(struct thread *t)
                        }
                        read = 1;
 
+                       cb->in_cb = true;
                        cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
                                         partno);
+                       cb->in_cb = false;
+
                        if (cb->read.cancelled) {
                                zmq_msg_close(&msg);
                                frrzmq_check_events(cbp, &cb->write,
                                                    ZMQ_POLLOUT);
                                cb->read.thread = NULL;
                                if (cb->write.cancelled && !cb->write.thread)
-                                       XFREE(MTYPE_ZEROMQ_CB, cb);
-                               return 0;
+                                       XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+                               return;
                        }
 
                        /* cb_part may have read additional parts of the
@@ -143,26 +138,24 @@ static int frrzmq_read_msg(struct thread *t)
        if (read)
                frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
 
-       thread_add_read(t->master, frrzmq_read_msg, cbp,
-                       cb->fd, &cb->read.thread);
-       return 0;
+       event_add_read(t->master, frrzmq_read_msg, cbp, cb->fd,
+                      &cb->read.thread);
+       return;
 
 out_err:
        flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
                 errno);
        if (cb->read.cb_error)
                cb->read.cb_error(cb->read.arg, cb->zmqsock);
-       return 1;
 }
 
-int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
-                           struct thread_master *master,
-                           void (*msgfunc)(void *arg, void *zmqsock),
-                           void (*partfunc)(void *arg, void *zmqsock,
-                                            zmq_msg_t *msg, unsigned partnum),
-                           void (*errfunc)(void *arg, void *zmqsock),
-                           void *arg, void *zmqsock,
-                           struct frrzmq_cb **cbp)
+int _frrzmq_event_add_read(const struct xref_threadsched *xref,
+                          struct thread_master *master,
+                          void (*msgfunc)(void *arg, void *zmqsock),
+                          void (*partfunc)(void *arg, void *zmqsock,
+                                           zmq_msg_t *msg, unsigned partnum),
+                          void (*errfunc)(void *arg, void *zmqsock), void *arg,
+                          void *zmqsock, struct frrzmq_cb **cbp)
 {
        int fd, events;
        size_t len;
@@ -183,7 +176,6 @@ int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
                cb = *cbp;
        else {
                cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
-
                cb->write.cancelled = true;
                *cbp = cb;
        }
@@ -195,19 +187,20 @@ int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
        cb->read.cb_part = partfunc;
        cb->read.cb_error = errfunc;
        cb->read.cancelled = false;
+       cb->in_cb = false;
 
        if (events & ZMQ_POLLIN) {
-               thread_cancel(&cb->read.thread);
+               event_cancel(&cb->read.thread);
 
-               thread_add_event(master, frrzmq_read_msg, cbp, fd,
-                                 &cb->read.thread);
-       } else
-               thread_add_read(master, frrzmq_read_msg, cbp, fd,
+               event_add_event(master, frrzmq_read_msg, cbp, fd,
                                &cb->read.thread);
+       } else
+               event_add_read(master, frrzmq_read_msg, cbp, fd,
+                              &cb->read.thread);
        return 0;
 }
 
-static int frrzmq_write_msg(struct thread *t)
+static void frrzmq_write_msg(struct event *t)
 {
        struct frrzmq_cb **cbp = THREAD_ARG(t);
        struct frrzmq_cb *cb;
@@ -215,10 +208,10 @@ static int frrzmq_write_msg(struct thread *t)
        int ret;
 
        if (!cbp)
-               return 1;
+               return;
        cb = (*cbp);
        if (!cb || !cb->zmqsock)
-               return 1;
+               return;
 
        while (1) {
                zmq_pollitem_t polli = {.socket = cb->zmqsock,
@@ -232,15 +225,19 @@ static int frrzmq_write_msg(struct thread *t)
                        break;
 
                if (cb->write.cb_msg) {
+                       cb->in_cb = true;
                        cb->write.cb_msg(cb->write.arg, cb->zmqsock);
+                       cb->in_cb = false;
+
                        written = 1;
 
                        if (cb->write.cancelled) {
                                frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
                                cb->write.thread = NULL;
                                if (cb->read.cancelled && !cb->read.thread)
-                                       XFREE(MTYPE_ZEROMQ_CB, cb);
-                               return 0;
+                                       XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+                               return;
                        }
                        continue;
                }
@@ -249,23 +246,22 @@ static int frrzmq_write_msg(struct thread *t)
        if (written)
                frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
 
-       thread_add_write(t->master, frrzmq_write_msg, cbp,
-                        cb->fd, &cb->write.thread);
-       return 0;
+       event_add_write(t->master, frrzmq_write_msg, cbp, cb->fd,
+                       &cb->write.thread);
+       return;
 
 out_err:
        flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
                 errno);
        if (cb->write.cb_error)
                cb->write.cb_error(cb->write.arg, cb->zmqsock);
-       return 1;
 }
 
-int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
-                            struct thread_master *master,
-                            void (*msgfunc)(void *arg, void *zmqsock),
-                            void (*errfunc)(void *arg, void *zmqsock),
-                            void *arg, void *zmqsock, struct frrzmq_cb **cbp)
+int _frrzmq_event_add_write(const struct xref_threadsched *xref,
+                           struct thread_master *master,
+                           void (*msgfunc)(void *arg, void *zmqsock),
+                           void (*errfunc)(void *arg, void *zmqsock),
+                           void *arg, void *zmqsock, struct frrzmq_cb **cbp)
 {
        int fd, events;
        size_t len;
@@ -286,7 +282,6 @@ int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
                cb = *cbp;
        else {
                cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
-
                cb->read.cancelled = true;
                *cbp = cb;
        }
@@ -298,15 +293,16 @@ int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
        cb->write.cb_part = NULL;
        cb->write.cb_error = errfunc;
        cb->write.cancelled = false;
+       cb->in_cb = false;
 
        if (events & ZMQ_POLLOUT) {
-               thread_cancel(&cb->write.thread);
+               event_cancel(&cb->write.thread);
 
-               _thread_add_event(xref, master, frrzmq_write_msg, cbp, fd,
-                                 &cb->write.thread);
-       } else
-               thread_add_write(master, frrzmq_write_msg, cbp, fd,
+               _event_add_event(xref, master, frrzmq_write_msg, cbp, fd,
                                 &cb->write.thread);
+       } else
+               event_add_write(master, frrzmq_write_msg, cbp, fd,
+                               &cb->write.thread);
        return 0;
 }
 
@@ -315,24 +311,17 @@ void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
        if (!cb || !*cb)
                return;
        core->cancelled = true;
-       thread_cancel(&core->thread);
-
-       /*
-        * Looking at this code one would assume that FRR
-        * would want a `!(*cb)->write.thread.  This was
-        * attempted in e08165def1c62beee0e87385 but this
-        * change caused `make check` to stop working
-        * which was not noticed because our CI system
-        * does not build with zeromq.  Put this back
-        * to the code as written in 2017.  e08165de..
-        * was introduced in 2021.  So someone was ok
-        * with frrzmq_thread_cancel for 4 years.  This will
-        * allow those people doing `make check` to continue
-        * working.  In the meantime if the people using
-        * this code see an issue they can fix it
+       event_cancel(&core->thread);
+
+       /* If cancelled from within a callback, don't try to free memory
+        * in this path.
         */
+       if ((*cb)->in_cb)
+               return;
+
+       /* Ok to free the callback context if no more ... context. */
        if ((*cb)->read.cancelled && !(*cb)->read.thread
-           && (*cb)->write.cancelled && (*cb)->write.thread)
+           && (*cb)->write.cancelled && ((*cb)->write.thread == NULL))
                XFREE(MTYPE_ZEROMQ_CB, *cb);
 }
 
@@ -355,13 +344,13 @@ void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
        if ((events & event) && core->thread && !core->cancelled) {
                struct thread_master *tm = core->thread->master;
 
-               thread_cancel(&core->thread);
+               event_cancel(&core->thread);
 
                if (event == ZMQ_POLLIN)
-                       thread_add_event(tm, frrzmq_read_msg,
-                                        cbp, cb->fd, &core->thread);
+                       event_add_event(tm, frrzmq_read_msg, cbp, cb->fd,
+                                       &core->thread);
                else
-                       thread_add_event(tm, frrzmq_write_msg,
-                                        cbp, cb->fd, &core->thread);
+                       event_add_event(tm, frrzmq_write_msg, cbp, cb->fd,
+                                       &core->thread);
        }
 }