]> git.proxmox.com Git - mirror_frr.git/commitdiff
Merge pull request #1478 from bingen/zeromq4
authorDonald Sharp <sharpd@cumulusnetworks.com>
Wed, 13 Dec 2017 12:36:57 +0000 (07:36 -0500)
committerGitHub <noreply@github.com>
Wed, 13 Dec 2017 12:36:57 +0000 (07:36 -0500)
lib: Address ZMQ lib TODOs

lib/frr_zmq.c
lib/frr_zmq.h
tests/lib/test_zmq.c
tests/lib/test_zmq.refout

index 861f7a5f0c2ea8659752e2b750cf4e895022fe1f..d4df5130e7e33e40d04da993e9f4fec731896cb6 100644 (file)
@@ -47,46 +47,43 @@ void frrzmq_finish(void)
        }
 }
 
-/* read callback integration */
-struct frrzmq_cb {
-       struct thread *thread;
-       void *zmqsock;
-       void *arg;
-       int fd;
-
-       bool cancelled;
-
-       void (*cb_msg)(void *arg, void *zmqsock);
-       void (*cb_part)(void *arg, void *zmqsock,
-                       zmq_msg_t *msg, unsigned partnum);
-};
-
-
 static int frrzmq_read_msg(struct thread *t)
 {
-       struct frrzmq_cb *cb = THREAD_ARG(t);
+       struct frrzmq_cb **cbp = THREAD_ARG(t);
+       struct frrzmq_cb *cb;
        zmq_msg_t msg;
        unsigned partno;
+       unsigned char read = 0;
        int ret, more;
        size_t moresz;
 
+       if (!cbp)
+               return 1;
+       cb = (*cbp);
+       if (!cb || !cb->zmqsock)
+               return 1;
+
        while (1) {
-               zmq_pollitem_t polli = {
-                       .socket = cb->zmqsock,
-                       .events = ZMQ_POLLIN
-               };
+               zmq_pollitem_t polli = {.socket = cb->zmqsock,
+                                       .events = ZMQ_POLLIN};
                ret = zmq_poll(&polli, 1, 0);
 
                if (ret < 0)
                        goto out_err;
+
                if (!(polli.revents & ZMQ_POLLIN))
                        break;
 
-               if (cb->cb_msg) {
-                       cb->cb_msg(cb->arg, cb->zmqsock);
+               if (cb->read.cb_msg) {
+                       cb->read.cb_msg(cb->read.arg, cb->zmqsock);
+                       read = 1;
 
-                       if (cb->cancelled) {
-                               XFREE(MTYPE_ZEROMQ_CB, cb);
+                       if (cb->read.cancelled) {
+                               frrzmq_check_events(cbp, &cb->write,
+                                                   ZMQ_POLLOUT);
+                               cb->read.thread = NULL;
+                               if (cb->write.cancelled && !cb->write.thread)
+                                       XFREE(MTYPE_ZEROMQ_CB, cb);
                                return 0;
                        }
                        continue;
@@ -104,11 +101,17 @@ static int frrzmq_read_msg(struct thread *t)
                                zmq_msg_close(&msg);
                                goto out_err;
                        }
+                       read = 1;
 
-                       cb->cb_part(cb->arg, cb->zmqsock, &msg, partno);
-                       if (cb->cancelled) {
+                       cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
+                                        partno);
+                       if (cb->read.cancelled) {
                                zmq_msg_close(&msg);
-                               XFREE(MTYPE_ZEROMQ_CB, cb);
+                               frrzmq_check_events(cbp, &cb->write,
+                                                   ZMQ_POLLOUT);
+                               cb->read.thread = NULL;
+                               if (cb->write.cancelled && !cb->write.thread)
+                                       XFREE(MTYPE_ZEROMQ_CB, cb);
                                return 0;
                        }
 
@@ -116,8 +119,8 @@ static int frrzmq_read_msg(struct thread *t)
                         * message; don't use zmq_msg_more here */
                        moresz = sizeof(more);
                        more = 0;
-                       ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE,
-                                            &more, &moresz);
+                       ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
+                                            &moresz);
                        if (ret < 0) {
                                zmq_msg_close(&msg);
                                goto out_err;
@@ -128,64 +131,221 @@ static int frrzmq_read_msg(struct thread *t)
                zmq_msg_close(&msg);
        }
 
-       funcname_thread_add_read_write(THREAD_READ, t->master, frrzmq_read_msg,
-                       cb, cb->fd, &cb->thread, t->funcname, t->schedfrom,
-                       t->schedfrom_line);
+       if (read)
+               frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
+
+       funcname_thread_add_read_write(
+               THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
+               &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
        return 0;
 
 out_err:
-       zlog_err("ZeroMQ error: %s(%d)", strerror (errno), errno);
-       return 0;
+       zlog_err("ZeroMQ read error: %s(%d)", strerror(errno), errno);
+       if (cb->read.cb_error)
+               cb->read.cb_error(cb->read.arg, cb->zmqsock);
+       return 1;
 }
 
-struct frrzmq_cb *funcname_frrzmq_thread_add_read(
-               struct thread_master *master,
-               void (*msgfunc)(void *arg, void *zmqsock),
-               void (*partfunc)(void *arg, void *zmqsock,
-                                zmq_msg_t *msg, unsigned partnum),
-               void *arg, void *zmqsock, debugargdef)
+int funcname_frrzmq_thread_add_read(struct thread_master *master,
+                                   void (*msgfunc)(void *arg, void *zmqsock),
+                                   void (*partfunc)(void *arg, void *zmqsock,
+                                                    zmq_msg_t *msg,
+                                                    unsigned partnum),
+                                   void (*errfunc)(void *arg, void *zmqsock),
+                                   void *arg, void *zmqsock,
+                                   struct frrzmq_cb **cbp, debugargdef)
 {
        int fd, events;
        size_t len;
        struct frrzmq_cb *cb;
 
+       if (!cbp)
+               return -1;
        if (!(msgfunc || partfunc) || (msgfunc && partfunc))
-               return NULL;
+               return -1;
+       len = sizeof(fd);
+       if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
+               return -1;
+       len = sizeof(events);
+       if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
+               return -1;
+
+       if (*cbp)
+               cb = *cbp;
+       else {
+               cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
+               cb->write.cancelled = 1;
+               if (!cb)
+                       return -1;
+               *cbp = cb;
+       }
+
+       cb->zmqsock = zmqsock;
+       cb->fd = fd;
+       cb->read.arg = arg;
+       cb->read.cb_msg = msgfunc;
+       cb->read.cb_part = partfunc;
+       cb->read.cb_error = errfunc;
+       cb->read.cancelled = 0;
+
+       if (events & ZMQ_POLLIN) {
+               if (cb->read.thread) {
+                       thread_cancel(cb->read.thread);
+                       cb->read.thread = NULL;
+               }
+               funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
+                                         &cb->read.thread, funcname, schedfrom,
+                                         fromln);
+       } else
+               funcname_thread_add_read_write(
+                       THREAD_READ, master, frrzmq_read_msg, cbp, fd,
+                       &cb->read.thread, funcname, schedfrom, fromln);
+       return 0;
+}
+
+static int frrzmq_write_msg(struct thread *t)
+{
+       struct frrzmq_cb **cbp = THREAD_ARG(t);
+       struct frrzmq_cb *cb;
+       unsigned char written = 0;
+       int ret;
+
+       if (!cbp)
+               return 1;
+       cb = (*cbp);
+       if (!cb || !cb->zmqsock)
+               return 1;
+
+       while (1) {
+               zmq_pollitem_t polli = {.socket = cb->zmqsock,
+                                       .events = ZMQ_POLLOUT};
+               ret = zmq_poll(&polli, 1, 0);
+
+               if (ret < 0)
+                       goto out_err;
+
+               if (!(polli.revents & ZMQ_POLLOUT))
+                       break;
+
+               if (cb->write.cb_msg) {
+                       cb->write.cb_msg(cb->write.arg, cb->zmqsock);
+                       written = 1;
+
+                       if (cb->write.cancelled) {
+                               frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
+                               cb->write.thread = NULL;
+                               if (cb->read.cancelled && !cb->read.thread)
+                                       XFREE(MTYPE_ZEROMQ_CB, cb);
+                               return 0;
+                       }
+                       continue;
+               }
+       }
+
+       if (written)
+               frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
+
+       funcname_thread_add_read_write(THREAD_WRITE, t->master,
+                                      frrzmq_write_msg, cbp, cb->fd,
+                                      &cb->write.thread, t->funcname,
+                                      t->schedfrom, t->schedfrom_line);
+       return 0;
+
+out_err:
+       zlog_err("ZeroMQ write error: %s(%d)", strerror(errno), errno);
+       if (cb->write.cb_error)
+               cb->write.cb_error(cb->write.arg, cb->zmqsock);
+       return 1;
+}
+int funcname_frrzmq_thread_add_write(struct thread_master *master,
+                                    void (*msgfunc)(void *arg, void *zmqsock),
+                                    void (*errfunc)(void *arg, void *zmqsock),
+                                    void *arg, void *zmqsock,
+                                    struct frrzmq_cb **cbp, debugargdef)
+{
+       int fd, events;
+       size_t len;
+       struct frrzmq_cb *cb;
+
+       if (!cbp)
+               return -1;
+       if (!msgfunc)
+               return -1;
        len = sizeof(fd);
        if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
-               return NULL;
+               return -1;
        len = sizeof(events);
        if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
-               return NULL;
+               return -1;
 
-       cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
-       if (!cb)
-               return NULL;
+       if (*cbp)
+               cb = *cbp;
+       else {
+               cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
+               cb->read.cancelled = 1;
+               if (!cb)
+                       return -1;
+               *cbp = cb;
+       }
 
-       cb->arg = arg;
        cb->zmqsock = zmqsock;
-       cb->cb_msg = msgfunc;
-       cb->cb_part = partfunc;
        cb->fd = fd;
+       cb->write.arg = arg;
+       cb->write.cb_msg = msgfunc;
+       cb->write.cb_part = NULL;
+       cb->write.cb_error = errfunc;
+       cb->write.cancelled = 0;
+
+       if (events & ZMQ_POLLOUT) {
+               if (cb->write.thread) {
+                       thread_cancel(cb->write.thread);
+                       cb->write.thread = NULL;
+               }
+               funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
+                                         &cb->write.thread, funcname,
+                                         schedfrom, fromln);
+       } else
+               funcname_thread_add_read_write(
+                       THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
+                       &cb->write.thread, funcname, schedfrom, fromln);
+       return 0;
+}
 
