]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
zebra: Add ctx to netlink message parsing
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
7309092b
DL
22#ifdef HAVE_CONFIG_H
23#include "config.h" /* Include this explicitly */
24#endif
25
d35f447d
RZ
26#include <arpa/inet.h>
27
28#include <sys/types.h>
29#include <sys/socket.h>
30
31#include <errno.h>
32#include <string.h>
33
d35f447d 34#include "lib/zebra.h"
6cc059cd 35#include "lib/json.h"
d35f447d 36#include "lib/libfrr.h"
c871e6c9 37#include "lib/frratomic.h"
3bdd7fca 38#include "lib/command.h"
d35f447d
RZ
39#include "lib/memory.h"
40#include "lib/network.h"
41#include "lib/ns.h"
42#include "lib/frr_pthread.h"
e5e444d8 43#include "zebra/debug.h"
bda10adf 44#include "zebra/interface.h"
d35f447d 45#include "zebra/zebra_dplane.h"
b300c8bb 46#include "zebra/zebra_mpls.h"
018e77bc 47#include "zebra/zebra_router.h"
b2998086
PR
48#include "zebra/zebra_evpn.h"
49#include "zebra/zebra_evpn_mac.h"
bda10adf 50#include "zebra/zebra_vxlan_private.h"
d35f447d
RZ
51#include "zebra/kernel_netlink.h"
52#include "zebra/rt_netlink.h"
53#include "zebra/debug.h"
54
55#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
56#define SOUTHBOUND_DEFAULT_PORT 2620
57
a179ba35
RZ
58/**
59 * FPM header:
60 * {
61 * version: 1 byte (always 1),
62 * type: 1 byte (1 for netlink, 2 protobuf),
63 * len: 2 bytes (network order),
64 * }
65 *
66 * This header is used with any format to tell the users how many bytes to
67 * expect.
68 */
69#define FPM_HEADER_SIZE 4
70
d35f447d
RZ
71static const char *prov_name = "dplane_fpm_nl";
72
struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;		/* FPM server TCP socket; -1 while closed. */
	bool disabled;		/* user removed the FPM configuration. */
	bool connecting;	/* non-blocking connect() still in progress. */
	bool use_nhg;		/* emit netlink next hop group messages. */
	struct sockaddr_storage addr;	/* FPM server address (v4 or v6). */

	/* data plane buffers. */
	struct stream *ibuf;	/* read-side buffer (input is discarded). */
	struct stream *obuf;	/* write-side buffer holding FPM frames. */
	/* Protects obuf: enqueue and flush happen on different threads. */
	pthread_mutex_t obuf_mutex;

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_q ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events (scheduled on the dedicated FPM pthread). */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_nhg;
	struct thread *t_dequeue;

	/* zebra events (scheduled on zrouter.master). */
	struct thread *t_lspreset;
	struct thread *t_lspwalk;
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters (atomics: read from vty, updated elsewhere). */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;
3bdd7fca
RZ
146
/* Events dispatched to fpm_process_event() on the FPM pthread. */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
168
a2032324
RZ
/*
 * Post an internal reconnect request to the FPM pthread. Using an async
 * event (instead of calling fpm_reconnect() directly) avoids racing the
 * caller's thread.
 */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/* Tell the FPM pthread that a zebra-side walk (LSP/NHG/RIB/RMAC) is done. */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 (ev), NULL)
