2 * Copyright (C) 2018 NetDEF, Inc.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "lib_errors.h"
28 #include "northbound.h"
31 #include <sysrepo/values.h>
32 #include <sysrepo/xpath.h>
34 DEFINE_MTYPE_STATIC(LIB
, SYSREPO
, "Sysrepo module")
36 static struct thread_master
*master
;
37 static struct list
*sysrepo_threads
;
38 static sr_session_ctx_t
*session
;
39 static sr_conn_ctx_t
*connection
;
41 static int frr_sr_read_cb(struct thread
*thread
);
42 static int frr_sr_write_cb(struct thread
*thread
);
43 static int frr_sr_finish(void);
45 /* Convert FRR YANG data value to sysrepo YANG data value. */
46 static int yang_data_frr2sr(struct yang_data
*frr_data
, sr_val_t
*sr_data
)
48 struct nb_node
*nb_node
;
49 const struct lys_node
*snode
;
50 struct lys_node_container
*scontainer
;
51 struct lys_node_leaf
*sleaf
;
52 struct lys_node_leaflist
*sleaflist
;
55 sr_val_set_xpath(sr_data
, frr_data
->xpath
);
57 nb_node
= nb_node_find(frr_data
->xpath
);
59 flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH
,
60 "%s: unknown data path: %s", __func__
,
65 snode
= nb_node
->snode
;
66 switch (snode
->nodetype
) {
68 scontainer
= (struct lys_node_container
*)snode
;
69 if (!scontainer
->presence
)
71 sr_data
->type
= SR_CONTAINER_PRESENCE_T
;
74 sr_data
->type
= SR_LIST_T
;
77 sleaf
= (struct lys_node_leaf
*)snode
;
78 type
= sleaf
->type
.base
;
81 sleaflist
= (struct lys_node_leaflist
*)snode
;
82 type
= sleaflist
->type
.base
;
90 sr_val_set_str_data(sr_data
, SR_BINARY_T
, frr_data
->value
);
93 sr_val_set_str_data(sr_data
, SR_BITS_T
, frr_data
->value
);
96 sr_data
->type
= SR_BOOL_T
;
97 sr_data
->data
.bool_val
= yang_str2bool(frr_data
->value
);
100 sr_data
->type
= SR_DECIMAL64_T
;
101 sr_data
->data
.decimal64_val
=
102 yang_str2dec64(frr_data
->xpath
, frr_data
->value
);
105 sr_data
->type
= SR_LEAF_EMPTY_T
;
108 sr_val_set_str_data(sr_data
, SR_ENUM_T
, frr_data
->value
);
111 sr_val_set_str_data(sr_data
, SR_IDENTITYREF_T
, frr_data
->value
);
114 sr_val_set_str_data(sr_data
, SR_INSTANCEID_T
, frr_data
->value
);
117 sr_data
->type
= SR_INT8_T
;
118 sr_data
->data
.int8_val
= yang_str2int8(frr_data
->value
);
121 sr_data
->type
= SR_INT16_T
;
122 sr_data
->data
.int16_val
= yang_str2int16(frr_data
->value
);
125 sr_data
->type
= SR_INT32_T
;
126 sr_data
->data
.int32_val
= yang_str2int32(frr_data
->value
);
129 sr_data
->type
= SR_INT64_T
;
130 sr_data
->data
.int64_val
= yang_str2int64(frr_data
->value
);
133 sr_val_set_str_data(sr_data
, SR_STRING_T
, frr_data
->value
);
136 sr_data
->type
= SR_UINT8_T
;
137 sr_data
->data
.uint8_val
= yang_str2uint8(frr_data
->value
);
140 sr_data
->type
= SR_UINT16_T
;
141 sr_data
->data
.uint16_val
= yang_str2uint16(frr_data
->value
);
144 sr_data
->type
= SR_UINT32_T
;
145 sr_data
->data
.uint32_val
= yang_str2uint32(frr_data
->value
);
148 sr_data
->type
= SR_UINT64_T
;
149 sr_data
->data
.uint64_val
= yang_str2uint64(frr_data
->value
);
158 static int frr_sr_process_change(struct nb_config
*candidate
,
159 sr_change_oper_t sr_op
, sr_val_t
*sr_old_val
,
160 sr_val_t
*sr_new_val
)
162 struct nb_node
*nb_node
;
163 enum nb_operation nb_op
;
166 char value_str
[YANG_VALUE_MAXLEN
];
167 struct yang_data
*data
;
170 sr_data
= sr_new_val
? sr_new_val
: sr_old_val
;
173 xpath
= sr_data
->xpath
;
175 /* Non-presence container - nothing to do. */
176 if (sr_data
->type
== SR_CONTAINER_T
)
179 nb_node
= nb_node_find(xpath
);
181 flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH
,
182 "%s: unknown data path: %s", __func__
, xpath
);
186 /* Map operation values. */
190 if (nb_operation_is_valid(NB_OP_CREATE
, nb_node
->snode
))
191 nb_op
= NB_OP_CREATE
;
192 else if (nb_operation_is_valid(NB_OP_MODIFY
, nb_node
->snode
)) {
193 nb_op
= NB_OP_MODIFY
;
195 /* Ignore list keys modifications. */
200 * When a list is deleted or one of its keys is changed, we are
201 * notified about the removal of all of its leafs, even the ones
202 * that are non-optional. We need to ignore these notifications.
204 if (!nb_operation_is_valid(NB_OP_DELETE
, nb_node
->snode
))
207 nb_op
= NB_OP_DELETE
;
213 flog_err(EC_LIB_DEVELOPMENT
,
214 "%s: unexpected operation %u [xpath %s]", __func__
,
219 sr_val_to_buff(sr_data
, value_str
, sizeof(value_str
));
220 data
= yang_data_new(xpath
, value_str
);
222 ret
= nb_candidate_edit(candidate
, nb_node
, nb_op
, xpath
, NULL
, data
);
223 yang_data_free(data
);
226 EC_LIB_NB_CANDIDATE_EDIT_ERROR
,
227 "%s: failed to edit candidate configuration: operation [%s] xpath [%s]",
228 __func__
, nb_operation_name(nb_op
), xpath
);
235 /* Callback for changes in the running configuration. */
236 static int frr_sr_config_change_cb(sr_session_ctx_t
*session
,
237 const char *module_name
,
238 sr_notif_event_t sr_ev
, void *private_ctx
)
240 sr_change_iter_t
*it
;
242 sr_change_oper_t sr_op
;
243 sr_val_t
*sr_old_val
, *sr_new_val
;
244 char xpath
[XPATH_MAXLEN
];
245 struct nb_config
*candidate
;
248 * Ignore SR_EV_ABORT and SR_EV_APPLY. We'll leverage the northbound
249 * layer itself to abort or apply the configuration changes when a
250 * transaction is created.
252 if (sr_ev
!= SR_EV_ENABLED
&& sr_ev
!= SR_EV_VERIFY
)
255 snprintf(xpath
, sizeof(xpath
), "/%s:*", module_name
);
256 ret
= sr_get_changes_iter(session
, xpath
, &it
);
257 if (ret
!= SR_ERR_OK
) {
258 flog_err(EC_LIB_LIBSYSREPO
,
259 "%s: sr_get_changes_iter() failed for xpath %s",
264 candidate
= nb_config_dup(running_config
);
266 while ((ret
= sr_get_change_next(session
, it
, &sr_op
, &sr_old_val
,
269 ret
= frr_sr_process_change(candidate
, sr_op
, sr_old_val
,
271 sr_free_val(sr_old_val
);
272 sr_free_val(sr_new_val
);
277 sr_free_change_iter(it
);
278 if (ret
!= NB_OK
&& ret
!= SR_ERR_NOT_FOUND
) {
279 nb_config_free(candidate
);
280 return SR_ERR_INTERNAL
;
283 /* Commit changes. */
284 ret
= nb_candidate_commit(candidate
, NB_CLIENT_SYSREPO
, true, NULL
,
286 nb_config_free(candidate
);
288 /* Map northbound return code to sysrepo return code. */
291 case NB_ERR_NO_CHANGES
:
294 return SR_ERR_LOCKED
;
295 case NB_ERR_RESOURCE
:
298 return SR_ERR_VALIDATION_FAILED
;
302 static int frr_sr_state_data_iter_cb(const struct lys_node
*snode
,
303 struct yang_translator
*translator
,
304 struct yang_data
*data
, void *arg
)
306 struct list
*elements
= arg
;
308 listnode_add(elements
, data
);
313 /* Callback for state retrieval. */
314 static int frr_sr_state_cb(const char *xpath
, sr_val_t
**values
,
315 size_t *values_cnt
, uint64_t request_id
,
318 struct list
*elements
;
319 struct yang_data
*data
;
320 struct listnode
*node
;
322 int ret
, count
, i
= 0;
324 elements
= yang_data_list_new();
325 if (nb_oper_data_iterate(xpath
, NULL
, NB_OPER_DATA_ITER_NORECURSE
,
326 frr_sr_state_data_iter_cb
, elements
)
328 flog_warn(EC_LIB_NB_OPERATIONAL_DATA
,
329 "%s: failed to obtain operational data [xpath %s]",
334 if (list_isempty(elements
))
337 count
= listcount(elements
);
338 ret
= sr_new_values(count
, &v
);
339 if (ret
!= SR_ERR_OK
) {
340 flog_err(EC_LIB_LIBSYSREPO
, "%s: sr_new_values(): %s", __func__
,
345 for (ALL_LIST_ELEMENTS_RO(elements
, node
, data
)) {
346 if (yang_data_frr2sr(data
, &v
[i
++]) != 0) {
347 flog_err(EC_LIB_SYSREPO_DATA_CONVERT
,
348 "%s: failed to convert data to sysrepo format",
356 list_delete(&elements
);
361 list_delete(&elements
);
368 static int frr_sr_config_rpc_cb(const char *xpath
, const sr_val_t
*sr_input
,
369 const size_t input_cnt
, sr_val_t
**sr_output
,
370 size_t *sr_output_cnt
, void *private_ctx
)
372 struct nb_node
*nb_node
;
375 struct yang_data
*data
;
376 size_t cb_output_cnt
;
379 nb_node
= nb_node_find(xpath
);
381 flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH
,
382 "%s: unknown data path: %s", __func__
, xpath
);
383 return SR_ERR_INTERNAL
;
386 input
= yang_data_list_new();
387 output
= yang_data_list_new();
390 for (size_t i
= 0; i
< input_cnt
; i
++) {
391 char value_str
[YANG_VALUE_MAXLEN
];
393 sr_val_to_buff(&sr_input
[i
], value_str
, sizeof(value_str
));
395 data
= yang_data_new(xpath
, value_str
);
396 listnode_add(input
, data
);
399 /* Execute callback registered for this XPath. */
400 if (nb_node
->cbs
.rpc(xpath
, input
, output
) != NB_OK
) {
401 flog_warn(EC_LIB_NB_CB_RPC
, "%s: rpc callback failed: %s",
403 ret
= SR_ERR_OPERATION_FAILED
;
407 /* Process output. */
408 if (listcount(output
) > 0) {
409 sr_val_t
*values
= NULL
;
410 struct listnode
*node
;
413 cb_output_cnt
= listcount(output
);
414 ret
= sr_new_values(cb_output_cnt
, &values
);
415 if (ret
!= SR_ERR_OK
) {
416 flog_err(EC_LIB_LIBSYSREPO
, "%s: sr_new_values(): %s",
417 __func__
, sr_strerror(ret
));
421 for (ALL_LIST_ELEMENTS_RO(output
, node
, data
)) {
422 if (yang_data_frr2sr(data
, &values
[i
++]) != 0) {
424 EC_LIB_SYSREPO_DATA_CONVERT
,
425 "%s: failed to convert data to Sysrepo format",
427 ret
= SR_ERR_INTERNAL
;
428 sr_free_values(values
, cb_output_cnt
);
434 *sr_output_cnt
= cb_output_cnt
;
438 /* Release memory. */
440 list_delete(&output
);
445 static int frr_sr_notification_send(const char *xpath
, struct list
*arguments
)
447 sr_val_t
*values
= NULL
;
448 size_t values_cnt
= 0;
451 if (arguments
&& listcount(arguments
) > 0) {
452 struct yang_data
*data
;
453 struct listnode
*node
;
456 values_cnt
= listcount(arguments
);
457 ret
= sr_new_values(values_cnt
, &values
);
458 if (ret
!= SR_ERR_OK
) {
459 flog_err(EC_LIB_LIBSYSREPO
, "%s: sr_new_values(): %s",
460 __func__
, sr_strerror(ret
));
464 for (ALL_LIST_ELEMENTS_RO(arguments
, node
, data
)) {
465 if (yang_data_frr2sr(data
, &values
[i
++]) != 0) {
467 EC_LIB_SYSREPO_DATA_CONVERT
,
468 "%s: failed to convert data to sysrepo format",
470 sr_free_values(values
, values_cnt
);
476 ret
= sr_event_notif_send(session
, xpath
, values
, values_cnt
,
477 SR_EV_NOTIF_DEFAULT
);
478 if (ret
!= SR_ERR_OK
) {
479 flog_err(EC_LIB_LIBSYSREPO
,
480 "%s: sr_event_notif_send() failed for xpath %s",
488 /* Code to integrate the sysrepo client into FRR main event loop. */
489 struct sysrepo_thread
{
490 struct thread
*thread
;
495 static struct sysrepo_thread
*frr_sr_fd_lookup(sr_fd_event_t event
, int fd
)
497 struct sysrepo_thread
*sr_thread
;
498 struct listnode
*node
;
500 for (ALL_LIST_ELEMENTS_RO(sysrepo_threads
, node
, sr_thread
)) {
501 if (sr_thread
->event
== event
&& sr_thread
->fd
== fd
)
508 static void frr_sr_fd_add(int event
, int fd
)
510 struct sysrepo_thread
*sr_thread
;
512 if (frr_sr_fd_lookup(event
, fd
) != NULL
)
515 sr_thread
= XCALLOC(MTYPE_SYSREPO
, sizeof(*sr_thread
));
516 sr_thread
->event
= event
;
518 listnode_add(sysrepo_threads
, sr_thread
);
521 case SR_FD_INPUT_READY
:
522 thread_add_read(master
, frr_sr_read_cb
, NULL
, fd
,
525 case SR_FD_OUTPUT_READY
:
526 thread_add_write(master
, frr_sr_write_cb
, NULL
, fd
,
534 static void frr_sr_fd_free(struct sysrepo_thread
*sr_thread
)
536 THREAD_OFF(sr_thread
->thread
);
537 XFREE(MTYPE_SYSREPO
, sr_thread
);
540 static void frr_sr_fd_del(int event
, int fd
)
542 struct sysrepo_thread
*sr_thread
;
544 sr_thread
= frr_sr_fd_lookup(event
, fd
);
548 listnode_delete(sysrepo_threads
, sr_thread
);
549 frr_sr_fd_free(sr_thread
);
552 static void frr_sr_fd_update(sr_fd_change_t
*fd_change_set
,
553 size_t fd_change_set_cnt
)
555 for (size_t i
= 0; i
< fd_change_set_cnt
; i
++) {
556 int fd
= fd_change_set
[i
].fd
;
557 int event
= fd_change_set
[i
].events
;
559 if (event
!= SR_FD_INPUT_READY
&& event
!= SR_FD_OUTPUT_READY
)
562 switch (fd_change_set
[i
].action
) {
563 case SR_FD_START_WATCHING
:
564 frr_sr_fd_add(event
, fd
);
566 case SR_FD_STOP_WATCHING
:
567 frr_sr_fd_del(event
, fd
);
575 static int frr_sr_read_cb(struct thread
*thread
)
577 int fd
= THREAD_FD(thread
);
578 sr_fd_change_t
*fd_change_set
= NULL
;
579 size_t fd_change_set_cnt
= 0;
582 ret
= sr_fd_event_process(fd
, SR_FD_INPUT_READY
, &fd_change_set
,
584 if (ret
!= SR_ERR_OK
) {
585 flog_err(EC_LIB_LIBSYSREPO
, "%s: sr_fd_event_process(): %s",
586 __func__
, sr_strerror(ret
));
591 thread_add_read(master
, frr_sr_read_cb
, NULL
, fd
, &thread
);
593 frr_sr_fd_update(fd_change_set
, fd_change_set_cnt
);
599 static int frr_sr_write_cb(struct thread
*thread
)
601 int fd
= THREAD_FD(thread
);
602 sr_fd_change_t
*fd_change_set
= NULL
;
603 size_t fd_change_set_cnt
= 0;
606 ret
= sr_fd_event_process(fd
, SR_FD_OUTPUT_READY
, &fd_change_set
,
608 if (ret
!= SR_ERR_OK
) {
609 flog_err(EC_LIB_LIBSYSREPO
, "%s: sr_fd_event_process(): %s",
610 __func__
, sr_strerror(ret
));
614 frr_sr_fd_update(fd_change_set
, fd_change_set_cnt
);
620 static void frr_sr_subscribe_config(struct yang_module
*module
)
624 ret
= sr_module_change_subscribe(
625 session
, module
->name
, frr_sr_config_change_cb
, NULL
, 0,
626 SR_SUBSCR_DEFAULT
| SR_SUBSCR_EV_ENABLED
,
627 &module
->sr_subscription
);
628 if (ret
!= SR_ERR_OK
)
629 flog_err(EC_LIB_LIBSYSREPO
, "sr_module_change_subscribe(): %s",
633 static int frr_sr_subscribe_state(const struct lys_node
*snode
, void *arg
)
635 struct yang_module
*module
= arg
;
636 struct nb_node
*nb_node
;
639 if (!CHECK_FLAG(snode
->flags
, LYS_CONFIG_R
))
640 return YANG_ITER_CONTINUE
;
641 /* We only need to subscribe to the root of the state subtrees. */
642 if (snode
->parent
&& CHECK_FLAG(snode
->parent
->flags
, LYS_CONFIG_R
))
643 return YANG_ITER_CONTINUE
;
645 nb_node
= snode
->priv
;
646 if (debug_northbound
)
647 zlog_debug("%s: providing data to '%s'", __func__
,
650 ret
= sr_dp_get_items_subscribe(
651 session
, nb_node
->xpath
, frr_sr_state_cb
, NULL
,
652 SR_SUBSCR_CTX_REUSE
, &module
->sr_subscription
);
653 if (ret
!= SR_ERR_OK
)
654 flog_err(EC_LIB_LIBSYSREPO
, "sr_dp_get_items_subscribe(): %s",
657 return YANG_ITER_CONTINUE
;
660 static int frr_sr_subscribe_rpc(const struct lys_node
*snode
, void *arg
)
662 struct yang_module
*module
= arg
;
663 struct nb_node
*nb_node
;
666 if (snode
->nodetype
!= LYS_RPC
)
667 return YANG_ITER_CONTINUE
;
669 nb_node
= snode
->priv
;
670 if (debug_northbound
)
671 zlog_debug("%s: providing RPC to '%s'", __func__
,
674 ret
= sr_rpc_subscribe(session
, nb_node
->xpath
, frr_sr_config_rpc_cb
,
675 NULL
, SR_SUBSCR_CTX_REUSE
,
676 &module
->sr_subscription
);
677 if (ret
!= SR_ERR_OK
)
678 flog_err(EC_LIB_LIBSYSREPO
, "sr_rpc_subscribe(): %s",
681 return YANG_ITER_CONTINUE
;
684 static int frr_sr_subscribe_action(const struct lys_node
*snode
, void *arg
)
686 struct yang_module
*module
= arg
;
687 struct nb_node
*nb_node
;
690 if (snode
->nodetype
!= LYS_ACTION
)
691 return YANG_ITER_CONTINUE
;
693 nb_node
= snode
->priv
;
694 if (debug_northbound
)
695 zlog_debug("%s: providing action to '%s'", __func__
,
698 ret
= sr_action_subscribe(session
, nb_node
->xpath
, frr_sr_config_rpc_cb
,
699 NULL
, SR_SUBSCR_CTX_REUSE
,
700 &module
->sr_subscription
);
701 if (ret
!= SR_ERR_OK
)
702 flog_err(EC_LIB_LIBSYSREPO
, "sr_action_subscribe(): %s",
705 return YANG_ITER_CONTINUE
;
708 /* FRR's Sysrepo initialization. */
709 static int frr_sr_init(const char *program_name
)
711 struct yang_module
*module
;
714 sysrepo_threads
= list_new();
716 ret
= sr_fd_watcher_init(&sysrepo_fd
, NULL
);
717 if (ret
!= SR_ERR_OK
) {
718 flog_err(EC_LIB_SYSREPO_INIT
, "%s: sr_fd_watcher_init(): %s",
719 __func__
, sr_strerror(ret
));
723 /* Connect to Sysrepo. */
724 ret
= sr_connect(program_name
, SR_CONN_DEFAULT
, &connection
);
725 if (ret
!= SR_ERR_OK
) {
726 flog_err(EC_LIB_SYSREPO_INIT
, "%s: sr_connect(): %s", __func__
,
732 ret
= sr_session_start(connection
, SR_DS_RUNNING
, SR_SESS_DEFAULT
,
734 if (ret
!= SR_ERR_OK
) {
735 flog_err(EC_LIB_SYSREPO_INIT
, "%s: sr_session_start(): %s",
736 __func__
, sr_strerror(ret
));
740 /* Perform subscriptions. */
741 RB_FOREACH (module
, yang_modules
, &yang_modules
) {
742 frr_sr_subscribe_config(module
);
743 yang_snodes_iterate_module(module
->info
, frr_sr_subscribe_state
,
745 yang_snodes_iterate_module(module
->info
, frr_sr_subscribe_rpc
,
747 yang_snodes_iterate_module(module
->info
,
748 frr_sr_subscribe_action
, 0, module
);
751 hook_register(nb_notification_send
, frr_sr_notification_send
);
753 frr_sr_fd_add(SR_FD_INPUT_READY
, sysrepo_fd
);
763 static int frr_sr_finish(void)
765 struct yang_module
*module
;
767 RB_FOREACH (module
, yang_modules
, &yang_modules
) {
768 if (!module
->sr_subscription
)
770 sr_unsubscribe(session
, module
->sr_subscription
);
774 sr_session_stop(session
);
776 sr_disconnect(connection
);
778 sysrepo_threads
->del
= (void (*)(void *))frr_sr_fd_free
;
779 list_delete(&sysrepo_threads
);
780 sr_fd_watcher_cleanup();
785 static int frr_sr_module_late_init(struct thread_master
*tm
)
789 if (frr_sr_init(frr_get_progname()) < 0) {
790 flog_err(EC_LIB_SYSREPO_INIT
,
791 "failed to initialize the Sysrepo module");
795 hook_register(frr_fini
, frr_sr_finish
);
800 static int frr_sr_module_init(void)
802 hook_register(frr_late_init
, frr_sr_module_late_init
);
807 FRR_MODULE_SETUP(.name
= "frr_sysrepo", .version
= FRR_VERSION
,
808 .description
= "FRR sysrepo integration module",
809 .init
= frr_sr_module_init
, )