]> git.proxmox.com Git - mirror_frr.git/blame - zebra/zebra_fpm.c
cumulus: Fixup function only needed for linux
[mirror_frr.git] / zebra / zebra_fpm.c
CommitLineData
5adc2528
AS
1/*
2 * Main implementation file for interface to Forwarding Plane Manager.
3 *
4 * Copyright (C) 2012 by Open Source Routing.
5 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
6 *
7 * This file is part of GNU Zebra.
8 *
9 * GNU Zebra is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by the
11 * Free Software Foundation; either version 2, or (at your option) any
12 * later version.
13 *
14 * GNU Zebra is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with GNU Zebra; see the file COPYING. If not, write to the Free
21 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
22 * 02111-1307, USA.
23 */
24
25#include <zebra.h>
26
27#include "log.h"
28#include "stream.h"
29#include "thread.h"
30#include "network.h"
31#include "command.h"
32
33#include "zebra/rib.h"
7c551956
DS
34#include "zebra/zserv.h"
35#include "zebra/zebra_ns.h"
36#include "zebra/zebra_vrf.h"
5adc2528
AS
37
38#include "fpm/fpm.h"
39#include "zebra_fpm.h"
40#include "zebra_fpm_private.h"
41
42/*
43 * Interval at which we attempt to connect to the FPM.
44 */
45#define ZFPM_CONNECT_RETRY_IVL 5
46
47/*
48 * Sizes of outgoing and incoming stream buffers for writing/reading
49 * FPM messages.
50 */
51#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
52#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)
53
54/*
55 * The maximum number of times the FPM socket write callback can call
56 * 'write' before it yields.
57 */
58#define ZFPM_MAX_WRITES_PER_RUN 10
59
60/*
61 * Interval over which we collect statistics.
62 */
63#define ZFPM_STATS_IVL_SECS 10
64
/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 *
 * The outer iterator walks the RIB tables; the inner one walks the
 * nodes of the table currently being visited.
 */
typedef struct zfpm_rnodes_iter_t_
{
  rib_tables_iter_t tables_iter;   /* Which table we are on. */
  route_table_iter_t iter;         /* Position within that table. */
} zfpm_rnodes_iter_t;
74
/*
 * Statistics.
 *
 * NOTE: zfpm_stats_compose() assumes this structure is composed
 * entirely of 'unsigned long' counters -- keep it that way when
 * adding fields.
 */
typedef struct zfpm_stats_t_ {
  unsigned long connect_calls;
  unsigned long connect_no_sock;

  unsigned long read_cb_calls;

  unsigned long write_cb_calls;
  unsigned long write_calls;
  unsigned long partial_writes;
  unsigned long max_writes_hit;
  unsigned long t_write_yields;

  unsigned long nop_deletes_skipped;
  unsigned long route_adds;
  unsigned long route_dels;

  unsigned long updates_triggered;
  unsigned long redundant_triggers;
  unsigned long non_fpm_table_triggers;

  unsigned long dests_del_after_update;

  unsigned long t_conn_down_starts;
  unsigned long t_conn_down_dests_processed;
  unsigned long t_conn_down_yields;
  unsigned long t_conn_down_finishes;

  unsigned long t_conn_up_starts;
  unsigned long t_conn_up_dests_processed;
  unsigned long t_conn_up_yields;
  unsigned long t_conn_up_aborts;
  unsigned long t_conn_up_finishes;

} zfpm_stats_t;
112
/*
 * States for the FPM state machine.
 *
 * Transitions are performed by zfpm_set_state(), which also asserts
 * the legal predecessor states for each transition.
 */
typedef enum {

  /*
   * In this state we are not yet ready to connect to the FPM. This
   * can happen when this module is disabled, or if we're cleaning up
   * after a connection has gone down.
   */
  ZFPM_STATE_IDLE,

  /*
   * Ready to talk to the FPM and periodically trying to connect to
   * it.
   */
  ZFPM_STATE_ACTIVE,

  /*
   * In the middle of bringing up a TCP connection. Specifically,
   * waiting for a connect() call to complete asynchronously.
   */
  ZFPM_STATE_CONNECTING,

  /*
   * TCP connection to the FPM is up.
   */
  ZFPM_STATE_ESTABLISHED

} zfpm_state_t;
143
/*
 * Message format to be used to communicate with the FPM.
 *
 * Selected at startup; see zfpm_encode_route() for how each format is
 * encoded.
 */
typedef enum
{
  ZFPM_MSG_FORMAT_NONE,
  ZFPM_MSG_FORMAT_NETLINK,
  ZFPM_MSG_FORMAT_PROTOBUF,
} zfpm_msg_format_e;
5adc2528
AS
/*
 * Globals.
 *
 * All module state lives in a single instance of this structure (see
 * zfpm_glob_space/zfpm_g below).
 */
