/*
 * zebra/dplane_fpm_nl.c — mirrored from git.proxmox.com (mirror_frr.git),
 * commit 0a9fecc9dff1b306ac2bc1688768365459db2bee.
 */
1 /*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h" /* Include this explicitly */
24 #endif
25
26 #include <arpa/inet.h>
27
28 #include <sys/types.h>
29 #include <sys/socket.h>
30
31 #include <errno.h>
32 #include <string.h>
33
34 #include "lib/zebra.h"
35 #include "lib/json.h"
36 #include "lib/libfrr.h"
37 #include "lib/frratomic.h"
38 #include "lib/command.h"
39 #include "lib/memory.h"
40 #include "lib/network.h"
41 #include "lib/ns.h"
42 #include "lib/frr_pthread.h"
43 #include "zebra/debug.h"
44 #include "zebra/interface.h"
45 #include "zebra/zebra_dplane.h"
46 #include "zebra/zebra_mpls.h"
47 #include "zebra/zebra_router.h"
48 #include "zebra/zebra_evpn.h"
49 #include "zebra/zebra_evpn_mac.h"
50 #include "zebra/zebra_vxlan_private.h"
51 #include "zebra/kernel_netlink.h"
52 #include "zebra/rt_netlink.h"
53 #include "zebra/debug.h"
54 #include "fpm/fpm.h"
55
56 #define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
57 #define SOUTHBOUND_DEFAULT_PORT 2620
58
59 /**
60 * FPM header:
61 * {
62 * version: 1 byte (always 1),
63 * type: 1 byte (1 for netlink, 2 protobuf),
64 * len: 2 bytes (network order),
65 * }
66 *
67 * This header is used with any format to tell the users how many bytes to
68 * expect.
69 */
70 #define FPM_HEADER_SIZE 4
71
/* Provider name used when registering this plugin with the zebra dataplane. */
static const char *prov_name = "dplane_fpm_nl";
73
/*
 * Per-plugin FPM context: connection state, buffered I/O streams, pending
 * event/thread handles and statistic counters. A single global instance
 * (`gfnc`) is shared between zebra's main pthread and the dedicated FPM
 * pthread; `obuf`/`ctxqueue` are protected by their mutexes and all
 * counters are C11 atomics.
 */
struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;		/* Connected TCP socket, or -1 when closed. */
	bool disabled;		/* True after FNE_DISABLE: don't reconnect. */
	bool connecting;	/* Non-blocking connect() still in progress. */
	bool use_nhg;		/* Encode/forward next hop group operations. */
	struct sockaddr_storage addr;	/* Remote FPM server (v4 or v6). */

	/* data plane buffers. */
	struct stream *ibuf;
	struct stream *obuf;
	pthread_mutex_t obuf_mutex;

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_list_head ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events. */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_nhg;
	struct thread *t_dequeue;

	/* zebra events. */
	struct thread *t_lspreset;
	struct thread *t_lspwalk;
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters. */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;
147
/* Events dispatched to fpm_process_event() on the FPM pthread. */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
169
/*
 * Schedule an internal reconnect on the FPM pthread. Used from I/O paths
 * instead of tearing down state inline, to avoid racing with in-flight
 * events (see FNE_INTERNAL_RECONNECT).
 */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/* Tell the FPM pthread that a zebra-side table walk has completed. */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 (ev), NULL)
177
/*
 * Prototypes.
 */
