]> git.proxmox.com Git - mirror_frr.git/blob - zebra/dplane_fpm_nl.c
Merge pull request #10643 from Jafaral/ospf-multi-vrf
[mirror_frr.git] / zebra / dplane_fpm_nl.c
1 /*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h" /* Include this explicitly */
24 #endif
25
26 #include <arpa/inet.h>
27
28 #include <sys/types.h>
29 #include <sys/socket.h>
30
31 #include <errno.h>
32 #include <string.h>
33
34 #include "lib/zebra.h"
35 #include "lib/json.h"
36 #include "lib/libfrr.h"
37 #include "lib/frratomic.h"
38 #include "lib/command.h"
39 #include "lib/memory.h"
40 #include "lib/network.h"
41 #include "lib/ns.h"
42 #include "lib/frr_pthread.h"
43 #include "zebra/debug.h"
44 #include "zebra/interface.h"
45 #include "zebra/zebra_dplane.h"
46 #include "zebra/zebra_mpls.h"
47 #include "zebra/zebra_router.h"
48 #include "zebra/zebra_evpn.h"
49 #include "zebra/zebra_evpn_mac.h"
50 #include "zebra/zebra_vxlan_private.h"
51 #include "zebra/kernel_netlink.h"
52 #include "zebra/rt_netlink.h"
53 #include "zebra/debug.h"
54
55 #define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
56 #define SOUTHBOUND_DEFAULT_PORT 2620
57
58 /**
59 * FPM header:
60 * {
61 * version: 1 byte (always 1),
62 * type: 1 byte (1 for netlink, 2 protobuf),
63 * len: 2 bytes (network order),
64 * }
65 *
66 * This header is used with any format to tell the users how many bytes to
67 * expect.
68 */
69 #define FPM_HEADER_SIZE 4
70
71 static const char *prov_name = "dplane_fpm_nl";
72
/*
 * Global context for the FPM netlink plugin.
 *
 * A single instance (`gfnc`) is shared between the zebra main thread and
 * the dedicated FPM pthread (`fthread`), hence the mutexes protecting the
 * output buffer/queue and the atomic statistic counters.
 */
struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;		/* TCP socket to the FPM server; -1 when closed. */
	bool disabled;		/* user disabled FPM (`no fpm address`). */
	bool connecting;	/* non-blocking connect() still in progress. */
	bool use_nhg;		/* encode next hop group operations. */
	struct sockaddr_storage addr;	/* remote server address (v4 or v6). */

	/* data plane buffers. */
	struct stream *ibuf;	/* input; drained and discarded by fpm_read(). */
	struct stream *obuf;	/* output; filled by fpm_nl_enqueue(). */
	pthread_mutex_t obuf_mutex;	/* guards obuf across threads. */

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_q ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events (run on the FPM pthread). */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_dequeue;

	/* zebra events (run on zrouter.master, the zebra main thread). */
	struct thread *t_lspreset;
	struct thread *t_lspwalk;
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters (updated with relaxed atomics; see the
	 * `show fpm counters` commands for the readers). */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;
145
/*
 * Events handled by fpm_process_event() on the FPM pthread; scheduled via
 * thread_add_event() (see FPM_RECONNECT/WALK_FINISH and the CLI handlers).
 */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* LSP walk finished. */
	FNE_LSP_FINISHED,
	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
167
/*
 * Schedule an internal reconnect on the FPM pthread. Used from this
 * plugin's own I/O handlers; FNE_INTERNAL_RECONNECT (instead of
 * FNE_RECONNECT) avoids racing with user-requested reconnects.
 */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/* Notify fpm_process_event() that a table walk (LSP/NHG/RIB/RMAC) ended. */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 (ev), NULL)
175
176 /*
177 * Prototypes.
178 */
179 static void fpm_process_event(struct thread *t);
180 static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
181 static void fpm_lsp_send(struct thread *t);
182 static void fpm_lsp_reset(struct thread *t);
183 static void fpm_nhg_send(struct thread *t);
184 static void fpm_nhg_reset(struct thread *t);
185 static void fpm_rib_send(struct thread *t);
186 static void fpm_rib_reset(struct thread *t);
187 static void fpm_rmac_send(struct thread *t);
188 static void fpm_rmac_reset(struct thread *t);
189
190 /*
191 * CLI.
192 */
193 #define FPM_STR "Forwarding Plane Manager configuration\n"
194
/*
 * `fpm address ...`: record the remote FPM server address in the global
 * context and ask the FPM pthread to (re)connect to it.
 */
DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	uint16_t port = 0;
	uint8_t naddr[INET6_BUFSIZ];

	/* argc == 5 means `port <num>` was given; argv[4] is the number
	 * (range already enforced by the command grammar). */
	if (argc == 5)
		port = strtol(argv[4]->arg, NULL, 10);

	/* Handle IPv4 addresses. */
	if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
		sin = (struct sockaddr_in *)&gfnc->addr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = AF_INET;
		sin->sin_port =
			port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
		sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
		memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

		goto ask_reconnect;
	}

	/* Handle IPv6 addresses. */
	if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
		vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
		return CMD_WARNING;
	}

	sin6 = (struct sockaddr_in6 *)&gfnc->addr;
	memset(sin6, 0, sizeof(*sin6));
	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
	sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
	memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
	/* Hand the reconnect off to the FPM pthread. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RECONNECT, &gfnc->t_event);
	return CMD_SUCCESS;
}
248
/*
 * `no fpm address ...`: disable FPM. The actual teardown (socket close,
 * buffer reset) happens on the FPM pthread via FNE_DISABLE.
 */
DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_DISABLE, &gfnc->t_event);
	return CMD_SUCCESS;
}
263
264 DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
265 "fpm use-next-hop-groups",
266 FPM_STR
267 "Use netlink next hop groups feature.\n")
268 {
269 /* Already enabled. */
270 if (gfnc->use_nhg)
271 return CMD_SUCCESS;
272
273 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
274 FNE_TOGGLE_NHG, &gfnc->t_event);
275
276 return CMD_SUCCESS;
277 }
278
279 DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
280 "no fpm use-next-hop-groups",
281 NO_STR
282 FPM_STR
283 "Use netlink next hop groups feature.\n")
284 {
285 /* Already disabled. */
286 if (!gfnc->use_nhg)
287 return CMD_SUCCESS;
288
289 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
290 FNE_TOGGLE_NHG, &gfnc->t_event);
291
292 return CMD_SUCCESS;
293 }
294
/*
 * `clear fpm counters`: zero the statistics. Done on the FPM pthread
 * (FNE_RESET_COUNTERS) so it doesn't race the I/O handlers.
 */
DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}
305
/*
 * `show fpm counters`: plain-text dump of the statistics. Counters are
 * _Atomic, so the direct reads here are sequentially-consistent loads.
 */
DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
	vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

/* Local helper: print one right-aligned label/value line. */
#define SHOW_COUNTER(label, counter) \
	vty_out(vty, "%28s: %u\n", (label), (counter))

	SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
	SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
	SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
	SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
	SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
	SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
	SHOW_COUNTER("Data plane items processed",
		     gfnc->counters.dplane_contexts);
	SHOW_COUNTER("Data plane items enqueued",
		     gfnc->counters.ctxqueue_len);
	SHOW_COUNTER("Data plane items queue peak",
		     gfnc->counters.ctxqueue_len_peak);
	SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
	SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
	SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

	return CMD_SUCCESS;
}
337
/*
 * `show fpm counters json`: same statistics as the plain-text command,
 * emitted as a JSON object. vty_json() takes ownership of `jo` and frees
 * it after printing.
 */
DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_json(vty, jo);

	return CMD_SUCCESS;
}
370
371 static int fpm_write_config(struct vty *vty)
372 {
373 struct sockaddr_in *sin;
374 struct sockaddr_in6 *sin6;
375 int written = 0;
376
377 if (gfnc->disabled)
378 return written;
379
380 switch (gfnc->addr.ss_family) {
381 case AF_INET:
382 written = 1;
383 sin = (struct sockaddr_in *)&gfnc->addr;
384 vty_out(vty, "fpm address %pI4", &sin->sin_addr);
385 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
386 vty_out(vty, " port %d", ntohs(sin->sin_port));
387
388 vty_out(vty, "\n");
389 break;
390 case AF_INET6:
391 written = 1;
392 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
393 vty_out(vty, "fpm address %pI6", &sin6->sin6_addr);
394 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
395 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
396
397 vty_out(vty, "\n");
398 break;
399
400 default:
401 break;
402 }
403
404 if (!gfnc->use_nhg) {
405 vty_out(vty, "no fpm use-next-hop-groups\n");
406 written = 1;
407 }
408
409 return written;
410 }
411
/* CLI node for FPM configuration; fpm_write_config() saves its state. */
static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	.config_write = fpm_write_config,
};
418
419 /*
420 * FPM functions.
421 */
422 static void fpm_connect(struct thread *t);
423
/*
 * Tear down the current FPM connection and, unless FPM is disabled,
 * schedule a fresh connect attempt in 3 seconds.
 *
 * Runs on the FPM pthread; the zebra-side walk threads are cancelled
 * asynchronously since they live on zrouter.master.
 */
