]> git.proxmox.com Git - mirror_frr.git/blob - zebra/dplane_fpm_nl.c
doc: Add `show ipv6 rpf X:X::X:X` command to docs
[mirror_frr.git] / zebra / dplane_fpm_nl.c
1 /*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h" /* Include this explicitly */
24 #endif
25
26 #include <arpa/inet.h>
27
28 #include <sys/types.h>
29 #include <sys/socket.h>
30
31 #include <errno.h>
32 #include <string.h>
33
34 #include "lib/zebra.h"
35 #include "lib/json.h"
36 #include "lib/libfrr.h"
37 #include "lib/frratomic.h"
38 #include "lib/command.h"
39 #include "lib/memory.h"
40 #include "lib/network.h"
41 #include "lib/ns.h"
42 #include "lib/frr_pthread.h"
43 #include "zebra/debug.h"
44 #include "zebra/interface.h"
45 #include "zebra/zebra_dplane.h"
46 #include "zebra/zebra_mpls.h"
47 #include "zebra/zebra_router.h"
48 #include "zebra/interface.h"
49 #include "zebra/zebra_vxlan_private.h"
50 #include "zebra/zebra_evpn.h"
51 #include "zebra/zebra_evpn_mac.h"
52 #include "zebra/kernel_netlink.h"
53 #include "zebra/rt_netlink.h"
54 #include "zebra/debug.h"
55 #include "fpm/fpm.h"
56
57 #define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
58 #define SOUTHBOUND_DEFAULT_PORT 2620
59
60 /**
61 * FPM header:
62 * {
63 * version: 1 byte (always 1),
64 * type: 1 byte (1 for netlink, 2 protobuf),
65 * len: 2 bytes (network order),
66 * }
67 *
68 * This header is used with any format to tell the users how many bytes to
69 * expect.
70 */
71 #define FPM_HEADER_SIZE 4
72
73 static const char *prov_name = "dplane_fpm_nl";
74
/*
 * Per-plugin (global, see gfnc) context for the netlink FPM connection.
 *
 * NOTE(review): fields are touched both from the dedicated FPM pthread and
 * from zebra's main/dplane threads; obuf_mutex and ctxqueue_mutex protect
 * only the buffer/queue they name, while the statistics use C11 atomics —
 * confirm against the event handlers before relying on anything else being
 * synchronized.
 */
struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;			/* TCP socket to the FPM server; -1 when closed. */
	bool disabled;			/* user issued `no fpm address`: stay disconnected. */
	bool connecting;		/* non-blocking connect() still in progress. */
	bool use_nhg;			/* encode netlink next hop group messages. */
	struct sockaddr_storage addr;	/* FPM server address (IPv4 or IPv6). */

	/* data plane buffers. */
	struct stream *ibuf;		/* input: messages received from the server. */
	struct stream *obuf;		/* output: encoded netlink awaiting write(). */
	pthread_mutex_t obuf_mutex;	/* guards obuf (writers vs. fpm_write). */

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_list_head ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events. */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;	/* dedicated FPM I/O pthread. */
	struct thread *t_connect;	/* (re)connection timer. */
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;		/* generic fpm_process_event dispatch. */
	struct thread *t_nhg;		/* next hop group toggle event. */
	struct thread *t_dequeue;	/* ctxqueue drain event. */

	/* zebra events (scheduled on zrouter.master, not the FPM pthread). */
	struct thread *t_lspreset;
	struct thread *t_lspwalk;
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters. */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;
148
/*
 * Events dispatched to the FPM pthread via fpm_process_event() (or, for the
 * *_FINISHED ones, posted by the zebra-side walkers when a full-table replay
 * pass completes).
 */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
170
/*
 * Schedule an internal reconnect on the FPM pthread.  Callers are expected
 * to `return` right after using this: the connection state is about to be
 * torn down, so continuing to use the streams/socket is unsafe.
 */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/*
 * Notify the FPM pthread that a zebra-side table walk finished (ev is one of
 * the FNE_*_FINISHED values).  No thread pointer is kept, so this event
 * cannot be cancelled once posted.
 */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 (ev), NULL)
178
179 /*
180 * Prototypes.
181 */
182 static void fpm_process_event(struct thread *t);
183 static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
184 static void fpm_lsp_send(struct thread *t);
185 static void fpm_lsp_reset(struct thread *t);
186 static void fpm_nhg_send(struct thread *t);
187 static void fpm_nhg_reset(struct thread *t);
188 static void fpm_rib_send(struct thread *t);
189 static void fpm_rib_reset(struct thread *t);
190 static void fpm_rmac_send(struct thread *t);
191 static void fpm_rmac_reset(struct thread *t);
192
193 /*
194 * CLI.
195 */
196 #define FPM_STR "Forwarding Plane Manager configuration\n"
197
/*
 * `fpm address ...`: record the FPM server address (IPv4 or IPv6) plus an
 * optional TCP port into gfnc->addr, then ask the FPM pthread to reconnect.
 */
DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	uint16_t port = 0;
	uint8_t naddr[INET6_BUFSIZ];

	/* CLI grammar already constrains the value to 1-65535. */
	if (argc == 5)
		port = strtol(argv[4]->arg, NULL, 10);

	/* Handle IPv4 addresses. */
	if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
		sin = (struct sockaddr_in *)&gfnc->addr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = AF_INET;
		/* Fall back to the well-known FPM port when none given. */
		sin->sin_port =
			port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
		sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
		memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

		goto ask_reconnect;
	}

	/* Handle IPv6 addresses. */
	if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
		vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
		return CMD_WARNING;
	}

	sin6 = (struct sockaddr_in6 *)&gfnc->addr;
	memset(sin6, 0, sizeof(*sin6));
	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
	sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
	memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
	/* Apply the new address from the FPM pthread to avoid races. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RECONNECT, &gfnc->t_event);
	return CMD_SUCCESS;
}
251
/*
 * `no fpm address ...`: disable the FPM connection entirely.  Handled on the
 * FPM pthread so the teardown does not race the I/O callbacks.
 */
DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_DISABLE, &gfnc->t_event);
	return CMD_SUCCESS;
}
266
267 DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
268 "fpm use-next-hop-groups",
269 FPM_STR
270 "Use netlink next hop groups feature.\n")
271 {
272 /* Already enabled. */
273 if (gfnc->use_nhg)
274 return CMD_SUCCESS;
275
276 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
277 FNE_TOGGLE_NHG, &gfnc->t_nhg);
278
279 return CMD_SUCCESS;
280 }
281
282 DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
283 "no fpm use-next-hop-groups",
284 NO_STR
285 FPM_STR
286 "Use netlink next hop groups feature.\n")
287 {
288 /* Already disabled. */
289 if (!gfnc->use_nhg)
290 return CMD_SUCCESS;
291
292 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
293 FNE_TOGGLE_NHG, &gfnc->t_nhg);
294
295 return CMD_SUCCESS;
296 }
297
/*
 * `clear fpm counters`: zero all statistics.  Done on the FPM pthread so the
 * reset does not interleave with counter updates from the I/O callbacks.
 */
DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}
308
309 DEFUN(fpm_show_counters, fpm_show_counters_cmd,
310 "show fpm counters",
311 SHOW_STR
312 FPM_STR
313 "FPM statistic counters\n")
314 {
315 vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");
316
317 #define SHOW_COUNTER(label, counter) \
318 vty_out(vty, "%28s: %u\n", (label), (counter))
319
320 SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
321 SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
322 SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
323 SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
324 SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
325 SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
326 SHOW_COUNTER("Data plane items processed",
327 gfnc->counters.dplane_contexts);
328 SHOW_COUNTER("Data plane items enqueued",
329 gfnc->counters.ctxqueue_len);
330 SHOW_COUNTER("Data plane items queue peak",
331 gfnc->counters.ctxqueue_len_peak);
332 SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
333 SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
334 SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
335
336 #undef SHOW_COUNTER
337
338 return CMD_SUCCESS;
339 }
340
341 DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
342 "show fpm counters json",
343 SHOW_STR
344 FPM_STR
345 "FPM statistic counters\n"
346 JSON_STR)
347 {
348 struct json_object *jo;
349
350 jo = json_object_new_object();
351 json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
352 json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
353 json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
354 json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
355 json_object_int_add(jo, "connection-closes",
356 gfnc->counters.connection_closes);
357 json_object_int_add(jo, "connection-errors",
358 gfnc->counters.connection_errors);
359 json_object_int_add(jo, "data-plane-contexts",
360 gfnc->counters.dplane_contexts);
361 json_object_int_add(jo, "data-plane-contexts-queue",
362 gfnc->counters.ctxqueue_len);
363 json_object_int_add(jo, "data-plane-contexts-queue-peak",
364 gfnc->counters.ctxqueue_len_peak);
365 json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
366 json_object_int_add(jo, "user-configures",
367 gfnc->counters.user_configures);
368 json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
369 vty_json(vty, jo);
370
371 return CMD_SUCCESS;
372 }
373
374 static int fpm_write_config(struct vty *vty)
375 {
376 struct sockaddr_in *sin;
377 struct sockaddr_in6 *sin6;
378 int written = 0;
379
380 if (gfnc->disabled)
381 return written;
382
383 switch (gfnc->addr.ss_family) {
384 case AF_INET:
385 written = 1;
386 sin = (struct sockaddr_in *)&gfnc->addr;
387 vty_out(vty, "fpm address %pI4", &sin->sin_addr);
388 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
389 vty_out(vty, " port %d", ntohs(sin->sin_port));
390
391 vty_out(vty, "\n");
392 break;
393 case AF_INET6:
394 written = 1;
395 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
396 vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
397 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
398 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
399
400 vty_out(vty, "\n");
401 break;
402
403 default:
404 break;
405 }
406
407 if (!gfnc->use_nhg) {
408 vty_out(vty, "no fpm use-next-hop-groups\n");
409 written = 1;
410 }
411
412 return written;
413 }
414
/* CLI node hosting the `fpm ...` configuration commands above. */
static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	/* Called during config write to dump the `fpm ...` lines. */
	.config_write = fpm_write_config,
};
421
422 /*
423 * FPM functions.
424 */
425 static void fpm_connect(struct thread *t);
426
/*
 * Tear down the current FPM connection (if any) and, unless the plugin was
 * disabled by the user, schedule a fresh connection attempt in 3 seconds.
 *
 * NOTE(review): runs on the FPM pthread (reached via FNE_*RECONNECT events);
 * the zebra-side walker threads are cancelled asynchronously first so they
 * stop producing data for the buffers we are about to reset.
 */
