]> git.proxmox.com Git - mirror_frr.git/blame - zebra/dplane_fpm_nl.c
zebra,fpm: fix dead lock on close during startup
[mirror_frr.git] / zebra / dplane_fpm_nl.c
CommitLineData
d35f447d
RZ
1/*
2 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
3 *
4 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
5 * Rafael Zalamena
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along
18 * with this program; see the file COPYING; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
7309092b
DL
22#ifdef HAVE_CONFIG_H
23#include "config.h" /* Include this explicitly */
24#endif
25
d35f447d
RZ
26#include <arpa/inet.h>
27
28#include <sys/types.h>
29#include <sys/socket.h>
30
31#include <errno.h>
32#include <string.h>
33
d35f447d 34#include "lib/zebra.h"
6cc059cd 35#include "lib/json.h"
d35f447d 36#include "lib/libfrr.h"
c871e6c9 37#include "lib/frratomic.h"
3bdd7fca 38#include "lib/command.h"
d35f447d
RZ
39#include "lib/memory.h"
40#include "lib/network.h"
41#include "lib/ns.h"
42#include "lib/frr_pthread.h"
e5e444d8 43#include "zebra/debug.h"
bda10adf 44#include "zebra/interface.h"
d35f447d 45#include "zebra/zebra_dplane.h"
018e77bc 46#include "zebra/zebra_router.h"
bda10adf 47#include "zebra/zebra_vxlan_private.h"
d35f447d
RZ
48#include "zebra/kernel_netlink.h"
49#include "zebra/rt_netlink.h"
50#include "zebra/debug.h"
51
52#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
53#define SOUTHBOUND_DEFAULT_PORT 2620
54
a179ba35
RZ
55/**
56 * FPM header:
57 * {
58 * version: 1 byte (always 1),
59 * type: 1 byte (1 for netlink, 2 protobuf),
60 * len: 2 bytes (network order),
61 * }
62 *
63 * This header is used with any format to tell the users how many bytes to
64 * expect.
65 */
66#define FPM_HEADER_SIZE 4
67
d35f447d
RZ
68static const char *prov_name = "dplane_fpm_nl";
69
/*
 * Per-connection FPM context. A single global instance (`gfnc`) is used;
 * it is shared between the zebra main thread, the data plane thread and
 * the dedicated FPM pthread (`fthread`).
 */
struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;
	/* User disabled the FPM (`no fpm address`): skip reconnect attempts. */
	bool disabled;
	/* Non-blocking connect() still in progress; resolved in fpm_write. */
	bool connecting;
	/* Full RIB walk completed (set by fpm_rib_send). */
	bool rib_complete;
	/* Full RMAC table walk completed. */
	bool rmac_complete;
	/* Encode next hop group operations (toggled via FNE_TOGGLE_NHG). */
	bool use_nhg;
	/* Remote FPM server address (IPv4 or IPv6). */
	struct sockaddr_storage addr;

	/* data plane buffers. */
	struct stream *ibuf;
	struct stream *obuf;
	/* Protects obuf: written by enqueue, drained by the FPM pthread. */
	pthread_mutex_t obuf_mutex;

	/*
	 * data plane context queue:
	 * When a FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_q ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events (scheduled on fthread->master). */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_dequeue;

	/* zebra events (scheduled on zrouter.master). */
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters (atomics: read from the CLI thread). */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane context processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;
3bdd7fca
RZ
142
/* Events handled by fpm_process_event on the FPM pthread. */
enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,
};
155
a2032324
RZ
/*
 * Schedule an internal reconnect on the FPM pthread instead of tearing the
 * connection down inline, so I/O handlers never race their own teardown.
 */
