]> git.proxmox.com Git - mirror_frr.git/blob - zebra/zebra_fpm.c
Merge pull request #1507 from donaldsharp/bgp_af_open
[mirror_frr.git] / zebra / zebra_fpm.c
1 /*
2 * Main implementation file for interface to Forwarding Plane Manager.
3 *
4 * Copyright (C) 2012 by Open Source Routing.
5 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
6 *
7 * This file is part of GNU Zebra.
8 *
9 * GNU Zebra is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by the
11 * Free Software Foundation; either version 2, or (at your option) any
12 * later version.
13 *
14 * GNU Zebra is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; see the file COPYING; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 */
23
24 #include <zebra.h>
25
26 #include "log.h"
27 #include "libfrr.h"
28 #include "stream.h"
29 #include "thread.h"
30 #include "network.h"
31 #include "command.h"
32 #include "version.h"
33
34 #include "zebra/rib.h"
35 #include "zebra/zserv.h"
36 #include "zebra/zebra_ns.h"
37 #include "zebra/zebra_vrf.h"
38
39 #include "fpm/fpm.h"
40 #include "zebra_fpm_private.h"
41
42 /*
43 * Interval at which we attempt to connect to the FPM.
44 */
45 #define ZFPM_CONNECT_RETRY_IVL 5
46
47 /*
48 * Sizes of outgoing and incoming stream buffers for writing/reading
49 * FPM messages.
50 */
51 #define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
52 #define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)
53
54 /*
55 * The maximum number of times the FPM socket write callback can call
56 * 'write' before it yields.
57 */
58 #define ZFPM_MAX_WRITES_PER_RUN 10
59
60 /*
61 * Interval over which we collect statistics.
62 */
63 #define ZFPM_STATS_IVL_SECS 10
64
/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_ {
	/* Iterator over all RIB tables. */
	rib_tables_iter_t tables_iter;
	/* Iterator over the nodes of the current table. */
	route_table_iter_t iter;
} zfpm_rnodes_iter_t;
73
/*
 * Statistics.
 *
 * Every field is an unsigned long counter; zfpm_stats_compose()
 * relies on this uniform layout when summing two blocks field by
 * field.
 */
typedef struct zfpm_stats_t_ {
	/* connect() attempts, and failures to even obtain a socket. */
	unsigned long connect_calls;
	unsigned long connect_no_sock;

	/* Read callback invocations. */
	unsigned long read_cb_calls;

	/* Write path: callback invocations, write() calls, partial
	 * writes, times the per-run write limit was hit, and yields. */
	unsigned long write_cb_calls;
	unsigned long write_calls;
	unsigned long partial_writes;
	unsigned long max_writes_hit;
	unsigned long t_write_yields;

	/* Deletes skipped because the route was never sent to the FPM,
	 * and route add/delete messages actually encoded. */
	unsigned long nop_deletes_skipped;
	unsigned long route_adds;
	unsigned long route_dels;

	/* Update triggers, and triggers for dests already queued. */
	unsigned long updates_triggered;
	unsigned long redundant_triggers;

	/* Dests garbage-collected right after an update was built. */
	unsigned long dests_del_after_update;

	/* Connection-down cleanup thread activity. */
	unsigned long t_conn_down_starts;
	unsigned long t_conn_down_dests_processed;
	unsigned long t_conn_down_yields;
	unsigned long t_conn_down_finishes;

	/* Connection-up route-push thread activity. */
	unsigned long t_conn_up_starts;
	unsigned long t_conn_up_dests_processed;
	unsigned long t_conn_up_yields;
	unsigned long t_conn_up_aborts;
	unsigned long t_conn_up_finishes;

} zfpm_stats_t;
110
/*
 * States for the FPM state machine.
 *
 * Legal transitions between these states are enforced by the asserts
 * in zfpm_set_state().
 */
typedef enum {

	/*
	 * In this state we are not yet ready to connect to the FPM. This
	 * can happen when this module is disabled, or if we're cleaning up
	 * after a connection has gone down.
	 */
	ZFPM_STATE_IDLE,

	/*
	 * Ready to talk to the FPM and periodically trying to connect to
	 * it.
	 */
	ZFPM_STATE_ACTIVE,

	/*
	 * In the middle of bringing up a TCP connection. Specifically,
	 * waiting for a connect() call to complete asynchronously.
	 */
	ZFPM_STATE_CONNECTING,

	/*
	 * TCP connection to the FPM is up.
	 */
	ZFPM_STATE_ESTABLISHED

} zfpm_state_t;
141
/*
 * Message format to be used to communicate with the FPM.
 */
typedef enum {
	ZFPM_MSG_FORMAT_NONE,	  /* No format selected. */
	ZFPM_MSG_FORMAT_NETLINK,  /* Encode routes as netlink messages. */
	ZFPM_MSG_FORMAT_PROTOBUF, /* Encode routes as protobuf messages. */
} zfpm_msg_format_e;
/*
 * Globals.
 */
