]> git.proxmox.com Git - mirror_ovs.git/blame - ovsdb/raft-rpc.c
raft: Send all missing logs in one single append_request.
[mirror_ovs.git] / ovsdb / raft-rpc.c
CommitLineData
1b1d2e6d
BP
1/*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <config.h>
18
19#include "raft-rpc.h"
20#include <stdlib.h>
21#include <string.h>
22#include "compiler.h"
23#include "jsonrpc.h"
24#include "ovsdb-error.h"
25#include "ovsdb-parser.h"
26#include "openvswitch/dynamic-string.h"
27#include "openvswitch/json.h"
28#include "openvswitch/vlog.h"
29#include "sset.h"
30
31VLOG_DEFINE_THIS_MODULE(raft_rpc);
32
33#define RAFT_RPC(ENUM, NAME) \
34 static void raft_##NAME##_uninit(struct raft_##NAME *); \
35 static void raft_##NAME##_clone(struct raft_##NAME *, \
36 const struct raft_##NAME *); \
37 static void raft_##NAME##_to_jsonrpc(const struct raft_##NAME *, \
38 struct json *); \
39 static void raft_##NAME##_from_jsonrpc(struct ovsdb_parser *, \
40 struct raft_##NAME *); \
41 static void raft_format_##NAME(const struct raft_##NAME *, struct ds *);
42RAFT_RPC_TYPES
43#undef RAFT_RPC
44\f
45/* raft_rpc_type. */
46const char *
47raft_rpc_type_to_string(enum raft_rpc_type status)
48{
49 switch (status) {
50#define RAFT_RPC(ENUM, NAME) case ENUM: return #NAME;
51 RAFT_RPC_TYPES
52#undef RAFT_RPC
53 }
54 return "<unknown>";
55}
56
57bool
58raft_rpc_type_from_string(const char *s, enum raft_rpc_type *status)
59{
60#define RAFT_RPC(ENUM, NAME) \
61 if (!strcmp(s, #NAME)) { \
62 *status = ENUM; \
63 return true; \
64 }
65 RAFT_RPC_TYPES
66#undef RAFT_RPC
67 return false;
68}
69\f
70/* raft_hello_request. */
71
72static void
73raft_hello_request_uninit(struct raft_hello_request *rq)
74{
75 free(rq->address);
76}
77
78static void
79raft_hello_request_clone(struct raft_hello_request *dst,
80 const struct raft_hello_request *src)
81{
82 dst->address = nullable_xstrdup(src->address);
83}
84
85static void
86raft_hello_request_to_jsonrpc(const struct raft_hello_request *rq,
87 struct json *args)
88{
89 json_object_put_string(args, "address", rq->address);
90}
91
92static void
93raft_hello_request_from_jsonrpc(struct ovsdb_parser *p,
94 struct raft_hello_request *rq)
95{
96 rq->address = nullable_xstrdup(raft_parse_required_string(p, "address"));
97}
98
99static void
100raft_format_hello_request(const struct raft_hello_request *rq,
101 struct ds *s)
102{
103 ds_put_format(s, " address=\"%s\"", rq->address);
104}
105\f
106/* raft_append_request. */
107
108static void
109raft_append_request_uninit(struct raft_append_request *rq)
110{
111 for (size_t i = 0; i < rq->n_entries; i++) {
112 raft_entry_uninit(&rq->entries[i]);
113 }
114 free(rq->entries);
115}
116
117static void
118raft_append_request_clone(struct raft_append_request *dst,
119 const struct raft_append_request *src)
120{
121 dst->entries = xmalloc(src->n_entries * sizeof *dst->entries);
122 for (size_t i = 0; i < src->n_entries; i++) {
123 raft_entry_clone(&dst->entries[i], &src->entries[i]);
124 }
125}
126
127static void
128raft_append_request_to_jsonrpc(const struct raft_append_request *rq,
129 struct json *args)
130{
131 raft_put_uint64(args, "term", rq->term);
132 raft_put_uint64(args, "prev_log_index", rq->prev_log_index);
133 raft_put_uint64(args, "prev_log_term", rq->prev_log_term);
134 raft_put_uint64(args, "leader_commit", rq->leader_commit);
135
136 struct json **entries = xmalloc(rq->n_entries * sizeof *entries);
137 for (size_t i = 0; i < rq->n_entries; i++) {
138 entries[i] = raft_entry_to_json(&rq->entries[i]);
139 }
140 json_object_put(args, "log", json_array_create(entries, rq->n_entries));
141}
142
143static void
144raft_append_request_from_jsonrpc(struct ovsdb_parser *p,
145 struct raft_append_request *rq)
146{
147 rq->term = raft_parse_required_uint64(p, "term");
148 rq->prev_log_index = raft_parse_required_uint64(p, "prev_log_index");
149 rq->prev_log_term = raft_parse_required_uint64(p, "prev_log_term");
150 rq->leader_commit = raft_parse_required_uint64(p, "leader_commit");
151
152 const struct json *log = ovsdb_parser_member(p, "log", OP_ARRAY);
153 if (!log) {
154 return;
155 }
156 const struct json_array *entries = json_array(log);
157 rq->entries = xmalloc(entries->n * sizeof *rq->entries);
158 rq->n_entries = 0;
159 for (size_t i = 0; i < entries->n; i++) {
160 struct ovsdb_error *error = raft_entry_from_json(entries->elems[i],
161 &rq->entries[i]);
162 if (error) {
163 ovsdb_parser_put_error(p, error);
164 break;
165 }
166 rq->n_entries++;
167 }
168}
169
170static void
171raft_format_append_request(const struct raft_append_request *rq,
172 struct ds *s)
173{
174 ds_put_format(s, " term=%"PRIu64, rq->term);
175 ds_put_format(s, " prev_log_index=%"PRIu64, rq->prev_log_index);
176 ds_put_format(s, " prev_log_term=%"PRIu64, rq->prev_log_term);
177 ds_put_format(s, " leader_commit=%"PRIu64, rq->leader_commit);
178 ds_put_format(s, " n_entries=%u", rq->n_entries);
179}
180\f
181/* raft_append_reply. */
182
183const char *
184raft_append_result_to_string(enum raft_append_result result)
185{
186 switch (result) {
187 case RAFT_APPEND_OK:
188 return "OK";
189 case RAFT_APPEND_INCONSISTENCY:
190 return "inconsistency";
191 case RAFT_APPEND_IO_ERROR:
192 return "I/O error";
193 default:
194 return NULL;
195 }
196}
197
198bool
199raft_append_result_from_string(const char *s, enum raft_append_result *resultp)
200{
201 for (enum raft_append_result result = 0; ; result++) {
202 const char *s2 = raft_append_result_to_string(result);
203 if (!s2) {
204 *resultp = 0;
205 return false;
206 } else if (!strcmp(s, s2)) {
207 *resultp = result;
208 return true;
209 }
210 }
211}
212
213static void
214raft_append_reply_uninit(struct raft_append_reply *rpy OVS_UNUSED)
215{
216}
217
218static void
219raft_append_reply_clone(struct raft_append_reply *dst OVS_UNUSED,
220 const struct raft_append_reply *src OVS_UNUSED)
221{
222}
223
224static void
225raft_append_reply_to_jsonrpc(const struct raft_append_reply *rpy,
226 struct json *args)
227{
228 raft_put_uint64(args, "term", rpy->term);
229 raft_put_uint64(args, "log_end", rpy->log_end);
230 raft_put_uint64(args, "prev_log_index", rpy->prev_log_index);
231 raft_put_uint64(args, "prev_log_term", rpy->prev_log_term);
232 raft_put_uint64(args, "n_entries", rpy->n_entries);
233 json_object_put_string(args, "result",
234 raft_append_result_to_string(rpy->result));
235}
236
237static void
238raft_append_reply_from_jsonrpc(struct ovsdb_parser *p,
239 struct raft_append_reply *rpy)
240{
241 rpy->term = raft_parse_required_uint64(p, "term");
242 rpy->log_end = raft_parse_required_uint64(p, "log_end");
243 rpy->prev_log_index = raft_parse_required_uint64(p, "prev_log_index");
244 rpy->prev_log_term = raft_parse_required_uint64(p, "prev_log_term");
245 rpy->n_entries = raft_parse_required_uint64(p, "n_entries");
246
247 const char *result = raft_parse_required_string(p, "result");
248 if (result && !raft_append_result_from_string(result, &rpy->result)) {
249 ovsdb_parser_raise_error(p, "unknown result \"%s\"", result);
250 }
251}
252
253static void
254raft_format_append_reply(const struct raft_append_reply *rpy, struct ds *s)
255{
256 ds_put_format(s, " term=%"PRIu64, rpy->term);
257 ds_put_format(s, " log_end=%"PRIu64, rpy->log_end);
258 ds_put_format(s, " result=\"%s\"",
259 raft_append_result_to_string(rpy->result));
260}
261\f
262/* raft_vote_request. */
263
264static void
265raft_vote_request_uninit(struct raft_vote_request *rq OVS_UNUSED)
266{
267}
268
269static void
270raft_vote_request_clone(struct raft_vote_request *dst OVS_UNUSED,
271 const struct raft_vote_request *src OVS_UNUSED)
272{
273}
274
275static void
276raft_vote_request_to_jsonrpc(const struct raft_vote_request *rq,
277 struct json *args)
278{
279 raft_put_uint64(args, "term", rq->term);
280 raft_put_uint64(args, "last_log_index", rq->last_log_index);
281 raft_put_uint64(args, "last_log_term", rq->last_log_term);
282 if (rq->leadership_transfer) {
283 json_object_put(args, "leadership_transfer",
284 json_boolean_create(true));
285 }
286}
287
288static void
289raft_vote_request_from_jsonrpc(struct ovsdb_parser *p,
290 struct raft_vote_request *rq)
291{
292 rq->term = raft_parse_required_uint64(p, "term");
293 rq->last_log_index = raft_parse_required_uint64(p, "last_log_index");
294 rq->last_log_term = raft_parse_required_uint64(p, "last_log_term");
295 rq->leadership_transfer
296 = raft_parse_optional_boolean(p, "leadership_transfer") == 1;
297}
298
299static void
300raft_format_vote_request(const struct raft_vote_request *rq, struct ds *s)
301{
302 ds_put_format(s, " term=%"PRIu64, rq->term);
303 ds_put_format(s, " last_log_index=%"PRIu64, rq->last_log_index);
304 ds_put_format(s, " last_log_term=%"PRIu64, rq->last_log_term);
305 if (rq->leadership_transfer) {
306 ds_put_cstr(s, " leadership_transfer=true");
307 }
308}
309\f
310/* raft_vote_reply. */
311
312static void
313raft_vote_reply_uninit(struct raft_vote_reply *rpy OVS_UNUSED)
314{
315}
316
317static void
318raft_vote_reply_clone(struct raft_vote_reply *dst OVS_UNUSED,
319 const struct raft_vote_reply *src OVS_UNUSED)
320{
321}
322
323static void
324raft_vote_reply_to_jsonrpc(const struct raft_vote_reply *rpy,
325 struct json *args)
326{
327 raft_put_uint64(args, "term", rpy->term);
328 json_object_put_format(args, "vote", UUID_FMT, UUID_ARGS(&rpy->vote));
329}
330
331static void
332raft_vote_reply_from_jsonrpc(struct ovsdb_parser *p,
333 struct raft_vote_reply *rpy)
334{
335 rpy->term = raft_parse_required_uint64(p, "term");
336 rpy->vote = raft_parse_required_uuid(p, "vote");
337}
338
339static void
340raft_format_vote_reply(const struct raft_vote_reply *rpy, struct ds *s)
341{
342 ds_put_format(s, " term=%"PRIu64, rpy->term);
343 ds_put_format(s, " vote="SID_FMT, SID_ARGS(&rpy->vote));
344}
345\f
346/* raft_add_server_request */
347
348static void
349raft_add_server_request_uninit(struct raft_add_server_request *rq)
350{
351 free(rq->address);
352}
353
354static void
355raft_add_server_request_clone(struct raft_add_server_request *dst,
356 const struct raft_add_server_request *src)
357{
358 dst->address = nullable_xstrdup(src->address);
359}
360
361static void
362raft_add_server_request_to_jsonrpc(const struct raft_add_server_request *rq,
363 struct json *args)
364{
365 json_object_put_string(args, "address", rq->address);
366}
367
368static void
369raft_add_server_request_from_jsonrpc(struct ovsdb_parser *p,
370 struct raft_add_server_request *rq)
371{
372 rq->address = nullable_xstrdup(raft_parse_required_string(p, "address"));
373}
374
375static void
376raft_format_add_server_request(const struct raft_add_server_request *rq,
377 struct ds *s)
378{
379 ds_put_format(s, " address=\"%s\"", rq->address);
380}
381\f
382/* raft_add_server_reply. */
383
384static void
385raft_add_server_reply_uninit(struct raft_add_server_reply *rpy)
386{
387 sset_destroy(&rpy->remote_addresses);
388}
389
390static void
391raft_add_server_reply_clone(struct raft_add_server_reply *dst,
392 const struct raft_add_server_reply *src)
393{
394 sset_clone(&dst->remote_addresses, &src->remote_addresses);
395}
396
397static void
398raft_add_server_reply_to_jsonrpc(const struct raft_add_server_reply *rpy,
399 struct json *args)
400{
401 json_object_put(args, "success", json_boolean_create(rpy->success));
402 if (!sset_is_empty(&rpy->remote_addresses)) {
403 json_object_put(args, "remote_addresses",
404 raft_addresses_to_json(&rpy->remote_addresses));
405 }
406}
407
408static void
409raft_add_server_reply_from_jsonrpc(struct ovsdb_parser *p,
410 struct raft_add_server_reply *rpy)
411{
412 rpy->success = raft_parse_required_boolean(p, "success");
413
414 const struct json *json = ovsdb_parser_member(p, "remote_addresses",
415 OP_ARRAY | OP_OPTIONAL);
416 if (json) {
417 ovsdb_parser_put_error(p, raft_addresses_from_json(
418 json, &rpy->remote_addresses));
419 } else {
420 sset_init(&rpy->remote_addresses);
421 }
422}
423
424static void
425raft_format_add_server_reply(const struct raft_add_server_reply *rpy,
426 struct ds *s)
427{
428 ds_put_format(s, " success=%s", rpy->success ? "true" : "false");
429 if (!sset_is_empty(&rpy->remote_addresses)) {
430 ds_put_cstr(s, " remote_addresses=[");
431
432 const char *address;
433 int i = 0;
434 SSET_FOR_EACH (address, &rpy->remote_addresses) {
435 if (i++ > 0) {
436 ds_put_cstr(s, ", ");
437 }
438 ds_put_cstr(s, address);
439 }
440 ds_put_char(s, ']');
441 }
442}
443\f
444/* raft_remove_server_reply. */
445
446static void
447raft_remove_server_reply_uninit(
448 struct raft_remove_server_reply *rpy OVS_UNUSED)
449{
450}
451
452static void
453raft_remove_server_reply_clone(
454 struct raft_remove_server_reply *dst OVS_UNUSED,
455 const struct raft_remove_server_reply *src OVS_UNUSED)
456{
457}
458
459static void
460raft_remove_server_reply_to_jsonrpc(const struct raft_remove_server_reply *rpy,
461 struct json *args)
462{
17bd4149
BP
463 if (!uuid_is_zero(&rpy->target_sid)) {
464 json_object_put_format(args, "target_server",
465 UUID_FMT, UUID_ARGS(&rpy->target_sid));
466 }
1b1d2e6d
BP
467 json_object_put(args, "success", json_boolean_create(rpy->success));
468}
469
470static void
471raft_remove_server_reply_from_jsonrpc(struct ovsdb_parser *p,
472 struct raft_remove_server_reply *rpy)
473{
474 rpy->success = raft_parse_required_boolean(p, "success");
17bd4149 475 raft_parse_optional_uuid(p, "target_server", &rpy->target_sid);
1b1d2e6d
BP
476}
477
478static void
479raft_format_remove_server_reply(const struct raft_remove_server_reply *rpy,
480 struct ds *s)
481{
482 ds_put_format(s, " success=%s", rpy->success ? "true" : "false");
483}
484\f
485/* raft_install_snapshot_request. */
486
487static void
488raft_install_snapshot_request_uninit(
489 struct raft_install_snapshot_request *rq)
490{
491 json_destroy(rq->last_servers);
492 json_destroy(rq->data);
493}
494
495static void
496raft_install_snapshot_request_clone(
497 struct raft_install_snapshot_request *dst,
498 const struct raft_install_snapshot_request *src)
499{
500 dst->last_servers = json_clone(src->last_servers);
501 dst->data = json_clone(src->data);
502}
503
504static void
505raft_install_snapshot_request_to_jsonrpc(
506 const struct raft_install_snapshot_request *rq, struct json *args)
507{
508 raft_put_uint64(args, "term", rq->term);
509 raft_put_uint64(args, "last_index", rq->last_index);
510 raft_put_uint64(args, "last_term", rq->last_term);
511 json_object_put(args, "last_servers", json_clone(rq->last_servers));
512 json_object_put_format(args, "last_eid",
513 UUID_FMT, UUID_ARGS(&rq->last_eid));
9bfb280a 514 raft_put_uint64(args, "election_timer", rq->election_timer);
1b1d2e6d
BP
515
516 json_object_put(args, "data", json_clone(rq->data));
517}
518
519static void
520raft_install_snapshot_request_from_jsonrpc(
521 struct ovsdb_parser *p, struct raft_install_snapshot_request *rq)
522{
523 rq->last_servers = json_nullable_clone(
524 ovsdb_parser_member(p, "last_servers", OP_OBJECT));
525 ovsdb_parser_put_error(p, raft_servers_validate_json(rq->last_servers));
526
527 rq->term = raft_parse_required_uint64(p, "term");
528 rq->last_index = raft_parse_required_uint64(p, "last_index");
529 rq->last_term = raft_parse_required_uint64(p, "last_term");
530 rq->last_eid = raft_parse_required_uuid(p, "last_eid");
9bfb280a
HZ
531 /* election_timer is optional in file header, but is always populated in
532 * install_snapshot_request. */
533 rq->election_timer = raft_parse_required_uint64(p, "election_timer");
1b1d2e6d
BP
534
535 rq->data = json_nullable_clone(
536 ovsdb_parser_member(p, "data", OP_OBJECT | OP_ARRAY));
537}
538
539static void
540raft_format_install_snapshot_request(
541 const struct raft_install_snapshot_request *rq, struct ds *s)
542{
543 ds_put_format(s, " term=%"PRIu64, rq->term);
544 ds_put_format(s, " last_index=%"PRIu64, rq->last_index);
545 ds_put_format(s, " last_term=%"PRIu64, rq->last_term);
546 ds_put_format(s, " last_eid="UUID_FMT, UUID_ARGS(&rq->last_eid));
9bfb280a 547 ds_put_format(s, " election_timer=%"PRIu64, rq->election_timer);
692a09cb 548 ds_put_cstr(s, " last_servers=");
1b1d2e6d
BP
549
550 struct hmap servers;
551 struct ovsdb_error *error =
552 raft_servers_from_json(rq->last_servers, &servers);
553 if (!error) {
554 raft_servers_format(&servers, s);
555 raft_servers_destroy(&servers);
556 } else {
557 ds_put_cstr(s, "***error***");
558 ovsdb_error_destroy(error);
559 }
560}
561\f
562/* raft_install_snapshot_reply. */
563
564static void
565raft_install_snapshot_reply_uninit(
566 struct raft_install_snapshot_reply *rpy OVS_UNUSED)
567{
568}
569
570static void
571raft_install_snapshot_reply_clone(
572 struct raft_install_snapshot_reply *dst OVS_UNUSED,
573 const struct raft_install_snapshot_reply *src OVS_UNUSED)
574{
575}
576
577static void
578raft_install_snapshot_reply_to_jsonrpc(
579 const struct raft_install_snapshot_reply *rpy, struct json *args)
580{
581 raft_put_uint64(args, "term", rpy->term);
582 raft_put_uint64(args, "last_index", rpy->last_index);
583 raft_put_uint64(args, "last_term", rpy->last_term);
584}
585
586static void
587raft_install_snapshot_reply_from_jsonrpc(
588 struct ovsdb_parser *p,
589 struct raft_install_snapshot_reply *rpy)
590{
591 rpy->term = raft_parse_required_uint64(p, "term");
592 rpy->last_index = raft_parse_required_uint64(p, "last_index");
593 rpy->last_term = raft_parse_required_uint64(p, "last_term");
594}
595
596static void
597raft_format_install_snapshot_reply(
598 const struct raft_install_snapshot_reply *rpy, struct ds *s)
599{
600 ds_put_format(s, " term=%"PRIu64, rpy->term);
601 ds_put_format(s, " last_index=%"PRIu64, rpy->last_index);
602 ds_put_format(s, " last_term=%"PRIu64, rpy->last_term);
603}
604\f
605/* raft_remove_server_request. */
606
607static void
608raft_remove_server_request_uninit(
609 struct raft_remove_server_request *rq OVS_UNUSED)
610{
611}
612
613static void
614raft_remove_server_request_clone(
615 struct raft_remove_server_request *dst OVS_UNUSED,
616 const struct raft_remove_server_request *src OVS_UNUSED)
617{
618}
619
620static void
621raft_remove_server_request_to_jsonrpc(
622 const struct raft_remove_server_request *rq, struct json *args)
623{
624 json_object_put_format(args, "server_id", UUID_FMT, UUID_ARGS(&rq->sid));
625}
626
627static void
628raft_remove_server_request_from_jsonrpc(struct ovsdb_parser *p,
629 struct raft_remove_server_request *rq)
630{
631 rq->sid = raft_parse_required_uuid(p, "server_id");
632}
633
634static void
635raft_format_remove_server_request(const struct raft_remove_server_request *rq,
636 struct ds *s)
637{
638 ds_put_format(s, " server="SID_FMT, SID_ARGS(&rq->sid));
639}
640\f
641/* raft_become_leader. */
642
643static void
644raft_become_leader_uninit(struct raft_become_leader *rpc OVS_UNUSED)
645{
646}
647
648static void
649raft_become_leader_clone(struct raft_become_leader *dst OVS_UNUSED,
650 const struct raft_become_leader *src OVS_UNUSED)
651{
652}
653
654static void
655raft_become_leader_to_jsonrpc(const struct raft_become_leader *rpc,
656 struct json *args)
657{
658 raft_put_uint64(args, "term", rpc->term);
659}
660
661static void
662raft_become_leader_from_jsonrpc(struct ovsdb_parser *p,
663 struct raft_become_leader *rpc)
664{
665 rpc->term = raft_parse_required_uint64(p, "term");
666}
667
668static void
669raft_format_become_leader(const struct raft_become_leader *rq, struct ds *s)
670{
671 ds_put_format(s, " term=%"PRIu64, rq->term);
672}
673\f
674/* raft_execute_command_request. */
675
676static void
677raft_execute_command_request_uninit(
678 struct raft_execute_command_request *rq)
679{
680 json_destroy(rq->data);
681}
682
683static void
684raft_execute_command_request_clone(
685 struct raft_execute_command_request *dst,
686 const struct raft_execute_command_request *src)
687{
688 dst->data = json_clone(src->data);
689}
690
691static void
692raft_execute_command_request_to_jsonrpc(
693 const struct raft_execute_command_request *rq, struct json *args)
694{
695 json_object_put(args, "data", json_clone(rq->data));
696 json_object_put_format(args, "prereq", UUID_FMT, UUID_ARGS(&rq->prereq));
697 json_object_put_format(args, "result", UUID_FMT, UUID_ARGS(&rq->result));
698}
699
700static void
701raft_execute_command_request_from_jsonrpc(
702 struct ovsdb_parser *p, struct raft_execute_command_request *rq)
703{
704 rq->data = json_nullable_clone(ovsdb_parser_member(p, "data",
705 OP_OBJECT | OP_ARRAY));
706 rq->prereq = raft_parse_required_uuid(p, "prereq");
707 rq->result = raft_parse_required_uuid(p, "result");
708}
709
710static void
711raft_format_execute_command_request(
712 const struct raft_execute_command_request *rq, struct ds *s)
713{
714 ds_put_format(s, " prereq="UUID_FMT, UUID_ARGS(&rq->prereq));
715 ds_put_format(s, " result="UUID_FMT, UUID_ARGS(&rq->result));
716 ds_put_format(s, " data=");
717 json_to_ds(rq->data, JSSF_SORT, s);
718}
719\f
720/* raft_execute_command_reply. */
721
722static void
723raft_execute_command_reply_uninit(
724 struct raft_execute_command_reply *rpy OVS_UNUSED)
725{
726}
727
728static void
729raft_execute_command_reply_clone(
730 struct raft_execute_command_reply *dst OVS_UNUSED,
731 const struct raft_execute_command_reply *src OVS_UNUSED)
732{
733}
734
735static void
736raft_execute_command_reply_to_jsonrpc(
737 const struct raft_execute_command_reply *rpy, struct json *args)
738{
739 json_object_put_format(args, "result", UUID_FMT, UUID_ARGS(&rpy->result));
740 json_object_put_string(args, "status",
741 raft_command_status_to_string(rpy->status));
742 if (rpy->commit_index) {
743 raft_put_uint64(args, "commit_index", rpy->commit_index);
744 }
745}
746
747static void
748raft_execute_command_reply_from_jsonrpc(
749 struct ovsdb_parser *p, struct raft_execute_command_reply *rpy)
750{
751 rpy->result = raft_parse_required_uuid(p, "result");
752
753 const char *status = raft_parse_required_string(p, "status");
754 if (status && !raft_command_status_from_string(status, &rpy->status)) {
755 ovsdb_parser_raise_error(p, "unknown status \"%s\"", status);
756 }
757
758 rpy->commit_index = raft_parse_optional_uint64(p, "commit_index");
759}
760
761static void
762raft_format_execute_command_reply(
763 const struct raft_execute_command_reply *rpy, struct ds *s)
764{
765 ds_put_format(s, " result="UUID_FMT, UUID_ARGS(&rpy->result));
766 ds_put_format(s, " status=\"%s\"",
767 raft_command_status_to_string(rpy->status));
768 if (rpy->commit_index) {
769 ds_put_format(s, " commit_index=%"PRIu64, rpy->commit_index);
770 }
771}
772\f
773void
774raft_rpc_uninit(union raft_rpc *rpc)
775{
776 if (rpc) {
777 free(rpc->common.comment);
778
779 switch (rpc->type) {
780#define RAFT_RPC(ENUM, NAME) \
781 case ENUM: \
782 raft_##NAME##_uninit(&rpc->NAME); \
783 break;
784 RAFT_RPC_TYPES
785#undef RAFT_RPC
786 }
787 }
788}
789
790union raft_rpc *
791raft_rpc_clone(const union raft_rpc *src)
792{
793 union raft_rpc *dst = xmemdup(src, sizeof *src);
794 dst->common.comment = nullable_xstrdup(src->common.comment);
795
796 switch (src->type) {
797#define RAFT_RPC(ENUM, NAME) \
798 case ENUM: \
799 raft_##NAME##_clone(&dst->NAME, &src->NAME); \
800 break;
801 RAFT_RPC_TYPES
802#undef RAFT_RPC
803 }
804
805 return dst;
806}
807
808/* Returns 'rpc' converted to a jsonrpc_msg. The caller must eventually free
809 * the returned message.
810 *
811 * 'rpc->common.sid' should be the destination server ID; it is omitted if
812 * all-zeros. 'sid' is the source. 'cid' should be the cluster ID; it is
813 * omitted if all-zeros. */
814struct jsonrpc_msg *
815raft_rpc_to_jsonrpc(const struct uuid *cid,
816 const struct uuid *sid,
817 const union raft_rpc *rpc)
818{
819 struct json *args = json_object_create();
820 if (!uuid_is_zero(cid)) {
821 json_object_put_format(args, "cluster", UUID_FMT, UUID_ARGS(cid));
822 }
823 if (!uuid_is_zero(&rpc->common.sid)) {
824 json_object_put_format(args, "to", UUID_FMT,
825 UUID_ARGS(&rpc->common.sid));
826 }
827 json_object_put_format(args, "from", UUID_FMT, UUID_ARGS(sid));
828 if (rpc->common.comment) {
829 json_object_put_string(args, "comment", rpc->common.comment);
830 }
831
832 switch (rpc->type) {
833#define RAFT_RPC(ENUM, NAME) \
834 case ENUM: \
835 raft_##NAME##_to_jsonrpc(&rpc->NAME, args); \
836 break;
837 RAFT_RPC_TYPES
838#undef RAFT_RPC
839 default:
840 OVS_NOT_REACHED();
841 }
842
843 return jsonrpc_create_notify(raft_rpc_type_to_string(rpc->type),
844 json_array_create_1(args));
845}
846
847/* Parses 'msg' as a Raft message directed to 'sid' and initializes 'rpc'
848 * appropriately. On success, returns NULL and the caller owns the contents of
849 * 'rpc' and must eventually uninitialize it with raft_rpc_uninit(). On
850 * failure, returns an error that the caller must eventually free.
851 *
852 * 'cidp' must point to the Raft cluster's ID. If the cluster ID isn't yet
853 * known, then '*cidp' must be UUID_ZERO and this function will attempt to
854 * initialize it based on 'msg'. */
855struct ovsdb_error * OVS_WARN_UNUSED_RESULT
856raft_rpc_from_jsonrpc(struct uuid *cidp, const struct uuid *sid,
857 const struct jsonrpc_msg *msg, union raft_rpc *rpc)
858{
859 memset(rpc, 0, sizeof *rpc);
860 if (msg->type != JSONRPC_NOTIFY) {
861 return ovsdb_error(NULL, "expecting notify RPC but received %s",
862 jsonrpc_msg_type_to_string(msg->type));
863 }
864
865 if (!raft_rpc_type_from_string(msg->method, &rpc->type)) {
866 return ovsdb_error(NULL, "unknown method %s", msg->method);
867 }
868
869 if (json_array(msg->params)->n != 1) {
870 return ovsdb_error(NULL,
871 "%s RPC has %"PRIuSIZE" parameters (expected 1)",
872 msg->method, json_array(msg->params)->n);
873 }
874
875 struct ovsdb_parser p;
876 ovsdb_parser_init(&p, json_array(msg->params)->elems[0],
877 "raft %s RPC", msg->method);
878
879 bool is_hello = rpc->type == RAFT_RPC_HELLO_REQUEST;
880 bool is_add = rpc->type == RAFT_RPC_ADD_SERVER_REQUEST;
881
882 struct uuid cid;
883 if (raft_parse_uuid(&p, "cluster", is_add, &cid)
884 && !uuid_equals(&cid, cidp)) {
885 if (uuid_is_zero(cidp)) {
886 *cidp = cid;
887 VLOG_INFO("learned cluster ID "CID_FMT, CID_ARGS(&cid));
888 } else {
889 ovsdb_parser_raise_error(&p, "wrong cluster "CID_FMT" "
890 "(expected "CID_FMT")",
891 CID_ARGS(&cid), CID_ARGS(cidp));
892 }
893 }
894
895 struct uuid to_sid;
896 if (raft_parse_uuid(&p, "to", is_add || is_hello, &to_sid)
897 && !uuid_equals(&to_sid, sid)) {
898 ovsdb_parser_raise_error(&p, "misrouted message (addressed to "
899 SID_FMT" but we're "SID_FMT")",
900 SID_ARGS(&to_sid), SID_ARGS(sid));
901 }
902
903 rpc->common.sid = raft_parse_required_uuid(&p, "from");
904 rpc->common.comment = nullable_xstrdup(
905 raft_parse_optional_string(&p, "comment"));
906
907 switch (rpc->type) {
908#define RAFT_RPC(ENUM, NAME) \
909 case ENUM: \
910 raft_##NAME##_from_jsonrpc(&p, &rpc->NAME); \
911 break;
912 RAFT_RPC_TYPES
913#undef RAFT_RPC
914
915 default:
916 OVS_NOT_REACHED();
917 }
918
919 struct ovsdb_error *error = ovsdb_parser_finish(&p);
920 if (error) {
921 raft_rpc_uninit(rpc);
922 }
923 return error;
924}
925
926/* Appends a formatted representation of 'rpc' to 's'.
927 *
928 * Does not include the RPC's server ID in the formatted representation, since
929 * the caller usually has more context that allows for a more human friendly
930 * name. */
931void
932raft_rpc_format(const union raft_rpc *rpc, struct ds *s)
933{
934 ds_put_cstr(s, raft_rpc_type_to_string(rpc->type));
935 if (rpc->common.comment) {
936 ds_put_format(s, " \"%s\"", rpc->common.comment);
937 }
938 ds_put_char(s, ':');
939
940 switch (rpc->type) {
941#define RAFT_RPC(ENUM, NAME) \
942 case ENUM: \
943 raft_format_##NAME(&rpc->NAME, s); \
944 break;
945 RAFT_RPC_TYPES
946#undef RAFT_RPC
947 default:
948 OVS_NOT_REACHED();
949 }
950}
951
952uint64_t
953raft_rpc_get_term(const union raft_rpc *rpc)
954{
955 switch (rpc->type) {
956 case RAFT_RPC_HELLO_REQUEST:
957 case RAFT_RPC_ADD_SERVER_REQUEST:
958 case RAFT_RPC_ADD_SERVER_REPLY:
959 case RAFT_RPC_REMOVE_SERVER_REQUEST:
960 case RAFT_RPC_REMOVE_SERVER_REPLY:
961 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
962 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
963 return 0;
964
965 case RAFT_RPC_APPEND_REQUEST:
966 return rpc->append_request.term;
967
968 case RAFT_RPC_APPEND_REPLY:
969 return rpc->append_reply.term;
970
971 case RAFT_RPC_VOTE_REQUEST:
972 return rpc->vote_request.term;
973
974 case RAFT_RPC_VOTE_REPLY:
975 return rpc->vote_reply.term;
976
977 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
978 return rpc->install_snapshot_request.term;
979
980 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
981 return rpc->install_snapshot_reply.term;
982
983 case RAFT_RPC_BECOME_LEADER:
984 return rpc->become_leader.term;
985
986 default:
987 OVS_NOT_REACHED();
988 }
989}
990
991const struct uuid *
992raft_rpc_get_vote(const union raft_rpc *rpc)
993{
994 switch (rpc->type) {
995 case RAFT_RPC_HELLO_REQUEST:
996 case RAFT_RPC_ADD_SERVER_REQUEST:
997 case RAFT_RPC_ADD_SERVER_REPLY:
998 case RAFT_RPC_REMOVE_SERVER_REQUEST:
999 case RAFT_RPC_REMOVE_SERVER_REPLY:
1000 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
1001 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
1002 case RAFT_RPC_APPEND_REQUEST:
1003 case RAFT_RPC_APPEND_REPLY:
1004 case RAFT_RPC_VOTE_REQUEST:
1005 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
1006 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
1007 case RAFT_RPC_BECOME_LEADER:
1008 return NULL;
1009
1010 case RAFT_RPC_VOTE_REPLY:
1011 return &raft_vote_reply_cast(rpc)->vote;
1012
1013 default:
1014 OVS_NOT_REACHED();
1015 }
1016}
1017
1018/* Returns the minimum log index that must be synced to disk if 'rpc' is to be
1019 * sent. (This is generally the biggest log index in the message but some
1020 * messages, e.g. RAFT_RPC_APPEND_REQUEST, don't need their entries synced.) */
1021uint64_t
1022raft_rpc_get_min_sync_index(const union raft_rpc *rpc)
1023{
1024 switch (rpc->type) {
1025 case RAFT_RPC_HELLO_REQUEST:
1026 case RAFT_RPC_ADD_SERVER_REQUEST:
1027 case RAFT_RPC_ADD_SERVER_REPLY:
1028 case RAFT_RPC_REMOVE_SERVER_REQUEST:
1029 case RAFT_RPC_REMOVE_SERVER_REPLY:
1030 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
1031 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
1032 case RAFT_RPC_APPEND_REQUEST:
1033 case RAFT_RPC_BECOME_LEADER:
1034 case RAFT_RPC_VOTE_REPLY:
1035 return 0;
1036
1037 case RAFT_RPC_APPEND_REPLY:
1038 return raft_append_reply_cast(rpc)->log_end - 1;
1039
1040 case RAFT_RPC_VOTE_REQUEST:
1041 return raft_vote_request_cast(rpc)->last_log_index;
1042
1043 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
1044 return raft_install_snapshot_request_cast(rpc)->last_index;
1045
1046 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
1047 /* This will need to change if install_snapshot_reply becomes able to
1048 * report an error */
1049 return raft_install_snapshot_reply_cast(rpc)->last_index;
1050
1051 default:
1052 OVS_NOT_REACHED();
1053 }
1054}