]> git.proxmox.com Git - mirror_frr.git/blobdiff - lib/frr_zmq.c
zebra, lib: fix the ZEBRA_INTERFACE_VRF_UPDATE zapi message
[mirror_frr.git] / lib / frr_zmq.c
index 861f7a5f0c2ea8659752e2b750cf4e895022fe1f..cfea238d95b1e34de249b4ac7c3012b3497e7aad 100644 (file)
@@ -24,6 +24,7 @@
 #include "memory.h"
 #include "frr_zmq.h"
 #include "log.h"
+#include "lib_errors.h"
 
 DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
 
@@ -47,46 +48,43 @@ void frrzmq_finish(void)
        }
 }
 
-/* read callback integration */
-struct frrzmq_cb {
-       struct thread *thread;
-       void *zmqsock;
-       void *arg;
-       int fd;
-
-       bool cancelled;
-
-       void (*cb_msg)(void *arg, void *zmqsock);
-       void (*cb_part)(void *arg, void *zmqsock,
-                       zmq_msg_t *msg, unsigned partnum);
-};
-
-
 static int frrzmq_read_msg(struct thread *t)
 {
-       struct frrzmq_cb *cb = THREAD_ARG(t);
+       struct frrzmq_cb **cbp = THREAD_ARG(t);
+       struct frrzmq_cb *cb;
        zmq_msg_t msg;
        unsigned partno;
+       unsigned char read = 0;
        int ret, more;
        size_t moresz;
 
+       if (!cbp)
+               return 1;
+       cb = (*cbp);
+       if (!cb || !cb->zmqsock)
+               return 1;
+
        while (1) {
-               zmq_pollitem_t polli = {
-                       .socket = cb->zmqsock,
-                       .events = ZMQ_POLLIN
-               };
+               zmq_pollitem_t polli = {.socket = cb->zmqsock,
+                                       .events = ZMQ_POLLIN};
                ret = zmq_poll(&polli, 1, 0);
 
                if (ret < 0)
                        goto out_err;
+
                if (!(polli.revents & ZMQ_POLLIN))
                        break;
 
-               if (cb->cb_msg) {
-                       cb->cb_msg(cb->arg, cb->zmqsock);
+               if (cb->read.cb_msg) {
+                       cb->read.cb_msg(cb->read.arg, cb->zmqsock);
+                       read = 1;
 
-                       if (cb->cancelled) {
-                               XFREE(MTYPE_ZEROMQ_CB, cb);
+                       if (cb->read.cancelled) {
+                               frrzmq_check_events(cbp, &cb->write,
+                                                   ZMQ_POLLOUT);
+                               cb->read.thread = NULL;
+                               if (cb->write.cancelled && !cb->write.thread)
+                                       XFREE(MTYPE_ZEROMQ_CB, cb);
                                return 0;
                        }
                        continue;
@@ -104,11 +102,17 @@ static int frrzmq_read_msg(struct thread *t)
                                zmq_msg_close(&msg);
                                goto out_err;
                        }
+                       read = 1;
 
-                       cb->cb_part(cb->arg, cb->zmqsock, &msg, partno);
-                       if (cb->cancelled) {
+                       cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
+                                        partno);
+                       if (cb->read.cancelled) {
                                zmq_msg_close(&msg);
-                               XFREE(MTYPE_ZEROMQ_CB, cb);
+                               frrzmq_check_events(cbp, &cb->write,
+                                                   ZMQ_POLLOUT);
+                               cb->read.thread = NULL;
+                               if (cb->write.cancelled && !cb->write.thread)
+                                       XFREE(MTYPE_ZEROMQ_CB, cb);
                                return 0;
                        }
 
@@ -116,8 +120,8 @@ static int frrzmq_read_msg(struct thread *t)
                         * message; don't use zmq_msg_more here */
                        moresz = sizeof(more);
                        more = 0;
-                       ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE,
-                                            &more, &moresz);
+                       ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
+                                            &moresz);
                        if (ret < 0) {
                                zmq_msg_close(&msg);
                                goto out_err;
@@ -128,64 +132,226 @@ static int frrzmq_read_msg(struct thread *t)
                zmq_msg_close(&msg);
        }
 
-       funcname_thread_add_read_write(THREAD_READ, t->master, frrzmq_read_msg,
-                       cb, cb->fd, &cb->thread, t->funcname, t->schedfrom,
-                       t->schedfrom_line);
+       if (read)
+               frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
+
+       funcname_thread_add_read_write(
+               THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
+               &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
        return 0;
 
 out_err:
-       zlog_err("ZeroMQ error: %s(%d)", strerror (errno), errno);
-       return 0;
+       flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
+                errno);
+       if (cb->read.cb_error)
+               cb->read.cb_error(cb->read.arg, cb->zmqsock);
+       return 1;
 }
 
-struct frrzmq_cb *funcname_frrzmq_thread_add_read(
-               struct thread_master *master,
-               void (*msgfunc)(void *arg, void *zmqsock),
-               void (*partfunc)(void *arg, void *zmqsock,
-                                zmq_msg_t *msg, unsigned partnum),
-               void *arg, void *zmqsock, debugargdef)
+int funcname_frrzmq_thread_add_read(struct thread_master *master,
+                                   void (*msgfunc)(void *arg, void *zmqsock),
+                                   void (*partfunc)(void *arg, void *zmqsock,
+                                                    zmq_msg_t *msg,
+                                                    unsigned partnum),
+                                   void (*errfunc)(void *arg, void *zmqsock),
+                                   void *arg, void *zmqsock,
+                                   struct frrzmq_cb **cbp, debugargdef)
 {
        int fd, events;
        size_t len;
        struct frrzmq_cb *cb;
 
+       if (!cbp)
+               return -1;
        if (!(msgfunc || partfunc) || (msgfunc && partfunc))
-               return NULL;
+               return -1;
        len = sizeof(fd);
        if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
-               return NULL;
+               return -1;
        len = sizeof(events);
        if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
-               return NULL;
+               return -1;
 
-       cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
-       if (!cb)
-               return NULL;
+       if (*cbp)
+               cb = *cbp;
+       else {
+               cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
+               if (!cb)
+                       return -1;
+
+               cb->write.cancelled = 1;
+               *cbp = cb;
+       }
 
-       cb->arg = arg;
        cb->zmqsock = zmqsock;
-       cb->cb_msg = msgfunc;
-       cb->cb_part = partfunc;
        cb->fd = fd;
+       cb->read.arg = arg;
+       cb->read.cb_msg = msgfunc;
+       cb->read.cb_part = partfunc;
+       cb->read.cb_error = errfunc;
+       cb->read.cancelled = 0;
 
-       if (events & ZMQ_POLLIN)
-               funcname_thread_add_event(master,
-                               frrzmq_read_msg, cb, fd, &cb->thread,
-                               funcname, schedfrom, fromln);
-       else
-               funcname_thread_add_read_write(THREAD_READ, master,
-                               frrzmq_read_msg, cb, fd, &cb->thread,
-                               funcname, schedfrom, fromln);
-       return cb;
+       if (events & ZMQ_POLLIN) {
+               if (cb->read.thread) {
+                       thread_cancel(cb->read.thread);
+                       cb->read.thread = NULL;
+               }
+               funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
+                                         &cb->read.thread, funcname, schedfrom,
+                                         fromln);
+       } else
+               funcname_thread_add_read_write(
+                       THREAD_READ, master, frrzmq_read_msg, cbp, fd,
+                       &cb->read.thread, funcname, schedfrom, fromln);
+       return 0;
 }
 