static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	/* Retry the connection after a short back-off. */
	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}
463
464 static void fpm_read(struct thread *t)
465 {
466 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
467 fpm_msg_hdr_t fpm;
468 ssize_t rv;
469 char buf[65535];
470 struct nlmsghdr *hdr;
471 struct zebra_dplane_ctx *ctx;
472 size_t available_bytes;
473 size_t hdr_available_bytes;
474
475 /* Let's ignore the input at the moment. */
476 rv = stream_read_try(fnc->ibuf, fnc->socket,
477 STREAM_WRITEABLE(fnc->ibuf));
478 if (rv == 0) {
479 atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
480 memory_order_relaxed);
481
482 if (IS_ZEBRA_DEBUG_FPM)
483 zlog_debug("%s: connection closed", __func__);
484
485 FPM_RECONNECT(fnc);
486 return;
487 }
488 if (rv == -1) {
489 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
490 memory_order_relaxed);
491 zlog_warn("%s: connection failure: %s", __func__,
492 strerror(errno));
493 FPM_RECONNECT(fnc);
494 return;
495 }
496
497 /* Schedule the next read */
498 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
499 &fnc->t_read);
500
501 /* We've got an interruption. */
502 if (rv == -2)
503 return;
504
505
506 /* Account all bytes read. */
507 atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
508 memory_order_relaxed);
509
510 available_bytes = STREAM_READABLE(fnc->ibuf);
511 while (available_bytes) {
512 if (available_bytes < (ssize_t)FPM_MSG_HDR_LEN) {
513 stream_pulldown(fnc->ibuf);
514 return;
515 }
516
517 fpm.version = stream_getc(fnc->ibuf);
518 fpm.msg_type = stream_getc(fnc->ibuf);
519 fpm.msg_len = stream_getw(fnc->ibuf);
520
521 if (fpm.version != FPM_PROTO_VERSION &&
522 fpm.msg_type != FPM_MSG_TYPE_NETLINK) {
523 stream_reset(fnc->ibuf);
524 zlog_warn(
525 "%s: Received version/msg_type %u/%u, expected 1/1",
526 __func__, fpm.version, fpm.msg_type);
527
528 FPM_RECONNECT(fnc);
529 return;
530 }
531
532 /*
533 * If the passed in length doesn't even fill in the header
534 * something is wrong and reset.
535 */
536 if (fpm.msg_len < FPM_MSG_HDR_LEN) {
537 zlog_warn(
538 "%s: Received message length: %u that does not even fill the FPM header",
539 __func__, fpm.msg_len);
540 FPM_RECONNECT(fnc);
541 return;
542 }
543
544 /*
545 * If we have not received the whole payload, reset the stream
546 * back to the beginning of the header and move it to the
547 * top.
548 */
549 if (fpm.msg_len > available_bytes) {
550 stream_rewind_getp(fnc->ibuf, FPM_MSG_HDR_LEN);
551 stream_pulldown(fnc->ibuf);
552 return;
553 }
554
555 available_bytes -= FPM_MSG_HDR_LEN;
556
557 /*
558 * Place the data from the stream into a buffer
559 */
560 hdr = (struct nlmsghdr *)buf;
561 stream_get(buf, fnc->ibuf, fpm.msg_len - FPM_MSG_HDR_LEN);
562 hdr_available_bytes = fpm.msg_len - FPM_MSG_HDR_LEN;
563 available_bytes -= hdr_available_bytes;
564
565 /* Sanity check: must be at least header size. */
566 if (hdr->nlmsg_len < sizeof(*hdr)) {
567 zlog_warn(
568 "%s: [seq=%u] invalid message length %u (< %zu)",
569 __func__, hdr->nlmsg_seq, hdr->nlmsg_len,
570 sizeof(*hdr));
571 continue;
572 }
573 if (hdr->nlmsg_len > fpm.msg_len) {
574 zlog_warn(
575 "%s: Received a inner header length of %u that is greater than the fpm total length of %u",
576 __func__, hdr->nlmsg_len, fpm.msg_len);
577 FPM_RECONNECT(fnc);
578 }
579 /* Not enough bytes available. */
580 if (hdr->nlmsg_len > hdr_available_bytes) {
581 zlog_warn(
582 "%s: [seq=%u] invalid message length %u (> %zu)",
583 __func__, hdr->nlmsg_seq, hdr->nlmsg_len,
584 available_bytes);
585 continue;
586 }
587
588 if (!(hdr->nlmsg_flags & NLM_F_REQUEST)) {
589 if (IS_ZEBRA_DEBUG_FPM)
590 zlog_debug(
591 "%s: [seq=%u] not a request, skipping",
592 __func__, hdr->nlmsg_seq);
593
594 /*
595 * This request is a bust, go to the next one
596 */
597 continue;
598 }
599
600 switch (hdr->nlmsg_type) {
601 case RTM_NEWROUTE:
602 ctx = dplane_ctx_alloc();
603 dplane_ctx_set_op(ctx, DPLANE_OP_ROUTE_NOTIFY);
604 if (netlink_route_change_read_unicast_internal(
605 hdr, 0, false, ctx) != 1) {
606 dplane_ctx_fini(&ctx);
607 stream_pulldown(fnc->ibuf);
608 /*
609 * Let's continue to read other messages
610 * Even if we ignore this one.
611 */
612 }
613 break;
614 default:
615 if (IS_ZEBRA_DEBUG_FPM)
616 zlog_debug(
617 "%s: Received message type %u which is not currently handled",
618 __func__, hdr->nlmsg_type);
619 break;
620 }
621 }
622
623 stream_reset(fnc->ibuf);
624 }
625
626 static void fpm_write(struct thread *t)
627 {
628 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
629 socklen_t statuslen;
630 ssize_t bwritten;
631 int rv, status;
632 size_t btotal;
633
634 if (fnc->connecting == true) {
635 status = 0;
636 statuslen = sizeof(status);
637
638 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
639 &statuslen);
640 if (rv == -1 || status != 0) {
641 if (rv != -1)
642 zlog_warn("%s: connection failed: %s", __func__,
643 strerror(status));
644 else
645 zlog_warn("%s: SO_ERROR failed: %s", __func__,
646 strerror(status));
647
648 atomic_fetch_add_explicit(
649 &fnc->counters.connection_errors, 1,
650 memory_order_relaxed);
651
652 FPM_RECONNECT(fnc);
653 return;
654 }
655
656 fnc->connecting = false;
657
658 /*
659 * Starting with LSPs walk all FPM objects, marking them
660 * as unsent and then replaying them.
661 */
662 thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
663 &fnc->t_lspreset);
664
665 /* Permit receiving messages now. */
666 thread_add_read(fnc->fthread->master, fpm_read, fnc,
667 fnc->socket, &fnc->t_read);
668 }
669
670 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
671
672 while (true) {
673 /* Stream is empty: reset pointers and return. */
674 if (STREAM_READABLE(fnc->obuf) == 0) {
675 stream_reset(fnc->obuf);
676 break;
677 }
678
679 /* Try to write all at once. */
680 btotal = stream_get_endp(fnc->obuf) -
681 stream_get_getp(fnc->obuf);
682 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
683 if (bwritten == 0) {
684 atomic_fetch_add_explicit(
685 &fnc->counters.connection_closes, 1,
686 memory_order_relaxed);
687
688 if (IS_ZEBRA_DEBUG_FPM)
689 zlog_debug("%s: connection closed", __func__);
690 break;
691 }
692 if (bwritten == -1) {
693 /* Attempt to continue if blocked by a signal. */
694 if (errno == EINTR)
695 continue;
696 /* Receiver is probably slow, lets give it some time. */
697 if (errno == EAGAIN || errno == EWOULDBLOCK)
698 break;
699
700 atomic_fetch_add_explicit(
701 &fnc->counters.connection_errors, 1,
702 memory_order_relaxed);
703 zlog_warn("%s: connection failure: %s", __func__,
704 strerror(errno));
705
706 FPM_RECONNECT(fnc);
707 return;
708 }
709
710 /* Account all bytes sent. */
711 atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
712 memory_order_relaxed);
713
714 /* Account number of bytes free. */
715 atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
716 memory_order_relaxed);
717
718 stream_forward_getp(fnc->obuf, (size_t)bwritten);
719 }
720
721 /* Stream is not empty yet, we must schedule more writes. */
722 if (STREAM_READABLE(fnc->obuf)) {
723 stream_pulldown(fnc->obuf);
724 thread_add_write(fnc->fthread->master, fpm_write, fnc,
725 fnc->socket, &fnc->t_write);
726 return;
727 }
728 }
729
730 static void fpm_connect(struct thread *t)
731 {
732 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
733 struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
734 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
735 socklen_t slen;
736 int rv, sock;
737 char addrstr[INET6_ADDRSTRLEN];
738
739 sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
740 if (sock == -1) {
741 zlog_err("%s: fpm socket failed: %s", __func__,
742 strerror(errno));
743 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
744 &fnc->t_connect);
745 return;
746 }
747
748 set_nonblocking(sock);
749
750 if (fnc->addr.ss_family == AF_INET) {
751 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
752 slen = sizeof(*sin);
753 } else {
754 inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
755 slen = sizeof(*sin6);
756 }
757
758 if (IS_ZEBRA_DEBUG_FPM)
759 zlog_debug("%s: attempting to connect to %s:%d", __func__,
760 addrstr, ntohs(sin->sin_port));
761
762 rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
763 if (rv == -1 && errno != EINPROGRESS) {
764 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
765 memory_order_relaxed);
766 close(sock);
767 zlog_warn("%s: fpm connection failed: %s", __func__,
768 strerror(errno));
769 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
770 &fnc->t_connect);
771 return;
772 }
773
774 fnc->connecting = (errno == EINPROGRESS);
775 fnc->socket = sock;
776 if (!fnc->connecting)
777 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
778 &fnc->t_read);
779 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
780 &fnc->t_write);
781
782 /*
783 * Starting with LSPs walk all FPM objects, marking them
784 * as unsent and then replaying them.
785 *
786 * If we are not connected, then delay the objects reset/send.
787 */
788 if (!fnc->connecting)
789 thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
790 &fnc->t_lspreset);
791 }
792
793 /**
794 * Encode data plane operation context into netlink and enqueue it in the FPM
795 * output buffer.
796 *
797 * @param fnc the netlink FPM context.
798 * @param ctx the data plane operation context data.
799 * @return 0 on success or -1 on not enough space.
800 */
/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 *
 * NOTE(review): obuf_mutex is held (auto-unlocked on any return) for the
 * whole encode + enqueue so the write thread never sees a half-written
 * frame.  A return of 0 also covers "nothing to send" and encoder failures,
 * not only success; only a full output buffer yields -1.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		/* An UPDATE is sent as DELETE followed by INSTALL below. */
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf), true);
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf), true);
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
		rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_lsp_msg_encoder failed",
				 __func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	/* Un-handled by FPM at this time. */
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_RULE_ADD:
	case DPLANE_OP_RULE_DELETE:
	case DPLANE_OP_RULE_UPDATE:
	case DPLANE_OP_NEIGH_DISCOVER:
	case DPLANE_OP_BR_PORT_UPDATE:
	case DPLANE_OP_IPTABLE_ADD:
	case DPLANE_OP_IPTABLE_DELETE:
	case DPLANE_OP_IPSET_ADD:
	case DPLANE_OP_IPSET_DELETE:
	case DPLANE_OP_IPSET_ENTRY_ADD:
	case DPLANE_OP_IPSET_ENTRY_DELETE:
	case DPLANE_OP_NEIGH_IP_INSTALL:
	case DPLANE_OP_NEIGH_IP_DELETE:
	case DPLANE_OP_NEIGH_TABLE_UPDATE:
	case DPLANE_OP_GRE_SET:
	case DPLANE_OP_INTF_ADDR_ADD:
	case DPLANE_OP_INTF_ADDR_DEL:
	case DPLANE_OP_INTF_NETCONFIG:
	case DPLANE_OP_INTF_INSTALL:
	case DPLANE_OP_INTF_UPDATE:
	case DPLANE_OP_INTF_DELETE:
	case DPLANE_OP_TC_QDISC_INSTALL:
	case DPLANE_OP_TC_QDISC_UNINSTALL:
	case DPLANE_OP_TC_CLASS_ADD:
	case DPLANE_OP_TC_CLASS_DELETE:
	case DPLANE_OP_TC_CLASS_UPDATE:
	case DPLANE_OP_TC_FILTER_ADD:
	case DPLANE_OP_TC_FILTER_DELETE:
	case DPLANE_OP_TC_FILTER_UPDATE:
	case DPLANE_OP_NONE:
		break;

	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}
