]> git.proxmox.com Git - mirror_frr.git/blob - zebra/zebra_fpm.c
zebra: On shutdown stop hook calls for fpm rmac updates
[mirror_frr.git] / zebra / zebra_fpm.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * Main implementation file for interface to Forwarding Plane Manager.
4 *
5 * Copyright (C) 2012 by Open Source Routing.
6 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
7 */
8
9 #include <zebra.h>
10
11 #include "log.h"
12 #include "libfrr.h"
13 #include "stream.h"
14 #include "frrevent.h"
15 #include "network.h"
16 #include "command.h"
17 #include "lib/version.h"
18 #include "jhash.h"
19
20 #include "zebra/rib.h"
21 #include "zebra/zserv.h"
22 #include "zebra/zebra_ns.h"
23 #include "zebra/zebra_vrf.h"
24 #include "zebra/zebra_errors.h"
25
26 #include "fpm/fpm.h"
27 #include "zebra_fpm_private.h"
28 #include "zebra/zebra_router.h"
29 #include "zebra_vxlan_private.h"
30
31 DEFINE_MTYPE_STATIC(ZEBRA, FPM_MAC_INFO, "FPM_MAC_INFO");
32
33 /*
34 * Interval at which we attempt to connect to the FPM.
35 */
36 #define ZFPM_CONNECT_RETRY_IVL 5
37
38 /*
39 * Sizes of outgoing and incoming stream buffers for writing/reading
40 * FPM messages.
41 */
42 #define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
43 #define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)
44
45 /*
46 * The maximum number of times the FPM socket write callback can call
47 * 'write' before it yields.
48 */
49 #define ZFPM_MAX_WRITES_PER_RUN 10
50
51 /*
52 * Interval over which we collect statistics.
53 */
54 #define ZFPM_STATS_IVL_SECS 10
55 #define FPM_MAX_MAC_MSG_LEN 512
56
57 static void zfpm_iterate_rmac_table(struct hash_bucket *bucket, void *args);
58
59 /*
60 * Structure that holds state for iterating over all route_node
61 * structures that are candidates for being communicated to the FPM.
62 */
63 struct zfpm_rnodes_iter {
64 rib_tables_iter_t tables_iter;
65 route_table_iter_t iter;
66 };
67
68 /*
69 * Statistics.
70 */
71 struct zfpm_stats {
72 unsigned long connect_calls;
73 unsigned long connect_no_sock;
74
75 unsigned long read_cb_calls;
76
77 unsigned long write_cb_calls;
78 unsigned long write_calls;
79 unsigned long partial_writes;
80 unsigned long max_writes_hit;
81 unsigned long t_write_yields;
82
83 unsigned long nop_deletes_skipped;
84 unsigned long route_adds;
85 unsigned long route_dels;
86
87 unsigned long updates_triggered;
88 unsigned long redundant_triggers;
89
90 unsigned long dests_del_after_update;
91
92 unsigned long t_conn_down_starts;
93 unsigned long t_conn_down_dests_processed;
94 unsigned long t_conn_down_yields;
95 unsigned long t_conn_down_finishes;
96
97 unsigned long t_conn_up_starts;
98 unsigned long t_conn_up_dests_processed;
99 unsigned long t_conn_up_yields;
100 unsigned long t_conn_up_aborts;
101 unsigned long t_conn_up_finishes;
102 };
103
104 /*
105 * States for the FPM state machine.
106 */
107 enum zfpm_state {
108
109 /*
110 * In this state we are not yet ready to connect to the FPM. This
111 * can happen when this module is disabled, or if we're cleaning up
112 * after a connection has gone down.
113 */
114 ZFPM_STATE_IDLE,
115
116 /*
117 * Ready to talk to the FPM and periodically trying to connect to
118 * it.
119 */
120 ZFPM_STATE_ACTIVE,
121
122 /*
123 * In the middle of bringing up a TCP connection. Specifically,
124 * waiting for a connect() call to complete asynchronously.
125 */
126 ZFPM_STATE_CONNECTING,
127
128 /*
129 * TCP connection to the FPM is up.
130 */
131 ZFPM_STATE_ESTABLISHED
132
133 };
134
135 /*
136 * Message format to be used to communicate with the FPM.
137 */
138 enum zfpm_msg_format {
139 ZFPM_MSG_FORMAT_NONE,
140 ZFPM_MSG_FORMAT_NETLINK,
141 ZFPM_MSG_FORMAT_PROTOBUF,
142 };
143
144 /*
145 * Globals.
146 */
147 struct zfpm_glob {
148
149 /*
150 * True if the FPM module has been enabled.
151 */
152 int enabled;
153
154 /*
155 * Message format to be used to communicate with the fpm.
156 */
157 enum zfpm_msg_format message_format;
158
159 struct event_loop *master;
160
161 enum zfpm_state state;
162
163 in_addr_t fpm_server;
164 /*
165 * Port on which the FPM is running.
166 */
167 int fpm_port;
168
169 /*
170 * List of rib_dest_t structures to be processed
171 */
172 TAILQ_HEAD(zfpm_dest_q, rib_dest_t_) dest_q;
173
174 /*
175 * List of fpm_mac_info structures to be processed
176 */
177 TAILQ_HEAD(zfpm_mac_q, fpm_mac_info_t) mac_q;
178
179 /*
180 * Hash table of fpm_mac_info_t entries
181 *
182 * While adding fpm_mac_info_t for a MAC to the mac_q,
183 * it is possible that another fpm_mac_info_t node for the this MAC
184 * is already present in the queue.
185 * This is possible in the case of consecutive add->delete operations.
186 * To avoid such duplicate insertions in the mac_q,
187 * define a hash table for fpm_mac_info_t which can be looked up
188 * to see if an fpm_mac_info_t node for a MAC is already present
189 * in the mac_q.
190 */
191 struct hash *fpm_mac_info_table;
192
193 /*
194 * Stream socket to the FPM.
195 */
196 int sock;
197
198 /*
199 * Buffers for messages to/from the FPM.
200 */
201 struct stream *obuf;
202 struct stream *ibuf;
203
204 /*
205 * Threads for I/O.
206 */
207 struct event *t_connect;
208 struct event *t_write;
209 struct event *t_read;
210
211 /*
212 * Thread to clean up after the TCP connection to the FPM goes down
213 * and the state that belongs to it.
214 */
215 struct event *t_conn_down;
216
217 struct {
218 struct zfpm_rnodes_iter iter;
219 } t_conn_down_state;
220
221 /*
222 * Thread to take actions once the TCP conn to the FPM comes up, and
223 * the state that belongs to it.
224 */
225 struct event *t_conn_up;
226
227 struct {
228 struct zfpm_rnodes_iter iter;
229 } t_conn_up_state;
230
231 unsigned long connect_calls;
232 time_t last_connect_call_time;
233
234 /*
235 * Stats from the start of the current statistics interval up to
236 * now. These are the counters we typically update in the code.
237 */
238 struct zfpm_stats stats;
239
240 /*
241 * Statistics that were gathered in the last collection interval.
242 */
243 struct zfpm_stats last_ivl_stats;
244
245 /*
246 * Cumulative stats from the last clear to the start of the current
247 * statistics interval.
248 */
249 struct zfpm_stats cumulative_stats;
250
251 /*
252 * Stats interval timer.
253 */
254 struct event *t_stats;
255
256 /*
257 * If non-zero, the last time when statistics were cleared.
258 */
259 time_t last_stats_clear_time;
260
261 /*
262 * Flag to track the MAC dump status to FPM
263 */
264 bool fpm_mac_dump_done;
265 };
266
267 static struct zfpm_glob zfpm_glob_space;
268 static struct zfpm_glob *zfpm_g = &zfpm_glob_space;
269
270 static int zfpm_trigger_update(struct route_node *rn, const char *reason);
271
272 static void zfpm_read_cb(struct event *thread);
273 static void zfpm_write_cb(struct event *thread);
274
275 static void zfpm_set_state(enum zfpm_state state, const char *reason);
276 static void zfpm_start_connect_timer(const char *reason);
277 static void zfpm_start_stats_timer(void);
278 static void zfpm_mac_info_del(struct fpm_mac_info_t *fpm_mac);
279
280 static const char ipv4_ll_buf[16] = "169.254.0.1";
281 union g_addr ipv4ll_gateway;
282
283 /*
284 * zfpm_thread_should_yield
285 */
286 static inline int zfpm_thread_should_yield(struct event *t)
287 {
288 return event_should_yield(t);
289 }
290
291 /*
292 * zfpm_state_to_str
293 */
294 static const char *zfpm_state_to_str(enum zfpm_state state)
295 {
296 switch (state) {
297
298 case ZFPM_STATE_IDLE:
299 return "idle";
300
301 case ZFPM_STATE_ACTIVE:
302 return "active";
303
304 case ZFPM_STATE_CONNECTING:
305 return "connecting";
306
307 case ZFPM_STATE_ESTABLISHED:
308 return "established";
309
310 default:
311 return "unknown";
312 }
313 }
314
315 /*
316 * zfpm_get_elapsed_time
317 *
318 * Returns the time elapsed (in seconds) since the given time.
319 */
320 static time_t zfpm_get_elapsed_time(time_t reference)
321 {
322 time_t now;
323
324 now = monotime(NULL);
325
326 if (now < reference) {
327 assert(0);
328 return 0;
329 }
330
331 return now - reference;
332 }
333
334 /*
335 * zfpm_rnodes_iter_init
336 */
337 static inline void zfpm_rnodes_iter_init(struct zfpm_rnodes_iter *iter)
338 {
339 memset(iter, 0, sizeof(*iter));
340 rib_tables_iter_init(&iter->tables_iter);
341
342 /*
343 * This is a hack, but it makes implementing 'next' easier by
344 * ensuring that route_table_iter_next() will return NULL the first
345 * time we call it.
346 */
347 route_table_iter_init(&iter->iter, NULL);
348 route_table_iter_cleanup(&iter->iter);
349 }
350
351 /*
352 * zfpm_rnodes_iter_next
353 */
354 static inline struct route_node *
355 zfpm_rnodes_iter_next(struct zfpm_rnodes_iter *iter)
356 {
357 struct route_node *rn;
358 struct route_table *table;
359
360 while (1) {
361 rn = route_table_iter_next(&iter->iter);
362 if (rn)
363 return rn;
364
365 /*
366 * We've made our way through this table, go to the next one.
367 */
368 route_table_iter_cleanup(&iter->iter);
369
370 table = rib_tables_iter_next(&iter->tables_iter);
371
372 if (!table)
373 return NULL;
374
375 route_table_iter_init(&iter->iter, table);
376 }
377
378 return NULL;
379 }
380
381 /*
382 * zfpm_rnodes_iter_pause
383 */
384 static inline void zfpm_rnodes_iter_pause(struct zfpm_rnodes_iter *iter)
385 {
386 route_table_iter_pause(&iter->iter);
387 }
388
389 /*
390 * zfpm_rnodes_iter_cleanup
391 */
392 static inline void zfpm_rnodes_iter_cleanup(struct zfpm_rnodes_iter *iter)
393 {
394 route_table_iter_cleanup(&iter->iter);
395 rib_tables_iter_cleanup(&iter->tables_iter);
396 }
397
398 /*
399 * zfpm_stats_init
400 *
401 * Initialize a statistics block.
402 */
403 static inline void zfpm_stats_init(struct zfpm_stats *stats)
404 {
405 memset(stats, 0, sizeof(*stats));
406 }
407
408 /*
409 * zfpm_stats_reset
410 */
411 static inline void zfpm_stats_reset(struct zfpm_stats *stats)
412 {
413 zfpm_stats_init(stats);
414 }
415
416 /*
417 * zfpm_stats_copy
418 */
419 static inline void zfpm_stats_copy(const struct zfpm_stats *src,
420 struct zfpm_stats *dest)
421 {
422 memcpy(dest, src, sizeof(*dest));
423 }
424
425 /*
426 * zfpm_stats_compose
427 *
428 * Total up the statistics in two stats structures ('s1 and 's2') and
429 * return the result in the third argument, 'result'. Note that the
430 * pointer 'result' may be the same as 's1' or 's2'.
431 *
432 * For simplicity, the implementation below assumes that the stats
433 * structure is composed entirely of counters. This can easily be
434 * changed when necessary.
435 */
436 static void zfpm_stats_compose(const struct zfpm_stats *s1,
437 const struct zfpm_stats *s2,
438 struct zfpm_stats *result)
439 {
440 const unsigned long *p1, *p2;
441 unsigned long *result_p;
442 int i, num_counters;
443
444 p1 = (const unsigned long *)s1;
445 p2 = (const unsigned long *)s2;
446 result_p = (unsigned long *)result;
447
448 num_counters = (sizeof(struct zfpm_stats) / sizeof(unsigned long));
449
450 for (i = 0; i < num_counters; i++) {
451 result_p[i] = p1[i] + p2[i];
452 }
453 }
454
455 /*
456 * zfpm_read_on
457 */
458 static inline void zfpm_read_on(void)
459 {
460 assert(!zfpm_g->t_read);
461 assert(zfpm_g->sock >= 0);
462
463 event_add_read(zfpm_g->master, zfpm_read_cb, 0, zfpm_g->sock,
464 &zfpm_g->t_read);
465 }
466
467 /*
468 * zfpm_write_on
469 */
470 static inline void zfpm_write_on(void)
471 {
472 assert(!zfpm_g->t_write);
473 assert(zfpm_g->sock >= 0);
474
475 event_add_write(zfpm_g->master, zfpm_write_cb, 0, zfpm_g->sock,
476 &zfpm_g->t_write);
477 }
478
479 /*
480 * zfpm_read_off
481 */
482 static inline void zfpm_read_off(void)
483 {
484 EVENT_OFF(zfpm_g->t_read);
485 }
486
487 /*
488 * zfpm_write_off
489 */
490 static inline void zfpm_write_off(void)
491 {
492 EVENT_OFF(zfpm_g->t_write);
493 }
494
/* Cancel the pending connect-retry timer, if any. */
static inline void zfpm_connect_off(void)
{
	EVENT_OFF(zfpm_g->t_connect);
}
499
/* Cancel the pending connection-down cleanup event, if any. */
static inline void zfpm_conn_down_off(void)
{
	EVENT_OFF(zfpm_g->t_conn_down);
}
504
505 /*
506 * zfpm_conn_up_thread_cb
507 *
508 * Callback for actions to be taken when the connection to the FPM
509 * comes up.
510 */
511 static void zfpm_conn_up_thread_cb(struct event *thread)
512 {
513 struct route_node *rnode;
514 struct zfpm_rnodes_iter *iter;
515 rib_dest_t *dest;
516
517 iter = &zfpm_g->t_conn_up_state.iter;
518
519 if (zfpm_g->state != ZFPM_STATE_ESTABLISHED) {
520 zfpm_debug(
521 "Connection not up anymore, conn_up thread aborting");
522 zfpm_g->stats.t_conn_up_aborts++;
523 goto done;
524 }
525
526 if (!zfpm_g->fpm_mac_dump_done) {
527 /* Enqueue FPM updates for all the RMAC entries */
528 hash_iterate(zrouter.l3vni_table, zfpm_iterate_rmac_table,
529 NULL);
530 /* mark dump done so that its not repeated after yield */
531 zfpm_g->fpm_mac_dump_done = true;
532 }
533
534 while ((rnode = zfpm_rnodes_iter_next(iter))) {
535 dest = rib_dest_from_rnode(rnode);
536
537 if (dest) {
538 zfpm_g->stats.t_conn_up_dests_processed++;
539 zfpm_trigger_update(rnode, NULL);
540 }
541
542 /*
543 * Yield if need be.
544 */
545 if (!zfpm_thread_should_yield(thread))
546 continue;
547
548 zfpm_g->stats.t_conn_up_yields++;
549 zfpm_rnodes_iter_pause(iter);
550 event_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb,
551 NULL, 0, &zfpm_g->t_conn_up);
552 return;
553 }
554
555 zfpm_g->stats.t_conn_up_finishes++;
556
557 done:
558 zfpm_rnodes_iter_cleanup(iter);
559 }
560
561 /*
562 * zfpm_connection_up
563 *
564 * Called when the connection to the FPM comes up.
565 */
566 static void zfpm_connection_up(const char *detail)
567 {
568 assert(zfpm_g->sock >= 0);
569 zfpm_read_on();
570 zfpm_write_on();
571 zfpm_set_state(ZFPM_STATE_ESTABLISHED, detail);
572
573 /*
574 * Start thread to push existing routes to the FPM.
575 */
576 EVENT_OFF(zfpm_g->t_conn_up);
577
578 zfpm_rnodes_iter_init(&zfpm_g->t_conn_up_state.iter);
579 zfpm_g->fpm_mac_dump_done = false;
580
581 zfpm_debug("Starting conn_up thread");
582
583 event_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0,
584 &zfpm_g->t_conn_up);
585 zfpm_g->stats.t_conn_up_starts++;
586 }
587
588 /*
589 * zfpm_connect_check
590 *
591 * Check if an asynchronous connect() to the FPM is complete.
592 */
593 static void zfpm_connect_check(void)
594 {
595 int status;
596 socklen_t slen;
597 int ret;
598
599 zfpm_read_off();
600 zfpm_write_off();
601
602 slen = sizeof(status);
603 ret = getsockopt(zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *)&status,
604 &slen);
605
606 if (ret >= 0 && status == 0) {
607 zfpm_connection_up("async connect complete");
608 return;
609 }
610
611 /*
612 * getsockopt() failed or indicated an error on the socket.
613 */
614 close(zfpm_g->sock);
615 zfpm_g->sock = -1;
616
617 zfpm_start_connect_timer("getsockopt() after async connect failed");
618 return;
619 }
620
621 /*
622 * zfpm_conn_down_thread_cb
623 *
624 * Callback that is invoked to clean up state after the TCP connection
625 * to the FPM goes down.
626 */
627 static void zfpm_conn_down_thread_cb(struct event *thread)
628 {
629 struct route_node *rnode;
630 struct zfpm_rnodes_iter *iter;
631 rib_dest_t *dest;
632 struct fpm_mac_info_t *mac = NULL;
633
634 assert(zfpm_g->state == ZFPM_STATE_IDLE);
635
636 /*
637 * Delink and free all fpm_mac_info_t nodes
638 * in the mac_q and fpm_mac_info_hash
639 */
640 while ((mac = TAILQ_FIRST(&zfpm_g->mac_q)) != NULL)
641 zfpm_mac_info_del(mac);
642
643 iter = &zfpm_g->t_conn_down_state.iter;
644
645 while ((rnode = zfpm_rnodes_iter_next(iter))) {
646 dest = rib_dest_from_rnode(rnode);
647
648 if (dest) {
649 if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
650 TAILQ_REMOVE(&zfpm_g->dest_q, dest,
651 fpm_q_entries);
652 }
653
654 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
655 UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
656
657 zfpm_g->stats.t_conn_down_dests_processed++;
658
659 /*
660 * Check if the dest should be deleted.
661 */
662 rib_gc_dest(rnode);
663 }
664
665 /*
666 * Yield if need be.
667 */
668 if (!zfpm_thread_should_yield(thread))
669 continue;
670
671 zfpm_g->stats.t_conn_down_yields++;
672 zfpm_rnodes_iter_pause(iter);
673 event_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb,
674 NULL, 0, &zfpm_g->t_conn_down);
675 return;
676 }
677
678 zfpm_g->stats.t_conn_down_finishes++;
679 zfpm_rnodes_iter_cleanup(iter);
680
681 /*
682 * Start the process of connecting to the FPM again.
683 */
684 zfpm_start_connect_timer("cleanup complete");
685 }
686
687 /*
688 * zfpm_connection_down
689 *
690 * Called when the connection to the FPM has gone down.
691 */
692 static void zfpm_connection_down(const char *detail)
693 {
694 if (!detail)
695 detail = "unknown";
696
697 assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
698
699 zlog_info("connection to the FPM has gone down: %s", detail);
700
701 zfpm_read_off();
702 zfpm_write_off();
703
704 stream_reset(zfpm_g->ibuf);
705 stream_reset(zfpm_g->obuf);
706
707 if (zfpm_g->sock >= 0) {
708 close(zfpm_g->sock);
709 zfpm_g->sock = -1;
710 }
711
712 /*
713 * Start thread to clean up state after the connection goes down.
714 */
715 assert(!zfpm_g->t_conn_down);
716 zfpm_rnodes_iter_init(&zfpm_g->t_conn_down_state.iter);
717 zfpm_conn_down_off();
718 event_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0,
719 &zfpm_g->t_conn_down);
720 zfpm_g->stats.t_conn_down_starts++;
721
722 zfpm_set_state(ZFPM_STATE_IDLE, detail);
723 }
724
725 /*
726 * zfpm_read_cb
727 */
728 static void zfpm_read_cb(struct event *thread)
729 {
730 size_t already;
731 struct stream *ibuf;
732 uint16_t msg_len;
733 fpm_msg_hdr_t *hdr;
734
735 zfpm_g->stats.read_cb_calls++;
736
737 /*
738 * Check if async connect is now done.
739 */
740 if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
741 zfpm_connect_check();
742 return;
743 }
744
745 assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
746 assert(zfpm_g->sock >= 0);
747
748 ibuf = zfpm_g->ibuf;
749
750 already = stream_get_endp(ibuf);
751 if (already < FPM_MSG_HDR_LEN) {
752 ssize_t nbyte;
753
754 nbyte = stream_read_try(ibuf, zfpm_g->sock,
755 FPM_MSG_HDR_LEN - already);
756 if (nbyte == 0 || nbyte == -1) {
757 if (nbyte == -1) {
758 char buffer[1024];
759
760 snprintf(buffer, sizeof(buffer),
761 "closed socket in read(%d): %s", errno,
762 safe_strerror(errno));
763 zfpm_connection_down(buffer);
764 } else
765 zfpm_connection_down("closed socket in read");
766 return;
767 }
768
769 if (nbyte != (ssize_t)(FPM_MSG_HDR_LEN - already))
770 goto done;
771
772 already = FPM_MSG_HDR_LEN;
773 }
774
775 stream_set_getp(ibuf, 0);
776
777 hdr = (fpm_msg_hdr_t *)stream_pnt(ibuf);
778
779 if (!fpm_msg_hdr_ok(hdr)) {
780 zfpm_connection_down("invalid message header");
781 return;
782 }
783
784 msg_len = fpm_msg_len(hdr);
785
786 /*
787 * Read out the rest of the packet.
788 */
789 if (already < msg_len) {
790 ssize_t nbyte;
791
792 nbyte = stream_read_try(ibuf, zfpm_g->sock, msg_len - already);
793
794 if (nbyte == 0 || nbyte == -1) {
795 if (nbyte == -1) {
796 char buffer[1024];
797
798 snprintf(buffer, sizeof(buffer),
799 "failed to read message(%d) %s", errno,
800 safe_strerror(errno));
801 zfpm_connection_down(buffer);
802 } else
803 zfpm_connection_down("failed to read message");
804 return;
805 }
806
807 if (nbyte != (ssize_t)(msg_len - already))
808 goto done;
809 }
810
811 /*
812 * Just throw it away for now.
813 */
814 stream_reset(ibuf);
815
816 done:
817 zfpm_read_on();
818 }
819
820 static bool zfpm_updates_pending(void)
821 {
822 if (!(TAILQ_EMPTY(&zfpm_g->dest_q)) || !(TAILQ_EMPTY(&zfpm_g->mac_q)))
823 return true;
824
825 return false;
826 }
827
828 /*
829 * zfpm_writes_pending
830 *
831 * Returns true if we may have something to write to the FPM.
832 */
833 static int zfpm_writes_pending(void)
834 {
835
836 /*
837 * Check if there is any data in the outbound buffer that has not
838 * been written to the socket yet.
839 */
840 if (stream_get_endp(zfpm_g->obuf) - stream_get_getp(zfpm_g->obuf))
841 return 1;
842
843 /*
844 * Check if there are any updates scheduled on the outbound queues.
845 */
846 if (zfpm_updates_pending())
847 return 1;
848
849 return 0;
850 }
851
852 /*
853 * zfpm_encode_route
854 *
855 * Encode a message to the FPM with information about the given route.
856 *
857 * Returns the number of bytes written to the buffer. 0 or a negative
858 * value indicates an error.
859 */
860 static inline int zfpm_encode_route(rib_dest_t *dest, struct route_entry *re,
861 char *in_buf, size_t in_buf_len,
862 fpm_msg_type_e *msg_type)
863 {
864 size_t len;
865 #ifdef HAVE_NETLINK
866 int cmd;
867 #endif
868 len = 0;
869
870 *msg_type = FPM_MSG_TYPE_NONE;
871
872 switch (zfpm_g->message_format) {
873
874 case ZFPM_MSG_FORMAT_PROTOBUF:
875 #ifdef HAVE_PROTOBUF
876 len = zfpm_protobuf_encode_route(dest, re, (uint8_t *)in_buf,
877 in_buf_len);
878 *msg_type = FPM_MSG_TYPE_PROTOBUF;
879 #endif
880 break;
881
882 case ZFPM_MSG_FORMAT_NETLINK:
883 #ifdef HAVE_NETLINK
884 *msg_type = FPM_MSG_TYPE_NETLINK;
885 cmd = re ? RTM_NEWROUTE : RTM_DELROUTE;
886 len = zfpm_netlink_encode_route(cmd, dest, re, in_buf,
887 in_buf_len);
888 assert(fpm_msg_align(len) == len);
889 #endif /* HAVE_NETLINK */
890 break;
891
892 case ZFPM_MSG_FORMAT_NONE:
893 break;
894 }
895
896 return len;
897 }
898
899 /*
900 * zfpm_route_for_update
901 *
902 * Returns the re that is to be sent to the FPM for a given dest.
903 */
904 struct route_entry *zfpm_route_for_update(rib_dest_t *dest)
905 {
906 return dest->selected_fib;
907 }
908
909 /*
910 * Define an enum for return codes for queue processing functions
911 *
912 * FPM_WRITE_STOP: This return code indicates that the write buffer is full.
913 * Stop processing all the queues and empty the buffer by writing its content
914 * to the socket.
915 *
916 * FPM_GOTO_NEXT_Q: This return code indicates that either this queue is
917 * empty or we have processed enough updates from this queue.
918 * So, move on to the next queue.
919 */
920 enum {
921 FPM_WRITE_STOP = 0,
922 FPM_GOTO_NEXT_Q = 1
923 };
924
925 #define FPM_QUEUE_PROCESS_LIMIT 10000
926
927 /*
928 * zfpm_build_route_updates
929 *
930 * Process the dest_q queue and write FPM messages to the outbound buffer.
931 */
932 static int zfpm_build_route_updates(void)
933 {
934 struct stream *s;
935 rib_dest_t *dest;
936 unsigned char *buf, *data, *buf_end;
937 size_t msg_len;
938 size_t data_len;
939 fpm_msg_hdr_t *hdr;
940 struct route_entry *re;
941 int is_add, write_msg;
942 fpm_msg_type_e msg_type;
943 uint16_t q_limit;
944
945 if (TAILQ_EMPTY(&zfpm_g->dest_q))
946 return FPM_GOTO_NEXT_Q;
947
948 s = zfpm_g->obuf;
949 q_limit = FPM_QUEUE_PROCESS_LIMIT;
950
951 do {
952 /*
953 * Make sure there is enough space to write another message.
954 */
955 if (STREAM_WRITEABLE(s) < FPM_MAX_MSG_LEN)
956 return FPM_WRITE_STOP;
957
958 buf = STREAM_DATA(s) + stream_get_endp(s);
959 buf_end = buf + STREAM_WRITEABLE(s);
960
961 dest = TAILQ_FIRST(&zfpm_g->dest_q);
962 if (!dest)
963 return FPM_GOTO_NEXT_Q;
964
965 assert(CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM));
966
967 hdr = (fpm_msg_hdr_t *)buf;
968 hdr->version = FPM_PROTO_VERSION;
969
970 data = fpm_msg_data(hdr);
971
972 re = zfpm_route_for_update(dest);
973 is_add = re ? 1 : 0;
974
975 write_msg = 1;
976
977 /*
978 * If this is a route deletion, and we have not sent the route
979 * to
980 * the FPM previously, skip it.
981 */
982 if (!is_add && !CHECK_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM)) {
983 write_msg = 0;
984 zfpm_g->stats.nop_deletes_skipped++;
985 }
986
987 if (write_msg) {
988 data_len = zfpm_encode_route(dest, re, (char *)data,
989 buf_end - data, &msg_type);
990
991 if (data_len) {
992 hdr->msg_type = msg_type;
993 msg_len = fpm_data_len_to_msg_len(data_len);
994 hdr->msg_len = htons(msg_len);
995 stream_forward_endp(s, msg_len);
996
997 if (is_add)
998 zfpm_g->stats.route_adds++;
999 else
1000 zfpm_g->stats.route_dels++;
1001 } else {
1002 zlog_err("%s: Encoding Prefix: %pRN No valid nexthops",
1003 __func__, dest->rnode);
1004 }
1005 }
1006
1007 /*
1008 * Remove the dest from the queue, and reset the flag.
1009 */
1010 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1011 TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);
1012
1013 if (is_add) {
1014 SET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
1015 } else {
1016 UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
1017 }
1018
1019 /*
1020 * Delete the destination if necessary.
1021 */
1022 if (rib_gc_dest(dest->rnode))
1023 zfpm_g->stats.dests_del_after_update++;
1024
1025 q_limit--;
1026 if (q_limit == 0) {
1027 /*
1028 * We have processed enough updates in this queue.
1029 * Now yield for other queues.
1030 */
1031 return FPM_GOTO_NEXT_Q;
1032 }
1033 } while (true);
1034 }
1035
1036 /*
1037 * zfpm_encode_mac
1038 *
1039 * Encode a message to FPM with information about the given MAC.
1040 *
1041 * Returns the number of bytes written to the buffer.
1042 */
1043 static inline int zfpm_encode_mac(struct fpm_mac_info_t *mac, char *in_buf,
1044 size_t in_buf_len, fpm_msg_type_e *msg_type)
1045 {
1046 size_t len = 0;
1047
1048 *msg_type = FPM_MSG_TYPE_NONE;
1049
1050 switch (zfpm_g->message_format) {
1051
1052 case ZFPM_MSG_FORMAT_NONE:
1053 break;
1054 case ZFPM_MSG_FORMAT_NETLINK:
1055 #ifdef HAVE_NETLINK
1056 len = zfpm_netlink_encode_mac(mac, in_buf, in_buf_len);
1057 assert(fpm_msg_align(len) == len);
1058 *msg_type = FPM_MSG_TYPE_NETLINK;
1059 #endif /* HAVE_NETLINK */
1060 break;
1061 case ZFPM_MSG_FORMAT_PROTOBUF:
1062 break;
1063 }
1064 return len;
1065 }
1066
/*
 * zfpm_build_mac_updates
 *
 * Process the mac_q queue and write FPM MAC messages to the outbound
 * buffer.  Returns FPM_WRITE_STOP when the buffer cannot hold another
 * message, or FPM_GOTO_NEXT_Q when the queue is drained or the
 * per-queue processing limit is reached.
 */
