/*
 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
 *
 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
 *                    Rafael Zalamena
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#ifdef HAVE_CONFIG_H
#include "config.h" /* Include this explicitly */
#endif

#include <arpa/inet.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <errno.h>
#include <string.h>

#include "lib/zebra.h"
#include "lib/json.h"
#include "lib/libfrr.h"
#include "lib/frratomic.h"
#include "lib/command.h"
#include "lib/memory.h"
#include "lib/network.h"
#include "lib/ns.h"
#include "lib/frr_pthread.h"
#include "zebra/debug.h"
#include "zebra/interface.h"
#include "zebra/zebra_dplane.h"
#include "zebra/zebra_mpls.h"
#include "zebra/zebra_router.h"
#include "zebra/zebra_evpn.h"
#include "zebra/zebra_evpn_mac.h"
#include "zebra/zebra_vxlan_private.h"
#include "zebra/kernel_netlink.h"
#include "zebra/rt_netlink.h"
#include "zebra/debug.h"

#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
#define SOUTHBOUND_DEFAULT_PORT 2620
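/*
 * SOUTHBOUND_DEFAULT_PORT (2620) is applied by the `fpm address` command
 * whenever no explicit port is given; SOUTHBOUND_DEFAULT_ADDR is the IPv4
 * loopback address (127.0.0.1).
 */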

/**
 * FPM header:
 * {
 *   version: 1 byte (always 1),
 *   type: 1 byte (1 for netlink, 2 protobuf),
 *   len: 2 bytes (network order),
 * }
 *
 * This header is used with any format to tell the users how many bytes to
 * expect.
 */
#define FPM_HEADER_SIZE 4
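/*
 * Illustrative framing example: a 100 byte netlink message goes out on the
 * wire as
 *
 *   0x01 0x01 0x00 0x68 <100 bytes of netlink data>
 *
 * i.e. version 1, type 1 (netlink) and a 16-bit length of 104 in network
 * byte order, counting the payload plus this 4 byte header. This matches
 * what fpm_nl_enqueue() writes into the output stream.
 */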

static const char *prov_name = "dplane_fpm_nl";

struct fpm_nl_ctx {
        /* data plane connection. */
        int socket;
        bool disabled;
        bool connecting;
        bool use_nhg;
        struct sockaddr_storage addr;

        /* data plane buffers. */
        struct stream *ibuf;
        struct stream *obuf;
        pthread_mutex_t obuf_mutex;

        /*
         * data plane context queue:
         * When an FPM server connection becomes a bottleneck, we must keep
         * the data plane contexts until we get a chance to process them.
         */
        struct dplane_ctx_q ctxqueue;
        pthread_mutex_t ctxqueue_mutex;

        /* data plane events. */
        struct zebra_dplane_provider *prov;
        struct frr_pthread *fthread;
        struct thread *t_connect;
        struct thread *t_read;
        struct thread *t_write;
        struct thread *t_event;
        struct thread *t_dequeue;

        /* zebra events. */
        struct thread *t_lspreset;
        struct thread *t_lspwalk;
        struct thread *t_nhgreset;
        struct thread *t_nhgwalk;
        struct thread *t_ribreset;
        struct thread *t_ribwalk;
        struct thread *t_rmacreset;
        struct thread *t_rmacwalk;

        /* Statistic counters. */
        struct {
                /* Amount of bytes read into ibuf. */
                _Atomic uint32_t bytes_read;
                /* Amount of bytes written from obuf. */
                _Atomic uint32_t bytes_sent;
                /* Output buffer current usage. */
                _Atomic uint32_t obuf_bytes;
                /* Output buffer peak usage. */
                _Atomic uint32_t obuf_peak;

                /* Amount of connection closes. */
                _Atomic uint32_t connection_closes;
                /* Amount of connection errors. */
                _Atomic uint32_t connection_errors;

                /* Amount of user configurations: FNE_RECONNECT. */
                _Atomic uint32_t user_configures;
                /* Amount of user disable requests: FNE_DISABLE. */
                _Atomic uint32_t user_disables;

                /* Amount of data plane contexts processed. */
                _Atomic uint32_t dplane_contexts;
                /* Amount of data plane contexts enqueued. */
                _Atomic uint32_t ctxqueue_len;
                /* Peak amount of data plane contexts enqueued. */
                _Atomic uint32_t ctxqueue_len_peak;

                /* Amount of buffer full events. */
                _Atomic uint32_t buffer_full;
        } counters;
} *gfnc;
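/*
 * `gfnc` above is the plugin's single global context: it is allocated in
 * fpm_nl_new(), handed to the data plane provider callbacks and used
 * directly by the CLI handlers below.
 */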

enum fpm_nl_events {
        /* Ask for FPM to reconnect the external server. */
        FNE_RECONNECT,
        /* Disable FPM. */
        FNE_DISABLE,
        /* Reset counters. */
        FNE_RESET_COUNTERS,
        /* Toggle next hop group feature. */
        FNE_TOGGLE_NHG,
        /* Reconnect request by our own code to avoid races. */
        FNE_INTERNAL_RECONNECT,

        /* LSP walk finished. */
        FNE_LSP_FINISHED,
        /* Next hop groups walk finished. */
        FNE_NHG_FINISHED,
        /* RIB walk finished. */
        FNE_RIB_FINISHED,
        /* RMAC walk finished. */
        FNE_RMAC_FINISHED,
};

#define FPM_RECONNECT(fnc) \
        thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
                         FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

#define WALK_FINISH(fnc, ev) \
        thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
                         (ev), NULL)

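/*
 * On every (re)connect the plugin replays its state to the FPM server in a
 * fixed order: LSPs first, then next hop groups, then RIB routes and
 * finally RMAC entries. Each *_reset() handler clears the per-object FPM
 * flag and schedules the matching *_send() walk; each walk schedules the
 * next stage once it completes.
 */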
/*
 * Prototypes.
 */
static void fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static void fpm_lsp_send(struct thread *t);
static void fpm_lsp_reset(struct thread *t);
static void fpm_nhg_send(struct thread *t);
static void fpm_nhg_reset(struct thread *t);
static void fpm_rib_send(struct thread *t);
static void fpm_rib_reset(struct thread *t);
static void fpm_rmac_send(struct thread *t);
static void fpm_rmac_reset(struct thread *t);

/*
 * CLI.
 */
#define FPM_STR "Forwarding Plane Manager configuration\n"

DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
        struct sockaddr_in *sin;
        struct sockaddr_in6 *sin6;
        uint16_t port = 0;
        uint8_t naddr[INET6_BUFSIZ];

        if (argc == 5)
                port = strtol(argv[4]->arg, NULL, 10);

        /* Handle IPv4 addresses. */
        if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
                sin = (struct sockaddr_in *)&gfnc->addr;

                memset(sin, 0, sizeof(*sin));
                sin->sin_family = AF_INET;
                sin->sin_port =
                        port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
                sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
                memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

                goto ask_reconnect;
        }

        /* Handle IPv6 addresses. */
        if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
                vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
                return CMD_WARNING;
        }

        sin6 = (struct sockaddr_in6 *)&gfnc->addr;
        memset(sin6, 0, sizeof(*sin6));
        sin6->sin6_family = AF_INET6;
        sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
        sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
        memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
        thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
                         FNE_RECONNECT, &gfnc->t_event);
        return CMD_SUCCESS;
}

DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
        thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
                         FNE_DISABLE, &gfnc->t_event);
        return CMD_SUCCESS;
}

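/*
 * Illustrative vtysh configuration exercising the commands in this file
 * (the address and port values are examples only):
 *
 *   fpm address 127.0.0.1 port 2620
 *   no fpm use-next-hop-groups
 */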
DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
      "fpm use-next-hop-groups",
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
        /* Already enabled. */
        if (gfnc->use_nhg)
                return CMD_SUCCESS;

        thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
                         FNE_TOGGLE_NHG, &gfnc->t_event);

        return CMD_SUCCESS;
}

DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
      "no fpm use-next-hop-groups",
      NO_STR
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
        /* Already disabled. */
        if (!gfnc->use_nhg)
                return CMD_SUCCESS;

        thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
                         FNE_TOGGLE_NHG, &gfnc->t_event);

        return CMD_SUCCESS;
}

DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
        thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
                         FNE_RESET_COUNTERS, &gfnc->t_event);
        return CMD_SUCCESS;
}

DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
        vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

#define SHOW_COUNTER(label, counter) \
        vty_out(vty, "%28s: %u\n", (label), (counter))

        SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
        SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
        SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
        SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
        SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
        SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
        SHOW_COUNTER("Data plane items processed",
                     gfnc->counters.dplane_contexts);
        SHOW_COUNTER("Data plane items enqueued",
                     gfnc->counters.ctxqueue_len);
        SHOW_COUNTER("Data plane items queue peak",
                     gfnc->counters.ctxqueue_len_peak);
        SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
        SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
        SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

        return CMD_SUCCESS;
}

DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
        struct json_object *jo;

        jo = json_object_new_object();
        json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
        json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
        json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
        json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
        json_object_int_add(jo, "connection-closes",
                            gfnc->counters.connection_closes);
        json_object_int_add(jo, "connection-errors",
                            gfnc->counters.connection_errors);
        json_object_int_add(jo, "data-plane-contexts",
                            gfnc->counters.dplane_contexts);
        json_object_int_add(jo, "data-plane-contexts-queue",
                            gfnc->counters.ctxqueue_len);
        json_object_int_add(jo, "data-plane-contexts-queue-peak",
                            gfnc->counters.ctxqueue_len_peak);
        json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
        json_object_int_add(jo, "user-configures",
                            gfnc->counters.user_configures);
        json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
        vty_json(vty, jo);

        return CMD_SUCCESS;
}

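/*
 * Shape of the `show fpm counters json` output; the key names come from the
 * json_object_int_add() calls above, the values here are placeholders:
 *
 * {
 *   "bytes-read": 0, "bytes-sent": 0, "obuf-bytes": 0, "obuf-bytes-peak": 0,
 *   "connection-closes": 0, "connection-errors": 0,
 *   "data-plane-contexts": 0, "data-plane-contexts-queue": 0,
 *   "data-plane-contexts-queue-peak": 0, "buffer-full-hits": 0,
 *   "user-configures": 0, "user-disables": 0
 * }
 */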
static int fpm_write_config(struct vty *vty)
{
        struct sockaddr_in *sin;
        struct sockaddr_in6 *sin6;
        int written = 0;

        if (gfnc->disabled)
                return written;

        switch (gfnc->addr.ss_family) {
        case AF_INET:
                written = 1;
                sin = (struct sockaddr_in *)&gfnc->addr;
                vty_out(vty, "fpm address %pI4", &sin->sin_addr);
                if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
                        vty_out(vty, " port %d", ntohs(sin->sin_port));

                vty_out(vty, "\n");
                break;
        case AF_INET6:
                written = 1;
                sin6 = (struct sockaddr_in6 *)&gfnc->addr;
                vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
                if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
                        vty_out(vty, " port %d", ntohs(sin6->sin6_port));

                vty_out(vty, "\n");
                break;

        default:
                break;
        }

        if (!gfnc->use_nhg) {
                vty_out(vty, "no fpm use-next-hop-groups\n");
                written = 1;
        }

        return written;
}

static struct cmd_node fpm_node = {
        .name = "fpm",
        .node = FPM_NODE,
        .prompt = "",
        .config_write = fpm_write_config,
};

/*
 * FPM functions.
 */
static void fpm_connect(struct thread *t);

static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
        /* Cancel all zebra threads first. */
        thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
        thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
        thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
        thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
        thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
        thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
        thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
        thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

        /*
         * Grab the lock to empty the streams (data plane might try to
         * enqueue updates while we are closing).
         */
        frr_mutex_lock_autounlock(&fnc->obuf_mutex);

        /* Avoid calling close on `-1`. */
        if (fnc->socket != -1) {
                close(fnc->socket);
                fnc->socket = -1;
        }

        stream_reset(fnc->ibuf);
        stream_reset(fnc->obuf);
        THREAD_OFF(fnc->t_read);
        THREAD_OFF(fnc->t_write);

        /* FPM is disabled, don't attempt to connect. */
        if (fnc->disabled)
                return;

        thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
                         &fnc->t_connect);
}

static void fpm_read(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        ssize_t rv;

        /* Let's ignore the input at the moment. */
        rv = stream_read_try(fnc->ibuf, fnc->socket,
                             STREAM_WRITEABLE(fnc->ibuf));
        /* We've got an interruption. */
        if (rv == -2) {
                /* Schedule next read. */
                thread_add_read(fnc->fthread->master, fpm_read, fnc,
                                fnc->socket, &fnc->t_read);
                return;
        }
        if (rv == 0) {
                atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
                                          memory_order_relaxed);

                if (IS_ZEBRA_DEBUG_FPM)
                        zlog_debug("%s: connection closed", __func__);

                FPM_RECONNECT(fnc);
                return;
        }
        if (rv == -1) {
                atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
                                          memory_order_relaxed);
                zlog_warn("%s: connection failure: %s", __func__,
                          strerror(errno));
                FPM_RECONNECT(fnc);
                return;
        }
        stream_reset(fnc->ibuf);

        /* Account all bytes read. */
        atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
                                  memory_order_relaxed);

        thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
                        &fnc->t_read);
}

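/*
 * Socket write handler. Note that this callback also finishes the pending
 * non-blocking connect(): while fnc->connecting is set it first checks
 * SO_ERROR, and only on success does it start the LSP/NHG/RIB/RMAC replay
 * and begin draining the output buffer.
 */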
static void fpm_write(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        socklen_t statuslen;
        ssize_t bwritten;
        int rv, status;
        size_t btotal;

        if (fnc->connecting == true) {
                status = 0;
                statuslen = sizeof(status);

                rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
                                &statuslen);
                if (rv == -1 || status != 0) {
                        if (rv != -1)
                                zlog_warn("%s: connection failed: %s",
                                          __func__, strerror(status));
                        else
                                zlog_warn("%s: SO_ERROR failed: %s",
                                          __func__, strerror(status));

                        atomic_fetch_add_explicit(
                                &fnc->counters.connection_errors, 1,
                                memory_order_relaxed);

                        FPM_RECONNECT(fnc);
                        return;
                }

                fnc->connecting = false;

                /*
                 * Starting with LSPs walk all FPM objects, marking them
                 * as unsent and then replaying them.
                 */
                thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
                                 &fnc->t_lspreset);

                /* Permit receiving messages now. */
                thread_add_read(fnc->fthread->master, fpm_read, fnc,
                                fnc->socket, &fnc->t_read);
        }

        frr_mutex_lock_autounlock(&fnc->obuf_mutex);

        while (true) {
                /* Stream is empty: reset pointers and return. */
                if (STREAM_READABLE(fnc->obuf) == 0) {
                        stream_reset(fnc->obuf);
                        break;
                }

                /* Try to write all at once. */
                btotal = stream_get_endp(fnc->obuf) -
                        stream_get_getp(fnc->obuf);
                bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
                if (bwritten == 0) {
                        atomic_fetch_add_explicit(
                                &fnc->counters.connection_closes, 1,
                                memory_order_relaxed);

                        if (IS_ZEBRA_DEBUG_FPM)
                                zlog_debug("%s: connection closed", __func__);
                        break;
                }
                if (bwritten == -1) {
                        /* Attempt to continue if blocked by a signal. */
                        if (errno == EINTR)
                                continue;
                        /* Receiver is probably slow, let's give it some time. */
                        if (errno == EAGAIN || errno == EWOULDBLOCK)
                                break;

                        atomic_fetch_add_explicit(
                                &fnc->counters.connection_errors, 1,
                                memory_order_relaxed);
                        zlog_warn("%s: connection failure: %s", __func__,
                                  strerror(errno));

                        FPM_RECONNECT(fnc);
                        return;
                }

                /* Account all bytes sent. */
                atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
                                          memory_order_relaxed);

                /* Account number of bytes freed. */
                atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
                                          memory_order_relaxed);

                stream_forward_getp(fnc->obuf, (size_t)bwritten);
        }

        /* Stream is not empty yet, we must schedule more writes. */
        if (STREAM_READABLE(fnc->obuf)) {
                stream_pulldown(fnc->obuf);
                thread_add_write(fnc->fthread->master, fpm_write, fnc,
                                 fnc->socket, &fnc->t_write);
                return;
        }
}

static void fpm_connect(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
        struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
        socklen_t slen;
        int rv, sock;
        char addrstr[INET6_ADDRSTRLEN];

        sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
        if (sock == -1) {
                zlog_err("%s: fpm socket failed: %s", __func__,
                         strerror(errno));
                thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
                                 &fnc->t_connect);
                return;
        }

        set_nonblocking(sock);

        if (fnc->addr.ss_family == AF_INET) {
                inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
                slen = sizeof(*sin);
        } else {
                inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
                slen = sizeof(*sin6);
        }

        if (IS_ZEBRA_DEBUG_FPM)
                zlog_debug("%s: attempting to connect to %s:%d", __func__,
                           addrstr, ntohs(sin->sin_port));

        rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
        if (rv == -1 && errno != EINPROGRESS) {
                atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
                                          memory_order_relaxed);
                close(sock);
                zlog_warn("%s: fpm connection failed: %s", __func__,
                          strerror(errno));
                thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
                                 &fnc->t_connect);
                return;
        }

        fnc->connecting = (errno == EINPROGRESS);
        fnc->socket = sock;
        if (!fnc->connecting)
                thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
                                &fnc->t_read);
        thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
                         &fnc->t_write);

        /*
         * Starting with LSPs walk all FPM objects, marking them
         * as unsent and then replaying them.
         *
         * If we are not connected, then delay the objects reset/send.
         */
        if (!fnc->connecting)
                thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
                                 &fnc->t_lspreset);
}

/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
        uint8_t nl_buf[NL_PKT_BUF_SIZE];
        size_t nl_buf_len;
        ssize_t rv;
        uint64_t obytes, obytes_peak;
        enum dplane_op_e op = dplane_ctx_get_op(ctx);

        /*
         * If we were configured to not use next hop groups, then quit as soon
         * as possible.
         */
        if ((!fnc->use_nhg)
            && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
                || op == DPLANE_OP_NH_UPDATE))
                return 0;

        nl_buf_len = 0;

        frr_mutex_lock_autounlock(&fnc->obuf_mutex);

        switch (op) {
        case DPLANE_OP_ROUTE_UPDATE:
        case DPLANE_OP_ROUTE_DELETE:
                rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
                                                        nl_buf, sizeof(nl_buf),
                                                        true, fnc->use_nhg);
                if (rv <= 0) {
                        zlog_err(
                                "%s: netlink_route_multipath_msg_encode failed",
                                __func__);
                        return 0;
                }

                nl_buf_len = (size_t)rv;

                /* UPDATE operations need an INSTALL, otherwise just quit. */
                if (op == DPLANE_OP_ROUTE_DELETE)
                        break;

                /* FALL THROUGH */
        case DPLANE_OP_ROUTE_INSTALL:
                rv = netlink_route_multipath_msg_encode(
                        RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
                        sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
                if (rv <= 0) {
                        zlog_err(
                                "%s: netlink_route_multipath_msg_encode failed",
                                __func__);
                        return 0;
                }

                nl_buf_len += (size_t)rv;
                break;

        case DPLANE_OP_MAC_INSTALL:
        case DPLANE_OP_MAC_DELETE:
                rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
                if (rv <= 0) {
                        zlog_err("%s: netlink_macfdb_update_ctx failed",
                                 __func__);
                        return 0;
                }

                nl_buf_len = (size_t)rv;
                break;

        case DPLANE_OP_NH_DELETE:
                rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
                                                sizeof(nl_buf));
                if (rv <= 0) {
                        zlog_err("%s: netlink_nexthop_msg_encode failed",
                                 __func__);
                        return 0;
                }

                nl_buf_len = (size_t)rv;
                break;
        case DPLANE_OP_NH_INSTALL:
        case DPLANE_OP_NH_UPDATE:
                rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
                                                sizeof(nl_buf));
                if (rv <= 0) {
                        zlog_err("%s: netlink_nexthop_msg_encode failed",
                                 __func__);
                        return 0;
                }

                nl_buf_len = (size_t)rv;
                break;

        case DPLANE_OP_LSP_INSTALL:
        case DPLANE_OP_LSP_UPDATE:
        case DPLANE_OP_LSP_DELETE:
                rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
                if (rv <= 0) {
                        zlog_err("%s: netlink_lsp_msg_encoder failed",
                                 __func__);
                        return 0;
                }

                nl_buf_len += (size_t)rv;
                break;

        /* Un-handled by FPM at this time. */
        case DPLANE_OP_PW_INSTALL:
        case DPLANE_OP_PW_UNINSTALL:
        case DPLANE_OP_ADDR_INSTALL:
        case DPLANE_OP_ADDR_UNINSTALL:
        case DPLANE_OP_NEIGH_INSTALL:
        case DPLANE_OP_NEIGH_UPDATE:
        case DPLANE_OP_NEIGH_DELETE:
        case DPLANE_OP_VTEP_ADD:
        case DPLANE_OP_VTEP_DELETE:
        case DPLANE_OP_SYS_ROUTE_ADD:
        case DPLANE_OP_SYS_ROUTE_DELETE:
        case DPLANE_OP_ROUTE_NOTIFY:
        case DPLANE_OP_LSP_NOTIFY:
        case DPLANE_OP_RULE_ADD:
        case DPLANE_OP_RULE_DELETE:
        case DPLANE_OP_RULE_UPDATE:
        case DPLANE_OP_NEIGH_DISCOVER:
        case DPLANE_OP_BR_PORT_UPDATE:
        case DPLANE_OP_IPTABLE_ADD:
        case DPLANE_OP_IPTABLE_DELETE:
        case DPLANE_OP_IPSET_ADD:
        case DPLANE_OP_IPSET_DELETE:
        case DPLANE_OP_IPSET_ENTRY_ADD:
        case DPLANE_OP_IPSET_ENTRY_DELETE:
        case DPLANE_OP_NEIGH_IP_INSTALL:
        case DPLANE_OP_NEIGH_IP_DELETE:
        case DPLANE_OP_NEIGH_TABLE_UPDATE:
        case DPLANE_OP_GRE_SET:
        case DPLANE_OP_INTF_ADDR_ADD:
        case DPLANE_OP_INTF_ADDR_DEL:
        case DPLANE_OP_NONE:
                break;
        }

        /* Skip empty enqueues. */
        if (nl_buf_len == 0)
                return 0;

        /* We must know if someday a message goes beyond 65KiB. */
        assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

        /* Check if we have enough buffer space. */
        if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
                atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
                                          memory_order_relaxed);

                if (IS_ZEBRA_DEBUG_FPM)
                        zlog_debug(
                                "%s: buffer full: wants to write %zu but has %zu",
                                __func__, nl_buf_len + FPM_HEADER_SIZE,
                                STREAM_WRITEABLE(fnc->obuf));

                return -1;
        }

        /*
         * Fill in the FPM header information.
         *
         * See FPM_HEADER_SIZE definition for more information.
         */
        stream_putc(fnc->obuf, 1);
        stream_putc(fnc->obuf, 1);
        stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

        /* Write current data. */
        stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

        /* Account number of bytes waiting to be written. */
        atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
                                  nl_buf_len + FPM_HEADER_SIZE,
                                  memory_order_relaxed);
        obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
                                      memory_order_relaxed);
        obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
                                           memory_order_relaxed);
        if (obytes_peak < obytes)
                atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
                                      memory_order_relaxed);

        /* Tell the thread to start writing. */
        thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
                         &fnc->t_write);

        return 0;
}

/*
 * LSP walk/send functions.
 */
struct fpm_lsp_arg {
        struct zebra_dplane_ctx *ctx;
        struct fpm_nl_ctx *fnc;
        bool complete;
};

static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
{
        struct zebra_lsp *lsp = bucket->data;
        struct fpm_lsp_arg *fla = arg;

        /* Skip entries which have already been sent. */
        if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
                return HASHWALK_CONTINUE;

        dplane_ctx_reset(fla->ctx);
        dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);

        if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
                fla->complete = false;
                return HASHWALK_ABORT;
        }

        /* Mark entry as sent. */
        SET_FLAG(lsp->flags, LSP_FLAG_FPM);
        return HASHWALK_CONTINUE;
}

static void fpm_lsp_send(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
        struct fpm_lsp_arg fla;

        fla.fnc = fnc;
        fla.ctx = dplane_ctx_alloc();
        fla.complete = true;

        hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);

        dplane_ctx_fini(&fla.ctx);

        if (fla.complete) {
                WALK_FINISH(fnc, FNE_LSP_FINISHED);

                /* Now move on to the next hop group walk. */
                thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
                                 &fnc->t_nhgreset);
        } else {
                /* Didn't finish - reschedule the LSP walk. */
                thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
                                 &fnc->t_lspwalk);
        }
}

/*
 * Next hop walk/send functions.
 */
struct fpm_nhg_arg {
        struct zebra_dplane_ctx *ctx;
        struct fpm_nl_ctx *fnc;
        bool complete;
};

static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
{
        struct nhg_hash_entry *nhe = bucket->data;
        struct fpm_nhg_arg *fna = arg;

        /* This entry was already sent, skip it. */
        if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
                return HASHWALK_CONTINUE;

        /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
        dplane_ctx_reset(fna->ctx);
        dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
        if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
                /* Our buffers are full, let's give it some cycles. */
                fna->complete = false;
                return HASHWALK_ABORT;
        }

        /* Mark group as sent, so it doesn't get sent again. */
        SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);

        return HASHWALK_CONTINUE;
}

static void fpm_nhg_send(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        struct fpm_nhg_arg fna;

        fna.fnc = fnc;
        fna.ctx = dplane_ctx_alloc();
        fna.complete = true;

        /* Send next hops. */
        if (fnc->use_nhg)
                hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

        /* `free()` allocated memory. */
        dplane_ctx_fini(&fna.ctx);

        /* We are done sending next hops, let's install the routes now. */
        if (fna.complete) {
                WALK_FINISH(fnc, FNE_NHG_FINISHED);
                thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
                                 &fnc->t_ribreset);
        } else /* Otherwise reschedule the next hop group walk again. */
                thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
                                 &fnc->t_nhgwalk);
}

/**
 * Send all RIB installed routes to the connected data plane.
 */
static void fpm_rib_send(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        rib_dest_t *dest;
        struct route_node *rn;
        struct route_table *rt;
        struct zebra_dplane_ctx *ctx;
        rib_tables_iter_t rt_iter;

        /* Allocate temporary context for all transactions. */
        ctx = dplane_ctx_alloc();

        rt_iter.state = RIB_TABLES_ITER_S_INIT;
        while ((rt = rib_tables_iter_next(&rt_iter))) {
                for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
                        dest = rib_dest_from_rnode(rn);
                        /* Skip bad route entries. */
                        if (dest == NULL || dest->selected_fib == NULL)
                                continue;

                        /* Check for already sent routes. */
                        if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
                                continue;

                        /* Enqueue route install. */
                        dplane_ctx_reset(ctx);
                        dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
                                              dest->selected_fib);
                        if (fpm_nl_enqueue(fnc, ctx) == -1) {
                                /* Free the temporary allocated context. */
                                dplane_ctx_fini(&ctx);

                                thread_add_timer(zrouter.master, fpm_rib_send,
                                                 fnc, 1, &fnc->t_ribwalk);
                                return;
                        }

                        /* Mark as sent. */
                        SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
                }
        }

        /* Free the temporary allocated context. */
        dplane_ctx_fini(&ctx);

        /* All RIB routes sent! */
        WALK_FINISH(fnc, FNE_RIB_FINISHED);

        /* Schedule next event: RMAC reset. */
        thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
                         &fnc->t_rmacreset);
}

/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
        struct zebra_dplane_ctx *ctx;
        struct fpm_nl_ctx *fnc;
        struct zebra_l3vni *zl3vni;
        bool complete;
};

static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
{
        struct fpm_rmac_arg *fra = arg;
        struct zebra_mac *zrmac = bucket->data;
        struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
        const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
        struct zebra_if *br_zif;
        vlanid_t vid;
        bool sticky;

        /* Entry already sent. */
        if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
                return;

        sticky = !!CHECK_FLAG(zrmac->flags,
                              (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
        br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
        vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

        dplane_ctx_reset(fra->ctx);
        dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
        dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
                        zif->brslave_info.br_if, vid,
                        &zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
                        0 /*nhg*/, 0 /*update_flags*/);
        if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
                thread_add_timer(zrouter.master, fpm_rmac_send,
                                 fra->fnc, 1, &fra->fnc->t_rmacwalk);
                fra->complete = false;
        }
}

static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
{
        struct fpm_rmac_arg *fra = arg;
        struct zebra_l3vni *zl3vni = bucket->data;

        fra->zl3vni = zl3vni;
        hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, fra);
}

static void fpm_rmac_send(struct thread *t)
{
        struct fpm_rmac_arg fra;

        fra.fnc = THREAD_ARG(t);
        fra.ctx = dplane_ctx_alloc();
        fra.complete = true;
        hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
        dplane_ctx_fini(&fra.ctx);

        /* RMAC walk completed. */
        if (fra.complete)
                WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
}

/*
 * Resets the next hop FPM flags so we send all next hops again.
 */
static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
{
        struct nhg_hash_entry *nhe = bucket->data;

        /* Unset FPM installation flag so it gets installed again. */
        UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
}

static void fpm_nhg_reset(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);

        hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);

        /* Schedule next step: send next hop groups. */
        thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
}

/*
 * Resets the LSP FPM flag so we send all LSPs again.
 */
static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
{
        struct zebra_lsp *lsp = bucket->data;

        UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
}

static void fpm_lsp_reset(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);

        hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);

        /* Schedule next step: send LSPs. */
        thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
}

/**
 * Resets the RIB FPM flags so we send all routes again.
 */
static void fpm_rib_reset(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        rib_dest_t *dest;
        struct route_node *rn;
        struct route_table *rt;
        rib_tables_iter_t rt_iter;

        rt_iter.state = RIB_TABLES_ITER_S_INIT;
        while ((rt = rib_tables_iter_next(&rt_iter))) {
                for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
                        dest = rib_dest_from_rnode(rn);
                        /* Skip bad route entries. */
                        if (dest == NULL)
                                continue;

                        UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
                }
        }

        /* Schedule next step: send RIB routes. */
        thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
}

/*
 * The next three functions will handle RMAC table reset.
 */
static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
{
        struct zebra_mac *zrmac = bucket->data;

        UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
}

static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
{
        struct zebra_l3vni *zl3vni = bucket->data;

        hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
}

static void fpm_rmac_reset(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);

        hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);

        /* Schedule next event: send RMAC entries. */
        thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
                         &fnc->t_rmacwalk);
}

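/*
 * Runs in the FPM plugin's own pthread: dequeues the contexts that
 * fpm_nl_process() parked in `ctxqueue`, encodes them into the output
 * buffer and hands them back to the data plane as processed.
 */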
static void fpm_process_queue(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        struct zebra_dplane_ctx *ctx;
        bool no_bufs = false;
        uint64_t processed_contexts = 0;

        while (true) {
                /* No space available yet. */
                if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
                        no_bufs = true;
                        break;
                }

                /* Dequeue next item or quit processing. */
                frr_with_mutex (&fnc->ctxqueue_mutex) {
                        ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
                }
                if (ctx == NULL)
                        break;

                /*
                 * Intentionally ignoring the return value: the
                 * STREAM_WRITEABLE check above already guarantees there is
                 * room in the output buffer for this context.
                 */
                (void)fpm_nl_enqueue(fnc, ctx);

                /* Account the processed entries. */
                processed_contexts++;
                atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
                                          memory_order_relaxed);

                dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
                dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
        }

        /* Update count of processed contexts. */
        atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
                                  processed_contexts, memory_order_relaxed);

        /* Re-schedule if we ran out of buffer space. */
        if (no_bufs)
                thread_add_timer(fnc->fthread->master, fpm_process_queue,
                                 fnc, 0, &fnc->t_dequeue);

        /*
         * Let the dataplane thread know if there are items in the
         * output queue to be processed. Otherwise they may sit
         * until the dataplane thread gets scheduled for new,
         * unrelated work.
         */
        if (dplane_provider_out_ctx_queue_len(fnc->prov) > 0)
                dplane_provider_work_ready();
}

/**
 * Handles external (e.g. CLI, data plane or others) events.
 */
static void fpm_process_event(struct thread *t)
{
        struct fpm_nl_ctx *fnc = THREAD_ARG(t);
        int event = THREAD_VAL(t);

        switch (event) {
        case FNE_DISABLE:
                zlog_info("%s: manual FPM disable event", __func__);
                fnc->disabled = true;
                atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
                                          memory_order_relaxed);

                /* Call reconnect to disable timers and clean up context. */
                fpm_reconnect(fnc);
                break;

        case FNE_RECONNECT:
                zlog_info("%s: manual FPM reconnect event", __func__);
                fnc->disabled = false;
                atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
                                          memory_order_relaxed);
                fpm_reconnect(fnc);
                break;

        case FNE_RESET_COUNTERS:
                zlog_info("%s: manual FPM counters reset event", __func__);
                memset(&fnc->counters, 0, sizeof(fnc->counters));
                break;

        case FNE_TOGGLE_NHG:
                zlog_info("%s: toggle next hop groups support", __func__);
                fnc->use_nhg = !fnc->use_nhg;
                fpm_reconnect(fnc);
                break;

        case FNE_INTERNAL_RECONNECT:
                fpm_reconnect(fnc);
                break;

        case FNE_NHG_FINISHED:
                if (IS_ZEBRA_DEBUG_FPM)
                        zlog_debug("%s: next hop groups walk finished",
                                   __func__);
                break;
        case FNE_RIB_FINISHED:
                if (IS_ZEBRA_DEBUG_FPM)
                        zlog_debug("%s: RIB walk finished", __func__);
                break;
        case FNE_RMAC_FINISHED:
                if (IS_ZEBRA_DEBUG_FPM)
                        zlog_debug("%s: RMAC walk finished", __func__);
                break;
        case FNE_LSP_FINISHED:
                if (IS_ZEBRA_DEBUG_FPM)
                        zlog_debug("%s: LSP walk finished", __func__);
                break;

        default:
                if (IS_ZEBRA_DEBUG_FPM)
                        zlog_debug("%s: unhandled event %d", __func__, event);
                break;
        }
}

/*
 * Data plane functions.
 */
static int fpm_nl_start(struct zebra_dplane_provider *prov)
{
        struct fpm_nl_ctx *fnc;

        fnc = dplane_provider_get_data(prov);
        fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
        assert(frr_pthread_run(fnc->fthread, NULL) == 0);
        fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
        fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
        pthread_mutex_init(&fnc->obuf_mutex, NULL);
        fnc->socket = -1;
        fnc->disabled = true;
        fnc->prov = prov;
        TAILQ_INIT(&fnc->ctxqueue);
        pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);

        /* Set default values. */
        fnc->use_nhg = true;

        return 0;
}

static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
        /* Disable all events and close socket. */
        THREAD_OFF(fnc->t_lspreset);
        THREAD_OFF(fnc->t_lspwalk);
        THREAD_OFF(fnc->t_nhgreset);
        THREAD_OFF(fnc->t_nhgwalk);
        THREAD_OFF(fnc->t_ribreset);
        THREAD_OFF(fnc->t_ribwalk);
        THREAD_OFF(fnc->t_rmacreset);
        THREAD_OFF(fnc->t_rmacwalk);
        thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
        thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
        thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

        if (fnc->socket != -1) {
                close(fnc->socket);
                fnc->socket = -1;
        }

        return 0;
}

static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
        /* Stop the running thread. */
        frr_pthread_stop(fnc->fthread, NULL);

        /* Free all allocated resources. */
        pthread_mutex_destroy(&fnc->obuf_mutex);
        pthread_mutex_destroy(&fnc->ctxqueue_mutex);
        stream_free(fnc->ibuf);
        stream_free(fnc->obuf);
        free(gfnc);
        gfnc = NULL;

        return 0;
}

static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
        struct fpm_nl_ctx *fnc;

        fnc = dplane_provider_get_data(prov);
        if (early)
                return fpm_nl_finish_early(fnc);

        return fpm_nl_finish_late(fnc);
}

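/*
 * Provider callback executed by the zebra data plane thread. While the FPM
 * socket is usable it only moves the contexts into `ctxqueue`, leaving the
 * netlink encoding and socket I/O to fpm_process_queue() on the plugin's
 * own pthread; otherwise the contexts are completed right away.
 */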
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
        struct zebra_dplane_ctx *ctx;
        struct fpm_nl_ctx *fnc;
        int counter, limit;
        uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

        fnc = dplane_provider_get_data(prov);
        limit = dplane_provider_get_work_limit(prov);
        for (counter = 0; counter < limit; counter++) {
                ctx = dplane_provider_dequeue_in_ctx(prov);
                if (ctx == NULL)
                        break;

                /*
                 * Skip all notifications if not connected, we'll walk the RIB
                 * anyway.
                 */
                if (fnc->socket != -1 && fnc->connecting == false) {
                        /*
                         * Update the number of queued contexts *before*
                         * enqueueing, to ensure counter consistency.
                         */
                        atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
                                                  1, memory_order_relaxed);

                        frr_with_mutex (&fnc->ctxqueue_mutex) {
                                dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
                        }

                        cur_queue = atomic_load_explicit(
                                &fnc->counters.ctxqueue_len,
                                memory_order_relaxed);
                        if (peak_queue < cur_queue)
                                peak_queue = cur_queue;
                        continue;
                }

                dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
                dplane_provider_enqueue_out_ctx(prov, ctx);
        }

        /* Update peak queue length, if we just observed a new peak */
        stored_peak_queue = atomic_load_explicit(
                &fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
        if (stored_peak_queue < peak_queue)
                atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
                                      peak_queue, memory_order_relaxed);

        if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
                                 memory_order_relaxed)
            > 0)
                thread_add_timer(fnc->fthread->master, fpm_process_queue,
                                 fnc, 0, &fnc->t_dequeue);

        /* Ensure dataplane thread is rescheduled if we hit the work limit */
        if (counter >= limit)
                dplane_provider_work_ready();

        return 0;
}

static int fpm_nl_new(struct thread_master *tm)
{
        struct zebra_dplane_provider *prov = NULL;
        int rv;

        gfnc = calloc(1, sizeof(*gfnc));
        rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
                                      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
                                      fpm_nl_process, fpm_nl_finish, gfnc,
                                      &prov);

        if (IS_ZEBRA_DEBUG_DPLANE)
                zlog_debug("%s register status: %d", prov_name, rv);

        install_node(&fpm_node);
        install_element(ENABLE_NODE, &fpm_show_counters_cmd);
        install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
        install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
        install_element(CONFIG_NODE, &fpm_set_address_cmd);
        install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
        install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
        install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

        return 0;
}

static int fpm_nl_init(void)
{
        hook_register(frr_late_init, fpm_nl_new);
        return 0;
}

FRR_MODULE_SETUP(
        .name = "dplane_fpm_nl",
        .version = "0.0.1",
        .description = "Data plane plugin for FPM using netlink.",
        .init = fpm_nl_init,
);
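/*
 * Usage note: as with other FRR modules, this plugin is expected to be
 * loaded by starting zebra with `-M dplane_fpm_nl` (the module name from
 * FRR_MODULE_SETUP above); the frr_late_init hook then runs fpm_nl_new()
 * to register the data plane provider and the CLI commands.
 */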