]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
zebra: clean up netlink api
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
7309092b
DL
22#ifdef HAVE_CONFIG_H
23#include "config.h" /* Include this explicitly */
24#endif
25
d35f447d
RZ
26#include <arpa/inet.h>
27
28#include <sys/types.h>
29#include <sys/socket.h>
30
31#include <errno.h>
32#include <string.h>
33
d35f447d 34#include "lib/zebra.h"
6cc059cd 35#include "lib/json.h"
d35f447d 36#include "lib/libfrr.h"
c871e6c9 37#include "lib/frratomic.h"
3bdd7fca 38#include "lib/command.h"
d35f447d
RZ
39#include "lib/memory.h"
40#include "lib/network.h"
41#include "lib/ns.h"
42#include "lib/frr_pthread.h"
e5e444d8 43#include "zebra/debug.h"
bda10adf 44#include "zebra/interface.h"
d35f447d 45#include "zebra/zebra_dplane.h"
018e77bc 46#include "zebra/zebra_router.h"
bda10adf 47#include "zebra/zebra_vxlan_private.h"
d35f447d
RZ
48#include "zebra/kernel_netlink.h"
49#include "zebra/rt_netlink.h"
50#include "zebra/debug.h"
51
52#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
53#define SOUTHBOUND_DEFAULT_PORT 2620
54
a179ba35
RZ
55/**
56 * FPM header:
57 * {
58 * version: 1 byte (always 1),
59 * type: 1 byte (1 for netlink, 2 protobuf),
60 * len: 2 bytes (network order),
61 * }
62 *
63 * This header is used with any format to tell the users how many bytes to
64 * expect.
65 */
66#define FPM_HEADER_SIZE 4
67
d35f447d
RZ
68static const char *prov_name = "dplane_fpm_nl";
69
70struct fpm_nl_ctx {
71 /* data plane connection. */
72 int socket;
3bdd7fca 73 bool disabled;
d35f447d 74 bool connecting;
018e77bc 75 bool rib_complete;
bda10adf 76 bool rmac_complete;
b55ab92a 77 bool use_nhg;
d35f447d
RZ
78 struct sockaddr_storage addr;
79
80 /* data plane buffers. */
81 struct stream *ibuf;
82 struct stream *obuf;
83 pthread_mutex_t obuf_mutex;
84
ba803a2f
RZ
85 /*
86 * data plane context queue:
87 * When a FPM server connection becomes a bottleneck, we must keep the
88 * data plane contexts until we get a chance to process them.
89 */
90 struct dplane_ctx_q ctxqueue;
91 pthread_mutex_t ctxqueue_mutex;
92
d35f447d 93 /* data plane events. */
ba803a2f 94 struct zebra_dplane_provider *prov;
d35f447d
RZ
95 struct frr_pthread *fthread;
96 struct thread *t_connect;
97 struct thread *t_read;
98 struct thread *t_write;
3bdd7fca 99 struct thread *t_event;
ba803a2f 100 struct thread *t_dequeue;
018e77bc
RZ
101
102 /* zebra events. */
981ca597
RZ
103 struct thread *t_nhgreset;
104 struct thread *t_nhgwalk;
018e77bc
RZ
105 struct thread *t_ribreset;
106 struct thread *t_ribwalk;
bda10adf
RZ
107 struct thread *t_rmacreset;
108 struct thread *t_rmacwalk;
6cc059cd
RZ
109
110 /* Statistic counters. */
111 struct {
112 /* Amount of bytes read into ibuf. */
770a8d28 113 _Atomic uint32_t bytes_read;
6cc059cd 114 /* Amount of bytes written from obuf. */
770a8d28 115 _Atomic uint32_t bytes_sent;
ad4d1022 116 /* Output buffer current usage. */
770a8d28 117 _Atomic uint32_t obuf_bytes;
ad4d1022 118 /* Output buffer peak usage. */
770a8d28 119 _Atomic uint32_t obuf_peak;
6cc059cd
RZ
120
121 /* Amount of connection closes. */
770a8d28 122 _Atomic uint32_t connection_closes;
6cc059cd 123 /* Amount of connection errors. */
770a8d28 124 _Atomic uint32_t connection_errors;
6cc059cd
RZ
125
126 /* Amount of user configurations: FNE_RECONNECT. */
770a8d28 127 _Atomic uint32_t user_configures;
6cc059cd 128 /* Amount of user disable requests: FNE_DISABLE. */
770a8d28 129 _Atomic uint32_t user_disables;
6cc059cd
RZ
130
131 /* Amount of data plane context processed. */
770a8d28 132 _Atomic uint32_t dplane_contexts;
ba803a2f 133 /* Amount of data plane contexts enqueued. */
770a8d28 134 _Atomic uint32_t ctxqueue_len;
ba803a2f 135 /* Peak amount of data plane contexts enqueued. */
770a8d28 136 _Atomic uint32_t ctxqueue_len_peak;
6cc059cd
RZ
137
138 /* Amount of buffer full events. */
770a8d28 139 _Atomic uint32_t buffer_full;
6cc059cd 140 } counters;
770a8d28 141} *gfnc;
3bdd7fca
RZ
142
/**
 * Event codes handed to `fpm_process_event()` through `thread_add_event()`.
 */
