]>
Commit | Line | Data |
---|---|---|
acddc0ed | 1 | // SPDX-License-Identifier: GPL-2.0-or-later |
b6116506 DL |
2 | /* |
3 | * libzebra ZeroMQ bindings | |
4 | * Copyright (C) 2015 David Lamparter | |
b6116506 DL |
5 | */ |
6 | ||
fb1df4cd DS |
7 | /* |
8 | * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that | |
214d8a60 | 9 | * the test_zmq.c unit test is still working. There are dependencies |
fb1df4cd DS |
10 | * between the two that are extremely fragile. My understanding |
11 | * is that there is specialized ownership of the cb pointer based | |
12 | * upon what is happening. Those assumptions are supposed to be | |
13 | * tested in the test_zmq.c | |
14 | */ | |
b6116506 DL |
15 | #include <zebra.h> |
16 | #include <zmq.h> | |
17 | ||
cb37cb33 | 18 | #include "event.h" |
b6116506 DL |
19 | #include "memory.h" |
20 | #include "frr_zmq.h" | |
21 | #include "log.h" | |
35774357 | 22 | #include "lib_errors.h" |
b6116506 | 23 | |
bf8d3d6a | 24 | DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback"); |
b6116506 DL |
25 | |
/* Process-wide libzmq context: created by frrzmq_init(), reset to NULL by
 * frrzmq_finish() once it has been terminated.
 */
void *frrzmq_context;

