// FRR northbound gRPC plugin (lib/northbound_grpc.cpp)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 //
3 // Copyright (c) 2021-2022, LabN Consulting, L.L.C
4 // Copyright (C) 2019 NetDEF, Inc.
5 // Renato Westphal
6 //
7
8 #include <zebra.h>
9 #include <grpcpp/grpcpp.h>
10 #include "grpc/frr-northbound.grpc.pb.h"
11
12 #include "log.h"
13 #include "libfrr.h"
14 #include "lib/version.h"
15 #include "lib/thread.h"
16 #include "command.h"
17 #include "lib_errors.h"
18 #include "northbound.h"
19 #include "northbound_db.h"
20 #include "frr_pthread.h"
21
22 #include <iostream>
23 #include <sstream>
24 #include <memory>
25 #include <string>
26
/* Default TCP port for the northbound gRPC service. */
#define GRPC_DEFAULT_PORT 50051


// ------------------------------------------------------
// File Local Variables
// ------------------------------------------------------

/*
 * NOTE: we can't use the FRR debugging infrastructure here since it uses
 * atomics and C++ has a different atomics API. Enable gRPC debugging
 * unconditionally until we figure out a way to solve this problem.
 */
static bool nb_dbg_client_grpc = false;

/* Event loop of the main (northbound) pthread; RPC handlers run there. */
static struct thread_master *main_master;

/* The dedicated gRPC I/O pthread. */
static struct frr_pthread *fpt;

/* Set while the gRPC server is up and serving requests. */
static bool grpc_running;

#define grpc_debug(...)                                                        \
	do {                                                                   \
		if (nb_dbg_client_grpc)                                        \
			zlog_debug(__VA_ARGS__);                               \
	} while (0)
52
53 // ------------------------------------------------------
54 // New Types
55 // ------------------------------------------------------
56
/* Lifecycle states of an asynchronous RPC tag. call_states is indexed by
 * the enum value for debug logging and must stay in sync with it. */
enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED };
const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"};
59
/* A candidate configuration: a private copy of the running configuration
 * that a client edits and later commits (possibly via two-phase commit). */
struct candidate {
	uint64_t id;			    // unique candidate identifier
	struct nb_config *config;	    // private configuration copy
	struct nb_transaction *transaction; // in-progress commit, or NULL
};
65
66 class Candidates
67 {
68 public:
69 ~Candidates(void)
70 {
71 // Delete candidates.
72 for (auto it = _cdb.begin(); it != _cdb.end(); it++)
73 delete_candidate(it->first);
74 }
75
76 struct candidate *create_candidate(void)
77 {
78 uint64_t id = ++_next_id;
79 assert(id); // TODO: implement an algorithm for unique reusable
80 // IDs.
81 struct candidate *c = &_cdb[id];
82 c->id = id;
83 c->config = nb_config_dup(running_config);
84 c->transaction = NULL;
85
86 return c;
87 }
88
89 bool contains(uint64_t candidate_id)
90 {
91 return _cdb.count(candidate_id) > 0;
92 }
93
94 void delete_candidate(uint64_t candidate_id)
95 {
96 struct candidate *c = &_cdb[candidate_id];
97 char errmsg[BUFSIZ] = {0};
98
99 nb_config_free(c->config);
100 if (c->transaction)
101 nb_candidate_commit_abort(c->transaction, errmsg,
102 sizeof(errmsg));
103 _cdb.erase(c->id);
104 }
105
106 struct candidate *get_candidate(uint64_t id)
107 {
108 return _cdb.count(id) == 0 ? NULL : &_cdb[id];
109 }
110
111 private:
112 uint64_t _next_id = 0;
113 std::map<uint64_t, struct candidate> _cdb;
114 };
115
116 /*
117 * RpcStateBase is the common base class used to track a gRPC RPC.
118 */
class RpcStateBase
{
      public:
	/* Post an async request of this RPC type on the completion queue.
	 * When no_copy is false a fresh state object is allocated so a new
	 * incoming request of the same type can be serviced. */
	virtual void do_request(::frr::Northbound::AsyncService *service,
				::grpc::ServerCompletionQueue *cq,
				bool no_copy) = 0;

	RpcStateBase(const char *name) : name(name){};

	virtual ~RpcStateBase() = default;

	CallState get_state() const
	{
		return state;
	}

	/* True on the first PROCESS pass for this tag (i.e. the state we
	 * entered run() with was CREATE, not MORE). */
	bool is_initial_process() const
	{
		/* Will always be true for Unary */
		return entered_state == CREATE;
	}

	// Returns "more" status, if false caller can delete
	bool run(frr::Northbound::AsyncService *service,
		 grpc::ServerCompletionQueue *cq)
	{
		/*
		 * We enter in either CREATE or MORE state, and transition to
		 * PROCESS state.
		 */
		this->entered_state = this->state;
		this->state = PROCESS;
		grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name,
			   call_states[this->entered_state],
			   call_states[this->state]);
		/*
		 * We schedule the callback on the main pthread, and wait for
		 * the state to transition out of the PROCESS state. The new
		 * state will either be MORE or FINISH. It will always be FINISH
		 * for Unary RPCs.
		 */
		thread_add_event(main_master, c_callback, (void *)this, 0,
				 NULL);

		/* Block this gRPC I/O thread until the main pthread has run
		 * the callback and published the new state under cmux. */
		pthread_mutex_lock(&this->cmux);
		while (this->state == PROCESS)
			pthread_cond_wait(&this->cond, &this->cmux);
		pthread_mutex_unlock(&this->cmux);

		grpc_debug("%s RPC in %s on grpc-io-thread", name,
			   call_states[this->state]);

