]> git.proxmox.com Git - mirror_frr.git/blob - zebra/zebra_opaque.c
doc: Add `show ipv6 rpf X:X::X:X` command to docs
[mirror_frr.git] / zebra / zebra_opaque.c
1 /*
2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20
21 #include <zebra.h>
22 #include "lib/debug.h"
23 #include "lib/frr_pthread.h"
24 #include "lib/stream.h"
25 #include "zebra/debug.h"
26 #include "zebra/zserv.h"
27 #include "zebra/zebra_opaque.h"
28 #include "zebra/rib.h"
29
30 /* Mem type */
31 DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
32
33 /* Hash to hold message registration info from zapi clients */
34 PREDECL_HASH(opq_regh);
35
/*
 * One registered zapi client, identified by the (proto, instance,
 * session_id) triple. Records are chained into a doubly-linked list
 * hanging off the owning message registration.
 */
struct opq_client_reg {
	/* Client identity */
	int proto;
	int instance;
	uint32_t session_id;

	/* List linkage: successor and predecessor */
	struct opq_client_reg *next, *prev;
};
45
46 /* Opaque message registration info */
47 struct opq_msg_reg {
48 struct opq_regh_item item;
49
50 /* Message type */
51 uint32_t type;
52
53 struct opq_client_reg *clients;
54 };
55
56 /* Registration helper prototypes */
57 static uint32_t registration_hash(const struct opq_msg_reg *reg);
58 static int registration_compare(const struct opq_msg_reg *reg1,
59 const struct opq_msg_reg *reg2);
60
61 DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
62 registration_hash);
63
64 static struct opq_regh_head opq_reg_hash;
65
66 /*
67 * Globals
68 */
69 static struct zebra_opaque_globals {
70
71 /* Sentinel for run or start of shutdown */
72 _Atomic uint32_t run;
73
74 /* Limit number of pending, unprocessed updates */
75 _Atomic uint32_t max_queued_updates;
76
77 /* Limit number of new messages dequeued at once, to pace an
78 * incoming burst.
79 */
80 uint32_t msgs_per_cycle;
81
82 /* Stats: counters of incoming messages, errors, and yields (when
83 * the limit has been reached.)
84 */
85 _Atomic uint32_t msgs_in;
86 _Atomic uint32_t msg_errors;
87 _Atomic uint32_t yields;
88
89 /* pthread */
90 struct frr_pthread *pthread;
91
92 /* Event-delivery context 'master' for the module */
93 struct thread_master *master;
94
95 /* Event/'thread' pointer for queued zapi messages */
96 struct thread *t_msgs;
97
98 /* Input fifo queue to the module, and lock to protect it. */
99 pthread_mutex_t mutex;
100 struct stream_fifo in_fifo;
101
102 } zo_info;
103
/* Module name prefix used in debug and log output */
static const char LOG_NAME[] = "Zebra Opaque";
106
/* Forward declarations for file-local functions */

/* Event callback run on the module pthread: drains the input queue */
static void process_messages(struct thread *event);

/* Message handling */
static int handle_opq_registration(const struct zmsghdr *hdr,
				   struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
				     struct stream *msg);
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);

/* Registration table helpers */
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);

/* Client record helpers */
static bool opq_client_match(const struct opq_client_reg *client,
			     const struct zapi_opaque_reg_info *info);
static struct opq_client_reg *
opq_client_alloc(const struct zapi_opaque_reg_info *info);
static void opq_client_free(struct opq_client_reg **client);
static const char *opq_client2str(char *buf, size_t buflen,
				  const struct opq_client_reg *client);
