]> git.proxmox.com Git - mirror_frr.git/commitdiff
Zebra: ADD Protobuf Encoding & Decoding for MLAG Messages
authorSatheesh Kumar K <sathk@cumulusnetworks.com>
Tue, 12 Nov 2019 09:41:04 +0000 (01:41 -0800)
committerSatheesh Kumar K <sathk@cumulusnetworks.com>
Thu, 14 Nov 2019 06:47:32 +0000 (22:47 -0800)
1. add the Mlag ProtoBuf Lib to Zebra Compilation
2. Encode the messages with protobuf before writing to MLAG
3. Decode the MLAG Messages using protobuf and write to clients
   based on their subscrption.

Signed-off-by: Satheesh Kumar K <sathk@cumulusnetworks.com>
Makefile.am
pimd/pim_mlag.c
zebra/subdir.am
zebra/zebra_mlag.c
zebra/zebra_mlag.h
zebra/zebra_mlag_private.c
zebra/zebra_router.h

index ada715dbcaf75caf25dff9d296d01f2261b865d7..34f112bf01da8dda396e2310605e6d1fbb0f669d 100644 (file)
@@ -125,11 +125,11 @@ include doc/manpages/subdir.am
 include doc/developer/subdir.am
 include include/subdir.am
 include lib/subdir.am
+include mlag/subdir.am
 include zebra/subdir.am
 include watchfrr/subdir.am
 include qpb/subdir.am
 include fpm/subdir.am
-include mlag/subdir.am
 include grpc/subdir.am
 include tools/subdir.am
 include solaris/subdir.am
