]>
git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
28 DEFINE_MTYPE_STATIC(LIB
, ZEROMQ_CB
, "ZeroMQ callback")
30 /* libzmq's context */
31 void *frrzmq_context
= NULL
;
32 static unsigned frrzmq_initcount
= 0;
34 void frrzmq_init(void)
36 if (frrzmq_initcount
++ == 0) {
37 frrzmq_context
= zmq_ctx_new();
38 zmq_ctx_set(frrzmq_context
, ZMQ_IPV6
, 1);
42 void frrzmq_finish(void)
44 if (--frrzmq_initcount
== 0) {
45 zmq_ctx_term(frrzmq_context
);
46 frrzmq_context
= NULL
;
50 /* read callback integration */
52 struct thread
*thread
;
59 void (*cb_msg
)(void *arg
, void *zmqsock
);
60 void (*cb_part
)(void *arg
, void *zmqsock
,
61 zmq_msg_t
*msg
, unsigned partnum
);
65 static int frrzmq_read_msg(struct thread
*t
)
67 struct frrzmq_cb
*cb
= THREAD_ARG(t
);
74 zmq_pollitem_t polli
= {
75 .socket
= cb
->zmqsock
,
78 ret
= zmq_poll(&polli
, 1, 0);
82 if (!(polli
.revents
& ZMQ_POLLIN
))
86 cb
->cb_msg(cb
->arg
, cb
->zmqsock
);
89 XFREE(MTYPE_ZEROMQ_CB
, cb
);
96 if (zmq_msg_init(&msg
))
99 ret
= zmq_msg_recv(&msg
, cb
->zmqsock
, ZMQ_NOBLOCK
);
108 cb
->cb_part(cb
->arg
, cb
->zmqsock
, &msg
, partno
);
111 XFREE(MTYPE_ZEROMQ_CB
, cb
);
115 /* cb_part may have read additional parts of the
116 * message; don't use zmq_msg_more here */
117 moresz
= sizeof(more
);
119 ret
= zmq_getsockopt(cb
->zmqsock
, ZMQ_RCVMORE
,
131 funcname_thread_add_read_write(THREAD_READ
, t
->master
, frrzmq_read_msg
,
132 cb
, cb
->fd
, &cb
->thread
, t
->funcname
, t
->schedfrom
,
137 zlog_err("ZeroMQ error: %s(%d)", strerror (errno
), errno
);
141 struct frrzmq_cb
*funcname_frrzmq_thread_add_read(
142 struct thread_master
*master
,
143 void (*msgfunc
)(void *arg
, void *zmqsock
),
144 void (*partfunc
)(void *arg
, void *zmqsock
,
145 zmq_msg_t
*msg
, unsigned partnum
),
146 void *arg
, void *zmqsock
, debugargdef
)
150 struct frrzmq_cb
*cb
;
152 if (!(msgfunc
|| partfunc
) || (msgfunc
&& partfunc
))
155 if (zmq_getsockopt(zmqsock
, ZMQ_FD
, &fd
, &len
))
157 len
= sizeof(events
);
158 if (zmq_getsockopt(zmqsock
, ZMQ_EVENTS
, &events
, &len
))
161 cb
= XCALLOC(MTYPE_ZEROMQ_CB
, sizeof(struct frrzmq_cb
));
166 cb
->zmqsock
= zmqsock
;
167 cb
->cb_msg
= msgfunc
;
168 cb
->cb_part
= partfunc
;
171 if (events
& ZMQ_POLLIN
)
172 funcname_thread_add_event(master
,
173 frrzmq_read_msg
, cb
, fd
, &cb
->thread
,
174 funcname
, schedfrom
, fromln
);
176 funcname_thread_add_read_write(THREAD_READ
, master
,
177 frrzmq_read_msg
, cb
, fd
, &cb
->thread
,
178 funcname
, schedfrom
, fromln
);
182 void frrzmq_thread_cancel(struct frrzmq_cb
*cb
)
185 /* canceling from within callback */
189 thread_cancel(cb
->thread
);
190 XFREE(MTYPE_ZEROMQ_CB
, cb
);