1004
1005 /*
1006 * LSP walk/send functions
1007 */
/* Walk state shared between fpm_lsp_send() and its hash callback. */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx;	/* reusable encode scratch context. */
	struct fpm_nl_ctx *fnc;
	bool complete;			/* false when the walk aborted on a full buffer. */
};
1013
1014 static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
1015 {
1016 struct zebra_lsp *lsp = bucket->data;
1017 struct fpm_lsp_arg *fla = arg;
1018
1019 /* Skip entries which have already been sent */
1020 if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
1021 return HASHWALK_CONTINUE;
1022
1023 dplane_ctx_reset(fla->ctx);
1024 dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);
1025
1026 if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
1027 fla->complete = false;
1028 return HASHWALK_ABORT;
1029 }
1030
1031 /* Mark entry as sent */
1032 SET_FLAG(lsp->flags, LSP_FLAG_FPM);
1033 return HASHWALK_CONTINUE;
1034 }
1035
/*
 * Replay all LSPs of the default VRF to the FPM server.  When the walk
 * completes, report FNE_LSP_FINISHED and chain into the next hop group
 * replay; otherwise reschedule itself to continue where the buffer filled
 * up (sent entries carry LSP_FLAG_FPM and are skipped on the next pass).
 *
 * Runs on zrouter.master (scheduled via t_lspreset/t_lspwalk).
 */