typedef struct zfpm_glob_t_ {

	/*
	 * True if the FPM module has been enabled.
	 */
	int enabled;

	/*
	 * Message format to be used to communicate with the fpm.
	 */
	zfpm_msg_format_e message_format;

	/* Thread master on which all FPM timers/tasks are scheduled. */
	struct thread_master *master;

	/* Current state of the connection state machine. */
	zfpm_state_t state;

	/* FPM server address; if zero we connect to loopback (see
	 * zfpm_connect_cb(), which uses the value as sin_addr as-is). */
	in_addr_t fpm_server;
	/*
	 * Port on which the FPM is running.
	 */
	int fpm_port;

	/*
	 * List of rib_dest_t structures to be processed
	 */
	TAILQ_HEAD(zfpm_dest_q, rib_dest_t_) dest_q;

	/*
	 * Stream socket to the FPM.
	 */
	int sock;

	/*
	 * Buffers for messages to/from the FPM.
	 */
	struct stream *obuf;
	struct stream *ibuf;

	/*
	 * Threads for I/O.
	 */
	struct thread *t_connect;
	struct thread *t_write;
	struct thread *t_read;

	/*
	 * Thread to clean up after the TCP connection to the FPM goes down
	 * and the state that belongs to it.
	 */
	struct thread *t_conn_down;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_down_state;

	/*
	 * Thread to take actions once the TCP conn to the FPM comes up, and
	 * the state that belongs to it.
	 */
	struct thread *t_conn_up;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_up_state;

	/* Lifetime count of connect() attempts and time of the last one;
	 * used by zfpm_calc_connect_delay() to pace reconnects. */
	unsigned long connect_calls;
	time_t last_connect_call_time;

	/*
	 * Stats from the start of the current statistics interval up to
	 * now. These are the counters we typically update in the code.
	 */
	zfpm_stats_t stats;

	/*
	 * Statistics that were gathered in the last collection interval.
	 */
	zfpm_stats_t last_ivl_stats;

	/*
	 * Cumulative stats from the last clear to the start of the current
	 * statistics interval.
	 */
	zfpm_stats_t cumulative_stats;

	/*
	 * Stats interval timer.
	 */
	struct thread *t_stats;

	/*
	 * If non-zero, the last time when statistics were cleared.
	 */
	time_t last_stats_clear_time;

} zfpm_glob_t;
249
/* Single global instance of the FPM module state; always accessed
 * through the zfpm_g pointer. */
static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

/* Forward declarations. */
static int zfpm_trigger_update(struct route_node *rn, const char *reason);

static int zfpm_read_cb(struct thread *thread);
static int zfpm_write_cb(struct thread *thread);

static void zfpm_set_state(zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer(const char *reason);
static void zfpm_start_stats_timer(void);
261
/*
 * zfpm_thread_should_yield
 *
 * Thin wrapper around thread_should_yield(): returns non-zero when
 * the given task has run long enough and should reschedule itself.
 */
static inline int zfpm_thread_should_yield(struct thread *t)
{
	return thread_should_yield(t);
}
269
270 /*
271 * zfpm_state_to_str
272 */
273 static const char *zfpm_state_to_str(zfpm_state_t state)
274 {
275 switch (state) {
276
277 case ZFPM_STATE_IDLE:
278 return "idle";
279
280 case ZFPM_STATE_ACTIVE:
281 return "active";
282
283 case ZFPM_STATE_CONNECTING:
284 return "connecting";
285
286 case ZFPM_STATE_ESTABLISHED:
287 return "established";
288
289 default:
290 return "unknown";
291 }
292 }
293
/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t zfpm_get_elapsed_time(time_t reference)
{
	time_t now = monotime(NULL);

	if (now >= reference)
		return now - reference;

	/* The monotonic clock must never run backwards. */
	assert(0);
	return 0;
}
312
/*
 * zfpm_rnodes_iter_init
 *
 * Prepare an iterator that will walk the route_nodes of every RIB
 * table.
 */
static inline void zfpm_rnodes_iter_init(zfpm_rnodes_iter_t *iter)
{
	memset(iter, 0, sizeof(*iter));
	rib_tables_iter_init(&iter->tables_iter);

	/*
	 * This is a hack, but it makes implementing 'next' easier by
	 * ensuring that route_table_iter_next() will return NULL the first
	 * time we call it.
	 */
	route_table_iter_init(&iter->iter, NULL);
	route_table_iter_cleanup(&iter->iter);
}
329
330 /*
331 * zfpm_rnodes_iter_next
332 */
333 static inline struct route_node *zfpm_rnodes_iter_next(zfpm_rnodes_iter_t *iter)
334 {
335 struct route_node *rn;
336 struct route_table *table;
337
338 while (1) {
339 rn = route_table_iter_next(&iter->iter);
340 if (rn)
341 return rn;
342
343 /*
344 * We've made our way through this table, go to the next one.
345 */
346 route_table_iter_cleanup(&iter->iter);
347
348 table = rib_tables_iter_next(&iter->tables_iter);
349
350 if (!table)
351 return NULL;
352
353 route_table_iter_init(&iter->iter, table);
354 }
355
356 return NULL;
357 }
358
/*
 * zfpm_rnodes_iter_pause
 *
 * Suspend the walk so it can be resumed later from the same position
 * (used when a walker thread yields).
 */
static inline void zfpm_rnodes_iter_pause(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_pause(&iter->iter);
}
366
/*
 * zfpm_rnodes_iter_cleanup
 *
 * Release any state held by the node and table iterators.
 */
static inline void zfpm_rnodes_iter_cleanup(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_cleanup(&iter->iter);
	rib_tables_iter_cleanup(&iter->tables_iter);
}
375
/*
 * zfpm_stats_init
 *
 * Initialize a statistics block by zeroing all of its counters.
 */
static inline void zfpm_stats_init(zfpm_stats_t *stats)
{
	memset(stats, 0, sizeof(*stats));
}
385
/*
 * zfpm_stats_reset
 *
 * Reset all counters in a statistics block to zero.
 */
static inline void zfpm_stats_reset(zfpm_stats_t *stats)
{
	zfpm_stats_init(stats);
}
393
394 /*
395 * zfpm_stats_copy
396 */
397 static inline void zfpm_stats_copy(const zfpm_stats_t *src, zfpm_stats_t *dest)
398 {
399 memcpy(dest, src, sizeof(*dest));
400 }
401
402 /*
403 * zfpm_stats_compose
404 *
405 * Total up the statistics in two stats structures ('s1 and 's2') and
406 * return the result in the third argument, 'result'. Note that the
407 * pointer 'result' may be the same as 's1' or 's2'.
408 *
409 * For simplicity, the implementation below assumes that the stats
410 * structure is composed entirely of counters. This can easily be
411 * changed when necessary.
412 */
413 static void zfpm_stats_compose(const zfpm_stats_t *s1, const zfpm_stats_t *s2,
414 zfpm_stats_t *result)
415 {
416 const unsigned long *p1, *p2;
417 unsigned long *result_p;
418 int i, num_counters;
419
420 p1 = (const unsigned long *)s1;
421 p2 = (const unsigned long *)s2;
422 result_p = (unsigned long *)result;
423
424 num_counters = (sizeof(zfpm_stats_t) / sizeof(unsigned long));
425
426 for (i = 0; i < num_counters; i++) {
427 result_p[i] = p1[i] + p2[i];
428 }
429 }
430
/*
 * zfpm_read_on
 *
 * Arm the read callback on the FPM socket; must not already be armed.
 */
static inline void zfpm_read_on(void)
{
	assert(!zfpm_g->t_read);
	assert(zfpm_g->sock >= 0);

	thread_add_read(zfpm_g->master, zfpm_read_cb, 0, zfpm_g->sock,
			&zfpm_g->t_read);
}
442
/*
 * zfpm_write_on
 *
 * Arm the write callback on the FPM socket; must not already be armed.
 */
static inline void zfpm_write_on(void)
{
	assert(!zfpm_g->t_write);
	assert(zfpm_g->sock >= 0);

	thread_add_write(zfpm_g->master, zfpm_write_cb, 0, zfpm_g->sock,
			 &zfpm_g->t_write);
}
454
/*
 * zfpm_read_off
 *
 * Cancel the pending read callback, if any.
 */
static inline void zfpm_read_off(void)
{
	THREAD_READ_OFF(zfpm_g->t_read);
}
462
/*
 * zfpm_write_off
 *
 * Cancel the pending write callback, if any.
 */
static inline void zfpm_write_off(void)
{
	THREAD_WRITE_OFF(zfpm_g->t_write);
}
470
/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 *
 * Walks every route_node (resuming from the iterator saved in
 * zfpm_g->t_conn_up_state) and triggers an FPM update for each node
 * that has a dest.  Periodically yields and reschedules itself so a
 * large RIB does not starve other work.
 */
static int zfpm_conn_up_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	zfpm_g->t_conn_up = NULL;

	iter = &zfpm_g->t_conn_up_state.iter;

	/* Abort the walk if the connection went down while we were
	 * waiting to run. */
	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED) {
		zfpm_debug(
			"Connection not up anymore, conn_up thread aborting");
		zfpm_g->stats.t_conn_up_aborts++;
		goto done;
	}

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		/* Not every route_node has a dest attached. */
		if (dest) {
			zfpm_g->stats.t_conn_up_dests_processed++;
			zfpm_trigger_update(rnode, NULL);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		/* Pause the iterator and reschedule ourselves so the walk
		 * resumes from this position. */
		zfpm_g->stats.t_conn_up_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_up = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_up);
		return 0;
	}

	zfpm_g->stats.t_conn_up_finishes++;