typedef struct zfpm_glob_t_
{

  /*
   * True if the FPM module has been enabled.
   */
  int enabled;

  /*
   * Message format to be used to communicate with the fpm.
   */
  zfpm_msg_format_e message_format;

  struct thread_master *master;

  zfpm_state_t state;

  /* Address of the FPM server, in network byte order; 0 means use
     the loopback address. */
  in_addr_t fpm_server;

  /*
   * Port on which the FPM is running.
   */
  int fpm_port;

  /*
   * List of rib_dest_t structures to be processed
   */
  TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;

  /*
   * Stream socket to the FPM. -1 when not connected.
   */
  int sock;

  /*
   * Buffers for messages to/from the FPM.
   */
  struct stream *obuf;
  struct stream *ibuf;

  /*
   * Threads for I/O.
   */
  struct thread *t_connect;
  struct thread *t_write;
  struct thread *t_read;

  /*
   * Thread to clean up after the TCP connection to the FPM goes down
   * and the state that belongs to it.
   */
  struct thread *t_conn_down;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_down_state;

  /*
   * Thread to take actions once the TCP conn to the FPM comes up, and
   * the state that belongs to it.
   */
  struct thread *t_conn_up;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_up_state;

  unsigned long connect_calls;
  time_t last_connect_call_time;

  /*
   * Stats from the start of the current statistics interval up to
   * now. These are the counters we typically update in the code.
   */
  zfpm_stats_t stats;

  /*
   * Statistics that were gathered in the last collection interval.
   */
  zfpm_stats_t last_ivl_stats;

  /*
   * Cumulative stats from the last clear to the start of the current
   * statistics interval.
   */
  zfpm_stats_t cumulative_stats;

  /*
   * Stats interval timer.
   */
  struct thread *t_stats;

  /*
   * If non-zero, the last time when statistics were cleared.
   */
  time_t last_stats_clear_time;

} zfpm_glob_t;
253
/* Single global instance of the FPM state; all code goes through zfpm_g. */
static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

/* Forward declarations for the socket I/O callbacks and helpers used
   before their definitions. */
static int zfpm_read_cb (struct thread *thread);
static int zfpm_write_cb (struct thread *thread);

static void zfpm_set_state (zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer (const char *reason);
static void zfpm_start_stats_timer (void);
263
/*
 * zfpm_thread_should_yield
 *
 * Thin wrapper around the thread library's yield check, so callers in
 * this module do not depend on the lib API directly.
 */
static inline int
zfpm_thread_should_yield (struct thread *thread)
{
  return thread_should_yield (thread);
}
272
273/*
274 * zfpm_state_to_str
275 */
276static const char *
277zfpm_state_to_str (zfpm_state_t state)
278{
279 switch (state)
280 {
281
282 case ZFPM_STATE_IDLE:
283 return "idle";
284
285 case ZFPM_STATE_ACTIVE:
286 return "active";
287
288 case ZFPM_STATE_CONNECTING:
289 return "connecting";
290
291 case ZFPM_STATE_ESTABLISHED:
292 return "established";
293
294 default:
295 return "unknown";
296 }
297}
298
299/*
300 * zfpm_get_time
301 */
302static time_t
303zfpm_get_time (void)
304{
305 struct timeval tv;
306
307 if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
308 zlog_warn ("FPM: quagga_gettime failed!!");
309
310 return tv.tv_sec;
311}
312
/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 * The monotonic clock should never go backwards; assert if it does
 * and fall back to 0.
 */
static time_t
zfpm_get_elapsed_time (time_t reference)
{
  time_t now = zfpm_get_time ();

  if (now < reference)
    {
      assert (0);
      return 0;
    }

  return now - reference;
}
333
/*
 * zfpm_is_table_for_fpm
 *
 * Returns TRUE if the given table is to be communicated to the
 * FPM.
 */
static inline int
zfpm_is_table_for_fpm (struct route_table *table)
{
  rib_table_info_t *info;

  info = rib_table_info (table);

  /*
   * We only send the unicast tables in the main instance to the FPM
   * at this point.
   */
  if (zvrf_id (info->zvrf) != 0)
    return 0;

  if (info->safi != SAFI_UNICAST)
    return 0;

  return 1;
}
359
/*
 * zfpm_rnodes_iter_init
 *
 * Prepare an iterator for walking all route nodes in all FPM-relevant
 * tables, starting from the beginning.
 */
static inline void
zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
{
  memset (iter, 0, sizeof (*iter));
  rib_tables_iter_init (&iter->tables_iter);

  /*
   * This is a hack, but it makes implementing 'next' easier by
   * ensuring that route_table_iter_next() will return NULL the first
   * time we call it.
   */
  route_table_iter_init (&iter->iter, NULL);
  route_table_iter_cleanup (&iter->iter);
}
377
378/*
379 * zfpm_rnodes_iter_next
380 */
381static inline struct route_node *
382zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
383{
384 struct route_node *rn;
385 struct route_table *table;
386
387 while (1)
388 {
389 rn = route_table_iter_next (&iter->iter);
390 if (rn)
391 return rn;
392
393 /*
394 * We've made our way through this table, go to the next one.
395 */
396 route_table_iter_cleanup (&iter->iter);
397
398 while ((table = rib_tables_iter_next (&iter->tables_iter)))
399 {
400 if (zfpm_is_table_for_fpm (table))
401 break;
402 }
403
404 if (!table)
405 return NULL;
406
407 route_table_iter_init (&iter->iter, table);
408 }
409
410 return NULL;
411}
412
413/*
414 * zfpm_rnodes_iter_pause
415 */
416static inline void
417zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
418{
419 route_table_iter_pause (&iter->iter);
420}
421
422/*
423 * zfpm_rnodes_iter_cleanup
424 */
425static inline void
426zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
427{
428 route_table_iter_cleanup (&iter->iter);
429 rib_tables_iter_cleanup (&iter->tables_iter);
430}
431
432/*
433 * zfpm_stats_init
434 *
435 * Initialize a statistics block.
436 */
437static inline void
438zfpm_stats_init (zfpm_stats_t *stats)
439{
440 memset (stats, 0, sizeof (*stats));
441}
442
443/*
444 * zfpm_stats_reset
445 */
446static inline void
447zfpm_stats_reset (zfpm_stats_t *stats)
448{
449 zfpm_stats_init (stats);
450}
451
452/*
453 * zfpm_stats_copy
454 */
455static inline void
456zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
457{
458 memcpy (dest, src, sizeof (*dest));
459}
460
461/*
462 * zfpm_stats_compose
463 *
464 * Total up the statistics in two stats structures ('s1 and 's2') and
465 * return the result in the third argument, 'result'. Note that the
466 * pointer 'result' may be the same as 's1' or 's2'.
467 *
468 * For simplicity, the implementation below assumes that the stats
469 * structure is composed entirely of counters. This can easily be
470 * changed when necessary.
471 */
472static void
473zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
474 zfpm_stats_t *result)
475{
476 const unsigned long *p1, *p2;
477 unsigned long *result_p;
478 int i, num_counters;
479
480 p1 = (const unsigned long *) s1;
481 p2 = (const unsigned long *) s2;
482 result_p = (unsigned long *) result;
483
484 num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));
485
486 for (i = 0; i < num_counters; i++)
487 {
488 result_p[i] = p1[i] + p2[i];
489 }
490}
491
/*
 * zfpm_read_on
 *
 * Arm the read callback on the FPM socket. Must not already be armed,
 * and the socket must be open.
 */
