/*
 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
 *
 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
 *                    Rafael Zalamena
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#ifdef HAVE_CONFIG_H
#include "config.h" /* Include this explicitly */
#endif

#include <arpa/inet.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <errno.h>
#include <string.h>

#include "lib/zebra.h"
#include "lib/json.h"
#include "lib/libfrr.h"
#include "lib/frratomic.h"
#include "lib/command.h"
#include "lib/memory.h"
#include "lib/network.h"
#include "lib/ns.h"
#include "lib/frr_pthread.h"
#include "zebra/debug.h"
#include "zebra/interface.h"
#include "zebra/zebra_dplane.h"
#include "zebra/zebra_mpls.h"
#include "zebra/zebra_router.h"
#include "zebra/zebra_evpn.h"
#include "zebra/zebra_evpn_mac.h"
#include "zebra/zebra_vxlan_private.h"
#include "zebra/kernel_netlink.h"
#include "zebra/rt_netlink.h"
#include "zebra/debug.h"

#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
#define SOUTHBOUND_DEFAULT_PORT 2620

/**
 * FPM header:
 * {
 *   version: 1 byte (always 1),
 *   type: 1 byte (1 for netlink, 2 protobuf),
 *   len: 2 bytes (network order),
 * }
 *
 * This header is used with any format to tell the users how many bytes to
 * expect.
 */
#define FPM_HEADER_SIZE 4
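/*
 * Example (matching the encoding in fpm_nl_enqueue below): a 96 byte
 * netlink message is preceded by the bytes 0x01 0x01 0x00 0x64, i.e.
 * version 1, type netlink and a total length of 100 (payload plus this
 * 4 byte header) in network byte order.
 */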

static const char *prov_name = "dplane_fpm_nl";

struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;
	bool disabled;
	bool connecting;
	bool use_nhg;
	struct sockaddr_storage addr;

	/* data plane buffers. */
	struct stream *ibuf;
	struct stream *obuf;
	pthread_mutex_t obuf_mutex;

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_q ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events. */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_dequeue;

	/* zebra events. */
	struct thread *t_lspreset;
	struct thread *t_lspwalk;
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters. */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;

enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};

#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 (ev), NULL)
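
/*
 * Both helper macros above just queue an event on the FPM pthread's event
 * loop; WALK_FINISH passes a NULL thread pointer since the finish
 * notification does not need to be tracked or cancelled later.
 */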

/*
 * Prototypes.
 */
static void fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static void fpm_lsp_send(struct thread *t);
static void fpm_lsp_reset(struct thread *t);
static void fpm_nhg_send(struct thread *t);
static void fpm_nhg_reset(struct thread *t);
static void fpm_rib_send(struct thread *t);
static void fpm_rib_reset(struct thread *t);
static void fpm_rmac_send(struct thread *t);
static void fpm_rmac_reset(struct thread *t);
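
/*
 * On (re)connect the plugin replays all of its state in a fixed order:
 * LSPs, then next hop groups, then RIB routes, then RMACs. Each *_reset()
 * clears the "already sent" flags and schedules the matching *_send();
 * every completed walk then kicks off the next stage, the RMAC walk being
 * the last one.
 */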

/*
 * CLI.
 */
#define FPM_STR "Forwarding Plane Manager configuration\n"

DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	uint16_t port = 0;
	uint8_t naddr[INET6_BUFSIZ];

	if (argc == 5)
		port = strtol(argv[4]->arg, NULL, 10);

	/* Handle IPv4 addresses. */
	if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
		sin = (struct sockaddr_in *)&gfnc->addr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = AF_INET;
		sin->sin_port =
			port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
		sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
		memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

		goto ask_reconnect;
	}

	/* Handle IPv6 addresses. */
	if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
		vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
		return CMD_WARNING;
	}

	sin6 = (struct sockaddr_in6 *)&gfnc->addr;
	memset(sin6, 0, sizeof(*sin6));
	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
	sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
	memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RECONNECT, &gfnc->t_event);
	return CMD_SUCCESS;
}
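
/*
 * Note: the address/port only take effect through the FNE_RECONNECT event
 * queued above; fpm_process_event() (running on the FPM pthread) clears
 * the disabled flag and tears the session down so that fpm_connect()
 * picks up the new destination.
 */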

DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_DISABLE, &gfnc->t_event);
	return CMD_SUCCESS;
}

DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
      "fpm use-next-hop-groups",
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already enabled. */
	if (gfnc->use_nhg)
		return CMD_SUCCESS;

	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_event);

	return CMD_SUCCESS;
}

DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
      "no fpm use-next-hop-groups",
      NO_STR
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already disabled. */
	if (!gfnc->use_nhg)
		return CMD_SUCCESS;

	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_event);

	return CMD_SUCCESS;
}

DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}

DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
	vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

#define SHOW_COUNTER(label, counter) \
	vty_out(vty, "%28s: %u\n", (label), (counter))

	SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
	SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
	SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
	SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
	SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
	SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
	SHOW_COUNTER("Data plane items processed",
		     gfnc->counters.dplane_contexts);
	SHOW_COUNTER("Data plane items enqueued",
		     gfnc->counters.ctxqueue_len);
	SHOW_COUNTER("Data plane items queue peak",
		     gfnc->counters.ctxqueue_len_peak);
	SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
	SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
	SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

	return CMD_SUCCESS;
}

DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_json(vty, jo);

	return CMD_SUCCESS;
}

static int fpm_write_config(struct vty *vty)
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	int written = 0;

	if (gfnc->disabled)
		return written;

	switch (gfnc->addr.ss_family) {
	case AF_INET:
		written = 1;
		sin = (struct sockaddr_in *)&gfnc->addr;
		vty_out(vty, "fpm address %pI4", &sin->sin_addr);
		if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
			vty_out(vty, " port %d", ntohs(sin->sin_port));

		vty_out(vty, "\n");
		break;
	case AF_INET6:
		written = 1;
		sin6 = (struct sockaddr_in6 *)&gfnc->addr;
		vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
		if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
			vty_out(vty, " port %d", ntohs(sin6->sin6_port));

		vty_out(vty, "\n");
		break;

	default:
		break;
	}

	if (!gfnc->use_nhg) {
		vty_out(vty, "no fpm use-next-hop-groups\n");
		written = 1;
	}

	return written;
}

static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	.config_write = fpm_write_config,
};

/*
 * FPM functions.
 */
static void fpm_connect(struct thread *t);

static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
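	/*
	 * These walk/reset threads live on zrouter.master (the main zebra
	 * pthread) while this function runs on the FPM pthread, hence the
	 * asynchronous cancellation.
	 */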
	thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}

static void fpm_read(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	ssize_t rv;

	/* Let's ignore the input at the moment. */
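	/*
	 * Everything received is discarded below (see the stream_reset()
	 * call) and only accounted in bytes_read; reading serves to detect
	 * a peer close or error on the socket.
	 */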
	rv = stream_read_try(fnc->ibuf, fnc->socket,
			     STREAM_WRITEABLE(fnc->ibuf));
	/* We've got an interruption. */
	if (rv == -2) {
		/* Schedule next read. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
		return;
	}
	if (rv == 0) {
		atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: connection closed", __func__);

		FPM_RECONNECT(fnc);
		return;
	}
	if (rv == -1) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		zlog_warn("%s: connection failure: %s", __func__,
			  strerror(errno));
		FPM_RECONNECT(fnc);
		return;
	}
	stream_reset(fnc->ibuf);

	/* Account all bytes read. */
	atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
				  memory_order_relaxed);

	thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
			&fnc->t_read);
}

static void fpm_write(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	socklen_t statuslen;
	ssize_t bwritten;
	int rv, status;
	size_t btotal;

	if (fnc->connecting == true) {
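		/*
		 * A non-blocking connect() in progress reports completion by
		 * making the socket writable; SO_ERROR then tells us whether
		 * the connection actually succeeded.
		 */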
		status = 0;
		statuslen = sizeof(status);

		rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
				&statuslen);
		if (rv == -1 || status != 0) {
			if (rv != -1)
				zlog_warn("%s: connection failed: %s", __func__,
					  strerror(status));
			else
				zlog_warn("%s: SO_ERROR failed: %s", __func__,
					  strerror(status));

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);

			FPM_RECONNECT(fnc);
			return;
		}

		fnc->connecting = false;

		/*
		 * Starting with LSPs walk all FPM objects, marking them
		 * as unsent and then replaying them.
		 */
		thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
				 &fnc->t_lspreset);

		/* Permit receiving messages now. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
	}

	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	while (true) {
		/* Stream is empty: reset pointers and return. */
		if (STREAM_READABLE(fnc->obuf) == 0) {
			stream_reset(fnc->obuf);
			break;
		}

		/* Try to write all at once. */
		btotal = stream_get_endp(fnc->obuf) -
			stream_get_getp(fnc->obuf);
		bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
		if (bwritten == 0) {
			atomic_fetch_add_explicit(
				&fnc->counters.connection_closes, 1,
				memory_order_relaxed);

			if (IS_ZEBRA_DEBUG_FPM)
				zlog_debug("%s: connection closed", __func__);
			break;
		}
		if (bwritten == -1) {
			/* Attempt to continue if blocked by a signal. */
			if (errno == EINTR)
				continue;
			/* Receiver is probably slow, lets give it some time. */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);
			zlog_warn("%s: connection failure: %s", __func__,
				  strerror(errno));

			FPM_RECONNECT(fnc);
			return;
		}

		/* Account all bytes sent. */
		atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
					  memory_order_relaxed);

		/* Account number of bytes free. */
		atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
					  memory_order_relaxed);

		stream_forward_getp(fnc->obuf, (size_t)bwritten);
	}

	/* Stream is not empty yet, we must schedule more writes. */
	if (STREAM_READABLE(fnc->obuf)) {
		stream_pulldown(fnc->obuf);
		thread_add_write(fnc->fthread->master, fpm_write, fnc,
				 fnc->socket, &fnc->t_write);
		return;
	}
}

static void fpm_connect(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
	socklen_t slen;
	int rv, sock;
	char addrstr[INET6_ADDRSTRLEN];

	sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
	if (sock == -1) {
		zlog_err("%s: fpm socket failed: %s", __func__,
			 strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return;
	}

	set_nonblocking(sock);

	if (fnc->addr.ss_family == AF_INET) {
		inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin);
	} else {
		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin6);
	}

	if (IS_ZEBRA_DEBUG_FPM)
		zlog_debug("%s: attempting to connect to %s:%d", __func__,
			   addrstr, ntohs(sin->sin_port));

	rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
	if (rv == -1 && errno != EINPROGRESS) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		close(sock);
		zlog_warn("%s: fpm connection failed: %s", __func__,
			  strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return;
	}

	fnc->connecting = (errno == EINPROGRESS);
	fnc->socket = sock;
	if (!fnc->connecting)
		thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
				&fnc->t_read);
	thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
			 &fnc->t_write);

	/*
	 * Starting with LSPs walk all FPM objects, marking them
	 * as unsent and then replaying them.
	 *
	 * If we are not connected, then delay the objects reset/send.
	 */
	if (!fnc->connecting)
		thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
				 &fnc->t_lspreset);
}

/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
		rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_lsp_msg_encoder failed",
				 __func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	/* Un-handled by FPM at this time. */
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_RULE_ADD:
	case DPLANE_OP_RULE_DELETE:
	case DPLANE_OP_RULE_UPDATE:
	case DPLANE_OP_NEIGH_DISCOVER:
	case DPLANE_OP_BR_PORT_UPDATE:
	case DPLANE_OP_IPTABLE_ADD:
	case DPLANE_OP_IPTABLE_DELETE:
	case DPLANE_OP_IPSET_ADD:
	case DPLANE_OP_IPSET_DELETE:
	case DPLANE_OP_IPSET_ENTRY_ADD:
	case DPLANE_OP_IPSET_ENTRY_DELETE:
	case DPLANE_OP_NEIGH_IP_INSTALL:
	case DPLANE_OP_NEIGH_IP_DELETE:
	case DPLANE_OP_NEIGH_TABLE_UPDATE:
	case DPLANE_OP_GRE_SET:
	case DPLANE_OP_INTF_ADDR_ADD:
	case DPLANE_OP_INTF_ADDR_DEL:
	case DPLANE_OP_INTF_NETCONFIG:
	case DPLANE_OP_NONE:
		break;

	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}
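	/*
	 * Note: the -1 above is what makes the LSP/NHG/RIB/RMAC walkers back
	 * off and reschedule themselves until fpm_write() drains the output
	 * buffer.
	 */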

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}

/*
 * LSP walk/send functions
 */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	bool complete;
};

static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
{
	struct zebra_lsp *lsp = bucket->data;
	struct fpm_lsp_arg *fla = arg;

	/* Skip entries which have already been sent */
	if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
		return HASHWALK_CONTINUE;

	dplane_ctx_reset(fla->ctx);
	dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);

	if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
		fla->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark entry as sent */
	SET_FLAG(lsp->flags, LSP_FLAG_FPM);
	return HASHWALK_CONTINUE;
}

static void fpm_lsp_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
	struct fpm_lsp_arg fla;

	fla.fnc = fnc;
	fla.ctx = dplane_ctx_alloc();
	fla.complete = true;

	hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);

	dplane_ctx_fini(&fla.ctx);

	if (fla.complete) {
		WALK_FINISH(fnc, FNE_LSP_FINISHED);

		/* Now move onto routes */
		thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
				 &fnc->t_nhgreset);
	} else {
		/* Didn't finish - reschedule LSP walk */
		thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
				 &fnc->t_lspwalk);
	}
}

/*
 * Next hop walk/send functions.
 */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	bool complete;
};

static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;
	struct fpm_nhg_arg *fna = arg;

	/* This entry was already sent, skip it. */
	if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
		return HASHWALK_CONTINUE;

	/* Reset ctx to reuse allocated memory, take a snapshot and send it. */
	dplane_ctx_reset(fna->ctx);
	dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
	if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
		/* Our buffers are full, lets give it some cycles. */
		fna->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark group as sent, so it doesn't get sent again. */
	SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);

	return HASHWALK_CONTINUE;
}

static void fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
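	/*
	 * When next hop group support is disabled the walk below is skipped
	 * entirely; fna.complete stays true and we move straight on to the
	 * RIB walk.
	 */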
	if (fnc->use_nhg)
		hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, lets install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
				 &fnc->t_ribreset);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);
}

/**
 * Send all RIB installed routes to the connected data plane.
 */
static void fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	/* Schedule next event: RMAC reset. */
	thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);
}

/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	struct zebra_l3vni *zl3vni;
	bool complete;
};

static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	struct zebra_mac *zrmac = bucket->data;
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/* Entry already sent. */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
		return;

	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid,
			&zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
			0 /*nhg*/, 0 /*update_flags*/);
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
		fra->complete = false;
	}
}

static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	struct zebra_l3vni *zl3vni = bucket->data;

	fra->zl3vni = zl3vni;
	hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
}

static void fpm_rmac_send(struct thread *t)
{
	struct fpm_rmac_arg fra;

	fra.fnc = THREAD_ARG(t);
	fra.ctx = dplane_ctx_alloc();
	fra.complete = true;
	hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
	dplane_ctx_fini(&fra.ctx);

	/* RMAC walk completed. */
	if (fra.complete)
		WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
}

/*
 * Resets the next hop FPM flags so we send all next hops again.
 */
static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;

	/* Unset FPM installation flag so it gets installed again. */
	UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
}

static void fpm_nhg_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);

	hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);

	/* Schedule next step: send next hop groups. */
	thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
}

/*
 * Resets the LSP FPM flag so we send all LSPs again.
 */
static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
{
	struct zebra_lsp *lsp = bucket->data;

	UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
}

static void fpm_lsp_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);

	hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);

	/* Schedule next step: send LSPs */
	thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
}

/**
 * Resets the RIB FPM flags so we send all routes again.
 */
static void fpm_rib_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	rib_tables_iter_t rt_iter;

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL)
				continue;

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Schedule next step: send RIB routes. */
	thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
}

/*
 * The next three function will handle RMAC table reset.
 */
static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
{
	struct zebra_mac *zrmac = bucket->data;

	UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
}

static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
{
	struct zebra_l3vni *zl3vni = bucket->data;

	hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
}

static void fpm_rmac_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);

	hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);

	/* Schedule next event: send RMAC entries. */
	thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
			 &fnc->t_rmacwalk);
}

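/*
 * Drain the contexts that fpm_nl_process() (running on the data plane
 * thread) queued into ctxqueue and encode them into the output buffer.
 * This runs on the FPM pthread, which is why ctxqueue_mutex guards the
 * shared queue.
 */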
static void fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;
	bool no_bufs = false;
	uint64_t processed_contexts = 0;

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
			no_bufs = true;
			break;
		}

		/* Dequeue next item or quit processing. */
		frr_with_mutex (&fnc->ctxqueue_mutex) {
			ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		}
		if (ctx == NULL)
			break;

		/*
		 * Intentionally ignoring the return value
		 * as that we are ensuring that we can write to
		 * the output data in the STREAM_WRITEABLE
		 * check above, so we can ignore the return
		 */
		(void)fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		processed_contexts++;
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Update count of processed contexts */
	atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
				  processed_contexts, memory_order_relaxed);

	/* Re-schedule if we ran out of buffer space */
	if (no_bufs)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/*
	 * Let the dataplane thread know if there are items in the
	 * output queue to be processed. Otherwise they may sit
	 * until the dataplane thread gets scheduled for new,
	 * unrelated work.
	 */
	if (dplane_provider_out_ctx_queue_len(fnc->prov) > 0)
		dplane_provider_work_ready();
}

/**
 * Handles external (e.g. CLI, data plane or others) events.
 */
static void fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	int event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);
		break;
	case FNE_LSP_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: LSP walk finished", __func__);
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled event %d", __func__, event);
		break;
	}
}

/*
 * Data plane functions.
 */
static int fpm_nl_start(struct zebra_dplane_provider *prov)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
	assert(frr_pthread_run(fnc->fthread, NULL) == 0);
	fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
	fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
	pthread_mutex_init(&fnc->obuf_mutex, NULL);
	fnc->socket = -1;
	fnc->disabled = true;
	fnc->prov = prov;
	TAILQ_INIT(&fnc->ctxqueue);
	pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);

	/* Set default values. */
	fnc->use_nhg = true;

	return 0;
}

static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_lspreset);
	THREAD_OFF(fnc->t_lspwalk);
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}

static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	free(gfnc);
	gfnc = NULL;

	return 0;
}

static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	if (early)
		return fpm_nl_finish_early(fnc);

	return fpm_nl_finish_late(fnc);
}

static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			/*
			 * Update the number of queued contexts *before*
			 * enqueueing, to ensure counter consistency.
			 */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);

			frr_with_mutex (&fnc->ctxqueue_mutex) {
				dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
			}

			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			if (peak_queue < cur_queue)
				peak_queue = cur_queue;
			continue;
		}

		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Update peak queue length, if we just observed a new peak */
	stored_peak_queue = atomic_load_explicit(
		&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
	if (stored_peak_queue < peak_queue)
		atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
				      peak_queue, memory_order_relaxed);

	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/* Ensure dataplane thread is rescheduled if we hit the work limit */
	if (counter >= limit)
		dplane_provider_work_ready();

	return 0;
}

static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	gfnc = calloc(1, sizeof(*gfnc));
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}

static int fpm_nl_init(void)
{
	hook_register(frr_late_init, fpm_nl_new);
	return 0;
}

1500
1501FRR_MODULE_SETUP(
1502 .name = "dplane_fpm_nl",
1503 .version = "0.0.1",
1504 .description = "Data plane plugin for FPM using netlink.",
1505 .init = fpm_nl_init,
80413c20 1506);