static void fpm_process_event(struct thread *t);
/* Encode one dataplane context into obuf; returns -1 when obuf is full. */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static void fpm_lsp_send(struct thread *t);
static void fpm_lsp_reset(struct thread *t);
static void fpm_nhg_send(struct thread *t);
static void fpm_nhg_reset(struct thread *t);
static void fpm_rib_send(struct thread *t);
static void fpm_rib_reset(struct thread *t);
static void fpm_rmac_send(struct thread *t);
static void fpm_rmac_reset(struct thread *t);
191
192 /*
193 * CLI.
194 */
195 #define FPM_STR "Forwarding Plane Manager configuration\n"
196
197 DEFUN(fpm_set_address, fpm_set_address_cmd,
198 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
199 FPM_STR
200 "FPM remote listening server address\n"
201 "Remote IPv4 FPM server\n"
202 "Remote IPv6 FPM server\n"
203 "FPM remote listening server port\n"
204 "Remote FPM server port\n")
205 {
206 struct sockaddr_in *sin;
207 struct sockaddr_in6 *sin6;
208 uint16_t port = 0;
209 uint8_t naddr[INET6_BUFSIZ];
210
211 if (argc == 5)
212 port = strtol(argv[4]->arg, NULL, 10);
213
214 /* Handle IPv4 addresses. */
215 if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
216 sin = (struct sockaddr_in *)&gfnc->addr;
217
218 memset(sin, 0, sizeof(*sin));
219 sin->sin_family = AF_INET;
220 sin->sin_port =
221 port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
222 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
223 sin->sin_len = sizeof(*sin);
224 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
225 memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));
226
227 goto ask_reconnect;
228 }
229
230 /* Handle IPv6 addresses. */
231 if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
232 vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
233 return CMD_WARNING;
234 }
235
236 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
237 memset(sin6, 0, sizeof(*sin6));
238 sin6->sin6_family = AF_INET6;
239 sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
240 #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
241 sin6->sin6_len = sizeof(*sin6);
242 #endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
243 memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));
244
245 ask_reconnect:
246 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
247 FNE_RECONNECT, &gfnc->t_event);
248 return CMD_SUCCESS;
249 }
250
DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	/* Disable asynchronously on the FPM pthread (closes the connection). */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_DISABLE, &gfnc->t_event);
	return CMD_SUCCESS;
}
265
266 DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
267 "fpm use-next-hop-groups",
268 FPM_STR
269 "Use netlink next hop groups feature.\n")
270 {
271 /* Already enabled. */
272 if (gfnc->use_nhg)
273 return CMD_SUCCESS;
274
275 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
276 FNE_TOGGLE_NHG, &gfnc->t_nhg);
277
278 return CMD_SUCCESS;
279 }
280
281 DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
282 "no fpm use-next-hop-groups",
283 NO_STR
284 FPM_STR
285 "Use netlink next hop groups feature.\n")
286 {
287 /* Already disabled. */
288 if (!gfnc->use_nhg)
289 return CMD_SUCCESS;
290
291 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
292 FNE_TOGGLE_NHG, &gfnc->t_nhg);
293
294 return CMD_SUCCESS;
295 }
296
DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	/* Counters are owned by the FPM pthread; reset them over there. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}
307
308 DEFUN(fpm_show_counters, fpm_show_counters_cmd,
309 "show fpm counters",
310 SHOW_STR
311 FPM_STR
312 "FPM statistic counters\n")
313 {
314 vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");
315
316 #define SHOW_COUNTER(label, counter) \
317 vty_out(vty, "%28s: %u\n", (label), (counter))
318
319 SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
320 SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
321 SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
322 SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
323 SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
324 SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
325 SHOW_COUNTER("Data plane items processed",
326 gfnc->counters.dplane_contexts);
327 SHOW_COUNTER("Data plane items enqueued",
328 gfnc->counters.ctxqueue_len);
329 SHOW_COUNTER("Data plane items queue peak",
330 gfnc->counters.ctxqueue_len_peak);
331 SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
332 SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
333 SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
334
335 #undef SHOW_COUNTER
336
337 return CMD_SUCCESS;
338 }
339
DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	/* One key per counter; vty_json() emits and frees the object. */
	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_json(vty, jo);

	return CMD_SUCCESS;
}
372
373 static int fpm_write_config(struct vty *vty)
374 {
375 struct sockaddr_in *sin;
376 struct sockaddr_in6 *sin6;
377 int written = 0;
378
379 if (gfnc->disabled)
380 return written;
381
382 switch (gfnc->addr.ss_family) {
383 case AF_INET:
384 written = 1;
385 sin = (struct sockaddr_in *)&gfnc->addr;
386 vty_out(vty, "fpm address %pI4", &sin->sin_addr);
387 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
388 vty_out(vty, " port %d", ntohs(sin->sin_port));
389
390 vty_out(vty, "\n");
391 break;
392 case AF_INET6:
393 written = 1;
394 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
395 vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
396 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
397 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
398
399 vty_out(vty, "\n");
400 break;
401
402 default:
403 break;
404 }
405
406 if (!gfnc->use_nhg) {
407 vty_out(vty, "no fpm use-next-hop-groups\n");
408 written = 1;
409 }
410
411 return written;
412 }
413
/* CLI node whose only purpose is emitting `fpm` running-config lines. */
static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	.config_write = fpm_write_config,
};
420
421 /*
422 * FPM functions.
423 */
424 static void fpm_connect(struct thread *t);
425
/*
 * Tear down the current connection state and, unless FPM was disabled,
 * schedule a reconnect attempt in 3 seconds.
 *
 * Runs on the FPM pthread; walk/reset threads living on zrouter.master
 * are cancelled asynchronously first so they stop feeding the buffers.
 */
