]>
git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
3 * libzebra ZeroMQ bindings
4 * Copyright (C) 2015 David Lamparter
8 * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that
9 * the test_zmq.c unit test is still working. There are dependencies
10 * between the two that are extremely fragile. My understanding
11 * is that there is specialized ownership of the cb pointer based
12 * upon what is happening. Those assumptions are supposed to be
13 * tested in the test_zmq.c
22 #include "lib_errors.h"
24 DEFINE_MTYPE_STATIC(LIB
, ZEROMQ_CB
, "ZeroMQ callback");
26 /* libzmq's context */
27 void *frrzmq_context
= NULL
;
28 static unsigned frrzmq_initcount
= 0;
30 void frrzmq_init(void)
32 if (frrzmq_initcount
++ == 0) {
33 frrzmq_context
= zmq_ctx_new();
34 zmq_ctx_set(frrzmq_context
, ZMQ_IPV6
, 1);
38 void frrzmq_finish(void)
40 if (--frrzmq_initcount
== 0) {
41 zmq_ctx_term(frrzmq_context
);
42 frrzmq_context
= NULL
;
46 static void frrzmq_read_msg(struct thread
*t
)
48 struct frrzmq_cb
**cbp
= THREAD_ARG(t
);
52 unsigned char read
= 0;
59 if (!cb
|| !cb
->zmqsock
)
63 zmq_pollitem_t polli
= {.socket
= cb
->zmqsock
,
64 .events
= ZMQ_POLLIN
};
65 ret
= zmq_poll(&polli
, 1, 0);
70 if (!(polli
.revents
& ZMQ_POLLIN
))
73 if (cb
->read
.cb_msg
) {
75 cb
->read
.cb_msg(cb
->read
.arg
, cb
->zmqsock
);
80 if (cb
->read
.cancelled
) {
81 frrzmq_check_events(cbp
, &cb
->write
,
83 cb
->read
.thread
= NULL
;
84 if (cb
->write
.cancelled
&& !cb
->write
.thread
)
85 XFREE(MTYPE_ZEROMQ_CB
, *cbp
);
93 if (zmq_msg_init(&msg
))
96 ret
= zmq_msg_recv(&msg
, cb
->zmqsock
, ZMQ_NOBLOCK
);
107 cb
->read
.cb_part(cb
->read
.arg
, cb
->zmqsock
, &msg
,
111 if (cb
->read
.cancelled
) {
113 frrzmq_check_events(cbp
, &cb
->write
,
115 cb
->read
.thread
= NULL
;
116 if (cb
->write
.cancelled
&& !cb
->write
.thread
)
117 XFREE(MTYPE_ZEROMQ_CB
, *cbp
);
122 /* cb_part may have read additional parts of the
123 * message; don't use zmq_msg_more here */
124 moresz
= sizeof(more
);
126 ret
= zmq_getsockopt(cb
->zmqsock
, ZMQ_RCVMORE
, &more
,
139 frrzmq_check_events(cbp
, &cb
->write
, ZMQ_POLLOUT
);
141 thread_add_read(t
->master
, frrzmq_read_msg
, cbp
,
142 cb
->fd
, &cb
->read
.thread
);
146 flog_err(EC_LIB_ZMQ
, "ZeroMQ read error: %s(%d)", strerror(errno
),
148 if (cb
->read
.cb_error
)
149 cb
->read
.cb_error(cb
->read
.arg
, cb
->zmqsock
);
152 int _frrzmq_thread_add_read(const struct xref_threadsched
*xref
,
153 struct thread_master
*master
,
154 void (*msgfunc
)(void *arg
, void *zmqsock
),
155 void (*partfunc
)(void *arg
, void *zmqsock
,
156 zmq_msg_t
*msg
, unsigned partnum
),
157 void (*errfunc
)(void *arg
, void *zmqsock
),
158 void *arg
, void *zmqsock
,
159 struct frrzmq_cb
**cbp
)
163 struct frrzmq_cb
*cb
;
167 if (!(msgfunc
|| partfunc
) || (msgfunc
&& partfunc
))
170 if (zmq_getsockopt(zmqsock
, ZMQ_FD
, &fd
, &len
))
172 len
= sizeof(events
);
173 if (zmq_getsockopt(zmqsock
, ZMQ_EVENTS
, &events
, &len
))
179 cb
= XCALLOC(MTYPE_ZEROMQ_CB
, sizeof(struct frrzmq_cb
));
180 cb
->write
.cancelled
= true;
184 cb
->zmqsock
= zmqsock
;
187 cb
->read
.cb_msg
= msgfunc
;
188 cb
->read
.cb_part
= partfunc
;
189 cb
->read
.cb_error
= errfunc
;
190 cb
->read
.cancelled
= false;
193 if (events
& ZMQ_POLLIN
) {
194 thread_cancel(&cb
->read
.thread
);
196 thread_add_event(master
, frrzmq_read_msg
, cbp
, fd
,
199 thread_add_read(master
, frrzmq_read_msg
, cbp
, fd
,
204 static void frrzmq_write_msg(struct thread
*t
)
206 struct frrzmq_cb
**cbp
= THREAD_ARG(t
);
207 struct frrzmq_cb
*cb
;
208 unsigned char written
= 0;
214 if (!cb
|| !cb
->zmqsock
)
218 zmq_pollitem_t polli
= {.socket
= cb
->zmqsock
,
219 .events
= ZMQ_POLLOUT
};
220 ret
= zmq_poll(&polli
, 1, 0);
225 if (!(polli
.revents
& ZMQ_POLLOUT
))
228 if (cb
->write
.cb_msg
) {
230 cb
->write
.cb_msg(cb
->write
.arg
, cb
->zmqsock
);
235 if (cb
->write
.cancelled
) {
236 frrzmq_check_events(cbp
, &cb
->read
, ZMQ_POLLIN
);
237 cb
->write
.thread
= NULL
;
238 if (cb
->read
.cancelled
&& !cb
->read
.thread
)
239 XFREE(MTYPE_ZEROMQ_CB
, *cbp
);
248 frrzmq_check_events(cbp
, &cb
->read
, ZMQ_POLLIN
);
250 thread_add_write(t
->master
, frrzmq_write_msg
, cbp
,
251 cb
->fd
, &cb
->write
.thread
);
255 flog_err(EC_LIB_ZMQ
, "ZeroMQ write error: %s(%d)", strerror(errno
),
257 if (cb
->write
.cb_error
)
258 cb
->write
.cb_error(cb
->write
.arg
, cb
->zmqsock
);
261 int _frrzmq_thread_add_write(const struct xref_threadsched
*xref
,
262 struct thread_master
*master
,
263 void (*msgfunc
)(void *arg
, void *zmqsock
),
264 void (*errfunc
)(void *arg
, void *zmqsock
),
265 void *arg
, void *zmqsock
, struct frrzmq_cb
**cbp
)
269 struct frrzmq_cb
*cb
;
276 if (zmq_getsockopt(zmqsock
, ZMQ_FD
, &fd
, &len
))
278 len
= sizeof(events
);
279 if (zmq_getsockopt(zmqsock
, ZMQ_EVENTS
, &events
, &len
))
285 cb
= XCALLOC(MTYPE_ZEROMQ_CB
, sizeof(struct frrzmq_cb
));
286 cb
->read
.cancelled
= true;
290 cb
->zmqsock
= zmqsock
;
293 cb
->write
.cb_msg
= msgfunc
;
294 cb
->write
.cb_part
= NULL
;
295 cb
->write
.cb_error
= errfunc
;
296 cb
->write
.cancelled
= false;
299 if (events
& ZMQ_POLLOUT
) {
300 thread_cancel(&cb
->write
.thread
);
302 _thread_add_event(xref
, master
, frrzmq_write_msg
, cbp
, fd
,
305 thread_add_write(master
, frrzmq_write_msg
, cbp
, fd
,
310 void frrzmq_thread_cancel(struct frrzmq_cb
**cb
, struct cb_core
*core
)
314 core
->cancelled
= true;
315 thread_cancel(&core
->thread
);
317 /* If cancelled from within a callback, don't try to free memory
323 /* Ok to free the callback context if no more ... context. */
324 if ((*cb
)->read
.cancelled
&& !(*cb
)->read
.thread
325 && (*cb
)->write
.cancelled
&& ((*cb
)->write
.thread
== NULL
))
326 XFREE(MTYPE_ZEROMQ_CB
, *cb
);
329 void frrzmq_check_events(struct frrzmq_cb
**cbp
, struct cb_core
*core
,
332 struct frrzmq_cb
*cb
;
339 if (!cb
|| !cb
->zmqsock
)
342 len
= sizeof(events
);
343 if (zmq_getsockopt(cb
->zmqsock
, ZMQ_EVENTS
, &events
, &len
))
345 if ((events
& event
) && core
->thread
&& !core
->cancelled
) {
346 struct thread_master
*tm
= core
->thread
->master
;
348 thread_cancel(&core
->thread
);
350 if (event
== ZMQ_POLLIN
)
351 thread_add_event(tm
, frrzmq_read_msg
,
352 cbp
, cb
->fd
, &core
->thread
);
354 thread_add_event(tm
, frrzmq_write_msg
,
355 cbp
, cb
->fd
, &core
->thread
);