2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "config.h" /* Include this explicitly */
26 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
34 #include "lib/zebra.h"
36 #include "lib/libfrr.h"
37 #include "lib/frratomic.h"
38 #include "lib/command.h"
39 #include "lib/memory.h"
40 #include "lib/network.h"
42 #include "lib/frr_pthread.h"
43 #include "zebra/debug.h"
44 #include "zebra/interface.h"
45 #include "zebra/zebra_dplane.h"
46 #include "zebra/zebra_mpls.h"
47 #include "zebra/zebra_router.h"
48 #include "zebra/zebra_evpn.h"
49 #include "zebra/zebra_evpn_mac.h"
50 #include "zebra/zebra_vxlan_private.h"
51 #include "zebra/kernel_netlink.h"
52 #include "zebra/rt_netlink.h"
53 #include "zebra/debug.h"
55 #define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
56 #define SOUTHBOUND_DEFAULT_PORT 2620
61 * version: 1 byte (always 1),
62 * type: 1 byte (1 for netlink, 2 protobuf),
63 * len: 2 bytes (network order),
66 * This header is used with any format to tell the users how many bytes to
69 #define FPM_HEADER_SIZE 4
/* Provider name used for registration, logging and the pthread name. */
static const char *prov_name = "dplane_fpm_nl";
74 /* data plane connection. */
79 struct sockaddr_storage addr
;
81 /* data plane buffers. */
84 pthread_mutex_t obuf_mutex
;
87 * data plane context queue:
88 * When a FPM server connection becomes a bottleneck, we must keep the
89 * data plane contexts until we get a chance to process them.
91 struct dplane_ctx_q ctxqueue
;
92 pthread_mutex_t ctxqueue_mutex
;
94 /* data plane events. */
95 struct zebra_dplane_provider
*prov
;
96 struct frr_pthread
*fthread
;
97 struct thread
*t_connect
;
98 struct thread
*t_read
;
99 struct thread
*t_write
;
100 struct thread
*t_event
;
101 struct thread
*t_dequeue
;
104 struct thread
*t_lspreset
;
105 struct thread
*t_lspwalk
;
106 struct thread
*t_nhgreset
;
107 struct thread
*t_nhgwalk
;
108 struct thread
*t_ribreset
;
109 struct thread
*t_ribwalk
;
110 struct thread
*t_rmacreset
;
111 struct thread
*t_rmacwalk
;
113 /* Statistic counters. */
115 /* Amount of bytes read into ibuf. */
116 _Atomic
uint32_t bytes_read
;
117 /* Amount of bytes written from obuf. */
118 _Atomic
uint32_t bytes_sent
;
119 /* Output buffer current usage. */
120 _Atomic
uint32_t obuf_bytes
;
121 /* Output buffer peak usage. */
122 _Atomic
uint32_t obuf_peak
;
124 /* Amount of connection closes. */
125 _Atomic
uint32_t connection_closes
;
126 /* Amount of connection errors. */
127 _Atomic
uint32_t connection_errors
;
129 /* Amount of user configurations: FNE_RECONNECT. */
130 _Atomic
uint32_t user_configures
;
131 /* Amount of user disable requests: FNE_DISABLE. */
132 _Atomic
uint32_t user_disables
;
134 /* Amount of data plane context processed. */
135 _Atomic
uint32_t dplane_contexts
;
136 /* Amount of data plane contexts enqueued. */
137 _Atomic
uint32_t ctxqueue_len
;
138 /* Peak amount of data plane contexts enqueued. */
139 _Atomic
uint32_t ctxqueue_len_peak
;
141 /* Amount of buffer full events. */
142 _Atomic
uint32_t buffer_full
;
147 /* Ask for FPM to reconnect the external server. */
151 /* Reset counters. */
153 /* Toggle next hop group feature. */
155 /* Reconnect request by our own code to avoid races. */
156 FNE_INTERNAL_RECONNECT
,
158 /* LSP walk finished. */
160 /* Next hop groups walk finished. */
162 /* RIB walk finished. */
164 /* RMAC walk finished. */
168 #define FPM_RECONNECT(fnc) \
169 thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
170 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)
172 #define WALK_FINISH(fnc, ev) \
173 thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
179 static void fpm_process_event(struct thread
*t
);
180 static int fpm_nl_enqueue(struct fpm_nl_ctx
*fnc
, struct zebra_dplane_ctx
*ctx
);
181 static void fpm_lsp_send(struct thread
*t
);
182 static void fpm_lsp_reset(struct thread
*t
);
183 static void fpm_nhg_send(struct thread
*t
);
184 static void fpm_nhg_reset(struct thread
*t
);
185 static void fpm_rib_send(struct thread
*t
);
186 static void fpm_rib_reset(struct thread
*t
);
187 static void fpm_rmac_send(struct thread
*t
);
188 static void fpm_rmac_reset(struct thread
*t
);
193 #define FPM_STR "Forwarding Plane Manager configuration\n"
195 DEFUN(fpm_set_address
, fpm_set_address_cmd
,
196 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
198 "FPM remote listening server address\n"
199 "Remote IPv4 FPM server\n"
200 "Remote IPv6 FPM server\n"
201 "FPM remote listening server port\n"
202 "Remote FPM server port\n")
204 struct sockaddr_in
*sin
;
205 struct sockaddr_in6
*sin6
;
207 uint8_t naddr
[INET6_BUFSIZ
];
210 port
= strtol(argv
[4]->arg
, NULL
, 10);
212 /* Handle IPv4 addresses. */
213 if (inet_pton(AF_INET
, argv
[2]->arg
, naddr
) == 1) {
214 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
216 memset(sin
, 0, sizeof(*sin
));
217 sin
->sin_family
= AF_INET
;
219 port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
220 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
221 sin
->sin_len
= sizeof(*sin
);
222 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
223 memcpy(&sin
->sin_addr
, naddr
, sizeof(sin
->sin_addr
));
228 /* Handle IPv6 addresses. */
229 if (inet_pton(AF_INET6
, argv
[2]->arg
, naddr
) != 1) {
230 vty_out(vty
, "%% Invalid address: %s\n", argv
[2]->arg
);
234 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
235 memset(sin6
, 0, sizeof(*sin6
));
236 sin6
->sin6_family
= AF_INET6
;
237 sin6
->sin6_port
= port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
238 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
239 sin6
->sin6_len
= sizeof(*sin6
);
240 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
241 memcpy(&sin6
->sin6_addr
, naddr
, sizeof(sin6
->sin6_addr
));
244 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
245 FNE_RECONNECT
, &gfnc
->t_event
);
249 DEFUN(no_fpm_set_address
, no_fpm_set_address_cmd
,
250 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
253 "FPM remote listening server address\n"
254 "Remote IPv4 FPM server\n"
255 "Remote IPv6 FPM server\n"
256 "FPM remote listening server port\n"
257 "Remote FPM server port\n")
259 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
260 FNE_DISABLE
, &gfnc
->t_event
);
264 DEFUN(fpm_use_nhg
, fpm_use_nhg_cmd
,
265 "fpm use-next-hop-groups",
267 "Use netlink next hop groups feature.\n")
269 /* Already enabled. */
273 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
274 FNE_TOGGLE_NHG
, &gfnc
->t_event
);
279 DEFUN(no_fpm_use_nhg
, no_fpm_use_nhg_cmd
,
280 "no fpm use-next-hop-groups",
283 "Use netlink next hop groups feature.\n")
285 /* Already disabled. */
289 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
290 FNE_TOGGLE_NHG
, &gfnc
->t_event
);
295 DEFUN(fpm_reset_counters
, fpm_reset_counters_cmd
,
296 "clear fpm counters",
299 "FPM statistic counters\n")
301 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
302 FNE_RESET_COUNTERS
, &gfnc
->t_event
);
306 DEFUN(fpm_show_counters
, fpm_show_counters_cmd
,
310 "FPM statistic counters\n")
312 vty_out(vty
, "%30s\n%30s\n", "FPM counters", "============");
314 #define SHOW_COUNTER(label, counter) \
315 vty_out(vty, "%28s: %u\n", (label), (counter))
317 SHOW_COUNTER("Input bytes", gfnc
->counters
.bytes_read
);
318 SHOW_COUNTER("Output bytes", gfnc
->counters
.bytes_sent
);
319 SHOW_COUNTER("Output buffer current size", gfnc
->counters
.obuf_bytes
);
320 SHOW_COUNTER("Output buffer peak size", gfnc
->counters
.obuf_peak
);
321 SHOW_COUNTER("Connection closes", gfnc
->counters
.connection_closes
);
322 SHOW_COUNTER("Connection errors", gfnc
->counters
.connection_errors
);
323 SHOW_COUNTER("Data plane items processed",
324 gfnc
->counters
.dplane_contexts
);
325 SHOW_COUNTER("Data plane items enqueued",
326 gfnc
->counters
.ctxqueue_len
);
327 SHOW_COUNTER("Data plane items queue peak",
328 gfnc
->counters
.ctxqueue_len_peak
);
329 SHOW_COUNTER("Buffer full hits", gfnc
->counters
.buffer_full
);
330 SHOW_COUNTER("User FPM configurations", gfnc
->counters
.user_configures
);
331 SHOW_COUNTER("User FPM disable requests", gfnc
->counters
.user_disables
);
338 DEFUN(fpm_show_counters_json
, fpm_show_counters_json_cmd
,
339 "show fpm counters json",
342 "FPM statistic counters\n"
345 struct json_object
*jo
;
347 jo
= json_object_new_object();
348 json_object_int_add(jo
, "bytes-read", gfnc
->counters
.bytes_read
);
349 json_object_int_add(jo
, "bytes-sent", gfnc
->counters
.bytes_sent
);
350 json_object_int_add(jo
, "obuf-bytes", gfnc
->counters
.obuf_bytes
);
351 json_object_int_add(jo
, "obuf-bytes-peak", gfnc
->counters
.obuf_peak
);
352 json_object_int_add(jo
, "connection-closes",
353 gfnc
->counters
.connection_closes
);
354 json_object_int_add(jo
, "connection-errors",
355 gfnc
->counters
.connection_errors
);
356 json_object_int_add(jo
, "data-plane-contexts",
357 gfnc
->counters
.dplane_contexts
);
358 json_object_int_add(jo
, "data-plane-contexts-queue",
359 gfnc
->counters
.ctxqueue_len
);
360 json_object_int_add(jo
, "data-plane-contexts-queue-peak",
361 gfnc
->counters
.ctxqueue_len_peak
);
362 json_object_int_add(jo
, "buffer-full-hits", gfnc
->counters
.buffer_full
);
363 json_object_int_add(jo
, "user-configures",
364 gfnc
->counters
.user_configures
);
365 json_object_int_add(jo
, "user-disables", gfnc
->counters
.user_disables
);
371 static int fpm_write_config(struct vty
*vty
)
373 struct sockaddr_in
*sin
;
374 struct sockaddr_in6
*sin6
;
380 switch (gfnc
->addr
.ss_family
) {
383 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
384 vty_out(vty
, "fpm address %pI4", &sin
->sin_addr
);
385 if (sin
->sin_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
386 vty_out(vty
, " port %d", ntohs(sin
->sin_port
));
392 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
393 vty_out(vty
, "fpm address %pI6", &sin6
->sin6_addr
);
394 if (sin6
->sin6_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
395 vty_out(vty
, " port %d", ntohs(sin6
->sin6_port
));
404 if (!gfnc
->use_nhg
) {
405 vty_out(vty
, "no fpm use-next-hop-groups\n");
412 static struct cmd_node fpm_node
= {
416 .config_write
= fpm_write_config
,
422 static void fpm_connect(struct thread
*t
);
424 static void fpm_reconnect(struct fpm_nl_ctx
*fnc
)
426 /* Cancel all zebra threads first. */
427 thread_cancel_async(zrouter
.master
, &fnc
->t_lspreset
, NULL
);
428 thread_cancel_async(zrouter
.master
, &fnc
->t_lspwalk
, NULL
);
429 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgreset
, NULL
);
430 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgwalk
, NULL
);
431 thread_cancel_async(zrouter
.master
, &fnc
->t_ribreset
, NULL
);
432 thread_cancel_async(zrouter
.master
, &fnc
->t_ribwalk
, NULL
);
433 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacreset
, NULL
);
434 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacwalk
, NULL
);
437 * Grab the lock to empty the streams (data plane might try to
438 * enqueue updates while we are closing).
440 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
442 /* Avoid calling close on `-1`. */
443 if (fnc
->socket
!= -1) {
448 stream_reset(fnc
->ibuf
);
449 stream_reset(fnc
->obuf
);
450 THREAD_OFF(fnc
->t_read
);
451 THREAD_OFF(fnc
->t_write
);
453 /* FPM is disabled, don't attempt to connect. */
457 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
461 static void fpm_read(struct thread
*t
)
463 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
466 /* Let's ignore the input at the moment. */
467 rv
= stream_read_try(fnc
->ibuf
, fnc
->socket
,
468 STREAM_WRITEABLE(fnc
->ibuf
));
469 /* We've got an interruption. */
471 /* Schedule next read. */
472 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
,
473 fnc
->socket
, &fnc
->t_read
);
477 atomic_fetch_add_explicit(&fnc
->counters
.connection_closes
, 1,
478 memory_order_relaxed
);
480 if (IS_ZEBRA_DEBUG_FPM
)
481 zlog_debug("%s: connection closed", __func__
);
487 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
488 memory_order_relaxed
);
489 zlog_warn("%s: connection failure: %s", __func__
,
494 stream_reset(fnc
->ibuf
);
496 /* Account all bytes read. */
497 atomic_fetch_add_explicit(&fnc
->counters
.bytes_read
, rv
,
498 memory_order_relaxed
);
500 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, fnc
->socket
,
504 static void fpm_write(struct thread
*t
)
506 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
512 if (fnc
->connecting
== true) {
514 statuslen
= sizeof(status
);
516 rv
= getsockopt(fnc
->socket
, SOL_SOCKET
, SO_ERROR
, &status
,
518 if (rv
== -1 || status
!= 0) {
520 zlog_warn("%s: connection failed: %s", __func__
,
523 zlog_warn("%s: SO_ERROR failed: %s", __func__
,
526 atomic_fetch_add_explicit(
527 &fnc
->counters
.connection_errors
, 1,
528 memory_order_relaxed
);
534 fnc
->connecting
= false;
537 * Starting with LSPs walk all FPM objects, marking them
538 * as unsent and then replaying them.
540 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
543 /* Permit receiving messages now. */
544 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
,
545 fnc
->socket
, &fnc
->t_read
);
548 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
551 /* Stream is empty: reset pointers and return. */
552 if (STREAM_READABLE(fnc
->obuf
) == 0) {
553 stream_reset(fnc
->obuf
);
557 /* Try to write all at once. */
558 btotal
= stream_get_endp(fnc
->obuf
) -
559 stream_get_getp(fnc
->obuf
);
560 bwritten
= write(fnc
->socket
, stream_pnt(fnc
->obuf
), btotal
);
562 atomic_fetch_add_explicit(
563 &fnc
->counters
.connection_closes
, 1,
564 memory_order_relaxed
);
566 if (IS_ZEBRA_DEBUG_FPM
)
567 zlog_debug("%s: connection closed", __func__
);
570 if (bwritten
== -1) {
571 /* Attempt to continue if blocked by a signal. */
574 /* Receiver is probably slow, lets give it some time. */
575 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
578 atomic_fetch_add_explicit(
579 &fnc
->counters
.connection_errors
, 1,
580 memory_order_relaxed
);
581 zlog_warn("%s: connection failure: %s", __func__
,
588 /* Account all bytes sent. */
589 atomic_fetch_add_explicit(&fnc
->counters
.bytes_sent
, bwritten
,
590 memory_order_relaxed
);
592 /* Account number of bytes free. */
593 atomic_fetch_sub_explicit(&fnc
->counters
.obuf_bytes
, bwritten
,
594 memory_order_relaxed
);
596 stream_forward_getp(fnc
->obuf
, (size_t)bwritten
);
599 /* Stream is not empty yet, we must schedule more writes. */
600 if (STREAM_READABLE(fnc
->obuf
)) {
601 stream_pulldown(fnc
->obuf
);
602 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
,
603 fnc
->socket
, &fnc
->t_write
);
608 static void fpm_connect(struct thread
*t
)
610 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
611 struct sockaddr_in
*sin
= (struct sockaddr_in
*)&fnc
->addr
;
612 struct sockaddr_in6
*sin6
= (struct sockaddr_in6
*)&fnc
->addr
;
615 char addrstr
[INET6_ADDRSTRLEN
];
617 sock
= socket(fnc
->addr
.ss_family
, SOCK_STREAM
, 0);
619 zlog_err("%s: fpm socket failed: %s", __func__
,
621 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
626 set_nonblocking(sock
);
628 if (fnc
->addr
.ss_family
== AF_INET
) {
629 inet_ntop(AF_INET
, &sin
->sin_addr
, addrstr
, sizeof(addrstr
));
632 inet_ntop(AF_INET6
, &sin6
->sin6_addr
, addrstr
, sizeof(addrstr
));
633 slen
= sizeof(*sin6
);
636 if (IS_ZEBRA_DEBUG_FPM
)
637 zlog_debug("%s: attempting to connect to %s:%d", __func__
,
638 addrstr
, ntohs(sin
->sin_port
));
640 rv
= connect(sock
, (struct sockaddr
*)&fnc
->addr
, slen
);
641 if (rv
== -1 && errno
!= EINPROGRESS
) {
642 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
643 memory_order_relaxed
);
645 zlog_warn("%s: fpm connection failed: %s", __func__
,
647 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
652 fnc
->connecting
= (errno
== EINPROGRESS
);
654 if (!fnc
->connecting
)
655 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, sock
,
657 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, sock
,
661 * Starting with LSPs walk all FPM objects, marking them
662 * as unsent and then replaying them.
664 * If we are not connected, then delay the objects reset/send.
666 if (!fnc
->connecting
)
667 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
672 * Encode data plane operation context into netlink and enqueue it in the FPM
675 * @param fnc the netlink FPM context.
676 * @param ctx the data plane operation context data.
677 * @return 0 on success or -1 on not enough space.
679 static int fpm_nl_enqueue(struct fpm_nl_ctx
*fnc
, struct zebra_dplane_ctx
*ctx
)
681 uint8_t nl_buf
[NL_PKT_BUF_SIZE
];
684 uint64_t obytes
, obytes_peak
;
685 enum dplane_op_e op
= dplane_ctx_get_op(ctx
);
688 * If we were configured to not use next hop groups, then quit as soon
692 && (op
== DPLANE_OP_NH_DELETE
|| op
== DPLANE_OP_NH_INSTALL
693 || op
== DPLANE_OP_NH_UPDATE
))
698 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
701 case DPLANE_OP_ROUTE_UPDATE
:
702 case DPLANE_OP_ROUTE_DELETE
:
703 rv
= netlink_route_multipath_msg_encode(RTM_DELROUTE
, ctx
,
704 nl_buf
, sizeof(nl_buf
),
708 "%s: netlink_route_multipath_msg_encode failed",
713 nl_buf_len
= (size_t)rv
;
715 /* UPDATE operations need a INSTALL, otherwise just quit. */
716 if (op
== DPLANE_OP_ROUTE_DELETE
)
720 case DPLANE_OP_ROUTE_INSTALL
:
721 rv
= netlink_route_multipath_msg_encode(
722 RTM_NEWROUTE
, ctx
, &nl_buf
[nl_buf_len
],
723 sizeof(nl_buf
) - nl_buf_len
, true, fnc
->use_nhg
);
726 "%s: netlink_route_multipath_msg_encode failed",
731 nl_buf_len
+= (size_t)rv
;
734 case DPLANE_OP_MAC_INSTALL
:
735 case DPLANE_OP_MAC_DELETE
:
736 rv
= netlink_macfdb_update_ctx(ctx
, nl_buf
, sizeof(nl_buf
));
738 zlog_err("%s: netlink_macfdb_update_ctx failed",
743 nl_buf_len
= (size_t)rv
;
746 case DPLANE_OP_NH_DELETE
:
747 rv
= netlink_nexthop_msg_encode(RTM_DELNEXTHOP
, ctx
, nl_buf
,
750 zlog_err("%s: netlink_nexthop_msg_encode failed",
755 nl_buf_len
= (size_t)rv
;
757 case DPLANE_OP_NH_INSTALL
:
758 case DPLANE_OP_NH_UPDATE
:
759 rv
= netlink_nexthop_msg_encode(RTM_NEWNEXTHOP
, ctx
, nl_buf
,
762 zlog_err("%s: netlink_nexthop_msg_encode failed",
767 nl_buf_len
= (size_t)rv
;
770 case DPLANE_OP_LSP_INSTALL
:
771 case DPLANE_OP_LSP_UPDATE
:
772 case DPLANE_OP_LSP_DELETE
:
773 rv
= netlink_lsp_msg_encoder(ctx
, nl_buf
, sizeof(nl_buf
));
775 zlog_err("%s: netlink_lsp_msg_encoder failed",
780 nl_buf_len
+= (size_t)rv
;
783 /* Un-handled by FPM at this time. */
784 case DPLANE_OP_PW_INSTALL
:
785 case DPLANE_OP_PW_UNINSTALL
:
786 case DPLANE_OP_ADDR_INSTALL
:
787 case DPLANE_OP_ADDR_UNINSTALL
:
788 case DPLANE_OP_NEIGH_INSTALL
:
789 case DPLANE_OP_NEIGH_UPDATE
:
790 case DPLANE_OP_NEIGH_DELETE
:
791 case DPLANE_OP_VTEP_ADD
:
792 case DPLANE_OP_VTEP_DELETE
:
793 case DPLANE_OP_SYS_ROUTE_ADD
:
794 case DPLANE_OP_SYS_ROUTE_DELETE
:
795 case DPLANE_OP_ROUTE_NOTIFY
:
796 case DPLANE_OP_LSP_NOTIFY
:
797 case DPLANE_OP_RULE_ADD
:
798 case DPLANE_OP_RULE_DELETE
:
799 case DPLANE_OP_RULE_UPDATE
:
800 case DPLANE_OP_NEIGH_DISCOVER
:
801 case DPLANE_OP_BR_PORT_UPDATE
:
802 case DPLANE_OP_IPTABLE_ADD
:
803 case DPLANE_OP_IPTABLE_DELETE
:
804 case DPLANE_OP_IPSET_ADD
:
805 case DPLANE_OP_IPSET_DELETE
:
806 case DPLANE_OP_IPSET_ENTRY_ADD
:
807 case DPLANE_OP_IPSET_ENTRY_DELETE
:
808 case DPLANE_OP_NEIGH_IP_INSTALL
:
809 case DPLANE_OP_NEIGH_IP_DELETE
:
810 case DPLANE_OP_NEIGH_TABLE_UPDATE
:
811 case DPLANE_OP_GRE_SET
:
812 case DPLANE_OP_INTF_ADDR_ADD
:
813 case DPLANE_OP_INTF_ADDR_DEL
:
814 case DPLANE_OP_INTF_NETCONFIG
:
820 /* Skip empty enqueues. */
824 /* We must know if someday a message goes beyond 65KiB. */
825 assert((nl_buf_len
+ FPM_HEADER_SIZE
) <= UINT16_MAX
);
827 /* Check if we have enough buffer space. */
828 if (STREAM_WRITEABLE(fnc
->obuf
) < (nl_buf_len
+ FPM_HEADER_SIZE
)) {
829 atomic_fetch_add_explicit(&fnc
->counters
.buffer_full
, 1,
830 memory_order_relaxed
);
832 if (IS_ZEBRA_DEBUG_FPM
)
834 "%s: buffer full: wants to write %zu but has %zu",
835 __func__
, nl_buf_len
+ FPM_HEADER_SIZE
,
836 STREAM_WRITEABLE(fnc
->obuf
));
842 * Fill in the FPM header information.
844 * See FPM_HEADER_SIZE definition for more information.
846 stream_putc(fnc
->obuf
, 1);
847 stream_putc(fnc
->obuf
, 1);
848 stream_putw(fnc
->obuf
, nl_buf_len
+ FPM_HEADER_SIZE
);
850 /* Write current data. */
851 stream_write(fnc
->obuf
, nl_buf
, (size_t)nl_buf_len
);
853 /* Account number of bytes waiting to be written. */
854 atomic_fetch_add_explicit(&fnc
->counters
.obuf_bytes
,
855 nl_buf_len
+ FPM_HEADER_SIZE
,
856 memory_order_relaxed
);
857 obytes
= atomic_load_explicit(&fnc
->counters
.obuf_bytes
,
858 memory_order_relaxed
);
859 obytes_peak
= atomic_load_explicit(&fnc
->counters
.obuf_peak
,
860 memory_order_relaxed
);
861 if (obytes_peak
< obytes
)
862 atomic_store_explicit(&fnc
->counters
.obuf_peak
, obytes
,
863 memory_order_relaxed
);
865 /* Tell the thread to start writing. */
866 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, fnc
->socket
,
873 * LSP walk/send functions
876 struct zebra_dplane_ctx
*ctx
;
877 struct fpm_nl_ctx
*fnc
;
881 static int fpm_lsp_send_cb(struct hash_bucket
*bucket
, void *arg
)
883 struct zebra_lsp
*lsp
= bucket
->data
;
884 struct fpm_lsp_arg
*fla
= arg
;
886 /* Skip entries which have already been sent */
887 if (CHECK_FLAG(lsp
->flags
, LSP_FLAG_FPM
))
888 return HASHWALK_CONTINUE
;
890 dplane_ctx_reset(fla
->ctx
);
891 dplane_ctx_lsp_init(fla
->ctx
, DPLANE_OP_LSP_INSTALL
, lsp
);
893 if (fpm_nl_enqueue(fla
->fnc
, fla
->ctx
) == -1) {
894 fla
->complete
= false;
895 return HASHWALK_ABORT
;
898 /* Mark entry as sent */
899 SET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
900 return HASHWALK_CONTINUE
;
903 static void fpm_lsp_send(struct thread
*t
)
905 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
906 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
907 struct fpm_lsp_arg fla
;
910 fla
.ctx
= dplane_ctx_alloc();
913 hash_walk(zvrf
->lsp_table
, fpm_lsp_send_cb
, &fla
);
915 dplane_ctx_fini(&fla
.ctx
);
918 WALK_FINISH(fnc
, FNE_LSP_FINISHED
);
920 /* Now move onto routes */
921 thread_add_timer(zrouter
.master
, fpm_nhg_reset
, fnc
, 0,
924 /* Didn't finish - reschedule LSP walk */
925 thread_add_timer(zrouter
.master
, fpm_lsp_send
, fnc
, 0,
931 * Next hop walk/send functions.
934 struct zebra_dplane_ctx
*ctx
;
935 struct fpm_nl_ctx
*fnc
;
939 static int fpm_nhg_send_cb(struct hash_bucket
*bucket
, void *arg
)
941 struct nhg_hash_entry
*nhe
= bucket
->data
;
942 struct fpm_nhg_arg
*fna
= arg
;
944 /* This entry was already sent, skip it. */
945 if (CHECK_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
))
946 return HASHWALK_CONTINUE
;
948 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
949 dplane_ctx_reset(fna
->ctx
);
950 dplane_ctx_nexthop_init(fna
->ctx
, DPLANE_OP_NH_INSTALL
, nhe
);
951 if (fpm_nl_enqueue(fna
->fnc
, fna
->ctx
) == -1) {
952 /* Our buffers are full, lets give it some cycles. */
953 fna
->complete
= false;
954 return HASHWALK_ABORT
;
957 /* Mark group as sent, so it doesn't get sent again. */
958 SET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
960 return HASHWALK_CONTINUE
;
963 static void fpm_nhg_send(struct thread
*t
)
965 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
966 struct fpm_nhg_arg fna
;
969 fna
.ctx
= dplane_ctx_alloc();
972 /* Send next hops. */
974 hash_walk(zrouter
.nhgs_id
, fpm_nhg_send_cb
, &fna
);
976 /* `free()` allocated memory. */
977 dplane_ctx_fini(&fna
.ctx
);
979 /* We are done sending next hops, lets install the routes now. */
981 WALK_FINISH(fnc
, FNE_NHG_FINISHED
);
982 thread_add_timer(zrouter
.master
, fpm_rib_reset
, fnc
, 0,
984 } else /* Otherwise reschedule next hop group again. */
985 thread_add_timer(zrouter
.master
, fpm_nhg_send
, fnc
, 0,
990 * Send all RIB installed routes to the connected data plane.
992 static void fpm_rib_send(struct thread
*t
)
994 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
996 struct route_node
*rn
;
997 struct route_table
*rt
;
998 struct zebra_dplane_ctx
*ctx
;
999 rib_tables_iter_t rt_iter
;
1001 /* Allocate temporary context for all transactions. */
1002 ctx
= dplane_ctx_alloc();
1004 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1005 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1006 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1007 dest
= rib_dest_from_rnode(rn
);
1008 /* Skip bad route entries. */
1009 if (dest
== NULL
|| dest
->selected_fib
== NULL
)
1012 /* Check for already sent routes. */
1013 if (CHECK_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
))
1016 /* Enqueue route install. */
1017 dplane_ctx_reset(ctx
);
1018 dplane_ctx_route_init(ctx
, DPLANE_OP_ROUTE_INSTALL
, rn
,
1019 dest
->selected_fib
);
1020 if (fpm_nl_enqueue(fnc
, ctx
) == -1) {
1021 /* Free the temporary allocated context. */
1022 dplane_ctx_fini(&ctx
);
1024 thread_add_timer(zrouter
.master
, fpm_rib_send
,
1025 fnc
, 1, &fnc
->t_ribwalk
);
1030 SET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1034 /* Free the temporary allocated context. */
1035 dplane_ctx_fini(&ctx
);
1037 /* All RIB routes sent! */
1038 WALK_FINISH(fnc
, FNE_RIB_FINISHED
);
1040 /* Schedule next event: RMAC reset. */
1041 thread_add_event(zrouter
.master
, fpm_rmac_reset
, fnc
, 0,
1046 * The next three functions will handle RMAC enqueue.
1048 struct fpm_rmac_arg
{
1049 struct zebra_dplane_ctx
*ctx
;
1050 struct fpm_nl_ctx
*fnc
;
1051 struct zebra_l3vni
*zl3vni
;
1055 static void fpm_enqueue_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1057 struct fpm_rmac_arg
*fra
= arg
;
1058 struct zebra_mac
*zrmac
= bucket
->data
;
1059 struct zebra_if
*zif
= fra
->zl3vni
->vxlan_if
->info
;
1060 const struct zebra_l2info_vxlan
*vxl
= &zif
->l2info
.vxl
;
1061 struct zebra_if
*br_zif
;
1065 /* Entry already sent. */
1066 if (CHECK_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
) || !fra
->complete
)
1069 sticky
= !!CHECK_FLAG(zrmac
->flags
,
1070 (ZEBRA_MAC_STICKY
| ZEBRA_MAC_REMOTE_DEF_GW
));
1071 br_zif
= (struct zebra_if
*)(zif
->brslave_info
.br_if
->info
);
1072 vid
= IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif
) ? vxl
->access_vlan
: 0;
1074 dplane_ctx_reset(fra
->ctx
);
1075 dplane_ctx_set_op(fra
->ctx
, DPLANE_OP_MAC_INSTALL
);
1076 dplane_mac_init(fra
->ctx
, fra
->zl3vni
->vxlan_if
,
1077 zif
->brslave_info
.br_if
, vid
,
1078 &zrmac
->macaddr
, zrmac
->fwd_info
.r_vtep_ip
, sticky
,
1079 0 /*nhg*/, 0 /*update_flags*/);
1080 if (fpm_nl_enqueue(fra
->fnc
, fra
->ctx
) == -1) {
1081 thread_add_timer(zrouter
.master
, fpm_rmac_send
,
1082 fra
->fnc
, 1, &fra
->fnc
->t_rmacwalk
);
1083 fra
->complete
= false;
1087 static void fpm_enqueue_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1089 struct fpm_rmac_arg
*fra
= arg
;
1090 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1092 fra
->zl3vni
= zl3vni
;
1093 hash_iterate(zl3vni
->rmac_table
, fpm_enqueue_rmac_table
, zl3vni
);
1096 static void fpm_rmac_send(struct thread
*t
)
1098 struct fpm_rmac_arg fra
;
1100 fra
.fnc
= THREAD_ARG(t
);
1101 fra
.ctx
= dplane_ctx_alloc();
1102 fra
.complete
= true;
1103 hash_iterate(zrouter
.l3vni_table
, fpm_enqueue_l3vni_table
, &fra
);
1104 dplane_ctx_fini(&fra
.ctx
);
1106 /* RMAC walk completed. */
1108 WALK_FINISH(fra
.fnc
, FNE_RMAC_FINISHED
);
1112 * Resets the next hop FPM flags so we send all next hops again.
1114 static void fpm_nhg_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1116 struct nhg_hash_entry
*nhe
= bucket
->data
;
1118 /* Unset FPM installation flag so it gets installed again. */
1119 UNSET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
1122 static void fpm_nhg_reset(struct thread
*t
)
1124 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1126 hash_iterate(zrouter
.nhgs_id
, fpm_nhg_reset_cb
, NULL
);
1128 /* Schedule next step: send next hop groups. */
1129 thread_add_event(zrouter
.master
, fpm_nhg_send
, fnc
, 0, &fnc
->t_nhgwalk
);
1133 * Resets the LSP FPM flag so we send all LSPs again.
1135 static void fpm_lsp_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1137 struct zebra_lsp
*lsp
= bucket
->data
;
1139 UNSET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
1142 static void fpm_lsp_reset(struct thread
*t
)
1144 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1145 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
1147 hash_iterate(zvrf
->lsp_table
, fpm_lsp_reset_cb
, NULL
);
1149 /* Schedule next step: send LSPs */
1150 thread_add_event(zrouter
.master
, fpm_lsp_send
, fnc
, 0, &fnc
->t_lspwalk
);
1154 * Resets the RIB FPM flags so we send all routes again.
1156 static void fpm_rib_reset(struct thread
*t
)
1158 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1160 struct route_node
*rn
;
1161 struct route_table
*rt
;
1162 rib_tables_iter_t rt_iter
;
1164 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1165 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1166 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1167 dest
= rib_dest_from_rnode(rn
);
1168 /* Skip bad route entries. */
1172 UNSET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1176 /* Schedule next step: send RIB routes. */
1177 thread_add_event(zrouter
.master
, fpm_rib_send
, fnc
, 0, &fnc
->t_ribwalk
);
1181 * The next three function will handle RMAC table reset.
1183 static void fpm_unset_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1185 struct zebra_mac
*zrmac
= bucket
->data
;
1187 UNSET_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
);
1190 static void fpm_unset_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1192 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1194 hash_iterate(zl3vni
->rmac_table
, fpm_unset_rmac_table
, zl3vni
);
1197 static void fpm_rmac_reset(struct thread
*t
)
1199 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1201 hash_iterate(zrouter
.l3vni_table
, fpm_unset_l3vni_table
, NULL
);
1203 /* Schedule next event: send RMAC entries. */
1204 thread_add_event(zrouter
.master
, fpm_rmac_send
, fnc
, 0,
1208 static void fpm_process_queue(struct thread
*t
)
1210 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1211 struct zebra_dplane_ctx
*ctx
;
1212 bool no_bufs
= false;
1213 uint64_t processed_contexts
= 0;
1216 /* No space available yet. */
1217 if (STREAM_WRITEABLE(fnc
->obuf
) < NL_PKT_BUF_SIZE
) {
1222 /* Dequeue next item or quit processing. */
1223 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1224 ctx
= dplane_ctx_dequeue(&fnc
->ctxqueue
);
1230 * Intentionally ignoring the return value
1231 * as that we are ensuring that we can write to
1232 * the output data in the STREAM_WRITEABLE
1233 * check above, so we can ignore the return
1235 if (fnc
->socket
!= -1)
1236 (void)fpm_nl_enqueue(fnc
, ctx
);
1238 /* Account the processed entries. */
1239 processed_contexts
++;
1240 atomic_fetch_sub_explicit(&fnc
->counters
.ctxqueue_len
, 1,
1241 memory_order_relaxed
);
1243 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1244 dplane_provider_enqueue_out_ctx(fnc
->prov
, ctx
);
1247 /* Update count of processed contexts */
1248 atomic_fetch_add_explicit(&fnc
->counters
.dplane_contexts
,
1249 processed_contexts
, memory_order_relaxed
);
1251 /* Re-schedule if we ran out of buffer space */
1253 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1254 fnc
, 0, &fnc
->t_dequeue
);
1257 * Let the dataplane thread know if there are items in the
1258 * output queue to be processed. Otherwise they may sit
1259 * until the dataplane thread gets scheduled for new,
1262 if (dplane_provider_out_ctx_queue_len(fnc
->prov
) > 0)
1263 dplane_provider_work_ready();
/*
 * Handles external (e.g. CLI, data plane or others) events.
 *
 * Runs in the FPM pthread; `event` is one of the FNE_* values scheduled
 * via thread_add_event(). NOTE(review): the FNE_DISABLE/FNE_RECONNECT
 * case labels were reconstructed from their log messages — confirm
 * against the enum definition earlier in this file.
 */
static void fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	int event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		/* Plain struct of counters: a memset fully resets them. */
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		/* A reconnect forces a full re-sync under the new setting. */
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	/* The *_FINISHED events are informational (debug logging only). */
	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);
		break;
	case FNE_LSP_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: LSP walk finished", __func__);
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled event %d", __func__, event);
		break;
	}
}
1334 * Data plane functions.
1336 static int fpm_nl_start(struct zebra_dplane_provider
*prov
)
1338 struct fpm_nl_ctx
*fnc
;
1340 fnc
= dplane_provider_get_data(prov
);
1341 fnc
->fthread
= frr_pthread_new(NULL
, prov_name
, prov_name
);
1342 assert(frr_pthread_run(fnc
->fthread
, NULL
) == 0);
1343 fnc
->ibuf
= stream_new(NL_PKT_BUF_SIZE
);
1344 fnc
->obuf
= stream_new(NL_PKT_BUF_SIZE
* 128);
1345 pthread_mutex_init(&fnc
->obuf_mutex
, NULL
);
1347 fnc
->disabled
= true;
1349 TAILQ_INIT(&fnc
->ctxqueue
);
1350 pthread_mutex_init(&fnc
->ctxqueue_mutex
, NULL
);
1352 /* Set default values. */
1353 fnc
->use_nhg
= true;
1358 static int fpm_nl_finish_early(struct fpm_nl_ctx
*fnc
)
1360 /* Disable all events and close socket. */
1361 THREAD_OFF(fnc
->t_lspreset
);
1362 THREAD_OFF(fnc
->t_lspwalk
);
1363 THREAD_OFF(fnc
->t_nhgreset
);
1364 THREAD_OFF(fnc
->t_nhgwalk
);
1365 THREAD_OFF(fnc
->t_ribreset
);
1366 THREAD_OFF(fnc
->t_ribwalk
);
1367 THREAD_OFF(fnc
->t_rmacreset
);
1368 THREAD_OFF(fnc
->t_rmacwalk
);
1369 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_read
, NULL
);
1370 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_write
, NULL
);
1371 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_connect
, NULL
);
1373 if (fnc
->socket
!= -1) {
1381 static int fpm_nl_finish_late(struct fpm_nl_ctx
*fnc
)
1383 /* Stop the running thread. */
1384 frr_pthread_stop(fnc
->fthread
, NULL
);
1386 /* Free all allocated resources. */
1387 pthread_mutex_destroy(&fnc
->obuf_mutex
);
1388 pthread_mutex_destroy(&fnc
->ctxqueue_mutex
);
1389 stream_free(fnc
->ibuf
);
1390 stream_free(fnc
->obuf
);
/*
 * Provider finish callback, invoked twice on shutdown: first with
 * early == true (stop events, close the socket), then with
 * early == false (stop the pthread and free all memory).
 */
static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc = dplane_provider_get_data(prov);

	return early ? fpm_nl_finish_early(fnc) : fpm_nl_finish_late(fnc);
}
/*
 * Provider work callback, called from the dataplane thread.
 *
 * Moves up to the provider work limit of contexts from the dataplane
 * input queue onto the FPM context queue (when connected) or returns
 * them immediately as successful (when not connected — a full RIB walk
 * will resync the FPM later). Tracks queue length and its peak in
 * relaxed atomic counters and wakes the FPM pthread to drain the queue.
 */
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			/*
			 * Update the number of queued contexts *before*
			 * enqueueing, to ensure counter consistency.
			 */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);

			frr_with_mutex (&fnc->ctxqueue_mutex) {
				dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
			}

			/* Track a local peak; published once after the loop. */
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			if (peak_queue < cur_queue)
				peak_queue = cur_queue;
			continue;
		}

		/* Not connected: acknowledge the context right away. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Update peak queue length, if we just observed a new peak */
	stored_peak_queue = atomic_load_explicit(
		&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
	if (stored_peak_queue < peak_queue)
		atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
				      peak_queue, memory_order_relaxed);

	/* Wake the FPM pthread if there is queued work to serialize. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/* Ensure dataplane thread is rescheduled if we hit the work limit */
	if (counter >= limit)
		dplane_provider_work_ready();

	return 0;
}
1470 static int fpm_nl_new(struct thread_master
*tm
)
1472 struct zebra_dplane_provider
*prov
= NULL
;
1475 gfnc
= calloc(1, sizeof(*gfnc
));
1476 rv
= dplane_provider_register(prov_name
, DPLANE_PRIO_POSTPROCESS
,
1477 DPLANE_PROV_FLAG_THREADED
, fpm_nl_start
,
1478 fpm_nl_process
, fpm_nl_finish
, gfnc
,
1481 if (IS_ZEBRA_DEBUG_DPLANE
)
1482 zlog_debug("%s register status: %d", prov_name
, rv
);
1484 install_node(&fpm_node
);
1485 install_element(ENABLE_NODE
, &fpm_show_counters_cmd
);
1486 install_element(ENABLE_NODE
, &fpm_show_counters_json_cmd
);
1487 install_element(ENABLE_NODE
, &fpm_reset_counters_cmd
);
1488 install_element(CONFIG_NODE
, &fpm_set_address_cmd
);
1489 install_element(CONFIG_NODE
, &no_fpm_set_address_cmd
);
1490 install_element(CONFIG_NODE
, &fpm_use_nhg_cmd
);
1491 install_element(CONFIG_NODE
, &no_fpm_use_nhg_cmd
);
1496 static int fpm_nl_init(void)
1498 hook_register(frr_late_init
, fpm_nl_new
);
1503 .name
= "dplane_fpm_nl",
1505 .description
= "Data plane plugin for FPM using netlink.",
1506 .init
= fpm_nl_init
,