static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}
462
463 static void fpm_read(struct thread *t)
464 {
465 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
466 fpm_msg_hdr_t fpm;
467 ssize_t rv;
468 char buf[65535];
469 struct nlmsghdr *hdr;
470 struct zebra_dplane_ctx *ctx;
471 size_t available_bytes;
472 size_t hdr_available_bytes;
473
474 /* Let's ignore the input at the moment. */
475 rv = stream_read_try(fnc->ibuf, fnc->socket,
476 STREAM_WRITEABLE(fnc->ibuf));
477 if (rv == 0) {
478 atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
479 memory_order_relaxed);
480
481 if (IS_ZEBRA_DEBUG_FPM)
482 zlog_debug("%s: connection closed", __func__);
483
484 FPM_RECONNECT(fnc);
485 return;
486 }
487 if (rv == -1) {
488 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
489 memory_order_relaxed);
490 zlog_warn("%s: connection failure: %s", __func__,
491 strerror(errno));
492 FPM_RECONNECT(fnc);
493 return;
494 }
495
496 /* Schedule the next read */
497 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
498 &fnc->t_read);
499
500 /* We've got an interruption. */
501 if (rv == -2)
502 return;
503
504
505 /* Account all bytes read. */
506 atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
507 memory_order_relaxed);
508
509 available_bytes = STREAM_READABLE(fnc->ibuf);
510 while (available_bytes) {
511 if (available_bytes < (ssize_t)FPM_MSG_HDR_LEN) {
512 stream_pulldown(fnc->ibuf);
513 return;
514 }
515
516 fpm.version = stream_getc(fnc->ibuf);
517 fpm.msg_type = stream_getc(fnc->ibuf);
518 fpm.msg_len = stream_getw(fnc->ibuf);
519
520 if (fpm.version != FPM_PROTO_VERSION &&
521 fpm.msg_type != FPM_MSG_TYPE_NETLINK) {
522 stream_reset(fnc->ibuf);
523 zlog_warn(
524 "%s: Received version/msg_type %u/%u, expected 1/1",
525 __func__, fpm.version, fpm.msg_type);
526
527 FPM_RECONNECT(fnc);
528 return;
529 }
530
531 /*
532 * If the passed in length doesn't even fill in the header
533 * something is wrong and reset.
534 */
535 if (fpm.msg_len < FPM_MSG_HDR_LEN) {
536 zlog_warn(
537 "%s: Received message length: %u that does not even fill the FPM header",
538 __func__, fpm.msg_len);
539 FPM_RECONNECT(fnc);
540 return;
541 }
542
543 /*
544 * If we have not received the whole payload, reset the stream
545 * back to the beginning of the header and move it to the
546 * top.
547 */
548 if (fpm.msg_len > available_bytes) {
549 stream_rewind_getp(fnc->ibuf, FPM_MSG_HDR_LEN);
550 stream_pulldown(fnc->ibuf);
551 return;
552 }
553
554 available_bytes -= FPM_MSG_HDR_LEN;
555
556 /*
557 * Place the data from the stream into a buffer
558 */
559 hdr = (struct nlmsghdr *)buf;
560 stream_get(buf, fnc->ibuf, fpm.msg_len - FPM_MSG_HDR_LEN);
561 hdr_available_bytes = fpm.msg_len - FPM_MSG_HDR_LEN;
562 available_bytes -= hdr_available_bytes;
563
564 /* Sanity check: must be at least header size. */
565 if (hdr->nlmsg_len < sizeof(*hdr)) {
566 zlog_warn(
567 "%s: [seq=%u] invalid message length %u (< %zu)",
568 __func__, hdr->nlmsg_seq, hdr->nlmsg_len,
569 sizeof(*hdr));
570 continue;
571 }
572 if (hdr->nlmsg_len > fpm.msg_len) {
573 zlog_warn(
574 "%s: Received a inner header length of %u that is greater than the fpm total length of %u",
575 __func__, hdr->nlmsg_len, fpm.msg_len);
576 FPM_RECONNECT(fnc);
577 }
578 /* Not enough bytes available. */
579 if (hdr->nlmsg_len > hdr_available_bytes) {
580 zlog_warn(
581 "%s: [seq=%u] invalid message length %u (> %zu)",
582 __func__, hdr->nlmsg_seq, hdr->nlmsg_len,
583 available_bytes);
584 continue;
585 }
586
587 if (!(hdr->nlmsg_flags & NLM_F_REQUEST)) {
588 if (IS_ZEBRA_DEBUG_FPM)
589 zlog_debug(
590 "%s: [seq=%u] not a request, skipping",
591 __func__, hdr->nlmsg_seq);
592
593 /*
594 * This request is a bust, go to the next one
595 */
596 continue;
597 }
598
599 switch (hdr->nlmsg_type) {
600 case RTM_NEWROUTE:
601 ctx = dplane_ctx_alloc();
602 dplane_ctx_set_op(ctx, DPLANE_OP_ROUTE_NOTIFY);
603 if (netlink_route_change_read_unicast_internal(
604 hdr, 0, false, ctx) != 1) {
605 dplane_ctx_fini(&ctx);
606 stream_pulldown(fnc->ibuf);
607 /*
608 * Let's continue to read other messages
609 * Even if we ignore this one.
610 */
611 }
612 break;
613 default:
614 if (IS_ZEBRA_DEBUG_FPM)
615 zlog_debug(
616 "%s: Received message type %u which is not currently handled",
617 __func__, hdr->nlmsg_type);
618 break;
619 }
620 }
621
622 stream_reset(fnc->ibuf);
623 }
624
625 static void fpm_write(struct thread *t)
626 {
627 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
628 socklen_t statuslen;
629 ssize_t bwritten;
630 int rv, status;
631 size_t btotal;
632
633 if (fnc->connecting == true) {
634 status = 0;
635 statuslen = sizeof(status);
636
637 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
638 &statuslen);
639 if (rv == -1 || status != 0) {
640 if (rv != -1)
641 zlog_warn("%s: connection failed: %s", __func__,
642 strerror(status));
643 else
644 zlog_warn("%s: SO_ERROR failed: %s", __func__,
645 strerror(status));
646
647 atomic_fetch_add_explicit(
648 &fnc->counters.connection_errors, 1,
649 memory_order_relaxed);
650
651 FPM_RECONNECT(fnc);
652 return;
653 }
654
655 fnc->connecting = false;
656
657 /*
658 * Starting with LSPs walk all FPM objects, marking them
659 * as unsent and then replaying them.
660 */
661 thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
662 &fnc->t_lspreset);
663
664 /* Permit receiving messages now. */
665 thread_add_read(fnc->fthread->master, fpm_read, fnc,
666 fnc->socket, &fnc->t_read);
667 }
668
669 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
670
671 while (true) {
672 /* Stream is empty: reset pointers and return. */
673 if (STREAM_READABLE(fnc->obuf) == 0) {
674 stream_reset(fnc->obuf);
675 break;
676 }
677
678 /* Try to write all at once. */
679 btotal = stream_get_endp(fnc->obuf) -
680 stream_get_getp(fnc->obuf);
681 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
682 if (bwritten == 0) {
683 atomic_fetch_add_explicit(
684 &fnc->counters.connection_closes, 1,
685 memory_order_relaxed);
686
687 if (IS_ZEBRA_DEBUG_FPM)
688 zlog_debug("%s: connection closed", __func__);
689 break;
690 }
691 if (bwritten == -1) {
692 /* Attempt to continue if blocked by a signal. */
693 if (errno == EINTR)
694 continue;
695 /* Receiver is probably slow, lets give it some time. */
696 if (errno == EAGAIN || errno == EWOULDBLOCK)
697 break;
698
699 atomic_fetch_add_explicit(
700 &fnc->counters.connection_errors, 1,
701 memory_order_relaxed);
702 zlog_warn("%s: connection failure: %s", __func__,
703 strerror(errno));
704
705 FPM_RECONNECT(fnc);
706 return;
707 }
708
709 /* Account all bytes sent. */
710 atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
711 memory_order_relaxed);
712
713 /* Account number of bytes free. */
714 atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
715 memory_order_relaxed);
716
717 stream_forward_getp(fnc->obuf, (size_t)bwritten);
718 }
719
720 /* Stream is not empty yet, we must schedule more writes. */
721 if (STREAM_READABLE(fnc->obuf)) {
722 stream_pulldown(fnc->obuf);
723 thread_add_write(fnc->fthread->master, fpm_write, fnc,
724 fnc->socket, &fnc->t_write);
725 return;
726 }
727 }
728
729 static void fpm_connect(struct thread *t)
730 {
731 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
732 struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
733 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
734 socklen_t slen;
735 int rv, sock;
736 char addrstr[INET6_ADDRSTRLEN];
737
738 sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
739 if (sock == -1) {
740 zlog_err("%s: fpm socket failed: %s", __func__,
741 strerror(errno));
742 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
743 &fnc->t_connect);
744 return;
745 }
746
747 set_nonblocking(sock);
748
749 if (fnc->addr.ss_family == AF_INET) {
750 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
751 slen = sizeof(*sin);
752 } else {
753 inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
754 slen = sizeof(*sin6);
755 }
756
757 if (IS_ZEBRA_DEBUG_FPM)
758 zlog_debug("%s: attempting to connect to %s:%d", __func__,
759 addrstr, ntohs(sin->sin_port));
760
761 rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
762 if (rv == -1 && errno != EINPROGRESS) {
763 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
764 memory_order_relaxed);
765 close(sock);
766 zlog_warn("%s: fpm connection failed: %s", __func__,
767 strerror(errno));
768 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
769 &fnc->t_connect);
770 return;
771 }
772
773 fnc->connecting = (errno == EINPROGRESS);
774 fnc->socket = sock;
775 if (!fnc->connecting)
776 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
777 &fnc->t_read);
778 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
779 &fnc->t_write);
780
781 /*
782 * Starting with LSPs walk all FPM objects, marking them
783 * as unsent and then replaying them.
784 *
785 * If we are not connected, then delay the objects reset/send.
786 */
787 if (!fnc->connecting)
788 thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
789 &fnc->t_lspreset);
790 }
791
792 /**
793 * Encode data plane operation context into netlink and enqueue it in the FPM
794 * output buffer.
795 *
796 * @param fnc the netlink FPM context.
797 * @param ctx the data plane operation context data.
798 * @return 0 on success or -1 on not enough space.
799 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	/*
	 * Serialize access to obuf for the whole encode+enqueue; released
	 * automatically on every return path below.
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Encode the operation as netlink into nl_buf. */
	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		/* Updates are sent as a DELROUTE followed by a NEWROUTE. */
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf), true);
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf), true);
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
		rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_lsp_msg_encoder failed",
				 __func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	/* Un-handled by FPM at this time. */
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_RULE_ADD:
	case DPLANE_OP_RULE_DELETE:
	case DPLANE_OP_RULE_UPDATE:
	case DPLANE_OP_NEIGH_DISCOVER:
	case DPLANE_OP_BR_PORT_UPDATE:
	case DPLANE_OP_IPTABLE_ADD:
	case DPLANE_OP_IPTABLE_DELETE:
	case DPLANE_OP_IPSET_ADD:
	case DPLANE_OP_IPSET_DELETE:
	case DPLANE_OP_IPSET_ENTRY_ADD:
	case DPLANE_OP_IPSET_ENTRY_DELETE:
	case DPLANE_OP_NEIGH_IP_INSTALL:
	case DPLANE_OP_NEIGH_IP_DELETE:
	case DPLANE_OP_NEIGH_TABLE_UPDATE:
	case DPLANE_OP_GRE_SET:
	case DPLANE_OP_INTF_ADDR_ADD:
	case DPLANE_OP_INTF_ADDR_DEL:
	case DPLANE_OP_INTF_NETCONFIG:
	case DPLANE_OP_INTF_INSTALL:
	case DPLANE_OP_INTF_UPDATE:
	case DPLANE_OP_INTF_DELETE:
	case DPLANE_OP_TC_QDISC_INSTALL:
	case DPLANE_OP_TC_QDISC_UNINSTALL:
	case DPLANE_OP_TC_CLASS_ADD:
	case DPLANE_OP_TC_CLASS_DELETE:
	case DPLANE_OP_TC_CLASS_UPDATE:
	case DPLANE_OP_TC_FILTER_ADD:
	case DPLANE_OP_TC_FILTER_DELETE:
	case DPLANE_OP_TC_FILTER_UPDATE:
	case DPLANE_OP_NONE:
		break;

	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}
