]>
Commit | Line | Data |
---|---|---|
a0b974de DL |
1 | /* |
2 | * ZeroMQ event test | |
3 | * Copyright (C) 2017 David Lamparter, for NetDEF, Inc. | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify it | |
6 | * under the terms of the GNU General Public License as published by the Free | |
7 | * Software Foundation; either version 2 of the License, or (at your option) | |
8 | * any later version. | |
9 | * | |
10 | * This program is distributed in the hope that it will be useful, but WITHOUT | |
11 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
12 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
13 | * more details. | |
14 | * | |
15 | * You should have received a copy of the GNU General Public License along | |
16 | * with this program; see the file COPYING; if not, write to the Free Software | |
17 | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | */ | |
19 | ||
20 | #include <zebra.h> | |
21 | #include "memory.h" | |
22 | #include "sigevent.h" | |
23 | #include "frr_zmq.h" | |
24 | ||
bf8d3d6a DL |
25 | DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer"); |
26 | DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message"); | |
a0b974de DL |
27 | |
28 | static struct thread_master *master; | |
29 | ||
30 | static void msg_buf_free(void *data, void *hint) | |
31 | { | |
32 | XFREE(MTYPE_TESTBUF, data); | |
33 | } | |
34 | ||
afd0f10d | 35 | static int recv_delim(void *zmqsock) |
36 | { | |
37 | /* receive delim */ | |
38 | zmq_msg_t zdelim; | |
39 | int more; | |
40 | zmq_msg_init(&zdelim); | |
41 | zmq_msg_recv(&zdelim, zmqsock, 0); | |
42 | more = zmq_msg_more(&zdelim); | |
43 | zmq_msg_close(&zdelim); | |
44 | return more; | |
45 | } | |
46 | static void send_delim(void *zmqsock) | |
47 | { | |
48 | /* Send delim */ | |
49 | zmq_msg_t zdelim; | |
50 | zmq_msg_init(&zdelim); | |
51 | zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE); | |
52 | zmq_msg_close(&zdelim); | |
53 | } | |
a0b974de DL |
54 | static void run_client(int syncfd) |
55 | { | |
56 | int i, j; | |
57 | char buf[32]; | |
58 | char dummy; | |
59 | void *zmqctx = NULL; | |
60 | void *zmqsock; | |
afd0f10d | 61 | int more; |
a0b974de DL |
62 | |
63 | read(syncfd, &dummy, 1); | |
64 | ||
65 | zmqctx = zmq_ctx_new(); | |
66 | zmq_ctx_set(zmqctx, ZMQ_IPV6, 1); | |
67 | ||
afd0f10d | 68 | zmqsock = zmq_socket(zmqctx, ZMQ_DEALER); |
a0b974de DL |
69 | if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) { |
70 | perror("zmq_connect"); | |
71 | exit(1); | |
72 | } | |
73 | ||
74 | /* single-part */ | |
75 | for (i = 0; i < 8; i++) { | |
afd0f10d | 76 | snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i, |
77 | 'b' + i, 'c' + i); | |
a0b974de DL |
78 | printf("client send: %s\n", buf); |
79 | fflush(stdout); | |
afd0f10d | 80 | send_delim(zmqsock); |
81 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); | |
82 | more = recv_delim(zmqsock); | |
83 | while (more) { | |
84 | zmq_recv(zmqsock, buf, sizeof(buf), 0); | |
85 | printf("client recv: %s\n", buf); | |
86 | size_t len = sizeof(more); | |
87 | if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) | |
88 | break; | |
89 | } | |
a0b974de DL |
90 | } |
91 | ||
92 | /* multipart */ | |
93 | for (i = 2; i < 5; i++) { | |
a0b974de | 94 | printf("---\n"); |
afd0f10d | 95 | send_delim(zmqsock); |
96 | zmq_msg_t part; | |
a0b974de | 97 | for (j = 1; j <= i; j++) { |
a0b974de DL |
98 | char *dyn = XMALLOC(MTYPE_TESTBUF, 32); |
99 | ||
100 | snprintf(dyn, 32, "part %d/%d", j, i); | |
101 | printf("client send: %s\n", dyn); | |
102 | fflush(stdout); | |
103 | ||
104 | zmq_msg_init_data(&part, dyn, strlen(dyn) + 1, | |
105 | msg_buf_free, NULL); | |
106 | zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0); | |
107 | } | |
108 | ||
afd0f10d | 109 | recv_delim(zmqsock); |
a0b974de DL |
110 | do { |
111 | char *data; | |
112 | ||
113 | zmq_msg_recv(&part, zmqsock, 0); | |
114 | data = zmq_msg_data(&part); | |
115 | more = zmq_msg_more(&part); | |
116 | printf("client recv (more: %d): %s\n", more, data); | |
117 | } while (more); | |
118 | zmq_msg_close(&part); | |
119 | } | |
afd0f10d | 120 | |
121 | /* write callback */ | |
122 | printf("---\n"); | |
772270f3 | 123 | snprintf(buf, sizeof(buf), "Done receiving"); |
afd0f10d | 124 | printf("client send: %s\n", buf); |
125 | fflush(stdout); | |
126 | send_delim(zmqsock); | |
127 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); | |
128 | /* wait for message from server */ | |
129 | more = recv_delim(zmqsock); | |
130 | while (more) { | |
131 | zmq_recv(zmqsock, buf, sizeof(buf), 0); | |
132 | printf("client recv: %s\n", buf); | |
133 | size_t len = sizeof(more); | |
134 | if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) | |
135 | break; | |
136 | } | |
137 | ||
a0b974de DL |
138 | zmq_close(zmqsock); |
139 | zmq_ctx_term(zmqctx); | |
140 | } | |
141 | ||
142 | static struct frrzmq_cb *cb; | |
143 | ||
afd0f10d | 144 | static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) |
145 | { | |
146 | /* receive id */ | |
147 | zmq_msg_init(msg_id); | |
148 | zmq_msg_recv(msg_id, zmqsock, 0); | |
149 | /* receive delim */ | |
150 | recv_delim(zmqsock); | |
151 | } | |
152 | static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) | |
153 | { | |
154 | /* Send Id */ | |
155 | zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE); | |
156 | send_delim(zmqsock); | |
157 | } | |
158 | static void serverwritefn(void *arg, void *zmqsock) | |
159 | { | |
160 | zmq_msg_t *msg_id = (zmq_msg_t *)arg; | |
161 | char buf[32] = "Test write callback"; | |
162 | size_t i; | |
163 | ||
164 | for (i = 0; i < strlen(buf); i++) | |
165 | buf[i] = toupper(buf[i]); | |
166 | printf("server send: %s\n", buf); | |
167 | fflush(stdout); | |
168 | send_id_and_delim(zmqsock, msg_id); | |
169 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); | |
170 | ||
171 | /* send just once */ | |
172 | frrzmq_thread_cancel(&cb, &cb->write); | |
173 | ||
174 | zmq_msg_close(msg_id); | |
175 | XFREE(MTYPE_ZMQMSG, msg_id); | |
176 | } | |
a0b974de | 177 | static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, |
afd0f10d | 178 | unsigned partnum) |
a0b974de | 179 | { |
afd0f10d | 180 | static int num = 0; |
a0b974de DL |
181 | int more = zmq_msg_more(msg); |
182 | char *in = zmq_msg_data(msg); | |
183 | size_t i; | |
184 | zmq_msg_t reply; | |
185 | char *out; | |
186 | ||
afd0f10d | 187 | /* Id */ |
188 | if (partnum == 0) { | |
189 | send_id_and_delim(zmqsock, msg); | |
190 | return; | |
191 | } | |
192 | /* Delim */ | |
193 | if (partnum == 1) | |
194 | return; | |
195 | ||
196 | ||
a0b974de DL |
197 | printf("server recv part %u (more: %d): %s\n", partnum, more, in); |
198 | fflush(stdout); | |
a0b974de DL |
199 | |
200 | out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1); | |
201 | for (i = 0; i < strlen(in); i++) | |
202 | out[i] = toupper(in[i]); | |
203 | out[i] = '\0'; | |
204 | zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); | |
205 | zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE); | |
206 | ||
afd0f10d | 207 | if (more) |
208 | return; | |
209 | ||
a0b974de DL |
210 | out = XMALLOC(MTYPE_TESTBUF, 32); |
211 | snprintf(out, 32, "msg# was %u", partnum); | |
212 | zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); | |
213 | zmq_msg_send(&reply, zmqsock, 0); | |
afd0f10d | 214 | |
215 | zmq_msg_close(&reply); | |
216 | ||
217 | if (++num < 7) | |
218 | return; | |
219 | ||
220 | /* write callback test */ | |
221 | char buf[32]; | |
222 | zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t)); | |
223 | recv_id_and_delim(zmqsock, msg_id); | |
224 | zmq_recv(zmqsock, buf, sizeof(buf), 0); | |
225 | printf("server recv: %s\n", buf); | |
226 | fflush(stdout); | |
227 | ||
228 | frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id, | |
229 | zmqsock, &cb); | |
a0b974de DL |
230 | } |
231 | ||
232 | static void serverfn(void *arg, void *zmqsock) | |
233 | { | |
234 | static int num = 0; | |
235 | ||
afd0f10d | 236 | zmq_msg_t msg_id; |
a0b974de DL |
237 | char buf[32]; |
238 | size_t i; | |
afd0f10d | 239 | |
240 | recv_id_and_delim(zmqsock, &msg_id); | |
a0b974de DL |
241 | zmq_recv(zmqsock, buf, sizeof(buf), 0); |
242 | ||
243 | printf("server recv: %s\n", buf); | |
244 | fflush(stdout); | |
245 | for (i = 0; i < strlen(buf); i++) | |
246 | buf[i] = toupper(buf[i]); | |
afd0f10d | 247 | send_id_and_delim(zmqsock, &msg_id); |
248 | zmq_msg_close(&msg_id); | |
a0b974de DL |
249 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); |
250 | ||
251 | if (++num < 4) | |
252 | return; | |
253 | ||
254 | /* change to multipart callback */ | |
afd0f10d | 255 | frrzmq_thread_cancel(&cb, &cb->read); |
256 | frrzmq_thread_cancel(&cb, &cb->write); | |
a0b974de | 257 | |
afd0f10d | 258 | frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock, |
259 | &cb); | |
a0b974de DL |
260 | } |
261 | ||
262 | static void sigchld(void) | |
263 | { | |
264 | printf("child exited.\n"); | |
afd0f10d | 265 | frrzmq_thread_cancel(&cb, &cb->read); |
266 | frrzmq_thread_cancel(&cb, &cb->write); | |
a0b974de DL |
267 | } |
268 | ||
7cc91e67 | 269 | static struct frr_signal_t sigs[] = { |
a0b974de DL |
270 | { |
271 | .signal = SIGCHLD, | |
272 | .handler = sigchld, | |
273 | }, | |
274 | }; | |
275 | ||
276 | static void run_server(int syncfd) | |
277 | { | |
278 | void *zmqsock; | |
279 | char dummy = 0; | |
280 | struct thread t; | |
281 | ||
282 | master = thread_master_create(NULL); | |
283 | signal_init(master, array_size(sigs), sigs); | |
284 | frrzmq_init(); | |
285 | ||
afd0f10d | 286 | zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER); |
a0b974de DL |
287 | if (zmq_bind(zmqsock, "tcp://*:17171")) { |
288 | perror("zmq_bind"); | |
289 | exit(1); | |
290 | } | |
291 | ||
afd0f10d | 292 | frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb); |
a0b974de DL |
293 | |
294 | write(syncfd, &dummy, sizeof(dummy)); | |
295 | while (thread_fetch(master, &t)) | |
296 | thread_call(&t); | |
297 | ||
298 | zmq_close(zmqsock); | |
299 | frrzmq_finish(); | |
300 | thread_master_free(master); | |
301 | log_memstats_stderr("test"); | |
302 | } | |
303 | ||
304 | int main(void) | |
305 | { | |
306 | int syncpipe[2]; | |
307 | pid_t child; | |
308 | ||
309 | if (pipe(syncpipe)) { | |
310 | perror("pipe"); | |
311 | exit(1); | |
312 | } | |
313 | ||
314 | child = fork(); | |
315 | if (child < 0) { | |
316 | perror("fork"); | |
317 | exit(1); | |
318 | } else if (child == 0) { | |
319 | run_client(syncpipe[0]); | |
320 | exit(0); | |
321 | } | |
322 | ||
323 | run_server(syncpipe[1]); | |
324 | exit(0); | |
325 | } |