]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
zebra: Convert to `struct zebra_nhlfe` as per our internal standard
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
7309092b
DL
22#ifdef HAVE_CONFIG_H
23#include "config.h" /* Include this explicitly */
24#endif
25
d35f447d
RZ
26#include <arpa/inet.h>
27
28#include <sys/types.h>
29#include <sys/socket.h>
30
31#include <errno.h>
32#include <string.h>
33
d35f447d 34#include "lib/zebra.h"
6cc059cd 35#include "lib/json.h"
d35f447d 36#include "lib/libfrr.h"
c871e6c9 37#include "lib/frratomic.h"
3bdd7fca 38#include "lib/command.h"
d35f447d
RZ
39#include "lib/memory.h"
40#include "lib/network.h"
41#include "lib/ns.h"
42#include "lib/frr_pthread.h"
e5e444d8 43#include "zebra/debug.h"
bda10adf 44#include "zebra/interface.h"
d35f447d 45#include "zebra/zebra_dplane.h"
b300c8bb 46#include "zebra/zebra_mpls.h"
018e77bc 47#include "zebra/zebra_router.h"
b2998086
PR
48#include "zebra/zebra_evpn.h"
49#include "zebra/zebra_evpn_mac.h"
bda10adf 50#include "zebra/zebra_vxlan_private.h"
d35f447d
RZ
51#include "zebra/kernel_netlink.h"
52#include "zebra/rt_netlink.h"
53#include "zebra/debug.h"
54
55#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
56#define SOUTHBOUND_DEFAULT_PORT 2620
57
a179ba35
RZ
58/**
59 * FPM header:
60 * {
61 * version: 1 byte (always 1),
62 * type: 1 byte (1 for netlink, 2 protobuf),
63 * len: 2 bytes (network order),
64 * }
65 *
66 * This header is used with any format to tell the users how many bytes to
67 * expect.
68 */
69#define FPM_HEADER_SIZE 4
70
d35f447d
RZ
71static const char *prov_name = "dplane_fpm_nl";
72
73struct fpm_nl_ctx {
74 /* data plane connection. */
75 int socket;
3bdd7fca 76 bool disabled;
d35f447d 77 bool connecting;
b55ab92a 78 bool use_nhg;
d35f447d
RZ
79 struct sockaddr_storage addr;
80
81 /* data plane buffers. */
82 struct stream *ibuf;
83 struct stream *obuf;
84 pthread_mutex_t obuf_mutex;
85
ba803a2f
RZ
86 /*
87 * data plane context queue:
88 * When a FPM server connection becomes a bottleneck, we must keep the
89 * data plane contexts until we get a chance to process them.
90 */
91 struct dplane_ctx_q ctxqueue;
92 pthread_mutex_t ctxqueue_mutex;
93
d35f447d 94 /* data plane events. */
ba803a2f 95 struct zebra_dplane_provider *prov;
d35f447d
RZ
96 struct frr_pthread *fthread;
97 struct thread *t_connect;
98 struct thread *t_read;
99 struct thread *t_write;
3bdd7fca 100 struct thread *t_event;
ba803a2f 101 struct thread *t_dequeue;
018e77bc
RZ
102
103 /* zebra events. */
f9bf1ecc
DE
104 struct thread *t_lspreset;
105 struct thread *t_lspwalk;
981ca597
RZ
106 struct thread *t_nhgreset;
107 struct thread *t_nhgwalk;
018e77bc
RZ
108 struct thread *t_ribreset;
109 struct thread *t_ribwalk;
bda10adf
RZ
110 struct thread *t_rmacreset;
111 struct thread *t_rmacwalk;
6cc059cd
RZ
112
113 /* Statistic counters. */
114 struct {
115 /* Amount of bytes read into ibuf. */
770a8d28 116 _Atomic uint32_t bytes_read;
6cc059cd 117 /* Amount of bytes written from obuf. */
770a8d28 118 _Atomic uint32_t bytes_sent;
ad4d1022 119 /* Output buffer current usage. */
770a8d28 120 _Atomic uint32_t obuf_bytes;
ad4d1022 121 /* Output buffer peak usage. */
770a8d28 122 _Atomic uint32_t obuf_peak;
6cc059cd
RZ
123
124 /* Amount of connection closes. */
770a8d28 125 _Atomic uint32_t connection_closes;
6cc059cd 126 /* Amount of connection errors. */
770a8d28 127 _Atomic uint32_t connection_errors;
6cc059cd
RZ
128
129 /* Amount of user configurations: FNE_RECONNECT. */
770a8d28 130 _Atomic uint32_t user_configures;
6cc059cd 131 /* Amount of user disable requests: FNE_DISABLE. */
770a8d28 132 _Atomic uint32_t user_disables;
6cc059cd
RZ
133
134 /* Amount of data plane context processed. */
770a8d28 135 _Atomic uint32_t dplane_contexts;
ba803a2f 136 /* Amount of data plane contexts enqueued. */
770a8d28 137 _Atomic uint32_t ctxqueue_len;
ba803a2f 138 /* Peak amount of data plane contexts enqueued. */
770a8d28 139 _Atomic uint32_t ctxqueue_len_peak;
6cc059cd
RZ
140
141 /* Amount of buffer full events. */
770a8d28 142 _Atomic uint32_t buffer_full;
6cc059cd 143 } counters;
770a8d28 144} *gfnc;
3bdd7fca
RZ
145
/* Events handled by fpm_process_event() on the FPM pthread. */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
167
a2032324
RZ
/* Schedule an internal (race-free) reconnect on the FPM pthread. */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/* Notify the FPM pthread that a zebra-side walk (LSP/NHG/RIB/RMAC) ended. */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 (ev), NULL)
018e77bc
RZ
/*
 * Prototypes.
 */
