]>
Commit | Line | Data |
---|---|---|
b6116506 DL |
1 | /* |
2 | * libzebra ZeroMQ bindings | |
3 | * Copyright (C) 2015 David Lamparter | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify it | |
6 | * under the terms of the GNU General Public License as published by the Free | |
7 | * Software Foundation; either version 2 of the License, or (at your option) | |
8 | * any later version. | |
9 | * | |
10 | * This program is distributed in the hope that it will be useful, but WITHOUT | |
11 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
12 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
13 | * more details. | |
14 | * | |
15 | * You should have received a copy of the GNU General Public License along | |
16 | * with this program; see the file COPYING; if not, write to the Free Software | |
17 | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | */ | |
19 | ||
20 | #include <zebra.h> | |
21 | #include <zmq.h> | |
22 | ||
23 | #include "thread.h" | |
24 | #include "memory.h" | |
25 | #include "frr_zmq.h" | |
26 | #include "log.h" | |
35774357 | 27 | #include "lib_errors.h" |
b6116506 DL |
28 | |
29 | DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback") | |
30 | ||
31 | /* libzmq's context */ | |
32 | void *frrzmq_context = NULL; | |
33 | static unsigned frrzmq_initcount = 0; | |
34 | ||
35 | void frrzmq_init(void) | |
36 | { | |
37 | if (frrzmq_initcount++ == 0) { | |
38 | frrzmq_context = zmq_ctx_new(); | |
39 | zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1); | |
40 | } | |
41 | } | |
42 | ||
43 | void frrzmq_finish(void) | |
44 | { | |
45 | if (--frrzmq_initcount == 0) { | |
46 | zmq_ctx_term(frrzmq_context); | |
47 | frrzmq_context = NULL; | |
48 | } | |
49 | } | |
50 | ||
b6116506 DL |
51 | static int frrzmq_read_msg(struct thread *t) |
52 | { | |
afd0f10d | 53 | struct frrzmq_cb **cbp = THREAD_ARG(t); |
54 | struct frrzmq_cb *cb; | |
b6116506 DL |
55 | zmq_msg_t msg; |
56 | unsigned partno; | |
afd0f10d | 57 | unsigned char read = 0; |
b6116506 DL |
58 | int ret, more; |
59 | size_t moresz; | |
60 | ||
afd0f10d | 61 | if (!cbp) |
62 | return 1; | |
63 | cb = (*cbp); | |
64 | if (!cb || !cb->zmqsock) | |
65 | return 1; | |
66 | ||
b6116506 | 67 | while (1) { |
afd0f10d | 68 | zmq_pollitem_t polli = {.socket = cb->zmqsock, |
69 | .events = ZMQ_POLLIN}; | |
b6116506 DL |
70 | ret = zmq_poll(&polli, 1, 0); |
71 | ||
72 | if (ret < 0) | |
73 | goto out_err; | |
afd0f10d | 74 | |
b6116506 DL |
75 | if (!(polli.revents & ZMQ_POLLIN)) |
76 | break; | |
77 | ||
afd0f10d | 78 | if (cb->read.cb_msg) { |
79 | cb->read.cb_msg(cb->read.arg, cb->zmqsock); | |
80 | read = 1; | |
b6116506 | 81 | |
afd0f10d | 82 | if (cb->read.cancelled) { |
83 | frrzmq_check_events(cbp, &cb->write, | |
84 | ZMQ_POLLOUT); | |
85 | cb->read.thread = NULL; | |
86 | if (cb->write.cancelled && !cb->write.thread) | |
87 | XFREE(MTYPE_ZEROMQ_CB, cb); | |
b6116506 DL |
88 | return 0; |
89 | } | |
90 | continue; | |
91 | } | |
92 | ||
93 | partno = 0; | |
94 | if (zmq_msg_init(&msg)) | |
95 | goto out_err; | |
96 | do { | |
97 | ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK); | |
98 | if (ret < 0) { | |
99 | if (errno == EAGAIN) | |
100 | break; | |
101 | ||
102 | zmq_msg_close(&msg); | |
103 | goto out_err; | |
104 | } | |
afd0f10d | 105 | read = 1; |
b6116506 | 106 | |
afd0f10d | 107 | cb->read.cb_part(cb->read.arg, cb->zmqsock, &msg, |
108 | partno); | |
109 | if (cb->read.cancelled) { | |
b6116506 | 110 | zmq_msg_close(&msg); |
afd0f10d | 111 | frrzmq_check_events(cbp, &cb->write, |
112 | ZMQ_POLLOUT); | |
113 | cb->read.thread = NULL; | |
114 | if (cb->write.cancelled && !cb->write.thread) | |
115 | XFREE(MTYPE_ZEROMQ_CB, cb); | |
b6116506 DL |
116 | return 0; |
117 | } | |
118 | ||
119 | /* cb_part may have read additional parts of the | |
120 | * message; don't use zmq_msg_more here */ | |
121 | moresz = sizeof(more); | |
122 | more = 0; | |
afd0f10d | 123 | ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, &more, |
124 | &moresz); | |
b6116506 DL |
125 | if (ret < 0) { |
126 | zmq_msg_close(&msg); | |
127 | goto out_err; | |
128 | } | |
129 | ||
130 | partno++; | |
131 | } while (more); | |
132 | zmq_msg_close(&msg); | |
133 | } | |
134 | ||
afd0f10d | 135 | if (read) |
136 | frrzmq_check_events(cbp, &cb->write, ZMQ_POLLOUT); | |
137 | ||
138 | funcname_thread_add_read_write( | |
139 | THREAD_READ, t->master, frrzmq_read_msg, cbp, cb->fd, | |
140 | &cb->read.thread, t->funcname, t->schedfrom, t->schedfrom_line); | |
b6116506 DL |
141 | return 0; |
142 | ||
143 | out_err: | |
450971aa | 144 | flog_err(EC_LIB_ZMQ, "ZeroMQ read error: %s(%d)", strerror(errno), |
1c50c1c0 | 145 | errno); |
afd0f10d | 146 | if (cb->read.cb_error) |
147 | cb->read.cb_error(cb->read.arg, cb->zmqsock); | |
148 | return 1; | |
b6116506 DL |
149 | } |
150 | ||
afd0f10d | 151 | int funcname_frrzmq_thread_add_read(struct thread_master *master, |
152 | void (*msgfunc)(void *arg, void *zmqsock), | |
153 | void (*partfunc)(void *arg, void *zmqsock, | |
154 | zmq_msg_t *msg, | |
155 | unsigned partnum), | |
156 | void (*errfunc)(void *arg, void *zmqsock), | |
157 | void *arg, void *zmqsock, | |
158 | struct frrzmq_cb **cbp, debugargdef) | |
b6116506 DL |
159 | { |
160 | int fd, events; | |
161 | size_t len; | |
162 | struct frrzmq_cb *cb; | |
163 | ||
afd0f10d | 164 | if (!cbp) |
165 | return -1; | |
b6116506 | 166 | if (!(msgfunc || partfunc) || (msgfunc && partfunc)) |
afd0f10d | 167 | return -1; |
168 | len = sizeof(fd); | |
169 | if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) | |
170 | return -1; | |
171 | len = sizeof(events); | |
172 | if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) | |
173 | return -1; | |
174 | ||
175 | if (*cbp) | |
176 | cb = *cbp; | |
177 | else { | |
178 | cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); | |
afd0f10d | 179 | if (!cb) |
180 | return -1; | |
6d10727a | 181 | |
182 | cb->write.cancelled = 1; | |
afd0f10d | 183 | *cbp = cb; |
184 | } | |
185 | ||
186 | cb->zmqsock = zmqsock; | |
187 | cb->fd = fd; | |
188 | cb->read.arg = arg; | |
189 | cb->read.cb_msg = msgfunc; | |
190 | cb->read.cb_part = partfunc; | |
191 | cb->read.cb_error = errfunc; | |
192 | cb->read.cancelled = 0; | |
193 | ||
194 | if (events & ZMQ_POLLIN) { | |
195 | if (cb->read.thread) { | |
196 | thread_cancel(cb->read.thread); | |
197 | cb->read.thread = NULL; | |
198 | } | |
199 | funcname_thread_add_event(master, frrzmq_read_msg, cbp, fd, | |
200 | &cb->read.thread, funcname, schedfrom, | |
201 | fromln); | |
202 | } else | |
203 | funcname_thread_add_read_write( | |
204 | THREAD_READ, master, frrzmq_read_msg, cbp, fd, | |
205 | &cb->read.thread, funcname, schedfrom, fromln); | |
206 | return 0; | |
207 | } | |
208 | ||
209 | static int frrzmq_write_msg(struct thread *t) | |
210 | { | |
211 | struct frrzmq_cb **cbp = THREAD_ARG(t); | |
212 | struct frrzmq_cb *cb; | |
213 | unsigned char written = 0; | |
214 | int ret; | |
215 | ||
216 | if (!cbp) | |
217 | return 1; | |
218 | cb = (*cbp); | |
219 | if (!cb || !cb->zmqsock) | |
220 | return 1; | |
221 | ||
222 | while (1) { | |
223 | zmq_pollitem_t polli = {.socket = cb->zmqsock, | |
224 | .events = ZMQ_POLLOUT}; | |
225 | ret = zmq_poll(&polli, 1, 0); | |
226 | ||
227 | if (ret < 0) | |
228 | goto out_err; | |
229 | ||
230 | if (!(polli.revents & ZMQ_POLLOUT)) | |
231 | break; | |
232 | ||
233 | if (cb->write.cb_msg) { | |
234 | cb->write.cb_msg(cb->write.arg, cb->zmqsock); | |
235 | written = 1; | |
236 | ||
237 | if (cb->write.cancelled) { | |
238 | frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); | |
239 | cb->write.thread = NULL; | |
240 | if (cb->read.cancelled && !cb->read.thread) | |
241 | XFREE(MTYPE_ZEROMQ_CB, cb); | |
242 | return 0; | |
243 | } | |
244 | continue; | |
245 | } | |
246 | } | |
247 | ||
248 | if (written) | |
249 | frrzmq_check_events(cbp, &cb->read, ZMQ_POLLIN); | |
250 | ||
251 | funcname_thread_add_read_write(THREAD_WRITE, t->master, | |
252 | frrzmq_write_msg, cbp, cb->fd, | |
253 | &cb->write.thread, t->funcname, | |
254 | t->schedfrom, t->schedfrom_line); | |
255 | return 0; | |
256 | ||
257 | out_err: | |
450971aa | 258 | flog_err(EC_LIB_ZMQ, "ZeroMQ write error: %s(%d)", strerror(errno), |
1c50c1c0 | 259 | errno); |
afd0f10d | 260 | if (cb->write.cb_error) |
261 | cb->write.cb_error(cb->write.arg, cb->zmqsock); | |
262 | return 1; | |
263 | } | |
264 | int funcname_frrzmq_thread_add_write(struct thread_master *master, | |
265 | void (*msgfunc)(void *arg, void *zmqsock), | |
266 | void (*errfunc)(void *arg, void *zmqsock), | |
267 | void *arg, void *zmqsock, | |
268 | struct frrzmq_cb **cbp, debugargdef) | |
269 | { | |
270 | int fd, events; | |
271 | size_t len; | |
272 | struct frrzmq_cb *cb; | |
273 | ||
274 | if (!cbp) | |
275 | return -1; | |
276 | if (!msgfunc) | |
277 | return -1; | |
b6116506 DL |
278 | len = sizeof(fd); |
279 | if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) | |
afd0f10d | 280 | return -1; |
b6116506 DL |
281 | len = sizeof(events); |
282 | if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) | |
afd0f10d | 283 | return -1; |
b6116506 | 284 | |
afd0f10d | 285 | if (*cbp) |
286 | cb = *cbp; | |
287 | else { | |
288 | cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); | |
afd0f10d | 289 | if (!cb) |
290 | return -1; | |
6d10727a | 291 | |
292 | cb->read.cancelled = 1; | |
afd0f10d | 293 | *cbp = cb; |
294 | } | |
b6116506 | 295 | |
b6116506 | 296 | cb->zmqsock = zmqsock; |
b6116506 | 297 | cb->fd = fd; |
afd0f10d | 298 | cb->write.arg = arg; |
299 | cb->write.cb_msg = msgfunc; | |
300 | cb->write.cb_part = NULL; | |
301 | cb->write.cb_error = errfunc; | |
302 | cb->write.cancelled = 0; | |
303 | ||
304 | if (events & ZMQ_POLLOUT) { | |
305 | if (cb->write.thread) { | |
306 | thread_cancel(cb->write.thread); | |
307 | cb->write.thread = NULL; | |
308 | } | |
309 | funcname_thread_add_event(master, frrzmq_write_msg, cbp, fd, | |
310 | &cb->write.thread, funcname, | |
311 | schedfrom, fromln); | |
312 | } else | |
313 | funcname_thread_add_read_write( | |
314 | THREAD_WRITE, master, frrzmq_write_msg, cbp, fd, | |
315 | &cb->write.thread, funcname, schedfrom, fromln); | |
316 | return 0; | |
317 | } | |
b6116506 | 318 | |
afd0f10d | 319 | void frrzmq_thread_cancel(struct frrzmq_cb **cb, struct cb_core *core) |
320 | { | |
321 | if (!cb || !*cb) | |
322 | return; | |
323 | core->cancelled = 1; | |
324 | if (core->thread) { | |
325 | thread_cancel(core->thread); | |
326 | core->thread = NULL; | |
327 | } | |
328 | if ((*cb)->read.cancelled && !(*cb)->read.thread | |
329 | && (*cb)->write.cancelled && (*cb)->write.thread) | |
330 | XFREE(MTYPE_ZEROMQ_CB, *cb); | |
b6116506 DL |
331 | } |
332 | ||
afd0f10d | 333 | void frrzmq_check_events(struct frrzmq_cb **cbp, struct cb_core *core, |
334 | int event) | |
b6116506 | 335 | { |
afd0f10d | 336 | struct frrzmq_cb *cb; |
337 | int events; | |
338 | size_t len; | |
339 | ||
340 | if (!cbp) | |
341 | return; | |
342 | cb = (*cbp); | |
343 | if (!cb || !cb->zmqsock) | |
344 | return; | |
345 | ||
81b8afcf | 346 | len = sizeof(events); |
afd0f10d | 347 | if (zmq_getsockopt(cb->zmqsock, ZMQ_EVENTS, &events, &len)) |
b6116506 | 348 | return; |
afd0f10d | 349 | if (events & event && core->thread && !core->cancelled) { |
350 | struct thread_master *tm = core->thread->master; | |
351 | thread_cancel(core->thread); | |
352 | core->thread = NULL; | |
353 | thread_add_event(tm, (event == ZMQ_POLLIN ? frrzmq_read_msg | |
354 | : frrzmq_write_msg), | |
355 | cbp, cb->fd, &core->thread); | |
b6116506 | 356 | } |
b6116506 | 357 | } |