]>
Commit | Line | Data |
---|---|---|
ec2ac5f2 | 1 | // |
c85ecd64 | 2 | // Copyright (c) 2021-2022, LabN Consulting, L.L.C |
ec2ac5f2 RW |
3 | // Copyright (C) 2019 NetDEF, Inc. |
4 | // Renato Westphal | |
5 | // | |
6 | // This program is free software; you can redistribute it and/or modify it | |
7 | // under the terms of the GNU General Public License as published by the Free | |
8 | // Software Foundation; either version 2 of the License, or (at your option) | |
9 | // any later version. | |
10 | // | |
11 | // This program is distributed in the hope that it will be useful, but WITHOUT | |
12 | // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
13 | // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
14 | // more details. | |
15 | // | |
16 | // You should have received a copy of the GNU General Public License along | |
17 | // with this program; see the file COPYING; if not, write to the Free Software | |
18 | // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
19 | // | |
20 | ||
21 | #include <zebra.h> | |
63d12a7d QY |
22 | #include <grpcpp/grpcpp.h> |
23 | #include "grpc/frr-northbound.grpc.pb.h" | |
ec2ac5f2 RW |
24 | |
25 | #include "log.h" | |
26 | #include "libfrr.h" | |
09781197 | 27 | #include "lib/version.h" |
b680134e | 28 | #include "lib/thread.h" |
ec2ac5f2 RW |
29 | #include "command.h" |
30 | #include "lib_errors.h" | |
31 | #include "northbound.h" | |
32 | #include "northbound_db.h" | |
0edcb505 | 33 | #include "frr_pthread.h" |
ec2ac5f2 RW |
34 | |
35 | #include <iostream> | |
36 | #include <sstream> | |
37 | #include <memory> | |
38 | #include <string> | |
39 | ||
ec2ac5f2 RW |
40 | #define GRPC_DEFAULT_PORT 50051 |
41 | ||
42 | /* | |
43 | * NOTE: we can't use the FRR debugging infrastructure here since it uses | |
44 | * atomics and C++ has a different atomics API. Enable gRPC debugging | |
45 | * unconditionally until we figure out a way to solve this problem. | |
46 | */ | |
b680134e CH |
47 | static bool nb_dbg_client_grpc = 0; |
48 | ||
49 | static struct thread_master *main_master; | |
ec2ac5f2 | 50 | |
0edcb505 CS |
51 | static struct frr_pthread *fpt; |
52 | ||
83f6fce7 CH |
53 | static bool grpc_running; |
54 | ||
b680134e CH |
55 | #define grpc_debug(...) \ |
56 | do { \ | |
57 | if (nb_dbg_client_grpc) \ | |
58 | zlog_debug(__VA_ARGS__); \ | |
59 | } while (0) | |
60 | ||
61 | // ------------------------------------------------------ | |
62 | // New Types | |
63 | // ------------------------------------------------------ | |
64 | ||
65 | enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED }; | |
66 | const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"}; | |
67 | ||
68 | struct candidate { | |
69 | uint64_t id; | |
70 | struct nb_config *config; | |
71 | struct nb_transaction *transaction; | |
0edcb505 | 72 | }; |
ec2ac5f2 | 73 | |
b680134e CH |
74 | class Candidates |
75 | { | |
76 | public: | |
77 | ~Candidates(void) | |
78 | { | |
79 | // Delete candidates. | |
80 | for (auto it = _cdb.begin(); it != _cdb.end(); it++) | |
81 | delete_candidate(&it->second); | |
82 | } | |
83 | ||
84 | struct candidate *create_candidate(void) | |
85 | { | |
86 | uint64_t id = ++_next_id; | |
87 | assert(id); // TODO: implement an algorithm for unique reusable | |
88 | // IDs. | |
89 | struct candidate *c = &_cdb[id]; | |
90 | c->id = id; | |
91 | c->config = nb_config_dup(running_config); | |
92 | c->transaction = NULL; | |
93 | ||
94 | return c; | |
95 | } | |
96 | ||
97 | void delete_candidate(struct candidate *c) | |
98 | { | |
99 | char errmsg[BUFSIZ] = {0}; | |
100 | ||
b680134e CH |
101 | nb_config_free(c->config); |
102 | if (c->transaction) | |
103 | nb_candidate_commit_abort(c->transaction, errmsg, | |
104 | sizeof(errmsg)); | |
96d434f8 | 105 | _cdb.erase(c->id); |
b680134e CH |
106 | } |
107 | ||
108 | struct candidate *get_candidate(uint32_t id) | |
109 | { | |
110 | return _cdb.count(id) == 0 ? NULL : &_cdb[id]; | |
111 | } | |
112 | ||
113 | private: | |
114 | uint64_t _next_id = 0; | |
115 | std::map<uint32_t, struct candidate> _cdb; | |
116 | }; | |
ecf9fb30 | 117 | |
ecf9fb30 QY |
118 | class RpcStateBase |
119 | { | |
120 | public: | |
83f6fce7 | 121 | virtual ~RpcStateBase() = default; |
b680134e CH |
122 | virtual CallState doCallback() = 0; |
123 | virtual void do_request(::frr::Northbound::AsyncService *service, | |
83f6fce7 CH |
124 | ::grpc::ServerCompletionQueue *cq, |
125 | bool no_copy) = 0; | |
ecf9fb30 QY |
126 | }; |
127 | ||
b680134e CH |
128 | /* |
129 | * The RPC state class is used to track the execution of an RPC. | |
130 | */ | |
131 | template <typename Q, typename S> class NewRpcState : RpcStateBase | |
ecf9fb30 | 132 | { |
b680134e CH |
133 | typedef void (frr::Northbound::AsyncService::*reqfunc_t)( |
134 | ::grpc::ServerContext *, Q *, | |
135 | ::grpc::ServerAsyncResponseWriter<S> *, | |
136 | ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, | |
137 | void *); | |
138 | typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( | |
139 | ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *, | |
140 | ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, | |
141 | void *); | |
142 | ||
ecf9fb30 | 143 | public: |
b680134e CH |
144 | NewRpcState(Candidates *cdb, reqfunc_t rfunc, |
145 | void (*cb)(NewRpcState<Q, S> *), const char *name) | |
146 | : requestf(rfunc), callback(cb), responder(&ctx), | |
147 | async_responder(&ctx), name(name), cdb(cdb){}; | |
148 | NewRpcState(Candidates *cdb, reqsfunc_t rfunc, | |
149 | void (*cb)(NewRpcState<Q, S> *), const char *name) | |
150 | : requestsf(rfunc), callback(cb), responder(&ctx), | |
151 | async_responder(&ctx), name(name), cdb(cdb){}; | |
152 | ||
153 | CallState doCallback() override | |
154 | { | |
155 | CallState enter_state = this->state; | |
156 | CallState new_state; | |
157 | if (enter_state == FINISH) { | |
158 | grpc_debug("%s RPC FINISH -> DELETED", name); | |
159 | new_state = FINISH; | |
160 | } else { | |
161 | grpc_debug("%s RPC: %s -> PROCESS", name, | |
162 | call_states[this->state]); | |
163 | new_state = PROCESS; | |
164 | } | |
165 | /* | |
166 | * We are either in state CREATE, MORE or FINISH. If CREATE or | |
167 | * MORE move back to PROCESS, otherwise we are cleaning up | |
168 | * (FINISH) so leave it in that state. Run the callback on the | |
169 | * main threadmaster/pthread; and wait for expected transition | |
170 | * from main thread. If transition is to FINISH->DELETED. | |
171 | * delete us. | |
172 | * | |
173 | * We update the state prior to scheduling the callback which | |
174 | * may then update the state in the master pthread. Then we | |
175 | * obtain the lock in the condvar-check-loop as the callback | |
176 | * will be modifying updating the state value. | |
177 | */ | |
178 | this->state = new_state; | |
179 | thread_add_event(main_master, c_callback, (void *)this, 0, | |
180 | NULL); | |
181 | pthread_mutex_lock(&this->cmux); | |
182 | while (this->state == new_state) | |
183 | pthread_cond_wait(&this->cond, &this->cmux); | |
184 | pthread_mutex_unlock(&this->cmux); | |
185 | ||
186 | if (this->state == DELETED) { | |
187 | grpc_debug("%s RPC: -> [DELETED]", name); | |
188 | delete this; | |
189 | return DELETED; | |
190 | } | |
191 | return this->state; | |
192 | } | |
ecf9fb30 | 193 | |
b680134e | 194 | void do_request(::frr::Northbound::AsyncService *service, |
83f6fce7 CH |
195 | ::grpc::ServerCompletionQueue *cq, |
196 | bool no_copy) override | |
ecf9fb30 | 197 | { |
b680134e CH |
198 | grpc_debug("%s, posting a request for: %s", __func__, name); |
199 | if (requestf) { | |
200 | NewRpcState<Q, S> *copy = | |
83f6fce7 CH |
201 | no_copy ? this |
202 | : new NewRpcState(cdb, requestf, | |
203 | callback, name); | |
b680134e CH |
204 | (service->*requestf)(©->ctx, ©->request, |
205 | ©->responder, cq, cq, copy); | |
206 | } else { | |
207 | NewRpcState<Q, S> *copy = | |
83f6fce7 CH |
208 | no_copy ? this |
209 | : new NewRpcState(cdb, requestsf, | |
210 | callback, name); | |
b680134e CH |
211 | (service->*requestsf)(©->ctx, ©->request, |
212 | ©->async_responder, cq, cq, | |
213 | copy); | |
214 | } | |
215 | } | |
216 | ||
217 | ||
cc9f21da | 218 | static void c_callback(struct thread *thread) |
b680134e CH |
219 | { |
220 | auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg); | |
221 | /* | |
222 | * We hold the lock until the callback finishes and has updated | |
223 | * _tag->state, then we signal done and release. | |
224 | */ | |
225 | pthread_mutex_lock(&_tag->cmux); | |
226 | ||
227 | CallState enter_state = _tag->state; | |
228 | grpc_debug("%s RPC running on main thread", _tag->name); | |
229 | ||
230 | _tag->callback(_tag); | |
231 | ||
232 | grpc_debug("%s RPC: %s -> %s", _tag->name, | |
233 | call_states[enter_state], call_states[_tag->state]); | |
234 | ||
235 | pthread_cond_signal(&_tag->cond); | |
236 | pthread_mutex_unlock(&_tag->cmux); | |
cc9f21da | 237 | return; |
ecf9fb30 QY |
238 | } |
239 | ||
b680134e | 240 | const char *name; |
ecf9fb30 QY |
241 | grpc::ServerContext ctx; |
242 | Q request; | |
243 | S response; | |
244 | grpc::ServerAsyncResponseWriter<S> responder; | |
245 | grpc::ServerAsyncWriter<S> async_responder; | |
246 | ||
b680134e CH |
247 | Candidates *cdb; |
248 | void (*callback)(NewRpcState<Q, S> *); | |
c85ecd64 CH |
249 | reqfunc_t requestf = NULL; |
250 | reqsfunc_t requestsf = NULL; | |
ecf9fb30 | 251 | |
b680134e CH |
252 | pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; |
253 | pthread_cond_t cond = PTHREAD_COND_INITIALIZER; | |
c85ecd64 | 254 | void *context = 0; |
b680134e CH |
255 | |
256 | CallState state = CREATE; | |
ecf9fb30 QY |
257 | }; |
258 | ||
b680134e CH |
259 | // ------------------------------------------------------ |
260 | // Utility Functions | |
261 | // ------------------------------------------------------ | |
ecf9fb30 | 262 | |
b680134e CH |
263 | static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding) |
264 | { | |
265 | switch (encoding) { | |
266 | case frr::JSON: | |
267 | return LYD_JSON; | |
268 | case frr::XML: | |
269 | return LYD_XML; | |
270 | default: | |
271 | flog_err(EC_LIB_DEVELOPMENT, | |
272 | "%s: unknown data encoding format (%u)", __func__, | |
273 | encoding); | |
274 | exit(1); | |
275 | } | |
276 | } | |
ecf9fb30 | 277 | |
b680134e | 278 | static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path, |
fe095adc | 279 | const char *value) |
ec2ac5f2 | 280 | { |
fe095adc CH |
281 | LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value, |
282 | LYD_NEW_PATH_UPDATE, &dnode); | |
b680134e CH |
283 | if (err != LY_SUCCESS) { |
284 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s", | |
285 | __func__, ly_errmsg(ly_native_ctx)); | |
286 | return -1; | |
ec2ac5f2 RW |
287 | } |
288 | ||
b680134e CH |
289 | return 0; |
290 | } | |
291 | ||
292 | static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path) | |
293 | { | |
294 | dnode = yang_dnode_get(dnode, path.c_str()); | |
295 | if (!dnode) | |
296 | return -1; | |
297 | ||
298 | lyd_free_tree(dnode); | |
299 | ||
300 | return 0; | |
301 | } | |
302 | ||
303 | static LY_ERR data_tree_from_dnode(frr::DataTree *dt, | |
304 | const struct lyd_node *dnode, | |
305 | LYD_FORMAT lyd_format, bool with_defaults) | |
306 | { | |
307 | char *strp; | |
308 | int options = 0; | |
309 | ||
310 | SET_FLAG(options, LYD_PRINT_WITHSIBLINGS); | |
311 | if (with_defaults) | |
312 | SET_FLAG(options, LYD_PRINT_WD_ALL); | |
313 | else | |
314 | SET_FLAG(options, LYD_PRINT_WD_TRIM); | |
315 | ||
316 | LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options); | |
317 | if (err == LY_SUCCESS) { | |
318 | if (strp) { | |
319 | dt->set_data(strp); | |
320 | free(strp); | |
321 | } | |
ec2ac5f2 | 322 | } |
b680134e CH |
323 | return err; |
324 | } | |
ec2ac5f2 | 325 | |
b680134e CH |
326 | static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt, |
327 | bool config_only) | |
328 | { | |
329 | struct lyd_node *dnode; | |
330 | int options, opt2; | |
331 | LY_ERR err; | |
332 | ||
333 | if (config_only) { | |
334 | options = LYD_PARSE_NO_STATE; | |
335 | opt2 = LYD_VALIDATE_NO_STATE; | |
336 | } else { | |
337 | options = LYD_PARSE_STRICT; | |
338 | opt2 = 0; | |
339 | } | |
340 | ||
341 | err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(), | |
342 | encoding2lyd_format(dt->encoding()), options, | |
343 | opt2, &dnode); | |
344 | if (err != LY_SUCCESS) { | |
345 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s", | |
346 | __func__, ly_errmsg(ly_native_ctx)); | |
347 | } | |
348 | return dnode; | |
349 | } | |
350 | ||
351 | static struct lyd_node *get_dnode_config(const std::string &path) | |
352 | { | |
353 | struct lyd_node *dnode; | |
354 | ||
0f538858 RZ |
355 | if (!yang_dnode_exists(running_config->dnode, |
356 | path.empty() ? NULL : path.c_str())) | |
357 | return NULL; | |
358 | ||
b680134e CH |
359 | dnode = yang_dnode_get(running_config->dnode, |
360 | path.empty() ? NULL : path.c_str()); | |
361 | if (dnode) | |
362 | dnode = yang_dnode_dup(dnode); | |
363 | ||
364 | return dnode; | |
365 | } | |
366 | ||
367 | static int get_oper_data_cb(const struct lysc_node *snode, | |
368 | struct yang_translator *translator, | |
369 | struct yang_data *data, void *arg) | |
370 | { | |
371 | struct lyd_node *dnode = static_cast<struct lyd_node *>(arg); | |
372 | int ret = yang_dnode_edit(dnode, data->xpath, data->value); | |
373 | yang_data_free(data); | |
374 | ||
375 | return (ret == 0) ? NB_OK : NB_ERR; | |
376 | } | |
377 | ||
378 | static struct lyd_node *get_dnode_state(const std::string &path) | |
379 | { | |
380 | struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false); | |
381 | if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode) | |
382 | != NB_OK) { | |
383 | yang_dnode_free(dnode); | |
384 | return NULL; | |
385 | } | |
386 | ||
387 | return dnode; | |
388 | } | |
389 | ||
390 | static grpc::Status get_path(frr::DataTree *dt, const std::string &path, | |
391 | int type, LYD_FORMAT lyd_format, | |
392 | bool with_defaults) | |
393 | { | |
394 | struct lyd_node *dnode_config = NULL; | |
395 | struct lyd_node *dnode_state = NULL; | |
396 | struct lyd_node *dnode_final; | |
397 | ||
398 | // Configuration data. | |
399 | if (type == frr::GetRequest_DataType_ALL | |
400 | || type == frr::GetRequest_DataType_CONFIG) { | |
401 | dnode_config = get_dnode_config(path); | |
402 | if (!dnode_config) | |
403 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
404 | "Data path not found"); | |
405 | } | |
406 | ||
407 | // Operational data. | |
408 | if (type == frr::GetRequest_DataType_ALL | |
409 | || type == frr::GetRequest_DataType_STATE) { | |
410 | dnode_state = get_dnode_state(path); | |
411 | if (!dnode_state) { | |
412 | if (dnode_config) | |
413 | yang_dnode_free(dnode_config); | |
414 | return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
415 | "Failed to fetch operational data"); | |
416 | } | |
417 | } | |
418 | ||
419 | switch (type) { | |
420 | case frr::GetRequest_DataType_ALL: | |
421 | // | |
422 | // Combine configuration and state data into a single | |
423 | // dnode. | |
424 | // | |
425 | if (lyd_merge_siblings(&dnode_state, dnode_config, | |
426 | LYD_MERGE_DESTRUCT) | |
427 | != LY_SUCCESS) { | |
428 | yang_dnode_free(dnode_state); | |
429 | yang_dnode_free(dnode_config); | |
430 | return grpc::Status( | |
431 | grpc::StatusCode::INTERNAL, | |
432 | "Failed to merge configuration and state data", | |
433 | ly_errmsg(ly_native_ctx)); | |
ecf9fb30 | 434 | } |
b680134e CH |
435 | |
436 | dnode_final = dnode_state; | |
437 | break; | |
438 | case frr::GetRequest_DataType_CONFIG: | |
439 | dnode_final = dnode_config; | |
440 | break; | |
441 | case frr::GetRequest_DataType_STATE: | |
442 | dnode_final = dnode_state; | |
443 | break; | |
ecf9fb30 QY |
444 | } |
445 | ||
b680134e CH |
446 | // Validate data to create implicit default nodes if necessary. |
447 | int validate_opts = 0; | |
448 | if (type == frr::GetRequest_DataType_CONFIG) | |
449 | validate_opts = LYD_VALIDATE_NO_STATE; | |
450 | else | |
451 | validate_opts = 0; | |
452 | ||
453 | LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx, | |
454 | validate_opts, NULL); | |
455 | ||
456 | if (err) | |
457 | flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s", | |
458 | __func__, ly_errmsg(ly_native_ctx)); | |
459 | // Dump data using the requested format. | |
460 | if (!err) | |
461 | err = data_tree_from_dnode(dt, dnode_final, lyd_format, | |
462 | with_defaults); | |
463 | yang_dnode_free(dnode_final); | |
464 | if (err) | |
465 | return grpc::Status(grpc::StatusCode::INTERNAL, | |
466 | "Failed to dump data"); | |
467 | return grpc::Status::OK; | |
468 | } | |
469 | ||
470 | ||
471 | // ------------------------------------------------------ | |
472 | // RPC Callback Functions: run on main thread | |
473 | // ------------------------------------------------------ | |
474 | ||
475 | void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest, | |
ecf9fb30 | 476 | frr::GetCapabilitiesResponse> *tag) |
b680134e CH |
477 | { |
478 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
479 | ||
480 | if (tag->state == FINISH) { | |
481 | tag->state = DELETED; | |
482 | return; | |
483 | } | |
484 | ||
485 | // Response: string frr_version = 1; | |
486 | tag->response.set_frr_version(FRR_VERSION); | |
487 | ||
488 | // Response: bool rollback_support = 2; | |
ec2ac5f2 | 489 | #ifdef HAVE_CONFIG_ROLLBACKS |
b680134e | 490 | tag->response.set_rollback_support(true); |
ec2ac5f2 | 491 | #else |
b680134e | 492 | tag->response.set_rollback_support(false); |
ec2ac5f2 | 493 | #endif |
b680134e CH |
494 | // Response: repeated ModuleData supported_modules = 3; |
495 | struct yang_module *module; | |
496 | RB_FOREACH (module, yang_modules, &yang_modules) { | |
497 | auto m = tag->response.add_supported_modules(); | |
498 | ||
499 | m->set_name(module->name); | |
500 | if (module->info->revision) | |
501 | m->set_revision(module->info->revision); | |
502 | m->set_organization(module->info->org); | |
503 | } | |
ec2ac5f2 | 504 | |
b680134e CH |
505 | // Response: repeated Encoding supported_encodings = 4; |
506 | tag->response.add_supported_encodings(frr::JSON); | |
507 | tag->response.add_supported_encodings(frr::XML); | |
ec2ac5f2 | 508 | |
b680134e CH |
509 | /* Should we do this in the async process call? */ |
510 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); | |
ec2ac5f2 | 511 | |
b680134e CH |
512 | /* Indicate we are done. */ |
513 | tag->state = FINISH; | |
514 | } | |
ec2ac5f2 | 515 | |
b680134e CH |
516 | void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag) |
517 | { | |
518 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
519 | ||
520 | if (tag->state == FINISH) { | |
521 | delete static_cast<std::list<std::string> *>(tag->context); | |
522 | tag->state = DELETED; | |
523 | return; | |
ec2ac5f2 RW |
524 | } |
525 | ||
b680134e CH |
526 | if (!tag->context) { |
527 | /* Creating, first time called for this RPC */ | |
528 | auto mypaths = new std::list<std::string>(); | |
529 | tag->context = mypaths; | |
530 | auto paths = tag->request.path(); | |
531 | for (const std::string &path : paths) { | |
532 | mypaths->push_back(std::string(path)); | |
ecf9fb30 | 533 | } |
b680134e | 534 | } |
ecf9fb30 | 535 | |
b680134e CH |
536 | // Request: DataType type = 1; |
537 | int type = tag->request.type(); | |
538 | // Request: Encoding encoding = 2; | |
539 | frr::Encoding encoding = tag->request.encoding(); | |
540 | // Request: bool with_defaults = 3; | |
541 | bool with_defaults = tag->request.with_defaults(); | |
542 | ||
543 | auto mypathps = static_cast<std::list<std::string> *>(tag->context); | |
544 | if (mypathps->empty()) { | |
545 | tag->async_responder.Finish(grpc::Status::OK, tag); | |
546 | tag->state = FINISH; | |
547 | return; | |
ec2ac5f2 RW |
548 | } |
549 | ||
b680134e CH |
550 | frr::GetResponse response; |
551 | grpc::Status status; | |
552 | ||
553 | // Response: int64 timestamp = 1; | |
554 | response.set_timestamp(time(NULL)); | |
555 | ||
556 | // Response: DataTree data = 2; | |
557 | auto *data = response.mutable_data(); | |
558 | data->set_encoding(tag->request.encoding()); | |
559 | status = get_path(data, mypathps->back().c_str(), type, | |
560 | encoding2lyd_format(encoding), with_defaults); | |
561 | ||
562 | if (!status.ok()) { | |
563 | tag->async_responder.WriteAndFinish( | |
564 | response, grpc::WriteOptions(), status, tag); | |
565 | tag->state = FINISH; | |
566 | return; | |
567 | } | |
568 | ||
569 | mypathps->pop_back(); | |
570 | if (mypathps->empty()) { | |
571 | tag->async_responder.WriteAndFinish( | |
572 | response, grpc::WriteOptions(), grpc::Status::OK, tag); | |
573 | tag->state = FINISH; | |
574 | } else { | |
575 | tag->async_responder.Write(response, tag); | |
576 | tag->state = MORE; | |
577 | } | |
578 | } | |
579 | ||
580 | void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest, | |
ecf9fb30 | 581 | frr::CreateCandidateResponse> *tag) |
b680134e CH |
582 | { |
583 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
ec2ac5f2 | 584 | |
b680134e CH |
585 | if (tag->state == FINISH) { |
586 | tag->state = DELETED; | |
587 | return; | |
588 | } | |
ec2ac5f2 | 589 | |
b680134e CH |
590 | struct candidate *candidate = tag->cdb->create_candidate(); |
591 | if (!candidate) { | |
592 | tag->responder.Finish( | |
593 | tag->response, | |
594 | grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, | |
595 | "Can't create candidate configuration"), | |
596 | tag); | |
597 | } else { | |
598 | tag->response.set_candidate_id(candidate->id); | |
599 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); | |
ec2ac5f2 RW |
600 | } |
601 | ||
b680134e CH |
602 | tag->state = FINISH; |
603 | } | |
604 | ||
605 | void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest, | |
ecf9fb30 | 606 | frr::DeleteCandidateResponse> *tag) |
b680134e CH |
607 | { |
608 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
609 | ||
610 | if (tag->state == FINISH) { | |
611 | tag->state = DELETED; | |
612 | return; | |
ecf9fb30 | 613 | } |
ec2ac5f2 | 614 | |
b680134e CH |
615 | // Request: uint32 candidate_id = 1; |
616 | uint32_t candidate_id = tag->request.candidate_id(); | |
617 | ||
618 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
619 | ||
620 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); | |
621 | if (!candidate) { | |
622 | tag->responder.Finish( | |
623 | tag->response, | |
624 | grpc::Status(grpc::StatusCode::NOT_FOUND, | |
625 | "candidate configuration not found"), | |
626 | tag); | |
627 | } else { | |
628 | tag->cdb->delete_candidate(candidate); | |
629 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); | |
630 | } | |
631 | tag->state = FINISH; | |
632 | } | |
633 | ||
634 | void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest, | |
ecf9fb30 | 635 | frr::UpdateCandidateResponse> *tag) |
b680134e CH |
636 | { |
637 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
ecf9fb30 | 638 | |
b680134e CH |
639 | if (tag->state == FINISH) { |
640 | tag->state = DELETED; | |
641 | return; | |
642 | } | |
ec2ac5f2 | 643 | |
b680134e CH |
644 | // Request: uint32 candidate_id = 1; |
645 | uint32_t candidate_id = tag->request.candidate_id(); | |
646 | ||
647 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
648 | ||
649 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); | |
650 | ||
651 | if (!candidate) | |
652 | tag->responder.Finish( | |
653 | tag->response, | |
654 | grpc::Status(grpc::StatusCode::NOT_FOUND, | |
655 | "candidate configuration not found"), | |
656 | tag); | |
657 | else if (candidate->transaction) | |
658 | tag->responder.Finish( | |
659 | tag->response, | |
660 | grpc::Status( | |
661 | grpc::StatusCode::FAILED_PRECONDITION, | |
662 | "candidate is in the middle of a transaction"), | |
663 | tag); | |
664 | else if (nb_candidate_update(candidate->config) != NB_OK) | |
665 | tag->responder.Finish( | |
666 | tag->response, | |
667 | grpc::Status( | |
668 | grpc::StatusCode::INTERNAL, | |
669 | "failed to update candidate configuration"), | |
670 | tag); | |
671 | ||
672 | else | |
673 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); | |
674 | ||
675 | tag->state = FINISH; | |
676 | } | |
677 | ||
678 | void HandleUnaryEditCandidate( | |
679 | NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag) | |
680 | { | |
681 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
682 | ||
683 | if (tag->state == FINISH) { | |
684 | tag->state = DELETED; | |
685 | return; | |
ec2ac5f2 RW |
686 | } |
687 | ||
b680134e CH |
688 | // Request: uint32 candidate_id = 1; |
689 | uint32_t candidate_id = tag->request.candidate_id(); | |
ec2ac5f2 | 690 | |
b680134e | 691 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); |
ec2ac5f2 | 692 | |
b680134e CH |
693 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); |
694 | ||
695 | if (!candidate) { | |
696 | tag->responder.Finish( | |
697 | tag->response, | |
698 | grpc::Status(grpc::StatusCode::NOT_FOUND, | |
699 | "candidate configuration not found"), | |
700 | tag); | |
701 | tag->state = FINISH; | |
702 | return; | |
ec2ac5f2 RW |
703 | } |
704 | ||
b680134e CH |
705 | struct nb_config *candidate_tmp = nb_config_dup(candidate->config); |
706 | ||
707 | auto pvs = tag->request.update(); | |
708 | for (const frr::PathValue &pv : pvs) { | |
fe095adc CH |
709 | if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), |
710 | pv.value().c_str()) != 0) { | |
b680134e CH |
711 | nb_config_free(candidate_tmp); |
712 | ||
713 | tag->responder.Finish( | |
714 | tag->response, | |
715 | grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
716 | "Failed to update \"" + pv.path() | |
717 | + "\""), | |
718 | tag); | |
719 | ||
ecf9fb30 | 720 | tag->state = FINISH; |
b680134e | 721 | return; |
ecf9fb30 | 722 | } |
ec2ac5f2 RW |
723 | } |
724 | ||
b680134e CH |
725 | pvs = tag->request.delete_(); |
726 | for (const frr::PathValue &pv : pvs) { | |
727 | if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) { | |
728 | nb_config_free(candidate_tmp); | |
729 | tag->responder.Finish( | |
730 | tag->response, | |
731 | grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
732 | "Failed to remove \"" + pv.path() | |
733 | + "\""), | |
734 | tag); | |
ecf9fb30 | 735 | tag->state = FINISH; |
b680134e | 736 | return; |
ec2ac5f2 | 737 | } |
ec2ac5f2 RW |
738 | } |
739 | ||
b680134e CH |
740 | // No errors, accept all changes. |
741 | nb_config_replace(candidate->config, candidate_tmp, false); | |
742 | ||
743 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); | |
744 | ||
745 | tag->state = FINISH; | |
746 | } | |
747 | ||
748 | void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest, | |
749 | frr::LoadToCandidateResponse> *tag) | |
750 | { | |
751 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
752 | ||
753 | if (tag->state == FINISH) { | |
754 | tag->state = DELETED; | |
755 | return; | |
ec2ac5f2 RW |
756 | } |
757 | ||
b680134e CH |
758 | // Request: uint32 candidate_id = 1; |
759 | uint32_t candidate_id = tag->request.candidate_id(); | |
760 | ||
761 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
762 | ||
763 | // Request: LoadType type = 2; | |
764 | int load_type = tag->request.type(); | |
765 | // Request: DataTree config = 3; | |
766 | auto config = tag->request.config(); | |
767 | ||
768 | ||
769 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); | |
770 | ||
771 | if (!candidate) { | |
772 | tag->responder.Finish( | |
773 | tag->response, | |
774 | grpc::Status(grpc::StatusCode::NOT_FOUND, | |
775 | "candidate configuration not found"), | |
776 | tag); | |
777 | tag->state = FINISH; | |
778 | return; | |
ec2ac5f2 RW |
779 | } |
780 | ||
b680134e CH |
781 | struct lyd_node *dnode = dnode_from_data_tree(&config, true); |
782 | if (!dnode) { | |
783 | tag->responder.Finish( | |
784 | tag->response, | |
785 | grpc::Status(grpc::StatusCode::INTERNAL, | |
786 | "Failed to parse the configuration"), | |
787 | tag); | |
788 | tag->state = FINISH; | |
789 | return; | |
ecf9fb30 | 790 | } |
ec2ac5f2 | 791 | |
b680134e CH |
792 | struct nb_config *loaded_config = nb_config_new(dnode); |
793 | ||
794 | if (load_type == frr::LoadToCandidateRequest::REPLACE) | |
795 | nb_config_replace(candidate->config, loaded_config, false); | |
796 | else if (nb_config_merge(candidate->config, loaded_config, false) | |
797 | != NB_OK) { | |
798 | tag->responder.Finish( | |
799 | tag->response, | |
800 | grpc::Status( | |
801 | grpc::StatusCode::INTERNAL, | |
802 | "Failed to merge the loaded configuration"), | |
803 | tag); | |
804 | tag->state = FINISH; | |
805 | return; | |
ec2ac5f2 RW |
806 | } |
807 | ||
b680134e CH |
808 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); |
809 | tag->state = FINISH; | |
810 | } | |
811 | ||
812 | void HandleUnaryCommit( | |
813 | NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag) | |
814 | { | |
815 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
816 | ||
817 | if (tag->state == FINISH) { | |
818 | tag->state = DELETED; | |
819 | return; | |
820 | } | |
ec2ac5f2 | 821 | |
b680134e CH |
822 | // Request: uint32 candidate_id = 1; |
823 | uint32_t candidate_id = tag->request.candidate_id(); | |
824 | ||
825 | grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); | |
826 | ||
827 | // Request: Phase phase = 2; | |
828 | int phase = tag->request.phase(); | |
829 | // Request: string comment = 3; | |
830 | const std::string comment = tag->request.comment(); | |
831 | ||
832 | // Find candidate configuration. | |
833 | struct candidate *candidate = tag->cdb->get_candidate(candidate_id); | |
834 | if (!candidate) { | |
835 | tag->responder.Finish( | |
836 | tag->response, | |
837 | grpc::Status(grpc::StatusCode::NOT_FOUND, | |
838 | "candidate configuration not found"), | |
839 | tag); | |
840 | tag->state = FINISH; | |
841 | return; | |
842 | } | |
843 | ||
844 | int ret = NB_OK; | |
845 | uint32_t transaction_id = 0; | |
846 | ||
847 | // Check for misuse of the two-phase commit protocol. | |
848 | switch (phase) { | |
849 | case frr::CommitRequest::PREPARE: | |
850 | case frr::CommitRequest::ALL: | |
851 | if (candidate->transaction) { | |
852 | tag->responder.Finish( | |
853 | tag->response, | |
854 | grpc::Status( | |
855 | grpc::StatusCode::FAILED_PRECONDITION, | |
856 | "candidate is in the middle of a transaction"), | |
857 | tag); | |
ecf9fb30 | 858 | tag->state = FINISH; |
b680134e | 859 | return; |
ecf9fb30 | 860 | } |
b680134e CH |
861 | break; |
862 | case frr::CommitRequest::ABORT: | |
863 | case frr::CommitRequest::APPLY: | |
864 | if (!candidate->transaction) { | |
865 | tag->responder.Finish( | |
866 | tag->response, | |
867 | grpc::Status( | |
868 | grpc::StatusCode::FAILED_PRECONDITION, | |
869 | "no transaction in progress"), | |
870 | tag); | |
871 | tag->state = FINISH; | |
872 | return; | |
ec2ac5f2 | 873 | } |
b680134e CH |
874 | break; |
875 | default: | |
876 | break; | |
ec2ac5f2 RW |
877 | } |
878 | ||
ecf9fb30 | 879 | |
b680134e CH |
880 | // Execute the user request. |
881 | struct nb_context context = {}; | |
882 | context.client = NB_CLIENT_GRPC; | |
883 | char errmsg[BUFSIZ] = {0}; | |
884 | ||
885 | switch (phase) { | |
886 | case frr::CommitRequest::VALIDATE: | |
887 | grpc_debug("`-> Performing VALIDATE"); | |
888 | ret = nb_candidate_validate(&context, candidate->config, errmsg, | |
889 | sizeof(errmsg)); | |
890 | break; | |
891 | case frr::CommitRequest::PREPARE: | |
892 | grpc_debug("`-> Performing PREPARE"); | |
893 | ret = nb_candidate_commit_prepare( | |
894 | &context, candidate->config, comment.c_str(), | |
895 | &candidate->transaction, errmsg, sizeof(errmsg)); | |
896 | break; | |
897 | case frr::CommitRequest::ABORT: | |
898 | grpc_debug("`-> Performing ABORT"); | |
899 | nb_candidate_commit_abort(candidate->transaction, errmsg, | |
900 | sizeof(errmsg)); | |
901 | break; | |
902 | case frr::CommitRequest::APPLY: | |
903 | grpc_debug("`-> Performing APPLY"); | |
904 | nb_candidate_commit_apply(candidate->transaction, true, | |
905 | &transaction_id, errmsg, | |
906 | sizeof(errmsg)); | |
907 | break; | |
908 | case frr::CommitRequest::ALL: | |
909 | grpc_debug("`-> Performing ALL"); | |
910 | ret = nb_candidate_commit(&context, candidate->config, true, | |
911 | comment.c_str(), &transaction_id, | |
912 | errmsg, sizeof(errmsg)); | |
913 | break; | |
914 | } | |
ec2ac5f2 | 915 | |
b680134e CH |
916 | // Map northbound error codes to gRPC status codes. |
917 | grpc::Status status; | |
918 | switch (ret) { | |
919 | case NB_OK: | |
920 | status = grpc::Status::OK; | |
921 | break; | |
922 | case NB_ERR_NO_CHANGES: | |
923 | status = grpc::Status(grpc::StatusCode::ABORTED, errmsg); | |
924 | break; | |
925 | case NB_ERR_LOCKED: | |
926 | status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg); | |
927 | break; | |
928 | case NB_ERR_VALIDATION: | |
929 | status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
930 | errmsg); | |
931 | break; | |
932 | case NB_ERR_RESOURCE: | |
933 | status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, | |
934 | errmsg); | |
935 | break; | |
936 | case NB_ERR: | |
937 | default: | |
938 | status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg); | |
939 | break; | |
940 | } | |
ec2ac5f2 | 941 | |
b680134e CH |
942 | grpc_debug("`-> Result: %s (message: '%s')", |
943 | nb_err_name((enum nb_error)ret), errmsg); | |
944 | ||
945 | if (ret == NB_OK) { | |
946 | // Response: uint32 transaction_id = 1; | |
947 | if (transaction_id) | |
948 | tag->response.set_transaction_id(transaction_id); | |
ec2ac5f2 | 949 | } |
b680134e CH |
950 | if (strlen(errmsg) > 0) |
951 | tag->response.set_error_message(errmsg); | |
ec2ac5f2 | 952 | |
b680134e CH |
953 | tag->responder.Finish(tag->response, status, tag); |
954 | tag->state = FINISH; | |
955 | } | |
ec2ac5f2 | 956 | |
b680134e CH |
957 | void HandleUnaryLockConfig( |
958 | NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) | |
959 | { | |
960 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
ec2ac5f2 | 961 | |
b680134e CH |
962 | if (tag->state == FINISH) { |
963 | tag->state = DELETED; | |
964 | return; | |
ec2ac5f2 RW |
965 | } |
966 | ||
b680134e CH |
967 | if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { |
968 | tag->responder.Finish( | |
969 | tag->response, | |
970 | grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, | |
971 | "running configuration is locked already"), | |
972 | tag); | |
973 | } else { | |
974 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); | |
ec2ac5f2 | 975 | } |
b680134e CH |
976 | tag->state = FINISH; |
977 | } | |
ec2ac5f2 | 978 | |
b680134e CH |
979 | void HandleUnaryUnlockConfig( |
980 | NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag) | |
981 | { | |
982 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
ec2ac5f2 | 983 | |
b680134e CH |
984 | if (tag->state == FINISH) { |
985 | tag->state = DELETED; | |
986 | return; | |
ec2ac5f2 RW |
987 | } |
988 | ||
b680134e CH |
989 | if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { |
990 | tag->responder.Finish( | |
991 | tag->response, | |
992 | grpc::Status( | |
993 | grpc::StatusCode::FAILED_PRECONDITION, | |
994 | "failed to unlock the running configuration"), | |
995 | tag); | |
996 | } else { | |
997 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); | |
998 | } | |
999 | tag->state = FINISH; | |
1000 | } | |
ec2ac5f2 | 1001 | |
b680134e CH |
1002 | static void list_transactions_cb(void *arg, int transaction_id, |
1003 | const char *client_name, const char *date, | |
1004 | const char *comment) | |
1005 | { | |
1006 | auto list = static_cast<std::list< | |
1007 | std::tuple<int, std::string, std::string, std::string>> *>(arg); | |
1008 | list->push_back( | |
1009 | std::make_tuple(transaction_id, std::string(client_name), | |
1010 | std::string(date), std::string(comment))); | |
1011 | } | |
1012 | ||
1013 | void HandleStreamingListTransactions( | |
1014 | NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse> | |
1015 | *tag) | |
1016 | { | |
1017 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
1018 | ||
1019 | if (tag->state == FINISH) { | |
1020 | delete static_cast<std::list<std::tuple< | |
1021 | int, std::string, std::string, std::string>> *>( | |
1022 | tag->context); | |
1023 | tag->state = DELETED; | |
1024 | return; | |
ec2ac5f2 RW |
1025 | } |
1026 | ||
b680134e CH |
1027 | if (!tag->context) { |
1028 | /* Creating, first time called for this RPC */ | |
1029 | auto new_list = | |
1030 | new std::list<std::tuple<int, std::string, std::string, | |
1031 | std::string>>(); | |
1032 | tag->context = new_list; | |
1033 | nb_db_transactions_iterate(list_transactions_cb, tag->context); | |
1034 | ||
1035 | new_list->push_back(std::make_tuple( | |
1036 | 0xFFFF, std::string("fake client"), | |
1037 | std::string("fake date"), std::string("fake comment"))); | |
1038 | new_list->push_back( | |
1039 | std::make_tuple(0xFFFE, std::string("fake client2"), | |
1040 | std::string("fake date"), | |
1041 | std::string("fake comment2"))); | |
ec2ac5f2 RW |
1042 | } |
1043 | ||
b680134e CH |
1044 | auto list = static_cast<std::list< |
1045 | std::tuple<int, std::string, std::string, std::string>> *>( | |
1046 | tag->context); | |
af1b88e9 | 1047 | |
b680134e CH |
1048 | if (list->empty()) { |
1049 | tag->async_responder.Finish(grpc::Status::OK, tag); | |
1050 | tag->state = FINISH; | |
1051 | return; | |
ec2ac5f2 RW |
1052 | } |
1053 | ||
b680134e | 1054 | auto item = list->back(); |
ec2ac5f2 | 1055 | |
b680134e | 1056 | frr::ListTransactionsResponse response; |
ec2ac5f2 | 1057 | |
b680134e CH |
1058 | // Response: uint32 id = 1; |
1059 | response.set_id(std::get<0>(item)); | |
ec2ac5f2 | 1060 | |
b680134e CH |
1061 | // Response: string client = 2; |
1062 | response.set_client(std::get<1>(item).c_str()); | |
1063 | ||
1064 | // Response: string date = 3; | |
1065 | response.set_date(std::get<2>(item).c_str()); | |
1066 | ||
1067 | // Response: string comment = 4; | |
1068 | response.set_comment(std::get<3>(item).c_str()); | |
ec2ac5f2 | 1069 | |
b680134e CH |
1070 | list->pop_back(); |
1071 | if (list->empty()) { | |
1072 | tag->async_responder.WriteAndFinish( | |
1073 | response, grpc::WriteOptions(), grpc::Status::OK, tag); | |
1074 | tag->state = FINISH; | |
1075 | } else { | |
1076 | tag->async_responder.Write(response, tag); | |
1077 | tag->state = MORE; | |
ec2ac5f2 | 1078 | } |
b680134e | 1079 | } |
ec2ac5f2 | 1080 | |
b680134e CH |
1081 | void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest, |
1082 | frr::GetTransactionResponse> *tag) | |
1083 | { | |
1084 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
ec2ac5f2 | 1085 | |
b680134e CH |
1086 | if (tag->state == FINISH) { |
1087 | tag->state = DELETED; | |
1088 | return; | |
1089 | } | |
ec2ac5f2 | 1090 | |
b680134e CH |
1091 | // Request: uint32 transaction_id = 1; |
1092 | uint32_t transaction_id = tag->request.transaction_id(); | |
1093 | // Request: Encoding encoding = 2; | |
1094 | frr::Encoding encoding = tag->request.encoding(); | |
1095 | // Request: bool with_defaults = 3; | |
1096 | bool with_defaults = tag->request.with_defaults(); | |
1097 | ||
1098 | grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__, | |
1099 | transaction_id, encoding); | |
1100 | ||
1101 | struct nb_config *nb_config; | |
1102 | ||
1103 | // Load configuration from the transactions database. | |
1104 | nb_config = nb_db_transaction_load(transaction_id); | |
1105 | if (!nb_config) { | |
1106 | tag->responder.Finish( | |
1107 | tag->response, | |
1108 | grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1109 | "Transaction not found"), | |
1110 | tag); | |
1111 | tag->state = FINISH; | |
1112 | return; | |
1113 | } | |
ec2ac5f2 | 1114 | |
b680134e CH |
1115 | // Response: DataTree config = 1; |
1116 | auto config = tag->response.mutable_config(); | |
1117 | config->set_encoding(encoding); | |
1118 | ||
1119 | // Dump data using the requested format. | |
1120 | if (data_tree_from_dnode(config, nb_config->dnode, | |
1121 | encoding2lyd_format(encoding), with_defaults) | |
1122 | != 0) { | |
1123 | nb_config_free(nb_config); | |
1124 | tag->responder.Finish(tag->response, | |
1125 | grpc::Status(grpc::StatusCode::INTERNAL, | |
1126 | "Failed to dump data"), | |
1127 | tag); | |
1128 | tag->state = FINISH; | |
1129 | return; | |
ec2ac5f2 RW |
1130 | } |
1131 | ||
b680134e | 1132 | nb_config_free(nb_config); |
ec2ac5f2 | 1133 | |
b680134e CH |
1134 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); |
1135 | tag->state = FINISH; | |
1136 | } | |
ec2ac5f2 | 1137 | |
b680134e CH |
1138 | void HandleUnaryExecute( |
1139 | NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) | |
1140 | { | |
1141 | grpc_debug("%s: state: %s", __func__, call_states[tag->state]); | |
ec2ac5f2 | 1142 | |
b680134e CH |
1143 | if (tag->state == FINISH) { |
1144 | tag->state = DELETED; | |
1145 | return; | |
ec2ac5f2 RW |
1146 | } |
1147 | ||
b680134e CH |
1148 | struct nb_node *nb_node; |
1149 | struct list *input_list; | |
1150 | struct list *output_list; | |
1151 | struct listnode *node; | |
1152 | struct yang_data *data; | |
1153 | const char *xpath; | |
1154 | char errmsg[BUFSIZ] = {0}; | |
1155 | ||
1156 | // Request: string path = 1; | |
1157 | xpath = tag->request.path().c_str(); | |
1158 | ||
1159 | grpc_debug("%s(path: \"%s\")", __func__, xpath); | |
1160 | ||
1161 | if (tag->request.path().empty()) { | |
1162 | tag->responder.Finish( | |
1163 | tag->response, | |
1164 | grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1165 | "Data path is empty"), | |
1166 | tag); | |
1167 | tag->state = FINISH; | |
1168 | return; | |
1169 | } | |
0fe5b904 | 1170 | |
b680134e CH |
1171 | nb_node = nb_node_find(xpath); |
1172 | if (!nb_node) { | |
1173 | tag->responder.Finish( | |
1174 | tag->response, | |
1175 | grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, | |
1176 | "Unknown data path"), | |
1177 | tag); | |
1178 | tag->state = FINISH; | |
1179 | return; | |
ec2ac5f2 RW |
1180 | } |
1181 | ||
b680134e CH |
1182 | input_list = yang_data_list_new(); |
1183 | output_list = yang_data_list_new(); | |
ec2ac5f2 | 1184 | |
b680134e CH |
1185 | // Read input parameters. |
1186 | auto input = tag->request.input(); | |
1187 | for (const frr::PathValue &pv : input) { | |
1188 | // Request: repeated PathValue input = 2; | |
1189 | data = yang_data_new(pv.path().c_str(), pv.value().c_str()); | |
1190 | listnode_add(input_list, data); | |
1191 | } | |
ec2ac5f2 | 1192 | |
b680134e CH |
1193 | // Execute callback registered for this XPath. |
1194 | if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg, | |
1195 | sizeof(errmsg)) | |
1196 | != NB_OK) { | |
1197 | flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s", | |
1198 | __func__, xpath); | |
1199 | list_delete(&input_list); | |
1200 | list_delete(&output_list); | |
1201 | ||
1202 | tag->responder.Finish( | |
1203 | tag->response, | |
1204 | grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"), | |
1205 | tag); | |
1206 | tag->state = FINISH; | |
1207 | return; | |
ec2ac5f2 | 1208 | } |
b680134e CH |
1209 | |
1210 | // Process output parameters. | |
1211 | for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { | |
1212 | // Response: repeated PathValue output = 1; | |
1213 | frr::PathValue *pv = tag->response.add_output(); | |
1214 | pv->set_path(data->xpath); | |
1215 | pv->set_value(data->value); | |
1216 | } | |
1217 | ||
1218 | // Release memory. | |
1219 | list_delete(&input_list); | |
1220 | list_delete(&output_list); | |
1221 | ||
1222 | tag->responder.Finish(tag->response, grpc::Status::OK, tag); | |
1223 | tag->state = FINISH; | |
1224 | } | |
1225 | ||
1226 | // ------------------------------------------------------ | |
1227 | // Thread Initialization and Run Functions | |
1228 | // ------------------------------------------------------ | |
1229 | ||
1230 | ||
1231 | #define REQUEST_NEWRPC(NAME, cdb) \ | |
1232 | do { \ | |
1233 | auto _rpcState = new NewRpcState<frr::NAME##Request, \ | |
1234 | frr::NAME##Response>( \ | |
1235 | (cdb), &frr::Northbound::AsyncService::Request##NAME, \ | |
1236 | &HandleUnary##NAME, #NAME); \ | |
83f6fce7 | 1237 | _rpcState->do_request(&service, cq.get(), true); \ |
b680134e CH |
1238 | } while (0) |
1239 | ||
1240 | #define REQUEST_NEWRPC_STREAMING(NAME, cdb) \ | |
1241 | do { \ | |
1242 | auto _rpcState = new NewRpcState<frr::NAME##Request, \ | |
1243 | frr::NAME##Response>( \ | |
1244 | (cdb), &frr::Northbound::AsyncService::Request##NAME, \ | |
1245 | &HandleStreaming##NAME, #NAME); \ | |
83f6fce7 | 1246 | _rpcState->do_request(&service, cq.get(), true); \ |
b680134e CH |
1247 | } while (0) |
1248 | ||
1249 | struct grpc_pthread_attr { | |
1250 | struct frr_pthread_attr attr; | |
1251 | unsigned long port; | |
ec2ac5f2 RW |
1252 | }; |
1253 | ||
de800c10 | 1254 | // Capture these objects so we can try to shut down cleanly |
83f6fce7 CH |
1255 | static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER; |
1256 | static grpc::Server *s_server; | |
de800c10 | 1257 | |
ec2ac5f2 RW |
1258 | static void *grpc_pthread_start(void *arg) |
1259 | { | |
0edcb505 | 1260 | struct frr_pthread *fpt = static_cast<frr_pthread *>(arg); |
b680134e CH |
1261 | uint port = (uint) reinterpret_cast<intptr_t>(fpt->data); |
1262 | ||
1263 | Candidates candidates; | |
1264 | grpc::ServerBuilder builder; | |
1265 | std::stringstream server_address; | |
83f6fce7 | 1266 | frr::Northbound::AsyncService service; |
ec2ac5f2 | 1267 | |
0edcb505 CS |
1268 | frr_pthread_set_name(fpt); |
1269 | ||
b680134e CH |
1270 | server_address << "0.0.0.0:" << port; |
1271 | builder.AddListeningPort(server_address.str(), | |
1272 | grpc::InsecureServerCredentials()); | |
83f6fce7 | 1273 | builder.RegisterService(&service); |
673e4407 RZ |
1274 | builder.AddChannelArgument( |
1275 | GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); | |
83f6fce7 CH |
1276 | std::unique_ptr<grpc::ServerCompletionQueue> cq = |
1277 | builder.AddCompletionQueue(); | |
1278 | std::unique_ptr<grpc::Server> server = builder.BuildAndStart(); | |
1279 | s_server = server.get(); | |
1280 | ||
1281 | grpc_running = true; | |
b680134e CH |
1282 | |
1283 | /* Schedule all RPC handlers */ | |
1284 | REQUEST_NEWRPC(GetCapabilities, NULL); | |
1285 | REQUEST_NEWRPC(CreateCandidate, &candidates); | |
1286 | REQUEST_NEWRPC(DeleteCandidate, &candidates); | |
1287 | REQUEST_NEWRPC(UpdateCandidate, &candidates); | |
1288 | REQUEST_NEWRPC(EditCandidate, &candidates); | |
1289 | REQUEST_NEWRPC(LoadToCandidate, &candidates); | |
1290 | REQUEST_NEWRPC(Commit, &candidates); | |
1291 | REQUEST_NEWRPC(GetTransaction, NULL); | |
1292 | REQUEST_NEWRPC(LockConfig, NULL); | |
1293 | REQUEST_NEWRPC(UnlockConfig, NULL); | |
1294 | REQUEST_NEWRPC(Execute, NULL); | |
1295 | REQUEST_NEWRPC_STREAMING(Get, NULL); | |
1296 | REQUEST_NEWRPC_STREAMING(ListTransactions, NULL); | |
1297 | ||
1298 | zlog_notice("gRPC server listening on %s", | |
1299 | server_address.str().c_str()); | |
1300 | ||
1301 | /* Process inbound RPCs */ | |
83f6fce7 CH |
1302 | bool ok; |
1303 | void *tag; | |
1304 | while (grpc_running) { | |
1305 | if (!cq->Next(&tag, &ok)) { | |
1306 | grpc_debug("%s: CQ empty exiting", __func__); | |
de800c10 | 1307 | break; |
83f6fce7 | 1308 | } |
de800c10 | 1309 | |
83f6fce7 CH |
1310 | grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag, |
1311 | ok); | |
1312 | ||
1313 | if (!ok || !grpc_running) { | |
1314 | delete static_cast<RpcStateBase *>(tag); | |
1315 | break; | |
1316 | } | |
b680134e CH |
1317 | |
1318 | RpcStateBase *rpc = static_cast<RpcStateBase *>(tag); | |
1319 | CallState state = rpc->doCallback(); | |
83f6fce7 | 1320 | grpc_debug("%s: callback returned RPC State: %s", __func__, |
b680134e CH |
1321 | call_states[state]); |
1322 | ||
1323 | /* | |
1324 | * Our side is done (FINISH) receive new requests of this type | |
1325 | * We could do this earlier but that would mean we could be | |
1326 | * handling multiple same type requests in parallel. We expect | |
1327 | * to be called back once more in the FINISH state (from the | |
1328 | * user indicating Finish() for cleanup. | |
1329 | */ | |
83f6fce7 CH |
1330 | if (state == FINISH && grpc_running) |
1331 | rpc->do_request(&service, cq.get(), false); | |
b680134e | 1332 | } |
ec2ac5f2 | 1333 | |
83f6fce7 CH |
1334 | /* This was probably done for us to get here, but let's be safe */ |
1335 | pthread_mutex_lock(&s_server_lock); | |
1336 | grpc_running = false; | |
1337 | if (s_server) { | |
1338 | grpc_debug("%s: shutdown server and CQ", __func__); | |
1339 | server->Shutdown(); | |
1340 | s_server = NULL; | |
1341 | } | |
1342 | pthread_mutex_unlock(&s_server_lock); | |
1343 | ||
1344 | grpc_debug("%s: shutting down CQ", __func__); | |
1345 | cq->Shutdown(); | |
1346 | ||
1347 | grpc_debug("%s: draining the CQ", __func__); | |
1348 | while (cq->Next(&tag, &ok)) { | |
1349 | grpc_debug("%s: drain tag %p", __func__, tag); | |
1350 | delete static_cast<RpcStateBase *>(tag); | |
b680134e | 1351 | } |
ec2ac5f2 | 1352 | |
83f6fce7 | 1353 | zlog_info("%s: exiting from grpc pthread", __func__); |
ec2ac5f2 RW |
1354 | return NULL; |
1355 | } | |
1356 | ||
b680134e CH |
1357 | |
1358 | static int frr_grpc_init(uint port) | |
ec2ac5f2 | 1359 | { |
b680134e CH |
1360 | struct frr_pthread_attr attr = { |
1361 | .start = grpc_pthread_start, | |
1362 | .stop = NULL, | |
1363 | }; | |
1364 | ||
83f6fce7 CH |
1365 | grpc_debug("%s: entered", __func__); |
1366 | ||
0edcb505 | 1367 | fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc"); |
b680134e | 1368 | fpt->data = reinterpret_cast<void *>((intptr_t)port); |
0edcb505 | 1369 | |
ec2ac5f2 | 1370 | /* Create a pthread for gRPC since it runs its own event loop. */ |
0edcb505 | 1371 | if (frr_pthread_run(fpt, NULL) < 0) { |
ec2ac5f2 RW |
1372 | flog_err(EC_LIB_SYSTEM_CALL, "%s: error creating pthread: %s", |
1373 | __func__, safe_strerror(errno)); | |
1374 | return -1; | |
1375 | } | |
ec2ac5f2 RW |
1376 | |
1377 | return 0; | |
1378 | } | |
1379 | ||
1380 | static int frr_grpc_finish(void) | |
1381 | { | |
83f6fce7 | 1382 | grpc_debug("%s: entered", __func__); |
de800c10 | 1383 | |
83f6fce7 CH |
1384 | if (!fpt) |
1385 | return 0; | |
de800c10 | 1386 | |
83f6fce7 CH |
1387 | /* |
1388 | * Shut the server down here in main thread. This will cause the wait on | |
1389 | * the completion queue (cq.Next()) to exit and cleanup everything else. | |
1390 | */ | |
1391 | pthread_mutex_lock(&s_server_lock); | |
1392 | grpc_running = false; | |
1393 | if (s_server) { | |
1394 | grpc_debug("%s: shutdown server", __func__); | |
1395 | s_server->Shutdown(); | |
1396 | s_server = NULL; | |
de800c10 | 1397 | } |
83f6fce7 | 1398 | pthread_mutex_unlock(&s_server_lock); |
de800c10 | 1399 | |
83f6fce7 CH |
1400 | grpc_debug("%s: joining and destroy grpc thread", __func__); |
1401 | pthread_join(fpt->thread, NULL); | |
1402 | frr_pthread_destroy(fpt); | |
ec2ac5f2 RW |
1403 | return 0; |
1404 | } | |
1405 | ||
20b0a2ed QY |
1406 | /* |
1407 | * This is done this way because module_init and module_late_init are both | |
1408 | * called during daemon pre-fork initialization. Because the GRPC library | |
1409 | * spawns threads internally, we need to delay initializing it until after | |
1410 | * fork. This is done by scheduling this init function as an event task, since | |
1411 | * the event loop doesn't run until after fork. | |
1412 | */ | |
cc9f21da | 1413 | static void frr_grpc_module_very_late_init(struct thread *thread) |
ec2ac5f2 | 1414 | { |
ec2ac5f2 | 1415 | const char *args = THIS_MODULE->load_args; |
b680134e | 1416 | uint port = GRPC_DEFAULT_PORT; |
ec2ac5f2 | 1417 | |
ec2ac5f2 | 1418 | if (args) { |
b680134e CH |
1419 | port = std::stoul(args); |
1420 | if (port < 1024 || port > UINT16_MAX) { | |
ec2ac5f2 | 1421 | flog_err(EC_LIB_GRPC_INIT, |
b680134e CH |
1422 | "%s: port number must be between 1025 and %d", |
1423 | __func__, UINT16_MAX); | |
ec2ac5f2 RW |
1424 | goto error; |
1425 | } | |
1426 | } | |
1427 | ||
b680134e | 1428 | if (frr_grpc_init(port) < 0) |
ec2ac5f2 RW |
1429 | goto error; |
1430 | ||
cc9f21da | 1431 | return; |
69ec5832 | 1432 | |
ec2ac5f2 RW |
1433 | error: |
1434 | flog_err(EC_LIB_GRPC_INIT, "failed to initialize the gRPC module"); | |
ec2ac5f2 RW |
1435 | } |
1436 | ||
20b0a2ed QY |
1437 | static int frr_grpc_module_late_init(struct thread_master *tm) |
1438 | { | |
b680134e | 1439 | main_master = tm; |
20b0a2ed | 1440 | hook_register(frr_fini, frr_grpc_finish); |
b680134e | 1441 | thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL); |
20b0a2ed QY |
1442 | return 0; |
1443 | } | |
1444 | ||
ec2ac5f2 RW |
1445 | static int frr_grpc_module_init(void) |
1446 | { | |
1447 | hook_register(frr_late_init, frr_grpc_module_late_init); | |
1448 | ||
1449 | return 0; | |
1450 | } | |
1451 | ||
1452 | FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION, | |
1453 | .description = "FRR gRPC northbound module", | |
b680134e | 1454 | .init = frr_grpc_module_init, ); |