]> git.proxmox.com Git - mirror_frr.git/blame - lib/frr_zmq.c
lib: Convert internal sequence number to int64_t
[mirror_frr.git] / lib / frr_zmq.c
CommitLineData
b6116506
DL
1/*
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20#include <zebra.h>
21#include <zmq.h>
22
23#include "thread.h"
24#include "memory.h"
25#include "frr_zmq.h"
26#include "log.h"
27
28DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
29
30/* libzmq's context */
31void *frrzmq_context = NULL;
32static unsigned frrzmq_initcount = 0;
33
34void frrzmq_init(void)
35{
36 if (frrzmq_initcount++ == 0) {
37 frrzmq_context = zmq_ctx_new();
38 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
39 }
40}
41
42void frrzmq_finish(void)
43{
44 if (--frrzmq_initcount == 0) {
45 zmq_ctx_term(frrzmq_context);
46 frrzmq_context = NULL;
47 }
48}
49
b6116506
DL
50static int frrzmq_read_msg(struct thread *t)
51{
afd0f10d 52 struct frrzmq_cb **cbp = THREAD_ARG(t);
53 struct frrzmq_cb *cb;
b6116506
DL
54 zmq_msg_t msg;
55 unsigned partno;
afd0f10d 56 unsigned char read = 0;
b6116506
DL
57 int ret, more;
58 size_t moresz;
59
afd0f10d 60 if (!cbp)
61 return 1;
62 cb = (*cbp);
63 if (!cb || !cb->zmqsock)
64 return 1;
65
b6116506 66 while (1) {
afd0f10d 67 zmq_pollitem_t polli = {.socket = cb->zmqsock,
68 .events = ZMQ_POLLIN};
b6116506
DL
69 ret = zmq_poll(&polli, 1, 0);
70
71 if (ret < 0)
72 goto out_err;
afd0f10d 73
b6116506
DL
74 if (!(polli.revents & ZMQ_POLLIN))
75 break;
76
afd0f10d 77 if (cb->read.cb_msg) {
78 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
79 read = 1;
b6116506 80
afd0f10d 81 if (cb->read.cancelled) {
82 frrzmq_check_events(cbp, &cb->write,
83 ZMQ_POLLOUT);
84 cb->read.thread = NULL;
85 if (cb->write.cancelled && !cb->write.thread)
86 XFREE(MTYPE_ZEROMQ_CB, cb);
b6116506
DL
87 return 0;
88 }
89 continue;
90 }
91
92 partno = 0;
93 if (zmq_msg_init(&msg))
94 goto out_err;
95 do {
96 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
97 if (ret < 0) {
98 if (errno == EAGAIN)
99 break;
100
101 zmq_msg_close(&msg);
102 goto out_err;
103 }
afd0f10d 104 read = 1;
b6116506 105
afd0f10d 106 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
107 partno);
108 if (cb->read.cancelled) {
b6116506 109 zmq_msg_close(&msg);
afd0f10d 110 frrzmq_check_events(cbp, &cb->write,
111 ZMQ_POLLOUT);
112 cb->read.thread = NULL;
113 if (cb->write.cancelled && !cb->write.thread)
114 XFREE(MTYPE_ZEROMQ_CB, cb);
b6116506
DL
115 return 0;
116 }
117
118 /* cb_part may have read additional parts of the
119 * message; don't use zmq_msg_more here */
120 moresz = sizeof(more);
121 more = 0;
afd0f10d 122 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
123 &moresz);
b6116506
DL
124 if (ret < 0) {
125 zmq_msg_close(&msg);
126 goto out_err;
127 }
128
129 partno++;
130 } while (more);
131 zmq_msg_close(&msg);
132 }
133
afd0f10d 134 if (read)
135 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
136
137 funcname_thread_add_read_write(
138 THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
139 &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
b6116506
DL
140 return 0;
141
142out_err:
afd0f10d 143 zlog_err("ZeroMQ read error: %s(%d)", strerror(errno), errno);
144 if (cb->read.cb_error)
145 cb->read.cb_error(cb->read.arg, cb->zmqsock);
146 return 1;
b6116506
DL
147}
148
afd0f10d 149int funcname_frrzmq_thread_add_read(struct thread_master *master,
150 void (*msgfunc)(void *arg, void *zmqsock),
151 void (*partfunc)(void *arg, void *zmqsock,
152 zmq_msg_t *msg,
153 unsigned partnum),
154 void (*errfunc)(void *arg, void *zmqsock),
155 void *arg, void *zmqsock,
156 struct frrzmq_cb **cbp, debugargdef)
b6116506
DL
157{
158 int fd, events;
159 size_t len;
160 struct frrzmq_cb *cb;
161
afd0f10d 162 if (!cbp)
163 return -1;
b6116506 164 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
afd0f10d 165 return -1;
166 len = sizeof(fd);
167 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
168 return -1;
169 len = sizeof(events);
170 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
171 return -1;
172
173 if (*cbp)
174 cb = *cbp;
175 else {
176 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
177 cb->write.cancelled = 1;
178 if (!cb)
179 return -1;
180 *cbp = cb;
181 }
182
183 cb->zmqsock = zmqsock;
184 cb->fd = fd;
185 cb->read.arg = arg;
186 cb->read.cb_msg = msgfunc;
187 cb->read.cb_part = partfunc;
188 cb->read.cb_error = errfunc;
189 cb->read.cancelled = 0;
190
191 if (events & ZMQ_POLLIN) {
192 if (cb->read.thread) {
193 thread_cancel(cb->read.thread);
194 cb->read.thread = NULL;
195 }
196 funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
197 &cb->read.thread, funcname, schedfrom,
198 fromln);
199 } else
200 funcname_thread_add_read_write(
201 THREAD_READ, master, frrzmq_read_msg, cbp, fd,
202 &cb->read.thread, funcname, schedfrom, fromln);
203 return 0;
204}
205
206static int frrzmq_write_msg(struct thread *t)
207{
208 struct frrzmq_cb **cbp = THREAD_ARG(t);
209 struct frrzmq_cb *cb;
210 unsigned char written = 0;
211 int ret;
212
213 if (!cbp)
214 return 1;
215 cb = (*cbp);
216 if (!cb || !cb->zmqsock)
217 return 1;
218
219 while (1) {
220 zmq_pollitem_t polli = {.socket = cb->zmqsock,
221 .events = ZMQ_POLLOUT};
222 ret = zmq_poll(&polli, 1, 0);
223
224 if (ret < 0)
225 goto out_err;
226
227 if (!(polli.revents & ZMQ_POLLOUT))
228 break;
229
230 if (cb->write.cb_msg) {
231 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
232 written = 1;
233
234 if (cb->write.cancelled) {
235 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
236 cb->write.thread = NULL;
237 if (cb->read.cancelled && !cb->read.thread)
238 XFREE(MTYPE_ZEROMQ_CB, cb);
239 return 0;
240 }
241 continue;
242 }
243 }
244
245 if (written)
246 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
247
248 funcname_thread_add_read_write(THREAD_WRITE, t->master,
249 frrzmq_write_msg, cbp, cb->fd,
250 &cb->write.thread, t->funcname,
251 t->schedfrom, t->schedfrom_line);
252 return 0;
253
254out_err:
255 zlog_err("ZeroMQ write error: %s(%d)", strerror(errno), errno);
256 if (cb->write.cb_error)
257 cb->write.cb_error(cb->write.arg, cb->zmqsock);
258 return 1;
259}
260int funcname_frrzmq_thread_add_write(struct thread_master *master,
261 void (*msgfunc)(void *arg, void *zmqsock),
262 void (*errfunc)(void *arg, void *zmqsock),
263 void *arg, void *zmqsock,
264 struct frrzmq_cb **cbp, debugargdef)
265{
266 int fd, events;
267 size_t len;
268 struct frrzmq_cb *cb;
269
270 if (!cbp)
271 return -1;
272 if (!msgfunc)
273 return -1;
b6116506
DL
274 len = sizeof(fd);
275 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
afd0f10d 276 return -1;
b6116506
DL
277 len = sizeof(events);
278 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
afd0f10d 279 return -1;
b6116506 280
afd0f10d 281 if (*cbp)
282 cb = *cbp;
283 else {
284 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
285 cb->read.cancelled = 1;
286 if (!cb)
287 return -1;
288 *cbp = cb;
289 }
b6116506 290
b6116506 291 cb->zmqsock = zmqsock;
b6116506 292 cb->fd = fd;
afd0f10d 293 cb->write.arg = arg;
294 cb->write.cb_msg = msgfunc;
295 cb->write.cb_part = NULL;
296 cb->write.cb_error = errfunc;
297 cb->write.cancelled = 0;
298
299 if (events & ZMQ_POLLOUT) {
300 if (cb->write.thread) {
301 thread_cancel(cb->write.thread);
302 cb->write.thread = NULL;
303 }
304 funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
305 &cb->write.thread, funcname,
306 schedfrom, fromln);
307 } else
308 funcname_thread_add_read_write(
309 THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
310 &cb->write.thread, funcname, schedfrom, fromln);
311 return 0;
312}
b6116506 313
afd0f10d 314void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
315{
316 if (!cb || !*cb)
317 return;
318 core->cancelled = 1;
319 if (core->thread) {
320 thread_cancel(core->thread);
321 core->thread = NULL;
322 }
323 if ((*cb)->read.cancelled && !(*cb)->read.thread
324 && (*cb)->write.cancelled && (*cb)->write.thread)
325 XFREE(MTYPE_ZEROMQ_CB, *cb);
b6116506
DL
326}
327
afd0f10d 328void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
329 int event)
b6116506 330{
afd0f10d 331 struct frrzmq_cb *cb;
332 int events;
333 size_t len;
334
335 if (!cbp)
336 return;
337 cb = (*cbp);
338 if (!cb || !cb->zmqsock)
339 return;
340
81b8afcf 341 len = sizeof(events);
afd0f10d 342 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
b6116506 343 return;
afd0f10d 344 if (events & event && core->thread && !core->cancelled) {
345 struct thread_master *tm = core->thread->master;
346 thread_cancel(core->thread);
347 core->thread = NULL;
348 thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
349 : frrzmq_write_msg),
350 cbp, cb->fd, &core->thread);
b6116506 351 }
b6116506 352}