git.proxmox.com Git - mirror_frr.git/blob - tests/lib/test_zmq.c
*: Rename `struct thread` to `struct event`
[mirror_frr.git] / tests / lib / test_zmq.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * ZeroMQ event test
4 * Copyright (C) 2017 David Lamparter, for NetDEF, Inc.
5 */
6
7 #include <zebra.h>
8 #include "memory.h"
9 #include "sigevent.h"
10 #include "frr_zmq.h"
11
/*
 * Memory types private to this test: TESTBUF tags the payload buffers handed
 * to zmq_msg_init_data(), ZMQMSG tags the heap-allocated zmq_msg_t that
 * carries the peer identity into the write callback.
 */
12 DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer");
13 DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message");
14
/*
 * Event loop master: created in run_server() and passed to every
 * frrzmq_thread_add_*() registration made by the server callbacks.
 */
15 static struct thread_master *master;
16
17 static void msg_buf_free(void *data, void *hint)
18 {
19 XFREE(MTYPE_TESTBUF, data);
20 }
21
22 static int recv_delim(void *zmqsock)
23 {
24 /* receive delim */
25 zmq_msg_t zdelim;
26 int more;
27 zmq_msg_init(&zdelim);
28 zmq_msg_recv(&zdelim, zmqsock, 0);
29 more = zmq_msg_more(&zdelim);
30 zmq_msg_close(&zdelim);
31 return more;
32 }
33 static void send_delim(void *zmqsock)
34 {
35 /* Send delim */
36 zmq_msg_t zdelim;
37 zmq_msg_init(&zdelim);
38 zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
39 zmq_msg_close(&zdelim);
40 }
41 static void run_client(int syncfd)
42 {
43 int i, j;
44 char buf[32];
45 char dummy;
46 void *zmqctx = NULL;
47 void *zmqsock;
48 int more;
49
50 read(syncfd, &dummy, 1);
51
52 zmqctx = zmq_ctx_new();
53 zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
54
55 zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
56 if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
57 perror("zmq_connect");
58 exit(1);
59 }
60
61 /* single-part */
62 for (i = 0; i < 8; i++) {
63 snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
64 'b' + i, 'c' + i);
65 printf("client send: %s\n", buf);
66 fflush(stdout);
67 send_delim(zmqsock);
68 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
69 more = recv_delim(zmqsock);
70 while (more) {
71 zmq_recv(zmqsock, buf, sizeof(buf), 0);
72 printf("client recv: %s\n", buf);
73 size_t len = sizeof(more);
74 if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
75 break;
76 }
77 }
78
79 /* multipart */
80 for (i = 2; i < 5; i++) {
81 printf("---\n");
82 send_delim(zmqsock);
83 zmq_msg_t part;
84 for (j = 1; j <= i; j++) {
85 char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
86
87 snprintf(dyn, 32, "part %d/%d", j, i);
88 printf("client send: %s\n", dyn);
89 fflush(stdout);
90
91 zmq_msg_init_data(&part, dyn, strlen(dyn) + 1,
92 msg_buf_free, NULL);
93 zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
94 }
95
96 recv_delim(zmqsock);
97 do {
98 char *data;
99
100 zmq_msg_recv(&part, zmqsock, 0);
101 data = zmq_msg_data(&part);
102 more = zmq_msg_more(&part);
103 printf("client recv (more: %d): %s\n", more, data);
104 } while (more);
105 zmq_msg_close(&part);
106 }
107
108 /* write callback */
109 printf("---\n");
110 snprintf(buf, sizeof(buf), "Done receiving");
111 printf("client send: %s\n", buf);
112 fflush(stdout);
113 send_delim(zmqsock);
114 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
115 /* wait for message from server */
116 more = recv_delim(zmqsock);
117 while (more) {
118 zmq_recv(zmqsock, buf, sizeof(buf), 0);
119 printf("client recv: %s\n", buf);
120 size_t len = sizeof(more);
121 if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
122 break;
123 }
124
125 zmq_close(zmqsock);
126 zmq_ctx_term(zmqctx);
127 }
128
/*
 * Callback handle for the server socket; its address is passed to every
 * frrzmq_thread_add_*() and frrzmq_thread_cancel() call in this file.
 */