static void fpm_lsp_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
	struct fpm_lsp_arg fla;

	fla.fnc = fnc;
	/* One scratch context is reused for every LSP in the walk. */
	fla.ctx = dplane_ctx_alloc();
	fla.complete = true;

	hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);

	dplane_ctx_fini(&fla.ctx);

	if (fla.complete) {
		WALK_FINISH(fnc, FNE_LSP_FINISHED);

		/* Now move onto routes */
		thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
				 &fnc->t_nhgreset);
	} else {
		/* Didn't finish - reschedule LSP walk */
		thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
				 &fnc->t_lspwalk);
	}
}
1062
1063 /*
1064 * Next hop walk/send functions.
1065 */
/* Walk state shared between fpm_nhg_send() and its hash callback. */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;	/* reusable encode scratch context. */
	struct fpm_nl_ctx *fnc;
	bool complete;			/* false when the walk aborted on a full buffer. */
};
1071
1072 static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
1073 {
1074 struct nhg_hash_entry *nhe = bucket->data;
1075 struct fpm_nhg_arg *fna = arg;
1076
1077 /* This entry was already sent, skip it. */
1078 if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
1079 return HASHWALK_CONTINUE;
1080
1081 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
1082 dplane_ctx_reset(fna->ctx);
1083 dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
1084 if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
1085 /* Our buffers are full, lets give it some cycles. */
1086 fna->complete = false;
1087 return HASHWALK_ABORT;
1088 }
1089
1090 /* Mark group as sent, so it doesn't get sent again. */
1091 SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1092
1093 return HASHWALK_CONTINUE;
1094 }
1095
/*
 * Replay all next hop groups to the FPM server (only when the use-nhg
 * feature is enabled; otherwise the walk is skipped and treated as
 * complete).  On completion report FNE_NHG_FINISHED and chain into the RIB
 * replay; otherwise reschedule itself to continue after the buffer drains.
 *
 * Runs on zrouter.master (scheduled via t_nhgreset/t_nhgwalk).
 */
