2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "config.h" /* Include this explicitly */
26 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
34 #include "lib/zebra.h"
36 #include "lib/libfrr.h"
37 #include "lib/frratomic.h"
38 #include "lib/command.h"
39 #include "lib/memory.h"
40 #include "lib/network.h"
42 #include "lib/frr_pthread.h"
43 #include "zebra/debug.h"
44 #include "zebra/interface.h"
45 #include "zebra/zebra_dplane.h"
46 #include "zebra/zebra_mpls.h"
47 #include "zebra/zebra_router.h"
48 #include "zebra/zebra_evpn.h"
49 #include "zebra/zebra_evpn_mac.h"
50 #include "zebra/zebra_vxlan_private.h"
51 #include "zebra/kernel_netlink.h"
52 #include "zebra/rt_netlink.h"
53 #include "zebra/debug.h"
56 #define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
57 #define SOUTHBOUND_DEFAULT_PORT 2620
62 * version: 1 byte (always 1),
63 * type: 1 byte (1 for netlink, 2 protobuf),
64 * len: 2 bytes (network order),
67 * This header is used with any format to tell the users how many bytes to
70 #define FPM_HEADER_SIZE 4
72 static const char *prov_name
= "dplane_fpm_nl";
75 /* data plane connection. */
80 struct sockaddr_storage addr
;
82 /* data plane buffers. */
85 pthread_mutex_t obuf_mutex
;
88 * data plane context queue:
89 * When a FPM server connection becomes a bottleneck, we must keep the
90 * data plane contexts until we get a chance to process them.
92 struct dplane_ctx_list_head ctxqueue
;
93 pthread_mutex_t ctxqueue_mutex
;
95 /* data plane events. */
96 struct zebra_dplane_provider
*prov
;
97 struct frr_pthread
*fthread
;
98 struct thread
*t_connect
;
99 struct thread
*t_read
;
100 struct thread
*t_write
;
101 struct thread
*t_event
;
102 struct thread
*t_nhg
;
103 struct thread
*t_dequeue
;
106 struct thread
*t_lspreset
;
107 struct thread
*t_lspwalk
;
108 struct thread
*t_nhgreset
;
109 struct thread
*t_nhgwalk
;
110 struct thread
*t_ribreset
;
111 struct thread
*t_ribwalk
;
112 struct thread
*t_rmacreset
;
113 struct thread
*t_rmacwalk
;
115 /* Statistic counters. */
117 /* Amount of bytes read into ibuf. */
118 _Atomic
uint32_t bytes_read
;
119 /* Amount of bytes written from obuf. */
120 _Atomic
uint32_t bytes_sent
;
121 /* Output buffer current usage. */
122 _Atomic
uint32_t obuf_bytes
;
123 /* Output buffer peak usage. */
124 _Atomic
uint32_t obuf_peak
;
126 /* Amount of connection closes. */
127 _Atomic
uint32_t connection_closes
;
128 /* Amount of connection errors. */
129 _Atomic
uint32_t connection_errors
;
131 /* Amount of user configurations: FNE_RECONNECT. */
132 _Atomic
uint32_t user_configures
;
133 /* Amount of user disable requests: FNE_DISABLE. */
134 _Atomic
uint32_t user_disables
;
136 /* Amount of data plane context processed. */
137 _Atomic
uint32_t dplane_contexts
;
138 /* Amount of data plane contexts enqueued. */
139 _Atomic
uint32_t ctxqueue_len
;
140 /* Peak amount of data plane contexts enqueued. */
141 _Atomic
uint32_t ctxqueue_len_peak
;
143 /* Amount of buffer full events. */
144 _Atomic
uint32_t buffer_full
;
149 /* Ask for FPM to reconnect the external server. */
153 /* Reset counters. */
155 /* Toggle next hop group feature. */
157 /* Reconnect request by our own code to avoid races. */
158 FNE_INTERNAL_RECONNECT
,
160 /* LSP walk finished. */
162 /* Next hop groups walk finished. */
164 /* RIB walk finished. */
166 /* RMAC walk finished. */
170 #define FPM_RECONNECT(fnc) \
171 thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
172 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)
174 #define WALK_FINISH(fnc, ev) \
175 thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
181 static void fpm_process_event(struct thread
*t
);
182 static int fpm_nl_enqueue(struct fpm_nl_ctx
*fnc
, struct zebra_dplane_ctx
*ctx
);
183 static void fpm_lsp_send(struct thread
*t
);
184 static void fpm_lsp_reset(struct thread
*t
);
185 static void fpm_nhg_send(struct thread
*t
);
186 static void fpm_nhg_reset(struct thread
*t
);
187 static void fpm_rib_send(struct thread
*t
);
188 static void fpm_rib_reset(struct thread
*t
);
189 static void fpm_rmac_send(struct thread
*t
);
190 static void fpm_rmac_reset(struct thread
*t
);
195 #define FPM_STR "Forwarding Plane Manager configuration\n"
197 DEFUN(fpm_set_address
, fpm_set_address_cmd
,
198 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
200 "FPM remote listening server address\n"
201 "Remote IPv4 FPM server\n"
202 "Remote IPv6 FPM server\n"
203 "FPM remote listening server port\n"
204 "Remote FPM server port\n")
206 struct sockaddr_in
*sin
;
207 struct sockaddr_in6
*sin6
;
209 uint8_t naddr
[INET6_BUFSIZ
];
212 port
= strtol(argv
[4]->arg
, NULL
, 10);
214 /* Handle IPv4 addresses. */
215 if (inet_pton(AF_INET
, argv
[2]->arg
, naddr
) == 1) {
216 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
218 memset(sin
, 0, sizeof(*sin
));
219 sin
->sin_family
= AF_INET
;
221 port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
222 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
223 sin
->sin_len
= sizeof(*sin
);
224 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
225 memcpy(&sin
->sin_addr
, naddr
, sizeof(sin
->sin_addr
));
230 /* Handle IPv6 addresses. */
231 if (inet_pton(AF_INET6
, argv
[2]->arg
, naddr
) != 1) {
232 vty_out(vty
, "%% Invalid address: %s\n", argv
[2]->arg
);
236 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
237 memset(sin6
, 0, sizeof(*sin6
));
238 sin6
->sin6_family
= AF_INET6
;
239 sin6
->sin6_port
= port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
240 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
241 sin6
->sin6_len
= sizeof(*sin6
);
242 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
243 memcpy(&sin6
->sin6_addr
, naddr
, sizeof(sin6
->sin6_addr
));
246 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
247 FNE_RECONNECT
, &gfnc
->t_event
);
251 DEFUN(no_fpm_set_address
, no_fpm_set_address_cmd
,
252 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
255 "FPM remote listening server address\n"
256 "Remote IPv4 FPM server\n"
257 "Remote IPv6 FPM server\n"
258 "FPM remote listening server port\n"
259 "Remote FPM server port\n")
261 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
262 FNE_DISABLE
, &gfnc
->t_event
);
266 DEFUN(fpm_use_nhg
, fpm_use_nhg_cmd
,
267 "fpm use-next-hop-groups",
269 "Use netlink next hop groups feature.\n")
271 /* Already enabled. */
275 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
276 FNE_TOGGLE_NHG
, &gfnc
->t_nhg
);
281 DEFUN(no_fpm_use_nhg
, no_fpm_use_nhg_cmd
,
282 "no fpm use-next-hop-groups",
285 "Use netlink next hop groups feature.\n")
287 /* Already disabled. */
291 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
292 FNE_TOGGLE_NHG
, &gfnc
->t_nhg
);
297 DEFUN(fpm_reset_counters
, fpm_reset_counters_cmd
,
298 "clear fpm counters",
301 "FPM statistic counters\n")
303 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
304 FNE_RESET_COUNTERS
, &gfnc
->t_event
);
308 DEFUN(fpm_show_counters
, fpm_show_counters_cmd
,
312 "FPM statistic counters\n")
314 vty_out(vty
, "%30s\n%30s\n", "FPM counters", "============");
316 #define SHOW_COUNTER(label, counter) \
317 vty_out(vty, "%28s: %u\n", (label), (counter))
319 SHOW_COUNTER("Input bytes", gfnc
->counters
.bytes_read
);
320 SHOW_COUNTER("Output bytes", gfnc
->counters
.bytes_sent
);
321 SHOW_COUNTER("Output buffer current size", gfnc
->counters
.obuf_bytes
);
322 SHOW_COUNTER("Output buffer peak size", gfnc
->counters
.obuf_peak
);
323 SHOW_COUNTER("Connection closes", gfnc
->counters
.connection_closes
);
324 SHOW_COUNTER("Connection errors", gfnc
->counters
.connection_errors
);
325 SHOW_COUNTER("Data plane items processed",
326 gfnc
->counters
.dplane_contexts
);
327 SHOW_COUNTER("Data plane items enqueued",
328 gfnc
->counters
.ctxqueue_len
);
329 SHOW_COUNTER("Data plane items queue peak",
330 gfnc
->counters
.ctxqueue_len_peak
);
331 SHOW_COUNTER("Buffer full hits", gfnc
->counters
.buffer_full
);
332 SHOW_COUNTER("User FPM configurations", gfnc
->counters
.user_configures
);
333 SHOW_COUNTER("User FPM disable requests", gfnc
->counters
.user_disables
);
340 DEFUN(fpm_show_counters_json
, fpm_show_counters_json_cmd
,
341 "show fpm counters json",
344 "FPM statistic counters\n"
347 struct json_object
*jo
;
349 jo
= json_object_new_object();
350 json_object_int_add(jo
, "bytes-read", gfnc
->counters
.bytes_read
);
351 json_object_int_add(jo
, "bytes-sent", gfnc
->counters
.bytes_sent
);
352 json_object_int_add(jo
, "obuf-bytes", gfnc
->counters
.obuf_bytes
);
353 json_object_int_add(jo
, "obuf-bytes-peak", gfnc
->counters
.obuf_peak
);
354 json_object_int_add(jo
, "connection-closes",
355 gfnc
->counters
.connection_closes
);
356 json_object_int_add(jo
, "connection-errors",
357 gfnc
->counters
.connection_errors
);
358 json_object_int_add(jo
, "data-plane-contexts",
359 gfnc
->counters
.dplane_contexts
);
360 json_object_int_add(jo
, "data-plane-contexts-queue",
361 gfnc
->counters
.ctxqueue_len
);
362 json_object_int_add(jo
, "data-plane-contexts-queue-peak",
363 gfnc
->counters
.ctxqueue_len_peak
);
364 json_object_int_add(jo
, "buffer-full-hits", gfnc
->counters
.buffer_full
);
365 json_object_int_add(jo
, "user-configures",
366 gfnc
->counters
.user_configures
);
367 json_object_int_add(jo
, "user-disables", gfnc
->counters
.user_disables
);
373 static int fpm_write_config(struct vty
*vty
)
375 struct sockaddr_in
*sin
;
376 struct sockaddr_in6
*sin6
;
382 switch (gfnc
->addr
.ss_family
) {
385 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
386 vty_out(vty
, "fpm address %pI4", &sin
->sin_addr
);
387 if (sin
->sin_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
388 vty_out(vty
, " port %d", ntohs(sin
->sin_port
));
394 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
395 vty_out(vty
, "fpm address %pI6", &sin6
->sin6_addr
);
396 if (sin6
->sin6_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
397 vty_out(vty
, " port %d", ntohs(sin6
->sin6_port
));
406 if (!gfnc
->use_nhg
) {
407 vty_out(vty
, "no fpm use-next-hop-groups\n");
414 static struct cmd_node fpm_node
= {
418 .config_write
= fpm_write_config
,
424 static void fpm_connect(struct thread
*t
);
426 static void fpm_reconnect(struct fpm_nl_ctx
*fnc
)
428 /* Cancel all zebra threads first. */
429 thread_cancel_async(zrouter
.master
, &fnc
->t_lspreset
, NULL
);
430 thread_cancel_async(zrouter
.master
, &fnc
->t_lspwalk
, NULL
);
431 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgreset
, NULL
);
432 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgwalk
, NULL
);
433 thread_cancel_async(zrouter
.master
, &fnc
->t_ribreset
, NULL
);
434 thread_cancel_async(zrouter
.master
, &fnc
->t_ribwalk
, NULL
);
435 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacreset
, NULL
);
436 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacwalk
, NULL
);
439 * Grab the lock to empty the streams (data plane might try to
440 * enqueue updates while we are closing).
442 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
444 /* Avoid calling close on `-1`. */
445 if (fnc
->socket
!= -1) {
450 stream_reset(fnc
->ibuf
);
451 stream_reset(fnc
->obuf
);
452 THREAD_OFF(fnc
->t_read
);
453 THREAD_OFF(fnc
->t_write
);
455 /* FPM is disabled, don't attempt to connect. */
459 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
463 static void fpm_read(struct thread
*t
)
465 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
469 struct nlmsghdr
*hdr
;
470 struct zebra_dplane_ctx
*ctx
;
471 size_t available_bytes
;
472 size_t hdr_available_bytes
;
474 /* Let's ignore the input at the moment. */
475 rv
= stream_read_try(fnc
->ibuf
, fnc
->socket
,
476 STREAM_WRITEABLE(fnc
->ibuf
));
478 atomic_fetch_add_explicit(&fnc
->counters
.connection_closes
, 1,
479 memory_order_relaxed
);
481 if (IS_ZEBRA_DEBUG_FPM
)
482 zlog_debug("%s: connection closed", __func__
);
488 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
489 memory_order_relaxed
);
490 zlog_warn("%s: connection failure: %s", __func__
,
496 /* Schedule the next read */
497 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, fnc
->socket
,
500 /* We've got an interruption. */
505 /* Account all bytes read. */
506 atomic_fetch_add_explicit(&fnc
->counters
.bytes_read
, rv
,
507 memory_order_relaxed
);
509 available_bytes
= STREAM_READABLE(fnc
->ibuf
);
510 while (available_bytes
) {
511 if (available_bytes
< (ssize_t
)FPM_MSG_HDR_LEN
) {
512 stream_pulldown(fnc
->ibuf
);
516 fpm
.version
= stream_getc(fnc
->ibuf
);
517 fpm
.msg_type
= stream_getc(fnc
->ibuf
);
518 fpm
.msg_len
= stream_getw(fnc
->ibuf
);
520 if (fpm
.version
!= FPM_PROTO_VERSION
&&
521 fpm
.msg_type
!= FPM_MSG_TYPE_NETLINK
) {
522 stream_reset(fnc
->ibuf
);
524 "%s: Received version/msg_type %u/%u, expected 1/1",
525 __func__
, fpm
.version
, fpm
.msg_type
);
532 * If the passed in length doesn't even fill in the header
533 * something is wrong and reset.
535 if (fpm
.msg_len
< FPM_MSG_HDR_LEN
) {
537 "%s: Received message length: %u that does not even fill the FPM header",
538 __func__
, fpm
.msg_len
);
544 * If we have not received the whole payload, reset the stream
545 * back to the beginning of the header and move it to the
548 if (fpm
.msg_len
> available_bytes
) {
549 stream_rewind_getp(fnc
->ibuf
, FPM_MSG_HDR_LEN
);
550 stream_pulldown(fnc
->ibuf
);
554 available_bytes
-= FPM_MSG_HDR_LEN
;
557 * Place the data from the stream into a buffer
559 hdr
= (struct nlmsghdr
*)buf
;
560 stream_get(buf
, fnc
->ibuf
, fpm
.msg_len
- FPM_MSG_HDR_LEN
);
561 hdr_available_bytes
= fpm
.msg_len
- FPM_MSG_HDR_LEN
;
562 available_bytes
-= hdr_available_bytes
;
564 /* Sanity check: must be at least header size. */
565 if (hdr
->nlmsg_len
< sizeof(*hdr
)) {
567 "%s: [seq=%u] invalid message length %u (< %zu)",
568 __func__
, hdr
->nlmsg_seq
, hdr
->nlmsg_len
,
572 if (hdr
->nlmsg_len
> fpm
.msg_len
) {
574 "%s: Received a inner header length of %u that is greater than the fpm total length of %u",
575 __func__
, hdr
->nlmsg_len
, fpm
.msg_len
);
578 /* Not enough bytes available. */
579 if (hdr
->nlmsg_len
> hdr_available_bytes
) {
581 "%s: [seq=%u] invalid message length %u (> %zu)",
582 __func__
, hdr
->nlmsg_seq
, hdr
->nlmsg_len
,
587 if (!(hdr
->nlmsg_flags
& NLM_F_REQUEST
)) {
588 if (IS_ZEBRA_DEBUG_FPM
)
590 "%s: [seq=%u] not a request, skipping",
591 __func__
, hdr
->nlmsg_seq
);
594 * This request is a bust, go to the next one
599 switch (hdr
->nlmsg_type
) {
601 ctx
= dplane_ctx_alloc();
602 dplane_ctx_set_op(ctx
, DPLANE_OP_ROUTE_NOTIFY
);
603 if (netlink_route_change_read_unicast_internal(
604 hdr
, 0, false, ctx
) != 1) {
605 dplane_ctx_fini(&ctx
);
606 stream_pulldown(fnc
->ibuf
);
608 * Let's continue to read other messages
609 * Even if we ignore this one.
614 if (IS_ZEBRA_DEBUG_FPM
)
616 "%s: Received message type %u which is not currently handled",
617 __func__
, hdr
->nlmsg_type
);
622 stream_reset(fnc
->ibuf
);
625 static void fpm_write(struct thread
*t
)
627 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
633 if (fnc
->connecting
== true) {
635 statuslen
= sizeof(status
);
637 rv
= getsockopt(fnc
->socket
, SOL_SOCKET
, SO_ERROR
, &status
,
639 if (rv
== -1 || status
!= 0) {
641 zlog_warn("%s: connection failed: %s", __func__
,
644 zlog_warn("%s: SO_ERROR failed: %s", __func__
,
647 atomic_fetch_add_explicit(
648 &fnc
->counters
.connection_errors
, 1,
649 memory_order_relaxed
);
655 fnc
->connecting
= false;
658 * Starting with LSPs walk all FPM objects, marking them
659 * as unsent and then replaying them.
661 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
664 /* Permit receiving messages now. */
665 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
,
666 fnc
->socket
, &fnc
->t_read
);
669 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
672 /* Stream is empty: reset pointers and return. */
673 if (STREAM_READABLE(fnc
->obuf
) == 0) {
674 stream_reset(fnc
->obuf
);
678 /* Try to write all at once. */
679 btotal
= stream_get_endp(fnc
->obuf
) -
680 stream_get_getp(fnc
->obuf
);
681 bwritten
= write(fnc
->socket
, stream_pnt(fnc
->obuf
), btotal
);
683 atomic_fetch_add_explicit(
684 &fnc
->counters
.connection_closes
, 1,
685 memory_order_relaxed
);
687 if (IS_ZEBRA_DEBUG_FPM
)
688 zlog_debug("%s: connection closed", __func__
);
691 if (bwritten
== -1) {
692 /* Attempt to continue if blocked by a signal. */
695 /* Receiver is probably slow, lets give it some time. */
696 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
699 atomic_fetch_add_explicit(
700 &fnc
->counters
.connection_errors
, 1,
701 memory_order_relaxed
);
702 zlog_warn("%s: connection failure: %s", __func__
,
709 /* Account all bytes sent. */
710 atomic_fetch_add_explicit(&fnc
->counters
.bytes_sent
, bwritten
,
711 memory_order_relaxed
);
713 /* Account number of bytes free. */
714 atomic_fetch_sub_explicit(&fnc
->counters
.obuf_bytes
, bwritten
,
715 memory_order_relaxed
);
717 stream_forward_getp(fnc
->obuf
, (size_t)bwritten
);
720 /* Stream is not empty yet, we must schedule more writes. */
721 if (STREAM_READABLE(fnc
->obuf
)) {
722 stream_pulldown(fnc
->obuf
);
723 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
,
724 fnc
->socket
, &fnc
->t_write
);
729 static void fpm_connect(struct thread
*t
)
731 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
732 struct sockaddr_in
*sin
= (struct sockaddr_in
*)&fnc
->addr
;
733 struct sockaddr_in6
*sin6
= (struct sockaddr_in6
*)&fnc
->addr
;
736 char addrstr
[INET6_ADDRSTRLEN
];
738 sock
= socket(fnc
->addr
.ss_family
, SOCK_STREAM
, 0);
740 zlog_err("%s: fpm socket failed: %s", __func__
,
742 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
747 set_nonblocking(sock
);
749 if (fnc
->addr
.ss_family
== AF_INET
) {
750 inet_ntop(AF_INET
, &sin
->sin_addr
, addrstr
, sizeof(addrstr
));
753 inet_ntop(AF_INET6
, &sin6
->sin6_addr
, addrstr
, sizeof(addrstr
));
754 slen
= sizeof(*sin6
);
757 if (IS_ZEBRA_DEBUG_FPM
)
758 zlog_debug("%s: attempting to connect to %s:%d", __func__
,
759 addrstr
, ntohs(sin
->sin_port
));
761 rv
= connect(sock
, (struct sockaddr
*)&fnc
->addr
, slen
);
762 if (rv
== -1 && errno
!= EINPROGRESS
) {
763 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
764 memory_order_relaxed
);
766 zlog_warn("%s: fpm connection failed: %s", __func__
,
768 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
773 fnc
->connecting
= (errno
== EINPROGRESS
);
775 if (!fnc
->connecting
)
776 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, sock
,
778 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, sock
,
782 * Starting with LSPs walk all FPM objects, marking them
783 * as unsent and then replaying them.
785 * If we are not connected, then delay the objects reset/send.
787 if (!fnc
->connecting
)
788 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
793 * Encode data plane operation context into netlink and enqueue it in the FPM
796 * @param fnc the netlink FPM context.
797 * @param ctx the data plane operation context data.
798 * @return 0 on success or -1 on not enough space.
800 static int fpm_nl_enqueue(struct fpm_nl_ctx
*fnc
, struct zebra_dplane_ctx
*ctx
)
802 uint8_t nl_buf
[NL_PKT_BUF_SIZE
];
805 uint64_t obytes
, obytes_peak
;
806 enum dplane_op_e op
= dplane_ctx_get_op(ctx
);
809 * If we were configured to not use next hop groups, then quit as soon
813 && (op
== DPLANE_OP_NH_DELETE
|| op
== DPLANE_OP_NH_INSTALL
814 || op
== DPLANE_OP_NH_UPDATE
))
819 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
822 case DPLANE_OP_ROUTE_UPDATE
:
823 case DPLANE_OP_ROUTE_DELETE
:
824 rv
= netlink_route_multipath_msg_encode(RTM_DELROUTE
, ctx
,
825 nl_buf
, sizeof(nl_buf
),
829 "%s: netlink_route_multipath_msg_encode failed",
834 nl_buf_len
= (size_t)rv
;
836 /* UPDATE operations need a INSTALL, otherwise just quit. */
837 if (op
== DPLANE_OP_ROUTE_DELETE
)
841 case DPLANE_OP_ROUTE_INSTALL
:
842 rv
= netlink_route_multipath_msg_encode(
843 RTM_NEWROUTE
, ctx
, &nl_buf
[nl_buf_len
],
844 sizeof(nl_buf
) - nl_buf_len
, true, fnc
->use_nhg
);
847 "%s: netlink_route_multipath_msg_encode failed",
852 nl_buf_len
+= (size_t)rv
;
855 case DPLANE_OP_MAC_INSTALL
:
856 case DPLANE_OP_MAC_DELETE
:
857 rv
= netlink_macfdb_update_ctx(ctx
, nl_buf
, sizeof(nl_buf
));
859 zlog_err("%s: netlink_macfdb_update_ctx failed",
864 nl_buf_len
= (size_t)rv
;
867 case DPLANE_OP_NH_DELETE
:
868 rv
= netlink_nexthop_msg_encode(RTM_DELNEXTHOP
, ctx
, nl_buf
,
869 sizeof(nl_buf
), true);
871 zlog_err("%s: netlink_nexthop_msg_encode failed",
876 nl_buf_len
= (size_t)rv
;
878 case DPLANE_OP_NH_INSTALL
:
879 case DPLANE_OP_NH_UPDATE
:
880 rv
= netlink_nexthop_msg_encode(RTM_NEWNEXTHOP
, ctx
, nl_buf
,
881 sizeof(nl_buf
), true);
883 zlog_err("%s: netlink_nexthop_msg_encode failed",
888 nl_buf_len
= (size_t)rv
;
891 case DPLANE_OP_LSP_INSTALL
:
892 case DPLANE_OP_LSP_UPDATE
:
893 case DPLANE_OP_LSP_DELETE
:
894 rv
= netlink_lsp_msg_encoder(ctx
, nl_buf
, sizeof(nl_buf
));
896 zlog_err("%s: netlink_lsp_msg_encoder failed",
901 nl_buf_len
+= (size_t)rv
;
904 /* Un-handled by FPM at this time. */
905 case DPLANE_OP_PW_INSTALL
:
906 case DPLANE_OP_PW_UNINSTALL
:
907 case DPLANE_OP_ADDR_INSTALL
:
908 case DPLANE_OP_ADDR_UNINSTALL
:
909 case DPLANE_OP_NEIGH_INSTALL
:
910 case DPLANE_OP_NEIGH_UPDATE
:
911 case DPLANE_OP_NEIGH_DELETE
:
912 case DPLANE_OP_VTEP_ADD
:
913 case DPLANE_OP_VTEP_DELETE
:
914 case DPLANE_OP_SYS_ROUTE_ADD
:
915 case DPLANE_OP_SYS_ROUTE_DELETE
:
916 case DPLANE_OP_ROUTE_NOTIFY
:
917 case DPLANE_OP_LSP_NOTIFY
:
918 case DPLANE_OP_RULE_ADD
:
919 case DPLANE_OP_RULE_DELETE
:
920 case DPLANE_OP_RULE_UPDATE
:
921 case DPLANE_OP_NEIGH_DISCOVER
:
922 case DPLANE_OP_BR_PORT_UPDATE
:
923 case DPLANE_OP_IPTABLE_ADD
:
924 case DPLANE_OP_IPTABLE_DELETE
:
925 case DPLANE_OP_IPSET_ADD
:
926 case DPLANE_OP_IPSET_DELETE
:
927 case DPLANE_OP_IPSET_ENTRY_ADD
:
928 case DPLANE_OP_IPSET_ENTRY_DELETE
:
929 case DPLANE_OP_NEIGH_IP_INSTALL
:
930 case DPLANE_OP_NEIGH_IP_DELETE
:
931 case DPLANE_OP_NEIGH_TABLE_UPDATE
:
932 case DPLANE_OP_GRE_SET
:
933 case DPLANE_OP_INTF_ADDR_ADD
:
934 case DPLANE_OP_INTF_ADDR_DEL
:
935 case DPLANE_OP_INTF_NETCONFIG
:
936 case DPLANE_OP_INTF_INSTALL
:
937 case DPLANE_OP_INTF_UPDATE
:
938 case DPLANE_OP_INTF_DELETE
:
939 case DPLANE_OP_TC_QDISC_INSTALL
:
940 case DPLANE_OP_TC_QDISC_UNINSTALL
:
941 case DPLANE_OP_TC_CLASS_ADD
:
942 case DPLANE_OP_TC_CLASS_DELETE
:
943 case DPLANE_OP_TC_CLASS_UPDATE
:
944 case DPLANE_OP_TC_FILTER_ADD
:
945 case DPLANE_OP_TC_FILTER_DELETE
:
946 case DPLANE_OP_TC_FILTER_UPDATE
:
952 /* Skip empty enqueues. */
956 /* We must know if someday a message goes beyond 65KiB. */
957 assert((nl_buf_len
+ FPM_HEADER_SIZE
) <= UINT16_MAX
);
959 /* Check if we have enough buffer space. */
960 if (STREAM_WRITEABLE(fnc
->obuf
) < (nl_buf_len
+ FPM_HEADER_SIZE
)) {
961 atomic_fetch_add_explicit(&fnc
->counters
.buffer_full
, 1,
962 memory_order_relaxed
);
964 if (IS_ZEBRA_DEBUG_FPM
)
966 "%s: buffer full: wants to write %zu but has %zu",
967 __func__
, nl_buf_len
+ FPM_HEADER_SIZE
,
968 STREAM_WRITEABLE(fnc
->obuf
));
974 * Fill in the FPM header information.
976 * See FPM_HEADER_SIZE definition for more information.
978 stream_putc(fnc
->obuf
, 1);
979 stream_putc(fnc
->obuf
, 1);
980 stream_putw(fnc
->obuf
, nl_buf_len
+ FPM_HEADER_SIZE
);
982 /* Write current data. */
983 stream_write(fnc
->obuf
, nl_buf
, (size_t)nl_buf_len
);
985 /* Account number of bytes waiting to be written. */
986 atomic_fetch_add_explicit(&fnc
->counters
.obuf_bytes
,
987 nl_buf_len
+ FPM_HEADER_SIZE
,
988 memory_order_relaxed
);
989 obytes
= atomic_load_explicit(&fnc
->counters
.obuf_bytes
,
990 memory_order_relaxed
);
991 obytes_peak
= atomic_load_explicit(&fnc
->counters
.obuf_peak
,
992 memory_order_relaxed
);
993 if (obytes_peak
< obytes
)
994 atomic_store_explicit(&fnc
->counters
.obuf_peak
, obytes
,
995 memory_order_relaxed
);
997 /* Tell the thread to start writing. */
998 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, fnc
->socket
,
1005 * LSP walk/send functions
1007 struct fpm_lsp_arg
{
1008 struct zebra_dplane_ctx
*ctx
;
1009 struct fpm_nl_ctx
*fnc
;
1013 static int fpm_lsp_send_cb(struct hash_bucket
*bucket
, void *arg
)
1015 struct zebra_lsp
*lsp
= bucket
->data
;
1016 struct fpm_lsp_arg
*fla
= arg
;
1018 /* Skip entries which have already been sent */
1019 if (CHECK_FLAG(lsp
->flags
, LSP_FLAG_FPM
))
1020 return HASHWALK_CONTINUE
;
1022 dplane_ctx_reset(fla
->ctx
);
1023 dplane_ctx_lsp_init(fla
->ctx
, DPLANE_OP_LSP_INSTALL
, lsp
);
1025 if (fpm_nl_enqueue(fla
->fnc
, fla
->ctx
) == -1) {
1026 fla
->complete
= false;
1027 return HASHWALK_ABORT
;
1030 /* Mark entry as sent */
1031 SET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
1032 return HASHWALK_CONTINUE
;
1035 static void fpm_lsp_send(struct thread
*t
)
1037 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1038 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
1039 struct fpm_lsp_arg fla
;
1042 fla
.ctx
= dplane_ctx_alloc();
1043 fla
.complete
= true;
1045 hash_walk(zvrf
->lsp_table
, fpm_lsp_send_cb
, &fla
);
1047 dplane_ctx_fini(&fla
.ctx
);
1050 WALK_FINISH(fnc
, FNE_LSP_FINISHED
);
1052 /* Now move onto routes */
1053 thread_add_timer(zrouter
.master
, fpm_nhg_reset
, fnc
, 0,
1056 /* Didn't finish - reschedule LSP walk */
1057 thread_add_timer(zrouter
.master
, fpm_lsp_send
, fnc
, 0,
1063 * Next hop walk/send functions.
1065 struct fpm_nhg_arg
{
1066 struct zebra_dplane_ctx
*ctx
;
1067 struct fpm_nl_ctx
*fnc
;
1071 static int fpm_nhg_send_cb(struct hash_bucket
*bucket
, void *arg
)
1073 struct nhg_hash_entry
*nhe
= bucket
->data
;
1074 struct fpm_nhg_arg
*fna
= arg
;
1076 /* This entry was already sent, skip it. */
1077 if (CHECK_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
))
1078 return HASHWALK_CONTINUE
;
1080 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
1081 dplane_ctx_reset(fna
->ctx
);
1082 dplane_ctx_nexthop_init(fna
->ctx
, DPLANE_OP_NH_INSTALL
, nhe
);
1083 if (fpm_nl_enqueue(fna
->fnc
, fna
->ctx
) == -1) {
1084 /* Our buffers are full, lets give it some cycles. */
1085 fna
->complete
= false;
1086 return HASHWALK_ABORT
;
1089 /* Mark group as sent, so it doesn't get sent again. */
1090 SET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
1092 return HASHWALK_CONTINUE
;
1095 static void fpm_nhg_send(struct thread
*t
)
1097 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1098 struct fpm_nhg_arg fna
;
1101 fna
.ctx
= dplane_ctx_alloc();
1102 fna
.complete
= true;
1104 /* Send next hops. */
1106 hash_walk(zrouter
.nhgs_id
, fpm_nhg_send_cb
, &fna
);
1108 /* `free()` allocated memory. */
1109 dplane_ctx_fini(&fna
.ctx
);
1111 /* We are done sending next hops, lets install the routes now. */
1113 WALK_FINISH(fnc
, FNE_NHG_FINISHED
);
1114 thread_add_timer(zrouter
.master
, fpm_rib_reset
, fnc
, 0,
1116 } else /* Otherwise reschedule next hop group again. */
1117 thread_add_timer(zrouter
.master
, fpm_nhg_send
, fnc
, 0,
1122 * Send all RIB installed routes to the connected data plane.
1124 static void fpm_rib_send(struct thread
*t
)
1126 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1128 struct route_node
*rn
;
1129 struct route_table
*rt
;
1130 struct zebra_dplane_ctx
*ctx
;
1131 rib_tables_iter_t rt_iter
;
1133 /* Allocate temporary context for all transactions. */
1134 ctx
= dplane_ctx_alloc();
1136 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1137 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1138 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1139 dest
= rib_dest_from_rnode(rn
);
1140 /* Skip bad route entries. */
1141 if (dest
== NULL
|| dest
->selected_fib
== NULL
)
1144 /* Check for already sent routes. */
1145 if (CHECK_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
))
1148 /* Enqueue route install. */
1149 dplane_ctx_reset(ctx
);
1150 dplane_ctx_route_init(ctx
, DPLANE_OP_ROUTE_INSTALL
, rn
,
1151 dest
->selected_fib
);
1152 if (fpm_nl_enqueue(fnc
, ctx
) == -1) {
1153 /* Free the temporary allocated context. */
1154 dplane_ctx_fini(&ctx
);
1156 thread_add_timer(zrouter
.master
, fpm_rib_send
,
1157 fnc
, 1, &fnc
->t_ribwalk
);
1162 SET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1166 /* Free the temporary allocated context. */
1167 dplane_ctx_fini(&ctx
);
1169 /* All RIB routes sent! */
1170 WALK_FINISH(fnc
, FNE_RIB_FINISHED
);
1172 /* Schedule next event: RMAC reset. */
1173 thread_add_event(zrouter
.master
, fpm_rmac_reset
, fnc
, 0,
1178 * The next three functions will handle RMAC enqueue.
1180 struct fpm_rmac_arg
{
1181 struct zebra_dplane_ctx
*ctx
;
1182 struct fpm_nl_ctx
*fnc
;
1183 struct zebra_l3vni
*zl3vni
;
1187 static void fpm_enqueue_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1189 struct fpm_rmac_arg
*fra
= arg
;
1190 struct zebra_mac
*zrmac
= bucket
->data
;
1191 struct zebra_if
*zif
= fra
->zl3vni
->vxlan_if
->info
;
1192 const struct zebra_l2info_vxlan
*vxl
= &zif
->l2info
.vxl
;
1193 struct zebra_if
*br_zif
;
1197 /* Entry already sent. */
1198 if (CHECK_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
) || !fra
->complete
)
1201 sticky
= !!CHECK_FLAG(zrmac
->flags
,
1202 (ZEBRA_MAC_STICKY
| ZEBRA_MAC_REMOTE_DEF_GW
));
1203 br_zif
= (struct zebra_if
*)(zif
->brslave_info
.br_if
->info
);
1204 vid
= IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif
) ? vxl
->access_vlan
: 0;
1206 dplane_ctx_reset(fra
->ctx
);
1207 dplane_ctx_set_op(fra
->ctx
, DPLANE_OP_MAC_INSTALL
);
1208 dplane_mac_init(fra
->ctx
, fra
->zl3vni
->vxlan_if
,
1209 zif
->brslave_info
.br_if
, vid
,
1210 &zrmac
->macaddr
, zrmac
->fwd_info
.r_vtep_ip
, sticky
,
1211 0 /*nhg*/, 0 /*update_flags*/);
1212 if (fpm_nl_enqueue(fra
->fnc
, fra
->ctx
) == -1) {
1213 thread_add_timer(zrouter
.master
, fpm_rmac_send
,
1214 fra
->fnc
, 1, &fra
->fnc
->t_rmacwalk
);
1215 fra
->complete
= false;
1219 static void fpm_enqueue_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1221 struct fpm_rmac_arg
*fra
= arg
;
1222 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1224 fra
->zl3vni
= zl3vni
;
1225 hash_iterate(zl3vni
->rmac_table
, fpm_enqueue_rmac_table
, zl3vni
);
1228 static void fpm_rmac_send(struct thread
*t
)
1230 struct fpm_rmac_arg fra
;
1232 fra
.fnc
= THREAD_ARG(t
);
1233 fra
.ctx
= dplane_ctx_alloc();
1234 fra
.complete
= true;
1235 hash_iterate(zrouter
.l3vni_table
, fpm_enqueue_l3vni_table
, &fra
);
1236 dplane_ctx_fini(&fra
.ctx
);
1238 /* RMAC walk completed. */
1240 WALK_FINISH(fra
.fnc
, FNE_RMAC_FINISHED
);
1244 * Resets the next hop FPM flags so we send all next hops again.
1246 static void fpm_nhg_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1248 struct nhg_hash_entry
*nhe
= bucket
->data
;
1250 /* Unset FPM installation flag so it gets installed again. */
1251 UNSET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
1254 static void fpm_nhg_reset(struct thread
*t
)
1256 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1258 hash_iterate(zrouter
.nhgs_id
, fpm_nhg_reset_cb
, NULL
);
1260 /* Schedule next step: send next hop groups. */
1261 thread_add_event(zrouter
.master
, fpm_nhg_send
, fnc
, 0, &fnc
->t_nhgwalk
);
1265 * Resets the LSP FPM flag so we send all LSPs again.
1267 static void fpm_lsp_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1269 struct zebra_lsp
*lsp
= bucket
->data
;
1271 UNSET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
1274 static void fpm_lsp_reset(struct thread
*t
)
1276 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1277 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
1279 hash_iterate(zvrf
->lsp_table
, fpm_lsp_reset_cb
, NULL
);
1281 /* Schedule next step: send LSPs */
1282 thread_add_event(zrouter
.master
, fpm_lsp_send
, fnc
, 0, &fnc
->t_lspwalk
);
1286 * Resets the RIB FPM flags so we send all routes again.
1288 static void fpm_rib_reset(struct thread
*t
)
1290 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1292 struct route_node
*rn
;
1293 struct route_table
*rt
;
1294 rib_tables_iter_t rt_iter
;
1296 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1297 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1298 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1299 dest
= rib_dest_from_rnode(rn
);
1300 /* Skip bad route entries. */
1304 UNSET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1308 /* Schedule next step: send RIB routes. */
1309 thread_add_event(zrouter
.master
, fpm_rib_send
, fnc
, 0, &fnc
->t_ribwalk
);
1313 * The next three function will handle RMAC table reset.
1315 static void fpm_unset_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1317 struct zebra_mac
*zrmac
= bucket
->data
;
1319 UNSET_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
);
1322 static void fpm_unset_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1324 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1326 hash_iterate(zl3vni
->rmac_table
, fpm_unset_rmac_table
, zl3vni
);
1329 static void fpm_rmac_reset(struct thread
*t
)
1331 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1333 hash_iterate(zrouter
.l3vni_table
, fpm_unset_l3vni_table
, NULL
);
1335 /* Schedule next event: send RMAC entries. */
1336 thread_add_event(zrouter
.master
, fpm_rmac_send
, fnc
, 0,
1340 static void fpm_process_queue(struct thread
*t
)
1342 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1343 struct zebra_dplane_ctx
*ctx
;
1344 bool no_bufs
= false;
1345 uint64_t processed_contexts
= 0;
1348 /* No space available yet. */
1349 if (STREAM_WRITEABLE(fnc
->obuf
) < NL_PKT_BUF_SIZE
) {
1354 /* Dequeue next item or quit processing. */
1355 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1356 ctx
= dplane_ctx_dequeue(&fnc
->ctxqueue
);
1362 * Intentionally ignoring the return value
1363 * as that we are ensuring that we can write to
1364 * the output data in the STREAM_WRITEABLE
1365 * check above, so we can ignore the return
1367 if (fnc
->socket
!= -1)
1368 (void)fpm_nl_enqueue(fnc
, ctx
);
1370 /* Account the processed entries. */
1371 processed_contexts
++;
1372 atomic_fetch_sub_explicit(&fnc
->counters
.ctxqueue_len
, 1,
1373 memory_order_relaxed
);
1375 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1376 dplane_provider_enqueue_out_ctx(fnc
->prov
, ctx
);
1379 /* Update count of processed contexts */
1380 atomic_fetch_add_explicit(&fnc
->counters
.dplane_contexts
,
1381 processed_contexts
, memory_order_relaxed
);
1383 /* Re-schedule if we ran out of buffer space */
1385 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1386 fnc
, 0, &fnc
->t_dequeue
);
1389 * Let the dataplane thread know if there are items in the
1390 * output queue to be processed. Otherwise they may sit
1391 * until the dataplane thread gets scheduled for new,
1394 if (dplane_provider_out_ctx_queue_len(fnc
->prov
) > 0)
1395 dplane_provider_work_ready();
1399 * Handles external (e.g. CLI, data plane or others) events.
1401 static void fpm_process_event(struct thread
*t
)
1403 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1404 enum fpm_nl_events event
= THREAD_VAL(t
);
1408 zlog_info("%s: manual FPM disable event", __func__
);
1409 fnc
->disabled
= true;
1410 atomic_fetch_add_explicit(&fnc
->counters
.user_disables
, 1,
1411 memory_order_relaxed
);
1413 /* Call reconnect to disable timers and clean up context. */
1418 zlog_info("%s: manual FPM reconnect event", __func__
);
1419 fnc
->disabled
= false;
1420 atomic_fetch_add_explicit(&fnc
->counters
.user_configures
, 1,
1421 memory_order_relaxed
);
1425 case FNE_RESET_COUNTERS
:
1426 zlog_info("%s: manual FPM counters reset event", __func__
);
1427 memset(&fnc
->counters
, 0, sizeof(fnc
->counters
));
1430 case FNE_TOGGLE_NHG
:
1431 zlog_info("%s: toggle next hop groups support", __func__
);
1432 fnc
->use_nhg
= !fnc
->use_nhg
;
1436 case FNE_INTERNAL_RECONNECT
:
1440 case FNE_NHG_FINISHED
:
1441 if (IS_ZEBRA_DEBUG_FPM
)
1442 zlog_debug("%s: next hop groups walk finished",
1445 case FNE_RIB_FINISHED
:
1446 if (IS_ZEBRA_DEBUG_FPM
)
1447 zlog_debug("%s: RIB walk finished", __func__
);
1449 case FNE_RMAC_FINISHED
:
1450 if (IS_ZEBRA_DEBUG_FPM
)
1451 zlog_debug("%s: RMAC walk finished", __func__
);
1453 case FNE_LSP_FINISHED
:
1454 if (IS_ZEBRA_DEBUG_FPM
)
1455 zlog_debug("%s: LSP walk finished", __func__
);
1461 * Data plane functions.
1463 static int fpm_nl_start(struct zebra_dplane_provider
*prov
)
1465 struct fpm_nl_ctx
*fnc
;
1467 fnc
= dplane_provider_get_data(prov
);
1468 fnc
->fthread
= frr_pthread_new(NULL
, prov_name
, prov_name
);
1469 assert(frr_pthread_run(fnc
->fthread
, NULL
) == 0);
1470 fnc
->ibuf
= stream_new(NL_PKT_BUF_SIZE
);
1471 fnc
->obuf
= stream_new(NL_PKT_BUF_SIZE
* 128);
1472 pthread_mutex_init(&fnc
->obuf_mutex
, NULL
);
1474 fnc
->disabled
= true;
1476 dplane_ctx_q_init(&fnc
->ctxqueue
);
1477 pthread_mutex_init(&fnc
->ctxqueue_mutex
, NULL
);
1479 /* Set default values. */
1480 fnc
->use_nhg
= true;
1485 static int fpm_nl_finish_early(struct fpm_nl_ctx
*fnc
)
1487 /* Disable all events and close socket. */
1488 THREAD_OFF(fnc
->t_lspreset
);
1489 THREAD_OFF(fnc
->t_lspwalk
);
1490 THREAD_OFF(fnc
->t_nhgreset
);
1491 THREAD_OFF(fnc
->t_nhgwalk
);
1492 THREAD_OFF(fnc
->t_ribreset
);
1493 THREAD_OFF(fnc
->t_ribwalk
);
1494 THREAD_OFF(fnc
->t_rmacreset
);
1495 THREAD_OFF(fnc
->t_rmacwalk
);
1496 THREAD_OFF(fnc
->t_event
);
1497 THREAD_OFF(fnc
->t_nhg
);
1498 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_read
, NULL
);
1499 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_write
, NULL
);
1500 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_connect
, NULL
);
1502 if (fnc
->socket
!= -1) {
1510 static int fpm_nl_finish_late(struct fpm_nl_ctx
*fnc
)
1512 /* Stop the running thread. */
1513 frr_pthread_stop(fnc
->fthread
, NULL
);
1515 /* Free all allocated resources. */
1516 pthread_mutex_destroy(&fnc
->obuf_mutex
);
1517 pthread_mutex_destroy(&fnc
->ctxqueue_mutex
);
1518 stream_free(fnc
->ibuf
);
1519 stream_free(fnc
->obuf
);
/*
 * Data plane provider finish callback.  Called twice during shutdown:
 * once with early == true (stop events, close socket) and once with
 * early == false (tear down the thread and free memory).
 */
static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	if (early)
		return fpm_nl_finish_early(fnc);

	return fpm_nl_finish_late(fnc);
}
1537 static int fpm_nl_process(struct zebra_dplane_provider
*prov
)
1539 struct zebra_dplane_ctx
*ctx
;
1540 struct fpm_nl_ctx
*fnc
;
1542 uint64_t cur_queue
, peak_queue
= 0, stored_peak_queue
;
1544 fnc
= dplane_provider_get_data(prov
);
1545 limit
= dplane_provider_get_work_limit(prov
);
1546 for (counter
= 0; counter
< limit
; counter
++) {
1547 ctx
= dplane_provider_dequeue_in_ctx(prov
);
1552 * Skip all notifications if not connected, we'll walk the RIB
1555 if (fnc
->socket
!= -1 && fnc
->connecting
== false) {
1557 * Update the number of queued contexts *before*
1558 * enqueueing, to ensure counter consistency.
1560 atomic_fetch_add_explicit(&fnc
->counters
.ctxqueue_len
,
1561 1, memory_order_relaxed
);
1563 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1564 dplane_ctx_enqueue_tail(&fnc
->ctxqueue
, ctx
);
1567 cur_queue
= atomic_load_explicit(
1568 &fnc
->counters
.ctxqueue_len
,
1569 memory_order_relaxed
);
1570 if (peak_queue
< cur_queue
)
1571 peak_queue
= cur_queue
;
1575 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1576 dplane_provider_enqueue_out_ctx(prov
, ctx
);
1579 /* Update peak queue length, if we just observed a new peak */
1580 stored_peak_queue
= atomic_load_explicit(
1581 &fnc
->counters
.ctxqueue_len_peak
, memory_order_relaxed
);
1582 if (stored_peak_queue
< peak_queue
)
1583 atomic_store_explicit(&fnc
->counters
.ctxqueue_len_peak
,
1584 peak_queue
, memory_order_relaxed
);
1586 if (atomic_load_explicit(&fnc
->counters
.ctxqueue_len
,
1587 memory_order_relaxed
)
1589 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1590 fnc
, 0, &fnc
->t_dequeue
);
1592 /* Ensure dataplane thread is rescheduled if we hit the work limit */
1593 if (counter
>= limit
)
1594 dplane_provider_work_ready();
1599 static int fpm_nl_new(struct thread_master
*tm
)
1601 struct zebra_dplane_provider
*prov
= NULL
;
1604 gfnc
= calloc(1, sizeof(*gfnc
));
1605 rv
= dplane_provider_register(prov_name
, DPLANE_PRIO_POSTPROCESS
,
1606 DPLANE_PROV_FLAG_THREADED
, fpm_nl_start
,
1607 fpm_nl_process
, fpm_nl_finish
, gfnc
,
1610 if (IS_ZEBRA_DEBUG_DPLANE
)
1611 zlog_debug("%s register status: %d", prov_name
, rv
);
1613 install_node(&fpm_node
);
1614 install_element(ENABLE_NODE
, &fpm_show_counters_cmd
);
1615 install_element(ENABLE_NODE
, &fpm_show_counters_json_cmd
);
1616 install_element(ENABLE_NODE
, &fpm_reset_counters_cmd
);
1617 install_element(CONFIG_NODE
, &fpm_set_address_cmd
);
1618 install_element(CONFIG_NODE
, &no_fpm_set_address_cmd
);
1619 install_element(CONFIG_NODE
, &fpm_use_nhg_cmd
);
1620 install_element(CONFIG_NODE
, &no_fpm_use_nhg_cmd
);
1625 static int fpm_nl_init(void)
1627 hook_register(frr_late_init
, fpm_nl_new
);
1632 .name
= "dplane_fpm_nl",
1634 .description
= "Data plane plugin for FPM using netlink.",
1635 .init
= fpm_nl_init
,