static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_lspreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_lspwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 * NOTE: frr_mutex_lock_autounlock holds obuf_mutex until this
	 * function returns.
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}
460
461 static void fpm_read(struct thread *t)
462 {
463 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
464 ssize_t rv;
465
466 /* Let's ignore the input at the moment. */
467 rv = stream_read_try(fnc->ibuf, fnc->socket,
468 STREAM_WRITEABLE(fnc->ibuf));
469 /* We've got an interruption. */
470 if (rv == -2) {
471 /* Schedule next read. */
472 thread_add_read(fnc->fthread->master, fpm_read, fnc,
473 fnc->socket, &fnc->t_read);
474 return;
475 }
476 if (rv == 0) {
477 atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
478 memory_order_relaxed);
479
480 if (IS_ZEBRA_DEBUG_FPM)
481 zlog_debug("%s: connection closed", __func__);
482
483 FPM_RECONNECT(fnc);
484 return;
485 }
486 if (rv == -1) {
487 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
488 memory_order_relaxed);
489 zlog_warn("%s: connection failure: %s", __func__,
490 strerror(errno));
491 FPM_RECONNECT(fnc);
492 return;
493 }
494 stream_reset(fnc->ibuf);
495
496 /* Account all bytes read. */
497 atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
498 memory_order_relaxed);
499
500 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
501 &fnc->t_read);
502 }
503
504 static void fpm_write(struct thread *t)
505 {
506 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
507 socklen_t statuslen;
508 ssize_t bwritten;
509 int rv, status;
510 size_t btotal;
511
512 if (fnc->connecting == true) {
513 status = 0;
514 statuslen = sizeof(status);
515
516 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
517 &statuslen);
518 if (rv == -1 || status != 0) {
519 if (rv != -1)
520 zlog_warn("%s: connection failed: %s", __func__,
521 strerror(status));
522 else
523 zlog_warn("%s: SO_ERROR failed: %s", __func__,
524 strerror(status));
525
526 atomic_fetch_add_explicit(
527 &fnc->counters.connection_errors, 1,
528 memory_order_relaxed);
529
530 FPM_RECONNECT(fnc);
531 return;
532 }
533
534 fnc->connecting = false;
535
536 /*
537 * Starting with LSPs walk all FPM objects, marking them
538 * as unsent and then replaying them.
539 */
540 thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
541 &fnc->t_lspreset);
542
543 /* Permit receiving messages now. */
544 thread_add_read(fnc->fthread->master, fpm_read, fnc,
545 fnc->socket, &fnc->t_read);
546 }
547
548 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
549
550 while (true) {
551 /* Stream is empty: reset pointers and return. */
552 if (STREAM_READABLE(fnc->obuf) == 0) {
553 stream_reset(fnc->obuf);
554 break;
555 }
556
557 /* Try to write all at once. */
558 btotal = stream_get_endp(fnc->obuf) -
559 stream_get_getp(fnc->obuf);
560 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
561 if (bwritten == 0) {
562 atomic_fetch_add_explicit(
563 &fnc->counters.connection_closes, 1,
564 memory_order_relaxed);
565
566 if (IS_ZEBRA_DEBUG_FPM)
567 zlog_debug("%s: connection closed", __func__);
568 break;
569 }
570 if (bwritten == -1) {
571 /* Attempt to continue if blocked by a signal. */
572 if (errno == EINTR)
573 continue;
574 /* Receiver is probably slow, lets give it some time. */
575 if (errno == EAGAIN || errno == EWOULDBLOCK)
576 break;
577
578 atomic_fetch_add_explicit(
579 &fnc->counters.connection_errors, 1,
580 memory_order_relaxed);
581 zlog_warn("%s: connection failure: %s", __func__,
582 strerror(errno));
583
584 FPM_RECONNECT(fnc);
585 return;
586 }
587
588 /* Account all bytes sent. */
589 atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
590 memory_order_relaxed);
591
592 /* Account number of bytes free. */
593 atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
594 memory_order_relaxed);
595
596 stream_forward_getp(fnc->obuf, (size_t)bwritten);
597 }
598
599 /* Stream is not empty yet, we must schedule more writes. */
600 if (STREAM_READABLE(fnc->obuf)) {
601 stream_pulldown(fnc->obuf);
602 thread_add_write(fnc->fthread->master, fpm_write, fnc,
603 fnc->socket, &fnc->t_write);
604 return;
605 }
606 }
607
/*
 * Start a non-blocking connect() to the configured FPM server.
 *
 * On hard failure a retry is scheduled in 3 seconds. If connect() returns
 * EINPROGRESS, completion is detected in fpm_write() via SO_ERROR; reads
 * and the object replay are only armed once the connection is usable.
 */
