2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "config.h" /* Include this explicitly */
26 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
34 #include "lib/zebra.h"
36 #include "lib/libfrr.h"
37 #include "lib/frratomic.h"
38 #include "lib/command.h"
39 #include "lib/memory.h"
40 #include "lib/network.h"
42 #include "lib/frr_pthread.h"
43 #include "zebra/debug.h"
44 #include "zebra/interface.h"
45 #include "zebra/zebra_dplane.h"
46 #include "zebra/zebra_mpls.h"
47 #include "zebra/zebra_router.h"
48 #include "zebra/interface.h"
49 #include "zebra/zebra_vxlan_private.h"
50 #include "zebra/zebra_evpn.h"
51 #include "zebra/zebra_evpn_mac.h"
52 #include "zebra/kernel_netlink.h"
53 #include "zebra/rt_netlink.h"
54 #include "zebra/debug.h"
57 #define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
58 #define SOUTHBOUND_DEFAULT_PORT 2620
63 * version: 1 byte (always 1),
64 * type: 1 byte (1 for netlink, 2 protobuf),
65 * len: 2 bytes (network order),
68 * This header is used with any format to tell the users how many bytes to
71 #define FPM_HEADER_SIZE 4
/* Provider name registered with the zebra dataplane framework. */
static const char *prov_name = "dplane_fpm_nl";
76 /* data plane connection. */
81 struct sockaddr_storage addr
;
83 /* data plane buffers. */
86 pthread_mutex_t obuf_mutex
;
89 * data plane context queue:
90 * When a FPM server connection becomes a bottleneck, we must keep the
91 * data plane contexts until we get a chance to process them.
93 struct dplane_ctx_list_head ctxqueue
;
94 pthread_mutex_t ctxqueue_mutex
;
96 /* data plane events. */
97 struct zebra_dplane_provider
*prov
;
98 struct frr_pthread
*fthread
;
99 struct thread
*t_connect
;
100 struct thread
*t_read
;
101 struct thread
*t_write
;
102 struct thread
*t_event
;
103 struct thread
*t_nhg
;
104 struct thread
*t_dequeue
;
107 struct thread
*t_lspreset
;
108 struct thread
*t_lspwalk
;
109 struct thread
*t_nhgreset
;
110 struct thread
*t_nhgwalk
;
111 struct thread
*t_ribreset
;
112 struct thread
*t_ribwalk
;
113 struct thread
*t_rmacreset
;
114 struct thread
*t_rmacwalk
;
116 /* Statistic counters. */
118 /* Amount of bytes read into ibuf. */
119 _Atomic
uint32_t bytes_read
;
120 /* Amount of bytes written from obuf. */
121 _Atomic
uint32_t bytes_sent
;
122 /* Output buffer current usage. */
123 _Atomic
uint32_t obuf_bytes
;
124 /* Output buffer peak usage. */
125 _Atomic
uint32_t obuf_peak
;
127 /* Amount of connection closes. */
128 _Atomic
uint32_t connection_closes
;
129 /* Amount of connection errors. */
130 _Atomic
uint32_t connection_errors
;
132 /* Amount of user configurations: FNE_RECONNECT. */
133 _Atomic
uint32_t user_configures
;
134 /* Amount of user disable requests: FNE_DISABLE. */
135 _Atomic
uint32_t user_disables
;
137 /* Amount of data plane context processed. */
138 _Atomic
uint32_t dplane_contexts
;
139 /* Amount of data plane contexts enqueued. */
140 _Atomic
uint32_t ctxqueue_len
;
141 /* Peak amount of data plane contexts enqueued. */
142 _Atomic
uint32_t ctxqueue_len_peak
;
144 /* Amount of buffer full events. */
145 _Atomic
uint32_t buffer_full
;
150 /* Ask for FPM to reconnect the external server. */
154 /* Reset counters. */
156 /* Toggle next hop group feature. */
158 /* Reconnect request by our own code to avoid races. */
159 FNE_INTERNAL_RECONNECT
,
161 /* LSP walk finished. */
163 /* Next hop groups walk finished. */
165 /* RIB walk finished. */
167 /* RMAC walk finished. */
171 #define FPM_RECONNECT(fnc) \
172 thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
173 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)
175 #define WALK_FINISH(fnc, ev) \
176 thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
/*
 * Forward declarations for the event handlers and the walk/reset state
 * machine (LSP -> NHG -> RIB -> RMAC) used below.
 */
