/*
 * Zebra dataplane plugin for Forwarding Plane Manager (FPM) using netlink.
 *
 * Copyright (C) 2019 Network Device Education Foundation, Inc. ("NetDEF")
 *                    Rafael Zalamena
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#ifdef HAVE_CONFIG_H
#include "config.h" /* Include this explicitly */
#endif

#include <arpa/inet.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <errno.h>
#include <string.h>

#include "lib/zebra.h"
#include "lib/json.h"
#include "lib/libfrr.h"
#include "lib/frratomic.h"
#include "lib/command.h"
#include "lib/memory.h"
#include "lib/network.h"
#include "lib/ns.h"
#include "lib/frr_pthread.h"
#include "zebra/debug.h"
#include "zebra/interface.h"
#include "zebra/zebra_dplane.h"
#include "zebra/zebra_router.h"
#include "zebra/zebra_vxlan_private.h"
#include "zebra/kernel_netlink.h"
#include "zebra/rt_netlink.h"
#include "zebra/debug.h"

#define SOUTHBOUND_DEFAULT_ADDR INADDR_LOOPBACK
#define SOUTHBOUND_DEFAULT_PORT 2620

/**
 * FPM header:
 * {
 * version: 1 byte (always 1),
 * type: 1 byte (1 for netlink, 2 for protobuf),
 * len: 2 bytes (network order),
 * }
 *
 * This header precedes every message, regardless of the payload format, and
 * tells the receiver how many bytes to expect.
 */
#define FPM_HEADER_SIZE 4

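/*
 * Example (illustrative): a 36 byte netlink payload is preceded by the four
 * header bytes { 0x01, 0x01, 0x00, 0x28 }: version 1, type 1 (netlink) and a
 * total length of 40 (payload plus FPM_HEADER_SIZE) in network byte order,
 * matching what fpm_nl_enqueue() writes into the output buffer below.
 */
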
static const char *prov_name = "dplane_fpm_nl";

struct fpm_nl_ctx {
	/* data plane connection. */
	int socket;
	bool disabled;
	bool connecting;
	bool nhg_complete;
	bool rib_complete;
	bool rmac_complete;
	bool use_nhg;
	struct sockaddr_storage addr;

	/* data plane buffers. */
	struct stream *ibuf;
	struct stream *obuf;
	pthread_mutex_t obuf_mutex;

	/*
	 * data plane context queue:
	 * When an FPM server connection becomes a bottleneck, we must keep the
	 * data plane contexts until we get a chance to process them.
	 */
	struct dplane_ctx_q ctxqueue;
	pthread_mutex_t ctxqueue_mutex;

	/* data plane events. */
	struct zebra_dplane_provider *prov;
	struct frr_pthread *fthread;
	struct thread *t_connect;
	struct thread *t_read;
	struct thread *t_write;
	struct thread *t_event;
	struct thread *t_dequeue;

	/* zebra events. */
	struct thread *t_nhgreset;
	struct thread *t_nhgwalk;
	struct thread *t_ribreset;
	struct thread *t_ribwalk;
	struct thread *t_rmacreset;
	struct thread *t_rmacwalk;

	/* Statistic counters. */
	struct {
		/* Amount of bytes read into ibuf. */
		_Atomic uint32_t bytes_read;
		/* Amount of bytes written from obuf. */
		_Atomic uint32_t bytes_sent;
		/* Output buffer current usage. */
		_Atomic uint32_t obuf_bytes;
		/* Output buffer peak usage. */
		_Atomic uint32_t obuf_peak;

		/* Amount of connection closes. */
		_Atomic uint32_t connection_closes;
		/* Amount of connection errors. */
		_Atomic uint32_t connection_errors;

		/* Amount of user configurations: FNE_RECONNECT. */
		_Atomic uint32_t user_configures;
		/* Amount of user disable requests: FNE_DISABLE. */
		_Atomic uint32_t user_disables;

		/* Amount of data plane contexts processed. */
		_Atomic uint32_t dplane_contexts;
		/* Amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len;
		/* Peak amount of data plane contexts enqueued. */
		_Atomic uint32_t ctxqueue_len_peak;

		/* Amount of buffer full events. */
		_Atomic uint32_t buffer_full;
	} counters;
} *gfnc;

enum fpm_nl_events {
	/* Ask for FPM to reconnect the external server. */
	FNE_RECONNECT,
	/* Disable FPM. */
	FNE_DISABLE,
	/* Reset counters. */
	FNE_RESET_COUNTERS,
	/* Toggle next hop group feature. */
	FNE_TOGGLE_NHG,
	/* Reconnect request by our own code to avoid races. */
	FNE_INTERNAL_RECONNECT,

	/* Next hop groups walk finished. */
	FNE_NHG_FINISHED,
	/* RIB walk finished. */
	FNE_RIB_FINISHED,
	/* RMAC walk finished. */
	FNE_RMAC_FINISHED,
};

#define FPM_RECONNECT(fnc)                                                     \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 FNE_INTERNAL_RECONNECT, &(fnc)->t_event)

#define WALK_FINISH(fnc, ev)                                                   \
	thread_add_event((fnc)->fthread->master, fpm_process_event, (fnc),    \
			 (ev), NULL)

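/*
 * Both macros above defer work to the FPM pthread's event loop
 * (fpm_process_event): reconnect requests and walk-completion notifications
 * are turned into events instead of being handled directly from the zebra
 * walk callbacks, which keeps the completion flags and the connection state
 * updated from a single thread.
 */
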
/*
 * Prototypes.
 */
static int fpm_process_event(struct thread *t);
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx);
static int fpm_nhg_send(struct thread *t);
static int fpm_nhg_reset(struct thread *t);
static int fpm_rib_send(struct thread *t);
static int fpm_rib_reset(struct thread *t);
static int fpm_rmac_send(struct thread *t);
static int fpm_rmac_reset(struct thread *t);

/*
 * Helper functions.
 */

/**
 * Reorganizes the data on the buffer so it can fit more data.
 *
 * @param s stream pointer.
 */
static void stream_pulldown(struct stream *s)
{
	size_t rlen = STREAM_READABLE(s);

	/* No more data, so just move the pointers. */
	if (rlen == 0) {
		stream_reset(s);
		return;
	}

	/* Move the available data to the beginning. */
	memmove(s->data, &s->data[s->getp], rlen);
	s->getp = 0;
	s->endp = rlen;
}
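
/*
 * Example (illustrative): with getp == 3 and endp == 10 there are 7 readable
 * bytes left; after stream_pulldown() those 7 bytes sit at offset 0, getp is 0
 * and endp is 7, so the tail of the buffer is free for new writes again.
 */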

/*
 * CLI.
 */
#define FPM_STR "Forwarding Plane Manager configuration\n"

DEFUN(fpm_set_address, fpm_set_address_cmd,
      "fpm address <A.B.C.D|X:X::X:X> [port (1-65535)]",
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	uint16_t port = 0;
	uint8_t naddr[INET6_BUFSIZ];

	if (argc == 5)
		port = strtol(argv[4]->arg, NULL, 10);

	/* Handle IPv4 addresses. */
	if (inet_pton(AF_INET, argv[2]->arg, naddr) == 1) {
		sin = (struct sockaddr_in *)&gfnc->addr;

		memset(sin, 0, sizeof(*sin));
		sin->sin_family = AF_INET;
		sin->sin_port =
			port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
		sin->sin_len = sizeof(*sin);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
		memcpy(&sin->sin_addr, naddr, sizeof(sin->sin_addr));

		goto ask_reconnect;
	}

	/* Handle IPv6 addresses. */
	if (inet_pton(AF_INET6, argv[2]->arg, naddr) != 1) {
		vty_out(vty, "%% Invalid address: %s\n", argv[2]->arg);
		return CMD_WARNING;
	}

	sin6 = (struct sockaddr_in6 *)&gfnc->addr;
	memset(sin6, 0, sizeof(*sin6));
	sin6->sin6_family = AF_INET6;
	sin6->sin6_port = port ? htons(port) : htons(SOUTHBOUND_DEFAULT_PORT);
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
	sin6->sin6_len = sizeof(*sin6);
#endif /* HAVE_STRUCT_SOCKADDR_SA_LEN */
	memcpy(&sin6->sin6_addr, naddr, sizeof(sin6->sin6_addr));

ask_reconnect:
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RECONNECT, &gfnc->t_event);
	return CMD_SUCCESS;
}
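
/*
 * Example (illustrative) vtysh configuration enabling this plugin's
 * connection to an FPM server on the loopback address using the default
 * port (SOUTHBOUND_DEFAULT_PORT, 2620):
 *
 *   configure terminal
 *    fpm address 127.0.0.1 port 2620
 *
 * "no fpm address" disables the connection again.
 */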

DEFUN(no_fpm_set_address, no_fpm_set_address_cmd,
      "no fpm address [<A.B.C.D|X:X::X:X> [port (1-65535)]]",
      NO_STR
      FPM_STR
      "FPM remote listening server address\n"
      "Remote IPv4 FPM server\n"
      "Remote IPv6 FPM server\n"
      "FPM remote listening server port\n"
      "Remote FPM server port\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_DISABLE, &gfnc->t_event);
	return CMD_SUCCESS;
}

DEFUN(fpm_use_nhg, fpm_use_nhg_cmd,
      "fpm use-next-hop-groups",
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already enabled. */
	if (gfnc->use_nhg)
		return CMD_SUCCESS;

	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_event);

	return CMD_SUCCESS;
}

DEFUN(no_fpm_use_nhg, no_fpm_use_nhg_cmd,
      "no fpm use-next-hop-groups",
      NO_STR
      FPM_STR
      "Use netlink next hop groups feature.\n")
{
	/* Already disabled. */
	if (!gfnc->use_nhg)
		return CMD_SUCCESS;

	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_TOGGLE_NHG, &gfnc->t_event);

	return CMD_SUCCESS;
}

DEFUN(fpm_reset_counters, fpm_reset_counters_cmd,
      "clear fpm counters",
      CLEAR_STR
      FPM_STR
      "FPM statistic counters\n")
{
	thread_add_event(gfnc->fthread->master, fpm_process_event, gfnc,
			 FNE_RESET_COUNTERS, &gfnc->t_event);
	return CMD_SUCCESS;
}

DEFUN(fpm_show_counters, fpm_show_counters_cmd,
      "show fpm counters",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n")
{
	vty_out(vty, "%30s\n%30s\n", "FPM counters", "============");

#define SHOW_COUNTER(label, counter)                                           \
	vty_out(vty, "%28s: %u\n", (label), (counter))

	SHOW_COUNTER("Input bytes", gfnc->counters.bytes_read);
	SHOW_COUNTER("Output bytes", gfnc->counters.bytes_sent);
	SHOW_COUNTER("Output buffer current size", gfnc->counters.obuf_bytes);
	SHOW_COUNTER("Output buffer peak size", gfnc->counters.obuf_peak);
	SHOW_COUNTER("Connection closes", gfnc->counters.connection_closes);
	SHOW_COUNTER("Connection errors", gfnc->counters.connection_errors);
	SHOW_COUNTER("Data plane items processed",
		     gfnc->counters.dplane_contexts);
	SHOW_COUNTER("Data plane items enqueued",
		     gfnc->counters.ctxqueue_len);
	SHOW_COUNTER("Data plane items queue peak",
		     gfnc->counters.ctxqueue_len_peak);
	SHOW_COUNTER("Buffer full hits", gfnc->counters.buffer_full);
	SHOW_COUNTER("User FPM configurations", gfnc->counters.user_configures);
	SHOW_COUNTER("User FPM disable requests", gfnc->counters.user_disables);

#undef SHOW_COUNTER

	return CMD_SUCCESS;
}

DEFUN(fpm_show_counters_json, fpm_show_counters_json_cmd,
      "show fpm counters json",
      SHOW_STR
      FPM_STR
      "FPM statistic counters\n"
      JSON_STR)
{
	struct json_object *jo;

	jo = json_object_new_object();
	json_object_int_add(jo, "bytes-read", gfnc->counters.bytes_read);
	json_object_int_add(jo, "bytes-sent", gfnc->counters.bytes_sent);
	json_object_int_add(jo, "obuf-bytes", gfnc->counters.obuf_bytes);
	json_object_int_add(jo, "obuf-bytes-peak", gfnc->counters.obuf_peak);
	json_object_int_add(jo, "connection-closes",
			    gfnc->counters.connection_closes);
	json_object_int_add(jo, "connection-errors",
			    gfnc->counters.connection_errors);
	json_object_int_add(jo, "data-plane-contexts",
			    gfnc->counters.dplane_contexts);
	json_object_int_add(jo, "data-plane-contexts-queue",
			    gfnc->counters.ctxqueue_len);
	json_object_int_add(jo, "data-plane-contexts-queue-peak",
			    gfnc->counters.ctxqueue_len_peak);
	json_object_int_add(jo, "buffer-full-hits", gfnc->counters.buffer_full);
	json_object_int_add(jo, "user-configures",
			    gfnc->counters.user_configures);
	json_object_int_add(jo, "user-disables", gfnc->counters.user_disables);
	vty_out(vty, "%s\n", json_object_to_json_string_ext(jo, 0));
	json_object_free(jo);

	return CMD_SUCCESS;
}
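
/*
 * Example (illustrative, reformatted for readability) output of
 * "show fpm counters json" right after startup, before any traffic:
 *
 *   { "bytes-read": 0, "bytes-sent": 0, "obuf-bytes": 0, "obuf-bytes-peak": 0,
 *     "connection-closes": 0, "connection-errors": 0,
 *     "data-plane-contexts": 0, "data-plane-contexts-queue": 0,
 *     "data-plane-contexts-queue-peak": 0, "buffer-full-hits": 0,
 *     "user-configures": 0, "user-disables": 0 }
 */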

static int fpm_write_config(struct vty *vty)
{
	struct sockaddr_in *sin;
	struct sockaddr_in6 *sin6;
	int written = 0;
	char addrstr[INET6_ADDRSTRLEN];

	if (gfnc->disabled)
		return written;

	switch (gfnc->addr.ss_family) {
	case AF_INET:
		written = 1;
		sin = (struct sockaddr_in *)&gfnc->addr;
		inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
		vty_out(vty, "fpm address %s", addrstr);
		if (sin->sin_port != htons(SOUTHBOUND_DEFAULT_PORT))
			vty_out(vty, " port %d", ntohs(sin->sin_port));

		vty_out(vty, "\n");
		break;
	case AF_INET6:
		written = 1;
		sin6 = (struct sockaddr_in6 *)&gfnc->addr;
		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
		vty_out(vty, "fpm address %s", addrstr);
		if (sin6->sin6_port != htons(SOUTHBOUND_DEFAULT_PORT))
			vty_out(vty, " port %d", ntohs(sin6->sin6_port));

		vty_out(vty, "\n");
		break;

	default:
		break;
	}

	if (!gfnc->use_nhg) {
		vty_out(vty, "no fpm use-next-hop-groups\n");
		written = 1;
	}

	return written;
}

static struct cmd_node fpm_node = {
	.name = "fpm",
	.node = FPM_NODE,
	.prompt = "",
	.config_write = fpm_write_config,
};

/*
 * FPM functions.
 */
static int fpm_connect(struct thread *t);

static void fpm_reconnect(struct fpm_nl_ctx *fnc)
{
	/* Cancel all zebra threads first. */
	thread_cancel_async(zrouter.master, &fnc->t_nhgreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_nhgwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_ribwalk, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacreset, NULL);
	thread_cancel_async(zrouter.master, &fnc->t_rmacwalk, NULL);

	/*
	 * Grab the lock to empty the streams (data plane might try to
	 * enqueue updates while we are closing).
	 */
	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	/* Avoid calling close on `-1`. */
	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	stream_reset(fnc->ibuf);
	stream_reset(fnc->obuf);
	THREAD_OFF(fnc->t_read);
	THREAD_OFF(fnc->t_write);

	/* FPM is disabled, don't attempt to connect. */
	if (fnc->disabled)
		return;

	thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
			 &fnc->t_connect);
}

static int fpm_read(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	ssize_t rv;

	/* Let's ignore the input at the moment. */
	rv = stream_read_try(fnc->ibuf, fnc->socket,
			     STREAM_WRITEABLE(fnc->ibuf));
	/* We've got an interruption. */
	if (rv == -2) {
		/* Schedule next read. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);
		return 0;
	}
	if (rv == 0) {
		atomic_fetch_add_explicit(&fnc->counters.connection_closes, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: connection closed", __func__);

		FPM_RECONNECT(fnc);
		return 0;
	}
	if (rv == -1) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		zlog_warn("%s: connection failure: %s", __func__,
			  strerror(errno));
		FPM_RECONNECT(fnc);
		return 0;
	}
	stream_reset(fnc->ibuf);

	/* Account all bytes read. */
	atomic_fetch_add_explicit(&fnc->counters.bytes_read, rv,
				  memory_order_relaxed);

	thread_add_read(fnc->fthread->master, fpm_read, fnc, fnc->socket,
			&fnc->t_read);

	return 0;
}

static int fpm_write(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	socklen_t statuslen;
	ssize_t bwritten;
	int rv, status;
	size_t btotal;

	if (fnc->connecting == true) {
		status = 0;
		statuslen = sizeof(status);

		rv = getsockopt(fnc->socket, SOL_SOCKET, SO_ERROR, &status,
				&statuslen);
		if (rv == -1 || status != 0) {
			if (rv != -1)
				zlog_warn("%s: connection failed: %s", __func__,
					  strerror(status));
			else
				zlog_warn("%s: SO_ERROR failed: %s", __func__,
					  strerror(errno));

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);

			FPM_RECONNECT(fnc);
			return 0;
		}

		fnc->connecting = false;

		/* Permit receiving messages now. */
		thread_add_read(fnc->fthread->master, fpm_read, fnc,
				fnc->socket, &fnc->t_read);

		/*
		 * Walk the route tables to send old information before
		 * starting to send updated information.
		 *
		 * NOTE 1:
		 * RIB table walk is called after the next hop group table
		 * walk ends.
		 *
		 * NOTE 2:
		 * Don't attempt to go through the next hop group table if we
		 * were explicitly told not to use it.
		 */
		if (fnc->use_nhg)
			thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
					 &fnc->t_nhgwalk);
		else
			thread_add_timer(zrouter.master, fpm_rib_send, fnc, 0,
					 &fnc->t_ribwalk);

		thread_add_timer(zrouter.master, fpm_rmac_send, fnc, 0,
				 &fnc->t_rmacwalk);
	}

	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	while (true) {
		/* Stream is empty: reset pointers and return. */
		if (STREAM_READABLE(fnc->obuf) == 0) {
			stream_reset(fnc->obuf);
			break;
		}

		/* Try to write all at once. */
		btotal = stream_get_endp(fnc->obuf) -
			stream_get_getp(fnc->obuf);
		bwritten = write(fnc->socket, stream_pnt(fnc->obuf), btotal);
		if (bwritten == 0) {
			atomic_fetch_add_explicit(
				&fnc->counters.connection_closes, 1,
				memory_order_relaxed);

			if (IS_ZEBRA_DEBUG_FPM)
				zlog_debug("%s: connection closed", __func__);
			break;
		}
		if (bwritten == -1) {
			/* Attempt to continue if blocked by a signal. */
			if (errno == EINTR)
				continue;
			/* Receiver is probably slow, let's give it some time. */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;

			atomic_fetch_add_explicit(
				&fnc->counters.connection_errors, 1,
				memory_order_relaxed);
			zlog_warn("%s: connection failure: %s", __func__,
				  strerror(errno));

			FPM_RECONNECT(fnc);
			return 0;
		}

		/* Account all bytes sent. */
		atomic_fetch_add_explicit(&fnc->counters.bytes_sent, bwritten,
					  memory_order_relaxed);

		/* Account number of bytes free. */
		atomic_fetch_sub_explicit(&fnc->counters.obuf_bytes, bwritten,
					  memory_order_relaxed);

		stream_forward_getp(fnc->obuf, (size_t)bwritten);
	}

	/* Stream is not empty yet, we must schedule more writes. */
	if (STREAM_READABLE(fnc->obuf)) {
		stream_pulldown(fnc->obuf);
		thread_add_write(fnc->fthread->master, fpm_write, fnc,
				 fnc->socket, &fnc->t_write);
		return 0;
	}

	return 0;
}

static int fpm_connect(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct sockaddr_in *sin = (struct sockaddr_in *)&fnc->addr;
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&fnc->addr;
	socklen_t slen;
	int rv, sock;
	char addrstr[INET6_ADDRSTRLEN];

	sock = socket(fnc->addr.ss_family, SOCK_STREAM, 0);
	if (sock == -1) {
		zlog_err("%s: fpm socket failed: %s", __func__,
			 strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return 0;
	}

	set_nonblocking(sock);

	if (fnc->addr.ss_family == AF_INET) {
		inet_ntop(AF_INET, &sin->sin_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin);
	} else {
		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, sizeof(addrstr));
		slen = sizeof(*sin6);
	}

	if (IS_ZEBRA_DEBUG_FPM)
		zlog_debug("%s: attempting to connect to %s:%d", __func__,
			   addrstr, ntohs(sin->sin_port));

	rv = connect(sock, (struct sockaddr *)&fnc->addr, slen);
	if (rv == -1 && errno != EINPROGRESS) {
		atomic_fetch_add_explicit(&fnc->counters.connection_errors, 1,
					  memory_order_relaxed);
		close(sock);
		zlog_warn("%s: fpm connection failed: %s", __func__,
			  strerror(errno));
		thread_add_timer(fnc->fthread->master, fpm_connect, fnc, 3,
				 &fnc->t_connect);
		return 0;
	}

	fnc->connecting = (errno == EINPROGRESS);
	fnc->socket = sock;
	if (!fnc->connecting)
		thread_add_read(fnc->fthread->master, fpm_read, fnc, sock,
				&fnc->t_read);
	thread_add_write(fnc->fthread->master, fpm_write, fnc, sock,
			 &fnc->t_write);

	/* Mark all routes as unsent. */
	thread_add_timer(zrouter.master, fpm_nhg_reset, fnc, 0,
			 &fnc->t_nhgreset);
	thread_add_timer(zrouter.master, fpm_rib_reset, fnc, 0,
			 &fnc->t_ribreset);
	thread_add_timer(zrouter.master, fpm_rmac_reset, fnc, 0,
			 &fnc->t_rmacreset);

	return 0;
}
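
/*
 * Note on the connection handshake: the socket is non-blocking, so connect()
 * usually returns EINPROGRESS and `connecting` is set. The first fpm_write()
 * run then checks SO_ERROR to find out whether the connection actually
 * succeeded before it enables reads and kicks off the NHG/RIB/RMAC walks.
 */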

/**
 * Encode data plane operation context into netlink and enqueue it in the FPM
 * output buffer.
 *
 * @param fnc the netlink FPM context.
 * @param ctx the data plane operation context data.
 * @return 0 on success or -1 on not enough space.
 */
static int fpm_nl_enqueue(struct fpm_nl_ctx *fnc, struct zebra_dplane_ctx *ctx)
{
	uint8_t nl_buf[NL_PKT_BUF_SIZE];
	size_t nl_buf_len;
	ssize_t rv;
	uint64_t obytes, obytes_peak;
	enum dplane_op_e op = dplane_ctx_get_op(ctx);

	/*
	 * If we were configured to not use next hop groups, then quit as soon
	 * as possible.
	 */
	if ((!fnc->use_nhg)
	    && (op == DPLANE_OP_NH_DELETE || op == DPLANE_OP_NH_INSTALL
		|| op == DPLANE_OP_NH_UPDATE))
		return 0;

	nl_buf_len = 0;

	frr_mutex_lock_autounlock(&fnc->obuf_mutex);

	switch (op) {
	case DPLANE_OP_ROUTE_UPDATE:
	case DPLANE_OP_ROUTE_DELETE:
		rv = netlink_route_multipath_msg_encode(RTM_DELROUTE, ctx,
							nl_buf, sizeof(nl_buf),
							true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;

		/* UPDATE operations need an INSTALL, otherwise just quit. */
		if (op == DPLANE_OP_ROUTE_DELETE)
			break;

		/* FALL THROUGH */
	case DPLANE_OP_ROUTE_INSTALL:
		rv = netlink_route_multipath_msg_encode(
			RTM_NEWROUTE, ctx, &nl_buf[nl_buf_len],
			sizeof(nl_buf) - nl_buf_len, true, fnc->use_nhg);
		if (rv <= 0) {
			zlog_err(
				"%s: netlink_route_multipath_msg_encode failed",
				__func__);
			return 0;
		}

		nl_buf_len += (size_t)rv;
		break;

	case DPLANE_OP_MAC_INSTALL:
	case DPLANE_OP_MAC_DELETE:
		rv = netlink_macfdb_update_ctx(ctx, nl_buf, sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_macfdb_update_ctx failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_NH_DELETE:
		rv = netlink_nexthop_msg_encode(RTM_DELNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;
	case DPLANE_OP_NH_INSTALL:
	case DPLANE_OP_NH_UPDATE:
		rv = netlink_nexthop_msg_encode(RTM_NEWNEXTHOP, ctx, nl_buf,
						sizeof(nl_buf));
		if (rv <= 0) {
			zlog_err("%s: netlink_nexthop_msg_encode failed",
				 __func__);
			return 0;
		}

		nl_buf_len = (size_t)rv;
		break;

	case DPLANE_OP_LSP_INSTALL:
	case DPLANE_OP_LSP_UPDATE:
	case DPLANE_OP_LSP_DELETE:
	case DPLANE_OP_PW_INSTALL:
	case DPLANE_OP_PW_UNINSTALL:
	case DPLANE_OP_ADDR_INSTALL:
	case DPLANE_OP_ADDR_UNINSTALL:
	case DPLANE_OP_NEIGH_INSTALL:
	case DPLANE_OP_NEIGH_UPDATE:
	case DPLANE_OP_NEIGH_DELETE:
	case DPLANE_OP_VTEP_ADD:
	case DPLANE_OP_VTEP_DELETE:
	case DPLANE_OP_SYS_ROUTE_ADD:
	case DPLANE_OP_SYS_ROUTE_DELETE:
	case DPLANE_OP_ROUTE_NOTIFY:
	case DPLANE_OP_LSP_NOTIFY:
	case DPLANE_OP_NONE:
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled data plane message (%d) %s",
				   __func__, dplane_ctx_get_op(ctx),
				   dplane_op2str(dplane_ctx_get_op(ctx)));
		break;
	}

	/* Skip empty enqueues. */
	if (nl_buf_len == 0)
		return 0;

	/*
	 * We must know if someday a message grows beyond what the 16 bit
	 * length field can hold.
	 */
	assert((nl_buf_len + FPM_HEADER_SIZE) <= UINT16_MAX);

	/* Check if we have enough buffer space. */
	if (STREAM_WRITEABLE(fnc->obuf) < (nl_buf_len + FPM_HEADER_SIZE)) {
		atomic_fetch_add_explicit(&fnc->counters.buffer_full, 1,
					  memory_order_relaxed);

		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug(
				"%s: buffer full: wants to write %zu but has %zu",
				__func__, nl_buf_len + FPM_HEADER_SIZE,
				STREAM_WRITEABLE(fnc->obuf));

		return -1;
	}

	/*
	 * Fill in the FPM header information.
	 *
	 * See FPM_HEADER_SIZE definition for more information.
	 */
	stream_putc(fnc->obuf, 1);
	stream_putc(fnc->obuf, 1);
	stream_putw(fnc->obuf, nl_buf_len + FPM_HEADER_SIZE);

	/* Write current data. */
	stream_write(fnc->obuf, nl_buf, (size_t)nl_buf_len);

	/* Account number of bytes waiting to be written. */
	atomic_fetch_add_explicit(&fnc->counters.obuf_bytes,
				  nl_buf_len + FPM_HEADER_SIZE,
				  memory_order_relaxed);
	obytes = atomic_load_explicit(&fnc->counters.obuf_bytes,
				      memory_order_relaxed);
	obytes_peak = atomic_load_explicit(&fnc->counters.obuf_peak,
					   memory_order_relaxed);
	if (obytes_peak < obytes)
		atomic_store_explicit(&fnc->counters.obuf_peak, obytes,
				      memory_order_relaxed);

	/* Tell the thread to start writing. */
	thread_add_write(fnc->fthread->master, fpm_write, fnc, fnc->socket,
			 &fnc->t_write);

	return 0;
}

/*
 * Next hop walk/send functions.
 */
struct fpm_nhg_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	bool complete;
};

static int fpm_nhg_send_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;
	struct fpm_nhg_arg *fna = arg;

	/* This entry was already sent, skip it. */
	if (CHECK_FLAG(nhe->flags, NEXTHOP_GROUP_FPM))
		return HASHWALK_CONTINUE;

	/* Reset ctx to reuse allocated memory, take a snapshot and send it. */
	dplane_ctx_reset(fna->ctx);
	dplane_ctx_nexthop_init(fna->ctx, DPLANE_OP_NH_INSTALL, nhe);
	if (fpm_nl_enqueue(fna->fnc, fna->ctx) == -1) {
		/* Our buffers are full, let's give it some cycles. */
		fna->complete = false;
		return HASHWALK_ABORT;
	}

	/* Mark group as sent, so it doesn't get sent again. */
	SET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);

	return HASHWALK_CONTINUE;
}

static int fpm_nhg_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct fpm_nhg_arg fna;

	fna.fnc = fnc;
	fna.ctx = dplane_ctx_alloc();
	fna.complete = true;

	/* Send next hops. */
	hash_walk(zrouter.nhgs_id, fpm_nhg_send_cb, &fna);

	/* `free()` allocated memory. */
	dplane_ctx_fini(&fna.ctx);

	/* We are done sending next hops, let's install the routes now. */
	if (fna.complete) {
		WALK_FINISH(fnc, FNE_NHG_FINISHED);
		thread_add_timer(zrouter.master, fpm_rib_send, fnc, 0,
				 &fnc->t_ribwalk);
	} else /* Otherwise reschedule next hop group again. */
		thread_add_timer(zrouter.master, fpm_nhg_send, fnc, 0,
				 &fnc->t_nhgwalk);

	return 0;
}
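
/*
 * Walk ordering: after a (re)connect the next hop groups are sent first and
 * the RIB walk only starts once the NHG walk finishes (or immediately when
 * use-next-hop-groups is disabled); the RMAC walk runs independently. Walks
 * that run out of output buffer space reschedule themselves and resume from
 * the entries that are not flagged as sent yet.
 */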

/**
 * Send all RIB installed routes to the connected data plane.
 */
static int fpm_rib_send(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	struct zebra_dplane_ctx *ctx;
	rib_tables_iter_t rt_iter;

	/* Allocate temporary context for all transactions. */
	ctx = dplane_ctx_alloc();

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL || dest->selected_fib == NULL)
				continue;

			/* Check for already sent routes. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
				continue;

			/* Enqueue route install. */
			dplane_ctx_reset(ctx);
			dplane_ctx_route_init(ctx, DPLANE_OP_ROUTE_INSTALL, rn,
					      dest->selected_fib);
			if (fpm_nl_enqueue(fnc, ctx) == -1) {
				/* Free the temporary allocated context. */
				dplane_ctx_fini(&ctx);

				thread_add_timer(zrouter.master, fpm_rib_send,
						 fnc, 1, &fnc->t_ribwalk);
				return 0;
			}

			/* Mark as sent. */
			SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	/* Free the temporary allocated context. */
	dplane_ctx_fini(&ctx);

	/* All RIB routes sent! */
	WALK_FINISH(fnc, FNE_RIB_FINISHED);

	return 0;
}

/*
 * The next three functions will handle RMAC enqueue.
 */
struct fpm_rmac_arg {
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	zebra_l3vni_t *zl3vni;
	bool complete;
};

static void fpm_enqueue_rmac_table(struct hash_bucket *backet, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	zebra_mac_t *zrmac = backet->data;
	struct zebra_if *zif = fra->zl3vni->vxlan_if->info;
	const struct zebra_l2info_vxlan *vxl = &zif->l2info.vxl;
	struct zebra_if *br_zif;
	vlanid_t vid;
	bool sticky;

	/* Entry already sent. */
	if (CHECK_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT) || !fra->complete)
		return;

	sticky = !!CHECK_FLAG(zrmac->flags,
			      (ZEBRA_MAC_STICKY | ZEBRA_MAC_REMOTE_DEF_GW));
	br_zif = (struct zebra_if *)(zif->brslave_info.br_if->info);
	vid = IS_ZEBRA_IF_BRIDGE_VLAN_AWARE(br_zif) ? vxl->access_vlan : 0;

	dplane_ctx_reset(fra->ctx);
	dplane_ctx_set_op(fra->ctx, DPLANE_OP_MAC_INSTALL);
	dplane_mac_init(fra->ctx, fra->zl3vni->vxlan_if,
			zif->brslave_info.br_if, vid,
			&zrmac->macaddr, zrmac->fwd_info.r_vtep_ip, sticky);
	if (fpm_nl_enqueue(fra->fnc, fra->ctx) == -1) {
		thread_add_timer(zrouter.master, fpm_rmac_send,
				 fra->fnc, 1, &fra->fnc->t_rmacwalk);
		fra->complete = false;
	}
}

static void fpm_enqueue_l3vni_table(struct hash_bucket *backet, void *arg)
{
	struct fpm_rmac_arg *fra = arg;
	zebra_l3vni_t *zl3vni = backet->data;

	fra->zl3vni = zl3vni;
	/* The RMAC callback expects the fpm_rmac_arg as its argument. */
	hash_iterate(zl3vni->rmac_table, fpm_enqueue_rmac_table, fra);
}

static int fpm_rmac_send(struct thread *t)
{
	struct fpm_rmac_arg fra;

	fra.fnc = THREAD_ARG(t);
	fra.ctx = dplane_ctx_alloc();
	fra.complete = true;
	hash_iterate(zrouter.l3vni_table, fpm_enqueue_l3vni_table, &fra);
	dplane_ctx_fini(&fra.ctx);

	/* RMAC walk completed. */
	if (fra.complete)
		WALK_FINISH(fra.fnc, FNE_RMAC_FINISHED);

	return 0;
}

/*
 * Resets the next hop FPM flags so we send all next hops again.
 */
static void fpm_nhg_reset_cb(struct hash_bucket *bucket, void *arg)
{
	struct nhg_hash_entry *nhe = bucket->data;

	/* Unset FPM installation flag so it gets installed again. */
	UNSET_FLAG(nhe->flags, NEXTHOP_GROUP_FPM);
}

static int fpm_nhg_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);

	fnc->nhg_complete = false;
	hash_iterate(zrouter.nhgs_id, fpm_nhg_reset_cb, NULL);
	return 0;
}

/**
 * Resets the RIB FPM flags so we send all routes again.
 */
static int fpm_rib_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	rib_dest_t *dest;
	struct route_node *rn;
	struct route_table *rt;
	rib_tables_iter_t rt_iter;

	fnc->rib_complete = false;

	rt_iter.state = RIB_TABLES_ITER_S_INIT;
	while ((rt = rib_tables_iter_next(&rt_iter))) {
		for (rn = route_top(rt); rn; rn = srcdest_route_next(rn)) {
			dest = rib_dest_from_rnode(rn);
			/* Skip bad route entries. */
			if (dest == NULL)
				continue;

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		}
	}

	return 0;
}

/*
 * The next three functions will handle RMAC table reset.
 */
static void fpm_unset_rmac_table(struct hash_bucket *backet, void *arg)
{
	zebra_mac_t *zrmac = backet->data;

	UNSET_FLAG(zrmac->flags, ZEBRA_MAC_FPM_SENT);
}

static void fpm_unset_l3vni_table(struct hash_bucket *backet, void *arg)
{
	zebra_l3vni_t *zl3vni = backet->data;

	hash_iterate(zl3vni->rmac_table, fpm_unset_rmac_table, zl3vni);
}

static int fpm_rmac_reset(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);

	fnc->rmac_complete = false;
	hash_iterate(zrouter.l3vni_table, fpm_unset_l3vni_table, NULL);

	return 0;
}

static int fpm_process_queue(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	struct zebra_dplane_ctx *ctx;

	frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);

	while (true) {
		/* No space available yet. */
		if (STREAM_WRITEABLE(fnc->obuf) < NL_PKT_BUF_SIZE)
			break;

		/* Dequeue next item or quit processing. */
		ctx = dplane_ctx_dequeue(&fnc->ctxqueue);
		if (ctx == NULL)
			break;

		fpm_nl_enqueue(fnc, ctx);

		/* Account the processed entries. */
		atomic_fetch_add_explicit(&fnc->counters.dplane_contexts, 1,
					  memory_order_relaxed);
		atomic_fetch_sub_explicit(&fnc->counters.ctxqueue_len, 1,
					  memory_order_relaxed);

		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(fnc->prov, ctx);
	}

	/* Check for more items in the queue. */
	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	return 0;
}
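
/*
 * The update path is a two stage pipeline: fpm_nl_process() (running in the
 * data plane provider) parks contexts in `ctxqueue`, and fpm_process_queue()
 * (running in this plugin's pthread) drains that queue into `obuf` whenever
 * at least NL_PKT_BUF_SIZE bytes of output buffer are free, handing the
 * contexts back to zebra once they have been encoded.
 */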

/**
 * Handles external (e.g. CLI, data plane or others) events.
 */
static int fpm_process_event(struct thread *t)
{
	struct fpm_nl_ctx *fnc = THREAD_ARG(t);
	int event = THREAD_VAL(t);

	switch (event) {
	case FNE_DISABLE:
		zlog_info("%s: manual FPM disable event", __func__);
		fnc->disabled = true;
		atomic_fetch_add_explicit(&fnc->counters.user_disables, 1,
					  memory_order_relaxed);

		/* Call reconnect to disable timers and clean up context. */
		fpm_reconnect(fnc);
		break;

	case FNE_RECONNECT:
		zlog_info("%s: manual FPM reconnect event", __func__);
		fnc->disabled = false;
		atomic_fetch_add_explicit(&fnc->counters.user_configures, 1,
					  memory_order_relaxed);
		fpm_reconnect(fnc);
		break;

	case FNE_RESET_COUNTERS:
		zlog_info("%s: manual FPM counters reset event", __func__);
		memset(&fnc->counters, 0, sizeof(fnc->counters));
		break;

	case FNE_TOGGLE_NHG:
		zlog_info("%s: toggle next hop groups support", __func__);
		fnc->use_nhg = !fnc->use_nhg;
		fpm_reconnect(fnc);
		break;

	case FNE_INTERNAL_RECONNECT:
		fpm_reconnect(fnc);
		break;

	case FNE_NHG_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: next hop groups walk finished",
				   __func__);

		fnc->nhg_complete = true;
		break;
	case FNE_RIB_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RIB walk finished", __func__);

		fnc->rib_complete = true;
		break;
	case FNE_RMAC_FINISHED:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: RMAC walk finished", __func__);

		fnc->rmac_complete = true;
		break;

	default:
		if (IS_ZEBRA_DEBUG_FPM)
			zlog_debug("%s: unhandled event %d", __func__, event);
		break;
	}

	return 0;
}

/*
 * Data plane functions.
 */
static int fpm_nl_start(struct zebra_dplane_provider *prov)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	fnc->fthread = frr_pthread_new(NULL, prov_name, prov_name);
	assert(frr_pthread_run(fnc->fthread, NULL) == 0);
	fnc->ibuf = stream_new(NL_PKT_BUF_SIZE);
	fnc->obuf = stream_new(NL_PKT_BUF_SIZE * 128);
	pthread_mutex_init(&fnc->obuf_mutex, NULL);
	fnc->socket = -1;
	fnc->disabled = true;
	fnc->prov = prov;
	TAILQ_INIT(&fnc->ctxqueue);
	pthread_mutex_init(&fnc->ctxqueue_mutex, NULL);

	/* Set default values. */
	fnc->use_nhg = true;

	return 0;
}

static int fpm_nl_finish_early(struct fpm_nl_ctx *fnc)
{
	/* Disable all events and close socket. */
	THREAD_OFF(fnc->t_nhgreset);
	THREAD_OFF(fnc->t_nhgwalk);
	THREAD_OFF(fnc->t_ribreset);
	THREAD_OFF(fnc->t_ribwalk);
	THREAD_OFF(fnc->t_rmacreset);
	THREAD_OFF(fnc->t_rmacwalk);
	thread_cancel_async(fnc->fthread->master, &fnc->t_read, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_write, NULL);
	thread_cancel_async(fnc->fthread->master, &fnc->t_connect, NULL);

	if (fnc->socket != -1) {
		close(fnc->socket);
		fnc->socket = -1;
	}

	return 0;
}

static int fpm_nl_finish_late(struct fpm_nl_ctx *fnc)
{
	/* Stop the running thread. */
	frr_pthread_stop(fnc->fthread, NULL);

	/* Free all allocated resources. */
	pthread_mutex_destroy(&fnc->obuf_mutex);
	pthread_mutex_destroy(&fnc->ctxqueue_mutex);
	stream_free(fnc->ibuf);
	stream_free(fnc->obuf);
	free(gfnc);
	gfnc = NULL;

	return 0;
}

static int fpm_nl_finish(struct zebra_dplane_provider *prov, bool early)
{
	struct fpm_nl_ctx *fnc;

	fnc = dplane_provider_get_data(prov);
	if (early)
		return fpm_nl_finish_early(fnc);

	return fpm_nl_finish_late(fnc);
}

static int fpm_nl_process(struct zebra_dplane_provider *prov)
{
	struct zebra_dplane_ctx *ctx;
	struct fpm_nl_ctx *fnc;
	int counter, limit;
	uint64_t cur_queue, peak_queue;

	fnc = dplane_provider_get_data(prov);
	limit = dplane_provider_get_work_limit(prov);
	for (counter = 0; counter < limit; counter++) {
		ctx = dplane_provider_dequeue_in_ctx(prov);
		if (ctx == NULL)
			break;

		/*
		 * Skip all notifications if not connected, we'll walk the RIB
		 * anyway.
		 */
		if (fnc->socket != -1 && fnc->connecting == false) {
			frr_mutex_lock_autounlock(&fnc->ctxqueue_mutex);
			dplane_ctx_enqueue_tail(&fnc->ctxqueue, ctx);

			/* Account the number of contexts. */
			atomic_fetch_add_explicit(&fnc->counters.ctxqueue_len,
						  1, memory_order_relaxed);
			cur_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len,
				memory_order_relaxed);
			peak_queue = atomic_load_explicit(
				&fnc->counters.ctxqueue_len_peak,
				memory_order_relaxed);
			/* Store the new queue length as the peak value. */
			if (peak_queue < cur_queue)
				atomic_store_explicit(
					&fnc->counters.ctxqueue_len_peak,
					cur_queue, memory_order_relaxed);
			continue;
		}

		dplane_ctx_set_status(ctx, ZEBRA_DPLANE_REQUEST_SUCCESS);
		dplane_provider_enqueue_out_ctx(prov, ctx);
	}

	if (atomic_load_explicit(&fnc->counters.ctxqueue_len,
				 memory_order_relaxed)
	    > 0)
		thread_add_timer(fnc->fthread->master, fpm_process_queue,
				 fnc, 0, &fnc->t_dequeue);

	return 0;
}

static int fpm_nl_new(struct thread_master *tm)
{
	struct zebra_dplane_provider *prov = NULL;
	int rv;

	gfnc = calloc(1, sizeof(*gfnc));
	rv = dplane_provider_register(prov_name, DPLANE_PRIO_POSTPROCESS,
				      DPLANE_PROV_FLAG_THREADED, fpm_nl_start,
				      fpm_nl_process, fpm_nl_finish, gfnc,
				      &prov);

	if (IS_ZEBRA_DEBUG_DPLANE)
		zlog_debug("%s register status: %d", prov_name, rv);

	install_node(&fpm_node);
	install_element(ENABLE_NODE, &fpm_show_counters_cmd);
	install_element(ENABLE_NODE, &fpm_show_counters_json_cmd);
	install_element(ENABLE_NODE, &fpm_reset_counters_cmd);
	install_element(CONFIG_NODE, &fpm_set_address_cmd);
	install_element(CONFIG_NODE, &no_fpm_set_address_cmd);
	install_element(CONFIG_NODE, &fpm_use_nhg_cmd);
	install_element(CONFIG_NODE, &no_fpm_use_nhg_cmd);

	return 0;
}

static int fpm_nl_init(void)
{
	hook_register(frr_late_init, fpm_nl_new);
	return 0;
}

FRR_MODULE_SETUP(
	.name = "dplane_fpm_nl",
	.version = "0.0.1",
	.description = "Data plane plugin for FPM using netlink.",
	.init = fpm_nl_init,
	)