}
}
-/* read callback integration */
-struct frrzmq_cb {
- struct thread *thread;
- void *zmqsock;
- void *arg;
- int fd;
-
- bool cancelled;
-
- void (*cb_msg)(void *arg, void *zmqsock);
- void (*cb_part)(void *arg, void *zmqsock,
- zmq_msg_t *msg, unsigned partnum);
-};
-
-
static int frrzmq_read_msg(struct thread *t)
{
- struct frrzmq_cb *cb = THREAD_ARG(t);
+ struct frrzmq_cb **cbp = THREAD_ARG(t);
+ struct frrzmq_cb *cb;
zmq_msg_t msg;
unsigned partno;
+ unsigned char read = 0;
int ret, more;
size_t moresz;
+ if (!cbp)
+ return 1;
+ cb = (*cbp);
+ if (!cb || !cb->zmqsock)
+ return 1;
+
while (1) {
- zmq_pollitem_t polli = {
- .socket = cb->zmqsock,
- .events = ZMQ_POLLIN
- };
+ zmq_pollitem_t polli = {.socket = cb->zmqsock,
+ .events = ZMQ_POLLIN};
ret = zmq_poll(&polli, 1, 0);
if (ret < 0)
goto out_err;
+
if (!(polli.revents & ZMQ_POLLIN))
break;
- if (cb->cb_msg) {
- cb->cb_msg(cb->arg, cb->zmqsock);
+ if (cb->read.cb_msg) {
+ cb->read.cb_msg(cb->read.arg, cb->zmqsock);
+ read = 1;
- if (cb->cancelled) {
- XFREE(MTYPE_ZEROMQ_CB, cb);
+ if (cb->read.cancelled) {
+ frrzmq_check_events(cbp, &cb->write,
+ ZMQ_POLLOUT);
+ cb->read.thread = NULL;
+ if (cb->write.cancelled && !cb->write.thread)
+ XFREE(MTYPE_ZEROMQ_CB, cb);
return 0;
}
continue;
zmq_msg_close(&msg);
goto out_err;
}
+ read = 1;
- cb->cb_part(cb->arg, cb->zmqsock, &msg, partno);
- if (cb->cancelled) {
+ cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
+ partno);
+ if (cb->read.cancelled) {
zmq_msg_close(&msg);
- XFREE(MTYPE_ZEROMQ_CB, cb);
+ frrzmq_check_events(cbp, &cb->write,
+ ZMQ_POLLOUT);
+ cb->read.thread = NULL;
+ if (cb->write.cancelled && !cb->write.thread)
+ XFREE(MTYPE_ZEROMQ_CB, cb);
return 0;
}
* message; don't use zmq_msg_more here */
moresz = sizeof(more);
more = 0;
- ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE,
- &more, &moresz);
+ ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
+ &moresz);
if (ret < 0) {
zmq_msg_close(&msg);
goto out_err;
zmq_msg_close(&msg);
}
- funcname_thread_add_read_write(THREAD_READ, t->master, frrzmq_read_msg,
- cb, cb->fd, &cb->thread, t->funcname, t->schedfrom,
- t->schedfrom_line);
+ if (read)
+ frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
+
+ funcname_thread_add_read_write(
+ THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
+ &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
return 0;
out_err:
- zlog_err("ZeroMQ error: %s(%d)", strerror (errno), errno);
- return 0;
+ zlog_err("ZeroMQ read error: %s(%d)", strerror(errno), errno);
+ if (cb->read.cb_error)
+ cb->read.cb_error(cb->read.arg, cb->zmqsock);
+ return 1;
}
-struct frrzmq_cb *funcname_frrzmq_thread_add_read(
- struct thread_master *master,
- void (*msgfunc)(void *arg, void *zmqsock),
- void (*partfunc)(void *arg, void *zmqsock,
- zmq_msg_t *msg, unsigned partnum),
- void *arg, void *zmqsock, debugargdef)
+int funcname_frrzmq_thread_add_read(struct thread_master *master,
+ void (*msgfunc)(void *arg, void *zmqsock),
+ void (*partfunc)(void *arg, void *zmqsock,
+ zmq_msg_t *msg,
+ unsigned partnum),
+ void (*errfunc)(void *arg, void *zmqsock),
+ void *arg, void *zmqsock,
+ struct frrzmq_cb **cbp, debugargdef)
{
int fd, events;
size_t len;
struct frrzmq_cb *cb;
+ if (!cbp)
+ return -1;
if (!(msgfunc || partfunc) || (msgfunc && partfunc))
- return NULL;
+ return -1;
+ len = sizeof(fd);
+ if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
+ return -1;
+ len = sizeof(events);
+ if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
+ return -1;
+
+ if (*cbp)
+ cb = *cbp;
+ else {
+ cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
+ cb->write.cancelled = 1;
+ if (!cb)
+ return -1;
+ *cbp = cb;
+ }
+
+ cb->zmqsock = zmqsock;
+ cb->fd = fd;
+ cb->read.arg = arg;
+ cb->read.cb_msg = msgfunc;
+ cb->read.cb_part = partfunc;
+ cb->read.cb_error = errfunc;
+ cb->read.cancelled = 0;
+
+ if (events & ZMQ_POLLIN) {
+ if (cb->read.thread) {
+ thread_cancel(cb->read.thread);
+ cb->read.thread = NULL;
+ }
+ funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
+ &cb->read.thread, funcname, schedfrom,
+ fromln);
+ } else
+ funcname_thread_add_read_write(
+ THREAD_READ, master, frrzmq_read_msg, cbp, fd,
+ &cb->read.thread, funcname, schedfrom, fromln);
+ return 0;
+}
+
+static int frrzmq_write_msg(struct thread *t)
+{
+ struct frrzmq_cb **cbp = THREAD_ARG(t);
+ struct frrzmq_cb *cb;
+ unsigned char written = 0;
+ int ret;
+
+ if (!cbp)
+ return 1;
+ cb = (*cbp);
+ if (!cb || !cb->zmqsock)
+ return 1;
+
+ while (1) {
+ zmq_pollitem_t polli = {.socket = cb->zmqsock,
+ .events = ZMQ_POLLOUT};
+ ret = zmq_poll(&polli, 1, 0);
+
+ if (ret < 0)
+ goto out_err;
+
+ if (!(polli.revents & ZMQ_POLLOUT))
+ break;
+
+ if (cb->write.cb_msg) {
+ cb->write.cb_msg(cb->write.arg, cb->zmqsock);
+ written = 1;
+
+ if (cb->write.cancelled) {
+ frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
+ cb->write.thread = NULL;
+ if (cb->read.cancelled && !cb->read.thread)
+ XFREE(MTYPE_ZEROMQ_CB, cb);
+ return 0;
+ }
+ continue;
+ }
+ }
+
+ if (written)
+ frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
+
+ funcname_thread_add_read_write(THREAD_WRITE, t->master,
+ frrzmq_write_msg, cbp, cb->fd,
+ &cb->write.thread, t->funcname,
+ t->schedfrom, t->schedfrom_line);
+ return 0;
+
+out_err:
+ zlog_err("ZeroMQ write error: %s(%d)", strerror(errno), errno);
+ if (cb->write.cb_error)
+ cb->write.cb_error(cb->write.arg, cb->zmqsock);
+ return 1;
+}
+int funcname_frrzmq_thread_add_write(struct thread_master *master,
+ void (*msgfunc)(void *arg, void *zmqsock),
+ void (*errfunc)(void *arg, void *zmqsock),
+ void *arg, void *zmqsock,
+ struct frrzmq_cb **cbp, debugargdef)
+{
+ int fd, events;
+ size_t len;
+ struct frrzmq_cb *cb;
+
+ if (!cbp)
+ return -1;
+ if (!msgfunc)
+ return -1;
len = sizeof(fd);
if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
- return NULL;
+ return -1;
len = sizeof(events);
if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
- return NULL;
+ return -1;
- cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
- if (!cb)
- return NULL;
+ if (*cbp)
+ cb = *cbp;
+ else {
+ cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
+ cb->read.cancelled = 1;
+ if (!cb)
+ return -1;
+ *cbp = cb;
+ }
- cb->arg = arg;
cb->zmqsock = zmqsock;
- cb->cb_msg = msgfunc;
- cb->cb_part = partfunc;
cb->fd = fd;
+ cb->write.arg = arg;
+ cb->write.cb_msg = msgfunc;
+ cb->write.cb_part = NULL;
+ cb->write.cb_error = errfunc;
+ cb->write.cancelled = 0;
+
+ if (events & ZMQ_POLLOUT) {
+ if (cb->write.thread) {
+ thread_cancel(cb->write.thread);
+ cb->write.thread = NULL;
+ }
+ funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
+ &cb->write.thread, funcname,
+ schedfrom, fromln);
+ } else
+ funcname_thread_add_read_write(
+ THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
+ &cb->write.thread, funcname, schedfrom, fromln);
+ return 0;
+}
- if (events & ZMQ_POLLIN)
- funcname_thread_add_event(master,
- frrzmq_read_msg, cb, fd, &cb->thread,
- funcname, schedfrom, fromln);
- else
- funcname_thread_add_read_write(THREAD_READ, master,
- frrzmq_read_msg, cb, fd, &cb->thread,
- funcname, schedfrom, fromln);
- return cb;
+void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
+{
+ if (!cb || !*cb)
+ return;
+ core->cancelled = 1;
+ if (core->thread) {
+ thread_cancel(core->thread);
+ core->thread = NULL;
+ }
+ if ((*cb)->read.cancelled && !(*cb)->read.thread
+ && (*cb)->write.cancelled && (*cb)->write.thread)
+ XFREE(MTYPE_ZEROMQ_CB, *cb);
}
-void frrzmq_thread_cancel(struct frrzmq_cb *cb)
+void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
+ int event)
{
- if (!cb->thread) {
- /* canceling from within callback */
- cb->cancelled = 1;
+ struct frrzmq_cb *cb;
+ int events;
+ size_t len;
+
+ if (!cbp)
+ return;
+ cb = (*cbp);
+ if (!cb || !cb->zmqsock)
+ return;
+
+ if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
return;
+ if (events & event && core->thread && !core->cancelled) {
+ struct thread_master *tm = core->thread->master;
+ thread_cancel(core->thread);
+ core->thread = NULL;
+ thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
+ : frrzmq_write_msg),
+ cbp, cb->fd, &core->thread);
}
- thread_cancel(cb->thread);
- XFREE(MTYPE_ZEROMQ_CB, cb);
}
* foo_LDFLAGS = libfrrzmq.la libfrr.la $(ZEROMQ_LIBS)
*/
+/* callback integration */
+struct cb_core {
+ struct thread *thread;
+ void *arg;
+
+ bool cancelled;
+
+ void (*cb_msg)(void *arg, void *zmqsock);
+ void (*cb_part)(void *arg, void *zmqsock, zmq_msg_t *msg,
+ unsigned partnum);
+ void (*cb_error)(void *arg, void *zmqsock);
+};
+struct frrzmq_cb {
+ void *zmqsock;
+ int fd;
+
+ struct cb_core read;
+ struct cb_core write;
+};
+
/* libzmq's context
*
* this is mostly here as a convenience, it has IPv6 enabled but nothing
*/
extern void *frrzmq_context;
-extern void frrzmq_init (void);
-extern void frrzmq_finish (void);
+extern void frrzmq_init(void);
+extern void frrzmq_finish(void);
#define debugargdef const char *funcname, const char *schedfrom, int fromln
/* core event registration, one of these 2 macros should be used */
-#define frrzmq_thread_add_read_msg(m,f,a,z) funcname_frrzmq_thread_add_read( \
- m,f,NULL,a,z,#f,__FILE__,__LINE__)
-#define frrzmq_thread_add_read_part(m,f,a,z) funcname_frrzmq_thread_add_read( \
- m,NULL,f,a,z,#f,__FILE__,__LINE__)
+#define frrzmq_thread_add_read_msg(m, f, e, a, z, d) \
+ funcname_frrzmq_thread_add_read(m, f, NULL, e, a, z, d, #f, __FILE__, \
+ __LINE__)
+#define frrzmq_thread_add_read_part(m, f, e, a, z, d) \
+ funcname_frrzmq_thread_add_read(m, NULL, f, e, a, z, d, #f, __FILE__, \
+ __LINE__)
+#define frrzmq_thread_add_write_msg(m, f, e, a, z, d) \
+ funcname_frrzmq_thread_add_write(m, f, e, a, z, d, #f, __FILE__, \
+ __LINE__)
+struct cb_core;
struct frrzmq_cb;
-/* Set up a POLLIN notification to be called from the libfrr main loop.
- * This has the following properties:
+/* Set up a POLLIN or POLLOUT notification to be called from the libfrr main
+ * loop. This has the following properties:
*
* - since ZeroMQ works with edge triggered notifications, it will loop and
* dispatch as many events as ZeroMQ has pending at the time libfrr calls
* - if partfunc is specified, the message is read and partfunc is called
* for each ZeroMQ multi-part subpart. Note that you can't send replies
* before all parts have been read because that violates the ZeroMQ FSM.
+ * - write version doesn't allow for partial callback, you must handle the
+ * whole message (all parts) in msgfunc callback
* - you can safely cancel the callback from within itself
* - installing a callback will check for pending events (ZMQ_EVENTS) and
* may schedule the event to run as soon as libfrr is back in its main
* loop.
+ */
+extern int funcname_frrzmq_thread_add_read(
+ struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock),
+ void (*partfunc)(void *arg, void *zmqsock, zmq_msg_t *msg,
+ unsigned partnum),
+ void (*errfunc)(void *arg, void *zmqsock), void *arg, void *zmqsock,
+ struct frrzmq_cb **cb, debugargdef);
+extern int funcname_frrzmq_thread_add_write(
+ struct thread_master *master, void (*msgfunc)(void *arg, void *zmqsock),
+ void (*errfunc)(void *arg, void *zmqsock), void *arg, void *zmqsock,
+ struct frrzmq_cb **cb, debugargdef);
+
+extern void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core);
+
+/*
+ * http://api.zeromq.org/4-2:zmq-getsockopt#toc10
*
- * TODO #1: add ZMQ_POLLERR / error callback
- * TODO #2: add frrzmq_check_events() function to check for edge triggered
- * things that may have happened after a zmq_send() call or so
+ * As the descriptor is edge triggered, applications must update the state of
+ * ZMQ_EVENTS after each invocation of zmq_send or zmq_recv.To be more explicit:
+ * after calling zmq_send the socket may become readable (and vice versa)
+ * without triggering a read event on the file descriptor.
*/
-extern struct frrzmq_cb *funcname_frrzmq_thread_add_read(
- struct thread_master *master,
- void (*msgfunc)(void *arg, void *zmqsock),
- void (*partfunc)(void *arg, void *zmqsock,
- zmq_msg_t *msg, unsigned partnum),
- void *arg, void *zmqsock, debugargdef);
-
-extern void frrzmq_thread_cancel(struct frrzmq_cb *cb);
+extern void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
+ int event);
#endif /* _FRRZMQ_H */
#include "frr_zmq.h"
DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer")
+DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message")
static struct thread_master *master;
XFREE(MTYPE_TESTBUF, data);
}
+static int recv_delim(void *zmqsock)
+{
+ /* receive delim */
+ zmq_msg_t zdelim;
+ int more;
+ zmq_msg_init(&zdelim);
+ zmq_msg_recv(&zdelim, zmqsock, 0);
+ more = zmq_msg_more(&zdelim);
+ zmq_msg_close(&zdelim);
+ return more;
+}
+static void send_delim(void *zmqsock)
+{
+ /* Send delim */
+ zmq_msg_t zdelim;
+ zmq_msg_init(&zdelim);
+ zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
+ zmq_msg_close(&zdelim);
+}
static void run_client(int syncfd)
{
int i, j;
char dummy;
void *zmqctx = NULL;
void *zmqsock;
+ int more;
read(syncfd, &dummy, 1);
zmqctx = zmq_ctx_new();
zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
- zmqsock = zmq_socket(zmqctx, ZMQ_REQ);
+ zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
perror("zmq_connect");
exit(1);
/* single-part */
for (i = 0; i < 8; i++) {
- snprintf(buf, sizeof(buf), "msg #%d %c%c%c",
- i, 'a' + i, 'b' + i, 'c' + i);
+ snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
+ 'b' + i, 'c' + i);
printf("client send: %s\n", buf);
fflush(stdout);
- zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
- zmq_recv(zmqsock, buf, sizeof(buf), 0);
- printf("client recv: %s\n", buf);
+ send_delim(zmqsock);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+ more = recv_delim(zmqsock);
+ while (more) {
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("client recv: %s\n", buf);
+ size_t len = sizeof(more);
+ if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+ break;
+ }
}
/* multipart */
for (i = 2; i < 5; i++) {
- int more;
-
printf("---\n");
+ send_delim(zmqsock);
+ zmq_msg_t part;
for (j = 1; j <= i; j++) {
- zmq_msg_t part;
char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
snprintf(dyn, 32, "part %d/%d", j, i);
zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
}
- zmq_msg_t part;
+ recv_delim(zmqsock);
do {
char *data;
} while (more);
zmq_msg_close(&part);
}
+
+ /* write callback */
+ printf("---\n");
+ snprintf(buf, 32, "Done receiving");
+ printf("client send: %s\n", buf);
+ fflush(stdout);
+ send_delim(zmqsock);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+ /* wait for message from server */
+ more = recv_delim(zmqsock);
+ while (more) {
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("client recv: %s\n", buf);
+ size_t len = sizeof(more);
+ if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+ break;
+ }
+
zmq_close(zmqsock);
zmq_ctx_term(zmqctx);
}
static struct frrzmq_cb *cb;
+static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+ /* receive id */
+ zmq_msg_init(msg_id);
+ zmq_msg_recv(msg_id, zmqsock, 0);
+ /* receive delim */
+ recv_delim(zmqsock);
+}
+static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+ /* Send Id */
+ zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);
+ send_delim(zmqsock);
+}
+static void serverwritefn(void *arg, void *zmqsock)
+{
+ zmq_msg_t *msg_id = (zmq_msg_t *)arg;
+ char buf[32] = "Test write callback";
+ size_t i;
+
+ for (i = 0; i < strlen(buf); i++)
+ buf[i] = toupper(buf[i]);
+ printf("server send: %s\n", buf);
+ fflush(stdout);
+ send_id_and_delim(zmqsock, msg_id);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+
+ /* send just once */
+ frrzmq_thread_cancel(&cb, &cb->write);
+
+ zmq_msg_close(msg_id);
+ XFREE(MTYPE_ZMQMSG, msg_id);
+}
static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
- unsigned partnum)
+ unsigned partnum)
{
+ static int num = 0;
int more = zmq_msg_more(msg);
char *in = zmq_msg_data(msg);
size_t i;
zmq_msg_t reply;
char *out;
+ /* Id */
+ if (partnum == 0) {
+ send_id_and_delim(zmqsock, msg);
+ return;
+ }
+ /* Delim */
+ if (partnum == 1)
+ return;
+
+
printf("server recv part %u (more: %d): %s\n", partnum, more, in);
fflush(stdout);
- /* REQ-REP doesn't allow sending a reply here */
- if (more)
- return;
out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
for (i = 0; i < strlen(in); i++)
zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
+ if (more)
+ return;
+
out = XMALLOC(MTYPE_TESTBUF, 32);
snprintf(out, 32, "msg# was %u", partnum);
zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
zmq_msg_send(&reply, zmqsock, 0);
+
+ zmq_msg_close(&reply);
+
+ if (++num < 7)
+ return;
+
+ /* write callback test */
+ char buf[32];
+ zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t));
+ recv_id_and_delim(zmqsock, msg_id);
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("server recv: %s\n", buf);
+ fflush(stdout);
+
+ frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id,
+ zmqsock, &cb);
}
static void serverfn(void *arg, void *zmqsock)
{
static int num = 0;
+ zmq_msg_t msg_id;
char buf[32];
size_t i;
+
+ recv_id_and_delim(zmqsock, &msg_id);
zmq_recv(zmqsock, buf, sizeof(buf), 0);
printf("server recv: %s\n", buf);
fflush(stdout);
for (i = 0; i < strlen(buf); i++)
buf[i] = toupper(buf[i]);
+ send_id_and_delim(zmqsock, &msg_id);
+ zmq_msg_close(&msg_id);
zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
if (++num < 4)
return;
/* change to multipart callback */
- frrzmq_thread_cancel(cb);
+ frrzmq_thread_cancel(&cb, &cb->read);
+ frrzmq_thread_cancel(&cb, &cb->write);
- cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock);
+ frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
+ &cb);
}
static void sigchld(void)
{
printf("child exited.\n");
- frrzmq_thread_cancel(cb);
+ frrzmq_thread_cancel(&cb, &cb->read);
+ frrzmq_thread_cancel(&cb, &cb->write);
}
static struct quagga_signal_t sigs[] = {
signal_init(master, array_size(sigs), sigs);
frrzmq_init();
- zmqsock = zmq_socket(frrzmq_context, ZMQ_REP);
+ zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
if (zmq_bind(zmqsock, "tcp://*:17171")) {
perror("zmq_bind");
exit(1);
}
- cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock);
+ frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
write(syncfd, &dummy, sizeof(dummy));
while (thread_fetch(master, &t))
server recv: msg #3 def
client recv: MSG #3 DEF
client send: msg #4 efg
-server recv part 0 (more: 0): msg #4 efg
+server recv part 2 (more: 0): msg #4 efg
client recv: MSG #4 EFG
+client recv: msg# was 2
client send: msg #5 fgh
-client recv: msg# was 0
+server recv part 2 (more: 0): msg #5 fgh
+client recv: MSG #5 FGH
+client recv: msg# was 2
client send: msg #6 ghi
-server recv part 0 (more: 0): msg #6 ghi
+server recv part 2 (more: 0): msg #6 ghi
client recv: MSG #6 GHI
+client recv: msg# was 2
client send: msg #7 hij
-client recv: msg# was 0
+server recv part 2 (more: 0): msg #7 hij
+client recv: MSG #7 HIJ
+client recv: msg# was 2
---
client send: part 1/2
client send: part 2/2
-server recv part 0 (more: 1): part 1/2
-server recv part 1 (more: 0): part 2/2
+server recv part 2 (more: 1): part 1/2
+server recv part 3 (more: 0): part 2/2
+client recv (more: 1): PART 1/2
client recv (more: 1): PART 2/2
-client recv (more: 0): msg# was 1
+client recv (more: 0): msg# was 3
---
client send: part 1/3
client send: part 2/3
client send: part 3/3
-server recv part 0 (more: 1): part 1/3
-server recv part 1 (more: 1): part 2/3
-server recv part 2 (more: 0): part 3/3
+server recv part 2 (more: 1): part 1/3
+server recv part 3 (more: 1): part 2/3
+server recv part 4 (more: 0): part 3/3
+client recv (more: 1): PART 1/3
+client recv (more: 1): PART 2/3
client recv (more: 1): PART 3/3
-client recv (more: 0): msg# was 2
+client recv (more: 0): msg# was 4
---
client send: part 1/4
client send: part 2/4
client send: part 3/4
client send: part 4/4
-server recv part 0 (more: 1): part 1/4
-server recv part 1 (more: 1): part 2/4
-server recv part 2 (more: 1): part 3/4
-server recv part 3 (more: 0): part 4/4
+server recv part 2 (more: 1): part 1/4
+server recv part 3 (more: 1): part 2/4
+server recv part 4 (more: 1): part 3/4
+server recv part 5 (more: 0): part 4/4
+client recv (more: 1): PART 1/4
+client recv (more: 1): PART 2/4
+client recv (more: 1): PART 3/4
client recv (more: 1): PART 4/4
-client recv (more: 0): msg# was 3
+client recv (more: 0): msg# was 5
+---
+client send: Done receiving
+server recv: Done receiving
+server send: TEST WRITE CALLBACK
+client recv: TEST WRITE CALLBACK
child exited.