]>
git.proxmox.com Git - mirror_frr.git/blob - tests/lib/test_zmq.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
4 * Copyright (C) 2017 David Lamparter, for NetDEF, Inc.
12 DEFINE_MTYPE_STATIC(LIB
, TESTBUF
, "zmq test buffer");
13 DEFINE_MTYPE_STATIC(LIB
, ZMQMSG
, "zmq message");
15 static struct event_loop
*master
;
17 static void msg_buf_free(void *data
, void *hint
)
19 XFREE(MTYPE_TESTBUF
, data
);
22 static int recv_delim(void *zmqsock
)
27 zmq_msg_init(&zdelim
);
28 zmq_msg_recv(&zdelim
, zmqsock
, 0);
29 more
= zmq_msg_more(&zdelim
);
30 zmq_msg_close(&zdelim
);
33 static void send_delim(void *zmqsock
)
37 zmq_msg_init(&zdelim
);
38 zmq_msg_send(&zdelim
, zmqsock
, ZMQ_SNDMORE
);
39 zmq_msg_close(&zdelim
);
41 static void run_client(int syncfd
)
50 read(syncfd
, &dummy
, 1);
52 zmqctx
= zmq_ctx_new();
53 zmq_ctx_set(zmqctx
, ZMQ_IPV6
, 1);
55 zmqsock
= zmq_socket(zmqctx
, ZMQ_DEALER
);
56 if (zmq_connect(zmqsock
, "tcp://127.0.0.1:17171")) {
57 perror("zmq_connect");
62 for (i
= 0; i
< 8; i
++) {
63 snprintf(buf
, sizeof(buf
), "msg #%d %c%c%c", i
, 'a' + i
,
65 printf("client send: %s\n", buf
);
68 zmq_send(zmqsock
, buf
, strlen(buf
) + 1, 0);
69 more
= recv_delim(zmqsock
);
71 zmq_recv(zmqsock
, buf
, sizeof(buf
), 0);
72 printf("client recv: %s\n", buf
);
73 size_t len
= sizeof(more
);
74 if (zmq_getsockopt(zmqsock
, ZMQ_RCVMORE
, &more
, &len
))
80 for (i
= 2; i
< 5; i
++) {
84 for (j
= 1; j
<= i
; j
++) {
85 char *dyn
= XMALLOC(MTYPE_TESTBUF
, 32);
87 snprintf(dyn
, 32, "part %d/%d", j
, i
);
88 printf("client send: %s\n", dyn
);
91 zmq_msg_init_data(&part
, dyn
, strlen(dyn
) + 1,
93 zmq_msg_send(&part
, zmqsock
, j
< i
? ZMQ_SNDMORE
: 0);
100 zmq_msg_recv(&part
, zmqsock
, 0);
101 data
= zmq_msg_data(&part
);
102 more
= zmq_msg_more(&part
);
103 printf("client recv (more: %d): %s\n", more
, data
);
105 zmq_msg_close(&part
);
110 snprintf(buf
, sizeof(buf
), "Done receiving");
111 printf("client send: %s\n", buf
);
114 zmq_send(zmqsock
, buf
, strlen(buf
) + 1, 0);
115 /* wait for message from server */
116 more
= recv_delim(zmqsock
);
118 zmq_recv(zmqsock
, buf
, sizeof(buf
), 0);
119 printf("client recv: %s\n", buf
);
120 size_t len
= sizeof(more
);
121 if (zmq_getsockopt(zmqsock
, ZMQ_RCVMORE
, &more
, &len
))
126 zmq_ctx_term(zmqctx
);
129 static struct frrzmq_cb
*cb
;
131 static void recv_id_and_delim(void *zmqsock
, zmq_msg_t
*msg_id
)
134 zmq_msg_init(msg_id
);
135 zmq_msg_recv(msg_id
, zmqsock
, 0);
139 static void send_id_and_delim(void *zmqsock
, zmq_msg_t
*msg_id
)
142 zmq_msg_send(msg_id
, zmqsock
, ZMQ_SNDMORE
);
145 static void serverwritefn(void *arg
, void *zmqsock
)
147 zmq_msg_t
*msg_id
= (zmq_msg_t
*)arg
;
148 char buf
[32] = "Test write callback";
151 for (i
= 0; i
< strlen(buf
); i
++)
152 buf
[i
] = toupper(buf
[i
]);
153 printf("server send: %s\n", buf
);
155 send_id_and_delim(zmqsock
, msg_id
);
156 zmq_send(zmqsock
, buf
, strlen(buf
) + 1, 0);
159 frrzmq_thread_cancel(&cb
, &cb
->write
);
161 zmq_msg_close(msg_id
);
162 XFREE(MTYPE_ZMQMSG
, msg_id
);
164 static void serverpartfn(void *arg
, void *zmqsock
, zmq_msg_t
*msg
,
168 int more
= zmq_msg_more(msg
);
169 char *in
= zmq_msg_data(msg
);
176 send_id_and_delim(zmqsock
, msg
);
184 printf("server recv part %u (more: %d): %s\n", partnum
, more
, in
);
187 out
= XMALLOC(MTYPE_TESTBUF
, strlen(in
) + 1);
188 for (i
= 0; i
< strlen(in
); i
++)
189 out
[i
] = toupper(in
[i
]);
191 zmq_msg_init_data(&reply
, out
, strlen(out
) + 1, msg_buf_free
, NULL
);
192 zmq_msg_send(&reply
, zmqsock
, ZMQ_SNDMORE
);
197 out
= XMALLOC(MTYPE_TESTBUF
, 32);
198 snprintf(out
, 32, "msg# was %u", partnum
);
199 zmq_msg_init_data(&reply
, out
, strlen(out
) + 1, msg_buf_free
, NULL
);
200 zmq_msg_send(&reply
, zmqsock
, 0);
202 zmq_msg_close(&reply
);
207 /* write callback test */
209 zmq_msg_t
*msg_id
= XMALLOC(MTYPE_ZMQMSG
, sizeof(zmq_msg_t
));
210 recv_id_and_delim(zmqsock
, msg_id
);
211 zmq_recv(zmqsock
, buf
, sizeof(buf
), 0);
212 printf("server recv: %s\n", buf
);
215 frrzmq_event_add_write_msg(master
, serverwritefn
, NULL
, msg_id
, zmqsock
,
219 static void serverfn(void *arg
, void *zmqsock
)
227 recv_id_and_delim(zmqsock
, &msg_id
);
228 zmq_recv(zmqsock
, buf
, sizeof(buf
), 0);
230 printf("server recv: %s\n", buf
);
232 for (i
= 0; i
< strlen(buf
); i
++)
233 buf
[i
] = toupper(buf
[i
]);
234 send_id_and_delim(zmqsock
, &msg_id
);
235 zmq_msg_close(&msg_id
);
236 zmq_send(zmqsock
, buf
, strlen(buf
) + 1, 0);
241 /* change to multipart callback */
242 frrzmq_thread_cancel(&cb
, &cb
->read
);
243 frrzmq_thread_cancel(&cb
, &cb
->write
);
245 frrzmq_event_add_read_part(master
, serverpartfn
, NULL
, NULL
, zmqsock
,
249 static void sigchld(void)
251 printf("child exited.\n");
252 frrzmq_thread_cancel(&cb
, &cb
->read
);
253 frrzmq_thread_cancel(&cb
, &cb
->write
);
256 static struct frr_signal_t sigs
[] = {
263 static void run_server(int syncfd
)
269 master
= event_master_create(NULL
);
270 signal_init(master
, array_size(sigs
), sigs
);
273 zmqsock
= zmq_socket(frrzmq_context
, ZMQ_ROUTER
);
274 if (zmq_bind(zmqsock
, "tcp://*:17171")) {
279 frrzmq_event_add_read_msg(master
, serverfn
, NULL
, NULL
, zmqsock
, &cb
);
281 write(syncfd
, &dummy
, sizeof(dummy
));
282 while (event_fetch(master
, &t
))
287 event_master_free(master
);
288 log_memstats_stderr("test");
296 if (pipe(syncpipe
)) {
305 } else if (child
== 0) {
306 run_client(syncpipe
[0]);
310 run_server(syncpipe
[1]);