-void frrzmq_thread_cancel(struct frrzmq_cb *cb)
+static int frrzmq_write_msg(struct thread *t)
 {
-       if (!cb->thread) {
-               /* canceling from within callback */
-               cb->cancelled = 1;
+       struct frrzmq_cb **cbp = THREAD_ARG(t);
+       struct frrzmq_cb *cb;
+       unsigned char written = 0;
+       int ret;
+
+       if (!cbp)
+               return 1;
+       cb = (*cbp);
+       if (!cb || !cb->zmqsock)
+               return 1;
+
+       while (1) {
+               zmq_pollitem_t polli = {.socket = cb->zmqsock,
+                                       .events = ZMQ_POLLOUT};
+               ret = zmq_poll(&polli, 1, 0);
+
+               if (ret < 0)
+                       goto out_err;
+
+               if (!(polli.revents & ZMQ_POLLOUT))
+                       break;
+
+               if (cb->write.cb_msg) {
+                       cb->write.cb_msg(cb->write.arg, cb->zmqsock);
+                       written = 1;
+
+                       if (cb->write.cancelled) {
+                               frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
+                               cb->write.thread = NULL;
+                               if (cb->read.cancelled && !cb->read.thread)
+                                       XFREE(MTYPE_ZEROMQ_CB, cb);
+                               return 0;
+                       }
+                       continue;
+               }
+       }
+
+       if (written)
+               frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
+
+       funcname_thread_add_read_write(THREAD_WRITE, t->master,
+                                      frrzmq_write_msg, cbp, cb->fd,
+                                      &cb->write.thread, t->funcname,
+                                      t->schedfrom, t->schedfrom_line);
+       return 0;
+
+out_err:
+       flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
+                errno);
+       if (cb->write.cb_error)
+               cb->write.cb_error(cb->write.arg, cb->zmqsock);
+       return 1;
+}
+int funcname_frrzmq_thread_add_write(struct thread_master *master,
+                                    void (*msgfunc)(void *arg, void *zmqsock),
+                                    void (*errfunc)(void *arg, void *zmqsock),
+                                    void *arg, void *zmqsock,
+                                    struct frrzmq_cb **cbp, debugargdef)
+{
+       int fd, events;
+       size_t len;
+       struct frrzmq_cb *cb;
+
+       if (!cbp)
+               return -1;
+       if (!msgfunc)
+               return -1;
+       len = sizeof(fd);
+       if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
+               return -1;
+       len = sizeof(events);
+       if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
+               return -1;
+
+       if (*cbp)
+               cb = *cbp;
+       else {
+               cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
+               if (!cb)
+                       return -1;
+
+               cb->read.cancelled = 1;
+               *cbp = cb;
+       }
+
+       cb->zmqsock = zmqsock;
+       cb->fd = fd;
+       cb->write.arg = arg;
+       cb->write.cb_msg = msgfunc;
+       cb->write.cb_part = NULL;
+       cb->write.cb_error = errfunc;
+       cb->write.cancelled = 0;
+
+       if (events & ZMQ_POLLOUT) {
+               if (cb->write.thread) {
+                       thread_cancel(cb->write.thread);
+                       cb->write.thread = NULL;
+               }
+               funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
+                                         &cb->write.thread, funcname,
+                                         schedfrom, fromln);
+       } else
+               funcname_thread_add_read_write(
+                       THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
+                       &cb->write.thread, funcname, schedfrom, fromln);
+       return 0;
+}
+
+void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
+{
+       if (!cb || !*cb)
+               return;
+       core->cancelled = 1;
+       if (core->thread) {
+               thread_cancel(core->thread);
+               core->thread = NULL;
+       }
+       if ((*cb)->read.cancelled && !(*cb)->read.thread
+           && (*cb)->write.cancelled && (*cb)->write.thread)
+               XFREE(MTYPE_ZEROMQ_CB, *cb);
+}
+
+void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
+                        int event)
+{
+       struct frrzmq_cb *cb;
+       int events;
+       size_t len;
+
+       if (!cbp)
+               return;
+       cb = (*cbp);
+       if (!cb || !cb->zmqsock)
+               return;
+
+       len = sizeof(events);
+       if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
                return;
+       if (events & event && core->thread && !core->cancelled) {
+               struct thread_master *tm = core->thread->master;
+               thread_cancel(core->thread);
+               core->thread = NULL;
+               thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
+                                                         : frrzmq_write_msg),
+                                cbp, cb->fd, &core->thread);
        }
-       thread_cancel(cb->thread);
-       XFREE(MTYPE_ZEROMQ_CB, cb);
 }