static int zfpm_build_mac_updates(void)
{
	struct stream *s;
	struct fpm_mac_info_t *mac;
	unsigned char *buf, *data, *buf_end;
	fpm_msg_hdr_t *hdr;
	size_t data_len, msg_len;
	fpm_msg_type_e msg_type;
	uint16_t q_limit;

	if (TAILQ_EMPTY(&zfpm_g->mac_q))
		return FPM_GOTO_NEXT_Q;

	s = zfpm_g->obuf;
	q_limit = FPM_QUEUE_PROCESS_LIMIT;

	do {
		/* Make sure there is enough space to write another message. */
		if (STREAM_WRITEABLE(s) < FPM_MAX_MAC_MSG_LEN)
			return FPM_WRITE_STOP;

		/* Encode directly into the stream's free space. */
		buf = STREAM_DATA(s) + stream_get_endp(s);
		buf_end = buf + STREAM_WRITEABLE(s);

		mac = TAILQ_FIRST(&zfpm_g->mac_q);
		if (!mac)
			return FPM_GOTO_NEXT_Q;

		/* Check for no-op */
		if (!CHECK_FLAG(mac->fpm_flags, ZEBRA_MAC_UPDATE_FPM)) {
			zfpm_g->stats.nop_deletes_skipped++;
			zfpm_mac_info_del(mac);
			continue;
		}

		hdr = (fpm_msg_hdr_t *)buf;
		hdr->version = FPM_PROTO_VERSION;

		data = fpm_msg_data(hdr);
		data_len = zfpm_encode_mac(mac, (char *)data, buf_end - data,
					   &msg_type);
		assert(data_len);

		hdr->msg_type = msg_type;
		msg_len = fpm_data_len_to_msg_len(data_len);
		hdr->msg_len = htons(msg_len);
		/* Commit the encoded message to the stream. */
		stream_forward_endp(s, msg_len);

		/* Remove the MAC from the queue, and delete it. */
		zfpm_mac_info_del(mac);

		q_limit--;
		if (q_limit == 0) {
			/*
			 * We have processed enough updates in this queue.
			 * Now yield for other queues.
			 */
			return FPM_GOTO_NEXT_Q;
		}
	} while (1);
}
1128
1129 /*
1130 * zfpm_build_updates
1131 *
1132 * Process the outgoing queues and write messages to the outbound
1133 * buffer.
1134 */
1135 static void zfpm_build_updates(void)
1136 {
1137 struct stream *s;
1138
1139 s = zfpm_g->obuf;
1140 assert(stream_empty(s));
1141
1142 do {
1143 /*
1144 * Stop processing the queues if zfpm_g->obuf is full
1145 * or we do not have more updates to process
1146 */
1147 if (zfpm_build_mac_updates() == FPM_WRITE_STOP)
1148 break;
1149 if (zfpm_build_route_updates() == FPM_WRITE_STOP)
1150 break;
1151 } while (zfpm_updates_pending());
1152 }
1153
1154 /*
1155 * zfpm_write_cb
1156 */
1157 static void zfpm_write_cb(struct event *thread)
1158 {
1159 struct stream *s;
1160 int num_writes;
1161
1162 zfpm_g->stats.write_cb_calls++;
1163
1164 /*
1165 * Check if async connect is now done.
1166 */
1167 if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
1168 zfpm_connect_check();
1169 return;
1170 }
1171
1172 assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
1173 assert(zfpm_g->sock >= 0);
1174
1175 num_writes = 0;
1176
1177 do {
1178 int bytes_to_write, bytes_written;
1179
1180 s = zfpm_g->obuf;
1181
1182 /*
1183 * If the stream is empty, try fill it up with data.
1184 */
1185 if (stream_empty(s)) {
1186 zfpm_build_updates();
1187 }
1188
1189 bytes_to_write = stream_get_endp(s) - stream_get_getp(s);
1190 if (!bytes_to_write)
1191 break;
1192
1193 bytes_written =
1194 write(zfpm_g->sock, stream_pnt(s), bytes_to_write);
1195 zfpm_g->stats.write_calls++;
1196 num_writes++;
1197
1198 if (bytes_written < 0) {
1199 if (ERRNO_IO_RETRY(errno))
1200 break;
1201
1202 zfpm_connection_down("failed to write to socket");
1203 return;
1204 }
1205
1206 if (bytes_written != bytes_to_write) {
1207
1208 /*
1209 * Partial write.
1210 */
1211 stream_forward_getp(s, bytes_written);
1212 zfpm_g->stats.partial_writes++;
1213 break;
1214 }
1215
1216 /*
1217 * We've written out the entire contents of the stream.
1218 */
1219 stream_reset(s);
1220
1221 if (num_writes >= ZFPM_MAX_WRITES_PER_RUN) {
1222 zfpm_g->stats.max_writes_hit++;
1223 break;
1224 }
1225
1226 if (zfpm_thread_should_yield(thread)) {
1227 zfpm_g->stats.t_write_yields++;
1228 break;
1229 }
1230 } while (1);
1231
1232 if (zfpm_writes_pending())
1233 zfpm_write_on();
1234 }
1235
1236 /*
1237 * zfpm_connect_cb
1238 */
1239 static void zfpm_connect_cb(struct event *t)
1240 {
1241 int sock, ret;
1242 struct sockaddr_in serv;
1243
1244 assert(zfpm_g->state == ZFPM_STATE_ACTIVE);
1245
1246 sock = socket(AF_INET, SOCK_STREAM, 0);
1247 if (sock < 0) {
1248 zlog_err("Failed to create socket for connect(): %s",
1249 strerror(errno));
1250 zfpm_g->stats.connect_no_sock++;
1251 return;
1252 }
1253
1254 set_nonblocking(sock);
1255
1256 /* Make server socket. */
1257 memset(&serv, 0, sizeof(serv));
1258 serv.sin_family = AF_INET;
1259 serv.sin_port = htons(zfpm_g->fpm_port);
1260 #ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
1261 serv.sin_len = sizeof(struct sockaddr_in);
1262 #endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
1263 if (!zfpm_g->fpm_server)
1264 serv.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1265 else
1266 serv.sin_addr.s_addr = (zfpm_g->fpm_server);
1267
1268 /*
1269 * Connect to the FPM.
1270 */
1271 zfpm_g->connect_calls++;
1272 zfpm_g->stats.connect_calls++;
1273 zfpm_g->last_connect_call_time = monotime(NULL);
1274
1275 ret = connect(sock, (struct sockaddr *)&serv, sizeof(serv));
1276 if (ret >= 0) {
1277 zfpm_g->sock = sock;
1278 zfpm_connection_up("connect succeeded");
1279 return;
1280 }
1281
1282 if (errno == EINPROGRESS) {
1283 zfpm_g->sock = sock;
1284 zfpm_read_on();
1285 zfpm_write_on();
1286 zfpm_set_state(ZFPM_STATE_CONNECTING,
1287 "async connect in progress");
1288 return;
1289 }
1290
1291 zlog_info("can't connect to FPM %d: %s", sock, safe_strerror(errno));
1292 close(sock);
1293
1294 /*
1295 * Restart timer for retrying connection.
1296 */
1297 zfpm_start_connect_timer("connect() failed");
1298 }
1299
1300 /*
1301 * zfpm_set_state
1302 *
1303 * Move state machine into the given state.
1304 */
1305 static void zfpm_set_state(enum zfpm_state state, const char *reason)
1306 {
1307 enum zfpm_state cur_state = zfpm_g->state;
1308
1309 if (!reason)
1310 reason = "Unknown";
1311
1312 if (state == cur_state)
1313 return;
1314
1315 zfpm_debug("beginning state transition %s -> %s. Reason: %s",
1316 zfpm_state_to_str(cur_state), zfpm_state_to_str(state),
1317 reason);
1318
1319 switch (state) {
1320
1321 case ZFPM_STATE_IDLE:
1322 assert(cur_state == ZFPM_STATE_ESTABLISHED);
1323 break;
1324
1325 case ZFPM_STATE_ACTIVE:
1326 assert(cur_state == ZFPM_STATE_IDLE
1327 || cur_state == ZFPM_STATE_CONNECTING);
1328 assert(zfpm_g->t_connect);
1329 break;
1330
1331 case ZFPM_STATE_CONNECTING:
1332 assert(zfpm_g->sock);
1333 assert(cur_state == ZFPM_STATE_ACTIVE);
1334 assert(zfpm_g->t_read);
1335 assert(zfpm_g->t_write);
1336 break;
1337
1338 case ZFPM_STATE_ESTABLISHED:
1339 assert(cur_state == ZFPM_STATE_ACTIVE
1340 || cur_state == ZFPM_STATE_CONNECTING);
1341 assert(zfpm_g->sock);
1342 assert(zfpm_g->t_read);
1343 assert(zfpm_g->t_write);
1344 break;
1345 }
1346
1347 zfpm_g->state = state;
1348 }
1349
1350 /*
1351 * zfpm_calc_connect_delay
1352 *
1353 * Returns the number of seconds after which we should attempt to
1354 * reconnect to the FPM.
1355 */
1356 static long zfpm_calc_connect_delay(void)
1357 {
1358 time_t elapsed;
1359
1360 /*
1361 * Return 0 if this is our first attempt to connect.
1362 */
1363 if (zfpm_g->connect_calls == 0) {
1364 return 0;
1365 }
1366
1367 elapsed = zfpm_get_elapsed_time(zfpm_g->last_connect_call_time);
1368
1369 if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
1370 return 0;
1371 }
1372
1373 return ZFPM_CONNECT_RETRY_IVL - elapsed;
1374 }
1375
/*
 * zfpm_start_connect_timer
 *
 * Arm the timer that will trigger the next connection attempt and move
 * the state machine to ACTIVE.
 */
