]> git.proxmox.com Git - mirror_frr.git/blob - zebra/dplane_fpm_nl.c
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / zebra / dplane_fpm_nl.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
4 *
5 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
6 * Rafael Zalamena
7 */
8
9 #ifdef HAVE_CONFIG_H
10 #include "config.h" /* Include this explicitly */
11 #endif
12
13 #include <arpa/inet.h>
14
15 #include <sys/types.h>
16 #include <sys/socket.h>
17
18 #include <errno.h>
19 #include <string.h>
20
21 #include "lib/zebra.h"
22 #include "lib/json.h"
23 #include "lib/libfrr.h"
24 #include "lib/frratomic.h"
25 #include "lib/command.h"
26 #include "lib/memory.h"
27 #include "lib/network.h"
28 #include "lib/ns.h"
29 #include "lib/frr_pthread.h"
30 #include "zebra/debug.h"
31 #include "zebra/interface.h"
32 #include "zebra/zebra_dplane.h"
33 #include "zebra/zebra_mpls.h"
34 #include "zebra/zebra_router.h"
35 #include "zebra/interface.h"
36 #include "zebra/zebra_vxlan_private.h"
37 #include "zebra/zebra_evpn.h"
38 #include "zebra/zebra_evpn_mac.h"
39 #include "zebra/kernel_netlink.h"
40 #include "zebra/rt_netlink.h"
41 #include "zebra/debug.h"
42 #include "fpm/fpm.h"
43
44 #define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
45 #define SOUTHBOUND_DEFAULT_PORT 2620
46
47 /**
48 * FPM header:
49 * {
50 * version: 1 byte (always 1),
51 * type: 1 byte (1 for netlink, 2 protobuf),
52 * len: 2 bytes (network order),
53 * }
54 *
55 * This header is used with any format to tell the users how many bytes to
56 * expect.
57 */
58 #define FPM_HEADER_SIZE 4
59
60 static const char *prov_name = "dplane_fpm_nl";
61
/*
 * Global FPM netlink plugin context: connection state, buffered I/O
 * streams, the pending data plane context queue, event handles for both
 * the FPM pthread and zebra's main thread, and statistic counters.
 * A single instance is referenced through `gfnc`.
 */
struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;		/* Connected socket FD, or -1 when closed. */
	bool disabled;		/* Set via FNE_DISABLE: suppress reconnects. */
	bool connecting;	/* Non-blocking connect() still in progress. */
	bool use_nhg;		/* Encode netlink next hop group messages. */
	struct sockaddr_storage addr;	/* Remote FPM server address. */

	/* data plane buffers. */
	struct stream *ibuf;	/* Input buffer (FPM pthread only). */
	struct stream *obuf;	/* Output buffer, shared across threads. */
	pthread_mutex_t obuf_mutex;	/* Serializes access to obuf. */

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_list_head ctxqueue;
	pthread_mutex_t ctxqueue_mutex;	/* Serializes access to ctxqueue. */

	/* data plane events (run on the FPM pthread). */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_nhg;
	struct thread *t_dequeue;

	/* zebra events (run on zebra's main thread). */
	struct thread *t_lspreset;
	struct thread *t_lspwalk;
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters (atomics: updated from multiple threads). */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;
135
/*
 * Events dispatched to fpm_process_event() on the FPM pthread, plus the
 * walk-completion notifications posted by zebra-side walkers.
 */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
157
/*
 * Schedule an asynchronous reconnect on the FPM pthread. Error paths use
 * this instead of tearing the connection down inline so that the actual
 * cleanup happens in event context (see FNE_INTERNAL_RECONNECT).
 */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/*
 * Notify the FPM event handler that a zebra-side walk finished; `ev` is
 * one of the FNE_*_FINISHED values.
 */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 (ev), NULL)
165
166 /*
167 * Prototypes.
168 */
169 static void fpm_process_event(struct thread *t);
170 static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
171 static void fpm_lsp_send(struct thread *t);
172 static void fpm_lsp_reset(struct thread *t);
173 static void fpm_nhg_send(struct thread *t);
174 static void fpm_nhg_reset(struct thread *t);
175 static void fpm_rib_send(struct thread *t);
176 static void fpm_rib_reset(struct thread *t);
177 static void fpm_rmac_send(struct thread *t);
178 static void fpm_rmac_reset(struct thread *t);
179
180 /*
181 * CLI.
182 */
183 #define FPM_STR "Forwarding Plane Manager configuration\n"
184
/*
 * "fpm address ..." — store the remote FPM server address/port in the
 * global context and ask the FPM pthread to (re)connect.
 */
DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	uint16_t port = 0;
	uint8_t naddr[INET6_BUFSIZ];

	/* Optional port argument; the CLI grammar bounds it to 1-65535. */
	if (argc == 5)
		port = strtol(argv[4]->arg, NULL, 10);

	/* Handle IPv4 addresses. */
	if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
		sin = (struct sockaddr_in *)&gfnc->addr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = AF_INET;
		sin->sin_port =
			port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
		sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
		memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

		goto ask_reconnect;
	}

	/* Handle IPv6 addresses. */
	if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
		vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
		return CMD_WARNING;
	}

	sin6 = (struct sockaddr_in6 *)&gfnc->addr;
	memset(sin6, 0, sizeof(*sin6));
	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
	sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
	memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
	/* Apply asynchronously on the FPM pthread to avoid races. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RECONNECT, &gfnc->t_event);
	return CMD_SUCCESS;
}
238
/* "no fpm address ..." — disable the FPM connection asynchronously. */
DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_DISABLE, &gfnc->t_event);
	return CMD_SUCCESS;
}
253
/* "fpm use-next-hop-groups" — enable NHG encoding (toggle via event). */
DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
      "fpm use-next-hop-groups",
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already enabled. */
	if (gfnc->use_nhg)
		return CMD_SUCCESS;

	/* Flip the flag on the FPM pthread via FNE_TOGGLE_NHG. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_nhg);

	return CMD_SUCCESS;
}
268
/* "no fpm use-next-hop-groups" — disable NHG encoding (toggle via event). */
DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
      "no fpm use-next-hop-groups",
      NO_STR
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already disabled. */
	if (!gfnc->use_nhg)
		return CMD_SUCCESS;

	/* Flip the flag on the FPM pthread via FNE_TOGGLE_NHG. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_nhg);

	return CMD_SUCCESS;
}
284
/* "clear fpm counters" — reset statistics on the FPM pthread. */
DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}
295
/* "show fpm counters" — plain-text dump of the statistic counters. */
DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
	vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

	/* Local helper to keep the label/value rows aligned. */
#define SHOW_COUNTER(label, counter)                                           \
	vty_out(vty, "%28s: %u\n", (label), (counter))

	SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
	SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
	SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
	SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
	SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
	SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
	SHOW_COUNTER("Data plane items processed",
		     gfnc->counters.dplane_contexts);
	SHOW_COUNTER("Data plane items enqueued",
		     gfnc->counters.ctxqueue_len);
	SHOW_COUNTER("Data plane items queue peak",
		     gfnc->counters.ctxqueue_len_peak);
	SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
	SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
	SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

	return CMD_SUCCESS;
}
327
/* "show fpm counters json" — JSON dump of the statistic counters. */
DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	/* vty_json() emits and frees the object. */
	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_json(vty, jo);

	return CMD_SUCCESS;
}
360
361 static int fpm_write_config(struct vty *vty)
362 {
363 struct sockaddr_in *sin;
364 struct sockaddr_in6 *sin6;
365 int written = 0;
366
367 if (gfnc->disabled)
368 return written;
369
370 switch (gfnc->addr.ss_family) {
371 case AF_INET:
372 written = 1;
373 sin = (struct sockaddr_in *)&gfnc->addr;
374 vty_out(vty, "fpm address %pI4", &sin->sin_addr);
375 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
376 vty_out(vty, " port %d", ntohs(sin->sin_port));
377
378 vty_out(vty, "\n");
379 break;
380 case AF_INET6:
381 written = 1;
382 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
383 vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
384 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
385 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
386
387 vty_out(vty, "\n");
388 break;
389
390 default:
391 break;
392 }
393
394 if (!gfnc->use_nhg) {
395 vty_out(vty, "no fpm use-next-hop-groups\n");
396 written = 1;
397 }
398
399 return written;
400 }
401
/* CLI node registration; only used to write this plugin's config. */
static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	.config_write = fpm_write_config,
};
408
409 /*
410 * FPM functions.
411 */
412 static void fpm_connect(struct thread *t);
413
/*
 * Tear down the current FPM connection and, unless disabled, schedule a
 * new connection attempt in 3 seconds. Runs on the FPM pthread.
 */
static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	/* Retry the connection after a short back-off. */
	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}
