]> git.proxmox.com Git - mirror_frr.git/blob - zebra/zebra_opaque.c
*: Add camelCase JSON keys in addition to PascalCase
[mirror_frr.git] / zebra / zebra_opaque.c
1 /*
2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20
21 #include <zebra.h>
22 #include "lib/debug.h"
23 #include "lib/frr_pthread.h"
24 #include "lib/stream.h"
25 #include "zebra/debug.h"
26 #include "zebra/zserv.h"
27 #include "zebra/zebra_opaque.h"
28
29 /* Mem type */
30 DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
31
32 /* Hash to hold message registration info from zapi clients */
33 PREDECL_HASH(opq_regh);
34
/*
 * One registered zapi client, identified by protocol, instance and
 * session id. Clients registered for the same message type are chained
 * together through 'next' and 'prev'.
 */
struct opq_client_reg {
	/* Client identity */
	int proto;
	int instance;
	uint32_t session_id;

	/* List linkage */
	struct opq_client_reg *next;
	struct opq_client_reg *prev;
};
44
45 /* Opaque message registration info */
46 struct opq_msg_reg {
47 struct opq_regh_item item;
48
49 /* Message type */
50 uint32_t type;
51
52 struct opq_client_reg *clients;
53 };
54
55 /* Registration helper prototypes */
56 static uint32_t registration_hash(const struct opq_msg_reg *reg);
57 static int registration_compare(const struct opq_msg_reg *reg1,
58 const struct opq_msg_reg *reg2);
59
60 DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
61 registration_hash);
62
63 static struct opq_regh_head opq_reg_hash;
64
65 /*
66 * Globals
67 */
68 static struct zebra_opaque_globals {
69
70 /* Sentinel for run or start of shutdown */
71 _Atomic uint32_t run;
72
73 /* Limit number of pending, unprocessed updates */
74 _Atomic uint32_t max_queued_updates;
75
76 /* Limit number of new messages dequeued at once, to pace an
77 * incoming burst.
78 */
79 uint32_t msgs_per_cycle;
80
81 /* Stats: counters of incoming messages, errors, and yields (when
82 * the limit has been reached.)
83 */
84 _Atomic uint32_t msgs_in;
85 _Atomic uint32_t msg_errors;
86 _Atomic uint32_t yields;
87
88 /* pthread */
89 struct frr_pthread *pthread;
90
91 /* Event-delivery context 'master' for the module */
92 struct thread_master *master;
93
94 /* Event/'thread' pointer for queued zapi messages */
95 struct thread *t_msgs;
96
97 /* Input fifo queue to the module, and lock to protect it. */
98 pthread_mutex_t mutex;
99 struct stream_fifo in_fifo;
100
101 } zo_info;
102
/* Module name used as a prefix in debugs/logs */
static const char LOG_NAME[] = "Zebra Opaque";

/*
 * Prototypes for the file-local functions
 */

/* Event handler run on the module pthread: drains the incoming queue */
static int process_messages(struct thread *event);

/* Message dispatch and registration handling */
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
static int handle_opq_registration(const struct zmsghdr *hdr,
				   struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
				     struct stream *msg);

/* Registration table helpers */
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);

/* Registered-client helpers */
static struct opq_client_reg *opq_client_alloc(
	const struct zapi_opaque_reg_info *info);
static void opq_client_free(struct opq_client_reg **client);
static bool opq_client_match(const struct opq_client_reg *client,
			     const struct zapi_opaque_reg_info *info);
static const char *opq_client2str(char *buf, size_t buflen,
				  const struct opq_client_reg *client);