static void fpm_connect(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
	socklen_t slen;
	int rv, sock;
	char addrstr[INET6_ADDRSTRLEN];

	sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
	if (sock == -1) {
		zlog_err("%s: fpm socket failed: %s", __func__,
			 strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return;
	}

	set_nonblocking(sock);

	if (fnc->addr.ss_family == AF_INET) {
		inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin);
	} else {
		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin6);
	}

	if (IS_ZEBRA_DEBUG_FPM)
		/* NOTE(review): sin->sin_port is used even for IPv6;
		 * presumably relies on sin_port/sin6_port sharing the same
		 * offset — confirm before touching this. */
		zlog_debug("%s: attempting to connect to %s:%d", __func__,
			   addrstr, ntohs(sin->sin_port));

	rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
	if (rv == -1 && errno != EINPROGRESS) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		close(sock);
		zlog_warn("%s: fpm connection failed: %s", __func__,
			  strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return;
	}

	fnc->connecting = (errno == EINPROGRESS);
	fnc->socket = sock;
	/* Reads only make sense once the connection is established; when
	 * connecting, fpm_write() arms them after SO_ERROR reports success. */
	if (!fnc->connecting)
		thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
				&fnc->t_read);
	thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
			 &fnc->t_write);

	/*
	 * Starting with LSPs walk all FPM objects, marking them
	 * as unsent and then replaying them.
	 *
	 * If we are not connected, then delay the objects reset/send.
	 */
	if (!fnc->connecting)
		thread_add_timer(zrouter.master, fpm_lsp_reset, fnc, 0,
				 &fnc->t_lspreset);
}
670
/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * Runs with obuf_mutex held (taken below via autounlock, released on any
 * return path). On success the write event is armed so fpm_write() will
 * flush the buffer.
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	/* Held until return; guards obuf against the FPM pthread. */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		/* An UPDATE is encoded as a DELROUTE followed by a
		 * NEWROUTE (fall through below). */
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		/* Append after any DELROUTE already in nl_buf. */
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
		rv = netlink_lsp_msg_encoder(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_lsp_msg_encoder failed",
				 __func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	/* Un-handled by FPM at this time. */
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_RULE_ADD:
	case DPLANE_OP_RULE_DELETE:
	case DPLANE_OP_RULE_UPDATE:
	case DPLANE_OP_NEIGH_DISCOVER:
	case DPLANE_OP_BR_PORT_UPDATE:
	case DPLANE_OP_IPTABLE_ADD:
	case DPLANE_OP_IPTABLE_DELETE:
	case DPLANE_OP_IPSET_ADD:
	case DPLANE_OP_IPSET_DELETE:
	case DPLANE_OP_IPSET_ENTRY_ADD:
	case DPLANE_OP_IPSET_ENTRY_DELETE:
	case DPLANE_OP_NEIGH_IP_INSTALL:
	case DPLANE_OP_NEIGH_IP_DELETE:
	case DPLANE_OP_NEIGH_TABLE_UPDATE:
	case DPLANE_OP_GRE_SET:
	case DPLANE_OP_INTF_ADDR_ADD:
	case DPLANE_OP_INTF_ADDR_DEL:
	case DPLANE_OP_INTF_NETCONFIG:
	case DPLANE_OP_NONE:
		break;

	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 * The length field includes the header itself.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}