126
127 /*
128 * Initialize the module at startup
129 */
130 void zebra_opaque_init(void)
131 {
132 memset(&zo_info, 0, sizeof(zo_info));
133
134 pthread_mutex_init(&zo_info.mutex, NULL);
135 stream_fifo_init(&zo_info.in_fifo);
136
137 zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
138 }
139
140 /*
141 * Start the module pthread. This step is run later than the
142 * 'init' step, in case zebra has fork-ed.
143 */
144 void zebra_opaque_start(void)
145 {
146 struct frr_pthread_attr pattr = {
147 .start = frr_pthread_attr_default.start,
148 .stop = frr_pthread_attr_default.stop
149 };
150
151 if (IS_ZEBRA_DEBUG_EVENT)
152 zlog_debug("%s module starting", LOG_NAME);
153
154 /* Start pthread */
155 zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
156 "zebra_opaque");
157
158 /* Associate event 'master' */
159 zo_info.master = zo_info.pthread->master;
160
161 atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
162
163 /* Enqueue an initial event for the pthread */
164 thread_add_event(zo_info.master, process_messages, NULL, 0,
165 &zo_info.t_msgs);
166
167 /* And start the pthread */
168 frr_pthread_run(zo_info.pthread, NULL);
169 }
170
171 /*
172 * Module stop, halting the dedicated pthread; called from the main pthread.
173 */
174 void zebra_opaque_stop(void)
175 {
176 if (IS_ZEBRA_DEBUG_EVENT)
177 zlog_debug("%s module stop", LOG_NAME);
178
179 atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
180
181 frr_pthread_stop(zo_info.pthread, NULL);
182
183 frr_pthread_destroy(zo_info.pthread);
184
185 if (IS_ZEBRA_DEBUG_EVENT)
186 zlog_debug("%s module stop complete", LOG_NAME);
187 }
188
189 /*
190 * Module final cleanup, called from the zebra main pthread.
191 */
192 void zebra_opaque_finish(void)
193 {
194 struct opq_msg_reg *reg;
195 struct opq_client_reg *client;
196
197 if (IS_ZEBRA_DEBUG_EVENT)
198 zlog_debug("%s module shutdown", LOG_NAME);
199
200 /* Clear out registration info */
201 while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
202 client = reg->clients;
203 while (client) {
204 reg->clients = client->next;
205 opq_client_free(&client);
206 client = reg->clients;
207 }
208
209 opq_reg_free(&reg);
210 }
211
212 opq_regh_fini(&opq_reg_hash);
213
214 pthread_mutex_destroy(&zo_info.mutex);
215 stream_fifo_deinit(&zo_info.in_fifo);
216 }
217
218 /*
219 * Does this module handle (intercept) the specified zapi message type?
220 */
221 bool zebra_opaque_handles_msgid(uint16_t id)
222 {
223 bool ret = false;
224
225 switch (id) {
226 case ZEBRA_OPAQUE_MESSAGE:
227 case ZEBRA_OPAQUE_REGISTER:
228 case ZEBRA_OPAQUE_UNREGISTER:
229 ret = true;
230 break;
231 default:
232 break;
233 }
234
235 return ret;
236 }
237
238 /*
239 * Enqueue a batch of messages for processing - this is the public api
240 * used from the zapi processing threads.
241 */
242 uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
243 {
244 uint32_t counter = 0;
245 struct stream *msg;
246
247 /* Dequeue messages from the incoming batch, and save them
248 * on the module fifo.
249 */
250 frr_with_mutex (&zo_info.mutex) {
251 msg = stream_fifo_pop(batch);
252 while (msg) {
253 stream_fifo_push(&zo_info.in_fifo, msg);
254 counter++;
255 msg = stream_fifo_pop(batch);
256 }
257 }
258
259 /* Schedule module pthread to process the batch */
260 if (counter > 0) {
261 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
262 zlog_debug("%s: received %u messages",
263 __func__, counter);
264 thread_add_event(zo_info.master, process_messages, NULL, 0,
265 &zo_info.t_msgs);
266 }
267
268 return counter;
269 }
270
271 /*
272 * Pthread event loop, process the incoming message queue.
273 */
274 static void process_messages(struct thread *event)
275 {
276 struct stream_fifo fifo;
277 struct stream *msg;
278 uint32_t i;
279 bool need_resched = false;
280
281 stream_fifo_init(&fifo);
282
283 /* Check for zebra shutdown */
284 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
285 goto done;
286
287 /*
288 * Dequeue some messages from the incoming queue, temporarily
289 * save them on the local fifo
290 */
291 frr_with_mutex (&zo_info.mutex) {
292
293 for (i = 0; i < zo_info.msgs_per_cycle; i++) {
294 msg = stream_fifo_pop(&zo_info.in_fifo);
295 if (msg == NULL)
296 break;
297
298 stream_fifo_push(&fifo, msg);
299 }
300
301 /*
302 * We may need to reschedule, if there are still
303 * queued messages
304 */
305 if (stream_fifo_head(&zo_info.in_fifo) != NULL)
306 need_resched = true;
307 }
308
309 /* Update stats */
310 atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
311
312 /* Check for zebra shutdown */
313 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
314 need_resched = false;
315 goto done;
316 }
317
318 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
319 zlog_debug("%s: processing %u messages", __func__, i);
320
321 /*
322 * Process the messages from the temporary fifo. We send the whole
323 * fifo so that we can take advantage of batching internally. Note
324 * that registration/deregistration messages are handled here also.
325 */
326 dispatch_opq_messages(&fifo);
327
328 done:
329
330 if (need_resched) {
331 atomic_fetch_add_explicit(&zo_info.yields, 1,
332 memory_order_relaxed);
333 thread_add_event(zo_info.master, process_messages, NULL, 0,
334 &zo_info.t_msgs);
335 }
336
337 /* This will also free any leftover messages, in the shutdown case */
338 stream_fifo_deinit(&fifo);
339 }
340
341 /*
342 * Process (dispatch) or drop opaque messages.
343 */
344 static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
345 {
346 struct stream *msg, *dup;
347 struct zmsghdr hdr;
348 struct zapi_opaque_msg info;
349 struct opq_msg_reg *reg;
350 int ret;
351 struct opq_client_reg *client;
352 struct zserv *zclient;
353 char buf[50];
354
355 while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
356 zapi_parse_header(msg, &hdr);
357 hdr.length -= ZEBRA_HEADER_SIZE;
358
359 /* Handle client registration messages */
360 if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
361 handle_opq_registration(&hdr, msg);
362 continue;
363 } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
364 handle_opq_unregistration(&hdr, msg);
365 continue;
366 }
367
368 /* We only process OPAQUE messages - drop anything else */
369 if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
370 goto drop_it;
371
372 /* Dispatch to any registered ZAPI client(s) */
373
374 /* Extract subtype and flags */
375 ret = zclient_opaque_decode(msg, &info);
376 if (ret != 0)
377 goto drop_it;
378
379 /* Look up registered ZAPI client(s) */
380 reg = opq_reg_lookup(info.type);
381 if (reg == NULL) {
382 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
383 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
384 __func__, info.type, info.flags);
385 goto drop_it;
386 }
387
388 /* Reset read pointer, since we'll be re-sending message */
389 stream_set_getp(msg, 0);
390
391 /* Send a copy of the message to all registered clients */
392 for (client = reg->clients; client; client = client->next) {
393 dup = NULL;
394
395 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
396
397 if (client->proto != info.proto ||
398 client->instance != info.instance ||
399 client->session_id != info.session_id)
400 continue;
401
402 if (IS_ZEBRA_DEBUG_RECV &&
403 IS_ZEBRA_DEBUG_DETAIL)
404 zlog_debug("%s: found matching unicast client %s",
405 __func__,
406 opq_client2str(buf,
407 sizeof(buf),
408 client));
409
410 } else {
411 /* Copy message if more clients */
412 if (client->next)
413 dup = stream_dup(msg);
414 }
415
416 /*
417 * TODO -- this isn't ideal: we're going through an
418 * acquire/release cycle for each client for each
419 * message. Replace this with a batching version.
420 */
421 zclient = zserv_acquire_client(client->proto,
422 client->instance,
423 client->session_id);
424 if (zclient) {
425 if (IS_ZEBRA_DEBUG_SEND &&
426 IS_ZEBRA_DEBUG_DETAIL)
427 zlog_debug("%s: sending %s to client %s",
428 __func__,
429 (dup ? "dup" : "msg"),
430 opq_client2str(buf,
431 sizeof(buf),
432 client));
433
434 /*
435 * Sending a message actually means enqueuing
436 * it for a zapi io pthread to send - so we
437 * don't touch the message after this call.
438 */
439 zserv_send_message(zclient, dup ? dup : msg);
440 if (dup)
441 dup = NULL;
442 else
443 msg = NULL;
444
445 zserv_release_client(zclient);
446 } else {
447 if (IS_ZEBRA_DEBUG_RECV &&
448 IS_ZEBRA_DEBUG_DETAIL)
449 zlog_debug("%s: type %u: no zclient for %s",
450 __func__, info.type,
451 opq_client2str(buf,
452 sizeof(buf),
453 client));
454 /* Registered but gone? */
455 if (dup)
456 stream_free(dup);
457 }
458
459 /* If unicast, we're done */
460 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST))
461 break;
462 }
463
464 drop_it:
465
466 if (msg)
467 stream_free(msg);
468 }
469
470 return 0;
471 }
472
473 /*
474 * Process a register/unregister message
475 */
476 static int handle_opq_registration(const struct zmsghdr *hdr,
477 struct stream *msg)
478 {
479 int ret = 0;
480 struct zapi_opaque_reg_info info;
481 struct opq_client_reg *client;
482 struct opq_msg_reg key, *reg;
483 char buf[50];
484
485 memset(&info, 0, sizeof(info));
486
487 if (zapi_opaque_reg_decode(msg, &info) < 0) {
488 ret = -1;
489 goto done;
490 }
491
492 memset(&key, 0, sizeof(key));
493
494 key.type = info.type;
495
496 reg = opq_regh_find(&opq_reg_hash, &key);
497 if (reg) {
498 /* Look for dup client */
499 for (client = reg->clients; client != NULL;
500 client = client->next) {
501 if (opq_client_match(client, &info))
502 break;
503 }
504
505 if (client) {
506 /* Oops - duplicate registration? */
507 if (IS_ZEBRA_DEBUG_RECV)
508 zlog_debug("%s: duplicate opq reg for client %s",
509 __func__,
510 opq_client2str(buf, sizeof(buf),
511 client));
512 goto done;
513 }
514
515 client = opq_client_alloc(&info);
516
517 if (IS_ZEBRA_DEBUG_RECV)
518 zlog_debug("%s: client %s registers for %u",
519 __func__,
520 opq_client2str(buf, sizeof(buf), client),
521 info.type);
522
523 /* Link client into registration */
524 client->next = reg->clients;
525 if (reg->clients)
526 reg->clients->prev = client;
527 reg->clients = client;
528 } else {
529 /*
530 * No existing registrations - create one, add the
531 * client, and add registration to hash.
532 */
533 reg = opq_reg_alloc(info.type);
534 client = opq_client_alloc(&info);
535
536 if (IS_ZEBRA_DEBUG_RECV)
537 zlog_debug("%s: client %s registers for new reg %u",
538 __func__,
539 opq_client2str(buf, sizeof(buf), client),
540 info.type);
541
542 reg->clients = client;
543
544 opq_regh_add(&opq_reg_hash, reg);
545 }
546
547 done:
548
549 stream_free(msg);
550 return ret;
551 }
552
553 /*
554 * Process a register/unregister message
555 */
556 static int handle_opq_unregistration(const struct zmsghdr *hdr,
557 struct stream *msg)
558 {
559 int ret = 0;
560 struct zapi_opaque_reg_info info;
561 struct opq_client_reg *client;
562 struct opq_msg_reg key, *reg;
563 char buf[50];
564
565 memset(&info, 0, sizeof(info));
566
567 if (zapi_opaque_reg_decode(msg, &info) < 0) {
568 ret = -1;
569 goto done;
570 }
571
572 memset(&key, 0, sizeof(key));
573
574 key.type = info.type;
575
576 reg = opq_regh_find(&opq_reg_hash, &key);
577 if (reg == NULL) {
578 /* Weird: unregister for unknown message? */
579 if (IS_ZEBRA_DEBUG_RECV)
580 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
581 __func__,
582 zebra_route_string(info.proto),
583 info.instance, info.session_id, info.type);
584 goto done;
585 }
586
587 /* Look for client */
588 for (client = reg->clients; client != NULL;
589 client = client->next) {
590 if (opq_client_match(client, &info))
591 break;
592 }
593
594 if (client == NULL) {
595 /* Oops - unregister for unknown client? */
596 if (IS_ZEBRA_DEBUG_RECV)
597 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
598 __func__, zebra_route_string(info.proto),
599 info.instance, info.session_id, info.type);
600 goto done;
601 }
602
603 if (IS_ZEBRA_DEBUG_RECV)
604 zlog_debug("%s: client %s unregisters for %u",
605 __func__, opq_client2str(buf, sizeof(buf), client),
606 info.type);
607
608 if (client->prev)
609 client->prev->next = client->next;
610 if (client->next)
611 client->next->prev = client->prev;
612 if (reg->clients == client)
613 reg->clients = client->next;
614
615 opq_client_free(&client);
616
617 /* Is registration empty now? */
618 if (reg->clients == NULL) {
619 opq_regh_del(&opq_reg_hash, reg);
620 opq_reg_free(&reg);
621 }
622
623 done:
624
625 stream_free(msg);
626 return ret;
627 }
628
629 /* Compare utility for registered clients */
630 static bool opq_client_match(const struct opq_client_reg *client,
631 const struct zapi_opaque_reg_info *info)
632 {
633 if (client->proto == info->proto &&
634 client->instance == info->instance &&
635 client->session_id == info->session_id)
636 return true;
637 else
638 return false;
639 }
640
641 static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
642 {
643 struct opq_msg_reg key, *reg;
644
645 memset(&key, 0, sizeof(key));
646
647 key.type = type;
648
649 reg = opq_regh_find(&opq_reg_hash, &key);
650
651 return reg;
652 }
653
654 static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
655 {
656 struct opq_msg_reg *reg;
657
658 reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
659
660 reg->type = type;
661 INIT_HASH(&reg->item);
662
663 return reg;
664 }
665
666 static void opq_reg_free(struct opq_msg_reg **reg)
667 {
668 XFREE(MTYPE_OPQ, (*reg));
669 }
670
671 static struct opq_client_reg *opq_client_alloc(
672 const struct zapi_opaque_reg_info *info)
673 {
674 struct opq_client_reg *client;
675
676 client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
677
678 client->proto = info->proto;
679 client->instance = info->instance;
680 client->session_id = info->session_id;
681
682 return client;
683 }
684
685 static void opq_client_free(struct opq_client_reg **client)
686 {
687 XFREE(MTYPE_OPQ, (*client));
688 }
689
690 static const char *opq_client2str(char *buf, size_t buflen,
691 const struct opq_client_reg *client)
692 {
693 char sbuf[20];
694
695 snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
696 client->instance);
697 if (client->session_id > 0) {
698 snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
699 strlcat(buf, sbuf, buflen);
700 }
701
702 return buf;
703 }
704
705 /* Hash function for clients registered for messages */
706 static uint32_t registration_hash(const struct opq_msg_reg *reg)
707 {
708 return reg->type;
709 }
710
711 /* Comparison function for client registrations */
712 static int registration_compare(const struct opq_msg_reg *reg1,
713 const struct opq_msg_reg *reg2)
714 {
715 if (reg1->type == reg2->type)
716 return 0;
717 else
718 return -1;
719 }