450
451 static void fpm_read(struct thread *t)
452 {
453 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
454 fpm_msg_hdr_t fpm;
455 ssize_t rv;
456 char buf[65535];
457 struct nlmsghdr *hdr;
458 struct zebra_dplane_ctx *ctx;
459 size_t available_bytes;
460 size_t hdr_available_bytes;
461
462 /* Let's ignore the input at the moment. */
463 rv = stream_read_try(fnc->ibuf, fnc->socket,
464 STREAM_WRITEABLE(fnc->ibuf));
465 if (rv == 0) {
466 atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
467 memory_order_relaxed);
468
469 if (IS_ZEBRA_DEBUG_FPM)
470 zlog_debug("%s: connection closed", __func__);
471
472 FPM_RECONNECT(fnc);
473 return;
474 }
475 if (rv == -1) {
476 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
477 memory_order_relaxed);
478 zlog_warn("%s: connection failure: %s", __func__,
479 strerror(errno));
480 FPM_RECONNECT(fnc);
481 return;
482 }
483
484 /* Schedule the next read */
485 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
486 &fnc->t_read);
487
488 /* We've got an interruption. */
489 if (rv == -2)
490 return;
491
492
493 /* Account all bytes read. */
494 atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
495 memory_order_relaxed);
496
497 available_bytes = STREAM_READABLE(fnc->ibuf);
498 while (available_bytes) {
499 if (available_bytes < (ssize_t)FPM_MSG_HDR_LEN) {
500 stream_pulldown(fnc->ibuf);
501 return;
502 }
503
504 fpm.version = stream_getc(fnc->ibuf);
505 fpm.msg_type = stream_getc(fnc->ibuf);
506 fpm.msg_len = stream_getw(fnc->ibuf);
507
508 if (fpm.version != FPM_PROTO_VERSION &&
509 fpm.msg_type != FPM_MSG_TYPE_NETLINK) {
510 stream_reset(fnc->ibuf);
511 zlog_warn(
512 "%s: Received version/msg_type %u/%u, expected 1/1",
513 __func__, fpm.version, fpm.msg_type);
514
515 FPM_RECONNECT(fnc);
516 return;
517 }
518
519 /*
520 * If the passed in length doesn't even fill in the header
521 * something is wrong and reset.
522 */
523 if (fpm.msg_len < FPM_MSG_HDR_LEN) {
524 zlog_warn(
525 "%s: Received message length: %u that does not even fill the FPM header",
526 __func__, fpm.msg_len);
527 FPM_RECONNECT(fnc);
528 return;
529 }
530
531 /*
532 * If we have not received the whole payload, reset the stream
533 * back to the beginning of the header and move it to the
534 * top.
535 */
536 if (fpm.msg_len > available_bytes) {
537 stream_rewind_getp(fnc->ibuf, FPM_MSG_HDR_LEN);
538 stream_pulldown(fnc->ibuf);
539 return;
540 }
541
542 available_bytes -= FPM_MSG_HDR_LEN;
543
544 /*
545 * Place the data from the stream into a buffer
546 */
547 hdr = (struct nlmsghdr *)buf;
548 stream_get(buf, fnc->ibuf, fpm.msg_len - FPM_MSG_HDR_LEN);
549 hdr_available_bytes = fpm.msg_len - FPM_MSG_HDR_LEN;
550 available_bytes -= hdr_available_bytes;
551
552 /* Sanity check: must be at least header size. */
553 if (hdr->nlmsg_len < sizeof(*hdr)) {
554 zlog_warn(
555 "%s: [seq=%u] invalid message length %u (< %zu)",
556 __func__, hdr->nlmsg_seq, hdr->nlmsg_len,
557 sizeof(*hdr));
558 continue;
559 }
560 if (hdr->nlmsg_len > fpm.msg_len) {
561 zlog_warn(
562 "%s: Received a inner header length of %u that is greater than the fpm total length of %u",
563 __func__, hdr->nlmsg_len, fpm.msg_len);
564 FPM_RECONNECT(fnc);
565 }
566 /* Not enough bytes available. */
567 if (hdr->nlmsg_len > hdr_available_bytes) {
568 zlog_warn(
569 "%s: [seq=%u] invalid message length %u (> %zu)",
570 __func__, hdr->nlmsg_seq, hdr->nlmsg_len,
571 available_bytes);
572 continue;
573 }
574
575 if (!(hdr->nlmsg_flags & NLM_F_REQUEST)) {
576 if (IS_ZEBRA_DEBUG_FPM)
577 zlog_debug(
578 "%s: [seq=%u] not a request, skipping",
579 __func__, hdr->nlmsg_seq);
580
581 /*
582 * This request is a bust, go to the next one
583 */
584 continue;
585 }
586
587 switch (hdr->nlmsg_type) {
588 case RTM_NEWROUTE:
589 ctx = dplane_ctx_alloc();
590 dplane_ctx_set_op(ctx, DPLANE_OP_ROUTE_NOTIFY);
591 if (netlink_route_change_read_unicast_internal(
592 hdr, 0, false, ctx) != 1) {
593 dplane_ctx_fini(&ctx);
594 stream_pulldown(fnc->ibuf);
595 /*
596 * Let's continue to read other messages
597 * Even if we ignore this one.
598 */
599 }
600 break;
601 default:
602 if (IS_ZEBRA_DEBUG_FPM)
603 zlog_debug(
604 "%s: Received message type %u which is not currently handled",
605 __func__, hdr->nlmsg_type);
606 break;
607 }
608 }
609
610 stream_reset(fnc->ibuf);
611 }
612
/*
 * Socket write handler: completes a pending non-blocking connect() on
 * the first invocation, then flushes as much of obuf as the socket
 * accepts. Reschedules itself while data remains. Runs on the FPM
 * pthread.
 */