static void fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static void fpm_lsp_send(struct thread *t);
static void fpm_lsp_reset(struct thread *t);
static void fpm_nhg_send(struct thread *t);
static void fpm_nhg_reset(struct thread *t);
static void fpm_rib_send(struct thread *t);
static void fpm_rib_reset(struct thread *t);
static void fpm_rmac_send(struct thread *t);
static void fpm_rmac_reset(struct thread *t);
196 #define FPM_STR "Forwarding Plane Manager configuration\n"
198 DEFUN(fpm_set_address
, fpm_set_address_cmd
,
199 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
201 "FPM remote listening server address\n"
202 "Remote IPv4 FPM server\n"
203 "Remote IPv6 FPM server\n"
204 "FPM remote listening server port\n"
205 "Remote FPM server port\n")
207 struct sockaddr_in
*sin
;
208 struct sockaddr_in6
*sin6
;
210 uint8_t naddr
[INET6_BUFSIZ
];
213 port
= strtol(argv
[4]->arg
, NULL
, 10);
215 /* Handle IPv4 addresses. */
216 if (inet_pton(AF_INET
, argv
[2]->arg
, naddr
) == 1) {
217 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
219 memset(sin
, 0, sizeof(*sin
));
220 sin
->sin_family
= AF_INET
;
222 port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
223 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
224 sin
->sin_len
= sizeof(*sin
);
225 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
226 memcpy(&sin
->sin_addr
, naddr
, sizeof(sin
->sin_addr
));
231 /* Handle IPv6 addresses. */
232 if (inet_pton(AF_INET6
, argv
[2]->arg
, naddr
) != 1) {
233 vty_out(vty
, "%% Invalid address: %s\n", argv
[2]->arg
);
237 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
238 memset(sin6
, 0, sizeof(*sin6
));
239 sin6
->sin6_family
= AF_INET6
;
240 sin6
->sin6_port
= port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
241 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
242 sin6
->sin6_len
= sizeof(*sin6
);
243 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
244 memcpy(&sin6
->sin6_addr
, naddr
, sizeof(sin6
->sin6_addr
));
247 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
248 FNE_RECONNECT
, &gfnc
->t_event
);
252 DEFUN(no_fpm_set_address
, no_fpm_set_address_cmd
,
253 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
256 "FPM remote listening server address\n"
257 "Remote IPv4 FPM server\n"
258 "Remote IPv6 FPM server\n"
259 "FPM remote listening server port\n"
260 "Remote FPM server port\n")
262 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
263 FNE_DISABLE
, &gfnc
->t_event
);
267 DEFUN(fpm_use_nhg
, fpm_use_nhg_cmd
,
268 "fpm use-next-hop-groups",
270 "Use netlink next hop groups feature.\n")
272 /* Already enabled. */
276 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
277 FNE_TOGGLE_NHG
, &gfnc
->t_nhg
);
282 DEFUN(no_fpm_use_nhg
, no_fpm_use_nhg_cmd
,
283 "no fpm use-next-hop-groups",
286 "Use netlink next hop groups feature.\n")
288 /* Already disabled. */
292 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
293 FNE_TOGGLE_NHG
, &gfnc
->t_nhg
);
298 DEFUN(fpm_reset_counters
, fpm_reset_counters_cmd
,
299 "clear fpm counters",
302 "FPM statistic counters\n")
304 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
305 FNE_RESET_COUNTERS
, &gfnc
->t_event
);
309 DEFUN(fpm_show_counters
, fpm_show_counters_cmd
,
313 "FPM statistic counters\n")
315 vty_out(vty
, "%30s\n%30s\n", "FPM counters", "============");
317 #define SHOW_COUNTER(label, counter) \
318 vty_out(vty, "%28s: %u\n", (label), (counter))
320 SHOW_COUNTER("Input bytes", gfnc
->counters
.bytes_read
);
321 SHOW_COUNTER("Output bytes", gfnc
->counters
.bytes_sent
);
322 SHOW_COUNTER("Output buffer current size", gfnc
->counters
.obuf_bytes
);
323 SHOW_COUNTER("Output buffer peak size", gfnc
->counters
.obuf_peak
);
324 SHOW_COUNTER("Connection closes", gfnc
->counters
.connection_closes
);
325 SHOW_COUNTER("Connection errors", gfnc
->counters
.connection_errors
);
326 SHOW_COUNTER("Data plane items processed",
327 gfnc
->counters
.dplane_contexts
);
328 SHOW_COUNTER("Data plane items enqueued",
329 gfnc
->counters
.ctxqueue_len
);
330 SHOW_COUNTER("Data plane items queue peak",
331 gfnc
->counters
.ctxqueue_len_peak
);
332 SHOW_COUNTER("Buffer full hits", gfnc
->counters
.buffer_full
);
333 SHOW_COUNTER("User FPM configurations", gfnc
->counters
.user_configures
);
334 SHOW_COUNTER("User FPM disable requests", gfnc
->counters
.user_disables
);
341 DEFUN(fpm_show_counters_json
, fpm_show_counters_json_cmd
,
342 "show fpm counters json",
345 "FPM statistic counters\n"
348 struct json_object
*jo
;
350 jo
= json_object_new_object();
351 json_object_int_add(jo
, "bytes-read", gfnc
->counters
.bytes_read
);
352 json_object_int_add(jo
, "bytes-sent", gfnc
->counters
.bytes_sent
);
353 json_object_int_add(jo
, "obuf-bytes", gfnc
->counters
.obuf_bytes
);
354 json_object_int_add(jo
, "obuf-bytes-peak", gfnc
->counters
.obuf_peak
);
355 json_object_int_add(jo
, "connection-closes",
356 gfnc
->counters
.connection_closes
);
357 json_object_int_add(jo
, "connection-errors",
358 gfnc
->counters
.connection_errors
);
359 json_object_int_add(jo
, "data-plane-contexts",
360 gfnc
->counters
.dplane_contexts
);
361 json_object_int_add(jo
, "data-plane-contexts-queue",
362 gfnc
->counters
.ctxqueue_len
);
363 json_object_int_add(jo
, "data-plane-contexts-queue-peak",
364 gfnc
->counters
.ctxqueue_len_peak
);
365 json_object_int_add(jo
, "buffer-full-hits", gfnc
->counters
.buffer_full
);
366 json_object_int_add(jo
, "user-configures",
367 gfnc
->counters
.user_configures
);
368 json_object_int_add(jo
, "user-disables", gfnc
->counters
.user_disables
);
374 static int fpm_write_config(struct vty
*vty
)
376 struct sockaddr_in
*sin
;
377 struct sockaddr_in6
*sin6
;
383 switch (gfnc
->addr
.ss_family
) {
386 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
387 vty_out(vty
, "fpm address %pI4", &sin
->sin_addr
);
388 if (sin
->sin_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
389 vty_out(vty
, " port %d", ntohs(sin
->sin_port
));
395 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
396 vty_out(vty
, "fpm address %pI6", &sin6
->sin6_addr
);
397 if (sin6
->sin6_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
398 vty_out(vty
, " port %d", ntohs(sin6
->sin6_port
));
407 if (!gfnc
->use_nhg
) {
408 vty_out(vty
, "no fpm use-next-hop-groups\n");
415 static struct cmd_node fpm_node
= {
419 .config_write
= fpm_write_config
,
/* Forward declaration: fpm_reconnect() below schedules this handler. */
static void fpm_connect(struct thread *t);
427 static void fpm_reconnect(struct fpm_nl_ctx
*fnc
)
429 /* Cancel all zebra threads first. */
430 thread_cancel_async(zrouter
.master
, &fnc
->t_lspreset
, NULL
);
431 thread_cancel_async(zrouter
.master
, &fnc
->t_lspwalk
, NULL
);
432 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgreset
, NULL
);
433 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgwalk
, NULL
);
434 thread_cancel_async(zrouter
.master
, &fnc
->t_ribreset
, NULL
);
435 thread_cancel_async(zrouter
.master
, &fnc
->t_ribwalk
, NULL
);
436 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacreset
, NULL
);
437 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacwalk
, NULL
);
440 * Grab the lock to empty the streams (data plane might try to
441 * enqueue updates while we are closing).
443 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
445 /* Avoid calling close on `-1`. */
446 if (fnc
->socket
!= -1) {
451 stream_reset(fnc
->ibuf
);
452 stream_reset(fnc
->obuf
);
453 THREAD_OFF(fnc
->t_read
);
454 THREAD_OFF(fnc
->t_write
);
456 /* FPM is disabled, don't attempt to connect. */
460 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
464 static void fpm_read(struct thread
*t
)
466 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
470 struct nlmsghdr
*hdr
;
471 struct zebra_dplane_ctx
*ctx
;
472 size_t available_bytes
;
473 size_t hdr_available_bytes
;
475 /* Let's ignore the input at the moment. */
476 rv
= stream_read_try(fnc
->ibuf
, fnc
->socket
,
477 STREAM_WRITEABLE(fnc
->ibuf
));
479 atomic_fetch_add_explicit(&fnc
->counters
.connection_closes
, 1,
480 memory_order_relaxed
);
482 if (IS_ZEBRA_DEBUG_FPM
)
483 zlog_debug("%s: connection closed", __func__
);
489 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
490 memory_order_relaxed
);
491 zlog_warn("%s: connection failure: %s", __func__
,
497 /* Schedule the next read */
498 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, fnc
->socket
,
501 /* We've got an interruption. */
506 /* Account all bytes read. */
507 atomic_fetch_add_explicit(&fnc
->counters
.bytes_read
, rv
,
508 memory_order_relaxed
);
510 available_bytes
= STREAM_READABLE(fnc
->ibuf
);
511 while (available_bytes
) {
512 if (available_bytes
< (ssize_t
)FPM_MSG_HDR_LEN
) {
513 stream_pulldown(fnc
->ibuf
);
517 fpm
.version
= stream_getc(fnc
->ibuf
);
518 fpm
.msg_type
= stream_getc(fnc
->ibuf
);
519 fpm
.msg_len
= stream_getw(fnc
->ibuf
);
521 if (fpm
.version
!= FPM_PROTO_VERSION
&&
522 fpm
.msg_type
!= FPM_MSG_TYPE_NETLINK
) {
523 stream_reset(fnc
->ibuf
);
525 "%s: Received version/msg_type %u/%u, expected 1/1",
526 __func__
, fpm
.version
, fpm
.msg_type
);
533 * If the passed in length doesn't even fill in the header
534 * something is wrong and reset.
536 if (fpm
.msg_len
< FPM_MSG_HDR_LEN
) {
538 "%s: Received message length: %u that does not even fill the FPM header",
539 __func__
, fpm
.msg_len
);
545 * If we have not received the whole payload, reset the stream
546 * back to the beginning of the header and move it to the
549 if (fpm
.msg_len
> available_bytes
) {
550 stream_rewind_getp(fnc
->ibuf
, FPM_MSG_HDR_LEN
);
551 stream_pulldown(fnc
->ibuf
);
555 available_bytes
-= FPM_MSG_HDR_LEN
;
558 * Place the data from the stream into a buffer
560 hdr
= (struct nlmsghdr
*)buf
;
561 stream_get(buf
, fnc
->ibuf
, fpm
.msg_len
- FPM_MSG_HDR_LEN
);
562 hdr_available_bytes
= fpm
.msg_len
- FPM_MSG_HDR_LEN
;
563 available_bytes
-= hdr_available_bytes
;
565 /* Sanity check: must be at least header size. */
566 if (hdr
->nlmsg_len
< sizeof(*hdr
)) {
568 "%s: [seq=%u] invalid message length %u (< %zu)",
569 __func__
, hdr
->nlmsg_seq
, hdr
->nlmsg_len
,
573 if (hdr
->nlmsg_len
> fpm
.msg_len
) {
575 "%s: Received a inner header length of %u that is greater than the fpm total length of %u",
576 __func__
, hdr
->nlmsg_len
, fpm
.msg_len
);
579 /* Not enough bytes available. */
580 if (hdr
->nlmsg_len
> hdr_available_bytes
) {
582 "%s: [seq=%u] invalid message length %u (> %zu)",
583 __func__
, hdr
->nlmsg_seq
, hdr
->nlmsg_len
,
588 if (!(hdr
->nlmsg_flags
& NLM_F_REQUEST
)) {
589 if (IS_ZEBRA_DEBUG_FPM
)
591 "%s: [seq=%u] not a request, skipping",
592 __func__
, hdr
->nlmsg_seq
);
595 * This request is a bust, go to the next one
600 switch (hdr
->nlmsg_type
) {
602 ctx
= dplane_ctx_alloc();
603 dplane_ctx_set_op(ctx
, DPLANE_OP_ROUTE_NOTIFY
);
604 if (netlink_route_change_read_unicast_internal(
605 hdr
, 0, false, ctx
) != 1) {
606 dplane_ctx_fini(&ctx
);
607 stream_pulldown(fnc
->ibuf
);
609 * Let's continue to read other messages
610 * Even if we ignore this one.
615 if (IS_ZEBRA_DEBUG_FPM
)
617 "%s: Received message type %u which is not currently handled",
618 __func__
, hdr
->nlmsg_type
);
623 stream_reset(fnc
->ibuf
);
626 static void fpm_write(struct thread
*t
)
628 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
634 if (fnc
->connecting
== true) {
636 statuslen
= sizeof(status
);
638 rv
= getsockopt(fnc
->socket
, SOL_SOCKET
, SO_ERROR
, &status
,
640 if (rv
== -1 || status
!= 0) {
642 zlog_warn("%s: connection failed: %s", __func__
,
645 zlog_warn("%s: SO_ERROR failed: %s", __func__
,
648 atomic_fetch_add_explicit(
649 &fnc
->counters
.connection_errors
, 1,
650 memory_order_relaxed
);
656 fnc
->connecting
= false;
659 * Starting with LSPs walk all FPM objects, marking them
660 * as unsent and then replaying them.
662 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
665 /* Permit receiving messages now. */
666 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
,
667 fnc
->socket
, &fnc
->t_read
);
670 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
673 /* Stream is empty: reset pointers and return. */
674 if (STREAM_READABLE(fnc
->obuf
) == 0) {
675 stream_reset(fnc
->obuf
);
679 /* Try to write all at once. */
680 btotal
= stream_get_endp(fnc
->obuf
) -
681 stream_get_getp(fnc
->obuf
);
682 bwritten
= write(fnc
->socket
, stream_pnt(fnc
->obuf
), btotal
);
684 atomic_fetch_add_explicit(
685 &fnc
->counters
.connection_closes
, 1,
686 memory_order_relaxed
);
688 if (IS_ZEBRA_DEBUG_FPM
)
689 zlog_debug("%s: connection closed", __func__
);
692 if (bwritten
== -1) {
693 /* Attempt to continue if blocked by a signal. */
696 /* Receiver is probably slow, lets give it some time. */
697 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
700 atomic_fetch_add_explicit(
701 &fnc
->counters
.connection_errors
, 1,
702 memory_order_relaxed
);
703 zlog_warn("%s: connection failure: %s", __func__
,
710 /* Account all bytes sent. */
711 atomic_fetch_add_explicit(&fnc
->counters
.bytes_sent
, bwritten
,
712 memory_order_relaxed
);
714 /* Account number of bytes free. */
715 atomic_fetch_sub_explicit(&fnc
->counters
.obuf_bytes
, bwritten
,
716 memory_order_relaxed
);
718 stream_forward_getp(fnc
->obuf
, (size_t)bwritten
);
721 /* Stream is not empty yet, we must schedule more writes. */
722 if (STREAM_READABLE(fnc
->obuf
)) {
723 stream_pulldown(fnc
->obuf
);
724 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
,
725 fnc
->socket
, &fnc
->t_write
);
730 static void fpm_connect(struct thread
*t
)
732 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
733 struct sockaddr_in
*sin
= (struct sockaddr_in
*)&fnc
->addr
;
734 struct sockaddr_in6
*sin6
= (struct sockaddr_in6
*)&fnc
->addr
;
737 char addrstr
[INET6_ADDRSTRLEN
];
739 sock
= socket(fnc
->addr
.ss_family
, SOCK_STREAM
, 0);
741 zlog_err("%s: fpm socket failed: %s", __func__
,
743 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
748 set_nonblocking(sock
);
750 if (fnc
->addr
.ss_family
== AF_INET
) {
751 inet_ntop(AF_INET
, &sin
->sin_addr
, addrstr
, sizeof(addrstr
));
754 inet_ntop(AF_INET6
, &sin6
->sin6_addr
, addrstr
, sizeof(addrstr
));
755 slen
= sizeof(*sin6
);
758 if (IS_ZEBRA_DEBUG_FPM
)
759 zlog_debug("%s: attempting to connect to %s:%d", __func__
,
760 addrstr
, ntohs(sin
->sin_port
));
762 rv
= connect(sock
, (struct sockaddr
*)&fnc
->addr
, slen
);
763 if (rv
== -1 && errno
!= EINPROGRESS
) {
764 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
765 memory_order_relaxed
);
767 zlog_warn("%s: fpm connection failed: %s", __func__
,
769 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
774 fnc
->connecting
= (errno
== EINPROGRESS
);
776 if (!fnc
->connecting
)
777 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, sock
,
779 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, sock
,
783 * Starting with LSPs walk all FPM objects, marking them
784 * as unsent and then replaying them.
786 * If we are not connected, then delay the objects reset/send.
788 if (!fnc
->connecting
)
789 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
794 * Encode data plane operation context into netlink and enqueue it in the FPM
797 * @param fnc the netlink FPM context.
798 * @param ctx the data plane operation context data.
799 * @return 0 on success or -1 on not enough space.
801 static int fpm_nl_enqueue(struct fpm_nl_ctx
*fnc
, struct zebra_dplane_ctx
*ctx
)
803 uint8_t nl_buf
[NL_PKT_BUF_SIZE
];
806 uint64_t obytes
, obytes_peak
;
807 enum dplane_op_e op
= dplane_ctx_get_op(ctx
);
810 * If we were configured to not use next hop groups, then quit as soon
814 && (op
== DPLANE_OP_NH_DELETE
|| op
== DPLANE_OP_NH_INSTALL
815 || op
== DPLANE_OP_NH_UPDATE
))
820 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
823 case DPLANE_OP_ROUTE_UPDATE
:
824 case DPLANE_OP_ROUTE_DELETE
:
825 rv
= netlink_route_multipath_msg_encode(RTM_DELROUTE
, ctx
,
826 nl_buf
, sizeof(nl_buf
),
830 "%s: netlink_route_multipath_msg_encode failed",
835 nl_buf_len
= (size_t)rv
;
837 /* UPDATE operations need a INSTALL, otherwise just quit. */
838 if (op
== DPLANE_OP_ROUTE_DELETE
)
842 case DPLANE_OP_ROUTE_INSTALL
:
843 rv
= netlink_route_multipath_msg_encode(
844 RTM_NEWROUTE
, ctx
, &nl_buf
[nl_buf_len
],
845 sizeof(nl_buf
) - nl_buf_len
, true, fnc
->use_nhg
);
848 "%s: netlink_route_multipath_msg_encode failed",
853 nl_buf_len
+= (size_t)rv
;
856 case DPLANE_OP_MAC_INSTALL
:
857 case DPLANE_OP_MAC_DELETE
:
858 rv
= netlink_macfdb_update_ctx(ctx
, nl_buf
, sizeof(nl_buf
));
860 zlog_err("%s: netlink_macfdb_update_ctx failed",
865 nl_buf_len
= (size_t)rv
;
868 case DPLANE_OP_NH_DELETE
:
869 rv
= netlink_nexthop_msg_encode(RTM_DELNEXTHOP
, ctx
, nl_buf
,
870 sizeof(nl_buf
), true);
872 zlog_err("%s: netlink_nexthop_msg_encode failed",
877 nl_buf_len
= (size_t)rv
;
879 case DPLANE_OP_NH_INSTALL
:
880 case DPLANE_OP_NH_UPDATE
:
881 rv
= netlink_nexthop_msg_encode(RTM_NEWNEXTHOP
, ctx
, nl_buf
,
882 sizeof(nl_buf
), true);
884 zlog_err("%s: netlink_nexthop_msg_encode failed",
889 nl_buf_len
= (size_t)rv
;
892 case DPLANE_OP_LSP_INSTALL
:
893 case DPLANE_OP_LSP_UPDATE
:
894 case DPLANE_OP_LSP_DELETE
:
895 rv
= netlink_lsp_msg_encoder(ctx
, nl_buf
, sizeof(nl_buf
));
897 zlog_err("%s: netlink_lsp_msg_encoder failed",
902 nl_buf_len
+= (size_t)rv
;
905 /* Un-handled by FPM at this time. */
906 case DPLANE_OP_PW_INSTALL
:
907 case DPLANE_OP_PW_UNINSTALL
:
908 case DPLANE_OP_ADDR_INSTALL
:
909 case DPLANE_OP_ADDR_UNINSTALL
:
910 case DPLANE_OP_NEIGH_INSTALL
:
911 case DPLANE_OP_NEIGH_UPDATE
:
912 case DPLANE_OP_NEIGH_DELETE
:
913 case DPLANE_OP_VTEP_ADD
:
914 case DPLANE_OP_VTEP_DELETE
:
915 case DPLANE_OP_SYS_ROUTE_ADD
:
916 case DPLANE_OP_SYS_ROUTE_DELETE
:
917 case DPLANE_OP_ROUTE_NOTIFY
:
918 case DPLANE_OP_LSP_NOTIFY
:
919 case DPLANE_OP_RULE_ADD
:
920 case DPLANE_OP_RULE_DELETE
:
921 case DPLANE_OP_RULE_UPDATE
:
922 case DPLANE_OP_NEIGH_DISCOVER
:
923 case DPLANE_OP_BR_PORT_UPDATE
:
924 case DPLANE_OP_IPTABLE_ADD
:
925 case DPLANE_OP_IPTABLE_DELETE
:
926 case DPLANE_OP_IPSET_ADD
:
927 case DPLANE_OP_IPSET_DELETE
:
928 case DPLANE_OP_IPSET_ENTRY_ADD
:
929 case DPLANE_OP_IPSET_ENTRY_DELETE
:
930 case DPLANE_OP_NEIGH_IP_INSTALL
:
931 case DPLANE_OP_NEIGH_IP_DELETE
:
932 case DPLANE_OP_NEIGH_TABLE_UPDATE
:
933 case DPLANE_OP_GRE_SET
:
934 case DPLANE_OP_INTF_ADDR_ADD
:
935 case DPLANE_OP_INTF_ADDR_DEL
:
936 case DPLANE_OP_INTF_NETCONFIG
:
937 case DPLANE_OP_INTF_INSTALL
:
938 case DPLANE_OP_INTF_UPDATE
:
939 case DPLANE_OP_INTF_DELETE
:
940 case DPLANE_OP_TC_QDISC_INSTALL
:
941 case DPLANE_OP_TC_QDISC_UNINSTALL
:
942 case DPLANE_OP_TC_CLASS_ADD
:
943 case DPLANE_OP_TC_CLASS_DELETE
:
944 case DPLANE_OP_TC_CLASS_UPDATE
:
945 case DPLANE_OP_TC_FILTER_ADD
:
946 case DPLANE_OP_TC_FILTER_DELETE
:
947 case DPLANE_OP_TC_FILTER_UPDATE
:
953 /* Skip empty enqueues. */
957 /* We must know if someday a message goes beyond 65KiB. */
958 assert((nl_buf_len
+ FPM_HEADER_SIZE
) <= UINT16_MAX
);
960 /* Check if we have enough buffer space. */
961 if (STREAM_WRITEABLE(fnc
->obuf
) < (nl_buf_len
+ FPM_HEADER_SIZE
)) {
962 atomic_fetch_add_explicit(&fnc
->counters
.buffer_full
, 1,
963 memory_order_relaxed
);
965 if (IS_ZEBRA_DEBUG_FPM
)
967 "%s: buffer full: wants to write %zu but has %zu",
968 __func__
, nl_buf_len
+ FPM_HEADER_SIZE
,
969 STREAM_WRITEABLE(fnc
->obuf
));
975 * Fill in the FPM header information.
977 * See FPM_HEADER_SIZE definition for more information.
979 stream_putc(fnc
->obuf
, 1);
980 stream_putc(fnc
->obuf
, 1);
981 stream_putw(fnc
->obuf
, nl_buf_len
+ FPM_HEADER_SIZE
);
983 /* Write current data. */
984 stream_write(fnc
->obuf
, nl_buf
, (size_t)nl_buf_len
);
986 /* Account number of bytes waiting to be written. */
987 atomic_fetch_add_explicit(&fnc
->counters
.obuf_bytes
,
988 nl_buf_len
+ FPM_HEADER_SIZE
,
989 memory_order_relaxed
);
990 obytes
= atomic_load_explicit(&fnc
->counters
.obuf_bytes
,
991 memory_order_relaxed
);
992 obytes_peak
= atomic_load_explicit(&fnc
->counters
.obuf_peak
,
993 memory_order_relaxed
);
994 if (obytes_peak
< obytes
)
995 atomic_store_explicit(&fnc
->counters
.obuf_peak
, obytes
,
996 memory_order_relaxed
);
998 /* Tell the thread to start writing. */
999 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, fnc
->socket
,
1006 * LSP walk/send functions
1008 struct fpm_lsp_arg
{
1009 struct zebra_dplane_ctx
*ctx
;
1010 struct fpm_nl_ctx
*fnc
;
1014 static int fpm_lsp_send_cb(struct hash_bucket
*bucket
, void *arg
)
1016 struct zebra_lsp
*lsp
= bucket
->data
;
1017 struct fpm_lsp_arg
*fla
= arg
;
1019 /* Skip entries which have already been sent */
1020 if (CHECK_FLAG(lsp
->flags
, LSP_FLAG_FPM
))
1021 return HASHWALK_CONTINUE
;
1023 dplane_ctx_reset(fla
->ctx
);
1024 dplane_ctx_lsp_init(fla
->ctx
, DPLANE_OP_LSP_INSTALL
, lsp
);
1026 if (fpm_nl_enqueue(fla
->fnc
, fla
->ctx
) == -1) {
1027 fla
->complete
= false;
1028 return HASHWALK_ABORT
;
1031 /* Mark entry as sent */
1032 SET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
1033 return HASHWALK_CONTINUE
;
1036 static void fpm_lsp_send(struct thread
*t
)
1038 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1039 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
1040 struct fpm_lsp_arg fla
;
1043 fla
.ctx
= dplane_ctx_alloc();
1044 fla
.complete
= true;
1046 hash_walk(zvrf
->lsp_table
, fpm_lsp_send_cb
, &fla
);
1048 dplane_ctx_fini(&fla
.ctx
);
1051 WALK_FINISH(fnc
, FNE_LSP_FINISHED
);
1053 /* Now move onto routes */
1054 thread_add_timer(zrouter
.master
, fpm_nhg_reset
, fnc
, 0,
1057 /* Didn't finish - reschedule LSP walk */
1058 thread_add_timer(zrouter
.master
, fpm_lsp_send
, fnc
, 0,
1064 * Next hop walk/send functions.
1066 struct fpm_nhg_arg
{
1067 struct zebra_dplane_ctx
*ctx
;
1068 struct fpm_nl_ctx
*fnc
;
1072 static int fpm_nhg_send_cb(struct hash_bucket
*bucket
, void *arg
)
1074 struct nhg_hash_entry
*nhe
= bucket
->data
;
1075 struct fpm_nhg_arg
*fna
= arg
;
1077 /* This entry was already sent, skip it. */
1078 if (CHECK_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
))
1079 return HASHWALK_CONTINUE
;
1081 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
1082 dplane_ctx_reset(fna
->ctx
);
1083 dplane_ctx_nexthop_init(fna
->ctx
, DPLANE_OP_NH_INSTALL
, nhe
);
1084 if (fpm_nl_enqueue(fna
->fnc
, fna
->ctx
) == -1) {
1085 /* Our buffers are full, lets give it some cycles. */
1086 fna
->complete
= false;
1087 return HASHWALK_ABORT
;
1090 /* Mark group as sent, so it doesn't get sent again. */
1091 SET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
1093 return HASHWALK_CONTINUE
;
1096 static void fpm_nhg_send(struct thread
*t
)
1098 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1099 struct fpm_nhg_arg fna
;
1102 fna
.ctx
= dplane_ctx_alloc();
1103 fna
.complete
= true;
1105 /* Send next hops. */
1107 hash_walk(zrouter
.nhgs_id
, fpm_nhg_send_cb
, &fna
);
1109 /* `free()` allocated memory. */
1110 dplane_ctx_fini(&fna
.ctx
);
1112 /* We are done sending next hops, lets install the routes now. */
1114 WALK_FINISH(fnc
, FNE_NHG_FINISHED
);
1115 thread_add_timer(zrouter
.master
, fpm_rib_reset
, fnc
, 0,
1117 } else /* Otherwise reschedule next hop group again. */
1118 thread_add_timer(zrouter
.master
, fpm_nhg_send
, fnc
, 0,
1123 * Send all RIB installed routes to the connected data plane.
1125 static void fpm_rib_send(struct thread
*t
)
1127 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1129 struct route_node
*rn
;
1130 struct route_table
*rt
;
1131 struct zebra_dplane_ctx
*ctx
;
1132 rib_tables_iter_t rt_iter
;
1134 /* Allocate temporary context for all transactions. */
1135 ctx
= dplane_ctx_alloc();
1137 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1138 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1139 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1140 dest
= rib_dest_from_rnode(rn
);
1141 /* Skip bad route entries. */
1142 if (dest
== NULL
|| dest
->selected_fib
== NULL
)
1145 /* Check for already sent routes. */
1146 if (CHECK_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
))
1149 /* Enqueue route install. */
1150 dplane_ctx_reset(ctx
);
1151 dplane_ctx_route_init(ctx
, DPLANE_OP_ROUTE_INSTALL
, rn
,
1152 dest
->selected_fib
);
1153 if (fpm_nl_enqueue(fnc
, ctx
) == -1) {
1154 /* Free the temporary allocated context. */
1155 dplane_ctx_fini(&ctx
);
1157 thread_add_timer(zrouter
.master
, fpm_rib_send
,
1158 fnc
, 1, &fnc
->t_ribwalk
);
1163 SET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1167 /* Free the temporary allocated context. */
1168 dplane_ctx_fini(&ctx
);
1170 /* All RIB routes sent! */
1171 WALK_FINISH(fnc
, FNE_RIB_FINISHED
);
1173 /* Schedule next event: RMAC reset. */
1174 thread_add_event(zrouter
.master
, fpm_rmac_reset
, fnc
, 0,
1179 * The next three functions will handle RMAC enqueue.
1181 struct fpm_rmac_arg
{
1182 struct zebra_dplane_ctx
*ctx
;
1183 struct fpm_nl_ctx
*fnc
;
1184 struct zebra_l3vni
*zl3vni
;
1188 static void fpm_enqueue_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1190 struct fpm_rmac_arg
*fra
= arg
;
1191 struct zebra_mac
*zrmac
= bucket
->data
;
1192 struct zebra_if
*zif
= fra
->zl3vni
->vxlan_if
->info
;
1193 struct zebra_vxlan_vni
*vni
;
1194 struct zebra_if
*br_zif
;
1198 /* Entry already sent. */
1199 if (CHECK_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
) || !fra
->complete
)
1202 sticky
= !!CHECK_FLAG(zrmac
->flags
,
1203 (ZEBRA_MAC_STICKY
| ZEBRA_MAC_REMOTE_DEF_GW
));
1204 br_zif
= (struct zebra_if
*)(zif
->brslave_info
.br_if
->info
);
1205 vni
= zebra_vxlan_if_vni_find(zif
, fra
->zl3vni
->vni
);
1206 vid
= IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif
) ? vni
->access_vlan
: 0;
1208 dplane_ctx_reset(fra
->ctx
);
1209 dplane_ctx_set_op(fra
->ctx
, DPLANE_OP_MAC_INSTALL
);
1210 dplane_mac_init(fra
->ctx
, fra
->zl3vni
->vxlan_if
,
1211 zif
->brslave_info
.br_if
, vid
, &zrmac
->macaddr
, vni
->vni
,
1212 zrmac
->fwd_info
.r_vtep_ip
, sticky
, 0 /*nhg*/,
1213 0 /*update_flags*/);
1214 if (fpm_nl_enqueue(fra
->fnc
, fra
->ctx
) == -1) {
1215 thread_add_timer(zrouter
.master
, fpm_rmac_send
,
1216 fra
->fnc
, 1, &fra
->fnc
->t_rmacwalk
);
1217 fra
->complete
= false;
1221 static void fpm_enqueue_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1223 struct fpm_rmac_arg
*fra
= arg
;
1224 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1226 fra
->zl3vni
= zl3vni
;
1227 hash_iterate(zl3vni
->rmac_table
, fpm_enqueue_rmac_table
, zl3vni
);
1230 static void fpm_rmac_send(struct thread
*t
)
1232 struct fpm_rmac_arg fra
;
1234 fra
.fnc
= THREAD_ARG(t
);
1235 fra
.ctx
= dplane_ctx_alloc();
1236 fra
.complete
= true;
1237 hash_iterate(zrouter
.l3vni_table
, fpm_enqueue_l3vni_table
, &fra
);
1238 dplane_ctx_fini(&fra
.ctx
);
1240 /* RMAC walk completed. */
1242 WALK_FINISH(fra
.fnc
, FNE_RMAC_FINISHED
);
1246 * Resets the next hop FPM flags so we send all next hops again.
1248 static void fpm_nhg_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1250 struct nhg_hash_entry
*nhe
= bucket
->data
;
1252 /* Unset FPM installation flag so it gets installed again. */
1253 UNSET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
1256 static void fpm_nhg_reset(struct thread
*t
)
1258 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1260 hash_iterate(zrouter
.nhgs_id
, fpm_nhg_reset_cb
, NULL
);
1262 /* Schedule next step: send next hop groups. */
1263 thread_add_event(zrouter
.master
, fpm_nhg_send
, fnc
, 0, &fnc
->t_nhgwalk
);
1267 * Resets the LSP FPM flag so we send all LSPs again.
1269 static void fpm_lsp_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1271 struct zebra_lsp
*lsp
= bucket
->data
;
1273 UNSET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
1276 static void fpm_lsp_reset(struct thread
*t
)
1278 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1279 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
1281 hash_iterate(zvrf
->lsp_table
, fpm_lsp_reset_cb
, NULL
);
1283 /* Schedule next step: send LSPs */
1284 thread_add_event(zrouter
.master
, fpm_lsp_send
, fnc
, 0, &fnc
->t_lspwalk
);
1288 * Resets the RIB FPM flags so we send all routes again.
1290 static void fpm_rib_reset(struct thread
*t
)
1292 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1294 struct route_node
*rn
;
1295 struct route_table
*rt
;
1296 rib_tables_iter_t rt_iter
;
1298 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1299 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1300 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1301 dest
= rib_dest_from_rnode(rn
);
1302 /* Skip bad route entries. */
1306 UNSET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1310 /* Schedule next step: send RIB routes. */
1311 thread_add_event(zrouter
.master
, fpm_rib_send
, fnc
, 0, &fnc
->t_ribwalk
);
1315 * The next three function will handle RMAC table reset.
1317 static void fpm_unset_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1319 struct zebra_mac
*zrmac
= bucket
->data
;
1321 UNSET_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
);
1324 static void fpm_unset_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1326 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1328 hash_iterate(zl3vni
->rmac_table
, fpm_unset_rmac_table
, zl3vni
);
1331 static void fpm_rmac_reset(struct thread
*t
)
1333 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1335 hash_iterate(zrouter
.l3vni_table
, fpm_unset_l3vni_table
, NULL
);
1337 /* Schedule next event: send RMAC entries. */
1338 thread_add_event(zrouter
.master
, fpm_rmac_send
, fnc
, 0,
1342 static void fpm_process_queue(struct thread
*t
)
1344 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1345 struct zebra_dplane_ctx
*ctx
;
1346 bool no_bufs
= false;
1347 uint64_t processed_contexts
= 0;
1350 /* No space available yet. */
1351 if (STREAM_WRITEABLE(fnc
->obuf
) < NL_PKT_BUF_SIZE
) {
1356 /* Dequeue next item or quit processing. */
1357 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1358 ctx
= dplane_ctx_dequeue(&fnc
->ctxqueue
);
1364 * Intentionally ignoring the return value
1365 * as that we are ensuring that we can write to
1366 * the output data in the STREAM_WRITEABLE
1367 * check above, so we can ignore the return
1369 if (fnc
->socket
!= -1)
1370 (void)fpm_nl_enqueue(fnc
, ctx
);
1372 /* Account the processed entries. */
1373 processed_contexts
++;
1374 atomic_fetch_sub_explicit(&fnc
->counters
.ctxqueue_len
, 1,
1375 memory_order_relaxed
);
1377 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1378 dplane_provider_enqueue_out_ctx(fnc
->prov
, ctx
);
1381 /* Update count of processed contexts */
1382 atomic_fetch_add_explicit(&fnc
->counters
.dplane_contexts
,
1383 processed_contexts
, memory_order_relaxed
);
1385 /* Re-schedule if we ran out of buffer space */
1387 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1388 fnc
, 0, &fnc
->t_dequeue
);
1391 * Let the dataplane thread know if there are items in the
1392 * output queue to be processed. Otherwise they may sit
1393 * until the dataplane thread gets scheduled for new,
1396 if (dplane_provider_out_ctx_queue_len(fnc
->prov
) > 0)
1397 dplane_provider_work_ready();
1401 * Handles external (e.g. CLI, data plane or others) events.
1403 static void fpm_process_event(struct thread
*t
)
1405 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1406 enum fpm_nl_events event
= THREAD_VAL(t
);
1410 zlog_info("%s: manual FPM disable event", __func__
);
1411 fnc
->disabled
= true;
1412 atomic_fetch_add_explicit(&fnc
->counters
.user_disables
, 1,
1413 memory_order_relaxed
);
1415 /* Call reconnect to disable timers and clean up context. */
1420 zlog_info("%s: manual FPM reconnect event", __func__
);
1421 fnc
->disabled
= false;
1422 atomic_fetch_add_explicit(&fnc
->counters
.user_configures
, 1,
1423 memory_order_relaxed
);
1427 case FNE_RESET_COUNTERS
:
1428 zlog_info("%s: manual FPM counters reset event", __func__
);
1429 memset(&fnc
->counters
, 0, sizeof(fnc
->counters
));
1432 case FNE_TOGGLE_NHG
:
1433 zlog_info("%s: toggle next hop groups support", __func__
);
1434 fnc
->use_nhg
= !fnc
->use_nhg
;
1438 case FNE_INTERNAL_RECONNECT
:
1442 case FNE_NHG_FINISHED
:
1443 if (IS_ZEBRA_DEBUG_FPM
)
1444 zlog_debug("%s: next hop groups walk finished",
1447 case FNE_RIB_FINISHED
:
1448 if (IS_ZEBRA_DEBUG_FPM
)
1449 zlog_debug("%s: RIB walk finished", __func__
);
1451 case FNE_RMAC_FINISHED
:
1452 if (IS_ZEBRA_DEBUG_FPM
)
1453 zlog_debug("%s: RMAC walk finished", __func__
);
1455 case FNE_LSP_FINISHED
:
1456 if (IS_ZEBRA_DEBUG_FPM
)
1457 zlog_debug("%s: LSP walk finished", __func__
);
1463 * Data plane functions.
1465 static int fpm_nl_start(struct zebra_dplane_provider
*prov
)
1467 struct fpm_nl_ctx
*fnc
;
1469 fnc
= dplane_provider_get_data(prov
);
1470 fnc
->fthread
= frr_pthread_new(NULL
, prov_name
, prov_name
);
1471 assert(frr_pthread_run(fnc
->fthread
, NULL
) == 0);
1472 fnc
->ibuf
= stream_new(NL_PKT_BUF_SIZE
);
1473 fnc
->obuf
= stream_new(NL_PKT_BUF_SIZE
* 128);
1474 pthread_mutex_init(&fnc
->obuf_mutex
, NULL
);
1476 fnc
->disabled
= true;
1478 dplane_ctx_q_init(&fnc
->ctxqueue
);
1479 pthread_mutex_init(&fnc
->ctxqueue_mutex
, NULL
);
1481 /* Set default values. */
1482 fnc
->use_nhg
= true;
1487 static int fpm_nl_finish_early(struct fpm_nl_ctx
*fnc
)
1489 /* Disable all events and close socket. */
1490 THREAD_OFF(fnc
->t_lspreset
);
1491 THREAD_OFF(fnc
->t_lspwalk
);
1492 THREAD_OFF(fnc
->t_nhgreset
);
1493 THREAD_OFF(fnc
->t_nhgwalk
);
1494 THREAD_OFF(fnc
->t_ribreset
);
1495 THREAD_OFF(fnc
->t_ribwalk
);
1496 THREAD_OFF(fnc
->t_rmacreset
);
1497 THREAD_OFF(fnc
->t_rmacwalk
);
1498 THREAD_OFF(fnc
->t_event
);
1499 THREAD_OFF(fnc
->t_nhg
);
1500 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_read
, NULL
);
1501 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_write
, NULL
);
1502 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_connect
, NULL
);
1504 if (fnc
->socket
!= -1) {
1512 static int fpm_nl_finish_late(struct fpm_nl_ctx
*fnc
)
1514 /* Stop the running thread. */
1515 frr_pthread_stop(fnc
->fthread
, NULL
);
1517 /* Free all allocated resources. */
1518 pthread_mutex_destroy(&fnc
->obuf_mutex
);
1519 pthread_mutex_destroy(&fnc
->ctxqueue_mutex
);
1520 stream_free(fnc
->ibuf
);
1521 stream_free(fnc
->obuf
);
/*
 * Provider finish callback; dispatches to the early (event teardown)
 * or late (resource release) phase.
 */
static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	if (early)
		return fpm_nl_finish_early(fnc);

	return fpm_nl_finish_late(fnc);
}
1539 static int fpm_nl_process(struct zebra_dplane_provider
*prov
)
1541 struct zebra_dplane_ctx
*ctx
;
1542 struct fpm_nl_ctx
*fnc
;
1544 uint64_t cur_queue
, peak_queue
= 0, stored_peak_queue
;
1546 fnc
= dplane_provider_get_data(prov
);
1547 limit
= dplane_provider_get_work_limit(prov
);
1548 for (counter
= 0; counter
< limit
; counter
++) {
1549 ctx
= dplane_provider_dequeue_in_ctx(prov
);
1554 * Skip all notifications if not connected, we'll walk the RIB
1557 if (fnc
->socket
!= -1 && fnc
->connecting
== false) {
1559 * Update the number of queued contexts *before*
1560 * enqueueing, to ensure counter consistency.
1562 atomic_fetch_add_explicit(&fnc
->counters
.ctxqueue_len
,
1563 1, memory_order_relaxed
);
1565 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1566 dplane_ctx_enqueue_tail(&fnc
->ctxqueue
, ctx
);
1569 cur_queue
= atomic_load_explicit(
1570 &fnc
->counters
.ctxqueue_len
,
1571 memory_order_relaxed
);
1572 if (peak_queue
< cur_queue
)
1573 peak_queue
= cur_queue
;
1577 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1578 dplane_provider_enqueue_out_ctx(prov
, ctx
);
1581 /* Update peak queue length, if we just observed a new peak */
1582 stored_peak_queue
= atomic_load_explicit(
1583 &fnc
->counters
.ctxqueue_len_peak
, memory_order_relaxed
);
1584 if (stored_peak_queue
< peak_queue
)
1585 atomic_store_explicit(&fnc
->counters
.ctxqueue_len_peak
,
1586 peak_queue
, memory_order_relaxed
);
1588 if (atomic_load_explicit(&fnc
->counters
.ctxqueue_len
,
1589 memory_order_relaxed
)
1591 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1592 fnc
, 0, &fnc
->t_dequeue
);
1594 /* Ensure dataplane thread is rescheduled if we hit the work limit */
1595 if (counter
>= limit
)
1596 dplane_provider_work_ready();
1601 static int fpm_nl_new(struct thread_master
*tm
)
1603 struct zebra_dplane_provider
*prov
= NULL
;
1606 gfnc
= calloc(1, sizeof(*gfnc
));
1607 rv
= dplane_provider_register(prov_name
, DPLANE_PRIO_POSTPROCESS
,
1608 DPLANE_PROV_FLAG_THREADED
, fpm_nl_start
,
1609 fpm_nl_process
, fpm_nl_finish
, gfnc
,
1612 if (IS_ZEBRA_DEBUG_DPLANE
)
1613 zlog_debug("%s register status: %d", prov_name
, rv
);
1615 install_node(&fpm_node
);
1616 install_element(ENABLE_NODE
, &fpm_show_counters_cmd
);
1617 install_element(ENABLE_NODE
, &fpm_show_counters_json_cmd
);
1618 install_element(ENABLE_NODE
, &fpm_reset_counters_cmd
);
1619 install_element(CONFIG_NODE
, &fpm_set_address_cmd
);
1620 install_element(CONFIG_NODE
, &no_fpm_set_address_cmd
);
1621 install_element(CONFIG_NODE
, &fpm_use_nhg_cmd
);
1622 install_element(CONFIG_NODE
, &no_fpm_use_nhg_cmd
);
1627 static int fpm_nl_init(void)
1629 hook_register(frr_late_init
, fpm_nl_new
);
1634 .name
= "dplane_fpm_nl",
1636 .description
= "Data plane plugin for FPM using netlink.",
1637 .init
= fpm_nl_init
,