done:
	zfpm_rnodes_iter_cleanup(iter);
	return 0;
}
522
/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 *
 * 'detail' is a reason string used for the state-transition log.
 * Arms the I/O callbacks, moves to ESTABLISHED and kicks off the
 * conn_up thread that pushes all existing routes to the FPM.
 */
static void zfpm_connection_up(const char *detail)
{
	assert(zfpm_g->sock >= 0);
	zfpm_read_on();
	zfpm_write_on();
	zfpm_set_state(ZFPM_STATE_ESTABLISHED, detail);

	/*
	 * Start thread to push existing routes to the FPM.
	 */
	assert(!zfpm_g->t_conn_up);

	zfpm_rnodes_iter_init(&zfpm_g->t_conn_up_state.iter);

	zfpm_debug("Starting conn_up thread");
	zfpm_g->t_conn_up = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_up);
	zfpm_g->stats.t_conn_up_starts++;
}
548
/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 *
 * Queries SO_ERROR on the socket; success brings the connection up,
 * failure closes the socket and restarts the retry timer.
 */
static void zfpm_connect_check(void)
{
	int status;
	socklen_t slen;
	int ret;

	/* Connect completion is signalled via read/write readiness;
	 * disarm both callbacks before examining the socket. */
	zfpm_read_off();
	zfpm_write_off();

	slen = sizeof(status);
	ret = getsockopt(zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *)&status,
			 &slen);

	if (ret >= 0 && status == 0) {
		zfpm_connection_up("async connect complete");
		return;
	}

	/*
	 * getsockopt() failed or indicated an error on the socket.
	 */
	close(zfpm_g->sock);
	zfpm_g->sock = -1;

	zfpm_start_connect_timer("getsockopt() after async connect failed");
	return;
}
581
/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 *
 * Walks every route_node, dequeues dests still waiting to be sent,
 * clears the FPM flags and garbage-collects empty dests.  Yields and
 * reschedules itself periodically; once the walk completes it starts
 * the reconnect timer.
 */
static int zfpm_conn_down_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	assert(zfpm_g->state == ZFPM_STATE_IDLE);

	zfpm_g->t_conn_down = NULL;

	iter = &zfpm_g->t_conn_down_state.iter;

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			/* Drop the dest from the transmit queue if it was
			 * still pending. */
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
				TAILQ_REMOVE(&zfpm_g->dest_q, dest,
					     fpm_q_entries);
			}

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);

			zfpm_g->stats.t_conn_down_dests_processed++;

			/*
			 * Check if the dest should be deleted.
			 */
			rib_gc_dest(rnode);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		/* Pause the iterator and reschedule ourselves to resume
		 * from this position. */
		zfpm_g->stats.t_conn_down_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_down = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_down);
		return 0;
	}

	zfpm_g->stats.t_conn_down_finishes++;
	zfpm_rnodes_iter_cleanup(iter);

	/*
	 * Start the process of connecting to the FPM again.
	 */
	zfpm_start_connect_timer("cleanup complete");
	return 0;
}
643
/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 *
 * Tears down I/O state (callbacks, stream buffers, socket), starts
 * the conn_down cleanup thread and moves the state machine to IDLE.
 */
