]>
Commit | Line | Data |
---|---|---|
ec2ac5f2 | 1 | // |
c85ecd64 | 2 | // Copyright (c) 2021-2022, LabN Consulting, L.L.C |
ec2ac5f2 RW |
3 | // Copyright (C) 2019 NetDEF, Inc. |
4 | // Renato Westphal | |
5 | // | |
6 | // This program is free software; you can redistribute it and/or modify it | |
7 | // under the terms of the GNU General Public License as published by the Free | |
8 | // Software Foundation; either version 2 of the License, or (at your option) | |
9 | // any later version. | |
10 | // | |
11 | // This program is distributed in the hope that it will be useful, but WITHOUT | |
12 | // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
13 | // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
14 | // more details. | |
15 | // | |
16 | // You should have received a copy of the GNU General Public License along | |
17 | // with this program; see the file COPYING; if not, write to the Free Software | |
18 | // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
19 | // | |
20 | ||
21 | #include <zebra.h> | |
63d12a7d QY |
22 | #include <grpcpp/grpcpp.h> |
23 | #include "grpc/frr-northbound.grpc.pb.h" | |
ec2ac5f2 RW |
24 | |
25 | #include "log.h" | |
26 | #include "libfrr.h" | |
09781197 | 27 | #include "lib/version.h" |
b680134e | 28 | #include "lib/thread.h" |
ec2ac5f2 RW |
29 | #include "command.h" |
30 | #include "lib_errors.h" | |
31 | #include "northbound.h" | |
32 | #include "northbound_db.h" | |
0edcb505 | 33 | #include "frr_pthread.h" |
ec2ac5f2 RW |
34 | |
35 | #include <iostream> | |
36 | #include <sstream> | |
37 | #include <memory> | |
38 | #include <string> | |
39 | ||
ec2ac5f2 RW |
40 | #define GRPC_DEFAULT_PORT 50051 |
41 | ||
48c93061 CH |
42 | |
43 | // ------------------------------------------------------ | |
44 | // File Local Variables | |
45 | // ------------------------------------------------------ | |
46 | ||
ec2ac5f2 RW |
/*
 * NOTE: we can't use the FRR debugging infrastructure here since it uses
 * atomics and C++ has a different atomics API. Until we figure out a way to
 * solve this problem, gRPC debugging is gated by the plain file-local flag
 * below, which is initialized to off.
 */
b680134e CH |
/* Gate for grpc_debug(): nothing is logged while this is false. */
static bool nb_dbg_client_grpc = false;

/* Thread master that RPC callbacks are scheduled on (see thread_add_event). */
static struct thread_master *main_master;

/* NOTE(review): presumably the pthread running the gRPC server - confirm. */
static struct frr_pthread *fpt;

/* NOTE(review): presumably tracks whether the gRPC server is up - confirm. */
static bool grpc_running;

/* Forward the arguments to zlog_debug() only when gRPC debugging is on. */
#define grpc_debug(...)                                                        \
	do {                                                                   \
		if (nb_dbg_client_grpc)                                        \
			zlog_debug(__VA_ARGS__);                               \
	} while (0)
65 | ||
66 | // ------------------------------------------------------ | |
67 | // New Types | |
68 | // ------------------------------------------------------ | |
69 | ||
/* States an RPC moves through while it is being tracked. */
enum CallState {
	CREATE,
	PROCESS,
	MORE,
	FINISH,
	DELETED,
};

/* Printable state names, indexed by CallState; keep in enum order. */
const char *call_states[] = {
	"CREATE", "PROCESS", "MORE", "FINISH", "DELETED",
};

