]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
zebra: dplane API to get provider output q length
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
7309092b
DL
22#ifdef HAVE_CONFIG_H
23#include "config.h" /* Include this explicitly */
24#endif
25
d35f447d
RZ
26#include <arpa/inet.h>
27
28#include <sys/types.h>
29#include <sys/socket.h>
30
31#include <errno.h>
32#include <string.h>
33
d35f447d 34#include "lib/zebra.h"
6cc059cd 35#include "lib/json.h"
d35f447d 36#include "lib/libfrr.h"
c871e6c9 37#include "lib/frratomic.h"
3bdd7fca 38#include "lib/command.h"
d35f447d
RZ
39#include "lib/memory.h"
40#include "lib/network.h"
41#include "lib/ns.h"
42#include "lib/frr_pthread.h"
e5e444d8 43#include "zebra/debug.h"
bda10adf 44#include "zebra/interface.h"
d35f447d 45#include "zebra/zebra_dplane.h"
b300c8bb 46#include "zebra/zebra_mpls.h"
018e77bc 47#include "zebra/zebra_router.h"
b2998086
PR
48#include "zebra/zebra_evpn.h"
49#include "zebra/zebra_evpn_mac.h"
bda10adf 50#include "zebra/zebra_vxlan_private.h"
d35f447d
RZ
51#include "zebra/kernel_netlink.h"
52#include "zebra/rt_netlink.h"
53#include "zebra/debug.h"
54
55#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
56#define SOUTHBOUND_DEFAULT_PORT 2620
57
a179ba35
RZ
58/**
59 * FPM header:
60 * {
61 * version: 1 byte (always 1),
62 * type: 1 byte (1 for netlink, 2 protobuf),
63 * len: 2 bytes (network order),
64 * }
65 *
66 * This header is used with any format to tell the users how many bytes to
67 * expect.
68 */
69#define FPM_HEADER_SIZE 4
70
d35f447d
RZ
/* Provider name used when registering with the zebra dataplane framework. */
static const char *prov_name = "dplane_fpm_nl";

/*
 * Per-connection FPM context. A single instance (`gfnc`) exists; it is
 * shared between the zebra main pthread and the dedicated FPM I/O pthread.
 */
struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;
	/* User disabled FPM via "no fpm address". */
	bool disabled;
	/* Non-blocking connect() is still in progress. */
	bool connecting;
	/* Full-table walk completion flags (set when each replay finishes). */
	bool nhg_complete;
	bool rib_complete;
	bool rmac_complete;
	/* Encode next hop group (RTM_*NEXTHOP) messages when set. */
	bool use_nhg;
	/* Remote FPM server address (IPv4 or IPv6). */
	struct sockaddr_storage addr;

	/* data plane buffers. */
	struct stream *ibuf;
	struct stream *obuf;
	/* Protects obuf: filled by zebra-side enqueue, drained by FPM thread. */
	pthread_mutex_t obuf_mutex;

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_q ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events (run on the FPM pthread `fthread`). */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_dequeue;

	/* zebra events (scheduled on zrouter.master, not the FPM pthread). */
	struct thread *t_lspreset;
	struct thread *t_lspwalk;
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters (atomic: read/written from both pthreads). */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;
3bdd7fca
RZ
148
/* Events dispatched to fpm_process_event() on the FPM pthread. */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
170
a2032324
RZ
/*
 * Request an internal reconnect. Only enqueues an event on the FPM
 * pthread, so it is safe to invoke from either pthread.
 */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/* Notify the FPM pthread that a table walk finished (ev selects which). */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 (ev), NULL)
018e77bc
RZ
/*
 * Prototypes.
 */
static int fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
/* Replay walks: each *_reset marks entries unsent, each *_send replays them. */
static int fpm_lsp_send(struct thread *t);
static int fpm_lsp_reset(struct thread *t);
static int fpm_nhg_send(struct thread *t);
static int fpm_nhg_reset(struct thread *t);
static int fpm_rib_send(struct thread *t);
static int fpm_rib_reset(struct thread *t);
static int fpm_rmac_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);

/*
 * CLI.
 */
