]> git.proxmox.com Git - mirror_frr.git/blob - lib/frr_zmq.c
Merge pull request #1059 from opensourcerouting/oldbits-1
[mirror_frr.git] / lib / frr_zmq.c
1 /*
2 * libzebra ZeroMQ bindings
3 * Copyright (C) 2015 David Lamparter
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20 #include <zebra.h>
21 #include <zmq.h>
22
23 #include "thread.h"
24 #include "memory.h"
25 #include "frr_zmq.h"
26 #include "log.h"
27
28 DEFINE_MTYPE_STATIC(LIB, ZEROMQ_CB, "ZeroMQ callback")
29
30 /* libzmq's context */
31 void *frrzmq_context = NULL;
32 static unsigned frrzmq_initcount = 0;
33
34 void frrzmq_init(void)
35 {
36 if (frrzmq_initcount++ == 0) {
37 frrzmq_context = zmq_ctx_new();
38 zmq_ctx_set(frrzmq_context, ZMQ_IPV6, 1);
39 }
40 }
41
42 void frrzmq_finish(void)
43 {
44 if (--frrzmq_initcount == 0) {
45 zmq_ctx_term(frrzmq_context);
46 frrzmq_context = NULL;
47 }
48 }
49
50 /* read callback integration */
51 struct frrzmq_cb {
52 struct thread *thread;
53 void *zmqsock;
54 void *arg;
55 int fd;
56
57 bool cancelled;
58
59 void (*cb_msg)(void *arg, void *zmqsock);
60 void (*cb_part)(void *arg, void *zmqsock,
61 zmq_msg_t *msg, unsigned partnum);
62 };
63
64
65 static int frrzmq_read_msg(struct thread *t)
66 {
67 struct frrzmq_cb *cb = THREAD_ARG(t);
68 zmq_msg_t msg;
69 unsigned partno;
70 int ret, more;
71 size_t moresz;
72
73 while (1) {
74 zmq_pollitem_t polli = {
75 .socket = cb->zmqsock,
76 .events = ZMQ_POLLIN
77 };
78 ret = zmq_poll(&polli, 1, 0);
79
80 if (ret < 0)
81 goto out_err;
82 if (!(polli.revents & ZMQ_POLLIN))
83 break;
84
85 if (cb->cb_msg) {
86 cb->cb_msg(cb->arg, cb->zmqsock);
87
88 if (cb->cancelled) {
89 XFREE(MTYPE_ZEROMQ_CB, cb);
90 return 0;
91 }
92 continue;
93 }
94
95 partno = 0;
96 if (zmq_msg_init(&msg))
97 goto out_err;
98 do {
99 ret = zmq_msg_recv(&msg, cb->zmqsock, ZMQ_NOBLOCK);
100 if (ret < 0) {
101 if (errno == EAGAIN)
102 break;
103
104 zmq_msg_close(&msg);
105 goto out_err;
106 }
107
108 cb->cb_part(cb->arg, cb->zmqsock, &msg, partno);
109 if (cb->cancelled) {
110 zmq_msg_close(&msg);
111 XFREE(MTYPE_ZEROMQ_CB, cb);
112 return 0;
113 }
114
115 /* cb_part may have read additional parts of the
116 * message; don't use zmq_msg_more here */
117 moresz = sizeof(more);
118 more = 0;
119 ret = zmq_getsockopt(cb->zmqsock, ZMQ_RCVMORE,
120 &more, &moresz);
121 if (ret < 0) {
122 zmq_msg_close(&msg);
123 goto out_err;
124 }
125
126 partno++;
127 } while (more);
128 zmq_msg_close(&msg);
129 }
130
131 funcname_thread_add_read_write(THREAD_READ, t->master, frrzmq_read_msg,
132 cb, cb->fd, &cb->thread, t->funcname, t->schedfrom,
133 t->schedfrom_line);
134 return 0;
135
136 out_err:
137 zlog_err("ZeroMQ error: %s(%d)", strerror (errno), errno);
138 return 0;
139 }
140
141 struct frrzmq_cb *funcname_frrzmq_thread_add_read(
142 struct thread_master *master,
143 void (*msgfunc)(void *arg, void *zmqsock),
144 void (*partfunc)(void *arg, void *zmqsock,
145 zmq_msg_t *msg, unsigned partnum),
146 void *arg, void *zmqsock, debugargdef)
147 {
148 int fd, events;
149 size_t len;
150 struct frrzmq_cb *cb;
151
152 if (!(msgfunc || partfunc) || (msgfunc && partfunc))
153 return NULL;
154 len = sizeof(fd);
155 if (zmq_getsockopt(zmqsock, ZMQ_FD, &fd, &len))
156 return NULL;
157 len = sizeof(events);
158 if (zmq_getsockopt(zmqsock, ZMQ_EVENTS, &events, &len))
159 return NULL;
160
161 cb = XCALLOC(MTYPE_ZEROMQ_CB, sizeof(struct frrzmq_cb));
162 if (!cb)
163 return NULL;
164
165 cb->arg = arg;
166 cb->zmqsock = zmqsock;
167 cb->cb_msg = msgfunc;
168 cb->cb_part = partfunc;
169 cb->fd = fd;
170
171 if (events & ZMQ_POLLIN)
172 funcname_thread_add_event(master,
173 frrzmq_read_msg, cb, fd, &cb->thread,
174 funcname, schedfrom, fromln);
175 else
176 funcname_thread_add_read_write(THREAD_READ, master,
177 frrzmq_read_msg, cb, fd, &cb->thread,
178 funcname, schedfrom, fromln);
179 return cb;
180 }
181
182 void frrzmq_thread_cancel(struct frrzmq_cb *cb)
183 {
184 if (!cb->thread) {
185 /* canceling from within callback */
186 cb->cancelled = 1;
187 return;
188 }
189 thread_cancel(cb->thread);
190 XFREE(MTYPE_ZEROMQ_CB, cb);
191 }