static void zfpm_connection_down(const char *detail)
{
	if (!detail)
		detail = "unknown";

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);

	zlog_info("connection to the FPM has gone down: %s", detail);

	zfpm_read_off();
	zfpm_write_off();

	/* Discard any partially read or written messages. */
	stream_reset(zfpm_g->ibuf);
	stream_reset(zfpm_g->obuf);

	if (zfpm_g->sock >= 0) {
		close(zfpm_g->sock);
		zfpm_g->sock = -1;
	}

	/*
	 * Start thread to clean up state after the connection goes down.
	 */
	assert(!zfpm_g->t_conn_down);
	zfpm_debug("Starting conn_down thread");
	zfpm_rnodes_iter_init(&zfpm_g->t_conn_down_state.iter);
	zfpm_g->t_conn_down = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_down);
	zfpm_g->stats.t_conn_down_starts++;

	zfpm_set_state(ZFPM_STATE_IDLE, detail);
}
682
/*
 * zfpm_read_cb
 *
 * Read callback on the FPM socket.
 *
 * Accumulates one complete FPM message (fixed-size header first, then
 * the body) into zfpm_g->ibuf, possibly across multiple invocations.
 * A complete message is currently just discarded.  Re-arms itself
 * before returning; any read error or bad header tears the connection
 * down instead.
 */
static int zfpm_read_cb(struct thread *thread)
{
	size_t already;
	struct stream *ibuf;
	uint16_t msg_len;
	fpm_msg_hdr_t *hdr;

	zfpm_g->stats.read_cb_calls++;
	zfpm_g->t_read = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	ibuf = zfpm_g->ibuf;

	/* 'already' is the number of bytes of the current message that
	 * have been received so far. */
	already = stream_get_endp(ibuf);
	if (already < FPM_MSG_HDR_LEN) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock,
					FPM_MSG_HDR_LEN - already);
		if (nbyte == 0 || nbyte == -1) {
			zfpm_connection_down("closed socket in read");
			return 0;
		}

		/* Short read: wait for the rest of the header. */
		if (nbyte != (ssize_t)(FPM_MSG_HDR_LEN - already))
			goto done;

		already = FPM_MSG_HDR_LEN;
	}

	stream_set_getp(ibuf, 0);

	hdr = (fpm_msg_hdr_t *)stream_pnt(ibuf);

	if (!fpm_msg_hdr_ok(hdr)) {
		zfpm_connection_down("invalid message header");
		return 0;
	}

	msg_len = fpm_msg_len(hdr);

	/*
	 * Read out the rest of the packet.
	 */
	if (already < msg_len) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock, msg_len - already);

		if (nbyte == 0 || nbyte == -1) {
			zfpm_connection_down("failed to read message");
			return 0;
		}

		/* Short read: wait for the remainder of the body. */
		if (nbyte != (ssize_t)(msg_len - already))
			goto done;
	}

	zfpm_debug("Read out a full fpm message");

	/*
	 * Just throw it away for now.
	 */
	stream_reset(ibuf);

done:
	zfpm_read_on();
	return 0;
}
765
766 /*
767 * zfpm_writes_pending
768 *
769 * Returns TRUE if we may have something to write to the FPM.
770 */
771 static int zfpm_writes_pending(void)
772 {
773
774 /*
775 * Check if there is any data in the outbound buffer that has not
776 * been written to the socket yet.
777 */
778 if (stream_get_endp(zfpm_g->obuf) - stream_get_getp(zfpm_g->obuf))
779 return 1;
780
781 /*
782 * Check if there are any prefixes on the outbound queue.
783 */
784 if (!TAILQ_EMPTY(&zfpm_g->dest_q))
785 return 1;
786
787 return 0;
788 }
789
790 /*
791 * zfpm_encode_route
792 *
793 * Encode a message to the FPM with information about the given route.
794 *
795 * Returns the number of bytes written to the buffer. 0 or a negative
796 * value indicates an error.
797 */
798 static inline int zfpm_encode_route(rib_dest_t *dest, struct route_entry *re,
799 char *in_buf, size_t in_buf_len,
800 fpm_msg_type_e *msg_type)
801 {
802 size_t len;
803 #ifdef HAVE_NETLINK
804 int cmd;
805 #endif
806 len = 0;
807
808 *msg_type = FPM_MSG_TYPE_NONE;
809
810 switch (zfpm_g->message_format) {
811
812 case ZFPM_MSG_FORMAT_PROTOBUF:
813 #ifdef HAVE_PROTOBUF
814 len = zfpm_protobuf_encode_route(dest, re, (uint8_t *)in_buf,
815 in_buf_len);
816 *msg_type = FPM_MSG_TYPE_PROTOBUF;
817 #endif
818 break;
819
820 case ZFPM_MSG_FORMAT_NETLINK:
821 #ifdef HAVE_NETLINK
822 *msg_type = FPM_MSG_TYPE_NETLINK;
823 cmd = re ? RTM_NEWROUTE : RTM_DELROUTE;
824 len = zfpm_netlink_encode_route(cmd, dest, re, in_buf,
825 in_buf_len);
826 assert(fpm_msg_align(len) == len);
827 *msg_type = FPM_MSG_TYPE_NETLINK;
828 #endif /* HAVE_NETLINK */
829 break;
830
831 default:
832 break;
833 }
834
835 return len;
836 }
837
838 /*
839 * zfpm_route_for_update
840 *
841 * Returns the re that is to be sent to the FPM for a given dest.
842 */
843 struct route_entry *zfpm_route_for_update(rib_dest_t *dest)
844 {
845 struct route_entry *re;
846
847 RE_DEST_FOREACH_ROUTE (dest, re) {
848 if (!CHECK_FLAG(re->status, ROUTE_ENTRY_SELECTED_FIB))
849 continue;
850
851 return re;
852 }
853
854 /*
855 * We have no route for this destination.
856 */
857 return NULL;
858 }
859
/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 *
 * Drains zfpm_g->dest_q while the obuf stream has room for at least
 * one maximum-sized message.  For each dest an add (a selected route
 * exists) or a delete is encoded, the SENT_TO_FPM flag is updated
 * accordingly, and the dest is garbage-collected if it became empty.
 */