#define FPM_STR "Forwarding Plane Manager configuration\n"
3bdd7fca
RZ
198DEFUN(fpm_set_address, fpm_set_address_cmd,
199 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
6cc059cd 200 FPM_STR
3bdd7fca
RZ
201 "FPM remote listening server address\n"
202 "Remote IPv4 FPM server\n"
203 "Remote IPv6 FPM server\n"
204 "FPM remote listening server port\n"
205 "Remote FPM server port\n")
206{
207 struct sockaddr_in *sin;
208 struct sockaddr_in6 *sin6;
209 uint16_t port = 0;
210 uint8_t naddr[INET6_BUFSIZ];
211
212 if (argc == 5)
213 port = strtol(argv[4]->arg, NULL, 10);
214
215 /* Handle IPv4 addresses. */
216 if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
217 sin = (struct sockaddr_in *)&gfnc->addr;
218
219 memset(sin, 0, sizeof(*sin));
220 sin->sin_family = AF_INET;
221 sin->sin_port =
222 port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
223#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
224 sin->sin_len = sizeof(*sin);
225#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
226 memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));
227
228 goto ask_reconnect;
229 }
230
231 /* Handle IPv6 addresses. */
232 if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
233 vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
234 return CMD_WARNING;
235 }
236
237 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
238 memset(sin6, 0, sizeof(*sin6));
239 sin6->sin6_family = AF_INET6;
240 sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
241#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
242 sin6->sin6_len = sizeof(*sin6);
243#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
244 memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));
245
246ask_reconnect:
247 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
248 FNE_RECONNECT, &gfnc->t_event);
249 return CMD_SUCCESS;
250}
251
252DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
253 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
254 NO_STR
6cc059cd 255 FPM_STR
3bdd7fca
RZ
256 "FPM remote listening server address\n"
257 "Remote IPv4 FPM server\n"
258 "Remote IPv6 FPM server\n"
259 "FPM remote listening server port\n"
260 "Remote FPM server port\n")
261{
262 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
263 FNE_DISABLE, &gfnc->t_event);
264 return CMD_SUCCESS;
265}
266
b55ab92a
RZ
267DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
268 "fpm use-next-hop-groups",
269 FPM_STR
270 "Use netlink next hop groups feature.\n")
271{
272 /* Already enabled. */
273 if (gfnc->use_nhg)
274 return CMD_SUCCESS;
275
276 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
277 FNE_TOGGLE_NHG, &gfnc->t_event);
278
279 return CMD_SUCCESS;
280}
281
282DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
283 "no fpm use-next-hop-groups",
284 NO_STR
285 FPM_STR
286 "Use netlink next hop groups feature.\n")
287{
288 /* Already disabled. */
289 if (!gfnc->use_nhg)
290 return CMD_SUCCESS;
291
292 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
293 FNE_TOGGLE_NHG, &gfnc->t_event);
294
295 return CMD_SUCCESS;
296}
297
6cc059cd
RZ
/* "clear fpm counters": zero all statistics. */
DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	/* Reset happens on the FPM pthread via FNE_RESET_COUNTERS. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}
308
309DEFUN(fpm_show_counters, fpm_show_counters_cmd,
310 "show fpm counters",
311 SHOW_STR
312 FPM_STR
313 "FPM statistic counters\n")
314{
315 vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");
316
317#define SHOW_COUNTER(label, counter) \
770a8d28 318 vty_out(vty, "%28s: %u\n", (label), (counter))
6cc059cd
RZ
319
320 SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
321 SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
ad4d1022
RZ
322 SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
323 SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
6cc059cd
RZ
324 SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
325 SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
326 SHOW_COUNTER("Data plane items processed",
327 gfnc->counters.dplane_contexts);
ba803a2f
RZ
328 SHOW_COUNTER("Data plane items enqueued",
329 gfnc->counters.ctxqueue_len);
330 SHOW_COUNTER("Data plane items queue peak",
331 gfnc->counters.ctxqueue_len_peak);
6cc059cd
RZ
332 SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
333 SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
334 SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
335
336#undef SHOW_COUNTER
337
338 return CMD_SUCCESS;
339}
340
341DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
342 "show fpm counters json",
343 SHOW_STR
344 FPM_STR
345 "FPM statistic counters\n"
346 JSON_STR)
347{
348 struct json_object *jo;
349
350 jo = json_object_new_object();
351 json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
352 json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
ad4d1022
RZ
353 json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
354 json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
a50404aa
RZ
355 json_object_int_add(jo, "connection-closes",
356 gfnc->counters.connection_closes);
357 json_object_int_add(jo, "connection-errors",
358 gfnc->counters.connection_errors);
359 json_object_int_add(jo, "data-plane-contexts",
360 gfnc->counters.dplane_contexts);
ba803a2f
RZ
361 json_object_int_add(jo, "data-plane-contexts-queue",
362 gfnc->counters.ctxqueue_len);
363 json_object_int_add(jo, "data-plane-contexts-queue-peak",
364 gfnc->counters.ctxqueue_len_peak);
6cc059cd 365 json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
a50404aa
RZ
366 json_object_int_add(jo, "user-configures",
367 gfnc->counters.user_configures);
6cc059cd
RZ
368 json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
369 vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
370 json_object_free(jo);
371
372 return CMD_SUCCESS;
373}
374
3bdd7fca
RZ
375static int fpm_write_config(struct vty *vty)
376{
377 struct sockaddr_in *sin;
378 struct sockaddr_in6 *sin6;
379 int written = 0;
380 char addrstr[INET6_ADDRSTRLEN];
381
382 if (gfnc->disabled)
383 return written;
384
385 switch (gfnc->addr.ss_family) {
386 case AF_INET:
387 written = 1;
388 sin = (struct sockaddr_in *)&gfnc->addr;
389 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
390 vty_out(vty, "fpm address %s", addrstr);
391 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
392 vty_out(vty, " port %d", ntohs(sin->sin_port));
393
394 vty_out(vty, "\n");
395 break;
396 case AF_INET6:
397 written = 1;
398 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
399 inet_ntop(AF_INET, &sin6->sin6_addr, addrstr, sizeof(addrstr));
400 vty_out(vty, "fpm address %s", addrstr);
401 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
402 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
403
404 vty_out(vty, "\n");
405 break;
406
407 default:
408 break;
409 }
410
b55ab92a
RZ
411 if (!gfnc->use_nhg) {
412 vty_out(vty, "no fpm use-next-hop-groups\n");
413 written = 1;
414 }
415
3bdd7fca
RZ
416 return written;
417}
418
/* CLI node under which the "fpm ..." configuration lines are written. */
static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	.config_write = fpm_write_config,
};
425
d35f447d
RZ
/*
 * FPM functions.
 */