125
126 /*
127 * Initialize the module at startup
128 */
129 void zebra_opaque_init(void)
130 {
131 memset(&zo_info, 0, sizeof(zo_info));
132
133 pthread_mutex_init(&zo_info.mutex, NULL);
134 stream_fifo_init(&zo_info.in_fifo);
135
136 zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
137 }
138
139 /*
140 * Start the module pthread. This step is run later than the
141 * 'init' step, in case zebra has fork-ed.
142 */
143 void zebra_opaque_start(void)
144 {
145 struct frr_pthread_attr pattr = {
146 .start = frr_pthread_attr_default.start,
147 .stop = frr_pthread_attr_default.stop
148 };
149
150 if (IS_ZEBRA_DEBUG_EVENT)
151 zlog_debug("%s module starting", LOG_NAME);
152
153 /* Start pthread */
154 zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
155 "zebra_opaque");
156
157 /* Associate event 'master' */
158 zo_info.master = zo_info.pthread->master;
159
160 atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
161
162 /* Enqueue an initial event for the pthread */
163 thread_add_event(zo_info.master, process_messages, NULL, 0,
164 &zo_info.t_msgs);
165
166 /* And start the pthread */
167 frr_pthread_run(zo_info.pthread, NULL);
168 }
169
170 /*
171 * Module stop, halting the dedicated pthread; called from the main pthread.
172 */
173 void zebra_opaque_stop(void)
174 {
175 if (IS_ZEBRA_DEBUG_EVENT)
176 zlog_debug("%s module stop", LOG_NAME);
177
178 atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
179
180 frr_pthread_stop(zo_info.pthread, NULL);
181
182 frr_pthread_destroy(zo_info.pthread);
183
184 if (IS_ZEBRA_DEBUG_EVENT)
185 zlog_debug("%s module stop complete", LOG_NAME);
186 }
187
188 /*
189 * Module final cleanup, called from the zebra main pthread.
190 */
191 void zebra_opaque_finish(void)
192 {
193 struct opq_msg_reg *reg;
194 struct opq_client_reg *client;
195
196 if (IS_ZEBRA_DEBUG_EVENT)
197 zlog_debug("%s module shutdown", LOG_NAME);
198
199 /* Clear out registration info */
200 while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
201 client = reg->clients;
202 while (client) {
203 reg->clients = client->next;
204 opq_client_free(&client);
205 client = reg->clients;
206 }
207
208 opq_reg_free(&reg);
209 }
210
211 opq_regh_fini(&opq_reg_hash);
212
213 pthread_mutex_destroy(&zo_info.mutex);
214 stream_fifo_deinit(&zo_info.in_fifo);
215 }
216
217 /*
218 * Does this module handle (intercept) the specified zapi message type?
219 */
220 bool zebra_opaque_handles_msgid(uint16_t id)
221 {
222 bool ret = false;
223
224 switch (id) {
225 case ZEBRA_OPAQUE_MESSAGE:
226 case ZEBRA_OPAQUE_REGISTER:
227 case ZEBRA_OPAQUE_UNREGISTER:
228 ret = true;
229 break;
230 default:
231 break;
232 }
233
234 return ret;
235 }
236
237 /*
238 * Enqueue a batch of messages for processing - this is the public api
239 * used from the zapi processing threads.
240 */
241 uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
242 {
243 uint32_t counter = 0;
244 struct stream *msg;
245
246 /* Dequeue messages from the incoming batch, and save them
247 * on the module fifo.
248 */
249 frr_with_mutex(&zo_info.mutex) {
250 msg = stream_fifo_pop(batch);
251 while (msg) {
252 stream_fifo_push(&zo_info.in_fifo, msg);
253 counter++;
254 msg = stream_fifo_pop(batch);
255 }
256 }
257
258 /* Schedule module pthread to process the batch */
259 if (counter > 0) {
260 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
261 zlog_debug("%s: received %u messages",
262 __func__, counter);
263 thread_add_event(zo_info.master, process_messages, NULL, 0,
264 &zo_info.t_msgs);
265 }
266
267 return counter;
268 }
269
270 /*
271 * Pthread event loop, process the incoming message queue.
272 */
273 static int process_messages(struct thread *event)
274 {
275 struct stream_fifo fifo;
276 struct stream *msg;
277 uint32_t i;
278 bool need_resched = false;
279
280 stream_fifo_init(&fifo);
281
282 /* Check for zebra shutdown */
283 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
284 goto done;
285
286 /*
287 * Dequeue some messages from the incoming queue, temporarily
288 * save them on the local fifo
289 */
290 frr_with_mutex(&zo_info.mutex) {
291
292 for (i = 0; i < zo_info.msgs_per_cycle; i++) {
293 msg = stream_fifo_pop(&zo_info.in_fifo);
294 if (msg == NULL)
295 break;
296
297 stream_fifo_push(&fifo, msg);
298 }
299
300 /*
301 * We may need to reschedule, if there are still
302 * queued messages
303 */
304 if (stream_fifo_head(&zo_info.in_fifo) != NULL)
305 need_resched = true;
306 }
307
308 /* Update stats */
309 atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
310
311 /* Check for zebra shutdown */
312 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
313 need_resched = false;
314 goto done;
315 }
316
317 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
318 zlog_debug("%s: processing %u messages", __func__, i);
319
320 /*
321 * Process the messages from the temporary fifo. We send the whole
322 * fifo so that we can take advantage of batching internally. Note
323 * that registration/deregistration messages are handled here also.
324 */
325 dispatch_opq_messages(&fifo);
326
327 done:
328
329 if (need_resched) {
330 atomic_fetch_add_explicit(&zo_info.yields, 1,
331 memory_order_relaxed);
332 thread_add_event(zo_info.master, process_messages, NULL, 0,
333 &zo_info.t_msgs);
334 }
335
336 /* This will also free any leftover messages, in the shutdown case */
337 stream_fifo_deinit(&fifo);
338
339 return 0;
340 }
341
342 /*
343 * Process (dispatch) or drop opaque messages.
344 */
345 static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
346 {
347 struct stream *msg, *dup;
348 struct zmsghdr hdr;
349 struct zapi_opaque_msg info;
350 struct opq_msg_reg *reg;
351 int ret;
352 struct opq_client_reg *client;
353 struct zserv *zclient;
354 char buf[50];
355
356 while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
357 zapi_parse_header(msg, &hdr);
358 hdr.length -= ZEBRA_HEADER_SIZE;
359
360 /* Handle client registration messages */
361 if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
362 handle_opq_registration(&hdr, msg);
363 continue;
364 } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
365 handle_opq_unregistration(&hdr, msg);
366 continue;
367 }
368
369 /* We only process OPAQUE messages - drop anything else */
370 if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
371 goto drop_it;
372
373 /* Dispatch to any registered ZAPI client(s) */
374
375 /* Extract subtype and flags */
376 ret = zclient_opaque_decode(msg, &info);
377 if (ret != 0)
378 goto drop_it;
379
380 /* Look up registered ZAPI client(s) */
381 reg = opq_reg_lookup(info.type);
382 if (reg == NULL) {
383 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
384 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
385 __func__, info.type, info.flags);
386 goto drop_it;
387 }
388
389 /* Reset read pointer, since we'll be re-sending message */
390 stream_set_getp(msg, 0);
391
392 /* Send a copy of the message to all registered clients */
393 for (client = reg->clients; client; client = client->next) {
394 dup = NULL;
395
396 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
397
398 if (client->proto != info.proto ||
399 client->instance != info.instance ||
400 client->session_id != info.session_id)
401 continue;
402
403 if (IS_ZEBRA_DEBUG_RECV &&
404 IS_ZEBRA_DEBUG_DETAIL)
405 zlog_debug("%s: found matching unicast client %s",
406 __func__,
407 opq_client2str(buf,
408 sizeof(buf),
409 client));
410
411 } else {
412 /* Copy message if more clients */
413 if (client->next)
414 dup = stream_dup(msg);
415 }
416
417 /*
418 * TODO -- this isn't ideal: we're going through an
419 * acquire/release cycle for each client for each
420 * message. Replace this with a batching version.
421 */
422 zclient = zserv_acquire_client(client->proto,
423 client->instance,
424 client->session_id);
425 if (zclient) {
426 if (IS_ZEBRA_DEBUG_SEND &&
427 IS_ZEBRA_DEBUG_DETAIL)
428 zlog_debug("%s: sending %s to client %s",
429 __func__,
430 (dup ? "dup" : "msg"),
431 opq_client2str(buf,
432 sizeof(buf),
433 client));
434
435 /*
436 * Sending a message actually means enqueuing
437 * it for a zapi io pthread to send - so we
438 * don't touch the message after this call.
439 */
440 zserv_send_message(zclient, dup ? dup : msg);
441 if (dup)
442 dup = NULL;
443 else
444 msg = NULL;
445
446 zserv_release_client(zclient);
447 } else {
448 if (IS_ZEBRA_DEBUG_RECV &&
449 IS_ZEBRA_DEBUG_DETAIL)
450 zlog_debug("%s: type %u: no zclient for %s",
451 __func__, info.type,
452 opq_client2str(buf,
453 sizeof(buf),
454 client));
455 /* Registered but gone? */
456 if (dup)
457 stream_free(dup);
458 }
459
460 /* If unicast, we're done */
461 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST))
462 break;
463 }
464
465 drop_it:
466
467 if (msg)
468 stream_free(msg);
469 }
470
471 return 0;
472 }
473
474 /*
475 * Process a register/unregister message
476 */
477 static int handle_opq_registration(const struct zmsghdr *hdr,
478 struct stream *msg)
479 {
480 int ret = 0;
481 struct zapi_opaque_reg_info info;
482 struct opq_client_reg *client;
483 struct opq_msg_reg key, *reg;
484 char buf[50];
485
486 memset(&info, 0, sizeof(info));
487
488 if (zapi_opaque_reg_decode(msg, &info) < 0) {
489 ret = -1;
490 goto done;
491 }
492
493 memset(&key, 0, sizeof(key));
494
495 key.type = info.type;
496
497 reg = opq_regh_find(&opq_reg_hash, &key);
498 if (reg) {
499 /* Look for dup client */
500 for (client = reg->clients; client != NULL;
501 client = client->next) {
502 if (opq_client_match(client, &info))
503 break;
504 }
505
506 if (client) {
507 /* Oops - duplicate registration? */
508 if (IS_ZEBRA_DEBUG_RECV)
509 zlog_debug("%s: duplicate opq reg for client %s",
510 __func__,
511 opq_client2str(buf, sizeof(buf),
512 client));
513 goto done;
514 }
515
516 client = opq_client_alloc(&info);
517
518 if (IS_ZEBRA_DEBUG_RECV)
519 zlog_debug("%s: client %s registers for %u",
520 __func__,
521 opq_client2str(buf, sizeof(buf), client),
522 info.type);
523
524 /* Link client into registration */
525 client->next = reg->clients;
526 if (reg->clients)
527 reg->clients->prev = client;
528 reg->clients = client;
529 } else {
530 /*
531 * No existing registrations - create one, add the
532 * client, and add registration to hash.
533 */
534 reg = opq_reg_alloc(info.type);
535 client = opq_client_alloc(&info);
536
537 if (IS_ZEBRA_DEBUG_RECV)
538 zlog_debug("%s: client %s registers for new reg %u",
539 __func__,
540 opq_client2str(buf, sizeof(buf), client),
541 info.type);
542
543 reg->clients = client;
544
545 opq_regh_add(&opq_reg_hash, reg);
546 }
547
548 done:
549
550 stream_free(msg);
551 return ret;
552 }
553
554 /*
555 * Process a register/unregister message
556 */
557 static int handle_opq_unregistration(const struct zmsghdr *hdr,
558 struct stream *msg)
559 {
560 int ret = 0;
561 struct zapi_opaque_reg_info info;
562 struct opq_client_reg *client;
563 struct opq_msg_reg key, *reg;
564 char buf[50];
565
566 memset(&info, 0, sizeof(info));
567
568 if (zapi_opaque_reg_decode(msg, &info) < 0) {
569 ret = -1;
570 goto done;
571 }
572
573 memset(&key, 0, sizeof(key));
574
575 key.type = info.type;
576
577 reg = opq_regh_find(&opq_reg_hash, &key);
578 if (reg == NULL) {
579 /* Weird: unregister for unknown message? */
580 if (IS_ZEBRA_DEBUG_RECV)
581 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
582 __func__,
583 zebra_route_string(info.proto),
584 info.instance, info.session_id, info.type);
585 goto done;
586 }
587
588 /* Look for client */
589 for (client = reg->clients; client != NULL;
590 client = client->next) {
591 if (opq_client_match(client, &info))
592 break;
593 }
594
595 if (client == NULL) {
596 /* Oops - unregister for unknown client? */
597 if (IS_ZEBRA_DEBUG_RECV)
598 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
599 __func__, zebra_route_string(info.proto),
600 info.instance, info.session_id, info.type);
601 goto done;
602 }
603
604 if (IS_ZEBRA_DEBUG_RECV)
605 zlog_debug("%s: client %s unregisters for %u",
606 __func__, opq_client2str(buf, sizeof(buf), client),
607 info.type);
608
609 if (client->prev)
610 client->prev->next = client->next;
611 if (client->next)
612 client->next->prev = client->prev;
613 if (reg->clients == client)
614 reg->clients = client->next;
615
616 opq_client_free(&client);
617
618 /* Is registration empty now? */
619 if (reg->clients == NULL) {
620 opq_regh_del(&opq_reg_hash, reg);
621 opq_reg_free(&reg);
622 }
623
624 done:
625
626 stream_free(msg);
627 return ret;
628 }
629
630 /* Compare utility for registered clients */
631 static bool opq_client_match(const struct opq_client_reg *client,
632 const struct zapi_opaque_reg_info *info)
633 {
634 if (client->proto == info->proto &&
635 client->instance == info->instance &&
636 client->session_id == info->session_id)
637 return true;
638 else
639 return false;
640 }
641
642 static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
643 {
644 struct opq_msg_reg key, *reg;
645
646 memset(&key, 0, sizeof(key));
647
648 key.type = type;
649
650 reg = opq_regh_find(&opq_reg_hash, &key);
651
652 return reg;
653 }
654
655 static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
656 {
657 struct opq_msg_reg *reg;
658
659 reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
660
661 reg->type = type;
662 INIT_HASH(&reg->item);
663
664 return reg;
665 }
666
667 static void opq_reg_free(struct opq_msg_reg **reg)
668 {
669 XFREE(MTYPE_OPQ, (*reg));
670 }
671
672 static struct opq_client_reg *opq_client_alloc(
673 const struct zapi_opaque_reg_info *info)
674 {
675 struct opq_client_reg *client;
676
677 client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
678
679 client->proto = info->proto;
680 client->instance = info->instance;
681 client->session_id = info->session_id;
682
683 return client;
684 }
685
686 static void opq_client_free(struct opq_client_reg **client)
687 {
688 XFREE(MTYPE_OPQ, (*client));
689 }
690
691 static const char *opq_client2str(char *buf, size_t buflen,
692 const struct opq_client_reg *client)
693 {
694 char sbuf[20];
695
696 snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
697 client->instance);
698 if (client->session_id > 0) {
699 snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
700 strlcat(buf, sbuf, buflen);
701 }
702
703 return buf;
704 }
705
706 /* Hash function for clients registered for messages */
707 static uint32_t registration_hash(const struct opq_msg_reg *reg)
708 {
709 return reg->type;
710 }
711
712 /* Comparison function for client registrations */
713 static int registration_compare(const struct opq_msg_reg *reg1,
714 const struct opq_msg_reg *reg2)
715 {
716 if (reg1->type == reg2->type)
717 return 0;
718 else
719 return -1;
720 }