]> git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
Merge pull request #1536 from opensourcerouting/isis-l2conv
[mirror_frr.git] / lib / frr_zmq.c
1 /*
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20 #include <zebra.h>
21 #include <zmq.h>
22
23 #include "thread.h"
24 #include "memory.h"
25 #include "frr_zmq.h"
26 #include "log.h"
27
28 DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
29
30 /* libzmq's context */
31 void *frrzmq_context = NULL;
32 static unsigned frrzmq_initcount = 0;
33
34 void frrzmq_init(void)
35 {
36 if (frrzmq_initcount++ == 0) {
37 frrzmq_context = zmq_ctx_new();
38 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
39 }
40 }
41
42 void frrzmq_finish(void)
43 {
44 if (--frrzmq_initcount == 0) {
45 zmq_ctx_term(frrzmq_context);
46 frrzmq_context = NULL;
47 }
48 }
49
50 static int frrzmq_read_msg(struct thread *t)
51 {
52 struct frrzmq_cb **cbp = THREAD_ARG(t);
53 struct frrzmq_cb *cb;
54 zmq_msg_t msg;
55 unsigned partno;
56 unsigned char read = 0;
57 int ret, more;
58 size_t moresz;
59
60 if (!cbp)
61 return 1;
62 cb = (*cbp);
63 if (!cb || !cb->zmqsock)
64 return 1;
65
66 while (1) {
67 zmq_pollitem_t polli = {.socket = cb->zmqsock,
68 .events = ZMQ_POLLIN};
69 ret = zmq_poll(&polli, 1, 0);
70
71 if (ret < 0)
72 goto out_err;
73
74 if (!(polli.revents & ZMQ_POLLIN))
75 break;
76
77 if (cb->read.cb_msg) {
78 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
79 read = 1;
80
81 if (cb->read.cancelled) {
82 frrzmq_check_events(cbp, &cb->write,
83 ZMQ_POLLOUT);
84 cb->read.thread = NULL;
85 if (cb->write.cancelled && !cb->write.thread)
86 XFREE(MTYPE_ZEROMQ_CB, cb);
87 return 0;
88 }
89 continue;
90 }
91
92 partno = 0;
93 if (zmq_msg_init(&msg))
94 goto out_err;
95 do {
96 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
97 if (ret < 0) {
98 if (errno == EAGAIN)
99 break;
100
101 zmq_msg_close(&msg);
102 goto out_err;
103 }
104 read = 1;
105
106 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
107 partno);
108 if (cb->read.cancelled) {
109 zmq_msg_close(&msg);
110 frrzmq_check_events(cbp, &cb->write,
111 ZMQ_POLLOUT);
112 cb->read.thread = NULL;
113 if (cb->write.cancelled && !cb->write.thread)
114 XFREE(MTYPE_ZEROMQ_CB, cb);
115 return 0;
116 }
117
118 /* cb_part may have read additional parts of the
119 * message; don't use zmq_msg_more here */
120 moresz = sizeof(more);
121 more = 0;
122 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
123 &moresz);
124 if (ret < 0) {
125 zmq_msg_close(&msg);
126 goto out_err;
127 }
128
129 partno++;
130 } while (more);
131 zmq_msg_close(&msg);
132 }
133
134 if (read)
135 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
136
137 funcname_thread_add_read_write(
138 THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
139 &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
140 return 0;
141
142 out_err:
143 zlog_err("ZeroMQ read error: %s(%d)", strerror(errno), errno);
144 if (cb->read.cb_error)
145 cb->read.cb_error(cb->read.arg, cb->zmqsock);
146 return 1;
147 }
148
149 int funcname_frrzmq_thread_add_read(struct thread_master *master,
150 void (*msgfunc)(void *arg, void *zmqsock),
151 void (*partfunc)(void *arg, void *zmqsock,
152 zmq_msg_t *msg,
153 unsigned partnum),
154 void (*errfunc)(void *arg, void *zmqsock),
155 void *arg, void *zmqsock,
156 struct frrzmq_cb **cbp, debugargdef)
157 {
158 int fd, events;
159 size_t len;
160 struct frrzmq_cb *cb;
161
162 if (!cbp)
163 return -1;
164 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
165 return -1;
166 len = sizeof(fd);
167 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
168 return -1;
169 len = sizeof(events);
170 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
171 return -1;
172
173 if (*cbp)
174 cb = *cbp;
175 else {
176 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
177 cb->write.cancelled = 1;
178 if (!cb)
179 return -1;
180 *cbp = cb;
181 }
182
183 cb->zmqsock = zmqsock;
184 cb->fd = fd;
185 cb->read.arg = arg;
186 cb->read.cb_msg = msgfunc;
187 cb->read.cb_part = partfunc;
188 cb->read.cb_error = errfunc;
189 cb->read.cancelled = 0;
190
191 if (events & ZMQ_POLLIN) {
192 if (cb->read.thread) {
193 thread_cancel(cb->read.thread);
194 cb->read.thread = NULL;
195 }
196 funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
197 &cb->read.thread, funcname, schedfrom,
198 fromln);
199 } else
200 funcname_thread_add_read_write(
201 THREAD_READ, master, frrzmq_read_msg, cbp, fd,
202 &cb->read.thread, funcname, schedfrom, fromln);
203 return 0;
204 }
205
206 static int frrzmq_write_msg(struct thread *t)
207 {
208 struct frrzmq_cb **cbp = THREAD_ARG(t);
209 struct frrzmq_cb *cb;
210 unsigned char written = 0;
211 int ret;
212
213 if (!cbp)
214 return 1;
215 cb = (*cbp);
216 if (!cb || !cb->zmqsock)
217 return 1;
218
219 while (1) {
220 zmq_pollitem_t polli = {.socket = cb->zmqsock,
221 .events = ZMQ_POLLOUT};
222 ret = zmq_poll(&polli, 1, 0);
223
224 if (ret < 0)
225 goto out_err;
226
227 if (!(polli.revents & ZMQ_POLLOUT))
228 break;
229
230 if (cb->write.cb_msg) {
231 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
232 written = 1;
233
234 if (cb->write.cancelled) {
235 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
236 cb->write.thread = NULL;
237 if (cb->read.cancelled && !cb->read.thread)
238 XFREE(MTYPE_ZEROMQ_CB, cb);
239 return 0;
240 }
241 continue;
242 }
243 }
244
245 if (written)
246 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
247
248 funcname_thread_add_read_write(THREAD_WRITE, t->master,
249 frrzmq_write_msg, cbp, cb->fd,
250 &cb->write.thread, t->funcname,
251 t->schedfrom, t->schedfrom_line);
252 return 0;
253
254 out_err:
255 zlog_err("ZeroMQ write error: %s(%d)", strerror(errno), errno);
256 if (cb->write.cb_error)
257 cb->write.cb_error(cb->write.arg, cb->zmqsock);
258 return 1;
259 }
260 int funcname_frrzmq_thread_add_write(struct thread_master *master,
261 void (*msgfunc)(void *arg, void *zmqsock),
262 void (*errfunc)(void *arg, void *zmqsock),
263 void *arg, void *zmqsock,
264 struct frrzmq_cb **cbp, debugargdef)
265 {
266 int fd, events;
267 size_t len;
268 struct frrzmq_cb *cb;
269
270 if (!cbp)
271 return -1;
272 if (!msgfunc)
273 return -1;
274 len = sizeof(fd);
275 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
276 return -1;
277 len = sizeof(events);
278 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
279 return -1;
280
281 if (*cbp)
282 cb = *cbp;
283 else {
284 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
285 cb->read.cancelled = 1;
286 if (!cb)
287 return -1;
288 *cbp = cb;
289 }
290
291 cb->zmqsock = zmqsock;
292 cb->fd = fd;
293 cb->write.arg = arg;
294 cb->write.cb_msg = msgfunc;
295 cb->write.cb_part = NULL;
296 cb->write.cb_error = errfunc;
297 cb->write.cancelled = 0;
298
299 if (events & ZMQ_POLLOUT) {
300 if (cb->write.thread) {
301 thread_cancel(cb->write.thread);
302 cb->write.thread = NULL;
303 }
304 funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
305 &cb->write.thread, funcname,
306 schedfrom, fromln);
307 } else
308 funcname_thread_add_read_write(
309 THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
310 &cb->write.thread, funcname, schedfrom, fromln);
311 return 0;
312 }
313
314 void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
315 {
316 if (!cb || !*cb)
317 return;
318 core->cancelled = 1;
319 if (core->thread) {
320 thread_cancel(core->thread);
321 core->thread = NULL;
322 }
323 if ((*cb)->read.cancelled && !(*cb)->read.thread
324 && (*cb)->write.cancelled && (*cb)->write.thread)
325 XFREE(MTYPE_ZEROMQ_CB, *cb);
326 }
327
328 void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
329 int event)
330 {
331 struct frrzmq_cb *cb;
332 int events;
333 size_t len;
334
335 if (!cbp)
336 return;
337 cb = (*cbp);
338 if (!cb || !cb->zmqsock)
339 return;
340
341 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
342 return;
343 if (events & event && core->thread && !core->cancelled) {
344 struct thread_master *tm = core->thread->master;
345 thread_cancel(core->thread);
346 core->thread = NULL;
347 thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
348 : frrzmq_write_msg),
349 cbp, cb->fd, &core->thread);
350 }
351 }