static int fpm_connect(struct thread *t);

/*
 * Tear down the current connection (if any) and, unless the user disabled
 * FPM, schedule a reconnect attempt in 3 seconds.
 *
 * Runs on the FPM pthread; the zebra-side walk threads are cancelled
 * asynchronously on zrouter.master first so no walker keeps enqueueing
 * into the buffers we are about to reset.
 */
static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	/* Discard any buffered input/output and stop I/O callbacks. */
	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}
467
/*
 * Read callback for the FPM socket, running on the FPM pthread.
 *
 * Incoming data is currently discarded; reading serves only to detect
 * connection close (rv == 0) and connection errors (rv == -1), both of
 * which trigger a reconnect.
 */
static int fpm_read(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	ssize_t rv;

	/* Let's ignore the input at the moment. */
	rv = stream_read_try(fnc->ibuf, fnc->socket,
			     STREAM_WRITEABLE(fnc->ibuf));
	/* We've got an interruption. */
	if (rv == -2) {
		/* Schedule next read. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
		return 0;
	}
	if (rv == 0) {
		/* Peer closed the connection. */
		atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: connection closed", __func__);

		FPM_RECONNECT(fnc);
		return 0;
	}
	if (rv == -1) {
		/* Hard read error: count it and reconnect. */
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		zlog_warn("%s: connection failure: %s", __func__,
			  strerror(errno));
		FPM_RECONNECT(fnc);
		return 0;
	}
	/* Data was read successfully but is unused: drop it. */
	stream_reset(fnc->ibuf);

	/* Account all bytes read. */
	atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
				  memory_order_relaxed);

	thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
			&fnc->t_read);

	return 0;
}
512
/*
 * Write callback for the FPM socket, running on the FPM pthread.
 *
 * If a non-blocking connect() was pending, first check SO_ERROR to learn
 * whether it succeeded; on success start reading, on failure reconnect.
 * Then drain as much of obuf as the socket accepts, rescheduling itself
 * while data remains.
 */
