]>
git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
27 #include "lib_errors.h"
29 DEFINE_MTYPE_STATIC(LIB
, ZEROMQ_CB
, "ZeroMQ callback")
31 /* libzmq's context */
32 void *frrzmq_context
= NULL
;
33 static unsigned frrzmq_initcount
= 0;
35 void frrzmq_init(void)
37 if (frrzmq_initcount
++ == 0) {
38 frrzmq_context
= zmq_ctx_new();
39 zmq_ctx_set(frrzmq_context
, ZMQ_IPV6
, 1);
43 void frrzmq_finish(void)
45 if (--frrzmq_initcount
== 0) {
46 zmq_ctx_term(frrzmq_context
);
47 frrzmq_context
= NULL
;
51 static int frrzmq_read_msg(struct thread
*t
)
53 struct frrzmq_cb
**cbp
= THREAD_ARG(t
);
57 unsigned char read
= 0;
64 if (!cb
|| !cb
->zmqsock
)
68 zmq_pollitem_t polli
= {.socket
= cb
->zmqsock
,
69 .events
= ZMQ_POLLIN
};
70 ret
= zmq_poll(&polli
, 1, 0);
75 if (!(polli
.revents
& ZMQ_POLLIN
))
78 if (cb
->read
.cb_msg
) {
79 cb
->read
.cb_msg(cb
->read
.arg
, cb
->zmqsock
);
82 if (cb
->read
.cancelled
) {
83 frrzmq_check_events(cbp
, &cb
->write
,
85 cb
->read
.thread
= NULL
;
86 if (cb
->write
.cancelled
&& !cb
->write
.thread
)
87 XFREE(MTYPE_ZEROMQ_CB
, cb
);
94 if (zmq_msg_init(&msg
))
97 ret
= zmq_msg_recv(&msg
, cb
->zmqsock
, ZMQ_NOBLOCK
);
107 cb
->read
.cb_part(cb
->read
.arg
, cb
->zmqsock
, &msg
,
109 if (cb
->read
.cancelled
) {
111 frrzmq_check_events(cbp
, &cb
->write
,
113 cb
->read
.thread
= NULL
;
114 if (cb
->write
.cancelled
&& !cb
->write
.thread
)
115 XFREE(MTYPE_ZEROMQ_CB
, cb
);
119 /* cb_part may have read additional parts of the
120 * message; don't use zmq_msg_more here */
121 moresz
= sizeof(more
);
123 ret
= zmq_getsockopt(cb
->zmqsock
, ZMQ_RCVMORE
, &more
,
136 frrzmq_check_events(cbp
, &cb
->write
, ZMQ_POLLOUT
);
138 funcname_thread_add_read_write(
139 THREAD_READ
, t
->master
, frrzmq_read_msg
, cbp
, cb
->fd
,
140 &cb
->read
.thread
, t
->funcname
, t
->schedfrom
, t
->schedfrom_line
);
144 flog_err(EC_LIB_ZMQ
, "ZeroMQ read error: %s(%d)", strerror(errno
),
146 if (cb
->read
.cb_error
)
147 cb
->read
.cb_error(cb
->read
.arg
, cb
->zmqsock
);
151 int funcname_frrzmq_thread_add_read(struct thread_master
*master
,
152 void (*msgfunc
)(void *arg
, void *zmqsock
),
153 void (*partfunc
)(void *arg
, void *zmqsock
,
156 void (*errfunc
)(void *arg
, void *zmqsock
),
157 void *arg
, void *zmqsock
,
158 struct frrzmq_cb
**cbp
, debugargdef
)
162 struct frrzmq_cb
*cb
;
166 if (!(msgfunc
|| partfunc
) || (msgfunc
&& partfunc
))
169 if (zmq_getsockopt(zmqsock
, ZMQ_FD
, &fd
, &len
))
171 len
= sizeof(events
);
172 if (zmq_getsockopt(zmqsock
, ZMQ_EVENTS
, &events
, &len
))
178 cb
= XCALLOC(MTYPE_ZEROMQ_CB
, sizeof(struct frrzmq_cb
));
180 cb
->write
.cancelled
= 1;
184 cb
->zmqsock
= zmqsock
;
187 cb
->read
.cb_msg
= msgfunc
;
188 cb
->read
.cb_part
= partfunc
;
189 cb
->read
.cb_error
= errfunc
;
190 cb
->read
.cancelled
= 0;
192 if (events
& ZMQ_POLLIN
) {
193 if (cb
->read
.thread
) {
194 thread_cancel(cb
->read
.thread
);
195 cb
->read
.thread
= NULL
;
197 funcname_thread_add_event(master
, frrzmq_read_msg
, cbp
, fd
,
198 &cb
->read
.thread
, funcname
, schedfrom
,
201 funcname_thread_add_read_write(
202 THREAD_READ
, master
, frrzmq_read_msg
, cbp
, fd
,
203 &cb
->read
.thread
, funcname
, schedfrom
, fromln
);
207 static int frrzmq_write_msg(struct thread
*t
)
209 struct frrzmq_cb
**cbp
= THREAD_ARG(t
);
210 struct frrzmq_cb
*cb
;
211 unsigned char written
= 0;
217 if (!cb
|| !cb
->zmqsock
)
221 zmq_pollitem_t polli
= {.socket
= cb
->zmqsock
,
222 .events
= ZMQ_POLLOUT
};
223 ret
= zmq_poll(&polli
, 1, 0);
228 if (!(polli
.revents
& ZMQ_POLLOUT
))
231 if (cb
->write
.cb_msg
) {
232 cb
->write
.cb_msg(cb
->write
.arg
, cb
->zmqsock
);
235 if (cb
->write
.cancelled
) {
236 frrzmq_check_events(cbp
, &cb
->read
, ZMQ_POLLIN
);
237 cb
->write
.thread
= NULL
;
238 if (cb
->read
.cancelled
&& !cb
->read
.thread
)
239 XFREE(MTYPE_ZEROMQ_CB
, cb
);
247 frrzmq_check_events(cbp
, &cb
->read
, ZMQ_POLLIN
);
249 funcname_thread_add_read_write(THREAD_WRITE
, t
->master
,
250 frrzmq_write_msg
, cbp
, cb
->fd
,
251 &cb
->write
.thread
, t
->funcname
,
252 t
->schedfrom
, t
->schedfrom_line
);
256 flog_err(EC_LIB_ZMQ
, "ZeroMQ write error: %s(%d)", strerror(errno
),
258 if (cb
->write
.cb_error
)
259 cb
->write
.cb_error(cb
->write
.arg
, cb
->zmqsock
);
262 int funcname_frrzmq_thread_add_write(struct thread_master
*master
,
263 void (*msgfunc
)(void *arg
, void *zmqsock
),
264 void (*errfunc
)(void *arg
, void *zmqsock
),
265 void *arg
, void *zmqsock
,
266 struct frrzmq_cb
**cbp
, debugargdef
)
270 struct frrzmq_cb
*cb
;
277 if (zmq_getsockopt(zmqsock
, ZMQ_FD
, &fd
, &len
))
279 len
= sizeof(events
);
280 if (zmq_getsockopt(zmqsock
, ZMQ_EVENTS
, &events
, &len
))
286 cb
= XCALLOC(MTYPE_ZEROMQ_CB
, sizeof(struct frrzmq_cb
));
288 cb
->read
.cancelled
= 1;
292 cb
->zmqsock
= zmqsock
;
295 cb
->write
.cb_msg
= msgfunc
;
296 cb
->write
.cb_part
= NULL
;
297 cb
->write
.cb_error
= errfunc
;
298 cb
->write
.cancelled
= 0;
300 if (events
& ZMQ_POLLOUT
) {
301 if (cb
->write
.thread
) {
302 thread_cancel(cb
->write
.thread
);
303 cb
->write
.thread
= NULL
;
305 funcname_thread_add_event(master
, frrzmq_write_msg
, cbp
, fd
,
306 &cb
->write
.thread
, funcname
,
309 funcname_thread_add_read_write(
310 THREAD_WRITE
, master
, frrzmq_write_msg
, cbp
, fd
,
311 &cb
->write
.thread
, funcname
, schedfrom
, fromln
);
315 void frrzmq_thread_cancel(struct frrzmq_cb
**cb
, struct cb_core
*core
)
321 thread_cancel(core
->thread
);
324 if ((*cb
)->read
.cancelled
&& !(*cb
)->read
.thread
325 && (*cb
)->write
.cancelled
&& (*cb
)->write
.thread
)
326 XFREE(MTYPE_ZEROMQ_CB
, *cb
);
329 void frrzmq_check_events(struct frrzmq_cb
**cbp
, struct cb_core
*core
,
332 struct frrzmq_cb
*cb
;
339 if (!cb
|| !cb
->zmqsock
)
342 len
= sizeof(events
);
343 if (zmq_getsockopt(cb
->zmqsock
, ZMQ_EVENTS
, &events
, &len
))
345 if (events
& event
&& core
->thread
&& !core
->cancelled
) {
346 struct thread_master
*tm
= core
->thread
->master
;
347 thread_cancel(core
->thread
);
349 thread_add_event(tm
, (event
== ZMQ_POLLIN
? frrzmq_read_msg
351 cbp
, cb
->fd
, &core
->thread
);