static void zfpm_start_connect_timer(const char *reason)
{
	long delay_secs;

	/* There must be no pending connect event and no open socket. */
	assert(!zfpm_g->t_connect);
	assert(zfpm_g->sock < 0);

	assert(zfpm_g->state == ZFPM_STATE_IDLE
	       || zfpm_g->state == ZFPM_STATE_ACTIVE
	       || zfpm_g->state == ZFPM_STATE_CONNECTING);

	/* Delay is 0 on the first attempt, otherwise rate-limited. */
	delay_secs = zfpm_calc_connect_delay();
	zfpm_debug("scheduling connect in %ld seconds", delay_secs);

	event_add_timer(zfpm_g->master, zfpm_connect_cb, 0, delay_secs,
			&zfpm_g->t_connect);
	zfpm_set_state(ZFPM_STATE_ACTIVE, reason);
}
1397
1398 /*
1399 * zfpm_is_enabled
1400 *
1401 * Returns true if the zebra FPM module has been enabled.
1402 */
1403 static inline int zfpm_is_enabled(void)
1404 {
1405 return zfpm_g->enabled;
1406 }
1407
1408 /*
1409 * zfpm_conn_is_up
1410 *
1411 * Returns true if the connection to the FPM is up.
1412 */
1413 static inline int zfpm_conn_is_up(void)
1414 {
1415 if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
1416 return 0;
1417
1418 assert(zfpm_g->sock >= 0);
1419
1420 return 1;
1421 }
1422
1423 /*
1424 * zfpm_trigger_update
1425 *
1426 * The zebra code invokes this function to indicate that we should
1427 * send an update to the FPM about the given route_node.
1428 */
1429 static int zfpm_trigger_update(struct route_node *rn, const char *reason)
1430 {
1431 rib_dest_t *dest;
1432
1433 /*
1434 * Ignore if the connection is down. We will update the FPM about
1435 * all destinations once the connection comes up.
1436 */
1437 if (!zfpm_conn_is_up())
1438 return 0;
1439
1440 dest = rib_dest_from_rnode(rn);
1441
1442 if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
1443 zfpm_g->stats.redundant_triggers++;
1444 return 0;
1445 }
1446
1447 if (reason) {
1448 zfpm_debug("%pFX triggering update to FPM - Reason: %s", &rn->p,
1449 reason);
1450 }
1451
1452 SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1453 TAILQ_INSERT_TAIL(&zfpm_g->dest_q, dest, fpm_q_entries);
1454 zfpm_g->stats.updates_triggered++;
1455
1456 /*
1457 * Make sure that writes are enabled.
1458 */
1459 if (zfpm_g->t_write)
1460 return 0;
1461
1462 zfpm_write_on();
1463 return 0;
1464 }
1465
1466 /*
1467 * zfpm_trigger_remove
1468 *
1469 * The zebra code invokes this function to indicate that we should
1470 * send an remove to the FPM about the given route_node.
1471 */
1472
1473 static int zfpm_trigger_remove(struct route_node *rn)
1474 {
1475 rib_dest_t *dest;
1476
1477 if (!zfpm_conn_is_up())
1478 return 0;
1479
1480 dest = rib_dest_from_rnode(rn);
1481 if (!CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
1482 return 0;
1483
1484 zfpm_debug("%pRN Removing from update queue shutting down", rn);
1485
1486 UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
1487 TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);
1488
1489 return 0;
1490 }
1491
1492 /*
1493 * Generate Key for FPM MAC info hash entry
1494 */
1495 static unsigned int zfpm_mac_info_hash_keymake(const void *p)
1496 {
1497 struct fpm_mac_info_t *fpm_mac = (struct fpm_mac_info_t *)p;
1498 uint32_t mac_key;
1499
1500 mac_key = jhash(fpm_mac->macaddr.octet, ETH_ALEN, 0xa5a5a55a);
1501
1502 return jhash_2words(mac_key, fpm_mac->vni, 0);
1503 }
1504
1505 /*
1506 * Compare function for FPM MAC info hash lookup
1507 */
1508 static bool zfpm_mac_info_cmp(const void *p1, const void *p2)
1509 {
1510 const struct fpm_mac_info_t *fpm_mac1 = p1;
1511 const struct fpm_mac_info_t *fpm_mac2 = p2;
1512
1513 if (memcmp(fpm_mac1->macaddr.octet, fpm_mac2->macaddr.octet, ETH_ALEN)
1514 != 0)
1515 return false;
1516 if (fpm_mac1->vni != fpm_mac2->vni)
1517 return false;
1518
1519 return true;
1520 }
1521
1522 /*
1523 * Lookup FPM MAC info hash entry.
1524 */
1525 static struct fpm_mac_info_t *zfpm_mac_info_lookup(struct fpm_mac_info_t *key)
1526 {
1527 return hash_lookup(zfpm_g->fpm_mac_info_table, key);
1528 }
1529
1530 /*
1531 * Callback to allocate fpm_mac_info_t structure.
1532 */
1533 static void *zfpm_mac_info_alloc(void *p)
1534 {
1535 const struct fpm_mac_info_t *key = p;
1536 struct fpm_mac_info_t *fpm_mac;
1537
1538 fpm_mac = XCALLOC(MTYPE_FPM_MAC_INFO, sizeof(struct fpm_mac_info_t));
1539
1540 memcpy(&fpm_mac->macaddr, &key->macaddr, ETH_ALEN);
1541 fpm_mac->vni = key->vni;
1542
1543 return (void *)fpm_mac;
1544 }
1545
/*
 * Delink and free fpm_mac_info_t.
 */