static inline void
zfpm_read_on (void)
{
  assert (!zfpm_g->t_read);
  assert (zfpm_g->sock >= 0);

  THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
		  zfpm_g->sock);
}
504
/*
 * zfpm_write_on
 *
 * Arm the write callback on the FPM socket. Must not already be
 * armed, and the socket must be open.
 */
static inline void
zfpm_write_on (void)
{
  assert (!zfpm_g->t_write);
  assert (zfpm_g->sock >= 0);

  THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
		   zfpm_g->sock);
}
517
/*
 * zfpm_read_off
 *
 * Disarm the read callback, if armed (the macro tolerates NULL).
 */
static inline void
zfpm_read_off (void)
{
  THREAD_READ_OFF (zfpm_g->t_read);
}
526
/*
 * zfpm_write_off
 *
 * Disarm the write callback, if armed (the macro tolerates NULL).
 */
static inline void
zfpm_write_off (void)
{
  THREAD_WRITE_OFF (zfpm_g->t_write);
}
535
/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up. Walks all FPM-relevant route nodes and queues each one
 * for (re)transmission via zfpm_trigger_update(). Reschedules itself
 * as a background thread when it has run long enough to yield.
 */
static int
zfpm_conn_up_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  /* We are being invoked, so the handle must be cleared before any
     path that re-registers it. */
  assert (zfpm_g->t_conn_up);
  zfpm_g->t_conn_up = NULL;

  iter = &zfpm_g->t_conn_up_state.iter;

  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    {
      zfpm_debug ("Connection not up anymore, conn_up thread aborting");
      zfpm_g->stats.t_conn_up_aborts++;
      goto done;
    }

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
	{
	  zfpm_g->stats.t_conn_up_dests_processed++;
	  zfpm_trigger_update (rnode, NULL);
	}

      /*
       * Yield if need be: pause the iterator and reschedule ourselves
       * so other work can run.
       */
      if (!zfpm_thread_should_yield (thread))
	continue;

      zfpm_g->stats.t_conn_up_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
						 zfpm_conn_up_thread_cb,
						 0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_up_finishes++;

 done:
  zfpm_rnodes_iter_cleanup (iter);
  return 0;
}
591
/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up. Arms I/O callbacks,
 * moves the state machine to ESTABLISHED, and kicks off the
 * background thread that pushes the existing routes to the FPM.
 */
static void
zfpm_connection_up (const char *detail)
{
  assert (zfpm_g->sock >= 0);
  zfpm_read_on ();
  zfpm_write_on ();
  zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);

  /*
   * Start thread to push existing routes to the FPM.
   */
  assert (!zfpm_g->t_conn_up);

  zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);

  zfpm_debug ("Starting conn_up thread");
  zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
					     zfpm_conn_up_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_up_starts++;
}
617
/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete, by
 * reading SO_ERROR off the socket. On success transitions to
 * ESTABLISHED; on failure closes the socket and schedules a retry.
 */
static void
zfpm_connect_check (void)
{
  int status;
  socklen_t slen;
  int ret;

  /* Both callbacks were armed for the async connect; disarm before
     deciding the outcome. */
  zfpm_read_off ();
  zfpm_write_off ();

  slen = sizeof (status);
  ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
		    &slen);

  if (ret >= 0 && status == 0)
    {
      zfpm_connection_up ("async connect complete");
      return;
    }

  /*
   * getsockopt() failed or indicated an error on the socket.
   */
  close (zfpm_g->sock);
  zfpm_g->sock = -1;

  zfpm_start_connect_timer ("getsockopt() after async connect failed");
  return;
}
652
/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down. Walks every FPM-relevant dest, dequeues it,
 * clears the FPM flags and garbage-collects it if possible; then
 * restarts the connect timer. Yields periodically like the conn_up
 * thread.
 */
static int
zfpm_conn_down_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->state == ZFPM_STATE_IDLE);

  assert (zfpm_g->t_conn_down);
  zfpm_g->t_conn_down = NULL;

  iter = &zfpm_g->t_conn_down_state.iter;

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
	{
	  /* Only queued dests are on dest_q; remove before clearing
	     the flag that records membership. */
	  if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
	    {
	      TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
	    }

	  UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
	  UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);

	  zfpm_g->stats.t_conn_down_dests_processed++;

	  /*
	   * Check if the dest should be deleted.
	   */
	  rib_gc_dest(rnode);
	}

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
	continue;

      zfpm_g->stats.t_conn_down_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
						   zfpm_conn_down_thread_cb,
						   0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_down_finishes++;
  zfpm_rnodes_iter_cleanup (iter);

  /*
   * Start the process of connecting to the FPM again.
   */
  zfpm_start_connect_timer ("cleanup complete");
  return 0;
}
718
/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down. Tears down the
 * I/O state (callbacks, buffers, socket), then starts the background
 * cleanup thread and moves the state machine to IDLE.
 */
