]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
Merge pull request #7614 from donaldsharp/more_use_after_free
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
7309092b
DL
22#ifdef HAVE_CONFIG_H
23#include "config.h" /* Include this explicitly */
24#endif
25
d35f447d
RZ
26#include <arpa/inet.h>
27
28#include <sys/types.h>
29#include <sys/socket.h>
30
31#include <errno.h>
32#include <string.h>
33
d35f447d 34#include "lib/zebra.h"
6cc059cd 35#include "lib/json.h"
d35f447d 36#include "lib/libfrr.h"
c871e6c9 37#include "lib/frratomic.h"
3bdd7fca 38#include "lib/command.h"
d35f447d
RZ
39#include "lib/memory.h"
40#include "lib/network.h"
41#include "lib/ns.h"
42#include "lib/frr_pthread.h"
e5e444d8 43#include "zebra/debug.h"
bda10adf 44#include "zebra/interface.h"
d35f447d 45#include "zebra/zebra_dplane.h"
018e77bc 46#include "zebra/zebra_router.h"
b2998086
PR
47#include "zebra/zebra_evpn.h"
48#include "zebra/zebra_evpn_mac.h"
bda10adf 49#include "zebra/zebra_vxlan_private.h"
d35f447d
RZ
50#include "zebra/kernel_netlink.h"
51#include "zebra/rt_netlink.h"
52#include "zebra/debug.h"
53
54#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
55#define SOUTHBOUND_DEFAULT_PORT 2620
56
a179ba35
RZ
57/**
58 * FPM header:
59 * {
60 * version: 1 byte (always 1),
61 * type: 1 byte (1 for netlink, 2 protobuf),
62 * len: 2 bytes (network order),
63 * }
64 *
65 * This header is used with any format to tell the users how many bytes to
66 * expect.
67 */
68#define FPM_HEADER_SIZE 4
69
/* Data plane provider name used at registration time. */
static const char *prov_name = "dplane_fpm_nl";

/*
 * FPM plugin context: connection state, I/O buffers, event threads and
 * statistics. A single instance is allocated for the plugin (see `gfnc`).
 */
struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;
	/* Set when the user disabled the FPM (suppresses reconnects). */
	bool disabled;
	/* Set while a non-blocking connect() is still in progress. */
	bool connecting;
	/* Full-sync walk completion flags (see FNE_*_FINISHED events). */
	bool nhg_complete;
	bool rib_complete;
	bool rmac_complete;
	/* Encode routes with netlink next hop group messages (CLI toggle). */
	bool use_nhg;
	/* Remote FPM server address (IPv4 or IPv6). */
	struct sockaddr_storage addr;

	/* data plane buffers. */
	struct stream *ibuf;
	struct stream *obuf;
	/* Serializes obuf access between zebra and the FPM pthread. */
	pthread_mutex_t obuf_mutex;

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_q ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events. */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_dequeue;

	/* zebra events. */
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters. */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc; /* Global FPM context instance. */
3bdd7fca
RZ
145
/* Events dispatched to fpm_process_event() in the FPM pthread. */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};
165
a2032324
RZ
/*
 * Schedules an internal reconnection request in the FPM pthread. Internal
 * code paths use this (instead of FNE_RECONNECT) so they don't race with
 * user-requested reconnections.
 */
#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

/* Notifies the FPM event handler that a zebra-side walk has finished. */
#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),     \
			 (ev), NULL)

/*
 * Prototypes.
 */
static int fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static int fpm_nhg_send(struct thread *t);
static int fpm_nhg_reset(struct thread *t);
static int fpm_rib_send(struct thread *t);
static int fpm_rib_reset(struct thread *t);
static int fpm_rmac_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);
018e77bc 185
ad4d1022
RZ
186/*
187 * Helper functions.
188 */
189
190/**
191 * Reorganizes the data on the buffer so it can fit more data.
192 *
193 * @param s stream pointer.
194 */
195static void stream_pulldown(struct stream *s)
196{
197 size_t rlen = STREAM_READABLE(s);
198
199 /* No more data, so just move the pointers. */
200 if (rlen == 0) {
201 stream_reset(s);
202 return;
203 }
204
205 /* Move the available data to the beginning. */
206 memmove(s->data, &s->data[s->getp], rlen);
207 s->getp = 0;
208 s->endp = rlen;
209}
210
3bdd7fca
RZ
211/*
212 * CLI.
213 */
6cc059cd
RZ
214#define FPM_STR "Forwarding Plane Manager configuration\n"
215
3bdd7fca
RZ
/*
 * Configures the FPM server address (IPv4 or IPv6) and optional TCP port,
 * then asks the FPM pthread to (re)connect to the new destination.
 */
DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	uint16_t port = 0;
	uint8_t naddr[INET6_BUFSIZ];

	/* argc == 5 means the optional "port (1-65535)" tokens are present. */
	if (argc == 5)
		port = strtol(argv[4]->arg, NULL, 10);

	/* Handle IPv4 addresses. */
	if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
		sin = (struct sockaddr_in *)&gfnc->addr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = AF_INET;
		sin->sin_port =
			port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
		sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
		memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

		goto ask_reconnect;
	}

	/* Handle IPv6 addresses. */
	if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
		vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
		return CMD_WARNING;
	}

	sin6 = (struct sockaddr_in6 *)&gfnc->addr;
	memset(sin6, 0, sizeof(*sin6));
	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
	sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
	memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
	/* Tell the FPM pthread to pick up the new address. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RECONNECT, &gfnc->t_event);
	return CMD_SUCCESS;
}
269
270DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
271 "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
272 NO_STR
6cc059cd 273 FPM_STR
3bdd7fca
RZ
274 "FPM remote listening server address\n"
275 "Remote IPv4 FPM server\n"
276 "Remote IPv6 FPM server\n"
277 "FPM remote listening server port\n"
278 "Remote FPM server port\n")
279{
280 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
281 FNE_DISABLE, &gfnc->t_event);
282 return CMD_SUCCESS;
283}
284
b55ab92a
RZ
285DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
286 "fpm use-next-hop-groups",
287 FPM_STR
288 "Use netlink next hop groups feature.\n")
289{
290 /* Already enabled. */
291 if (gfnc->use_nhg)
292 return CMD_SUCCESS;
293
294 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
295 FNE_TOGGLE_NHG, &gfnc->t_event);
296
297 return CMD_SUCCESS;
298}
299
300DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
301 "no fpm use-next-hop-groups",
302 NO_STR
303 FPM_STR
304 "Use netlink next hop groups feature.\n")
305{
306 /* Already disabled. */
307 if (!gfnc->use_nhg)
308 return CMD_SUCCESS;
309
310 thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
311 FNE_TOGGLE_NHG, &gfnc->t_event);
312
313 return CMD_SUCCESS;
314}
315
6cc059cd
RZ
/*
 * Zeroes the FPM statistic counters. The actual reset runs in the FPM
 * pthread (FNE_RESET_COUNTERS) to avoid racing with the I/O handlers.
 */
DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}
326
/*
 * Displays the FPM statistic counters in plain text. Counters are _Atomic
 * and read directly here (relaxed consistency is acceptable for display).
 */
DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
	vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

/* Local helper to keep the label/value lines aligned. */
#define SHOW_COUNTER(label, counter) \
	vty_out(vty, "%28s: %u\n", (label), (counter))

	SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
	SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
	SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
	SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
	SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
	SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
	SHOW_COUNTER("Data plane items processed",
		     gfnc->counters.dplane_contexts);
	SHOW_COUNTER("Data plane items enqueued",
		     gfnc->counters.ctxqueue_len);
	SHOW_COUNTER("Data plane items queue peak",
		     gfnc->counters.ctxqueue_len_peak);
	SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
	SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
	SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

	return CMD_SUCCESS;
}
358
/*
 * Displays the FPM statistic counters as a JSON object. Key names mirror
 * the text output of `show fpm counters`.
 */
DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
	/* Releases the whole object tree. */
	json_object_free(jo);

	return CMD_SUCCESS;
}
392
3bdd7fca
RZ
393static int fpm_write_config(struct vty *vty)
394{
395 struct sockaddr_in *sin;
396 struct sockaddr_in6 *sin6;
397 int written = 0;
398 char addrstr[INET6_ADDRSTRLEN];
399
400 if (gfnc->disabled)
401 return written;
402
403 switch (gfnc->addr.ss_family) {
404 case AF_INET:
405 written = 1;
406 sin = (struct sockaddr_in *)&gfnc->addr;
407 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
408 vty_out(vty, "fpm address %s", addrstr);
409 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
410 vty_out(vty, " port %d", ntohs(sin->sin_port));
411
412 vty_out(vty, "\n");
413 break;
414 case AF_INET6:
415 written = 1;
416 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
417 inet_ntop(AF_INET, &sin6->sin6_addr, addrstr, sizeof(addrstr));
418 vty_out(vty, "fpm address %s", addrstr);
419 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
420 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
421
422 vty_out(vty, "\n");
423 break;
424
425 default:
426 break;
427 }
428
b55ab92a
RZ
429 if (!gfnc->use_nhg) {
430 vty_out(vty, "no fpm use-next-hop-groups\n");
431 written = 1;
432 }
433
3bdd7fca
RZ
434 return written;
435}
436
612c2c15 437static struct cmd_node fpm_node = {
893d8beb
DL
438 .name = "fpm",
439 .node = FPM_NODE,
3bdd7fca 440 .prompt = "",
612c2c15 441 .config_write = fpm_write_config,
3bdd7fca
RZ
442};
443
d35f447d
RZ
/*
 * FPM functions.
 */
static int fpm_connect(struct thread *t);

/*
 * Tears the current FPM session down and, unless the user disabled the
 * FPM, schedules a reconnection attempt in three seconds.
 *
 * Runs in the FPM pthread: zebra-side walk/reset threads are cancelled
 * asynchronously before the socket and buffers are cleaned up.
 *
 * @param fnc the netlink FPM context.
 */
static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	/* Retry the connection after a short back-off. */
	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}
483
/*
 * Socket read handler: drains whatever the FPM server sent. Input data is
 * currently discarded after being counted; on EOF or error the session is
 * torn down and a reconnect is scheduled.
 *
 * @param t thread carrying the struct fpm_nl_ctx argument.
 * @return always 0 (required by the thread API).
 */
