]> git.proxmox.com Git - mirror_frr.git/blame - lib/northbound_grpc.cpp
tools: fix frr-reload.py daemon option
[mirror_frr.git] / lib / northbound_grpc.cpp
CommitLineData
ec2ac5f2
RW
1//
2// Copyright (C) 2019 NetDEF, Inc.
3// Renato Westphal
4//
5// This program is free software; you can redistribute it and/or modify it
6// under the terms of the GNU General Public License as published by the Free
7// Software Foundation; either version 2 of the License, or (at your option)
8// any later version.
9//
10// This program is distributed in the hope that it will be useful, but WITHOUT
11// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13// more details.
14//
15// You should have received a copy of the GNU General Public License along
16// with this program; see the file COPYING; if not, write to the Free Software
17// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18//
19
20#include <zebra.h>
63d12a7d
QY
21#include <grpcpp/grpcpp.h>
22#include "grpc/frr-northbound.grpc.pb.h"
ec2ac5f2
RW
23
24#include "log.h"
25#include "libfrr.h"
26#include "version.h"
27#include "command.h"
28#include "lib_errors.h"
29#include "northbound.h"
30#include "northbound_db.h"
0edcb505 31#include "frr_pthread.h"
ec2ac5f2
RW
32
33#include <iostream>
34#include <sstream>
35#include <memory>
36#include <string>
37
ec2ac5f2
RW
38#define GRPC_DEFAULT_PORT 50051
39
0edcb505
CS
40static void *grpc_pthread_start(void *arg);
41
ec2ac5f2
RW
42/*
43 * NOTE: we can't use the FRR debugging infrastructure here since it uses
44 * atomics and C++ has a different atomics API. Enable gRPC debugging
45 * unconditionally until we figure out a way to solve this problem.
46 */
47static bool nb_dbg_client_grpc = 1;
48
0edcb505
CS
49static struct frr_pthread *fpt;
50
51/* Default frr_pthread attributes */
52static const struct frr_pthread_attr attr = {
53 .start = grpc_pthread_start,
54 .stop = NULL,
55};
ec2ac5f2 56
ecf9fb30
QY
/* Life-cycle stage of an RPC state object, as used by the handlers. */
enum CallStatus {
	CREATE,  /* initial value of a freshly allocated RpcState */
	PROCESS, /* request is being (or still being) answered */
	FINISH,  /* final response was queued; next callback frees the state */
};
58
59/* Thanks gooble */
// Type-erased interface for per-RPC state objects. The completion-queue loop
// only sees an opaque tag, casts it to this type and calls doCallback().
class RpcStateBase
{
      public:
	// Virtual so that destroying a state object through a base pointer is
	// well defined.
	virtual ~RpcStateBase() = default;

	// Advance the RPC this object represents by one step.
	virtual void doCallback() = 0;
};
65
66class NorthboundImpl;
67
68template <typename Q, typename S> class RpcState : RpcStateBase
69{
70 public:
71 RpcState(NorthboundImpl *svc,
72 void (NorthboundImpl::*cb)(RpcState<Q, S> *))
73 : callback(cb), responder(&ctx), async_responder(&ctx),
74 service(svc){};
75
76 void doCallback() override
77 {
78 (service->*callback)(this);
79 }
80
81 grpc::ServerContext ctx;
82 Q request;
83 S response;
84 grpc::ServerAsyncResponseWriter<S> responder;
85 grpc::ServerAsyncWriter<S> async_responder;
86
87 NorthboundImpl *service;
88 void (NorthboundImpl::*callback)(RpcState<Q, S> *);
89
90 void *context;
91 CallStatus state = CREATE;
92};
93
// Allocate a fresh RpcState for the unary RPC NAME and register it with the
// async service, using the state object itself as the completion-queue tag.
// Must be expanded inside a NorthboundImpl member function (uses `this`,
// `_service` and `_cq`).
#define REQUEST_RPC(NAME)                                                      \
	do {                                                                   \
		typedef RpcState<frr::NAME##Request, frr::NAME##Response>      \
			_state_t;                                              \
		_state_t *_state =                                             \
			new _state_t(this, &NorthboundImpl::Handle##NAME);     \
		_service->Request##NAME(&_state->ctx, &_state->request,        \
					&_state->responder, _cq, _cq, _state); \
	} while (0)
103
// Same as REQUEST_RPC, but for RPCs with a streamed response: the request is
// registered with the state's `async_responder` instead of `responder`.
#define REQUEST_RPC_STREAMING(NAME)                                            \
	do {                                                                   \
		typedef RpcState<frr::NAME##Request, frr::NAME##Response>      \
			_state_t;                                              \
		_state_t *_state =                                             \
			new _state_t(this, &NorthboundImpl::Handle##NAME);     \
		_service->Request##NAME(&_state->ctx, &_state->request,        \
					&_state->async_responder, _cq, _cq,    \
					_state);                               \
	} while (0)
