]> git.proxmox.com Git - mirror_frr.git/blame - lib/frr_zmq.c
*: Rename thread.[ch] to event.[ch]
[mirror_frr.git] / lib / frr_zmq.c
CommitLineData
acddc0ed 1// SPDX-License-Identifier: GPL-2.0-or-later
b6116506
DL
2/*
3 * libzebra ZeroMQ bindings
4 * Copyright (C) 2015 David Lamparter
b6116506
DL
5 */
6
fb1df4cd
DS
/*
 * IF YOU MODIFY THIS FILE, PLEASE RUN `make check` and ensure that
 * the test_zmq.c unit test still passes.  There are dependencies
 * between the two that are extremely fragile.  My understanding
 * is that ownership of the cb pointer changes depending on which
 * operation is in progress.  Those assumptions are supposed to be
 * tested in test_zmq.c.
 */
b6116506
DL
15#include <zebra.h>
16#include <zmq.h>
17
cb37cb33 18#include "event.h"
b6116506
DL
19#include "memory.h"
20#include "frr_zmq.h"
21#include "log.h"
35774357 22#include "lib_errors.h"
b6116506 23
bf8d3d6a 24DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback");
b6116506
DL
25
/* libzmq context handle.  Assigned by frrzmq_init() when the init count
 * goes from zero to one, and reset to NULL by frrzmq_finish() when the
 * count drops back to zero.
 */
void *frrzmq_context = NULL;