static int fpm_write(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	socklen_t statuslen;
	ssize_t bwritten;
	int rv, status;
	size_t btotal;

	if (fnc->connecting == true) {
		/* Writability after connect(): consult SO_ERROR for result. */
		status = 0;
		statuslen = sizeof(status);

		rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
				&statuslen);
		if (rv == -1 || status != 0) {
			if (rv != -1)
				zlog_warn("%s: connection failed: %s", __func__,
					  strerror(status));
			else
				zlog_warn("%s: SO_ERROR failed: %s", __func__,
					  strerror(status));

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);

			FPM_RECONNECT(fnc);
			return 0;
		}

		fnc->connecting = false;

		/* Permit receiving messages now. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
	}

	/* Held for the rest of the function (scope-based unlock). */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	while (true) {
		/* Stream is empty: reset pointers and return. */
		if (STREAM_READABLE(fnc->obuf) == 0) {
			stream_reset(fnc->obuf);
			break;
		}

		/* Try to write all at once. */
		btotal = stream_get_endp(fnc->obuf) -
			stream_get_getp(fnc->obuf);
		bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
		if (bwritten == 0) {
			atomic_fetch_add_explicit(
				&fnc->counters.connection_closes, 1,
				memory_order_relaxed);

			if (IS_ZEBRA_DEBUG_FPM)
				zlog_debug("%s: connection closed", __func__);
			break;
		}
		if (bwritten == -1) {
			/* Attempt to continue if blocked by a signal. */
			if (errno == EINTR)
				continue;
			/* Receiver is probably slow, lets give it some time. */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);
			zlog_warn("%s: connection failure: %s", __func__,
				  strerror(errno));

			FPM_RECONNECT(fnc);
			return 0;
		}

		/* Account all bytes sent. */
		atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
					  memory_order_relaxed);

		/* Account number of bytes free. */
		atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
					  memory_order_relaxed);

		stream_forward_getp(fnc->obuf, (size_t)bwritten);
	}

	/* Stream is not empty yet, we must schedule more writes. */
	if (STREAM_READABLE(fnc->obuf)) {
		stream_pulldown(fnc->obuf);
		thread_add_write(fnc->fthread->master, fpm_write, fnc,
				 fnc->socket, &fnc->t_write);
		return 0;
	}

	return 0;
}
611
/*
 * Attempt to (re)connect to the configured FPM server. Runs on the FPM
 * pthread; on any failure a retry is scheduled in 3 seconds.
 *
 * A successful (or in-progress) connect also kicks off the full replay of
 * FPM state, starting with the LSP table on zrouter.master.
 */
static int fpm_connect(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
	socklen_t slen;
	int rv, sock;
	char addrstr[INET6_ADDRSTRLEN];

	sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
	if (sock == -1) {
		zlog_err("%s: fpm socket failed: %s", __func__,
			 strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return 0;
	}

	/* Non-blocking so connect() can complete asynchronously. */
	set_nonblocking(sock);

	if (fnc->addr.ss_family == AF_INET) {
		inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin);
	} else {
		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin6);
	}

	if (IS_ZEBRA_DEBUG_FPM)
		/*
		 * NOTE(review): reads sin->sin_port even for IPv6; appears to
		 * rely on sin_port/sin6_port sharing the same offset — confirm
		 * or switch on the address family here.
		 */
		zlog_debug("%s: attempting to connect to %s:%d", __func__,
			   addrstr, ntohs(sin->sin_port));

	rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
	if (rv == -1 && errno != EINPROGRESS) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		close(sock);
		zlog_warn("%s: fpm connection failed: %s", __func__,
			  strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return 0;
	}

	/* EINPROGRESS: completion is reported via fpm_write()'s SO_ERROR. */
	fnc->connecting = (errno == EINPROGRESS);
	fnc->socket = sock;
	if (!fnc->connecting)
		thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
				&fnc->t_read);
	thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
			 &fnc->t_write);

	/*
	 * Starting with LSPs walk all FPM objects, marking them
	 * as unsent and then replaying them.
	 */
	thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
			 &fnc->t_lspreset);

	return 0;
}
673
/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * Each message is framed with a 4-byte FPM header (version, type, total
 * length) followed by the netlink payload. A failed encode logs and drops
 * the message (returns 0, not -1).
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	/*
	 * Serializes obuf access; released at scope exit, including the
	 * early `return` paths below.
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		/* A route UPDATE is encoded as DELROUTE then NEWROUTE. */
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
		rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_lsp_msg_encoder failed",
				 __func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	/* Operations without an FPM representation: silently ignored. */
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_NONE:
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled data plane message (%d) %s",
				   __func__, dplane_ctx_get_op(ctx),
				   dplane_op2str(dplane_ctx_get_op(ctx)));
		break;
	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}
