2 // Copyright (c) 2021-2022, LabN Consulting, L.L.C
3 // Copyright (C) 2019 NetDEF, Inc.
6 // This program is free software; you can redistribute it and/or modify it
7 // under the terms of the GNU General Public License as published by the Free
8 // Software Foundation; either version 2 of the License, or (at your option)
11 // This program is distributed in the hope that it will be useful, but WITHOUT
12 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 // You should have received a copy of the GNU General Public License along
17 // with this program; see the file COPYING; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <grpcpp/grpcpp.h>
23 #include "grpc/frr-northbound.grpc.pb.h"
27 #include "lib/version.h"
28 #include "lib/thread.h"
30 #include "lib_errors.h"
31 #include "northbound.h"
32 #include "northbound_db.h"
33 #include "frr_pthread.h"
40 #define GRPC_DEFAULT_PORT 50051
43 // ------------------------------------------------------
44 // File Local Variables
45 // ------------------------------------------------------
48 * NOTE: we can't use the FRR debugging infrastructure here since it uses
49 * atomics and C++ has a different atomics API. Enable gRPC debugging
50 * unconditionally until we figure out a way to solve this problem.
52 static bool nb_dbg_client_grpc
= 0;
54 static struct thread_master
*main_master
;
56 static struct frr_pthread
*fpt
;
58 static bool grpc_running
;
60 #define grpc_debug(...) \
62 if (nb_dbg_client_grpc) \
63 zlog_debug(__VA_ARGS__); \
66 // ------------------------------------------------------
68 // ------------------------------------------------------
/*
 * States an RPC object moves through. The values double as indexes into
 * call_states[] when logging.
 */
enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED };

/* Printable name of each CallState, in declaration order. */
static const char *const call_states[] = {"CREATE", "PROCESS", "MORE",
					  "FINISH", "DELETED"};
static_assert(sizeof(call_states) / sizeof(call_states[0]) == DELETED + 1,
	      "call_states must have one entry per CallState");