871
872 /*
873 * LSP walk/send functions
874 */
/* Shared state for the LSP hash walk (fpm_lsp_send/fpm_lsp_send_cb). */
struct fpm_lsp_arg {
	struct zebra_dplane_ctx *ctx;	/* reusable context for encoding. */
	struct fpm_nl_ctx *fnc;
	bool complete;	/* false when the walk aborted on a full buffer. */
};
880
881 static int fpm_lsp_send_cb(struct hash_bucket *bucket, void *arg)
882 {
883 struct zebra_lsp *lsp = bucket->data;
884 struct fpm_lsp_arg *fla = arg;
885
886 /* Skip entries which have already been sent */
887 if (CHECK_FLAG(lsp->flags, LSP_FLAG_FPM))
888 return HASHWALK_CONTINUE;
889
890 dplane_ctx_reset(fla->ctx);
891 dplane_ctx_lsp_init(fla->ctx, DPLANE_OP_LSP_INSTALL, lsp);
892
893 if (fpm_nl_enqueue(fla->fnc, fla->ctx) == -1) {
894 fla->complete = false;
895 return HASHWALK_ABORT;
896 }
897
898 /* Mark entry as sent */
899 SET_FLAG(lsp->flags, LSP_FLAG_FPM);
900 return HASHWALK_CONTINUE;
901 }
902
/*
 * Walk the default VRF LSP table and replay all unsent LSPs to the FPM
 * server. Runs on zrouter.master. On completion it chains into the next
 * hop group walk; otherwise it reschedules itself.
 */
static void fpm_lsp_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
	struct fpm_lsp_arg fla;

	fla.fnc = fnc;
	fla.ctx = dplane_ctx_alloc();
	fla.complete = true;

	hash_walk(zvrf->lsp_table, fpm_lsp_send_cb, &fla);

	/* Free the temporary context used for encoding. */
	dplane_ctx_fini(&fla.ctx);

	if (fla.complete) {
		WALK_FINISH(fnc, FNE_LSP_FINISHED);

		/* Now move onto routes */
		thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
				 &fnc->t_nhgreset);
	} else {
		/* Didn't finish - reschedule LSP walk */
		thread_add_timer(zrouter.master, fpm_lsp_send, fnc, 0,
				 &fnc->t_lspwalk);
	}
}
929
930 /*
931 * Next hop walk/send functions.
932 */
/* Shared state for the next hop group walk (fpm_nhg_send/_cb). */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;	/* reusable context for encoding. */
	struct fpm_nl_ctx *fnc;
	bool complete;	/* false when the walk aborted on a full buffer. */
};
938
939 static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
940 {
941 struct nhg_hash_entry *nhe = bucket->data;
942 struct fpm_nhg_arg *fna = arg;
943
944 /* This entry was already sent, skip it. */
945 if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
946 return HASHWALK_CONTINUE;
947
948 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
949 dplane_ctx_reset(fna->ctx);
950 dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
951 if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
952 /* Our buffers are full, lets give it some cycles. */
953 fna->complete = false;
954 return HASHWALK_ABORT;
955 }
956
957 /* Mark group as sent, so it doesn't get sent again. */
958 SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
959
960 return HASHWALK_CONTINUE;
961 }
962
/*
 * Replay all unsent next hop groups to the FPM server (only when the
 * feature is enabled). On completion it chains into the RIB walk;
 * otherwise it reschedules itself. Runs on zrouter.master.
 */
static void fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	if (fnc->use_nhg)
		hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, lets install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
				 &fnc->t_ribreset);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);
}
988
/**
 * Send all RIB installed routes to the connected data plane.
 *
 * Iterates every route table; routes already flagged RIB_DEST_UPDATE_FPM
 * are skipped. If the output buffer fills up, the walk retries in one
 * second and resumes from the unflagged entries. Runs on zrouter.master.
 */
