/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <zebra.h>

#include "log.h"
#include "libfrr.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"
#include "version.h"

#include "zebra/rib.h"
#include "zebra/zserv.h"
#include "zebra/zebra_ns.h"
#include "zebra/zebra_vrf.h"

#include "fpm/fpm.h"
#include "zebra_fpm_private.h"

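/*
 * A note on the wire format (informational; fpm/fpm.h is authoritative):
 * every message exchanged with the FPM starts with an fpm_msg_hdr_t that
 * carries the protocol version, a message type (netlink or protobuf in
 * this file) and the total message length in network byte order, followed
 * by the encoded payload padded out to the FPM alignment.
 */
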
/*
 * Interval at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_ {
	rib_tables_iter_t tables_iter;
	route_table_iter_t iter;
} zfpm_rnodes_iter_t;

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
	unsigned long connect_calls;
	unsigned long connect_no_sock;

	unsigned long read_cb_calls;

	unsigned long write_cb_calls;
	unsigned long write_calls;
	unsigned long partial_writes;
	unsigned long max_writes_hit;
	unsigned long t_write_yields;

	unsigned long nop_deletes_skipped;
	unsigned long route_adds;
	unsigned long route_dels;

	unsigned long updates_triggered;
	unsigned long redundant_triggers;

	unsigned long dests_del_after_update;

	unsigned long t_conn_down_starts;
	unsigned long t_conn_down_dests_processed;
	unsigned long t_conn_down_yields;
	unsigned long t_conn_down_finishes;

	unsigned long t_conn_up_starts;
	unsigned long t_conn_up_dests_processed;
	unsigned long t_conn_up_yields;
	unsigned long t_conn_up_aborts;
	unsigned long t_conn_up_finishes;

} zfpm_stats_t;

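/*
 * Note: zfpm_stats_compose() below treats this structure as a flat array
 * of 'unsigned long' counters, so any field added to it must also be an
 * unsigned long counter.
 */
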
/*
 * States for the FPM state machine.
 */
typedef enum {

	/*
	 * In this state we are not yet ready to connect to the FPM. This
	 * can happen when this module is disabled, or if we're cleaning up
	 * after a connection has gone down.
	 */
	ZFPM_STATE_IDLE,

	/*
	 * Ready to talk to the FPM and periodically trying to connect to
	 * it.
	 */
	ZFPM_STATE_ACTIVE,

	/*
	 * In the middle of bringing up a TCP connection. Specifically,
	 * waiting for a connect() call to complete asynchronously.
	 */
	ZFPM_STATE_CONNECTING,

	/*
	 * TCP connection to the FPM is up.
	 */
	ZFPM_STATE_ESTABLISHED

} zfpm_state_t;

/*
 * Message format to be used to communicate with the FPM.
 */
typedef enum {
	ZFPM_MSG_FORMAT_NONE,
	ZFPM_MSG_FORMAT_NETLINK,
	ZFPM_MSG_FORMAT_PROTOBUF,
} zfpm_msg_format_e;

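/*
 * The format actually used is decided at startup in
 * zfpm_init_message_format(): an explicit "netlink" or "protobuf" module
 * argument is honoured if the corresponding support was compiled in;
 * otherwise netlink is preferred over protobuf when available.
 */
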
/*
 * Globals.
 */
typedef struct zfpm_glob_t_ {

	/*
	 * True if the FPM module has been enabled.
	 */
	int enabled;

	/*
	 * Message format to be used to communicate with the fpm.
	 */
	zfpm_msg_format_e message_format;

	struct thread_master *master;

	zfpm_state_t state;

	in_addr_t fpm_server;
	/*
	 * Port on which the FPM is running.
	 */
	int fpm_port;

	/*
	 * List of rib_dest_t structures to be processed
	 */
	TAILQ_HEAD(zfpm_dest_q, rib_dest_t_) dest_q;

	/*
	 * Stream socket to the FPM.
	 */
	int sock;

	/*
	 * Buffers for messages to/from the FPM.
	 */
	struct stream *obuf;
	struct stream *ibuf;

	/*
	 * Threads for I/O.
	 */
	struct thread *t_connect;
	struct thread *t_write;
	struct thread *t_read;

	/*
	 * Thread to clean up after the TCP connection to the FPM goes down
	 * and the state that belongs to it.
	 */
	struct thread *t_conn_down;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_down_state;

	/*
	 * Thread to take actions once the TCP conn to the FPM comes up, and
	 * the state that belongs to it.
	 */
	struct thread *t_conn_up;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_up_state;

	unsigned long connect_calls;
	time_t last_connect_call_time;

	/*
	 * Stats from the start of the current statistics interval up to
	 * now. These are the counters we typically update in the code.
	 */
	zfpm_stats_t stats;

	/*
	 * Statistics that were gathered in the last collection interval.
	 */
	zfpm_stats_t last_ivl_stats;

	/*
	 * Cumulative stats from the last clear to the start of the current
	 * statistics interval.
	 */
	zfpm_stats_t cumulative_stats;

	/*
	 * Stats interval timer.
	 */
	struct thread *t_stats;

	/*
	 * If non-zero, the last time when statistics were cleared.
	 */
	time_t last_stats_clear_time;

} zfpm_glob_t;

static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_trigger_update(struct route_node *rn, const char *reason);

static int zfpm_read_cb(struct thread *thread);
static int zfpm_write_cb(struct thread *thread);

static void zfpm_set_state(zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer(const char *reason);
static void zfpm_start_stats_timer(void);

/*
 * zfpm_thread_should_yield
 */
static inline int zfpm_thread_should_yield(struct thread *t)
{
	return thread_should_yield(t);
}

/*
 * zfpm_state_to_str
 */
static const char *zfpm_state_to_str(zfpm_state_t state)
{
	switch (state) {

	case ZFPM_STATE_IDLE:
		return "idle";

	case ZFPM_STATE_ACTIVE:
		return "active";

	case ZFPM_STATE_CONNECTING:
		return "connecting";

	case ZFPM_STATE_ESTABLISHED:
		return "established";

	default:
		return "unknown";
	}
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t zfpm_get_elapsed_time(time_t reference)
{
	time_t now;

	now = monotime(NULL);

	if (now < reference) {
		assert(0);
		return 0;
	}

	return now - reference;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void zfpm_rnodes_iter_init(zfpm_rnodes_iter_t *iter)
{
	memset(iter, 0, sizeof(*iter));
	rib_tables_iter_init(&iter->tables_iter);

	/*
	 * This is a hack, but it makes implementing 'next' easier by
	 * ensuring that route_table_iter_next() will return NULL the first
	 * time we call it.
	 */
	route_table_iter_init(&iter->iter, NULL);
	route_table_iter_cleanup(&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *zfpm_rnodes_iter_next(zfpm_rnodes_iter_t *iter)
{
	struct route_node *rn;
	struct route_table *table;

	while (1) {
		rn = route_table_iter_next(&iter->iter);
		if (rn)
			return rn;

		/*
		 * We've made our way through this table, go to the next one.
		 */
		route_table_iter_cleanup(&iter->iter);

		table = rib_tables_iter_next(&iter->tables_iter);

		if (!table)
			return NULL;

		route_table_iter_init(&iter->iter, table);
	}

	return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void zfpm_rnodes_iter_pause(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_pause(&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void zfpm_rnodes_iter_cleanup(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_cleanup(&iter->iter);
	rib_tables_iter_cleanup(&iter->tables_iter);
}

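/*
 * Typical use of the iterator above (a sketch of the flow, not extra
 * functionality): the conn_up/conn_down callbacks call
 * zfpm_rnodes_iter_init() once, then repeatedly zfpm_rnodes_iter_next();
 * before yielding they call zfpm_rnodes_iter_pause() so the walk can be
 * resumed later, and zfpm_rnodes_iter_cleanup() when the walk finishes or
 * is aborted.
 */
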
/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void zfpm_stats_init(zfpm_stats_t *stats)
{
	memset(stats, 0, sizeof(*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void zfpm_stats_reset(zfpm_stats_t *stats)
{
	zfpm_stats_init(stats);
}

/*
 * zfpm_stats_copy
 */
static inline void zfpm_stats_copy(const zfpm_stats_t *src, zfpm_stats_t *dest)
{
	memcpy(dest, src, sizeof(*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1 and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void zfpm_stats_compose(const zfpm_stats_t *s1, const zfpm_stats_t *s2,
			       zfpm_stats_t *result)
{
	const unsigned long *p1, *p2;
	unsigned long *result_p;
	int i, num_counters;

	p1 = (const unsigned long *)s1;
	p2 = (const unsigned long *)s2;
	result_p = (unsigned long *)result;

	num_counters = (sizeof(zfpm_stats_t) / sizeof(unsigned long));

	for (i = 0; i < num_counters; i++) {
		result_p[i] = p1[i] + p2[i];
	}
}

/*
 * zfpm_read_on
 */
static inline void zfpm_read_on(void)
{
	assert(!zfpm_g->t_read);
	assert(zfpm_g->sock >= 0);

	thread_add_read(zfpm_g->master, zfpm_read_cb, 0, zfpm_g->sock,
			&zfpm_g->t_read);
}

/*
 * zfpm_write_on
 */
static inline void zfpm_write_on(void)
{
	assert(!zfpm_g->t_write);
	assert(zfpm_g->sock >= 0);

	thread_add_write(zfpm_g->master, zfpm_write_cb, 0, zfpm_g->sock,
			 &zfpm_g->t_write);
}

/*
 * zfpm_read_off
 */
static inline void zfpm_read_off(void)
{
	THREAD_READ_OFF(zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void zfpm_write_off(void)
{
	THREAD_WRITE_OFF(zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int zfpm_conn_up_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	zfpm_g->t_conn_up = NULL;

	iter = &zfpm_g->t_conn_up_state.iter;

	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED) {
		zfpm_debug(
			"Connection not up anymore, conn_up thread aborting");
		zfpm_g->stats.t_conn_up_aborts++;
		goto done;
	}

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			zfpm_g->stats.t_conn_up_dests_processed++;
			zfpm_trigger_update(rnode, NULL);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_up_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_up = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_up);
		return 0;
	}

	zfpm_g->stats.t_conn_up_finishes++;

done:
	zfpm_rnodes_iter_cleanup(iter);
	return 0;
}

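/*
 * The walk above (and the conn_down walk below) is deliberately
 * incremental: when the event library indicates it is time to yield, the
 * iterator is paused and the callback reschedules itself with a 0 msec
 * timer, so walking a large RIB does not starve the rest of zebra.
 */
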
/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void zfpm_connection_up(const char *detail)
{
	assert(zfpm_g->sock >= 0);
	zfpm_read_on();
	zfpm_write_on();
	zfpm_set_state(ZFPM_STATE_ESTABLISHED, detail);

	/*
	 * Start thread to push existing routes to the FPM.
	 */
	assert(!zfpm_g->t_conn_up);

	zfpm_rnodes_iter_init(&zfpm_g->t_conn_up_state.iter);

	zfpm_debug("Starting conn_up thread");
	zfpm_g->t_conn_up = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_up);
	zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void zfpm_connect_check(void)
{
	int status;
	socklen_t slen;
	int ret;

	zfpm_read_off();
	zfpm_write_off();

	slen = sizeof(status);
	ret = getsockopt(zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *)&status,
			 &slen);

	if (ret >= 0 && status == 0) {
		zfpm_connection_up("async connect complete");
		return;
	}

	/*
	 * getsockopt() failed or indicated an error on the socket.
	 */
	close(zfpm_g->sock);
	zfpm_g->sock = -1;

	zfpm_start_connect_timer("getsockopt() after async connect failed");
	return;
}

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int zfpm_conn_down_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	assert(zfpm_g->state == ZFPM_STATE_IDLE);

	zfpm_g->t_conn_down = NULL;

	iter = &zfpm_g->t_conn_down_state.iter;

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
				TAILQ_REMOVE(&zfpm_g->dest_q, dest,
					     fpm_q_entries);
			}

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);

			zfpm_g->stats.t_conn_down_dests_processed++;

			/*
			 * Check if the dest should be deleted.
			 */
			rib_gc_dest(rnode);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_down_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_down = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_down);
		return 0;
	}

	zfpm_g->stats.t_conn_down_finishes++;
	zfpm_rnodes_iter_cleanup(iter);

	/*
	 * Start the process of connecting to the FPM again.
	 */
	zfpm_start_connect_timer("cleanup complete");
	return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void zfpm_connection_down(const char *detail)
{
	if (!detail)
		detail = "unknown";

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);

	zlog_info("connection to the FPM has gone down: %s", detail);

	zfpm_read_off();
	zfpm_write_off();

	stream_reset(zfpm_g->ibuf);
	stream_reset(zfpm_g->obuf);

	if (zfpm_g->sock >= 0) {
		close(zfpm_g->sock);
		zfpm_g->sock = -1;
	}

	/*
	 * Start thread to clean up state after the connection goes down.
	 */
	assert(!zfpm_g->t_conn_down);
	zfpm_debug("Starting conn_down thread");
	zfpm_rnodes_iter_init(&zfpm_g->t_conn_down_state.iter);
	zfpm_g->t_conn_down = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_down);
	zfpm_g->stats.t_conn_down_starts++;

	zfpm_set_state(ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int zfpm_read_cb(struct thread *thread)
{
	size_t already;
	struct stream *ibuf;
	uint16_t msg_len;
	fpm_msg_hdr_t *hdr;

	zfpm_g->stats.read_cb_calls++;
	zfpm_g->t_read = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	ibuf = zfpm_g->ibuf;

	already = stream_get_endp(ibuf);
	if (already < FPM_MSG_HDR_LEN) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock,
					FPM_MSG_HDR_LEN - already);
		if (nbyte == 0 || nbyte == -1) {
			if (nbyte == -1) {
				char buffer[1024];

				sprintf(buffer, "closed socket in read(%d): %s",
					errno, safe_strerror(errno));
				zfpm_connection_down(buffer);
			} else
				zfpm_connection_down("closed socket in read");
			return 0;
		}

		if (nbyte != (ssize_t)(FPM_MSG_HDR_LEN - already))
			goto done;

		already = FPM_MSG_HDR_LEN;
	}

	stream_set_getp(ibuf, 0);

	hdr = (fpm_msg_hdr_t *)stream_pnt(ibuf);

	if (!fpm_msg_hdr_ok(hdr)) {
		zfpm_connection_down("invalid message header");
		return 0;
	}

	msg_len = fpm_msg_len(hdr);

	/*
	 * Read out the rest of the packet.
	 */
	if (already < msg_len) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock, msg_len - already);

		if (nbyte == 0 || nbyte == -1) {
			if (nbyte == -1) {
				char buffer[1024];

				sprintf(buffer, "failed to read message(%d) %s",
					errno, safe_strerror(errno));
				zfpm_connection_down(buffer);
			} else
				zfpm_connection_down("failed to read message");
			return 0;
		}

		if (nbyte != (ssize_t)(msg_len - already))
			goto done;
	}

	zfpm_debug("Read out a full fpm message");

	/*
	 * Just throw it away for now.
	 */
	stream_reset(ibuf);

done:
	zfpm_read_on();
	return 0;
}

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int zfpm_writes_pending(void)
{

	/*
	 * Check if there is any data in the outbound buffer that has not
	 * been written to the socket yet.
	 */
	if (stream_get_endp(zfpm_g->obuf) - stream_get_getp(zfpm_g->obuf))
		return 1;

	/*
	 * Check if there are any prefixes on the outbound queue.
	 */
	if (!TAILQ_EMPTY(&zfpm_g->dest_q))
		return 1;

	return 0;
}

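/*
 * Outbound data flow, as implemented below: zfpm_trigger_update() queues a
 * rib_dest_t on dest_q, zfpm_build_updates() drains that queue into the
 * obuf stream as FPM messages, and zfpm_write_cb() pushes obuf out over
 * the socket, rescheduling itself while zfpm_writes_pending() reports more
 * work.
 */
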
/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int zfpm_encode_route(rib_dest_t *dest, struct route_entry *re,
				    char *in_buf, size_t in_buf_len,
				    fpm_msg_type_e *msg_type)
{
	size_t len;
#ifdef HAVE_NETLINK
	int cmd;
#endif
	len = 0;

	*msg_type = FPM_MSG_TYPE_NONE;

	switch (zfpm_g->message_format) {

	case ZFPM_MSG_FORMAT_PROTOBUF:
#ifdef HAVE_PROTOBUF
		len = zfpm_protobuf_encode_route(dest, re, (uint8_t *)in_buf,
						 in_buf_len);
		*msg_type = FPM_MSG_TYPE_PROTOBUF;
#endif
		break;

	case ZFPM_MSG_FORMAT_NETLINK:
#ifdef HAVE_NETLINK
		*msg_type = FPM_MSG_TYPE_NETLINK;
		cmd = re ? RTM_NEWROUTE : RTM_DELROUTE;
		len = zfpm_netlink_encode_route(cmd, dest, re, in_buf,
						in_buf_len);
		assert(fpm_msg_align(len) == len);
		*msg_type = FPM_MSG_TYPE_NETLINK;
#endif /* HAVE_NETLINK */
		break;

	default:
		break;
	}

	return len;
}

/*
 * zfpm_route_for_update
 *
 * Returns the re that is to be sent to the FPM for a given dest.
 */
struct route_entry *zfpm_route_for_update(rib_dest_t *dest)
{
	return dest->selected_fib;
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
static void zfpm_build_updates(void)
{
	struct stream *s;
	rib_dest_t *dest;
	unsigned char *buf, *data, *buf_end;
	size_t msg_len;
	size_t data_len;
	fpm_msg_hdr_t *hdr;
	struct route_entry *re;
	int is_add, write_msg;
	fpm_msg_type_e msg_type;

	s = zfpm_g->obuf;

	assert(stream_empty(s));

	do {

		/*
		 * Make sure there is enough space to write another message.
		 */
		if (STREAM_WRITEABLE(s) < FPM_MAX_MSG_LEN)
			break;

		buf = STREAM_DATA(s) + stream_get_endp(s);
		buf_end = buf + STREAM_WRITEABLE(s);

		dest = TAILQ_FIRST(&zfpm_g->dest_q);
		if (!dest)
			break;

		assert(CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM));

		hdr = (fpm_msg_hdr_t *)buf;
		hdr->version = FPM_PROTO_VERSION;

		data = fpm_msg_data(hdr);

		re = zfpm_route_for_update(dest);
		is_add = re ? 1 : 0;

		write_msg = 1;

		/*
		 * If this is a route deletion, and we have not sent the route
		 * to the FPM previously, skip it.
		 */
		if (!is_add && !CHECK_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM)) {
			write_msg = 0;
			zfpm_g->stats.nop_deletes_skipped++;
		}

		if (write_msg) {
			data_len = zfpm_encode_route(dest, re, (char *)data,
						     buf_end - data, &msg_type);

			assert(data_len);
			if (data_len) {
				hdr->msg_type = msg_type;
				msg_len = fpm_data_len_to_msg_len(data_len);
				hdr->msg_len = htons(msg_len);
				stream_forward_endp(s, msg_len);

				if (is_add)
					zfpm_g->stats.route_adds++;
				else
					zfpm_g->stats.route_dels++;
			}
		}

		/*
		 * Remove the dest from the queue, and reset the flag.
		 */
		UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);

		if (is_add) {
			SET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		} else {
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		}

		/*
		 * Delete the destination if necessary.
		 */
		if (rib_gc_dest(dest->rnode))
			zfpm_g->stats.dests_del_after_update++;

	} while (1);
}

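/*
 * The RIB_DEST_SENT_TO_FPM flag is what makes the delete path above safe:
 * it is set only after an add for the prefix has actually been encoded
 * into obuf, so deletes for prefixes the FPM never saw are skipped (and
 * counted in nop_deletes_skipped).
 */
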
/*
 * zfpm_write_cb
 */
static int zfpm_write_cb(struct thread *thread)
{
	struct stream *s;
	int num_writes;

	zfpm_g->stats.write_cb_calls++;
	zfpm_g->t_write = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	num_writes = 0;

	do {
		int bytes_to_write, bytes_written;

		s = zfpm_g->obuf;

		/*
		 * If the stream is empty, try fill it up with data.
		 */
		if (stream_empty(s)) {
			zfpm_build_updates();
		}

		bytes_to_write = stream_get_endp(s) - stream_get_getp(s);
		if (!bytes_to_write)
			break;

		bytes_written =
			write(zfpm_g->sock, stream_pnt(s), bytes_to_write);
		zfpm_g->stats.write_calls++;
		num_writes++;

		if (bytes_written < 0) {
			if (ERRNO_IO_RETRY(errno))
				break;

			zfpm_connection_down("failed to write to socket");
			return 0;
		}

		if (bytes_written != bytes_to_write) {

			/*
			 * Partial write.
			 */
			stream_forward_getp(s, bytes_written);
			zfpm_g->stats.partial_writes++;
			break;
		}

		/*
		 * We've written out the entire contents of the stream.
		 */
		stream_reset(s);

		if (num_writes >= ZFPM_MAX_WRITES_PER_RUN) {
			zfpm_g->stats.max_writes_hit++;
			break;
		}

		if (zfpm_thread_should_yield(thread)) {
			zfpm_g->stats.t_write_yields++;
			break;
		}
	} while (1);

	if (zfpm_writes_pending())
		zfpm_write_on();

	return 0;
}

/*
 * zfpm_connect_cb
 */
static int zfpm_connect_cb(struct thread *t)
{
	int sock, ret;
	struct sockaddr_in serv;

	zfpm_g->t_connect = NULL;
	assert(zfpm_g->state == ZFPM_STATE_ACTIVE);

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0) {
		zfpm_debug("Failed to create socket for connect(): %s",
			   strerror(errno));
		zfpm_g->stats.connect_no_sock++;
		return 0;
	}

	set_nonblocking(sock);

	/* Make server socket. */
	memset(&serv, 0, sizeof(serv));
	serv.sin_family = AF_INET;
	serv.sin_port = htons(zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
	serv.sin_len = sizeof(struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
	if (!zfpm_g->fpm_server)
		serv.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	else
		serv.sin_addr.s_addr = (zfpm_g->fpm_server);

	/*
	 * Connect to the FPM.
	 */
	zfpm_g->connect_calls++;
	zfpm_g->stats.connect_calls++;
	zfpm_g->last_connect_call_time = monotime(NULL);

	ret = connect(sock, (struct sockaddr *)&serv, sizeof(serv));
	if (ret >= 0) {
		zfpm_g->sock = sock;
		zfpm_connection_up("connect succeeded");
		return 1;
	}

	if (errno == EINPROGRESS) {
		zfpm_g->sock = sock;
		zfpm_read_on();
		zfpm_write_on();
		zfpm_set_state(ZFPM_STATE_CONNECTING,
			       "async connect in progress");
		return 0;
	}

	zlog_info("can't connect to FPM %d: %s", sock, safe_strerror(errno));
	close(sock);

	/*
	 * Restart timer for retrying connection.
	 */
	zfpm_start_connect_timer("connect() failed");
	return 0;
}

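/*
 * Because the socket is non-blocking, connect() usually returns
 * EINPROGRESS. In that case the module moves to ZFPM_STATE_CONNECTING and
 * relies on the read/write callbacks firing once the socket becomes
 * ready; they call zfpm_connect_check(), which uses getsockopt(SO_ERROR)
 * to decide whether the connection actually came up.
 */
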
/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void zfpm_set_state(zfpm_state_t state, const char *reason)
{
	zfpm_state_t cur_state = zfpm_g->state;

	if (!reason)
		reason = "Unknown";

	if (state == cur_state)
		return;

	zfpm_debug("beginning state transition %s -> %s. Reason: %s",
		   zfpm_state_to_str(cur_state), zfpm_state_to_str(state),
		   reason);

	switch (state) {

	case ZFPM_STATE_IDLE:
		assert(cur_state == ZFPM_STATE_ESTABLISHED);
		break;

	case ZFPM_STATE_ACTIVE:
		assert(cur_state == ZFPM_STATE_IDLE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->t_connect);
		break;

	case ZFPM_STATE_CONNECTING:
		assert(zfpm_g->sock);
		assert(cur_state == ZFPM_STATE_ACTIVE);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;

	case ZFPM_STATE_ESTABLISHED:
		assert(cur_state == ZFPM_STATE_ACTIVE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->sock);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;
	}

	zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long zfpm_calc_connect_delay(void)
{
	time_t elapsed;

	/*
	 * Return 0 if this is our first attempt to connect.
	 */
	if (zfpm_g->connect_calls == 0) {
		return 0;
	}

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_connect_call_time);

	if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
		return 0;
	}

	return ZFPM_CONNECT_RETRY_IVL - elapsed;
}

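/*
 * Example of the pacing above: with ZFPM_CONNECT_RETRY_IVL at 5 seconds,
 * if the previous connect() was attempted 2 seconds ago the next one is
 * scheduled 3 seconds out; if more than 5 seconds have already passed (or
 * no attempt has been made yet) it is scheduled immediately.
 */
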
/*
 * zfpm_start_connect_timer
 */
static void zfpm_start_connect_timer(const char *reason)
{
	long delay_secs;

	assert(!zfpm_g->t_connect);
	assert(zfpm_g->sock < 0);

	assert(zfpm_g->state == ZFPM_STATE_IDLE
	       || zfpm_g->state == ZFPM_STATE_ACTIVE
	       || zfpm_g->state == ZFPM_STATE_CONNECTING);

	delay_secs = zfpm_calc_connect_delay();
	zfpm_debug("scheduling connect in %ld seconds", delay_secs);

	thread_add_timer(zfpm_g->master, zfpm_connect_cb, 0, delay_secs,
			 &zfpm_g->t_connect);
	zfpm_set_state(ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int zfpm_is_enabled(void)
{
	return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int zfpm_conn_is_up(void)
{
	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
		return 0;

	assert(zfpm_g->sock >= 0);

	return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
static int zfpm_trigger_update(struct route_node *rn, const char *reason)
{
	rib_dest_t *dest;
	char buf[PREFIX_STRLEN];

	/*
	 * Ignore if the connection is down. We will update the FPM about
	 * all destinations once the connection comes up.
	 */
	if (!zfpm_conn_is_up())
		return 0;

	dest = rib_dest_from_rnode(rn);

	if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
		zfpm_g->stats.redundant_triggers++;
		return 0;
	}

	if (reason) {
		zfpm_debug("%s triggering update to FPM - Reason: %s",
			   prefix2str(&rn->p, buf, sizeof(buf)), reason);
	}

	SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
	TAILQ_INSERT_TAIL(&zfpm_g->dest_q, dest, fpm_q_entries);
	zfpm_g->stats.updates_triggered++;

	/*
	 * Make sure that writes are enabled.
	 */
	if (zfpm_g->t_write)
		return 0;

	zfpm_write_on();
	return 0;
}

/*
 * zfpm_stats_timer_cb
 */
static int zfpm_stats_timer_cb(struct thread *t)
{
	zfpm_g->t_stats = NULL;

	/*
	 * Remember the stats collected in the last interval for display
	 * purposes.
	 */
	zfpm_stats_copy(&zfpm_g->stats, &zfpm_g->last_ivl_stats);

	/*
	 * Add the current set of stats into the cumulative statistics.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &zfpm_g->cumulative_stats);

	/*
	 * Start collecting stats afresh over the next interval.
	 */
	zfpm_stats_reset(&zfpm_g->stats);

	zfpm_start_stats_timer();

	return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void zfpm_stop_stats_timer(void)
{
	if (!zfpm_g->t_stats)
		return;

	zfpm_debug("Stopping existing stats timer");
	THREAD_TIMER_OFF(zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
void zfpm_start_stats_timer(void)
{
	assert(!zfpm_g->t_stats);

	thread_add_timer(zfpm_g->master, zfpm_stats_timer_cb, 0,
			 ZFPM_STATS_IVL_SECS, &zfpm_g->t_stats);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter)                                                \
	do {                                                                   \
		vty_out(vty, "%-40s %10lu %16lu\n", #counter,                  \
			total_stats.counter, zfpm_g->last_ivl_stats.counter);  \
	} while (0)

/*
 * zfpm_show_stats
 */
static void zfpm_show_stats(struct vty *vty)
{
	zfpm_stats_t total_stats;
	time_t elapsed;

	vty_out(vty, "\n%-40s %10s Last %2d secs\n\n", "Counter", "Total",
		ZFPM_STATS_IVL_SECS);

	/*
	 * Compute the total stats up to this instant.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &total_stats);

	ZFPM_SHOW_STAT(connect_calls);
	ZFPM_SHOW_STAT(connect_no_sock);
	ZFPM_SHOW_STAT(read_cb_calls);
	ZFPM_SHOW_STAT(write_cb_calls);
	ZFPM_SHOW_STAT(write_calls);
	ZFPM_SHOW_STAT(partial_writes);
	ZFPM_SHOW_STAT(max_writes_hit);
	ZFPM_SHOW_STAT(t_write_yields);
	ZFPM_SHOW_STAT(nop_deletes_skipped);
	ZFPM_SHOW_STAT(route_adds);
	ZFPM_SHOW_STAT(route_dels);
	ZFPM_SHOW_STAT(updates_triggered);
	ZFPM_SHOW_STAT(redundant_triggers);
	ZFPM_SHOW_STAT(dests_del_after_update);
	ZFPM_SHOW_STAT(t_conn_down_starts);
	ZFPM_SHOW_STAT(t_conn_down_dests_processed);
	ZFPM_SHOW_STAT(t_conn_down_yields);
	ZFPM_SHOW_STAT(t_conn_down_finishes);
	ZFPM_SHOW_STAT(t_conn_up_starts);
	ZFPM_SHOW_STAT(t_conn_up_dests_processed);
	ZFPM_SHOW_STAT(t_conn_up_yields);
	ZFPM_SHOW_STAT(t_conn_up_aborts);
	ZFPM_SHOW_STAT(t_conn_up_finishes);

	if (!zfpm_g->last_stats_clear_time)
		return;

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_stats_clear_time);

	vty_out(vty, "\nStats were cleared %lu seconds ago\n",
		(unsigned long)elapsed);
}

/*
 * zfpm_clear_stats
 */
static void zfpm_clear_stats(struct vty *vty)
{
	if (!zfpm_is_enabled()) {
		vty_out(vty, "The FPM module is not enabled...\n");
		return;
	}

	zfpm_stats_reset(&zfpm_g->stats);
	zfpm_stats_reset(&zfpm_g->last_ivl_stats);
	zfpm_stats_reset(&zfpm_g->cumulative_stats);

	zfpm_stop_stats_timer();
	zfpm_start_stats_timer();

	zfpm_g->last_stats_clear_time = monotime(NULL);

	vty_out(vty, "Cleared FPM stats\n");
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       ZEBRA_STR
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_show_stats(vty);
	return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       ZEBRA_STR
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_clear_stats(vty);
	return CMD_SUCCESS;
}

/*
 * update fpm connection information
 */
DEFUN ( fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port (1-65535)",
       "fpm connection remote ip and port\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{

	in_addr_t fpm_server;
	uint32_t port_no;

	fpm_server = inet_addr(argv[3]->arg);
	if (fpm_server == INADDR_NONE)
		return CMD_ERR_INCOMPLETE;

	port_no = atoi(argv[5]->arg);
	if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
		return CMD_ERR_INCOMPLETE;

	zfpm_g->fpm_server = fpm_server;
	zfpm_g->fpm_port = port_no;


	return CMD_SUCCESS;
}

DEFUN ( no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port (1-65535)",
       "fpm connection remote ip and port\n"
       "Connection\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{
	if (zfpm_g->fpm_server != inet_addr(argv[4]->arg)
	    || zfpm_g->fpm_port != atoi(argv[6]->arg))
		return CMD_ERR_NO_MATCH;

	zfpm_g->fpm_server = FPM_DEFAULT_IP;
	zfpm_g->fpm_port = FPM_DEFAULT_PORT;

	return CMD_SUCCESS;
}

/*
 * zfpm_init_message_format
 */
static inline void zfpm_init_message_format(const char *format)
{
	int have_netlink, have_protobuf;

#ifdef HAVE_NETLINK
	have_netlink = 1;
#else
	have_netlink = 0;
#endif

#ifdef HAVE_PROTOBUF
	have_protobuf = 1;
#else
	have_protobuf = 0;
#endif

	zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;

	if (!format) {
		if (have_netlink) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		} else if (have_protobuf) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		}
		return;
	}

	if (!strcmp("netlink", format)) {
		if (!have_netlink) {
			zlog_err("FPM netlink message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		return;
	}

	if (!strcmp("protobuf", format)) {
		if (!have_protobuf) {
			zlog_err(
				"FPM protobuf message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		return;
	}

	zlog_warn("Unknown fpm format '%s'", format);
}

/**
 * fpm_remote_srv_write
 *
 * Write the remote FPM connection configuration to the vty.
 *
 * Returns ZERO on success.
 */

static int fpm_remote_srv_write(struct vty *vty)
{
	struct in_addr in;

	in.s_addr = zfpm_g->fpm_server;

	if ((zfpm_g->fpm_server != FPM_DEFAULT_IP
	     && zfpm_g->fpm_server != INADDR_ANY)
	    || (zfpm_g->fpm_port != FPM_DEFAULT_PORT && zfpm_g->fpm_port != 0))
		vty_out(vty, "fpm connection ip %s port %d\n", inet_ntoa(in),
			zfpm_g->fpm_port);

	return 0;
}


/* Zebra node */
static struct cmd_node zebra_node = {ZEBRA_NODE, "", 1};


/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] port port at which FPM is running.
 * @param[in] enable TRUE if the zebra FPM module should be enabled
 * @param[in] format to use to talk to the FPM. Can be 'netlink' or 'protobuf'.
 *
 * Returns TRUE on success.
 */
static int zfpm_init(struct thread_master *master)
{
	int enable = 1;
	uint16_t port = 0;
	const char *format = THIS_MODULE->load_args;

	memset(zfpm_g, 0, sizeof(*zfpm_g));
	zfpm_g->master = master;
	TAILQ_INIT(&zfpm_g->dest_q);
	zfpm_g->sock = -1;
	zfpm_g->state = ZFPM_STATE_IDLE;

	zfpm_stats_init(&zfpm_g->stats);
	zfpm_stats_init(&zfpm_g->last_ivl_stats);
	zfpm_stats_init(&zfpm_g->cumulative_stats);

	install_node(&zebra_node, fpm_remote_srv_write);
	install_element(ENABLE_NODE, &show_zebra_fpm_stats_cmd);
	install_element(ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
	install_element(CONFIG_NODE, &fpm_remote_ip_cmd);
	install_element(CONFIG_NODE, &no_fpm_remote_ip_cmd);

	zfpm_init_message_format(format);

	/*
	 * Disable FPM interface if no suitable format is available.
	 */
	if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
		enable = 0;

	zfpm_g->enabled = enable;

	if (!zfpm_g->fpm_server)
		zfpm_g->fpm_server = FPM_DEFAULT_IP;

	if (!port)
		port = FPM_DEFAULT_PORT;

	zfpm_g->fpm_port = port;

	zfpm_g->obuf = stream_new(ZFPM_OBUF_SIZE);
	zfpm_g->ibuf = stream_new(ZFPM_IBUF_SIZE);

	zfpm_start_stats_timer();
	zfpm_start_connect_timer("initialized");
	return 0;
}

static int zebra_fpm_module_init(void)
{
	hook_register(rib_update, zfpm_trigger_update);
	hook_register(frr_late_init, zfpm_init);
	return 0;
}

FRR_MODULE_SETUP(.name = "zebra_fpm", .version = FRR_VERSION,
		 .description = "zebra FPM (Forwarding Plane Manager) module",
		 .init = zebra_fpm_module_init, )