#define FPM_RECONNECT(fnc) \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc), \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)
159
018e77bc
RZ
160/*
161 * Prototypes.
162 */
3bdd7fca 163static int fpm_process_event(struct thread *t);
018e77bc 164static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
981ca597
RZ
165static int fpm_nhg_send(struct thread *t);
166static int fpm_nhg_reset(struct thread *t);
018e77bc
RZ
167static int fpm_rib_send(struct thread *t);
168static int fpm_rib_reset(struct thread *t);
bda10adf
RZ
169static int fpm_rmac_send(struct thread *t);
170static int fpm_rmac_reset(struct thread *t);
018e77bc 171
ad4d1022
RZ
172/*
173 * Helper functions.
174 */
175
176/**
177 * Reorganizes the data on the buffer so it can fit more data.
178 *
179 * @param s stream pointer.
180 */
181static void stream_pulldown(struct stream *s)
182{
183 size_t rlen = STREAM_READABLE(s);
184
185 /* No more data, so just move the pointers. */
186 if (rlen == 0) {
187 stream_reset(s);
188 return;
189 }
190
191 /* Move the available data to the beginning. */
192 memmove(s->data, &s->data[s->getp], rlen);
193 s->getp = 0;
194 s->endp = rlen;
195}
196
3bdd7fca
RZ
197/*
198 * CLI.
199 */
6cc059cd
RZ
200#define FPM_STR "Forwarding Plane Manager configuration\n"
201
3bdd7fca
RZ
/*
 * Configure the remote FPM server address (and optional port), then ask the
 * FPM pthread to reconnect to the new destination.
 */
DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	uint16_t port = 0;
	uint8_t naddr[INET6_BUFSIZ];

	/* The port token is only present when all five arguments were given;
	 * range validation is done by the CLI grammar `(1-65535)`. */
	if (argc == 5)
		port = strtol(argv[4]->arg, NULL, 10);

	/* Handle IPv4 addresses. */
	if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
		sin = (struct sockaddr_in *)&gfnc->addr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = AF_INET;
		sin->sin_port =
			port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
		sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
		memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

		goto ask_reconnect;
	}

	/* Handle IPv6 addresses. */
	if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
		vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
		return CMD_WARNING;
	}

	sin6 = (struct sockaddr_in6 *)&gfnc->addr;
	memset(sin6, 0, sizeof(*sin6));
	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
	sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
	memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
	/* Hand off to the FPM pthread: the reconnect must not race I/O. */
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RECONNECT, &gfnc->t_event);
	return CMD_SUCCESS;
}
255
/*
 * Remove the FPM server configuration: schedules FNE_DISABLE, which closes
 * the connection and prevents further reconnect attempts.
 */
DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port <1-65535>]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_DISABLE, &gfnc->t_event);
	return CMD_SUCCESS;
}
270
b55ab92a
RZ
/*
 * Enable next hop group encoding. The actual toggle runs on the FPM
 * pthread (FNE_TOGGLE_NHG) to keep `use_nhg` changes serialized.
 */
DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
      "fpm use-next-hop-groups",
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already enabled. */
	if (gfnc->use_nhg)
		return CMD_SUCCESS;

	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_event);

	return CMD_SUCCESS;
}
285
/*
 * Disable next hop group encoding (see fpm_use_nhg for the toggle path).
 */
DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
      "no fpm use-next-hop-groups",
      NO_STR
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already disabled. */
	if (!gfnc->use_nhg)
		return CMD_SUCCESS;

	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_event);

	return CMD_SUCCESS;
}
301
6cc059cd
RZ
/*
 * Zero all statistic counters. Runs on the FPM pthread so resets don't
 * interleave with counter updates from the I/O handlers.
 */
DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}
312
/*
 * Display the statistic counters in plain text form.
 */
DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
	vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

	/* Right-aligned label followed by the raw counter value. */
#define SHOW_COUNTER(label, counter) \
	vty_out(vty, "%28s: %u\n", (label), (counter))

	SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
	SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
	SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
	SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
	SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
	SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
	SHOW_COUNTER("Data plane items processed",
		     gfnc->counters.dplane_contexts);
	SHOW_COUNTER("Data plane items enqueued",
		     gfnc->counters.ctxqueue_len);
	SHOW_COUNTER("Data plane items queue peak",
		     gfnc->counters.ctxqueue_len_peak);
	SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
	SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
	SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

	return CMD_SUCCESS;
}
344
/*
 * Display the statistic counters in JSON form (machine-readable variant
 * of `show fpm counters`).
 */
DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
	/* Drops our reference; frees the object and all added members. */
	json_object_free(jo);

	return CMD_SUCCESS;
}
378
3bdd7fca
RZ
379static int fpm_write_config(struct vty *vty)
380{
381 struct sockaddr_in *sin;
382 struct sockaddr_in6 *sin6;
383 int written = 0;
384 char addrstr[INET6_ADDRSTRLEN];
385
386 if (gfnc->disabled)
387 return written;
388
389 switch (gfnc->addr.ss_family) {
390 case AF_INET:
391 written = 1;
392 sin = (struct sockaddr_in *)&gfnc->addr;
393 inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
394 vty_out(vty, "fpm address %s", addrstr);
395 if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
396 vty_out(vty, " port %d", ntohs(sin->sin_port));
397
398 vty_out(vty, "\n");
399 break;
400 case AF_INET6:
401 written = 1;
402 sin6 = (struct sockaddr_in6 *)&gfnc->addr;
403 inet_ntop(AF_INET, &sin6->sin6_addr, addrstr, sizeof(addrstr));
404 vty_out(vty, "fpm address %s", addrstr);
405 if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
406 vty_out(vty, " port %d", ntohs(sin6->sin6_port));
407
408 vty_out(vty, "\n");
409 break;
410
411 default:
412 break;
413 }
414
b55ab92a
RZ
415 if (!gfnc->use_nhg) {
416 vty_out(vty, "no fpm use-next-hop-groups\n");
417 written = 1;
418 }
419
3bdd7fca
RZ
420 return written;
421}
422
/* CLI node hosting the `fpm ...` configuration commands. */
static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	/* Emits the saved configuration; see fpm_write_config. */
	.config_write = fpm_write_config,
};
429
d35f447d
RZ
430/*
431 * FPM functions.
432 */
433static int fpm_connect(struct thread *t);
434
/*
 * Tear down the current FPM connection and (unless disabled) schedule a
 * new connection attempt in 3 seconds.
 *
 * Runs on the FPM pthread; zebra-side walk threads are cancelled
 * asynchronously since they live on zrouter.master.
 */
static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	/* Drop any buffered data; it belongs to the dead connection. */
	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}
469
/*
 * Socket read handler: drains (and currently discards) data sent by the
 * FPM server, detecting connection close and errors.
 *
 * @param t thread whose argument is the FPM context.
 */
static int fpm_read(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	ssize_t rv;

	/* Let's ignore the input at the moment. */
	rv = stream_read_try(fnc->ibuf, fnc->socket,
			     STREAM_WRITEABLE(fnc->ibuf));
	if (rv == 0) {
		/* EOF: peer closed the connection. */
		atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: connection closed", __func__);

		FPM_RECONNECT(fnc);
		return 0;
	}
	if (rv == -1) {
		/* Transient conditions: just wait for the next read event. */
		if (errno == EAGAIN || errno == EWOULDBLOCK
		    || errno == EINTR)
			return 0;

		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		zlog_warn("%s: connection failure: %s", __func__,
			  strerror(errno));
		FPM_RECONNECT(fnc);
		return 0;
	}
	/* Input is ignored, so discard whatever was read. */
	stream_reset(fnc->ibuf);

	/* Account all bytes read. */
	atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
				  memory_order_relaxed);

	/* Re-arm ourselves for the next chunk. */
	thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
			&fnc->t_read);

	return 0;
}
511
/*
 * Socket write handler. Two responsibilities:
 *
 * 1. If a non-blocking connect() is pending, check SO_ERROR to learn
 *    whether it succeeded; on success kick off the initial table walks.
 * 2. Drain as much of the output buffer as the socket accepts,
 *    rescheduling itself while data remains.
 *
 * @param t thread whose argument is the FPM context.
 */