static void fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				/* Buffer full: retry in one second. */
				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	/* Schedule next event: RMAC reset. */
	thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);
}
1044
1045 /*
1046 * The next three functions will handle RMAC enqueue.
1047 */
/* Shared state for the RMAC walk (fpm_rmac_send and its callbacks). */
struct fpm_rmac_arg {
	struct zebra_dplane_ctx *ctx;	/* reusable context for encoding. */
	struct fpm_nl_ctx *fnc;
	struct zebra_l3vni *zl3vni;	/* L3 VNI currently being walked. */
	bool complete;	/* false when an enqueue hit a full buffer. */
};
1054
/*
 * Hash iterate callback: enqueue one remote MAC (RMAC) install for the
 * current L3 VNI (fra->zl3vni, set by fpm_enqueue_l3vni_table).
 *
 * hash_iterate() has no abort mechanism, so when the buffer is full we
 * clear `complete` and turn the remaining invocations into no-ops.
 *
 * NOTE(review): dereferences zl3vni->vxlan_if->info and
 * zif->brslave_info.br_if without NULL checks — presumably guaranteed
 * non-NULL for entries in the rmac table; confirm before reuse.
 */
static void fpm_enqueue_rmac_table(struct hash_bucket *bucket, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	struct zebra_mac *zrmac = bucket->data;
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/* Entry already sent. */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
		return;

	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	/* VLAN id only matters on a VLAN-aware bridge. */
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid,
			&zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
			0 /*nhg*/, 0 /*update_flags*/);
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		/* Buffer full: retry the whole RMAC walk in one second. */
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
		fra->complete = false;
	}
}
1086
1087 static void fpm_enqueue_l3vni_table(struct hash_bucket *bucket, void *arg)
1088 {
1089 struct fpm_rmac_arg *fra = arg;
1090 struct zebra_l3vni *zl3vni = bucket->data;
1091
1092 fra->zl3vni = zl3vni;
1093 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1094 }
1095
1096 static void fpm_rmac_send(struct thread *t)
1097 {
1098 struct fpm_rmac_arg fra;
1099
1100 fra.fnc = THREAD_ARG(t);
1101 fra.ctx = dplane_ctx_alloc();
1102 fra.complete = true;
1103 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1104 dplane_ctx_fini(&fra.ctx);
1105
1106 /* RMAC walk completed. */
1107 if (fra.complete)
1108 WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
1109 }
1110
1111 /*
1112 * Resets the next hop FPM flags so we send all next hops again.
1113 */
1114 static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1115 {
1116 struct nhg_hash_entry *nhe = bucket->data;
1117
1118 /* Unset FPM installation flag so it gets installed again. */
1119 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1120 }
1121
1122 static void fpm_nhg_reset(struct thread *t)
1123 {
1124 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1125
1126 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
1127
1128 /* Schedule next step: send next hop groups. */
1129 thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
1130 }
1131
1132 /*
1133 * Resets the LSP FPM flag so we send all LSPs again.
1134 */
1135 static void fpm_lsp_reset_cb(struct hash_bucket *bucket, void *arg)
1136 {
1137 struct zebra_lsp *lsp = bucket->data;
1138
1139 UNSET_FLAG(lsp->flags, LSP_FLAG_FPM);
1140 }
1141
1142 static void fpm_lsp_reset(struct thread *t)
1143 {
1144 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1145 struct zebra_vrf *zvrf = vrf_info_lookup(VRF_DEFAULT);
1146
1147 hash_iterate(zvrf->lsp_table, fpm_lsp_reset_cb, NULL);
1148
1149 /* Schedule next step: send LSPs */
1150 thread_add_event(zrouter.master, fpm_lsp_send, fnc, 0, &fnc->t_lspwalk);
1151 }
1152
1153 /**
1154 * Resets the RIB FPM flags so we send all routes again.
1155 */
1156 static void fpm_rib_reset(struct thread *t)
1157 {
1158 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1159 rib_dest_t *dest;
1160 struct route_node *rn;
1161 struct route_table *rt;
1162 rib_tables_iter_t rt_iter;
1163
1164 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1165 while ((rt = rib_tables_iter_next(&rt_iter))) {
1166 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1167 dest = rib_dest_from_rnode(rn);
1168 /* Skip bad route entries. */
1169 if (dest == NULL)
1170 continue;
1171
1172 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1173 }
1174 }
1175
1176 /* Schedule next step: send RIB routes. */
1177 thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
1178 }
1179
1180 /*
1181 * The next three function will handle RMAC table reset.
1182 */
1183 static void fpm_unset_rmac_table(struct hash_bucket *bucket, void *arg)
1184 {
1185 struct zebra_mac *zrmac = bucket->data;
1186
1187 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1188 }
1189
1190 static void fpm_unset_l3vni_table(struct hash_bucket *bucket, void *arg)
1191 {
1192 struct zebra_l3vni *zl3vni = bucket->data;
1193
1194 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1195 }
1196
1197 static void fpm_rmac_reset(struct thread *t)
1198 {
1199 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1200
1201 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1202
1203 /* Schedule next event: send RMAC entries. */
1204 thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
1205 &fnc->t_rmacwalk);
1206 }
1207
/*
 * Drains the shared context queue into the FPM output buffer.
 *
 * Scheduled on the FPM pthread's master. Stops when the queue is empty
 * or when the output buffer cannot hold another full-sized netlink
 * message, in which case it reschedules itself.
 */