/* One candidate configuration and, if any, its in-flight transaction. */
struct candidate {
	uint64_t id;
	struct nb_config *config;
	struct nb_transaction *transaction;
};
ec2ac5f2 | 78 | |
b680134e CH |
79 | class Candidates |
80 | { | |
81 | public: | |
82 | ~Candidates(void) | |
83 | { | |
84 | // Delete candidates. | |
85 | for (auto it = _cdb.begin(); it != _cdb.end(); it++) | |
d3074a52 | 86 | delete_candidate(it->first); |
b680134e CH |
87 | } |
88 | ||
89 | struct candidate *create_candidate(void) | |
90 | { | |
91 | uint64_t id = ++_next_id; | |
92 | assert(id); // TODO: implement an algorithm for unique reusable | |
93 | // IDs. | |
94 | struct candidate *c = &_cdb[id]; | |
95 | c->id = id; | |
96 | c->config = nb_config_dup(running_config); | |
97 | c->transaction = NULL; | |
98 | ||
99 | return c; | |
100 | } | |
101 | ||
d3074a52 | 102 | bool contains(uint64_t candidate_id) |
b680134e | 103 | { |
d3074a52 CH |
104 | return _cdb.count(candidate_id) > 0; |
105 | } | |
106 | ||
107 | void delete_candidate(uint64_t candidate_id) | |
108 | { | |
109 | struct candidate *c = &_cdb[candidate_id]; | |
b680134e CH |
110 | char errmsg[BUFSIZ] = {0}; |
111 | ||
b680134e CH |
112 | nb_config_free(c->config); |
113 | if (c->transaction) | |
114 | nb_candidate_commit_abort(c->transaction, errmsg, | |
115 | sizeof(errmsg)); | |
96d434f8 | 116 | _cdb.erase(c->id); |
b680134e CH |
117 | } |
118 | ||
d3074a52 | 119 | struct candidate *get_candidate(uint64_t id) |
b680134e CH |
120 | { |
121 | return _cdb.count(id) == 0 ? NULL : &_cdb[id]; | |
122 | } | |
123 | ||
124 | private: | |
125 | uint64_t _next_id = 0; | |
d3074a52 | 126 | std::map<uint64_t, struct candidate> _cdb; |
b680134e | 127 | }; |
ecf9fb30 | 128 | |
48c93061 CH |
129 | /* |
130 | * RpcStateBase is the common base class used to track a gRPC RPC. | |
131 | */ | |
ecf9fb30 QY |
132 | class RpcStateBase |
133 | { | |
134 | public: | |
b680134e | 135 | virtual void do_request(::frr::Northbound::AsyncService *service, |
83f6fce7 CH |
136 | ::grpc::ServerCompletionQueue *cq, |
137 | bool no_copy) = 0; | |
ecf9fb30 | 138 | |
48c93061 | 139 | RpcStateBase(const char *name) : name(name){}; |
b680134e | 140 | |
48c93061 CH |
141 | virtual ~RpcStateBase() = default; |
142 | ||
143 | CallState get_state() const | |
144 | { | |
145 | return state; | |
146 | } | |
147 | ||
148 | bool is_initial_process() const | |
149 | { | |
150 | /* Will always be true for Unary */ | |
151 | return entered_state == CREATE; | |
152 | } | |
153 | ||
154 | // Returns "more" status, if false caller can delete | |
155 | bool run(frr::Northbound::AsyncService *service, | |
156 | grpc::ServerCompletionQueue *cq) | |
b680134e | 157 | { |
b680134e | 158 | /* |
48c93061 CH |
159 | * We enter in either CREATE or MORE state, and transition to |
160 | * PROCESS state. | |
161 | */ | |
162 | this->entered_state = this->state; | |
163 | this->state = PROCESS; | |
164 | grpc_debug("%s RPC: %s -> %s on grpc-io-thread", name, | |
165 | call_states[this->entered_state], | |
166 | call_states[this->state]); | |
167 | /* | |
168 | * We schedule the callback on the main pthread, and wait for | |
169 | * the state to transition out of the PROCESS state. The new | |
170 | * state will either be MORE or FINISH. It will always be FINISH | |
171 | * for Unary RPCs. | |
b680134e | 172 | */ |
b680134e CH |
173 | thread_add_event(main_master, c_callback, (void *)this, 0, |
174 | NULL); | |
48c93061 | 175 | |
b680134e | 176 | pthread_mutex_lock(&this->cmux); |
48c93061 | 177 | while (this->state == PROCESS) |
b680134e CH |
178 | pthread_cond_wait(&this->cond, &this->cmux); |
179 | pthread_mutex_unlock(&this->cmux); | |
180 | ||
48c93061 CH |
181 | grpc_debug("%s RPC in %s on grpc-io-thread", name, |
182 | call_states[this->state]); | |
183 | ||
184 | if (this->state == FINISH) { | |
185 | /* | |
186 | * Server is done (FINISH) so prep to receive a new | |
187 | * request of this type. We could do this earlier but | |
188 | * that would mean we could be handling multiple same | |
189 | * type requests in parallel without limit. | |
190 | */ | |
191 | this->do_request(service, cq, false); | |
b680134e | 192 | } |
48c93061 | 193 | return true; |
b680134e CH |
194 | } |
195 | ||
48c93061 CH |
196 | protected: |
197 | virtual CallState run_mainthread(struct thread *thread) = 0; | |
b680134e | 198 | |
cc9f21da | 199 | static void c_callback(struct thread *thread) |
b680134e | 200 | { |
48c93061 | 201 | auto _tag = static_cast<RpcStateBase *>(thread->arg); |
b680134e CH |
202 | /* |
203 | * We hold the lock until the callback finishes and has updated | |
204 | * _tag->state, then we signal done and release. | |
205 | */ | |
206 | pthread_mutex_lock(&_tag->cmux); | |
207 | ||
208 | CallState enter_state = _tag->state; | |
48c93061 CH |
209 | grpc_debug("%s RPC: running %s on main thread", _tag->name, |
210 | call_states[enter_state]); | |
b680134e | 211 | |
48c93061 | 212 | _tag->state = _tag->run_mainthread(thread); |
b680134e | 213 | |
48c93061 | 214 | grpc_debug("%s RPC: %s -> %s [main thread]", _tag->name, |
b680134e CH |
215 | call_states[enter_state], call_states[_tag->state]); |
216 | ||
217 | pthread_cond_signal(&_tag->cond); | |
218 | pthread_mutex_unlock(&_tag->cmux); | |
cc9f21da | 219 | return; |
ecf9fb30 QY |
220 | } |
221 | ||
222 | grpc::ServerContext ctx; | |
48c93061 CH |
223 | pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; |
224 | pthread_cond_t cond = PTHREAD_COND_INITIALIZER; | |
225 | CallState state = CREATE; | |
ad6bd536 | 226 | CallState entered_state = CREATE; |
48c93061 CH |
227 | |
228 | public: | |
229 | const char *name; | |
230 | }; | |
231 | ||
232 | /* | |
233 | * The UnaryRpcState class is used to track the execution of a Unary RPC. | |
234 | * | |
235 | * Template Args: | |
236 | * Q - the request type for a given unary RPC | |
237 | * S - the response type for a given unary RPC | |
238 | */ | |
239 | template <typename Q, typename S> class UnaryRpcState : public RpcStateBase | |
240 | { | |
241 | public: | |
242 | typedef void (frr::Northbound::AsyncService::*reqfunc_t)( | |
243 | ::grpc::ServerContext *, Q *, | |
244 | ::grpc::ServerAsyncResponseWriter<S> *, | |
245 | ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, | |
246 | void *); | |
247 | ||
248 | UnaryRpcState(Candidates *cdb, reqfunc_t rfunc, | |
249 | grpc::Status (*cb)(UnaryRpcState<Q, S> *), | |
250 | const char *name) | |
251 | : RpcStateBase(name), cdb(cdb), requestf(rfunc), callback(cb), | |
252 | responder(&ctx){}; | |
253 | ||
254 | void do_request(::frr::Northbound::AsyncService *service, | |
255 | ::grpc::ServerCompletionQueue *cq, | |
256 | bool no_copy) override | |
257 | { | |
258 | grpc_debug("%s, posting a request for: %s", __func__, name); | |
259 | auto copy = no_copy ? this | |
260 | : new UnaryRpcState(cdb, requestf, callback, | |
261 | name); | |
262 | (service->*requestf)(©->ctx, ©->request, | |
263 | ©->responder, cq, cq, copy); | |
264 | } | |
265 | ||
266 | CallState run_mainthread(struct thread *thread) override | |
267 | { | |
268 | // Unary RPC are always finished, see "Unary" :) | |
269 | grpc::Status status = this->callback(this); | |
270 | responder.Finish(response, status, this); | |
271 | return FINISH; | |
272 | } | |
273 | ||
274 | Candidates *cdb; | |
275 | ||
ecf9fb30 QY |
276 | Q request; |
277 | S response; | |
278 | grpc::ServerAsyncResponseWriter<S> responder; | |
ecf9fb30 | 279 | |
48c93061 | 280 | grpc::Status (*callback)(UnaryRpcState<Q, S> *); |
c85ecd64 | 281 | reqfunc_t requestf = NULL; |
48c93061 | 282 | }; |
ecf9fb30 | 283 | |
48c93061 CH |
284 | /* |
285 | * The StreamRpcState class is used to track the execution of a Streaming RPC. | |
286 | * | |
287 | * Template Args: | |
288 | * Q - the request type for a given streaming RPC | |
289 | * S - the response type for a given streaming RPC | |
290 | * X - the type used to track the streaming state | |
291 | */ | |
292 | template <typename Q, typename S, typename X> | |
293 | class StreamRpcState : public RpcStateBase | |
294 | { | |
295 | public: | |
296 | typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( | |
297 | ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *, | |
298 | ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, | |
299 | void *); | |
b680134e | 300 | |
48c93061 CH |
301 | StreamRpcState(reqsfunc_t rfunc, bool (*cb)(StreamRpcState<Q, S, X> *), |
302 | const char *name) | |
303 | : RpcStateBase(name), requestsf(rfunc), callback(cb), | |
304 | async_responder(&ctx){}; | |
305 | ||
306 | void do_request(::frr::Northbound::AsyncService *service, | |
307 | ::grpc::ServerCompletionQueue *cq, | |
308 | bool no_copy) override | |
309 | { | |
310 | grpc_debug("%s, posting a request for: %s", __func__, name); | |
311 | auto copy = | |
312 | no_copy ? this | |
313 | : new StreamRpcState(requestsf, callback, name); | |
314 | (service->*requestsf)(©->ctx, ©->request, | |
315 | ©->async_responder, cq, cq, copy); | |
316 | } | |
317 | ||
318 | CallState run_mainthread(struct thread *thread) override | |
319 | { | |
320 | if (this->callback(this)) | |
321 | return MORE; | |
322 | else | |
323 | return FINISH; | |
324 | } | |
325 | ||
326 | Q request; | |
327 | S response; | |
328 | grpc::ServerAsyncWriter<S> async_responder; | |
329 | ||
330 | bool (*callback)(StreamRpcState<Q, S, X> *); | |
331 | reqsfunc_t requestsf = NULL; | |
332 | ||
333 | X context; | |
ecf9fb30 QY |
334 | }; |
335 | ||
b680134e CH |
336 | // ------------------------------------------------------ |
337 | // Utility Functions | |
338 | // ------------------------------------------------------ | |
ecf9fb30 | 339 | |
b680134e CH |
340 | static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding) |
341 | { | |
342 | switch (encoding) { | |
343 | case frr::JSON: | |
344 | return LYD_JSON; | |
345 | case frr::XML: | |
346 | return LYD_XML; | |
347 | default: | |
348 | flog_err(EC_LIB_DEVELOPMENT, | |
349 | "%s: unknown data encoding format (%u)", __func__, | |
350 | encoding); | |
351 | exit(1); | |
352 | } | |
353 | } | |
ecf9fb30 | 354 | |
b680134e | 355 | static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path, |
fe095adc | 356 | const char *value) |
ec2ac5f2 | 357 | { |
fe095adc CH |
358 | LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value, |
359 | LYD_NEW_PATH_UPDATE, &dnode); | |
b680134e CH |
360 | if (err != LY_SUCCESS) { |
361 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s", | |
362 | __func__, ly_errmsg(ly_native_ctx)); | |
363 | return -1; | |
ec2ac5f2 RW |
364 | } |
365 | ||
b680134e CH |
366 | return 0; |
367 | } | |
368 | ||
369 | static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path) | |
370 | { | |
371 | dnode = yang_dnode_get(dnode, path.c_str()); | |
372 | if (!dnode) | |
373 | return -1; | |
374 | ||
375 | lyd_free_tree(dnode); | |
376 | ||
377 | return 0; | |
378 | } | |
379 | ||
380 | static LY_ERR data_tree_from_dnode(frr::DataTree *dt, | |
381 | const struct lyd_node *dnode, | |
382 | LYD_FORMAT lyd_format, bool with_defaults) | |
383 | { | |
384 | char *strp; | |
385 | int options = 0; | |
386 | ||
387 | SET_FLAG(options, LYD_PRINT_WITHSIBLINGS); | |
388 | if (with_defaults) | |
389 | SET_FLAG(options, LYD_PRINT_WD_ALL); | |
390 | else | |
391 | SET_FLAG(options, LYD_PRINT_WD_TRIM); | |
392 | ||
393 | LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options); | |
394 | if (err == LY_SUCCESS) { | |
395 | if (strp) { | |
396 | dt->set_data(strp); | |
397 | free(strp); | |
398 | } | |
ec2ac5f2 | 399 | } |
b680134e CH |
400 | return err; |
401 | } | |
ec2ac5f2 | 402 | |
b680134e CH |
403 | static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt, |
404 | bool config_only) | |
405 | { | |
406 | struct lyd_node *dnode; | |
407 | int options, opt2; | |
408 | LY_ERR err; | |
409 | ||
410 | if (config_only) { | |
411 | options = LYD_PARSE_NO_STATE; | |
412 | opt2 = LYD_VALIDATE_NO_STATE; | |
413 | } else { | |
414 | options = LYD_PARSE_STRICT; | |
415 | opt2 = 0; | |
416 | } | |
417 | ||
418 | err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(), | |
419 | encoding2lyd_format(dt->encoding()), options, | |
420 | opt2, &dnode); | |
421 | if (err != LY_SUCCESS) { | |
422 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s", | |
423 | __func__, ly_errmsg(ly_native_ctx)); | |
424 | } | |
425 | return dnode; | |
426 | } | |
427 | ||
428 | static struct lyd_node *get_dnode_config(const std::string &path) | |
429 | { | |
430 | struct lyd_node *dnode; | |
431 | ||
0f538858 RZ |
432 | if (!yang_dnode_exists(running_config->dnode, |
433 | path.empty() ? NULL : path.c_str())) | |
434 | return NULL; | |
435 | ||
b680134e CH |
436 | dnode = yang_dnode_get(running_config->dnode, |
437 | path.empty() ? NULL : path.c_str()); | |
438 | if (dnode) | |
439 | dnode = yang_dnode_dup(dnode); | |
440 | ||
441 | return dnode; | |
442 | } | |
443 | ||
444 | static int get_oper_data_cb(const struct lysc_node *snode, | |
445 | struct yang_translator *translator, | |
446 | struct yang_data *data, void *arg) | |
447 | { | |
448 | struct lyd_node *dnode = static_cast<struct lyd_node *>(arg); | |
449 | int ret = yang_dnode_edit(dnode, data->xpath, data->value); | |
450 | yang_data_free(data); | |
451 | ||
452 | return (ret == 0) ? NB_OK : NB_ERR; | |
453 | } | |
454 | ||
455 | static struct lyd_node *get_dnode_state(const std::string &path) | |
456 | { | |
457 | struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false); | |
458 | if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode) | |
459 | != NB_OK) { | |
460 | yang_dnode_free(dnode); | |
461 | return NULL; | |
462 | } | |
463 | ||
464 | return dnode; | |
465 | } | |
466 | ||
467 | static grpc::Status get_path(frr::DataTree *dt, const std::string &path, | |
468 | int type, LYD_FORMAT lyd_format, | |
469 | bool with_defaults) | |
470 | { | |
471 | struct lyd_node *dnode_config = NULL; | |
472 | struct lyd_node *dnode_state = NULL; | |
473 | struct lyd_node *dnode_final; | |
474 | ||
475 | // Configuration data. | |
476 | if (type == frr::GetRequest_DataType_ALL | |
477 | || type == frr::GetRequest_DataType_CONFIG) { | |
478 | dnode_config = get_dnode_config(path); | |
479 | if (!dnode_config) | |
480 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
481 | "Data path not found"); | |
482 | } | |
483 | ||
484 | // Operational data. | |
485 | if (type == frr::GetRequest_DataType_ALL | |
486 | || type == frr::GetRequest_DataType_STATE) { | |
487 | dnode_state = get_dnode_state(path); | |
488 | if (!dnode_state) { | |
489 | if (dnode_config) | |
490 | yang_dnode_free(dnode_config); | |
491 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
492 | "Failed to fetch operational data"); | |
493 | } | |
494 | } | |
495 | ||
496 | switch (type) { | |
497 | case frr::GetRequest_DataType_ALL: | |
498 | // | |
499 | // Combine configuration and state data into a single | |
500 | // dnode. | |
501 | // | |
502 | if (lyd_merge_siblings(&dnode_state, dnode_config, | |
503 | LYD_MERGE_DESTRUCT) | |
504 | != LY_SUCCESS) { | |
505 | yang_dnode_free(dnode_state); | |
506 | yang_dnode_free(dnode_config); | |
507 | return grpc::Status( | |
508 | grpc::StatusCode::INTERNAL, | |
509 | "Failed to merge configuration and state data", | |
510 | ly_errmsg(ly_native_ctx)); | |
ecf9fb30 | 511 | } |
b680134e CH |
512 | |
513 | dnode_final = dnode_state; | |
514 | break; | |
515 | case frr::GetRequest_DataType_CONFIG: | |
516 | dnode_final = dnode_config; | |
517 | break; | |
518 | case frr::GetRequest_DataType_STATE: | |
519 | dnode_final = dnode_state; | |
520 | break; | |
ecf9fb30 QY |
521 | } |
522 | ||
b680134e CH |
523 | // Validate data to create implicit default nodes if necessary. |
524 | int validate_opts = 0; | |
525 | if (type == frr::GetRequest_DataType_CONFIG) | |
526 | validate_opts = LYD_VALIDATE_NO_STATE; | |
527 | else | |
528 | validate_opts = 0; | |
529 | ||
530 | LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx, | |
531 | validate_opts, NULL); | |
532 | ||
533 | if (err) | |
534 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s", | |
535 | __func__, ly_errmsg(ly_native_ctx)); | |
536 | // Dump data using the requested format. | |
537 | if (!err) | |
538 | err = data_tree_from_dnode(dt, dnode_final, lyd_format, | |
539 | with_defaults); | |
540 | yang_dnode_free(dnode_final); | |
541 | if (err) | |
542 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
543 | "Failed to dump data"); | |
544 | return grpc::Status::OK; | |
545 | } | |
546 | ||
547 | ||
548 | // ------------------------------------------------------ | |
549 | // RPC Callback Functions: run on main thread | |
550 | // ------------------------------------------------------ | |
551 | ||
48c93061 CH |
552 | grpc::Status HandleUnaryGetCapabilities( |
553 | UnaryRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse> | |
554 | *tag) | |
b680134e | 555 | { |
48c93061 | 556 | grpc_debug("%s: entered", __func__); |
b680134e CH |
557 | |
558 | // Response: string frr_version = 1; | |
559 | tag->response.set_frr_version(FRR_VERSION); | |
560 | ||
561 | // Response: bool rollback_support = 2; | |
ec2ac5f2 | 562 | #ifdef HAVE_CONFIG_ROLLBACKS |
b680134e | 563 | tag->response.set_rollback_support(true); |
ec2ac5f2 | 564 | #else |
b680134e | 565 | tag->response.set_rollback_support(false); |
ec2ac5f2 | 566 | #endif |
b680134e CH |
567 | // Response: repeated ModuleData supported_modules = 3; |
568 | struct yang_module *module; | |
569 | RB_FOREACH (module, yang_modules, &yang_modules) { | |
570 | auto m = tag->response.add_supported_modules(); | |
571 | ||
572 | m->set_name(module->name); | |
573 | if (module->info->revision) | |
574 | m->set_revision(module->info->revision); | |
575 | m->set_organization(module->info->org); | |
576 | } | |
ec2ac5f2 | 577 | |
b680134e CH |
578 | // Response: repeated Encoding supported_encodings = 4; |
579 | tag->response.add_supported_encodings(frr::JSON); | |
580 | tag->response.add_supported_encodings(frr::XML); | |
ec2ac5f2 | 581 | |
48c93061 | 582 | return grpc::Status::OK; |
b680134e | 583 | } |
ec2ac5f2 | 584 | |
48c93061 CH |
585 | // Define the context variable type for this streaming handler |
586 | typedef std::list<std::string> GetContextType; | |
b680134e | 587 | |
48c93061 CH |
588 | bool HandleStreamingGet( |
589 | StreamRpcState<frr::GetRequest, frr::GetResponse, GetContextType> *tag) | |
590 | { | |
591 | grpc_debug("%s: entered", __func__); | |
ec2ac5f2 | 592 | |
48c93061 CH |
593 | auto mypathps = &tag->context; |
594 | if (tag->is_initial_process()) { | |
595 | // Fill our context container first time through | |
596 | grpc_debug("%s: initialize streaming state", __func__); | |
b680134e CH |
597 | auto paths = tag->request.path(); |
598 | for (const std::string &path : paths) { | |
48c93061 | 599 | mypathps->push_back(std::string(path)); |
ecf9fb30 | 600 | } |
b680134e | 601 | } |
ecf9fb30 | 602 | |
b680134e CH |
603 | // Request: DataType type = 1; |
604 | int type = tag->request.type(); | |
605 | // Request: Encoding encoding = 2; | |
606 | frr::Encoding encoding = tag->request.encoding(); | |
607 | // Request: bool with_defaults = 3; | |
608 | bool with_defaults = tag->request.with_defaults(); | |
609 | ||
b680134e CH |
610 | if (mypathps->empty()) { |
611 | tag->async_responder.Finish(grpc::Status::OK, tag); | |
48c93061 | 612 | return false; |
ec2ac5f2 RW |
613 | } |
614 | ||
b680134e CH |
615 | frr::GetResponse response; |
616 | grpc::Status status; | |
617 | ||
618 | // Response: int64 timestamp = 1; | |
619 | response.set_timestamp(time(NULL)); | |
620 | ||
621 | // Response: DataTree data = 2; | |
622 | auto *data = response.mutable_data(); | |
623 | data->set_encoding(tag->request.encoding()); | |
624 | status = get_path(data, mypathps->back().c_str(), type, | |
625 | encoding2lyd_format(encoding), with_defaults); | |
626 | ||
627 | if (!status.ok()) { | |
628 | tag->async_responder.WriteAndFinish( | |
629 | response, grpc::WriteOptions(), status, tag); | |
48c93061 | 630 | return false; |
b680134e CH |
631 | } |
632 | ||
633 | mypathps->pop_back(); | |
634 | if (mypathps->empty()) { | |
635 | tag->async_responder.WriteAndFinish( | |
636 | response, grpc::WriteOptions(), grpc::Status::OK, tag); | |
48c93061 | 637 | return false; |
b680134e CH |
638 | } else { |
639 | tag->async_responder.Write(response, tag); | |
48c93061 | 640 | return true; |
b680134e CH |
641 | } |
642 | } | |
643 | ||
48c93061 CH |
644 | grpc::Status HandleUnaryCreateCandidate( |
645 | UnaryRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse> | |
646 | *tag) | |
b680134e | 647 | { |
48c93061 | 648 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 649 | |
b680134e | 650 | struct candidate *candidate = tag->cdb->create_candidate(); |
48c93061 CH |
651 | if (!candidate) |
652 | return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, | |
653 | "Can't create candidate configuration"); | |
654 | tag->response.set_candidate_id(candidate->id); | |
655 | return grpc::Status::OK; | |
b680134e CH |
656 | } |
657 | ||
48c93061 CH |
658 | grpc::Status HandleUnaryDeleteCandidate( |
659 | UnaryRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse> | |
660 | *tag) | |
b680134e | 661 | { |
48c93061 | 662 | grpc_debug("%s: entered", __func__); |
b680134e | 663 | |
b680134e CH |
664 | uint32_t candidate_id = tag->request.candidate_id(); |
665 | ||
666 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
667 | ||
48c93061 CH |
668 | if (!tag->cdb->contains(candidate_id)) |
669 | return grpc::Status(grpc::StatusCode::NOT_FOUND, | |
670 | "candidate configuration not found"); | |
671 | tag->cdb->delete_candidate(candidate_id); | |
672 | return grpc::Status::OK; | |
b680134e CH |
673 | } |
674 | ||
48c93061 CH |
675 | grpc::Status HandleUnaryUpdateCandidate( |
676 | UnaryRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse> | |
677 | *tag) | |
b680134e | 678 | { |
48c93061 | 679 | grpc_debug("%s: entered", __func__); |
ecf9fb30 | 680 | |
b680134e CH |
681 | uint32_t candidate_id = tag->request.candidate_id(); |
682 | ||
683 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
684 | ||
685 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); | |
686 | ||
687 | if (!candidate) | |
48c93061 CH |
688 | return grpc::Status(grpc::StatusCode::NOT_FOUND, |
689 | "candidate configuration not found"); | |
690 | if (candidate->transaction) | |
691 | return grpc::Status( | |
692 | grpc::StatusCode::FAILED_PRECONDITION, | |
693 | "candidate is in the middle of a transaction"); | |
694 | if (nb_candidate_update(candidate->config) != NB_OK) | |
695 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
696 | "failed to update candidate configuration"); | |
b680134e | 697 | |
48c93061 | 698 | return grpc::Status::OK; |
b680134e CH |
699 | } |
700 | ||
48c93061 CH |
701 | grpc::Status HandleUnaryEditCandidate( |
702 | UnaryRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> | |
703 | *tag) | |
b680134e | 704 | { |
48c93061 | 705 | grpc_debug("%s: entered", __func__); |
b680134e | 706 | |
b680134e | 707 | uint32_t candidate_id = tag->request.candidate_id(); |
ec2ac5f2 | 708 | |
b680134e | 709 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); |
ec2ac5f2 | 710 | |
b680134e | 711 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); |
48c93061 CH |
712 | if (!candidate) |
713 | return grpc::Status(grpc::StatusCode::NOT_FOUND, | |
714 | "candidate configuration not found"); | |
ec2ac5f2 | 715 | |
b680134e CH |
716 | struct nb_config *candidate_tmp = nb_config_dup(candidate->config); |
717 | ||
718 | auto pvs = tag->request.update(); | |
719 | for (const frr::PathValue &pv : pvs) { | |
fe095adc CH |
720 | if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), |
721 | pv.value().c_str()) != 0) { | |
b680134e CH |
722 | nb_config_free(candidate_tmp); |
723 | ||
48c93061 CH |
724 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, |
725 | "Failed to update \"" + pv.path() + | |
726 | "\""); | |
ecf9fb30 | 727 | } |
ec2ac5f2 RW |
728 | } |
729 | ||
b680134e CH |
730 | pvs = tag->request.delete_(); |
731 | for (const frr::PathValue &pv : pvs) { | |
732 | if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) { | |
733 | nb_config_free(candidate_tmp); | |
48c93061 CH |
734 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, |
735 | "Failed to remove \"" + pv.path() + | |
736 | "\""); | |
ec2ac5f2 | 737 | } |
ec2ac5f2 RW |
738 | } |
739 | ||
b680134e CH |
740 | // No errors, accept all changes. |
741 | nb_config_replace(candidate->config, candidate_tmp, false); | |
48c93061 | 742 | return grpc::Status::OK; |
b680134e CH |
743 | } |
744 | ||
48c93061 CH |
745 | grpc::Status HandleUnaryLoadToCandidate( |
746 | UnaryRpcState<frr::LoadToCandidateRequest, frr::LoadToCandidateResponse> | |
747 | *tag) | |
b680134e | 748 | { |
48c93061 | 749 | grpc_debug("%s: entered", __func__); |
b680134e | 750 | |
b680134e CH |
751 | uint32_t candidate_id = tag->request.candidate_id(); |
752 | ||
753 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
754 | ||
755 | // Request: LoadType type = 2; | |
756 | int load_type = tag->request.type(); | |
757 | // Request: DataTree config = 3; | |
758 | auto config = tag->request.config(); | |
759 | ||
b680134e | 760 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); |
48c93061 CH |
761 | if (!candidate) |
762 | return grpc::Status(grpc::StatusCode::NOT_FOUND, | |
763 | "candidate configuration not found"); | |
ec2ac5f2 | 764 | |
b680134e | 765 | struct lyd_node *dnode = dnode_from_data_tree(&config, true); |
48c93061 CH |
766 | if (!dnode) |
767 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
768 | "Failed to parse the configuration"); | |
ec2ac5f2 | 769 | |
b680134e | 770 | struct nb_config *loaded_config = nb_config_new(dnode); |
b680134e CH |
771 | if (load_type == frr::LoadToCandidateRequest::REPLACE) |
772 | nb_config_replace(candidate->config, loaded_config, false); | |
48c93061 CH |
773 | else if (nb_config_merge(candidate->config, loaded_config, false) != |
774 | NB_OK) | |
775 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
776 | "Failed to merge the loaded configuration"); | |
ec2ac5f2 | 777 | |
48c93061 | 778 | return grpc::Status::OK; |
b680134e CH |
779 | } |
780 | ||
48c93061 CH |
781 | grpc::Status |
782 | HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag) | |
b680134e | 783 | { |
48c93061 | 784 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 785 | |
b680134e CH |
786 | // Request: uint32 candidate_id = 1; |
787 | uint32_t candidate_id = tag->request.candidate_id(); | |
788 | ||
789 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
790 | ||
791 | // Request: Phase phase = 2; | |
792 | int phase = tag->request.phase(); | |
793 | // Request: string comment = 3; | |
794 | const std::string comment = tag->request.comment(); | |
795 | ||
796 | // Find candidate configuration. | |
797 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); | |
48c93061 CH |
798 | if (!candidate) |
799 | return grpc::Status(grpc::StatusCode::NOT_FOUND, | |
800 | "candidate configuration not found"); | |
b680134e CH |
801 | |
802 | int ret = NB_OK; | |
803 | uint32_t transaction_id = 0; | |
804 | ||
805 | // Check for misuse of the two-phase commit protocol. | |
806 | switch (phase) { | |
807 | case frr::CommitRequest::PREPARE: | |
808 | case frr::CommitRequest::ALL: | |
48c93061 CH |
809 | if (candidate->transaction) |
810 | return grpc::Status( | |
811 | grpc::StatusCode::FAILED_PRECONDITION, | |
812 | "candidate is in the middle of a transaction"); | |
b680134e CH |
813 | break; |
814 | case frr::CommitRequest::ABORT: | |
815 | case frr::CommitRequest::APPLY: | |
48c93061 CH |
816 | if (!candidate->transaction) |
817 | return grpc::Status( | |
818 | grpc::StatusCode::FAILED_PRECONDITION, | |
819 | "no transaction in progress"); | |
b680134e CH |
820 | break; |
821 | default: | |
822 | break; | |
ec2ac5f2 RW |
823 | } |
824 | ||
ecf9fb30 | 825 | |
b680134e CH |
826 | // Execute the user request. |
827 | struct nb_context context = {}; | |
828 | context.client = NB_CLIENT_GRPC; | |
829 | char errmsg[BUFSIZ] = {0}; | |
830 | ||
831 | switch (phase) { | |
832 | case frr::CommitRequest::VALIDATE: | |
833 | grpc_debug("`-> Performing VALIDATE"); | |
834 | ret = nb_candidate_validate(&context, candidate->config, errmsg, | |
835 | sizeof(errmsg)); | |
836 | break; | |
837 | case frr::CommitRequest::PREPARE: | |
838 | grpc_debug("`-> Performing PREPARE"); | |
839 | ret = nb_candidate_commit_prepare( | |
840 | &context, candidate->config, comment.c_str(), | |
841 | &candidate->transaction, errmsg, sizeof(errmsg)); | |
842 | break; | |
843 | case frr::CommitRequest::ABORT: | |
844 | grpc_debug("`-> Performing ABORT"); | |
845 | nb_candidate_commit_abort(candidate->transaction, errmsg, | |
846 | sizeof(errmsg)); | |
847 | break; | |
848 | case frr::CommitRequest::APPLY: | |
849 | grpc_debug("`-> Performing APPLY"); | |
850 | nb_candidate_commit_apply(candidate->transaction, true, | |
851 | &transaction_id, errmsg, | |
852 | sizeof(errmsg)); | |
853 | break; | |
854 | case frr::CommitRequest::ALL: | |
855 | grpc_debug("`-> Performing ALL"); | |
856 | ret = nb_candidate_commit(&context, candidate->config, true, | |
857 | comment.c_str(), &transaction_id, | |
858 | errmsg, sizeof(errmsg)); | |
859 | break; | |
860 | } | |
ec2ac5f2 | 861 | |
b680134e CH |
862 | // Map northbound error codes to gRPC status codes. |
863 | grpc::Status status; | |
864 | switch (ret) { | |
865 | case NB_OK: | |
866 | status = grpc::Status::OK; | |
867 | break; | |
868 | case NB_ERR_NO_CHANGES: | |
869 | status = grpc::Status(grpc::StatusCode::ABORTED, errmsg); | |
870 | break; | |
871 | case NB_ERR_LOCKED: | |
872 | status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg); | |
873 | break; | |
874 | case NB_ERR_VALIDATION: | |
875 | status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
876 | errmsg); | |
877 | break; | |
878 | case NB_ERR_RESOURCE: | |
879 | status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, | |
880 | errmsg); | |
881 | break; | |
882 | case NB_ERR: | |
883 | default: | |
884 | status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg); | |
885 | break; | |
886 | } | |
ec2ac5f2 | 887 | |
b680134e CH |
888 | grpc_debug("`-> Result: %s (message: '%s')", |
889 | nb_err_name((enum nb_error)ret), errmsg); | |
890 | ||
891 | if (ret == NB_OK) { | |
892 | // Response: uint32 transaction_id = 1; | |
893 | if (transaction_id) | |
894 | tag->response.set_transaction_id(transaction_id); | |
ec2ac5f2 | 895 | } |
b680134e CH |
896 | if (strlen(errmsg) > 0) |
897 | tag->response.set_error_message(errmsg); | |
ec2ac5f2 | 898 | |
48c93061 | 899 | return status; |
b680134e | 900 | } |
ec2ac5f2 | 901 | |
48c93061 CH |
902 | grpc::Status HandleUnaryLockConfig( |
903 | UnaryRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) | |
b680134e | 904 | { |
48c93061 | 905 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 906 | |
48c93061 CH |
907 | if (nb_running_lock(NB_CLIENT_GRPC, NULL)) |
908 | return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, | |
909 | "running configuration is locked already"); | |
910 | return grpc::Status::OK; | |
b680134e | 911 | } |
ec2ac5f2 | 912 | |
48c93061 CH |
913 | grpc::Status HandleUnaryUnlockConfig( |
914 | UnaryRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag) | |
b680134e | 915 | { |
48c93061 | 916 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 917 | |
48c93061 CH |
918 | if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) |
919 | return grpc::Status( | |
920 | grpc::StatusCode::FAILED_PRECONDITION, | |
921 | "failed to unlock the running configuration"); | |
922 | return grpc::Status::OK; | |
b680134e | 923 | } |
ec2ac5f2 | 924 | |
/*
 * Callback handed to nb_db_transactions_iterate().
 *
 * `arg` must point to a std::list of (int, string, string, string) tuples;
 * one tuple built from the remaining arguments is appended to it. The C
 * strings are copied into std::string objects.
 */
