]> git.proxmox.com Git - mirror_frr.git/blame - tests/lib/test_zmq.c
*: manual SPDX License ID conversions
[mirror_frr.git] / tests / lib / test_zmq.c
CommitLineData
a0b974de
DL
/*
 * ZeroMQ event test
 * Copyright (C) 2017 David Lamparter, for NetDEF, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
19
20#include <zebra.h>
21#include "memory.h"
22#include "sigevent.h"
23#include "frr_zmq.h"
24
bf8d3d6a
DL
25DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer");
26DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message");
a0b974de
DL
27
/* Event loop used by the server side; created and freed in run_server(). */
static struct thread_master *master;
29
30static void msg_buf_free(void *data, void *hint)
31{
32 XFREE(MTYPE_TESTBUF, data);
33}
34
afd0f10d 35static int recv_delim(void *zmqsock)
36{
37 /* receive delim */
38 zmq_msg_t zdelim;
39 int more;
40 zmq_msg_init(&zdelim);
41 zmq_msg_recv(&zdelim, zmqsock, 0);
42 more = zmq_msg_more(&zdelim);
43 zmq_msg_close(&zdelim);
44 return more;
45}
46static void send_delim(void *zmqsock)
47{
48 /* Send delim */
49 zmq_msg_t zdelim;
50 zmq_msg_init(&zdelim);
51 zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
52 zmq_msg_close(&zdelim);
53}
a0b974de
DL
54static void run_client(int syncfd)
55{
56 int i, j;
57 char buf[32];
58 char dummy;
59 void *zmqctx = NULL;
60 void *zmqsock;
afd0f10d 61 int more;
a0b974de
DL
62
63 read(syncfd, &dummy, 1);
64
65 zmqctx = zmq_ctx_new();
66 zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
67
afd0f10d 68 zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
a0b974de
DL
69 if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
70 perror("zmq_connect");
71 exit(1);
72 }
73
74 /* single-part */
75 for (i = 0; i < 8; i++) {
afd0f10d 76 snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
77 'b' + i, 'c' + i);
a0b974de
DL
78 printf("client send: %s\n", buf);
79 fflush(stdout);
afd0f10d 80 send_delim(zmqsock);
81 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
82 more = recv_delim(zmqsock);
83 while (more) {
84 zmq_recv(zmqsock, buf, sizeof(buf), 0);
85 printf("client recv: %s\n", buf);
86 size_t len = sizeof(more);
87 if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
88 break;
89 }
a0b974de
DL
90 }
91
92 /* multipart */
93 for (i = 2; i < 5; i++) {
a0b974de 94 printf("---\n");
afd0f10d 95 send_delim(zmqsock);
96 zmq_msg_t part;
a0b974de 97 for (j = 1; j <= i; j++) {
a0b974de
DL
98 char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
99
100 snprintf(dyn, 32, "part %d/%d", j, i);
101 printf("client send: %s\n", dyn);
102 fflush(stdout);
103
104 zmq_msg_init_data(&part, dyn, strlen(dyn) + 1,
105 msg_buf_free, NULL);
106 zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
107 }
108
afd0f10d 109 recv_delim(zmqsock);
a0b974de
DL
110 do {
111 char *data;
112
113 zmq_msg_recv(&part, zmqsock, 0);
114 data = zmq_msg_data(&part);
115 more = zmq_msg_more(&part);
116 printf("client recv (more: %d): %s\n", more, data);
117 } while (more);
118 zmq_msg_close(&part);
119 }
afd0f10d 120
121 /* write callback */
122 printf("---\n");
772270f3 123 snprintf(buf, sizeof(buf), "Done receiving");
afd0f10d 124 printf("client send: %s\n", buf);
125 fflush(stdout);
126 send_delim(zmqsock);
127 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
128 /* wait for message from server */
129 more = recv_delim(zmqsock);
130 while (more) {
131 zmq_recv(zmqsock, buf, sizeof(buf), 0);
132 printf("client recv: %s\n", buf);
133 size_t len = sizeof(more);
134 if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
135 break;
136 }
137
a0b974de
DL
138 zmq_close(zmqsock);
139 zmq_ctx_term(zmqctx);
140}
141
/*
 * frrzmq callback handle for the server socket: filled in through the
 * frrzmq_thread_add_*() calls and handed to frrzmq_thread_cancel().
 */
