]> git.proxmox.com Git - mirror_frr.git/blame - lib/frr_zmq.c
Merge pull request #13649 from donaldsharp/unlock_the_node_or_else
[mirror_frr.git] / lib / frr_zmq.c
CommitLineData
acddc0ed 1// SPDX-License-Identifier: GPL-2.0-or-later
b6116506
DL
2/*
3 * libzebra ZeroMQ bindings
4 * Copyright (C) 2015 David Lamparter
b6116506
DL
5 */
6
fb1df4cd
DS
7/*
8 * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that
214d8a60 9 * the test_zmq.c unit test is still working. There are dependencies
fb1df4cd
DS
10 * between the two that are extremely fragile. My understanding
11 * is that there is specialized ownership of the cb pointer based
12 * upon what is happening. Those assumptions are supposed to be
13 * tested in the test_zmq.c
14 */
b6116506
DL
15#include <zebra.h>
16#include <zmq.h>
17
24a58196 18#include "frrevent.h"
b6116506
DL
19#include "memory.h"
20#include "frr_zmq.h"
21#include "log.h"
35774357 22#include "lib_errors.h"
b6116506 23
bf8d3d6a 24DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback");
b6116506
DL
25
26/* libzmq's context */
27void *frrzmq_context = NULL;
28static unsigned frrzmq_initcount = 0;
29
30void frrzmq_init(void)
31{
32 if (frrzmq_initcount++ == 0) {
33 frrzmq_context = zmq_ctx_new();
34 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
35 }
36}
37
38void frrzmq_finish(void)
39{
40 if (--frrzmq_initcount == 0) {
41 zmq_ctx_term(frrzmq_context);
42 frrzmq_context = NULL;
43 }
44}
45
e6685141 46static void frrzmq_read_msg(struct event *t)
b6116506 47{
e16d030c 48 struct frrzmq_cb **cbp = EVENT_ARG(t);
afd0f10d 49 struct frrzmq_cb *cb;
b6116506
DL
50 zmq_msg_t msg;
51 unsigned partno;
afd0f10d 52 unsigned char read = 0;
b6116506
DL
53 int ret, more;
54 size_t moresz;
55
afd0f10d 56 if (!cbp)
cc9f21da 57 return;
afd0f10d 58 cb = (*cbp);
59 if (!cb || !cb->zmqsock)
cc9f21da 60 return;
afd0f10d 61
b6116506 62 while (1) {
afd0f10d 63 zmq_pollitem_t polli = {.socket = cb->zmqsock,
64 .events = ZMQ_POLLIN};
b6116506
DL
65 ret = zmq_poll(&polli, 1, 0);
66
67 if (ret < 0)
68 goto out_err;
afd0f10d 69
b6116506
DL
70 if (!(polli.revents & ZMQ_POLLIN))
71 break;
72
afd0f10d 73 if (cb->read.cb_msg) {
8fd5502b 74 cb->in_cb = true;
afd0f10d 75 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
8fd5502b
MS
76 cb->in_cb = false;
77
afd0f10d 78 read = 1;
b6116506 79
afd0f10d 80 if (cb->read.cancelled) {
81 frrzmq_check_events(cbp, &cb->write,
82 ZMQ_POLLOUT);
83 cb->read.thread = NULL;
84 if (cb->write.cancelled && !cb->write.thread)
cf2182c0
MS
85 XFREE(MTYPE_ZEROMQ_CB, *cbp);
86
cc9f21da 87 return;
b6116506
DL
88 }
89 continue;
90 }
91
92 partno = 0;
93 if (zmq_msg_init(&msg))
94 goto out_err;
95 do {
96 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
97 if (ret < 0) {
98 if (errno == EAGAIN)
99 break;
100
101 zmq_msg_close(&msg);
102 goto out_err;
103 }
afd0f10d 104 read = 1;
b6116506 105
8fd5502b 106 cb->in_cb = true;
afd0f10d 107 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
108 partno);
8fd5502b
MS
109 cb->in_cb = false;
110
afd0f10d 111 if (cb->read.cancelled) {
b6116506 112 zmq_msg_close(&msg);
afd0f10d 113 frrzmq_check_events(cbp, &cb->write,
114 ZMQ_POLLOUT);
115 cb->read.thread = NULL;
116 if (cb->write.cancelled && !cb->write.thread)
cf2182c0
MS
117 XFREE(MTYPE_ZEROMQ_CB, *cbp);
118
cc9f21da 119 return;
b6116506
DL
120 }
121
122 /* cb_part may have read additional parts of the
123 * message; don't use zmq_msg_more here */
124 moresz = sizeof(more);
125 more = 0;
afd0f10d 126 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
127 &moresz);
b6116506
DL
128 if (ret < 0) {
129 zmq_msg_close(&msg);
130 goto out_err;
131 }
132
133 partno++;
134 } while (more);
135 zmq_msg_close(&msg);
136 }
137
afd0f10d 138 if (read)
139 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
140
907a2395
DS
141 event_add_read(t->master, frrzmq_read_msg, cbp, cb->fd,
142 &cb->read.thread);
cc9f21da 143 return;
b6116506
DL
144
145out_err:
450971aa 146 flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
1c50c1c0 147 errno);
afd0f10d 148 if (cb->read.cb_error)
149 cb->read.cb_error(cb->read.arg, cb->zmqsock);
b6116506
DL
150}
151
5163a1c5 152int _frrzmq_event_add_read(const struct xref_eventsched *xref,
cd9d0537 153 struct event_loop *master,
907a2395
DS
154 void (*msgfunc)(void *arg, void *zmqsock),
155 void (*partfunc)(void *arg, void *zmqsock,
156 zmq_msg_t *msg, unsigned partnum),
157 void (*errfunc)(void *arg, void *zmqsock), void *arg,
158 void *zmqsock, struct frrzmq_cb **cbp)
b6116506
DL
159{
160 int fd, events;
161 size_t len;
162 struct frrzmq_cb *cb;
163
afd0f10d 164 if (!cbp)
165 return -1;
b6116506 166 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
afd0f10d 167 return -1;
168 len = sizeof(fd);
169 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
170 return -1;
171 len = sizeof(events);
172 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
173 return -1;
174
175 if (*cbp)
176 cb = *cbp;
177 else {
178 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
08c2d52a 179 cb->write.cancelled = true;
afd0f10d 180 *cbp = cb;
181 }
182
183 cb->zmqsock = zmqsock;
184 cb->fd = fd;
185 cb->read.arg = arg;
186 cb->read.cb_msg = msgfunc;
187 cb->read.cb_part = partfunc;
188 cb->read.cb_error = errfunc;
08c2d52a 189 cb->read.cancelled = false;
8fd5502b 190 cb->in_cb = false;
afd0f10d 191
192 if (events & ZMQ_POLLIN) {
332beb64 193 event_cancel(&cb->read.thread);
b3d6bc6e 194
907a2395 195 event_add_event(master, frrzmq_read_msg, cbp, fd,
1dffe357 196 &cb->read.thread);
907a2395
DS
197 } else
198 event_add_read(master, frrzmq_read_msg, cbp, fd,
199 &cb->read.thread);
afd0f10d 200 return 0;
201}
202
e6685141 203static void frrzmq_write_msg(struct event *t)
afd0f10d 204{
e16d030c 205 struct frrzmq_cb **cbp = EVENT_ARG(t);
afd0f10d 206 struct frrzmq_cb *cb;
207 unsigned char written = 0;
208 int ret;
209
210 if (!cbp)
cc9f21da 211 return;
afd0f10d 212 cb = (*cbp);
213 if (!cb || !cb->zmqsock)
cc9f21da 214 return;
afd0f10d 215
216 while (1) {
217 zmq_pollitem_t polli = {.socket = cb->zmqsock,
218 .events = ZMQ_POLLOUT};
219 ret = zmq_poll(&polli, 1, 0);
220
221 if (ret < 0)
222 goto out_err;
223
224 if (!(polli.revents & ZMQ_POLLOUT))
225 break;
226
227 if (cb->write.cb_msg) {
8fd5502b 228 cb->in_cb = true;
afd0f10d 229 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
8fd5502b
MS
230 cb->in_cb = false;
231
afd0f10d 232 written = 1;
233
234 if (cb->write.cancelled) {
235 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
236 cb->write.thread = NULL;
237 if (cb->read.cancelled && !cb->read.thread)
cf2182c0
MS
238 XFREE(MTYPE_ZEROMQ_CB, *cbp);
239
cc9f21da 240 return;
afd0f10d 241 }
242 continue;
243 }
244 }
245
246 if (written)
247 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
248
907a2395
DS
249 event_add_write(t->master, frrzmq_write_msg, cbp, cb->fd,
250 &cb->write.thread);
cc9f21da 251 return;
afd0f10d 252
253out_err:
450971aa 254 flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
1c50c1c0 255 errno);
afd0f10d 256 if (cb->write.cb_error)
257 cb->write.cb_error(cb->write.arg, cb->zmqsock);
afd0f10d 258}
60a3efec 259
5163a1c5 260int _frrzmq_event_add_write(const struct xref_eventsched *xref,
cd9d0537 261 struct event_loop *master,
907a2395
DS
262 void (*msgfunc)(void *arg, void *zmqsock),
263 void (*errfunc)(void *arg, void *zmqsock),
264 void *arg, void *zmqsock, struct frrzmq_cb **cbp)
afd0f10d 265{
266 int fd, events;
267 size_t len;
268 struct frrzmq_cb *cb;
269
270 if (!cbp)
271 return -1;
272 if (!msgfunc)
273 return -1;
b6116506
DL
274 len = sizeof(fd);
275 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
afd0f10d 276 return -1;
b6116506
DL
277 len = sizeof(events);
278 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
afd0f10d 279 return -1;
b6116506 280
afd0f10d 281 if (*cbp)
282 cb = *cbp;
283 else {
284 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
08c2d52a 285 cb->read.cancelled = true;
afd0f10d 286 *cbp = cb;
287 }
b6116506 288
b6116506 289 cb->zmqsock = zmqsock;
b6116506 290 cb->fd = fd;
afd0f10d 291 cb->write.arg = arg;
292 cb->write.cb_msg = msgfunc;
293 cb->write.cb_part = NULL;
294 cb->write.cb_error = errfunc;
08c2d52a 295 cb->write.cancelled = false;
8fd5502b 296 cb->in_cb = false;
afd0f10d 297
298 if (events & ZMQ_POLLOUT) {
332beb64 299 event_cancel(&cb->write.thread);
b3d6bc6e 300
907a2395 301 _event_add_event(xref, master, frrzmq_write_msg, cbp, fd,
1dffe357 302 &cb->write.thread);
907a2395
DS
303 } else
304 event_add_write(master, frrzmq_write_msg, cbp, fd,
305 &cb->write.thread);
afd0f10d 306 return 0;
307}
b6116506 308
afd0f10d 309void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
310{
311 if (!cb || !*cb)
312 return;
08c2d52a 313 core->cancelled = true;
332beb64 314 event_cancel(&core->thread);
b3d6bc6e 315
8fd5502b
MS
316 /* If cancelled from within a callback, don't try to free memory
317 * in this path.
fb1df4cd 318 */
8fd5502b
MS
319 if ((*cb)->in_cb)
320 return;
321
322 /* Ok to free the callback context if no more ... context. */
afd0f10d 323 if ((*cb)->read.cancelled && !(*cb)->read.thread
8fd5502b 324 && (*cb)->write.cancelled && ((*cb)->write.thread == NULL))
afd0f10d 325 XFREE(MTYPE_ZEROMQ_CB, *cb);
b6116506
DL
326}
327
afd0f10d 328void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
329 int event)
b6116506 330{
afd0f10d 331 struct frrzmq_cb *cb;
332 int events;
333 size_t len;
334
335 if (!cbp)
336 return;
337 cb = (*cbp);
338 if (!cb || !cb->zmqsock)
339 return;
340
81b8afcf 341 len = sizeof(events);
afd0f10d 342 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
b6116506 343 return;
e08165de 344 if ((events & event) && core->thread && !core->cancelled) {
cd9d0537 345 struct event_loop *tm = core->thread->master;
e08165de 346
332beb64 347 event_cancel(&core->thread);
b3d6bc6e 348
e08165de 349 if (event == ZMQ_POLLIN)
907a2395
DS
350 event_add_event(tm, frrzmq_read_msg, cbp, cb->fd,
351 &core->thread);
e08165de 352 else
907a2395
DS
353 event_add_event(tm, frrzmq_write_msg, cbp, cb->fd,
354 &core->thread);
b6116506 355 }
b6116506 356}