]>
Commit | Line | Data |
---|---|---|
47a3a827 | 1 | // SPDX-License-Identifier: GPL-2.0-or-later |
ec2ac5f2 | 2 | // |
c85ecd64 | 3 | // Copyright (c) 2021-2022, LabN Consulting, L.L.C |
ec2ac5f2 RW |
4 | // Copyright (C) 2019 NetDEF, Inc. |
5 | // Renato Westphal | |
6 | // | |
ec2ac5f2 RW |
7 | |
8 | #include <zebra.h> | |
63d12a7d QY |
9 | #include <grpcpp/grpcpp.h> |
10 | #include "grpc/frr-northbound.grpc.pb.h" | |
ec2ac5f2 RW |
11 | |
12 | #include "log.h" | |
13 | #include "libfrr.h" | |
09781197 | 14 | #include "lib/version.h" |
24a58196 | 15 | #include "frrevent.h" |
ec2ac5f2 RW |
16 | #include "command.h" |
17 | #include "lib_errors.h" | |
18 | #include "northbound.h" | |
19 | #include "northbound_db.h" | |
0edcb505 | 20 | #include "frr_pthread.h" |
ec2ac5f2 RW |
21 | |
22 | #include <iostream> | |
23 | #include <sstream> | |
24 | #include <memory> | |
25 | #include <string> | |
26 | ||
ec2ac5f2 RW |
27 | #define GRPC_DEFAULT_PORT 50051 |
28 | ||
48c93061 CH |
29 | |
30 | // ------------------------------------------------------ | |
31 | // File Local Variables | |
32 | // ------------------------------------------------------ | |
33 | ||
ec2ac5f2 RW |
34 | /* |
35 | * NOTE: we can't use the FRR debugging infrastructure here since it uses | |
36 | * atomics and C++ has a different atomics API. Enable gRPC debugging | |
37 | * unconditionally until we figure out a way to solve this problem. | |
38 | */ | |
b680134e CH |
39 | static bool nb_dbg_client_grpc = 0; |
40 | ||
cd9d0537 | 41 | static struct event_loop *main_master; |
ec2ac5f2 | 42 | |
0edcb505 CS |
43 | static struct frr_pthread *fpt; |
44 | ||
83f6fce7 CH |
45 | static bool grpc_running; |
46 | ||
b680134e CH |
47 | #define grpc_debug(...) \ |
48 | do { \ | |
49 | if (nb_dbg_client_grpc) \ | |
50 | zlog_debug(__VA_ARGS__); \ | |
51 | } while (0) | |
52 | ||
53 | // ------------------------------------------------------ | |
54 | // New Types | |
55 | // ------------------------------------------------------ | |
56 | ||
57 | enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED }; | |
58 | const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"}; | |
59 | ||
60 | struct candidate { | |
61 | uint64_t id; | |
62 | struct nb_config *config; | |
63 | struct nb_transaction *transaction; | |
0edcb505 | 64 | }; |
ec2ac5f2 | 65 | |
b680134e CH |
66 | class Candidates |
67 | { | |
68 | public: | |
69 | ~Candidates(void) | |
70 | { | |
71 | // Delete candidates. | |
72 | for (auto it = _cdb.begin(); it != _cdb.end(); it++) | |
d3074a52 | 73 | delete_candidate(it->first); |
b680134e CH |
74 | } |
75 | ||
76 | struct candidate *create_candidate(void) | |
77 | { | |
78 | uint64_t id = ++_next_id; | |
79 | assert(id); // TODO: implement an algorithm for unique reusable | |
80 | // IDs. | |
81 | struct candidate *c = &_cdb[id]; | |
82 | c->id = id; | |
83 | c->config = nb_config_dup(running_config); | |
84 | c->transaction = NULL; | |
85 | ||
86 | return c; | |
87 | } | |
88 | ||
d3074a52 | 89 | bool contains(uint64_t candidate_id) |
b680134e | 90 | { |
d3074a52 CH |
91 | return _cdb.count(candidate_id) > 0; |
92 | } | |
93 | ||
94 | void delete_candidate(uint64_t candidate_id) | |
95 | { | |
96 | struct candidate *c = &_cdb[candidate_id]; | |
b680134e CH |
97 | char errmsg[BUFSIZ] = {0}; |
98 | ||
b680134e CH |
99 | nb_config_free(c->config); |
100 | if (c->transaction) | |
101 | nb_candidate_commit_abort(c->transaction, errmsg, | |
102 | sizeof(errmsg)); | |
96d434f8 | 103 | _cdb.erase(c->id); |
b680134e CH |
104 | } |
105 | ||
d3074a52 | 106 | struct candidate *get_candidate(uint64_t id) |
b680134e CH |
107 | { |
108 | return _cdb.count(id) == 0 ? NULL : &_cdb[id]; | |
109 | } | |
110 | ||
111 | private: | |
112 | uint64_t _next_id = 0; | |
d3074a52 | 113 | std::map<uint64_t, struct candidate> _cdb; |
b680134e | 114 | }; |
ecf9fb30 | 115 | |
48c93061 CH |
116 | /* |
117 | * RpcStateBase is the common base class used to track a gRPC RPC. | |
118 | */ | |
ecf9fb30 QY |
119 | class RpcStateBase |
120 | { | |
121 | public: | |
b680134e | 122 | virtual void do_request(::frr::Northbound::AsyncService *service, |
83f6fce7 CH |
123 | ::grpc::ServerCompletionQueue *cq, |
124 | bool no_copy) = 0; | |
ecf9fb30 | 125 | |
48c93061 | 126 | RpcStateBase(const char *name) : name(name){}; |
b680134e | 127 | |
48c93061 CH |
128 | virtual ~RpcStateBase() = default; |
129 | ||
130 | CallState get_state() const | |
131 | { | |
132 | return state; | |
133 | } | |
134 | ||
135 | bool is_initial_process() const | |
136 | { | |
137 | /* Will always be true for Unary */ | |
138 | return entered_state == CREATE; | |
139 | } | |
140 | ||
141 | // Returns "more" status, if false caller can delete | |
142 | bool run(frr::Northbound::AsyncService *service, | |
143 | grpc::ServerCompletionQueue *cq) | |
b680134e | 144 | { |
b680134e | 145 | /* |
48c93061 CH |
146 | * We enter in either CREATE or MORE state, and transition to |
147 | * PROCESS state. | |
148 | */ | |
149 | this->entered_state = this->state; | |
150 | this->state = PROCESS; | |
151 | grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name, | |
152 | call_states[this->entered_state], | |
153 | call_states[this->state]); | |
154 | /* | |
155 | * We schedule the callback on the main pthread, and wait for | |
156 | * the state to transition out of the PROCESS state. The new | |
157 | * state will either be MORE or FINISH. It will always be FINISH | |
158 | * for Unary RPCs. | |
b680134e | 159 | */ |
907a2395 | 160 | event_add_event(main_master, c_callback, (void *)this, 0, NULL); |
48c93061 | 161 | |
b680134e | 162 | pthread_mutex_lock(&this->cmux); |
48c93061 | 163 | while (this->state == PROCESS) |
b680134e CH |
164 | pthread_cond_wait(&this->cond, &this->cmux); |
165 | pthread_mutex_unlock(&this->cmux); | |
166 | ||
48c93061 CH |
167 | grpc_debug("%s RPC in %s on grpc-io-thread", name, |
168 | call_states[this->state]); | |
169 | ||
170 | if (this->state == FINISH) { | |
171 | /* | |
172 | * Server is done (FINISH) so prep to receive a new | |
173 | * request of this type. We could do this earlier but | |
174 | * that would mean we could be handling multiple same | |
175 | * type requests in parallel without limit. | |
176 | */ | |
177 | this->do_request(service, cq, false); | |
b680134e | 178 | } |
48c93061 | 179 | return true; |
b680134e CH |
180 | } |
181 | ||
48c93061 | 182 | protected: |
e6685141 | 183 | virtual CallState run_mainthread(struct event *thread) = 0; |
b680134e | 184 | |
e6685141 | 185 | static void c_callback(struct event *thread) |
b680134e | 186 | { |
e16d030c | 187 | auto _tag = static_cast<RpcStateBase *>(EVENT_ARG(thread)); |
b680134e CH |
188 | /* |
189 | * We hold the lock until the callback finishes and has updated | |
190 | * _tag->state, then we signal done and release. | |
191 | */ | |
192 | pthread_mutex_lock(&_tag->cmux); | |
193 | ||
194 | CallState enter_state = _tag->state; | |
48c93061 CH |
195 | grpc_debug("%s RPC: running %s on main thread", _tag->name, |
196 | call_states[enter_state]); | |
b680134e | 197 | |
48c93061 | 198 | _tag->state = _tag->run_mainthread(thread); |
b680134e | 199 | |
48c93061 | 200 | grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name, |
b680134e CH |
201 | call_states[enter_state], call_states[_tag->state]); |
202 | ||
203 | pthread_cond_signal(&_tag->cond); | |
204 | pthread_mutex_unlock(&_tag->cmux); | |
cc9f21da | 205 | return; |
ecf9fb30 QY |
206 | } |
207 | ||
208 | grpc::ServerContext ctx; | |
48c93061 CH |
209 | pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; |
210 | pthread_cond_t cond = PTHREAD_COND_INITIALIZER; | |
211 | CallState state = CREATE; | |
ad6bd536 | 212 | CallState entered_state = CREATE; |
48c93061 CH |
213 | |
214 | public: | |
215 | const char *name; | |
216 | }; | |
217 | ||
218 | /* | |
219 | * The UnaryRpcState class is used to track the execution of a Unary RPC. | |
220 | * | |
221 | * Template Args: | |
222 | * Q - the request type for a given unary RPC | |
223 | * S - the response type for a given unary RPC | |
224 | */ | |
225 | template <typename Q, typename S> class UnaryRpcState : public RpcStateBase | |
226 | { | |
227 | public: | |
228 | typedef void (frr::Northbound::AsyncService::*reqfunc_t)( | |
229 | ::grpc::ServerContext *, Q *, | |
230 | ::grpc::ServerAsyncResponseWriter<S> *, | |
231 | ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, | |
232 | void *); | |
233 | ||
234 | UnaryRpcState(Candidates *cdb, reqfunc_t rfunc, | |
235 | grpc::Status (*cb)(UnaryRpcState<Q, S> *), | |
236 | const char *name) | |
237 | : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb), | |
238 | responder(&ctx){}; | |
239 | ||
240 | void do_request(::frr::Northbound::AsyncService *service, | |
241 | ::grpc::ServerCompletionQueue *cq, | |
242 | bool no_copy) override | |
243 | { | |
244 | grpc_debug("%s, posting a request for: %s", __func__, name); | |
245 | auto copy = no_copy ? this | |
246 | : new UnaryRpcState(cdb, requestf, callback, | |
247 | name); | |
248 | (service->*requestf)(©->ctx, ©->request, | |
249 | ©->responder, cq, cq, copy); | |
250 | } | |
251 | ||
e6685141 | 252 | CallState run_mainthread(struct event *thread) override |
48c93061 CH |
253 | { |
254 | // Unary RPC are always finished, see "Unary" :) | |
255 | grpc::Status status = this->callback(this); | |
256 | responder.Finish(response, status, this); | |
257 | return FINISH; | |
258 | } | |
259 | ||
260 | Candidates *cdb; | |
261 | ||
ecf9fb30 QY |
262 | Q request; |
263 | S response; | |
264 | grpc::ServerAsyncResponseWriter<S> responder; | |
ecf9fb30 | 265 | |
48c93061 | 266 | grpc::Status (*callback)(UnaryRpcState<Q, S> *); |
c85ecd64 | 267 | reqfunc_t requestf = NULL; |
48c93061 | 268 | }; |
ecf9fb30 | 269 | |
48c93061 CH |
270 | /* |
271 | * The StreamRpcState class is used to track the execution of a Streaming RPC. | |
272 | * | |
273 | * Template Args: | |
274 | * Q - the request type for a given streaming RPC | |
275 | * S - the response type for a given streaming RPC | |
276 | * X - the type used to track the streaming state | |
277 | */ | |
278 | template <typename Q, typename S, typename X> | |
279 | class StreamRpcState : public RpcStateBase | |
280 | { | |
281 | public: | |
282 | typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( | |
283 | ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *, | |
284 | ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, | |
285 | void *); | |
b680134e | 286 | |
48c93061 CH |
287 | StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *), |
288 | const char *name) | |
289 | : RpcStateBase(name), requestsf(rfunc), callback(cb), | |
290 | async_responder(&ctx){}; | |
291 | ||
292 | void do_request(::frr::Northbound::AsyncService *service, | |
293 | ::grpc::ServerCompletionQueue *cq, | |
294 | bool no_copy) override | |
295 | { | |
296 | grpc_debug("%s, posting a request for: %s", __func__, name); | |
297 | auto copy = | |
298 | no_copy ? this | |
299 | : new StreamRpcState(requestsf, callback, name); | |
300 | (service->*requestsf)(©->ctx, ©->request, | |
301 | ©->async_responder, cq, cq, copy); | |
302 | } | |
303 | ||
e6685141 | 304 | CallState run_mainthread(struct event *thread) override |
48c93061 CH |
305 | { |
306 | if (this->callback(this)) | |
307 | return MORE; | |
308 | else | |
309 | return FINISH; | |
310 | } | |
311 | ||
312 | Q request; | |
313 | S response; | |
314 | grpc::ServerAsyncWriter<S> async_responder; | |
315 | ||
316 | bool (*callback)(StreamRpcState<Q, S, X> *); | |
317 | reqsfunc_t requestsf = NULL; | |
318 | ||
319 | X context; | |
ecf9fb30 QY |
320 | }; |
321 | ||
b680134e CH |
322 | // ------------------------------------------------------ |
323 | // Utility Functions | |
324 | // ------------------------------------------------------ | |
ecf9fb30 | 325 | |
b680134e CH |
326 | static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding) |
327 | { | |
328 | switch (encoding) { | |
329 | case frr::JSON: | |
330 | return LYD_JSON; | |
331 | case frr::XML: | |
332 | return LYD_XML; | |
333 | default: | |
334 | flog_err(EC_LIB_DEVELOPMENT, | |
335 | "%s: unknown data encoding format (%u)", __func__, | |
336 | encoding); | |
337 | exit(1); | |
338 | } | |
339 | } | |
ecf9fb30 | 340 | |
b680134e | 341 | static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path, |
fe095adc | 342 | const char *value) |
ec2ac5f2 | 343 | { |
fe095adc CH |
344 | LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value, |
345 | LYD_NEW_PATH_UPDATE, &dnode); | |
b680134e CH |
346 | if (err != LY_SUCCESS) { |
347 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s", | |
348 | __func__, ly_errmsg(ly_native_ctx)); | |
349 | return -1; | |
ec2ac5f2 RW |
350 | } |
351 | ||
b680134e CH |
352 | return 0; |
353 | } | |
354 | ||
355 | static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path) | |
356 | { | |
357 | dnode = yang_dnode_get(dnode, path.c_str()); | |
358 | if (!dnode) | |
359 | return -1; | |
360 | ||
361 | lyd_free_tree(dnode); | |
362 | ||
363 | return 0; | |
364 | } | |
365 | ||
366 | static LY_ERR data_tree_from_dnode(frr::DataTree *dt, | |
367 | const struct lyd_node *dnode, | |
368 | LYD_FORMAT lyd_format, bool with_defaults) | |
369 | { | |
370 | char *strp; | |
371 | int options = 0; | |
372 | ||
373 | SET_FLAG(options, LYD_PRINT_WITHSIBLINGS); | |
374 | if (with_defaults) | |
375 | SET_FLAG(options, LYD_PRINT_WD_ALL); | |
376 | else | |
377 | SET_FLAG(options, LYD_PRINT_WD_TRIM); | |
378 | ||
379 | LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options); | |
380 | if (err == LY_SUCCESS) { | |
381 | if (strp) { | |
382 | dt->set_data(strp); | |
383 | free(strp); | |
384 | } | |
ec2ac5f2 | 385 | } |
b680134e CH |
386 | return err; |
387 | } | |
ec2ac5f2 | 388 | |
b680134e CH |
389 | static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt, |
390 | bool config_only) | |
391 | { | |
392 | struct lyd_node *dnode; | |
393 | int options, opt2; | |
394 | LY_ERR err; | |
395 | ||
396 | if (config_only) { | |
397 | options = LYD_PARSE_NO_STATE; | |
398 | opt2 = LYD_VALIDATE_NO_STATE; | |
399 | } else { | |
400 | options = LYD_PARSE_STRICT; | |
401 | opt2 = 0; | |
402 | } | |
403 | ||
404 | err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(), | |
405 | encoding2lyd_format(dt->encoding()), options, | |
406 | opt2, &dnode); | |
407 | if (err != LY_SUCCESS) { | |
408 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s", | |
409 | __func__, ly_errmsg(ly_native_ctx)); | |
410 | } | |
411 | return dnode; | |
412 | } | |
413 | ||
414 | static struct lyd_node *get_dnode_config(const std::string &path) | |
415 | { | |
416 | struct lyd_node *dnode; | |
417 | ||
0f538858 RZ |
418 | if (!yang_dnode_exists(running_config->dnode, |
419 | path.empty() ? NULL : path.c_str())) | |
420 | return NULL; | |
421 | ||
b680134e CH |
422 | dnode = yang_dnode_get(running_config->dnode, |
423 | path.empty() ? NULL : path.c_str()); | |
424 | if (dnode) | |
425 | dnode = yang_dnode_dup(dnode); | |
426 | ||
427 | return dnode; | |
428 | } | |
429 | ||
430 | static int get_oper_data_cb(const struct lysc_node *snode, | |
431 | struct yang_translator *translator, | |
432 | struct yang_data *data, void *arg) | |
433 | { | |
434 | struct lyd_node *dnode = static_cast<struct lyd_node *>(arg); | |
435 | int ret = yang_dnode_edit(dnode, data->xpath, data->value); | |
436 | yang_data_free(data); | |
437 | ||
438 | return (ret == 0) ? NB_OK : NB_ERR; | |
439 | } | |
440 | ||
441 | static struct lyd_node *get_dnode_state(const std::string &path) | |
442 | { | |
443 | struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false); | |
444 | if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode) | |
445 | != NB_OK) { | |
446 | yang_dnode_free(dnode); | |
447 | return NULL; | |
448 | } | |
449 | ||
450 | return dnode; | |
451 | } | |
452 | ||
453 | static grpc::Status get_path(frr::DataTree *dt, const std::string &path, | |
454 | int type, LYD_FORMAT lyd_format, | |
455 | bool with_defaults) | |
456 | { | |
457 | struct lyd_node *dnode_config = NULL; | |
458 | struct lyd_node *dnode_state = NULL; | |
459 | struct lyd_node *dnode_final; | |
460 | ||
461 | // Configuration data. | |
462 | if (type == frr::GetRequest_DataType_ALL | |
463 | || type == frr::GetRequest_DataType_CONFIG) { | |
464 | dnode_config = get_dnode_config(path); | |
465 | if (!dnode_config) | |
466 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
467 | "Data path not found"); | |
468 | } | |
469 | ||
470 | // Operational data. | |
471 | if (type == frr::GetRequest_DataType_ALL | |
472 | || type == frr::GetRequest_DataType_STATE) { | |
473 | dnode_state = get_dnode_state(path); | |
474 | if (!dnode_state) { | |
475 | if (dnode_config) | |
476 | yang_dnode_free(dnode_config); | |
477 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
478 | "Failed to fetch operational data"); | |
479 | } | |
480 | } | |
481 | ||
482 | switch (type) { | |
483 | case frr::GetRequest_DataType_ALL: | |
484 | // | |
485 | // Combine configuration and state data into a single | |
486 | // dnode. | |
487 | // | |
488 | if (lyd_merge_siblings(&dnode_state, dnode_config, | |
489 | LYD_MERGE_DESTRUCT) | |
490 | != LY_SUCCESS) { | |
491 | yang_dnode_free(dnode_state); | |
492 | yang_dnode_free(dnode_config); | |
493 | return grpc::Status( | |
494 | grpc::StatusCode::INTERNAL, | |
495 | "Failed to merge configuration and state data", | |
496 | ly_errmsg(ly_native_ctx)); | |
ecf9fb30 | 497 | } |
b680134e CH |
498 | |
499 | dnode_final = dnode_state; | |
500 | break; | |
501 | case frr::GetRequest_DataType_CONFIG: | |
502 | dnode_final = dnode_config; | |
503 | break; | |
504 | case frr::GetRequest_DataType_STATE: | |
505 | dnode_final = dnode_state; | |
506 | break; | |
ecf9fb30 QY |
507 | } |
508 | ||
b680134e CH |
509 | // Validate data to create implicit default nodes if necessary. |
510 | int validate_opts = 0; | |
511 | if (type == frr::GetRequest_DataType_CONFIG) | |
512 | validate_opts = LYD_VALIDATE_NO_STATE; | |
513 | else | |
514 | validate_opts = 0; | |
515 | ||
516 | LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx, | |
517 | validate_opts, NULL); | |
518 | ||
519 | if (err) | |
520 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s", | |
521 | __func__, ly_errmsg(ly_native_ctx)); | |
522 | // Dump data using the requested format. | |
523 | if (!err) | |
524 | err = data_tree_from_dnode(dt, dnode_final, lyd_format, | |
525 | with_defaults); | |
526 | yang_dnode_free(dnode_final); | |
527 | if (err) | |
528 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
529 | "Failed to dump data"); | |
530 | return grpc::Status::OK; | |
531 | } | |
532 | ||
533 | ||
534 | // ------------------------------------------------------ | |
535 | // RPC Callback Functions: run on main thread | |
536 | // ------------------------------------------------------ | |
537 | ||
48c93061 CH |
538 | grpc::Status HandleUnaryGetCapabilities( |
539 | UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse> | |
540 | *tag) | |
b680134e | 541 | { |
48c93061 | 542 | grpc_debug("%s: entered", __func__); |
b680134e CH |
543 | |
544 | // Response: string frr_version = 1; | |
545 | tag->response.set_frr_version(FRR_VERSION); | |
546 | ||
547 | // Response: bool rollback_support = 2; | |
ec2ac5f2 | 548 | #ifdef HAVE_CONFIG_ROLLBACKS |
b680134e | 549 | tag->response.set_rollback_support(true); |
ec2ac5f2 | 550 | #else |
b680134e | 551 | tag->response.set_rollback_support(false); |
ec2ac5f2 | 552 | #endif |
b680134e CH |
553 | // Response: repeated ModuleData supported_modules = 3; |
554 | struct yang_module *module; | |
555 | RB_FOREACH (module, yang_modules, &yang_modules) { | |
556 | auto m = tag->response.add_supported_modules(); | |
557 | ||
558 | m->set_name(module->name); | |
559 | if (module->info->revision) | |
560 | m->set_revision(module->info->revision); | |
561 | m->set_organization(module->info->org); | |
562 | } | |
ec2ac5f2 | 563 | |
b680134e CH |
564 | // Response: repeated Encoding supported_encodings = 4; |
565 | tag->response.add_supported_encodings(frr::JSON); | |
566 | tag->response.add_supported_encodings(frr::XML); | |
ec2ac5f2 | 567 | |
48c93061 | 568 | return grpc::Status::OK; |
b680134e | 569 | } |
ec2ac5f2 | 570 | |
48c93061 CH |
571 | // Define the context variable type for this streaming handler |
572 | typedef std::list<std::string> GetContextType; | |
b680134e | 573 | |
48c93061 CH |
574 | bool HandleStreamingGet( |
575 | StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag) | |
576 | { | |
577 | grpc_debug("%s: entered", __func__); | |
ec2ac5f2 | 578 | |
48c93061 CH |
579 | auto mypathps = &tag->context; |
580 | if (tag->is_initial_process()) { | |
581 | // Fill our context container first time through | |
582 | grpc_debug("%s: initialize streaming state", __func__); | |
b680134e CH |
583 | auto paths = tag->request.path(); |
584 | for (const std::string &path : paths) { | |
48c93061 | 585 | mypathps->push_back(std::string(path)); |
ecf9fb30 | 586 | } |
b680134e | 587 | } |
ecf9fb30 | 588 | |
b680134e CH |
589 | // Request: DataType type = 1; |
590 | int type = tag->request.type(); | |
591 | // Request: Encoding encoding = 2; | |
592 | frr::Encoding encoding = tag->request.encoding(); | |
593 | // Request: bool with_defaults = 3; | |
594 | bool with_defaults = tag->request.with_defaults(); | |
595 | ||
b680134e CH |
596 | if (mypathps->empty()) { |
597 | tag->async_responder.Finish(grpc::Status::OK, tag); | |
48c93061 | 598 | return false; |
ec2ac5f2 RW |
599 | } |
600 | ||
b680134e CH |
601 | frr::GetResponse response; |
602 | grpc::Status status; | |
603 | ||
604 | // Response: int64 timestamp = 1; | |
605 | response.set_timestamp(time(NULL)); | |
606 | ||
607 | // Response: DataTree data = 2; | |
608 | auto *data = response.mutable_data(); | |
609 | data->set_encoding(tag->request.encoding()); | |
610 | status = get_path(data, mypathps->back().c_str(), type, | |
611 | encoding2lyd_format(encoding), with_defaults); | |
612 | ||
613 | if (!status.ok()) { | |
614 | tag->async_responder.WriteAndFinish( | |
615 | response, grpc::WriteOptions(), status, tag); | |
48c93061 | 616 | return false; |
b680134e CH |
617 | } |
618 | ||
619 | mypathps->pop_back(); | |
620 | if (mypathps->empty()) { | |
621 | tag->async_responder.WriteAndFinish( | |
622 | response, grpc::WriteOptions(), grpc::Status::OK, tag); | |
48c93061 | 623 | return false; |
b680134e CH |
624 | } else { |
625 | tag->async_responder.Write(response, tag); | |
48c93061 | 626 | return true; |
b680134e CH |
627 | } |
628 | } | |
629 | ||
48c93061 CH |
630 | grpc::Status HandleUnaryCreateCandidate( |
631 | UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse> | |
632 | *tag) | |
b680134e | 633 | { |
48c93061 | 634 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 635 | |
b680134e | 636 | struct candidate *candidate = tag->cdb->create_candidate(); |
48c93061 CH |
637 | if (!candidate) |
638 | return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, | |
639 | "Can't create candidate configuration"); | |
640 | tag->response.set_candidate_id(candidate->id); | |
641 | return grpc::Status::OK; | |
b680134e CH |
642 | } |
643 | ||
48c93061 CH |
644 | grpc::Status HandleUnaryDeleteCandidate( |
645 | UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse> | |
646 | *tag) | |
b680134e | 647 | { |
48c93061 | 648 | grpc_debug("%s: entered", __func__); |
b680134e | 649 | |
b680134e CH |
650 | uint32_t candidate_id = tag->request.candidate_id(); |
651 | ||
652 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
653 | ||
48c93061 CH |
654 | if (!tag->cdb->contains(candidate_id)) |
655 | return grpc::Status(grpc::StatusCode::NOT_FOUND, | |
656 | "candidate configuration not found"); | |
657 | tag->cdb->delete_candidate(candidate_id); | |
658 | return grpc::Status::OK; | |
b680134e CH |
659 | } |
660 | ||
48c93061 CH |
661 | grpc::Status HandleUnaryUpdateCandidate( |
662 | UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse> | |
663 | *tag) | |
b680134e | 664 | { |
48c93061 | 665 | grpc_debug("%s: entered", __func__); |
ecf9fb30 | 666 | |
b680134e CH |
667 | uint32_t candidate_id = tag->request.candidate_id(); |
668 | ||
669 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
670 | ||
671 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); | |
672 | ||
673 | if (!candidate) | |
48c93061 CH |
674 | return grpc::Status(grpc::StatusCode::NOT_FOUND, |
675 | "candidate configuration not found"); | |
676 | if (candidate->transaction) | |
677 | return grpc::Status( | |
678 | grpc::StatusCode::FAILED_PRECONDITION, | |
679 | "candidate is in the middle of a transaction"); | |
680 | if (nb_candidate_update(candidate->config) != NB_OK) | |
681 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
682 | "failed to update candidate configuration"); | |
b680134e | 683 | |
48c93061 | 684 | return grpc::Status::OK; |
b680134e CH |
685 | } |
686 | ||
48c93061 CH |
687 | grpc::Status HandleUnaryEditCandidate( |
688 | UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> | |
689 | *tag) | |
b680134e | 690 | { |
48c93061 | 691 | grpc_debug("%s: entered", __func__); |
b680134e | 692 | |
b680134e | 693 | uint32_t candidate_id = tag->request.candidate_id(); |
ec2ac5f2 | 694 | |
b680134e | 695 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); |
ec2ac5f2 | 696 | |
b680134e | 697 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); |
48c93061 CH |
698 | if (!candidate) |
699 | return grpc::Status(grpc::StatusCode::NOT_FOUND, | |
700 | "candidate configuration not found"); | |
ec2ac5f2 | 701 | |
b680134e CH |
702 | struct nb_config *candidate_tmp = nb_config_dup(candidate->config); |
703 | ||
704 | auto pvs = tag->request.update(); | |
705 | for (const frr::PathValue &pv : pvs) { | |
fe095adc CH |
706 | if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), |
707 | pv.value().c_str()) != 0) { | |
b680134e CH |
708 | nb_config_free(candidate_tmp); |
709 | ||
48c93061 CH |
710 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, |
711 | "Failed to update \"" + pv.path() + | |
712 | "\""); | |
ecf9fb30 | 713 | } |
ec2ac5f2 RW |
714 | } |
715 | ||
b680134e CH |
716 | pvs = tag->request.delete_(); |
717 | for (const frr::PathValue &pv : pvs) { | |
718 | if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) { | |
719 | nb_config_free(candidate_tmp); | |
48c93061 CH |
720 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, |
721 | "Failed to remove \"" + pv.path() + | |
722 | "\""); | |
ec2ac5f2 | 723 | } |
ec2ac5f2 RW |
724 | } |
725 | ||
b680134e CH |
726 | // No errors, accept all changes. |
727 | nb_config_replace(candidate->config, candidate_tmp, false); | |
48c93061 | 728 | return grpc::Status::OK; |
b680134e CH |
729 | } |
730 | ||
48c93061 CH |
731 | grpc::Status HandleUnaryLoadToCandidate( |
732 | UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse> | |
733 | *tag) | |
b680134e | 734 | { |
48c93061 | 735 | grpc_debug("%s: entered", __func__); |
b680134e | 736 | |
b680134e CH |
737 | uint32_t candidate_id = tag->request.candidate_id(); |
738 | ||
739 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
740 | ||
741 | // Request: LoadType type = 2; | |
742 | int load_type = tag->request.type(); | |
743 | // Request: DataTree config = 3; | |
744 | auto config = tag->request.config(); | |
745 | ||
b680134e | 746 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); |
48c93061 CH |
747 | if (!candidate) |
748 | return grpc::Status(grpc::StatusCode::NOT_FOUND, | |
749 | "candidate configuration not found"); | |
ec2ac5f2 | 750 | |
b680134e | 751 | struct lyd_node *dnode = dnode_from_data_tree(&config, true); |
48c93061 CH |
752 | if (!dnode) |
753 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
754 | "Failed to parse the configuration"); | |
ec2ac5f2 | 755 | |
b680134e | 756 | struct nb_config *loaded_config = nb_config_new(dnode); |
b680134e CH |
757 | if (load_type == frr::LoadToCandidateRequest::REPLACE) |
758 | nb_config_replace(candidate->config, loaded_config, false); | |
48c93061 CH |
759 | else if (nb_config_merge(candidate->config, loaded_config, false) != |
760 | NB_OK) | |
761 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
762 | "Failed to merge the loaded configuration"); | |
ec2ac5f2 | 763 | |
48c93061 | 764 | return grpc::Status::OK; |
b680134e CH |
765 | } |
766 | ||
48c93061 CH |
767 | grpc::Status |
768 | HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag) | |
b680134e | 769 | { |
48c93061 | 770 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 771 | |
b680134e CH |
772 | // Request: uint32 candidate_id = 1; |
773 | uint32_t candidate_id = tag->request.candidate_id(); | |
774 | ||
775 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
776 | ||
777 | // Request: Phase phase = 2; | |
778 | int phase = tag->request.phase(); | |
779 | // Request: string comment = 3; | |
780 | const std::string comment = tag->request.comment(); | |
781 | ||
782 | // Find candidate configuration. | |
783 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); | |
48c93061 CH |
784 | if (!candidate) |
785 | return grpc::Status(grpc::StatusCode::NOT_FOUND, | |
786 | "candidate configuration not found"); | |
b680134e CH |
787 | |
788 | int ret = NB_OK; | |
789 | uint32_t transaction_id = 0; | |
790 | ||
791 | // Check for misuse of the two-phase commit protocol. | |
792 | switch (phase) { | |
793 | case frr::CommitRequest::PREPARE: | |
794 | case frr::CommitRequest::ALL: | |
48c93061 CH |
795 | if (candidate->transaction) |
796 | return grpc::Status( | |
797 | grpc::StatusCode::FAILED_PRECONDITION, | |
798 | "candidate is in the middle of a transaction"); | |
b680134e CH |
799 | break; |
800 | case frr::CommitRequest::ABORT: | |
801 | case frr::CommitRequest::APPLY: | |
48c93061 CH |
802 | if (!candidate->transaction) |
803 | return grpc::Status( | |
804 | grpc::StatusCode::FAILED_PRECONDITION, | |
805 | "no transaction in progress"); | |
b680134e CH |
806 | break; |
807 | default: | |
808 | break; | |
ec2ac5f2 RW |
809 | } |
810 | ||
ecf9fb30 | 811 | |
b680134e CH |
812 | // Execute the user request. |
813 | struct nb_context context = {}; | |
814 | context.client = NB_CLIENT_GRPC; | |
815 | char errmsg[BUFSIZ] = {0}; | |
816 | ||
817 | switch (phase) { | |
818 | case frr::CommitRequest::VALIDATE: | |
819 | grpc_debug("`-> Performing VALIDATE"); | |
820 | ret = nb_candidate_validate(&context, candidate->config, errmsg, | |
821 | sizeof(errmsg)); | |
822 | break; | |
823 | case frr::CommitRequest::PREPARE: | |
824 | grpc_debug("`-> Performing PREPARE"); | |
825 | ret = nb_candidate_commit_prepare( | |
41ef7327 | 826 | context, candidate->config, comment.c_str(), |
7d65b7b7 CH |
827 | &candidate->transaction, false, false, errmsg, |
828 | sizeof(errmsg)); | |
b680134e CH |
829 | break; |
830 | case frr::CommitRequest::ABORT: | |
831 | grpc_debug("`-> Performing ABORT"); | |
832 | nb_candidate_commit_abort(candidate->transaction, errmsg, | |
833 | sizeof(errmsg)); | |
834 | break; | |
835 | case frr::CommitRequest::APPLY: | |
836 | grpc_debug("`-> Performing APPLY"); | |
837 | nb_candidate_commit_apply(candidate->transaction, true, | |
838 | &transaction_id, errmsg, | |
839 | sizeof(errmsg)); | |
840 | break; | |
841 | case frr::CommitRequest::ALL: | |
842 | grpc_debug("`-> Performing ALL"); | |
41ef7327 | 843 | ret = nb_candidate_commit(context, candidate->config, true, |
b680134e CH |
844 | comment.c_str(), &transaction_id, |
845 | errmsg, sizeof(errmsg)); | |
846 | break; | |
847 | } | |
ec2ac5f2 | 848 | |
b680134e CH |
849 | // Map northbound error codes to gRPC status codes. |
850 | grpc::Status status; | |
851 | switch (ret) { | |
852 | case NB_OK: | |
853 | status = grpc::Status::OK; | |
854 | break; | |
855 | case NB_ERR_NO_CHANGES: | |
856 | status = grpc::Status(grpc::StatusCode::ABORTED, errmsg); | |
857 | break; | |
858 | case NB_ERR_LOCKED: | |
859 | status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg); | |
860 | break; | |
861 | case NB_ERR_VALIDATION: | |
862 | status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
863 | errmsg); | |
864 | break; | |
865 | case NB_ERR_RESOURCE: | |
866 | status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, | |
867 | errmsg); | |
868 | break; | |
869 | case NB_ERR: | |
870 | default: | |
871 | status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg); | |
872 | break; | |
873 | } | |
ec2ac5f2 | 874 | |
b680134e CH |
875 | grpc_debug("`-> Result: %s (message: '%s')", |
876 | nb_err_name((enum nb_error)ret), errmsg); | |
877 | ||
878 | if (ret == NB_OK) { | |
879 | // Response: uint32 transaction_id = 1; | |
880 | if (transaction_id) | |
881 | tag->response.set_transaction_id(transaction_id); | |
ec2ac5f2 | 882 | } |
b680134e CH |
883 | if (strlen(errmsg) > 0) |
884 | tag->response.set_error_message(errmsg); | |
ec2ac5f2 | 885 | |
48c93061 | 886 | return status; |
b680134e | 887 | } |
ec2ac5f2 | 888 | |
48c93061 CH |
889 | grpc::Status HandleUnaryLockConfig( |
890 | UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) | |
b680134e | 891 | { |
48c93061 | 892 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 893 | |
48c93061 CH |
894 | if (nb_running_lock(NB_CLIENT_GRPC, NULL)) |
895 | return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, | |
896 | "running configuration is locked already"); | |
897 | return grpc::Status::OK; | |
b680134e | 898 | } |
ec2ac5f2 | 899 | |
48c93061 CH |
900 | grpc::Status HandleUnaryUnlockConfig( |
901 | UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag) | |
b680134e | 902 | { |
48c93061 | 903 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 904 | |
48c93061 CH |
905 | if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) |
906 | return grpc::Status( | |
907 | grpc::StatusCode::FAILED_PRECONDITION, | |
908 | "failed to unlock the running configuration"); | |
909 | return grpc::Status::OK; | |
b680134e | 910 | } |
ec2ac5f2 | 911 | |
b680134e CH |
912 | static void list_transactions_cb(void *arg, int transaction_id, |
913 | const char *client_name, const char *date, | |
914 | const char *comment) | |
915 | { | |
916 | auto list = static_cast<std::list< | |
917 | std::tuple<int, std::string, std::string, std::string>> *>(arg); | |
918 | list->push_back( | |
919 | std::make_tuple(transaction_id, std::string(client_name), | |
920 | std::string(date), std::string(comment))); | |
921 | } | |
922 | ||
48c93061 CH |
923 | // Define the context variable type for this streaming handler |
924 | typedef std::list<std::tuple<int, std::string, std::string, std::string>> | |
925 | ListTransactionsContextType; | |
b680134e | 926 | |
48c93061 CH |
927 | bool HandleStreamingListTransactions( |
928 | StreamRpcState<frr::ListTransactionsRequest, | |
929 | frr::ListTransactionsResponse, | |
930 | ListTransactionsContextType> *tag) | |
931 | { | |
932 | grpc_debug("%s: entered", __func__); | |
933 | ||
934 | auto list = &tag->context; | |
935 | if (tag->is_initial_process()) { | |
936 | grpc_debug("%s: initialize streaming state", __func__); | |
937 | // Fill our context container first time through | |
938 | nb_db_transactions_iterate(list_transactions_cb, list); | |
939 | list->push_back(std::make_tuple( | |
b680134e CH |
940 | 0xFFFF, std::string("fake client"), |
941 | std::string("fake date"), std::string("fake comment"))); | |
48c93061 CH |
942 | list->push_back(std::make_tuple(0xFFFE, |
943 | std::string("fake client2"), | |
944 | std::string("fake date"), | |
945 | std::string("fake comment2"))); | |
ec2ac5f2 RW |
946 | } |
947 | ||
b680134e CH |
948 | if (list->empty()) { |
949 | tag->async_responder.Finish(grpc::Status::OK, tag); | |
48c93061 | 950 | return false; |
ec2ac5f2 RW |
951 | } |
952 | ||
b680134e | 953 | auto item = list->back(); |
ec2ac5f2 | 954 | |
b680134e | 955 | frr::ListTransactionsResponse response; |
ec2ac5f2 | 956 | |
b680134e CH |
957 | // Response: uint32 id = 1; |
958 | response.set_id(std::get<0>(item)); | |
ec2ac5f2 | 959 | |
b680134e CH |
960 | // Response: string client = 2; |
961 | response.set_client(std::get<1>(item).c_str()); | |
962 | ||
963 | // Response: string date = 3; | |
964 | response.set_date(std::get<2>(item).c_str()); | |
965 | ||
966 | // Response: string comment = 4; | |
967 | response.set_comment(std::get<3>(item).c_str()); | |
ec2ac5f2 | 968 | |
b680134e CH |
969 | list->pop_back(); |
970 | if (list->empty()) { | |
971 | tag->async_responder.WriteAndFinish( | |
972 | response, grpc::WriteOptions(), grpc::Status::OK, tag); | |
48c93061 | 973 | return false; |
b680134e CH |
974 | } else { |
975 | tag->async_responder.Write(response, tag); | |
48c93061 | 976 | return true; |
ec2ac5f2 | 977 | } |
b680134e | 978 | } |
ec2ac5f2 | 979 | |
48c93061 CH |
980 | grpc::Status HandleUnaryGetTransaction( |
981 | UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse> | |
982 | *tag) | |
b680134e | 983 | { |
48c93061 | 984 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 985 | |
b680134e CH |
986 | // Request: uint32 transaction_id = 1; |
987 | uint32_t transaction_id = tag->request.transaction_id(); | |
988 | // Request: Encoding encoding = 2; | |
989 | frr::Encoding encoding = tag->request.encoding(); | |
990 | // Request: bool with_defaults = 3; | |
991 | bool with_defaults = tag->request.with_defaults(); | |
992 | ||
993 | grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__, | |
994 | transaction_id, encoding); | |
995 | ||
996 | struct nb_config *nb_config; | |
997 | ||
998 | // Load configuration from the transactions database. | |
999 | nb_config = nb_db_transaction_load(transaction_id); | |
48c93061 CH |
1000 | if (!nb_config) |
1001 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1002 | "Transaction not found"); | |
ec2ac5f2 | 1003 | |
b680134e CH |
1004 | // Response: DataTree config = 1; |
1005 | auto config = tag->response.mutable_config(); | |
1006 | config->set_encoding(encoding); | |
1007 | ||
1008 | // Dump data using the requested format. | |
1009 | if (data_tree_from_dnode(config, nb_config->dnode, | |
1010 | encoding2lyd_format(encoding), with_defaults) | |
1011 | != 0) { | |
1012 | nb_config_free(nb_config); | |
48c93061 CH |
1013 | return grpc::Status(grpc::StatusCode::INTERNAL, |
1014 | "Failed to dump data"); | |
ec2ac5f2 RW |
1015 | } |
1016 | ||
b680134e | 1017 | nb_config_free(nb_config); |
ec2ac5f2 | 1018 | |
48c93061 | 1019 | return grpc::Status::OK; |
b680134e | 1020 | } |
ec2ac5f2 | 1021 | |
48c93061 CH |
1022 | grpc::Status HandleUnaryExecute( |
1023 | UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) | |
b680134e | 1024 | { |
48c93061 | 1025 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 1026 | |
b680134e CH |
1027 | struct nb_node *nb_node; |
1028 | struct list *input_list; | |
1029 | struct list *output_list; | |
1030 | struct listnode *node; | |
1031 | struct yang_data *data; | |
1032 | const char *xpath; | |
1033 | char errmsg[BUFSIZ] = {0}; | |
1034 | ||
1035 | // Request: string path = 1; | |
1036 | xpath = tag->request.path().c_str(); | |
1037 | ||
1038 | grpc_debug("%s(path: \"%s\")", __func__, xpath); | |
1039 | ||
48c93061 CH |
1040 | if (tag->request.path().empty()) |
1041 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1042 | "Data path is empty"); | |
0fe5b904 | 1043 | |
b680134e | 1044 | nb_node = nb_node_find(xpath); |
48c93061 CH |
1045 | if (!nb_node) |
1046 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1047 | "Unknown data path"); | |
ec2ac5f2 | 1048 | |
b680134e CH |
1049 | input_list = yang_data_list_new(); |
1050 | output_list = yang_data_list_new(); | |
ec2ac5f2 | 1051 | |
b680134e CH |
1052 | // Read input parameters. |
1053 | auto input = tag->request.input(); | |
1054 | for (const frr::PathValue &pv : input) { | |
1055 | // Request: repeated PathValue input = 2; | |
1056 | data = yang_data_new(pv.path().c_str(), pv.value().c_str()); | |
1057 | listnode_add(input_list, data); | |
1058 | } | |
ec2ac5f2 | 1059 | |
b680134e CH |
1060 | // Execute callback registered for this XPath. |
1061 | if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg, | |
1062 | sizeof(errmsg)) | |
1063 | != NB_OK) { | |
1064 | flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s", | |
1065 | __func__, xpath); | |
1066 | list_delete(&input_list); | |
1067 | list_delete(&output_list); | |
1068 | ||
48c93061 | 1069 | return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"); |
ec2ac5f2 | 1070 | } |
b680134e CH |
1071 | |
1072 | // Process output parameters. | |
1073 | for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { | |
1074 | // Response: repeated PathValue output = 1; | |
1075 | frr::PathValue *pv = tag->response.add_output(); | |
1076 | pv->set_path(data->xpath); | |
1077 | pv->set_value(data->value); | |
1078 | } | |
1079 | ||
1080 | // Release memory. | |
1081 | list_delete(&input_list); | |
1082 | list_delete(&output_list); | |
1083 | ||
48c93061 | 1084 | return grpc::Status::OK; |
b680134e CH |
1085 | } |
1086 | ||
1087 | // ------------------------------------------------------ | |
1088 | // Thread Initialization and Run Functions | |
1089 | // ------------------------------------------------------ | |
1090 | ||
1091 | ||
1092 | #define REQUEST_NEWRPC(NAME, cdb) \ | |
1093 | do { \ | |
48c93061 CH |
1094 | auto _rpcState = new UnaryRpcState<frr::NAME##Request, \ |
1095 | frr::NAME##Response>( \ | |
b680134e CH |
1096 | (cdb), &frr::Northbound::AsyncService::Request##NAME, \ |
1097 | &HandleUnary##NAME, #NAME); \ | |
83f6fce7 | 1098 | _rpcState->do_request(&service, cq.get(), true); \ |
b680134e CH |
1099 | } while (0) |
1100 | ||
48c93061 | 1101 | #define REQUEST_NEWRPC_STREAMING(NAME) \ |
b680134e | 1102 | do { \ |
48c93061 CH |
1103 | auto _rpcState = new StreamRpcState<frr::NAME##Request, \ |
1104 | frr::NAME##Response, \ | |
1105 | NAME##ContextType>( \ | |
1106 | &frr::Northbound::AsyncService::Request##NAME, \ | |
b680134e | 1107 | &HandleStreaming##NAME, #NAME); \ |
83f6fce7 | 1108 | _rpcState->do_request(&service, cq.get(), true); \ |
b680134e CH |
1109 | } while (0) |
1110 | ||
1111 | struct grpc_pthread_attr { | |
1112 | struct frr_pthread_attr attr; | |
1113 | unsigned long port; | |
ec2ac5f2 RW |
1114 | }; |
1115 | ||
de800c10 | 1116 | // Capture these objects so we can try to shut down cleanly |
83f6fce7 CH |
1117 | static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER; |
1118 | static grpc::Server *s_server; | |
de800c10 | 1119 | |
ec2ac5f2 RW |
1120 | static void *grpc_pthread_start(void *arg) |
1121 | { | |
0edcb505 | 1122 | struct frr_pthread *fpt = static_cast<frr_pthread *>(arg); |
b680134e CH |
1123 | uint port = (uint) reinterpret_cast<intptr_t>(fpt->data); |
1124 | ||
1125 | Candidates candidates; | |
1126 | grpc::ServerBuilder builder; | |
1127 | std::stringstream server_address; | |
83f6fce7 | 1128 | frr::Northbound::AsyncService service; |
ec2ac5f2 | 1129 | |
0edcb505 CS |
1130 | frr_pthread_set_name(fpt); |
1131 | ||
b680134e CH |
1132 | server_address << "0.0.0.0:" << port; |
1133 | builder.AddListeningPort(server_address.str(), | |
1134 | grpc::InsecureServerCredentials()); | |
83f6fce7 | 1135 | builder.RegisterService(&service); |
673e4407 RZ |
1136 | builder.AddChannelArgument( |
1137 | GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); | |
83f6fce7 CH |
1138 | std::unique_ptr<grpc::ServerCompletionQueue> cq = |
1139 | builder.AddCompletionQueue(); | |
1140 | std::unique_ptr<grpc::Server> server = builder.BuildAndStart(); | |
1141 | s_server = server.get(); | |
1142 | ||
ad6bd536 | 1143 | pthread_mutex_lock(&s_server_lock); // Make coverity happy |
83f6fce7 | 1144 | grpc_running = true; |
ad6bd536 | 1145 | pthread_mutex_unlock(&s_server_lock); // Make coverity happy |
b680134e | 1146 | |
48c93061 | 1147 | /* Schedule unary RPC handlers */ |
b680134e CH |
1148 | REQUEST_NEWRPC(GetCapabilities, NULL); |
1149 | REQUEST_NEWRPC(CreateCandidate, &candidates); | |
1150 | REQUEST_NEWRPC(DeleteCandidate, &candidates); | |
1151 | REQUEST_NEWRPC(UpdateCandidate, &candidates); | |
1152 | REQUEST_NEWRPC(EditCandidate, &candidates); | |
1153 | REQUEST_NEWRPC(LoadToCandidate, &candidates); | |
1154 | REQUEST_NEWRPC(Commit, &candidates); | |
1155 | REQUEST_NEWRPC(GetTransaction, NULL); | |
1156 | REQUEST_NEWRPC(LockConfig, NULL); | |
1157 | REQUEST_NEWRPC(UnlockConfig, NULL); | |
1158 | REQUEST_NEWRPC(Execute, NULL); | |
48c93061 CH |
1159 | |
1160 | /* Schedule streaming RPC handlers */ | |
1161 | REQUEST_NEWRPC_STREAMING(Get); | |
1162 | REQUEST_NEWRPC_STREAMING(ListTransactions); | |
b680134e CH |
1163 | |
1164 | zlog_notice("gRPC server listening on %s", | |
1165 | server_address.str().c_str()); | |
1166 | ||
1167 | /* Process inbound RPCs */ | |
83f6fce7 CH |
1168 | bool ok; |
1169 | void *tag; | |
48c93061 | 1170 | while (true) { |
83f6fce7 CH |
1171 | if (!cq->Next(&tag, &ok)) { |
1172 | grpc_debug("%s: CQ empty exiting", __func__); | |
de800c10 | 1173 | break; |
83f6fce7 | 1174 | } |
de800c10 | 1175 | |
83f6fce7 CH |
1176 | grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag, |
1177 | ok); | |
1178 | ||
48c93061 | 1179 | if (!ok) { |
83f6fce7 CH |
1180 | delete static_cast<RpcStateBase *>(tag); |
1181 | break; | |
1182 | } | |
b680134e CH |
1183 | |
1184 | RpcStateBase *rpc = static_cast<RpcStateBase *>(tag); | |
48c93061 CH |
1185 | if (rpc->get_state() != FINISH) |
1186 | rpc->run(&service, cq.get()); | |
1187 | else { | |
1188 | grpc_debug("%s RPC FINISH -> [delete]", rpc->name); | |
1189 | delete rpc; | |
1190 | } | |
b680134e | 1191 | } |
ec2ac5f2 | 1192 | |
83f6fce7 CH |
1193 | /* This was probably done for us to get here, but let's be safe */ |
1194 | pthread_mutex_lock(&s_server_lock); | |
1195 | grpc_running = false; | |
1196 | if (s_server) { | |
1197 | grpc_debug("%s: shutdown server and CQ", __func__); | |
1198 | server->Shutdown(); | |
1199 | s_server = NULL; | |
1200 | } | |
1201 | pthread_mutex_unlock(&s_server_lock); | |
1202 | ||
1203 | grpc_debug("%s: shutting down CQ", __func__); | |
1204 | cq->Shutdown(); | |
1205 | ||
1206 | grpc_debug("%s: draining the CQ", __func__); | |
1207 | while (cq->Next(&tag, &ok)) { | |
1208 | grpc_debug("%s: drain tag %p", __func__, tag); | |
1209 | delete static_cast<RpcStateBase *>(tag); | |
b680134e | 1210 | } |
ec2ac5f2 | 1211 | |
83f6fce7 | 1212 | zlog_info("%s: exiting from grpc pthread", __func__); |
ec2ac5f2 RW |
1213 | return NULL; |
1214 | } | |
1215 | ||
b680134e CH |
1216 | |
1217 | static int frr_grpc_init(uint port) | |
ec2ac5f2 | 1218 | { |
b680134e CH |
1219 | struct frr_pthread_attr attr = { |
1220 | .start = grpc_pthread_start, | |
1221 | .stop = NULL, | |
1222 | }; | |
1223 | ||
83f6fce7 CH |
1224 | grpc_debug("%s: entered", __func__); |
1225 | ||
0edcb505 | 1226 | fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc"); |
b680134e | 1227 | fpt->data = reinterpret_cast<void *>((intptr_t)port); |
0edcb505 | 1228 | |
ec2ac5f2 | 1229 | /* Create a pthread for gRPC since it runs its own event loop. */ |
0edcb505 | 1230 | if (frr_pthread_run(fpt, NULL) < 0) { |
ec2ac5f2 RW |
1231 | flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s", |
1232 | __func__, safe_strerror(errno)); | |
1233 | return -1; | |
1234 | } | |
ec2ac5f2 RW |
1235 | |
1236 | return 0; | |
1237 | } | |
1238 | ||
1239 | static int frr_grpc_finish(void) | |
1240 | { | |
83f6fce7 | 1241 | grpc_debug("%s: entered", __func__); |
de800c10 | 1242 | |
83f6fce7 CH |
1243 | if (!fpt) |
1244 | return 0; | |
de800c10 | 1245 | |
83f6fce7 CH |
1246 | /* |
1247 | * Shut the server down here in main thread. This will cause the wait on | |
1248 | * the completion queue (cq.Next()) to exit and cleanup everything else. | |
1249 | */ | |
1250 | pthread_mutex_lock(&s_server_lock); | |
1251 | grpc_running = false; | |
1252 | if (s_server) { | |
1253 | grpc_debug("%s: shutdown server", __func__); | |
1254 | s_server->Shutdown(); | |
1255 | s_server = NULL; | |
de800c10 | 1256 | } |
83f6fce7 | 1257 | pthread_mutex_unlock(&s_server_lock); |
de800c10 | 1258 | |
83f6fce7 CH |
1259 | grpc_debug("%s: joining and destroy grpc thread", __func__); |
1260 | pthread_join(fpt->thread, NULL); | |
1261 | frr_pthread_destroy(fpt); | |
79c68195 RZ |
1262 | |
1263 | // Fix protobuf 'memory leaks' during shutdown. | |
1264 | // https://groups.google.com/g/protobuf/c/4y_EmQiCGgs | |
1265 | google::protobuf::ShutdownProtobufLibrary(); | |
1266 | ||
ec2ac5f2 RW |
1267 | return 0; |
1268 | } | |
1269 | ||
20b0a2ed QY |
1270 | /* |
1271 | * This is done this way because module_init and module_late_init are both | |
1272 | * called during daemon pre-fork initialization. Because the GRPC library | |
1273 | * spawns threads internally, we need to delay initializing it until after | |
1274 | * fork. This is done by scheduling this init function as an event task, since | |
1275 | * the event loop doesn't run until after fork. | |
1276 | */ | |
e6685141 | 1277 | static void frr_grpc_module_very_late_init(struct event *thread) |
ec2ac5f2 | 1278 | { |
ec2ac5f2 | 1279 | const char *args = THIS_MODULE->load_args; |
b680134e | 1280 | uint port = GRPC_DEFAULT_PORT; |
ec2ac5f2 | 1281 | |
ec2ac5f2 | 1282 | if (args) { |
b680134e CH |
1283 | port = std::stoul(args); |
1284 | if (port < 1024 || port > UINT16_MAX) { | |
ec2ac5f2 | 1285 | flog_err(EC_LIB_GRPC_INIT, |
b680134e CH |
1286 | "%s: port number must be between 1025 and %d", |
1287 | __func__, UINT16_MAX); | |
ec2ac5f2 RW |
1288 | goto error; |
1289 | } | |
1290 | } | |
1291 | ||
b680134e | 1292 | if (frr_grpc_init(port) < 0) |
ec2ac5f2 RW |
1293 | goto error; |
1294 | ||
cc9f21da | 1295 | return; |
69ec5832 | 1296 | |
ec2ac5f2 RW |
1297 | error: |
1298 | flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module"); | |
ec2ac5f2 RW |
1299 | } |
1300 | ||
cd9d0537 | 1301 | static int frr_grpc_module_late_init(struct event_loop *tm) |
20b0a2ed | 1302 | { |
b680134e | 1303 | main_master = tm; |
20b0a2ed | 1304 | hook_register(frr_fini, frr_grpc_finish); |
907a2395 | 1305 | event_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL); |
20b0a2ed QY |
1306 | return 0; |
1307 | } | |
1308 | ||
ec2ac5f2 RW |
1309 | static int frr_grpc_module_init(void) |
1310 | { | |
1311 | hook_register(frr_late_init, frr_grpc_module_late_init); | |
1312 | ||
1313 | return 0; | |
1314 | } | |
1315 | ||
1316 | FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION, | |
1317 | .description = "FRR gRPC northbound module", | |
b680134e | 1318 | .init = frr_grpc_module_init, ); |