static void fpm_write(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	socklen_t statuslen;
	ssize_t bwritten;
	int rv, status;
	size_t btotal;

	if (fnc->connecting == true) {
		status = 0;
		statuslen = sizeof(status);

		/* Writability means connect() resolved: fetch its result. */
		rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
				&statuslen);
		if (rv == -1 || status != 0) {
			if (rv != -1)
				zlog_warn("%s: connection failed: %s", __func__,
					  strerror(status));
			else
				/*
				 * NOTE(review): when getsockopt() itself fails
				 * `status` is still 0; errno would likely be
				 * the meaningful value here — confirm.
				 */
				zlog_warn("%s: SO_ERROR failed: %s", __func__,
					  strerror(status));

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);

			FPM_RECONNECT(fnc);
			return;
		}

		fnc->connecting = false;

		/*
		 * Starting with LSPs walk all FPM objects, marking them
		 * as unsent and then replaying them.
		 */
		thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
				 &fnc->t_lspreset);

		/* Permit receiving messages now. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
	}

	/* Serialize against enqueuers; unlocks on scope exit. */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	while (true) {
		/* Stream is empty: reset pointers and return. */
		if (STREAM_READABLE(fnc->obuf) == 0) {
			stream_reset(fnc->obuf);
			break;
		}

		/* Try to write all at once. */
		btotal = stream_get_endp(fnc->obuf) -
			stream_get_getp(fnc->obuf);
		bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
		if (bwritten == 0) {
			atomic_fetch_add_explicit(
				&fnc->counters.connection_closes, 1,
				memory_order_relaxed);

			if (IS_ZEBRA_DEBUG_FPM)
				zlog_debug("%s: connection closed", __func__);
			break;
		}
		if (bwritten == -1) {
			/* Attempt to continue if blocked by a signal. */
			if (errno == EINTR)
				continue;
			/* Receiver is probably slow, lets give it some time. */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);
			zlog_warn("%s: connection failure: %s", __func__,
				  strerror(errno));

			FPM_RECONNECT(fnc);
			return;
		}

		/* Account all bytes sent. */
		atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
					  memory_order_relaxed);

		/* Account number of bytes free. */
		atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
					  memory_order_relaxed);

		stream_forward_getp(fnc->obuf, (size_t)bwritten);
	}

	/* Stream is not empty yet, we must schedule more writes. */
	if (STREAM_READABLE(fnc->obuf)) {
		stream_pulldown(fnc->obuf);
		thread_add_write(fnc->fthread->master, fpm_write, fnc,
				 fnc->socket, &fnc->t_write);
		return;
	}
}
716
/*
 * Open a non-blocking TCP connection to the configured FPM server.
 * On failure a retry is scheduled in 3 seconds; on success (or a
 * connect-in-progress) the read/write handlers are armed. Runs on the
 * FPM pthread.
 */
