2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "lib/debug.h"
23 #include "lib/frr_pthread.h"
24 #include "lib/stream.h"
25 #include "zebra/debug.h"
26 #include "zebra/zserv.h"
27 #include "zebra/zebra_opaque.h"
30 DEFINE_MTYPE_STATIC(ZEBRA
, OPQ
, "ZAPI Opaque Information");
32 /* Hash to hold message registration info from zapi clients */
33 PREDECL_HASH(opq_regh
);
35 /* Registered client info */
36 struct opq_client_reg
{
41 struct opq_client_reg
*next
;
42 struct opq_client_reg
*prev
;
45 /* Opaque message registration info */
47 struct opq_regh_item item
;
52 struct opq_client_reg
*clients
;
55 /* Registration helper prototypes */
56 static uint32_t registration_hash(const struct opq_msg_reg
*reg
);
57 static int registration_compare(const struct opq_msg_reg
*reg1
,
58 const struct opq_msg_reg
*reg2
);
60 DECLARE_HASH(opq_regh
, struct opq_msg_reg
, item
, registration_compare
,
63 static struct opq_regh_head opq_reg_hash
;
68 static struct zebra_opaque_globals
{
70 /* Sentinel for run or start of shutdown */
73 /* Limit number of pending, unprocessed updates */
74 _Atomic
uint32_t max_queued_updates
;
76 /* Limit number of new messages dequeued at once, to pace an
79 uint32_t msgs_per_cycle
;
81 /* Stats: counters of incoming messages, errors, and yields (when
82 * the limit has been reached.)
84 _Atomic
uint32_t msgs_in
;
85 _Atomic
uint32_t msg_errors
;
86 _Atomic
uint32_t yields
;
89 struct frr_pthread
*pthread
;
91 /* Event-delivery context 'master' for the module */
92 struct thread_master
*master
;
94 /* Event/'thread' pointer for queued zapi messages */
95 struct thread
*t_msgs
;
97 /* Input fifo queue to the module, and lock to protect it. */
98 pthread_mutex_t mutex
;
99 struct stream_fifo in_fifo
;
103 /* Name string for debugs/logs */
104 static const char LOG_NAME
[] = "Zebra Opaque";
108 /* Main event loop, processing incoming message queue */
109 static int process_messages(struct thread
*event
);
110 static int handle_opq_registration(const struct zmsghdr
*hdr
,
112 static int handle_opq_unregistration(const struct zmsghdr
*hdr
,
114 static int dispatch_opq_messages(struct stream_fifo
*msg_fifo
);
115 static struct opq_msg_reg
*opq_reg_lookup(uint32_t type
);
116 static bool opq_client_match(const struct opq_client_reg
*client
,
117 const struct zapi_opaque_reg_info
*info
);
118 static struct opq_msg_reg
*opq_reg_alloc(uint32_t type
);
119 static void opq_reg_free(struct opq_msg_reg
**reg
);
120 static struct opq_client_reg
*opq_client_alloc(
121 const struct zapi_opaque_reg_info
*info
);
122 static void opq_client_free(struct opq_client_reg
**client
);
123 static const char *opq_client2str(char *buf
, size_t buflen
,
124 const struct opq_client_reg
*client
);
127 * Initialize the module at startup
129 void zebra_opaque_init(void)
131 memset(&zo_info
, 0, sizeof(zo_info
));
133 pthread_mutex_init(&zo_info
.mutex
, NULL
);
134 stream_fifo_init(&zo_info
.in_fifo
);
136 zo_info
.msgs_per_cycle
= ZEBRA_OPAQUE_MSG_LIMIT
;
140 * Start the module pthread. This step is run later than the
141 * 'init' step, in case zebra has fork-ed.
143 void zebra_opaque_start(void)
145 struct frr_pthread_attr pattr
= {
146 .start
= frr_pthread_attr_default
.start
,
147 .stop
= frr_pthread_attr_default
.stop
150 if (IS_ZEBRA_DEBUG_EVENT
)
151 zlog_debug("%s module starting", LOG_NAME
);
154 zo_info
.pthread
= frr_pthread_new(&pattr
, "Zebra Opaque thread",
157 /* Associate event 'master' */
158 zo_info
.master
= zo_info
.pthread
->master
;
160 atomic_store_explicit(&zo_info
.run
, 1, memory_order_relaxed
);
162 /* Enqueue an initial event for the pthread */
163 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
166 /* And start the pthread */
167 frr_pthread_run(zo_info
.pthread
, NULL
);
171 * Module stop, halting the dedicated pthread; called from the main pthread.
173 void zebra_opaque_stop(void)
175 if (IS_ZEBRA_DEBUG_EVENT
)
176 zlog_debug("%s module stop", LOG_NAME
);
178 atomic_store_explicit(&zo_info
.run
, 0, memory_order_relaxed
);
180 frr_pthread_stop(zo_info
.pthread
, NULL
);
182 frr_pthread_destroy(zo_info
.pthread
);
184 if (IS_ZEBRA_DEBUG_EVENT
)
185 zlog_debug("%s module stop complete", LOG_NAME
);
189 * Module final cleanup, called from the zebra main pthread.
191 void zebra_opaque_finish(void)
193 struct opq_msg_reg
*reg
;
194 struct opq_client_reg
*client
;
196 if (IS_ZEBRA_DEBUG_EVENT
)
197 zlog_debug("%s module shutdown", LOG_NAME
);
199 /* Clear out registration info */
200 while ((reg
= opq_regh_pop(&opq_reg_hash
)) != NULL
) {
201 client
= reg
->clients
;
203 reg
->clients
= client
->next
;
204 opq_client_free(&client
);
205 client
= reg
->clients
;
211 opq_regh_fini(&opq_reg_hash
);
213 pthread_mutex_destroy(&zo_info
.mutex
);
214 stream_fifo_deinit(&zo_info
.in_fifo
);
218 * Does this module handle (intercept) the specified zapi message type?
220 bool zebra_opaque_handles_msgid(uint16_t id
)
225 case ZEBRA_OPAQUE_MESSAGE
:
226 case ZEBRA_OPAQUE_REGISTER
:
227 case ZEBRA_OPAQUE_UNREGISTER
:
238 * Enqueue a batch of messages for processing - this is the public api
239 * used from the zapi processing threads.
241 uint32_t zebra_opaque_enqueue_batch(struct stream_fifo
*batch
)
243 uint32_t counter
= 0;
246 /* Dequeue messages from the incoming batch, and save them
247 * on the module fifo.
249 frr_with_mutex(&zo_info
.mutex
) {
250 msg
= stream_fifo_pop(batch
);
252 stream_fifo_push(&zo_info
.in_fifo
, msg
);
254 msg
= stream_fifo_pop(batch
);
258 /* Schedule module pthread to process the batch */
260 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
261 zlog_debug("%s: received %u messages",
263 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
271 * Pthread event loop, process the incoming message queue.
273 static int process_messages(struct thread
*event
)
275 struct stream_fifo fifo
;
278 bool need_resched
= false;
280 stream_fifo_init(&fifo
);
282 /* Check for zebra shutdown */
283 if (atomic_load_explicit(&zo_info
.run
, memory_order_relaxed
) == 0)
287 * Dequeue some messages from the incoming queue, temporarily
288 * save them on the local fifo
290 frr_with_mutex(&zo_info
.mutex
) {
292 for (i
= 0; i
< zo_info
.msgs_per_cycle
; i
++) {
293 msg
= stream_fifo_pop(&zo_info
.in_fifo
);
297 stream_fifo_push(&fifo
, msg
);
301 * We may need to reschedule, if there are still
304 if (stream_fifo_head(&zo_info
.in_fifo
) != NULL
)
309 atomic_fetch_add_explicit(&zo_info
.msgs_in
, i
, memory_order_relaxed
);
311 /* Check for zebra shutdown */
312 if (atomic_load_explicit(&zo_info
.run
, memory_order_relaxed
) == 0) {
313 need_resched
= false;
317 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
318 zlog_debug("%s: processing %u messages", __func__
, i
);
321 * Process the messages from the temporary fifo. We send the whole
322 * fifo so that we can take advantage of batching internally. Note
323 * that registration/deregistration messages are handled here also.
325 dispatch_opq_messages(&fifo
);
330 atomic_fetch_add_explicit(&zo_info
.yields
, 1,
331 memory_order_relaxed
);
332 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
336 /* This will also free any leftover messages, in the shutdown case */
337 stream_fifo_deinit(&fifo
);
343 * Process (dispatch) or drop opaque messages.
345 static int dispatch_opq_messages(struct stream_fifo
*msg_fifo
)
347 struct stream
*msg
, *dup
;
349 struct zapi_opaque_msg info
;
350 struct opq_msg_reg
*reg
;
352 struct opq_client_reg
*client
;
353 struct zserv
*zclient
;
356 while ((msg
= stream_fifo_pop(msg_fifo
)) != NULL
) {
357 zapi_parse_header(msg
, &hdr
);
358 hdr
.length
-= ZEBRA_HEADER_SIZE
;
360 /* Handle client registration messages */
361 if (hdr
.command
== ZEBRA_OPAQUE_REGISTER
) {
362 handle_opq_registration(&hdr
, msg
);
364 } else if (hdr
.command
== ZEBRA_OPAQUE_UNREGISTER
) {
365 handle_opq_unregistration(&hdr
, msg
);
369 /* We only process OPAQUE messages - drop anything else */
370 if (hdr
.command
!= ZEBRA_OPAQUE_MESSAGE
)
373 /* Dispatch to any registered ZAPI client(s) */
375 /* Extract subtype and flags */
376 ret
= zclient_opaque_decode(msg
, &info
);
380 /* Look up registered ZAPI client(s) */
381 reg
= opq_reg_lookup(info
.type
);
383 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
384 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
385 __func__
, info
.type
, info
.flags
);
389 /* Reset read pointer, since we'll be re-sending message */
390 stream_set_getp(msg
, 0);
392 /* Send a copy of the message to all registered clients */
393 for (client
= reg
->clients
; client
; client
= client
->next
) {
396 if (CHECK_FLAG(info
.flags
, ZAPI_OPAQUE_FLAG_UNICAST
)) {
398 if (client
->proto
!= info
.proto
||
399 client
->instance
!= info
.instance
||
400 client
->session_id
!= info
.session_id
)
403 if (IS_ZEBRA_DEBUG_RECV
&&
404 IS_ZEBRA_DEBUG_DETAIL
)
405 zlog_debug("%s: found matching unicast client %s",
412 /* Copy message if more clients */
414 dup
= stream_dup(msg
);
418 * TODO -- this isn't ideal: we're going through an
419 * acquire/release cycle for each client for each
420 * message. Replace this with a batching version.
422 zclient
= zserv_acquire_client(client
->proto
,
426 if (IS_ZEBRA_DEBUG_SEND
&&
427 IS_ZEBRA_DEBUG_DETAIL
)
428 zlog_debug("%s: sending %s to client %s",
430 (dup
? "dup" : "msg"),
436 * Sending a message actually means enqueuing
437 * it for a zapi io pthread to send - so we
438 * don't touch the message after this call.
440 zserv_send_message(zclient
, dup
? dup
: msg
);
446 zserv_release_client(zclient
);
448 if (IS_ZEBRA_DEBUG_RECV
&&
449 IS_ZEBRA_DEBUG_DETAIL
)
450 zlog_debug("%s: type %u: no zclient for %s",
455 /* Registered but gone? */
460 /* If unicast, we're done */
461 if (CHECK_FLAG(info
.flags
, ZAPI_OPAQUE_FLAG_UNICAST
))
475 * Process a register/unregister message
477 static int handle_opq_registration(const struct zmsghdr
*hdr
,
481 struct zapi_opaque_reg_info info
;
482 struct opq_client_reg
*client
;
483 struct opq_msg_reg key
, *reg
;
486 memset(&info
, 0, sizeof(info
));
488 if (zapi_opaque_reg_decode(msg
, &info
) < 0) {
493 memset(&key
, 0, sizeof(key
));
495 key
.type
= info
.type
;
497 reg
= opq_regh_find(&opq_reg_hash
, &key
);
499 /* Look for dup client */
500 for (client
= reg
->clients
; client
!= NULL
;
501 client
= client
->next
) {
502 if (opq_client_match(client
, &info
))
507 /* Oops - duplicate registration? */
508 if (IS_ZEBRA_DEBUG_RECV
)
509 zlog_debug("%s: duplicate opq reg for client %s",
511 opq_client2str(buf
, sizeof(buf
),
516 client
= opq_client_alloc(&info
);
518 if (IS_ZEBRA_DEBUG_RECV
)
519 zlog_debug("%s: client %s registers for %u",
521 opq_client2str(buf
, sizeof(buf
), client
),
524 /* Link client into registration */
525 client
->next
= reg
->clients
;
527 reg
->clients
->prev
= client
;
528 reg
->clients
= client
;
531 * No existing registrations - create one, add the
532 * client, and add registration to hash.
534 reg
= opq_reg_alloc(info
.type
);
535 client
= opq_client_alloc(&info
);
537 if (IS_ZEBRA_DEBUG_RECV
)
538 zlog_debug("%s: client %s registers for new reg %u",
540 opq_client2str(buf
, sizeof(buf
), client
),
543 reg
->clients
= client
;
545 opq_regh_add(&opq_reg_hash
, reg
);
555 * Process a register/unregister message
557 static int handle_opq_unregistration(const struct zmsghdr
*hdr
,
561 struct zapi_opaque_reg_info info
;
562 struct opq_client_reg
*client
;
563 struct opq_msg_reg key
, *reg
;
566 memset(&info
, 0, sizeof(info
));
568 if (zapi_opaque_reg_decode(msg
, &info
) < 0) {
573 memset(&key
, 0, sizeof(key
));
575 key
.type
= info
.type
;
577 reg
= opq_regh_find(&opq_reg_hash
, &key
);
579 /* Weird: unregister for unknown message? */
580 if (IS_ZEBRA_DEBUG_RECV
)
581 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
583 zebra_route_string(info
.proto
),
584 info
.instance
, info
.session_id
, info
.type
);
588 /* Look for client */
589 for (client
= reg
->clients
; client
!= NULL
;
590 client
= client
->next
) {
591 if (opq_client_match(client
, &info
))
595 if (client
== NULL
) {
596 /* Oops - unregister for unknown client? */
597 if (IS_ZEBRA_DEBUG_RECV
)
598 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
599 __func__
, zebra_route_string(info
.proto
),
600 info
.instance
, info
.session_id
, info
.type
);
604 if (IS_ZEBRA_DEBUG_RECV
)
605 zlog_debug("%s: client %s unregisters for %u",
606 __func__
, opq_client2str(buf
, sizeof(buf
), client
),
610 client
->prev
->next
= client
->next
;
612 client
->next
->prev
= client
->prev
;
613 if (reg
->clients
== client
)
614 reg
->clients
= client
->next
;
616 opq_client_free(&client
);
618 /* Is registration empty now? */
619 if (reg
->clients
== NULL
) {
620 opq_regh_del(&opq_reg_hash
, reg
);
630 /* Compare utility for registered clients */
631 static bool opq_client_match(const struct opq_client_reg
*client
,
632 const struct zapi_opaque_reg_info
*info
)
634 if (client
->proto
== info
->proto
&&
635 client
->instance
== info
->instance
&&
636 client
->session_id
== info
->session_id
)
642 static struct opq_msg_reg
*opq_reg_lookup(uint32_t type
)
644 struct opq_msg_reg key
, *reg
;
646 memset(&key
, 0, sizeof(key
));
650 reg
= opq_regh_find(&opq_reg_hash
, &key
);
655 static struct opq_msg_reg
*opq_reg_alloc(uint32_t type
)
657 struct opq_msg_reg
*reg
;
659 reg
= XCALLOC(MTYPE_OPQ
, sizeof(struct opq_msg_reg
));
662 INIT_HASH(®
->item
);
667 static void opq_reg_free(struct opq_msg_reg
**reg
)
669 XFREE(MTYPE_OPQ
, (*reg
));
672 static struct opq_client_reg
*opq_client_alloc(
673 const struct zapi_opaque_reg_info
*info
)
675 struct opq_client_reg
*client
;
677 client
= XCALLOC(MTYPE_OPQ
, sizeof(struct opq_client_reg
));
679 client
->proto
= info
->proto
;
680 client
->instance
= info
->instance
;
681 client
->session_id
= info
->session_id
;
686 static void opq_client_free(struct opq_client_reg
**client
)
688 XFREE(MTYPE_OPQ
, (*client
));
691 static const char *opq_client2str(char *buf
, size_t buflen
,
692 const struct opq_client_reg
*client
)
696 snprintf(buf
, buflen
, "%s/%u", zebra_route_string(client
->proto
),
698 if (client
->session_id
> 0) {
699 snprintf(sbuf
, sizeof(sbuf
), "/%u", client
->session_id
);
700 strlcat(buf
, sbuf
, buflen
);
706 /* Hash function for clients registered for messages */
707 static uint32_t registration_hash(const struct opq_msg_reg
*reg
)
712 /* Comparison function for client registrations */
713 static int registration_compare(const struct opq_msg_reg
*reg1
,
714 const struct opq_msg_reg
*reg2
)
716 if (reg1
->type
== reg2
->type
)