frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleUnary##NAME, #NAME); \
- _rpcState->do_request(service, _cq); \
+ _rpcState->do_request(service, s_cq); \
} while (0)
#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \
frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleStreaming##NAME, #NAME); \
- _rpcState->do_request(service, _cq); \
+ _rpcState->do_request(service, s_cq); \
} while (0)
struct grpc_pthread_attr {
unsigned long port;
};
+// Capture these objects so we can try to shut down cleanly
+static std::unique_ptr<grpc::Server> s_server;
+static grpc::ServerCompletionQueue *s_cq;
+
static void *grpc_pthread_start(void *arg)
{
struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
std::stringstream server_address;
frr::Northbound::AsyncService *service =
new frr::Northbound::AsyncService();
- grpc::ServerCompletionQueue *_cq;
frr_pthread_set_name(fpt);
grpc::InsecureServerCredentials());
builder.RegisterService(service);
auto cq = builder.AddCompletionQueue();
- _cq = cq.get();
- auto server = builder.BuildAndStart();
+ s_cq = cq.get();
+ s_server = builder.BuildAndStart();
/* Schedule all RPC handlers */
REQUEST_NEWRPC(GetCapabilities, NULL);
void *tag;
bool ok;
- _cq->Next(&tag, &ok);
+ s_cq->Next(&tag, &ok);
+ if (!ok)
+ break;
+
grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__,
tag, ok);
- GPR_ASSERT(ok);
RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
CallState state = rpc->doCallback();
* user indicating Finish() for cleanup.
*/
if (state == FINISH)
- rpc->do_request(service, _cq);
+ rpc->do_request(service, s_cq);
}
- /*NOTREACHED*/
return NULL;
}
__func__, safe_strerror(errno));
return -1;
}
- pthread_detach(fpt->thread);
return 0;
}
static int frr_grpc_finish(void)
{
- if (fpt)
+ // Shutdown the grpc server
+ if (s_server) {
+ s_server->Shutdown();
+ s_cq->Shutdown();
+
+ // And drain the queue
+ void *ignore;
+ bool ok;
+
+ while (s_cq->Next(&ignore, &ok))
+ ;
+ }
+
+ if (fpt) {
+ pthread_join(fpt->thread, NULL);
frr_pthread_destroy(fpt);
- // TODO: cancel the gRPC pthreads gracefully.
+ }
+
return 0;
}