static int fpm_read(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	ssize_t rv;

	/* Let's ignore the input at the moment. */
	rv = stream_read_try(fnc->ibuf, fnc->socket,
			     STREAM_WRITEABLE(fnc->ibuf));
	/* We've got an interruption. */
	if (rv == -2) {
		/* Schedule next read. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
		return 0;
	}
	/* rv == 0: orderly shutdown by the peer. */
	if (rv == 0) {
		atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: connection closed", __func__);

		FPM_RECONNECT(fnc);
		return 0;
	}
	/* rv == -1: hard read error. */
	if (rv == -1) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		zlog_warn("%s: connection failure: %s", __func__,
			  strerror(errno));
		FPM_RECONNECT(fnc);
		return 0;
	}
	/* Data is not consumed: discard it and recycle the buffer. */
	stream_reset(fnc->ibuf);

	/* Account all bytes read. */
	atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
				  memory_order_relaxed);

	thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
			&fnc->t_read);

	return 0;
}
528
529static int fpm_write(struct thread *t)
530{
531 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
532 socklen_t statuslen;
533 ssize_t bwritten;
534 int rv, status;
535 size_t btotal;
536
537 if (fnc->connecting == true) {
538 status = 0;
539 statuslen = sizeof(status);
540
541 rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
542 &statuslen);
543 if (rv == -1 || status != 0) {
544 if (rv != -1)
e5e444d8
RZ
545 zlog_warn("%s: connection failed: %s", __func__,
546 strerror(status));
d35f447d 547 else
e5e444d8
RZ
548 zlog_warn("%s: SO_ERROR failed: %s", __func__,
549 strerror(status));
d35f447d 550
c871e6c9
RZ
551 atomic_fetch_add_explicit(
552 &fnc->counters.connection_errors, 1,
553 memory_order_relaxed);
6cc059cd 554
a2032324 555 FPM_RECONNECT(fnc);
d35f447d
RZ
556 return 0;
557 }
558
559 fnc->connecting = false;
018e77bc 560
e1afb97f
RZ
561 /* Permit receiving messages now. */
562 thread_add_read(fnc->fthread->master, fpm_read, fnc,
563 fnc->socket, &fnc->t_read);
d35f447d
RZ
564 }
565
566 frr_mutex_lock_autounlock(&fnc->obuf_mutex);
567
568 while (true) {
569 /* Stream is empty: reset pointers and return. */
570 if (STREAM_READABLE(fnc->obuf) == 0) {
571 stream_reset(fnc->obuf);
572 break;
573 }
574
575 /* Try to write all at once. */
576 btotal = stream_get_endp(fnc->obuf) -
577 stream_get_getp(fnc->obuf);
578 bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
579 if (bwritten == 0) {
c871e6c9
RZ
580 atomic_fetch_add_explicit(
581 &fnc->counters.connection_closes, 1,
582 memory_order_relaxed);
e5e444d8
RZ
583
584 if (IS_ZEBRA_DEBUG_FPM)
585 zlog_debug("%s: connection closed", __func__);
d35f447d
RZ
586 break;
587 }
588 if (bwritten == -1) {
ad4d1022
RZ
589 /* Attempt to continue if blocked by a signal. */
590 if (errno == EINTR)
591 continue;
592 /* Receiver is probably slow, lets give it some time. */
593 if (errno == EAGAIN || errno == EWOULDBLOCK)
d35f447d
RZ
594 break;
595
c871e6c9
RZ
596 atomic_fetch_add_explicit(
597 &fnc->counters.connection_errors, 1,
598 memory_order_relaxed);
e5e444d8
RZ
599 zlog_warn("%s: connection failure: %s", __func__,
600 strerror(errno));
a2032324
RZ
601
602 FPM_RECONNECT(fnc);
603 return 0;
d35f447d
RZ
604 }
605
6cc059cd 606 /* Account all bytes sent. */
c871e6c9
RZ
607 atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
608 memory_order_relaxed);
6cc059cd 609
ad4d1022 610 /* Account number of bytes free. */
c871e6c9
RZ
611 atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
612 memory_order_relaxed);
ad4d1022 613
d35f447d
RZ
614 stream_forward_getp(fnc->obuf, (size_t)bwritten);
615 }
616
617 /* Stream is not empty yet, we must schedule more writes. */
618 if (STREAM_READABLE(fnc->obuf)) {
ad4d1022 619 stream_pulldown(fnc->obuf);
d35f447d
RZ
620 thread_add_write(fnc->fthread->master, fpm_write, fnc,
621 fnc->socket, &fnc->t_write);
622 return 0;
623 }
624
625 return 0;
626}
627
628static int fpm_connect(struct thread *t)
629{
630 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
3bdd7fca
RZ
631 struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
632 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
633 socklen_t slen;
d35f447d
RZ
634 int rv, sock;
635 char addrstr[INET6_ADDRSTRLEN];
636
3bdd7fca 637 sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
d35f447d 638 if (sock == -1) {
6cc059cd 639 zlog_err("%s: fpm socket failed: %s", __func__,
d35f447d
RZ
640 strerror(errno));
641 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
642 &fnc->t_connect);
643 return 0;
644 }
645
646 set_nonblocking(sock);
647
3bdd7fca
RZ
648 if (fnc->addr.ss_family == AF_INET) {
649 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
650 slen = sizeof(*sin);
651 } else {
652 inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
653 slen = sizeof(*sin6);
654 }
d35f447d 655
e5e444d8
RZ
656 if (IS_ZEBRA_DEBUG_FPM)
657 zlog_debug("%s: attempting to connect to %s:%d", __func__,
658 addrstr, ntohs(sin->sin_port));
d35f447d 659
3bdd7fca 660 rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
d35f447d 661 if (rv == -1 && errno != EINPROGRESS) {
c871e6c9
RZ
662 atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
663 memory_order_relaxed);
d35f447d
RZ
664 close(sock);
665 zlog_warn("%s: fpm connection failed: %s", __func__,
666 strerror(errno));
667 thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
668 &fnc->t_connect);
669 return 0;
670 }
671
672 fnc->connecting = (errno == EINPROGRESS);
673 fnc->socket = sock;
e1afb97f
RZ
674 if (!fnc->connecting)
675 thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
676 &fnc->t_read);
d35f447d
RZ
677 thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
678 &fnc->t_write);
679
018e77bc 680 /* Mark all routes as unsent. */
e41e0f81
RZ
681 if (fnc->use_nhg)
682 thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
683 &fnc->t_nhgreset);
684 else
685 thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
686 &fnc->t_ribreset);
018e77bc 687
d35f447d
RZ
688 return 0;
689}
690
/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * Runs under fnc->obuf_mutex for the whole encode+enqueue (the lock is an
 * RAII-style autounlock, so the early `return 0`s below release it).
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		/* An UPDATE is encoded as DELROUTE followed by NEWROUTE. */
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			/* Encode failures are dropped, not retried. */
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		/* Append after any DELROUTE already placed in nl_buf. */
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	/* Operations the FPM protocol does not carry: silently skipped. */
	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_NONE:
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled data plane message (%d) %s",
				   __func__, dplane_ctx_get_op(ctx),
				   dplane_op2str(dplane_ctx_get_op(ctx)));
		break;
	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}