static void zfpm_mac_info_del(struct fpm_mac_info_t *fpm_mac)
{
	/* Drop from the hash table first, then from the pending queue. */
	hash_release(zfpm_g->fpm_mac_info_table, fpm_mac);
	TAILQ_REMOVE(&zfpm_g->mac_q, fpm_mac, fpm_mac_q_entries);
	/* The entry is owned solely by this module; safe to free now. */
	XFREE(MTYPE_FPM_MAC_INFO, fpm_mac);
}
1555
1556 /*
1557 * zfpm_trigger_rmac_update
1558 *
1559 * Zebra code invokes this function to indicate that we should
1560 * send an update to FPM for given MAC entry.
1561 *
1562 * This function checks if we already have enqueued an update for this RMAC,
1563 * If yes, update the same fpm_mac_info_t. Else, create and enqueue an update.
1564 */
1565 static int zfpm_trigger_rmac_update(struct zebra_mac *rmac,
1566 struct zebra_l3vni *zl3vni, bool delete,
1567 const char *reason)
1568 {
1569 struct fpm_mac_info_t *fpm_mac, key;
1570 struct interface *vxlan_if, *svi_if;
1571 bool mac_found = false;
1572
1573 /*
1574 * Ignore if the connection is down. We will update the FPM about
1575 * all destinations once the connection comes up.
1576 */
1577 if (!zfpm_conn_is_up())
1578 return 0;
1579
1580 if (reason) {
1581 zfpm_debug("triggering update to FPM - Reason: %s - %pEA",
1582 reason, &rmac->macaddr);
1583 }
1584
1585 vxlan_if = zl3vni_map_to_vxlan_if(zl3vni);
1586 svi_if = zl3vni_map_to_svi_if(zl3vni);
1587
1588 memset(&key, 0, sizeof(key));
1589
1590 memcpy(&key.macaddr, &rmac->macaddr, ETH_ALEN);
1591 key.vni = zl3vni->vni;
1592
1593 /* Check if this MAC is already present in the queue. */
1594 fpm_mac = zfpm_mac_info_lookup(&key);
1595
1596 if (fpm_mac) {
1597 mac_found = true;
1598
1599 /*
1600 * If the enqueued op is "add" and current op is "delete",
1601 * this is a noop. So, Unset ZEBRA_MAC_UPDATE_FPM flag.
1602 * While processing FPM queue, we will silently delete this
1603 * MAC entry without sending any update for this MAC.
1604 */
1605 if (!CHECK_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_DELETE_FPM) &&
1606 delete == 1) {
1607 SET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_DELETE_FPM);
1608 UNSET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_UPDATE_FPM);
1609 return 0;
1610 }
1611 } else
1612 fpm_mac = hash_get(zfpm_g->fpm_mac_info_table, &key,
1613 zfpm_mac_info_alloc);
1614
1615 fpm_mac->r_vtep_ip.s_addr = rmac->fwd_info.r_vtep_ip.s_addr;
1616 fpm_mac->zebra_flags = rmac->flags;
1617 fpm_mac->vxlan_if = vxlan_if ? vxlan_if->ifindex : 0;
1618 fpm_mac->svi_if = svi_if ? svi_if->ifindex : 0;
1619
1620 SET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_UPDATE_FPM);
1621 if (delete)
1622 SET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_DELETE_FPM);
1623 else
1624 UNSET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_DELETE_FPM);
1625
1626 if (!mac_found)
1627 TAILQ_INSERT_TAIL(&zfpm_g->mac_q, fpm_mac, fpm_mac_q_entries);
1628
1629 zfpm_g->stats.updates_triggered++;
1630
1631 /* If writes are already enabled, return. */
1632 if (zfpm_g->t_write)
1633 return 0;
1634
1635 zfpm_write_on();
1636 return 0;
1637 }
1638
1639 /*
1640 * This function is called when the FPM connections is established.
1641 * Iterate over all the RMAC entries for the given L3VNI
1642 * and enqueue the RMAC for FPM processing.
1643 */
1644 static void zfpm_trigger_rmac_update_wrapper(struct hash_bucket *bucket,
1645 void *args)
1646 {
1647 struct zebra_mac *zrmac = (struct zebra_mac *)bucket->data;
1648 struct zebra_l3vni *zl3vni = (struct zebra_l3vni *)args;
1649
1650 zfpm_trigger_rmac_update(zrmac, zl3vni, false, "RMAC added");
1651 }
1652
1653 /*
1654 * This function is called when the FPM connections is established.
1655 * This function iterates over all the L3VNIs to trigger
1656 * FPM updates for RMACs currently available.
1657 */
1658 static void zfpm_iterate_rmac_table(struct hash_bucket *bucket, void *args)
1659 {
1660 struct zebra_l3vni *zl3vni = (struct zebra_l3vni *)bucket->data;
1661
1662 hash_iterate(zl3vni->rmac_table, zfpm_trigger_rmac_update_wrapper,
1663 (void *)zl3vni);
1664 }
1665
/*
 * zfpm_stats_timer_cb
 */