static int fpm_write(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	socklen_t statuslen;
	ssize_t bwritten;
	int rv, status;
	size_t btotal;

	if (fnc->connecting == true) {
		status = 0;
		statuslen = sizeof(status);

		/* Writability after EINPROGRESS means connect() finished;
		 * SO_ERROR tells us the outcome. */
		rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
				&statuslen);
		if (rv == -1 || status != 0) {
			if (rv != -1)
				zlog_warn("%s: connection failed: %s", __func__,
					  strerror(status));
			else
				zlog_warn("%s: SO_ERROR failed: %s", __func__,
					  strerror(status));

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);

			FPM_RECONNECT(fnc);
			return 0;
		}

		fnc->connecting = false;

		/*
		 * Walk the route tables to send old information before starting
		 * to send updated information.
		 *
		 * NOTE 1:
		 * RIB table walk is called after the next group table walk
		 * ends.
		 *
		 * NOTE 2:
		 * Don't attempt to go through next hop group table if we were
		 * explictly told to not use it.
		 */
		if (fnc->use_nhg)
			thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
					 &fnc->t_nhgwalk);
		else
			thread_add_timer(zrouter.master, fpm_rib_send, fnc, 0,
					 &fnc->t_ribwalk);

		thread_add_timer(zrouter.master, fpm_rmac_send, fnc, 0,
				 &fnc->t_rmacwalk);
	}

	/* Serialize against enqueuers for the rest of this function. */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	while (true) {
		/* Stream is empty: reset pointers and return. */
		if (STREAM_READABLE(fnc->obuf) == 0) {
			stream_reset(fnc->obuf);
			break;
		}

		/* Try to write all at once. */
		btotal = stream_get_endp(fnc->obuf) -
			stream_get_getp(fnc->obuf);
		bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
		if (bwritten == 0) {
			atomic_fetch_add_explicit(
				&fnc->counters.connection_closes, 1,
				memory_order_relaxed);

			if (IS_ZEBRA_DEBUG_FPM)
				zlog_debug("%s: connection closed", __func__);
			break;
		}
		if (bwritten == -1) {
			/* Attempt to continue if blocked by a signal. */
			if (errno == EINTR)
				continue;
			/* Receiver is probably slow, lets give it some time. */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);
			zlog_warn("%s: connection failure: %s", __func__,
				  strerror(errno));

			FPM_RECONNECT(fnc);
			return 0;
		}

		/* Account all bytes sent. */
		atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
					  memory_order_relaxed);

		/* Account number of bytes free. */
		atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
					  memory_order_relaxed);

		stream_forward_getp(fnc->obuf, (size_t)bwritten);
	}

	/* Stream is not empty yet, we must schedule more writes. */
	if (STREAM_READABLE(fnc->obuf)) {
		stream_pulldown(fnc->obuf);
		thread_add_write(fnc->fthread->master, fpm_write, fnc,
				 fnc->socket, &fnc->t_write);
		return 0;
	}

	return 0;
}
628
/*
 * Start a (non-blocking) connection attempt to the configured FPM server.
 * On any failure a retry is scheduled in 3 seconds; on success the read
 * and write handlers are armed and the zebra-side reset walks are started
 * so every route/next-hop/RMAC entry is re-sent over the new connection.
 *
 * @param t thread whose argument is the FPM context.
 */
static int fpm_connect(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
	socklen_t slen;
	int rv, sock;
	char addrstr[INET6_ADDRSTRLEN];

	sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
	if (sock == -1) {
		zlog_err("%s: fpm socket failed: %s", __func__,
			 strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return 0;
	}

	/* Non-blocking so connect() cannot stall the FPM pthread. */
	set_nonblocking(sock);

	if (fnc->addr.ss_family == AF_INET) {
		inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin);
	} else {
		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin6);
	}

	/* NOTE(review): sin->sin_port is used for the debug output even for
	 * IPv6 — relies on sin_port/sin6_port sharing the same offset. */
	if (IS_ZEBRA_DEBUG_FPM)
		zlog_debug("%s: attempting to connect to %s:%d", __func__,
			   addrstr, ntohs(sin->sin_port));

	rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
	if (rv == -1 && errno != EINPROGRESS) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		close(sock);
		zlog_warn("%s: fpm connection failed: %s", __func__,
			  strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return 0;
	}

	/* EINPROGRESS: completion is detected in fpm_write via SO_ERROR. */
	fnc->connecting = (errno == EINPROGRESS);
	fnc->socket = sock;
	thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
			&fnc->t_read);
	thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
			 &fnc->t_write);

	/* Mark all routes as unsent. */
	thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
			 &fnc->t_nhgreset);
	thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
			 &fnc->t_ribreset);
	thread_add_timer(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);

	return 0;
}
690
/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * Caller may run on any thread: the output buffer is protected by
 * obuf_mutex for the whole encode+enqueue sequence.
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	/* Held until return: encode and write must be atomic vs. fpm_write. */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		/* An UPDATE is encoded as DELETE followed by INSTALL. */
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need a INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		/* Appended after the DELETE when falling through. */
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	/* Operations the FPM protocol does not carry: silently ignored. */
	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_NONE:
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled data plane message (%d) %s",
				   __func__, dplane_ctx_get_op(ctx),
				   dplane_op2str(dplane_ctx_get_op(ctx)));
		break;
	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/* We must know if someday a message goes beyond 65KiB. */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		/* Callers reschedule their walk when they see -1. */
		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}