1003
1004 /*
1005 * LSP walk/send functions
1006 */
/* State carried through the LSP hash walk (see fpm_lsp_send_cb). */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx;	/* Reusable encode context. */
	struct fpm_nl_ctx *fnc;
	bool complete;	/* Cleared when the walk aborts on a full buffer. */
};
1012
1013 static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
1014 {
1015 struct zebra_lsp *lsp = bucket->data;
1016 struct fpm_lsp_arg *fla = arg;
1017
1018 /* Skip entries which have already been sent */
1019 if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
1020 return HASHWALK_CONTINUE;
1021
1022 dplane_ctx_reset(fla->ctx);
1023 dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);
1024
1025 if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
1026 fla->complete = false;
1027 return HASHWALK_ABORT;
1028 }
1029
1030 /* Mark entry as sent */
1031 SET_FLAG(lsp->flags, LSP_FLAG_FPM);
1032 return HASHWALK_CONTINUE;
1033 }
1034
1035 static void fpm_lsp_send(struct thread *t)
1036 {
1037 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1038 struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
1039 struct fpm_lsp_arg fla;
1040
1041 fla.fnc = fnc;
1042 fla.ctx = dplane_ctx_alloc();
1043 fla.complete = true;
1044
1045 hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);
1046
1047 dplane_ctx_fini(&fla.ctx);
1048
1049 if (fla.complete) {
1050 WALK_FINISH(fnc, FNE_LSP_FINISHED);
1051
1052 /* Now move onto routes */
1053 thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
1054 &fnc->t_nhgreset);
1055 } else {
1056 /* Didn't finish - reschedule LSP walk */
1057 thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
1058 &fnc->t_lspwalk);
1059 }
1060 }
1061
1062 /*
1063 * Next hop walk/send functions.
1064 */
/* State carried through the next hop group hash walk (fpm_nhg_send_cb). */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;	/* Reusable encode context. */
	struct fpm_nl_ctx *fnc;
	bool complete;	/* Cleared when the walk aborts on a full buffer. */
};
1070
1071 static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
1072 {
1073 struct nhg_hash_entry *nhe = bucket->data;
1074 struct fpm_nhg_arg *fna = arg;
1075
1076 /* This entry was already sent, skip it. */
1077 if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
1078 return HASHWALK_CONTINUE;
1079
1080 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
1081 dplane_ctx_reset(fna->ctx);
1082 dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
1083 if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
1084 /* Our buffers are full, lets give it some cycles. */
1085 fna->complete = false;
1086 return HASHWALK_ABORT;
1087 }
1088
1089 /* Mark group as sent, so it doesn't get sent again. */
1090 SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1091
1092 return HASHWALK_CONTINUE;
1093 }
1094
1095 static void fpm_nhg_send(struct thread *t)
1096 {
1097 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1098 struct fpm_nhg_arg fna;
1099
1100 fna.fnc = fnc;
1101 fna.ctx = dplane_ctx_alloc();
1102 fna.complete = true;
1103
1104 /* Send next hops. */
1105 if (fnc->use_nhg)
1106 hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);
1107
1108 /* `free()` allocated memory. */
1109 dplane_ctx_fini(&fna.ctx);
1110
1111 /* We are done sending next hops, lets install the routes now. */
1112 if (fna.complete) {
1113 WALK_FINISH(fnc, FNE_NHG_FINISHED);
1114 thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
1115 &fnc->t_ribreset);
1116 } else /* Otherwise reschedule next hop group again. */
1117 thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
1118 &fnc->t_nhgwalk);
1119 }
1120
1121 /**
1122 * Send all RIB installed routes to the connected data plane.
1123 */
1124 static void fpm_rib_send(struct thread *t)
1125 {
1126 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1127 rib_dest_t *dest;
1128 struct route_node *rn;
1129 struct route_table *rt;
1130 struct zebra_dplane_ctx *ctx;
1131 rib_tables_iter_t rt_iter;
1132
1133 /* Allocate temporary context for all transactions. */
1134 ctx = dplane_ctx_alloc();
1135
1136 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1137 while ((rt = rib_tables_iter_next(&rt_iter))) {
1138 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1139 dest = rib_dest_from_rnode(rn);
1140 /* Skip bad route entries. */
1141 if (dest == NULL || dest->selected_fib == NULL)
1142 continue;
1143
1144 /* Check for already sent routes. */
1145 if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
1146 continue;
1147
1148 /* Enqueue route install. */
1149 dplane_ctx_reset(ctx);
1150 dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
1151 dest->selected_fib);
1152 if (fpm_nl_enqueue(fnc, ctx) == -1) {
1153 /* Free the temporary allocated context. */
1154 dplane_ctx_fini(&ctx);
1155
1156 thread_add_timer(zrouter.master, fpm_rib_send,
1157 fnc, 1, &fnc->t_ribwalk);
1158 return;
1159 }
1160
1161 /* Mark as sent. */
1162 SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1163 }
1164 }
1165
1166 /* Free the temporary allocated context. */
1167 dplane_ctx_fini(&ctx);
1168
1169 /* All RIB routes sent! */
1170 WALK_FINISH(fnc, FNE_RIB_FINISHED);
1171
1172 /* Schedule next event: RMAC reset. */
1173 thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
1174 &fnc->t_rmacreset);
1175 }
1176
/*
 * The next three functions will handle RMAC enqueue.
 */