-       if (events & ZMQ_POLLIN)
-               funcname_thread_add_event(master,
-                               frrzmq_read_msg, cb, fd, &cb->thread,
-                               funcname, schedfrom, fromln);
-       else
-               funcname_thread_add_read_write(THREAD_READ, master,
-                               frrzmq_read_msg, cb, fd, &cb->thread,
-                               funcname, schedfrom, fromln);
-       return cb;
+void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
+{
+       if (!cb || !*cb)
+               return;
+       core->cancelled = 1;
+       if (core->thread) {
+               thread_cancel(core->thread);
+               core->thread = NULL;
+       }
+       if ((*cb)->read.cancelled && !(*cb)->read.thread
+           && (*cb)->write.cancelled && (*cb)->write.thread)
+               XFREE(MTYPE_ZEROMQ_CB, *cb);
 }
 
-void frrzmq_thread_cancel(struct frrzmq_cb *cb)
+void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
+                        int event)
 {
-       if (!cb->thread) {
-               /* canceling from within callback */
-               cb->cancelled = 1;
+       struct frrzmq_cb *cb;
+       int events;
+       size_t len;
+
+       if (!cbp)
+               return;
+       cb = (*cbp);
+       if (!cb || !cb->zmqsock)
+               return;
+
+       if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
                return;
+       if (events & event && core->thread && !core->cancelled) {
+               struct thread_master *tm = core->thread->master;
+               thread_cancel(core->thread);
+               core->thread = NULL;
+               thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
+                                                         : frrzmq_write_msg),
+                                cbp, cb->fd, &core->thread);
        }