enum fpm_nl_events {
	/* (Re)connect to the configured external FPM server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset the statistic counters. */
	FNE_RESET_COUNTERS,
	/* Toggle the next hop group feature. */
	FNE_TOGGLE_NHG,
};
153
018e77bc
RZ
154/*
155 * Prototypes.
156 */
3bdd7fca 157static int fpm_process_event(struct thread *t);
018e77bc 158static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
981ca597
RZ
159static int fpm_nhg_send(struct thread *t);
160static int fpm_nhg_reset(struct thread *t);
018e77bc
RZ
161static int fpm_rib_send(struct thread *t);
162static int fpm_rib_reset(struct thread *t);
bda10adf
RZ
163static int fpm_rmac_send(struct thread *t);
164static int fpm_rmac_reset(struct thread *t);
018e77bc 165
ad4d1022
RZ
166/*
167 * Helper functions.
168 */
169
170/**
171 * Reorganizes the data on the buffer so it can fit more data.
172 *
173 * @param s stream pointer.
174 */
175static void stream_pulldown(struct stream *s)
176{
177 size_t rlen = STREAM_READABLE(s);
178
179 /* No more data, so just move the pointers. */
180 if (rlen == 0) {
181 stream_reset(s);
182 return;
183 }
184
185 /* Move the available data to the beginning. */
186 memmove(s->data, &s->data[s->getp], rlen);
187 s->getp = 0;
188 s->endp = rlen;
189}
190
3bdd7fca
RZ
191/*
192 * CLI.
193 */
6cc059cd
RZ
194#define FPM_STR "Forwarding Plane Manager configuration\n"
195
3bdd7fca
RZ
196DEFUN(fpm_set_address, fpm_set_address_cmd,
197 "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
6cc059cd 198 FPM_STR
3bdd7fca
RZ
199 "FPM remote listening server address\n"
200 "Remote IPv4 FPM server\n"
201 "Remote IPv6 FPM server\n"
202 "FPM remote listening server port\n"
203 "Remote FPM server port\n")
204{
205 struct sockaddr_in *sin;
206 struct sockaddr_in6 *sin6;
207 uint16_t port = 0;
208 uint8_t naddr[INET6_BUFSIZ];
209
210 if (argc == 5)
211 port = strtol(argv[4]->arg, NULL, 10);
212
213 /* Handle IPv4 addresses. */
214 if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
215 sin = (struct sockaddr_in *)&gfnc->addr;
216
217 memset(sin, 0, sizeof(*sin));
218 sin->sin_family = AF_INET;
219 sin->sin_port =
220 port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
221#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
222 sin->sin_len = sizeof(*sin);
223#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
224 memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));
225
226 goto ask_reconnect;
227 }
228
229 /* Handle IPv6 addresses. */
230 if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
231 vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
232 return CMD_WARNING;
233 }
234
235 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
236 memset(sin6, 0, sizeof(*sin6));
237 sin6->sin6_family = AF_INET6;
238 sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
239#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
240 sin6->sin6_len = sizeof(*sin6);
241#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
242 memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));
243
244ask_reconnect:
245 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
246 FNE_RECONNECT, &gfnc->t_event);
247 return CMD_SUCCESS;
248}
249
250DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
251 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
252 NO_STR
6cc059cd 253 FPM_STR
3bdd7fca
RZ
254 "FPM remote listening server address\n"
255 "Remote IPv4 FPM server\n"
256 "Remote IPv6 FPM server\n"
257 "FPM remote listening server port\n"
258 "Remote FPM server port\n")
259{
260 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
261 FNE_DISABLE, &gfnc->t_event);
262 return CMD_SUCCESS;
263}
264
b55ab92a
RZ
265DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
266 "fpm use-next-hop-groups",
267 FPM_STR
268 "Use netlink next hop groups feature.\n")
269{
270 /* Already enabled. */
271 if (gfnc->use_nhg)
272 return CMD_SUCCESS;
273
274 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
275 FNE_TOGGLE_NHG, &gfnc->t_event);
276
277 return CMD_SUCCESS;
278}
279
280DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
281 "no fpm use-next-hop-groups",
282 NO_STR
283 FPM_STR
284 "Use netlink next hop groups feature.\n")
285{
286 /* Already disabled. */
287 if (!gfnc->use_nhg)
288 return CMD_SUCCESS;
289
290 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
291 FNE_TOGGLE_NHG, &gfnc->t_event);
292
293 return CMD_SUCCESS;
294}
295
6cc059cd
RZ
296DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
297 "clear fpm counters",
298 CLEAR_STR
299 FPM_STR
300 "FPM statistic counters\n")
301{
302 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
303 FNE_RESET_COUNTERS, &gfnc->t_event);
304 return CMD_SUCCESS;
305}
306
307DEFUN(fpm_show_counters, fpm_show_counters_cmd,
308 "show fpm counters",
309 SHOW_STR
310 FPM_STR
311 "FPM statistic counters\n")
312{
313 vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");
314
315#define SHOW_COUNTER(label, counter) \
770a8d28 316 vty_out(vty, "%28s: %u\n", (label), (counter))
6cc059cd
RZ
317
318 SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
319 SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
ad4d1022
RZ
320 SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
321 SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
6cc059cd
RZ
322 SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
323 SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
324 SHOW_COUNTER("Data plane items processed",
325 gfnc->counters.dplane_contexts);
ba803a2f
RZ
326 SHOW_COUNTER("Data plane items enqueued",
327 gfnc->counters.ctxqueue_len);
328 SHOW_COUNTER("Data plane items queue peak",
329 gfnc->counters.ctxqueue_len_peak);
6cc059cd
RZ
330 SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
331 SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
332 SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);
333
334#undef SHOW_COUNTER
335
336 return CMD_SUCCESS;
337}
338
339DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
340 "show fpm counters json",
341 SHOW_STR
342 FPM_STR
343 "FPM statistic counters\n"
344 JSON_STR)
345{
346 struct json_object *jo;
347
348 jo = json_object_new_object();
349 json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
350 json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
ad4d1022
RZ
351 json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
352 json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
a50404aa
RZ
353 json_object_int_add(jo, "connection-closes",
354 gfnc->counters.connection_closes);
355 json_object_int_add(jo, "connection-errors",
356 gfnc->counters.connection_errors);
357 json_object_int_add(jo, "data-plane-contexts",
358 gfnc->counters.dplane_contexts);
ba803a2f
RZ
359 json_object_int_add(jo, "data-plane-contexts-queue",
360 gfnc->counters.ctxqueue_len);
361 json_object_int_add(jo, "data-plane-contexts-queue-peak",
362 gfnc->counters.ctxqueue_len_peak);
6cc059cd 363 json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
a50404aa
RZ
364 json_object_int_add(jo, "user-configures",
365 gfnc->counters.user_configures);
6cc059cd
RZ
366 json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
367 vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
368 json_object_free(jo);
369
370 return CMD_SUCCESS;
371}
372
3bdd7fca
RZ
373static int fpm_write_config(struct vty *vty)
374{
375 struct sockaddr_in *sin;
376 struct sockaddr_in6 *sin6;
377 int written = 0;
378 char addrstr[INET6_ADDRSTRLEN];
379
380 if (gfnc->disabled)
381 return written;
382
383 switch (gfnc->addr.ss_family) {
384 case AF_INET:
385 written = 1;
386 sin = (struct sockaddr_in *)&gfnc->addr;
387 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
388 vty_out(vty, "fpm address %s", addrstr);
389 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
390 vty_out(vty, " port %d", ntohs(sin->sin_port));
391
392 vty_out(vty, "\n");
393 break;
394 case AF_INET6:
395 written = 1;
396 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
397 inet_ntop(AF_INET, &sin6->sin6_addr, addrstr, sizeof(addrstr));
398 vty_out(vty, "fpm address %s", addrstr);
399 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
400 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
401
402 vty_out(vty, "\n");
403 break;
404
405 default:
406 break;
407 }
408
b55ab92a
RZ
409 if (!gfnc->use_nhg) {
410 vty_out(vty, "no fpm use-next-hop-groups\n");
411 written = 1;
412 }
413
3bdd7fca
RZ
414 return written;
415}
416
612c2c15 417static struct cmd_node fpm_node = {
893d8beb
DL
418 .name = "fpm",
419 .node = FPM_NODE,
3bdd7fca 420 .prompt = "",
612c2c15 421 .config_write = fpm_write_config,
3bdd7fca
RZ
422};
423
d35f447d
RZ
424/*
425 * FPM functions.
426 */
427static int fpm_connect(struct thread *t);
428
429static void fpm_reconnect(struct fpm_nl_ctx *fnc)
430{
431 /* Grab the lock to empty the stream and stop the zebra thread. */
432 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
433
3bdd7fca
RZ
434 /* Avoid calling close on `-1`. */
435 if (fnc->socket != -1) {
436 close(fnc->socket);
437 fnc->socket = -1;
438 }
439
d35f447d
RZ
440 stream_reset(fnc->ibuf);
441 stream_reset(fnc->obuf);
442 THREAD_OFF(fnc->t_read);
443 THREAD_OFF(fnc->t_write);
018e77bc 444
981ca597
RZ
445 thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
446 thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
c69e7ab7
RZ
447 thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
448 thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
449 thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
450 thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);
018e77bc 451
3bdd7fca
RZ
452 /* FPM is disabled, don't attempt to connect. */
453 if (fnc->disabled)
454 return;
455
d35f447d
RZ
456 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
457 &fnc->t_connect);
458}
459
460static int fpm_read(struct thread *t)
461{
462 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
463 ssize_t rv;
464
465 /* Let's ignore the input at the moment. */
466 rv = stream_read_try(fnc->ibuf, fnc->socket,
467 STREAM_WRITEABLE(fnc->ibuf));
468 if (rv == 0) {
c871e6c9
RZ
469 atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
470 memory_order_relaxed);
e5e444d8
RZ
471
472 if (IS_ZEBRA_DEBUG_FPM)
473 zlog_debug("%s: connection closed", __func__);
474
d35f447d
RZ
475 fpm_reconnect(fnc);
476 return 0;
477 }
478 if (rv == -1) {
479 if (errno == EAGAIN || errno == EWOULDBLOCK
480 || errno == EINTR)
481 return 0;
482
c871e6c9
RZ
483 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
484 memory_order_relaxed);
e5e444d8
RZ
485 zlog_warn("%s: connection failure: %s", __func__,
486 strerror(errno));
d35f447d
RZ
487 fpm_reconnect(fnc);
488 return 0;
489 }
490 stream_reset(fnc->ibuf);
491
6cc059cd 492 /* Account all bytes read. */
c871e6c9
RZ
493 atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
494 memory_order_relaxed);
6cc059cd 495
d35f447d
RZ
496 thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
497 &fnc->t_read);
498
499 return 0;
500}
501
502static int fpm_write(struct thread *t)
503{
504 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
505 socklen_t statuslen;
506 ssize_t bwritten;
507 int rv, status;
508 size_t btotal;
509
510 if (fnc->connecting == true) {
511 status = 0;
512 statuslen = sizeof(status);
513
514 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
515 &statuslen);
516 if (rv == -1 || status != 0) {
517 if (rv != -1)
e5e444d8
RZ
518 zlog_warn("%s: connection failed: %s", __func__,
519 strerror(status));
d35f447d 520 else
e5e444d8
RZ
521 zlog_warn("%s: SO_ERROR failed: %s", __func__,
522 strerror(status));
d35f447d 523
c871e6c9
RZ
524 atomic_fetch_add_explicit(
525 &fnc->counters.connection_errors, 1,
526 memory_order_relaxed);
6cc059cd 527
d35f447d
RZ
528 fpm_reconnect(fnc);
529 return 0;
530 }
531
532 fnc->connecting = false;
018e77bc 533
b55ab92a
RZ
534 /*
535 * Walk the route tables to send old information before starting
536 * to send updated information.
537 *
538 * NOTE 1:
539 * RIB table walk is called after the next group table walk
540 * ends.
541 *
542 * NOTE 2:
543 * Don't attempt to go through next hop group table if we were
544 * explictly told to not use it.
545 */
546 if (fnc->use_nhg)
547 thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
548 &fnc->t_nhgwalk);
549 else
550 thread_add_timer(zrouter.master, fpm_rib_send, fnc, 0,
551 &fnc->t_ribwalk);
552
bda10adf
RZ
553 thread_add_timer(zrouter.master, fpm_rmac_send, fnc, 0,
554 &fnc->t_rmacwalk);
d35f447d
RZ
555 }
556
557 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
558
559 while (true) {
560 /* Stream is empty: reset pointers and return. */
561 if (STREAM_READABLE(fnc->obuf) == 0) {
562 stream_reset(fnc->obuf);
563 break;
564 }
565
566 /* Try to write all at once. */
567 btotal = stream_get_endp(fnc->obuf) -
568 stream_get_getp(fnc->obuf);
569 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
570 if (bwritten == 0) {
c871e6c9
RZ
571 atomic_fetch_add_explicit(
572 &fnc->counters.connection_closes, 1,
573 memory_order_relaxed);
e5e444d8
RZ
574
575 if (IS_ZEBRA_DEBUG_FPM)
576 zlog_debug("%s: connection closed", __func__);
d35f447d
RZ
577 break;
578 }
579 if (bwritten == -1) {
ad4d1022
RZ
580 /* Attempt to continue if blocked by a signal. */
581 if (errno == EINTR)
582 continue;
583 /* Receiver is probably slow, lets give it some time. */
584 if (errno == EAGAIN || errno == EWOULDBLOCK)
d35f447d
RZ
585 break;
586
c871e6c9
RZ
587 atomic_fetch_add_explicit(
588 &fnc->counters.connection_errors, 1,
589 memory_order_relaxed);
e5e444d8
RZ
590 zlog_warn("%s: connection failure: %s", __func__,
591 strerror(errno));
d35f447d
RZ
592 fpm_reconnect(fnc);
593 break;
594 }
595
6cc059cd 596 /* Account all bytes sent. */
c871e6c9
RZ
597 atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
598 memory_order_relaxed);
6cc059cd 599
ad4d1022 600 /* Account number of bytes free. */
c871e6c9
RZ
601 atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
602 memory_order_relaxed);
ad4d1022 603
d35f447d
RZ
604 stream_forward_getp(fnc->obuf, (size_t)bwritten);
605 }
606
607 /* Stream is not empty yet, we must schedule more writes. */
608 if (STREAM_READABLE(fnc->obuf)) {
ad4d1022 609 stream_pulldown(fnc->obuf);
d35f447d
RZ
610 thread_add_write(fnc->fthread->master, fpm_write, fnc,
611 fnc->socket, &fnc->t_write);
612 return 0;
613 }
614
615 return 0;
616}
617
618static int fpm_connect(struct thread *t)
619{
620 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
3bdd7fca
RZ
621 struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
622 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
623 socklen_t slen;
d35f447d
RZ
624 int rv, sock;
625 char addrstr[INET6_ADDRSTRLEN];
626
3bdd7fca 627 sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
d35f447d 628 if (sock == -1) {
6cc059cd 629 zlog_err("%s: fpm socket failed: %s", __func__,
d35f447d
RZ
630 strerror(errno));
631 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
632 &fnc->t_connect);
633 return 0;
634 }
635
636 set_nonblocking(sock);
637
3bdd7fca
RZ
638 if (fnc->addr.ss_family == AF_INET) {
639 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
640 slen = sizeof(*sin);
641 } else {
642 inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
643 slen = sizeof(*sin6);
644 }
d35f447d 645
e5e444d8
RZ
646 if (IS_ZEBRA_DEBUG_FPM)
647 zlog_debug("%s: attempting to connect to %s:%d", __func__,
648 addrstr, ntohs(sin->sin_port));
d35f447d 649
3bdd7fca 650 rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
d35f447d 651 if (rv == -1 && errno != EINPROGRESS) {
c871e6c9
RZ
652 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
653 memory_order_relaxed);
d35f447d
RZ
654 close(sock);
655 zlog_warn("%s: fpm connection failed: %s", __func__,
656 strerror(errno));
657 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
658 &fnc->t_connect);
659 return 0;
660 }
661
662 fnc->connecting = (errno == EINPROGRESS);
663 fnc->socket = sock;
664 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
665 &fnc->t_read);
666 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
667 &fnc->t_write);
668
018e77bc 669 /* Mark all routes as unsent. */
981ca597
RZ
670 thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
671 &fnc->t_nhgreset);
018e77bc
RZ
672 thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
673 &fnc->t_ribreset);
bda10adf
RZ
674 thread_add_timer(zrouter.master, fpm_rmac_reset, fnc, 0,
675 &fnc->t_rmacreset);
018e77bc 676
d35f447d
RZ
677 return 0;
678}
679
680/**
681 * Encode data plane operation context into netlink and enqueue it in the FPM
682 * output buffer.
683 *
684 * @param fnc the netlink FPM context.
685 * @param ctx the data plane operation context data.
686 * @return 0 on success or -1 on not enough space.
687 */
688static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
689{
690 uint8_t nl_buf[NL_PKT_BUF_SIZE];
691 size_t nl_buf_len;
692 ssize_t rv;
edfeff42 693 uint64_t obytes, obytes_peak;
b55ab92a
RZ
694 enum dplane_op_e op = dplane_ctx_get_op(ctx);
695
696 /*
697 * If we were configured to not use next hop groups, then quit as soon
698 * as possible.
699 */
700 if ((!fnc->use_nhg)
701 && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
702 || op == DPLANE_OP_NH_UPDATE))
703 return 0;
d35f447d
RZ
704
705 nl_buf_len = 0;
706
707 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
708
b55ab92a 709 switch (op) {
d35f447d
RZ
710 case DPLANE_OP_ROUTE_UPDATE:
711 case DPLANE_OP_ROUTE_DELETE:
712 rv = netlink_route_multipath(RTM_DELROUTE, ctx, nl_buf,
b55ab92a
RZ
713 sizeof(nl_buf), true,
714 fnc->use_nhg);
d35f447d 715 if (rv <= 0) {
e5e444d8
RZ
716 zlog_err("%s: netlink_route_multipath failed",
717 __func__);
d35f447d
RZ
718 return 0;
719 }
720
721 nl_buf_len = (size_t)rv;
d35f447d
RZ
722
723 /* UPDATE operations need a INSTALL, otherwise just quit. */
b55ab92a 724 if (op == DPLANE_OP_ROUTE_DELETE)
d35f447d
RZ
725 break;
726
727 /* FALL THROUGH */
728 case DPLANE_OP_ROUTE_INSTALL:
b55ab92a
RZ
729 rv = netlink_route_multipath(
730 RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
731 sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
d35f447d 732 if (rv <= 0) {
e5e444d8
RZ
733 zlog_err("%s: netlink_route_multipath failed",
734 __func__);
d35f447d
RZ
735 return 0;
736 }
737
738 nl_buf_len += (size_t)rv;
d35f447d
RZ
739 break;
740
bda10adf
RZ
741 case DPLANE_OP_MAC_INSTALL:
742 case DPLANE_OP_MAC_DELETE:
743 rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
744 if (rv <= 0) {
e5e444d8
RZ
745 zlog_err("%s: netlink_macfdb_update_ctx failed",
746 __func__);
bda10adf
RZ
747 return 0;
748 }
749
750 nl_buf_len = (size_t)rv;
bda10adf
RZ
751 break;
752
e9a1cd93
RZ
753 case DPLANE_OP_NH_DELETE:
754 rv = netlink_nexthop_encode(RTM_DELNEXTHOP, ctx, nl_buf,
755 sizeof(nl_buf));
756 if (rv <= 0) {
757 zlog_err("%s: netlink_nexthop_encode failed", __func__);
758 return 0;
759 }
760
761 nl_buf_len = (size_t)rv;
762 break;
d35f447d
RZ
763 case DPLANE_OP_NH_INSTALL:
764 case DPLANE_OP_NH_UPDATE:
e9a1cd93
RZ
765 rv = netlink_nexthop_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
766 sizeof(nl_buf));
767 if (rv <= 0) {
768 zlog_err("%s: netlink_nexthop_encode failed", __func__);
769 return 0;
770 }
771
772 nl_buf_len = (size_t)rv;
773 break;
774
d35f447d
RZ
775 case DPLANE_OP_LSP_INSTALL:
776 case DPLANE_OP_LSP_UPDATE:
777 case DPLANE_OP_LSP_DELETE:
778 case DPLANE_OP_PW_INSTALL:
779 case DPLANE_OP_PW_UNINSTALL:
780 case DPLANE_OP_ADDR_INSTALL:
781 case DPLANE_OP_ADDR_UNINSTALL:
d35f447d
RZ
782 case DPLANE_OP_NEIGH_INSTALL:
783 case DPLANE_OP_NEIGH_UPDATE:
784 case DPLANE_OP_NEIGH_DELETE:
785 case DPLANE_OP_VTEP_ADD:
786 case DPLANE_OP_VTEP_DELETE:
787 case DPLANE_OP_SYS_ROUTE_ADD:
788 case DPLANE_OP_SYS_ROUTE_DELETE:
789 case DPLANE_OP_ROUTE_NOTIFY:
790 case DPLANE_OP_LSP_NOTIFY:
791 case DPLANE_OP_NONE:
792 break;
793
794 default:
e5e444d8
RZ
795 if (IS_ZEBRA_DEBUG_FPM)
796 zlog_debug("%s: unhandled data plane message (%d) %s",
797 __func__, dplane_ctx_get_op(ctx),
798 dplane_op2str(dplane_ctx_get_op(ctx)));
d35f447d
RZ
799 break;
800 }
801
802 /* Skip empty enqueues. */
803 if (nl_buf_len == 0)
804 return 0;
805
a179ba35
RZ
806 /* We must know if someday a message goes beyond 65KiB. */
807 assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);
808
809 /* Check if we have enough buffer space. */
810 if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
c871e6c9
RZ
811 atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
812 memory_order_relaxed);
e5e444d8
RZ
813
814 if (IS_ZEBRA_DEBUG_FPM)
815 zlog_debug(
816 "%s: buffer full: wants to write %zu but has %zu",
817 __func__, nl_buf_len + FPM_HEADER_SIZE,
818 STREAM_WRITEABLE(fnc->obuf));
819
a179ba35
RZ
820 return -1;
821 }
822
d35f447d 823 /*
a179ba35
RZ
824 * Fill in the FPM header information.
825 *
826 * See FPM_HEADER_SIZE definition for more information.
d35f447d
RZ
827 */
828 stream_putc(fnc->obuf, 1);
829 stream_putc(fnc->obuf, 1);
a179ba35 830 stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);
d35f447d
RZ
831
832 /* Write current data. */
833 stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);
834
ad4d1022 835 /* Account number of bytes waiting to be written. */
c871e6c9
RZ
836 atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
837 nl_buf_len + FPM_HEADER_SIZE,
838 memory_order_relaxed);
edfeff42
RZ
839 obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
840 memory_order_relaxed);
841 obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
842 memory_order_relaxed);
843 if (obytes_peak < obytes)
c871e6c9
RZ
844 atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
845 memory_order_relaxed);
ad4d1022 846
d35f447d
RZ
847 /* Tell the thread to start writing. */
848 thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
849 &fnc->t_write);
850
851 return 0;
852}
853
981ca597
RZ
854/*
855 * Next hop walk/send functions.
856 */
/* Argument handed to `fpm_nhg_send_cb()` for every next hop group entry. */
struct fpm_nhg_arg {
	/* Scratch data plane context, reset and reused for each entry. */
	struct zebra_dplane_ctx *ctx;
	/* The netlink FPM context receiving the messages. */
	struct fpm_nl_ctx *fnc;
	/* Cleared by the callback when the walk stopped on a full buffer. */
	bool complete;
};
862
863static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
864{
865 struct nhg_hash_entry *nhe = bucket->data;
866 struct fpm_nhg_arg *fna = arg;
867
868 /* This entry was already sent, skip it. */
869 if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
870 return HASHWALK_CONTINUE;
871
872 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
873 dplane_ctx_reset(fna->ctx);
874 dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
875 if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
876 /* Our buffers are full, lets give it some cycles. */
877 fna->complete = false;
878 return HASHWALK_ABORT;
879 }
880
881 /* Mark group as sent, so it doesn't get sent again. */
882 SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
883
884 return HASHWALK_CONTINUE;
885}
886
887static int fpm_nhg_send(struct thread *t)
888{
889 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
890 struct fpm_nhg_arg fna;
891
892 fna.fnc = fnc;
893 fna.ctx = dplane_ctx_alloc();
894 fna.complete = true;
895
896 /* Send next hops. */
897 hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);
898
899 /* `free()` allocated memory. */
900 dplane_ctx_fini(&fna.ctx);
901
902 /* We are done sending next hops, lets install the routes now. */
903 if (fna.complete)
904 thread_add_timer(zrouter.master, fpm_rib_send, fnc, 0,
905 &fnc->t_ribwalk);
906 else /* Otherwise reschedule next hop group again. */
907 thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
908 &fnc->t_nhgwalk);
909
910 return 0;
911}
912
018e77bc
RZ
913/**
914 * Send all RIB installed routes to the connected data plane.
915 */
916static int fpm_rib_send(struct thread *t)
917{
918 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
919 rib_dest_t *dest;
920 struct route_node *rn;
921 struct route_table *rt;
922 struct zebra_dplane_ctx *ctx;
923 rib_tables_iter_t rt_iter;
924
925 /* Allocate temporary context for all transactions. */
926 ctx = dplane_ctx_alloc();
927
928 rt_iter.state = RIB_TABLES_ITER_S_INIT;
929 while ((rt = rib_tables_iter_next(&rt_iter))) {
930 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
931 dest = rib_dest_from_rnode(rn);
932 /* Skip bad route entries. */
a50404aa 933 if (dest == NULL || dest->selected_fib == NULL)
018e77bc 934 continue;
018e77bc
RZ
935
936 /* Check for already sent routes. */
a50404aa 937 if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
018e77bc 938 continue;
018e77bc
RZ
939
940 /* Enqueue route install. */
941 dplane_ctx_reset(ctx);
942 dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
943 dest->selected_fib);
944 if (fpm_nl_enqueue(fnc, ctx) == -1) {
945 /* Free the temporary allocated context. */
946 dplane_ctx_fini(&ctx);
947
018e77bc
RZ
948 thread_add_timer(zrouter.master, fpm_rib_send,
949 fnc, 1, &fnc->t_ribwalk);
950 return 0;
951 }
952
953 /* Mark as sent. */
954 SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
955 }
956 }
957
958 /* Free the temporary allocated context. */
959 dplane_ctx_fini(&ctx);
960
961 /* All RIB routes sent! */
962 fnc->rib_complete = true;
963
964 return 0;
965}
966
bda10adf
RZ
967/*
968 * The next three functions will handle RMAC enqueue.
969 */
970struct fpm_rmac_arg {
971 struct zebra_dplane_ctx *ctx;
972 struct fpm_nl_ctx *fnc;
973 zebra_l3vni_t *zl3vni;
974};
975
9d5c3268 976static void fpm_enqueue_rmac_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
977{
978 struct fpm_rmac_arg *fra = arg;
979 zebra_mac_t *zrmac = backet->data;
980 struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
981 const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
982 struct zebra_if *br_zif;
983 vlanid_t vid;
984 bool sticky;
985
986 /* Entry already sent. */
987 if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT))
988 return;
989
990 sticky = !!CHECK_FLAG(zrmac->flags,
991 (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
992 br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
993 vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;
994
995 dplane_ctx_reset(fra->ctx);
996 dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
997 dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
f2a0ba3a
RZ
998 zif->brslave_info.br_if, vid,
999 &zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky);
bda10adf 1000 if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
bda10adf
RZ
1001 thread_add_timer(zrouter.master, fpm_rmac_send,
1002 fra->fnc, 1, &fra->fnc->t_rmacwalk);
1003 }
1004}
1005
9d5c3268 1006static void fpm_enqueue_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1007{
1008 struct fpm_rmac_arg *fra = arg;
1009 zebra_l3vni_t *zl3vni = backet->data;
1010
1011 fra->zl3vni = zl3vni;
1012 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1013}
1014
1015static int fpm_rmac_send(struct thread *t)
1016{
1017 struct fpm_rmac_arg fra;
1018
1019 fra.fnc = THREAD_ARG(t);
1020 fra.ctx = dplane_ctx_alloc();
1021 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1022 dplane_ctx_fini(&fra.ctx);
1023
1024 return 0;
1025}
1026
981ca597
RZ
1027/*
1028 * Resets the next hop FPM flags so we send all next hops again.
1029 */
1030static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1031{
1032 struct nhg_hash_entry *nhe = bucket->data;
1033
1034 /* Unset FPM installation flag so it gets installed again. */
1035 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1036}
1037
1038static int fpm_nhg_reset(struct thread *t)
1039{
1040 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
1041 return 0;
1042}
1043
018e77bc
RZ
1044/**
1045 * Resets the RIB FPM flags so we send all routes again.
1046 */
1047static int fpm_rib_reset(struct thread *t)
1048{
1049 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1050 rib_dest_t *dest;
1051 struct route_node *rn;
1052 struct route_table *rt;
1053 rib_tables_iter_t rt_iter;
1054
1055 fnc->rib_complete = false;
1056
1057 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1058 while ((rt = rib_tables_iter_next(&rt_iter))) {
1059 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1060 dest = rib_dest_from_rnode(rn);
1061 /* Skip bad route entries. */
1062 if (dest == NULL)
1063 continue;
1064
1065 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1066 }
1067 }
1068
1069 return 0;
1070}
1071
bda10adf
RZ
1072/*
1073 * The next three functions will handle RMAC table reset.
1074 */
9d5c3268 1075static void fpm_unset_rmac_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1076{
1077 zebra_mac_t *zrmac = backet->data;
1078
1079 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1080}
1081
9d5c3268 1082static void fpm_unset_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1083{
1084 zebra_l3vni_t *zl3vni = backet->data;
1085
1086 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1087}
1088
1089static int fpm_rmac_reset(struct thread *t)
1090{
1091 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1092
1093 return 0;
1094}
1095
ba803a2f
RZ
1096static int fpm_process_queue(struct thread *t)
1097{
1098 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1099 struct zebra_dplane_ctx *ctx;
1100
1101 frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
1102
1103 while (true) {
1104 /* No space available yet. */
1105 if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
1106 break;
1107
1108 /* Dequeue next item or quit processing. */
1109 ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
1110 if (ctx == NULL)
1111 break;
1112
1113 fpm_nl_enqueue(fnc, ctx);
1114
1115 /* Account the processed entries. */
c871e6c9
RZ
1116 atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
1117 memory_order_relaxed);
1118 atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
1119 memory_order_relaxed);
ba803a2f
RZ
1120
1121 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
1122 dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
1123 }
1124
1125 /* Check for more items in the queue. */
c871e6c9
RZ
1126 if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
1127 memory_order_relaxed)
1128 > 0)
ba803a2f
RZ
1129 thread_add_timer(fnc->fthread->master, fpm_process_queue,
1130 fnc, 0, &fnc->t_dequeue);
1131
1132 return 0;
1133}
1134
3bdd7fca
RZ
1135/**
1136 * Handles external (e.g. CLI, data plane or others) events.
1137 */
1138static int fpm_process_event(struct thread *t)
1139{
1140 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1141 int event = THREAD_VAL(t);
1142
1143 switch (event) {
1144 case FNE_DISABLE:
e5e444d8 1145 zlog_info("%s: manual FPM disable event", __func__);
3bdd7fca 1146 fnc->disabled = true;
c871e6c9
RZ
1147 atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
1148 memory_order_relaxed);
3bdd7fca
RZ
1149
1150 /* Call reconnect to disable timers and clean up context. */
1151 fpm_reconnect(fnc);
1152 break;
1153
1154 case FNE_RECONNECT:
e5e444d8 1155 zlog_info("%s: manual FPM reconnect event", __func__);
3bdd7fca 1156 fnc->disabled = false;
c871e6c9
RZ
1157 atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
1158 memory_order_relaxed);
3bdd7fca
RZ
1159 fpm_reconnect(fnc);
1160 break;
1161
6cc059cd 1162 case FNE_RESET_COUNTERS:
e5e444d8 1163 zlog_info("%s: manual FPM counters reset event", __func__);
6cc059cd
RZ
1164 memset(&fnc->counters, 0, sizeof(fnc->counters));
1165 break;
1166
b55ab92a
RZ
1167 case FNE_TOGGLE_NHG:
1168 zlog_info("%s: toggle next hop groups support", __func__);
1169 fnc->use_nhg = !fnc->use_nhg;
1170 fpm_reconnect(fnc);
1171 break;
1172
3bdd7fca 1173 default:
e5e444d8
RZ
1174 if (IS_ZEBRA_DEBUG_FPM)
1175 zlog_debug("%s: unhandled event %d", __func__, event);
3bdd7fca
RZ
1176 break;
1177 }
1178
1179 return 0;
1180}
1181
d35f447d
RZ
1182/*
1183 * Data plane functions.
1184 */
1185static int fpm_nl_start(struct zebra_dplane_provider *prov)
1186{
1187 struct fpm_nl_ctx *fnc;
1188
1189 fnc = dplane_provider_get_data(prov);
1190 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1191 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1192 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1193 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1194 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1195 fnc->socket = -1;
3bdd7fca 1196 fnc->disabled = true;
ba803a2f
RZ
1197 fnc->prov = prov;
1198 TAILQ_INIT(&fnc->ctxqueue);
1199 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
d35f447d 1200
b55ab92a
RZ
1201 /* Set default values. */
1202 fnc->use_nhg = true;
1203
d35f447d
RZ
1204 return 0;
1205}
1206
98a87504 1207static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
d35f447d 1208{
98a87504 1209 /* Disable all events and close socket. */
981ca597
RZ
1210 THREAD_OFF(fnc->t_nhgreset);
1211 THREAD_OFF(fnc->t_nhgwalk);
98a87504
RZ
1212 THREAD_OFF(fnc->t_ribreset);
1213 THREAD_OFF(fnc->t_ribwalk);
1214 THREAD_OFF(fnc->t_rmacreset);
1215 THREAD_OFF(fnc->t_rmacwalk);
1216 thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
1217 thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
1218 thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);
d35f447d 1219
98a87504
RZ
1220 if (fnc->socket != -1) {
1221 close(fnc->socket);
1222 fnc->socket = -1;
1223 }
1224
1225 return 0;
1226}
1227
1228static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
1229{
1230 /* Stop the running thread. */
1231 frr_pthread_stop(fnc->fthread, NULL);
1232
1233 /* Free all allocated resources. */
1234 pthread_mutex_destroy(&fnc->obuf_mutex);
1235 pthread_mutex_destroy(&fnc->ctxqueue_mutex);
d35f447d
RZ
1236 stream_free(fnc->ibuf);
1237 stream_free(fnc->obuf);
98a87504
RZ
1238 free(gfnc);
1239 gfnc = NULL;
d35f447d
RZ
1240
1241 return 0;
1242}
1243
98a87504
RZ
/*
 * Data plane provider finish callback: dispatches to the early or the late
 * shutdown stage according to `early` and returns that stage's result.
 */