static void list_transactions_cb(void *arg, int transaction_id,
				 const char *client_name, const char *date,
				 const char *comment)
{
	using entry_type =
		std::tuple<int, std::string, std::string, std::string>;

	std::list<entry_type> *entries =
		static_cast<std::list<entry_type> *>(arg);

	entry_type entry = std::make_tuple(transaction_id,
					   std::string(client_name),
					   std::string(date),
					   std::string(comment));
	entries->push_back(entry);
}
935 | ||
// Context type kept by the ListTransactions streaming handler between
// invocations: a list of (transaction id, client name, date, comment) tuples.
using ListTransactionsContextType =
	std::list<std::tuple<int, std::string, std::string, std::string>>;
b680134e | 939 | |
48c93061 CH |
940 | bool HandleStreamingListTransactions( |
941 | StreamRpcState<frr::ListTransactionsRequest, | |
942 | frr::ListTransactionsResponse, | |
943 | ListTransactionsContextType> *tag) | |
944 | { | |
945 | grpc_debug("%s: entered", __func__); | |
946 | ||
947 | auto list = &tag->context; | |
948 | if (tag->is_initial_process()) { | |
949 | grpc_debug("%s: initialize streaming state", __func__); | |
950 | // Fill our context container first time through | |
951 | nb_db_transactions_iterate(list_transactions_cb, list); | |
952 | list->push_back(std::make_tuple( | |
b680134e CH |
953 | 0xFFFF, std::string("fake client"), |
954 | std::string("fake date"), std::string("fake comment"))); | |
48c93061 CH |
955 | list->push_back(std::make_tuple(0xFFFE, |
956 | std::string("fake client2"), | |
957 | std::string("fake date"), | |
958 | std::string("fake comment2"))); | |
ec2ac5f2 RW |
959 | } |
960 | ||
b680134e CH |
961 | if (list->empty()) { |
962 | tag->async_responder.Finish(grpc::Status::OK, tag); | |
48c93061 | 963 | return false; |
ec2ac5f2 RW |
964 | } |
965 | ||
b680134e | 966 | auto item = list->back(); |
ec2ac5f2 | 967 | |
b680134e | 968 | frr::ListTransactionsResponse response; |
ec2ac5f2 | 969 | |
b680134e CH |
970 | // Response: uint32 id = 1; |
971 | response.set_id(std::get<0>(item)); | |
ec2ac5f2 | 972 | |
b680134e CH |
973 | // Response: string client = 2; |
974 | response.set_client(std::get<1>(item).c_str()); | |
975 | ||
976 | // Response: string date = 3; | |
977 | response.set_date(std::get<2>(item).c_str()); | |
978 | ||
979 | // Response: string comment = 4; | |
980 | response.set_comment(std::get<3>(item).c_str()); | |
ec2ac5f2 | 981 | |
b680134e CH |
982 | list->pop_back(); |
983 | if (list->empty()) { | |
984 | tag->async_responder.WriteAndFinish( | |
985 | response, grpc::WriteOptions(), grpc::Status::OK, tag); | |
48c93061 | 986 | return false; |
b680134e CH |
987 | } else { |
988 | tag->async_responder.Write(response, tag); | |
48c93061 | 989 | return true; |
ec2ac5f2 | 990 | } |
b680134e | 991 | } |
ec2ac5f2 | 992 | |
48c93061 CH |
993 | grpc::Status HandleUnaryGetTransaction( |
994 | UnaryRpcState<frr::GetTransactionRequest, frr::GetTransactionResponse> | |
995 | *tag) | |
b680134e | 996 | { |
48c93061 | 997 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 998 | |
b680134e CH |
999 | // Request: uint32 transaction_id = 1; |
1000 | uint32_t transaction_id = tag->request.transaction_id(); | |
1001 | // Request: Encoding encoding = 2; | |
1002 | frr::Encoding encoding = tag->request.encoding(); | |
1003 | // Request: bool with_defaults = 3; | |
1004 | bool with_defaults = tag->request.with_defaults(); | |
1005 | ||
1006 | grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__, | |
1007 | transaction_id, encoding); | |
1008 | ||
1009 | struct nb_config *nb_config; | |
1010 | ||
1011 | // Load configuration from the transactions database. | |
1012 | nb_config = nb_db_transaction_load(transaction_id); | |
48c93061 CH |
1013 | if (!nb_config) |
1014 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1015 | "Transaction not found"); | |
ec2ac5f2 | 1016 | |
b680134e CH |
1017 | // Response: DataTree config = 1; |
1018 | auto config = tag->response.mutable_config(); | |
1019 | config->set_encoding(encoding); | |
1020 | ||
1021 | // Dump data using the requested format. | |
1022 | if (data_tree_from_dnode(config, nb_config->dnode, | |
1023 | encoding2lyd_format(encoding), with_defaults) | |
1024 | != 0) { | |
1025 | nb_config_free(nb_config); | |
48c93061 CH |
1026 | return grpc::Status(grpc::StatusCode::INTERNAL, |
1027 | "Failed to dump data"); | |
ec2ac5f2 RW |
1028 | } |
1029 | ||
b680134e | 1030 | nb_config_free(nb_config); |
ec2ac5f2 | 1031 | |
48c93061 | 1032 | return grpc::Status::OK; |
b680134e | 1033 | } |
ec2ac5f2 | 1034 | |
48c93061 CH |
1035 | grpc::Status HandleUnaryExecute( |
1036 | UnaryRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) | |
b680134e | 1037 | { |
48c93061 | 1038 | grpc_debug("%s: entered", __func__); |
ec2ac5f2 | 1039 | |
b680134e CH |
1040 | struct nb_node *nb_node; |
1041 | struct list *input_list; | |
1042 | struct list *output_list; | |
1043 | struct listnode *node; | |
1044 | struct yang_data *data; | |
1045 | const char *xpath; | |
1046 | char errmsg[BUFSIZ] = {0}; | |
1047 | ||
1048 | // Request: string path = 1; | |
1049 | xpath = tag->request.path().c_str(); | |
1050 | ||
1051 | grpc_debug("%s(path: \"%s\")", __func__, xpath); | |
1052 | ||
48c93061 CH |
1053 | if (tag->request.path().empty()) |
1054 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1055 | "Data path is empty"); | |
0fe5b904 | 1056 | |
b680134e | 1057 | nb_node = nb_node_find(xpath); |
48c93061 CH |
1058 | if (!nb_node) |
1059 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1060 | "Unknown data path"); | |
ec2ac5f2 | 1061 | |
b680134e CH |
1062 | input_list = yang_data_list_new(); |
1063 | output_list = yang_data_list_new(); | |
ec2ac5f2 | 1064 | |
b680134e CH |
1065 | // Read input parameters. |
1066 | auto input = tag->request.input(); | |
1067 | for (const frr::PathValue &pv : input) { | |
1068 | // Request: repeated PathValue input = 2; | |
1069 | data = yang_data_new(pv.path().c_str(), pv.value().c_str()); | |
1070 | listnode_add(input_list, data); | |
1071 | } | |
ec2ac5f2 | 1072 | |
b680134e CH |
1073 | // Execute callback registered for this XPath. |
1074 | if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg, | |
1075 | sizeof(errmsg)) | |
1076 | != NB_OK) { | |
1077 | flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s", | |
1078 | __func__, xpath); | |
1079 | list_delete(&input_list); | |
1080 | list_delete(&output_list); | |
1081 | ||
48c93061 | 1082 | return grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"); |
ec2ac5f2 | 1083 | } |
b680134e CH |
1084 | |
1085 | // Process output parameters. | |
1086 | for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { | |
1087 | // Response: repeated PathValue output = 1; | |
1088 | frr::PathValue *pv = tag->response.add_output(); | |
1089 | pv->set_path(data->xpath); | |
1090 | pv->set_value(data->value); | |
1091 | } | |
1092 | ||
1093 | // Release memory. | |
1094 | list_delete(&input_list); | |
1095 | list_delete(&output_list); | |
1096 | ||
48c93061 | 1097 | return grpc::Status::OK; |
b680134e CH |
1098 | } |
1099 | ||
1100 | // ------------------------------------------------------ | |
1101 | // Thread Initialization and Run Functions | |
1102 | // ------------------------------------------------------ | |
1103 | ||
1104 | ||
/*
 * Allocate the state object for the unary RPC `NAME`, bound to the
 * HandleUnary<NAME> handler, and call do_request() on it. `cdb` is forwarded
 * to the UnaryRpcState constructor. The expansion site must have `service`
 * and `cq` in scope.
 */