/* Context shared by the RMAC walk and its per-entry callbacks. */
struct fpm_rmac_arg {
	/* Scratch dataplane context, reset and reused per entry. */
	struct zebra_dplane_ctx *ctx;
	/* FPM connection the MAC snapshots are written to. */
	struct fpm_nl_ctx *fnc;
	/* L3 VNI currently being walked (set by the outer callback). */
	struct zebra_l3vni *zl3vni;
	/* Cleared when the output buffers fill up, aborting the walk. */
	bool complete;
};
1186
/*
 * Per-RMAC callback: snapshot one remote MAC entry as a MAC_INSTALL
 * operation and enqueue it towards the FPM.
 */
static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	struct zebra_mac *zrmac = bucket->data;
	/* NOTE(review): assumes zl3vni->vxlan_if and its ->info are
	 * non-NULL for every walked L3 VNI - confirm with callers. */
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/* Skip entries already sent; also stop doing work once a
	 * previous enqueue failed (fra->complete cleared). */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
		return;

	/* Sticky = statically configured or remote default gateway MAC. */
	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	/* Only VLAN-aware bridges carry a meaningful access VLAN id. */
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

	/* Reuse the scratch context for this MAC install snapshot. */
	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid,
			&zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
			0 /*nhg*/, 0 /*update_flags*/);
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		/* Out of buffer space: retry the whole RMAC walk later. */
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
		fra->complete = false;
	}
}
1218
1219 static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
1220 {
1221 struct fpm_rmac_arg *fra = arg;
1222 struct zebra_l3vni *zl3vni = bucket->data;
1223
1224 fra->zl3vni = zl3vni;
1225 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1226 }
1227
1228 static void fpm_rmac_send(struct thread *t)
1229 {
1230 struct fpm_rmac_arg fra;
1231
1232 fra.fnc = THREAD_ARG(t);
1233 fra.ctx = dplane_ctx_alloc();
1234 fra.complete = true;
1235 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1236 dplane_ctx_fini(&fra.ctx);
1237
1238 /* RMAC walk completed. */
1239 if (fra.complete)
1240 WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
1241 }
1242
1243 /*
1244 * Resets the next hop FPM flags so we send all next hops again.
1245 */
1246 static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1247 {
1248 struct nhg_hash_entry *nhe = bucket->data;
1249
1250 /* Unset FPM installation flag so it gets installed again. */
1251 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1252 }
1253
1254 static void fpm_nhg_reset(struct thread *t)
1255 {
1256 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1257
1258 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
1259
1260 /* Schedule next step: send next hop groups. */
1261 thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
1262 }
1263
1264 /*
1265 * Resets the LSP FPM flag so we send all LSPs again.
1266 */
1267 static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
1268 {
1269 struct zebra_lsp *lsp = bucket->data;
1270
1271 UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
1272 }
1273
1274 static void fpm_lsp_reset(struct thread *t)
1275 {
1276 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1277 struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
1278
1279 hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);
1280
1281 /* Schedule next step: send LSPs */
1282 thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
1283 }
1284
1285 /**
1286 * Resets the RIB FPM flags so we send all routes again.
1287 */
1288 static void fpm_rib_reset(struct thread *t)
1289 {
1290 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1291 rib_dest_t *dest;
1292 struct route_node *rn;
1293 struct route_table *rt;
1294 rib_tables_iter_t rt_iter;
1295
1296 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1297 while ((rt = rib_tables_iter_next(&rt_iter))) {
1298 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1299 dest = rib_dest_from_rnode(rn);
1300 /* Skip bad route entries. */
1301 if (dest == NULL)
1302 continue;
1303
1304 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1305 }
1306 }
1307
1308 /* Schedule next step: send RIB routes. */
1309 thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
1310 }
1311
1312 /*
1313 * The next three function will handle RMAC table reset.
1314 */
1315 static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
1316 {
1317 struct zebra_mac *zrmac = bucket->data;
1318
1319 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1320 }
1321
1322 static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
1323 {
1324 struct zebra_l3vni *zl3vni = bucket->data;
1325
1326 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1327 }
1328
1329 static void fpm_rmac_reset(struct thread *t)
1330 {
1331 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1332
1333 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1334
1335 /* Schedule next event: send RMAC entries. */
1336 thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
1337 &fnc->t_rmacwalk);
1338 }
1339
/*
 * Drain the dataplane context queue into the FPM output buffer.
 *
 * Runs on the FPM plugin's own pthread (fnc->fthread) while contexts
 * are enqueued from the dataplane thread - hence the mutex around the
 * queue and the atomic counter updates.
 */
