]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
build: fix data plane FPM netlink module
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
#include <arpa/inet.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <errno.h>
#include <inttypes.h>
#include <string.h>

30#include "config.h" /* Include this explicitly */
31#include "lib/zebra.h"
6cc059cd 32#include "lib/json.h"
d35f447d 33#include "lib/libfrr.h"
c871e6c9 34#include "lib/frratomic.h"
3bdd7fca 35#include "lib/command.h"
d35f447d
RZ
36#include "lib/memory.h"
37#include "lib/network.h"
38#include "lib/ns.h"
39#include "lib/frr_pthread.h"
bda10adf 40#include "zebra/interface.h"
d35f447d 41#include "zebra/zebra_dplane.h"
018e77bc 42#include "zebra/zebra_router.h"
bda10adf 43#include "zebra/zebra_vxlan_private.h"
d35f447d
RZ
44#include "zebra/kernel_netlink.h"
45#include "zebra/rt_netlink.h"
46#include "zebra/debug.h"
47
/* Default FPM server endpoint: IPv4 loopback, TCP port 2620. */
#define SOUTHBOUND_DEFAULT_ADDR (INADDR_LOOPBACK)
#define SOUTHBOUND_DEFAULT_PORT (2620)

/*
 * Every message sent to the FPM server starts with a fixed header:
 *
 *   byte 0     version (always 1)
 *   byte 1     payload type (1 = netlink, 2 = protobuf)
 *   bytes 2-3  total message length, header included, network byte order
 *
 * The length field lets the receiver frame messages whatever the payload
 * format is.
 */
#define FPM_HEADER_SIZE (4)

/* Name of the data plane provider and of its pthread. */
static const char prov_name[] = "dplane_fpm_nl";
65
66struct fpm_nl_ctx {
67 /* data plane connection. */
68 int socket;
3bdd7fca 69 bool disabled;
d35f447d 70 bool connecting;
018e77bc 71 bool rib_complete;
bda10adf 72 bool rmac_complete;
d35f447d
RZ
73 struct sockaddr_storage addr;
74
75 /* data plane buffers. */
76 struct stream *ibuf;
77 struct stream *obuf;
78 pthread_mutex_t obuf_mutex;
79
ba803a2f
RZ
80 /*
81 * data plane context queue:
82 * When a FPM server connection becomes a bottleneck, we must keep the
83 * data plane contexts until we get a chance to process them.
84 */
85 struct dplane_ctx_q ctxqueue;
86 pthread_mutex_t ctxqueue_mutex;
87
d35f447d 88 /* data plane events. */
ba803a2f 89 struct zebra_dplane_provider *prov;
d35f447d
RZ
90 struct frr_pthread *fthread;
91 struct thread *t_connect;
92 struct thread *t_read;
93 struct thread *t_write;
3bdd7fca 94 struct thread *t_event;
ba803a2f 95 struct thread *t_dequeue;
018e77bc
RZ
96
97 /* zebra events. */
98 struct thread *t_ribreset;
99 struct thread *t_ribwalk;
bda10adf
RZ
100 struct thread *t_rmacreset;
101 struct thread *t_rmacwalk;
6cc059cd
RZ
102
103 /* Statistic counters. */
104 struct {
105 /* Amount of bytes read into ibuf. */
edfeff42 106 _Atomic uint64_t bytes_read;
6cc059cd 107 /* Amount of bytes written from obuf. */
edfeff42 108 _Atomic uint64_t bytes_sent;
ad4d1022 109 /* Output buffer current usage. */
edfeff42 110 _Atomic uint64_t obuf_bytes;
ad4d1022 111 /* Output buffer peak usage. */
edfeff42 112 _Atomic uint64_t obuf_peak;
6cc059cd
RZ
113
114 /* Amount of connection closes. */
edfeff42 115 _Atomic uint64_t connection_closes;
6cc059cd 116 /* Amount of connection errors. */
edfeff42 117 _Atomic uint64_t connection_errors;
6cc059cd
RZ
118
119 /* Amount of user configurations: FNE_RECONNECT. */
edfeff42 120 _Atomic uint64_t user_configures;
6cc059cd 121 /* Amount of user disable requests: FNE_DISABLE. */
edfeff42 122 _Atomic uint64_t user_disables;
6cc059cd
RZ
123
124 /* Amount of data plane context processed. */
edfeff42 125 _Atomic uint64_t dplane_contexts;
ba803a2f 126 /* Amount of data plane contexts enqueued. */
edfeff42 127 _Atomic uint64_t ctxqueue_len;
ba803a2f 128 /* Peak amount of data plane contexts enqueued. */
edfeff42 129 _Atomic uint64_t ctxqueue_len_peak;
6cc059cd
RZ
130
131 /* Amount of buffer full events. */
edfeff42 132 _Atomic uint64_t buffer_full;
6cc059cd 133 } counters;
3bdd7fca
RZ
134} * gfnc;
135
/* Events handled by fpm_process_event() on the FPM pthread. */
enum fpm_nl_events {
	/* (Re)connect to the configured FPM server. */
	FNE_RECONNECT = 0,
	/* Disable FPM and drop the connection. */
	FNE_DISABLE = 1,
	/* Zero all statistic counters. */
	FNE_RESET_COUNTERS = 2,
};
144
018e77bc
RZ
/*
 * Forward declarations for functions used before their definitions.
 */

