]> git.proxmox.com Git - mirror_frr.git/blob - zebra/zebra_opaque.c
*: Convert `struct event_master` to `struct event_loop`
[mirror_frr.git] / zebra / zebra_opaque.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * Zebra opaque message handler module
4 * Copyright (c) 2020 Volta Networks, Inc.
5 */
6
7
8 #include <zebra.h>
9 #include "lib/debug.h"
10 #include "lib/frr_pthread.h"
11 #include "lib/stream.h"
12 #include "zebra/debug.h"
13 #include "zebra/zserv.h"
14 #include "zebra/zebra_opaque.h"
15 #include "zebra/rib.h"
16
17 /* Mem type */
18 DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
19
20 /* Hash to hold message registration info from zapi clients */
21 PREDECL_HASH(opq_regh);
22
23 /* Registered client info */
24 struct opq_client_reg {
25 int proto;
26 int instance;
27 uint32_t session_id;
28
29 struct opq_client_reg *next;
30 struct opq_client_reg *prev;
31 };
32
33 /* Opaque message registration info */
34 struct opq_msg_reg {
35 struct opq_regh_item item;
36
37 /* Message type */
38 uint32_t type;
39
40 struct opq_client_reg *clients;
41 };
42
43 /* Registration helper prototypes */
44 static uint32_t registration_hash(const struct opq_msg_reg *reg);
45 static int registration_compare(const struct opq_msg_reg *reg1,
46 const struct opq_msg_reg *reg2);
47
48 DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
49 registration_hash);
50
51 static struct opq_regh_head opq_reg_hash;
52
53 /*
54 * Globals
55 */
56 static struct zebra_opaque_globals {
57
58 /* Sentinel for run or start of shutdown */
59 _Atomic uint32_t run;
60
61 /* Limit number of pending, unprocessed updates */
62 _Atomic uint32_t max_queued_updates;
63
64 /* Limit number of new messages dequeued at once, to pace an
65 * incoming burst.
66 */
67 uint32_t msgs_per_cycle;
68
69 /* Stats: counters of incoming messages, errors, and yields (when
70 * the limit has been reached.)
71 */
72 _Atomic uint32_t msgs_in;
73 _Atomic uint32_t msg_errors;
74 _Atomic uint32_t yields;
75
76 /* pthread */
77 struct frr_pthread *pthread;
78
79 /* Event-delivery context 'master' for the module */
80 struct event_loop *master;
81
82 /* Event/'thread' pointer for queued zapi messages */
83 struct event *t_msgs;
84
85 /* Input fifo queue to the module, and lock to protect it. */
86 pthread_mutex_t mutex;
87 struct stream_fifo in_fifo;
88
89 } zo_info;
90
91 /* Name string for debugs/logs */
92 static const char LOG_NAME[] = "Zebra Opaque";
93
94 /* Prototypes */
95
96 /* Main event loop, processing incoming message queue */
97 static void process_messages(struct event *event);
98 static int handle_opq_registration(const struct zmsghdr *hdr,
99 struct stream *msg);
100 static int handle_opq_unregistration(const struct zmsghdr *hdr,
101 struct stream *msg);
102 static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
103 static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
104 static bool opq_client_match(const struct opq_client_reg *client,
105 const struct zapi_opaque_reg_info *info);
106 static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
107 static void opq_reg_free(struct opq_msg_reg **reg);
108 static struct opq_client_reg *opq_client_alloc(
109 const struct zapi_opaque_reg_info *info);
110 static void opq_client_free(struct opq_client_reg **client);
111 static const char *opq_client2str(char *buf, size_t buflen,
112 const struct opq_client_reg *client);
113
114 /*
115 * Initialize the module at startup
116 */
117 void zebra_opaque_init(void)
118 {
119 memset(&zo_info, 0, sizeof(zo_info));
120
121 pthread_mutex_init(&zo_info.mutex, NULL);
122 stream_fifo_init(&zo_info.in_fifo);
123
124 zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
125 }
126
127 /*
128 * Start the module pthread. This step is run later than the
129 * 'init' step, in case zebra has fork-ed.
130 */
131 void zebra_opaque_start(void)
132 {
133 struct frr_pthread_attr pattr = {
134 .start = frr_pthread_attr_default.start,
135 .stop = frr_pthread_attr_default.stop
136 };
137
138 if (IS_ZEBRA_DEBUG_EVENT)
139 zlog_debug("%s module starting", LOG_NAME);
140
141 /* Start pthread */
142 zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
143 "zebra_opaque");
144
145 /* Associate event 'master' */
146 zo_info.master = zo_info.pthread->master;
147
148 atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
149
150 /* Enqueue an initial event for the pthread */
151 event_add_event(zo_info.master, process_messages, NULL, 0,
152 &zo_info.t_msgs);
153
154 /* And start the pthread */
155 frr_pthread_run(zo_info.pthread, NULL);
156 }
157
158 /*
159 * Module stop, halting the dedicated pthread; called from the main pthread.
160 */
161 void zebra_opaque_stop(void)
162 {
163 if (IS_ZEBRA_DEBUG_EVENT)
164 zlog_debug("%s module stop", LOG_NAME);
165
166 atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
167
168 frr_pthread_stop(zo_info.pthread, NULL);
169
170 frr_pthread_destroy(zo_info.pthread);
171
172 if (IS_ZEBRA_DEBUG_EVENT)
173 zlog_debug("%s module stop complete", LOG_NAME);
174 }
175
176 /*
177 * Module final cleanup, called from the zebra main pthread.
178 */
179 void zebra_opaque_finish(void)
180 {
181 struct opq_msg_reg *reg;
182 struct opq_client_reg *client;
183
184 if (IS_ZEBRA_DEBUG_EVENT)
185 zlog_debug("%s module shutdown", LOG_NAME);
186
187 /* Clear out registration info */
188 while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
189 client = reg->clients;
190 while (client) {
191 reg->clients = client->next;
192 opq_client_free(&client);
193 client = reg->clients;
194 }
195
196 opq_reg_free(&reg);
197 }
198
199 opq_regh_fini(&opq_reg_hash);
200
201 pthread_mutex_destroy(&zo_info.mutex);
202 stream_fifo_deinit(&zo_info.in_fifo);
203 }
204
205 /*
206 * Does this module handle (intercept) the specified zapi message type?
207 */
208 bool zebra_opaque_handles_msgid(uint16_t id)
209 {
210 bool ret = false;
211
212 switch (id) {
213 case ZEBRA_OPAQUE_MESSAGE:
214 case ZEBRA_OPAQUE_REGISTER:
215 case ZEBRA_OPAQUE_UNREGISTER:
216 ret = true;
217 break;
218 default:
219 break;
220 }
221
222 return ret;
223 }
224
225 /*
226 * Enqueue a batch of messages for processing - this is the public api
227 * used from the zapi processing threads.
228 */
229 uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
230 {
231 uint32_t counter = 0;
232 struct stream *msg;
233
234 /* Dequeue messages from the incoming batch, and save them
235 * on the module fifo.
236 */
237 frr_with_mutex (&zo_info.mutex) {
238 msg = stream_fifo_pop(batch);
239 while (msg) {
240 stream_fifo_push(&zo_info.in_fifo, msg);
241 counter++;
242 msg = stream_fifo_pop(batch);
243 }
244 }
245
246 /* Schedule module pthread to process the batch */
247 if (counter > 0) {
248 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
249 zlog_debug("%s: received %u messages",
250 __func__, counter);
251 event_add_event(zo_info.master, process_messages, NULL, 0,
252 &zo_info.t_msgs);
253 }
254
255 return counter;
256 }
257
258 /*
259 * Pthread event loop, process the incoming message queue.
260 */
261 static void process_messages(struct event *event)
262 {
263 struct stream_fifo fifo;
264 struct stream *msg;
265 uint32_t i;
266 bool need_resched = false;
267
268 stream_fifo_init(&fifo);
269
270 /* Check for zebra shutdown */
271 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
272 goto done;
273
274 /*
275 * Dequeue some messages from the incoming queue, temporarily
276 * save them on the local fifo
277 */
278 frr_with_mutex (&zo_info.mutex) {
279
280 for (i = 0; i < zo_info.msgs_per_cycle; i++) {
281 msg = stream_fifo_pop(&zo_info.in_fifo);
282 if (msg == NULL)
283 break;
284
285 stream_fifo_push(&fifo, msg);
286 }
287
288 /*
289 * We may need to reschedule, if there are still
290 * queued messages
291 */
292 if (stream_fifo_head(&zo_info.in_fifo) != NULL)
293 need_resched = true;
294 }
295
296 /* Update stats */
297 atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
298
299 /* Check for zebra shutdown */
300 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
301 need_resched = false;
302 goto done;
303 }
304
305 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
306 zlog_debug("%s: processing %u messages", __func__, i);
307
308 /*
309 * Process the messages from the temporary fifo. We send the whole
310 * fifo so that we can take advantage of batching internally. Note
311 * that registration/deregistration messages are handled here also.
312 */
313 dispatch_opq_messages(&fifo);
314
315 done:
316
317 if (need_resched) {
318 atomic_fetch_add_explicit(&zo_info.yields, 1,
319 memory_order_relaxed);
320 event_add_event(zo_info.master, process_messages, NULL, 0,
321 &zo_info.t_msgs);
322 }
323
324 /* This will also free any leftover messages, in the shutdown case */
325 stream_fifo_deinit(&fifo);
326 }
327
328 /*
329 * Process (dispatch) or drop opaque messages.
330 */
331 static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
332 {
333 struct stream *msg, *dup;
334 struct zmsghdr hdr;
335 struct zapi_opaque_msg info;
336 struct opq_msg_reg *reg;
337 int ret;
338 struct opq_client_reg *client;
339 struct zserv *zclient;
340 char buf[50];
341
342 while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
343 zapi_parse_header(msg, &hdr);
344 hdr.length -= ZEBRA_HEADER_SIZE;
345
346 /* Handle client registration messages */
347 if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
348 handle_opq_registration(&hdr, msg);
349 continue;
350 } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
351 handle_opq_unregistration(&hdr, msg);
352 continue;
353 }
354
355 /* We only process OPAQUE messages - drop anything else */
356 if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
357 goto drop_it;
358
359 /* Dispatch to any registered ZAPI client(s) */
360
361 /* Extract subtype and flags */
362 ret = zclient_opaque_decode(msg, &info);
363 if (ret != 0)
364 goto drop_it;
365
366 /* Look up registered ZAPI client(s) */
367 reg = opq_reg_lookup(info.type);
368 if (reg == NULL) {
369 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
370 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
371 __func__, info.type, info.flags);
372 goto drop_it;
373 }
374
375 /* Reset read pointer, since we'll be re-sending message */
376 stream_set_getp(msg, 0);
377
378 /* Send a copy of the message to all registered clients */
379 for (client = reg->clients; client; client = client->next) {
380 dup = NULL;
381
382 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
383
384 if (client->proto != info.proto ||
385 client->instance != info.instance ||
386 client->session_id != info.session_id)
387 continue;
388
389 if (IS_ZEBRA_DEBUG_RECV &&
390 IS_ZEBRA_DEBUG_DETAIL)
391 zlog_debug("%s: found matching unicast client %s",
392 __func__,
393 opq_client2str(buf,
394 sizeof(buf),
395 client));
396
397 } else {
398 /* Copy message if more clients */
399 if (client->next)
400 dup = stream_dup(msg);
401 }
402
403 /*
404 * TODO -- this isn't ideal: we're going through an
405 * acquire/release cycle for each client for each
406 * message. Replace this with a batching version.
407 */
408 zclient = zserv_acquire_client(client->proto,
409 client->instance,
410 client->session_id);
411 if (zclient) {
412 if (IS_ZEBRA_DEBUG_SEND &&
413 IS_ZEBRA_DEBUG_DETAIL)
414 zlog_debug("%s: sending %s to client %s",
415 __func__,
416 (dup ? "dup" : "msg"),
417 opq_client2str(buf,
418 sizeof(buf),
419 client));
420
421 /*
422 * Sending a message actually means enqueuing
423 * it for a zapi io pthread to send - so we
424 * don't touch the message after this call.
425 */
426 zserv_send_message(zclient, dup ? dup : msg);
427 if (dup)
428 dup = NULL;
429 else
430 msg = NULL;
431
432 zserv_release_client(zclient);
433 } else {
434 if (IS_ZEBRA_DEBUG_RECV &&
435 IS_ZEBRA_DEBUG_DETAIL)
436 zlog_debug("%s: type %u: no zclient for %s",
437 __func__, info.type,
438 opq_client2str(buf,
439 sizeof(buf),
440 client));
441 /* Registered but gone? */
442 if (dup)
443 stream_free(dup);
444 }
445
446 /* If unicast, we're done */
447 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST))
448 break;
449 }
450
451 drop_it:
452
453 if (msg)
454 stream_free(msg);
455 }
456
457 return 0;
458 }
459
460 /*
461 * Process a register/unregister message
462 */
463 static int handle_opq_registration(const struct zmsghdr *hdr,
464 struct stream *msg)
465 {
466 int ret = 0;
467 struct zapi_opaque_reg_info info;
468 struct opq_client_reg *client;
469 struct opq_msg_reg key, *reg;
470 char buf[50];
471
472 memset(&info, 0, sizeof(info));
473
474 if (zapi_opaque_reg_decode(msg, &info) < 0) {
475 ret = -1;
476 goto done;
477 }
478
479 memset(&key, 0, sizeof(key));
480
481 key.type = info.type;
482
483 reg = opq_regh_find(&opq_reg_hash, &key);
484 if (reg) {
485 /* Look for dup client */
486 for (client = reg->clients; client != NULL;
487 client = client->next) {
488 if (opq_client_match(client, &info))
489 break;
490 }
491
492 if (client) {
493 /* Oops - duplicate registration? */
494 if (IS_ZEBRA_DEBUG_RECV)
495 zlog_debug("%s: duplicate opq reg for client %s",
496 __func__,
497 opq_client2str(buf, sizeof(buf),
498 client));
499 goto done;
500 }
501
502 client = opq_client_alloc(&info);
503
504 if (IS_ZEBRA_DEBUG_RECV)
505 zlog_debug("%s: client %s registers for %u",
506 __func__,
507 opq_client2str(buf, sizeof(buf), client),
508 info.type);
509
510 /* Link client into registration */
511 client->next = reg->clients;
512 if (reg->clients)
513 reg->clients->prev = client;
514 reg->clients = client;
515 } else {
516 /*
517 * No existing registrations - create one, add the
518 * client, and add registration to hash.
519 */
520 reg = opq_reg_alloc(info.type);
521 client = opq_client_alloc(&info);
522
523 if (IS_ZEBRA_DEBUG_RECV)
524 zlog_debug("%s: client %s registers for new reg %u",
525 __func__,
526 opq_client2str(buf, sizeof(buf), client),
527 info.type);
528
529 reg->clients = client;
530
531 opq_regh_add(&opq_reg_hash, reg);
532 }
533
534 done:
535
536 stream_free(msg);
537 return ret;
538 }
539
540 /*
541 * Process a register/unregister message
542 */
543 static int handle_opq_unregistration(const struct zmsghdr *hdr,
544 struct stream *msg)
545 {
546 int ret = 0;
547 struct zapi_opaque_reg_info info;
548 struct opq_client_reg *client;
549 struct opq_msg_reg key, *reg;
550 char buf[50];
551
552 memset(&info, 0, sizeof(info));
553
554 if (zapi_opaque_reg_decode(msg, &info) < 0) {
555 ret = -1;
556 goto done;
557 }
558
559 memset(&key, 0, sizeof(key));
560
561 key.type = info.type;
562
563 reg = opq_regh_find(&opq_reg_hash, &key);
564 if (reg == NULL) {
565 /* Weird: unregister for unknown message? */
566 if (IS_ZEBRA_DEBUG_RECV)
567 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
568 __func__,
569 zebra_route_string(info.proto),
570 info.instance, info.session_id, info.type);
571 goto done;
572 }
573
574 /* Look for client */
575 for (client = reg->clients; client != NULL;
576 client = client->next) {
577 if (opq_client_match(client, &info))
578 break;
579 }
580
581 if (client == NULL) {
582 /* Oops - unregister for unknown client? */
583 if (IS_ZEBRA_DEBUG_RECV)
584 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
585 __func__, zebra_route_string(info.proto),
586 info.instance, info.session_id, info.type);
587 goto done;
588 }
589
590 if (IS_ZEBRA_DEBUG_RECV)
591 zlog_debug("%s: client %s unregisters for %u",
592 __func__, opq_client2str(buf, sizeof(buf), client),
593 info.type);
594
595 if (client->prev)
596 client->prev->next = client->next;
597 if (client->next)
598 client->next->prev = client->prev;
599 if (reg->clients == client)
600 reg->clients = client->next;
601
602 opq_client_free(&client);
603
604 /* Is registration empty now? */
605 if (reg->clients == NULL) {
606 opq_regh_del(&opq_reg_hash, reg);
607 opq_reg_free(&reg);
608 }
609
610 done:
611
612 stream_free(msg);
613 return ret;
614 }
615
616 /* Compare utility for registered clients */
617 static bool opq_client_match(const struct opq_client_reg *client,
618 const struct zapi_opaque_reg_info *info)
619 {
620 if (client->proto == info->proto &&
621 client->instance == info->instance &&
622 client->session_id == info->session_id)
623 return true;
624 else
625 return false;
626 }
627
628 static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
629 {
630 struct opq_msg_reg key, *reg;
631
632 memset(&key, 0, sizeof(key));
633
634 key.type = type;
635
636 reg = opq_regh_find(&opq_reg_hash, &key);
637
638 return reg;
639 }
640
641 static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
642 {
643 struct opq_msg_reg *reg;
644
645 reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
646
647 reg->type = type;
648 INIT_HASH(&reg->item);
649
650 return reg;
651 }
652
653 static void opq_reg_free(struct opq_msg_reg **reg)
654 {
655 XFREE(MTYPE_OPQ, (*reg));
656 }
657
658 static struct opq_client_reg *opq_client_alloc(
659 const struct zapi_opaque_reg_info *info)
660 {
661 struct opq_client_reg *client;
662
663 client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
664
665 client->proto = info->proto;
666 client->instance = info->instance;
667 client->session_id = info->session_id;
668
669 return client;
670 }
671
672 static void opq_client_free(struct opq_client_reg **client)
673 {
674 XFREE(MTYPE_OPQ, (*client));
675 }
676
677 static const char *opq_client2str(char *buf, size_t buflen,
678 const struct opq_client_reg *client)
679 {
680 char sbuf[20];
681
682 snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
683 client->instance);
684 if (client->session_id > 0) {
685 snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
686 strlcat(buf, sbuf, buflen);
687 }
688
689 return buf;
690 }
691
692 /* Hash function for clients registered for messages */
693 static uint32_t registration_hash(const struct opq_msg_reg *reg)
694 {
695 return reg->type;
696 }
697
698 /* Comparison function for client registrations */
699 static int registration_compare(const struct opq_msg_reg *reg1,
700 const struct opq_msg_reg *reg2)
701 {
702 if (reg1->type == reg2->type)
703 return 0;
704 else
705 return -1;
706 }