]> git.proxmox.com Git - mirror_frr.git/blame - zebra/zebra_opaque.c
lib,zebra: rename opaque decode api
[mirror_frr.git] / zebra / zebra_opaque.c
CommitLineData
9bb02389
MS
1/*
2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20
21#include <zebra.h>
22#include "lib/debug.h"
23#include "lib/frr_pthread.h"
24#include "lib/stream.h"
25#include "zebra/debug.h"
26#include "zebra/zserv.h"
6703a038 27#include "zebra/zebra_memory.h"
9bb02389
MS
28#include "zebra/zebra_opaque.h"
29
6703a038
MS
30/* Mem type */
31DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
32
33/* Hash to hold message registration info from zapi clients */
34PREDECL_HASH(opq_regh);
35
/*
 * One registered client. A client is identified by the
 * (proto, instance, session_id) triple; records for the same message
 * type are chained together in a doubly-linked list.
 */
struct opq_client_reg {
	/* Identity of the registered zapi client */
	int proto;
	int instance;
	uint32_t session_id;

	/* List linkage within the owning registration */
	struct opq_client_reg *next;
	struct opq_client_reg *prev;
};
45
46/* Opaque message registration info */
47struct opq_msg_reg {
48 struct opq_regh_item item;
49
50 /* Message type */
51 uint32_t type;
52
53 struct opq_client_reg *clients;
54};
55
56/* Registration helper prototypes */
57static uint32_t registration_hash(const struct opq_msg_reg *reg);
58static int registration_compare(const struct opq_msg_reg *reg1,
59 const struct opq_msg_reg *reg2);
60
61DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
62 registration_hash);
63
64static struct opq_regh_head opq_reg_hash;
65
9bb02389
MS
66/*
67 * Globals
68 */
69static struct zebra_opaque_globals {
70
71 /* Sentinel for run or start of shutdown */
72 _Atomic uint32_t run;
73
74 /* Limit number of pending, unprocessed updates */
75 _Atomic uint32_t max_queued_updates;
76
77 /* Limit number of new messages dequeued at once, to pace an
78 * incoming burst.
79 */
80 uint32_t msgs_per_cycle;
81
82 /* Stats: counters of incoming messages, errors, and yields (when
83 * the limit has been reached.)
84 */
85 _Atomic uint32_t msgs_in;
86 _Atomic uint32_t msg_errors;
87 _Atomic uint32_t yields;
88
89 /* pthread */
90 struct frr_pthread *pthread;
91
92 /* Event-delivery context 'master' for the module */
93 struct thread_master *master;
94
95 /* Event/'thread' pointer for queued zapi messages */
96 struct thread *t_msgs;
97
98 /* Input fifo queue to the module, and lock to protect it. */
99 pthread_mutex_t mutex;
100 struct stream_fifo in_fifo;
101
102} zo_info;
103
/* Module name used as a prefix in debug/log output */
static const char LOG_NAME[] = "Zebra Opaque";
106
/* Prototypes for file-local functions */

/* Event handler run on the module pthread: drains the incoming queue */
static int process_messages(struct thread *event);

/* Message handling */
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
static int handle_opq_registration(const struct zmsghdr *hdr,
				   struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
				     struct stream *msg);

/* Registration records */
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);

/* Client records */
static bool opq_client_match(const struct opq_client_reg *client,
			     const struct zapi_opaque_reg_info *info);
static struct opq_client_reg *opq_client_alloc(
	const struct zapi_opaque_reg_info *info);
static void opq_client_free(struct opq_client_reg **client);
static const char *opq_client2str(char *buf, size_t buflen,
				  const struct opq_client_reg *client);
