]> git.proxmox.com Git - mirror_frr.git/blob - zebra/zebra_opaque.c
zebra: Prevent installation of connected multiple times
[mirror_frr.git] / zebra / zebra_opaque.c
1 /*
2 * Zebra opaque message handler module
3 * Copyright (c) 2020 Volta Networks, Inc.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20
21 #include <zebra.h>
22 #include "lib/debug.h"
23 #include "lib/frr_pthread.h"
24 #include "lib/stream.h"
25 #include "zebra/debug.h"
26 #include "zebra/zserv.h"
27 #include "zebra/zebra_opaque.h"
28
29 /* Mem type */
30 DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
31
32 /* Hash to hold message registration info from zapi clients */
33 PREDECL_HASH(opq_regh);
34
35 /* Registered client info */
36 struct opq_client_reg {
37 int proto;
38 int instance;
39 uint32_t session_id;
40
41 struct opq_client_reg *next;
42 struct opq_client_reg *prev;
43 };
44
45 /* Opaque message registration info */
46 struct opq_msg_reg {
47 struct opq_regh_item item;
48
49 /* Message type */
50 uint32_t type;
51
52 struct opq_client_reg *clients;
53 };
54
55 /* Registration helper prototypes */
56 static uint32_t registration_hash(const struct opq_msg_reg *reg);
57 static int registration_compare(const struct opq_msg_reg *reg1,
58 const struct opq_msg_reg *reg2);
59
60 DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
61 registration_hash);
62
63 static struct opq_regh_head opq_reg_hash;
64
65 /*
66 * Globals
67 */
68 static struct zebra_opaque_globals {
69
70 /* Sentinel for run or start of shutdown */
71 _Atomic uint32_t run;
72
73 /* Limit number of pending, unprocessed updates */
74 _Atomic uint32_t max_queued_updates;
75
76 /* Limit number of new messages dequeued at once, to pace an
77 * incoming burst.
78 */
79 uint32_t msgs_per_cycle;
80
81 /* Stats: counters of incoming messages, errors, and yields (when
82 * the limit has been reached.)
83 */
84 _Atomic uint32_t msgs_in;
85 _Atomic uint32_t msg_errors;
86 _Atomic uint32_t yields;
87
88 /* pthread */
89 struct frr_pthread *pthread;
90
91 /* Event-delivery context 'master' for the module */
92 struct thread_master *master;
93
94 /* Event/'thread' pointer for queued zapi messages */
95 struct thread *t_msgs;
96
97 /* Input fifo queue to the module, and lock to protect it. */
98 pthread_mutex_t mutex;
99 struct stream_fifo in_fifo;
100
101 } zo_info;
102
103 /* Name string for debugs/logs */
104 static const char LOG_NAME[] = "Zebra Opaque";
105
106 /* Prototypes */
107
108 /* Main event loop, processing incoming message queue */
109 static void process_messages(struct thread *event);
110 static int handle_opq_registration(const struct zmsghdr *hdr,
111 struct stream *msg);
112 static int handle_opq_unregistration(const struct zmsghdr *hdr,
113 struct stream *msg);
114 static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
115 static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
116 static bool opq_client_match(const struct opq_client_reg *client,
117 const struct zapi_opaque_reg_info *info);
118 static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
119 static void opq_reg_free(struct opq_msg_reg **reg);
120 static struct opq_client_reg *opq_client_alloc(
121 const struct zapi_opaque_reg_info *info);
122 static void opq_client_free(struct opq_client_reg **client);
123 static const char *opq_client2str(char *buf, size_t buflen,
124 const struct opq_client_reg *client);
125
126 /*
127 * Initialize the module at startup
128 */
129 void zebra_opaque_init(void)
130 {
131 memset(&zo_info, 0, sizeof(zo_info));
132
133 pthread_mutex_init(&zo_info.mutex, NULL);
134 stream_fifo_init(&zo_info.in_fifo);
135
136 zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
137 }
138
139 /*
140 * Start the module pthread. This step is run later than the
141 * 'init' step, in case zebra has fork-ed.
142 */
143 void zebra_opaque_start(void)
144 {
145 struct frr_pthread_attr pattr = {
146 .start = frr_pthread_attr_default.start,
147 .stop = frr_pthread_attr_default.stop
148 };
149
150 if (IS_ZEBRA_DEBUG_EVENT)
151 zlog_debug("%s module starting", LOG_NAME);
152
153 /* Start pthread */
154 zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
155 "zebra_opaque");
156
157 /* Associate event 'master' */
158 zo_info.master = zo_info.pthread->master;
159
160 atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
161
162 /* Enqueue an initial event for the pthread */
163 thread_add_event(zo_info.master, process_messages, NULL, 0,
164 &zo_info.t_msgs);
165
166 /* And start the pthread */
167 frr_pthread_run(zo_info.pthread, NULL);
168 }
169
170 /*
171 * Module stop, halting the dedicated pthread; called from the main pthread.
172 */
173 void zebra_opaque_stop(void)
174 {
175 if (IS_ZEBRA_DEBUG_EVENT)
176 zlog_debug("%s module stop", LOG_NAME);
177
178 atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
179
180 frr_pthread_stop(zo_info.pthread, NULL);
181
182 frr_pthread_destroy(zo_info.pthread);
183
184 if (IS_ZEBRA_DEBUG_EVENT)
185 zlog_debug("%s module stop complete", LOG_NAME);
186 }
187
188 /*
189 * Module final cleanup, called from the zebra main pthread.
190 */
191 void zebra_opaque_finish(void)
192 {
193 struct opq_msg_reg *reg;
194 struct opq_client_reg *client;
195
196 if (IS_ZEBRA_DEBUG_EVENT)
197 zlog_debug("%s module shutdown", LOG_NAME);
198
199 /* Clear out registration info */
200 while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
201 client = reg->clients;
202 while (client) {
203 reg->clients = client->next;
204 opq_client_free(&client);
205 client = reg->clients;
206 }
207
208 opq_reg_free(&reg);
209 }
210
211 opq_regh_fini(&opq_reg_hash);
212
213 pthread_mutex_destroy(&zo_info.mutex);
214 stream_fifo_deinit(&zo_info.in_fifo);
215 }
216
217 /*
218 * Does this module handle (intercept) the specified zapi message type?
219 */
220 bool zebra_opaque_handles_msgid(uint16_t id)
221 {
222 bool ret = false;
223
224 switch (id) {
225 case ZEBRA_OPAQUE_MESSAGE:
226 case ZEBRA_OPAQUE_REGISTER:
227 case ZEBRA_OPAQUE_UNREGISTER:
228 ret = true;
229 break;
230 default:
231 break;
232 }
233
234 return ret;
235 }
236
237 /*
238 * Enqueue a batch of messages for processing - this is the public api
239 * used from the zapi processing threads.
240 */
241 uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
242 {
243 uint32_t counter = 0;
244 struct stream *msg;
245
246 /* Dequeue messages from the incoming batch, and save them
247 * on the module fifo.
248 */
249 frr_with_mutex(&zo_info.mutex) {
250 msg = stream_fifo_pop(batch);
251 while (msg) {
252 stream_fifo_push(&zo_info.in_fifo, msg);
253 counter++;
254 msg = stream_fifo_pop(batch);
255 }
256 }
257
258 /* Schedule module pthread to process the batch */
259 if (counter > 0) {
260 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
261 zlog_debug("%s: received %u messages",
262 __func__, counter);
263 thread_add_event(zo_info.master, process_messages, NULL, 0,
264 &zo_info.t_msgs);
265 }
266
267 return counter;
268 }
269
270 /*
271 * Pthread event loop, process the incoming message queue.
272 */
273 static void process_messages(struct thread *event)
274 {
275 struct stream_fifo fifo;
276 struct stream *msg;
277 uint32_t i;
278 bool need_resched = false;
279
280 stream_fifo_init(&fifo);
281
282 /* Check for zebra shutdown */
283 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
284 goto done;
285
286 /*
287 * Dequeue some messages from the incoming queue, temporarily
288 * save them on the local fifo
289 */
290 frr_with_mutex(&zo_info.mutex) {
291
292 for (i = 0; i < zo_info.msgs_per_cycle; i++) {
293 msg = stream_fifo_pop(&zo_info.in_fifo);
294 if (msg == NULL)
295 break;
296
297 stream_fifo_push(&fifo, msg);
298 }
299
300 /*
301 * We may need to reschedule, if there are still
302 * queued messages
303 */
304 if (stream_fifo_head(&zo_info.in_fifo) != NULL)
305 need_resched = true;
306 }
307
308 /* Update stats */
309 atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
310
311 /* Check for zebra shutdown */
312 if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
313 need_resched = false;
314 goto done;
315 }
316
317 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
318 zlog_debug("%s: processing %u messages", __func__, i);
319
320 /*
321 * Process the messages from the temporary fifo. We send the whole
322 * fifo so that we can take advantage of batching internally. Note
323 * that registration/deregistration messages are handled here also.
324 */
325 dispatch_opq_messages(&fifo);
326
327 done:
328
329 if (need_resched) {
330 atomic_fetch_add_explicit(&zo_info.yields, 1,
331 memory_order_relaxed);
332 thread_add_event(zo_info.master, process_messages, NULL, 0,
333 &zo_info.t_msgs);
334 }
335
336 /* This will also free any leftover messages, in the shutdown case */
337 stream_fifo_deinit(&fifo);
338 }
339
340 /*
341 * Process (dispatch) or drop opaque messages.
342 */
343 static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
344 {
345 struct stream *msg, *dup;
346 struct zmsghdr hdr;
347 struct zapi_opaque_msg info;
348 struct opq_msg_reg *reg;
349 int ret;
350 struct opq_client_reg *client;
351 struct zserv *zclient;
352 char buf[50];
353
354 while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
355 zapi_parse_header(msg, &hdr);
356 hdr.length -= ZEBRA_HEADER_SIZE;
357
358 /* Handle client registration messages */
359 if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
360 handle_opq_registration(&hdr, msg);
361 continue;
362 } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
363 handle_opq_unregistration(&hdr, msg);
364 continue;
365 }
366
367 /* We only process OPAQUE messages - drop anything else */
368 if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
369 goto drop_it;
370
371 /* Dispatch to any registered ZAPI client(s) */
372
373 /* Extract subtype and flags */
374 ret = zclient_opaque_decode(msg, &info);
375 if (ret != 0)
376 goto drop_it;
377
378 /* Look up registered ZAPI client(s) */
379 reg = opq_reg_lookup(info.type);
380 if (reg == NULL) {
381 if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
382 zlog_debug("%s: no registrations for opaque type %u, flags %#x",
383 __func__, info.type, info.flags);
384 goto drop_it;
385 }
386
387 /* Reset read pointer, since we'll be re-sending message */
388 stream_set_getp(msg, 0);
389
390 /* Send a copy of the message to all registered clients */
391 for (client = reg->clients; client; client = client->next) {
392 dup = NULL;
393
394 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
395
396 if (client->proto != info.proto ||
397 client->instance != info.instance ||
398 client->session_id != info.session_id)
399 continue;
400
401 if (IS_ZEBRA_DEBUG_RECV &&
402 IS_ZEBRA_DEBUG_DETAIL)
403 zlog_debug("%s: found matching unicast client %s",
404 __func__,
405 opq_client2str(buf,
406 sizeof(buf),
407 client));
408
409 } else {
410 /* Copy message if more clients */
411 if (client->next)
412 dup = stream_dup(msg);
413 }
414
415 /*
416 * TODO -- this isn't ideal: we're going through an
417 * acquire/release cycle for each client for each
418 * message. Replace this with a batching version.
419 */
420 zclient = zserv_acquire_client(client->proto,
421 client->instance,
422 client->session_id);
423 if (zclient) {
424 if (IS_ZEBRA_DEBUG_SEND &&
425 IS_ZEBRA_DEBUG_DETAIL)
426 zlog_debug("%s: sending %s to client %s",
427 __func__,
428 (dup ? "dup" : "msg"),
429 opq_client2str(buf,
430 sizeof(buf),
431 client));
432
433 /*
434 * Sending a message actually means enqueuing
435 * it for a zapi io pthread to send - so we
436 * don't touch the message after this call.
437 */
438 zserv_send_message(zclient, dup ? dup : msg);
439 if (dup)
440 dup = NULL;
441 else
442 msg = NULL;
443
444 zserv_release_client(zclient);
445 } else {
446 if (IS_ZEBRA_DEBUG_RECV &&
447 IS_ZEBRA_DEBUG_DETAIL)
448 zlog_debug("%s: type %u: no zclient for %s",
449 __func__, info.type,
450 opq_client2str(buf,
451 sizeof(buf),
452 client));
453 /* Registered but gone? */
454 if (dup)
455 stream_free(dup);
456 }
457
458 /* If unicast, we're done */
459 if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST))
460 break;
461 }
462
463 drop_it:
464
465 if (msg)
466 stream_free(msg);
467 }
468
469 return 0;
470 }
471
472 /*
473 * Process a register/unregister message
474 */
475 static int handle_opq_registration(const struct zmsghdr *hdr,
476 struct stream *msg)
477 {
478 int ret = 0;
479 struct zapi_opaque_reg_info info;
480 struct opq_client_reg *client;
481 struct opq_msg_reg key, *reg;
482 char buf[50];
483
484 memset(&info, 0, sizeof(info));
485
486 if (zapi_opaque_reg_decode(msg, &info) < 0) {
487 ret = -1;
488 goto done;
489 }
490
491 memset(&key, 0, sizeof(key));
492
493 key.type = info.type;
494
495 reg = opq_regh_find(&opq_reg_hash, &key);
496 if (reg) {
497 /* Look for dup client */
498 for (client = reg->clients; client != NULL;
499 client = client->next) {
500 if (opq_client_match(client, &info))
501 break;
502 }
503
504 if (client) {
505 /* Oops - duplicate registration? */
506 if (IS_ZEBRA_DEBUG_RECV)
507 zlog_debug("%s: duplicate opq reg for client %s",
508 __func__,
509 opq_client2str(buf, sizeof(buf),
510 client));
511 goto done;
512 }
513
514 client = opq_client_alloc(&info);
515
516 if (IS_ZEBRA_DEBUG_RECV)
517 zlog_debug("%s: client %s registers for %u",
518 __func__,
519 opq_client2str(buf, sizeof(buf), client),
520 info.type);
521
522 /* Link client into registration */
523 client->next = reg->clients;
524 if (reg->clients)
525 reg->clients->prev = client;
526 reg->clients = client;
527 } else {
528 /*
529 * No existing registrations - create one, add the
530 * client, and add registration to hash.
531 */
532 reg = opq_reg_alloc(info.type);
533 client = opq_client_alloc(&info);
534
535 if (IS_ZEBRA_DEBUG_RECV)
536 zlog_debug("%s: client %s registers for new reg %u",
537 __func__,
538 opq_client2str(buf, sizeof(buf), client),
539 info.type);
540
541 reg->clients = client;
542
543 opq_regh_add(&opq_reg_hash, reg);
544 }
545
546 done:
547
548 stream_free(msg);
549 return ret;
550 }
551
552 /*
553 * Process a register/unregister message
554 */
555 static int handle_opq_unregistration(const struct zmsghdr *hdr,
556 struct stream *msg)
557 {
558 int ret = 0;
559 struct zapi_opaque_reg_info info;
560 struct opq_client_reg *client;
561 struct opq_msg_reg key, *reg;
562 char buf[50];
563
564 memset(&info, 0, sizeof(info));
565
566 if (zapi_opaque_reg_decode(msg, &info) < 0) {
567 ret = -1;
568 goto done;
569 }
570
571 memset(&key, 0, sizeof(key));
572
573 key.type = info.type;
574
575 reg = opq_regh_find(&opq_reg_hash, &key);
576 if (reg == NULL) {
577 /* Weird: unregister for unknown message? */
578 if (IS_ZEBRA_DEBUG_RECV)
579 zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
580 __func__,
581 zebra_route_string(info.proto),
582 info.instance, info.session_id, info.type);
583 goto done;
584 }
585
586 /* Look for client */
587 for (client = reg->clients; client != NULL;
588 client = client->next) {
589 if (opq_client_match(client, &info))
590 break;
591 }
592
593 if (client == NULL) {
594 /* Oops - unregister for unknown client? */
595 if (IS_ZEBRA_DEBUG_RECV)
596 zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
597 __func__, zebra_route_string(info.proto),
598 info.instance, info.session_id, info.type);
599 goto done;
600 }
601
602 if (IS_ZEBRA_DEBUG_RECV)
603 zlog_debug("%s: client %s unregisters for %u",
604 __func__, opq_client2str(buf, sizeof(buf), client),
605 info.type);
606
607 if (client->prev)
608 client->prev->next = client->next;
609 if (client->next)
610 client->next->prev = client->prev;
611 if (reg->clients == client)
612 reg->clients = client->next;
613
614 opq_client_free(&client);
615
616 /* Is registration empty now? */
617 if (reg->clients == NULL) {
618 opq_regh_del(&opq_reg_hash, reg);
619 opq_reg_free(&reg);
620 }
621
622 done:
623
624 stream_free(msg);
625 return ret;
626 }
627
628 /* Compare utility for registered clients */
629 static bool opq_client_match(const struct opq_client_reg *client,
630 const struct zapi_opaque_reg_info *info)
631 {
632 if (client->proto == info->proto &&
633 client->instance == info->instance &&
634 client->session_id == info->session_id)
635 return true;
636 else
637 return false;
638 }
639
640 static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
641 {
642 struct opq_msg_reg key, *reg;
643
644 memset(&key, 0, sizeof(key));
645
646 key.type = type;
647
648 reg = opq_regh_find(&opq_reg_hash, &key);
649
650 return reg;
651 }
652
653 static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
654 {
655 struct opq_msg_reg *reg;
656
657 reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
658
659 reg->type = type;
660 INIT_HASH(&reg->item);
661
662 return reg;
663 }
664
665 static void opq_reg_free(struct opq_msg_reg **reg)
666 {
667 XFREE(MTYPE_OPQ, (*reg));
668 }
669
670 static struct opq_client_reg *opq_client_alloc(
671 const struct zapi_opaque_reg_info *info)
672 {
673 struct opq_client_reg *client;
674
675 client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
676
677 client->proto = info->proto;
678 client->instance = info->instance;
679 client->session_id = info->session_id;
680
681 return client;
682 }
683
684 static void opq_client_free(struct opq_client_reg **client)
685 {
686 XFREE(MTYPE_OPQ, (*client));
687 }
688
689 static const char *opq_client2str(char *buf, size_t buflen,
690 const struct opq_client_reg *client)
691 {
692 char sbuf[20];
693
694 snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
695 client->instance);
696 if (client->session_id > 0) {
697 snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
698 strlcat(buf, sbuf, buflen);
699 }
700
701 return buf;
702 }
703
704 /* Hash function for clients registered for messages */
705 static uint32_t registration_hash(const struct opq_msg_reg *reg)
706 {
707 return reg->type;
708 }
709
710 /* Comparison function for client registrations */
711 static int registration_compare(const struct opq_msg_reg *reg1,
712 const struct opq_msg_reg *reg2)
713 {
714 if (reg1->type == reg2->type)
715 return 0;
716 else
717 return -1;
718 }