75 struct nb_config
*config
;
76 struct nb_transaction
*transaction
;
85 for (auto it
= _cdb
.begin(); it
!= _cdb
.end(); it
++)
86 delete_candidate(it
->first
);
89 struct candidate
*create_candidate(void)
91 uint64_t id
= ++_next_id
;
92 assert(id
); // TODO: implement an algorithm for unique reusable
94 struct candidate
*c
= &_cdb
[id
];
96 c
->config
= nb_config_dup(running_config
);
97 c
->transaction
= NULL
;
102 bool contains(uint64_t candidate_id
)
104 return _cdb
.count(candidate_id
) > 0;
107 void delete_candidate(uint64_t candidate_id
)
109 struct candidate
*c
= &_cdb
[candidate_id
];
110 char errmsg
[BUFSIZ
] = {0};
112 nb_config_free(c
->config
);
114 nb_candidate_commit_abort(c
->transaction
, errmsg
,
119 struct candidate
*get_candidate(uint64_t id
)
121 return _cdb
.count(id
) == 0 ? NULL
: &_cdb
[id
];
125 uint64_t _next_id
= 0;
126 std::map
<uint64_t, struct candidate
> _cdb
;
130 * RpcStateBase is the common base class used to track a gRPC RPC.
135 virtual void do_request(::frr::Northbound::AsyncService
*service
,
136 ::grpc::ServerCompletionQueue
*cq
,
139 RpcStateBase(const char *name
) : name(name
){};
141 virtual ~RpcStateBase() = default;
143 CallState
get_state() const
148 bool is_initial_process() const
150 /* Will always be true for Unary */
151 return entered_state
== CREATE
;
154 // Returns "more" status, if false caller can delete
155 bool run(frr::Northbound::AsyncService
*service
,
156 grpc::ServerCompletionQueue
*cq
)
159 * We enter in either CREATE or MORE state, and transition to
162 this->entered_state
= this->state
;
163 this->state
= PROCESS
;
164 grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name
,
165 call_states
[this->entered_state
],
166 call_states
[this->state
]);
168 * We schedule the callback on the main pthread, and wait for
169 * the state to transition out of the PROCESS state. The new
170 * state will either be MORE or FINISH. It will always be FINISH
173 thread_add_event(main_master
, c_callback
, (void *)this, 0,
176 pthread_mutex_lock(&this->cmux
);
177 while (this->state
== PROCESS
)
178 pthread_cond_wait(&this->cond
, &this->cmux
);
179 pthread_mutex_unlock(&this->cmux
);
181 grpc_debug("%s RPC in %s on grpc-io-thread", name
,
182 call_states
[this->state
]);
184 if (this->state
== FINISH
) {
186 * Server is done (FINISH) so prep to receive a new
187 * request of this type. We could do this earlier but
188 * that would mean we could be handling multiple same
189 * type requests in parallel without limit.
191 this->do_request(service
, cq
, false);
197 virtual CallState
run_mainthread(struct thread
*thread
) = 0;
199 static void c_callback(struct thread
*thread
)
201 auto _tag
= static_cast<RpcStateBase
*>(thread
->arg
);
203 * We hold the lock until the callback finishes and has updated
204 * _tag->state, then we signal done and release.
206 pthread_mutex_lock(&_tag
->cmux
);
208 CallState enter_state
= _tag
->state
;
209 grpc_debug("%s RPC: running %s on main thread", _tag
->name
,
210 call_states
[enter_state
]);
212 _tag
->state
= _tag
->run_mainthread(thread
);
214 grpc_debug("%s RPC: %s -> %s [main thread]", _tag
->name
,
215 call_states
[enter_state
], call_states
[_tag
->state
]);
217 pthread_cond_signal(&_tag
->cond
);
218 pthread_mutex_unlock(&_tag
->cmux
);
222 grpc::ServerContext ctx
;
223 pthread_mutex_t cmux
= PTHREAD_MUTEX_INITIALIZER
;
224 pthread_cond_t cond
= PTHREAD_COND_INITIALIZER
;
225 CallState state
= CREATE
;
226 CallState entered_state
= CREATE
;
233 * The UnaryRpcState class is used to track the execution of a Unary RPC.
236 * Q - the request type for a given unary RPC
237 * S - the response type for a given unary RPC
239 template <typename Q
, typename S
> class UnaryRpcState
: public RpcStateBase
242 typedef void (frr::Northbound::AsyncService::*reqfunc_t
)(
243 ::grpc::ServerContext
*, Q
*,
244 ::grpc::ServerAsyncResponseWriter
<S
> *,
245 ::grpc::CompletionQueue
*, ::grpc::ServerCompletionQueue
*,
248 UnaryRpcState(Candidates
*cdb
, reqfunc_t rfunc
,
249 grpc::Status (*cb
)(UnaryRpcState
<Q
, S
> *),
251 : RpcStateBase(name
), cdb(cdb
), requestf(rfunc
), callback(cb
),
254 void do_request(::frr::Northbound::AsyncService
*service
,
255 ::grpc::ServerCompletionQueue
*cq
,
256 bool no_copy
) override
258 grpc_debug("%s, posting a request for: %s", __func__
, name
);
259 auto copy
= no_copy
? this
260 : new UnaryRpcState(cdb
, requestf
, callback
,
262 (service
->*requestf
)(©
->ctx
, ©
->request
,
263 ©
->responder
, cq
, cq
, copy
);
266 CallState
run_mainthread(struct thread
*thread
) override
268 // Unary RPC are always finished, see "Unary" :)
269 grpc::Status status
= this->callback(this);
270 responder
.Finish(response
, status
, this);
278 grpc::ServerAsyncResponseWriter
<S
> responder
;
280 grpc::Status (*callback
)(UnaryRpcState
<Q
, S
> *);
281 reqfunc_t requestf
= NULL
;
285 * The StreamRpcState class is used to track the execution of a Streaming RPC.
288 * Q - the request type for a given streaming RPC
289 * S - the response type for a given streaming RPC
290 * X - the type used to track the streaming state
292 template <typename Q
, typename S
, typename X
>
293 class StreamRpcState
: public RpcStateBase
296 typedef void (frr::Northbound::AsyncService::*reqsfunc_t
)(
297 ::grpc::ServerContext
*, Q
*, ::grpc::ServerAsyncWriter
<S
> *,
298 ::grpc::CompletionQueue
*, ::grpc::ServerCompletionQueue
*,
301 StreamRpcState(reqsfunc_t rfunc
, bool (*cb
)(StreamRpcState
<Q
, S
, X
> *),
303 : RpcStateBase(name
), requestsf(rfunc
), callback(cb
),
304 async_responder(&ctx
){};
306 void do_request(::frr::Northbound::AsyncService
*service
,
307 ::grpc::ServerCompletionQueue
*cq
,
308 bool no_copy
) override
310 grpc_debug("%s, posting a request for: %s", __func__
, name
);
313 : new StreamRpcState(requestsf
, callback
, name
);
314 (service
->*requestsf
)(©
->ctx
, ©
->request
,
315 ©
->async_responder
, cq
, cq
, copy
);
318 CallState
run_mainthread(struct thread
*thread
) override
320 if (this->callback(this))
328 grpc::ServerAsyncWriter
<S
> async_responder
;
330 bool (*callback
)(StreamRpcState
<Q
, S
, X
> *);
331 reqsfunc_t requestsf
= NULL
;
336 // ------------------------------------------------------
338 // ------------------------------------------------------
340 static LYD_FORMAT
encoding2lyd_format(enum frr::Encoding encoding
)
348 flog_err(EC_LIB_DEVELOPMENT
,
349 "%s: unknown data encoding format (%u)", __func__
,
355 static int yang_dnode_edit(struct lyd_node
*dnode
, const std::string
&path
,
358 LY_ERR err
= lyd_new_path(dnode
, ly_native_ctx
, path
.c_str(), value
,
359 LYD_NEW_PATH_UPDATE
, &dnode
);
360 if (err
!= LY_SUCCESS
) {
361 flog_warn(EC_LIB_LIBYANG
, "%s: lyd_new_path() failed: %s",
362 __func__
, ly_errmsg(ly_native_ctx
));
369 static int yang_dnode_delete(struct lyd_node
*dnode
, const std::string
&path
)
371 dnode
= yang_dnode_get(dnode
, path
.c_str());
375 lyd_free_tree(dnode
);
380 static LY_ERR
data_tree_from_dnode(frr::DataTree
*dt
,
381 const struct lyd_node
*dnode
,
382 LYD_FORMAT lyd_format
, bool with_defaults
)
387 SET_FLAG(options
, LYD_PRINT_WITHSIBLINGS
);
389 SET_FLAG(options
, LYD_PRINT_WD_ALL
);
391 SET_FLAG(options
, LYD_PRINT_WD_TRIM
);
393 LY_ERR err
= lyd_print_mem(&strp
, dnode
, lyd_format
, options
);
394 if (err
== LY_SUCCESS
) {
403 static struct lyd_node
*dnode_from_data_tree(const frr::DataTree
*dt
,
406 struct lyd_node
*dnode
;
411 options
= LYD_PARSE_NO_STATE
;
412 opt2
= LYD_VALIDATE_NO_STATE
;
414 options
= LYD_PARSE_STRICT
;
418 err
= lyd_parse_data_mem(ly_native_ctx
, dt
->data().c_str(),
419 encoding2lyd_format(dt
->encoding()), options
,
421 if (err
!= LY_SUCCESS
) {
422 flog_warn(EC_LIB_LIBYANG
, "%s: lyd_parse_mem() failed: %s",
423 __func__
, ly_errmsg(ly_native_ctx
));
428 static struct lyd_node
*get_dnode_config(const std::string
&path
)
430 struct lyd_node
*dnode
;
432 if (!yang_dnode_exists(running_config
->dnode
,
433 path
.empty() ? NULL
: path
.c_str()))
436 dnode
= yang_dnode_get(running_config
->dnode
,
437 path
.empty() ? NULL
: path
.c_str());
439 dnode
= yang_dnode_dup(dnode
);
444 static int get_oper_data_cb(const struct lysc_node
*snode
,
445 struct yang_translator
*translator
,
446 struct yang_data
*data
, void *arg
)
448 struct lyd_node
*dnode
= static_cast<struct lyd_node
*>(arg
);
449 int ret
= yang_dnode_edit(dnode
, data
->xpath
, data
->value
);
450 yang_data_free(data
);
452 return (ret
== 0) ? NB_OK
: NB_ERR
;
455 static struct lyd_node
*get_dnode_state(const std::string
&path
)
457 struct lyd_node
*dnode
= yang_dnode_new(ly_native_ctx
, false);
458 if (nb_oper_data_iterate(path
.c_str(), NULL
, 0, get_oper_data_cb
, dnode
)
460 yang_dnode_free(dnode
);
467 static grpc::Status
get_path(frr::DataTree
*dt
, const std::string
&path
,
468 int type
, LYD_FORMAT lyd_format
,
471 struct lyd_node
*dnode_config
= NULL
;
472 struct lyd_node
*dnode_state
= NULL
;
473 struct lyd_node
*dnode_final
;
475 // Configuration data.
476 if (type
== frr::GetRequest_DataType_ALL
477 || type
== frr::GetRequest_DataType_CONFIG
) {
478 dnode_config
= get_dnode_config(path
);
480 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
481 "Data path not found");
485 if (type
== frr::GetRequest_DataType_ALL
486 || type
== frr::GetRequest_DataType_STATE
) {
487 dnode_state
= get_dnode_state(path
);
490 yang_dnode_free(dnode_config
);
491 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
492 "Failed to fetch operational data");
497 case frr::GetRequest_DataType_ALL
:
499 // Combine configuration and state data into a single
502 if (lyd_merge_siblings(&dnode_state
, dnode_config
,
505 yang_dnode_free(dnode_state
);
506 yang_dnode_free(dnode_config
);
508 grpc::StatusCode::INTERNAL
,
509 "Failed to merge configuration and state data",
510 ly_errmsg(ly_native_ctx
));
513 dnode_final
= dnode_state
;
515 case frr::GetRequest_DataType_CONFIG
:
516 dnode_final
= dnode_config
;
518 case frr::GetRequest_DataType_STATE
:
519 dnode_final
= dnode_state
;
523 // Validate data to create implicit default nodes if necessary.
524 int validate_opts
= 0;
525 if (type
== frr::GetRequest_DataType_CONFIG
)
526 validate_opts
= LYD_VALIDATE_NO_STATE
;
530 LY_ERR err
= lyd_validate_all(&dnode_final
, ly_native_ctx
,
531 validate_opts
, NULL
);
534 flog_warn(EC_LIB_LIBYANG
, "%s: lyd_validate_all() failed: %s",
535 __func__
, ly_errmsg(ly_native_ctx
));
536 // Dump data using the requested format.
538 err
= data_tree_from_dnode(dt
, dnode_final
, lyd_format
,
540 yang_dnode_free(dnode_final
);
542 return grpc::Status(grpc::StatusCode::INTERNAL
,
543 "Failed to dump data");
544 return grpc::Status::OK
;
548 // ------------------------------------------------------
549 // RPC Callback Functions: run on main thread
550 // ------------------------------------------------------
552 grpc::Status
HandleUnaryGetCapabilities(
553 UnaryRpcState
<frr::GetCapabilitiesRequest
, frr::GetCapabilitiesResponse
>
556 grpc_debug("%s: entered", __func__
);
558 // Response: string frr_version = 1;
559 tag
->response
.set_frr_version(FRR_VERSION
);
561 // Response: bool rollback_support = 2;
562 #ifdef HAVE_CONFIG_ROLLBACKS
563 tag
->response
.set_rollback_support(true);
565 tag
->response
.set_rollback_support(false);
567 // Response: repeated ModuleData supported_modules = 3;
568 struct yang_module
*module
;
569 RB_FOREACH (module
, yang_modules
, &yang_modules
) {
570 auto m
= tag
->response
.add_supported_modules();
572 m
->set_name(module
->name
);
573 if (module
->info
->revision
)
574 m
->set_revision(module
->info
->revision
);
575 m
->set_organization(module
->info
->org
);
578 // Response: repeated Encoding supported_encodings = 4;
579 tag
->response
.add_supported_encodings(frr::JSON
);
580 tag
->response
.add_supported_encodings(frr::XML
);
582 return grpc::Status::OK
;
585 // Define the context variable type for this streaming handler
586 typedef std::list
<std::string
> GetContextType
;
588 bool HandleStreamingGet(
589 StreamRpcState
<frr::GetRequest
, frr::GetResponse
, GetContextType
> *tag
)
591 grpc_debug("%s: entered", __func__
);
593 auto mypathps
= &tag
->context
;
594 if (tag
->is_initial_process()) {
595 // Fill our context container first time through
596 grpc_debug("%s: initialize streaming state", __func__
);
597 auto paths
= tag
->request
.path();
598 for (const std::string
&path
: paths
) {
599 mypathps
->push_back(std::string(path
));
603 // Request: DataType type = 1;
604 int type
= tag
->request
.type();
605 // Request: Encoding encoding = 2;
606 frr::Encoding encoding
= tag
->request
.encoding();
607 // Request: bool with_defaults = 3;
608 bool with_defaults
= tag
->request
.with_defaults();
610 if (mypathps
->empty()) {
611 tag
->async_responder
.Finish(grpc::Status::OK
, tag
);
615 frr::GetResponse response
;
618 // Response: int64 timestamp = 1;
619 response
.set_timestamp(time(NULL
));
621 // Response: DataTree data = 2;
622 auto *data
= response
.mutable_data();
623 data
->set_encoding(tag
->request
.encoding());
624 status
= get_path(data
, mypathps
->back().c_str(), type
,
625 encoding2lyd_format(encoding
), with_defaults
);
628 tag
->async_responder
.WriteAndFinish(
629 response
, grpc::WriteOptions(), status
, tag
);
633 mypathps
->pop_back();
634 if (mypathps
->empty()) {
635 tag
->async_responder
.WriteAndFinish(
636 response
, grpc::WriteOptions(), grpc::Status::OK
, tag
);
639 tag
->async_responder
.Write(response
, tag
);
644 grpc::Status
HandleUnaryCreateCandidate(
645 UnaryRpcState
<frr::CreateCandidateRequest
, frr::CreateCandidateResponse
>
648 grpc_debug("%s: entered", __func__
);
650 struct candidate
*candidate
= tag
->cdb
->create_candidate();
652 return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED
,
653 "Can't create candidate configuration");
654 tag
->response
.set_candidate_id(candidate
->id
);
655 return grpc::Status::OK
;
658 grpc::Status
HandleUnaryDeleteCandidate(
659 UnaryRpcState
<frr::DeleteCandidateRequest
, frr::DeleteCandidateResponse
>
662 grpc_debug("%s: entered", __func__
);
664 uint32_t candidate_id
= tag
->request
.candidate_id();
666 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
668 if (!tag
->cdb
->contains(candidate_id
))
669 return grpc::Status(grpc::StatusCode::NOT_FOUND
,
670 "candidate configuration not found");
671 tag
->cdb
->delete_candidate(candidate_id
);
672 return grpc::Status::OK
;
675 grpc::Status
HandleUnaryUpdateCandidate(
676 UnaryRpcState
<frr::UpdateCandidateRequest
, frr::UpdateCandidateResponse
>
679 grpc_debug("%s: entered", __func__
);
681 uint32_t candidate_id
= tag
->request
.candidate_id();
683 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
685 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
688 return grpc::Status(grpc::StatusCode::NOT_FOUND
,
689 "candidate configuration not found");
690 if (candidate
->transaction
)
692 grpc::StatusCode::FAILED_PRECONDITION
,
693 "candidate is in the middle of a transaction");
694 if (nb_candidate_update(candidate
->config
) != NB_OK
)
695 return grpc::Status(grpc::StatusCode::INTERNAL
,
696 "failed to update candidate configuration");
698 return grpc::Status::OK
;
701 grpc::Status
HandleUnaryEditCandidate(
702 UnaryRpcState
<frr::EditCandidateRequest
, frr::EditCandidateResponse
>
705 grpc_debug("%s: entered", __func__
);
707 uint32_t candidate_id
= tag
->request
.candidate_id();
709 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
711 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
713 return grpc::Status(grpc::StatusCode::NOT_FOUND
,
714 "candidate configuration not found");
716 struct nb_config
*candidate_tmp
= nb_config_dup(candidate
->config
);
718 auto pvs
= tag
->request
.update();
719 for (const frr::PathValue
&pv
: pvs
) {
720 if (yang_dnode_edit(candidate_tmp
->dnode
, pv
.path(),
721 pv
.value().c_str()) != 0) {
722 nb_config_free(candidate_tmp
);
724 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
725 "Failed to update \"" + pv
.path() +
730 pvs
= tag
->request
.delete_();
731 for (const frr::PathValue
&pv
: pvs
) {
732 if (yang_dnode_delete(candidate_tmp
->dnode
, pv
.path()) != 0) {
733 nb_config_free(candidate_tmp
);
734 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
735 "Failed to remove \"" + pv
.path() +
740 // No errors, accept all changes.
741 nb_config_replace(candidate
->config
, candidate_tmp
, false);
742 return grpc::Status::OK
;
745 grpc::Status
HandleUnaryLoadToCandidate(
746 UnaryRpcState
<frr::LoadToCandidateRequest
, frr::LoadToCandidateResponse
>
749 grpc_debug("%s: entered", __func__
);
751 uint32_t candidate_id
= tag
->request
.candidate_id();
753 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
755 // Request: LoadType type = 2;
756 int load_type
= tag
->request
.type();
757 // Request: DataTree config = 3;
758 auto config
= tag
->request
.config();
760 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
762 return grpc::Status(grpc::StatusCode::NOT_FOUND
,
763 "candidate configuration not found");
765 struct lyd_node
*dnode
= dnode_from_data_tree(&config
, true);
767 return grpc::Status(grpc::StatusCode::INTERNAL
,
768 "Failed to parse the configuration");
770 struct nb_config
*loaded_config
= nb_config_new(dnode
);
771 if (load_type
== frr::LoadToCandidateRequest::REPLACE
)
772 nb_config_replace(candidate
->config
, loaded_config
, false);
773 else if (nb_config_merge(candidate
->config
, loaded_config
, false) !=
775 return grpc::Status(grpc::StatusCode::INTERNAL
,
776 "Failed to merge the loaded configuration");
778 return grpc::Status::OK
;
782 HandleUnaryCommit(UnaryRpcState
<frr::CommitRequest
, frr::CommitResponse
> *tag
)
784 grpc_debug("%s: entered", __func__
);
786 // Request: uint32 candidate_id = 1;
787 uint32_t candidate_id
= tag
->request
.candidate_id();
789 grpc_debug("%s(candidate_id: %u)", __func__
, candidate_id
);
791 // Request: Phase phase = 2;
792 int phase
= tag
->request
.phase();
793 // Request: string comment = 3;
794 const std::string comment
= tag
->request
.comment();
796 // Find candidate configuration.
797 struct candidate
*candidate
= tag
->cdb
->get_candidate(candidate_id
);
799 return grpc::Status(grpc::StatusCode::NOT_FOUND
,
800 "candidate configuration not found");
803 uint32_t transaction_id
= 0;
805 // Check for misuse of the two-phase commit protocol.
807 case frr::CommitRequest::PREPARE
:
808 case frr::CommitRequest::ALL
:
809 if (candidate
->transaction
)
811 grpc::StatusCode::FAILED_PRECONDITION
,
812 "candidate is in the middle of a transaction");
814 case frr::CommitRequest::ABORT
:
815 case frr::CommitRequest::APPLY
:
816 if (!candidate
->transaction
)
818 grpc::StatusCode::FAILED_PRECONDITION
,
819 "no transaction in progress");
826 // Execute the user request.
827 struct nb_context context
= {};
828 context
.client
= NB_CLIENT_GRPC
;
829 char errmsg
[BUFSIZ
] = {0};
832 case frr::CommitRequest::VALIDATE
:
833 grpc_debug("`-> Performing VALIDATE");
834 ret
= nb_candidate_validate(&context
, candidate
->config
, errmsg
,
837 case frr::CommitRequest::PREPARE
:
838 grpc_debug("`-> Performing PREPARE");
839 ret
= nb_candidate_commit_prepare(
840 &context
, candidate
->config
, comment
.c_str(),
841 &candidate
->transaction
, errmsg
, sizeof(errmsg
));
843 case frr::CommitRequest::ABORT
:
844 grpc_debug("`-> Performing ABORT");
845 nb_candidate_commit_abort(candidate
->transaction
, errmsg
,
848 case frr::CommitRequest::APPLY
:
849 grpc_debug("`-> Performing APPLY");
850 nb_candidate_commit_apply(candidate
->transaction
, true,
851 &transaction_id
, errmsg
,
854 case frr::CommitRequest::ALL
:
855 grpc_debug("`-> Performing ALL");
856 ret
= nb_candidate_commit(&context
, candidate
->config
, true,
857 comment
.c_str(), &transaction_id
,
858 errmsg
, sizeof(errmsg
));
862 // Map northbound error codes to gRPC status codes.
866 status
= grpc::Status::OK
;
868 case NB_ERR_NO_CHANGES
:
869 status
= grpc::Status(grpc::StatusCode::ABORTED
, errmsg
);
872 status
= grpc::Status(grpc::StatusCode::UNAVAILABLE
, errmsg
);
874 case NB_ERR_VALIDATION
:
875 status
= grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
878 case NB_ERR_RESOURCE
:
879 status
= grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED
,
884 status
= grpc::Status(grpc::StatusCode::INTERNAL
, errmsg
);
888 grpc_debug("`-> Result: %s (message: '%s')",
889 nb_err_name((enum nb_error
)ret
), errmsg
);
892 // Response: uint32 transaction_id = 1;
894 tag
->response
.set_transaction_id(transaction_id
);
896 if (strlen(errmsg
) > 0)
897 tag
->response
.set_error_message(errmsg
);
902 grpc::Status
HandleUnaryLockConfig(
903 UnaryRpcState
<frr::LockConfigRequest
, frr::LockConfigResponse
> *tag
)
905 grpc_debug("%s: entered", __func__
);
907 if (nb_running_lock(NB_CLIENT_GRPC
, NULL
))
908 return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION
,
909 "running configuration is locked already");
910 return grpc::Status::OK
;
913 grpc::Status
HandleUnaryUnlockConfig(
914 UnaryRpcState
<frr::UnlockConfigRequest
, frr::UnlockConfigResponse
> *tag
)
916 grpc_debug("%s: entered", __func__
);
918 if (nb_running_unlock(NB_CLIENT_GRPC
, NULL
))
920 grpc::StatusCode::FAILED_PRECONDITION
,
921 "failed to unlock the running configuration");
922 return grpc::Status::OK
;
/*
 * Iteration callback that records one transaction.
 *
 * arg must point to a std::list of (id, client, date, comment) tuples;
 * a new tuple built from the remaining arguments is appended to it.
 */
static void list_transactions_cb(void *arg, int transaction_id,
				 const char *client_name, const char *date,
				 const char *comment)
{
	auto list = static_cast<std::list<
		std::tuple<int, std::string, std::string, std::string>> *>(
		arg);

	// Construct the tuple in place instead of building temporaries.
	list->emplace_back(transaction_id, client_name, date, comment);
}
936 // Define the context variable type for this streaming handler
937 typedef std::list
<std::tuple
<int, std::string
, std::string
, std::string
>>
938 ListTransactionsContextType
;
940 bool HandleStreamingListTransactions(
941 StreamRpcState
<frr::ListTransactionsRequest
,
942 frr::ListTransactionsResponse
,
943 ListTransactionsContextType
> *tag
)
945 grpc_debug("%s: entered", __func__
);
947 auto list
= &tag
->context
;
948 if (tag
->is_initial_process()) {
949 grpc_debug("%s: initialize streaming state", __func__
);
950 // Fill our context container first time through
951 nb_db_transactions_iterate(list_transactions_cb
, list
);
952 list
->push_back(std::make_tuple(
953 0xFFFF, std::string("fake client"),
954 std::string("fake date"), std::string("fake comment")));
955 list
->push_back(std::make_tuple(0xFFFE,
956 std::string("fake client2"),
957 std::string("fake date"),
958 std::string("fake comment2")));
962 tag
->async_responder
.Finish(grpc::Status::OK
, tag
);
966 auto item
= list
->back();
968 frr::ListTransactionsResponse response
;
970 // Response: uint32 id = 1;
971 response
.set_id(std::get
<0>(item
));
973 // Response: string client = 2;
974 response
.set_client(std::get
<1>(item
).c_str());
976 // Response: string date = 3;
977 response
.set_date(std::get
<2>(item
).c_str());
979 // Response: string comment = 4;
980 response
.set_comment(std::get
<3>(item
).c_str());
984 tag
->async_responder
.WriteAndFinish(
985 response
, grpc::WriteOptions(), grpc::Status::OK
, tag
);
988 tag
->async_responder
.Write(response
, tag
);
993 grpc::Status
HandleUnaryGetTransaction(
994 UnaryRpcState
<frr::GetTransactionRequest
, frr::GetTransactionResponse
>
997 grpc_debug("%s: entered", __func__
);
999 // Request: uint32 transaction_id = 1;
1000 uint32_t transaction_id
= tag
->request
.transaction_id();
1001 // Request: Encoding encoding = 2;
1002 frr::Encoding encoding
= tag
->request
.encoding();
1003 // Request: bool with_defaults = 3;
1004 bool with_defaults
= tag
->request
.with_defaults();
1006 grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__
,
1007 transaction_id
, encoding
);
1009 struct nb_config
*nb_config
;
1011 // Load configuration from the transactions database.
1012 nb_config
= nb_db_transaction_load(transaction_id
);
1014 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
1015 "Transaction not found");
1017 // Response: DataTree config = 1;
1018 auto config
= tag
->response
.mutable_config();
1019 config
->set_encoding(encoding
);
1021 // Dump data using the requested format.
1022 if (data_tree_from_dnode(config
, nb_config
->dnode
,
1023 encoding2lyd_format(encoding
), with_defaults
)
1025 nb_config_free(nb_config
);
1026 return grpc::Status(grpc::StatusCode::INTERNAL
,
1027 "Failed to dump data");
1030 nb_config_free(nb_config
);
1032 return grpc::Status::OK
;
1035 grpc::Status
HandleUnaryExecute(
1036 UnaryRpcState
<frr::ExecuteRequest
, frr::ExecuteResponse
> *tag
)
1038 grpc_debug("%s: entered", __func__
);
1040 struct nb_node
*nb_node
;
1041 struct list
*input_list
;
1042 struct list
*output_list
;
1043 struct listnode
*node
;
1044 struct yang_data
*data
;
1046 char errmsg
[BUFSIZ
] = {0};
1048 // Request: string path = 1;
1049 xpath
= tag
->request
.path().c_str();
1051 grpc_debug("%s(path: \"%s\")", __func__
, xpath
);
1053 if (tag
->request
.path().empty())
1054 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
1055 "Data path is empty");
1057 nb_node
= nb_node_find(xpath
);
1059 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT
,
1060 "Unknown data path");
1062 input_list
= yang_data_list_new();
1063 output_list
= yang_data_list_new();
1065 // Read input parameters.
1066 auto input
= tag
->request
.input();
1067 for (const frr::PathValue
&pv
: input
) {
1068 // Request: repeated PathValue input = 2;
1069 data
= yang_data_new(pv
.path().c_str(), pv
.value().c_str());
1070 listnode_add(input_list
, data
);
1073 // Execute callback registered for this XPath.
1074 if (nb_callback_rpc(nb_node
, xpath
, input_list
, output_list
, errmsg
,
1077 flog_warn(EC_LIB_NB_CB_RPC
, "%s: rpc callback failed: %s",
1079 list_delete(&input_list
);
1080 list_delete(&output_list
);
1082 return grpc::Status(grpc::StatusCode::INTERNAL
, "RPC failed");
1085 // Process output parameters.
1086 for (ALL_LIST_ELEMENTS_RO(output_list
, node
, data
)) {
1087 // Response: repeated PathValue output = 1;
1088 frr::PathValue
*pv
= tag
->response
.add_output();
1089 pv
->set_path(data
->xpath
);
1090 pv
->set_value(data
->value
);
1094 list_delete(&input_list
);
1095 list_delete(&output_list
);
1097 return grpc::Status::OK
;
1100 // ------------------------------------------------------
1101 // Thread Initialization and Run Functions
1102 // ------------------------------------------------------
1105 #define REQUEST_NEWRPC(NAME, cdb) \
1107 auto _rpcState = new UnaryRpcState<frr::NAME##Request, \
1108 frr::NAME##Response>( \
1109 (cdb), &frr::Northbound::AsyncService::Request##NAME, \
1110 &HandleUnary##NAME, #NAME); \
1111 _rpcState->do_request(&service, cq.get(), true); \
1114 #define REQUEST_NEWRPC_STREAMING(NAME) \
1116 auto _rpcState = new StreamRpcState<frr::NAME##Request, \
1117 frr::NAME##Response, \
1118 NAME##ContextType>( \
1119 &frr::Northbound::AsyncService::Request##NAME, \
1120 &HandleStreaming##NAME, #NAME); \
1121 _rpcState->do_request(&service, cq.get(), true); \
1124 struct grpc_pthread_attr
{
1125 struct frr_pthread_attr attr
;
1129 // Capture these objects so we can try to shut down cleanly
1130 static pthread_mutex_t s_server_lock
= PTHREAD_MUTEX_INITIALIZER
;
1131 static grpc::Server
*s_server
;
1133 static void *grpc_pthread_start(void *arg
)
1135 struct frr_pthread
*fpt
= static_cast<frr_pthread
*>(arg
);
1136 uint port
= (uint
) reinterpret_cast<intptr_t>(fpt
->data
);
1138 Candidates candidates
;
1139 grpc::ServerBuilder builder
;
1140 std::stringstream server_address
;
1141 frr::Northbound::AsyncService service
;
1143 frr_pthread_set_name(fpt
);
1145 server_address
<< "0.0.0.0:" << port
;
1146 builder
.AddListeningPort(server_address
.str(),
1147 grpc::InsecureServerCredentials());
1148 builder
.RegisterService(&service
);
1149 builder
.AddChannelArgument(
1150 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS
, 5000);
1151 std::unique_ptr
<grpc::ServerCompletionQueue
> cq
=
1152 builder
.AddCompletionQueue();
1153 std::unique_ptr
<grpc::Server
> server
= builder
.BuildAndStart();
1154 s_server
= server
.get();
1156 pthread_mutex_lock(&s_server_lock
); // Make coverity happy
1157 grpc_running
= true;
1158 pthread_mutex_unlock(&s_server_lock
); // Make coverity happy
1160 /* Schedule unary RPC handlers */
1161 REQUEST_NEWRPC(GetCapabilities
, NULL
);
1162 REQUEST_NEWRPC(CreateCandidate
, &candidates
);
1163 REQUEST_NEWRPC(DeleteCandidate
, &candidates
);
1164 REQUEST_NEWRPC(UpdateCandidate
, &candidates
);
1165 REQUEST_NEWRPC(EditCandidate
, &candidates
);
1166 REQUEST_NEWRPC(LoadToCandidate
, &candidates
);
1167 REQUEST_NEWRPC(Commit
, &candidates
);
1168 REQUEST_NEWRPC(GetTransaction
, NULL
);
1169 REQUEST_NEWRPC(LockConfig
, NULL
);
1170 REQUEST_NEWRPC(UnlockConfig
, NULL
);
1171 REQUEST_NEWRPC(Execute
, NULL
);
1173 /* Schedule streaming RPC handlers */
1174 REQUEST_NEWRPC_STREAMING(Get
);
1175 REQUEST_NEWRPC_STREAMING(ListTransactions
);
1177 zlog_notice("gRPC server listening on %s",
1178 server_address
.str().c_str());
1180 /* Process inbound RPCs */
1184 if (!cq
->Next(&tag
, &ok
)) {
1185 grpc_debug("%s: CQ empty exiting", __func__
);
1189 grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__
, tag
,
1193 delete static_cast<RpcStateBase
*>(tag
);
1197 RpcStateBase
*rpc
= static_cast<RpcStateBase
*>(tag
);
1198 if (rpc
->get_state() != FINISH
)
1199 rpc
->run(&service
, cq
.get());
1201 grpc_debug("%s RPC FINISH -> [delete]", rpc
->name
);
1206 /* This was probably done for us to get here, but let's be safe */
1207 pthread_mutex_lock(&s_server_lock
);
1208 grpc_running
= false;
1210 grpc_debug("%s: shutdown server and CQ", __func__
);
1214 pthread_mutex_unlock(&s_server_lock
);
1216 grpc_debug("%s: shutting down CQ", __func__
);
1219 grpc_debug("%s: draining the CQ", __func__
);
1220 while (cq
->Next(&tag
, &ok
)) {
1221 grpc_debug("%s: drain tag %p", __func__
, tag
);
1222 delete static_cast<RpcStateBase
*>(tag
);
1225 zlog_info("%s: exiting from grpc pthread", __func__
);
1230 static int frr_grpc_init(uint port
)
1232 struct frr_pthread_attr attr
= {
1233 .start
= grpc_pthread_start
,
1237 grpc_debug("%s: entered", __func__
);
1239 fpt
= frr_pthread_new(&attr
, "frr-grpc", "frr-grpc");
1240 fpt
->data
= reinterpret_cast<void *>((intptr_t)port
);
1242 /* Create a pthread for gRPC since it runs its own event loop. */
1243 if (frr_pthread_run(fpt
, NULL
) < 0) {
1244 flog_err(EC_LIB_SYSTEM_CALL
, "%s: error creating pthread: %s",
1245 __func__
, safe_strerror(errno
));
1252 static int frr_grpc_finish(void)
1254 grpc_debug("%s: entered", __func__
);
1260 * Shut the server down here in main thread. This will cause the wait on
1261 * the completion queue (cq.Next()) to exit and cleanup everything else.
1263 pthread_mutex_lock(&s_server_lock
);
1264 grpc_running
= false;
1266 grpc_debug("%s: shutdown server", __func__
);
1267 s_server
->Shutdown();
1270 pthread_mutex_unlock(&s_server_lock
);
1272 grpc_debug("%s: joining and destroy grpc thread", __func__
);
1273 pthread_join(fpt
->thread
, NULL
);
1274 frr_pthread_destroy(fpt
);
1276 // Fix protobuf 'memory leaks' during shutdown.
1277 // https://groups.google.com/g/protobuf/c/4y_EmQiCGgs
1278 google::protobuf::ShutdownProtobufLibrary();
1284 * This is done this way because module_init and module_late_init are both
1285 * called during daemon pre-fork initialization. Because the GRPC library
1286 * spawns threads internally, we need to delay initializing it until after
1287 * fork. This is done by scheduling this init function as an event task, since
1288 * the event loop doesn't run until after fork.
1290 static void frr_grpc_module_very_late_init(struct thread
*thread
)
1292 const char *args
= THIS_MODULE
->load_args
;
1293 uint port
= GRPC_DEFAULT_PORT
;
1296 port
= std::stoul(args
);
1297 if (port
< 1024 || port
> UINT16_MAX
) {
1298 flog_err(EC_LIB_GRPC_INIT
,
1299 "%s: port number must be between 1025 and %d",
1300 __func__
, UINT16_MAX
);
1305 if (frr_grpc_init(port
) < 0)
1311 flog_err(EC_LIB_GRPC_INIT
, "failed to initialize the gRPC module");
1314 static int frr_grpc_module_late_init(struct thread_master
*tm
)
1317 hook_register(frr_fini
, frr_grpc_finish
);
1318 thread_add_event(tm
, frr_grpc_module_very_late_init
, NULL
, 0, NULL
);
1322 static int frr_grpc_module_init(void)
1324 hook_register(frr_late_init
, frr_grpc_module_late_init
);
1329 FRR_MODULE_SETUP(.name
= "frr_grpc", .version
= FRR_VERSION
,
1330 .description
= "FRR gRPC northbound module",
1331 .init
= frr_grpc_module_init
, );