]> git.proxmox.com Git - mirror_frr.git/commitdiff
zebra: handle OPAQUE registrations and forward messages
authorMark Stapp <mjs@voltanet.io>
Tue, 21 Apr 2020 12:41:35 +0000 (08:41 -0400)
committerMark Stapp <mjs@voltanet.io>
Tue, 2 Jun 2020 12:22:24 +0000 (08:22 -0400)
Add initial support to maintain client daemon registrations for
OPAQUE messages. Use the registered zapi client info to forward
copies of OPAQUE messages sent to zebra.

Signed-off-by: Mark Stapp <mjs@voltanet.io>
zebra/zebra_opaque.c

index 570387e78528343946bce385bf5e245518f8f0b9..d1e0497154d20f97c9962a3b81197edca0cb7c5f 100644 (file)
 #include "lib/stream.h"
 #include "zebra/debug.h"
 #include "zebra/zserv.h"
+#include "zebra/zebra_memory.h"
 #include "zebra/zebra_opaque.h"
 
+/* Mem type */
+DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
+
+/* Hash to hold message registration info from zapi clients */
+PREDECL_HASH(opq_regh);
+
+/* Registered client info */
+struct opq_client_reg {
+       int proto;
+       int instance;
+       uint32_t session_id;
+
+       struct opq_client_reg *next;
+       struct opq_client_reg *prev;
+};
+
+/* Opaque message registration info */
+struct opq_msg_reg {
+       struct opq_regh_item item;
+
+       /* Message type */
+       uint32_t type;
+
+       struct opq_client_reg *clients;
+};
+
+/* Registration helper prototypes */
+static uint32_t registration_hash(const struct opq_msg_reg *reg);
+static int registration_compare(const struct opq_msg_reg *reg1,
+                               const struct opq_msg_reg *reg2);
+
+DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
+            registration_hash);
+
+static struct opq_regh_head opq_reg_hash;
+
 /*
  * Globals
  */
@@ -71,6 +108,21 @@ static const char LOG_NAME[] = "Zebra Opaque";
 
 /* Main event loop, processing incoming message queue */
 static int process_messages(struct thread *event);
+static int handle_opq_registration(const struct zmsghdr *hdr,
+                                  struct stream *msg);
+static int handle_opq_unregistration(const struct zmsghdr *hdr,
+                                    struct stream *msg);
+static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
+static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
+static bool opq_client_match(const struct opq_client_reg *client,
+                            const struct zapi_opaque_reg_info *info);
+static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
+static void opq_reg_free(struct opq_msg_reg **reg);
+static struct opq_client_reg *opq_client_alloc(
+       const struct zapi_opaque_reg_info *info);
+static void opq_client_free(struct opq_client_reg **client);
+static const char *opq_client2str(char *buf, size_t buflen,
+                                 const struct opq_client_reg *client);
 
 /*
  * Initialize the module at startup
@@ -139,9 +191,26 @@ void zebra_opaque_stop(void)
  */
 void zebra_opaque_finish(void)
 {
+       struct opq_msg_reg *reg;
+       struct opq_client_reg *client;
+
        if (IS_ZEBRA_DEBUG_EVENT)
                zlog_debug("%s module shutdown", LOG_NAME);
 
+       /* Clear out registration info */
+       while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
+               client = reg->clients;
+               while (client) {
+                       reg->clients = client->next;
+                       opq_client_free(&client);
+                       client = reg->clients;
+               }
+
+               opq_reg_free(&reg);
+       }
+
+       opq_regh_fini(&opq_reg_hash);
+
        pthread_mutex_destroy(&zo_info.mutex);
        stream_fifo_deinit(&zo_info.in_fifo);
 }
@@ -215,7 +284,8 @@ static int process_messages(struct thread *event)
        if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
                goto done;
 
-       /* Dequeue some messages from the incoming queue, temporarily
+       /*
+        * Dequeue some messages from the incoming queue, temporarily
         * save them on the local fifo
         */
        frr_with_mutex(&zo_info.mutex) {
@@ -228,7 +298,8 @@ static int process_messages(struct thread *event)
                        stream_fifo_push(&fifo, msg);
                }
 
-               /* We may need to reschedule, if there are still
+               /*
+                * We may need to reschedule, if there are still
                 * queued messages
                 */
                if (stream_fifo_head(&zo_info.in_fifo) != NULL)
@@ -247,13 +318,12 @@ static int process_messages(struct thread *event)
        if (IS_ZEBRA_DEBUG_RECV)
                zlog_debug("%s: processing %u messages", __func__, i);
 
-       /* Process the messages on the local fifo */
-       /* TODO -- just discarding the messages for now */
-       msg = stream_fifo_pop(&fifo);
-       while (msg) {
-               stream_free(msg);
-               msg = stream_fifo_pop(&fifo);
-       }
+       /*
+        * Process the messages from the temporary fifo. We send the whole
+        * fifo so that we can take advantage of batching internally. Note
+        * that registration/deregistration messages are handled here also.
+        */
+       dispatch_opq_messages(&fifo);
 
 done:
 
