]> git.proxmox.com Git - mirror_frr.git/blobdiff - zebra/zserv.c
zebra: reorganize zserv, batch i/o
[mirror_frr.git] / zebra / zserv.c
index 127913a63a63cd27978d805e192c1ce75f64ac31..ac2fe9c0e21913b7c85de6f6365481267750f507 100644 (file)
 #include "zebra/zebra_pbr.h"
 
 /* Event list of zebra. */
-enum event { ZEBRA_SERV, ZEBRA_READ, ZEBRA_WRITE };
-
-static void zebra_event(enum event event, int sock, struct zserv *client);
-
+enum event { ZEBRA_READ, ZEBRA_WRITE };
+/* privileges */
 extern struct zebra_privs_t zserv_privs;
+/* post event into client */
+static void zebra_event(struct zserv *client, enum event event);
 
-static void zebra_client_close(struct zserv *client);
-
-static int zserv_delayed_close(struct thread *thread)
-{
-       struct zserv *client = THREAD_ARG(thread);
 
-       client->t_suicide = NULL;
-       zebra_client_close(client);
-       return 0;
-}
+/* Public interface ======================================================== */
 
-static int zserv_flush_data(struct thread *thread)
+int zebra_server_send_message(struct zserv *client, struct stream *msg)
 {
-       struct zserv *client = THREAD_ARG(thread);
-
-       client->t_write = NULL;
-       if (client->t_suicide) {
-               zebra_client_close(client);
-               return -1;
-       }
-       switch (buffer_flush_available(client->wb, client->sock)) {
-       case BUFFER_ERROR:
-               zlog_warn(
-                       "%s: buffer_flush_available failed on zserv client fd %d, "
-                       "closing",
-                       __func__, client->sock);
-               zebra_client_close(client);
-               client = NULL;
-               break;
-       case BUFFER_PENDING:
-               client->t_write = NULL;
-               thread_add_write(zebrad.master, zserv_flush_data, client,
-                                client->sock, &client->t_write);
-               break;
-       case BUFFER_EMPTY:
-               break;
-       }
-
-       if (client)
-               client->last_write_time = monotime(NULL);
+       stream_fifo_push(client->obuf_fifo, msg);
+       zebra_event(client, ZEBRA_WRITE);
        return 0;
 }
 
-int zebra_server_send_message(struct zserv *client)
-{
-       if (client->t_suicide)
-               return -1;
-
-       if (client->is_synchronous)
-               return 0;
-
-       stream_set_getp(client->obuf, 0);
-       client->last_write_cmd = stream_getw_from(client->obuf, 6);
-       switch (buffer_write(client->wb, client->sock,
-                            STREAM_DATA(client->obuf),
-                            stream_get_endp(client->obuf))) {
-       case BUFFER_ERROR:
-               zlog_warn(
-                       "%s: buffer_write failed to zserv client fd %d, closing",
-                       __func__, client->sock);
-               /* Schedule a delayed close since many of the functions that
-                  call this
-                  one do not check the return code.  They do not allow for the
-                  possibility that an I/O error may have caused the client to
-                  be
-                  deleted. */
-               client->t_suicide = NULL;
-               thread_add_event(zebrad.master, zserv_delayed_close, client, 0,
-                                &client->t_suicide);
-               return -1;
-       case BUFFER_EMPTY:
-               THREAD_OFF(client->t_write);
-               break;
-       case BUFFER_PENDING:
-               thread_add_write(zebrad.master, zserv_flush_data, client,
-                                client->sock, &client->t_write);
-               break;
-       }
-
-       client->last_write_time = monotime(NULL);
-       return 0;
-}
+/* Encoding helpers -------------------------------------------------------- */
 
 static void zserv_encode_interface(struct stream *s, struct interface *ifp)
 {
@@ -202,6 +131,34 @@ static void zserv_encode_vrf(struct stream *s, struct zebra_vrf *zvrf)
        stream_putw_at(s, 0, stream_get_endp(s));
 }
 
