/*
 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
 *
 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */
23 #include "config.h" /* Include this explicitly */
26 #include <arpa/inet.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
34 #include "lib/zebra.h"
36 #include "lib/libfrr.h"
37 #include "lib/frratomic.h"
38 #include "lib/command.h"
39 #include "lib/memory.h"
40 #include "lib/network.h"
42 #include "lib/frr_pthread.h"
43 #include "zebra/debug.h"
44 #include "zebra/interface.h"
45 #include "zebra/zebra_dplane.h"
46 #include "zebra/zebra_mpls.h"
47 #include "zebra/zebra_router.h"
48 #include "zebra/zebra_evpn.h"
49 #include "zebra/zebra_evpn_mac.h"
50 #include "zebra/zebra_vxlan_private.h"
51 #include "zebra/kernel_netlink.h"
52 #include "zebra/rt_netlink.h"
53 #include "zebra/debug.h"
#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
#define SOUTHBOUND_DEFAULT_PORT 2620

/*
 * FPM header:
 * {
 *   version: 1 byte (always 1),
 *   type: 1 byte (1 for netlink, 2 protobuf),
 *   len: 2 bytes (network order),
 * }
 *
 * This header is used with any format to tell the users how many bytes to
 * expect for the message payload.
 */
#define FPM_HEADER_SIZE 4
/* Data plane provider name (also used for the pthread name). */
static const char *prov_name = "dplane_fpm_nl";
74 /* data plane connection. */
79 struct sockaddr_storage addr
;
81 /* data plane buffers. */
84 pthread_mutex_t obuf_mutex
;
87 * data plane context queue:
88 * When a FPM server connection becomes a bottleneck, we must keep the
89 * data plane contexts until we get a chance to process them.
91 struct dplane_ctx_q ctxqueue
;
92 pthread_mutex_t ctxqueue_mutex
;
94 /* data plane events. */
95 struct zebra_dplane_provider
*prov
;
96 struct frr_pthread
*fthread
;
97 struct thread
*t_connect
;
98 struct thread
*t_read
;
99 struct thread
*t_write
;
100 struct thread
*t_event
;
101 struct thread
*t_dequeue
;
104 struct thread
*t_lspreset
;
105 struct thread
*t_lspwalk
;
106 struct thread
*t_nhgreset
;
107 struct thread
*t_nhgwalk
;
108 struct thread
*t_ribreset
;
109 struct thread
*t_ribwalk
;
110 struct thread
*t_rmacreset
;
111 struct thread
*t_rmacwalk
;
113 /* Statistic counters. */
115 /* Amount of bytes read into ibuf. */
116 _Atomic
uint32_t bytes_read
;
117 /* Amount of bytes written from obuf. */
118 _Atomic
uint32_t bytes_sent
;
119 /* Output buffer current usage. */
120 _Atomic
uint32_t obuf_bytes
;
121 /* Output buffer peak usage. */
122 _Atomic
uint32_t obuf_peak
;
124 /* Amount of connection closes. */
125 _Atomic
uint32_t connection_closes
;
126 /* Amount of connection errors. */
127 _Atomic
uint32_t connection_errors
;
129 /* Amount of user configurations: FNE_RECONNECT. */
130 _Atomic
uint32_t user_configures
;
131 /* Amount of user disable requests: FNE_DISABLE. */
132 _Atomic
uint32_t user_disables
;
134 /* Amount of data plane context processed. */
135 _Atomic
uint32_t dplane_contexts
;
136 /* Amount of data plane contexts enqueued. */
137 _Atomic
uint32_t ctxqueue_len
;
138 /* Peak amount of data plane contexts enqueued. */
139 _Atomic
uint32_t ctxqueue_len_peak
;
141 /* Amount of buffer full events. */
142 _Atomic
uint32_t buffer_full
;
/*
 * Events handled by fpm_process_event(). NOTE(review): the enumerator
 * names were missing from the corrupted text and are reconstructed from
 * their uses throughout this file — confirm ordering against upstream.
 */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