868
981ca597
RZ
/*
 * Next hop walk/send functions.
 */
struct fpm_nhg_arg {
	/* Reusable context, reset before each entry is encoded. */
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	/* False when the walk aborted because the output buffer filled up. */
	bool complete;
};
877
878static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
879{
880 struct nhg_hash_entry *nhe = bucket->data;
881 struct fpm_nhg_arg *fna = arg;
882
883 /* This entry was already sent, skip it. */
884 if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
885 return HASHWALK_CONTINUE;
886
887 /* Reset ctx to reuse allocated memory, take a snapshot and send it. */
888 dplane_ctx_reset(fna->ctx);
889 dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
890 if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
891 /* Our buffers are full, lets give it some cycles. */
892 fna->complete = false;
893 return HASHWALK_ABORT;
894 }
895
896 /* Mark group as sent, so it doesn't get sent again. */
897 SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
898
899 return HASHWALK_CONTINUE;
900}
901
/*
 * Walks all next hop groups and enqueues them to the FPM; when the walk
 * completes it signals FNE_NHG_FINISHED and schedules the RIB reset,
 * otherwise it reschedules itself to finish the remainder.
 *
 * @param t thread carrying the struct fpm_nl_ctx argument.
 * @return always 0 (required by the thread API).
 */
static int fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, lets install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
				 &fnc->t_ribreset);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);

	return 0;
}
928
018e77bc
RZ
/**
 * Send all RIB installed routes to the connected data plane.
 *
 * Iterates every route table; routes already flagged RIB_DEST_UPDATE_FPM
 * are skipped, so a rescheduled walk resumes where it left off. When the
 * FPM buffer fills up the walk retries in one second.
 *
 * @param t thread carrying the struct fpm_nl_ctx argument.
 * @return always 0 (required by the thread API).
 */
static int fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				/*
				 * NOTE(review): `rn` still holds the lock
				 * route_top()/srcdest_route_next() took —
				 * confirm the early return doesn't need a
				 * route_unlock_node().
				 */
				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return 0;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	/* Schedule next event: RMAC reset. */
	thread_add_event(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);

	return 0;
}
986
bda10adf
RZ
/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
	/* Reusable context, reset before each entry is encoded. */
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	/* L3 VNI currently being walked (set by fpm_enqueue_l3vni_table). */
	zebra_l3vni_t *zl3vni;
	/* False once an enqueue hit a full output buffer. */
	bool complete;
};
996
/*
 * Hash iterate callback: encodes one router-MAC entry as a MAC install
 * operation and enqueues it to the FPM; on a full buffer it flags the walk
 * incomplete and schedules a retry in one second.
 *
 * NOTE(review): assumes fra->zl3vni->vxlan_if and zif->brslave_info.br_if
 * are non-NULL — confirm callers guarantee this for every walked VNI.
 */
