+// SPDX-License-Identifier: GPL-2.0-or-later
/*
* libzebra ZeroMQ bindings
* Copyright (C) 2015 David Lamparter
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the Free
- * Software Foundation; either version 2 of the License, or (at your option)
- * any later version.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; see the file COPYING; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
* IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that
- * the test_zmq.c unit test is still working. There are dependancies
+ * the test_zmq.c unit test is still working. There are dependencies
* between the two that are extremely fragile. My understanding
* is that there is specialized ownership of the cb pointer based
* upon what is happening. Those assumptions are supposed to be
}
}
-static int frrzmq_read_msg(struct thread *t)
+static void frrzmq_read_msg(struct thread *t)
{
struct frrzmq_cb **cbp = THREAD_ARG(t);
struct frrzmq_cb *cb;
size_t moresz;
if (!cbp)
- return 1;
+ return;
cb = (*cbp);
if (!cb || !cb->zmqsock)
- return 1;
+ return;
while (1) {
zmq_pollitem_t polli = {.socket = cb->zmqsock,
break;
if (cb->read.cb_msg) {
+ cb->in_cb = true;
cb->read.cb_msg(cb->read.arg, cb->zmqsock);
+ cb->in_cb = false;
+
read = 1;
if (cb->read.cancelled) {
ZMQ_POLLOUT);
cb->read.thread = NULL;
if (cb->write.cancelled && !cb->write.thread)
- XFREE(MTYPE_ZEROMQ_CB, cb);
- return 0;
+ XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+ return;
}
continue;
}
}
read = 1;
+ cb->in_cb = true;
cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
partno);
+ cb->in_cb = false;
+
if (cb->read.cancelled) {
zmq_msg_close(&msg);
frrzmq_check_events(cbp, &cb->write,
ZMQ_POLLOUT);
cb->read.thread = NULL;
if (cb->write.cancelled && !cb->write.thread)
- XFREE(MTYPE_ZEROMQ_CB, cb);
- return 0;
+ XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+ return;
}
/* cb_part may have read additional parts of the
thread_add_read(t->master, frrzmq_read_msg, cbp,
cb->fd, &cb->read.thread);
- return 0;
+ return;
out_err:
flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
errno);
if (cb->read.cb_error)
cb->read.cb_error(cb->read.arg, cb->zmqsock);
- return 1;
}
int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
cb = *cbp;
else {
cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
-
cb->write.cancelled = true;
*cbp = cb;
}
cb->read.cb_part = partfunc;
cb->read.cb_error = errfunc;
cb->read.cancelled = false;
+ cb->in_cb = false;
if (events & ZMQ_POLLIN) {
thread_cancel(&cb->read.thread);
return 0;
}
-static int frrzmq_write_msg(struct thread *t)
+static void frrzmq_write_msg(struct thread *t)
{
struct frrzmq_cb **cbp = THREAD_ARG(t);
struct frrzmq_cb *cb;
int ret;
if (!cbp)
- return 1;
+ return;
cb = (*cbp);
if (!cb || !cb->zmqsock)
- return 1;
+ return;
while (1) {
zmq_pollitem_t polli = {.socket = cb->zmqsock,
break;
if (cb->write.cb_msg) {
+ cb->in_cb = true;
cb->write.cb_msg(cb->write.arg, cb->zmqsock);
+ cb->in_cb = false;
+
written = 1;
if (cb->write.cancelled) {
frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
cb->write.thread = NULL;
if (cb->read.cancelled && !cb->read.thread)
- XFREE(MTYPE_ZEROMQ_CB, cb);
- return 0;
+ XFREE(MTYPE_ZEROMQ_CB, *cbp);
+
+ return;
}
continue;
}
thread_add_write(t->master, frrzmq_write_msg, cbp,
cb->fd, &cb->write.thread);
- return 0;
+ return;
out_err:
flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
errno);
if (cb->write.cb_error)
cb->write.cb_error(cb->write.arg, cb->zmqsock);
- return 1;
}
int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
cb = *cbp;
else {
cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
-
cb->read.cancelled = true;
*cbp = cb;
}
cb->write.cb_part = NULL;
cb->write.cb_error = errfunc;
cb->write.cancelled = false;
+ cb->in_cb = false;
if (events & ZMQ_POLLOUT) {
thread_cancel(&cb->write.thread);
core->cancelled = true;
thread_cancel(&core->thread);
- /*
- * Looking at this code one would assume that FRR
- * would want a `!(*cb)->write.thread. This was
- * attempted in e08165def1c62beee0e87385 but this
- * change caused `make check` to stop working
- * which was not noticed because our CI system
- * does not build with zeromq. Put this back
- * to the code as written in 2017. e08165de..
- * was introduced in 2021. So someone was ok
- * with frrzmq_thread_cancel for 4 years. This will
- * allow those people doing `make check` to continue
- * working. In the meantime if the people using
- * this code see an issue they can fix it
+ /* If cancelled from within a callback, don't try to free memory
+ * in this path.
*/
+ if ((*cb)->in_cb)
+ return;
+
+ /* Ok to free the callback context if no more ... context. */
if ((*cb)->read.cancelled && !(*cb)->read.thread
- && (*cb)->write.cancelled && (*cb)->write.thread)
+ && (*cb)->write.cancelled && ((*cb)->write.thread == NULL))
XFREE(MTYPE_ZEROMQ_CB, *cb);
}