/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Zebra; see the file COPYING. If not, write to the Free
 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 */

#include <zebra.h>

#include "log.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"

#include "zebra/rib.h"
#include "zebra/zserv.h"
#include "zebra/zebra_ns.h"
#include "zebra/zebra_vrf.h"

#include "fpm/fpm.h"
#include "zebra_fpm.h"
#include "zebra_fpm_private.h"

/*
 * Interval at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL   5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_
{
  rib_tables_iter_t tables_iter;
  route_table_iter_t iter;
} zfpm_rnodes_iter_t;

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
  unsigned long connect_calls;
  unsigned long connect_no_sock;

  unsigned long read_cb_calls;

  unsigned long write_cb_calls;
  unsigned long write_calls;
  unsigned long partial_writes;
  unsigned long max_writes_hit;
  unsigned long t_write_yields;

  unsigned long nop_deletes_skipped;
  unsigned long route_adds;
  unsigned long route_dels;

  unsigned long updates_triggered;
  unsigned long redundant_triggers;
  unsigned long non_fpm_table_triggers;

  unsigned long dests_del_after_update;

  unsigned long t_conn_down_starts;
  unsigned long t_conn_down_dests_processed;
  unsigned long t_conn_down_yields;
  unsigned long t_conn_down_finishes;

  unsigned long t_conn_up_starts;
  unsigned long t_conn_up_dests_processed;
  unsigned long t_conn_up_yields;
  unsigned long t_conn_up_aborts;
  unsigned long t_conn_up_finishes;

} zfpm_stats_t;

/*
 * States for the FPM state machine.
 */
typedef enum {

  /*
   * In this state we are not yet ready to connect to the FPM. This
   * can happen when this module is disabled, or if we're cleaning up
   * after a connection has gone down.
   */
  ZFPM_STATE_IDLE,

  /*
   * Ready to talk to the FPM and periodically trying to connect to
   * it.
   */
  ZFPM_STATE_ACTIVE,

  /*
   * In the middle of bringing up a TCP connection. Specifically,
   * waiting for a connect() call to complete asynchronously.
   */
  ZFPM_STATE_CONNECTING,

  /*
   * TCP connection to the FPM is up.
   */
  ZFPM_STATE_ESTABLISHED

} zfpm_state_t;
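
/*
 * Summary of the valid state transitions, as enforced by the
 * assertions in zfpm_set_state() below (illustrative summary only):
 *
 *   IDLE        -> ACTIVE       (connect retry timer started)
 *   ACTIVE      -> CONNECTING   (non-blocking connect() in progress)
 *   ACTIVE      -> ESTABLISHED  (connect() completed immediately)
 *   CONNECTING  -> ESTABLISHED  (async connect() completed)
 *   CONNECTING  -> ACTIVE       (async connect() failed, retrying)
 *   ESTABLISHED -> IDLE         (connection went down)
 */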

/*
 * Globals.
 */
typedef struct zfpm_glob_t_
{

  /*
   * True if the FPM module has been enabled.
   */
  int enabled;

  struct thread_master *master;

  zfpm_state_t state;

  in_addr_t fpm_server;
  /*
   * Port on which the FPM is running.
   */
  int fpm_port;

  /*
   * List of rib_dest_t structures to be processed.
   */
  TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;

  /*
   * Stream socket to the FPM.
   */
  int sock;

  /*
   * Buffers for messages to/from the FPM.
   */
  struct stream *obuf;
  struct stream *ibuf;

  /*
   * Threads for I/O.
   */
  struct thread *t_connect;
  struct thread *t_write;
  struct thread *t_read;

  /*
   * Thread to clean up after the TCP connection to the FPM goes down
   * and the state that belongs to it.
   */
  struct thread *t_conn_down;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_down_state;

  /*
   * Thread to take actions once the TCP conn to the FPM comes up, and
   * the state that belongs to it.
   */
  struct thread *t_conn_up;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_up_state;

  unsigned long connect_calls;
  time_t last_connect_call_time;

  /*
   * Stats from the start of the current statistics interval up to
   * now. These are the counters we typically update in the code.
   */
  zfpm_stats_t stats;

  /*
   * Statistics that were gathered in the last collection interval.
   */
  zfpm_stats_t last_ivl_stats;

  /*
   * Cumulative stats from the last clear to the start of the current
   * statistics interval.
   */
  zfpm_stats_t cumulative_stats;

  /*
   * Stats interval timer.
   */
  struct thread *t_stats;

  /*
   * If non-zero, the last time when statistics were cleared.
   */
  time_t last_stats_clear_time;

} zfpm_glob_t;

static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_read_cb (struct thread *thread);
static int zfpm_write_cb (struct thread *thread);

static void zfpm_set_state (zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer (const char *reason);
static void zfpm_start_stats_timer (void);

/*
 * zfpm_thread_should_yield
 */
static inline int
zfpm_thread_should_yield (struct thread *t)
{
  return thread_should_yield (t);
}

/*
 * zfpm_state_to_str
 */
static const char *
zfpm_state_to_str (zfpm_state_t state)
{
  switch (state)
    {

    case ZFPM_STATE_IDLE:
      return "idle";

    case ZFPM_STATE_ACTIVE:
      return "active";

    case ZFPM_STATE_CONNECTING:
      return "connecting";

    case ZFPM_STATE_ESTABLISHED:
      return "established";

    default:
      return "unknown";
    }
}

/*
 * zfpm_get_time
 */
static time_t
zfpm_get_time (void)
{
  struct timeval tv;

  if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
    zlog_warn ("FPM: quagga_gettime failed!!");

  return tv.tv_sec;
}
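
/*
 * Since zfpm_get_time() reads the monotonic clock, the timestamps it
 * returns count seconds from an arbitrary reference point rather than
 * wall-clock time; they are only meaningful for computing intervals,
 * as zfpm_get_elapsed_time() below does.
 */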

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t
zfpm_get_elapsed_time (time_t reference)
{
  time_t now;

  now = zfpm_get_time ();

  if (now < reference)
    {
      assert (0);
      return 0;
    }

  return now - reference;
}

/*
 * zfpm_is_table_for_fpm
 *
 * Returns TRUE if the given table is to be communicated to the FPM.
 */
static inline int
zfpm_is_table_for_fpm (struct route_table *table)
{
  rib_table_info_t *info;

  info = rib_table_info (table);

  /*
   * We only send the unicast tables in the main instance to the FPM
   * at this point.
   */
  if (info->zvrf->vrf_id != 0)
    return 0;

  if (info->safi != SAFI_UNICAST)
    return 0;

  return 1;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void
zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
{
  memset (iter, 0, sizeof (*iter));
  rib_tables_iter_init (&iter->tables_iter);

  /*
   * This is a hack, but it makes implementing 'next' easier by
   * ensuring that route_table_iter_next() will return NULL the first
   * time we call it.
   */
  route_table_iter_init (&iter->iter, NULL);
  route_table_iter_cleanup (&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *
zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
{
  struct route_node *rn;
  struct route_table *table;

  while (1)
    {
      rn = route_table_iter_next (&iter->iter);
      if (rn)
        return rn;

      /*
       * We've made our way through this table, go to the next one.
       */
      route_table_iter_cleanup (&iter->iter);

      while ((table = rib_tables_iter_next (&iter->tables_iter)))
        {
          if (zfpm_is_table_for_fpm (table))
            break;
        }

      if (!table)
        return NULL;

      route_table_iter_init (&iter->iter, table);
    }

  return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void
zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
{
  route_table_iter_pause (&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void
zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
{
  route_table_iter_cleanup (&iter->iter);
  rib_tables_iter_cleanup (&iter->tables_iter);
}
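
/*
 * Typical lifecycle of the iterator above, as used by the conn_up and
 * conn_down callbacks further down (illustrative sketch only):
 *
 *   zfpm_rnodes_iter_init (iter);
 *   while ((rn = zfpm_rnodes_iter_next (iter)))
 *     {
 *       ... process rn ...
 *       if (should_yield)
 *         {
 *           zfpm_rnodes_iter_pause (iter);  // safe to resume later
 *           return;                         // reschedule the thread
 *         }
 *     }
 *   zfpm_rnodes_iter_cleanup (iter);
 */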

/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void
zfpm_stats_init (zfpm_stats_t *stats)
{
  memset (stats, 0, sizeof (*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void
zfpm_stats_reset (zfpm_stats_t *stats)
{
  zfpm_stats_init (stats);
}

/*
 * zfpm_stats_copy
 */
static inline void
zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
{
  memcpy (dest, src, sizeof (*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void
zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
                    zfpm_stats_t *result)
{
  const unsigned long *p1, *p2;
  unsigned long *result_p;
  int i, num_counters;

  p1 = (const unsigned long *) s1;
  p2 = (const unsigned long *) s2;
  result_p = (unsigned long *) result;

  num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));

  for (i = 0; i < num_counters; i++)
    {
      result_p[i] = p1[i] + p2[i];
    }
}
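
/*
 * Hypothetical guard (not part of the original code): since
 * zfpm_stats_compose() walks the structure as a flat array of
 * unsigned longs, a build-time check like the one below would catch
 * the addition of a field that breaks that assumption.
 *
 *   typedef char zfpm_stats_all_counters_check
 *     [(sizeof (zfpm_stats_t) % sizeof (unsigned long)) == 0 ? 1 : -1];
 */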

/*
 * zfpm_read_on
 */
static inline void
zfpm_read_on (void)
{
  assert (!zfpm_g->t_read);
  assert (zfpm_g->sock >= 0);

  THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
                  zfpm_g->sock);
}

/*
 * zfpm_write_on
 */
static inline void
zfpm_write_on (void)
{
  assert (!zfpm_g->t_write);
  assert (zfpm_g->sock >= 0);

  THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
                   zfpm_g->sock);
}

/*
 * zfpm_read_off
 */
static inline void
zfpm_read_off (void)
{
  THREAD_READ_OFF (zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void
zfpm_write_off (void)
{
  THREAD_WRITE_OFF (zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int
zfpm_conn_up_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->t_conn_up);
  zfpm_g->t_conn_up = NULL;

  iter = &zfpm_g->t_conn_up_state.iter;

  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    {
      zfpm_debug ("Connection not up anymore, conn_up thread aborting");
      zfpm_g->stats.t_conn_up_aborts++;
      goto done;
    }

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
        {
          zfpm_g->stats.t_conn_up_dests_processed++;
          zfpm_trigger_update (rnode, NULL);
        }

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
        continue;

      zfpm_g->stats.t_conn_up_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
                                                 zfpm_conn_up_thread_cb,
                                                 0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_up_finishes++;

 done:
  zfpm_rnodes_iter_cleanup (iter);
  return 0;
}

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void
zfpm_connection_up (const char *detail)
{
  assert (zfpm_g->sock >= 0);
  zfpm_read_on ();
  zfpm_write_on ();
  zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);

  /*
   * Start thread to push existing routes to the FPM.
   */
  assert (!zfpm_g->t_conn_up);

  zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);

  zfpm_debug ("Starting conn_up thread");
  zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
                                             zfpm_conn_up_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void
zfpm_connect_check (void)
{
  int status;
  socklen_t slen;
  int ret;

  zfpm_read_off ();
  zfpm_write_off ();

  slen = sizeof (status);
  ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
                    &slen);

  if (ret >= 0 && status == 0)
    {
      zfpm_connection_up ("async connect complete");
      return;
    }

  /*
   * getsockopt() failed or indicated an error on the socket.
   */
  close (zfpm_g->sock);
  zfpm_g->sock = -1;

  zfpm_start_connect_timer ("getsockopt() after async connect failed");
  return;
}
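
/*
 * Background on the check above: for a non-blocking connect(), the
 * socket becomes readable/writable once the handshake finishes, and
 * getsockopt(SOL_SOCKET, SO_ERROR) then reports 0 on success or the
 * pending errno (e.g. ECONNREFUSED) on failure. This is the standard
 * way to learn the outcome of an asynchronous connect.
 */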

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int
zfpm_conn_down_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->state == ZFPM_STATE_IDLE);

  assert (zfpm_g->t_conn_down);
  zfpm_g->t_conn_down = NULL;

  iter = &zfpm_g->t_conn_down_state.iter;

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
        {
          if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
            {
              TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
            }

          UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
          UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);

          zfpm_g->stats.t_conn_down_dests_processed++;

          /*
           * Check if the dest should be deleted.
           */
          rib_gc_dest (rnode);
        }

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
        continue;

      zfpm_g->stats.t_conn_down_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
                                                   zfpm_conn_down_thread_cb,
                                                   0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_down_finishes++;
  zfpm_rnodes_iter_cleanup (iter);

  /*
   * Start the process of connecting to the FPM again.
   */
  zfpm_start_connect_timer ("cleanup complete");
  return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void
zfpm_connection_down (const char *detail)
{
  if (!detail)
    detail = "unknown";

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);

  zlog_info ("connection to the FPM has gone down: %s", detail);

  zfpm_read_off ();
  zfpm_write_off ();

  stream_reset (zfpm_g->ibuf);
  stream_reset (zfpm_g->obuf);

  if (zfpm_g->sock >= 0) {
    close (zfpm_g->sock);
    zfpm_g->sock = -1;
  }

  /*
   * Start thread to clean up state after the connection goes down.
   */
  assert (!zfpm_g->t_conn_down);
  zfpm_debug ("Starting conn_down thread");
  zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
  zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
                                               zfpm_conn_down_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_down_starts++;

  zfpm_set_state (ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int
zfpm_read_cb (struct thread *thread)
{
  size_t already;
  struct stream *ibuf;
  uint16_t msg_len;
  fpm_msg_hdr_t *hdr;

  zfpm_g->stats.read_cb_calls++;
  assert (zfpm_g->t_read);
  zfpm_g->t_read = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  ibuf = zfpm_g->ibuf;

  already = stream_get_endp (ibuf);
  if (already < FPM_MSG_HDR_LEN)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
      if (nbyte == 0 || nbyte == -1)
        {
          zfpm_connection_down ("closed socket in read");
          return 0;
        }

      if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
        goto done;

      already = FPM_MSG_HDR_LEN;
    }

  stream_set_getp (ibuf, 0);

  hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);

  if (!fpm_msg_hdr_ok (hdr))
    {
      zfpm_connection_down ("invalid message header");
      return 0;
    }

  msg_len = fpm_msg_len (hdr);

  /*
   * Read out the rest of the packet.
   */
  if (already < msg_len)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);

      if (nbyte == 0 || nbyte == -1)
        {
          zfpm_connection_down ("failed to read message");
          return 0;
        }

      if (nbyte != (ssize_t) (msg_len - already))
        goto done;
    }

  zfpm_debug ("Read out a full fpm message");

  /*
   * Just throw it away for now.
   */
  stream_reset (ibuf);

 done:
  zfpm_read_on ();
  return 0;
}
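
/*
 * A note on the framing handled above: every FPM message starts with
 * a fpm_msg_hdr_t (version, message type, and total message length in
 * network byte order), and fpm_msg_len() returns that total length
 * including the header itself. zfpm_read_cb() therefore reads in two
 * phases -- first the fixed-size header, then the remainder of the
 * message -- resuming across partial reads by keeping the bytes
 * accumulated so far in ibuf.
 */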

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int
zfpm_writes_pending (void)
{

  /*
   * Check if there is any data in the outbound buffer that has not
   * been written to the socket yet.
   */
  if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
    return 1;

  /*
   * Check if there are any prefixes on the outbound queue.
   */
  if (!TAILQ_EMPTY (&zfpm_g->dest_q))
    return 1;

  return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int
zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
                   size_t in_buf_len)
{
#ifndef HAVE_NETLINK
  return 0;
#else

  int cmd;

  cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;

  return zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);

#endif /* HAVE_NETLINK */
}

/*
 * zfpm_route_for_update
 *
 * Returns the rib that is to be sent to the FPM for a given dest.
 */
static struct rib *
zfpm_route_for_update (rib_dest_t *dest)
{
  struct rib *rib;

  RIB_DEST_FOREACH_ROUTE (dest, rib)
    {
      if (!CHECK_FLAG (rib->flags, ZEBRA_FLAG_SELECTED))
        continue;

      return rib;
    }

  /*
   * We have no route for this destination.
   */
  return NULL;
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
static void
zfpm_build_updates (void)
{
  struct stream *s;
  rib_dest_t *dest;
  unsigned char *buf, *data, *buf_end;
  size_t msg_len;
  size_t data_len;
  fpm_msg_hdr_t *hdr;
  struct rib *rib;
  int is_add, write_msg;

  s = zfpm_g->obuf;

  assert (stream_empty (s));

  do {

    /*
     * Make sure there is enough space to write another message.
     */
    if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
      break;

    buf = STREAM_DATA (s) + stream_get_endp (s);
    buf_end = buf + STREAM_WRITEABLE (s);

    dest = TAILQ_FIRST (&zfpm_g->dest_q);
    if (!dest)
      break;

    assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));

    hdr = (fpm_msg_hdr_t *) buf;
    hdr->version = FPM_PROTO_VERSION;
    hdr->msg_type = FPM_MSG_TYPE_NETLINK;

    data = fpm_msg_data (hdr);

    rib = zfpm_route_for_update (dest);
    is_add = rib ? 1 : 0;

    write_msg = 1;

    /*
     * If this is a route deletion, and we have not sent the route to
     * the FPM previously, skip it.
     */
    if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
      {
        write_msg = 0;
        zfpm_g->stats.nop_deletes_skipped++;
      }

    if (write_msg) {
      data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data);

      assert (data_len);
      if (data_len)
        {
          msg_len = fpm_data_len_to_msg_len (data_len);
          hdr->msg_len = htons (msg_len);
          stream_forward_endp (s, msg_len);

          if (is_add)
            zfpm_g->stats.route_adds++;
          else
            zfpm_g->stats.route_dels++;
        }
    }

    /*
     * Remove the dest from the queue, and reset the flag.
     */
    UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
    TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);

    if (is_add)
      {
        SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }
    else
      {
        UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }

    /*
     * Delete the destination if necessary.
     */
    if (rib_gc_dest (dest->rnode))
      zfpm_g->stats.dests_del_after_update++;

  } while (1);

}

/*
 * zfpm_write_cb
 */
static int
zfpm_write_cb (struct thread *thread)
{
  struct stream *s;
  int num_writes;

  zfpm_g->stats.write_cb_calls++;
  assert (zfpm_g->t_write);
  zfpm_g->t_write = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  num_writes = 0;

  do
    {
      int bytes_to_write, bytes_written;

      s = zfpm_g->obuf;

      /*
       * If the stream is empty, try to fill it up with data.
       */
      if (stream_empty (s))
        {
          zfpm_build_updates ();
        }

      bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
      if (!bytes_to_write)
        break;

      bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
      zfpm_g->stats.write_calls++;
      num_writes++;

      if (bytes_written < 0)
        {
          if (ERRNO_IO_RETRY (errno))
            break;

          zfpm_connection_down ("failed to write to socket");
          return 0;
        }

      if (bytes_written != bytes_to_write)
        {

          /*
           * Partial write.
           */
          stream_forward_getp (s, bytes_written);
          zfpm_g->stats.partial_writes++;
          break;
        }

      /*
       * We've written out the entire contents of the stream.
       */
      stream_reset (s);

      if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
        {
          zfpm_g->stats.max_writes_hit++;
          break;
        }

      if (zfpm_thread_should_yield (thread))
        {
          zfpm_g->stats.t_write_yields++;
          break;
        }
    } while (1);

  if (zfpm_writes_pending ())
    zfpm_write_on ();

  return 0;
}
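
/*
 * On the error handling above: ERRNO_IO_RETRY() is true for the
 * transient errno values (EAGAIN/EWOULDBLOCK/EINTR), in which case we
 * simply stop and wait for the socket to become writable again; any
 * other write error is treated as fatal and tears the connection
 * down. A partial write leaves the unwritten bytes in obuf, which is
 * why the loop advances getp rather than resetting the stream.
 */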

/*
 * zfpm_connect_cb
 */
static int
zfpm_connect_cb (struct thread *t)
{
  int sock, ret;
  struct sockaddr_in serv;

  assert (zfpm_g->t_connect);
  zfpm_g->t_connect = NULL;
  assert (zfpm_g->state == ZFPM_STATE_ACTIVE);

  sock = socket (AF_INET, SOCK_STREAM, 0);
  if (sock < 0)
    {
      zfpm_debug ("Failed to create socket for connect(): %s", strerror (errno));
      zfpm_g->stats.connect_no_sock++;
      return 0;
    }

  set_nonblocking (sock);

  /* Make server socket. */
  memset (&serv, 0, sizeof (serv));
  serv.sin_family = AF_INET;
  serv.sin_port = htons (zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
  serv.sin_len = sizeof (struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
  if (!zfpm_g->fpm_server)
    serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
  else
    serv.sin_addr.s_addr = (zfpm_g->fpm_server);

  /*
   * Connect to the FPM.
   */
  zfpm_g->connect_calls++;
  zfpm_g->stats.connect_calls++;
  zfpm_g->last_connect_call_time = zfpm_get_time ();

  ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
  if (ret >= 0)
    {
      zfpm_g->sock = sock;
      zfpm_connection_up ("connect succeeded");
      return 1;
    }

  if (errno == EINPROGRESS)
    {
      zfpm_g->sock = sock;
      zfpm_read_on ();
      zfpm_write_on ();
      zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
      return 0;
    }

  zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
  close (sock);

  /*
   * Restart timer for retrying connection.
   */
  zfpm_start_connect_timer ("connect() failed");
  return 0;
}

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void
zfpm_set_state (zfpm_state_t state, const char *reason)
{
  zfpm_state_t cur_state = zfpm_g->state;

  if (!reason)
    reason = "Unknown";

  if (state == cur_state)
    return;

  zfpm_debug ("beginning state transition %s -> %s. Reason: %s",
              zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
              reason);

  switch (state) {

  case ZFPM_STATE_IDLE:
    assert (cur_state == ZFPM_STATE_ESTABLISHED);
    break;

  case ZFPM_STATE_ACTIVE:
    assert (cur_state == ZFPM_STATE_IDLE ||
            cur_state == ZFPM_STATE_CONNECTING);
    assert (zfpm_g->t_connect);
    break;

  case ZFPM_STATE_CONNECTING:
    assert (zfpm_g->sock);
    assert (cur_state == ZFPM_STATE_ACTIVE);
    assert (zfpm_g->t_read);
    assert (zfpm_g->t_write);
    break;

  case ZFPM_STATE_ESTABLISHED:
    assert (cur_state == ZFPM_STATE_ACTIVE ||
            cur_state == ZFPM_STATE_CONNECTING);
    assert (zfpm_g->sock);
    assert (zfpm_g->t_read);
    assert (zfpm_g->t_write);
    break;
  }

  zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long
zfpm_calc_connect_delay (void)
{
  time_t elapsed;

  /*
   * Return 0 if this is our first attempt to connect.
   */
  if (zfpm_g->connect_calls == 0)
    {
      return 0;
    }

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);

  if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
    return 0;
  }

  return ZFPM_CONNECT_RETRY_IVL - elapsed;
}
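
/*
 * Worked example (illustrative): with ZFPM_CONNECT_RETRY_IVL of 5
 * seconds, if the last connect() attempt was made 2 seconds ago the
 * function above returns 3, so attempts end up spaced roughly 5
 * seconds apart; once 5 or more seconds have passed it returns 0 and
 * the reconnect is scheduled immediately.
 */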

/*
 * zfpm_start_connect_timer
 */
static void
zfpm_start_connect_timer (const char *reason)
{
  long delay_secs;

  assert (!zfpm_g->t_connect);
  assert (zfpm_g->sock < 0);

  assert (zfpm_g->state == ZFPM_STATE_IDLE ||
          zfpm_g->state == ZFPM_STATE_ACTIVE ||
          zfpm_g->state == ZFPM_STATE_CONNECTING);

  delay_secs = zfpm_calc_connect_delay ();
  zfpm_debug ("scheduling connect in %ld seconds", delay_secs);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
                   delay_secs);
  zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int
zfpm_is_enabled (void)
{
  return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int
zfpm_conn_is_up (void)
{
  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    return 0;

  assert (zfpm_g->sock >= 0);

  return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
void
zfpm_trigger_update (struct route_node *rn, const char *reason)
{
  rib_dest_t *dest;
  char buf[PREFIX_STRLEN];

  /*
   * Ignore if the connection is down. We will update the FPM about
   * all destinations once the connection comes up.
   */
  if (!zfpm_conn_is_up ())
    return;

  dest = rib_dest_from_rnode (rn);

  /*
   * Ignore the trigger if the dest is not in a table that we would
   * send to the FPM.
   */
  if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
    {
      zfpm_g->stats.non_fpm_table_triggers++;
      return;
    }

  if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM)) {
    zfpm_g->stats.redundant_triggers++;
    return;
  }

  if (reason)
    {
      zfpm_debug ("%s triggering update to FPM - Reason: %s",
                  prefix2str (&rn->p, buf, sizeof (buf)), reason);
    }

  SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
  TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
  zfpm_g->stats.updates_triggered++;

  /*
   * Make sure that writes are enabled.
   */
  if (zfpm_g->t_write)
    return;

  zfpm_write_on ();
}

/*
 * zfpm_stats_timer_cb
 */
static int
zfpm_stats_timer_cb (struct thread *t)
{
  assert (zfpm_g->t_stats);
  zfpm_g->t_stats = NULL;

  /*
   * Remember the stats collected in the last interval for display
   * purposes.
   */
  zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);

  /*
   * Add the current set of stats into the cumulative statistics.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
                      &zfpm_g->cumulative_stats);

  /*
   * Start collecting stats afresh over the next interval.
   */
  zfpm_stats_reset (&zfpm_g->stats);

  zfpm_start_stats_timer ();

  return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void
zfpm_stop_stats_timer (void)
{
  if (!zfpm_g->t_stats)
    return;

  zfpm_debug ("Stopping existing stats timer");
  THREAD_TIMER_OFF (zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
void
zfpm_start_stats_timer (void)
{
  assert (!zfpm_g->t_stats);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
                   ZFPM_STATS_IVL_SECS);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter) \
  do { \
    vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter, \
             zfpm_g->last_ivl_stats.counter, VTY_NEWLINE); \
  } while (0)

/*
 * zfpm_show_stats
 */
static void
zfpm_show_stats (struct vty *vty)
{
  zfpm_stats_t total_stats;
  time_t elapsed;

  vty_out (vty, "%s%-40s %10s Last %2d secs%s%s", VTY_NEWLINE, "Counter",
           "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);

  /*
   * Compute the total stats up to this instant.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
                      &total_stats);

  ZFPM_SHOW_STAT (connect_calls);
  ZFPM_SHOW_STAT (connect_no_sock);
  ZFPM_SHOW_STAT (read_cb_calls);
  ZFPM_SHOW_STAT (write_cb_calls);
  ZFPM_SHOW_STAT (write_calls);
  ZFPM_SHOW_STAT (partial_writes);
  ZFPM_SHOW_STAT (max_writes_hit);
  ZFPM_SHOW_STAT (t_write_yields);
  ZFPM_SHOW_STAT (nop_deletes_skipped);
  ZFPM_SHOW_STAT (route_adds);
  ZFPM_SHOW_STAT (route_dels);
  ZFPM_SHOW_STAT (updates_triggered);
  ZFPM_SHOW_STAT (non_fpm_table_triggers);
  ZFPM_SHOW_STAT (redundant_triggers);
  ZFPM_SHOW_STAT (dests_del_after_update);
  ZFPM_SHOW_STAT (t_conn_down_starts);
  ZFPM_SHOW_STAT (t_conn_down_dests_processed);
  ZFPM_SHOW_STAT (t_conn_down_yields);
  ZFPM_SHOW_STAT (t_conn_down_finishes);
  ZFPM_SHOW_STAT (t_conn_up_starts);
  ZFPM_SHOW_STAT (t_conn_up_dests_processed);
  ZFPM_SHOW_STAT (t_conn_up_yields);
  ZFPM_SHOW_STAT (t_conn_up_aborts);
  ZFPM_SHOW_STAT (t_conn_up_finishes);

  if (!zfpm_g->last_stats_clear_time)
    return;

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);

  vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
           (unsigned long) elapsed, VTY_NEWLINE);
}

/*
 * zfpm_clear_stats
 */
static void
zfpm_clear_stats (struct vty *vty)
{
  if (!zfpm_is_enabled ())
    {
      vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
      return;
    }

  zfpm_stats_reset (&zfpm_g->stats);
  zfpm_stats_reset (&zfpm_g->last_ivl_stats);
  zfpm_stats_reset (&zfpm_g->cumulative_stats);

  zfpm_stop_stats_timer ();
  zfpm_start_stats_timer ();

  zfpm_g->last_stats_clear_time = zfpm_get_time ();

  vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       "Zebra information\n"
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_show_stats (vty);
  return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       "Zebra information\n"
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_clear_stats (vty);
  return CMD_SUCCESS;
}

/*
 * update fpm connection information
 */
DEFUN (fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port <1-65535>",
       "fpm connection remote ip and port\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{

  in_addr_t fpm_server;
  uint32_t port_no;

  fpm_server = inet_addr (argv[0]);
  if (fpm_server == INADDR_NONE)
    return CMD_ERR_INCOMPLETE;

  port_no = atoi (argv[1]);
  if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
    return CMD_ERR_INCOMPLETE;

  zfpm_g->fpm_server = fpm_server;
  zfpm_g->fpm_port = port_no;

  return CMD_SUCCESS;
}

DEFUN (no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port <1-65535>",
       "fpm connection remote ip and port\n"
       "Connection\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{
  if (zfpm_g->fpm_server != inet_addr (argv[0]) ||
      zfpm_g->fpm_port != atoi (argv[1]))
    return CMD_ERR_NO_MATCH;

  zfpm_g->fpm_server = FPM_DEFAULT_IP;
  zfpm_g->fpm_port = FPM_DEFAULT_PORT;

  return CMD_SUCCESS;
}

/**
 * fpm_remote_srv_write
 *
 * Writes the 'fpm connection' line to the running configuration, if
 * the remote FPM server differs from the defaults.
 *
 * Returns ZERO on success.
 */
int fpm_remote_srv_write (struct vty *vty)
{
  struct in_addr in;

  in.s_addr = zfpm_g->fpm_server;

  if (zfpm_g->fpm_server != FPM_DEFAULT_IP ||
      zfpm_g->fpm_port != FPM_DEFAULT_PORT)
    vty_out (vty, "fpm connection ip %s port %d%s", inet_ntoa (in),
             zfpm_g->fpm_port, VTY_NEWLINE);

  return 0;
}

/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] port port at which FPM is running.
 * @param[in] enable TRUE if the zebra FPM module should be enabled
 *
 * Returns TRUE on success.
 */
int
zfpm_init (struct thread_master *master, int enable, uint16_t port)
{
  static int initialized = 0;

  if (initialized) {
    return 1;
  }

  initialized = 1;

  memset (zfpm_g, 0, sizeof (*zfpm_g));
  zfpm_g->master = master;
  TAILQ_INIT (&zfpm_g->dest_q);
  zfpm_g->sock = -1;
  zfpm_g->state = ZFPM_STATE_IDLE;

  /*
   * Netlink must currently be available for the Zebra-FPM interface
   * to be enabled.
   */
#ifndef HAVE_NETLINK
  enable = 0;
#endif

  zfpm_g->enabled = enable;

  zfpm_stats_init (&zfpm_g->stats);
  zfpm_stats_init (&zfpm_g->last_ivl_stats);
  zfpm_stats_init (&zfpm_g->cumulative_stats);

  install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
  install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
  install_element (CONFIG_NODE, &fpm_remote_ip_cmd);
  install_element (CONFIG_NODE, &no_fpm_remote_ip_cmd);

  if (!enable) {
    return 1;
  }

  if (!zfpm_g->fpm_server)
    zfpm_g->fpm_server = FPM_DEFAULT_IP;

  if (!port)
    port = FPM_DEFAULT_PORT;

  zfpm_g->fpm_port = port;

  zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
  zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);

  zfpm_start_stats_timer ();
  zfpm_start_connect_timer ("initialized");

  return 1;
}
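
/*
 * Usage sketch (illustrative, not part of this file): zebra calls
 * zfpm_init() once during startup, along the lines of
 *
 *   zfpm_init (zebrad.master, 1, 0);  // hypothetical call: enable
 *                                     // FPM, use the default port
 *
 * Passing 0 for the port selects FPM_DEFAULT_PORT, and the connect
 * timer started above then drives the state machine toward
 * ZFPM_STATE_ESTABLISHED.
 */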