static void fpm_connect(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
	socklen_t slen;
	int rv, sock;
	char addrstr[INET6_ADDRSTRLEN];

	sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
	if (sock == -1) {
		zlog_err("%s: fpm socket failed: %s", __func__,
			 strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return;
	}

	set_nonblocking(sock);

	if (fnc->addr.ss_family == AF_INET) {
		inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin);
	} else {
		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin6);
	}

	if (IS_ZEBRA_DEBUG_FPM)
		/*
		 * NOTE(review): reads sin->sin_port even for AF_INET6;
		 * relies on sin_port/sin6_port sharing the same offset —
		 * confirm this is intentional.
		 */
		zlog_debug("%s: attempting to connect to %s:%d", __func__,
			   addrstr, ntohs(sin->sin_port));

	rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
	if (rv == -1 && errno != EINPROGRESS) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		close(sock);
		zlog_warn("%s: fpm connection failed: %s", __func__,
			  strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return;
	}

	/* EINPROGRESS: fpm_write() will finish the handshake. */
	fnc->connecting = (errno == EINPROGRESS);
	fnc->socket = sock;
	if (!fnc->connecting)
		thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
				&fnc->t_read);
	thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
			 &fnc->t_write);

	/*
	 * Starting with LSPs walk all FPM objects, marking them
	 * as unsent and then replaying them.
	 *
	 * If we are not connected, then delay the objects reset/send.
	 */
	if (!fnc->connecting)
		thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
				 &fnc->t_lspreset);
}
779
780 /**
781 * Encode data plane operation context into netlink and enqueue it in the FPM
782 * output buffer.
783 *
784 * @param fnc the netlink FPM context.
785 * @param ctx the data plane operation context data.
786 * @return 0 on success or -1 on not enough space.
787 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	/* Serialize obuf access; unlocks on every return path below. */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		/* Updates are encoded as a delete followed by an install. */
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf), true);
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf), true);
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
		rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_lsp_msg_encoder failed",
				 __func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	/* Un-handled by FPM at this time. */
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_RULE_ADD:
	case DPLANE_OP_RULE_DELETE:
	case DPLANE_OP_RULE_UPDATE:
	case DPLANE_OP_NEIGH_DISCOVER:
	case DPLANE_OP_BR_PORT_UPDATE:
	case DPLANE_OP_IPTABLE_ADD:
	case DPLANE_OP_IPTABLE_DELETE:
	case DPLANE_OP_IPSET_ADD:
	case DPLANE_OP_IPSET_DELETE:
	case DPLANE_OP_IPSET_ENTRY_ADD:
	case DPLANE_OP_IPSET_ENTRY_DELETE:
	case DPLANE_OP_NEIGH_IP_INSTALL:
	case DPLANE_OP_NEIGH_IP_DELETE:
	case DPLANE_OP_NEIGH_TABLE_UPDATE:
	case DPLANE_OP_GRE_SET:
	case DPLANE_OP_INTF_ADDR_ADD:
	case DPLANE_OP_INTF_ADDR_DEL:
	case DPLANE_OP_INTF_NETCONFIG:
	case DPLANE_OP_INTF_INSTALL:
	case DPLANE_OP_INTF_UPDATE:
	case DPLANE_OP_INTF_DELETE:
	case DPLANE_OP_TC_QDISC_INSTALL:
	case DPLANE_OP_TC_QDISC_UNINSTALL:
	case DPLANE_OP_TC_CLASS_ADD:
	case DPLANE_OP_TC_CLASS_DELETE:
	case DPLANE_OP_TC_CLASS_UPDATE:
	case DPLANE_OP_TC_FILTER_ADD:
	case DPLANE_OP_TC_FILTER_DELETE:
	case DPLANE_OP_TC_FILTER_UPDATE:
	case DPLANE_OP_NONE:
		break;

	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}
991
992 /*
993 * LSP walk/send functions
994 */
/* Walk argument for the LSP replay (see fpm_lsp_send_cb). */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx;	/* Reusable encode context. */
	struct fpm_nl_ctx *fnc;		/* Plugin context. */
	bool complete;			/* False when the walk was aborted. */
};
1000
1001 static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
1002 {
1003 struct zebra_lsp *lsp = bucket->data;
1004 struct fpm_lsp_arg *fla = arg;
1005
1006 /* Skip entries which have already been sent */
1007 if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
1008 return HASHWALK_CONTINUE;
1009
1010 dplane_ctx_reset(fla->ctx);
1011 dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);
1012
1013 if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
1014 fla->complete = false;
1015 return HASHWALK_ABORT;
1016 }
1017
1018 /* Mark entry as sent */
1019 SET_FLAG(lsp->flags, LSP_FLAG_FPM);
1020 return HASHWALK_CONTINUE;
1021 }
1022
/*
 * Walk the default VRF LSP table replaying entries to the FPM server.
 * On completion signal FNE_LSP_FINISHED and start the NHG stage;
 * otherwise reschedule itself. Runs on zebra's main thread.
 */