176
018e77bc
RZ
177/*
178 * Prototypes.
179 */
cc9f21da 180static void fpm_process_event(struct thread *t);
018e77bc 181static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
cc9f21da
DS
182static void fpm_lsp_send(struct thread *t);
183static void fpm_lsp_reset(struct thread *t);
184static void fpm_nhg_send(struct thread *t);
185static void fpm_nhg_reset(struct thread *t);
186static void fpm_rib_send(struct thread *t);
187static void fpm_rib_reset(struct thread *t);
188static void fpm_rmac_send(struct thread *t);
189static void fpm_rmac_reset(struct thread *t);
018e77bc 190
3bdd7fca
RZ
191/*
192 * CLI.
193 */
6cc059cd
RZ
194#define FPM_STR "Forwarding Plane Manager configuration\n"
195
3bdd7fca
RZ
196DEFUN(fpm_set_address, fpm_set_address_cmd,
197 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
6cc059cd 198 FPM_STR
3bdd7fca
RZ
199 "FPM remote listening server address\n"
200 "Remote IPv4 FPM server\n"
201 "Remote IPv6 FPM server\n"
202 "FPM remote listening server port\n"
203 "Remote FPM server port\n")
204{
205 struct sockaddr_in *sin;
206 struct sockaddr_in6 *sin6;
207 uint16_t port = 0;
208 uint8_t naddr[INET6_BUFSIZ];
209
210 if (argc == 5)
211 port = strtol(argv[4]->arg, NULL, 10);
212
213 /* Handle IPv4 addresses. */
214 if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
215 sin = (struct sockaddr_in *)&gfnc->addr;
216
217 memset(sin, 0, sizeof(*sin));
218 sin->sin_family = AF_INET;
219 sin->sin_port =
220 port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
221#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
222 sin->sin_len = sizeof(*sin);
223#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
224 memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));
225
226 goto ask_reconnect;
227 }
228
229 /* Handle IPv6 addresses. */
230 if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
231 vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
232 return CMD_WARNING;
233 }
234
235 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
236 memset(sin6, 0, sizeof(*sin6));
237 sin6->sin6_family = AF_INET6;
238 sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
239#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
240 sin6->sin6_len = sizeof(*sin6);
241#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
242 memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));
243
244ask_reconnect:
245 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
246 FNE_RECONNECT, &gfnc->t_event);
247 return CMD_SUCCESS;
248}
249
250DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
251 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
252 NO_STR
6cc059cd 253 FPM_STR
3bdd7fca
RZ
254 "FPM remote listening server address\n"
255 "Remote IPv4 FPM server\n"
256 "Remote IPv6 FPM server\n"
257 "FPM remote listening server port\n"
258 "Remote FPM server port\n")
259{
260 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
261 FNE_DISABLE, &gfnc->t_event);
262 return CMD_SUCCESS;
263}
264
b55ab92a
RZ
265DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
266 "fpm use-next-hop-groups",
267 FPM_STR
268 "Use netlink next hop groups feature.\n")
269{
270 /* Already enabled. */
271 if (gfnc->use_nhg)
272 return CMD_SUCCESS;
273
274 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
551fa8c3 275 FNE_TOGGLE_NHG, &gfnc->t_nhg);
b55ab92a
RZ
276
277 return CMD_SUCCESS;
278}
279
280DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
281 "no fpm use-next-hop-groups",
282 NO_STR
283 FPM_STR
284 "Use netlink next hop groups feature.\n")
285{
286 /* Already disabled. */
287 if (!gfnc->use_nhg)
288 return CMD_SUCCESS;
289
290 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
551fa8c3 291 FNE_TOGGLE_NHG, &gfnc->t_nhg);
b55ab92a
RZ
292
293 return CMD_SUCCESS;
294}
295
6cc059cd
RZ
296DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
297 "clear fpm counters",
298 CLEAR_STR
299 FPM_STR
300 "FPM statistic counters\n")
301{
302 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
303 FNE_RESET_COUNTERS, &gfnc->t_event);
304 return CMD_SUCCESS;
305}
306
DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
	vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

	/* Local helper so all label/value lines stay aligned. */