static void fpm_enqueue_rmac_table(struct hash_bucket *backet, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	zebra_mac_t *zrmac = backet->data;
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/* Entry already sent. */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
		return;

	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	/* VLAN id only matters on VLAN-aware bridges. */
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid,
			&zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky,
			0 /*nhg*/, 0 /*update_flags*/);
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
		fra->complete = false;
	}
	/*
	 * NOTE(review): ZEBRA_MAC_FPM_SENT is checked above but never set
	 * here — verify it is set elsewhere, otherwise entries are re-sent
	 * on every walk.
	 */
}
1028
9d5c3268 1029static void fpm_enqueue_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1030{
1031 struct fpm_rmac_arg *fra = arg;
1032 zebra_l3vni_t *zl3vni = backet->data;
1033
1034 fra->zl3vni = zl3vni;
1035 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1036}
1037
1038static int fpm_rmac_send(struct thread *t)
1039{
1040 struct fpm_rmac_arg fra;
1041
1042 fra.fnc = THREAD_ARG(t);
1043 fra.ctx = dplane_ctx_alloc();
55eb9d4d 1044 fra.complete = true;
bda10adf
RZ
1045 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1046 dplane_ctx_fini(&fra.ctx);
1047
55eb9d4d
RZ
1048 /* RMAC walk completed. */
1049 if (fra.complete)
1050 WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);
1051
bda10adf
RZ
1052 return 0;
1053}
1054
981ca597
RZ
1055/*
1056 * Resets the next hop FPM flags so we send all next hops again.
1057 */
1058static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1059{
1060 struct nhg_hash_entry *nhe = bucket->data;
1061
1062 /* Unset FPM installation flag so it gets installed again. */
1063 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1064}
1065
1066static int fpm_nhg_reset(struct thread *t)
1067{
55eb9d4d
RZ
1068 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1069
1070 fnc->nhg_complete = false;
981ca597 1071 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
e41e0f81
RZ
1072
1073 /* Schedule next step: send next hop groups. */
1074 thread_add_event(zrouter.master, fpm_nhg_send, fnc, 0, &fnc->t_nhgwalk);
1075
981ca597
RZ
1076 return 0;
1077}
1078
018e77bc
RZ
1079/**
1080 * Resets the RIB FPM flags so we send all routes again.
1081 */
1082static int fpm_rib_reset(struct thread *t)
1083{
1084 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1085 rib_dest_t *dest;
1086 struct route_node *rn;
1087 struct route_table *rt;
1088 rib_tables_iter_t rt_iter;
1089
1090 fnc->rib_complete = false;
1091
1092 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1093 while ((rt = rib_tables_iter_next(&rt_iter))) {
1094 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1095 dest = rib_dest_from_rnode(rn);
1096 /* Skip bad route entries. */
1097 if (dest == NULL)
1098 continue;
1099
1100 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1101 }
1102 }
1103
e41e0f81
RZ
1104 /* Schedule next step: send RIB routes. */
1105 thread_add_event(zrouter.master, fpm_rib_send, fnc, 0, &fnc->t_ribwalk);
1106
018e77bc
RZ
1107 return 0;
1108}
1109
bda10adf
RZ
1110/*
1111 * The next three function will handle RMAC table reset.
1112 */
9d5c3268 1113static void fpm_unset_rmac_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1114{
1115 zebra_mac_t *zrmac = backet->data;
1116
1117 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1118}
1119
9d5c3268 1120static void fpm_unset_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1121{
1122 zebra_l3vni_t *zl3vni = backet->data;
1123
1124 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1125}
1126
1127static int fpm_rmac_reset(struct thread *t)
1128{
55eb9d4d
RZ
1129 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1130
1131 fnc->rmac_complete = false;
bda10adf
RZ
1132 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1133
e41e0f81
RZ
1134 /* Schedule next event: send RMAC entries. */
1135 thread_add_event(zrouter.master, fpm_rmac_send, fnc, 0,
1136 &fnc->t_rmacwalk);
1137
bda10adf
RZ
1138 return 0;
1139}
1140
ba803a2f
RZ
/*
 * Runs in the FPM pthread: drain the context queue into the output
 * buffer while there is space, then reschedule itself if entries
 * remain.
 */
static int fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;

	/* Hold the queue lock for the whole drain; auto-released on
	 * function return. */
	frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
			break;

		/* Dequeue next item or quit processing. */
		ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		if (ctx == NULL)
			break;

		/* NOTE(review): fpm_nl_enqueue() return value is ignored;
		 * presumably the STREAM_WRITEABLE check above guarantees
		 * room — confirm. */
		fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
					  memory_order_relaxed);
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Hand the context back to the dataplane as completed. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Check for more items in the queue. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	return 0;
}
1179
3bdd7fca
RZ
/**
 * Handles external (e.g. CLI, data plane or others) events.
 */