static int fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static int fpm_lsp_send(struct thread *t);
static int fpm_lsp_reset(struct thread *t);
static int fpm_nhg_send(struct thread *t);
static int fpm_nhg_reset(struct thread *t);
static int fpm_rib_send(struct thread *t);
static int fpm_rib_reset(struct thread *t);
static int fpm_rmac_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);
3bdd7fca
RZ
190/*
191 * CLI.
192 */
6cc059cd
RZ
193#define FPM_STR "Forwarding Plane Manager configuration\n"
194
3bdd7fca
RZ
195DEFUN(fpm_set_address, fpm_set_address_cmd,
196 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
6cc059cd 197 FPM_STR
3bdd7fca
RZ
198 "FPM remote listening server address\n"
199 "Remote IPv4 FPM server\n"
200 "Remote IPv6 FPM server\n"
201 "FPM remote listening server port\n"
202 "Remote FPM server port\n")
203{
204 struct sockaddr_in *sin;
205 struct sockaddr_in6 *sin6;
206 uint16_t port = 0;
207 uint8_t naddr[INET6_BUFSIZ];
208
209 if (argc == 5)
210 port = strtol(argv[4]->arg, NULL, 10);
211
212 /* Handle IPv4 addresses. */
213 if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
214 sin = (struct sockaddr_in *)&gfnc->addr;
215
216 memset(sin, 0, sizeof(*sin));
217 sin->sin_family = AF_INET;
218 sin->sin_port =
219 port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
220#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
221 sin->sin_len = sizeof(*sin);
222#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
223 memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));
224
225 goto ask_reconnect;
226 }
227
228 /* Handle IPv6 addresses. */
229 if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
230 vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
231 return CMD_WARNING;
232 }
233
234 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
235 memset(sin6, 0, sizeof(*sin6));
236 sin6->sin6_family = AF_INET6;
237 sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
238#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
239 sin6->sin6_len = sizeof(*sin6);
240#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
241 memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));
242
243ask_reconnect:
244 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
245 FNE_RECONNECT, &gfnc->t_event);
246 return CMD_SUCCESS;
247}
248
249DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
250 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
251 NO_STR
6cc059cd 252 FPM_STR
3bdd7fca
RZ
253 "FPM remote listening server address\n"
254 "Remote IPv4 FPM server\n"
255 "Remote IPv6 FPM server\n"
256 "FPM remote listening server port\n"
257 "Remote FPM server port\n")
258{
259 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
260 FNE_DISABLE, &gfnc->t_event);
261 return CMD_SUCCESS;
262}
263
b55ab92a
RZ
264DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
265 "fpm use-next-hop-groups",
266 FPM_STR
267 "Use netlink next hop groups feature.\n")
268{
269 /* Already enabled. */
270 if (gfnc->use_nhg)
271 return CMD_SUCCESS;
272
273 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
274 FNE_TOGGLE_NHG, &gfnc->t_event);
275
276 return CMD_SUCCESS;
277}
278
279DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
280 "no fpm use-next-hop-groups",
281 NO_STR
282 FPM_STR
283 "Use netlink next hop groups feature.\n")
284{
285 /* Already disabled. */
286 if (!gfnc->use_nhg)
287 return CMD_SUCCESS;
288
289 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
290 FNE_TOGGLE_NHG, &gfnc->t_event);
291
292 return CMD_SUCCESS;
293}
294
6cc059cd
RZ
295DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
296 "clear fpm counters",
297 CLEAR_STR
298 FPM_STR
299 "FPM statistic counters\n")
300{
301 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
302 FNE_RESET_COUNTERS, &gfnc->t_event);
303 return CMD_SUCCESS;
304}
305
306DEFUN(fpm_show_counters, fpm_show_counters_cmd,
307 "show fpm counters",
308 SHOW_STR
309 FPM_STR
310 "FPM statistic counters\n")
311{
312 vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");
313
314#define SHOW_COUNTER(label, counter) \
770a8d28 315 vty_out(vty, "%28s: %u\n", (label), (counter))
6cc059cd
RZ
316
317 SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
318 SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
ad4d1022
RZ
319 SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
320 SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
6cc059cd
RZ
321 SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
322 SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
323 SHOW_COUNTER("Data plane items processed",
324 gfnc->counters.dplane_contexts);
ba803a2f
RZ
325 SHOW_COUNTER("Data plane items enqueued",
326 gfnc->counters.ctxqueue_len);
327 SHOW_COUNTER("Data plane items queue peak",
328 gfnc->counters.ctxqueue_len_peak);
6cc059cd
RZ
329 SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
330 SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
331 SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
332
333#undef SHOW_COUNTER
334
335 return CMD_SUCCESS;
336}
337
338DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
339 "show fpm counters json",
340 SHOW_STR
341 FPM_STR
342 "FPM statistic counters\n"
343 JSON_STR)
344{
345 struct json_object *jo;
346
347 jo = json_object_new_object();
348 json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
349 json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
ad4d1022
RZ
350 json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
351 json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
a50404aa
RZ
352 json_object_int_add(jo, "connection-closes",
353 gfnc->counters.connection_closes);
354 json_object_int_add(jo, "connection-errors",
355 gfnc->counters.connection_errors);
356 json_object_int_add(jo, "data-plane-contexts",
357 gfnc->counters.dplane_contexts);
ba803a2f
RZ
358 json_object_int_add(jo, "data-plane-contexts-queue",
359 gfnc->counters.ctxqueue_len);
360 json_object_int_add(jo, "data-plane-contexts-queue-peak",
361 gfnc->counters.ctxqueue_len_peak);
6cc059cd 362 json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
a50404aa
RZ
363 json_object_int_add(jo, "user-configures",
364 gfnc->counters.user_configures);
6cc059cd
RZ
365 json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
366 vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
367 json_object_free(jo);
368
369 return CMD_SUCCESS;
370}
371
3bdd7fca
RZ
372static int fpm_write_config(struct vty *vty)
373{
374 struct sockaddr_in *sin;
375 struct sockaddr_in6 *sin6;
376 int written = 0;
3bdd7fca
RZ
377
378 if (gfnc->disabled)
379 return written;
380
381 switch (gfnc->addr.ss_family) {
382 case AF_INET:
383 written = 1;
384 sin = (struct sockaddr_in *)&gfnc->addr;
a3adec46 385 vty_out(vty, "fpm address %pI4", &sin->sin_addr);
3bdd7fca
RZ
386 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
387 vty_out(vty, " port %d", ntohs(sin->sin_port));
388
389 vty_out(vty, "\n");
390 break;
391 case AF_INET6:
392 written = 1;
393 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
a3adec46 394 vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
3bdd7fca
RZ
395 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
396 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
397
398 vty_out(vty, "\n");
399 break;
400
401 default:
402 break;
403 }
404
b55ab92a
RZ
405 if (!gfnc->use_nhg) {
406 vty_out(vty, "no fpm use-next-hop-groups\n");
407 written = 1;
408 }
409
3bdd7fca
RZ
410 return written;
411}
412
612c2c15 413static struct cmd_node fpm_node = {
893d8beb
DL
414 .name = "fpm",
415 .node = FPM_NODE,
3bdd7fca 416 .prompt = "",
612c2c15 417 .config_write = fpm_write_config,
3bdd7fca
RZ
418};
419
d35f447d
RZ
420/*
421 * FPM functions.
422 */
423static int fpm_connect(struct thread *t);
424
425static void fpm_reconnect(struct fpm_nl_ctx *fnc)
426{
a2032324 427 /* Cancel all zebra threads first. */
f9bf1ecc
DE
428 thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
429 thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
a2032324
RZ
430 thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
431 thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
432 thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
433 thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
434 thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
435 thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);
436
437 /*
438 * Grab the lock to empty the streams (data plane might try to
439 * enqueue updates while we are closing).
440 */
d35f447d
RZ
441 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
442
3bdd7fca
RZ
443 /* Avoid calling close on `-1`. */
444 if (fnc->socket != -1) {
445 close(fnc->socket);
446 fnc->socket = -1;
447 }
448
d35f447d
RZ
449 stream_reset(fnc->ibuf);
450 stream_reset(fnc->obuf);
451 THREAD_OFF(fnc->t_read);
452 THREAD_OFF(fnc->t_write);
018e77bc 453
3bdd7fca
RZ
454 /* FPM is disabled, don't attempt to connect. */
455 if (fnc->disabled)
456 return;
457
d35f447d
RZ
458 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
459 &fnc->t_connect);
460}
461
462static int fpm_read(struct thread *t)
463{
464 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
465 ssize_t rv;
466
467 /* Let's ignore the input at the moment. */
468 rv = stream_read_try(fnc->ibuf, fnc->socket,
469 STREAM_WRITEABLE(fnc->ibuf));
e1afb97f
RZ
470 /* We've got an interruption. */
471 if (rv == -2) {
472 /* Schedule next read. */
473 thread_add_read(fnc->fthread->master, fpm_read, fnc,
474 fnc->socket, &fnc->t_read);
475 return 0;
476 }
d35f447d 477 if (rv == 0) {
c871e6c9
RZ
478 atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
479 memory_order_relaxed);
e5e444d8
RZ
480
481 if (IS_ZEBRA_DEBUG_FPM)
482 zlog_debug("%s: connection closed", __func__);
483
a2032324 484 FPM_RECONNECT(fnc);
d35f447d
RZ
485 return 0;
486 }
487 if (rv == -1) {
c871e6c9
RZ
488 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
489 memory_order_relaxed);
e5e444d8
RZ
490 zlog_warn("%s: connection failure: %s", __func__,
491 strerror(errno));
a2032324 492 FPM_RECONNECT(fnc);
d35f447d
RZ
493 return 0;
494 }
495 stream_reset(fnc->ibuf);
496
6cc059cd 497 /* Account all bytes read. */
c871e6c9
RZ
498 atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
499 memory_order_relaxed);
6cc059cd 500
d35f447d
RZ
501 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
502 &fnc->t_read);
503
504 return 0;
505}
506
507static int fpm_write(struct thread *t)
508{
509 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
510 socklen_t statuslen;
511 ssize_t bwritten;
512 int rv, status;
513 size_t btotal;
514
515 if (fnc->connecting == true) {
516 status = 0;
517 statuslen = sizeof(status);
518
519 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
520 &statuslen);
521 if (rv == -1 || status != 0) {
522 if (rv != -1)
e5e444d8
RZ
523 zlog_warn("%s: connection failed: %s", __func__,
524 strerror(status));
d35f447d 525 else
e5e444d8
RZ
526 zlog_warn("%s: SO_ERROR failed: %s", __func__,
527 strerror(status));
d35f447d 528
c871e6c9
RZ
529 atomic_fetch_add_explicit(
530 &fnc->counters.connection_errors, 1,
531 memory_order_relaxed);
6cc059cd 532
a2032324 533 FPM_RECONNECT(fnc);
d35f447d
RZ
534 return 0;
535 }
536
537 fnc->connecting = false;
018e77bc 538
f584de52
RZ
539 /*
540 * Starting with LSPs walk all FPM objects, marking them
541 * as unsent and then replaying them.
542 */
543 thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
544 &fnc->t_lspreset);
545
e1afb97f
RZ
546 /* Permit receiving messages now. */
547 thread_add_read(fnc->fthread->master, fpm_read, fnc,
548 fnc->socket, &fnc->t_read);
d35f447d
RZ
549 }
550
551 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
552
553 while (true) {
554 /* Stream is empty: reset pointers and return. */
555 if (STREAM_READABLE(fnc->obuf) == 0) {
556 stream_reset(fnc->obuf);
557 break;
558 }
559
560 /* Try to write all at once. */
561 btotal = stream_get_endp(fnc->obuf) -
562 stream_get_getp(fnc->obuf);
563 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
564 if (bwritten == 0) {
c871e6c9
RZ
565 atomic_fetch_add_explicit(
566 &fnc->counters.connection_closes, 1,
567 memory_order_relaxed);
e5e444d8
RZ
568
569 if (IS_ZEBRA_DEBUG_FPM)
570 zlog_debug("%s: connection closed", __func__);
d35f447d
RZ
571 break;
572 }
573 if (bwritten == -1) {
ad4d1022
RZ
574 /* Attempt to continue if blocked by a signal. */
575 if (errno == EINTR)
576 continue;
577 /* Receiver is probably slow, lets give it some time. */
578 if (errno == EAGAIN || errno == EWOULDBLOCK)
d35f447d
RZ
579 break;
580
c871e6c9
RZ
581 atomic_fetch_add_explicit(
582 &fnc->counters.connection_errors, 1,
583 memory_order_relaxed);
e5e444d8
RZ
584 zlog_warn("%s: connection failure: %s", __func__,
585 strerror(errno));
a2032324
RZ
586
587 FPM_RECONNECT(fnc);
588 return 0;
d35f447d
RZ
589 }
590
6cc059cd 591 /* Account all bytes sent. */
c871e6c9
RZ
592 atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
593 memory_order_relaxed);
6cc059cd 594
ad4d1022 595 /* Account number of bytes free. */
c871e6c9
RZ
596 atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
597 memory_order_relaxed);
ad4d1022 598
d35f447d
RZ
599 stream_forward_getp(fnc->obuf, (size_t)bwritten);
600 }
601
602 /* Stream is not empty yet, we must schedule more writes. */
603 if (STREAM_READABLE(fnc->obuf)) {
ad4d1022 604 stream_pulldown(fnc->obuf);
d35f447d
RZ
605 thread_add_write(fnc->fthread->master, fpm_write, fnc,
606 fnc->socket, &fnc->t_write);
607 return 0;
608 }
609
610 return 0;
611}
612
613static int fpm_connect(struct thread *t)
614{
615 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
3bdd7fca
RZ
616 struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
617 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
618 socklen_t slen;
d35f447d
RZ
619 int rv, sock;
620 char addrstr[INET6_ADDRSTRLEN];
621
3bdd7fca 622 sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
d35f447d 623 if (sock == -1) {
6cc059cd 624 zlog_err("%s: fpm socket failed: %s", __func__,
d35f447d
RZ
625 strerror(errno));
626 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
627 &fnc->t_connect);
628 return 0;
629 }
630
631 set_nonblocking(sock);
632
3bdd7fca
RZ
633 if (fnc->addr.ss_family == AF_INET) {
634 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
635 slen = sizeof(*sin);
636 } else {
637 inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
638 slen = sizeof(*sin6);
639 }
d35f447d 640
e5e444d8
RZ
641 if (IS_ZEBRA_DEBUG_FPM)
642 zlog_debug("%s: attempting to connect to %s:%d", __func__,
643 addrstr, ntohs(sin->sin_port));
d35f447d 644
3bdd7fca 645 rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
d35f447d 646 if (rv == -1 && errno != EINPROGRESS) {
c871e6c9
RZ
647 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
648 memory_order_relaxed);
d35f447d
RZ
649 close(sock);
650 zlog_warn("%s: fpm connection failed: %s", __func__,
651 strerror(errno));
652 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
653 &fnc->t_connect);
654 return 0;
655 }
656
657 fnc->connecting = (errno == EINPROGRESS);
658 fnc->socket = sock;
e1afb97f
RZ
659 if (!fnc->connecting)
660 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
661 &fnc->t_read);
d35f447d
RZ
662 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
663 &fnc->t_write);
664
f9bf1ecc
DE
665 /*
666 * Starting with LSPs walk all FPM objects, marking them
667 * as unsent and then replaying them.
f584de52
RZ
668 *
669 * If we are not connected, then delay the objects reset/send.
f9bf1ecc 670 */
f584de52
RZ
671 if (!fnc->connecting)
672 thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
673 &fnc->t_lspreset);
018e77bc 674
d35f447d
RZ
675 return 0;
676}
677
678/**
679 * Encode data plane operation context into netlink and enqueue it in the FPM
680 * output buffer.
681 *
682 * @param fnc the netlink FPM context.
683 * @param ctx the data plane operation context data.
684 * @return 0 on success or -1 on not enough space.
685 */
686static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
687{
688 uint8_t nl_buf[NL_PKT_BUF_SIZE];
689 size_t nl_buf_len;
690 ssize_t rv;
edfeff42 691 uint64_t obytes, obytes_peak;
b55ab92a
RZ
692 enum dplane_op_e op = dplane_ctx_get_op(ctx);
693
694 /*
695 * If we were configured to not use next hop groups, then quit as soon
696 * as possible.
697 */
698 if ((!fnc->use_nhg)
699 && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
700 || op == DPLANE_OP_NH_UPDATE))
701 return 0;
d35f447d
RZ
702
703 nl_buf_len = 0;
704
705 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
706
b55ab92a 707 switch (op) {
d35f447d
RZ
708 case DPLANE_OP_ROUTE_UPDATE:
709 case DPLANE_OP_ROUTE_DELETE:
0be6e7d7
JU
710 rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
711 nl_buf, sizeof(nl_buf),
712 true, fnc->use_nhg);
d35f447d 713 if (rv <= 0) {
0be6e7d7
JU
714 zlog_err(
715 "%s: netlink_route_multipath_msg_encode failed",
716 __func__);
d35f447d
RZ
717 return 0;
718 }
719
720 nl_buf_len = (size_t)rv;
d35f447d
RZ
721
722 /* UPDATE operations need a INSTALL, otherwise just quit. */
b55ab92a 723 if (op == DPLANE_OP_ROUTE_DELETE)
d35f447d
RZ
724 break;
725
726 /* FALL THROUGH */
727 case DPLANE_OP_ROUTE_INSTALL:
0be6e7d7 728 rv = netlink_route_multipath_msg_encode(
b55ab92a
RZ
729 RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
730 sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
d35f447d 731 if (rv <= 0) {
0be6e7d7
JU
732 zlog_err(
733 "%s: netlink_route_multipath_msg_encode failed",
734 __func__);
d35f447d
RZ
735 return 0;
736 }
737
738 nl_buf_len += (size_t)rv;
d35f447d
RZ
739 break;
740
bda10adf
RZ
741 case DPLANE_OP_MAC_INSTALL:
742 case DPLANE_OP_MAC_DELETE:
743 rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
744 if (rv <= 0) {
e5e444d8
RZ
745 zlog_err("%s: netlink_macfdb_update_ctx failed",
746 __func__);
bda10adf
RZ
747 return 0;
748 }
749
750 nl_buf_len = (size_t)rv;
bda10adf
RZ
751 break;
752
e9a1cd93 753 case DPLANE_OP_NH_DELETE:
0be6e7d7
JU
754 rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
755 sizeof(nl_buf));
e9a1cd93 756 if (rv <= 0) {
0be6e7d7
JU
757 zlog_err("%s: netlink_nexthop_msg_encode failed",
758 __func__);
e9a1cd93
RZ
759 return 0;
760 }
761
762 nl_buf_len = (size_t)rv;
763 break;
d35f447d
RZ
764 case DPLANE_OP_NH_INSTALL:
765 case DPLANE_OP_NH_UPDATE:
0be6e7d7
JU
766 rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
767 sizeof(nl_buf));
e9a1cd93 768 if (rv <= 0) {
0be6e7d7
JU
769 zlog_err("%s: netlink_nexthop_msg_encode failed",
770 __func__);
e9a1cd93
RZ
771 return 0;
772 }
773
774 nl_buf_len = (size_t)rv;
775 break;
776
d35f447d
RZ
777 case DPLANE_OP_LSP_INSTALL:
778 case DPLANE_OP_LSP_UPDATE:
779 case DPLANE_OP_LSP_DELETE:
b300c8bb
DE
780 rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
781 if (rv <= 0) {
f9bf1ecc
DE
782 zlog_err("%s: netlink_lsp_msg_encoder failed",
783 __func__);
b300c8bb
DE
784 return 0;
785 }
786
787 nl_buf_len += (size_t)rv;
788 break;
789
d35f447d
RZ
790 case DPLANE_OP_PW_INSTALL:
791 case DPLANE_OP_PW_UNINSTALL:
792 case DPLANE_OP_ADDR_INSTALL:
793 case DPLANE_OP_ADDR_UNINSTALL:
d35f447d
RZ
794 case DPLANE_OP_NEIGH_INSTALL:
795 case DPLANE_OP_NEIGH_UPDATE:
796 case DPLANE_OP_NEIGH_DELETE:
797 case DPLANE_OP_VTEP_ADD:
798 case DPLANE_OP_VTEP_DELETE:
799 case DPLANE_OP_SYS_ROUTE_ADD:
800 case DPLANE_OP_SYS_ROUTE_DELETE:
801 case DPLANE_OP_ROUTE_NOTIFY:
802 case DPLANE_OP_LSP_NOTIFY:
803 case DPLANE_OP_NONE:
804 break;
805
806 default:
e5e444d8
RZ
807 if (IS_ZEBRA_DEBUG_FPM)
808 zlog_debug("%s: unhandled data plane message (%d) %s",
809 __func__, dplane_ctx_get_op(ctx),
810 dplane_op2str(dplane_ctx_get_op(ctx)));
d35f447d
RZ
811 break;
812 }
813
814 /* Skip empty enqueues. */
815 if (nl_buf_len == 0)
816 return 0;
817
a179ba35
RZ
818 /* We must know if someday a message goes beyond 65KiB. */
819 assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);
820
821 /* Check if we have enough buffer space. */
822 if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
c871e6c9
RZ
823 atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
824 memory_order_relaxed);
e5e444d8
RZ
825
826 if (IS_ZEBRA_DEBUG_FPM)
827 zlog_debug(
828 "%s: buffer full: wants to write %zu but has %zu",
829 __func__, nl_buf_len + FPM_HEADER_SIZE,
830 STREAM_WRITEABLE(fnc->obuf));
831
a179ba35
RZ
832 return -1;
833 }
834
d35f447d 835 /*
a179ba35
RZ
836 * Fill in the FPM header information.
837 *
838 * See FPM_HEADER_SIZE definition for more information.
d35f447d
RZ
839 */
840 stream_putc(fnc->obuf, 1);
841 stream_putc(fnc->obuf, 1);
a179ba35 842 stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);
d35f447d
RZ
843
844 /* Write current data. */
845 stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
846
ad4d1022 847 /* Account number of bytes waiting to be written. */
c871e6c9
RZ
848 atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
849 nl_buf_len + FPM_HEADER_SIZE,
850 memory_order_relaxed);
edfeff42
RZ
851 obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
852 memory_order_relaxed);
853 obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
854 memory_order_relaxed);
855 if (obytes_peak < obytes)
c871e6c9
RZ
856 atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
857 memory_order_relaxed);
ad4d1022 858
d35f447d
RZ
859 /* Tell the thread to start writing. */
860 thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
861 &fnc->t_write);
862
863 return 0;
864}
865
f9bf1ecc
DE
866/*
867 * LSP walk/send functions
868 */
869struct fpm_lsp_arg {
870 struct zebra_dplane_ctx *ctx;
871 struct fpm_nl_ctx *fnc;
872 bool complete;
873};
874
875static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
876{
877 zebra_lsp_t *lsp = bucket->data;
878 struct fpm_lsp_arg *fla = arg;
879
880 /* Skip entries which have already been sent */
881 if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
882 return HASHWALK_CONTINUE;
883
884 dplane_ctx_reset(fla->ctx);
885 dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);
886
887 if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
888 fla->complete = false;
889 return HASHWALK_ABORT;
890 }
891
892 /* Mark entry as sent */
893 SET_FLAG(lsp->flags, LSP_FLAG_FPM);
894 return HASHWALK_CONTINUE;
895}
896
897static int fpm_lsp_send(struct thread *t)
898{
899 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
900 struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
901 struct fpm_lsp_arg fla;
902
903 fla.fnc = fnc;
904 fla.ctx = dplane_ctx_alloc();
905 fla.complete = true;
906
907 hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);
908
909 dplane_ctx_fini(&fla.ctx);
910
911 if (fla.complete) {
912 WALK_FINISH(fnc, FNE_LSP_FINISHED);
913
914 /* Now move onto routes */
1f9193c1
RZ
915 thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
916 &fnc->t_nhgreset);
f9bf1ecc
DE
917 } else {
918 /* Didn't finish - reschedule LSP walk */
919 thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
920 &fnc->t_lspwalk);
921 }
922
923 return 0;
924}
925
981ca597
RZ
926/*
927 * Next hop walk/send functions.
928 */
929struct fpm_nhg_arg {
930 struct zebra_dplane_ctx *ctx;
931 struct fpm_nl_ctx *fnc;
932 bool complete;
933};
934
935static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
936{
937 struct nhg_hash_entry *nhe = bucket->data;
938 struct fpm_nhg_arg *fna = arg;
939
940 /* This entry was already sent, skip it. */
941 if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
942 return HASHWALK_CONTINUE;
943
944 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
945 dplane_ctx_reset(fna->ctx);
946 dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
947 if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
948 /* Our buffers are full, lets give it some cycles. */
949 fna->complete = false;
950 return HASHWALK_ABORT;
951 }
952
953 /* Mark group as sent, so it doesn't get sent again. */
954 SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
955
956 return HASHWALK_CONTINUE;
957}
958
959static int fpm_nhg_send(struct thread *t)
960{
961 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
962 struct fpm_nhg_arg fna;
963
964 fna.fnc = fnc;
965 fna.ctx = dplane_ctx_alloc();
966 fna.complete = true;
967
968 /* Send next hops. */
1f9193c1
RZ
969 if (fnc->use_nhg)
970 hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);
981ca597
RZ
971
972 /* `free()` allocated memory. */
973 dplane_ctx_fini(&fna.ctx);
974
975 /* We are done sending next hops, lets install the routes now. */
55eb9d4d
RZ
976 if (fna.complete) {
977 WALK_FINISH(fnc, FNE_NHG_FINISHED);
e41e0f81
RZ
978 thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
979 &fnc->t_ribreset);
55eb9d4d 980 } else /* Otherwise reschedule next hop group again. */
981ca597
RZ
981 thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
982 &fnc->t_nhgwalk);
983
984 return 0;
985}
986
018e77bc
RZ
987/**
988 * Send all RIB installed routes to the connected data plane.
989 */
990static int fpm_rib_send(struct thread *t)
991{
992 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
993 rib_dest_t *dest;
994 struct route_node *rn;
995 struct route_table *rt;
996 struct zebra_dplane_ctx *ctx;
997 rib_tables_iter_t rt_iter;
998
999 /* Allocate temporary context for all transactions. */
1000 ctx = dplane_ctx_alloc();
1001
1002 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1003 while ((rt = rib_tables_iter_next(&rt_iter))) {
1004 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1005 dest = rib_dest_from_rnode(rn);
1006 /* Skip bad route entries. */
a50404aa 1007 if (dest == NULL || dest->selected_fib == NULL)
018e77bc 1008 continue;
018e77bc
RZ
1009
1010 /* Check for already sent routes. */
a50404aa 1011 if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
018e77bc 1012 continue;
018e77bc
RZ
1013
1014 /* Enqueue route install. */
1015 dplane_ctx_reset(ctx);
1016 dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
1017 dest->selected_fib);
1018 if (fpm_nl_enqueue(fnc, ctx) == -1) {
1019 /* Free the temporary allocated context. */
1020 dplane_ctx_fini(&ctx);
1021
018e77bc
RZ
1022 thread_add_timer(zrouter.master, fpm_rib_send,
1023 fnc, 1, &fnc->t_ribwalk);
1024 return 0;
1025 }
1026
1027 /* Mark as sent. */
1028 SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1029 }
1030 }
1031
1032 /* Free the temporary allocated context. */
1033 dplane_ctx_fini(&ctx);
1034
1035 /* All RIB routes sent! */
55eb9d4d 1036 WALK_FINISH(fnc, FNE_RIB_FINISHED);
018e77bc 1037
e41e0f81
RZ
1038 /* Schedule next event: RMAC reset. */
1039 thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
1040 &fnc->t_rmacreset);
1041
018e77bc
RZ
1042 return 0;
1043}
1044
bda10adf
RZ
1045/*
1046 * The next three functions will handle RMAC enqueue.
1047 */
1048struct fpm_rmac_arg {
1049 struct zebra_dplane_ctx *ctx;
1050 struct fpm_nl_ctx *fnc;
05843a27 1051 struct zebra_l3vni *zl3vni;
55eb9d4d 1052 bool complete;
bda10adf
RZ
1053};
1054
1ac88792 1055static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
bda10adf
RZ
1056{
1057 struct fpm_rmac_arg *fra = arg;
3198b2b3 1058 struct zebra_mac *zrmac = bucket->data;
bda10adf
RZ
1059 struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
1060 const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
1061 struct zebra_if *br_zif;
1062 vlanid_t vid;
1063 bool sticky;
1064
1065 /* Entry already sent. */
55eb9d4d 1066 if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
bda10adf
RZ
1067 return;
1068
1069 sticky = !!CHECK_FLAG(zrmac->flags,
1070 (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
1071 br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
1072 vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;
1073
1074 dplane_ctx_reset(fra->ctx);
1075 dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
1076 dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
f2a0ba3a 1077 zif->brslave_info.br_if, vid,
f188e68e
AK
1078 &zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
1079 0 /*nhg*/, 0 /*update_flags*/);
bda10adf 1080 if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
bda10adf
RZ
1081 thread_add_timer(zrouter.master, fpm_rmac_send,
1082 fra->fnc, 1, &fra->fnc->t_rmacwalk);
55eb9d4d 1083 fra->complete = false;
bda10adf
RZ
1084 }
1085}
1086
1ac88792 1087static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
bda10adf
RZ
1088{
1089 struct fpm_rmac_arg *fra = arg;
05843a27 1090 struct zebra_l3vni *zl3vni = bucket->data;
bda10adf
RZ
1091
1092 fra->zl3vni = zl3vni;
1093 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1094}
1095
1096static int fpm_rmac_send(struct thread *t)
1097{
1098 struct fpm_rmac_arg fra;
1099
1100 fra.fnc = THREAD_ARG(t);
1101 fra.ctx = dplane_ctx_alloc();
55eb9d4d 1102 fra.complete = true;
bda10adf
RZ
1103 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1104 dplane_ctx_fini(&fra.ctx);
1105
55eb9d4d
RZ
1106 /* RMAC walk completed. */
1107 if (fra.complete)
1108 WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
1109
bda10adf
RZ
1110 return 0;
1111}
1112
981ca597
RZ
1113/*
1114 * Resets the next hop FPM flags so we send all next hops again.
1115 */
1116static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1117{
1118 struct nhg_hash_entry *nhe = bucket->data;
1119
1120 /* Unset FPM installation flag so it gets installed again. */
1121 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1122}
1123
1124static int fpm_nhg_reset(struct thread *t)
1125{
55eb9d4d
RZ
1126 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1127
981ca597 1128 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
e41e0f81
RZ
1129
1130 /* Schedule next step: send next hop groups. */
1131 thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
1132
981ca597
RZ
1133 return 0;
1134}
1135
f9bf1ecc
DE
1136/*
1137 * Resets the LSP FPM flag so we send all LSPs again.
1138 */
1139static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
1140{
1141 zebra_lsp_t *lsp = bucket->data;
1142
1143 UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
1144}
1145
1146static int fpm_lsp_reset(struct thread *t)
1147{
1148 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1149 struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
1150
1151 hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);
1152
1153 /* Schedule next step: send LSPs */
1154 thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
1155
1156 return 0;
1157}
1158
018e77bc
RZ
1159/**
1160 * Resets the RIB FPM flags so we send all routes again.
1161 */
1162static int fpm_rib_reset(struct thread *t)
1163{
1164 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1165 rib_dest_t *dest;
1166 struct route_node *rn;
1167 struct route_table *rt;
1168 rib_tables_iter_t rt_iter;
1169
018e77bc
RZ
1170 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1171 while ((rt = rib_tables_iter_next(&rt_iter))) {
1172 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1173 dest = rib_dest_from_rnode(rn);
1174 /* Skip bad route entries. */
1175 if (dest == NULL)
1176 continue;
1177
1178 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1179 }
1180 }
1181
e41e0f81
RZ
1182 /* Schedule next step: send RIB routes. */
1183 thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
1184
018e77bc
RZ
1185 return 0;
1186}
1187
bda10adf
RZ
1188/*
1189 * The next three function will handle RMAC table reset.
1190 */
1ac88792 1191static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
bda10adf 1192{
3198b2b3 1193 struct zebra_mac *zrmac = bucket->data;
bda10adf
RZ
1194
1195 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1196}
1197
1ac88792 1198static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
bda10adf 1199{
05843a27 1200 struct zebra_l3vni *zl3vni = bucket->data;
bda10adf
RZ
1201
1202 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1203}
1204
1205static int fpm_rmac_reset(struct thread *t)
1206{
55eb9d4d
RZ
1207 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1208
bda10adf
RZ
1209 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1210
e41e0f81
RZ
1211 /* Schedule next event: send RMAC entries. */
1212 thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
1213 &fnc->t_rmacwalk);
1214
bda10adf
RZ
1215 return 0;
1216}
1217
ba803a2f
RZ
/*
 * Drain the provider context queue into the FPM output buffer.
 * Scheduled on the FPM pthread master (see t_dequeue users); stops as
 * soon as the output stream no longer has room for a full packet.
 */