static void fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;
	bool no_bufs = false;
	uint64_t processed_contexts = 0;

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE) {
			no_bufs = true;
			break;
		}

		/* Dequeue next item or quit processing. */
		frr_with_mutex (&fnc->ctxqueue_mutex) {
			ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		}
		if (ctx == NULL)
			break;

		/*
		 * Intentionally ignoring the return value
		 * as that we are ensuring that we can write to
		 * the output data in the STREAM_WRITEABLE
		 * check above, so we can ignore the return
		 */
		if (fnc->socket != -1)
			(void)fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		processed_contexts++;
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Hand the context back to the dataplane as successful. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Update count of processed contexts */
	atomic_fetch_add_explicit(&fnc->counters.dplane_contexts,
				  processed_contexts, memory_order_relaxed);

	/* Re-schedule if we ran out of buffer space */
	if (no_bufs)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/*
	 * Let the dataplane thread know if there are items in the
	 * output queue to be processed. Otherwise they may sit
	 * until the dataplane thread gets scheduled for new,
	 * unrelated work.
	 */
	if (dplane_provider_out_ctx_queue_len(fnc->prov) > 0)
		dplane_provider_work_ready();
}
1265
1266 /**
1267 * Handles external (e.g. CLI, data plane or others) events.
1268 */
1269 static void fpm_process_event(struct thread *t)
1270 {
1271 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1272 int event = THREAD_VAL(t);
1273
1274 switch (event) {
1275 case FNE_DISABLE:
1276 zlog_info("%s: manual FPM disable event", __func__);
1277 fnc->disabled = true;
1278 atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
1279 memory_order_relaxed);
1280
1281 /* Call reconnect to disable timers and clean up context. */
1282 fpm_reconnect(fnc);
1283 break;
1284
1285 case FNE_RECONNECT:
1286 zlog_info("%s: manual FPM reconnect event", __func__);
1287 fnc->disabled = false;
1288 atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
1289 memory_order_relaxed);
1290 fpm_reconnect(fnc);
1291 break;
1292
1293 case FNE_RESET_COUNTERS:
1294 zlog_info("%s: manual FPM counters reset event", __func__);
1295 memset(&fnc->counters, 0, sizeof(fnc->counters));
1296 break;
1297
1298 case FNE_TOGGLE_NHG:
1299 zlog_info("%s: toggle next hop groups support", __func__);
1300 fnc->use_nhg = !fnc->use_nhg;
1301 fpm_reconnect(fnc);
1302 break;
1303
1304 case FNE_INTERNAL_RECONNECT:
1305 fpm_reconnect(fnc);
1306 break;
1307
1308 case FNE_NHG_FINISHED:
1309 if (IS_ZEBRA_DEBUG_FPM)
1310 zlog_debug("%s: next hop groups walk finished",
1311 __func__);
1312 break;
1313 case FNE_RIB_FINISHED:
1314 if (IS_ZEBRA_DEBUG_FPM)
1315 zlog_debug("%s: RIB walk finished", __func__);
1316 break;
1317 case FNE_RMAC_FINISHED:
1318 if (IS_ZEBRA_DEBUG_FPM)
1319 zlog_debug("%s: RMAC walk finished", __func__);
1320 break;
1321 case FNE_LSP_FINISHED:
1322 if (IS_ZEBRA_DEBUG_FPM)
1323 zlog_debug("%s: LSP walk finished", __func__);
1324 break;
1325
1326 default:
1327 if (IS_ZEBRA_DEBUG_FPM)
1328 zlog_debug("%s: unhandled event %d", __func__, event);
1329 break;
1330 }
1331 }
1332
1333 /*
1334 * Data plane functions.
1335 */
1336 static int fpm_nl_start(struct zebra_dplane_provider *prov)
1337 {
1338 struct fpm_nl_ctx *fnc;
1339
1340 fnc = dplane_provider_get_data(prov);
1341 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1342 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1343 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1344 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1345 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1346 fnc->socket = -1;
1347 fnc->disabled = true;
1348 fnc->prov = prov;
1349 TAILQ_INIT(&fnc->ctxqueue);
1350 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
1351
1352 /* Set default values. */
1353 fnc->use_nhg = true;
1354
1355 return 0;
1356 }
1357
/*
 * "Early" shutdown: stop all walk/reset timers, cancel the events owned
 * by the FPM pthread and close the peer socket. The pthread itself is
 * stopped later by fpm_nl_finish_late().
 */
