]> git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / lib / frr_zmq.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * libzebra ZeroMQ bindings
4 * Copyright (C) 2015 David Lamparter
5 */
6
7 /*
8 * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that
9 * the test_zmq.c unit test is still working. There are dependencies
10 * between the two that are extremely fragile. My understanding
11 * is that there is specialized ownership of the cb pointer based
12 * upon what is happening. Those assumptions are supposed to be
13 * tested in the test_zmq.c
14 */
15 #include <zebra.h>
16 #include <zmq.h>
17
18 #include "thread.h"
19 #include "memory.h"
20 #include "frr_zmq.h"
21 #include "log.h"
22 #include "lib_errors.h"
23
24 DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback");
25
26 /* libzmq's context */
27 void *frrzmq_context = NULL;
28 static unsigned frrzmq_initcount = 0;
29
30 void frrzmq_init(void)
31 {
32 if (frrzmq_initcount++ == 0) {
33 frrzmq_context = zmq_ctx_new();
34 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
35 }
36 }
37
38 void frrzmq_finish(void)
39 {
40 if (--frrzmq_initcount == 0) {
41 zmq_ctx_term(frrzmq_context);
42 frrzmq_context = NULL;
43 }
44 }
45
46 static void frrzmq_read_msg(struct thread *t)
47 {
48 struct frrzmq_cb **cbp = THREAD_ARG(t);
49 struct frrzmq_cb *cb;
50 zmq_msg_t msg;
51 unsigned partno;
52 unsigned char read = 0;
53 int ret, more;
54 size_t moresz;
55
56 if (!cbp)
57 return;
58 cb = (*cbp);
59 if (!cb || !cb->zmqsock)
60 return;
61
62 while (1) {
63 zmq_pollitem_t polli = {.socket = cb->zmqsock,
64 .events = ZMQ_POLLIN};
65 ret = zmq_poll(&polli, 1, 0);
66
67 if (ret < 0)
68 goto out_err;
69
70 if (!(polli.revents & ZMQ_POLLIN))
71 break;
72
73 if (cb->read.cb_msg) {
74 cb->in_cb = true;
75 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
76 cb->in_cb = false;
77
78 read = 1;
79
80 if (cb->read.cancelled) {
81 frrzmq_check_events(cbp, &cb->write,
82 ZMQ_POLLOUT);
83 cb->read.thread = NULL;
84 if (cb->write.cancelled && !cb->write.thread)
85 XFREE(MTYPE_ZEROMQ_CB, *cbp);
86
87 return;
88 }
89 continue;
90 }
91
92 partno = 0;
93 if (zmq_msg_init(&msg))
94 goto out_err;
95 do {
96 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
97 if (ret < 0) {
98 if (errno == EAGAIN)
99 break;
100
101 zmq_msg_close(&msg);
102 goto out_err;
103 }
104 read = 1;
105
106 cb->in_cb = true;
107 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
108 partno);
109 cb->in_cb = false;
110
111 if (cb->read.cancelled) {
112 zmq_msg_close(&msg);
113 frrzmq_check_events(cbp, &cb->write,
114 ZMQ_POLLOUT);
115 cb->read.thread = NULL;
116 if (cb->write.cancelled && !cb->write.thread)
117 XFREE(MTYPE_ZEROMQ_CB, *cbp);
118
119 return;
120 }
121
122 /* cb_part may have read additional parts of the
123 * message; don't use zmq_msg_more here */
124 moresz = sizeof(more);
125 more = 0;
126 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
127 &moresz);
128 if (ret < 0) {
129 zmq_msg_close(&msg);
130 goto out_err;
131 }
132
133 partno++;
134 } while (more);
135 zmq_msg_close(&msg);
136 }
137
138 if (read)
139 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
140
141 thread_add_read(t->master, frrzmq_read_msg, cbp,
142 cb->fd, &cb->read.thread);
143 return;
144
145 out_err:
146 flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
147 errno);
148 if (cb->read.cb_error)
149 cb->read.cb_error(cb->read.arg, cb->zmqsock);
150 }
151
152 int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
153 struct thread_master *master,
154 void (*msgfunc)(void *arg, void *zmqsock),
155 void (*partfunc)(void *arg, void *zmqsock,
156 zmq_msg_t *msg, unsigned partnum),
157 void (*errfunc)(void *arg, void *zmqsock),
158 void *arg, void *zmqsock,
159 struct frrzmq_cb **cbp)
160 {
161 int fd, events;
162 size_t len;
163 struct frrzmq_cb *cb;
164
165 if (!cbp)
166 return -1;
167 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
168 return -1;
169 len = sizeof(fd);
170 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
171 return -1;
172 len = sizeof(events);
173 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
174 return -1;
175
176 if (*cbp)
177 cb = *cbp;
178 else {
179 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
180 cb->write.cancelled = true;
181 *cbp = cb;
182 }
183
184 cb->zmqsock = zmqsock;
185 cb->fd = fd;
186 cb->read.arg = arg;
187 cb->read.cb_msg = msgfunc;
188 cb->read.cb_part = partfunc;
189 cb->read.cb_error = errfunc;
190 cb->read.cancelled = false;
191 cb->in_cb = false;
192
193 if (events & ZMQ_POLLIN) {
194 thread_cancel(&cb->read.thread);
195
196 thread_add_event(master, frrzmq_read_msg, cbp, fd,
197 &cb->read.thread);
198 } else
199 thread_add_read(master, frrzmq_read_msg, cbp, fd,
200 &cb->read.thread);
201 return 0;
202 }
203
204 static void frrzmq_write_msg(struct thread *t)
205 {
206 struct frrzmq_cb **cbp = THREAD_ARG(t);
207 struct frrzmq_cb *cb;
208 unsigned char written = 0;
209 int ret;
210
211 if (!cbp)
212 return;
213 cb = (*cbp);
214 if (!cb || !cb->zmqsock)
215 return;
216
217 while (1) {
218 zmq_pollitem_t polli = {.socket = cb->zmqsock,
219 .events = ZMQ_POLLOUT};
220 ret = zmq_poll(&polli, 1, 0);
221
222 if (ret < 0)
223 goto out_err;
224
225 if (!(polli.revents & ZMQ_POLLOUT))
226 break;
227
228 if (cb->write.cb_msg) {
229 cb->in_cb = true;
230 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
231 cb->in_cb = false;
232
233 written = 1;
234
235 if (cb->write.cancelled) {
236 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
237 cb->write.thread = NULL;
238 if (cb->read.cancelled && !cb->read.thread)
239 XFREE(MTYPE_ZEROMQ_CB, *cbp);
240
241 return;
242 }
243 continue;
244 }
245 }
246
247 if (written)
248 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
249
250 thread_add_write(t->master, frrzmq_write_msg, cbp,
251 cb->fd, &cb->write.thread);
252 return;
253
254 out_err:
255 flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
256 errno);
257 if (cb->write.cb_error)
258 cb->write.cb_error(cb->write.arg, cb->zmqsock);
259 }
260
261 int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
262 struct thread_master *master,
263 void (*msgfunc)(void *arg, void *zmqsock),
264 void (*errfunc)(void *arg, void *zmqsock),
265 void *arg, void *zmqsock, struct frrzmq_cb **cbp)
266 {
267 int fd, events;
268 size_t len;
269 struct frrzmq_cb *cb;
270
271 if (!cbp)
272 return -1;
273 if (!msgfunc)
274 return -1;
275 len = sizeof(fd);
276 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
277 return -1;
278 len = sizeof(events);
279 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
280 return -1;
281
282 if (*cbp)
283 cb = *cbp;
284 else {
285 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
286 cb->read.cancelled = true;
287 *cbp = cb;
288 }
289
290 cb->zmqsock = zmqsock;
291 cb->fd = fd;
292 cb->write.arg = arg;
293 cb->write.cb_msg = msgfunc;
294 cb->write.cb_part = NULL;
295 cb->write.cb_error = errfunc;
296 cb->write.cancelled = false;
297 cb->in_cb = false;
298
299 if (events & ZMQ_POLLOUT) {
300 thread_cancel(&cb->write.thread);
301
302 _thread_add_event(xref, master, frrzmq_write_msg, cbp, fd,
303 &cb->write.thread);
304 } else
305 thread_add_write(master, frrzmq_write_msg, cbp, fd,
306 &cb->write.thread);
307 return 0;
308 }
309
310 void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
311 {
312 if (!cb || !*cb)
313 return;
314 core->cancelled = true;
315 thread_cancel(&core->thread);
316
317 /* If cancelled from within a callback, don't try to free memory
318 * in this path.
319 */
320 if ((*cb)->in_cb)
321 return;
322
323 /* Ok to free the callback context if no more ... context. */
324 if ((*cb)->read.cancelled && !(*cb)->read.thread
325 && (*cb)->write.cancelled && ((*cb)->write.thread == NULL))
326 XFREE(MTYPE_ZEROMQ_CB, *cb);
327 }
328
329 void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
330 int event)
331 {
332 struct frrzmq_cb *cb;
333 int events;
334 size_t len;
335
336 if (!cbp)
337 return;
338 cb = (*cbp);
339 if (!cb || !cb->zmqsock)
340 return;
341
342 len = sizeof(events);
343 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
344 return;
345 if ((events & event) && core->thread && !core->cancelled) {
346 struct thread_master *tm = core->thread->master;
347
348 thread_cancel(&core->thread);
349
350 if (event == ZMQ_POLLIN)
351 thread_add_event(tm, frrzmq_read_msg,
352 cbp, cb->fd, &core->thread);
353 else
354 thread_add_event(tm, frrzmq_write_msg,
355 cbp, cb->fd, &core->thread);
356 }
357 }