static void zfpm_build_updates(void)
{
	struct stream *s;
	rib_dest_t *dest;
	unsigned char *buf, *data, *buf_end;
	size_t msg_len;
	size_t data_len;
	fpm_msg_hdr_t *hdr;
	struct route_entry *re;
	int is_add, write_msg;
	fpm_msg_type_e msg_type;

	s = zfpm_g->obuf;

	assert(stream_empty(s));

	do {

		/*
		 * Make sure there is enough space to write another message.
		 */
		if (STREAM_WRITEABLE(s) < FPM_MAX_MSG_LEN)
			break;

		buf = STREAM_DATA(s) + stream_get_endp(s);
		buf_end = buf + STREAM_WRITEABLE(s);

		dest = TAILQ_FIRST(&zfpm_g->dest_q);
		if (!dest)
			break;

		assert(CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM));

		hdr = (fpm_msg_hdr_t *)buf;
		hdr->version = FPM_PROTO_VERSION;

		data = fpm_msg_data(hdr);

		/* NULL re means there is no selected route: a deletion. */
		re = zfpm_route_for_update(dest);
		is_add = re ? 1 : 0;

		write_msg = 1;

		/*
		 * If this is a route deletion, and we have not sent the route
		 * to
		 * the FPM previously, skip it.
		 */
		if (!is_add && !CHECK_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM)) {
			write_msg = 0;
			zfpm_g->stats.nop_deletes_skipped++;
		}

		if (write_msg) {
			data_len = zfpm_encode_route(dest, re, (char *)data,
						     buf_end - data, &msg_type);

			/* Encode failure aborts in assert-enabled builds;
			 * the if() below guards builds without asserts. */
			assert(data_len);
			if (data_len) {
				hdr->msg_type = msg_type;
				msg_len = fpm_data_len_to_msg_len(data_len);
				hdr->msg_len = htons(msg_len);
				stream_forward_endp(s, msg_len);

				if (is_add)
					zfpm_g->stats.route_adds++;
				else
					zfpm_g->stats.route_dels++;
			}
		}

		/*
		 * Remove the dest from the queue, and reset the flag.
		 */
		UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);

		if (is_add) {
			SET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		} else {
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		}

		/*
		 * Delete the destination if necessary.
		 */
		if (rib_gc_dest(dest->rnode))
			zfpm_g->stats.dests_del_after_update++;

	} while (1);
}
957
/*
 * zfpm_write_cb
 *
 * Write callback on the FPM socket.
 *
 * Refills the outbound stream from the dest queue when it is empty,
 * writes it to the socket (bounded by ZFPM_MAX_WRITES_PER_RUN and the
 * thread yield check), and re-arms itself while work remains.
 */
static int zfpm_write_cb(struct thread *thread)
{
	struct stream *s;
	int num_writes;

	zfpm_g->stats.write_cb_calls++;
	zfpm_g->t_write = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	num_writes = 0;

	do {
		int bytes_to_write, bytes_written;

		s = zfpm_g->obuf;

		/*
		 * If the stream is empty, try fill it up with data.
		 */
		if (stream_empty(s)) {
			zfpm_build_updates();
		}

		bytes_to_write = stream_get_endp(s) - stream_get_getp(s);
		if (!bytes_to_write)
			break;

		bytes_written =
			write(zfpm_g->sock, stream_pnt(s), bytes_to_write);
		zfpm_g->stats.write_calls++;
		num_writes++;

		if (bytes_written < 0) {
			/* Transient error: retry on the next callback. */
			if (ERRNO_IO_RETRY(errno))
				break;

			zfpm_connection_down("failed to write to socket");
			return 0;
		}

		if (bytes_written != bytes_to_write) {

			/*
			 * Partial write.
			 */
			stream_forward_getp(s, bytes_written);
			zfpm_g->stats.partial_writes++;
			break;
		}

		/*
		 * We've written out the entire contents of the stream.
		 */
		stream_reset(s);

		if (num_writes >= ZFPM_MAX_WRITES_PER_RUN) {
			zfpm_g->stats.max_writes_hit++;
			break;
		}

		if (zfpm_thread_should_yield(thread)) {
			zfpm_g->stats.t_write_yields++;
			break;
		}
	} while (1);

	if (zfpm_writes_pending())
		zfpm_write_on();

	return 0;
}
1042
/*
 * zfpm_connect_cb
 *
 * Timer callback that attempts a non-blocking TCP connection to the
 * FPM.  Immediate success brings the connection up; EINPROGRESS moves
 * the state machine to CONNECTING and waits for socket readiness; any
 * other failure restarts the retry timer.
 */
