]> git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
Merge pull request #2474 from donaldsharp/vty_thread_cancel_writes
[mirror_frr.git] / lib / frr_zmq.c
1 /*
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20 #include <zebra.h>
21 #include <zmq.h>
22
23 #include "thread.h"
24 #include "memory.h"
25 #include "frr_zmq.h"
26 #include "log.h"
27
28 DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
29
30 /* libzmq's context */
31 void *frrzmq_context = NULL;
32 static unsigned frrzmq_initcount = 0;
33
34 void frrzmq_init(void)
35 {
36 if (frrzmq_initcount++ == 0) {
37 frrzmq_context = zmq_ctx_new();
38 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
39 }
40 }
41
42 void frrzmq_finish(void)
43 {
44 if (--frrzmq_initcount == 0) {
45 zmq_ctx_term(frrzmq_context);
46 frrzmq_context = NULL;
47 }
48 }
49
50 static int frrzmq_read_msg(struct thread *t)
51 {
52 struct frrzmq_cb **cbp = THREAD_ARG(t);
53 struct frrzmq_cb *cb;
54 zmq_msg_t msg;
55 unsigned partno;
56 unsigned char read = 0;
57 int ret, more;
58 size_t moresz;
59
60 if (!cbp)
61 return 1;
62 cb = (*cbp);
63 if (!cb || !cb->zmqsock)
64 return 1;
65
66 while (1) {
67 zmq_pollitem_t polli = {.socket = cb->zmqsock,
68 .events = ZMQ_POLLIN};
69 ret = zmq_poll(&polli, 1, 0);
70
71 if (ret < 0)
72 goto out_err;
73
74 if (!(polli.revents & ZMQ_POLLIN))
75 break;
76
77 if (cb->read.cb_msg) {
78 cb->read.cb_msg(cb->read.arg, cb->zmqsock);
79 read = 1;
80
81 if (cb->read.cancelled) {
82 frrzmq_check_events(cbp, &cb->write,
83 ZMQ_POLLOUT);
84 cb->read.thread = NULL;
85 if (cb->write.cancelled && !cb->write.thread)
86 XFREE(MTYPE_ZEROMQ_CB, cb);
87 return 0;
88 }
89 continue;
90 }
91
92 partno = 0;
93 if (zmq_msg_init(&msg))
94 goto out_err;
95 do {
96 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
97 if (ret < 0) {
98 if (errno == EAGAIN)
99 break;
100
101 zmq_msg_close(&msg);
102 goto out_err;
103 }
104 read = 1;
105
106 cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg,
107 partno);
108 if (cb->read.cancelled) {
109 zmq_msg_close(&msg);
110 frrzmq_check_events(cbp, &cb->write,
111 ZMQ_POLLOUT);
112 cb->read.thread = NULL;
113 if (cb->write.cancelled && !cb->write.thread)
114 XFREE(MTYPE_ZEROMQ_CB, cb);
115 return 0;
116 }
117
118 /* cb_part may have read additional parts of the
119 * message; don't use zmq_msg_more here */
120 moresz = sizeof(more);
121 more = 0;
122 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more,
123 &moresz);
124 if (ret < 0) {
125 zmq_msg_close(&msg);
126 goto out_err;
127 }
128
129 partno++;
130 } while (more);
131 zmq_msg_close(&msg);
132 }
133
134 if (read)
135 frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT);
136
137 funcname_thread_add_read_write(
138 THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd,
139 &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line);
140 return 0;
141
142 out_err:
143 zlog_err("ZeroMQ read error: %s(%d)", strerror(errno), errno);
144 if (cb->read.cb_error)
145 cb->read.cb_error(cb->read.arg, cb->zmqsock);
146 return 1;
147 }
148
149 int funcname_frrzmq_thread_add_read(struct thread_master *master,
150 void (*msgfunc)(void *arg, void *zmqsock),
151 void (*partfunc)(void *arg, void *zmqsock,
152 zmq_msg_t *msg,
153 unsigned partnum),
154 void (*errfunc)(void *arg, void *zmqsock),
155 void *arg, void *zmqsock,
156 struct frrzmq_cb **cbp, debugargdef)
157 {
158 int fd, events;
159 size_t len;
160 struct frrzmq_cb *cb;
161
162 if (!cbp)
163 return -1;
164 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
165 return -1;
166 len = sizeof(fd);
167 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
168 return -1;
169 len = sizeof(events);
170 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
171 return -1;
172
173 if (*cbp)
174 cb = *cbp;
175 else {
176 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
177 if (!cb)
178 return -1;
179
180 cb->write.cancelled = 1;
181 *cbp = cb;
182 }
183
184 cb->zmqsock = zmqsock;
185 cb->fd = fd;
186 cb->read.arg = arg;
187 cb->read.cb_msg = msgfunc;
188 cb->read.cb_part = partfunc;
189 cb->read.cb_error = errfunc;
190 cb->read.cancelled = 0;
191
192 if (events & ZMQ_POLLIN) {
193 if (cb->read.thread) {
194 thread_cancel(cb->read.thread);
195 cb->read.thread = NULL;
196 }
197 funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd,
198 &cb->read.thread, funcname, schedfrom,
199 fromln);
200 } else
201 funcname_thread_add_read_write(
202 THREAD_READ, master, frrzmq_read_msg, cbp, fd,
203 &cb->read.thread, funcname, schedfrom, fromln);
204 return 0;
205 }
206
207 static int frrzmq_write_msg(struct thread *t)
208 {
209 struct frrzmq_cb **cbp = THREAD_ARG(t);
210 struct frrzmq_cb *cb;
211 unsigned char written = 0;
212 int ret;
213
214 if (!cbp)
215 return 1;
216 cb = (*cbp);
217 if (!cb || !cb->zmqsock)
218 return 1;
219
220 while (1) {
221 zmq_pollitem_t polli = {.socket = cb->zmqsock,
222 .events = ZMQ_POLLOUT};
223 ret = zmq_poll(&polli, 1, 0);
224
225 if (ret < 0)
226 goto out_err;
227
228 if (!(polli.revents & ZMQ_POLLOUT))
229 break;
230
231 if (cb->write.cb_msg) {
232 cb->write.cb_msg(cb->write.arg, cb->zmqsock);
233 written = 1;
234
235 if (cb->write.cancelled) {
236 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
237 cb->write.thread = NULL;
238 if (cb->read.cancelled && !cb->read.thread)
239 XFREE(MTYPE_ZEROMQ_CB, cb);
240 return 0;
241 }
242 continue;
243 }
244 }
245
246 if (written)
247 frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN);
248
249 funcname_thread_add_read_write(THREAD_WRITE, t->master,
250 frrzmq_write_msg, cbp, cb->fd,
251 &cb->write.thread, t->funcname,
252 t->schedfrom, t->schedfrom_line);
253 return 0;
254
255 out_err:
256 zlog_err("ZeroMQ write error: %s(%d)", strerror(errno), errno);
257 if (cb->write.cb_error)
258 cb->write.cb_error(cb->write.arg, cb->zmqsock);
259 return 1;
260 }
261 int funcname_frrzmq_thread_add_write(struct thread_master *master,
262 void (*msgfunc)(void *arg, void *zmqsock),
263 void (*errfunc)(void *arg, void *zmqsock),
264 void *arg, void *zmqsock,
265 struct frrzmq_cb **cbp, debugargdef)
266 {
267 int fd, events;
268 size_t len;
269 struct frrzmq_cb *cb;
270
271 if (!cbp)
272 return -1;
273 if (!msgfunc)
274 return -1;
275 len = sizeof(fd);
276 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
277 return -1;
278 len = sizeof(events);
279 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
280 return -1;
281
282 if (*cbp)
283 cb = *cbp;
284 else {
285 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
286 if (!cb)
287 return -1;
288
289 cb->read.cancelled = 1;
290 *cbp = cb;
291 }
292
293 cb->zmqsock = zmqsock;
294 cb->fd = fd;
295 cb->write.arg = arg;
296 cb->write.cb_msg = msgfunc;
297 cb->write.cb_part = NULL;
298 cb->write.cb_error = errfunc;
299 cb->write.cancelled = 0;
300
301 if (events & ZMQ_POLLOUT) {
302 if (cb->write.thread) {
303 thread_cancel(cb->write.thread);
304 cb->write.thread = NULL;
305 }
306 funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd,
307 &cb->write.thread, funcname,
308 schedfrom, fromln);
309 } else
310 funcname_thread_add_read_write(
311 THREAD_WRITE, master, frrzmq_write_msg, cbp, fd,
312 &cb->write.thread, funcname, schedfrom, fromln);
313 return 0;
314 }
315
316 void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core)
317 {
318 if (!cb || !*cb)
319 return;
320 core->cancelled = 1;
321 if (core->thread) {
322 thread_cancel(core->thread);
323 core->thread = NULL;
324 }
325 if ((*cb)->read.cancelled && !(*cb)->read.thread
326 && (*cb)->write.cancelled && (*cb)->write.thread)
327 XFREE(MTYPE_ZEROMQ_CB, *cb);
328 }
329
330 void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core,
331 int event)
332 {
333 struct frrzmq_cb *cb;
334 int events;
335 size_t len;
336
337 if (!cbp)
338 return;
339 cb = (*cbp);
340 if (!cb || !cb->zmqsock)
341 return;
342
343 len = sizeof(events);
344 if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len))
345 return;
346 if (events & event && core->thread && !core->cancelled) {
347 struct thread_master *tm = core->thread->master;
348 thread_cancel(core->thread);
349 core->thread = NULL;
350 thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg
351 : frrzmq_write_msg),
352 cbp, cb->fd, &core->thread);
353 }
354 }