868
981ca597
RZ
/*
 * Next hop walk/send functions.
 */
struct fpm_nhg_arg {
	/* Reusable context so we don't allocate per entry. */
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	/* Cleared when the output buffer fills and the walk aborts. */
	bool complete;
};

/*
 * Per-bucket callback for the next hop group walk: encodes and enqueues
 * one unsent group, aborting the walk when the output buffer is full.
 */
static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;
	struct fpm_nhg_arg *fna = arg;

	/* This entry was already sent, skip it. */
	if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
		return HASHWALK_CONTINUE;

	/* Reset ctx to reuse allocated memory, take a snapshot and send it. */
	dplane_ctx_reset(fna->ctx);
	dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
	if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
		/* Our buffers are full, lets give it some cycles. */
		fna->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark group as sent, so it doesn't get sent again. */
	SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);

	return HASHWALK_CONTINUE;
}
901
/*
 * Walk all next hop groups and enqueue the ones not yet sent. When the
 * walk completes the RIB walk is chained; otherwise this walk reschedules
 * itself to resume where it left off (sent entries carry a flag).
 */
static int fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, lets install the routes now. */
	if (fna.complete)
		thread_add_timer(zrouter.master, fpm_rib_send, fnc, 0,
				 &fnc->t_ribwalk);
	else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);

	return 0;
}
927
018e77bc
RZ
/**
 * Send all RIB installed routes to the connected data plane.
 *
 * Iterates every route table; entries already flagged RIB_DEST_UPDATE_FPM
 * are skipped, so a rescheduled walk resumes incrementally. If the output
 * buffer fills, the walk retries in one second.
 */
static int fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				/* Buffer full: retry shortly. */
				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return 0;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	fnc->rib_complete = true;

	return 0;
}
981
bda10adf
RZ
/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
	/* Reusable context for all RMAC encodes. */
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	/* L3VNI currently being walked (set by fpm_enqueue_l3vni_table). */
	zebra_l3vni_t *zl3vni;
};

/*
 * Per-bucket callback: encode one router-MAC entry of the current L3VNI
 * as a MAC install and enqueue it; on a full buffer, schedule a retry of
 * the whole RMAC walk.
 */