#define REQUEST_NEWRPC(NAME, cdb)                                              \
	do {                                                                   \
		using _state_type = UnaryRpcState<frr::NAME##Request,          \
						  frr::NAME##Response>;        \
		auto _state = new _state_type(                                 \
			(cdb), &frr::Northbound::AsyncService::Request##NAME,  \
			&HandleUnary##NAME, #NAME);                            \
		_state->do_request(&service, cq.get(), true);                  \
	} while (0)
1113 | ||
/*
 * Allocate the state object for the streaming RPC `NAME`, bound to the
 * HandleStreaming<NAME> handler and using <NAME>ContextType as its context
 * type, and call do_request() on it. The expansion site must have `service`
 * and `cq` in scope.
 */
#define REQUEST_NEWRPC_STREAMING(NAME)                                         \
	do {                                                                   \
		using _state_type = StreamRpcState<frr::NAME##Request,         \
						   frr::NAME##Response,        \
						   NAME##ContextType>;         \
		auto _state = new _state_type(                                 \
			&frr::Northbound::AsyncService::Request##NAME,         \
			&HandleStreaming##NAME, #NAME);                        \
		_state->do_request(&service, cq.get(), true);                  \
	} while (0)
1123 | ||
// FRR pthread attributes bundled with a port number.
// NOTE(review): no use of this struct is visible in this part of the file;
// frr_grpc_init() hands the port to the thread through fpt->data instead.
// Confirm whether the type is still needed.
struct grpc_pthread_attr {
	struct frr_pthread_attr attr;
	unsigned long port;
};
1128 | ||
// Capture these objects so we can try to shut down cleanly.
// s_server_lock is taken around the shutdown-time accesses to grpc_running
// and s_server, both in the gRPC pthread and in frr_grpc_finish().
// s_server points at the server built by the gRPC pthread and is reset to
// NULL by whichever side shuts it down.
static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER;
static grpc::Server *s_server;
de800c10 | 1132 | |
ec2ac5f2 RW |
1133 | static void *grpc_pthread_start(void *arg) |
1134 | { | |
0edcb505 | 1135 | struct frr_pthread *fpt = static_cast<frr_pthread *>(arg); |
b680134e CH |
1136 | uint port = (uint) reinterpret_cast<intptr_t>(fpt->data); |
1137 | ||
1138 | Candidates candidates; | |
1139 | grpc::ServerBuilder builder; | |
1140 | std::stringstream server_address; | |
83f6fce7 | 1141 | frr::Northbound::AsyncService service; |
ec2ac5f2 | 1142 | |
0edcb505 CS |
1143 | frr_pthread_set_name(fpt); |
1144 | ||
b680134e CH |
1145 | server_address << "0.0.0.0:" << port; |
1146 | builder.AddListeningPort(server_address.str(), | |
1147 | grpc::InsecureServerCredentials()); | |
83f6fce7 | 1148 | builder.RegisterService(&service); |
673e4407 RZ |
1149 | builder.AddChannelArgument( |
1150 | GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); | |
83f6fce7 CH |
1151 | std::unique_ptr<grpc::ServerCompletionQueue> cq = |
1152 | builder.AddCompletionQueue(); | |
1153 | std::unique_ptr<grpc::Server> server = builder.BuildAndStart(); | |
1154 | s_server = server.get(); | |
1155 | ||
ad6bd536 | 1156 | pthread_mutex_lock(&s_server_lock); // Make coverity happy |
83f6fce7 | 1157 | grpc_running = true; |
ad6bd536 | 1158 | pthread_mutex_unlock(&s_server_lock); // Make coverity happy |
b680134e | 1159 | |
48c93061 | 1160 | /* Schedule unary RPC handlers */ |
b680134e CH |
1161 | REQUEST_NEWRPC(GetCapabilities, NULL); |
1162 | REQUEST_NEWRPC(CreateCandidate, &candidates); | |
1163 | REQUEST_NEWRPC(DeleteCandidate, &candidates); | |
1164 | REQUEST_NEWRPC(UpdateCandidate, &candidates); | |
1165 | REQUEST_NEWRPC(EditCandidate, &candidates); | |
1166 | REQUEST_NEWRPC(LoadToCandidate, &candidates); | |
1167 | REQUEST_NEWRPC(Commit, &candidates); | |
1168 | REQUEST_NEWRPC(GetTransaction, NULL); | |
1169 | REQUEST_NEWRPC(LockConfig, NULL); | |
1170 | REQUEST_NEWRPC(UnlockConfig, NULL); | |
1171 | REQUEST_NEWRPC(Execute, NULL); | |
48c93061 CH |
1172 | |
1173 | /* Schedule streaming RPC handlers */ | |
1174 | REQUEST_NEWRPC_STREAMING(Get); | |
1175 | REQUEST_NEWRPC_STREAMING(ListTransactions); | |
b680134e CH |
1176 | |
1177 | zlog_notice("gRPC server listening on %s", | |
1178 | server_address.str().c_str()); | |
1179 | ||
1180 | /* Process inbound RPCs */ | |
83f6fce7 CH |
1181 | bool ok; |
1182 | void *tag; | |
48c93061 | 1183 | while (true) { |
83f6fce7 CH |
1184 | if (!cq->Next(&tag, &ok)) { |
1185 | grpc_debug("%s: CQ empty exiting", __func__); | |
de800c10 | 1186 | break; |
83f6fce7 | 1187 | } |
de800c10 | 1188 | |
83f6fce7 CH |
1189 | grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag, |
1190 | ok); | |
1191 | ||
48c93061 | 1192 | if (!ok) { |
83f6fce7 CH |
1193 | delete static_cast<RpcStateBase *>(tag); |
1194 | break; | |
1195 | } | |
b680134e CH |
1196 | |
1197 | RpcStateBase *rpc = static_cast<RpcStateBase *>(tag); | |
48c93061 CH |
1198 | if (rpc->get_state() != FINISH) |
1199 | rpc->run(&service, cq.get()); | |
1200 | else { | |
1201 | grpc_debug("%s RPC FINISH -> [delete]", rpc->name); | |
1202 | delete rpc; | |
1203 | } | |
b680134e | 1204 | } |
ec2ac5f2 | 1205 | |
83f6fce7 CH |
1206 | /* This was probably done for us to get here, but let's be safe */ |
1207 | pthread_mutex_lock(&s_server_lock); | |
1208 | grpc_running = false; | |
1209 | if (s_server) { | |
1210 | grpc_debug("%s: shutdown server and CQ", __func__); | |
1211 | server->Shutdown(); | |
1212 | s_server = NULL; | |
1213 | } | |
1214 | pthread_mutex_unlock(&s_server_lock); | |
1215 | ||
1216 | grpc_debug("%s: shutting down CQ", __func__); | |
1217 | cq->Shutdown(); | |
1218 | ||
1219 | grpc_debug("%s: draining the CQ", __func__); | |
1220 | while (cq->Next(&tag, &ok)) { | |
1221 | grpc_debug("%s: drain tag %p", __func__, tag); | |
1222 | delete static_cast<RpcStateBase *>(tag); | |
b680134e | 1223 | } |
ec2ac5f2 | 1224 | |
83f6fce7 | 1225 | zlog_info("%s: exiting from grpc pthread", __func__); |
ec2ac5f2 RW |
1226 | return NULL; |
1227 | } | |
1228 | ||
b680134e CH |
1229 | |
1230 | static int frr_grpc_init(uint port) | |
ec2ac5f2 | 1231 | { |
b680134e CH |
1232 | struct frr_pthread_attr attr = { |
1233 | .start = grpc_pthread_start, | |
1234 | .stop = NULL, | |
1235 | }; | |
1236 | ||
83f6fce7 CH |
1237 | grpc_debug("%s: entered", __func__); |
1238 | ||
0edcb505 | 1239 | fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc"); |
b680134e | 1240 | fpt->data = reinterpret_cast<void *>((intptr_t)port); |
0edcb505 | 1241 | |
ec2ac5f2 | 1242 | /* Create a pthread for gRPC since it runs its own event loop. */ |
0edcb505 | 1243 | if (frr_pthread_run(fpt, NULL) < 0) { |
ec2ac5f2 RW |
1244 | flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s", |
1245 | __func__, safe_strerror(errno)); | |
1246 | return -1; | |
1247 | } | |
ec2ac5f2 RW |
1248 | |
1249 | return 0; | |
1250 | } | |
1251 | ||
1252 | static int frr_grpc_finish(void) | |
1253 | { | |
83f6fce7 | 1254 | grpc_debug("%s: entered", __func__); |
de800c10 | 1255 | |
83f6fce7 CH |
1256 | if (!fpt) |
1257 | return 0; | |
de800c10 | 1258 | |
83f6fce7 CH |
1259 | /* |
1260 | * Shut the server down here in main thread. This will cause the wait on | |
1261 | * the completion queue (cq.Next()) to exit and cleanup everything else. | |
1262 | */ | |
1263 | pthread_mutex_lock(&s_server_lock); | |
1264 | grpc_running = false; | |
1265 | if (s_server) { | |
1266 | grpc_debug("%s: shutdown server", __func__); | |
1267 | s_server->Shutdown(); | |
1268 | s_server = NULL; | |
de800c10 | 1269 | } |
83f6fce7 | 1270 | pthread_mutex_unlock(&s_server_lock); |
de800c10 | 1271 | |
83f6fce7 CH |
1272 | grpc_debug("%s: joining and destroy grpc thread", __func__); |
1273 | pthread_join(fpt->thread, NULL); | |
1274 | frr_pthread_destroy(fpt); | |
79c68195 RZ |
1275 | |
1276 | // Fix protobuf 'memory leaks' during shutdown. | |
1277 | // https://groups.google.com/g/protobuf/c/4y_EmQiCGgs | |
1278 | google::protobuf::ShutdownProtobufLibrary(); | |
1279 | ||
ec2ac5f2 RW |
1280 | return 0; |
1281 | } | |
1282 | ||
20b0a2ed QY |
1283 | /* |
1284 | * This is done this way because module_init and module_late_init are both | |
1285 | * called during daemon pre-fork initialization. Because the GRPC library | |
1286 | * spawns threads internally, we need to delay initializing it until after | |
1287 | * fork. This is done by scheduling this init function as an event task, since | |
1288 | * the event loop doesn't run until after fork. | |
1289 | */ | |
cc9f21da | 1290 | static void frr_grpc_module_very_late_init(struct thread *thread) |
ec2ac5f2 | 1291 | { |
ec2ac5f2 | 1292 | const char *args = THIS_MODULE->load_args; |
b680134e | 1293 | uint port = GRPC_DEFAULT_PORT; |
ec2ac5f2 | 1294 | |
ec2ac5f2 | 1295 | if (args) { |
b680134e CH |
1296 | port = std::stoul(args); |
1297 | if (port < 1024 || port > UINT16_MAX) { | |
ec2ac5f2 | 1298 | flog_err(EC_LIB_GRPC_INIT, |
b680134e CH |
1299 | "%s: port number must be between 1025 and %d", |
1300 | __func__, UINT16_MAX); | |
ec2ac5f2 RW |
1301 | goto error; |
1302 | } | |
1303 | } | |
1304 | ||
b680134e | 1305 | if (frr_grpc_init(port) < 0) |
ec2ac5f2 RW |
1306 | goto error; |
1307 | ||
cc9f21da | 1308 | return; |
69ec5832 | 1309 | |
ec2ac5f2 RW |
1310 | error: |
1311 | flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module"); | |
ec2ac5f2 RW |
1312 | } |
1313 | ||
20b0a2ed QY |
1314 | static int frr_grpc_module_late_init(struct thread_master *tm) |
1315 | { | |
b680134e | 1316 | main_master = tm; |
20b0a2ed | 1317 | hook_register(frr_fini, frr_grpc_finish); |
b680134e | 1318 | thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL); |
20b0a2ed QY |
1319 | return 0; |
1320 | } | |
1321 | ||
/*
 * Module init entry point (see FRR_MODULE_SETUP below). Only registers
 * frr_grpc_module_late_init() on the frr_late_init hook; always returns 0.
 */
static int frr_grpc_module_init(void)
{
	hook_register(frr_late_init, frr_grpc_module_late_init);

	return 0;
}
1328 | ||
// Module descriptor: names the module "frr_grpc" and sets
// frr_grpc_module_init() as its init entry point.
FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
		 .description = "FRR gRPC northbound module",
		 .init = frr_grpc_module_init, );