]> git.proxmox.com Git - mirror_frr.git/commitdiff
lib: add short-circuit operation between same process
authorChristian Hopps <chopps@labn.net>
Sat, 6 May 2023 04:09:46 +0000 (00:09 -0400)
committerChristian Hopps <chopps@labn.net>
Sun, 28 May 2023 09:13:22 +0000 (05:13 -0400)
- Use a socketpair for connection, and direct (no event loop)
message sending and handling.

Signed-off-by: Christian Hopps <chopps@labn.net>
lib/mgmt_be_client.c
lib/mgmt_fe_client.c
lib/mgmt_msg.c
lib/mgmt_msg.h
mgmtd/mgmt_be_adapter.c
mgmtd/mgmt_fe_adapter.c

index 0b98f979ec297472239895ae003ffcb580fffb82..29b54690d2aa226dbef299be96095669c8cda26d 100644 (file)
@@ -138,7 +138,7 @@ static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx,
        return msg_conn_send_msg(
                &client_ctx->client.conn, MGMT_MSG_VERSION_PROTOBUF, be_msg,
                mgmtd__be_message__get_packed_size(be_msg),
-               (size_t(*)(void *, void *))mgmtd__be_message__pack);
+               (size_t(*)(void *, void *))mgmtd__be_message__pack, false);
 }
 
 static struct mgmt_be_batch_ctx *
@@ -966,7 +966,7 @@ uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params,
                        MGMTD_BE_SERVER_PATH, mgmt_be_client_notify_conenct,
                        mgmt_be_client_notify_disconenct,
                        mgmt_be_client_process_msg, MGMTD_BE_MAX_NUM_MSG_PROC,
-                       MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
+                       MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, false,
                        "BE-client", MGMTD_DBG_BE_CLIENT_CHECK());
 
        MGMTD_BE_CLIENT_DBG("Initialized client '%s'", params->name);
index cf031dcac3c103dd932d4c66f2afd936cf01c809..9f87c387840e22c10bcc4e46c80534f3931f6c2f 100644 (file)
@@ -98,12 +98,14 @@ mgmt_fe_find_session_by_session_id(struct mgmt_fe_client_ctx *client_ctx,
 }
 
 static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx,
-                                  Mgmtd__FeMessage *fe_msg)
+                                  Mgmtd__FeMessage *fe_msg,
+                                  bool short_circuit_ok)
 {
        return msg_conn_send_msg(
                &client_ctx->client.conn, MGMT_MSG_VERSION_PROTOBUF, fe_msg,
                mgmtd__fe_message__get_packed_size(fe_msg),
-               (size_t(*)(void *, void *))mgmtd__fe_message__pack);
+               (size_t(*)(void *, void *))mgmtd__fe_message__pack,
+               short_circuit_ok);
 }
 
 static int mgmt_fe_send_register_req(struct mgmt_fe_client_ctx *client_ctx)
@@ -121,7 +123,7 @@ static int mgmt_fe_send_register_req(struct mgmt_fe_client_ctx *client_ctx)
        MGMTD_FE_CLIENT_DBG(
                "Sending REGISTER_REQ message to MGMTD Frontend server");
 
