/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <zebra.h>

#include "log.h"
#include "libfrr.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"
#include "version.h"

#include "zebra/rib.h"
#include "zebra/zserv.h"
#include "zebra/zebra_ns.h"
#include "zebra/zebra_vrf.h"

#include "fpm/fpm.h"
#include "zebra_fpm_private.h"

/*
 * Interval (in seconds) at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)
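
/*
 * Sizing note: the outbound buffer holds two maximum-length FPM
 * messages, so zfpm_build_updates() (below) can batch more than one
 * message per pass; it only refills the buffer once it has been
 * fully drained.
 */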

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_ {
        rib_tables_iter_t tables_iter;
        route_table_iter_t iter;
} zfpm_rnodes_iter_t;
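
/*
 * The iteration is two-level: 'tables_iter' walks the RIB tables that
 * are relevant to the FPM, and 'iter' walks the route_nodes within
 * the table currently being visited (see zfpm_rnodes_iter_next()).
 */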

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
        unsigned long connect_calls;
        unsigned long connect_no_sock;

        unsigned long read_cb_calls;

        unsigned long write_cb_calls;
        unsigned long write_calls;
        unsigned long partial_writes;
        unsigned long max_writes_hit;
        unsigned long t_write_yields;

        unsigned long nop_deletes_skipped;
        unsigned long route_adds;
        unsigned long route_dels;

        unsigned long updates_triggered;
        unsigned long redundant_triggers;
        unsigned long non_fpm_table_triggers;

        unsigned long dests_del_after_update;

        unsigned long t_conn_down_starts;
        unsigned long t_conn_down_dests_processed;
        unsigned long t_conn_down_yields;
        unsigned long t_conn_down_finishes;

        unsigned long t_conn_up_starts;
        unsigned long t_conn_up_dests_processed;
        unsigned long t_conn_up_yields;
        unsigned long t_conn_up_aborts;
        unsigned long t_conn_up_finishes;

} zfpm_stats_t;

/*
 * States for the FPM state machine.
 */
typedef enum {

        /*
         * In this state we are not yet ready to connect to the FPM. This
         * can happen when this module is disabled, or if we're cleaning up
         * after a connection has gone down.
         */
        ZFPM_STATE_IDLE,

        /*
         * Ready to talk to the FPM and periodically trying to connect to
         * it.
         */
        ZFPM_STATE_ACTIVE,

        /*
         * In the middle of bringing up a TCP connection. Specifically,
         * waiting for a connect() call to complete asynchronously.
         */
        ZFPM_STATE_CONNECTING,

        /*
         * TCP connection to the FPM is up.
         */
        ZFPM_STATE_ESTABLISHED

} zfpm_state_t;
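
/*
 * The legal transitions, as enforced by the asserts in
 * zfpm_set_state(), are:
 *
 *   IDLE        -> ACTIVE       (connect timer started)
 *   ACTIVE      -> CONNECTING   (async connect() in progress)
 *   ACTIVE      -> ESTABLISHED  (connect() succeeded immediately)
 *   CONNECTING  -> ESTABLISHED  (async connect completed)
 *   CONNECTING  -> ACTIVE       (connect failed, retry scheduled)
 *   ESTABLISHED -> IDLE         (connection went down)
 */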

/*
 * Message format to be used to communicate with the FPM.
 */
typedef enum {
        ZFPM_MSG_FORMAT_NONE,
        ZFPM_MSG_FORMAT_NETLINK,
        ZFPM_MSG_FORMAT_PROTOBUF,
} zfpm_msg_format_e;

/*
 * Globals.
 */
typedef struct zfpm_glob_t_ {

        /*
         * True if the FPM module has been enabled.
         */
        int enabled;

        /*
         * Message format to be used to communicate with the FPM.
         */
        zfpm_msg_format_e message_format;

        struct thread_master *master;

        zfpm_state_t state;

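        /*
         * IP address of the FPM server, in network byte order (it is
         * passed straight into sin_addr.s_addr by zfpm_connect_cb()).
         */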
        in_addr_t fpm_server;
        /*
         * Port on which the FPM is running.
         */
        int fpm_port;

        /*
         * List of rib_dest_t structures to be processed.
         */
        TAILQ_HEAD(zfpm_dest_q, rib_dest_t_) dest_q;

        /*
         * Stream socket to the FPM.
         */
        int sock;

        /*
         * Buffers for messages to/from the FPM.
         */
        struct stream *obuf;
        struct stream *ibuf;

        /*
         * Threads for I/O.
         */
        struct thread *t_connect;
        struct thread *t_write;
        struct thread *t_read;

        /*
         * Thread to clean up after the TCP connection to the FPM goes
         * down, and the state that belongs to that thread.
         */
        struct thread *t_conn_down;

        struct {
                zfpm_rnodes_iter_t iter;
        } t_conn_down_state;

        /*
         * Thread to take actions once the TCP conn to the FPM comes up,
         * and the state that belongs to that thread.
         */
        struct thread *t_conn_up;

        struct {
                zfpm_rnodes_iter_t iter;
        } t_conn_up_state;

        unsigned long connect_calls;
        time_t last_connect_call_time;

        /*
         * Stats from the start of the current statistics interval up to
         * now. These are the counters we typically update in the code.
         */
        zfpm_stats_t stats;

        /*
         * Statistics that were gathered in the last collection interval.
         */
        zfpm_stats_t last_ivl_stats;

        /*
         * Cumulative stats from the last clear to the start of the current
         * statistics interval.
         */
        zfpm_stats_t cumulative_stats;

        /*
         * Stats interval timer.
         */
        struct thread *t_stats;

        /*
         * If non-zero, the last time when statistics were cleared.
         */
        time_t last_stats_clear_time;

} zfpm_glob_t;

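/*
 * Accounting note: the grand total shown by 'show zebra fpm stats' is
 * computed on the fly as cumulative_stats + stats (see
 * zfpm_show_stats()); the stats timer folds 'stats' into
 * 'cumulative_stats' every ZFPM_STATS_IVL_SECS seconds.
 */
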
static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_trigger_update(struct route_node *rn, const char *reason);

static int zfpm_read_cb(struct thread *thread);
static int zfpm_write_cb(struct thread *thread);

static void zfpm_set_state(zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer(const char *reason);
static void zfpm_start_stats_timer(void);

/*
 * zfpm_thread_should_yield
 */
static inline int zfpm_thread_should_yield(struct thread *t)
{
        return thread_should_yield(t);
}

/*
 * zfpm_state_to_str
 */
static const char *zfpm_state_to_str(zfpm_state_t state)
{
        switch (state) {

        case ZFPM_STATE_IDLE:
                return "idle";

        case ZFPM_STATE_ACTIVE:
                return "active";

        case ZFPM_STATE_CONNECTING:
                return "connecting";

        case ZFPM_STATE_ESTABLISHED:
                return "established";

        default:
                return "unknown";
        }
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t zfpm_get_elapsed_time(time_t reference)
{
        time_t now;

        now = monotime(NULL);

        if (now < reference) {
                assert(0);
                return 0;
        }

        return now - reference;
}

/*
 * zfpm_is_table_for_fpm
 *
 * Returns TRUE if the given table is to be communicated to the
 * FPM.
 */
static inline int zfpm_is_table_for_fpm(struct route_table *table)
{
        rib_table_info_t *info;

        info = rib_table_info(table);

        /*
         * We only send the unicast tables in the main instance to the FPM
         * at this point.
         */
        if (zvrf_id(info->zvrf) != 0)
                return 0;

        if (info->safi != SAFI_UNICAST)
                return 0;

        return 1;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void zfpm_rnodes_iter_init(zfpm_rnodes_iter_t *iter)
{
        memset(iter, 0, sizeof(*iter));
        rib_tables_iter_init(&iter->tables_iter);

        /*
         * This is a hack, but it makes implementing 'next' easier by
         * ensuring that route_table_iter_next() will return NULL the first
         * time we call it.
         */
        route_table_iter_init(&iter->iter, NULL);
        route_table_iter_cleanup(&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *zfpm_rnodes_iter_next(zfpm_rnodes_iter_t *iter)
{
        struct route_node *rn;
        struct route_table *table;

        while (1) {
                rn = route_table_iter_next(&iter->iter);
                if (rn)
                        return rn;

                /*
                 * We've made our way through this table, go to the next one.
                 */
                route_table_iter_cleanup(&iter->iter);

                while ((table = rib_tables_iter_next(&iter->tables_iter))) {
                        if (zfpm_is_table_for_fpm(table))
                                break;
                }

                if (!table)
                        return NULL;

                route_table_iter_init(&iter->iter, table);
        }

        return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void zfpm_rnodes_iter_pause(zfpm_rnodes_iter_t *iter)
{
        route_table_iter_pause(&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void zfpm_rnodes_iter_cleanup(zfpm_rnodes_iter_t *iter)
{
        route_table_iter_cleanup(&iter->iter);
        rib_tables_iter_cleanup(&iter->tables_iter);
}

/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void zfpm_stats_init(zfpm_stats_t *stats)
{
        memset(stats, 0, sizeof(*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void zfpm_stats_reset(zfpm_stats_t *stats)
{
        zfpm_stats_init(stats);
}

/*
 * zfpm_stats_copy
 */
static inline void zfpm_stats_copy(const zfpm_stats_t *src, zfpm_stats_t *dest)
{
        memcpy(dest, src, sizeof(*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void zfpm_stats_compose(const zfpm_stats_t *s1, const zfpm_stats_t *s2,
                               zfpm_stats_t *result)
{
        const unsigned long *p1, *p2;
        unsigned long *result_p;
        int i, num_counters;

        p1 = (const unsigned long *)s1;
        p2 = (const unsigned long *)s2;
        result_p = (unsigned long *)result;

        num_counters = (sizeof(zfpm_stats_t) / sizeof(unsigned long));

        for (i = 0; i < num_counters; i++) {
                result_p[i] = p1[i] + p2[i];
        }
}

/*
 * zfpm_read_on
 */
static inline void zfpm_read_on(void)
{
        assert(!zfpm_g->t_read);
        assert(zfpm_g->sock >= 0);

        thread_add_read(zfpm_g->master, zfpm_read_cb, 0, zfpm_g->sock,
                        &zfpm_g->t_read);
}

/*
 * zfpm_write_on
 */
static inline void zfpm_write_on(void)
{
        assert(!zfpm_g->t_write);
        assert(zfpm_g->sock >= 0);

        thread_add_write(zfpm_g->master, zfpm_write_cb, 0, zfpm_g->sock,
                         &zfpm_g->t_write);
}

/*
 * zfpm_read_off
 */
static inline void zfpm_read_off(void)
{
        THREAD_READ_OFF(zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void zfpm_write_off(void)
{
        THREAD_WRITE_OFF(zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int zfpm_conn_up_thread_cb(struct thread *thread)
{
        struct route_node *rnode;
        zfpm_rnodes_iter_t *iter;
        rib_dest_t *dest;

        zfpm_g->t_conn_up = NULL;

        iter = &zfpm_g->t_conn_up_state.iter;

        if (zfpm_g->state != ZFPM_STATE_ESTABLISHED) {
                zfpm_debug(
                        "Connection not up anymore, conn_up thread aborting");
                zfpm_g->stats.t_conn_up_aborts++;
                goto done;
        }

        while ((rnode = zfpm_rnodes_iter_next(iter))) {
                dest = rib_dest_from_rnode(rnode);

                if (dest) {
                        zfpm_g->stats.t_conn_up_dests_processed++;
                        zfpm_trigger_update(rnode, NULL);
                }

                /*
                 * Yield if need be.
                 */
                if (!zfpm_thread_should_yield(thread))
                        continue;

                zfpm_g->stats.t_conn_up_yields++;
                zfpm_rnodes_iter_pause(iter);
                zfpm_g->t_conn_up = NULL;
                thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb,
                                      NULL, 0, &zfpm_g->t_conn_up);
                return 0;
        }

        zfpm_g->stats.t_conn_up_finishes++;

done:
        zfpm_rnodes_iter_cleanup(iter);
        return 0;
}

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void zfpm_connection_up(const char *detail)
{
        assert(zfpm_g->sock >= 0);
        zfpm_read_on();
        zfpm_write_on();
        zfpm_set_state(ZFPM_STATE_ESTABLISHED, detail);

        /*
         * Start thread to push existing routes to the FPM.
         */
        assert(!zfpm_g->t_conn_up);

        zfpm_rnodes_iter_init(&zfpm_g->t_conn_up_state.iter);

        zfpm_debug("Starting conn_up thread");
        zfpm_g->t_conn_up = NULL;
        thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0,
                              &zfpm_g->t_conn_up);
        zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void zfpm_connect_check(void)
{
        int status;
        socklen_t slen;
        int ret;

        zfpm_read_off();
        zfpm_write_off();

        slen = sizeof(status);
        ret = getsockopt(zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *)&status,
                         &slen);

        if (ret >= 0 && status == 0) {
                zfpm_connection_up("async connect complete");
                return;
        }

        /*
         * getsockopt() failed or indicated an error on the socket.
         */
        close(zfpm_g->sock);
        zfpm_g->sock = -1;

        zfpm_start_connect_timer("getsockopt() after async connect failed");
        return;
}

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int zfpm_conn_down_thread_cb(struct thread *thread)
{
        struct route_node *rnode;
        zfpm_rnodes_iter_t *iter;
        rib_dest_t *dest;

        assert(zfpm_g->state == ZFPM_STATE_IDLE);

        zfpm_g->t_conn_down = NULL;

        iter = &zfpm_g->t_conn_down_state.iter;

        while ((rnode = zfpm_rnodes_iter_next(iter))) {
                dest = rib_dest_from_rnode(rnode);

                if (dest) {
                        if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
                                TAILQ_REMOVE(&zfpm_g->dest_q, dest,
                                             fpm_q_entries);
                        }

                        UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
                        UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);

                        zfpm_g->stats.t_conn_down_dests_processed++;

                        /*
                         * Check if the dest should be deleted.
                         */
                        rib_gc_dest(rnode);
                }

                /*
                 * Yield if need be.
                 */
                if (!zfpm_thread_should_yield(thread))
                        continue;

                zfpm_g->stats.t_conn_down_yields++;
                zfpm_rnodes_iter_pause(iter);
                zfpm_g->t_conn_down = NULL;
                thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb,
                                      NULL, 0, &zfpm_g->t_conn_down);
                return 0;
        }

        zfpm_g->stats.t_conn_down_finishes++;
        zfpm_rnodes_iter_cleanup(iter);

        /*
         * Start the process of connecting to the FPM again.
         */
        zfpm_start_connect_timer("cleanup complete");
        return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void zfpm_connection_down(const char *detail)
{
        if (!detail)
                detail = "unknown";

        assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);

        zlog_info("connection to the FPM has gone down: %s", detail);

        zfpm_read_off();
        zfpm_write_off();

        stream_reset(zfpm_g->ibuf);
        stream_reset(zfpm_g->obuf);

        if (zfpm_g->sock >= 0) {
                close(zfpm_g->sock);
                zfpm_g->sock = -1;
        }

        /*
         * Start thread to clean up state after the connection goes down.
         */
        assert(!zfpm_g->t_conn_down);
        zfpm_debug("Starting conn_down thread");
        zfpm_rnodes_iter_init(&zfpm_g->t_conn_down_state.iter);
        zfpm_g->t_conn_down = NULL;
        thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0,
                              &zfpm_g->t_conn_down);
        zfpm_g->stats.t_conn_down_starts++;

        zfpm_set_state(ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int zfpm_read_cb(struct thread *thread)
{
        size_t already;
        struct stream *ibuf;
        uint16_t msg_len;
        fpm_msg_hdr_t *hdr;

        zfpm_g->stats.read_cb_calls++;
        zfpm_g->t_read = NULL;

        /*
         * Check if async connect is now done.
         */
        if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
                zfpm_connect_check();
                return 0;
        }

        assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
        assert(zfpm_g->sock >= 0);

        ibuf = zfpm_g->ibuf;

        already = stream_get_endp(ibuf);
        if (already < FPM_MSG_HDR_LEN) {
                ssize_t nbyte;

                nbyte = stream_read_try(ibuf, zfpm_g->sock,
                                        FPM_MSG_HDR_LEN - already);
                if (nbyte == 0 || nbyte == -1) {
                        zfpm_connection_down("closed socket in read");
                        return 0;
                }

                if (nbyte != (ssize_t)(FPM_MSG_HDR_LEN - already))
                        goto done;

                already = FPM_MSG_HDR_LEN;
        }

        stream_set_getp(ibuf, 0);

        hdr = (fpm_msg_hdr_t *)stream_pnt(ibuf);

        if (!fpm_msg_hdr_ok(hdr)) {
                zfpm_connection_down("invalid message header");
                return 0;
        }

        msg_len = fpm_msg_len(hdr);

        /*
         * Read out the rest of the packet.
         */
        if (already < msg_len) {
                ssize_t nbyte;

                nbyte = stream_read_try(ibuf, zfpm_g->sock, msg_len - already);

                if (nbyte == 0 || nbyte == -1) {
                        zfpm_connection_down("failed to read message");
                        return 0;
                }

                if (nbyte != (ssize_t)(msg_len - already))
                        goto done;
        }

        zfpm_debug("Read out a full fpm message");

        /*
         * Just throw it away for now.
         */
        stream_reset(ibuf);

done:
        zfpm_read_on();
        return 0;
}

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int zfpm_writes_pending(void)
{

        /*
         * Check if there is any data in the outbound buffer that has not
         * been written to the socket yet.
         */
        if (stream_get_endp(zfpm_g->obuf) - stream_get_getp(zfpm_g->obuf))
                return 1;

        /*
         * Check if there are any prefixes on the outbound queue.
         */
        if (!TAILQ_EMPTY(&zfpm_g->dest_q))
                return 1;

        return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int zfpm_encode_route(rib_dest_t *dest, struct route_entry *re,
                                    char *in_buf, size_t in_buf_len,
                                    fpm_msg_type_e *msg_type)
{
        size_t len;
#ifdef HAVE_NETLINK
        int cmd;
#endif
        len = 0;

        *msg_type = FPM_MSG_TYPE_NONE;

        switch (zfpm_g->message_format) {

        case ZFPM_MSG_FORMAT_PROTOBUF:
#ifdef HAVE_PROTOBUF
                len = zfpm_protobuf_encode_route(dest, re, (uint8_t *)in_buf,
                                                 in_buf_len);
                *msg_type = FPM_MSG_TYPE_PROTOBUF;
#endif
                break;

        case ZFPM_MSG_FORMAT_NETLINK:
#ifdef HAVE_NETLINK
                *msg_type = FPM_MSG_TYPE_NETLINK;
                cmd = re ? RTM_NEWROUTE : RTM_DELROUTE;
                len = zfpm_netlink_encode_route(cmd, dest, re, in_buf,
                                                in_buf_len);
                assert(fpm_msg_align(len) == len);
#endif /* HAVE_NETLINK */
                break;

        default:
                break;
        }

        return len;
}

/*
 * zfpm_route_for_update
 *
 * Returns the re that is to be sent to the FPM for a given dest.
 */
struct route_entry *zfpm_route_for_update(rib_dest_t *dest)
{
        struct route_entry *re;

        RE_DEST_FOREACH_ROUTE (dest, re) {
                if (!CHECK_FLAG(re->status, ROUTE_ENTRY_SELECTED_FIB))
                        continue;

                return re;
        }

        /*
         * We have no route for this destination.
         */
        return NULL;
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
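/*
 * Wire-format note: each message on the stream is an fpm_msg_hdr_t
 * (version, msg_type, and total length in network byte order)
 * followed immediately by the encoded payload; per
 * fpm_data_len_to_msg_len(), hdr->msg_len covers the header as well.
 */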
static void zfpm_build_updates(void)
{
        struct stream *s;
        rib_dest_t *dest;
        unsigned char *buf, *data, *buf_end;
        size_t msg_len;
        size_t data_len;
        fpm_msg_hdr_t *hdr;
        struct route_entry *re;
        int is_add, write_msg;
        fpm_msg_type_e msg_type;

        s = zfpm_g->obuf;

        assert(stream_empty(s));

        do {

                /*
                 * Make sure there is enough space to write another message.
                 */
                if (STREAM_WRITEABLE(s) < FPM_MAX_MSG_LEN)
                        break;

                buf = STREAM_DATA(s) + stream_get_endp(s);
                buf_end = buf + STREAM_WRITEABLE(s);

                dest = TAILQ_FIRST(&zfpm_g->dest_q);
                if (!dest)
                        break;

                assert(CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM));

                hdr = (fpm_msg_hdr_t *)buf;
                hdr->version = FPM_PROTO_VERSION;

                data = fpm_msg_data(hdr);

                re = zfpm_route_for_update(dest);
                is_add = re ? 1 : 0;

                write_msg = 1;

                /*
                 * If this is a route deletion, and we have not sent the
                 * route to the FPM previously, skip it.
                 */
                if (!is_add && !CHECK_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM)) {
                        write_msg = 0;
                        zfpm_g->stats.nop_deletes_skipped++;
                }

                if (write_msg) {
                        data_len = zfpm_encode_route(dest, re, (char *)data,
                                                     buf_end - data, &msg_type);

                        assert(data_len);
                        if (data_len) {
                                hdr->msg_type = msg_type;
                                msg_len = fpm_data_len_to_msg_len(data_len);
                                hdr->msg_len = htons(msg_len);
                                stream_forward_endp(s, msg_len);

                                if (is_add)
                                        zfpm_g->stats.route_adds++;
                                else
                                        zfpm_g->stats.route_dels++;
                        }
                }

                /*
                 * Remove the dest from the queue, and reset the flag.
                 */
                UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
                TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);

                if (is_add) {
                        SET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
                } else {
                        UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
                }

                /*
                 * Delete the destination if necessary.
                 */
                if (rib_gc_dest(dest->rnode))
                        zfpm_g->stats.dests_del_after_update++;

        } while (1);
}

/*
 * zfpm_write_cb
 */
static int zfpm_write_cb(struct thread *thread)
{
        struct stream *s;
        int num_writes;

        zfpm_g->stats.write_cb_calls++;
        zfpm_g->t_write = NULL;

        /*
         * Check if async connect is now done.
         */
        if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
                zfpm_connect_check();
                return 0;
        }

        assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
        assert(zfpm_g->sock >= 0);

        num_writes = 0;

        do {
                int bytes_to_write, bytes_written;

                s = zfpm_g->obuf;

                /*
                 * If the stream is empty, try to fill it up with data.
                 */
                if (stream_empty(s)) {
                        zfpm_build_updates();
                }

                bytes_to_write = stream_get_endp(s) - stream_get_getp(s);
                if (!bytes_to_write)
                        break;

                bytes_written =
                        write(zfpm_g->sock, STREAM_PNT(s), bytes_to_write);
                zfpm_g->stats.write_calls++;
                num_writes++;

                if (bytes_written < 0) {
                        if (ERRNO_IO_RETRY(errno))
                                break;

                        zfpm_connection_down("failed to write to socket");
                        return 0;
                }

                if (bytes_written != bytes_to_write) {

                        /*
                         * Partial write.
                         */
                        stream_forward_getp(s, bytes_written);
                        zfpm_g->stats.partial_writes++;
                        break;
                }

                /*
                 * We've written out the entire contents of the stream.
                 */
                stream_reset(s);

                if (num_writes >= ZFPM_MAX_WRITES_PER_RUN) {
                        zfpm_g->stats.max_writes_hit++;
                        break;
                }

                if (zfpm_thread_should_yield(thread)) {
                        zfpm_g->stats.t_write_yields++;
                        break;
                }
        } while (1);

        if (zfpm_writes_pending())
                zfpm_write_on();

        return 0;
}

/*
 * zfpm_connect_cb
 */
static int zfpm_connect_cb(struct thread *t)
{
        int sock, ret;
        struct sockaddr_in serv;

        zfpm_g->t_connect = NULL;
        assert(zfpm_g->state == ZFPM_STATE_ACTIVE);

        sock = socket(AF_INET, SOCK_STREAM, 0);
        if (sock < 0) {
                zfpm_debug("Failed to create socket for connect(): %s",
                           strerror(errno));
                zfpm_g->stats.connect_no_sock++;
                return 0;
        }

        set_nonblocking(sock);

        /* Set up the server's address. */
        memset(&serv, 0, sizeof(serv));
        serv.sin_family = AF_INET;
        serv.sin_port = htons(zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
        serv.sin_len = sizeof(struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
        if (!zfpm_g->fpm_server)
                serv.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        else
                serv.sin_addr.s_addr = (zfpm_g->fpm_server);

        /*
         * Connect to the FPM.
         */
        zfpm_g->connect_calls++;
        zfpm_g->stats.connect_calls++;
        zfpm_g->last_connect_call_time = monotime(NULL);

        ret = connect(sock, (struct sockaddr *)&serv, sizeof(serv));
        if (ret >= 0) {
                zfpm_g->sock = sock;
                zfpm_connection_up("connect succeeded");
                return 1;
        }

        if (errno == EINPROGRESS) {
                zfpm_g->sock = sock;
                zfpm_read_on();
                zfpm_write_on();
                zfpm_set_state(ZFPM_STATE_CONNECTING,
                               "async connect in progress");
                return 0;
        }

        zlog_info("can't connect to FPM %d: %s", sock, safe_strerror(errno));
        close(sock);

        /*
         * Restart timer for retrying connection.
         */
        zfpm_start_connect_timer("connect() failed");
        return 0;
}

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void zfpm_set_state(zfpm_state_t state, const char *reason)
{
        zfpm_state_t cur_state = zfpm_g->state;

        if (!reason)
                reason = "Unknown";

        if (state == cur_state)
                return;

        zfpm_debug("beginning state transition %s -> %s. Reason: %s",
                   zfpm_state_to_str(cur_state), zfpm_state_to_str(state),
                   reason);

        switch (state) {

        case ZFPM_STATE_IDLE:
                assert(cur_state == ZFPM_STATE_ESTABLISHED);
                break;

        case ZFPM_STATE_ACTIVE:
                assert(cur_state == ZFPM_STATE_IDLE
                       || cur_state == ZFPM_STATE_CONNECTING);
                assert(zfpm_g->t_connect);
                break;

        case ZFPM_STATE_CONNECTING:
                assert(zfpm_g->sock);
                assert(cur_state == ZFPM_STATE_ACTIVE);
                assert(zfpm_g->t_read);
                assert(zfpm_g->t_write);
                break;

        case ZFPM_STATE_ESTABLISHED:
                assert(cur_state == ZFPM_STATE_ACTIVE
                       || cur_state == ZFPM_STATE_CONNECTING);
                assert(zfpm_g->sock);
                assert(zfpm_g->t_read);
                assert(zfpm_g->t_write);
                break;
        }

        zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long zfpm_calc_connect_delay(void)
{
        time_t elapsed;

        /*
         * Return 0 if this is our first attempt to connect.
         */
        if (zfpm_g->connect_calls == 0) {
                return 0;
        }

        elapsed = zfpm_get_elapsed_time(zfpm_g->last_connect_call_time);

        if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
                return 0;
        }

        return ZFPM_CONNECT_RETRY_IVL - elapsed;
}
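
/*
 * For example, with ZFPM_CONNECT_RETRY_IVL of 5 seconds: if the last
 * connect() attempt was 2 seconds ago, the next one is scheduled
 * 5 - 2 = 3 seconds from now; if more than 5 seconds have already
 * passed, it is scheduled immediately.
 */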

/*
 * zfpm_start_connect_timer
 */
static void zfpm_start_connect_timer(const char *reason)
{
        long delay_secs;

        assert(!zfpm_g->t_connect);
        assert(zfpm_g->sock < 0);

        assert(zfpm_g->state == ZFPM_STATE_IDLE
               || zfpm_g->state == ZFPM_STATE_ACTIVE
               || zfpm_g->state == ZFPM_STATE_CONNECTING);

        delay_secs = zfpm_calc_connect_delay();
        zfpm_debug("scheduling connect in %ld seconds", delay_secs);

        thread_add_timer(zfpm_g->master, zfpm_connect_cb, 0, delay_secs,
                         &zfpm_g->t_connect);
        zfpm_set_state(ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int zfpm_is_enabled(void)
{
        return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int zfpm_conn_is_up(void)
{
        if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
                return 0;

        assert(zfpm_g->sock >= 0);

        return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
static int zfpm_trigger_update(struct route_node *rn, const char *reason)
{
        rib_dest_t *dest;
        char buf[PREFIX_STRLEN];

        /*
         * Ignore if the connection is down. We will update the FPM about
         * all destinations once the connection comes up.
         */
        if (!zfpm_conn_is_up())
                return 0;

        dest = rib_dest_from_rnode(rn);

        /*
         * Ignore the trigger if the dest is not in a table that we would
         * send to the FPM.
         */
        if (!zfpm_is_table_for_fpm(rib_dest_table(dest))) {
                zfpm_g->stats.non_fpm_table_triggers++;
                return 0;
        }

        if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
                zfpm_g->stats.redundant_triggers++;
                return 0;
        }

        if (reason) {
                zfpm_debug("%s triggering update to FPM - Reason: %s",
                           prefix2str(&rn->p, buf, sizeof(buf)), reason);
        }

        SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
        TAILQ_INSERT_TAIL(&zfpm_g->dest_q, dest, fpm_q_entries);
        zfpm_g->stats.updates_triggered++;

        /*
         * Make sure that writes are enabled.
         */
        if (zfpm_g->t_write)
                return 0;

        zfpm_write_on();
        return 0;
}

/*
 * zfpm_stats_timer_cb
 */
static int zfpm_stats_timer_cb(struct thread *t)
{
        zfpm_g->t_stats = NULL;

        /*
         * Remember the stats collected in the last interval for display
         * purposes.
         */
        zfpm_stats_copy(&zfpm_g->stats, &zfpm_g->last_ivl_stats);

        /*
         * Add the current set of stats into the cumulative statistics.
         */
        zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
                           &zfpm_g->cumulative_stats);

        /*
         * Start collecting stats afresh over the next interval.
         */
        zfpm_stats_reset(&zfpm_g->stats);

        zfpm_start_stats_timer();

        return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void zfpm_stop_stats_timer(void)
{
        if (!zfpm_g->t_stats)
                return;

        zfpm_debug("Stopping existing stats timer");
        THREAD_TIMER_OFF(zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
static void zfpm_start_stats_timer(void)
{
        assert(!zfpm_g->t_stats);

        thread_add_timer(zfpm_g->master, zfpm_stats_timer_cb, 0,
                         ZFPM_STATS_IVL_SECS, &zfpm_g->t_stats);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter)                                               \
        do {                                                                  \
                vty_out(vty, "%-40s %10lu %16lu\n", #counter,                 \
                        total_stats.counter, zfpm_g->last_ivl_stats.counter); \
        } while (0)
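
/*
 * The '#counter' stringizes the macro argument, so each invocation
 * prints the counter's own name followed by its grand total and its
 * value over the last collection interval.
 */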

/*
 * zfpm_show_stats
 */
static void zfpm_show_stats(struct vty *vty)
{
        zfpm_stats_t total_stats;
        time_t elapsed;

        vty_out(vty, "\n%-40s %10s Last %2d secs\n\n", "Counter", "Total",
                ZFPM_STATS_IVL_SECS);

        /*
         * Compute the total stats up to this instant.
         */
        zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
                           &total_stats);

        ZFPM_SHOW_STAT(connect_calls);
        ZFPM_SHOW_STAT(connect_no_sock);
        ZFPM_SHOW_STAT(read_cb_calls);
        ZFPM_SHOW_STAT(write_cb_calls);
        ZFPM_SHOW_STAT(write_calls);
        ZFPM_SHOW_STAT(partial_writes);
        ZFPM_SHOW_STAT(max_writes_hit);
        ZFPM_SHOW_STAT(t_write_yields);
        ZFPM_SHOW_STAT(nop_deletes_skipped);
        ZFPM_SHOW_STAT(route_adds);
        ZFPM_SHOW_STAT(route_dels);
        ZFPM_SHOW_STAT(updates_triggered);
        ZFPM_SHOW_STAT(non_fpm_table_triggers);
        ZFPM_SHOW_STAT(redundant_triggers);
        ZFPM_SHOW_STAT(dests_del_after_update);
        ZFPM_SHOW_STAT(t_conn_down_starts);
        ZFPM_SHOW_STAT(t_conn_down_dests_processed);
        ZFPM_SHOW_STAT(t_conn_down_yields);
        ZFPM_SHOW_STAT(t_conn_down_finishes);
        ZFPM_SHOW_STAT(t_conn_up_starts);
        ZFPM_SHOW_STAT(t_conn_up_dests_processed);
        ZFPM_SHOW_STAT(t_conn_up_yields);
        ZFPM_SHOW_STAT(t_conn_up_aborts);
        ZFPM_SHOW_STAT(t_conn_up_finishes);

        if (!zfpm_g->last_stats_clear_time)
                return;

        elapsed = zfpm_get_elapsed_time(zfpm_g->last_stats_clear_time);

        vty_out(vty, "\nStats were cleared %lu seconds ago\n",
                (unsigned long)elapsed);
}

/*
 * zfpm_clear_stats
 */
static void zfpm_clear_stats(struct vty *vty)
{
        if (!zfpm_is_enabled()) {
                vty_out(vty, "The FPM module is not enabled...\n");
                return;
        }

        zfpm_stats_reset(&zfpm_g->stats);
        zfpm_stats_reset(&zfpm_g->last_ivl_stats);
        zfpm_stats_reset(&zfpm_g->cumulative_stats);

        zfpm_stop_stats_timer();
        zfpm_start_stats_timer();

        zfpm_g->last_stats_clear_time = monotime(NULL);

        vty_out(vty, "Cleared FPM stats\n");
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       "Zebra information\n"
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
        zfpm_show_stats(vty);
        return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       "Zebra information\n"
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
        zfpm_clear_stats(vty);
        return CMD_SUCCESS;
}

/*
 * update fpm connection information
 */
DEFUN (fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port (1-65535)",
       "fpm connection remote ip and port\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{
        in_addr_t fpm_server;
        uint32_t port_no;

        fpm_server = inet_addr(argv[3]->arg);
        if (fpm_server == INADDR_NONE)
                return CMD_ERR_INCOMPLETE;

        port_no = atoi(argv[5]->arg);
        if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
                return CMD_ERR_INCOMPLETE;

        zfpm_g->fpm_server = fpm_server;
        zfpm_g->fpm_port = port_no;

        return CMD_SUCCESS;
}
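
/*
 * Example (illustrative addresses): from the config node,
 *
 *   fpm connection ip 192.0.2.10 port 2620
 *
 * points this module at an FPM server on 192.0.2.10; the matching
 * 'no fpm connection ...' form below reverts to the defaults.
 */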

DEFUN (no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port (1-65535)",
       "fpm connection remote ip and port\n"
       "Connection\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{
        if (zfpm_g->fpm_server != inet_addr(argv[4]->arg)
            || zfpm_g->fpm_port != atoi(argv[6]->arg))
                return CMD_ERR_NO_MATCH;

        zfpm_g->fpm_server = FPM_DEFAULT_IP;
        zfpm_g->fpm_port = FPM_DEFAULT_PORT;

        return CMD_SUCCESS;
}

/*
 * zfpm_init_message_format
 */
static inline void zfpm_init_message_format(const char *format)
{
        int have_netlink, have_protobuf;

#ifdef HAVE_NETLINK
        have_netlink = 1;
#else
        have_netlink = 0;
#endif

#ifdef HAVE_PROTOBUF
        have_protobuf = 1;
#else
        have_protobuf = 0;
#endif

        zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;

        if (!format) {
                if (have_netlink) {
                        zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
                } else if (have_protobuf) {
                        zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
                }
                return;
        }

        if (!strcmp("netlink", format)) {
                if (!have_netlink) {
                        zlog_err("FPM netlink message format is not available");
                        return;
                }
                zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
                return;
        }

        if (!strcmp("protobuf", format)) {
                if (!have_protobuf) {
                        zlog_err(
                                "FPM protobuf message format is not available");
                        return;
                }
                zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
                return;
        }

        zlog_warn("Unknown fpm format '%s'", format);
}

/**
 * fpm_remote_srv_write
 *
 * Writes the remote FPM connection command to the running
 * configuration, if it differs from the defaults.
 *
 * Returns ZERO on success.
 */
static int fpm_remote_srv_write(struct vty *vty)
{
        struct in_addr in;

        in.s_addr = zfpm_g->fpm_server;

        if ((zfpm_g->fpm_server != FPM_DEFAULT_IP
             && zfpm_g->fpm_server != INADDR_ANY)
            || (zfpm_g->fpm_port != FPM_DEFAULT_PORT
                && zfpm_g->fpm_port != 0))
                vty_out(vty, "fpm connection ip %s port %d\n", inet_ntoa(in),
                        zfpm_g->fpm_port);

        return 0;
}

/* Zebra node */
static struct cmd_node zebra_node = {ZEBRA_NODE, "", 1};

/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module. The message
 * format to use to talk to the FPM ('netlink' or 'protobuf') is
 * taken from the module load arguments.
 *
 * @param[in] master thread master to schedule this module's threads on.
 *
 * Returns 0 on success.
 */
static int zfpm_init(struct thread_master *master)
{
        int enable = 1;
        uint16_t port = 0;
        const char *format = THIS_MODULE->load_args;

        memset(zfpm_g, 0, sizeof(*zfpm_g));
        zfpm_g->master = master;
        TAILQ_INIT(&zfpm_g->dest_q);
        zfpm_g->sock = -1;
        zfpm_g->state = ZFPM_STATE_IDLE;

        zfpm_stats_init(&zfpm_g->stats);
        zfpm_stats_init(&zfpm_g->last_ivl_stats);
        zfpm_stats_init(&zfpm_g->cumulative_stats);

        install_node(&zebra_node, fpm_remote_srv_write);
        install_element(ENABLE_NODE, &show_zebra_fpm_stats_cmd);
        install_element(ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
        install_element(CONFIG_NODE, &fpm_remote_ip_cmd);
        install_element(CONFIG_NODE, &no_fpm_remote_ip_cmd);

        zfpm_init_message_format(format);

        /*
         * Disable FPM interface if no suitable format is available.
         */
        if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
                enable = 0;

        zfpm_g->enabled = enable;

        if (!zfpm_g->fpm_server)
                zfpm_g->fpm_server = FPM_DEFAULT_IP;

        if (!port)
                port = FPM_DEFAULT_PORT;

        zfpm_g->fpm_port = port;

        zfpm_g->obuf = stream_new(ZFPM_OBUF_SIZE);
        zfpm_g->ibuf = stream_new(ZFPM_IBUF_SIZE);

        zfpm_start_stats_timer();
        zfpm_start_connect_timer("initialized");
        return 0;
}

static int zebra_fpm_module_init(void)
{
        hook_register(rib_update, zfpm_trigger_update);
        hook_register(frr_late_init, zfpm_init);
        return 0;
}

FRR_MODULE_SETUP(.name = "zebra_fpm", .version = FRR_VERSION,
                 .description = "zebra FPM (Forwarding Plane Manager) module",
                 .init = zebra_fpm_module_init, )
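
/*
 * Being a loadable module, this code is not built into zebra by
 * default; it is loaded with zebra's '-M' option (e.g. '-M fpm', or
 * '-M fpm:protobuf' to select a message format via load_args).
 */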