]> git.proxmox.com Git - mirror_frr.git/commitdiff
Merge pull request #2123 from qlyoung/zserv-mt
authorRuss White <russ@riw.us>
Fri, 8 Jun 2018 10:47:14 +0000 (06:47 -0400)
committerGitHub <noreply@github.com>
Fri, 8 Jun 2018 10:47:14 +0000 (06:47 -0400)
Multithreaded Zserv

16 files changed:
zebra/label_manager.c
zebra/main.c
zebra/table_manager.c
zebra/zapi_msg.c
zebra/zapi_msg.h
zebra/zebra_mpls.c
zebra/zebra_mroute.c
zebra/zebra_pbr.c
zebra/zebra_ptm.c
zebra/zebra_ptm_redistribute.c
zebra/zebra_pw.c
zebra/zebra_rnh.c
zebra/zebra_vty.c
zebra/zebra_vxlan.c
zebra/zserv.c
zebra/zserv.h

index f3fa3ba94efd618e9730c46527f8c2f64f14390f..b24a4b68dc0d7b2e97fb3c4baa9e81013ac9e9c3 100644 (file)
@@ -99,7 +99,7 @@ static int relay_response_back(void)
        proto_str = zebra_route_string(proto);
 
        /* lookup the client to relay the msg to */
-       zserv = zebra_find_client(proto, instance);
+       zserv = zserv_find_client(proto, instance);
        if (!zserv) {
                zlog_err(
                        "Error relaying LM response: can't find client %s, instance %u",
@@ -350,7 +350,7 @@ void label_manager_init(char *lm_zserv_path)
 
        obuf = stream_new(ZEBRA_MAX_PACKET_SIZ);
 
-       hook_register(zapi_client_close, release_daemon_label_chunks);
+       hook_register(zserv_client_close, release_daemon_label_chunks);
 }
 
 /**
index 9c721f0a7ee9683c1752ac4c0cecd9c69d069910..c5246999fa47ebe5dcd457f44b724716259e7459 100644 (file)
@@ -37,6 +37,7 @@
 #include "logicalrouter.h"
 #include "libfrr.h"
 #include "routemap.h"
+#include "frr_pthread.h"
 
 #include "zebra/rib.h"
 #include "zebra/zserv.h"
@@ -378,8 +379,11 @@ int main(int argc, char **argv)
        /* Needed for BSD routing socket. */
        pid = getpid();
 
-       /* This must be done only after locking pidfile (bug #403). */
-       zebra_zserv_socket_init(zserv_path);
+       /* Intialize pthread library */
+       frr_pthread_init();
+
+       /* Start Zebra API server */
+       zserv_start(zserv_path);
 
        /* Init label manager */
        label_manager_init(lblmgr_path);
index cb8c384436977bb44228e598f2d5c710f0973ad2..5bcc2c40d6d085a6f5568d89fe565d2ee56f604d 100644 (file)
@@ -78,7 +78,7 @@ void table_manager_enable(ns_id_t ns_id)
                return;
        tbl_mgr.lc_list = list_new();
        tbl_mgr.lc_list->del = delete_table_chunk;
-       hook_register(zapi_client_close, release_daemon_table_chunks);
+       hook_register(zserv_client_close, release_daemon_table_chunks);
 }
 
 /**
index 943329b1962bc0d3b6121d07f36394e94dff0ffd..b17bbc95c26ceab0b2fa6f6bebeb002a797420f8 100644 (file)
@@ -162,7 +162,7 @@ int zsend_interface_add(struct zserv *client, struct interface *ifp)
        zserv_encode_interface(s, ifp);
 
        client->ifadd_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /* Interface deletion from zebra daemon. */
@@ -174,7 +174,7 @@ int zsend_interface_delete(struct zserv *client, struct interface *ifp)
        zserv_encode_interface(s, ifp);
 
        client->ifdel_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 int zsend_vrf_add(struct zserv *client, struct zebra_vrf *zvrf)
@@ -185,7 +185,7 @@ int zsend_vrf_add(struct zserv *client, struct zebra_vrf *zvrf)
        zserv_encode_vrf(s, zvrf);
 
        client->vrfadd_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /* VRF deletion from zebra daemon. */
@@ -198,7 +198,7 @@ int zsend_vrf_delete(struct zserv *client, struct zebra_vrf *zvrf)
        zserv_encode_vrf(s, zvrf);
 
        client->vrfdel_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 int zsend_interface_link_params(struct zserv *client, struct interface *ifp)
