2 * Copyright (c) 2017, 2018 Nicira, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include "ovsdb-error.h"
25 #include "ovsdb-parser.h"
26 #include "openvswitch/dynamic-string.h"
27 #include "openvswitch/json.h"
28 #include "openvswitch/vlog.h"
31 VLOG_DEFINE_THIS_MODULE(raft_rpc
);
33 #define RAFT_RPC(ENUM, NAME) \
34 static void raft_##NAME##_uninit(struct raft_##NAME *); \
35 static void raft_##NAME##_clone(struct raft_##NAME *, \
36 const struct raft_##NAME *); \
37 static void raft_##NAME##_to_jsonrpc(const struct raft_##NAME *, \
39 static void raft_##NAME##_from_jsonrpc(struct ovsdb_parser *, \
40 struct raft_##NAME *); \
41 static void raft_format_##NAME(const struct raft_##NAME *, struct ds *);
47 raft_rpc_type_to_string(enum raft_rpc_type status
)
50 #define RAFT_RPC(ENUM, NAME) case ENUM: return #NAME;
58 raft_rpc_type_from_string(const char *s
, enum raft_rpc_type
*status
)
60 #define RAFT_RPC(ENUM, NAME) \
61 if (!strcmp(s, #NAME)) { \
70 /* raft_hello_request. */
73 raft_hello_request_uninit(struct raft_hello_request
*rq
)
79 raft_hello_request_clone(struct raft_hello_request
*dst
,
80 const struct raft_hello_request
*src
)
82 dst
->address
= nullable_xstrdup(src
->address
);
86 raft_hello_request_to_jsonrpc(const struct raft_hello_request
*rq
,
89 json_object_put_string(args
, "address", rq
->address
);
93 raft_hello_request_from_jsonrpc(struct ovsdb_parser
*p
,
94 struct raft_hello_request
*rq
)
96 rq
->address
= nullable_xstrdup(raft_parse_required_string(p
, "address"));
100 raft_format_hello_request(const struct raft_hello_request
*rq
,
103 ds_put_format(s
, " address=\"%s\"", rq
->address
);
106 /* raft_append_request. */
109 raft_append_request_uninit(struct raft_append_request
*rq
)
111 for (size_t i
= 0; i
< rq
->n_entries
; i
++) {
112 raft_entry_uninit(&rq
->entries
[i
]);
118 raft_append_request_clone(struct raft_append_request
*dst
,
119 const struct raft_append_request
*src
)
121 dst
->entries
= xmalloc(src
->n_entries
* sizeof *dst
->entries
);
122 for (size_t i
= 0; i
< src
->n_entries
; i
++) {
123 raft_entry_clone(&dst
->entries
[i
], &src
->entries
[i
]);
128 raft_append_request_to_jsonrpc(const struct raft_append_request
*rq
,
131 raft_put_uint64(args
, "term", rq
->term
);
132 raft_put_uint64(args
, "prev_log_index", rq
->prev_log_index
);
133 raft_put_uint64(args
, "prev_log_term", rq
->prev_log_term
);
134 raft_put_uint64(args
, "leader_commit", rq
->leader_commit
);
136 struct json
**entries
= xmalloc(rq
->n_entries
* sizeof *entries
);
137 for (size_t i
= 0; i
< rq
->n_entries
; i
++) {
138 entries
[i
] = raft_entry_to_json(&rq
->entries
[i
]);
140 json_object_put(args
, "log", json_array_create(entries
, rq
->n_entries
));
144 raft_append_request_from_jsonrpc(struct ovsdb_parser
*p
,
145 struct raft_append_request
*rq
)
147 rq
->term
= raft_parse_required_uint64(p
, "term");
148 rq
->prev_log_index
= raft_parse_required_uint64(p
, "prev_log_index");
149 rq
->prev_log_term
= raft_parse_required_uint64(p
, "prev_log_term");
150 rq
->leader_commit
= raft_parse_required_uint64(p
, "leader_commit");
152 const struct json
*log
= ovsdb_parser_member(p
, "log", OP_ARRAY
);
156 const struct json_array
*entries
= json_array(log
);
157 rq
->entries
= xmalloc(entries
->n
* sizeof *rq
->entries
);
159 for (size_t i
= 0; i
< entries
->n
; i
++) {
160 struct ovsdb_error
*error
= raft_entry_from_json(entries
->elems
[i
],
163 ovsdb_parser_put_error(p
, error
);
171 raft_format_append_request(const struct raft_append_request
*rq
,
174 ds_put_format(s
, " term=%"PRIu64
, rq
->term
);
175 ds_put_format(s
, " prev_log_index=%"PRIu64
, rq
->prev_log_index
);
176 ds_put_format(s
, " prev_log_term=%"PRIu64
, rq
->prev_log_term
);
177 ds_put_format(s
, " leader_commit=%"PRIu64
, rq
->leader_commit
);
178 ds_put_format(s
, " n_entries=%u", rq
->n_entries
);
181 /* raft_append_reply. */
184 raft_append_result_to_string(enum raft_append_result result
)
189 case RAFT_APPEND_INCONSISTENCY
:
190 return "inconsistency";
191 case RAFT_APPEND_IO_ERROR
:
199 raft_append_result_from_string(const char *s
, enum raft_append_result
*resultp
)
201 for (enum raft_append_result result
= 0; ; result
++) {
202 const char *s2
= raft_append_result_to_string(result
);
206 } else if (!strcmp(s
, s2
)) {
214 raft_append_reply_uninit(struct raft_append_reply
*rpy OVS_UNUSED
)
219 raft_append_reply_clone(struct raft_append_reply
*dst OVS_UNUSED
,
220 const struct raft_append_reply
*src OVS_UNUSED
)
225 raft_append_reply_to_jsonrpc(const struct raft_append_reply
*rpy
,
228 raft_put_uint64(args
, "term", rpy
->term
);
229 raft_put_uint64(args
, "log_end", rpy
->log_end
);
230 raft_put_uint64(args
, "prev_log_index", rpy
->prev_log_index
);
231 raft_put_uint64(args
, "prev_log_term", rpy
->prev_log_term
);
232 raft_put_uint64(args
, "n_entries", rpy
->n_entries
);
233 json_object_put_string(args
, "result",
234 raft_append_result_to_string(rpy
->result
));
238 raft_append_reply_from_jsonrpc(struct ovsdb_parser
*p
,
239 struct raft_append_reply
*rpy
)
241 rpy
->term
= raft_parse_required_uint64(p
, "term");
242 rpy
->log_end
= raft_parse_required_uint64(p
, "log_end");
243 rpy
->prev_log_index
= raft_parse_required_uint64(p
, "prev_log_index");
244 rpy
->prev_log_term
= raft_parse_required_uint64(p
, "prev_log_term");
245 rpy
->n_entries
= raft_parse_required_uint64(p
, "n_entries");
247 const char *result
= raft_parse_required_string(p
, "result");
248 if (result
&& !raft_append_result_from_string(result
, &rpy
->result
)) {
249 ovsdb_parser_raise_error(p
, "unknown result \"%s\"", result
);
254 raft_format_append_reply(const struct raft_append_reply
*rpy
, struct ds
*s
)
256 ds_put_format(s
, " term=%"PRIu64
, rpy
->term
);
257 ds_put_format(s
, " log_end=%"PRIu64
, rpy
->log_end
);
258 ds_put_format(s
, " result=\"%s\"",
259 raft_append_result_to_string(rpy
->result
));
262 /* raft_vote_request. */
265 raft_vote_request_uninit(struct raft_vote_request
*rq OVS_UNUSED
)
270 raft_vote_request_clone(struct raft_vote_request
*dst OVS_UNUSED
,
271 const struct raft_vote_request
*src OVS_UNUSED
)
276 raft_vote_request_to_jsonrpc(const struct raft_vote_request
*rq
,
279 raft_put_uint64(args
, "term", rq
->term
);
280 raft_put_uint64(args
, "last_log_index", rq
->last_log_index
);
281 raft_put_uint64(args
, "last_log_term", rq
->last_log_term
);
282 if (rq
->leadership_transfer
) {
283 json_object_put(args
, "leadership_transfer",
284 json_boolean_create(true));
289 raft_vote_request_from_jsonrpc(struct ovsdb_parser
*p
,
290 struct raft_vote_request
*rq
)
292 rq
->term
= raft_parse_required_uint64(p
, "term");
293 rq
->last_log_index
= raft_parse_required_uint64(p
, "last_log_index");
294 rq
->last_log_term
= raft_parse_required_uint64(p
, "last_log_term");
295 rq
->leadership_transfer
296 = raft_parse_optional_boolean(p
, "leadership_transfer") == 1;
300 raft_format_vote_request(const struct raft_vote_request
*rq
, struct ds
*s
)
302 ds_put_format(s
, " term=%"PRIu64
, rq
->term
);
303 ds_put_format(s
, " last_log_index=%"PRIu64
, rq
->last_log_index
);
304 ds_put_format(s
, " last_log_term=%"PRIu64
, rq
->last_log_term
);
305 if (rq
->leadership_transfer
) {
306 ds_put_cstr(s
, " leadership_transfer=true");
310 /* raft_vote_reply. */
313 raft_vote_reply_uninit(struct raft_vote_reply
*rpy OVS_UNUSED
)
318 raft_vote_reply_clone(struct raft_vote_reply
*dst OVS_UNUSED
,
319 const struct raft_vote_reply
*src OVS_UNUSED
)
324 raft_vote_reply_to_jsonrpc(const struct raft_vote_reply
*rpy
,
327 raft_put_uint64(args
, "term", rpy
->term
);
328 json_object_put_format(args
, "vote", UUID_FMT
, UUID_ARGS(&rpy
->vote
));
332 raft_vote_reply_from_jsonrpc(struct ovsdb_parser
*p
,
333 struct raft_vote_reply
*rpy
)
335 rpy
->term
= raft_parse_required_uint64(p
, "term");
336 rpy
->vote
= raft_parse_required_uuid(p
, "vote");
340 raft_format_vote_reply(const struct raft_vote_reply
*rpy
, struct ds
*s
)
342 ds_put_format(s
, " term=%"PRIu64
, rpy
->term
);
343 ds_put_format(s
, " vote="SID_FMT
, SID_ARGS(&rpy
->vote
));
346 /* raft_add_server_request */
349 raft_add_server_request_uninit(struct raft_add_server_request
*rq
)
355 raft_add_server_request_clone(struct raft_add_server_request
*dst
,
356 const struct raft_add_server_request
*src
)
358 dst
->address
= nullable_xstrdup(src
->address
);
362 raft_add_server_request_to_jsonrpc(const struct raft_add_server_request
*rq
,
365 json_object_put_string(args
, "address", rq
->address
);
369 raft_add_server_request_from_jsonrpc(struct ovsdb_parser
*p
,
370 struct raft_add_server_request
*rq
)
372 rq
->address
= nullable_xstrdup(raft_parse_required_string(p
, "address"));
376 raft_format_add_server_request(const struct raft_add_server_request
*rq
,
379 ds_put_format(s
, " address=\"%s\"", rq
->address
);
382 /* raft_add_server_reply. */
385 raft_add_server_reply_uninit(struct raft_add_server_reply
*rpy
)
387 sset_destroy(&rpy
->remote_addresses
);
391 raft_add_server_reply_clone(struct raft_add_server_reply
*dst
,
392 const struct raft_add_server_reply
*src
)
394 sset_clone(&dst
->remote_addresses
, &src
->remote_addresses
);
398 raft_add_server_reply_to_jsonrpc(const struct raft_add_server_reply
*rpy
,
401 json_object_put(args
, "success", json_boolean_create(rpy
->success
));
402 if (!sset_is_empty(&rpy
->remote_addresses
)) {
403 json_object_put(args
, "remote_addresses",
404 raft_addresses_to_json(&rpy
->remote_addresses
));
409 raft_add_server_reply_from_jsonrpc(struct ovsdb_parser
*p
,
410 struct raft_add_server_reply
*rpy
)
412 rpy
->success
= raft_parse_required_boolean(p
, "success");
414 const struct json
*json
= ovsdb_parser_member(p
, "remote_addresses",
415 OP_ARRAY
| OP_OPTIONAL
);
417 ovsdb_parser_put_error(p
, raft_addresses_from_json(
418 json
, &rpy
->remote_addresses
));
420 sset_init(&rpy
->remote_addresses
);
425 raft_format_add_server_reply(const struct raft_add_server_reply
*rpy
,
428 ds_put_format(s
, " success=%s", rpy
->success
? "true" : "false");
429 if (!sset_is_empty(&rpy
->remote_addresses
)) {
430 ds_put_cstr(s
, " remote_addresses=[");
434 SSET_FOR_EACH (address
, &rpy
->remote_addresses
) {
436 ds_put_cstr(s
, ", ");
438 ds_put_cstr(s
, address
);
444 /* raft_remove_server_reply. */
447 raft_remove_server_reply_uninit(
448 struct raft_remove_server_reply
*rpy OVS_UNUSED
)
453 raft_remove_server_reply_clone(
454 struct raft_remove_server_reply
*dst OVS_UNUSED
,
455 const struct raft_remove_server_reply
*src OVS_UNUSED
)
460 raft_remove_server_reply_to_jsonrpc(const struct raft_remove_server_reply
*rpy
,
463 if (!uuid_is_zero(&rpy
->target_sid
)) {
464 json_object_put_format(args
, "target_server",
465 UUID_FMT
, UUID_ARGS(&rpy
->target_sid
));
467 json_object_put(args
, "success", json_boolean_create(rpy
->success
));
471 raft_remove_server_reply_from_jsonrpc(struct ovsdb_parser
*p
,
472 struct raft_remove_server_reply
*rpy
)
474 rpy
->success
= raft_parse_required_boolean(p
, "success");
475 raft_parse_optional_uuid(p
, "target_server", &rpy
->target_sid
);
479 raft_format_remove_server_reply(const struct raft_remove_server_reply
*rpy
,
482 ds_put_format(s
, " success=%s", rpy
->success
? "true" : "false");
485 /* raft_install_snapshot_request. */
488 raft_install_snapshot_request_uninit(
489 struct raft_install_snapshot_request
*rq
)
491 json_destroy(rq
->last_servers
);
492 json_destroy(rq
->data
);
496 raft_install_snapshot_request_clone(
497 struct raft_install_snapshot_request
*dst
,
498 const struct raft_install_snapshot_request
*src
)
500 dst
->last_servers
= json_clone(src
->last_servers
);
501 dst
->data
= json_clone(src
->data
);
505 raft_install_snapshot_request_to_jsonrpc(
506 const struct raft_install_snapshot_request
*rq
, struct json
*args
)
508 raft_put_uint64(args
, "term", rq
->term
);
509 raft_put_uint64(args
, "last_index", rq
->last_index
);
510 raft_put_uint64(args
, "last_term", rq
->last_term
);
511 json_object_put(args
, "last_servers", json_clone(rq
->last_servers
));
512 json_object_put_format(args
, "last_eid",
513 UUID_FMT
, UUID_ARGS(&rq
->last_eid
));
514 raft_put_uint64(args
, "election_timer", rq
->election_timer
);
516 json_object_put(args
, "data", json_clone(rq
->data
));
520 raft_install_snapshot_request_from_jsonrpc(
521 struct ovsdb_parser
*p
, struct raft_install_snapshot_request
*rq
)
523 rq
->last_servers
= json_nullable_clone(
524 ovsdb_parser_member(p
, "last_servers", OP_OBJECT
));
525 ovsdb_parser_put_error(p
, raft_servers_validate_json(rq
->last_servers
));
527 rq
->term
= raft_parse_required_uint64(p
, "term");
528 rq
->last_index
= raft_parse_required_uint64(p
, "last_index");
529 rq
->last_term
= raft_parse_required_uint64(p
, "last_term");
530 rq
->last_eid
= raft_parse_required_uuid(p
, "last_eid");
531 /* election_timer is optional in file header, but is always populated in
532 * install_snapshot_request. */
533 rq
->election_timer
= raft_parse_required_uint64(p
, "election_timer");
535 rq
->data
= json_nullable_clone(
536 ovsdb_parser_member(p
, "data", OP_OBJECT
| OP_ARRAY
));
540 raft_format_install_snapshot_request(
541 const struct raft_install_snapshot_request
*rq
, struct ds
*s
)
543 ds_put_format(s
, " term=%"PRIu64
, rq
->term
);
544 ds_put_format(s
, " last_index=%"PRIu64
, rq
->last_index
);
545 ds_put_format(s
, " last_term=%"PRIu64
, rq
->last_term
);
546 ds_put_format(s
, " last_eid="UUID_FMT
, UUID_ARGS(&rq
->last_eid
));
547 ds_put_format(s
, " election_timer=%"PRIu64
, rq
->election_timer
);
548 ds_put_cstr(s
, " last_servers=");
551 struct ovsdb_error
*error
=
552 raft_servers_from_json(rq
->last_servers
, &servers
);
554 raft_servers_format(&servers
, s
);
555 raft_servers_destroy(&servers
);
557 ds_put_cstr(s
, "***error***");
558 ovsdb_error_destroy(error
);
562 /* raft_install_snapshot_reply. */
565 raft_install_snapshot_reply_uninit(
566 struct raft_install_snapshot_reply
*rpy OVS_UNUSED
)
571 raft_install_snapshot_reply_clone(
572 struct raft_install_snapshot_reply
*dst OVS_UNUSED
,
573 const struct raft_install_snapshot_reply
*src OVS_UNUSED
)
578 raft_install_snapshot_reply_to_jsonrpc(
579 const struct raft_install_snapshot_reply
*rpy
, struct json
*args
)
581 raft_put_uint64(args
, "term", rpy
->term
);
582 raft_put_uint64(args
, "last_index", rpy
->last_index
);
583 raft_put_uint64(args
, "last_term", rpy
->last_term
);
587 raft_install_snapshot_reply_from_jsonrpc(
588 struct ovsdb_parser
*p
,
589 struct raft_install_snapshot_reply
*rpy
)
591 rpy
->term
= raft_parse_required_uint64(p
, "term");
592 rpy
->last_index
= raft_parse_required_uint64(p
, "last_index");
593 rpy
->last_term
= raft_parse_required_uint64(p
, "last_term");
597 raft_format_install_snapshot_reply(
598 const struct raft_install_snapshot_reply
*rpy
, struct ds
*s
)
600 ds_put_format(s
, " term=%"PRIu64
, rpy
->term
);
601 ds_put_format(s
, " last_index=%"PRIu64
, rpy
->last_index
);
602 ds_put_format(s
, " last_term=%"PRIu64
, rpy
->last_term
);
605 /* raft_remove_server_request. */
608 raft_remove_server_request_uninit(
609 struct raft_remove_server_request
*rq OVS_UNUSED
)
614 raft_remove_server_request_clone(
615 struct raft_remove_server_request
*dst OVS_UNUSED
,
616 const struct raft_remove_server_request
*src OVS_UNUSED
)
621 raft_remove_server_request_to_jsonrpc(
622 const struct raft_remove_server_request
*rq
, struct json
*args
)
624 json_object_put_format(args
, "server_id", UUID_FMT
, UUID_ARGS(&rq
->sid
));
628 raft_remove_server_request_from_jsonrpc(struct ovsdb_parser
*p
,
629 struct raft_remove_server_request
*rq
)
631 rq
->sid
= raft_parse_required_uuid(p
, "server_id");
635 raft_format_remove_server_request(const struct raft_remove_server_request
*rq
,
638 ds_put_format(s
, " server="SID_FMT
, SID_ARGS(&rq
->sid
));
641 /* raft_become_leader. */
644 raft_become_leader_uninit(struct raft_become_leader
*rpc OVS_UNUSED
)
649 raft_become_leader_clone(struct raft_become_leader
*dst OVS_UNUSED
,
650 const struct raft_become_leader
*src OVS_UNUSED
)
655 raft_become_leader_to_jsonrpc(const struct raft_become_leader
*rpc
,
658 raft_put_uint64(args
, "term", rpc
->term
);
662 raft_become_leader_from_jsonrpc(struct ovsdb_parser
*p
,
663 struct raft_become_leader
*rpc
)
665 rpc
->term
= raft_parse_required_uint64(p
, "term");
669 raft_format_become_leader(const struct raft_become_leader
*rq
, struct ds
*s
)
671 ds_put_format(s
, " term=%"PRIu64
, rq
->term
);
674 /* raft_execute_command_request. */
677 raft_execute_command_request_uninit(
678 struct raft_execute_command_request
*rq
)
680 json_destroy(rq
->data
);
684 raft_execute_command_request_clone(
685 struct raft_execute_command_request
*dst
,
686 const struct raft_execute_command_request
*src
)
688 dst
->data
= json_clone(src
->data
);
692 raft_execute_command_request_to_jsonrpc(
693 const struct raft_execute_command_request
*rq
, struct json
*args
)
695 json_object_put(args
, "data", json_clone(rq
->data
));
696 json_object_put_format(args
, "prereq", UUID_FMT
, UUID_ARGS(&rq
->prereq
));
697 json_object_put_format(args
, "result", UUID_FMT
, UUID_ARGS(&rq
->result
));
701 raft_execute_command_request_from_jsonrpc(
702 struct ovsdb_parser
*p
, struct raft_execute_command_request
*rq
)
704 rq
->data
= json_nullable_clone(ovsdb_parser_member(p
, "data",
705 OP_OBJECT
| OP_ARRAY
));
706 rq
->prereq
= raft_parse_required_uuid(p
, "prereq");
707 rq
->result
= raft_parse_required_uuid(p
, "result");
711 raft_format_execute_command_request(
712 const struct raft_execute_command_request
*rq
, struct ds
*s
)
714 ds_put_format(s
, " prereq="UUID_FMT
, UUID_ARGS(&rq
->prereq
));
715 ds_put_format(s
, " result="UUID_FMT
, UUID_ARGS(&rq
->result
));
716 ds_put_format(s
, " data=");
717 json_to_ds(rq
->data
, JSSF_SORT
, s
);
720 /* raft_execute_command_reply. */
723 raft_execute_command_reply_uninit(
724 struct raft_execute_command_reply
*rpy OVS_UNUSED
)
729 raft_execute_command_reply_clone(
730 struct raft_execute_command_reply
*dst OVS_UNUSED
,
731 const struct raft_execute_command_reply
*src OVS_UNUSED
)
736 raft_execute_command_reply_to_jsonrpc(
737 const struct raft_execute_command_reply
*rpy
, struct json
*args
)
739 json_object_put_format(args
, "result", UUID_FMT
, UUID_ARGS(&rpy
->result
));
740 json_object_put_string(args
, "status",
741 raft_command_status_to_string(rpy
->status
));
742 if (rpy
->commit_index
) {
743 raft_put_uint64(args
, "commit_index", rpy
->commit_index
);
748 raft_execute_command_reply_from_jsonrpc(
749 struct ovsdb_parser
*p
, struct raft_execute_command_reply
*rpy
)
751 rpy
->result
= raft_parse_required_uuid(p
, "result");
753 const char *status
= raft_parse_required_string(p
, "status");
754 if (status
&& !raft_command_status_from_string(status
, &rpy
->status
)) {
755 ovsdb_parser_raise_error(p
, "unknown status \"%s\"", status
);
758 rpy
->commit_index
= raft_parse_optional_uint64(p
, "commit_index");
762 raft_format_execute_command_reply(
763 const struct raft_execute_command_reply
*rpy
, struct ds
*s
)
765 ds_put_format(s
, " result="UUID_FMT
, UUID_ARGS(&rpy
->result
));
766 ds_put_format(s
, " status=\"%s\"",
767 raft_command_status_to_string(rpy
->status
));
768 if (rpy
->commit_index
) {
769 ds_put_format(s
, " commit_index=%"PRIu64
, rpy
->commit_index
);
774 raft_rpc_uninit(union raft_rpc
*rpc
)
777 free(rpc
->common
.comment
);
780 #define RAFT_RPC(ENUM, NAME) \
782 raft_##NAME##_uninit(&rpc->NAME); \
791 raft_rpc_clone(const union raft_rpc
*src
)
793 union raft_rpc
*dst
= xmemdup(src
, sizeof *src
);
794 dst
->common
.comment
= nullable_xstrdup(src
->common
.comment
);
797 #define RAFT_RPC(ENUM, NAME) \
799 raft_##NAME##_clone(&dst->NAME, &src->NAME); \
808 /* Returns 'rpc' converted to a jsonrpc_msg. The caller must eventually free
809 * the returned message.
811 * 'rpc->common.sid' should be the destination server ID; it is omitted if
812 * all-zeros. 'sid' is the source. 'cid' should be the cluster ID; it is
813 * omitted if all-zeros. */
815 raft_rpc_to_jsonrpc(const struct uuid
*cid
,
816 const struct uuid
*sid
,
817 const union raft_rpc
*rpc
)
819 struct json
*args
= json_object_create();
820 if (!uuid_is_zero(cid
)) {
821 json_object_put_format(args
, "cluster", UUID_FMT
, UUID_ARGS(cid
));
823 if (!uuid_is_zero(&rpc
->common
.sid
)) {
824 json_object_put_format(args
, "to", UUID_FMT
,
825 UUID_ARGS(&rpc
->common
.sid
));
827 json_object_put_format(args
, "from", UUID_FMT
, UUID_ARGS(sid
));
828 if (rpc
->common
.comment
) {
829 json_object_put_string(args
, "comment", rpc
->common
.comment
);
833 #define RAFT_RPC(ENUM, NAME) \
835 raft_##NAME##_to_jsonrpc(&rpc->NAME, args); \
843 return jsonrpc_create_notify(raft_rpc_type_to_string(rpc
->type
),
844 json_array_create_1(args
));
847 /* Parses 'msg' as a Raft message directed to 'sid' and initializes 'rpc'
848 * appropriately. On success, returns NULL and the caller owns the contents of
849 * 'rpc' and must eventually uninitialize it with raft_rpc_uninit(). On
850 * failure, returns an error that the caller must eventually free.
852 * 'cidp' must point to the Raft cluster's ID. If the cluster ID isn't yet
853 * known, then '*cidp' must be UUID_ZERO and this function will attempt to
854 * initialize it based on 'msg'. */
855 struct ovsdb_error
* OVS_WARN_UNUSED_RESULT
856 raft_rpc_from_jsonrpc(struct uuid
*cidp
, const struct uuid
*sid
,
857 const struct jsonrpc_msg
*msg
, union raft_rpc
*rpc
)
859 memset(rpc
, 0, sizeof *rpc
);
860 if (msg
->type
!= JSONRPC_NOTIFY
) {
861 return ovsdb_error(NULL
, "expecting notify RPC but received %s",
862 jsonrpc_msg_type_to_string(msg
->type
));
865 if (!raft_rpc_type_from_string(msg
->method
, &rpc
->type
)) {
866 return ovsdb_error(NULL
, "unknown method %s", msg
->method
);
869 if (json_array(msg
->params
)->n
!= 1) {
870 return ovsdb_error(NULL
,
871 "%s RPC has %"PRIuSIZE
" parameters (expected 1)",
872 msg
->method
, json_array(msg
->params
)->n
);
875 struct ovsdb_parser p
;
876 ovsdb_parser_init(&p
, json_array(msg
->params
)->elems
[0],
877 "raft %s RPC", msg
->method
);
879 bool is_hello
= rpc
->type
== RAFT_RPC_HELLO_REQUEST
;
880 bool is_add
= rpc
->type
== RAFT_RPC_ADD_SERVER_REQUEST
;
883 if (raft_parse_uuid(&p
, "cluster", is_add
, &cid
)
884 && !uuid_equals(&cid
, cidp
)) {
885 if (uuid_is_zero(cidp
)) {
887 VLOG_INFO("learned cluster ID "CID_FMT
, CID_ARGS(&cid
));
889 ovsdb_parser_raise_error(&p
, "wrong cluster "CID_FMT
" "
890 "(expected "CID_FMT
")",
891 CID_ARGS(&cid
), CID_ARGS(cidp
));
896 if (raft_parse_uuid(&p
, "to", is_add
|| is_hello
, &to_sid
)
897 && !uuid_equals(&to_sid
, sid
)) {
898 ovsdb_parser_raise_error(&p
, "misrouted message (addressed to "
899 SID_FMT
" but we're "SID_FMT
")",
900 SID_ARGS(&to_sid
), SID_ARGS(sid
));
903 rpc
->common
.sid
= raft_parse_required_uuid(&p
, "from");
904 rpc
->common
.comment
= nullable_xstrdup(
905 raft_parse_optional_string(&p
, "comment"));
908 #define RAFT_RPC(ENUM, NAME) \
910 raft_##NAME##_from_jsonrpc(&p, &rpc->NAME); \
919 struct ovsdb_error
*error
= ovsdb_parser_finish(&p
);
921 raft_rpc_uninit(rpc
);
926 /* Appends a formatted representation of 'rpc' to 's'.
928 * Does not include the RPC's server ID in the formatted representation, since
929 * the caller usually has more context that allows for a more human friendly
932 raft_rpc_format(const union raft_rpc
*rpc
, struct ds
*s
)
934 ds_put_cstr(s
, raft_rpc_type_to_string(rpc
->type
));
935 if (rpc
->common
.comment
) {
936 ds_put_format(s
, " \"%s\"", rpc
->common
.comment
);
941 #define RAFT_RPC(ENUM, NAME) \
943 raft_format_##NAME(&rpc->NAME, s); \
953 raft_rpc_get_term(const union raft_rpc
*rpc
)
956 case RAFT_RPC_HELLO_REQUEST
:
957 case RAFT_RPC_ADD_SERVER_REQUEST
:
958 case RAFT_RPC_ADD_SERVER_REPLY
:
959 case RAFT_RPC_REMOVE_SERVER_REQUEST
:
960 case RAFT_RPC_REMOVE_SERVER_REPLY
:
961 case RAFT_RPC_EXECUTE_COMMAND_REQUEST
:
962 case RAFT_RPC_EXECUTE_COMMAND_REPLY
:
965 case RAFT_RPC_APPEND_REQUEST
:
966 return rpc
->append_request
.term
;
968 case RAFT_RPC_APPEND_REPLY
:
969 return rpc
->append_reply
.term
;
971 case RAFT_RPC_VOTE_REQUEST
:
972 return rpc
->vote_request
.term
;
974 case RAFT_RPC_VOTE_REPLY
:
975 return rpc
->vote_reply
.term
;
977 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST
:
978 return rpc
->install_snapshot_request
.term
;
980 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY
:
981 return rpc
->install_snapshot_reply
.term
;
983 case RAFT_RPC_BECOME_LEADER
:
984 return rpc
->become_leader
.term
;
992 raft_rpc_get_vote(const union raft_rpc
*rpc
)
995 case RAFT_RPC_HELLO_REQUEST
:
996 case RAFT_RPC_ADD_SERVER_REQUEST
:
997 case RAFT_RPC_ADD_SERVER_REPLY
:
998 case RAFT_RPC_REMOVE_SERVER_REQUEST
:
999 case RAFT_RPC_REMOVE_SERVER_REPLY
:
1000 case RAFT_RPC_EXECUTE_COMMAND_REQUEST
:
1001 case RAFT_RPC_EXECUTE_COMMAND_REPLY
:
1002 case RAFT_RPC_APPEND_REQUEST
:
1003 case RAFT_RPC_APPEND_REPLY
:
1004 case RAFT_RPC_VOTE_REQUEST
:
1005 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST
:
1006 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY
:
1007 case RAFT_RPC_BECOME_LEADER
:
1010 case RAFT_RPC_VOTE_REPLY
:
1011 return &raft_vote_reply_cast(rpc
)->vote
;
1018 /* Returns the minimum log index that must be synced to disk if 'rpc' is to be
1019 * sent. (This is generally the biggest log index in the message but some
1020 * messages, e.g. RAFT_RPC_APPEND_REQUEST, don't need their entries synced.) */
1022 raft_rpc_get_min_sync_index(const union raft_rpc
*rpc
)
1024 switch (rpc
->type
) {
1025 case RAFT_RPC_HELLO_REQUEST
:
1026 case RAFT_RPC_ADD_SERVER_REQUEST
:
1027 case RAFT_RPC_ADD_SERVER_REPLY
:
1028 case RAFT_RPC_REMOVE_SERVER_REQUEST
:
1029 case RAFT_RPC_REMOVE_SERVER_REPLY
:
1030 case RAFT_RPC_EXECUTE_COMMAND_REQUEST
:
1031 case RAFT_RPC_EXECUTE_COMMAND_REPLY
:
1032 case RAFT_RPC_APPEND_REQUEST
:
1033 case RAFT_RPC_BECOME_LEADER
:
1034 case RAFT_RPC_VOTE_REPLY
:
1037 case RAFT_RPC_APPEND_REPLY
:
1038 return raft_append_reply_cast(rpc
)->log_end
- 1;
1040 case RAFT_RPC_VOTE_REQUEST
:
1041 return raft_vote_request_cast(rpc
)->last_log_index
;
1043 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST
:
1044 return raft_install_snapshot_request_cast(rpc
)->last_index
;
1046 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY
:
1047 /* This will need to change if install_snapshot_reply becomes able to
1048 * report an error */
1049 return raft_install_snapshot_reply_cast(rpc
)->last_index
;