-       return mgmt_fe_client_send_msg(client_ctx, &fe_msg);
+       return mgmt_fe_client_send_msg(client_ctx, &fe_msg, true);
 }
 
 static int mgmt_fe_send_session_req(struct mgmt_fe_client_ctx *client_ctx,
@@ -130,15 +132,18 @@ static int mgmt_fe_send_session_req(struct mgmt_fe_client_ctx *client_ctx,
 {
        Mgmtd__FeMessage fe_msg;
        Mgmtd__FeSessionReq sess_req;
+       bool scok;
 
        mgmtd__fe_session_req__init(&sess_req);
        sess_req.create = create;
        if (create) {
                sess_req.id_case = MGMTD__FE_SESSION_REQ__ID_CLIENT_CONN_ID;
                sess_req.client_conn_id = session->client_id;
+               scok = true;
        } else {
                sess_req.id_case = MGMTD__FE_SESSION_REQ__ID_SESSION_ID;
                sess_req.session_id = session->session_id;
+               scok = false;
        }
 
        mgmtd__fe_message__init(&fe_msg);
@@ -149,7 +154,7 @@ static int mgmt_fe_send_session_req(struct mgmt_fe_client_ctx *client_ctx,
                "Sending SESSION_REQ %s message for client-id %" PRIu64,
                create ? "create" : "destroy", session->client_id);
 
-       return mgmt_fe_client_send_msg(client_ctx, &fe_msg);
+       return mgmt_fe_client_send_msg(client_ctx, &fe_msg, scok);
 }
 
 static int mgmt_fe_send_lockds_req(struct mgmt_fe_client_ctx *client_ctx,
@@ -174,7 +179,7 @@ static int mgmt_fe_send_lockds_req(struct mgmt_fe_client_ctx *client_ctx,
                "Sending %sLOCK_REQ message for Ds:%d session-id %" PRIu64,
                lock ? "" : "UN", ds_id, session_id);
 
-       return mgmt_fe_client_send_msg(client_ctx, &fe_msg);
+       return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false);
 }
 
 static int mgmt_fe_send_setcfg_req(struct mgmt_fe_client_ctx *client_ctx,
@@ -206,7 +211,7 @@ static int mgmt_fe_send_setcfg_req(struct mgmt_fe_client_ctx *client_ctx,
                " (#xpaths:%d)",
                ds_id, session_id, num_data_reqs);
 
-       return mgmt_fe_client_send_msg(client_ctx, &fe_msg);
+       return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false);
 }
 
 static int mgmt_fe_send_commitcfg_req(struct mgmt_fe_client_ctx *client_ctx,
@@ -235,7 +240,7 @@ static int mgmt_fe_send_commitcfg_req(struct mgmt_fe_client_ctx *client_ctx,
                "Sending COMMIT_CONFIG_REQ message for Src-Ds:%d, Dst-Ds:%d session-id %" PRIu64,
                src_ds_id, dest_ds_id, session_id);
 
-       return mgmt_fe_client_send_msg(client_ctx, &fe_msg);
+       return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false);
 }
 
 static int mgmt_fe_send_getcfg_req(struct mgmt_fe_client_ctx *client_ctx,
@@ -264,7 +269,7 @@ static int mgmt_fe_send_getcfg_req(struct mgmt_fe_client_ctx *client_ctx,
                " (#xpaths:%d)",
                ds_id, session_id, num_data_reqs);
 
-       return mgmt_fe_client_send_msg(client_ctx, &fe_msg);
+       return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false);
 }
 
 static int mgmt_fe_send_getdata_req(struct mgmt_fe_client_ctx *client_ctx,
@@ -293,7 +298,7 @@ static int mgmt_fe_send_getdata_req(struct mgmt_fe_client_ctx *client_ctx,
                " (#xpaths:%d)",
                ds_id, session_id, num_data_reqs);
 
-       return mgmt_fe_client_send_msg(client_ctx, &fe_msg);
+       return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false);
 }
 
 static int mgmt_fe_send_regnotify_req(struct mgmt_fe_client_ctx *client_ctx,
@@ -318,7 +323,7 @@ static int mgmt_fe_send_regnotify_req(struct mgmt_fe_client_ctx *client_ctx,
        fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_REGNOTIFY_REQ;
        fe_msg.regnotify_req = &regntfy_req;
 
-       return mgmt_fe_client_send_msg(client_ctx, &fe_msg);
+       return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false);
 }
 
 static int mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx,
@@ -640,7 +645,7 @@ uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params,
                        MGMTD_FE_SERVER_PATH, mgmt_fe_client_notify_connect,
                        mgmt_fe_client_notify_disconnect,
                        mgmt_fe_client_process_msg, MGMTD_FE_MAX_NUM_MSG_PROC,
-                       MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
+                       MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, false,
                        "FE-client", MGMTD_DBG_FE_CLIENT_CHECK());
 
        MGMTD_FE_CLIENT_DBG("Initialized client '%s'", params->name);
