// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 */

#include <zebra.h>

#include "log.h"
#include "libfrr.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"
#include "lib/version.h"
#include "jhash.h"

#include "zebra/rib.h"
#include "zebra/zserv.h"
#include "zebra/zebra_ns.h"
#include "zebra/zebra_vrf.h"
#include "zebra/zebra_errors.h"

#include "fpm/fpm.h"
#include "zebra_fpm_private.h"
#include "zebra/zebra_router.h"
#include "zebra_vxlan_private.h"

DEFINE_MTYPE_STATIC(ZEBRA, FPM_MAC_INFO, "FPM_MAC_INFO");

/*
 * Interval at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10
#define FPM_MAX_MAC_MSG_LEN 512

static void zfpm_iterate_rmac_table(struct hash_bucket *bucket, void *args);

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
struct zfpm_rnodes_iter {
	rib_tables_iter_t tables_iter;
	route_table_iter_t iter;
};

/*
 * Statistics.
 */
struct zfpm_stats {
	unsigned long connect_calls;
	unsigned long connect_no_sock;

	unsigned long read_cb_calls;

	unsigned long write_cb_calls;
	unsigned long write_calls;
	unsigned long partial_writes;
	unsigned long max_writes_hit;
	unsigned long t_write_yields;

	unsigned long nop_deletes_skipped;
	unsigned long route_adds;
	unsigned long route_dels;

	unsigned long updates_triggered;
	unsigned long redundant_triggers;

	unsigned long dests_del_after_update;

	unsigned long t_conn_down_starts;
	unsigned long t_conn_down_dests_processed;
	unsigned long t_conn_down_yields;
	unsigned long t_conn_down_finishes;

	unsigned long t_conn_up_starts;
	unsigned long t_conn_up_dests_processed;
	unsigned long t_conn_up_yields;
	unsigned long t_conn_up_aborts;
	unsigned long t_conn_up_finishes;
};

/*
 * States for the FPM state machine.
 */
enum zfpm_state {

	/*
	 * In this state we are not yet ready to connect to the FPM. This
	 * can happen when this module is disabled, or if we're cleaning up
	 * after a connection has gone down.
	 */
	ZFPM_STATE_IDLE,

	/*
	 * Ready to talk to the FPM and periodically trying to connect to
	 * it.
	 */
	ZFPM_STATE_ACTIVE,

	/*
	 * In the middle of bringing up a TCP connection. Specifically,
	 * waiting for a connect() call to complete asynchronously.
	 */
	ZFPM_STATE_CONNECTING,

	/*
	 * TCP connection to the FPM is up.
	 */
	ZFPM_STATE_ESTABLISHED

};

/*
 * Message format to be used to communicate with the FPM.
 */
enum zfpm_msg_format {
	ZFPM_MSG_FORMAT_NONE,
	ZFPM_MSG_FORMAT_NETLINK,
	ZFPM_MSG_FORMAT_PROTOBUF,
};

/*
 * Globals.
 */
struct zfpm_glob {

	/*
	 * True if the FPM module has been enabled.
	 */
	int enabled;

	/*
	 * Message format to be used to communicate with the FPM.
	 */
	enum zfpm_msg_format message_format;

	struct thread_master *master;

	enum zfpm_state state;

	in_addr_t fpm_server;
	/*
	 * Port on which the FPM is running.
	 */
	int fpm_port;

	/*
	 * List of rib_dest_t structures to be processed
	 */
	TAILQ_HEAD(zfpm_dest_q, rib_dest_t_) dest_q;

	/*
	 * List of fpm_mac_info structures to be processed
	 */
	TAILQ_HEAD(zfpm_mac_q, fpm_mac_info_t) mac_q;

	/*
	 * Hash table of fpm_mac_info_t entries
	 *
	 * While adding an fpm_mac_info_t for a MAC to the mac_q, it is
	 * possible that another fpm_mac_info_t node for this MAC is
	 * already present in the queue. This can happen in the case of
	 * consecutive add->delete operations. To avoid such duplicate
	 * insertions in the mac_q, define a hash table for fpm_mac_info_t
	 * which can be looked up to see if an fpm_mac_info_t node for a
	 * MAC is already present in the mac_q.
	 */
	struct hash *fpm_mac_info_table;

	/*
	 * Stream socket to the FPM.
	 */
	int sock;

	/*
	 * Buffers for messages to/from the FPM.
	 */
	struct stream *obuf;
	struct stream *ibuf;

	/*
	 * Threads for I/O.
	 */
	struct thread *t_connect;
	struct thread *t_write;
	struct thread *t_read;

	/*
	 * Thread to clean up after the TCP connection to the FPM goes down
	 * and the state that belongs to it.
	 */
	struct thread *t_conn_down;

	struct {
		struct zfpm_rnodes_iter iter;
	} t_conn_down_state;

	/*
	 * Thread to take actions once the TCP conn to the FPM comes up, and
	 * the state that belongs to it.
	 */
	struct thread *t_conn_up;

	struct {
		struct zfpm_rnodes_iter iter;
	} t_conn_up_state;

	unsigned long connect_calls;
	time_t last_connect_call_time;

	/*
	 * Stats from the start of the current statistics interval up to
	 * now. These are the counters we typically update in the code.
	 */
	struct zfpm_stats stats;

	/*
	 * Statistics that were gathered in the last collection interval.
	 */
	struct zfpm_stats last_ivl_stats;

	/*
	 * Cumulative stats from the last clear to the start of the current
	 * statistics interval.
	 */
	struct zfpm_stats cumulative_stats;

	/*
	 * Stats interval timer.
	 */
	struct thread *t_stats;

	/*
	 * If non-zero, the last time when statistics were cleared.
	 */
	time_t last_stats_clear_time;

	/*
	 * Flag to track the MAC dump status to FPM
	 */
	bool fpm_mac_dump_done;
};

static struct zfpm_glob zfpm_glob_space;
static struct zfpm_glob *zfpm_g = &zfpm_glob_space;

static int zfpm_trigger_update(struct route_node *rn, const char *reason);

static void zfpm_read_cb(struct thread *thread);
static void zfpm_write_cb(struct thread *thread);

static void zfpm_set_state(enum zfpm_state state, const char *reason);
static void zfpm_start_connect_timer(const char *reason);
static void zfpm_start_stats_timer(void);
static void zfpm_mac_info_del(struct fpm_mac_info_t *fpm_mac);

static const char ipv4_ll_buf[16] = "169.254.0.1";
union g_addr ipv4ll_gateway;

/*
 * zfpm_thread_should_yield
 */
static inline int zfpm_thread_should_yield(struct thread *t)
{
	return thread_should_yield(t);
}

/*
 * zfpm_state_to_str
 */
static const char *zfpm_state_to_str(enum zfpm_state state)
{
	switch (state) {

	case ZFPM_STATE_IDLE:
		return "idle";

	case ZFPM_STATE_ACTIVE:
		return "active";

	case ZFPM_STATE_CONNECTING:
		return "connecting";

	case ZFPM_STATE_ESTABLISHED:
		return "established";

	default:
		return "unknown";
	}
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t zfpm_get_elapsed_time(time_t reference)
{
	time_t now;

	now = monotime(NULL);

	if (now < reference) {
		assert(0);
		return 0;
	}

	return now - reference;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void zfpm_rnodes_iter_init(struct zfpm_rnodes_iter *iter)
{
	memset(iter, 0, sizeof(*iter));
	rib_tables_iter_init(&iter->tables_iter);

	/*
	 * This is a hack, but it makes implementing 'next' easier by
	 * ensuring that route_table_iter_next() will return NULL the first
	 * time we call it.
	 */
	route_table_iter_init(&iter->iter, NULL);
	route_table_iter_cleanup(&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *
zfpm_rnodes_iter_next(struct zfpm_rnodes_iter *iter)
{
	struct route_node *rn;
	struct route_table *table;

	while (1) {
		rn = route_table_iter_next(&iter->iter);
		if (rn)
			return rn;

		/*
		 * We've made our way through this table, go to the next one.
		 */
		route_table_iter_cleanup(&iter->iter);

		table = rib_tables_iter_next(&iter->tables_iter);

		if (!table)
			return NULL;

		route_table_iter_init(&iter->iter, table);
	}

	return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void zfpm_rnodes_iter_pause(struct zfpm_rnodes_iter *iter)
{
	route_table_iter_pause(&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void zfpm_rnodes_iter_cleanup(struct zfpm_rnodes_iter *iter)
{
	route_table_iter_cleanup(&iter->iter);
	rib_tables_iter_cleanup(&iter->tables_iter);
}

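/*
 * Illustrative usage sketch (not part of the original source) of the
 * iterator above, as driven by the conn_up/conn_down callbacks below.
 * process() and should_yield are hypothetical stand-ins for the real
 * per-node work and zfpm_thread_should_yield():
 *
 *	struct zfpm_rnodes_iter iter;
 *	struct route_node *rn;
 *
 *	zfpm_rnodes_iter_init(&iter);
 *	while ((rn = zfpm_rnodes_iter_next(&iter))) {
 *		process(rn);
 *		if (should_yield) {
 *			// pause, reschedule, resume from here later
 *			zfpm_rnodes_iter_pause(&iter);
 *			return;
 *		}
 *	}
 *	zfpm_rnodes_iter_cleanup(&iter);
 */
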
/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void zfpm_stats_init(struct zfpm_stats *stats)
{
	memset(stats, 0, sizeof(*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void zfpm_stats_reset(struct zfpm_stats *stats)
{
	zfpm_stats_init(stats);
}

/*
 * zfpm_stats_copy
 */
static inline void zfpm_stats_copy(const struct zfpm_stats *src,
				   struct zfpm_stats *dest)
{
	memcpy(dest, src, sizeof(*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void zfpm_stats_compose(const struct zfpm_stats *s1,
			       const struct zfpm_stats *s2,
			       struct zfpm_stats *result)
{
	const unsigned long *p1, *p2;
	unsigned long *result_p;
	int i, num_counters;

	p1 = (const unsigned long *)s1;
	p2 = (const unsigned long *)s2;
	result_p = (unsigned long *)result;

	num_counters = (sizeof(struct zfpm_stats) / sizeof(unsigned long));

	for (i = 0; i < num_counters; i++) {
		result_p[i] = p1[i] + p2[i];
	}
}
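
/*
 * A minimal compile-time check (illustrative, not in the original
 * source) for the all-counters assumption above. It would catch any
 * change that leaves the structure size at something other than a
 * whole number of 'unsigned long' slots, though not a same-sized
 * non-counter field:
 *
 *	_Static_assert(sizeof(struct zfpm_stats) % sizeof(unsigned long)
 *			       == 0,
 *		       "struct zfpm_stats must only contain counters");
 */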

/*
 * zfpm_read_on
 */
static inline void zfpm_read_on(void)
{
	assert(!zfpm_g->t_read);
	assert(zfpm_g->sock >= 0);

	thread_add_read(zfpm_g->master, zfpm_read_cb, 0, zfpm_g->sock,
			&zfpm_g->t_read);
}

/*
 * zfpm_write_on
 */
static inline void zfpm_write_on(void)
{
	assert(!zfpm_g->t_write);
	assert(zfpm_g->sock >= 0);

	thread_add_write(zfpm_g->master, zfpm_write_cb, 0, zfpm_g->sock,
			 &zfpm_g->t_write);
}

/*
 * zfpm_read_off
 */
static inline void zfpm_read_off(void)
{
	THREAD_OFF(zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void zfpm_write_off(void)
{
	THREAD_OFF(zfpm_g->t_write);
}

static inline void zfpm_connect_off(void)
{
	THREAD_OFF(zfpm_g->t_connect);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static void zfpm_conn_up_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	struct zfpm_rnodes_iter *iter;
	rib_dest_t *dest;

	iter = &zfpm_g->t_conn_up_state.iter;

	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED) {
		zfpm_debug(
			"Connection not up anymore, conn_up thread aborting");
		zfpm_g->stats.t_conn_up_aborts++;
		goto done;
	}

	if (!zfpm_g->fpm_mac_dump_done) {
		/* Enqueue FPM updates for all the RMAC entries */
		hash_iterate(zrouter.l3vni_table, zfpm_iterate_rmac_table,
			     NULL);
		/* mark dump done so that it's not repeated after yield */
		zfpm_g->fpm_mac_dump_done = true;
	}

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			zfpm_g->stats.t_conn_up_dests_processed++;
			zfpm_trigger_update(rnode, NULL);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_up_yields++;
		zfpm_rnodes_iter_pause(iter);
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_up);
		return;
	}

	zfpm_g->stats.t_conn_up_finishes++;

done:
	zfpm_rnodes_iter_cleanup(iter);
}

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void zfpm_connection_up(const char *detail)
{
	assert(zfpm_g->sock >= 0);
	zfpm_read_on();
	zfpm_write_on();
	zfpm_set_state(ZFPM_STATE_ESTABLISHED, detail);

	/*
	 * Start thread to push existing routes to the FPM.
	 */
	THREAD_OFF(zfpm_g->t_conn_up);

	zfpm_rnodes_iter_init(&zfpm_g->t_conn_up_state.iter);
	zfpm_g->fpm_mac_dump_done = false;

	zfpm_debug("Starting conn_up thread");

	thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_up);
	zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void zfpm_connect_check(void)
{
	int status;
	socklen_t slen;
	int ret;

	zfpm_read_off();
	zfpm_write_off();

	slen = sizeof(status);
	ret = getsockopt(zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *)&status,
			 &slen);

	if (ret >= 0 && status == 0) {
		zfpm_connection_up("async connect complete");
		return;
	}

	/*
	 * getsockopt() failed or indicated an error on the socket.
	 */
	close(zfpm_g->sock);
	zfpm_g->sock = -1;

	zfpm_start_connect_timer("getsockopt() after async connect failed");
	return;
}

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static void zfpm_conn_down_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	struct zfpm_rnodes_iter *iter;
	rib_dest_t *dest;
	struct fpm_mac_info_t *mac = NULL;

	assert(zfpm_g->state == ZFPM_STATE_IDLE);

	/*
	 * Delink and free all fpm_mac_info_t nodes
	 * in the mac_q and fpm_mac_info_hash
	 */
	while ((mac = TAILQ_FIRST(&zfpm_g->mac_q)) != NULL)
		zfpm_mac_info_del(mac);

	zfpm_g->t_conn_down = NULL;

	iter = &zfpm_g->t_conn_down_state.iter;

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
				TAILQ_REMOVE(&zfpm_g->dest_q, dest,
					     fpm_q_entries);
			}

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);

			zfpm_g->stats.t_conn_down_dests_processed++;

			/*
			 * Check if the dest should be deleted.
			 */
			rib_gc_dest(rnode);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_down_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_down = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_down);
		return;
	}

	zfpm_g->stats.t_conn_down_finishes++;
	zfpm_rnodes_iter_cleanup(iter);

	/*
	 * Start the process of connecting to the FPM again.
	 */
	zfpm_start_connect_timer("cleanup complete");
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void zfpm_connection_down(const char *detail)
{
	if (!detail)
		detail = "unknown";

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);

	zlog_info("connection to the FPM has gone down: %s", detail);

	zfpm_read_off();
	zfpm_write_off();

	stream_reset(zfpm_g->ibuf);
	stream_reset(zfpm_g->obuf);

	if (zfpm_g->sock >= 0) {
		close(zfpm_g->sock);
		zfpm_g->sock = -1;
	}

	/*
	 * Start thread to clean up state after the connection goes down.
	 */
	assert(!zfpm_g->t_conn_down);
	zfpm_rnodes_iter_init(&zfpm_g->t_conn_down_state.iter);
	zfpm_g->t_conn_down = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_down);
	zfpm_g->stats.t_conn_down_starts++;

	zfpm_set_state(ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static void zfpm_read_cb(struct thread *thread)
{
	size_t already;
	struct stream *ibuf;
	uint16_t msg_len;
	fpm_msg_hdr_t *hdr;

	zfpm_g->stats.read_cb_calls++;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	ibuf = zfpm_g->ibuf;

	already = stream_get_endp(ibuf);
	if (already < FPM_MSG_HDR_LEN) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock,
					FPM_MSG_HDR_LEN - already);
		if (nbyte == 0 || nbyte == -1) {
			if (nbyte == -1) {
				char buffer[1024];

				snprintf(buffer, sizeof(buffer),
					 "closed socket in read(%d): %s", errno,
					 safe_strerror(errno));
				zfpm_connection_down(buffer);
			} else
				zfpm_connection_down("closed socket in read");
			return;
		}

		if (nbyte != (ssize_t)(FPM_MSG_HDR_LEN - already))
			goto done;

		already = FPM_MSG_HDR_LEN;
	}

	stream_set_getp(ibuf, 0);

	hdr = (fpm_msg_hdr_t *)stream_pnt(ibuf);

	if (!fpm_msg_hdr_ok(hdr)) {
		zfpm_connection_down("invalid message header");
		return;
	}

	msg_len = fpm_msg_len(hdr);

	/*
	 * Read out the rest of the packet.
	 */
	if (already < msg_len) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock, msg_len - already);

		if (nbyte == 0 || nbyte == -1) {
			if (nbyte == -1) {
				char buffer[1024];

				snprintf(buffer, sizeof(buffer),
					 "failed to read message(%d) %s", errno,
					 safe_strerror(errno));
				zfpm_connection_down(buffer);
			} else
				zfpm_connection_down("failed to read message");
			return;
		}

		if (nbyte != (ssize_t)(msg_len - already))
			goto done;
	}

	/*
	 * Just throw it away for now.
	 */
	stream_reset(ibuf);

done:
	zfpm_read_on();
}

static bool zfpm_updates_pending(void)
{
	if (!(TAILQ_EMPTY(&zfpm_g->dest_q)) || !(TAILQ_EMPTY(&zfpm_g->mac_q)))
		return true;

	return false;
}

/*
 * zfpm_writes_pending
 *
 * Returns true if we may have something to write to the FPM.
 */
static int zfpm_writes_pending(void)
{

	/*
	 * Check if there is any data in the outbound buffer that has not
	 * been written to the socket yet.
	 */
	if (stream_get_endp(zfpm_g->obuf) - stream_get_getp(zfpm_g->obuf))
		return 1;

	/*
	 * Check if there are any updates scheduled on the outbound queues.
	 */
	if (zfpm_updates_pending())
		return 1;

	return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int zfpm_encode_route(rib_dest_t *dest, struct route_entry *re,
				    char *in_buf, size_t in_buf_len,
				    fpm_msg_type_e *msg_type)
{
	size_t len;
#ifdef HAVE_NETLINK
	int cmd;
#endif
	len = 0;

	*msg_type = FPM_MSG_TYPE_NONE;

	switch (zfpm_g->message_format) {

	case ZFPM_MSG_FORMAT_PROTOBUF:
#ifdef HAVE_PROTOBUF
		len = zfpm_protobuf_encode_route(dest, re, (uint8_t *)in_buf,
						 in_buf_len);
		*msg_type = FPM_MSG_TYPE_PROTOBUF;
#endif
		break;

	case ZFPM_MSG_FORMAT_NETLINK:
#ifdef HAVE_NETLINK
		*msg_type = FPM_MSG_TYPE_NETLINK;
		cmd = re ? RTM_NEWROUTE : RTM_DELROUTE;
		len = zfpm_netlink_encode_route(cmd, dest, re, in_buf,
						in_buf_len);
		assert(fpm_msg_align(len) == len);
#endif /* HAVE_NETLINK */
		break;

	case ZFPM_MSG_FORMAT_NONE:
		break;
	}

	return len;
}

/*
 * zfpm_route_for_update
 *
 * Returns the re that is to be sent to the FPM for a given dest.
 */
struct route_entry *zfpm_route_for_update(rib_dest_t *dest)
{
	return dest->selected_fib;
}

/*
 * Define an enum for return codes for queue processing functions
 *
 * FPM_WRITE_STOP: This return code indicates that the write buffer is full.
 * Stop processing all the queues and empty the buffer by writing its content
 * to the socket.
 *
 * FPM_GOTO_NEXT_Q: This return code indicates that either this queue is
 * empty or we have processed enough updates from this queue.
 * So, move on to the next queue.
 */
enum {
	FPM_WRITE_STOP = 0,
	FPM_GOTO_NEXT_Q = 1
};
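
/*
 * For reference (per the definitions in fpm/fpm.h, summarized here for
 * convenience): each message the build functions below append to the
 * output stream is framed as a 4-byte fpm_msg_hdr_t followed by the
 * encoded payload:
 *
 *	+---------+----------+----------------+-------------+
 *	| version | msg_type | msg_len (be16) | payload ... |
 *	+---------+----------+----------------+-------------+
 *
 * msg_len is the total length including the header, in network byte
 * order; fpm_data_len_to_msg_len() adds the header size and rounds up
 * to a 4-byte boundary.
 */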

#define FPM_QUEUE_PROCESS_LIMIT 10000

/*
 * zfpm_build_route_updates
 *
 * Process the dest_q queue and write FPM messages to the outbound buffer.
 */
static int zfpm_build_route_updates(void)
{
	struct stream *s;
	rib_dest_t *dest;
	unsigned char *buf, *data, *buf_end;
	size_t msg_len;
	size_t data_len;
	fpm_msg_hdr_t *hdr;
	struct route_entry *re;
	int is_add, write_msg;
	fpm_msg_type_e msg_type;
	uint16_t q_limit;

	if (TAILQ_EMPTY(&zfpm_g->dest_q))
		return FPM_GOTO_NEXT_Q;

	s = zfpm_g->obuf;
	q_limit = FPM_QUEUE_PROCESS_LIMIT;

	do {
		/*
		 * Make sure there is enough space to write another message.
		 */
		if (STREAM_WRITEABLE(s) < FPM_MAX_MSG_LEN)
			return FPM_WRITE_STOP;

		buf = STREAM_DATA(s) + stream_get_endp(s);
		buf_end = buf + STREAM_WRITEABLE(s);

		dest = TAILQ_FIRST(&zfpm_g->dest_q);
		if (!dest)
			return FPM_GOTO_NEXT_Q;

		assert(CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM));

		hdr = (fpm_msg_hdr_t *)buf;
		hdr->version = FPM_PROTO_VERSION;

		data = fpm_msg_data(hdr);

		re = zfpm_route_for_update(dest);
		is_add = re ? 1 : 0;

		write_msg = 1;

		/*
		 * If this is a route deletion, and we have not sent the
		 * route to the FPM previously, skip it.
		 */
		if (!is_add && !CHECK_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM)) {
			write_msg = 0;
			zfpm_g->stats.nop_deletes_skipped++;
		}

		if (write_msg) {
			data_len = zfpm_encode_route(dest, re, (char *)data,
						     buf_end - data, &msg_type);

			if (data_len) {
				hdr->msg_type = msg_type;
				msg_len = fpm_data_len_to_msg_len(data_len);
				hdr->msg_len = htons(msg_len);
				stream_forward_endp(s, msg_len);

				if (is_add)
					zfpm_g->stats.route_adds++;
				else
					zfpm_g->stats.route_dels++;
			} else {
				zlog_err("%s: Encoding Prefix: %pRN No valid nexthops",
					 __func__, dest->rnode);
			}
		}

		/*
		 * Remove the dest from the queue, and reset the flag.
		 */
		UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);

		if (is_add) {
			SET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		} else {
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		}

		/*
		 * Delete the destination if necessary.
		 */
		if (rib_gc_dest(dest->rnode))
			zfpm_g->stats.dests_del_after_update++;

		q_limit--;
		if (q_limit == 0) {
			/*
			 * We have processed enough updates in this queue.
			 * Now yield for other queues.
			 */
			return FPM_GOTO_NEXT_Q;
		}
	} while (true);
}

/*
 * zfpm_encode_mac
 *
 * Encode a message to FPM with information about the given MAC.
 *
 * Returns the number of bytes written to the buffer.
 */
static inline int zfpm_encode_mac(struct fpm_mac_info_t *mac, char *in_buf,
				  size_t in_buf_len, fpm_msg_type_e *msg_type)
{
	size_t len = 0;

	*msg_type = FPM_MSG_TYPE_NONE;

	switch (zfpm_g->message_format) {

	case ZFPM_MSG_FORMAT_NONE:
		break;
	case ZFPM_MSG_FORMAT_NETLINK:
#ifdef HAVE_NETLINK
		len = zfpm_netlink_encode_mac(mac, in_buf, in_buf_len);
		assert(fpm_msg_align(len) == len);
		*msg_type = FPM_MSG_TYPE_NETLINK;
#endif /* HAVE_NETLINK */
		break;
	case ZFPM_MSG_FORMAT_PROTOBUF:
		break;
	}
	return len;
}

static int zfpm_build_mac_updates(void)
{
	struct stream *s;
	struct fpm_mac_info_t *mac;
	unsigned char *buf, *data, *buf_end;
	fpm_msg_hdr_t *hdr;
	size_t data_len, msg_len;
	fpm_msg_type_e msg_type;
	uint16_t q_limit;

	if (TAILQ_EMPTY(&zfpm_g->mac_q))
		return FPM_GOTO_NEXT_Q;

	s = zfpm_g->obuf;
	q_limit = FPM_QUEUE_PROCESS_LIMIT;

	do {
		/* Make sure there is enough space to write another message. */
		if (STREAM_WRITEABLE(s) < FPM_MAX_MAC_MSG_LEN)
			return FPM_WRITE_STOP;

		buf = STREAM_DATA(s) + stream_get_endp(s);
		buf_end = buf + STREAM_WRITEABLE(s);

		mac = TAILQ_FIRST(&zfpm_g->mac_q);
		if (!mac)
			return FPM_GOTO_NEXT_Q;

		/* Check for no-op */
		if (!CHECK_FLAG(mac->fpm_flags, ZEBRA_MAC_UPDATE_FPM)) {
			zfpm_g->stats.nop_deletes_skipped++;
			zfpm_mac_info_del(mac);
			continue;
		}

		hdr = (fpm_msg_hdr_t *)buf;
		hdr->version = FPM_PROTO_VERSION;

		data = fpm_msg_data(hdr);
		data_len = zfpm_encode_mac(mac, (char *)data, buf_end - data,
					   &msg_type);
		assert(data_len);

		hdr->msg_type = msg_type;
		msg_len = fpm_data_len_to_msg_len(data_len);
		hdr->msg_len = htons(msg_len);
		stream_forward_endp(s, msg_len);

		/* Remove the MAC from the queue, and delete it. */
		zfpm_mac_info_del(mac);

		q_limit--;
		if (q_limit == 0) {
			/*
			 * We have processed enough updates in this queue.
			 * Now yield for other queues.
			 */
			return FPM_GOTO_NEXT_Q;
		}
	} while (1);
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queues and write messages to the outbound
 * buffer.
 */
static void zfpm_build_updates(void)
{
	struct stream *s;

	s = zfpm_g->obuf;
	assert(stream_empty(s));

	do {
		/*
		 * Stop processing the queues if zfpm_g->obuf is full
		 * or we do not have more updates to process
		 */
		if (zfpm_build_mac_updates() == FPM_WRITE_STOP)
			break;
		if (zfpm_build_route_updates() == FPM_WRITE_STOP)
			break;
	} while (zfpm_updates_pending());
}

/*
 * zfpm_write_cb
 */
static void zfpm_write_cb(struct thread *thread)
{
	struct stream *s;
	int num_writes;

	zfpm_g->stats.write_cb_calls++;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	num_writes = 0;

	do {
		int bytes_to_write, bytes_written;

		s = zfpm_g->obuf;

		/*
		 * If the stream is empty, try to fill it up with data.
		 */
		if (stream_empty(s)) {
			zfpm_build_updates();
		}

		bytes_to_write = stream_get_endp(s) - stream_get_getp(s);
		if (!bytes_to_write)
			break;

		bytes_written =
			write(zfpm_g->sock, stream_pnt(s), bytes_to_write);
		zfpm_g->stats.write_calls++;
		num_writes++;

		if (bytes_written < 0) {
			if (ERRNO_IO_RETRY(errno))
				break;

			zfpm_connection_down("failed to write to socket");
			return;
		}

		if (bytes_written != bytes_to_write) {

			/*
			 * Partial write.
			 */
			stream_forward_getp(s, bytes_written);
			zfpm_g->stats.partial_writes++;
			break;
		}

		/*
		 * We've written out the entire contents of the stream.
		 */
		stream_reset(s);

		if (num_writes >= ZFPM_MAX_WRITES_PER_RUN) {
			zfpm_g->stats.max_writes_hit++;
			break;
		}

		if (zfpm_thread_should_yield(thread)) {
			zfpm_g->stats.t_write_yields++;
			break;
		}
	} while (1);

	if (zfpm_writes_pending())
		zfpm_write_on();
}

/*
 * zfpm_connect_cb
 */
static void zfpm_connect_cb(struct thread *t)
{
	int sock, ret;
	struct sockaddr_in serv;

	assert(zfpm_g->state == ZFPM_STATE_ACTIVE);

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0) {
		zlog_err("Failed to create socket for connect(): %s",
			 strerror(errno));
		zfpm_g->stats.connect_no_sock++;
		return;
	}

	set_nonblocking(sock);

	/* Make server socket. */
	memset(&serv, 0, sizeof(serv));
	serv.sin_family = AF_INET;
	serv.sin_port = htons(zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
	serv.sin_len = sizeof(struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
	if (!zfpm_g->fpm_server)
		serv.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	else
		serv.sin_addr.s_addr = (zfpm_g->fpm_server);

	/*
	 * Connect to the FPM.
	 */
	zfpm_g->connect_calls++;
	zfpm_g->stats.connect_calls++;
	zfpm_g->last_connect_call_time = monotime(NULL);

	ret = connect(sock, (struct sockaddr *)&serv, sizeof(serv));
	if (ret >= 0) {
		zfpm_g->sock = sock;
		zfpm_connection_up("connect succeeded");
		return;
	}

	if (errno == EINPROGRESS) {
		zfpm_g->sock = sock;
		zfpm_read_on();
		zfpm_write_on();
		zfpm_set_state(ZFPM_STATE_CONNECTING,
			       "async connect in progress");
		return;
	}

	zlog_info("can't connect to FPM %d: %s", sock, safe_strerror(errno));
	close(sock);

	/*
	 * Restart timer for retrying connection.
	 */
	zfpm_start_connect_timer("connect() failed");
}

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void zfpm_set_state(enum zfpm_state state, const char *reason)
{
	enum zfpm_state cur_state = zfpm_g->state;

	if (!reason)
		reason = "Unknown";

	if (state == cur_state)
		return;

	zfpm_debug("beginning state transition %s -> %s. Reason: %s",
		   zfpm_state_to_str(cur_state), zfpm_state_to_str(state),
		   reason);

	switch (state) {

	case ZFPM_STATE_IDLE:
		assert(cur_state == ZFPM_STATE_ESTABLISHED);
		break;

	case ZFPM_STATE_ACTIVE:
		assert(cur_state == ZFPM_STATE_IDLE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->t_connect);
		break;

	case ZFPM_STATE_CONNECTING:
		assert(zfpm_g->sock);
		assert(cur_state == ZFPM_STATE_ACTIVE);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;

	case ZFPM_STATE_ESTABLISHED:
		assert(cur_state == ZFPM_STATE_ACTIVE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->sock);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;
	}

	zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long zfpm_calc_connect_delay(void)
{
	time_t elapsed;

	/*
	 * Return 0 if this is our first attempt to connect.
	 */
	if (zfpm_g->connect_calls == 0) {
		return 0;
	}

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_connect_call_time);

	if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
		return 0;
	}

	return ZFPM_CONNECT_RETRY_IVL - elapsed;
}
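
/*
 * Worked example (illustrative): with ZFPM_CONNECT_RETRY_IVL set to 5
 * seconds, a connect() attempted 2 seconds after the previous one is
 * delayed by 3 seconds; once 5 or more seconds have elapsed since the
 * last attempt, the next one is scheduled immediately.
 */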

/*
 * zfpm_start_connect_timer
 */
static void zfpm_start_connect_timer(const char *reason)
{
	long delay_secs;

	assert(!zfpm_g->t_connect);
	assert(zfpm_g->sock < 0);

	assert(zfpm_g->state == ZFPM_STATE_IDLE
	       || zfpm_g->state == ZFPM_STATE_ACTIVE
	       || zfpm_g->state == ZFPM_STATE_CONNECTING);

	delay_secs = zfpm_calc_connect_delay();
	zfpm_debug("scheduling connect in %ld seconds", delay_secs);

	thread_add_timer(zfpm_g->master, zfpm_connect_cb, 0, delay_secs,
			 &zfpm_g->t_connect);
	zfpm_set_state(ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns true if the zebra FPM module has been enabled.
 */
static inline int zfpm_is_enabled(void)
{
	return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns true if the connection to the FPM is up.
 */
static inline int zfpm_conn_is_up(void)
{
	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
		return 0;

	assert(zfpm_g->sock >= 0);

	return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
static int zfpm_trigger_update(struct route_node *rn, const char *reason)
{
	rib_dest_t *dest;

	/*
	 * Ignore if the connection is down. We will update the FPM about
	 * all destinations once the connection comes up.
	 */
	if (!zfpm_conn_is_up())
		return 0;

	dest = rib_dest_from_rnode(rn);

	if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
		zfpm_g->stats.redundant_triggers++;
		return 0;
	}

	if (reason) {
		zfpm_debug("%pFX triggering update to FPM - Reason: %s", &rn->p,
			   reason);
	}

	SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
	TAILQ_INSERT_TAIL(&zfpm_g->dest_q, dest, fpm_q_entries);
	zfpm_g->stats.updates_triggered++;

	/*
	 * Make sure that writes are enabled.
	 */
	if (zfpm_g->t_write)
		return 0;

	zfpm_write_on();
	return 0;
}

/*
 * zfpm_trigger_remove
 *
 * The zebra code invokes this function to indicate that we should
 * send a remove to the FPM about the given route_node.
 */

static int zfpm_trigger_remove(struct route_node *rn)
{
	rib_dest_t *dest;

	if (!zfpm_conn_is_up())
		return 0;

	dest = rib_dest_from_rnode(rn);
	if (!CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM))
		return 0;

	zfpm_debug("%pRN Removing from update queue shutting down", rn);

	UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
	TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);

	return 0;
}

/*
 * Generate Key for FPM MAC info hash entry
 */
static unsigned int zfpm_mac_info_hash_keymake(const void *p)
{
	struct fpm_mac_info_t *fpm_mac = (struct fpm_mac_info_t *)p;
	uint32_t mac_key;

	mac_key = jhash(fpm_mac->macaddr.octet, ETH_ALEN, 0xa5a5a55a);

	return jhash_2words(mac_key, fpm_mac->vni, 0);
}

/*
 * Compare function for FPM MAC info hash lookup
 */
static bool zfpm_mac_info_cmp(const void *p1, const void *p2)
{
	const struct fpm_mac_info_t *fpm_mac1 = p1;
	const struct fpm_mac_info_t *fpm_mac2 = p2;

	if (memcmp(fpm_mac1->macaddr.octet, fpm_mac2->macaddr.octet, ETH_ALEN)
	    != 0)
		return false;
	if (fpm_mac1->vni != fpm_mac2->vni)
		return false;

	return true;
}

/*
 * Lookup FPM MAC info hash entry.
 */
static struct fpm_mac_info_t *zfpm_mac_info_lookup(struct fpm_mac_info_t *key)
{
	return hash_lookup(zfpm_g->fpm_mac_info_table, key);
}

/*
 * Callback to allocate fpm_mac_info_t structure.
 */
static void *zfpm_mac_info_alloc(void *p)
{
	const struct fpm_mac_info_t *key = p;
	struct fpm_mac_info_t *fpm_mac;

	fpm_mac = XCALLOC(MTYPE_FPM_MAC_INFO, sizeof(struct fpm_mac_info_t));

	memcpy(&fpm_mac->macaddr, &key->macaddr, ETH_ALEN);
	fpm_mac->vni = key->vni;

	return (void *)fpm_mac;
}

/*
 * Delink and free fpm_mac_info_t.
 */
static void zfpm_mac_info_del(struct fpm_mac_info_t *fpm_mac)
{
	hash_release(zfpm_g->fpm_mac_info_table, fpm_mac);
	TAILQ_REMOVE(&zfpm_g->mac_q, fpm_mac, fpm_mac_q_entries);
	XFREE(MTYPE_FPM_MAC_INFO, fpm_mac);
}

/*
 * zfpm_trigger_rmac_update
 *
 * Zebra code invokes this function to indicate that we should
 * send an update to the FPM for the given MAC entry.
 *
 * This function checks if we have already enqueued an update for this
 * RMAC. If yes, it updates the same fpm_mac_info_t. Otherwise, it
 * creates and enqueues an update.
 */
static int zfpm_trigger_rmac_update(struct zebra_mac *rmac,
				    struct zebra_l3vni *zl3vni, bool delete,
				    const char *reason)
{
	struct fpm_mac_info_t *fpm_mac, key;
	struct interface *vxlan_if, *svi_if;
	bool mac_found = false;

	/*
	 * Ignore if the connection is down. We will update the FPM about
	 * all destinations once the connection comes up.
	 */
	if (!zfpm_conn_is_up())
		return 0;

	if (reason) {
		zfpm_debug("triggering update to FPM - Reason: %s - %pEA",
			   reason, &rmac->macaddr);
	}

	vxlan_if = zl3vni_map_to_vxlan_if(zl3vni);
	svi_if = zl3vni_map_to_svi_if(zl3vni);

	memset(&key, 0, sizeof(key));

	memcpy(&key.macaddr, &rmac->macaddr, ETH_ALEN);
	key.vni = zl3vni->vni;

	/* Check if this MAC is already present in the queue. */
	fpm_mac = zfpm_mac_info_lookup(&key);

	if (fpm_mac) {
		mac_found = true;

		/*
		 * If the enqueued op is "add" and the current op is
		 * "delete", this is a no-op. So, unset the
		 * ZEBRA_MAC_UPDATE_FPM flag. While processing the FPM
		 * queue, we will silently delete this MAC entry without
		 * sending any update for it.
		 */
		if (!CHECK_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_DELETE_FPM) &&
		    delete == 1) {
			SET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_DELETE_FPM);
			UNSET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_UPDATE_FPM);
			return 0;
		}
	} else
		fpm_mac = hash_get(zfpm_g->fpm_mac_info_table, &key,
				   zfpm_mac_info_alloc);

	fpm_mac->r_vtep_ip.s_addr = rmac->fwd_info.r_vtep_ip.s_addr;
	fpm_mac->zebra_flags = rmac->flags;
	fpm_mac->vxlan_if = vxlan_if ? vxlan_if->ifindex : 0;
	fpm_mac->svi_if = svi_if ? svi_if->ifindex : 0;

	SET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_UPDATE_FPM);
	if (delete)
		SET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_DELETE_FPM);
	else
		UNSET_FLAG(fpm_mac->fpm_flags, ZEBRA_MAC_DELETE_FPM);

	if (!mac_found)
		TAILQ_INSERT_TAIL(&zfpm_g->mac_q, fpm_mac, fpm_mac_q_entries);

	zfpm_g->stats.updates_triggered++;

	/* If writes are already enabled, return. */
	if (zfpm_g->t_write)
		return 0;

	zfpm_write_on();
	return 0;
}
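
/*
 * Illustrative sequence (not part of the original source) of the
 * add->delete no-op collapse handled above:
 *
 *   1. RMAC add: an fpm_mac_info_t is queued on mac_q with
 *      ZEBRA_MAC_UPDATE_FPM set.
 *   2. RMAC delete before the add is sent: the same entry is found in
 *      the hash, ZEBRA_MAC_DELETE_FPM is set and ZEBRA_MAC_UPDATE_FPM
 *      is cleared.
 *   3. zfpm_build_mac_updates() later dequeues the entry, sees
 *      ZEBRA_MAC_UPDATE_FPM unset, counts it as a skipped no-op delete
 *      and frees it via zfpm_mac_info_del() without writing anything.
 */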

/*
 * This function is called when the FPM connection is established.
 * Iterate over all the RMAC entries for the given L3VNI
 * and enqueue each RMAC for FPM processing.
 */
static void zfpm_trigger_rmac_update_wrapper(struct hash_bucket *bucket,
					     void *args)
{
	struct zebra_mac *zrmac = (struct zebra_mac *)bucket->data;
	struct zebra_l3vni *zl3vni = (struct zebra_l3vni *)args;

	zfpm_trigger_rmac_update(zrmac, zl3vni, false, "RMAC added");
}

/*
 * This function is called when the FPM connection is established.
 * It iterates over all the L3VNIs to trigger FPM updates for the
 * RMACs currently available.
 */
static void zfpm_iterate_rmac_table(struct hash_bucket *bucket, void *args)
{
	struct zebra_l3vni *zl3vni = (struct zebra_l3vni *)bucket->data;

	hash_iterate(zl3vni->rmac_table, zfpm_trigger_rmac_update_wrapper,
		     (void *)zl3vni);
}

/*
 * zfpm_stats_timer_cb
 */
static void zfpm_stats_timer_cb(struct thread *t)
{
	zfpm_g->t_stats = NULL;

	/*
	 * Remember the stats collected in the last interval for display
	 * purposes.
	 */
	zfpm_stats_copy(&zfpm_g->stats, &zfpm_g->last_ivl_stats);

	/*
	 * Add the current set of stats into the cumulative statistics.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &zfpm_g->cumulative_stats);

	/*
	 * Start collecting stats afresh over the next interval.
	 */
	zfpm_stats_reset(&zfpm_g->stats);

	zfpm_start_stats_timer();
}

/*
 * zfpm_stop_stats_timer
 */
static void zfpm_stop_stats_timer(void)
{
	if (!zfpm_g->t_stats)
		return;

	zfpm_debug("Stopping existing stats timer");
	THREAD_OFF(zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
void zfpm_start_stats_timer(void)
{
	assert(!zfpm_g->t_stats);

	thread_add_timer(zfpm_g->master, zfpm_stats_timer_cb, 0,
			 ZFPM_STATS_IVL_SECS, &zfpm_g->t_stats);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter)                                                \
	do {                                                                   \
		vty_out(vty, "%-40s %10lu %16lu\n", #counter,                  \
			total_stats.counter, zfpm_g->last_ivl_stats.counter);  \
	} while (0)

/*
 * zfpm_show_stats
 */
static void zfpm_show_stats(struct vty *vty)
{
	struct zfpm_stats total_stats;
	time_t elapsed;

	vty_out(vty, "\n%-40s %10s Last %2d secs\n\n", "Counter", "Total",
		ZFPM_STATS_IVL_SECS);

	/*
	 * Compute the total stats up to this instant.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &total_stats);

	ZFPM_SHOW_STAT(connect_calls);
	ZFPM_SHOW_STAT(connect_no_sock);
	ZFPM_SHOW_STAT(read_cb_calls);
	ZFPM_SHOW_STAT(write_cb_calls);
	ZFPM_SHOW_STAT(write_calls);
	ZFPM_SHOW_STAT(partial_writes);
	ZFPM_SHOW_STAT(max_writes_hit);
	ZFPM_SHOW_STAT(t_write_yields);
	ZFPM_SHOW_STAT(nop_deletes_skipped);
	ZFPM_SHOW_STAT(route_adds);
	ZFPM_SHOW_STAT(route_dels);
	ZFPM_SHOW_STAT(updates_triggered);
	ZFPM_SHOW_STAT(redundant_triggers);
	ZFPM_SHOW_STAT(dests_del_after_update);
	ZFPM_SHOW_STAT(t_conn_down_starts);
	ZFPM_SHOW_STAT(t_conn_down_dests_processed);
	ZFPM_SHOW_STAT(t_conn_down_yields);
	ZFPM_SHOW_STAT(t_conn_down_finishes);
	ZFPM_SHOW_STAT(t_conn_up_starts);
	ZFPM_SHOW_STAT(t_conn_up_dests_processed);
	ZFPM_SHOW_STAT(t_conn_up_yields);
	ZFPM_SHOW_STAT(t_conn_up_aborts);
	ZFPM_SHOW_STAT(t_conn_up_finishes);

	if (!zfpm_g->last_stats_clear_time)
		return;

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_stats_clear_time);

	vty_out(vty, "\nStats were cleared %lu seconds ago\n",
		(unsigned long)elapsed);
}

/*
 * zfpm_clear_stats
 */
static void zfpm_clear_stats(struct vty *vty)
{
	if (!zfpm_is_enabled()) {
		vty_out(vty, "The FPM module is not enabled...\n");
		return;
	}

	zfpm_stats_reset(&zfpm_g->stats);
	zfpm_stats_reset(&zfpm_g->last_ivl_stats);
	zfpm_stats_reset(&zfpm_g->cumulative_stats);

	zfpm_stop_stats_timer();
	zfpm_start_stats_timer();

	zfpm_g->last_stats_clear_time = monotime(NULL);

	vty_out(vty, "Cleared FPM stats\n");
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       ZEBRA_STR
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_show_stats(vty);
	return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       ZEBRA_STR
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_clear_stats(vty);
	return CMD_SUCCESS;
}

/*
 * update fpm connection information
 */
DEFUN (fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port (1-65535)",
       "Forwarding Path Manager\n"
       "Configure FPM connection\n"
       "Connect to IPv4 address\n"
       "Connect to IPv4 address\n"
       "TCP port number\n"
       "TCP port number\n")
{
	in_addr_t fpm_server;
	uint32_t port_no;

	fpm_server = inet_addr(argv[3]->arg);
	if (fpm_server == INADDR_NONE)
		return CMD_ERR_INCOMPLETE;

	port_no = atoi(argv[5]->arg);
	if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
		return CMD_ERR_INCOMPLETE;

	zfpm_g->fpm_server = fpm_server;
	zfpm_g->fpm_port = port_no;

	return CMD_SUCCESS;
}
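
/*
 * Example vtysh usage of the command above (illustrative; the address
 * and port are arbitrary values within the accepted 1-65535 range):
 *
 *	configure terminal
 *	 fpm connection ip 127.0.0.1 port 2620
 */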

DEFUN (no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port (1-65535)",
       NO_STR
       "Forwarding Path Manager\n"
       "Remove configured FPM connection\n"
       "Connect to IPv4 address\n"
       "Connect to IPv4 address\n"
       "TCP port number\n"
       "TCP port number\n")
{
	if (zfpm_g->fpm_server != inet_addr(argv[4]->arg)
	    || zfpm_g->fpm_port != atoi(argv[6]->arg))
		return CMD_ERR_NO_MATCH;

	zfpm_g->fpm_server = FPM_DEFAULT_IP;
	zfpm_g->fpm_port = FPM_DEFAULT_PORT;

	return CMD_SUCCESS;
}

/*
 * zfpm_init_message_format
 */
static inline void zfpm_init_message_format(const char *format)
{
	int have_netlink, have_protobuf;

#ifdef HAVE_NETLINK
	have_netlink = 1;
#else
	have_netlink = 0;
#endif

#ifdef HAVE_PROTOBUF
	have_protobuf = 1;
#else
	have_protobuf = 0;
#endif

	zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;

	if (!format) {
		if (have_netlink) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		} else if (have_protobuf) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		}
		return;
	}

	if (!strcmp("netlink", format)) {
		if (!have_netlink) {
			flog_err(EC_ZEBRA_NETLINK_NOT_AVAILABLE,
				 "FPM netlink message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		return;
	}

	if (!strcmp("protobuf", format)) {
		if (!have_protobuf) {
			flog_err(
				EC_ZEBRA_PROTOBUF_NOT_AVAILABLE,
				"FPM protobuf message format is not available");
			return;
		}
		flog_warn(EC_ZEBRA_PROTOBUF_NOT_AVAILABLE,
			  "FPM protobuf message format is deprecated and scheduled to be removed. Please convert to using netlink format or contact dev@lists.frrouting.org with your use case.");
		zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		return;
	}

	flog_warn(EC_ZEBRA_FPM_FORMAT_UNKNOWN, "Unknown fpm format '%s'",
		  format);
}

/**
 * fpm_remote_srv_write
 *
 * Config-write handler for the remote FPM connection.
 *
 * Returns zero on success.
 */

static int fpm_remote_srv_write(struct vty *vty)
{
	struct in_addr in;

	in.s_addr = zfpm_g->fpm_server;

	if ((zfpm_g->fpm_server != FPM_DEFAULT_IP
	     && zfpm_g->fpm_server != INADDR_ANY)
	    || (zfpm_g->fpm_port != FPM_DEFAULT_PORT && zfpm_g->fpm_port != 0))
		vty_out(vty, "fpm connection ip %pI4 port %d\n", &in,
			zfpm_g->fpm_port);

	return 0;
}
/* Zebra node */
static struct cmd_node zebra_node = {
	.name = "zebra",
	.node = ZEBRA_NODE,
	.parent_node = CONFIG_NODE,
	.prompt = "",
	.config_write = fpm_remote_srv_write,
};

/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] master thread master on which FPM events are scheduled.
 *
 * The port at which the FPM is running, whether the module is enabled
 * and the message format used to talk to the FPM ('netlink' or
 * 'protobuf') are taken from the module load arguments and built-in
 * defaults below.
 *
 * Returns 0 on success.
 */
static int zfpm_init(struct thread_master *master)
{
	int enable = 1;
	uint16_t port = 0;
	const char *format = THIS_MODULE->load_args;

	memset(zfpm_g, 0, sizeof(*zfpm_g));
	zfpm_g->master = master;
	TAILQ_INIT(&zfpm_g->dest_q);
	TAILQ_INIT(&zfpm_g->mac_q);

	/* Create hash table for fpm_mac_info_t entries */
	zfpm_g->fpm_mac_info_table = hash_create(zfpm_mac_info_hash_keymake,
						 zfpm_mac_info_cmp,
						 "FPM MAC info hash table");

	zfpm_g->sock = -1;
	zfpm_g->state = ZFPM_STATE_IDLE;

	zfpm_stats_init(&zfpm_g->stats);
	zfpm_stats_init(&zfpm_g->last_ivl_stats);
	zfpm_stats_init(&zfpm_g->cumulative_stats);

	memset(&ipv4ll_gateway, 0, sizeof(ipv4ll_gateway));
	if (inet_pton(AF_INET, ipv4_ll_buf, &ipv4ll_gateway.ipv4) != 1)
		zlog_warn("inet_pton failed for %s", ipv4_ll_buf);

	install_node(&zebra_node);
	install_element(ENABLE_NODE, &show_zebra_fpm_stats_cmd);
	install_element(ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
	install_element(CONFIG_NODE, &fpm_remote_ip_cmd);
	install_element(CONFIG_NODE, &no_fpm_remote_ip_cmd);

	zfpm_init_message_format(format);

	/*
	 * Disable FPM interface if no suitable format is available.
	 */
	if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
		enable = 0;

	zfpm_g->enabled = enable;

	if (!zfpm_g->fpm_server)
		zfpm_g->fpm_server = FPM_DEFAULT_IP;

	if (!port)
		port = FPM_DEFAULT_PORT;

	zfpm_g->fpm_port = port;

	zfpm_g->obuf = stream_new(ZFPM_OBUF_SIZE);
	zfpm_g->ibuf = stream_new(ZFPM_IBUF_SIZE);

	zfpm_start_stats_timer();
	zfpm_start_connect_timer("initialized");
	return 0;
}

static int zfpm_fini(void)
{
	zfpm_write_off();
	zfpm_read_off();
	zfpm_connect_off();

	zfpm_stop_stats_timer();

	hook_unregister(rib_update, zfpm_trigger_update);
	return 0;
}

static int zebra_fpm_module_init(void)
{
	hook_register(rib_update, zfpm_trigger_update);
	hook_register(rib_shutdown, zfpm_trigger_remove);
	hook_register(zebra_rmac_update, zfpm_trigger_rmac_update);
	hook_register(frr_late_init, zfpm_init);
	hook_register(frr_early_fini, zfpm_fini);
	return 0;
}

FRR_MODULE_SETUP(.name = "zebra_fpm", .version = FRR_VERSION,
		 .description = "zebra FPM (Forwarding Plane Manager) module",
		 .init = zebra_fpm_module_init,
);