1669 static void zfpm_stats_timer_cb(struct event *t)
1670 {
1671 zfpm_g->t_stats = NULL;
1672
1673 /*
1674 * Remember the stats collected in the last interval for display
1675 * purposes.
1676 */
1677 zfpm_stats_copy(&zfpm_g->stats, &zfpm_g->last_ivl_stats);
1678
1679 /*
1680 * Add the current set of stats into the cumulative statistics.
1681 */
1682 zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
1683 &zfpm_g->cumulative_stats);
1684
1685 /*
1686 * Start collecting stats afresh over the next interval.
1687 */
1688 zfpm_stats_reset(&zfpm_g->stats);
1689
1690 zfpm_start_stats_timer();
1691 }
1692
1693 /*
1694 * zfpm_stop_stats_timer
1695 */
1696 static void zfpm_stop_stats_timer(void)
1697 {
1698 if (!zfpm_g->t_stats)
1699 return;
1700
1701 zfpm_debug("Stopping existing stats timer");
1702 EVENT_OFF(zfpm_g->t_stats);
1703 }
1704
/*
 * zfpm_start_stats_timer
 *
 * Arm the periodic statistics timer; it must not already be running.
 */
void zfpm_start_stats_timer(void)
{
	assert(!zfpm_g->t_stats);

	/* Fires every ZFPM_STATS_IVL_SECS seconds. */
	event_add_timer(zfpm_g->master, zfpm_stats_timer_cb, 0,
			ZFPM_STATS_IVL_SECS, &zfpm_g->t_stats);
}
1715
/*
 * Helper macro for zfpm_show_stats() below.
 *
 * Prints one counter row as: "<name> <cumulative total> <last interval>".
 * Relies on 'vty' and 'total_stats' being in scope at the call site.
 */
