]> git.proxmox.com Git - mirror_frr.git/blame - lib/frr_zmq.c
Merge pull request #5473 from yasuhiro-ohara-ntt/ospf6d-self-orig-maxage-fix
[mirror_frr.git] / lib / frr_zmq.c
CommitLineData
b6116506
DL
1/*
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20#include <zebra.h>
21#include <zmq.h>
22
23#include "thread.h"
24#include "memory.h"
25#include "frr_zmq.h"
26#include "log.h"
35774357 27#include "lib_errors.h"
b6116506
DL
28
29DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
30
31/* libzmq's context */
32void *frrzmq_context = NULL;
33static unsigned frrzmq_initcount = 0;
34
35void frrzmq_init(void)
36{
37 if (frrzmq_initcount++ == 0) {
38 frrzmq_context = zmq_ctx_new();
39 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
40 }
41}
42
43void frrzmq_finish(void)
44{
45 if (--frrzmq_initcount == 0) {
46 zmq_ctx_term(frrzmq_context);
47 frrzmq_context = NULL;
48 }
49}
50
b6116506
DL
51static int frrzmq_read_msg(struct thread *t)
52{
afd0f10d 53 struct frrzmq_cb **cbp = THREAD_ARG(t);
54 struct frrzmq_cb *cb;
b6116506
DL
55 zmq_msg_t msg;
56 unsigned partno;
afd0f10d 57 unsigned char read = 0;
b6116506
DL
58 int ret, more;
59 size_t moresz;
60
afd0f10d 61 if (!cbp)
62 return 1;
63 cb = (*cbp);
64 if (!cb || !cb->zmqsock)
65 return 1;
66
b6116506 67 while (1) {
afd0f10d 68 zmq_pollitem_t polli = {.socket = cb->zmqsock,
69 .events = ZMQ_POLLIN};
b6116506
DL
70 ret = zmq_poll(&polli, 1, 0);
71
72 if (ret < 0)
73 goto out_err;
afd0f10d 74
b6116506
DL
75 if (!(polli.revents & ZMQ_POLLIN))
76 break;
77
afd0f10d 78 if (cb->read.cb_msg) {
79 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
80 read = 1;
b6116506 81
afd0f10d 82 if (cb->read.cancelled) {
83 frrzmq_check_events(cbp, &cb->write,
84 ZMQ_POLLOUT);
85 cb->read.thread = NULL;
86 if (cb->write.cancelled && !cb->write.thread)
87 XFREE(MTYPE_ZEROMQ_CB, cb);
b6116506
DL
88 return 0;
89 }
90 continue;
91 }
92
93 partno = 0;
94 if (zmq_msg_init(&msg))
95 goto out_err;
96 do {
97 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
98 if (ret < 0) {
99 if (errno == EAGAIN)
100 break;
101
102 zmq_msg_close(&msg);
103 goto out_err;
104 }
afd0f10d 105 read = 1;
b6116506 106
afd0f10d 107 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
108 partno);
109 if (cb->read.cancelled) {
b6116506 110 zmq_msg_close(&msg);
afd0f10d 111 frrzmq_check_events(cbp, &cb->write,
112 ZMQ_POLLOUT);
113 cb->read.thread = NULL;
114 if (cb->write.cancelled && !cb->write.thread)
115 XFREE(MTYPE_ZEROMQ_CB, cb);
b6116506
DL
116 return 0;
117 }
118
119 /* cb_part may have read additional parts of the
120 * message; don't use zmq_msg_more here */
121 moresz = sizeof(more);
122 more = 0;
afd0f10d 123 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
124 &moresz);
b6116506
DL
125 if (ret < 0) {
126 zmq_msg_close(&msg);
127 goto out_err;
128 }
129
130 partno++;
131 } while (more);
132 zmq_msg_close(&msg);
133 }
134
afd0f10d 135 if (read)
136 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
137
138 funcname_thread_add_read_write(
139 THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
140 &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
b6116506
DL
141 return 0;
142
143out_err:
450971aa 144 flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
1c50c1c0 145 errno);
afd0f10d 146 if (cb->read.cb_error)
147 cb->read.cb_error(cb->read.arg, cb->zmqsock);
148 return 1;
b6116506
DL
149}
150
afd0f10d 151int funcname_frrzmq_thread_add_read(struct thread_master *master,
152 void (*msgfunc)(void *arg, void *zmqsock),
153 void (*partfunc)(void *arg, void *zmqsock,
154 zmq_msg_t *msg,
155 unsigned partnum),
156 void (*errfunc)(void *arg, void *zmqsock),
157 void *arg, void *zmqsock,
158 struct frrzmq_cb **cbp, debugargdef)
b6116506
DL
159{
160 int fd, events;
161 size_t len;
162 struct frrzmq_cb *cb;
163
afd0f10d 164 if (!cbp)
165 return -1;
b6116506 166 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
afd0f10d 167 return -1;
168 len = sizeof(fd);
169 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
170 return -1;
171 len = sizeof(events);
172 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
173 return -1;
174
175 if (*cbp)
176 cb = *cbp;
177 else {
178 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
6d10727a 179
180 cb->write.cancelled = 1;
afd0f10d 181 *cbp = cb;
182 }
183
184 cb->zmqsock = zmqsock;
185 cb->fd = fd;
186 cb->read.arg = arg;
187 cb->read.cb_msg = msgfunc;
188 cb->read.cb_part = partfunc;
189 cb->read.cb_error = errfunc;
190 cb->read.cancelled = 0;
191
192 if (events & ZMQ_POLLIN) {
193 if (cb->read.thread) {
194 thread_cancel(cb->read.thread);
195 cb->read.thread = NULL;
196 }
197 funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
198 &cb->read.thread, funcname, schedfrom,
199 fromln);
200 } else
201 funcname_thread_add_read_write(
202 THREAD_READ, master, frrzmq_read_msg, cbp, fd,
203 &cb->read.thread, funcname, schedfrom, fromln);
204 return 0;
205}
206
207static int frrzmq_write_msg(struct thread *t)
208{
209 struct frrzmq_cb **cbp = THREAD_ARG(t);
210 struct frrzmq_cb *cb;
211 unsigned char written = 0;
212 int ret;
213
214 if (!cbp)
215 return 1;
216 cb = (*cbp);
217 if (!cb || !cb->zmqsock)
218 return 1;
219
220 while (1) {
221 zmq_pollitem_t polli = {.socket = cb->zmqsock,
222 .events = ZMQ_POLLOUT};
223 ret = zmq_poll(&polli, 1, 0);
224
225 if (ret < 0)
226 goto out_err;
227
228 if (!(polli.revents & ZMQ_POLLOUT))
229 break;
230
231 if (cb->write.cb_msg) {
232 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
233 written = 1;
234
235 if (cb->write.cancelled) {
236 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
237 cb->write.thread = NULL;
238 if (cb->read.cancelled && !cb->read.thread)
239 XFREE(MTYPE_ZEROMQ_CB, cb);
240 return 0;
241 }
242 continue;
243 }
244 }
245
246 if (written)
247 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
248
249 funcname_thread_add_read_write(THREAD_WRITE, t->master,
250 frrzmq_write_msg, cbp, cb->fd,
251 &cb->write.thread, t->funcname,
252 t->schedfrom, t->schedfrom_line);
253 return 0;
254
255out_err:
450971aa 256 flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
1c50c1c0 257 errno);
afd0f10d 258 if (cb->write.cb_error)
259 cb->write.cb_error(cb->write.arg, cb->zmqsock);
260 return 1;
261}
262int funcname_frrzmq_thread_add_write(struct thread_master *master,
263 void (*msgfunc)(void *arg, void *zmqsock),
264 void (*errfunc)(void *arg, void *zmqsock),
265 void *arg, void *zmqsock,
266 struct frrzmq_cb **cbp, debugargdef)
267{
268 int fd, events;
269 size_t len;
270 struct frrzmq_cb *cb;
271
272 if (!cbp)
273 return -1;
274 if (!msgfunc)
275 return -1;
b6116506
DL
276 len = sizeof(fd);
277 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
afd0f10d 278 return -1;
b6116506
DL
279 len = sizeof(events);
280 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
afd0f10d 281 return -1;
b6116506 282
afd0f10d 283 if (*cbp)
284 cb = *cbp;
285 else {
286 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
6d10727a 287
288 cb->read.cancelled = 1;
afd0f10d 289 *cbp = cb;
290 }
b6116506 291
b6116506 292 cb->zmqsock = zmqsock;
b6116506 293 cb->fd = fd;
afd0f10d 294 cb->write.arg = arg;
295 cb->write.cb_msg = msgfunc;
296 cb->write.cb_part = NULL;
297 cb->write.cb_error = errfunc;
298 cb->write.cancelled = 0;
299
300 if (events & ZMQ_POLLOUT) {
301 if (cb->write.thread) {
302 thread_cancel(cb->write.thread);
303 cb->write.thread = NULL;
304 }
305 funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
306 &cb->write.thread, funcname,
307 schedfrom, fromln);
308 } else
309 funcname_thread_add_read_write(
310 THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
311 &cb->write.thread, funcname, schedfrom, fromln);
312 return 0;
313}
b6116506 314
afd0f10d 315void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
316{
317 if (!cb || !*cb)
318 return;
319 core->cancelled = 1;
320 if (core->thread) {
321 thread_cancel(core->thread);
322 core->thread = NULL;
323 }
324 if ((*cb)->read.cancelled && !(*cb)->read.thread
325 && (*cb)->write.cancelled && (*cb)->write.thread)
326 XFREE(MTYPE_ZEROMQ_CB, *cb);
b6116506
DL
327}
328
afd0f10d 329void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
330 int event)
b6116506 331{
afd0f10d 332 struct frrzmq_cb *cb;
333 int events;
334 size_t len;
335
336 if (!cbp)
337 return;
338 cb = (*cbp);
339 if (!cb || !cb->zmqsock)
340 return;
341
81b8afcf 342 len = sizeof(events);
afd0f10d 343 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
b6116506 344 return;
afd0f10d 345 if (events & event && core->thread && !core->cancelled) {
346 struct thread_master *tm = core->thread->master;
347 thread_cancel(core->thread);
348 core->thread = NULL;
349 thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
350 : frrzmq_write_msg),
351 cbp, cb->fd, &core->thread);
b6116506 352 }
b6116506 353}