]> git.proxmox.com Git - mirror_frr.git/blame - zebra/zebra_opaque.c
zebra: Prevent installation of connected multiple times
[mirror_frr.git] / zebra / zebra_opaque.c
CommitLineData
9bb02389
MS
1/*
2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20
21#include <zebra.h>
22#include "lib/debug.h"
23#include "lib/frr_pthread.h"
24#include "lib/stream.h"
25#include "zebra/debug.h"
26#include "zebra/zserv.h"
27#include "zebra/zebra_opaque.h"
28
6703a038
MS
29/* Mem type */
30DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
31
32/* Hash to hold message registration info from zapi clients */
33PREDECL_HASH(opq_regh);
34
/*
 * One registered zapi client, identified by protocol, instance and
 * session id. Clients for a message type are chained in a doubly-linked
 * list through 'next' and 'prev'.
 */
struct opq_client_reg {
	int proto;
	int instance;
	uint32_t session_id;

	/* List linkage */
	struct opq_client_reg *next;
	struct opq_client_reg *prev;
};
44
45/* Opaque message registration info */
46struct opq_msg_reg {
47 struct opq_regh_item item;
48
49 /* Message type */
50 uint32_t type;
51
52 struct opq_client_reg *clients;
53};
54
55/* Registration helper prototypes */
56static uint32_t registration_hash(const struct opq_msg_reg *reg);
57static int registration_compare(const struct opq_msg_reg *reg1,
58 const struct opq_msg_reg *reg2);
59
60DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
61 registration_hash);
62
63static struct opq_regh_head opq_reg_hash;
64
9bb02389
MS
65/*
66 * Globals
67 */
68static struct zebra_opaque_globals {
69
70 /* Sentinel for run or start of shutdown */
71 _Atomic uint32_t run;
72
73 /* Limit number of pending, unprocessed updates */
74 _Atomic uint32_t max_queued_updates;
75
76 /* Limit number of new messages dequeued at once, to pace an
77 * incoming burst.
78 */
79 uint32_t msgs_per_cycle;
80
81 /* Stats: counters of incoming messages, errors, and yields (when
82 * the limit has been reached.)
83 */
84 _Atomic uint32_t msgs_in;
85 _Atomic uint32_t msg_errors;
86 _Atomic uint32_t yields;
87
88 /* pthread */
89 struct frr_pthread *pthread;
90
91 /* Event-delivery context 'master' for the module */
92 struct thread_master *master;
93
94 /* Event/'thread' pointer for queued zapi messages */
95 struct thread *t_msgs;
96
97 /* Input fifo queue to the module, and lock to protect it. */
98 pthread_mutex_t mutex;
99 struct stream_fifo in_fifo;
100
101} zo_info;
102
/* Module name used in debug/log output */
static const char LOG_NAME[] = "Zebra Opaque";

/* Forward declarations */

/* Event handler run in the module pthread: drains the incoming queue */
static void process_messages(struct thread *event);

/* Message handling */
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
static int handle_opq_registration(const struct zmsghdr *hdr,
				   struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
				     struct stream *msg);

/* Registration records */
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);

/* Registered clients */
static bool opq_client_match(const struct opq_client_reg *client,
			     const struct zapi_opaque_reg_info *info);
static struct opq_client_reg *opq_client_alloc(
	const struct zapi_opaque_reg_info *info);
static void opq_client_free(struct opq_client_reg **client);
static const char *opq_client2str(char *buf, size_t buflen,
				  const struct opq_client_reg *client);