113
114class NorthboundImpl
ec2ac5f2
RW
115{
116 public:
117 NorthboundImpl(void)
118 {
119 _nextCandidateId = 0;
ecf9fb30 120 _service = new frr::Northbound::AsyncService();
ec2ac5f2
RW
121 }
122
123 ~NorthboundImpl(void)
124 {
125 // Delete candidates.
126 for (auto it = _candidates.begin(); it != _candidates.end();
127 it++)
128 delete_candidate(&it->second);
129 }
130
ecf9fb30
QY
131 void Run(unsigned long port)
132 {
133 grpc::ServerBuilder builder;
134 std::stringstream server_address;
135
136 server_address << "0.0.0.0:" << port;
137
138 builder.AddListeningPort(server_address.str(),
139 grpc::InsecureServerCredentials());
140 builder.RegisterService(_service);
141
142 auto cq = builder.AddCompletionQueue();
143 _cq = cq.get();
144 auto _server = builder.BuildAndStart();
145
146 /* Schedule all RPC handlers */
147 REQUEST_RPC(GetCapabilities);
148 REQUEST_RPC(CreateCandidate);
149 REQUEST_RPC(DeleteCandidate);
150 REQUEST_RPC(UpdateCandidate);
151 REQUEST_RPC(EditCandidate);
152 REQUEST_RPC(LoadToCandidate);
153 REQUEST_RPC(Commit);
154 REQUEST_RPC(GetTransaction);
155 REQUEST_RPC(LockConfig);
156 REQUEST_RPC(UnlockConfig);
157 REQUEST_RPC(Execute);
158 REQUEST_RPC_STREAMING(Get);
159 REQUEST_RPC_STREAMING(ListTransactions);
160
161 zlog_notice("gRPC server listening on %s",
162 server_address.str().c_str());
163
164 /* Process inbound RPCs */
165 void *tag;
166 bool ok;
167 while (true) {
168 _cq->Next(&tag, &ok);
169 GPR_ASSERT(ok);
170 static_cast<RpcStateBase *>(tag)->doCallback();
171 tag = nullptr;
172 }
173 }
174
175 void HandleGetCapabilities(RpcState<frr::GetCapabilitiesRequest,
176 frr::GetCapabilitiesResponse> *tag)
ec2ac5f2 177 {
ecf9fb30 178 switch (tag->state) {
f6239dc7 179 case CREATE:
ecf9fb30
QY
180 REQUEST_RPC(GetCapabilities);
181 tag->state = PROCESS;
ecf9fb30 182 case PROCESS: {
0e10aeeb
RW
183 if (nb_dbg_client_grpc)
184 zlog_debug("received RPC GetCapabilities()");
ecf9fb30
QY
185
186 // Response: string frr_version = 1;
187 tag->response.set_frr_version(FRR_VERSION);
ec2ac5f2 188
ecf9fb30 189 // Response: bool rollback_support = 2;
ec2ac5f2 190#ifdef HAVE_CONFIG_ROLLBACKS
ecf9fb30 191 tag->response.set_rollback_support(true);
ec2ac5f2 192#else
ecf9fb30 193 tag->response.set_rollback_support(false);
ec2ac5f2
RW
194#endif
195
ecf9fb30
QY
196 // Response: repeated ModuleData supported_modules = 3;
197 struct yang_module *module;
198 RB_FOREACH (module, yang_modules, &yang_modules) {
199 auto m = tag->response.add_supported_modules();
ec2ac5f2 200
ecf9fb30
QY
201 m->set_name(module->name);
202 if (module->info->rev_size)
203 m->set_revision(
204 module->info->rev[0].date);
205 m->set_organization(module->info->org);
206 }
ec2ac5f2 207
ecf9fb30
QY
208 // Response: repeated Encoding supported_encodings = 4;
209 tag->response.add_supported_encodings(frr::JSON);
210 tag->response.add_supported_encodings(frr::XML);
ec2ac5f2 211
ecf9fb30
QY
212 tag->responder.Finish(tag->response, grpc::Status::OK,
213 tag);
214 tag->state = FINISH;
215 break;
216 }
217 case FINISH:
218 delete tag;
219 }
ec2ac5f2
RW
220 }
221
ecf9fb30 222 void HandleGet(RpcState<frr::GetRequest, frr::GetResponse> *tag)
ec2ac5f2 223 {
ecf9fb30
QY
224 switch (tag->state) {
225 case CREATE: {
226 auto mypaths = new std::list<std::string>();
227 tag->context = mypaths;
228 auto paths = tag->request.path();
229 for (const std::string &path : paths) {
230 mypaths->push_back(std::string(path));
231 }
232 REQUEST_RPC_STREAMING(Get);
233 tag->state = PROCESS;
234 }
235 case PROCESS: {
236 // Request: DataType type = 1;
237 int type = tag->request.type();
238 // Request: Encoding encoding = 2;
239 frr::Encoding encoding = tag->request.encoding();
240 // Request: bool with_defaults = 3;
241 bool with_defaults = tag->request.with_defaults();
242
243 if (nb_dbg_client_grpc)
244 zlog_debug(
245 "received RPC Get(type: %u, encoding: %u, with_defaults: %u)",
246 type, encoding, with_defaults);
247
248 auto mypaths = static_cast<std::list<std::string> *>(
249 tag->context);
250
251 if (mypaths->empty()) {
252 tag->async_responder.Finish(grpc::Status::OK,
253 tag);
254 tag->state = FINISH;
255 return;
256 }
ec2ac5f2 257
ec2ac5f2 258
ec2ac5f2
RW
259 frr::GetResponse response;
260 grpc::Status status;
261
262 // Response: int64 timestamp = 1;
263 response.set_timestamp(time(NULL));
264
265 // Response: DataTree data = 2;
266 auto *data = response.mutable_data();
ecf9fb30
QY
267 data->set_encoding(tag->request.encoding());
268 status = get_path(data, mypaths->back().c_str(), type,
ec2ac5f2
RW
269 encoding2lyd_format(encoding),
270 with_defaults);
271
272 // Something went wrong...
ecf9fb30
QY
273 if (!status.ok()) {
274 tag->async_responder.WriteAndFinish(
275 response, grpc::WriteOptions(), status,
276 tag);
277 tag->state = FINISH;
278 return;
279 }
ec2ac5f2 280
ecf9fb30 281 mypaths->pop_back();
ec2ac5f2 282
ecf9fb30 283 tag->async_responder.Write(response, tag);
ec2ac5f2 284
ecf9fb30
QY
285 break;
286 }
287 case FINISH:
288 if (nb_dbg_client_grpc)
289 zlog_debug("received RPC Get() end");
290
291 delete static_cast<std::list<std::string> *>(
292 tag->context);
293 delete tag;
294 }
ec2ac5f2
RW
295 }
296
ecf9fb30
QY
297 void HandleCreateCandidate(RpcState<frr::CreateCandidateRequest,
298 frr::CreateCandidateResponse> *tag)
ec2ac5f2 299 {
ecf9fb30 300 switch (tag->state) {
f6239dc7 301 case CREATE:
ecf9fb30
QY
302 REQUEST_RPC(CreateCandidate);
303 tag->state = PROCESS;
ecf9fb30 304 case PROCESS: {
0e10aeeb
RW
305 if (nb_dbg_client_grpc)
306 zlog_debug("received RPC CreateCandidate()");
307
ecf9fb30
QY
308 struct candidate *candidate = create_candidate();
309 if (!candidate) {
310 tag->responder.Finish(
311 tag->response,
312 grpc::Status(
313 grpc::StatusCode::
314 RESOURCE_EXHAUSTED,
315 "Can't create candidate configuration"),
316 tag);
317 } else {
318 tag->response.set_candidate_id(candidate->id);
319 tag->responder.Finish(tag->response,
320 grpc::Status::OK, tag);
321 }
ec2ac5f2 322
ecf9fb30 323 tag->state = FINISH;
ec2ac5f2 324
ecf9fb30
QY
325 break;
326 }
327 case FINISH:
328 delete tag;
329 }
ec2ac5f2
RW
330 }
331
ecf9fb30
QY
332 void HandleDeleteCandidate(RpcState<frr::DeleteCandidateRequest,
333 frr::DeleteCandidateResponse> *tag)
ec2ac5f2 334 {
ecf9fb30 335 switch (tag->state) {
f6239dc7 336 case CREATE:
ecf9fb30
QY
337 REQUEST_RPC(DeleteCandidate);
338 tag->state = PROCESS;
ecf9fb30
QY
339 case PROCESS: {
340
341 // Request: uint32 candidate_id = 1;
342 uint32_t candidate_id = tag->request.candidate_id();
343
344 if (nb_dbg_client_grpc)
345 zlog_debug(
346 "received RPC DeleteCandidate(candidate_id: %u)",
347 candidate_id);
348
349 struct candidate *candidate =
350 get_candidate(candidate_id);
351 if (!candidate) {
352 tag->responder.Finish(
353 tag->response,
354 grpc::Status(
355 grpc::StatusCode::NOT_FOUND,
356 "candidate configuration not found"),
357 tag);
358 tag->state = FINISH;
359 return;
360 } else {
361 delete_candidate(candidate);
362 tag->responder.Finish(tag->response,
363 grpc::Status::OK, tag);
364 tag->state = FINISH;
365 return;
366 }
367 tag->state = FINISH;
368 break;
369 }
370 case FINISH:
371 delete tag;
372 }
373 }
ec2ac5f2 374
ecf9fb30
QY
375 void HandleUpdateCandidate(RpcState<frr::UpdateCandidateRequest,
376 frr::UpdateCandidateResponse> *tag)
377 {
378 switch (tag->state) {
f6239dc7 379 case CREATE:
ecf9fb30
QY
380 REQUEST_RPC(UpdateCandidate);
381 tag->state = PROCESS;
ecf9fb30
QY
382 case PROCESS: {
383
384 // Request: uint32 candidate_id = 1;
385 uint32_t candidate_id = tag->request.candidate_id();
386
387 if (nb_dbg_client_grpc)
388 zlog_debug(
389 "received RPC UpdateCandidate(candidate_id: %u)",
390 candidate_id);
391
392 struct candidate *candidate =
393 get_candidate(candidate_id);
394
395 if (!candidate)
396 tag->responder.Finish(
397 tag->response,
398 grpc::Status(
399 grpc::StatusCode::NOT_FOUND,
400 "candidate configuration not found"),
401 tag);
402 else if (candidate->transaction)
403 tag->responder.Finish(
404 tag->response,
405 grpc::Status(
406 grpc::StatusCode::
407 FAILED_PRECONDITION,
408 "candidate is in the middle of a transaction"),
409 tag);
410 else if (nb_candidate_update(candidate->config)
411 != NB_OK)
412 tag->responder.Finish(
413 tag->response,
414 grpc::Status(
415 grpc::StatusCode::INTERNAL,
416 "failed to update candidate configuration"),
417 tag);
418
419 else
420 tag->responder.Finish(tag->response,
421 grpc::Status::OK, tag);
422
423 tag->state = FINISH;
ec2ac5f2 424
ecf9fb30
QY
425 break;
426 }
427 case FINISH:
428 delete tag;
429 }
ec2ac5f2
RW
430 }
431
ecf9fb30
QY
432 void HandleEditCandidate(RpcState<frr::EditCandidateRequest,
433 frr::EditCandidateResponse> *tag)
ec2ac5f2 434 {
ecf9fb30 435 switch (tag->state) {
f6239dc7 436 case CREATE:
ecf9fb30
QY
437 REQUEST_RPC(EditCandidate);
438 tag->state = PROCESS;
ecf9fb30
QY
439 case PROCESS: {
440
441 // Request: uint32 candidate_id = 1;
442 uint32_t candidate_id = tag->request.candidate_id();
443
444 if (nb_dbg_client_grpc)
445 zlog_debug(
446 "received RPC EditCandidate(candidate_id: %u)",
447 candidate_id);
448
449 struct candidate *candidate =
450 get_candidate(candidate_id);
451
452 if (!candidate) {
453 tag->responder.Finish(
454 tag->response,
455 grpc::Status(
456 grpc::StatusCode::NOT_FOUND,
457 "candidate configuration not found"),
458 tag);
459 tag->state = FINISH;
460 break;
461 }
ec2ac5f2 462
ecf9fb30
QY
463 struct nb_config *candidate_tmp =
464 nb_config_dup(candidate->config);
465
466 auto pvs = tag->request.update();
467 for (const frr::PathValue &pv : pvs) {
468 if (yang_dnode_edit(candidate_tmp->dnode,
469 pv.path(), pv.value())
470 != 0) {
471 nb_config_free(candidate_tmp);
472
473 tag->responder.Finish(
474 tag->response,
475 grpc::Status(
476 grpc::StatusCode::
477 INVALID_ARGUMENT,
478 "Failed to update \""
479 + pv.path()
480 + "\""),
481 tag);
482
483 tag->state = FINISH;
484 return;
485 }
486 }
487
488 pvs = tag->request.delete_();
489 for (const frr::PathValue &pv : pvs) {
490 if (yang_dnode_delete(candidate_tmp->dnode,
491 pv.path())
492 != 0) {
493 nb_config_free(candidate_tmp);
494 tag->responder.Finish(
495 tag->response,
496 grpc::Status(
497 grpc::StatusCode::
498 INVALID_ARGUMENT,
499 "Failed to remove \""
500 + pv.path()
501 + "\""),
502 tag);
503 tag->state = FINISH;
504 return;
505 }
506 }
ec2ac5f2 507
ecf9fb30
QY
508 // No errors, accept all changes.
509 nb_config_replace(candidate->config, candidate_tmp,
510 false);
ec2ac5f2 511
ecf9fb30
QY
512 tag->responder.Finish(tag->response, grpc::Status::OK,
513 tag);
ec2ac5f2 514
ecf9fb30 515 tag->state = FINISH;
ec2ac5f2 516
ecf9fb30
QY
517 break;
518 }
519 case FINISH:
520 delete tag;
521 }
ec2ac5f2
RW
522 }
523
ecf9fb30
QY
524 void HandleLoadToCandidate(RpcState<frr::LoadToCandidateRequest,
525 frr::LoadToCandidateResponse> *tag)
ec2ac5f2 526 {
ecf9fb30 527 switch (tag->state) {
f6239dc7 528 case CREATE:
ecf9fb30
QY
529 REQUEST_RPC(LoadToCandidate);
530 tag->state = PROCESS;
ecf9fb30 531 case PROCESS: {
ecf9fb30
QY
532 // Request: uint32 candidate_id = 1;
533 uint32_t candidate_id = tag->request.candidate_id();
534
535 if (nb_dbg_client_grpc)
536 zlog_debug(
537 "received RPC LoadToCandidate(candidate_id: %u)",
538 candidate_id);
539
540 // Request: LoadType type = 2;
541 int load_type = tag->request.type();
542 // Request: DataTree config = 3;
543 auto config = tag->request.config();
544
545
546 struct candidate *candidate =
547 get_candidate(candidate_id);
548
549 if (!candidate) {
550 tag->responder.Finish(
551 tag->response,
552 grpc::Status(
553 grpc::StatusCode::NOT_FOUND,
554 "candidate configuration not found"),
555 tag);
556 tag->state = FINISH;
557 return;
558 }
ec2ac5f2 559
ecf9fb30
QY
560 struct lyd_node *dnode =
561 dnode_from_data_tree(&config, true);
562 if (!dnode) {
563 tag->responder.Finish(
564 tag->response,
565 grpc::Status(
566 grpc::StatusCode::INTERNAL,
567 "Failed to parse the configuration"),
568 tag);
569 tag->state = FINISH;
570 return;
ec2ac5f2 571 }
ec2ac5f2 572
ecf9fb30
QY
573 struct nb_config *loaded_config = nb_config_new(dnode);
574
575 if (load_type == frr::LoadToCandidateRequest::REPLACE)
576 nb_config_replace(candidate->config,
577 loaded_config, false);
578 else if (nb_config_merge(candidate->config,
579 loaded_config, false)
580 != NB_OK) {
581 tag->responder.Finish(
582 tag->response,
583 grpc::Status(
584 grpc::StatusCode::INTERNAL,
585 "Failed to merge the loaded configuration"),
586 tag);
587 tag->state = FINISH;
588 return;
589 }
ec2ac5f2 590
ecf9fb30
QY
591 tag->responder.Finish(tag->response, grpc::Status::OK,
592 tag);
593 tag->state = FINISH;
594 break;
595 }
596 case FINISH:
597 delete tag;
598 }
ec2ac5f2
RW
599 }
600
ecf9fb30
QY
601 void
602 HandleCommit(RpcState<frr::CommitRequest, frr::CommitResponse> *tag)
ec2ac5f2 603 {
ecf9fb30 604 switch (tag->state) {
f6239dc7 605 case CREATE:
ecf9fb30
QY
606 REQUEST_RPC(Commit);
607 tag->state = PROCESS;
ecf9fb30 608 case PROCESS: {
ecf9fb30
QY
609 // Request: uint32 candidate_id = 1;
610 uint32_t candidate_id = tag->request.candidate_id();
611 if (nb_dbg_client_grpc)
612 zlog_debug(
613 "received RPC Commit(candidate_id: %u)",
614 candidate_id);
615
616 // Request: Phase phase = 2;
617 int phase = tag->request.phase();
618 // Request: string comment = 3;
619 const std::string comment = tag->request.comment();
620
621 // Find candidate configuration.
622 struct candidate *candidate =
623 get_candidate(candidate_id);
624 if (!candidate) {
625 tag->responder.Finish(
626 tag->response,
627 grpc::Status(
628 grpc::StatusCode::NOT_FOUND,
629 "candidate configuration not found"),
630 tag);
631 tag->state = FINISH;
632 return;
633 }
ec2ac5f2 634
ecf9fb30
QY
635 int ret = NB_OK;
636 uint32_t transaction_id = 0;
637
638 // Check for misuse of the two-phase commit protocol.
639 switch (phase) {
640 case frr::CommitRequest::PREPARE:
641 case frr::CommitRequest::ALL:
642 if (candidate->transaction) {
643 tag->responder.Finish(
644 tag->response,
645 grpc::Status(
646 grpc::StatusCode::
647 FAILED_PRECONDITION,
648 "candidate is in the middle of a transaction"),
649 tag);
650 tag->state = FINISH;
651 return;
652 }
653 break;
654 case frr::CommitRequest::ABORT:
655 case frr::CommitRequest::APPLY:
656 if (!candidate->transaction) {
657 tag->responder.Finish(
658 tag->response,
659 grpc::Status(
660 grpc::StatusCode::
661 FAILED_PRECONDITION,
662 "no transaction in progress"),
663 tag);
664 tag->state = FINISH;
665 return;
666 }
667 break;
668 default:
669 break;
670 }
ec2ac5f2 671
ec2ac5f2 672
ecf9fb30 673 // Execute the user request.
13d6b9c1
RW
674 struct nb_context context = {};
675 context.client = NB_CLIENT_GRPC;
df5eda3d 676 char errmsg[BUFSIZ] = {0};
13d6b9c1 677
ecf9fb30
QY
678 switch (phase) {
679 case frr::CommitRequest::VALIDATE:
df5eda3d
RW
680 ret = nb_candidate_validate(
681 &context, candidate->config, errmsg,
682 sizeof(errmsg));
ecf9fb30
QY
683 break;
684 case frr::CommitRequest::PREPARE:
685 ret = nb_candidate_commit_prepare(
13d6b9c1 686 &context, candidate->config,
ecf9fb30 687 comment.c_str(),
df5eda3d
RW
688 &candidate->transaction, errmsg,
689 sizeof(errmsg));
ecf9fb30
QY
690 break;
691 case frr::CommitRequest::ABORT:
692 nb_candidate_commit_abort(
693 candidate->transaction);
694 break;
695 case frr::CommitRequest::APPLY:
696 nb_candidate_commit_apply(
697 candidate->transaction, true,
698 &transaction_id);
699 break;
700 case frr::CommitRequest::ALL:
701 ret = nb_candidate_commit(
13d6b9c1 702 &context, candidate->config, true,
df5eda3d
RW
703 comment.c_str(), &transaction_id,
704 errmsg, sizeof(errmsg));
ecf9fb30
QY
705 break;
706 }
ec2ac5f2 707
df5eda3d
RW
708 // Map northbound error codes to gRPC status codes.
709 grpc::Status status;
ecf9fb30 710 switch (ret) {
df5eda3d
RW
711 case NB_OK:
712 status = grpc::Status::OK;
713 break;
ecf9fb30 714 case NB_ERR_NO_CHANGES:
df5eda3d
RW
715 status = grpc::Status(grpc::StatusCode::ABORTED,
716 errmsg);
717 break;
ecf9fb30 718 case NB_ERR_LOCKED:
df5eda3d
RW
719 status = grpc::Status(
720 grpc::StatusCode::UNAVAILABLE, errmsg);
721 break;
ecf9fb30 722 case NB_ERR_VALIDATION:
df5eda3d
RW
723 status = grpc::Status(
724 grpc::StatusCode::INVALID_ARGUMENT,
725 errmsg);
726 break;
ecf9fb30 727 case NB_ERR_RESOURCE:
df5eda3d
RW
728 status = grpc::Status(
729 grpc::StatusCode::RESOURCE_EXHAUSTED,
730 errmsg);
731 break;
ecf9fb30 732 case NB_ERR:
ecf9fb30 733 default:
df5eda3d
RW
734 status = grpc::Status(
735 grpc::StatusCode::INTERNAL, errmsg);
ecf9fb30
QY
736 break;
737 }
df5eda3d
RW
738 if (ret == NB_OK) {
739 // Response: uint32 transaction_id = 1;
740 if (transaction_id)
741 tag->response.set_transaction_id(
742 transaction_id);
743 }
ec2ac5f2 744
df5eda3d 745 tag->responder.Finish(tag->response, status, tag);
ecf9fb30 746 tag->state = FINISH;
ec2ac5f2
RW
747 break;
748 }
ecf9fb30
QY
749 case FINISH:
750 delete tag;
ec2ac5f2 751 }
ec2ac5f2
RW
752 }
753
ecf9fb30
QY
754 void
755 HandleListTransactions(RpcState<frr::ListTransactionsRequest,
756 frr::ListTransactionsResponse> *tag)
ec2ac5f2 757 {
ecf9fb30 758 switch (tag->state) {
f6239dc7 759 case CREATE:
ecf9fb30
QY
760 REQUEST_RPC_STREAMING(ListTransactions);
761 tag->context = new std::list<std::tuple<
762 int, std::string, std::string, std::string>>();
763 nb_db_transactions_iterate(list_transactions_cb,
764 tag->context);
765 tag->state = PROCESS;
ecf9fb30 766 case PROCESS: {
0e10aeeb
RW
767 if (nb_dbg_client_grpc)
768 zlog_debug("received RPC ListTransactions()");
769
ecf9fb30
QY
770 auto list = static_cast<std::list<std::tuple<
771 int, std::string, std::string, std::string>> *>(
772 tag->context);
773 if (list->empty()) {
774 tag->async_responder.Finish(grpc::Status::OK,
775 tag);
776 tag->state = FINISH;
777 return;
778 }
779 auto item = list->back();
ec2ac5f2 780
ec2ac5f2 781
ecf9fb30 782 frr::ListTransactionsResponse response;
ec2ac5f2 783
ecf9fb30
QY
784 // Response: uint32 id = 1;
785 response.set_id(std::get<0>(item));
ec2ac5f2 786
ecf9fb30
QY
787 // Response: string client = 2;
788 response.set_client(std::get<1>(item).c_str());
ec2ac5f2 789
ecf9fb30
QY
790 // Response: string date = 3;
791 response.set_date(std::get<2>(item).c_str());
ec2ac5f2 792
ecf9fb30
QY
793 // Response: string comment = 4;
794 response.set_comment(std::get<3>(item).c_str());
ec2ac5f2 795
ecf9fb30 796 list->pop_back();
ec2ac5f2 797
ecf9fb30
QY
798 tag->async_responder.Write(response, tag);
799 break;
800 }
801 case FINISH:
802 delete static_cast<std::list<std::tuple<
803 int, std::string, std::string, std::string>> *>(
804 tag->context);
805 delete tag;
806 }
ec2ac5f2
RW
807 }
808
ecf9fb30
QY
809 void HandleGetTransaction(RpcState<frr::GetTransactionRequest,
810 frr::GetTransactionResponse> *tag)
ec2ac5f2 811 {
ecf9fb30 812 switch (tag->state) {
f6239dc7 813 case CREATE:
ecf9fb30
QY
814 REQUEST_RPC(GetTransaction);
815 tag->state = PROCESS;
ecf9fb30
QY
816 case PROCESS: {
817 // Request: uint32 transaction_id = 1;
818 uint32_t transaction_id = tag->request.transaction_id();
819 // Request: Encoding encoding = 2;
820 frr::Encoding encoding = tag->request.encoding();
821 // Request: bool with_defaults = 3;
822 bool with_defaults = tag->request.with_defaults();
823
824 if (nb_dbg_client_grpc)
825 zlog_debug(
826 "received RPC GetTransaction(transaction_id: %u, encoding: %u)",
827 transaction_id, encoding);
828
829 struct nb_config *nb_config;
830
831 // Load configuration from the transactions database.
832 nb_config = nb_db_transaction_load(transaction_id);
833 if (!nb_config) {
834 tag->responder.Finish(
835 tag->response,
836 grpc::Status(grpc::StatusCode::
837 INVALID_ARGUMENT,
838 "Transaction not found"),
839 tag);
840 tag->state = FINISH;
841 return;
842 }
ec2ac5f2 843
ecf9fb30
QY
844 // Response: DataTree config = 1;
845 auto config = tag->response.mutable_config();
846 config->set_encoding(encoding);
ec2ac5f2 847
ecf9fb30
QY
848 // Dump data using the requested format.
849 if (data_tree_from_dnode(config, nb_config->dnode,
850 encoding2lyd_format(encoding),
851 with_defaults)
852 != 0) {
853 nb_config_free(nb_config);
854 tag->responder.Finish(
855 tag->response,
856 grpc::Status(grpc::StatusCode::INTERNAL,
857 "Failed to dump data"),
858 tag);
859 tag->state = FINISH;
860 return;
861 }
862
863 nb_config_free(nb_config);
864
865 tag->responder.Finish(tag->response, grpc::Status::OK,
866 tag);
867 tag->state = FINISH;
868 break;
869 }
870 case FINISH:
871 delete tag;
872 }
ec2ac5f2
RW
873 }
874
ecf9fb30
QY
875 void HandleLockConfig(
876 RpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
ec2ac5f2 877 {
ecf9fb30 878 switch (tag->state) {
f6239dc7 879 case CREATE:
ecf9fb30
QY
880 REQUEST_RPC(LockConfig);
881 tag->state = PROCESS;
ecf9fb30 882 case PROCESS: {
0e10aeeb
RW
883 if (nb_dbg_client_grpc)
884 zlog_debug("received RPC LockConfig()");
885
ecf9fb30
QY
886 if (nb_running_lock(NB_CLIENT_GRPC, NULL)) {
887 tag->responder.Finish(
888 tag->response,
889 grpc::Status(
890 grpc::StatusCode::
891 FAILED_PRECONDITION,
892 "running configuration is locked already"),
893 tag);
894 tag->state = FINISH;
895 return;
896 }
897
898 tag->responder.Finish(tag->response, grpc::Status::OK,
899 tag);
900 tag->state = FINISH;
901 break;
902 }
903 case FINISH:
904 delete tag;
905 }
906 }
ec2ac5f2 907
ecf9fb30
QY
908 void HandleUnlockConfig(RpcState<frr::UnlockConfigRequest,
909 frr::UnlockConfigResponse> *tag)
910 {
911 switch (tag->state) {
f6239dc7 912 case CREATE:
ecf9fb30
QY
913 REQUEST_RPC(UnlockConfig);
914 tag->state = PROCESS;
ecf9fb30 915 case PROCESS: {
0e10aeeb
RW
916 if (nb_dbg_client_grpc)
917 zlog_debug("received RPC UnlockConfig()");
918
ecf9fb30
QY
919 if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) {
920 tag->responder.Finish(
921 tag->response,
922 grpc::Status(
923 grpc::StatusCode::
924 FAILED_PRECONDITION,
925 "failed to unlock the running configuration"),
926 tag);
927 tag->state = FINISH;
928 return;
929 }
ec2ac5f2 930
ecf9fb30
QY
931 tag->responder.Finish(tag->response, grpc::Status::OK,
932 tag);
933 tag->state = FINISH;
934 break;
935 }
936 case FINISH:
937 delete tag;
938 }
ec2ac5f2
RW
939 }
940
ecf9fb30
QY
941 void
942 HandleExecute(RpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
ec2ac5f2
RW
943 {
944 struct nb_node *nb_node;
945 struct list *input_list;
946 struct list *output_list;
947 struct listnode *node;
948 struct yang_data *data;
949 const char *xpath;
950
ecf9fb30 951 switch (tag->state) {
f6239dc7 952 case CREATE:
ecf9fb30
QY
953 REQUEST_RPC(Execute);
954 tag->state = PROCESS;
ecf9fb30 955 case PROCESS: {
ecf9fb30
QY
956 // Request: string path = 1;
957 xpath = tag->request.path().c_str();
958
959 if (nb_dbg_client_grpc)
960 zlog_debug("received RPC Execute(path: \"%s\")",
961 xpath);
962
963 if (tag->request.path().empty()) {
964 tag->responder.Finish(
965 tag->response,
966 grpc::Status(grpc::StatusCode::
967 INVALID_ARGUMENT,
968 "Data path is empty"),
969 tag);
970 tag->state = FINISH;
971 return;
972 }
ec2ac5f2 973
ecf9fb30
QY
974 nb_node = nb_node_find(xpath);
975 if (!nb_node) {
976 tag->responder.Finish(
977 tag->response,
978 grpc::Status(grpc::StatusCode::
979 INVALID_ARGUMENT,
980 "Unknown data path"),
981 tag);
982 tag->state = FINISH;
983 return;
984 }
ec2ac5f2 985
ecf9fb30
QY
986 input_list = yang_data_list_new();
987 output_list = yang_data_list_new();
ec2ac5f2 988
ecf9fb30
QY
989 // Read input parameters.
990 auto input = tag->request.input();
991 for (const frr::PathValue &pv : input) {
992 // Request: repeated PathValue input = 2;
993 data = yang_data_new(pv.path().c_str(),
994 pv.value().c_str());
995 listnode_add(input_list, data);
996 }
ec2ac5f2 997
ecf9fb30
QY
998 // Execute callback registered for this XPath.
999 if (nb_callback_rpc(nb_node, xpath, input_list,
1000 output_list)
1001 != NB_OK) {
1002 flog_warn(EC_LIB_NB_CB_RPC,
1003 "%s: rpc callback failed: %s",
1004 __func__, xpath);
1005 list_delete(&input_list);
1006 list_delete(&output_list);
1007
1008 tag->responder.Finish(
1009 tag->response,
1010 grpc::Status(grpc::StatusCode::INTERNAL,
1011 "RPC failed"),
1012 tag);
1013 tag->state = FINISH;
1014 return;
1015 }
ec2ac5f2 1016
ecf9fb30
QY
1017 // Process output parameters.
1018 for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
1019 // Response: repeated PathValue output = 1;
1020 frr::PathValue *pv = tag->response.add_output();
1021 pv->set_path(data->xpath);
1022 pv->set_value(data->value);
1023 }
ec2ac5f2 1024
ecf9fb30 1025 // Release memory.
ec2ac5f2
RW
1026 list_delete(&input_list);
1027 list_delete(&output_list);
ec2ac5f2 1028
ecf9fb30
QY
1029 tag->responder.Finish(tag->response, grpc::Status::OK,
1030 tag);
1031 tag->state = FINISH;
1032 break;
1033 }
1034 case FINISH:
1035 delete tag;
ec2ac5f2 1036 }
ec2ac5f2
RW
1037 }
1038
1039 private:
ecf9fb30
QY
1040 frr::Northbound::AsyncService *_service;
1041 grpc::ServerCompletionQueue *_cq;
1042
ec2ac5f2
RW
1043 struct candidate {
1044 uint32_t id;
1045 struct nb_config *config;
1046 struct nb_transaction *transaction;
1047 };
1048 std::map<uint32_t, struct candidate> _candidates;
1049 uint32_t _nextCandidateId;
1050
1051 static int yang_dnode_edit(struct lyd_node *dnode,
1052 const std::string &path,
1053 const std::string &value)
1054 {
1055 ly_errno = LY_SUCCESS;
1056 dnode = lyd_new_path(dnode, ly_native_ctx, path.c_str(),
1057 (void *)value.c_str(),
1058 (LYD_ANYDATA_VALUETYPE)0,
1059 LYD_PATH_OPT_UPDATE);
1060 if (!dnode && ly_errno != LY_SUCCESS) {
1061 flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed",
1062 __func__);
1063 return -1;
1064 }
1065
1066 return 0;
1067 }
1068
1069 static int yang_dnode_delete(struct lyd_node *dnode,
1070 const std::string &path)
1071 {
1072 dnode = yang_dnode_get(dnode, path.c_str());
1073 if (!dnode)
1074 return -1;
1075
1076 lyd_free(dnode);
1077
1078 return 0;
1079 }
1080
1081 static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
1082 {
1083 switch (encoding) {
1084 case frr::JSON:
1085 return LYD_JSON;
1086 case frr::XML:
1087 return LYD_XML;
07705c8b
RW
1088 default:
1089 flog_err(EC_LIB_DEVELOPMENT,
1090 "%s: unknown data encoding format (%u)",
1091 __func__, encoding);
1092 exit(1);
ec2ac5f2
RW
1093 }
1094 }
1095
1096 static int get_oper_data_cb(const struct lys_node *snode,
1097 struct yang_translator *translator,
1098 struct yang_data *data, void *arg)
1099 {
1100 struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
1101 int ret = yang_dnode_edit(dnode, data->xpath, data->value);
1102 yang_data_free(data);
1103
1104 return (ret == 0) ? NB_OK : NB_ERR;
1105 }
1106
// Iteration callback passed to nb_db_transactions_iterate(): appends one
// (id, client, date, comment) tuple to the std::list passed through `arg`.
// The C strings are copied into std::string objects.
static void list_transactions_cb(void *arg, int transaction_id,
				 const char *client_name,
				 const char *date, const char *comment)
{
	typedef std::tuple<int, std::string, std::string, std::string>
		TxEntry;

	std::list<TxEntry> *entries = static_cast<std::list<TxEntry> *>(arg);

	entries->push_back(std::make_tuple(
		transaction_id, std::string(client_name), std::string(date),
		std::string(comment)));
}
1118
1119 static int data_tree_from_dnode(frr::DataTree *dt,
1120 const struct lyd_node *dnode,
1121 LYD_FORMAT lyd_format,
1122 bool with_defaults)
1123 {
1124 char *strp;
1125 int options = 0;
1126
1127 SET_FLAG(options, LYP_FORMAT | LYP_WITHSIBLINGS);
1128 if (with_defaults)
1129 SET_FLAG(options, LYP_WD_ALL);
1130 else
1131 SET_FLAG(options, LYP_WD_TRIM);
1132
1133 if (lyd_print_mem(&strp, dnode, lyd_format, options) == 0) {
1134 if (strp) {
1135 dt->set_data(strp);
1136 free(strp);
1137 }
1138 return 0;
1139 }
1140
1141 return -1;
1142 }
1143
1144 static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
1145 bool config_only)
1146 {
1147 struct lyd_node *dnode;
1148 int options;
1149
1150 if (config_only)
1151 options = LYD_OPT_CONFIG;
1152 else
1153 options = LYD_OPT_DATA | LYD_OPT_DATA_NO_YANGLIB;
1154
1155 dnode = lyd_parse_mem(ly_native_ctx, dt->data().c_str(),
1156 encoding2lyd_format(dt->encoding()),
1157 options);
1158
1159 return dnode;
1160 }
1161
1162 static struct lyd_node *get_dnode_config(const std::string &path)
1163 {
1164 struct lyd_node *dnode;
1165
8685be73
RW
1166 dnode = yang_dnode_get(running_config->dnode,
1167 path.empty() ? NULL : path.c_str());
1168 if (dnode)
1169 dnode = yang_dnode_dup(dnode);
ec2ac5f2
RW
1170
1171 return dnode;
1172 }
1173
1174 static struct lyd_node *get_dnode_state(const std::string &path)
1175 {
1176 struct lyd_node *dnode;
1177
1178 dnode = yang_dnode_new(ly_native_ctx, false);
1179 if (nb_oper_data_iterate(path.c_str(), NULL, 0,
1180 get_oper_data_cb, dnode)
1181 != NB_OK) {
1182 yang_dnode_free(dnode);
1183 return NULL;
1184 }
1185
1186 return dnode;
1187 }
1188
1189 static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
1190 int type, LYD_FORMAT lyd_format,
1191 bool with_defaults)
1192 {
1193 struct lyd_node *dnode_config = NULL;
1194 struct lyd_node *dnode_state = NULL;
1195 struct lyd_node *dnode_final;
1196
1197 // Configuration data.
1198 if (type == frr::GetRequest_DataType_ALL
1199 || type == frr::GetRequest_DataType_CONFIG) {
1200 dnode_config = get_dnode_config(path);
1201 if (!dnode_config)
1202 return grpc::Status(
1203 grpc::StatusCode::INVALID_ARGUMENT,
1204 "Data path not found");
1205 }
1206
1207 // Operational data.
1208 if (type == frr::GetRequest_DataType_ALL
1209 || type == frr::GetRequest_DataType_STATE) {
1210 dnode_state = get_dnode_state(path);
1211 if (!dnode_state) {
1212 if (dnode_config)
1213 yang_dnode_free(dnode_config);
1214 return grpc::Status(
1215 grpc::StatusCode::INVALID_ARGUMENT,
1216 "Failed to fetch operational data");
1217 }
1218 }
1219
1220 switch (type) {
1221 case frr::GetRequest_DataType_ALL:
1222 //
1223 // Combine configuration and state data into a single
1224 // dnode.
1225 //
1226 if (lyd_merge(dnode_state, dnode_config,
1227 LYD_OPT_EXPLICIT)
1228 != 0) {
1229 yang_dnode_free(dnode_state);
1230 yang_dnode_free(dnode_config);
1231 return grpc::Status(
1232 grpc::StatusCode::INTERNAL,
1233 "Failed to merge configuration and state data");
1234 }
1235
1236 dnode_final = dnode_state;
1237 break;
1238 case frr::GetRequest_DataType_CONFIG:
1239 dnode_final = dnode_config;
1240 break;
1241 case frr::GetRequest_DataType_STATE:
1242 dnode_final = dnode_state;
1243 break;
1244 }
1245
1246 // Validate data to create implicit default nodes if necessary.
1247 int validate_opts = 0;
1248 if (type == frr::GetRequest_DataType_CONFIG)
1249 validate_opts = LYD_OPT_CONFIG;
1250 else
1251 validate_opts = LYD_OPT_DATA | LYD_OPT_DATA_NO_YANGLIB;
1252 lyd_validate(&dnode_final, validate_opts, ly_native_ctx);
1253
1254 // Dump data using the requested format.
1255 int ret = data_tree_from_dnode(dt, dnode_final, lyd_format,
1256 with_defaults);
1257 yang_dnode_free(dnode_final);
1258 if (ret != 0)
1259 return grpc::Status(grpc::StatusCode::INTERNAL,
1260 "Failed to dump data");
1261
1262 return grpc::Status::OK;
1263 }
1264
1265 struct candidate *create_candidate(void)
1266 {
1267 uint32_t candidate_id = ++_nextCandidateId;
1268
1269 // Check for overflow.
1270 // TODO: implement an algorithm for unique reusable IDs.
1271 if (candidate_id == 0)
1272 return NULL;
1273
1274 struct candidate *candidate = &_candidates[candidate_id];
1275 candidate->id = candidate_id;
8685be73 1276 candidate->config = nb_config_dup(running_config);
ec2ac5f2
RW
1277 candidate->transaction = NULL;
1278
1279 return candidate;
1280 }
1281
1282 void delete_candidate(struct candidate *candidate)
1283 {
1284 _candidates.erase(candidate->id);
1285 nb_config_free(candidate->config);
1286 if (candidate->transaction)
1287 nb_candidate_commit_abort(candidate->transaction);
1288 }
1289
1290 struct candidate *get_candidate(uint32_t candidate_id)
1291 {
1292 struct candidate *candidate;
1293
1294 if (_candidates.count(candidate_id) == 0)
1295 return NULL;
1296
1297 return &_candidates[candidate_id];
1298 }
1299};
1300
1301static void *grpc_pthread_start(void *arg)
1302{
0edcb505
CS
1303 struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
1304 unsigned long *port = static_cast<unsigned long *>(fpt->data);
ec2ac5f2 1305
0edcb505
CS
1306 frr_pthread_set_name(fpt);
1307
ecf9fb30
QY
1308 NorthboundImpl nb;
1309 nb.Run(*port);
ec2ac5f2
RW
1310
1311 return NULL;
1312}
1313
1314static int frr_grpc_init(unsigned long *port)
1315{
0edcb505
CS
1316 fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
1317 fpt->data = static_cast<void *>(port);
1318
ec2ac5f2 1319 /* Create a pthread for gRPC since it runs its own event loop. */
0edcb505 1320 if (frr_pthread_run(fpt, NULL) < 0) {
ec2ac5f2
RW
1321 flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s",
1322 __func__, safe_strerror(errno));
1323 return -1;
1324 }
0edcb505 1325 pthread_detach(fpt->thread);
ec2ac5f2
RW
1326
1327 return 0;
1328}
1329
1330static int frr_grpc_finish(void)
1331{
0edcb505
CS
1332 if (fpt)
1333 frr_pthread_destroy(fpt);
ec2ac5f2
RW
1334 // TODO: cancel the gRPC pthreads gracefully.
1335
1336 return 0;
1337}
1338
20b0a2ed
QY
1339/*
1340 * This is done this way because module_init and module_late_init are both
1341 * called during daemon pre-fork initialization. Because the GRPC library
1342 * spawns threads internally, we need to delay initializing it until after
1343 * fork. This is done by scheduling this init function as an event task, since
1344 * the event loop doesn't run until after fork.
1345 */
1346static int frr_grpc_module_very_late_init(struct thread *thread)
ec2ac5f2
RW
1347{
1348 static unsigned long port = GRPC_DEFAULT_PORT;
1349 const char *args = THIS_MODULE->load_args;
1350
1351 // Parse port number.
1352 if (args) {
1353 try {
1354 port = std::stoul(args);
1355 if (port < 1024)
1356 throw std::invalid_argument(
1357 "can't use privileged port");
1358 if (port > UINT16_MAX)
1359 throw std::invalid_argument(
1360 "port number is too big");
1361 } catch (std::exception &e) {
1362 flog_err(EC_LIB_GRPC_INIT,
1363 "%s: failed to parse port number: %s",
1364 __func__, e.what());
1365 goto error;
1366 }
1367 }
1368
1369 if (frr_grpc_init(&port) < 0)
1370 goto error;
1371
69ec5832
RW
1372 return 0;
1373
ec2ac5f2
RW
1374error:
1375 flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module");
1376 return -1;
1377}
1378
20b0a2ed
QY
1379static int frr_grpc_module_late_init(struct thread_master *tm)
1380{
1381 thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
1382 hook_register(frr_fini, frr_grpc_finish);
1383
1384 return 0;
1385}
1386
ec2ac5f2
RW
1387static int frr_grpc_module_init(void)
1388{
1389 hook_register(frr_late_init, frr_grpc_module_late_init);
1390
1391 return 0;
1392}
1393
1394FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
1395 .description = "FRR gRPC northbound module",
1396 .init = frr_grpc_module_init, )