		if (this->state == FINISH) {
			/*
			 * Server is done (FINISH) so prep to receive a new
			 * request of this type. We could do this earlier but
			 * that would mean we could be handling multiple same
			 * type requests in parallel without limit.
			 */
			this->do_request(service, cq, false);
		}
		return true;
	}

      protected:
	/* Implemented by subclasses; runs on the main pthread and returns
	 * the next state (MORE or FINISH). */
	virtual CallState run_mainthread(struct thread *thread) = 0;

	/* Event callback executed on the main pthread via thread_add_event. */
	static void c_callback(struct thread *thread)
	{
		auto _tag = static_cast<RpcStateBase *>(THREAD_ARG(thread));
		/*
		 * We hold the lock until the callback finishes and has updated
		 * _tag->state, then we signal done and release.
		 */
		pthread_mutex_lock(&_tag->cmux);

		CallState enter_state = _tag->state;
		grpc_debug("%s RPC: running %s on main thread", _tag->name,
			   call_states[enter_state]);

		_tag->state = _tag->run_mainthread(thread);

		grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name,
			   call_states[enter_state], call_states[_tag->state]);

		pthread_cond_signal(&_tag->cond);
		pthread_mutex_unlock(&_tag->cmux);
		return;
	}

	grpc::ServerContext ctx;
	pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; // guards `state`
	pthread_cond_t cond = PTHREAD_COND_INITIALIZER;	  // state-change signal
	CallState state = CREATE;	  // current state, written under cmux
	CallState entered_state = CREATE; // state observed at last run() entry

      public:
	const char *name; // RPC name, used for debug logging only
};
218
219 /*
220 * The UnaryRpcState class is used to track the execution of a Unary RPC.
221 *
222 * Template Args:
223 * Q - the request type for a given unary RPC
224 * S - the response type for a given unary RPC
225 */
template <typename Q, typename S> class UnaryRpcState : public RpcStateBase
{
      public:
	/* Pointer-to-member type of the generated AsyncService request
	 * function for this unary RPC. */
	typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
		::grpc::ServerContext *, Q *,
		::grpc::ServerAsyncResponseWriter<S> *,
		::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
		void *);

	UnaryRpcState(Candidates *cdb, reqfunc_t rfunc,
		      grpc::Status (*cb)(UnaryRpcState<Q, S> *),
		      const char *name)
	    : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb),
	      responder(&ctx){};

	/* Post a request on the completion queue. A fresh state object is
	 * allocated unless no_copy is set (initial registration reuses
	 * `this`); the object itself is the completion-queue tag. */
	void do_request(::frr::Northbound::AsyncService *service,
			::grpc::ServerCompletionQueue *cq,
			bool no_copy) override
	{
		grpc_debug("%s, posting a request for: %s", __func__, name);
		auto copy = no_copy ? this
				    : new UnaryRpcState(cdb, requestf, callback,
							name);
		(service->*requestf)(&copy->ctx, &copy->request,
				     &copy->responder, cq, cq, copy);
	}

	/* Invoke the handler on the main pthread and finish the RPC. */
	CallState run_mainthread(struct thread *thread) override
	{
		// Unary RPC are always finished, see "Unary" :)
		grpc::Status status = this->callback(this);
		responder.Finish(response, status, this);
		return FINISH;
	}

	Candidates *cdb; // shared candidate-configuration store

	Q request;  // deserialized request message
	S response; // response message, filled by the handler
	grpc::ServerAsyncResponseWriter<S> responder;

	grpc::Status (*callback)(UnaryRpcState<Q, S> *); // handler on main thread
	reqfunc_t requestf = NULL;
};
270
271 /*
272 * The StreamRpcState class is used to track the execution of a Streaming RPC.
273 *
274 * Template Args:
275 * Q - the request type for a given streaming RPC
276 * S - the response type for a given streaming RPC
277 * X - the type used to track the streaming state
278 */
template <typename Q, typename S, typename X>
class StreamRpcState : public RpcStateBase
{
      public:
	/* Pointer-to-member type of the generated AsyncService request
	 * function for this server-streaming RPC. */
	typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
		::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
		::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
		void *);

	StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *),
		       const char *name)
	    : RpcStateBase(name), requestsf(rfunc), callback(cb),
	      async_responder(&ctx){};

	/* Post a request on the completion queue; a fresh state object is
	 * allocated unless no_copy is set (initial registration). */
	void do_request(::frr::Northbound::AsyncService *service,
			::grpc::ServerCompletionQueue *cq,
			bool no_copy) override
	{
		grpc_debug("%s, posting a request for: %s", __func__, name);
		auto copy =
			no_copy ? this
				: new StreamRpcState(requestsf, callback, name);
		(service->*requestsf)(&copy->ctx, &copy->request,
				      &copy->async_responder, cq, cq, copy);
	}

	/* Invoke the handler on the main pthread; the handler returns true
	 * while it has more responses to stream. */
	CallState run_mainthread(struct thread *thread) override
	{
		if (this->callback(this))
			return MORE;
		else
			return FINISH;
	}

	Q request;  // deserialized request message
	S response; // scratch response (handlers build their own copies)
	grpc::ServerAsyncWriter<S> async_responder;

	bool (*callback)(StreamRpcState<Q, S, X> *); // streaming handler
	reqsfunc_t requestsf = NULL;

	X context; // per-stream progress state, owned by the handler
};
322
323 // ------------------------------------------------------
324 // Utility Functions
325 // ------------------------------------------------------
326
327 static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
328 {
329 switch (encoding) {
330 case frr::JSON:
331 return LYD_JSON;
332 case frr::XML:
333 return LYD_XML;
334 default:
335 flog_err(EC_LIB_DEVELOPMENT,
336 "%s: unknown data encoding format (%u)", __func__,
337 encoding);
338 exit(1);
339 }
340 }
341
342 static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
343 const char *value)
344 {
345 LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value,
346 LYD_NEW_PATH_UPDATE, &dnode);
347 if (err != LY_SUCCESS) {
348 flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
349 __func__, ly_errmsg(ly_native_ctx));
350 return -1;
351 }
352
353 return 0;
354 }
355
356 static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path)
357 {
358 dnode = yang_dnode_get(dnode, path.c_str());
359 if (!dnode)
360 return -1;
361
362 lyd_free_tree(dnode);
363
364 return 0;
365 }
366
367 static LY_ERR data_tree_from_dnode(frr::DataTree *dt,
368 const struct lyd_node *dnode,
369 LYD_FORMAT lyd_format, bool with_defaults)
370 {
371 char *strp;
372 int options = 0;
373
374 SET_FLAG(options, LYD_PRINT_WITHSIBLINGS);
375 if (with_defaults)
376 SET_FLAG(options, LYD_PRINT_WD_ALL);
377 else
378 SET_FLAG(options, LYD_PRINT_WD_TRIM);
379
380 LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options);
381 if (err == LY_SUCCESS) {
382 if (strp) {
383 dt->set_data(strp);
384 free(strp);
385 }
386 }
387 return err;
388 }
389
390 static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
391 bool config_only)
392 {
393 struct lyd_node *dnode;
394 int options, opt2;
395 LY_ERR err;
396
397 if (config_only) {
398 options = LYD_PARSE_NO_STATE;
399 opt2 = LYD_VALIDATE_NO_STATE;
400 } else {
401 options = LYD_PARSE_STRICT;
402 opt2 = 0;
403 }
404
405 err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(),
406 encoding2lyd_format(dt->encoding()), options,
407 opt2, &dnode);
408 if (err != LY_SUCCESS) {
409 flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s",
410 __func__, ly_errmsg(ly_native_ctx));
411 }
412 return dnode;
413 }
414
415 static struct lyd_node *get_dnode_config(const std::string &path)
416 {
417 struct lyd_node *dnode;
418
419 if (!yang_dnode_exists(running_config->dnode,
420 path.empty() ? NULL : path.c_str()))
421 return NULL;
422
423 dnode = yang_dnode_get(running_config->dnode,
424 path.empty() ? NULL : path.c_str());
425 if (dnode)
426 dnode = yang_dnode_dup(dnode);
427
428 return dnode;
429 }
430
431 static int get_oper_data_cb(const struct lysc_node *snode,
432 struct yang_translator *translator,
433 struct yang_data *data, void *arg)
434 {
435 struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
436 int ret = yang_dnode_edit(dnode, data->xpath, data->value);
437 yang_data_free(data);
438
439 return (ret == 0) ? NB_OK : NB_ERR;
440 }
441
442 static struct lyd_node *get_dnode_state(const std::string &path)
443 {
444 struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false);
445 if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode)
446 != NB_OK) {
447 yang_dnode_free(dnode);
448 return NULL;
449 }
450
451 return dnode;
452 }
453
454 static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
455 int type, LYD_FORMAT lyd_format,
456 bool with_defaults)
457 {
458 struct lyd_node *dnode_config = NULL;
459 struct lyd_node *dnode_state = NULL;
460 struct lyd_node *dnode_final;
461
462 // Configuration data.
463 if (type == frr::GetRequest_DataType_ALL
464 || type == frr::GetRequest_DataType_CONFIG) {
465 dnode_config = get_dnode_config(path);
466 if (!dnode_config)
467 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
468 "Data path not found");
469 }
470
471 // Operational data.
472 if (type == frr::GetRequest_DataType_ALL
473 || type == frr::GetRequest_DataType_STATE) {
474 dnode_state = get_dnode_state(path);
475 if (!dnode_state) {
476 if (dnode_config)
477 yang_dnode_free(dnode_config);
478 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
479 "Failed to fetch operational data");
480 }
481 }
482
483 switch (type) {
484 case frr::GetRequest_DataType_ALL:
485 //
486 // Combine configuration and state data into a single
487 // dnode.
488 //
489 if (lyd_merge_siblings(&dnode_state, dnode_config,
490 LYD_MERGE_DESTRUCT)
491 != LY_SUCCESS) {
492 yang_dnode_free(dnode_state);
493 yang_dnode_free(dnode_config);
494 return grpc::Status(
495 grpc::StatusCode::INTERNAL,
496 "Failed to merge configuration and state data",
497 ly_errmsg(ly_native_ctx));
498 }
499
500 dnode_final = dnode_state;
501 break;
502 case frr::GetRequest_DataType_CONFIG:
503 dnode_final = dnode_config;
504 break;
505 case frr::GetRequest_DataType_STATE:
506 dnode_final = dnode_state;
507 break;
508 }
509
510 // Validate data to create implicit default nodes if necessary.
511 int validate_opts = 0;
512 if (type == frr::GetRequest_DataType_CONFIG)
513 validate_opts = LYD_VALIDATE_NO_STATE;
514 else
515 validate_opts = 0;
516
517 LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx,
518 validate_opts, NULL);
519
520 if (err)
521 flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s",
522 __func__, ly_errmsg(ly_native_ctx));
523 // Dump data using the requested format.
524 if (!err)
525 err = data_tree_from_dnode(dt, dnode_final, lyd_format,
526 with_defaults);
527 yang_dnode_free(dnode_final);
528 if (err)
529 return grpc::Status(grpc::StatusCode::INTERNAL,
530 "Failed to dump data");
531 return grpc::Status::OK;
532 }
533
534
535 // ------------------------------------------------------
536 // RPC Callback Functions: run on main thread
537 // ------------------------------------------------------
538
/* Unary GetCapabilities: report FRR version, rollback support, loaded YANG
 * modules, and the supported payload encodings. */
