]> git.proxmox.com Git - mirror_frr.git/blame - lib/frr_zmq.c
lib: Add LIB_ERR_DEVELOPMENT
[mirror_frr.git] / lib / frr_zmq.c
CommitLineData
b6116506
DL
1/*
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20#include <zebra.h>
21#include <zmq.h>
22
23#include "thread.h"
24#include "memory.h"
25#include "frr_zmq.h"
26#include "log.h"
27
28DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
29
30/* libzmq's context */
31void *frrzmq_context = NULL;
32static unsigned frrzmq_initcount = 0;
33
34void frrzmq_init(void)
35{
36 if (frrzmq_initcount++ == 0) {
37 frrzmq_context = zmq_ctx_new();
38 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
39 }
40}
41
42void frrzmq_finish(void)
43{
44 if (--frrzmq_initcount == 0) {
45 zmq_ctx_term(frrzmq_context);
46 frrzmq_context = NULL;
47 }
48}
49
b6116506
DL
50static int frrzmq_read_msg(struct thread *t)
51{
afd0f10d 52 struct frrzmq_cb **cbp = THREAD_ARG(t);
53 struct frrzmq_cb *cb;
b6116506
DL
54 zmq_msg_t msg;
55 unsigned partno;
afd0f10d 56 unsigned char read = 0;
b6116506
DL
57 int ret, more;
58 size_t moresz;
59
afd0f10d 60 if (!cbp)
61 return 1;
62 cb = (*cbp);
63 if (!cb || !cb->zmqsock)
64 return 1;
65
b6116506 66 while (1) {
afd0f10d 67 zmq_pollitem_t polli = {.socket = cb->zmqsock,
68 .events = ZMQ_POLLIN};
b6116506
DL
69 ret = zmq_poll(&polli, 1, 0);
70
71 if (ret < 0)
72 goto out_err;
afd0f10d 73
b6116506
DL
74 if (!(polli.revents & ZMQ_POLLIN))
75 break;
76
afd0f10d 77 if (cb->read.cb_msg) {
78 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
79 read = 1;
b6116506 80
afd0f10d 81 if (cb->read.cancelled) {
82 frrzmq_check_events(cbp, &cb->write,
83 ZMQ_POLLOUT);
84 cb->read.thread = NULL;
85 if (cb->write.cancelled && !cb->write.thread)
86 XFREE(MTYPE_ZEROMQ_CB, cb);
b6116506
DL
87 return 0;
88 }
89 continue;
90 }
91
92 partno = 0;
93 if (zmq_msg_init(&msg))
94 goto out_err;
95 do {
96 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
97 if (ret < 0) {
98 if (errno == EAGAIN)
99 break;
100
101 zmq_msg_close(&msg);
102 goto out_err;
103 }
afd0f10d 104 read = 1;
b6116506 105
afd0f10d 106 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
107 partno);
108 if (cb->read.cancelled) {
b6116506 109 zmq_msg_close(&msg);
afd0f10d 110 frrzmq_check_events(cbp, &cb->write,
111 ZMQ_POLLOUT);
112 cb->read.thread = NULL;
113 if (cb->write.cancelled && !cb->write.thread)
114 XFREE(MTYPE_ZEROMQ_CB, cb);
b6116506
DL
115 return 0;
116 }
117
118 /* cb_part may have read additional parts of the
119 * message; don't use zmq_msg_more here */
120 moresz = sizeof(more);
121 more = 0;
afd0f10d 122 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
123 &moresz);
b6116506
DL
124 if (ret < 0) {
125 zmq_msg_close(&msg);
126 goto out_err;
127 }
128
129 partno++;
130 } while (more);
131 zmq_msg_close(&msg);
132 }
133
afd0f10d 134 if (read)
135 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
136
137 funcname_thread_add_read_write(
138 THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
139 &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
b6116506
DL
140 return 0;
141
142out_err:
afd0f10d 143 zlog_err("ZeroMQ read error: %s(%d)", strerror(errno), errno);
144 if (cb->read.cb_error)
145 cb->read.cb_error(cb->read.arg, cb->zmqsock);
146 return 1;
b6116506
DL
147}
148
afd0f10d 149int funcname_frrzmq_thread_add_read(struct thread_master *master,
150 void (*msgfunc)(void *arg, void *zmqsock),
151 void (*partfunc)(void *arg, void *zmqsock,
152 zmq_msg_t *msg,
153 unsigned partnum),
154 void (*errfunc)(void *arg, void *zmqsock),
155 void *arg, void *zmqsock,
156 struct frrzmq_cb **cbp, debugargdef)
b6116506
DL
157{
158 int fd, events;
159 size_t len;
160 struct frrzmq_cb *cb;
161
afd0f10d 162 if (!cbp)
163 return -1;
b6116506 164 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
afd0f10d 165 return -1;
166 len = sizeof(fd);
167 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
168 return -1;
169 len = sizeof(events);
170 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
171 return -1;
172
173 if (*cbp)
174 cb = *cbp;
175 else {
176 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
afd0f10d 177 if (!cb)
178 return -1;
6d10727a 179
180 cb->write.cancelled = 1;
afd0f10d 181 *cbp = cb;
182 }
183
184 cb->zmqsock = zmqsock;
185 cb->fd = fd;
186 cb->read.arg = arg;
187 cb->read.cb_msg = msgfunc;
188 cb->read.cb_part = partfunc;
189 cb->read.cb_error = errfunc;
190 cb->read.cancelled = 0;
191
192 if (events & ZMQ_POLLIN) {
193 if (cb->read.thread) {
194 thread_cancel(cb->read.thread);
195 cb->read.thread = NULL;
196 }
197 funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
198 &cb->read.thread, funcname, schedfrom,
199 fromln);
200 } else
201 funcname_thread_add_read_write(
202 THREAD_READ, master, frrzmq_read_msg, cbp, fd,
203 &cb->read.thread, funcname, schedfrom, fromln);
204 return 0;
205}
206
207static int frrzmq_write_msg(struct thread *t)
208{
209 struct frrzmq_cb **cbp = THREAD_ARG(t);
210 struct frrzmq_cb *cb;
211 unsigned char written = 0;
212 int ret;
213
214 if (!cbp)
215 return 1;
216 cb = (*cbp);
217 if (!cb || !cb->zmqsock)
218 return 1;
219
220 while (1) {
221 zmq_pollitem_t polli = {.socket = cb->zmqsock,
222 .events = ZMQ_POLLOUT};
223 ret = zmq_poll(&polli, 1, 0);
224
225 if (ret < 0)
226 goto out_err;
227
228 if (!(polli.revents & ZMQ_POLLOUT))
229 break;
230
231 if (cb->write.cb_msg) {
232 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
233 written = 1;
234
235 if (cb->write.cancelled) {
236 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
237 cb->write.thread = NULL;
238 if (cb->read.cancelled && !cb->read.thread)
239 XFREE(MTYPE_ZEROMQ_CB, cb);
240 return 0;
241 }
242 continue;
243 }
244 }
245
246 if (written)
247 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
248
249 funcname_thread_add_read_write(THREAD_WRITE, t->master,
250 frrzmq_write_msg, cbp, cb->fd,
251 &cb->write.thread, t->funcname,
252 t->schedfrom, t->schedfrom_line);
253 return 0;
254
255out_err:
256 zlog_err("ZeroMQ write error: %s(%d)", strerror(errno), errno);
257 if (cb->write.cb_error)
258 cb->write.cb_error(cb->write.arg, cb->zmqsock);
259 return 1;
260}
261int funcname_frrzmq_thread_add_write(struct thread_master *master,
262 void (*msgfunc)(void *arg, void *zmqsock),
263 void (*errfunc)(void *arg, void *zmqsock),
264 void *arg, void *zmqsock,
265 struct frrzmq_cb **cbp, debugargdef)
266{
267 int fd, events;
268 size_t len;
269 struct frrzmq_cb *cb;
270
271 if (!cbp)
272 return -1;
273 if (!msgfunc)
274 return -1;
b6116506
DL
275 len = sizeof(fd);
276 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
afd0f10d 277 return -1;
b6116506
DL
278 len = sizeof(events);
279 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
afd0f10d 280 return -1;
b6116506 281
afd0f10d 282 if (*cbp)
283 cb = *cbp;
284 else {
285 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
afd0f10d 286 if (!cb)
287 return -1;
6d10727a 288
289 cb->read.cancelled = 1;
afd0f10d 290 *cbp = cb;
291 }
b6116506 292
b6116506 293 cb->zmqsock = zmqsock;
b6116506 294 cb->fd = fd;
afd0f10d 295 cb->write.arg = arg;
296 cb->write.cb_msg = msgfunc;
297 cb->write.cb_part = NULL;
298 cb->write.cb_error = errfunc;
299 cb->write.cancelled = 0;
300
301 if (events & ZMQ_POLLOUT) {
302 if (cb->write.thread) {
303 thread_cancel(cb->write.thread);
304 cb->write.thread = NULL;
305 }
306 funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
307 &cb->write.thread, funcname,
308 schedfrom, fromln);
309 } else
310 funcname_thread_add_read_write(
311 THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
312 &cb->write.thread, funcname, schedfrom, fromln);
313 return 0;
314}
b6116506 315
afd0f10d 316void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
317{
318 if (!cb || !*cb)
319 return;
320 core->cancelled = 1;
321 if (core->thread) {
322 thread_cancel(core->thread);
323 core->thread = NULL;
324 }
325 if ((*cb)->read.cancelled && !(*cb)->read.thread
326 && (*cb)->write.cancelled && (*cb)->write.thread)
327 XFREE(MTYPE_ZEROMQ_CB, *cb);
b6116506
DL
328}
329
afd0f10d 330void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
331 int event)
b6116506 332{
afd0f10d 333 struct frrzmq_cb *cb;
334 int events;
335 size_t len;
336
337 if (!cbp)
338 return;
339 cb = (*cbp);
340 if (!cb || !cb->zmqsock)
341 return;
342
81b8afcf 343 len = sizeof(events);
afd0f10d 344 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
b6116506 345 return;
afd0f10d 346 if (events & event && core->thread && !core->cancelled) {
347 struct thread_master *tm = core->thread->master;
348 thread_cancel(core->thread);
349 core->thread = NULL;
350 thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
351 : frrzmq_write_msg),
352 cbp, cb->fd, &core->thread);
b6116506 353 }
b6116506 354}