static void fpm_lsp_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
	struct fpm_lsp_arg fla;

	fla.fnc = fnc;
	fla.ctx = dplane_ctx_alloc();
	fla.complete = true;

	hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);

	dplane_ctx_fini(&fla.ctx);

	if (fla.complete) {
		WALK_FINISH(fnc, FNE_LSP_FINISHED);

		/* Now move onto routes */
		thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
				 &fnc->t_nhgreset);
	} else {
		/* Didn't finish - reschedule LSP walk */
		thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
				 &fnc->t_lspwalk);
	}
}
1049
1050 /*
1051 * Next hop walk/send functions.
1052 */
/* Walk argument for the next hop group replay (see fpm_nhg_send_cb). */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;	/* Reusable encode context. */
	struct fpm_nl_ctx *fnc;		/* Plugin context. */
	bool complete;			/* False when the walk was aborted. */
};
1058
1059 static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
1060 {
1061 struct nhg_hash_entry *nhe = bucket->data;
1062 struct fpm_nhg_arg *fna = arg;
1063
1064 /* This entry was already sent, skip it. */
1065 if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
1066 return HASHWALK_CONTINUE;
1067
1068 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
1069 dplane_ctx_reset(fna->ctx);
1070 dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
1071 if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
1072 /* Our buffers are full, lets give it some cycles. */
1073 fna->complete = false;
1074 return HASHWALK_ABORT;
1075 }
1076
1077 /* Mark group as sent, so it doesn't get sent again. */
1078 SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1079
1080 return HASHWALK_CONTINUE;
1081 }
1082
/*
 * Walk all next hop groups replaying them to the FPM server (only when
 * the NHG feature is enabled). On completion signal FNE_NHG_FINISHED and
 * start the RIB stage; otherwise reschedule itself. Runs on zebra's main
 * thread.
 */
static void fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	if (fnc->use_nhg)
		hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, lets install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
				 &fnc->t_ribreset);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);
}
1108
1109 /**
1110 * Send all RIB installed routes to the connected data plane.
1111 */
/*
 * Walk every RIB table replaying installed routes to the FPM server.
 * Aborts and reschedules (1s) on buffer-full; progress is preserved via
 * the RIB_DEST_UPDATE_FPM flag. On completion signal FNE_RIB_FINISHED
 * and start the RMAC stage. Runs on zebra's main thread.
 */
static void fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				/* Buffer full: retry in one second. */
				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	/* Schedule next event: RMAC reset. */
	thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);
}
1164
1165 /*
1166 * The next three functions will handle RMAC enqueue.
1167 */
/* Shared state between fpm_rmac_send() and its nested hash iterations. */
struct fpm_rmac_arg {
	/* Reusable dataplane context for each RMAC install message. */
	struct zebra_dplane_ctx *ctx;
	/* FPM connection that the messages are enqueued to. */
	struct fpm_nl_ctx *fnc;
	/* L3VNI currently being walked (set by fpm_enqueue_l3vni_table). */
	struct zebra_l3vni *zl3vni;
	/* Cleared when the output buffer fills; stops further enqueues. */
	bool complete;
};
1174
1175 static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
1176 {
1177 struct fpm_rmac_arg *fra = arg;
1178 struct zebra_mac *zrmac = bucket->data;
1179 struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
1180 struct zebra_vxlan_vni *vni;
1181 struct zebra_if *br_zif;
1182 vlanid_t vid;
1183 bool sticky;
1184
1185 /* Entry already sent. */
1186 if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
1187 return;
1188
1189 sticky = !!CHECK_FLAG(zrmac->flags,
1190 (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
1191 br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
1192 vni = zebra_vxlan_if_vni_find(zif, fra->zl3vni->vni);
1193 vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vni->access_vlan : 0;
1194
1195 dplane_ctx_reset(fra->ctx);
1196 dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
1197 dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
1198 zif->brslave_info.br_if, vid, &zrmac->macaddr, vni->vni,
1199 zrmac->fwd_info.r_vtep_ip, sticky, 0 /*nhg*/,
1200 0 /*update_flags*/);
1201 if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
1202 thread_add_timer(zrouter.master, fpm_rmac_send,
1203 fra->fnc, 1, &fra->fnc->t_rmacwalk);
1204 fra->complete = false;
1205 }
1206 }
1207
1208 static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
1209 {
1210 struct fpm_rmac_arg *fra = arg;
1211 struct zebra_l3vni *zl3vni = bucket->data;
1212
1213 fra->zl3vni = zl3vni;
1214 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1215 }
1216
/*
 * Walk every L3VNI and enqueue its remote MAC entries to the FPM.
 * If the buffer fills mid-walk, fpm_enqueue_rmac_table() has already
 * rescheduled this function via t_rmacwalk, so we only emit the finish
 * event when the whole walk completed.
 */
