git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
zebra, lib: fix the ZEBRA_INTERFACE_VRF_UPDATE zapi message
[mirror_frr.git] / lib / frr_zmq.c
1 /*
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20 #include <zebra.h>
21 #include <zmq.h>
22
23 #include "thread.h"
24 #include "memory.h"
25 #include "frr_zmq.h"
26 #include "log.h"
27 #include "lib_errors.h"
28
29 DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
30
31 /* libzmq's context */
32 void *frrzmq_context = NULL;
33 static unsigned frrzmq_initcount = 0;
34
35 void frrzmq_init(void)
36 {
37 if (frrzmq_initcount++ == 0) {
38 frrzmq_context = zmq_ctx_new();
39 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
40 }
41 }
42
43 void frrzmq_finish(void)
44 {
45 if (--frrzmq_initcount == 0) {
46 zmq_ctx_term(frrzmq_context);
47 frrzmq_context = NULL;
48 }
49 }
50
51 static int frrzmq_read_msg(struct thread *t)
52 {
53 struct frrzmq_cb **cbp = THREAD_ARG(t);
54 struct frrzmq_cb *cb;
55 zmq_msg_t msg;
56 unsigned partno;
57 unsigned char read = 0;
58 int ret, more;
59 size_t moresz;
60
61 if (!cbp)
62 return 1;
63 cb = (*cbp);
64 if (!cb || !cb->zmqsock)
65 return 1;
66
67 while (1) {
68 zmq_pollitem_t polli = {.socket = cb->zmqsock,
69 .events = ZMQ_POLLIN};
70 ret = zmq_poll(&polli, 1, 0);
71
72 if (ret < 0)
73 goto out_err;
74
75 if (!(polli.revents & ZMQ_POLLIN))
76 break;
77
78 if (cb->read.cb_msg) {
79 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
80 read = 1;
81
82 if (cb->read.cancelled) {
83 frrzmq_check_events(cbp, &cb->write,
84 ZMQ_POLLOUT);
85 cb->read.thread = NULL;
86 if (cb->write.cancelled && !cb->write.thread)
87 XFREE(MTYPE_ZEROMQ_CB, cb);
88 return 0;
89 }
90 continue;
91 }
92
93 partno = 0;
94 if (zmq_msg_init(&msg))
95 goto out_err;
96 do {
97 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
98 if (ret < 0) {
99 if (errno == EAGAIN)
100 break;
101
102 zmq_msg_close(&msg);
103 goto out_err;
104 }
105 read = 1;
106
107 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
108 partno);
109 if (cb->read.cancelled) {
110 zmq_msg_close(&msg);
111 frrzmq_check_events(cbp, &cb->write,
112 ZMQ_POLLOUT);
113 cb->read.thread = NULL;
114 if (cb->write.cancelled && !cb->write.thread)
115 XFREE(MTYPE_ZEROMQ_CB, cb);
116 return 0;
117 }
118
119 /* cb_part may have read additional parts of the
120 * message; don't use zmq_msg_more here */
121 moresz = sizeof(more);
122 more = 0;
123 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
124 &moresz);
125 if (ret < 0) {
126 zmq_msg_close(&msg);
127 goto out_err;
128 }
129
130 partno++;
131 } while (more);
132 zmq_msg_close(&msg);
133 }
134
135 if (read)
136 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
137
138 funcname_thread_add_read_write(
139 THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
140 &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
141 return 0;
142
143 out_err:
144 flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno),
145 errno);
146 if (cb->read.cb_error)
147 cb->read.cb_error(cb->read.arg, cb->zmqsock);
148 return 1;
149 }
150
151 int funcname_frrzmq_thread_add_read(struct thread_master *master,
152 void (*msgfunc)(void *arg, void *zmqsock),
153 void (*partfunc)(void *arg, void *zmqsock,
154 zmq_msg_t *msg,
155 unsigned partnum),
156 void (*errfunc)(void *arg, void *zmqsock),
157 void *arg, void *zmqsock,
158 struct frrzmq_cb **cbp, debugargdef)
159 {
160 int fd, events;
161 size_t len;
162 struct frrzmq_cb *cb;
163
164 if (!cbp)
165 return -1;
166 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
167 return -1;
168 len = sizeof(fd);
169 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
170 return -1;
171 len = sizeof(events);
172 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
173 return -1;
174
175 if (*cbp)
176 cb = *cbp;
177 else {
178 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
179 if (!cb)
180 return -1;
181
182 cb->write.cancelled = 1;
183 *cbp = cb;
184 }
185
186 cb->zmqsock = zmqsock;
187 cb->fd = fd;
188 cb->read.arg = arg;
189 cb->read.cb_msg = msgfunc;
190 cb->read.cb_part = partfunc;
191 cb->read.cb_error = errfunc;
192 cb->read.cancelled = 0;
193
194 if (events & ZMQ_POLLIN) {
195 if (cb->read.thread) {
196 thread_cancel(cb->read.thread);
197 cb->read.thread = NULL;
198 }
199 funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
200 &cb->read.thread, funcname, schedfrom,
201 fromln);
202 } else
203 funcname_thread_add_read_write(
204 THREAD_READ, master, frrzmq_read_msg, cbp, fd,
205 &cb->read.thread, funcname, schedfrom, fromln);
206 return 0;
207 }
208
209 static int frrzmq_write_msg(struct thread *t)
210 {
211 struct frrzmq_cb **cbp = THREAD_ARG(t);
212 struct frrzmq_cb *cb;
213 unsigned char written = 0;
214 int ret;
215
216 if (!cbp)
217 return 1;
218 cb = (*cbp);
219 if (!cb || !cb->zmqsock)
220 return 1;
221
222 while (1) {
223 zmq_pollitem_t polli = {.socket = cb->zmqsock,
224 .events = ZMQ_POLLOUT};
225 ret = zmq_poll(&polli, 1, 0);
226
227 if (ret < 0)
228 goto out_err;
229
230 if (!(polli.revents & ZMQ_POLLOUT))
231 break;
232
233 if (cb->write.cb_msg) {
234 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
235 written = 1;
236
237 if (cb->write.cancelled) {
238 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
239 cb->write.thread = NULL;
240 if (cb->read.cancelled && !cb->read.thread)
241 XFREE(MTYPE_ZEROMQ_CB, cb);
242 return 0;
243 }
244 continue;
245 }
246 }
247
248 if (written)
249 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
250
251 funcname_thread_add_read_write(THREAD_WRITE, t->master,
252 frrzmq_write_msg, cbp, cb->fd,
253 &cb->write.thread, t->funcname,
254 t->schedfrom, t->schedfrom_line);
255 return 0;
256
257 out_err:
258 flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno),
259 errno);
260 if (cb->write.cb_error)
261 cb->write.cb_error(cb->write.arg, cb->zmqsock);
262 return 1;
263 }
264 int funcname_frrzmq_thread_add_write(struct thread_master *master,
265 void (*msgfunc)(void *arg, void *zmqsock),
266 void (*errfunc)(void *arg, void *zmqsock),
267 void *arg, void *zmqsock,
268 struct frrzmq_cb **cbp, debugargdef)
269 {
270 int fd, events;
271 size_t len;
272 struct frrzmq_cb *cb;
273
274 if (!cbp)
275 return -1;
276 if (!msgfunc)
277 return -1;
278 len = sizeof(fd);
279 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
280 return -1;
281 len = sizeof(events);
282 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
283 return -1;
284
285 if (*cbp)
286 cb = *cbp;
287 else {
288 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
289 if (!cb)
290 return -1;
291
292 cb->read.cancelled = 1;
293 *cbp = cb;
294 }
295
296 cb->zmqsock = zmqsock;
297 cb->fd = fd;
298 cb->write.arg = arg;
299 cb->write.cb_msg = msgfunc;
300 cb->write.cb_part = NULL;
301 cb->write.cb_error = errfunc;
302 cb->write.cancelled = 0;
303
304 if (events & ZMQ_POLLOUT) {
305 if (cb->write.thread) {
306 thread_cancel(cb->write.thread);
307 cb->write.thread = NULL;
308 }
309 funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
310 &cb->write.thread, funcname,
311 schedfrom, fromln);
312 } else
313 funcname_thread_add_read_write(
314 THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
315 &cb->write.thread, funcname, schedfrom, fromln);
316 return 0;
317 }
318
319 void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
320 {
321 if (!cb || !*cb)
322 return;
323 core->cancelled = 1;
324 if (core->thread) {
325 thread_cancel(core->thread);
326 core->thread = NULL;
327 }
328 if ((*cb)->read.cancelled && !(*cb)->read.thread
329 && (*cb)->write.cancelled && (*cb)->write.thread)
330 XFREE(MTYPE_ZEROMQ_CB, *cb);
331 }
332
333 void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
334 int event)
335 {
336 struct frrzmq_cb *cb;
337 int events;
338 size_t len;
339
340 if (!cbp)
341 return;
342 cb = (*cbp);
343 if (!cb || !cb->zmqsock)
344 return;
345
346 len = sizeof(events);
347 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
348 return;
349 if (events & event && core->thread && !core->cancelled) {
350 struct thread_master *tm = core->thread->master;
351 thread_cancel(core->thread);
352 core->thread = NULL;
353 thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
354 : frrzmq_write_msg),
355 cbp, cb->fd, &core->thread);
356 }
357 }