static void fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	/* One scratch context is reused for every group in the walk. */
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	if (fnc->use_nhg)
		hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, lets install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
				 &fnc->t_ribreset);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);
}
1121
/**
 * Send all RIB installed routes to the connected data plane.
 *
 * Walks every routing table and enqueues the selected FIB entry of each
 * destination.  If the FPM output buffer fills up, the walk stops and is
 * rescheduled one second later; routes already sent are skipped on the
 * next pass via the RIB_DEST_UPDATE_FPM flag.  When every route is out,
 * the RMAC reset stage is scheduled.
 */
static void fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				/*
				 * Output buffer is full: resume this walk in
				 * one second.  Unsent routes are still
				 * unflagged, so they get picked up again.
				 */
				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	/* Schedule next event: RMAC reset. */
	thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);
}
1177
/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
	/* Scratch dplane context reused for each enqueued RMAC entry. */
	struct zebra_dplane_ctx *ctx;
	/* FPM context used for enqueueing and rescheduling. */
	struct fpm_nl_ctx *fnc;
	/* L3 VNI currently being walked (set by the L3 VNI iteration). */
	struct zebra_l3vni *zl3vni;
	/* false once the walk had to be rescheduled due to full buffers. */
	bool complete;
};
1187
/*
 * Hash iteration callback: snapshot one remote MAC entry of the current
 * L3 VNI and enqueue it to the FPM as a MAC install operation.  Expects
 * `arg` to be a struct fpm_rmac_arg with `zl3vni` already set.
 */