grpc::Status HandleUnaryGetCapabilities(
	UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse>
		*tag)
{
	grpc_debug("%s: entered", __func__);

	// Response: string frr_version = 1;
	tag->response.set_frr_version(FRR_VERSION);

	// Response: bool rollback_support = 2;
#ifdef HAVE_CONFIG_ROLLBACKS
	tag->response.set_rollback_support(true);
#else
	tag->response.set_rollback_support(false);
#endif
	// Response: repeated ModuleData supported_modules = 3;
	struct yang_module *module;
	RB_FOREACH (module, yang_modules, &yang_modules) {
		auto m = tag->response.add_supported_modules();

		m->set_name(module->name);
		if (module->info->revision)
			m->set_revision(module->info->revision);
		m->set_organization(module->info->org);
	}

	// Response: repeated Encoding supported_encodings = 4;
	tag->response.add_supported_encodings(frr::JSON);
	tag->response.add_supported_encodings(frr::XML);

	return grpc::Status::OK;
}
571
// Define the context variable type for this streaming handler
typedef std::list<std::string> GetContextType;

/*
 * Streaming Get: emit one GetResponse per requested path. Returns true while
 * more responses remain (MORE state), false once the stream was finished.
 */
bool HandleStreamingGet(
	StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag)
{
	grpc_debug("%s: entered", __func__);

	auto mypathps = &tag->context;
	if (tag->is_initial_process()) {
		// Fill our context container first time through
		grpc_debug("%s: initialize streaming state", __func__);
		auto paths = tag->request.path();
		for (const std::string &path : paths) {
			mypathps->push_back(std::string(path));
		}
	}

	// Request: DataType type = 1;
	int type = tag->request.type();
	// Request: Encoding encoding = 2;
	frr::Encoding encoding = tag->request.encoding();
	// Request: bool with_defaults = 3;
	bool with_defaults = tag->request.with_defaults();

	// Nothing (left) to send: finish the stream with OK.
	if (mypathps->empty()) {
		tag->async_responder.Finish(grpc::Status::OK, tag);
		return false;
	}

	frr::GetResponse response;
	grpc::Status status;

	// Response: int64 timestamp = 1;
	response.set_timestamp(time(NULL));

	// Response: DataTree data = 2;
	auto *data = response.mutable_data();
	data->set_encoding(tag->request.encoding());
	// NOTE: paths are consumed from the back of the list, i.e. responses
	// are streamed in reverse request order.
	status = get_path(data, mypathps->back().c_str(), type,
			  encoding2lyd_format(encoding), with_defaults);

	// On error, send the failing status and close the stream.
	if (!status.ok()) {
		tag->async_responder.WriteAndFinish(
			response, grpc::WriteOptions(), status, tag);
		return false;
	}

	mypathps->pop_back();
	if (mypathps->empty()) {
		// Last response: write it and finish in one call.
		tag->async_responder.WriteAndFinish(
			response, grpc::WriteOptions(), grpc::Status::OK, tag);
		return false;
	} else {
		tag->async_responder.Write(response, tag);
		return true;
	}
}
630
631 grpc::Status HandleUnaryCreateCandidate(
632 UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse>
633 *tag)
634 {
635 grpc_debug("%s: entered", __func__);
636
637 struct candidate *candidate = tag->cdb->create_candidate();
638 if (!candidate)
639 return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
640 "Can't create candidate configuration");
641 tag->response.set_candidate_id(candidate->id);
642 return grpc::Status::OK;
643 }
644
645 grpc::Status HandleUnaryDeleteCandidate(
646 UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse>
647 *tag)
648 {
649 grpc_debug("%s: entered", __func__);
650
651 uint32_t candidate_id = tag->request.candidate_id();
652
653 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
654
655 if (!tag->cdb->contains(candidate_id))
656 return grpc::Status(grpc::StatusCode::NOT_FOUND,
657 "candidate configuration not found");
658 tag->cdb->delete_candidate(candidate_id);
659 return grpc::Status::OK;
660 }
661
662 grpc::Status HandleUnaryUpdateCandidate(
663 UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse>
664 *tag)
665 {
666 grpc_debug("%s: entered", __func__);
667
668 uint32_t candidate_id = tag->request.candidate_id();
669
670 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
671
672 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
673
674 if (!candidate)
675 return grpc::Status(grpc::StatusCode::NOT_FOUND,
676 "candidate configuration not found");
677 if (candidate->transaction)
678 return grpc::Status(
679 grpc::StatusCode::FAILED_PRECONDITION,
680 "candidate is in the middle of a transaction");
681 if (nb_candidate_update(candidate->config) != NB_OK)
682 return grpc::Status(grpc::StatusCode::INTERNAL,
683 "failed to update candidate configuration");
684
685 return grpc::Status::OK;
686 }
687
688 grpc::Status HandleUnaryEditCandidate(
689 UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse>
690 *tag)
691 {
692 grpc_debug("%s: entered", __func__);
693
694 uint32_t candidate_id = tag->request.candidate_id();
695
696 grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
697
698 struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
699 if (!candidate)
700 return grpc::Status(grpc::StatusCode::NOT_FOUND,
701 "candidate configuration not found");
702
703 struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
704
705 auto pvs = tag->request.update();
706 for (const frr::PathValue &pv : pvs) {
707 if (yang_dnode_edit(candidate_tmp->dnode, pv.path(),
708 pv.value().c_str()) != 0) {
709 nb_config_free(candidate_tmp);
710
711 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
712 "Failed to update \"" + pv.path() +
713 "\"");
714 }
715 }
716
717 pvs = tag->request.delete_();
718 for (const frr::PathValue &pv : pvs) {
719 if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
720 nb_config_free(candidate_tmp);
721 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
722 "Failed to remove \"" + pv.path() +
723 "\"");
724 }
725 }
726
727 // No errors, accept all changes.
728 nb_config_replace(candidate->config, candidate_tmp, false);
729 return grpc::Status::OK;
730 }
731
/* Unary LoadToCandidate: parse a client-supplied configuration tree and
 * either REPLACE the candidate with it or MERGE it in. */