#define ZFPM_SHOW_STAT(counter)                                                \
	do {                                                                   \
		vty_out(vty, "%-40s %10lu %16lu\n", #counter,                  \
			total_stats.counter, zfpm_g->last_ivl_stats.counter);  \
	} while (0)
1724
/*
 * zfpm_show_stats
 *
 * Print all FPM counters: cumulative totals alongside the counts from
 * the most recent completed stats interval.
 */
static void zfpm_show_stats(struct vty *vty)
{
	struct zfpm_stats total_stats;
	time_t elapsed;

	vty_out(vty, "\n%-40s %10s Last %2d secs\n\n", "Counter", "Total",
		ZFPM_STATS_IVL_SECS);

	/*
	 * Compute the total stats up to this instant.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &total_stats);

	ZFPM_SHOW_STAT(connect_calls);
	ZFPM_SHOW_STAT(connect_no_sock);
	ZFPM_SHOW_STAT(read_cb_calls);
	ZFPM_SHOW_STAT(write_cb_calls);
	ZFPM_SHOW_STAT(write_calls);
	ZFPM_SHOW_STAT(partial_writes);
	ZFPM_SHOW_STAT(max_writes_hit);
	ZFPM_SHOW_STAT(t_write_yields);
	ZFPM_SHOW_STAT(nop_deletes_skipped);
	ZFPM_SHOW_STAT(route_adds);
	ZFPM_SHOW_STAT(route_dels);
	ZFPM_SHOW_STAT(updates_triggered);
	ZFPM_SHOW_STAT(redundant_triggers);
	ZFPM_SHOW_STAT(dests_del_after_update);
	ZFPM_SHOW_STAT(t_conn_down_starts);
	ZFPM_SHOW_STAT(t_conn_down_dests_processed);
	ZFPM_SHOW_STAT(t_conn_down_yields);
	ZFPM_SHOW_STAT(t_conn_down_finishes);
	ZFPM_SHOW_STAT(t_conn_up_starts);
	ZFPM_SHOW_STAT(t_conn_up_dests_processed);
	ZFPM_SHOW_STAT(t_conn_up_yields);
	ZFPM_SHOW_STAT(t_conn_up_aborts);
	ZFPM_SHOW_STAT(t_conn_up_finishes);

	/* Only report the clear time if the stats were ever cleared. */
	if (!zfpm_g->last_stats_clear_time)
		return;

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_stats_clear_time);

	vty_out(vty, "\nStats were cleared %lu seconds ago\n",
		(unsigned long)elapsed);
}
1774
/*
 * zfpm_clear_stats
 *
 * Reset all FPM statistics and restart the stats interval timer.
 */
