#include "memory.h"
#include "frr_zmq.h"
#include "log.h"
+#include "lib_errors.h"
DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
}
}
-/* read callback integration */
-struct frrzmq_cb {
- struct thread *thread;
- void *zmqsock;
- void *arg;
- int fd;
-
- bool cancelled;
-
- void (*cb_msg)(void *arg, void *zmqsock);
- void (*cb_part)(void *arg, void *zmqsock,
- zmq_msg_t *msg, unsigned partnum);
-};
-
-
static int frrzmq_read_msg(struct thread *t)
{
- struct frrzmq_cb *cb = THREAD_ARG(t);
+ struct frrzmq_cb **cbp = THREAD_ARG(t);
+ struct frrzmq_cb *cb;
zmq_msg_t msg;
unsigned partno;
+ unsigned char read = 0;
int ret, more;
size_t moresz;
+ if (!cbp)
+ return 1;
+ cb = (*cbp);
+ if (!cb || !cb->zmqsock)
+ return 1;
+
while (1) {
- zmq_pollitem_t polli = {
- .socket = cb->zmqsock,
- .events = ZMQ_POLLIN
- };
+ zmq_pollitem_t polli = {.socket = cb->zmqsock,
+ .events = ZMQ_POLLIN};
ret = zmq_poll(&polli, 1, 0);
if (ret < 0)
goto out_err;
+
if (!(polli.revents & ZMQ_POLLIN))
break;
- if (cb->cb_msg) {
- cb->cb_msg(cb->arg, cb->zmqsock);
+ if (cb->read.cb_msg) {
+ cb->read.cb_msg(cb->read.arg, cb->zmqsock);
+ read = 1;
- if (cb->cancelled) {
- XFREE(MTYPE_ZEROMQ_CB, cb);
+ if (cb->read.cancelled) {
+ frrzmq_check_events(cbp, &cb->write,
+ ZMQ_POLLOUT);
+ cb->read.thread = NULL;
+ if (cb->write.cancelled && !cb->write.thread)
+ XFREE(MTYPE_ZEROMQ_CB, cb);
return 0;
}
continue;
zmq_msg_close(&msg);
goto out_err;
}
+ read = 1;
- cb->cb_part(cb->arg, cb->zmqsock, &msg, partno);
- if (cb->cancelled) {
+ cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
+ partno);
+ if (cb->read.cancelled) {
zmq_msg_close(&msg);
- XFREE(MTYPE_ZEROMQ_CB, cb);
+ frrzmq_check_events(cbp, &cb->write,
+ ZMQ_POLLOUT);
+ cb->read.thread = NULL;
+ if (cb->write.cancelled && !cb->write.thread)
+ XFREE(MTYPE_ZEROMQ_CB, cb);
return 0;
}
* message; don't use zmq_msg_more here */
moresz = sizeof(more);
more = 0;
- ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE,
- &more, &moresz);
+ ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
+ &moresz);
if (ret < 0) {
zmq_msg_close(&msg);
goto out_err;
zmq_msg_close(&msg);
}
- funcname_thread_add_read_write(THREAD_READ, t->master, frrzmq_read_msg,
- cb, cb->fd, &cb->thread, t->funcname, t->schedfrom,
- t->schedfrom_line);
+ if (read)
+ frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
+
+ funcname_thread_add_read_write(
+ THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
+ &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
return 0;
out_err:
- zlog_err("ZeroMQ error: %s(%d)", strerror (errno), errno);
- return 0;
+ flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
+ errno);
+ if (cb->read.cb_error)
+ cb->read.cb_error(cb->read.arg, cb->zmqsock);
+ return 1;
}
-struct frrzmq_cb *funcname_frrzmq_thread_add_read(
- struct thread_master *master,
- void (*msgfunc)(void *arg, void *zmqsock),
- void (*partfunc)(void *arg, void *zmqsock,
- zmq_msg_t *msg, unsigned partnum),
- void *arg, void *zmqsock, debugargdef)
+int funcname_frrzmq_thread_add_read(struct thread_master *master,
+ void (*msgfunc)(void *arg, void *zmqsock),
+ void (*partfunc)(void *arg, void *zmqsock,
+ zmq_msg_t *msg,
+ unsigned partnum),
+ void (*errfunc)(void *arg, void *zmqsock),
+ void *arg, void *zmqsock,
+ struct frrzmq_cb **cbp, debugargdef)
{
int fd, events;
size_t len;
struct frrzmq_cb *cb;
+ if (!cbp)
+ return -1;
if (!(msgfunc || partfunc) || (msgfunc && partfunc))
- return NULL;
+ return -1;
len = sizeof(fd);
if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
- return NULL;
+ return -1;
len = sizeof(events);
if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
- return NULL;
+ return -1;
- cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
- if (!cb)
- return NULL;
+ if (*cbp)
+ cb = *cbp;
+ else {
+ cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
+ if (!cb)
+ return -1;
+
+ cb->write.cancelled = 1;
+ *cbp = cb;
+ }
- cb->arg = arg;
cb->zmqsock = zmqsock;
- cb->cb_msg = msgfunc;
- cb->cb_part = partfunc;
cb->fd = fd;
+ cb->read.arg = arg;
+ cb->read.cb_msg = msgfunc;
+ cb->read.cb_part = partfunc;
+ cb->read.cb_error = errfunc;
+ cb->read.cancelled = 0;
- if (events & ZMQ_POLLIN)
- funcname_thread_add_event(master,
- frrzmq_read_msg, cb, fd, &cb->thread,
- funcname, schedfrom, fromln);
- else
- funcname_thread_add_read_write(THREAD_READ, master,
- frrzmq_read_msg, cb, fd, &cb->thread,
- funcname, schedfrom, fromln);
- return cb;
+ if (events & ZMQ_POLLIN) {
+ if (cb->read.thread) {
+ thread_cancel(cb->read.thread);
+ cb->read.thread = NULL;
+ }
+ funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
+ &cb->read.thread, funcname, schedfrom,
+ fromln);
+ } else
+ funcname_thread_add_read_write(
+ THREAD_READ, master, frrzmq_read_msg, cbp, fd,
+ &cb->read.thread, funcname, schedfrom, fromln);
+ return 0;
}
-void frrzmq_thread_cancel(struct frrzmq_cb *cb)
+static int frrzmq_write_msg(struct thread *t)
{
- if (!cb->thread) {
- /* canceling from within callback */
- cb->cancelled = 1;
+ struct frrzmq_cb **cbp = THREAD_ARG(t);
+ struct frrzmq_cb *cb;
+ unsigned char written = 0;
+ int ret;
+
+ if (!cbp)
+ return 1;
+ cb = (*cbp);
+ if (!cb || !cb->zmqsock)
+ return 1;
+
+ while (1) {
+ zmq_pollitem_t polli = {.socket = cb->zmqsock,
+ .events = ZMQ_POLLOUT};
+ ret = zmq_poll(&polli, 1, 0);
+
+ if (ret < 0)
+ goto out_err;
+
+ if (!(polli.revents & ZMQ_POLLOUT))
+ break;
+
+ if (cb->write.cb_msg) {
+ cb->write.cb_msg(cb->write.arg, cb->zmqsock);
+ written = 1;
+
+ if (cb->write.cancelled) {
+ frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
+ cb->write.thread = NULL;
+ if (cb->read.cancelled && !cb->read.thread)
+ XFREE(MTYPE_ZEROMQ_CB, cb);
+ return 0;
+ }
+ continue;
+ }
+ }
+
+ if (written)
+ frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
+
+ funcname_thread_add_read_write(THREAD_WRITE, t->master,
+ frrzmq_write_msg, cbp, cb->fd,
+ &cb->write.thread, t->funcname,
+ t->schedfrom, t->schedfrom_line);
+ return 0;
+
+out_err:
+ flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
+ errno);
+ if (cb->write.cb_error)
+ cb->write.cb_error(cb->write.arg, cb->zmqsock);
+ return 1;
+}
+int funcname_frrzmq_thread_add_write(struct thread_master *master,
+ void (*msgfunc)(void *arg, void *zmqsock),
+ void (*errfunc)(void *arg, void *zmqsock),
+ void *arg, void *zmqsock,
+ struct frrzmq_cb **cbp, debugargdef)
+{
+ int fd, events;
+ size_t len;
+ struct frrzmq_cb *cb;
+
+ if (!cbp)
+ return -1;
+ if (!msgfunc)
+ return -1;
+ len = sizeof(fd);
+ if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
+ return -1;
+ len = sizeof(events);
+ if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
+ return -1;
+
+ if (*cbp)
+ cb = *cbp;
+ else {
+ cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
+ if (!cb)
+ return -1;
+
+ cb->read.cancelled = 1;
+ *cbp = cb;
+ }
+
+ cb->zmqsock = zmqsock;
+ cb->fd = fd;
+ cb->write.arg = arg;
+ cb->write.cb_msg = msgfunc;
+ cb->write.cb_part = NULL;
+ cb->write.cb_error = errfunc;
+ cb->write.cancelled = 0;
+
+ if (events & ZMQ_POLLOUT) {
+ if (cb->write.thread) {
+ thread_cancel(cb->write.thread);
+ cb->write.thread = NULL;
+ }
+ funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
+ &cb->write.thread, funcname,
+ schedfrom, fromln);
+ } else
+ funcname_thread_add_read_write(
+ THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
+ &cb->write.thread, funcname, schedfrom, fromln);
+ return 0;
+}
+
+void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
+{
+ if (!cb || !*cb)
+ return;
+ core->cancelled = 1;
+ if (core->thread) {
+ thread_cancel(core->thread);
+ core->thread = NULL;
+ }
+ if ((*cb)->read.cancelled && !(*cb)->read.thread
+ && (*cb)->write.cancelled && (*cb)->write.thread)
+ XFREE(MTYPE_ZEROMQ_CB, *cb);
+}
+
+void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
+ int event)
+{
+ struct frrzmq_cb *cb;
+ int events;
+ size_t len;
+
+ if (!cbp)
+ return;
+ cb = (*cbp);
+ if (!cb || !cb->zmqsock)
+ return;
+
+ len = sizeof(events);
+ if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
return;
+ if (events & event && core->thread && !core->cancelled) {
+ struct thread_master *tm = core->thread->master;
+ thread_cancel(core->thread);
+ core->thread = NULL;
+ thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
+ : frrzmq_write_msg),
+ cbp, cb->fd, &core->thread);
}
- thread_cancel(cb->thread);
- XFREE(MTYPE_ZEROMQ_CB, cb);
}