/*
 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
 *
 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
 * Rafael Zalamena
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#ifdef HAVE_CONFIG_H
#include "config.h" /* Include this explicitly */
#endif

#include <arpa/inet.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <errno.h>
#include <string.h>

#include "lib/zebra.h"
#include "lib/json.h"
#include "lib/libfrr.h"
#include "lib/frratomic.h"
#include "lib/command.h"
#include "lib/memory.h"
#include "lib/network.h"
#include "lib/ns.h"
#include "lib/frr_pthread.h"
#include "zebra/debug.h"
#include "zebra/interface.h"
#include "zebra/zebra_dplane.h"
#include "zebra/zebra_mpls.h"
#include "zebra/zebra_router.h"
#include "zebra/zebra_evpn.h"
#include "zebra/zebra_evpn_mac.h"
#include "zebra/zebra_vxlan_private.h"
#include "zebra/kernel_netlink.h"
#include "zebra/rt_netlink.h"
#include "zebra/debug.h"

#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
#define SOUTHBOUND_DEFAULT_PORT 2620

/**
 * FPM header:
 * {
 *   version: 1 byte (always 1),
 *   type: 1 byte (1 for netlink, 2 protobuf),
 *   len: 2 bytes (network order),
 * }
 *
 * This header is used with any format to tell the users how many bytes to
 * expect.
 */
#define FPM_HEADER_SIZE 4

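/*
 * Illustrative framing example (not part of the original source): a 100
 * byte netlink message is preceded by the 4 byte header
 *   0x01 (version), 0x01 (netlink), 0x00 0x68 (length 104, header included)
 * before being written to the FPM socket; see fpm_nl_enqueue() below.
 */
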
static const char *prov_name = "dplane_fpm_nl";

struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;
	bool disabled;
	bool connecting;
	bool use_nhg;
	struct sockaddr_storage addr;

	/* data plane buffers. */
	struct stream *ibuf;
	struct stream *obuf;
	pthread_mutex_t obuf_mutex;

	/*
	 * data plane context queue:
	 * When an FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_q ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events. */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_dequeue;

	/* zebra events. */
	struct thread *t_lspreset;
	struct thread *t_lspwalk;
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters. */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane contexts processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;

enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};

#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 (ev), NULL)

/*
 * Prototypes.
 */
static int fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static int fpm_lsp_send(struct thread *t);
static int fpm_lsp_reset(struct thread *t);
static int fpm_nhg_send(struct thread *t);
static int fpm_nhg_reset(struct thread *t);
static int fpm_rib_send(struct thread *t);
static int fpm_rib_reset(struct thread *t);
static int fpm_rmac_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);

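/*
 * Note on the walk/reset handlers declared above: after a (re)connect the
 * plugin clears the per-object "already sent" flags and replays the whole
 * state to the FPM server, with each completed step scheduling the next:
 * LSPs -> next hop groups -> RIB routes -> EVPN RMACs.
 */
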
/*
 * CLI.
 */
#define FPM_STR "Forwarding Plane Manager configuration\n"

DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	uint16_t port = 0;
	uint8_t naddr[INET6_BUFSIZ];

	if (argc == 5)
		port = strtol(argv[4]->arg, NULL, 10);

	/* Handle IPv4 addresses. */
	if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
		sin = (struct sockaddr_in *)&gfnc->addr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = AF_INET;
		sin->sin_port =
			port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
		sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
		memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

		goto ask_reconnect;
	}

	/* Handle IPv6 addresses. */
	if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
		vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
		return CMD_WARNING;
	}

	sin6 = (struct sockaddr_in6 *)&gfnc->addr;
	memset(sin6, 0, sizeof(*sin6));
	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
	sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
	memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RECONNECT, &gfnc->t_event);
	return CMD_SUCCESS;
}

DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_DISABLE, &gfnc->t_event);
	return CMD_SUCCESS;
}

DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
      "fpm use-next-hop-groups",
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already enabled. */
	if (gfnc->use_nhg)
		return CMD_SUCCESS;

	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_event);

	return CMD_SUCCESS;
}

DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
      "no fpm use-next-hop-groups",
      NO_STR
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already disabled. */
	if (!gfnc->use_nhg)
		return CMD_SUCCESS;

	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_event);

	return CMD_SUCCESS;
}

DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}

DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
	vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

#define SHOW_COUNTER(label, counter)                                           \
	vty_out(vty, "%28s: %u\n", (label), (counter))

	SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
	SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
	SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
	SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
	SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
	SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
	SHOW_COUNTER("Data plane items processed",
		     gfnc->counters.dplane_contexts);
	SHOW_COUNTER("Data plane items enqueued",
		     gfnc->counters.ctxqueue_len);
	SHOW_COUNTER("Data plane items queue peak",
		     gfnc->counters.ctxqueue_len_peak);
	SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
	SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
	SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

	return CMD_SUCCESS;
}

DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
	json_object_free(jo);

	return CMD_SUCCESS;
}

static int fpm_write_config(struct vty *vty)
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	int written = 0;

	if (gfnc->disabled)
		return written;

	switch (gfnc->addr.ss_family) {
	case AF_INET:
		written = 1;
		sin = (struct sockaddr_in *)&gfnc->addr;
		vty_out(vty, "fpm address %pI4", &sin->sin_addr);
		if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
			vty_out(vty, " port %d", ntohs(sin->sin_port));

		vty_out(vty, "\n");
		break;
	case AF_INET6:
		written = 1;
		sin6 = (struct sockaddr_in6 *)&gfnc->addr;
		vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
		if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
			vty_out(vty, " port %d", ntohs(sin6->sin6_port));

		vty_out(vty, "\n");
		break;

	default:
		break;
	}

	if (!gfnc->use_nhg) {
		vty_out(vty, "no fpm use-next-hop-groups\n");
		written = 1;
	}

	return written;
}

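/*
 * Illustrative example (not part of the original source): with a remote
 * collector configured and next hop groups disabled, fpm_write_config()
 * above would emit something like:
 *
 *   fpm address 10.0.0.1 port 2621
 *   no fpm use-next-hop-groups
 *
 * The "port" suffix is omitted when the default port (2620) is in use.
 */
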
static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	.config_write = fpm_write_config,
};

/*
 * FPM functions.
 */
static int fpm_connect(struct thread *t);

static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}

static int fpm_read(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	ssize_t rv;

	/* Let's ignore the input at the moment. */
	rv = stream_read_try(fnc->ibuf, fnc->socket,
			     STREAM_WRITEABLE(fnc->ibuf));
	/* We've got an interruption. */
	if (rv == -2) {
		/* Schedule next read. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
		return 0;
	}
	if (rv == 0) {
		atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: connection closed", __func__);

		FPM_RECONNECT(fnc);
		return 0;
	}
	if (rv == -1) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		zlog_warn("%s: connection failure: %s", __func__,
			  strerror(errno));
		FPM_RECONNECT(fnc);
		return 0;
	}
	stream_reset(fnc->ibuf);

	/* Account all bytes read. */
	atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
				  memory_order_relaxed);

	thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
			&fnc->t_read);

	return 0;
}

static int fpm_write(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	socklen_t statuslen;
	ssize_t bwritten;
	int rv, status;
	size_t btotal;

	if (fnc->connecting == true) {
		status = 0;
		statuslen = sizeof(status);

		rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
				&statuslen);
		if (rv == -1 || status != 0) {
			if (rv != -1)
				zlog_warn("%s: connection failed: %s", __func__,
					  strerror(status));
			else
				zlog_warn("%s: SO_ERROR failed: %s", __func__,
					  strerror(status));

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);

			FPM_RECONNECT(fnc);
			return 0;
		}

		fnc->connecting = false;

		/* Permit receiving messages now. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
	}

	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	while (true) {
		/* Stream is empty: reset pointers and return. */
		if (STREAM_READABLE(fnc->obuf) == 0) {
			stream_reset(fnc->obuf);
			break;
		}

		/* Try to write all at once. */
		btotal = stream_get_endp(fnc->obuf) -
			stream_get_getp(fnc->obuf);
		bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
		if (bwritten == 0) {
			atomic_fetch_add_explicit(
				&fnc->counters.connection_closes, 1,
				memory_order_relaxed);

			if (IS_ZEBRA_DEBUG_FPM)
				zlog_debug("%s: connection closed", __func__);
			break;
		}
		if (bwritten == -1) {
			/* Attempt to continue if blocked by a signal. */
			if (errno == EINTR)
				continue;
			/* Receiver is probably slow, let's give it some time. */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);
			zlog_warn("%s: connection failure: %s", __func__,
				  strerror(errno));

			FPM_RECONNECT(fnc);
			return 0;
		}

		/* Account all bytes sent. */
		atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
					  memory_order_relaxed);

		/* Account number of bytes free. */
		atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
					  memory_order_relaxed);

		stream_forward_getp(fnc->obuf, (size_t)bwritten);
	}

	/* Stream is not empty yet, we must schedule more writes. */
	if (STREAM_READABLE(fnc->obuf)) {
		stream_pulldown(fnc->obuf);
		thread_add_write(fnc->fthread->master, fpm_write, fnc,
				 fnc->socket, &fnc->t_write);
		return 0;
	}

	return 0;
}

static int fpm_connect(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
	socklen_t slen;
	int rv, sock;
	char addrstr[INET6_ADDRSTRLEN];

	sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
	if (sock == -1) {
		zlog_err("%s: fpm socket failed: %s", __func__,
			 strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return 0;
	}

	set_nonblocking(sock);

	if (fnc->addr.ss_family == AF_INET) {
		inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin);
	} else {
		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin6);
	}

	if (IS_ZEBRA_DEBUG_FPM)
		zlog_debug("%s: attempting to connect to %s:%d", __func__,
			   addrstr, ntohs(sin->sin_port));

	rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
	if (rv == -1 && errno != EINPROGRESS) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		close(sock);
		zlog_warn("%s: fpm connection failed: %s", __func__,
			  strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return 0;
	}

	fnc->connecting = (errno == EINPROGRESS);
	fnc->socket = sock;
	if (!fnc->connecting)
		thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
				&fnc->t_read);
	thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
			 &fnc->t_write);

	/*
	 * Starting with LSPs walk all FPM objects, marking them
	 * as unsent and then replaying them.
	 */
	thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
			 &fnc->t_lspreset);

	return 0;
}

/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need an INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
		rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_lsp_msg_encoder failed",
				 __func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_NONE:
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled data plane message (%d) %s",
				   __func__, dplane_ctx_get_op(ctx),
				   dplane_op2str(dplane_ctx_get_op(ctx)));
		break;
	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}

/*
 * LSP walk/send functions
 */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	bool complete;
};

static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
{
	zebra_lsp_t *lsp = bucket->data;
	struct fpm_lsp_arg *fla = arg;

	/* Skip entries which have already been sent */
	if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
		return HASHWALK_CONTINUE;

	dplane_ctx_reset(fla->ctx);
	dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);

	if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
		fla->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark entry as sent */
	SET_FLAG(lsp->flags, LSP_FLAG_FPM);
	return HASHWALK_CONTINUE;
}

static int fpm_lsp_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
	struct fpm_lsp_arg fla;

	fla.fnc = fnc;
	fla.ctx = dplane_ctx_alloc();
	fla.complete = true;

	hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);

	dplane_ctx_fini(&fla.ctx);

	if (fla.complete) {
		WALK_FINISH(fnc, FNE_LSP_FINISHED);

		/* Now move onto routes */
		thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
				 &fnc->t_nhgreset);
	} else {
		/* Didn't finish - reschedule LSP walk */
		thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
				 &fnc->t_lspwalk);
	}

	return 0;
}

/*
 * Next hop walk/send functions.
 */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	bool complete;
};

static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;
	struct fpm_nhg_arg *fna = arg;

	/* This entry was already sent, skip it. */
	if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
		return HASHWALK_CONTINUE;

	/* Reset ctx to reuse allocated memory, take a snapshot and send it. */
	dplane_ctx_reset(fna->ctx);
	dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
	if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
		/* Our buffers are full, let's give it some cycles. */
		fna->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark group as sent, so it doesn't get sent again. */
	SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);

	return HASHWALK_CONTINUE;
}

static int fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	if (fnc->use_nhg)
		hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, let's install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
				 &fnc->t_ribreset);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);

	return 0;
}

/**
 * Send all RIB installed routes to the connected data plane.
 */
static int fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return 0;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	/* Schedule next event: RMAC reset. */
	thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);

	return 0;
}

/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	zebra_l3vni_t *zl3vni;
	bool complete;
};

static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	zebra_mac_t *zrmac = bucket->data;
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/* Entry already sent. */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
		return;

	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid,
			&zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
			0 /*nhg*/, 0 /*update_flags*/);
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
		fra->complete = false;
	}
}

static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	zebra_l3vni_t *zl3vni = bucket->data;

	fra->zl3vni = zl3vni;
	/* Pass the walk argument, not the L3 VNI, to the RMAC callback. */
	hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, fra);
}

static int fpm_rmac_send(struct thread *t)
{
	struct fpm_rmac_arg fra;

	fra.fnc = THREAD_ARG(t);
	fra.ctx = dplane_ctx_alloc();
	fra.complete = true;
	hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
	dplane_ctx_fini(&fra.ctx);

	/* RMAC walk completed. */
	if (fra.complete)
		WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);

	return 0;
}

/*
 * Resets the next hop FPM flags so we send all next hops again.
 */
static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;

	/* Unset FPM installation flag so it gets installed again. */
	UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
}

static int fpm_nhg_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);

	hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);

	/* Schedule next step: send next hop groups. */
	thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);

	return 0;
}

/*
 * Resets the LSP FPM flag so we send all LSPs again.
 */
static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
{
	zebra_lsp_t *lsp = bucket->data;

	UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
}

static int fpm_lsp_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);

	hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);

	/* Schedule next step: send LSPs */
	thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);

	return 0;
}

/**
 * Resets the RIB FPM flags so we send all routes again.
 */
static int fpm_rib_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	rib_tables_iter_t rt_iter;

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL)
				continue;

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Schedule next step: send RIB routes. */
	thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);

	return 0;
}

/*
 * The next three functions will handle RMAC table reset.
 */
static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
{
	zebra_mac_t *zrmac = bucket->data;

	UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
}

static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
{
	zebra_l3vni_t *zl3vni = bucket->data;

	hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
}

static int fpm_rmac_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);

	hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);

	/* Schedule next event: send RMAC entries. */
	thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
			 &fnc->t_rmacwalk);

	return 0;
}

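/*
 * Queue draining: fpm_nl_process() (running in the data plane provider
 * context) enqueues contexts into ctxqueue under ctxqueue_mutex, while
 * fpm_process_queue() below runs on the dedicated FPM pthread and drains
 * the queue into the output buffer whenever there is room for another
 * message.
 */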
static int fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;

	frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
			break;

		/* Dequeue next item or quit processing. */
		ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		if (ctx == NULL)
			break;

		fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
					  memory_order_relaxed);
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Check for more items in the queue. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	return 0;
}

/**
 * Handles external (e.g. CLI, data plane or others) events.
 */
static int fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	int event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);
		break;
	case FNE_LSP_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: LSP walk finished", __func__);
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled event %d", __func__, event);
		break;
	}

	return 0;
}

/*
 * Data plane functions.
 */
static int fpm_nl_start(struct zebra_dplane_provider *prov)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
	assert(frr_pthread_run(fnc->fthread, NULL) == 0);
	fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
	fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
	pthread_mutex_init(&fnc->obuf_mutex, NULL);
	fnc->socket = -1;
	fnc->disabled = true;
	fnc->prov = prov;
	TAILQ_INIT(&fnc->ctxqueue);
	pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);

	/* Set default values. */
	fnc->use_nhg = true;

	return 0;
}

static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_lspreset);
	THREAD_OFF(fnc->t_lspwalk);
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}

static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	free(gfnc);
	gfnc = NULL;

	return 0;
}

static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	if (early)
		return fpm_nl_finish_early(fnc);

	return fpm_nl_finish_late(fnc);
}

static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
			dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);

			/* Account the number of contexts. */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			peak_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len_peak,
				memory_order_relaxed);
			/* Store the new peak (the current queue length). */
			if (peak_queue < cur_queue)
				atomic_store_explicit(
					&fnc->counters.ctxqueue_len_peak,
					cur_queue, memory_order_relaxed);
			continue;
		}

		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	return 0;
}

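/*
 * Module bootstrap: fpm_nl_new() below runs from the frr_late_init hook,
 * registers the provider and installs the CLI commands. As with other
 * optional zebra modules, this plugin is expected to be loaded explicitly
 * (typically by starting zebra with `-M dplane_fpm_nl`).
 */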
static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	gfnc = calloc(1, sizeof(*gfnc));
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}

static int fpm_nl_init(void)
{
	hook_register(frr_late_init, fpm_nl_new);
	return 0;
}

FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
	)