static void fpm_rmac_send(struct thread *t)
{
	struct fpm_rmac_arg fra;

	fra.fnc = THREAD_ARG(t);
	fra.ctx = dplane_ctx_alloc();
	fra.complete = true;
	hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
	dplane_ctx_fini(&fra.ctx);

	/* RMAC walk completed. */
	if (fra.complete)
		WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
}
1231
1232 /*
1233 * Resets the next hop FPM flags so we send all next hops again.
1234 */
1235 static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1236 {
1237 struct nhg_hash_entry *nhe = bucket->data;
1238
1239 /* Unset FPM installation flag so it gets installed again. */
1240 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1241 }
1242
1243 static void fpm_nhg_reset(struct thread *t)
1244 {
1245 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1246
1247 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
1248
1249 /* Schedule next step: send next hop groups. */
1250 thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
1251 }
1252
1253 /*
1254 * Resets the LSP FPM flag so we send all LSPs again.
1255 */
1256 static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
1257 {
1258 struct zebra_lsp *lsp = bucket->data;
1259
1260 UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
1261 }
1262
/*
 * Clear the FPM flag on every LSP so they all get re-sent, then kick
 * off the LSP send walk.
 */
static void fpm_lsp_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	/* LSPs live in the default VRF's table; assumes the default VRF
	 * always exists at this point — TODO confirm against callers. */
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);

	hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);

	/* Schedule next step: send LSPs */
	thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
}
1273
1274 /**
1275 * Resets the RIB FPM flags so we send all routes again.
1276 */
1277 static void fpm_rib_reset(struct thread *t)
1278 {
1279 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1280 rib_dest_t *dest;
1281 struct route_node *rn;
1282 struct route_table *rt;
1283 rib_tables_iter_t rt_iter;
1284
1285 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1286 while ((rt = rib_tables_iter_next(&rt_iter))) {
1287 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1288 dest = rib_dest_from_rnode(rn);
1289 /* Skip bad route entries. */
1290 if (dest == NULL)
1291 continue;
1292
1293 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1294 }
1295 }
1296
1297 /* Schedule next step: send RIB routes. */
1298 thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
1299 }
1300
1301 /*
1302 * The next three function will handle RMAC table reset.
1303 */
1304 static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
1305 {
1306 struct zebra_mac *zrmac = bucket->data;
1307
1308 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1309 }
1310
1311 static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
1312 {
1313 struct zebra_l3vni *zl3vni = bucket->data;
1314
1315 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1316 }
1317
/*
 * Clear the FPM-sent flag on every RMAC entry of every L3VNI, then
 * schedule the RMAC send walk.
 */
static void fpm_rmac_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);

	hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);

	/* Schedule next event: send RMAC entries. */
	thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
			 &fnc->t_rmacwalk);
}
1328
/*
 * Drain the shared context queue into the FPM output buffer.
 *
 * Scheduled on the FPM pthread's master (fnc->fthread->master, see the
 * t_dequeue timers). Dequeues under ctxqueue_mutex, serializes each
 * context while the socket is open, and hands contexts back to the
 * dataplane as completed.
 */
static void fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;
	bool no_bufs = false;
	uint64_t processed_contexts = 0;

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
			no_bufs = true;
			break;
		}

		/* Dequeue next item or quit processing. */
		frr_with_mutex (&fnc->ctxqueue_mutex) {
			ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		}
		if (ctx == NULL)
			break;

		/*
		 * Intentionally ignoring the return value
		 * as that we are ensuring that we can write to
		 * the output data in the STREAM_WRITEABLE
		 * check above, so we can ignore the return
		 */
		if (fnc->socket != -1)
			(void)fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		processed_contexts++;
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Mark done and return the context to the dataplane. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Update count of processed contexts */
	atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
				  processed_contexts, memory_order_relaxed);

	/* Re-schedule if we ran out of buffer space */
	if (no_bufs)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/*
	 * Let the dataplane thread know if there are items in the
	 * output queue to be processed. Otherwise they may sit
	 * until the dataplane thread gets scheduled for new,
	 * unrelated work.
	 */
	if (dplane_provider_out_ctx_queue_len(fnc->prov) > 0)
		dplane_provider_work_ready();
}
1386
/**
 * Handles external (e.g. CLI, data plane or others) events.
 *
 * The event value selects the action; most configuration-changing
 * events funnel into fpm_reconnect() to restart the connection with
 * the new settings. The *_FINISHED events only emit debug logs.
 */