static int zfpm_connect_cb(struct thread *t)
{
	int sock, ret;
	struct sockaddr_in serv;

	zfpm_g->t_connect = NULL;
	assert(zfpm_g->state == ZFPM_STATE_ACTIVE);

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0) {
		zfpm_debug("Failed to create socket for connect(): %s",
			   strerror(errno));
		zfpm_g->stats.connect_no_sock++;
		return 0;
	}

	set_nonblocking(sock);

	/* Make server socket. */
	memset(&serv, 0, sizeof(serv));
	serv.sin_family = AF_INET;
	serv.sin_port = htons(zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
	serv.sin_len = sizeof(struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
	/* Default to loopback when no server address was configured. */
	if (!zfpm_g->fpm_server)
		serv.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	else
		serv.sin_addr.s_addr = (zfpm_g->fpm_server);

	/*
	 * Connect to the FPM.
	 */
	zfpm_g->connect_calls++;
	zfpm_g->stats.connect_calls++;
	zfpm_g->last_connect_call_time = monotime(NULL);

	ret = connect(sock, (struct sockaddr *)&serv, sizeof(serv));
	if (ret >= 0) {
		zfpm_g->sock = sock;
		zfpm_connection_up("connect succeeded");
		return 1;
	}

	if (errno == EINPROGRESS) {
		/* Async connect pending: wait for read/write readiness. */
		zfpm_g->sock = sock;
		zfpm_read_on();
		zfpm_write_on();
		zfpm_set_state(ZFPM_STATE_CONNECTING,
			       "async connect in progress");
		return 0;
	}

	zlog_info("can't connect to FPM %d: %s", sock, safe_strerror(errno));
	close(sock);

	/*
	 * Restart timer for retrying connection.
	 */
	zfpm_start_connect_timer("connect() failed");
	return 0;
}
1108
1109 /*
1110 * zfpm_set_state
1111 *
1112 * Move state machine into the given state.
1113 */
1114 static void zfpm_set_state(zfpm_state_t state, const char *reason)
1115 {
1116 zfpm_state_t cur_state = zfpm_g->state;
1117
1118 if (!reason)
1119 reason = "Unknown";
1120
1121 if (state == cur_state)
1122 return;
1123
1124 zfpm_debug("beginning state transition %s -> %s. Reason: %s",
1125 zfpm_state_to_str(cur_state), zfpm_state_to_str(state),
1126 reason);
1127
1128 switch (state) {
1129
1130 case ZFPM_STATE_IDLE:
1131 assert(cur_state == ZFPM_STATE_ESTABLISHED);
1132 break;
1133
1134 case ZFPM_STATE_ACTIVE:
1135 assert(cur_state == ZFPM_STATE_IDLE
1136 || cur_state == ZFPM_STATE_CONNECTING);
1137 assert(zfpm_g->t_connect);
1138 break;
1139
1140 case ZFPM_STATE_CONNECTING:
1141 assert(zfpm_g->sock);
1142 assert(cur_state == ZFPM_STATE_ACTIVE);
1143 assert(zfpm_g->t_read);
1144 assert(zfpm_g->t_write);
1145 break;
1146
1147 case ZFPM_STATE_ESTABLISHED:
1148 assert(cur_state == ZFPM_STATE_ACTIVE
1149 || cur_state == ZFPM_STATE_CONNECTING);
1150 assert(zfpm_g->sock);
1151 assert(zfpm_g->t_read);
1152 assert(zfpm_g->t_write);
1153 break;
1154 }
1155
1156 zfpm_g->state = state;
1157 }
1158
1159 /*
1160 * zfpm_calc_connect_delay
1161 *
1162 * Returns the number of seconds after which we should attempt to
1163 * reconnect to the FPM.
1164 */
1165 static long zfpm_calc_connect_delay(void)
1166 {
1167 time_t elapsed;
1168
1169 /*
1170 * Return 0 if this is our first attempt to connect.
1171 */
1172 if (zfpm_g->connect_calls == 0) {
1173 return 0;
1174 }
1175
1176 elapsed = zfpm_get_elapsed_time(zfpm_g->last_connect_call_time);
1177
1178 if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
1179 return 0;
1180 }
1181
1182 return ZFPM_CONNECT_RETRY_IVL - elapsed;
1183 }
1184
/*
 * zfpm_start_connect_timer
 *
 * Schedule the next connection attempt (possibly immediately) and
 * move the state machine to ACTIVE.  'reason' is logged with the
 * state transition.
 */
static void zfpm_start_connect_timer(const char *reason)
{
	long delay_secs;

	assert(!zfpm_g->t_connect);
	assert(zfpm_g->sock < 0);

	assert(zfpm_g->state == ZFPM_STATE_IDLE
	       || zfpm_g->state == ZFPM_STATE_ACTIVE
	       || zfpm_g->state == ZFPM_STATE_CONNECTING);

	delay_secs = zfpm_calc_connect_delay();
	zfpm_debug("scheduling connect in %ld seconds", delay_secs);

	thread_add_timer(zfpm_g->master, zfpm_connect_cb, 0, delay_secs,
			 &zfpm_g->t_connect);
	zfpm_set_state(ZFPM_STATE_ACTIVE, reason);
}
1206
/*
 * zfpm_is_enabled
 *
 * Returns TRUE (non-zero) if the zebra FPM module has been enabled.
 */
static inline int zfpm_is_enabled(void)
{
	return zfpm_g->enabled;
}
1216
1217 /*
1218 * zfpm_conn_is_up
1219 *
1220 * Returns TRUE if the connection to the FPM is up.
1221 */
1222 static inline int zfpm_conn_is_up(void)
1223 {
1224 if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
1225 return 0;
1226
1227 assert(zfpm_g->sock >= 0);
1228
1229 return 1;
1230 }
1231
/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 *
 * 'reason' is used only for debug logging and may be NULL.
 *
 * NOTE(review): 'dest' is dereferenced without a NULL check here,
 * unlike the conn_up/conn_down walkers — presumably callers only pass
 * nodes that have a dest attached; confirm against call sites.
 */
static int zfpm_trigger_update(struct route_node *rn, const char *reason)
{
	rib_dest_t *dest;
	char buf[PREFIX_STRLEN];

	/*
	 * Ignore if the connection is down. We will update the FPM about
	 * all destinations once the connection comes up.
	 */
	if (!zfpm_conn_is_up())
		return 0;

	dest = rib_dest_from_rnode(rn);

	/* Already queued for transmission: nothing more to do. */
	if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
		zfpm_g->stats.redundant_triggers++;
		return 0;
	}

	if (reason) {
		zfpm_debug("%s triggering update to FPM - Reason: %s",
			   prefix2str(&rn->p, buf, sizeof(buf)), reason);
	}

	SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
	TAILQ_INSERT_TAIL(&zfpm_g->dest_q, dest, fpm_q_entries);
	zfpm_g->stats.updates_triggered++;

	/*
	 * Make sure that writes are enabled.
	 */
	if (zfpm_g->t_write)
		return 0;

	zfpm_write_on();
	return 0;
}
1275
/*
 * zfpm_stats_timer_cb
 *
 * Periodic timer: snapshots the current-interval stats for display,
 * rolls them into the cumulative totals, and starts a fresh interval.
 */
