]> git.proxmox.com Git - mirror_frr.git/blobdiff - tests/lib/test_zmq.c
Merge pull request #8626 from idryzhov/fix-isis-topo1
[mirror_frr.git] / tests / lib / test_zmq.c
index c270ec3d18f8faa240f90c070d29c128d932f6b2..65195aa3e1be361eed5ed94d5e3ecc22b8904e88 100644 (file)
@@ -22,7 +22,8 @@
 #include "sigevent.h"
 #include "frr_zmq.h"
 
-DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer")
+DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer");
+DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message");
 
 static struct thread_master *master;
 
@@ -31,6 +32,25 @@ static void msg_buf_free(void *data, void *hint)
        XFREE(MTYPE_TESTBUF, data);
 }
 
+static int recv_delim(void *zmqsock)
+{
+       /* receive delim */
+       zmq_msg_t zdelim;
+       int more;
+       zmq_msg_init(&zdelim);
+       zmq_msg_recv(&zdelim, zmqsock, 0);
+       more = zmq_msg_more(&zdelim);
+       zmq_msg_close(&zdelim);
+       return more;
+}
+static void send_delim(void *zmqsock)
+{
+       /* Send delim */
+       zmq_msg_t zdelim;
+       zmq_msg_init(&zdelim);
+       zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
+       zmq_msg_close(&zdelim);
+}
 static void run_client(int syncfd)
 {
        int i, j;
@@ -38,13 +58,14 @@ static void run_client(int syncfd)
        char dummy;
        void *zmqctx = NULL;
        void *zmqsock;
+       int more;
 
        read(syncfd, &dummy, 1);
 
        zmqctx = zmq_ctx_new();
        zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
 
-       zmqsock = zmq_socket(zmqctx, ZMQ_REQ);
+       zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
        if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
                perror("zmq_connect");
                exit(1);
@@ -52,22 +73,28 @@ static void run_client(int syncfd)
 
        /* single-part */
        for (i = 0; i < 8; i++) {
-               snprintf(buf, sizeof(buf), "msg #%d %c%c%c",
-                        i, 'a' + i, 'b' + i, 'c' + i);
+               snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
+                        'b' + i, 'c' + i);
                printf("client send: %s\n", buf);
                fflush(stdout);
-               zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
-               zmq_recv(zmqsock, buf, sizeof(buf), 0);
-               printf("client recv: %s\n", buf);
+               send_delim(zmqsock);
+               zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+               more = recv_delim(zmqsock);
+               while (more) {
+                       zmq_recv(zmqsock, buf, sizeof(buf), 0);
+                       printf("client recv: %s\n", buf);
+                       size_t len = sizeof(more);
+                       if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+                               break;
+               }
        }
 
        /* multipart */
        for (i = 2; i < 5; i++) {
-               int more;
-
                printf("---\n");
+               send_delim(zmqsock);
+               zmq_msg_t part;
                for (j = 1; j <= i; j++) {
-                       zmq_msg_t part;
                        char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
 
                        snprintf(dyn, 32, "part %d/%d", j, i);
@@ -79,7 +106,7 @@ static void run_client(int syncfd)
                        zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
                }
 
-               zmq_msg_t part;
+               recv_delim(zmqsock);
                do {
                        char *data;
 
@@ -90,26 +117,85 @@ static void run_client(int syncfd)
                } while (more);
                zmq_msg_close(&part);
        }
+
+       /* write callback */
+       printf("---\n");
+       snprintf(buf, sizeof(buf), "Done receiving");
+       printf("client send: %s\n", buf);
+       fflush(stdout);
+       send_delim(zmqsock);
+       zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+       /* wait for message from server */
+       more = recv_delim(zmqsock);
+       while (more) {
+               zmq_recv(zmqsock, buf, sizeof(buf), 0);
+               printf("client recv: %s\n", buf);
+               size_t len = sizeof(more);
+               if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+                       break;
+       }
+
        zmq_close(zmqsock);
        zmq_ctx_term(zmqctx);
 }
 
 static struct frrzmq_cb *cb;
 