static void fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	enum fpm_nl_events event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		/* Reconnect so the peer sees a consistent message stream. */
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);
		break;
	case FNE_LSP_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: LSP walk finished", __func__);
		break;
	}
}
1448
1449 /*
1450 * Data plane functions.
1451 */
/*
 * Dataplane provider start callback: spawn the dedicated FPM pthread,
 * allocate the I/O buffers and initialize the context queue. The
 * connection begins with no socket and disabled (presumably until an
 * FPM address is configured — confirm against the CLI handlers).
 */
static int fpm_nl_start(struct zebra_dplane_provider *prov)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
	assert(frr_pthread_run(fnc->fthread, NULL) == 0);
	fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
	/* Large output buffer so many messages can be batched per write. */
	fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
	pthread_mutex_init(&fnc->obuf_mutex, NULL);
	fnc->socket = -1;
	fnc->disabled = true;
	fnc->prov = prov;
	dplane_ctx_q_init(&fnc->ctxqueue);
	pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);

	/* Set default values. */
	fnc->use_nhg = true;

	return 0;
}
1473
/*
 * Early shutdown stage: cancel every scheduled event and close the FPM
 * socket. Events owned by the FPM pthread (read/write/connect) are
 * cancelled asynchronously since they live on another thread's master.
 */
static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_lspreset);
	THREAD_OFF(fnc->t_lspwalk);
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	THREAD_OFF(fnc->t_event);
	THREAD_OFF(fnc->t_nhg);
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}
1498
/*
 * Late shutdown stage: stop the FPM pthread and release all resources.
 *
 * NOTE(review): this frees the global gfnc rather than the fnc argument,
 * which assumes the two always refer to the same object — confirm
 * against fpm_nl_new(), where gfnc is registered as the provider data.
 */
static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	free(gfnc);
	gfnc = NULL;

	return 0;
}
1514
1515 static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1516 {
1517 struct fpm_nl_ctx *fnc;
1518
1519 fnc = dplane_provider_get_data(prov);
1520 if (early)
1521 return fpm_nl_finish_early(fnc);
1522
1523 return fpm_nl_finish_late(fnc);
1524 }
1525
/*
 * Dataplane provider work callback: move contexts queued by zebra onto
 * the plugin's own context queue, to be serialized on the FPM pthread
 * by fpm_process_queue(). Tracks queue length and its peak in the
 * counters.
 */
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			/*
			 * Update the number of queued contexts *before*
			 * enqueueing, to ensure counter consistency.
			 */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);

			frr_with_mutex (&fnc->ctxqueue_mutex) {
				dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
			}

			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			if (peak_queue < cur_queue)
				peak_queue = cur_queue;
			continue;
		}

		/* Not connected: complete the context immediately. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Update peak queue length, if we just observed a new peak */
	stored_peak_queue = atomic_load_explicit(
		&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
	if (stored_peak_queue < peak_queue)
		atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
				      peak_queue, memory_order_relaxed);

	/* Kick the FPM pthread to drain whatever we just queued. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/* Ensure dataplane thread is rescheduled if we hit the work limit */
	if (counter >= limit)
		dplane_provider_work_ready();

	return 0;
}
1587
/*
 * Module entry point (run at frr_late_init): allocate the global FPM
 * context, register the dataplane provider and install the CLI nodes.
 */
static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	/* NOTE(review): the calloc result is not checked; on allocation
	 * failure the provider would be registered with NULL data. */
	gfnc = calloc(1, sizeof(*gfnc));
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	/* Install CLI commands for FPM configuration and statistics. */
	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}
1613
1614 static int fpm_nl_init(void)
1615 {
1616 hook_register(frr_late_init, fpm_nl_new);
1617 return 0;
1618 }
1619
/* Module descriptor: identifies this plugin and its init hook to FRR. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
);