static int fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;
	bool no_bufs = false;
	uint64_t processed_contexts = 0;

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
			no_bufs = true;
			break;
		}

		/* Dequeue next item or quit processing. */
		frr_with_mutex (&fnc->ctxqueue_mutex) {
			ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		}
		if (ctx == NULL)
			break;

		/*
		 * Intentionally ignoring the return value
		 * as that we are ensuring that we can write to
		 * the output data in the STREAM_WRITEABLE
		 * check above, so we can ignore the return
		 */
		(void)fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		processed_contexts++;
		/* Queue-length counter is shared with fpm_nl_process(). */
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Hand the finished context back to the dataplane. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Update count of processed contexts */
	atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
				  processed_contexts, memory_order_relaxed);

	/* Re-schedule if we ran out of buffer space */
	if (no_bufs)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/*
	 * Let the dataplane thread know if there are items in the
	 * output queue to be processed. Otherwise they may sit
	 * until the dataplane thread gets scheduled for new,
	 * unrelated work.
	 */
	if (dplane_provider_out_ctx_queue_len(fnc->prov) > 0)
		dplane_provider_work_ready();

	return 0;
}
1276
3bdd7fca
RZ
1277/**
1278 * Handles external (e.g. CLI, data plane or others) events.
1279 */
1280static int fpm_process_event(struct thread *t)
1281{
1282 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1283 int event = THREAD_VAL(t);
1284
1285 switch (event) {
1286 case FNE_DISABLE:
e5e444d8 1287 zlog_info("%s: manual FPM disable event", __func__);
3bdd7fca 1288 fnc->disabled = true;
c871e6c9
RZ
1289 atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
1290 memory_order_relaxed);
3bdd7fca
RZ
1291
1292 /* Call reconnect to disable timers and clean up context. */
1293 fpm_reconnect(fnc);
1294 break;
1295
1296 case FNE_RECONNECT:
e5e444d8 1297 zlog_info("%s: manual FPM reconnect event", __func__);
3bdd7fca 1298 fnc->disabled = false;
c871e6c9
RZ
1299 atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
1300 memory_order_relaxed);
3bdd7fca
RZ
1301 fpm_reconnect(fnc);
1302 break;
1303
6cc059cd 1304 case FNE_RESET_COUNTERS:
e5e444d8 1305 zlog_info("%s: manual FPM counters reset event", __func__);
6cc059cd
RZ
1306 memset(&fnc->counters, 0, sizeof(fnc->counters));
1307 break;
1308
b55ab92a
RZ
1309 case FNE_TOGGLE_NHG:
1310 zlog_info("%s: toggle next hop groups support", __func__);
1311 fnc->use_nhg = !fnc->use_nhg;
1312 fpm_reconnect(fnc);
1313 break;
1314
a2032324
RZ
1315 case FNE_INTERNAL_RECONNECT:
1316 fpm_reconnect(fnc);
1317 break;
1318
55eb9d4d
RZ
1319 case FNE_NHG_FINISHED:
1320 if (IS_ZEBRA_DEBUG_FPM)
1321 zlog_debug("%s: next hop groups walk finished",
1322 __func__);
55eb9d4d
RZ
1323 break;
1324 case FNE_RIB_FINISHED:
1325 if (IS_ZEBRA_DEBUG_FPM)
1326 zlog_debug("%s: RIB walk finished", __func__);
55eb9d4d
RZ
1327 break;
1328 case FNE_RMAC_FINISHED:
1329 if (IS_ZEBRA_DEBUG_FPM)
1330 zlog_debug("%s: RMAC walk finished", __func__);
55eb9d4d 1331 break;
f9bf1ecc
DE
1332 case FNE_LSP_FINISHED:
1333 if (IS_ZEBRA_DEBUG_FPM)
1334 zlog_debug("%s: LSP walk finished", __func__);
1335 break;
55eb9d4d 1336
3bdd7fca 1337 default:
e5e444d8
RZ
1338 if (IS_ZEBRA_DEBUG_FPM)
1339 zlog_debug("%s: unhandled event %d", __func__, event);
3bdd7fca
RZ
1340 break;
1341 }
1342
1343 return 0;
1344}
1345
d35f447d
RZ
1346/*
1347 * Data plane functions.
1348 */
1349static int fpm_nl_start(struct zebra_dplane_provider *prov)
1350{
1351 struct fpm_nl_ctx *fnc;
1352
1353 fnc = dplane_provider_get_data(prov);
1354 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1355 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1356 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1357 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1358 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1359 fnc->socket = -1;
3bdd7fca 1360 fnc->disabled = true;
ba803a2f
RZ
1361 fnc->prov = prov;
1362 TAILQ_INIT(&fnc->ctxqueue);
1363 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
d35f447d 1364
b55ab92a
RZ
1365 /* Set default values. */
1366 fnc->use_nhg = true;
1367
d35f447d
RZ
1368 return 0;
1369}
1370
98a87504 1371static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
d35f447d 1372{
98a87504 1373 /* Disable all events and close socket. */
f9bf1ecc
DE
1374 THREAD_OFF(fnc->t_lspreset);
1375 THREAD_OFF(fnc->t_lspwalk);
981ca597
RZ
1376 THREAD_OFF(fnc->t_nhgreset);
1377 THREAD_OFF(fnc->t_nhgwalk);
98a87504
RZ
1378 THREAD_OFF(fnc->t_ribreset);
1379 THREAD_OFF(fnc->t_ribwalk);
1380 THREAD_OFF(fnc->t_rmacreset);
1381 THREAD_OFF(fnc->t_rmacwalk);
1382 thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
1383 thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
1384 thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);
d35f447d 1385
98a87504
RZ
1386 if (fnc->socket != -1) {
1387 close(fnc->socket);
1388 fnc->socket = -1;
1389 }
1390
1391 return 0;
1392}
1393
/*
 * Late shutdown stage: stop the FPM pthread and release everything
 * allocated by fpm_nl_start()/fpm_nl_new().
 */