+static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+       /* receive id */
+       zmq_msg_init(msg_id);
+       zmq_msg_recv(msg_id, zmqsock, 0);
+       /* receive delim */
+       recv_delim(zmqsock);
+}
+static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+       /* Send Id */
+       zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);
+       send_delim(zmqsock);
+}
+static void serverwritefn(void *arg, void *zmqsock)
+{
+       zmq_msg_t *msg_id = (zmq_msg_t *)arg;
+       char buf[32] = "Test write callback";
+       size_t i;
+
+       for (i = 0; i < strlen(buf); i++)
+               buf[i] = toupper(buf[i]);
+       printf("server send: %s\n", buf);
+       fflush(stdout);
+       send_id_and_delim(zmqsock, msg_id);
+       zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+
+       /* send just once */
+       frrzmq_thread_cancel(&cb, &cb->write);
+
+       zmq_msg_close(msg_id);
+       XFREE(MTYPE_ZMQMSG, msg_id);
+}
 static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
-                       unsigned partnum)
+                        unsigned partnum)
 {
+       static int num = 0;
        int more = zmq_msg_more(msg);
        char *in = zmq_msg_data(msg);
        size_t i;
        zmq_msg_t reply;
        char *out;
 
+       /* Id */
+       if (partnum == 0) {
+               send_id_and_delim(zmqsock, msg);
+               return;
+       }
+       /* Delim */
+       if (partnum == 1)
+               return;
+
+
        printf("server recv part %u (more: %d): %s\n", partnum, more, in);
        fflush(stdout);
-       /* REQ-REP doesn't allow sending a reply here */
-       if (more)
-               return;
 
        out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
        for (i = 0; i < strlen(in); i++)
@@ -118,39 +204,66 @@ static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
        zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
        zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
 
+       if (more)
+               return;
+
        out = XMALLOC(MTYPE_TESTBUF, 32);
        snprintf(out, 32, "msg# was %u", partnum);
        zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
        zmq_msg_send(&reply, zmqsock, 0);
+
+       zmq_msg_close(&reply);
+
+       if (++num < 7)
+               return;
+
+       /* write callback test */
+       char buf[32];
+       zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t));
+       recv_id_and_delim(zmqsock, msg_id);
+       zmq_recv(zmqsock, buf, sizeof(buf), 0);
+       printf("server recv: %s\n", buf);
+       fflush(stdout);
+
+       frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id,
+                                   zmqsock, &cb);
 }
 
 static void serverfn(void *arg, void *zmqsock)
 {
        static int num = 0;
 
+       zmq_msg_t msg_id;
        char buf[32];
        size_t i;
+
+       recv_id_and_delim(zmqsock, &msg_id);
        zmq_recv(zmqsock, buf, sizeof(buf), 0);
 
        printf("server recv: %s\n", buf);
        fflush(stdout);
        for (i = 0; i < strlen(buf); i++)
                buf[i] = toupper(buf[i]);
+       send_id_and_delim(zmqsock, &msg_id);
+       zmq_msg_close(&msg_id);
        zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
 
        if (++num < 4)
                return;
 
        /* change to multipart callback */
-       frrzmq_thread_cancel(cb);
+       frrzmq_thread_cancel(&cb, &cb->read);
+       frrzmq_thread_cancel(&cb, &cb->write);
 
-       cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock);
+       frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
+                                   &cb);
 }
 
 static void sigchld(void)
 {
        printf("child exited.\n");
-       frrzmq_thread_cancel(cb);
+       frrzmq_thread_cancel(&cb, &cb->read);
+       frrzmq_thread_cancel(&cb, &cb->write);
 }
 
 static struct quagga_signal_t sigs[] = {
@@ -170,13 +283,13 @@ static void run_server(int syncfd)
        signal_init(master, array_size(sigs), sigs);
        frrzmq_init();
 
-       zmqsock = zmq_socket(frrzmq_context, ZMQ_REP);
+       zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
        if (zmq_bind(zmqsock, "tcp://*:17171")) {
                perror("zmq_bind");
                exit(1);
        }
 
-       cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock);
+       frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
 
        write(syncfd, &dummy, sizeof(dummy));
        while (thread_fetch(master, &t))