]> git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
Merge pull request #8870 from anlancs/master-fix-reload-service
[mirror_frr.git] / lib / frr_zmq.c
1 /*
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20 /*
21 * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that
22 * the test_zmq.c unit test is still working. There are dependencies
23 * between the two that are extremely fragile. My understanding
24 * is that there is specialized ownership of the cb pointer based
25 * upon what is happening. Those assumptions are supposed to be
26 * tested in the test_zmq.c
27 */
28 #include <zebra.h>
29 #include <zmq.h>
30
31 #include "thread.h"
32 #include "memory.h"
33 #include "frr_zmq.h"
34 #include "log.h"
35 #include "lib_errors.h"
36
37 DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback");
38
39 /* libzmq's context */
40 void *frrzmq_context = NULL;
41 static unsigned frrzmq_initcount = 0;
42
43 void frrzmq_init(void)
44 {
45 if (frrzmq_initcount++ == 0) {
46 frrzmq_context = zmq_ctx_new();
47 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
48 }
49 }
50
51 void frrzmq_finish(void)
52 {
53 if (--frrzmq_initcount == 0) {
54 zmq_ctx_term(frrzmq_context);
55 frrzmq_context = NULL;
56 }
57 }
58
59 static int frrzmq_read_msg(struct thread *t)
60 {
61 struct frrzmq_cb **cbp = THREAD_ARG(t);
62 struct frrzmq_cb *cb;
63 zmq_msg_t msg;
64 unsigned partno;
65 unsigned char read = 0;
66 int ret, more;
67 size_t moresz;
68
69 if (!cbp)
70 return 1;
71 cb = (*cbp);
72 if (!cb || !cb->zmqsock)
73 return 1;
74
75 while (1) {
76 zmq_pollitem_t polli = {.socket = cb->zmqsock,
77 .events = ZMQ_POLLIN};
78 ret = zmq_poll(&polli, 1, 0);
79
80 if (ret < 0)
81 goto out_err;
82
83 if (!(polli.revents & ZMQ_POLLIN))
84 break;
85
86 if (cb->read.cb_msg) {
87 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
88 read = 1;
89
90 if (cb->read.cancelled) {
91 frrzmq_check_events(cbp, &cb->write,
92 ZMQ_POLLOUT);
93 cb->read.thread = NULL;
94 if (cb->write.cancelled && !cb->write.thread)
95 XFREE(MTYPE_ZEROMQ_CB, cb);
96 return 0;
97 }
98 continue;
99 }
100
101 partno = 0;
102 if (zmq_msg_init(&msg))
103 goto out_err;
104 do {
105 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
106 if (ret < 0) {
107 if (errno == EAGAIN)
108 break;
109
110 zmq_msg_close(&msg);
111 goto out_err;
112 }
113 read = 1;
114
115 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
116 partno);
117 if (cb->read.cancelled) {
118 zmq_msg_close(&msg);
119 frrzmq_check_events(cbp, &cb->write,
120 ZMQ_POLLOUT);
121 cb->read.thread = NULL;
122 if (cb->write.cancelled && !cb->write.thread)
123 XFREE(MTYPE_ZEROMQ_CB, cb);
124 return 0;
125 }
126
127 /* cb_part may have read additional parts of the
128 * message; don't use zmq_msg_more here */
129 moresz = sizeof(more);
130 more = 0;
131 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
132 &moresz);
133 if (ret < 0) {
134 zmq_msg_close(&msg);
135 goto out_err;
136 }
137
138 partno++;
139 } while (more);
140 zmq_msg_close(&msg);
141 }
142
143 if (read)
144 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
145
146 thread_add_read(t->master, frrzmq_read_msg, cbp,
147 cb->fd, &cb->read.thread);
148 return 0;
149
150 out_err:
151 flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
152 errno);
153 if (cb->read.cb_error)
154 cb->read.cb_error(cb->read.arg, cb->zmqsock);
155 return 1;
156 }
157
158 int _frrzmq_thread_add_read(const struct xref_threadsched *xref,
159 struct thread_master *master,
160 void (*msgfunc)(void *arg, void *zmqsock),
161 void (*partfunc)(void *arg, void *zmqsock,
162 zmq_msg_t *msg, unsigned partnum),
163 void (*errfunc)(void *arg, void *zmqsock),
164 void *arg, void *zmqsock,
165 struct frrzmq_cb **cbp)
166 {
167 int fd, events;
168 size_t len;
169 struct frrzmq_cb *cb;
170
171 if (!cbp)
172 return -1;
173 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
174 return -1;
175 len = sizeof(fd);
176 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
177 return -1;
178 len = sizeof(events);
179 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
180 return -1;
181
182 if (*cbp)
183 cb = *cbp;
184 else {
185 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
186
187 cb->write.cancelled = true;
188 *cbp = cb;
189 }
190
191 cb->zmqsock = zmqsock;
192 cb->fd = fd;
193 cb->read.arg = arg;
194 cb->read.cb_msg = msgfunc;
195 cb->read.cb_part = partfunc;
196 cb->read.cb_error = errfunc;
197 cb->read.cancelled = false;
198
199 if (events & ZMQ_POLLIN) {
200 thread_cancel(&cb->read.thread);
201
202 thread_add_event(master, frrzmq_read_msg, cbp, fd,
203 &cb->read.thread);
204 } else
205 thread_add_read(master, frrzmq_read_msg, cbp, fd,
206 &cb->read.thread);
207 return 0;
208 }
209
210 static int frrzmq_write_msg(struct thread *t)
211 {
212 struct frrzmq_cb **cbp = THREAD_ARG(t);
213 struct frrzmq_cb *cb;
214 unsigned char written = 0;
215 int ret;
216
217 if (!cbp)
218 return 1;
219 cb = (*cbp);
220 if (!cb || !cb->zmqsock)
221 return 1;
222
223 while (1) {
224 zmq_pollitem_t polli = {.socket = cb->zmqsock,
225 .events = ZMQ_POLLOUT};
226 ret = zmq_poll(&polli, 1, 0);
227
228 if (ret < 0)
229 goto out_err;
230
231 if (!(polli.revents & ZMQ_POLLOUT))
232 break;
233
234 if (cb->write.cb_msg) {
235 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
236 written = 1;
237
238 if (cb->write.cancelled) {
239 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
240 cb->write.thread = NULL;
241 if (cb->read.cancelled && !cb->read.thread)
242 XFREE(MTYPE_ZEROMQ_CB, cb);
243 return 0;
244 }
245 continue;
246 }
247 }
248
249 if (written)
250 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
251
252 thread_add_write(t->master, frrzmq_write_msg, cbp,
253 cb->fd, &cb->write.thread);
254 return 0;
255
256 out_err:
257 flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
258 errno);
259 if (cb->write.cb_error)
260 cb->write.cb_error(cb->write.arg, cb->zmqsock);
261 return 1;
262 }
263
264 int _frrzmq_thread_add_write(const struct xref_threadsched *xref,
265 struct thread_master *master,
266 void (*msgfunc)(void *arg, void *zmqsock),
267 void (*errfunc)(void *arg, void *zmqsock),
268 void *arg, void *zmqsock, struct frrzmq_cb **cbp)
269 {
270 int fd, events;
271 size_t len;
272 struct frrzmq_cb *cb;
273
274 if (!cbp)
275 return -1;
276 if (!msgfunc)
277 return -1;
278 len = sizeof(fd);
279 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
280 return -1;
281 len = sizeof(events);
282 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
283 return -1;
284
285 if (*cbp)
286 cb = *cbp;
287 else {
288 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
289
290 cb->read.cancelled = true;
291 *cbp = cb;
292 }
293
294 cb->zmqsock = zmqsock;
295 cb->fd = fd;
296 cb->write.arg = arg;
297 cb->write.cb_msg = msgfunc;
298 cb->write.cb_part = NULL;
299 cb->write.cb_error = errfunc;
300 cb->write.cancelled = false;
301
302 if (events & ZMQ_POLLOUT) {
303 thread_cancel(&cb->write.thread);
304
305 _thread_add_event(xref, master, frrzmq_write_msg, cbp, fd,
306 &cb->write.thread);
307 } else
308 thread_add_write(master, frrzmq_write_msg, cbp, fd,
309 &cb->write.thread);
310 return 0;
311 }
312
313 void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
314 {
315 if (!cb || !*cb)
316 return;
317 core->cancelled = true;
318 thread_cancel(&core->thread);
319
320 /*
321 * Looking at this code one would assume that FRR
322 * would want a `!(*cb)->write.thread. This was
323 * attempted in e08165def1c62beee0e87385 but this
324 * change caused `make check` to stop working
325 * which was not noticed because our CI system
326 * does not build with zeromq. Put this back
327 * to the code as written in 2017. e08165de..
328 * was introduced in 2021. So someone was ok
329 * with frrzmq_thread_cancel for 4 years. This will
330 * allow those people doing `make check` to continue
331 * working. In the meantime if the people using
332 * this code see an issue they can fix it
333 */
334 if ((*cb)->read.cancelled && !(*cb)->read.thread
335 && (*cb)->write.cancelled && (*cb)->write.thread)
336 XFREE(MTYPE_ZEROMQ_CB, *cb);
337 }
338
339 void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
340 int event)
341 {
342 struct frrzmq_cb *cb;
343 int events;
344 size_t len;
345
346 if (!cbp)
347 return;
348 cb = (*cbp);
349 if (!cb || !cb->zmqsock)
350 return;
351
352 len = sizeof(events);
353 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
354 return;
355 if ((events & event) && core->thread && !core->cancelled) {
356 struct thread_master *tm = core->thread->master;
357
358 thread_cancel(&core->thread);
359
360 if (event == ZMQ_POLLIN)
361 thread_add_event(tm, frrzmq_read_msg,
362 cbp, cb->fd, &core->thread);
363 else
364 thread_add_event(tm, frrzmq_write_msg,
365 cbp, cb->fd, &core->thread);
366 }
367 }