-       thread_cancel(cb->thread);
-       XFREE(MTYPE_ZEROMQ_CB, cb);
 }
index 69c6f8580dbb6bb68adc69a8af191d8a295ac6dd..1146b879640c87ba66e17c3b7b01c2919a1108b2 100644 (file)
  *   foo_LDFLAGS = libfrrzmq.la libfrr.la $(ZEROMQ_LIBS)
  */
 
+/* callback integration */
+struct cb_core {
+       struct thread *thread;
+       void *arg;
+
+       bool cancelled;
+
+       void (*cb_msg)(void *arg, void *zmqsock);
+       void (*cb_part)(void *arg, void *zmqsock, zmq_msg_t *msg,
+                       unsigned partnum);
+       void (*cb_error)(void *arg, void *zmqsock);
+};
+struct frrzmq_cb {
+       void *zmqsock;
+       int fd;
+
+       struct cb_core read;
+       struct cb_core write;
+};
+
 /* libzmq's context
  *
  * this is mostly here as a convenience, it has IPv6 enabled but nothing
  */
 extern void *frrzmq_context;
 
-extern void frrzmq_init (void);
-extern void frrzmq_finish (void);
+extern void frrzmq_init(void);
+extern void frrzmq_finish(void);
 
 #define debugargdef const char *funcname, const char *schedfrom, int fromln
 
 /* core event registration, one of these 2 macros should be used */
