]> git.proxmox.com Git - mirror_frr.git/blob - zebra/zebra_mlag.c
Merge pull request #9345 from mjstapp/fix_lib_zmq_free
[mirror_frr.git] / zebra / zebra_mlag.c
1 /* Zebra Mlag Code.
2 * Copyright (C) 2018 Cumulus Networks, Inc.
3 * Donald Sharp
4 *
5 * This file is part of FRR.
6 *
7 * FRR is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; either version 2, or (at your option) any
10 * later version.
11 *
12 * FRR is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with FRR; see the file COPYING. If not, write to the Free
19 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
20 * 02111-1307, USA.
21 */
22 #include "zebra.h"
23
24 #include "command.h"
25 #include "hook.h"
26 #include "frr_pthread.h"
27 #include "mlag.h"
28
29 #include "zebra/zebra_mlag.h"
30 #include "zebra/zebra_mlag_vty.h"
31 #include "zebra/zebra_router.h"
32 #include "zebra/zapi_msg.h"
33 #include "zebra/debug.h"
34
35 #ifdef HAVE_PROTOBUF_VERSION_3
36 #include "mlag/mlag.pb-c.h"
37 #endif
38
39 DEFINE_HOOK(zebra_mlag_private_write_data,
40 (uint8_t *data, uint32_t len), (data, len));
41 DEFINE_HOOK(zebra_mlag_private_monitor_state, (), ());
42 DEFINE_HOOK(zebra_mlag_private_open_channel, (), ());
43 DEFINE_HOOK(zebra_mlag_private_close_channel, (), ());
44 DEFINE_HOOK(zebra_mlag_private_cleanup_data, (), ());
45
46 #define ZEBRA_MLAG_METADATA_LEN 4
47 #define ZEBRA_MLAG_MSG_BCAST 0xFFFFFFFF
48
49 uint8_t mlag_wr_buffer[ZEBRA_MLAG_BUF_LIMIT];
50 uint8_t mlag_rd_buffer[ZEBRA_MLAG_BUF_LIMIT];
51
52 static bool test_mlag_in_progress;
53
54 static int zebra_mlag_signal_write_thread(void);
55 static int zebra_mlag_terminate_pthread(struct thread *event);
56 static int zebra_mlag_post_data_from_main_thread(struct thread *thread);
57 static void zebra_mlag_publish_process_state(struct zserv *client,
58 zebra_message_types_t msg_type);
59
60 /**********************MLAG Interaction***************************************/
61
62 /*
63 * API to post the Registration to MLAGD
64 * MLAG will not process any messages with out the registration
65 */
66 void zebra_mlag_send_register(void)
67 {
68 struct stream *s = NULL;
69
70 s = stream_new(sizeof(struct mlag_msg));
71
72 stream_putl(s, MLAG_REGISTER);
73 stream_putw(s, MLAG_MSG_NULL_PAYLOAD);
74 stream_putw(s, MLAG_MSG_NO_BATCH);
75 stream_fifo_push_safe(zrouter.mlag_info.mlag_fifo, s);
76 zebra_mlag_signal_write_thread();
77
78 if (IS_ZEBRA_DEBUG_MLAG)
79 zlog_debug("%s: Enqueued MLAG Register to MLAG Thread ",
80 __func__);
81 }
82
83 /*
84 * API to post the De-Registration to MLAGD
85 * MLAG will not process any messages after the de-registration
86 */
87 void zebra_mlag_send_deregister(void)
88 {
89 struct stream *s = NULL;
90
91 s = stream_new(sizeof(struct mlag_msg));
92
93 stream_putl(s, MLAG_DEREGISTER);
94 stream_putw(s, MLAG_MSG_NULL_PAYLOAD);
95 stream_putw(s, MLAG_MSG_NO_BATCH);
96 stream_fifo_push_safe(zrouter.mlag_info.mlag_fifo, s);
97 zebra_mlag_signal_write_thread();
98
99 if (IS_ZEBRA_DEBUG_MLAG)
100 zlog_debug("%s: Enqueued MLAG De-Register to MLAG Thread ",
101 __func__);
102 }
103
104 /*
105 * API To handle MLAG Received data
106 * Decodes the data using protobuf and enqueue to main thread
107 * main thread publish this to clients based on client subscription
108 */
109 void zebra_mlag_process_mlag_data(uint8_t *data, uint32_t len)
110 {
111 struct stream *s = NULL;
112 int msg_type = 0;
113
114 s = stream_new(ZEBRA_MLAG_BUF_LIMIT);
115 /*
116 * Place holder we need the message type first
117 */
118 stream_putl(s, msg_type);
119 msg_type = zebra_mlag_protobuf_decode_message(s, data, len);
120
121 if (msg_type <= 0) {
122 /* Something went wrong in decoding */
123 stream_free(s);
124 zlog_err("%s: failed to process mlag data-%d, %u", __func__,
125 msg_type, len);
126 return;
127 }
128
129 /*
130 * additional four bytes are for message type
131 */
132 stream_putl_at(s, 0, msg_type);
133 thread_add_event(zrouter.master, zebra_mlag_post_data_from_main_thread,
134 s, 0, NULL);
135 }
136
137 /**********************End of MLAG Interaction********************************/
138
139 /************************MLAG Thread Processing*******************************/
140
141 /*
142 * after posting every 'ZEBRA_MLAG_POST_LIMIT' packets, MLAG Thread will be
143 * yielded to give CPU for other threads
144 */
145 #define ZEBRA_MLAG_POST_LIMIT 100
146
147 /*
148 * This thread reads the clients data from the Global queue and encodes with
149 * protobuf and pass on to the MLAG socket.
150 */
151 static int zebra_mlag_client_msg_handler(struct thread *event)
152 {
153 struct stream *s;
154 uint32_t wr_count = 0;
155 uint32_t msg_type = 0;
156 uint32_t max_count = 0;
157 int len = 0;
158
159 wr_count = stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo);
160 if (IS_ZEBRA_DEBUG_MLAG)
161 zlog_debug(":%s: Processing MLAG write, %u messages in queue",
162 __func__, wr_count);
163
164 max_count = MIN(wr_count, ZEBRA_MLAG_POST_LIMIT);
165
166 for (wr_count = 0; wr_count < max_count; wr_count++) {
167 s = stream_fifo_pop_safe(zrouter.mlag_info.mlag_fifo);
168 if (!s) {
169 zlog_debug(":%s: Got a NULL Messages, some thing wrong",
170 __func__);
171 break;
172 }
173
174 /*
175 * Encode the data now
176 */
177 len = zebra_mlag_protobuf_encode_client_data(s, &msg_type);
178
179 /*
180 * write to MCLAGD
181 */
182 if (len > 0) {
183 hook_call(zebra_mlag_private_write_data,
184 mlag_wr_buffer, len);
185
186 /*
187 * If message type is De-register, send a signal to main
188 * thread, so that necessary cleanup will be done by
189 * main thread.
190 */
191 if (msg_type == MLAG_DEREGISTER) {
192 thread_add_event(zrouter.master,
193 zebra_mlag_terminate_pthread,
194 NULL, 0, NULL);
195 }
196 }
197
198 stream_free(s);
199 }
200
201 if (IS_ZEBRA_DEBUG_MLAG)
202 zlog_debug(":%s: Posted %d messages to MLAGD", __func__,
203 wr_count);
204 /*
205 * Currently there is only message write task is enqueued to this
206 * thread, yielding was added for future purpose, so that this thread
207 * can server other tasks also and in case FIFO is empty, this task will
208 * be schedule when main thread adds some messages
209 */
210 if (wr_count >= ZEBRA_MLAG_POST_LIMIT)
211 zebra_mlag_signal_write_thread();
212 return 0;
213 }
214
215 /*
216 * API to handle the process state.
217 * In case of Down, Zebra keep monitoring the MLAG state.
218 * all the state Notifications will be published to clients
219 */
220 void zebra_mlag_handle_process_state(enum zebra_mlag_state state)
221 {
222 if (state == MLAG_UP) {
223 zrouter.mlag_info.connected = true;
224 zebra_mlag_publish_process_state(NULL, ZEBRA_MLAG_PROCESS_UP);
225 zebra_mlag_send_register();
226 } else if (state == MLAG_DOWN) {
227 zrouter.mlag_info.connected = false;
228 zebra_mlag_publish_process_state(NULL, ZEBRA_MLAG_PROCESS_DOWN);
229 hook_call(zebra_mlag_private_monitor_state);
230 }
231 }
232
233 /***********************End of MLAG Thread processing*************************/
234
235 /*************************Multi-entratnt Api's********************************/
236
237 /*
238 * Provider api to signal that work/events are available
239 * for the Zebra MLAG Write pthread.
240 * This API is called from 2 pthreads..
241 * 1) by main thread when client posts a MLAG Message
242 * 2) by MLAG Thread, in case of yield
243 * though this api, is called from two threads we don't need any locking
244 * because Thread task enqueue is thread safe means internally it had
245 * necessary protection
246 */
247 static int zebra_mlag_signal_write_thread(void)
248 {
249 if (IS_ZEBRA_DEBUG_MLAG)
250 zlog_debug(":%s: Scheduling MLAG write", __func__);
251 /*
252 * This api will be called from Both main & MLAG Threads.
253 * main thread writes, "zrouter.mlag_info.th_master" only
254 * during Zebra Init/after MLAG thread is destroyed.
255 * so it is safe to use without any locking
256 */
257 thread_add_event(zrouter.mlag_info.th_master,
258 zebra_mlag_client_msg_handler, NULL, 0,
259 &zrouter.mlag_info.t_write);
260 return 0;
261 }
262
263 /*
264 * API will be used to publish the MLAG state to interested clients
265 * In case client is passed, state is posted only for that client,
266 * otherwise to all interested clients
267 * this api can be called from two threads.
268 * 1) from main thread: when client is passed
269 * 2) from MLAG Thread: when client is NULL
270 *
271 * In second case, to avoid global data access data will be post to Main
272 * thread, so that actual posting to clients will happen from Main thread.
273 */
274 static void zebra_mlag_publish_process_state(struct zserv *client,
275 zebra_message_types_t msg_type)
276 {
277 struct stream *s;
278
279 if (IS_ZEBRA_DEBUG_MLAG)
280 zlog_debug("%s: Publishing MLAG process state:%s to %s Client",
281 __func__,
282 (msg_type == ZEBRA_MLAG_PROCESS_UP) ? "UP" : "DOWN",
283 (client) ? "one" : "all");
284
285 if (client) {
286 s = stream_new(ZEBRA_HEADER_SIZE);
287 zclient_create_header(s, msg_type, VRF_DEFAULT);
288 zserv_send_message(client, s);
289 return;
290 }
291
292
293 /*
294 * additional four bytes are for mesasge type
295 */
296 s = stream_new(ZEBRA_HEADER_SIZE + ZEBRA_MLAG_METADATA_LEN);
297 stream_putl(s, ZEBRA_MLAG_MSG_BCAST);
298 zclient_create_header(s, msg_type, VRF_DEFAULT);
299 thread_add_event(zrouter.master, zebra_mlag_post_data_from_main_thread,
300 s, 0, NULL);
301 }
302
303 /**************************End of Multi-entrant Apis**************************/
304
305 /***********************Zebra Main thread processing**************************/
306
307 /*
308 * To avoid data corruption, messages will be post to clients only from
309 * main thread, because for that access was needed for clients list.
310 * so instead of forcing the locks, messages will be posted from main thread.
311 */
312 static int zebra_mlag_post_data_from_main_thread(struct thread *thread)
313 {
314 struct stream *s = THREAD_ARG(thread);
315 struct stream *zebra_s = NULL;
316 struct listnode *node;
317 struct zserv *client;
318 uint32_t msg_type = 0;
319 uint32_t msg_len = 0;
320
321 if (!s)
322 return -1;
323
324 STREAM_GETL(s, msg_type);
325 if (IS_ZEBRA_DEBUG_MLAG)
326 zlog_debug(
327 "%s: Posting MLAG data for msg_type:0x%x to interested clients",
328 __func__, msg_type);
329
330 msg_len = s->endp - ZEBRA_MLAG_METADATA_LEN;
331 for (ALL_LIST_ELEMENTS_RO(zrouter.client_list, node, client)) {
332 if (client->mlag_updates_interested == true) {
333 if (msg_type != ZEBRA_MLAG_MSG_BCAST
334 && !CHECK_FLAG(client->mlag_reg_mask1,
335 (1 << msg_type))) {
336 continue;
337 }
338
339 if (IS_ZEBRA_DEBUG_MLAG)
340 zlog_debug(
341 "%s: Posting MLAG data of length-%d to client:%d ",
342 __func__, msg_len, client->proto);
343
344 zebra_s = stream_new(msg_len);
345 STREAM_GET(zebra_s->data, s, msg_len);
346 zebra_s->endp = msg_len;
347 stream_putw_at(zebra_s, 0, msg_len);
348
349 /*
350 * This stream will be enqueued to client_obuf, it will
351 * be freed after posting to client socket.
352 */
353 zserv_send_message(client, zebra_s);
354 zebra_s = NULL;
355 }
356 }
357
358 stream_free(s);
359 return 0;
360 stream_failure:
361 stream_free(s);
362 if (zebra_s)
363 stream_free(zebra_s);
364 return 0;
365 }
366
367 /*
368 * Start the MLAG Thread, this will be used to write client data on to
369 * MLAG Process and to read the data from MLAG and post to clients.
370 * when all clients are un-registered, this Thread will be
371 * suspended.
372 */
373 static void zebra_mlag_spawn_pthread(void)
374 {
375 /* Start MLAG write pthread */
376
377 struct frr_pthread_attr pattr = {.start =
378 frr_pthread_attr_default.start,
379 .stop = frr_pthread_attr_default.stop};
380
381 zrouter.mlag_info.zebra_pth_mlag =
382 frr_pthread_new(&pattr, "Zebra MLAG thread", "Zebra MLAG");
383
384 zrouter.mlag_info.th_master = zrouter.mlag_info.zebra_pth_mlag->master;
385
386
387 /* Enqueue an initial event to the Newly spawn MLAG pthread */
388 zebra_mlag_signal_write_thread();
389
390 frr_pthread_run(zrouter.mlag_info.zebra_pth_mlag, NULL);
391 }
392
393 /*
394 * all clients are un-registered for MLAG Updates, terminate the
395 * MLAG write thread
396 */
397 static int zebra_mlag_terminate_pthread(struct thread *event)
398 {
399 if (IS_ZEBRA_DEBUG_MLAG)
400 zlog_debug("Zebra MLAG write thread terminate called");
401
402 if (zrouter.mlag_info.clients_interested_cnt) {
403 if (IS_ZEBRA_DEBUG_MLAG)
404 zlog_debug(
405 "Zebra MLAG: still some clients are interested");
406 return 0;
407 }
408
409 frr_pthread_stop(zrouter.mlag_info.zebra_pth_mlag, NULL);
410
411 /* Destroy pthread */
412 frr_pthread_destroy(zrouter.mlag_info.zebra_pth_mlag);
413 zrouter.mlag_info.zebra_pth_mlag = NULL;
414 zrouter.mlag_info.th_master = NULL;
415 zrouter.mlag_info.t_read = NULL;
416 zrouter.mlag_info.t_write = NULL;
417
418 /*
419 * Send Notification to clean private data
420 */
421 hook_call(zebra_mlag_private_cleanup_data);
422 return 0;
423 }
424
425 /*
426 * API to register zebra client for MLAG Updates
427 */
428 void zebra_mlag_client_register(ZAPI_HANDLER_ARGS)
429 {
430 struct stream *s;
431 uint32_t reg_mask = 0;
432 int rc = 0;
433
434 if (IS_ZEBRA_DEBUG_MLAG)
435 zlog_debug("Received MLAG Registration from client-proto:%d",
436 client->proto);
437
438
439 /* Get input stream. */
440 s = msg;
441
442 /* Get data. */
443 STREAM_GETL(s, reg_mask);
444
445 if (client->mlag_updates_interested == true) {
446
447 if (IS_ZEBRA_DEBUG_MLAG)
448 zlog_debug(
449 "Client is registered, existing mask: 0x%x, new mask: 0x%x",
450 client->mlag_reg_mask1, reg_mask);
451 if (client->mlag_reg_mask1 != reg_mask)
452 client->mlag_reg_mask1 = reg_mask;
453 /*
454 * Client might missed MLAG-UP Notification, post-it again
455 */
456 zebra_mlag_publish_process_state(client, ZEBRA_MLAG_PROCESS_UP);
457 return;
458 }
459
460
461 client->mlag_updates_interested = true;
462 client->mlag_reg_mask1 = reg_mask;
463 if (IS_ZEBRA_DEBUG_MLAG)
464 zlog_debug("Registering for MLAG Updates with mask: 0x%x, ",
465 client->mlag_reg_mask1);
466
467 zrouter.mlag_info.clients_interested_cnt++;
468
469 if (zrouter.mlag_info.clients_interested_cnt == 1) {
470 /*
471 * First-client for MLAG Updates,open the communication channel
472 * with MLAG
473 */
474 if (IS_ZEBRA_DEBUG_MLAG)
475 zlog_debug(
476 "First client, opening the channel with MLAG");
477
478 zebra_mlag_spawn_pthread();
479 rc = hook_call(zebra_mlag_private_open_channel);
480 if (rc < 0) {
481 /*
482 * For some reason, zebra not able to open the
483 * comm-channel with MLAG, so post MLAG-DOWN to client.
484 * later when the channel is open, zebra will send
485 * MLAG-UP
486 */
487 if (IS_ZEBRA_DEBUG_MLAG)
488 zlog_debug(
489 "Fail to open channel with MLAG,rc:%d, post Proto-down",
490 rc);
491 zebra_mlag_publish_process_state(
492 client, ZEBRA_MLAG_PROCESS_DOWN);
493 }
494 }
495
496 if (IS_ZEBRA_DEBUG_MLAG)
497 zlog_debug("Client Registered successfully for MLAG Updates");
498
499 if (zrouter.mlag_info.connected == true)
500 zebra_mlag_publish_process_state(client, ZEBRA_MLAG_PROCESS_UP);
501 stream_failure:
502 return;
503 }
504
505 /*
506 * API to un-register for MLAG Updates
507 */
508 void zebra_mlag_client_unregister(ZAPI_HANDLER_ARGS)
509 {
510 if (IS_ZEBRA_DEBUG_MLAG)
511 zlog_debug("Received MLAG De-Registration from client-proto:%d",
512 client->proto);
513
514 if (client->mlag_updates_interested == false)
515 /* Unexpected */
516 return;
517
518 client->mlag_updates_interested = false;
519 client->mlag_reg_mask1 = 0;
520 zrouter.mlag_info.clients_interested_cnt--;
521
522 if (zrouter.mlag_info.clients_interested_cnt == 0) {
523 /*
524 * No-client is interested for MLAG Updates,close the
525 * communication channel with MLAG
526 */
527 if (IS_ZEBRA_DEBUG_MLAG)
528 zlog_debug("Last client for MLAG, close the channel ");
529
530 /*
531 * Clean up flow:
532 * =============
533 * 1) main thread calls socket close which posts De-register
534 * to MLAG write thread
535 * 2) after MLAG write thread posts De-register it sends a
536 * signal back to main thread to do the thread cleanup
537 * this was mainly to make sure De-register is posted to MCLAGD.
538 */
539 hook_call(zebra_mlag_private_close_channel);
540 }
541
542 if (IS_ZEBRA_DEBUG_MLAG)
543 zlog_debug(
544 "Client De-Registered successfully for MLAG Updates");
545 }
546
547 /*
548 * Does following things.
549 * 1) allocated new local stream, and copies the client data and enqueue
550 * to MLAG Thread
551 * 2) MLAG Thread after dequeing, encode the client data using protobuf
552 * and write on to MLAG
553 */
554 void zebra_mlag_forward_client_msg(ZAPI_HANDLER_ARGS)
555 {
556 struct stream *zebra_s;
557 struct stream *mlag_s;
558
559 if (IS_ZEBRA_DEBUG_MLAG)
560 zlog_debug("Received Client MLAG Data from client-proto:%d",
561 client->proto);
562
563 /* Get input stream. */
564 zebra_s = msg;
565 mlag_s = stream_new(zebra_s->endp);
566
567 /*
568 * Client data is | Zebra Header + MLAG Data |
569 * we need to enqueue only the MLAG data, skipping Zebra Header
570 */
571 stream_put(mlag_s, zebra_s->data + zebra_s->getp,
572 STREAM_READABLE(zebra_s));
573 stream_fifo_push_safe(zrouter.mlag_info.mlag_fifo, mlag_s);
574 zebra_mlag_signal_write_thread();
575
576 if (IS_ZEBRA_DEBUG_MLAG)
577 zlog_debug("%s: Enqueued Client:%d data to MLAG Thread ",
578 __func__, client->proto);
579 }
580
581 /***********************End of Zebra Main thread processing*************/
582
583 enum mlag_role zebra_mlag_get_role(void)
584 {
585 return zrouter.mlag_info.role;
586 }
587
588 int32_t zebra_mlag_test_mlag_internal(const char *none, const char *primary,
589 const char *secondary)
590 {
591 enum mlag_role orig = zrouter.mlag_info.role;
592 char buf1[MLAG_ROLE_STRSIZE], buf2[MLAG_ROLE_STRSIZE];
593
594 if (none)
595 zrouter.mlag_info.role = MLAG_ROLE_NONE;
596 if (primary)
597 zrouter.mlag_info.role = MLAG_ROLE_PRIMARY;
598 if (secondary)
599 zrouter.mlag_info.role = MLAG_ROLE_SECONDARY;
600
601 if (IS_ZEBRA_DEBUG_MLAG)
602 zlog_debug("Test: Changing role from %s to %s",
603 mlag_role2str(orig, buf1, sizeof(buf1)),
604 mlag_role2str(orig, buf2, sizeof(buf2)));
605
606 if (orig != zrouter.mlag_info.role) {
607 zsend_capabilities_all_clients();
608 if (zrouter.mlag_info.role != MLAG_ROLE_NONE) {
609 if (zrouter.mlag_info.clients_interested_cnt == 0
610 && !test_mlag_in_progress) {
611 if (zrouter.mlag_info.zebra_pth_mlag == NULL)
612 zebra_mlag_spawn_pthread();
613 zrouter.mlag_info.clients_interested_cnt++;
614 test_mlag_in_progress = true;
615 hook_call(zebra_mlag_private_open_channel);
616 }
617 } else {
618 if (test_mlag_in_progress) {
619 test_mlag_in_progress = false;
620 zrouter.mlag_info.clients_interested_cnt--;
621 hook_call(zebra_mlag_private_close_channel);
622 }
623 }
624 }
625
626 return CMD_SUCCESS;
627 }
628
629 void zebra_mlag_init(void)
630 {
631 zebra_mlag_vty_init();
632
633 /*
634 * Intialiaze the MLAG Global variables
635 * write thread will be created during actual registration with MCLAG
636 */
637 zrouter.mlag_info.clients_interested_cnt = 0;
638 zrouter.mlag_info.connected = false;
639 zrouter.mlag_info.timer_running = false;
640 zrouter.mlag_info.mlag_fifo = stream_fifo_new();
641 zrouter.mlag_info.zebra_pth_mlag = NULL;
642 zrouter.mlag_info.th_master = NULL;
643 zrouter.mlag_info.t_read = NULL;
644 zrouter.mlag_info.t_write = NULL;
645 test_mlag_in_progress = false;
646 zebra_mlag_reset_read_buffer();
647 }
648
649 void zebra_mlag_terminate(void)
650 {
651 }
652
653
654 /*
655 *
656 * ProtoBuf Encoding APIs
657 */
658
659 #ifdef HAVE_PROTOBUF_VERSION_3
660
661 DEFINE_MTYPE_STATIC(ZEBRA, MLAG_PBUF, "ZEBRA MLAG PROTOBUF");
662
663 int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type)
664 {
665 ZebraMlagHeader hdr = ZEBRA_MLAG__HEADER__INIT;
666 struct mlag_msg mlag_msg;
667 uint8_t tmp_buf[ZEBRA_MLAG_BUF_LIMIT];
668 int len = 0;
669 int n_len = 0;
670 int rc = 0;
671 char buf[ZLOG_FILTER_LENGTH_MAX];
672 size_t length;
673
674 if (IS_ZEBRA_DEBUG_MLAG)
675 zlog_debug("%s: Entering..", __func__);
676
677 rc = mlag_lib_decode_mlag_hdr(s, &mlag_msg, &length);
678 if (rc)
679 return rc;
680
681 memset(tmp_buf, 0, ZEBRA_MLAG_BUF_LIMIT);
682
683 if (IS_ZEBRA_DEBUG_MLAG)
684 zlog_debug("%s: Mlag ProtoBuf encoding of message:%s, len:%d",
685 __func__,
686 mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
687 sizeof(buf)),
688 mlag_msg.data_len);
689 *msg_type = mlag_msg.msg_type;
690 switch (mlag_msg.msg_type) {
691 case MLAG_MROUTE_ADD: {
692 struct mlag_mroute_add msg;
693 ZebraMlagMrouteAdd pay_load = ZEBRA_MLAG_MROUTE_ADD__INIT;
694 uint32_t vrf_name_len = 0;
695
696 rc = mlag_lib_decode_mroute_add(s, &msg, &length);
697 if (rc)
698 return rc;
699
700 vrf_name_len = strlen(msg.vrf_name) + 1;
701 pay_load.vrf_name = XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
702 strlcpy(pay_load.vrf_name, msg.vrf_name, vrf_name_len);
703 pay_load.source_ip = msg.source_ip;
704 pay_load.group_ip = msg.group_ip;
705 pay_load.cost_to_rp = msg.cost_to_rp;
706 pay_load.owner_id = msg.owner_id;
707 pay_load.am_i_dr = msg.am_i_dr;
708 pay_load.am_i_dual_active = msg.am_i_dual_active;
709 pay_load.vrf_id = msg.vrf_id;
710
711 if (msg.owner_id == MLAG_OWNER_INTERFACE) {
712 vrf_name_len = strlen(msg.intf_name) + 1;
713 pay_load.intf_name =
714 XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
715 strlcpy(pay_load.intf_name, msg.intf_name,
716 vrf_name_len);
717 }
718
719 len = zebra_mlag_mroute_add__pack(&pay_load, tmp_buf);
720 XFREE(MTYPE_MLAG_PBUF, pay_load.vrf_name);
721 if (msg.owner_id == MLAG_OWNER_INTERFACE)
722 XFREE(MTYPE_MLAG_PBUF, pay_load.intf_name);
723 } break;
724 case MLAG_MROUTE_DEL: {
725 struct mlag_mroute_del msg;
726 ZebraMlagMrouteDel pay_load = ZEBRA_MLAG_MROUTE_DEL__INIT;
727 uint32_t vrf_name_len = 0;
728
729 rc = mlag_lib_decode_mroute_del(s, &msg, &length);
730 if (rc)
731 return rc;
732 vrf_name_len = strlen(msg.vrf_name) + 1;
733 pay_load.vrf_name = XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
734 strlcpy(pay_load.vrf_name, msg.vrf_name, vrf_name_len);
735 pay_load.source_ip = msg.source_ip;
736 pay_load.group_ip = msg.group_ip;
737 pay_load.owner_id = msg.owner_id;
738 pay_load.vrf_id = msg.vrf_id;
739
740 if (msg.owner_id == MLAG_OWNER_INTERFACE) {
741 vrf_name_len = strlen(msg.intf_name) + 1;
742 pay_load.intf_name =
743 XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
744 strlcpy(pay_load.intf_name, msg.intf_name,
745 vrf_name_len);
746 }
747
748 len = zebra_mlag_mroute_del__pack(&pay_load, tmp_buf);
749 XFREE(MTYPE_MLAG_PBUF, pay_load.vrf_name);
750 if (msg.owner_id == MLAG_OWNER_INTERFACE)
751 XFREE(MTYPE_MLAG_PBUF, pay_load.intf_name);
752 } break;
753 case MLAG_MROUTE_ADD_BULK: {
754 struct mlag_mroute_add msg;
755 ZebraMlagMrouteAddBulk Bulk_msg =
756 ZEBRA_MLAG_MROUTE_ADD_BULK__INIT;
757 ZebraMlagMrouteAdd **pay_load = NULL;
758 bool cleanup = false;
759 uint32_t i, actual;
760
761 Bulk_msg.n_mroute_add = mlag_msg.msg_cnt;
762 pay_load = XMALLOC(MTYPE_MLAG_PBUF, sizeof(ZebraMlagMrouteAdd *)
763 * mlag_msg.msg_cnt);
764
765 for (i = 0, actual = 0; i < mlag_msg.msg_cnt; i++, actual++) {
766
767 uint32_t vrf_name_len = 0;
768
769 rc = mlag_lib_decode_mroute_add(s, &msg, &length);
770 if (rc) {
771 cleanup = true;
772 break;
773 }
774 pay_load[i] = XMALLOC(MTYPE_MLAG_PBUF,
775 sizeof(ZebraMlagMrouteAdd));
776 zebra_mlag_mroute_add__init(pay_load[i]);
777
778 vrf_name_len = strlen(msg.vrf_name) + 1;
779 pay_load[i]->vrf_name =
780 XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
781 strlcpy(pay_load[i]->vrf_name, msg.vrf_name,
782 vrf_name_len);
783 pay_load[i]->source_ip = msg.source_ip;
784 pay_load[i]->group_ip = msg.group_ip;
785 pay_load[i]->cost_to_rp = msg.cost_to_rp;
786 pay_load[i]->owner_id = msg.owner_id;
787 pay_load[i]->am_i_dr = msg.am_i_dr;
788 pay_load[i]->am_i_dual_active = msg.am_i_dual_active;
789 pay_load[i]->vrf_id = msg.vrf_id;
790 if (msg.owner_id == MLAG_OWNER_INTERFACE) {
791 vrf_name_len = strlen(msg.intf_name) + 1;
792 pay_load[i]->intf_name =
793 XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
794
795 strlcpy(pay_load[i]->intf_name, msg.intf_name,
796 vrf_name_len);
797 }
798 }
799 if (!cleanup) {
800 Bulk_msg.mroute_add = pay_load;
801 len = zebra_mlag_mroute_add_bulk__pack(&Bulk_msg,
802 tmp_buf);
803 }
804
805 for (i = 0; i < actual; i++) {
806 /*
807 * The mlag_lib_decode_mroute_add can
808 * fail to properly decode and cause nothing
809 * to be allocated. Prevent a crash
810 */
811 if (!pay_load[i])
812 continue;
813
814 XFREE(MTYPE_MLAG_PBUF, pay_load[i]->vrf_name);
815 if (pay_load[i]->owner_id == MLAG_OWNER_INTERFACE
816 && pay_load[i]->intf_name)
817 XFREE(MTYPE_MLAG_PBUF, pay_load[i]->intf_name);
818 XFREE(MTYPE_MLAG_PBUF, pay_load[i]);
819 }
820 XFREE(MTYPE_MLAG_PBUF, pay_load);
821 if (cleanup)
822 return -1;
823 } break;
824 case MLAG_MROUTE_DEL_BULK: {
825 struct mlag_mroute_del msg;
826 ZebraMlagMrouteDelBulk Bulk_msg =
827 ZEBRA_MLAG_MROUTE_DEL_BULK__INIT;
828 ZebraMlagMrouteDel **pay_load = NULL;
829 bool cleanup = false;
830 uint32_t i, actual;
831
832 Bulk_msg.n_mroute_del = mlag_msg.msg_cnt;
833 pay_load = XMALLOC(MTYPE_MLAG_PBUF, sizeof(ZebraMlagMrouteDel *)
834 * mlag_msg.msg_cnt);
835
836 for (i = 0, actual = 0; i < mlag_msg.msg_cnt; i++, actual++) {
837
838 uint32_t vrf_name_len = 0;
839
840 rc = mlag_lib_decode_mroute_del(s, &msg, &length);
841 if (rc) {
842 cleanup = true;
843 break;
844 }
845
846 pay_load[i] = XMALLOC(MTYPE_MLAG_PBUF,
847 sizeof(ZebraMlagMrouteDel));
848 zebra_mlag_mroute_del__init(pay_load[i]);
849
850 vrf_name_len = strlen(msg.vrf_name) + 1;
851 pay_load[i]->vrf_name =
852 XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
853
854 strlcpy(pay_load[i]->vrf_name, msg.vrf_name,
855 vrf_name_len);
856 pay_load[i]->source_ip = msg.source_ip;
857 pay_load[i]->group_ip = msg.group_ip;
858 pay_load[i]->owner_id = msg.owner_id;
859 pay_load[i]->vrf_id = msg.vrf_id;
860 if (msg.owner_id == MLAG_OWNER_INTERFACE) {
861 vrf_name_len = strlen(msg.intf_name) + 1;
862 pay_load[i]->intf_name =
863 XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
864
865 strlcpy(pay_load[i]->intf_name, msg.intf_name,
866 vrf_name_len);
867 }
868 }
869 if (!cleanup) {
870 Bulk_msg.mroute_del = pay_load;
871 len = zebra_mlag_mroute_del_bulk__pack(&Bulk_msg,
872 tmp_buf);
873 }
874
875 for (i = 0; i < actual; i++) {
876 /*
877 * The mlag_lib_decode_mroute_add can
878 * fail to properly decode and cause nothing
879 * to be allocated. Prevent a crash
880 */
881 if (!pay_load[i])
882 continue;
883
884 XFREE(MTYPE_MLAG_PBUF, pay_load[i]->vrf_name);
885 if (pay_load[i]->owner_id == MLAG_OWNER_INTERFACE
886 && pay_load[i]->intf_name)
887 XFREE(MTYPE_MLAG_PBUF, pay_load[i]->intf_name);
888 XFREE(MTYPE_MLAG_PBUF, pay_load[i]);
889 }
890 XFREE(MTYPE_MLAG_PBUF, pay_load);
891 if (cleanup)
892 return -1;
893 } break;
894 default:
895 break;
896 }
897
898 if (IS_ZEBRA_DEBUG_MLAG)
899 zlog_debug("%s: length of Mlag ProtoBuf encoded message:%s, %d",
900 __func__,
901 mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
902 sizeof(buf)),
903 len);
904 hdr.type = (ZebraMlagHeader__MessageType)mlag_msg.msg_type;
905 if (len != 0) {
906 hdr.data.len = len;
907 hdr.data.data = XMALLOC(MTYPE_MLAG_PBUF, len);
908 memcpy(hdr.data.data, tmp_buf, len);
909 }
910
911 /*
912 * ProtoBuf Infra will not support to demarc the pointers whem multiple
913 * messages are posted inside a single Buffer.
914 * 2 -solutions exist to solve this
915 * 1. add Unenoced length at the beginning of every message, this will
916 * be used to point to next message in the buffer
917 * 2. another solution is defining all messages insides another message
918 * But this will permit only 32 messages. this can be extended with
919 * multiple levels.
920 * for simplicity we are going with solution-1.
921 */
922 len = zebra_mlag__header__pack(&hdr,
923 (mlag_wr_buffer + ZEBRA_MLAG_LEN_SIZE));
924 n_len = htonl(len);
925 memcpy(mlag_wr_buffer, &n_len, ZEBRA_MLAG_LEN_SIZE);
926 len += ZEBRA_MLAG_LEN_SIZE;
927
928 if (IS_ZEBRA_DEBUG_MLAG)
929 zlog_debug(
930 "%s: length of Mlag ProtoBuf message:%s with Header %d",
931 __func__,
932 mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
933 sizeof(buf)),
934 len);
935 XFREE(MTYPE_MLAG_PBUF, hdr.data.data);
936
937 return len;
938 }
939
940 static void zebra_fill_protobuf_msg(struct stream *s, char *name, int len)
941 {
942 int str_len = strlen(name) + 1;
943
944 stream_put(s, name, str_len);
945 /* Fill the rest with Null Character for aligning */
946 stream_put(s, NULL, len - str_len);
947 }
948
949 int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data,
950 uint32_t len)
951 {
952 uint32_t msg_type;
953 ZebraMlagHeader *hdr;
954 char buf[80];
955
956 hdr = zebra_mlag__header__unpack(NULL, len, data);
957 if (hdr == NULL)
958 return -1;
959
960 /*
961 * ADD The MLAG Header
962 */
963 zclient_create_header(s, ZEBRA_MLAG_FORWARD_MSG, VRF_DEFAULT);
964
965 msg_type = hdr->type;
966
967 if (IS_ZEBRA_DEBUG_MLAG)
968 zlog_debug("%s: Mlag ProtoBuf decoding of message:%s", __func__,
969 mlag_lib_msgid_to_str(msg_type, buf, 80));
970
971 /*
972 * Internal MLAG Message-types & MLAG.proto message types should
973 * always match, otherwise there can be decoding errors
974 * To avoid exposing clients with Protobuf flags, using internal
975 * message-types
976 */
977 stream_putl(s, hdr->type);
978
979 if (hdr->data.len == 0) {
980 /* NULL Payload */
981 stream_putw(s, MLAG_MSG_NULL_PAYLOAD);
982 /* No Batching */
983 stream_putw(s, MLAG_MSG_NO_BATCH);
984 } else {
985 switch (msg_type) {
986 case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_STATUS_UPDATE: {
987 ZebraMlagStatusUpdate *msg = NULL;
988
989 msg = zebra_mlag_status_update__unpack(
990 NULL, hdr->data.len, hdr->data.data);
991 if (msg == NULL) {
992 zebra_mlag__header__free_unpacked(hdr, NULL);
993 return -1;
994 }
995 /* Payload len */
996 stream_putw(s, sizeof(struct mlag_status));
997 /* No Batching */
998 stream_putw(s, MLAG_MSG_NO_BATCH);
999 /* Actual Data */
1000 zebra_fill_protobuf_msg(s, msg->peerlink,
1001 INTERFACE_NAMSIZ);
1002 stream_putl(s, msg->my_role);
1003 stream_putl(s, msg->peer_state);
1004 zebra_mlag_status_update__free_unpacked(msg, NULL);
1005 } break;
1006 case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_VXLAN_UPDATE: {
1007 ZebraMlagVxlanUpdate *msg = NULL;
1008
1009 msg = zebra_mlag_vxlan_update__unpack(
1010 NULL, hdr->data.len, hdr->data.data);
1011 if (msg == NULL) {
1012 zebra_mlag__header__free_unpacked(hdr, NULL);
1013 return -1;
1014 }
1015 /* Payload len */
1016 stream_putw(s, sizeof(struct mlag_vxlan));
1017 /* No Batching */
1018 stream_putw(s, MLAG_MSG_NO_BATCH);
1019 /* Actual Data */
1020 stream_putl(s, msg->anycast_ip);
1021 stream_putl(s, msg->local_ip);
1022 zebra_mlag_vxlan_update__free_unpacked(msg, NULL);
1023 } break;
1024 case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_ADD: {
1025 ZebraMlagMrouteAdd *msg = NULL;
1026
1027 msg = zebra_mlag_mroute_add__unpack(NULL, hdr->data.len,
1028 hdr->data.data);
1029 if (msg == NULL) {
1030 zebra_mlag__header__free_unpacked(hdr, NULL);
1031 return -1;
1032 }
1033 /* Payload len */
1034 stream_putw(s, sizeof(struct mlag_mroute_add));
1035 /* No Batching */
1036 stream_putw(s, MLAG_MSG_NO_BATCH);
1037 /* Actual Data */
1038 zebra_fill_protobuf_msg(s, msg->vrf_name, VRF_NAMSIZ);
1039
1040 stream_putl(s, msg->source_ip);
1041 stream_putl(s, msg->group_ip);
1042 stream_putl(s, msg->cost_to_rp);
1043 stream_putl(s, msg->owner_id);
1044 stream_putc(s, msg->am_i_dr);
1045 stream_putc(s, msg->am_i_dual_active);
1046 stream_putl(s, msg->vrf_id);
1047 if (msg->owner_id == MLAG_OWNER_INTERFACE)
1048 zebra_fill_protobuf_msg(s, msg->intf_name,
1049 INTERFACE_NAMSIZ);
1050 else
1051 stream_put(s, NULL, INTERFACE_NAMSIZ);
1052 zebra_mlag_mroute_add__free_unpacked(msg, NULL);
1053 } break;
1054 case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_DEL: {
1055 ZebraMlagMrouteDel *msg = NULL;
1056
1057 msg = zebra_mlag_mroute_del__unpack(NULL, hdr->data.len,
1058 hdr->data.data);
1059 if (msg == NULL) {
1060 zebra_mlag__header__free_unpacked(hdr, NULL);
1061 return -1;
1062 }
1063 /* Payload len */
1064 stream_putw(s, sizeof(struct mlag_mroute_del));
1065 /* No Batching */
1066 stream_putw(s, MLAG_MSG_NO_BATCH);
1067 /* Actual Data */
1068 zebra_fill_protobuf_msg(s, msg->vrf_name, VRF_NAMSIZ);
1069
1070 stream_putl(s, msg->source_ip);
1071 stream_putl(s, msg->group_ip);
1072 stream_putl(s, msg->owner_id);
1073 stream_putl(s, msg->vrf_id);
1074 if (msg->owner_id == MLAG_OWNER_INTERFACE)
1075 zebra_fill_protobuf_msg(s, msg->intf_name,
1076 INTERFACE_NAMSIZ);
1077 else
1078 stream_put(s, NULL, INTERFACE_NAMSIZ);
1079 zebra_mlag_mroute_del__free_unpacked(msg, NULL);
1080 } break;
1081 case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_ADD_BULK: {
1082 ZebraMlagMrouteAddBulk *Bulk_msg = NULL;
1083 ZebraMlagMrouteAdd *msg = NULL;
1084 size_t i, length_spot;
1085
1086 Bulk_msg = zebra_mlag_mroute_add_bulk__unpack(
1087 NULL, hdr->data.len, hdr->data.data);
1088 if (Bulk_msg == NULL) {
1089 zebra_mlag__header__free_unpacked(hdr, NULL);
1090 return -1;
1091 }
1092 /* Payload len */
1093 stream_putw(s, (Bulk_msg->n_mroute_add
1094 * sizeof(struct mlag_mroute_add)));
1095 /* No. of msgs in Batch */
1096 length_spot = stream_putw(s, Bulk_msg->n_mroute_add);
1097
1098 /* Actual Data */
1099 for (i = 0; i < Bulk_msg->n_mroute_add; i++) {
1100 if (STREAM_SIZE(s)
1101 < VRF_NAMSIZ + 22 + INTERFACE_NAMSIZ) {
1102 zlog_warn(
1103 "We have received more messages than we can parse at this point in time: %zu",
1104 Bulk_msg->n_mroute_add);
1105 break;
1106 }
1107
1108 msg = Bulk_msg->mroute_add[i];
1109
1110 zebra_fill_protobuf_msg(s, msg->vrf_name,
1111 VRF_NAMSIZ);
1112 stream_putl(s, msg->source_ip);
1113 stream_putl(s, msg->group_ip);
1114 stream_putl(s, msg->cost_to_rp);
1115 stream_putl(s, msg->owner_id);
1116 stream_putc(s, msg->am_i_dr);
1117 stream_putc(s, msg->am_i_dual_active);
1118 stream_putl(s, msg->vrf_id);
1119 if (msg->owner_id == MLAG_OWNER_INTERFACE)
1120 zebra_fill_protobuf_msg(
1121 s, msg->intf_name,
1122 INTERFACE_NAMSIZ);
1123 else
1124 stream_put(s, NULL, INTERFACE_NAMSIZ);
1125 }
1126
1127 stream_putw_at(s, length_spot, i + 1);
1128
1129 zebra_mlag_mroute_add_bulk__free_unpacked(Bulk_msg,
1130 NULL);
1131 } break;
1132 case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_DEL_BULK: {
1133 ZebraMlagMrouteDelBulk *Bulk_msg = NULL;
1134 ZebraMlagMrouteDel *msg = NULL;
1135 size_t i, length_spot;
1136
1137 Bulk_msg = zebra_mlag_mroute_del_bulk__unpack(
1138 NULL, hdr->data.len, hdr->data.data);
1139 if (Bulk_msg == NULL) {
1140 zebra_mlag__header__free_unpacked(hdr, NULL);
1141 return -1;
1142 }
1143 /* Payload len */
1144 stream_putw(s, (Bulk_msg->n_mroute_del
1145 * sizeof(struct mlag_mroute_del)));
1146 /* No. of msgs in Batch */
1147 length_spot = stream_putw(s, Bulk_msg->n_mroute_del);
1148
1149 /* Actual Data */
1150 for (i = 0; i < Bulk_msg->n_mroute_del; i++) {
1151 if (STREAM_SIZE(s)
1152 < VRF_NAMSIZ + 16 + INTERFACE_NAMSIZ) {
1153 zlog_warn(
1154 "We have received more messages than we can parse at this time");
1155 break;
1156 }
1157
1158 msg = Bulk_msg->mroute_del[i];
1159
1160 zebra_fill_protobuf_msg(s, msg->vrf_name,
1161 VRF_NAMSIZ);
1162 stream_putl(s, msg->source_ip);
1163 stream_putl(s, msg->group_ip);
1164 stream_putl(s, msg->owner_id);
1165 stream_putl(s, msg->vrf_id);
1166 if (msg->owner_id == MLAG_OWNER_INTERFACE)
1167 zebra_fill_protobuf_msg(
1168 s, msg->intf_name,
1169 INTERFACE_NAMSIZ);
1170 else
1171 stream_put(s, NULL, INTERFACE_NAMSIZ);
1172 }
1173
1174 stream_putw_at(s, length_spot, i + 1);
1175
1176 zebra_mlag_mroute_del_bulk__free_unpacked(Bulk_msg,
1177 NULL);
1178 } break;
1179 case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_ZEBRA_STATUS_UPDATE: {
1180 ZebraMlagZebraStatusUpdate *msg = NULL;
1181
1182 msg = zebra_mlag_zebra_status_update__unpack(
1183 NULL, hdr->data.len, hdr->data.data);
1184 if (msg == NULL) {
1185 zebra_mlag__header__free_unpacked(hdr, NULL);
1186 return -1;
1187 }
1188 /* Payload len */
1189 stream_putw(s, sizeof(struct mlag_frr_status));
1190 /* No Batching */
1191 stream_putw(s, MLAG_MSG_NO_BATCH);
1192 /* Actual Data */
1193 stream_putl(s, msg->peer_frrstate);
1194 zebra_mlag_zebra_status_update__free_unpacked(msg,
1195 NULL);
1196 } break;
1197 default:
1198 break;
1199 }
1200 }
1201 zebra_mlag__header__free_unpacked(hdr, NULL);
1202 return msg_type;
1203 }
1204
1205 #else
1206 int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type)
1207 {
1208 return 0;
1209 }
1210
1211 int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data,
1212 uint32_t len)
1213 {
1214 return 0;
1215 }
1216 #endif