#define SHOW_COUNTER(label, counter) \
	vty_out(vty, "%28s: %u\n", (label), (counter))

	SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
	SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
	SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
	SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
	SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
	SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
	SHOW_COUNTER("Data plane items processed",
		     gfnc->counters.dplane_contexts);
	SHOW_COUNTER("Data plane items enqueued",
		     gfnc->counters.ctxqueue_len);
	SHOW_COUNTER("Data plane items queue peak",
		     gfnc->counters.ctxqueue_len_peak);
	SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
	SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
	SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

	return CMD_SUCCESS;
}
338
DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	/* Same counters as the plain-text variant, keyed for machines. */
	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_json(vty, jo);

	return CMD_SUCCESS;
}
371
3bdd7fca
RZ
372static int fpm_write_config(struct vty *vty)
373{
374 struct sockaddr_in *sin;
375 struct sockaddr_in6 *sin6;
376 int written = 0;
3bdd7fca
RZ
377
378 if (gfnc->disabled)
379 return written;
380
381 switch (gfnc->addr.ss_family) {
382 case AF_INET:
383 written = 1;
384 sin = (struct sockaddr_in *)&gfnc->addr;
a3adec46 385 vty_out(vty, "fpm address %pI4", &sin->sin_addr);
3bdd7fca
RZ
386 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
387 vty_out(vty, " port %d", ntohs(sin->sin_port));
388
389 vty_out(vty, "\n");
390 break;
391 case AF_INET6:
392 written = 1;
393 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
a3adec46 394 vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
3bdd7fca
RZ
395 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
396 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
397
398 vty_out(vty, "\n");
399 break;
400
401 default:
402 break;
403 }
404
b55ab92a
RZ
405 if (!gfnc->use_nhg) {
406 vty_out(vty, "no fpm use-next-hop-groups\n");
407 written = 1;
408 }
409
3bdd7fca
RZ
410 return written;
411}
412
612c2c15 413static struct cmd_node fpm_node = {
893d8beb
DL
414 .name = "fpm",
415 .node = FPM_NODE,
3bdd7fca 416 .prompt = "",
612c2c15 417 .config_write = fpm_write_config,
3bdd7fca
RZ
418};
419
d35f447d
RZ
420/*
421 * FPM functions.
422 */
cc9f21da 423static void fpm_connect(struct thread *t);
d35f447d
RZ
424
/*
 * Tear down the current FPM connection state and, unless the user disabled
 * FPM, schedule a fresh connection attempt in 3 seconds.
 *
 * NOTE(review): appears to always run on the FPM pthread (scheduled via
 * FPM_RECONNECT / fpm_process_event) — confirm before relying on it.
 */
static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}
461
/*
 * Read handler for the FPM socket.
 *
 * Incoming data is only used to detect connection close (rv == 0) or hard
 * errors (rv == -1); anything actually read is discarded via stream_reset().
 */
static void fpm_read(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	ssize_t rv;

	/* Let's ignore the input at the moment. */
	rv = stream_read_try(fnc->ibuf, fnc->socket,
			     STREAM_WRITEABLE(fnc->ibuf));
	if (rv == 0) {
		/* Orderly close by the peer: count it and reconnect. */
		atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: connection closed", __func__);

		FPM_RECONNECT(fnc);
		return;
	}
	if (rv == -1) {
		/* Hard read error: count it and reconnect. */
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		zlog_warn("%s: connection failure: %s", __func__,
			  strerror(errno));
		FPM_RECONNECT(fnc);
		return;
	}

	/* Schedule the next read */
	thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
			&fnc->t_read);

	/* We've got an interruption. */
	if (rv == -2)
		return;

	/* Discard whatever was received. */
	stream_reset(fnc->ibuf);

	/* Account all bytes read. */
	atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
				  memory_order_relaxed);
}
503
/*
 * Write handler: flush `obuf` to the FPM socket.
 *
 * When a non-blocking connect() is still pending, the first invocation
 * checks SO_ERROR to finish the handshake; on success it starts the
 * LSP reset/replay walk and enables reading before flushing.
 */