/* Reference count maintained by frrzmq_init() / frrzmq_finish(). */
static unsigned int frrzmq_initcount;
29 | ||
30 | void frrzmq_init(void) | |
31 | { | |
32 | if (frrzmq_initcount++ == 0) { | |
33 | frrzmq_context = zmq_ctx_new(); | |
34 | zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1); | |
35 | } | |
36 | } | |
37 | ||
38 | void frrzmq_finish(void) | |
39 | { | |
40 | if (--frrzmq_initcount == 0) { | |
41 | zmq_ctx_term(frrzmq_context); | |
42 | frrzmq_context = NULL; | |
43 | } | |
44 | } | |
45 | ||
e6685141 | 46 | static void frrzmq_read_msg(struct event *t) |
b6116506 | 47 | { |
afd0f10d | 48 | struct frrzmq_cb **cbp = THREAD_ARG(t); |
49 | struct frrzmq_cb *cb; | |
b6116506 DL |
50 | zmq_msg_t msg; |
51 | unsigned partno; | |
afd0f10d | 52 | unsigned char read = 0; |
b6116506 DL |
53 | int ret, more; |
54 | size_t moresz; | |
55 | ||
afd0f10d | 56 | if (!cbp) |
cc9f21da | 57 | return; |
afd0f10d | 58 | cb = (*cbp); |
59 | if (!cb || !cb->zmqsock) | |
cc9f21da | 60 | return; |
afd0f10d | 61 | |
b6116506 | 62 | while (1) { |
afd0f10d | 63 | zmq_pollitem_t polli = {.socket = cb->zmqsock, |
64 | .events = ZMQ_POLLIN}; | |
b6116506 DL |
65 | ret = zmq_poll(&polli, 1, 0); |
66 | ||
67 | if (ret < 0) | |
68 | goto out_err; | |
afd0f10d | 69 | |
b6116506 DL |
70 | if (!(polli.revents & ZMQ_POLLIN)) |
71 | break; | |
72 | ||
afd0f10d | 73 | if (cb->read.cb_msg) { |
8fd5502b | 74 | cb->in_cb = true; |
afd0f10d | 75 | cb->read.cb_msg(cb->read.arg, cb->zmqsock); |
8fd5502b MS |
76 | cb->in_cb = false; |
77 | ||
afd0f10d | 78 | read = 1; |
b6116506 | 79 | |
afd0f10d | 80 | if (cb->read.cancelled) { |
81 | frrzmq_check_events(cbp, &cb->write, | |
82 | ZMQ_POLLOUT); | |
83 | cb->read.thread = NULL; | |
84 | if (cb->write.cancelled && !cb->write.thread) | |
cf2182c0 MS |
85 | XFREE(MTYPE_ZEROMQ_CB, *cbp); |
86 | ||
cc9f21da | 87 | return; |
b6116506 DL |
88 | } |
89 | continue; | |
90 | } | |
91 | ||
92 | partno = 0; | |
93 | if (zmq_msg_init(&msg)) | |
94 | goto out_err; | |
95 | do { | |
96 | ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK); | |
97 | if (ret < 0) { | |
98 | if (errno == EAGAIN) | |
99 | break; | |
100 | ||
101 | zmq_msg_close(&msg); | |
102 | goto out_err; | |
103 | } | |
afd0f10d | 104 | read = 1; |
b6116506 | 105 | |
8fd5502b | 106 | cb->in_cb = true; |
afd0f10d | 107 | cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg, |
108 | partno); | |
8fd5502b MS |
109 | cb->in_cb = false; |
110 | ||
afd0f10d | 111 | if (cb->read.cancelled) { |
b6116506 | 112 | zmq_msg_close(&msg); |
afd0f10d | 113 | frrzmq_check_events(cbp, &cb->write, |
114 | ZMQ_POLLOUT); | |
115 | cb->read.thread = NULL; | |
116 | if (cb->write.cancelled && !cb->write.thread) | |
cf2182c0 MS |
117 | XFREE(MTYPE_ZEROMQ_CB, *cbp); |
118 | ||
cc9f21da | 119 | return; |
b6116506 DL |
120 | } |
121 | ||
122 | /* cb_part may have read additional parts of the | |
123 | * message; don't use zmq_msg_more here */ | |
124 | moresz = sizeof(more); | |
125 | more = 0; | |
afd0f10d | 126 | ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more, |
127 | &moresz); | |
b6116506 DL |
128 | if (ret < 0) { |
129 | zmq_msg_close(&msg); | |
130 | goto out_err; | |
131 | } | |
132 | ||
133 | partno++; | |
134 | } while (more); | |
135 | zmq_msg_close(&msg); | |
136 | } | |
137 | ||
afd0f10d | 138 | if (read) |
139 | frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT); | |
140 | ||
907a2395 DS |
141 | event_add_read(t->master, frrzmq_read_msg, cbp, cb->fd, |
142 | &cb->read.thread); | |
cc9f21da | 143 | return; |
b6116506 DL |
144 | |
145 | out_err: | |
450971aa | 146 | flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno), |
1c50c1c0 | 147 | errno); |
afd0f10d | 148 | if (cb->read.cb_error) |
149 | cb->read.cb_error(cb->read.arg, cb->zmqsock); | |
b6116506 DL |
150 | } |
151 | ||
907a2395 | 152 | int _frrzmq_event_add_read(const struct xref_threadsched *xref, |
2453d15d | 153 | struct event_master *master, |
907a2395 DS |
154 | void (*msgfunc)(void *arg, void *zmqsock), |
155 | void (*partfunc)(void *arg, void *zmqsock, | |
156 | zmq_msg_t *msg, unsigned partnum), | |
157 | void (*errfunc)(void *arg, void *zmqsock), void *arg, | |
158 | void *zmqsock, struct frrzmq_cb **cbp) | |
b6116506 DL |
159 | { |
160 | int fd, events; | |
161 | size_t len; | |
162 | struct frrzmq_cb *cb; | |
163 | ||
afd0f10d | 164 | if (!cbp) |
165 | return -1; | |
b6116506 | 166 | if (!(msgfunc || partfunc) || (msgfunc && partfunc)) |
afd0f10d | 167 | return -1; |
168 | len = sizeof(fd); | |
169 | if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) | |
170 | return -1; | |
171 | len = sizeof(events); | |
172 | if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) | |
173 | return -1; | |
174 | ||
175 | if (*cbp) | |
176 | cb = *cbp; | |
177 | else { | |
178 | cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); | |
08c2d52a | 179 | cb->write.cancelled = true; |
afd0f10d | 180 | *cbp = cb; |
181 | } | |
182 | ||
183 | cb->zmqsock = zmqsock; | |
184 | cb->fd = fd; | |
185 | cb->read.arg = arg; | |
186 | cb->read.cb_msg = msgfunc; | |
187 | cb->read.cb_part = partfunc; | |
188 | cb->read.cb_error = errfunc; | |
08c2d52a | 189 | cb->read.cancelled = false; |
8fd5502b | 190 | cb->in_cb = false; |
afd0f10d | 191 | |
192 | if (events & ZMQ_POLLIN) { | |
332beb64 | 193 | event_cancel(&cb->read.thread); |
b3d6bc6e | 194 | |
907a2395 | 195 | event_add_event(master, frrzmq_read_msg, cbp, fd, |
1dffe357 | 196 | &cb->read.thread); |
907a2395 DS |
197 | } else |
198 | event_add_read(master, frrzmq_read_msg, cbp, fd, | |
199 | &cb->read.thread); | |
afd0f10d | 200 | return 0; |
201 | } | |
202 | ||
e6685141 | 203 | static void frrzmq_write_msg(struct event *t) |
afd0f10d | 204 | { |
205 | struct frrzmq_cb **cbp = THREAD_ARG(t); | |
206 | struct frrzmq_cb *cb; | |
207 | unsigned char written = 0; | |
208 | int ret; | |
209 | ||
210 | if (!cbp) | |
cc9f21da | 211 | return; |
afd0f10d | 212 | cb = (*cbp); |
213 | if (!cb || !cb->zmqsock) | |
cc9f21da | 214 | return; |
afd0f10d | 215 | |
216 | while (1) { | |
217 | zmq_pollitem_t polli = {.socket = cb->zmqsock, | |
218 | .events = ZMQ_POLLOUT}; | |
219 | ret = zmq_poll(&polli, 1, 0); | |
220 | ||
221 | if (ret < 0) | |
222 | goto out_err; | |
223 | ||
224 | if (!(polli.revents & ZMQ_POLLOUT)) | |
225 | break; | |
226 | ||
227 | if (cb->write.cb_msg) { | |
8fd5502b | 228 | cb->in_cb = true; |
afd0f10d | 229 | cb->write.cb_msg(cb->write.arg, cb->zmqsock); |
8fd5502b MS |
230 | cb->in_cb = false; |
231 | ||
afd0f10d | 232 | written = 1; |
233 | ||
234 | if (cb->write.cancelled) { | |
235 | frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); | |
236 | cb->write.thread = NULL; | |
237 | if (cb->read.cancelled && !cb->read.thread) | |
cf2182c0 MS |
238 | XFREE(MTYPE_ZEROMQ_CB, *cbp); |
239 | ||
cc9f21da | 240 | return; |
afd0f10d | 241 | } |
242 | continue; | |
243 | } | |
244 | } | |
245 | ||
246 | if (written) | |
247 | frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); | |
248 | ||
907a2395 DS |
249 | event_add_write(t->master, frrzmq_write_msg, cbp, cb->fd, |
250 | &cb->write.thread); | |
cc9f21da | 251 | return; |
afd0f10d | 252 | |
253 | out_err: | |
450971aa | 254 | flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno), |
1c50c1c0 | 255 | errno); |
afd0f10d | 256 | if (cb->write.cb_error) |
257 | cb->write.cb_error(cb->write.arg, cb->zmqsock); | |
afd0f10d | 258 | } |
60a3efec | 259 | |
907a2395 | 260 | int _frrzmq_event_add_write(const struct xref_threadsched *xref, |
2453d15d | 261 | struct event_master *master, |
907a2395 DS |
262 | void (*msgfunc)(void *arg, void *zmqsock), |
263 | void (*errfunc)(void *arg, void *zmqsock), | |
264 | void *arg, void *zmqsock, struct frrzmq_cb **cbp) | |
afd0f10d | 265 | { |
266 | int fd, events; | |
267 | size_t len; | |
268 | struct frrzmq_cb *cb; | |
269 | ||
270 | if (!cbp) | |
271 | return -1; | |
272 | if (!msgfunc) | |
273 | return -1; | |
b6116506 DL |
274 | len = sizeof(fd); |
275 | if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) | |
afd0f10d | 276 | return -1; |
b6116506 DL |
277 | len = sizeof(events); |
278 | if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) | |
afd0f10d | 279 | return -1; |
b6116506 | 280 | |
afd0f10d | 281 | if (*cbp) |
282 | cb = *cbp; | |
283 | else { | |
284 | cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); | |
08c2d52a | 285 | cb->read.cancelled = true; |
afd0f10d | 286 | *cbp = cb; |
287 | } | |
b6116506 | 288 | |
b6116506 | 289 | cb->zmqsock = zmqsock; |
b6116506 | 290 | cb->fd = fd; |
afd0f10d | 291 | cb->write.arg = arg; |
292 | cb->write.cb_msg = msgfunc; | |
293 | cb->write.cb_part = NULL; | |
294 | cb->write.cb_error = errfunc; | |
08c2d52a | 295 | cb->write.cancelled = false; |
8fd5502b | 296 | cb->in_cb = false; |
afd0f10d | 297 | |
298 | if (events & ZMQ_POLLOUT) { | |
332beb64 | 299 | event_cancel(&cb->write.thread); |
b3d6bc6e | 300 | |
907a2395 | 301 | _event_add_event(xref, master, frrzmq_write_msg, cbp, fd, |
1dffe357 | 302 | &cb->write.thread); |
907a2395 DS |
303 | } else |
304 | event_add_write(master, frrzmq_write_msg, cbp, fd, | |
305 | &cb->write.thread); | |
afd0f10d | 306 | return 0; |
307 | } | |
b6116506 | 308 | |
afd0f10d | 309 | void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core) |
310 | { | |
311 | if (!cb || !*cb) | |
312 | return; | |
08c2d52a | 313 | core->cancelled = true; |
332beb64 | 314 | event_cancel(&core->thread); |
b3d6bc6e | 315 | |
8fd5502b MS |
316 | /* If cancelled from within a callback, don't try to free memory |
317 | * in this path. | |
fb1df4cd | 318 | */ |
8fd5502b MS |
319 | if ((*cb)->in_cb) |
320 | return; | |
321 | ||
322 | /* Ok to free the callback context if no more ... context. */ | |
afd0f10d | 323 | if ((*cb)->read.cancelled && !(*cb)->read.thread |
8fd5502b | 324 | && (*cb)->write.cancelled && ((*cb)->write.thread == NULL)) |
afd0f10d | 325 | XFREE(MTYPE_ZEROMQ_CB, *cb); |
b6116506 DL |
326 | } |
327 | ||
afd0f10d | 328 | void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core, |
329 | int event) | |
b6116506 | 330 | { |
afd0f10d | 331 | struct frrzmq_cb *cb; |
332 | int events; | |
333 | size_t len; | |
334 | ||
335 | if (!cbp) | |
336 | return; | |
337 | cb = (*cbp); | |
338 | if (!cb || !cb->zmqsock) | |
339 | return; | |
340 | ||
81b8afcf | 341 | len = sizeof(events); |
afd0f10d | 342 | if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len)) |
b6116506 | 343 | return; |
e08165de | 344 | if ((events & event) && core->thread && !core->cancelled) { |
2453d15d | 345 | struct event_master *tm = core->thread->master; |
e08165de | 346 | |
332beb64 | 347 | event_cancel(&core->thread); |
b3d6bc6e | 348 | |
e08165de | 349 | if (event == ZMQ_POLLIN) |
907a2395 DS |
350 | event_add_event(tm, frrzmq_read_msg, cbp, cb->fd, |
351 | &core->thread); | |
e08165de | 352 | else |
907a2395 DS |
353 | event_add_event(tm, frrzmq_write_msg, cbp, cb->fd, |
354 | &core->thread); | |
b6116506 | 355 | } |
b6116506 | 356 | } |