]> git.proxmox.com Git - mirror_frr.git/blame - zebra/zebra_opaque.c
Merge pull request #13386 from donaldsharp/bgp_received_routes
[mirror_frr.git] / zebra / zebra_opaque.c
CommitLineData
acddc0ed 1// SPDX-License-Identifier: GPL-2.0-or-later
9bb02389
MS
2/*
3 * Zebra opaque message handler module
4 * Copyright (c) 2020 Volta Networks, Inc.
9bb02389
MS
5 */
6
7
8#include <zebra.h>
9#include "lib/debug.h"
10#include "lib/frr_pthread.h"
11#include "lib/stream.h"
12#include "zebra/debug.h"
13#include "zebra/zserv.h"
14#include "zebra/zebra_opaque.h"
7ca9c407 15#include "zebra/rib.h"
9bb02389 16
6703a038
MS
17/* Mem type */
18DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
19
20/* Hash to hold message registration info from zapi clients */
21PREDECL_HASH(opq_regh);
22
23/* Registered client info */
24struct opq_client_reg {
25 int proto;
26 int instance;
27 uint32_t session_id;
28
29 struct opq_client_reg *next;
30 struct opq_client_reg *prev;
31};
32
33/* Opaque message registration info */
34struct opq_msg_reg {
35 struct opq_regh_item item;
36
37 /* Message type */
38 uint32_t type;
39
40 struct opq_client_reg *clients;
41};
42
43/* Registration helper prototypes */
44static uint32_t registration_hash(const struct opq_msg_reg *reg);
45static int registration_compare(const struct opq_msg_reg *reg1,
46 const struct opq_msg_reg *reg2);
47
48DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
49 registration_hash);
50
51static struct opq_regh_head opq_reg_hash;
52
9bb02389
MS
53/*
54 * Globals
55 */
56static struct zebra_opaque_globals {
57
58 /* Sentinel for run or start of shutdown */
59 _Atomic uint32_t run;
60
61 /* Limit number of pending, unprocessed updates */
62 _Atomic uint32_t max_queued_updates;
63
64 /* Limit number of new messages dequeued at once, to pace an
65 * incoming burst.
66 */
67 uint32_t msgs_per_cycle;
68
69 /* Stats: counters of incoming messages, errors, and yields (when
70 * the limit has been reached.)
71 */
72 _Atomic uint32_t msgs_in;
73 _Atomic uint32_t msg_errors;
74 _Atomic uint32_t yields;
75
76 /* pthread */
77 struct frr_pthread *pthread;
78
79 /* Event-delivery context 'master' for the module */
cd9d0537 80 struct event_loop *master;
9bb02389
MS
81
82 /* Event/'thread' pointer for queued zapi messages */
e6685141 83 struct event *t_msgs;
9bb02389
MS
84
85 /* Input fifo queue to the module, and lock to protect it. */
86 pthread_mutex_t mutex;
87 struct stream_fifo in_fifo;
88
89} zo_info;
90
91/* Name string for debugs/logs */
92static const char LOG_NAME[] = "Zebra Opaque";
93
94/* Prototypes */
95
96/* Main event loop, processing incoming message queue */
e6685141 97static void process_messages(struct event *event);
6703a038
MS
98static int handle_opq_registration(const struct zmsghdr *hdr,
99 struct stream *msg);
100static int handle_opq_unregistration(const struct zmsghdr *hdr,
101 struct stream *msg);
102static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
103static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
104static bool opq_client_match(const struct opq_client_reg *client,
105 const struct zapi_opaque_reg_info *info);
106static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
107static void opq_reg_free(struct opq_msg_reg **reg);
108static struct opq_client_reg *opq_client_alloc(
109 const struct zapi_opaque_reg_info *info);
110static void opq_client_free(struct opq_client_reg **client);
111static const char *opq_client2str(char *buf, size_t buflen,
112 const struct opq_client_reg *client);
9bb02389
MS
113
114/*
115 * Initialize the module at startup
116 */
117void zebra_opaque_init(void)
118{
119 memset(&zo_info, 0, sizeof(zo_info));
120
121 pthread_mutex_init(&zo_info.mutex, NULL);
122 stream_fifo_init(&zo_info.in_fifo);
123
124 zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
125}
126
127/*
128 * Start the module pthread. This step is run later than the
129 * 'init' step, in case zebra has fork-ed.
130 */
131void zebra_opaque_start(void)
132{
133 struct frr_pthread_attr pattr = {
134 .start = frr_pthread_attr_default.start,
135 .stop = frr_pthread_attr_default.stop
136 };
137
138 if (IS_ZEBRA_DEBUG_EVENT)
139 zlog_debug("%s module starting", LOG_NAME);
140
141 /* Start pthread */
142 zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
143 "zebra_opaque");
144
145 /* Associate event 'master' */
146 zo_info.master = zo_info.pthread->master;
147
148 atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
149
150 /* Enqueue an initial event for the pthread */
907a2395
DS
151 event_add_event(zo_info.master, process_messages, NULL, 0,
152 &zo_info.t_msgs);
9bb02389
MS
153
154 /* And start the pthread */
155 frr_pthread_run(zo_info.pthread, NULL);
156}
157
158/*
159 * Module stop, halting the dedicated pthread; called from the main pthread.
160 */
161void zebra_opaque_stop(void)
162{
163 if (IS_ZEBRA_DEBUG_EVENT)
164 zlog_debug("%s module stop", LOG_NAME);
165
166 atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
167
168 frr_pthread_stop(zo_info.pthread, NULL);
169
170 frr_pthread_destroy(zo_info.pthread);
171
172 if (IS_ZEBRA_DEBUG_EVENT)
173 zlog_debug("%s module stop complete", LOG_NAME);
174}
175
176/*
177 * Module final cleanup, called from the zebra main pthread.
178 */
179void zebra_opaque_finish(void)
180{
6703a038
MS
181 struct opq_msg_reg *reg;
182 struct opq_client_reg *client;
183
9bb02389
MS
184 if (IS_ZEBRA_DEBUG_EVENT)
185 zlog_debug("%s module shutdown", LOG_NAME);
186
6703a038
MS
187 /* Clear out registration info */
188 while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
189 client = reg->clients;
190 while (client) {
191 reg->clients = client->next;
192 opq_client_free(&client);
193 client = reg->clients;
194 }
195
196 opq_reg_free(&reg);
197 }
198
199 opq_regh_fini(&opq_reg_hash);
200
9bb02389
MS
201 pthread_mutex_destroy(&zo_info.mutex);
202 stream_fifo_deinit(&zo_info.in_fifo);
203}
204
205/*
206 * Does this module handle (intercept) the specified zapi message type?
207 */
208bool zebra_opaque_handles_msgid(uint16_t id)
209{
210 bool ret = false;
211
212 switch (id) {
213 case ZEBRA_OPAQUE_MESSAGE:
214 case ZEBRA_OPAQUE_REGISTER:
215 case ZEBRA_OPAQUE_UNREGISTER:
216 ret = true;
217 break;
218 default:
219 break;
220 }
221
222 return ret;
223}
224
225/*
226 * Enqueue a batch of messages for processing - this is the public api
227 * used from the zapi processing threads.
228 */
229uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
230{
231 uint32_t counter = 0;
232 struct stream *msg;
233
234 /* Dequeue messages from the incoming batch, and save them
235 * on the module fifo.
236 */
cb1991af 237 frr_with_mutex (&zo_info.mutex) {
9bb02389
MS
238 msg = stream_fifo_pop(batch);
239 while (msg) {
240 stream_fifo_push(&zo_info.in_fifo, msg);
241 counter++;
242 msg = stream_fifo_pop(batch);
243 }
244 }
245
246 /* Schedule module pthread to process the batch */
247 if (counter > 0) {
674afc2b 248 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
9bb02389
MS
249 zlog_debug("%s: received %u messages",
250 __func__, counter);
907a2395
DS
251 event_add_event(zo_info.master, process_messages, NULL, 0,
252 &zo_info.t_msgs);
9bb02389
MS
253 }
254
255 return counter;
256}
257
258/*
259 * Pthread event loop, process the incoming message queue.
260 */
e6685141 261static void process_messages(struct event *event)
9bb02389
MS
262{
263 struct stream_fifo fifo;
264 struct stream *msg;
265 uint32_t i;
266 bool need_resched = false;
267
268 stream_fifo_init(&fifo);
269
270 /* Check for zebra shutdown */
271 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
272 goto done;
273
6703a038
MS
274 /*
275 * Dequeue some messages from the incoming queue, temporarily
9bb02389
MS
276 * save them on the local fifo
277 */
cb1991af 278 frr_with_mutex (&zo_info.mutex) {
9bb02389
MS
279
280 for (i = 0; i < zo_info.msgs_per_cycle; i++) {
281 msg = stream_fifo_pop(&zo_info.in_fifo);
282 if (msg == NULL)
283 break;
284
285 stream_fifo_push(&fifo, msg);
286 }
287
6703a038
MS
288 /*
289 * We may need to reschedule, if there are still
9bb02389
MS
290 * queued messages
291 */
292 if (stream_fifo_head(&zo_info.in_fifo) != NULL)
293 need_resched = true;
294 }
295
296 /* Update stats */
297 atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
298
299 /* Check for zebra shutdown */
300 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
301 need_resched = false;
302 goto done;
303 }
304
674afc2b 305 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
9bb02389
MS
306 zlog_debug("%s: processing %u messages", __func__, i);
307
6703a038
MS
308 /*
309 * Process the messages from the temporary fifo. We send the whole
310 * fifo so that we can take advantage of batching internally. Note
311 * that registration/deregistration messages are handled here also.
312 */
313 dispatch_opq_messages(&fifo);
9bb02389
MS
314
315done:
316
317 if (need_resched) {
318 atomic_fetch_add_explicit(&zo_info.yields, 1,
319 memory_order_relaxed);
907a2395
DS
320 event_add_event(zo_info.master, process_messages, NULL, 0,
321 &zo_info.t_msgs);
9bb02389
MS
322 }
323
324 /* This will also free any leftover messages, in the shutdown case */
325 stream_fifo_deinit(&fifo);
9bb02389 326}
6703a038
MS
327
328/*
329 * Process (dispatch) or drop opaque messages.
330 */
331static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
332{
333 struct stream *msg, *dup;
334 struct zmsghdr hdr;
387831ff 335 struct zapi_opaque_msg info;
6703a038 336 struct opq_msg_reg *reg;
387831ff 337 int ret;
6703a038
MS
338 struct opq_client_reg *client;
339 struct zserv *zclient;
340 char buf[50];
341
342 while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
343 zapi_parse_header(msg, &hdr);
344 hdr.length -= ZEBRA_HEADER_SIZE;
345
346 /* Handle client registration messages */
347 if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
348 handle_opq_registration(&hdr, msg);
349 continue;
350 } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
351 handle_opq_unregistration(&hdr, msg);
352 continue;
353 }
354
355 /* We only process OPAQUE messages - drop anything else */
356 if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
357 goto drop_it;
358
359 /* Dispatch to any registered ZAPI client(s) */
360
387831ff
MS
361 /* Extract subtype and flags */
362 ret = zclient_opaque_decode(msg, &info);
363 if (ret != 0)
364 goto drop_it;
6703a038
MS
365
366 /* Look up registered ZAPI client(s) */
387831ff 367 reg = opq_reg_lookup(info.type);
6703a038 368 if (reg == NULL) {
674afc2b 369 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
387831ff
MS
370 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
371 __func__, info.type, info.flags);
6703a038
MS
372 goto drop_it;
373 }
374
375 /* Reset read pointer, since we'll be re-sending message */
376 stream_set_getp(msg, 0);
377
378 /* Send a copy of the message to all registered clients */
379 for (client = reg->clients; client; client = client->next) {
380 dup = NULL;
381
c8b27f2a
MS
382 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
383
384 if (client->proto != info.proto ||
385 client->instance != info.instance ||
386 client->session_id != info.session_id)
387 continue;
388
674afc2b
MS
389 if (IS_ZEBRA_DEBUG_RECV &&
390 IS_ZEBRA_DEBUG_DETAIL)
c8b27f2a
MS
391 zlog_debug("%s: found matching unicast client %s",
392 __func__,
393 opq_client2str(buf,
394 sizeof(buf),
395 client));
396
397 } else {
398 /* Copy message if more clients */
399 if (client->next)
400 dup = stream_dup(msg);
401 }
6703a038
MS
402
403 /*
404 * TODO -- this isn't ideal: we're going through an
405 * acquire/release cycle for each client for each
406 * message. Replace this with a batching version.
407 */
408 zclient = zserv_acquire_client(client->proto,
409 client->instance,
410 client->session_id);
411 if (zclient) {
674afc2b
MS
412 if (IS_ZEBRA_DEBUG_SEND &&
413 IS_ZEBRA_DEBUG_DETAIL)
6703a038
MS
414 zlog_debug("%s: sending %s to client %s",
415 __func__,
416 (dup ? "dup" : "msg"),
417 opq_client2str(buf,
418 sizeof(buf),
419 client));
420
421 /*
422 * Sending a message actually means enqueuing
423 * it for a zapi io pthread to send - so we
424 * don't touch the message after this call.
425 */
426 zserv_send_message(zclient, dup ? dup : msg);
427 if (dup)
428 dup = NULL;
429 else
430 msg = NULL;
431
432 zserv_release_client(zclient);
433 } else {
674afc2b
MS
434 if (IS_ZEBRA_DEBUG_RECV &&
435 IS_ZEBRA_DEBUG_DETAIL)
6703a038 436 zlog_debug("%s: type %u: no zclient for %s",
387831ff 437 __func__, info.type,
6703a038
MS
438 opq_client2str(buf,
439 sizeof(buf),
440 client));
441 /* Registered but gone? */
442 if (dup)
443 stream_free(dup);
444 }
c8b27f2a
MS
445
446 /* If unicast, we're done */
447 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST))
448 break;
6703a038
MS
449 }
450
451drop_it:
387831ff 452
6703a038
MS
453 if (msg)
454 stream_free(msg);
455 }
456
457 return 0;
458}
459
460/*
461 * Process a register/unregister message
462 */
463static int handle_opq_registration(const struct zmsghdr *hdr,
464 struct stream *msg)
465{
466 int ret = 0;
467 struct zapi_opaque_reg_info info;
468 struct opq_client_reg *client;
469 struct opq_msg_reg key, *reg;
470 char buf[50];
471
472 memset(&info, 0, sizeof(info));
473
d2ddc141 474 if (zapi_opaque_reg_decode(msg, &info) < 0) {
6703a038
MS
475 ret = -1;
476 goto done;
477 }
478
479 memset(&key, 0, sizeof(key));
480
481 key.type = info.type;
482
483 reg = opq_regh_find(&opq_reg_hash, &key);
484 if (reg) {
485 /* Look for dup client */
486 for (client = reg->clients; client != NULL;
487 client = client->next) {
488 if (opq_client_match(client, &info))
489 break;
490 }
491
492 if (client) {
493 /* Oops - duplicate registration? */
494 if (IS_ZEBRA_DEBUG_RECV)
495 zlog_debug("%s: duplicate opq reg for client %s",
496 __func__,
497 opq_client2str(buf, sizeof(buf),
498 client));
499 goto done;
500 }
501
502 client = opq_client_alloc(&info);
503
504 if (IS_ZEBRA_DEBUG_RECV)
505 zlog_debug("%s: client %s registers for %u",
506 __func__,
507 opq_client2str(buf, sizeof(buf), client),
508 info.type);
509
510 /* Link client into registration */
511 client->next = reg->clients;
512 if (reg->clients)
513 reg->clients->prev = client;
514 reg->clients = client;
515 } else {
516 /*
517 * No existing registrations - create one, add the
518 * client, and add registration to hash.
519 */
520 reg = opq_reg_alloc(info.type);
521 client = opq_client_alloc(&info);
522
523 if (IS_ZEBRA_DEBUG_RECV)
524 zlog_debug("%s: client %s registers for new reg %u",
525 __func__,
526 opq_client2str(buf, sizeof(buf), client),
527 info.type);
528
529 reg->clients = client;
530
531 opq_regh_add(&opq_reg_hash, reg);
532 }
533
534done:
535
536 stream_free(msg);
537 return ret;
538}
539
540/*
541 * Process a register/unregister message
542 */
543static int handle_opq_unregistration(const struct zmsghdr *hdr,
544 struct stream *msg)
545{
546 int ret = 0;
547 struct zapi_opaque_reg_info info;
548 struct opq_client_reg *client;
549 struct opq_msg_reg key, *reg;
550 char buf[50];
551
552 memset(&info, 0, sizeof(info));
553
d2ddc141 554 if (zapi_opaque_reg_decode(msg, &info) < 0) {
6703a038
MS
555 ret = -1;
556 goto done;
557 }
558
559 memset(&key, 0, sizeof(key));
560
561 key.type = info.type;
562
563 reg = opq_regh_find(&opq_reg_hash, &key);
564 if (reg == NULL) {
565 /* Weird: unregister for unknown message? */
566 if (IS_ZEBRA_DEBUG_RECV)
567 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
568 __func__,
569 zebra_route_string(info.proto),
570 info.instance, info.session_id, info.type);
571 goto done;
572 }
573
574 /* Look for client */
575 for (client = reg->clients; client != NULL;
576 client = client->next) {
577 if (opq_client_match(client, &info))
578 break;
579 }
580
581 if (client == NULL) {
582 /* Oops - unregister for unknown client? */
583 if (IS_ZEBRA_DEBUG_RECV)
584 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
585 __func__, zebra_route_string(info.proto),
586 info.instance, info.session_id, info.type);
587 goto done;
588 }
589
590 if (IS_ZEBRA_DEBUG_RECV)
591 zlog_debug("%s: client %s unregisters for %u",
592 __func__, opq_client2str(buf, sizeof(buf), client),
593 info.type);
594
595 if (client->prev)
596 client->prev->next = client->next;
597 if (client->next)
598 client->next->prev = client->prev;
599 if (reg->clients == client)
600 reg->clients = client->next;
601
602 opq_client_free(&client);
603
604 /* Is registration empty now? */
605 if (reg->clients == NULL) {
6703a038
MS
606 opq_regh_del(&opq_reg_hash, reg);
607 opq_reg_free(&reg);
608 }
609
610done:
611
612 stream_free(msg);
613 return ret;
614}
615
616/* Compare utility for registered clients */
617static bool opq_client_match(const struct opq_client_reg *client,
618 const struct zapi_opaque_reg_info *info)
619{
620 if (client->proto == info->proto &&
621 client->instance == info->instance &&
622 client->session_id == info->session_id)
623 return true;
624 else
625 return false;
626}
627
628static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
629{
630 struct opq_msg_reg key, *reg;
631
632 memset(&key, 0, sizeof(key));
633
634 key.type = type;
635
636 reg = opq_regh_find(&opq_reg_hash, &key);
637
638 return reg;
639}
640
641static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
642{
643 struct opq_msg_reg *reg;
644
645 reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
646
647 reg->type = type;
648 INIT_HASH(&reg->item);
649
650 return reg;
651}
652
653static void opq_reg_free(struct opq_msg_reg **reg)
654{
655 XFREE(MTYPE_OPQ, (*reg));
656}
657
658static struct opq_client_reg *opq_client_alloc(
659 const struct zapi_opaque_reg_info *info)
660{
661 struct opq_client_reg *client;
662
663 client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
664
665 client->proto = info->proto;
666 client->instance = info->instance;
667 client->session_id = info->session_id;
668
669 return client;
670}
671
672static void opq_client_free(struct opq_client_reg **client)
673{
674 XFREE(MTYPE_OPQ, (*client));
675}
676
677static const char *opq_client2str(char *buf, size_t buflen,
678 const struct opq_client_reg *client)
679{
680 char sbuf[20];
681
682 snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
683 client->instance);
684 if (client->session_id > 0) {
685 snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
686 strlcat(buf, sbuf, buflen);
687 }
688
689 return buf;
690}
691
692/* Hash function for clients registered for messages */
693static uint32_t registration_hash(const struct opq_msg_reg *reg)
694{
695 return reg->type;
696}
697
698/* Comparison function for client registrations */
699static int registration_compare(const struct opq_msg_reg *reg1,
700 const struct opq_msg_reg *reg2)
701{
702 if (reg1->type == reg2->type)
703 return 0;
704 else
705 return -1;
706}