-#define frrzmq_thread_add_read_msg(m,f,a,z) funcname_frrzmq_thread_add_read( \
-                               m,f,NULL,a,z,#f,__FILE__,__LINE__)
-#define frrzmq_thread_add_read_part(m,f,a,z) funcname_frrzmq_thread_add_read( \
-                               m,NULL,f,a,z,#f,__FILE__,__LINE__)
+#define frrzmq_thread_add_read_msg(m, f, e, a, z, d)                           \
+       funcname_frrzmq_thread_add_read(m, f, NULL, e, a, z, d, #f, __FILE__,  \
+                                       __LINE__)
+#define frrzmq_thread_add_read_part(m, f, e, a, z, d)                          \
+       funcname_frrzmq_thread_add_read(m, NULL, f, e, a, z, d, #f, __FILE__,  \
+                                       __LINE__)
+#define frrzmq_thread_add_write_msg(m, f, e, a, z, d)                          \
+       funcname_frrzmq_thread_add_write(m, f, e, a, z, d, #f, __FILE__,       \
+                                        __LINE__)
 
+struct cb_core;
 struct frrzmq_cb;
 
-/* Set up a POLLIN notification to be called from the libfrr main loop.
- * This has the following properties:
+/* Set up a POLLIN or POLLOUT notification to be called from the libfrr main
+ * loop. This has the following properties:
  *
  * - since ZeroMQ works with edge triggered notifications, it will loop and
  *   dispatch as many events as ZeroMQ has pending at the time libfrr calls
@@ -67,22 +93,35 @@ struct frrzmq_cb;
  *   - if partfunc is specified, the message is read and partfunc is called
  *     for each ZeroMQ multi-part subpart.  Note that you can't send replies
  *     before all parts have been read because that violates the ZeroMQ FSM.
+ * - write version doesn't allow for partial callback, you must handle the
+ *   whole message (all parts) in msgfunc callback
  * - you can safely cancel the callback from within itself
  * - installing a callback will check for pending events (ZMQ_EVENTS) and
  *   may schedule the event to run as soon as libfrr is back in its main
  *   loop.
+ */
+extern int funcname_frrzmq_thread_add_read(
+       struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock),
+       void (*partfunc)(void *arg, void *zmqsock, zmq_msg_t *msg,
+                        unsigned partnum),
+       void (*errfunc)(void *arg, void *zmqsock), void *arg, void *zmqsock,
+       struct frrzmq_cb **cb, debugargdef);
+extern int funcname_frrzmq_thread_add_write(
+       struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock),
+       void (*errfunc)(void *arg, void *zmqsock), void *arg, void *zmqsock,
+       struct frrzmq_cb **cb, debugargdef);
+
+extern void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core);
+
+/*
+ * http://api.zeromq.org/4-2:zmq-getsockopt#toc10
  *
- * TODO #1: add ZMQ_POLLERR / error callback
- * TODO #2: add frrzmq_check_events() function to check for edge triggered
- *          things that may have happened after a zmq_send() call or so
+ * As the descriptor is edge triggered, applications must update the state of
+ * ZMQ_EVENTS after each invocation of zmq_send or zmq_recv.To be more explicit:
+ * after calling zmq_send the socket may become readable (and vice versa)
+ * without triggering a read event on the file descriptor.
  */