861
f9bf1ecc
DE
/*
 * LSP walk/send functions
 */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx; /* reusable encode context */
	struct fpm_nl_ctx *fnc;
	bool complete; /* false when the walk aborted on a full buffer */
};

/* Hash-walk callback: enqueue one unsent LSP toward the FPM server. */
static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
{
	zebra_lsp_t *lsp = bucket->data;
	struct fpm_lsp_arg *fla = arg;

	/* Skip entries which have already been sent */
	if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
		return HASHWALK_CONTINUE;

	dplane_ctx_reset(fla->ctx);
	dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);

	/* -1 means the output buffer is full: stop and retry later. */
	if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
		fla->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark entry as sent */
	SET_FLAG(lsp->flags, LSP_FLAG_FPM);
	return HASHWALK_CONTINUE;
}
892
/*
 * Walk the default VRF LSP table sending unsent entries to the FPM.
 * On completion move on to the next hop group (or RIB) walk; otherwise
 * reschedule itself to resume where the buffer filled up.
 */
static int fpm_lsp_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
	struct fpm_lsp_arg fla;

	fla.fnc = fnc;
	fla.ctx = dplane_ctx_alloc();
	fla.complete = true;

	hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);

	dplane_ctx_fini(&fla.ctx);

	if (fla.complete) {
		WALK_FINISH(fnc, FNE_LSP_FINISHED);

		/* Now move onto routes */
		if (fnc->use_nhg)
			thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
					 &fnc->t_nhgreset);
		else
			thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
					 &fnc->t_ribreset);
	} else {
		/* Didn't finish - reschedule LSP walk */
		thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
				 &fnc->t_lspwalk);
	}

	return 0;
}
925
981ca597
RZ
/*
 * Next hop walk/send functions.
 */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx; /* reusable encode context */
	struct fpm_nl_ctx *fnc;
	bool complete; /* false when the walk aborted on a full buffer */
};

/* Hash-walk callback: enqueue one unsent next hop group entry. */
static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;
	struct fpm_nhg_arg *fna = arg;

	/* This entry was already sent, skip it. */
	if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
		return HASHWALK_CONTINUE;

	/* Reset ctx to reuse allocated memory, take a snapshot and send it. */
	dplane_ctx_reset(fna->ctx);
	dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
	if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
		/* Our buffers are full, lets give it some cycles. */
		fna->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark group as sent, so it doesn't get sent again. */
	SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);

	return HASHWALK_CONTINUE;
}
958
/*
 * Walk all next hop groups sending unsent entries to the FPM; when done,
 * start the RIB walk, otherwise reschedule itself.
 */
static int fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, lets install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
				 &fnc->t_ribreset);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);

	return 0;
}
985
018e77bc
RZ
/**
 * Send all RIB installed routes to the connected data plane.
 *
 * Iterates every route table; entries already flagged RIB_DEST_UPDATE_FPM
 * are skipped, so a rescheduled run resumes where the buffer filled up.
 * When the walk completes, the RMAC reset/walk is scheduled next.
 */