9bb02389
MS
126
127/*
128 * Initialize the module at startup
129 */
130void zebra_opaque_init(void)
131{
132 memset(&zo_info, 0, sizeof(zo_info));
133
134 pthread_mutex_init(&zo_info.mutex, NULL);
135 stream_fifo_init(&zo_info.in_fifo);
136
137 zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
138}
139
140/*
141 * Start the module pthread. This step is run later than the
142 * 'init' step, in case zebra has fork-ed.
143 */
144void zebra_opaque_start(void)
145{
146 struct frr_pthread_attr pattr = {
147 .start = frr_pthread_attr_default.start,
148 .stop = frr_pthread_attr_default.stop
149 };
150
151 if (IS_ZEBRA_DEBUG_EVENT)
152 zlog_debug("%s module starting", LOG_NAME);
153
154 /* Start pthread */
155 zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
156 "zebra_opaque");
157
158 /* Associate event 'master' */
159 zo_info.master = zo_info.pthread->master;
160
161 atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
162
163 /* Enqueue an initial event for the pthread */
164 thread_add_event(zo_info.master, process_messages, NULL, 0,
165 &zo_info.t_msgs);
166
167 /* And start the pthread */
168 frr_pthread_run(zo_info.pthread, NULL);
169}
170
171/*
172 * Module stop, halting the dedicated pthread; called from the main pthread.
173 */
174void zebra_opaque_stop(void)
175{
176 if (IS_ZEBRA_DEBUG_EVENT)
177 zlog_debug("%s module stop", LOG_NAME);
178
179 atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
180
181 frr_pthread_stop(zo_info.pthread, NULL);
182
183 frr_pthread_destroy(zo_info.pthread);
184
185 if (IS_ZEBRA_DEBUG_EVENT)
186 zlog_debug("%s module stop complete", LOG_NAME);
187}
188
189/*
190 * Module final cleanup, called from the zebra main pthread.
191 */
192void zebra_opaque_finish(void)
193{
6703a038
MS
194 struct opq_msg_reg *reg;
195 struct opq_client_reg *client;
196
9bb02389
MS
197 if (IS_ZEBRA_DEBUG_EVENT)
198 zlog_debug("%s module shutdown", LOG_NAME);
199
6703a038
MS
200 /* Clear out registration info */
201 while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
202 client = reg->clients;
203 while (client) {
204 reg->clients = client->next;
205 opq_client_free(&client);
206 client = reg->clients;
207 }
208
209 opq_reg_free(&reg);
210 }
211
212 opq_regh_fini(&opq_reg_hash);
213
9bb02389
MS
214 pthread_mutex_destroy(&zo_info.mutex);
215 stream_fifo_deinit(&zo_info.in_fifo);
216}
217
218/*
219 * Does this module handle (intercept) the specified zapi message type?
220 */
221bool zebra_opaque_handles_msgid(uint16_t id)
222{
223 bool ret = false;
224
225 switch (id) {
226 case ZEBRA_OPAQUE_MESSAGE:
227 case ZEBRA_OPAQUE_REGISTER:
228 case ZEBRA_OPAQUE_UNREGISTER:
229 ret = true;
230 break;
231 default:
232 break;
233 }
234
235 return ret;
236}
237
238/*
239 * Enqueue a batch of messages for processing - this is the public api
240 * used from the zapi processing threads.
241 */
242uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
243{
244 uint32_t counter = 0;
245 struct stream *msg;
246
247 /* Dequeue messages from the incoming batch, and save them
248 * on the module fifo.
249 */
250 frr_with_mutex(&zo_info.mutex) {
251 msg = stream_fifo_pop(batch);
252 while (msg) {
253 stream_fifo_push(&zo_info.in_fifo, msg);
254 counter++;
255 msg = stream_fifo_pop(batch);
256 }
257 }
258
259 /* Schedule module pthread to process the batch */
260 if (counter > 0) {
261 if (IS_ZEBRA_DEBUG_RECV)
262 zlog_debug("%s: received %u messages",
263 __func__, counter);
264 thread_add_event(zo_info.master, process_messages, NULL, 0,
265 &zo_info.t_msgs);
266 }
267
268 return counter;
269}
270
271/*
272 * Pthread event loop, process the incoming message queue.
273 */
274static int process_messages(struct thread *event)
275{
276 struct stream_fifo fifo;
277 struct stream *msg;
278 uint32_t i;
279 bool need_resched = false;
280
281 stream_fifo_init(&fifo);
282
283 /* Check for zebra shutdown */
284 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
285 goto done;
286
6703a038
MS
287 /*
288 * Dequeue some messages from the incoming queue, temporarily
9bb02389
MS
289 * save them on the local fifo
290 */
291 frr_with_mutex(&zo_info.mutex) {
292
293 for (i = 0; i < zo_info.msgs_per_cycle; i++) {
294 msg = stream_fifo_pop(&zo_info.in_fifo);
295 if (msg == NULL)
296 break;
297
298 stream_fifo_push(&fifo, msg);
299 }
300
6703a038
MS
301 /*
302 * We may need to reschedule, if there are still
9bb02389
MS
303 * queued messages
304 */
305 if (stream_fifo_head(&zo_info.in_fifo) != NULL)
306 need_resched = true;
307 }
308
309 /* Update stats */
310 atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
311
312 /* Check for zebra shutdown */
313 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
314 need_resched = false;
315 goto done;
316 }
317
318 if (IS_ZEBRA_DEBUG_RECV)
319 zlog_debug("%s: processing %u messages", __func__, i);
320
6703a038
MS
321 /*
322 * Process the messages from the temporary fifo. We send the whole
323 * fifo so that we can take advantage of batching internally. Note
324 * that registration/deregistration messages are handled here also.
325 */
326 dispatch_opq_messages(&fifo);
9bb02389
MS
327
328done:
329
330 if (need_resched) {
331 atomic_fetch_add_explicit(&zo_info.yields, 1,
332 memory_order_relaxed);
333 thread_add_event(zo_info.master, process_messages, NULL, 0,
334 &zo_info.t_msgs);
335 }
336
337 /* This will also free any leftover messages, in the shutdown case */
338 stream_fifo_deinit(&fifo);
339
340 return 0;
341}
6703a038
MS
342
343/*
344 * Process (dispatch) or drop opaque messages.
345 */
346static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
347{
348 struct stream *msg, *dup;
349 struct zmsghdr hdr;
350 struct opq_msg_reg *reg;
351 uint32_t type;
352 struct opq_client_reg *client;
353 struct zserv *zclient;
354 char buf[50];
355
356 while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
357 zapi_parse_header(msg, &hdr);
358 hdr.length -= ZEBRA_HEADER_SIZE;
359
360 /* Handle client registration messages */
361 if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
362 handle_opq_registration(&hdr, msg);
363 continue;
364 } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
365 handle_opq_unregistration(&hdr, msg);
366 continue;
367 }
368
369 /* We only process OPAQUE messages - drop anything else */
370 if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
371 goto drop_it;
372
373 /* Dispatch to any registered ZAPI client(s) */
374
375 /* Extract subtype */
376 STREAM_GETL(msg, type);
377
378 /* Look up registered ZAPI client(s) */
379 reg = opq_reg_lookup(type);
380 if (reg == NULL) {
381 if (IS_ZEBRA_DEBUG_RECV)
382 zlog_debug("%s: no registrations for opaque type %u",
383 __func__, type);
384 goto drop_it;
385 }
386
387 /* Reset read pointer, since we'll be re-sending message */
388 stream_set_getp(msg, 0);
389
390 /* Send a copy of the message to all registered clients */
391 for (client = reg->clients; client; client = client->next) {
392 dup = NULL;
393
394 /* Copy message if necessary */
395 if (client->next)
396 dup = stream_dup(msg);
397
398 /*
399 * TODO -- this isn't ideal: we're going through an
400 * acquire/release cycle for each client for each
401 * message. Replace this with a batching version.
402 */
403 zclient = zserv_acquire_client(client->proto,
404 client->instance,
405 client->session_id);
406 if (zclient) {
407 if (IS_ZEBRA_DEBUG_RECV)
408 zlog_debug("%s: sending %s to client %s",
409 __func__,
410 (dup ? "dup" : "msg"),
411 opq_client2str(buf,
412 sizeof(buf),
413 client));
414
415 /*
416 * Sending a message actually means enqueuing
417 * it for a zapi io pthread to send - so we
418 * don't touch the message after this call.
419 */
420 zserv_send_message(zclient, dup ? dup : msg);
421 if (dup)
422 dup = NULL;
423 else
424 msg = NULL;
425
426 zserv_release_client(zclient);
427 } else {
428 if (IS_ZEBRA_DEBUG_RECV)
429 zlog_debug("%s: type %u: no zclient for %s",
430 __func__, type,
431 opq_client2str(buf,
432 sizeof(buf),
433 client));
434 /* Registered but gone? */
435 if (dup)
436 stream_free(dup);
437 }
438 }
439
440drop_it:
441stream_failure:
442 if (msg)
443 stream_free(msg);
444 }
445
446 return 0;
447}
448
449/*
450 * Process a register/unregister message
451 */
452static int handle_opq_registration(const struct zmsghdr *hdr,
453 struct stream *msg)
454{
455 int ret = 0;
456 struct zapi_opaque_reg_info info;
457 struct opq_client_reg *client;
458 struct opq_msg_reg key, *reg;
459 char buf[50];
460
461 memset(&info, 0, sizeof(info));
462
d2ddc141 463 if (zapi_opaque_reg_decode(msg, &info) < 0) {
6703a038
MS
464 ret = -1;
465 goto done;
466 }
467
468 memset(&key, 0, sizeof(key));
469
470 key.type = info.type;
471
472 reg = opq_regh_find(&opq_reg_hash, &key);
473 if (reg) {
474 /* Look for dup client */
475 for (client = reg->clients; client != NULL;
476 client = client->next) {
477 if (opq_client_match(client, &info))
478 break;
479 }
480
481 if (client) {
482 /* Oops - duplicate registration? */
483 if (IS_ZEBRA_DEBUG_RECV)
484 zlog_debug("%s: duplicate opq reg for client %s",
485 __func__,
486 opq_client2str(buf, sizeof(buf),
487 client));
488 goto done;
489 }
490
491 client = opq_client_alloc(&info);
492
493 if (IS_ZEBRA_DEBUG_RECV)
494 zlog_debug("%s: client %s registers for %u",
495 __func__,
496 opq_client2str(buf, sizeof(buf), client),
497 info.type);
498
499 /* Link client into registration */
500 client->next = reg->clients;
501 if (reg->clients)
502 reg->clients->prev = client;
503 reg->clients = client;
504 } else {
505 /*
506 * No existing registrations - create one, add the
507 * client, and add registration to hash.
508 */
509 reg = opq_reg_alloc(info.type);
510 client = opq_client_alloc(&info);
511
512 if (IS_ZEBRA_DEBUG_RECV)
513 zlog_debug("%s: client %s registers for new reg %u",
514 __func__,
515 opq_client2str(buf, sizeof(buf), client),
516 info.type);
517
518 reg->clients = client;
519
520 opq_regh_add(&opq_reg_hash, reg);
521 }
522
523done:
524
525 stream_free(msg);
526 return ret;
527}
528
529/*
530 * Process a register/unregister message
531 */
532static int handle_opq_unregistration(const struct zmsghdr *hdr,
533 struct stream *msg)
534{
535 int ret = 0;
536 struct zapi_opaque_reg_info info;
537 struct opq_client_reg *client;
538 struct opq_msg_reg key, *reg;
539 char buf[50];
540
541 memset(&info, 0, sizeof(info));
542
d2ddc141 543 if (zapi_opaque_reg_decode(msg, &info) < 0) {
6703a038
MS
544 ret = -1;
545 goto done;
546 }
547
548 memset(&key, 0, sizeof(key));
549
550 key.type = info.type;
551
552 reg = opq_regh_find(&opq_reg_hash, &key);
553 if (reg == NULL) {
554 /* Weird: unregister for unknown message? */
555 if (IS_ZEBRA_DEBUG_RECV)
556 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
557 __func__,
558 zebra_route_string(info.proto),
559 info.instance, info.session_id, info.type);
560 goto done;
561 }
562
563 /* Look for client */
564 for (client = reg->clients; client != NULL;
565 client = client->next) {
566 if (opq_client_match(client, &info))
567 break;
568 }
569
570 if (client == NULL) {
571 /* Oops - unregister for unknown client? */
572 if (IS_ZEBRA_DEBUG_RECV)
573 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
574 __func__, zebra_route_string(info.proto),
575 info.instance, info.session_id, info.type);
576 goto done;
577 }
578
579 if (IS_ZEBRA_DEBUG_RECV)
580 zlog_debug("%s: client %s unregisters for %u",
581 __func__, opq_client2str(buf, sizeof(buf), client),
582 info.type);
583
584 if (client->prev)
585 client->prev->next = client->next;
586 if (client->next)
587 client->next->prev = client->prev;
588 if (reg->clients == client)
589 reg->clients = client->next;
590
591 opq_client_free(&client);
592
593 /* Is registration empty now? */
594 if (reg->clients == NULL) {
595 if (IS_ZEBRA_DEBUG_RECV)
596 zlog_debug("%s: free empty reg %u", __func__,
597 reg->type);
598
599 opq_regh_del(&opq_reg_hash, reg);
600 opq_reg_free(&reg);
601 }
602
603done:
604
605 stream_free(msg);
606 return ret;
607}
608
609/* Compare utility for registered clients */
610static bool opq_client_match(const struct opq_client_reg *client,
611 const struct zapi_opaque_reg_info *info)
612{
613 if (client->proto == info->proto &&
614 client->instance == info->instance &&
615 client->session_id == info->session_id)
616 return true;
617 else
618 return false;
619}
620
621static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
622{
623 struct opq_msg_reg key, *reg;
624
625 memset(&key, 0, sizeof(key));
626
627 key.type = type;
628
629 reg = opq_regh_find(&opq_reg_hash, &key);
630
631 return reg;
632}
633
634static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
635{
636 struct opq_msg_reg *reg;
637
638 reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
639
640 reg->type = type;
641 INIT_HASH(&reg->item);
642
643 return reg;
644}
645
646static void opq_reg_free(struct opq_msg_reg **reg)
647{
648 XFREE(MTYPE_OPQ, (*reg));
649}
650
651static struct opq_client_reg *opq_client_alloc(
652 const struct zapi_opaque_reg_info *info)
653{
654 struct opq_client_reg *client;
655
656 client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
657
658 client->proto = info->proto;
659 client->instance = info->instance;
660 client->session_id = info->session_id;
661
662 return client;
663}
664
665static void opq_client_free(struct opq_client_reg **client)
666{
667 XFREE(MTYPE_OPQ, (*client));
668}
669
670static const char *opq_client2str(char *buf, size_t buflen,
671 const struct opq_client_reg *client)
672{
673 char sbuf[20];
674
675 snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
676 client->instance);
677 if (client->session_id > 0) {
678 snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
679 strlcat(buf, sbuf, buflen);
680 }
681
682 return buf;
683}
684
685/* Hash function for clients registered for messages */
686static uint32_t registration_hash(const struct opq_msg_reg *reg)
687{
688 return reg->type;
689}
690
691/* Comparison function for client registrations */
692static int registration_compare(const struct opq_msg_reg *reg1,
693 const struct opq_msg_reg *reg2)
694{
695 if (reg1->type == reg2->type)
696 return 0;
697 else
698 return -1;
699}