static void fpm_write(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	socklen_t statuslen;
	ssize_t bwritten;
	int rv, status;
	size_t btotal;

	if (fnc->connecting == true) {
		status = 0;
		statuslen = sizeof(status);

		/* Writability after connect(): SO_ERROR tells the outcome. */
		rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
				&statuslen);
		if (rv == -1 || status != 0) {
			if (rv != -1)
				zlog_warn("%s: connection failed: %s", __func__,
					  strerror(status));
			else
				zlog_warn("%s: SO_ERROR failed: %s", __func__,
					  strerror(status));

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);

			FPM_RECONNECT(fnc);
			return;
		}

		fnc->connecting = false;

		/*
		 * Starting with LSPs walk all FPM objects, marking them
		 * as unsent and then replaying them.
		 */
		thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
				 &fnc->t_lspreset);

		/* Permit receiving messages now. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
	}

	/* Lock released automatically at function exit. */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	while (true) {
		/* Stream is empty: reset pointers and return. */
		if (STREAM_READABLE(fnc->obuf) == 0) {
			stream_reset(fnc->obuf);
			break;
		}

		/* Try to write all at once. */
		btotal = stream_get_endp(fnc->obuf) -
			stream_get_getp(fnc->obuf);
		bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
		if (bwritten == 0) {
			atomic_fetch_add_explicit(
				&fnc->counters.connection_closes, 1,
				memory_order_relaxed);

			if (IS_ZEBRA_DEBUG_FPM)
				zlog_debug("%s: connection closed", __func__);
			break;
		}
		if (bwritten == -1) {
			/* Attempt to continue if blocked by a signal. */
			if (errno == EINTR)
				continue;
			/* Receiver is probably slow, lets give it some time. */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);
			zlog_warn("%s: connection failure: %s", __func__,
				  strerror(errno));

			FPM_RECONNECT(fnc);
			return;
		}

		/* Account all bytes sent. */
		atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
					  memory_order_relaxed);

		/* Account number of bytes free. */
		atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
					  memory_order_relaxed);

		/* Partial writes are fine: advance past what went out. */
		stream_forward_getp(fnc->obuf, (size_t)bwritten);
	}

	/* Stream is not empty yet, we must schedule more writes. */
	if (STREAM_READABLE(fnc->obuf)) {
		stream_pulldown(fnc->obuf);
		thread_add_write(fnc->fthread->master, fpm_write, fnc,
				 fnc->socket, &fnc->t_write);
		return;
	}
}
607
cc9f21da 608static void fpm_connect(struct thread *t)
d35f447d
RZ
609{
610 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
3bdd7fca
RZ
611 struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
612 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
613 socklen_t slen;
d35f447d
RZ
614 int rv, sock;
615 char addrstr[INET6_ADDRSTRLEN];
616
3bdd7fca 617 sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
d35f447d 618 if (sock == -1) {
6cc059cd 619 zlog_err("%s: fpm socket failed: %s", __func__,
d35f447d
RZ
620 strerror(errno));
621 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
622 &fnc->t_connect);
cc9f21da 623 return;
d35f447d
RZ
624 }
625
626 set_nonblocking(sock);
627
3bdd7fca
RZ
628 if (fnc->addr.ss_family == AF_INET) {
629 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
630 slen = sizeof(*sin);
631 } else {
632 inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
633 slen = sizeof(*sin6);
634 }
d35f447d 635
e5e444d8
RZ
636 if (IS_ZEBRA_DEBUG_FPM)
637 zlog_debug("%s: attempting to connect to %s:%d", __func__,
638 addrstr, ntohs(sin->sin_port));
d35f447d 639
3bdd7fca 640 rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
d35f447d 641 if (rv == -1 && errno != EINPROGRESS) {
c871e6c9
RZ
642 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
643 memory_order_relaxed);
d35f447d
RZ
644 close(sock);
645 zlog_warn("%s: fpm connection failed: %s", __func__,
646 strerror(errno));
647 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
648 &fnc->t_connect);
cc9f21da 649 return;
d35f447d
RZ
650 }
651
652 fnc->connecting = (errno == EINPROGRESS);
653 fnc->socket = sock;
e1afb97f
RZ
654 if (!fnc->connecting)
655 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
656 &fnc->t_read);
d35f447d
RZ
657 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
658 &fnc->t_write);
659
f9bf1ecc
DE
660 /*
661 * Starting with LSPs walk all FPM objects, marking them
662 * as unsent and then replaying them.
f584de52
RZ
663 *
664 * If we are not connected, then delay the objects reset/send.
f9bf1ecc 665 */
f584de52
RZ
666 if (!fnc->connecting)
667 thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
668 &fnc->t_lspreset);
d35f447d
RZ
669}
670
/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * Each message is framed with the 4-byte FPM header (version, type, length)
 * before the netlink payload. Operations the FPM does not handle encode
 * nothing and are silently skipped.
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	/* Serialize obuf access; the lock is released at function exit. */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		/* An UPDATE is encoded as a delete followed by an install. */
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		/* Append after any delete already encoded above. */
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
		rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_lsp_msg_encoder failed",
				 __func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	/* Un-handled by FPM at this time. */
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_RULE_ADD:
	case DPLANE_OP_RULE_DELETE:
	case DPLANE_OP_RULE_UPDATE:
	case DPLANE_OP_NEIGH_DISCOVER:
	case DPLANE_OP_BR_PORT_UPDATE:
	case DPLANE_OP_IPTABLE_ADD:
	case DPLANE_OP_IPTABLE_DELETE:
	case DPLANE_OP_IPSET_ADD:
	case DPLANE_OP_IPSET_DELETE:
	case DPLANE_OP_IPSET_ENTRY_ADD:
	case DPLANE_OP_IPSET_ENTRY_DELETE:
	case DPLANE_OP_NEIGH_IP_INSTALL:
	case DPLANE_OP_NEIGH_IP_DELETE:
	case DPLANE_OP_NEIGH_TABLE_UPDATE:
	case DPLANE_OP_GRE_SET:
	case DPLANE_OP_INTF_ADDR_ADD:
	case DPLANE_OP_INTF_ADDR_DEL:
	case DPLANE_OP_INTF_NETCONFIG:
	case DPLANE_OP_INTF_INSTALL:
	case DPLANE_OP_INTF_UPDATE:
	case DPLANE_OP_INTF_DELETE:
	case DPLANE_OP_TC_QDISC_INSTALL:
	case DPLANE_OP_TC_QDISC_UNINSTALL:
	case DPLANE_OP_TC_CLASS_ADD:
	case DPLANE_OP_TC_CLASS_DELETE:
	case DPLANE_OP_TC_CLASS_UPDATE:
	case DPLANE_OP_TC_FILTER_ADD:
	case DPLANE_OP_TC_FILTER_DELETE:
	case DPLANE_OP_TC_FILTER_UPDATE:
	case DPLANE_OP_NONE:
		break;

	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}
