]>
Commit | Line | Data |
---|---|---|
a0b974de DL |
1 | /* |
2 | * ZeroMQ event test | |
3 | * Copyright (C) 2017 David Lamparter, for NetDEF, Inc. | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify it | |
6 | * under the terms of the GNU General Public License as published by the Free | |
7 | * Software Foundation; either version 2 of the License, or (at your option) | |
8 | * any later version. | |
9 | * | |
10 | * This program is distributed in the hope that it will be useful, but WITHOUT | |
11 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
12 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
13 | * more details. | |
14 | * | |
15 | * You should have received a copy of the GNU General Public License along | |
16 | * with this program; see the file COPYING; if not, write to the Free Software | |
17 | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | */ | |
19 | ||
20 | #include <zebra.h> | |
21 | #include "memory.h" | |
22 | #include "sigevent.h" | |
23 | #include "frr_zmq.h" | |
24 | ||
25 | DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer") | |
26 | ||
27 | static struct thread_master *master; | |
28 | ||
29 | static void msg_buf_free(void *data, void *hint) | |
30 | { | |
31 | XFREE(MTYPE_TESTBUF, data); | |
32 | } | |
33 | ||
34 | static void run_client(int syncfd) | |
35 | { | |
36 | int i, j; | |
37 | char buf[32]; | |
38 | char dummy; | |
39 | void *zmqctx = NULL; | |
40 | void *zmqsock; | |
41 | ||
42 | read(syncfd, &dummy, 1); | |
43 | ||
44 | zmqctx = zmq_ctx_new(); | |
45 | zmq_ctx_set(zmqctx, ZMQ_IPV6, 1); | |
46 | ||
47 | zmqsock = zmq_socket(zmqctx, ZMQ_REQ); | |
48 | if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) { | |
49 | perror("zmq_connect"); | |
50 | exit(1); | |
51 | } | |
52 | ||
53 | /* single-part */ | |
54 | for (i = 0; i < 8; i++) { | |
55 | snprintf(buf, sizeof(buf), "msg #%d %c%c%c", | |
56 | i, 'a' + i, 'b' + i, 'c' + i); | |
57 | printf("client send: %s\n", buf); | |
58 | fflush(stdout); | |
59 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); | |
60 | zmq_recv(zmqsock, buf, sizeof(buf), 0); | |
61 | printf("client recv: %s\n", buf); | |
62 | } | |
63 | ||
64 | /* multipart */ | |
65 | for (i = 2; i < 5; i++) { | |
66 | int more; | |
67 | ||
68 | printf("---\n"); | |
69 | for (j = 1; j <= i; j++) { | |
70 | zmq_msg_t part; | |
71 | char *dyn = XMALLOC(MTYPE_TESTBUF, 32); | |
72 | ||
73 | snprintf(dyn, 32, "part %d/%d", j, i); | |
74 | printf("client send: %s\n", dyn); | |
75 | fflush(stdout); | |
76 | ||
77 | zmq_msg_init_data(&part, dyn, strlen(dyn) + 1, | |
78 | msg_buf_free, NULL); | |
79 | zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0); | |
80 | } | |
81 | ||
82 | zmq_msg_t part; | |
83 | do { | |
84 | char *data; | |
85 | ||
86 | zmq_msg_recv(&part, zmqsock, 0); | |
87 | data = zmq_msg_data(&part); | |
88 | more = zmq_msg_more(&part); | |
89 | printf("client recv (more: %d): %s\n", more, data); | |
90 | } while (more); | |
91 | zmq_msg_close(&part); | |
92 | } | |
93 | zmq_close(zmqsock); | |
94 | zmq_ctx_term(zmqctx); | |
95 | } | |
96 | ||
97 | static struct frrzmq_cb *cb; | |
98 | ||
99 | static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, | |
100 | unsigned partnum) | |
101 | { | |
102 | int more = zmq_msg_more(msg); | |
103 | char *in = zmq_msg_data(msg); | |
104 | size_t i; | |
105 | zmq_msg_t reply; | |
106 | char *out; | |
107 | ||
108 | printf("server recv part %u (more: %d): %s\n", partnum, more, in); | |
109 | fflush(stdout); | |
110 | /* REQ-REP doesn't allow sending a reply here */ | |
111 | if (more) | |
112 | return; | |
113 | ||
114 | out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1); | |
115 | for (i = 0; i < strlen(in); i++) | |
116 | out[i] = toupper(in[i]); | |
117 | out[i] = '\0'; | |
118 | zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); | |
119 | zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE); | |
120 | ||
121 | out = XMALLOC(MTYPE_TESTBUF, 32); | |
122 | snprintf(out, 32, "msg# was %u", partnum); | |
123 | zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); | |
124 | zmq_msg_send(&reply, zmqsock, 0); | |
125 | } | |
126 | ||
127 | static void serverfn(void *arg, void *zmqsock) | |
128 | { | |
129 | static int num = 0; | |
130 | ||
131 | char buf[32]; | |
132 | size_t i; | |
133 | zmq_recv(zmqsock, buf, sizeof(buf), 0); | |
134 | ||
135 | printf("server recv: %s\n", buf); | |
136 | fflush(stdout); | |
137 | for (i = 0; i < strlen(buf); i++) | |
138 | buf[i] = toupper(buf[i]); | |
139 | zmq_send(zmqsock, buf, strlen(buf) + 1, 0); | |
140 | ||
141 | if (++num < 4) | |
142 | return; | |
143 | ||
144 | /* change to multipart callback */ | |
145 | frrzmq_thread_cancel(cb); | |
146 | ||
147 | cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock); | |
148 | } | |
149 | ||
150 | static void sigchld(void) | |
151 | { | |
152 | printf("child exited.\n"); | |
153 | frrzmq_thread_cancel(cb); | |
154 | } | |
155 | ||
156 | static struct quagga_signal_t sigs[] = { | |
157 | { | |
158 | .signal = SIGCHLD, | |
159 | .handler = sigchld, | |
160 | }, | |
161 | }; | |
162 | ||
163 | static void run_server(int syncfd) | |
164 | { | |
165 | void *zmqsock; | |
166 | char dummy = 0; | |
167 | struct thread t; | |
168 | ||
169 | master = thread_master_create(NULL); | |
170 | signal_init(master, array_size(sigs), sigs); | |
171 | frrzmq_init(); | |
172 | ||
173 | zmqsock = zmq_socket(frrzmq_context, ZMQ_REP); | |
174 | if (zmq_bind(zmqsock, "tcp://*:17171")) { | |
175 | perror("zmq_bind"); | |
176 | exit(1); | |
177 | } | |
178 | ||
179 | cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock); | |
180 | ||
181 | write(syncfd, &dummy, sizeof(dummy)); | |
182 | while (thread_fetch(master, &t)) | |
183 | thread_call(&t); | |
184 | ||
185 | zmq_close(zmqsock); | |
186 | frrzmq_finish(); | |
187 | thread_master_free(master); | |
188 | log_memstats_stderr("test"); | |
189 | } | |
190 | ||
191 | int main(void) | |
192 | { | |
193 | int syncpipe[2]; | |
194 | pid_t child; | |
195 | ||
196 | if (pipe(syncpipe)) { | |
197 | perror("pipe"); | |
198 | exit(1); | |
199 | } | |
200 | ||
201 | child = fork(); | |
202 | if (child < 0) { | |
203 | perror("fork"); | |
204 | exit(1); | |
205 | } else if (child == 0) { | |
206 | run_client(syncpipe[0]); | |
207 | exit(0); | |
208 | } | |
209 | ||
210 | run_server(syncpipe[1]); | |
211 | exit(0); | |
212 | } |