+static int zserv_encode_nexthop(struct stream *s, struct nexthop *nexthop)
+{
+       stream_putc(s, nexthop->type);
+       switch (nexthop->type) {
+       case NEXTHOP_TYPE_IPV4:
+       case NEXTHOP_TYPE_IPV4_IFINDEX:
+               stream_put_in_addr(s, &nexthop->gate.ipv4);
+               stream_putl(s, nexthop->ifindex);
+               break;
+       case NEXTHOP_TYPE_IPV6:
+               stream_put(s, &nexthop->gate.ipv6, 16);
+               break;
+       case NEXTHOP_TYPE_IPV6_IFINDEX:
+               stream_put(s, &nexthop->gate.ipv6, 16);
+               stream_putl(s, nexthop->ifindex);
+               break;
+       case NEXTHOP_TYPE_IFINDEX:
+               stream_putl(s, nexthop->ifindex);
+               break;
+       default:
+               /* do nothing */
+               break;
+       }
+       return 1;
+}
+
+/* Send handlers ----------------------------------------------------------- */
+
 /* Interface is added. Send ZEBRA_INTERFACE_ADD to client. */
 /*
  * This function is called in the following situations:
@@ -215,65 +172,54 @@ static void zserv_encode_vrf(struct stream *s, struct zebra_vrf *zvrf)
  */
 int zsend_interface_add(struct zserv *client, struct interface *ifp)
 {
-       struct stream *s;
-
-       s = client->obuf;
-       stream_reset(s);
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
        zclient_create_header(s, ZEBRA_INTERFACE_ADD, ifp->vrf_id);
        zserv_encode_interface(s, ifp);
 
        client->ifadd_cnt++;
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 /* Interface deletion from zebra daemon. */
 int zsend_interface_delete(struct zserv *client, struct interface *ifp)
 {
-       struct stream *s;
-
-       s = client->obuf;
-       stream_reset(s);
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
        zclient_create_header(s, ZEBRA_INTERFACE_DELETE, ifp->vrf_id);
        zserv_encode_interface(s, ifp);
 
        client->ifdel_cnt++;
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 int zsend_vrf_add(struct zserv *client, struct zebra_vrf *zvrf)
 {
-       struct stream *s;
-
-       s = client->obuf;
-       stream_reset(s);
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
        zclient_create_header(s, ZEBRA_VRF_ADD, zvrf_id(zvrf));
        zserv_encode_vrf(s, zvrf);
 
        client->vrfadd_cnt++;
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 /* VRF deletion from zebra daemon. */
 int zsend_vrf_delete(struct zserv *client, struct zebra_vrf *zvrf)
-{
-       struct stream *s;
 
-       s = client->obuf;
-       stream_reset(s);
+{
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
        zclient_create_header(s, ZEBRA_VRF_DELETE, zvrf_id(zvrf));
        zserv_encode_vrf(s, zvrf);
 
        client->vrfdel_cnt++;
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 int zsend_interface_link_params(struct zserv *client, struct interface *ifp)
 {
-       struct stream *s;
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
        /* Check this client need interface information. */
        if (!client->ifinfo)
@@ -281,8 +227,6 @@ int zsend_interface_link_params(struct zserv *client, struct interface *ifp)
 
        if (!ifp->link_params)
                return 0;
-       s = client->obuf;
-       stream_reset(s);
 
        zclient_create_header(s, ZEBRA_INTERFACE_LINK_PARAMS, ifp->vrf_id);
 
@@ -296,7 +240,7 @@ int zsend_interface_link_params(struct zserv *client, struct interface *ifp)
        /* Write packet size. */
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 /* Interface address is added/deleted. Send ZEBRA_INTERFACE_ADDRESS_ADD or
@@ -341,11 +285,8 @@ int zsend_interface_address(int cmd, struct zserv *client,
                            struct interface *ifp, struct connected *ifc)
 {
        int blen;
-       struct stream *s;
        struct prefix *p;
-
-       s = client->obuf;
-       stream_reset(s);
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
        zclient_create_header(s, cmd, ifp->vrf_id);
        stream_putl(s, ifp->ifindex);
@@ -378,7 +319,7 @@ int zsend_interface_address(int cmd, struct zserv *client,
        stream_putw_at(s, 0, stream_get_endp(s));
 
        client->connected_rt_add_cnt++;
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 static int zsend_interface_nbr_address(int cmd, struct zserv *client,
@@ -386,12 +327,9 @@ static int zsend_interface_nbr_address(int cmd, struct zserv *client,
                                       struct nbr_connected *ifc)
 {
        int blen;
-       struct stream *s;
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
        struct prefix *p;
 
-       s = client->obuf;
-       stream_reset(s);
-
        zclient_create_header(s, cmd, ifp->vrf_id);
        stream_putl(s, ifp->ifindex);
 
@@ -412,7 +350,7 @@ static int zsend_interface_nbr_address(int cmd, struct zserv *client,
        /* Write packet size. */
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 /* Interface address addition. */
@@ -498,10 +436,7 @@ int zsend_interface_addresses(struct zserv *client, struct interface *ifp)
 int zsend_interface_vrf_update(struct zserv *client, struct interface *ifp,
                               vrf_id_t vrf_id)
 {
-       struct stream *s;
-
-       s = client->obuf;
-       stream_reset(s);
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
        zclient_create_header(s, ZEBRA_INTERFACE_VRF_UPDATE, ifp->vrf_id);
 
@@ -513,7 +448,7 @@ int zsend_interface_vrf_update(struct zserv *client, struct interface *ifp,
        stream_putw_at(s, 0, stream_get_endp(s));
 
        client->if_vrfchg_cnt++;
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 /* Add new nbr connected IPv6 address */
@@ -575,10 +510,7 @@ void nbr_connected_delete_ipv6(struct interface *ifp, struct in6_addr *address)
  */
 int zsend_interface_update(int cmd, struct zserv *client, struct interface *ifp)
 {
-       struct stream *s;
-
-       s = client->obuf;
-       stream_reset(s);
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
        zclient_create_header(s, cmd, ifp->vrf_id);
        zserv_encode_interface(s, ifp);
@@ -588,7 +520,7 @@ int zsend_interface_update(int cmd, struct zserv *client, struct interface *ifp)
        else
                client->ifdown_cnt++;
 
-       return zebra_server_send_message(client);
+       return zebra_server_send_message(client, s);
 }
 
 int zsend_redistribute_route(int cmd, struct zserv *client, struct prefix *p,
@@ -660,134 +592,342 @@ int zsend_redistribute_route(int cmd, struct zserv *client, struct prefix *p,
        SET_FLAG(api.message, ZAPI_MESSAGE_MTU);
        api.mtu = re->mtu;
 
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+
        /* Encode route and send. */
-       if (zapi_route_encode(cmd, client->obuf, &api) < 0)
+       if (zapi_route_encode(cmd, s, &api) < 0)
                return -1;
-       return zebra_server_send_message(client);
-}
-
-static int zsend_write_nexthop(struct stream *s, struct nexthop *nexthop)
-{
-       stream_putc(s, nexthop->type);
-       switch (nexthop->type) {
-       case NEXTHOP_TYPE_IPV4:
-       case NEXTHOP_TYPE_IPV4_IFINDEX:
-               stream_put_in_addr(s, &nexthop->gate.ipv4);
-               stream_putl(s, nexthop->ifindex);
-               break;
-       case NEXTHOP_TYPE_IPV6:
-               stream_put(s, &nexthop->gate.ipv6, 16);
-               break;
-       case NEXTHOP_TYPE_IPV6_IFINDEX:
-               stream_put(s, &nexthop->gate.ipv6, 16);
-               stream_putl(s, nexthop->ifindex);
-               break;
-       case NEXTHOP_TYPE_IFINDEX:
-               stream_putl(s, nexthop->ifindex);
-               break;
-       default:
-               /* do nothing */
-               break;
-       }
-       return 1;
+       return zebra_server_send_message(client, s);
 }
 
-int cmd2type[] = {
-       [ZEBRA_NEXTHOP_REGISTER] = RNH_NEXTHOP_TYPE,
-       [ZEBRA_NEXTHOP_UNREGISTER] = RNH_NEXTHOP_TYPE,
-       [ZEBRA_IMPORT_ROUTE_REGISTER] = RNH_IMPORT_CHECK_TYPE,
-       [ZEBRA_IMPORT_ROUTE_UNREGISTER] = RNH_IMPORT_CHECK_TYPE,
-};
-
-/* Nexthop register */
-static void zserv_rnh_register(ZAPI_HANDLER_ARGS)
+/*
+ * Modified version of zsend_ipv4_nexthop_lookup(): Query unicast rib if
+ * nexthop is not found on mrib. Returns both route metric and protocol
+ * distance.
+ */
+static int zsend_ipv4_nexthop_lookup_mrib(struct zserv *client,
+                                         struct in_addr addr,
+                                         struct route_entry *re,
+                                         struct zebra_vrf *zvrf)
 {
-       struct rnh *rnh;
        struct stream *s;
-       struct prefix p;
-       u_short l = 0;
-       u_char flags = 0;
-       uint16_t type = cmd2type[hdr->command];
-
-       if (IS_ZEBRA_DEBUG_NHT)
-               zlog_debug(
-                       "rnh_register msg from client %s: length=%d, type=%s\n",
-                       zebra_route_string(client->proto), hdr->length,
-                       (type == RNH_NEXTHOP_TYPE) ? "nexthop" : "route");
+       unsigned long nump;
+       u_char num;
+       struct nexthop *nexthop;
 
-       s = client->ibuf;
+       /* Get output stream. */
+       s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+       stream_reset(s);
 
-       client->nh_reg_time = monotime(NULL);
+       /* Fill in result. */
+       zclient_create_header(s, ZEBRA_IPV4_NEXTHOP_LOOKUP_MRIB, zvrf_id(zvrf));
+       stream_put_in_addr(s, &addr);
 
-       while (l < hdr->length) {
-               STREAM_GETC(s, flags);
-               STREAM_GETW(s, p.family);
-               STREAM_GETC(s, p.prefixlen);
-               l += 4;
-               if (p.family == AF_INET) {
-                       if (p.prefixlen > IPV4_MAX_BITLEN) {
-                               zlog_warn(
-                                       "%s: Specified prefix length %d is too large for a v4 address",
-                                       __PRETTY_FUNCTION__, p.prefixlen);
-                               return;
-                       }
-                       STREAM_GET(&p.u.prefix4.s_addr, s, IPV4_MAX_BYTELEN);
-                       l += IPV4_MAX_BYTELEN;
-               } else if (p.family == AF_INET6) {
-                       if (p.prefixlen > IPV6_MAX_BITLEN) {
-                               zlog_warn(
-                                       "%s: Specified prefix length %d is to large for a v6 address",
-                                       __PRETTY_FUNCTION__, p.prefixlen);
-                               return;
-                       }
-                       STREAM_GET(&p.u.prefix6, s, IPV6_MAX_BYTELEN);
-                       l += IPV6_MAX_BYTELEN;
-               } else {
-                       zlog_err(
-                               "rnh_register: Received unknown family type %d\n",
-                               p.family);
-                       return;
-               }
-               rnh = zebra_add_rnh(&p, zvrf_id(zvrf), type);
-               if (type == RNH_NEXTHOP_TYPE) {
-                       if (flags
-                           && !CHECK_FLAG(rnh->flags, ZEBRA_NHT_CONNECTED))
-                               SET_FLAG(rnh->flags, ZEBRA_NHT_CONNECTED);
-                       else if (!flags
-                                && CHECK_FLAG(rnh->flags, ZEBRA_NHT_CONNECTED))
-                               UNSET_FLAG(rnh->flags, ZEBRA_NHT_CONNECTED);
-               } else if (type == RNH_IMPORT_CHECK_TYPE) {
-                       if (flags
-                           && !CHECK_FLAG(rnh->flags, ZEBRA_NHT_EXACT_MATCH))
-                               SET_FLAG(rnh->flags, ZEBRA_NHT_EXACT_MATCH);
-                       else if (!flags && CHECK_FLAG(rnh->flags,
-                                                     ZEBRA_NHT_EXACT_MATCH))
-                               UNSET_FLAG(rnh->flags, ZEBRA_NHT_EXACT_MATCH);
-               }
+       if (re) {
+               stream_putc(s, re->distance);
+               stream_putl(s, re->metric);
+               num = 0;
+               nump = stream_get_endp(
+                       s);     /* remember position for nexthop_num */
+               stream_putc(s, 0); /* reserve room for nexthop_num */
+               /* Only non-recursive routes are elegible to resolve the nexthop
+                * we
+                * are looking up. Therefore, we will just iterate over the top
+                * chain of nexthops. */
+               for (nexthop = re->ng.nexthop; nexthop; nexthop = nexthop->next)
+                       if (CHECK_FLAG(nexthop->flags, NEXTHOP_FLAG_ACTIVE))
+                               num += zsend_write_nexthop(s, nexthop);
 
-               zebra_add_rnh_client(rnh, client, type, zvrf_id(zvrf));
-               /* Anything not AF_INET/INET6 has been filtered out above */
-               zebra_evaluate_rnh(zvrf_id(zvrf), p.family, 1, type, &p);
+               stream_putc_at(s, nump, num); /* store nexthop_num */
+       } else {
+               stream_putc(s, 0); /* distance */
+               stream_putl(s, 0); /* metric */
+               stream_putc(s, 0); /* nexthop_num */
        }
 
-stream_failure:
-       return;
+       stream_putw_at(s, 0, stream_get_endp(s));
+
+       return zebra_server_send_message(client, s);
 }
 
-/* Nexthop register */
-static void zserv_rnh_unregister(ZAPI_HANDLER_ARGS)
+int zsend_route_notify_owner(struct route_entry *re, struct prefix *p,
+                            enum zapi_route_notify_owner note)
 {
-       struct rnh *rnh;
+       struct zserv *client;
        struct stream *s;
-       struct prefix p;
-       u_short l = 0;
-       uint16_t type = cmd2type[hdr->command];
+       uint8_t blen;
 
-       if (IS_ZEBRA_DEBUG_NHT)
-               zlog_debug("rnh_unregister msg from client %s: length=%d\n",
-                          zebra_route_string(client->proto), hdr->length);
+       client = zebra_find_client(re->type, re->instance);
+       if (!client || !client->notify_owner) {
+               if (IS_ZEBRA_DEBUG_PACKET) {
+                       char buff[PREFIX_STRLEN];
 
-       s = client->ibuf;
+                       zlog_debug(
+                               "Not Notifying Owner: %u about prefix %s(%u) %d",
+                               re->type, prefix2str(p, buff, sizeof(buff)),
+                               re->table, note);
+               }
+               return 0;
+       }
+
+       if (IS_ZEBRA_DEBUG_PACKET) {
+               char buff[PREFIX_STRLEN];
+
+               zlog_debug("Notifying Owner: %u about prefix %s(%u) %d",
+                          re->type, prefix2str(p, buff, sizeof(buff)),
+                          re->table, note);
+       }
+
+       s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+       stream_reset(s);
+
+       zclient_create_header(s, ZEBRA_ROUTE_NOTIFY_OWNER, re->vrf_id);
+
+       stream_put(s, &note, sizeof(note));
+
+       stream_putc(s, p->family);
+
+       blen = prefix_blen(p);
+       stream_putc(s, p->prefixlen);
+       stream_put(s, &p->u.prefix, blen);
+
+       stream_putl(s, re->table);
+
+       stream_putw_at(s, 0, stream_get_endp(s));
+
+       return zebra_server_send_message(client, s);
+}
+
+void zsend_rule_notify_owner(struct zebra_pbr_rule *rule,
+                            enum zapi_rule_notify_owner note)
+{
+       struct listnode *node;
+       struct zserv *client;
+       struct stream *s;
+
+       if (IS_ZEBRA_DEBUG_PACKET) {
+               zlog_debug("%s: Notifying %u",
+                          __PRETTY_FUNCTION__, rule->unique);
+       }
+
+       for (ALL_LIST_ELEMENTS_RO(zebrad.client_list, node, client)) {
+               if (rule->sock == client->sock)
+                       break;
+       }
+
+       if (!client)
+               return;
+
+       s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+       stream_reset(s);
+
+       zclient_create_header(s, ZEBRA_RULE_NOTIFY_OWNER, VRF_DEFAULT);
+       stream_put(s, &note, sizeof(note));
+       stream_putl(s, rule->seq);
+       stream_putl(s, rule->priority);
+       stream_putl(s, rule->unique);
+       if (rule->ifp)
+               stream_putl(s, rule->ifp->ifindex);
+       else
+               stream_putl(s, 0);
+
+       stream_putw_at(s, 0, stream_get_endp(s));
+
+       zebra_server_send_message(client, s);
+}
+
+/* Router-id is updated. Send ZEBRA_ROUTER_ID_ADD to client. */
+int zsend_router_id_update(struct zserv *client, struct prefix *p,
+                          vrf_id_t vrf_id)
+{
+       int blen;
+
+       /* Check this client need interface information. */
+       if (!vrf_bitmap_check(client->ridinfo, vrf_id))
+               return 0;
+
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+
+       /* Message type. */
+       zclient_create_header(s, ZEBRA_ROUTER_ID_UPDATE, vrf_id);
+
+       /* Prefix information. */
+       stream_putc(s, p->family);
+       blen = prefix_blen(p);
+       stream_put(s, &p->u.prefix, blen);
+       stream_putc(s, p->prefixlen);
+
+       /* Write packet size. */
+       stream_putw_at(s, 0, stream_get_endp(s));
+
+       return zebra_server_send_message(client, s);
+}
+
+/*
+ * Function used by Zebra to send a PW status update to LDP daemon
+ */
+int zsend_pw_update(struct zserv *client, struct zebra_pw *pw)
+{
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+
+       zclient_create_header(s, ZEBRA_PW_STATUS_UPDATE, pw->vrf_id);
+       stream_write(s, pw->ifname, IF_NAMESIZE);
+       stream_putl(s, pw->ifindex);
+       stream_putl(s, pw->status);
+
+       /* Put length at the first point of the stream. */
+       stream_putw_at(s, 0, stream_get_endp(s));
+
+       return zebra_server_send_message(client, s);
+}
+
+/* Send response to a get label chunk request to client */
+static int zsend_assign_label_chunk_response(struct zserv *client,
+                                            vrf_id_t vrf_id,
+                                            struct label_manager_chunk *lmc)
+{
+       int ret;
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+
+       zclient_create_header(s, ZEBRA_GET_LABEL_CHUNK, vrf_id);
+
+       if (lmc) {
+               /* keep */
+               stream_putc(s, lmc->keep);
+               /* start and end labels */
+               stream_putl(s, lmc->start);
+               stream_putl(s, lmc->end);
+       }
+
+       /* Write packet size. */
+       stream_putw_at(s, 0, stream_get_endp(s));
+
+       ret = writen(client->sock, s->data, stream_get_endp(s));
+       stream_free(s);
+       return ret;
+}
+
+/* Send response to a label manager connect request to client */
+static int zsend_label_manager_connect_response(struct zserv *client,
+                                               vrf_id_t vrf_id, u_short result)
+{
+       int ret;
+       struct stream *s = stream_new(ZEBRA_MAX_PACKET_SIZ);
+
+       zclient_create_header(s, ZEBRA_LABEL_MANAGER_CONNECT, vrf_id);
+
+       /* result */
+       stream_putc(s, result);
+
+       /* Write packet size. */
+       stream_putw_at(s, 0, stream_get_endp(s));
+
+       ret = writen(client->sock, s->data, stream_get_endp(s));
+       stream_free(s);
+
+       return ret;
+}
+
+/* Inbound message handling ------------------------------------------------ */
+
+int cmd2type[] = {
+       [ZEBRA_NEXTHOP_REGISTER] = RNH_NEXTHOP_TYPE,
+       [ZEBRA_NEXTHOP_UNREGISTER] = RNH_NEXTHOP_TYPE,
+       [ZEBRA_IMPORT_ROUTE_REGISTER] = RNH_IMPORT_CHECK_TYPE,
+       [ZEBRA_IMPORT_ROUTE_UNREGISTER] = RNH_IMPORT_CHECK_TYPE,
+};
+
+/* Nexthop register */
+static void zread_rnh_register(ZAPI_HANDLER_ARGS)
+{
+       struct rnh *rnh;
+       struct stream *s;
+       struct prefix p;
+       u_short l = 0;
+       u_char flags = 0;
+       uint16_t type = cmd2type[hdr->command];
+
+       if (IS_ZEBRA_DEBUG_NHT)
+               zlog_debug(
+                       "rnh_register msg from client %s: hdr->length=%d, type=%s\n",
+                       zebra_route_string(client->proto), hdr->length,
+                       (type == RNH_NEXTHOP_TYPE) ? "nexthop" : "route");
+
+       s = msg;
+
+       client->nh_reg_time = monotime(NULL);
+
+       while (l < hdr->length) {
+               STREAM_GETC(s, flags);
+               STREAM_GETW(s, p.family);
+               STREAM_GETC(s, p.prefixlen);
+               l += 4;
+               if (p.family == AF_INET) {
+                       if (p.prefixlen > IPV4_MAX_BITLEN) {
+                               zlog_warn(
+                                       "%s: Specified prefix hdr->length %d is too large for a v4 address",
+                                       __PRETTY_FUNCTION__, p.prefixlen);
+                               return;
+                       }
+                       STREAM_GET(&p.u.prefix4.s_addr, s, IPV4_MAX_BYTELEN);
+                       l += IPV4_MAX_BYTELEN;
+               } else if (p.family == AF_INET6) {
+                       if (p.prefixlen > IPV6_MAX_BITLEN) {
+                               zlog_warn(
+                                       "%s: Specified prefix hdr->length %d is to large for a v6 address",
+                                       __PRETTY_FUNCTION__, p.prefixlen);
+                               return;
+                       }
+                       STREAM_GET(&p.u.prefix6, s, IPV6_MAX_BYTELEN);
+                       l += IPV6_MAX_BYTELEN;
+               } else {
+                       zlog_err(
+                               "rnh_register: Received unknown family type %d\n",
+                               p.family);
+                       return;
+               }
+               rnh = zebra_add_rnh(&p, zvrf_id(zvrf), type);
+               if (type == RNH_NEXTHOP_TYPE) {
+                       if (flags
+                           && !CHECK_FLAG(rnh->flags, ZEBRA_NHT_CONNECTED))
+                               SET_FLAG(rnh->flags, ZEBRA_NHT_CONNECTED);
+                       else if (!flags
+                                && CHECK_FLAG(rnh->flags, ZEBRA_NHT_CONNECTED))
+                               UNSET_FLAG(rnh->flags, ZEBRA_NHT_CONNECTED);
+               } else if (type == RNH_IMPORT_CHECK_TYPE) {
+                       if (flags
+                           && !CHECK_FLAG(rnh->flags, ZEBRA_NHT_EXACT_MATCH))
+                               SET_FLAG(rnh->flags, ZEBRA_NHT_EXACT_MATCH);
+                       else if (!flags
+                                && CHECK_FLAG(rnh->flags,
+                                              ZEBRA_NHT_EXACT_MATCH))
+                               UNSET_FLAG(rnh->flags, ZEBRA_NHT_EXACT_MATCH);
+               }
+
+               zebra_add_rnh_client(rnh, client, type, zvrf_id(zvrf));
+               /* Anything not AF_INET/INET6 has been filtered out above */
+               zebra_evaluate_rnh(zvrf_id(zvrf), p.family, 1, type, &p);
+       }
+
+stream_failure:
+       return;
+}
+
+/* Nexthop register */
+static void zread_rnh_unregister(ZAPI_HANDLER_ARGS)
+{
+       struct rnh *rnh;
+       struct stream *s;
+       struct prefix p;
+       u_short l = 0;
+       uint16_t type = cmd2type[hdr->command];
+
+       if (IS_ZEBRA_DEBUG_NHT)
+               zlog_debug(
+                       "rnh_unregister msg from client %s: hdr->length=%d\n",
+                       zebra_route_string(client->proto), hdr->length);
+
+       s = msg;
 
        while (l < hdr->length) {
                uint8_t flags;
@@ -802,7 +942,7 @@ static void zserv_rnh_unregister(ZAPI_HANDLER_ARGS)
                if (p.family == AF_INET) {
                        if (p.prefixlen > IPV4_MAX_BITLEN) {
                                zlog_warn(
-                                       "%s: Specified prefix length %d is to large for a v4 address",
+                                       "%s: Specified prefix hdr->length %d is to large for a v4 address",
                                        __PRETTY_FUNCTION__, p.prefixlen);
                                return;
                        }
@@ -811,7 +951,7 @@ static void zserv_rnh_unregister(ZAPI_HANDLER_ARGS)
                } else if (p.family == AF_INET6) {
                        if (p.prefixlen > IPV6_MAX_BITLEN) {
                                zlog_warn(
-                                       "%s: Specified prefix length %d is to large for a v6 address",
+                                       "%s: Specified prefix hdr->length %d is to large for a v6 address",
                                        __PRETTY_FUNCTION__, p.prefixlen);
                                return;
                        }
@@ -836,18 +976,18 @@ stream_failure:
 #define ZEBRA_MIN_FEC_LENGTH 5
 
 /* FEC register */
-static void zserv_fec_register(ZAPI_HANDLER_ARGS)
+static void zread_fec_register(ZAPI_HANDLER_ARGS)
 {
        struct stream *s;
        u_short l = 0;
        struct prefix p;
-       u_int16_t flags;
-       u_int32_t label_index = MPLS_INVALID_LABEL_INDEX;
+       uint16_t flags;
+       uint32_t label_index = MPLS_INVALID_LABEL_INDEX;
 
-       s = client->ibuf;
+       s = msg;
        zvrf = vrf_info_lookup(VRF_DEFAULT);
        if (!zvrf)
-               return;
+               return; // unexpected
 
        /*
         * The minimum amount of data that can be sent for one fec
@@ -855,7 +995,7 @@ static void zserv_fec_register(ZAPI_HANDLER_ARGS)
         */
        if (hdr->length < ZEBRA_MIN_FEC_LENGTH) {
                zlog_err(
-                       "fec_register: Received a fec register of length %d, it is of insufficient size to properly decode",
+                       "fec_register: Received a fec register of hdr->length %d, it is of insufficient size to properly decode",
                        hdr->length);
                return;
        }
@@ -875,7 +1015,7 @@ static void zserv_fec_register(ZAPI_HANDLER_ARGS)
                    || (p.family == AF_INET6
                        && p.prefixlen > IPV6_MAX_BITLEN)) {
                        zlog_warn(
-                               "%s: Specified prefix length: %d is to long for %d",
+                               "%s: Specified prefix hdr->length: %d is to long for %d",
                                __PRETTY_FUNCTION__, p.prefixlen, p.family);
                        return;
                }
@@ -895,17 +1035,17 @@ stream_failure:
 }
 
 /* FEC unregister */
-static void zserv_fec_unregister(ZAPI_HANDLER_ARGS)
+static void zread_fec_unregister(ZAPI_HANDLER_ARGS)
 {
        struct stream *s;
        u_short l = 0;
        struct prefix p;
        uint16_t flags;
 
-       s = client->ibuf;
+       s = msg;
        zvrf = vrf_info_lookup(VRF_DEFAULT);
        if (!zvrf)
-               return;
+               return; // unexpected
 
        /*
         * The minimum amount of data that can be sent for one
@@ -913,7 +1053,7 @@ static void zserv_fec_unregister(ZAPI_HANDLER_ARGS)
         */
        if (hdr->length < ZEBRA_MIN_FEC_LENGTH) {
                zlog_err(
-                       "fec_unregister: Received a fec unregister of length %d, it is of insufficient size to properly decode",
+                       "fec_unregister: Received a fec unregister of hdr->length %d, it is of insufficient size to properly decode",
                        hdr->length);
                return;
        }
@@ -936,7 +1076,7 @@ static void zserv_fec_unregister(ZAPI_HANDLER_ARGS)
                    || (p.family == AF_INET6
                        && p.prefixlen > IPV6_MAX_BITLEN)) {
                        zlog_warn(
-                               "%s: Received prefix length %d which is greater than %d can support",
+                               "%s: Received prefix hdr->length %d which is greater than %d can support",
                                __PRETTY_FUNCTION__, p.prefixlen, p.family);
                        return;
                }
@@ -950,199 +1090,16 @@ stream_failure:
        return;
 }
 
+
+
 /*
-  Modified version of zsend_ipv4_nexthop_lookup():
-  Query unicast rib if nexthop is not found on mrib.
-  Returns both route metric and protocol distance.
-*/
-static int zsend_ipv4_nexthop_lookup_mrib(struct zserv *client,
-                                         struct in_addr addr,
-                                         struct route_entry *re,
-                                         struct zebra_vrf *zvrf)
+ * Register zebra server interface information.
+ * Send current all interface and address information.
+ */
+static void zread_interface_add(ZAPI_HANDLER_ARGS)
 {
-       struct stream *s;
-       unsigned long nump;
-       u_char num;
-       struct nexthop *nexthop;
-
-       /* Get output stream. */
-       s = client->obuf;
-       stream_reset(s);
-
-       /* Fill in result. */
-       zclient_create_header(s, ZEBRA_IPV4_NEXTHOP_LOOKUP_MRIB, zvrf_id(zvrf));
-       stream_put_in_addr(s, &addr);
-
-       if (re) {
-               stream_putc(s, re->distance);
-               stream_putl(s, re->metric);
-               num = 0;
-               nump = stream_get_endp(
-                       s);     /* remember position for nexthop_num */
-               stream_putc(s, 0); /* reserve room for nexthop_num */
-               /* Only non-recursive routes are elegible to resolve the nexthop
-                * we
-                * are looking up. Therefore, we will just iterate over the top
-                * chain of nexthops. */
-               for (nexthop = re->ng.nexthop; nexthop; nexthop = nexthop->next)
-                       if (CHECK_FLAG(nexthop->flags, NEXTHOP_FLAG_ACTIVE))
-                               num += zsend_write_nexthop(s, nexthop);
-
-               stream_putc_at(s, nump, num); /* store nexthop_num */
-       } else {
-               stream_putc(s, 0); /* distance */
-               stream_putl(s, 0); /* metric */
-               stream_putc(s, 0); /* nexthop_num */
-       }
-
-       stream_putw_at(s, 0, stream_get_endp(s));
-
-       return zebra_server_send_message(client);
-}
-
-int zsend_route_notify_owner(struct route_entry *re, struct prefix *p,
-                            enum zapi_route_notify_owner note)
-{
-       struct zserv *client;
-       struct stream *s;
-       uint8_t blen;
-
-       client = zebra_find_client(re->type, re->instance);
-       if (!client || !client->notify_owner) {
-               if (IS_ZEBRA_DEBUG_PACKET) {
-                       char buff[PREFIX_STRLEN];
-
-                       zlog_debug(
-                               "Not Notifying Owner: %u about prefix %s(%u) %d",
-                               re->type, prefix2str(p, buff, sizeof(buff)),
-                               re->table, note);
-               }
-               return 0;
-       }
-
-       if (IS_ZEBRA_DEBUG_PACKET) {
-               char buff[PREFIX_STRLEN];
-
-               zlog_debug("Notifying Owner: %u about prefix %s(%u) %d",
-                          re->type, prefix2str(p, buff, sizeof(buff)),
-                          re->table, note);
-       }
-
-       s = client->obuf;
-       stream_reset(s);
-
-       zclient_create_header(s, ZEBRA_ROUTE_NOTIFY_OWNER, re->vrf_id);
-
-       stream_put(s, &note, sizeof(note));
-
-       stream_putc(s, p->family);
-
-       blen = prefix_blen(p);
-       stream_putc(s, p->prefixlen);
-       stream_put(s, &p->u.prefix, blen);
-
-       stream_putl(s, re->table);
-
-       stream_putw_at(s, 0, stream_get_endp(s));
-
-       return zebra_server_send_message(client);
-}
-
-void zsend_rule_notify_owner(struct zebra_pbr_rule *rule,
-                            enum zapi_rule_notify_owner note)
-{
-       struct listnode *node;
-       struct zserv *client;
-       struct stream *s;
-
-       if (IS_ZEBRA_DEBUG_PACKET) {
-               zlog_debug("%s: Notifying %u",
-                          __PRETTY_FUNCTION__, rule->unique);
-       }
-
-       for (ALL_LIST_ELEMENTS_RO(zebrad.client_list, node, client)) {
-               if (rule->sock == client->sock)
-                       break;
-       }
-
-       if (!client)
-               return;
-
-       s = client->obuf;
-       stream_reset(s);
-
-       zclient_create_header(s, ZEBRA_RULE_NOTIFY_OWNER, VRF_DEFAULT);
-       stream_put(s, &note, sizeof(note));
-       stream_putl(s, rule->seq);
-       stream_putl(s, rule->priority);
-       stream_putl(s, rule->unique);
-       if (rule->ifp)
-               stream_putl(s, rule->ifp->ifindex);
-       else
-               stream_putl(s, 0);
-
-       stream_putw_at(s, 0, stream_get_endp(s));
-
-       zebra_server_send_message(client);
-}
-
-/* Router-id is updated. Send ZEBRA_ROUTER_ID_ADD to client. */
-int zsend_router_id_update(struct zserv *client, struct prefix *p,
-                          vrf_id_t vrf_id)
-{
-       struct stream *s;
-       int blen;
-
-       /* Check this client need interface information. */
-       if (!vrf_bitmap_check(client->ridinfo, vrf_id))
-               return 0;
-
-       s = client->obuf;
-       stream_reset(s);
-
-       /* Message type. */
-       zclient_create_header(s, ZEBRA_ROUTER_ID_UPDATE, vrf_id);
-
-       /* Prefix information. */
-       stream_putc(s, p->family);
-       blen = prefix_blen(p);
-       stream_put(s, &p->u.prefix, blen);
-       stream_putc(s, p->prefixlen);
-
-       /* Write packet size. */
-       stream_putw_at(s, 0, stream_get_endp(s));
-
-       return zebra_server_send_message(client);
-}
-
-/*
- * Function used by Zebra to send a PW status update to LDP daemon
- */
-int zsend_pw_update(struct zserv *client, struct zebra_pw *pw)
-{
-       struct stream *s;
-
-       s = client->obuf;
-       stream_reset(s);
-
-       zclient_create_header(s, ZEBRA_PW_STATUS_UPDATE, pw->vrf_id);
-       stream_write(s, pw->ifname, IF_NAMESIZE);
-       stream_putl(s, pw->ifindex);
-       stream_putl(s, pw->status);
-
-       /* Put length at the first point of the stream. */
-       stream_putw_at(s, 0, stream_get_endp(s));
-
-       return zebra_server_send_message(client);
-}
-
-/* Register zebra server interface information.  Send current all
-   interface and address information. */
-static void zread_interface_add(ZAPI_HANDLER_ARGS)
-
-{
-       struct vrf *vrf;
-       struct interface *ifp;
+       struct vrf *vrf;
+       struct interface *ifp;
 
        /* Interface information is needed. */
        vrf_bitmap_set(client->ifinfo, zvrf_id(zvrf));
@@ -1153,14 +1110,10 @@ static void zread_interface_add(ZAPI_HANDLER_ARGS)
                        if (!CHECK_FLAG(ifp->status, ZEBRA_INTERFACE_ACTIVE))
                                continue;
 
-                       if (zsend_interface_add(client, ifp) < 0)
-                               return;
-
-                       if (zsend_interface_addresses(client, ifp) < 0)
-                               return;
+                       zsend_interface_add(client, ifp);
+                       zsend_interface_addresses(client, ifp);
                }
        }
-       return;
 }
 
 /* Unregister zebra server interface information. */
@@ -1193,9 +1146,8 @@ static void zread_route_add(ZAPI_HANDLER_ARGS)
        int i, ret;
        vrf_id_t vrf_id = 0;
 
-       s = client->ibuf;
-       if (zapi_route_decode(s, &api) < 0)
-               return;
+       s = msg;
+       zapi_route_decode(s, &api);
 
        /* Allocate new route. */
        vrf_id = zvrf_id(zvrf);
@@ -1342,7 +1294,7 @@ static void zread_route_del(ZAPI_HANDLER_ARGS)
        afi_t afi;
        struct prefix_ipv6 *src_p = NULL;
 
-       s = client->ibuf;
+       s = msg;
        if (zapi_route_decode(s, &api) < 0)
                return;
 
@@ -1368,8 +1320,6 @@ static void zread_route_del(ZAPI_HANDLER_ARGS)
                client->v6_route_del_cnt++;
                break;
        }
-
-       return;
 }
 
 /* This function support multiple nexthop. */
@@ -1396,7 +1346,7 @@ static void zread_ipv4_add(ZAPI_HANDLER_ARGS)
        enum blackhole_type bh_type = BLACKHOLE_NULL;
 
        /* Get input stream.  */
-       s = client->ibuf;
+       s = msg;
 
        /* Allocate new re. */
        re = XCALLOC(MTYPE_RE, sizeof(struct route_entry));
@@ -1527,7 +1477,6 @@ static void zread_ipv4_add(ZAPI_HANDLER_ARGS)
 stream_failure:
        nexthops_free(re->ng.nexthop);
        XFREE(MTYPE_RE, re);
-       return;
 }
 
 /* Zebra server IPv4 prefix delete function. */
@@ -1538,7 +1487,7 @@ static void zread_ipv4_delete(ZAPI_HANDLER_ARGS)
        struct prefix p;
        u_int32_t table_id;
 
-       s = client->ibuf;
+       s = msg;
 
        /* Type, flags, message. */
        STREAM_GETC(s, api.type);
@@ -1574,10 +1523,9 @@ static void zread_ipv4_nexthop_lookup_mrib(ZAPI_HANDLER_ARGS)
        struct in_addr addr;
        struct route_entry *re;
 
-       STREAM_GET(&addr.s_addr, client->ibuf, IPV4_MAX_BYTELEN);
+       STREAM_GET(&addr.s_addr, msg, IPV4_MAX_BYTELEN);
        re = rib_match_ipv4_multicast(zvrf_id(zvrf), addr, NULL);
        zsend_ipv4_nexthop_lookup_mrib(client, addr, re, zvrf);
-       return;
 
 stream_failure:
        return;
@@ -1605,7 +1553,7 @@ static void zread_ipv4_route_ipv6_nexthop_add(ZAPI_HANDLER_ARGS)
        enum blackhole_type bh_type = BLACKHOLE_NULL;
 
        /* Get input stream.  */
-       s = client->ibuf;
+       s = msg;
 
        memset(&nhop_addr, 0, sizeof(struct in6_addr));
 
@@ -1618,7 +1566,6 @@ static void zread_ipv4_route_ipv6_nexthop_add(ZAPI_HANDLER_ARGS)
                zlog_warn("%s: Specified route type: %d is not a legal value\n",
                          __PRETTY_FUNCTION__, re->type);
                XFREE(MTYPE_RE, re);
-
                return;
        }
        STREAM_GETW(s, re->instance);
@@ -1754,7 +1701,6 @@ static void zread_ipv4_route_ipv6_nexthop_add(ZAPI_HANDLER_ARGS)
 stream_failure:
        nexthops_free(re->ng.nexthop);
        XFREE(MTYPE_RE, re);
-       return;
 }
 
 static void zread_ipv6_add(ZAPI_HANDLER_ARGS)
@@ -1780,7 +1726,7 @@ static void zread_ipv6_add(ZAPI_HANDLER_ARGS)
        enum blackhole_type bh_type = BLACKHOLE_NULL;
 
        /* Get input stream.  */
-       s = client->ibuf;
+       s = msg;
 
        memset(&nhop_addr, 0, sizeof(struct in6_addr));
 
@@ -1947,8 +1893,6 @@ static void zread_ipv6_add(ZAPI_HANDLER_ARGS)
 stream_failure:
        nexthops_free(re->ng.nexthop);
        XFREE(MTYPE_RE, re);
-
-       return;
 }
 
 /* Zebra server IPv6 prefix delete function. */
@@ -1959,7 +1903,7 @@ static void zread_ipv6_delete(ZAPI_HANDLER_ARGS)
        struct prefix p;
        struct prefix_ipv6 src_p, *src_pp;
 
-       s = client->ibuf;
+       s = msg;
 
        /* Type, flags, message. */
        STREAM_GETC(s, api.type);
@@ -2020,9 +1964,9 @@ static void zread_hello(ZAPI_HANDLER_ARGS)
        u_short instance;
        u_char notify;
 
-       STREAM_GETC(client->ibuf, proto);
-       STREAM_GETW(client->ibuf, instance);
-       STREAM_GETC(client->ibuf, notify);
+       STREAM_GETC(msg, proto);
+       STREAM_GETW(msg, instance);
+       STREAM_GETC(msg, notify);
        if (notify)
                client->notify_owner = true;
 
@@ -2068,7 +2012,7 @@ static void zread_mpls_labels(ZAPI_HANDLER_ARGS)
        u_int8_t distance;
 
        /* Get input stream.  */
-       s = client->ibuf;
+       s = msg;
 
        /* Get data. */
        STREAM_GETC(s, type);
@@ -2139,27 +2083,9 @@ static void zread_mpls_labels(ZAPI_HANDLER_ARGS)
 stream_failure:
        return;
 }
-/* Send response to a label manager connect request to client */
-static int zsend_label_manager_connect_response(struct zserv *client,
-                                               vrf_id_t vrf_id, u_short result)
-{
-       struct stream *s;
-
-       s = client->obuf;
-       stream_reset(s);
-
-       zclient_create_header(s, ZEBRA_LABEL_MANAGER_CONNECT, vrf_id);
-
-       /* result */
-       stream_putc(s, result);
-
-       /* Write packet size. */
-       stream_putw_at(s, 0, stream_get_endp(s));
-
-       return writen(client->sock, s->data, stream_get_endp(s));
-}
 
-static void zread_label_manager_connect(struct zserv *client, vrf_id_t vrf_id)
+static void zread_label_manager_connect(struct zserv *client,
+                                       struct stream *msg, vrf_id_t vrf_id)
 {
        struct stream *s;
        /* type of protocol (lib/zebra.h) */
@@ -2167,7 +2093,7 @@ static void zread_label_manager_connect(struct zserv *client, vrf_id_t vrf_id)
        u_short instance;
 
        /* Get input stream.  */
-       s = client->ibuf;
+       s = msg;
 
        /* Get data. */
        STREAM_GETC(s, proto);
@@ -2200,33 +2126,9 @@ static void zread_label_manager_connect(struct zserv *client, vrf_id_t vrf_id)
 stream_failure:
        return;
 }
-/* Send response to a get label chunk request to client */
-static int zsend_assign_label_chunk_response(struct zserv *client,
-                                            vrf_id_t vrf_id,
-                                            struct label_manager_chunk *lmc)
-{
-       struct stream *s;
-
-       s = client->obuf;
-       stream_reset(s);
-
-       zclient_create_header(s, ZEBRA_GET_LABEL_CHUNK, vrf_id);
-
-       if (lmc) {
-               /* keep */
-               stream_putc(s, lmc->keep);
-               /* start and end labels */
-               stream_putl(s, lmc->start);
-               stream_putl(s, lmc->end);
-       }
-
-       /* Write packet size. */
-       stream_putw_at(s, 0, stream_get_endp(s));
-
-       return writen(client->sock, s->data, stream_get_endp(s));
-}
 
-static void zread_get_label_chunk(struct zserv *client, vrf_id_t vrf_id)
+static void zread_get_label_chunk(struct zserv *client, struct stream *msg,
+                                 vrf_id_t vrf_id)
 {
        struct stream *s;
        u_char keep;
@@ -2234,7 +2136,7 @@ static void zread_get_label_chunk(struct zserv *client, vrf_id_t vrf_id)
        struct label_manager_chunk *lmc;
 
        /* Get input stream.  */
-       s = client->ibuf;
+       s = msg;
 
        /* Get data. */
        STREAM_GETC(s, keep);
@@ -2254,13 +2156,13 @@ stream_failure:
        return;
 }
 
-static void zread_release_label_chunk(struct zserv *client)
+static void zread_release_label_chunk(struct zserv *client, struct stream *msg)
 {
        struct stream *s;
        uint32_t start, end;
 
        /* Get input stream.  */
-       s = client->ibuf;
+       s = msg;
 
        /* Get data. */
        STREAM_GETL(s, start);
@@ -2284,7 +2186,7 @@ static void zread_label_manager_request(ZAPI_HANDLER_ARGS)
        /* this is a label manager */
        else {
                if (hdr->command == ZEBRA_LABEL_MANAGER_CONNECT)
-                       zread_label_manager_connect(client, zvrf_id(zvrf));
+                       zread_label_manager_connect(client, msg, zvrf_id(zvrf));
                else {
                        /* Sanity: don't allow 'unidentified' requests */
                        if (!client->proto) {
@@ -2293,9 +2195,10 @@ static void zread_label_manager_request(ZAPI_HANDLER_ARGS)
                                return;
                        }
                        if (hdr->command == ZEBRA_GET_LABEL_CHUNK)
-                               zread_get_label_chunk(client, zvrf_id(zvrf));
+                               zread_get_label_chunk(client, msg,
+                                                     zvrf_id(zvrf));
                        else if (hdr->command == ZEBRA_RELEASE_LABEL_CHUNK)
-                               zread_release_label_chunk(client);
+                               zread_release_label_chunk(client, msg);
                }
        }
 }
@@ -2316,7 +2219,7 @@ static void zread_pseudowire(ZAPI_HANDLER_ARGS)
        struct zebra_pw *pw;
 
        /* Get input stream.  */
-       s = client->ibuf;
+       s = msg;
 
        /* Get data. */
        STREAM_GET(ifname, s, IF_NAMESIZE);
@@ -2413,116 +2316,11 @@ static void zebra_client_close_cleanup_rnh(struct zserv *client)
        }
 }
 