882
f9bf1ecc
DE
/*
 * LSP walk/send functions
 */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx;	/* reusable context for encoding. */
	struct fpm_nl_ctx *fnc;
	bool complete;	/* false when the walk aborted on a full buffer. */
};
891
892static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
893{
8f74a383 894 struct zebra_lsp *lsp = bucket->data;
f9bf1ecc
DE
895 struct fpm_lsp_arg *fla = arg;
896
897 /* Skip entries which have already been sent */
898 if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
899 return HASHWALK_CONTINUE;
900
901 dplane_ctx_reset(fla->ctx);
902 dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);
903
904 if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
905 fla->complete = false;
906 return HASHWALK_ABORT;
907 }
908
909 /* Mark entry as sent */
910 SET_FLAG(lsp->flags, LSP_FLAG_FPM);
911 return HASHWALK_CONTINUE;
912}
913
/*
 * Walk the default VRF LSP table and send every unsent entry. When the
 * walk completes, signal FNE_LSP_FINISHED and move on to the next hop
 * group reset; otherwise reschedule this walk.
 */
static void fpm_lsp_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
	struct fpm_lsp_arg fla;

	fla.fnc = fnc;
	fla.ctx = dplane_ctx_alloc();
	fla.complete = true;

	hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);

	dplane_ctx_fini(&fla.ctx);

	if (fla.complete) {
		WALK_FINISH(fnc, FNE_LSP_FINISHED);

		/* Now move onto routes */
		thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
				 &fnc->t_nhgreset);
	} else {
		/* Didn't finish - reschedule LSP walk */
		thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
				 &fnc->t_lspwalk);
	}
}
940
981ca597
RZ
/*
 * Next hop walk/send functions.
 */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;	/* reusable context for encoding. */
	struct fpm_nl_ctx *fnc;
	bool complete;	/* false when the walk aborted on a full buffer. */
};
949
/* hash_walk() callback: enqueue one not-yet-sent next hop group entry. */
static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;
	struct fpm_nhg_arg *fna = arg;

	/* This entry was already sent, skip it. */
	if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
		return HASHWALK_CONTINUE;

	/* Reset ctx to reuse allocated memory, take a snapshot and send it. */
	dplane_ctx_reset(fna->ctx);
	dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
	if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
		/* Our buffers are full, lets give it some cycles. */
		fna->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark group as sent, so it doesn't get sent again. */
	SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);

	return HASHWALK_CONTINUE;
}
973
/*
 * Walk all next hop groups (only when the feature is enabled) and send
 * the unsent ones. On completion signal FNE_NHG_FINISHED and kick off
 * the RIB reset; otherwise reschedule this walk.
 */