static void fpm_enqueue_rmac_table(struct hash_bucket *backet, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	zebra_mac_t *zrmac = backet->data;
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/* Entry already sent. */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT))
		return;

	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	/* VLAN id only matters on VLAN-aware bridges. */
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid,
			&zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky);
	/* NOTE(review): ZEBRA_MAC_FPM_SENT is not set here after a
	 * successful enqueue — confirm the flag is set elsewhere, otherwise
	 * entries may be re-sent on every walk. */
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
	}
}
1020
9d5c3268 1021static void fpm_enqueue_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1022{
1023 struct fpm_rmac_arg *fra = arg;
1024 zebra_l3vni_t *zl3vni = backet->data;
1025
1026 fra->zl3vni = zl3vni;
1027 hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, zl3vni);
1028}
1029
1030static int fpm_rmac_send(struct thread *t)
1031{
1032 struct fpm_rmac_arg fra;
1033
1034 fra.fnc = THREAD_ARG(t);
1035 fra.ctx = dplane_ctx_alloc();
1036 hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
1037 dplane_ctx_fini(&fra.ctx);
1038
1039 return 0;
1040}
1041
981ca597
RZ
1042/*
1043 * Resets the next hop FPM flags so we send all next hops again.
1044 */
1045static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
1046{
1047 struct nhg_hash_entry *nhe = bucket->data;
1048
1049 /* Unset FPM installation flag so it gets installed again. */
1050 UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
1051}
1052
1053static int fpm_nhg_reset(struct thread *t)
1054{
1055 hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
1056 return 0;
1057}
1058
018e77bc
RZ
1059/**
1060 * Resets the RIB FPM flags so we send all routes again.
1061 */
1062static int fpm_rib_reset(struct thread *t)
1063{
1064 struct fpm_nl_ctx *fnc = THREAD_ARG(t);
1065 rib_dest_t *dest;
1066 struct route_node *rn;
1067 struct route_table *rt;
1068 rib_tables_iter_t rt_iter;
1069
1070 fnc->rib_complete = false;
1071
1072 rt_iter.state = RIB_TABLES_ITER_S_INIT;
1073 while ((rt = rib_tables_iter_next(&rt_iter))) {
1074 for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
1075 dest = rib_dest_from_rnode(rn);
1076 /* Skip bad route entries. */
1077 if (dest == NULL)
1078 continue;
1079
1080 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1081 }
1082 }
1083
1084 return 0;
1085}
1086
bda10adf
RZ
1087/*
1088 * The next three function will handle RMAC table reset.
1089 */
9d5c3268 1090static void fpm_unset_rmac_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1091{
1092 zebra_mac_t *zrmac = backet->data;
1093
1094 UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
1095}
1096
9d5c3268 1097static void fpm_unset_l3vni_table(struct hash_bucket *backet, void *arg)
bda10adf
RZ
1098{
1099 zebra_l3vni_t *zl3vni = backet->data;
1100
1101 hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
1102}
1103
1104static int fpm_rmac_reset(struct thread *t)
1105{
1106 hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);
1107
1108 return 0;
1109}
1110
ba803a2f
RZ
/*
 * Drain the dataplane context queue into the FPM output buffer.
 *
 * Runs on the FPM pthread. The context queue mutex is held for the
 * whole function body via frr_mutex_lock_autounlock() and released
 * automatically on return — do not add early returns that assume the
 * lock is dropped sooner.
 */
static int fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;

	frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
			break;

		/* Dequeue next item or quit processing. */
		ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		if (ctx == NULL)
			break;

		/* NOTE(review): the enqueue return value is ignored here;
		 * presumably failures are recovered by the reconnect/walk
		 * path — confirm. */
		fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
					  memory_order_relaxed);
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		/* Hand the context back to the dataplane as completed. */
		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Check for more items in the queue. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	return 0;
}
1149
3bdd7fca
RZ
/**
 * Handles external (e.g. CLI, data plane or others) events.
 *
 * Scheduled with THREAD_VAL carrying one of the FNE_* event codes;
 * most events funnel into fpm_reconnect() to tear down and restart
 * the FPM session with the new settings.
 */
static int fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	int event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		/* NOTE(review): plain memset over counters that other
		 * threads update with atomic ops — presumably tolerated as
		 * a best-effort reset; confirm torn updates are harmless. */
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		/* Reconnect so the peer renegotiates with the new setting. */
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled event %d", __func__, event);
		break;
	}

	return 0;
}
1200
d35f447d
RZ
1201/*
1202 * Data plane functions.
1203 */
1204static int fpm_nl_start(struct zebra_dplane_provider *prov)
1205{
1206 struct fpm_nl_ctx *fnc;
1207
1208 fnc = dplane_provider_get_data(prov);
1209 fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
1210 assert(frr_pthread_run(fnc->fthread, NULL) == 0);
1211 fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
1212 fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
1213 pthread_mutex_init(&fnc->obuf_mutex, NULL);
1214 fnc->socket = -1;
3bdd7fca 1215 fnc->disabled = true;
ba803a2f
RZ
1216 fnc->prov = prov;
1217 TAILQ_INIT(&fnc->ctxqueue);
1218 pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);
d35f447d 1219
b55ab92a
RZ
1220 /* Set default values. */
1221 fnc->use_nhg = true;
1222
d35f447d
RZ
1223 return 0;
1224}
1225
/*
 * Early finish: runs while both the zebra main thread and the FPM
 * pthread are still alive. Stops all zebra-side timers, cancels the
 * FPM-pthread events and closes the socket.
 */