129 static struct frrzmq_cb *cb;
130
131 static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
132 {
133 /* receive id */
134 zmq_msg_init(msg_id);
135 zmq_msg_recv(msg_id, zmqsock, 0);
136 /* receive delim */
137 recv_delim(zmqsock);
138 }
139 static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
140 {
141 /* Send Id */
142 zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);
143 send_delim(zmqsock);
144 }
145 static void serverwritefn(void *arg, void *zmqsock)
146 {
147 zmq_msg_t *msg_id = (zmq_msg_t *)arg;
148 char buf[32] = "Test write callback";
149 size_t i;
150
151 for (i = 0; i < strlen(buf); i++)
152 buf[i] = toupper(buf[i]);
153 printf("server send: %s\n", buf);
154 fflush(stdout);
155 send_id_and_delim(zmqsock, msg_id);
156 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
157
158 /* send just once */
159 frrzmq_thread_cancel(&cb, &cb->write);
160
161 zmq_msg_close(msg_id);
162 XFREE(MTYPE_ZMQMSG, msg_id);
163 }
164 static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
165 unsigned partnum)
166 {
167 static int num = 0;
168 int more = zmq_msg_more(msg);
169 char *in = zmq_msg_data(msg);
170 size_t i;
171 zmq_msg_t reply;
172 char *out;
173
174 /* Id */
175 if (partnum == 0) {
176 send_id_and_delim(zmqsock, msg);
177 return;
178 }
179 /* Delim */
180 if (partnum == 1)
181 return;
182
183
184 printf("server recv part %u (more: %d): %s\n", partnum, more, in);
185 fflush(stdout);
186
187 out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
188 for (i = 0; i < strlen(in); i++)
189 out[i] = toupper(in[i]);
190 out[i] = '\0';
191 zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
192 zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
193
194 if (more)
195 return;
196
197 out = XMALLOC(MTYPE_TESTBUF, 32);
198 snprintf(out, 32, "msg# was %u", partnum);
199 zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
200 zmq_msg_send(&reply, zmqsock, 0);
201
202 zmq_msg_close(&reply);
203
204 if (++num < 7)
205 return;
206
207 /* write callback test */
208 char buf[32];
209 zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t));
210 recv_id_and_delim(zmqsock, msg_id);
211 zmq_recv(zmqsock, buf, sizeof(buf), 0);
212 printf("server recv: %s\n", buf);
213 fflush(stdout);
214
215 frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id,
216 zmqsock, &cb);
217 }
218
219 static void serverfn(void *arg, void *zmqsock)
220 {
221 static int num = 0;
222
223 zmq_msg_t msg_id;
224 char buf[32];
225 size_t i;
226
227 recv_id_and_delim(zmqsock, &msg_id);
228 zmq_recv(zmqsock, buf, sizeof(buf), 0);
229
230 printf("server recv: %s\n", buf);
231 fflush(stdout);
232 for (i = 0; i < strlen(buf); i++)
233 buf[i] = toupper(buf[i]);
234 send_id_and_delim(zmqsock, &msg_id);
235 zmq_msg_close(&msg_id);
236 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
237
238 if (++num < 4)
239 return;
240
241 /* change to multipart callback */
242 frrzmq_thread_cancel(&cb, &cb->read);
243 frrzmq_thread_cancel(&cb, &cb->write);
244
245 frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
246 &cb);
247 }
248
249 static void sigchld(void)
250 {
251 printf("child exited.\n");
252 frrzmq_thread_cancel(&cb, &cb->read);
253 frrzmq_thread_cancel(&cb, &cb->write);
254 }
255
256 static struct frr_signal_t sigs[] = {
257 {
258 .signal = SIGCHLD,
259 .handler = sigchld,
260 },
261 };
262
263 static void run_server(int syncfd)
264 {
265 void *zmqsock;
266 char dummy = 0;
267 struct event t;
268
269 master = thread_master_create(NULL);
270 signal_init(master, array_size(sigs), sigs);
271 frrzmq_init();
272
273 zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
274 if (zmq_bind(zmqsock, "tcp://*:17171")) {
275 perror("zmq_bind");
276 exit(1);
277 }
278
279 frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
280
281 write(syncfd, &dummy, sizeof(dummy));
282 while (thread_fetch(master, &t))
283 thread_call(&t);
284
285 zmq_close(zmqsock);
286 frrzmq_finish();
287 thread_master_free(master);
288 log_memstats_stderr("test");
289 }
290
/*
 * Test entry point: creates the sync pipe, forks, and runs the client in the
 * child and the server in the parent.  Both processes leave via exit().
 */
int main(void)
{
	int syncpipe[2];
	pid_t child;

	if (pipe(syncpipe)) {
		perror("pipe");
		exit(1);
	}

	child = fork();
	if (child < 0) {
		perror("fork");
		exit(1);
	} else if (child == 0) {
		/*
		 * Drop the inherited write end: while the child itself holds
		 * it open, read() on the pipe can never report EOF.
		 */
		close(syncpipe[1]);
		run_client(syncpipe[0]);
		exit(0);
	}

	/*
	 * The parent keeps its read end open on purpose, so the sync write
	 * in run_server() cannot raise SIGPIPE if the client is already gone.
	 */
	run_server(syncpipe[1]);
	exit(0);
}