static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_lspreset);
	THREAD_OFF(fnc->t_lspwalk);
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	/* These belong to the FPM pthread's master: cancel asynchronously. */
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}
1380
/*
 * Final shutdown: stop the FPM pthread, then release the resources
 * allocated in fpm_nl_start()/fpm_nl_new(). The thread must be stopped
 * before the mutexes it uses are destroyed.
 */
static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	/* gfnc was obtained with calloc() in fpm_nl_new(), hence free(). */
	free(gfnc);
	gfnc = NULL;

	return 0;
}
1396
1397 static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1398 {
1399 struct fpm_nl_ctx *fnc;
1400
1401 fnc = dplane_provider_get_data(prov);
1402 if (early)
1403 return fpm_nl_finish_early(fnc);
1404
1405 return fpm_nl_finish_late(fnc);
1406 }
1407
/*
 * Dataplane provider callback: pulls contexts from the provider input
 * queue. While connected, contexts are moved onto the FPM context queue
 * and the FPM pthread is scheduled to write them out; otherwise they
 * are immediately returned to the dataplane as successful (the RIB walk
 * resynchronizes the peer after reconnecting).
 */
static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue = 0, stored_peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			/*
			 * Update the number of queued contexts *before*
			 * enqueueing, to ensure counter consistency.
			 */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);

			frr_with_mutex (&fnc->ctxqueue_mutex) {
				dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
			}

			/* Track the peak queue length seen in this pass. */
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			if (peak_queue < cur_queue)
				peak_queue = cur_queue;
			continue;
		}

		/* Not connected: complete the context right away. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	/* Update peak queue length, if we just observed a new peak */
	stored_peak_queue = atomic_load_explicit(
		&fnc->counters.ctxqueue_len_peak, memory_order_relaxed);
	if (stored_peak_queue < peak_queue)
		atomic_store_explicit(&fnc->counters.ctxqueue_len_peak,
				      peak_queue, memory_order_relaxed);

	/* Wake the FPM pthread to drain whatever was queued above. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	/* Ensure dataplane thread is rescheduled if we hit the work limit */
	if (counter >= limit)
		dplane_provider_work_ready();

	return 0;
}
1469
1470 static int fpm_nl_new(struct thread_master *tm)
1471 {
1472 struct zebra_dplane_provider *prov = NULL;
1473 int rv;
1474
1475 gfnc = calloc(1, sizeof(*gfnc));
1476 rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
1477 DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
1478 fpm_nl_process, fpm_nl_finish, gfnc,
1479 &prov);
1480
1481 if (IS_ZEBRA_DEBUG_DPLANE)
1482 zlog_debug("%s register status: %d", prov_name, rv);
1483
1484 install_node(&fpm_node);
1485 install_element(ENABLE_NODE, &fpm_show_counters_cmd);
1486 install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
1487 install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
1488 install_element(CONFIG_NODE, &fpm_set_address_cmd);
1489 install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
1490 install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
1491 install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);
1492
1493 return 0;
1494 }
1495
/*
 * Module entry point: defer the actual setup to fpm_nl_new() via the
 * frr_late_init hook, once FRR's core has finished initializing.
 */
static int fpm_nl_init(void)
{
	hook_register(frr_late_init, fpm_nl_new);
	return 0;
}
1501
/* Register this plugin with the FRR module loader. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
);