static void
zfpm_connection_down (const char *detail)
{
  if (!detail)
    detail = "unknown";

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);

  zlog_info ("connection to the FPM has gone down: %s", detail);

  zfpm_read_off ();
  zfpm_write_off ();

  stream_reset (zfpm_g->ibuf);
  stream_reset (zfpm_g->obuf);

  if (zfpm_g->sock >= 0) {
    close (zfpm_g->sock);
    zfpm_g->sock = -1;
  }

  /*
   * Start thread to clean up state after the connection goes down.
   */
  assert (!zfpm_g->t_conn_down);
  zfpm_debug ("Starting conn_down thread");
  zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
  zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
					       zfpm_conn_down_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_down_starts++;

  /* IDLE must be set after the conn_down thread is scheduled; the
     cleanup callback asserts this state. */
  zfpm_set_state (ZFPM_STATE_IDLE, detail);
}
757
/*
 * zfpm_read_cb
 *
 * Read callback on the FPM socket. Accumulates a message header and
 * then the body into ibuf across possibly-partial reads; a complete
 * message is currently discarded. Also completes an in-progress async
 * connect if we are in the CONNECTING state.
 */
static int
zfpm_read_cb (struct thread *thread)
{
  size_t already;
  struct stream *ibuf;
  uint16_t msg_len;
  fpm_msg_hdr_t *hdr;

  zfpm_g->stats.read_cb_calls++;
  assert (zfpm_g->t_read);
  zfpm_g->t_read = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  ibuf = zfpm_g->ibuf;

  /* 'already' is how much of the current message is buffered from
     previous partial reads. First complete the fixed-size header. */
  already = stream_get_endp (ibuf);
  if (already < FPM_MSG_HDR_LEN)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
      if (nbyte == 0 || nbyte == -1)
	{
	  zfpm_connection_down ("closed socket in read");
	  return 0;
	}

      /* Partial header -- wait for more data. */
      if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
	goto done;

      already = FPM_MSG_HDR_LEN;
    }

  stream_set_getp (ibuf, 0);

  hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);

  if (!fpm_msg_hdr_ok (hdr))
    {
      zfpm_connection_down ("invalid message header");
      return 0;
    }

  msg_len = fpm_msg_len (hdr);

  /*
   * Read out the rest of the packet.
   */
  if (already < msg_len)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);

      if (nbyte == 0 || nbyte == -1)
	{
	  zfpm_connection_down ("failed to read message");
	  return 0;
	}

      /* Partial body -- wait for more data. */
      if (nbyte != (ssize_t) (msg_len - already))
	goto done;
    }

  zfpm_debug ("Read out a full fpm message");

  /*
   * Just throw it away for now.
   */
  stream_reset (ibuf);

 done:
  zfpm_read_on ();
  return 0;
}
847
848/*
849 * zfpm_writes_pending
850 *
851 * Returns TRUE if we may have something to write to the FPM.
852 */
853static int
854zfpm_writes_pending (void)
855{
856
857 /*
858 * Check if there is any data in the outbound buffer that has not
859 * been written to the socket yet.
860 */
861 if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
862 return 1;
863
864 /*
865 * Check if there are any prefixes on the outbound queue.
866 */
867 if (!TAILQ_EMPTY (&zfpm_g->dest_q))
868 return 1;
869
870 return 0;
871}
872
873/*
874 * zfpm_encode_route
875 *
876 * Encode a message to the FPM with information about the given route.
877 *
878 * Returns the number of bytes written to the buffer. 0 or a negative
879 * value indicates an error.
880 */
881static inline int
882zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
fb0aa886 883 size_t in_buf_len, fpm_msg_type_e *msg_type)
5adc2528 884{
fb0aa886 885 size_t len;
5adc2528 886 int cmd;
fb0aa886 887 len = 0;
5adc2528 888
fb0aa886 889 *msg_type = FPM_MSG_TYPE_NONE;
5adc2528 890
fb0aa886 891 switch (zfpm_g->message_format) {
5adc2528 892
fb0aa886
AS
893 case ZFPM_MSG_FORMAT_PROTOBUF:
894#ifdef HAVE_PROTOBUF
895 len = zfpm_protobuf_encode_route (dest, rib, (uint8_t *) in_buf,
896 in_buf_len);
897 *msg_type = FPM_MSG_TYPE_PROTOBUF;
898#endif
899 break;
5adc2528 900
fb0aa886
AS
901 case ZFPM_MSG_FORMAT_NETLINK:
902#ifdef HAVE_NETLINK
903 *msg_type = FPM_MSG_TYPE_NETLINK;
904 cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;
905 len = zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);
906 assert(fpm_msg_align(len) == len);
907 *msg_type = FPM_MSG_TYPE_NETLINK;
5adc2528 908#endif /* HAVE_NETLINK */
fb0aa886
AS
909 break;
910
911 default:
912 break;
913 }
914
915 return len;
916
5adc2528
AS
917}
918
/*
 * zfpm_route_for_update
 *
 * Returns the rib that is to be sent to the FPM for a given dest:
 * the first route entry flagged as the selected FIB entry, or NULL if
 * there is no usable route for this destination.
 */
struct rib *
zfpm_route_for_update (rib_dest_t *dest)
{
  struct rib *rib;

  RIB_DEST_FOREACH_ROUTE (dest, rib)
    {
      if (!CHECK_FLAG (rib->status, RIB_ENTRY_SELECTED_FIB))
	continue;

      return rib;
    }

  /*
   * We have no route for this destination.
   */
  return NULL;
}
942
/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer. Drains dest_q into obuf one FPM message at a time until the
 * buffer cannot hold another maximum-size message or the queue is
 * empty. Maintains the RIB_DEST_UPDATE_FPM / RIB_DEST_SENT_TO_FPM
 * flags and garbage-collects dests that become unneeded.
 */