grpc::Status HandleUnaryLoadToCandidate(
	UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse>
		*tag)
{
	grpc_debug("%s: entered", __func__);

	uint32_t candidate_id = tag->request.candidate_id();

	grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);

	// Request: LoadType type = 2;
	int load_type = tag->request.type();
	// Request: DataTree config = 3;
	auto config = tag->request.config();

	struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
	if (!candidate)
		return grpc::Status(grpc::StatusCode::NOT_FOUND,
				    "candidate configuration not found");

	// Parse config-only (state nodes rejected, state validation skipped).
	struct lyd_node *dnode = dnode_from_data_tree(&config, true);
	if (!dnode)
		return grpc::Status(grpc::StatusCode::INTERNAL,
				    "Failed to parse the configuration");

	// nb_config_new() takes ownership of dnode.
	struct nb_config *loaded_config = nb_config_new(dnode);
	// NOTE(review): both calls are passed preserve=false, so they are
	// presumed to consume loaded_config even on merge failure — confirm
	// against nb_config_merge()/nb_config_replace() in northbound.c.
	if (load_type == frr::LoadToCandidateRequest::REPLACE)
		nb_config_replace(candidate->config, loaded_config, false);
	else if (nb_config_merge(candidate->config, loaded_config, false) !=
		 NB_OK)
		return grpc::Status(grpc::StatusCode::INTERNAL,
				    "Failed to merge the loaded configuration");

	return grpc::Status::OK;
}
767
/*
 * Unary Commit: drive the (optionally two-phase) commit of a candidate.
 * Phases: VALIDATE (check only), PREPARE (open a transaction),
 * ABORT/APPLY (resolve an open transaction), ALL (single-shot commit).
 * Northbound error codes are mapped onto gRPC status codes below.
 */
