]> git.proxmox.com Git - mirror_frr.git/blame - lib/northbound_grpc.cpp
Merge pull request #10767 from donaldsharp/some_fixes
[mirror_frr.git] / lib / northbound_grpc.cpp
CommitLineData
ec2ac5f2 1//
c85ecd64 2// Copyright (c) 2021-2022, LabN Consulting, L.L.C
ec2ac5f2
RW
3// Copyright (C) 2019 NetDEF, Inc.
4// Renato Westphal
5//
6// This program is free software; you can redistribute it and/or modify it
7// under the terms of the GNU General Public License as published by the Free
8// Software Foundation; either version 2 of the License, or (at your option)
9// any later version.
10//
11// This program is distributed in the hope that it will be useful, but WITHOUT
12// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14// more details.
15//
16// You should have received a copy of the GNU General Public License along
17// with this program; see the file COPYING; if not, write to the Free Software
18// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19//
20
21#include <zebra.h>
63d12a7d
QY
22#include <grpcpp/grpcpp.h>
23#include "grpc/frr-northbound.grpc.pb.h"
ec2ac5f2
RW
24
25#include "log.h"
26#include "libfrr.h"
09781197 27#include "lib/version.h"
b680134e 28#include "lib/thread.h"
ec2ac5f2
RW
29#include "command.h"
30#include "lib_errors.h"
31#include "northbound.h"
32#include "northbound_db.h"
0edcb505 33#include "frr_pthread.h"
ec2ac5f2
RW
34
35#include <iostream>
36#include <sstream>
37#include <memory>
38#include <string>
39
ec2ac5f2
RW
40#define GRPC_DEFAULT_PORT 50051
41
/*
 * NOTE: we can't use the FRR debugging infrastructure here since it uses
 * atomics and C++ has a different atomics API. Until we figure out a way to
 * solve this problem, gRPC debugging is controlled by the file-local flag
 * below, which is off by default.
 */
b680134e
CH
47static bool nb_dbg_client_grpc = 0;
48
49static struct thread_master *main_master;
ec2ac5f2 50
0edcb505
CS
51static struct frr_pthread *fpt;
52
83f6fce7
CH
53static bool grpc_running;
54
b680134e
CH
55#define grpc_debug(...) \
56 do { \
57 if (nb_dbg_client_grpc) \
58 zlog_debug(__VA_ARGS__); \
59 } while (0)
60
61// ------------------------------------------------------
62// New Types
63// ------------------------------------------------------
64
/*
 * States an RPC moves through while it is being tracked. The enumerator
 * values are used as indices into call_states[], so the two lists must be
 * kept in the same order.
 */
enum CallState {
	CREATE,
	PROCESS,
	MORE,
	FINISH,
	DELETED,
};

/* Printable name of each CallState, indexed by the enumerator value. */
const char *call_states[] = {
	"CREATE", "PROCESS", "MORE", "FINISH", "DELETED",
};
67
/* One candidate configuration handed out to a gRPC client. */
struct candidate {
	/* Identifier returned to the client and used to look this entry up. */
	uint64_t id;

	/* The candidate configuration itself. */
	struct nb_config *config;

