1 // SPDX-License-Identifier: GPL-2.0-or-later
3 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
5 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
10 #include "config.h" /* Include this explicitly */
13 #include <arpa/inet.h>
15 #include <sys/types.h>
16 #include <sys/socket.h>
21 #include "lib/zebra.h"
23 #include "lib/libfrr.h"
24 #include "lib/frratomic.h"
25 #include "lib/command.h"
26 #include "lib/memory.h"
27 #include "lib/network.h"
29 #include "lib/frr_pthread.h"
30 #include "zebra/debug.h"
31 #include "zebra/interface.h"
32 #include "zebra/zebra_dplane.h"
33 #include "zebra/zebra_mpls.h"
34 #include "zebra/zebra_router.h"
35 #include "zebra/interface.h"
36 #include "zebra/zebra_vxlan_private.h"
37 #include "zebra/zebra_evpn.h"
38 #include "zebra/zebra_evpn_mac.h"
39 #include "zebra/kernel_netlink.h"
40 #include "zebra/rt_netlink.h"
41 #include "zebra/debug.h"
44 #define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
45 #define SOUTHBOUND_DEFAULT_PORT 2620
50 * version: 1 byte (always 1),
51 * type: 1 byte (1 for netlink, 2 protobuf),
52 * len: 2 bytes (network order),
55 * This header is used with any format to tell the users how many bytes to
58 #define FPM_HEADER_SIZE 4
60 static const char *prov_name
= "dplane_fpm_nl";
63 /* data plane connection. */
68 struct sockaddr_storage addr
;
70 /* data plane buffers. */
73 pthread_mutex_t obuf_mutex
;
76 * data plane context queue:
77 * When a FPM server connection becomes a bottleneck, we must keep the
78 * data plane contexts until we get a chance to process them.
80 struct dplane_ctx_list_head ctxqueue
;
81 pthread_mutex_t ctxqueue_mutex
;
83 /* data plane events. */
84 struct zebra_dplane_provider
*prov
;
85 struct frr_pthread
*fthread
;
86 struct thread
*t_connect
;
87 struct thread
*t_read
;
88 struct thread
*t_write
;
89 struct thread
*t_event
;
91 struct thread
*t_dequeue
;
94 struct thread
*t_lspreset
;
95 struct thread
*t_lspwalk
;
96 struct thread
*t_nhgreset
;
97 struct thread
*t_nhgwalk
;
98 struct thread
*t_ribreset
;
99 struct thread
*t_ribwalk
;
100 struct thread
*t_rmacreset
;
101 struct thread
*t_rmacwalk
;
103 /* Statistic counters. */
105 /* Amount of bytes read into ibuf. */
106 _Atomic
uint32_t bytes_read
;
107 /* Amount of bytes written from obuf. */
108 _Atomic
uint32_t bytes_sent
;
109 /* Output buffer current usage. */
110 _Atomic
uint32_t obuf_bytes
;
111 /* Output buffer peak usage. */
112 _Atomic
uint32_t obuf_peak
;
114 /* Amount of connection closes. */
115 _Atomic
uint32_t connection_closes
;
116 /* Amount of connection errors. */
117 _Atomic
uint32_t connection_errors
;
119 /* Amount of user configurations: FNE_RECONNECT. */
120 _Atomic
uint32_t user_configures
;
121 /* Amount of user disable requests: FNE_DISABLE. */
122 _Atomic
uint32_t user_disables
;
124 /* Amount of data plane context processed. */
125 _Atomic
uint32_t dplane_contexts
;
126 /* Amount of data plane contexts enqueued. */
127 _Atomic
uint32_t ctxqueue_len
;
128 /* Peak amount of data plane contexts enqueued. */
129 _Atomic
uint32_t ctxqueue_len_peak
;
131 /* Amount of buffer full events. */
132 _Atomic
uint32_t buffer_full
;
137 /* Ask for FPM to reconnect the external server. */
141 /* Reset counters. */
143 /* Toggle next hop group feature. */
145 /* Reconnect request by our own code to avoid races. */
146 FNE_INTERNAL_RECONNECT
,
148 /* LSP walk finished. */
150 /* Next hop groups walk finished. */
152 /* RIB walk finished. */
154 /* RMAC walk finished. */
158 #define FPM_RECONNECT(fnc) \
159 thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
160 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)
162 #define WALK_FINISH(fnc, ev) \
163 thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
169 static void fpm_process_event(struct thread
*t
);
170 static int fpm_nl_enqueue(struct fpm_nl_ctx
*fnc
, struct zebra_dplane_ctx
*ctx
);
171 static void fpm_lsp_send(struct thread
*t
);
172 static void fpm_lsp_reset(struct thread
*t
);
173 static void fpm_nhg_send(struct thread
*t
);
174 static void fpm_nhg_reset(struct thread
*t
);
175 static void fpm_rib_send(struct thread
*t
);
176 static void fpm_rib_reset(struct thread
*t
);
177 static void fpm_rmac_send(struct thread
*t
);
178 static void fpm_rmac_reset(struct thread
*t
);
183 #define FPM_STR "Forwarding Plane Manager configuration\n"
185 DEFUN(fpm_set_address
, fpm_set_address_cmd
,
186 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
188 "FPM remote listening server address\n"
189 "Remote IPv4 FPM server\n"
190 "Remote IPv6 FPM server\n"
191 "FPM remote listening server port\n"
192 "Remote FPM server port\n")
194 struct sockaddr_in
*sin
;
195 struct sockaddr_in6
*sin6
;
197 uint8_t naddr
[INET6_BUFSIZ
];
200 port
= strtol(argv
[4]->arg
, NULL
, 10);
202 /* Handle IPv4 addresses. */
203 if (inet_pton(AF_INET
, argv
[2]->arg
, naddr
) == 1) {
204 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
206 memset(sin
, 0, sizeof(*sin
));
207 sin
->sin_family
= AF_INET
;
209 port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
210 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
211 sin
->sin_len
= sizeof(*sin
);
212 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
213 memcpy(&sin
->sin_addr
, naddr
, sizeof(sin
->sin_addr
));
218 /* Handle IPv6 addresses. */
219 if (inet_pton(AF_INET6
, argv
[2]->arg
, naddr
) != 1) {
220 vty_out(vty
, "%% Invalid address: %s\n", argv
[2]->arg
);
224 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
225 memset(sin6
, 0, sizeof(*sin6
));
226 sin6
->sin6_family
= AF_INET6
;
227 sin6
->sin6_port
= port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
228 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
229 sin6
->sin6_len
= sizeof(*sin6
);
230 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
231 memcpy(&sin6
->sin6_addr
, naddr
, sizeof(sin6
->sin6_addr
));
234 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
235 FNE_RECONNECT
, &gfnc
->t_event
);
239 DEFUN(no_fpm_set_address
, no_fpm_set_address_cmd
,
240 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
243 "FPM remote listening server address\n"
244 "Remote IPv4 FPM server\n"
245 "Remote IPv6 FPM server\n"
246 "FPM remote listening server port\n"
247 "Remote FPM server port\n")
249 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
250 FNE_DISABLE
, &gfnc
->t_event
);
254 DEFUN(fpm_use_nhg
, fpm_use_nhg_cmd
,
255 "fpm use-next-hop-groups",
257 "Use netlink next hop groups feature.\n")
259 /* Already enabled. */
263 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
264 FNE_TOGGLE_NHG
, &gfnc
->t_nhg
);
269 DEFUN(no_fpm_use_nhg
, no_fpm_use_nhg_cmd
,
270 "no fpm use-next-hop-groups",
273 "Use netlink next hop groups feature.\n")
275 /* Already disabled. */
279 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
280 FNE_TOGGLE_NHG
, &gfnc
->t_nhg
);
285 DEFUN(fpm_reset_counters
, fpm_reset_counters_cmd
,
286 "clear fpm counters",
289 "FPM statistic counters\n")
291 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
292 FNE_RESET_COUNTERS
, &gfnc
->t_event
);
296 DEFUN(fpm_show_counters
, fpm_show_counters_cmd
,
300 "FPM statistic counters\n")
302 vty_out(vty
, "%30s\n%30s\n", "FPM counters", "============");
304 #define SHOW_COUNTER(label, counter) \
305 vty_out(vty, "%28s: %u\n", (label), (counter))
307 SHOW_COUNTER("Input bytes", gfnc
->counters
.bytes_read
);
308 SHOW_COUNTER("Output bytes", gfnc
->counters
.bytes_sent
);
309 SHOW_COUNTER("Output buffer current size", gfnc
->counters
.obuf_bytes
);
310 SHOW_COUNTER("Output buffer peak size", gfnc
->counters
.obuf_peak
);
311 SHOW_COUNTER("Connection closes", gfnc
->counters
.connection_closes
);
312 SHOW_COUNTER("Connection errors", gfnc
->counters
.connection_errors
);
313 SHOW_COUNTER("Data plane items processed",
314 gfnc
->counters
.dplane_contexts
);
315 SHOW_COUNTER("Data plane items enqueued",
316 gfnc
->counters
.ctxqueue_len
);
317 SHOW_COUNTER("Data plane items queue peak",
318 gfnc
->counters
.ctxqueue_len_peak
);
319 SHOW_COUNTER("Buffer full hits", gfnc
->counters
.buffer_full
);
320 SHOW_COUNTER("User FPM configurations", gfnc
->counters
.user_configures
);
321 SHOW_COUNTER("User FPM disable requests", gfnc
->counters
.user_disables
);
328 DEFUN(fpm_show_counters_json
, fpm_show_counters_json_cmd
,
329 "show fpm counters json",
332 "FPM statistic counters\n"
335 struct json_object
*jo
;
337 jo
= json_object_new_object();
338 json_object_int_add(jo
, "bytes-read", gfnc
->counters
.bytes_read
);
339 json_object_int_add(jo
, "bytes-sent", gfnc
->counters
.bytes_sent
);
340 json_object_int_add(jo
, "obuf-bytes", gfnc
->counters
.obuf_bytes
);
341 json_object_int_add(jo
, "obuf-bytes-peak", gfnc
->counters
.obuf_peak
);
342 json_object_int_add(jo
, "connection-closes",
343 gfnc
->counters
.connection_closes
);
344 json_object_int_add(jo
, "connection-errors",
345 gfnc
->counters
.connection_errors
);
346 json_object_int_add(jo
, "data-plane-contexts",
347 gfnc
->counters
.dplane_contexts
);
348 json_object_int_add(jo
, "data-plane-contexts-queue",
349 gfnc
->counters
.ctxqueue_len
);
350 json_object_int_add(jo
, "data-plane-contexts-queue-peak",
351 gfnc
->counters
.ctxqueue_len_peak
);
352 json_object_int_add(jo
, "buffer-full-hits", gfnc
->counters
.buffer_full
);
353 json_object_int_add(jo
, "user-configures",
354 gfnc
->counters
.user_configures
);
355 json_object_int_add(jo
, "user-disables", gfnc
->counters
.user_disables
);
361 static int fpm_write_config(struct vty
*vty
)
363 struct sockaddr_in
*sin
;
364 struct sockaddr_in6
*sin6
;
370 switch (gfnc
->addr
.ss_family
) {
373 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
374 vty_out(vty
, "fpm address %pI4", &sin
->sin_addr
);
375 if (sin
->sin_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
376 vty_out(vty
, " port %d", ntohs(sin
->sin_port
));
382 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
383 vty_out(vty
, "fpm address %pI6", &sin6
->sin6_addr
);
384 if (sin6
->sin6_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
385 vty_out(vty
, " port %d", ntohs(sin6
->sin6_port
));
394 if (!gfnc
->use_nhg
) {
395 vty_out(vty
, "no fpm use-next-hop-groups\n");
402 static struct cmd_node fpm_node
= {
406 .config_write
= fpm_write_config
,
412 static void fpm_connect(struct thread
*t
);
414 static void fpm_reconnect(struct fpm_nl_ctx
*fnc
)
416 /* Cancel all zebra threads first. */
417 thread_cancel_async(zrouter
.master
, &fnc
->t_lspreset
, NULL
);
418 thread_cancel_async(zrouter
.master
, &fnc
->t_lspwalk
, NULL
);
419 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgreset
, NULL
);
420 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgwalk
, NULL
);
421 thread_cancel_async(zrouter
.master
, &fnc
->t_ribreset
, NULL
);
422 thread_cancel_async(zrouter
.master
, &fnc
->t_ribwalk
, NULL
);
423 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacreset
, NULL
);
424 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacwalk
, NULL
);
427 * Grab the lock to empty the streams (data plane might try to
428 * enqueue updates while we are closing).
430 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
432 /* Avoid calling close on `-1`. */
433 if (fnc
->socket
!= -1) {
438 stream_reset(fnc
->ibuf
);
439 stream_reset(fnc
->obuf
);
440 THREAD_OFF(fnc
->t_read
);
441 THREAD_OFF(fnc
->t_write
);
443 /* FPM is disabled, don't attempt to connect. */
447 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
451 static void fpm_read(struct thread
*t
)
453 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
457 struct nlmsghdr
*hdr
;
458 struct zebra_dplane_ctx
*ctx
;
459 size_t available_bytes
;
460 size_t hdr_available_bytes
;
462 /* Let's ignore the input at the moment. */
463 rv
= stream_read_try(fnc
->ibuf
, fnc
->socket
,
464 STREAM_WRITEABLE(fnc
->ibuf
));
466 atomic_fetch_add_explicit(&fnc
->counters
.connection_closes
, 1,
467 memory_order_relaxed
);
469 if (IS_ZEBRA_DEBUG_FPM
)
470 zlog_debug("%s: connection closed", __func__
);
476 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
477 memory_order_relaxed
);
478 zlog_warn("%s: connection failure: %s", __func__
,
484 /* Schedule the next read */
485 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, fnc
->socket
,
488 /* We've got an interruption. */
493 /* Account all bytes read. */
494 atomic_fetch_add_explicit(&fnc
->counters
.bytes_read
, rv
,
495 memory_order_relaxed
);
497 available_bytes
= STREAM_READABLE(fnc
->ibuf
);
498 while (available_bytes
) {
499 if (available_bytes
< (ssize_t
)FPM_MSG_HDR_LEN
) {
500 stream_pulldown(fnc
->ibuf
);
504 fpm
.version
= stream_getc(fnc
->ibuf
);
505 fpm
.msg_type
= stream_getc(fnc
->ibuf
);
506 fpm
.msg_len
= stream_getw(fnc
->ibuf
);
508 if (fpm
.version
!= FPM_PROTO_VERSION
&&
509 fpm
.msg_type
!= FPM_MSG_TYPE_NETLINK
) {
510 stream_reset(fnc
->ibuf
);
512 "%s: Received version/msg_type %u/%u, expected 1/1",
513 __func__
, fpm
.version
, fpm
.msg_type
);
520 * If the passed in length doesn't even fill in the header
521 * something is wrong and reset.
523 if (fpm
.msg_len
< FPM_MSG_HDR_LEN
) {
525 "%s: Received message length: %u that does not even fill the FPM header",
526 __func__
, fpm
.msg_len
);
532 * If we have not received the whole payload, reset the stream
533 * back to the beginning of the header and move it to the
536 if (fpm
.msg_len
> available_bytes
) {
537 stream_rewind_getp(fnc
->ibuf
, FPM_MSG_HDR_LEN
);
538 stream_pulldown(fnc
->ibuf
);
542 available_bytes
-= FPM_MSG_HDR_LEN
;
545 * Place the data from the stream into a buffer
547 hdr
= (struct nlmsghdr
*)buf
;
548 stream_get(buf
, fnc
->ibuf
, fpm
.msg_len
- FPM_MSG_HDR_LEN
);
549 hdr_available_bytes
= fpm
.msg_len
- FPM_MSG_HDR_LEN
;
550 available_bytes
-= hdr_available_bytes
;
552 /* Sanity check: must be at least header size. */
553 if (hdr
->nlmsg_len
< sizeof(*hdr
)) {
555 "%s: [seq=%u] invalid message length %u (< %zu)",
556 __func__
, hdr
->nlmsg_seq
, hdr
->nlmsg_len
,
560 if (hdr
->nlmsg_len
> fpm
.msg_len
) {
562 "%s: Received a inner header length of %u that is greater than the fpm total length of %u",
563 __func__
, hdr
->nlmsg_len
, fpm
.msg_len
);
566 /* Not enough bytes available. */
567 if (hdr
->nlmsg_len
> hdr_available_bytes
) {
569 "%s: [seq=%u] invalid message length %u (> %zu)",
570 __func__
, hdr
->nlmsg_seq
, hdr
->nlmsg_len
,
575 if (!(hdr
->nlmsg_flags
& NLM_F_REQUEST
)) {
576 if (IS_ZEBRA_DEBUG_FPM
)
578 "%s: [seq=%u] not a request, skipping",
579 __func__
, hdr
->nlmsg_seq
);
582 * This request is a bust, go to the next one
587 switch (hdr
->nlmsg_type
) {
589 ctx
= dplane_ctx_alloc();
590 dplane_ctx_set_op(ctx
, DPLANE_OP_ROUTE_NOTIFY
);
591 if (netlink_route_change_read_unicast_internal(
592 hdr
, 0, false, ctx
) != 1) {
593 dplane_ctx_fini(&ctx
);
594 stream_pulldown(fnc
->ibuf
);
596 * Let's continue to read other messages
597 * Even if we ignore this one.
602 if (IS_ZEBRA_DEBUG_FPM
)
604 "%s: Received message type %u which is not currently handled",
605 __func__
, hdr
->nlmsg_type
);
610 stream_reset(fnc
->ibuf
);
613 static void fpm_write(struct thread
*t
)
615 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
621 if (fnc
->connecting
== true) {
623 statuslen
= sizeof(status
);
625 rv
= getsockopt(fnc
->socket
, SOL_SOCKET
, SO_ERROR
, &status
,
627 if (rv
== -1 || status
!= 0) {
629 zlog_warn("%s: connection failed: %s", __func__
,
632 zlog_warn("%s: SO_ERROR failed: %s", __func__
,
635 atomic_fetch_add_explicit(
636 &fnc
->counters
.connection_errors
, 1,
637 memory_order_relaxed
);
643 fnc
->connecting
= false;
646 * Starting with LSPs walk all FPM objects, marking them
647 * as unsent and then replaying them.
649 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
652 /* Permit receiving messages now. */
653 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
,
654 fnc
->socket
, &fnc
->t_read
);
657 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
660 /* Stream is empty: reset pointers and return. */
661 if (STREAM_READABLE(fnc
->obuf
) == 0) {
662 stream_reset(fnc
->obuf
);
666 /* Try to write all at once. */
667 btotal
= stream_get_endp(fnc
->obuf
) -
668 stream_get_getp(fnc
->obuf
);
669 bwritten
= write(fnc
->socket
, stream_pnt(fnc
->obuf
), btotal
);
671 atomic_fetch_add_explicit(
672 &fnc
->counters
.connection_closes
, 1,
673 memory_order_relaxed
);
675 if (IS_ZEBRA_DEBUG_FPM
)
676 zlog_debug("%s: connection closed", __func__
);
679 if (bwritten
== -1) {
680 /* Attempt to continue if blocked by a signal. */
683 /* Receiver is probably slow, lets give it some time. */
684 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
687 atomic_fetch_add_explicit(
688 &fnc
->counters
.connection_errors
, 1,
689 memory_order_relaxed
);
690 zlog_warn("%s: connection failure: %s", __func__
,
697 /* Account all bytes sent. */
698 atomic_fetch_add_explicit(&fnc
->counters
.bytes_sent
, bwritten
,
699 memory_order_relaxed
);
701 /* Account number of bytes free. */
702 atomic_fetch_sub_explicit(&fnc
->counters
.obuf_bytes
, bwritten
,
703 memory_order_relaxed
);
705 stream_forward_getp(fnc
->obuf
, (size_t)bwritten
);
708 /* Stream is not empty yet, we must schedule more writes. */
709 if (STREAM_READABLE(fnc
->obuf
)) {
710 stream_pulldown(fnc
->obuf
);
711 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
,
712 fnc
->socket
, &fnc
->t_write
);
717 static void fpm_connect(struct thread
*t
)
719 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
720 struct sockaddr_in
*sin
= (struct sockaddr_in
*)&fnc
->addr
;
721 struct sockaddr_in6
*sin6
= (struct sockaddr_in6
*)&fnc
->addr
;
724 char addrstr
[INET6_ADDRSTRLEN
];
726 sock
= socket(fnc
->addr
.ss_family
, SOCK_STREAM
, 0);
728 zlog_err("%s: fpm socket failed: %s", __func__
,
730 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
735 set_nonblocking(sock
);
737 if (fnc
->addr
.ss_family
== AF_INET
) {
738 inet_ntop(AF_INET
, &sin
->sin_addr
, addrstr
, sizeof(addrstr
));
741 inet_ntop(AF_INET6
, &sin6
->sin6_addr
, addrstr
, sizeof(addrstr
));
742 slen
= sizeof(*sin6
);
745 if (IS_ZEBRA_DEBUG_FPM
)
746 zlog_debug("%s: attempting to connect to %s:%d", __func__
,
747 addrstr
, ntohs(sin
->sin_port
));
749 rv
= connect(sock
, (struct sockaddr
*)&fnc
->addr
, slen
);
750 if (rv
== -1 && errno
!= EINPROGRESS
) {
751 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
752 memory_order_relaxed
);
754 zlog_warn("%s: fpm connection failed: %s", __func__
,
756 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
761 fnc
->connecting
= (errno
== EINPROGRESS
);
763 if (!fnc
->connecting
)
764 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, sock
,
766 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, sock
,
770 * Starting with LSPs walk all FPM objects, marking them
771 * as unsent and then replaying them.
773 * If we are not connected, then delay the objects reset/send.
775 if (!fnc
->connecting
)
776 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
781 * Encode data plane operation context into netlink and enqueue it in the FPM
784 * @param fnc the netlink FPM context.
785 * @param ctx the data plane operation context data.
786 * @return 0 on success or -1 on not enough space.
788 static int fpm_nl_enqueue(struct fpm_nl_ctx
*fnc
, struct zebra_dplane_ctx
*ctx
)
790 uint8_t nl_buf
[NL_PKT_BUF_SIZE
];
793 uint64_t obytes
, obytes_peak
;
794 enum dplane_op_e op
= dplane_ctx_get_op(ctx
);
797 * If we were configured to not use next hop groups, then quit as soon
801 && (op
== DPLANE_OP_NH_DELETE
|| op
== DPLANE_OP_NH_INSTALL
802 || op
== DPLANE_OP_NH_UPDATE
))
807 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
810 case DPLANE_OP_ROUTE_UPDATE
:
811 case DPLANE_OP_ROUTE_DELETE
:
812 rv
= netlink_route_multipath_msg_encode(RTM_DELROUTE
, ctx
,
813 nl_buf
, sizeof(nl_buf
),
817 "%s: netlink_route_multipath_msg_encode failed",
822 nl_buf_len
= (size_t)rv
;
824 /* UPDATE operations need a INSTALL, otherwise just quit. */
825 if (op
== DPLANE_OP_ROUTE_DELETE
)
829 case DPLANE_OP_ROUTE_INSTALL
:
830 rv
= netlink_route_multipath_msg_encode(
831 RTM_NEWROUTE
, ctx
, &nl_buf
[nl_buf_len
],
832 sizeof(nl_buf
) - nl_buf_len
, true, fnc
->use_nhg
);
835 "%s: netlink_route_multipath_msg_encode failed",
840 nl_buf_len
+= (size_t)rv
;
843 case DPLANE_OP_MAC_INSTALL
:
844 case DPLANE_OP_MAC_DELETE
:
845 rv
= netlink_macfdb_update_ctx(ctx
, nl_buf
, sizeof(nl_buf
));
847 zlog_err("%s: netlink_macfdb_update_ctx failed",
852 nl_buf_len
= (size_t)rv
;
855 case DPLANE_OP_NH_DELETE
:
856 rv
= netlink_nexthop_msg_encode(RTM_DELNEXTHOP
, ctx
, nl_buf
,
857 sizeof(nl_buf
), true);
859 zlog_err("%s: netlink_nexthop_msg_encode failed",
864 nl_buf_len
= (size_t)rv
;
866 case DPLANE_OP_NH_INSTALL
:
867 case DPLANE_OP_NH_UPDATE
:
868 rv
= netlink_nexthop_msg_encode(RTM_NEWNEXTHOP
, ctx
, nl_buf
,
869 sizeof(nl_buf
), true);
871 zlog_err("%s: netlink_nexthop_msg_encode failed",
876 nl_buf_len
= (size_t)rv
;
879 case DPLANE_OP_LSP_INSTALL
:
880 case DPLANE_OP_LSP_UPDATE
:
881 case DPLANE_OP_LSP_DELETE
:
882 rv
= netlink_lsp_msg_encoder(ctx
, nl_buf
, sizeof(nl_buf
));
884 zlog_err("%s: netlink_lsp_msg_encoder failed",
889 nl_buf_len
+= (size_t)rv
;
892 /* Un-handled by FPM at this time. */
893 case DPLANE_OP_PW_INSTALL
:
894 case DPLANE_OP_PW_UNINSTALL
:
895 case DPLANE_OP_ADDR_INSTALL
:
896 case DPLANE_OP_ADDR_UNINSTALL
:
897 case DPLANE_OP_NEIGH_INSTALL
:
898 case DPLANE_OP_NEIGH_UPDATE
:
899 case DPLANE_OP_NEIGH_DELETE
:
900 case DPLANE_OP_VTEP_ADD
:
901 case DPLANE_OP_VTEP_DELETE
:
902 case DPLANE_OP_SYS_ROUTE_ADD
:
903 case DPLANE_OP_SYS_ROUTE_DELETE
:
904 case DPLANE_OP_ROUTE_NOTIFY
:
905 case DPLANE_OP_LSP_NOTIFY
:
906 case DPLANE_OP_RULE_ADD
:
907 case DPLANE_OP_RULE_DELETE
:
908 case DPLANE_OP_RULE_UPDATE
:
909 case DPLANE_OP_NEIGH_DISCOVER
:
910 case DPLANE_OP_BR_PORT_UPDATE
:
911 case DPLANE_OP_IPTABLE_ADD
:
912 case DPLANE_OP_IPTABLE_DELETE
:
913 case DPLANE_OP_IPSET_ADD
:
914 case DPLANE_OP_IPSET_DELETE
:
915 case DPLANE_OP_IPSET_ENTRY_ADD
:
916 case DPLANE_OP_IPSET_ENTRY_DELETE
:
917 case DPLANE_OP_NEIGH_IP_INSTALL
:
918 case DPLANE_OP_NEIGH_IP_DELETE
:
919 case DPLANE_OP_NEIGH_TABLE_UPDATE
:
920 case DPLANE_OP_GRE_SET
:
921 case DPLANE_OP_INTF_ADDR_ADD
:
922 case DPLANE_OP_INTF_ADDR_DEL
:
923 case DPLANE_OP_INTF_NETCONFIG
:
924 case DPLANE_OP_INTF_INSTALL
:
925 case DPLANE_OP_INTF_UPDATE
:
926 case DPLANE_OP_INTF_DELETE
:
927 case DPLANE_OP_TC_QDISC_INSTALL
:
928 case DPLANE_OP_TC_QDISC_UNINSTALL
:
929 case DPLANE_OP_TC_CLASS_ADD
:
930 case DPLANE_OP_TC_CLASS_DELETE
:
931 case DPLANE_OP_TC_CLASS_UPDATE
:
932 case DPLANE_OP_TC_FILTER_ADD
:
933 case DPLANE_OP_TC_FILTER_DELETE
:
934 case DPLANE_OP_TC_FILTER_UPDATE
:
940 /* Skip empty enqueues. */
944 /* We must know if someday a message goes beyond 65KiB. */
945 assert((nl_buf_len
+ FPM_HEADER_SIZE
) <= UINT16_MAX
);
947 /* Check if we have enough buffer space. */
948 if (STREAM_WRITEABLE(fnc
->obuf
) < (nl_buf_len
+ FPM_HEADER_SIZE
)) {
949 atomic_fetch_add_explicit(&fnc
->counters
.buffer_full
, 1,
950 memory_order_relaxed
);
952 if (IS_ZEBRA_DEBUG_FPM
)
954 "%s: buffer full: wants to write %zu but has %zu",
955 __func__
, nl_buf_len
+ FPM_HEADER_SIZE
,
956 STREAM_WRITEABLE(fnc
->obuf
));
962 * Fill in the FPM header information.
964 * See FPM_HEADER_SIZE definition for more information.
966 stream_putc(fnc
->obuf
, 1);
967 stream_putc(fnc
->obuf
, 1);
968 stream_putw(fnc
->obuf
, nl_buf_len
+ FPM_HEADER_SIZE
);
970 /* Write current data. */
971 stream_write(fnc
->obuf
, nl_buf
, (size_t)nl_buf_len
);
973 /* Account number of bytes waiting to be written. */
974 atomic_fetch_add_explicit(&fnc
->counters
.obuf_bytes
,
975 nl_buf_len
+ FPM_HEADER_SIZE
,
976 memory_order_relaxed
);
977 obytes
= atomic_load_explicit(&fnc
->counters
.obuf_bytes
,
978 memory_order_relaxed
);
979 obytes_peak
= atomic_load_explicit(&fnc
->counters
.obuf_peak
,
980 memory_order_relaxed
);
981 if (obytes_peak
< obytes
)
982 atomic_store_explicit(&fnc
->counters
.obuf_peak
, obytes
,
983 memory_order_relaxed
);
985 /* Tell the thread to start writing. */
986 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, fnc
->socket
,
993 * LSP walk/send functions
996 struct zebra_dplane_ctx
*ctx
;
997 struct fpm_nl_ctx
*fnc
;
1001 static int fpm_lsp_send_cb(struct hash_bucket
*bucket
, void *arg
)
1003 struct zebra_lsp
*lsp
= bucket
->data
;
1004 struct fpm_lsp_arg
*fla
= arg
;
1006 /* Skip entries which have already been sent */
1007 if (CHECK_FLAG(lsp
->flags
, LSP_FLAG_FPM
))
1008 return HASHWALK_CONTINUE
;
1010 dplane_ctx_reset(fla
->ctx
);
1011 dplane_ctx_lsp_init(fla
->ctx
, DPLANE_OP_LSP_INSTALL
, lsp
);
1013 if (fpm_nl_enqueue(fla
->fnc
, fla
->ctx
) == -1) {
1014 fla
->complete
= false;
1015 return HASHWALK_ABORT
;
1018 /* Mark entry as sent */
1019 SET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
1020 return HASHWALK_CONTINUE
;
1023 static void fpm_lsp_send(struct thread
*t
)
1025 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1026 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
1027 struct fpm_lsp_arg fla
;
1030 fla
.ctx
= dplane_ctx_alloc();
1031 fla
.complete
= true;
1033 hash_walk(zvrf
->lsp_table
, fpm_lsp_send_cb
, &fla
);
1035 dplane_ctx_fini(&fla
.ctx
);
1038 WALK_FINISH(fnc
, FNE_LSP_FINISHED
);
1040 /* Now move onto routes */
1041 thread_add_timer(zrouter
.master
, fpm_nhg_reset
, fnc
, 0,
1044 /* Didn't finish - reschedule LSP walk */
1045 thread_add_timer(zrouter
.master
, fpm_lsp_send
, fnc
, 0,
1051 * Next hop walk/send functions.
1053 struct fpm_nhg_arg
{
1054 struct zebra_dplane_ctx
*ctx
;
1055 struct fpm_nl_ctx
*fnc
;
1059 static int fpm_nhg_send_cb(struct hash_bucket
*bucket
, void *arg
)
1061 struct nhg_hash_entry
*nhe
= bucket
->data
;
1062 struct fpm_nhg_arg
*fna
= arg
;
1064 /* This entry was already sent, skip it. */
1065 if (CHECK_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
))
1066 return HASHWALK_CONTINUE
;
1068 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
1069 dplane_ctx_reset(fna
->ctx
);
1070 dplane_ctx_nexthop_init(fna
->ctx
, DPLANE_OP_NH_INSTALL
, nhe
);
1071 if (fpm_nl_enqueue(fna
->fnc
, fna
->ctx
) == -1) {
1072 /* Our buffers are full, lets give it some cycles. */
1073 fna
->complete
= false;
1074 return HASHWALK_ABORT
;
1077 /* Mark group as sent, so it doesn't get sent again. */
1078 SET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
1080 return HASHWALK_CONTINUE
;
1083 static void fpm_nhg_send(struct thread
*t
)
1085 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1086 struct fpm_nhg_arg fna
;
1089 fna
.ctx
= dplane_ctx_alloc();
1090 fna
.complete
= true;
1092 /* Send next hops. */
1094 hash_walk(zrouter
.nhgs_id
, fpm_nhg_send_cb
, &fna
);
1096 /* `free()` allocated memory. */
1097 dplane_ctx_fini(&fna
.ctx
);
1099 /* We are done sending next hops, lets install the routes now. */
1101 WALK_FINISH(fnc
, FNE_NHG_FINISHED
);
1102 thread_add_timer(zrouter
.master
, fpm_rib_reset
, fnc
, 0,
1104 } else /* Otherwise reschedule next hop group again. */
1105 thread_add_timer(zrouter
.master
, fpm_nhg_send
, fnc
, 0,
1110 * Send all RIB installed routes to the connected data plane.
1112 static void fpm_rib_send(struct thread
*t
)
1114 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1116 struct route_node
*rn
;
1117 struct route_table
*rt
;
1118 struct zebra_dplane_ctx
*ctx
;
1119 rib_tables_iter_t rt_iter
;
1121 /* Allocate temporary context for all transactions. */
1122 ctx
= dplane_ctx_alloc();
1124 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1125 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1126 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1127 dest
= rib_dest_from_rnode(rn
);
1128 /* Skip bad route entries. */
1129 if (dest
== NULL
|| dest
->selected_fib
== NULL
)
1132 /* Check for already sent routes. */
1133 if (CHECK_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
))
1136 /* Enqueue route install. */
1137 dplane_ctx_reset(ctx
);
1138 dplane_ctx_route_init(ctx
, DPLANE_OP_ROUTE_INSTALL
, rn
,
1139 dest
->selected_fib
);
1140 if (fpm_nl_enqueue(fnc
, ctx
) == -1) {
1141 /* Free the temporary allocated context. */
1142 dplane_ctx_fini(&ctx
);
1144 thread_add_timer(zrouter
.master
, fpm_rib_send
,
1145 fnc
, 1, &fnc
->t_ribwalk
);
1150 SET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1154 /* Free the temporary allocated context. */
1155 dplane_ctx_fini(&ctx
);
1157 /* All RIB routes sent! */
1158 WALK_FINISH(fnc
, FNE_RIB_FINISHED
);
1160 /* Schedule next event: RMAC reset. */
1161 thread_add_event(zrouter
.master
, fpm_rmac_reset
, fnc
, 0,
1166 * The next three functions will handle RMAC enqueue.
1168 struct fpm_rmac_arg
{
1169 struct zebra_dplane_ctx
*ctx
;
1170 struct fpm_nl_ctx
*fnc
;
1171 struct zebra_l3vni
*zl3vni
;
1175 static void fpm_enqueue_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1177 struct fpm_rmac_arg
*fra
= arg
;
1178 struct zebra_mac
*zrmac
= bucket
->data
;
1179 struct zebra_if
*zif
= fra
->zl3vni
->vxlan_if
->info
;
1180 struct zebra_vxlan_vni
*vni
;
1181 struct zebra_if
*br_zif
;
1185 /* Entry already sent. */
1186 if (CHECK_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
) || !fra
->complete
)
1189 sticky
= !!CHECK_FLAG(zrmac
->flags
,
1190 (ZEBRA_MAC_STICKY
| ZEBRA_MAC_REMOTE_DEF_GW
));
1191 br_zif
= (struct zebra_if
*)(zif
->brslave_info
.br_if
->info
);
1192 vni
= zebra_vxlan_if_vni_find(zif
, fra
->zl3vni
->vni
);
1193 vid
= IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif
) ? vni
->access_vlan
: 0;
1195 dplane_ctx_reset(fra
->ctx
);
1196 dplane_ctx_set_op(fra
->ctx
, DPLANE_OP_MAC_INSTALL
);
1197 dplane_mac_init(fra
->ctx
, fra
->zl3vni
->vxlan_if
,
1198 zif
->brslave_info
.br_if
, vid
, &zrmac
->macaddr
, vni
->vni
,
1199 zrmac
->fwd_info
.r_vtep_ip
, sticky
, 0 /*nhg*/,
1200 0 /*update_flags*/);
1201 if (fpm_nl_enqueue(fra
->fnc
, fra
->ctx
) == -1) {
1202 thread_add_timer(zrouter
.master
, fpm_rmac_send
,
1203 fra
->fnc
, 1, &fra
->fnc
->t_rmacwalk
);
1204 fra
->complete
= false;
1208 static void fpm_enqueue_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1210 struct fpm_rmac_arg
*fra
= arg
;
1211 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1213 fra
->zl3vni
= zl3vni
;
1214 hash_iterate(zl3vni
->rmac_table
, fpm_enqueue_rmac_table
, zl3vni
);
1217 static void fpm_rmac_send(struct thread
*t
)
1219 struct fpm_rmac_arg fra
;
1221 fra
.fnc
= THREAD_ARG(t
);
1222 fra
.ctx
= dplane_ctx_alloc();
1223 fra
.complete
= true;
1224 hash_iterate(zrouter
.l3vni_table
, fpm_enqueue_l3vni_table
, &fra
);
1225 dplane_ctx_fini(&fra
.ctx
);
1227 /* RMAC walk completed. */
1229 WALK_FINISH(fra
.fnc
, FNE_RMAC_FINISHED
);
1233 * Resets the next hop FPM flags so we send all next hops again.
1235 static void fpm_nhg_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1237 struct nhg_hash_entry
*nhe
= bucket
->data
;
1239 /* Unset FPM installation flag so it gets installed again. */
1240 UNSET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
1243 static void fpm_nhg_reset(struct thread
*t
)
1245 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1247 hash_iterate(zrouter
.nhgs_id
, fpm_nhg_reset_cb
, NULL
);
1249 /* Schedule next step: send next hop groups. */
1250 thread_add_event(zrouter
.master
, fpm_nhg_send
, fnc
, 0, &fnc
->t_nhgwalk
);
1254 * Resets the LSP FPM flag so we send all LSPs again.
1256 static void fpm_lsp_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1258 struct zebra_lsp
*lsp
= bucket
->data
;
1260 UNSET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
1263 static void fpm_lsp_reset(struct thread
*t
)
1265 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1266 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
1268 hash_iterate(zvrf
->lsp_table
, fpm_lsp_reset_cb
, NULL
);
1270 /* Schedule next step: send LSPs */
1271 thread_add_event(zrouter
.master
, fpm_lsp_send
, fnc
, 0, &fnc
->t_lspwalk
);
1275 * Resets the RIB FPM flags so we send all routes again.
1277 static void fpm_rib_reset(struct thread
*t
)
1279 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1281 struct route_node
*rn
;
1282 struct route_table
*rt
;
1283 rib_tables_iter_t rt_iter
;
1285 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1286 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1287 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1288 dest
= rib_dest_from_rnode(rn
);
1289 /* Skip bad route entries. */
1293 UNSET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1297 /* Schedule next step: send RIB routes. */
1298 thread_add_event(zrouter
.master
, fpm_rib_send
, fnc
, 0, &fnc
->t_ribwalk
);
1302 * The next three function will handle RMAC table reset.
1304 static void fpm_unset_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1306 struct zebra_mac
*zrmac
= bucket
->data
;
1308 UNSET_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
);
1311 static void fpm_unset_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1313 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1315 hash_iterate(zl3vni
->rmac_table
, fpm_unset_rmac_table
, zl3vni
);
1318 static void fpm_rmac_reset(struct thread
*t
)
1320 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1322 hash_iterate(zrouter
.l3vni_table
, fpm_unset_l3vni_table
, NULL
);
1324 /* Schedule next event: send RMAC entries. */
1325 thread_add_event(zrouter
.master
, fpm_rmac_send
, fnc
, 0,
1329 static void fpm_process_queue(struct thread
*t
)
1331 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1332 struct zebra_dplane_ctx
*ctx
;
1333 bool no_bufs
= false;
1334 uint64_t processed_contexts
= 0;
1337 /* No space available yet. */
1338 if (STREAM_WRITEABLE(fnc
->obuf
) < NL_PKT_BUF_SIZE
) {
1343 /* Dequeue next item or quit processing. */
1344 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1345 ctx
= dplane_ctx_dequeue(&fnc
->ctxqueue
);
1351 * Intentionally ignoring the return value
1352 * as that we are ensuring that we can write to
1353 * the output data in the STREAM_WRITEABLE
1354 * check above, so we can ignore the return
1356 if (fnc
->socket
!= -1)
1357 (void)fpm_nl_enqueue(fnc
, ctx
);
1359 /* Account the processed entries. */
1360 processed_contexts
++;
1361 atomic_fetch_sub_explicit(&fnc
->counters
.ctxqueue_len
, 1,
1362 memory_order_relaxed
);
1364 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1365 dplane_provider_enqueue_out_ctx(fnc
->prov
, ctx
);
1368 /* Update count of processed contexts */
1369 atomic_fetch_add_explicit(&fnc
->counters
.dplane_contexts
,
1370 processed_contexts
, memory_order_relaxed
);
1372 /* Re-schedule if we ran out of buffer space */
1374 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1375 fnc
, 0, &fnc
->t_dequeue
);
1378 * Let the dataplane thread know if there are items in the
1379 * output queue to be processed. Otherwise they may sit
1380 * until the dataplane thread gets scheduled for new,
1383 if (dplane_provider_out_ctx_queue_len(fnc
->prov
) > 0)
1384 dplane_provider_work_ready();
1388 * Handles external (e.g. CLI, data plane or others) events.
1390 static void fpm_process_event(struct thread
*t
)
1392 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1393 enum fpm_nl_events event
= THREAD_VAL(t
);
1397 zlog_info("%s: manual FPM disable event", __func__
);
1398 fnc
->disabled
= true;
1399 atomic_fetch_add_explicit(&fnc
->counters
.user_disables
, 1,
1400 memory_order_relaxed
);
1402 /* Call reconnect to disable timers and clean up context. */
1407 zlog_info("%s: manual FPM reconnect event", __func__
);
1408 fnc
->disabled
= false;
1409 atomic_fetch_add_explicit(&fnc
->counters
.user_configures
, 1,
1410 memory_order_relaxed
);
1414 case FNE_RESET_COUNTERS
:
1415 zlog_info("%s: manual FPM counters reset event", __func__
);
1416 memset(&fnc
->counters
, 0, sizeof(fnc
->counters
));
1419 case FNE_TOGGLE_NHG
:
1420 zlog_info("%s: toggle next hop groups support", __func__
);
1421 fnc
->use_nhg
= !fnc
->use_nhg
;
1425 case FNE_INTERNAL_RECONNECT
:
1429 case FNE_NHG_FINISHED
:
1430 if (IS_ZEBRA_DEBUG_FPM
)
1431 zlog_debug("%s: next hop groups walk finished",
1434 case FNE_RIB_FINISHED
:
1435 if (IS_ZEBRA_DEBUG_FPM
)
1436 zlog_debug("%s: RIB walk finished", __func__
);
1438 case FNE_RMAC_FINISHED
:
1439 if (IS_ZEBRA_DEBUG_FPM
)
1440 zlog_debug("%s: RMAC walk finished", __func__
);
1442 case FNE_LSP_FINISHED
:
1443 if (IS_ZEBRA_DEBUG_FPM
)
1444 zlog_debug("%s: LSP walk finished", __func__
);
1450 * Data plane functions.
1452 static int fpm_nl_start(struct zebra_dplane_provider
*prov
)
1454 struct fpm_nl_ctx
*fnc
;
1456 fnc
= dplane_provider_get_data(prov
);
1457 fnc
->fthread
= frr_pthread_new(NULL
, prov_name
, prov_name
);
1458 assert(frr_pthread_run(fnc
->fthread
, NULL
) == 0);
1459 fnc
->ibuf
= stream_new(NL_PKT_BUF_SIZE
);
1460 fnc
->obuf
= stream_new(NL_PKT_BUF_SIZE
* 128);
1461 pthread_mutex_init(&fnc
->obuf_mutex
, NULL
);
1463 fnc
->disabled
= true;
1465 dplane_ctx_q_init(&fnc
->ctxqueue
);
1466 pthread_mutex_init(&fnc
->ctxqueue_mutex
, NULL
);
1468 /* Set default values. */
1469 fnc
->use_nhg
= true;
1474 static int fpm_nl_finish_early(struct fpm_nl_ctx
*fnc
)
1476 /* Disable all events and close socket. */
1477 THREAD_OFF(fnc
->t_lspreset
);
1478 THREAD_OFF(fnc
->t_lspwalk
);
1479 THREAD_OFF(fnc
->t_nhgreset
);
1480 THREAD_OFF(fnc
->t_nhgwalk
);
1481 THREAD_OFF(fnc
->t_ribreset
);
1482 THREAD_OFF(fnc
->t_ribwalk
);
1483 THREAD_OFF(fnc
->t_rmacreset
);
1484 THREAD_OFF(fnc
->t_rmacwalk
);
1485 THREAD_OFF(fnc
->t_event
);
1486 THREAD_OFF(fnc
->t_nhg
);
1487 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_read
, NULL
);
1488 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_write
, NULL
);
1489 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_connect
, NULL
);
1491 if (fnc
->socket
!= -1) {
1499 static int fpm_nl_finish_late(struct fpm_nl_ctx
*fnc
)
1501 /* Stop the running thread. */
1502 frr_pthread_stop(fnc
->fthread
, NULL
);
1504 /* Free all allocated resources. */
1505 pthread_mutex_destroy(&fnc
->obuf_mutex
);
1506 pthread_mutex_destroy(&fnc
->ctxqueue_mutex
);
1507 stream_free(fnc
->ibuf
);
1508 stream_free(fnc
->obuf
);
/*
 * Provider finish callback: dispatch to the early (event/socket teardown)
 * or late (thread join and memory release) shutdown phase.
 *
 * NOTE(review): the `if (early)` guard was missing from the garbled text
 * and has been reconstructed from the two visible return statements.
 */
static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	if (early)
		return fpm_nl_finish_early(fnc);

	return fpm_nl_finish_late(fnc);
}
1526 static int fpm_nl_process(struct zebra_dplane_provider
*prov
)
1528 struct zebra_dplane_ctx
*ctx
;
1529 struct fpm_nl_ctx
*fnc
;
1531 uint64_t cur_queue
, peak_queue
= 0, stored_peak_queue
;
1533 fnc
= dplane_provider_get_data(prov
);
1534 limit
= dplane_provider_get_work_limit(prov
);
1535 for (counter
= 0; counter
< limit
; counter
++) {
1536 ctx
= dplane_provider_dequeue_in_ctx(prov
);
1541 * Skip all notifications if not connected, we'll walk the RIB
1544 if (fnc
->socket
!= -1 && fnc
->connecting
== false) {
1546 * Update the number of queued contexts *before*
1547 * enqueueing, to ensure counter consistency.
1549 atomic_fetch_add_explicit(&fnc
->counters
.ctxqueue_len
,
1550 1, memory_order_relaxed
);
1552 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1553 dplane_ctx_enqueue_tail(&fnc
->ctxqueue
, ctx
);
1556 cur_queue
= atomic_load_explicit(
1557 &fnc
->counters
.ctxqueue_len
,
1558 memory_order_relaxed
);
1559 if (peak_queue
< cur_queue
)
1560 peak_queue
= cur_queue
;
1564 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1565 dplane_provider_enqueue_out_ctx(prov
, ctx
);
1568 /* Update peak queue length, if we just observed a new peak */
1569 stored_peak_queue
= atomic_load_explicit(
1570 &fnc
->counters
.ctxqueue_len_peak
, memory_order_relaxed
);
1571 if (stored_peak_queue
< peak_queue
)
1572 atomic_store_explicit(&fnc
->counters
.ctxqueue_len_peak
,
1573 peak_queue
, memory_order_relaxed
);
1575 if (atomic_load_explicit(&fnc
->counters
.ctxqueue_len
,
1576 memory_order_relaxed
)
1578 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1579 fnc
, 0, &fnc
->t_dequeue
);
1581 /* Ensure dataplane thread is rescheduled if we hit the work limit */
1582 if (counter
>= limit
)
1583 dplane_provider_work_ready();
1588 static int fpm_nl_new(struct thread_master
*tm
)
1590 struct zebra_dplane_provider
*prov
= NULL
;
1593 gfnc
= calloc(1, sizeof(*gfnc
));
1594 rv
= dplane_provider_register(prov_name
, DPLANE_PRIO_POSTPROCESS
,
1595 DPLANE_PROV_FLAG_THREADED
, fpm_nl_start
,
1596 fpm_nl_process
, fpm_nl_finish
, gfnc
,
1599 if (IS_ZEBRA_DEBUG_DPLANE
)
1600 zlog_debug("%s register status: %d", prov_name
, rv
);
1602 install_node(&fpm_node
);
1603 install_element(ENABLE_NODE
, &fpm_show_counters_cmd
);
1604 install_element(ENABLE_NODE
, &fpm_show_counters_json_cmd
);
1605 install_element(ENABLE_NODE
, &fpm_reset_counters_cmd
);
1606 install_element(CONFIG_NODE
, &fpm_set_address_cmd
);
1607 install_element(CONFIG_NODE
, &no_fpm_set_address_cmd
);
1608 install_element(CONFIG_NODE
, &fpm_use_nhg_cmd
);
1609 install_element(CONFIG_NODE
, &no_fpm_use_nhg_cmd
);
1614 static int fpm_nl_init(void)
1616 hook_register(frr_late_init
, fpm_nl_new
);
1621 .name
= "dplane_fpm_nl",
1623 .description
= "Data plane plugin for FPM using netlink.",
1624 .init
= fpm_nl_init
,