1 // SPDX-License-Identifier: GPL-2.0-or-later
3 * Zebra opaque message handler module
4 * Copyright (c) 2020 Volta Networks, Inc.
10 #include "lib/frr_pthread.h"
11 #include "lib/stream.h"
12 #include "zebra/debug.h"
13 #include "zebra/zserv.h"
14 #include "zebra/zebra_opaque.h"
15 #include "zebra/rib.h"
18 DEFINE_MTYPE_STATIC(ZEBRA
, OPQ
, "ZAPI Opaque Information");
20 /* Hash to hold message registration info from zapi clients */
21 PREDECL_HASH(opq_regh
);
23 /* Registered client info */
24 struct opq_client_reg
{
29 struct opq_client_reg
*next
;
30 struct opq_client_reg
*prev
;
33 /* Opaque message registration info */
35 struct opq_regh_item item
;
40 struct opq_client_reg
*clients
;
43 /* Registration helper prototypes */
44 static uint32_t registration_hash(const struct opq_msg_reg
*reg
);
45 static int registration_compare(const struct opq_msg_reg
*reg1
,
46 const struct opq_msg_reg
*reg2
);
48 DECLARE_HASH(opq_regh
, struct opq_msg_reg
, item
, registration_compare
,
51 static struct opq_regh_head opq_reg_hash
;
56 static struct zebra_opaque_globals
{
58 /* Sentinel for run or start of shutdown */
61 /* Limit number of pending, unprocessed updates */
62 _Atomic
uint32_t max_queued_updates
;
64 /* Limit number of new messages dequeued at once, to pace an
67 uint32_t msgs_per_cycle
;
69 /* Stats: counters of incoming messages, errors, and yields (when
70 * the limit has been reached.)
72 _Atomic
uint32_t msgs_in
;
73 _Atomic
uint32_t msg_errors
;
74 _Atomic
uint32_t yields
;
77 struct frr_pthread
*pthread
;
79 /* Event-delivery context 'master' for the module */
80 struct thread_master
*master
;
82 /* Event/'thread' pointer for queued zapi messages */
83 struct thread
*t_msgs
;
85 /* Input fifo queue to the module, and lock to protect it. */
86 pthread_mutex_t mutex
;
87 struct stream_fifo in_fifo
;
91 /* Name string for debugs/logs */
92 static const char LOG_NAME
[] = "Zebra Opaque";
96 /* Main event loop, processing incoming message queue */
97 static void process_messages(struct thread
*event
);
98 static int handle_opq_registration(const struct zmsghdr
*hdr
,
100 static int handle_opq_unregistration(const struct zmsghdr
*hdr
,
102 static int dispatch_opq_messages(struct stream_fifo
*msg_fifo
);
103 static struct opq_msg_reg
*opq_reg_lookup(uint32_t type
);
104 static bool opq_client_match(const struct opq_client_reg
*client
,
105 const struct zapi_opaque_reg_info
*info
);
106 static struct opq_msg_reg
*opq_reg_alloc(uint32_t type
);
107 static void opq_reg_free(struct opq_msg_reg
**reg
);
108 static struct opq_client_reg
*opq_client_alloc(
109 const struct zapi_opaque_reg_info
*info
);
110 static void opq_client_free(struct opq_client_reg
**client
);
111 static const char *opq_client2str(char *buf
, size_t buflen
,
112 const struct opq_client_reg
*client
);
115 * Initialize the module at startup
117 void zebra_opaque_init(void)
119 memset(&zo_info
, 0, sizeof(zo_info
));
121 pthread_mutex_init(&zo_info
.mutex
, NULL
);
122 stream_fifo_init(&zo_info
.in_fifo
);
124 zo_info
.msgs_per_cycle
= ZEBRA_OPAQUE_MSG_LIMIT
;
128 * Start the module pthread. This step is run later than the
129 * 'init' step, in case zebra has fork-ed.
131 void zebra_opaque_start(void)
133 struct frr_pthread_attr pattr
= {
134 .start
= frr_pthread_attr_default
.start
,
135 .stop
= frr_pthread_attr_default
.stop
138 if (IS_ZEBRA_DEBUG_EVENT
)
139 zlog_debug("%s module starting", LOG_NAME
);
142 zo_info
.pthread
= frr_pthread_new(&pattr
, "Zebra Opaque thread",
145 /* Associate event 'master' */
146 zo_info
.master
= zo_info
.pthread
->master
;
148 atomic_store_explicit(&zo_info
.run
, 1, memory_order_relaxed
);
150 /* Enqueue an initial event for the pthread */
151 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
154 /* And start the pthread */
155 frr_pthread_run(zo_info
.pthread
, NULL
);
159 * Module stop, halting the dedicated pthread; called from the main pthread.
161 void zebra_opaque_stop(void)
163 if (IS_ZEBRA_DEBUG_EVENT
)
164 zlog_debug("%s module stop", LOG_NAME
);
166 atomic_store_explicit(&zo_info
.run
, 0, memory_order_relaxed
);
168 frr_pthread_stop(zo_info
.pthread
, NULL
);
170 frr_pthread_destroy(zo_info
.pthread
);
172 if (IS_ZEBRA_DEBUG_EVENT
)
173 zlog_debug("%s module stop complete", LOG_NAME
);
177 * Module final cleanup, called from the zebra main pthread.
179 void zebra_opaque_finish(void)
181 struct opq_msg_reg
*reg
;
182 struct opq_client_reg
*client
;
184 if (IS_ZEBRA_DEBUG_EVENT
)
185 zlog_debug("%s module shutdown", LOG_NAME
);
187 /* Clear out registration info */
188 while ((reg
= opq_regh_pop(&opq_reg_hash
)) != NULL
) {
189 client
= reg
->clients
;
191 reg
->clients
= client
->next
;
192 opq_client_free(&client
);
193 client
= reg
->clients
;
199 opq_regh_fini(&opq_reg_hash
);
201 pthread_mutex_destroy(&zo_info
.mutex
);
202 stream_fifo_deinit(&zo_info
.in_fifo
);
206 * Does this module handle (intercept) the specified zapi message type?
208 bool zebra_opaque_handles_msgid(uint16_t id
)
213 case ZEBRA_OPAQUE_MESSAGE
:
214 case ZEBRA_OPAQUE_REGISTER
:
215 case ZEBRA_OPAQUE_UNREGISTER
:
226 * Enqueue a batch of messages for processing - this is the public api
227 * used from the zapi processing threads.
229 uint32_t zebra_opaque_enqueue_batch(struct stream_fifo
*batch
)
231 uint32_t counter
= 0;
234 /* Dequeue messages from the incoming batch, and save them
235 * on the module fifo.
237 frr_with_mutex (&zo_info
.mutex
) {
238 msg
= stream_fifo_pop(batch
);
240 stream_fifo_push(&zo_info
.in_fifo
, msg
);
242 msg
= stream_fifo_pop(batch
);
246 /* Schedule module pthread to process the batch */
248 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
249 zlog_debug("%s: received %u messages",
251 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
259 * Pthread event loop, process the incoming message queue.
261 static void process_messages(struct thread
*event
)
263 struct stream_fifo fifo
;
266 bool need_resched
= false;
268 stream_fifo_init(&fifo
);
270 /* Check for zebra shutdown */
271 if (atomic_load_explicit(&zo_info
.run
, memory_order_relaxed
) == 0)
275 * Dequeue some messages from the incoming queue, temporarily
276 * save them on the local fifo
278 frr_with_mutex (&zo_info
.mutex
) {
280 for (i
= 0; i
< zo_info
.msgs_per_cycle
; i
++) {
281 msg
= stream_fifo_pop(&zo_info
.in_fifo
);
285 stream_fifo_push(&fifo
, msg
);
289 * We may need to reschedule, if there are still
292 if (stream_fifo_head(&zo_info
.in_fifo
) != NULL
)
297 atomic_fetch_add_explicit(&zo_info
.msgs_in
, i
, memory_order_relaxed
);
299 /* Check for zebra shutdown */
300 if (atomic_load_explicit(&zo_info
.run
, memory_order_relaxed
) == 0) {
301 need_resched
= false;
305 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
306 zlog_debug("%s: processing %u messages", __func__
, i
);
309 * Process the messages from the temporary fifo. We send the whole
310 * fifo so that we can take advantage of batching internally. Note
311 * that registration/deregistration messages are handled here also.
313 dispatch_opq_messages(&fifo
);
318 atomic_fetch_add_explicit(&zo_info
.yields
, 1,
319 memory_order_relaxed
);
320 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
324 /* This will also free any leftover messages, in the shutdown case */
325 stream_fifo_deinit(&fifo
);
329 * Process (dispatch) or drop opaque messages.
331 static int dispatch_opq_messages(struct stream_fifo
*msg_fifo
)
333 struct stream
*msg
, *dup
;
335 struct zapi_opaque_msg info
;
336 struct opq_msg_reg
*reg
;
338 struct opq_client_reg
*client
;
339 struct zserv
*zclient
;
342 while ((msg
= stream_fifo_pop(msg_fifo
)) != NULL
) {
343 zapi_parse_header(msg
, &hdr
);
344 hdr
.length
-= ZEBRA_HEADER_SIZE
;
346 /* Handle client registration messages */
347 if (hdr
.command
== ZEBRA_OPAQUE_REGISTER
) {
348 handle_opq_registration(&hdr
, msg
);
350 } else if (hdr
.command
== ZEBRA_OPAQUE_UNREGISTER
) {
351 handle_opq_unregistration(&hdr
, msg
);
355 /* We only process OPAQUE messages - drop anything else */
356 if (hdr
.command
!= ZEBRA_OPAQUE_MESSAGE
)
359 /* Dispatch to any registered ZAPI client(s) */
361 /* Extract subtype and flags */
362 ret
= zclient_opaque_decode(msg
, &info
);
366 /* Look up registered ZAPI client(s) */
367 reg
= opq_reg_lookup(info
.type
);
369 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
370 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
371 __func__
, info
.type
, info
.flags
);
375 /* Reset read pointer, since we'll be re-sending message */
376 stream_set_getp(msg
, 0);
378 /* Send a copy of the message to all registered clients */
379 for (client
= reg
->clients
; client
; client
= client
->next
) {
382 if (CHECK_FLAG(info
.flags
, ZAPI_OPAQUE_FLAG_UNICAST
)) {
384 if (client
->proto
!= info
.proto
||
385 client
->instance
!= info
.instance
||
386 client
->session_id
!= info
.session_id
)
389 if (IS_ZEBRA_DEBUG_RECV
&&
390 IS_ZEBRA_DEBUG_DETAIL
)
391 zlog_debug("%s: found matching unicast client %s",
398 /* Copy message if more clients */
400 dup
= stream_dup(msg
);
404 * TODO -- this isn't ideal: we're going through an
405 * acquire/release cycle for each client for each
406 * message. Replace this with a batching version.
408 zclient
= zserv_acquire_client(client
->proto
,
412 if (IS_ZEBRA_DEBUG_SEND
&&
413 IS_ZEBRA_DEBUG_DETAIL
)
414 zlog_debug("%s: sending %s to client %s",
416 (dup
? "dup" : "msg"),
422 * Sending a message actually means enqueuing
423 * it for a zapi io pthread to send - so we
424 * don't touch the message after this call.
426 zserv_send_message(zclient
, dup
? dup
: msg
);
432 zserv_release_client(zclient
);
434 if (IS_ZEBRA_DEBUG_RECV
&&
435 IS_ZEBRA_DEBUG_DETAIL
)
436 zlog_debug("%s: type %u: no zclient for %s",
441 /* Registered but gone? */
446 /* If unicast, we're done */
447 if (CHECK_FLAG(info
.flags
, ZAPI_OPAQUE_FLAG_UNICAST
))
461 * Process a register/unregister message
463 static int handle_opq_registration(const struct zmsghdr
*hdr
,
467 struct zapi_opaque_reg_info info
;
468 struct opq_client_reg
*client
;
469 struct opq_msg_reg key
, *reg
;
472 memset(&info
, 0, sizeof(info
));
474 if (zapi_opaque_reg_decode(msg
, &info
) < 0) {
479 memset(&key
, 0, sizeof(key
));
481 key
.type
= info
.type
;
483 reg
= opq_regh_find(&opq_reg_hash
, &key
);
485 /* Look for dup client */
486 for (client
= reg
->clients
; client
!= NULL
;
487 client
= client
->next
) {
488 if (opq_client_match(client
, &info
))
493 /* Oops - duplicate registration? */
494 if (IS_ZEBRA_DEBUG_RECV
)
495 zlog_debug("%s: duplicate opq reg for client %s",
497 opq_client2str(buf
, sizeof(buf
),
502 client
= opq_client_alloc(&info
);
504 if (IS_ZEBRA_DEBUG_RECV
)
505 zlog_debug("%s: client %s registers for %u",
507 opq_client2str(buf
, sizeof(buf
), client
),
510 /* Link client into registration */
511 client
->next
= reg
->clients
;
513 reg
->clients
->prev
= client
;
514 reg
->clients
= client
;
517 * No existing registrations - create one, add the
518 * client, and add registration to hash.
520 reg
= opq_reg_alloc(info
.type
);
521 client
= opq_client_alloc(&info
);
523 if (IS_ZEBRA_DEBUG_RECV
)
524 zlog_debug("%s: client %s registers for new reg %u",
526 opq_client2str(buf
, sizeof(buf
), client
),
529 reg
->clients
= client
;
531 opq_regh_add(&opq_reg_hash
, reg
);
541 * Process a register/unregister message
543 static int handle_opq_unregistration(const struct zmsghdr
*hdr
,
547 struct zapi_opaque_reg_info info
;
548 struct opq_client_reg
*client
;
549 struct opq_msg_reg key
, *reg
;
552 memset(&info
, 0, sizeof(info
));
554 if (zapi_opaque_reg_decode(msg
, &info
) < 0) {
559 memset(&key
, 0, sizeof(key
));
561 key
.type
= info
.type
;
563 reg
= opq_regh_find(&opq_reg_hash
, &key
);
565 /* Weird: unregister for unknown message? */
566 if (IS_ZEBRA_DEBUG_RECV
)
567 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
569 zebra_route_string(info
.proto
),
570 info
.instance
, info
.session_id
, info
.type
);
574 /* Look for client */
575 for (client
= reg
->clients
; client
!= NULL
;
576 client
= client
->next
) {
577 if (opq_client_match(client
, &info
))
581 if (client
== NULL
) {
582 /* Oops - unregister for unknown client? */
583 if (IS_ZEBRA_DEBUG_RECV
)
584 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
585 __func__
, zebra_route_string(info
.proto
),
586 info
.instance
, info
.session_id
, info
.type
);
590 if (IS_ZEBRA_DEBUG_RECV
)
591 zlog_debug("%s: client %s unregisters for %u",
592 __func__
, opq_client2str(buf
, sizeof(buf
), client
),
596 client
->prev
->next
= client
->next
;
598 client
->next
->prev
= client
->prev
;
599 if (reg
->clients
== client
)
600 reg
->clients
= client
->next
;
602 opq_client_free(&client
);
604 /* Is registration empty now? */
605 if (reg
->clients
== NULL
) {
606 opq_regh_del(&opq_reg_hash
, reg
);
616 /* Compare utility for registered clients */
617 static bool opq_client_match(const struct opq_client_reg
*client
,
618 const struct zapi_opaque_reg_info
*info
)
620 if (client
->proto
== info
->proto
&&
621 client
->instance
== info
->instance
&&
622 client
->session_id
== info
->session_id
)
628 static struct opq_msg_reg
*opq_reg_lookup(uint32_t type
)
630 struct opq_msg_reg key
, *reg
;
632 memset(&key
, 0, sizeof(key
));
636 reg
= opq_regh_find(&opq_reg_hash
, &key
);
641 static struct opq_msg_reg
*opq_reg_alloc(uint32_t type
)
643 struct opq_msg_reg
*reg
;
645 reg
= XCALLOC(MTYPE_OPQ
, sizeof(struct opq_msg_reg
));
648 INIT_HASH(®
->item
);
653 static void opq_reg_free(struct opq_msg_reg
**reg
)
655 XFREE(MTYPE_OPQ
, (*reg
));
658 static struct opq_client_reg
*opq_client_alloc(
659 const struct zapi_opaque_reg_info
*info
)
661 struct opq_client_reg
*client
;
663 client
= XCALLOC(MTYPE_OPQ
, sizeof(struct opq_client_reg
));
665 client
->proto
= info
->proto
;
666 client
->instance
= info
->instance
;
667 client
->session_id
= info
->session_id
;
672 static void opq_client_free(struct opq_client_reg
**client
)
674 XFREE(MTYPE_OPQ
, (*client
));
677 static const char *opq_client2str(char *buf
, size_t buflen
,
678 const struct opq_client_reg
*client
)
682 snprintf(buf
, buflen
, "%s/%u", zebra_route_string(client
->proto
),
684 if (client
->session_id
> 0) {
685 snprintf(sbuf
, sizeof(sbuf
), "/%u", client
->session_id
);
686 strlcat(buf
, sbuf
, buflen
);
692 /* Hash function for clients registered for messages */
693 static uint32_t registration_hash(const struct opq_msg_reg
*reg
)
698 /* Comparison function for client registrations */
699 static int registration_compare(const struct opq_msg_reg
*reg1
,
700 const struct opq_msg_reg
*reg2
)
702 if (reg1
->type
== reg2
->type
)