static int zfpm_stats_timer_cb(struct thread *t)
{
	zfpm_g->t_stats = NULL;

	/*
	 * Remember the stats collected in the last interval for display
	 * purposes.
	 */
	zfpm_stats_copy(&zfpm_g->stats, &zfpm_g->last_ivl_stats);

	/*
	 * Add the current set of stats into the cumulative statistics.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &zfpm_g->cumulative_stats);

	/*
	 * Start collecting stats afresh over the next interval.
	 */
	zfpm_stats_reset(&zfpm_g->stats);

	zfpm_start_stats_timer();

	return 0;
}
1304
/*
 * zfpm_stop_stats_timer
 *
 * Cancel the stats interval timer, if it is running.
 */
static void zfpm_stop_stats_timer(void)
{
	if (!zfpm_g->t_stats)
		return;

	zfpm_debug("Stopping existing stats timer");
	THREAD_TIMER_OFF(zfpm_g->t_stats);
}
1316
1317 /*
1318 * zfpm_start_stats_timer
1319 */
1320 void zfpm_start_stats_timer(void)
1321 {
1322 assert(!zfpm_g->t_stats);
1323
1324 thread_add_timer(zfpm_g->master, zfpm_stats_timer_cb, 0,
1325 ZFPM_STATS_IVL_SECS, &zfpm_g->t_stats);
1326 }
1327
/*
 * Helper macro for zfpm_show_stats() below: print one counter's
 * cumulative total alongside its last-interval value.  Relies on the
 * local 'total_stats' and the 'vty' parameter being in scope.
 */
#define ZFPM_SHOW_STAT(counter)                                                \
	do {                                                                   \
		vty_out(vty, "%-40s %10lu %16lu\n", #counter,                  \
			total_stats.counter, zfpm_g->last_ivl_stats.counter);  \
	} while (0)
1336
/*
 * zfpm_show_stats
 *
 * Emit every FPM counter to the given vty: the cumulative total plus
 * the value from the last collection interval.
 */
static void zfpm_show_stats(struct vty *vty)
{
	zfpm_stats_t total_stats;
	time_t elapsed;

	vty_out(vty, "\n%-40s %10s Last %2d secs\n\n", "Counter", "Total",
		ZFPM_STATS_IVL_SECS);

	/*
	 * Compute the total stats up to this instant.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &total_stats);

	ZFPM_SHOW_STAT(connect_calls);
	ZFPM_SHOW_STAT(connect_no_sock);
	ZFPM_SHOW_STAT(read_cb_calls);
	ZFPM_SHOW_STAT(write_cb_calls);
	ZFPM_SHOW_STAT(write_calls);
	ZFPM_SHOW_STAT(partial_writes);
	ZFPM_SHOW_STAT(max_writes_hit);
	ZFPM_SHOW_STAT(t_write_yields);
	ZFPM_SHOW_STAT(nop_deletes_skipped);
	ZFPM_SHOW_STAT(route_adds);
	ZFPM_SHOW_STAT(route_dels);
	ZFPM_SHOW_STAT(updates_triggered);
	ZFPM_SHOW_STAT(redundant_triggers);
	ZFPM_SHOW_STAT(dests_del_after_update);
	ZFPM_SHOW_STAT(t_conn_down_starts);
	ZFPM_SHOW_STAT(t_conn_down_dests_processed);
	ZFPM_SHOW_STAT(t_conn_down_yields);
	ZFPM_SHOW_STAT(t_conn_down_finishes);
	ZFPM_SHOW_STAT(t_conn_up_starts);
	ZFPM_SHOW_STAT(t_conn_up_dests_processed);
	ZFPM_SHOW_STAT(t_conn_up_yields);
	ZFPM_SHOW_STAT(t_conn_up_aborts);
	ZFPM_SHOW_STAT(t_conn_up_finishes);

	/* Only show the clear timestamp if stats were ever cleared. */
	if (!zfpm_g->last_stats_clear_time)
		return;

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_stats_clear_time);

	vty_out(vty, "\nStats were cleared %lu seconds ago\n",
		(unsigned long)elapsed);
}
1386
/*
 * zfpm_clear_stats
 *
 * Reset all FPM counters (current-interval, last-interval and
 * cumulative) and restart the collection interval from now. No-op
 * (with a message) if the FPM module is not enabled.
 */
