]> git.proxmox.com Git - mirror_ovs.git/blame - ovsdb/raft-rpc.c
ovsdb: Clarify that a server that leaves a cluster may never rejoin.
[mirror_ovs.git] / ovsdb / raft-rpc.c
CommitLineData
1b1d2e6d
BP
1/*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <config.h>
18
19#include "raft-rpc.h"
20#include <stdlib.h>
21#include <string.h>
22#include "compiler.h"
23#include "jsonrpc.h"
24#include "ovsdb-error.h"
25#include "ovsdb-parser.h"
26#include "openvswitch/dynamic-string.h"
27#include "openvswitch/json.h"
28#include "openvswitch/vlog.h"
29#include "sset.h"
30
31VLOG_DEFINE_THIS_MODULE(raft_rpc);
32
33#define RAFT_RPC(ENUM, NAME) \
34 static void raft_##NAME##_uninit(struct raft_##NAME *); \
35 static void raft_##NAME##_clone(struct raft_##NAME *, \
36 const struct raft_##NAME *); \
37 static void raft_##NAME##_to_jsonrpc(const struct raft_##NAME *, \
38 struct json *); \
39 static void raft_##NAME##_from_jsonrpc(struct ovsdb_parser *, \
40 struct raft_##NAME *); \
41 static void raft_format_##NAME(const struct raft_##NAME *, struct ds *);
42RAFT_RPC_TYPES
43#undef RAFT_RPC
44\f
45/* raft_rpc_type. */
46const char *
47raft_rpc_type_to_string(enum raft_rpc_type status)
48{
49 switch (status) {
50#define RAFT_RPC(ENUM, NAME) case ENUM: return #NAME;
51 RAFT_RPC_TYPES
52#undef RAFT_RPC
53 }
54 return "<unknown>";
55}
56
57bool
58raft_rpc_type_from_string(const char *s, enum raft_rpc_type *status)
59{
60#define RAFT_RPC(ENUM, NAME) \
61 if (!strcmp(s, #NAME)) { \
62 *status = ENUM; \
63 return true; \
64 }
65 RAFT_RPC_TYPES
66#undef RAFT_RPC
67 return false;
68}
69\f
70/* raft_hello_request. */
71
72static void
73raft_hello_request_uninit(struct raft_hello_request *rq)
74{
75 free(rq->address);
76}
77
78static void
79raft_hello_request_clone(struct raft_hello_request *dst,
80 const struct raft_hello_request *src)
81{
82 dst->address = nullable_xstrdup(src->address);
83}
84
85static void
86raft_hello_request_to_jsonrpc(const struct raft_hello_request *rq,
87 struct json *args)
88{
89 json_object_put_string(args, "address", rq->address);
90}
91
92static void
93raft_hello_request_from_jsonrpc(struct ovsdb_parser *p,
94 struct raft_hello_request *rq)
95{
96 rq->address = nullable_xstrdup(raft_parse_required_string(p, "address"));
97}
98
99static void
100raft_format_hello_request(const struct raft_hello_request *rq,
101 struct ds *s)
102{
103 ds_put_format(s, " address=\"%s\"", rq->address);
104}
105\f
106/* raft_append_request. */
107
108static void
109raft_append_request_uninit(struct raft_append_request *rq)
110{
111 for (size_t i = 0; i < rq->n_entries; i++) {
112 raft_entry_uninit(&rq->entries[i]);
113 }
114 free(rq->entries);
115}
116
117static void
118raft_append_request_clone(struct raft_append_request *dst,
119 const struct raft_append_request *src)
120{
121 dst->entries = xmalloc(src->n_entries * sizeof *dst->entries);
122 for (size_t i = 0; i < src->n_entries; i++) {
123 raft_entry_clone(&dst->entries[i], &src->entries[i]);
124 }
125}
126
127static void
128raft_append_request_to_jsonrpc(const struct raft_append_request *rq,
129 struct json *args)
130{
131 raft_put_uint64(args, "term", rq->term);
132 raft_put_uint64(args, "prev_log_index", rq->prev_log_index);
133 raft_put_uint64(args, "prev_log_term", rq->prev_log_term);
134 raft_put_uint64(args, "leader_commit", rq->leader_commit);
135
136 struct json **entries = xmalloc(rq->n_entries * sizeof *entries);
137 for (size_t i = 0; i < rq->n_entries; i++) {
138 entries[i] = raft_entry_to_json(&rq->entries[i]);
139 }
140 json_object_put(args, "log", json_array_create(entries, rq->n_entries));
141}
142
143static void
144raft_append_request_from_jsonrpc(struct ovsdb_parser *p,
145 struct raft_append_request *rq)
146{
147 rq->term = raft_parse_required_uint64(p, "term");
148 rq->prev_log_index = raft_parse_required_uint64(p, "prev_log_index");
149 rq->prev_log_term = raft_parse_required_uint64(p, "prev_log_term");
150 rq->leader_commit = raft_parse_required_uint64(p, "leader_commit");
151
152 const struct json *log = ovsdb_parser_member(p, "log", OP_ARRAY);
153 if (!log) {
154 return;
155 }
156 const struct json_array *entries = json_array(log);
157 rq->entries = xmalloc(entries->n * sizeof *rq->entries);
158 rq->n_entries = 0;
159 for (size_t i = 0; i < entries->n; i++) {
160 struct ovsdb_error *error = raft_entry_from_json(entries->elems[i],
161 &rq->entries[i]);
162 if (error) {
163 ovsdb_parser_put_error(p, error);
164 break;
165 }
166 rq->n_entries++;
167 }
168}
169
170static void
171raft_format_append_request(const struct raft_append_request *rq,
172 struct ds *s)
173{
174 ds_put_format(s, " term=%"PRIu64, rq->term);
175 ds_put_format(s, " prev_log_index=%"PRIu64, rq->prev_log_index);
176 ds_put_format(s, " prev_log_term=%"PRIu64, rq->prev_log_term);
177 ds_put_format(s, " leader_commit=%"PRIu64, rq->leader_commit);
178 ds_put_format(s, " n_entries=%u", rq->n_entries);
179}
180\f
181/* raft_append_reply. */
182
183const char *
184raft_append_result_to_string(enum raft_append_result result)
185{
186 switch (result) {
187 case RAFT_APPEND_OK:
188 return "OK";
189 case RAFT_APPEND_INCONSISTENCY:
190 return "inconsistency";
191 case RAFT_APPEND_IO_ERROR:
192 return "I/O error";
193 default:
194 return NULL;
195 }
196}
197
198bool
199raft_append_result_from_string(const char *s, enum raft_append_result *resultp)
200{
201 for (enum raft_append_result result = 0; ; result++) {
202 const char *s2 = raft_append_result_to_string(result);
203 if (!s2) {
204 *resultp = 0;
205 return false;
206 } else if (!strcmp(s, s2)) {
207 *resultp = result;
208 return true;
209 }
210 }
211}
212
213static void
214raft_append_reply_uninit(struct raft_append_reply *rpy OVS_UNUSED)
215{
216}
217
218static void
219raft_append_reply_clone(struct raft_append_reply *dst OVS_UNUSED,
220 const struct raft_append_reply *src OVS_UNUSED)
221{
222}
223
224static void
225raft_append_reply_to_jsonrpc(const struct raft_append_reply *rpy,
226 struct json *args)
227{
228 raft_put_uint64(args, "term", rpy->term);
229 raft_put_uint64(args, "log_end", rpy->log_end);
230 raft_put_uint64(args, "prev_log_index", rpy->prev_log_index);
231 raft_put_uint64(args, "prev_log_term", rpy->prev_log_term);
232 raft_put_uint64(args, "n_entries", rpy->n_entries);
233 json_object_put_string(args, "result",
234 raft_append_result_to_string(rpy->result));
235}
236
237static void
238raft_append_reply_from_jsonrpc(struct ovsdb_parser *p,
239 struct raft_append_reply *rpy)
240{
241 rpy->term = raft_parse_required_uint64(p, "term");
242 rpy->log_end = raft_parse_required_uint64(p, "log_end");
243 rpy->prev_log_index = raft_parse_required_uint64(p, "prev_log_index");
244 rpy->prev_log_term = raft_parse_required_uint64(p, "prev_log_term");
245 rpy->n_entries = raft_parse_required_uint64(p, "n_entries");
246
247 const char *result = raft_parse_required_string(p, "result");
248 if (result && !raft_append_result_from_string(result, &rpy->result)) {
249 ovsdb_parser_raise_error(p, "unknown result \"%s\"", result);
250 }
251}
252
253static void
254raft_format_append_reply(const struct raft_append_reply *rpy, struct ds *s)
255{
256 ds_put_format(s, " term=%"PRIu64, rpy->term);
257 ds_put_format(s, " log_end=%"PRIu64, rpy->log_end);
258 ds_put_format(s, " result=\"%s\"",
259 raft_append_result_to_string(rpy->result));
260}
261\f
262/* raft_vote_request. */
263
264static void
265raft_vote_request_uninit(struct raft_vote_request *rq OVS_UNUSED)
266{
267}
268
269static void
270raft_vote_request_clone(struct raft_vote_request *dst OVS_UNUSED,
271 const struct raft_vote_request *src OVS_UNUSED)
272{
273}
274
275static void
276raft_vote_request_to_jsonrpc(const struct raft_vote_request *rq,
277 struct json *args)
278{
279 raft_put_uint64(args, "term", rq->term);
280 raft_put_uint64(args, "last_log_index", rq->last_log_index);
281 raft_put_uint64(args, "last_log_term", rq->last_log_term);
282 if (rq->leadership_transfer) {
283 json_object_put(args, "leadership_transfer",
284 json_boolean_create(true));
285 }
286}
287
288static void
289raft_vote_request_from_jsonrpc(struct ovsdb_parser *p,
290 struct raft_vote_request *rq)
291{
292 rq->term = raft_parse_required_uint64(p, "term");
293 rq->last_log_index = raft_parse_required_uint64(p, "last_log_index");
294 rq->last_log_term = raft_parse_required_uint64(p, "last_log_term");
295 rq->leadership_transfer
296 = raft_parse_optional_boolean(p, "leadership_transfer") == 1;
297}
298
299static void
300raft_format_vote_request(const struct raft_vote_request *rq, struct ds *s)
301{
302 ds_put_format(s, " term=%"PRIu64, rq->term);
303 ds_put_format(s, " last_log_index=%"PRIu64, rq->last_log_index);
304 ds_put_format(s, " last_log_term=%"PRIu64, rq->last_log_term);
305 if (rq->leadership_transfer) {
306 ds_put_cstr(s, " leadership_transfer=true");
307 }
308}
309\f
310/* raft_vote_reply. */
311
312static void
313raft_vote_reply_uninit(struct raft_vote_reply *rpy OVS_UNUSED)
314{
315}
316
317static void
318raft_vote_reply_clone(struct raft_vote_reply *dst OVS_UNUSED,
319 const struct raft_vote_reply *src OVS_UNUSED)
320{
321}
322
323static void
324raft_vote_reply_to_jsonrpc(const struct raft_vote_reply *rpy,
325 struct json *args)
326{
327 raft_put_uint64(args, "term", rpy->term);
328 json_object_put_format(args, "vote", UUID_FMT, UUID_ARGS(&rpy->vote));
329}
330
331static void
332raft_vote_reply_from_jsonrpc(struct ovsdb_parser *p,
333 struct raft_vote_reply *rpy)
334{
335 rpy->term = raft_parse_required_uint64(p, "term");
336 rpy->vote = raft_parse_required_uuid(p, "vote");
337}
338
339static void
340raft_format_vote_reply(const struct raft_vote_reply *rpy, struct ds *s)
341{
342 ds_put_format(s, " term=%"PRIu64, rpy->term);
343 ds_put_format(s, " vote="SID_FMT, SID_ARGS(&rpy->vote));
344}
345\f
346/* raft_add_server_request */
347
348static void
349raft_add_server_request_uninit(struct raft_add_server_request *rq)
350{
351 free(rq->address);
352}
353
354static void
355raft_add_server_request_clone(struct raft_add_server_request *dst,
356 const struct raft_add_server_request *src)
357{
358 dst->address = nullable_xstrdup(src->address);
359}
360
361static void
362raft_add_server_request_to_jsonrpc(const struct raft_add_server_request *rq,
363 struct json *args)
364{
365 json_object_put_string(args, "address", rq->address);
366}
367
368static void
369raft_add_server_request_from_jsonrpc(struct ovsdb_parser *p,
370 struct raft_add_server_request *rq)
371{
372 rq->address = nullable_xstrdup(raft_parse_required_string(p, "address"));
373}
374
375static void
376raft_format_add_server_request(const struct raft_add_server_request *rq,
377 struct ds *s)
378{
379 ds_put_format(s, " address=\"%s\"", rq->address);
380}
381\f
382/* raft_add_server_reply. */
383
384static void
385raft_add_server_reply_uninit(struct raft_add_server_reply *rpy)
386{
387 sset_destroy(&rpy->remote_addresses);
388}
389
390static void
391raft_add_server_reply_clone(struct raft_add_server_reply *dst,
392 const struct raft_add_server_reply *src)
393{
394 sset_clone(&dst->remote_addresses, &src->remote_addresses);
395}
396
397static void
398raft_add_server_reply_to_jsonrpc(const struct raft_add_server_reply *rpy,
399 struct json *args)
400{
401 json_object_put(args, "success", json_boolean_create(rpy->success));
402 if (!sset_is_empty(&rpy->remote_addresses)) {
403 json_object_put(args, "remote_addresses",
404 raft_addresses_to_json(&rpy->remote_addresses));
405 }
406}
407
408static void
409raft_add_server_reply_from_jsonrpc(struct ovsdb_parser *p,
410 struct raft_add_server_reply *rpy)
411{
412 rpy->success = raft_parse_required_boolean(p, "success");
413
414 const struct json *json = ovsdb_parser_member(p, "remote_addresses",
415 OP_ARRAY | OP_OPTIONAL);
416 if (json) {
417 ovsdb_parser_put_error(p, raft_addresses_from_json(
418 json, &rpy->remote_addresses));
419 } else {
420 sset_init(&rpy->remote_addresses);
421 }
422}
423
424static void
425raft_format_add_server_reply(const struct raft_add_server_reply *rpy,
426 struct ds *s)
427{
428 ds_put_format(s, " success=%s", rpy->success ? "true" : "false");
429 if (!sset_is_empty(&rpy->remote_addresses)) {
430 ds_put_cstr(s, " remote_addresses=[");
431
432 const char *address;
433 int i = 0;
434 SSET_FOR_EACH (address, &rpy->remote_addresses) {
435 if (i++ > 0) {
436 ds_put_cstr(s, ", ");
437 }
438 ds_put_cstr(s, address);
439 }
440 ds_put_char(s, ']');
441 }
442}
443\f
444/* raft_remove_server_reply. */
445
446static void
447raft_remove_server_reply_uninit(
448 struct raft_remove_server_reply *rpy OVS_UNUSED)
449{
450}
451
452static void
453raft_remove_server_reply_clone(
454 struct raft_remove_server_reply *dst OVS_UNUSED,
455 const struct raft_remove_server_reply *src OVS_UNUSED)
456{
457}
458
459static void
460raft_remove_server_reply_to_jsonrpc(const struct raft_remove_server_reply *rpy,
461 struct json *args)
462{
463 json_object_put(args, "success", json_boolean_create(rpy->success));
464}
465
466static void
467raft_remove_server_reply_from_jsonrpc(struct ovsdb_parser *p,
468 struct raft_remove_server_reply *rpy)
469{
470 rpy->success = raft_parse_required_boolean(p, "success");
471}
472
473static void
474raft_format_remove_server_reply(const struct raft_remove_server_reply *rpy,
475 struct ds *s)
476{
477 ds_put_format(s, " success=%s", rpy->success ? "true" : "false");
478}
479\f
480/* raft_install_snapshot_request. */
481
482static void
483raft_install_snapshot_request_uninit(
484 struct raft_install_snapshot_request *rq)
485{
486 json_destroy(rq->last_servers);
487 json_destroy(rq->data);
488}
489
490static void
491raft_install_snapshot_request_clone(
492 struct raft_install_snapshot_request *dst,
493 const struct raft_install_snapshot_request *src)
494{
495 dst->last_servers = json_clone(src->last_servers);
496 dst->data = json_clone(src->data);
497}
498
499static void
500raft_install_snapshot_request_to_jsonrpc(
501 const struct raft_install_snapshot_request *rq, struct json *args)
502{
503 raft_put_uint64(args, "term", rq->term);
504 raft_put_uint64(args, "last_index", rq->last_index);
505 raft_put_uint64(args, "last_term", rq->last_term);
506 json_object_put(args, "last_servers", json_clone(rq->last_servers));
507 json_object_put_format(args, "last_eid",
508 UUID_FMT, UUID_ARGS(&rq->last_eid));
509
510 json_object_put(args, "data", json_clone(rq->data));
511}
512
513static void
514raft_install_snapshot_request_from_jsonrpc(
515 struct ovsdb_parser *p, struct raft_install_snapshot_request *rq)
516{
517 rq->last_servers = json_nullable_clone(
518 ovsdb_parser_member(p, "last_servers", OP_OBJECT));
519 ovsdb_parser_put_error(p, raft_servers_validate_json(rq->last_servers));
520
521 rq->term = raft_parse_required_uint64(p, "term");
522 rq->last_index = raft_parse_required_uint64(p, "last_index");
523 rq->last_term = raft_parse_required_uint64(p, "last_term");
524 rq->last_eid = raft_parse_required_uuid(p, "last_eid");
525
526 rq->data = json_nullable_clone(
527 ovsdb_parser_member(p, "data", OP_OBJECT | OP_ARRAY));
528}
529
530static void
531raft_format_install_snapshot_request(
532 const struct raft_install_snapshot_request *rq, struct ds *s)
533{
534 ds_put_format(s, " term=%"PRIu64, rq->term);
535 ds_put_format(s, " last_index=%"PRIu64, rq->last_index);
536 ds_put_format(s, " last_term=%"PRIu64, rq->last_term);
537 ds_put_format(s, " last_eid="UUID_FMT, UUID_ARGS(&rq->last_eid));
538 ds_put_cstr(s, " last_servers=");
539
540 struct hmap servers;
541 struct ovsdb_error *error =
542 raft_servers_from_json(rq->last_servers, &servers);
543 if (!error) {
544 raft_servers_format(&servers, s);
545 raft_servers_destroy(&servers);
546 } else {
547 ds_put_cstr(s, "***error***");
548 ovsdb_error_destroy(error);
549 }
550}
551\f
552/* raft_install_snapshot_reply. */
553
554static void
555raft_install_snapshot_reply_uninit(
556 struct raft_install_snapshot_reply *rpy OVS_UNUSED)
557{
558}
559
560static void
561raft_install_snapshot_reply_clone(
562 struct raft_install_snapshot_reply *dst OVS_UNUSED,
563 const struct raft_install_snapshot_reply *src OVS_UNUSED)
564{
565}
566
567static void
568raft_install_snapshot_reply_to_jsonrpc(
569 const struct raft_install_snapshot_reply *rpy, struct json *args)
570{
571 raft_put_uint64(args, "term", rpy->term);
572 raft_put_uint64(args, "last_index", rpy->last_index);
573 raft_put_uint64(args, "last_term", rpy->last_term);
574}
575
576static void
577raft_install_snapshot_reply_from_jsonrpc(
578 struct ovsdb_parser *p,
579 struct raft_install_snapshot_reply *rpy)
580{
581 rpy->term = raft_parse_required_uint64(p, "term");
582 rpy->last_index = raft_parse_required_uint64(p, "last_index");
583 rpy->last_term = raft_parse_required_uint64(p, "last_term");
584}
585
586static void
587raft_format_install_snapshot_reply(
588 const struct raft_install_snapshot_reply *rpy, struct ds *s)
589{
590 ds_put_format(s, " term=%"PRIu64, rpy->term);
591 ds_put_format(s, " last_index=%"PRIu64, rpy->last_index);
592 ds_put_format(s, " last_term=%"PRIu64, rpy->last_term);
593}
594\f
595/* raft_remove_server_request. */
596
597static void
598raft_remove_server_request_uninit(
599 struct raft_remove_server_request *rq OVS_UNUSED)
600{
601}
602
603static void
604raft_remove_server_request_clone(
605 struct raft_remove_server_request *dst OVS_UNUSED,
606 const struct raft_remove_server_request *src OVS_UNUSED)
607{
608}
609
610static void
611raft_remove_server_request_to_jsonrpc(
612 const struct raft_remove_server_request *rq, struct json *args)
613{
614 json_object_put_format(args, "server_id", UUID_FMT, UUID_ARGS(&rq->sid));
615}
616
617static void
618raft_remove_server_request_from_jsonrpc(struct ovsdb_parser *p,
619 struct raft_remove_server_request *rq)
620{
621 rq->sid = raft_parse_required_uuid(p, "server_id");
622}
623
624static void
625raft_format_remove_server_request(const struct raft_remove_server_request *rq,
626 struct ds *s)
627{
628 ds_put_format(s, " server="SID_FMT, SID_ARGS(&rq->sid));
629}
630\f
631/* raft_become_leader. */
632
633static void
634raft_become_leader_uninit(struct raft_become_leader *rpc OVS_UNUSED)
635{
636}
637
638static void
639raft_become_leader_clone(struct raft_become_leader *dst OVS_UNUSED,
640 const struct raft_become_leader *src OVS_UNUSED)
641{
642}
643
644static void
645raft_become_leader_to_jsonrpc(const struct raft_become_leader *rpc,
646 struct json *args)
647{
648 raft_put_uint64(args, "term", rpc->term);
649}
650
651static void
652raft_become_leader_from_jsonrpc(struct ovsdb_parser *p,
653 struct raft_become_leader *rpc)
654{
655 rpc->term = raft_parse_required_uint64(p, "term");
656}
657
658static void
659raft_format_become_leader(const struct raft_become_leader *rq, struct ds *s)
660{
661 ds_put_format(s, " term=%"PRIu64, rq->term);
662}
663\f
664/* raft_execute_command_request. */
665
666static void
667raft_execute_command_request_uninit(
668 struct raft_execute_command_request *rq)
669{
670 json_destroy(rq->data);
671}
672
673static void
674raft_execute_command_request_clone(
675 struct raft_execute_command_request *dst,
676 const struct raft_execute_command_request *src)
677{
678 dst->data = json_clone(src->data);
679}
680
681static void
682raft_execute_command_request_to_jsonrpc(
683 const struct raft_execute_command_request *rq, struct json *args)
684{
685 json_object_put(args, "data", json_clone(rq->data));
686 json_object_put_format(args, "prereq", UUID_FMT, UUID_ARGS(&rq->prereq));
687 json_object_put_format(args, "result", UUID_FMT, UUID_ARGS(&rq->result));
688}
689
690static void
691raft_execute_command_request_from_jsonrpc(
692 struct ovsdb_parser *p, struct raft_execute_command_request *rq)
693{
694 rq->data = json_nullable_clone(ovsdb_parser_member(p, "data",
695 OP_OBJECT | OP_ARRAY));
696 rq->prereq = raft_parse_required_uuid(p, "prereq");
697 rq->result = raft_parse_required_uuid(p, "result");
698}
699
700static void
701raft_format_execute_command_request(
702 const struct raft_execute_command_request *rq, struct ds *s)
703{
704 ds_put_format(s, " prereq="UUID_FMT, UUID_ARGS(&rq->prereq));
705 ds_put_format(s, " result="UUID_FMT, UUID_ARGS(&rq->result));
706 ds_put_format(s, " data=");
707 json_to_ds(rq->data, JSSF_SORT, s);
708}
709\f
710/* raft_execute_command_reply. */
711
712static void
713raft_execute_command_reply_uninit(
714 struct raft_execute_command_reply *rpy OVS_UNUSED)
715{
716}
717
718static void
719raft_execute_command_reply_clone(
720 struct raft_execute_command_reply *dst OVS_UNUSED,
721 const struct raft_execute_command_reply *src OVS_UNUSED)
722{
723}
724
725static void
726raft_execute_command_reply_to_jsonrpc(
727 const struct raft_execute_command_reply *rpy, struct json *args)
728{
729 json_object_put_format(args, "result", UUID_FMT, UUID_ARGS(&rpy->result));
730 json_object_put_string(args, "status",
731 raft_command_status_to_string(rpy->status));
732 if (rpy->commit_index) {
733 raft_put_uint64(args, "commit_index", rpy->commit_index);
734 }
735}
736
737static void
738raft_execute_command_reply_from_jsonrpc(
739 struct ovsdb_parser *p, struct raft_execute_command_reply *rpy)
740{
741 rpy->result = raft_parse_required_uuid(p, "result");
742
743 const char *status = raft_parse_required_string(p, "status");
744 if (status && !raft_command_status_from_string(status, &rpy->status)) {
745 ovsdb_parser_raise_error(p, "unknown status \"%s\"", status);
746 }
747
748 rpy->commit_index = raft_parse_optional_uint64(p, "commit_index");
749}
750
751static void
752raft_format_execute_command_reply(
753 const struct raft_execute_command_reply *rpy, struct ds *s)
754{
755 ds_put_format(s, " result="UUID_FMT, UUID_ARGS(&rpy->result));
756 ds_put_format(s, " status=\"%s\"",
757 raft_command_status_to_string(rpy->status));
758 if (rpy->commit_index) {
759 ds_put_format(s, " commit_index=%"PRIu64, rpy->commit_index);
760 }
761}
762\f
763void
764raft_rpc_uninit(union raft_rpc *rpc)
765{
766 if (rpc) {
767 free(rpc->common.comment);
768
769 switch (rpc->type) {
770#define RAFT_RPC(ENUM, NAME) \
771 case ENUM: \
772 raft_##NAME##_uninit(&rpc->NAME); \
773 break;
774 RAFT_RPC_TYPES
775#undef RAFT_RPC
776 }
777 }
778}
779
780union raft_rpc *
781raft_rpc_clone(const union raft_rpc *src)
782{
783 union raft_rpc *dst = xmemdup(src, sizeof *src);
784 dst->common.comment = nullable_xstrdup(src->common.comment);
785
786 switch (src->type) {
787#define RAFT_RPC(ENUM, NAME) \
788 case ENUM: \
789 raft_##NAME##_clone(&dst->NAME, &src->NAME); \
790 break;
791 RAFT_RPC_TYPES
792#undef RAFT_RPC
793 }
794
795 return dst;
796}
797
798/* Returns 'rpc' converted to a jsonrpc_msg. The caller must eventually free
799 * the returned message.
800 *
801 * 'rpc->common.sid' should be the destination server ID; it is omitted if
802 * all-zeros. 'sid' is the source. 'cid' should be the cluster ID; it is
803 * omitted if all-zeros. */
804struct jsonrpc_msg *
805raft_rpc_to_jsonrpc(const struct uuid *cid,
806 const struct uuid *sid,
807 const union raft_rpc *rpc)
808{
809 struct json *args = json_object_create();
810 if (!uuid_is_zero(cid)) {
811 json_object_put_format(args, "cluster", UUID_FMT, UUID_ARGS(cid));
812 }
813 if (!uuid_is_zero(&rpc->common.sid)) {
814 json_object_put_format(args, "to", UUID_FMT,
815 UUID_ARGS(&rpc->common.sid));
816 }
817 json_object_put_format(args, "from", UUID_FMT, UUID_ARGS(sid));
818 if (rpc->common.comment) {
819 json_object_put_string(args, "comment", rpc->common.comment);
820 }
821
822 switch (rpc->type) {
823#define RAFT_RPC(ENUM, NAME) \
824 case ENUM: \
825 raft_##NAME##_to_jsonrpc(&rpc->NAME, args); \
826 break;
827 RAFT_RPC_TYPES
828#undef RAFT_RPC
829 default:
830 OVS_NOT_REACHED();
831 }
832
833 return jsonrpc_create_notify(raft_rpc_type_to_string(rpc->type),
834 json_array_create_1(args));
835}
836
837/* Parses 'msg' as a Raft message directed to 'sid' and initializes 'rpc'
838 * appropriately. On success, returns NULL and the caller owns the contents of
839 * 'rpc' and must eventually uninitialize it with raft_rpc_uninit(). On
840 * failure, returns an error that the caller must eventually free.
841 *
842 * 'cidp' must point to the Raft cluster's ID. If the cluster ID isn't yet
843 * known, then '*cidp' must be UUID_ZERO and this function will attempt to
844 * initialize it based on 'msg'. */
845struct ovsdb_error * OVS_WARN_UNUSED_RESULT
846raft_rpc_from_jsonrpc(struct uuid *cidp, const struct uuid *sid,
847 const struct jsonrpc_msg *msg, union raft_rpc *rpc)
848{
849 memset(rpc, 0, sizeof *rpc);
850 if (msg->type != JSONRPC_NOTIFY) {
851 return ovsdb_error(NULL, "expecting notify RPC but received %s",
852 jsonrpc_msg_type_to_string(msg->type));
853 }
854
855 if (!raft_rpc_type_from_string(msg->method, &rpc->type)) {
856 return ovsdb_error(NULL, "unknown method %s", msg->method);
857 }
858
859 if (json_array(msg->params)->n != 1) {
860 return ovsdb_error(NULL,
861 "%s RPC has %"PRIuSIZE" parameters (expected 1)",
862 msg->method, json_array(msg->params)->n);
863 }
864
865 struct ovsdb_parser p;
866 ovsdb_parser_init(&p, json_array(msg->params)->elems[0],
867 "raft %s RPC", msg->method);
868
869 bool is_hello = rpc->type == RAFT_RPC_HELLO_REQUEST;
870 bool is_add = rpc->type == RAFT_RPC_ADD_SERVER_REQUEST;
871
872 struct uuid cid;
873 if (raft_parse_uuid(&p, "cluster", is_add, &cid)
874 && !uuid_equals(&cid, cidp)) {
875 if (uuid_is_zero(cidp)) {
876 *cidp = cid;
877 VLOG_INFO("learned cluster ID "CID_FMT, CID_ARGS(&cid));
878 } else {
879 ovsdb_parser_raise_error(&p, "wrong cluster "CID_FMT" "
880 "(expected "CID_FMT")",
881 CID_ARGS(&cid), CID_ARGS(cidp));
882 }
883 }
884
885 struct uuid to_sid;
886 if (raft_parse_uuid(&p, "to", is_add || is_hello, &to_sid)
887 && !uuid_equals(&to_sid, sid)) {
888 ovsdb_parser_raise_error(&p, "misrouted message (addressed to "
889 SID_FMT" but we're "SID_FMT")",
890 SID_ARGS(&to_sid), SID_ARGS(sid));
891 }
892
893 rpc->common.sid = raft_parse_required_uuid(&p, "from");
894 rpc->common.comment = nullable_xstrdup(
895 raft_parse_optional_string(&p, "comment"));
896
897 switch (rpc->type) {
898#define RAFT_RPC(ENUM, NAME) \
899 case ENUM: \
900 raft_##NAME##_from_jsonrpc(&p, &rpc->NAME); \
901 break;
902 RAFT_RPC_TYPES
903#undef RAFT_RPC
904
905 default:
906 OVS_NOT_REACHED();
907 }
908
909 struct ovsdb_error *error = ovsdb_parser_finish(&p);
910 if (error) {
911 raft_rpc_uninit(rpc);
912 }
913 return error;
914}
915
916/* Appends a formatted representation of 'rpc' to 's'.
917 *
918 * Does not include the RPC's server ID in the formatted representation, since
919 * the caller usually has more context that allows for a more human friendly
920 * name. */
921void
922raft_rpc_format(const union raft_rpc *rpc, struct ds *s)
923{
924 ds_put_cstr(s, raft_rpc_type_to_string(rpc->type));
925 if (rpc->common.comment) {
926 ds_put_format(s, " \"%s\"", rpc->common.comment);
927 }
928 ds_put_char(s, ':');
929
930 switch (rpc->type) {
931#define RAFT_RPC(ENUM, NAME) \
932 case ENUM: \
933 raft_format_##NAME(&rpc->NAME, s); \
934 break;
935 RAFT_RPC_TYPES
936#undef RAFT_RPC
937 default:
938 OVS_NOT_REACHED();
939 }
940}
941
942uint64_t
943raft_rpc_get_term(const union raft_rpc *rpc)
944{
945 switch (rpc->type) {
946 case RAFT_RPC_HELLO_REQUEST:
947 case RAFT_RPC_ADD_SERVER_REQUEST:
948 case RAFT_RPC_ADD_SERVER_REPLY:
949 case RAFT_RPC_REMOVE_SERVER_REQUEST:
950 case RAFT_RPC_REMOVE_SERVER_REPLY:
951 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
952 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
953 return 0;
954
955 case RAFT_RPC_APPEND_REQUEST:
956 return rpc->append_request.term;
957
958 case RAFT_RPC_APPEND_REPLY:
959 return rpc->append_reply.term;
960
961 case RAFT_RPC_VOTE_REQUEST:
962 return rpc->vote_request.term;
963
964 case RAFT_RPC_VOTE_REPLY:
965 return rpc->vote_reply.term;
966
967 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
968 return rpc->install_snapshot_request.term;
969
970 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
971 return rpc->install_snapshot_reply.term;
972
973 case RAFT_RPC_BECOME_LEADER:
974 return rpc->become_leader.term;
975
976 default:
977 OVS_NOT_REACHED();
978 }
979}
980
981const struct uuid *
982raft_rpc_get_vote(const union raft_rpc *rpc)
983{
984 switch (rpc->type) {
985 case RAFT_RPC_HELLO_REQUEST:
986 case RAFT_RPC_ADD_SERVER_REQUEST:
987 case RAFT_RPC_ADD_SERVER_REPLY:
988 case RAFT_RPC_REMOVE_SERVER_REQUEST:
989 case RAFT_RPC_REMOVE_SERVER_REPLY:
990 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
991 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
992 case RAFT_RPC_APPEND_REQUEST:
993 case RAFT_RPC_APPEND_REPLY:
994 case RAFT_RPC_VOTE_REQUEST:
995 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
996 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
997 case RAFT_RPC_BECOME_LEADER:
998 return NULL;
999
1000 case RAFT_RPC_VOTE_REPLY:
1001 return &raft_vote_reply_cast(rpc)->vote;
1002
1003 default:
1004 OVS_NOT_REACHED();
1005 }
1006}
1007
1008/* Returns the minimum log index that must be synced to disk if 'rpc' is to be
1009 * sent. (This is generally the biggest log index in the message but some
1010 * messages, e.g. RAFT_RPC_APPEND_REQUEST, don't need their entries synced.) */
1011uint64_t
1012raft_rpc_get_min_sync_index(const union raft_rpc *rpc)
1013{
1014 switch (rpc->type) {
1015 case RAFT_RPC_HELLO_REQUEST:
1016 case RAFT_RPC_ADD_SERVER_REQUEST:
1017 case RAFT_RPC_ADD_SERVER_REPLY:
1018 case RAFT_RPC_REMOVE_SERVER_REQUEST:
1019 case RAFT_RPC_REMOVE_SERVER_REPLY:
1020 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
1021 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
1022 case RAFT_RPC_APPEND_REQUEST:
1023 case RAFT_RPC_BECOME_LEADER:
1024 case RAFT_RPC_VOTE_REPLY:
1025 return 0;
1026
1027 case RAFT_RPC_APPEND_REPLY:
1028 return raft_append_reply_cast(rpc)->log_end - 1;
1029
1030 case RAFT_RPC_VOTE_REQUEST:
1031 return raft_vote_request_cast(rpc)->last_log_index;
1032
1033 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
1034 return raft_install_snapshot_request_cast(rpc)->last_index;
1035
1036 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
1037 /* This will need to change if install_snapshot_reply becomes able to
1038 * report an error */
1039 return raft_install_snapshot_reply_cast(rpc)->last_index;
1040
1041 default:
1042 OVS_NOT_REACHED();
1043 }
1044}