static void
zfpm_build_updates (void)
{
  struct stream *s;
  rib_dest_t *dest;
  unsigned char *buf, *data, *buf_end;
  size_t msg_len;
  size_t data_len;
  fpm_msg_hdr_t *hdr;
  struct rib *rib;
  int is_add, write_msg;
  fpm_msg_type_e msg_type;

  s = zfpm_g->obuf;

  /* Caller only invokes us with a fully drained buffer. */
  assert (stream_empty (s));

  do {

    /*
     * Make sure there is enough space to write another message.
     */
    if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
      break;

    buf = STREAM_DATA (s) + stream_get_endp (s);
    buf_end = buf + STREAM_WRITEABLE (s);

    dest = TAILQ_FIRST (&zfpm_g->dest_q);
    if (!dest)
      break;

    /* Queue membership and the UPDATE flag must agree. */
    assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));

    hdr = (fpm_msg_hdr_t *) buf;
    hdr->version = FPM_PROTO_VERSION;

    data = fpm_msg_data (hdr);

    rib = zfpm_route_for_update (dest);
    is_add = rib ? 1 : 0;

    write_msg = 1;

    /*
     * If this is a route deletion, and we have not sent the route to
     * the FPM previously, skip it.
     */
    if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
      {
	write_msg = 0;
	zfpm_g->stats.nop_deletes_skipped++;
      }

    if (write_msg) {
      data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data,
				    &msg_type);

      assert (data_len);
      if (data_len)
	{
	  hdr->msg_type = msg_type;
	  msg_len = fpm_data_len_to_msg_len (data_len);
	  hdr->msg_len = htons (msg_len);
	  stream_forward_endp (s, msg_len);

	  if (is_add)
	    zfpm_g->stats.route_adds++;
	  else
	    zfpm_g->stats.route_dels++;
	}
    }

    /*
     * Remove the dest from the queue, and reset the flag.
     */
    UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
    TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);

    if (is_add)
      {
	SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }
    else
      {
	UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }

    /*
     * Delete the destination if necessary.
     */
    if (rib_gc_dest (dest->rnode))
      zfpm_g->stats.dests_del_after_update++;

  } while (1);

}
1046
/*
 * zfpm_write_cb
 *
 * Write callback on the FPM socket. Refills obuf from the dest queue
 * as it drains and writes it to the socket, bounded by
 * ZFPM_MAX_WRITES_PER_RUN and the thread yield check. Re-arms itself
 * if work remains. Also completes an in-progress async connect when
 * in the CONNECTING state.
 */
static int
zfpm_write_cb (struct thread *thread)
{
  struct stream *s;
  int num_writes;

  zfpm_g->stats.write_cb_calls++;
  assert (zfpm_g->t_write);
  zfpm_g->t_write = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  num_writes = 0;

  do
    {
      int bytes_to_write, bytes_written;

      s = zfpm_g->obuf;

      /*
       * If the stream is empty, try fill it up with data.
       */
      if (stream_empty (s))
	{
	  zfpm_build_updates ();
	}

      bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
      if (!bytes_to_write)
	break;

      bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
      zfpm_g->stats.write_calls++;
      num_writes++;

      if (bytes_written < 0)
	{
	  /* EAGAIN/EINTR-style errors: try again on the next
	     callback; anything else tears down the connection. */
	  if (ERRNO_IO_RETRY (errno))
	    break;

	  zfpm_connection_down ("failed to write to socket");
	  return 0;
	}

      if (bytes_written != bytes_to_write)
	{

	  /*
	   * Partial write.
	   */
	  stream_forward_getp (s, bytes_written);
	  zfpm_g->stats.partial_writes++;
	  break;
	}

      /*
       * We've written out the entire contents of the stream.
       */
      stream_reset (s);

      if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
	{
	  zfpm_g->stats.max_writes_hit++;
	  break;
	}

      if (zfpm_thread_should_yield (thread))
	{
	  zfpm_g->stats.t_write_yields++;
	  break;
	}
    } while (1);

  if (zfpm_writes_pending ())
    zfpm_write_on ();

  return 0;
}
1139
/*
 * zfpm_connect_cb
 *
 * Connect timer callback: attempt a non-blocking TCP connect to the
 * FPM. An immediate success goes straight to ESTABLISHED; EINPROGRESS
 * moves us to CONNECTING with both I/O callbacks armed so
 * zfpm_connect_check() can finish the handshake; any other failure
 * restarts the retry timer.
 */