-/* free zebra client information. */
-static void zebra_client_free(struct zserv *client)
-{
-       /* Send client de-registration to BFD */
-       zebra_ptm_bfd_client_deregister(client->proto);
-
-       /* Cleanup any registered nexthops - across all VRFs. */
-       zebra_client_close_cleanup_rnh(client);
-
-       /* Release Label Manager chunks */
-       release_daemon_chunks(client->proto, client->instance);
-
-       /* Cleanup any FECs registered by this client. */
-       zebra_mpls_cleanup_fecs_for_client(vrf_info_lookup(VRF_DEFAULT),
-                                          client);
-
-       /* Remove pseudowires associated with this client */
-       zebra_pw_client_close(client);
-
-       /* Close file descriptor. */
-       if (client->sock) {
-               unsigned long nroutes;
-
-               close(client->sock);
-               nroutes = rib_score_proto(client->proto, client->instance);
-               zlog_notice(
-                       "client %d disconnected. %lu %s routes removed from the rib",
-                       client->sock, nroutes,
-                       zebra_route_string(client->proto));
-               client->sock = -1;
-       }
-
-       /* Free stream buffers. */
-       if (client->ibuf)
-               stream_free(client->ibuf);
-       if (client->obuf)
-               stream_free(client->obuf);
-       if (client->wb)
-               buffer_free(client->wb);
-
-       /* Release threads. */
-       if (client->t_read)
-               thread_cancel(client->t_read);
-       if (client->t_write)
-               thread_cancel(client->t_write);
-       if (client->t_suicide)
-               thread_cancel(client->t_suicide);
-
-       /* Free bitmaps. */
-       for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++)
-               for (int i = 0; i < ZEBRA_ROUTE_MAX; i++)
-                       vrf_bitmap_free(client->redist[afi][i]);
-
-       vrf_bitmap_free(client->redist_default);
-       vrf_bitmap_free(client->ifinfo);
-       vrf_bitmap_free(client->ridinfo);
-
-       XFREE(MTYPE_TMP, client);
-}
-
-static void zebra_client_close(struct zserv *client)
-{
-       listnode_delete(zebrad.client_list, client);
-       zebra_client_free(client);
-}
-
-/* Make new client. */
-static void zebra_client_create(int sock)
-{
-       struct zserv *client;
-       int i;
-       afi_t afi;
-
-       client = XCALLOC(MTYPE_TMP, sizeof(struct zserv));
-
-       /* Make client input/output buffer. */
-       client->sock = sock;
-       client->ibuf = stream_new(ZEBRA_MAX_PACKET_SIZ);
-       client->obuf = stream_new(ZEBRA_MAX_PACKET_SIZ);
-       client->wb = buffer_new(0);
-
-       /* Set table number. */
-       client->rtm_table = zebrad.rtm_table_default;
-
-       client->connect_time = monotime(NULL);
-       /* Initialize flags */
-       for (afi = AFI_IP; afi < AFI_MAX; afi++)
-               for (i = 0; i < ZEBRA_ROUTE_MAX; i++)
-                       client->redist[afi][i] = vrf_bitmap_init();
-       client->redist_default = vrf_bitmap_init();
-       client->ifinfo = vrf_bitmap_init();
-       client->ridinfo = vrf_bitmap_init();
-
-       /* by default, it's not a synchronous client */
-       client->is_synchronous = 0;
-
-       /* Add this client to linked list. */
-       listnode_add(zebrad.client_list, client);
-
-       /* Make new read thread. */
-       zebra_event(ZEBRA_READ, sock, client);
-
-       zebra_vrf_update_all(client);
-}
-
 static void zread_interface_set_master(ZAPI_HANDLER_ARGS)
 {
        struct interface *master;
        struct interface *slave;
-       struct stream *s = client->ibuf;
+       struct stream *s = msg;
        int ifindex;
        vrf_id_t vrf_id;
 
@@ -2553,7 +2351,7 @@ static void zread_vrf_label(ZAPI_HANDLER_ARGS)
        struct zebra_vrf *def_zvrf;
        enum lsp_types_t ltype;
 
-       s = client->ibuf;
+       s = msg;
        STREAM_GETL(s, nlabel);
        STREAM_GETC(s, afi);
        if (nlabel == zvrf->label[afi]) {
@@ -2619,146 +2417,365 @@ static inline void zread_rule(ZAPI_HANDLER_ARGS)
        uint32_t total, i;
        ifindex_t ifindex;
 
-       s = client->ibuf;
-       STREAM_GETL(s, total);
+       s = msg;
+       STREAM_GETL(s, total);
+
+       for (i = 0; i < total; i++) {
+               memset(&zpr, 0, sizeof(zpr));
+
+               zpr.sock = client->sock;
+               STREAM_GETL(s, zpr.seq);
+               STREAM_GETL(s, zpr.priority);
+               STREAM_GETL(s, zpr.unique);
+               STREAM_GETC(s, zpr.filter.src_ip.family);
+               STREAM_GETC(s, zpr.filter.src_ip.prefixlen);
+               STREAM_GET(&zpr.filter.src_ip.u.prefix, s,
+                          prefix_blen(&zpr.filter.src_ip));
+               STREAM_GETW(s, zpr.filter.src_port);
+               STREAM_GETC(s, zpr.filter.dst_ip.family);
+               STREAM_GETC(s, zpr.filter.dst_ip.prefixlen);
+               STREAM_GET(&zpr.filter.dst_ip.u.prefix, s,
+                          prefix_blen(&zpr.filter.dst_ip));
+               STREAM_GETW(s, zpr.filter.dst_port);
+               STREAM_GETL(s, zpr.action.table);
+               STREAM_GETL(s, ifindex);
+
+               zpr.ifp = if_lookup_by_index(ifindex, VRF_UNKNOWN);
+               if (!zpr.ifp) {
+                       zlog_debug("FAiled to lookup ifindex: %u", ifindex);
+                       return;
+               }
+
+               if (!is_default_prefix(&zpr.filter.src_ip))
+                       zpr.filter.filter_bm |= PBR_FILTER_SRC_IP;
+
+               if (!is_default_prefix(&zpr.filter.dst_ip))
+                       zpr.filter.filter_bm |= PBR_FILTER_DST_IP;
+
+               if (zpr.filter.src_port)
+                       zpr.filter.filter_bm |= PBR_FILTER_SRC_PORT;
+
+               if (zpr.filter.dst_port)
+                       zpr.filter.filter_bm |= PBR_FILTER_DST_PORT;
+
+               if (hdr->command == ZEBRA_RULE_ADD)
+                       zebra_pbr_add_rule(zvrf->zns, &zpr);
+               else
+                       zebra_pbr_del_rule(zvrf->zns, &zpr);
+       }
+
+stream_failure:
+       return;
+}
+
+void (*zserv_handlers[])(ZAPI_HANDLER_ARGS) = {
+       [ZEBRA_ROUTER_ID_ADD] = zread_router_id_add,
+       [ZEBRA_ROUTER_ID_DELETE] = zread_router_id_delete,
+       [ZEBRA_INTERFACE_ADD] = zread_interface_add,
+       [ZEBRA_INTERFACE_DELETE] = zread_interface_delete,
+       [ZEBRA_ROUTE_ADD] = zread_route_add,
+       [ZEBRA_ROUTE_DELETE] = zread_route_del,
+       [ZEBRA_IPV4_ROUTE_ADD] = zread_ipv4_add,
+       [ZEBRA_IPV4_ROUTE_DELETE] = zread_ipv4_delete,
+       [ZEBRA_IPV4_ROUTE_IPV6_NEXTHOP_ADD] = zread_ipv4_route_ipv6_nexthop_add,
+       [ZEBRA_IPV6_ROUTE_ADD] = zread_ipv6_add,
+       [ZEBRA_IPV6_ROUTE_DELETE] = zread_ipv6_delete,
+       [ZEBRA_REDISTRIBUTE_ADD] = zebra_redistribute_add,
+       [ZEBRA_REDISTRIBUTE_DELETE] = zebra_redistribute_delete,
+       [ZEBRA_REDISTRIBUTE_DEFAULT_ADD] = zebra_redistribute_default_add,
+       [ZEBRA_REDISTRIBUTE_DEFAULT_DELETE] = zebra_redistribute_default_delete,
+       [ZEBRA_IPV4_NEXTHOP_LOOKUP_MRIB] = zread_ipv4_nexthop_lookup_mrib,
+       [ZEBRA_HELLO] = zread_hello,
+       [ZEBRA_NEXTHOP_REGISTER] = zread_rnh_register,
+       [ZEBRA_NEXTHOP_UNREGISTER] = zread_rnh_unregister,
+       [ZEBRA_IMPORT_ROUTE_REGISTER] = zread_rnh_register,
+       [ZEBRA_IMPORT_ROUTE_UNREGISTER] = zread_rnh_unregister,
+       [ZEBRA_BFD_DEST_UPDATE] = zebra_ptm_bfd_dst_register,
+       [ZEBRA_BFD_DEST_REGISTER] = zebra_ptm_bfd_dst_register,
+       [ZEBRA_BFD_DEST_DEREGISTER] = zebra_ptm_bfd_dst_deregister,
+       [ZEBRA_VRF_UNREGISTER] = zread_vrf_unregister,
+       [ZEBRA_VRF_LABEL] = zread_vrf_label,
+       [ZEBRA_BFD_CLIENT_REGISTER] = zebra_ptm_bfd_client_register,
+#if defined(HAVE_RTADV)
+       [ZEBRA_INTERFACE_ENABLE_RADV] = zebra_interface_radv_enable,
+       [ZEBRA_INTERFACE_DISABLE_RADV] = zebra_interface_radv_disable,
+#else
+       [ZEBRA_INTERFACE_ENABLE_RADV] = NULL,
+       [ZEBRA_INTERFACE_DISABLE_RADV] = NULL,
+#endif
+       [ZEBRA_MPLS_LABELS_ADD] = zread_mpls_labels,
+       [ZEBRA_MPLS_LABELS_DELETE] = zread_mpls_labels,
+       [ZEBRA_IPMR_ROUTE_STATS] = zebra_ipmr_route_stats,
+       [ZEBRA_LABEL_MANAGER_CONNECT] = zread_label_manager_request,
+       [ZEBRA_GET_LABEL_CHUNK] = zread_label_manager_request,
+       [ZEBRA_RELEASE_LABEL_CHUNK] = zread_label_manager_request,
+       [ZEBRA_FEC_REGISTER] = zread_fec_register,
+       [ZEBRA_FEC_UNREGISTER] = zread_fec_unregister,
+       [ZEBRA_ADVERTISE_DEFAULT_GW] = zebra_vxlan_advertise_gw_macip,
+       [ZEBRA_ADVERTISE_SUBNET] = zebra_vxlan_advertise_subnet,
+       [ZEBRA_ADVERTISE_ALL_VNI] = zebra_vxlan_advertise_all_vni,
+       [ZEBRA_REMOTE_VTEP_ADD] = zebra_vxlan_remote_vtep_add,
+       [ZEBRA_REMOTE_VTEP_DEL] = zebra_vxlan_remote_vtep_del,
+       [ZEBRA_REMOTE_MACIP_ADD] = zebra_vxlan_remote_macip_add,
+       [ZEBRA_REMOTE_MACIP_DEL] = zebra_vxlan_remote_macip_del,
+       [ZEBRA_INTERFACE_SET_MASTER] = zread_interface_set_master,
+       [ZEBRA_PW_ADD] = zread_pseudowire,
+       [ZEBRA_PW_DELETE] = zread_pseudowire,
+       [ZEBRA_PW_SET] = zread_pseudowire,
+       [ZEBRA_PW_UNSET] = zread_pseudowire,
+       [ZEBRA_RULE_ADD] = zread_rule,
+       [ZEBRA_RULE_DELETE] = zread_rule,
+};
+
+static inline void zserv_handle_commands(struct zserv *client,
+                                        struct zmsghdr *hdr,
+                                        struct stream *msg,
+                                        struct zebra_vrf *zvrf)
+{
+       if (hdr->command > sizeof(zserv_handlers)
+           || zserv_handlers[hdr->command] == NULL)
+               zlog_info("Zebra received unknown command %d", hdr->command);
+       else
+               zserv_handlers[hdr->command](client, hdr, msg, zvrf);
+
+       stream_free(msg);
+}
+
+/* Lifecycle ---------------------------------------------------------------- */
+
+/* free zebra client information. */
+static void zebra_client_free(struct zserv *client)
+{
+       /* Send client de-registration to BFD */
+       zebra_ptm_bfd_client_deregister(client->proto);
+
+       /* Cleanup any registered nexthops - across all VRFs. */
+       zebra_client_close_cleanup_rnh(client);
+
+       /* Release Label Manager chunks */
+       release_daemon_chunks(client->proto, client->instance);
+
+       /* Cleanup any FECs registered by this client. */
+       zebra_mpls_cleanup_fecs_for_client(vrf_info_lookup(VRF_DEFAULT),
+                                          client);
+
+       /* Remove pseudowires associated with this client */
+       zebra_pw_client_close(client);
+
+       /* Close file descriptor. */
+       if (client->sock) {
+               unsigned long nroutes;
+
+               close(client->sock);
+               nroutes = rib_score_proto(client->proto, client->instance);
+               zlog_notice(
+                       "client %d disconnected. %lu %s routes removed from the rib",
+                       client->sock, nroutes,
+                       zebra_route_string(client->proto));
+               client->sock = -1;
+       }
+
+       /* Free stream buffers. */
+       if (client->ibuf_work)
+               stream_free(client->ibuf_work);
+       if (client->obuf_work)
+               stream_free(client->obuf_work);
+       if (client->ibuf_fifo)
+               stream_fifo_free(client->ibuf_fifo);
+       if (client->obuf_fifo)
+               stream_fifo_free(client->obuf_fifo);
+       if (client->wb)
+               buffer_free(client->wb);
+
+       /* Release threads. */
+       if (client->t_read)
+               thread_cancel(client->t_read);
+       if (client->t_write)
+               thread_cancel(client->t_write);
+       if (client->t_suicide)
+               thread_cancel(client->t_suicide);
+
+       /* Free bitmaps. */
+       for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++)
+               for (int i = 0; i < ZEBRA_ROUTE_MAX; i++)
+                       vrf_bitmap_free(client->redist[afi][i]);
+
+       vrf_bitmap_free(client->redist_default);
+       vrf_bitmap_free(client->ifinfo);
+       vrf_bitmap_free(client->ridinfo);
+
+       XFREE(MTYPE_TMP, client);
+}
+
+/*
+ * Called from client thread to terminate itself.
+ */
+static void zebra_client_close(struct zserv *client)
+{
+       listnode_delete(zebrad.client_list, client);
+       zebra_client_free(client);
+}
+
+/* Make new client. */
+static void zebra_client_create(int sock)
+{
+       struct zserv *client;
+       int i;
+       afi_t afi;
+
+       client = XCALLOC(MTYPE_TMP, sizeof(struct zserv));
 
-       for (i = 0; i < total; i++) {
-               memset(&zpr, 0, sizeof(zpr));
+       /* Make client input/output buffer. */
+       client->sock = sock;
+       client->ibuf_fifo = stream_fifo_new();
+       client->obuf_fifo = stream_fifo_new();
+       client->ibuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
+       client->obuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
+       client->wb = buffer_new(0);
 
-               zpr.sock = client->sock;
-               STREAM_GETL(s, zpr.seq);
-               STREAM_GETL(s, zpr.priority);
-               STREAM_GETL(s, zpr.unique);
-               STREAM_GETC(s, zpr.filter.src_ip.family);
-               STREAM_GETC(s, zpr.filter.src_ip.prefixlen);
-               STREAM_GET(&zpr.filter.src_ip.u.prefix, s,
-                          prefix_blen(&zpr.filter.src_ip));
-               STREAM_GETW(s, zpr.filter.src_port);
-               STREAM_GETC(s, zpr.filter.dst_ip.family);
-               STREAM_GETC(s, zpr.filter.dst_ip.prefixlen);
-               STREAM_GET(&zpr.filter.dst_ip.u.prefix, s,
-                          prefix_blen(&zpr.filter.dst_ip));
-               STREAM_GETW(s, zpr.filter.dst_port);
-               STREAM_GETL(s, zpr.action.table);
-               STREAM_GETL(s, ifindex);
+       /* Set table number. */
+       client->rtm_table = zebrad.rtm_table_default;
 
-               zpr.ifp = if_lookup_by_index(ifindex, VRF_UNKNOWN);
-               if (!zpr.ifp) {
-                       zlog_debug("FAiled to lookup ifindex: %u", ifindex);
-                       return;
-               }
+       client->connect_time = monotime(NULL);
+       /* Initialize flags */
+       for (afi = AFI_IP; afi < AFI_MAX; afi++)
+               for (i = 0; i < ZEBRA_ROUTE_MAX; i++)
+                       client->redist[afi][i] = vrf_bitmap_init();
+       client->redist_default = vrf_bitmap_init();
+       client->ifinfo = vrf_bitmap_init();
+       client->ridinfo = vrf_bitmap_init();
 
-               if (!is_default_prefix(&zpr.filter.src_ip))
-                       zpr.filter.filter_bm |= PBR_FILTER_SRC_IP;
+       /* by default, it's not a synchronous client */
+       client->is_synchronous = 0;
 
-               if (!is_default_prefix(&zpr.filter.dst_ip))
-                       zpr.filter.filter_bm |= PBR_FILTER_DST_IP;
+       /* Add this client to linked list. */
+       listnode_add(zebrad.client_list, client);
 
-               if (zpr.filter.src_port)
-                       zpr.filter.filter_bm |= PBR_FILTER_SRC_PORT;
+       zebra_vrf_update_all(client);
 
-               if (zpr.filter.dst_port)
-                       zpr.filter.filter_bm |= PBR_FILTER_DST_PORT;
+       /* start read loop */
+       zebra_event(client, ZEBRA_READ);
+}
 
-               if (hdr->command == ZEBRA_RULE_ADD)
-                       zebra_pbr_add_rule(zvrf->zns, &zpr);
-               else
-                       zebra_pbr_del_rule(zvrf->zns, &zpr);
-       }
+static int zserv_delayed_close(struct thread *thread)
+{
+       struct zserv *client = THREAD_ARG(thread);
 
-stream_failure:
-       return;
+       client->t_suicide = NULL;
+       zebra_client_close(client);
+       return 0;
 }
 
 /*
- * Reads header from zmsg stream.
+ * Log zapi message to zlog.
+ *
+ * errmsg (optional)
+ *    Debugging message
  *
- * Note this advances the stream getp by the size of the header.
+ * msg
+ *    The message
+ *
+ * hdr (optional)
+ *    The message header
  */