grpc::Status
HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
{
	grpc_debug("%s: entered", __func__);

	// Request: uint32 candidate_id = 1;
	uint32_t candidate_id = tag->request.candidate_id();

	grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);

	// Request: Phase phase = 2;
	int phase = tag->request.phase();
	// Request: string comment = 3;
	const std::string comment = tag->request.comment();

	// Find candidate configuration.
	struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
	if (!candidate)
		return grpc::Status(grpc::StatusCode::NOT_FOUND,
				    "candidate configuration not found");

	int ret = NB_OK;
	uint32_t transaction_id = 0;

	// Check for misuse of the two-phase commit protocol.
	switch (phase) {
	case frr::CommitRequest::PREPARE:
	case frr::CommitRequest::ALL:
		// Starting a commit requires no transaction in flight.
		if (candidate->transaction)
			return grpc::Status(
				grpc::StatusCode::FAILED_PRECONDITION,
				"candidate is in the middle of a transaction");
		break;
	case frr::CommitRequest::ABORT:
	case frr::CommitRequest::APPLY:
		// Resolving a commit requires a transaction in flight.
		if (!candidate->transaction)
			return grpc::Status(
				grpc::StatusCode::FAILED_PRECONDITION,
				"no transaction in progress");
		break;
	default:
		break;
	}


	// Execute the user request.
	struct nb_context context = {};
	context.client = NB_CLIENT_GRPC;
	char errmsg[BUFSIZ] = {0};

	switch (phase) {
	case frr::CommitRequest::VALIDATE:
		grpc_debug("`-> Performing VALIDATE");
		ret = nb_candidate_validate(&context, candidate->config, errmsg,
					    sizeof(errmsg));
		break;
	case frr::CommitRequest::PREPARE:
		grpc_debug("`-> Performing PREPARE");
		ret = nb_candidate_commit_prepare(
			context, candidate->config, comment.c_str(),
			&candidate->transaction, false, false, errmsg,
			sizeof(errmsg));
		break;
	case frr::CommitRequest::ABORT:
		grpc_debug("`-> Performing ABORT");
		// NOTE(review): candidate->transaction is not reset to NULL
		// after ABORT/APPLY here — verify the callees clear it, else
		// Candidates::delete_candidate() may abort a stale pointer.
		nb_candidate_commit_abort(candidate->transaction, errmsg,
					  sizeof(errmsg));
		break;
	case frr::CommitRequest::APPLY:
		grpc_debug("`-> Performing APPLY");
		nb_candidate_commit_apply(candidate->transaction, true,
					  &transaction_id, errmsg,
					  sizeof(errmsg));
		break;
	case frr::CommitRequest::ALL:
		grpc_debug("`-> Performing ALL");
		ret = nb_candidate_commit(context, candidate->config, true,
					  comment.c_str(), &transaction_id,
					  errmsg, sizeof(errmsg));
		break;
	}

	// Map northbound error codes to gRPC status codes.
	grpc::Status status;
	switch (ret) {
	case NB_OK:
		status = grpc::Status::OK;
		break;
	case NB_ERR_NO_CHANGES:
		status = grpc::Status(grpc::StatusCode::ABORTED, errmsg);
		break;
	case NB_ERR_LOCKED:
		status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg);
		break;
	case NB_ERR_VALIDATION:
		status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
				      errmsg);
		break;
	case NB_ERR_RESOURCE:
		status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
				      errmsg);
		break;
	case NB_ERR:
	default:
		status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg);
		break;
	}

	grpc_debug("`-> Result: %s (message: '%s')",
		   nb_err_name((enum nb_error)ret), errmsg);

	if (ret == NB_OK) {
		// Response: uint32 transaction_id = 1;
		if (transaction_id)
			tag->response.set_transaction_id(transaction_id);
	}
	// Response: string error_message = 2 (any warning/error text).
	if (strlen(errmsg) > 0)
		tag->response.set_error_message(errmsg);

	return status;
}
889
890 grpc::Status HandleUnaryLockConfig(
891 UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
892 {
893 grpc_debug("%s: entered", __func__);
894
895 if (nb_running_lock(NB_CLIENT_GRPC, NULL))
896 return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
897 "running configuration is locked already");
898 return grpc::Status::OK;
899 }
900
901 grpc::Status HandleUnaryUnlockConfig(
902 UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
903 {
904 grpc_debug("%s: entered", __func__);
905
906 if (nb_running_unlock(NB_CLIENT_GRPC, NULL))
907 return grpc::Status(
908 grpc::StatusCode::FAILED_PRECONDITION,
909 "failed to unlock the running configuration");
910 return grpc::Status::OK;
911 }
912
/* nb_db_transactions_iterate() callback: append one transaction record
 * (id, client, date, comment) to the list passed via `arg`. */
static void list_transactions_cb(void *arg, int transaction_id,
				 const char *client_name, const char *date,
				 const char *comment)
{
	using txn_t = std::tuple<int, std::string, std::string, std::string>;
	auto *list = static_cast<std::list<txn_t> *>(arg);

	list->emplace_back(transaction_id, client_name, date, comment);
}
923
// Define the context variable type for this streaming handler
typedef std::list<std::tuple<int, std::string, std::string, std::string>>
	ListTransactionsContextType;

/*
 * Streaming ListTransactions: emit one response per recorded transaction.
 * Returns true while more responses remain (MORE), false once finished.
 */
bool HandleStreamingListTransactions(
	StreamRpcState<frr::ListTransactionsRequest,
		       frr::ListTransactionsResponse,
		       ListTransactionsContextType> *tag)
{
	grpc_debug("%s: entered", __func__);

	auto list = &tag->context;
	if (tag->is_initial_process()) {
		grpc_debug("%s: initialize streaming state", __func__);
		// Fill our context container first time through
		nb_db_transactions_iterate(list_transactions_cb, list);
		// NOTE(review): these "fake" entries look like leftover demo
		// or test data sent to every client — confirm whether they
		// are intentional before relying on this RPC's output.
		list->push_back(std::make_tuple(
			0xFFFF, std::string("fake client"),
			std::string("fake date"), std::string("fake comment")));
		list->push_back(std::make_tuple(0xFFFE,
						std::string("fake client2"),
						std::string("fake date"),
						std::string("fake comment2")));
	}

	// Nothing (left) to send: finish the stream with OK.
	if (list->empty()) {
		tag->async_responder.Finish(grpc::Status::OK, tag);
		return false;
	}

	// Entries are consumed from the back of the list.
	auto item = list->back();

	frr::ListTransactionsResponse response;

	// Response: uint32 id = 1;
	response.set_id(std::get<0>(item));

	// Response: string client = 2;
	response.set_client(std::get<1>(item).c_str());

	// Response: string date = 3;
	response.set_date(std::get<2>(item).c_str());

	// Response: string comment = 4;
	response.set_comment(std::get<3>(item).c_str());

	list->pop_back();
	if (list->empty()) {
		// Last response: write it and finish in one call.
		tag->async_responder.WriteAndFinish(
			response, grpc::WriteOptions(), grpc::Status::OK, tag);
		return false;
	} else {
		tag->async_responder.Write(response, tag);
		return true;
	}
}
980
981 grpc::Status HandleUnaryGetTransaction(
982 UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse>
983 *tag)
984 {
985 grpc_debug("%s: entered", __func__);
986
987 // Request: uint32 transaction_id = 1;
988 uint32_t transaction_id = tag->request.transaction_id();
989 // Request: Encoding encoding = 2;
990 frr::Encoding encoding = tag->request.encoding();
991 // Request: bool with_defaults = 3;
992 bool with_defaults = tag->request.with_defaults();
993
994 grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__,
995 transaction_id, encoding);
996
997 struct nb_config *nb_config;
998
999 // Load configuration from the transactions database.
1000 nb_config = nb_db_transaction_load(transaction_id);
1001 if (!nb_config)
1002 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
1003 "Transaction not found");
1004
1005 // Response: DataTree config = 1;
1006 auto config = tag->response.mutable_config();
1007 config->set_encoding(encoding);
1008
1009 // Dump data using the requested format.
1010 if (data_tree_from_dnode(config, nb_config->dnode,
1011 encoding2lyd_format(encoding), with_defaults)
1012 != 0) {
1013 nb_config_free(nb_config);
1014 return grpc::Status(grpc::StatusCode::INTERNAL,
1015 "Failed to dump data");
1016 }
1017
1018 nb_config_free(nb_config);
1019
1020 return grpc::Status::OK;
1021 }
1022
/* Unary Execute: run the YANG RPC/action registered at the requested path,
 * feeding it the supplied input parameters and returning its outputs. */