static void fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	if (fnc->use_nhg)
		hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, lets install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
				 &fnc->t_ribreset);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);
}
999
/**
 * Send all RIB installed routes to the connected data plane.
 *
 * Walks every route table; each node with a selected FIB entry and
 * without the RIB_DEST_UPDATE_FPM flag gets a ROUTE_INSTALL context
 * enqueued. If the output buffer fills up, the walk is rescheduled
 * (1 second timer) and resumes where it stopped thanks to the
 * per-destination flag.
 */
static void fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				/* Buffers full: resume the walk later. */
				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	/* Schedule next event: RMAC reset. */
	thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);
}
1055
/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
	struct zebra_dplane_ctx *ctx;	/* Reusable dataplane context. */
	struct fpm_nl_ctx *fnc;		/* FPM connection context. */
	struct zebra_l3vni *zl3vni;	/* L3VNI currently being walked. */
	bool complete;			/* Cleared when enqueueing fails. */
};
1065
/*
 * Per-RMAC hash callback: enqueue a MAC_INSTALL context for one remote
 * MAC of the L3VNI recorded in the walk argument.
 *
 * `arg` must be a `struct fpm_rmac_arg *` whose `zl3vni` field points
 * at the VNI currently being walked.
 */
static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	struct zebra_mac *zrmac = bucket->data;
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/*
	 * Entry already sent, or a previous entry failed to enqueue
	 * (walk result no longer matters once `complete` is false).
	 * NOTE(review): ZEBRA_MAC_FPM_SENT is tested here but never set
	 * in this function — confirm where/whether it is set elsewhere.
	 */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
		return;

	/* Sticky if statically configured or a remote default gateway. */
	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	/* VLAN id only matters on VLAN-aware bridges. */
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid,
			&zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
			0 /*nhg*/, 0 /*update_flags*/);
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		/* Buffers full: retry the whole RMAC walk in one second. */
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
		fra->complete = false;
	}
}
1097
1ac88792 1098static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
bda10adf
RZ
1099{
1100 struct fpm_rmac_arg *fra = arg;
05843a27 1101 struct zebra_l3vni *zl3vni = bucket->data;
bda10adf
RZ
1102
1103 fra->zl3vni = zl3vni;
1104 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1105}
1106
cc9f21da 1107static void fpm_rmac_send(struct thread *t)
bda10adf
RZ
1108{
1109 struct fpm_rmac_arg fra;
1110
1111 fra.fnc = THREAD_ARG(t);
1112 fra.ctx = dplane_ctx_alloc();
55eb9d4d 1113 fra.complete = true;
bda10adf
RZ
1114 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1115 dplane_ctx_fini(&fra.ctx);
1116
55eb9d4d
RZ
1117 /* RMAC walk completed. */
1118 if (fra.complete)
1119 WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
bda10adf
RZ
1120}
1121
981ca597
RZ
1122/*
1123 * Resets the next hop FPM flags so we send all next hops again.
1124 */
1125static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1126{
1127 struct nhg_hash_entry *nhe = bucket->data;
1128
1129 /* Unset FPM installation flag so it gets installed again. */
1130 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1131}
1132
cc9f21da 1133static void fpm_nhg_reset(struct thread *t)
981ca597 1134{
55eb9d4d
RZ
1135 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1136
981ca597 1137 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
e41e0f81
RZ
1138
1139 /* Schedule next step: send next hop groups. */
1140 thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
981ca597
RZ
1141}
1142
f9bf1ecc
DE
1143/*
1144 * Resets the LSP FPM flag so we send all LSPs again.
1145 */
1146static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
1147{
8f74a383 1148 struct zebra_lsp *lsp = bucket->data;
f9bf1ecc
DE
1149
1150 UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
1151}
1152
cc9f21da 1153static void fpm_lsp_reset(struct thread *t)
f9bf1ecc
DE
1154{
1155 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1156 struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
1157
1158 hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);
1159
1160 /* Schedule next step: send LSPs */
1161 thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
f9bf1ecc
DE
1162}
1163
018e77bc
RZ
1164/**
1165 * Resets the RIB FPM flags so we send all routes again.
1166 */
cc9f21da 1167static void fpm_rib_reset(struct thread *t)
018e77bc
RZ
1168{
1169 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1170 rib_dest_t *dest;
1171 struct route_node *rn;
1172 struct route_table *rt;
1173 rib_tables_iter_t rt_iter;
1174
018e77bc
RZ
1175 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1176 while ((rt = rib_tables_iter_next(&rt_iter))) {
1177 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1178 dest = rib_dest_from_rnode(rn);
1179 /* Skip bad route entries. */
1180 if (dest == NULL)
1181 continue;
1182
1183 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1184 }
1185 }
1186
e41e0f81
RZ
1187 /* Schedule next step: send RIB routes. */
1188 thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
018e77bc
RZ
1189}
1190
bda10adf
RZ
1191/*
1192 * The next three function will handle RMAC table reset.
1193 */
1ac88792 1194static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
bda10adf 1195{
3198b2b3 1196 struct zebra_mac *zrmac = bucket->data;
bda10adf
RZ
1197
1198 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1199}
1200
1ac88792 1201static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
bda10adf 1202{
05843a27 1203 struct zebra_l3vni *zl3vni = bucket->data;
bda10adf
RZ
1204
1205 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1206}
1207
cc9f21da 1208static void fpm_rmac_reset(struct thread *t)
bda10adf 1209{
55eb9d4d
RZ
1210 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1211
bda10adf
RZ
1212 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1213
e41e0f81
RZ
1214 /* Schedule next event: send RMAC entries. */
1215 thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
1216 &fnc->t_rmacwalk);
bda10adf
RZ
1217}
1218
/*
 * Drain the context queue into the FPM output buffer.
 *
 * Runs on the FPM pthread. Dequeues contexts handed over by
 * fpm_nl_process() and serializes them with fpm_nl_enqueue(); stops
 * early — and reschedules itself — when less than NL_PKT_BUF_SIZE is
 * writeable in the output stream.
 */
