]>
Commit | Line | Data |
---|---|---|
acddc0ed | 1 | // SPDX-License-Identifier: GPL-2.0-or-later |
a0b974de DL |
2 | /* |
3 | * ZeroMQ event test | |
4 | * Copyright (C) 2017 David Lamparter, for NetDEF, Inc. | |
a0b974de DL |
5 | */ |
6 | ||
7 | #include <zebra.h> | |
8 | #include "memory.h" | |
9 | #include "sigevent.h" | |
10 | #include "frr_zmq.h" | |
11 | ||
bf8d3d6a DL |
12 | DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer"); |
13 | DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message"); | |
a0b974de DL |
14 | |
15 | static struct thread_master *master; | |
16 | ||
17 | static void msg_buf_free(void *data, void *hint) | |
18 | { | |
19 | XFREE(MTYPE_TESTBUF, data); | |
20 | } | |
21 | ||
afd0f10d | 22 | static int recv_delim(void *zmqsock) |
23 | { | |
24 | /* receive delim */ | |
25 | zmq_msg_t zdelim; | |
26 | int more; | |
27 | zmq_msg_init(&zdelim); | |
28 | zmq_msg_recv(&zdelim, zmqsock, 0); | |
29 | more = zmq_msg_more(&zdelim); | |
30 | zmq_msg_close(&zdelim); | |
31 | return more; | |
32 | } | |
33 | static void send_delim(void *zmqsock) | |
34 | { | |
35 | /* Send delim */ | |
36 | zmq_msg_t zdelim; | |
37 | zmq_msg_init(&zdelim); | |
38 | zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE); | |
39 | zmq_msg_close(&zdelim); | |
40 | } | |
a0b974de DL |
41 | static void run_client(int syncfd) |
42 | { | |
43 | int i, j; | |
44 | char buf[32]; | |
45 | char dummy; | |
46 | void *zmqctx = NULL; | |
47 | void *zmqsock; | |
afd0f10d | 48 | int more; |
a0b974de DL |
49 | |
50 | read(syncfd, &dummy, 1); | |
51 | ||
52 | zmqctx = zmq_ctx_new(); | |
53 | zmq_ctx_set(zmqctx, ZMQ_IPV6, 1); | |
54 | ||
afd0f10d | 55 | zmqsock = zmq_socket(zmqctx, ZMQ_DEALER); |
a0b974de DL |
56 | if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) { |
57 | perror("zmq_connect"); | |
58 | exit(1); | |
59 | } | |
60 | ||
61 | /* single-part */ | |
62 | for (i = 0; i < 8; i++) { | |
afd0f10d | 63 | snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i, |
64 | 'b' + i, 'c' + i); | |
a0b974de DL |
65 | printf("client send: %s\n", buf); |
66 | fflush(stdout); | |
afd0f10d | 67 | send_delim(zmqsock); |
68 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); | |
69 | more = recv_delim(zmqsock); | |
70 | while (more) { | |
71 | zmq_recv(zmqsock, buf, sizeof(buf), 0); | |
72 | printf("client recv: %s\n", buf); | |
73 | size_t len = sizeof(more); | |
74 | if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) | |
75 | break; | |
76 | } | |
a0b974de DL |
77 | } |
78 | ||
79 | /* multipart */ | |
80 | for (i = 2; i < 5; i++) { | |
a0b974de | 81 | printf("---\n"); |
afd0f10d | 82 | send_delim(zmqsock); |
83 | zmq_msg_t part; | |
a0b974de | 84 | for (j = 1; j <= i; j++) { |
a0b974de DL |
85 | char *dyn = XMALLOC(MTYPE_TESTBUF, 32); |
86 | ||
87 | snprintf(dyn, 32, "part %d/%d", j, i); | |
88 | printf("client send: %s\n", dyn); | |
89 | fflush(stdout); | |
90 | ||
91 | zmq_msg_init_data(&part, dyn, strlen(dyn) + 1, | |
92 | msg_buf_free, NULL); | |
93 | zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0); | |
94 | } | |
95 | ||
afd0f10d | 96 | recv_delim(zmqsock); |
a0b974de DL |
97 | do { |
98 | char *data; | |
99 | ||
100 | zmq_msg_recv(&part, zmqsock, 0); | |
101 | data = zmq_msg_data(&part); | |
102 | more = zmq_msg_more(&part); | |
103 | printf("client recv (more: %d): %s\n", more, data); | |
104 | } while (more); | |
105 | zmq_msg_close(&part); | |
106 | } | |
afd0f10d | 107 | |
108 | /* write callback */ | |
109 | printf("---\n"); | |
772270f3 | 110 | snprintf(buf, sizeof(buf), "Done receiving"); |
afd0f10d | 111 | printf("client send: %s\n", buf); |
112 | fflush(stdout); | |
113 | send_delim(zmqsock); | |
114 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); | |
115 | /* wait for message from server */ | |
116 | more = recv_delim(zmqsock); | |
117 | while (more) { | |
118 | zmq_recv(zmqsock, buf, sizeof(buf), 0); | |
119 | printf("client recv: %s\n", buf); | |
120 | size_t len = sizeof(more); | |
121 | if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) | |
122 | break; | |
123 | } | |
124 | ||
a0b974de DL |
125 | zmq_close(zmqsock); |
126 | zmq_ctx_term(zmqctx); | |
127 | } | |
128 | ||
/* Callback handle shared by all frrzmq_thread_add_*() / _cancel() calls. */
static struct frrzmq_cb *cb;
130 | ||
afd0f10d | 131 | static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) |
132 | { | |
133 | /* receive id */ | |
134 | zmq_msg_init(msg_id); | |
135 | zmq_msg_recv(msg_id, zmqsock, 0); | |
136 | /* receive delim */ | |
137 | recv_delim(zmqsock); | |
138 | } | |
139 | static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) | |
140 | { | |
141 | /* Send Id */ | |
142 | zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE); | |
143 | send_delim(zmqsock); | |
144 | } | |
145 | static void serverwritefn(void *arg, void *zmqsock) | |
146 | { | |
147 | zmq_msg_t *msg_id = (zmq_msg_t *)arg; | |
148 | char buf[32] = "Test write callback"; | |
149 | size_t i; | |
150 | ||
151 | for (i = 0; i < strlen(buf); i++) | |
152 | buf[i] = toupper(buf[i]); | |
153 | printf("server send: %s\n", buf); | |
154 | fflush(stdout); | |
155 | send_id_and_delim(zmqsock, msg_id); | |
156 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); | |
157 | ||
158 | /* send just once */ | |
159 | frrzmq_thread_cancel(&cb, &cb->write); | |
160 | ||
161 | zmq_msg_close(msg_id); | |
162 | XFREE(MTYPE_ZMQMSG, msg_id); | |
163 | } | |
a0b974de | 164 | static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, |
afd0f10d | 165 | unsigned partnum) |
a0b974de | 166 | { |
afd0f10d | 167 | static int num = 0; |
a0b974de DL |
168 | int more = zmq_msg_more(msg); |
169 | char *in = zmq_msg_data(msg); | |
170 | size_t i; | |
171 | zmq_msg_t reply; | |
172 | char *out; | |
173 | ||
afd0f10d | 174 | /* Id */ |
175 | if (partnum == 0) { | |
176 | send_id_and_delim(zmqsock, msg); | |
177 | return; | |
178 | } | |
179 | /* Delim */ | |
180 | if (partnum == 1) | |
181 | return; | |
182 | ||
183 | ||
a0b974de DL |
184 | printf("server recv part %u (more: %d): %s\n", partnum, more, in); |
185 | fflush(stdout); | |
a0b974de DL |
186 | |
187 | out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1); | |
188 | for (i = 0; i < strlen(in); i++) | |
189 | out[i] = toupper(in[i]); | |
190 | out[i] = '\0'; | |
191 | zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); | |
192 | zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE); | |
193 | ||
afd0f10d | 194 | if (more) |
195 | return; | |
196 | ||
a0b974de DL |
197 | out = XMALLOC(MTYPE_TESTBUF, 32); |
198 | snprintf(out, 32, "msg# was %u", partnum); | |
199 | zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); | |
200 | zmq_msg_send(&reply, zmqsock, 0); | |
afd0f10d | 201 | |
202 | zmq_msg_close(&reply); | |
203 | ||
204 | if (++num < 7) | |
205 | return; | |
206 | ||
207 | /* write callback test */ | |
208 | char buf[32]; | |
209 | zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t)); | |
210 | recv_id_and_delim(zmqsock, msg_id); | |
211 | zmq_recv(zmqsock, buf, sizeof(buf), 0); | |
212 | printf("server recv: %s\n", buf); | |
213 | fflush(stdout); | |
214 | ||
215 | frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id, | |
216 | zmqsock, &cb); | |
a0b974de DL |
217 | } |
218 | ||
219 | static void serverfn(void *arg, void *zmqsock) | |
220 | { | |
221 | static int num = 0; | |
222 | ||
afd0f10d | 223 | zmq_msg_t msg_id; |
a0b974de DL |
224 | char buf[32]; |
225 | size_t i; | |
afd0f10d | 226 | |
227 | recv_id_and_delim(zmqsock, &msg_id); | |
a0b974de DL |
228 | zmq_recv(zmqsock, buf, sizeof(buf), 0); |
229 | ||
230 | printf("server recv: %s\n", buf); | |
231 | fflush(stdout); | |
232 | for (i = 0; i < strlen(buf); i++) | |
233 | buf[i] = toupper(buf[i]); | |
afd0f10d | 234 | send_id_and_delim(zmqsock, &msg_id); |
235 | zmq_msg_close(&msg_id); | |
a0b974de DL |
236 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); |
237 | ||
238 | if (++num < 4) | |
239 | return; | |
240 | ||
241 | /* change to multipart callback */ | |
afd0f10d | 242 | frrzmq_thread_cancel(&cb, &cb->read); |
243 | frrzmq_thread_cancel(&cb, &cb->write); | |
a0b974de | 244 | |
afd0f10d | 245 | frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock, |
246 | &cb); | |
a0b974de DL |
247 | } |
248 | ||
249 | static void sigchld(void) | |
250 | { | |
251 | printf("child exited.\n"); | |
afd0f10d | 252 | frrzmq_thread_cancel(&cb, &cb->read); |
253 | frrzmq_thread_cancel(&cb, &cb->write); | |
a0b974de DL |
254 | } |
255 | ||
7cc91e67 | 256 | static struct frr_signal_t sigs[] = { |
a0b974de DL |
257 | { |
258 | .signal = SIGCHLD, | |
259 | .handler = sigchld, | |
260 | }, | |
261 | }; | |
262 | ||
263 | static void run_server(int syncfd) | |
264 | { | |
265 | void *zmqsock; | |
266 | char dummy = 0; | |
267 | struct thread t; | |
268 | ||
269 | master = thread_master_create(NULL); | |
270 | signal_init(master, array_size(sigs), sigs); | |
271 | frrzmq_init(); | |
272 | ||
afd0f10d | 273 | zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER); |
a0b974de DL |
274 | if (zmq_bind(zmqsock, "tcp://*:17171")) { |
275 | perror("zmq_bind"); | |
276 | exit(1); | |
277 | } | |
278 | ||
afd0f10d | 279 | frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb); |
a0b974de DL |
280 | |
281 | write(syncfd, &dummy, sizeof(dummy)); | |
282 | while (thread_fetch(master, &t)) | |
283 | thread_call(&t); | |
284 | ||
285 | zmq_close(zmqsock); | |
286 | frrzmq_finish(); | |
287 | thread_master_free(master); | |
288 | log_memstats_stderr("test"); | |
289 | } | |
290 | ||
/*
 * Entry point: creates the synchronisation pipe and forks.  The child runs
 * the client on the pipe's read end, the parent runs the server on its
 * write end.  Exits with status 1 if pipe() or fork() fails.
 */
int main(void)
{
	int syncpipe[2];
	pid_t child;

	if (pipe(syncpipe) != 0) {
		perror("pipe");
		exit(1);
	}

	child = fork();
	if (child == 0) {
		/* child: client side */
		run_client(syncpipe[0]);
		exit(0);
	}
	if (child < 0) {
		perror("fork");
		exit(1);
	}

	/* parent: server side */
	run_server(syncpipe[1]);
	exit(0);
}