grpc::Status HandleUnaryExecute(
	UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
{
	grpc_debug("%s: entered", __func__);

	struct nb_node *nb_node;
	struct list *input_list;
	struct list *output_list;
	struct listnode *node;
	struct yang_data *data;
	const char *xpath;
	char errmsg[BUFSIZ] = {0};

	// Request: string path = 1;
	// xpath borrows the request's string storage; valid while tag lives.
	xpath = tag->request.path().c_str();

	grpc_debug("%s(path: \"%s\")", __func__, xpath);

	if (tag->request.path().empty())
		return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
				    "Data path is empty");

	nb_node = nb_node_find(xpath);
	if (!nb_node)
		return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
				    "Unknown data path");

	input_list = yang_data_list_new();
	output_list = yang_data_list_new();

	// Read input parameters.
	auto input = tag->request.input();
	for (const frr::PathValue &pv : input) {
		// Request: repeated PathValue input = 2;
		data = yang_data_new(pv.path().c_str(), pv.value().c_str());
		listnode_add(input_list, data);
	}

	// Execute callback registered for this XPath.
	if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg,
			    sizeof(errmsg))
	    != NB_OK) {
		flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s",
			  __func__, xpath);
		// Both lists (and their yang_data elements) are released
		// on the error path too.
		list_delete(&input_list);
		list_delete(&output_list);

		return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed");
	}

	// Process output parameters.
	for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
		// Response: repeated PathValue output = 1;
		frr::PathValue *pv = tag->response.add_output();
		pv->set_path(data->xpath);
		pv->set_value(data->value);
	}

	// Release memory.
	list_delete(&input_list);
	list_delete(&output_list);

	return grpc::Status::OK;
}
1087
1088 // ------------------------------------------------------
1089 // Thread Initialization and Run Functions
1090 // ------------------------------------------------------
1091
1092
/*
 * Allocate a UnaryRpcState for the unary RPC NAME, bound to candidate
 * database `cdb` (may be NULL for RPCs that don't touch candidates), and
 * post the initial request on the completion queue.  Ownership of the
 * state object passes to the CQ loop in grpc_pthread_start(), which
 * deletes it once the RPC reaches the FINISH state.
 */
