/*
 * Copyright (c) 2017, 2018 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
24 #include "ovsdb-error.h"
25 #include "ovsdb-parser.h"
26 #include "openvswitch/dynamic-string.h"
27 #include "openvswitch/json.h"
28 #include "openvswitch/vlog.h"
/* Logging module declaration for this file (module name: raft_rpc). */
VLOG_DEFINE_THIS_MODULE(raft_rpc);
33 #define RAFT_RPC(ENUM, NAME) \
34 static void raft_##NAME##_uninit(struct raft_##NAME *); \
35 static void raft_##NAME##_clone(struct raft_##NAME *, \
36 const struct raft_##NAME *); \
37 static void raft_##NAME##_to_jsonrpc(const struct raft_##NAME *, \
39 static void raft_##NAME##_from_jsonrpc(struct ovsdb_parser *, \
40 struct raft_##NAME *); \
41 static void raft_format_##NAME(const struct raft_##NAME *, struct ds *);
47 raft_rpc_type_to_string(enum raft_rpc_type status
)
50 #define RAFT_RPC(ENUM, NAME) case ENUM: return #NAME;
58 raft_rpc_type_from_string(const char *s
, enum raft_rpc_type
*status
)
60 #define RAFT_RPC(ENUM, NAME) \
61 if (!strcmp(s, #NAME)) { \
70 /* raft_hello_request. */
73 raft_hello_request_uninit(struct raft_hello_request
*rq
)
79 raft_hello_request_clone(struct raft_hello_request
*dst
,
80 const struct raft_hello_request
*src
)
82 dst
->address
= nullable_xstrdup(src
->address
);
86 raft_hello_request_to_jsonrpc(const struct raft_hello_request
*rq
,
89 json_object_put_string(args
, "address", rq
->address
);
93 raft_hello_request_from_jsonrpc(struct ovsdb_parser
*p
,
94 struct raft_hello_request
*rq
)
96 rq
->address
= nullable_xstrdup(raft_parse_required_string(p
, "address"));
100 raft_format_hello_request(const struct raft_hello_request
*rq
,
103 ds_put_format(s
, " address=\"%s\"", rq
->address
);
106 /* raft_append_request. */
109 raft_append_request_uninit(struct raft_append_request
*rq
)
111 for (size_t i
= 0; i
< rq
->n_entries
; i
++) {
112 raft_entry_uninit(&rq
->entries
[i
]);
118 raft_append_request_clone(struct raft_append_request
*dst
,
119 const struct raft_append_request
*src
)
121 dst
->entries
= xmalloc(src
->n_entries
* sizeof *dst
->entries
);
122 for (size_t i
= 0; i
< src
->n_entries
; i
++) {
123 raft_entry_clone(&dst
->entries
[i
], &src
->entries
[i
]);
128 raft_append_request_to_jsonrpc(const struct raft_append_request
*rq
,
131 raft_put_uint64(args
, "term", rq
->term
);
132 raft_put_uint64(args
, "prev_log_index", rq
->prev_log_index
);
133 raft_put_uint64(args
, "prev_log_term", rq
->prev_log_term
);
134 raft_put_uint64(args
, "leader_commit", rq
->leader_commit
);
136 struct json
**entries
= xmalloc(rq
->n_entries
* sizeof *entries
);
137 for (size_t i
= 0; i
< rq
->n_entries
; i
++) {
138 entries
[i
] = raft_entry_to_json(&rq
->entries
[i
]);
140 json_object_put(args
, "log", json_array_create(entries
, rq
->n_entries
));
144 raft_append_request_from_jsonrpc(struct ovsdb_parser
*p
,
145 struct raft_append_request
*rq
)
147 rq
->term
= raft_parse_required_uint64(p
, "term");
148 rq
->prev_log_index
= raft_parse_required_uint64(p
, "prev_log_index");
149 rq
->prev_log_term
= raft_parse_required_uint64(p
, "prev_log_term");
150 rq
->leader_commit
= raft_parse_required_uint64(p
, "leader_commit");
152 const struct json
*log
= ovsdb_parser_member(p
, "log", OP_ARRAY
);
156 const struct json_array
*entries
= json_array(log
);
157 rq
->entries
= xmalloc(entries
->n
* sizeof *rq
->entries
);
159 for (size_t i
= 0; i
< entries
->n
; i
++) {
160 struct ovsdb_error
*error
= raft_entry_from_json(entries
->elems
[i
],
163 ovsdb_parser_put_error(p
, error
);
171 raft_format_append_request(const struct raft_append_request
*rq
,
174 ds_put_format(s
, " term=%"PRIu64
, rq
->term
);
175 ds_put_format(s
, " prev_log_index=%"PRIu64
, rq
->prev_log_index
);
176 ds_put_format(s
, " prev_log_term=%"PRIu64
, rq
->prev_log_term
);
177 ds_put_format(s
, " leader_commit=%"PRIu64
, rq
->leader_commit
);
178 ds_put_format(s
, " n_entries=%u", rq
->n_entries
);
181 /* raft_append_reply. */
184 raft_append_result_to_string(enum raft_append_result result
)
189 case RAFT_APPEND_INCONSISTENCY
:
190 return "inconsistency";
191 case RAFT_APPEND_IO_ERROR
:
199 raft_append_result_from_string(const char *s
, enum raft_append_result
*resultp
)
201 for (enum raft_append_result result
= 0; ; result
++) {
202 const char *s2
= raft_append_result_to_string(result
);
206 } else if (!strcmp(s
, s2
)) {
214 raft_append_reply_uninit(struct raft_append_reply
*rpy OVS_UNUSED
)
219 raft_append_reply_clone(struct raft_append_reply
*dst OVS_UNUSED
,
220 const struct raft_append_reply
*src OVS_UNUSED
)
225 raft_append_reply_to_jsonrpc(const struct raft_append_reply
*rpy
,
228 raft_put_uint64(args
, "term", rpy
->term
);
229 raft_put_uint64(args
, "log_end", rpy
->log_end
);
230 raft_put_uint64(args
, "prev_log_index", rpy
->prev_log_index
);
231 raft_put_uint64(args
, "prev_log_term", rpy
->prev_log_term
);
232 raft_put_uint64(args
, "n_entries", rpy
->n_entries
);
233 json_object_put_string(args
, "result",
234 raft_append_result_to_string(rpy
->result
));
238 raft_append_reply_from_jsonrpc(struct ovsdb_parser
*p
,
239 struct raft_append_reply
*rpy
)
241 rpy
->term
= raft_parse_required_uint64(p
, "term");
242 rpy
->log_end
= raft_parse_required_uint64(p
, "log_end");
243 rpy
->prev_log_index
= raft_parse_required_uint64(p
, "prev_log_index");
244 rpy
->prev_log_term
= raft_parse_required_uint64(p
, "prev_log_term");
245 rpy
->n_entries
= raft_parse_required_uint64(p
, "n_entries");
247 const char *result
= raft_parse_required_string(p
, "result");
248 if (result
&& !raft_append_result_from_string(result
, &rpy
->result
)) {
249 ovsdb_parser_raise_error(p
, "unknown result \"%s\"", result
);
254 raft_format_append_reply(const struct raft_append_reply
*rpy
, struct ds
*s
)
256 ds_put_format(s
, " term=%"PRIu64
, rpy
->term
);
257 ds_put_format(s
, " log_end=%"PRIu64
, rpy
->log_end
);
258 ds_put_format(s
, " result=\"%s\"",
259 raft_append_result_to_string(rpy
->result
));
262 /* raft_vote_request. */
265 raft_vote_request_uninit(struct raft_vote_request
*rq OVS_UNUSED
)
270 raft_vote_request_clone(struct raft_vote_request
*dst OVS_UNUSED
,
271 const struct raft_vote_request
*src OVS_UNUSED
)
276 raft_vote_request_to_jsonrpc(const struct raft_vote_request
*rq
,
279 raft_put_uint64(args
, "term", rq
->term
);
280 raft_put_uint64(args
, "last_log_index", rq
->last_log_index
);
281 raft_put_uint64(args
, "last_log_term", rq
->last_log_term
);
282 if (rq
->leadership_transfer
) {
283 json_object_put(args
, "leadership_transfer",
284 json_boolean_create(true));
289 raft_vote_request_from_jsonrpc(struct ovsdb_parser
*p
,
290 struct raft_vote_request
*rq
)
292 rq
->term
= raft_parse_required_uint64(p
, "term");
293 rq
->last_log_index
= raft_parse_required_uint64(p
, "last_log_index");
294 rq
->last_log_term
= raft_parse_required_uint64(p
, "last_log_term");
295 rq
->leadership_transfer
296 = raft_parse_optional_boolean(p
, "leadership_transfer") == 1;
300 raft_format_vote_request(const struct raft_vote_request
*rq
, struct ds
*s
)
302 ds_put_format(s
, " term=%"PRIu64
, rq
->term
);
303 ds_put_format(s
, " last_log_index=%"PRIu64
, rq
->last_log_index
);
304 ds_put_format(s
, " last_log_term=%"PRIu64
, rq
->last_log_term
);
305 if (rq
->leadership_transfer
) {
306 ds_put_cstr(s
, " leadership_transfer=true");
310 /* raft_vote_reply. */
313 raft_vote_reply_uninit(struct raft_vote_reply
*rpy OVS_UNUSED
)
318 raft_vote_reply_clone(struct raft_vote_reply
*dst OVS_UNUSED
,
319 const struct raft_vote_reply
*src OVS_UNUSED
)
324 raft_vote_reply_to_jsonrpc(const struct raft_vote_reply
*rpy
,
327 raft_put_uint64(args
, "term", rpy
->term
);
328 json_object_put_format(args
, "vote", UUID_FMT
, UUID_ARGS(&rpy
->vote
));
332 raft_vote_reply_from_jsonrpc(struct ovsdb_parser
*p
,
333 struct raft_vote_reply
*rpy
)
335 rpy
->term
= raft_parse_required_uint64(p
, "term");
336 rpy
->vote
= raft_parse_required_uuid(p
, "vote");
340 raft_format_vote_reply(const struct raft_vote_reply
*rpy
, struct ds
*s
)
342 ds_put_format(s
, " term=%"PRIu64
, rpy
->term
);
343 ds_put_format(s
, " vote="SID_FMT
, SID_ARGS(&rpy
->vote
));
346 /* raft_add_server_request */
349 raft_add_server_request_uninit(struct raft_add_server_request
*rq
)
355 raft_add_server_request_clone(struct raft_add_server_request
*dst
,
356 const struct raft_add_server_request
*src
)
358 dst
->address
= nullable_xstrdup(src
->address
);
362 raft_add_server_request_to_jsonrpc(const struct raft_add_server_request
*rq
,
365 json_object_put_string(args
, "address", rq
->address
);
369 raft_add_server_request_from_jsonrpc(struct ovsdb_parser
*p
,
370 struct raft_add_server_request
*rq
)
372 rq
->address
= nullable_xstrdup(raft_parse_required_string(p
, "address"));
376 raft_format_add_server_request(const struct raft_add_server_request
*rq
,
379 ds_put_format(s
, " address=\"%s\"", rq
->address
);
382 /* raft_add_server_reply. */
385 raft_add_server_reply_uninit(struct raft_add_server_reply
*rpy
)
387 sset_destroy(&rpy
->remote_addresses
);
391 raft_add_server_reply_clone(struct raft_add_server_reply
*dst
,
392 const struct raft_add_server_reply
*src
)
394 sset_clone(&dst
->remote_addresses
, &src
->remote_addresses
);
398 raft_add_server_reply_to_jsonrpc(const struct raft_add_server_reply
*rpy
,
401 json_object_put(args
, "success", json_boolean_create(rpy
->success
));
402 if (!sset_is_empty(&rpy
->remote_addresses
)) {
403 json_object_put(args
, "remote_addresses",
404 raft_addresses_to_json(&rpy
->remote_addresses
));
409 raft_add_server_reply_from_jsonrpc(struct ovsdb_parser
*p
,
410 struct raft_add_server_reply
*rpy
)
412 rpy
->success
= raft_parse_required_boolean(p
, "success");
414 const struct json
*json
= ovsdb_parser_member(p
, "remote_addresses",
415 OP_ARRAY
| OP_OPTIONAL
);
417 ovsdb_parser_put_error(p
, raft_addresses_from_json(
418 json
, &rpy
->remote_addresses
));
420 sset_init(&rpy
->remote_addresses
);
425 raft_format_add_server_reply(const struct raft_add_server_reply
*rpy
,
428 ds_put_format(s
, " success=%s", rpy
->success
? "true" : "false");
429 if (!sset_is_empty(&rpy
->remote_addresses
)) {
430 ds_put_cstr(s
, " remote_addresses=[");
434 SSET_FOR_EACH (address
, &rpy
->remote_addresses
) {
436 ds_put_cstr(s
, ", ");
438 ds_put_cstr(s
, address
);
444 /* raft_remove_server_reply. */
447 raft_remove_server_reply_uninit(
448 struct raft_remove_server_reply
*rpy OVS_UNUSED
)
453 raft_remove_server_reply_clone(
454 struct raft_remove_server_reply
*dst OVS_UNUSED
,
455 const struct raft_remove_server_reply
*src OVS_UNUSED
)
460 raft_remove_server_reply_to_jsonrpc(const struct raft_remove_server_reply
*rpy
,
463 if (!uuid_is_zero(&rpy
->target_sid
)) {
464 json_object_put_format(args
, "target_server",
465 UUID_FMT
, UUID_ARGS(&rpy
->target_sid
));
467 json_object_put(args
, "success", json_boolean_create(rpy
->success
));
471 raft_remove_server_reply_from_jsonrpc(struct ovsdb_parser
*p
,
472 struct raft_remove_server_reply
*rpy
)
474 rpy
->success
= raft_parse_required_boolean(p
, "success");
475 raft_parse_optional_uuid(p
, "target_server", &rpy
->target_sid
);
479 raft_format_remove_server_reply(const struct raft_remove_server_reply
*rpy
,
482 ds_put_format(s
, " success=%s", rpy
->success
? "true" : "false");
485 /* raft_install_snapshot_request. */
488 raft_install_snapshot_request_uninit(
489 struct raft_install_snapshot_request
*rq
)
491 json_destroy(rq
->last_servers
);
492 json_destroy(rq
->data
);
496 raft_install_snapshot_request_clone(
497 struct raft_install_snapshot_request
*dst
,
498 const struct raft_install_snapshot_request
*src
)
500 dst
->last_servers
= json_clone(src
->last_servers
);
501 dst
->data
= json_clone(src
->data
);
505 raft_install_snapshot_request_to_jsonrpc(
506 const struct raft_install_snapshot_request
*rq
, struct json
*args
)
508 raft_put_uint64(args
, "term", rq
->term
);
509 raft_put_uint64(args
, "last_index", rq
->last_index
);
510 raft_put_uint64(args
, "last_term", rq
->last_term
);
511 json_object_put(args
, "last_servers", json_clone(rq
->last_servers
));
512 json_object_put_format(args
, "last_eid",
513 UUID_FMT
, UUID_ARGS(&rq
->last_eid
));
515 json_object_put(args
, "data", json_clone(rq
->data
));
519 raft_install_snapshot_request_from_jsonrpc(
520 struct ovsdb_parser
*p
, struct raft_install_snapshot_request
*rq
)
522 rq
->last_servers
= json_nullable_clone(
523 ovsdb_parser_member(p
, "last_servers", OP_OBJECT
));
524 ovsdb_parser_put_error(p
, raft_servers_validate_json(rq
->last_servers
));
526 rq
->term
= raft_parse_required_uint64(p
, "term");
527 rq
->last_index
= raft_parse_required_uint64(p
, "last_index");
528 rq
->last_term
= raft_parse_required_uint64(p
, "last_term");
529 rq
->last_eid
= raft_parse_required_uuid(p
, "last_eid");
531 rq
->data
= json_nullable_clone(
532 ovsdb_parser_member(p
, "data", OP_OBJECT
| OP_ARRAY
));
536 raft_format_install_snapshot_request(
537 const struct raft_install_snapshot_request
*rq
, struct ds
*s
)
539 ds_put_format(s
, " term=%"PRIu64
, rq
->term
);
540 ds_put_format(s
, " last_index=%"PRIu64
, rq
->last_index
);
541 ds_put_format(s
, " last_term=%"PRIu64
, rq
->last_term
);
542 ds_put_format(s
, " last_eid="UUID_FMT
, UUID_ARGS(&rq
->last_eid
));
543 ds_put_cstr(s
, " last_servers=");
546 struct ovsdb_error
*error
=
547 raft_servers_from_json(rq
->last_servers
, &servers
);
549 raft_servers_format(&servers
, s
);
550 raft_servers_destroy(&servers
);
552 ds_put_cstr(s
, "***error***");
553 ovsdb_error_destroy(error
);
557 /* raft_install_snapshot_reply. */
560 raft_install_snapshot_reply_uninit(
561 struct raft_install_snapshot_reply
*rpy OVS_UNUSED
)
566 raft_install_snapshot_reply_clone(
567 struct raft_install_snapshot_reply
*dst OVS_UNUSED
,
568 const struct raft_install_snapshot_reply
*src OVS_UNUSED
)
573 raft_install_snapshot_reply_to_jsonrpc(
574 const struct raft_install_snapshot_reply
*rpy
, struct json
*args
)
576 raft_put_uint64(args
, "term", rpy
->term
);
577 raft_put_uint64(args
, "last_index", rpy
->last_index
);
578 raft_put_uint64(args
, "last_term", rpy
->last_term
);
582 raft_install_snapshot_reply_from_jsonrpc(
583 struct ovsdb_parser
*p
,
584 struct raft_install_snapshot_reply
*rpy
)
586 rpy
->term
= raft_parse_required_uint64(p
, "term");
587 rpy
->last_index
= raft_parse_required_uint64(p
, "last_index");
588 rpy
->last_term
= raft_parse_required_uint64(p
, "last_term");
592 raft_format_install_snapshot_reply(
593 const struct raft_install_snapshot_reply
*rpy
, struct ds
*s
)
595 ds_put_format(s
, " term=%"PRIu64
, rpy
->term
);
596 ds_put_format(s
, " last_index=%"PRIu64
, rpy
->last_index
);
597 ds_put_format(s
, " last_term=%"PRIu64
, rpy
->last_term
);
600 /* raft_remove_server_request. */
603 raft_remove_server_request_uninit(
604 struct raft_remove_server_request
*rq OVS_UNUSED
)
609 raft_remove_server_request_clone(
610 struct raft_remove_server_request
*dst OVS_UNUSED
,
611 const struct raft_remove_server_request
*src OVS_UNUSED
)
616 raft_remove_server_request_to_jsonrpc(
617 const struct raft_remove_server_request
*rq
, struct json
*args
)
619 json_object_put_format(args
, "server_id", UUID_FMT
, UUID_ARGS(&rq
->sid
));
623 raft_remove_server_request_from_jsonrpc(struct ovsdb_parser
*p
,
624 struct raft_remove_server_request
*rq
)
626 rq
->sid
= raft_parse_required_uuid(p
, "server_id");
630 raft_format_remove_server_request(const struct raft_remove_server_request
*rq
,
633 ds_put_format(s
, " server="SID_FMT
, SID_ARGS(&rq
->sid
));
636 /* raft_become_leader. */
639 raft_become_leader_uninit(struct raft_become_leader
*rpc OVS_UNUSED
)
644 raft_become_leader_clone(struct raft_become_leader
*dst OVS_UNUSED
,
645 const struct raft_become_leader
*src OVS_UNUSED
)
650 raft_become_leader_to_jsonrpc(const struct raft_become_leader
*rpc
,
653 raft_put_uint64(args
, "term", rpc
->term
);
657 raft_become_leader_from_jsonrpc(struct ovsdb_parser
*p
,
658 struct raft_become_leader
*rpc
)
660 rpc
->term
= raft_parse_required_uint64(p
, "term");
664 raft_format_become_leader(const struct raft_become_leader
*rq
, struct ds
*s
)
666 ds_put_format(s
, " term=%"PRIu64
, rq
->term
);
669 /* raft_execute_command_request. */
672 raft_execute_command_request_uninit(
673 struct raft_execute_command_request
*rq
)
675 json_destroy(rq
->data
);
679 raft_execute_command_request_clone(
680 struct raft_execute_command_request
*dst
,
681 const struct raft_execute_command_request
*src
)
683 dst
->data
= json_clone(src
->data
);
687 raft_execute_command_request_to_jsonrpc(
688 const struct raft_execute_command_request
*rq
, struct json
*args
)
690 json_object_put(args
, "data", json_clone(rq
->data
));
691 json_object_put_format(args
, "prereq", UUID_FMT
, UUID_ARGS(&rq
->prereq
));
692 json_object_put_format(args
, "result", UUID_FMT
, UUID_ARGS(&rq
->result
));
696 raft_execute_command_request_from_jsonrpc(
697 struct ovsdb_parser
*p
, struct raft_execute_command_request
*rq
)
699 rq
->data
= json_nullable_clone(ovsdb_parser_member(p
, "data",
700 OP_OBJECT
| OP_ARRAY
));
701 rq
->prereq
= raft_parse_required_uuid(p
, "prereq");
702 rq
->result
= raft_parse_required_uuid(p
, "result");
706 raft_format_execute_command_request(
707 const struct raft_execute_command_request
*rq
, struct ds
*s
)
709 ds_put_format(s
, " prereq="UUID_FMT
, UUID_ARGS(&rq
->prereq
));
710 ds_put_format(s
, " result="UUID_FMT
, UUID_ARGS(&rq
->result
));
711 ds_put_format(s
, " data=");
712 json_to_ds(rq
->data
, JSSF_SORT
, s
);
715 /* raft_execute_command_reply. */
718 raft_execute_command_reply_uninit(
719 struct raft_execute_command_reply
*rpy OVS_UNUSED
)
724 raft_execute_command_reply_clone(
725 struct raft_execute_command_reply
*dst OVS_UNUSED
,
726 const struct raft_execute_command_reply
*src OVS_UNUSED
)
731 raft_execute_command_reply_to_jsonrpc(
732 const struct raft_execute_command_reply
*rpy
, struct json
*args
)
734 json_object_put_format(args
, "result", UUID_FMT
, UUID_ARGS(&rpy
->result
));
735 json_object_put_string(args
, "status",
736 raft_command_status_to_string(rpy
->status
));
737 if (rpy
->commit_index
) {
738 raft_put_uint64(args
, "commit_index", rpy
->commit_index
);
743 raft_execute_command_reply_from_jsonrpc(
744 struct ovsdb_parser
*p
, struct raft_execute_command_reply
*rpy
)
746 rpy
->result
= raft_parse_required_uuid(p
, "result");
748 const char *status
= raft_parse_required_string(p
, "status");
749 if (status
&& !raft_command_status_from_string(status
, &rpy
->status
)) {
750 ovsdb_parser_raise_error(p
, "unknown status \"%s\"", status
);
753 rpy
->commit_index
= raft_parse_optional_uint64(p
, "commit_index");
757 raft_format_execute_command_reply(
758 const struct raft_execute_command_reply
*rpy
, struct ds
*s
)
760 ds_put_format(s
, " result="UUID_FMT
, UUID_ARGS(&rpy
->result
));
761 ds_put_format(s
, " status=\"%s\"",
762 raft_command_status_to_string(rpy
->status
));
763 if (rpy
->commit_index
) {
764 ds_put_format(s
, " commit_index=%"PRIu64
, rpy
->commit_index
);
769 raft_rpc_uninit(union raft_rpc
*rpc
)
772 free(rpc
->common
.comment
);
775 #define RAFT_RPC(ENUM, NAME) \
777 raft_##NAME##_uninit(&rpc->NAME); \
786 raft_rpc_clone(const union raft_rpc
*src
)
788 union raft_rpc
*dst
= xmemdup(src
, sizeof *src
);
789 dst
->common
.comment
= nullable_xstrdup(src
->common
.comment
);
792 #define RAFT_RPC(ENUM, NAME) \
794 raft_##NAME##_clone(&dst->NAME, &src->NAME); \
/* Returns 'rpc' converted to a jsonrpc_msg.  The caller must eventually free
 * the returned message.
 *
 * 'rpc->common.sid' should be the destination server ID; it is omitted if
 * all-zeros.  'sid' is the source.  'cid' should be the cluster ID; it is
 * omitted if all-zeros. */
810 raft_rpc_to_jsonrpc(const struct uuid
*cid
,
811 const struct uuid
*sid
,
812 const union raft_rpc
*rpc
)
814 struct json
*args
= json_object_create();
815 if (!uuid_is_zero(cid
)) {
816 json_object_put_format(args
, "cluster", UUID_FMT
, UUID_ARGS(cid
));
818 if (!uuid_is_zero(&rpc
->common
.sid
)) {
819 json_object_put_format(args
, "to", UUID_FMT
,
820 UUID_ARGS(&rpc
->common
.sid
));
822 json_object_put_format(args
, "from", UUID_FMT
, UUID_ARGS(sid
));
823 if (rpc
->common
.comment
) {
824 json_object_put_string(args
, "comment", rpc
->common
.comment
);
828 #define RAFT_RPC(ENUM, NAME) \
830 raft_##NAME##_to_jsonrpc(&rpc->NAME, args); \
838 return jsonrpc_create_notify(raft_rpc_type_to_string(rpc
->type
),
839 json_array_create_1(args
));
/* Parses 'msg' as a Raft message directed to 'sid' and initializes 'rpc'
 * appropriately.  On success, returns NULL and the caller owns the contents of
 * 'rpc' and must eventually uninitialize it with raft_rpc_uninit().  On
 * failure, returns an error that the caller must eventually free.
 *
 * 'cidp' must point to the Raft cluster's ID.  If the cluster ID isn't yet
 * known, then '*cidp' must be UUID_ZERO and this function will attempt to
 * initialize it based on 'msg'. */
850 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
851 raft_rpc_from_jsonrpc(struct uuid
*cidp
, const struct uuid
*sid
,
852 const struct jsonrpc_msg
*msg
, union raft_rpc
*rpc
)
854 memset(rpc
, 0, sizeof *rpc
);
855 if (msg
->type
!= JSONRPC_NOTIFY
) {
856 return ovsdb_error(NULL
, "expecting notify RPC but received %s",
857 jsonrpc_msg_type_to_string(msg
->type
));
860 if (!raft_rpc_type_from_string(msg
->method
, &rpc
->type
)) {
861 return ovsdb_error(NULL
, "unknown method %s", msg
->method
);
864 if (json_array(msg
->params
)->n
!= 1) {
865 return ovsdb_error(NULL
,
866 "%s RPC has %"PRIuSIZE
" parameters (expected 1)",
867 msg
->method
, json_array(msg
->params
)->n
);
870 struct ovsdb_parser p
;
871 ovsdb_parser_init(&p
, json_array(msg
->params
)->elems
[0],
872 "raft %s RPC", msg
->method
);
874 bool is_hello
= rpc
->type
== RAFT_RPC_HELLO_REQUEST
;
875 bool is_add
= rpc
->type
== RAFT_RPC_ADD_SERVER_REQUEST
;
878 if (raft_parse_uuid(&p
, "cluster", is_add
, &cid
)
879 && !uuid_equals(&cid
, cidp
)) {
880 if (uuid_is_zero(cidp
)) {
882 VLOG_INFO("learned cluster ID "CID_FMT
, CID_ARGS(&cid
));
884 ovsdb_parser_raise_error(&p
, "wrong cluster "CID_FMT
" "
885 "(expected "CID_FMT
")",
886 CID_ARGS(&cid
), CID_ARGS(cidp
));
891 if (raft_parse_uuid(&p
, "to", is_add
|| is_hello
, &to_sid
)
892 && !uuid_equals(&to_sid
, sid
)) {
893 ovsdb_parser_raise_error(&p
, "misrouted message (addressed to "
894 SID_FMT
" but we're "SID_FMT
")",
895 SID_ARGS(&to_sid
), SID_ARGS(sid
));
898 rpc
->common
.sid
= raft_parse_required_uuid(&p
, "from");
899 rpc
->common
.comment
= nullable_xstrdup(
900 raft_parse_optional_string(&p
, "comment"));
903 #define RAFT_RPC(ENUM, NAME) \
905 raft_##NAME##_from_jsonrpc(&p, &rpc->NAME); \
914 struct ovsdb_error
*error
= ovsdb_parser_finish(&p
);
916 raft_rpc_uninit(rpc
);
/* Appends a formatted representation of 'rpc' to 's'.
 *
 * Does not include the RPC's server ID in the formatted representation, since
 * the caller usually has more context that allows for a more human friendly
 * representation. */
927 raft_rpc_format(const union raft_rpc
*rpc
, struct ds
*s
)
929 ds_put_cstr(s
, raft_rpc_type_to_string(rpc
->type
));
930 if (rpc
->common
.comment
) {
931 ds_put_format(s
, " \"%s\"", rpc
->common
.comment
);
936 #define RAFT_RPC(ENUM, NAME) \
938 raft_format_##NAME(&rpc->NAME, s); \
948 raft_rpc_get_term(const union raft_rpc
*rpc
)
951 case RAFT_RPC_HELLO_REQUEST
:
952 case RAFT_RPC_ADD_SERVER_REQUEST
:
953 case RAFT_RPC_ADD_SERVER_REPLY
:
954 case RAFT_RPC_REMOVE_SERVER_REQUEST
:
955 case RAFT_RPC_REMOVE_SERVER_REPLY
:
956 case RAFT_RPC_EXECUTE_COMMAND_REQUEST
:
957 case RAFT_RPC_EXECUTE_COMMAND_REPLY
:
960 case RAFT_RPC_APPEND_REQUEST
:
961 return rpc
->append_request
.term
;
963 case RAFT_RPC_APPEND_REPLY
:
964 return rpc
->append_reply
.term
;
966 case RAFT_RPC_VOTE_REQUEST
:
967 return rpc
->vote_request
.term
;
969 case RAFT_RPC_VOTE_REPLY
:
970 return rpc
->vote_reply
.term
;
972 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST
:
973 return rpc
->install_snapshot_request
.term
;
975 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY
:
976 return rpc
->install_snapshot_reply
.term
;
978 case RAFT_RPC_BECOME_LEADER
:
979 return rpc
->become_leader
.term
;
987 raft_rpc_get_vote(const union raft_rpc
*rpc
)
990 case RAFT_RPC_HELLO_REQUEST
:
991 case RAFT_RPC_ADD_SERVER_REQUEST
:
992 case RAFT_RPC_ADD_SERVER_REPLY
:
993 case RAFT_RPC_REMOVE_SERVER_REQUEST
:
994 case RAFT_RPC_REMOVE_SERVER_REPLY
:
995 case RAFT_RPC_EXECUTE_COMMAND_REQUEST
:
996 case RAFT_RPC_EXECUTE_COMMAND_REPLY
:
997 case RAFT_RPC_APPEND_REQUEST
:
998 case RAFT_RPC_APPEND_REPLY
:
999 case RAFT_RPC_VOTE_REQUEST
:
1000 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST
:
1001 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY
:
1002 case RAFT_RPC_BECOME_LEADER
:
1005 case RAFT_RPC_VOTE_REPLY
:
1006 return &raft_vote_reply_cast(rpc
)->vote
;
/* Returns the minimum log index that must be synced to disk if 'rpc' is to be
 * sent.  (This is generally the biggest log index in the message but some
 * messages, e.g. RAFT_RPC_APPEND_REQUEST, don't need their entries synced.) */
1017 raft_rpc_get_min_sync_index(const union raft_rpc
*rpc
)
1019 switch (rpc
->type
) {
1020 case RAFT_RPC_HELLO_REQUEST
:
1021 case RAFT_RPC_ADD_SERVER_REQUEST
:
1022 case RAFT_RPC_ADD_SERVER_REPLY
:
1023 case RAFT_RPC_REMOVE_SERVER_REQUEST
:
1024 case RAFT_RPC_REMOVE_SERVER_REPLY
:
1025 case RAFT_RPC_EXECUTE_COMMAND_REQUEST
:
1026 case RAFT_RPC_EXECUTE_COMMAND_REPLY
:
1027 case RAFT_RPC_APPEND_REQUEST
:
1028 case RAFT_RPC_BECOME_LEADER
:
1029 case RAFT_RPC_VOTE_REPLY
:
1032 case RAFT_RPC_APPEND_REPLY
:
1033 return raft_append_reply_cast(rpc
)->log_end
- 1;
1035 case RAFT_RPC_VOTE_REQUEST
:
1036 return raft_vote_request_cast(rpc
)->last_log_index
;
1038 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST
:
1039 return raft_install_snapshot_request_cast(rpc
)->last_index
;
1041 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY
:
1042 /* This will need to change if install_snapshot_reply becomes able to
1043 * report an error */
1044 return raft_install_snapshot_reply_cast(rpc
)->last_index
;