-static bool zserv_read_header(struct stream *msg, struct zmsghdr *hdr)
-{
-       STREAM_GETW(msg, hdr->length);
-       STREAM_GETC(msg, hdr->marker);
-       STREAM_GETC(msg, hdr->version);
-       STREAM_GETL(msg, hdr->vrf_id);
-       STREAM_GETW(msg, hdr->command);
-       return true;
-stream_failure:
-       return false;
+static void zserv_log_message(const char *errmsg, struct stream *msg,
+                             struct zmsghdr *hdr)
+{
+       zlog_debug("Rx'd ZAPI message");
+       if (errmsg)
+               zlog_debug("%s", errmsg);
+       if (hdr) {
+               zlog_debug(" Length: %d", hdr->length);
+               zlog_debug("Command: %s", zserv_command_string(hdr->command));
+               zlog_debug("    VRF: %u", hdr->vrf_id);
+       }
+       zlog_hexdump(msg->data, STREAM_READABLE(msg));
 }
 
-void (*zserv_handlers[])(ZAPI_HANDLER_ARGS) = {
-       [ZEBRA_ROUTER_ID_ADD] = zread_router_id_add,
-       [ZEBRA_ROUTER_ID_DELETE] = zread_router_id_delete,
-       [ZEBRA_INTERFACE_ADD] = zread_interface_add,
-       [ZEBRA_INTERFACE_DELETE] = zread_interface_delete,
-       [ZEBRA_ROUTE_ADD] = zread_route_add,
-       [ZEBRA_ROUTE_DELETE] = zread_route_del,
-       [ZEBRA_IPV4_ROUTE_ADD] = zread_ipv4_add,
-       [ZEBRA_IPV4_ROUTE_DELETE] = zread_ipv4_delete,
-       [ZEBRA_IPV4_ROUTE_IPV6_NEXTHOP_ADD] = zread_ipv4_route_ipv6_nexthop_add,
-       [ZEBRA_IPV6_ROUTE_ADD] = zread_ipv6_add,
-       [ZEBRA_IPV6_ROUTE_DELETE] = zread_ipv6_delete,
-       [ZEBRA_REDISTRIBUTE_ADD] = zebra_redistribute_add,
-       [ZEBRA_REDISTRIBUTE_DELETE] = zebra_redistribute_delete,
-       [ZEBRA_REDISTRIBUTE_DEFAULT_ADD] = zebra_redistribute_default_add,
-       [ZEBRA_REDISTRIBUTE_DEFAULT_DELETE] = zebra_redistribute_default_delete,
-       [ZEBRA_IPV4_NEXTHOP_LOOKUP_MRIB] = zread_ipv4_nexthop_lookup_mrib,
-       [ZEBRA_HELLO] = zread_hello,
-       [ZEBRA_NEXTHOP_REGISTER] = zserv_rnh_register,
-       [ZEBRA_NEXTHOP_UNREGISTER] = zserv_rnh_unregister,
-       [ZEBRA_IMPORT_ROUTE_REGISTER] = zserv_rnh_register,
-       [ZEBRA_IMPORT_ROUTE_UNREGISTER] = zserv_rnh_unregister,
-       [ZEBRA_BFD_DEST_UPDATE] = zebra_ptm_bfd_dst_register,
-       [ZEBRA_BFD_DEST_REGISTER] = zebra_ptm_bfd_dst_register,
-       [ZEBRA_BFD_DEST_DEREGISTER] = zebra_ptm_bfd_dst_deregister,
-       [ZEBRA_VRF_UNREGISTER] = zread_vrf_unregister,
-       [ZEBRA_VRF_LABEL] = zread_vrf_label,
-       [ZEBRA_BFD_CLIENT_REGISTER] = zebra_ptm_bfd_client_register,
-#if defined(HAVE_RTADV)
-       [ZEBRA_INTERFACE_ENABLE_RADV] = zebra_interface_radv_enable,
-       [ZEBRA_INTERFACE_DISABLE_RADV] = zebra_interface_radv_disable,
-#else
-       [ZEBRA_INTERFACE_ENABLE_RADV] = NULL,
-       [ZEBRA_INTERFACE_DISABLE_RADV] = NULL,
-#endif
-       [ZEBRA_MPLS_LABELS_ADD] = zread_mpls_labels,
-       [ZEBRA_MPLS_LABELS_DELETE] = zread_mpls_labels,
-       [ZEBRA_IPMR_ROUTE_STATS] = zebra_ipmr_route_stats,
-       [ZEBRA_LABEL_MANAGER_CONNECT] = zread_label_manager_request,
-       [ZEBRA_GET_LABEL_CHUNK] = zread_label_manager_request,
-       [ZEBRA_RELEASE_LABEL_CHUNK] = zread_label_manager_request,
-       [ZEBRA_FEC_REGISTER] = zserv_fec_register,
-       [ZEBRA_FEC_UNREGISTER] = zserv_fec_unregister,
-       [ZEBRA_ADVERTISE_DEFAULT_GW] = zebra_vxlan_advertise_gw_macip,
-       [ZEBRA_ADVERTISE_SUBNET] = zebra_vxlan_advertise_subnet,
-       [ZEBRA_ADVERTISE_ALL_VNI] = zebra_vxlan_advertise_all_vni,
-       [ZEBRA_REMOTE_VTEP_ADD] = zebra_vxlan_remote_vtep_add,
-       [ZEBRA_REMOTE_VTEP_DEL] = zebra_vxlan_remote_vtep_del,
-       [ZEBRA_REMOTE_MACIP_ADD] = zebra_vxlan_remote_macip_add,
-       [ZEBRA_REMOTE_MACIP_DEL] = zebra_vxlan_remote_macip_del,
-       [ZEBRA_INTERFACE_SET_MASTER] = zread_interface_set_master,
-       [ZEBRA_PW_ADD] = zread_pseudowire,
-       [ZEBRA_PW_DELETE] = zread_pseudowire,
-       [ZEBRA_PW_SET] = zread_pseudowire,
-       [ZEBRA_PW_UNSET] = zread_pseudowire,
-       [ZEBRA_RULE_ADD] = zread_rule,
-       [ZEBRA_RULE_DELETE] = zread_rule,
-};
+static int zserv_flush_data(struct thread *thread)
+{
+       struct zserv *client = THREAD_ARG(thread);
 
-static inline void zserv_handle_commands(struct zserv *client, uint16_t command,
-                                        uint16_t length,
-                                        struct zebra_vrf *zvrf)
+       client->t_write = NULL;
+       if (client->t_suicide) {
+               zebra_client_close(client);
+               return -1;
+       }
+       switch (buffer_flush_available(client->wb, client->sock)) {
+       case BUFFER_ERROR:
+               zlog_warn(
+                       "%s: buffer_flush_available failed on zserv client fd %d, closing",
+                       __func__, client->sock);
+               zebra_client_close(client);
+               client = NULL;
+               break;
+       case BUFFER_PENDING:
+               client->t_write = NULL;
+               thread_add_write(zebrad.master, zserv_flush_data, client,
+                                client->sock, &client->t_write);
+               break;
+       case BUFFER_EMPTY:
+               break;
+       }
+
+       if (client)
+               client->last_write_time = monotime(NULL);
+       return 0;
+}
+
+/*
+ * Write a single packet.
+ */
+static int zserv_write(struct thread *thread)
 {
-       struct zmsghdr hdr;
+       struct zserv *client = THREAD_ARG(thread);
+       struct stream *msg;
+       int writerv;
 
-       stream_set_getp(client->ibuf, 0);
-       zserv_read_header(client->ibuf, &hdr);
-       if (hdr.command > sizeof(zserv_handlers)
-           || zserv_handlers[hdr.command] == NULL)
-               zlog_info("Zebra received unknown command %d", hdr.command);
-       else
-               zserv_handlers[hdr.command](client, &hdr, zvrf);
+       if (client->t_suicide)
+               return -1;
+
+       if (client->is_synchronous)
+               return 0;
+
+       msg = stream_fifo_pop(client->obuf_fifo);
+       stream_set_getp(msg, 0);
+       client->last_write_cmd = stream_getw_from(msg, 6);
+
+       writerv = buffer_write(client->wb, client->sock, STREAM_DATA(msg),
+                              stream_get_endp(msg));
+
+       stream_free(msg);
+
+       switch (writerv) {
+       case BUFFER_ERROR:
+               zlog_warn(
+                       "%s: buffer_write failed to zserv client fd %d, closing",
+                       __func__, client->sock);
+               /*
+                * Schedule a delayed close since many of the functions that
+                * call this one do not check the return code. They do not
+                * allow for the possibility that an I/O error may have caused
+                * the client to be deleted.
+                */
+               client->t_suicide = NULL;
+               thread_add_event(zebrad.master, zserv_delayed_close, client, 0,
+                                &client->t_suicide);
+               return -1;
+       case BUFFER_EMPTY:
+               THREAD_OFF(client->t_write);
+               break;
+       case BUFFER_PENDING:
+               thread_add_write(zebrad.master, zserv_flush_data, client,
+                                client->sock, &client->t_write);
+               break;
+       }
+
+       if (client->obuf_fifo->count)
+               zebra_event(client, ZEBRA_WRITE);
+
+       client->last_write_time = monotime(NULL);
+       return 0;
 }
 
 #if defined(HANDLE_ZAPI_FUZZING)
@@ -2781,26 +2798,66 @@ static void zserv_write_incoming(struct stream *orig, uint16_t command)
 }
 #endif
 
