]> git.proxmox.com Git - mirror_frr.git/blame - zebra/zebra_opaque.c
Merge pull request #8639 from idryzhov/isis-new-bfd-lib
[mirror_frr.git] / zebra / zebra_opaque.c
CommitLineData
9bb02389
MS
1/*
2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20
21#include <zebra.h>
22#include "lib/debug.h"
23#include "lib/frr_pthread.h"
24#include "lib/stream.h"
25#include "zebra/debug.h"
26#include "zebra/zserv.h"
27#include "zebra/zebra_opaque.h"
28
6703a038
MS
29/* Mem type */
30DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
31
32/* Hash to hold message registration info from zapi clients */
33PREDECL_HASH(opq_regh);
34
/*
 * A single registered zapi client, identified by its
 * proto/instance/session_id triple.
 */
struct opq_client_reg {
	int proto;
	int instance;
	uint32_t session_id;

	/* Links for the doubly-linked list of clients in a registration */
	struct opq_client_reg *next;
	struct opq_client_reg *prev;
};
44
45/* Opaque message registration info */
46struct opq_msg_reg {
47 struct opq_regh_item item;
48
49 /* Message type */
50 uint32_t type;
51
52 struct opq_client_reg *clients;
53};
54
55/* Registration helper prototypes */
56static uint32_t registration_hash(const struct opq_msg_reg *reg);
57static int registration_compare(const struct opq_msg_reg *reg1,
58 const struct opq_msg_reg *reg2);
59
60DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
61 registration_hash);
62
63static struct opq_regh_head opq_reg_hash;
64
9bb02389
MS
65/*
66 * Globals
67 */
68static struct zebra_opaque_globals {
69
70 /* Sentinel for run or start of shutdown */
71 _Atomic uint32_t run;
72
73 /* Limit number of pending, unprocessed updates */
74 _Atomic uint32_t max_queued_updates;
75
76 /* Limit number of new messages dequeued at once, to pace an
77 * incoming burst.
78 */
79 uint32_t msgs_per_cycle;
80
81 /* Stats: counters of incoming messages, errors, and yields (when
82 * the limit has been reached.)
83 */
84 _Atomic uint32_t msgs_in;
85 _Atomic uint32_t msg_errors;
86 _Atomic uint32_t yields;
87
88 /* pthread */
89 struct frr_pthread *pthread;
90
91 /* Event-delivery context 'master' for the module */
92 struct thread_master *master;
93
94 /* Event/'thread' pointer for queued zapi messages */
95 struct thread *t_msgs;
96
97 /* Input fifo queue to the module, and lock to protect it. */
98 pthread_mutex_t mutex;
99 struct stream_fifo in_fifo;
100
101} zo_info;
102
/* Module name used as a prefix in debug/log output */
static const char LOG_NAME[] = "Zebra Opaque";

/* Prototypes */

/* Main event loop, processing the incoming message queue */
static int process_messages(struct thread *event);

/* Handlers for registration, unregistration, and message dispatch */
static int handle_opq_registration(const struct zmsghdr *hdr,
				   struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
				     struct stream *msg);
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);

/* Registration and client bookkeeping helpers */
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static bool opq_client_match(const struct opq_client_reg *client,
			     const struct zapi_opaque_reg_info *info);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);
static struct opq_client_reg *opq_client_alloc(
	const struct zapi_opaque_reg_info *info);
static void opq_client_free(struct opq_client_reg **client);
static const char *opq_client2str(char *buf, size_t buflen,
				  const struct opq_client_reg *client);