static void fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;
	bool no_bufs = false;
	uint64_t processed_contexts = 0;

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
			no_bufs = true;
			break;
		}

		/* Dequeue next item or quit processing. */
		frr_with_mutex (&fnc->ctxqueue_mutex) {
			ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		}
		if (ctx == NULL)
			break;

		/*
		 * Intentionally ignoring the return value
		 * as that we are ensuring that we can write to
		 * the output data in the STREAM_WRITEABLE
		 * check above, so we can ignore the return
		 */
		if (fnc->socket != -1)
			(void)fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		processed_contexts++;
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Hand the context back to the dataplane as successful. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Update count of processed contexts */
	atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
				  processed_contexts, memory_order_relaxed);

	/* Re-schedule if we ran out of buffer space */
	if (no_bufs)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/*
	 * Let the dataplane thread know if there are items in the
	 * output queue to be processed. Otherwise they may sit
	 * until the dataplane thread gets scheduled for new,
	 * unrelated work.
	 */
	if (dplane_provider_out_ctx_queue_len(fnc->prov) > 0)
		dplane_provider_work_ready();
}
1276
/**
 * Handles external (e.g. CLI, data plane or others) events.
 *
 * The event value is carried in the thread argument (THREAD_VAL).
 */
static void fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	enum fpm_nl_events event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		/* NHG support change needs a full resync, hence reconnect. */
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	/* Walk-completion notifications: debug logging only. */
	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);
		break;
	case FNE_LSP_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: LSP walk finished", __func__);
		break;
	}
}
1338
/*
 * Data plane functions.
 */