-extern struct frrzmq_cb *funcname_frrzmq_thread_add_read(
-               struct thread_master *master,
-               void (*msgfunc)(void *arg, void *zmqsock),
-               void (*partfunc)(void *arg, void *zmqsock,
-                                zmq_msg_t *msg, unsigned partnum),
-               void *arg, void *zmqsock, debugargdef);
-
-extern void frrzmq_thread_cancel(struct frrzmq_cb *cb);
+extern void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
+                               int event);
 
 #endif /* _FRRZMQ_H */
index c270ec3d18f8faa240f90c070d29c128d932f6b2..b6624915e874f47009dc32e212e825930eb1a318 100644 (file)
@@ -23,6 +23,7 @@
 #include "frr_zmq.h"
 
 DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer")
+DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message")
 
 static struct thread_master *master;
 
@@ -31,6 +32,25 @@ static void msg_buf_free(void *data, void *hint)
        XFREE(MTYPE_TESTBUF, data);
 }
 
+static int recv_delim(void *zmqsock)
+{
+       /* receive delim */
+       zmq_msg_t zdelim;
+       int more;
+       zmq_msg_init(&zdelim);
+       zmq_msg_recv(&zdelim, zmqsock, 0);
+       more = zmq_msg_more(&zdelim);
+       zmq_msg_close(&zdelim);
+       return more;
+}
+static void send_delim(void *zmqsock)
+{
+       /* Send delim */
+       zmq_msg_t zdelim;
+       zmq_msg_init(&zdelim);
+       zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
+       zmq_msg_close(&zdelim);
+}
 static void run_client(int syncfd)
 {
        int i, j;
@@ -38,13 +58,14 @@ static void run_client(int syncfd)
        char dummy;
        void *zmqctx = NULL;
        void *zmqsock;
+       int more;
 
        read(syncfd, &dummy, 1);
 
        zmqctx = zmq_ctx_new();
        zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
 
-       zmqsock = zmq_socket(zmqctx, ZMQ_REQ);
+       zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
        if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
                perror("zmq_connect");
                exit(1);
@@ -52,22 +73,28 @@ static void run_client(int syncfd)
 
        /* single-part */
        for (i = 0; i < 8; i++) {
-               snprintf(buf, sizeof(buf), "msg #%d %c%c%c",
-                        i, 'a' + i, 'b' + i, 'c' + i);
+               snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
+                        'b' + i, 'c' + i);
                printf("client send: %s\n", buf);
                fflush(stdout);
-               zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
-               zmq_recv(zmqsock, buf, sizeof(buf), 0);
-               printf("client recv: %s\n", buf);
+               send_delim(zmqsock);
+               zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+               more = recv_delim(zmqsock);
+               while (more) {
+                       zmq_recv(zmqsock, buf, sizeof(buf), 0);
+                       printf("client recv: %s\n", buf);
+                       size_t len = sizeof(more);
+                       if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+                               break;
+               }
        }
 
        /* multipart */
        for (i = 2; i < 5; i++) {
-               int more;
-
                printf("---\n");
+               send_delim(zmqsock);
+               zmq_msg_t part;
                for (j = 1; j <= i; j++) {
-                       zmq_msg_t part;
                        char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
 
                        snprintf(dyn, 32, "part %d/%d", j, i);
@@ -79,7 +106,7 @@ static void run_client(int syncfd)
                        zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
                }
 
-               zmq_msg_t part;
+               recv_delim(zmqsock);
                do {
                        char *data;
 
@@ -90,26 +117,85 @@ static void run_client(int syncfd)
                } while (more);
                zmq_msg_close(&part);
        }