static struct frrzmq_cb *cb;
143
afd0f10d 144static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
145{
146 /* receive id */
147 zmq_msg_init(msg_id);
148 zmq_msg_recv(msg_id, zmqsock, 0);
149 /* receive delim */
150 recv_delim(zmqsock);
151}
152static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
153{
154 /* Send Id */
155 zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);
156 send_delim(zmqsock);
157}
158static void serverwritefn(void *arg, void *zmqsock)
159{
160 zmq_msg_t *msg_id = (zmq_msg_t *)arg;
161 char buf[32] = "Test write callback";
162 size_t i;
163
164 for (i = 0; i < strlen(buf); i++)
165 buf[i] = toupper(buf[i]);
166 printf("server send: %s\n", buf);
167 fflush(stdout);
168 send_id_and_delim(zmqsock, msg_id);
169 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
170
171 /* send just once */
172 frrzmq_thread_cancel(&cb, &cb->write);
173
174 zmq_msg_close(msg_id);
175 XFREE(MTYPE_ZMQMSG, msg_id);
176}
a0b974de 177static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
afd0f10d 178 unsigned partnum)
a0b974de 179{
afd0f10d 180 static int num = 0;
a0b974de
DL
181 int more = zmq_msg_more(msg);
182 char *in = zmq_msg_data(msg);
183 size_t i;
184 zmq_msg_t reply;
185 char *out;
186
afd0f10d 187 /* Id */
188 if (partnum == 0) {
189 send_id_and_delim(zmqsock, msg);
190 return;
191 }
192 /* Delim */
193 if (partnum == 1)
194 return;
195
196
a0b974de
DL
197 printf("server recv part %u (more: %d): %s\n", partnum, more, in);
198 fflush(stdout);
a0b974de
DL
199
200 out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
201 for (i = 0; i < strlen(in); i++)
202 out[i] = toupper(in[i]);
203 out[i] = '\0';
204 zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
205 zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
206
afd0f10d 207 if (more)
208 return;
209
a0b974de
DL
210 out = XMALLOC(MTYPE_TESTBUF, 32);
211 snprintf(out, 32, "msg# was %u", partnum);
212 zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
213 zmq_msg_send(&reply, zmqsock, 0);
afd0f10d 214
215 zmq_msg_close(&reply);
216
217 if (++num < 7)
218 return;
219
220 /* write callback test */
221 char buf[32];
222 zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t));
223 recv_id_and_delim(zmqsock, msg_id);
224 zmq_recv(zmqsock, buf, sizeof(buf), 0);
225 printf("server recv: %s\n", buf);
226 fflush(stdout);
227
228 frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id,
229 zmqsock, &cb);
a0b974de
DL
230}
231
232static void serverfn(void *arg, void *zmqsock)
233{
234 static int num = 0;
235
afd0f10d 236 zmq_msg_t msg_id;
a0b974de
DL
237 char buf[32];
238 size_t i;
afd0f10d 239
240 recv_id_and_delim(zmqsock, &msg_id);
a0b974de
DL
241 zmq_recv(zmqsock, buf, sizeof(buf), 0);
242
243 printf("server recv: %s\n", buf);
244 fflush(stdout);
245 for (i = 0; i < strlen(buf); i++)
246 buf[i] = toupper(buf[i]);
afd0f10d 247 send_id_and_delim(zmqsock, &msg_id);
248 zmq_msg_close(&msg_id);
a0b974de
DL
249 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
250
251 if (++num < 4)
252 return;
253
254 /* change to multipart callback */
afd0f10d 255 frrzmq_thread_cancel(&cb, &cb->read);
256 frrzmq_thread_cancel(&cb, &cb->write);
a0b974de 257
afd0f10d 258 frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
259 &cb);
a0b974de
DL
260}
261
262static void sigchld(void)
263{
264 printf("child exited.\n");
afd0f10d 265 frrzmq_thread_cancel(&cb, &cb->read);
266 frrzmq_thread_cancel(&cb, &cb->write);
a0b974de
DL
267}
268
7cc91e67 269static struct frr_signal_t sigs[] = {
a0b974de
DL
270 {
271 .signal = SIGCHLD,
272 .handler = sigchld,
273 },
274};
275
276static void run_server(int syncfd)
277{
278 void *zmqsock;
279 char dummy = 0;
280 struct thread t;
281
282 master = thread_master_create(NULL);
283 signal_init(master, array_size(sigs), sigs);
284 frrzmq_init();
285
afd0f10d 286 zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
a0b974de
DL
287 if (zmq_bind(zmqsock, "tcp://*:17171")) {
288 perror("zmq_bind");
289 exit(1);
290 }
291
afd0f10d 292 frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
a0b974de
DL
293
294 write(syncfd, &dummy, sizeof(dummy));
295 while (thread_fetch(master, &t))
296 thread_call(&t);
297
298 zmq_close(zmqsock);
299 frrzmq_finish();
300 thread_master_free(master);
301 log_memstats_stderr("test");
302}
303
/*
 * Test entry point: creates the synchronisation pipe, forks, and runs the
 * client in the child (reading end of the pipe) and the server in the
 * parent (writing end).  Both processes exit with status 0 when their half
 * returns; pipe() or fork() failure exits with status 1.
 */
int main(void)
{
	int syncpipe[2];
	pid_t child;

	if (pipe(syncpipe)) {
		perror("pipe");
		exit(1);
	}

	child = fork();
	if (child < 0) {
		perror("fork");
		exit(1);
	} else if (child == 0) {
		/*
		 * Drop the child's copy of the write end: otherwise the
		 * client's read() on the pipe could never return EOF if the
		 * server exits before sending the sync byte.
		 */
		close(syncpipe[1]);
		run_client(syncpipe[0]);
		exit(0);
	}

	run_server(syncpipe[1]);
	exit(0);
}