]> git.proxmox.com Git - mirror_frr.git/blame - lib/northbound_grpc.cpp
Merge pull request #10953 from leonshaw/fix/bgp-rm-leak
[mirror_frr.git] / lib / northbound_grpc.cpp
CommitLineData
ec2ac5f2 1//
c85ecd64 2// Copyright (c) 2021-2022, LabN Consulting, L.L.C
ec2ac5f2
RW
3// Copyright (C) 2019 NetDEF, Inc.
4// Renato Westphal
5//
6// This program is free software; you can redistribute it and/or modify it
7// under the terms of the GNU General Public License as published by the Free
8// Software Foundation; either version 2 of the License, or (at your option)
9// any later version.
10//
11// This program is distributed in the hope that it will be useful, but WITHOUT
12// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14// more details.
15//
16// You should have received a copy of the GNU General Public License along
17// with this program; see the file COPYING; if not, write to the Free Software
18// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19//
20
21#include <zebra.h>
63d12a7d
QY
22#include <grpcpp/grpcpp.h>
23#include "grpc/frr-northbound.grpc.pb.h"
ec2ac5f2
RW
24
25#include "log.h"
26#include "libfrr.h"
09781197 27#include "lib/version.h"
b680134e 28#include "lib/thread.h"
ec2ac5f2
RW
29#include "command.h"
30#include "lib_errors.h"
31#include "northbound.h"
32#include "northbound_db.h"
0edcb505 33#include "frr_pthread.h"
ec2ac5f2
RW
34
35#include <iostream>
36#include <sstream>
37#include <memory>
38#include <string>
39
ec2ac5f2
RW
40#define GRPC_DEFAULT_PORT 50051
41
48c93061
CH
42
43// ------------------------------------------------------
44// File Local Variables
45// ------------------------------------------------------
46
ec2ac5f2
RW
47/*
48 * NOTE: we can't use the FRR debugging infrastructure here since it uses
49 * atomics and C++ has a different atomics API. Enable gRPC debugging
50 * unconditionally until we figure out a way to solve this problem.
51 */
b680134e
CH
52static bool nb_dbg_client_grpc = 0;
53
54static struct thread_master *main_master;
ec2ac5f2 55
0edcb505
CS
56static struct frr_pthread *fpt;
57
83f6fce7
CH
58static bool grpc_running;
59
b680134e
CH
60#define grpc_debug(...) \
61 do { \
62 if (nb_dbg_client_grpc) \
63 zlog_debug(__VA_ARGS__); \
64 } while (0)
65
66// ------------------------------------------------------
67// New Types
68// ------------------------------------------------------
69
70enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED };
71const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"};
72
73struct candidate {
74 uint64_t id;
75 struct nb_config *config;
76 struct nb_transaction *transaction;
0edcb505 77};
ec2ac5f2 78
b680134e
CH
79class Candidates
80{
81 public:
82 ~Candidates(void)
83 {
84 // Delete candidates.
85 for (auto it = _cdb.begin(); it != _cdb.end(); it++)
d3074a52 86 delete_candidate(it->first);
b680134e
CH
87 }
88
89 struct candidate *create_candidate(void)
90 {
91 uint64_t id = ++_next_id;
92 assert(id); // TODO: implement an algorithm for unique reusable
93 // IDs.
94 struct candidate *c = &_cdb[id];
95 c->id = id;
96 c->config = nb_config_dup(running_config);
97 c->transaction = NULL;
98
99 return c;
100 }
101
d3074a52 102 bool contains(uint64_t candidate_id)
b680134e 103 {
d3074a52
CH
104 return _cdb.count(candidate_id) > 0;
105 }
106
107 void delete_candidate(uint64_t candidate_id)
108 {
109 struct candidate *c = &_cdb[candidate_id];
b680134e
CH
110 char errmsg[BUFSIZ] = {0};
111
b680134e
CH
112 nb_config_free(c->config);
113 if (c->transaction)
114 nb_candidate_commit_abort(c->transaction, errmsg,
115 sizeof(errmsg));
96d434f8 116 _cdb.erase(c->id);
b680134e
CH
117 }
118
d3074a52 119 struct candidate *get_candidate(uint64_t id)
b680134e
CH
120 {
121 return _cdb.count(id) == 0 ? NULL : &_cdb[id];
122 }
123
124 private:
125 uint64_t _next_id = 0;
d3074a52 126 std::map<uint64_t, struct candidate> _cdb;
b680134e 127};
ecf9fb30 128
48c93061
CH
129/*
130 * RpcStateBase is the common base class used to track a gRPC RPC.
131 */
ecf9fb30
QY
132class RpcStateBase
133{
134 public:
b680134e 135 virtual void do_request(::frr::Northbound::AsyncService *service,
83f6fce7
CH
136 ::grpc::ServerCompletionQueue *cq,
137 bool no_copy) = 0;
ecf9fb30 138
48c93061 139 RpcStateBase(const char *name) : name(name){};
b680134e 140
48c93061
CH
141 virtual ~RpcStateBase() = default;
142
143 CallState get_state() const
144 {
145 return state;
146 }
147
148 bool is_initial_process() const
149 {
150 /* Will always be true for Unary */
151 return entered_state == CREATE;
152 }
153
154 // Returns "more" status, if false caller can delete
155 bool run(frr::Northbound::AsyncService *service,
156 grpc::ServerCompletionQueue *cq)
b680134e 157 {
b680134e 158 /*
48c93061
CH
159 * We enter in either CREATE or MORE state, and transition to
160 * PROCESS state.
161 */
162 this->entered_state = this->state;
163 this->state = PROCESS;
164 grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name,
165 call_states[this->entered_state],
166 call_states[this->state]);
167 /*
168 * We schedule the callback on the main pthread, and wait for
169 * the state to transition out of the PROCESS state. The new
170 * state will either be MORE or FINISH. It will always be FINISH
171 * for Unary RPCs.
b680134e 172 */
b680134e
CH
173 thread_add_event(main_master, c_callback, (void *)this, 0,
174 NULL);
48c93061 175
b680134e 176 pthread_mutex_lock(&this->cmux);
48c93061 177 while (this->state == PROCESS)
b680134e
CH
178 pthread_cond_wait(&this->cond, &this->cmux);
179 pthread_mutex_unlock(&this->cmux);
180
48c93061
CH
181 grpc_debug("%s RPC in %s on grpc-io-thread", name,
182 call_states[this->state]);
183
184 if (this->state == FINISH) {
185 /*
186 * Server is done (FINISH) so prep to receive a new
187 * request of this type. We could do this earlier but
188 * that would mean we could be handling multiple same
189 * type requests in parallel without limit.
190 */
191 this->do_request(service, cq, false);
b680134e 192 }
48c93061 193 return true;
b680134e
CH
194 }
195
48c93061
CH
196 protected:
197 virtual CallState run_mainthread(struct thread *thread) = 0;
b680134e 198
cc9f21da 199 static void c_callback(struct thread *thread)
b680134e 200 {
48c93061 201 auto _tag = static_cast<RpcStateBase *>(thread->arg);
b680134e
CH
202 /*
203 * We hold the lock until the callback finishes and has updated
204 * _tag->state, then we signal done and release.
205 */
206 pthread_mutex_lock(&_tag->cmux);
207
208 CallState enter_state = _tag->state;
48c93061
CH
209 grpc_debug("%s RPC: running %s on main thread", _tag->name,
210 call_states[enter_state]);
b680134e 211
48c93061 212 _tag->state = _tag->run_mainthread(thread);
b680134e 213
48c93061 214 grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name,
b680134e
CH
215 call_states[enter_state], call_states[_tag->state]);
216
217 pthread_cond_signal(&_tag->cond);
218 pthread_mutex_unlock(&_tag->cmux);
cc9f21da 219 return;
ecf9fb30
QY
220 }
221
222 grpc::ServerContext ctx;
48c93061
CH
223 pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
224 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
225 CallState state = CREATE;
ad6bd536 226 CallState entered_state = CREATE;
48c93061
CH
227
228 public:
229 const char *name;
230};
231
232/*
233 * The UnaryRpcState class is used to track the execution of a Unary RPC.
234 *
235 * Template Args:
236 * Q - the request type for a given unary RPC
237 * S - the response type for a given unary RPC
238 */
239template <typename Q, typename S> class UnaryRpcState : public RpcStateBase
240{
241 public:
242 typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
243 ::grpc::ServerContext *, Q *,
244 ::grpc::ServerAsyncResponseWriter<S> *,
245 ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
246 void *);
247
248 UnaryRpcState(Candidates *cdb, reqfunc_t rfunc,
249 grpc::Status (*cb)(UnaryRpcState<Q, S> *),
250 const char *name)
251 : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb),
252 responder(&ctx){};
253
254 void do_request(::frr::Northbound::AsyncService *service,
255 ::grpc::ServerCompletionQueue *cq,
256 bool no_copy) override
257 {
258 grpc_debug("%s, posting a request for: %s", __func__, name);
259 auto copy = no_copy ? this
260 : new UnaryRpcState(cdb, requestf, callback,
261 name);
262 (service->*requestf)(&copy->ctx, &copy->request,
263 &copy->responder, cq, cq, copy);
264 }
265
266 CallState run_mainthread(struct thread *thread) override
267 {
268 // Unary RPC are always finished, see "Unary" :)
269 grpc::Status status = this->callback(this);
270 responder.Finish(response, status, this);
271 return FINISH;
272 }
273
274 Candidates *cdb;
275
ecf9fb30
QY
276 Q request;
277 S response;
278 grpc::ServerAsyncResponseWriter<S> responder;
ecf9fb30 279
48c93061 280 grpc::Status (*callback)(UnaryRpcState<Q, S> *);
c85ecd64 281 reqfunc_t requestf = NULL;
48c93061 282};
ecf9fb30 283
48c93061
CH
284/*
285 * The StreamRpcState class is used to track the execution of a Streaming RPC.
286 *
287 * Template Args:
288 * Q - the request type for a given streaming RPC
289 * S - the response type for a given streaming RPC
290 * X - the type used to track the streaming state
291 */
292template <typename Q, typename S, typename X>
293class StreamRpcState : public RpcStateBase
294{
295 public:
296 typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
297 ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
298 ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
299 void *);
b680134e 300
48c93061
CH
301 StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *),
302 const char *name)
303 : RpcStateBase(name), requestsf(rfunc), callback(cb),
304 async_responder(&ctx){};
305
306 void do_request(::frr::Northbound::AsyncService *service,
307 ::grpc::ServerCompletionQueue *cq,
308 bool no_copy) override
309 {
310 grpc_debug("%s, posting a request for: %s", __func__, name);
311 auto copy =
312 no_copy ? this
313 : new StreamRpcState(requestsf, callback, name);
314 (service->*requestsf)(&copy->ctx, &copy->request,
315 &copy->async_responder, cq, cq, copy);
316 }
317
318 CallState run_mainthread(struct thread *thread) override
319 {
320 if (this->callback(this))
321 return MORE;
322 else
323 return FINISH;
324 }
325
326 Q request;
327 S response;
328 grpc::ServerAsyncWriter<S> async_responder;
329
330 bool (*callback)(StreamRpcState<Q, S, X> *);
331 reqsfunc_t requestsf = NULL;
332
333 X context;
ecf9fb30
QY
334};
335
b680134e
CH
336// ------------------------------------------------------
337// Utility Functions
338// ------------------------------------------------------
ecf9fb30 339
b680134e
CH
340static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
341{
342 switch (encoding) {
343 case frr::JSON:
344 return LYD_JSON;
345 case frr::XML:
346 return LYD_XML;
347 default:
348 flog_err(EC_LIB_DEVELOPMENT,
349 "%s: unknown data encoding format (%u)", __func__,
350 encoding);
351 exit(1);
352 }
353}
ecf9fb30 354
b680134e 355static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
fe095adc 356 const char *value)
ec2ac5f2 357{
fe095adc
CH
358 LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value,
359 LYD_NEW_PATH_UPDATE, &dnode);
b680134e
CH
360 if (err != LY_SUCCESS) {
361 flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
362 __func__, ly_errmsg(ly_native_ctx));
363 return -1;
ec2ac5f2
RW
364 }
365
b680134e
CH
366 return 0;
367}
368
369static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path)
370{
371 dnode = yang_dnode_get(dnode, path.c_str());
372 if (!dnode)
373 return -1;
374
375 lyd_free_tree(dnode);
376
377 return 0;
378}
379
380static LY_ERR data_tree_from_dnode(frr::DataTree *dt,
381 const struct lyd_node *dnode,
382 LYD_FORMAT lyd_format, bool with_defaults)
383{
384 char *strp;
385 int options = 0;
386
387 SET_FLAG(options, LYD_PRINT_WITHSIBLINGS);
388 if (with_defaults)
389 SET_FLAG(options, LYD_PRINT_WD_ALL);
390 else
391 SET_FLAG(options, LYD_PRINT_WD_TRIM);
392
393 LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options);
394 if (err == LY_SUCCESS) {
395 if (strp) {
396 dt->set_data(strp);
397 free(strp);
398 }
ec2ac5f2 399 }
b680134e
CH
400 return err;
401}
ec2ac5f2 402
b680134e
CH
403static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
404 bool config_only)
405{
406 struct lyd_node *dnode;
407 int options, opt2;
408 LY_ERR err;
409
410 if (config_only) {
411 options = LYD_PARSE_NO_STATE;
412 opt2 = LYD_VALIDATE_NO_STATE;
413 } else {
414 options = LYD_PARSE_STRICT;
415 opt2 = 0;
416 }
417
418 err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(),
419 encoding2lyd_format(dt->encoding()), options,
420 opt2, &dnode);
421 if (err != LY_SUCCESS) {
422 flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s",
423 __func__, ly_errmsg(ly_native_ctx));
424 }
425 return dnode;
426}
427
428static struct lyd_node *get_dnode_config(const std::string &path)
429{
430 struct lyd_node *dnode;
431
0f538858
RZ
432 if (!yang_dnode_exists(running_config->dnode,
433 path.empty() ? NULL : path.c_str()))
434 return NULL;
435
b680134e
CH
436 dnode = yang_dnode_get(running_config->dnode,
437 path.empty() ? NULL : path.c_str());
438 if (dnode)
439 dnode = yang_dnode_dup(dnode);
440
441 return dnode;
442}
443
444static int get_oper_data_cb(const struct lysc_node *snode,
445 struct yang_translator *translator,
446 struct yang_data *data, void *arg)
447{
448 struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
449 int ret = yang_dnode_edit(dnode, data->xpath, data->value);
450 yang_data_free(data);
451
452 return (ret == 0) ? NB_OK : NB_ERR;
453}
454
455static struct lyd_node *get_dnode_state(const std::string &path)
456{
457 struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false);
458 if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode)
459 != NB_OK) {
460 yang_dnode_free(dnode);
461 return NULL;
462 }
463
464 return dnode;
465}
466
467static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
468 int type, LYD_FORMAT lyd_format,
469 bool with_defaults)
470{
471 struct lyd_node *dnode_config = NULL;
472 struct lyd_node *dnode_state = NULL;
473 struct lyd_node *dnode_final;
474
475 // Configuration data.
476 if (type == frr::GetRequest_DataType_ALL
477 || type == frr::GetRequest_DataType_CONFIG) {
478 dnode_config = get_dnode_config(path);
479 if (!dnode_config)
480 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
481 "Data path not found");
482 }
483
484 // Operational data.
485 if (type == frr::GetRequest_DataType_ALL
486 || type == frr::GetRequest_DataType_STATE) {
487 dnode_state = get_dnode_state(path);
488 if (!dnode_state) {
489 if (dnode_config)
490 yang_dnode_free(dnode_config);
491 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
492 "Failed to fetch operational data");
493 }
494 }
495
496 switch (type) {
497 case frr::GetRequest_DataType_ALL:
498 //
499 // Combine configuration and state data into a single
500 // dnode.
501 //
502 if (lyd_merge_siblings(&dnode_state, dnode_config,
503 LYD_MERGE_DESTRUCT)
504 != LY_SUCCESS) {
505 yang_dnode_free(dnode_state);
506 yang_dnode_free(dnode_config);
507 return grpc::Status(
508 grpc::StatusCode::INTERNAL,
509 "Failed to merge configuration and state data",
510 ly_errmsg(ly_native_ctx));
ecf9fb30 511 }
b680134e
CH
512
513 dnode_final = dnode_state;
514 break;
515 case frr::GetRequest_DataType_CONFIG:
516 dnode_final = dnode_config;
517 break;
518 case frr::GetRequest_DataType_STATE:
519 dnode_final = dnode_state;
520 break;
ecf9fb30
QY
521 }
522
b680134e
CH
523 // Validate data to create implicit default nodes if necessary.
524 int validate_opts = 0;
525 if (type == frr::GetRequest_DataType_CONFIG)
526 validate_opts = LYD_VALIDATE_NO_STATE;
527 else
528 validate_opts = 0;
529
530 LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx,
531 validate_opts, NULL);
532
533 if (err)
534 flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s",
535 __func__, ly_errmsg(ly_native_ctx));
536 // Dump data using the requested format.
537 if (!err)
538 err = data_tree_from_dnode(dt, dnode_final, lyd_format,
539 with_defaults);
540 yang_dnode_free(dnode_final);
541 if (err)
542 return grpc::Status(grpc::StatusCode::INTERNAL,
543 "Failed to dump data");
544 return grpc::Status::OK;
545}
546
547
548// ------------------------------------------------------
549// RPC Callback Functions: run on main thread
550// ------------------------------------------------------
551
48c93061
CH
552grpc::Status HandleUnaryGetCapabilities(
553 UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse>
554 *tag)
b680134e 555{
48c93061 556 grpc_debug("%s: entered", __func__);
b680134e
CH
557
558 // Response: string frr_version = 1;
559 tag->response.set_frr_version(FRR_VERSION);
560
561 // Response: bool rollback_support = 2;
ec2ac5f2 562#ifdef HAVE_CONFIG_ROLLBACKS
b680134e 563 tag->response.set_rollback_support(true);
ec2ac5f2 564#else
b680134e 565 tag->response.set_rollback_support(false);
ec2ac5f2 566#endif
b680134e
CH
567 // Response: repeated ModuleData supported_modules = 3;
568 struct yang_module *module;
569 RB_FOREACH (module, yang_modules, &yang_modules) {
570 auto m = tag->response.add_supported_modules();
571
572 m->set_name(module->name);
573 if (module->info->revision)
574 m->set_revision(module->info->revision);
575 m->set_organization(module->info->org);
576 }
ec2ac5f2 577
b680134e
CH
578 // Response: repeated Encoding supported_encodings = 4;
579 tag->response.add_supported_encodings(frr::JSON);
580 tag->response.add_supported_encodings(frr::XML);
ec2ac5f2 581
48c93061 582 return grpc::Status::OK;
b680134e 583}
ec2ac5f2 584
48c93061
CH
585// Define the context variable type for this streaming handler
586typedef std::list<std::string> GetContextType;
b680134e 587
48c93061
CH
588bool HandleStreamingGet(
589 StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag)
590{
591 grpc_debug("%s: entered", __func__);
ec2ac5f2 592
48c93061
CH
593 auto mypathps = &tag->context;
594 if (tag->is_initial_process()) {
595 // Fill our context container first time through
596 grpc_debug("%s: initialize streaming state", __func__);
b680134e
CH
597 auto paths = tag->request.path();
598 for (const std::string &path : paths) {
48c93061 599 mypathps->push_back(std::string(path));
ecf9fb30 600 }
b680134e 601 }
ecf9fb30 602
b680134e
CH
603 // Request: DataType type = 1;
604 int type = tag->request.type();
605 // Request: Encoding encoding = 2;
606 frr::Encoding encoding = tag->request.encoding();
607 // Request: bool with_defaults = 3;
608 bool with_defaults = tag->request.with_defaults();
609
b680134e
CH
610 if (mypathps->empty()) {
611 tag->async_responder.Finish(grpc::Status::OK, tag);
48c93061 612 return false;
ec2ac5f2
RW
613 }
614
b680134e
CH
615 frr::GetResponse response;
616 grpc::Status status;
617
618 // Response: int64 timestamp = 1;
619 response.set_timestamp(time(NULL));
620
621 // Response: DataTree data = 2;
622 auto *data = response.mutable_data();
623 data->set_encoding(tag->request.encoding());
624 status = get_path(data, mypathps->back().c_str(), type,
625 encoding2lyd_format(encoding), with_defaults);
626
627 if (!status.ok()) {
628 tag->async_responder.WriteAndFinish(
629 response, grpc::WriteOptions(), status, tag);
48c93061 630 return false;
b680134e
CH
631 }
632
633 mypathps->pop_back();
634 if (mypathps->empty()) {
635 tag->async_responder.WriteAndFinish(
636 response, grpc::WriteOptions(), grpc::Status::OK, tag);
48c93061 637 return false;
b680134e
CH
638 } else {
639 tag->async_responder.Write(response, tag);
48c93061 640 return true;
b680134e
CH
641 }
642}
643
48c93061
CH
644grpc::Status HandleUnaryCreateCandidate(
645 UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse>
646 *tag)
b680134e 647{
48c93061 648 grpc_debug("%s: entered", __func__);
ec2ac5f2 649
b680134e 650 struct candidate *candidate = tag->cdb->create_candidate();
48c93061
CH
651 if (!candidate)
652 return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
653 "Can't create candidate configuration");
654 tag->response.set_candidate_id(candidate->id);
655 return grpc::Status::OK;
b680134e
CH
656}
657
48c93061
CH
658grpc::Status HandleUnaryDeleteCandidate(
659 UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse>
660 *tag)
b680134e 661{
48c93061 662 grpc_debug("%s: entered", __func__);
b680134e 663
b680134e
CH
664 uint32_t candidate_id = tag->request.candidate_id();
665
666 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
667
48c93061
CH
668 if (!tag->cdb->contains(candidate_id))
669 return grpc::Status(grpc::StatusCode::NOT_FOUND,
670 "candidate configuration not found");
671 tag->cdb->delete_candidate(candidate_id);
672 return grpc::Status::OK;
b680134e
CH
673}
674
48c93061
CH
675grpc::Status HandleUnaryUpdateCandidate(
676 UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse>
677 *tag)
b680134e 678{
48c93061 679 grpc_debug("%s: entered", __func__);
ecf9fb30 680
b680134e
CH
681 uint32_t candidate_id = tag->request.candidate_id();
682
683 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
684
685 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
686
687 if (!candidate)
48c93061
CH
688 return grpc::Status(grpc::StatusCode::NOT_FOUND,
689 "candidate configuration not found");
690 if (candidate->transaction)
691 return grpc::Status(
692 grpc::StatusCode::FAILED_PRECONDITION,
693 "candidate is in the middle of a transaction");
694 if (nb_candidate_update(candidate->config) != NB_OK)
695 return grpc::Status(grpc::StatusCode::INTERNAL,
696 "failed to update candidate configuration");
b680134e 697
48c93061 698 return grpc::Status::OK;
b680134e
CH
699}
700
48c93061
CH
701grpc::Status HandleUnaryEditCandidate(
702 UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse>
703 *tag)
b680134e 704{
48c93061 705 grpc_debug("%s: entered", __func__);
b680134e 706
b680134e 707 uint32_t candidate_id = tag->request.candidate_id();
ec2ac5f2 708
b680134e 709 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
ec2ac5f2 710
b680134e 711 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
48c93061
CH
712 if (!candidate)
713 return grpc::Status(grpc::StatusCode::NOT_FOUND,
714 "candidate configuration not found");
ec2ac5f2 715
b680134e
CH
716 struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
717
718 auto pvs = tag->request.update();
719 for (const frr::PathValue &pv : pvs) {
fe095adc
CH
720 if (yang_dnode_edit(candidate_tmp->dnode, pv.path(),
721 pv.value().c_str()) != 0) {
b680134e
CH
722 nb_config_free(candidate_tmp);
723
48c93061
CH
724 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
725 "Failed to update \"" + pv.path() +
726 "\"");
ecf9fb30 727 }
ec2ac5f2
RW
728 }
729
b680134e
CH
730 pvs = tag->request.delete_();
731 for (const frr::PathValue &pv : pvs) {
732 if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
733 nb_config_free(candidate_tmp);
48c93061
CH
734 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
735 "Failed to remove \"" + pv.path() +
736 "\"");
ec2ac5f2 737 }
ec2ac5f2
RW
738 }
739
b680134e
CH
740 // No errors, accept all changes.
741 nb_config_replace(candidate->config, candidate_tmp, false);
48c93061 742 return grpc::Status::OK;
b680134e
CH
743}
744
48c93061
CH
745grpc::Status HandleUnaryLoadToCandidate(
746 UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse>
747 *tag)
b680134e 748{
48c93061 749 grpc_debug("%s: entered", __func__);
b680134e 750
b680134e
CH
751 uint32_t candidate_id = tag->request.candidate_id();
752
753 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
754
755 // Request: LoadType type = 2;
756 int load_type = tag->request.type();
757 // Request: DataTree config = 3;
758 auto config = tag->request.config();
759
b680134e 760 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
48c93061
CH
761 if (!candidate)
762 return grpc::Status(grpc::StatusCode::NOT_FOUND,
763 "candidate configuration not found");
ec2ac5f2 764
b680134e 765 struct lyd_node *dnode = dnode_from_data_tree(&config, true);
48c93061
CH
766 if (!dnode)
767 return grpc::Status(grpc::StatusCode::INTERNAL,
768 "Failed to parse the configuration");
ec2ac5f2 769
b680134e 770 struct nb_config *loaded_config = nb_config_new(dnode);
b680134e
CH
771 if (load_type == frr::LoadToCandidateRequest::REPLACE)
772 nb_config_replace(candidate->config, loaded_config, false);
48c93061
CH
773 else if (nb_config_merge(candidate->config, loaded_config, false) !=
774 NB_OK)
775 return grpc::Status(grpc::StatusCode::INTERNAL,
776 "Failed to merge the loaded configuration");
ec2ac5f2 777
48c93061 778 return grpc::Status::OK;
b680134e
CH
779}
780
48c93061
CH
781grpc::Status
782HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
b680134e 783{
48c93061 784 grpc_debug("%s: entered", __func__);
ec2ac5f2 785
b680134e
CH
786 // Request: uint32 candidate_id = 1;
787 uint32_t candidate_id = tag->request.candidate_id();
788
789 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
790
791 // Request: Phase phase = 2;
792 int phase = tag->request.phase();
793 // Request: string comment = 3;
794 const std::string comment = tag->request.comment();
795
796 // Find candidate configuration.
797 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
48c93061
CH
798 if (!candidate)
799 return grpc::Status(grpc::StatusCode::NOT_FOUND,
800 "candidate configuration not found");
b680134e
CH
801
802 int ret = NB_OK;
803 uint32_t transaction_id = 0;
804
805 // Check for misuse of the two-phase commit protocol.
806 switch (phase) {
807 case frr::CommitRequest::PREPARE:
808 case frr::CommitRequest::ALL:
48c93061
CH
809 if (candidate->transaction)
810 return grpc::Status(
811 grpc::StatusCode::FAILED_PRECONDITION,
812 "candidate is in the middle of a transaction");
b680134e
CH
813 break;
814 case frr::CommitRequest::ABORT:
815 case frr::CommitRequest::APPLY:
48c93061
CH
816 if (!candidate->transaction)
817 return grpc::Status(
818 grpc::StatusCode::FAILED_PRECONDITION,
819 "no transaction in progress");
b680134e
CH
820 break;
821 default:
822 break;
ec2ac5f2
RW
823 }
824
ecf9fb30 825
b680134e
CH
826 // Execute the user request.
827 struct nb_context context = {};
828 context.client = NB_CLIENT_GRPC;
829 char errmsg[BUFSIZ] = {0};
830
831 switch (phase) {
832 case frr::CommitRequest::VALIDATE:
833 grpc_debug("`-> Performing VALIDATE");
834 ret = nb_candidate_validate(&context, candidate->config, errmsg,
835 sizeof(errmsg));
836 break;
837 case frr::CommitRequest::PREPARE:
838 grpc_debug("`-> Performing PREPARE");
839 ret = nb_candidate_commit_prepare(
840 &context, candidate->config, comment.c_str(),
841 &candidate->transaction, errmsg, sizeof(errmsg));
842 break;
843 case frr::CommitRequest::ABORT:
844 grpc_debug("`-> Performing ABORT");
845 nb_candidate_commit_abort(candidate->transaction, errmsg,
846 sizeof(errmsg));
847 break;
848 case frr::CommitRequest::APPLY:
849 grpc_debug("`-> Performing APPLY");
850 nb_candidate_commit_apply(candidate->transaction, true,
851 &transaction_id, errmsg,
852 sizeof(errmsg));
853 break;
854 case frr::CommitRequest::ALL:
855 grpc_debug("`-> Performing ALL");
856 ret = nb_candidate_commit(&context, candidate->config, true,
857 comment.c_str(), &transaction_id,
858 errmsg, sizeof(errmsg));
859 break;
860 }
ec2ac5f2 861
b680134e
CH
862 // Map northbound error codes to gRPC status codes.
863 grpc::Status status;
864 switch (ret) {
865 case NB_OK:
866 status = grpc::Status::OK;
867 break;
868 case NB_ERR_NO_CHANGES:
869 status = grpc::Status(grpc::StatusCode::ABORTED, errmsg);
870 break;
871 case NB_ERR_LOCKED:
872 status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg);
873 break;
874 case NB_ERR_VALIDATION:
875 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
876 errmsg);
877 break;
878 case NB_ERR_RESOURCE:
879 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
880 errmsg);
881 break;
882 case NB_ERR:
883 default:
884 status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg);
885 break;
886 }
ec2ac5f2 887
b680134e
CH
888 grpc_debug("`-> Result: %s (message: '%s')",
889 nb_err_name((enum nb_error)ret), errmsg);
890
891 if (ret == NB_OK) {
892 // Response: uint32 transaction_id = 1;
893 if (transaction_id)
894 tag->response.set_transaction_id(transaction_id);
ec2ac5f2 895 }
b680134e
CH
896 if (strlen(errmsg) > 0)
897 tag->response.set_error_message(errmsg);
ec2ac5f2 898
48c93061 899 return status;
b680134e 900}
ec2ac5f2 901
48c93061
CH
902grpc::Status HandleUnaryLockConfig(
903 UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
b680134e 904{
48c93061 905 grpc_debug("%s: entered", __func__);
ec2ac5f2 906
48c93061
CH
907 if (nb_running_lock(NB_CLIENT_GRPC, NULL))
908 return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
909 "running configuration is locked already");
910 return grpc::Status::OK;
b680134e 911}
ec2ac5f2 912
48c93061
CH
913grpc::Status HandleUnaryUnlockConfig(
914 UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
b680134e 915{
48c93061 916 grpc_debug("%s: entered", __func__);
ec2ac5f2 917
48c93061
CH
918 if (nb_running_unlock(NB_CLIENT_GRPC, NULL))
919 return grpc::Status(
920 grpc::StatusCode::FAILED_PRECONDITION,
921 "failed to unlock the running configuration");
922 return grpc::Status::OK;
b680134e 923}
ec2ac5f2 924
b680134e
CH
925static void list_transactions_cb(void *arg, int transaction_id,
926 const char *client_name, const char *date,
927 const char *comment)
928{
929 auto list = static_cast<std::list<
930 std::tuple<int, std::string, std::string, std::string>> *>(arg);
931 list->push_back(
932 std::make_tuple(transaction_id, std::string(client_name),
933 std::string(date), std::string(comment)));
934}
935
48c93061
CH
936// Define the context variable type for this streaming handler
937typedef std::list<std::tuple<int, std::string, std::string, std::string>>
938 ListTransactionsContextType;
b680134e 939
48c93061
CH
940bool HandleStreamingListTransactions(
941 StreamRpcState<frr::ListTransactionsRequest,
942 frr::ListTransactionsResponse,
943 ListTransactionsContextType> *tag)
944{
945 grpc_debug("%s: entered", __func__);
946
947 auto list = &tag->context;
948 if (tag->is_initial_process()) {
949 grpc_debug("%s: initialize streaming state", __func__);
950 // Fill our context container first time through
951 nb_db_transactions_iterate(list_transactions_cb, list);
952 list->push_back(std::make_tuple(
b680134e
CH
953 0xFFFF, std::string("fake client"),
954 std::string("fake date"), std::string("fake comment")));
48c93061
CH
955 list->push_back(std::make_tuple(0xFFFE,
956 std::string("fake client2"),
957 std::string("fake date"),
958 std::string("fake comment2")));
ec2ac5f2
RW
959 }
960
b680134e
CH
961 if (list->empty()) {
962 tag->async_responder.Finish(grpc::Status::OK, tag);
48c93061 963 return false;
ec2ac5f2
RW
964 }
965
b680134e 966 auto item = list->back();
ec2ac5f2 967
b680134e 968 frr::ListTransactionsResponse response;
ec2ac5f2 969
b680134e
CH
970 // Response: uint32 id = 1;
971 response.set_id(std::get<0>(item));
ec2ac5f2 972
b680134e
CH
973 // Response: string client = 2;
974 response.set_client(std::get<1>(item).c_str());
975
976 // Response: string date = 3;
977 response.set_date(std::get<2>(item).c_str());
978
979 // Response: string comment = 4;
980 response.set_comment(std::get<3>(item).c_str());
ec2ac5f2 981
b680134e
CH
982 list->pop_back();
983 if (list->empty()) {
984 tag->async_responder.WriteAndFinish(
985 response, grpc::WriteOptions(), grpc::Status::OK, tag);
48c93061 986 return false;
b680134e
CH
987 } else {
988 tag->async_responder.Write(response, tag);
48c93061 989 return true;
ec2ac5f2 990 }
b680134e 991}
ec2ac5f2 992
48c93061
CH
993grpc::Status HandleUnaryGetTransaction(
994 UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse>
995 *tag)
b680134e 996{
48c93061 997 grpc_debug("%s: entered", __func__);
ec2ac5f2 998
b680134e
CH
999 // Request: uint32 transaction_id = 1;
1000 uint32_t transaction_id = tag->request.transaction_id();
1001 // Request: Encoding encoding = 2;
1002 frr::Encoding encoding = tag->request.encoding();
1003 // Request: bool with_defaults = 3;
1004 bool with_defaults = tag->request.with_defaults();
1005
1006 grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__,
1007 transaction_id, encoding);
1008
1009 struct nb_config *nb_config;
1010
1011 // Load configuration from the transactions database.
1012 nb_config = nb_db_transaction_load(transaction_id);
48c93061
CH
1013 if (!nb_config)
1014 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1015 "Transaction not found");
ec2ac5f2 1016
b680134e
CH
1017 // Response: DataTree config = 1;
1018 auto config = tag->response.mutable_config();
1019 config->set_encoding(encoding);
1020
1021 // Dump data using the requested format.
1022 if (data_tree_from_dnode(config, nb_config->dnode,
1023 encoding2lyd_format(encoding), with_defaults)
1024 != 0) {
1025 nb_config_free(nb_config);
48c93061
CH
1026 return grpc::Status(grpc::StatusCode::INTERNAL,
1027 "Failed to dump data");
ec2ac5f2
RW
1028 }
1029
b680134e 1030 nb_config_free(nb_config);
ec2ac5f2 1031
48c93061 1032 return grpc::Status::OK;
b680134e 1033}
ec2ac5f2 1034
48c93061
CH
1035grpc::Status HandleUnaryExecute(
1036 UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
b680134e 1037{
48c93061 1038 grpc_debug("%s: entered", __func__);
ec2ac5f2 1039
b680134e
CH
1040 struct nb_node *nb_node;
1041 struct list *input_list;
1042 struct list *output_list;
1043 struct listnode *node;
1044 struct yang_data *data;
1045 const char *xpath;
1046 char errmsg[BUFSIZ] = {0};
1047
1048 // Request: string path = 1;
1049 xpath = tag->request.path().c_str();
1050
1051 grpc_debug("%s(path: \"%s\")", __func__, xpath);
1052
48c93061
CH
1053 if (tag->request.path().empty())
1054 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1055 "Data path is empty");
0fe5b904 1056
b680134e 1057 nb_node = nb_node_find(xpath);
48c93061
CH
1058 if (!nb_node)
1059 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1060 "Unknown data path");
ec2ac5f2 1061
b680134e
CH
1062 input_list = yang_data_list_new();
1063 output_list = yang_data_list_new();
ec2ac5f2 1064
b680134e
CH
1065 // Read input parameters.
1066 auto input = tag->request.input();
1067 for (const frr::PathValue &pv : input) {
1068 // Request: repeated PathValue input = 2;
1069 data = yang_data_new(pv.path().c_str(), pv.value().c_str());
1070 listnode_add(input_list, data);
1071 }
ec2ac5f2 1072
b680134e
CH
1073 // Execute callback registered for this XPath.
1074 if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg,
1075 sizeof(errmsg))
1076 != NB_OK) {
1077 flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s",
1078 __func__, xpath);
1079 list_delete(&input_list);
1080 list_delete(&output_list);
1081
48c93061 1082 return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed");
ec2ac5f2 1083 }
b680134e
CH
1084
1085 // Process output parameters.
1086 for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
1087 // Response: repeated PathValue output = 1;
1088 frr::PathValue *pv = tag->response.add_output();
1089 pv->set_path(data->xpath);
1090 pv->set_value(data->value);
1091 }
1092
1093 // Release memory.
1094 list_delete(&input_list);
1095 list_delete(&output_list);
1096
48c93061 1097 return grpc::Status::OK;
b680134e
CH
1098}
1099
1100// ------------------------------------------------------
1101// Thread Initialization and Run Functions
1102// ------------------------------------------------------
1103
1104
1105#define REQUEST_NEWRPC(NAME, cdb) \
1106 do { \
48c93061
CH
1107 auto _rpcState = new UnaryRpcState<frr::NAME##Request, \
1108 frr::NAME##Response>( \
b680134e
CH
1109 (cdb), &frr::Northbound::AsyncService::Request##NAME, \
1110 &HandleUnary##NAME, #NAME); \
83f6fce7 1111 _rpcState->do_request(&service, cq.get(), true); \
b680134e
CH
1112 } while (0)
1113
48c93061 1114#define REQUEST_NEWRPC_STREAMING(NAME) \
b680134e 1115 do { \
48c93061
CH
1116 auto _rpcState = new StreamRpcState<frr::NAME##Request, \
1117 frr::NAME##Response, \
1118 NAME##ContextType>( \
1119 &frr::Northbound::AsyncService::Request##NAME, \
b680134e 1120 &HandleStreaming##NAME, #NAME); \
83f6fce7 1121 _rpcState->do_request(&service, cq.get(), true); \
b680134e
CH
1122 } while (0)
1123
1124struct grpc_pthread_attr {
1125 struct frr_pthread_attr attr;
1126 unsigned long port;
ec2ac5f2
RW
1127};
1128
de800c10 1129// Capture these objects so we can try to shut down cleanly
83f6fce7
CH
1130static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER;
1131static grpc::Server *s_server;
de800c10 1132
ec2ac5f2
RW
1133static void *grpc_pthread_start(void *arg)
1134{
0edcb505 1135 struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
b680134e
CH
1136 uint port = (uint) reinterpret_cast<intptr_t>(fpt->data);
1137
1138 Candidates candidates;
1139 grpc::ServerBuilder builder;
1140 std::stringstream server_address;
83f6fce7 1141 frr::Northbound::AsyncService service;
ec2ac5f2 1142
0edcb505
CS
1143 frr_pthread_set_name(fpt);
1144
b680134e
CH
1145 server_address << "0.0.0.0:" << port;
1146 builder.AddListeningPort(server_address.str(),
1147 grpc::InsecureServerCredentials());
83f6fce7 1148 builder.RegisterService(&service);
673e4407
RZ
1149 builder.AddChannelArgument(
1150 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
83f6fce7
CH
1151 std::unique_ptr<grpc::ServerCompletionQueue> cq =
1152 builder.AddCompletionQueue();
1153 std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
1154 s_server = server.get();
1155
ad6bd536 1156 pthread_mutex_lock(&s_server_lock); // Make coverity happy
83f6fce7 1157 grpc_running = true;
ad6bd536 1158 pthread_mutex_unlock(&s_server_lock); // Make coverity happy
b680134e 1159
48c93061 1160 /* Schedule unary RPC handlers */
b680134e
CH
1161 REQUEST_NEWRPC(GetCapabilities, NULL);
1162 REQUEST_NEWRPC(CreateCandidate, &candidates);
1163 REQUEST_NEWRPC(DeleteCandidate, &candidates);
1164 REQUEST_NEWRPC(UpdateCandidate, &candidates);
1165 REQUEST_NEWRPC(EditCandidate, &candidates);
1166 REQUEST_NEWRPC(LoadToCandidate, &candidates);
1167 REQUEST_NEWRPC(Commit, &candidates);
1168 REQUEST_NEWRPC(GetTransaction, NULL);
1169 REQUEST_NEWRPC(LockConfig, NULL);
1170 REQUEST_NEWRPC(UnlockConfig, NULL);
1171 REQUEST_NEWRPC(Execute, NULL);
48c93061
CH
1172
1173 /* Schedule streaming RPC handlers */
1174 REQUEST_NEWRPC_STREAMING(Get);
1175 REQUEST_NEWRPC_STREAMING(ListTransactions);
b680134e
CH
1176
1177 zlog_notice("gRPC server listening on %s",
1178 server_address.str().c_str());
1179
1180 /* Process inbound RPCs */
83f6fce7
CH
1181 bool ok;
1182 void *tag;
48c93061 1183 while (true) {
83f6fce7
CH
1184 if (!cq->Next(&tag, &ok)) {
1185 grpc_debug("%s: CQ empty exiting", __func__);
de800c10 1186 break;
83f6fce7 1187 }
de800c10 1188
83f6fce7
CH
1189 grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
1190 ok);
1191
48c93061 1192 if (!ok) {
83f6fce7
CH
1193 delete static_cast<RpcStateBase *>(tag);
1194 break;
1195 }
b680134e
CH
1196
1197 RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
48c93061
CH
1198 if (rpc->get_state() != FINISH)
1199 rpc->run(&service, cq.get());
1200 else {
1201 grpc_debug("%s RPC FINISH -> [delete]", rpc->name);
1202 delete rpc;
1203 }
b680134e 1204 }
ec2ac5f2 1205
83f6fce7
CH
1206 /* This was probably done for us to get here, but let's be safe */
1207 pthread_mutex_lock(&s_server_lock);
1208 grpc_running = false;
1209 if (s_server) {
1210 grpc_debug("%s: shutdown server and CQ", __func__);
1211 server->Shutdown();
1212 s_server = NULL;
1213 }
1214 pthread_mutex_unlock(&s_server_lock);
1215
1216 grpc_debug("%s: shutting down CQ", __func__);
1217 cq->Shutdown();
1218
1219 grpc_debug("%s: draining the CQ", __func__);
1220 while (cq->Next(&tag, &ok)) {
1221 grpc_debug("%s: drain tag %p", __func__, tag);
1222 delete static_cast<RpcStateBase *>(tag);
b680134e 1223 }
ec2ac5f2 1224
83f6fce7 1225 zlog_info("%s: exiting from grpc pthread", __func__);
ec2ac5f2
RW
1226 return NULL;
1227}
1228
b680134e
CH
1229
1230static int frr_grpc_init(uint port)
ec2ac5f2 1231{
b680134e
CH
1232 struct frr_pthread_attr attr = {
1233 .start = grpc_pthread_start,
1234 .stop = NULL,
1235 };
1236
83f6fce7
CH
1237 grpc_debug("%s: entered", __func__);
1238
0edcb505 1239 fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
b680134e 1240 fpt->data = reinterpret_cast<void *>((intptr_t)port);
0edcb505 1241
ec2ac5f2 1242 /* Create a pthread for gRPC since it runs its own event loop. */
0edcb505 1243 if (frr_pthread_run(fpt, NULL) < 0) {
ec2ac5f2
RW
1244 flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s",
1245 __func__, safe_strerror(errno));
1246 return -1;
1247 }
ec2ac5f2
RW
1248
1249 return 0;
1250}
1251
1252static int frr_grpc_finish(void)
1253{
83f6fce7 1254 grpc_debug("%s: entered", __func__);
de800c10 1255
83f6fce7
CH
1256 if (!fpt)
1257 return 0;
de800c10 1258
83f6fce7
CH
1259 /*
1260 * Shut the server down here in main thread. This will cause the wait on
1261 * the completion queue (cq.Next()) to exit and cleanup everything else.
1262 */
1263 pthread_mutex_lock(&s_server_lock);
1264 grpc_running = false;
1265 if (s_server) {
1266 grpc_debug("%s: shutdown server", __func__);
1267 s_server->Shutdown();
1268 s_server = NULL;
de800c10 1269 }
83f6fce7 1270 pthread_mutex_unlock(&s_server_lock);
de800c10 1271
83f6fce7
CH
1272 grpc_debug("%s: joining and destroy grpc thread", __func__);
1273 pthread_join(fpt->thread, NULL);
1274 frr_pthread_destroy(fpt);
79c68195
RZ
1275
1276 // Fix protobuf 'memory leaks' during shutdown.
1277 // https://groups.google.com/g/protobuf/c/4y_EmQiCGgs
1278 google::protobuf::ShutdownProtobufLibrary();
1279
ec2ac5f2
RW
1280 return 0;
1281}
1282
20b0a2ed
QY
1283/*
1284 * This is done this way because module_init and module_late_init are both
1285 * called during daemon pre-fork initialization. Because the GRPC library
1286 * spawns threads internally, we need to delay initializing it until after
1287 * fork. This is done by scheduling this init function as an event task, since
1288 * the event loop doesn't run until after fork.
1289 */
cc9f21da 1290static void frr_grpc_module_very_late_init(struct thread *thread)
ec2ac5f2 1291{
ec2ac5f2 1292 const char *args = THIS_MODULE->load_args;
b680134e 1293 uint port = GRPC_DEFAULT_PORT;
ec2ac5f2 1294
ec2ac5f2 1295 if (args) {
b680134e
CH
1296 port = std::stoul(args);
1297 if (port < 1024 || port > UINT16_MAX) {
ec2ac5f2 1298 flog_err(EC_LIB_GRPC_INIT,
b680134e
CH
1299 "%s: port number must be between 1025 and %d",
1300 __func__, UINT16_MAX);
ec2ac5f2
RW
1301 goto error;
1302 }
1303 }
1304
b680134e 1305 if (frr_grpc_init(port) < 0)
ec2ac5f2
RW
1306 goto error;
1307
cc9f21da 1308 return;
69ec5832 1309
ec2ac5f2
RW
1310error:
1311 flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module");
ec2ac5f2
RW
1312}
1313
20b0a2ed
QY
1314static int frr_grpc_module_late_init(struct thread_master *tm)
1315{
b680134e 1316 main_master = tm;
20b0a2ed 1317 hook_register(frr_fini, frr_grpc_finish);
b680134e 1318 thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
20b0a2ed
QY
1319 return 0;
1320}
1321
ec2ac5f2
RW
1322static int frr_grpc_module_init(void)
1323{
1324 hook_register(frr_late_init, frr_grpc_module_late_init);
1325
1326 return 0;
1327}
1328
1329FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
1330 .description = "FRR gRPC northbound module",
b680134e 1331 .init = frr_grpc_module_init, );