2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "lib/debug.h"
23 #include "lib/frr_pthread.h"
24 #include "lib/stream.h"
25 #include "zebra/debug.h"
26 #include "zebra/zserv.h"
27 #include "zebra/zebra_opaque.h"
30 DEFINE_MTYPE_STATIC(ZEBRA
, OPQ
, "ZAPI Opaque Information");
32 /* Hash to hold message registration info from zapi clients */
33 PREDECL_HASH(opq_regh
);
35 /* Registered client info */
36 struct opq_client_reg
{
41 struct opq_client_reg
*next
;
42 struct opq_client_reg
*prev
;
45 /* Opaque message registration info */
47 struct opq_regh_item item
;
52 struct opq_client_reg
*clients
;
55 /* Registration helper prototypes */
56 static uint32_t registration_hash(const struct opq_msg_reg
*reg
);
57 static int registration_compare(const struct opq_msg_reg
*reg1
,
58 const struct opq_msg_reg
*reg2
);
60 DECLARE_HASH(opq_regh
, struct opq_msg_reg
, item
, registration_compare
,
63 static struct opq_regh_head opq_reg_hash
;
68 static struct zebra_opaque_globals
{
70 /* Sentinel for run or start of shutdown */
73 /* Limit number of pending, unprocessed updates */
74 _Atomic
uint32_t max_queued_updates
;
76 /* Limit number of new messages dequeued at once, to pace an
79 uint32_t msgs_per_cycle
;
81 /* Stats: counters of incoming messages, errors, and yields (when
82 * the limit has been reached.)
84 _Atomic
uint32_t msgs_in
;
85 _Atomic
uint32_t msg_errors
;
86 _Atomic
uint32_t yields
;
89 struct frr_pthread
*pthread
;
91 /* Event-delivery context 'master' for the module */
92 struct thread_master
*master
;
94 /* Event/'thread' pointer for queued zapi messages */
95 struct thread
*t_msgs
;
97 /* Input fifo queue to the module, and lock to protect it. */
98 pthread_mutex_t mutex
;
99 struct stream_fifo in_fifo
;
103 /* Name string for debugs/logs */
104 static const char LOG_NAME
[] = "Zebra Opaque";
108 /* Main event loop, processing incoming message queue */
109 static void process_messages(struct thread
*event
);
110 static int handle_opq_registration(const struct zmsghdr
*hdr
,
112 static int handle_opq_unregistration(const struct zmsghdr
*hdr
,
114 static int dispatch_opq_messages(struct stream_fifo
*msg_fifo
);
115 static struct opq_msg_reg
*opq_reg_lookup(uint32_t type
);
116 static bool opq_client_match(const struct opq_client_reg
*client
,
117 const struct zapi_opaque_reg_info
*info
);
118 static struct opq_msg_reg
*opq_reg_alloc(uint32_t type
);
119 static void opq_reg_free(struct opq_msg_reg
**reg
);
120 static struct opq_client_reg
*opq_client_alloc(
121 const struct zapi_opaque_reg_info
*info
);
122 static void opq_client_free(struct opq_client_reg
**client
);
123 static const char *opq_client2str(char *buf
, size_t buflen
,
124 const struct opq_client_reg
*client
);
127 * Initialize the module at startup
129 void zebra_opaque_init(void)
131 memset(&zo_info
, 0, sizeof(zo_info
));
133 pthread_mutex_init(&zo_info
.mutex
, NULL
);
134 stream_fifo_init(&zo_info
.in_fifo
);
136 zo_info
.msgs_per_cycle
= ZEBRA_OPAQUE_MSG_LIMIT
;
140 * Start the module pthread. This step is run later than the
141 * 'init' step, in case zebra has fork-ed.
143 void zebra_opaque_start(void)
145 struct frr_pthread_attr pattr
= {
146 .start
= frr_pthread_attr_default
.start
,
147 .stop
= frr_pthread_attr_default
.stop
150 if (IS_ZEBRA_DEBUG_EVENT
)
151 zlog_debug("%s module starting", LOG_NAME
);
154 zo_info
.pthread
= frr_pthread_new(&pattr
, "Zebra Opaque thread",
157 /* Associate event 'master' */
158 zo_info
.master
= zo_info
.pthread
->master
;
160 atomic_store_explicit(&zo_info
.run
, 1, memory_order_relaxed
);
162 /* Enqueue an initial event for the pthread */
163 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
166 /* And start the pthread */
167 frr_pthread_run(zo_info
.pthread
, NULL
);
171 * Module stop, halting the dedicated pthread; called from the main pthread.
173 void zebra_opaque_stop(void)
175 if (IS_ZEBRA_DEBUG_EVENT
)
176 zlog_debug("%s module stop", LOG_NAME
);
178 atomic_store_explicit(&zo_info
.run
, 0, memory_order_relaxed
);
180 frr_pthread_stop(zo_info
.pthread
, NULL
);
182 frr_pthread_destroy(zo_info
.pthread
);
184 if (IS_ZEBRA_DEBUG_EVENT
)
185 zlog_debug("%s module stop complete", LOG_NAME
);
189 * Module final cleanup, called from the zebra main pthread.
191 void zebra_opaque_finish(void)
193 struct opq_msg_reg
*reg
;
194 struct opq_client_reg
*client
;
196 if (IS_ZEBRA_DEBUG_EVENT
)
197 zlog_debug("%s module shutdown", LOG_NAME
);
199 /* Clear out registration info */
200 while ((reg
= opq_regh_pop(&opq_reg_hash
)) != NULL
) {
201 client
= reg
->clients
;
203 reg
->clients
= client
->next
;
204 opq_client_free(&client
);
205 client
= reg
->clients
;
211 opq_regh_fini(&opq_reg_hash
);
213 pthread_mutex_destroy(&zo_info
.mutex
);
214 stream_fifo_deinit(&zo_info
.in_fifo
);
218 * Does this module handle (intercept) the specified zapi message type?
220 bool zebra_opaque_handles_msgid(uint16_t id
)
225 case ZEBRA_OPAQUE_MESSAGE
:
226 case ZEBRA_OPAQUE_REGISTER
:
227 case ZEBRA_OPAQUE_UNREGISTER
:
238 * Enqueue a batch of messages for processing - this is the public api
239 * used from the zapi processing threads.
241 uint32_t zebra_opaque_enqueue_batch(struct stream_fifo
*batch
)
243 uint32_t counter
= 0;
246 /* Dequeue messages from the incoming batch, and save them
247 * on the module fifo.
249 frr_with_mutex(&zo_info
.mutex
) {
250 msg
= stream_fifo_pop(batch
);
252 stream_fifo_push(&zo_info
.in_fifo
, msg
);
254 msg
= stream_fifo_pop(batch
);
258 /* Schedule module pthread to process the batch */
260 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
261 zlog_debug("%s: received %u messages",
263 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
271 * Pthread event loop, process the incoming message queue.
273 static void process_messages(struct thread
*event
)
275 struct stream_fifo fifo
;
278 bool need_resched
= false;
280 stream_fifo_init(&fifo
);
282 /* Check for zebra shutdown */
283 if (atomic_load_explicit(&zo_info
.run
, memory_order_relaxed
) == 0)
287 * Dequeue some messages from the incoming queue, temporarily
288 * save them on the local fifo
290 frr_with_mutex(&zo_info
.mutex
) {
292 for (i
= 0; i
< zo_info
.msgs_per_cycle
; i
++) {
293 msg
= stream_fifo_pop(&zo_info
.in_fifo
);
297 stream_fifo_push(&fifo
, msg
);
301 * We may need to reschedule, if there are still
304 if (stream_fifo_head(&zo_info
.in_fifo
) != NULL
)
309 atomic_fetch_add_explicit(&zo_info
.msgs_in
, i
, memory_order_relaxed
);
311 /* Check for zebra shutdown */
312 if (atomic_load_explicit(&zo_info
.run
, memory_order_relaxed
) == 0) {
313 need_resched
= false;
317 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
318 zlog_debug("%s: processing %u messages", __func__
, i
);
321 * Process the messages from the temporary fifo. We send the whole
322 * fifo so that we can take advantage of batching internally. Note
323 * that registration/deregistration messages are handled here also.
325 dispatch_opq_messages(&fifo
);
330 atomic_fetch_add_explicit(&zo_info
.yields
, 1,
331 memory_order_relaxed
);
332 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
336 /* This will also free any leftover messages, in the shutdown case */
337 stream_fifo_deinit(&fifo
);
341 * Process (dispatch) or drop opaque messages.
343 static int dispatch_opq_messages(struct stream_fifo
*msg_fifo
)
345 struct stream
*msg
, *dup
;
347 struct zapi_opaque_msg info
;
348 struct opq_msg_reg
*reg
;
350 struct opq_client_reg
*client
;
351 struct zserv
*zclient
;
354 while ((msg
= stream_fifo_pop(msg_fifo
)) != NULL
) {
355 zapi_parse_header(msg
, &hdr
);
356 hdr
.length
-= ZEBRA_HEADER_SIZE
;
358 /* Handle client registration messages */
359 if (hdr
.command
== ZEBRA_OPAQUE_REGISTER
) {
360 handle_opq_registration(&hdr
, msg
);
362 } else if (hdr
.command
== ZEBRA_OPAQUE_UNREGISTER
) {
363 handle_opq_unregistration(&hdr
, msg
);
367 /* We only process OPAQUE messages - drop anything else */
368 if (hdr
.command
!= ZEBRA_OPAQUE_MESSAGE
)
371 /* Dispatch to any registered ZAPI client(s) */
373 /* Extract subtype and flags */
374 ret
= zclient_opaque_decode(msg
, &info
);
378 /* Look up registered ZAPI client(s) */
379 reg
= opq_reg_lookup(info
.type
);
381 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
382 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
383 __func__
, info
.type
, info
.flags
);
387 /* Reset read pointer, since we'll be re-sending message */
388 stream_set_getp(msg
, 0);
390 /* Send a copy of the message to all registered clients */
391 for (client
= reg
->clients
; client
; client
= client
->next
) {
394 if (CHECK_FLAG(info
.flags
, ZAPI_OPAQUE_FLAG_UNICAST
)) {
396 if (client
->proto
!= info
.proto
||
397 client
->instance
!= info
.instance
||
398 client
->session_id
!= info
.session_id
)
401 if (IS_ZEBRA_DEBUG_RECV
&&
402 IS_ZEBRA_DEBUG_DETAIL
)
403 zlog_debug("%s: found matching unicast client %s",
410 /* Copy message if more clients */
412 dup
= stream_dup(msg
);
416 * TODO -- this isn't ideal: we're going through an
417 * acquire/release cycle for each client for each
418 * message. Replace this with a batching version.
420 zclient
= zserv_acquire_client(client
->proto
,
424 if (IS_ZEBRA_DEBUG_SEND
&&
425 IS_ZEBRA_DEBUG_DETAIL
)
426 zlog_debug("%s: sending %s to client %s",
428 (dup
? "dup" : "msg"),
434 * Sending a message actually means enqueuing
435 * it for a zapi io pthread to send - so we
436 * don't touch the message after this call.
438 zserv_send_message(zclient
, dup
? dup
: msg
);
444 zserv_release_client(zclient
);
446 if (IS_ZEBRA_DEBUG_RECV
&&
447 IS_ZEBRA_DEBUG_DETAIL
)
448 zlog_debug("%s: type %u: no zclient for %s",
453 /* Registered but gone? */
458 /* If unicast, we're done */
459 if (CHECK_FLAG(info
.flags
, ZAPI_OPAQUE_FLAG_UNICAST
))
473 * Process a register/unregister message
475 static int handle_opq_registration(const struct zmsghdr
*hdr
,
479 struct zapi_opaque_reg_info info
;
480 struct opq_client_reg
*client
;
481 struct opq_msg_reg key
, *reg
;
484 memset(&info
, 0, sizeof(info
));
486 if (zapi_opaque_reg_decode(msg
, &info
) < 0) {
491 memset(&key
, 0, sizeof(key
));
493 key
.type
= info
.type
;
495 reg
= opq_regh_find(&opq_reg_hash
, &key
);
497 /* Look for dup client */
498 for (client
= reg
->clients
; client
!= NULL
;
499 client
= client
->next
) {
500 if (opq_client_match(client
, &info
))
505 /* Oops - duplicate registration? */
506 if (IS_ZEBRA_DEBUG_RECV
)
507 zlog_debug("%s: duplicate opq reg for client %s",
509 opq_client2str(buf
, sizeof(buf
),
514 client
= opq_client_alloc(&info
);
516 if (IS_ZEBRA_DEBUG_RECV
)
517 zlog_debug("%s: client %s registers for %u",
519 opq_client2str(buf
, sizeof(buf
), client
),
522 /* Link client into registration */
523 client
->next
= reg
->clients
;
525 reg
->clients
->prev
= client
;
526 reg
->clients
= client
;
529 * No existing registrations - create one, add the
530 * client, and add registration to hash.
532 reg
= opq_reg_alloc(info
.type
);
533 client
= opq_client_alloc(&info
);
535 if (IS_ZEBRA_DEBUG_RECV
)
536 zlog_debug("%s: client %s registers for new reg %u",
538 opq_client2str(buf
, sizeof(buf
), client
),
541 reg
->clients
= client
;
543 opq_regh_add(&opq_reg_hash
, reg
);
553 * Process a register/unregister message
555 static int handle_opq_unregistration(const struct zmsghdr
*hdr
,
559 struct zapi_opaque_reg_info info
;
560 struct opq_client_reg
*client
;
561 struct opq_msg_reg key
, *reg
;
564 memset(&info
, 0, sizeof(info
));
566 if (zapi_opaque_reg_decode(msg
, &info
) < 0) {
571 memset(&key
, 0, sizeof(key
));
573 key
.type
= info
.type
;
575 reg
= opq_regh_find(&opq_reg_hash
, &key
);
577 /* Weird: unregister for unknown message? */
578 if (IS_ZEBRA_DEBUG_RECV
)
579 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
581 zebra_route_string(info
.proto
),
582 info
.instance
, info
.session_id
, info
.type
);
586 /* Look for client */
587 for (client
= reg
->clients
; client
!= NULL
;
588 client
= client
->next
) {
589 if (opq_client_match(client
, &info
))
593 if (client
== NULL
) {
594 /* Oops - unregister for unknown client? */
595 if (IS_ZEBRA_DEBUG_RECV
)
596 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
597 __func__
, zebra_route_string(info
.proto
),
598 info
.instance
, info
.session_id
, info
.type
);
602 if (IS_ZEBRA_DEBUG_RECV
)
603 zlog_debug("%s: client %s unregisters for %u",
604 __func__
, opq_client2str(buf
, sizeof(buf
), client
),
608 client
->prev
->next
= client
->next
;
610 client
->next
->prev
= client
->prev
;
611 if (reg
->clients
== client
)
612 reg
->clients
= client
->next
;
614 opq_client_free(&client
);
616 /* Is registration empty now? */
617 if (reg
->clients
== NULL
) {
618 opq_regh_del(&opq_reg_hash
, reg
);
628 /* Compare utility for registered clients */
629 static bool opq_client_match(const struct opq_client_reg
*client
,
630 const struct zapi_opaque_reg_info
*info
)
632 if (client
->proto
== info
->proto
&&
633 client
->instance
== info
->instance
&&
634 client
->session_id
== info
->session_id
)
640 static struct opq_msg_reg
*opq_reg_lookup(uint32_t type
)
642 struct opq_msg_reg key
, *reg
;
644 memset(&key
, 0, sizeof(key
));
648 reg
= opq_regh_find(&opq_reg_hash
, &key
);
653 static struct opq_msg_reg
*opq_reg_alloc(uint32_t type
)
655 struct opq_msg_reg
*reg
;
657 reg
= XCALLOC(MTYPE_OPQ
, sizeof(struct opq_msg_reg
));
660 INIT_HASH(®
->item
);
665 static void opq_reg_free(struct opq_msg_reg
**reg
)
667 XFREE(MTYPE_OPQ
, (*reg
));
670 static struct opq_client_reg
*opq_client_alloc(
671 const struct zapi_opaque_reg_info
*info
)
673 struct opq_client_reg
*client
;
675 client
= XCALLOC(MTYPE_OPQ
, sizeof(struct opq_client_reg
));
677 client
->proto
= info
->proto
;
678 client
->instance
= info
->instance
;
679 client
->session_id
= info
->session_id
;
684 static void opq_client_free(struct opq_client_reg
**client
)
686 XFREE(MTYPE_OPQ
, (*client
));
689 static const char *opq_client2str(char *buf
, size_t buflen
,
690 const struct opq_client_reg
*client
)
694 snprintf(buf
, buflen
, "%s/%u", zebra_route_string(client
->proto
),
696 if (client
->session_id
> 0) {
697 snprintf(sbuf
, sizeof(sbuf
), "/%u", client
->session_id
);
698 strlcat(buf
, sbuf
, buflen
);
704 /* Hash function for clients registered for messages */
705 static uint32_t registration_hash(const struct opq_msg_reg
*reg
)
710 /* Comparison function for client registrations */
711 static int registration_compare(const struct opq_msg_reg
*reg1
,
712 const struct opq_msg_reg
*reg2
)
714 if (reg1
->type
== reg2
->type
)