]> git.proxmox.com Git - mirror_ovs.git/blob - ovsdb/raft-rpc.c
trigger: Free leaked ovsdb_schema
[mirror_ovs.git] / ovsdb / raft-rpc.c
1 /*
2 * Copyright (c) 2017, 2018 Nicira, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18
19 #include "raft-rpc.h"
20 #include <stdlib.h>
21 #include <string.h>
22 #include "compiler.h"
23 #include "jsonrpc.h"
24 #include "ovsdb-error.h"
25 #include "ovsdb-parser.h"
26 #include "openvswitch/dynamic-string.h"
27 #include "openvswitch/json.h"
28 #include "openvswitch/vlog.h"
29 #include "sset.h"
30
31 VLOG_DEFINE_THIS_MODULE(raft_rpc);
32
33 #define RAFT_RPC(ENUM, NAME) \
34 static void raft_##NAME##_uninit(struct raft_##NAME *); \
35 static void raft_##NAME##_clone(struct raft_##NAME *, \
36 const struct raft_##NAME *); \
37 static void raft_##NAME##_to_jsonrpc(const struct raft_##NAME *, \
38 struct json *); \
39 static void raft_##NAME##_from_jsonrpc(struct ovsdb_parser *, \
40 struct raft_##NAME *); \
41 static void raft_format_##NAME(const struct raft_##NAME *, struct ds *);
42 RAFT_RPC_TYPES
43 #undef RAFT_RPC
44 \f
45 /* raft_rpc_type. */
46 const char *
47 raft_rpc_type_to_string(enum raft_rpc_type status)
48 {
49 switch (status) {
50 #define RAFT_RPC(ENUM, NAME) case ENUM: return #NAME;
51 RAFT_RPC_TYPES
52 #undef RAFT_RPC
53 }
54 return "<unknown>";
55 }
56
57 bool
58 raft_rpc_type_from_string(const char *s, enum raft_rpc_type *status)
59 {
60 #define RAFT_RPC(ENUM, NAME) \
61 if (!strcmp(s, #NAME)) { \
62 *status = ENUM; \
63 return true; \
64 }
65 RAFT_RPC_TYPES
66 #undef RAFT_RPC
67 return false;
68 }
69 \f
70 /* raft_hello_request. */
71
72 static void
73 raft_hello_request_uninit(struct raft_hello_request *rq)
74 {
75 free(rq->address);
76 }
77
78 static void
79 raft_hello_request_clone(struct raft_hello_request *dst,
80 const struct raft_hello_request *src)
81 {
82 dst->address = nullable_xstrdup(src->address);
83 }
84
85 static void
86 raft_hello_request_to_jsonrpc(const struct raft_hello_request *rq,
87 struct json *args)
88 {
89 json_object_put_string(args, "address", rq->address);
90 }
91
92 static void
93 raft_hello_request_from_jsonrpc(struct ovsdb_parser *p,
94 struct raft_hello_request *rq)
95 {
96 rq->address = nullable_xstrdup(raft_parse_required_string(p, "address"));
97 }
98
99 static void
100 raft_format_hello_request(const struct raft_hello_request *rq,
101 struct ds *s)
102 {
103 ds_put_format(s, " address=\"%s\"", rq->address);
104 }
105 \f
106 /* raft_append_request. */
107
108 static void
109 raft_append_request_uninit(struct raft_append_request *rq)
110 {
111 for (size_t i = 0; i < rq->n_entries; i++) {
112 raft_entry_uninit(&rq->entries[i]);
113 }
114 free(rq->entries);
115 }
116
117 static void
118 raft_append_request_clone(struct raft_append_request *dst,
119 const struct raft_append_request *src)
120 {
121 dst->entries = xmalloc(src->n_entries * sizeof *dst->entries);
122 for (size_t i = 0; i < src->n_entries; i++) {
123 raft_entry_clone(&dst->entries[i], &src->entries[i]);
124 }
125 }
126
127 static void
128 raft_append_request_to_jsonrpc(const struct raft_append_request *rq,
129 struct json *args)
130 {
131 raft_put_uint64(args, "term", rq->term);
132 raft_put_uint64(args, "prev_log_index", rq->prev_log_index);
133 raft_put_uint64(args, "prev_log_term", rq->prev_log_term);
134 raft_put_uint64(args, "leader_commit", rq->leader_commit);
135
136 struct json **entries = xmalloc(rq->n_entries * sizeof *entries);
137 for (size_t i = 0; i < rq->n_entries; i++) {
138 entries[i] = raft_entry_to_json(&rq->entries[i]);
139 }
140 json_object_put(args, "log", json_array_create(entries, rq->n_entries));
141 }
142
143 static void
144 raft_append_request_from_jsonrpc(struct ovsdb_parser *p,
145 struct raft_append_request *rq)
146 {
147 rq->term = raft_parse_required_uint64(p, "term");
148 rq->prev_log_index = raft_parse_required_uint64(p, "prev_log_index");
149 rq->prev_log_term = raft_parse_required_uint64(p, "prev_log_term");
150 rq->leader_commit = raft_parse_required_uint64(p, "leader_commit");
151
152 const struct json *log = ovsdb_parser_member(p, "log", OP_ARRAY);
153 if (!log) {
154 return;
155 }
156 const struct json_array *entries = json_array(log);
157 rq->entries = xmalloc(entries->n * sizeof *rq->entries);
158 rq->n_entries = 0;
159 for (size_t i = 0; i < entries->n; i++) {
160 struct ovsdb_error *error = raft_entry_from_json(entries->elems[i],
161 &rq->entries[i]);
162 if (error) {
163 ovsdb_parser_put_error(p, error);
164 break;
165 }
166 rq->n_entries++;
167 }
168 }
169
170 static void
171 raft_format_append_request(const struct raft_append_request *rq,
172 struct ds *s)
173 {
174 ds_put_format(s, " term=%"PRIu64, rq->term);
175 ds_put_format(s, " prev_log_index=%"PRIu64, rq->prev_log_index);
176 ds_put_format(s, " prev_log_term=%"PRIu64, rq->prev_log_term);
177 ds_put_format(s, " leader_commit=%"PRIu64, rq->leader_commit);
178 ds_put_format(s, " n_entries=%u", rq->n_entries);
179 }
180 \f
181 /* raft_append_reply. */
182
183 const char *
184 raft_append_result_to_string(enum raft_append_result result)
185 {
186 switch (result) {
187 case RAFT_APPEND_OK:
188 return "OK";
189 case RAFT_APPEND_INCONSISTENCY:
190 return "inconsistency";
191 case RAFT_APPEND_IO_ERROR:
192 return "I/O error";
193 default:
194 return NULL;
195 }
196 }
197
198 bool
199 raft_append_result_from_string(const char *s, enum raft_append_result *resultp)
200 {
201 for (enum raft_append_result result = 0; ; result++) {
202 const char *s2 = raft_append_result_to_string(result);
203 if (!s2) {
204 *resultp = 0;
205 return false;
206 } else if (!strcmp(s, s2)) {
207 *resultp = result;
208 return true;
209 }
210 }
211 }
212
213 static void
214 raft_append_reply_uninit(struct raft_append_reply *rpy OVS_UNUSED)
215 {
216 }
217
218 static void
219 raft_append_reply_clone(struct raft_append_reply *dst OVS_UNUSED,
220 const struct raft_append_reply *src OVS_UNUSED)
221 {
222 }
223
224 static void
225 raft_append_reply_to_jsonrpc(const struct raft_append_reply *rpy,
226 struct json *args)
227 {
228 raft_put_uint64(args, "term", rpy->term);
229 raft_put_uint64(args, "log_end", rpy->log_end);
230 raft_put_uint64(args, "prev_log_index", rpy->prev_log_index);
231 raft_put_uint64(args, "prev_log_term", rpy->prev_log_term);
232 raft_put_uint64(args, "n_entries", rpy->n_entries);
233 json_object_put_string(args, "result",
234 raft_append_result_to_string(rpy->result));
235 }
236
237 static void
238 raft_append_reply_from_jsonrpc(struct ovsdb_parser *p,
239 struct raft_append_reply *rpy)
240 {
241 rpy->term = raft_parse_required_uint64(p, "term");
242 rpy->log_end = raft_parse_required_uint64(p, "log_end");
243 rpy->prev_log_index = raft_parse_required_uint64(p, "prev_log_index");
244 rpy->prev_log_term = raft_parse_required_uint64(p, "prev_log_term");
245 rpy->n_entries = raft_parse_required_uint64(p, "n_entries");
246
247 const char *result = raft_parse_required_string(p, "result");
248 if (result && !raft_append_result_from_string(result, &rpy->result)) {
249 ovsdb_parser_raise_error(p, "unknown result \"%s\"", result);
250 }
251 }
252
253 static void
254 raft_format_append_reply(const struct raft_append_reply *rpy, struct ds *s)
255 {
256 ds_put_format(s, " term=%"PRIu64, rpy->term);
257 ds_put_format(s, " log_end=%"PRIu64, rpy->log_end);
258 ds_put_format(s, " result=\"%s\"",
259 raft_append_result_to_string(rpy->result));
260 }
261 \f
262 /* raft_vote_request. */
263
264 static void
265 raft_vote_request_uninit(struct raft_vote_request *rq OVS_UNUSED)
266 {
267 }
268
269 static void
270 raft_vote_request_clone(struct raft_vote_request *dst OVS_UNUSED,
271 const struct raft_vote_request *src OVS_UNUSED)
272 {
273 }
274
275 static void
276 raft_vote_request_to_jsonrpc(const struct raft_vote_request *rq,
277 struct json *args)
278 {
279 raft_put_uint64(args, "term", rq->term);
280 raft_put_uint64(args, "last_log_index", rq->last_log_index);
281 raft_put_uint64(args, "last_log_term", rq->last_log_term);
282 if (rq->leadership_transfer) {
283 json_object_put(args, "leadership_transfer",
284 json_boolean_create(true));
285 }
286 }
287
288 static void
289 raft_vote_request_from_jsonrpc(struct ovsdb_parser *p,
290 struct raft_vote_request *rq)
291 {
292 rq->term = raft_parse_required_uint64(p, "term");
293 rq->last_log_index = raft_parse_required_uint64(p, "last_log_index");
294 rq->last_log_term = raft_parse_required_uint64(p, "last_log_term");
295 rq->leadership_transfer
296 = raft_parse_optional_boolean(p, "leadership_transfer") == 1;
297 }
298
299 static void
300 raft_format_vote_request(const struct raft_vote_request *rq, struct ds *s)
301 {
302 ds_put_format(s, " term=%"PRIu64, rq->term);
303 ds_put_format(s, " last_log_index=%"PRIu64, rq->last_log_index);
304 ds_put_format(s, " last_log_term=%"PRIu64, rq->last_log_term);
305 if (rq->leadership_transfer) {
306 ds_put_cstr(s, " leadership_transfer=true");
307 }
308 }
309 \f
310 /* raft_vote_reply. */
311
312 static void
313 raft_vote_reply_uninit(struct raft_vote_reply *rpy OVS_UNUSED)
314 {
315 }
316
317 static void
318 raft_vote_reply_clone(struct raft_vote_reply *dst OVS_UNUSED,
319 const struct raft_vote_reply *src OVS_UNUSED)
320 {
321 }
322
323 static void
324 raft_vote_reply_to_jsonrpc(const struct raft_vote_reply *rpy,
325 struct json *args)
326 {
327 raft_put_uint64(args, "term", rpy->term);
328 json_object_put_format(args, "vote", UUID_FMT, UUID_ARGS(&rpy->vote));
329 }
330
331 static void
332 raft_vote_reply_from_jsonrpc(struct ovsdb_parser *p,
333 struct raft_vote_reply *rpy)
334 {
335 rpy->term = raft_parse_required_uint64(p, "term");
336 rpy->vote = raft_parse_required_uuid(p, "vote");
337 }
338
339 static void
340 raft_format_vote_reply(const struct raft_vote_reply *rpy, struct ds *s)
341 {
342 ds_put_format(s, " term=%"PRIu64, rpy->term);
343 ds_put_format(s, " vote="SID_FMT, SID_ARGS(&rpy->vote));
344 }
345 \f
/* raft_add_server_request. */
347
348 static void
349 raft_add_server_request_uninit(struct raft_add_server_request *rq)
350 {
351 free(rq->address);
352 }
353
354 static void
355 raft_add_server_request_clone(struct raft_add_server_request *dst,
356 const struct raft_add_server_request *src)
357 {
358 dst->address = nullable_xstrdup(src->address);
359 }
360
361 static void
362 raft_add_server_request_to_jsonrpc(const struct raft_add_server_request *rq,
363 struct json *args)
364 {
365 json_object_put_string(args, "address", rq->address);
366 }
367
368 static void
369 raft_add_server_request_from_jsonrpc(struct ovsdb_parser *p,
370 struct raft_add_server_request *rq)
371 {
372 rq->address = nullable_xstrdup(raft_parse_required_string(p, "address"));
373 }
374
375 static void
376 raft_format_add_server_request(const struct raft_add_server_request *rq,
377 struct ds *s)
378 {
379 ds_put_format(s, " address=\"%s\"", rq->address);
380 }
381 \f
382 /* raft_add_server_reply. */
383
384 static void
385 raft_add_server_reply_uninit(struct raft_add_server_reply *rpy)
386 {
387 sset_destroy(&rpy->remote_addresses);
388 }
389
390 static void
391 raft_add_server_reply_clone(struct raft_add_server_reply *dst,
392 const struct raft_add_server_reply *src)
393 {
394 sset_clone(&dst->remote_addresses, &src->remote_addresses);
395 }
396
397 static void
398 raft_add_server_reply_to_jsonrpc(const struct raft_add_server_reply *rpy,
399 struct json *args)
400 {
401 json_object_put(args, "success", json_boolean_create(rpy->success));
402 if (!sset_is_empty(&rpy->remote_addresses)) {
403 json_object_put(args, "remote_addresses",
404 raft_addresses_to_json(&rpy->remote_addresses));
405 }
406 }
407
408 static void
409 raft_add_server_reply_from_jsonrpc(struct ovsdb_parser *p,
410 struct raft_add_server_reply *rpy)
411 {
412 rpy->success = raft_parse_required_boolean(p, "success");
413
414 const struct json *json = ovsdb_parser_member(p, "remote_addresses",
415 OP_ARRAY | OP_OPTIONAL);
416 if (json) {
417 ovsdb_parser_put_error(p, raft_addresses_from_json(
418 json, &rpy->remote_addresses));
419 } else {
420 sset_init(&rpy->remote_addresses);
421 }
422 }
423
424 static void
425 raft_format_add_server_reply(const struct raft_add_server_reply *rpy,
426 struct ds *s)
427 {
428 ds_put_format(s, " success=%s", rpy->success ? "true" : "false");
429 if (!sset_is_empty(&rpy->remote_addresses)) {
430 ds_put_cstr(s, " remote_addresses=[");
431
432 const char *address;
433 int i = 0;
434 SSET_FOR_EACH (address, &rpy->remote_addresses) {
435 if (i++ > 0) {
436 ds_put_cstr(s, ", ");
437 }
438 ds_put_cstr(s, address);
439 }
440 ds_put_char(s, ']');
441 }
442 }
443 \f
444 /* raft_remove_server_reply. */
445
446 static void
447 raft_remove_server_reply_uninit(
448 struct raft_remove_server_reply *rpy OVS_UNUSED)
449 {
450 }
451
452 static void
453 raft_remove_server_reply_clone(
454 struct raft_remove_server_reply *dst OVS_UNUSED,
455 const struct raft_remove_server_reply *src OVS_UNUSED)
456 {
457 }
458
459 static void
460 raft_remove_server_reply_to_jsonrpc(const struct raft_remove_server_reply *rpy,
461 struct json *args)
462 {
463 if (!uuid_is_zero(&rpy->target_sid)) {
464 json_object_put_format(args, "target_server",
465 UUID_FMT, UUID_ARGS(&rpy->target_sid));
466 }
467 json_object_put(args, "success", json_boolean_create(rpy->success));
468 }
469
470 static void
471 raft_remove_server_reply_from_jsonrpc(struct ovsdb_parser *p,
472 struct raft_remove_server_reply *rpy)
473 {
474 rpy->success = raft_parse_required_boolean(p, "success");
475 raft_parse_optional_uuid(p, "target_server", &rpy->target_sid);
476 }
477
478 static void
479 raft_format_remove_server_reply(const struct raft_remove_server_reply *rpy,
480 struct ds *s)
481 {
482 ds_put_format(s, " success=%s", rpy->success ? "true" : "false");
483 }
484 \f
485 /* raft_install_snapshot_request. */
486
487 static void
488 raft_install_snapshot_request_uninit(
489 struct raft_install_snapshot_request *rq)
490 {
491 json_destroy(rq->last_servers);
492 json_destroy(rq->data);
493 }
494
495 static void
496 raft_install_snapshot_request_clone(
497 struct raft_install_snapshot_request *dst,
498 const struct raft_install_snapshot_request *src)
499 {
500 dst->last_servers = json_clone(src->last_servers);
501 dst->data = json_clone(src->data);
502 }
503
504 static void
505 raft_install_snapshot_request_to_jsonrpc(
506 const struct raft_install_snapshot_request *rq, struct json *args)
507 {
508 raft_put_uint64(args, "term", rq->term);
509 raft_put_uint64(args, "last_index", rq->last_index);
510 raft_put_uint64(args, "last_term", rq->last_term);
511 json_object_put(args, "last_servers", json_clone(rq->last_servers));
512 json_object_put_format(args, "last_eid",
513 UUID_FMT, UUID_ARGS(&rq->last_eid));
514
515 json_object_put(args, "data", json_clone(rq->data));
516 }
517
518 static void
519 raft_install_snapshot_request_from_jsonrpc(
520 struct ovsdb_parser *p, struct raft_install_snapshot_request *rq)
521 {
522 rq->last_servers = json_nullable_clone(
523 ovsdb_parser_member(p, "last_servers", OP_OBJECT));
524 ovsdb_parser_put_error(p, raft_servers_validate_json(rq->last_servers));
525
526 rq->term = raft_parse_required_uint64(p, "term");
527 rq->last_index = raft_parse_required_uint64(p, "last_index");
528 rq->last_term = raft_parse_required_uint64(p, "last_term");
529 rq->last_eid = raft_parse_required_uuid(p, "last_eid");
530
531 rq->data = json_nullable_clone(
532 ovsdb_parser_member(p, "data", OP_OBJECT | OP_ARRAY));
533 }
534
535 static void
536 raft_format_install_snapshot_request(
537 const struct raft_install_snapshot_request *rq, struct ds *s)
538 {
539 ds_put_format(s, " term=%"PRIu64, rq->term);
540 ds_put_format(s, " last_index=%"PRIu64, rq->last_index);
541 ds_put_format(s, " last_term=%"PRIu64, rq->last_term);
542 ds_put_format(s, " last_eid="UUID_FMT, UUID_ARGS(&rq->last_eid));
543 ds_put_cstr(s, " last_servers=");
544
545 struct hmap servers;
546 struct ovsdb_error *error =
547 raft_servers_from_json(rq->last_servers, &servers);
548 if (!error) {
549 raft_servers_format(&servers, s);
550 raft_servers_destroy(&servers);
551 } else {
552 ds_put_cstr(s, "***error***");
553 ovsdb_error_destroy(error);
554 }
555 }
556 \f
557 /* raft_install_snapshot_reply. */
558
559 static void
560 raft_install_snapshot_reply_uninit(
561 struct raft_install_snapshot_reply *rpy OVS_UNUSED)
562 {
563 }
564
565 static void
566 raft_install_snapshot_reply_clone(
567 struct raft_install_snapshot_reply *dst OVS_UNUSED,
568 const struct raft_install_snapshot_reply *src OVS_UNUSED)
569 {
570 }
571
572 static void
573 raft_install_snapshot_reply_to_jsonrpc(
574 const struct raft_install_snapshot_reply *rpy, struct json *args)
575 {
576 raft_put_uint64(args, "term", rpy->term);
577 raft_put_uint64(args, "last_index", rpy->last_index);
578 raft_put_uint64(args, "last_term", rpy->last_term);
579 }
580
581 static void
582 raft_install_snapshot_reply_from_jsonrpc(
583 struct ovsdb_parser *p,
584 struct raft_install_snapshot_reply *rpy)
585 {
586 rpy->term = raft_parse_required_uint64(p, "term");
587 rpy->last_index = raft_parse_required_uint64(p, "last_index");
588 rpy->last_term = raft_parse_required_uint64(p, "last_term");
589 }
590
591 static void
592 raft_format_install_snapshot_reply(
593 const struct raft_install_snapshot_reply *rpy, struct ds *s)
594 {
595 ds_put_format(s, " term=%"PRIu64, rpy->term);
596 ds_put_format(s, " last_index=%"PRIu64, rpy->last_index);
597 ds_put_format(s, " last_term=%"PRIu64, rpy->last_term);
598 }
599 \f
600 /* raft_remove_server_request. */
601
602 static void
603 raft_remove_server_request_uninit(
604 struct raft_remove_server_request *rq OVS_UNUSED)
605 {
606 }
607
608 static void
609 raft_remove_server_request_clone(
610 struct raft_remove_server_request *dst OVS_UNUSED,
611 const struct raft_remove_server_request *src OVS_UNUSED)
612 {
613 }
614
615 static void
616 raft_remove_server_request_to_jsonrpc(
617 const struct raft_remove_server_request *rq, struct json *args)
618 {
619 json_object_put_format(args, "server_id", UUID_FMT, UUID_ARGS(&rq->sid));
620 }
621
622 static void
623 raft_remove_server_request_from_jsonrpc(struct ovsdb_parser *p,
624 struct raft_remove_server_request *rq)
625 {
626 rq->sid = raft_parse_required_uuid(p, "server_id");
627 }
628
629 static void
630 raft_format_remove_server_request(const struct raft_remove_server_request *rq,
631 struct ds *s)
632 {
633 ds_put_format(s, " server="SID_FMT, SID_ARGS(&rq->sid));
634 }
635 \f
636 /* raft_become_leader. */
637
638 static void
639 raft_become_leader_uninit(struct raft_become_leader *rpc OVS_UNUSED)
640 {
641 }
642
643 static void
644 raft_become_leader_clone(struct raft_become_leader *dst OVS_UNUSED,
645 const struct raft_become_leader *src OVS_UNUSED)
646 {
647 }
648
649 static void
650 raft_become_leader_to_jsonrpc(const struct raft_become_leader *rpc,
651 struct json *args)
652 {
653 raft_put_uint64(args, "term", rpc->term);
654 }
655
656 static void
657 raft_become_leader_from_jsonrpc(struct ovsdb_parser *p,
658 struct raft_become_leader *rpc)
659 {
660 rpc->term = raft_parse_required_uint64(p, "term");
661 }
662
663 static void
664 raft_format_become_leader(const struct raft_become_leader *rq, struct ds *s)
665 {
666 ds_put_format(s, " term=%"PRIu64, rq->term);
667 }
668 \f
669 /* raft_execute_command_request. */
670
671 static void
672 raft_execute_command_request_uninit(
673 struct raft_execute_command_request *rq)
674 {
675 json_destroy(rq->data);
676 }
677
678 static void
679 raft_execute_command_request_clone(
680 struct raft_execute_command_request *dst,
681 const struct raft_execute_command_request *src)
682 {
683 dst->data = json_clone(src->data);
684 }
685
686 static void
687 raft_execute_command_request_to_jsonrpc(
688 const struct raft_execute_command_request *rq, struct json *args)
689 {
690 json_object_put(args, "data", json_clone(rq->data));
691 json_object_put_format(args, "prereq", UUID_FMT, UUID_ARGS(&rq->prereq));
692 json_object_put_format(args, "result", UUID_FMT, UUID_ARGS(&rq->result));
693 }
694
695 static void
696 raft_execute_command_request_from_jsonrpc(
697 struct ovsdb_parser *p, struct raft_execute_command_request *rq)
698 {
699 rq->data = json_nullable_clone(ovsdb_parser_member(p, "data",
700 OP_OBJECT | OP_ARRAY));
701 rq->prereq = raft_parse_required_uuid(p, "prereq");
702 rq->result = raft_parse_required_uuid(p, "result");
703 }
704
705 static void
706 raft_format_execute_command_request(
707 const struct raft_execute_command_request *rq, struct ds *s)
708 {
709 ds_put_format(s, " prereq="UUID_FMT, UUID_ARGS(&rq->prereq));
710 ds_put_format(s, " result="UUID_FMT, UUID_ARGS(&rq->result));
711 ds_put_format(s, " data=");
712 json_to_ds(rq->data, JSSF_SORT, s);
713 }
714 \f
715 /* raft_execute_command_reply. */
716
717 static void
718 raft_execute_command_reply_uninit(
719 struct raft_execute_command_reply *rpy OVS_UNUSED)
720 {
721 }
722
723 static void
724 raft_execute_command_reply_clone(
725 struct raft_execute_command_reply *dst OVS_UNUSED,
726 const struct raft_execute_command_reply *src OVS_UNUSED)
727 {
728 }
729
730 static void
731 raft_execute_command_reply_to_jsonrpc(
732 const struct raft_execute_command_reply *rpy, struct json *args)
733 {
734 json_object_put_format(args, "result", UUID_FMT, UUID_ARGS(&rpy->result));
735 json_object_put_string(args, "status",
736 raft_command_status_to_string(rpy->status));
737 if (rpy->commit_index) {
738 raft_put_uint64(args, "commit_index", rpy->commit_index);
739 }
740 }
741
742 static void
743 raft_execute_command_reply_from_jsonrpc(
744 struct ovsdb_parser *p, struct raft_execute_command_reply *rpy)
745 {
746 rpy->result = raft_parse_required_uuid(p, "result");
747
748 const char *status = raft_parse_required_string(p, "status");
749 if (status && !raft_command_status_from_string(status, &rpy->status)) {
750 ovsdb_parser_raise_error(p, "unknown status \"%s\"", status);
751 }
752
753 rpy->commit_index = raft_parse_optional_uint64(p, "commit_index");
754 }
755
756 static void
757 raft_format_execute_command_reply(
758 const struct raft_execute_command_reply *rpy, struct ds *s)
759 {
760 ds_put_format(s, " result="UUID_FMT, UUID_ARGS(&rpy->result));
761 ds_put_format(s, " status=\"%s\"",
762 raft_command_status_to_string(rpy->status));
763 if (rpy->commit_index) {
764 ds_put_format(s, " commit_index=%"PRIu64, rpy->commit_index);
765 }
766 }
767 \f
768 void
769 raft_rpc_uninit(union raft_rpc *rpc)
770 {
771 if (rpc) {
772 free(rpc->common.comment);
773
774 switch (rpc->type) {
775 #define RAFT_RPC(ENUM, NAME) \
776 case ENUM: \
777 raft_##NAME##_uninit(&rpc->NAME); \
778 break;
779 RAFT_RPC_TYPES
780 #undef RAFT_RPC
781 }
782 }
783 }
784
785 union raft_rpc *
786 raft_rpc_clone(const union raft_rpc *src)
787 {
788 union raft_rpc *dst = xmemdup(src, sizeof *src);
789 dst->common.comment = nullable_xstrdup(src->common.comment);
790
791 switch (src->type) {
792 #define RAFT_RPC(ENUM, NAME) \
793 case ENUM: \
794 raft_##NAME##_clone(&dst->NAME, &src->NAME); \
795 break;
796 RAFT_RPC_TYPES
797 #undef RAFT_RPC
798 }
799
800 return dst;
801 }
802
803 /* Returns 'rpc' converted to a jsonrpc_msg. The caller must eventually free
804 * the returned message.
805 *
806 * 'rpc->common.sid' should be the destination server ID; it is omitted if
807 * all-zeros. 'sid' is the source. 'cid' should be the cluster ID; it is
808 * omitted if all-zeros. */
809 struct jsonrpc_msg *
810 raft_rpc_to_jsonrpc(const struct uuid *cid,
811 const struct uuid *sid,
812 const union raft_rpc *rpc)
813 {
814 struct json *args = json_object_create();
815 if (!uuid_is_zero(cid)) {
816 json_object_put_format(args, "cluster", UUID_FMT, UUID_ARGS(cid));
817 }
818 if (!uuid_is_zero(&rpc->common.sid)) {
819 json_object_put_format(args, "to", UUID_FMT,
820 UUID_ARGS(&rpc->common.sid));
821 }
822 json_object_put_format(args, "from", UUID_FMT, UUID_ARGS(sid));
823 if (rpc->common.comment) {
824 json_object_put_string(args, "comment", rpc->common.comment);
825 }
826
827 switch (rpc->type) {
828 #define RAFT_RPC(ENUM, NAME) \
829 case ENUM: \
830 raft_##NAME##_to_jsonrpc(&rpc->NAME, args); \
831 break;
832 RAFT_RPC_TYPES
833 #undef RAFT_RPC
834 default:
835 OVS_NOT_REACHED();
836 }
837
838 return jsonrpc_create_notify(raft_rpc_type_to_string(rpc->type),
839 json_array_create_1(args));
840 }
841
842 /* Parses 'msg' as a Raft message directed to 'sid' and initializes 'rpc'
843 * appropriately. On success, returns NULL and the caller owns the contents of
844 * 'rpc' and must eventually uninitialize it with raft_rpc_uninit(). On
845 * failure, returns an error that the caller must eventually free.
846 *
847 * 'cidp' must point to the Raft cluster's ID. If the cluster ID isn't yet
848 * known, then '*cidp' must be UUID_ZERO and this function will attempt to
849 * initialize it based on 'msg'. */
850 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
851 raft_rpc_from_jsonrpc(struct uuid *cidp, const struct uuid *sid,
852 const struct jsonrpc_msg *msg, union raft_rpc *rpc)
853 {
854 memset(rpc, 0, sizeof *rpc);
855 if (msg->type != JSONRPC_NOTIFY) {
856 return ovsdb_error(NULL, "expecting notify RPC but received %s",
857 jsonrpc_msg_type_to_string(msg->type));
858 }
859
860 if (!raft_rpc_type_from_string(msg->method, &rpc->type)) {
861 return ovsdb_error(NULL, "unknown method %s", msg->method);
862 }
863
864 if (json_array(msg->params)->n != 1) {
865 return ovsdb_error(NULL,
866 "%s RPC has %"PRIuSIZE" parameters (expected 1)",
867 msg->method, json_array(msg->params)->n);
868 }
869
870 struct ovsdb_parser p;
871 ovsdb_parser_init(&p, json_array(msg->params)->elems[0],
872 "raft %s RPC", msg->method);
873
874 bool is_hello = rpc->type == RAFT_RPC_HELLO_REQUEST;
875 bool is_add = rpc->type == RAFT_RPC_ADD_SERVER_REQUEST;
876
877 struct uuid cid;
878 if (raft_parse_uuid(&p, "cluster", is_add, &cid)
879 && !uuid_equals(&cid, cidp)) {
880 if (uuid_is_zero(cidp)) {
881 *cidp = cid;
882 VLOG_INFO("learned cluster ID "CID_FMT, CID_ARGS(&cid));
883 } else {
884 ovsdb_parser_raise_error(&p, "wrong cluster "CID_FMT" "
885 "(expected "CID_FMT")",
886 CID_ARGS(&cid), CID_ARGS(cidp));
887 }
888 }
889
890 struct uuid to_sid;
891 if (raft_parse_uuid(&p, "to", is_add || is_hello, &to_sid)
892 && !uuid_equals(&to_sid, sid)) {
893 ovsdb_parser_raise_error(&p, "misrouted message (addressed to "
894 SID_FMT" but we're "SID_FMT")",
895 SID_ARGS(&to_sid), SID_ARGS(sid));
896 }
897
898 rpc->common.sid = raft_parse_required_uuid(&p, "from");
899 rpc->common.comment = nullable_xstrdup(
900 raft_parse_optional_string(&p, "comment"));
901
902 switch (rpc->type) {
903 #define RAFT_RPC(ENUM, NAME) \
904 case ENUM: \
905 raft_##NAME##_from_jsonrpc(&p, &rpc->NAME); \
906 break;
907 RAFT_RPC_TYPES
908 #undef RAFT_RPC
909
910 default:
911 OVS_NOT_REACHED();
912 }
913
914 struct ovsdb_error *error = ovsdb_parser_finish(&p);
915 if (error) {
916 raft_rpc_uninit(rpc);
917 }
918 return error;
919 }
920
921 /* Appends a formatted representation of 'rpc' to 's'.
922 *
923 * Does not include the RPC's server ID in the formatted representation, since
924 * the caller usually has more context that allows for a more human friendly
925 * name. */
926 void
927 raft_rpc_format(const union raft_rpc *rpc, struct ds *s)
928 {
929 ds_put_cstr(s, raft_rpc_type_to_string(rpc->type));
930 if (rpc->common.comment) {
931 ds_put_format(s, " \"%s\"", rpc->common.comment);
932 }
933 ds_put_char(s, ':');
934
935 switch (rpc->type) {
936 #define RAFT_RPC(ENUM, NAME) \
937 case ENUM: \
938 raft_format_##NAME(&rpc->NAME, s); \
939 break;
940 RAFT_RPC_TYPES
941 #undef RAFT_RPC
942 default:
943 OVS_NOT_REACHED();
944 }
945 }
946
947 uint64_t
948 raft_rpc_get_term(const union raft_rpc *rpc)
949 {
950 switch (rpc->type) {
951 case RAFT_RPC_HELLO_REQUEST:
952 case RAFT_RPC_ADD_SERVER_REQUEST:
953 case RAFT_RPC_ADD_SERVER_REPLY:
954 case RAFT_RPC_REMOVE_SERVER_REQUEST:
955 case RAFT_RPC_REMOVE_SERVER_REPLY:
956 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
957 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
958 return 0;
959
960 case RAFT_RPC_APPEND_REQUEST:
961 return rpc->append_request.term;
962
963 case RAFT_RPC_APPEND_REPLY:
964 return rpc->append_reply.term;
965
966 case RAFT_RPC_VOTE_REQUEST:
967 return rpc->vote_request.term;
968
969 case RAFT_RPC_VOTE_REPLY:
970 return rpc->vote_reply.term;
971
972 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
973 return rpc->install_snapshot_request.term;
974
975 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
976 return rpc->install_snapshot_reply.term;
977
978 case RAFT_RPC_BECOME_LEADER:
979 return rpc->become_leader.term;
980
981 default:
982 OVS_NOT_REACHED();
983 }
984 }
985
986 const struct uuid *
987 raft_rpc_get_vote(const union raft_rpc *rpc)
988 {
989 switch (rpc->type) {
990 case RAFT_RPC_HELLO_REQUEST:
991 case RAFT_RPC_ADD_SERVER_REQUEST:
992 case RAFT_RPC_ADD_SERVER_REPLY:
993 case RAFT_RPC_REMOVE_SERVER_REQUEST:
994 case RAFT_RPC_REMOVE_SERVER_REPLY:
995 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
996 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
997 case RAFT_RPC_APPEND_REQUEST:
998 case RAFT_RPC_APPEND_REPLY:
999 case RAFT_RPC_VOTE_REQUEST:
1000 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
1001 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
1002 case RAFT_RPC_BECOME_LEADER:
1003 return NULL;
1004
1005 case RAFT_RPC_VOTE_REPLY:
1006 return &raft_vote_reply_cast(rpc)->vote;
1007
1008 default:
1009 OVS_NOT_REACHED();
1010 }
1011 }
1012
1013 /* Returns the minimum log index that must be synced to disk if 'rpc' is to be
1014 * sent. (This is generally the biggest log index in the message but some
1015 * messages, e.g. RAFT_RPC_APPEND_REQUEST, don't need their entries synced.) */
1016 uint64_t
1017 raft_rpc_get_min_sync_index(const union raft_rpc *rpc)
1018 {
1019 switch (rpc->type) {
1020 case RAFT_RPC_HELLO_REQUEST:
1021 case RAFT_RPC_ADD_SERVER_REQUEST:
1022 case RAFT_RPC_ADD_SERVER_REPLY:
1023 case RAFT_RPC_REMOVE_SERVER_REQUEST:
1024 case RAFT_RPC_REMOVE_SERVER_REPLY:
1025 case RAFT_RPC_EXECUTE_COMMAND_REQUEST:
1026 case RAFT_RPC_EXECUTE_COMMAND_REPLY:
1027 case RAFT_RPC_APPEND_REQUEST:
1028 case RAFT_RPC_BECOME_LEADER:
1029 case RAFT_RPC_VOTE_REPLY:
1030 return 0;
1031
1032 case RAFT_RPC_APPEND_REPLY:
1033 return raft_append_reply_cast(rpc)->log_end - 1;
1034
1035 case RAFT_RPC_VOTE_REQUEST:
1036 return raft_vote_request_cast(rpc)->last_log_index;
1037
1038 case RAFT_RPC_INSTALL_SNAPSHOT_REQUEST:
1039 return raft_install_snapshot_request_cast(rpc)->last_index;
1040
1041 case RAFT_RPC_INSTALL_SNAPSHOT_REPLY:
1042 /* This will need to change if install_snapshot_reply becomes able to
1043 * report an error */
1044 return raft_install_snapshot_reply_cast(rpc)->last_index;
1045
1046 default:
1047 OVS_NOT_REACHED();
1048 }
1049 }