static int fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				/* Buffer full: retry in one second. */
				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return 0;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	/* Schedule next event: RMAC reset. */
	thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);

	return 0;
}
1043
/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
	/* Scratch dataplane context reused for each MAC install. */
	struct zebra_dplane_ctx *ctx;
	/* Owning FPM context (socket, buffers, timers). */
	struct fpm_nl_ctx *fnc;
	/* L3 VNI currently being walked; set by the L3 VNI iterator. */
	zebra_l3vni_t *zl3vni;
	/* Cleared when the output buffer fills and the walk must retry. */
	bool complete;
};
1053
9d5c3268 1054static void fpm_enqueue_rmac_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1055{
1056 struct fpm_rmac_arg *fra = arg;
1057 zebra_mac_t *zrmac = backet->data;
1058 struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
1059 const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
1060 struct zebra_if *br_zif;
1061 vlanid_t vid;
1062 bool sticky;
1063
1064 /* Entry already sent. */
55eb9d4d 1065 if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
bda10adf
RZ
1066 return;
1067
1068 sticky = !!CHECK_FLAG(zrmac->flags,
1069 (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
1070 br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
1071 vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;
1072
1073 dplane_ctx_reset(fra->ctx);
1074 dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
1075 dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
f2a0ba3a 1076 zif->brslave_info.br_if, vid,
f188e68e
AK
1077 &zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
1078 0 /*nhg*/, 0 /*update_flags*/);
bda10adf 1079 if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
bda10adf
RZ
1080 thread_add_timer(zrouter.master, fpm_rmac_send,
1081 fra->fnc, 1, &fra->fnc->t_rmacwalk);
55eb9d4d 1082 fra->complete = false;
bda10adf
RZ
1083 }
1084}
1085
9d5c3268 1086static void fpm_enqueue_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1087{
1088 struct fpm_rmac_arg *fra = arg;
1089 zebra_l3vni_t *zl3vni = backet->data;
1090
1091 fra->zl3vni = zl3vni;
1092 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1093}
1094
1095static int fpm_rmac_send(struct thread *t)
1096{
1097 struct fpm_rmac_arg fra;
1098
1099 fra.fnc = THREAD_ARG(t);
1100 fra.ctx = dplane_ctx_alloc();
55eb9d4d 1101 fra.complete = true;
bda10adf
RZ
1102 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1103 dplane_ctx_fini(&fra.ctx);
1104
55eb9d4d
RZ
1105 /* RMAC walk completed. */
1106 if (fra.complete)
1107 WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
1108
bda10adf
RZ
1109 return 0;
1110}
1111
981ca597
RZ
1112/*
1113 * Resets the next hop FPM flags so we send all next hops again.
1114 */
1115static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1116{
1117 struct nhg_hash_entry *nhe = bucket->data;
1118
1119 /* Unset FPM installation flag so it gets installed again. */
1120 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1121}
1122
1123static int fpm_nhg_reset(struct thread *t)
1124{
55eb9d4d
RZ
1125 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1126
1127 fnc->nhg_complete = false;
981ca597 1128 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
e41e0f81
RZ
1129
1130 /* Schedule next step: send next hop groups. */
1131 thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
1132
981ca597
RZ
1133 return 0;
1134}
1135
f9bf1ecc
DE
1136/*
1137 * Resets the LSP FPM flag so we send all LSPs again.
1138 */
1139static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
1140{
1141 zebra_lsp_t *lsp = bucket->data;
1142
1143 UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
1144}
1145
1146static int fpm_lsp_reset(struct thread *t)
1147{
1148 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1149 struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
1150
1151 hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);
1152
1153 /* Schedule next step: send LSPs */
1154 thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
1155
1156 return 0;
1157}
1158
018e77bc
RZ
1159/**
1160 * Resets the RIB FPM flags so we send all routes again.
1161 */
1162static int fpm_rib_reset(struct thread *t)
1163{
1164 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1165 rib_dest_t *dest;
1166 struct route_node *rn;
1167 struct route_table *rt;
1168 rib_tables_iter_t rt_iter;
1169
1170 fnc->rib_complete = false;
1171
1172 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1173 while ((rt = rib_tables_iter_next(&rt_iter))) {
1174 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1175 dest = rib_dest_from_rnode(rn);
1176 /* Skip bad route entries. */
1177 if (dest == NULL)
1178 continue;
1179
1180 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1181 }
1182 }
1183
e41e0f81
RZ
1184 /* Schedule next step: send RIB routes. */
1185 thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
1186
018e77bc
RZ
1187 return 0;
1188}
1189
bda10adf
RZ
1190/*
1191 * The next three function will handle RMAC table reset.
1192 */
9d5c3268 1193static void fpm_unset_rmac_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1194{
1195 zebra_mac_t *zrmac = backet->data;
1196
1197 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1198}
1199
9d5c3268 1200static void fpm_unset_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1201{
1202 zebra_l3vni_t *zl3vni = backet->data;
1203
1204 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1205}
1206
1207static int fpm_rmac_reset(struct thread *t)
1208{
55eb9d4d
RZ
1209 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1210
1211 fnc->rmac_complete = false;
bda10adf
RZ
1212 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1213
e41e0f81
RZ
1214 /* Schedule next event: send RMAC entries. */
1215 thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
1216 &fnc->t_rmacwalk);
1217
bda10adf
RZ
1218 return 0;
1219}
1220
/*
 * Drain the provider context queue into the FPM output buffer.
 *
 * Runs on the FPM pthread.  Holds ctxqueue_mutex for the whole drain
 * (released automatically at scope exit) and stops early when the output
 * buffer can no longer hold a full netlink packet.  Reschedules itself
 * while contexts remain queued.
 */
