/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <zebra.h>

#include "log.h"
#include "libfrr.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"
#include "version.h"

#include "zebra/rib.h"
#include "zebra/zserv.h"
#include "zebra/zebra_ns.h"
#include "zebra/zebra_vrf.h"

#include "fpm/fpm.h"
#include "zebra_fpm_private.h"

/*
 * Interval at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_ {
	rib_tables_iter_t tables_iter;
	route_table_iter_t iter;
} zfpm_rnodes_iter_t;

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
	unsigned long connect_calls;
	unsigned long connect_no_sock;

	unsigned long read_cb_calls;

	unsigned long write_cb_calls;
	unsigned long write_calls;
	unsigned long partial_writes;
	unsigned long max_writes_hit;
	unsigned long t_write_yields;

	unsigned long nop_deletes_skipped;
	unsigned long route_adds;
	unsigned long route_dels;

	unsigned long updates_triggered;
	unsigned long redundant_triggers;
	unsigned long non_fpm_table_triggers;

	unsigned long dests_del_after_update;

	unsigned long t_conn_down_starts;
	unsigned long t_conn_down_dests_processed;
	unsigned long t_conn_down_yields;
	unsigned long t_conn_down_finishes;

	unsigned long t_conn_up_starts;
	unsigned long t_conn_up_dests_processed;
	unsigned long t_conn_up_yields;
	unsigned long t_conn_up_aborts;
	unsigned long t_conn_up_finishes;

} zfpm_stats_t;

/*
 * States for the FPM state machine.
 */
typedef enum {

	/*
	 * In this state we are not yet ready to connect to the FPM. This
	 * can happen when this module is disabled, or if we're cleaning up
	 * after a connection has gone down.
	 */
	ZFPM_STATE_IDLE,

	/*
	 * Ready to talk to the FPM and periodically trying to connect to
	 * it.
	 */
	ZFPM_STATE_ACTIVE,

	/*
	 * In the middle of bringing up a TCP connection. Specifically,
	 * waiting for a connect() call to complete asynchronously.
	 */
	ZFPM_STATE_CONNECTING,

	/*
	 * TCP connection to the FPM is up.
	 */
	ZFPM_STATE_ESTABLISHED

} zfpm_state_t;

/*
 * Message format to be used to communicate with the FPM.
 */
typedef enum {
	ZFPM_MSG_FORMAT_NONE,
	ZFPM_MSG_FORMAT_NETLINK,
	ZFPM_MSG_FORMAT_PROTOBUF,
} zfpm_msg_format_e;
/*
 * Globals.
 */
typedef struct zfpm_glob_t_ {

	/*
	 * True if the FPM module has been enabled.
	 */
	int enabled;

	/*
	 * Message format to be used to communicate with the fpm.
	 */
	zfpm_msg_format_e message_format;

	struct thread_master *master;

	zfpm_state_t state;

	in_addr_t fpm_server;
	/*
	 * Port on which the FPM is running.
	 */
	int fpm_port;

	/*
	 * List of rib_dest_t structures to be processed
	 */
	TAILQ_HEAD(zfpm_dest_q, rib_dest_t_) dest_q;

	/*
	 * Stream socket to the FPM.
	 */
	int sock;

	/*
	 * Buffers for messages to/from the FPM.
	 */
	struct stream *obuf;
	struct stream *ibuf;

	/*
	 * Threads for I/O.
	 */
	struct thread *t_connect;
	struct thread *t_write;
	struct thread *t_read;

	/*
	 * Thread to clean up after the TCP connection to the FPM goes down
	 * and the state that belongs to it.
	 */
	struct thread *t_conn_down;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_down_state;

	/*
	 * Thread to take actions once the TCP conn to the FPM comes up, and
	 * the state that belongs to it.
	 */
	struct thread *t_conn_up;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_up_state;

	unsigned long connect_calls;
	time_t last_connect_call_time;

	/*
	 * Stats from the start of the current statistics interval up to
	 * now. These are the counters we typically update in the code.
	 */
	zfpm_stats_t stats;

	/*
	 * Statistics that were gathered in the last collection interval.
	 */
	zfpm_stats_t last_ivl_stats;

	/*
	 * Cumulative stats from the last clear to the start of the current
	 * statistics interval.
	 */
	zfpm_stats_t cumulative_stats;

	/*
	 * Stats interval timer.
	 */
	struct thread *t_stats;

	/*
	 * If non-zero, the last time when statistics were cleared.
	 */
	time_t last_stats_clear_time;

} zfpm_glob_t;

static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_trigger_update(struct route_node *rn, const char *reason);

static int zfpm_read_cb(struct thread *thread);
static int zfpm_write_cb(struct thread *thread);

static void zfpm_set_state(zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer(const char *reason);
static void zfpm_start_stats_timer(void);

/*
 * zfpm_thread_should_yield
 */
static inline int zfpm_thread_should_yield(struct thread *t)
{
	return thread_should_yield(t);
}

/*
 * zfpm_state_to_str
 */
static const char *zfpm_state_to_str(zfpm_state_t state)
{
	switch (state) {

	case ZFPM_STATE_IDLE:
		return "idle";

	case ZFPM_STATE_ACTIVE:
		return "active";

	case ZFPM_STATE_CONNECTING:
		return "connecting";

	case ZFPM_STATE_ESTABLISHED:
		return "established";

	default:
		return "unknown";
	}
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t zfpm_get_elapsed_time(time_t reference)
{
	time_t now;

	now = monotime(NULL);

	if (now < reference) {
		assert(0);
		return 0;
	}

	return now - reference;
}

/*
 * zfpm_is_table_for_fpm
 *
 * Returns TRUE if the given table is to be communicated to the FPM.
 */
static inline int zfpm_is_table_for_fpm(struct route_table *table)
{
	rib_table_info_t *info;

	info = rib_table_info(table);

	/*
	 * We only send the unicast tables in the main instance to the FPM
	 * at this point.
	 */
	if (zvrf_id(info->zvrf) != 0)
		return 0;

	if (info->safi != SAFI_UNICAST)
		return 0;

	return 1;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void zfpm_rnodes_iter_init(zfpm_rnodes_iter_t *iter)
{
	memset(iter, 0, sizeof(*iter));
	rib_tables_iter_init(&iter->tables_iter);

	/*
	 * This is a hack, but it makes implementing 'next' easier by
	 * ensuring that route_table_iter_next() will return NULL the first
	 * time we call it.
	 */
	route_table_iter_init(&iter->iter, NULL);
	route_table_iter_cleanup(&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *zfpm_rnodes_iter_next(zfpm_rnodes_iter_t *iter)
{
	struct route_node *rn;
	struct route_table *table;

	while (1) {
		rn = route_table_iter_next(&iter->iter);
		if (rn)
			return rn;

		/*
		 * We've made our way through this table, go to the next one.
		 */
		route_table_iter_cleanup(&iter->iter);

		while ((table = rib_tables_iter_next(&iter->tables_iter))) {
			if (zfpm_is_table_for_fpm(table))
				break;
		}

		if (!table)
			return NULL;

		route_table_iter_init(&iter->iter, table);
	}

	return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void zfpm_rnodes_iter_pause(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_pause(&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void zfpm_rnodes_iter_cleanup(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_cleanup(&iter->iter);
	rib_tables_iter_cleanup(&iter->tables_iter);
}

/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void zfpm_stats_init(zfpm_stats_t *stats)
{
	memset(stats, 0, sizeof(*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void zfpm_stats_reset(zfpm_stats_t *stats)
{
	zfpm_stats_init(stats);
}

/*
 * zfpm_stats_copy
 */
static inline void zfpm_stats_copy(const zfpm_stats_t *src, zfpm_stats_t *dest)
{
	memcpy(dest, src, sizeof(*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void zfpm_stats_compose(const zfpm_stats_t *s1, const zfpm_stats_t *s2,
			       zfpm_stats_t *result)
{
	const unsigned long *p1, *p2;
	unsigned long *result_p;
	int i, num_counters;

	p1 = (const unsigned long *)s1;
	p2 = (const unsigned long *)s2;
	result_p = (unsigned long *)result;

	num_counters = (sizeof(zfpm_stats_t) / sizeof(unsigned long));

	for (i = 0; i < num_counters; i++) {
		result_p[i] = p1[i] + p2[i];
	}
}

/*
 * zfpm_read_on
 */
static inline void zfpm_read_on(void)
{
	assert(!zfpm_g->t_read);
	assert(zfpm_g->sock >= 0);

	thread_add_read(zfpm_g->master, zfpm_read_cb, 0, zfpm_g->sock,
			&zfpm_g->t_read);
}

/*
 * zfpm_write_on
 */
static inline void zfpm_write_on(void)
{
	assert(!zfpm_g->t_write);
	assert(zfpm_g->sock >= 0);

	thread_add_write(zfpm_g->master, zfpm_write_cb, 0, zfpm_g->sock,
			 &zfpm_g->t_write);
}

/*
 * zfpm_read_off
 */
static inline void zfpm_read_off(void)
{
	THREAD_READ_OFF(zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void zfpm_write_off(void)
{
	THREAD_WRITE_OFF(zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int zfpm_conn_up_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	zfpm_g->t_conn_up = NULL;

	iter = &zfpm_g->t_conn_up_state.iter;

	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED) {
		zfpm_debug(
			"Connection not up anymore, conn_up thread aborting");
		zfpm_g->stats.t_conn_up_aborts++;
		goto done;
	}

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			zfpm_g->stats.t_conn_up_dests_processed++;
			zfpm_trigger_update(rnode, NULL);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_up_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_up = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_up);
		return 0;
	}

	zfpm_g->stats.t_conn_up_finishes++;

done:
	zfpm_rnodes_iter_cleanup(iter);
	return 0;
}

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void zfpm_connection_up(const char *detail)
{
	assert(zfpm_g->sock >= 0);
	zfpm_read_on();
	zfpm_write_on();
	zfpm_set_state(ZFPM_STATE_ESTABLISHED, detail);

	/*
	 * Start thread to push existing routes to the FPM.
	 */
	assert(!zfpm_g->t_conn_up);

	zfpm_rnodes_iter_init(&zfpm_g->t_conn_up_state.iter);

	zfpm_debug("Starting conn_up thread");
	zfpm_g->t_conn_up = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_up);
	zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void zfpm_connect_check(void)
{
	int status;
	socklen_t slen;
	int ret;

	zfpm_read_off();
	zfpm_write_off();

	slen = sizeof(status);
	ret = getsockopt(zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *)&status,
			 &slen);

	if (ret >= 0 && status == 0) {
		zfpm_connection_up("async connect complete");
		return;
	}

	/*
	 * getsockopt() failed or indicated an error on the socket.
	 */
	close(zfpm_g->sock);
	zfpm_g->sock = -1;

	zfpm_start_connect_timer("getsockopt() after async connect failed");
	return;
}

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int zfpm_conn_down_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	assert(zfpm_g->state == ZFPM_STATE_IDLE);

	zfpm_g->t_conn_down = NULL;

	iter = &zfpm_g->t_conn_down_state.iter;

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
				TAILQ_REMOVE(&zfpm_g->dest_q, dest,
					     fpm_q_entries);
			}

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);

			zfpm_g->stats.t_conn_down_dests_processed++;

			/*
			 * Check if the dest should be deleted.
			 */
			rib_gc_dest(rnode);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_down_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_down = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_down);
		return 0;
	}

	zfpm_g->stats.t_conn_down_finishes++;
	zfpm_rnodes_iter_cleanup(iter);

	/*
	 * Start the process of connecting to the FPM again.
	 */
	zfpm_start_connect_timer("cleanup complete");
	return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void zfpm_connection_down(const char *detail)
{
	if (!detail)
		detail = "unknown";

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);

	zlog_info("connection to the FPM has gone down: %s", detail);

	zfpm_read_off();
	zfpm_write_off();

	stream_reset(zfpm_g->ibuf);
	stream_reset(zfpm_g->obuf);

	if (zfpm_g->sock >= 0) {
		close(zfpm_g->sock);
		zfpm_g->sock = -1;
	}

	/*
	 * Start thread to clean up state after the connection goes down.
	 */
	assert(!zfpm_g->t_conn_down);
	zfpm_debug("Starting conn_down thread");
	zfpm_rnodes_iter_init(&zfpm_g->t_conn_down_state.iter);
	zfpm_g->t_conn_down = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_down);
	zfpm_g->stats.t_conn_down_starts++;

	zfpm_set_state(ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int zfpm_read_cb(struct thread *thread)
{
	size_t already;
	struct stream *ibuf;
	uint16_t msg_len;
	fpm_msg_hdr_t *hdr;

	zfpm_g->stats.read_cb_calls++;
	zfpm_g->t_read = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	ibuf = zfpm_g->ibuf;

	already = stream_get_endp(ibuf);
	if (already < FPM_MSG_HDR_LEN) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock,
					FPM_MSG_HDR_LEN - already);
		if (nbyte == 0 || nbyte == -1) {
			zfpm_connection_down("closed socket in read");
			return 0;
		}

		if (nbyte != (ssize_t)(FPM_MSG_HDR_LEN - already))
			goto done;

		already = FPM_MSG_HDR_LEN;
	}

	stream_set_getp(ibuf, 0);

	hdr = (fpm_msg_hdr_t *)stream_pnt(ibuf);

	if (!fpm_msg_hdr_ok(hdr)) {
		zfpm_connection_down("invalid message header");
		return 0;
	}

	msg_len = fpm_msg_len(hdr);

	/*
	 * Read out the rest of the packet.
	 */
	if (already < msg_len) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock, msg_len - already);

		if (nbyte == 0 || nbyte == -1) {
			zfpm_connection_down("failed to read message");
			return 0;
		}

		if (nbyte != (ssize_t)(msg_len - already))
			goto done;
	}

	zfpm_debug("Read out a full fpm message");

	/*
	 * Just throw it away for now.
	 */
	stream_reset(ibuf);

done:
	zfpm_read_on();
	return 0;
}
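
/*
 * Note on framing: the read path above relies on the FPM header
 * defined in fpm/fpm.h -- a fpm_msg_hdr_t carrying the protocol
 * version, the message type and the total message length (header
 * included, in network byte order). Reading is therefore a two-step
 * affair: read FPM_MSG_HDR_LEN bytes, validate them with
 * fpm_msg_hdr_ok(), then read the remaining fpm_msg_len(hdr) -
 * FPM_MSG_HDR_LEN bytes of payload.
 */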

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int zfpm_writes_pending(void)
{

	/*
	 * Check if there is any data in the outbound buffer that has not
	 * been written to the socket yet.
	 */
	if (stream_get_endp(zfpm_g->obuf) - stream_get_getp(zfpm_g->obuf))
		return 1;

	/*
	 * Check if there are any prefixes on the outbound queue.
	 */
	if (!TAILQ_EMPTY(&zfpm_g->dest_q))
		return 1;

	return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int zfpm_encode_route(rib_dest_t *dest, struct route_entry *re,
				    char *in_buf, size_t in_buf_len,
				    fpm_msg_type_e *msg_type)
{
	size_t len;
#ifdef HAVE_NETLINK
	int cmd;
#endif
	len = 0;

	*msg_type = FPM_MSG_TYPE_NONE;

	switch (zfpm_g->message_format) {

	case ZFPM_MSG_FORMAT_PROTOBUF:
#ifdef HAVE_PROTOBUF
		len = zfpm_protobuf_encode_route(dest, re, (uint8_t *)in_buf,
						 in_buf_len);
		*msg_type = FPM_MSG_TYPE_PROTOBUF;
#endif
		break;

	case ZFPM_MSG_FORMAT_NETLINK:
#ifdef HAVE_NETLINK
		cmd = re ? RTM_NEWROUTE : RTM_DELROUTE;
		len = zfpm_netlink_encode_route(cmd, dest, re, in_buf,
						in_buf_len);
		assert(fpm_msg_align(len) == len);
		*msg_type = FPM_MSG_TYPE_NETLINK;
#endif /* HAVE_NETLINK */
		break;

	default:
		break;
	}

	return len;
}

/*
 * zfpm_route_for_update
 *
 * Returns the re that is to be sent to the FPM for a given dest.
 */
struct route_entry *zfpm_route_for_update(rib_dest_t *dest)
{
	struct route_entry *re;

	RE_DEST_FOREACH_ROUTE (dest, re) {
		if (!CHECK_FLAG(re->status, ROUTE_ENTRY_SELECTED_FIB))
			continue;

		return re;
	}

	/*
	 * We have no route for this destination.
	 */
	return NULL;
}
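
/*
 * A NULL return from zfpm_route_for_update() is meaningful:
 * zfpm_build_updates() below maps "no selected route entry for this
 * dest" to a route deletion (an RTM_DELROUTE message in the netlink
 * format).
 */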

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
static void zfpm_build_updates(void)
{
	struct stream *s;
	rib_dest_t *dest;
	unsigned char *buf, *data, *buf_end;
	size_t msg_len;
	size_t data_len;
	fpm_msg_hdr_t *hdr;
	struct route_entry *re;
	int is_add, write_msg;
	fpm_msg_type_e msg_type;

	s = zfpm_g->obuf;

	assert(stream_empty(s));

	do {

		/*
		 * Make sure there is enough space to write another message.
		 */
		if (STREAM_WRITEABLE(s) < FPM_MAX_MSG_LEN)
			break;

		buf = STREAM_DATA(s) + stream_get_endp(s);
		buf_end = buf + STREAM_WRITEABLE(s);

		dest = TAILQ_FIRST(&zfpm_g->dest_q);
		if (!dest)
			break;

		assert(CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM));

		hdr = (fpm_msg_hdr_t *)buf;
		hdr->version = FPM_PROTO_VERSION;

		data = fpm_msg_data(hdr);

		re = zfpm_route_for_update(dest);
		is_add = re ? 1 : 0;

		write_msg = 1;

		/*
		 * If this is a route deletion, and we have not sent the
		 * route to the FPM previously, skip it.
		 */
		if (!is_add && !CHECK_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM)) {
			write_msg = 0;
			zfpm_g->stats.nop_deletes_skipped++;
		}

		if (write_msg) {
			data_len = zfpm_encode_route(dest, re, (char *)data,
						     buf_end - data, &msg_type);

			assert(data_len);
			if (data_len) {
				hdr->msg_type = msg_type;
				msg_len = fpm_data_len_to_msg_len(data_len);
				hdr->msg_len = htons(msg_len);
				stream_forward_endp(s, msg_len);

				if (is_add)
					zfpm_g->stats.route_adds++;
				else
					zfpm_g->stats.route_dels++;
			}
		}

		/*
		 * Remove the dest from the queue, and reset the flag.
		 */
		UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);

		if (is_add) {
			SET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		} else {
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		}

		/*
		 * Delete the destination if necessary.
		 */
		if (rib_gc_dest(dest->rnode))
			zfpm_g->stats.dests_del_after_update++;

	} while (1);
}
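
/*
 * Note that zfpm_build_updates() only ever writes complete messages
 * into the output stream: it stops as soon as fewer than
 * FPM_MAX_MSG_LEN bytes are writeable. As a result zfpm_write_cb()
 * below only has to deal with partial writes of fully encoded
 * messages, never with partially encoded ones.
 */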

/*
 * zfpm_write_cb
 */
static int zfpm_write_cb(struct thread *thread)
{
	struct stream *s;
	int num_writes;

	zfpm_g->stats.write_cb_calls++;
	zfpm_g->t_write = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	num_writes = 0;

	do {
		int bytes_to_write, bytes_written;

		s = zfpm_g->obuf;

		/*
		 * If the stream is empty, try fill it up with data.
		 */
		if (stream_empty(s)) {
			zfpm_build_updates();
		}

		bytes_to_write = stream_get_endp(s) - stream_get_getp(s);
		if (!bytes_to_write)
			break;

		bytes_written =
			write(zfpm_g->sock, STREAM_PNT(s), bytes_to_write);
		zfpm_g->stats.write_calls++;
		num_writes++;

		if (bytes_written < 0) {
			if (ERRNO_IO_RETRY(errno))
				break;

			zfpm_connection_down("failed to write to socket");
			return 0;
		}

		if (bytes_written != bytes_to_write) {

			/*
			 * Partial write.
			 */
			stream_forward_getp(s, bytes_written);
			zfpm_g->stats.partial_writes++;
			break;
		}

		/*
		 * We've written out the entire contents of the stream.
		 */
		stream_reset(s);

		if (num_writes >= ZFPM_MAX_WRITES_PER_RUN) {
			zfpm_g->stats.max_writes_hit++;
			break;
		}

		if (zfpm_thread_should_yield(thread)) {
			zfpm_g->stats.t_write_yields++;
			break;
		}
	} while (1);

	if (zfpm_writes_pending())
		zfpm_write_on();

	return 0;
}

/*
 * zfpm_connect_cb
 */
static int zfpm_connect_cb(struct thread *t)
{
	int sock, ret;
	struct sockaddr_in serv;

	zfpm_g->t_connect = NULL;
	assert(zfpm_g->state == ZFPM_STATE_ACTIVE);

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0) {
		zfpm_debug("Failed to create socket for connect(): %s",
			   strerror(errno));
		zfpm_g->stats.connect_no_sock++;
		return 0;
	}

	set_nonblocking(sock);

	/* Make server socket. */
	memset(&serv, 0, sizeof(serv));
	serv.sin_family = AF_INET;
	serv.sin_port = htons(zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
	serv.sin_len = sizeof(struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
	if (!zfpm_g->fpm_server)
		serv.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	else
		serv.sin_addr.s_addr = (zfpm_g->fpm_server);

	/*
	 * Connect to the FPM.
	 */
	zfpm_g->connect_calls++;
	zfpm_g->stats.connect_calls++;
	zfpm_g->last_connect_call_time = monotime(NULL);

	ret = connect(sock, (struct sockaddr *)&serv, sizeof(serv));
	if (ret >= 0) {
		zfpm_g->sock = sock;
		zfpm_connection_up("connect succeeded");
		return 1;
	}

	if (errno == EINPROGRESS) {
		zfpm_g->sock = sock;
		zfpm_read_on();
		zfpm_write_on();
		zfpm_set_state(ZFPM_STATE_CONNECTING,
			       "async connect in progress");
		return 0;
	}

	zlog_info("can't connect to FPM %d: %s", sock, safe_strerror(errno));
	close(sock);

	/*
	 * Restart timer for retrying connection.
	 */
	zfpm_start_connect_timer("connect() failed");
	return 0;
}

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void zfpm_set_state(zfpm_state_t state, const char *reason)
{
	zfpm_state_t cur_state = zfpm_g->state;

	if (!reason)
		reason = "Unknown";

	if (state == cur_state)
		return;

	zfpm_debug("beginning state transition %s -> %s. Reason: %s",
		   zfpm_state_to_str(cur_state), zfpm_state_to_str(state),
		   reason);

	switch (state) {

	case ZFPM_STATE_IDLE:
		assert(cur_state == ZFPM_STATE_ESTABLISHED);
		break;

	case ZFPM_STATE_ACTIVE:
		assert(cur_state == ZFPM_STATE_IDLE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->t_connect);
		break;

	case ZFPM_STATE_CONNECTING:
		assert(zfpm_g->sock);
		assert(cur_state == ZFPM_STATE_ACTIVE);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;

	case ZFPM_STATE_ESTABLISHED:
		assert(cur_state == ZFPM_STATE_ACTIVE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->sock);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;
	}

	zfpm_g->state = state;
}
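
/*
 * The assertions above encode the legal transitions of the state
 * machine:
 *
 *   IDLE -> ACTIVE            (connect timer started)
 *   ACTIVE -> CONNECTING      (asynchronous connect() in progress)
 *   ACTIVE -> ESTABLISHED     (connect() succeeded synchronously)
 *   CONNECTING -> ESTABLISHED (asynchronous connect() completed)
 *   CONNECTING -> ACTIVE      (asynchronous connect() failed)
 *   ESTABLISHED -> IDLE       (connection went down)
 */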

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long zfpm_calc_connect_delay(void)
{
	time_t elapsed;

	/*
	 * Return 0 if this is our first attempt to connect.
	 */
	if (zfpm_g->connect_calls == 0) {
		return 0;
	}

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_connect_call_time);

	if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
		return 0;
	}

	return ZFPM_CONNECT_RETRY_IVL - elapsed;
}
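
/*
 * Worked example: with ZFPM_CONNECT_RETRY_IVL at 5 seconds, if the
 * last connect() call was made 2 seconds ago the next attempt is
 * scheduled 3 seconds out; if it was made more than 5 seconds ago, or
 * no attempt has been made yet, the connect is scheduled immediately.
 */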

/*
 * zfpm_start_connect_timer
 */
static void zfpm_start_connect_timer(const char *reason)
{
	long delay_secs;

	assert(!zfpm_g->t_connect);
	assert(zfpm_g->sock < 0);

	assert(zfpm_g->state == ZFPM_STATE_IDLE
	       || zfpm_g->state == ZFPM_STATE_ACTIVE
	       || zfpm_g->state == ZFPM_STATE_CONNECTING);

	delay_secs = zfpm_calc_connect_delay();
	zfpm_debug("scheduling connect in %ld seconds", delay_secs);

	thread_add_timer(zfpm_g->master, zfpm_connect_cb, 0, delay_secs,
			 &zfpm_g->t_connect);
	zfpm_set_state(ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int zfpm_is_enabled(void)
{
	return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int zfpm_conn_is_up(void)
{
	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
		return 0;

	assert(zfpm_g->sock >= 0);

	return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
static int zfpm_trigger_update(struct route_node *rn, const char *reason)
{
	rib_dest_t *dest;
	char buf[PREFIX_STRLEN];

	/*
	 * Ignore if the connection is down. We will update the FPM about
	 * all destinations once the connection comes up.
	 */
	if (!zfpm_conn_is_up())
		return 0;

	dest = rib_dest_from_rnode(rn);

	/*
	 * Ignore the trigger if the dest is not in a table that we would
	 * send to the FPM.
	 */
	if (!zfpm_is_table_for_fpm(rib_dest_table(dest))) {
		zfpm_g->stats.non_fpm_table_triggers++;
		return 0;
	}

	if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
		zfpm_g->stats.redundant_triggers++;
		return 0;
	}

	if (reason) {
		zfpm_debug("%s triggering update to FPM - Reason: %s",
			   prefix2str(&rn->p, buf, sizeof(buf)), reason);
	}

	SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
	TAILQ_INSERT_TAIL(&zfpm_g->dest_q, dest, fpm_q_entries);
	zfpm_g->stats.updates_triggered++;

	/*
	 * Make sure that writes are enabled.
	 */
	if (zfpm_g->t_write)
		return 0;

	zfpm_write_on();
	return 0;
}
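
/*
 * zfpm_trigger_update() and zfpm_build_updates() form the two ends of
 * the update pipeline: the trigger marks a dest with
 * RIB_DEST_UPDATE_FPM and appends it to dest_q, and the write callback
 * later drains the queue, encoding one message per dest.
 */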

/*
 * zfpm_stats_timer_cb
 */
static int zfpm_stats_timer_cb(struct thread *t)
{
	zfpm_g->t_stats = NULL;

	/*
	 * Remember the stats collected in the last interval for display
	 * purposes.
	 */
	zfpm_stats_copy(&zfpm_g->stats, &zfpm_g->last_ivl_stats);

	/*
	 * Add the current set of stats into the cumulative statistics.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &zfpm_g->cumulative_stats);

	/*
	 * Start collecting stats afresh over the next interval.
	 */
	zfpm_stats_reset(&zfpm_g->stats);

	zfpm_start_stats_timer();

	return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void zfpm_stop_stats_timer(void)
{
	if (!zfpm_g->t_stats)
		return;

	zfpm_debug("Stopping existing stats timer");
	THREAD_TIMER_OFF(zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
static void zfpm_start_stats_timer(void)
{
	assert(!zfpm_g->t_stats);

	thread_add_timer(zfpm_g->master, zfpm_stats_timer_cb, 0,
			 ZFPM_STATS_IVL_SECS, &zfpm_g->t_stats);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter)                                               \
	do {                                                                  \
		vty_out(vty, "%-40s %10lu %16lu\n", #counter,                 \
			total_stats.counter, zfpm_g->last_ivl_stats.counter); \
	} while (0)

/*
 * zfpm_show_stats
 */
static void zfpm_show_stats(struct vty *vty)
{
	zfpm_stats_t total_stats;
	time_t elapsed;

	vty_out(vty, "\n%-40s %10s Last %2d secs\n\n", "Counter", "Total",
		ZFPM_STATS_IVL_SECS);

	/*
	 * Compute the total stats up to this instant.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &total_stats);

	ZFPM_SHOW_STAT(connect_calls);
	ZFPM_SHOW_STAT(connect_no_sock);
	ZFPM_SHOW_STAT(read_cb_calls);
	ZFPM_SHOW_STAT(write_cb_calls);
	ZFPM_SHOW_STAT(write_calls);
	ZFPM_SHOW_STAT(partial_writes);
	ZFPM_SHOW_STAT(max_writes_hit);
	ZFPM_SHOW_STAT(t_write_yields);
	ZFPM_SHOW_STAT(nop_deletes_skipped);
	ZFPM_SHOW_STAT(route_adds);
	ZFPM_SHOW_STAT(route_dels);
	ZFPM_SHOW_STAT(updates_triggered);
	ZFPM_SHOW_STAT(non_fpm_table_triggers);
	ZFPM_SHOW_STAT(redundant_triggers);
	ZFPM_SHOW_STAT(dests_del_after_update);
	ZFPM_SHOW_STAT(t_conn_down_starts);
	ZFPM_SHOW_STAT(t_conn_down_dests_processed);
	ZFPM_SHOW_STAT(t_conn_down_yields);
	ZFPM_SHOW_STAT(t_conn_down_finishes);
	ZFPM_SHOW_STAT(t_conn_up_starts);
	ZFPM_SHOW_STAT(t_conn_up_dests_processed);
	ZFPM_SHOW_STAT(t_conn_up_yields);
	ZFPM_SHOW_STAT(t_conn_up_aborts);
	ZFPM_SHOW_STAT(t_conn_up_finishes);

	if (!zfpm_g->last_stats_clear_time)
		return;

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_stats_clear_time);

	vty_out(vty, "\nStats were cleared %lu seconds ago\n",
		(unsigned long)elapsed);
}

/*
 * zfpm_clear_stats
 */
static void zfpm_clear_stats(struct vty *vty)
{
	if (!zfpm_is_enabled()) {
		vty_out(vty, "The FPM module is not enabled...\n");
		return;
	}

	zfpm_stats_reset(&zfpm_g->stats);
	zfpm_stats_reset(&zfpm_g->last_ivl_stats);
	zfpm_stats_reset(&zfpm_g->cumulative_stats);

	zfpm_stop_stats_timer();
	zfpm_start_stats_timer();

	zfpm_g->last_stats_clear_time = monotime(NULL);

	vty_out(vty, "Cleared FPM stats\n");
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       "Zebra information\n"
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_show_stats(vty);
	return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       "Zebra information\n"
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_clear_stats(vty);
	return CMD_SUCCESS;
}

/*
 * update fpm connection information
 */
DEFUN (fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port (1-65535)",
       "fpm connection remote ip and port\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{
	in_addr_t fpm_server;
	uint32_t port_no;

	fpm_server = inet_addr(argv[3]->arg);
	if (fpm_server == INADDR_NONE)
		return CMD_ERR_INCOMPLETE;

	port_no = atoi(argv[5]->arg);
	if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
		return CMD_ERR_INCOMPLETE;

	zfpm_g->fpm_server = fpm_server;
	zfpm_g->fpm_port = port_no;

	return CMD_SUCCESS;
}

DEFUN (no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port (1-65535)",
       "fpm connection remote ip and port\n"
       "Connection\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{
	if (zfpm_g->fpm_server != inet_addr(argv[4]->arg)
	    || zfpm_g->fpm_port != atoi(argv[6]->arg))
		return CMD_ERR_NO_MATCH;

	zfpm_g->fpm_server = FPM_DEFAULT_IP;
	zfpm_g->fpm_port = FPM_DEFAULT_PORT;

	return CMD_SUCCESS;
}

/*
 * zfpm_init_message_format
 */
static inline void zfpm_init_message_format(const char *format)
{
	int have_netlink, have_protobuf;

#ifdef HAVE_NETLINK
	have_netlink = 1;
#else
	have_netlink = 0;
#endif

#ifdef HAVE_PROTOBUF
	have_protobuf = 1;
#else
	have_protobuf = 0;
#endif

	zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;

	if (!format) {
		if (have_netlink) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		} else if (have_protobuf) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		}
		return;
	}

	if (!strcmp("netlink", format)) {
		if (!have_netlink) {
			zlog_err("FPM netlink message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		return;
	}

	if (!strcmp("protobuf", format)) {
		if (!have_protobuf) {
			zlog_err(
				"FPM protobuf message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		return;
	}

	zlog_warn("Unknown fpm format '%s'", format);
}

/**
 * fpm_remote_srv_write
 *
 * Writes the remote fpm connection config, if any, to the running
 * configuration.
 *
 * Returns ZERO on success.
 */
static int fpm_remote_srv_write(struct vty *vty)
{
	struct in_addr in;

	in.s_addr = zfpm_g->fpm_server;

	if ((zfpm_g->fpm_server != FPM_DEFAULT_IP
	     && zfpm_g->fpm_server != INADDR_ANY)
	    || (zfpm_g->fpm_port != FPM_DEFAULT_PORT
		&& zfpm_g->fpm_port != 0))
		vty_out(vty, "fpm connection ip %s port %d\n", inet_ntoa(in),
			zfpm_g->fpm_port);

	return 0;
}


/* Zebra node */
static struct cmd_node zebra_node = {ZEBRA_NODE, "", 1};


/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] master thread master on which FPM I/O is scheduled.
 *
 * The module is enabled by default; the message format used to talk to
 * the FPM ('netlink' or 'protobuf') is taken from the module load
 * arguments.
 *
 * Returns 0 on success.
 */
static int zfpm_init(struct thread_master *master)
{
	int enable = 1;
	uint16_t port = 0;
	const char *format = THIS_MODULE->load_args;

	memset(zfpm_g, 0, sizeof(*zfpm_g));
	zfpm_g->master = master;
	TAILQ_INIT(&zfpm_g->dest_q);
	zfpm_g->sock = -1;
	zfpm_g->state = ZFPM_STATE_IDLE;

	zfpm_stats_init(&zfpm_g->stats);
	zfpm_stats_init(&zfpm_g->last_ivl_stats);
	zfpm_stats_init(&zfpm_g->cumulative_stats);

	install_node(&zebra_node, fpm_remote_srv_write);
	install_element(ENABLE_NODE, &show_zebra_fpm_stats_cmd);
	install_element(ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
	install_element(CONFIG_NODE, &fpm_remote_ip_cmd);
	install_element(CONFIG_NODE, &no_fpm_remote_ip_cmd);

	zfpm_init_message_format(format);

	/*
	 * Disable FPM interface if no suitable format is available.
	 */
	if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
		enable = 0;

	zfpm_g->enabled = enable;

	if (!zfpm_g->fpm_server)
		zfpm_g->fpm_server = FPM_DEFAULT_IP;

	if (!port)
		port = FPM_DEFAULT_PORT;

	zfpm_g->fpm_port = port;

	zfpm_g->obuf = stream_new(ZFPM_OBUF_SIZE);
	zfpm_g->ibuf = stream_new(ZFPM_IBUF_SIZE);

	zfpm_start_stats_timer();
	zfpm_start_connect_timer("initialized");
	return 0;
}

static int zebra_fpm_module_init(void)
{
	hook_register(rib_update, zfpm_trigger_update);
	hook_register(frr_late_init, zfpm_init);
	return 0;
}

FRR_MODULE_SETUP(.name = "zebra_fpm", .version = FRR_VERSION,
		 .description = "zebra FPM (Forwarding Plane Manager) module",
		 .init = zebra_fpm_module_init, )