+
+       /* write callback */
+       printf("---\n");
+       snprintf(buf, 32, "Done receiving");
+       printf("client send: %s\n", buf);
+       fflush(stdout);
+       send_delim(zmqsock);
+       zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+       /* wait for message from server */
+       more = recv_delim(zmqsock);
+       while (more) {
+               zmq_recv(zmqsock, buf, sizeof(buf), 0);
+               printf("client recv: %s\n", buf);
+               size_t len = sizeof(more);
+               if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+                       break;
+       }
+
        zmq_close(zmqsock);
        zmq_ctx_term(zmqctx);
 }
 
 static struct frrzmq_cb *cb;
 
+static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+       /* receive id */
+       zmq_msg_init(msg_id);
+       zmq_msg_recv(msg_id, zmqsock, 0);
+       /* receive delim */
+       recv_delim(zmqsock);
+}
+static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+       /* Send Id */
+       zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);
+       send_delim(zmqsock);
+}
+static void serverwritefn(void *arg, void *zmqsock)
+{
+       zmq_msg_t *msg_id = (zmq_msg_t *)arg;
+       char buf[32] = "Test write callback";
+       size_t i;
+
+       for (i = 0; i < strlen(buf); i++)
+               buf[i] = toupper(buf[i]);
+       printf("server send: %s\n", buf);
+       fflush(stdout);
+       send_id_and_delim(zmqsock, msg_id);
+       zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+
+       /* send just once */
+       frrzmq_thread_cancel(&cb, &cb->write);
+
+       zmq_msg_close(msg_id);
+       XFREE(MTYPE_ZMQMSG, msg_id);
+}
 static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
-                       unsigned partnum)
+                        unsigned partnum)
 {
+       static int num = 0;
        int more = zmq_msg_more(msg);
        char *in = zmq_msg_data(msg);
        size_t i;
        zmq_msg_t reply;
        char *out;
 
+       /* Id */
+       if (partnum == 0) {
+               send_id_and_delim(zmqsock, msg);
+               return;
+       }
+       /* Delim */
+       if (partnum == 1)
+               return;
+
+
        printf("server recv part %u (more: %d): %s\n", partnum, more, in);
        fflush(stdout);
-       /* REQ-REP doesn't allow sending a reply here */
-       if (more)
-               return;
 
        out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
        for (i = 0; i < strlen(in); i++)
@@ -118,39 +204,66 @@ static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
        zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
        zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
 
+       if (more)
+               return;
+
        out = XMALLOC(MTYPE_TESTBUF, 32);
        snprintf(out, 32, "msg# was %u", partnum);
        zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
        zmq_msg_send(&reply, zmqsock, 0);
+
+       zmq_msg_close(&reply);
+
+       if (++num < 7)
+               return;
+
+       /* write callback test */
+       char buf[32];
+       zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t));
+       recv_id_and_delim(zmqsock, msg_id);
+       zmq_recv(zmqsock, buf, sizeof(buf), 0);
+       printf("server recv: %s\n", buf);
+       fflush(stdout);
+
+       frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id,
+                                   zmqsock, &cb);
 }
 
 static void serverfn(void *arg, void *zmqsock)
 {
        static int num = 0;
 
+       zmq_msg_t msg_id;
        char buf[32];
        size_t i;
+
+       recv_id_and_delim(zmqsock, &msg_id);
        zmq_recv(zmqsock, buf, sizeof(buf), 0);
 
        printf("server recv: %s\n", buf);
        fflush(stdout);
        for (i = 0; i < strlen(buf); i++)
                buf[i] = toupper(buf[i]);
+       send_id_and_delim(zmqsock, &msg_id);
+       zmq_msg_close(&msg_id);
        zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
 
        if (++num < 4)
                return;
 
        /* change to multipart callback */
-       frrzmq_thread_cancel(cb);
+       frrzmq_thread_cancel(&cb, &cb->read);
+       frrzmq_thread_cancel(&cb, &cb->write);
 
-       cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock);
+       frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
+                                   &cb);
 }
 
 static void sigchld(void)
 {
        printf("child exited.\n");
-       frrzmq_thread_cancel(cb);
+       frrzmq_thread_cancel(&cb, &cb->read);
+       frrzmq_thread_cancel(&cb, &cb->write);
 }
 
 static struct quagga_signal_t sigs[] = {
@@ -170,13 +283,13 @@ static void run_server(int syncfd)
        signal_init(master, array_size(sigs), sigs);
        frrzmq_init();
 
-       zmqsock = zmq_socket(frrzmq_context, ZMQ_REP);
+       zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
        if (zmq_bind(zmqsock, "tcp://*:17171")) {
                perror("zmq_bind");
                exit(1);
        }
 
-       cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock);
+       frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
 
        write(syncfd, &dummy, sizeof(dummy));
        while (thread_fetch(master, &t))