static int
zfpm_connect_cb (struct thread *t)
{
  int sock, ret;
  struct sockaddr_in serv;

  assert (zfpm_g->t_connect);
  zfpm_g->t_connect = NULL;
  assert (zfpm_g->state == ZFPM_STATE_ACTIVE);

  sock = socket (AF_INET, SOCK_STREAM, 0);
  if (sock < 0)
    {
      zfpm_debug ("Failed to create socket for connect(): %s", strerror(errno));
      zfpm_g->stats.connect_no_sock++;
      return 0;
    }

  set_nonblocking(sock);

  /* Make server socket. */
  memset (&serv, 0, sizeof (serv));
  serv.sin_family = AF_INET;
  serv.sin_port = htons (zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
  serv.sin_len = sizeof (struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
  /* fpm_server is already in network byte order; 0 means loopback. */
  if (!zfpm_g->fpm_server)
    serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
  else
    serv.sin_addr.s_addr = (zfpm_g->fpm_server);

  /*
   * Connect to the FPM.
   */
  zfpm_g->connect_calls++;
  zfpm_g->stats.connect_calls++;
  zfpm_g->last_connect_call_time = zfpm_get_time ();

  ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
  if (ret >= 0)
    {
      zfpm_g->sock = sock;
      zfpm_connection_up ("connect succeeded");
      return 1;
    }

  if (errno == EINPROGRESS)
    {
      zfpm_g->sock = sock;
      zfpm_read_on ();
      zfpm_write_on ();
      zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
      return 0;
    }

  zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
  close (sock);

  /*
   * Restart timer for retrying connection.
   */
  zfpm_start_connect_timer ("connect() failed");
  return 0;
}
1208
/*
 * zfpm_set_state
 *
 * Move state machine into the given state. Asserts that the
 * transition is legal and that the resources each target state
 * requires (socket, armed threads) are in place.
 */
static void
zfpm_set_state (zfpm_state_t state, const char *reason)
{
  zfpm_state_t cur_state = zfpm_g->state;

  if (!reason)
    reason = "Unknown";

  if (state == cur_state)
    return;

  zfpm_debug("beginning state transition %s -> %s. Reason: %s",
	     zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
	     reason);

  switch (state) {

  case ZFPM_STATE_IDLE:
    assert (cur_state == ZFPM_STATE_ESTABLISHED);
    break;

  case ZFPM_STATE_ACTIVE:
    assert (cur_state == ZFPM_STATE_IDLE ||
	    cur_state == ZFPM_STATE_CONNECTING);
    assert (zfpm_g->t_connect);
    break;

  case ZFPM_STATE_CONNECTING:
    assert (zfpm_g->sock);
    assert (cur_state == ZFPM_STATE_ACTIVE);
    assert (zfpm_g->t_read);
    assert (zfpm_g->t_write);
    break;

  case ZFPM_STATE_ESTABLISHED:
    assert (cur_state == ZFPM_STATE_ACTIVE ||
	    cur_state == ZFPM_STATE_CONNECTING);
    assert (zfpm_g->sock);
    assert (zfpm_g->t_read);
    assert (zfpm_g->t_write);
    break;
  }

  zfpm_g->state = state;
}
1259
1260/*
1261 * zfpm_calc_connect_delay
1262 *
1263 * Returns the number of seconds after which we should attempt to
1264 * reconnect to the FPM.
1265 */
1266static long
1267zfpm_calc_connect_delay (void)
1268{
1269 time_t elapsed;
1270
1271 /*
1272 * Return 0 if this is our first attempt to connect.
1273 */
1274 if (zfpm_g->connect_calls == 0)
1275 {
1276 return 0;
1277 }
1278
1279 elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);
1280
1281 if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
1282 return 0;
1283 }
1284
1285 return ZFPM_CONNECT_RETRY_IVL - elapsed;
1286}
1287
/*
 * zfpm_start_connect_timer
 *
 * Arm the connect retry timer (possibly with a zero delay) and move
 * the state machine to ACTIVE. Requires that no connection or timer
 * is currently outstanding.
 */
static void
zfpm_start_connect_timer (const char *reason)
{
  long delay_secs;

  assert (!zfpm_g->t_connect);
  assert (zfpm_g->sock < 0);

  assert(zfpm_g->state == ZFPM_STATE_IDLE ||
	 zfpm_g->state == ZFPM_STATE_ACTIVE ||
	 zfpm_g->state == ZFPM_STATE_CONNECTING);

  delay_secs = zfpm_calc_connect_delay();
  zfpm_debug ("scheduling connect in %ld seconds", delay_secs);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
		   delay_secs);
  zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
}
1310
5697001a 1311#if defined (HAVE_FPM)
5adc2528
AS
1312/*
1313 * zfpm_is_enabled
1314 *
1315 * Returns TRUE if the zebra FPM module has been enabled.
1316 */
1317static inline int
1318zfpm_is_enabled (void)
1319{
1320 return zfpm_g->enabled;
1321}
5697001a 1322#endif
5adc2528
AS
1323
1324/*
1325 * zfpm_conn_is_up
1326 *
1327 * Returns TRUE if the connection to the FPM is up.
1328 */
1329static inline int
1330zfpm_conn_is_up (void)
1331{
1332 if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
1333 return 0;
1334
1335 assert (zfpm_g->sock >= 0);
1336
1337 return 1;
1338}
1339
/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node. Queues the
 * dest (at most once) and makes sure the write callback is armed.
 * 'reason' is used only for debug logging and may be NULL.
 */
void
zfpm_trigger_update (struct route_node *rn, const char *reason)
{
  rib_dest_t *dest;
  char buf[PREFIX_STRLEN];

  /*
   * Ignore if the connection is down. We will update the FPM about
   * all destinations once the connection comes up.
   */
  if (!zfpm_conn_is_up ())
    return;

  dest = rib_dest_from_rnode (rn);

  /*
   * Ignore the trigger if the dest is not in a table that we would
   * send to the FPM.
   */
  if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
    {
      zfpm_g->stats.non_fpm_table_triggers++;
      return;
    }

  /* Already queued -- nothing more to do. */
  if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM)) {
    zfpm_g->stats.redundant_triggers++;
    return;
  }

  if (reason)
    {
      zfpm_debug ("%s triggering update to FPM - Reason: %s",
		  prefix2str (&rn->p, buf, sizeof(buf)), reason);
    }

  SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
  TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
  zfpm_g->stats.updates_triggered++;

  /*
   * Make sure that writes are enabled.
   */
  if (zfpm_g->t_write)
    return;

  zfpm_write_on ();
}
1394
/*
 * zfpm_stats_timer_cb
 *
 * Stats interval timer callback: snapshot the interval's counters for
 * display, fold them into the cumulative totals, reset the working
 * counters, and re-arm the timer.
 */
static int
zfpm_stats_timer_cb (struct thread *t)
{
  assert (zfpm_g->t_stats);
  zfpm_g->t_stats = NULL;

  /*
   * Remember the stats collected in the last interval for display
   * purposes.
   */
  zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);

  /*
   * Add the current set of stats into the cumulative statistics.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
		      &zfpm_g->cumulative_stats);

  /*
   * Start collecting stats afresh over the next interval.
   */
  zfpm_stats_reset (&zfpm_g->stats);

  zfpm_start_stats_timer ();

  return 0;
}
1425
#if defined (HAVE_FPM)
/*
 * zfpm_stop_stats_timer
 *
 * Cancel the periodic stats timer if one is currently scheduled;
 * a no-op otherwise.
 */
