]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
*: remove cmd_node->vtysh
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22#include <arpa/inet.h>
23
24#include <sys/types.h>
25#include <sys/socket.h>
26
27#include <errno.h>
28#include <string.h>
29
30#include "config.h" /* Include this explicitly */
31#include "lib/zebra.h"
6cc059cd 32#include "lib/json.h"
d35f447d 33#include "lib/libfrr.h"
c871e6c9 34#include "lib/frratomic.h"
3bdd7fca 35#include "lib/command.h"
d35f447d
RZ
36#include "lib/memory.h"
37#include "lib/network.h"
38#include "lib/ns.h"
39#include "lib/frr_pthread.h"
e5e444d8 40#include "zebra/debug.h"
bda10adf 41#include "zebra/interface.h"
d35f447d 42#include "zebra/zebra_dplane.h"
018e77bc 43#include "zebra/zebra_router.h"
bda10adf 44#include "zebra/zebra_vxlan_private.h"
d35f447d
RZ
45#include "zebra/kernel_netlink.h"
46#include "zebra/rt_netlink.h"
47#include "zebra/debug.h"
48
/* Default FPM server endpoint. Only the port is referenced in this file. */
#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
#define SOUTHBOUND_DEFAULT_PORT 2620

/**
 * FPM header layout, prepended to every message whatever its format:
 *
 *   version: 1 byte  (always 1)
 *   type:    1 byte  (1 for netlink, 2 for protobuf)
 *   len:     2 bytes (network byte order)
 *
 * `len` tells the receiver how many bytes to expect; fpm_nl_enqueue() writes
 * the total message size, header included.
 */
#define FPM_HEADER_SIZE 4

/* Name of the data plane provider and of the FPM pthread. */
static const char *prov_name = "dplane_fpm_nl";
66
67struct fpm_nl_ctx {
68 /* data plane connection. */
69 int socket;
3bdd7fca 70 bool disabled;
d35f447d 71 bool connecting;
018e77bc 72 bool rib_complete;
bda10adf 73 bool rmac_complete;
d35f447d
RZ
74 struct sockaddr_storage addr;
75
76 /* data plane buffers. */
77 struct stream *ibuf;
78 struct stream *obuf;
79 pthread_mutex_t obuf_mutex;
80
ba803a2f
RZ
81 /*
82 * data plane context queue:
83 * When a FPM server connection becomes a bottleneck, we must keep the
84 * data plane contexts until we get a chance to process them.
85 */
86 struct dplane_ctx_q ctxqueue;
87 pthread_mutex_t ctxqueue_mutex;
88
d35f447d 89 /* data plane events. */
ba803a2f 90 struct zebra_dplane_provider *prov;
d35f447d
RZ
91 struct frr_pthread *fthread;
92 struct thread *t_connect;
93 struct thread *t_read;
94 struct thread *t_write;
3bdd7fca 95 struct thread *t_event;
ba803a2f 96 struct thread *t_dequeue;
018e77bc
RZ
97
98 /* zebra events. */
99 struct thread *t_ribreset;
100 struct thread *t_ribwalk;
bda10adf
RZ
101 struct thread *t_rmacreset;
102 struct thread *t_rmacwalk;
6cc059cd
RZ
103
104 /* Statistic counters. */
105 struct {
106 /* Amount of bytes read into ibuf. */
770a8d28 107 _Atomic uint32_t bytes_read;
6cc059cd 108 /* Amount of bytes written from obuf. */
770a8d28 109 _Atomic uint32_t bytes_sent;
ad4d1022 110 /* Output buffer current usage. */
770a8d28 111 _Atomic uint32_t obuf_bytes;
ad4d1022 112 /* Output buffer peak usage. */
770a8d28 113 _Atomic uint32_t obuf_peak;
6cc059cd
RZ
114
115 /* Amount of connection closes. */
770a8d28 116 _Atomic uint32_t connection_closes;
6cc059cd 117 /* Amount of connection errors. */
770a8d28 118 _Atomic uint32_t connection_errors;
6cc059cd
RZ
119
120 /* Amount of user configurations: FNE_RECONNECT. */
770a8d28 121 _Atomic uint32_t user_configures;
6cc059cd 122 /* Amount of user disable requests: FNE_DISABLE. */
770a8d28 123 _Atomic uint32_t user_disables;
6cc059cd
RZ
124
125 /* Amount of data plane context processed. */
770a8d28 126 _Atomic uint32_t dplane_contexts;
ba803a2f 127 /* Amount of data plane contexts enqueued. */
770a8d28 128 _Atomic uint32_t ctxqueue_len;
ba803a2f 129 /* Peak amount of data plane contexts enqueued. */
770a8d28 130 _Atomic uint32_t ctxqueue_len_peak;
6cc059cd
RZ
131
132 /* Amount of buffer full events. */
770a8d28 133 _Atomic uint32_t buffer_full;
6cc059cd 134 } counters;
770a8d28 135} *gfnc;
3bdd7fca
RZ
136
/* Events dispatched by fpm_process_event() on the FPM pthread. */
enum fpm_nl_events {
	FNE_RECONNECT,      /* (Re)connect to the configured external server. */
	FNE_DISABLE,        /* Disable FPM. */
	FNE_RESET_COUNTERS, /* Reset the statistic counters. */
};
145
018e77bc
RZ
/*
 * Forward declarations: these are referenced (mostly as thread callbacks)
 * before their definitions.
 */