static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	/* These events live on the FPM pthread's master, so they must be
	 * cancelled asynchronously from this thread — this ordering (cancel
	 * before close) avoids the pthread touching a dead socket. */
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}
1246
1247static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
1248{
1249 /* Stop the running thread. */
1250 frr_pthread_stop(fnc->fthread, NULL);
1251
1252 /* Free all allocated resources. */
1253 pthread_mutex_destroy(&fnc->obuf_mutex);
1254 pthread_mutex_destroy(&fnc->ctxqueue_mutex);
d35f447d
RZ
1255 stream_free(fnc->ibuf);
1256 stream_free(fnc->obuf);
98a87504
RZ
1257 free(gfnc);
1258 gfnc = NULL;
d35f447d
RZ
1259
1260 return 0;
1261}
1262
/*
 * Provider finish callback: dispatch to the early (still running) or
 * late (final teardown) cleanup path.
 */
static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc = dplane_provider_get_data(prov);

	return early ? fpm_nl_finish_early(fnc) : fpm_nl_finish_late(fnc);
}
1273
d35f447d
RZ
1274static int fpm_nl_process(struct zebra_dplane_provider *prov)
1275{
1276 struct zebra_dplane_ctx *ctx;
1277 struct fpm_nl_ctx *fnc;
1278 int counter, limit;
edfeff42 1279 uint64_t cur_queue, peak_queue;
d35f447d
RZ
1280
1281 fnc = dplane_provider_get_data(prov);
1282 limit = dplane_provider_get_work_limit(prov);
1283 for (counter = 0; counter < limit; counter++) {
1284 ctx = dplane_provider_dequeue_in_ctx(prov);
1285 if (ctx == NULL)
1286 break;
1287
1288 /*
1289 * Skip all notifications if not connected, we'll walk the RIB
1290 * anyway.
1291 */
6cc059cd 1292 if (fnc->socket != -1 && fnc->connecting == false) {
ba803a2f
RZ
1293 frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
1294 dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);
1295
1296 /* Account the number of contexts. */
c871e6c9
RZ
1297 atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
1298 1, memory_order_relaxed);
1299 cur_queue = atomic_load_explicit(
1300 &fnc->counters.ctxqueue_len,
1301 memory_order_relaxed);
1302 peak_queue = atomic_load_explicit(
1303 &fnc->counters.ctxqueue_len_peak,
1304 memory_order_relaxed);
edfeff42 1305 if (peak_queue < cur_queue)
c871e6c9
RZ
1306 atomic_store_explicit(
1307 &fnc->counters.ctxqueue_len_peak,
1308 peak_queue, memory_order_relaxed);
ba803a2f 1309 continue;
6cc059cd
RZ
1310 }
1311
d35f447d
RZ
1312 dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
1313 dplane_provider_enqueue_out_ctx(prov, ctx);
1314 }
1315
c871e6c9
RZ
1316 if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
1317 memory_order_relaxed)
1318 > 0)
ba803a2f
RZ
1319 thread_add_timer(fnc->fthread->master, fpm_process_queue,
1320 fnc, 0, &fnc->t_dequeue);
1321
d35f447d
RZ
1322 return 0;
1323}
1324
1325static int fpm_nl_new(struct thread_master *tm)
1326{
1327 struct zebra_dplane_provider *prov = NULL;
d35f447d
RZ
1328 int rv;
1329
3bdd7fca 1330 gfnc = calloc(1, sizeof(*gfnc));
d35f447d
RZ
1331 rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
1332 DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
3bdd7fca 1333 fpm_nl_process, fpm_nl_finish, gfnc,
d35f447d
RZ
1334 &prov);
1335
1336 if (IS_ZEBRA_DEBUG_DPLANE)
1337 zlog_debug("%s register status: %d", prov_name, rv);
1338
612c2c15 1339 install_node(&fpm_node);
6cc059cd
RZ
1340 install_element(ENABLE_NODE, &fpm_show_counters_cmd);
1341 install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
1342 install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
3bdd7fca
RZ
1343 install_element(CONFIG_NODE, &fpm_set_address_cmd);
1344 install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
b55ab92a
RZ
1345 install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
1346 install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);
3bdd7fca 1347
d35f447d
RZ
1348 return 0;
1349}
1350
1351static int fpm_nl_init(void)
1352{
1353 hook_register(frr_late_init, fpm_nl_new);
1354 return 0;
1355}
1356
/* Module registration: fpm_nl_init() runs when the plugin is loaded. */
FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
	)