index 03e896a0830c3233dbd1f7a7cfd99308ade9a858..0d9802a2b364b29f08cdcbdda6fc85b024bdf0d3 100644 (file)
@@ -507,6 +507,12 @@ static void msg_conn_sched_proc_msgs(struct msg_conn *conn)
 void msg_conn_disconnect(struct msg_conn *conn, bool reconnect)
 {
 
+       /* disconnect short-circuit if present */
+       if (conn->remote_conn) {
+               conn->remote_conn->remote_conn = NULL;
+               conn->remote_conn = NULL;
+       }
+
        if (conn->fd != -1) {
                close(conn->fd);
                conn->fd = -1;
@@ -525,14 +531,41 @@ void msg_conn_disconnect(struct msg_conn *conn, bool reconnect)
 }
 
 int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg,
-                     size_t mlen, size_t (*packf)(void *, void *))
+                     size_t mlen, size_t (*packf)(void *, void *),
+                     bool short_circuit_ok)
 {
+       const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
+
        if (conn->fd == -1) {
                MGMT_MSG_ERR(&conn->mstate,
                             "can't send message on closed connection");
                return -1;
        }
 
+       /* immediately handle the message if short-circuit is present */
+       if (conn->remote_conn && short_circuit_ok) {
+               uint8_t *buf = msg;
+               size_t n = mlen;
+
+               if (packf) {
+                       buf = XMALLOC(MTYPE_TMP, mlen);
+                       n = packf(msg, buf);
+               }
+
+               MGMT_MSG_DBG(dbgtag, "SC send: depth %u msg: %p",
+                            ++conn->short_circuit_depth, msg);
+
+               conn->remote_conn->handle_msg(version, buf, n,
+                                             conn->remote_conn);
+
+               MGMT_MSG_DBG(dbgtag, "SC return from depth: %u msg: %p",
+                            conn->short_circuit_depth--, msg);
+
+               if (packf)
+                       XFREE(MTYPE_TMP, buf);
+               return 0;
+       }
+
        int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf,
                                   conn->debug);
 
@@ -545,6 +578,12 @@ void msg_conn_cleanup(struct msg_conn *conn)
 {
        struct mgmt_msg_state *ms = &conn->mstate;
 
+       /* disconnect short-circuit if present */
+       if (conn->remote_conn) {
+               conn->remote_conn->remote_conn = NULL;
+               conn->remote_conn = NULL;
+       }
+
        if (conn->fd != -1) {
                close(conn->fd);
                conn->fd = -1;
@@ -561,6 +600,10 @@ void msg_conn_cleanup(struct msg_conn *conn)
  * Client Connections
  */
 
+DECLARE_LIST(msg_server_list, struct msg_server, link);
+
+static struct msg_server_list_head msg_servers;
+
 static void msg_client_connect(struct msg_client *conn);
 
 static void msg_client_connect_timer(struct event *thread)
@@ -583,6 +626,64 @@ static void msg_client_sched_connect(struct msg_client *client,
                                &client->conn_retry_tmr);
 }
 
+static bool msg_client_connect_short_circuit(struct msg_client *client)
+{
+       struct msg_conn *server_conn;
+       struct msg_server *server;
+       const char *dbgtag =
+               client->conn.debug ? client->conn.mstate.idtag : NULL;
+       union sockunion su = {0};
+       int sockets[2];
+
+       frr_each (msg_server_list, &msg_servers, server)
+               if (!strcmp(server->sopath, client->sopath))
+                       break;
+       if (!server) {
+               MGMT_MSG_DBG(dbgtag,
+                            "no short-circuit connection available for %s",
+                            client->sopath);
+
+               return false;
+       }
+
+       if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets)) {
+               MGMT_MSG_ERR(
+                       &client->conn.mstate,
+                       "socketpair failed trying to short-circuit connection on %s: %s",
+                       client->sopath, safe_strerror(errno));
+               return false;
+       }
+
+       /* client side */
+       client->conn.fd = sockets[0];
+       set_nonblocking(sockets[0]);
+       setsockopt_so_sendbuf(sockets[0], client->conn.mstate.max_write_buf);
+       setsockopt_so_recvbuf(sockets[0], client->conn.mstate.max_read_buf);
+       client->conn.is_short_circuit = true;
+
+       /* server side */
+       memset(&su, 0, sizeof(union sockunion));
+       server_conn = server->create(sockets[1], &su);
+       server_conn->is_short_circuit = true;
+
+       client->conn.remote_conn = server_conn;
+       server_conn->remote_conn = &client->conn;
+
+       MGMT_MSG_DBG(
+               dbgtag,
+               "short-circuit connection on %s server %s:%d to client %s:%d",
+               client->sopath, server_conn->mstate.idtag, server_conn->fd,
+               client->conn.mstate.idtag, client->conn.fd);
+
+       MGMT_MSG_DBG(
+               server_conn->debug ? server_conn->mstate.idtag : NULL,
+               "short-circuit connection on %s client %s:%d to server %s:%d",
+               client->sopath, client->conn.mstate.idtag, client->conn.fd,
+               server_conn->mstate.idtag, server_conn->fd);
+
+       return true;
+}
+
 
 /* Connect and start reading from the socket */
 static void msg_client_connect(struct msg_client *client)
@@ -590,8 +691,11 @@ static void msg_client_connect(struct msg_client *client)
        struct msg_conn *conn = &client->conn;
        const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
 