static void fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;
	bool no_bufs = false;
	uint64_t processed_contexts = 0;

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
			no_bufs = true;
			break;
		}

		/* Dequeue next item or quit processing. */
		frr_with_mutex (&fnc->ctxqueue_mutex) {
			ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		}
		if (ctx == NULL)
			break;

		/*
		 * Intentionally ignoring the return value
		 * as that we are ensuring that we can write to
		 * the output data in the STREAM_WRITEABLE
		 * check above, so we can ignore the return
		 */
		if (fnc->socket != -1)
			(void)fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		processed_contexts++;
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Hand the context back to the dataplane as completed. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Update count of processed contexts */
	atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
				  processed_contexts, memory_order_relaxed);

	/* Re-schedule if we ran out of buffer space */
	if (no_bufs)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/*
	 * Let the dataplane thread know if there are items in the
	 * output queue to be processed. Otherwise they may sit
	 * until the dataplane thread gets scheduled for new,
	 * unrelated work.
	 */
	if (dplane_provider_out_ctx_queue_len(fnc->prov) > 0)
		dplane_provider_work_ready();
}
1397
/**
 * Handles external (e.g. CLI, data plane or others) events.
 *
 * The event value is packed into the thread argument via THREAD_VAL().
 */
static void fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	enum fpm_nl_events event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		/* NOTE(review): memset over a struct holding atomics is a
		 * plain, non-atomic reset; concurrent readers may observe
		 * torn values - confirm this is acceptable here. */
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		/* Reconnect so the peer resynchronizes under the new mode. */
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	/* Walk-completion notifications are informational only. */
	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);
		break;
	case FNE_LSP_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: LSP walk finished", __func__);
		break;
	}
}
1459
1460 /*
1461 * Data plane functions.
1462 */
1463 static int fpm_nl_start(struct zebra_dplane_provider *prov)
1464 {
1465 struct fpm_nl_ctx *fnc;
1466
1467 fnc = dplane_provider_get_data(prov);
1468 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1469 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1470 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1471 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1472 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1473 fnc->socket = -1;
1474 fnc->disabled = true;
1475 fnc->prov = prov;
1476 dplane_ctx_q_init(&fnc->ctxqueue);
1477 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
1478
1479 /* Set default values. */
1480 fnc->use_nhg = true;
1481
1482 return 0;
1483 }
1484
/*
 * Early finish: stop all scheduled work and close the FPM socket.
 *
 * Events owned by the main thread are cancelled synchronously with
 * THREAD_OFF(); events owned by the FPM pthread are cancelled
 * asynchronously since we may not be running on that thread.
 */