+static int zserv_process_messages(struct thread *thread)
+{
+       struct zserv *client = THREAD_ARG(thread);
+       struct zebra_vrf *zvrf;
+       struct zmsghdr hdr;
+       struct stream *msg;
+       bool hdrvalid;
+
+       do {
+               msg = stream_fifo_pop(client->ibuf_fifo);
+
+               /* break if out of messages */
+               if (!msg)
+                       continue;
+
+               /* read & check header */
+               hdrvalid = zapi_parse_header(msg, &hdr);
+               if (!hdrvalid && IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) {
+                       const char *emsg = "Message has corrupt header";
+                       zserv_log_message(emsg, msg, NULL);
+               }
+               if (!hdrvalid)
+                       continue;
+
+               /* lookup vrf */
+               zvrf = zebra_vrf_lookup_by_id(hdr.vrf_id);
+               if (!zvrf && IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) {
+                       const char *emsg = "Message specifies unknown VRF";
+                       zserv_log_message(emsg, msg, &hdr);
+               }
+               if (!zvrf)
+                       continue;
+
+               /* process commands */
+               zserv_handle_commands(client, &hdr, msg, zvrf);
+
+       } while (msg);
+
+       return 0;
+}
+
 /* Handler of zebra service request. */
-static int zebra_client_read(struct thread *thread)
+static int zserv_read(struct thread *thread)
 {
        int sock;
        struct zserv *client;
        size_t already;
-       uint16_t length, command;
-       uint8_t marker, version;
-       vrf_id_t vrf_id;
-       struct zebra_vrf *zvrf;
 #if defined(HANDLE_ZAPI_FUZZING)
        int packets = 1;
 #else
        int packets = zebrad.packets_to_process;
 #endif
+       struct zmsghdr hdr;
+       ssize_t nb;
+       bool hdrvalid;
+       char errmsg[256];
 
        /* Get thread data.  Reset reading thread because I'm running. */
        sock = THREAD_FD(thread);
        client = THREAD_ARG(thread);
-       client->t_read = NULL;
 
        if (client->t_suicide) {
                zebra_client_close(client);
@@ -2808,88 +2865,84 @@ static int zebra_client_read(struct thread *thread)
        }
 
        while (packets) {
+               already = stream_get_endp(client->ibuf_work);
+
                /* Read length and command (if we don't have it already). */
-               if ((already = stream_get_endp(client->ibuf))
-                   < ZEBRA_HEADER_SIZE) {
-                       ssize_t nbyte;
-                       if (((nbyte = stream_read_try(client->ibuf, sock,
-                                                     ZEBRA_HEADER_SIZE
-                                                             - already))
-                            == 0)
-                           || (nbyte == -1)) {
-                               if (IS_ZEBRA_DEBUG_EVENT)
-                                       zlog_debug(
-                                               "connection closed socket [%d]",
-                                               sock);
-                               zebra_client_close(client);
-                               return -1;
-                       }
-                       if (nbyte != (ssize_t)(ZEBRA_HEADER_SIZE - already)) {
+               if (already < ZEBRA_HEADER_SIZE) {
+                       nb = stream_read_try(client->ibuf_work, sock,
+                                            ZEBRA_HEADER_SIZE - already);
+                       if ((nb == 0 || nb == -1) && IS_ZEBRA_DEBUG_EVENT)
+                               zlog_debug("connection closed socket [%d]",
+                                          sock);
+                       if ((nb == 0 || nb == -1))
+                               goto zread_fail;
+                       if (nb != (ssize_t)(ZEBRA_HEADER_SIZE - already)) {
                                /* Try again later. */
-                               zebra_event(ZEBRA_READ, sock, client);
-                               return 0;
+                               break;
                        }
                        already = ZEBRA_HEADER_SIZE;
                }
 
                /* Reset to read from the beginning of the incoming packet. */
-               stream_set_getp(client->ibuf, 0);
+               stream_set_getp(client->ibuf_work, 0);
 
                /* Fetch header values */
-               STREAM_GETW(client->ibuf, length);
-               STREAM_GETC(client->ibuf, marker);
-               STREAM_GETC(client->ibuf, version);
-               STREAM_GETL(client->ibuf, vrf_id);
-               STREAM_GETW(client->ibuf, command);
+               hdrvalid = zapi_parse_header(client->ibuf_work, &hdr);
 
-               if (marker != ZEBRA_HEADER_MARKER || version != ZSERV_VERSION) {
-                       zlog_err(
-                               "%s: socket %d version mismatch, marker %d, version %d",
-                               __func__, sock, marker, version);
-                       zebra_client_close(client);
-                       return -1;
+               if (!hdrvalid) {
+                       snprintf(errmsg, sizeof(errmsg),
+                                "%s: Message has corrupt header", __func__);
+                       zserv_log_message(errmsg, client->ibuf_work, NULL);
+                       goto zread_fail;
                }
-               if (length < ZEBRA_HEADER_SIZE) {
-                       zlog_warn(
-                               "%s: socket %d message length %u is less than header size %d",
-                               __func__, sock, length, ZEBRA_HEADER_SIZE);
-                       zebra_client_close(client);
-                       return -1;
+
+               /* Validate header */
+               if (hdr.marker != ZEBRA_HEADER_MARKER
+                   || hdr.version != ZSERV_VERSION) {
+                       snprintf(
+                               errmsg, sizeof(errmsg),
+                               "Message has corrupt header\n%s: socket %d version mismatch, marker %d, version %d",
+                               __func__, sock, hdr.marker, hdr.version);
+                       zserv_log_message(errmsg, client->ibuf_work, &hdr);
+                       goto zread_fail;
                }
-               if (length > STREAM_SIZE(client->ibuf)) {
-                       zlog_warn(
-                               "%s: socket %d message length %u exceeds buffer size %lu",
-                               __func__, sock, length,
-                               (u_long)STREAM_SIZE(client->ibuf));
-                       zebra_client_close(client);
-                       return -1;
+               if (hdr.length < ZEBRA_HEADER_SIZE) {
+                       snprintf(
+                               errmsg, sizeof(errmsg),
+                               "Message has corrupt header\n%s: socket %d message length %u is less than header size %d",
+                               __func__, sock, hdr.length, ZEBRA_HEADER_SIZE);
+                       zserv_log_message(errmsg, client->ibuf_work, &hdr);
+                       goto zread_fail;
+               }
+               if (hdr.length > STREAM_SIZE(client->ibuf_work)) {
+                       snprintf(
+                               errmsg, sizeof(errmsg),
+                               "Message has corrupt header\n%s: socket %d message length %u exceeds buffer size %lu",
+                               __func__, sock, hdr.length,
+                               (unsigned long)STREAM_SIZE(client->ibuf_work));
+                       goto zread_fail;
                }
 
                /* Read rest of data. */
-               if (already < length) {
-                       ssize_t nbyte;
-                       if (((nbyte = stream_read_try(client->ibuf, sock,
-                                                     length - already))
-                            == 0)
-                           || (nbyte == -1)) {
-                               if (IS_ZEBRA_DEBUG_EVENT)
-                                       zlog_debug(
-                                               "connection closed [%d] when reading zebra data",
-                                               sock);
-                               zebra_client_close(client);
-                               return -1;
-                       }
-                       if (nbyte != (ssize_t)(length - already)) {
+               if (already < hdr.length) {
+                       nb = stream_read_try(client->ibuf_work, sock,
+                                            hdr.length - already);
+                       if ((nb == 0 || nb == -1) && IS_ZEBRA_DEBUG_EVENT)
+                               zlog_debug(
+                                       "connection closed [%d] when reading zebra data",
+                                       sock);
+                       if ((nb == 0 || nb == -1))
+                               goto zread_fail;
+                       if (nb != (ssize_t)(hdr.length - already)) {
                                /* Try again later. */
-                               zebra_event(ZEBRA_READ, sock, client);
-                               return 0;
+                               break;
                        }
                }
 
 #if defined(HANDLE_ZAPI_FUZZING)
-               zserv_write_incoming(client->ibuf, command);
+               zserv_write_incoming(client->ibuf_work, command);
 #endif
-               length -= ZEBRA_HEADER_SIZE;
+               hdr.length -= ZEBRA_HEADER_SIZE;
 
                /* Debug packet information. */
                if (IS_ZEBRA_DEBUG_EVENT)
@@ -2897,41 +2950,54 @@ static int zebra_client_read(struct thread *thread)
                                   sock);
 
                if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV)
-                       zlog_debug("zebra message received [%s] %d in VRF %u",
-                                  zserv_command_string(command), length,
-                                  vrf_id);
+                       zserv_log_message(NULL, client->ibuf_work, &hdr);
 
                client->last_read_time = monotime(NULL);
-               client->last_read_cmd = command;
-
-               zvrf = zebra_vrf_lookup_by_id(vrf_id);
-               if (!zvrf) {
-                       if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV)
-                               zlog_debug("zebra received unknown VRF[%u]",
-                                          vrf_id);
-                       goto zclient_read_out;
-               }
+               client->last_read_cmd = hdr.command;
 
-               zserv_handle_commands(client, command, length, zvrf);
+               stream_set_getp(client->ibuf_work, 0);
+               struct stream *msg = stream_dup(client->ibuf_work);
 
-               if (client->t_suicide) {
-                       /* No need to wait for thread callback, just kill
-                        * immediately.
-                        */
-                       zebra_client_close(client);
-                       return -1;
-               }
-               packets -= 1;
-               stream_reset(client->ibuf);
+               stream_fifo_push(client->ibuf_fifo, msg);
+
+               if (client->t_suicide)
+                       goto zread_fail;
+
+               --packets;
+               stream_reset(client->ibuf_work);
        }
 
-stream_failure:
-zclient_read_out:
-       stream_reset(client->ibuf);
-       zebra_event(ZEBRA_READ, sock, client);
+       if (IS_ZEBRA_DEBUG_PACKET)
+               zlog_debug("Read %d packets",
+                          zebrad.packets_to_process - packets);
+
+       /* Schedule job to process those packets */
+       thread_add_event(zebrad.master, &zserv_process_messages, client, 0,
+                        NULL);
+
+       /* Reschedule ourselves */
+       zebra_event(client, ZEBRA_READ);
+
        return 0;
+
+zread_fail:
+       zebra_client_close(client);
+       return -1;
 }
 
+static void zebra_event(struct zserv *client, enum event event)
+{
+       switch (event) {
+       case ZEBRA_READ:
+               thread_add_read(zebrad.master, zserv_read, client, client->sock,
+                               &client->t_read);
+               break;
+       case ZEBRA_WRITE:
+               thread_add_write(zebrad.master, zserv_write, client,
+                                client->sock, &client->t_write);
+               break;
+       }
+}
 
 /* Accept code of zebra server socket. */
 static int zebra_accept(struct thread *thread)
@@ -2944,7 +3010,7 @@ static int zebra_accept(struct thread *thread)
        accept_sock = THREAD_FD(thread);
 
        /* Reregister myself. */
-       zebra_event(ZEBRA_SERV, accept_sock, NULL);
+       thread_add_read(zebrad.master, zebra_accept, NULL, accept_sock, NULL);
 
        len = sizeof(struct sockaddr_in);
        client_sock = accept(accept_sock, (struct sockaddr *)&client, &len);
@@ -3031,26 +3097,7 @@ void zebra_zserv_socket_init(char *path)
 
        umask(old_mask);
 
-       zebra_event(ZEBRA_SERV, sock, NULL);
-}
-
-
-static void zebra_event(enum event event, int sock, struct zserv *client)
-{
-       switch (event) {
-       case ZEBRA_SERV:
-               thread_add_read(zebrad.master, zebra_accept, client, sock,
-                               NULL);
-               break;
-       case ZEBRA_READ:
-               client->t_read = NULL;
-               thread_add_read(zebrad.master, zebra_client_read, client, sock,
-                               &client->t_read);
-               break;
-       case ZEBRA_WRITE:
-               /**/
-               break;
-       }
+       thread_add_read(zebrad.master, zebra_accept, NULL, sock, NULL);
 }
 
 #define ZEBRA_TIME_BUF 32