9bb02389
MS
125
126/*
127 * Initialize the module at startup
128 */
129void zebra_opaque_init(void)
130{
131 memset(&zo_info, 0, sizeof(zo_info));
132
133 pthread_mutex_init(&zo_info.mutex, NULL);
134 stream_fifo_init(&zo_info.in_fifo);
135
136 zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
137}
138
139/*
140 * Start the module pthread. This step is run later than the
141 * 'init' step, in case zebra has fork-ed.
142 */
143void zebra_opaque_start(void)
144{
145 struct frr_pthread_attr pattr = {
146 .start = frr_pthread_attr_default.start,
147 .stop = frr_pthread_attr_default.stop
148 };
149
150 if (IS_ZEBRA_DEBUG_EVENT)
151 zlog_debug("%s module starting", LOG_NAME);
152
153 /* Start pthread */
154 zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
155 "zebra_opaque");
156
157 /* Associate event 'master' */
158 zo_info.master = zo_info.pthread->master;
159
160 atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
161
162 /* Enqueue an initial event for the pthread */
163 thread_add_event(zo_info.master, process_messages, NULL, 0,
164 &zo_info.t_msgs);
165
166 /* And start the pthread */
167 frr_pthread_run(zo_info.pthread, NULL);
168}
169
170/*
171 * Module stop, halting the dedicated pthread; called from the main pthread.
172 */
173void zebra_opaque_stop(void)
174{
175 if (IS_ZEBRA_DEBUG_EVENT)
176 zlog_debug("%s module stop", LOG_NAME);
177
178 atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
179
180 frr_pthread_stop(zo_info.pthread, NULL);
181
182 frr_pthread_destroy(zo_info.pthread);
183
184 if (IS_ZEBRA_DEBUG_EVENT)
185 zlog_debug("%s module stop complete", LOG_NAME);
186}
187
188/*
189 * Module final cleanup, called from the zebra main pthread.
190 */
191void zebra_opaque_finish(void)
192{
6703a038
MS
193 struct opq_msg_reg *reg;
194 struct opq_client_reg *client;
195
9bb02389
MS
196 if (IS_ZEBRA_DEBUG_EVENT)
197 zlog_debug("%s module shutdown", LOG_NAME);
198
6703a038
MS
199 /* Clear out registration info */
200 while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
201 client = reg->clients;
202 while (client) {
203 reg->clients = client->next;
204 opq_client_free(&client);
205 client = reg->clients;
206 }
207
208 opq_reg_free(&reg);
209 }
210
211 opq_regh_fini(&opq_reg_hash);
212
9bb02389
MS
213 pthread_mutex_destroy(&zo_info.mutex);
214 stream_fifo_deinit(&zo_info.in_fifo);
215}
216
217/*
218 * Does this module handle (intercept) the specified zapi message type?
219 */
220bool zebra_opaque_handles_msgid(uint16_t id)
221{
222 bool ret = false;
223
224 switch (id) {
225 case ZEBRA_OPAQUE_MESSAGE:
226 case ZEBRA_OPAQUE_REGISTER:
227 case ZEBRA_OPAQUE_UNREGISTER:
228 ret = true;
229 break;
230 default:
231 break;
232 }
233
234 return ret;
235}
236
237/*
238 * Enqueue a batch of messages for processing - this is the public api
239 * used from the zapi processing threads.
240 */
241uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
242{
243 uint32_t counter = 0;
244 struct stream *msg;
245
246 /* Dequeue messages from the incoming batch, and save them
247 * on the module fifo.
248 */
249 frr_with_mutex(&zo_info.mutex) {
250 msg = stream_fifo_pop(batch);
251 while (msg) {
252 stream_fifo_push(&zo_info.in_fifo, msg);
253 counter++;
254 msg = stream_fifo_pop(batch);
255 }
256 }
257
258 /* Schedule module pthread to process the batch */
259 if (counter > 0) {
674afc2b 260 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
9bb02389
MS
261 zlog_debug("%s: received %u messages",
262 __func__, counter);
263 thread_add_event(zo_info.master, process_messages, NULL, 0,
264 &zo_info.t_msgs);
265 }
266
267 return counter;
268}
269
270/*
271 * Pthread event loop, process the incoming message queue.
272 */
cc9f21da 273static void process_messages(struct thread *event)
9bb02389
MS
274{
275 struct stream_fifo fifo;
276 struct stream *msg;
277 uint32_t i;
278 bool need_resched = false;
279
280 stream_fifo_init(&fifo);
281
282 /* Check for zebra shutdown */
283 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
284 goto done;
285
6703a038
MS
286 /*
287 * Dequeue some messages from the incoming queue, temporarily
9bb02389
MS
288 * save them on the local fifo
289 */
290 frr_with_mutex(&zo_info.mutex) {
291
292 for (i = 0; i < zo_info.msgs_per_cycle; i++) {
293 msg = stream_fifo_pop(&zo_info.in_fifo);
294 if (msg == NULL)
295 break;
296
297 stream_fifo_push(&fifo, msg);
298 }
299
6703a038
MS
300 /*
301 * We may need to reschedule, if there are still
9bb02389
MS
302 * queued messages
303 */
304 if (stream_fifo_head(&zo_info.in_fifo) != NULL)
305 need_resched = true;
306 }
307
308 /* Update stats */
309 atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
310
311 /* Check for zebra shutdown */
312 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
313 need_resched = false;
314 goto done;
315 }
316
674afc2b 317 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
9bb02389
MS
318 zlog_debug("%s: processing %u messages", __func__, i);
319
6703a038
MS
320 /*
321 * Process the messages from the temporary fifo. We send the whole
322 * fifo so that we can take advantage of batching internally. Note
323 * that registration/deregistration messages are handled here also.
324 */
325 dispatch_opq_messages(&fifo);
9bb02389
MS
326
327done:
328
329 if (need_resched) {
330 atomic_fetch_add_explicit(&zo_info.yields, 1,
331 memory_order_relaxed);
332 thread_add_event(zo_info.master, process_messages, NULL, 0,
333 &zo_info.t_msgs);
334 }
335
336 /* This will also free any leftover messages, in the shutdown case */
337 stream_fifo_deinit(&fifo);
9bb02389 338}
6703a038
MS
339
340/*
341 * Process (dispatch) or drop opaque messages.
342 */
343static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
344{
345 struct stream *msg, *dup;
346 struct zmsghdr hdr;
387831ff 347 struct zapi_opaque_msg info;
6703a038 348 struct opq_msg_reg *reg;
387831ff 349 int ret;
6703a038
MS
350 struct opq_client_reg *client;
351 struct zserv *zclient;
352 char buf[50];
353
354 while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
355 zapi_parse_header(msg, &hdr);
356 hdr.length -= ZEBRA_HEADER_SIZE;
357
358 /* Handle client registration messages */
359 if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
360 handle_opq_registration(&hdr, msg);
361 continue;
362 } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
363 handle_opq_unregistration(&hdr, msg);
364 continue;
365 }
366
367 /* We only process OPAQUE messages - drop anything else */
368 if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
369 goto drop_it;
370
371 /* Dispatch to any registered ZAPI client(s) */
372
387831ff
MS
373 /* Extract subtype and flags */
374 ret = zclient_opaque_decode(msg, &info);
375 if (ret != 0)
376 goto drop_it;
6703a038
MS
377
378 /* Look up registered ZAPI client(s) */
387831ff 379 reg = opq_reg_lookup(info.type);
6703a038 380 if (reg == NULL) {
674afc2b 381 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
387831ff
MS
382 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
383 __func__, info.type, info.flags);
6703a038
MS
384 goto drop_it;
385 }
386
387 /* Reset read pointer, since we'll be re-sending message */
388 stream_set_getp(msg, 0);
389
390 /* Send a copy of the message to all registered clients */
391 for (client = reg->clients; client; client = client->next) {
392 dup = NULL;
393
c8b27f2a
MS
394 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
395
396 if (client->proto != info.proto ||
397 client->instance != info.instance ||
398 client->session_id != info.session_id)
399 continue;
400
674afc2b
MS
401 if (IS_ZEBRA_DEBUG_RECV &&
402 IS_ZEBRA_DEBUG_DETAIL)
c8b27f2a
MS
403 zlog_debug("%s: found matching unicast client %s",
404 __func__,
405 opq_client2str(buf,
406 sizeof(buf),
407 client));
408
409 } else {
410 /* Copy message if more clients */
411 if (client->next)
412 dup = stream_dup(msg);
413 }
6703a038
MS
414
415 /*
416 * TODO -- this isn't ideal: we're going through an
417 * acquire/release cycle for each client for each
418 * message. Replace this with a batching version.
419 */
420 zclient = zserv_acquire_client(client->proto,
421 client->instance,
422 client->session_id);
423 if (zclient) {
674afc2b
MS
424 if (IS_ZEBRA_DEBUG_SEND &&
425 IS_ZEBRA_DEBUG_DETAIL)
6703a038
MS
426 zlog_debug("%s: sending %s to client %s",
427 __func__,
428 (dup ? "dup" : "msg"),
429 opq_client2str(buf,
430 sizeof(buf),
431 client));
432
433 /*
434 * Sending a message actually means enqueuing
435 * it for a zapi io pthread to send - so we
436 * don't touch the message after this call.
437 */
438 zserv_send_message(zclient, dup ? dup : msg);
439 if (dup)
440 dup = NULL;
441 else
442 msg = NULL;
443
444 zserv_release_client(zclient);
445 } else {
674afc2b
MS
446 if (IS_ZEBRA_DEBUG_RECV &&
447 IS_ZEBRA_DEBUG_DETAIL)
6703a038 448 zlog_debug("%s: type %u: no zclient for %s",
387831ff 449 __func__, info.type,
6703a038
MS
450 opq_client2str(buf,
451 sizeof(buf),
452 client));
453 /* Registered but gone? */
454 if (dup)
455 stream_free(dup);
456 }
c8b27f2a
MS
457
458 /* If unicast, we're done */
459 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST))
460 break;
6703a038
MS
461 }
462
463drop_it:
387831ff 464
6703a038
MS
465 if (msg)
466 stream_free(msg);
467 }
468
469 return 0;
470}
471
472/*
473 * Process a register/unregister message
474 */
475static int handle_opq_registration(const struct zmsghdr *hdr,
476 struct stream *msg)
477{
478 int ret = 0;
479 struct zapi_opaque_reg_info info;
480 struct opq_client_reg *client;
481 struct opq_msg_reg key, *reg;
482 char buf[50];
483
484 memset(&info, 0, sizeof(info));
485
d2ddc141 486 if (zapi_opaque_reg_decode(msg, &info) < 0) {
6703a038
MS
487 ret = -1;
488 goto done;
489 }
490
491 memset(&key, 0, sizeof(key));
492
493 key.type = info.type;
494
495 reg = opq_regh_find(&opq_reg_hash, &key);
496 if (reg) {
497 /* Look for dup client */
498 for (client = reg->clients; client != NULL;
499 client = client->next) {
500 if (opq_client_match(client, &info))
501 break;
502 }
503
504 if (client) {
505 /* Oops - duplicate registration? */
506 if (IS_ZEBRA_DEBUG_RECV)
507 zlog_debug("%s: duplicate opq reg for client %s",
508 __func__,
509 opq_client2str(buf, sizeof(buf),
510 client));
511 goto done;
512 }
513
514 client = opq_client_alloc(&info);
515
516 if (IS_ZEBRA_DEBUG_RECV)
517 zlog_debug("%s: client %s registers for %u",
518 __func__,
519 opq_client2str(buf, sizeof(buf), client),
520 info.type);
521
522 /* Link client into registration */
523 client->next = reg->clients;
524 if (reg->clients)
525 reg->clients->prev = client;
526 reg->clients = client;
527 } else {
528 /*
529 * No existing registrations - create one, add the
530 * client, and add registration to hash.
531 */
532 reg = opq_reg_alloc(info.type);
533 client = opq_client_alloc(&info);
534
535 if (IS_ZEBRA_DEBUG_RECV)
536 zlog_debug("%s: client %s registers for new reg %u",
537 __func__,
538 opq_client2str(buf, sizeof(buf), client),
539 info.type);
540
541 reg->clients = client;
542
543 opq_regh_add(&opq_reg_hash, reg);
544 }
545
546done:
547
548 stream_free(msg);
549 return ret;
550}
551
552/*
553 * Process a register/unregister message
554 */
555static int handle_opq_unregistration(const struct zmsghdr *hdr,
556 struct stream *msg)
557{
558 int ret = 0;
559 struct zapi_opaque_reg_info info;
560 struct opq_client_reg *client;
561 struct opq_msg_reg key, *reg;
562 char buf[50];
563
564 memset(&info, 0, sizeof(info));
565
d2ddc141 566 if (zapi_opaque_reg_decode(msg, &info) < 0) {
6703a038
MS
567 ret = -1;
568 goto done;
569 }
570
571 memset(&key, 0, sizeof(key));
572
573 key.type = info.type;
574
575 reg = opq_regh_find(&opq_reg_hash, &key);
576 if (reg == NULL) {
577 /* Weird: unregister for unknown message? */
578 if (IS_ZEBRA_DEBUG_RECV)
579 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
580 __func__,
581 zebra_route_string(info.proto),
582 info.instance, info.session_id, info.type);
583 goto done;
584 }
585
586 /* Look for client */
587 for (client = reg->clients; client != NULL;
588 client = client->next) {
589 if (opq_client_match(client, &info))
590 break;
591 }
592
593 if (client == NULL) {
594 /* Oops - unregister for unknown client? */
595 if (IS_ZEBRA_DEBUG_RECV)
596 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
597 __func__, zebra_route_string(info.proto),
598 info.instance, info.session_id, info.type);
599 goto done;
600 }
601
602 if (IS_ZEBRA_DEBUG_RECV)
603 zlog_debug("%s: client %s unregisters for %u",
604 __func__, opq_client2str(buf, sizeof(buf), client),
605 info.type);
606
607 if (client->prev)
608 client->prev->next = client->next;
609 if (client->next)
610 client->next->prev = client->prev;
611 if (reg->clients == client)
612 reg->clients = client->next;
613
614 opq_client_free(&client);
615
616 /* Is registration empty now? */
617 if (reg->clients == NULL) {
6703a038
MS
618 opq_regh_del(&opq_reg_hash, reg);
619 opq_reg_free(&reg);
620 }
621
622done:
623
624 stream_free(msg);
625 return ret;
626}
627
628/* Compare utility for registered clients */
629static bool opq_client_match(const struct opq_client_reg *client,
630 const struct zapi_opaque_reg_info *info)
631{
632 if (client->proto == info->proto &&
633 client->instance == info->instance &&
634 client->session_id == info->session_id)
635 return true;
636 else
637 return false;
638}
639
640static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
641{
642 struct opq_msg_reg key, *reg;
643
644 memset(&key, 0, sizeof(key));
645
646 key.type = type;
647
648 reg = opq_regh_find(&opq_reg_hash, &key);
649
650 return reg;
651}
652
653static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
654{
655 struct opq_msg_reg *reg;
656
657 reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
658
659 reg->type = type;
660 INIT_HASH(&reg->item);
661
662 return reg;
663}
664
665static void opq_reg_free(struct opq_msg_reg **reg)
666{
667 XFREE(MTYPE_OPQ, (*reg));
668}
669
670static struct opq_client_reg *opq_client_alloc(
671 const struct zapi_opaque_reg_info *info)
672{
673 struct opq_client_reg *client;
674
675 client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
676
677 client->proto = info->proto;
678 client->instance = info->instance;
679 client->session_id = info->session_id;
680
681 return client;
682}
683
684static void opq_client_free(struct opq_client_reg **client)
685{
686 XFREE(MTYPE_OPQ, (*client));
687}
688
689static const char *opq_client2str(char *buf, size_t buflen,
690 const struct opq_client_reg *client)
691{
692 char sbuf[20];
693
694 snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
695 client->instance);
696 if (client->session_id > 0) {
697 snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
698 strlcat(buf, sbuf, buflen);
699 }
700
701 return buf;
702}
703
704/* Hash function for clients registered for messages */
705static uint32_t registration_hash(const struct opq_msg_reg *reg)
706{
707 return reg->type;
708}
709
710/* Comparison function for client registrations */
711static int registration_compare(const struct opq_msg_reg *reg1,
712 const struct opq_msg_reg *reg2)
713{
714 if (reg1->type == reg2->type)
715 return 0;
716 else
717 return -1;
718}