/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <zebra.h>

#include "log.h"
#include "libfrr.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"
#include "version.h"

#include "zebra/rib.h"
#include "zebra/zserv.h"
#include "zebra/zebra_ns.h"
#include "zebra/zebra_vrf.h"

#include "fpm/fpm.h"
#include "zebra_fpm_private.h"

/*
 * Interval at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)
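
/*
 * Note: the output buffer is sized to hold more than one
 * maximum-length message so that several route updates can be
 * batched into a single write() call when the queue is drained.
 */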

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_ {
	rib_tables_iter_t tables_iter;
	route_table_iter_t iter;
} zfpm_rnodes_iter_t;

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
	unsigned long connect_calls;
	unsigned long connect_no_sock;

	unsigned long read_cb_calls;

	unsigned long write_cb_calls;
	unsigned long write_calls;
	unsigned long partial_writes;
	unsigned long max_writes_hit;
	unsigned long t_write_yields;

	unsigned long nop_deletes_skipped;
	unsigned long route_adds;
	unsigned long route_dels;

	unsigned long updates_triggered;
	unsigned long redundant_triggers;
	unsigned long non_fpm_table_triggers;

	unsigned long dests_del_after_update;

	unsigned long t_conn_down_starts;
	unsigned long t_conn_down_dests_processed;
	unsigned long t_conn_down_yields;
	unsigned long t_conn_down_finishes;

	unsigned long t_conn_up_starts;
	unsigned long t_conn_up_dests_processed;
	unsigned long t_conn_up_yields;
	unsigned long t_conn_up_aborts;
	unsigned long t_conn_up_finishes;

} zfpm_stats_t;

/*
 * States for the FPM state machine.
 */
typedef enum {

	/*
	 * In this state we are not yet ready to connect to the FPM. This
	 * can happen when this module is disabled, or if we're cleaning up
	 * after a connection has gone down.
	 */
	ZFPM_STATE_IDLE,

	/*
	 * Ready to talk to the FPM and periodically trying to connect to
	 * it.
	 */
	ZFPM_STATE_ACTIVE,

	/*
	 * In the middle of bringing up a TCP connection. Specifically,
	 * waiting for a connect() call to complete asynchronously.
	 */
	ZFPM_STATE_CONNECTING,

	/*
	 * TCP connection to the FPM is up.
	 */
	ZFPM_STATE_ESTABLISHED

} zfpm_state_t;

/*
 * Message format to be used to communicate with the FPM.
 */
typedef enum {
	ZFPM_MSG_FORMAT_NONE,
	ZFPM_MSG_FORMAT_NETLINK,
	ZFPM_MSG_FORMAT_PROTOBUF,
} zfpm_msg_format_e;

/*
 * Globals.
 */
typedef struct zfpm_glob_t_ {

	/*
	 * True if the FPM module has been enabled.
	 */
	int enabled;

	/*
	 * Message format to be used to communicate with the FPM.
	 */
	zfpm_msg_format_e message_format;

	struct thread_master *master;

	zfpm_state_t state;

	in_addr_t fpm_server;

	/*
	 * Port on which the FPM is running.
	 */
	int fpm_port;

	/*
	 * List of rib_dest_t structures to be processed.
	 */
	TAILQ_HEAD(zfpm_dest_q, rib_dest_t_) dest_q;

	/*
	 * Stream socket to the FPM.
	 */
	int sock;

	/*
	 * Buffers for messages to/from the FPM.
	 */
	struct stream *obuf;
	struct stream *ibuf;

	/*
	 * Threads for I/O.
	 */
	struct thread *t_connect;
	struct thread *t_write;
	struct thread *t_read;

	/*
	 * Thread to clean up after the TCP connection to the FPM goes down
	 * and the state that belongs to it.
	 */
	struct thread *t_conn_down;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_down_state;

	/*
	 * Thread to take actions once the TCP conn to the FPM comes up, and
	 * the state that belongs to it.
	 */
	struct thread *t_conn_up;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_up_state;

	unsigned long connect_calls;
	time_t last_connect_call_time;

	/*
	 * Stats from the start of the current statistics interval up to
	 * now. These are the counters we typically update in the code.
	 */
	zfpm_stats_t stats;

	/*
	 * Statistics that were gathered in the last collection interval.
	 */
	zfpm_stats_t last_ivl_stats;

	/*
	 * Cumulative stats from the last clear to the start of the current
	 * statistics interval.
	 */
	zfpm_stats_t cumulative_stats;

	/*
	 * Stats interval timer.
	 */
	struct thread *t_stats;

	/*
	 * If non-zero, the last time when statistics were cleared.
	 */
	time_t last_stats_clear_time;

} zfpm_glob_t;

static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_trigger_update(struct route_node *rn, const char *reason);

static int zfpm_read_cb(struct thread *thread);
static int zfpm_write_cb(struct thread *thread);

static void zfpm_set_state(zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer(const char *reason);
static void zfpm_start_stats_timer(void);

/*
 * zfpm_thread_should_yield
 */
static inline int zfpm_thread_should_yield(struct thread *t)
{
	return thread_should_yield(t);
}

/*
 * zfpm_state_to_str
 */
static const char *zfpm_state_to_str(zfpm_state_t state)
{
	switch (state) {

	case ZFPM_STATE_IDLE:
		return "idle";

	case ZFPM_STATE_ACTIVE:
		return "active";

	case ZFPM_STATE_CONNECTING:
		return "connecting";

	case ZFPM_STATE_ESTABLISHED:
		return "established";

	default:
		return "unknown";
	}
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t zfpm_get_elapsed_time(time_t reference)
{
	time_t now;

	now = monotime(NULL);

	if (now < reference) {
		assert(0);
		return 0;
	}

	return now - reference;
}

/*
 * zfpm_is_table_for_fpm
 *
 * Returns TRUE if the given table is to be communicated to the FPM.
 */
static inline int zfpm_is_table_for_fpm(struct route_table *table)
{
	rib_table_info_t *info;

	info = rib_table_info(table);

	/*
	 * We only send the unicast tables in the main instance to the FPM
	 * at this point.
	 */
	if (zvrf_id(info->zvrf) != 0)
		return 0;

	if (info->safi != SAFI_UNICAST)
		return 0;

	return 1;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void zfpm_rnodes_iter_init(zfpm_rnodes_iter_t *iter)
{
	memset(iter, 0, sizeof(*iter));
	rib_tables_iter_init(&iter->tables_iter);

	/*
	 * This is a hack, but it makes implementing 'next' easier by
	 * ensuring that route_table_iter_next() will return NULL the first
	 * time we call it.
	 */
	route_table_iter_init(&iter->iter, NULL);
	route_table_iter_cleanup(&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *zfpm_rnodes_iter_next(zfpm_rnodes_iter_t *iter)
{
	struct route_node *rn;
	struct route_table *table;

	while (1) {
		rn = route_table_iter_next(&iter->iter);
		if (rn)
			return rn;

		/*
		 * We've made our way through this table, go to the next one.
		 */
		route_table_iter_cleanup(&iter->iter);

		while ((table = rib_tables_iter_next(&iter->tables_iter))) {
			if (zfpm_is_table_for_fpm(table))
				break;
		}

		if (!table)
			return NULL;

		route_table_iter_init(&iter->iter, table);
	}

	return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void zfpm_rnodes_iter_pause(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_pause(&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void zfpm_rnodes_iter_cleanup(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_cleanup(&iter->iter);
	rib_tables_iter_cleanup(&iter->tables_iter);
}

/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void zfpm_stats_init(zfpm_stats_t *stats)
{
	memset(stats, 0, sizeof(*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void zfpm_stats_reset(zfpm_stats_t *stats)
{
	zfpm_stats_init(stats);
}

/*
 * zfpm_stats_copy
 */
static inline void zfpm_stats_copy(const zfpm_stats_t *src, zfpm_stats_t *dest)
{
	memcpy(dest, src, sizeof(*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void zfpm_stats_compose(const zfpm_stats_t *s1, const zfpm_stats_t *s2,
			       zfpm_stats_t *result)
{
	const unsigned long *p1, *p2;
	unsigned long *result_p;
	int i, num_counters;

	p1 = (const unsigned long *)s1;
	p2 = (const unsigned long *)s2;
	result_p = (unsigned long *)result;

	num_counters = (sizeof(zfpm_stats_t) / sizeof(unsigned long));

	for (i = 0; i < num_counters; i++) {
		result_p[i] = p1[i] + p2[i];
	}
}
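
/*
 * A compile-time guard along the following lines could make the
 * counters-only assumption above explicit (a sketch, not part of the
 * original code):
 *
 *   _Static_assert(sizeof(zfpm_stats_t) % sizeof(unsigned long) == 0,
 *                  "zfpm_stats_t must contain only unsigned long counters");
 */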

/*
 * zfpm_read_on
 */
static inline void zfpm_read_on(void)
{
	assert(!zfpm_g->t_read);
	assert(zfpm_g->sock >= 0);

	thread_add_read(zfpm_g->master, zfpm_read_cb, 0, zfpm_g->sock,
			&zfpm_g->t_read);
}

/*
 * zfpm_write_on
 */
static inline void zfpm_write_on(void)
{
	assert(!zfpm_g->t_write);
	assert(zfpm_g->sock >= 0);

	thread_add_write(zfpm_g->master, zfpm_write_cb, 0, zfpm_g->sock,
			 &zfpm_g->t_write);
}

/*
 * zfpm_read_off
 */
static inline void zfpm_read_off(void)
{
	THREAD_READ_OFF(zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void zfpm_write_off(void)
{
	THREAD_WRITE_OFF(zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int zfpm_conn_up_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	zfpm_g->t_conn_up = NULL;

	iter = &zfpm_g->t_conn_up_state.iter;

	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED) {
		zfpm_debug(
			"Connection not up anymore, conn_up thread aborting");
		zfpm_g->stats.t_conn_up_aborts++;
		goto done;
	}

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			zfpm_g->stats.t_conn_up_dests_processed++;
			zfpm_trigger_update(rnode, NULL);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_up_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_up = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_up);
		return 0;
	}

	zfpm_g->stats.t_conn_up_finishes++;

done:
	zfpm_rnodes_iter_cleanup(iter);
	return 0;
}
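
/*
 * The callback above illustrates the cooperative-yield pattern used
 * throughout this file: when zfpm_thread_should_yield() fires, the
 * route-node iterator is paused in place and the callback reschedules
 * itself with a zero-millisecond timer, letting other zebra work run
 * before the walk resumes exactly where it left off.
 */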

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void zfpm_connection_up(const char *detail)
{
	assert(zfpm_g->sock >= 0);
	zfpm_read_on();
	zfpm_write_on();
	zfpm_set_state(ZFPM_STATE_ESTABLISHED, detail);

	/*
	 * Start thread to push existing routes to the FPM.
	 */
	assert(!zfpm_g->t_conn_up);

	zfpm_rnodes_iter_init(&zfpm_g->t_conn_up_state.iter);

	zfpm_debug("Starting conn_up thread");
	zfpm_g->t_conn_up = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_up);
	zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void zfpm_connect_check(void)
{
	int status;
	socklen_t slen;
	int ret;

	zfpm_read_off();
	zfpm_write_off();

	slen = sizeof(status);
	ret = getsockopt(zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *)&status,
			 &slen);

	if (ret >= 0 && status == 0) {
		zfpm_connection_up("async connect complete");
		return;
	}

	/*
	 * getsockopt() failed or indicated an error on the socket.
	 */
	close(zfpm_g->sock);
	zfpm_g->sock = -1;

	zfpm_start_connect_timer("getsockopt() after async connect failed");
	return;
}
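
/*
 * This is the standard idiom for completing a non-blocking connect():
 * once the socket becomes readable or writable, SO_ERROR reports the
 * outcome of the connection attempt. A status of zero means the TCP
 * session is up; anything else is treated as a failed connect and the
 * retry timer is restarted.
 */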

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int zfpm_conn_down_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	assert(zfpm_g->state == ZFPM_STATE_IDLE);

	zfpm_g->t_conn_down = NULL;

	iter = &zfpm_g->t_conn_down_state.iter;

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
				TAILQ_REMOVE(&zfpm_g->dest_q, dest,
					     fpm_q_entries);
			}

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);

			zfpm_g->stats.t_conn_down_dests_processed++;

			/*
			 * Check if the dest should be deleted.
			 */
			rib_gc_dest(rnode);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_down_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_down = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_down);
		return 0;
	}

	zfpm_g->stats.t_conn_down_finishes++;
	zfpm_rnodes_iter_cleanup(iter);

	/*
	 * Start the process of connecting to the FPM again.
	 */
	zfpm_start_connect_timer("cleanup complete");
	return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void zfpm_connection_down(const char *detail)
{
	if (!detail)
		detail = "unknown";

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);

	zlog_info("connection to the FPM has gone down: %s", detail);

	zfpm_read_off();
	zfpm_write_off();

	stream_reset(zfpm_g->ibuf);
	stream_reset(zfpm_g->obuf);

	if (zfpm_g->sock >= 0) {
		close(zfpm_g->sock);
		zfpm_g->sock = -1;
	}

	/*
	 * Start thread to clean up state after the connection goes down.
	 */
	assert(!zfpm_g->t_conn_down);
	zfpm_debug("Starting conn_down thread");
	zfpm_rnodes_iter_init(&zfpm_g->t_conn_down_state.iter);
	zfpm_g->t_conn_down = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_down);
	zfpm_g->stats.t_conn_down_starts++;

	zfpm_set_state(ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int zfpm_read_cb(struct thread *thread)
{
	size_t already;
	struct stream *ibuf;
	uint16_t msg_len;
	fpm_msg_hdr_t *hdr;

	zfpm_g->stats.read_cb_calls++;
	zfpm_g->t_read = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	ibuf = zfpm_g->ibuf;

	already = stream_get_endp(ibuf);
	if (already < FPM_MSG_HDR_LEN) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock,
					FPM_MSG_HDR_LEN - already);
		if (nbyte == 0 || nbyte == -1) {
			zfpm_connection_down("closed socket in read");
			return 0;
		}

		if (nbyte != (ssize_t)(FPM_MSG_HDR_LEN - already))
			goto done;

		already = FPM_MSG_HDR_LEN;
	}

	stream_set_getp(ibuf, 0);

	hdr = (fpm_msg_hdr_t *)stream_pnt(ibuf);

	if (!fpm_msg_hdr_ok(hdr)) {
		zfpm_connection_down("invalid message header");
		return 0;
	}

	msg_len = fpm_msg_len(hdr);

	/*
	 * Read out the rest of the packet.
	 */
	if (already < msg_len) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock, msg_len - already);

		if (nbyte == 0 || nbyte == -1) {
			zfpm_connection_down("failed to read message");
			return 0;
		}

		if (nbyte != (ssize_t)(msg_len - already))
			goto done;
	}

	zfpm_debug("Read out a full fpm message");

	/*
	 * Just throw it away for now.
	 */
	stream_reset(ibuf);

done:
	zfpm_read_on();
	return 0;
}
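
/*
 * Reads are incremental: the callback first accumulates the
 * fixed-size FPM header, uses the length field in it to determine the
 * full message size, and then accumulates the body. If a read returns
 * less than requested, the function simply re-arms itself and picks
 * up where it left off on the next invocation, with stream_get_endp()
 * tracking how much has arrived so far.
 */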

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int zfpm_writes_pending(void)
{

	/*
	 * Check if there is any data in the outbound buffer that has not
	 * been written to the socket yet.
	 */
	if (stream_get_endp(zfpm_g->obuf) - stream_get_getp(zfpm_g->obuf))
		return 1;

	/*
	 * Check if there are any prefixes on the outbound queue.
	 */
	if (!TAILQ_EMPTY(&zfpm_g->dest_q))
		return 1;

	return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int zfpm_encode_route(rib_dest_t *dest, struct route_entry *re,
				    char *in_buf, size_t in_buf_len,
				    fpm_msg_type_e *msg_type)
{
	size_t len;
#ifdef HAVE_NETLINK
	int cmd;
#endif
	len = 0;

	*msg_type = FPM_MSG_TYPE_NONE;

	switch (zfpm_g->message_format) {

	case ZFPM_MSG_FORMAT_PROTOBUF:
#ifdef HAVE_PROTOBUF
		len = zfpm_protobuf_encode_route(dest, re, (uint8_t *)in_buf,
						 in_buf_len);
		*msg_type = FPM_MSG_TYPE_PROTOBUF;
#endif
		break;

	case ZFPM_MSG_FORMAT_NETLINK:
#ifdef HAVE_NETLINK
		cmd = re ? RTM_NEWROUTE : RTM_DELROUTE;
		len = zfpm_netlink_encode_route(cmd, dest, re, in_buf,
						in_buf_len);
		assert(fpm_msg_align(len) == len);
		*msg_type = FPM_MSG_TYPE_NETLINK;
#endif /* HAVE_NETLINK */
		break;

	default:
		break;
	}

	return len;
}
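
/*
 * For reference, each encoded payload is preceded on the wire by the
 * 4-byte FPM header defined in fpm/fpm.h: a version byte, a message
 * type byte, and the total message length in network byte order,
 * roughly:
 *
 *   0       1       2               4
 *   +-------+-------+---------------+
 *   |version| type  |    msg_len    |
 *   +-------+-------+---------------+
 */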

/*
 * zfpm_route_for_update
 *
 * Returns the re that is to be sent to the FPM for a given dest.
 */
struct route_entry *zfpm_route_for_update(rib_dest_t *dest)
{
	struct route_entry *re;

	RE_DEST_FOREACH_ROUTE(dest, re)
	{
		if (!CHECK_FLAG(re->status, ROUTE_ENTRY_SELECTED_FIB))
			continue;

		return re;
	}

	/*
	 * We have no route for this destination.
	 */
	return NULL;
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
static void zfpm_build_updates(void)
{
	struct stream *s;
	rib_dest_t *dest;
	unsigned char *buf, *data, *buf_end;
	size_t msg_len;
	size_t data_len;
	fpm_msg_hdr_t *hdr;
	struct route_entry *re;
	int is_add, write_msg;
	fpm_msg_type_e msg_type;

	s = zfpm_g->obuf;

	assert(stream_empty(s));

	do {

		/*
		 * Make sure there is enough space to write another message.
		 */
		if (STREAM_WRITEABLE(s) < FPM_MAX_MSG_LEN)
			break;

		buf = STREAM_DATA(s) + stream_get_endp(s);
		buf_end = buf + STREAM_WRITEABLE(s);

		dest = TAILQ_FIRST(&zfpm_g->dest_q);
		if (!dest)
			break;

		assert(CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM));

		hdr = (fpm_msg_hdr_t *)buf;
		hdr->version = FPM_PROTO_VERSION;

		data = fpm_msg_data(hdr);

		re = zfpm_route_for_update(dest);
		is_add = re ? 1 : 0;

		write_msg = 1;

		/*
		 * If this is a route deletion, and we have not sent the
		 * route to the FPM previously, skip it.
		 */
		if (!is_add && !CHECK_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM)) {
			write_msg = 0;
			zfpm_g->stats.nop_deletes_skipped++;
		}

		if (write_msg) {
			data_len = zfpm_encode_route(dest, re, (char *)data,
						     buf_end - data, &msg_type);

			assert(data_len);
			if (data_len) {
				hdr->msg_type = msg_type;
				msg_len = fpm_data_len_to_msg_len(data_len);
				hdr->msg_len = htons(msg_len);
				stream_forward_endp(s, msg_len);

				if (is_add)
					zfpm_g->stats.route_adds++;
				else
					zfpm_g->stats.route_dels++;
			}
		}

		/*
		 * Remove the dest from the queue, and reset the flag.
		 */
		UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);

		if (is_add) {
			SET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		} else {
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		}

		/*
		 * Delete the destination if necessary.
		 */
		if (rib_gc_dest(dest->rnode))
			zfpm_g->stats.dests_del_after_update++;

	} while (1);
}
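
/*
 * Note that a dest is dequeued and its RIB_DEST_UPDATE_FPM flag
 * cleared regardless of whether a message was actually written for
 * it; RIB_DEST_SENT_TO_FPM then records whether the FPM currently
 * knows about the route, which is what allows no-op deletes to be
 * skipped on a later pass.
 */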

/*
 * zfpm_write_cb
 */
static int zfpm_write_cb(struct thread *thread)
{
	struct stream *s;
	int num_writes;

	zfpm_g->stats.write_cb_calls++;
	zfpm_g->t_write = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	num_writes = 0;

	do {
		int bytes_to_write, bytes_written;

		s = zfpm_g->obuf;

		/*
		 * If the stream is empty, try to fill it up with data.
		 */
		if (stream_empty(s)) {
			zfpm_build_updates();
		}

		bytes_to_write = stream_get_endp(s) - stream_get_getp(s);
		if (!bytes_to_write)
			break;

		bytes_written =
			write(zfpm_g->sock, STREAM_PNT(s), bytes_to_write);
		zfpm_g->stats.write_calls++;
		num_writes++;

		if (bytes_written < 0) {
			if (ERRNO_IO_RETRY(errno))
				break;

			zfpm_connection_down("failed to write to socket");
			return 0;
		}

		if (bytes_written != bytes_to_write) {

			/*
			 * Partial write.
			 */
			stream_forward_getp(s, bytes_written);
			zfpm_g->stats.partial_writes++;
			break;
		}

		/*
		 * We've written out the entire contents of the stream.
		 */
		stream_reset(s);

		if (num_writes >= ZFPM_MAX_WRITES_PER_RUN) {
			zfpm_g->stats.max_writes_hit++;
			break;
		}

		if (zfpm_thread_should_yield(thread)) {
			zfpm_g->stats.t_write_yields++;
			break;
		}
	} while (1);

	if (zfpm_writes_pending())
		zfpm_write_on();

	return 0;
}
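
/*
 * The write callback bounds its work in two ways: it stops after
 * ZFPM_MAX_WRITES_PER_RUN calls to write(), and it also yields early
 * if it has held the CPU for too long. In either case it re-arms
 * itself via zfpm_write_on() as long as zfpm_writes_pending() says
 * there is more to send.
 */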

/*
 * zfpm_connect_cb
 */
static int zfpm_connect_cb(struct thread *t)
{
	int sock, ret;
	struct sockaddr_in serv;

	zfpm_g->t_connect = NULL;
	assert(zfpm_g->state == ZFPM_STATE_ACTIVE);

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0) {
		zfpm_debug("Failed to create socket for connect(): %s",
			   strerror(errno));
		zfpm_g->stats.connect_no_sock++;
		return 0;
	}

	set_nonblocking(sock);

	/* Make server socket. */
	memset(&serv, 0, sizeof(serv));
	serv.sin_family = AF_INET;
	serv.sin_port = htons(zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
	serv.sin_len = sizeof(struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
	if (!zfpm_g->fpm_server)
		serv.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	else
		serv.sin_addr.s_addr = (zfpm_g->fpm_server);

	/*
	 * Connect to the FPM.
	 */
	zfpm_g->connect_calls++;
	zfpm_g->stats.connect_calls++;
	zfpm_g->last_connect_call_time = monotime(NULL);

	ret = connect(sock, (struct sockaddr *)&serv, sizeof(serv));
	if (ret >= 0) {
		zfpm_g->sock = sock;
		zfpm_connection_up("connect succeeded");
		return 1;
	}

	if (errno == EINPROGRESS) {
		zfpm_g->sock = sock;
		zfpm_read_on();
		zfpm_write_on();
		zfpm_set_state(ZFPM_STATE_CONNECTING,
			       "async connect in progress");
		return 0;
	}

	zlog_info("can't connect to FPM %d: %s", sock, safe_strerror(errno));
	close(sock);

	/*
	 * Restart timer for retrying connection.
	 */
	zfpm_start_connect_timer("connect() failed");
	return 0;
}
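
/*
 * A non-blocking connect() has three outcomes here: immediate success
 * (the connection is declared up on the spot), EINPROGRESS (the state
 * machine moves to CONNECTING and the read/write callbacks finish the
 * job via zfpm_connect_check()), or a hard error (the retry timer is
 * restarted).
 */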

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void zfpm_set_state(zfpm_state_t state, const char *reason)
{
	zfpm_state_t cur_state = zfpm_g->state;

	if (!reason)
		reason = "Unknown";

	if (state == cur_state)
		return;

	zfpm_debug("beginning state transition %s -> %s. Reason: %s",
		   zfpm_state_to_str(cur_state), zfpm_state_to_str(state),
		   reason);

	switch (state) {

	case ZFPM_STATE_IDLE:
		assert(cur_state == ZFPM_STATE_ESTABLISHED);
		break;

	case ZFPM_STATE_ACTIVE:
		assert(cur_state == ZFPM_STATE_IDLE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->t_connect);
		break;

	case ZFPM_STATE_CONNECTING:
		assert(zfpm_g->sock);
		assert(cur_state == ZFPM_STATE_ACTIVE);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;

	case ZFPM_STATE_ESTABLISHED:
		assert(cur_state == ZFPM_STATE_ACTIVE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->sock);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;
	}

	zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long zfpm_calc_connect_delay(void)
{
	time_t elapsed;

	/*
	 * Return 0 if this is our first attempt to connect.
	 */
	if (zfpm_g->connect_calls == 0) {
		return 0;
	}

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_connect_call_time);

	if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
		return 0;
	}

	return ZFPM_CONNECT_RETRY_IVL - elapsed;
}
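
/*
 * For example, with ZFPM_CONNECT_RETRY_IVL of 5 seconds: if the last
 * connect() attempt was made 2 seconds ago, the next one is scheduled
 * 3 seconds from now; if more than 5 seconds have already elapsed, it
 * is scheduled immediately.
 */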

/*
 * zfpm_start_connect_timer
 */
static void zfpm_start_connect_timer(const char *reason)
{
	long delay_secs;

	assert(!zfpm_g->t_connect);
	assert(zfpm_g->sock < 0);

	assert(zfpm_g->state == ZFPM_STATE_IDLE
	       || zfpm_g->state == ZFPM_STATE_ACTIVE
	       || zfpm_g->state == ZFPM_STATE_CONNECTING);

	delay_secs = zfpm_calc_connect_delay();
	zfpm_debug("scheduling connect in %ld seconds", delay_secs);

	thread_add_timer(zfpm_g->master, zfpm_connect_cb, 0, delay_secs,
			 &zfpm_g->t_connect);
	zfpm_set_state(ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int zfpm_is_enabled(void)
{
	return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int zfpm_conn_is_up(void)
{
	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
		return 0;

	assert(zfpm_g->sock >= 0);

	return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
static int zfpm_trigger_update(struct route_node *rn, const char *reason)
{
	rib_dest_t *dest;
	char buf[PREFIX_STRLEN];

	/*
	 * Ignore if the connection is down. We will update the FPM about
	 * all destinations once the connection comes up.
	 */
	if (!zfpm_conn_is_up())
		return 0;

	dest = rib_dest_from_rnode(rn);

	/*
	 * Ignore the trigger if the dest is not in a table that we would
	 * send to the FPM.
	 */
	if (!zfpm_is_table_for_fpm(rib_dest_table(dest))) {
		zfpm_g->stats.non_fpm_table_triggers++;
		return 0;
	}

	if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
		zfpm_g->stats.redundant_triggers++;
		return 0;
	}

	if (reason) {
		zfpm_debug("%s triggering update to FPM - Reason: %s",
			   prefix2str(&rn->p, buf, sizeof(buf)), reason);
	}

	SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
	TAILQ_INSERT_TAIL(&zfpm_g->dest_q, dest, fpm_q_entries);
	zfpm_g->stats.updates_triggered++;

	/*
	 * Make sure that writes are enabled.
	 */
	if (zfpm_g->t_write)
		return 0;

	zfpm_write_on();
	return 0;
}
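
/*
 * zfpm_trigger_update() is wired into the RIB via the 'rib_update'
 * hook (see zebra_fpm_module_init() at the bottom of this file), so
 * route processing elsewhere in zebra does not need to know whether
 * the FPM module is even loaded.
 */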

/*
 * zfpm_stats_timer_cb
 */
static int zfpm_stats_timer_cb(struct thread *t)
{
	zfpm_g->t_stats = NULL;

	/*
	 * Remember the stats collected in the last interval for display
	 * purposes.
	 */
	zfpm_stats_copy(&zfpm_g->stats, &zfpm_g->last_ivl_stats);

	/*
	 * Add the current set of stats into the cumulative statistics.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &zfpm_g->cumulative_stats);

	/*
	 * Start collecting stats afresh over the next interval.
	 */
	zfpm_stats_reset(&zfpm_g->stats);

	zfpm_start_stats_timer();

	return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void zfpm_stop_stats_timer(void)
{
	if (!zfpm_g->t_stats)
		return;

	zfpm_debug("Stopping existing stats timer");
	THREAD_TIMER_OFF(zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
static void zfpm_start_stats_timer(void)
{
	assert(!zfpm_g->t_stats);

	thread_add_timer(zfpm_g->master, zfpm_stats_timer_cb, 0,
			 ZFPM_STATS_IVL_SECS, &zfpm_g->t_stats);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter)                                               \
	do {                                                                  \
		vty_out(vty, "%-40s %10lu %16lu\n", #counter,                 \
			total_stats.counter, zfpm_g->last_ivl_stats.counter); \
	} while (0)
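
/*
 * For illustration, ZFPM_SHOW_STAT(route_adds) expands to a vty_out()
 * call that prints the counter name followed by its cumulative total
 * and its value over the last statistics interval:
 *
 *   vty_out(vty, "%-40s %10lu %16lu\n", "route_adds",
 *           total_stats.route_adds, zfpm_g->last_ivl_stats.route_adds);
 */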

/*
 * zfpm_show_stats
 */
static void zfpm_show_stats(struct vty *vty)
{
	zfpm_stats_t total_stats;
	time_t elapsed;

	vty_out(vty, "\n%-40s %10s Last %2d secs\n\n", "Counter", "Total",
		ZFPM_STATS_IVL_SECS);

	/*
	 * Compute the total stats up to this instant.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &total_stats);

	ZFPM_SHOW_STAT(connect_calls);
	ZFPM_SHOW_STAT(connect_no_sock);
	ZFPM_SHOW_STAT(read_cb_calls);
	ZFPM_SHOW_STAT(write_cb_calls);
	ZFPM_SHOW_STAT(write_calls);
	ZFPM_SHOW_STAT(partial_writes);
	ZFPM_SHOW_STAT(max_writes_hit);
	ZFPM_SHOW_STAT(t_write_yields);
	ZFPM_SHOW_STAT(nop_deletes_skipped);
	ZFPM_SHOW_STAT(route_adds);
	ZFPM_SHOW_STAT(route_dels);
	ZFPM_SHOW_STAT(updates_triggered);
	ZFPM_SHOW_STAT(non_fpm_table_triggers);
	ZFPM_SHOW_STAT(redundant_triggers);
	ZFPM_SHOW_STAT(dests_del_after_update);
	ZFPM_SHOW_STAT(t_conn_down_starts);
	ZFPM_SHOW_STAT(t_conn_down_dests_processed);
	ZFPM_SHOW_STAT(t_conn_down_yields);
	ZFPM_SHOW_STAT(t_conn_down_finishes);
	ZFPM_SHOW_STAT(t_conn_up_starts);
	ZFPM_SHOW_STAT(t_conn_up_dests_processed);
	ZFPM_SHOW_STAT(t_conn_up_yields);
	ZFPM_SHOW_STAT(t_conn_up_aborts);
	ZFPM_SHOW_STAT(t_conn_up_finishes);

	if (!zfpm_g->last_stats_clear_time)
		return;

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_stats_clear_time);

	vty_out(vty, "\nStats were cleared %lu seconds ago\n",
		(unsigned long)elapsed);
}

/*
 * zfpm_clear_stats
 */
static void zfpm_clear_stats(struct vty *vty)
{
	if (!zfpm_is_enabled()) {
		vty_out(vty, "The FPM module is not enabled...\n");
		return;
	}

	zfpm_stats_reset(&zfpm_g->stats);
	zfpm_stats_reset(&zfpm_g->last_ivl_stats);
	zfpm_stats_reset(&zfpm_g->cumulative_stats);

	zfpm_stop_stats_timer();
	zfpm_start_stats_timer();

	zfpm_g->last_stats_clear_time = monotime(NULL);

	vty_out(vty, "Cleared FPM stats\n");
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       "Zebra information\n"
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_show_stats(vty);
	return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       "Zebra information\n"
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_clear_stats(vty);
	return CMD_SUCCESS;
}

/*
 * update fpm connection information
 */
DEFUN (fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port (1-65535)",
       "Forwarding Path Manager\n"
       "FPM connection\n"
       "Remote fpm server ip\n"
       "Remote fpm server ip A.B.C.D\n"
       "Remote fpm server port\n"
       "TCP port number (1-65535)\n")
{
	in_addr_t fpm_server;
	uint32_t port_no;

	fpm_server = inet_addr(argv[3]->arg);
	if (fpm_server == INADDR_NONE)
		return CMD_ERR_INCOMPLETE;

	port_no = atoi(argv[5]->arg);
	if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
		return CMD_ERR_INCOMPLETE;

	zfpm_g->fpm_server = fpm_server;
	zfpm_g->fpm_port = port_no;

	return CMD_SUCCESS;
}
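
/*
 * Example configuration (illustrative):
 *
 *   fpm connection ip 127.0.0.1 port 2620
 *
 * 2620 is FPM_DEFAULT_PORT, so the command is only needed when the
 * FPM listens on a non-default address or port.
 */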

DEFUN (no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port (1-65535)",
       NO_STR
       "Forwarding Path Manager\n"
       "FPM connection\n"
       "Remote fpm server ip\n"
       "Remote fpm server ip A.B.C.D\n"
       "Remote fpm server port\n"
       "TCP port number (1-65535)\n")
{
	if (zfpm_g->fpm_server != inet_addr(argv[4]->arg)
	    || zfpm_g->fpm_port != atoi(argv[6]->arg))
		return CMD_ERR_NO_MATCH;

	zfpm_g->fpm_server = FPM_DEFAULT_IP;
	zfpm_g->fpm_port = FPM_DEFAULT_PORT;

	return CMD_SUCCESS;
}

/*
 * zfpm_init_message_format
 */
static inline void zfpm_init_message_format(const char *format)
{
	int have_netlink, have_protobuf;

#ifdef HAVE_NETLINK
	have_netlink = 1;
#else
	have_netlink = 0;
#endif

#ifdef HAVE_PROTOBUF
	have_protobuf = 1;
#else
	have_protobuf = 0;
#endif

	zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;

	if (!format) {
		if (have_netlink) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		} else if (have_protobuf) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		}
		return;
	}

	if (!strcmp("netlink", format)) {
		if (!have_netlink) {
			zlog_err("FPM netlink message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		return;
	}

	if (!strcmp("protobuf", format)) {
		if (!have_protobuf) {
			zlog_err(
				"FPM protobuf message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		return;
	}

	zlog_warn("Unknown fpm format '%s'", format);
}
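
/*
 * The format string parsed above comes from the module load
 * arguments, i.e. anything after a colon in the -M option when
 * starting zebra. For example (illustrative; assumes the module was
 * built):
 *
 *   zebra -M fpm            # pick a default format
 *   zebra -M fpm:protobuf   # force the protobuf encoding
 */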

/**
 * fpm_remote_srv_write
 *
 * Writes the 'fpm connection' command to the running configuration,
 * if the remote server address or port differs from the defaults.
 *
 * Returns ZERO on success.
 */
static int fpm_remote_srv_write(struct vty *vty)
{
	struct in_addr in;

	in.s_addr = zfpm_g->fpm_server;

	if ((zfpm_g->fpm_server != FPM_DEFAULT_IP
	     && zfpm_g->fpm_server != INADDR_ANY)
	    || (zfpm_g->fpm_port != FPM_DEFAULT_PORT && zfpm_g->fpm_port != 0))
		vty_out(vty, "fpm connection ip %s port %d\n", inet_ntoa(in),
			zfpm_g->fpm_port);

	return 0;
}

/* Zebra node */
static struct cmd_node zebra_node = {ZEBRA_NODE, "", 1};

/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * The message format used to talk to the FPM ('netlink' or
 * 'protobuf') is taken from the module load arguments; the port and
 * the enabled flag are set up below.
 *
 * Returns 0 on success.
 */
static int zfpm_init(struct thread_master *master)
{
	int enable = 1;
	uint16_t port = 0;
	const char *format = THIS_MODULE->load_args;

	memset(zfpm_g, 0, sizeof(*zfpm_g));
	zfpm_g->master = master;
	TAILQ_INIT(&zfpm_g->dest_q);
	zfpm_g->sock = -1;
	zfpm_g->state = ZFPM_STATE_IDLE;

	zfpm_stats_init(&zfpm_g->stats);
	zfpm_stats_init(&zfpm_g->last_ivl_stats);
	zfpm_stats_init(&zfpm_g->cumulative_stats);

	install_node(&zebra_node, fpm_remote_srv_write);
	install_element(ENABLE_NODE, &show_zebra_fpm_stats_cmd);
	install_element(ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
	install_element(CONFIG_NODE, &fpm_remote_ip_cmd);
	install_element(CONFIG_NODE, &no_fpm_remote_ip_cmd);

	zfpm_init_message_format(format);

	/*
	 * Disable the FPM interface if no suitable format is available.
	 */
	if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
		enable = 0;

	zfpm_g->enabled = enable;

	if (!zfpm_g->fpm_server)
		zfpm_g->fpm_server = FPM_DEFAULT_IP;

	if (!port)
		port = FPM_DEFAULT_PORT;

	zfpm_g->fpm_port = port;

	zfpm_g->obuf = stream_new(ZFPM_OBUF_SIZE);
	zfpm_g->ibuf = stream_new(ZFPM_IBUF_SIZE);

	zfpm_start_stats_timer();
	zfpm_start_connect_timer("initialized");
	return 0;
}

static int zebra_fpm_module_init(void)
{
	hook_register(rib_update, zfpm_trigger_update);
	hook_register(frr_late_init, zfpm_init);
	return 0;
}

FRR_MODULE_SETUP(.name = "zebra_fpm", .version = FRR_VERSION,
		 .description = "zebra FPM (Forwarding Plane Manager) module",
		 .init = zebra_fpm_module_init, )