	/* In-progress commit transaction, or NULL when there is none. */
	struct nb_transaction *transaction;
};
ec2ac5f2 73
b680134e
CH
74class Candidates
75{
76 public:
77 ~Candidates(void)
78 {
79 // Delete candidates.
80 for (auto it = _cdb.begin(); it != _cdb.end(); it++)
81 delete_candidate(&it->second);
82 }
83
84 struct candidate *create_candidate(void)
85 {
86 uint64_t id = ++_next_id;
87 assert(id); // TODO: implement an algorithm for unique reusable
88 // IDs.
89 struct candidate *c = &_cdb[id];
90 c->id = id;
91 c->config = nb_config_dup(running_config);
92 c->transaction = NULL;
93
94 return c;
95 }
96
97 void delete_candidate(struct candidate *c)
98 {
99 char errmsg[BUFSIZ] = {0};
100
b680134e
CH
101 nb_config_free(c->config);
102 if (c->transaction)
103 nb_candidate_commit_abort(c->transaction, errmsg,
104 sizeof(errmsg));
96d434f8 105 _cdb.erase(c->id);
b680134e
CH
106 }
107
108 struct candidate *get_candidate(uint32_t id)
109 {
110 return _cdb.count(id) == 0 ? NULL : &_cdb[id];
111 }
112
113 private:
114 uint64_t _next_id = 0;
115 std::map<uint32_t, struct candidate> _cdb;
116};
ecf9fb30 117
ecf9fb30
QY
118class RpcStateBase
119{
120 public:
83f6fce7 121 virtual ~RpcStateBase() = default;
b680134e
CH
122 virtual CallState doCallback() = 0;
123 virtual void do_request(::frr::Northbound::AsyncService *service,
83f6fce7
CH
124 ::grpc::ServerCompletionQueue *cq,
125 bool no_copy) = 0;
ecf9fb30
QY
126};
127
b680134e
CH
128/*
129 * The RPC state class is used to track the execution of an RPC.
130 */
131template <typename Q, typename S> class NewRpcState : RpcStateBase
ecf9fb30 132{
b680134e
CH
133 typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
134 ::grpc::ServerContext *, Q *,
135 ::grpc::ServerAsyncResponseWriter<S> *,
136 ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
137 void *);
138 typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
139 ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
140 ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
141 void *);
142
ecf9fb30 143 public:
b680134e
CH
144 NewRpcState(Candidates *cdb, reqfunc_t rfunc,
145 void (*cb)(NewRpcState<Q, S> *), const char *name)
146 : requestf(rfunc), callback(cb), responder(&ctx),
147 async_responder(&ctx), name(name), cdb(cdb){};
148 NewRpcState(Candidates *cdb, reqsfunc_t rfunc,
149 void (*cb)(NewRpcState<Q, S> *), const char *name)
150 : requestsf(rfunc), callback(cb), responder(&ctx),
151 async_responder(&ctx), name(name), cdb(cdb){};
152
153 CallState doCallback() override
154 {
155 CallState enter_state = this->state;
156 CallState new_state;
157 if (enter_state == FINISH) {
158 grpc_debug("%s RPC FINISH -> DELETED", name);
159 new_state = FINISH;
160 } else {
161 grpc_debug("%s RPC: %s -> PROCESS", name,
162 call_states[this->state]);
163 new_state = PROCESS;
164 }
165 /*
166 * We are either in state CREATE, MORE or FINISH. If CREATE or
167 * MORE move back to PROCESS, otherwise we are cleaning up
168 * (FINISH) so leave it in that state. Run the callback on the
169 * main threadmaster/pthread; and wait for expected transition
170 * from main thread. If transition is to FINISH->DELETED.
171 * delete us.
172 *
173 * We update the state prior to scheduling the callback which
174 * may then update the state in the master pthread. Then we
175 * obtain the lock in the condvar-check-loop as the callback
176 * will be modifying updating the state value.
177 */
178 this->state = new_state;
179 thread_add_event(main_master, c_callback, (void *)this, 0,
180 NULL);
181 pthread_mutex_lock(&this->cmux);
182 while (this->state == new_state)
183 pthread_cond_wait(&this->cond, &this->cmux);
184 pthread_mutex_unlock(&this->cmux);
185
186 if (this->state == DELETED) {
187 grpc_debug("%s RPC: -> [DELETED]", name);
188 delete this;
189 return DELETED;
190 }
191 return this->state;
192 }
ecf9fb30 193
b680134e 194 void do_request(::frr::Northbound::AsyncService *service,
83f6fce7
CH
195 ::grpc::ServerCompletionQueue *cq,
196 bool no_copy) override
ecf9fb30 197 {
b680134e
CH
198 grpc_debug("%s, posting a request for: %s", __func__, name);
199 if (requestf) {
200 NewRpcState<Q, S> *copy =
83f6fce7
CH
201 no_copy ? this
202 : new NewRpcState(cdb, requestf,
203 callback, name);
b680134e
CH
204 (service->*requestf)(&copy->ctx, &copy->request,
205 &copy->responder, cq, cq, copy);
206 } else {
207 NewRpcState<Q, S> *copy =
83f6fce7
CH
208 no_copy ? this
209 : new NewRpcState(cdb, requestsf,
210 callback, name);
b680134e
CH
211 (service->*requestsf)(&copy->ctx, &copy->request,
212 &copy->async_responder, cq, cq,
213 copy);
214 }
215 }
216
217
cc9f21da 218 static void c_callback(struct thread *thread)
b680134e
CH
219 {
220 auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg);
221 /*
222 * We hold the lock until the callback finishes and has updated
223 * _tag->state, then we signal done and release.
224 */
225 pthread_mutex_lock(&_tag->cmux);
226
227 CallState enter_state = _tag->state;
228 grpc_debug("%s RPC running on main thread", _tag->name);
229
230 _tag->callback(_tag);
231
232 grpc_debug("%s RPC: %s -> %s", _tag->name,
233 call_states[enter_state], call_states[_tag->state]);
234
235 pthread_cond_signal(&_tag->cond);
236 pthread_mutex_unlock(&_tag->cmux);
cc9f21da 237 return;
ecf9fb30
QY
238 }
239
b680134e 240 const char *name;
ecf9fb30
QY
241 grpc::ServerContext ctx;
242 Q request;
243 S response;
244 grpc::ServerAsyncResponseWriter<S> responder;
245 grpc::ServerAsyncWriter<S> async_responder;
246
b680134e
CH
247 Candidates *cdb;
248 void (*callback)(NewRpcState<Q, S> *);
c85ecd64
CH
249 reqfunc_t requestf = NULL;
250 reqsfunc_t requestsf = NULL;
ecf9fb30 251
b680134e
CH
252 pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
253 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
c85ecd64 254 void *context = 0;
b680134e
CH
255
256 CallState state = CREATE;
ecf9fb30
QY
257};
258
b680134e
CH
259// ------------------------------------------------------
260// Utility Functions
261// ------------------------------------------------------
ecf9fb30 262
b680134e
CH
263static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
264{
265 switch (encoding) {
266 case frr::JSON:
267 return LYD_JSON;
268 case frr::XML:
269 return LYD_XML;
270 default:
271 flog_err(EC_LIB_DEVELOPMENT,
272 "%s: unknown data encoding format (%u)", __func__,
273 encoding);
274 exit(1);
275 }
276}
ecf9fb30 277
b680134e 278static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
fe095adc 279 const char *value)
ec2ac5f2 280{
fe095adc
CH
281 LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value,
282 LYD_NEW_PATH_UPDATE, &dnode);
b680134e
CH
283 if (err != LY_SUCCESS) {
284 flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
285 __func__, ly_errmsg(ly_native_ctx));
286 return -1;
ec2ac5f2
RW
287 }
288
b680134e
CH
289 return 0;
290}
291
292static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path)
293{
294 dnode = yang_dnode_get(dnode, path.c_str());
295 if (!dnode)
296 return -1;
297
298 lyd_free_tree(dnode);
299
300 return 0;
301}
302
303static LY_ERR data_tree_from_dnode(frr::DataTree *dt,
304 const struct lyd_node *dnode,
305 LYD_FORMAT lyd_format, bool with_defaults)
306{
307 char *strp;
308 int options = 0;
309
310 SET_FLAG(options, LYD_PRINT_WITHSIBLINGS);
311 if (with_defaults)
312 SET_FLAG(options, LYD_PRINT_WD_ALL);
313 else
314 SET_FLAG(options, LYD_PRINT_WD_TRIM);
315
316 LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options);
317 if (err == LY_SUCCESS) {
318 if (strp) {
319 dt->set_data(strp);
320 free(strp);
321 }
ec2ac5f2 322 }
b680134e
CH
323 return err;
324}
ec2ac5f2 325
b680134e
CH
326static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
327 bool config_only)
328{
329 struct lyd_node *dnode;
330 int options, opt2;
331 LY_ERR err;
332
333 if (config_only) {
334 options = LYD_PARSE_NO_STATE;
335 opt2 = LYD_VALIDATE_NO_STATE;
336 } else {
337 options = LYD_PARSE_STRICT;
338 opt2 = 0;
339 }
340
341 err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(),
342 encoding2lyd_format(dt->encoding()), options,
343 opt2, &dnode);
344 if (err != LY_SUCCESS) {
345 flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s",
346 __func__, ly_errmsg(ly_native_ctx));
347 }
348 return dnode;
349}
350
351static struct lyd_node *get_dnode_config(const std::string &path)
352{
353 struct lyd_node *dnode;
354
0f538858
RZ
355 if (!yang_dnode_exists(running_config->dnode,
356 path.empty() ? NULL : path.c_str()))
357 return NULL;
358
b680134e
CH
359 dnode = yang_dnode_get(running_config->dnode,
360 path.empty() ? NULL : path.c_str());
361 if (dnode)
362 dnode = yang_dnode_dup(dnode);
363
364 return dnode;
365}
366
367static int get_oper_data_cb(const struct lysc_node *snode,
368 struct yang_translator *translator,
369 struct yang_data *data, void *arg)
370{
371 struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
372 int ret = yang_dnode_edit(dnode, data->xpath, data->value);
373 yang_data_free(data);
374
375 return (ret == 0) ? NB_OK : NB_ERR;
376}
377
378static struct lyd_node *get_dnode_state(const std::string &path)
379{
380 struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false);
381 if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode)
382 != NB_OK) {
383 yang_dnode_free(dnode);
384 return NULL;
385 }
386
387 return dnode;
388}
389
390static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
391 int type, LYD_FORMAT lyd_format,
392 bool with_defaults)
393{
394 struct lyd_node *dnode_config = NULL;
395 struct lyd_node *dnode_state = NULL;
396 struct lyd_node *dnode_final;
397
398 // Configuration data.
399 if (type == frr::GetRequest_DataType_ALL
400 || type == frr::GetRequest_DataType_CONFIG) {
401 dnode_config = get_dnode_config(path);
402 if (!dnode_config)
403 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
404 "Data path not found");
405 }
406
407 // Operational data.
408 if (type == frr::GetRequest_DataType_ALL
409 || type == frr::GetRequest_DataType_STATE) {
410 dnode_state = get_dnode_state(path);
411 if (!dnode_state) {
412 if (dnode_config)
413 yang_dnode_free(dnode_config);
414 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
415 "Failed to fetch operational data");
416 }
417 }
418
419 switch (type) {
420 case frr::GetRequest_DataType_ALL:
421 //
422 // Combine configuration and state data into a single
423 // dnode.
424 //
425 if (lyd_merge_siblings(&dnode_state, dnode_config,
426 LYD_MERGE_DESTRUCT)
427 != LY_SUCCESS) {
428 yang_dnode_free(dnode_state);
429 yang_dnode_free(dnode_config);
430 return grpc::Status(
431 grpc::StatusCode::INTERNAL,
432 "Failed to merge configuration and state data",
433 ly_errmsg(ly_native_ctx));
ecf9fb30 434 }
b680134e
CH
435
436 dnode_final = dnode_state;
437 break;
438 case frr::GetRequest_DataType_CONFIG:
439 dnode_final = dnode_config;
440 break;
441 case frr::GetRequest_DataType_STATE:
442 dnode_final = dnode_state;
443 break;
ecf9fb30
QY
444 }
445
b680134e
CH
446 // Validate data to create implicit default nodes if necessary.
447 int validate_opts = 0;
448 if (type == frr::GetRequest_DataType_CONFIG)
449 validate_opts = LYD_VALIDATE_NO_STATE;
450 else
451 validate_opts = 0;
452
453 LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx,
454 validate_opts, NULL);
455
456 if (err)
457 flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s",
458 __func__, ly_errmsg(ly_native_ctx));
459 // Dump data using the requested format.
460 if (!err)
461 err = data_tree_from_dnode(dt, dnode_final, lyd_format,
462 with_defaults);
463 yang_dnode_free(dnode_final);
464 if (err)
465 return grpc::Status(grpc::StatusCode::INTERNAL,
466 "Failed to dump data");
467 return grpc::Status::OK;
468}
469
470
471// ------------------------------------------------------
472// RPC Callback Functions: run on main thread
473// ------------------------------------------------------
474
475void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest,
ecf9fb30 476 frr::GetCapabilitiesResponse> *tag)
b680134e
CH
477{
478 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
479
480 if (tag->state == FINISH) {
481 tag->state = DELETED;
482 return;
483 }
484
485 // Response: string frr_version = 1;
486 tag->response.set_frr_version(FRR_VERSION);
487
488 // Response: bool rollback_support = 2;
ec2ac5f2 489#ifdef HAVE_CONFIG_ROLLBACKS
b680134e 490 tag->response.set_rollback_support(true);
ec2ac5f2 491#else
b680134e 492 tag->response.set_rollback_support(false);
ec2ac5f2 493#endif
b680134e
CH
494 // Response: repeated ModuleData supported_modules = 3;
495 struct yang_module *module;
496 RB_FOREACH (module, yang_modules, &yang_modules) {
497 auto m = tag->response.add_supported_modules();
498
499 m->set_name(module->name);
500 if (module->info->revision)
501 m->set_revision(module->info->revision);
502 m->set_organization(module->info->org);
503 }
ec2ac5f2 504
b680134e
CH
505 // Response: repeated Encoding supported_encodings = 4;
506 tag->response.add_supported_encodings(frr::JSON);
507 tag->response.add_supported_encodings(frr::XML);
ec2ac5f2 508
b680134e
CH
509 /* Should we do this in the async process call? */
510 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
ec2ac5f2 511
b680134e
CH
512 /* Indicate we are done. */
513 tag->state = FINISH;
514}
ec2ac5f2 515
b680134e
CH
516void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag)
517{
518 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
519
520 if (tag->state == FINISH) {
521 delete static_cast<std::list<std::string> *>(tag->context);
522 tag->state = DELETED;
523 return;
ec2ac5f2
RW
524 }
525
b680134e
CH
526 if (!tag->context) {
527 /* Creating, first time called for this RPC */
528 auto mypaths = new std::list<std::string>();
529 tag->context = mypaths;
530 auto paths = tag->request.path();
531 for (const std::string &path : paths) {
532 mypaths->push_back(std::string(path));
ecf9fb30 533 }
b680134e 534 }
ecf9fb30 535
b680134e
CH
536 // Request: DataType type = 1;
537 int type = tag->request.type();
538 // Request: Encoding encoding = 2;
539 frr::Encoding encoding = tag->request.encoding();
540 // Request: bool with_defaults = 3;
541 bool with_defaults = tag->request.with_defaults();
542
543 auto mypathps = static_cast<std::list<std::string> *>(tag->context);
544 if (mypathps->empty()) {
545 tag->async_responder.Finish(grpc::Status::OK, tag);
546 tag->state = FINISH;
547 return;
ec2ac5f2
RW
548 }
549
b680134e
CH
550 frr::GetResponse response;
551 grpc::Status status;
552
553 // Response: int64 timestamp = 1;
554 response.set_timestamp(time(NULL));
555
556 // Response: DataTree data = 2;
557 auto *data = response.mutable_data();
558 data->set_encoding(tag->request.encoding());
559 status = get_path(data, mypathps->back().c_str(), type,
560 encoding2lyd_format(encoding), with_defaults);
561
562 if (!status.ok()) {
563 tag->async_responder.WriteAndFinish(
564 response, grpc::WriteOptions(), status, tag);
565 tag->state = FINISH;
566 return;
567 }
568
569 mypathps->pop_back();
570 if (mypathps->empty()) {
571 tag->async_responder.WriteAndFinish(
572 response, grpc::WriteOptions(), grpc::Status::OK, tag);
573 tag->state = FINISH;
574 } else {
575 tag->async_responder.Write(response, tag);
576 tag->state = MORE;
577 }
578}
579
580void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest,
ecf9fb30 581 frr::CreateCandidateResponse> *tag)
b680134e
CH
582{
583 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
ec2ac5f2 584
b680134e
CH
585 if (tag->state == FINISH) {
586 tag->state = DELETED;
587 return;
588 }
ec2ac5f2 589
b680134e
CH
590 struct candidate *candidate = tag->cdb->create_candidate();
591 if (!candidate) {
592 tag->responder.Finish(
593 tag->response,
594 grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
595 "Can't create candidate configuration"),
596 tag);
597 } else {
598 tag->response.set_candidate_id(candidate->id);
599 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
ec2ac5f2
RW
600 }
601
b680134e
CH
602 tag->state = FINISH;
603}
604
605void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest,
ecf9fb30 606 frr::DeleteCandidateResponse> *tag)
b680134e
CH
607{
608 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
609
610 if (tag->state == FINISH) {
611 tag->state = DELETED;
612 return;
ecf9fb30 613 }
ec2ac5f2 614
b680134e
CH
615 // Request: uint32 candidate_id = 1;
616 uint32_t candidate_id = tag->request.candidate_id();
617
618 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
619
620 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
621 if (!candidate) {
622 tag->responder.Finish(
623 tag->response,
624 grpc::Status(grpc::StatusCode::NOT_FOUND,
625 "candidate configuration not found"),
626 tag);
627 } else {
628 tag->cdb->delete_candidate(candidate);
629 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
630 }
631 tag->state = FINISH;
632}
633
634void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest,
ecf9fb30 635 frr::UpdateCandidateResponse> *tag)
b680134e
CH
636{
637 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
ecf9fb30 638
b680134e
CH
639 if (tag->state == FINISH) {
640 tag->state = DELETED;
641 return;
642 }
ec2ac5f2 643
b680134e
CH
644 // Request: uint32 candidate_id = 1;
645 uint32_t candidate_id = tag->request.candidate_id();
646
647 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
648
649 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
650
651 if (!candidate)
652 tag->responder.Finish(
653 tag->response,
654 grpc::Status(grpc::StatusCode::NOT_FOUND,
655 "candidate configuration not found"),
656 tag);
657 else if (candidate->transaction)
658 tag->responder.Finish(
659 tag->response,
660 grpc::Status(
661 grpc::StatusCode::FAILED_PRECONDITION,
662 "candidate is in the middle of a transaction"),
663 tag);
664 else if (nb_candidate_update(candidate->config) != NB_OK)
665 tag->responder.Finish(
666 tag->response,
667 grpc::Status(
668 grpc::StatusCode::INTERNAL,
669 "failed to update candidate configuration"),
670 tag);
671
672 else
673 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
674
675 tag->state = FINISH;
676}
677
678void HandleUnaryEditCandidate(
679 NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag)
680{
681 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
682
683 if (tag->state == FINISH) {
684 tag->state = DELETED;
685 return;
ec2ac5f2
RW
686 }
687
b680134e
CH
688 // Request: uint32 candidate_id = 1;
689 uint32_t candidate_id = tag->request.candidate_id();
ec2ac5f2 690
b680134e 691 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
ec2ac5f2 692
b680134e
CH
693 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
694
695 if (!candidate) {
696 tag->responder.Finish(
697 tag->response,
698 grpc::Status(grpc::StatusCode::NOT_FOUND,
699 "candidate configuration not found"),
700 tag);
701 tag->state = FINISH;
702 return;
ec2ac5f2
RW
703 }
704
b680134e
CH
705 struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
706
707 auto pvs = tag->request.update();
708 for (const frr::PathValue &pv : pvs) {
fe095adc
CH
709 if (yang_dnode_edit(candidate_tmp->dnode, pv.path(),
710 pv.value().c_str()) != 0) {
b680134e
CH
711 nb_config_free(candidate_tmp);
712
713 tag->responder.Finish(
714 tag->response,
715 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
716 "Failed to update \"" + pv.path()
717 + "\""),
718 tag);
719
ecf9fb30 720 tag->state = FINISH;
b680134e 721 return;
ecf9fb30 722 }
ec2ac5f2
RW
723 }
724
b680134e
CH
725 pvs = tag->request.delete_();
726 for (const frr::PathValue &pv : pvs) {
727 if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
728 nb_config_free(candidate_tmp);
729 tag->responder.Finish(
730 tag->response,
731 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
732 "Failed to remove \"" + pv.path()
733 + "\""),
734 tag);
ecf9fb30 735 tag->state = FINISH;
b680134e 736 return;
ec2ac5f2 737 }
ec2ac5f2
RW
738 }
739
b680134e
CH
740 // No errors, accept all changes.
741 nb_config_replace(candidate->config, candidate_tmp, false);
742
743 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
744
745 tag->state = FINISH;
746}
747
748void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest,
749 frr::LoadToCandidateResponse> *tag)
750{
751 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
752
753 if (tag->state == FINISH) {
754 tag->state = DELETED;
755 return;
ec2ac5f2
RW
756 }
757
b680134e
CH
758 // Request: uint32 candidate_id = 1;
759 uint32_t candidate_id = tag->request.candidate_id();
760
761 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
762
763 // Request: LoadType type = 2;
764 int load_type = tag->request.type();
765 // Request: DataTree config = 3;
766 auto config = tag->request.config();
767
768
769 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
770
771 if (!candidate) {
772 tag->responder.Finish(
773 tag->response,
774 grpc::Status(grpc::StatusCode::NOT_FOUND,
775 "candidate configuration not found"),
776 tag);
777 tag->state = FINISH;
778 return;
ec2ac5f2
RW
779 }
780
b680134e
CH
781 struct lyd_node *dnode = dnode_from_data_tree(&config, true);
782 if (!dnode) {
783 tag->responder.Finish(
784 tag->response,
785 grpc::Status(grpc::StatusCode::INTERNAL,
786 "Failed to parse the configuration"),
787 tag);
788 tag->state = FINISH;
789 return;
ecf9fb30 790 }
ec2ac5f2 791
b680134e
CH
792 struct nb_config *loaded_config = nb_config_new(dnode);
793
794 if (load_type == frr::LoadToCandidateRequest::REPLACE)
795 nb_config_replace(candidate->config, loaded_config, false);
796 else if (nb_config_merge(candidate->config, loaded_config, false)
797 != NB_OK) {
798 tag->responder.Finish(
799 tag->response,
800 grpc::Status(
801 grpc::StatusCode::INTERNAL,
802 "Failed to merge the loaded configuration"),
803 tag);
804 tag->state = FINISH;
805 return;
ec2ac5f2
RW
806 }
807
b680134e
CH
808 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
809 tag->state = FINISH;
810}
811
812void HandleUnaryCommit(
813 NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
814{
815 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
816
817 if (tag->state == FINISH) {
818 tag->state = DELETED;
819 return;
820 }
ec2ac5f2 821
b680134e
CH
822 // Request: uint32 candidate_id = 1;
823 uint32_t candidate_id = tag->request.candidate_id();
824
825 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
826
827 // Request: Phase phase = 2;
828 int phase = tag->request.phase();
829 // Request: string comment = 3;
830 const std::string comment = tag->request.comment();
831
832 // Find candidate configuration.
833 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
834 if (!candidate) {
835 tag->responder.Finish(
836 tag->response,
837 grpc::Status(grpc::StatusCode::NOT_FOUND,
838 "candidate configuration not found"),
839 tag);
840 tag->state = FINISH;
841 return;
842 }
843
844 int ret = NB_OK;
845 uint32_t transaction_id = 0;
846
847 // Check for misuse of the two-phase commit protocol.
848 switch (phase) {
849 case frr::CommitRequest::PREPARE:
850 case frr::CommitRequest::ALL:
851 if (candidate->transaction) {
852 tag->responder.Finish(
853 tag->response,
854 grpc::Status(
855 grpc::StatusCode::FAILED_PRECONDITION,
856 "candidate is in the middle of a transaction"),
857 tag);
ecf9fb30 858 tag->state = FINISH;
b680134e 859 return;
ecf9fb30 860 }
b680134e
CH
861 break;
862 case frr::CommitRequest::ABORT:
863 case frr::CommitRequest::APPLY:
864 if (!candidate->transaction) {
865 tag->responder.Finish(
866 tag->response,
867 grpc::Status(
868 grpc::StatusCode::FAILED_PRECONDITION,
869 "no transaction in progress"),
870 tag);
871 tag->state = FINISH;
872 return;
ec2ac5f2 873 }
b680134e
CH
874 break;
875 default:
876 break;
ec2ac5f2
RW
877 }
878
ecf9fb30 879
b680134e
CH
880 // Execute the user request.
881 struct nb_context context = {};
882 context.client = NB_CLIENT_GRPC;
883 char errmsg[BUFSIZ] = {0};
884
885 switch (phase) {
886 case frr::CommitRequest::VALIDATE:
887 grpc_debug("`-> Performing VALIDATE");
888 ret = nb_candidate_validate(&context, candidate->config, errmsg,
889 sizeof(errmsg));
890 break;
891 case frr::CommitRequest::PREPARE:
892 grpc_debug("`-> Performing PREPARE");
893 ret = nb_candidate_commit_prepare(
894 &context, candidate->config, comment.c_str(),
895 &candidate->transaction, errmsg, sizeof(errmsg));
896 break;
897 case frr::CommitRequest::ABORT:
898 grpc_debug("`-> Performing ABORT");
899 nb_candidate_commit_abort(candidate->transaction, errmsg,
900 sizeof(errmsg));
901 break;
902 case frr::CommitRequest::APPLY:
903 grpc_debug("`-> Performing APPLY");
904 nb_candidate_commit_apply(candidate->transaction, true,
905 &transaction_id, errmsg,
906 sizeof(errmsg));
907 break;
908 case frr::CommitRequest::ALL:
909 grpc_debug("`-> Performing ALL");
910 ret = nb_candidate_commit(&context, candidate->config, true,
911 comment.c_str(), &transaction_id,
912 errmsg, sizeof(errmsg));
913 break;
914 }
ec2ac5f2 915
b680134e
CH
916 // Map northbound error codes to gRPC status codes.
917 grpc::Status status;
918 switch (ret) {
919 case NB_OK:
920 status = grpc::Status::OK;
921 break;
922 case NB_ERR_NO_CHANGES:
923 status = grpc::Status(grpc::StatusCode::ABORTED, errmsg);
924 break;
925 case NB_ERR_LOCKED:
926 status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg);
927 break;
928 case NB_ERR_VALIDATION:
929 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
930 errmsg);
931 break;
932 case NB_ERR_RESOURCE:
933 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
934 errmsg);
935 break;
936 case NB_ERR:
937 default:
938 status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg);
939 break;
940 }
ec2ac5f2 941
b680134e
CH
942 grpc_debug("`-> Result: %s (message: '%s')",
943 nb_err_name((enum nb_error)ret), errmsg);
944
945 if (ret == NB_OK) {
946 // Response: uint32 transaction_id = 1;
947 if (transaction_id)
948 tag->response.set_transaction_id(transaction_id);
ec2ac5f2 949 }
b680134e
CH
950 if (strlen(errmsg) > 0)
951 tag->response.set_error_message(errmsg);
ec2ac5f2 952
b680134e
CH
953 tag->responder.Finish(tag->response, status, tag);
954 tag->state = FINISH;
955}
ec2ac5f2 956
b680134e
CH
957void HandleUnaryLockConfig(
958 NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
959{
960 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
ec2ac5f2 961
b680134e
CH
962 if (tag->state == FINISH) {
963 tag->state = DELETED;
964 return;
ec2ac5f2
RW
965 }
966
b680134e
CH
967 if (nb_running_lock(NB_CLIENT_GRPC, NULL)) {
968 tag->responder.Finish(
969 tag->response,
970 grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
971 "running configuration is locked already"),
972 tag);
973 } else {
974 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
ec2ac5f2 975 }
b680134e
CH
976 tag->state = FINISH;
977}
ec2ac5f2 978
b680134e
CH
979void HandleUnaryUnlockConfig(
980 NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
981{
982 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
ec2ac5f2 983
b680134e
CH
984 if (tag->state == FINISH) {
985 tag->state = DELETED;
986 return;
ec2ac5f2
RW
987 }
988
b680134e
CH
989 if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) {
990 tag->responder.Finish(
991 tag->response,
992 grpc::Status(
993 grpc::StatusCode::FAILED_PRECONDITION,
994 "failed to unlock the running configuration"),
995 tag);
996 } else {
997 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
998 }
999 tag->state = FINISH;
1000}
ec2ac5f2 1001
b680134e
CH
/*
 * Transaction iteration callback: append one (id, client, date, comment)
 * record to the list passed through `arg`.
 */