static void zfpm_clear_stats(struct vty *vty)
{
	if (!zfpm_is_enabled()) {
		vty_out(vty, "The FPM module is not enabled...\n");
		return;
	}

	/* Wipe interval, last-interval and cumulative counters. */
	zfpm_stats_reset(&zfpm_g->stats);
	zfpm_stats_reset(&zfpm_g->last_ivl_stats);
	zfpm_stats_reset(&zfpm_g->cumulative_stats);

	/* Restart the interval so the next snapshot starts from zero. */
	zfpm_stop_stats_timer();
	zfpm_start_stats_timer();

	zfpm_g->last_stats_clear_time = monotime(NULL);

	vty_out(vty, "Cleared FPM stats\n");
}
1796
/*
 * show_zebra_fpm_stats
 *
 * "show zebra fpm stats" -- display cumulative and last-interval
 * FPM counters.
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       ZEBRA_STR
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_show_stats(vty);
	return CMD_SUCCESS;
}
1811
/*
 * clear_zebra_fpm_stats
 *
 * "clear zebra fpm stats" -- zero all FPM counters and restart the
 * stats interval.
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       ZEBRA_STR
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_clear_stats(vty);
	return CMD_SUCCESS;
}
1826
/*
 * update fpm connection information
 *
 * "fpm connection ip A.B.C.D port (1-65535)" -- set the FPM server
 * address and TCP port. Takes effect on the next (re)connect attempt.
 */