index 61f45f02b137241d7ca17c804c8d37e9de2f26ef..acac50553d15ec01f04eac697ac9d3fbdd5bf6bb 100644 (file)
@@ -11,40 +11,57 @@ client send: msg #3 def
 server recv: msg #3 def
 client recv: MSG #3 DEF
 client send: msg #4 efg
-server recv part 0 (more: 0): msg #4 efg
+server recv part 2 (more: 0): msg #4 efg
 client recv: MSG #4 EFG
+client recv: msg# was 2
 client send: msg #5 fgh
-client recv: msg# was 0
+server recv part 2 (more: 0): msg #5 fgh
+client recv: MSG #5 FGH
+client recv: msg# was 2
 client send: msg #6 ghi
-server recv part 0 (more: 0): msg #6 ghi
+server recv part 2 (more: 0): msg #6 ghi
 client recv: MSG #6 GHI
+client recv: msg# was 2
 client send: msg #7 hij
-client recv: msg# was 0
+server recv part 2 (more: 0): msg #7 hij
+client recv: MSG #7 HIJ
+client recv: msg# was 2
 ---
 client send: part 1/2
 client send: part 2/2
-server recv part 0 (more: 1): part 1/2
-server recv part 1 (more: 0): part 2/2
+server recv part 2 (more: 1): part 1/2
+server recv part 3 (more: 0): part 2/2
+client recv (more: 1): PART 1/2
 client recv (more: 1): PART 2/2
-client recv (more: 0): msg# was 1
+client recv (more: 0): msg# was 3
 ---
 client send: part 1/3
 client send: part 2/3
 client send: part 3/3
-server recv part 0 (more: 1): part 1/3
-server recv part 1 (more: 1): part 2/3
-server recv part 2 (more: 0): part 3/3
+server recv part 2 (more: 1): part 1/3
+server recv part 3 (more: 1): part 2/3
+server recv part 4 (more: 0): part 3/3
+client recv (more: 1): PART 1/3
+client recv (more: 1): PART 2/3
 client recv (more: 1): PART 3/3
-client recv (more: 0): msg# was 2
+client recv (more: 0): msg# was 4
 ---
 client send: part 1/4
 client send: part 2/4
 client send: part 3/4
 client send: part 4/4
-server recv part 0 (more: 1): part 1/4
-server recv part 1 (more: 1): part 2/4
-server recv part 2 (more: 1): part 3/4
-server recv part 3 (more: 0): part 4/4
+server recv part 2 (more: 1): part 1/4
+server recv part 3 (more: 1): part 2/4
+server recv part 4 (more: 1): part 3/4
+server recv part 5 (more: 0): part 4/4
+client recv (more: 1): PART 1/4
+client recv (more: 1): PART 2/4
+client recv (more: 1): PART 3/4
 client recv (more: 1): PART 4/4
-client recv (more: 0): msg# was 3
+client recv (more: 0): msg# was 5
+---
+client send: Done receiving
+server recv: Done receiving
+server send: TEST WRITE CALLBACK
+client recv: TEST WRITE CALLBACK
 child exited.