static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	/* fnc is the global instance registered in fpm_nl_new(). */
	free(gfnc);
	gfnc = NULL;

	return 0;
}
1409
98a87504
RZ
1410static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1411{
1412 struct fpm_nl_ctx *fnc;
1413
1414 fnc = dplane_provider_get_data(prov);
1415 if (early)
1416 return fpm_nl_finish_early(fnc);
1417
1418 return fpm_nl_finish_late(fnc);
1419}
1420
d35f447d
RZ
/*
 * Provider processing callback: drains up to the provider work limit of
 * contexts. While connected, contexts are queued for the FPM pthread;
 * otherwise they are passed straight through as successful.
 */
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			/*
			 * Update the number of queued contexts *before*
			 * enqueueing, to ensure counter consistency.
			 */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);

			frr_with_mutex (&fnc->ctxqueue_mutex) {
				dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
			}

			/* Track the largest queue depth seen this pass. */
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			if (peak_queue < cur_queue)
				peak_queue = cur_queue;
			continue;
		}

		/* Not connected: complete the context immediately. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Update peak queue length, if we just observed a new peak */
	stored_peak_queue = atomic_load_explicit(
		&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
	if (stored_peak_queue < peak_queue)
		atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
				      peak_queue, memory_order_relaxed);

	/* Schedule the FPM pthread to drain anything still queued. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/* Ensure dataplane thread is rescheduled if we hit the work limit */
	if (counter >= limit)
		dplane_provider_work_ready();

	return 0;
}
1482
/*
 * Module bootstrap (run at frr_late_init): allocates the global FPM
 * context, registers the threaded dataplane provider and installs the
 * CLI node and commands.
 */
static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	/* Global context, handed to the provider as its data pointer. */
	gfnc = calloc(1, sizeof(*gfnc));
	/* NOTE(review): calloc result is unchecked — confirm OOM policy. */
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}
1508
/* Module entry point: defer provider setup until FRR is fully up. */
static int fpm_nl_init(void)
{
	hook_register(frr_late_init, fpm_nl_new);
	return 0;
}
1514
/* FRR module descriptor for the netlink-based FPM dataplane plugin. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
);