]> git.proxmox.com Git - mirror_frr.git/blame - tests/lib/test_zmq.c
tests: add ZeroMQ test
[mirror_frr.git] / tests / lib / test_zmq.c
CommitLineData
a0b974de
DL
1/*
2 * ZeroMQ event test
3 * Copyright (C) 2017 David Lamparter, for NetDEF, Inc.
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20#include <zebra.h>
21#include "memory.h"
22#include "sigevent.h"
23#include "frr_zmq.h"
24
/* Memory type for the heap buffers this test attaches to ZeroMQ messages
 * (allocated with XMALLOC, released again in msg_buf_free()). */
25DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer")
26
/* Thread master driving the server's event loop; created in run_server()
 * and also used by serverfn() when it re-registers the read callback. */
27static struct thread_master *master;
28
29static void msg_buf_free(void *data, void *hint)
30{
31 XFREE(MTYPE_TESTBUF, data);
32}
33
34static void run_client(int syncfd)
35{
36 int i, j;
37 char buf[32];
38 char dummy;
39 void *zmqctx = NULL;
40 void *zmqsock;
41
42 read(syncfd, &dummy, 1);
43
44 zmqctx = zmq_ctx_new();
45 zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
46
47 zmqsock = zmq_socket(zmqctx, ZMQ_REQ);
48 if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
49 perror("zmq_connect");
50 exit(1);
51 }
52
53 /* single-part */
54 for (i = 0; i < 8; i++) {
55 snprintf(buf, sizeof(buf), "msg #%d %c%c%c",
56 i, 'a' + i, 'b' + i, 'c' + i);
57 printf("client send: %s\n", buf);
58 fflush(stdout);
59 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
60 zmq_recv(zmqsock, buf, sizeof(buf), 0);
61 printf("client recv: %s\n", buf);
62 }
63
64 /* multipart */
65 for (i = 2; i < 5; i++) {
66 int more;
67
68 printf("---\n");
69 for (j = 1; j <= i; j++) {
70 zmq_msg_t part;
71 char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
72
73 snprintf(dyn, 32, "part %d/%d", j, i);
74 printf("client send: %s\n", dyn);
75 fflush(stdout);
76
77 zmq_msg_init_data(&part, dyn, strlen(dyn) + 1,
78 msg_buf_free, NULL);
79 zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
80 }
81
82 zmq_msg_t part;
83 do {
84 char *data;
85
86 zmq_msg_recv(&part, zmqsock, 0);
87 data = zmq_msg_data(&part);
88 more = zmq_msg_more(&part);
89 printf("client recv (more: %d): %s\n", more, data);
90 } while (more);
91 zmq_msg_close(&part);
92 }
93 zmq_close(zmqsock);
94 zmq_ctx_term(zmqctx);
95}
96
/* Currently registered frrzmq read callback: set in run_server(), swapped
 * for the per-part variant in serverfn(), cancelled from sigchld(). */
97static struct frrzmq_cb *cb;
98
99static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
100 unsigned partnum)
101{
102 int more = zmq_msg_more(msg);
103 char *in = zmq_msg_data(msg);
104 size_t i;
105 zmq_msg_t reply;
106 char *out;
107
108 printf("server recv part %u (more: %d): %s\n", partnum, more, in);
109 fflush(stdout);
110 /* REQ-REP doesn't allow sending a reply here */
111 if (more)
112 return;
113
114 out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
115 for (i = 0; i < strlen(in); i++)
116 out[i] = toupper(in[i]);
117 out[i] = '\0';
118 zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
119 zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
120
121 out = XMALLOC(MTYPE_TESTBUF, 32);
122 snprintf(out, 32, "msg# was %u", partnum);
123 zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
124 zmq_msg_send(&reply, zmqsock, 0);
125}
126
127static void serverfn(void *arg, void *zmqsock)
128{
129 static int num = 0;
130
131 char buf[32];
132 size_t i;
133 zmq_recv(zmqsock, buf, sizeof(buf), 0);
134
135 printf("server recv: %s\n", buf);
136 fflush(stdout);
137 for (i = 0; i < strlen(buf); i++)
138 buf[i] = toupper(buf[i]);
139 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
140
141 if (++num < 4)
142 return;
143
144 /* change to multipart callback */
145 frrzmq_thread_cancel(cb);
146
147 cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock);
148}
149
150static void sigchld(void)
151{
152 printf("child exited.\n");
153 frrzmq_thread_cancel(cb);
154}
155
156static struct quagga_signal_t sigs[] = {
157 {
158 .signal = SIGCHLD,
159 .handler = sigchld,
160 },
161};
162
163static void run_server(int syncfd)
164{
165 void *zmqsock;
166 char dummy = 0;
167 struct thread t;
168
169 master = thread_master_create(NULL);
170 signal_init(master, array_size(sigs), sigs);
171 frrzmq_init();
172
173 zmqsock = zmq_socket(frrzmq_context, ZMQ_REP);
174 if (zmq_bind(zmqsock, "tcp://*:17171")) {
175 perror("zmq_bind");
176 exit(1);
177 }
178
179 cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock);
180
181 write(syncfd, &dummy, sizeof(dummy));
182 while (thread_fetch(master, &t))
183 thread_call(&t);
184
185 zmq_close(zmqsock);
186 frrzmq_finish();
187 thread_master_free(master);
188 log_memstats_stderr("test");
189}
190
/*
 * Test entry point.
 *
 * Creates a pipe used purely as a start-up handshake, then forks: the child
 * runs the client on the pipe's read end, the parent runs the server on the
 * write end.  Both processes leave through exit(0) once their half returns;
 * a failing pipe() or fork() exits with status 1.
 */
int main(void)
{
	int syncpipe[2];
	pid_t child;

	if (pipe(syncpipe)) {
		perror("pipe");
		exit(1);
	}

	child = fork();
	if (child < 0) {
		perror("fork");
		exit(1);
	} else if (child == 0) {
		/* The client only ever reads from the pipe.  Dropping its
		 * copy of the write end lets read() report EOF if the server
		 * goes away before signalling, instead of blocking forever
		 * on a pipe the client itself keeps open. */
		close(syncpipe[1]);
		run_client(syncpipe[0]);
		exit(0);
	}

	/* The parent keeps its copy of the read end open on purpose, so that
	 * writing the ready token cannot raise SIGPIPE should the child
	 * already be gone. */
	run_server(syncpipe[1]);
	exit(0);
}