]>
Commit | Line | Data |
---|---|---|
b6116506 DL |
1 | /* |
2 | * libzebra ZeroMQ bindings | |
3 | * Copyright (C) 2015 David Lamparter | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify it | |
6 | * under the terms of the GNU General Public License as published by the Free | |
7 | * Software Foundation; either version 2 of the License, or (at your option) | |
8 | * any later version. | |
9 | * | |
10 | * This program is distributed in the hope that it will be useful, but WITHOUT | |
11 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
12 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
13 | * more details. | |
14 | * | |
15 | * You should have received a copy of the GNU General Public License along | |
16 | * with this program; see the file COPYING; if not, write to the Free Software | |
17 | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | */ | |
19 | ||
fb1df4cd DS |
20 | /* |
21 | * IF YOU MODIFY THIS FILE PLEASE RUN `make check` and ensure that | |
214d8a60 | 22 | * the test_zmq.c unit test is still working. There are dependencies |
fb1df4cd DS |
23 | * between the two that are extremely fragile. My understanding |
24 | * is that there is specialized ownership of the cb pointer based | |
25 | * upon what is happening. Those assumptions are supposed to be | |
26 | * tested in the test_zmq.c | |
27 | */ | |
b6116506 DL |
28 | #include <zebra.h> |
29 | #include <zmq.h> | |
30 | ||
31 | #include "thread.h" | |
32 | #include "memory.h" | |
33 | #include "frr_zmq.h" | |
34 | #include "log.h" | |
35774357 | 35 | #include "lib_errors.h" |
b6116506 | 36 | |
bf8d3d6a | 37 | DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback"); |
b6116506 DL |
38 | |
/*
 * Process-wide libzmq context handle.  It is created by the first
 * frrzmq_init() call and reset to NULL by the matching last
 * frrzmq_finish() call.
 */
void *frrzmq_context = NULL;

