]>
Commit | Line | Data |
---|---|---|
b6116506 DL |
1 | /* |
2 | * libzebra ZeroMQ bindings | |
3 | * Copyright (C) 2015 David Lamparter | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify it | |
6 | * under the terms of the GNU General Public License as published by the Free | |
7 | * Software Foundation; either version 2 of the License, or (at your option) | |
8 | * any later version. | |
9 | * | |
10 | * This program is distributed in the hope that it will be useful, but WITHOUT | |
11 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
12 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
13 | * more details. | |
14 | * | |
15 | * You should have received a copy of the GNU General Public License along | |
16 | * with this program; see the file COPYING; if not, write to the Free Software | |
17 | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | */ | |
19 | ||
20 | #include <zebra.h> | |
21 | #include <zmq.h> | |
22 | ||
23 | #include "thread.h" | |
24 | #include "memory.h" | |
25 | #include "frr_zmq.h" | |
26 | #include "log.h" | |
27 | ||
28 | DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback") | |
29 | ||
30 | /* libzmq's context */ | |
31 | void *frrzmq_context = NULL; | |
32 | static unsigned frrzmq_initcount = 0; | |
33 | ||
34 | void frrzmq_init(void) | |
35 | { | |
36 | if (frrzmq_initcount++ == 0) { | |
37 | frrzmq_context = zmq_ctx_new(); | |
38 | zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1); | |
39 | } | |
40 | } | |
41 | ||
42 | void frrzmq_finish(void) | |
43 | { | |
44 | if (--frrzmq_initcount == 0) { | |
45 | zmq_ctx_term(frrzmq_context); | |
46 | frrzmq_context = NULL; | |
47 | } | |
48 | } | |
49 | ||
50 | /* read callback integration */ | |
51 | struct frrzmq_cb { | |
52 | struct thread *thread; | |
53 | void *zmqsock; | |
54 | void *arg; | |
55 | int fd; | |
56 | ||
57 | bool cancelled; | |
58 | ||
59 | void (*cb_msg)(void *arg, void *zmqsock); | |
60 | void (*cb_part)(void *arg, void *zmqsock, | |
61 | zmq_msg_t *msg, unsigned partnum); | |
62 | }; | |
63 | ||
64 | ||
65 | static int frrzmq_read_msg(struct thread *t) | |
66 | { | |
67 | struct frrzmq_cb *cb = THREAD_ARG(t); | |
68 | zmq_msg_t msg; | |
69 | unsigned partno; | |
70 | int ret, more; | |
71 | size_t moresz; | |
72 | ||
73 | while (1) { | |
74 | zmq_pollitem_t polli = { | |
75 | .socket = cb->zmqsock, | |
76 | .events = ZMQ_POLLIN | |
77 | }; | |
78 | ret = zmq_poll(&polli, 1, 0); | |
79 | ||
80 | if (ret < 0) | |
81 | goto out_err; | |
82 | if (!(polli.revents & ZMQ_POLLIN)) | |
83 | break; | |
84 | ||
85 | if (cb->cb_msg) { | |
86 | cb->cb_msg(cb->arg, cb->zmqsock); | |
87 | ||
88 | if (cb->cancelled) { | |
89 | XFREE(MTYPE_ZEROMQ_CB, cb); | |
90 | return 0; | |
91 | } | |
92 | continue; | |
93 | } | |
94 | ||
95 | partno = 0; | |
96 | if (zmq_msg_init(&msg)) | |
97 | goto out_err; | |
98 | do { | |
99 | ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK); | |
100 | if (ret < 0) { | |
101 | if (errno == EAGAIN) | |
102 | break; | |
103 | ||
104 | zmq_msg_close(&msg); | |
105 | goto out_err; | |
106 | } | |
107 | ||
108 | cb->cb_part(cb->arg, cb->zmqsock, &msg, partno); | |
109 | if (cb->cancelled) { | |
110 | zmq_msg_close(&msg); | |
111 | XFREE(MTYPE_ZEROMQ_CB, cb); | |
112 | return 0; | |
113 | } | |
114 | ||
115 | /* cb_part may have read additional parts of the | |
116 | * message; don't use zmq_msg_more here */ | |
117 | moresz = sizeof(more); | |
118 | more = 0; | |
119 | ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE, | |
120 | &more, &moresz); | |
121 | if (ret < 0) { | |
122 | zmq_msg_close(&msg); | |
123 | goto out_err; | |
124 | } | |
125 | ||
126 | partno++; | |
127 | } while (more); | |
128 | zmq_msg_close(&msg); | |
129 | } | |
130 | ||
131 | funcname_thread_add_read_write(THREAD_READ, t->master, frrzmq_read_msg, | |
132 | cb, cb->fd, &cb->thread, t->funcname, t->schedfrom, | |
133 | t->schedfrom_line); | |
134 | return 0; | |
135 | ||
136 | out_err: | |
137 | zlog_err("ZeroMQ error: %s(%d)", strerror (errno), errno); | |
138 | return 0; | |
139 | } | |
140 | ||
141 | struct frrzmq_cb *funcname_frrzmq_thread_add_read( | |
142 | struct thread_master *master, | |
143 | void (*msgfunc)(void *arg, void *zmqsock), | |
144 | void (*partfunc)(void *arg, void *zmqsock, | |
145 | zmq_msg_t *msg, unsigned partnum), | |
146 | void *arg, void *zmqsock, debugargdef) | |
147 | { | |
148 | int fd, events; | |
149 | size_t len; | |
150 | struct frrzmq_cb *cb; | |
151 | ||
152 | if (!(msgfunc || partfunc) || (msgfunc && partfunc)) | |
153 | return NULL; | |
154 | len = sizeof(fd); | |
155 | if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len)) | |
156 | return NULL; | |
157 | len = sizeof(events); | |
158 | if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len)) | |
159 | return NULL; | |
160 | ||
161 | cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb)); | |
162 | if (!cb) | |
163 | return NULL; | |
164 | ||
165 | cb->arg = arg; | |
166 | cb->zmqsock = zmqsock; | |
167 | cb->cb_msg = msgfunc; | |
168 | cb->cb_part = partfunc; | |
169 | cb->fd = fd; | |
170 | ||
171 | if (events & ZMQ_POLLIN) | |
172 | funcname_thread_add_event(master, | |
173 | frrzmq_read_msg, cb, fd, &cb->thread, | |
174 | funcname, schedfrom, fromln); | |
175 | else | |
176 | funcname_thread_add_read_write(THREAD_READ, master, | |
177 | frrzmq_read_msg, cb, fd, &cb->thread, | |
178 | funcname, schedfrom, fromln); | |
179 | return cb; | |
180 | } | |
181 | ||
182 | void frrzmq_thread_cancel(struct frrzmq_cb *cb) | |
183 | { | |
184 | if (!cb->thread) { | |
185 | /* canceling from within callback */ | |
186 | cb->cancelled = 1; | |
187 | return; | |
188 | } | |
189 | thread_cancel(cb->thread); | |
190 | XFREE(MTYPE_ZEROMQ_CB, cb); | |
191 | } |