]>
git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that
22 * the test_zmq.c unit test is still working. There are dependancies
23 * between the two that are extremely fragile. My understanding
24 * is that there is specialized ownership of the cb pointer based
25 * upon what is happening. Those assumptions are supposed to be
26 * tested in the test_zmq.c
35 #include "lib_errors.h"
37 DEFINE_MTYPE_STATIC(LIB
, ZEROMQ_CB
, "ZeroMQ callback");
39 /* libzmq's context */
40 void *frrzmq_context
= NULL
;
41 static unsigned frrzmq_initcount
= 0;
43 void frrzmq_init(void)
45 if (frrzmq_initcount
++ == 0) {
46 frrzmq_context
= zmq_ctx_new();
47 zmq_ctx_set(frrzmq_context
, ZMQ_IPV6
, 1);
51 void frrzmq_finish(void)
53 if (--frrzmq_initcount
== 0) {
54 zmq_ctx_term(frrzmq_context
);
55 frrzmq_context
= NULL
;
59 static int frrzmq_read_msg(struct thread
*t
)
61 struct frrzmq_cb
**cbp
= THREAD_ARG(t
);
65 unsigned char read
= 0;
72 if (!cb
|| !cb
->zmqsock
)
76 zmq_pollitem_t polli
= {.socket
= cb
->zmqsock
,
77 .events
= ZMQ_POLLIN
};
78 ret
= zmq_poll(&polli
, 1, 0);
83 if (!(polli
.revents
& ZMQ_POLLIN
))
86 if (cb
->read
.cb_msg
) {
87 cb
->read
.cb_msg(cb
->read
.arg
, cb
->zmqsock
);
90 if (cb
->read
.cancelled
) {
91 frrzmq_check_events(cbp
, &cb
->write
,
93 cb
->read
.thread
= NULL
;
94 if (cb
->write
.cancelled
&& !cb
->write
.thread
)
95 XFREE(MTYPE_ZEROMQ_CB
, *cbp
);
103 if (zmq_msg_init(&msg
))
106 ret
= zmq_msg_recv(&msg
, cb
->zmqsock
, ZMQ_NOBLOCK
);
116 cb
->read
.cb_part(cb
->read
.arg
, cb
->zmqsock
, &msg
,
118 if (cb
->read
.cancelled
) {
120 frrzmq_check_events(cbp
, &cb
->write
,
122 cb
->read
.thread
= NULL
;
123 if (cb
->write
.cancelled
&& !cb
->write
.thread
)
124 XFREE(MTYPE_ZEROMQ_CB
, *cbp
);
129 /* cb_part may have read additional parts of the
130 * message; don't use zmq_msg_more here */
131 moresz
= sizeof(more
);
133 ret
= zmq_getsockopt(cb
->zmqsock
, ZMQ_RCVMORE
, &more
,
146 frrzmq_check_events(cbp
, &cb
->write
, ZMQ_POLLOUT
);
148 thread_add_read(t
->master
, frrzmq_read_msg
, cbp
,
149 cb
->fd
, &cb
->read
.thread
);
153 flog_err(EC_LIB_ZMQ
, "ZeroMQ read error: %s(%d)", strerror(errno
),
155 if (cb
->read
.cb_error
)
156 cb
->read
.cb_error(cb
->read
.arg
, cb
->zmqsock
);
160 int _frrzmq_thread_add_read(const struct xref_threadsched
*xref
,
161 struct thread_master
*master
,
162 void (*msgfunc
)(void *arg
, void *zmqsock
),
163 void (*partfunc
)(void *arg
, void *zmqsock
,
164 zmq_msg_t
*msg
, unsigned partnum
),
165 void (*errfunc
)(void *arg
, void *zmqsock
),
166 void *arg
, void *zmqsock
,
167 struct frrzmq_cb
**cbp
)
171 struct frrzmq_cb
*cb
;
175 if (!(msgfunc
|| partfunc
) || (msgfunc
&& partfunc
))
178 if (zmq_getsockopt(zmqsock
, ZMQ_FD
, &fd
, &len
))
180 len
= sizeof(events
);
181 if (zmq_getsockopt(zmqsock
, ZMQ_EVENTS
, &events
, &len
))
187 cb
= XCALLOC(MTYPE_ZEROMQ_CB
, sizeof(struct frrzmq_cb
));
189 cb
->write
.cancelled
= true;
193 cb
->zmqsock
= zmqsock
;
196 cb
->read
.cb_msg
= msgfunc
;
197 cb
->read
.cb_part
= partfunc
;
198 cb
->read
.cb_error
= errfunc
;
199 cb
->read
.cancelled
= false;
201 if (events
& ZMQ_POLLIN
) {
202 thread_cancel(&cb
->read
.thread
);
204 thread_add_event(master
, frrzmq_read_msg
, cbp
, fd
,
207 thread_add_read(master
, frrzmq_read_msg
, cbp
, fd
,
212 static int frrzmq_write_msg(struct thread
*t
)
214 struct frrzmq_cb
**cbp
= THREAD_ARG(t
);
215 struct frrzmq_cb
*cb
;
216 unsigned char written
= 0;
222 if (!cb
|| !cb
->zmqsock
)
226 zmq_pollitem_t polli
= {.socket
= cb
->zmqsock
,
227 .events
= ZMQ_POLLOUT
};
228 ret
= zmq_poll(&polli
, 1, 0);
233 if (!(polli
.revents
& ZMQ_POLLOUT
))
236 if (cb
->write
.cb_msg
) {
237 cb
->write
.cb_msg(cb
->write
.arg
, cb
->zmqsock
);
240 if (cb
->write
.cancelled
) {
241 frrzmq_check_events(cbp
, &cb
->read
, ZMQ_POLLIN
);
242 cb
->write
.thread
= NULL
;
243 if (cb
->read
.cancelled
&& !cb
->read
.thread
)
244 XFREE(MTYPE_ZEROMQ_CB
, *cbp
);
253 frrzmq_check_events(cbp
, &cb
->read
, ZMQ_POLLIN
);
255 thread_add_write(t
->master
, frrzmq_write_msg
, cbp
,
256 cb
->fd
, &cb
->write
.thread
);
260 flog_err(EC_LIB_ZMQ
, "ZeroMQ write error: %s(%d)", strerror(errno
),
262 if (cb
->write
.cb_error
)
263 cb
->write
.cb_error(cb
->write
.arg
, cb
->zmqsock
);
267 int _frrzmq_thread_add_write(const struct xref_threadsched
*xref
,
268 struct thread_master
*master
,
269 void (*msgfunc
)(void *arg
, void *zmqsock
),
270 void (*errfunc
)(void *arg
, void *zmqsock
),
271 void *arg
, void *zmqsock
, struct frrzmq_cb
**cbp
)
275 struct frrzmq_cb
*cb
;
282 if (zmq_getsockopt(zmqsock
, ZMQ_FD
, &fd
, &len
))
284 len
= sizeof(events
);
285 if (zmq_getsockopt(zmqsock
, ZMQ_EVENTS
, &events
, &len
))
291 cb
= XCALLOC(MTYPE_ZEROMQ_CB
, sizeof(struct frrzmq_cb
));
293 cb
->read
.cancelled
= true;
297 cb
->zmqsock
= zmqsock
;
300 cb
->write
.cb_msg
= msgfunc
;
301 cb
->write
.cb_part
= NULL
;
302 cb
->write
.cb_error
= errfunc
;
303 cb
->write
.cancelled
= false;
305 if (events
& ZMQ_POLLOUT
) {
306 thread_cancel(&cb
->write
.thread
);
308 _thread_add_event(xref
, master
, frrzmq_write_msg
, cbp
, fd
,
311 thread_add_write(master
, frrzmq_write_msg
, cbp
, fd
,
316 void frrzmq_thread_cancel(struct frrzmq_cb
**cb
, struct cb_core
*core
)
320 core
->cancelled
= true;
321 thread_cancel(&core
->thread
);
324 * Looking at this code one would assume that FRR
325 * would want a `!(*cb)->write.thread. This was
326 * attempted in e08165def1c62beee0e87385 but this
327 * change caused `make check` to stop working
328 * which was not noticed because our CI system
329 * does not build with zeromq. Put this back
330 * to the code as written in 2017. e08165de..
331 * was introduced in 2021. So someone was ok
332 * with frrzmq_thread_cancel for 4 years. This will
333 * allow those people doing `make check` to continue
334 * working. In the meantime if the people using
335 * this code see an issue they can fix it
337 if ((*cb
)->read
.cancelled
&& !(*cb
)->read
.thread
338 && (*cb
)->write
.cancelled
&& (*cb
)->write
.thread
)
339 XFREE(MTYPE_ZEROMQ_CB
, *cb
);
342 void frrzmq_check_events(struct frrzmq_cb
**cbp
, struct cb_core
*core
,
345 struct frrzmq_cb
*cb
;
352 if (!cb
|| !cb
->zmqsock
)
355 len
= sizeof(events
);
356 if (zmq_getsockopt(cb
->zmqsock
, ZMQ_EVENTS
, &events
, &len
))
358 if ((events
& event
) && core
->thread
&& !core
->cancelled
) {
359 struct thread_master
*tm
= core
->thread
->master
;
361 thread_cancel(&core
->thread
);
363 if (event
== ZMQ_POLLIN
)
364 thread_add_event(tm
, frrzmq_read_msg
,
365 cbp
, cb
->fd
, &core
->thread
);
367 thread_add_event(tm
, frrzmq_write_msg
,
368 cbp
, cb
->fd
, &core
->thread
);