#define REQUEST_NEWRPC(NAME, cdb)                                              \
	do {                                                                   \
		auto _rpcState = new UnaryRpcState<frr::NAME##Request,         \
						  frr::NAME##Response>(        \
			(cdb), &frr::Northbound::AsyncService::Request##NAME,  \
			&HandleUnary##NAME, #NAME);                            \
		_rpcState->do_request(&service, cq.get(), true);               \
	} while (0)
1101
/*
 * Allocate a StreamRpcState for the server-streaming RPC NAME and post
 * the initial request on the completion queue.  NAME##ContextType carries
 * per-stream state between handler invocations.  As with REQUEST_NEWRPC,
 * the state object is freed by the CQ loop when the RPC finishes.
 */
#define REQUEST_NEWRPC_STREAMING(NAME)                                         \
	do {                                                                   \
		auto _rpcState = new StreamRpcState<frr::NAME##Request,        \
						   frr::NAME##Response,        \
						   NAME##ContextType>(         \
			&frr::Northbound::AsyncService::Request##NAME,         \
			&HandleStreaming##NAME, #NAME);                        \
		_rpcState->do_request(&service, cq.get(), true);               \
	} while (0)
1111
/*
 * FRR pthread attributes plus the TCP port the gRPC server listens on.
 * NOTE(review): appears unused in this file — frr_grpc_init() passes the
 * port through fpt->data instead; confirm no external user before removing.
 */
struct grpc_pthread_attr {
	struct frr_pthread_attr attr;
	unsigned long port;
};
1116
// Capture these objects so we can try to shut down cleanly
/*
 * s_server_lock protects s_server and grpc_running, which are touched by
 * both the gRPC pthread (grpc_pthread_start) and the main thread
 * (frr_grpc_finish).
 */
static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER;
static grpc::Server *s_server;
1120
/*
 * Entry point of the dedicated gRPC pthread.
 *
 * Builds and starts an insecure gRPC server on 0.0.0.0:<port> (the port
 * is passed as an integer smuggled through fpt->data), schedules all
 * unary and streaming RPC handlers, then loops on the completion queue
 * until it is shut down and drained.
 *
 * Shutdown ordering matters here: the server is Shutdown() first (either
 * by frr_grpc_finish() or below), then the CQ is Shutdown() and drained,
 * deleting any RPC state objects still pending on it.
 */