static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	struct zebra_mac *zrmac = bucket->data;
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	struct zebra_vxlan_vni *vni;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/* Entry already sent, or an earlier entry aborted the walk. */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
		return;

	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	/*
	 * NOTE(review): `vni` is dereferenced below without a NULL check —
	 * assumes the lookup always succeeds for a valid L3 VNI; confirm.
	 */
	vni = zebra_vxlan_if_vni_find(zif, fra->zl3vni->vni);
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vni->access_vlan : 0;

	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid, &zrmac->macaddr, vni->vni,
			zrmac->fwd_info.r_vtep_ip, sticky, 0 /*nhg*/,
			0 /*update_flags*/);
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		/* Buffers full: reschedule the RMAC walk in one second. */
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
		fra->complete = false;
	}
}
1220
1221 static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
1222 {
1223 struct fpm_rmac_arg *fra = arg;
1224 struct zebra_l3vni *zl3vni = bucket->data;
1225
1226 fra->zl3vni = zl3vni;
1227 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1228 }
1229
1230 static void fpm_rmac_send(struct thread *t)
1231 {
1232 struct fpm_rmac_arg fra;
1233
1234 fra.fnc = THREAD_ARG(t);
1235 fra.ctx = dplane_ctx_alloc();
1236 fra.complete = true;
1237 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1238 dplane_ctx_fini(&fra.ctx);
1239
1240 /* RMAC walk completed. */
1241 if (fra.complete)
1242 WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
1243 }
1244
1245 /*
1246 * Resets the next hop FPM flags so we send all next hops again.
1247 */
1248 static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1249 {
1250 struct nhg_hash_entry *nhe = bucket->data;
1251
1252 /* Unset FPM installation flag so it gets installed again. */
1253 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1254 }
1255
1256 static void fpm_nhg_reset(struct thread *t)
1257 {
1258 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1259
1260 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
1261
1262 /* Schedule next step: send next hop groups. */
1263 thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
1264 }
1265
1266 /*
1267 * Resets the LSP FPM flag so we send all LSPs again.
1268 */
1269 static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
1270 {
1271 struct zebra_lsp *lsp = bucket->data;
1272
1273 UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
1274 }
1275
1276 static void fpm_lsp_reset(struct thread *t)
1277 {
1278 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1279 struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
1280
1281 hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);
1282
1283 /* Schedule next step: send LSPs */
1284 thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
1285 }
1286
1287 /**
1288 * Resets the RIB FPM flags so we send all routes again.
1289 */
1290 static void fpm_rib_reset(struct thread *t)
1291 {
1292 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1293 rib_dest_t *dest;
1294 struct route_node *rn;
1295 struct route_table *rt;
1296 rib_tables_iter_t rt_iter;
1297
1298 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1299 while ((rt = rib_tables_iter_next(&rt_iter))) {
1300 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1301 dest = rib_dest_from_rnode(rn);
1302 /* Skip bad route entries. */
1303 if (dest == NULL)
1304 continue;
1305
1306 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1307 }
1308 }
1309
1310 /* Schedule next step: send RIB routes. */
1311 thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
1312 }
1313
1314 /*
1315 * The next three function will handle RMAC table reset.
1316 */
1317 static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
1318 {
1319 struct zebra_mac *zrmac = bucket->data;
1320
1321 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1322 }
1323
1324 static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
1325 {
1326 struct zebra_l3vni *zl3vni = bucket->data;
1327
1328 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1329 }
1330
1331 static void fpm_rmac_reset(struct thread *t)
1332 {
1333 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1334
1335 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1336
1337 /* Schedule next event: send RMAC entries. */
1338 thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
1339 &fnc->t_rmacwalk);
1340 }
1341
/*
 * FPM pthread side: drain the shared context queue into the FPM output
 * buffer.  Runs on fnc->fthread, so the queue is accessed under
 * ctxqueue_mutex and counters are updated with relaxed atomics.
 */
static void fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;
	bool no_bufs = false;
	uint64_t processed_contexts = 0;

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
			no_bufs = true;
			break;
		}

		/* Dequeue next item or quit processing. */
		frr_with_mutex (&fnc->ctxqueue_mutex) {
			ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		}
		if (ctx == NULL)
			break;

		/*
		 * Intentionally ignoring the return value
		 * as that we are ensuring that we can write to
		 * the output data in the STREAM_WRITEABLE
		 * check above, so we can ignore the return
		 */
		if (fnc->socket != -1)
			(void)fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		processed_contexts++;
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Hand the context back to the dataplane as completed. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Update count of processed contexts */
	atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
				  processed_contexts, memory_order_relaxed);

	/* Re-schedule if we ran out of buffer space */
	if (no_bufs)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/*
	 * Let the dataplane thread know if there are items in the
	 * output queue to be processed. Otherwise they may sit
	 * until the dataplane thread gets scheduled for new,
	 * unrelated work.
	 */
	if (dplane_provider_out_ctx_queue_len(fnc->prov) > 0)
		dplane_provider_work_ready();
}
1399
/**
 * Handles external (e.g. CLI, data plane or others) events.
 *
 * The event value is carried in the thread argument (THREAD_VAL) and
 * selects one of the fpm_nl_events actions below.
 */