/* FPM pthread event handler. */
static int fpm_process_event(struct thread *t);

/* Netlink encoder feeding the FPM output buffer. */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);

/* RIB/RMAC walkers, scheduled on the zebra main thread. */
static int fpm_rib_reset(struct thread *t);
static int fpm_rib_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);
static int fpm_rmac_send(struct thread *t);
018e77bc 154
ad4d1022
RZ
155/*
156 * Helper functions.
157 */
158
159/**
160 * Reorganizes the data on the buffer so it can fit more data.
161 *
162 * @param s stream pointer.
163 */
164static void stream_pulldown(struct stream *s)
165{
166 size_t rlen = STREAM_READABLE(s);
167
168 /* No more data, so just move the pointers. */
169 if (rlen == 0) {
170 stream_reset(s);
171 return;
172 }
173
174 /* Move the available data to the beginning. */
175 memmove(s->data, &s->data[s->getp], rlen);
176 s->getp = 0;
177 s->endp = rlen;
178}
179
3bdd7fca
RZ
180/*
181 * CLI.
182 */
6cc059cd
RZ
183#define FPM_STR "Forwarding Plane Manager configuration\n"
184
3bdd7fca
RZ
185DEFUN(fpm_set_address, fpm_set_address_cmd,
186 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
6cc059cd 187 FPM_STR
3bdd7fca
RZ
188 "FPM remote listening server address\n"
189 "Remote IPv4 FPM server\n"
190 "Remote IPv6 FPM server\n"
191 "FPM remote listening server port\n"
192 "Remote FPM server port\n")
193{
194 struct sockaddr_in *sin;
195 struct sockaddr_in6 *sin6;
196 uint16_t port = 0;
197 uint8_t naddr[INET6_BUFSIZ];
198
199 if (argc == 5)
200 port = strtol(argv[4]->arg, NULL, 10);
201
202 /* Handle IPv4 addresses. */
203 if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
204 sin = (struct sockaddr_in *)&gfnc->addr;
205
206 memset(sin, 0, sizeof(*sin));
207 sin->sin_family = AF_INET;
208 sin->sin_port =
209 port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
210#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
211 sin->sin_len = sizeof(*sin);
212#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
213 memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));
214
215 goto ask_reconnect;
216 }
217
218 /* Handle IPv6 addresses. */
219 if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
220 vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
221 return CMD_WARNING;
222 }
223
224 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
225 memset(sin6, 0, sizeof(*sin6));
226 sin6->sin6_family = AF_INET6;
227 sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
228#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
229 sin6->sin6_len = sizeof(*sin6);
230#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
231 memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));
232
233ask_reconnect:
234 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
235 FNE_RECONNECT, &gfnc->t_event);
236 return CMD_SUCCESS;
237}
238
239DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
240 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
241 NO_STR
6cc059cd 242 FPM_STR
3bdd7fca
RZ
243 "FPM remote listening server address\n"
244 "Remote IPv4 FPM server\n"
245 "Remote IPv6 FPM server\n"
246 "FPM remote listening server port\n"
247 "Remote FPM server port\n")
248{
249 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
250 FNE_DISABLE, &gfnc->t_event);
251 return CMD_SUCCESS;
252}
253
6cc059cd
RZ
254DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
255 "clear fpm counters",
256 CLEAR_STR
257 FPM_STR
258 "FPM statistic counters\n")
259{
260 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
261 FNE_RESET_COUNTERS, &gfnc->t_event);
262 return CMD_SUCCESS;
263}
264
265DEFUN(fpm_show_counters, fpm_show_counters_cmd,
266 "show fpm counters",
267 SHOW_STR
268 FPM_STR
269 "FPM statistic counters\n")
270{
271 vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");
272
273#define SHOW_COUNTER(label, counter) \
274 vty_out(vty, "%28s: %Lu\n", (label), (counter));
275
276 SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
277 SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
ad4d1022
RZ
278 SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
279 SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
6cc059cd
RZ
280 SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
281 SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
282 SHOW_COUNTER("Data plane items processed",
283 gfnc->counters.dplane_contexts);
ba803a2f
RZ
284 SHOW_COUNTER("Data plane items enqueued",
285 gfnc->counters.ctxqueue_len);
286 SHOW_COUNTER("Data plane items queue peak",
287 gfnc->counters.ctxqueue_len_peak);
6cc059cd
RZ
288 SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
289 SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
290 SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
291
292#undef SHOW_COUNTER
293
294 return CMD_SUCCESS;
295}
296
297DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
298 "show fpm counters json",
299 SHOW_STR
300 FPM_STR
301 "FPM statistic counters\n"
302 JSON_STR)
303{
304 struct json_object *jo;
305
306 jo = json_object_new_object();
307 json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
308 json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
ad4d1022
RZ
309 json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
310 json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
6cc059cd
RZ
311 json_object_int_add(jo, "connection-closes", gfnc->counters.connection_closes);
312 json_object_int_add(jo, "connection-errors", gfnc->counters.connection_errors);
313 json_object_int_add(jo, "data-plane-contexts", gfnc->counters.dplane_contexts);
ba803a2f
RZ
314 json_object_int_add(jo, "data-plane-contexts-queue",
315 gfnc->counters.ctxqueue_len);
316 json_object_int_add(jo, "data-plane-contexts-queue-peak",
317 gfnc->counters.ctxqueue_len_peak);
6cc059cd
RZ
318 json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
319 json_object_int_add(jo, "user-configures", gfnc->counters.user_configures);
320 json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
321 vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
322 json_object_free(jo);
323
324 return CMD_SUCCESS;
325}
326
3bdd7fca
RZ
327static int fpm_write_config(struct vty *vty)
328{
329 struct sockaddr_in *sin;
330 struct sockaddr_in6 *sin6;
331 int written = 0;
332 char addrstr[INET6_ADDRSTRLEN];
333
334 if (gfnc->disabled)
335 return written;
336
337 switch (gfnc->addr.ss_family) {
338 case AF_INET:
339 written = 1;
340 sin = (struct sockaddr_in *)&gfnc->addr;
341 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
342 vty_out(vty, "fpm address %s", addrstr);
343 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
344 vty_out(vty, " port %d", ntohs(sin->sin_port));
345
346 vty_out(vty, "\n");
347 break;
348 case AF_INET6:
349 written = 1;
350 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
351 inet_ntop(AF_INET, &sin6->sin6_addr, addrstr, sizeof(addrstr));
352 vty_out(vty, "fpm address %s", addrstr);
353 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
354 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
355
356 vty_out(vty, "\n");
357 break;
358
359 default:
360 break;
361 }
362
363 return written;
364}
365
366struct cmd_node fpm_node = {
367 .node = VTY_NODE,
368 .prompt = "",
369 .vtysh = 1,
370};
371
d35f447d
RZ
372/*
373 * FPM functions.
374 */
375static int fpm_connect(struct thread *t);
376
377static void fpm_reconnect(struct fpm_nl_ctx *fnc)
378{
379 /* Grab the lock to empty the stream and stop the zebra thread. */
380 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
381
3bdd7fca
RZ
382 /* Avoid calling close on `-1`. */
383 if (fnc->socket != -1) {
384 close(fnc->socket);
385 fnc->socket = -1;
386 }
387
d35f447d
RZ
388 stream_reset(fnc->ibuf);
389 stream_reset(fnc->obuf);
390 THREAD_OFF(fnc->t_read);
391 THREAD_OFF(fnc->t_write);
018e77bc
RZ
392
393 if (fnc->t_ribreset)
394 thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
395 if (fnc->t_ribwalk)
396 thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
bda10adf
RZ
397 if (fnc->t_rmacreset)
398 thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
399 if (fnc->t_rmacwalk)
400 thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);
018e77bc 401
3bdd7fca
RZ
402 /* FPM is disabled, don't attempt to connect. */
403 if (fnc->disabled)
404 return;
405
d35f447d
RZ
406 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
407 &fnc->t_connect);
408}
409
410static int fpm_read(struct thread *t)
411{
412 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
413 ssize_t rv;
414
415 /* Let's ignore the input at the moment. */
416 rv = stream_read_try(fnc->ibuf, fnc->socket,
417 STREAM_WRITEABLE(fnc->ibuf));
418 if (rv == 0) {
c871e6c9
RZ
419 atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
420 memory_order_relaxed);
d35f447d
RZ
421 zlog_debug("%s: connection closed", __func__);
422 fpm_reconnect(fnc);
423 return 0;
424 }
425 if (rv == -1) {
426 if (errno == EAGAIN || errno == EWOULDBLOCK
427 || errno == EINTR)
428 return 0;
429
c871e6c9
RZ
430 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
431 memory_order_relaxed);
d35f447d
RZ
432 zlog_debug("%s: connection failure: %s", __func__,
433 strerror(errno));
434 fpm_reconnect(fnc);
435 return 0;
436 }
437 stream_reset(fnc->ibuf);
438
6cc059cd 439 /* Account all bytes read. */
c871e6c9
RZ
440 atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
441 memory_order_relaxed);
6cc059cd 442
d35f447d
RZ
443 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
444 &fnc->t_read);
445
446 return 0;
447}
448
449static int fpm_write(struct thread *t)
450{
451 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
452 socklen_t statuslen;
453 ssize_t bwritten;
454 int rv, status;
455 size_t btotal;
456
457 if (fnc->connecting == true) {
458 status = 0;
459 statuslen = sizeof(status);
460
461 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
462 &statuslen);
463 if (rv == -1 || status != 0) {
464 if (rv != -1)
465 zlog_debug("%s: connection failed: %s",
466 __func__, strerror(status));
467 else
468 zlog_debug("%s: SO_ERROR failed: %s", __func__,
469 strerror(status));
470
c871e6c9
RZ
471 atomic_fetch_add_explicit(
472 &fnc->counters.connection_errors, 1,
473 memory_order_relaxed);
6cc059cd 474
d35f447d
RZ
475 fpm_reconnect(fnc);
476 return 0;
477 }
478
479 fnc->connecting = false;
018e77bc
RZ
480
481 /* Ask zebra main thread to start walking the RIB table. */
482 thread_add_timer(zrouter.master, fpm_rib_send, fnc, 0,
483 &fnc->t_ribwalk);
bda10adf
RZ
484 thread_add_timer(zrouter.master, fpm_rmac_send, fnc, 0,
485 &fnc->t_rmacwalk);
d35f447d
RZ
486 }
487
488 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
489
490 while (true) {
491 /* Stream is empty: reset pointers and return. */
492 if (STREAM_READABLE(fnc->obuf) == 0) {
493 stream_reset(fnc->obuf);
494 break;
495 }
496
497 /* Try to write all at once. */
498 btotal = stream_get_endp(fnc->obuf) -
499 stream_get_getp(fnc->obuf);
500 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
501 if (bwritten == 0) {
c871e6c9
RZ
502 atomic_fetch_add_explicit(
503 &fnc->counters.connection_closes, 1,
504 memory_order_relaxed);
d35f447d
RZ
505 zlog_debug("%s: connection closed", __func__);
506 break;
507 }
508 if (bwritten == -1) {
ad4d1022
RZ
509 /* Attempt to continue if blocked by a signal. */
510 if (errno == EINTR)
511 continue;
512 /* Receiver is probably slow, lets give it some time. */
513 if (errno == EAGAIN || errno == EWOULDBLOCK)
d35f447d
RZ
514 break;
515
c871e6c9
RZ
516 atomic_fetch_add_explicit(
517 &fnc->counters.connection_errors, 1,
518 memory_order_relaxed);
d35f447d
RZ
519 zlog_debug("%s: connection failure: %s", __func__,
520 strerror(errno));
521 fpm_reconnect(fnc);
522 break;
523 }
524
6cc059cd 525 /* Account all bytes sent. */
c871e6c9
RZ
526 atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
527 memory_order_relaxed);
6cc059cd 528
ad4d1022 529 /* Account number of bytes free. */
c871e6c9
RZ
530 atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
531 memory_order_relaxed);
ad4d1022 532
d35f447d
RZ
533 stream_forward_getp(fnc->obuf, (size_t)bwritten);
534 }
535
536 /* Stream is not empty yet, we must schedule more writes. */
537 if (STREAM_READABLE(fnc->obuf)) {
ad4d1022 538 stream_pulldown(fnc->obuf);
d35f447d
RZ
539 thread_add_write(fnc->fthread->master, fpm_write, fnc,
540 fnc->socket, &fnc->t_write);
541 return 0;
542 }
543
544 return 0;
545}
546
547static int fpm_connect(struct thread *t)
548{
549 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
3bdd7fca
RZ
550 struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
551 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
552 socklen_t slen;
d35f447d
RZ
553 int rv, sock;
554 char addrstr[INET6_ADDRSTRLEN];
555
3bdd7fca 556 sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
d35f447d 557 if (sock == -1) {
6cc059cd 558 zlog_err("%s: fpm socket failed: %s", __func__,
d35f447d
RZ
559 strerror(errno));
560 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
561 &fnc->t_connect);
562 return 0;
563 }
564
565 set_nonblocking(sock);
566
3bdd7fca
RZ
567 if (fnc->addr.ss_family == AF_INET) {
568 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
569 slen = sizeof(*sin);
570 } else {
571 inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
572 slen = sizeof(*sin6);
573 }
d35f447d 574
d35f447d
RZ
575 zlog_debug("%s: attempting to connect to %s:%d", __func__, addrstr,
576 ntohs(sin->sin_port));
577
3bdd7fca 578 rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
d35f447d 579 if (rv == -1 && errno != EINPROGRESS) {
c871e6c9
RZ
580 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
581 memory_order_relaxed);
d35f447d
RZ
582 close(sock);
583 zlog_warn("%s: fpm connection failed: %s", __func__,
584 strerror(errno));
585 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
586 &fnc->t_connect);
587 return 0;
588 }
589
590 fnc->connecting = (errno == EINPROGRESS);
591 fnc->socket = sock;
592 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
593 &fnc->t_read);
594 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
595 &fnc->t_write);
596
018e77bc
RZ
597 /* Mark all routes as unsent. */
598 thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
599 &fnc->t_ribreset);
bda10adf
RZ
600 thread_add_timer(zrouter.master, fpm_rmac_reset, fnc, 0,
601 &fnc->t_rmacreset);
018e77bc 602
d35f447d
RZ
603 return 0;
604}
605
606/**
607 * Encode data plane operation context into netlink and enqueue it in the FPM
608 * output buffer.
609 *
610 * @param fnc the netlink FPM context.
611 * @param ctx the data plane operation context data.
612 * @return 0 on success or -1 on not enough space.
613 */
614static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
615{
616 uint8_t nl_buf[NL_PKT_BUF_SIZE];
617 size_t nl_buf_len;
618 ssize_t rv;
edfeff42 619 uint64_t obytes, obytes_peak;
d35f447d
RZ
620
621 nl_buf_len = 0;
622
623 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
624
625 switch (dplane_ctx_get_op(ctx)) {
626 case DPLANE_OP_ROUTE_UPDATE:
627 case DPLANE_OP_ROUTE_DELETE:
628 rv = netlink_route_multipath(RTM_DELROUTE, ctx, nl_buf,
629 sizeof(nl_buf));
630 if (rv <= 0) {
631 zlog_debug("%s: netlink_route_multipath failed",
632 __func__);
633 return 0;
634 }
635
636 nl_buf_len = (size_t)rv;
d35f447d
RZ
637
638 /* UPDATE operations need a INSTALL, otherwise just quit. */
639 if (dplane_ctx_get_op(ctx) == DPLANE_OP_ROUTE_DELETE)
640 break;
641
642 /* FALL THROUGH */
643 case DPLANE_OP_ROUTE_INSTALL:
644 rv = netlink_route_multipath(RTM_NEWROUTE, ctx,
645 &nl_buf[nl_buf_len],
646 sizeof(nl_buf) - nl_buf_len);
647 if (rv <= 0) {
648 zlog_debug("%s: netlink_route_multipath failed",
649 __func__);
650 return 0;
651 }
652
653 nl_buf_len += (size_t)rv;
d35f447d
RZ
654 break;
655
bda10adf
RZ
656 case DPLANE_OP_MAC_INSTALL:
657 case DPLANE_OP_MAC_DELETE:
658 rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
659 if (rv <= 0) {
660 zlog_debug("%s: netlink_macfdb_update_ctx failed",
661 __func__);
662 return 0;
663 }
664
665 nl_buf_len = (size_t)rv;
bda10adf
RZ
666 break;
667
d35f447d
RZ
668 case DPLANE_OP_NH_INSTALL:
669 case DPLANE_OP_NH_UPDATE:
670 case DPLANE_OP_NH_DELETE:
671 case DPLANE_OP_LSP_INSTALL:
672 case DPLANE_OP_LSP_UPDATE:
673 case DPLANE_OP_LSP_DELETE:
674 case DPLANE_OP_PW_INSTALL:
675 case DPLANE_OP_PW_UNINSTALL:
676 case DPLANE_OP_ADDR_INSTALL:
677 case DPLANE_OP_ADDR_UNINSTALL:
d35f447d
RZ
678 case DPLANE_OP_NEIGH_INSTALL:
679 case DPLANE_OP_NEIGH_UPDATE:
680 case DPLANE_OP_NEIGH_DELETE:
681 case DPLANE_OP_VTEP_ADD:
682 case DPLANE_OP_VTEP_DELETE:
683 case DPLANE_OP_SYS_ROUTE_ADD:
684 case DPLANE_OP_SYS_ROUTE_DELETE:
685 case DPLANE_OP_ROUTE_NOTIFY:
686 case DPLANE_OP_LSP_NOTIFY:
687 case DPLANE_OP_NONE:
688 break;
689
690 default:
691 zlog_debug("%s: unhandled data plane message (%d) %s",
692 __func__, dplane_ctx_get_op(ctx),
693 dplane_op2str(dplane_ctx_get_op(ctx)));
694 break;
695 }
696
697 /* Skip empty enqueues. */
698 if (nl_buf_len == 0)
699 return 0;
700
a179ba35
RZ
701 /* We must know if someday a message goes beyond 65KiB. */
702 assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);
703
704 /* Check if we have enough buffer space. */
705 if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
c871e6c9
RZ
706 atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
707 memory_order_relaxed);
708 zlog_debug("%s: buffer full: wants to write %zu but has %zu",
a179ba35
RZ
709 __func__, nl_buf_len + FPM_HEADER_SIZE,
710 STREAM_WRITEABLE(fnc->obuf));
711 return -1;
712 }
713
d35f447d 714 /*
a179ba35
RZ
715 * Fill in the FPM header information.
716 *
717 * See FPM_HEADER_SIZE definition for more information.
d35f447d
RZ
718 */
719 stream_putc(fnc->obuf, 1);
720 stream_putc(fnc->obuf, 1);
a179ba35 721 stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);
d35f447d
RZ
722
723 /* Write current data. */
724 stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
725
ad4d1022 726 /* Account number of bytes waiting to be written. */
c871e6c9
RZ
727 atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
728 nl_buf_len + FPM_HEADER_SIZE,
729 memory_order_relaxed);
edfeff42
RZ
730 obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
731 memory_order_relaxed);
732 obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
733 memory_order_relaxed);
734 if (obytes_peak < obytes)
c871e6c9
RZ
735 atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
736 memory_order_relaxed);
ad4d1022 737
d35f447d
RZ
738 /* Tell the thread to start writing. */
739 thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
740 &fnc->t_write);
741
742 return 0;
743}
744
018e77bc
RZ
745/**
746 * Send all RIB installed routes to the connected data plane.
747 */
748static int fpm_rib_send(struct thread *t)
749{
750 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
751 rib_dest_t *dest;
752 struct route_node *rn;
753 struct route_table *rt;
754 struct zebra_dplane_ctx *ctx;
755 rib_tables_iter_t rt_iter;
756
757 /* Allocate temporary context for all transactions. */
758 ctx = dplane_ctx_alloc();
759
760 rt_iter.state = RIB_TABLES_ITER_S_INIT;
761 while ((rt = rib_tables_iter_next(&rt_iter))) {
762 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
763 dest = rib_dest_from_rnode(rn);
764 /* Skip bad route entries. */
765 if (dest == NULL || dest->selected_fib == NULL) {
766 continue;
767 }
768
769 /* Check for already sent routes. */
770 if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
771 continue;
772 }
773
774 /* Enqueue route install. */
775 dplane_ctx_reset(ctx);
776 dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
777 dest->selected_fib);
778 if (fpm_nl_enqueue(fnc, ctx) == -1) {
779 /* Free the temporary allocated context. */
780 dplane_ctx_fini(&ctx);
781
018e77bc
RZ
782 thread_add_timer(zrouter.master, fpm_rib_send,
783 fnc, 1, &fnc->t_ribwalk);
784 return 0;
785 }
786
787 /* Mark as sent. */
788 SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
789 }
790 }
791
792 /* Free the temporary allocated context. */
793 dplane_ctx_fini(&ctx);
794
795 /* All RIB routes sent! */
796 fnc->rib_complete = true;
797
798 return 0;
799}
800
bda10adf
RZ
801/*
802 * The next three functions will handle RMAC enqueue.
803 */
804struct fpm_rmac_arg {
805 struct zebra_dplane_ctx *ctx;
806 struct fpm_nl_ctx *fnc;
807 zebra_l3vni_t *zl3vni;
808};
809
810static void fpm_enqueue_rmac_table(struct hash_backet *backet, void *arg)
811{
812 struct fpm_rmac_arg *fra = arg;
813 zebra_mac_t *zrmac = backet->data;
814 struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
815 const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
816 struct zebra_if *br_zif;
817 vlanid_t vid;
818 bool sticky;
819
820 /* Entry already sent. */
821 if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT))
822 return;
823
824 sticky = !!CHECK_FLAG(zrmac->flags,
825 (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
826 br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
827 vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;
828
829 dplane_ctx_reset(fra->ctx);
830 dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
831 dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
832 zif->brslave_info.br_if, vid, &zrmac->macaddr,
833 zrmac->fwd_info.r_vtep_ip, sticky);
834 if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
bda10adf
RZ
835 thread_add_timer(zrouter.master, fpm_rmac_send,
836 fra->fnc, 1, &fra->fnc->t_rmacwalk);
837 }
838}
839
840static void fpm_enqueue_l3vni_table(struct hash_backet *backet, void *arg)
841{
842 struct fpm_rmac_arg *fra = arg;
843 zebra_l3vni_t *zl3vni = backet->data;
844
845 fra->zl3vni = zl3vni;
846 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
847}
848
849static int fpm_rmac_send(struct thread *t)
850{
851 struct fpm_rmac_arg fra;
852
853 fra.fnc = THREAD_ARG(t);
854 fra.ctx = dplane_ctx_alloc();
855 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
856 dplane_ctx_fini(&fra.ctx);
857
858 return 0;
859}
860
018e77bc
RZ
861/**
862 * Resets the RIB FPM flags so we send all routes again.
863 */
864static int fpm_rib_reset(struct thread *t)
865{
866 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
867 rib_dest_t *dest;
868 struct route_node *rn;
869 struct route_table *rt;
870 rib_tables_iter_t rt_iter;
871
872 fnc->rib_complete = false;
873
874 rt_iter.state = RIB_TABLES_ITER_S_INIT;
875 while ((rt = rib_tables_iter_next(&rt_iter))) {
876 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
877 dest = rib_dest_from_rnode(rn);
878 /* Skip bad route entries. */
879 if (dest == NULL)
880 continue;
881
882 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
883 }
884 }
885
886 return 0;
887}
888
bda10adf
RZ
889/*
890 * The next three function will handle RMAC table reset.
891 */
892static void fpm_unset_rmac_table(struct hash_backet *backet, void *arg)
893{
894 zebra_mac_t *zrmac = backet->data;
895
896 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
897}
898
899static void fpm_unset_l3vni_table(struct hash_backet *backet, void *arg)
900{
901 zebra_l3vni_t *zl3vni = backet->data;
902
903 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
904}
905
906static int fpm_rmac_reset(struct thread *t)
907{
908 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
909
910 return 0;
911}
912
ba803a2f
RZ
913static int fpm_process_queue(struct thread *t)
914{
915 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
916 struct zebra_dplane_ctx *ctx;
917
918 frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
919
920 while (true) {
921 /* No space available yet. */
922 if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
923 break;
924
925 /* Dequeue next item or quit processing. */
926 ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
927 if (ctx == NULL)
928 break;
929
930 fpm_nl_enqueue(fnc, ctx);
931
932 /* Account the processed entries. */
c871e6c9
RZ
933 atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
934 memory_order_relaxed);
935 atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
936 memory_order_relaxed);
ba803a2f
RZ
937
938 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
939 dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
940 }
941
942 /* Check for more items in the queue. */
c871e6c9
RZ
943 if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
944 memory_order_relaxed)
945 > 0)
ba803a2f
RZ
946 thread_add_timer(fnc->fthread->master, fpm_process_queue,
947 fnc, 0, &fnc->t_dequeue);
948
949 return 0;
950}
951
3bdd7fca
RZ
952/**
953 * Handles external (e.g. CLI, data plane or others) events.
954 */
955static int fpm_process_event(struct thread *t)
956{
957 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
958 int event = THREAD_VAL(t);
959
960 switch (event) {
961 case FNE_DISABLE:
962 zlog_debug("%s: manual FPM disable event", __func__);
963 fnc->disabled = true;
c871e6c9
RZ
964 atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
965 memory_order_relaxed);
3bdd7fca
RZ
966
967 /* Call reconnect to disable timers and clean up context. */
968 fpm_reconnect(fnc);
969 break;
970
971 case FNE_RECONNECT:
972 zlog_debug("%s: manual FPM reconnect event", __func__);
973 fnc->disabled = false;
c871e6c9
RZ
974 atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
975 memory_order_relaxed);
3bdd7fca
RZ
976 fpm_reconnect(fnc);
977 break;
978
6cc059cd
RZ
979 case FNE_RESET_COUNTERS:
980 zlog_debug("%s: manual FPM counters reset event", __func__);
981 memset(&fnc->counters, 0, sizeof(fnc->counters));
982 break;
983
3bdd7fca
RZ
984 default:
985 zlog_debug("%s: unhandled event %d", __func__, event);
986 break;
987 }
988
989 return 0;
990}
991
d35f447d
RZ
992/*
993 * Data plane functions.
994 */
995static int fpm_nl_start(struct zebra_dplane_provider *prov)
996{
997 struct fpm_nl_ctx *fnc;
998
999 fnc = dplane_provider_get_data(prov);
1000 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1001 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1002 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1003 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1004 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1005 fnc->socket = -1;
3bdd7fca 1006 fnc->disabled = true;
ba803a2f
RZ
1007 fnc->prov = prov;
1008 TAILQ_INIT(&fnc->ctxqueue);
1009 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
d35f447d
RZ
1010
1011 return 0;
1012}
1013
1014static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1015{
1016 struct fpm_nl_ctx *fnc;
1017
1018 fnc = dplane_provider_get_data(prov);
1019 stream_free(fnc->ibuf);
1020 stream_free(fnc->obuf);
1021 close(fnc->socket);
1022
1023 return 0;
1024}
1025
1026static int fpm_nl_process(struct zebra_dplane_provider *prov)
1027{
1028 struct zebra_dplane_ctx *ctx;
1029 struct fpm_nl_ctx *fnc;
1030 int counter, limit;
edfeff42 1031 uint64_t cur_queue, peak_queue;
d35f447d
RZ
1032
1033 fnc = dplane_provider_get_data(prov);
1034 limit = dplane_provider_get_work_limit(prov);
1035 for (counter = 0; counter < limit; counter++) {
1036 ctx = dplane_provider_dequeue_in_ctx(prov);
1037 if (ctx == NULL)
1038 break;
1039
1040 /*
1041 * Skip all notifications if not connected, we'll walk the RIB
1042 * anyway.
1043 */
6cc059cd 1044 if (fnc->socket != -1 && fnc->connecting == false) {
ba803a2f
RZ
1045 frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
1046 dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
1047
1048 /* Account the number of contexts. */
c871e6c9
RZ
1049 atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
1050 1, memory_order_relaxed);
1051 cur_queue = atomic_load_explicit(
1052 &fnc->counters.ctxqueue_len,
1053 memory_order_relaxed);
1054 peak_queue = atomic_load_explicit(
1055 &fnc->counters.ctxqueue_len_peak,
1056 memory_order_relaxed);
edfeff42 1057 if (peak_queue < cur_queue)
c871e6c9
RZ
1058 atomic_store_explicit(
1059 &fnc->counters.ctxqueue_len_peak,
1060 peak_queue, memory_order_relaxed);
ba803a2f 1061 continue;
6cc059cd
RZ
1062 }
1063
d35f447d
RZ
1064 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
1065 dplane_provider_enqueue_out_ctx(prov, ctx);
1066 }
1067
c871e6c9
RZ
1068 if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
1069 memory_order_relaxed)
1070 > 0)
ba803a2f
RZ
1071 thread_add_timer(fnc->fthread->master, fpm_process_queue,
1072 fnc, 0, &fnc->t_dequeue);
1073
d35f447d
RZ
1074 return 0;
1075}
1076
1077static int fpm_nl_new(struct thread_master *tm)
1078{
1079 struct zebra_dplane_provider *prov = NULL;
d35f447d
RZ
1080 int rv;
1081
3bdd7fca 1082 gfnc = calloc(1, sizeof(*gfnc));
d35f447d
RZ
1083 rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
1084 DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
3bdd7fca 1085 fpm_nl_process, fpm_nl_finish, gfnc,
d35f447d
RZ
1086 &prov);
1087
1088 if (IS_ZEBRA_DEBUG_DPLANE)
1089 zlog_debug("%s register status: %d", prov_name, rv);
1090
3bdd7fca 1091 install_node(&fpm_node, fpm_write_config);
6cc059cd
RZ
1092 install_element(ENABLE_NODE, &fpm_show_counters_cmd);
1093 install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
1094 install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
3bdd7fca
RZ
1095 install_element(CONFIG_NODE, &fpm_set_address_cmd);
1096 install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
1097
d35f447d
RZ
1098 return 0;
1099}
1100
1101static int fpm_nl_init(void)
1102{
1103 hook_register(frr_late_init, fpm_nl_new);
1104 return 0;
1105}
1106
1107FRR_MODULE_SETUP(
1108 .name = "dplane_fpm_nl",
1109 .version = "0.0.1",
1110 .description = "Data plane plugin for FPM using netlink.",
1111 .init = fpm_nl_init,
1112 )