-       conn->fd = mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE,
-                                   MSG_CONN_RECV_BUF_SIZE, dbgtag);
+       if (!client->short_circuit_ok ||
+           !msg_client_connect_short_circuit(client))
+               conn->fd =
+                       mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE,
+                                        MSG_CONN_RECV_BUF_SIZE, dbgtag);
 
        if (conn->fd == -1)
                /* retry the connection */
@@ -612,7 +716,8 @@ void msg_client_init(struct msg_client *client, struct event_loop *tm,
                     void (*handle_msg)(uint8_t version, uint8_t *data,
                                        size_t len, struct msg_conn *client),
                     size_t max_read_buf, size_t max_write_buf,
-                    size_t max_msg_sz, const char *idtag, bool debug)
+                    size_t max_msg_sz, bool short_circuit_ok,
+                    const char *idtag, bool debug)
 {
        struct msg_conn *conn = &client->conn;
        memset(client, 0, sizeof(*client));
@@ -623,6 +728,7 @@ void msg_client_init(struct msg_client *client, struct event_loop *tm,
        conn->notify_disconnect = notify_disconnect;
        conn->is_client = true;
        conn->debug = debug;
+       client->short_circuit_ok = short_circuit_ok;
        client->sopath = strdup(sopath);
        client->notify_connect = notify_connect;
 
@@ -726,6 +832,8 @@ int msg_server_init(struct msg_server *server, const char *sopath,
        server->create = create;
        server->debug = debug;
 
+       msg_server_list_add_head(&msg_servers, server);
+
        event_add_read(server->loop, msg_server_accept, server, server->fd,
                       &server->listen_ev);
 
@@ -746,6 +854,9 @@ void msg_server_cleanup(struct msg_server *server)
 
        if (server->listen_ev)
                EVENT_OFF(server->listen_ev);
+
+       msg_server_list_del(&msg_servers, server);
+
        if (server->fd >= 0)
                close(server->fd);
        free((char *)server->sopath);
index 79b1e44c176ff693a9a2fc5213d521054a073db9..9fdcb9ecd35621e02cd3dae3e09e4e36e4f67fa6 100644 (file)
@@ -92,11 +92,14 @@ struct msg_conn {
        struct event *read_ev;
        struct event *write_ev;
        struct event *proc_msg_ev;
+       struct msg_conn *remote_conn;
        int (*notify_disconnect)(struct msg_conn *conn);
        void (*handle_msg)(uint8_t version, uint8_t *data, size_t len,
                           struct msg_conn *conn);
        void *user;
+       uint short_circuit_depth;
        bool is_client;
+       bool is_short_circuit;
        bool debug;
 };
 
@@ -110,7 +113,8 @@ extern void msg_conn_cleanup(struct msg_conn *conn);
 extern void msg_conn_disconnect(struct msg_conn *conn, bool reconnect);
 extern int msg_conn_send_msg(struct msg_conn *client, uint8_t version,
                             void *msg, size_t mlen,
-                            size_t (*packf)(void *, void *));
+                            size_t (*packf)(void *, void *),
+                            bool short_circuit_ok);
 
 /*
  * Client-side Connections
@@ -121,6 +125,7 @@ struct msg_client {
        struct event *conn_retry_tmr;
        char *sopath;
        int (*notify_connect)(struct msg_client *client);
+       bool short_circuit_ok;
 };
 
 /*
@@ -135,23 +140,26 @@ extern void msg_client_cleanup(struct msg_client *client);
  * called for a client which is currently connected. The socket is closed
  * but there is no notification.
  */
-extern void msg_client_init(struct msg_client *client, struct event_loop *tm,
-                           const char *sopath,
-                           int (*notify_connect)(struct msg_client *client),
-                           int (*notify_disconnect)(struct msg_conn *client),
-                           void (*handle_msg)(uint8_t version, uint8_t *data,
-                                              size_t len,
-                                              struct msg_conn *client),
-                           size_t max_read_buf, size_t max_write_buf,
-                           size_t max_msg_sz, const char *idtag, bool debug);
+extern void
+msg_client_init(struct msg_client *client, struct event_loop *tm,
+               const char *sopath,
+               int (*notify_connect)(struct msg_client *client),
+               int (*notify_disconnect)(struct msg_conn *client),
+               void (*handle_msg)(uint8_t version, uint8_t *data, size_t len,
+                                  struct msg_conn *client),
+               size_t max_read_buf, size_t max_write_buf, size_t max_msg_sz,
+               bool short_circuit_ok, const char *idtag, bool debug);
 
 /*
  * Server-side Connections
  */
 #define MGMTD_MAX_CONN 32
 
+PREDECL_LIST(msg_server_list);
+
 struct msg_server {
        int fd;
+       struct msg_server_list_item link;
        struct event_loop *loop;
        struct event *listen_ev;
        const char *sopath;
index da67511e02f032032c57b6dfb4da2209ec09f13f..0eba9120ec9da51a1010e73cd719daa173db4c9d 100644 (file)
@@ -489,7 +489,7 @@ static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter,
        return msg_conn_send_msg(
                adapter->conn, MGMT_MSG_VERSION_PROTOBUF, be_msg,
                mgmtd__be_message__get_packed_size(be_msg),
-               (size_t(*)(void *, void *))mgmtd__be_message__pack);
+               (size_t(*)(void *, void *))mgmtd__be_message__pack, false);
 }
 
 static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter,
index b9753e21dc4755c7142dbd0897f9baaa1264aed6..996d9a2306955cce36b0942ca7871a0650fa7504 100644 (file)
@@ -327,12 +327,14 @@ mgmt_fe_create_session(struct mgmt_fe_client_adapter *adapter,
 }
 
 static int mgmt_fe_adapter_send_msg(struct mgmt_fe_client_adapter *adapter,
-                                   Mgmtd__FeMessage *fe_msg)
+                                   Mgmtd__FeMessage *fe_msg,
+                                   bool short_circuit_ok)
 {
        return msg_conn_send_msg(
                adapter->conn, MGMT_MSG_VERSION_PROTOBUF, fe_msg,
                mgmtd__fe_message__get_packed_size(fe_msg),
-               (size_t(*)(void *, void *))mgmtd__fe_message__pack);
+               (size_t(*)(void *, void *))mgmtd__fe_message__pack,
+               short_circuit_ok);
 }
 
 static int