static void list_transactions_cb(void *arg, int transaction_id,
				 const char *client_name, const char *date,
				 const char *comment)
{
	using Record = std::tuple<int, std::string, std::string, std::string>;

	auto records = static_cast<std::list<Record> *>(arg);
	Record entry(transaction_id, std::string(client_name),
		     std::string(date), std::string(comment));

	records->push_back(entry);
}
1012
1013void HandleStreamingListTransactions(
1014 NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse>
1015 *tag)
1016{
1017 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
1018
1019 if (tag->state == FINISH) {
1020 delete static_cast<std::list<std::tuple<
1021 int, std::string, std::string, std::string>> *>(
1022 tag->context);
1023 tag->state = DELETED;
1024 return;
ec2ac5f2
RW
1025 }
1026
b680134e
CH
1027 if (!tag->context) {
1028 /* Creating, first time called for this RPC */
1029 auto new_list =
1030 new std::list<std::tuple<int, std::string, std::string,
1031 std::string>>();
1032 tag->context = new_list;
1033 nb_db_transactions_iterate(list_transactions_cb, tag->context);
1034
1035 new_list->push_back(std::make_tuple(
1036 0xFFFF, std::string("fake client"),
1037 std::string("fake date"), std::string("fake comment")));
1038 new_list->push_back(
1039 std::make_tuple(0xFFFE, std::string("fake client2"),
1040 std::string("fake date"),
1041 std::string("fake comment2")));
ec2ac5f2
RW
1042 }
1043
b680134e
CH
1044 auto list = static_cast<std::list<
1045 std::tuple<int, std::string, std::string, std::string>> *>(
1046 tag->context);
af1b88e9 1047
b680134e
CH
1048 if (list->empty()) {
1049 tag->async_responder.Finish(grpc::Status::OK, tag);
1050 tag->state = FINISH;
1051 return;
ec2ac5f2
RW
1052 }
1053
b680134e 1054 auto item = list->back();
ec2ac5f2 1055
b680134e 1056 frr::ListTransactionsResponse response;
ec2ac5f2 1057
b680134e
CH
1058 // Response: uint32 id = 1;
1059 response.set_id(std::get<0>(item));
ec2ac5f2 1060
b680134e
CH
1061 // Response: string client = 2;
1062 response.set_client(std::get<1>(item).c_str());
1063
1064 // Response: string date = 3;
1065 response.set_date(std::get<2>(item).c_str());
1066
1067 // Response: string comment = 4;
1068 response.set_comment(std::get<3>(item).c_str());
ec2ac5f2 1069
b680134e
CH
1070 list->pop_back();
1071 if (list->empty()) {
1072 tag->async_responder.WriteAndFinish(
1073 response, grpc::WriteOptions(), grpc::Status::OK, tag);
1074 tag->state = FINISH;
1075 } else {
1076 tag->async_responder.Write(response, tag);
1077 tag->state = MORE;
ec2ac5f2 1078 }
b680134e 1079}
ec2ac5f2 1080
b680134e
CH
1081void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest,
1082 frr::GetTransactionResponse> *tag)
1083{
1084 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
ec2ac5f2 1085
b680134e
CH
1086 if (tag->state == FINISH) {
1087 tag->state = DELETED;
1088 return;
1089 }
ec2ac5f2 1090
b680134e
CH
1091 // Request: uint32 transaction_id = 1;
1092 uint32_t transaction_id = tag->request.transaction_id();
1093 // Request: Encoding encoding = 2;
1094 frr::Encoding encoding = tag->request.encoding();
1095 // Request: bool with_defaults = 3;
1096 bool with_defaults = tag->request.with_defaults();
1097
1098 grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__,
1099 transaction_id, encoding);
1100
1101 struct nb_config *nb_config;
1102
1103 // Load configuration from the transactions database.
1104 nb_config = nb_db_transaction_load(transaction_id);
1105 if (!nb_config) {
1106 tag->responder.Finish(
1107 tag->response,
1108 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1109 "Transaction not found"),
1110 tag);
1111 tag->state = FINISH;
1112 return;
1113 }
ec2ac5f2 1114
b680134e
CH
1115 // Response: DataTree config = 1;
1116 auto config = tag->response.mutable_config();
1117 config->set_encoding(encoding);
1118
1119 // Dump data using the requested format.
1120 if (data_tree_from_dnode(config, nb_config->dnode,
1121 encoding2lyd_format(encoding), with_defaults)
1122 != 0) {
1123 nb_config_free(nb_config);
1124 tag->responder.Finish(tag->response,
1125 grpc::Status(grpc::StatusCode::INTERNAL,
1126 "Failed to dump data"),
1127 tag);
1128 tag->state = FINISH;
1129 return;
ec2ac5f2
RW
1130 }
1131
b680134e 1132 nb_config_free(nb_config);
ec2ac5f2 1133
b680134e
CH
1134 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
1135 tag->state = FINISH;
1136}
ec2ac5f2 1137
b680134e
CH
1138void HandleUnaryExecute(
1139 NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
1140{
1141 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
ec2ac5f2 1142
b680134e
CH
1143 if (tag->state == FINISH) {
1144 tag->state = DELETED;
1145 return;
ec2ac5f2
RW
1146 }
1147
b680134e
CH
1148 struct nb_node *nb_node;
1149 struct list *input_list;
1150 struct list *output_list;
1151 struct listnode *node;
1152 struct yang_data *data;
1153 const char *xpath;
1154 char errmsg[BUFSIZ] = {0};
1155
1156 // Request: string path = 1;
1157 xpath = tag->request.path().c_str();
1158
1159 grpc_debug("%s(path: \"%s\")", __func__, xpath);
1160
1161 if (tag->request.path().empty()) {
1162 tag->responder.Finish(
1163 tag->response,
1164 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1165 "Data path is empty"),
1166 tag);
1167 tag->state = FINISH;
1168 return;
1169 }
0fe5b904 1170
b680134e
CH
1171 nb_node = nb_node_find(xpath);
1172 if (!nb_node) {
1173 tag->responder.Finish(
1174 tag->response,
1175 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1176 "Unknown data path"),
1177 tag);
1178 tag->state = FINISH;
1179 return;
ec2ac5f2
RW
1180 }
1181
b680134e
CH
1182 input_list = yang_data_list_new();
1183 output_list = yang_data_list_new();
ec2ac5f2 1184
b680134e
CH
1185 // Read input parameters.
1186 auto input = tag->request.input();
1187 for (const frr::PathValue &pv : input) {
1188 // Request: repeated PathValue input = 2;
1189 data = yang_data_new(pv.path().c_str(), pv.value().c_str());
1190 listnode_add(input_list, data);
1191 }
ec2ac5f2 1192
b680134e
CH
1193 // Execute callback registered for this XPath.
1194 if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg,
1195 sizeof(errmsg))
1196 != NB_OK) {
1197 flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s",
1198 __func__, xpath);
1199 list_delete(&input_list);
1200 list_delete(&output_list);
1201
1202 tag->responder.Finish(
1203 tag->response,
1204 grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"),
1205 tag);
1206 tag->state = FINISH;
1207 return;
ec2ac5f2 1208 }
b680134e
CH
1209
1210 // Process output parameters.
1211 for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
1212 // Response: repeated PathValue output = 1;
1213 frr::PathValue *pv = tag->response.add_output();
1214 pv->set_path(data->xpath);
1215 pv->set_value(data->value);
1216 }
1217
1218 // Release memory.
1219 list_delete(&input_list);
1220 list_delete(&output_list);
1221
1222 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
1223 tag->state = FINISH;
1224}
1225
1226// ------------------------------------------------------
1227// Thread Initialization and Run Functions
1228// ------------------------------------------------------
1229
1230
/*
 * Allocate the state object for the unary RPC `NAME` and post its first
 * request. `cdb` is forwarded to the NewRpcState constructor. The expansion
 * relies on `service` and `cq` being visible at the point of use, and binds
 * the RPC to frr::Northbound::AsyncService::Request<NAME> and
 * HandleUnary<NAME>.
 */