static void
zfpm_stop_stats_timer (void)
{
  if (zfpm_g->t_stats)
    {
      zfpm_debug ("Stopping existing stats timer");
      THREAD_TIMER_OFF (zfpm_g->t_stats);
    }
}
#endif
5adc2528
AS
1440
/*
 * zfpm_start_stats_timer
 *
 * Schedule the stats collection timer to fire after
 * ZFPM_STATS_IVL_SECS. Must not be called while a timer is already
 * pending (callers cancel or consume the old one first).
 */
void
zfpm_start_stats_timer (void)
{
  assert (!zfpm_g->t_stats);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
		   ZFPM_STATS_IVL_SECS);
}
1452
/*
 * Helper macro for zfpm_show_stats() below.
 *
 * Prints one table row: the counter's name, its cumulative total, and
 * its value over the last completed interval. NOTE: expansion relies
 * on 'vty' and 'total_stats' being in scope at the point of use.
 */
#define ZFPM_SHOW_STAT(counter) \
 do { \
 vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter, \
 zfpm_g->last_ivl_stats.counter, VTY_NEWLINE); \
 } while (0)
1461
5697001a 1462#if defined (HAVE_FPM)
5adc2528
AS
/*
 * zfpm_show_stats
 *
 * Render the FPM counter table to the given vty: one row per counter
 * (via ZFPM_SHOW_STAT), showing the cumulative total and the value
 * for the last ZFPM_STATS_IVL_SECS interval.
 */
static void
zfpm_show_stats (struct vty *vty)
{
  zfpm_stats_t total_stats;
  time_t elapsed;

  vty_out (vty, "%s%-40s %10s Last %2d secs%s%s", VTY_NEWLINE, "Counter",
	   "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);

  /*
   * Compute the total stats up to this instant.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
		      &total_stats);

  /* The order below is the display order of the rows. */
  ZFPM_SHOW_STAT (connect_calls);
  ZFPM_SHOW_STAT (connect_no_sock);
  ZFPM_SHOW_STAT (read_cb_calls);
  ZFPM_SHOW_STAT (write_cb_calls);
  ZFPM_SHOW_STAT (write_calls);
  ZFPM_SHOW_STAT (partial_writes);
  ZFPM_SHOW_STAT (max_writes_hit);
  ZFPM_SHOW_STAT (t_write_yields);
  ZFPM_SHOW_STAT (nop_deletes_skipped);
  ZFPM_SHOW_STAT (route_adds);
  ZFPM_SHOW_STAT (route_dels);
  ZFPM_SHOW_STAT (updates_triggered);
  ZFPM_SHOW_STAT (non_fpm_table_triggers);
  ZFPM_SHOW_STAT (redundant_triggers);
  ZFPM_SHOW_STAT (dests_del_after_update);
  ZFPM_SHOW_STAT (t_conn_down_starts);
  ZFPM_SHOW_STAT (t_conn_down_dests_processed);
  ZFPM_SHOW_STAT (t_conn_down_yields);
  ZFPM_SHOW_STAT (t_conn_down_finishes);
  ZFPM_SHOW_STAT (t_conn_up_starts);
  ZFPM_SHOW_STAT (t_conn_up_dests_processed);
  ZFPM_SHOW_STAT (t_conn_up_yields);
  ZFPM_SHOW_STAT (t_conn_up_aborts);
  ZFPM_SHOW_STAT (t_conn_up_finishes);

  /* Zero means the stats have never been cleared. */
  if (!zfpm_g->last_stats_clear_time)
    return;

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);

  vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
	   (unsigned long) elapsed, VTY_NEWLINE);
}
1514
/*
 * zfpm_clear_stats
 *
 * Reset all FPM counters (interval, last-interval snapshot and
 * cumulative), restart the stats interval, and record the time of the
 * clear for "show ... stats" output.
 */