static void *grpc_pthread_start(void *arg)
{
	struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
	/* Port was stored as a plain integer in the opaque data pointer. */
	uint port = (uint) reinterpret_cast<intptr_t>(fpt->data);

	Candidates candidates;
	grpc::ServerBuilder builder;
	std::stringstream server_address;
	frr::Northbound::AsyncService service;

	frr_pthread_set_name(fpt);

	server_address << "0.0.0.0:" << port;
	builder.AddListeningPort(server_address.str(),
				 grpc::InsecureServerCredentials());
	builder.RegisterService(&service);
	/* Tolerate client keepalive pings as frequent as every 5s. */
	builder.AddChannelArgument(
		GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
	std::unique_ptr<grpc::ServerCompletionQueue> cq =
		builder.AddCompletionQueue();
	std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
	/* Publish the raw pointer so frr_grpc_finish() can Shutdown() it. */
	s_server = server.get();

	pthread_mutex_lock(&s_server_lock); // Make coverity happy
	grpc_running = true;
	pthread_mutex_unlock(&s_server_lock); // Make coverity happy

	/* Schedule unary RPC handlers */
	REQUEST_NEWRPC(GetCapabilities, NULL);
	REQUEST_NEWRPC(CreateCandidate, &candidates);
	REQUEST_NEWRPC(DeleteCandidate, &candidates);
	REQUEST_NEWRPC(UpdateCandidate, &candidates);
	REQUEST_NEWRPC(EditCandidate, &candidates);
	REQUEST_NEWRPC(LoadToCandidate, &candidates);
	REQUEST_NEWRPC(Commit, &candidates);
	REQUEST_NEWRPC(GetTransaction, NULL);
	REQUEST_NEWRPC(LockConfig, NULL);
	REQUEST_NEWRPC(UnlockConfig, NULL);
	REQUEST_NEWRPC(Execute, NULL);

	/* Schedule streaming RPC handlers */
	REQUEST_NEWRPC_STREAMING(Get);
	REQUEST_NEWRPC_STREAMING(ListTransactions);

	zlog_notice("gRPC server listening on %s",
		    server_address.str().c_str());

	/* Process inbound RPCs */
	bool ok;
	void *tag;
	while (true) {
		/* Next() returning false means the CQ is shut down and
		 * fully drained. */
		if (!cq->Next(&tag, &ok)) {
			grpc_debug("%s: CQ empty exiting", __func__);
			break;
		}

		grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
			   ok);

		/* !ok: the operation failed/was cancelled; free the state
		 * and leave the loop. */
		if (!ok) {
			delete static_cast<RpcStateBase *>(tag);
			break;
		}

		RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
		if (rpc->get_state() != FINISH)
			rpc->run(&service, cq.get());
		else {
			/* FINISH means the RPC is done; reclaim its state. */
			grpc_debug("%s RPC FINISH -> [delete]", rpc->name);
			delete rpc;
		}
	}

	/* This was probably done for us to get here, but let's be safe */
	pthread_mutex_lock(&s_server_lock);
	grpc_running = false;
	if (s_server) {
		grpc_debug("%s: shutdown server and CQ", __func__);
		server->Shutdown();
		s_server = NULL;
	}
	pthread_mutex_unlock(&s_server_lock);

	grpc_debug("%s: shutting down CQ", __func__);
	cq->Shutdown();

	/* Drain: after Shutdown(), Next() keeps returning pending tags
	 * (with ok == false) until the queue is empty; free them all. */
	grpc_debug("%s: draining the CQ", __func__);
	while (cq->Next(&tag, &ok)) {
		grpc_debug("%s: drain tag %p", __func__, tag);
		delete static_cast<RpcStateBase *>(tag);
	}

	zlog_info("%s: exiting from grpc pthread", __func__);
	return NULL;
}
1216
1217
1218 static int frr_grpc_init(uint port)
1219 {
1220 struct frr_pthread_attr attr = {
1221 .start = grpc_pthread_start,
1222 .stop = NULL,
1223 };
1224
1225 grpc_debug("%s: entered", __func__);
1226
1227 fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
1228 fpt->data = reinterpret_cast<void *>((intptr_t)port);
1229
1230 /* Create a pthread for gRPC since it runs its own event loop. */
1231 if (frr_pthread_run(fpt, NULL) < 0) {
1232 flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s",
1233 __func__, safe_strerror(errno));
1234 return -1;
1235 }
1236
1237 return 0;
1238 }
1239
/*
 * frr_fini hook: shut the gRPC module down from the main thread.
 *
 * Shutting the server down here makes the gRPC pthread's cq->Next() wait
 * return; that thread then drains its completion queue and exits, after
 * which we join and destroy it.  Always returns 0.
 */
static int frr_grpc_finish(void)
{
	grpc_debug("%s: entered", __func__);

	/* Nothing to do if the pthread was never created. */
	if (!fpt)
		return 0;

	/*
	 * Shut the server down here in main thread. This will cause the wait on
	 * the completion queue (cq.Next()) to exit and cleanup everything else.
	 */
	pthread_mutex_lock(&s_server_lock);
	grpc_running = false;
	if (s_server) {
		grpc_debug("%s: shutdown server", __func__);
		s_server->Shutdown();
		s_server = NULL;
	}
	pthread_mutex_unlock(&s_server_lock);

	grpc_debug("%s: joining and destroy grpc thread", __func__);
	pthread_join(fpt->thread, NULL);
	frr_pthread_destroy(fpt);

	// Fix protobuf 'memory leaks' during shutdown.
	// https://groups.google.com/g/protobuf/c/4y_EmQiCGgs
	google::protobuf::ShutdownProtobufLibrary();

	return 0;
}
1270
1271 /*
1272 * This is done this way because module_init and module_late_init are both
1273 * called during daemon pre-fork initialization. Because the GRPC library
1274 * spawns threads internally, we need to delay initializing it until after
1275 * fork. This is done by scheduling this init function as an event task, since
1276 * the event loop doesn't run until after fork.
1277 */
1278 static void frr_grpc_module_very_late_init(struct thread *thread)
1279 {
1280 const char *args = THIS_MODULE->load_args;
1281 uint port = GRPC_DEFAULT_PORT;
1282
1283 if (args) {
1284 port = std::stoul(args);
1285 if (port < 1024 || port > UINT16_MAX) {
1286 flog_err(EC_LIB_GRPC_INIT,
1287 "%s: port number must be between 1025 and %d",
1288 __func__, UINT16_MAX);
1289 goto error;
1290 }
1291 }
1292
1293 if (frr_grpc_init(port) < 0)
1294 goto error;
1295
1296 return;
1297
1298 error:
1299 flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module");
1300 }
1301
1302 static int frr_grpc_module_late_init(struct thread_master *tm)
1303 {
1304 main_master = tm;
1305 hook_register(frr_fini, frr_grpc_finish);
1306 thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
1307 return 0;
1308 }
1309
1310 static int frr_grpc_module_init(void)
1311 {
1312 hook_register(frr_late_init, frr_grpc_module_late_init);
1313
1314 return 0;
1315 }
1316
/* Register this file as a loadable FRR module. */
FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
		 .description = "FRR gRPC northbound module",
		 .init = frr_grpc_module_init, );