@@ -360,7 +362,7 @@ mgmt_fe_send_session_reply(struct mgmt_fe_client_adapter *adapter,
                "Sending SESSION_REPLY message to MGMTD Frontend client '%s'",
                adapter->name);
 
-       return mgmt_fe_adapter_send_msg(adapter, &fe_msg);
+       return mgmt_fe_adapter_send_msg(adapter, &fe_msg, true);
 }
 
 static int mgmt_fe_send_lockds_reply(struct mgmt_fe_session_ctx *session,
@@ -390,7 +392,7 @@ static int mgmt_fe_send_lockds_reply(struct mgmt_fe_session_ctx *session,
                "Sending LOCK_DS_REPLY message to MGMTD Frontend client '%s'",
                session->adapter->name);
 
-       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg);
+       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false);
 }
 
 static int mgmt_fe_send_setcfg_reply(struct mgmt_fe_session_ctx *session,
@@ -436,7 +438,7 @@ static int mgmt_fe_send_setcfg_reply(struct mgmt_fe_session_ctx *session,
                gettimeofday(&session->adapter->setcfg_stats.last_end, NULL);
        mgmt_fe_adapter_compute_set_cfg_timers(&session->adapter->setcfg_stats);
 
-       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg);
+       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false);
 }
 
 static int mgmt_fe_send_commitcfg_reply(
@@ -482,7 +484,7 @@ static int mgmt_fe_send_commitcfg_reply(
        if (mm->perf_stats_en)
                gettimeofday(&session->adapter->cmt_stats.last_end, NULL);
        mgmt_fe_session_compute_commit_timers(&session->adapter->cmt_stats);
-       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg);
+       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false);
 }
 
 static int mgmt_fe_send_getcfg_reply(struct mgmt_fe_session_ctx *session,
@@ -520,7 +522,7 @@ static int mgmt_fe_send_getcfg_reply(struct mgmt_fe_session_ctx *session,
                mgmt_fe_session_register_event(
                        session, MGMTD_FE_SESSION_SHOW_TXN_CLNUP);
 
-       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg);
+       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false);
 }
 
 static int mgmt_fe_send_getdata_reply(struct mgmt_fe_session_ctx *session,
@@ -558,7 +560,7 @@ static int mgmt_fe_send_getdata_reply(struct mgmt_fe_session_ctx *session,
                mgmt_fe_session_register_event(
                        session, MGMTD_FE_SESSION_SHOW_TXN_CLNUP);
 
-       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg);
+       return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false);
 }
 
 static void mgmt_fe_session_cfg_txn_clnup(struct event *thread)