]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
zebra: data plane plugin for FPM netlink
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22#include <arpa/inet.h>
23
24#include <sys/types.h>
25#include <sys/socket.h>
26
27#include <errno.h>
28#include <string.h>
29
30#include "config.h" /* Include this explicitly */
31#include "lib/zebra.h"
32#include "lib/libfrr.h"
33#include "lib/memory.h"
34#include "lib/network.h"
35#include "lib/ns.h"
36#include "lib/frr_pthread.h"
37#include "zebra/zebra_dplane.h"
38#include "zebra/kernel_netlink.h"
39#include "zebra/rt_netlink.h"
40#include "zebra/debug.h"
41
42#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
43#define SOUTHBOUND_DEFAULT_PORT 2620
44
45static const char *prov_name = "dplane_fpm_nl";
46
47struct fpm_nl_ctx {
48 /* data plane connection. */
49 int socket;
50 bool connecting;
51 struct sockaddr_storage addr;
52
53 /* data plane buffers. */
54 struct stream *ibuf;
55 struct stream *obuf;
56 pthread_mutex_t obuf_mutex;
57
58 /* data plane events. */
59 struct frr_pthread *fthread;
60 struct thread *t_connect;
61 struct thread *t_read;
62 struct thread *t_write;
63};
64
65/*
66 * FPM functions.
67 */
68static int fpm_connect(struct thread *t);
69
70static void fpm_reconnect(struct fpm_nl_ctx *fnc)
71{
72 /* Grab the lock to empty the stream and stop the zebra thread. */
73 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
74
75 close(fnc->socket);
76 fnc->socket = -1;
77 stream_reset(fnc->ibuf);
78 stream_reset(fnc->obuf);
79 THREAD_OFF(fnc->t_read);
80 THREAD_OFF(fnc->t_write);
81 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
82 &fnc->t_connect);
83}
84
85static int fpm_read(struct thread *t)
86{
87 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
88 ssize_t rv;
89
90 /* Let's ignore the input at the moment. */
91 rv = stream_read_try(fnc->ibuf, fnc->socket,
92 STREAM_WRITEABLE(fnc->ibuf));
93 if (rv == 0) {
94 zlog_debug("%s: connection closed", __func__);
95 fpm_reconnect(fnc);
96 return 0;
97 }
98 if (rv == -1) {
99 if (errno == EAGAIN || errno == EWOULDBLOCK
100 || errno == EINTR)
101 return 0;
102
103 zlog_debug("%s: connection failure: %s", __func__,
104 strerror(errno));
105 fpm_reconnect(fnc);
106 return 0;
107 }
108 stream_reset(fnc->ibuf);
109
110 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
111 &fnc->t_read);
112
113 return 0;
114}
115
116static int fpm_write(struct thread *t)
117{
118 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
119 socklen_t statuslen;
120 ssize_t bwritten;
121 int rv, status;
122 size_t btotal;
123
124 if (fnc->connecting == true) {
125 status = 0;
126 statuslen = sizeof(status);
127
128 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
129 &statuslen);
130 if (rv == -1 || status != 0) {
131 if (rv != -1)
132 zlog_debug("%s: connection failed: %s",
133 __func__, strerror(status));
134 else
135 zlog_debug("%s: SO_ERROR failed: %s", __func__,
136 strerror(status));
137
138 fpm_reconnect(fnc);
139 return 0;
140 }
141
142 fnc->connecting = false;
143 }
144
145 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
146
147 while (true) {
148 /* Stream is empty: reset pointers and return. */
149 if (STREAM_READABLE(fnc->obuf) == 0) {
150 stream_reset(fnc->obuf);
151 break;
152 }
153
154 /* Try to write all at once. */
155 btotal = stream_get_endp(fnc->obuf) -
156 stream_get_getp(fnc->obuf);
157 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
158 if (bwritten == 0) {
159 zlog_debug("%s: connection closed", __func__);
160 break;
161 }
162 if (bwritten == -1) {
163 if (errno == EAGAIN || errno == EWOULDBLOCK
164 || errno == EINTR)
165 break;
166
167 zlog_debug("%s: connection failure: %s", __func__,
168 strerror(errno));
169 fpm_reconnect(fnc);
170 break;
171 }
172
173 stream_forward_getp(fnc->obuf, (size_t)bwritten);
174 }
175
176 /* Stream is not empty yet, we must schedule more writes. */
177 if (STREAM_READABLE(fnc->obuf)) {
178 thread_add_write(fnc->fthread->master, fpm_write, fnc,
179 fnc->socket, &fnc->t_write);
180 return 0;
181 }
182
183 return 0;
184}
185
186static int fpm_connect(struct thread *t)
187{
188 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
189 struct sockaddr_in *sin;
190 int rv, sock;
191 char addrstr[INET6_ADDRSTRLEN];
192
193 sock = socket(AF_INET, SOCK_STREAM, 0);
194 if (sock == -1) {
195 zlog_err("%s: fpm connection failed: %s", __func__,
196 strerror(errno));
197 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
198 &fnc->t_connect);
199 return 0;
200 }
201
202 set_nonblocking(sock);
203
204 sin = (struct sockaddr_in *)&fnc->addr;
205 memset(sin, 0, sizeof(*sin));
206 sin->sin_family = AF_INET;
207 sin->sin_addr.s_addr = htonl(SOUTHBOUND_DEFAULT_ADDR);
208 sin->sin_port = htons(SOUTHBOUND_DEFAULT_PORT);
209#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
210 sin->sin_len = sizeof(sin);
211#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
212
213 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
214 zlog_debug("%s: attempting to connect to %s:%d", __func__, addrstr,
215 ntohs(sin->sin_port));
216
217 rv = connect(sock, (struct sockaddr *)sin, sizeof(*sin));
218 if (rv == -1 && errno != EINPROGRESS) {
219 close(sock);
220 zlog_warn("%s: fpm connection failed: %s", __func__,
221 strerror(errno));
222 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
223 &fnc->t_connect);
224 return 0;
225 }
226
227 fnc->connecting = (errno == EINPROGRESS);
228 fnc->socket = sock;
229 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
230 &fnc->t_read);
231 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
232 &fnc->t_write);
233
234 return 0;
235}
236
237/**
238 * Encode data plane operation context into netlink and enqueue it in the FPM
239 * output buffer.
240 *
241 * @param fnc the netlink FPM context.
242 * @param ctx the data plane operation context data.
243 * @return 0 on success or -1 on not enough space.
244 */
245static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
246{
247 uint8_t nl_buf[NL_PKT_BUF_SIZE];
248 size_t nl_buf_len;
249 ssize_t rv;
250
251 nl_buf_len = 0;
252
253 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
254
255 switch (dplane_ctx_get_op(ctx)) {
256 case DPLANE_OP_ROUTE_UPDATE:
257 case DPLANE_OP_ROUTE_DELETE:
258 rv = netlink_route_multipath(RTM_DELROUTE, ctx, nl_buf,
259 sizeof(nl_buf));
260 if (rv <= 0) {
261 zlog_debug("%s: netlink_route_multipath failed",
262 __func__);
263 return 0;
264 }
265
266 nl_buf_len = (size_t)rv;
267 if (STREAM_WRITEABLE(fnc->obuf) < nl_buf_len) {
268 zlog_debug("%s: not enough output buffer (%ld vs %lu)",
269 __func__, STREAM_WRITEABLE(fnc->obuf),
270 nl_buf_len);
271 return -1;
272 }
273
274 /* UPDATE operations need a INSTALL, otherwise just quit. */
275 if (dplane_ctx_get_op(ctx) == DPLANE_OP_ROUTE_DELETE)
276 break;
277
278 /* FALL THROUGH */
279 case DPLANE_OP_ROUTE_INSTALL:
280 rv = netlink_route_multipath(RTM_NEWROUTE, ctx,
281 &nl_buf[nl_buf_len],
282 sizeof(nl_buf) - nl_buf_len);
283 if (rv <= 0) {
284 zlog_debug("%s: netlink_route_multipath failed",
285 __func__);
286 return 0;
287 }
288
289 nl_buf_len += (size_t)rv;
290 if (STREAM_WRITEABLE(fnc->obuf) < nl_buf_len) {
291 zlog_debug("%s: not enough output buffer (%ld vs %lu)",
292 __func__, STREAM_WRITEABLE(fnc->obuf),
293 nl_buf_len);
294 return -1;
295 }
296 break;
297
298 case DPLANE_OP_NH_INSTALL:
299 case DPLANE_OP_NH_UPDATE:
300 case DPLANE_OP_NH_DELETE:
301 case DPLANE_OP_LSP_INSTALL:
302 case DPLANE_OP_LSP_UPDATE:
303 case DPLANE_OP_LSP_DELETE:
304 case DPLANE_OP_PW_INSTALL:
305 case DPLANE_OP_PW_UNINSTALL:
306 case DPLANE_OP_ADDR_INSTALL:
307 case DPLANE_OP_ADDR_UNINSTALL:
308 case DPLANE_OP_MAC_INSTALL:
309 case DPLANE_OP_MAC_DELETE:
310 case DPLANE_OP_NEIGH_INSTALL:
311 case DPLANE_OP_NEIGH_UPDATE:
312 case DPLANE_OP_NEIGH_DELETE:
313 case DPLANE_OP_VTEP_ADD:
314 case DPLANE_OP_VTEP_DELETE:
315 case DPLANE_OP_SYS_ROUTE_ADD:
316 case DPLANE_OP_SYS_ROUTE_DELETE:
317 case DPLANE_OP_ROUTE_NOTIFY:
318 case DPLANE_OP_LSP_NOTIFY:
319 case DPLANE_OP_NONE:
320 break;
321
322 default:
323 zlog_debug("%s: unhandled data plane message (%d) %s",
324 __func__, dplane_ctx_get_op(ctx),
325 dplane_op2str(dplane_ctx_get_op(ctx)));
326 break;
327 }
328
329 /* Skip empty enqueues. */
330 if (nl_buf_len == 0)
331 return 0;
332
333 /*
334 * FPM header:
335 * {
336 * version: 1 byte (always 1),
337 * type: 1 byte (1 for netlink, 2 protobuf),
338 * len: 2 bytes (network order),
339 * }
340 */
341 stream_putc(fnc->obuf, 1);
342 stream_putc(fnc->obuf, 1);
343 assert(nl_buf_len < UINT16_MAX);
344 stream_putw(fnc->obuf, nl_buf_len + 4);
345
346 /* Write current data. */
347 stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
348
349 /* Tell the thread to start writing. */
350 thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
351 &fnc->t_write);
352
353 return 0;
354}
355
356/*
357 * Data plane functions.
358 */
359static int fpm_nl_start(struct zebra_dplane_provider *prov)
360{
361 struct fpm_nl_ctx *fnc;
362
363 fnc = dplane_provider_get_data(prov);
364 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
365 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
366 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
367 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
368 pthread_mutex_init(&fnc->obuf_mutex, NULL);
369 fnc->socket = -1;
370
371 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 1,
372 &fnc->t_connect);
373
374 return 0;
375}
376
377static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
378{
379 struct fpm_nl_ctx *fnc;
380
381 fnc = dplane_provider_get_data(prov);
382 stream_free(fnc->ibuf);
383 stream_free(fnc->obuf);
384 close(fnc->socket);
385
386 return 0;
387}
388
389static int fpm_nl_process(struct zebra_dplane_provider *prov)
390{
391 struct zebra_dplane_ctx *ctx;
392 struct fpm_nl_ctx *fnc;
393 int counter, limit;
394
395 fnc = dplane_provider_get_data(prov);
396 limit = dplane_provider_get_work_limit(prov);
397 for (counter = 0; counter < limit; counter++) {
398 ctx = dplane_provider_dequeue_in_ctx(prov);
399 if (ctx == NULL)
400 break;
401
402 /*
403 * Skip all notifications if not connected, we'll walk the RIB
404 * anyway.
405 */
406 if (fnc->socket != -1 && fnc->connecting == false)
407 fpm_nl_enqueue(fnc, ctx);
408
409 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
410 dplane_provider_enqueue_out_ctx(prov, ctx);
411 }
412
413 return 0;
414}
415
416static int fpm_nl_new(struct thread_master *tm)
417{
418 struct zebra_dplane_provider *prov = NULL;
419 struct fpm_nl_ctx *fnc;
420 int rv;
421
422 fnc = calloc(1, sizeof(*fnc));
423 rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
424 DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
425 fpm_nl_process, fpm_nl_finish, fnc,
426 &prov);
427
428 if (IS_ZEBRA_DEBUG_DPLANE)
429 zlog_debug("%s register status: %d", prov_name, rv);
430
431 return 0;
432}
433
434static int fpm_nl_init(void)
435{
436 hook_register(frr_late_init, fpm_nl_new);
437 return 0;
438}
439
440FRR_MODULE_SETUP(
441 .name = "dplane_fpm_nl",
442 .version = "0.0.1",
443 .description = "Data plane plugin for FPM using netlink.",
444 .init = fpm_nl_init,
445 )