9bb02389
MS
125
126/*
127 * Initialize the module at startup
128 */
129void zebra_opaque_init(void)
130{
131 memset(&zo_info, 0, sizeof(zo_info));
132
133 pthread_mutex_init(&zo_info.mutex, NULL);
134 stream_fifo_init(&zo_info.in_fifo);
135
136 zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
137}
138
139/*
140 * Start the module pthread. This step is run later than the
141 * 'init' step, in case zebra has fork-ed.
142 */
143void zebra_opaque_start(void)
144{
145 struct frr_pthread_attr pattr = {
146 .start = frr_pthread_attr_default.start,
147 .stop = frr_pthread_attr_default.stop
148 };
149
150 if (IS_ZEBRA_DEBUG_EVENT)
151 zlog_debug("%s module starting", LOG_NAME);
152
153 /* Start pthread */
154 zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
155 "zebra_opaque");
156
157 /* Associate event 'master' */
158 zo_info.master = zo_info.pthread->master;
159
160 atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
161
162 /* Enqueue an initial event for the pthread */
163 thread_add_event(zo_info.master, process_messages, NULL, 0,
164 &zo_info.t_msgs);
165
166 /* And start the pthread */
167 frr_pthread_run(zo_info.pthread, NULL);
168}
169
170/*
171 * Module stop, halting the dedicated pthread; called from the main pthread.
172 */
173void zebra_opaque_stop(void)
174{
175 if (IS_ZEBRA_DEBUG_EVENT)
176 zlog_debug("%s module stop", LOG_NAME);
177
178 atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
179
180 frr_pthread_stop(zo_info.pthread, NULL);
181
182 frr_pthread_destroy(zo_info.pthread);
183
184 if (IS_ZEBRA_DEBUG_EVENT)
185 zlog_debug("%s module stop complete", LOG_NAME);
186}
187
188/*
189 * Module final cleanup, called from the zebra main pthread.
190 */
191void zebra_opaque_finish(void)
192{
6703a038
MS
193 struct opq_msg_reg *reg;
194 struct opq_client_reg *client;
195
9bb02389
MS
196 if (IS_ZEBRA_DEBUG_EVENT)
197 zlog_debug("%s module shutdown", LOG_NAME);
198
6703a038
MS
199 /* Clear out registration info */
200 while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
201 client = reg->clients;
202 while (client) {
203 reg->clients = client->next;
204 opq_client_free(&client);
205 client = reg->clients;
206 }
207
208 opq_reg_free(&reg);
209 }
210
211 opq_regh_fini(&opq_reg_hash);
212
9bb02389
MS
213 pthread_mutex_destroy(&zo_info.mutex);
214 stream_fifo_deinit(&zo_info.in_fifo);
215}
216
217/*
218 * Does this module handle (intercept) the specified zapi message type?
219 */
220bool zebra_opaque_handles_msgid(uint16_t id)
221{
222 bool ret = false;
223
224 switch (id) {
225 case ZEBRA_OPAQUE_MESSAGE:
226 case ZEBRA_OPAQUE_REGISTER:
227 case ZEBRA_OPAQUE_UNREGISTER:
228 ret = true;
229 break;
230 default:
231 break;
232 }
233
234 return ret;
235}
236
237/*
238 * Enqueue a batch of messages for processing - this is the public api
239 * used from the zapi processing threads.
240 */
241uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
242{
243 uint32_t counter = 0;
244 struct stream *msg;
245
246 /* Dequeue messages from the incoming batch, and save them
247 * on the module fifo.
248 */
249 frr_with_mutex(&zo_info.mutex) {
250 msg = stream_fifo_pop(batch);
251 while (msg) {
252 stream_fifo_push(&zo_info.in_fifo, msg);
253 counter++;
254 msg = stream_fifo_pop(batch);
255 }
256 }
257
258 /* Schedule module pthread to process the batch */
259 if (counter > 0) {
674afc2b 260 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
9bb02389
MS
261 zlog_debug("%s: received %u messages",
262 __func__, counter);
263 thread_add_event(zo_info.master, process_messages, NULL, 0,
264 &zo_info.t_msgs);
265 }
266
267 return counter;
268}
269
270/*
271 * Pthread event loop, process the incoming message queue.
272 */
273static int process_messages(struct thread *event)
274{
275 struct stream_fifo fifo;
276 struct stream *msg;
277 uint32_t i;
278 bool need_resched = false;
279
280 stream_fifo_init(&fifo);
281
282 /* Check for zebra shutdown */
283 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
284 goto done;
285
6703a038
MS
286 /*
287 * Dequeue some messages from the incoming queue, temporarily
9bb02389
MS
288 * save them on the local fifo
289 */
290 frr_with_mutex(&zo_info.mutex) {
291
292 for (i = 0; i < zo_info.msgs_per_cycle; i++) {
293 msg = stream_fifo_pop(&zo_info.in_fifo);
294 if (msg == NULL)
295 break;
296
297 stream_fifo_push(&fifo, msg);
298 }
299
6703a038
MS
300 /*
301 * We may need to reschedule, if there are still
9bb02389
MS
302 * queued messages
303 */
304 if (stream_fifo_head(&zo_info.in_fifo) != NULL)
305 need_resched = true;
306 }
307
308 /* Update stats */
309 atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
310
311 /* Check for zebra shutdown */
312 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
313 need_resched = false;
314 goto done;
315 }
316
674afc2b 317 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
9bb02389
MS
318 zlog_debug("%s: processing %u messages", __func__, i);
319
6703a038
MS
320 /*
321 * Process the messages from the temporary fifo. We send the whole
322 * fifo so that we can take advantage of batching internally. Note
323 * that registration/deregistration messages are handled here also.
324 */
325 dispatch_opq_messages(&fifo);
9bb02389
MS
326
327done:
328
329 if (need_resched) {
330 atomic_fetch_add_explicit(&zo_info.yields, 1,
331 memory_order_relaxed);
332 thread_add_event(zo_info.master, process_messages, NULL, 0,
333 &zo_info.t_msgs);
334 }
335
336 /* This will also free any leftover messages, in the shutdown case */
337 stream_fifo_deinit(&fifo);
338
339 return 0;
340}
6703a038
MS
341
342/*
343 * Process (dispatch) or drop opaque messages.
344 */
345static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
346{
347 struct stream *msg, *dup;
348 struct zmsghdr hdr;
387831ff 349 struct zapi_opaque_msg info;
6703a038 350 struct opq_msg_reg *reg;
387831ff 351 int ret;
6703a038
MS
352 struct opq_client_reg *client;
353 struct zserv *zclient;
354 char buf[50];
355
356 while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
357 zapi_parse_header(msg, &hdr);
358 hdr.length -= ZEBRA_HEADER_SIZE;
359
360 /* Handle client registration messages */
361 if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
362 handle_opq_registration(&hdr, msg);
363 continue;
364 } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
365 handle_opq_unregistration(&hdr, msg);
366 continue;
367 }
368
369 /* We only process OPAQUE messages - drop anything else */
370 if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
371 goto drop_it;
372
373 /* Dispatch to any registered ZAPI client(s) */
374
387831ff
MS
375 /* Extract subtype and flags */
376 ret = zclient_opaque_decode(msg, &info);
377 if (ret != 0)
378 goto drop_it;
6703a038
MS
379
380 /* Look up registered ZAPI client(s) */
387831ff 381 reg = opq_reg_lookup(info.type);
6703a038 382 if (reg == NULL) {
674afc2b 383 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
387831ff
MS
384 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
385 __func__, info.type, info.flags);
6703a038
MS
386 goto drop_it;
387 }
388
389 /* Reset read pointer, since we'll be re-sending message */
390 stream_set_getp(msg, 0);
391
392 /* Send a copy of the message to all registered clients */
393 for (client = reg->clients; client; client = client->next) {
394 dup = NULL;
395
c8b27f2a
MS
396 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
397
398 if (client->proto != info.proto ||
399 client->instance != info.instance ||
400 client->session_id != info.session_id)
401 continue;
402
674afc2b
MS
403 if (IS_ZEBRA_DEBUG_RECV &&
404 IS_ZEBRA_DEBUG_DETAIL)
c8b27f2a
MS
405 zlog_debug("%s: found matching unicast client %s",
406 __func__,
407 opq_client2str(buf,
408 sizeof(buf),
409 client));
410
411 } else {
412 /* Copy message if more clients */
413 if (client->next)
414 dup = stream_dup(msg);
415 }
6703a038
MS
416
417 /*
418 * TODO -- this isn't ideal: we're going through an
419 * acquire/release cycle for each client for each
420 * message. Replace this with a batching version.
421 */
422 zclient = zserv_acquire_client(client->proto,
423 client->instance,
424 client->session_id);
425 if (zclient) {
674afc2b
MS
426 if (IS_ZEBRA_DEBUG_SEND &&
427 IS_ZEBRA_DEBUG_DETAIL)
6703a038
MS
428 zlog_debug("%s: sending %s to client %s",
429 __func__,
430 (dup ? "dup" : "msg"),
431 opq_client2str(buf,
432 sizeof(buf),
433 client));
434
435 /*
436 * Sending a message actually means enqueuing
437 * it for a zapi io pthread to send - so we
438 * don't touch the message after this call.
439 */
440 zserv_send_message(zclient, dup ? dup : msg);
441 if (dup)
442 dup = NULL;
443 else
444 msg = NULL;
445
446 zserv_release_client(zclient);
447 } else {
674afc2b
MS
448 if (IS_ZEBRA_DEBUG_RECV &&
449 IS_ZEBRA_DEBUG_DETAIL)
6703a038 450 zlog_debug("%s: type %u: no zclient for %s",
387831ff 451 __func__, info.type,
6703a038
MS
452 opq_client2str(buf,
453 sizeof(buf),
454 client));
455 /* Registered but gone? */
456 if (dup)
457 stream_free(dup);
458 }
c8b27f2a
MS
459
460 /* If unicast, we're done */
461 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST))
462 break;
6703a038
MS
463 }
464
465drop_it:
387831ff 466
6703a038
MS
467 if (msg)
468 stream_free(msg);
469 }
470
471 return 0;
472}
473
474/*
475 * Process a register/unregister message
476 */
477static int handle_opq_registration(const struct zmsghdr *hdr,
478 struct stream *msg)
479{
480 int ret = 0;
481 struct zapi_opaque_reg_info info;
482 struct opq_client_reg *client;
483 struct opq_msg_reg key, *reg;
484 char buf[50];
485
486 memset(&info, 0, sizeof(info));
487
d2ddc141 488 if (zapi_opaque_reg_decode(msg, &info) < 0) {
6703a038
MS
489 ret = -1;
490 goto done;
491 }
492
493 memset(&key, 0, sizeof(key));
494
495 key.type = info.type;
496
497 reg = opq_regh_find(&opq_reg_hash, &key);
498 if (reg) {
499 /* Look for dup client */
500 for (client = reg->clients; client != NULL;
501 client = client->next) {
502 if (opq_client_match(client, &info))
503 break;
504 }
505
506 if (client) {
507 /* Oops - duplicate registration? */
508 if (IS_ZEBRA_DEBUG_RECV)
509 zlog_debug("%s: duplicate opq reg for client %s",
510 __func__,
511 opq_client2str(buf, sizeof(buf),
512 client));
513 goto done;
514 }
515
516 client = opq_client_alloc(&info);
517
518 if (IS_ZEBRA_DEBUG_RECV)
519 zlog_debug("%s: client %s registers for %u",
520 __func__,
521 opq_client2str(buf, sizeof(buf), client),
522 info.type);
523
524 /* Link client into registration */
525 client->next = reg->clients;
526 if (reg->clients)
527 reg->clients->prev = client;
528 reg->clients = client;
529 } else {
530 /*
531 * No existing registrations - create one, add the
532 * client, and add registration to hash.
533 */
534 reg = opq_reg_alloc(info.type);
535 client = opq_client_alloc(&info);
536
537 if (IS_ZEBRA_DEBUG_RECV)
538 zlog_debug("%s: client %s registers for new reg %u",
539 __func__,
540 opq_client2str(buf, sizeof(buf), client),
541 info.type);
542
543 reg->clients = client;
544
545 opq_regh_add(&opq_reg_hash, reg);
546 }
547
548done:
549
550 stream_free(msg);
551 return ret;
552}
553
554/*
555 * Process a register/unregister message
556 */
557static int handle_opq_unregistration(const struct zmsghdr *hdr,
558 struct stream *msg)
559{
560 int ret = 0;
561 struct zapi_opaque_reg_info info;
562 struct opq_client_reg *client;
563 struct opq_msg_reg key, *reg;
564 char buf[50];
565
566 memset(&info, 0, sizeof(info));
567
d2ddc141 568 if (zapi_opaque_reg_decode(msg, &info) < 0) {
6703a038
MS
569 ret = -1;
570 goto done;
571 }
572
573 memset(&key, 0, sizeof(key));
574
575 key.type = info.type;
576
577 reg = opq_regh_find(&opq_reg_hash, &key);
578 if (reg == NULL) {
579 /* Weird: unregister for unknown message? */
580 if (IS_ZEBRA_DEBUG_RECV)
581 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
582 __func__,
583 zebra_route_string(info.proto),
584 info.instance, info.session_id, info.type);
585 goto done;
586 }
587
588 /* Look for client */
589 for (client = reg->clients; client != NULL;
590 client = client->next) {
591 if (opq_client_match(client, &info))
592 break;
593 }
594
595 if (client == NULL) {
596 /* Oops - unregister for unknown client? */
597 if (IS_ZEBRA_DEBUG_RECV)
598 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
599 __func__, zebra_route_string(info.proto),
600 info.instance, info.session_id, info.type);
601 goto done;
602 }
603
604 if (IS_ZEBRA_DEBUG_RECV)
605 zlog_debug("%s: client %s unregisters for %u",
606 __func__, opq_client2str(buf, sizeof(buf), client),
607 info.type);
608
609 if (client->prev)
610 client->prev->next = client->next;
611 if (client->next)
612 client->next->prev = client->prev;
613 if (reg->clients == client)
614 reg->clients = client->next;
615
616 opq_client_free(&client);
617
618 /* Is registration empty now? */
619 if (reg->clients == NULL) {
6703a038
MS
620 opq_regh_del(&opq_reg_hash, reg);
621 opq_reg_free(&reg);
622 }
623
624done:
625
626 stream_free(msg);
627 return ret;
628}
629
630/* Compare utility for registered clients */
631static bool opq_client_match(const struct opq_client_reg *client,
632 const struct zapi_opaque_reg_info *info)
633{
634 if (client->proto == info->proto &&
635 client->instance == info->instance &&
636 client->session_id == info->session_id)
637 return true;
638 else
639 return false;
640}
641
642static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
643{
644 struct opq_msg_reg key, *reg;
645
646 memset(&key, 0, sizeof(key));
647
648 key.type = type;
649
650 reg = opq_regh_find(&opq_reg_hash, &key);
651
652 return reg;
653}
654
655static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
656{
657 struct opq_msg_reg *reg;
658
659 reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
660
661 reg->type = type;
662 INIT_HASH(&reg->item);
663
664 return reg;
665}
666
667static void opq_reg_free(struct opq_msg_reg **reg)
668{
669 XFREE(MTYPE_OPQ, (*reg));
670}
671
672static struct opq_client_reg *opq_client_alloc(
673 const struct zapi_opaque_reg_info *info)
674{
675 struct opq_client_reg *client;
676
677 client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
678
679 client->proto = info->proto;
680 client->instance = info->instance;
681 client->session_id = info->session_id;
682
683 return client;
684}
685
686static void opq_client_free(struct opq_client_reg **client)
687{
688 XFREE(MTYPE_OPQ, (*client));
689}
690
691static const char *opq_client2str(char *buf, size_t buflen,
692 const struct opq_client_reg *client)
693{
694 char sbuf[20];
695
696 snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
697 client->instance);
698 if (client->session_id > 0) {
699 snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
700 strlcat(buf, sbuf, buflen);
701 }
702
703 return buf;
704}
705
706/* Hash function for clients registered for messages */
707static uint32_t registration_hash(const struct opq_msg_reg *reg)
708{
709 return reg->type;
710}
711
712/* Comparison function for client registrations */
713static int registration_compare(const struct opq_msg_reg *reg1,
714 const struct opq_msg_reg *reg2)
715{
716 if (reg1->type == reg2->type)
717 return 0;
718 else
719 return -1;
720}