DEFUN (fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port (1-65535)",
       "Forwarding Path Manager\n"
       "Configure FPM connection\n"
       "Connect to IPv4 address\n"
       "Connect to IPv4 address\n"
       "TCP port number\n"
       "TCP port number\n")
{

	in_addr_t fpm_server;
	uint32_t port_no;

	/*
	 * NOTE(review): inet_addr() cannot distinguish the limited
	 * broadcast address 255.255.255.255 from a parse failure; both
	 * yield INADDR_NONE and are rejected here.
	 */
	fpm_server = inet_addr(argv[3]->arg);
	if (fpm_server == INADDR_NONE)
		return CMD_ERR_INCOMPLETE;

	/* The CLI grammar already restricts the port to 1-65535. */
	port_no = atoi(argv[5]->arg);
	if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
		return CMD_ERR_INCOMPLETE;

	zfpm_g->fpm_server = fpm_server;
	zfpm_g->fpm_port = port_no;


	return CMD_SUCCESS;
}
1858
/*
 * "no fpm connection ip A.B.C.D port (1-65535)" -- remove the
 * configured FPM server/port and revert to the compile-time defaults.
 */
DEFUN (no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port (1-65535)",
       NO_STR
       "Forwarding Path Manager\n"
       "Remove configured FPM connection\n"
       "Connect to IPv4 address\n"
       "Connect to IPv4 address\n"
       "TCP port number\n"
       "TCP port number\n")
{
	/* Only remove the config if it matches what is currently set. */
	if (zfpm_g->fpm_server != inet_addr(argv[4]->arg)
	    || zfpm_g->fpm_port != atoi(argv[6]->arg))
		return CMD_ERR_NO_MATCH;

	zfpm_g->fpm_server = FPM_DEFAULT_IP;
	zfpm_g->fpm_port = FPM_DEFAULT_PORT;

	return CMD_SUCCESS;
}
1879
1880 /*
1881 * zfpm_init_message_format
1882 */
1883 static inline void zfpm_init_message_format(const char *format)
1884 {
1885 int have_netlink, have_protobuf;
1886
1887 #ifdef HAVE_NETLINK
1888 have_netlink = 1;
1889 #else
1890 have_netlink = 0;
1891 #endif
1892
1893 #ifdef HAVE_PROTOBUF
1894 have_protobuf = 1;
1895 #else
1896 have_protobuf = 0;
1897 #endif
1898
1899 zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;
1900
1901 if (!format) {
1902 if (have_netlink) {
1903 zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1904 } else if (have_protobuf) {
1905 zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1906 }
1907 return;
1908 }
1909
1910 if (!strcmp("netlink", format)) {
1911 if (!have_netlink) {
1912 flog_err(EC_ZEBRA_NETLINK_NOT_AVAILABLE,
1913 "FPM netlink message format is not available");
1914 return;
1915 }
1916 zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1917 return;
1918 }
1919
1920 if (!strcmp("protobuf", format)) {
1921 if (!have_protobuf) {
1922 flog_err(
1923 EC_ZEBRA_PROTOBUF_NOT_AVAILABLE,
1924 "FPM protobuf message format is not available");
1925 return;
1926 }
1927 flog_warn(EC_ZEBRA_PROTOBUF_NOT_AVAILABLE,
1928 "FPM protobuf message format is deprecated and scheduled to be removed. Please convert to using netlink format or contact dev@lists.frrouting.org with your use case.");
1929 zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1930 return;
1931 }
1932
1933 flog_warn(EC_ZEBRA_FPM_FORMAT_UNKNOWN, "Unknown fpm format '%s'",
1934 format);
1935 }
1936
1937 /**
1938 * fpm_remote_srv_write
1939 *
1940 * Module to write remote fpm connection
1941 *
1942 * Returns ZERO on success.
1943 */
1944
1945 static int fpm_remote_srv_write(struct vty *vty)
1946 {
1947 struct in_addr in;
1948
1949 in.s_addr = zfpm_g->fpm_server;
1950
1951 if ((zfpm_g->fpm_server != FPM_DEFAULT_IP
1952 && zfpm_g->fpm_server != INADDR_ANY)
1953 || (zfpm_g->fpm_port != FPM_DEFAULT_PORT && zfpm_g->fpm_port != 0))
1954 vty_out(vty, "fpm connection ip %pI4 port %d\n", &in,
1955 zfpm_g->fpm_port);
1956
1957 return 0;
1958 }
1959
1960
1961 static int fpm_remote_srv_write(struct vty *vty);
1962 /* Zebra node */
1963 static struct cmd_node zebra_node = {
1964 .name = "zebra",
1965 .node = ZEBRA_NODE,
1966 .parent_node = CONFIG_NODE,
1967 .prompt = "",
1968 .config_write = fpm_remote_srv_write,
1969 };
1970
1971
/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] master event loop on which FPM timers and I/O are scheduled.
 *
 * The port, enable flag and message format are determined internally:
 * the format comes from the module's load arguments and can be
 * 'netlink' or 'protobuf'.
 *
 * Returns 0 on success.
 */
static int zfpm_init(struct event_loop *master)
{
	int enable = 1;
	uint16_t port = 0;
	/* Optional message-format override supplied as module load args. */
	const char *format = THIS_MODULE->load_args;

	memset(zfpm_g, 0, sizeof(*zfpm_g));
	zfpm_g->master = master;
	TAILQ_INIT(&zfpm_g->dest_q);
	TAILQ_INIT(&zfpm_g->mac_q);

	/* Create hash table for fpm_mac_info_t entries */
	zfpm_g->fpm_mac_info_table = hash_create(zfpm_mac_info_hash_keymake,
						 zfpm_mac_info_cmp,
						 "FPM MAC info hash table");

	/* No connection yet; stay idle until the connect timer fires. */
	zfpm_g->sock = -1;
	zfpm_g->state = ZFPM_STATE_IDLE;

	zfpm_stats_init(&zfpm_g->stats);
	zfpm_stats_init(&zfpm_g->last_ivl_stats);
	zfpm_stats_init(&zfpm_g->cumulative_stats);

	memset(&ipv4ll_gateway, 0, sizeof(ipv4ll_gateway));
	if (inet_pton(AF_INET, ipv4_ll_buf, &ipv4ll_gateway.ipv4) != 1)
		zlog_warn("inet_pton failed for %s", ipv4_ll_buf);

	/* CLI: stats display/clear and FPM connection configuration. */
	install_node(&zebra_node);
	install_element(ENABLE_NODE, &show_zebra_fpm_stats_cmd);
	install_element(ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
	install_element(CONFIG_NODE, &fpm_remote_ip_cmd);
	install_element(CONFIG_NODE, &no_fpm_remote_ip_cmd);

	zfpm_init_message_format(format);

	/*
	 * Disable FPM interface if no suitable format is available.
	 */
	if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
		enable = 0;

	zfpm_g->enabled = enable;

	/* Fall back to compile-time defaults unless configured. */
	if (!zfpm_g->fpm_server)
		zfpm_g->fpm_server = FPM_DEFAULT_IP;

	if (!port)
		port = FPM_DEFAULT_PORT;

	zfpm_g->fpm_port = port;

	zfpm_g->obuf = stream_new(ZFPM_OBUF_SIZE);
	zfpm_g->ibuf = stream_new(ZFPM_IBUF_SIZE);

	zfpm_start_stats_timer();
	zfpm_start_connect_timer("initialized");
	return 0;
}
2041
static int zfpm_fini(void)
{
	/* Cancel any scheduled I/O, connect and conn-down processing. */
	zfpm_write_off();
	zfpm_read_off();
	zfpm_connect_off();
	zfpm_conn_down_off();

	zfpm_stop_stats_timer();

	/* Stop receiving route and RMAC update notifications. */
	hook_unregister(rib_update, zfpm_trigger_update);
	hook_unregister(zebra_rmac_update, zfpm_trigger_rmac_update);

	return 0;
}
2056
static int zebra_fpm_module_init(void)
{
	/* Route and RMAC change notifications feed the FPM update queue. */
	hook_register(rib_update, zfpm_trigger_update);
	hook_register(rib_shutdown, zfpm_trigger_remove);
	hook_register(zebra_rmac_update, zfpm_trigger_rmac_update);
	/* Defer full initialization/teardown to the daemon lifecycle. */
	hook_register(frr_late_init, zfpm_init);
	hook_register(frr_early_fini, zfpm_fini);
	return 0;
}
2066
/* Register this dynamic module with the FRR module loader. */
FRR_MODULE_SETUP(.name = "zebra_fpm", .version = FRR_VERSION,
		 .description = "zebra FPM (Forwarding Plane Manager) module",
		 .init = zebra_fpm_module_init,
);