@@ -269,3 +339,361 @@ done:
 
        return 0;
 }
+
+/*
+ * Process (dispatch) or drop opaque messages.
+ */
+static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
+{
+       struct stream *msg, *dup;
+       struct zmsghdr hdr;
+       struct opq_msg_reg *reg;
+       uint32_t type;
+       struct opq_client_reg *client;
+       struct zserv *zclient;
+       char buf[50];
+
+       while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
+               zapi_parse_header(msg, &hdr);
+               hdr.length -= ZEBRA_HEADER_SIZE;
+
+               /* Handle client registration messages */
+               if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
+                       handle_opq_registration(&hdr, msg);
+                       continue;
+               } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
+                       handle_opq_unregistration(&hdr, msg);
+                       continue;
+               }
+
+               /* We only process OPAQUE messages - drop anything else */
+               if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
+                       goto drop_it;
+
+               /* Dispatch to any registered ZAPI client(s) */
+
+               /* Extract subtype */
+               STREAM_GETL(msg, type);
+
+               /* Look up registered ZAPI client(s) */
+               reg = opq_reg_lookup(type);
+               if (reg == NULL) {
+                       if (IS_ZEBRA_DEBUG_RECV)
+                               zlog_debug("%s: no registrations for opaque type %u",
+                                          __func__, type);
+                       goto drop_it;
+               }
+
+               /* Reset read pointer, since we'll be re-sending message */
+               stream_set_getp(msg, 0);
+
+               /* Send a copy of the message to all registered clients */
+               for (client = reg->clients; client; client = client->next) {
+                       dup = NULL;
+
+                       /* Copy message if necessary */
+                       if (client->next)
+                               dup = stream_dup(msg);
+
+                       /*
+                        * TODO -- this isn't ideal: we're going through an
+                        * acquire/release cycle for each client for each
+                        * message. Replace this with a batching version.
+                        */
+                       zclient = zserv_acquire_client(client->proto,
+                                                      client->instance,
+                                                      client->session_id);
+                       if (zclient) {
+                               if (IS_ZEBRA_DEBUG_RECV)
+                                       zlog_debug("%s: sending %s to client %s",
+                                                  __func__,
+                                                  (dup ? "dup" : "msg"),
+                                                  opq_client2str(buf,
+                                                                 sizeof(buf),
+                                                                 client));
+
+                               /*
+                                * Sending a message actually means enqueuing
+                                * it for a zapi io pthread to send - so we
+                                * don't touch the message after this call.
+                                */
+                               zserv_send_message(zclient, dup ? dup : msg);
+                               if (dup)
+                                       dup = NULL;
+                               else
+                                       msg = NULL;
+
+                               zserv_release_client(zclient);
+                       } else {
+                               if (IS_ZEBRA_DEBUG_RECV)
+                                       zlog_debug("%s: type %u: no zclient for %s",
+                                                  __func__, type,
+                                                  opq_client2str(buf,
+                                                                 sizeof(buf),
+                                                                 client));
+                               /* Registered but gone? */
+                               if (dup)
+                                       stream_free(dup);
+                       }
+               }
+
+drop_it:
+stream_failure:
+               if (msg)
+                       stream_free(msg);
+       }
+
+       return 0;
+}
+
+/*
+ * Process a register/unregister message
+ */
+static int handle_opq_registration(const struct zmsghdr *hdr,
+                                  struct stream *msg)
+{
+       int ret = 0;
+       struct zapi_opaque_reg_info info;
+       struct opq_client_reg *client;
+       struct opq_msg_reg key, *reg;
+       char buf[50];
+
+       memset(&info, 0, sizeof(info));
+
+       if (zapi_parse_opaque_reg(msg, &info) < 0) {
+               ret = -1;
+               goto done;
+       }
+
+       memset(&key, 0, sizeof(key));
+
+       key.type = info.type;
+
+       reg = opq_regh_find(&opq_reg_hash, &key);
+       if (reg) {
+               /* Look for dup client */
+               for (client = reg->clients; client != NULL;
+                    client = client->next) {
+                       if (opq_client_match(client, &info))
+                               break;
+               }
+
+               if (client) {
+                       /* Oops - duplicate registration? */
+                       if (IS_ZEBRA_DEBUG_RECV)
+                               zlog_debug("%s: duplicate opq reg for client %s",
+                                          __func__,
+                                          opq_client2str(buf, sizeof(buf),
+                                                         client));
+                       goto done;
+               }
+
+               client = opq_client_alloc(&info);
+
+               if (IS_ZEBRA_DEBUG_RECV)
+                       zlog_debug("%s: client %s registers for %u",
+                                  __func__,
+                                  opq_client2str(buf, sizeof(buf), client),
+                                  info.type);
+
+               /* Link client into registration */
+               client->next = reg->clients;
+               if (reg->clients)
+                       reg->clients->prev = client;
+               reg->clients = client;
+       } else {
+               /*
+                * No existing registrations - create one, add the
+                * client, and add registration to hash.
+                */
+               reg = opq_reg_alloc(info.type);
+               client = opq_client_alloc(&info);
+
+               if (IS_ZEBRA_DEBUG_RECV)
+                       zlog_debug("%s: client %s registers for new reg %u",
+                                  __func__,
+                                  opq_client2str(buf, sizeof(buf), client),
+                                  info.type);
+
+               reg->clients = client;
+
+               opq_regh_add(&opq_reg_hash, reg);
+       }
+
+done:
+
+       stream_free(msg);
+       return ret;
+}
+
+/*
+ * Process a register/unregister message
+ */
+static int handle_opq_unregistration(const struct zmsghdr *hdr,
+                                    struct stream *msg)
+{
+       int ret = 0;
+       struct zapi_opaque_reg_info info;
+       struct opq_client_reg *client;
+       struct opq_msg_reg key, *reg;
+       char buf[50];
+
+       memset(&info, 0, sizeof(info));
+
+       if (zapi_parse_opaque_reg(msg, &info) < 0) {
+               ret = -1;
+               goto done;
+       }
+
+       memset(&key, 0, sizeof(key));
+
+       key.type = info.type;
+
+       reg = opq_regh_find(&opq_reg_hash, &key);
+       if (reg == NULL) {
+               /* Weird: unregister for unknown message? */
+               if (IS_ZEBRA_DEBUG_RECV)
+                       zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
+                                  __func__,
+                                  zebra_route_string(info.proto),
+                                  info.instance, info.session_id, info.type);
+               goto done;
+       }
+
+       /* Look for client */
+       for (client = reg->clients; client != NULL;
+            client = client->next) {
+               if (opq_client_match(client, &info))
+                       break;
+       }
+
+       if (client == NULL) {
+               /* Oops - unregister for unknown client? */
+               if (IS_ZEBRA_DEBUG_RECV)
+                       zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
+                                  __func__, zebra_route_string(info.proto),
+                                  info.instance, info.session_id, info.type);
+               goto done;
+       }
+
+       if (IS_ZEBRA_DEBUG_RECV)
+               zlog_debug("%s: client %s unregisters for %u",
+                          __func__, opq_client2str(buf, sizeof(buf), client),
+                          info.type);
+
+       if (client->prev)
+               client->prev->next = client->next;
+       if (client->next)
+               client->next->prev = client->prev;
+       if (reg->clients == client)
+               reg->clients = client->next;
+
+       opq_client_free(&client);
+
+       /* Is registration empty now? */
+       if (reg->clients == NULL) {
+               if (IS_ZEBRA_DEBUG_RECV)
+                       zlog_debug("%s: free empty reg %u", __func__,
+                                  reg->type);
+
+               opq_regh_del(&opq_reg_hash, reg);
+               opq_reg_free(&reg);
+       }
+
+done:
+
+       stream_free(msg);
+       return ret;
+}
+
+/* Compare utility for registered clients */
+static bool opq_client_match(const struct opq_client_reg *client,
+                            const struct zapi_opaque_reg_info *info)
+{
+       if (client->proto == info->proto &&
+           client->instance == info->instance &&
+           client->session_id == info->session_id)
+               return true;
+       else
+               return false;
+}
+
+static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
+{
+       struct opq_msg_reg key, *reg;
+
+       memset(&key, 0, sizeof(key));
+
+       key.type = type;
+
+       reg = opq_regh_find(&opq_reg_hash, &key);
+
+       return reg;
+}
+
+static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
+{
+       struct opq_msg_reg *reg;
+
+       reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
+
+       reg->type = type;
+       INIT_HASH(&reg->item);
+
+       return reg;
+}
+
+static void opq_reg_free(struct opq_msg_reg **reg)
+{
+       XFREE(MTYPE_OPQ, (*reg));
+}
+
+static struct opq_client_reg *opq_client_alloc(
+       const struct zapi_opaque_reg_info *info)
+{
+       struct opq_client_reg *client;
+
+       client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
+
+       client->proto = info->proto;
+       client->instance = info->instance;
+       client->session_id = info->session_id;
+
+       return client;
+}
+
+static void opq_client_free(struct opq_client_reg **client)
+{
+       XFREE(MTYPE_OPQ, (*client));
+}
+
+static const char *opq_client2str(char *buf, size_t buflen,
+                                 const struct opq_client_reg *client)
+{
+       char sbuf[20];
+
+       snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
+                client->instance);
+       if (client->session_id > 0) {
+               snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
+               strlcat(buf, sbuf, buflen);
+       }
+
+       return buf;
+}
+
+/* Hash function for clients registered for messages */
+static uint32_t registration_hash(const struct opq_msg_reg *reg)
+{
+       return reg->type;
+}
+
+/* Comparison function for client registrations */
+static int registration_compare(const struct opq_msg_reg *reg1,
+                               const struct opq_msg_reg *reg2)
+{
+       if (reg1->type == reg2->type)
+               return 0;
+       else
+               return -1;
+}