2 // Copyright (C) 2019 NetDEF, Inc.
4 // Copyright (c) 2021, LabN Consulting, L.L.C
6 // This program is free software; you can redistribute it and/or modify it
7 // under the terms of the GNU General Public License as published by the Free
8 // Software Foundation; either version 2 of the License, or (at your option)
11 // This program is distributed in the hope that it will be useful, but WITHOUT
12 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 // You should have received a copy of the GNU General Public License along
17 // with this program; see the file COPYING; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <grpcpp/grpcpp.h>
23 #include "grpc/frr-northbound.grpc.pb.h"
27 #include "lib/version.h"
28 #include "lib/thread.h"
30 #include "lib_errors.h"
31 #include "northbound.h"
32 #include "northbound_db.h"
33 #include "frr_pthread.h"
40 #define GRPC_DEFAULT_PORT 50051
43 * NOTE: we can't use the FRR debugging infrastructure here since it uses
44 * atomics and C++ has a different atomics API. Enable gRPC debugging
45 * unconditionally until we figure out a way to solve this problem.
47 static bool nb_dbg_client_grpc
= 0;
49 static struct thread_master
*main_master
;
51 static struct frr_pthread
*fpt
;
53 #define grpc_debug(...) \
55 if (nb_dbg_client_grpc) \
56 zlog_debug(__VA_ARGS__); \
59 // ------------------------------------------------------
61 // ------------------------------------------------------
63 enum CallState
{ CREATE
, PROCESS
, MORE
, FINISH
, DELETED
};
64 const char *call_states
[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"};
68 struct nb_config
*config
;
69 struct nb_transaction
*transaction
;
78 for (auto it
= _cdb
.begin(); it
!= _cdb
.end(); it
++)
79 delete_candidate(&it
->second
);
82 struct candidate
*create_candidate(void)
84 uint64_t id
= ++_next_id
;
85 assert(id
); // TODO: implement an algorithm for unique reusable
87 struct candidate
*c
= &_cdb
[id
];
89 c
->config
= nb_config_dup(running_config
);
90 c
->transaction
= NULL
;
95 void delete_candidate(struct candidate
*c
)
97 char errmsg
[BUFSIZ
] = {0};
100 nb_config_free(c
->config
);
102 nb_candidate_commit_abort(c
->transaction
, errmsg
,
106 struct candidate
*get_candidate(uint32_t id
)
108 return _cdb
.count(id
) == 0 ? NULL
: &_cdb
[id
];
112 uint64_t _next_id
= 0;
113 std::map
<uint32_t, struct candidate
> _cdb
;
119 virtual CallState
doCallback() = 0;
120 virtual void do_request(::frr::Northbound::AsyncService
*service
,
121 ::grpc::ServerCompletionQueue
*cq
) = 0;
125 * The RPC state class is used to track the execution of an RPC.
127 template <typename Q
, typename S
> class NewRpcState
: RpcStateBase
129 typedef void (frr::Northbound::AsyncService::*reqfunc_t
)(
130 ::grpc::ServerContext
*, Q
*,
131 ::grpc::ServerAsyncResponseWriter
<S
> *,
132 ::grpc::CompletionQueue
*, ::grpc::ServerCompletionQueue
*,
134 typedef void (frr::Northbound::AsyncService::*reqsfunc_t
)(
135 ::grpc::ServerContext
*, Q
*, ::grpc::ServerAsyncWriter
<S
> *,
136 ::grpc::CompletionQueue
*, ::grpc::ServerCompletionQueue
*,
140 NewRpcState(Candidates
*cdb
, reqfunc_t rfunc
,
141 void (*cb
)(NewRpcState
<Q
, S
> *), const char *name
)
142 : requestf(rfunc
), callback(cb
), responder(&ctx
),
143 async_responder(&ctx
), name(name
), cdb(cdb
){};
144 NewRpcState(Candidates
*cdb
, reqsfunc_t rfunc
,
145 void (*cb
)(NewRpcState
<Q
, S
> *), const char *name
)
146 : requestsf(rfunc
), callback(cb
), responder(&ctx
),
147 async_responder(&ctx
), name(name
), cdb(cdb
){};
149 CallState
doCallback() override
151 CallState enter_state
= this->state
;
153 if (enter_state
== FINISH
) {
154 grpc_debug("%s RPC FINISH -> DELETED", name
);
157 grpc_debug("%s RPC: %s -> PROCESS", name
,
158 call_states
[this->state
]);
162 * We are either in state CREATE, MORE or FINISH. If CREATE or
163 * MORE move back to PROCESS, otherwise we are cleaning up
164 * (FINISH) so leave it in that state. Run the callback on the
165 * main threadmaster/pthread; and wait for expected transition
166 * from main thread. If transition is to FINISH->DELETED.
169 * We update the state prior to scheduling the callback which
170 * may then update the state in the master pthread. Then we
171 * obtain the lock in the condvar-check-loop as the callback
172 * will be modifying updating the state value.
174 this->state
= new_state
;
175 thread_add_event(main_master
, c_callback
, (void *)this, 0,
177 pthread_mutex_lock(&this->cmux
);
178 while (this->state
== new_state
)
179 pthread_cond_wait(&this->cond
, &this->cmux
);
180 pthread_mutex_unlock(&this->cmux
);
182 if (this->state
== DELETED
) {
183 grpc_debug("%s RPC: -> [DELETED]", name
);
190 void do_request(::frr::Northbound::AsyncService
*service
,
191 ::grpc::ServerCompletionQueue
*cq
) override
193 grpc_debug("%s, posting a request for: %s", __func__
, name
);
195 NewRpcState
<Q
, S
> *copy
=
196 new NewRpcState(cdb
, requestf
, callback
, name
);
197 (service
->*requestf
)(©
->ctx
, ©
->request
,
198 ©
->responder
, cq
, cq
, copy
);
200 NewRpcState
<Q
, S
> *copy
=
201 new NewRpcState(cdb
, requestsf
, callback
, name
);
202 (service
->*requestsf
)(©
->ctx
, ©
->request
,
203 ©
->async_responder
, cq
, cq
,
209 static int c_callback(struct thread
*thread
)
211 auto _tag
= static_cast<NewRpcState
<Q
, S
> *>(thread
->arg
);
213 * We hold the lock until the callback finishes and has updated
214 * _tag->state, then we signal done and release.
216 pthread_mutex_lock(&_tag
->cmux
);
218 CallState enter_state
= _tag
->state
;
219 grpc_debug("%s RPC running on main thread", _tag
->name
);
221 _tag
->callback(_tag
);
223 grpc_debug("%s RPC: %s -> %s", _tag
->name
,
224 call_states
[enter_state
], call_states
[_tag
->state
]);
226 pthread_cond_signal(&_tag
->cond
);
227 pthread_mutex_unlock(&_tag
->cmux
);
230 NewRpcState
<Q
, S
> *orig
;
233 grpc::ServerContext ctx
;
236 grpc::ServerAsyncResponseWriter
<S
> responder
;
237 grpc::ServerAsyncWriter
<S
> async_responder
;
240 void (*callback
)(NewRpcState
<Q
, S
> *);
242 reqsfunc_t requestsf
;
244 pthread_mutex_t cmux
= PTHREAD_MUTEX_INITIALIZER
;
245 pthread_cond_t cond
= PTHREAD_COND_INITIALIZER
;
248 CallState state
= CREATE
;
251 // ------------------------------------------------------
253 // ------------------------------------------------------
255 static LYD_FORMAT
encoding2lyd_format(enum frr::Encoding encoding
)
263 flog_err(EC_LIB_DEVELOPMENT
,
264 "%s: unknown data encoding format (%u)", __func__
,
270 static int yang_dnode_edit(struct lyd_node
*dnode
, const std::string
&path
,
271 const std::string
&value
)
273 LY_ERR err
= lyd_new_path(dnode
, ly_native_ctx
, path
.c_str(),
274 value
.c_str(), LYD_NEW_PATH_UPDATE
, &dnode
);
275 if (err
!= LY_SUCCESS
) {
276 flog_warn(EC_LIB_LIBYANG
, "%s: lyd_new_path() failed: %s",
277 __func__
, ly_errmsg(ly_native_ctx
));
284 static int yang_dnode_delete(struct lyd_node
*dnode
, const std::string
&path
)
286 dnode
= yang_dnode_get(dnode
, path
.c_str());
290 lyd_free_tree(dnode
);
295 static LY_ERR
data_tree_from_dnode(frr::DataTree
*dt
,
296 const struct lyd_node
*dnode
,
297 LYD_FORMAT lyd_format
, bool with_defaults
)
302 SET_FLAG(options
, LYD_PRINT_WITHSIBLINGS
);
304 SET_FLAG(options
, LYD_PRINT_WD_ALL
);
306 SET_FLAG(options
, LYD_PRINT_WD_TRIM
);
308 LY_ERR err
= lyd_print_mem(&strp
, dnode
, lyd_format
, options
);
309 if (err
== LY_SUCCESS
) {
318 static struct lyd_node
*dnode_from_data_tree(const frr::DataTree
*dt
,
321 struct lyd_node
*dnode
;
326 options
= LYD_PARSE_NO_STATE
;
327 opt2
= LYD_VALIDATE_NO_STATE
;
329 options
= LYD_PARSE_STRICT
;
333 err
= lyd_parse_data_mem(ly_native_ctx
, dt
->data().c_str(),
334 encoding2lyd_format(dt
->encoding()), options
,
336 if (err
!= LY_SUCCESS
) {
337 flog_warn(EC_LIB_LIBYANG
, "%s: lyd_parse_mem() failed: %s",
338 __func__
, ly_errmsg(ly_native_ctx
));
343 static struct lyd_node
*get_dnode_config(const std::string
&path
)
345 struct lyd_node
*dnode
;
347 if (!yang_dnode_exists(running_config
->dnode
,
348 path
.empty() ? NULL
: path
.c_str()))
351 dnode
= yang_dnode_get(running_config
->dnode
,
352 path
.empty() ? NULL
: path
.c_str());
354 dnode
= yang_dnode_dup(dnode
);
359 static int get_oper_data_cb(const struct lysc_node
*snode
,
360 struct yang_translator
*translator
,
361 struct yang_data
*data
, void *arg
)
363 struct lyd_node
*dnode
= static_cast<struct lyd_node
*>(arg
);
364 int ret
= yang_dnode_edit(dnode
, data
->xpath
, data
->value
);
365 yang_data_free(data
);
367 return (ret
== 0) ? NB_OK
: NB_ERR
;
370 static struct lyd_node
*get_dnode_state(const std::string
&path
)
372 struct lyd_node
*dnode
= yang_dnode_new(ly_native_ctx
, false);
373 if (nb_oper_data_iterate(path
.c_str(), NULL
, 0, get_oper_data_cb
, dnode
)
375 yang_dnode_free(dnode
);
382 static grpc::Status
get_path(frr::DataTree
*dt
, const std::string
&path
,
383 int type
, LYD_FORMAT lyd_format
,
386 struct lyd_node
*dnode_config
= NULL
;
387 struct lyd_node
*dnode_state
= NULL
;
388 struct lyd_node
*dnode_final
;
390 // Configuration data.
391 if (type
== frr::GetRequest_DataType_ALL
392 || type
== frr::GetRequest_DataType_CONFIG
) {
393 dnode_config
= get_dnode_config(path
);
395 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
396 "Data path not found");
400 if (type
== frr::GetRequest_DataType_ALL
401 || type
== frr::GetRequest_DataType_STATE
) {
402 dnode_state
= get_dnode_state(path
);
405 yang_dnode_free(dnode_config
);
406 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
407 "Failed to fetch operational data");
412 case frr::GetRequest_DataType_ALL
:
414 // Combine configuration and state data into a single
417 if (lyd_merge_siblings(&dnode_state
, dnode_config
,
420 yang_dnode_free(dnode_state
);
421 yang_dnode_free(dnode_config
);
423 grpc::StatusCode::INTERNAL
,
424 "Failed to merge configuration and state data",
425 ly_errmsg(ly_native_ctx
));
428 dnode_final
= dnode_state
;
430 case frr::GetRequest_DataType_CONFIG
:
431 dnode_final
= dnode_config
;
433 case frr::GetRequest_DataType_STATE
:
434 dnode_final
= dnode_state
;
438 // Validate data to create implicit default nodes if necessary.
439 int validate_opts
= 0;
440 if (type
== frr::GetRequest_DataType_CONFIG
)
441 validate_opts
= LYD_VALIDATE_NO_STATE
;
445 LY_ERR err
= lyd_validate_all(&dnode_final
, ly_native_ctx
,
446 validate_opts
, NULL
);
449 flog_warn(EC_LIB_LIBYANG
, "%s: lyd_validate_all() failed: %s",
450 __func__
, ly_errmsg(ly_native_ctx
));
451 // Dump data using the requested format.
453 err
= data_tree_from_dnode(dt
, dnode_final
, lyd_format
,
455 yang_dnode_free(dnode_final
);
457 return grpc::Status(grpc::StatusCode::INTERNAL
,
458 "Failed to dump data");
459 return grpc::Status::OK
;
463 // ------------------------------------------------------
464 // RPC Callback Functions: run on main thread
465 // ------------------------------------------------------
467 void HandleUnaryGetCapabilities(NewRpcState
<frr::GetCapabilitiesRequest
,
468 frr::GetCapabilitiesResponse
> *tag
)
470 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
472 if (tag
->state
== FINISH
) {
473 tag
->state
= DELETED
;
477 // Response: string frr_version = 1;
478 tag
->response
.set_frr_version(FRR_VERSION
);
480 // Response: bool rollback_support = 2;
481 #ifdef HAVE_CONFIG_ROLLBACKS
482 tag
->response
.set_rollback_support(true);
484 tag
->response
.set_rollback_support(false);
486 // Response: repeated ModuleData supported_modules = 3;
487 struct yang_module
*module
;
488 RB_FOREACH (module
, yang_modules
, &yang_modules
) {
489 auto m
= tag
->response
.add_supported_modules();
491 m
->set_name(module
->name
);
492 if (module
->info
->revision
)
493 m
->set_revision(module
->info
->revision
);
494 m
->set_organization(module
->info
->org
);
497 // Response: repeated Encoding supported_encodings = 4;
498 tag
->response
.add_supported_encodings(frr::JSON
);
499 tag
->response
.add_supported_encodings(frr::XML
);
501 /* Should we do this in the async process call? */
502 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
504 /* Indicate we are done. */
508 void HandleStreamingGet(NewRpcState
<frr::GetRequest
, frr::GetResponse
> *tag
)
510 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
512 if (tag
->state
== FINISH
) {
513 delete static_cast<std::list
<std::string
> *>(tag
->context
);
514 tag
->state
= DELETED
;
519 /* Creating, first time called for this RPC */
520 auto mypaths
= new std::list
<std::string
>();
521 tag
->context
= mypaths
;
522 auto paths
= tag
->request
.path();
523 for (const std::string
&path
: paths
) {
524 mypaths
->push_back(std::string(path
));
528 // Request: DataType type = 1;
529 int type
= tag
->request
.type();
530 // Request: Encoding encoding = 2;
531 frr::Encoding encoding
= tag
->request
.encoding();
532 // Request: bool with_defaults = 3;
533 bool with_defaults
= tag
->request
.with_defaults();
535 auto mypathps
= static_cast<std::list
<std::string
> *>(tag
->context
);
536 if (mypathps
->empty()) {
537 tag
->async_responder
.Finish(grpc::Status::OK
, tag
);
542 frr::GetResponse response
;
545 // Response: int64 timestamp = 1;
546 response
.set_timestamp(time(NULL
));
548 // Response: DataTree data = 2;
549 auto *data
= response
.mutable_data();
550 data
->set_encoding(tag
->request
.encoding());
551 status
= get_path(data
, mypathps
->back().c_str(), type
,
552 encoding2lyd_format(encoding
), with_defaults
);
555 tag
->async_responder
.WriteAndFinish(
556 response
, grpc::WriteOptions(), status
, tag
);
561 mypathps
->pop_back();
562 if (mypathps
->empty()) {
563 tag
->async_responder
.WriteAndFinish(
564 response
, grpc::WriteOptions(), grpc::Status::OK
, tag
);
567 tag
->async_responder
.Write(response
, tag
);
572 void HandleUnaryCreateCandidate(NewRpcState
<frr::CreateCandidateRequest
,
573 frr::CreateCandidateResponse
> *tag
)
575 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
577 if (tag
->state
== FINISH
) {
578 tag
->state
= DELETED
;
582 struct candidate
*candidate
= tag
->cdb
->create_candidate();
584 tag
->responder
.Finish(
586 grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED
,
587 "Can't create candidate configuration"),
590 tag
->response
.set_candidate_id(candidate
->id
);
591 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
597 void HandleUnaryDeleteCandidate(NewRpcState
<frr::DeleteCandidateRequest
,
598 frr::DeleteCandidateResponse
> *tag
)
600 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
602 if (tag
->state
== FINISH
) {
603 tag
->state
= DELETED
;
607 // Request: uint32 candidate_id = 1;
608 uint32_t candidate_id
= tag
->request
.candidate_id();
610 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
612 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
614 tag
->responder
.Finish(
616 grpc::Status(grpc::StatusCode::NOT_FOUND
,
617 "candidate configuration not found"),
620 tag
->cdb
->delete_candidate(candidate
);
621 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
626 void HandleUnaryUpdateCandidate(NewRpcState
<frr::UpdateCandidateRequest
,
627 frr::UpdateCandidateResponse
> *tag
)
629 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
631 if (tag
->state
== FINISH
) {
632 tag
->state
= DELETED
;
636 // Request: uint32 candidate_id = 1;
637 uint32_t candidate_id
= tag
->request
.candidate_id();
639 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
641 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
644 tag
->responder
.Finish(
646 grpc::Status(grpc::StatusCode::NOT_FOUND
,
647 "candidate configuration not found"),
649 else if (candidate
->transaction
)
650 tag
->responder
.Finish(
653 grpc::StatusCode::FAILED_PRECONDITION
,
654 "candidate is in the middle of a transaction"),
656 else if (nb_candidate_update(candidate
->config
) != NB_OK
)
657 tag
->responder
.Finish(
660 grpc::StatusCode::INTERNAL
,
661 "failed to update candidate configuration"),
665 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
670 void HandleUnaryEditCandidate(
671 NewRpcState
<frr::EditCandidateRequest
, frr::EditCandidateResponse
> *tag
)
673 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
675 if (tag
->state
== FINISH
) {
676 tag
->state
= DELETED
;
680 // Request: uint32 candidate_id = 1;
681 uint32_t candidate_id
= tag
->request
.candidate_id();
683 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
685 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
688 tag
->responder
.Finish(
690 grpc::Status(grpc::StatusCode::NOT_FOUND
,
691 "candidate configuration not found"),
697 struct nb_config
*candidate_tmp
= nb_config_dup(candidate
->config
);
699 auto pvs
= tag
->request
.update();
700 for (const frr::PathValue
&pv
: pvs
) {
701 if (yang_dnode_edit(candidate_tmp
->dnode
, pv
.path(), pv
.value())
703 nb_config_free(candidate_tmp
);
705 tag
->responder
.Finish(
707 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
708 "Failed to update \"" + pv
.path()
717 pvs
= tag
->request
.delete_();
718 for (const frr::PathValue
&pv
: pvs
) {
719 if (yang_dnode_delete(candidate_tmp
->dnode
, pv
.path()) != 0) {
720 nb_config_free(candidate_tmp
);
721 tag
->responder
.Finish(
723 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
724 "Failed to remove \"" + pv
.path()
732 // No errors, accept all changes.
733 nb_config_replace(candidate
->config
, candidate_tmp
, false);
735 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
740 void HandleUnaryLoadToCandidate(NewRpcState
<frr::LoadToCandidateRequest
,
741 frr::LoadToCandidateResponse
> *tag
)
743 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
745 if (tag
->state
== FINISH
) {
746 tag
->state
= DELETED
;
750 // Request: uint32 candidate_id = 1;
751 uint32_t candidate_id
= tag
->request
.candidate_id();
753 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
755 // Request: LoadType type = 2;
756 int load_type
= tag
->request
.type();
757 // Request: DataTree config = 3;
758 auto config
= tag
->request
.config();
761 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
764 tag
->responder
.Finish(
766 grpc::Status(grpc::StatusCode::NOT_FOUND
,
767 "candidate configuration not found"),
773 struct lyd_node
*dnode
= dnode_from_data_tree(&config
, true);
775 tag
->responder
.Finish(
777 grpc::Status(grpc::StatusCode::INTERNAL
,
778 "Failed to parse the configuration"),
784 struct nb_config
*loaded_config
= nb_config_new(dnode
);
786 if (load_type
== frr::LoadToCandidateRequest::REPLACE
)
787 nb_config_replace(candidate
->config
, loaded_config
, false);
788 else if (nb_config_merge(candidate
->config
, loaded_config
, false)
790 tag
->responder
.Finish(
793 grpc::StatusCode::INTERNAL
,
794 "Failed to merge the loaded configuration"),
800 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
804 void HandleUnaryCommit(
805 NewRpcState
<frr::CommitRequest
, frr::CommitResponse
> *tag
)
807 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
809 if (tag
->state
== FINISH
) {
810 tag
->state
= DELETED
;
814 // Request: uint32 candidate_id = 1;
815 uint32_t candidate_id
= tag
->request
.candidate_id();
817 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
819 // Request: Phase phase = 2;
820 int phase
= tag
->request
.phase();
821 // Request: string comment = 3;
822 const std::string comment
= tag
->request
.comment();
824 // Find candidate configuration.
825 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
827 tag
->responder
.Finish(
829 grpc::Status(grpc::StatusCode::NOT_FOUND
,
830 "candidate configuration not found"),
837 uint32_t transaction_id
= 0;
839 // Check for misuse of the two-phase commit protocol.
841 case frr::CommitRequest::PREPARE
:
842 case frr::CommitRequest::ALL
:
843 if (candidate
->transaction
) {
844 tag
->responder
.Finish(
847 grpc::StatusCode::FAILED_PRECONDITION
,
848 "candidate is in the middle of a transaction"),
854 case frr::CommitRequest::ABORT
:
855 case frr::CommitRequest::APPLY
:
856 if (!candidate
->transaction
) {
857 tag
->responder
.Finish(
860 grpc::StatusCode::FAILED_PRECONDITION
,
861 "no transaction in progress"),
872 // Execute the user request.
873 struct nb_context context
= {};
874 context
.client
= NB_CLIENT_GRPC
;
875 char errmsg
[BUFSIZ
] = {0};
878 case frr::CommitRequest::VALIDATE
:
879 grpc_debug("`-> Performing VALIDATE");
880 ret
= nb_candidate_validate(&context
, candidate
->config
, errmsg
,
883 case frr::CommitRequest::PREPARE
:
884 grpc_debug("`-> Performing PREPARE");
885 ret
= nb_candidate_commit_prepare(
886 &context
, candidate
->config
, comment
.c_str(),
887 &candidate
->transaction
, errmsg
, sizeof(errmsg
));
889 case frr::CommitRequest::ABORT
:
890 grpc_debug("`-> Performing ABORT");
891 nb_candidate_commit_abort(candidate
->transaction
, errmsg
,
894 case frr::CommitRequest::APPLY
:
895 grpc_debug("`-> Performing APPLY");
896 nb_candidate_commit_apply(candidate
->transaction
, true,
897 &transaction_id
, errmsg
,
900 case frr::CommitRequest::ALL
:
901 grpc_debug("`-> Performing ALL");
902 ret
= nb_candidate_commit(&context
, candidate
->config
, true,
903 comment
.c_str(), &transaction_id
,
904 errmsg
, sizeof(errmsg
));
908 // Map northbound error codes to gRPC status codes.
912 status
= grpc::Status::OK
;
914 case NB_ERR_NO_CHANGES
:
915 status
= grpc::Status(grpc::StatusCode::ABORTED
, errmsg
);
918 status
= grpc::Status(grpc::StatusCode::UNAVAILABLE
, errmsg
);
920 case NB_ERR_VALIDATION
:
921 status
= grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
924 case NB_ERR_RESOURCE
:
925 status
= grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED
,
930 status
= grpc::Status(grpc::StatusCode::INTERNAL
, errmsg
);
934 grpc_debug("`-> Result: %s (message: '%s')",
935 nb_err_name((enum nb_error
)ret
), errmsg
);
938 // Response: uint32 transaction_id = 1;
940 tag
->response
.set_transaction_id(transaction_id
);
942 if (strlen(errmsg
) > 0)
943 tag
->response
.set_error_message(errmsg
);
945 tag
->responder
.Finish(tag
->response
, status
, tag
);
949 void HandleUnaryLockConfig(
950 NewRpcState
<frr::LockConfigRequest
, frr::LockConfigResponse
> *tag
)
952 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
954 if (tag
->state
== FINISH
) {
955 tag
->state
= DELETED
;
959 if (nb_running_lock(NB_CLIENT_GRPC
, NULL
)) {
960 tag
->responder
.Finish(
962 grpc::Status(grpc::StatusCode::FAILED_PRECONDITION
,
963 "running configuration is locked already"),
966 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
971 void HandleUnaryUnlockConfig(
972 NewRpcState
<frr::UnlockConfigRequest
, frr::UnlockConfigResponse
> *tag
)
974 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
976 if (tag
->state
== FINISH
) {
977 tag
->state
= DELETED
;
981 if (nb_running_unlock(NB_CLIENT_GRPC
, NULL
)) {
982 tag
->responder
.Finish(
985 grpc::StatusCode::FAILED_PRECONDITION
,
986 "failed to unlock the running configuration"),
989 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
994 static void list_transactions_cb(void *arg
, int transaction_id
,
995 const char *client_name
, const char *date
,
998 auto list
= static_cast<std::list
<
999 std::tuple
<int, std::string
, std::string
, std::string
>> *>(arg
);
1001 std::make_tuple(transaction_id
, std::string(client_name
),
1002 std::string(date
), std::string(comment
)));
1005 void HandleStreamingListTransactions(
1006 NewRpcState
<frr::ListTransactionsRequest
, frr::ListTransactionsResponse
>
1009 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
1011 if (tag
->state
== FINISH
) {
1012 delete static_cast<std::list
<std::tuple
<
1013 int, std::string
, std::string
, std::string
>> *>(
1015 tag
->state
= DELETED
;
1019 if (!tag
->context
) {
1020 /* Creating, first time called for this RPC */
1022 new std::list
<std::tuple
<int, std::string
, std::string
,
1024 tag
->context
= new_list
;
1025 nb_db_transactions_iterate(list_transactions_cb
, tag
->context
);
1027 new_list
->push_back(std::make_tuple(
1028 0xFFFF, std::string("fake client"),
1029 std::string("fake date"), std::string("fake comment")));
1030 new_list
->push_back(
1031 std::make_tuple(0xFFFE, std::string("fake client2"),
1032 std::string("fake date"),
1033 std::string("fake comment2")));
1036 auto list
= static_cast<std::list
<
1037 std::tuple
<int, std::string
, std::string
, std::string
>> *>(
1040 if (list
->empty()) {
1041 tag
->async_responder
.Finish(grpc::Status::OK
, tag
);
1042 tag
->state
= FINISH
;
1046 auto item
= list
->back();
1048 frr::ListTransactionsResponse response
;
1050 // Response: uint32 id = 1;
1051 response
.set_id(std::get
<0>(item
));
1053 // Response: string client = 2;
1054 response
.set_client(std::get
<1>(item
).c_str());
1056 // Response: string date = 3;
1057 response
.set_date(std::get
<2>(item
).c_str());
1059 // Response: string comment = 4;
1060 response
.set_comment(std::get
<3>(item
).c_str());
1063 if (list
->empty()) {
1064 tag
->async_responder
.WriteAndFinish(
1065 response
, grpc::WriteOptions(), grpc::Status::OK
, tag
);
1066 tag
->state
= FINISH
;
1068 tag
->async_responder
.Write(response
, tag
);
1073 void HandleUnaryGetTransaction(NewRpcState
<frr::GetTransactionRequest
,
1074 frr::GetTransactionResponse
> *tag
)
1076 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
1078 if (tag
->state
== FINISH
) {
1079 tag
->state
= DELETED
;
1083 // Request: uint32 transaction_id = 1;
1084 uint32_t transaction_id
= tag
->request
.transaction_id();
1085 // Request: Encoding encoding = 2;
1086 frr::Encoding encoding
= tag
->request
.encoding();
1087 // Request: bool with_defaults = 3;
1088 bool with_defaults
= tag
->request
.with_defaults();
1090 grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__
,
1091 transaction_id
, encoding
);
1093 struct nb_config
*nb_config
;
1095 // Load configuration from the transactions database.
1096 nb_config
= nb_db_transaction_load(transaction_id
);
1098 tag
->responder
.Finish(
1100 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
1101 "Transaction not found"),
1103 tag
->state
= FINISH
;
1107 // Response: DataTree config = 1;
1108 auto config
= tag
->response
.mutable_config();
1109 config
->set_encoding(encoding
);
1111 // Dump data using the requested format.
1112 if (data_tree_from_dnode(config
, nb_config
->dnode
,
1113 encoding2lyd_format(encoding
), with_defaults
)
1115 nb_config_free(nb_config
);
1116 tag
->responder
.Finish(tag
->response
,
1117 grpc::Status(grpc::StatusCode::INTERNAL
,
1118 "Failed to dump data"),
1120 tag
->state
= FINISH
;
1124 nb_config_free(nb_config
);
1126 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
1127 tag
->state
= FINISH
;
1130 void HandleUnaryExecute(
1131 NewRpcState
<frr::ExecuteRequest
, frr::ExecuteResponse
> *tag
)
1133 grpc_debug("%s: state: %s", __func__
, call_states
[tag
->state
]);
1135 if (tag
->state
== FINISH
) {
1136 tag
->state
= DELETED
;
1140 struct nb_node
*nb_node
;
1141 struct list
*input_list
;
1142 struct list
*output_list
;
1143 struct listnode
*node
;
1144 struct yang_data
*data
;
1146 char errmsg
[BUFSIZ
] = {0};
1148 // Request: string path = 1;
1149 xpath
= tag
->request
.path().c_str();
1151 grpc_debug("%s(path: \"%s\")", __func__
, xpath
);
1153 if (tag
->request
.path().empty()) {
1154 tag
->responder
.Finish(
1156 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
1157 "Data path is empty"),
1159 tag
->state
= FINISH
;
1163 nb_node
= nb_node_find(xpath
);
1165 tag
->responder
.Finish(
1167 grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
1168 "Unknown data path"),
1170 tag
->state
= FINISH
;
1174 input_list
= yang_data_list_new();
1175 output_list
= yang_data_list_new();
1177 // Read input parameters.
1178 auto input
= tag
->request
.input();
1179 for (const frr::PathValue
&pv
: input
) {
1180 // Request: repeated PathValue input = 2;
1181 data
= yang_data_new(pv
.path().c_str(), pv
.value().c_str());
1182 listnode_add(input_list
, data
);
1185 // Execute callback registered for this XPath.
1186 if (nb_callback_rpc(nb_node
, xpath
, input_list
, output_list
, errmsg
,
1189 flog_warn(EC_LIB_NB_CB_RPC
, "%s: rpc callback failed: %s",
1191 list_delete(&input_list
);
1192 list_delete(&output_list
);
1194 tag
->responder
.Finish(
1196 grpc::Status(grpc::StatusCode::INTERNAL
, "RPC failed"),
1198 tag
->state
= FINISH
;
1202 // Process output parameters.
1203 for (ALL_LIST_ELEMENTS_RO(output_list
, node
, data
)) {
1204 // Response: repeated PathValue output = 1;
1205 frr::PathValue
*pv
= tag
->response
.add_output();
1206 pv
->set_path(data
->xpath
);
1207 pv
->set_value(data
->value
);
1211 list_delete(&input_list
);
1212 list_delete(&output_list
);
1214 tag
->responder
.Finish(tag
->response
, grpc::Status::OK
, tag
);
1215 tag
->state
= FINISH
;
1218 // ------------------------------------------------------
1219 // Thread Initialization and Run Functions
1220 // ------------------------------------------------------
1223 #define REQUEST_NEWRPC(NAME, cdb) \
1225 auto _rpcState = new NewRpcState<frr::NAME##Request, \
1226 frr::NAME##Response>( \
1227 (cdb), &frr::Northbound::AsyncService::Request##NAME, \
1228 &HandleUnary##NAME, #NAME); \
1229 _rpcState->do_request(service, s_cq); \
1232 #define REQUEST_NEWRPC_STREAMING(NAME, cdb) \
1234 auto _rpcState = new NewRpcState<frr::NAME##Request, \
1235 frr::NAME##Response>( \
1236 (cdb), &frr::Northbound::AsyncService::Request##NAME, \
1237 &HandleStreaming##NAME, #NAME); \
1238 _rpcState->do_request(service, s_cq); \
1241 struct grpc_pthread_attr
{
1242 struct frr_pthread_attr attr
;
1246 // Capture these objects so we can try to shut down cleanly
1247 static std::unique_ptr
<grpc::Server
> s_server
;
1248 static grpc::ServerCompletionQueue
*s_cq
;
1250 static void *grpc_pthread_start(void *arg
)
1252 struct frr_pthread
*fpt
= static_cast<frr_pthread
*>(arg
);
1253 uint port
= (uint
) reinterpret_cast<intptr_t>(fpt
->data
);
1255 Candidates candidates
;
1256 grpc::ServerBuilder builder
;
1257 std::stringstream server_address
;
1258 frr::Northbound::AsyncService
*service
=
1259 new frr::Northbound::AsyncService();
1261 frr_pthread_set_name(fpt
);
1263 server_address
<< "0.0.0.0:" << port
;
1264 builder
.AddListeningPort(server_address
.str(),
1265 grpc::InsecureServerCredentials());
1266 builder
.RegisterService(service
);
1267 auto cq
= builder
.AddCompletionQueue();
1269 s_server
= builder
.BuildAndStart();
1271 /* Schedule all RPC handlers */
1272 REQUEST_NEWRPC(GetCapabilities
, NULL
);
1273 REQUEST_NEWRPC(CreateCandidate
, &candidates
);
1274 REQUEST_NEWRPC(DeleteCandidate
, &candidates
);
1275 REQUEST_NEWRPC(UpdateCandidate
, &candidates
);
1276 REQUEST_NEWRPC(EditCandidate
, &candidates
);
1277 REQUEST_NEWRPC(LoadToCandidate
, &candidates
);
1278 REQUEST_NEWRPC(Commit
, &candidates
);
1279 REQUEST_NEWRPC(GetTransaction
, NULL
);
1280 REQUEST_NEWRPC(LockConfig
, NULL
);
1281 REQUEST_NEWRPC(UnlockConfig
, NULL
);
1282 REQUEST_NEWRPC(Execute
, NULL
);
1283 REQUEST_NEWRPC_STREAMING(Get
, NULL
);
1284 REQUEST_NEWRPC_STREAMING(ListTransactions
, NULL
);
1286 zlog_notice("gRPC server listening on %s",
1287 server_address
.str().c_str());
1289 /* Process inbound RPCs */
1294 s_cq
->Next(&tag
, &ok
);
1298 grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__
,
1301 RpcStateBase
*rpc
= static_cast<RpcStateBase
*>(tag
);
1302 CallState state
= rpc
->doCallback();
1303 grpc_debug("%s: Callback returned RPC State: %s", __func__
,
1304 call_states
[state
]);
1307 * Our side is done (FINISH) receive new requests of this type
1308 * We could do this earlier but that would mean we could be
1309 * handling multiple same type requests in parallel. We expect
1310 * to be called back once more in the FINISH state (from the
1311 * user indicating Finish() for cleanup.
1313 if (state
== FINISH
)
1314 rpc
->do_request(service
, s_cq
);
1321 static int frr_grpc_init(uint port
)
1323 struct frr_pthread_attr attr
= {
1324 .start
= grpc_pthread_start
,
1328 fpt
= frr_pthread_new(&attr
, "frr-grpc", "frr-grpc");
1329 fpt
->data
= reinterpret_cast<void *>((intptr_t)port
);
1331 /* Create a pthread for gRPC since it runs its own event loop. */
1332 if (frr_pthread_run(fpt
, NULL
) < 0) {
1333 flog_err(EC_LIB_SYSTEM_CALL
, "%s: error creating pthread: %s",
1334 __func__
, safe_strerror(errno
));
1341 static int frr_grpc_finish(void)
1343 // Shutdown the grpc server
1345 s_server
->Shutdown();
1348 // And drain the queue
1352 while (s_cq
->Next(&ignore
, &ok
))
1357 pthread_join(fpt
->thread
, NULL
);
1358 frr_pthread_destroy(fpt
);
1365 * This is done this way because module_init and module_late_init are both
1366 * called during daemon pre-fork initialization. Because the GRPC library
1367 * spawns threads internally, we need to delay initializing it until after
1368 * fork. This is done by scheduling this init function as an event task, since
1369 * the event loop doesn't run until after fork.
1371 static int frr_grpc_module_very_late_init(struct thread
*thread
)
1373 const char *args
= THIS_MODULE
->load_args
;
1374 uint port
= GRPC_DEFAULT_PORT
;
1377 port
= std::stoul(args
);
1378 if (port
< 1024 || port
> UINT16_MAX
) {
1379 flog_err(EC_LIB_GRPC_INIT
,
1380 "%s: port number must be between 1025 and %d",
1381 __func__
, UINT16_MAX
);
1386 if (frr_grpc_init(port
) < 0)
1392 flog_err(EC_LIB_GRPC_INIT
, "failed to initialize the gRPC module");
1396 static int frr_grpc_module_late_init(struct thread_master
*tm
)
1399 hook_register(frr_fini
, frr_grpc_finish
);
1400 thread_add_event(tm
, frr_grpc_module_very_late_init
, NULL
, 0, NULL
);
1404 static int frr_grpc_module_init(void)
1406 hook_register(frr_late_init
, frr_grpc_module_late_init
);
1411 FRR_MODULE_SETUP(.name
= "frr_grpc", .version
= FRR_VERSION
,
1412 .description
= "FRR gRPC northbound module",
1413 .init
= frr_grpc_module_init
, );