static void zfpm_clear_stats(struct vty *vty)
{
	if (!zfpm_is_enabled()) {
		vty_out(vty, "The FPM module is not enabled...\n");
		return;
	}

	zfpm_stats_reset(&zfpm_g->stats);
	zfpm_stats_reset(&zfpm_g->last_ivl_stats);
	zfpm_stats_reset(&zfpm_g->cumulative_stats);

	/* Restart the timer so the next sample covers a full
	 * ZFPM_STATS_IVL_SECS window starting from this clear. */
	zfpm_stop_stats_timer();
	zfpm_start_stats_timer();

	zfpm_g->last_stats_clear_time = monotime(NULL);

	vty_out(vty, "Cleared FPM stats\n");
}
1408
/*
 * show_zebra_fpm_stats
 *
 * "show zebra fpm stats": display the FPM counters on the vty.
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       ZEBRA_STR
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_show_stats(vty);
	return CMD_SUCCESS;
}
1423
/*
 * clear_zebra_fpm_stats
 *
 * "clear zebra fpm stats": reset all FPM counters and restart the
 * collection interval.
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       ZEBRA_STR
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_clear_stats(vty);
	return CMD_SUCCESS;
}
1438
1439 /*
1440 * update fpm connection information
1441 */
1442 DEFUN ( fpm_remote_ip,
1443 fpm_remote_ip_cmd,
1444 "fpm connection ip A.B.C.D port (1-65535)",
1445 "fpm connection remote ip and port\n"
1446 "Remote fpm server ip A.B.C.D\n"
1447 "Enter ip ")
1448 {
1449
1450 in_addr_t fpm_server;
1451 uint32_t port_no;
1452
1453 fpm_server = inet_addr(argv[3]->arg);
1454 if (fpm_server == INADDR_NONE)
1455 return CMD_ERR_INCOMPLETE;
1456
1457 port_no = atoi(argv[5]->arg);
1458 if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
1459 return CMD_ERR_INCOMPLETE;
1460
1461 zfpm_g->fpm_server = fpm_server;
1462 zfpm_g->fpm_port = port_no;
1463
1464
1465 return CMD_SUCCESS;
1466 }
1467
1468 DEFUN ( no_fpm_remote_ip,
1469 no_fpm_remote_ip_cmd,
1470 "no fpm connection ip A.B.C.D port (1-65535)",
1471 "fpm connection remote ip and port\n"
1472 "Connection\n"
1473 "Remote fpm server ip A.B.C.D\n"
1474 "Enter ip ")
1475 {
1476 if (zfpm_g->fpm_server != inet_addr(argv[4]->arg)
1477 || zfpm_g->fpm_port != atoi(argv[6]->arg))
1478 return CMD_ERR_NO_MATCH;
1479
1480 zfpm_g->fpm_server = FPM_DEFAULT_IP;
1481 zfpm_g->fpm_port = FPM_DEFAULT_PORT;
1482
1483 return CMD_SUCCESS;
1484 }
1485
1486 /*
1487 * zfpm_init_message_format
1488 */
1489 static inline void zfpm_init_message_format(const char *format)
1490 {
1491 int have_netlink, have_protobuf;
1492
1493 #ifdef HAVE_NETLINK
1494 have_netlink = 1;
1495 #else
1496 have_netlink = 0;
1497 #endif
1498
1499 #ifdef HAVE_PROTOBUF
1500 have_protobuf = 1;
1501 #else
1502 have_protobuf = 0;
1503 #endif
1504
1505 zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;
1506
1507 if (!format) {
1508 if (have_netlink) {
1509 zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1510 } else if (have_protobuf) {
1511 zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1512 }
1513 return;
1514 }
1515
1516 if (!strcmp("netlink", format)) {
1517 if (!have_netlink) {
1518 zlog_err("FPM netlink message format is not available");
1519 return;
1520 }
1521 zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1522 return;
1523 }
1524
1525 if (!strcmp("protobuf", format)) {
1526 if (!have_protobuf) {
1527 zlog_err(
1528 "FPM protobuf message format is not available");
1529 return;
1530 }
1531 zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1532 return;
1533 }
1534
1535 zlog_warn("Unknown fpm format '%s'", format);
1536 }
1537
1538 /**
1539 * fpm_remote_srv_write
1540 *
1541 * Module to write remote fpm connection
1542 *
1543 * Returns ZERO on success.
1544 */
1545
1546 static int fpm_remote_srv_write(struct vty *vty)
1547 {
1548 struct in_addr in;
1549
1550 in.s_addr = zfpm_g->fpm_server;
1551
1552 if ((zfpm_g->fpm_server != FPM_DEFAULT_IP
1553 && zfpm_g->fpm_server != INADDR_ANY)
1554 || (zfpm_g->fpm_port != FPM_DEFAULT_PORT
1555 && zfpm_g->fpm_port != 0))
1556 vty_out(vty, "fpm connection ip %s port %d\n", inet_ntoa(in),
1557 zfpm_g->fpm_port);
1558
1559 return 0;
1560 }
1561
1562
/* Zebra node: installed so fpm_remote_srv_write() is invoked to emit
 * the FPM connection settings when the running config is written. */
static struct cmd_node zebra_node = {ZEBRA_NODE, "", 1};
1565
1566
/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module, invoked via the
 * frr_late_init hook.
 *
 * @param[in] master thread master on which FPM timers and tasks are
 *                   scheduled.
 *
 * The message format used to talk to the FPM ('netlink' or
 * 'protobuf') may be supplied through the module load arguments
 * (THIS_MODULE->load_args).
 *
 * Returns 0.
 */
static int zfpm_init(struct thread_master *master)
{
	int enable = 1;
	uint16_t port = 0;
	const char *format = THIS_MODULE->load_args;

	memset(zfpm_g, 0, sizeof(*zfpm_g));
	zfpm_g->master = master;
	TAILQ_INIT(&zfpm_g->dest_q);
	zfpm_g->sock = -1;
	zfpm_g->state = ZFPM_STATE_IDLE;

	zfpm_stats_init(&zfpm_g->stats);
	zfpm_stats_init(&zfpm_g->last_ivl_stats);
	zfpm_stats_init(&zfpm_g->cumulative_stats);

	/* Register CLI: running-config writeback plus the show/clear
	 * and connection-configuration commands. */
	install_node(&zebra_node, fpm_remote_srv_write);
	install_element(ENABLE_NODE, &show_zebra_fpm_stats_cmd);
	install_element(ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
	install_element(CONFIG_NODE, &fpm_remote_ip_cmd);
	install_element(CONFIG_NODE, &no_fpm_remote_ip_cmd);

	zfpm_init_message_format(format);

	/*
	 * Disable FPM interface if no suitable format is available.
	 */
	if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
		enable = 0;

	zfpm_g->enabled = enable;

	/* Fall back to compile-time defaults for any connection
	 * parameters that remain unset. */
	if (!zfpm_g->fpm_server)
		zfpm_g->fpm_server = FPM_DEFAULT_IP;

	if (!port)
		port = FPM_DEFAULT_PORT;

	zfpm_g->fpm_port = port;

	zfpm_g->obuf = stream_new(ZFPM_OBUF_SIZE);
	zfpm_g->ibuf = stream_new(ZFPM_IBUF_SIZE);

	zfpm_start_stats_timer();
	zfpm_start_connect_timer("initialized");
	return 0;
}
1625
/*
 * zebra_fpm_module_init
 *
 * Module entry point: hook route updates into the FPM trigger path
 * and defer the rest of initialization (zfpm_init) to frr_late_init.
 */
static int zebra_fpm_module_init(void)
{
	hook_register(rib_update, zfpm_trigger_update);
	hook_register(frr_late_init, zfpm_init);
	return 0;
}
1632
/* Loadable-module boilerplate: name, version, description and the
 * init callback invoked when the module is loaded. */
FRR_MODULE_SETUP(.name = "zebra_fpm", .version = FRR_VERSION,
		 .description = "zebra FPM (Forwarding Plane Manager) module",
		 .init = zebra_fpm_module_init, )