#define REQUEST_NEWRPC(NAME, cdb)                                              \
	do {                                                                   \
		using unary_state_ = NewRpcState<frr::NAME##Request,           \
						 frr::NAME##Response>;         \
		unary_state_ *unary_rpc_ = new unary_state_(                   \
			(cdb), &frr::Northbound::AsyncService::Request##NAME,  \
			&HandleUnary##NAME, #NAME);                            \
		unary_rpc_->do_request(&service, cq.get(), true);              \
	} while (0)
1239
/*
 * Allocate the state object for the streaming RPC `NAME` and post its first
 * request. `cdb` is forwarded to the NewRpcState constructor. The expansion
 * relies on `service` and `cq` being visible at the point of use, and binds
 * the RPC to frr::Northbound::AsyncService::Request<NAME> and
 * HandleStreaming<NAME>.
 */
#define REQUEST_NEWRPC_STREAMING(NAME, cdb)                                    \
	do {                                                                   \
		using stream_state_ = NewRpcState<frr::NAME##Request,          \
						  frr::NAME##Response>;        \
		stream_state_ *stream_rpc_ = new stream_state_(                \
			(cdb), &frr::Northbound::AsyncService::Request##NAME,  \
			&HandleStreaming##NAME, #NAME);                        \
		stream_rpc_->do_request(&service, cq.get(), true);             \
	} while (0)
1248
1249struct grpc_pthread_attr {
1250 struct frr_pthread_attr attr;
1251 unsigned long port;
ec2ac5f2
RW
1252};
1253
de800c10 1254// Capture these objects so we can try to shut down cleanly
83f6fce7
CH
1255static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER;
1256static grpc::Server *s_server;
de800c10 1257
ec2ac5f2
RW
1258static void *grpc_pthread_start(void *arg)
1259{
0edcb505 1260 struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
b680134e
CH
1261 uint port = (uint) reinterpret_cast<intptr_t>(fpt->data);
1262
1263 Candidates candidates;
1264 grpc::ServerBuilder builder;
1265 std::stringstream server_address;
83f6fce7 1266 frr::Northbound::AsyncService service;
ec2ac5f2 1267
0edcb505
CS
1268 frr_pthread_set_name(fpt);
1269
b680134e
CH
1270 server_address << "0.0.0.0:" << port;
1271 builder.AddListeningPort(server_address.str(),
1272 grpc::InsecureServerCredentials());
83f6fce7 1273 builder.RegisterService(&service);
673e4407
RZ
1274 builder.AddChannelArgument(
1275 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
83f6fce7
CH
1276 std::unique_ptr<grpc::ServerCompletionQueue> cq =
1277 builder.AddCompletionQueue();
1278 std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
1279 s_server = server.get();
1280
1281 grpc_running = true;
b680134e
CH
1282
1283 /* Schedule all RPC handlers */
1284 REQUEST_NEWRPC(GetCapabilities, NULL);
1285 REQUEST_NEWRPC(CreateCandidate, &candidates);
1286 REQUEST_NEWRPC(DeleteCandidate, &candidates);
1287 REQUEST_NEWRPC(UpdateCandidate, &candidates);
1288 REQUEST_NEWRPC(EditCandidate, &candidates);
1289 REQUEST_NEWRPC(LoadToCandidate, &candidates);
1290 REQUEST_NEWRPC(Commit, &candidates);
1291 REQUEST_NEWRPC(GetTransaction, NULL);
1292 REQUEST_NEWRPC(LockConfig, NULL);
1293 REQUEST_NEWRPC(UnlockConfig, NULL);
1294 REQUEST_NEWRPC(Execute, NULL);
1295 REQUEST_NEWRPC_STREAMING(Get, NULL);
1296 REQUEST_NEWRPC_STREAMING(ListTransactions, NULL);
1297
1298 zlog_notice("gRPC server listening on %s",
1299 server_address.str().c_str());
1300
1301 /* Process inbound RPCs */
83f6fce7
CH
1302 bool ok;
1303 void *tag;
1304 while (grpc_running) {
1305 if (!cq->Next(&tag, &ok)) {
1306 grpc_debug("%s: CQ empty exiting", __func__);
de800c10 1307 break;
83f6fce7 1308 }
de800c10 1309
83f6fce7
CH
1310 grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
1311 ok);
1312
1313 if (!ok || !grpc_running) {
1314 delete static_cast<RpcStateBase *>(tag);
1315 break;
1316 }
b680134e
CH
1317
1318 RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
1319 CallState state = rpc->doCallback();
83f6fce7 1320 grpc_debug("%s: callback returned RPC State: %s", __func__,
b680134e
CH
1321 call_states[state]);
1322
1323 /*
1324 * Our side is done (FINISH) receive new requests of this type
1325 * We could do this earlier but that would mean we could be
1326 * handling multiple same type requests in parallel. We expect
1327 * to be called back once more in the FINISH state (from the
1328 * user indicating Finish() for cleanup.
1329 */
83f6fce7
CH
1330 if (state == FINISH && grpc_running)
1331 rpc->do_request(&service, cq.get(), false);
b680134e 1332 }
ec2ac5f2 1333
83f6fce7
CH
1334 /* This was probably done for us to get here, but let's be safe */
1335 pthread_mutex_lock(&s_server_lock);
1336 grpc_running = false;
1337 if (s_server) {
1338 grpc_debug("%s: shutdown server and CQ", __func__);
1339 server->Shutdown();
1340 s_server = NULL;
1341 }
1342 pthread_mutex_unlock(&s_server_lock);
1343
1344 grpc_debug("%s: shutting down CQ", __func__);
1345 cq->Shutdown();
1346
1347 grpc_debug("%s: draining the CQ", __func__);
1348 while (cq->Next(&tag, &ok)) {
1349 grpc_debug("%s: drain tag %p", __func__, tag);
1350 delete static_cast<RpcStateBase *>(tag);
b680134e 1351 }
ec2ac5f2 1352
83f6fce7 1353 zlog_info("%s: exiting from grpc pthread", __func__);
ec2ac5f2
RW
1354 return NULL;
1355}
1356
b680134e
CH
1357
1358static int frr_grpc_init(uint port)
ec2ac5f2 1359{
b680134e
CH
1360 struct frr_pthread_attr attr = {
1361 .start = grpc_pthread_start,
1362 .stop = NULL,
1363 };
1364
83f6fce7
CH
1365 grpc_debug("%s: entered", __func__);
1366
0edcb505 1367 fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
b680134e 1368 fpt->data = reinterpret_cast<void *>((intptr_t)port);
0edcb505 1369
ec2ac5f2 1370 /* Create a pthread for gRPC since it runs its own event loop. */
0edcb505 1371 if (frr_pthread_run(fpt, NULL) < 0) {
ec2ac5f2
RW
1372 flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s",
1373 __func__, safe_strerror(errno));
1374 return -1;
1375 }
ec2ac5f2
RW
1376
1377 return 0;
1378}
1379
1380static int frr_grpc_finish(void)
1381{
83f6fce7 1382 grpc_debug("%s: entered", __func__);
de800c10 1383
83f6fce7
CH
1384 if (!fpt)
1385 return 0;
de800c10 1386
83f6fce7
CH
1387 /*
1388 * Shut the server down here in main thread. This will cause the wait on
1389 * the completion queue (cq.Next()) to exit and cleanup everything else.
1390 */
1391 pthread_mutex_lock(&s_server_lock);
1392 grpc_running = false;
1393 if (s_server) {
1394 grpc_debug("%s: shutdown server", __func__);
1395 s_server->Shutdown();
1396 s_server = NULL;
de800c10 1397 }
83f6fce7 1398 pthread_mutex_unlock(&s_server_lock);
de800c10 1399
83f6fce7
CH
1400 grpc_debug("%s: joining and destroy grpc thread", __func__);
1401 pthread_join(fpt->thread, NULL);
1402 frr_pthread_destroy(fpt);
ec2ac5f2
RW
1403 return 0;
1404}
1405
20b0a2ed
QY
1406/*
1407 * This is done this way because module_init and module_late_init are both
1408 * called during daemon pre-fork initialization. Because the GRPC library
1409 * spawns threads internally, we need to delay initializing it until after
1410 * fork. This is done by scheduling this init function as an event task, since
1411 * the event loop doesn't run until after fork.
1412 */
cc9f21da 1413static void frr_grpc_module_very_late_init(struct thread *thread)
ec2ac5f2 1414{
ec2ac5f2 1415 const char *args = THIS_MODULE->load_args;
b680134e 1416 uint port = GRPC_DEFAULT_PORT;
ec2ac5f2 1417
ec2ac5f2 1418 if (args) {
b680134e
CH
1419 port = std::stoul(args);
1420 if (port < 1024 || port > UINT16_MAX) {
ec2ac5f2 1421 flog_err(EC_LIB_GRPC_INIT,
b680134e
CH
1422 "%s: port number must be between 1025 and %d",
1423 __func__, UINT16_MAX);
ec2ac5f2
RW
1424 goto error;
1425 }
1426 }
1427
b680134e 1428 if (frr_grpc_init(port) < 0)
ec2ac5f2
RW
1429 goto error;
1430
cc9f21da 1431 return;
69ec5832 1432
ec2ac5f2
RW
1433error:
1434 flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module");
ec2ac5f2
RW
1435}
1436
20b0a2ed
QY
1437static int frr_grpc_module_late_init(struct thread_master *tm)
1438{
b680134e 1439 main_master = tm;
20b0a2ed 1440 hook_register(frr_fini, frr_grpc_finish);
b680134e 1441 thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
20b0a2ed
QY
1442 return 0;
1443}
1444
ec2ac5f2
RW
1445static int frr_grpc_module_init(void)
1446{
1447 hook_register(frr_late_init, frr_grpc_module_late_init);
1448
1449 return 0;
1450}
1451
1452FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
1453 .description = "FRR gRPC northbound module",
b680134e 1454 .init = frr_grpc_module_init, );