]> git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
*: Convert thread_cancelXXX to event_cancelXXX
[mirror_frr.git] / lib / frr_zmq.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * libzebra ZeroMQ bindings
4 * Copyright (C) 2015 David Lamparter
5 */
6
7 /*
8 * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that
9 * the test_zmq.c unit test is still working. There are dependencies
10 * between the two that are extremely fragile. My understanding
11 * is that there is specialized ownership of the cb pointer based
12 * upon what is happening. Those assumptions are supposed to be
13 * tested in the test_zmq.c
14 */
15 #include <zebra.h>
16 #include <zmq.h>
17
18 #include "event.h"
19 #include "memory.h"
20 #include "frr_zmq.h"
21 #include "log.h"
22 #include "lib_errors.h"
23
24 DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback");
25
26 /* libzmq's context */
27 void *frrzmq_context = NULL;
28 static unsigned frrzmq_initcount = 0;
29
30 void frrzmq_init(void)
31 {
32 if (frrzmq_initcount++ == 0) {
33 frrzmq_context = zmq_ctx_new();
34 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
35 }
36 }
37
38 void frrzmq_finish(void)
39 {
40 if (--frrzmq_initcount == 0) {
41 zmq_ctx_term(frrzmq_context);
42 frrzmq_context = NULL;
43 }
44 }
45
46 static void frrzmq_read_msg(struct event *t)
47 {
48 struct frrzmq_cb **cbp = THREAD_ARG(t);
49 struct frrzmq_cb *cb;
50 zmq_msg_t msg;
51 unsigned partno;
52 unsigned char read = 0;
53 int ret, more;
54 size_t moresz;
55
56 if (!cbp)
57 return;
58 cb = (*cbp);
59 if (!cb || !cb->zmqsock)
60 return;
61
62 while (1) {
63 zmq_pollitem_t polli = {.socket = cb->zmqsock,
64 .events = ZMQ_POLLIN};
65 ret = zmq_poll(&polli, 1, 0);
66
67 if (ret < 0)
68 goto out_err;
69
70 if (!(polli.revents & ZMQ_POLLIN))
71 break;
72
73 if (cb->read.cb_msg) {
74 cb->in_cb = true;
75 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
76 cb->in_cb = false;
77
78 read = 1;
79
80 if (cb->read.cancelled) {
81 frrzmq_check_events(cbp, &cb->write,
82 ZMQ_POLLOUT);
83 cb->read.thread = NULL;
84 if (cb->write.cancelled && !cb->write.thread)
85 XFREE(MTYPE_ZEROMQ_CB, *cbp);
86
87 return;
88 }
89 continue;
90 }
91
92 partno = 0;
93 if (zmq_msg_init(&msg))
94 goto out_err;
95 do {
96 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
97 if (ret < 0) {
98 if (errno == EAGAIN)
99 break;
100
101 zmq_msg_close(&msg);
102 goto out_err;
103 }
104 read = 1;
105
106 cb->in_cb = true;
107 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
108 partno);
109 cb->in_cb = false;
110
111 if (cb->read.cancelled) {
112 zmq_msg_close(&msg);
113 frrzmq_check_events(cbp, &cb->write,
114 ZMQ_POLLOUT);
115 cb->read.thread = NULL;
116 if (cb->write.cancelled && !cb->write.thread)
117 XFREE(MTYPE_ZEROMQ_CB, *cbp);
118
119 return;
120 }
121
122 /* cb_part may have read additional parts of the
123 * message; don't use zmq_msg_more here */
124 moresz = sizeof(more);
125 more = 0;
126 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
127 &moresz);
128 if (ret < 0) {
129 zmq_msg_close(&msg);
130 goto out_err;
131 }
132
133 partno++;
134 } while (more);
135 zmq_msg_close(&msg);
136 }
137
138 if (read)
139 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
140
141 event_add_read(t->master, frrzmq_read_msg, cbp, cb->fd,
142 &cb->read.thread);
143 return;
144
145 out_err:
146 flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
147 errno);
148 if (cb->read.cb_error)
149 cb->read.cb_error(cb->read.arg, cb->zmqsock);
150 }
151
152 int _frrzmq_event_add_read(const struct xref_threadsched *xref,
153 struct thread_master *master,
154 void (*msgfunc)(void *arg, void *zmqsock),
155 void (*partfunc)(void *arg, void *zmqsock,
156 zmq_msg_t *msg, unsigned partnum),
157 void (*errfunc)(void *arg, void *zmqsock), void *arg,
158 void *zmqsock, struct frrzmq_cb **cbp)
159 {
160 int fd, events;
161 size_t len;
162 struct frrzmq_cb *cb;
163
164 if (!cbp)
165 return -1;
166 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
167 return -1;
168 len = sizeof(fd);
169 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
170 return -1;
171 len = sizeof(events);
172 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
173 return -1;
174
175 if (*cbp)
176 cb = *cbp;
177 else {
178 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
179 cb->write.cancelled = true;
180 *cbp = cb;
181 }
182
183 cb->zmqsock = zmqsock;
184 cb->fd = fd;
185 cb->read.arg = arg;
186 cb->read.cb_msg = msgfunc;
187 cb->read.cb_part = partfunc;
188 cb->read.cb_error = errfunc;
189 cb->read.cancelled = false;
190 cb->in_cb = false;
191
192 if (events & ZMQ_POLLIN) {
193 event_cancel(&cb->read.thread);
194
195 event_add_event(master, frrzmq_read_msg, cbp, fd,
196 &cb->read.thread);
197 } else
198 event_add_read(master, frrzmq_read_msg, cbp, fd,
199 &cb->read.thread);
200 return 0;
201 }
202
203 static void frrzmq_write_msg(struct event *t)
204 {
205 struct frrzmq_cb **cbp = THREAD_ARG(t);
206 struct frrzmq_cb *cb;
207 unsigned char written = 0;
208 int ret;
209
210 if (!cbp)
211 return;
212 cb = (*cbp);
213 if (!cb || !cb->zmqsock)
214 return;
215
216 while (1) {
217 zmq_pollitem_t polli = {.socket = cb->zmqsock,
218 .events = ZMQ_POLLOUT};
219 ret = zmq_poll(&polli, 1, 0);
220
221 if (ret < 0)
222 goto out_err;
223
224 if (!(polli.revents & ZMQ_POLLOUT))
225 break;
226
227 if (cb->write.cb_msg) {
228 cb->in_cb = true;
229 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
230 cb->in_cb = false;
231
232 written = 1;
233
234 if (cb->write.cancelled) {
235 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
236 cb->write.thread = NULL;
237 if (cb->read.cancelled && !cb->read.thread)
238 XFREE(MTYPE_ZEROMQ_CB, *cbp);
239
240 return;
241 }
242 continue;
243 }
244 }
245
246 if (written)
247 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
248
249 event_add_write(t->master, frrzmq_write_msg, cbp, cb->fd,
250 &cb->write.thread);
251 return;
252
253 out_err:
254 flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
255 errno);
256 if (cb->write.cb_error)
257 cb->write.cb_error(cb->write.arg, cb->zmqsock);
258 }
259
260 int _frrzmq_event_add_write(const struct xref_threadsched *xref,
261 struct thread_master *master,
262 void (*msgfunc)(void *arg, void *zmqsock),
263 void (*errfunc)(void *arg, void *zmqsock),
264 void *arg, void *zmqsock, struct frrzmq_cb **cbp)
265 {
266 int fd, events;
267 size_t len;
268 struct frrzmq_cb *cb;
269
270 if (!cbp)
271 return -1;
272 if (!msgfunc)
273 return -1;
274 len = sizeof(fd);
275 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
276 return -1;
277 len = sizeof(events);
278 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
279 return -1;
280
281 if (*cbp)
282 cb = *cbp;
283 else {
284 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
285 cb->read.cancelled = true;
286 *cbp = cb;
287 }
288
289 cb->zmqsock = zmqsock;
290 cb->fd = fd;
291 cb->write.arg = arg;
292 cb->write.cb_msg = msgfunc;
293 cb->write.cb_part = NULL;
294 cb->write.cb_error = errfunc;
295 cb->write.cancelled = false;
296 cb->in_cb = false;
297
298 if (events & ZMQ_POLLOUT) {
299 event_cancel(&cb->write.thread);
300
301 _event_add_event(xref, master, frrzmq_write_msg, cbp, fd,
302 &cb->write.thread);
303 } else
304 event_add_write(master, frrzmq_write_msg, cbp, fd,
305 &cb->write.thread);
306 return 0;
307 }
308
309 void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
310 {
311 if (!cb || !*cb)
312 return;
313 core->cancelled = true;
314 event_cancel(&core->thread);
315
316 /* If cancelled from within a callback, don't try to free memory
317 * in this path.
318 */
319 if ((*cb)->in_cb)
320 return;
321
322 /* Ok to free the callback context if no more ... context. */
323 if ((*cb)->read.cancelled && !(*cb)->read.thread
324 && (*cb)->write.cancelled && ((*cb)->write.thread == NULL))
325 XFREE(MTYPE_ZEROMQ_CB, *cb);
326 }
327
328 void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
329 int event)
330 {
331 struct frrzmq_cb *cb;
332 int events;
333 size_t len;
334
335 if (!cbp)
336 return;
337 cb = (*cbp);
338 if (!cb || !cb->zmqsock)
339 return;
340
341 len = sizeof(events);
342 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
343 return;
344 if ((events & event) && core->thread && !core->cancelled) {
345 struct thread_master *tm = core->thread->master;
346
347 event_cancel(&core->thread);
348
349 if (event == ZMQ_POLLIN)
350 event_add_event(tm, frrzmq_read_msg, cbp, cb->fd,
351 &core->thread);
352 else
353 event_add_event(tm, frrzmq_write_msg, cbp, cb->fd,
354 &core->thread);
355 }
356 }