static int fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);

/* RIB and RMAC walkers run by the zebra main pthread. */
static int fpm_rib_send(struct thread *t);
static int fpm_rib_reset(struct thread *t);
static int fpm_rmac_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);
018e77bc 155
ad4d1022
RZ
156/*
157 * Helper functions.
158 */
159
160/**
161 * Reorganizes the data on the buffer so it can fit more data.
162 *
163 * @param s stream pointer.
164 */
165static void stream_pulldown(struct stream *s)
166{
167 size_t rlen = STREAM_READABLE(s);
168
169 /* No more data, so just move the pointers. */
170 if (rlen == 0) {
171 stream_reset(s);
172 return;
173 }
174
175 /* Move the available data to the beginning. */
176 memmove(s->data, &s->data[s->getp], rlen);
177 s->getp = 0;
178 s->endp = rlen;
179}
180
3bdd7fca
RZ
181/*
182 * CLI.
183 */
6cc059cd
RZ
184#define FPM_STR "Forwarding Plane Manager configuration\n"
185
3bdd7fca
RZ
186DEFUN(fpm_set_address, fpm_set_address_cmd,
187 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
6cc059cd 188 FPM_STR
3bdd7fca
RZ
189 "FPM remote listening server address\n"
190 "Remote IPv4 FPM server\n"
191 "Remote IPv6 FPM server\n"
192 "FPM remote listening server port\n"
193 "Remote FPM server port\n")
194{
195 struct sockaddr_in *sin;
196 struct sockaddr_in6 *sin6;
197 uint16_t port = 0;
198 uint8_t naddr[INET6_BUFSIZ];
199
200 if (argc == 5)
201 port = strtol(argv[4]->arg, NULL, 10);
202
203 /* Handle IPv4 addresses. */
204 if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
205 sin = (struct sockaddr_in *)&gfnc->addr;
206
207 memset(sin, 0, sizeof(*sin));
208 sin->sin_family = AF_INET;
209 sin->sin_port =
210 port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
211#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
212 sin->sin_len = sizeof(*sin);
213#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
214 memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));
215
216 goto ask_reconnect;
217 }
218
219 /* Handle IPv6 addresses. */
220 if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
221 vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
222 return CMD_WARNING;
223 }
224
225 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
226 memset(sin6, 0, sizeof(*sin6));
227 sin6->sin6_family = AF_INET6;
228 sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
229#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
230 sin6->sin6_len = sizeof(*sin6);
231#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
232 memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));
233
234ask_reconnect:
235 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
236 FNE_RECONNECT, &gfnc->t_event);
237 return CMD_SUCCESS;
238}
239
240DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
241 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
242 NO_STR
6cc059cd 243 FPM_STR
3bdd7fca
RZ
244 "FPM remote listening server address\n"
245 "Remote IPv4 FPM server\n"
246 "Remote IPv6 FPM server\n"
247 "FPM remote listening server port\n"
248 "Remote FPM server port\n")
249{
250 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
251 FNE_DISABLE, &gfnc->t_event);
252 return CMD_SUCCESS;
253}
254
6cc059cd
RZ
255DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
256 "clear fpm counters",
257 CLEAR_STR
258 FPM_STR
259 "FPM statistic counters\n")
260{
261 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
262 FNE_RESET_COUNTERS, &gfnc->t_event);
263 return CMD_SUCCESS;
264}
265
266DEFUN(fpm_show_counters, fpm_show_counters_cmd,
267 "show fpm counters",
268 SHOW_STR
269 FPM_STR
270 "FPM statistic counters\n")
271{
272 vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");
273
274#define SHOW_COUNTER(label, counter) \
770a8d28 275 vty_out(vty, "%28s: %u\n", (label), (counter))
6cc059cd
RZ
276
277 SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
278 SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
ad4d1022
RZ
279 SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
280 SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
6cc059cd
RZ
281 SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
282 SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
283 SHOW_COUNTER("Data plane items processed",
284 gfnc->counters.dplane_contexts);
ba803a2f
RZ
285 SHOW_COUNTER("Data plane items enqueued",
286 gfnc->counters.ctxqueue_len);
287 SHOW_COUNTER("Data plane items queue peak",
288 gfnc->counters.ctxqueue_len_peak);
6cc059cd
RZ
289 SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
290 SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
291 SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
292
293#undef SHOW_COUNTER
294
295 return CMD_SUCCESS;
296}
297
298DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
299 "show fpm counters json",
300 SHOW_STR
301 FPM_STR
302 "FPM statistic counters\n"
303 JSON_STR)
304{
305 struct json_object *jo;
306
307 jo = json_object_new_object();
308 json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
309 json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
ad4d1022
RZ
310 json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
311 json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
a50404aa
RZ
312 json_object_int_add(jo, "connection-closes",
313 gfnc->counters.connection_closes);
314 json_object_int_add(jo, "connection-errors",
315 gfnc->counters.connection_errors);
316 json_object_int_add(jo, "data-plane-contexts",
317 gfnc->counters.dplane_contexts);
ba803a2f
RZ
318 json_object_int_add(jo, "data-plane-contexts-queue",
319 gfnc->counters.ctxqueue_len);
320 json_object_int_add(jo, "data-plane-contexts-queue-peak",
321 gfnc->counters.ctxqueue_len_peak);
6cc059cd 322 json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
a50404aa
RZ
323 json_object_int_add(jo, "user-configures",
324 gfnc->counters.user_configures);
6cc059cd
RZ
325 json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
326 vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
327 json_object_free(jo);
328
329 return CMD_SUCCESS;
330}
331
3bdd7fca
RZ
332static int fpm_write_config(struct vty *vty)
333{
334 struct sockaddr_in *sin;
335 struct sockaddr_in6 *sin6;
336 int written = 0;
337 char addrstr[INET6_ADDRSTRLEN];
338
339 if (gfnc->disabled)
340 return written;
341
342 switch (gfnc->addr.ss_family) {
343 case AF_INET:
344 written = 1;
345 sin = (struct sockaddr_in *)&gfnc->addr;
346 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
347 vty_out(vty, "fpm address %s", addrstr);
348 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
349 vty_out(vty, " port %d", ntohs(sin->sin_port));
350
351 vty_out(vty, "\n");
352 break;
353 case AF_INET6:
354 written = 1;
355 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
356 inet_ntop(AF_INET, &sin6->sin6_addr, addrstr, sizeof(addrstr));
357 vty_out(vty, "fpm address %s", addrstr);
358 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
359 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
360
361 vty_out(vty, "\n");
362 break;
363
364 default:
365 break;
366 }
367
368 return written;
369}
370
371struct cmd_node fpm_node = {
372 .node = VTY_NODE,
373 .prompt = "",
3bdd7fca
RZ
374};
375
d35f447d
RZ
376/*
377 * FPM functions.
378 */
379static int fpm_connect(struct thread *t);
380
381static void fpm_reconnect(struct fpm_nl_ctx *fnc)
382{
383 /* Grab the lock to empty the stream and stop the zebra thread. */
384 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
385
3bdd7fca
RZ
386 /* Avoid calling close on `-1`. */
387 if (fnc->socket != -1) {
388 close(fnc->socket);
389 fnc->socket = -1;
390 }
391
d35f447d
RZ
392 stream_reset(fnc->ibuf);
393 stream_reset(fnc->obuf);
394 THREAD_OFF(fnc->t_read);
395 THREAD_OFF(fnc->t_write);
018e77bc
RZ
396
397 if (fnc->t_ribreset)
398 thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
399 if (fnc->t_ribwalk)
400 thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
bda10adf
RZ
401 if (fnc->t_rmacreset)
402 thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
403 if (fnc->t_rmacwalk)
404 thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);
018e77bc 405
3bdd7fca
RZ
406 /* FPM is disabled, don't attempt to connect. */
407 if (fnc->disabled)
408 return;
409
d35f447d
RZ
410 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
411 &fnc->t_connect);
412}
413
414static int fpm_read(struct thread *t)
415{
416 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
417 ssize_t rv;
418
419 /* Let's ignore the input at the moment. */
420 rv = stream_read_try(fnc->ibuf, fnc->socket,
421 STREAM_WRITEABLE(fnc->ibuf));
422 if (rv == 0) {
c871e6c9
RZ
423 atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
424 memory_order_relaxed);
e5e444d8
RZ
425
426 if (IS_ZEBRA_DEBUG_FPM)
427 zlog_debug("%s: connection closed", __func__);
428
d35f447d
RZ
429 fpm_reconnect(fnc);
430 return 0;
431 }
432 if (rv == -1) {
433 if (errno == EAGAIN || errno == EWOULDBLOCK
434 || errno == EINTR)
435 return 0;
436
c871e6c9
RZ
437 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
438 memory_order_relaxed);
e5e444d8
RZ
439 zlog_warn("%s: connection failure: %s", __func__,
440 strerror(errno));
d35f447d
RZ
441 fpm_reconnect(fnc);
442 return 0;
443 }
444 stream_reset(fnc->ibuf);
445
6cc059cd 446 /* Account all bytes read. */
c871e6c9
RZ
447 atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
448 memory_order_relaxed);
6cc059cd 449
d35f447d
RZ
450 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
451 &fnc->t_read);
452
453 return 0;
454}
455
456static int fpm_write(struct thread *t)
457{
458 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
459 socklen_t statuslen;
460 ssize_t bwritten;
461 int rv, status;
462 size_t btotal;
463
464 if (fnc->connecting == true) {
465 status = 0;
466 statuslen = sizeof(status);
467
468 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
469 &statuslen);
470 if (rv == -1 || status != 0) {
471 if (rv != -1)
e5e444d8
RZ
472 zlog_warn("%s: connection failed: %s", __func__,
473 strerror(status));
d35f447d 474 else
e5e444d8
RZ
475 zlog_warn("%s: SO_ERROR failed: %s", __func__,
476 strerror(status));
d35f447d 477
c871e6c9
RZ
478 atomic_fetch_add_explicit(
479 &fnc->counters.connection_errors, 1,
480 memory_order_relaxed);
6cc059cd 481
d35f447d
RZ
482 fpm_reconnect(fnc);
483 return 0;
484 }
485
486 fnc->connecting = false;
018e77bc
RZ
487
488 /* Ask zebra main thread to start walking the RIB table. */
489 thread_add_timer(zrouter.master, fpm_rib_send, fnc, 0,
490 &fnc->t_ribwalk);
bda10adf
RZ
491 thread_add_timer(zrouter.master, fpm_rmac_send, fnc, 0,
492 &fnc->t_rmacwalk);
d35f447d
RZ
493 }
494
495 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
496
497 while (true) {
498 /* Stream is empty: reset pointers and return. */
499 if (STREAM_READABLE(fnc->obuf) == 0) {
500 stream_reset(fnc->obuf);
501 break;
502 }
503
504 /* Try to write all at once. */
505 btotal = stream_get_endp(fnc->obuf) -
506 stream_get_getp(fnc->obuf);
507 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
508 if (bwritten == 0) {
c871e6c9
RZ
509 atomic_fetch_add_explicit(
510 &fnc->counters.connection_closes, 1,
511 memory_order_relaxed);
e5e444d8
RZ
512
513 if (IS_ZEBRA_DEBUG_FPM)
514 zlog_debug("%s: connection closed", __func__);
d35f447d
RZ
515 break;
516 }
517 if (bwritten == -1) {
ad4d1022
RZ
518 /* Attempt to continue if blocked by a signal. */
519 if (errno == EINTR)
520 continue;
521 /* Receiver is probably slow, lets give it some time. */
522 if (errno == EAGAIN || errno == EWOULDBLOCK)
d35f447d
RZ
523 break;
524
c871e6c9
RZ
525 atomic_fetch_add_explicit(
526 &fnc->counters.connection_errors, 1,
527 memory_order_relaxed);
e5e444d8
RZ
528 zlog_warn("%s: connection failure: %s", __func__,
529 strerror(errno));
d35f447d
RZ
530 fpm_reconnect(fnc);
531 break;
532 }
533
6cc059cd 534 /* Account all bytes sent. */
c871e6c9
RZ
535 atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
536 memory_order_relaxed);
6cc059cd 537
ad4d1022 538 /* Account number of bytes free. */
c871e6c9
RZ
539 atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
540 memory_order_relaxed);
ad4d1022 541
d35f447d
RZ
542 stream_forward_getp(fnc->obuf, (size_t)bwritten);
543 }
544
545 /* Stream is not empty yet, we must schedule more writes. */
546 if (STREAM_READABLE(fnc->obuf)) {
ad4d1022 547 stream_pulldown(fnc->obuf);
d35f447d
RZ
548 thread_add_write(fnc->fthread->master, fpm_write, fnc,
549 fnc->socket, &fnc->t_write);
550 return 0;
551 }
552
553 return 0;
554}
555
556static int fpm_connect(struct thread *t)
557{
558 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
3bdd7fca
RZ
559 struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
560 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
561 socklen_t slen;
d35f447d
RZ
562 int rv, sock;
563 char addrstr[INET6_ADDRSTRLEN];
564
3bdd7fca 565 sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
d35f447d 566 if (sock == -1) {
6cc059cd 567 zlog_err("%s: fpm socket failed: %s", __func__,
d35f447d
RZ
568 strerror(errno));
569 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
570 &fnc->t_connect);
571 return 0;
572 }
573
574 set_nonblocking(sock);
575
3bdd7fca
RZ
576 if (fnc->addr.ss_family == AF_INET) {
577 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
578 slen = sizeof(*sin);
579 } else {
580 inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
581 slen = sizeof(*sin6);
582 }
d35f447d 583
e5e444d8
RZ
584 if (IS_ZEBRA_DEBUG_FPM)
585 zlog_debug("%s: attempting to connect to %s:%d", __func__,
586 addrstr, ntohs(sin->sin_port));
d35f447d 587
3bdd7fca 588 rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
d35f447d 589 if (rv == -1 && errno != EINPROGRESS) {
c871e6c9
RZ
590 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
591 memory_order_relaxed);
d35f447d
RZ
592 close(sock);
593 zlog_warn("%s: fpm connection failed: %s", __func__,
594 strerror(errno));
595 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
596 &fnc->t_connect);
597 return 0;
598 }
599
600 fnc->connecting = (errno == EINPROGRESS);
601 fnc->socket = sock;
602 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
603 &fnc->t_read);
604 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
605 &fnc->t_write);
606
018e77bc
RZ
607 /* Mark all routes as unsent. */
608 thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
609 &fnc->t_ribreset);
bda10adf
RZ
610 thread_add_timer(zrouter.master, fpm_rmac_reset, fnc, 0,
611 &fnc->t_rmacreset);
018e77bc 612
d35f447d
RZ
613 return 0;
614}
615
616/**
617 * Encode data plane operation context into netlink and enqueue it in the FPM
618 * output buffer.
619 *
620 * @param fnc the netlink FPM context.
621 * @param ctx the data plane operation context data.
622 * @return 0 on success or -1 on not enough space.
623 */
624static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
625{
626 uint8_t nl_buf[NL_PKT_BUF_SIZE];
627 size_t nl_buf_len;
628 ssize_t rv;
edfeff42 629 uint64_t obytes, obytes_peak;
d35f447d
RZ
630
631 nl_buf_len = 0;
632
633 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
634
635 switch (dplane_ctx_get_op(ctx)) {
636 case DPLANE_OP_ROUTE_UPDATE:
637 case DPLANE_OP_ROUTE_DELETE:
638 rv = netlink_route_multipath(RTM_DELROUTE, ctx, nl_buf,
f2a0ba3a 639 sizeof(nl_buf), true);
d35f447d 640 if (rv <= 0) {
e5e444d8
RZ
641 zlog_err("%s: netlink_route_multipath failed",
642 __func__);
d35f447d
RZ
643 return 0;
644 }
645
646 nl_buf_len = (size_t)rv;
d35f447d
RZ
647
648 /* UPDATE operations need a INSTALL, otherwise just quit. */
649 if (dplane_ctx_get_op(ctx) == DPLANE_OP_ROUTE_DELETE)
650 break;
651
652 /* FALL THROUGH */
653 case DPLANE_OP_ROUTE_INSTALL:
654 rv = netlink_route_multipath(RTM_NEWROUTE, ctx,
655 &nl_buf[nl_buf_len],
f2a0ba3a 656 sizeof(nl_buf) - nl_buf_len, true);
d35f447d 657 if (rv <= 0) {
e5e444d8
RZ
658 zlog_err("%s: netlink_route_multipath failed",
659 __func__);
d35f447d
RZ
660 return 0;
661 }
662
663 nl_buf_len += (size_t)rv;
d35f447d
RZ
664 break;
665
bda10adf
RZ
666 case DPLANE_OP_MAC_INSTALL:
667 case DPLANE_OP_MAC_DELETE:
668 rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
669 if (rv <= 0) {
e5e444d8
RZ
670 zlog_err("%s: netlink_macfdb_update_ctx failed",
671 __func__);
bda10adf
RZ
672 return 0;
673 }
674
675 nl_buf_len = (size_t)rv;
bda10adf
RZ
676 break;
677
d35f447d
RZ
678 case DPLANE_OP_NH_INSTALL:
679 case DPLANE_OP_NH_UPDATE:
680 case DPLANE_OP_NH_DELETE:
681 case DPLANE_OP_LSP_INSTALL:
682 case DPLANE_OP_LSP_UPDATE:
683 case DPLANE_OP_LSP_DELETE:
684 case DPLANE_OP_PW_INSTALL:
685 case DPLANE_OP_PW_UNINSTALL:
686 case DPLANE_OP_ADDR_INSTALL:
687 case DPLANE_OP_ADDR_UNINSTALL:
d35f447d
RZ
688 case DPLANE_OP_NEIGH_INSTALL:
689 case DPLANE_OP_NEIGH_UPDATE:
690 case DPLANE_OP_NEIGH_DELETE:
691 case DPLANE_OP_VTEP_ADD:
692 case DPLANE_OP_VTEP_DELETE:
693 case DPLANE_OP_SYS_ROUTE_ADD:
694 case DPLANE_OP_SYS_ROUTE_DELETE:
695 case DPLANE_OP_ROUTE_NOTIFY:
696 case DPLANE_OP_LSP_NOTIFY:
697 case DPLANE_OP_NONE:
698 break;
699
700 default:
e5e444d8
RZ
701 if (IS_ZEBRA_DEBUG_FPM)
702 zlog_debug("%s: unhandled data plane message (%d) %s",
703 __func__, dplane_ctx_get_op(ctx),
704 dplane_op2str(dplane_ctx_get_op(ctx)));
d35f447d
RZ
705 break;
706 }
707
708 /* Skip empty enqueues. */
709 if (nl_buf_len == 0)
710 return 0;
711
a179ba35
RZ
712 /* We must know if someday a message goes beyond 65KiB. */
713 assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);
714
715 /* Check if we have enough buffer space. */
716 if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
c871e6c9
RZ
717 atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
718 memory_order_relaxed);
e5e444d8
RZ
719
720 if (IS_ZEBRA_DEBUG_FPM)
721 zlog_debug(
722 "%s: buffer full: wants to write %zu but has %zu",
723 __func__, nl_buf_len + FPM_HEADER_SIZE,
724 STREAM_WRITEABLE(fnc->obuf));
725
a179ba35
RZ
726 return -1;
727 }
728
d35f447d 729 /*
a179ba35
RZ
730 * Fill in the FPM header information.
731 *
732 * See FPM_HEADER_SIZE definition for more information.
d35f447d
RZ
733 */
734 stream_putc(fnc->obuf, 1);
735 stream_putc(fnc->obuf, 1);
a179ba35 736 stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);
d35f447d
RZ
737
738 /* Write current data. */
739 stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
740
ad4d1022 741 /* Account number of bytes waiting to be written. */
c871e6c9
RZ
742 atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
743 nl_buf_len + FPM_HEADER_SIZE,
744 memory_order_relaxed);
edfeff42
RZ
745 obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
746 memory_order_relaxed);
747 obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
748 memory_order_relaxed);
749 if (obytes_peak < obytes)
c871e6c9
RZ
750 atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
751 memory_order_relaxed);
ad4d1022 752
d35f447d
RZ
753 /* Tell the thread to start writing. */
754 thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
755 &fnc->t_write);
756
757 return 0;
758}
759
018e77bc
RZ
760/**
761 * Send all RIB installed routes to the connected data plane.
762 */
763static int fpm_rib_send(struct thread *t)
764{
765 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
766 rib_dest_t *dest;
767 struct route_node *rn;
768 struct route_table *rt;
769 struct zebra_dplane_ctx *ctx;
770 rib_tables_iter_t rt_iter;
771
772 /* Allocate temporary context for all transactions. */
773 ctx = dplane_ctx_alloc();
774
775 rt_iter.state = RIB_TABLES_ITER_S_INIT;
776 while ((rt = rib_tables_iter_next(&rt_iter))) {
777 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
778 dest = rib_dest_from_rnode(rn);
779 /* Skip bad route entries. */
a50404aa 780 if (dest == NULL || dest->selected_fib == NULL)
018e77bc 781 continue;
018e77bc
RZ
782
783 /* Check for already sent routes. */
a50404aa 784 if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
018e77bc 785 continue;
018e77bc
RZ
786
787 /* Enqueue route install. */
788 dplane_ctx_reset(ctx);
789 dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
790 dest->selected_fib);
791 if (fpm_nl_enqueue(fnc, ctx) == -1) {
792 /* Free the temporary allocated context. */
793 dplane_ctx_fini(&ctx);
794
018e77bc
RZ
795 thread_add_timer(zrouter.master, fpm_rib_send,
796 fnc, 1, &fnc->t_ribwalk);
797 return 0;
798 }
799
800 /* Mark as sent. */
801 SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
802 }
803 }
804
805 /* Free the temporary allocated context. */
806 dplane_ctx_fini(&ctx);
807
808 /* All RIB routes sent! */
809 fnc->rib_complete = true;
810
811 return 0;
812}
813
bda10adf
RZ
814/*
815 * The next three functions will handle RMAC enqueue.
816 */
817struct fpm_rmac_arg {
818 struct zebra_dplane_ctx *ctx;
819 struct fpm_nl_ctx *fnc;
820 zebra_l3vni_t *zl3vni;
821};
822
9d5c3268 823static void fpm_enqueue_rmac_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
824{
825 struct fpm_rmac_arg *fra = arg;
826 zebra_mac_t *zrmac = backet->data;
827 struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
828 const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
829 struct zebra_if *br_zif;
830 vlanid_t vid;
831 bool sticky;
832
833 /* Entry already sent. */
834 if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT))
835 return;
836
837 sticky = !!CHECK_FLAG(zrmac->flags,
838 (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
839 br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
840 vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;
841
842 dplane_ctx_reset(fra->ctx);
843 dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
844 dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
f2a0ba3a
RZ
845 zif->brslave_info.br_if, vid,
846 &zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky);
bda10adf 847 if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
bda10adf
RZ
848 thread_add_timer(zrouter.master, fpm_rmac_send,
849 fra->fnc, 1, &fra->fnc->t_rmacwalk);
850 }
851}
852
9d5c3268 853static void fpm_enqueue_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
854{
855 struct fpm_rmac_arg *fra = arg;
856 zebra_l3vni_t *zl3vni = backet->data;
857
858 fra->zl3vni = zl3vni;
859 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
860}
861
862static int fpm_rmac_send(struct thread *t)
863{
864 struct fpm_rmac_arg fra;
865
866 fra.fnc = THREAD_ARG(t);
867 fra.ctx = dplane_ctx_alloc();
868 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
869 dplane_ctx_fini(&fra.ctx);
870
871 return 0;
872}
873
018e77bc
RZ
874/**
875 * Resets the RIB FPM flags so we send all routes again.
876 */
877static int fpm_rib_reset(struct thread *t)
878{
879 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
880 rib_dest_t *dest;
881 struct route_node *rn;
882 struct route_table *rt;
883 rib_tables_iter_t rt_iter;
884
885 fnc->rib_complete = false;
886
887 rt_iter.state = RIB_TABLES_ITER_S_INIT;
888 while ((rt = rib_tables_iter_next(&rt_iter))) {
889 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
890 dest = rib_dest_from_rnode(rn);
891 /* Skip bad route entries. */
892 if (dest == NULL)
893 continue;
894
895 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
896 }
897 }
898
899 return 0;
900}
901
bda10adf
RZ
902/*
903 * The next three function will handle RMAC table reset.
904 */
9d5c3268 905static void fpm_unset_rmac_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
906{
907 zebra_mac_t *zrmac = backet->data;
908
909 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
910}
911
9d5c3268 912static void fpm_unset_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
913{
914 zebra_l3vni_t *zl3vni = backet->data;
915
916 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
917}
918
919static int fpm_rmac_reset(struct thread *t)
920{
921 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
922
923 return 0;
924}
925
ba803a2f
RZ
926static int fpm_process_queue(struct thread *t)
927{
928 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
929 struct zebra_dplane_ctx *ctx;
930
931 frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
932
933 while (true) {
934 /* No space available yet. */
935 if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
936 break;
937
938 /* Dequeue next item or quit processing. */
939 ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
940 if (ctx == NULL)
941 break;
942
943 fpm_nl_enqueue(fnc, ctx);
944
945 /* Account the processed entries. */
c871e6c9
RZ
946 atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
947 memory_order_relaxed);
948 atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
949 memory_order_relaxed);
ba803a2f
RZ
950
951 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
952 dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
953 }
954
955 /* Check for more items in the queue. */
c871e6c9
RZ
956 if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
957 memory_order_relaxed)
958 > 0)
ba803a2f
RZ
959 thread_add_timer(fnc->fthread->master, fpm_process_queue,
960 fnc, 0, &fnc->t_dequeue);
961
962 return 0;
963}
964
3bdd7fca
RZ
965/**
966 * Handles external (e.g. CLI, data plane or others) events.
967 */
968static int fpm_process_event(struct thread *t)
969{
970 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
971 int event = THREAD_VAL(t);
972
973 switch (event) {
974 case FNE_DISABLE:
e5e444d8 975 zlog_info("%s: manual FPM disable event", __func__);
3bdd7fca 976 fnc->disabled = true;
c871e6c9
RZ
977 atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
978 memory_order_relaxed);
3bdd7fca
RZ
979
980 /* Call reconnect to disable timers and clean up context. */
981 fpm_reconnect(fnc);
982 break;
983
984 case FNE_RECONNECT:
e5e444d8 985 zlog_info("%s: manual FPM reconnect event", __func__);
3bdd7fca 986 fnc->disabled = false;
c871e6c9
RZ
987 atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
988 memory_order_relaxed);
3bdd7fca
RZ
989 fpm_reconnect(fnc);
990 break;
991
6cc059cd 992 case FNE_RESET_COUNTERS:
e5e444d8 993 zlog_info("%s: manual FPM counters reset event", __func__);
6cc059cd
RZ
994 memset(&fnc->counters, 0, sizeof(fnc->counters));
995 break;
996
3bdd7fca 997 default:
e5e444d8
RZ
998 if (IS_ZEBRA_DEBUG_FPM)
999 zlog_debug("%s: unhandled event %d", __func__, event);
3bdd7fca
RZ
1000 break;
1001 }
1002
1003 return 0;
1004}
1005
d35f447d
RZ
1006/*
1007 * Data plane functions.
1008 */
1009static int fpm_nl_start(struct zebra_dplane_provider *prov)
1010{
1011 struct fpm_nl_ctx *fnc;
1012
1013 fnc = dplane_provider_get_data(prov);
1014 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1015 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1016 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1017 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1018 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1019 fnc->socket = -1;
3bdd7fca 1020 fnc->disabled = true;
ba803a2f
RZ
1021 fnc->prov = prov;
1022 TAILQ_INIT(&fnc->ctxqueue);
1023 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
d35f447d
RZ
1024
1025 return 0;
1026}
1027
1028static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1029{
1030 struct fpm_nl_ctx *fnc;
1031
1032 fnc = dplane_provider_get_data(prov);
1033 stream_free(fnc->ibuf);
1034 stream_free(fnc->obuf);
1035 close(fnc->socket);
1036
1037 return 0;
1038}
1039
1040static int fpm_nl_process(struct zebra_dplane_provider *prov)
1041{
1042 struct zebra_dplane_ctx *ctx;
1043 struct fpm_nl_ctx *fnc;
1044 int counter, limit;
edfeff42 1045 uint64_t cur_queue, peak_queue;
d35f447d
RZ
1046
1047 fnc = dplane_provider_get_data(prov);
1048 limit = dplane_provider_get_work_limit(prov);
1049 for (counter = 0; counter < limit; counter++) {
1050 ctx = dplane_provider_dequeue_in_ctx(prov);
1051 if (ctx == NULL)
1052 break;
1053
1054 /*
1055 * Skip all notifications if not connected, we'll walk the RIB
1056 * anyway.
1057 */
6cc059cd 1058 if (fnc->socket != -1 && fnc->connecting == false) {
ba803a2f
RZ
1059 frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
1060 dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
1061
1062 /* Account the number of contexts. */
c871e6c9
RZ
1063 atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
1064 1, memory_order_relaxed);
1065 cur_queue = atomic_load_explicit(
1066 &fnc->counters.ctxqueue_len,
1067 memory_order_relaxed);
1068 peak_queue = atomic_load_explicit(
1069 &fnc->counters.ctxqueue_len_peak,
1070 memory_order_relaxed);
edfeff42 1071 if (peak_queue < cur_queue)
c871e6c9
RZ
1072 atomic_store_explicit(
1073 &fnc->counters.ctxqueue_len_peak,
1074 peak_queue, memory_order_relaxed);
ba803a2f 1075 continue;
6cc059cd
RZ
1076 }
1077
d35f447d
RZ
1078 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
1079 dplane_provider_enqueue_out_ctx(prov, ctx);
1080 }
1081
c871e6c9
RZ
1082 if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
1083 memory_order_relaxed)
1084 > 0)
ba803a2f
RZ
1085 thread_add_timer(fnc->fthread->master, fpm_process_queue,
1086 fnc, 0, &fnc->t_dequeue);
1087
d35f447d
RZ
1088 return 0;
1089}
1090
1091static int fpm_nl_new(struct thread_master *tm)
1092{
1093 struct zebra_dplane_provider *prov = NULL;
d35f447d
RZ
1094 int rv;
1095
3bdd7fca 1096 gfnc = calloc(1, sizeof(*gfnc));
d35f447d
RZ
1097 rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
1098 DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
3bdd7fca 1099 fpm_nl_process, fpm_nl_finish, gfnc,
d35f447d
RZ
1100 &prov);
1101
1102 if (IS_ZEBRA_DEBUG_DPLANE)
1103 zlog_debug("%s register status: %d", prov_name, rv);
1104
3bdd7fca 1105 install_node(&fpm_node, fpm_write_config);
6cc059cd
RZ
1106 install_element(ENABLE_NODE, &fpm_show_counters_cmd);
1107 install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
1108 install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
3bdd7fca
RZ
1109 install_element(CONFIG_NODE, &fpm_set_address_cmd);
1110 install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
1111
d35f447d
RZ
1112 return 0;
1113}
1114
1115static int fpm_nl_init(void)
1116{
1117 hook_register(frr_late_init, fpm_nl_new);
1118 return 0;
1119}
1120
1121FRR_MODULE_SETUP(
1122 .name = "dplane_fpm_nl",
1123 .version = "0.0.1",
1124 .description = "Data plane plugin for FPM using netlink.",
1125 .init = fpm_nl_init,
1126 )