index 740373bc4000a9f58d237204bb4c6a79c119bf48..a66f604d52b12b59344c39df25ec43fea4a41aa4 100644 (file)
@@ -141,7 +141,7 @@ int pim_zebra_mlag_handle_msg(struct stream *s, int len)
        } break;
        case MLAG_MROUTE_ADD_BULK: {
                struct mlag_mroute_add msg;
-               int i = 0;
+               int i;
 
                for (i = 0; i < mlag_msg.msg_cnt; i++) {
 
@@ -153,7 +153,7 @@ int pim_zebra_mlag_handle_msg(struct stream *s, int len)
        } break;
        case MLAG_MROUTE_DEL_BULK: {
                struct mlag_mroute_del msg;
-               int i = 0;
+               int i;
 
                for (i = 0; i < mlag_msg.msg_cnt; i++) {
 
index 78d374b7b10f19ed3a030b8916b2843067c8f064..d0f32d6a14137c65985b9fe242bdfea07d278dba 100644 (file)
@@ -38,6 +38,9 @@ man8 += $(MANBUILD)/zebra.8
 endif
 
 zebra_zebra_LDADD = lib/libfrr.la $(LIBCAP)
+if HAVE_PROTOBUF
+zebra_zebra_LDADD += mlag/libmlag_pb.la $(PROTOBUF_C_LIBS)
+endif
 zebra_zebra_SOURCES = \
        zebra/connected.c \
        zebra/debug.c \
@@ -131,6 +134,7 @@ noinst_HEADERS += \
        zebra/rtadv.h \
        zebra/rule_netlink.h \
        zebra/zebra_mlag.h \
+       zebra/zebra_mlag_private.h \
        zebra/zebra_fpm_private.h \
        zebra/zebra_l2.h \
        zebra/zebra_dplane.h \
index 4d1b7712c5324eed30362e95b28837e25183c95d..1cbe54b178714c250561ac39265fd03cacd35e2a 100644 (file)
@@ -108,7 +108,7 @@ void zebra_mlag_process_mlag_data(uint8_t *data, uint32_t len)
        int msg_type = 0;
 
        s = stream_new(ZEBRA_MAX_PACKET_SIZ);
-       msg_type = zebra_mlag_protobuf_decode_message(&s, data, len);
+       msg_type = zebra_mlag_protobuf_decode_message(s, data, len);
 
        if (msg_type <= 0) {
                /* Something went wrong in decoding */
@@ -148,6 +148,7 @@ static int zebra_mlag_client_msg_handler(struct thread *event)
        struct stream *s;
        uint32_t wr_count = 0;
        uint32_t msg_type = 0;
+       uint32_t max_count = 0;
        int len = 0;
 
        wr_count = stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo);
@@ -155,12 +156,9 @@ static int zebra_mlag_client_msg_handler(struct thread *event)
                zlog_debug(":%s: Processing MLAG write, %d messages in queue",
                           __func__, wr_count);
 
-       zrouter.mlag_info.t_write = NULL;
-       for (wr_count = 0; wr_count < ZEBRA_MLAG_POST_LIMIT; wr_count++) {
-               /* FIFO is empty,wait for teh message to be add */
-               if (stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo) == 0)
-                       break;
+       max_count = MIN(wr_count, ZEBRA_MLAG_POST_LIMIT);
 
+       for (wr_count = 0; wr_count < max_count; wr_count++) {
                s = stream_fifo_pop_safe(zrouter.mlag_info.mlag_fifo);
                if (!s) {
                        zlog_debug(":%s: Got a NULL Messages, some thing wrong",
@@ -168,7 +166,6 @@ static int zebra_mlag_client_msg_handler(struct thread *event)
                        break;
                }
 
-               zebra_mlag_reset_write_buffer();
                /*
                 * Encode the data now
                 */
@@ -177,17 +174,19 @@ static int zebra_mlag_client_msg_handler(struct thread *event)
                /*
                 * write to MCLAGD
                 */
-               if (len > 0)
+               if (len > 0) {
                        zebra_mlag_private_write_data(mlag_wr_buffer, len);
 
-               /*
-                * If message type is De-register, send a signal to main thread,
-                * so that necessary cleanup will be done by main thread.
-                */
-               if (msg_type == MLAG_DEREGISTER) {
-                       thread_add_event(zrouter.master,
-                                        zebra_mlag_terminate_pthread, NULL, 0,
-                                        NULL);
+                       /*
+                        * If message type is De-register, send a signal to main
+                        * thread, so that necessary cleanup will be done by
+                        * main thread.
+                        */
+                       if (msg_type == MLAG_DEREGISTER) {
+                               thread_add_event(zrouter.master,
+                                                zebra_mlag_terminate_pthread,
+                                                NULL, 0, NULL);
+                       }
                }
 
                stream_free(s);
@@ -241,12 +240,15 @@ void zebra_mlag_handle_process_state(enum zebra_mlag_state state)
  */
 static int zebra_mlag_signal_write_thread(void)
 {
-       if (zrouter.mlag_info.zebra_pth_mlag) {
-               if (IS_ZEBRA_DEBUG_MLAG)
-                       zlog_debug(":%s: Scheduling MLAG write", __func__);
-               thread_add_event(zrouter.mlag_info.th_master,
-                                zebra_mlag_client_msg_handler, NULL, 0,
-                                &zrouter.mlag_info.t_write);
+       frr_with_mutex (&zrouter.mlag_info.mlag_th_mtx) {
+               if (zrouter.mlag_info.zebra_pth_mlag) {
+                       if (IS_ZEBRA_DEBUG_MLAG)
+                               zlog_debug(":%s: Scheduling MLAG write",
+                                          __func__);
+                       thread_add_event(zrouter.mlag_info.th_master,
+                                        zebra_mlag_client_msg_handler, NULL, 0,
+                                        &zrouter.mlag_info.t_write);
+               }
        }
        return 0;
 }
@@ -375,7 +377,7 @@ static void zebra_mlag_spawn_pthread(void)
        zrouter.mlag_info.th_master = zrouter.mlag_info.zebra_pth_mlag->master;
 
 
-       /* Enqueue an initial event for the dataplane pthread */
+       /* Enqueue an initial event to the Newly spawn MLAG pthread */
        zebra_mlag_signal_write_thread();
 
        frr_pthread_run(zrouter.mlag_info.zebra_pth_mlag, NULL);
@@ -583,7 +585,7 @@ DEFUN_HIDDEN (show_mlag,
              ZEBRA_STR
              "The mlag role on this machine\n")
 {
-       char buf[80];
+       char buf[MLAG_ROLE_STRSIZE];
 
        vty_out(vty, "MLag is configured to: %s\n",
                mlag_role2str(zrouter.mlag_info.role, buf, sizeof(buf)));
@@ -600,7 +602,7 @@ DEFPY(test_mlag, test_mlag_cmd,
       "Mlag is setup to be the secondary\n")
 {
        enum mlag_role orig = zrouter.mlag_info.role;
-       char buf1[80], buf2[80];
+       char buf1[MLAG_ROLE_STRSIZE], buf2[MLAG_ROLE_STRSIZE];
 
        if (none)
                zrouter.mlag_info.role = MLAG_ROLE_NONE;
@@ -619,8 +621,12 @@ DEFPY(test_mlag, test_mlag_cmd,
                if (zrouter.mlag_info.role != MLAG_ROLE_NONE) {
                        if (zrouter.mlag_info.clients_interested_cnt == 0
                            && test_mlag_in_progress == false) {
-                               if (zrouter.mlag_info.zebra_pth_mlag == NULL)
-                                       zebra_mlag_spawn_pthread();
+                               frr_with_mutex (
+                                       &zrouter.mlag_info.mlag_th_mtx) {
+                                       if (zrouter.mlag_info.zebra_pth_mlag
+                                           == NULL)
+                                               zebra_mlag_spawn_pthread();
+                               }
                                zrouter.mlag_info.clients_interested_cnt++;
                                test_mlag_in_progress = true;
                                zebra_mlag_private_open_channel();
@@ -655,8 +661,8 @@ void zebra_mlag_init(void)
        zrouter.mlag_info.t_read = NULL;
        zrouter.mlag_info.t_write = NULL;
        test_mlag_in_progress = false;
-       zebra_mlag_reset_write_buffer();
        zebra_mlag_reset_read_buffer();
+       pthread_mutex_init(&zrouter.mlag_info.mlag_th_mtx, NULL);
 }
 
 void zebra_mlag_terminate(void)
@@ -669,13 +675,514 @@ void zebra_mlag_terminate(void)
  *  ProtoBuf Encoding APIs
  */
 
+#ifdef HAVE_PROTOBUF
+
+DEFINE_MTYPE_STATIC(ZEBRA, MLAG_PBUF, "ZEBRA MLAG PROTOBUF")
+
+int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type)
+{
+       ZebraMlagHeader hdr = ZEBRA_MLAG__HEADER__INIT;
+       struct mlag_msg mlag_msg;
+       uint8_t tmp_buf[ZEBRA_MLAG_BUF_LIMIT];
+       int len = 0;
+       int n_len = 0;
+       int rc = 0;
+       char buf[ZLOG_FILTER_LENGTH_MAX];
+
+       if (IS_ZEBRA_DEBUG_MLAG)
+               zlog_debug("%s: Entering..", __func__);
+
+       rc = zebra_mlag_lib_decode_mlag_hdr(s, &mlag_msg);
+       if (rc)
+               return rc;
+
+       if (IS_ZEBRA_DEBUG_MLAG)
+               zlog_debug("%s: Decoded msg length:%d..", __func__,
+                          mlag_msg.data_len);
+
+       if (IS_ZEBRA_DEBUG_MLAG)
+               zlog_debug("%s: Mlag ProtoBuf encoding of message:%s", __func__,
+                          zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
+                                                      sizeof(buf)));
+       *msg_type = mlag_msg.msg_type;
+       switch (mlag_msg.msg_type) {
+       case MLAG_MROUTE_ADD: {
+               struct mlag_mroute_add msg;
+               ZebraMlagMrouteAdd pay_load = ZEBRA_MLAG_MROUTE_ADD__INIT;
+               uint32_t vrf_name_len = 0;
+
+               rc = zebra_mlag_lib_decode_mroute_add(s, &msg);
+               if (rc)
+                       return rc;
+               vrf_name_len = strlen(msg.vrf_name) + 1;
+               pay_load.vrf_name = XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
+               strlcpy(pay_load.vrf_name, msg.vrf_name, vrf_name_len);
+               pay_load.source_ip = msg.source_ip;
+               pay_load.group_ip = msg.group_ip;
+               pay_load.cost_to_rp = msg.cost_to_rp;
+               pay_load.owner_id = msg.owner_id;
+               pay_load.am_i_dr = msg.am_i_dr;
+               pay_load.am_i_dual_active = msg.am_i_dual_active;
+               pay_load.vrf_id = msg.vrf_id;
+
+               if (msg.owner_id == MLAG_OWNER_INTERFACE) {
+                       vrf_name_len = strlen(msg.intf_name) + 1;
+                       pay_load.intf_name =
+                               XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
+                       strlcpy(pay_load.intf_name, msg.intf_name,
+                               vrf_name_len);
+               }
+
+               len = zebra_mlag_mroute_add__pack(&pay_load, tmp_buf);
+               XFREE(MTYPE_MLAG_PBUF, pay_load.vrf_name);
+               if (msg.owner_id == MLAG_OWNER_INTERFACE)
+                       XFREE(MTYPE_MLAG_PBUF, pay_load.intf_name);
+       } break;
+       case MLAG_MROUTE_DEL: {
+               struct mlag_mroute_del msg;
+               ZebraMlagMrouteDel pay_load = ZEBRA_MLAG_MROUTE_DEL__INIT;
+               uint32_t vrf_name_len = 0;
+
+               rc = zebra_mlag_lib_decode_mroute_del(s, &msg);
+               if (rc)
+                       return rc;
+               vrf_name_len = strlen(msg.vrf_name) + 1;
+               pay_load.vrf_name = XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
+               strlcpy(pay_load.vrf_name, msg.vrf_name, vrf_name_len);
+               pay_load.source_ip = msg.source_ip;
+               pay_load.group_ip = msg.group_ip;
+               pay_load.owner_id = msg.owner_id;
+               pay_load.vrf_id = msg.vrf_id;
+
+               if (msg.owner_id == MLAG_OWNER_INTERFACE) {
+                       vrf_name_len = strlen(msg.intf_name) + 1;
+                       pay_load.intf_name =
+                               XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
+                       strlcpy(pay_load.intf_name, msg.intf_name,
+                               vrf_name_len);
+               }
+
+               len = zebra_mlag_mroute_del__pack(&pay_load, tmp_buf);
+               XFREE(MTYPE_MLAG_PBUF, pay_load.vrf_name);
+               if (msg.owner_id == MLAG_OWNER_INTERFACE)
+                       XFREE(MTYPE_MLAG_PBUF, pay_load.intf_name);
+       } break;
+       case MLAG_MROUTE_ADD_BULK: {
+               struct mlag_mroute_add msg;
+               ZebraMlagMrouteAddBulk Bulk_msg =
+                       ZEBRA_MLAG_MROUTE_ADD_BULK__INIT;
+               ZebraMlagMrouteAdd **pay_load = NULL;
+               int i;
+               bool cleanup = false;
+
+               Bulk_msg.n_mroute_add = mlag_msg.msg_cnt;
+               pay_load = XMALLOC(MTYPE_MLAG_PBUF, sizeof(ZebraMlagMrouteAdd *)
+                                                           * mlag_msg.msg_cnt);
+
+               for (i = 0; i < mlag_msg.msg_cnt; i++) {
+
+                       uint32_t vrf_name_len = 0;
+
+                       rc = zebra_mlag_lib_decode_mroute_add(s, &msg);
+                       if (rc) {
+                               cleanup = true;
+                               break;
+                       }
+                       pay_load[i] = XMALLOC(MTYPE_MLAG_PBUF,
+                                             sizeof(ZebraMlagMrouteAdd));
+                       zebra_mlag_mroute_add__init(pay_load[i]);
+
+                       vrf_name_len = strlen(msg.vrf_name) + 1;
+                       pay_load[i]->vrf_name =
+                               XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
+                       strlcpy(pay_load[i]->vrf_name, msg.vrf_name,
+                               vrf_name_len);
+                       pay_load[i]->source_ip = msg.source_ip;
+                       pay_load[i]->group_ip = msg.group_ip;
+                       pay_load[i]->cost_to_rp = msg.cost_to_rp;
+                       pay_load[i]->owner_id = msg.owner_id;
+                       pay_load[i]->am_i_dr = msg.am_i_dr;
+                       pay_load[i]->am_i_dual_active = msg.am_i_dual_active;
+                       pay_load[i]->vrf_id = msg.vrf_id;
+                       if (msg.owner_id == MLAG_OWNER_INTERFACE) {
+                               vrf_name_len = strlen(msg.intf_name) + 1;
+                               pay_load[i]->intf_name =
+                                       XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
+
+                               strlcpy(pay_load[i]->intf_name, msg.intf_name,
+                                       vrf_name_len);
+                       }
+               }
+               if (cleanup == false) {
+                       Bulk_msg.mroute_add = pay_load;
+                       len = zebra_mlag_mroute_add_bulk__pack(&Bulk_msg,
+                                                              tmp_buf);
+               }
+
+               for (i = 0; i < mlag_msg.msg_cnt; i++) {
+                       if (pay_load[i]->vrf_name)
+                               XFREE(MTYPE_MLAG_PBUF, pay_load[i]->vrf_name);
+                       if (pay_load[i]->owner_id == MLAG_OWNER_INTERFACE
+                           && pay_load[i]->intf_name)
+                               XFREE(MTYPE_MLAG_PBUF, pay_load[i]->intf_name);
+                       if (pay_load[i])
+                               XFREE(MTYPE_MLAG_PBUF, pay_load[i]);
+               }
+               XFREE(MTYPE_MLAG_PBUF, pay_load);
+               if (cleanup == true)
+                       return -1;
+       } break;
+       case MLAG_MROUTE_DEL_BULK: {
+               struct mlag_mroute_del msg;
+               ZebraMlagMrouteDelBulk Bulk_msg =
+                       ZEBRA_MLAG_MROUTE_DEL_BULK__INIT;
+               ZebraMlagMrouteDel **pay_load = NULL;
+               int i;
+               bool cleanup = false;
+
+               Bulk_msg.n_mroute_del = mlag_msg.msg_cnt;
+               pay_load = XMALLOC(MTYPE_MLAG_PBUF, sizeof(ZebraMlagMrouteDel *)
+                                                           * mlag_msg.msg_cnt);
+
+               for (i = 0; i < mlag_msg.msg_cnt; i++) {
+
+                       uint32_t vrf_name_len = 0;
+
+                       rc = zebra_mlag_lib_decode_mroute_del(s, &msg);
+                       if (rc) {
+                               cleanup = true;
+                               break;
+                       }
+
+                       pay_load[i] = XMALLOC(MTYPE_MLAG_PBUF,
+                                             sizeof(ZebraMlagMrouteDel));
+                       zebra_mlag_mroute_del__init(pay_load[i]);
+
+                       vrf_name_len = strlen(msg.vrf_name) + 1;
+                       pay_load[i]->vrf_name =
+                               XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
+
+                       strlcpy(pay_load[i]->vrf_name, msg.vrf_name,
+                               vrf_name_len);
+                       pay_load[i]->source_ip = msg.source_ip;
+                       pay_load[i]->group_ip = msg.group_ip;
+                       pay_load[i]->owner_id = msg.owner_id;
+                       pay_load[i]->vrf_id = msg.vrf_id;
+                       if (msg.owner_id == MLAG_OWNER_INTERFACE) {
+                               vrf_name_len = strlen(msg.intf_name) + 1;
+                               pay_load[i]->intf_name =
+                                       XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len);
+
+                               strlcpy(pay_load[i]->intf_name, msg.intf_name,
+                                       vrf_name_len);
+                       }
+               }
+               if (!cleanup) {
+                       Bulk_msg.mroute_del = pay_load;
+                       len = zebra_mlag_mroute_del_bulk__pack(&Bulk_msg,
+                                                              tmp_buf);
+               }
+
+               for (i = 0; i < mlag_msg.msg_cnt; i++) {
+                       if (pay_load[i]->vrf_name)
+                               XFREE(MTYPE_MLAG_PBUF, pay_load[i]->vrf_name);
+                       if (pay_load[i]->owner_id == MLAG_OWNER_INTERFACE
+                           && pay_load[i]->intf_name)
+                               XFREE(MTYPE_MLAG_PBUF, pay_load[i]->intf_name);
+                       if (pay_load[i])
+                               XFREE(MTYPE_MLAG_PBUF, pay_load[i]);
+               }
+               XFREE(MTYPE_MLAG_PBUF, pay_load);
+               if (cleanup)
+                       return -1;
+       } break;
+       default:
+               break;
+       }
+
+       if (IS_ZEBRA_DEBUG_MLAG)
+               zlog_debug("%s: length of Mlag ProtoBuf encoded message:%s, %d",
+                          __func__,
+                          zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
+                                                      sizeof(buf)),
+                          len);
+       hdr.type = (ZebraMlagHeader__MessageType)mlag_msg.msg_type;
+       if (len != 0) {
+               hdr.data.len = len;
+               hdr.data.data = XMALLOC(MTYPE_MLAG_PBUF, len);
+               memcpy(hdr.data.data, tmp_buf, len);
+       }
+
+       /*
+        * ProtoBuf Infra will not support to demarc the pointers whem multiple
+        * messages are posted inside a single Buffer.
+        * 2 -solutions exist to solve this
+        * 1. add Unenoced length at the beginning of every message, this will
+        *    be used to point to next message in the buffer
+        * 2. another solution is defining all messages insides another message
+        *    But this will permit only 32 messages. this can be extended with
+        *    multiple levels.
+        * for simplicity we are going with solution-1.
+        */
+       len = zebra_mlag__header__pack(&hdr,
+                                      (mlag_wr_buffer + ZEBRA_MLAG_LEN_SIZE));
+       n_len = htonl(len);
+       memcpy(mlag_wr_buffer, &n_len, ZEBRA_MLAG_LEN_SIZE);
+       len += ZEBRA_MLAG_LEN_SIZE;
+
+       if (IS_ZEBRA_DEBUG_MLAG)
+               zlog_debug(
+                       "%s: length of Mlag ProtoBuf message:%s with Header  %d",
+                       __func__,
+                       zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf,
+                                                   sizeof(buf)),
+                       len);
+       if (hdr.data.data)
+               XFREE(MTYPE_MLAG_PBUF, hdr.data.data);
+
+       return len;
+}
+
+int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data,
+                                      uint32_t len)
+{
+       uint32_t msg_type;
+       ZebraMlagHeader *hdr;
+       char buf[80];
+
+       hdr = zebra_mlag__header__unpack(NULL, len, data);
+       if (hdr == NULL)
+               return -1;
+
+       /*
+        * ADD The MLAG Header
+        */
+       zclient_create_header(s, ZEBRA_MLAG_FORWARD_MSG, VRF_DEFAULT);
+
+       msg_type = hdr->type;
+
+       if (IS_ZEBRA_DEBUG_MLAG)
+               zlog_debug("%s: Mlag ProtoBuf decoding of message:%s", __func__,
+                          zebra_mlag_lib_msgid_to_str(msg_type, buf, 80));
+
+       /*
+        * Internal MLAG Message-types & MLAG.proto message types should
+        * always match, otherwise there can be decoding errors
+        * To avoid exposing clients with Protobuf flags, using internal
+        * message-types
+        */
+       stream_putl(s, hdr->type);
+
+       if (hdr->data.len == 0) {
+               /* NULL Payload */
+               stream_putw(s, MLAG_MSG_NULL_PAYLOAD);
+               /* No Batching */
+               stream_putw(s, MLAG_MSG_NO_BATCH);
+       } else {
+               switch (msg_type) {
+               case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_STATUS_UPDATE: {
+                       ZebraMlagStatusUpdate *msg = NULL;
+
+                       msg = zebra_mlag_status_update__unpack(
+                               NULL, hdr->data.len, hdr->data.data);
+                       if (msg == NULL) {
+                               zebra_mlag__header__free_unpacked(hdr, NULL);
+                               return -1;
+                       }
+                       /* Payload len */
+                       stream_putw(s, sizeof(struct mlag_status));
+                       /* No Batching */
+                       stream_putw(s, MLAG_MSG_NO_BATCH);
+                       /* Actual Data */
+                       stream_put(s, msg->peerlink, INTERFACE_NAMSIZ);
+                       stream_putl(s, msg->my_role);
+                       stream_putl(s, msg->peer_state);
+                       zebra_mlag_status_update__free_unpacked(msg, NULL);
+               } break;
+               case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_VXLAN_UPDATE: {
+                       ZebraMlagVxlanUpdate *msg = NULL;
+
+                       msg = zebra_mlag_vxlan_update__unpack(
+                               NULL, hdr->data.len, hdr->data.data);
+                       if (msg == NULL) {
+                               zebra_mlag__header__free_unpacked(hdr, NULL);
+                               return -1;
+                       }
+                       /* Payload len */
+                       stream_putw(s, sizeof(struct mlag_vxlan));
+                       /* No Batching */
+                       stream_putw(s, MLAG_MSG_NO_BATCH);
+                       /* Actual Data */
+                       stream_putl(s, msg->anycast_ip);
+                       stream_putl(s, msg->local_ip);
+                       zebra_mlag_vxlan_update__free_unpacked(msg, NULL);
+               } break;
+               case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_ADD: {
+                       ZebraMlagMrouteAdd *msg = NULL;
+
+                       msg = zebra_mlag_mroute_add__unpack(NULL, hdr->data.len,
+                                                           hdr->data.data);
+                       if (msg == NULL) {
+                               zebra_mlag__header__free_unpacked(hdr, NULL);
+                               return -1;
+                       }
+                       /* Payload len */
+                       stream_putw(s, sizeof(struct mlag_mroute_add));
+                       /* No Batching */
+                       stream_putw(s, MLAG_MSG_NO_BATCH);
+                       /* Actual Data */
+                       stream_put(s, msg->vrf_name, VRF_NAMSIZ);
+
+                       stream_putl(s, msg->source_ip);
+                       stream_putl(s, msg->group_ip);
+                       stream_putl(s, msg->cost_to_rp);
+                       stream_putl(s, msg->owner_id);
+                       stream_putc(s, msg->am_i_dr);
+                       stream_putc(s, msg->am_i_dual_active);
+                       stream_putl(s, msg->vrf_id);
+                       if (msg->owner_id == MLAG_OWNER_INTERFACE)
+                               stream_put(s, msg->intf_name, INTERFACE_NAMSIZ);
+                       else
+                               stream_put(s, NULL, INTERFACE_NAMSIZ);
+                       zebra_mlag_mroute_add__free_unpacked(msg, NULL);
+               } break;
+               case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_DEL: {
+                       ZebraMlagMrouteDel *msg = NULL;
+
+                       msg = zebra_mlag_mroute_del__unpack(NULL, hdr->data.len,
+                                                           hdr->data.data);
+                       if (msg == NULL) {
+                               zebra_mlag__header__free_unpacked(hdr, NULL);
+                               return -1;
+                       }
+                       /* Payload len */
+                       stream_putw(s, sizeof(struct mlag_mroute_del));
+                       /* No Batching */
+                       stream_putw(s, MLAG_MSG_NO_BATCH);
+                       /* Actual Data */
+                       stream_put(s, msg->vrf_name, VRF_NAMSIZ);
+
+                       stream_putl(s, msg->source_ip);
+                       stream_putl(s, msg->group_ip);
+                       stream_putl(s, msg->group_ip);
+                       stream_putl(s, msg->owner_id);
+                       stream_putl(s, msg->vrf_id);
+                       if (msg->owner_id == MLAG_OWNER_INTERFACE)
+                               stream_put(s, msg->intf_name, INTERFACE_NAMSIZ);
+                       else
+                               stream_put(s, NULL, INTERFACE_NAMSIZ);
+                       zebra_mlag_mroute_del__free_unpacked(msg, NULL);
+               } break;
+               case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_ADD_BULK: {
+                       ZebraMlagMrouteAddBulk *Bulk_msg = NULL;
+                       ZebraMlagMrouteAdd *msg = NULL;
+                       size_t i;
+
+                       Bulk_msg = zebra_mlag_mroute_add_bulk__unpack(
+                               NULL, hdr->data.len, hdr->data.data);
+                       if (Bulk_msg == NULL) {
+                               zebra_mlag__header__free_unpacked(hdr, NULL);
+                               return -1;
+                       }
+                       /* Payload len */
+                       stream_putw(s, (Bulk_msg->n_mroute_add
+                                       * sizeof(struct mlag_mroute_add)));
+                       /* No. of msgs in Batch */
+                       stream_putw(s, Bulk_msg->n_mroute_add);
+
+                       /* Actual Data */
+                       for (i = 0; i < Bulk_msg->n_mroute_add; i++) {
+
+                               msg = Bulk_msg->mroute_add[i];
+
+                               stream_put(s, msg->vrf_name, VRF_NAMSIZ);
+                               stream_putl(s, msg->source_ip);
+                               stream_putl(s, msg->group_ip);
+                               stream_putl(s, msg->cost_to_rp);
+                               stream_putl(s, msg->owner_id);
+                               stream_putc(s, msg->am_i_dr);
+                               stream_putc(s, msg->am_i_dual_active);
+                               stream_putl(s, msg->vrf_id);
+                               if (msg->owner_id == MLAG_OWNER_INTERFACE)
+                                       stream_put(s, msg->intf_name,
+                                                  INTERFACE_NAMSIZ);
+                               else
+                                       stream_put(s, NULL, INTERFACE_NAMSIZ);
+                       }
+                       zebra_mlag_mroute_add_bulk__free_unpacked(Bulk_msg,
+                                                                 NULL);
+               } break;
+               case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_DEL_BULK: {
+                       ZebraMlagMrouteDelBulk *Bulk_msg = NULL;
+                       ZebraMlagMrouteDel *msg = NULL;
+                       size_t i;
+
+                       Bulk_msg = zebra_mlag_mroute_del_bulk__unpack(
+                               NULL, hdr->data.len, hdr->data.data);
+                       if (Bulk_msg == NULL) {
+                               zebra_mlag__header__free_unpacked(hdr, NULL);
+                               return -1;
+                       }
+                       /* Payload len */
+                       stream_putw(s, (Bulk_msg->n_mroute_del
+                                       * sizeof(struct mlag_mroute_del)));
+                       /* No. of msgs in Batch */
+                       stream_putw(s, Bulk_msg->n_mroute_del);
+
+                       /* Actual Data */
+                       for (i = 0; i < Bulk_msg->n_mroute_del; i++) {
+
+                               msg = Bulk_msg->mroute_del[i];
+
+                               stream_put(s, msg->vrf_name, VRF_NAMSIZ);
+                               stream_putl(s, msg->source_ip);
+                               stream_putl(s, msg->group_ip);
+                               stream_putl(s, msg->owner_id);
+                               stream_putl(s, msg->vrf_id);
+                               if (msg->owner_id == MLAG_OWNER_INTERFACE)
+                                       stream_put(s, msg->intf_name,
+                                                  INTERFACE_NAMSIZ);
+                               else
+                                       stream_put(s, NULL, INTERFACE_NAMSIZ);
+                       }
+                       zebra_mlag_mroute_del_bulk__free_unpacked(Bulk_msg,
+                                                                 NULL);
+               } break;
+               case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_ZEBRA_STATUS_UPDATE: {
+                       ZebraMlagZebraStatusUpdate *msg = NULL;
+
+                       msg = zebra_mlag_zebra_status_update__unpack(
+                               NULL, hdr->data.len, hdr->data.data);
+                       if (msg == NULL) {
+                               zebra_mlag__header__free_unpacked(hdr, NULL);
+                               return -1;
+                       }
+                       /* Payload len */
+                       stream_putw(s, sizeof(struct mlag_frr_status));
+                       /* No Batching */
+                       stream_putw(s, MLAG_MSG_NO_BATCH);
+                       /* Actual Data */
+                       stream_putl(s, msg->peer_frrstate);
+                       zebra_mlag_zebra_status_update__free_unpacked(msg,
+                                                                     NULL);
+               } break;
+               default:
+                       break;
+               }
+       }
+       zebra_mlag__header__free_unpacked(hdr, NULL);
+       return msg_type;
+}
+
+#else
 int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type)
 {
        return 0;
 }
 
-int zebra_mlag_protobuf_decode_message(struct stream **s, uint8_t *data,
+int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data,
                                       uint32_t len)
 {
        return 0;
 }
+#endif
index 6506089afc161f8a6f0d8e7cb0fb8540f3fedf39..6f7ef8319f3ab00d8868d475978a156f6f07840b 100644 (file)
 #include "zclient.h"
 #include "zebra/zserv.h"
 
+#ifdef HAVE_PROTOBUF
+#include "mlag/mlag.pb-c.h"
+#endif
+
 #define ZEBRA_MLAG_BUF_LIMIT 2048
 #define ZEBRA_MLAG_LEN_SIZE 4
 
@@ -33,14 +37,8 @@ extern uint8_t mlag_wr_buffer[ZEBRA_MLAG_BUF_LIMIT];
 extern uint8_t mlag_rd_buffer[ZEBRA_MLAG_BUF_LIMIT];
 extern uint32_t mlag_rd_buf_offset;
 
-static inline void zebra_mlag_reset_write_buffer(void)
-{
-       memset(mlag_wr_buffer, 0, ZEBRA_MLAG_BUF_LIMIT);
-}
-
 static inline void zebra_mlag_reset_read_buffer(void)
 {
-       memset(mlag_rd_buffer, 0, ZEBRA_MLAG_BUF_LIMIT);
        mlag_rd_buf_offset = 0;
 }
 
@@ -64,6 +62,6 @@ void zebra_mlag_process_mlag_data(uint8_t *data, uint32_t len);
  */
 int zebra_mlag_protobuf_encode_client_data(struct stream *s,
                                           uint32_t *msg_type);
-int zebra_mlag_protobuf_decode_message(struct stream **s, uint8_t *data,
+int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data,
                                       uint32_t len);
 #endif
index efaaa73c4d747bae5adc9906ff114ac18c205034..6cb40a9c12a03a66d59cb9686866ecbcd63ffd02 100644 (file)
@@ -26,6 +26,7 @@
 #include "hook.h"
 #include "module.h"
 #include "thread.h"
+#include "frr_pthread.h"
 #include "libfrr.h"
 #include "version.h"
 #include "network.h"
@@ -70,8 +71,10 @@ int zebra_mlag_private_write_data(uint8_t *data, uint32_t len)
 
 static void zebra_mlag_sched_read(void)
 {
-       thread_add_read(zmlag_master, zebra_mlag_read, NULL, mlag_socket,
-                       &zrouter.mlag_info.t_read);
+       frr_with_mutex (&zrouter.mlag_info.mlag_th_mtx) {
+               thread_add_read(zmlag_master, zebra_mlag_read, NULL,
+                               mlag_socket, &zrouter.mlag_info.t_read);
+       }
 }
 
 static int zebra_mlag_read(struct thread *thread)
@@ -80,8 +83,6 @@ static int zebra_mlag_read(struct thread *thread)
        uint32_t h_msglen;
        uint32_t tot_len, curr_len = mlag_rd_buf_offset;
 
-       zrouter.mlag_info.t_read = NULL;
-
        /*
         * Received message in sock_stream looks like below
         * | len-1 (4 Bytes) | payload-1 (len-1) |
@@ -103,6 +104,7 @@ static int zebra_mlag_read(struct thread *thread)
                        zebra_mlag_handle_process_state(MLAG_DOWN);
                        return -1;
                }
+               mlag_rd_buf_offset += data_len;
                if (data_len != (ssize_t)ZEBRA_MLAG_LEN_SIZE - curr_len) {
                        /* Try again later */
                        zebra_mlag_sched_read();
@@ -131,6 +133,7 @@ static int zebra_mlag_read(struct thread *thread)
                        zebra_mlag_handle_process_state(MLAG_DOWN);
                        return -1;
                }
+               mlag_rd_buf_offset += data_len;
                if (data_len != (ssize_t)tot_len - curr_len) {
                        /* Try again later */
                        zebra_mlag_sched_read();
index 437cab5bbd242cd1ec6ab2c8ea98a560976917fe..6fe3e4840f3f8346a1b94be65bfbe6c0dffd1f5f 100644 (file)
@@ -100,6 +100,7 @@ struct zebra_mlag_info {
        /* Threads for read/write. */
        struct thread *t_read;
        struct thread *t_write;
+       pthread_mutex_t mlag_th_mtx;
 };
 
 struct zebra_router {