]> git.proxmox.com Git - mirror_frr.git/blame - lib/northbound_grpc.cpp
Merge pull request #12837 from donaldsharp/unlikely_routemap
[mirror_frr.git] / lib / northbound_grpc.cpp
CommitLineData
47a3a827 1// SPDX-License-Identifier: GPL-2.0-or-later
ec2ac5f2 2//
c85ecd64 3// Copyright (c) 2021-2022, LabN Consulting, L.L.C
ec2ac5f2
RW
4// Copyright (C) 2019 NetDEF, Inc.
5// Renato Westphal
6//
ec2ac5f2
RW
7
8#include <zebra.h>
63d12a7d
QY
9#include <grpcpp/grpcpp.h>
10#include "grpc/frr-northbound.grpc.pb.h"
ec2ac5f2
RW
11
12#include "log.h"
13#include "libfrr.h"
09781197 14#include "lib/version.h"
24a58196 15#include "frrevent.h"
ec2ac5f2
RW
16#include "command.h"
17#include "lib_errors.h"
18#include "northbound.h"
19#include "northbound_db.h"
0edcb505 20#include "frr_pthread.h"
ec2ac5f2
RW
21
22#include <iostream>
23#include <sstream>
24#include <memory>
25#include <string>
26
ec2ac5f2
RW
27#define GRPC_DEFAULT_PORT 50051
28
48c93061
CH
29
30// ------------------------------------------------------
31// File Local Variables
32// ------------------------------------------------------
33
ec2ac5f2
RW
34/*
35 * NOTE: we can't use the FRR debugging infrastructure here since it uses
36 * atomics and C++ has a different atomics API. Enable gRPC debugging
37 * unconditionally until we figure out a way to solve this problem.
38 */
b680134e
CH
39static bool nb_dbg_client_grpc = 0;
40
cd9d0537 41static struct event_loop *main_master;
ec2ac5f2 42
0edcb505
CS
43static struct frr_pthread *fpt;
44
83f6fce7
CH
45static bool grpc_running;
46
b680134e
CH
47#define grpc_debug(...) \
48 do { \
49 if (nb_dbg_client_grpc) \
50 zlog_debug(__VA_ARGS__); \
51 } while (0)
52
53// ------------------------------------------------------
54// New Types
55// ------------------------------------------------------
56
57enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED };
58const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"};
59
60struct candidate {
61 uint64_t id;
62 struct nb_config *config;
63 struct nb_transaction *transaction;
0edcb505 64};
ec2ac5f2 65
b680134e
CH
66class Candidates
67{
68 public:
69 ~Candidates(void)
70 {
71 // Delete candidates.
72 for (auto it = _cdb.begin(); it != _cdb.end(); it++)
d3074a52 73 delete_candidate(it->first);
b680134e
CH
74 }
75
76 struct candidate *create_candidate(void)
77 {
78 uint64_t id = ++_next_id;
79 assert(id); // TODO: implement an algorithm for unique reusable
80 // IDs.
81 struct candidate *c = &_cdb[id];
82 c->id = id;
83 c->config = nb_config_dup(running_config);
84 c->transaction = NULL;
85
86 return c;
87 }
88
d3074a52 89 bool contains(uint64_t candidate_id)
b680134e 90 {
d3074a52
CH
91 return _cdb.count(candidate_id) > 0;
92 }
93
94 void delete_candidate(uint64_t candidate_id)
95 {
96 struct candidate *c = &_cdb[candidate_id];
b680134e
CH
97 char errmsg[BUFSIZ] = {0};
98
b680134e
CH
99 nb_config_free(c->config);
100 if (c->transaction)
101 nb_candidate_commit_abort(c->transaction, errmsg,
102 sizeof(errmsg));
96d434f8 103 _cdb.erase(c->id);
b680134e
CH
104 }
105
d3074a52 106 struct candidate *get_candidate(uint64_t id)
b680134e
CH
107 {
108 return _cdb.count(id) == 0 ? NULL : &_cdb[id];
109 }
110
111 private:
112 uint64_t _next_id = 0;
d3074a52 113 std::map<uint64_t, struct candidate> _cdb;
b680134e 114};
ecf9fb30 115
48c93061
CH
116/*
117 * RpcStateBase is the common base class used to track a gRPC RPC.
118 */
ecf9fb30
QY
119class RpcStateBase
120{
121 public:
b680134e 122 virtual void do_request(::frr::Northbound::AsyncService *service,
83f6fce7
CH
123 ::grpc::ServerCompletionQueue *cq,
124 bool no_copy) = 0;
ecf9fb30 125
48c93061 126 RpcStateBase(const char *name) : name(name){};
b680134e 127
48c93061
CH
128 virtual ~RpcStateBase() = default;
129
130 CallState get_state() const
131 {
132 return state;
133 }
134
135 bool is_initial_process() const
136 {
137 /* Will always be true for Unary */
138 return entered_state == CREATE;
139 }
140
141 // Returns "more" status, if false caller can delete
142 bool run(frr::Northbound::AsyncService *service,
143 grpc::ServerCompletionQueue *cq)
b680134e 144 {
b680134e 145 /*
48c93061
CH
146 * We enter in either CREATE or MORE state, and transition to
147 * PROCESS state.
148 */
149 this->entered_state = this->state;
150 this->state = PROCESS;
151 grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name,
152 call_states[this->entered_state],
153 call_states[this->state]);
154 /*
155 * We schedule the callback on the main pthread, and wait for
156 * the state to transition out of the PROCESS state. The new
157 * state will either be MORE or FINISH. It will always be FINISH
158 * for Unary RPCs.
b680134e 159 */
907a2395 160 event_add_event(main_master, c_callback, (void *)this, 0, NULL);
48c93061 161
b680134e 162 pthread_mutex_lock(&this->cmux);
48c93061 163 while (this->state == PROCESS)
b680134e
CH
164 pthread_cond_wait(&this->cond, &this->cmux);
165 pthread_mutex_unlock(&this->cmux);
166
48c93061
CH
167 grpc_debug("%s RPC in %s on grpc-io-thread", name,
168 call_states[this->state]);
169
170 if (this->state == FINISH) {
171 /*
172 * Server is done (FINISH) so prep to receive a new
173 * request of this type. We could do this earlier but
174 * that would mean we could be handling multiple same
175 * type requests in parallel without limit.
176 */
177 this->do_request(service, cq, false);
b680134e 178 }
48c93061 179 return true;
b680134e
CH
180 }
181
48c93061 182 protected:
e6685141 183 virtual CallState run_mainthread(struct event *thread) = 0;
b680134e 184
e6685141 185 static void c_callback(struct event *thread)
b680134e 186 {
e16d030c 187 auto _tag = static_cast<RpcStateBase *>(EVENT_ARG(thread));
b680134e
CH
188 /*
189 * We hold the lock until the callback finishes and has updated
190 * _tag->state, then we signal done and release.
191 */
192 pthread_mutex_lock(&_tag->cmux);
193
194 CallState enter_state = _tag->state;
48c93061
CH
195 grpc_debug("%s RPC: running %s on main thread", _tag->name,
196 call_states[enter_state]);
b680134e 197
48c93061 198 _tag->state = _tag->run_mainthread(thread);
b680134e 199
48c93061 200 grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name,
b680134e
CH
201 call_states[enter_state], call_states[_tag->state]);
202
203 pthread_cond_signal(&_tag->cond);
204 pthread_mutex_unlock(&_tag->cmux);
cc9f21da 205 return;
ecf9fb30
QY
206 }
207
208 grpc::ServerContext ctx;
48c93061
CH
209 pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
210 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
211 CallState state = CREATE;
ad6bd536 212 CallState entered_state = CREATE;
48c93061
CH
213
214 public:
215 const char *name;
216};
217
218/*
219 * The UnaryRpcState class is used to track the execution of a Unary RPC.
220 *
221 * Template Args:
222 * Q - the request type for a given unary RPC
223 * S - the response type for a given unary RPC
224 */
225template <typename Q, typename S> class UnaryRpcState : public RpcStateBase
226{
227 public:
228 typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
229 ::grpc::ServerContext *, Q *,
230 ::grpc::ServerAsyncResponseWriter<S> *,
231 ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
232 void *);
233
234 UnaryRpcState(Candidates *cdb, reqfunc_t rfunc,
235 grpc::Status (*cb)(UnaryRpcState<Q, S> *),
236 const char *name)
237 : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb),
238 responder(&ctx){};
239
240 void do_request(::frr::Northbound::AsyncService *service,
241 ::grpc::ServerCompletionQueue *cq,
242 bool no_copy) override
243 {
244 grpc_debug("%s, posting a request for: %s", __func__, name);
245 auto copy = no_copy ? this
246 : new UnaryRpcState(cdb, requestf, callback,
247 name);
248 (service->*requestf)(&copy->ctx, &copy->request,
249 &copy->responder, cq, cq, copy);
250 }
251
e6685141 252 CallState run_mainthread(struct event *thread) override
48c93061
CH
253 {
254 // Unary RPC are always finished, see "Unary" :)
255 grpc::Status status = this->callback(this);
256 responder.Finish(response, status, this);
257 return FINISH;
258 }
259
260 Candidates *cdb;
261
ecf9fb30
QY
262 Q request;
263 S response;
264 grpc::ServerAsyncResponseWriter<S> responder;
ecf9fb30 265
48c93061 266 grpc::Status (*callback)(UnaryRpcState<Q, S> *);
c85ecd64 267 reqfunc_t requestf = NULL;
48c93061 268};
ecf9fb30 269
48c93061
CH
270/*
271 * The StreamRpcState class is used to track the execution of a Streaming RPC.
272 *
273 * Template Args:
274 * Q - the request type for a given streaming RPC
275 * S - the response type for a given streaming RPC
276 * X - the type used to track the streaming state
277 */
278template <typename Q, typename S, typename X>
279class StreamRpcState : public RpcStateBase
280{
281 public:
282 typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
283 ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
284 ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
285 void *);
b680134e 286
48c93061
CH
287 StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *),
288 const char *name)
289 : RpcStateBase(name), requestsf(rfunc), callback(cb),
290 async_responder(&ctx){};
291
292 void do_request(::frr::Northbound::AsyncService *service,
293 ::grpc::ServerCompletionQueue *cq,
294 bool no_copy) override
295 {
296 grpc_debug("%s, posting a request for: %s", __func__, name);
297 auto copy =
298 no_copy ? this
299 : new StreamRpcState(requestsf, callback, name);
300 (service->*requestsf)(&copy->ctx, &copy->request,
301 &copy->async_responder, cq, cq, copy);
302 }
303
e6685141 304 CallState run_mainthread(struct event *thread) override
48c93061
CH
305 {
306 if (this->callback(this))
307 return MORE;
308 else
309 return FINISH;
310 }
311
312 Q request;
313 S response;
314 grpc::ServerAsyncWriter<S> async_responder;
315
316 bool (*callback)(StreamRpcState<Q, S, X> *);
317 reqsfunc_t requestsf = NULL;
318
319 X context;
ecf9fb30
QY
320};
321
b680134e
CH
322// ------------------------------------------------------
323// Utility Functions
324// ------------------------------------------------------
ecf9fb30 325
b680134e
CH
326static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
327{
328 switch (encoding) {
329 case frr::JSON:
330 return LYD_JSON;
331 case frr::XML:
332 return LYD_XML;
333 default:
334 flog_err(EC_LIB_DEVELOPMENT,
335 "%s: unknown data encoding format (%u)", __func__,
336 encoding);
337 exit(1);
338 }
339}
ecf9fb30 340
b680134e 341static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
fe095adc 342 const char *value)
ec2ac5f2 343{
fe095adc
CH
344 LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value,
345 LYD_NEW_PATH_UPDATE, &dnode);
b680134e
CH
346 if (err != LY_SUCCESS) {
347 flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
348 __func__, ly_errmsg(ly_native_ctx));
349 return -1;
ec2ac5f2
RW
350 }
351
b680134e
CH
352 return 0;
353}
354
355static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path)
356{
357 dnode = yang_dnode_get(dnode, path.c_str());
358 if (!dnode)
359 return -1;
360
361 lyd_free_tree(dnode);
362
363 return 0;
364}
365
366static LY_ERR data_tree_from_dnode(frr::DataTree *dt,
367 const struct lyd_node *dnode,
368 LYD_FORMAT lyd_format, bool with_defaults)
369{
370 char *strp;
371 int options = 0;
372
373 SET_FLAG(options, LYD_PRINT_WITHSIBLINGS);
374 if (with_defaults)
375 SET_FLAG(options, LYD_PRINT_WD_ALL);
376 else
377 SET_FLAG(options, LYD_PRINT_WD_TRIM);
378
379 LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options);
380 if (err == LY_SUCCESS) {
381 if (strp) {
382 dt->set_data(strp);
383 free(strp);
384 }
ec2ac5f2 385 }
b680134e
CH
386 return err;
387}
ec2ac5f2 388
b680134e
CH
389static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
390 bool config_only)
391{
392 struct lyd_node *dnode;
393 int options, opt2;
394 LY_ERR err;
395
396 if (config_only) {
397 options = LYD_PARSE_NO_STATE;
398 opt2 = LYD_VALIDATE_NO_STATE;
399 } else {
400 options = LYD_PARSE_STRICT;
401 opt2 = 0;
402 }
403
404 err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(),
405 encoding2lyd_format(dt->encoding()), options,
406 opt2, &dnode);
407 if (err != LY_SUCCESS) {
408 flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s",
409 __func__, ly_errmsg(ly_native_ctx));
410 }
411 return dnode;
412}
413
414static struct lyd_node *get_dnode_config(const std::string &path)
415{
416 struct lyd_node *dnode;
417
0f538858
RZ
418 if (!yang_dnode_exists(running_config->dnode,
419 path.empty() ? NULL : path.c_str()))
420 return NULL;
421
b680134e
CH
422 dnode = yang_dnode_get(running_config->dnode,
423 path.empty() ? NULL : path.c_str());
424 if (dnode)
425 dnode = yang_dnode_dup(dnode);
426
427 return dnode;
428}
429
430static int get_oper_data_cb(const struct lysc_node *snode,
431 struct yang_translator *translator,
432 struct yang_data *data, void *arg)
433{
434 struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
435 int ret = yang_dnode_edit(dnode, data->xpath, data->value);
436 yang_data_free(data);
437
438 return (ret == 0) ? NB_OK : NB_ERR;
439}
440
441static struct lyd_node *get_dnode_state(const std::string &path)
442{
443 struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false);
444 if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode)
445 != NB_OK) {
446 yang_dnode_free(dnode);
447 return NULL;
448 }
449
450 return dnode;
451}
452
453static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
454 int type, LYD_FORMAT lyd_format,
455 bool with_defaults)
456{
457 struct lyd_node *dnode_config = NULL;
458 struct lyd_node *dnode_state = NULL;
459 struct lyd_node *dnode_final;
460
461 // Configuration data.
462 if (type == frr::GetRequest_DataType_ALL
463 || type == frr::GetRequest_DataType_CONFIG) {
464 dnode_config = get_dnode_config(path);
465 if (!dnode_config)
466 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
467 "Data path not found");
468 }
469
470 // Operational data.
471 if (type == frr::GetRequest_DataType_ALL
472 || type == frr::GetRequest_DataType_STATE) {
473 dnode_state = get_dnode_state(path);
474 if (!dnode_state) {
475 if (dnode_config)
476 yang_dnode_free(dnode_config);
477 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
478 "Failed to fetch operational data");
479 }
480 }
481
482 switch (type) {
483 case frr::GetRequest_DataType_ALL:
484 //
485 // Combine configuration and state data into a single
486 // dnode.
487 //
488 if (lyd_merge_siblings(&dnode_state, dnode_config,
489 LYD_MERGE_DESTRUCT)
490 != LY_SUCCESS) {
491 yang_dnode_free(dnode_state);
492 yang_dnode_free(dnode_config);
493 return grpc::Status(
494 grpc::StatusCode::INTERNAL,
495 "Failed to merge configuration and state data",
496 ly_errmsg(ly_native_ctx));
ecf9fb30 497 }
b680134e
CH
498
499 dnode_final = dnode_state;
500 break;
501 case frr::GetRequest_DataType_CONFIG:
502 dnode_final = dnode_config;
503 break;
504 case frr::GetRequest_DataType_STATE:
505 dnode_final = dnode_state;
506 break;
ecf9fb30
QY
507 }
508
b680134e
CH
509 // Validate data to create implicit default nodes if necessary.
510 int validate_opts = 0;
511 if (type == frr::GetRequest_DataType_CONFIG)
512 validate_opts = LYD_VALIDATE_NO_STATE;
513 else
514 validate_opts = 0;
515
516 LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx,
517 validate_opts, NULL);
518
519 if (err)
520 flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s",
521 __func__, ly_errmsg(ly_native_ctx));
522 // Dump data using the requested format.
523 if (!err)
524 err = data_tree_from_dnode(dt, dnode_final, lyd_format,
525 with_defaults);
526 yang_dnode_free(dnode_final);
527 if (err)
528 return grpc::Status(grpc::StatusCode::INTERNAL,
529 "Failed to dump data");
530 return grpc::Status::OK;
531}
532
533
534// ------------------------------------------------------
535// RPC Callback Functions: run on main thread
536// ------------------------------------------------------
537
48c93061
CH
538grpc::Status HandleUnaryGetCapabilities(
539 UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse>
540 *tag)
b680134e 541{
48c93061 542 grpc_debug("%s: entered", __func__);
b680134e
CH
543
544 // Response: string frr_version = 1;
545 tag->response.set_frr_version(FRR_VERSION);
546
547 // Response: bool rollback_support = 2;
ec2ac5f2 548#ifdef HAVE_CONFIG_ROLLBACKS
b680134e 549 tag->response.set_rollback_support(true);
ec2ac5f2 550#else
b680134e 551 tag->response.set_rollback_support(false);
ec2ac5f2 552#endif
b680134e
CH
553 // Response: repeated ModuleData supported_modules = 3;
554 struct yang_module *module;
555 RB_FOREACH (module, yang_modules, &yang_modules) {
556 auto m = tag->response.add_supported_modules();
557
558 m->set_name(module->name);
559 if (module->info->revision)
560 m->set_revision(module->info->revision);
561 m->set_organization(module->info->org);
562 }
ec2ac5f2 563
b680134e
CH
564 // Response: repeated Encoding supported_encodings = 4;
565 tag->response.add_supported_encodings(frr::JSON);
566 tag->response.add_supported_encodings(frr::XML);
ec2ac5f2 567
48c93061 568 return grpc::Status::OK;
b680134e 569}
ec2ac5f2 570
48c93061
CH
571// Define the context variable type for this streaming handler
572typedef std::list<std::string> GetContextType;
b680134e 573
48c93061
CH
574bool HandleStreamingGet(
575 StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag)
576{
577 grpc_debug("%s: entered", __func__);
ec2ac5f2 578
48c93061
CH
579 auto mypathps = &tag->context;
580 if (tag->is_initial_process()) {
581 // Fill our context container first time through
582 grpc_debug("%s: initialize streaming state", __func__);
b680134e
CH
583 auto paths = tag->request.path();
584 for (const std::string &path : paths) {
48c93061 585 mypathps->push_back(std::string(path));
ecf9fb30 586 }
b680134e 587 }
ecf9fb30 588
b680134e
CH
589 // Request: DataType type = 1;
590 int type = tag->request.type();
591 // Request: Encoding encoding = 2;
592 frr::Encoding encoding = tag->request.encoding();
593 // Request: bool with_defaults = 3;
594 bool with_defaults = tag->request.with_defaults();
595
b680134e
CH
596 if (mypathps->empty()) {
597 tag->async_responder.Finish(grpc::Status::OK, tag);
48c93061 598 return false;
ec2ac5f2
RW
599 }
600
b680134e
CH
601 frr::GetResponse response;
602 grpc::Status status;
603
604 // Response: int64 timestamp = 1;
605 response.set_timestamp(time(NULL));
606
607 // Response: DataTree data = 2;
608 auto *data = response.mutable_data();
609 data->set_encoding(tag->request.encoding());
610 status = get_path(data, mypathps->back().c_str(), type,
611 encoding2lyd_format(encoding), with_defaults);
612
613 if (!status.ok()) {
614 tag->async_responder.WriteAndFinish(
615 response, grpc::WriteOptions(), status, tag);
48c93061 616 return false;
b680134e
CH
617 }
618
619 mypathps->pop_back();
620 if (mypathps->empty()) {
621 tag->async_responder.WriteAndFinish(
622 response, grpc::WriteOptions(), grpc::Status::OK, tag);
48c93061 623 return false;
b680134e
CH
624 } else {
625 tag->async_responder.Write(response, tag);
48c93061 626 return true;
b680134e
CH
627 }
628}
629
48c93061
CH
630grpc::Status HandleUnaryCreateCandidate(
631 UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse>
632 *tag)
b680134e 633{
48c93061 634 grpc_debug("%s: entered", __func__);
ec2ac5f2 635
b680134e 636 struct candidate *candidate = tag->cdb->create_candidate();
48c93061
CH
637 if (!candidate)
638 return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
639 "Can't create candidate configuration");
640 tag->response.set_candidate_id(candidate->id);
641 return grpc::Status::OK;
b680134e
CH
642}
643
48c93061
CH
644grpc::Status HandleUnaryDeleteCandidate(
645 UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse>
646 *tag)
b680134e 647{
48c93061 648 grpc_debug("%s: entered", __func__);
b680134e 649
b680134e
CH
650 uint32_t candidate_id = tag->request.candidate_id();
651
652 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
653
48c93061
CH
654 if (!tag->cdb->contains(candidate_id))
655 return grpc::Status(grpc::StatusCode::NOT_FOUND,
656 "candidate configuration not found");
657 tag->cdb->delete_candidate(candidate_id);
658 return grpc::Status::OK;
b680134e
CH
659}
660
48c93061
CH
661grpc::Status HandleUnaryUpdateCandidate(
662 UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse>
663 *tag)
b680134e 664{
48c93061 665 grpc_debug("%s: entered", __func__);
ecf9fb30 666
b680134e
CH
667 uint32_t candidate_id = tag->request.candidate_id();
668
669 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
670
671 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
672
673 if (!candidate)
48c93061
CH
674 return grpc::Status(grpc::StatusCode::NOT_FOUND,
675 "candidate configuration not found");
676 if (candidate->transaction)
677 return grpc::Status(
678 grpc::StatusCode::FAILED_PRECONDITION,
679 "candidate is in the middle of a transaction");
680 if (nb_candidate_update(candidate->config) != NB_OK)
681 return grpc::Status(grpc::StatusCode::INTERNAL,
682 "failed to update candidate configuration");
b680134e 683
48c93061 684 return grpc::Status::OK;
b680134e
CH
685}
686
48c93061
CH
687grpc::Status HandleUnaryEditCandidate(
688 UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse>
689 *tag)
b680134e 690{
48c93061 691 grpc_debug("%s: entered", __func__);
b680134e 692
b680134e 693 uint32_t candidate_id = tag->request.candidate_id();
ec2ac5f2 694
b680134e 695 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
ec2ac5f2 696
b680134e 697 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
48c93061
CH
698 if (!candidate)
699 return grpc::Status(grpc::StatusCode::NOT_FOUND,
700 "candidate configuration not found");
ec2ac5f2 701
b680134e
CH
702 struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
703
704 auto pvs = tag->request.update();
705 for (const frr::PathValue &pv : pvs) {
fe095adc
CH
706 if (yang_dnode_edit(candidate_tmp->dnode, pv.path(),
707 pv.value().c_str()) != 0) {
b680134e
CH
708 nb_config_free(candidate_tmp);
709
48c93061
CH
710 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
711 "Failed to update \"" + pv.path() +
712 "\"");
ecf9fb30 713 }
ec2ac5f2
RW
714 }
715
b680134e
CH
716 pvs = tag->request.delete_();
717 for (const frr::PathValue &pv : pvs) {
718 if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
719 nb_config_free(candidate_tmp);
48c93061
CH
720 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
721 "Failed to remove \"" + pv.path() +
722 "\"");
ec2ac5f2 723 }
ec2ac5f2
RW
724 }
725
b680134e
CH
726 // No errors, accept all changes.
727 nb_config_replace(candidate->config, candidate_tmp, false);
48c93061 728 return grpc::Status::OK;
b680134e
CH
729}
730
48c93061
CH
731grpc::Status HandleUnaryLoadToCandidate(
732 UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse>
733 *tag)
b680134e 734{
48c93061 735 grpc_debug("%s: entered", __func__);
b680134e 736
b680134e
CH
737 uint32_t candidate_id = tag->request.candidate_id();
738
739 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
740
741 // Request: LoadType type = 2;
742 int load_type = tag->request.type();
743 // Request: DataTree config = 3;
744 auto config = tag->request.config();
745
b680134e 746 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
48c93061
CH
747 if (!candidate)
748 return grpc::Status(grpc::StatusCode::NOT_FOUND,
749 "candidate configuration not found");
ec2ac5f2 750
b680134e 751 struct lyd_node *dnode = dnode_from_data_tree(&config, true);
48c93061
CH
752 if (!dnode)
753 return grpc::Status(grpc::StatusCode::INTERNAL,
754 "Failed to parse the configuration");
ec2ac5f2 755
b680134e 756 struct nb_config *loaded_config = nb_config_new(dnode);
b680134e
CH
757 if (load_type == frr::LoadToCandidateRequest::REPLACE)
758 nb_config_replace(candidate->config, loaded_config, false);
48c93061
CH
759 else if (nb_config_merge(candidate->config, loaded_config, false) !=
760 NB_OK)
761 return grpc::Status(grpc::StatusCode::INTERNAL,
762 "Failed to merge the loaded configuration");
ec2ac5f2 763
48c93061 764 return grpc::Status::OK;
b680134e
CH
765}
766
48c93061
CH
767grpc::Status
768HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
b680134e 769{
48c93061 770 grpc_debug("%s: entered", __func__);
ec2ac5f2 771
b680134e
CH
772 // Request: uint32 candidate_id = 1;
773 uint32_t candidate_id = tag->request.candidate_id();
774
775 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
776
777 // Request: Phase phase = 2;
778 int phase = tag->request.phase();
779 // Request: string comment = 3;
780 const std::string comment = tag->request.comment();
781
782 // Find candidate configuration.
783 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
48c93061
CH
784 if (!candidate)
785 return grpc::Status(grpc::StatusCode::NOT_FOUND,
786 "candidate configuration not found");
b680134e
CH
787
788 int ret = NB_OK;
789 uint32_t transaction_id = 0;
790
791 // Check for misuse of the two-phase commit protocol.
792 switch (phase) {
793 case frr::CommitRequest::PREPARE:
794 case frr::CommitRequest::ALL:
48c93061
CH
795 if (candidate->transaction)
796 return grpc::Status(
797 grpc::StatusCode::FAILED_PRECONDITION,
798 "candidate is in the middle of a transaction");
b680134e
CH
799 break;
800 case frr::CommitRequest::ABORT:
801 case frr::CommitRequest::APPLY:
48c93061
CH
802 if (!candidate->transaction)
803 return grpc::Status(
804 grpc::StatusCode::FAILED_PRECONDITION,
805 "no transaction in progress");
b680134e
CH
806 break;
807 default:
808 break;
ec2ac5f2
RW
809 }
810
ecf9fb30 811
b680134e
CH
812 // Execute the user request.
813 struct nb_context context = {};
814 context.client = NB_CLIENT_GRPC;
815 char errmsg[BUFSIZ] = {0};
816
817 switch (phase) {
818 case frr::CommitRequest::VALIDATE:
819 grpc_debug("`-> Performing VALIDATE");
820 ret = nb_candidate_validate(&context, candidate->config, errmsg,
821 sizeof(errmsg));
822 break;
823 case frr::CommitRequest::PREPARE:
824 grpc_debug("`-> Performing PREPARE");
825 ret = nb_candidate_commit_prepare(
41ef7327 826 context, candidate->config, comment.c_str(),
7d65b7b7
CH
827 &candidate->transaction, false, false, errmsg,
828 sizeof(errmsg));
b680134e
CH
829 break;
830 case frr::CommitRequest::ABORT:
831 grpc_debug("`-> Performing ABORT");
832 nb_candidate_commit_abort(candidate->transaction, errmsg,
833 sizeof(errmsg));
834 break;
835 case frr::CommitRequest::APPLY:
836 grpc_debug("`-> Performing APPLY");
837 nb_candidate_commit_apply(candidate->transaction, true,
838 &transaction_id, errmsg,
839 sizeof(errmsg));
840 break;
841 case frr::CommitRequest::ALL:
842 grpc_debug("`-> Performing ALL");
41ef7327 843 ret = nb_candidate_commit(context, candidate->config, true,
b680134e
CH
844 comment.c_str(), &transaction_id,
845 errmsg, sizeof(errmsg));
846 break;
847 }
ec2ac5f2 848
b680134e
CH
849 // Map northbound error codes to gRPC status codes.
850 grpc::Status status;
851 switch (ret) {
852 case NB_OK:
853 status = grpc::Status::OK;
854 break;
855 case NB_ERR_NO_CHANGES:
856 status = grpc::Status(grpc::StatusCode::ABORTED, errmsg);
857 break;
858 case NB_ERR_LOCKED:
859 status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg);
860 break;
861 case NB_ERR_VALIDATION:
862 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
863 errmsg);
864 break;
865 case NB_ERR_RESOURCE:
866 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
867 errmsg);
868 break;
869 case NB_ERR:
870 default:
871 status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg);
872 break;
873 }
ec2ac5f2 874
b680134e
CH
875 grpc_debug("`-> Result: %s (message: '%s')",
876 nb_err_name((enum nb_error)ret), errmsg);
877
878 if (ret == NB_OK) {
879 // Response: uint32 transaction_id = 1;
880 if (transaction_id)
881 tag->response.set_transaction_id(transaction_id);
ec2ac5f2 882 }
b680134e
CH
883 if (strlen(errmsg) > 0)
884 tag->response.set_error_message(errmsg);
ec2ac5f2 885
48c93061 886 return status;
b680134e 887}
ec2ac5f2 888
48c93061
CH
889grpc::Status HandleUnaryLockConfig(
890 UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
b680134e 891{
48c93061 892 grpc_debug("%s: entered", __func__);
ec2ac5f2 893
48c93061
CH
894 if (nb_running_lock(NB_CLIENT_GRPC, NULL))
895 return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
896 "running configuration is locked already");
897 return grpc::Status::OK;
b680134e 898}
ec2ac5f2 899
48c93061
CH
900grpc::Status HandleUnaryUnlockConfig(
901 UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
b680134e 902{
48c93061 903 grpc_debug("%s: entered", __func__);
ec2ac5f2 904
48c93061
CH
905 if (nb_running_unlock(NB_CLIENT_GRPC, NULL))
906 return grpc::Status(
907 grpc::StatusCode::FAILED_PRECONDITION,
908 "failed to unlock the running configuration");
909 return grpc::Status::OK;
b680134e 910}
ec2ac5f2 911
b680134e
CH
912static void list_transactions_cb(void *arg, int transaction_id,
913 const char *client_name, const char *date,
914 const char *comment)
915{
916 auto list = static_cast<std::list<
917 std::tuple<int, std::string, std::string, std::string>> *>(arg);
918 list->push_back(
919 std::make_tuple(transaction_id, std::string(client_name),
920 std::string(date), std::string(comment)));
921}
922
48c93061
CH
923// Define the context variable type for this streaming handler
924typedef std::list<std::tuple<int, std::string, std::string, std::string>>
925 ListTransactionsContextType;
b680134e 926
48c93061
CH
927bool HandleStreamingListTransactions(
928 StreamRpcState<frr::ListTransactionsRequest,
929 frr::ListTransactionsResponse,
930 ListTransactionsContextType> *tag)
931{
932 grpc_debug("%s: entered", __func__);
933
934 auto list = &tag->context;
935 if (tag->is_initial_process()) {
936 grpc_debug("%s: initialize streaming state", __func__);
937 // Fill our context container first time through
938 nb_db_transactions_iterate(list_transactions_cb, list);
939 list->push_back(std::make_tuple(
b680134e
CH
940 0xFFFF, std::string("fake client"),
941 std::string("fake date"), std::string("fake comment")));
48c93061
CH
942 list->push_back(std::make_tuple(0xFFFE,
943 std::string("fake client2"),
944 std::string("fake date"),
945 std::string("fake comment2")));
ec2ac5f2
RW
946 }
947
b680134e
CH
948 if (list->empty()) {
949 tag->async_responder.Finish(grpc::Status::OK, tag);
48c93061 950 return false;
ec2ac5f2
RW
951 }
952
b680134e 953 auto item = list->back();
ec2ac5f2 954
b680134e 955 frr::ListTransactionsResponse response;
ec2ac5f2 956
b680134e
CH
957 // Response: uint32 id = 1;
958 response.set_id(std::get<0>(item));
ec2ac5f2 959
b680134e
CH
960 // Response: string client = 2;
961 response.set_client(std::get<1>(item).c_str());
962
963 // Response: string date = 3;
964 response.set_date(std::get<2>(item).c_str());
965
966 // Response: string comment = 4;
967 response.set_comment(std::get<3>(item).c_str());
ec2ac5f2 968
b680134e
CH
969 list->pop_back();
970 if (list->empty()) {
971 tag->async_responder.WriteAndFinish(
972 response, grpc::WriteOptions(), grpc::Status::OK, tag);
48c93061 973 return false;
b680134e
CH
974 } else {
975 tag->async_responder.Write(response, tag);
48c93061 976 return true;
ec2ac5f2 977 }
b680134e 978}
ec2ac5f2 979
48c93061
CH
980grpc::Status HandleUnaryGetTransaction(
981 UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse>
982 *tag)
b680134e 983{
48c93061 984 grpc_debug("%s: entered", __func__);
ec2ac5f2 985
b680134e
CH
986 // Request: uint32 transaction_id = 1;
987 uint32_t transaction_id = tag->request.transaction_id();
988 // Request: Encoding encoding = 2;
989 frr::Encoding encoding = tag->request.encoding();
990 // Request: bool with_defaults = 3;
991 bool with_defaults = tag->request.with_defaults();
992
993 grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__,
994 transaction_id, encoding);
995
996 struct nb_config *nb_config;
997
998 // Load configuration from the transactions database.
999 nb_config = nb_db_transaction_load(transaction_id);
48c93061
CH
1000 if (!nb_config)
1001 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1002 "Transaction not found");
ec2ac5f2 1003
b680134e
CH
1004 // Response: DataTree config = 1;
1005 auto config = tag->response.mutable_config();
1006 config->set_encoding(encoding);
1007
1008 // Dump data using the requested format.
1009 if (data_tree_from_dnode(config, nb_config->dnode,
1010 encoding2lyd_format(encoding), with_defaults)
1011 != 0) {
1012 nb_config_free(nb_config);
48c93061
CH
1013 return grpc::Status(grpc::StatusCode::INTERNAL,
1014 "Failed to dump data");
ec2ac5f2
RW
1015 }
1016
b680134e 1017 nb_config_free(nb_config);
ec2ac5f2 1018
48c93061 1019 return grpc::Status::OK;
b680134e 1020}
ec2ac5f2 1021
48c93061
CH
1022grpc::Status HandleUnaryExecute(
1023 UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
b680134e 1024{
48c93061 1025 grpc_debug("%s: entered", __func__);
ec2ac5f2 1026
b680134e
CH
1027 struct nb_node *nb_node;
1028 struct list *input_list;
1029 struct list *output_list;
1030 struct listnode *node;
1031 struct yang_data *data;
1032 const char *xpath;
1033 char errmsg[BUFSIZ] = {0};
1034
1035 // Request: string path = 1;
1036 xpath = tag->request.path().c_str();
1037
1038 grpc_debug("%s(path: \"%s\")", __func__, xpath);
1039
48c93061
CH
1040 if (tag->request.path().empty())
1041 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1042 "Data path is empty");
0fe5b904 1043
b680134e 1044 nb_node = nb_node_find(xpath);
48c93061
CH
1045 if (!nb_node)
1046 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1047 "Unknown data path");
ec2ac5f2 1048
b680134e
CH
1049 input_list = yang_data_list_new();
1050 output_list = yang_data_list_new();
ec2ac5f2 1051
b680134e
CH
1052 // Read input parameters.
1053 auto input = tag->request.input();
1054 for (const frr::PathValue &pv : input) {
1055 // Request: repeated PathValue input = 2;
1056 data = yang_data_new(pv.path().c_str(), pv.value().c_str());
1057 listnode_add(input_list, data);
1058 }
ec2ac5f2 1059
b680134e
CH
1060 // Execute callback registered for this XPath.
1061 if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg,
1062 sizeof(errmsg))
1063 != NB_OK) {
1064 flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s",
1065 __func__, xpath);
1066 list_delete(&input_list);
1067 list_delete(&output_list);
1068
48c93061 1069 return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed");
ec2ac5f2 1070 }
b680134e
CH
1071
1072 // Process output parameters.
1073 for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
1074 // Response: repeated PathValue output = 1;
1075 frr::PathValue *pv = tag->response.add_output();
1076 pv->set_path(data->xpath);
1077 pv->set_value(data->value);
1078 }
1079
1080 // Release memory.
1081 list_delete(&input_list);
1082 list_delete(&output_list);
1083
48c93061 1084 return grpc::Status::OK;
b680134e
CH
1085}
1086
1087// ------------------------------------------------------
1088// Thread Initialization and Run Functions
1089// ------------------------------------------------------
1090
1091
1092#define REQUEST_NEWRPC(NAME, cdb) \
1093 do { \
48c93061
CH
1094 auto _rpcState = new UnaryRpcState<frr::NAME##Request, \
1095 frr::NAME##Response>( \
b680134e
CH
1096 (cdb), &frr::Northbound::AsyncService::Request##NAME, \
1097 &HandleUnary##NAME, #NAME); \
83f6fce7 1098 _rpcState->do_request(&service, cq.get(), true); \
b680134e
CH
1099 } while (0)
1100
48c93061 1101#define REQUEST_NEWRPC_STREAMING(NAME) \
b680134e 1102 do { \
48c93061
CH
1103 auto _rpcState = new StreamRpcState<frr::NAME##Request, \
1104 frr::NAME##Response, \
1105 NAME##ContextType>( \
1106 &frr::Northbound::AsyncService::Request##NAME, \
b680134e 1107 &HandleStreaming##NAME, #NAME); \
83f6fce7 1108 _rpcState->do_request(&service, cq.get(), true); \
b680134e
CH
1109 } while (0)
1110
1111struct grpc_pthread_attr {
1112 struct frr_pthread_attr attr;
1113 unsigned long port;
ec2ac5f2
RW
1114};
1115
de800c10 1116// Capture these objects so we can try to shut down cleanly
83f6fce7
CH
1117static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER;
1118static grpc::Server *s_server;
de800c10 1119
ec2ac5f2
RW
1120static void *grpc_pthread_start(void *arg)
1121{
0edcb505 1122 struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
b680134e
CH
1123 uint port = (uint) reinterpret_cast<intptr_t>(fpt->data);
1124
1125 Candidates candidates;
1126 grpc::ServerBuilder builder;
1127 std::stringstream server_address;
83f6fce7 1128 frr::Northbound::AsyncService service;
ec2ac5f2 1129
0edcb505
CS
1130 frr_pthread_set_name(fpt);
1131
b680134e
CH
1132 server_address << "0.0.0.0:" << port;
1133 builder.AddListeningPort(server_address.str(),
1134 grpc::InsecureServerCredentials());
83f6fce7 1135 builder.RegisterService(&service);
673e4407
RZ
1136 builder.AddChannelArgument(
1137 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
83f6fce7
CH
1138 std::unique_ptr<grpc::ServerCompletionQueue> cq =
1139 builder.AddCompletionQueue();
1140 std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
1141 s_server = server.get();
1142
ad6bd536 1143 pthread_mutex_lock(&s_server_lock); // Make coverity happy
83f6fce7 1144 grpc_running = true;
ad6bd536 1145 pthread_mutex_unlock(&s_server_lock); // Make coverity happy
b680134e 1146
48c93061 1147 /* Schedule unary RPC handlers */
b680134e
CH
1148 REQUEST_NEWRPC(GetCapabilities, NULL);
1149 REQUEST_NEWRPC(CreateCandidate, &candidates);
1150 REQUEST_NEWRPC(DeleteCandidate, &candidates);
1151 REQUEST_NEWRPC(UpdateCandidate, &candidates);
1152 REQUEST_NEWRPC(EditCandidate, &candidates);
1153 REQUEST_NEWRPC(LoadToCandidate, &candidates);
1154 REQUEST_NEWRPC(Commit, &candidates);
1155 REQUEST_NEWRPC(GetTransaction, NULL);
1156 REQUEST_NEWRPC(LockConfig, NULL);
1157 REQUEST_NEWRPC(UnlockConfig, NULL);
1158 REQUEST_NEWRPC(Execute, NULL);
48c93061
CH
1159
1160 /* Schedule streaming RPC handlers */
1161 REQUEST_NEWRPC_STREAMING(Get);
1162 REQUEST_NEWRPC_STREAMING(ListTransactions);
b680134e
CH
1163
1164 zlog_notice("gRPC server listening on %s",
1165 server_address.str().c_str());
1166
1167 /* Process inbound RPCs */
83f6fce7
CH
1168 bool ok;
1169 void *tag;
48c93061 1170 while (true) {
83f6fce7
CH
1171 if (!cq->Next(&tag, &ok)) {
1172 grpc_debug("%s: CQ empty exiting", __func__);
de800c10 1173 break;
83f6fce7 1174 }
de800c10 1175
83f6fce7
CH
1176 grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
1177 ok);
1178
48c93061 1179 if (!ok) {
83f6fce7
CH
1180 delete static_cast<RpcStateBase *>(tag);
1181 break;
1182 }
b680134e
CH
1183
1184 RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
48c93061
CH
1185 if (rpc->get_state() != FINISH)
1186 rpc->run(&service, cq.get());
1187 else {
1188 grpc_debug("%s RPC FINISH -> [delete]", rpc->name);
1189 delete rpc;
1190 }
b680134e 1191 }
ec2ac5f2 1192
83f6fce7
CH
1193 /* This was probably done for us to get here, but let's be safe */
1194 pthread_mutex_lock(&s_server_lock);
1195 grpc_running = false;
1196 if (s_server) {
1197 grpc_debug("%s: shutdown server and CQ", __func__);
1198 server->Shutdown();
1199 s_server = NULL;
1200 }
1201 pthread_mutex_unlock(&s_server_lock);
1202
1203 grpc_debug("%s: shutting down CQ", __func__);
1204 cq->Shutdown();
1205
1206 grpc_debug("%s: draining the CQ", __func__);
1207 while (cq->Next(&tag, &ok)) {
1208 grpc_debug("%s: drain tag %p", __func__, tag);
1209 delete static_cast<RpcStateBase *>(tag);
b680134e 1210 }
ec2ac5f2 1211
83f6fce7 1212 zlog_info("%s: exiting from grpc pthread", __func__);
ec2ac5f2
RW
1213 return NULL;
1214}
1215
b680134e
CH
1216
1217static int frr_grpc_init(uint port)
ec2ac5f2 1218{
b680134e
CH
1219 struct frr_pthread_attr attr = {
1220 .start = grpc_pthread_start,
1221 .stop = NULL,
1222 };
1223
83f6fce7
CH
1224 grpc_debug("%s: entered", __func__);
1225
0edcb505 1226 fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
b680134e 1227 fpt->data = reinterpret_cast<void *>((intptr_t)port);
0edcb505 1228
ec2ac5f2 1229 /* Create a pthread for gRPC since it runs its own event loop. */
0edcb505 1230 if (frr_pthread_run(fpt, NULL) < 0) {
ec2ac5f2
RW
1231 flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s",
1232 __func__, safe_strerror(errno));
1233 return -1;
1234 }
ec2ac5f2
RW
1235
1236 return 0;
1237}
1238
1239static int frr_grpc_finish(void)
1240{
83f6fce7 1241 grpc_debug("%s: entered", __func__);
de800c10 1242
83f6fce7
CH
1243 if (!fpt)
1244 return 0;
de800c10 1245
83f6fce7
CH
1246 /*
1247 * Shut the server down here in main thread. This will cause the wait on
1248 * the completion queue (cq.Next()) to exit and cleanup everything else.
1249 */
1250 pthread_mutex_lock(&s_server_lock);
1251 grpc_running = false;
1252 if (s_server) {
1253 grpc_debug("%s: shutdown server", __func__);
1254 s_server->Shutdown();
1255 s_server = NULL;
de800c10 1256 }
83f6fce7 1257 pthread_mutex_unlock(&s_server_lock);
de800c10 1258
83f6fce7
CH
1259 grpc_debug("%s: joining and destroy grpc thread", __func__);
1260 pthread_join(fpt->thread, NULL);
1261 frr_pthread_destroy(fpt);
79c68195
RZ
1262
1263 // Fix protobuf 'memory leaks' during shutdown.
1264 // https://groups.google.com/g/protobuf/c/4y_EmQiCGgs
1265 google::protobuf::ShutdownProtobufLibrary();
1266
ec2ac5f2
RW
1267 return 0;
1268}
1269
20b0a2ed
QY
1270/*
1271 * This is done this way because module_init and module_late_init are both
1272 * called during daemon pre-fork initialization. Because the GRPC library
1273 * spawns threads internally, we need to delay initializing it until after
1274 * fork. This is done by scheduling this init function as an event task, since
1275 * the event loop doesn't run until after fork.
1276 */
e6685141 1277static void frr_grpc_module_very_late_init(struct event *thread)
ec2ac5f2 1278{
ec2ac5f2 1279 const char *args = THIS_MODULE->load_args;
b680134e 1280 uint port = GRPC_DEFAULT_PORT;
ec2ac5f2 1281
ec2ac5f2 1282 if (args) {
b680134e
CH
1283 port = std::stoul(args);
1284 if (port < 1024 || port > UINT16_MAX) {
ec2ac5f2 1285 flog_err(EC_LIB_GRPC_INIT,
b680134e
CH
1286 "%s: port number must be between 1025 and %d",
1287 __func__, UINT16_MAX);
ec2ac5f2
RW
1288 goto error;
1289 }
1290 }
1291
b680134e 1292 if (frr_grpc_init(port) < 0)
ec2ac5f2
RW
1293 goto error;
1294
cc9f21da 1295 return;
69ec5832 1296
ec2ac5f2
RW
1297error:
1298 flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module");
ec2ac5f2
RW
1299}
1300
cd9d0537 1301static int frr_grpc_module_late_init(struct event_loop *tm)
20b0a2ed 1302{
b680134e 1303 main_master = tm;
20b0a2ed 1304 hook_register(frr_fini, frr_grpc_finish);
907a2395 1305 event_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
20b0a2ed
QY
1306 return 0;
1307}
1308
ec2ac5f2
RW
1309static int frr_grpc_module_init(void)
1310{
1311 hook_register(frr_late_init, frr_grpc_module_late_init);
1312
1313 return 0;
1314}
1315
1316FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
1317 .description = "FRR gRPC northbound module",
b680134e 1318 .init = frr_grpc_module_init, );