static int fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	int event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		/* Operator disabled the FPM: tear the connection down. */
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		/* Operator requested a fresh connection attempt. */
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		/* Zero out all statistics counters at once. */
		zlog_info("%s: manual FPM counters reset event", __func__);
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		/* Flip next hop group support; reconnect so the peer's
		 * view matches the new setting. */
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		/* Internally detected failure: just reconnect quietly. */
		fpm_reconnect(fnc);
		break;

	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);

		fnc->nhg_complete = true;
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);

		fnc->rib_complete = true;
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);

		fnc->rmac_complete = true;
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled event %d", __func__, event);
		break;
	}

	return 0;
}
1250
d35f447d
RZ
1251/*
1252 * Data plane functions.
1253 */
1254static int fpm_nl_start(struct zebra_dplane_provider *prov)
1255{
1256 struct fpm_nl_ctx *fnc;
1257
1258 fnc = dplane_provider_get_data(prov);
1259 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1260 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1261 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1262 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1263 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1264 fnc->socket = -1;
3bdd7fca 1265 fnc->disabled = true;
ba803a2f
RZ
1266 fnc->prov = prov;
1267 TAILQ_INIT(&fnc->ctxqueue);
1268 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
d35f447d 1269
b55ab92a
RZ
1270 /* Set default values. */
1271 fnc->use_nhg = true;
1272
d35f447d
RZ
1273 return 0;
1274}
1275
/*
 * Early finish pass: runs in zebra's main pthread. Disables all
 * zebra-side timers/events, asks the FPM pthread to cancel its own
 * events and closes the connection socket.
 */
static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	/* These events live on the FPM pthread's master, so they must
	 * be cancelled asynchronously from this thread. */
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}
1296
/*
 * Late finish pass: stop the FPM pthread and release every remaining
 * resource. Must run after fpm_nl_finish_early() disabled all events.
 */
static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	/* fnc == gfnc here; free the global and clear the pointer so no
	 * dangling reference remains. */
	free(gfnc);
	gfnc = NULL;

	return 0;
}
1312
98a87504
RZ
1313static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
1314{
1315 struct fpm_nl_ctx *fnc;
1316
1317 fnc = dplane_provider_get_data(prov);
1318 if (early)
1319 return fpm_nl_finish_early(fnc);
1320
1321 return fpm_nl_finish_late(fnc);
1322}
1323
d35f447d
RZ
1324static int fpm_nl_process(struct zebra_dplane_provider *prov)
1325{
1326 struct zebra_dplane_ctx *ctx;
1327 struct fpm_nl_ctx *fnc;
1328 int counter, limit;
edfeff42 1329 uint64_t cur_queue, peak_queue;
d35f447d
RZ
1330
1331 fnc = dplane_provider_get_data(prov);
1332 limit = dplane_provider_get_work_limit(prov);
1333 for (counter = 0; counter < limit; counter++) {
1334 ctx = dplane_provider_dequeue_in_ctx(prov);
1335 if (ctx == NULL)
1336 break;
1337
1338 /*
1339 * Skip all notifications if not connected, we'll walk the RIB
1340 * anyway.
1341 */
6cc059cd 1342 if (fnc->socket != -1 && fnc->connecting == false) {
ba803a2f
RZ
1343 frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
1344 dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
1345
1346 /* Account the number of contexts. */
c871e6c9
RZ
1347 atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
1348 1, memory_order_relaxed);
1349 cur_queue = atomic_load_explicit(
1350 &fnc->counters.ctxqueue_len,
1351 memory_order_relaxed);
1352 peak_queue = atomic_load_explicit(
1353 &fnc->counters.ctxqueue_len_peak,
1354 memory_order_relaxed);
edfeff42 1355 if (peak_queue < cur_queue)
c871e6c9
RZ
1356 atomic_store_explicit(
1357 &fnc->counters.ctxqueue_len_peak,
1358 peak_queue, memory_order_relaxed);
ba803a2f 1359 continue;
6cc059cd
RZ
1360 }
1361
d35f447d
RZ
1362 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
1363 dplane_provider_enqueue_out_ctx(prov, ctx);
1364 }
1365
c871e6c9
RZ
1366 if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
1367 memory_order_relaxed)
1368 > 0)
ba803a2f
RZ
1369 thread_add_timer(fnc->fthread->master, fpm_process_queue,
1370 fnc, 0, &fnc->t_dequeue);
1371
d35f447d
RZ
1372 return 0;
1373}
1374
/*
 * Module entry point (runs at frr_late_init): allocate the global FPM
 * context, register the dataplane provider and install the CLI nodes
 * and commands.
 */
static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	/* NOTE(review): calloc() result is not checked; presumably OOM
	 * at startup is treated as fatal shortly after — confirm. */
	gfnc = calloc(1, sizeof(*gfnc));
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	/* Install the FPM CLI node and its commands. */
	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}
1400
/* Plugin init: defer the real setup until FRR's late-init hook fires. */
static int fpm_nl_init(void)
{
	hook_register(frr_late_init, fpm_nl_new);
	return 0;
}
1406
/* FRR loadable module descriptor. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
	)