static int fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;

	frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
			break;

		/* Dequeue next item or quit processing. */
		ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		if (ctx == NULL)
			break;

		fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
					  memory_order_relaxed);
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Hand the context back to the dataplane as completed. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Check for more items in the queue. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	return 0;
}
1259
/**
 * Handles external (e.g. CLI, data plane or others) events.
 *
 * Dispatches on the event value carried by the thread: user enable /
 * disable / counter reset, next hop group toggling, reconnects, and the
 * FNE_*_FINISHED notifications that flip the per-stage completion flags.
 */
static int fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	int event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		/* NOTE(review): plain memset over atomic counters — assumes
		 * no concurrent updater races with this reset; confirm which
		 * thread runs this handler. */
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		/* Reconnect so the peer renegotiates with the new setting. */
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);

		fnc->nhg_complete = true;
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);

		fnc->rib_complete = true;
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);

		fnc->rmac_complete = true;
		break;
	case FNE_LSP_FINISHED:
		/* No completion flag is set for LSPs; only log the event. */
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: LSP walk finished", __func__);
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled event %d", __func__, event);
		break;
	}

	return 0;
}
1334
d35f447d
RZ
1335/*
1336 * Data plane functions.
1337 */
1338static int fpm_nl_start(struct zebra_dplane_provider *prov)
1339{
1340 struct fpm_nl_ctx *fnc;
1341
1342 fnc = dplane_provider_get_data(prov);
1343 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1344 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1345 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1346 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1347 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1348 fnc->socket = -1;
3bdd7fca 1349 fnc->disabled = true;
ba803a2f
RZ
1350 fnc->prov = prov;
1351 TAILQ_INIT(&fnc->ctxqueue);
1352 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
d35f447d 1353
b55ab92a
RZ
1354 /* Set default values. */
1355 fnc->use_nhg = true;
1356
d35f447d
RZ
1357 return 0;
1358}
1359
/*
 * Early shutdown: cancel all walk/reset timers on the main thread,
 * asynchronously cancel the I/O events owned by the FPM pthread, and
 * close the FPM socket.  Buffer/mutex teardown happens later in
 * fpm_nl_finish_late().
 */
static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_lspreset);
	THREAD_OFF(fnc->t_lspwalk);
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	/* These run on the FPM pthread, so they must be cancelled async. */
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}
1382
/*
 * Late shutdown: stop the FPM pthread, then release the mutexes, the
 * I/O streams and finally the global context itself.
 */
static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	/* fnc is the global context (allocated in fpm_nl_new). */
	free(gfnc);
	gfnc = NULL;

	return 0;
}
1398
/*
 * Provider finish callback: the dataplane calls this twice, first with
 * early == true (cancel events, close socket) and then with
 * early == false (full resource teardown).
 */
static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc = dplane_provider_get_data(prov);

	return early ? fpm_nl_finish_early(fnc) : fpm_nl_finish_late(fnc);
}
1409
/*
 * Provider work callback: pull up to the provider work limit of contexts
 * from the dataplane input queue.  While connected, contexts are parked
 * on fnc->ctxqueue for the FPM pthread to serialize (tracking current and
 * peak queue length); otherwise they are acknowledged immediately, since
 * a full RIB walk will resynchronize the peer anyway.
 */
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
			dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);

			/* Account the number of contexts. */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			peak_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len_peak,
				memory_order_relaxed);
			/* Track the high-water mark of the queue length. */
			if (peak_queue < cur_queue)
				atomic_store_explicit(
					&fnc->counters.ctxqueue_len_peak,
					cur_queue, memory_order_relaxed);
			continue;
		}

		/* Not connected: acknowledge the context right away. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Kick the FPM pthread if there is queued work. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	return 0;
}
1460
/*
 * Late-init hook: allocate the global FPM context, register this module
 * as a threaded dataplane provider and install the CLI node/commands.
 */
static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	/* NOTE(review): calloc() result is not checked; a failure here would
	 * register the provider with a NULL context. */
	gfnc = calloc(1, sizeof(*gfnc));
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	/* CLI: show/reset counters and FPM configuration commands. */
	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}
1486
/* Module init: defer provider registration until FRR finishes startup. */
static int fpm_nl_init(void)
{
	hook_register(frr_late_init, fpm_nl_new);
	return 0;
}
1492
/* FRR loadable-module descriptor for this plugin. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
	)