2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "lib/debug.h"
23 #include "lib/frr_pthread.h"
24 #include "lib/stream.h"
25 #include "zebra/debug.h"
26 #include "zebra/zserv.h"
27 #include "zebra/zebra_opaque.h"
28 #include "zebra/rib.h"
31 DEFINE_MTYPE_STATIC(ZEBRA
, OPQ
, "ZAPI Opaque Information");
33 /* Hash to hold message registration info from zapi clients */
34 PREDECL_HASH(opq_regh
);
36 /* Registered client info */
37 struct opq_client_reg
{
42 struct opq_client_reg
*next
;
43 struct opq_client_reg
*prev
;
46 /* Opaque message registration info */
48 struct opq_regh_item item
;
53 struct opq_client_reg
*clients
;
56 /* Registration helper prototypes */
57 static uint32_t registration_hash(const struct opq_msg_reg
*reg
);
58 static int registration_compare(const struct opq_msg_reg
*reg1
,
59 const struct opq_msg_reg
*reg2
);
61 DECLARE_HASH(opq_regh
, struct opq_msg_reg
, item
, registration_compare
,
64 static struct opq_regh_head opq_reg_hash
;
69 static struct zebra_opaque_globals
{
71 /* Sentinel for run or start of shutdown */
74 /* Limit number of pending, unprocessed updates */
75 _Atomic
uint32_t max_queued_updates
;
77 /* Limit number of new messages dequeued at once, to pace an
80 uint32_t msgs_per_cycle
;
82 /* Stats: counters of incoming messages, errors, and yields (when
83 * the limit has been reached.)
85 _Atomic
uint32_t msgs_in
;
86 _Atomic
uint32_t msg_errors
;
87 _Atomic
uint32_t yields
;
90 struct frr_pthread
*pthread
;
92 /* Event-delivery context 'master' for the module */
93 struct thread_master
*master
;
95 /* Event/'thread' pointer for queued zapi messages */
96 struct thread
*t_msgs
;
98 /* Input fifo queue to the module, and lock to protect it. */
99 pthread_mutex_t mutex
;
100 struct stream_fifo in_fifo
;
/* Name string for debugs/logs */
static const char LOG_NAME[] = "Zebra Opaque";
/* Prototypes */

/* Main event loop, processing incoming message queue */
static void process_messages(struct thread *event);

/* Handlers for client register/unregister messages */
static int handle_opq_registration(const struct zmsghdr *hdr,
				   struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
				     struct stream *msg);

/* Dispatch a fifo of dequeued messages */
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);

/* Registration and client record helpers */
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static bool opq_client_match(const struct opq_client_reg *client,
			     const struct zapi_opaque_reg_info *info);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);
static struct opq_client_reg *opq_client_alloc(
	const struct zapi_opaque_reg_info *info);
static void opq_client_free(struct opq_client_reg **client);
static const char *opq_client2str(char *buf, size_t buflen,
				  const struct opq_client_reg *client);
128 * Initialize the module at startup
130 void zebra_opaque_init(void)
132 memset(&zo_info
, 0, sizeof(zo_info
));
134 pthread_mutex_init(&zo_info
.mutex
, NULL
);
135 stream_fifo_init(&zo_info
.in_fifo
);
137 zo_info
.msgs_per_cycle
= ZEBRA_OPAQUE_MSG_LIMIT
;
141 * Start the module pthread. This step is run later than the
142 * 'init' step, in case zebra has fork-ed.
144 void zebra_opaque_start(void)
146 struct frr_pthread_attr pattr
= {
147 .start
= frr_pthread_attr_default
.start
,
148 .stop
= frr_pthread_attr_default
.stop
151 if (IS_ZEBRA_DEBUG_EVENT
)
152 zlog_debug("%s module starting", LOG_NAME
);
155 zo_info
.pthread
= frr_pthread_new(&pattr
, "Zebra Opaque thread",
158 /* Associate event 'master' */
159 zo_info
.master
= zo_info
.pthread
->master
;
161 atomic_store_explicit(&zo_info
.run
, 1, memory_order_relaxed
);
163 /* Enqueue an initial event for the pthread */
164 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
167 /* And start the pthread */
168 frr_pthread_run(zo_info
.pthread
, NULL
);
172 * Module stop, halting the dedicated pthread; called from the main pthread.
174 void zebra_opaque_stop(void)
176 if (IS_ZEBRA_DEBUG_EVENT
)
177 zlog_debug("%s module stop", LOG_NAME
);
179 atomic_store_explicit(&zo_info
.run
, 0, memory_order_relaxed
);
181 frr_pthread_stop(zo_info
.pthread
, NULL
);
183 frr_pthread_destroy(zo_info
.pthread
);
185 if (IS_ZEBRA_DEBUG_EVENT
)
186 zlog_debug("%s module stop complete", LOG_NAME
);
190 * Module final cleanup, called from the zebra main pthread.
192 void zebra_opaque_finish(void)
194 struct opq_msg_reg
*reg
;
195 struct opq_client_reg
*client
;
197 if (IS_ZEBRA_DEBUG_EVENT
)
198 zlog_debug("%s module shutdown", LOG_NAME
);
200 /* Clear out registration info */
201 while ((reg
= opq_regh_pop(&opq_reg_hash
)) != NULL
) {
202 client
= reg
->clients
;
204 reg
->clients
= client
->next
;
205 opq_client_free(&client
);
206 client
= reg
->clients
;
212 opq_regh_fini(&opq_reg_hash
);
214 pthread_mutex_destroy(&zo_info
.mutex
);
215 stream_fifo_deinit(&zo_info
.in_fifo
);
219 * Does this module handle (intercept) the specified zapi message type?
221 bool zebra_opaque_handles_msgid(uint16_t id
)
226 case ZEBRA_OPAQUE_MESSAGE
:
227 case ZEBRA_OPAQUE_REGISTER
:
228 case ZEBRA_OPAQUE_UNREGISTER
:
239 * Enqueue a batch of messages for processing - this is the public api
240 * used from the zapi processing threads.
242 uint32_t zebra_opaque_enqueue_batch(struct stream_fifo
*batch
)
244 uint32_t counter
= 0;
247 /* Dequeue messages from the incoming batch, and save them
248 * on the module fifo.
250 frr_with_mutex(&zo_info
.mutex
) {
251 msg
= stream_fifo_pop(batch
);
253 stream_fifo_push(&zo_info
.in_fifo
, msg
);
255 msg
= stream_fifo_pop(batch
);
259 /* Schedule module pthread to process the batch */
261 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
262 zlog_debug("%s: received %u messages",
264 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
272 * Pthread event loop, process the incoming message queue.
274 static void process_messages(struct thread
*event
)
276 struct stream_fifo fifo
;
279 bool need_resched
= false;
281 stream_fifo_init(&fifo
);
283 /* Check for zebra shutdown */
284 if (atomic_load_explicit(&zo_info
.run
, memory_order_relaxed
) == 0)
288 * Dequeue some messages from the incoming queue, temporarily
289 * save them on the local fifo
291 frr_with_mutex(&zo_info
.mutex
) {
293 for (i
= 0; i
< zo_info
.msgs_per_cycle
; i
++) {
294 msg
= stream_fifo_pop(&zo_info
.in_fifo
);
298 stream_fifo_push(&fifo
, msg
);
302 * We may need to reschedule, if there are still
305 if (stream_fifo_head(&zo_info
.in_fifo
) != NULL
)
310 atomic_fetch_add_explicit(&zo_info
.msgs_in
, i
, memory_order_relaxed
);
312 /* Check for zebra shutdown */
313 if (atomic_load_explicit(&zo_info
.run
, memory_order_relaxed
) == 0) {
314 need_resched
= false;
318 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
319 zlog_debug("%s: processing %u messages", __func__
, i
);
322 * Process the messages from the temporary fifo. We send the whole
323 * fifo so that we can take advantage of batching internally. Note
324 * that registration/deregistration messages are handled here also.
326 dispatch_opq_messages(&fifo
);
331 atomic_fetch_add_explicit(&zo_info
.yields
, 1,
332 memory_order_relaxed
);
333 thread_add_event(zo_info
.master
, process_messages
, NULL
, 0,
337 /* This will also free any leftover messages, in the shutdown case */
338 stream_fifo_deinit(&fifo
);
342 * Process (dispatch) or drop opaque messages.
344 static int dispatch_opq_messages(struct stream_fifo
*msg_fifo
)
346 struct stream
*msg
, *dup
;
348 struct zapi_opaque_msg info
;
349 struct opq_msg_reg
*reg
;
351 struct opq_client_reg
*client
;
352 struct zserv
*zclient
;
355 while ((msg
= stream_fifo_pop(msg_fifo
)) != NULL
) {
356 zapi_parse_header(msg
, &hdr
);
357 hdr
.length
-= ZEBRA_HEADER_SIZE
;
359 /* Handle client registration messages */
360 if (hdr
.command
== ZEBRA_OPAQUE_REGISTER
) {
361 handle_opq_registration(&hdr
, msg
);
363 } else if (hdr
.command
== ZEBRA_OPAQUE_UNREGISTER
) {
364 handle_opq_unregistration(&hdr
, msg
);
368 /* We only process OPAQUE messages - drop anything else */
369 if (hdr
.command
!= ZEBRA_OPAQUE_MESSAGE
)
372 /* Dispatch to any registered ZAPI client(s) */
374 /* Extract subtype and flags */
375 ret
= zclient_opaque_decode(msg
, &info
);
379 /* Look up registered ZAPI client(s) */
380 reg
= opq_reg_lookup(info
.type
);
382 if (IS_ZEBRA_DEBUG_RECV
&& IS_ZEBRA_DEBUG_DETAIL
)
383 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
384 __func__
, info
.type
, info
.flags
);
388 /* Reset read pointer, since we'll be re-sending message */
389 stream_set_getp(msg
, 0);
391 /* Send a copy of the message to all registered clients */
392 for (client
= reg
->clients
; client
; client
= client
->next
) {
395 if (CHECK_FLAG(info
.flags
, ZAPI_OPAQUE_FLAG_UNICAST
)) {
397 if (client
->proto
!= info
.proto
||
398 client
->instance
!= info
.instance
||
399 client
->session_id
!= info
.session_id
)
402 if (IS_ZEBRA_DEBUG_RECV
&&
403 IS_ZEBRA_DEBUG_DETAIL
)
404 zlog_debug("%s: found matching unicast client %s",
411 /* Copy message if more clients */
413 dup
= stream_dup(msg
);
417 * TODO -- this isn't ideal: we're going through an
418 * acquire/release cycle for each client for each
419 * message. Replace this with a batching version.
421 zclient
= zserv_acquire_client(client
->proto
,
425 if (IS_ZEBRA_DEBUG_SEND
&&
426 IS_ZEBRA_DEBUG_DETAIL
)
427 zlog_debug("%s: sending %s to client %s",
429 (dup
? "dup" : "msg"),
435 * Sending a message actually means enqueuing
436 * it for a zapi io pthread to send - so we
437 * don't touch the message after this call.
439 zserv_send_message(zclient
, dup
? dup
: msg
);
445 zserv_release_client(zclient
);
447 if (IS_ZEBRA_DEBUG_RECV
&&
448 IS_ZEBRA_DEBUG_DETAIL
)
449 zlog_debug("%s: type %u: no zclient for %s",
454 /* Registered but gone? */
459 /* If unicast, we're done */
460 if (CHECK_FLAG(info
.flags
, ZAPI_OPAQUE_FLAG_UNICAST
))
474 * Process a register/unregister message
476 static int handle_opq_registration(const struct zmsghdr
*hdr
,
480 struct zapi_opaque_reg_info info
;
481 struct opq_client_reg
*client
;
482 struct opq_msg_reg key
, *reg
;
485 memset(&info
, 0, sizeof(info
));
487 if (zapi_opaque_reg_decode(msg
, &info
) < 0) {
492 memset(&key
, 0, sizeof(key
));
494 key
.type
= info
.type
;
496 reg
= opq_regh_find(&opq_reg_hash
, &key
);
498 /* Look for dup client */
499 for (client
= reg
->clients
; client
!= NULL
;
500 client
= client
->next
) {
501 if (opq_client_match(client
, &info
))
506 /* Oops - duplicate registration? */
507 if (IS_ZEBRA_DEBUG_RECV
)
508 zlog_debug("%s: duplicate opq reg for client %s",
510 opq_client2str(buf
, sizeof(buf
),
515 client
= opq_client_alloc(&info
);
517 if (IS_ZEBRA_DEBUG_RECV
)
518 zlog_debug("%s: client %s registers for %u",
520 opq_client2str(buf
, sizeof(buf
), client
),
523 /* Link client into registration */
524 client
->next
= reg
->clients
;
526 reg
->clients
->prev
= client
;
527 reg
->clients
= client
;
530 * No existing registrations - create one, add the
531 * client, and add registration to hash.
533 reg
= opq_reg_alloc(info
.type
);
534 client
= opq_client_alloc(&info
);
536 if (IS_ZEBRA_DEBUG_RECV
)
537 zlog_debug("%s: client %s registers for new reg %u",
539 opq_client2str(buf
, sizeof(buf
), client
),
542 reg
->clients
= client
;
544 opq_regh_add(&opq_reg_hash
, reg
);
554 * Process a register/unregister message
556 static int handle_opq_unregistration(const struct zmsghdr
*hdr
,
560 struct zapi_opaque_reg_info info
;
561 struct opq_client_reg
*client
;
562 struct opq_msg_reg key
, *reg
;
565 memset(&info
, 0, sizeof(info
));
567 if (zapi_opaque_reg_decode(msg
, &info
) < 0) {
572 memset(&key
, 0, sizeof(key
));
574 key
.type
= info
.type
;
576 reg
= opq_regh_find(&opq_reg_hash
, &key
);
578 /* Weird: unregister for unknown message? */
579 if (IS_ZEBRA_DEBUG_RECV
)
580 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
582 zebra_route_string(info
.proto
),
583 info
.instance
, info
.session_id
, info
.type
);
587 /* Look for client */
588 for (client
= reg
->clients
; client
!= NULL
;
589 client
= client
->next
) {
590 if (opq_client_match(client
, &info
))
594 if (client
== NULL
) {
595 /* Oops - unregister for unknown client? */
596 if (IS_ZEBRA_DEBUG_RECV
)
597 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
598 __func__
, zebra_route_string(info
.proto
),
599 info
.instance
, info
.session_id
, info
.type
);
603 if (IS_ZEBRA_DEBUG_RECV
)
604 zlog_debug("%s: client %s unregisters for %u",
605 __func__
, opq_client2str(buf
, sizeof(buf
), client
),
609 client
->prev
->next
= client
->next
;
611 client
->next
->prev
= client
->prev
;
612 if (reg
->clients
== client
)
613 reg
->clients
= client
->next
;
615 opq_client_free(&client
);
617 /* Is registration empty now? */
618 if (reg
->clients
== NULL
) {
619 opq_regh_del(&opq_reg_hash
, reg
);
629 /* Compare utility for registered clients */
630 static bool opq_client_match(const struct opq_client_reg
*client
,
631 const struct zapi_opaque_reg_info
*info
)
633 if (client
->proto
== info
->proto
&&
634 client
->instance
== info
->instance
&&
635 client
->session_id
== info
->session_id
)
641 static struct opq_msg_reg
*opq_reg_lookup(uint32_t type
)
643 struct opq_msg_reg key
, *reg
;
645 memset(&key
, 0, sizeof(key
));
649 reg
= opq_regh_find(&opq_reg_hash
, &key
);
654 static struct opq_msg_reg
*opq_reg_alloc(uint32_t type
)
656 struct opq_msg_reg
*reg
;
658 reg
= XCALLOC(MTYPE_OPQ
, sizeof(struct opq_msg_reg
));
661 INIT_HASH(®
->item
);
666 static void opq_reg_free(struct opq_msg_reg
**reg
)
668 XFREE(MTYPE_OPQ
, (*reg
));
671 static struct opq_client_reg
*opq_client_alloc(
672 const struct zapi_opaque_reg_info
*info
)
674 struct opq_client_reg
*client
;
676 client
= XCALLOC(MTYPE_OPQ
, sizeof(struct opq_client_reg
));
678 client
->proto
= info
->proto
;
679 client
->instance
= info
->instance
;
680 client
->session_id
= info
->session_id
;
685 static void opq_client_free(struct opq_client_reg
**client
)
687 XFREE(MTYPE_OPQ
, (*client
));
690 static const char *opq_client2str(char *buf
, size_t buflen
,
691 const struct opq_client_reg
*client
)
695 snprintf(buf
, buflen
, "%s/%u", zebra_route_string(client
->proto
),
697 if (client
->session_id
> 0) {
698 snprintf(sbuf
, sizeof(sbuf
), "/%u", client
->session_id
);
699 strlcat(buf
, sbuf
, buflen
);
705 /* Hash function for clients registered for messages */
706 static uint32_t registration_hash(const struct opq_msg_reg
*reg
)
711 /* Comparison function for client registrations */
712 static int registration_compare(const struct opq_msg_reg
*reg1
,
713 const struct opq_msg_reg
*reg2
)
715 if (reg1
->type
== reg2
->type
)