/* Provider start callback: allocate buffers and spawn the FPM pthread. */
static int fpm_nl_start(struct zebra_dplane_provider *prov)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
	/* NOTE(review): side effect inside assert() — under NDEBUG the
	 * pthread would never be started; confirm build flags forbid that. */
	assert(frr_pthread_run(fnc->fthread, NULL) == 0);
	fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
	/* Output buffer is oversized to batch many messages per write. */
	fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
	pthread_mutex_init(&fnc->obuf_mutex, NULL);
	fnc->socket = -1;
	/* Start disabled; enabled later via configuration events. */
	fnc->disabled = true;
	fnc->prov = prov;
	TAILQ_INIT(&fnc->ctxqueue);
	pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);

	/* Set default values. */
	fnc->use_nhg = true;

	return 0;
}
1363
/*
 * Early shutdown: stop all scheduled events and close the FPM socket.
 *
 * Main-thread events are turned off directly; read/write/connect events
 * belong to the FPM pthread and are cancelled asynchronously.
 */
static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_lspreset);
	THREAD_OFF(fnc->t_lspwalk);
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	THREAD_OFF(fnc->t_event);
	THREAD_OFF(fnc->t_nhg);
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}
1388
/*
 * Late shutdown: stop the FPM pthread and free every resource that
 * fpm_nl_start() allocated (mutexes, streams, the global context).
 */
static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	free(gfnc);
	gfnc = NULL;

	return 0;
}
1404
98a87504
RZ
1405static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1406{
1407 struct fpm_nl_ctx *fnc;
1408
1409 fnc = dplane_provider_get_data(prov);
1410 if (early)
1411 return fpm_nl_finish_early(fnc);
1412
1413 return fpm_nl_finish_late(fnc);
1414}
1415
/*
 * Provider work callback, run on the zebra dataplane thread.
 *
 * When connected, incoming contexts are moved onto the FPM context
 * queue and the FPM pthread is kicked to serialize them; otherwise
 * they pass straight through marked successful.
 */
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			/*
			 * Update the number of queued contexts *before*
			 * enqueueing, to ensure counter consistency.
			 */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);

			frr_with_mutex (&fnc->ctxqueue_mutex) {
				dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
			}

			/* Track the high-water mark seen in this batch. */
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			if (peak_queue < cur_queue)
				peak_queue = cur_queue;
			continue;
		}

		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Update peak queue length, if we just observed a new peak */
	stored_peak_queue = atomic_load_explicit(
		&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
	if (stored_peak_queue < peak_queue)
		atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
				      peak_queue, memory_order_relaxed);

	/* Kick the FPM pthread to drain whatever we queued above. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/* Ensure dataplane thread is rescheduled if we hit the work limit */
	if (counter >= limit)
		dplane_provider_work_ready();

	return 0;
}
1477
/*
 * frr_late_init hook: allocate the global FPM context, register the
 * threaded dataplane provider and install the CLI commands.
 */
static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	/* NOTE(review): calloc() result is unchecked — on OOM the provider
	 * registers with a NULL data pointer; confirm whether an
	 * abort-on-OOM wrapper is preferred here. */
	gfnc = calloc(1, sizeof(*gfnc));
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	/* CLI: counters display/reset and FPM configuration commands. */
	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}
1503
1504static int fpm_nl_init(void)
1505{
1506 hook_register(frr_late_init, fpm_nl_new);
1507 return 0;
1508}
1509
/* Register this plugin with libfrr's dynamic module loader. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
);