lib/northbound_grpc.cpp
1 //
2 // Copyright (C) 2019 NetDEF, Inc.
3 // Renato Westphal
4 // Copyright (c) 2021, LabN Consulting, L.L.C
5 //
6 // This program is free software; you can redistribute it and/or modify it
7 // under the terms of the GNU General Public License as published by the Free
8 // Software Foundation; either version 2 of the License, or (at your option)
9 // any later version.
10 //
11 // This program is distributed in the hope that it will be useful, but WITHOUT
12 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 // more details.
15 //
16 // You should have received a copy of the GNU General Public License along
17 // with this program; see the file COPYING; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 //
20
21 #include <zebra.h>
22 #include <grpcpp/grpcpp.h>
23 #include "grpc/frr-northbound.grpc.pb.h"
24
25 #include "log.h"
26 #include "libfrr.h"
27 #include "lib/version.h"
28 #include "lib/thread.h"
29 #include "command.h"
30 #include "lib_errors.h"
31 #include "northbound.h"
32 #include "northbound_db.h"
33 #include "frr_pthread.h"
34
35 #include <iostream>
36 #include <sstream>
37 #include <memory>
38 #include <string>
39
40 #define GRPC_DEFAULT_PORT 50051
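/*
 * The listening TCP port can be overridden when the module is loaded,
 * e.g. with a daemon option along the lines of "-M grpc:50052"; whatever
 * follows the colon shows up as THIS_MODULE->load_args in
 * frr_grpc_module_very_late_init() at the bottom of this file.
 */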
41
42 /*
43  * NOTE: we can't use the FRR debugging infrastructure here since it uses
44  * atomics and C++ has a different atomics API. Until that is solved, gRPC
45  * debugging is controlled by this plain flag (disabled by default).
46  */
47 static bool nb_dbg_client_grpc = false;
48
49 static struct thread_master *main_master;
50
51 static struct frr_pthread *fpt;
52
53 #define grpc_debug(...) \
54 do { \
55 if (nb_dbg_client_grpc) \
56 zlog_debug(__VA_ARGS__); \
57 } while (0)
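/*
 * grpc_debug() is a thin wrapper around zlog_debug() gated by the flag
 * above; a typical call site in this file looks like:
 *
 *     grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
 */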
58
59 // ------------------------------------------------------
60 // New Types
61 // ------------------------------------------------------
62
63 enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED };
64 const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"};
65
66 struct candidate {
67 uint64_t id;
68 struct nb_config *config;
69 struct nb_transaction *transaction;
70 };
71
72 class Candidates
73 {
74 public:
75 ~Candidates(void)
76 {
77 // Delete candidates.
78 for (auto it = _cdb.begin(); it != _cdb.end(); it++)
79 delete_candidate(&it->second);
80 }
81
82 struct candidate *create_candidate(void)
83 {
84 uint64_t id = ++_next_id;
85 assert(id); // TODO: implement an algorithm for unique reusable
86 // IDs.
87 struct candidate *c = &_cdb[id];
88 c->id = id;
89 c->config = nb_config_dup(running_config);
90 c->transaction = NULL;
91
92 return c;
93 }
94
95 void delete_candidate(struct candidate *c)
96 {
97 char errmsg[BUFSIZ] = {0};
98
99 _cdb.erase(c->id);
100 nb_config_free(c->config);
101 if (c->transaction)
102 nb_candidate_commit_abort(c->transaction, errmsg,
103 sizeof(errmsg));
104 }
105
106 struct candidate *get_candidate(uint32_t id)
107 {
108 return _cdb.count(id) == 0 ? NULL : &_cdb[id];
109 }
110
111 private:
112 uint64_t _next_id = 0;
113 std::map<uint32_t, struct candidate> _cdb;
114 };
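/*
 * Illustrative candidate lifecycle as driven by the RPC handlers further
 * down (CreateCandidate / EditCandidate / Commit / DeleteCandidate) -- a
 * sketch, not a literal call sequence:
 *
 *     struct candidate *c = cdb->create_candidate();    // snapshot of running
 *     yang_dnode_edit(c->config->dnode, path, value);   // stage changes
 *     nb_candidate_commit(...);                         // apply to running
 *     cdb->delete_candidate(c);                         // discard the snapshot
 */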
115
116 class RpcStateBase
117 {
118 public:
119 virtual CallState doCallback() = 0;
120 virtual void do_request(::frr::Northbound::AsyncService *service,
121 ::grpc::ServerCompletionQueue *cq) = 0;
122 };
123
124 /*
125 * The RPC state class is used to track the execution of an RPC.
126 */
127 template <typename Q, typename S> class NewRpcState : RpcStateBase
128 {
129 typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
130 ::grpc::ServerContext *, Q *,
131 ::grpc::ServerAsyncResponseWriter<S> *,
132 ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
133 void *);
134 typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
135 ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
136 ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
137 void *);
138
139 public:
140 NewRpcState(Candidates *cdb, reqfunc_t rfunc,
141 void (*cb)(NewRpcState<Q, S> *), const char *name)
142 : requestf(rfunc), callback(cb), responder(&ctx),
143 async_responder(&ctx), name(name), cdb(cdb){};
144 NewRpcState(Candidates *cdb, reqsfunc_t rfunc,
145 void (*cb)(NewRpcState<Q, S> *), const char *name)
146 : requestsf(rfunc), callback(cb), responder(&ctx),
147 async_responder(&ctx), name(name), cdb(cdb){};
148
149 CallState doCallback() override
150 {
151 CallState enter_state = this->state;
152 CallState new_state;
153 if (enter_state == FINISH) {
154 grpc_debug("%s RPC FINISH -> DELETED", name);
155 new_state = FINISH;
156 } else {
157 grpc_debug("%s RPC: %s -> PROCESS", name,
158 call_states[this->state]);
159 new_state = PROCESS;
160 }
161 /*
162  * We are either in state CREATE, MORE or FINISH. If CREATE or
163  * MORE, move back to PROCESS; otherwise we are cleaning up
164  * (FINISH) so leave it in that state. Run the callback on the
165  * main threadmaster/pthread and wait for the expected transition
166  * from the main thread. If the transition is FINISH -> DELETED,
167  * delete this object.
168  *
169  * We update the state prior to scheduling the callback, which
170  * may then update the state in the master pthread. Then we
171  * obtain the lock in the condvar-check-loop as the callback
172  * will be updating the state value.
173  */
174 this->state = new_state;
175 thread_add_event(main_master, c_callback, (void *)this, 0,
176 NULL);
177 pthread_mutex_lock(&this->cmux);
178 while (this->state == new_state)
179 pthread_cond_wait(&this->cond, &this->cmux);
180 pthread_mutex_unlock(&this->cmux);
181
182 if (this->state == DELETED) {
183 grpc_debug("%s RPC: -> [DELETED]", name);
184 delete this;
185 return DELETED;
186 }
187 return this->state;
188 }
189
190 void do_request(::frr::Northbound::AsyncService *service,
191 ::grpc::ServerCompletionQueue *cq) override
192 {
193 grpc_debug("%s, posting a request for: %s", __func__, name);
194 if (requestf) {
195 NewRpcState<Q, S> *copy =
196 new NewRpcState(cdb, requestf, callback, name);
197 (service->*requestf)(&copy->ctx, &copy->request,
198 &copy->responder, cq, cq, copy);
199 } else {
200 NewRpcState<Q, S> *copy =
201 new NewRpcState(cdb, requestsf, callback, name);
202 (service->*requestsf)(&copy->ctx, &copy->request,
203 &copy->async_responder, cq, cq,
204 copy);
205 }
206 }
207
208
209 static int c_callback(struct thread *thread)
210 {
211 auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg);
212 /*
213 * We hold the lock until the callback finishes and has updated
214 * _tag->state, then we signal done and release.
215 */
216 pthread_mutex_lock(&_tag->cmux);
217
218 CallState enter_state = _tag->state;
219 grpc_debug("%s RPC running on main thread", _tag->name);
220
221 _tag->callback(_tag);
222
223 grpc_debug("%s RPC: %s -> %s", _tag->name,
224 call_states[enter_state], call_states[_tag->state]);
225
226 pthread_cond_signal(&_tag->cond);
227 pthread_mutex_unlock(&_tag->cmux);
228 return 0;
229 }
230 NewRpcState<Q, S> *orig;
231
232 const char *name;
233 grpc::ServerContext ctx;
234 Q request;
235 S response;
236 grpc::ServerAsyncResponseWriter<S> responder;
237 grpc::ServerAsyncWriter<S> async_responder;
238
239 Candidates *cdb;
240 void (*callback)(NewRpcState<Q, S> *);
241 reqfunc_t requestf;
242 reqsfunc_t requestsf;
243
244 pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
245 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
246 void *context;
247
248 CallState state = CREATE;
249 };
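/*
 * Life of a NewRpcState tag (summary of the code above): do_request()
 * allocates a fresh tag and posts it to gRPC; when the completion queue
 * hands the tag back, doCallback() moves CREATE/MORE back to PROCESS,
 * schedules c_callback() on the main thread_master and blocks on the
 * condvar until the handler has advanced the state (MORE, FINISH or, on
 * the final wakeup, DELETED, at which point the tag deletes itself).
 */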
250
251 // ------------------------------------------------------
252 // Utility Functions
253 // ------------------------------------------------------
254
255 static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
256 {
257 switch (encoding) {
258 case frr::JSON:
259 return LYD_JSON;
260 case frr::XML:
261 return LYD_XML;
262 default:
263 flog_err(EC_LIB_DEVELOPMENT,
264 "%s: unknown data encoding format (%u)", __func__,
265 encoding);
266 exit(1);
267 }
268 }
269
270 static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
271 const std::string &value)
272 {
273 LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(),
274 value.c_str(), LYD_NEW_PATH_UPDATE, &dnode);
275 if (err != LY_SUCCESS) {
276 flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
277 __func__, ly_errmsg(ly_native_ctx));
278 return -1;
279 }
280
281 return 0;
282 }
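/*
 * Example call (hypothetical xpath and value, for illustration only):
 *
 *     yang_dnode_edit(candidate->config->dnode,
 *                     "/frr-interface:lib/interface[name='eth0']/description",
 *                     "uplink");
 */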
283
284 static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path)
285 {
286 dnode = yang_dnode_get(dnode, path.c_str());
287 if (!dnode)
288 return -1;
289
290 lyd_free_tree(dnode);
291
292 return 0;
293 }
294
295 static LY_ERR data_tree_from_dnode(frr::DataTree *dt,
296 const struct lyd_node *dnode,
297 LYD_FORMAT lyd_format, bool with_defaults)
298 {
299 char *strp;
300 int options = 0;
301
302 SET_FLAG(options, LYD_PRINT_WITHSIBLINGS);
303 if (with_defaults)
304 SET_FLAG(options, LYD_PRINT_WD_ALL);
305 else
306 SET_FLAG(options, LYD_PRINT_WD_TRIM);
307
308 LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options);
309 if (err == LY_SUCCESS) {
310 if (strp) {
311 dt->set_data(strp);
312 free(strp);
313 }
314 }
315 return err;
316 }
317
318 static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
319 bool config_only)
320 {
321 struct lyd_node *dnode;
322 int options, opt2;
323 LY_ERR err;
324
325 if (config_only) {
326 options = LYD_PARSE_NO_STATE;
327 opt2 = LYD_VALIDATE_NO_STATE;
328 } else {
329 options = LYD_PARSE_STRICT;
330 opt2 = 0;
331 }
332
333 err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(),
334 encoding2lyd_format(dt->encoding()), options,
335 opt2, &dnode);
336 if (err != LY_SUCCESS) {
337 		flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_data_mem() failed: %s",
338 __func__, ly_errmsg(ly_native_ctx));
339 }
340 return dnode;
341 }
342
343 static struct lyd_node *get_dnode_config(const std::string &path)
344 {
345 struct lyd_node *dnode;
346
347 if (!yang_dnode_exists(running_config->dnode,
348 path.empty() ? NULL : path.c_str()))
349 return NULL;
350
351 dnode = yang_dnode_get(running_config->dnode,
352 path.empty() ? NULL : path.c_str());
353 if (dnode)
354 dnode = yang_dnode_dup(dnode);
355
356 return dnode;
357 }
358
359 static int get_oper_data_cb(const struct lysc_node *snode,
360 struct yang_translator *translator,
361 struct yang_data *data, void *arg)
362 {
363 struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
364 int ret = yang_dnode_edit(dnode, data->xpath, data->value);
365 yang_data_free(data);
366
367 return (ret == 0) ? NB_OK : NB_ERR;
368 }
369
370 static struct lyd_node *get_dnode_state(const std::string &path)
371 {
372 struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false);
373 if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode)
374 != NB_OK) {
375 yang_dnode_free(dnode);
376 return NULL;
377 }
378
379 return dnode;
380 }
381
382 static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
383 int type, LYD_FORMAT lyd_format,
384 bool with_defaults)
385 {
386 struct lyd_node *dnode_config = NULL;
387 struct lyd_node *dnode_state = NULL;
388 struct lyd_node *dnode_final;
389
390 // Configuration data.
391 if (type == frr::GetRequest_DataType_ALL
392 || type == frr::GetRequest_DataType_CONFIG) {
393 dnode_config = get_dnode_config(path);
394 if (!dnode_config)
395 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
396 "Data path not found");
397 }
398
399 // Operational data.
400 if (type == frr::GetRequest_DataType_ALL
401 || type == frr::GetRequest_DataType_STATE) {
402 dnode_state = get_dnode_state(path);
403 if (!dnode_state) {
404 if (dnode_config)
405 yang_dnode_free(dnode_config);
406 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
407 "Failed to fetch operational data");
408 }
409 }
410
411 switch (type) {
412 case frr::GetRequest_DataType_ALL:
413 //
414 // Combine configuration and state data into a single
415 // dnode.
416 //
417 if (lyd_merge_siblings(&dnode_state, dnode_config,
418 LYD_MERGE_DESTRUCT)
419 != LY_SUCCESS) {
420 yang_dnode_free(dnode_state);
421 yang_dnode_free(dnode_config);
422 return grpc::Status(
423 grpc::StatusCode::INTERNAL,
424 "Failed to merge configuration and state data",
425 ly_errmsg(ly_native_ctx));
426 }
427
428 dnode_final = dnode_state;
429 break;
430 case frr::GetRequest_DataType_CONFIG:
431 dnode_final = dnode_config;
432 break;
433 case frr::GetRequest_DataType_STATE:
434 dnode_final = dnode_state;
435 break;
436 }
437
438 // Validate data to create implicit default nodes if necessary.
439 int validate_opts = 0;
440 if (type == frr::GetRequest_DataType_CONFIG)
441 validate_opts = LYD_VALIDATE_NO_STATE;
442 else
443 validate_opts = 0;
444
445 LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx,
446 validate_opts, NULL);
447
448 if (err)
449 flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s",
450 __func__, ly_errmsg(ly_native_ctx));
451 // Dump data using the requested format.
452 if (!err)
453 err = data_tree_from_dnode(dt, dnode_final, lyd_format,
454 with_defaults);
455 yang_dnode_free(dnode_final);
456 if (err)
457 return grpc::Status(grpc::StatusCode::INTERNAL,
458 "Failed to dump data");
459 return grpc::Status::OK;
460 }
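/*
 * In short: get_path() pulls configuration from a duplicate of
 * running_config, pulls operational data via nb_oper_data_iterate(),
 * merges the two for DataType ALL, runs lyd_validate_all() to fill in
 * implicit defaults, and serializes the result into the DataTree in the
 * requested encoding.
 */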
461
462
463 // ------------------------------------------------------
464 // RPC Callback Functions: run on main thread
465 // ------------------------------------------------------
466
467 void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest,
468 frr::GetCapabilitiesResponse> *tag)
469 {
470 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
471
472 if (tag->state == FINISH) {
473 tag->state = DELETED;
474 return;
475 }
476
477 // Response: string frr_version = 1;
478 tag->response.set_frr_version(FRR_VERSION);
479
480 // Response: bool rollback_support = 2;
481 #ifdef HAVE_CONFIG_ROLLBACKS
482 tag->response.set_rollback_support(true);
483 #else
484 tag->response.set_rollback_support(false);
485 #endif
486 // Response: repeated ModuleData supported_modules = 3;
487 struct yang_module *module;
488 RB_FOREACH (module, yang_modules, &yang_modules) {
489 auto m = tag->response.add_supported_modules();
490
491 m->set_name(module->name);
492 if (module->info->revision)
493 m->set_revision(module->info->revision);
494 m->set_organization(module->info->org);
495 }
496
497 // Response: repeated Encoding supported_encodings = 4;
498 tag->response.add_supported_encodings(frr::JSON);
499 tag->response.add_supported_encodings(frr::XML);
500
501 /* Should we do this in the async process call? */
502 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
503
504 /* Indicate we are done. */
505 tag->state = FINISH;
506 }
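/*
 * All of the HandleUnary* callbacks in this file follow the same shape as
 * the one above: on the FINISH wakeup they simply mark the tag DELETED;
 * otherwise they build the response, call responder.Finish() and set the
 * state to FINISH so the gRPC pthread re-posts a request of this type.
 */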
507
508 void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag)
509 {
510 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
511
512 if (tag->state == FINISH) {
513 delete static_cast<std::list<std::string> *>(tag->context);
514 tag->state = DELETED;
515 return;
516 }
517
518 if (!tag->context) {
519 /* Creating, first time called for this RPC */
520 auto mypaths = new std::list<std::string>();
521 tag->context = mypaths;
522 auto paths = tag->request.path();
523 for (const std::string &path : paths) {
524 mypaths->push_back(std::string(path));
525 }
526 }
527
528 // Request: DataType type = 1;
529 int type = tag->request.type();
530 // Request: Encoding encoding = 2;
531 frr::Encoding encoding = tag->request.encoding();
532 // Request: bool with_defaults = 3;
533 bool with_defaults = tag->request.with_defaults();
534
535 auto mypathps = static_cast<std::list<std::string> *>(tag->context);
536 if (mypathps->empty()) {
537 tag->async_responder.Finish(grpc::Status::OK, tag);
538 tag->state = FINISH;
539 return;
540 }
541
542 frr::GetResponse response;
543 grpc::Status status;
544
545 // Response: int64 timestamp = 1;
546 response.set_timestamp(time(NULL));
547
548 // Response: DataTree data = 2;
549 auto *data = response.mutable_data();
550 data->set_encoding(tag->request.encoding());
551 status = get_path(data, mypathps->back().c_str(), type,
552 encoding2lyd_format(encoding), with_defaults);
553
554 if (!status.ok()) {
555 tag->async_responder.WriteAndFinish(
556 response, grpc::WriteOptions(), status, tag);
557 tag->state = FINISH;
558 return;
559 }
560
561 mypathps->pop_back();
562 if (mypathps->empty()) {
563 tag->async_responder.WriteAndFinish(
564 response, grpc::WriteOptions(), grpc::Status::OK, tag);
565 tag->state = FINISH;
566 } else {
567 tag->async_responder.Write(response, tag);
568 tag->state = MORE;
569 }
570 }
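/*
 * Streaming pattern used above (and by ListTransactions below): the first
 * invocation stashes the remaining work in tag->context; each subsequent
 * completion-queue event emits one response with Write() (state MORE)
 * until the last item, which goes out via WriteAndFinish() (state FINISH).
 */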
571
572 void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest,
573 frr::CreateCandidateResponse> *tag)
574 {
575 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
576
577 if (tag->state == FINISH) {
578 tag->state = DELETED;
579 return;
580 }
581
582 struct candidate *candidate = tag->cdb->create_candidate();
583 if (!candidate) {
584 tag->responder.Finish(
585 tag->response,
586 grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
587 "Can't create candidate configuration"),
588 tag);
589 } else {
590 tag->response.set_candidate_id(candidate->id);
591 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
592 }
593
594 tag->state = FINISH;
595 }
596
597 void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest,
598 frr::DeleteCandidateResponse> *tag)
599 {
600 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
601
602 if (tag->state == FINISH) {
603 tag->state = DELETED;
604 return;
605 }
606
607 // Request: uint32 candidate_id = 1;
608 uint32_t candidate_id = tag->request.candidate_id();
609
610 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
611
612 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
613 if (!candidate) {
614 tag->responder.Finish(
615 tag->response,
616 grpc::Status(grpc::StatusCode::NOT_FOUND,
617 "candidate configuration not found"),
618 tag);
619 } else {
620 tag->cdb->delete_candidate(candidate);
621 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
622 }
623 tag->state = FINISH;
624 }
625
626 void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest,
627 frr::UpdateCandidateResponse> *tag)
628 {
629 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
630
631 if (tag->state == FINISH) {
632 tag->state = DELETED;
633 return;
634 }
635
636 // Request: uint32 candidate_id = 1;
637 uint32_t candidate_id = tag->request.candidate_id();
638
639 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
640
641 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
642
643 if (!candidate)
644 tag->responder.Finish(
645 tag->response,
646 grpc::Status(grpc::StatusCode::NOT_FOUND,
647 "candidate configuration not found"),
648 tag);
649 else if (candidate->transaction)
650 tag->responder.Finish(
651 tag->response,
652 grpc::Status(
653 grpc::StatusCode::FAILED_PRECONDITION,
654 "candidate is in the middle of a transaction"),
655 tag);
656 else if (nb_candidate_update(candidate->config) != NB_OK)
657 tag->responder.Finish(
658 tag->response,
659 grpc::Status(
660 grpc::StatusCode::INTERNAL,
661 "failed to update candidate configuration"),
662 tag);
663
664 else
665 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
666
667 tag->state = FINISH;
668 }
669
670 void HandleUnaryEditCandidate(
671 NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag)
672 {
673 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
674
675 if (tag->state == FINISH) {
676 tag->state = DELETED;
677 return;
678 }
679
680 // Request: uint32 candidate_id = 1;
681 uint32_t candidate_id = tag->request.candidate_id();
682
683 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
684
685 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
686
687 if (!candidate) {
688 tag->responder.Finish(
689 tag->response,
690 grpc::Status(grpc::StatusCode::NOT_FOUND,
691 "candidate configuration not found"),
692 tag);
693 tag->state = FINISH;
694 return;
695 }
696
697 struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
698
699 auto pvs = tag->request.update();
700 for (const frr::PathValue &pv : pvs) {
701 if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), pv.value())
702 != 0) {
703 nb_config_free(candidate_tmp);
704
705 tag->responder.Finish(
706 tag->response,
707 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
708 "Failed to update \"" + pv.path()
709 + "\""),
710 tag);
711
712 tag->state = FINISH;
713 return;
714 }
715 }
716
717 pvs = tag->request.delete_();
718 for (const frr::PathValue &pv : pvs) {
719 if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
720 nb_config_free(candidate_tmp);
721 tag->responder.Finish(
722 tag->response,
723 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
724 "Failed to remove \"" + pv.path()
725 + "\""),
726 tag);
727 tag->state = FINISH;
728 return;
729 }
730 }
731
732 // No errors, accept all changes.
733 nb_config_replace(candidate->config, candidate_tmp, false);
734
735 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
736
737 tag->state = FINISH;
738 }
739
740 void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest,
741 frr::LoadToCandidateResponse> *tag)
742 {
743 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
744
745 if (tag->state == FINISH) {
746 tag->state = DELETED;
747 return;
748 }
749
750 // Request: uint32 candidate_id = 1;
751 uint32_t candidate_id = tag->request.candidate_id();
752
753 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
754
755 // Request: LoadType type = 2;
756 int load_type = tag->request.type();
757 // Request: DataTree config = 3;
758 auto config = tag->request.config();
759
760
761 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
762
763 if (!candidate) {
764 tag->responder.Finish(
765 tag->response,
766 grpc::Status(grpc::StatusCode::NOT_FOUND,
767 "candidate configuration not found"),
768 tag);
769 tag->state = FINISH;
770 return;
771 }
772
773 struct lyd_node *dnode = dnode_from_data_tree(&config, true);
774 if (!dnode) {
775 tag->responder.Finish(
776 tag->response,
777 grpc::Status(grpc::StatusCode::INTERNAL,
778 "Failed to parse the configuration"),
779 tag);
780 tag->state = FINISH;
781 return;
782 }
783
784 struct nb_config *loaded_config = nb_config_new(dnode);
785
786 if (load_type == frr::LoadToCandidateRequest::REPLACE)
787 nb_config_replace(candidate->config, loaded_config, false);
788 else if (nb_config_merge(candidate->config, loaded_config, false)
789 != NB_OK) {
790 tag->responder.Finish(
791 tag->response,
792 grpc::Status(
793 grpc::StatusCode::INTERNAL,
794 "Failed to merge the loaded configuration"),
795 tag);
796 tag->state = FINISH;
797 return;
798 }
799
800 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
801 tag->state = FINISH;
802 }
803
804 void HandleUnaryCommit(
805 NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
806 {
807 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
808
809 if (tag->state == FINISH) {
810 tag->state = DELETED;
811 return;
812 }
813
814 // Request: uint32 candidate_id = 1;
815 uint32_t candidate_id = tag->request.candidate_id();
816
817 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
818
819 // Request: Phase phase = 2;
820 int phase = tag->request.phase();
821 // Request: string comment = 3;
822 const std::string comment = tag->request.comment();
823
824 // Find candidate configuration.
825 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
826 if (!candidate) {
827 tag->responder.Finish(
828 tag->response,
829 grpc::Status(grpc::StatusCode::NOT_FOUND,
830 "candidate configuration not found"),
831 tag);
832 tag->state = FINISH;
833 return;
834 }
835
836 int ret = NB_OK;
837 uint32_t transaction_id = 0;
838
839 // Check for misuse of the two-phase commit protocol.
840 switch (phase) {
841 case frr::CommitRequest::PREPARE:
842 case frr::CommitRequest::ALL:
843 if (candidate->transaction) {
844 tag->responder.Finish(
845 tag->response,
846 grpc::Status(
847 grpc::StatusCode::FAILED_PRECONDITION,
848 "candidate is in the middle of a transaction"),
849 tag);
850 tag->state = FINISH;
851 return;
852 }
853 break;
854 case frr::CommitRequest::ABORT:
855 case frr::CommitRequest::APPLY:
856 if (!candidate->transaction) {
857 tag->responder.Finish(
858 tag->response,
859 grpc::Status(
860 grpc::StatusCode::FAILED_PRECONDITION,
861 "no transaction in progress"),
862 tag);
863 tag->state = FINISH;
864 return;
865 }
866 break;
867 default:
868 break;
869 }
870
871
872 // Execute the user request.
873 struct nb_context context = {};
874 context.client = NB_CLIENT_GRPC;
875 char errmsg[BUFSIZ] = {0};
876
877 switch (phase) {
878 case frr::CommitRequest::VALIDATE:
879 grpc_debug("`-> Performing VALIDATE");
880 ret = nb_candidate_validate(&context, candidate->config, errmsg,
881 sizeof(errmsg));
882 break;
883 case frr::CommitRequest::PREPARE:
884 grpc_debug("`-> Performing PREPARE");
885 ret = nb_candidate_commit_prepare(
886 &context, candidate->config, comment.c_str(),
887 &candidate->transaction, errmsg, sizeof(errmsg));
888 break;
889 case frr::CommitRequest::ABORT:
890 grpc_debug("`-> Performing ABORT");
891 nb_candidate_commit_abort(candidate->transaction, errmsg,
892 sizeof(errmsg));
893 break;
894 case frr::CommitRequest::APPLY:
895 grpc_debug("`-> Performing APPLY");
896 nb_candidate_commit_apply(candidate->transaction, true,
897 &transaction_id, errmsg,
898 sizeof(errmsg));
899 break;
900 case frr::CommitRequest::ALL:
901 grpc_debug("`-> Performing ALL");
902 ret = nb_candidate_commit(&context, candidate->config, true,
903 comment.c_str(), &transaction_id,
904 errmsg, sizeof(errmsg));
905 break;
906 }
907
908 // Map northbound error codes to gRPC status codes.
909 grpc::Status status;
910 switch (ret) {
911 case NB_OK:
912 status = grpc::Status::OK;
913 break;
914 case NB_ERR_NO_CHANGES:
915 status = grpc::Status(grpc::StatusCode::ABORTED, errmsg);
916 break;
917 case NB_ERR_LOCKED:
918 status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg);
919 break;
920 case NB_ERR_VALIDATION:
921 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
922 errmsg);
923 break;
924 case NB_ERR_RESOURCE:
925 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
926 errmsg);
927 break;
928 case NB_ERR:
929 default:
930 status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg);
931 break;
932 }
933
934 grpc_debug("`-> Result: %s (message: '%s')",
935 nb_err_name((enum nb_error)ret), errmsg);
936
937 if (ret == NB_OK) {
938 // Response: uint32 transaction_id = 1;
939 if (transaction_id)
940 tag->response.set_transaction_id(transaction_id);
941 }
942 if (strlen(errmsg) > 0)
943 tag->response.set_error_message(errmsg);
944
945 tag->responder.Finish(tag->response, status, tag);
946 tag->state = FINISH;
947 }
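/*
 * Commit phases, as implemented above: a client can either send a single
 * ALL request (validate + prepare + apply in one shot via
 * nb_candidate_commit()), or drive the two-phase protocol itself with
 * VALIDATE and/or PREPARE followed by APPLY or ABORT on the pending
 * transaction.
 */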
948
949 void HandleUnaryLockConfig(
950 NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
951 {
952 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
953
954 if (tag->state == FINISH) {
955 tag->state = DELETED;
956 return;
957 }
958
959 if (nb_running_lock(NB_CLIENT_GRPC, NULL)) {
960 tag->responder.Finish(
961 tag->response,
962 grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
963 "running configuration is locked already"),
964 tag);
965 } else {
966 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
967 }
968 tag->state = FINISH;
969 }
970
971 void HandleUnaryUnlockConfig(
972 NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
973 {
974 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
975
976 if (tag->state == FINISH) {
977 tag->state = DELETED;
978 return;
979 }
980
981 if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) {
982 tag->responder.Finish(
983 tag->response,
984 grpc::Status(
985 grpc::StatusCode::FAILED_PRECONDITION,
986 "failed to unlock the running configuration"),
987 tag);
988 } else {
989 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
990 }
991 tag->state = FINISH;
992 }
993
994 static void list_transactions_cb(void *arg, int transaction_id,
995 const char *client_name, const char *date,
996 const char *comment)
997 {
998 auto list = static_cast<std::list<
999 std::tuple<int, std::string, std::string, std::string>> *>(arg);
1000 list->push_back(
1001 std::make_tuple(transaction_id, std::string(client_name),
1002 std::string(date), std::string(comment)));
1003 }
1004
1005 void HandleStreamingListTransactions(
1006 NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse>
1007 *tag)
1008 {
1009 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
1010
1011 if (tag->state == FINISH) {
1012 delete static_cast<std::list<std::tuple<
1013 int, std::string, std::string, std::string>> *>(
1014 tag->context);
1015 tag->state = DELETED;
1016 return;
1017 }
1018
1019 if (!tag->context) {
1020 /* Creating, first time called for this RPC */
1021 auto new_list =
1022 new std::list<std::tuple<int, std::string, std::string,
1023 std::string>>();
1024 tag->context = new_list;
1025 nb_db_transactions_iterate(list_transactions_cb, tag->context);
1026
1027 new_list->push_back(std::make_tuple(
1028 0xFFFF, std::string("fake client"),
1029 std::string("fake date"), std::string("fake comment")));
1030 new_list->push_back(
1031 std::make_tuple(0xFFFE, std::string("fake client2"),
1032 std::string("fake date"),
1033 std::string("fake comment2")));
1034 }
1035
1036 auto list = static_cast<std::list<
1037 std::tuple<int, std::string, std::string, std::string>> *>(
1038 tag->context);
1039
1040 if (list->empty()) {
1041 tag->async_responder.Finish(grpc::Status::OK, tag);
1042 tag->state = FINISH;
1043 return;
1044 }
1045
1046 auto item = list->back();
1047
1048 frr::ListTransactionsResponse response;
1049
1050 // Response: uint32 id = 1;
1051 response.set_id(std::get<0>(item));
1052
1053 // Response: string client = 2;
1054 response.set_client(std::get<1>(item).c_str());
1055
1056 // Response: string date = 3;
1057 response.set_date(std::get<2>(item).c_str());
1058
1059 // Response: string comment = 4;
1060 response.set_comment(std::get<3>(item).c_str());
1061
1062 list->pop_back();
1063 if (list->empty()) {
1064 tag->async_responder.WriteAndFinish(
1065 response, grpc::WriteOptions(), grpc::Status::OK, tag);
1066 tag->state = FINISH;
1067 } else {
1068 tag->async_responder.Write(response, tag);
1069 tag->state = MORE;
1070 }
1071 }
1072
1073 void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest,
1074 frr::GetTransactionResponse> *tag)
1075 {
1076 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
1077
1078 if (tag->state == FINISH) {
1079 tag->state = DELETED;
1080 return;
1081 }
1082
1083 // Request: uint32 transaction_id = 1;
1084 uint32_t transaction_id = tag->request.transaction_id();
1085 // Request: Encoding encoding = 2;
1086 frr::Encoding encoding = tag->request.encoding();
1087 // Request: bool with_defaults = 3;
1088 bool with_defaults = tag->request.with_defaults();
1089
1090 grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__,
1091 transaction_id, encoding);
1092
1093 struct nb_config *nb_config;
1094
1095 // Load configuration from the transactions database.
1096 nb_config = nb_db_transaction_load(transaction_id);
1097 if (!nb_config) {
1098 tag->responder.Finish(
1099 tag->response,
1100 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1101 "Transaction not found"),
1102 tag);
1103 tag->state = FINISH;
1104 return;
1105 }
1106
1107 // Response: DataTree config = 1;
1108 auto config = tag->response.mutable_config();
1109 config->set_encoding(encoding);
1110
1111 // Dump data using the requested format.
1112 if (data_tree_from_dnode(config, nb_config->dnode,
1113 encoding2lyd_format(encoding), with_defaults)
1114 != 0) {
1115 nb_config_free(nb_config);
1116 tag->responder.Finish(tag->response,
1117 grpc::Status(grpc::StatusCode::INTERNAL,
1118 "Failed to dump data"),
1119 tag);
1120 tag->state = FINISH;
1121 return;
1122 }
1123
1124 nb_config_free(nb_config);
1125
1126 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
1127 tag->state = FINISH;
1128 }
1129
1130 void HandleUnaryExecute(
1131 NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
1132 {
1133 grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
1134
1135 if (tag->state == FINISH) {
1136 tag->state = DELETED;
1137 return;
1138 }
1139
1140 struct nb_node *nb_node;
1141 struct list *input_list;
1142 struct list *output_list;
1143 struct listnode *node;
1144 struct yang_data *data;
1145 const char *xpath;
1146 char errmsg[BUFSIZ] = {0};
1147
1148 // Request: string path = 1;
1149 xpath = tag->request.path().c_str();
1150
1151 grpc_debug("%s(path: \"%s\")", __func__, xpath);
1152
1153 if (tag->request.path().empty()) {
1154 tag->responder.Finish(
1155 tag->response,
1156 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1157 "Data path is empty"),
1158 tag);
1159 tag->state = FINISH;
1160 return;
1161 }
1162
1163 nb_node = nb_node_find(xpath);
1164 if (!nb_node) {
1165 tag->responder.Finish(
1166 tag->response,
1167 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1168 "Unknown data path"),
1169 tag);
1170 tag->state = FINISH;
1171 return;
1172 }
1173
1174 input_list = yang_data_list_new();
1175 output_list = yang_data_list_new();
1176
1177 // Read input parameters.
1178 auto input = tag->request.input();
1179 for (const frr::PathValue &pv : input) {
1180 // Request: repeated PathValue input = 2;
1181 data = yang_data_new(pv.path().c_str(), pv.value().c_str());
1182 listnode_add(input_list, data);
1183 }
1184
1185 // Execute callback registered for this XPath.
1186 if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg,
1187 sizeof(errmsg))
1188 != NB_OK) {
1189 flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s",
1190 __func__, xpath);
1191 list_delete(&input_list);
1192 list_delete(&output_list);
1193
1194 tag->responder.Finish(
1195 tag->response,
1196 grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"),
1197 tag);
1198 tag->state = FINISH;
1199 return;
1200 }
1201
1202 // Process output parameters.
1203 for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
1204 // Response: repeated PathValue output = 1;
1205 frr::PathValue *pv = tag->response.add_output();
1206 pv->set_path(data->xpath);
1207 pv->set_value(data->value);
1208 }
1209
1210 // Release memory.
1211 list_delete(&input_list);
1212 list_delete(&output_list);
1213
1214 tag->responder.Finish(tag->response, grpc::Status::OK, tag);
1215 tag->state = FINISH;
1216 }
1217
1218 // ------------------------------------------------------
1219 // Thread Initialization and Run Functions
1220 // ------------------------------------------------------
1221
1222
1223 #define REQUEST_NEWRPC(NAME, cdb) \
1224 do { \
1225 auto _rpcState = new NewRpcState<frr::NAME##Request, \
1226 frr::NAME##Response>( \
1227 (cdb), &frr::Northbound::AsyncService::Request##NAME, \
1228 &HandleUnary##NAME, #NAME); \
1229 _rpcState->do_request(service, s_cq); \
1230 } while (0)
1231
1232 #define REQUEST_NEWRPC_STREAMING(NAME, cdb) \
1233 do { \
1234 auto _rpcState = new NewRpcState<frr::NAME##Request, \
1235 frr::NAME##Response>( \
1236 (cdb), &frr::Northbound::AsyncService::Request##NAME, \
1237 &HandleStreaming##NAME, #NAME); \
1238 _rpcState->do_request(service, s_cq); \
1239 } while (0)
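/*
 * For reference, REQUEST_NEWRPC(GetCapabilities, NULL) expands to roughly:
 *
 *     auto _rpcState = new NewRpcState<frr::GetCapabilitiesRequest,
 *                                      frr::GetCapabilitiesResponse>(
 *             NULL, &frr::Northbound::AsyncService::RequestGetCapabilities,
 *             &HandleUnaryGetCapabilities, "GetCapabilities");
 *     _rpcState->do_request(service, s_cq);
 */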
1240
1241 struct grpc_pthread_attr {
1242 struct frr_pthread_attr attr;
1243 unsigned long port;
1244 };
1245
1246 // Capture these objects so we can try to shut down cleanly
1247 static std::unique_ptr<grpc::Server> s_server;
1248 static grpc::ServerCompletionQueue *s_cq;
1249
1250 static void *grpc_pthread_start(void *arg)
1251 {
1252 struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
1253 uint port = (uint) reinterpret_cast<intptr_t>(fpt->data);
1254
1255 Candidates candidates;
1256 grpc::ServerBuilder builder;
1257 std::stringstream server_address;
1258 frr::Northbound::AsyncService *service =
1259 new frr::Northbound::AsyncService();
1260
1261 frr_pthread_set_name(fpt);
1262
1263 server_address << "0.0.0.0:" << port;
1264 builder.AddListeningPort(server_address.str(),
1265 grpc::InsecureServerCredentials());
1266 builder.RegisterService(service);
1267 auto cq = builder.AddCompletionQueue();
1268 s_cq = cq.get();
1269 s_server = builder.BuildAndStart();
1270
1271 /* Schedule all RPC handlers */
1272 REQUEST_NEWRPC(GetCapabilities, NULL);
1273 REQUEST_NEWRPC(CreateCandidate, &candidates);
1274 REQUEST_NEWRPC(DeleteCandidate, &candidates);
1275 REQUEST_NEWRPC(UpdateCandidate, &candidates);
1276 REQUEST_NEWRPC(EditCandidate, &candidates);
1277 REQUEST_NEWRPC(LoadToCandidate, &candidates);
1278 REQUEST_NEWRPC(Commit, &candidates);
1279 REQUEST_NEWRPC(GetTransaction, NULL);
1280 REQUEST_NEWRPC(LockConfig, NULL);
1281 REQUEST_NEWRPC(UnlockConfig, NULL);
1282 REQUEST_NEWRPC(Execute, NULL);
1283 REQUEST_NEWRPC_STREAMING(Get, NULL);
1284 REQUEST_NEWRPC_STREAMING(ListTransactions, NULL);
1285
1286 zlog_notice("gRPC server listening on %s",
1287 server_address.str().c_str());
1288
1289 /* Process inbound RPCs */
1290 while (true) {
1291 void *tag;
1292 bool ok;
1293
1294 s_cq->Next(&tag, &ok);
1295 if (!ok)
1296 break;
1297
1298 grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__,
1299 tag, ok);
1300
1301 RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
1302 CallState state = rpc->doCallback();
1303 grpc_debug("%s: Callback returned RPC State: %s", __func__,
1304 call_states[state]);
1305
1306 		/*
1307 		 * Our side is done (FINISH), so post a request to receive a new
1308 		 * request of this type. We could do this earlier, but that would
1309 		 * mean handling multiple requests of the same type in parallel.
1310 		 * We expect to be called back once more in the FINISH state (from
1311 		 * the user indicating Finish()) for cleanup.
1312 		 */
1313 if (state == FINISH)
1314 rpc->do_request(service, s_cq);
1315 }
1316
1317 return NULL;
1318 }
1319
1320
1321 static int frr_grpc_init(uint port)
1322 {
1323 struct frr_pthread_attr attr = {
1324 .start = grpc_pthread_start,
1325 .stop = NULL,
1326 };
1327
1328 fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
1329 fpt->data = reinterpret_cast<void *>((intptr_t)port);
1330
1331 /* Create a pthread for gRPC since it runs its own event loop. */
1332 if (frr_pthread_run(fpt, NULL) < 0) {
1333 flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s",
1334 __func__, safe_strerror(errno));
1335 return -1;
1336 }
1337
1338 return 0;
1339 }
1340
1341 static int frr_grpc_finish(void)
1342 {
1343 // Shutdown the grpc server
1344 if (s_server) {
1345 s_server->Shutdown();
1346 s_cq->Shutdown();
1347
1348 // And drain the queue
1349 void *ignore;
1350 bool ok;
1351
1352 while (s_cq->Next(&ignore, &ok))
1353 ;
1354 }
1355
1356 if (fpt) {
1357 pthread_join(fpt->thread, NULL);
1358 frr_pthread_destroy(fpt);
1359 }
1360
1361 return 0;
1362 }
1363
1364 /*
1365 * This is done this way because module_init and module_late_init are both
1366 * called during daemon pre-fork initialization. Because the GRPC library
1367 * spawns threads internally, we need to delay initializing it until after
1368 * fork. This is done by scheduling this init function as an event task, since
1369 * the event loop doesn't run until after fork.
1370 */
1371 static int frr_grpc_module_very_late_init(struct thread *thread)
1372 {
1373 const char *args = THIS_MODULE->load_args;
1374 uint port = GRPC_DEFAULT_PORT;
1375
1376 if (args) {
1377 port = std::stoul(args);
1378 if (port < 1024 || port > UINT16_MAX) {
1379 flog_err(EC_LIB_GRPC_INIT,
1380 "%s: port number must be between 1025 and %d",
1381 __func__, UINT16_MAX);
1382 goto error;
1383 }
1384 }
1385
1386 if (frr_grpc_init(port) < 0)
1387 goto error;
1388
1389 return 0;
1390
1391 error:
1392 flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module");
1393 return -1;
1394 }
1395
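/*
 * Module start-up sequence, for reference:
 *   1. frr_grpc_module_init() (bottom of this file) runs pre-fork and
 *      registers the frr_late_init hook.
 *   2. frr_grpc_module_late_init() saves the main thread_master, hooks
 *      frr_grpc_finish() into frr_fini and schedules the very-late init
 *      as an event.
 *   3. frr_grpc_module_very_late_init() (above) runs post-fork from the
 *      event loop, parses the optional port argument and starts the gRPC
 *      pthread via frr_grpc_init().
 */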
1396 static int frr_grpc_module_late_init(struct thread_master *tm)
1397 {
1398 main_master = tm;
1399 hook_register(frr_fini, frr_grpc_finish);
1400 thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
1401 return 0;
1402 }
1403
1404 static int frr_grpc_module_init(void)
1405 {
1406 hook_register(frr_late_init, frr_grpc_module_late_init);
1407
1408 return 0;
1409 }
1410
1411 FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
1412 .description = "FRR gRPC northbound module",
1413 .init = frr_grpc_module_init, );