static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc = dplane_provider_get_data(prov);

	return early ? fpm_nl_finish_early(fnc) : fpm_nl_finish_late(fnc);
}
1254
d35f447d
RZ
1255static int fpm_nl_process(struct zebra_dplane_provider *prov)
1256{
1257 struct zebra_dplane_ctx *ctx;
1258 struct fpm_nl_ctx *fnc;
1259 int counter, limit;
edfeff42 1260 uint64_t cur_queue, peak_queue;
d35f447d
RZ
1261
1262 fnc = dplane_provider_get_data(prov);
1263 limit = dplane_provider_get_work_limit(prov);
1264 for (counter = 0; counter < limit; counter++) {
1265 ctx = dplane_provider_dequeue_in_ctx(prov);
1266 if (ctx == NULL)
1267 break;
1268
1269 /*
1270 * Skip all notifications if not connected, we'll walk the RIB
1271 * anyway.
1272 */
6cc059cd 1273 if (fnc->socket != -1 && fnc->connecting == false) {
ba803a2f
RZ
1274 frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
1275 dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
1276
1277 /* Account the number of contexts. */
c871e6c9
RZ
1278 atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
1279 1, memory_order_relaxed);
1280 cur_queue = atomic_load_explicit(
1281 &fnc->counters.ctxqueue_len,
1282 memory_order_relaxed);
1283 peak_queue = atomic_load_explicit(
1284 &fnc->counters.ctxqueue_len_peak,
1285 memory_order_relaxed);
edfeff42 1286 if (peak_queue < cur_queue)
c871e6c9
RZ
1287 atomic_store_explicit(
1288 &fnc->counters.ctxqueue_len_peak,
1289 peak_queue, memory_order_relaxed);
ba803a2f 1290 continue;
6cc059cd
RZ
1291 }
1292
d35f447d
RZ
1293 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
1294 dplane_provider_enqueue_out_ctx(prov, ctx);
1295 }
1296
c871e6c9
RZ
1297 if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
1298 memory_order_relaxed)
1299 > 0)
ba803a2f
RZ
1300 thread_add_timer(fnc->fthread->master, fpm_process_queue,
1301 fnc, 0, &fnc->t_dequeue);
1302
d35f447d
RZ
1303 return 0;
1304}
1305
1306static int fpm_nl_new(struct thread_master *tm)
1307{
1308 struct zebra_dplane_provider *prov = NULL;
d35f447d
RZ
1309 int rv;
1310
3bdd7fca 1311 gfnc = calloc(1, sizeof(*gfnc));
d35f447d
RZ
1312 rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
1313 DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
3bdd7fca 1314 fpm_nl_process, fpm_nl_finish, gfnc,
d35f447d
RZ
1315 &prov);
1316
1317 if (IS_ZEBRA_DEBUG_DPLANE)
1318 zlog_debug("%s register status: %d", prov_name, rv);
1319
612c2c15 1320 install_node(&fpm_node);
6cc059cd
RZ
1321 install_element(ENABLE_NODE, &fpm_show_counters_cmd);
1322 install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
1323 install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
3bdd7fca
RZ
1324 install_element(CONFIG_NODE, &fpm_set_address_cmd);
1325 install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
b55ab92a
RZ
1326 install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
1327 install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);
3bdd7fca 1328
d35f447d
RZ
1329 return 0;
1330}
1331
1332static int fpm_nl_init(void)
1333{
1334 hook_register(frr_late_init, fpm_nl_new);
1335 return 0;
1336}
1337
1338FRR_MODULE_SETUP(
1339 .name = "dplane_fpm_nl",
1340 .version = "0.0.1",
1341 .description = "Data plane plugin for FPM using netlink.",
1342 .init = fpm_nl_init,
1343 )