]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/raft-rpc.c
dist-docs: Include manpages generated from rST.
[mirror_ovs.git] / ovsdb / raft-rpc.c
1 /*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "raft-rpc.h"
20 #include <stdlib.h>
21 #include <string.h>
22 #include "compiler.h"
23 #include "jsonrpc.h"
24 #include "ovsdb-error.h"
25 #include "ovsdb-parser.h"
26 #include "openvswitch/dynamic-string.h"
27 #include "openvswitch/json.h"
28 #include "openvswitch/vlog.h"
29 #include "sset.h"
30
31 VLOG_DEFINE_THIS_MODULE(raft_rpc);
32
33 #define RAFT_RPC(ENUM, NAME) \
34 static void raft_##NAME##_uninit(struct raft_##NAME *); \
35 static void raft_##NAME##_clone(struct raft_##NAME *, \
36 const struct raft_##NAME *); \
37 static void raft_##NAME##_to_jsonrpc(const struct raft_##NAME *, \
38 struct json *); \
39 static void raft_##NAME##_from_jsonrpc(struct ovsdb_parser *, \
40 struct raft_##NAME *); \
41 static void raft_format_##NAME(const struct raft_##NAME *, struct ds *);
42 RAFT_RPC_TYPES
43 #undef RAFT_RPC
44 \f
45 /* raft_rpc_type. */
46 const char *
47 raft_rpc_type_to_string(enum raft_rpc_type status)
48 {
49 switch (status) {
50 #define RAFT_RPC(ENUM, NAME) case ENUM: return #NAME;
51 RAFT_RPC_TYPES
52 #undef RAFT_RPC
53 }
54 return "<unknown>";
55 }
56
57 bool
58 raft_rpc_type_from_string(const char *s, enum raft_rpc_type *status)
59 {
60 #define RAFT_RPC(ENUM, NAME) \
61 if (!strcmp(s, #NAME)) { \
62 *status = ENUM; \
63 return true; \
64 }
65 RAFT_RPC_TYPES
66 #undef RAFT_RPC
67 return false;
68 }
69 \f
70 /* raft_hello_request. */
71
72 static void
73 raft_hello_request_uninit(struct raft_hello_request *rq)
74 {
75 free(rq->address);
76 }
77
78 static void
79 raft_hello_request_clone(struct raft_hello_request *dst,
80 const struct raft_hello_request *src)
81 {
82 dst->address = nullable_xstrdup(src->address);
83 }
84
85 static void
86 raft_hello_request_to_jsonrpc(const struct raft_hello_request *rq,
87 struct json *args)
88 {
89 json_object_put_string(args, "address", rq->address);
90 }
91
92 static void
93 raft_hello_request_from_jsonrpc(struct ovsdb_parser *p,
94 struct raft_hello_request *rq)
95 {
96 rq->address = nullable_xstrdup(raft_parse_required_string(p, "address"));
97 }
98
99 static void
100 raft_format_hello_request(const struct raft_hello_request *rq,
101 struct ds *s)
102 {
103 ds_put_format(s, " address=\"%s\"", rq->address);
104 }
105 \f
106 /* raft_append_request. */
107
108 static void
109 raft_append_request_uninit(struct raft_append_request *rq)
110 {
111 for (size_t i = 0; i < rq->n_entries; i++) {
112 raft_entry_uninit(&rq->entries[i]);
113 }
114 free(rq->entries);
115 }
116
117 static void
118 raft_append_request_clone(struct raft_append_request *dst,
119 const struct raft_append_request *src)
120 {
121 dst->entries = xmalloc(src->n_entries * sizeof *dst->entries);
122 for (size_t i = 0; i < src->n_entries; i++) {
123 raft_entry_clone(&dst->entries[i], &src->entries[i]);
124 }
125 }
126
127 static void
128 raft_append_request_to_jsonrpc(const struct raft_append_request *rq,
129 struct json *args)
130 {
131 raft_put_uint64(args, "term", rq->term);
132 raft_put_uint64(args, "prev_log_index", rq->prev_log_index);
133 raft_put_uint64(args, "prev_log_term", rq->prev_log_term);
134 raft_put_uint64(args, "leader_commit", rq->leader_commit);
135
136 struct json **entries = xmalloc(rq->n_entries * sizeof *entries);
137 for (size_t i = 0; i < rq->n_entries; i++) {
138 entries[i] = raft_entry_to_json(&rq->entries[i]);
139 }
140 json_object_put(args, "log", json_array_create(entries, rq->n_entries));
141 }
142
143 static void
144 raft_append_request_from_jsonrpc(struct ovsdb_parser *p,
145 struct raft_append_request *rq)
146 {
147 rq->term = raft_parse_required_uint64(p, "term");
148 rq->prev_log_index = raft_parse_required_uint64(p, "prev_log_index");
149 rq->prev_log_term = raft_parse_required_uint64(p, "prev_log_term");
150 rq->leader_commit = raft_parse_required_uint64(p, "leader_commit");
151
152 const struct json *log = ovsdb_parser_member(p, "log", OP_ARRAY);
153 if (!log) {
154 return;
155 }
156 const struct json_array *entries = json_array(log);
157 rq->entries = xmalloc(entries->n * sizeof *rq->entries);
158 rq->n_entries = 0;
159 for (size_t i = 0; i < entries->n; i++) {
160 struct ovsdb_error *error = raft_entry_from_json(entries->elems[i],
161 &rq->entries[i]);
162 if (error) {
163 ovsdb_parser_put_error(p, error);
164 break;
165 }
166 rq->n_entries++;
167 }
168 }
169
170 static void
171 raft_format_append_request(const struct raft_append_request *rq,
172 struct ds *s)
173 {
174 ds_put_format(s, " term=%"PRIu64, rq->term);
175 ds_put_format(s, " prev_log_index=%"PRIu64, rq->prev_log_index);
176 ds_put_format(s, " prev_log_term=%"PRIu64, rq->prev_log_term);
177 ds_put_format(s, " leader_commit=%"PRIu64, rq->leader_commit);
178 ds_put_format(s, " n_entries=%u", rq->n_entries);
179 }
180 \f
181 /* raft_append_reply. */
182
183 const char *
184 raft_append_result_to_string(enum raft_append_result result)
185 {
186 switch (result) {
187 case RAFT_APPEND_OK:
188 return "OK";
189 case RAFT_APPEND_INCONSISTENCY:
190 return "inconsistency";
191 case RAFT_APPEND_IO_ERROR:
192 return "I/O error";
193 default:
194 return NULL;
195 }
196 }
197
198 bool
199 raft_append_result_from_string(const char *s, enum raft_append_result *resultp)
200 {
201 for (enum raft_append_result result = 0; ; result++) {
202 const char *s2 = raft_append_result_to_string(result);
203 if (!s2) {
204 *resultp = 0;
205 return false;
206 } else if (!strcmp(s, s2)) {
207 *resultp = result;
208 return true;
209 }
210 }
211 }
212
213 static void
214 raft_append_reply_uninit(struct raft_append_reply *rpy OVS_UNUSED)
215 {
216 }
217
218 static void
219 raft_append_reply_clone(struct raft_append_reply *dst OVS_UNUSED,
220 const struct raft_append_reply *src OVS_UNUSED)
221 {
222 }
223
224 static void
225 raft_append_reply_to_jsonrpc(const struct raft_append_reply *rpy,
226 struct json *args)
227 {
228 raft_put_uint64(args, "term", rpy->term);
229 raft_put_uint64(args, "log_end", rpy->log_end);
230 raft_put_uint64(args, "prev_log_index", rpy->prev_log_index);
231 raft_put_uint64(args, "prev_log_term", rpy->prev_log_term);
232 raft_put_uint64(args, "n_entries", rpy->n_entries);
233 json_object_put_string(args, "result",
234 raft_append_result_to_string(rpy->result));
235 }
236
237 static void
238 raft_append_reply_from_jsonrpc(struct ovsdb_parser *p,
239 struct raft_append_reply *rpy)
240 {
241 rpy->term = raft_parse_required_uint64(p, "term");
242 rpy->log_end = raft_parse_required_uint64(p, "log_end");
243 rpy->prev_log_index = raft_parse_required_uint64(p, "prev_log_index");
244 rpy->prev_log_term = raft_parse_required_uint64(p, "prev_log_term");
245 rpy->n_entries = raft_parse_required_uint64(p, "n_entries");
246
247 const char *result = raft_parse_required_string(p, "result");
248 if (result && !raft_append_result_from_string(result, &rpy->result)) {
249 ovsdb_parser_raise_error(p, "unknown result \"%s\"", result);
250 }
251 }
252
253 static void
254 raft_format_append_reply(const struct raft_append_reply *rpy, struct ds *s)
255 {
256 ds_put_format(s, " term=%"PRIu64, rpy->term);
257 ds_put_format(s, " log_end=%"PRIu64, rpy->log_end);
258 ds_put_format(s, " result=\"%s\"",
259 raft_append_result_to_string(rpy->result));
260 }
261 \f
262 /* raft_vote_request. */
263
264 static void
265 raft_vote_request_uninit(struct raft_vote_request *rq OVS_UNUSED)
266 {
267 }
268
269 static void
270 raft_vote_request_clone(struct raft_vote_request *dst OVS_UNUSED,
271 const struct raft_vote_request *src OVS_UNUSED)
272 {
273 }
274
275 static void
276 raft_vote_request_to_jsonrpc(const struct raft_vote_request *rq,
277 struct json *args)
278 {
279 raft_put_uint64(args, "term", rq->term);
280 raft_put_uint64(args, "last_log_index", rq->last_log_index);
281 raft_put_uint64(args, "last_log_term", rq->last_log_term);
282 if (rq->leadership_transfer) {
283 json_object_put(args, "leadership_transfer",
284 json_boolean_create(true));
285 }
286 }
287
288 static void
289 raft_vote_request_from_jsonrpc(struct ovsdb_parser *p,
290 struct raft_vote_request *rq)
291 {
292 rq->term = raft_parse_required_uint64(p, "term");
293 rq->last_log_index = raft_parse_required_uint64(p, "last_log_index");
294 rq->last_log_term = raft_parse_required_uint64(p, "last_log_term");
295 rq->leadership_transfer
296 = raft_parse_optional_boolean(p, "leadership_transfer") == 1;
297 }
298
299 static void
300 raft_format_vote_request(const struct raft_vote_request *rq, struct ds *s)
301 {
302 ds_put_format(s, " term=%"PRIu64, rq->term);
303 ds_put_format(s, " last_log_index=%"PRIu64, rq->last_log_index);
304 ds_put_format(s, " last_log_term=%"PRIu64, rq->last_log_term);
305 if (rq->leadership_transfer) {
306 ds_put_cstr(s, " leadership_transfer=true");
307 }
308 }
309 \f
310 /* raft_vote_reply. */
311
312 static void
313 raft_vote_reply_uninit(struct raft_vote_reply *rpy OVS_UNUSED)
314 {
315 }
316
317 static void
318 raft_vote_reply_clone(struct raft_vote_reply *dst OVS_UNUSED,
319 const struct raft_vote_reply *src OVS_UNUSED)
320 {
321 }
322
323 static void
324 raft_vote_reply_to_jsonrpc(const struct raft_vote_reply *rpy,
325 struct json *args)
326 {
327 raft_put_uint64(args, "term", rpy->term);
328 json_object_put_format(args, "vote", UUID_FMT, UUID_ARGS(&rpy->vote));
329 }
330
331 static void
332 raft_vote_reply_from_jsonrpc(struct ovsdb_parser *p,
333 struct raft_vote_reply *rpy)
334 {
335 rpy->term = raft_parse_required_uint64(p, "term");
336 rpy->vote = raft_parse_required_uuid(p, "vote");
337 }
338
339 static void
340 raft_format_vote_reply(const struct raft_vote_reply *rpy, struct ds *s)
341 {
342 ds_put_format(s, " term=%"PRIu64, rpy->term);
343 ds_put_format(s, " vote="SID_FMT, SID_ARGS(&rpy->vote));
344 }
345 \f
346 /* raft_add_server_request */
347
348 static void
349 raft_add_server_request_uninit(struct raft_add_server_request *rq)
350 {
351 free(rq->address);
352 }
353
354 static void
355 raft_add_server_request_clone(struct raft_add_server_request *dst,
356 const struct raft_add_server_request *src)
357 {
358 dst->address = nullable_xstrdup(src->address);
359 }
360
361 static void
362 raft_add_server_request_to_jsonrpc(const struct raft_add_server_request *rq,
363 struct json *args)
364 {
365 json_object_put_string(args, "address", rq->address);
366 }
367
368 static void
369 raft_add_server_request_from_jsonrpc(struct ovsdb_parser *p,
370 struct raft_add_server_request *rq)
371 {
372 rq->address = nullable_xstrdup(raft_parse_required_string(p, "address"));
373 }
374
375 static void
376 raft_format_add_server_request(const struct raft_add_server_request *rq,
377 struct ds *s)
378 {
379 ds_put_format(s, " address=\"%s\"", rq->address);
380 }
381 \f
382 /* raft_add_server_reply. */
383
384 static void
385 raft_add_server_reply_uninit(struct raft_add_server_reply *rpy)
386 {
387 sset_destroy(&rpy->remote_addresses);
388 }
389
390 static void
391 raft_add_server_reply_clone(struct raft_add_server_reply *dst,
392 const struct raft_add_server_reply *src)
393 {
394 sset_clone(&dst->remote_addresses, &src->remote_addresses);
395 }
396
397 static void
398 raft_add_server_reply_to_jsonrpc(const struct raft_add_server_reply *rpy,
399 struct json *args)
400 {
401 json_object_put(args, "success", json_boolean_create(rpy->success));
402 if (!sset_is_empty(&rpy->remote_addresses)) {
403 json_object_put(args, "remote_addresses",
404 raft_addresses_to_json(&rpy->remote_addresses));
405 }
406 }
407
408 static void
409 raft_add_server_reply_from_jsonrpc(struct ovsdb_parser *p,
410 struct raft_add_server_reply *rpy)
411 {
412 rpy->success = raft_parse_required_boolean(p, "success");
413
414 const struct json *json = ovsdb_parser_member(p, "remote_addresses",
415 OP_ARRAY | OP_OPTIONAL);
416 if (json) {
417 ovsdb_parser_put_error(p, raft_addresses_from_json(
418 json, &rpy->remote_addresses));
419 } else {
420 sset_init(&rpy->remote_addresses);
421 }
422 }
423
424 static void
425 raft_format_add_server_reply(const struct raft_add_server_reply *rpy,
426 struct ds *s)
427 {
428 ds_put_format(s, " success=%s", rpy->success ? "true" : "false");
429 if (!sset_is_empty(&rpy->remote_addresses)) {
430 ds_put_cstr(s, " remote_addresses=[");
431
432 const char *address;
433 int i = 0;
434 SSET_FOR_EACH (address, &rpy->remote_addresses) {
435 if (i++ > 0) {
436 ds_put_cstr(s, ", ");
437 }
438 ds_put_cstr(s, address);
439 }
440 ds_put_char(s, ']');
441 }
442 }
443 \f
444 /* raft_remove_server_reply. */
445
446 static void
447 raft_remove_server_reply_uninit(
448 struct raft_remove_server_reply *rpy OVS_UNUSED)
449 {
450 }
451
452 static void
453 raft_remove_server_reply_clone(
454 struct raft_remove_server_reply *dst OVS_UNUSED,
455 const struct raft_remove_server_reply *src OVS_UNUSED)
456 {
457 }
458
459 static void
460 raft_remove_server_reply_to_jsonrpc(const struct raft_remove_server_reply *rpy,
461 struct json *args)
462 {
463 if (!uuid_is_zero(&rpy->target_sid)) {
464 json_object_put_format(args, "target_server",
465 UUID_FMT, UUID_ARGS(&rpy->target_sid));
466 }
467 json_object_put(args, "success", json_boolean_create(rpy->success));
468 }
469
470 static void
471 raft_remove_server_reply_from_jsonrpc(struct ovsdb_parser *p,
472 struct raft_remove_server_reply *rpy)
473 {
474 rpy->success = raft_parse_required_boolean(p, "success");
475 raft_parse_optional_uuid(p, "target_server", &rpy->target_sid);
476 }
477
478 static void
479 raft_format_remove_server_reply(const struct raft_remove_server_reply *rpy,
480 struct ds *s)
481 {
482 ds_put_format(s, " success=%s", rpy->success ? "true" : "false");
483 }
484 \f
485 /* raft_install_snapshot_request. */
486
487 static void
488 raft_install_snapshot_request_uninit(
489 struct raft_install_snapshot_request *rq)
490 {
491 json_destroy(rq->last_servers);
492 json_destroy(rq->data);
493 }
494
495 static void
496 raft_install_snapshot_request_clone(
497 struct raft_install_snapshot_request *dst,
498 const struct raft_install_snapshot_request *src)
499 {
500 dst->last_servers = json_clone(src->last_servers);
501 dst->data = json_clone(src->data);
502 }
503
504 static void
505 raft_install_snapshot_request_to_jsonrpc(
506 const struct raft_install_snapshot_request *rq, struct json *args)
507 {
508 raft_put_uint64(args, "term", rq->term);
509 raft_put_uint64(args, "last_index", rq->last_index);
510 raft_put_uint64(args, "last_term", rq->last_term);
511 json_object_put(args, "last_servers", json_clone(rq->last_servers));
512 json_object_put_format(args, "last_eid",
513 UUID_FMT, UUID_ARGS(&rq->last_eid));
514 raft_put_uint64(args, "election_timer", rq->election_timer);
515
516 json_object_put(args, "data", json_clone(rq->data));
517 }
518
519 static void
520 raft_install_snapshot_request_from_jsonrpc(
521 struct ovsdb_parser *p, struct raft_install_snapshot_request *rq)
522 {
523 rq->last_servers = json_nullable_clone(
524 ovsdb_parser_member(p, "last_servers", OP_OBJECT));
525 ovsdb_parser_put_error(p, raft_servers_validate_json(rq->last_servers));
526
527 rq->term = raft_parse_required_uint64(p, "term");
528 rq->last_index = raft_parse_required_uint64(p, "last_index");
529 rq->last_term = raft_parse_required_uint64(p, "last_term");
530 rq->last_eid = raft_parse_required_uuid(p, "last_eid");
531 /* election_timer is optional in file header, but is always populated in
532 * install_snapshot_request. */
533 rq->election_timer = raft_parse_required_uint64(p, "election_timer");
534
535 rq->data = json_nullable_clone(
536 ovsdb_parser_member(p, "data", OP_OBJECT | OP_ARRAY));
537 }
538
539 static void
540 raft_format_install_snapshot_request(
541 const struct raft_install_snapshot_request *rq, struct ds *s)
542 {
543 ds_put_format(s, " term=%"PRIu64, rq->term);
544 ds_put_format(s, " last_index=%"PRIu64, rq->last_index);
545 ds_put_format(s, " last_term=%"PRIu64, rq->last_term);
546 ds_put_format(s, " last_eid="UUID_FMT, UUID_ARGS(&rq->last_eid));
547 ds_put_format(s, " election_timer=%"PRIu64, rq->election_timer);
548 ds_put_cstr(s, " last_servers=");
549
550 struct hmap servers;
551 struct ovsdb_error *error =
552 raft_servers_from_json(rq->last_servers, &servers);
553 if (!error) {
554 raft_servers_format(&servers, s);
555 raft_servers_destroy(&servers);
556 } else {
557 ds_put_cstr(s, "***error***");
558 ovsdb_error_destroy(error);
559 }
560 }
561 \f
562 /* raft_install_snapshot_reply. */
563
564 static void
565 raft_install_snapshot_reply_uninit(
566 struct raft_install_snapshot_reply *rpy OVS_UNUSED)
567 {
568 }
569
570 static void
571 raft_install_snapshot_reply_clone(
572 struct raft_install_snapshot_reply *dst OVS_UNUSED,
573 const struct raft_install_snapshot_reply *src OVS_UNUSED)
574 {
575 }
576
577 static void
578 raft_install_snapshot_reply_to_jsonrpc(
579 const struct raft_install_snapshot_reply *rpy, struct json *args)
580 {
581 raft_put_uint64(args, "term", rpy->term);
582 raft_put_uint64(args, "last_index", rpy->last_index);
583 raft_put_uint64(args, "last_term", rpy->last_term);
584 }
585
586 static void
587 raft_install_snapshot_reply_from_jsonrpc(
588 struct ovsdb_parser *p,
589 struct raft_install_snapshot_reply *rpy)
590 {
591 rpy->term = raft_parse_required_uint64(p, "term");
592 rpy->last_index = raft_parse_required_uint64(p, "last_index");
593 rpy->last_term = raft_parse_required_uint64(p, "last_term");
594 }
595
596 static void
597 raft_format_install_snapshot_reply(
598 const struct raft_install_snapshot_reply *rpy, struct ds *s)
599 {
600 ds_put_format(s, " term=%"PRIu64, rpy->term);
601 ds_put_format(s, " last_index=%"PRIu64, rpy->last_index);
602 ds_put_format(s, " last_term=%"PRIu64, rpy->last_term);
603 }
604 \f
605 /* raft_remove_server_request. */
606
607 static void
608 raft_remove_server_request_uninit(
609 struct raft_remove_server_request *rq OVS_UNUSED)
610 {
611 }
612
613 static void
614 raft_remove_server_request_clone(
615 struct raft_remove_server_request *dst OVS_UNUSED,
616 const struct raft_remove_server_request *src OVS_UNUSED)
617 {
618 }
619
620 static void
621 raft_remove_server_request_to_jsonrpc(
622 const struct raft_remove_server_request *rq, struct json *args)
623 {
624 json_object_put_format(args, "server_id", UUID_FMT, UUID_ARGS(&rq->sid));
625 }
626
627 static void
628 raft_remove_server_request_from_jsonrpc(struct ovsdb_parser *p,
629 struct raft_remove_server_request *rq)
630 {
631 rq->sid = raft_parse_required_uuid(p, "server_id");
632 }
633
634 static void
635 raft_format_remove_server_request(const struct raft_remove_server_request *rq,
636 struct ds *s)
637 {
638 ds_put_format(s, " server="SID_FMT, SID_ARGS(&rq->sid));
639 }
640 \f
641 /* raft_become_leader. */
642
643 static void
644 raft_become_leader_uninit(struct raft_become_leader *rpc OVS_UNUSED)
645 {
646 }
647
648 static void
649 raft_become_leader_clone(struct raft_become_leader *dst OVS_UNUSED,
650 const struct raft_become_leader *src OVS_UNUSED)
651 {
652 }
653
654 static void
655 raft_become_leader_to_jsonrpc(const struct raft_become_leader *rpc,
656 struct json *args)
657 {
658 raft_put_uint64(args, "term", rpc->term);
659 }
660
661 static void
662 raft_become_leader_from_jsonrpc(struct ovsdb_parser *p,
663 struct raft_become_leader *rpc)
664 {
665 rpc->term = raft_parse_required_uint64(p, "term");
666 }
667
668 static void
669 raft_format_become_leader(const struct raft_become_leader *rq, struct ds *s)
670 {
671 ds_put_format(s, " term=%"PRIu64, rq->term);
672 }
673 \f
674 /* raft_execute_command_request. */
675
676 static void
677 raft_execute_command_request_uninit(
678 struct raft_execute_command_request *rq)
679 {
680 json_destroy(rq->data);
681 }
682
683 static void
684 raft_execute_command_request_clone(
685 struct raft_execute_command_request *dst,
686 const struct raft_execute_command_request *src)
687 {
688 dst->data = json_clone(src->data);
689 }
690
691 static void
692 raft_execute_command_request_to_jsonrpc(
693 const struct raft_execute_command_request *rq, struct json *args)
694 {
695 json_object_put(args, "data", json_clone(rq->data));
696 json_object_put_format(args, "prereq", UUID_FMT, UUID_ARGS(&rq->prereq));
697 json_object_put_format(args, "result", UUID_FMT, UUID_ARGS(&rq->result));
698 }
699
700 static void
701 raft_execute_command_request_from_jsonrpc(
702 struct ovsdb_parser *p, struct raft_execute_command_request *rq)
703 {
704 rq->data = json_nullable_clone(ovsdb_parser_member(p, "data",
705 OP_OBJECT | OP_ARRAY));
706 rq->prereq = raft_parse_required_uuid(p, "prereq");
707 rq->result = raft_parse_required_uuid(p, "result");
708 }
709
710 static void
711 raft_format_execute_command_request(
712 const struct raft_execute_command_request *rq, struct ds *s)
713 {
714 ds_put_format(s, " prereq="UUID_FMT, UUID_ARGS(&rq->prereq));
715 ds_put_format(s, " result="UUID_FMT, UUID_ARGS(&rq->result));
716 ds_put_format(s, " data=");
717 json_to_ds(rq->data, JSSF_SORT, s);
718 }
719 \f
720 /* raft_execute_command_reply. */
721
722 static void
723 raft_execute_command_reply_uninit(
724 struct raft_execute_command_reply *rpy OVS_UNUSED)
725 {
726 }
727
728 static void
729 raft_execute_command_reply_clone(
730 struct raft_execute_command_reply *dst OVS_UNUSED,
731 const struct raft_execute_command_reply *src OVS_UNUSED)
732 {
733 }
734
735 static void
736 raft_execute_command_reply_to_jsonrpc(
737 const struct raft_execute_command_reply *rpy, struct json *args)
738 {
739 json_object_put_format(args, "result", UUID_FMT, UUID_ARGS(&rpy->result));
740 json_object_put_string(args, "status",
741 raft_command_status_to_string(rpy->status));
742 if (rpy->commit_index) {
743 raft_put_uint64(args, "commit_index", rpy->commit_index);
744 }
745 }
746
747 static void
748 raft_execute_command_reply_from_jsonrpc(
749 struct ovsdb_parser *p, struct raft_execute_command_reply *rpy)
750 {
751 rpy->result = raft_parse_required_uuid(p, "result");
752
753 const char *status = raft_parse_required_string(p, "status");
754 if (status && !raft_command_status_from_string(status, &rpy->status)) {
755 ovsdb_parser_raise_error(p, "unknown status \"%s\"", status);
756 }
757
758 rpy->commit_index = raft_parse_optional_uint64(p, "commit_index");
759 }
760
761 static void
762 raft_format_execute_command_reply(
763 const struct raft_execute_command_reply *rpy, struct ds *s)
764 {
765 ds_put_format(s, " result="UUID_FMT, UUID_ARGS(&rpy->result));
766 ds_put_format(s, " status=\"%s\"",
767 raft_command_status_to_string(rpy->status));
768 if (rpy->commit_index) {
769 ds_put_format(s, " commit_index=%"PRIu64, rpy->commit_index);
770 }
771 }
772 \f
773 void
774 raft_rpc_uninit(union raft_rpc *rpc)
775 {
776 if (rpc) {
777 free(rpc->common.comment);
778
779 switch (rpc->type) {
780 #define RAFT_RPC(ENUM, NAME) \
781 case ENUM: \
782 raft_##NAME##_uninit(&rpc->NAME); \
783 break;
784 RAFT_RPC_TYPES
785 #undef RAFT_RPC
786 }
787 }
788 }
789
790 union raft_rpc *
791 raft_rpc_clone(const union raft_rpc *src)
792 {
793 union raft_rpc *dst = xmemdup(src, sizeof *src);
794 dst->common.comment = nullable_xstrdup(src->common.comment);
795
796 switch (src->type) {
797 #define RAFT_RPC(ENUM, NAME) \
798 case ENUM: \
799 raft_##NAME##_clone(&dst->NAME, &src->NAME); \
800 break;
801 RAFT_RPC_TYPES
802 #undef RAFT_RPC
803 }
804
805 return dst;
806 }
807
808 /* Returns 'rpc' converted to a jsonrpc_msg. The caller must eventually free
809 * the returned message.
810 *
811 * 'rpc->common.sid' should be the destination server ID; it is omitted if
812 * all-zeros. 'sid' is the source. 'cid' should be the cluster ID; it is
813 * omitted if all-zeros. */
814 struct jsonrpc_msg *
815 raft_rpc_to_jsonrpc(const struct uuid *cid,
816 const struct uuid *sid,
817 const union raft_rpc *rpc)
818 {
819 struct json *args = json_object_create();
820 if (!uuid_is_zero(cid)) {
821 json_object_put_format(args, "cluster", UUID_FMT, UUID_ARGS(cid));
822 }
823 if (!uuid_is_zero(&rpc->common.sid)) {
824 json_object_put_format(args, "to", UUID_FMT,
825 UUID_ARGS(&rpc->common.sid));
826 }
827 json_object_put_format(args, "from", UUID_FMT, UUID_ARGS(sid));
828 if (rpc->common.comment) {
829 json_object_put_string(args, "comment", rpc->common.comment);
830 }
831
832 switch (rpc->type) {
833 #define RAFT_RPC(ENUM, NAME) \
834 case ENUM: \
835 raft_##NAME##_to_jsonrpc(&rpc->NAME, args); \
836 break;
837 RAFT_RPC_TYPES
838 #undef RAFT_RPC
839 default:
840 OVS_NOT_REACHED();
841 }
842
843 return jsonrpc_create_notify(raft_rpc_type_to_string(rpc->type),
844 json_array_create_1(args));
845 }
846
847 /* Parses 'msg' as a Raft message directed to 'sid' and initializes 'rpc'
848 * appropriately. On success, returns NULL and the caller owns the contents of
849 * 'rpc' and must eventually uninitialize it with raft_rpc_uninit(). On
850 * failure, returns an error that the caller must eventually free.
851 *
852 * 'cidp' must point to the Raft cluster's ID. If the cluster ID isn't yet
853 * known, then '*cidp' must be UUID_ZERO and this function will attempt to
854 * initialize it based on 'msg'. */
855 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
856 raft_rpc_from_jsonrpc(struct uuid *cidp, const struct uuid *sid,
857 const struct jsonrpc_msg *msg, union raft_rpc *rpc)
858 {
859 memset(rpc, 0, sizeof *rpc);
860 if (msg->type != JSONRPC_NOTIFY) {
861 return ovsdb_error(NULL, "expecting notify RPC but received %s",
862 jsonrpc_msg_type_to_string(msg->type));
863 }
864
865 if (!raft_rpc_type_from_string(msg->method, &rpc->type)) {
866 return ovsdb_error(NULL, "unknown method %s", msg->method);
867 }
868
869 if (json_array(msg->params)->n != 1) {
870 return ovsdb_error(NULL,
871 "%s RPC has %"PRIuSIZE" parameters (expected 1)",
872 msg->method, json_array(msg->params)->n);
873 }
874
875 struct ovsdb_parser p;
876 ovsdb_parser_init(&p, json_array(msg->params)->elems[0],
877 "raft %s RPC", msg->method);
878
879 bool is_hello = rpc->type == RAFT_RPC_HELLO_REQUEST;
880 bool is_add = rpc->type == RAFT_RPC_ADD_SERVER_REQUEST;
881
882 struct uuid cid;
883 if (raft_parse_uuid(&p, "cluster", is_add, &cid)
884 && !uuid_equals(&cid, cidp)) {
885 if (uuid_is_zero(cidp)) {
886 *cidp = cid;
887 VLOG_INFO("learned cluster ID "CID_FMT, CID_ARGS(&cid));
888 } else {
889 ovsdb_parser_raise_error(&p, "wrong cluster "CID_FMT" "
890 "(expected "CID_FMT")",
891 CID_ARGS(&cid), CID_ARGS(cidp));
892 }
893 }
894
895 struct uuid to_sid;
896 if (raft_parse_uuid(&p, "to", is_add || is_hello, &to_sid)
897 && !uuid_equals(&to_sid, sid)) {
898 ovsdb_parser_raise_error(&p, "misrouted message (addressed to "
899 SID_FMT" but we're "SID_FMT")",
900 SID_ARGS(&to_sid), SID_ARGS(sid));
901 }
902
903 rpc->common.sid = raft_parse_required_uuid(&p, "from");
904 rpc->common.comment = nullable_xstrdup(
905 raft_parse_optional_string(&p, "comment"));
906
907 switch (rpc->type) {
908 #define RAFT_RPC(ENUM, NAME) \
909 case ENUM: \
910 raft_##NAME##_from_jsonrpc(&p, &rpc->NAME); \
911 break;
912 RAFT_RPC_TYPES
913 #undef RAFT_RPC
914
915 default:
916 OVS_NOT_REACHED();
917 }
918
919 struct ovsdb_error *error = ovsdb_parser_finish(&p);
920 if (error) {
921 raft_rpc_uninit(rpc);
922 }
923 return error;
924 }
925
926 /* Appends a formatted representation of 'rpc' to 's'.
927 *
928 * Does not include the RPC's server ID in the formatted representation, since
929 * the caller usually has more context that allows for a more human friendly
930 * name. */
931 void
932 raft_rpc_format(const union raft_rpc *rpc, struct ds *s)
933 {
934 ds_put_cstr(s, raft_rpc_type_to_string(rpc->type));
935 if (rpc->common.comment) {
936 ds_put_format(s, " \"%s\"", rpc->common.comment);
937 }
938 ds_put_char(s, ':');
939
940 switch (rpc->type) {
941 #define RAFT_RPC(ENUM, NAME) \
942 case ENUM: \
943 raft_format_##NAME(&rpc->NAME, s); \
944 break;
945 RAFT_RPC_TYPES
946 #undef RAFT_RPC
947 default:
948 OVS_NOT_REACHED();
949 }
950 }
951
952 uint64_t
953 raft_rpc_get_term(const union raft_rpc *rpc)
954 {
955 switch (rpc->type) {
956 case RAFT_RPC_HELLO_REQUEST:
957 case RAFT_RPC_ADD_SERVER_REQUEST:
958 case RAFT_RPC_ADD_SERVER_REPLY:
959 case RAFT_RPC_REMOVE_SERVER_REQUEST:
960 case RAFT_RPC_REMOVE_SERVER_REPLY:
961 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
962 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
963 return 0;
964
965 case RAFT_RPC_APPEND_REQUEST:
966 return rpc->append_request.term;
967
968 case RAFT_RPC_APPEND_REPLY:
969 return rpc->append_reply.term;
970
971 case RAFT_RPC_VOTE_REQUEST:
972 return rpc->vote_request.term;
973
974 case RAFT_RPC_VOTE_REPLY:
975 return rpc->vote_reply.term;
976
977 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
978 return rpc->install_snapshot_request.term;
979
980 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
981 return rpc->install_snapshot_reply.term;
982
983 case RAFT_RPC_BECOME_LEADER:
984 return rpc->become_leader.term;
985
986 default:
987 OVS_NOT_REACHED();
988 }
989 }
990
991 const struct uuid *
992 raft_rpc_get_vote(const union raft_rpc *rpc)
993 {
994 switch (rpc->type) {
995 case RAFT_RPC_HELLO_REQUEST:
996 case RAFT_RPC_ADD_SERVER_REQUEST:
997 case RAFT_RPC_ADD_SERVER_REPLY:
998 case RAFT_RPC_REMOVE_SERVER_REQUEST:
999 case RAFT_RPC_REMOVE_SERVER_REPLY:
1000 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
1001 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
1002 case RAFT_RPC_APPEND_REQUEST:
1003 case RAFT_RPC_APPEND_REPLY:
1004 case RAFT_RPC_VOTE_REQUEST:
1005 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
1006 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
1007 case RAFT_RPC_BECOME_LEADER:
1008 return NULL;
1009
1010 case RAFT_RPC_VOTE_REPLY:
1011 return &raft_vote_reply_cast(rpc)->vote;
1012
1013 default:
1014 OVS_NOT_REACHED();
1015 }
1016 }
1017
1018 /* Returns the minimum log index that must be synced to disk if 'rpc' is to be
1019 * sent. (This is generally the biggest log index in the message but some
1020 * messages, e.g. RAFT_RPC_APPEND_REQUEST, don't need their entries synced.) */
1021 uint64_t
1022 raft_rpc_get_min_sync_index(const union raft_rpc *rpc)
1023 {
1024 switch (rpc->type) {
1025 case RAFT_RPC_HELLO_REQUEST:
1026 case RAFT_RPC_ADD_SERVER_REQUEST:
1027 case RAFT_RPC_ADD_SERVER_REPLY:
1028 case RAFT_RPC_REMOVE_SERVER_REQUEST:
1029 case RAFT_RPC_REMOVE_SERVER_REPLY:
1030 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
1031 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
1032 case RAFT_RPC_APPEND_REQUEST:
1033 case RAFT_RPC_BECOME_LEADER:
1034 case RAFT_RPC_VOTE_REPLY:
1035 return 0;
1036
1037 case RAFT_RPC_APPEND_REPLY:
1038 return raft_append_reply_cast(rpc)->log_end - 1;
1039
1040 case RAFT_RPC_VOTE_REQUEST:
1041 return raft_vote_request_cast(rpc)->last_log_index;
1042
1043 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
1044 return raft_install_snapshot_request_cast(rpc)->last_index;
1045
1046 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
1047 /* This will need to change if install_snapshot_reply becomes able to
1048 * report an error */
1049 return raft_install_snapshot_reply_cast(rpc)->last_index;
1050
1051 default:
1052 OVS_NOT_REACHED();
1053 }
1054 }