/* Number of frrzmq_init() calls not yet balanced by frrzmq_finish(). */
static unsigned int frrzmq_initcount = 0;
42 | ||
43 | void frrzmq_init(void) | |
44 | { | |
45 | if (frrzmq_initcount++ == 0) { | |
46 | frrzmq_context = zmq_ctx_new(); | |
47 | zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1); | |
48 | } | |
49 | } | |
50 | ||
51 | void frrzmq_finish(void) | |
52 | { | |
53 | if (--frrzmq_initcount == 0) { | |
54 | zmq_ctx_term(frrzmq_context); | |
55 | frrzmq_context = NULL; | |
56 | } | |
57 | } | |
58 | ||
cc9f21da | 59 | static void frrzmq_read_msg(struct thread *t) |
b6116506 | 60 | { |
afd0f10d | 61 | struct frrzmq_cb **cbp = THREAD_ARG(t); |
62 | struct frrzmq_cb *cb; | |
b6116506 DL |
63 | zmq_msg_t msg; |
64 | unsigned partno; | |
afd0f10d | 65 | unsigned char read = 0; |
b6116506 DL |
66 | int ret, more; |
67 | size_t moresz; | |
68 | ||
afd0f10d | 69 | if (!cbp) |
cc9f21da | 70 | return; |
afd0f10d | 71 | cb = (*cbp); |
72 | if (!cb || !cb->zmqsock) | |
cc9f21da | 73 | return; |
afd0f10d | 74 | |
b6116506 | 75 | while (1) { |
afd0f10d | 76 | zmq_pollitem_t polli = {.socket = cb->zmqsock, |
77 | .events = ZMQ_POLLIN}; | |
b6116506 DL |
78 | ret = zmq_poll(&polli, 1, 0); |
79 | ||
80 | if (ret < 0) | |
81 | goto out_err; | |
afd0f10d | 82 | |
b6116506 DL |
83 | if (!(polli.revents & ZMQ_POLLIN)) |
84 | break; | |
85 | ||
afd0f10d | 86 | if (cb->read.cb_msg) { |
8fd5502b | 87 | cb->in_cb = true; |
afd0f10d | 88 | cb->read.cb_msg(cb->read.arg, cb->zmqsock); |
8fd5502b MS |
89 | cb->in_cb = false; |
90 | ||
afd0f10d | 91 | read = 1; |
b6116506 | 92 | |
afd0f10d | 93 | if (cb->read.cancelled) { |
94 | frrzmq_check_events(cbp, &cb->write, | |
95 | ZMQ_POLLOUT); | |
96 | cb->read.thread = NULL; | |
97 | if (cb->write.cancelled && !cb->write.thread) | |
cf2182c0 MS |
98 | XFREE(MTYPE_ZEROMQ_CB, *cbp); |
99 | ||
cc9f21da | 100 | return; |
b6116506 DL |
101 | } |
102 | continue; | |
103 | } | |
104 | ||
105 | partno = 0; | |
106 | if (zmq_msg_init(&msg)) | |
107 | goto out_err; | |
108 | do { | |
109 | ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK); | |
110 | if (ret < 0) { | |
111 | if (errno == EAGAIN) | |
112 | break; | |
113 | ||
114 | zmq_msg_close(&msg); | |
115 | goto out_err; | |
116 | } | |
afd0f10d | 117 | read = 1; |
b6116506 | 118 | |
8fd5502b | 119 | cb->in_cb = true; |
afd0f10d | 120 | cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg, |
121 | partno); | |
8fd5502b MS |
122 | cb->in_cb = false; |
123 | ||
afd0f10d | 124 | if (cb->read.cancelled) { |
b6116506 | 125 | zmq_msg_close(&msg); |
afd0f10d | 126 | frrzmq_check_events(cbp, &cb->write, |
127 | ZMQ_POLLOUT); | |
128 | cb->read.thread = NULL; | |
129 | if (cb->write.cancelled && !cb->write.thread) | |
cf2182c0 MS |
130 | XFREE(MTYPE_ZEROMQ_CB, *cbp); |
131 | ||
cc9f21da | 132 | return; |
b6116506 DL |
133 | } |
134 | ||
135 | /* cb_part may have read additional parts of the | |
136 | * message; don't use zmq_msg_more here */ | |
137 | moresz = sizeof(more); | |
138 | more = 0; | |
afd0f10d | 139 | ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more, |
140 | &moresz); | |
b6116506 DL |
141 | if (ret < 0) { |
142 | zmq_msg_close(&msg); | |
143 | goto out_err; | |
144 | } | |
145 | ||
146 | partno++; | |
147 | } while (more); | |
148 | zmq_msg_close(&msg); | |
149 | } | |
150 | ||
afd0f10d | 151 | if (read) |
152 | frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT); | |
153 | ||
1dffe357 MS |
154 | thread_add_read(t->master, frrzmq_read_msg, cbp, |
155 | cb->fd, &cb->read.thread); | |
cc9f21da | 156 | return; |
b6116506 DL |
157 | |
158 | out_err: | |
450971aa | 159 | flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno), |
1c50c1c0 | 160 | errno); |
afd0f10d | 161 | if (cb->read.cb_error) |
162 | cb->read.cb_error(cb->read.arg, cb->zmqsock); | |
b6116506 DL |
163 | } |
164 | ||
60a3efec DL |
165 | int _frrzmq_thread_add_read(const struct xref_threadsched *xref, |
166 | struct thread_master *master, | |
167 | void (*msgfunc)(void *arg, void *zmqsock), | |
168 | void (*partfunc)(void *arg, void *zmqsock, | |
169 | zmq_msg_t *msg, unsigned partnum), | |
170 | void (*errfunc)(void *arg, void *zmqsock), | |
171 | void *arg, void *zmqsock, | |
172 | struct frrzmq_cb **cbp) | |
b6116506 DL |
173 | { |
174 | int fd, events; | |
175 | size_t len; | |
176 | struct frrzmq_cb *cb; | |
177 | ||
afd0f10d | 178 | if (!cbp) |
179 | return -1; | |
b6116506 | 180 | if (!(msgfunc || partfunc) || (msgfunc && partfunc)) |
afd0f10d | 181 | return -1; |
182 | len = sizeof(fd); | |
183 | if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) | |
184 | return -1; | |
185 | len = sizeof(events); | |
186 | if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) | |
187 | return -1; | |
188 | ||
189 | if (*cbp) | |
190 | cb = *cbp; | |
191 | else { | |
192 | cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); | |
08c2d52a | 193 | cb->write.cancelled = true; |
afd0f10d | 194 | *cbp = cb; |
195 | } | |
196 | ||
197 | cb->zmqsock = zmqsock; | |
198 | cb->fd = fd; | |
199 | cb->read.arg = arg; | |
200 | cb->read.cb_msg = msgfunc; | |
201 | cb->read.cb_part = partfunc; | |
202 | cb->read.cb_error = errfunc; | |
08c2d52a | 203 | cb->read.cancelled = false; |
8fd5502b | 204 | cb->in_cb = false; |
afd0f10d | 205 | |
206 | if (events & ZMQ_POLLIN) { | |
b3d6bc6e MS |
207 | thread_cancel(&cb->read.thread); |
208 | ||
1dffe357 | 209 | thread_add_event(master, frrzmq_read_msg, cbp, fd, |
60a3efec | 210 | &cb->read.thread); |
afd0f10d | 211 | } else |
1dffe357 MS |
212 | thread_add_read(master, frrzmq_read_msg, cbp, fd, |
213 | &cb->read.thread); | |
afd0f10d | 214 | return 0; |
215 | } | |
216 | ||
cc9f21da | 217 | static void frrzmq_write_msg(struct thread *t) |
afd0f10d | 218 | { |
219 | struct frrzmq_cb **cbp = THREAD_ARG(t); | |
220 | struct frrzmq_cb *cb; | |
221 | unsigned char written = 0; | |
222 | int ret; | |
223 | ||
224 | if (!cbp) | |
cc9f21da | 225 | return; |
afd0f10d | 226 | cb = (*cbp); |
227 | if (!cb || !cb->zmqsock) | |
cc9f21da | 228 | return; |
afd0f10d | 229 | |
230 | while (1) { | |
231 | zmq_pollitem_t polli = {.socket = cb->zmqsock, | |
232 | .events = ZMQ_POLLOUT}; | |
233 | ret = zmq_poll(&polli, 1, 0); | |
234 | ||
235 | if (ret < 0) | |
236 | goto out_err; | |
237 | ||
238 | if (!(polli.revents & ZMQ_POLLOUT)) | |
239 | break; | |
240 | ||
241 | if (cb->write.cb_msg) { | |
8fd5502b | 242 | cb->in_cb = true; |
afd0f10d | 243 | cb->write.cb_msg(cb->write.arg, cb->zmqsock); |
8fd5502b MS |
244 | cb->in_cb = false; |
245 | ||
afd0f10d | 246 | written = 1; |
247 | ||
248 | if (cb->write.cancelled) { | |
249 | frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); | |
250 | cb->write.thread = NULL; | |
251 | if (cb->read.cancelled && !cb->read.thread) | |
cf2182c0 MS |
252 | XFREE(MTYPE_ZEROMQ_CB, *cbp); |
253 | ||
cc9f21da | 254 | return; |
afd0f10d | 255 | } |
256 | continue; | |
257 | } | |
258 | } | |
259 | ||
260 | if (written) | |
261 | frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); | |
262 | ||
1dffe357 MS |
263 | thread_add_write(t->master, frrzmq_write_msg, cbp, |
264 | cb->fd, &cb->write.thread); | |
cc9f21da | 265 | return; |
afd0f10d | 266 | |
267 | out_err: | |
450971aa | 268 | flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno), |
1c50c1c0 | 269 | errno); |
afd0f10d | 270 | if (cb->write.cb_error) |
271 | cb->write.cb_error(cb->write.arg, cb->zmqsock); | |
afd0f10d | 272 | } |
60a3efec DL |
273 | |
274 | int _frrzmq_thread_add_write(const struct xref_threadsched *xref, | |
275 | struct thread_master *master, | |
276 | void (*msgfunc)(void *arg, void *zmqsock), | |
277 | void (*errfunc)(void *arg, void *zmqsock), | |
278 | void *arg, void *zmqsock, struct frrzmq_cb **cbp) | |
afd0f10d | 279 | { |
280 | int fd, events; | |
281 | size_t len; | |
282 | struct frrzmq_cb *cb; | |
283 | ||
284 | if (!cbp) | |
285 | return -1; | |
286 | if (!msgfunc) | |
287 | return -1; | |
b6116506 DL |
288 | len = sizeof(fd); |
289 | if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) | |
afd0f10d | 290 | return -1; |
b6116506 DL |
291 | len = sizeof(events); |
292 | if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) | |
afd0f10d | 293 | return -1; |
b6116506 | 294 | |
afd0f10d | 295 | if (*cbp) |
296 | cb = *cbp; | |
297 | else { | |
298 | cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); | |
08c2d52a | 299 | cb->read.cancelled = true; |
afd0f10d | 300 | *cbp = cb; |
301 | } | |
b6116506 | 302 | |
b6116506 | 303 | cb->zmqsock = zmqsock; |
b6116506 | 304 | cb->fd = fd; |
afd0f10d | 305 | cb->write.arg = arg; |
306 | cb->write.cb_msg = msgfunc; | |
307 | cb->write.cb_part = NULL; | |
308 | cb->write.cb_error = errfunc; | |
08c2d52a | 309 | cb->write.cancelled = false; |
8fd5502b | 310 | cb->in_cb = false; |
afd0f10d | 311 | |
312 | if (events & ZMQ_POLLOUT) { | |
b3d6bc6e MS |
313 | thread_cancel(&cb->write.thread); |
314 | ||
60a3efec DL |
315 | _thread_add_event(xref, master, frrzmq_write_msg, cbp, fd, |
316 | &cb->write.thread); | |
afd0f10d | 317 | } else |
1dffe357 MS |
318 | thread_add_write(master, frrzmq_write_msg, cbp, fd, |
319 | &cb->write.thread); | |
afd0f10d | 320 | return 0; |
321 | } | |
b6116506 | 322 | |
afd0f10d | 323 | void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core) |
324 | { | |
325 | if (!cb || !*cb) | |
326 | return; | |
08c2d52a | 327 | core->cancelled = true; |
b3d6bc6e MS |
328 | thread_cancel(&core->thread); |
329 | ||
8fd5502b MS |
330 | /* If cancelled from within a callback, don't try to free memory |
331 | * in this path. | |
fb1df4cd | 332 | */ |
8fd5502b MS |
333 | if ((*cb)->in_cb) |
334 | return; | |
335 | ||
336 | /* Ok to free the callback context if no more ... context. */ | |
afd0f10d | 337 | if ((*cb)->read.cancelled && !(*cb)->read.thread |
8fd5502b | 338 | && (*cb)->write.cancelled && ((*cb)->write.thread == NULL)) |
afd0f10d | 339 | XFREE(MTYPE_ZEROMQ_CB, *cb); |
b6116506 DL |
340 | } |
341 | ||
afd0f10d | 342 | void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core, |
343 | int event) | |
b6116506 | 344 | { |
afd0f10d | 345 | struct frrzmq_cb *cb; |
346 | int events; | |
347 | size_t len; | |
348 | ||
349 | if (!cbp) | |
350 | return; | |
351 | cb = (*cbp); | |
352 | if (!cb || !cb->zmqsock) | |
353 | return; | |
354 | ||
81b8afcf | 355 | len = sizeof(events); |
afd0f10d | 356 | if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len)) |
b6116506 | 357 | return; |
e08165de | 358 | if ((events & event) && core->thread && !core->cancelled) { |
afd0f10d | 359 | struct thread_master *tm = core->thread->master; |
e08165de | 360 | |
b3d6bc6e MS |
361 | thread_cancel(&core->thread); |
362 | ||
e08165de MS |
363 | if (event == ZMQ_POLLIN) |
364 | thread_add_event(tm, frrzmq_read_msg, | |
365 | cbp, cb->fd, &core->thread); | |
366 | else | |
367 | thread_add_event(tm, frrzmq_write_msg, | |
368 | cbp, cb->fd, &core->thread); | |
b6116506 | 369 | } |
b6116506 | 370 | } |