]> git.proxmox.com Git - mirror_frr.git/commitdiff
lib: cleanup and stop grpc pthread
authorMark Stapp <mjs@voltanet.io>
Mon, 7 Jun 2021 17:55:11 +0000 (13:55 -0400)
committerMark Stapp <mjs@voltanet.io>
Tue, 15 Jun 2021 15:08:19 +0000 (11:08 -0400)
At shutdown, try to stop the grpc module and its
dedicated pthread cleanly.

Signed-off-by: Mark Stapp <mjs@voltanet.io>
lib/northbound_grpc.cpp

index 807d1252c45bcd4c886322af07a41fd060504dca..71f07dfe86d283697de7eb6e1483ead76707d33d 100644 (file)
@@ -1222,7 +1222,7 @@ void HandleUnaryExecute(
                                                 frr::NAME##Response>(         \
                        (cdb), &frr::Northbound::AsyncService::Request##NAME,  \
                        &HandleUnary##NAME, #NAME);                            \
-               _rpcState->do_request(service, _cq);                           \
+               _rpcState->do_request(service, s_cq);                          \
        } while (0)
 
 #define REQUEST_NEWRPC_STREAMING(NAME, cdb)                                    \
@@ -1231,7 +1231,7 @@ void HandleUnaryExecute(
                                                 frr::NAME##Response>(         \
                        (cdb), &frr::Northbound::AsyncService::Request##NAME,  \
                        &HandleStreaming##NAME, #NAME);                        \
-               _rpcState->do_request(service, _cq);                           \
+               _rpcState->do_request(service, s_cq);                          \
        } while (0)
 
 struct grpc_pthread_attr {
@@ -1239,6 +1239,10 @@ struct grpc_pthread_attr {
        unsigned long port;
 };
 
+// Capture these objects so we can try to shut down cleanly
+static std::unique_ptr<grpc::Server> s_server;
+static grpc::ServerCompletionQueue *s_cq;
+
 static void *grpc_pthread_start(void *arg)
 {
        struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
@@ -1249,7 +1253,6 @@ static void *grpc_pthread_start(void *arg)
        std::stringstream server_address;
        frr::Northbound::AsyncService *service =
                new frr::Northbound::AsyncService();
-       grpc::ServerCompletionQueue *_cq;
 
        frr_pthread_set_name(fpt);
 
@@ -1258,8 +1261,8 @@ static void *grpc_pthread_start(void *arg)
                                 grpc::InsecureServerCredentials());
        builder.RegisterService(service);
        auto cq = builder.AddCompletionQueue();
-       _cq = cq.get();
-       auto server = builder.BuildAndStart();
+       s_cq = cq.get();
+       s_server = builder.BuildAndStart();
 
        /* Schedule all RPC handlers */
        REQUEST_NEWRPC(GetCapabilities, NULL);
@@ -1284,10 +1287,12 @@ static void *grpc_pthread_start(void *arg)
                void *tag;
                bool ok;
 
-               _cq->Next(&tag, &ok);
+               s_cq->Next(&tag, &ok);
+               if (!ok)
+                       break;
+
                grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__,
                           tag, ok);
-               GPR_ASSERT(ok);
 
                RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
                CallState state = rpc->doCallback();
@@ -1302,10 +1307,9 @@ static void *grpc_pthread_start(void *arg)
                 * user indicating Finish() for cleanup.
                 */
                if (state == FINISH)
-                       rpc->do_request(service, _cq);
+                       rpc->do_request(service, s_cq);
        }
 
-       /*NOTREACHED*/
        return NULL;
 }
 
@@ -1326,16 +1330,30 @@ static int frr_grpc_init(uint port)
                         __func__, safe_strerror(errno));
                return -1;
        }
-       pthread_detach(fpt->thread);
 
        return 0;
 }
 
 static int frr_grpc_finish(void)
 {
-       if (fpt)
+       // Shutdown the grpc server
+       if (s_server) {
+               s_server->Shutdown();
+               s_cq->Shutdown();
+
+               // And drain the queue
+               void *ignore;
+               bool ok;
+
+               while (s_cq->Next(&ignore, &ok))
+                       ;
+       }
+
+       if (fpt) {
+               pthread_join(fpt->thread, NULL);
                frr_pthread_destroy(fpt);
-       // TODO: cancel the gRPC pthreads gracefully.
+       }
+
        return 0;
 }