/* Schedule an internal reconnect on the FPM pthread (race-free). */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/* Signal that one of the object walks (LSP/NHG/RIB/RMAC) completed. */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 (ev), &(fnc)->t_event)
/* Forward declarations. */
static int fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static int fpm_lsp_send(struct thread *t);
static int fpm_lsp_reset(struct thread *t);
static int fpm_nhg_send(struct thread *t);
static int fpm_nhg_reset(struct thread *t);
static int fpm_rib_send(struct thread *t);
static int fpm_rib_reset(struct thread *t);
static int fpm_rmac_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);
193 #define FPM_STR "Forwarding Plane Manager configuration\n"
195 DEFUN(fpm_set_address
, fpm_set_address_cmd
,
196 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
198 "FPM remote listening server address\n"
199 "Remote IPv4 FPM server\n"
200 "Remote IPv6 FPM server\n"
201 "FPM remote listening server port\n"
202 "Remote FPM server port\n")
204 struct sockaddr_in
*sin
;
205 struct sockaddr_in6
*sin6
;
207 uint8_t naddr
[INET6_BUFSIZ
];
210 port
= strtol(argv
[4]->arg
, NULL
, 10);
212 /* Handle IPv4 addresses. */
213 if (inet_pton(AF_INET
, argv
[2]->arg
, naddr
) == 1) {
214 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
216 memset(sin
, 0, sizeof(*sin
));
217 sin
->sin_family
= AF_INET
;
219 port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
220 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
221 sin
->sin_len
= sizeof(*sin
);
222 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
223 memcpy(&sin
->sin_addr
, naddr
, sizeof(sin
->sin_addr
));
228 /* Handle IPv6 addresses. */
229 if (inet_pton(AF_INET6
, argv
[2]->arg
, naddr
) != 1) {
230 vty_out(vty
, "%% Invalid address: %s\n", argv
[2]->arg
);
234 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
235 memset(sin6
, 0, sizeof(*sin6
));
236 sin6
->sin6_family
= AF_INET6
;
237 sin6
->sin6_port
= port
? htons(port
) : htons(SOUTHBOUND_DEFAULT_PORT
);
238 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
239 sin6
->sin6_len
= sizeof(*sin6
);
240 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
241 memcpy(&sin6
->sin6_addr
, naddr
, sizeof(sin6
->sin6_addr
));
244 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
245 FNE_RECONNECT
, &gfnc
->t_event
);
249 DEFUN(no_fpm_set_address
, no_fpm_set_address_cmd
,
250 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
253 "FPM remote listening server address\n"
254 "Remote IPv4 FPM server\n"
255 "Remote IPv6 FPM server\n"
256 "FPM remote listening server port\n"
257 "Remote FPM server port\n")
259 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
260 FNE_DISABLE
, &gfnc
->t_event
);
264 DEFUN(fpm_use_nhg
, fpm_use_nhg_cmd
,
265 "fpm use-next-hop-groups",
267 "Use netlink next hop groups feature.\n")
269 /* Already enabled. */
273 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
274 FNE_TOGGLE_NHG
, &gfnc
->t_event
);
279 DEFUN(no_fpm_use_nhg
, no_fpm_use_nhg_cmd
,
280 "no fpm use-next-hop-groups",
283 "Use netlink next hop groups feature.\n")
285 /* Already disabled. */
289 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
290 FNE_TOGGLE_NHG
, &gfnc
->t_event
);
295 DEFUN(fpm_reset_counters
, fpm_reset_counters_cmd
,
296 "clear fpm counters",
299 "FPM statistic counters\n")
301 thread_add_event(gfnc
->fthread
->master
, fpm_process_event
, gfnc
,
302 FNE_RESET_COUNTERS
, &gfnc
->t_event
);
306 DEFUN(fpm_show_counters
, fpm_show_counters_cmd
,
310 "FPM statistic counters\n")
312 vty_out(vty
, "%30s\n%30s\n", "FPM counters", "============");
314 #define SHOW_COUNTER(label, counter) \
315 vty_out(vty, "%28s: %u\n", (label), (counter))
317 SHOW_COUNTER("Input bytes", gfnc
->counters
.bytes_read
);
318 SHOW_COUNTER("Output bytes", gfnc
->counters
.bytes_sent
);
319 SHOW_COUNTER("Output buffer current size", gfnc
->counters
.obuf_bytes
);
320 SHOW_COUNTER("Output buffer peak size", gfnc
->counters
.obuf_peak
);
321 SHOW_COUNTER("Connection closes", gfnc
->counters
.connection_closes
);
322 SHOW_COUNTER("Connection errors", gfnc
->counters
.connection_errors
);
323 SHOW_COUNTER("Data plane items processed",
324 gfnc
->counters
.dplane_contexts
);
325 SHOW_COUNTER("Data plane items enqueued",
326 gfnc
->counters
.ctxqueue_len
);
327 SHOW_COUNTER("Data plane items queue peak",
328 gfnc
->counters
.ctxqueue_len_peak
);
329 SHOW_COUNTER("Buffer full hits", gfnc
->counters
.buffer_full
);
330 SHOW_COUNTER("User FPM configurations", gfnc
->counters
.user_configures
);
331 SHOW_COUNTER("User FPM disable requests", gfnc
->counters
.user_disables
);
338 DEFUN(fpm_show_counters_json
, fpm_show_counters_json_cmd
,
339 "show fpm counters json",
342 "FPM statistic counters\n"
345 struct json_object
*jo
;
347 jo
= json_object_new_object();
348 json_object_int_add(jo
, "bytes-read", gfnc
->counters
.bytes_read
);
349 json_object_int_add(jo
, "bytes-sent", gfnc
->counters
.bytes_sent
);
350 json_object_int_add(jo
, "obuf-bytes", gfnc
->counters
.obuf_bytes
);
351 json_object_int_add(jo
, "obuf-bytes-peak", gfnc
->counters
.obuf_peak
);
352 json_object_int_add(jo
, "connection-closes",
353 gfnc
->counters
.connection_closes
);
354 json_object_int_add(jo
, "connection-errors",
355 gfnc
->counters
.connection_errors
);
356 json_object_int_add(jo
, "data-plane-contexts",
357 gfnc
->counters
.dplane_contexts
);
358 json_object_int_add(jo
, "data-plane-contexts-queue",
359 gfnc
->counters
.ctxqueue_len
);
360 json_object_int_add(jo
, "data-plane-contexts-queue-peak",
361 gfnc
->counters
.ctxqueue_len_peak
);
362 json_object_int_add(jo
, "buffer-full-hits", gfnc
->counters
.buffer_full
);
363 json_object_int_add(jo
, "user-configures",
364 gfnc
->counters
.user_configures
);
365 json_object_int_add(jo
, "user-disables", gfnc
->counters
.user_disables
);
366 vty_out(vty
, "%s\n", json_object_to_json_string_ext(jo
, 0));
367 json_object_free(jo
);
372 static int fpm_write_config(struct vty
*vty
)
374 struct sockaddr_in
*sin
;
375 struct sockaddr_in6
*sin6
;
381 switch (gfnc
->addr
.ss_family
) {
384 sin
= (struct sockaddr_in
*)&gfnc
->addr
;
385 vty_out(vty
, "fpm address %pI4", &sin
->sin_addr
);
386 if (sin
->sin_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
387 vty_out(vty
, " port %d", ntohs(sin
->sin_port
));
393 sin6
= (struct sockaddr_in6
*)&gfnc
->addr
;
394 vty_out(vty
, "fpm address %pI6", &sin6
->sin6_addr
);
395 if (sin6
->sin6_port
!= htons(SOUTHBOUND_DEFAULT_PORT
))
396 vty_out(vty
, " port %d", ntohs(sin6
->sin6_port
));
405 if (!gfnc
->use_nhg
) {
406 vty_out(vty
, "no fpm use-next-hop-groups\n");
413 static struct cmd_node fpm_node
= {
417 .config_write
= fpm_write_config
,
/* FPM thread entry points. */
static int fpm_connect(struct thread *t);
425 static void fpm_reconnect(struct fpm_nl_ctx
*fnc
)
427 /* Cancel all zebra threads first. */
428 thread_cancel_async(zrouter
.master
, &fnc
->t_lspreset
, NULL
);
429 thread_cancel_async(zrouter
.master
, &fnc
->t_lspwalk
, NULL
);
430 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgreset
, NULL
);
431 thread_cancel_async(zrouter
.master
, &fnc
->t_nhgwalk
, NULL
);
432 thread_cancel_async(zrouter
.master
, &fnc
->t_ribreset
, NULL
);
433 thread_cancel_async(zrouter
.master
, &fnc
->t_ribwalk
, NULL
);
434 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacreset
, NULL
);
435 thread_cancel_async(zrouter
.master
, &fnc
->t_rmacwalk
, NULL
);
438 * Grab the lock to empty the streams (data plane might try to
439 * enqueue updates while we are closing).
441 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
443 /* Avoid calling close on `-1`. */
444 if (fnc
->socket
!= -1) {
449 stream_reset(fnc
->ibuf
);
450 stream_reset(fnc
->obuf
);
451 THREAD_OFF(fnc
->t_read
);
452 THREAD_OFF(fnc
->t_write
);
454 /* FPM is disabled, don't attempt to connect. */
458 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
462 static int fpm_read(struct thread
*t
)
464 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
467 /* Let's ignore the input at the moment. */
468 rv
= stream_read_try(fnc
->ibuf
, fnc
->socket
,
469 STREAM_WRITEABLE(fnc
->ibuf
));
470 /* We've got an interruption. */
472 /* Schedule next read. */
473 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
,
474 fnc
->socket
, &fnc
->t_read
);
478 atomic_fetch_add_explicit(&fnc
->counters
.connection_closes
, 1,
479 memory_order_relaxed
);
481 if (IS_ZEBRA_DEBUG_FPM
)
482 zlog_debug("%s: connection closed", __func__
);
488 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
489 memory_order_relaxed
);
490 zlog_warn("%s: connection failure: %s", __func__
,
495 stream_reset(fnc
->ibuf
);
497 /* Account all bytes read. */
498 atomic_fetch_add_explicit(&fnc
->counters
.bytes_read
, rv
,
499 memory_order_relaxed
);
501 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, fnc
->socket
,
507 static int fpm_write(struct thread
*t
)
509 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
515 if (fnc
->connecting
== true) {
517 statuslen
= sizeof(status
);
519 rv
= getsockopt(fnc
->socket
, SOL_SOCKET
, SO_ERROR
, &status
,
521 if (rv
== -1 || status
!= 0) {
523 zlog_warn("%s: connection failed: %s", __func__
,
526 zlog_warn("%s: SO_ERROR failed: %s", __func__
,
529 atomic_fetch_add_explicit(
530 &fnc
->counters
.connection_errors
, 1,
531 memory_order_relaxed
);
537 fnc
->connecting
= false;
540 * Starting with LSPs walk all FPM objects, marking them
541 * as unsent and then replaying them.
543 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
546 /* Permit receiving messages now. */
547 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
,
548 fnc
->socket
, &fnc
->t_read
);
551 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
554 /* Stream is empty: reset pointers and return. */
555 if (STREAM_READABLE(fnc
->obuf
) == 0) {
556 stream_reset(fnc
->obuf
);
560 /* Try to write all at once. */
561 btotal
= stream_get_endp(fnc
->obuf
) -
562 stream_get_getp(fnc
->obuf
);
563 bwritten
= write(fnc
->socket
, stream_pnt(fnc
->obuf
), btotal
);
565 atomic_fetch_add_explicit(
566 &fnc
->counters
.connection_closes
, 1,
567 memory_order_relaxed
);
569 if (IS_ZEBRA_DEBUG_FPM
)
570 zlog_debug("%s: connection closed", __func__
);
573 if (bwritten
== -1) {
574 /* Attempt to continue if blocked by a signal. */
577 /* Receiver is probably slow, lets give it some time. */
578 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
581 atomic_fetch_add_explicit(
582 &fnc
->counters
.connection_errors
, 1,
583 memory_order_relaxed
);
584 zlog_warn("%s: connection failure: %s", __func__
,
591 /* Account all bytes sent. */
592 atomic_fetch_add_explicit(&fnc
->counters
.bytes_sent
, bwritten
,
593 memory_order_relaxed
);
595 /* Account number of bytes free. */
596 atomic_fetch_sub_explicit(&fnc
->counters
.obuf_bytes
, bwritten
,
597 memory_order_relaxed
);
599 stream_forward_getp(fnc
->obuf
, (size_t)bwritten
);
602 /* Stream is not empty yet, we must schedule more writes. */
603 if (STREAM_READABLE(fnc
->obuf
)) {
604 stream_pulldown(fnc
->obuf
);
605 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
,
606 fnc
->socket
, &fnc
->t_write
);
613 static int fpm_connect(struct thread
*t
)
615 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
616 struct sockaddr_in
*sin
= (struct sockaddr_in
*)&fnc
->addr
;
617 struct sockaddr_in6
*sin6
= (struct sockaddr_in6
*)&fnc
->addr
;
620 char addrstr
[INET6_ADDRSTRLEN
];
622 sock
= socket(fnc
->addr
.ss_family
, SOCK_STREAM
, 0);
624 zlog_err("%s: fpm socket failed: %s", __func__
,
626 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
631 set_nonblocking(sock
);
633 if (fnc
->addr
.ss_family
== AF_INET
) {
634 inet_ntop(AF_INET
, &sin
->sin_addr
, addrstr
, sizeof(addrstr
));
637 inet_ntop(AF_INET6
, &sin6
->sin6_addr
, addrstr
, sizeof(addrstr
));
638 slen
= sizeof(*sin6
);
641 if (IS_ZEBRA_DEBUG_FPM
)
642 zlog_debug("%s: attempting to connect to %s:%d", __func__
,
643 addrstr
, ntohs(sin
->sin_port
));
645 rv
= connect(sock
, (struct sockaddr
*)&fnc
->addr
, slen
);
646 if (rv
== -1 && errno
!= EINPROGRESS
) {
647 atomic_fetch_add_explicit(&fnc
->counters
.connection_errors
, 1,
648 memory_order_relaxed
);
650 zlog_warn("%s: fpm connection failed: %s", __func__
,
652 thread_add_timer(fnc
->fthread
->master
, fpm_connect
, fnc
, 3,
657 fnc
->connecting
= (errno
== EINPROGRESS
);
659 if (!fnc
->connecting
)
660 thread_add_read(fnc
->fthread
->master
, fpm_read
, fnc
, sock
,
662 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, sock
,
666 * Starting with LSPs walk all FPM objects, marking them
667 * as unsent and then replaying them.
669 * If we are not connected, then delay the objects reset/send.
671 if (!fnc
->connecting
)
672 thread_add_timer(zrouter
.master
, fpm_lsp_reset
, fnc
, 0,
679 * Encode data plane operation context into netlink and enqueue it in the FPM
682 * @param fnc the netlink FPM context.
683 * @param ctx the data plane operation context data.
684 * @return 0 on success or -1 on not enough space.
686 static int fpm_nl_enqueue(struct fpm_nl_ctx
*fnc
, struct zebra_dplane_ctx
*ctx
)
688 uint8_t nl_buf
[NL_PKT_BUF_SIZE
];
691 uint64_t obytes
, obytes_peak
;
692 enum dplane_op_e op
= dplane_ctx_get_op(ctx
);
695 * If we were configured to not use next hop groups, then quit as soon
699 && (op
== DPLANE_OP_NH_DELETE
|| op
== DPLANE_OP_NH_INSTALL
700 || op
== DPLANE_OP_NH_UPDATE
))
705 frr_mutex_lock_autounlock(&fnc
->obuf_mutex
);
708 case DPLANE_OP_ROUTE_UPDATE
:
709 case DPLANE_OP_ROUTE_DELETE
:
710 rv
= netlink_route_multipath_msg_encode(RTM_DELROUTE
, ctx
,
711 nl_buf
, sizeof(nl_buf
),
715 "%s: netlink_route_multipath_msg_encode failed",
720 nl_buf_len
= (size_t)rv
;
722 /* UPDATE operations need a INSTALL, otherwise just quit. */
723 if (op
== DPLANE_OP_ROUTE_DELETE
)
727 case DPLANE_OP_ROUTE_INSTALL
:
728 rv
= netlink_route_multipath_msg_encode(
729 RTM_NEWROUTE
, ctx
, &nl_buf
[nl_buf_len
],
730 sizeof(nl_buf
) - nl_buf_len
, true, fnc
->use_nhg
);
733 "%s: netlink_route_multipath_msg_encode failed",
738 nl_buf_len
+= (size_t)rv
;
741 case DPLANE_OP_MAC_INSTALL
:
742 case DPLANE_OP_MAC_DELETE
:
743 rv
= netlink_macfdb_update_ctx(ctx
, nl_buf
, sizeof(nl_buf
));
745 zlog_err("%s: netlink_macfdb_update_ctx failed",
750 nl_buf_len
= (size_t)rv
;
753 case DPLANE_OP_NH_DELETE
:
754 rv
= netlink_nexthop_msg_encode(RTM_DELNEXTHOP
, ctx
, nl_buf
,
757 zlog_err("%s: netlink_nexthop_msg_encode failed",
762 nl_buf_len
= (size_t)rv
;
764 case DPLANE_OP_NH_INSTALL
:
765 case DPLANE_OP_NH_UPDATE
:
766 rv
= netlink_nexthop_msg_encode(RTM_NEWNEXTHOP
, ctx
, nl_buf
,
769 zlog_err("%s: netlink_nexthop_msg_encode failed",
774 nl_buf_len
= (size_t)rv
;
777 case DPLANE_OP_LSP_INSTALL
:
778 case DPLANE_OP_LSP_UPDATE
:
779 case DPLANE_OP_LSP_DELETE
:
780 rv
= netlink_lsp_msg_encoder(ctx
, nl_buf
, sizeof(nl_buf
));
782 zlog_err("%s: netlink_lsp_msg_encoder failed",
787 nl_buf_len
+= (size_t)rv
;
790 case DPLANE_OP_PW_INSTALL
:
791 case DPLANE_OP_PW_UNINSTALL
:
792 case DPLANE_OP_ADDR_INSTALL
:
793 case DPLANE_OP_ADDR_UNINSTALL
:
794 case DPLANE_OP_NEIGH_INSTALL
:
795 case DPLANE_OP_NEIGH_UPDATE
:
796 case DPLANE_OP_NEIGH_DELETE
:
797 case DPLANE_OP_VTEP_ADD
:
798 case DPLANE_OP_VTEP_DELETE
:
799 case DPLANE_OP_SYS_ROUTE_ADD
:
800 case DPLANE_OP_SYS_ROUTE_DELETE
:
801 case DPLANE_OP_ROUTE_NOTIFY
:
802 case DPLANE_OP_LSP_NOTIFY
:
807 if (IS_ZEBRA_DEBUG_FPM
)
808 zlog_debug("%s: unhandled data plane message (%d) %s",
809 __func__
, dplane_ctx_get_op(ctx
),
810 dplane_op2str(dplane_ctx_get_op(ctx
)));
814 /* Skip empty enqueues. */
818 /* We must know if someday a message goes beyond 65KiB. */
819 assert((nl_buf_len
+ FPM_HEADER_SIZE
) <= UINT16_MAX
);
821 /* Check if we have enough buffer space. */
822 if (STREAM_WRITEABLE(fnc
->obuf
) < (nl_buf_len
+ FPM_HEADER_SIZE
)) {
823 atomic_fetch_add_explicit(&fnc
->counters
.buffer_full
, 1,
824 memory_order_relaxed
);
826 if (IS_ZEBRA_DEBUG_FPM
)
828 "%s: buffer full: wants to write %zu but has %zu",
829 __func__
, nl_buf_len
+ FPM_HEADER_SIZE
,
830 STREAM_WRITEABLE(fnc
->obuf
));
836 * Fill in the FPM header information.
838 * See FPM_HEADER_SIZE definition for more information.
840 stream_putc(fnc
->obuf
, 1);
841 stream_putc(fnc
->obuf
, 1);
842 stream_putw(fnc
->obuf
, nl_buf_len
+ FPM_HEADER_SIZE
);
844 /* Write current data. */
845 stream_write(fnc
->obuf
, nl_buf
, (size_t)nl_buf_len
);
847 /* Account number of bytes waiting to be written. */
848 atomic_fetch_add_explicit(&fnc
->counters
.obuf_bytes
,
849 nl_buf_len
+ FPM_HEADER_SIZE
,
850 memory_order_relaxed
);
851 obytes
= atomic_load_explicit(&fnc
->counters
.obuf_bytes
,
852 memory_order_relaxed
);
853 obytes_peak
= atomic_load_explicit(&fnc
->counters
.obuf_peak
,
854 memory_order_relaxed
);
855 if (obytes_peak
< obytes
)
856 atomic_store_explicit(&fnc
->counters
.obuf_peak
, obytes
,
857 memory_order_relaxed
);
859 /* Tell the thread to start writing. */
860 thread_add_write(fnc
->fthread
->master
, fpm_write
, fnc
, fnc
->socket
,
867 * LSP walk/send functions
/* Argument passed to the LSP hash walk callback. */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	/* Set to false by the callback when the output buffer fills up. */
	bool complete;
};
875 static int fpm_lsp_send_cb(struct hash_bucket
*bucket
, void *arg
)
877 struct zebra_lsp
*lsp
= bucket
->data
;
878 struct fpm_lsp_arg
*fla
= arg
;
880 /* Skip entries which have already been sent */
881 if (CHECK_FLAG(lsp
->flags
, LSP_FLAG_FPM
))
882 return HASHWALK_CONTINUE
;
884 dplane_ctx_reset(fla
->ctx
);
885 dplane_ctx_lsp_init(fla
->ctx
, DPLANE_OP_LSP_INSTALL
, lsp
);
887 if (fpm_nl_enqueue(fla
->fnc
, fla
->ctx
) == -1) {
888 fla
->complete
= false;
889 return HASHWALK_ABORT
;
892 /* Mark entry as sent */
893 SET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
894 return HASHWALK_CONTINUE
;
897 static int fpm_lsp_send(struct thread
*t
)
899 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
900 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
901 struct fpm_lsp_arg fla
;
904 fla
.ctx
= dplane_ctx_alloc();
907 hash_walk(zvrf
->lsp_table
, fpm_lsp_send_cb
, &fla
);
909 dplane_ctx_fini(&fla
.ctx
);
912 WALK_FINISH(fnc
, FNE_LSP_FINISHED
);
914 /* Now move onto routes */
915 thread_add_timer(zrouter
.master
, fpm_nhg_reset
, fnc
, 0,
918 /* Didn't finish - reschedule LSP walk */
919 thread_add_timer(zrouter
.master
, fpm_lsp_send
, fnc
, 0,
927 * Next hop walk/send functions.
/* Argument passed to the next hop group hash walk callback. */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	/* Set to false by the callback when the output buffer fills up. */
	bool complete;
};
935 static int fpm_nhg_send_cb(struct hash_bucket
*bucket
, void *arg
)
937 struct nhg_hash_entry
*nhe
= bucket
->data
;
938 struct fpm_nhg_arg
*fna
= arg
;
940 /* This entry was already sent, skip it. */
941 if (CHECK_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
))
942 return HASHWALK_CONTINUE
;
944 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
945 dplane_ctx_reset(fna
->ctx
);
946 dplane_ctx_nexthop_init(fna
->ctx
, DPLANE_OP_NH_INSTALL
, nhe
);
947 if (fpm_nl_enqueue(fna
->fnc
, fna
->ctx
) == -1) {
948 /* Our buffers are full, lets give it some cycles. */
949 fna
->complete
= false;
950 return HASHWALK_ABORT
;
953 /* Mark group as sent, so it doesn't get sent again. */
954 SET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
956 return HASHWALK_CONTINUE
;
959 static int fpm_nhg_send(struct thread
*t
)
961 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
962 struct fpm_nhg_arg fna
;
965 fna
.ctx
= dplane_ctx_alloc();
968 /* Send next hops. */
970 hash_walk(zrouter
.nhgs_id
, fpm_nhg_send_cb
, &fna
);
972 /* `free()` allocated memory. */
973 dplane_ctx_fini(&fna
.ctx
);
975 /* We are done sending next hops, lets install the routes now. */
977 WALK_FINISH(fnc
, FNE_NHG_FINISHED
);
978 thread_add_timer(zrouter
.master
, fpm_rib_reset
, fnc
, 0,
980 } else /* Otherwise reschedule next hop group again. */
981 thread_add_timer(zrouter
.master
, fpm_nhg_send
, fnc
, 0,
988 * Send all RIB installed routes to the connected data plane.
990 static int fpm_rib_send(struct thread
*t
)
992 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
994 struct route_node
*rn
;
995 struct route_table
*rt
;
996 struct zebra_dplane_ctx
*ctx
;
997 rib_tables_iter_t rt_iter
;
999 /* Allocate temporary context for all transactions. */
1000 ctx
= dplane_ctx_alloc();
1002 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1003 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1004 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1005 dest
= rib_dest_from_rnode(rn
);
1006 /* Skip bad route entries. */
1007 if (dest
== NULL
|| dest
->selected_fib
== NULL
)
1010 /* Check for already sent routes. */
1011 if (CHECK_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
))
1014 /* Enqueue route install. */
1015 dplane_ctx_reset(ctx
);
1016 dplane_ctx_route_init(ctx
, DPLANE_OP_ROUTE_INSTALL
, rn
,
1017 dest
->selected_fib
);
1018 if (fpm_nl_enqueue(fnc
, ctx
) == -1) {
1019 /* Free the temporary allocated context. */
1020 dplane_ctx_fini(&ctx
);
1022 thread_add_timer(zrouter
.master
, fpm_rib_send
,
1023 fnc
, 1, &fnc
->t_ribwalk
);
1028 SET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1032 /* Free the temporary allocated context. */
1033 dplane_ctx_fini(&ctx
);
1035 /* All RIB routes sent! */
1036 WALK_FINISH(fnc
, FNE_RIB_FINISHED
);
1038 /* Schedule next event: RMAC reset. */
1039 thread_add_event(zrouter
.master
, fpm_rmac_reset
, fnc
, 0,
1046 * The next three functions will handle RMAC enqueue.
/* Argument passed to the RMAC table iteration callbacks. */
struct fpm_rmac_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	struct zebra_l3vni *zl3vni;
	/* Set to false when the output buffer fills up. */
	bool complete;
};
1055 static void fpm_enqueue_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1057 struct fpm_rmac_arg
*fra
= arg
;
1058 struct zebra_mac
*zrmac
= bucket
->data
;
1059 struct zebra_if
*zif
= fra
->zl3vni
->vxlan_if
->info
;
1060 const struct zebra_l2info_vxlan
*vxl
= &zif
->l2info
.vxl
;
1061 struct zebra_if
*br_zif
;
1065 /* Entry already sent. */
1066 if (CHECK_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
) || !fra
->complete
)
1069 sticky
= !!CHECK_FLAG(zrmac
->flags
,
1070 (ZEBRA_MAC_STICKY
| ZEBRA_MAC_REMOTE_DEF_GW
));
1071 br_zif
= (struct zebra_if
*)(zif
->brslave_info
.br_if
->info
);
1072 vid
= IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif
) ? vxl
->access_vlan
: 0;
1074 dplane_ctx_reset(fra
->ctx
);
1075 dplane_ctx_set_op(fra
->ctx
, DPLANE_OP_MAC_INSTALL
);
1076 dplane_mac_init(fra
->ctx
, fra
->zl3vni
->vxlan_if
,
1077 zif
->brslave_info
.br_if
, vid
,
1078 &zrmac
->macaddr
, zrmac
->fwd_info
.r_vtep_ip
, sticky
,
1079 0 /*nhg*/, 0 /*update_flags*/);
1080 if (fpm_nl_enqueue(fra
->fnc
, fra
->ctx
) == -1) {
1081 thread_add_timer(zrouter
.master
, fpm_rmac_send
,
1082 fra
->fnc
, 1, &fra
->fnc
->t_rmacwalk
);
1083 fra
->complete
= false;
1087 static void fpm_enqueue_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1089 struct fpm_rmac_arg
*fra
= arg
;
1090 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1092 fra
->zl3vni
= zl3vni
;
1093 hash_iterate(zl3vni
->rmac_table
, fpm_enqueue_rmac_table
, zl3vni
);
1096 static int fpm_rmac_send(struct thread
*t
)
1098 struct fpm_rmac_arg fra
;
1100 fra
.fnc
= THREAD_ARG(t
);
1101 fra
.ctx
= dplane_ctx_alloc();
1102 fra
.complete
= true;
1103 hash_iterate(zrouter
.l3vni_table
, fpm_enqueue_l3vni_table
, &fra
);
1104 dplane_ctx_fini(&fra
.ctx
);
1106 /* RMAC walk completed. */
1108 WALK_FINISH(fra
.fnc
, FNE_RMAC_FINISHED
);
1114 * Resets the next hop FPM flags so we send all next hops again.
1116 static void fpm_nhg_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1118 struct nhg_hash_entry
*nhe
= bucket
->data
;
1120 /* Unset FPM installation flag so it gets installed again. */
1121 UNSET_FLAG(nhe
->flags
, NEXTHOP_GROUP_FPM
);
1124 static int fpm_nhg_reset(struct thread
*t
)
1126 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1128 hash_iterate(zrouter
.nhgs_id
, fpm_nhg_reset_cb
, NULL
);
1130 /* Schedule next step: send next hop groups. */
1131 thread_add_event(zrouter
.master
, fpm_nhg_send
, fnc
, 0, &fnc
->t_nhgwalk
);
1137 * Resets the LSP FPM flag so we send all LSPs again.
1139 static void fpm_lsp_reset_cb(struct hash_bucket
*bucket
, void *arg
)
1141 struct zebra_lsp
*lsp
= bucket
->data
;
1143 UNSET_FLAG(lsp
->flags
, LSP_FLAG_FPM
);
1146 static int fpm_lsp_reset(struct thread
*t
)
1148 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1149 struct zebra_vrf
*zvrf
= vrf_info_lookup(VRF_DEFAULT
);
1151 hash_iterate(zvrf
->lsp_table
, fpm_lsp_reset_cb
, NULL
);
1153 /* Schedule next step: send LSPs */
1154 thread_add_event(zrouter
.master
, fpm_lsp_send
, fnc
, 0, &fnc
->t_lspwalk
);
1160 * Resets the RIB FPM flags so we send all routes again.
1162 static int fpm_rib_reset(struct thread
*t
)
1164 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1166 struct route_node
*rn
;
1167 struct route_table
*rt
;
1168 rib_tables_iter_t rt_iter
;
1170 rt_iter
.state
= RIB_TABLES_ITER_S_INIT
;
1171 while ((rt
= rib_tables_iter_next(&rt_iter
))) {
1172 for (rn
= route_top(rt
); rn
; rn
= srcdest_route_next(rn
)) {
1173 dest
= rib_dest_from_rnode(rn
);
1174 /* Skip bad route entries. */
1178 UNSET_FLAG(dest
->flags
, RIB_DEST_UPDATE_FPM
);
1182 /* Schedule next step: send RIB routes. */
1183 thread_add_event(zrouter
.master
, fpm_rib_send
, fnc
, 0, &fnc
->t_ribwalk
);
1189 * The next three function will handle RMAC table reset.
1191 static void fpm_unset_rmac_table(struct hash_bucket
*bucket
, void *arg
)
1193 struct zebra_mac
*zrmac
= bucket
->data
;
1195 UNSET_FLAG(zrmac
->flags
, ZEBRA_MAC_FPM_SENT
);
1198 static void fpm_unset_l3vni_table(struct hash_bucket
*bucket
, void *arg
)
1200 struct zebra_l3vni
*zl3vni
= bucket
->data
;
1202 hash_iterate(zl3vni
->rmac_table
, fpm_unset_rmac_table
, zl3vni
);
1205 static int fpm_rmac_reset(struct thread
*t
)
1207 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1209 hash_iterate(zrouter
.l3vni_table
, fpm_unset_l3vni_table
, NULL
);
1211 /* Schedule next event: send RMAC entries. */
1212 thread_add_event(zrouter
.master
, fpm_rmac_send
, fnc
, 0,
1218 static int fpm_process_queue(struct thread
*t
)
1220 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1221 struct zebra_dplane_ctx
*ctx
;
1222 bool no_bufs
= false;
1223 uint64_t processed_contexts
= 0;
1226 /* No space available yet. */
1227 if (STREAM_WRITEABLE(fnc
->obuf
) < NL_PKT_BUF_SIZE
) {
1232 /* Dequeue next item or quit processing. */
1233 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1234 ctx
= dplane_ctx_dequeue(&fnc
->ctxqueue
);
1240 * Intentionally ignoring the return value
1241 * as that we are ensuring that we can write to
1242 * the output data in the STREAM_WRITEABLE
1243 * check above, so we can ignore the return
1245 (void)fpm_nl_enqueue(fnc
, ctx
);
1247 /* Account the processed entries. */
1248 processed_contexts
++;
1249 atomic_fetch_sub_explicit(&fnc
->counters
.ctxqueue_len
, 1,
1250 memory_order_relaxed
);
1252 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1253 dplane_provider_enqueue_out_ctx(fnc
->prov
, ctx
);
1256 /* Update count of processed contexts */
1257 atomic_fetch_add_explicit(&fnc
->counters
.dplane_contexts
,
1258 processed_contexts
, memory_order_relaxed
);
1260 /* Re-schedule if we ran out of buffer space */
1262 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1263 fnc
, 0, &fnc
->t_dequeue
);
1266 * Let the dataplane thread know if there are items in the
1267 * output queue to be processed. Otherwise they may sit
1268 * until the dataplane thread gets scheduled for new,
1271 if (dplane_provider_out_ctx_queue_len(fnc
->prov
) > 0)
1272 dplane_provider_work_ready();
1278 * Handles external (e.g. CLI, data plane or others) events.
1280 static int fpm_process_event(struct thread
*t
)
1282 struct fpm_nl_ctx
*fnc
= THREAD_ARG(t
);
1283 int event
= THREAD_VAL(t
);
1287 zlog_info("%s: manual FPM disable event", __func__
);
1288 fnc
->disabled
= true;
1289 atomic_fetch_add_explicit(&fnc
->counters
.user_disables
, 1,
1290 memory_order_relaxed
);
1292 /* Call reconnect to disable timers and clean up context. */
1297 zlog_info("%s: manual FPM reconnect event", __func__
);
1298 fnc
->disabled
= false;
1299 atomic_fetch_add_explicit(&fnc
->counters
.user_configures
, 1,
1300 memory_order_relaxed
);
1304 case FNE_RESET_COUNTERS
:
1305 zlog_info("%s: manual FPM counters reset event", __func__
);
1306 memset(&fnc
->counters
, 0, sizeof(fnc
->counters
));
1309 case FNE_TOGGLE_NHG
:
1310 zlog_info("%s: toggle next hop groups support", __func__
);
1311 fnc
->use_nhg
= !fnc
->use_nhg
;
1315 case FNE_INTERNAL_RECONNECT
:
1319 case FNE_NHG_FINISHED
:
1320 if (IS_ZEBRA_DEBUG_FPM
)
1321 zlog_debug("%s: next hop groups walk finished",
1324 case FNE_RIB_FINISHED
:
1325 if (IS_ZEBRA_DEBUG_FPM
)
1326 zlog_debug("%s: RIB walk finished", __func__
);
1328 case FNE_RMAC_FINISHED
:
1329 if (IS_ZEBRA_DEBUG_FPM
)
1330 zlog_debug("%s: RMAC walk finished", __func__
);
1332 case FNE_LSP_FINISHED
:
1333 if (IS_ZEBRA_DEBUG_FPM
)
1334 zlog_debug("%s: LSP walk finished", __func__
);
1338 if (IS_ZEBRA_DEBUG_FPM
)
1339 zlog_debug("%s: unhandled event %d", __func__
, event
);
1347 * Data plane functions.
1349 static int fpm_nl_start(struct zebra_dplane_provider
*prov
)
1351 struct fpm_nl_ctx
*fnc
;
1353 fnc
= dplane_provider_get_data(prov
);
1354 fnc
->fthread
= frr_pthread_new(NULL
, prov_name
, prov_name
);
1355 assert(frr_pthread_run(fnc
->fthread
, NULL
) == 0);
1356 fnc
->ibuf
= stream_new(NL_PKT_BUF_SIZE
);
1357 fnc
->obuf
= stream_new(NL_PKT_BUF_SIZE
* 128);
1358 pthread_mutex_init(&fnc
->obuf_mutex
, NULL
);
1360 fnc
->disabled
= true;
1362 TAILQ_INIT(&fnc
->ctxqueue
);
1363 pthread_mutex_init(&fnc
->ctxqueue_mutex
, NULL
);
1365 /* Set default values. */
1366 fnc
->use_nhg
= true;
1371 static int fpm_nl_finish_early(struct fpm_nl_ctx
*fnc
)
1373 /* Disable all events and close socket. */
1374 THREAD_OFF(fnc
->t_lspreset
);
1375 THREAD_OFF(fnc
->t_lspwalk
);
1376 THREAD_OFF(fnc
->t_nhgreset
);
1377 THREAD_OFF(fnc
->t_nhgwalk
);
1378 THREAD_OFF(fnc
->t_ribreset
);
1379 THREAD_OFF(fnc
->t_ribwalk
);
1380 THREAD_OFF(fnc
->t_rmacreset
);
1381 THREAD_OFF(fnc
->t_rmacwalk
);
1382 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_read
, NULL
);
1383 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_write
, NULL
);
1384 thread_cancel_async(fnc
->fthread
->master
, &fnc
->t_connect
, NULL
);
1386 if (fnc
->socket
!= -1) {
1394 static int fpm_nl_finish_late(struct fpm_nl_ctx
*fnc
)
1396 /* Stop the running thread. */
1397 frr_pthread_stop(fnc
->fthread
, NULL
);
1399 /* Free all allocated resources. */
1400 pthread_mutex_destroy(&fnc
->obuf_mutex
);
1401 pthread_mutex_destroy(&fnc
->ctxqueue_mutex
);
1402 stream_free(fnc
->ibuf
);
1403 stream_free(fnc
->obuf
);
/*
 * Data plane provider finish callback.  The dataplane invokes this twice:
 * first with early == true (quiesce activity), then with early == false
 * (release all resources).
 */
static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	if (early)
		return fpm_nl_finish_early(fnc);

	return fpm_nl_finish_late(fnc);
}
1421 static int fpm_nl_process(struct zebra_dplane_provider
*prov
)
1423 struct zebra_dplane_ctx
*ctx
;
1424 struct fpm_nl_ctx
*fnc
;
1426 uint64_t cur_queue
, peak_queue
= 0, stored_peak_queue
;
1428 fnc
= dplane_provider_get_data(prov
);
1429 limit
= dplane_provider_get_work_limit(prov
);
1430 for (counter
= 0; counter
< limit
; counter
++) {
1431 ctx
= dplane_provider_dequeue_in_ctx(prov
);
1436 * Skip all notifications if not connected, we'll walk the RIB
1439 if (fnc
->socket
!= -1 && fnc
->connecting
== false) {
1441 * Update the number of queued contexts *before*
1442 * enqueueing, to ensure counter consistency.
1444 atomic_fetch_add_explicit(&fnc
->counters
.ctxqueue_len
,
1445 1, memory_order_relaxed
);
1447 frr_with_mutex (&fnc
->ctxqueue_mutex
) {
1448 dplane_ctx_enqueue_tail(&fnc
->ctxqueue
, ctx
);
1451 cur_queue
= atomic_load_explicit(
1452 &fnc
->counters
.ctxqueue_len
,
1453 memory_order_relaxed
);
1454 if (peak_queue
< cur_queue
)
1455 peak_queue
= cur_queue
;
1459 dplane_ctx_set_status(ctx
, ZEBRA_DPLANE_REQUEST_SUCCESS
);
1460 dplane_provider_enqueue_out_ctx(prov
, ctx
);
1463 /* Update peak queue length, if we just observed a new peak */
1464 stored_peak_queue
= atomic_load_explicit(
1465 &fnc
->counters
.ctxqueue_len_peak
, memory_order_relaxed
);
1466 if (stored_peak_queue
< peak_queue
)
1467 atomic_store_explicit(&fnc
->counters
.ctxqueue_len_peak
,
1468 peak_queue
, memory_order_relaxed
);
1470 if (atomic_load_explicit(&fnc
->counters
.ctxqueue_len
,
1471 memory_order_relaxed
)
1473 thread_add_timer(fnc
->fthread
->master
, fpm_process_queue
,
1474 fnc
, 0, &fnc
->t_dequeue
);
1476 /* Ensure dataplane thread is rescheduled if we hit the work limit */
1477 if (counter
>= limit
)
1478 dplane_provider_work_ready();
1483 static int fpm_nl_new(struct thread_master
*tm
)
1485 struct zebra_dplane_provider
*prov
= NULL
;
1488 gfnc
= calloc(1, sizeof(*gfnc
));
1489 rv
= dplane_provider_register(prov_name
, DPLANE_PRIO_POSTPROCESS
,
1490 DPLANE_PROV_FLAG_THREADED
, fpm_nl_start
,
1491 fpm_nl_process
, fpm_nl_finish
, gfnc
,
1494 if (IS_ZEBRA_DEBUG_DPLANE
)
1495 zlog_debug("%s register status: %d", prov_name
, rv
);
1497 install_node(&fpm_node
);
1498 install_element(ENABLE_NODE
, &fpm_show_counters_cmd
);
1499 install_element(ENABLE_NODE
, &fpm_show_counters_json_cmd
);
1500 install_element(ENABLE_NODE
, &fpm_reset_counters_cmd
);
1501 install_element(CONFIG_NODE
, &fpm_set_address_cmd
);
1502 install_element(CONFIG_NODE
, &no_fpm_set_address_cmd
);
1503 install_element(CONFIG_NODE
, &fpm_use_nhg_cmd
);
1504 install_element(CONFIG_NODE
, &no_fpm_use_nhg_cmd
);
1509 static int fpm_nl_init(void)
1511 hook_register(frr_late_init
, fpm_nl_new
);
1516 .name
= "dplane_fpm_nl",
1518 .description
= "Data plane plugin for FPM using netlink.",
1519 .init
= fpm_nl_init
,