static void fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	enum fpm_nl_events event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		/*
		 * NOTE(review): plain memset over a struct holding atomic
		 * counters — assumes concurrent updates during a manual
		 * reset are acceptable to lose; confirm.
		 */
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		/* Reconnect so the peer receives a consistent route set. */
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	/* The remaining events only log walk completion when debugging. */
	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);
		break;
	case FNE_LSP_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: LSP walk finished", __func__);
		break;
	}
}
1461
1462 /*
1463 * Data plane functions.
1464 */
1465 static int fpm_nl_start(struct zebra_dplane_provider *prov)
1466 {
1467 struct fpm_nl_ctx *fnc;
1468
1469 fnc = dplane_provider_get_data(prov);
1470 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1471 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1472 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1473 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1474 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1475 fnc->socket = -1;
1476 fnc->disabled = true;
1477 fnc->prov = prov;
1478 dplane_ctx_q_init(&fnc->ctxqueue);
1479 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
1480
1481 /* Set default values. */
1482 fnc->use_nhg = true;
1483
1484 return 0;
1485 }
1486
/*
 * Early finish: stop all scheduled work and close the FPM socket.
 * Events owned by the zebra main thread are cancelled synchronously
 * (THREAD_OFF); events owned by the FPM pthread must be cancelled
 * asynchronously since this runs on a different thread.
 */
static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_lspreset);
	THREAD_OFF(fnc->t_lspwalk);
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	THREAD_OFF(fnc->t_event);
	THREAD_OFF(fnc->t_nhg);
	/* These three run on the FPM pthread's own event loop. */
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}
1511
1512 static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
1513 {
1514 /* Stop the running thread. */
1515 frr_pthread_stop(fnc->fthread, NULL);
1516
1517 /* Free all allocated resources. */
1518 pthread_mutex_destroy(&fnc->obuf_mutex);
1519 pthread_mutex_destroy(&fnc->ctxqueue_mutex);
1520 stream_free(fnc->ibuf);
1521 stream_free(fnc->obuf);
1522 free(gfnc);
1523 gfnc = NULL;
1524
1525 return 0;
1526 }
1527
1528 static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1529 {
1530 struct fpm_nl_ctx *fnc;
1531
1532 fnc = dplane_provider_get_data(prov);
1533 if (early)
1534 return fpm_nl_finish_early(fnc);
1535
1536 return fpm_nl_finish_late(fnc);
1537 }
1538
/*
 * Provider work callback (runs on the dataplane thread): move incoming
 * contexts onto the FPM context queue when connected, or complete them
 * immediately when not.  Tracks queue length with relaxed atomics and
 * records the observed peak.
 */
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			/*
			 * Update the number of queued contexts *before*
			 * enqueueing, to ensure counter consistency.
			 */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);

			frr_with_mutex (&fnc->ctxqueue_mutex) {
				dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
			}

			/* Track the largest queue length seen this pass. */
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			if (peak_queue < cur_queue)
				peak_queue = cur_queue;
			continue;
		}

		/* Not connected: complete the context right away. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Update peak queue length, if we just observed a new peak */
	stored_peak_queue = atomic_load_explicit(
		&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
	if (stored_peak_queue < peak_queue)
		atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
				      peak_queue, memory_order_relaxed);

	/* Kick the FPM pthread to drain whatever is still queued. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/* Ensure dataplane thread is rescheduled if we hit the work limit */
	if (counter >= limit)
		dplane_provider_work_ready();

	return 0;
}
1600
1601 static int fpm_nl_new(struct thread_master *tm)
1602 {
1603 struct zebra_dplane_provider *prov = NULL;
1604 int rv;
1605
1606 gfnc = calloc(1, sizeof(*gfnc));
1607 rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
1608 DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
1609 fpm_nl_process, fpm_nl_finish, gfnc,
1610 &prov);
1611
1612 if (IS_ZEBRA_DEBUG_DPLANE)
1613 zlog_debug("%s register status: %d", prov_name, rv);
1614
1615 install_node(&fpm_node);
1616 install_element(ENABLE_NODE, &fpm_show_counters_cmd);
1617 install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
1618 install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
1619 install_element(CONFIG_NODE, &fpm_set_address_cmd);
1620 install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
1621 install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
1622 install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);
1623
1624 return 0;
1625 }
1626
1627 static int fpm_nl_init(void)
1628 {
1629 hook_register(frr_late_init, fpm_nl_new);
1630 return 0;
1631 }
1632
/* Module descriptor: name, version and the init hook run at load time. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
);