]>
git.proxmox.com Git - mirror_frr.git/blob - tests/lib/test_zmq.c
3 * Copyright (C) 2017 David Lamparter, for NetDEF, Inc.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25 DEFINE_MTYPE_STATIC(LIB
, TESTBUF
, "zmq test buffer");
26 DEFINE_MTYPE_STATIC(LIB
, ZMQMSG
, "zmq message");
28 static struct thread_master
*master
;
30 static void msg_buf_free(void *data
, void *hint
)
32 XFREE(MTYPE_TESTBUF
, data
);
35 static int recv_delim(void *zmqsock
)
40 zmq_msg_init(&zdelim
);
41 zmq_msg_recv(&zdelim
, zmqsock
, 0);
42 more
= zmq_msg_more(&zdelim
);
43 zmq_msg_close(&zdelim
);
46 static void send_delim(void *zmqsock
)
50 zmq_msg_init(&zdelim
);
51 zmq_msg_send(&zdelim
, zmqsock
, ZMQ_SNDMORE
);
52 zmq_msg_close(&zdelim
);
54 static void run_client(int syncfd
)
63 read(syncfd
, &dummy
, 1);
65 zmqctx
= zmq_ctx_new();
66 zmq_ctx_set(zmqctx
, ZMQ_IPV6
, 1);
68 zmqsock
= zmq_socket(zmqctx
, ZMQ_DEALER
);
69 if (zmq_connect(zmqsock
, "tcp://127.0.0.1:17171")) {
70 perror("zmq_connect");
75 for (i
= 0; i
< 8; i
++) {
76 snprintf(buf
, sizeof(buf
), "msg #%d %c%c%c", i
, 'a' + i
,
78 printf("client send: %s\n", buf
);
81 zmq_send(zmqsock
, buf
, strlen(buf
) + 1, 0);
82 more
= recv_delim(zmqsock
);
84 zmq_recv(zmqsock
, buf
, sizeof(buf
), 0);
85 printf("client recv: %s\n", buf
);
86 size_t len
= sizeof(more
);
87 if (zmq_getsockopt(zmqsock
, ZMQ_RCVMORE
, &more
, &len
))
93 for (i
= 2; i
< 5; i
++) {
97 for (j
= 1; j
<= i
; j
++) {
98 char *dyn
= XMALLOC(MTYPE_TESTBUF
, 32);
100 snprintf(dyn
, 32, "part %d/%d", j
, i
);
101 printf("client send: %s\n", dyn
);
104 zmq_msg_init_data(&part
, dyn
, strlen(dyn
) + 1,
106 zmq_msg_send(&part
, zmqsock
, j
< i
? ZMQ_SNDMORE
: 0);
113 zmq_msg_recv(&part
, zmqsock
, 0);
114 data
= zmq_msg_data(&part
);
115 more
= zmq_msg_more(&part
);
116 printf("client recv (more: %d): %s\n", more
, data
);
118 zmq_msg_close(&part
);
123 snprintf(buf
, sizeof(buf
), "Done receiving");
124 printf("client send: %s\n", buf
);
127 zmq_send(zmqsock
, buf
, strlen(buf
) + 1, 0);
128 /* wait for message from server */
129 more
= recv_delim(zmqsock
);
131 zmq_recv(zmqsock
, buf
, sizeof(buf
), 0);
132 printf("client recv: %s\n", buf
);
133 size_t len
= sizeof(more
);
134 if (zmq_getsockopt(zmqsock
, ZMQ_RCVMORE
, &more
, &len
))
139 zmq_ctx_term(zmqctx
);
142 static struct frrzmq_cb
*cb
;
144 static void recv_id_and_delim(void *zmqsock
, zmq_msg_t
*msg_id
)
147 zmq_msg_init(msg_id
);
148 zmq_msg_recv(msg_id
, zmqsock
, 0);
152 static void send_id_and_delim(void *zmqsock
, zmq_msg_t
*msg_id
)
155 zmq_msg_send(msg_id
, zmqsock
, ZMQ_SNDMORE
);
158 static void serverwritefn(void *arg
, void *zmqsock
)
160 zmq_msg_t
*msg_id
= (zmq_msg_t
*)arg
;
161 char buf
[32] = "Test write callback";
164 for (i
= 0; i
< strlen(buf
); i
++)
165 buf
[i
] = toupper(buf
[i
]);
166 printf("server send: %s\n", buf
);
168 send_id_and_delim(zmqsock
, msg_id
);
169 zmq_send(zmqsock
, buf
, strlen(buf
) + 1, 0);
172 frrzmq_thread_cancel(&cb
, &cb
->write
);
174 zmq_msg_close(msg_id
);
175 XFREE(MTYPE_ZMQMSG
, msg_id
);
177 static void serverpartfn(void *arg
, void *zmqsock
, zmq_msg_t
*msg
,
181 int more
= zmq_msg_more(msg
);
182 char *in
= zmq_msg_data(msg
);
189 send_id_and_delim(zmqsock
, msg
);
197 printf("server recv part %u (more: %d): %s\n", partnum
, more
, in
);
200 out
= XMALLOC(MTYPE_TESTBUF
, strlen(in
) + 1);
201 for (i
= 0; i
< strlen(in
); i
++)
202 out
[i
] = toupper(in
[i
]);
204 zmq_msg_init_data(&reply
, out
, strlen(out
) + 1, msg_buf_free
, NULL
);
205 zmq_msg_send(&reply
, zmqsock
, ZMQ_SNDMORE
);
210 out
= XMALLOC(MTYPE_TESTBUF
, 32);
211 snprintf(out
, 32, "msg# was %u", partnum
);
212 zmq_msg_init_data(&reply
, out
, strlen(out
) + 1, msg_buf_free
, NULL
);
213 zmq_msg_send(&reply
, zmqsock
, 0);
215 zmq_msg_close(&reply
);
220 /* write callback test */
222 zmq_msg_t
*msg_id
= XMALLOC(MTYPE_ZMQMSG
, sizeof(zmq_msg_t
));
223 recv_id_and_delim(zmqsock
, msg_id
);
224 zmq_recv(zmqsock
, buf
, sizeof(buf
), 0);
225 printf("server recv: %s\n", buf
);
228 frrzmq_thread_add_write_msg(master
, serverwritefn
, NULL
, msg_id
,
232 static void serverfn(void *arg
, void *zmqsock
)
240 recv_id_and_delim(zmqsock
, &msg_id
);
241 zmq_recv(zmqsock
, buf
, sizeof(buf
), 0);
243 printf("server recv: %s\n", buf
);
245 for (i
= 0; i
< strlen(buf
); i
++)
246 buf
[i
] = toupper(buf
[i
]);
247 send_id_and_delim(zmqsock
, &msg_id
);
248 zmq_msg_close(&msg_id
);
249 zmq_send(zmqsock
, buf
, strlen(buf
) + 1, 0);
254 /* change to multipart callback */
255 frrzmq_thread_cancel(&cb
, &cb
->read
);
256 frrzmq_thread_cancel(&cb
, &cb
->write
);
258 frrzmq_thread_add_read_part(master
, serverpartfn
, NULL
, NULL
, zmqsock
,
262 static void sigchld(void)
264 printf("child exited.\n");
265 frrzmq_thread_cancel(&cb
, &cb
->read
);
266 frrzmq_thread_cancel(&cb
, &cb
->write
);
269 static struct frr_signal_t sigs
[] = {
276 static void run_server(int syncfd
)
282 master
= thread_master_create(NULL
);
283 signal_init(master
, array_size(sigs
), sigs
);
286 zmqsock
= zmq_socket(frrzmq_context
, ZMQ_ROUTER
);
287 if (zmq_bind(zmqsock
, "tcp://*:17171")) {
292 frrzmq_thread_add_read_msg(master
, serverfn
, NULL
, NULL
, zmqsock
, &cb
);
294 write(syncfd
, &dummy
, sizeof(dummy
));
295 while (thread_fetch(master
, &t
))
300 thread_master_free(master
);
301 log_memstats_stderr("test");
309 if (pipe(syncpipe
)) {
318 } else if (child
== 0) {
319 run_client(syncpipe
[0]);
323 run_server(syncpipe
[1]);