@@ -230,7 +230,7 @@ int zsend_interface_link_params(struct zserv *client, struct interface *ifp)
        /* Write packet size. */
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /* Interface address is added/deleted. Send ZEBRA_INTERFACE_ADDRESS_ADD or
@@ -309,7 +309,7 @@ int zsend_interface_address(int cmd, struct zserv *client,
        stream_putw_at(s, 0, stream_get_endp(s));
 
        client->connected_rt_add_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 static int zsend_interface_nbr_address(int cmd, struct zserv *client,
@@ -340,7 +340,7 @@ static int zsend_interface_nbr_address(int cmd, struct zserv *client,
        /* Write packet size. */
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /* Interface address addition. */
@@ -438,7 +438,7 @@ int zsend_interface_vrf_update(struct zserv *client, struct interface *ifp,
        stream_putw_at(s, 0, stream_get_endp(s));
 
        client->if_vrfchg_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /* Add new nbr connected IPv6 address */
@@ -511,7 +511,7 @@ int zsend_interface_update(int cmd, struct zserv *client, struct interface *ifp)
        else
                client->ifdown_cnt++;
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 int zsend_redistribute_route(int cmd, struct zserv *client, struct prefix *p,
@@ -602,7 +602,7 @@ int zsend_redistribute_route(int cmd, struct zserv *client, struct prefix *p,
                           zebra_route_string(api.type), api.vrf_id,
                           buf_prefix);
        }
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /*
@@ -655,7 +655,7 @@ static int zsend_ipv4_nexthop_lookup_mrib(struct zserv *client,
 
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 int zsend_route_notify_owner(struct route_entry *re, struct prefix *p,
@@ -665,7 +665,7 @@ int zsend_route_notify_owner(struct route_entry *re, struct prefix *p,
        struct stream *s;
        uint8_t blen;
 
-       client = zebra_find_client(re->type, re->instance);
+       client = zserv_find_client(re->type, re->instance);
        if (!client || !client->notify_owner) {
                if (IS_ZEBRA_DEBUG_PACKET) {
                        char buff[PREFIX_STRLEN];
@@ -703,7 +703,7 @@ int zsend_route_notify_owner(struct route_entry *re, struct prefix *p,
 
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 void zsend_rule_notify_owner(struct zebra_pbr_rule *rule,
@@ -739,7 +739,7 @@ void zsend_rule_notify_owner(struct zebra_pbr_rule *rule,
 
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       zebra_server_send_message(client, s);
+       zserv_send_message(client, s);
 }
 
 void zsend_ipset_notify_owner(struct zebra_pbr_ipset *ipset,
@@ -769,7 +769,7 @@ void zsend_ipset_notify_owner(struct zebra_pbr_ipset *ipset,
        stream_put(s, ipset->ipset_name, ZEBRA_IPSET_NAME_SIZE);
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       zebra_server_send_message(client, s);
+       zserv_send_message(client, s);
 }
 
 void zsend_ipset_entry_notify_owner(struct zebra_pbr_ipset_entry *ipset,
@@ -799,7 +799,7 @@ void zsend_ipset_entry_notify_owner(struct zebra_pbr_ipset_entry *ipset,
        stream_put(s, ipset->backpointer->ipset_name, ZEBRA_IPSET_NAME_SIZE);
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       zebra_server_send_message(client, s);
+       zserv_send_message(client, s);
 }
 
 void zsend_iptable_notify_owner(struct zebra_pbr_iptable *iptable,
@@ -828,7 +828,7 @@ void zsend_iptable_notify_owner(struct zebra_pbr_iptable *iptable,
        stream_putl(s, iptable->unique);
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       zebra_server_send_message(client, s);
+       zserv_send_message(client, s);
 }
 
 /* Router-id is updated. Send ZEBRA_ROUTER_ID_ADD to client. */
@@ -855,7 +855,7 @@ int zsend_router_id_update(struct zserv *client, struct prefix *p,
        /* Write packet size. */
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /*
@@ -873,7 +873,7 @@ int zsend_pw_update(struct zserv *client, struct zebra_pw *pw)
        /* Put length at the first point of the stream. */
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /* Send response to a get label chunk request to client */
@@ -952,7 +952,7 @@ static int zsend_assign_table_chunk_response(struct zserv *client,
        /* Write packet size. */
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 static int zsend_table_manager_connect_response(struct zserv *client,
@@ -968,7 +968,7 @@ static int zsend_table_manager_connect_response(struct zserv *client,
 
        stream_putw_at(s, 0, stream_get_endp(s));
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /* Inbound message handling ------------------------------------------------ */
@@ -2194,7 +2194,7 @@ static void zsend_capabilities(struct zserv *client, struct zebra_vrf *zvrf)
        stream_putl(s, multipath_num);
 
        stream_putw_at(s, 0, stream_get_endp(s));
-       zebra_server_send_message(client, s);
+       zserv_send_message(client, s);
 }
 
 /* Tie up route-type and client->sock */
@@ -3017,14 +3017,53 @@ void (*zserv_handlers[])(ZAPI_HANDLER_ARGS) = {
        [ZEBRA_IPTABLE_DELETE] = zread_iptable,
 };
 
-void zserv_handle_commands(struct zserv *client, struct zmsghdr *hdr,
-                          struct stream *msg, struct zebra_vrf *zvrf)
+#if defined(HANDLE_ZAPI_FUZZING)
+extern struct zebra_privs_t zserv_privs;
+
+static void zserv_write_incoming(struct stream *orig, uint16_t command)
 {
-       if (hdr->command > array_size(zserv_handlers)
-           || zserv_handlers[hdr->command] == NULL)
-               zlog_info("Zebra received unknown command %d", hdr->command);
-       else
-               zserv_handlers[hdr->command](client, hdr, msg, zvrf);
+       char fname[MAXPATHLEN];
+       struct stream *copy;
+       int fd = -1;
+
+       copy = stream_dup(orig);
+       stream_set_getp(copy, 0);
+
+       zserv_privs.change(ZPRIVS_RAISE);
+       snprintf(fname, MAXPATHLEN, "%s/%u", DAEMON_VTY_DIR, command);
+       fd = open(fname, O_CREAT | O_WRONLY | O_EXCL, 0644);
+       stream_flush(copy, fd);
+       close(fd);
+       zserv_privs.change(ZPRIVS_LOWER);
+       stream_free(copy);
+}
+#endif
+
+void zserv_handle_commands(struct zserv *client, struct stream *msg)
+{
+       struct zmsghdr hdr;
+       struct zebra_vrf *zvrf;
 
-       stream_free(msg);
+       zapi_parse_header(msg, &hdr);
+
+#if defined(HANDLE_ZAPI_FUZZING)
+       zserv_write_incoming(msg, hdr.command);
+#endif
+
+       hdr.length -= ZEBRA_HEADER_SIZE;
+
+       /* lookup vrf */
+       zvrf = zebra_vrf_lookup_by_id(hdr.vrf_id);
+       if (!zvrf) {
+               if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV)
+                       zlog_warn("ZAPI message specifies unknown VRF: %d",
+                                 hdr.vrf_id);
+               return;
+       }
+
+       if (hdr.command > array_size(zserv_handlers)
+           || zserv_handlers[hdr.command] == NULL)
+               zlog_info("Zebra received unknown command %d", hdr.command);
+       else
+               zserv_handlers[hdr.command](client, &hdr, msg, zvrf);
 }
index 1658c9852d388daff40f4bb6a503306275eb65e0..f27897580a98d16e17a23092b91c3ccc15f93684 100644 (file)
  * client
  *    the client datastructure
  *
- * hdr
- *    the message header
- *
  * msg
- *    the message contents, without the header
- *
- * zvrf
- *    the vrf
+ *    the message
  */
-extern void zserv_handle_commands(struct zserv *client, struct zmsghdr *hdr,
-                                 struct stream *msg, struct zebra_vrf *zvrf);
+extern void zserv_handle_commands(struct zserv *client, struct stream *msg);
 
 extern int zsend_vrf_add(struct zserv *zclient, struct zebra_vrf *zvrf);
 extern int zsend_vrf_delete(struct zserv *zclient, struct zebra_vrf *zvrf);
index 3ad640653f92a972568ebbc69d98707c22b71895..e040808e010acf5816bd963f0e22353425d86f70 100644 (file)
@@ -463,7 +463,7 @@ static int fec_send(zebra_fec_t *fec, struct zserv *client)
        stream_put_prefix(s, &rn->p);
        stream_putl(s, fec->label);
        stream_putw_at(s, 0, stream_get_endp(s));
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /*
@@ -2916,5 +2916,5 @@ void zebra_mpls_init(void)
        if (!mpls_processq_init(&zebrad))
                mpls_enabled = 1;
 
-       hook_register(zapi_client_close, zebra_mpls_cleanup_fecs_for_client);
+       hook_register(zserv_client_close, zebra_mpls_cleanup_fecs_for_client);
 }
index 042bd3769e774c1fcaea995aeb46ddf764bd9325..3af3cd5bb2fc72ef91efd76cc52c76ed34a9cb05 100644 (file)
@@ -67,5 +67,5 @@ stream_failure:
        stream_putl(s, suc);
 
        stream_putw_at(s, 0, stream_get_endp(s));
-       zebra_server_send_message(client, s);
+       zserv_send_message(client, s);
 }
index 6a42aaecb464db307dc2bc65e31b374a5f56965d..799b85d4d1fb85821374606fc5e51e4a06f39061 100644 (file)
@@ -30,6 +30,7 @@
 #include "zebra/rt.h"
 #include "zebra/zapi_msg.h"
 #include "zebra/zebra_memory.h"
+#include "zebra/zserv.h"
 
 /* definitions */
 DEFINE_MTYPE_STATIC(ZEBRA, PBR_IPTABLE_IFNAME, "PBR interface list")
@@ -463,7 +464,7 @@ static int zebra_pbr_client_close_cleanup(struct zserv *client)
 
 void zebra_pbr_init(void)
 {
-       hook_register(zapi_client_close, zebra_pbr_client_close_cleanup);
+       hook_register(zserv_client_close, zebra_pbr_client_close_cleanup);
 }
 
 static void *pbr_ipset_alloc_intern(void *arg)
index d20f93f521a8bdf65513b6058aa64c013dea8dbc..8c23bf34cf1670c080003fafe9dc4ece86d73fb9 100644 (file)
@@ -126,7 +126,7 @@ void zebra_ptm_init(void)
 
        ptm_cb.ptm_sock = -1;
 
-       hook_register(zapi_client_close, zebra_ptm_bfd_client_deregister);
+       hook_register(zserv_client_close, zebra_ptm_bfd_client_deregister);
 }
 
 void zebra_ptm_finish(void)
index 74771476987f4b9418a2d6ca0f36bb33eef25804..815f61d157a083b4dd42789ea1e35053d6125aaf 100644 (file)
@@ -66,7 +66,7 @@ static int zsend_interface_bfd_update(int cmd, struct zserv *client,
        stream_putw_at(s, 0, stream_get_endp(s));
 
        client->if_bfd_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 void zebra_interface_bfd_update(struct interface *ifp, struct prefix *dp,
@@ -101,7 +101,7 @@ static int zsend_bfd_peer_replay(int cmd, struct zserv *client)
        stream_putw_at(s, 0, stream_get_endp(s));
 
        client->bfd_peer_replay_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 void zebra_bfd_peer_replay_req(void)
index 28e09fe1934470826c2898bf76c2c5df4bb5cb08..bf76f7e86b7d32758764e8110fc17e2e230f79ad 100644 (file)
@@ -292,7 +292,7 @@ void zebra_pw_init(struct zebra_vrf *zvrf)
        RB_INIT(zebra_pw_head, &zvrf->pseudowires);
        RB_INIT(zebra_static_pw_head, &zvrf->static_pseudowires);
 
-       hook_register(zapi_client_close, zebra_pw_client_close);
+       hook_register(zserv_client_close, zebra_pw_client_close);
 }
 
 void zebra_pw_exit(struct zebra_vrf *zvrf)
index 90c39bcc6f01e319f79da7b6121f8fdcf043587a..d482e0ab3da3ec1d796c304e3eaddf9eb031022d 100644 (file)
@@ -73,7 +73,7 @@ int zebra_rnh_ipv6_default_route = 0;
 
 void zebra_rnh_init(void)
 {
-       hook_register(zapi_client_close, zebra_client_cleanup_rnh);
+       hook_register(zserv_client_close, zebra_client_cleanup_rnh);
 }
 
 static inline struct route_table *get_rnh_table(vrf_id_t vrfid, int family,
@@ -1106,7 +1106,7 @@ static int send_client(struct rnh *rnh, struct zserv *client, rnh_type_t type,
 
        client->nh_last_upd_time = monotime(NULL);
        client->last_write_cmd = cmd;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 static void print_nh(struct nexthop *nexthop, struct vty *vty)
index f7548f618f532264d2660108c43fc6c756c4b85a..eb11941a3a5f5843aa394f0dcfcf350ac2473d78 100644 (file)
@@ -3544,7 +3544,8 @@ DEFUN_HIDDEN (zebra_packet_process,
 {
        uint32_t packets = strtoul(argv[2]->arg, NULL, 10);
 
-       zebrad.packets_to_process = packets;
+       atomic_store_explicit(&zebrad.packets_to_process, packets,
+                             memory_order_relaxed);
 
        return CMD_SUCCESS;
 }
@@ -3557,7 +3558,9 @@ DEFUN_HIDDEN (no_zebra_packet_process,
              "Zapi Protocol\n"
              "Number of packets to process before relinquishing thread\n")
 {
-       zebrad.packets_to_process = ZEBRA_ZAPI_PACKETS_TO_PROCESS;
+       atomic_store_explicit(&zebrad.packets_to_process,
+                             ZEBRA_ZAPI_PACKETS_TO_PROCESS,
+                             memory_order_relaxed);
 
        return CMD_SUCCESS;
 }
index 74c1f3f1781cc47a309ed8b25f2ddd2eefd78f7e..14644d223d317ed40260d2cde45d44632d1cfea9 100644 (file)
@@ -1195,7 +1195,7 @@ static int zvni_macip_send_msg_to_client(vni_t vni, struct ethaddr *macaddr,
        struct zserv *client = NULL;
        struct stream *s = NULL;
 
-       client = zebra_find_client(ZEBRA_ROUTE_BGP, 0);
+       client = zserv_find_client(ZEBRA_ROUTE_BGP, 0);
        /* BGP may not be running. */
        if (!client)
                return 0;
@@ -1237,7 +1237,7 @@ static int zvni_macip_send_msg_to_client(vni_t vni, struct ethaddr *macaddr,
        else
                client->macipdel_cnt++;
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /*
@@ -2779,7 +2779,7 @@ static int zvni_send_add_to_client(zebra_vni_t *zvni)
        struct zserv *client;
        struct stream *s;
 
-       client = zebra_find_client(ZEBRA_ROUTE_BGP, 0);
+       client = zserv_find_client(ZEBRA_ROUTE_BGP, 0);
        /* BGP may not be running. */
        if (!client)
                return 0;
@@ -2801,7 +2801,7 @@ static int zvni_send_add_to_client(zebra_vni_t *zvni)
                           zebra_route_string(client->proto));
 
        client->vniadd_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /*
@@ -2812,7 +2812,7 @@ static int zvni_send_del_to_client(vni_t vni)
        struct zserv *client;
        struct stream *s;
 
-       client = zebra_find_client(ZEBRA_ROUTE_BGP, 0);
+       client = zserv_find_client(ZEBRA_ROUTE_BGP, 0);
        /* BGP may not be running. */
        if (!client)
                return 0;
@@ -2831,7 +2831,7 @@ static int zvni_send_del_to_client(vni_t vni)
                           zebra_route_string(client->proto));
 
        client->vnidel_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /*
@@ -3747,7 +3747,7 @@ static int zl3vni_send_add_to_client(zebra_l3vni_t *zl3vni)
        struct ethaddr rmac;
        char buf[ETHER_ADDR_STRLEN];
 
-       client = zebra_find_client(ZEBRA_ROUTE_BGP, 0);
+       client = zserv_find_client(ZEBRA_ROUTE_BGP, 0);
        /* BGP may not be running. */
        if (!client)
                return 0;
@@ -3779,7 +3779,7 @@ static int zl3vni_send_add_to_client(zebra_l3vni_t *zl3vni)
                        zebra_route_string(client->proto));
 
        client->l3vniadd_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /*
@@ -3790,7 +3790,7 @@ static int zl3vni_send_del_to_client(zebra_l3vni_t *zl3vni)
        struct stream *s = NULL;
        struct zserv *client = NULL;
 
-       client = zebra_find_client(ZEBRA_ROUTE_BGP, 0);
+       client = zserv_find_client(ZEBRA_ROUTE_BGP, 0);
        /* BGP may not be running. */
        if (!client)
                return 0;
@@ -3809,7 +3809,7 @@ static int zl3vni_send_del_to_client(zebra_l3vni_t *zl3vni)
                           zebra_route_string(client->proto));
 
        client->l3vnidel_cnt++;
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 static void zebra_vxlan_process_l3vni_oper_up(zebra_l3vni_t *zl3vni)
@@ -3922,7 +3922,7 @@ static int ip_prefix_send_to_client(vrf_id_t vrf_id, struct prefix *p,
        struct stream *s = NULL;
        char buf[PREFIX_STRLEN];
 
-       client = zebra_find_client(ZEBRA_ROUTE_BGP, 0);
+       client = zserv_find_client(ZEBRA_ROUTE_BGP, 0);
        /* BGP may not be running. */
        if (!client)
                return 0;
@@ -3946,7 +3946,7 @@ static int ip_prefix_send_to_client(vrf_id_t vrf_id, struct prefix *p,
        else
                client->prefixdel_cnt++;
 
-       return zebra_server_send_message(client, s);
+       return zserv_send_message(client, s);
 }
 
 /* re-add remote rmac if needed */
index 7dcd654240d50eb5927101883a33365a846b1348..a099cfc057ddf941e9f694ee49d0aebd44b32deb 100644 (file)
@@ -52,6 +52,8 @@
 #include "lib/vty.h"              /* for vty_out, vty (ptr only) */
 #include "lib/zassert.h"          /* for assert */
 #include "lib/zclient.h"          /* for zmsghdr, ZEBRA_HEADER_SIZE, ZEBRA... */
+#include "lib/frr_pthread.h"      /* for frr_pthread_new, frr_pthread_stop... */
+#include "lib/frratomic.h"        /* for atomic_load_explicit, atomic_stor... */
 
 #include "zebra/debug.h"          /* for various debugging macros */
 #include "zebra/rib.h"            /* for rib_score_proto */
 #include "zebra/zserv.h"          /* for zserv */
 /* clang-format on */
 
-/* Event list of zebra. */
-enum event { ZEBRA_READ, ZEBRA_WRITE };
 /* privileges */
 extern struct zebra_privs_t zserv_privs;
-/* post event into client */
-static void zebra_event(struct zserv *client, enum event event);
-
-
-/* Public interface --------------------------------------------------------- */
-
-int zebra_server_send_message(struct zserv *client, struct stream *msg)
-{
-       stream_fifo_push(client->obuf_fifo, msg);
-       zebra_event(client, ZEBRA_WRITE);
-       return 0;
-}
-
-/* Lifecycle ---------------------------------------------------------------- */
-
-/* Hooks for client connect / disconnect */
-DEFINE_HOOK(zapi_client_connect, (struct zserv *client), (client));
-DEFINE_KOOH(zapi_client_close, (struct zserv *client), (client));
-
-/* free zebra client information. */
-static void zebra_client_free(struct zserv *client)
-{
-       hook_call(zapi_client_close, client);
-
-       /* Close file descriptor. */
-       if (client->sock) {
-               unsigned long nroutes;
-
-               close(client->sock);
-               nroutes = rib_score_proto(client->proto, client->instance);
-               zlog_notice(
-                       "client %d disconnected. %lu %s routes removed from the rib",
-                       client->sock, nroutes,
-                       zebra_route_string(client->proto));
-               client->sock = -1;
-       }
-
-       /* Free stream buffers. */
-       if (client->ibuf_work)
-               stream_free(client->ibuf_work);
-       if (client->obuf_work)
-               stream_free(client->obuf_work);
-       if (client->ibuf_fifo)
-               stream_fifo_free(client->ibuf_fifo);
-       if (client->obuf_fifo)
-               stream_fifo_free(client->obuf_fifo);
-       if (client->wb)
-               buffer_free(client->wb);
-
-       /* Release threads. */
-       if (client->t_read)
-               thread_cancel(client->t_read);
-       if (client->t_write)
-               thread_cancel(client->t_write);
-       if (client->t_suicide)
-               thread_cancel(client->t_suicide);
-
-       /* Free bitmaps. */
-       for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++)
-               for (int i = 0; i < ZEBRA_ROUTE_MAX; i++)
-                       vrf_bitmap_free(client->redist[afi][i]);
-
-       vrf_bitmap_free(client->redist_default);
-       vrf_bitmap_free(client->ifinfo);
-       vrf_bitmap_free(client->ridinfo);
-
-       XFREE(MTYPE_TMP, client);
-}
 
 /*
- * Called from client thread to terminate itself.
+ * Client thread events.
+ *
+ * These are used almost exclusively by client threads to drive their own event
+ * loops. The only exception is in zebra_client_create(), which pushes an
+ * initial ZSERV_CLIENT_READ event to start the API handler loop.
  */
-static void zebra_client_close(struct zserv *client)
-{
-       listnode_delete(zebrad.client_list, client);
-       zebra_client_free(client);
-}
-
-/* Make new client. */
-static void zebra_client_create(int sock)
-{
-       struct zserv *client;
-       int i;
-       afi_t afi;
-
-       client = XCALLOC(MTYPE_TMP, sizeof(struct zserv));
-
-       /* Make client input/output buffer. */
-       client->sock = sock;
-       client->ibuf_fifo = stream_fifo_new();
-       client->obuf_fifo = stream_fifo_new();
-       client->ibuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
-       client->obuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
-       client->wb = buffer_new(0);
-
-       /* Set table number. */
-       client->rtm_table = zebrad.rtm_table_default;
-
-       client->connect_time = monotime(NULL);
-       /* Initialize flags */
-       for (afi = AFI_IP; afi < AFI_MAX; afi++)
-               for (i = 0; i < ZEBRA_ROUTE_MAX; i++)
-                       client->redist[afi][i] = vrf_bitmap_init();
-       client->redist_default = vrf_bitmap_init();
-       client->ifinfo = vrf_bitmap_init();
-       client->ridinfo = vrf_bitmap_init();
-
-       /* by default, it's not a synchronous client */
-       client->is_synchronous = 0;
+enum zserv_client_event {
+       /* Schedule a socket read */
+       ZSERV_CLIENT_READ,
+       /* Schedule a buffer write */
+       ZSERV_CLIENT_WRITE,
+};
 
-       /* Add this client to linked list. */
-       listnode_add(zebrad.client_list, client);
-
-       zebra_vrf_update_all(client);
+/*
+ * Main thread events.
+ *
+ * These are used by client threads to notify the main thread about various
+ * events and to make processing requests.
+ */
+enum zserv_event {
+       /* Schedule listen job on Zebra API socket */
+       ZSERV_ACCEPT,
+       /* The calling client has packets on its input buffer */
+       ZSERV_PROCESS_MESSAGES,
+       /* The calling client wishes to be killed */
+       ZSERV_HANDLE_CLOSE,
+};
 
-       hook_call(zapi_client_connect, client);
+/*
+ * Zebra server event driver for all client threads.
+ *
+ * This is essentially a wrapper around thread_add_event() that centralizes
+ * those scheduling calls into one place.
+ *
+ * All calls to this function schedule an event on the pthread running the
+ * provided client.
+ *
+ * client
+ *    the client in question, and thread target
+ *
+ * event
+ *    the event to notify them about
+ */
+static void zserv_client_event(struct zserv *client,
+                              enum zserv_client_event event);
 
-       /* start read loop */
-       zebra_event(client, ZEBRA_READ);
-}
+/*
+ * Zebra server event driver for the main thread.
+ *
+ * This is essentially a wrapper around thread_add_event() that centralizes
+ * those scheduling calls into one place.
+ *
+ * All calls to this function schedule an event on Zebra's main pthread.
+ *
+ * client
+ *    the client in question
+ *
+ * event
+ *    the event to notify the main thread about
+ */
+static void zserv_event(struct zserv *client, enum zserv_event event);
 
-static int zserv_delayed_close(struct thread *thread)
-{
-       struct zserv *client = THREAD_ARG(thread);
 
-       client->t_suicide = NULL;
-       zebra_client_close(client);
-       return 0;
-}
+/* Client thread lifecycle -------------------------------------------------- */
 
 /*
  * Log zapi message to zlog.
@@ -220,176 +157,157 @@ static void zserv_log_message(const char *errmsg, struct stream *msg,
        zlog_hexdump(msg->data, STREAM_READABLE(msg));
 }
 
-static int zserv_flush_data(struct thread *thread)
+/*
+ * Gracefully shut down a client connection.
+ *
+ * Cancel any pending tasks for the client's thread. Then schedule a task on the
+ * main thread to shut down the calling thread.
+ *
+ * Must be called from the client pthread, never the main thread.
+ */
+static void zserv_client_close(struct zserv *client)
 {
-       struct zserv *client = THREAD_ARG(thread);
-
-       client->t_write = NULL;
-       if (client->t_suicide) {
-               zebra_client_close(client);
-               return -1;
-       }
-       switch (buffer_flush_available(client->wb, client->sock)) {
-       case BUFFER_ERROR:
-               zlog_warn(
-                       "%s: buffer_flush_available failed on zserv client fd %d, closing",
-                       __func__, client->sock);
-               zebra_client_close(client);
-               client = NULL;
-               break;
-       case BUFFER_PENDING:
-               client->t_write = NULL;
-               thread_add_write(zebrad.master, zserv_flush_data, client,
-                                client->sock, &client->t_write);
-               break;
-       case BUFFER_EMPTY:
-               break;
-       }
-
-       if (client)
-               client->last_write_time = monotime(NULL);
-       return 0;
+       atomic_store_explicit(&client->pthread->running, false,
+                             memory_order_seq_cst);
+       THREAD_OFF(client->t_read);
+       THREAD_OFF(client->t_write);
+       zserv_event(client, ZSERV_HANDLE_CLOSE);
 }
 
 /*
- * Write a single packet.
+ * Write all pending messages to client socket.
+ *
+ * This function first attempts to flush any buffered data. If unsuccessful,
+ * the function reschedules itself and returns. If successful, it pops all
+ * available messages from the output queue and continues to write data
+ * directly to the socket until the socket would block. If the socket never
+ * blocks and all data is written, the function returns without rescheduling
+ * itself. If the socket ends up throwing EWOULDBLOCK, the remaining data is
+ * buffered and the function reschedules itself.
+ *
+ * The utility of the buffer is that it allows us to vastly reduce lock
+ * contention by allowing us to pop *all* messages off the output queue at once
+ * instead of locking and unlocking each time we want to pop a single message
+ * off the queue. The same thing could arguably be accomplished faster by
+ * allowing the main thread to write directly into the buffer instead of
+ * enqueuing packets onto an intermediary queue, but the intermediary queue
+ * allows us to expose information about input and output queues to the user in
+ * terms of number of packets rather than size of data.
  */
 static int zserv_write(struct thread *thread)
 {
        struct zserv *client = THREAD_ARG(thread);
        struct stream *msg;
-       int writerv;
-
-       if (client->t_suicide)
-               return -1;
-
-       if (client->is_synchronous)
-               return 0;
-
-       msg = stream_fifo_pop(client->obuf_fifo);
-       stream_set_getp(msg, 0);
-       client->last_write_cmd = stream_getw_from(msg, 6);
+       uint32_t wcmd;
+       struct stream_fifo *cache;
 
-       writerv = buffer_write(client->wb, client->sock, STREAM_DATA(msg),
-                              stream_get_endp(msg));
-
-       stream_free(msg);
-
-       switch (writerv) {
+       /* If we have any data pending, try to flush it first */
+       switch (buffer_flush_all(client->wb, client->sock)) {
        case BUFFER_ERROR:
-               zlog_warn(
-                       "%s: buffer_write failed to zserv client fd %d, closing",
-                       __func__, client->sock);
-               /*
-                * Schedule a delayed close since many of the functions that
-                * call this one do not check the return code. They do not
-                * allow for the possibility that an I/O error may have caused
-                * the client to be deleted.
-                */
-               client->t_suicide = NULL;
-               thread_add_event(zebrad.master, zserv_delayed_close, client, 0,
-                                &client->t_suicide);
-               return -1;
-       case BUFFER_EMPTY:
-               THREAD_OFF(client->t_write);
-               break;
+               goto zwrite_fail;
        case BUFFER_PENDING:
-               thread_add_write(zebrad.master, zserv_flush_data, client,
-                                client->sock, &client->t_write);
+               atomic_store_explicit(&client->last_write_time,
+                                     (uint32_t)monotime(NULL),
+                                     memory_order_relaxed);
+               zserv_client_event(client, ZSERV_CLIENT_WRITE);
+               return 0;
+       case BUFFER_EMPTY:
                break;
        }
 
-       if (client->obuf_fifo->count)
-               zebra_event(client, ZEBRA_WRITE);
-
-       client->last_write_time = monotime(NULL);
-       return 0;
-}
-
-#if defined(HANDLE_ZAPI_FUZZING)
-static void zserv_write_incoming(struct stream *orig, uint16_t command)
-{
-       char fname[MAXPATHLEN];
-       struct stream *copy;
-       int fd = -1;
+       cache = stream_fifo_new();
 
-       copy = stream_dup(orig);
-       stream_set_getp(copy, 0);
+       pthread_mutex_lock(&client->obuf_mtx);
+       {
+               while (stream_fifo_head(client->obuf_fifo))
+                       stream_fifo_push(cache,
+                                        stream_fifo_pop(client->obuf_fifo));
+       }
+       pthread_mutex_unlock(&client->obuf_mtx);
 
-       zserv_privs.change(ZPRIVS_RAISE);
-       snprintf(fname, MAXPATHLEN, "%s/%u", DAEMON_VTY_DIR, command);
-       fd = open(fname, O_CREAT | O_WRONLY | O_EXCL, 0644);
-       stream_flush(copy, fd);
-       close(fd);
-       zserv_privs.change(ZPRIVS_LOWER);
-       stream_free(copy);
-}
-#endif
+       if (cache->tail) {
+               msg = cache->tail;
+               stream_set_getp(msg, 0);
+               wcmd = stream_getw_from(msg, 6);
+       }
 
-static int zserv_process_messages(struct thread *thread)
-{
-       struct zserv *client = THREAD_ARG(thread);
-       struct zebra_vrf *zvrf;
-       struct zmsghdr hdr;
-       struct stream *msg;
-       bool hdrvalid;
+       while (stream_fifo_head(cache)) {
+               msg = stream_fifo_pop(cache);
+               buffer_put(client->wb, STREAM_DATA(msg), stream_get_endp(msg));
+               stream_free(msg);
+       }
 
-       do {
-               msg = stream_fifo_pop(client->ibuf_fifo);
+       stream_fifo_free(cache);
 
-               /* break if out of messages */
-               if (!msg)
-                       continue;
+       /* If we have any data pending, try to flush it first */
+       switch (buffer_flush_all(client->wb, client->sock)) {
+       case BUFFER_ERROR:
+               goto zwrite_fail;
+       case BUFFER_PENDING:
+               atomic_store_explicit(&client->last_write_time,
+                                     (uint32_t)monotime(NULL),
+                                     memory_order_relaxed);
+               zserv_client_event(client, ZSERV_CLIENT_WRITE);
+               return 0;
+       case BUFFER_EMPTY:
+               break;
+       }
 
-               /* read & check header */
-               hdrvalid = zapi_parse_header(msg, &hdr);
-               if (!hdrvalid && IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) {
-                       const char *emsg = "Message has corrupt header";
-                       zserv_log_message(emsg, msg, NULL);
-               }
-               if (!hdrvalid)
-                       continue;
-
-               hdr.length -= ZEBRA_HEADER_SIZE;
-               /* lookup vrf */
-               zvrf = zebra_vrf_lookup_by_id(hdr.vrf_id);
-               if (!zvrf && IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV) {
-                       const char *emsg = "Message specifies unknown VRF";
-                       zserv_log_message(emsg, msg, &hdr);
-               }
-               if (!zvrf)
-                       continue;
+       atomic_store_explicit(&client->last_write_cmd, wcmd,
+                             memory_order_relaxed);
 
-               /* process commands */
-               zserv_handle_commands(client, &hdr, msg, zvrf);
+       atomic_store_explicit(&client->last_write_time,
+                             (uint32_t)monotime(NULL), memory_order_relaxed);
 
-       } while (msg);
+       return 0;
 
+zwrite_fail:
+       zlog_warn("%s: could not write to %s [fd = %d], closing.", __func__,
+                 zebra_route_string(client->proto), client->sock);
+       zserv_client_close(client);
        return 0;
 }
 
-/* Handler of zebra service request. */
+/*
+ * Read and process data from a client socket.
+ *
+ * The responsibilities here are to read raw data from the client socket,
+ * validate the header, encapsulate it into a single stream object, push it
+ * onto the input queue and then notify the main thread that there is new data
+ * available.
+ *
+ * This function first looks for any data in the client structure's working
+ * input buffer. If data is present, it is assumed that reading stopped in a
+ * previous invocation of this task and needs to be resumed to finish a message.
+ * Otherwise, the socket data stream is assumed to be at the beginning of a new
+ * ZAPI message (specifically at the header). The header is read and validated.
+ * If the header passed validation then the length field found in the header is
+ * used to compute the total length of the message. That much data is read (but
+ * not inspected), appended to the header, placed into a stream and pushed onto
+ * the client's input queue. A task is then scheduled on the main thread to
+ * process the client's input queue. Finally, if all of this was successful,
+ * this task reschedules itself.
+ *
+ * Any failure in any of these actions is handled by terminating the client.
+ */
 static int zserv_read(struct thread *thread)
 {
+       struct zserv *client = THREAD_ARG(thread);
        int sock;
-       struct zserv *client;
        size_t already;
-#if defined(HANDLE_ZAPI_FUZZING)
-       int packets = 1;
-#else
-       int packets = zebrad.packets_to_process;
-#endif
-       /* Get thread data.  Reset reading thread because I'm running. */
-       sock = THREAD_FD(thread);
-       client = THREAD_ARG(thread);
+       struct stream_fifo *cache;
+       uint32_t p2p_orig;
 
-       if (client->t_suicide) {
-               zebra_client_close(client);
-               return -1;
-       }
+       uint32_t p2p;
+       struct zmsghdr hdr;
 
-       while (packets) {
-               struct zmsghdr hdr;
+       p2p_orig = atomic_load_explicit(&zebrad.packets_to_process,
+                                       memory_order_relaxed);
+       cache = stream_fifo_new();
+       p2p = p2p_orig;
+       sock = THREAD_FD(thread);
+
+       while (p2p) {
                ssize_t nb;
                bool hdrvalid;
                char errmsg[256];
@@ -449,6 +367,7 @@ static int zserv_read(struct thread *thread)
                                "Message has corrupt header\n%s: socket %d message length %u exceeds buffer size %lu",
                                __func__, sock, hdr.length,
                                (unsigned long)STREAM_SIZE(client->ibuf_work));
+                       zserv_log_message(errmsg, client->ibuf_work, &hdr);
                        goto zread_fail;
                }
 
@@ -468,10 +387,6 @@ static int zserv_read(struct thread *thread)
                        }
                }
 
-#if defined(HANDLE_ZAPI_FUZZING)
-               zserv_write_incoming(client->ibuf_work, command);
-#endif
-
                /* Debug packet information. */
                if (IS_ZEBRA_DEBUG_EVENT)
                        zlog_debug("zebra message comes from socket [%d]",
@@ -480,55 +395,319 @@ static int zserv_read(struct thread *thread)
                if (IS_ZEBRA_DEBUG_PACKET && IS_ZEBRA_DEBUG_RECV)
                        zserv_log_message(NULL, client->ibuf_work, &hdr);
 
-               client->last_read_time = monotime(NULL);
-               client->last_read_cmd = hdr.command;
-
                stream_set_getp(client->ibuf_work, 0);
                struct stream *msg = stream_dup(client->ibuf_work);
 
-               stream_fifo_push(client->ibuf_fifo, msg);
+               stream_fifo_push(cache, msg);
+               stream_reset(client->ibuf_work);
+               p2p--;
+       }
 
-               if (client->t_suicide)
-                       goto zread_fail;
+       if (p2p < p2p_orig) {
+               /* update session statistics */
+               atomic_store_explicit(&client->last_read_time, monotime(NULL),
+                                     memory_order_relaxed);
+               atomic_store_explicit(&client->last_read_cmd, hdr.command,
+                                     memory_order_relaxed);
+
+               /* publish read packets on client's input queue */
+               pthread_mutex_lock(&client->ibuf_mtx);
+               {
+                       while (cache->head)
+                               stream_fifo_push(client->ibuf_fifo,
+                                                stream_fifo_pop(cache));
+               }
+               pthread_mutex_unlock(&client->ibuf_mtx);
+
+               /* Schedule job to process those packets */
+               zserv_event(client, ZSERV_PROCESS_MESSAGES);
 
-               --packets;
-               stream_reset(client->ibuf_work);
        }
 
        if (IS_ZEBRA_DEBUG_PACKET)
-               zlog_debug("Read %d packets",
-                          zebrad.packets_to_process - packets);
-
-       /* Schedule job to process those packets */
-       thread_add_event(zebrad.master, &zserv_process_messages, client, 0,
-                        NULL);
+               zlog_debug("Read %d packets", p2p_orig - p2p);
 
        /* Reschedule ourselves */
-       zebra_event(client, ZEBRA_READ);
+       zserv_client_event(client, ZSERV_CLIENT_READ);
+
+       stream_fifo_free(cache);
 
        return 0;
 
 zread_fail:
-       zebra_client_close(client);
+       stream_fifo_free(cache);
+       zserv_client_close(client);
        return -1;
 }
 
-static void zebra_event(struct zserv *client, enum event event)
+static void zserv_client_event(struct zserv *client,
+                              enum zserv_client_event event)
 {
        switch (event) {
-       case ZEBRA_READ:
-               thread_add_read(zebrad.master, zserv_read, client, client->sock,
-                               &client->t_read);
+       case ZSERV_CLIENT_READ:
+               thread_add_read(client->pthread->master, zserv_read, client,
+                               client->sock, &client->t_read);
                break;
-       case ZEBRA_WRITE:
-               thread_add_write(zebrad.master, zserv_write, client,
+       case ZSERV_CLIENT_WRITE:
+               thread_add_write(client->pthread->master, zserv_write, client,
                                 client->sock, &client->t_write);
                break;
        }
 }
 
-/* Accept code of zebra server socket. */
-static int zebra_accept(struct thread *thread)
+/* Main thread lifecycle ---------------------------------------------------- */
+
+/*
+ * Read and process messages from a client.
+ *
+ * This task runs on the main pthread. It is scheduled by client pthreads when
+ * they have new messages available on their input queues. The client is passed
+ * as the task argument.
+ *
+ * Each message is popped off the client's input queue and the action associated
+ * with the message is executed. This proceeds until there are no more messages,
+ * an error occurs, or the processing limit is reached.
+ *
+ * The client's I/O thread can push at most zebrad.packets_to_process messages
+ * onto the input buffer before notifying us there are packets to read. As long
+ * as we always process zebrad.packets_to_process messages here, then we can
+ * rely on the read thread to handle queuing this task enough times to process
+ * everything on the input queue.
+ */
+static int zserv_process_messages(struct thread *thread)
+{
+       struct zserv *client = THREAD_ARG(thread);
+       struct stream *msg;
+       struct stream_fifo *cache = stream_fifo_new();
+
+       uint32_t p2p = zebrad.packets_to_process;
+
+       pthread_mutex_lock(&client->ibuf_mtx);
+       {
+               uint32_t i;
+               for (i = 0; i < p2p && stream_fifo_head(client->ibuf_fifo);
+                    ++i) {
+                       msg = stream_fifo_pop(client->ibuf_fifo);
+                       stream_fifo_push(cache, msg);
+               }
+
+               msg = NULL;
+       }
+       pthread_mutex_unlock(&client->ibuf_mtx);
+
+       while (stream_fifo_head(cache)) {
+               msg = stream_fifo_pop(cache);
+               zserv_handle_commands(client, msg);
+               stream_free(msg);
+       }
+
+       stream_fifo_free(cache);
+
+       return 0;
+}
+
+int zserv_send_message(struct zserv *client, struct stream *msg)
+{
+       /*
+        * This is a somewhat poorly named variable added with Zebra's portion
+        * of the label manager. That component does not use the regular
+        * zserv/zapi_msg interface for handling its messages, as the client
+        * itself runs in-process. Instead it uses synchronous writes on the
+        * zserv client's socket directly in the zread* handlers for its
+        * message types. Furthermore, it cannot handle the usual messages
+        * Zebra sends (such as those for interface changes) and so has added
+        * this flag and check here as a hack to suppress all messages that it
+        * does not explicitly know about.
+        *
+        * In any case this needs to be cleaned up at some point.
+        *
+        * See also:
+        *    zread_label_manager_request
+        *    zsend_label_manager_connect_response
+        *    zsend_assign_label_chunk_response
+        *    ...
+        */
+       if (client->is_synchronous)
+               return 0;
+
+       pthread_mutex_lock(&client->obuf_mtx);
+       {
+               stream_fifo_push(client->obuf_fifo, msg);
+       }
+       pthread_mutex_unlock(&client->obuf_mtx);
+
+       zserv_client_event(client, ZSERV_CLIENT_WRITE);
+
+       return 0;
+}
+
+
+/* Hooks for client connect / disconnect */
+DEFINE_HOOK(zserv_client_connect, (struct zserv *client), (client));
+DEFINE_KOOH(zserv_client_close, (struct zserv *client), (client));
+
+/*
+ * Deinitialize zebra client.
+ *
+ * - Deregister and deinitialize related internal resources
+ * - Gracefully close socket
+ * - Free associated resources
+ * - Free client structure
+ *
+ * This does *not* take any action on the struct thread * fields. These are
+ * managed by the owning pthread and any tasks associated with them must have
+ * been stopped prior to invoking this function.
+ */
+static void zserv_client_free(struct zserv *client)
+{
+       hook_call(zserv_client_close, client);
+
+       /* Close file descriptor. */
+       if (client->sock) {
+               unsigned long nroutes;
+
+               close(client->sock);
+               nroutes = rib_score_proto(client->proto, client->instance);
+               zlog_notice(
+                       "client %d disconnected. %lu %s routes removed from the rib",
+                       client->sock, nroutes,
+                       zebra_route_string(client->proto));
+               client->sock = -1;
+       }
+
+       /* Free stream buffers. */
+       if (client->ibuf_work)
+               stream_free(client->ibuf_work);
+       if (client->obuf_work)
+               stream_free(client->obuf_work);
+       if (client->ibuf_fifo)
+               stream_fifo_free(client->ibuf_fifo);
+       if (client->obuf_fifo)
+               stream_fifo_free(client->obuf_fifo);
+       if (client->wb)
+               buffer_free(client->wb);
+
+       /* Free buffer mutexes */
+       pthread_mutex_destroy(&client->obuf_mtx);
+       pthread_mutex_destroy(&client->ibuf_mtx);
+
+       /* Free bitmaps. */
+       for (afi_t afi = AFI_IP; afi < AFI_MAX; afi++)
+               for (int i = 0; i < ZEBRA_ROUTE_MAX; i++)
+                       vrf_bitmap_free(client->redist[afi][i]);
+
+       vrf_bitmap_free(client->redist_default);
+       vrf_bitmap_free(client->ifinfo);
+       vrf_bitmap_free(client->ridinfo);
+
+       XFREE(MTYPE_TMP, client);
+}
+
+/*
+ * Finish closing a client.
+ *
+ * This task is scheduled by a ZAPI client pthread on the main pthread when it
+ * wants to stop itself. When this executes, the client connection should
+ * already have been closed. This task's responsibility is to gracefully
+ * terminate the client thread, update relevant internal datastructures and
+ * free any resources allocated by the main thread.
+ */
+static int zserv_handle_client_close(struct thread *thread)
+{
+       struct zserv *client = THREAD_ARG(thread);
+
+       /*
+        * Ensure these have been nulled. This does not equate to the
+        * associated task(s) being scheduled or unscheduled on the client
+        * pthread's threadmaster.
+        */
+       assert(!client->t_read);
+       assert(!client->t_write);
+
+       /* synchronously stop thread */
+       frr_pthread_stop(client->pthread, NULL);
+
+       /* destroy frr_pthread */
+       frr_pthread_destroy(client->pthread);
+       client->pthread = NULL;
+
+       listnode_delete(zebrad.client_list, client);
+       zserv_client_free(client);
+       return 0;
+}
+
+/*
+ * Create a new client.
+ *
+ * This is called when a new connection is accept()'d on the ZAPI socket. It
+ * initializes new client structure, notifies any subscribers of the connection
+ * event and spawns the client's thread.
+ *
+ * sock
+ *    client's socket file descriptor
+ */
+static void zserv_client_create(int sock)
+{
+       struct zserv *client;
+       int i;
+       afi_t afi;
+
+       client = XCALLOC(MTYPE_TMP, sizeof(struct zserv));
+
+       /* Make client input/output buffer. */
+       client->sock = sock;
+       client->ibuf_fifo = stream_fifo_new();
+       client->obuf_fifo = stream_fifo_new();
+       client->ibuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
+       client->obuf_work = stream_new(ZEBRA_MAX_PACKET_SIZ);
+       pthread_mutex_init(&client->ibuf_mtx, NULL);
+       pthread_mutex_init(&client->obuf_mtx, NULL);
+       client->wb = buffer_new(0);
+
+       /* Set table number. */
+       client->rtm_table = zebrad.rtm_table_default;
+
+       atomic_store_explicit(&client->connect_time, (uint32_t) monotime(NULL),
+                             memory_order_relaxed);
+
+       /* Initialize flags */
+       for (afi = AFI_IP; afi < AFI_MAX; afi++)
+               for (i = 0; i < ZEBRA_ROUTE_MAX; i++)
+                       client->redist[afi][i] = vrf_bitmap_init();
+       client->redist_default = vrf_bitmap_init();
+       client->ifinfo = vrf_bitmap_init();
+       client->ridinfo = vrf_bitmap_init();
+
+       /* by default, it's not a synchronous client */
+       client->is_synchronous = 0;
+
+       /* Add this client to linked list. */
+       listnode_add(zebrad.client_list, client);
+
+       struct frr_pthread_attr zclient_pthr_attrs = {
+               .id = frr_pthread_get_id(),
+               .start = frr_pthread_attr_default.start,
+               .stop = frr_pthread_attr_default.stop
+       };
+       client->pthread =
+               frr_pthread_new(&zclient_pthr_attrs, "Zebra API client thread");
+
+       zebra_vrf_update_all(client);
+
+       /* start read loop */
+       zserv_client_event(client, ZSERV_CLIENT_READ);
+
+       /* call callbacks */
+       hook_call(zserv_client_connect, client);
+
+       /* start pthread */
+       frr_pthread_run(client->pthread, NULL);
+}
+
+/*
+ * Accept socket connection.
+ */
+static int zserv_accept(struct thread *thread)
 {
        int accept_sock;
        int client_sock;
@@ -538,7 +717,7 @@ static int zebra_accept(struct thread *thread)
        accept_sock = THREAD_FD(thread);
 
        /* Reregister myself. */
-       thread_add_read(zebrad.master, zebra_accept, NULL, accept_sock, NULL);
+       zserv_event(NULL, ZSERV_ACCEPT);
 
        len = sizeof(struct sockaddr_in);
        client_sock = accept(accept_sock, (struct sockaddr *)&client, &len);
@@ -553,16 +732,14 @@ static int zebra_accept(struct thread *thread)
        set_nonblocking(client_sock);
 
        /* Create new zebra client. */
-       zebra_client_create(client_sock);
+       zserv_client_create(client_sock);
 
        return 0;
 }
 
-/* Make zebra server socket, wiping any existing one (see bug #403). */
-void zebra_zserv_socket_init(char *path)
+void zserv_start(char *path)
 {
        int ret;
-       int sock;
        mode_t old_mask;
        struct sockaddr_storage sa;
        socklen_t sa_len;
@@ -575,8 +752,8 @@ void zebra_zserv_socket_init(char *path)
        old_mask = umask(0077);
 
        /* Make UNIX domain socket. */
-       sock = socket(sa.ss_family, SOCK_STREAM, 0);
-       if (sock < 0) {
+       zebrad.sock = socket(sa.ss_family, SOCK_STREAM, 0);
+       if (zebrad.sock < 0) {
                zlog_warn("Can't create zserv socket: %s",
                          safe_strerror(errno));
                zlog_warn(
@@ -585,8 +762,8 @@ void zebra_zserv_socket_init(char *path)
        }
 
        if (sa.ss_family != AF_UNIX) {
-               sockopt_reuseaddr(sock);
-               sockopt_reuseport(sock);
+               sockopt_reuseaddr(zebrad.sock);
+               sockopt_reuseport(zebrad.sock);
        } else {
                struct sockaddr_un *suna = (struct sockaddr_un *)&sa;
                if (suna->sun_path[0])
@@ -594,40 +771,62 @@ void zebra_zserv_socket_init(char *path)
        }
 
        zserv_privs.change(ZPRIVS_RAISE);
-       setsockopt_so_recvbuf(sock, 1048576);
-       setsockopt_so_sendbuf(sock, 1048576);
+       setsockopt_so_recvbuf(zebrad.sock, 1048576);
+       setsockopt_so_sendbuf(zebrad.sock, 1048576);
        zserv_privs.change(ZPRIVS_LOWER);
 
        if (sa.ss_family != AF_UNIX && zserv_privs.change(ZPRIVS_RAISE))
                zlog_err("Can't raise privileges");
 
-       ret = bind(sock, (struct sockaddr *)&sa, sa_len);
+       ret = bind(zebrad.sock, (struct sockaddr *)&sa, sa_len);
        if (ret < 0) {
                zlog_warn("Can't bind zserv socket on %s: %s", path,
                          safe_strerror(errno));
                zlog_warn(
                        "zebra can't provide full functionality due to above error");
-               close(sock);
+               close(zebrad.sock);
+               zebrad.sock = -1;
                return;
        }
        if (sa.ss_family != AF_UNIX && zserv_privs.change(ZPRIVS_LOWER))
                zlog_err("Can't lower privileges");
 
-       ret = listen(sock, 5);
+       ret = listen(zebrad.sock, 5);
        if (ret < 0) {
                zlog_warn("Can't listen to zserv socket %s: %s", path,
                          safe_strerror(errno));
                zlog_warn(
                        "zebra can't provide full functionality due to above error");
-               close(sock);
+               close(zebrad.sock);
+               zebrad.sock = -1;
                return;
        }
 
        umask(old_mask);
 
-       thread_add_read(zebrad.master, zebra_accept, NULL, sock, NULL);
+       zserv_event(NULL, ZSERV_ACCEPT);
+}
+
+void zserv_event(struct zserv *client, enum zserv_event event)
+{
+       switch (event) {
+       case ZSERV_ACCEPT:
+               thread_add_read(zebrad.master, zserv_accept, NULL, zebrad.sock,
+                               NULL);
+               break;
+       case ZSERV_PROCESS_MESSAGES:
+               thread_add_event(zebrad.master, zserv_process_messages, client,
+                                0, NULL);
+               break;
+       case ZSERV_HANDLE_CLOSE:
+               thread_add_event(zebrad.master, zserv_handle_client_close,
+                                client, 0, NULL);
+       }
 }
 
+
+/* General purpose ---------------------------------------------------------- */
+
 #define ZEBRA_TIME_BUF 32
 static char *zserv_time_buf(time_t *time1, char *buf, int buflen)
 {
@@ -663,6 +862,8 @@ static void zebra_show_client_detail(struct vty *vty, struct zserv *client)
 {
        char cbuf[ZEBRA_TIME_BUF], rbuf[ZEBRA_TIME_BUF];
        char wbuf[ZEBRA_TIME_BUF], nhbuf[ZEBRA_TIME_BUF], mbuf[ZEBRA_TIME_BUF];
+       time_t connect_time, last_read_time, last_write_time;
+       uint16_t last_read_cmd, last_write_cmd;
 
        vty_out(vty, "Client: %s", zebra_route_string(client->proto));
        if (client->instance)
@@ -673,8 +874,11 @@ static void zebra_show_client_detail(struct vty *vty, struct zserv *client)
        vty_out(vty, "FD: %d \n", client->sock);
        vty_out(vty, "Route Table ID: %d \n", client->rtm_table);
 
+       connect_time = (time_t) atomic_load_explicit(&client->connect_time,
+                                                    memory_order_relaxed);
+
        vty_out(vty, "Connect Time: %s \n",
-               zserv_time_buf(&client->connect_time, cbuf, ZEBRA_TIME_BUF));
+               zserv_time_buf(&connect_time, cbuf, ZEBRA_TIME_BUF));
        if (client->nh_reg_time) {
                vty_out(vty, "Nexthop Registry Time: %s \n",
                        zserv_time_buf(&client->nh_reg_time, nhbuf,
@@ -688,16 +892,26 @@ static void zebra_show_client_detail(struct vty *vty, struct zserv *client)
        } else
                vty_out(vty, "Not registered for Nexthop Updates\n");
 
+       last_read_time = (time_t)atomic_load_explicit(&client->last_read_time,
+                                                     memory_order_relaxed);
+       last_write_time = (time_t)atomic_load_explicit(&client->last_write_time,
+                                                      memory_order_relaxed);
+
+       last_read_cmd = atomic_load_explicit(&client->last_read_cmd,
+                                            memory_order_relaxed);
+       last_write_cmd = atomic_load_explicit(&client->last_write_cmd,
+                                             memory_order_relaxed);
+
        vty_out(vty, "Last Msg Rx Time: %s \n",
-               zserv_time_buf(&client->last_read_time, rbuf, ZEBRA_TIME_BUF));
+               zserv_time_buf(&last_read_time, rbuf, ZEBRA_TIME_BUF));
        vty_out(vty, "Last Msg Tx Time: %s \n",
-               zserv_time_buf(&client->last_write_time, wbuf, ZEBRA_TIME_BUF));
-       if (client->last_read_time)
+               zserv_time_buf(&last_write_time, wbuf, ZEBRA_TIME_BUF));
+       if (last_read_cmd)
                vty_out(vty, "Last Rcvd Cmd: %s \n",
-                       zserv_command_string(client->last_read_cmd));
-       if (client->last_write_time)
+                       zserv_command_string(last_read_cmd));
+       if (last_write_cmd)
                vty_out(vty, "Last Sent Cmd: %s \n",
-                       zserv_command_string(client->last_write_cmd));
+                       zserv_command_string(last_write_cmd));
        vty_out(vty, "\n");
 
        vty_out(vty, "Type        Add        Update     Del \n");
@@ -731,19 +945,27 @@ static void zebra_show_client_brief(struct vty *vty, struct zserv *client)
 {
        char cbuf[ZEBRA_TIME_BUF], rbuf[ZEBRA_TIME_BUF];
        char wbuf[ZEBRA_TIME_BUF];
+       time_t connect_time, last_read_time, last_write_time;
+
+       connect_time = (time_t)atomic_load_explicit(&client->connect_time,
+                                                   memory_order_relaxed);
+       last_read_time = (time_t)atomic_load_explicit(&client->last_read_time,
+                                                     memory_order_relaxed);
+       last_write_time = (time_t)atomic_load_explicit(&client->last_write_time,
+                                                      memory_order_relaxed);
 
        vty_out(vty, "%-8s%12s %12s%12s%8d/%-8d%8d/%-8d\n",
                zebra_route_string(client->proto),
-               zserv_time_buf(&client->connect_time, cbuf, ZEBRA_TIME_BUF),
-               zserv_time_buf(&client->last_read_time, rbuf, ZEBRA_TIME_BUF),
-               zserv_time_buf(&client->last_write_time, wbuf, ZEBRA_TIME_BUF),
+               zserv_time_buf(&connect_time, cbuf, ZEBRA_TIME_BUF),
+               zserv_time_buf(&last_read_time, rbuf, ZEBRA_TIME_BUF),
+               zserv_time_buf(&last_write_time, wbuf, ZEBRA_TIME_BUF),
                client->v4_route_add_cnt + client->v4_route_upd8_cnt,
                client->v4_route_del_cnt,
                client->v6_route_add_cnt + client->v6_route_upd8_cnt,
                client->v6_route_del_cnt);
 }
 
-struct zserv *zebra_find_client(uint8_t proto, unsigned short instance)
+struct zserv *zserv_find_client(uint8_t proto, unsigned short instance)
 {
        struct listnode *node, *nnode;
        struct zserv *client;
@@ -805,13 +1027,17 @@ void zserv_read_file(char *input)
        struct thread t;
 
        zebra_client_create(-1);
-       client = zebrad.client_list->head->data;
+
+       frr_pthread_stop(client->pthread, NULL);
+       frr_pthread_destroy(client->pthread);
+       client->pthread = NULL;
+
        t.arg = client;
 
        fd = open(input, O_RDONLY | O_NONBLOCK);
        t.u.fd = fd;
 
-       zebra_client_read(&t);
+       zserv_read(&t);
 
        close(fd);
 }
@@ -821,7 +1047,10 @@ void zserv_init(void)
 {
        /* Client list init. */
        zebrad.client_list = list_new();
-       zebrad.client_list->del = (void (*)(void *))zebra_client_free;
+       zebrad.client_list->del = (void (*)(void *)) zserv_client_free;
+
+       /* Misc init. */
+       zebrad.sock = -1;
 
        install_element(ENABLE_NODE, &show_zebra_client_cmd);
        install_element(ENABLE_NODE, &show_zebra_client_summary_cmd);
index a5b5acbb3346ea6473c6f6eb3e2f7aba9860153d..78cc200fa8c4576aff24a4669ebba1e1143e44ac 100644 (file)
 
 /* Client structure. */
 struct zserv {
+       /* Client pthread */
+       struct frr_pthread *pthread;
+
        /* Client file descriptor. */
        int sock;
 
        /* Input/output buffer to the client. */
+       pthread_mutex_t ibuf_mtx;
        struct stream_fifo *ibuf_fifo;
+       pthread_mutex_t obuf_mtx;
        struct stream_fifo *obuf_fifo;
 
        /* Private I/O buffers */
@@ -68,9 +73,6 @@ struct zserv {
        struct thread *t_read;
        struct thread *t_write;
 
-       /* Thread for delayed close. */
-       struct thread *t_suicide;
-
        /* default routing table this client munges */
        int rtm_table;
 
@@ -129,15 +131,28 @@ struct zserv {
        uint32_t prefixadd_cnt;
        uint32_t prefixdel_cnt;
 
-       time_t connect_time;
-       time_t last_read_time;
-       time_t last_write_time;
        time_t nh_reg_time;
        time_t nh_dereg_time;
        time_t nh_last_upd_time;
 
-       int last_read_cmd;
-       int last_write_cmd;
+       /*
+        * Session information.
+        *
+        * These are not synchronous with respect to each other. For instance,
+        * last_read_cmd may contain a value that has been read in the future
+        * relative to last_read_time.
+        */
+
+       /* monotime of client creation */
+       _Atomic uint32_t connect_time;
+       /* monotime of last message received */
+       _Atomic uint32_t last_read_time;
+       /* monotime of last message sent */
+       _Atomic uint32_t last_write_time;
+       /* command code of last message read */
+       _Atomic uint16_t last_read_cmd;
+       /* command code of last message written */
+       _Atomic uint16_t last_write_cmd;
 };
 
 #define ZAPI_HANDLER_ARGS                                                      \
@@ -145,8 +160,8 @@ struct zserv {
                struct zebra_vrf *zvrf
 
 /* Hooks for client connect / disconnect */
-DECLARE_HOOK(zapi_client_connect, (struct zserv *client), (client));
-DECLARE_KOOH(zapi_client_close, (struct zserv *client), (client));
+DECLARE_HOOK(zserv_client_connect, (struct zserv *client), (client));
+DECLARE_KOOH(zserv_client_close, (struct zserv *client), (client));
 
 /* Zebra instance */
 struct zebra_t {
@@ -154,6 +169,9 @@ struct zebra_t {
        struct thread_master *master;
        struct list *client_list;
 
+       /* Socket */
+       int sock;
+
        /* default table */
        uint32_t rtm_table_default;
 
@@ -165,18 +183,54 @@ struct zebra_t {
        /* LSP work queue */
        struct work_queue *lsp_process_q;
 
-#define ZEBRA_ZAPI_PACKETS_TO_PROCESS 10
-       uint32_t packets_to_process;
+#define ZEBRA_ZAPI_PACKETS_TO_PROCESS 1000
+       _Atomic uint32_t packets_to_process;
 };
 extern struct zebra_t zebrad;
 extern unsigned int multipath_num;
 
-/* Prototypes. */
+/*
+ * Initialize Zebra API server.
+ *
+ * Installs CLI commands and creates the client list.
+ */
 extern void zserv_init(void);
-extern void zebra_zserv_socket_init(char *path);
-extern int zebra_server_send_message(struct zserv *client, struct stream *msg);
 
-extern struct zserv *zebra_find_client(uint8_t proto, unsigned short instance);
+/*
+ * Start Zebra API server.
+ *
+ * Allocates resources, creates the server socket and begins listening on the
+ * socket.
+ *
+ * path
+ *    where to place the Unix domain socket
+ */
+extern void zserv_start(char *path);
+
+/*
+ * Send a message to a connected Zebra API client.
+ *
+ * client
+ *    the client to send to
+ *
+ * msg
+ *    the message to send
+ */
+extern int zserv_send_message(struct zserv *client, struct stream *msg);
+
+/*
+ * Retrieve a client by its protocol and instance number.
+ *
+ * proto
+ *    protocol number
+ *
+ * instance
+ *    instance number
+ *
+ * Returns:
+ *    The Zebra API client.
+ */
+extern struct zserv *zserv_find_client(uint8_t proto, unsigned short instance);
 
 #if defined(HANDLE_ZAPI_FUZZING)
 extern void zserv_read_file(char *input);