static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_lspreset);
	THREAD_OFF(fnc->t_lspwalk);
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	THREAD_OFF(fnc->t_event);
	THREAD_OFF(fnc->t_nhg);
	/* Read/write/connect events run on the FPM pthread. */
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}
1509
/*
 * Late finish: stop the FPM pthread and free all allocated resources.
 *
 * NOTE(review): frees the global gfnc rather than the fnc argument;
 * this assumes fnc == gfnc (single provider instance) - confirm.
 */
static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	free(gfnc);
	gfnc = NULL;

	return 0;
}
1525
1526 static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1527 {
1528 struct fpm_nl_ctx *fnc;
1529
1530 fnc = dplane_provider_get_data(prov);
1531 if (early)
1532 return fpm_nl_finish_early(fnc);
1533
1534 return fpm_nl_finish_late(fnc);
1535 }
1536
/*
 * Provider work callback, run on the dataplane thread: move incoming
 * contexts onto the FPM queue for the FPM pthread to transmit.
 */
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			/*
			 * Update the number of queued contexts *before*
			 * enqueueing, to ensure counter consistency.
			 */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);

			frr_with_mutex (&fnc->ctxqueue_mutex) {
				dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
			}

			/* Track the highest queue depth seen this pass. */
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			if (peak_queue < cur_queue)
				peak_queue = cur_queue;
			continue;
		}

		/* Not connected: complete the context immediately. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Update peak queue length, if we just observed a new peak */
	stored_peak_queue = atomic_load_explicit(
		&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
	if (stored_peak_queue < peak_queue)
		atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
				      peak_queue, memory_order_relaxed);

	/* Kick the FPM pthread if there is queued work left over. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/* Ensure dataplane thread is rescheduled if we hit the work limit */
	if (counter >= limit)
		dplane_provider_work_ready();

	return 0;
}
1598
/*
 * frr_late_init hook: allocate the global plugin context, register the
 * dataplane provider and install the FPM CLI commands.
 *
 * NOTE(review): the calloc() result is not checked; a NULL gfnc would
 * be handed to dplane_provider_register - confirm the OOM policy.
 */
static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	gfnc = calloc(1, sizeof(*gfnc));
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	/* Make the FPM commands available in the CLI. */
	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}
1624
1625 static int fpm_nl_init(void)
1626 {
1627 hook_register(frr_late_init, fpm_nl_new);
1628 return 0;
1629 }
1630
/* Module descriptor consumed by the FRR dynamic module loader. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
);