/* Number of frrzmq_init() calls not yet matched by frrzmq_finish(). */
static unsigned int frrzmq_initcount;
29
30void frrzmq_init(void)
31{
32 if (frrzmq_initcount++ == 0) {
33 frrzmq_context = zmq_ctx_new();
34 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
35 }
36}
37
38void frrzmq_finish(void)
39{
40 if (--frrzmq_initcount == 0) {
41 zmq_ctx_term(frrzmq_context);
42 frrzmq_context = NULL;
43 }
44}
45
cc9f21da 46static void frrzmq_read_msg(struct thread *t)
b6116506 47{
afd0f10d 48 struct frrzmq_cb **cbp = THREAD_ARG(t);
49 struct frrzmq_cb *cb;
b6116506
DL
50 zmq_msg_t msg;
51 unsigned partno;
afd0f10d 52 unsigned char read = 0;
b6116506
DL
53 int ret, more;
54 size_t moresz;
55
afd0f10d 56 if (!cbp)
cc9f21da 57 return;
afd0f10d 58 cb = (*cbp);
59 if (!cb || !cb->zmqsock)
cc9f21da 60 return;
afd0f10d 61
b6116506 62 while (1) {
afd0f10d 63 zmq_pollitem_t polli = {.socket = cb->zmqsock,
64 .events = ZMQ_POLLIN};
b6116506
DL
65 ret = zmq_poll(&polli, 1, 0);
66
67 if (ret < 0)
68 goto out_err;
afd0f10d 69
b6116506
DL
70 if (!(polli.revents & ZMQ_POLLIN))
71 break;
72
afd0f10d 73 if (cb->read.cb_msg) {
8fd5502b 74 cb->in_cb = true;
afd0f10d 75 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
8fd5502b
MS
76 cb->in_cb = false;
77
afd0f10d 78 read = 1;
b6116506 79
afd0f10d 80 if (cb->read.cancelled) {
81 frrzmq_check_events(cbp, &cb->write,
82 ZMQ_POLLOUT);
83 cb->read.thread = NULL;
84 if (cb->write.cancelled && !cb->write.thread)
cf2182c0
MS
85 XFREE(MTYPE_ZEROMQ_CB, *cbp);
86
cc9f21da 87 return;
b6116506
DL
88 }
89 continue;
90 }
91
92 partno = 0;
93 if (zmq_msg_init(&msg))
94 goto out_err;
95 do {
96 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
97 if (ret < 0) {
98 if (errno == EAGAIN)
99 break;
100
101 zmq_msg_close(&msg);
102 goto out_err;
103 }
afd0f10d 104 read = 1;
b6116506 105
8fd5502b 106 cb->in_cb = true;
afd0f10d 107 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
108 partno);
8fd5502b
MS
109 cb->in_cb = false;
110
afd0f10d 111 if (cb->read.cancelled) {
b6116506 112 zmq_msg_close(&msg);
afd0f10d 113 frrzmq_check_events(cbp, &cb->write,
114 ZMQ_POLLOUT);
115 cb->read.thread = NULL;
116 if (cb->write.cancelled && !cb->write.thread)
cf2182c0
MS
117 XFREE(MTYPE_ZEROMQ_CB, *cbp);
118
cc9f21da 119 return;
b6116506
DL
120 }
121
122 /* cb_part may have read additional parts of the
123 * message; don't use zmq_msg_more here */
124 moresz = sizeof(more);
125 more = 0;
afd0f10d 126 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
127 &moresz);
b6116506
DL
128 if (ret < 0) {
129 zmq_msg_close(&msg);
130 goto out_err;
131 }
132
133 partno++;
134 } while (more);
135 zmq_msg_close(&msg);
136 }
137
afd0f10d 138 if (read)
139 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
140
1dffe357
MS
141 thread_add_read(t->master, frrzmq_read_msg, cbp,
142 cb->fd, &cb->read.thread);
cc9f21da 143 return;
b6116506
DL
144
145out_err:
450971aa 146 flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
1c50c1c0 147 errno);
afd0f10d 148 if (cb->read.cb_error)
149 cb->read.cb_error(cb->read.arg, cb->zmqsock);
b6116506
DL
150}
151
60a3efec
DL
152int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
153 struct thread_master *master,
154 void (*msgfunc)(void *arg, void *zmqsock),
155 void (*partfunc)(void *arg, void *zmqsock,
156 zmq_msg_t *msg, unsigned partnum),
157 void (*errfunc)(void *arg, void *zmqsock),
158 void *arg, void *zmqsock,
159 struct frrzmq_cb **cbp)
b6116506
DL
160{
161 int fd, events;
162 size_t len;
163 struct frrzmq_cb *cb;
164
afd0f10d 165 if (!cbp)
166 return -1;
b6116506 167 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
afd0f10d 168 return -1;
169 len = sizeof(fd);
170 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
171 return -1;
172 len = sizeof(events);
173 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
174 return -1;
175
176 if (*cbp)
177 cb = *cbp;
178 else {
179 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
08c2d52a 180 cb->write.cancelled = true;
afd0f10d 181 *cbp = cb;
182 }
183
184 cb->zmqsock = zmqsock;
185 cb->fd = fd;
186 cb->read.arg = arg;
187 cb->read.cb_msg = msgfunc;
188 cb->read.cb_part = partfunc;
189 cb->read.cb_error = errfunc;
08c2d52a 190 cb->read.cancelled = false;
8fd5502b 191 cb->in_cb = false;
afd0f10d 192
193 if (events & ZMQ_POLLIN) {
b3d6bc6e
MS
194 thread_cancel(&cb->read.thread);
195
1dffe357 196 thread_add_event(master, frrzmq_read_msg, cbp, fd,
60a3efec 197 &cb->read.thread);
afd0f10d 198 } else
1dffe357
MS
199 thread_add_read(master, frrzmq_read_msg, cbp, fd,
200 &cb->read.thread);
afd0f10d 201 return 0;
202}
203
cc9f21da 204static void frrzmq_write_msg(struct thread *t)
afd0f10d 205{
206 struct frrzmq_cb **cbp = THREAD_ARG(t);
207 struct frrzmq_cb *cb;
208 unsigned char written = 0;
209 int ret;
210
211 if (!cbp)
cc9f21da 212 return;
afd0f10d 213 cb = (*cbp);
214 if (!cb || !cb->zmqsock)
cc9f21da 215 return;
afd0f10d 216
217 while (1) {
218 zmq_pollitem_t polli = {.socket = cb->zmqsock,
219 .events = ZMQ_POLLOUT};
220 ret = zmq_poll(&polli, 1, 0);
221
222 if (ret < 0)
223 goto out_err;
224
225 if (!(polli.revents & ZMQ_POLLOUT))
226 break;
227
228 if (cb->write.cb_msg) {
8fd5502b 229 cb->in_cb = true;
afd0f10d 230 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
8fd5502b
MS
231 cb->in_cb = false;
232
afd0f10d 233 written = 1;
234
235 if (cb->write.cancelled) {
236 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
237 cb->write.thread = NULL;
238 if (cb->read.cancelled && !cb->read.thread)
cf2182c0
MS
239 XFREE(MTYPE_ZEROMQ_CB, *cbp);
240
cc9f21da 241 return;
afd0f10d 242 }
243 continue;
244 }
245 }
246
247 if (written)
248 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
249
1dffe357
MS
250 thread_add_write(t->master, frrzmq_write_msg, cbp,
251 cb->fd, &cb->write.thread);
cc9f21da 252 return;
afd0f10d 253
254out_err:
450971aa 255 flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
1c50c1c0 256 errno);
afd0f10d 257 if (cb->write.cb_error)
258 cb->write.cb_error(cb->write.arg, cb->zmqsock);
afd0f10d 259}
60a3efec
DL
260
261int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
262 struct thread_master *master,
263 void (*msgfunc)(void *arg, void *zmqsock),
264 void (*errfunc)(void *arg, void *zmqsock),
265 void *arg, void *zmqsock, struct frrzmq_cb **cbp)
afd0f10d 266{
267 int fd, events;
268 size_t len;
269 struct frrzmq_cb *cb;
270
271 if (!cbp)
272 return -1;
273 if (!msgfunc)
274 return -1;
b6116506
DL
275 len = sizeof(fd);
276 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
afd0f10d 277 return -1;
b6116506
DL
278 len = sizeof(events);
279 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
afd0f10d 280 return -1;
b6116506 281
afd0f10d 282 if (*cbp)
283 cb = *cbp;
284 else {
285 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
08c2d52a 286 cb->read.cancelled = true;
afd0f10d 287 *cbp = cb;
288 }
b6116506 289
b6116506 290 cb->zmqsock = zmqsock;
b6116506 291 cb->fd = fd;
afd0f10d 292 cb->write.arg = arg;
293 cb->write.cb_msg = msgfunc;
294 cb->write.cb_part = NULL;
295 cb->write.cb_error = errfunc;
08c2d52a 296 cb->write.cancelled = false;
8fd5502b 297 cb->in_cb = false;
afd0f10d 298
299 if (events & ZMQ_POLLOUT) {
b3d6bc6e
MS
300 thread_cancel(&cb->write.thread);
301
60a3efec
DL
302 _thread_add_event(xref, master, frrzmq_write_msg, cbp, fd,
303 &cb->write.thread);
afd0f10d 304 } else
1dffe357
MS
305 thread_add_write(master, frrzmq_write_msg, cbp, fd,
306 &cb->write.thread);
afd0f10d 307 return 0;
308}
b6116506 309
afd0f10d 310void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
311{
312 if (!cb || !*cb)
313 return;
08c2d52a 314 core->cancelled = true;
b3d6bc6e
MS
315 thread_cancel(&core->thread);
316
8fd5502b
MS
317 /* If cancelled from within a callback, don't try to free memory
318 * in this path.
fb1df4cd 319 */
8fd5502b
MS
320 if ((*cb)->in_cb)
321 return;
322
323 /* Ok to free the callback context if no more ... context. */
afd0f10d 324 if ((*cb)->read.cancelled && !(*cb)->read.thread
8fd5502b 325 && (*cb)->write.cancelled && ((*cb)->write.thread == NULL))
afd0f10d 326 XFREE(MTYPE_ZEROMQ_CB, *cb);
b6116506
DL
327}
328
afd0f10d 329void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
330 int event)
b6116506 331{
afd0f10d 332 struct frrzmq_cb *cb;
333 int events;
334 size_t len;
335
336 if (!cbp)
337 return;
338 cb = (*cbp);
339 if (!cb || !cb->zmqsock)
340 return;
341
81b8afcf 342 len = sizeof(events);
afd0f10d 343 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
b6116506 344 return;
e08165de 345 if ((events & event) && core->thread && !core->cancelled) {
afd0f10d 346 struct thread_master *tm = core->thread->master;
e08165de 347
b3d6bc6e
MS
348 thread_cancel(&core->thread);
349
e08165de
MS
350 if (event == ZMQ_POLLIN)
351 thread_add_event(tm, frrzmq_read_msg,
352 cbp, cb->fd, &core->thread);
353 else
354 thread_add_event(tm, frrzmq_write_msg,
355 cbp, cb->fd, &core->thread);
b6116506 356 }
b6116506 357}