/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <zebra.h>

#include "log.h"
#include "libfrr.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"
#include "version.h"

#include "zebra/rib.h"
#include "zebra/zserv.h"
#include "zebra/zebra_ns.h"
#include "zebra/zebra_vrf.h"
#include "zebra/zebra_errors.h"

#include "fpm/fpm.h"
#include "zebra_fpm_private.h"

/*
 * Interval at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_ {
	rib_tables_iter_t tables_iter;
	route_table_iter_t iter;
} zfpm_rnodes_iter_t;

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
	unsigned long connect_calls;
	unsigned long connect_no_sock;

	unsigned long read_cb_calls;

	unsigned long write_cb_calls;
	unsigned long write_calls;
	unsigned long partial_writes;
	unsigned long max_writes_hit;
	unsigned long t_write_yields;

	unsigned long nop_deletes_skipped;
	unsigned long route_adds;
	unsigned long route_dels;

	unsigned long updates_triggered;
	unsigned long redundant_triggers;

	unsigned long dests_del_after_update;

	unsigned long t_conn_down_starts;
	unsigned long t_conn_down_dests_processed;
	unsigned long t_conn_down_yields;
	unsigned long t_conn_down_finishes;

	unsigned long t_conn_up_starts;
	unsigned long t_conn_up_dests_processed;
	unsigned long t_conn_up_yields;
	unsigned long t_conn_up_aborts;
	unsigned long t_conn_up_finishes;

} zfpm_stats_t;

/*
 * States for the FPM state machine.
 */
typedef enum {

	/*
	 * In this state we are not yet ready to connect to the FPM. This
	 * can happen when this module is disabled, or if we're cleaning up
	 * after a connection has gone down.
	 */
	ZFPM_STATE_IDLE,

	/*
	 * Ready to talk to the FPM and periodically trying to connect to
	 * it.
	 */
	ZFPM_STATE_ACTIVE,

	/*
	 * In the middle of bringing up a TCP connection. Specifically,
	 * waiting for a connect() call to complete asynchronously.
	 */
	ZFPM_STATE_CONNECTING,

	/*
	 * TCP connection to the FPM is up.
	 */
	ZFPM_STATE_ESTABLISHED

} zfpm_state_t;

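/*
 * A sketch of the transitions allowed between these states, derived from
 * the assertions in zfpm_set_state() below (see that function for the
 * authoritative checks):
 *
 *   IDLE        -> ACTIVE             connect timer scheduled
 *   ACTIVE      -> CONNECTING         async connect() in progress
 *   CONNECTING  -> ACTIVE             connect failed, retry scheduled
 *   ACTIVE/CONNECTING -> ESTABLISHED  connection to the FPM is up
 *   ESTABLISHED -> IDLE               connection went down, cleanup running
 */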
/*
 * Message format to be used to communicate with the FPM.
 */
typedef enum {
	ZFPM_MSG_FORMAT_NONE,
	ZFPM_MSG_FORMAT_NETLINK,
	ZFPM_MSG_FORMAT_PROTOBUF,
} zfpm_msg_format_e;
/*
 * Globals.
 */
typedef struct zfpm_glob_t_ {

	/*
	 * True if the FPM module has been enabled.
	 */
	int enabled;

	/*
	 * Message format to be used to communicate with the fpm.
	 */
	zfpm_msg_format_e message_format;

	struct thread_master *master;

	zfpm_state_t state;

	in_addr_t fpm_server;
	/*
	 * Port on which the FPM is running.
	 */
	int fpm_port;

	/*
	 * List of rib_dest_t structures to be processed
	 */
	TAILQ_HEAD(zfpm_dest_q, rib_dest_t_) dest_q;

	/*
	 * Stream socket to the FPM.
	 */
	int sock;

	/*
	 * Buffers for messages to/from the FPM.
	 */
	struct stream *obuf;
	struct stream *ibuf;

	/*
	 * Threads for I/O.
	 */
	struct thread *t_connect;
	struct thread *t_write;
	struct thread *t_read;

	/*
	 * Thread to clean up after the TCP connection to the FPM goes down
	 * and the state that belongs to it.
	 */
	struct thread *t_conn_down;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_down_state;

	/*
	 * Thread to take actions once the TCP conn to the FPM comes up, and
	 * the state that belongs to it.
	 */
	struct thread *t_conn_up;

	struct {
		zfpm_rnodes_iter_t iter;
	} t_conn_up_state;

	unsigned long connect_calls;
	time_t last_connect_call_time;

	/*
	 * Stats from the start of the current statistics interval up to
	 * now. These are the counters we typically update in the code.
	 */
	zfpm_stats_t stats;

	/*
	 * Statistics that were gathered in the last collection interval.
	 */
	zfpm_stats_t last_ivl_stats;

	/*
	 * Cumulative stats from the last clear to the start of the current
	 * statistics interval.
	 */
	zfpm_stats_t cumulative_stats;

	/*
	 * Stats interval timer.
	 */
	struct thread *t_stats;

	/*
	 * If non-zero, the last time when statistics were cleared.
	 */
	time_t last_stats_clear_time;

} zfpm_glob_t;

static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_trigger_update(struct route_node *rn, const char *reason);

static int zfpm_read_cb(struct thread *thread);
static int zfpm_write_cb(struct thread *thread);

static void zfpm_set_state(zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer(const char *reason);
static void zfpm_start_stats_timer(void);

/*
 * zfpm_thread_should_yield
 */
static inline int zfpm_thread_should_yield(struct thread *t)
{
	return thread_should_yield(t);
}

/*
 * zfpm_state_to_str
 */
static const char *zfpm_state_to_str(zfpm_state_t state)
{
	switch (state) {

	case ZFPM_STATE_IDLE:
		return "idle";

	case ZFPM_STATE_ACTIVE:
		return "active";

	case ZFPM_STATE_CONNECTING:
		return "connecting";

	case ZFPM_STATE_ESTABLISHED:
		return "established";

	default:
		return "unknown";
	}
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t zfpm_get_elapsed_time(time_t reference)
{
	time_t now;

	now = monotime(NULL);

	if (now < reference) {
		assert(0);
		return 0;
	}

	return now - reference;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void zfpm_rnodes_iter_init(zfpm_rnodes_iter_t *iter)
{
	memset(iter, 0, sizeof(*iter));
	rib_tables_iter_init(&iter->tables_iter);

	/*
	 * This is a hack, but it makes implementing 'next' easier by
	 * ensuring that route_table_iter_next() will return NULL the first
	 * time we call it.
	 */
	route_table_iter_init(&iter->iter, NULL);
	route_table_iter_cleanup(&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *zfpm_rnodes_iter_next(zfpm_rnodes_iter_t *iter)
{
	struct route_node *rn;
	struct route_table *table;

	while (1) {
		rn = route_table_iter_next(&iter->iter);
		if (rn)
			return rn;

		/*
		 * We've made our way through this table, go to the next one.
		 */
		route_table_iter_cleanup(&iter->iter);

		table = rib_tables_iter_next(&iter->tables_iter);

		if (!table)
			return NULL;

		route_table_iter_init(&iter->iter, table);
	}

	return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void zfpm_rnodes_iter_pause(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_pause(&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void zfpm_rnodes_iter_cleanup(zfpm_rnodes_iter_t *iter)
{
	route_table_iter_cleanup(&iter->iter);
	rib_tables_iter_cleanup(&iter->tables_iter);
}

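/*
 * A minimal usage sketch for the iterator above, assuming the caller owns
 * the zfpm_rnodes_iter_t. This mirrors how zfpm_conn_up_thread_cb() and
 * zfpm_conn_down_thread_cb() below walk all route nodes:
 *
 *   zfpm_rnodes_iter_t iter;
 *   struct route_node *rn;
 *
 *   zfpm_rnodes_iter_init(&iter);
 *   while ((rn = zfpm_rnodes_iter_next(&iter))) {
 *           ... process rn; call zfpm_rnodes_iter_pause() to yield ...
 *   }
 *   zfpm_rnodes_iter_cleanup(&iter);
 */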
/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void zfpm_stats_init(zfpm_stats_t *stats)
{
	memset(stats, 0, sizeof(*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void zfpm_stats_reset(zfpm_stats_t *stats)
{
	zfpm_stats_init(stats);
}

/*
 * zfpm_stats_copy
 */
static inline void zfpm_stats_copy(const zfpm_stats_t *src, zfpm_stats_t *dest)
{
	memcpy(dest, src, sizeof(*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void zfpm_stats_compose(const zfpm_stats_t *s1, const zfpm_stats_t *s2,
			       zfpm_stats_t *result)
{
	const unsigned long *p1, *p2;
	unsigned long *result_p;
	int i, num_counters;

	p1 = (const unsigned long *)s1;
	p2 = (const unsigned long *)s2;
	result_p = (unsigned long *)result;

	num_counters = (sizeof(zfpm_stats_t) / sizeof(unsigned long));

	for (i = 0; i < num_counters; i++) {
		result_p[i] = p1[i] + p2[i];
	}
}

/*
 * zfpm_read_on
 */
static inline void zfpm_read_on(void)
{
	assert(!zfpm_g->t_read);
	assert(zfpm_g->sock >= 0);

	thread_add_read(zfpm_g->master, zfpm_read_cb, 0, zfpm_g->sock,
			&zfpm_g->t_read);
}

/*
 * zfpm_write_on
 */
static inline void zfpm_write_on(void)
{
	assert(!zfpm_g->t_write);
	assert(zfpm_g->sock >= 0);

	thread_add_write(zfpm_g->master, zfpm_write_cb, 0, zfpm_g->sock,
			 &zfpm_g->t_write);
}

/*
 * zfpm_read_off
 */
static inline void zfpm_read_off(void)
{
	THREAD_READ_OFF(zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void zfpm_write_off(void)
{
	THREAD_WRITE_OFF(zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int zfpm_conn_up_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	zfpm_g->t_conn_up = NULL;

	iter = &zfpm_g->t_conn_up_state.iter;

	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED) {
		zfpm_debug(
			"Connection not up anymore, conn_up thread aborting");
		zfpm_g->stats.t_conn_up_aborts++;
		goto done;
	}

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			zfpm_g->stats.t_conn_up_dests_processed++;
			zfpm_trigger_update(rnode, NULL);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_up_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_up = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_up);
		return 0;
	}

	zfpm_g->stats.t_conn_up_finishes++;

done:
	zfpm_rnodes_iter_cleanup(iter);
	return 0;
}

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void zfpm_connection_up(const char *detail)
{
	assert(zfpm_g->sock >= 0);
	zfpm_read_on();
	zfpm_write_on();
	zfpm_set_state(ZFPM_STATE_ESTABLISHED, detail);

	/*
	 * Start thread to push existing routes to the FPM.
	 */
	assert(!zfpm_g->t_conn_up);

	zfpm_rnodes_iter_init(&zfpm_g->t_conn_up_state.iter);

	zfpm_debug("Starting conn_up thread");
	zfpm_g->t_conn_up = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_up_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_up);
	zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void zfpm_connect_check(void)
{
	int status;
	socklen_t slen;
	int ret;

	zfpm_read_off();
	zfpm_write_off();

	slen = sizeof(status);
	ret = getsockopt(zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *)&status,
			 &slen);

	if (ret >= 0 && status == 0) {
		zfpm_connection_up("async connect complete");
		return;
	}

	/*
	 * getsockopt() failed or indicated an error on the socket.
	 */
	close(zfpm_g->sock);
	zfpm_g->sock = -1;

	zfpm_start_connect_timer("getsockopt() after async connect failed");
	return;
}

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int zfpm_conn_down_thread_cb(struct thread *thread)
{
	struct route_node *rnode;
	zfpm_rnodes_iter_t *iter;
	rib_dest_t *dest;

	assert(zfpm_g->state == ZFPM_STATE_IDLE);

	zfpm_g->t_conn_down = NULL;

	iter = &zfpm_g->t_conn_down_state.iter;

	while ((rnode = zfpm_rnodes_iter_next(iter))) {
		dest = rib_dest_from_rnode(rnode);

		if (dest) {
			if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
				TAILQ_REMOVE(&zfpm_g->dest_q, dest,
					     fpm_q_entries);
			}

			UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);

			zfpm_g->stats.t_conn_down_dests_processed++;

			/*
			 * Check if the dest should be deleted.
			 */
			rib_gc_dest(rnode);
		}

		/*
		 * Yield if need be.
		 */
		if (!zfpm_thread_should_yield(thread))
			continue;

		zfpm_g->stats.t_conn_down_yields++;
		zfpm_rnodes_iter_pause(iter);
		zfpm_g->t_conn_down = NULL;
		thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb,
				      NULL, 0, &zfpm_g->t_conn_down);
		return 0;
	}

	zfpm_g->stats.t_conn_down_finishes++;
	zfpm_rnodes_iter_cleanup(iter);

	/*
	 * Start the process of connecting to the FPM again.
	 */
	zfpm_start_connect_timer("cleanup complete");
	return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void zfpm_connection_down(const char *detail)
{
	if (!detail)
		detail = "unknown";

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);

	zlog_info("connection to the FPM has gone down: %s", detail);

	zfpm_read_off();
	zfpm_write_off();

	stream_reset(zfpm_g->ibuf);
	stream_reset(zfpm_g->obuf);

	if (zfpm_g->sock >= 0) {
		close(zfpm_g->sock);
		zfpm_g->sock = -1;
	}

	/*
	 * Start thread to clean up state after the connection goes down.
	 */
	assert(!zfpm_g->t_conn_down);
	zfpm_debug("Starting conn_down thread");
	zfpm_rnodes_iter_init(&zfpm_g->t_conn_down_state.iter);
	zfpm_g->t_conn_down = NULL;
	thread_add_timer_msec(zfpm_g->master, zfpm_conn_down_thread_cb, NULL, 0,
			      &zfpm_g->t_conn_down);
	zfpm_g->stats.t_conn_down_starts++;

	zfpm_set_state(ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int zfpm_read_cb(struct thread *thread)
{
	size_t already;
	struct stream *ibuf;
	uint16_t msg_len;
	fpm_msg_hdr_t *hdr;

	zfpm_g->stats.read_cb_calls++;
	zfpm_g->t_read = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	ibuf = zfpm_g->ibuf;

	already = stream_get_endp(ibuf);
	if (already < FPM_MSG_HDR_LEN) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock,
					FPM_MSG_HDR_LEN - already);
		if (nbyte == 0 || nbyte == -1) {
			if (nbyte == -1) {
				char buffer[1024];

				sprintf(buffer, "closed socket in read(%d): %s",
					errno, safe_strerror(errno));
				zfpm_connection_down(buffer);
			} else
				zfpm_connection_down("closed socket in read");
			return 0;
		}

		if (nbyte != (ssize_t)(FPM_MSG_HDR_LEN - already))
			goto done;

		already = FPM_MSG_HDR_LEN;
	}

	stream_set_getp(ibuf, 0);

	hdr = (fpm_msg_hdr_t *)stream_pnt(ibuf);

	if (!fpm_msg_hdr_ok(hdr)) {
		zfpm_connection_down("invalid message header");
		return 0;
	}

	msg_len = fpm_msg_len(hdr);

	/*
	 * Read out the rest of the packet.
	 */
	if (already < msg_len) {
		ssize_t nbyte;

		nbyte = stream_read_try(ibuf, zfpm_g->sock, msg_len - already);

		if (nbyte == 0 || nbyte == -1) {
			if (nbyte == -1) {
				char buffer[1024];

				sprintf(buffer, "failed to read message(%d) %s",
					errno, safe_strerror(errno));
				zfpm_connection_down(buffer);
			} else
				zfpm_connection_down("failed to read message");
			return 0;
		}

		if (nbyte != (ssize_t)(msg_len - already))
			goto done;
	}

	zfpm_debug("Read out a full fpm message");

	/*
	 * Just throw it away for now.
	 */
	stream_reset(ibuf);

done:
	zfpm_read_on();
	return 0;
}

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int zfpm_writes_pending(void)
{

	/*
	 * Check if there is any data in the outbound buffer that has not
	 * been written to the socket yet.
	 */
	if (stream_get_endp(zfpm_g->obuf) - stream_get_getp(zfpm_g->obuf))
		return 1;

	/*
	 * Check if there are any prefixes on the outbound queue.
	 */
	if (!TAILQ_EMPTY(&zfpm_g->dest_q))
		return 1;

	return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int zfpm_encode_route(rib_dest_t *dest, struct route_entry *re,
				    char *in_buf, size_t in_buf_len,
				    fpm_msg_type_e *msg_type)
{
	size_t len;
#ifdef HAVE_NETLINK
	int cmd;
#endif
	len = 0;

	*msg_type = FPM_MSG_TYPE_NONE;

	switch (zfpm_g->message_format) {

	case ZFPM_MSG_FORMAT_PROTOBUF:
#ifdef HAVE_PROTOBUF
		len = zfpm_protobuf_encode_route(dest, re, (uint8_t *)in_buf,
						 in_buf_len);
		*msg_type = FPM_MSG_TYPE_PROTOBUF;
#endif
		break;

	case ZFPM_MSG_FORMAT_NETLINK:
#ifdef HAVE_NETLINK
		*msg_type = FPM_MSG_TYPE_NETLINK;
		cmd = re ? RTM_NEWROUTE : RTM_DELROUTE;
		len = zfpm_netlink_encode_route(cmd, dest, re, in_buf,
						in_buf_len);
		assert(fpm_msg_align(len) == len);
		*msg_type = FPM_MSG_TYPE_NETLINK;
#endif /* HAVE_NETLINK */
		break;

	default:
		break;
	}

	return len;
}

/*
 * zfpm_route_for_update
 *
 * Returns the re that is to be sent to the FPM for a given dest.
 */
struct route_entry *zfpm_route_for_update(rib_dest_t *dest)
{
	return dest->selected_fib;
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
static void zfpm_build_updates(void)
{
	struct stream *s;
	rib_dest_t *dest;
	unsigned char *buf, *data, *buf_end;
	size_t msg_len;
	size_t data_len;
	fpm_msg_hdr_t *hdr;
	struct route_entry *re;
	int is_add, write_msg;
	fpm_msg_type_e msg_type;

	s = zfpm_g->obuf;

	assert(stream_empty(s));

	do {

		/*
		 * Make sure there is enough space to write another message.
		 */
		if (STREAM_WRITEABLE(s) < FPM_MAX_MSG_LEN)
			break;

		buf = STREAM_DATA(s) + stream_get_endp(s);
		buf_end = buf + STREAM_WRITEABLE(s);

		dest = TAILQ_FIRST(&zfpm_g->dest_q);
		if (!dest)
			break;

		assert(CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM));

		hdr = (fpm_msg_hdr_t *)buf;
		hdr->version = FPM_PROTO_VERSION;

		data = fpm_msg_data(hdr);

		re = zfpm_route_for_update(dest);
		is_add = re ? 1 : 0;

		write_msg = 1;

		/*
		 * If this is a route deletion, and we have not sent the
		 * route to the FPM previously, skip it.
		 */
		if (!is_add && !CHECK_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM)) {
			write_msg = 0;
			zfpm_g->stats.nop_deletes_skipped++;
		}

		if (write_msg) {
			data_len = zfpm_encode_route(dest, re, (char *)data,
						     buf_end - data, &msg_type);

			assert(data_len);
			if (data_len) {
				hdr->msg_type = msg_type;
				msg_len = fpm_data_len_to_msg_len(data_len);
				hdr->msg_len = htons(msg_len);
				stream_forward_endp(s, msg_len);

				if (is_add)
					zfpm_g->stats.route_adds++;
				else
					zfpm_g->stats.route_dels++;
			}
		}

		/*
		 * Remove the dest from the queue, and reset the flag.
		 */
		UNSET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
		TAILQ_REMOVE(&zfpm_g->dest_q, dest, fpm_q_entries);

		if (is_add) {
			SET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		} else {
			UNSET_FLAG(dest->flags, RIB_DEST_SENT_TO_FPM);
		}

		/*
		 * Delete the destination if necessary.
		 */
		if (rib_gc_dest(dest->rnode))
			zfpm_g->stats.dests_del_after_update++;

	} while (1);
}

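/*
 * A rough sketch of the framing zfpm_build_updates() produces in the
 * outbound stream (the exact field layout is defined in fpm/fpm.h; this
 * only illustrates how the header fields set above relate to the payload):
 *
 *   fpm_msg_hdr_t                        payload
 *   +---------+----------+---------+    +-----------------------------+
 *   | version | msg_type | msg_len |    | netlink or protobuf encoded |
 *   +---------+----------+---------+    | route for this dest         |
 *                                       +-----------------------------+
 *
 * msg_len is fpm_data_len_to_msg_len(data_len), i.e. the header plus the
 * payload with any alignment required by fpm.h, in network byte order.
 */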
/*
 * zfpm_write_cb
 */
static int zfpm_write_cb(struct thread *thread)
{
	struct stream *s;
	int num_writes;

	zfpm_g->stats.write_cb_calls++;
	zfpm_g->t_write = NULL;

	/*
	 * Check if async connect is now done.
	 */
	if (zfpm_g->state == ZFPM_STATE_CONNECTING) {
		zfpm_connect_check();
		return 0;
	}

	assert(zfpm_g->state == ZFPM_STATE_ESTABLISHED);
	assert(zfpm_g->sock >= 0);

	num_writes = 0;

	do {
		int bytes_to_write, bytes_written;

		s = zfpm_g->obuf;

		/*
		 * If the stream is empty, try to fill it up with data.
		 */
		if (stream_empty(s)) {
			zfpm_build_updates();
		}

		bytes_to_write = stream_get_endp(s) - stream_get_getp(s);
		if (!bytes_to_write)
			break;

		bytes_written =
			write(zfpm_g->sock, stream_pnt(s), bytes_to_write);
		zfpm_g->stats.write_calls++;
		num_writes++;

		if (bytes_written < 0) {
			if (ERRNO_IO_RETRY(errno))
				break;

			zfpm_connection_down("failed to write to socket");
			return 0;
		}

		if (bytes_written != bytes_to_write) {

			/*
			 * Partial write.
			 */
			stream_forward_getp(s, bytes_written);
			zfpm_g->stats.partial_writes++;
			break;
		}

		/*
		 * We've written out the entire contents of the stream.
		 */
		stream_reset(s);

		if (num_writes >= ZFPM_MAX_WRITES_PER_RUN) {
			zfpm_g->stats.max_writes_hit++;
			break;
		}

		if (zfpm_thread_should_yield(thread)) {
			zfpm_g->stats.t_write_yields++;
			break;
		}
	} while (1);

	if (zfpm_writes_pending())
		zfpm_write_on();

	return 0;
}

/*
 * zfpm_connect_cb
 */
static int zfpm_connect_cb(struct thread *t)
{
	int sock, ret;
	struct sockaddr_in serv;

	zfpm_g->t_connect = NULL;
	assert(zfpm_g->state == ZFPM_STATE_ACTIVE);

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0) {
		zfpm_debug("Failed to create socket for connect(): %s",
			   strerror(errno));
		zfpm_g->stats.connect_no_sock++;
		return 0;
	}

	set_nonblocking(sock);

	/* Make server socket. */
	memset(&serv, 0, sizeof(serv));
	serv.sin_family = AF_INET;
	serv.sin_port = htons(zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
	serv.sin_len = sizeof(struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
	if (!zfpm_g->fpm_server)
		serv.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	else
		serv.sin_addr.s_addr = (zfpm_g->fpm_server);

	/*
	 * Connect to the FPM.
	 */
	zfpm_g->connect_calls++;
	zfpm_g->stats.connect_calls++;
	zfpm_g->last_connect_call_time = monotime(NULL);

	ret = connect(sock, (struct sockaddr *)&serv, sizeof(serv));
	if (ret >= 0) {
		zfpm_g->sock = sock;
		zfpm_connection_up("connect succeeded");
		return 1;
	}

	if (errno == EINPROGRESS) {
		zfpm_g->sock = sock;
		zfpm_read_on();
		zfpm_write_on();
		zfpm_set_state(ZFPM_STATE_CONNECTING,
			       "async connect in progress");
		return 0;
	}

	zlog_info("can't connect to FPM %d: %s", sock, safe_strerror(errno));
	close(sock);

	/*
	 * Restart timer for retrying connection.
	 */
	zfpm_start_connect_timer("connect() failed");
	return 0;
}

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void zfpm_set_state(zfpm_state_t state, const char *reason)
{
	zfpm_state_t cur_state = zfpm_g->state;

	if (!reason)
		reason = "Unknown";

	if (state == cur_state)
		return;

	zfpm_debug("beginning state transition %s -> %s. Reason: %s",
		   zfpm_state_to_str(cur_state), zfpm_state_to_str(state),
		   reason);

	switch (state) {

	case ZFPM_STATE_IDLE:
		assert(cur_state == ZFPM_STATE_ESTABLISHED);
		break;

	case ZFPM_STATE_ACTIVE:
		assert(cur_state == ZFPM_STATE_IDLE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->t_connect);
		break;

	case ZFPM_STATE_CONNECTING:
		assert(zfpm_g->sock);
		assert(cur_state == ZFPM_STATE_ACTIVE);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;

	case ZFPM_STATE_ESTABLISHED:
		assert(cur_state == ZFPM_STATE_ACTIVE
		       || cur_state == ZFPM_STATE_CONNECTING);
		assert(zfpm_g->sock);
		assert(zfpm_g->t_read);
		assert(zfpm_g->t_write);
		break;
	}

	zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long zfpm_calc_connect_delay(void)
{
	time_t elapsed;

	/*
	 * Return 0 if this is our first attempt to connect.
	 */
	if (zfpm_g->connect_calls == 0) {
		return 0;
	}

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_connect_call_time);

	if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
		return 0;
	}

	return ZFPM_CONNECT_RETRY_IVL - elapsed;
}

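/*
 * For example, with ZFPM_CONNECT_RETRY_IVL at 5 seconds: the very first
 * connect attempt is scheduled immediately (delay 0); if the previous
 * connect() call was made 2 seconds ago, the next attempt is delayed by
 * 5 - 2 = 3 seconds; once 5 or more seconds have elapsed, the delay is 0.
 */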
/*
 * zfpm_start_connect_timer
 */
static void zfpm_start_connect_timer(const char *reason)
{
	long delay_secs;

	assert(!zfpm_g->t_connect);
	assert(zfpm_g->sock < 0);

	assert(zfpm_g->state == ZFPM_STATE_IDLE
	       || zfpm_g->state == ZFPM_STATE_ACTIVE
	       || zfpm_g->state == ZFPM_STATE_CONNECTING);

	delay_secs = zfpm_calc_connect_delay();
	zfpm_debug("scheduling connect in %ld seconds", delay_secs);

	thread_add_timer(zfpm_g->master, zfpm_connect_cb, 0, delay_secs,
			 &zfpm_g->t_connect);
	zfpm_set_state(ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int zfpm_is_enabled(void)
{
	return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int zfpm_conn_is_up(void)
{
	if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
		return 0;

	assert(zfpm_g->sock >= 0);

	return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
static int zfpm_trigger_update(struct route_node *rn, const char *reason)
{
	rib_dest_t *dest;
	char buf[PREFIX_STRLEN];

	/*
	 * Ignore if the connection is down. We will update the FPM about
	 * all destinations once the connection comes up.
	 */
	if (!zfpm_conn_is_up())
		return 0;

	dest = rib_dest_from_rnode(rn);

	if (CHECK_FLAG(dest->flags, RIB_DEST_UPDATE_FPM)) {
		zfpm_g->stats.redundant_triggers++;
		return 0;
	}

	if (reason) {
		zfpm_debug("%s triggering update to FPM - Reason: %s",
			   prefix2str(&rn->p, buf, sizeof(buf)), reason);
	}

	SET_FLAG(dest->flags, RIB_DEST_UPDATE_FPM);
	TAILQ_INSERT_TAIL(&zfpm_g->dest_q, dest, fpm_q_entries);
	zfpm_g->stats.updates_triggered++;

	/*
	 * Make sure that writes are enabled.
	 */
	if (zfpm_g->t_write)
		return 0;

	zfpm_write_on();
	return 0;
}

/*
 * zfpm_stats_timer_cb
 */
static int zfpm_stats_timer_cb(struct thread *t)
{
	zfpm_g->t_stats = NULL;

	/*
	 * Remember the stats collected in the last interval for display
	 * purposes.
	 */
	zfpm_stats_copy(&zfpm_g->stats, &zfpm_g->last_ivl_stats);

	/*
	 * Add the current set of stats into the cumulative statistics.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &zfpm_g->cumulative_stats);

	/*
	 * Start collecting stats afresh over the next interval.
	 */
	zfpm_stats_reset(&zfpm_g->stats);

	zfpm_start_stats_timer();

	return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void zfpm_stop_stats_timer(void)
{
	if (!zfpm_g->t_stats)
		return;

	zfpm_debug("Stopping existing stats timer");
	THREAD_TIMER_OFF(zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
void zfpm_start_stats_timer(void)
{
	assert(!zfpm_g->t_stats);

	thread_add_timer(zfpm_g->master, zfpm_stats_timer_cb, 0,
			 ZFPM_STATS_IVL_SECS, &zfpm_g->t_stats);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter)                                                \
	do {                                                                   \
		vty_out(vty, "%-40s %10lu %16lu\n", #counter,                  \
			total_stats.counter, zfpm_g->last_ivl_stats.counter);  \
	} while (0)

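/*
 * For instance, ZFPM_SHOW_STAT(route_adds) prints the counter name
 * followed by its cumulative total and its value over the last
 * collection interval.
 */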
/*
 * zfpm_show_stats
 */
static void zfpm_show_stats(struct vty *vty)
{
	zfpm_stats_t total_stats;
	time_t elapsed;

	vty_out(vty, "\n%-40s %10s Last %2d secs\n\n", "Counter", "Total",
		ZFPM_STATS_IVL_SECS);

	/*
	 * Compute the total stats up to this instant.
	 */
	zfpm_stats_compose(&zfpm_g->cumulative_stats, &zfpm_g->stats,
			   &total_stats);

	ZFPM_SHOW_STAT(connect_calls);
	ZFPM_SHOW_STAT(connect_no_sock);
	ZFPM_SHOW_STAT(read_cb_calls);
	ZFPM_SHOW_STAT(write_cb_calls);
	ZFPM_SHOW_STAT(write_calls);
	ZFPM_SHOW_STAT(partial_writes);
	ZFPM_SHOW_STAT(max_writes_hit);
	ZFPM_SHOW_STAT(t_write_yields);
	ZFPM_SHOW_STAT(nop_deletes_skipped);
	ZFPM_SHOW_STAT(route_adds);
	ZFPM_SHOW_STAT(route_dels);
	ZFPM_SHOW_STAT(updates_triggered);
	ZFPM_SHOW_STAT(redundant_triggers);
	ZFPM_SHOW_STAT(dests_del_after_update);
	ZFPM_SHOW_STAT(t_conn_down_starts);
	ZFPM_SHOW_STAT(t_conn_down_dests_processed);
	ZFPM_SHOW_STAT(t_conn_down_yields);
	ZFPM_SHOW_STAT(t_conn_down_finishes);
	ZFPM_SHOW_STAT(t_conn_up_starts);
	ZFPM_SHOW_STAT(t_conn_up_dests_processed);
	ZFPM_SHOW_STAT(t_conn_up_yields);
	ZFPM_SHOW_STAT(t_conn_up_aborts);
	ZFPM_SHOW_STAT(t_conn_up_finishes);

	if (!zfpm_g->last_stats_clear_time)
		return;

	elapsed = zfpm_get_elapsed_time(zfpm_g->last_stats_clear_time);

	vty_out(vty, "\nStats were cleared %lu seconds ago\n",
		(unsigned long)elapsed);
}

/*
 * zfpm_clear_stats
 */
static void zfpm_clear_stats(struct vty *vty)
{
	if (!zfpm_is_enabled()) {
		vty_out(vty, "The FPM module is not enabled...\n");
		return;
	}

	zfpm_stats_reset(&zfpm_g->stats);
	zfpm_stats_reset(&zfpm_g->last_ivl_stats);
	zfpm_stats_reset(&zfpm_g->cumulative_stats);

	zfpm_stop_stats_timer();
	zfpm_start_stats_timer();

	zfpm_g->last_stats_clear_time = monotime(NULL);

	vty_out(vty, "Cleared FPM stats\n");
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       ZEBRA_STR
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_show_stats(vty);
	return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       ZEBRA_STR
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
	zfpm_clear_stats(vty);
	return CMD_SUCCESS;
}

/*
 * update fpm connection information
 */
DEFUN ( fpm_remote_ip,
       fpm_remote_ip_cmd,
       "fpm connection ip A.B.C.D port (1-65535)",
       "fpm connection remote ip and port\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{

	in_addr_t fpm_server;
	uint32_t port_no;

	fpm_server = inet_addr(argv[3]->arg);
	if (fpm_server == INADDR_NONE)
		return CMD_ERR_INCOMPLETE;

	port_no = atoi(argv[5]->arg);
	if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
		return CMD_ERR_INCOMPLETE;

	zfpm_g->fpm_server = fpm_server;
	zfpm_g->fpm_port = port_no;

	return CMD_SUCCESS;
}

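/*
 * Illustrative configuration only (the address and port below are
 * placeholders, not defaults):
 *
 *   fpm connection ip 192.0.2.1 port 2620
 *
 * This matches the command definition above and makes zebra connect to
 * an FPM server at that address/port instead of the built-in defaults.
 */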
DEFUN ( no_fpm_remote_ip,
       no_fpm_remote_ip_cmd,
       "no fpm connection ip A.B.C.D port (1-65535)",
       "fpm connection remote ip and port\n"
       "Connection\n"
       "Remote fpm server ip A.B.C.D\n"
       "Enter ip ")
{
	if (zfpm_g->fpm_server != inet_addr(argv[4]->arg)
	    || zfpm_g->fpm_port != atoi(argv[6]->arg))
		return CMD_ERR_NO_MATCH;

	zfpm_g->fpm_server = FPM_DEFAULT_IP;
	zfpm_g->fpm_port = FPM_DEFAULT_PORT;

	return CMD_SUCCESS;
}

/*
 * zfpm_init_message_format
 */
static inline void zfpm_init_message_format(const char *format)
{
	int have_netlink, have_protobuf;

#ifdef HAVE_NETLINK
	have_netlink = 1;
#else
	have_netlink = 0;
#endif

#ifdef HAVE_PROTOBUF
	have_protobuf = 1;
#else
	have_protobuf = 0;
#endif

	zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;

	if (!format) {
		if (have_netlink) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		} else if (have_protobuf) {
			zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		}
		return;
	}

	if (!strcmp("netlink", format)) {
		if (!have_netlink) {
			flog_err(EC_ZEBRA_NETLINK_NOT_AVAILABLE,
				 "FPM netlink message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
		return;
	}

	if (!strcmp("protobuf", format)) {
		if (!have_protobuf) {
			flog_err(
				EC_ZEBRA_PROTOBUF_NOT_AVAILABLE,
				"FPM protobuf message format is not available");
			return;
		}
		zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
		return;
	}

	flog_warn(EC_ZEBRA_FPM_FORMAT_UNKNOWN, "Unknown fpm format '%s'",
		  format);
}

/**
 * fpm_remote_srv_write
 *
 * Module to write remote fpm connection
 *
 * Returns ZERO on success.
 */

static int fpm_remote_srv_write(struct vty *vty)
{
	struct in_addr in;

	in.s_addr = zfpm_g->fpm_server;

	if ((zfpm_g->fpm_server != FPM_DEFAULT_IP
	     && zfpm_g->fpm_server != INADDR_ANY)
	    || (zfpm_g->fpm_port != FPM_DEFAULT_PORT && zfpm_g->fpm_port != 0))
		vty_out(vty, "fpm connection ip %s port %d\n", inet_ntoa(in),
			zfpm_g->fpm_port);

	return 0;
}

/* Zebra node */
static struct cmd_node zebra_node = {ZEBRA_NODE, "", 1};

/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] master thread master on which FPM timers and I/O are scheduled.
 *
 * The message format used to talk to the FPM can be 'netlink' or 'protobuf'
 * and is taken from the module's load arguments.
 *
 * Returns 0 on success.
 */
static int zfpm_init(struct thread_master *master)
{
	int enable = 1;
	uint16_t port = 0;
	const char *format = THIS_MODULE->load_args;

	memset(zfpm_g, 0, sizeof(*zfpm_g));
	zfpm_g->master = master;
	TAILQ_INIT(&zfpm_g->dest_q);
	zfpm_g->sock = -1;
	zfpm_g->state = ZFPM_STATE_IDLE;

	zfpm_stats_init(&zfpm_g->stats);
	zfpm_stats_init(&zfpm_g->last_ivl_stats);
	zfpm_stats_init(&zfpm_g->cumulative_stats);

	install_node(&zebra_node, fpm_remote_srv_write);
	install_element(ENABLE_NODE, &show_zebra_fpm_stats_cmd);
	install_element(ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
	install_element(CONFIG_NODE, &fpm_remote_ip_cmd);
	install_element(CONFIG_NODE, &no_fpm_remote_ip_cmd);

	zfpm_init_message_format(format);

	/*
	 * Disable FPM interface if no suitable format is available.
	 */
	if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
		enable = 0;

	zfpm_g->enabled = enable;

	if (!zfpm_g->fpm_server)
		zfpm_g->fpm_server = FPM_DEFAULT_IP;

	if (!port)
		port = FPM_DEFAULT_PORT;

	zfpm_g->fpm_port = port;

	zfpm_g->obuf = stream_new(ZFPM_OBUF_SIZE);
	zfpm_g->ibuf = stream_new(ZFPM_IBUF_SIZE);

	zfpm_start_stats_timer();
	zfpm_start_connect_timer("initialized");
	return 0;
}

static int zebra_fpm_module_init(void)
{
	hook_register(rib_update, zfpm_trigger_update);
	hook_register(frr_late_init, zfpm_init);
	return 0;
}

FRR_MODULE_SETUP(.name = "zebra_fpm", .version = FRR_VERSION,
		 .description = "zebra FPM (Forwarding Plane Manager) module",
		 .init = zebra_fpm_module_init, )
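/*
 * Note: this file is built as a loadable zebra module; THIS_MODULE->load_args,
 * consumed in zfpm_init() above, carries the optional message-format argument
 * ('netlink' or 'protobuf') that can be passed when the module is loaded
 * (e.g. via the daemon's -M option).
 */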