static void
zfpm_clear_stats (struct vty *vty)
{
  if (!zfpm_is_enabled ())
    {
      vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
      return;
    }

  zfpm_stats_reset (&zfpm_g->stats);
  zfpm_stats_reset (&zfpm_g->last_ivl_stats);
  zfpm_stats_reset (&zfpm_g->cumulative_stats);

  /* Stop before start: zfpm_start_stats_timer asserts no timer is
     pending, so the current interval timer must be cancelled first. */
  zfpm_stop_stats_timer ();
  zfpm_start_stats_timer ();

  zfpm_g->last_stats_clear_time = zfpm_get_time();

  vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
}
1538
/*
 * show_zebra_fpm_stats
 *
 * "show zebra fpm stats" CLI handler; delegates to zfpm_show_stats().
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       "Zebra information\n"
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_show_stats (vty);
  return CMD_SUCCESS;
}
1553
/*
 * clear_zebra_fpm_stats
 *
 * "clear zebra fpm stats" CLI handler; delegates to zfpm_clear_stats().
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       "Zebra information\n"
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_clear_stats (vty);
  return CMD_SUCCESS;
}
1568
711ff0ba
USK
1569/*
1570 * update fpm connection information
1571 */
e52702f2
QY
1572DEFUN ( fpm_remote_ip,
1573 fpm_remote_ip_cmd,
1574 "fpm connection ip A.B.C.D port (1-65535)",
711ff0ba
USK
1575 "fpm connection remote ip and port\n"
1576 "Remote fpm server ip A.B.C.D\n"
1577 "Enter ip ")
1578{
1579
1580 in_addr_t fpm_server;
1581 uint32_t port_no;
1582
e52702f2 1583 fpm_server = inet_addr (argv[3]->arg);
711ff0ba
USK
1584 if (fpm_server == INADDR_NONE)
1585 return CMD_ERR_INCOMPLETE;
1586
e52702f2 1587 port_no = atoi (argv[5]->arg);
711ff0ba
USK
1588 if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
1589 return CMD_ERR_INCOMPLETE;
1590
1591 zfpm_g->fpm_server = fpm_server;
1592 zfpm_g->fpm_port = port_no;
1593
1594
1595 return CMD_SUCCESS;
1596}
1597
e52702f2
QY
1598DEFUN ( no_fpm_remote_ip,
1599 no_fpm_remote_ip_cmd,
1600 "no fpm connection ip A.B.C.D port (1-65535)",
711ff0ba
USK
1601 "fpm connection remote ip and port\n"
1602 "Connection\n"
1603 "Remote fpm server ip A.B.C.D\n"
1604 "Enter ip ")
1605{
e52702f2
QY
1606 if (zfpm_g->fpm_server != inet_addr (argv[4]->arg) ||
1607 zfpm_g->fpm_port != atoi (argv[6]->arg))
711ff0ba
USK
1608 return CMD_ERR_NO_MATCH;
1609
1610 zfpm_g->fpm_server = FPM_DEFAULT_IP;
1611 zfpm_g->fpm_port = FPM_DEFAULT_PORT;
1612
1613 return CMD_SUCCESS;
1614}
5697001a 1615#endif
711ff0ba 1616
fb0aa886
AS
1617/*
1618 * zfpm_init_message_format
1619 */
1620static inline void
1621zfpm_init_message_format (const char *format)
1622{
1623 int have_netlink, have_protobuf;
1624
fb0aa886
AS
1625#ifdef HAVE_NETLINK
1626 have_netlink = 1;
4b2792b5
DS
1627#else
1628 have_netlink = 0;
fb0aa886
AS
1629#endif
1630
1631#ifdef HAVE_PROTOBUF
1632 have_protobuf = 1;
4b2792b5
DS
1633#else
1634 have_protobuf = 0;
fb0aa886
AS
1635#endif
1636
1637 zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;
1638
1639 if (!format)
1640 {
1641 if (have_netlink)
1642 {
1643 zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1644 }
1645 else if (have_protobuf)
1646 {
1647 zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1648 }
1649 return;
1650 }
1651
1652 if (!strcmp ("netlink", format))
1653 {
1654 if (!have_netlink)
1655 {
1656 zlog_err ("FPM netlink message format is not available");
1657 return;
1658 }
1659 zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1660 return;
1661 }
1662
1663 if (!strcmp ("protobuf", format))
1664 {
1665 if (!have_protobuf)
1666 {
1667 zlog_err ("FPM protobuf message format is not available");
1668 return;
1669 }
1670 zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1671 return;
1672 }
1673
1674 zlog_warn ("Unknown fpm format '%s'", format);
1675}
1676
711ff0ba
USK
1677/**
1678 * fpm_remote_srv_write
1679 *
1680 * Module to write remote fpm connection
1681 *
1682 * Returns ZERO on success.
1683 */
1684
1685int fpm_remote_srv_write (struct vty *vty )
1686{
1687 struct in_addr in;
1688
1689 in.s_addr = zfpm_g->fpm_server;
1690
1691 if (zfpm_g->fpm_server != FPM_DEFAULT_IP ||
1692 zfpm_g->fpm_port != FPM_DEFAULT_PORT)
1693 vty_out (vty,"fpm connection ip %s port %d%s", inet_ntoa (in),zfpm_g->fpm_port,VTY_NEWLINE);
1694
1695 return 0;
1696}
1697
1698
5adc2528
AS
/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] master thread master used to schedule FPM timers/events.
 * @param[in] enable TRUE if the zebra FPM module should be enabled
 * @param[in] port port at which FPM is running.
 * @param[in] format to use to talk to the FPM. Can be 'netlink' or 'protobuf'.
 *
 * Returns TRUE on success.
 */
int
zfpm_init (struct thread_master *master, int enable, uint16_t port,
	   const char *format)
{
  /* Guard so repeated calls are harmless no-ops. */
  static int initialized = 0;

  if (initialized) {
    return 1;
  }

  initialized = 1;

  memset (zfpm_g, 0, sizeof (*zfpm_g));
  zfpm_g->master = master;
  TAILQ_INIT(&zfpm_g->dest_q);
  zfpm_g->sock = -1;
  zfpm_g->state = ZFPM_STATE_IDLE;

  zfpm_stats_init (&zfpm_g->stats);
  zfpm_stats_init (&zfpm_g->last_ivl_stats);
  zfpm_stats_init (&zfpm_g->cumulative_stats);

#if defined (HAVE_FPM)
  /* CLI commands are registered regardless of 'enable' so that a
     disabled module can still report its state. */
  install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
  install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
  install_element (CONFIG_NODE, &fpm_remote_ip_cmd);
  install_element (CONFIG_NODE, &no_fpm_remote_ip_cmd);
#endif

  zfpm_init_message_format(format);

  /*
   * Disable FPM interface if no suitable format is available.
   */
  if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
    enable = 0;

  zfpm_g->enabled = enable;

  if (!enable) {
    return 1;
  }

  /* fpm_server may already have been set via the CLI; only apply the
     default when it is unset (zero). */
  if (!zfpm_g->fpm_server)
    zfpm_g->fpm_server = FPM_DEFAULT_IP;

  if (!port)
    port = FPM_DEFAULT_PORT;

  zfpm_g->fpm_port = port;

  zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
  zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);

  zfpm_start_stats_timer ();
  zfpm_start_connect_timer ("initialized");

  return 1;
}