]> git.proxmox.com Git - pve-cluster.git/blob - data/src/dfsm.c
datacenter config: mark notification settings as deprecated/unused
[pve-cluster.git] / data / src / dfsm.c
1 /*
2 Copyright (C) 2010 Proxmox Server Solutions GmbH
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU Affero General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Affero General Public License for more details.
13
14 You should have received a copy of the GNU Affero General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 Author: Dietmar Maurer <dietmar@proxmox.com>
18
19 */
20
21
22 /* NOTE: we try to keep the CPG handle as long as possible, because
23 * calling cpg_initialize/cpg_finalize multiple times from the
24 * same process confuses corosync.
25 * Note: CS_ERR_LIBRARY is returned when corosync died
26 */
27
28 #ifdef HAVE_CONFIG_H
29 #include <config.h>
30 #endif /* HAVE_CONFIG_H */
31
32 #include <sys/types.h>
33 #include <inttypes.h>
34 #include <unistd.h>
35 #include <string.h>
36 #include <stdlib.h>
37
38 #include <corosync/corotypes.h>
39 #include <corosync/cpg.h>
40 #include <glib.h>
41
42 #include "cfs-utils.h"
43 #include "dfsm.h"
44
/* Tentative definition of the CPG callback table. The deliver and confchg
 * callbacks compare dfsm->cpg_callbacks against its address to sanity-check
 * the context pointer returned by cpg_context_get(). */
static cpg_callbacks_t cpg_callbacks;
46
/* State of the distributed state machine (see dfsm_set_mode() for the
 * transition rule and the log message emitted for each mode). */
typedef enum {
	DFSM_MODE_START = 0,           /* cluster connection is being started */
	DFSM_MODE_START_SYNC = 1,      /* membership changed, data synchronisation started */
	DFSM_MODE_SYNCED = 2,          /* all data is up to date; normal messages are delivered */
	DFSM_MODE_UPDATE = 3,          /* waiting for updates from the leader */

	/* values >= 128 indicates abnormal/error conditions */
	DFSM_ERROR_MODE_START = 128,   /* threshold only: first value treated as error mode */
	DFSM_MODE_LEAVE = 253,         /* leaving the CPG group */
	DFSM_MODE_VERSION_ERROR = 254, /* a peer speaks a newer protocol version */
	DFSM_MODE_ERROR = 255,         /* serious internal error */
} dfsm_mode_t;
59
/* Value of dfsm_message_header_t.type. DFSM_MESSAGE_NORMAL carries a
 * dfsm_message_normal_header_t; every other type carries a
 * dfsm_message_state_header_t (see DFSM_VALID_STATE_MESSAGE). */
typedef enum {
	DFSM_MESSAGE_NORMAL = 0,          /* application message, handed to dfsm_deliver_fn */
	DFSM_MESSAGE_SYNC_START = 1,      /* starts a sync round and announces its epoch */
	DFSM_MESSAGE_STATE = 2,           /* a member's state blob (from dfsm_get_state_fn) */
	DFSM_MESSAGE_UPDATE = 3,          /* update data, handed to dfsm_process_update_fn */
	DFSM_MESSAGE_UPDATE_COMPLETE = 4, /* end of updates; triggers dfsm_commit_fn in UPDATE mode */
	DFSM_MESSAGE_VERIFY_REQUEST = 5,  /* payload: 64-bit checksum id; members compute their checksum */
	DFSM_MESSAGE_VERIFY = 6,          /* payload: 64-bit checksum id followed by the checksum */
} dfsm_message_t;
69
/* True when mt is one of the state message types
 * (DFSM_MESSAGE_SYNC_START .. DFSM_MESSAGE_VERIFY); DFSM_MESSAGE_NORMAL is
 * not a state message. The argument is parenthesised so that expression
 * arguments (e.g. a conditional expression) are evaluated as a unit.
 * Note: mt is evaluated twice - do not pass expressions with side effects. */
#define DFSM_VALID_STATE_MESSAGE(mt) \
	((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
71
/* Common header at the start of every dfsm message. The struct is sent
 * as-is, so the field order and sizes are the wire format. */
typedef struct {
	uint16_t type;             /* a dfsm_message_t value */
	uint16_t subtype;          /* caller-supplied msgtype for normal messages, 0 for state messages */
	uint32_t protocol_version; /* sender's dfsm->protocol_version */
	uint32_t time;             /* time(NULL) on the sender when the message was built */
	uint32_t reserved;         /* always sent as 0 */
} dfsm_message_header_t;
79
/* Identifies one synchronisation round. A new value is created locally on
 * every configuration change and replaced by the one carried in a
 * DFSM_MESSAGE_SYNC_START message. Epochs are compared bytewise (memcmp). */
typedef struct {
	uint32_t epoch; // per process (not globally unique)
	uint32_t time;   /* time(NULL) when the epoch was created */
	uint32_t nodeid; /* node id of the creating process */
	uint32_t pid;    /* pid of the creating process */
} dfsm_sync_epoch_t;
86
/* Header of every state message (all types except DFSM_MESSAGE_NORMAL):
 * the common header followed by the sync epoch the message belongs to. */
typedef struct {
	dfsm_message_header_t base;
	dfsm_sync_epoch_t epoch;
} dfsm_message_state_header_t;
91
/* Header of a DFSM_MESSAGE_NORMAL message: the common header followed by
 * the sender's per-process message counter (dfsm->msgcount at send time). */
typedef struct {
	dfsm_message_header_t base;
	uint64_t count;
} dfsm_message_normal_header_t;
96
/* A received normal message that could not be delivered yet and is parked
 * in dfsm->msg_queue or dfsm->sync_queue. */
typedef struct {
	uint32_t nodeid;    /* sender node id */
	uint32_t pid;       /* sender pid */
	uint64_t msg_count; /* count field of the message header; sort key of msg_queue */
	void *msg;          /* heap copy of the complete message (header included), released with g_free() */
	int msg_len; // fixme: unsigned?
} dfsm_queued_message_t;
104
/* Per-group state of the distributed finite state machine. */
struct dfsm {
	const char *log_domain;            /* domain passed to the cfs_dom_* logging macros */
	cpg_callbacks_t *cpg_callbacks;    /* expected to point at the file-level cpg_callbacks table */
	dfsm_callbacks_t *dfsm_callbacks;  /* user callbacks (deliver, get state, process update, ...) */
	cpg_handle_t cpg_handle;
	struct cpg_name cpg_group_name;
	uint32_t nodeid;                   /* our own node id */
	uint32_t pid;                      /* our own pid */
	int we_are_member;                 /* set when the last member list contained nodeid/pid */

	guint32 protocol_version;
	gpointer data;                     /* opaque user pointer handed to every dfsm callback */

	gboolean joined;

	/* mode is protected with mode_mutex */
	GMutex mode_mutex;
	dfsm_mode_t mode;

	GHashTable *members; /* contains dfsm_node_info_t pointers */
	dfsm_sync_info_t *sync_info;       /* member array for the current configuration */
	uint32_t local_epoch_counter;      /* incremented on every configuration change */
	dfsm_sync_epoch_t sync_epoch;      /* epoch state messages must carry to be accepted */
	uint32_t lowest_nodeid;            /* lowest node id in the current member list */
	GSequence *msg_queue;              /* queued dfsm_queued_message_t, sorted by msg_count */
	GList *sync_queue;                 /* messages from synced members queued while in UPDATE mode */

	/* synchronous message transmission, protected with sync_mutex */
	GMutex sync_mutex;
	GCond sync_cond;
	GHashTable *results;               /* dfsm_result_t of pending synchronous sends, keyed by msgcount */
	uint64_t msgcount;                 /* counter of locally sent normal messages */
	uint64_t msgcount_rcvd;            /* count of the last own message that was delivered (or aborted) */

	/* state verification */
	guchar csum[32];
	dfsm_sync_epoch_t csum_epoch;
	uint64_t csum_id;
	uint64_t csum_counter;
};
145
/* Forward declarations: dfsm_cpg_deliver_callback() calls these and they in
 * turn call dfsm_cpg_deliver_callback(). */
static gboolean dfsm_deliver_queue(dfsm_t *dfsm);
static gboolean dfsm_deliver_sync_queue(dfsm_t *dfsm);
148
149 gboolean
150 dfsm_nodeid_is_local(
151 dfsm_t *dfsm,
152 uint32_t nodeid,
153 uint32_t pid)
154 {
155 g_return_val_if_fail(dfsm != NULL, FALSE);
156
157 return (nodeid == dfsm->nodeid && pid == dfsm->pid);
158 }
159
160
161 static void
162 dfsm_send_sync_message_abort(dfsm_t *dfsm)
163 {
164 g_return_if_fail(dfsm != NULL);
165
166 g_mutex_lock (&dfsm->sync_mutex);
167 dfsm->msgcount_rcvd = dfsm->msgcount;
168 g_cond_broadcast (&dfsm->sync_cond);
169 g_mutex_unlock (&dfsm->sync_mutex);
170 }
171
172 static void
173 dfsm_record_local_result(
174 dfsm_t *dfsm,
175 uint64_t msg_count,
176 int msg_result,
177 gboolean processed)
178 {
179 g_return_if_fail(dfsm != NULL);
180 g_return_if_fail(dfsm->results != NULL);
181
182 g_mutex_lock (&dfsm->sync_mutex);
183 dfsm_result_t *rp = (dfsm_result_t *)g_hash_table_lookup(dfsm->results, &msg_count);
184 if (rp) {
185 rp->result = msg_result;
186 rp->processed = processed;
187 }
188 dfsm->msgcount_rcvd = msg_count;
189 g_cond_broadcast (&dfsm->sync_cond);
190 g_mutex_unlock (&dfsm->sync_mutex);
191 }
192
193 static cs_error_t
194 dfsm_send_message_full(
195 dfsm_t *dfsm,
196 struct iovec *iov,
197 unsigned int len,
198 int retry)
199 {
200 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
201 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
202
203 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
204 cs_error_t result;
205 int retries = 0;
206 loop:
207 result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
208 if (retry && result == CS_ERR_TRY_AGAIN) {
209 nanosleep(&tvreq, NULL);
210 ++retries;
211 if ((retries % 10) == 0)
212 cfs_dom_message(dfsm->log_domain, "cpg_send_message retry %d", retries);
213 if (retries < 100)
214 goto loop;
215 }
216
217 if (retries)
218 cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries);
219
220 if (result != CS_OK &&
221 (!retry || result != CS_ERR_TRY_AGAIN))
222 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
223
224 return result;
225 }
226
227 static cs_error_t
228 dfsm_send_state_message_full(
229 dfsm_t *dfsm,
230 uint16_t type,
231 struct iovec *iov,
232 unsigned int len)
233 {
234 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
235 g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type), CS_ERR_INVALID_PARAM);
236 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
237
238 dfsm_message_state_header_t header;
239 header.base.type = type;
240 header.base.subtype = 0;
241 header.base.protocol_version = dfsm->protocol_version;
242 header.base.time = time(NULL);
243 header.base.reserved = 0;
244
245 header.epoch = dfsm->sync_epoch;
246
247 struct iovec real_iov[len + 1];
248
249 real_iov[0].iov_base = (char *)&header;
250 real_iov[0].iov_len = sizeof(header);
251
252 for (int i = 0; i < len; i++)
253 real_iov[i + 1] = iov[i];
254
255 return dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
256 }
257
258 cs_error_t
259 dfsm_send_update(
260 dfsm_t *dfsm,
261 struct iovec *iov,
262 unsigned int len)
263 {
264 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len);
265 }
266
267 cs_error_t
268 dfsm_send_update_complete(dfsm_t *dfsm)
269 {
270 return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0);
271 }
272
273
274 cs_error_t
275 dfsm_send_message(
276 dfsm_t *dfsm,
277 uint16_t msgtype,
278 struct iovec *iov,
279 int len)
280 {
281 return dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL);
282 }
283
284 cs_error_t
285 dfsm_send_message_sync(
286 dfsm_t *dfsm,
287 uint16_t msgtype,
288 struct iovec *iov,
289 int len,
290 dfsm_result_t *rp)
291 {
292 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
293 g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
294
295 g_mutex_lock (&dfsm->sync_mutex);
296 /* note: hold lock until message is sent - to guarantee ordering */
297 uint64_t msgcount = ++dfsm->msgcount;
298 if (rp) {
299 rp->msgcount = msgcount;
300 rp->processed = 0;
301 g_hash_table_replace(dfsm->results, &rp->msgcount, rp);
302 }
303
304 dfsm_message_normal_header_t header;
305 header.base.type = DFSM_MESSAGE_NORMAL;
306 header.base.subtype = msgtype;
307 header.base.protocol_version = dfsm->protocol_version;
308 header.base.time = time(NULL);
309 header.base.reserved = 0;
310 header.count = msgcount;
311
312 struct iovec real_iov[len + 1];
313
314 real_iov[0].iov_base = (char *)&header;
315 real_iov[0].iov_len = sizeof(header);
316
317 for (int i = 0; i < len; i++)
318 real_iov[i + 1] = iov[i];
319
320 cs_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
321
322 g_mutex_unlock (&dfsm->sync_mutex);
323
324 if (result != CS_OK) {
325 cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
326
327 if (rp) {
328 g_mutex_lock (&dfsm->sync_mutex);
329 g_hash_table_remove(dfsm->results, &rp->msgcount);
330 g_mutex_unlock (&dfsm->sync_mutex);
331 }
332 return result;
333 }
334
335 if (rp) {
336 g_mutex_lock (&dfsm->sync_mutex);
337
338 while (dfsm->msgcount_rcvd < msgcount)
339 g_cond_wait (&dfsm->sync_cond, &dfsm->sync_mutex);
340
341
342 g_hash_table_remove(dfsm->results, &rp->msgcount);
343
344 g_mutex_unlock (&dfsm->sync_mutex);
345
346 return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION;
347 }
348
349 return CS_OK;
350 }
351
352 static gboolean
353 dfsm_send_checksum(dfsm_t *dfsm)
354 {
355 g_return_val_if_fail(dfsm != NULL, FALSE);
356
357 int len = 2;
358 struct iovec iov[len];
359
360 iov[0].iov_base = (char *)&dfsm->csum_id;
361 iov[0].iov_len = sizeof(dfsm->csum_id);
362 iov[1].iov_base = dfsm->csum;
363 iov[1].iov_len = sizeof(dfsm->csum);
364
365 gboolean res = (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY, iov, len) == CS_OK);
366
367 return res;
368 }
369
370 static void
371 dfsm_free_queue_entry(gpointer data)
372 {
373 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)data;
374 g_free (qm->msg);
375 g_free (qm);
376 }
377
378 static void
379 dfsm_free_message_queue(dfsm_t *dfsm)
380 {
381 g_return_if_fail(dfsm != NULL);
382 g_return_if_fail(dfsm->msg_queue != NULL);
383
384 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
385 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
386 while (iter != end) {
387 GSequenceIter *cur = iter;
388 iter = g_sequence_iter_next(iter);
389 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
390 g_sequence_get(cur);
391 dfsm_free_queue_entry(qm);
392 g_sequence_remove(cur);
393 }
394 }
395
396 static void
397 dfsm_free_sync_queue(dfsm_t *dfsm)
398 {
399 g_return_if_fail(dfsm != NULL);
400
401 GList *iter = dfsm->sync_queue;
402 while (iter) {
403 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
404 iter = g_list_next(iter);
405 dfsm_free_queue_entry(qm);
406 }
407
408 g_list_free(dfsm->sync_queue);
409 dfsm->sync_queue = NULL;
410 }
411
412 static gint
413 message_queue_sort_fn(
414 gconstpointer a,
415 gconstpointer b,
416 gpointer user_data)
417 {
418 return ((dfsm_queued_message_t *)a)->msg_count -
419 ((dfsm_queued_message_t *)b)->msg_count;
420 }
421
422 static dfsm_node_info_t *
423 dfsm_node_info_lookup(
424 dfsm_t *dfsm,
425 uint32_t nodeid,
426 uint32_t pid)
427 {
428 g_return_val_if_fail(dfsm != NULL, NULL);
429 g_return_val_if_fail(dfsm->members != NULL, NULL);
430
431 dfsm_node_info_t info = { .nodeid = nodeid, .pid = pid };
432
433 return (dfsm_node_info_t *)g_hash_table_lookup(dfsm->members, &info);
434 }
435
436 static dfsm_queued_message_t *
437 dfsm_queue_add_message(
438 dfsm_t *dfsm,
439 uint32_t nodeid,
440 uint32_t pid,
441 uint64_t msg_count,
442 const void *msg,
443 size_t msg_len)
444 {
445 g_return_val_if_fail(dfsm != NULL, NULL);
446 g_return_val_if_fail(msg != NULL, NULL);
447 g_return_val_if_fail(msg_len != 0, NULL);
448
449 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, nodeid, pid);
450 if (!ni) {
451 cfs_dom_critical(dfsm->log_domain, "dfsm_node_info_lookup failed");
452 return NULL;
453 }
454
455 dfsm_queued_message_t *qm = g_new0(dfsm_queued_message_t, 1);
456 g_return_val_if_fail(qm != NULL, NULL);
457
458 qm->nodeid = nodeid;
459 qm->pid = pid;
460 qm->msg = g_memdup (msg, msg_len);
461 qm->msg_len = msg_len;
462 qm->msg_count = msg_count;
463
464 if (dfsm->mode == DFSM_MODE_UPDATE && ni->synced) {
465 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
466 } else {
467 /* NOTE: we only need to sort the queue because we resend all
468 * queued messages sometimes.
469 */
470 g_sequence_insert_sorted(dfsm->msg_queue, qm, message_queue_sort_fn, NULL);
471 }
472
473 return qm;
474 }
475
476 static guint
477 dfsm_sync_info_hash(gconstpointer key)
478 {
479 dfsm_node_info_t *info = (dfsm_node_info_t *)key;
480
481 return g_int_hash(&info->nodeid) + g_int_hash(&info->pid);
482 }
483
484 static gboolean
485 dfsm_sync_info_equal(
486 gconstpointer v1,
487 gconstpointer v2)
488 {
489 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
490 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
491
492 if (info1->nodeid == info2->nodeid &&
493 info1->pid == info2->pid)
494 return TRUE;
495
496 return FALSE;
497 }
498
499 static int
500 dfsm_sync_info_compare(
501 gconstpointer v1,
502 gconstpointer v2)
503 {
504 dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1;
505 dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2;
506
507 if (info1->nodeid != info2->nodeid)
508 return info1->nodeid - info2->nodeid;
509
510 return info1->pid - info2->pid;
511 }
512
513 static void
514 dfsm_set_mode(
515 dfsm_t *dfsm,
516 dfsm_mode_t new_mode)
517 {
518 g_return_if_fail(dfsm != NULL);
519
520 cfs_debug("dfsm_set_mode - set mode to %d", new_mode);
521
522 int changed = 0;
523 g_mutex_lock (&dfsm->mode_mutex);
524 if (dfsm->mode != new_mode) {
525 if (new_mode < DFSM_ERROR_MODE_START ||
526 (dfsm->mode < DFSM_ERROR_MODE_START || new_mode >= dfsm->mode)) {
527 dfsm->mode = new_mode;
528 changed = 1;
529 }
530 }
531 g_mutex_unlock (&dfsm->mode_mutex);
532
533 if (!changed)
534 return;
535
536 if (new_mode == DFSM_MODE_START) {
537 cfs_dom_message(dfsm->log_domain, "start cluster connection");
538 } else if (new_mode == DFSM_MODE_START_SYNC) {
539 cfs_dom_message(dfsm->log_domain, "starting data syncronisation");
540 } else if (new_mode == DFSM_MODE_SYNCED) {
541 cfs_dom_message(dfsm->log_domain, "all data is up to date");
542 if (dfsm->dfsm_callbacks->dfsm_synced_fn)
543 dfsm->dfsm_callbacks->dfsm_synced_fn(dfsm);
544 } else if (new_mode == DFSM_MODE_UPDATE) {
545 cfs_dom_message(dfsm->log_domain, "waiting for updates from leader");
546 } else if (new_mode == DFSM_MODE_LEAVE) {
547 cfs_dom_critical(dfsm->log_domain, "leaving CPG group");
548 } else if (new_mode == DFSM_MODE_ERROR) {
549 cfs_dom_critical(dfsm->log_domain, "serious internal error - stop cluster connection");
550 } else if (new_mode == DFSM_MODE_VERSION_ERROR) {
551 cfs_dom_critical(dfsm->log_domain, "detected newer protocol - please update this node");
552 }
553 }
554
555 static dfsm_mode_t
556 dfsm_get_mode(dfsm_t *dfsm)
557 {
558 g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR);
559
560 g_mutex_lock (&dfsm->mode_mutex);
561 dfsm_mode_t mode = dfsm->mode;
562 g_mutex_unlock (&dfsm->mode_mutex);
563
564 return mode;
565 }
566
567 gboolean
568 dfsm_restartable(dfsm_t *dfsm)
569 {
570 dfsm_mode_t mode = dfsm_get_mode(dfsm);
571
572 return !(mode == DFSM_MODE_ERROR ||
573 mode == DFSM_MODE_VERSION_ERROR);
574 }
575
576 void
577 dfsm_set_errormode(dfsm_t *dfsm)
578 {
579 dfsm_set_mode(dfsm, DFSM_MODE_ERROR);
580 }
581
582 static void
583 dfsm_release_sync_resources(
584 dfsm_t *dfsm,
585 const struct cpg_address *member_list,
586 size_t member_list_entries)
587 {
588 g_return_if_fail(dfsm != NULL);
589 g_return_if_fail(dfsm->members != NULL);
590 g_return_if_fail(!member_list_entries || member_list != NULL);
591
592 cfs_debug("enter dfsm_release_sync_resources");
593
594 if (dfsm->sync_info) {
595
596 if (dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) {
597 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
598 dfsm->sync_info->data = NULL;
599 }
600
601 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
602 if (dfsm->sync_info->nodes[i].state) {
603 g_free(dfsm->sync_info->nodes[i].state);
604 dfsm->sync_info->nodes[i].state = NULL;
605 dfsm->sync_info->nodes[i].state_len = 0;
606 }
607 }
608 }
609
610 if (member_list) {
611
612 g_hash_table_remove_all(dfsm->members);
613
614 if (dfsm->sync_info)
615 g_free(dfsm->sync_info);
616
617 int size = sizeof(dfsm_sync_info_t) +
618 member_list_entries*sizeof(dfsm_sync_info_t);
619 dfsm_sync_info_t *sync_info = dfsm->sync_info = g_malloc0(size);
620 sync_info->node_count = member_list_entries;
621
622 for (int i = 0; i < member_list_entries; i++) {
623 sync_info->nodes[i].nodeid = member_list[i].nodeid;
624 sync_info->nodes[i].pid = member_list[i].pid;
625 }
626
627 qsort(sync_info->nodes, member_list_entries, sizeof(dfsm_node_info_t),
628 dfsm_sync_info_compare);
629
630 for (int i = 0; i < member_list_entries; i++) {
631 dfsm_node_info_t *info = &sync_info->nodes[i];
632 g_hash_table_insert(dfsm->members, info, info);
633 if (info->nodeid == dfsm->nodeid && info->pid == dfsm->pid)
634 sync_info->local = info;
635 }
636 }
637 }
638
639 static void
640 dfsm_cpg_deliver_callback(
641 cpg_handle_t handle,
642 const struct cpg_name *group_name,
643 uint32_t nodeid,
644 uint32_t pid,
645 void *msg,
646 size_t msg_len)
647 {
648 cs_error_t result;
649
650 dfsm_t *dfsm = NULL;
651 result = cpg_context_get(handle, (gpointer *)&dfsm);
652 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
653 cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
654 return; /* we have no valid dfsm pointer, so we can just ignore this */
655 }
656 dfsm_mode_t mode = dfsm_get_mode(dfsm);
657
658 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
659
660 if (mode >= DFSM_ERROR_MODE_START) {
661 cfs_dom_debug(dfsm->log_domain, "error mode - ignoring message");
662 goto leave;
663 }
664
665 if (!dfsm->sync_info) {
666 cfs_dom_critical(dfsm->log_domain, "no dfsm_sync_info - internal error");
667 goto leave;
668 }
669
670 if (msg_len < sizeof(dfsm_message_header_t)) {
671 cfs_dom_critical(dfsm->log_domain, "received short message (%zd bytes)", msg_len);
672 goto leave;
673 }
674
675 dfsm_message_header_t *base_header = (dfsm_message_header_t *)msg;
676
677 if (base_header->protocol_version > dfsm->protocol_version) {
678 cfs_dom_critical(dfsm->log_domain, "received message with protocol version %d",
679 base_header->protocol_version);
680 dfsm_set_mode(dfsm, DFSM_MODE_VERSION_ERROR);
681 return;
682 } else if (base_header->protocol_version < dfsm->protocol_version) {
683 cfs_dom_message(dfsm->log_domain, "ignore message with wrong protocol version %d",
684 base_header->protocol_version);
685 return;
686 }
687
688 if (base_header->type == DFSM_MESSAGE_NORMAL) {
689
690 dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
691
692 if (msg_len < sizeof(dfsm_message_normal_header_t)) {
693 cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %zd bytes)",
694 base_header->type, base_header->subtype, msg_len);
695 goto leave;
696 }
697
698 if (mode != DFSM_MODE_SYNCED) {
699 cfs_dom_debug(dfsm->log_domain, "queue message %" PRIu64 " (subtype = %d, length = %zd)",
700 header->count, base_header->subtype, msg_len);
701
702 if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
703 goto leave;
704 } else {
705
706 int msg_res = -1;
707 int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
708 dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype,
709 base_header->time, msg + sizeof(dfsm_message_normal_header_t),
710 msg_len - sizeof(dfsm_message_normal_header_t));
711
712 if (nodeid == dfsm->nodeid && pid == dfsm->pid)
713 dfsm_record_local_result(dfsm, header->count, msg_res, res);
714
715 if (res < 0)
716 goto leave;
717 }
718
719 return;
720 }
721
722 /* state related messages
723 * we needs right epoch - else we simply discard the message
724 */
725
726 dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
727
728 if (msg_len < sizeof(dfsm_message_state_header_t)) {
729 cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %zd bytes)",
730 base_header->type, base_header->subtype, msg_len);
731 goto leave;
732 }
733
734 if (base_header->type != DFSM_MESSAGE_SYNC_START &&
735 (memcmp(&header->epoch, &dfsm->sync_epoch, sizeof(dfsm_sync_epoch_t)) != 0)) {
736 cfs_dom_debug(dfsm->log_domain, "ignore message (msg_type == %d) with "
737 "wrong epoch (epoch %d/%d/%08X)", base_header->type,
738 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
739 return;
740 }
741
742 msg += sizeof(dfsm_message_state_header_t);
743 msg_len -= sizeof(dfsm_message_state_header_t);
744
745 if (mode == DFSM_MODE_SYNCED) {
746 if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
747
748 for (int i = 0; i < dfsm->sync_info->node_count; i++)
749 dfsm->sync_info->nodes[i].synced = 1;
750
751 if (!dfsm_deliver_queue(dfsm))
752 goto leave;
753
754 return;
755
756 } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
757
758 if (msg_len != sizeof(dfsm->csum_counter)) {
759 cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%zd bytes) form node %d/%d", msg_len, nodeid, pid);
760 goto leave;
761 }
762
763 uint64_t csum_id = *((uint64_t *)msg);
764 msg += 8; msg_len -= 8;
765
766 cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016" PRIX64, nodeid, csum_id);
767
768 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
769 if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
770 dfsm, dfsm->data, dfsm->csum, sizeof(dfsm->csum))) {
771 cfs_dom_critical(dfsm->log_domain, "unable to compute data checksum");
772 goto leave;
773 }
774
775 dfsm->csum_epoch = header->epoch;
776 dfsm->csum_id = csum_id;
777
778 if (nodeid == dfsm->nodeid && pid == dfsm->pid) {
779 if (!dfsm_send_checksum(dfsm))
780 goto leave;
781 }
782 }
783
784 return;
785
786 } else if (base_header->type == DFSM_MESSAGE_VERIFY) {
787
788 cfs_dom_debug(dfsm->log_domain, "received verify message");
789
790 if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
791
792 if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
793 cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%zd bytes)", msg_len);
794 goto leave;
795 }
796
797 uint64_t csum_id = *((uint64_t *)msg);
798 msg += 8; msg_len -= 8;
799
800 if (dfsm->csum_id == csum_id &&
801 (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
802 if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
803 cfs_dom_critical(dfsm->log_domain, "wrong checksum %016" PRIX64 " != %016" PRIX64 " - restarting",
804 *(uint64_t *)msg, *(uint64_t *)dfsm->csum);
805 goto leave;
806 } else {
807 cfs_dom_message(dfsm->log_domain, "data verification successful");
808 }
809 } else {
810 cfs_dom_message(dfsm->log_domain, "skip verification - no checksum saved");
811 }
812 }
813
814 return;
815
816 } else {
817 /* ignore (we already got all required updates, or we are leader) */
818 cfs_dom_debug(dfsm->log_domain, "ignore state sync message %d",
819 base_header->type);
820 return;
821 }
822
823 } else if (mode == DFSM_MODE_START_SYNC) {
824
825 if (base_header->type == DFSM_MESSAGE_SYNC_START) {
826
827 if (nodeid != dfsm->lowest_nodeid) {
828 cfs_dom_critical(dfsm->log_domain, "ignore sync request from wrong member %d/%d",
829 nodeid, pid);
830 }
831
832 cfs_dom_message(dfsm->log_domain, "received sync request (epoch %d/%d/%08X)",
833 header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
834
835 dfsm->sync_epoch = header->epoch;
836
837 dfsm_release_sync_resources(dfsm, NULL, 0);
838
839 unsigned int state_len = 0;
840 gpointer state = NULL;
841
842 state = dfsm->dfsm_callbacks->dfsm_get_state_fn(dfsm, dfsm->data, &state_len);
843
844 if (!(state && state_len)) {
845 cfs_dom_critical(dfsm->log_domain, "dfsm_get_state_fn failed");
846 goto leave;
847 }
848
849 struct iovec iov[1];
850 iov[0].iov_base = state;
851 iov[0].iov_len = state_len;
852
853 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_STATE, iov, 1);
854
855 if (state)
856 g_free(state);
857
858 if (result != CS_OK)
859 goto leave;
860
861 return;
862
863 } else if (base_header->type == DFSM_MESSAGE_STATE) {
864
865 dfsm_node_info_t *ni;
866
867 if (!(ni = dfsm_node_info_lookup(dfsm, nodeid, pid))) {
868 cfs_dom_critical(dfsm->log_domain, "received state for non-member %d/%d", nodeid, pid);
869 goto leave;
870 }
871
872 if (ni->state) {
873 cfs_dom_critical(dfsm->log_domain, "received duplicate state for member %d/%d", nodeid, pid);
874 goto leave;
875 }
876
877 ni->state = g_memdup(msg, msg_len);
878 ni->state_len = msg_len;
879
880 int received_all = 1;
881 for (int i = 0; i < dfsm->sync_info->node_count; i++) {
882 if (!dfsm->sync_info->nodes[i].state) {
883 received_all = 0;
884 break;
885 }
886 }
887
888 if (received_all) {
889 cfs_dom_message(dfsm->log_domain, "received all states");
890
891 int res = dfsm->dfsm_callbacks->dfsm_process_state_update_fn(dfsm, dfsm->data, dfsm->sync_info);
892 if (res < 0)
893 goto leave;
894
895 if (dfsm->sync_info->local->synced) {
896 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
897 dfsm_release_sync_resources(dfsm, NULL, 0);
898
899 if (!dfsm_deliver_queue(dfsm))
900 goto leave;
901
902 } else {
903 dfsm_set_mode(dfsm, DFSM_MODE_UPDATE);
904
905 if (!dfsm_deliver_queue(dfsm))
906 goto leave;
907 }
908
909 }
910
911 return;
912 }
913
914 } else if (mode == DFSM_MODE_UPDATE) {
915
916 if (base_header->type == DFSM_MESSAGE_UPDATE) {
917
918 int res = dfsm->dfsm_callbacks->dfsm_process_update_fn(
919 dfsm, dfsm->data, dfsm->sync_info, nodeid, pid, msg, msg_len);
920
921 if (res < 0)
922 goto leave;
923
924 return;
925
926 } else if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
927
928
929 int res = dfsm->dfsm_callbacks->dfsm_commit_fn(dfsm, dfsm->data, dfsm->sync_info);
930
931 if (res < 0)
932 goto leave;
933
934 for (int i = 0; i < dfsm->sync_info->node_count; i++)
935 dfsm->sync_info->nodes[i].synced = 1;
936
937 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
938
939 if (!dfsm_deliver_sync_queue(dfsm))
940 goto leave;
941
942 if (!dfsm_deliver_queue(dfsm))
943 goto leave;
944
945 dfsm_release_sync_resources(dfsm, NULL, 0);
946
947 return;
948 }
949
950 } else {
951 cfs_dom_critical(dfsm->log_domain, "internal error - unknown mode %d", mode);
952 goto leave;
953 }
954
955 if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST ||
956 base_header->type == DFSM_MESSAGE_VERIFY) {
957
958 cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
959
960 } else {
961 cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %zd bytes)",
962 base_header->type, msg_len);
963 goto leave;
964 }
965
966 leave:
967 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
968 dfsm_release_sync_resources(dfsm, NULL, 0);
969 return;
970 }
971
972 static gboolean
973 dfsm_resend_queue(dfsm_t *dfsm)
974 {
975 g_return_val_if_fail(dfsm != NULL, FALSE);
976 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
977
978 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
979 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
980 gboolean res = TRUE;
981
982 while (iter != end) {
983 GSequenceIter *cur = iter;
984 iter = g_sequence_iter_next(iter);
985
986 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
987 g_sequence_get(cur);
988
989 if (qm->nodeid == dfsm->nodeid && qm->pid == dfsm->pid) {
990 cs_error_t result;
991 struct iovec iov[1];
992 iov[0].iov_base = qm->msg;
993 iov[0].iov_len = qm->msg_len;
994
995 if ((result = dfsm_send_message_full(dfsm, iov, 1, 1)) != CS_OK) {
996 res = FALSE;
997 break;
998 }
999 }
1000 }
1001
1002 dfsm_free_message_queue(dfsm);
1003
1004 return res;
1005 }
1006
1007 static gboolean
1008 dfsm_deliver_sync_queue(dfsm_t *dfsm)
1009 {
1010 g_return_val_if_fail(dfsm != NULL, FALSE);
1011
1012 if (!dfsm->sync_queue)
1013 return TRUE;
1014
1015 gboolean res = TRUE;
1016
1017 // fixme: cfs_debug
1018 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__,
1019 g_list_length(dfsm->sync_queue));
1020
1021 GList *iter = dfsm->sync_queue;
1022 while (iter) {
1023 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data;
1024 iter = g_list_next(iter);
1025
1026 if (res && dfsm->mode == DFSM_MODE_SYNCED) {
1027 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1028 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1029 } else {
1030 res = FALSE;
1031 }
1032
1033 dfsm_free_queue_entry(qm);
1034 }
1035 g_list_free(dfsm->sync_queue);
1036 dfsm->sync_queue = NULL;
1037
1038 return res;
1039 }
1040
1041 static gboolean
1042 dfsm_deliver_queue(dfsm_t *dfsm)
1043 {
1044 g_return_val_if_fail(dfsm != NULL, FALSE);
1045 g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
1046
1047 int qlen = g_sequence_get_length(dfsm->msg_queue);
1048 if (!qlen)
1049 return TRUE;
1050
1051 GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
1052 GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
1053 gboolean res = TRUE;
1054
1055 // fixme: cfs_debug
1056 cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, qlen);
1057
1058 while (iter != end) {
1059 GSequenceIter *cur = iter;
1060 iter = g_sequence_iter_next(iter);
1061
1062 dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
1063 g_sequence_get(cur);
1064
1065 dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, qm->nodeid, qm->pid);
1066 if (!ni) {
1067 cfs_dom_message(dfsm->log_domain, "remove message from non-member %d/%d",
1068 qm->nodeid, qm->pid);
1069 dfsm_free_queue_entry(qm);
1070 g_sequence_remove(cur);
1071 continue;
1072 }
1073
1074 if (dfsm->mode == DFSM_MODE_SYNCED) {
1075 if (ni->synced) {
1076 dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
1077 qm->nodeid, qm->pid, qm->msg, qm->msg_len);
1078 dfsm_free_queue_entry(qm);
1079 g_sequence_remove(cur);
1080 }
1081 } else if (dfsm->mode == DFSM_MODE_UPDATE) {
1082 if (ni->synced) {
1083 dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
1084 g_sequence_remove(cur);
1085 }
1086 } else {
1087 res = FALSE;
1088 break;
1089 }
1090 }
1091
1092 return res;
1093 }
1094
1095 static void
1096 dfsm_cpg_confchg_callback(
1097 cpg_handle_t handle,
1098 const struct cpg_name *group_name,
1099 const struct cpg_address *member_list,
1100 size_t member_list_entries,
1101 const struct cpg_address *left_list,
1102 size_t left_list_entries,
1103 const struct cpg_address *joined_list,
1104 size_t joined_list_entries)
1105 {
1106 cs_error_t result;
1107
1108 dfsm_t *dfsm = NULL;
1109 result = cpg_context_get(handle, (gpointer *)&dfsm);
1110 if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
1111 cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
1112 return; /* we have no valid dfsm pointer, so we can just ignore this */
1113 }
1114
1115 dfsm->we_are_member = 0;
1116
1117 /* create new epoch */
1118 dfsm->local_epoch_counter++;
1119 dfsm->sync_epoch.epoch = dfsm->local_epoch_counter;
1120 dfsm->sync_epoch.nodeid = dfsm->nodeid;
1121 dfsm->sync_epoch.pid = dfsm->pid;
1122 dfsm->sync_epoch.time = time(NULL);
1123
1124 /* invalidate saved checksum */
1125 dfsm->csum_id = dfsm->csum_counter;
1126 memset(&dfsm->csum_epoch, 0, sizeof(dfsm->csum_epoch));
1127
1128 dfsm_free_sync_queue(dfsm);
1129
1130 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1131
1132 cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
1133
1134 if (mode >= DFSM_ERROR_MODE_START) {
1135 cfs_dom_debug(dfsm->log_domain, "already left group - ignore message");
1136 return;
1137 }
1138
1139 int lowest_nodeid = 0;
1140 GString *str = g_string_new("members: ");
1141 for (int i = 0; i < member_list_entries; i++) {
1142
1143 g_string_append_printf(str, i ? ", %d/%d" : "%d/%d",
1144 member_list[i].nodeid, member_list[i].pid);
1145
1146 if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
1147 lowest_nodeid = member_list[i].nodeid;
1148
1149 if (member_list[i].nodeid == dfsm->nodeid &&
1150 member_list[i].pid == dfsm->pid)
1151 dfsm->we_are_member = 1;
1152 }
1153
1154
1155 if ((dfsm->we_are_member || mode != DFSM_MODE_START))
1156 cfs_dom_message(dfsm->log_domain, str->str);
1157
1158 g_string_free(str, 1);
1159
1160 dfsm->lowest_nodeid = lowest_nodeid;
1161
1162 /* NOTE: one node can be in left and joined list at the same time,
1163 so it is better to query member list. Also JOIN/LEAVE list are
1164 different on different nodes!
1165 */
1166
1167 dfsm_release_sync_resources(dfsm, member_list, member_list_entries);
1168
1169 if (!dfsm->we_are_member) {
1170 if (mode == DFSM_MODE_START) {
1171 cfs_dom_debug(dfsm->log_domain, "ignore leave message");
1172 return;
1173 }
1174 cfs_dom_message(dfsm->log_domain, "we (%d/%d) left the process group",
1175 dfsm->nodeid, dfsm->pid);
1176 goto leave;
1177 }
1178
1179 if (member_list_entries > 1) {
1180
1181 int qlen = g_sequence_get_length(dfsm->msg_queue);
1182 if (joined_list_entries && qlen) {
1183 /* we need to make sure that all members have the same queue. */
1184 cfs_dom_message(dfsm->log_domain, "queue not emtpy - resening %d messages", qlen);
1185 if (!dfsm_resend_queue(dfsm)) {
1186 cfs_dom_critical(dfsm->log_domain, "dfsm_resend_queue failed");
1187 goto leave;
1188 }
1189 }
1190
1191 dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
1192 if (lowest_nodeid == dfsm->nodeid) {
1193 if (!dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0)) {
1194 cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
1195 goto leave;
1196 }
1197 }
1198 } else {
1199 dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
1200 dfsm->sync_info->local->synced = 1;
1201 if (!dfsm_deliver_queue(dfsm))
1202 goto leave;
1203 }
1204
1205 if (dfsm->dfsm_callbacks->dfsm_confchg_fn)
1206 dfsm->dfsm_callbacks->dfsm_confchg_fn(dfsm, dfsm->data, member_list, member_list_entries);
1207
1208 return;
1209 leave:
1210 dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
1211 return;
1212 }
1213
1214 static cpg_callbacks_t cpg_callbacks = {
1215 .cpg_deliver_fn = dfsm_cpg_deliver_callback,
1216 .cpg_confchg_fn = dfsm_cpg_confchg_callback,
1217 };
1218
1219 dfsm_t *
1220 dfsm_new(
1221 gpointer data,
1222 const char *group_name,
1223 const char *log_domain,
1224 guint32 protocol_version,
1225 dfsm_callbacks_t *callbacks)
1226 {
1227 g_return_val_if_fail(sizeof(dfsm_message_header_t) == 16, NULL);
1228 g_return_val_if_fail(sizeof(dfsm_message_state_header_t) == 32, NULL);
1229 g_return_val_if_fail(sizeof(dfsm_message_normal_header_t) == 24, NULL);
1230
1231 g_return_val_if_fail(callbacks != NULL, NULL);
1232 g_return_val_if_fail(callbacks->dfsm_deliver_fn != NULL, NULL);
1233
1234 g_return_val_if_fail(callbacks->dfsm_get_state_fn != NULL, NULL);
1235 g_return_val_if_fail(callbacks->dfsm_process_state_update_fn != NULL, NULL);
1236 g_return_val_if_fail(callbacks->dfsm_process_update_fn != NULL, NULL);
1237 g_return_val_if_fail(callbacks->dfsm_commit_fn != NULL, NULL);
1238
1239 dfsm_t *dfsm;
1240
1241 if ((dfsm = g_new0(dfsm_t, 1)) == NULL)
1242 return NULL;
1243
1244 g_mutex_init(&dfsm->sync_mutex);
1245
1246 g_cond_init(&dfsm->sync_cond);
1247
1248 if (!(dfsm->results = g_hash_table_new(g_int64_hash, g_int64_equal)))
1249 goto err;
1250
1251 if (!(dfsm->msg_queue = g_sequence_new(NULL)))
1252 goto err;
1253
1254 dfsm->log_domain = log_domain;
1255 dfsm->data = data;
1256 dfsm->mode = DFSM_MODE_START;
1257 dfsm->protocol_version = protocol_version;
1258 strcpy (dfsm->cpg_group_name.value, group_name);
1259 dfsm->cpg_group_name.length = strlen (group_name) + 1;
1260
1261 dfsm->cpg_callbacks = &cpg_callbacks;
1262 dfsm->dfsm_callbacks = callbacks;
1263
1264 dfsm->members = g_hash_table_new(dfsm_sync_info_hash, dfsm_sync_info_equal);
1265 if (!dfsm->members)
1266 goto err;
1267
1268 g_mutex_init(&dfsm->mode_mutex);
1269
1270 return dfsm;
1271
1272 err:
1273 dfsm_destroy(dfsm);
1274 return NULL;
1275 }
1276
1277 gboolean
1278 dfsm_is_initialized(dfsm_t *dfsm)
1279 {
1280 g_return_val_if_fail(dfsm != NULL, FALSE);
1281
1282 return (dfsm->cpg_handle != 0) ? TRUE : FALSE;
1283 }
1284
1285 gboolean
1286 dfsm_lowest_nodeid(dfsm_t *dfsm)
1287 {
1288 g_return_val_if_fail(dfsm != NULL, FALSE);
1289
1290 if (dfsm->lowest_nodeid && (dfsm->lowest_nodeid == dfsm->nodeid))
1291 return TRUE;
1292
1293 return FALSE;
1294 }
1295
1296 cs_error_t
1297 dfsm_verify_request(dfsm_t *dfsm)
1298 {
1299 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1300
1301 /* only do when we have lowest nodeid */
1302 if (!dfsm->lowest_nodeid || (dfsm->lowest_nodeid != dfsm->nodeid))
1303 return CS_OK;
1304
1305 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1306 if (mode != DFSM_MODE_SYNCED)
1307 return CS_OK;
1308
1309 int len = 1;
1310 struct iovec iov[len];
1311
1312 if (dfsm->csum_counter != dfsm->csum_id) {
1313 g_message("delay verify request %016" PRIX64, dfsm->csum_counter + 1);
1314 return CS_OK;
1315 };
1316
1317 dfsm->csum_counter++;
1318 iov[0].iov_base = (char *)&dfsm->csum_counter;
1319 iov[0].iov_len = sizeof(dfsm->csum_counter);
1320
1321 cfs_debug("send verify request %016" PRIX64, dfsm->csum_counter);
1322
1323 cs_error_t result;
1324 result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
1325
1326 if (result != CS_OK)
1327 cfs_dom_critical(dfsm->log_domain, "failed to send VERIFY_REQUEST message");
1328
1329 return result;
1330 }
1331
1332
1333 cs_error_t
1334 dfsm_dispatch(
1335 dfsm_t *dfsm,
1336 cs_dispatch_flags_t dispatch_types)
1337 {
1338 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1339 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_INVALID_PARAM);
1340
1341 cs_error_t result;
1342
1343 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1344 int retries = 0;
1345 loop:
1346 result = cpg_dispatch(dfsm->cpg_handle, dispatch_types);
1347 if (result == CS_ERR_TRY_AGAIN) {
1348 nanosleep(&tvreq, NULL);
1349 ++retries;
1350 if ((retries % 10) == 0)
1351 cfs_dom_message(dfsm->log_domain, "cpg_dispatch retry %d", retries);
1352 goto loop;
1353 }
1354
1355 if (!(result == CS_OK || result == CS_ERR_TRY_AGAIN)) {
1356 cfs_dom_critical(dfsm->log_domain, "cpg_dispatch failed: %d", result);
1357 }
1358
1359 return result;
1360 }
1361
1362
1363 cs_error_t
1364 dfsm_initialize(dfsm_t *dfsm, int *fd)
1365 {
1366 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1367 g_return_val_if_fail(fd != NULL, CS_ERR_INVALID_PARAM);
1368
1369 /* remove old messages */
1370 dfsm_free_message_queue(dfsm);
1371 dfsm_send_sync_message_abort(dfsm);
1372
1373 dfsm->joined = FALSE;
1374 dfsm->we_are_member = 0;
1375
1376 dfsm_set_mode(dfsm, DFSM_MODE_START);
1377
1378 cs_error_t result;
1379
1380 if (dfsm->cpg_handle == 0) {
1381 if ((result = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks)) != CS_OK) {
1382 cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", result);
1383 goto err_no_finalize;
1384 }
1385
1386 if ((result = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid)) != CS_OK) {
1387 cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", result);
1388 goto err_finalize;
1389 }
1390
1391 dfsm->pid = getpid();
1392
1393 result = cpg_context_set(dfsm->cpg_handle, dfsm);
1394 if (result != CS_OK) {
1395 cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", result);
1396 goto err_finalize;
1397 }
1398 }
1399
1400 result = cpg_fd_get(dfsm->cpg_handle, fd);
1401 if (result != CS_OK) {
1402 cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", result);
1403 goto err_finalize;
1404 }
1405
1406 return CS_OK;
1407
1408 err_finalize:
1409 cpg_finalize(dfsm->cpg_handle);
1410 err_no_finalize:
1411 dfsm->cpg_handle = 0;
1412 return result;
1413 }
1414
1415 cs_error_t
1416 dfsm_join(dfsm_t *dfsm)
1417 {
1418 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1419 g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_LIBRARY);
1420 g_return_val_if_fail(dfsm->joined == 0, CS_ERR_EXIST);
1421
1422 cs_error_t result;
1423
1424 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1425 int retries = 0;
1426 loop:
1427 result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name);
1428 if (result == CS_ERR_TRY_AGAIN) {
1429 nanosleep(&tvreq, NULL);
1430 ++retries;
1431 if ((retries % 10) == 0)
1432 cfs_dom_message(dfsm->log_domain, "cpg_join retry %d", retries);
1433 goto loop;
1434 }
1435
1436 if (result != CS_OK) {
1437 cfs_dom_critical(dfsm->log_domain, "cpg_join failed: %d", result);
1438 return result;
1439 }
1440
1441 dfsm->joined = TRUE;
1442 return TRUE;
1443 }
1444
1445 cs_error_t
1446 dfsm_leave (dfsm_t *dfsm)
1447 {
1448 g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
1449 g_return_val_if_fail(dfsm->joined, CS_ERR_NOT_EXIST);
1450
1451 cs_error_t result;
1452
1453 struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
1454 int retries = 0;
1455 loop:
1456 result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
1457 if (result == CS_ERR_TRY_AGAIN) {
1458 nanosleep(&tvreq, NULL);
1459 ++retries;
1460 if ((retries % 10) == 0)
1461 cfs_dom_message(dfsm->log_domain, "cpg_leave retry %d", retries);
1462 goto loop;
1463 }
1464
1465 if (result != CS_OK) {
1466 cfs_dom_critical(dfsm->log_domain, "cpg_leave failed: %d", result);
1467 return result;
1468 }
1469
1470 dfsm->joined = FALSE;
1471
1472 return TRUE;
1473 }
1474
1475 gboolean
1476 dfsm_finalize(dfsm_t *dfsm)
1477 {
1478 g_return_val_if_fail(dfsm != NULL, FALSE);
1479
1480 dfsm_send_sync_message_abort(dfsm);
1481
1482 if (dfsm->joined)
1483 dfsm_leave(dfsm);
1484
1485 if (dfsm->cpg_handle) {
1486 cpg_finalize(dfsm->cpg_handle);
1487 dfsm->cpg_handle = 0;
1488 dfsm->joined = FALSE;
1489 dfsm->we_are_member = 0;
1490 }
1491
1492 return TRUE;
1493 }
1494
1495 void
1496 dfsm_destroy(dfsm_t *dfsm)
1497 {
1498 g_return_if_fail(dfsm != NULL);
1499
1500 dfsm_finalize(dfsm);
1501
1502 if (dfsm->sync_info && dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn)
1503 dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
1504
1505 dfsm_free_sync_queue(dfsm);
1506
1507 g_mutex_clear (&dfsm->mode_mutex);
1508
1509 g_mutex_clear (&dfsm->sync_mutex);
1510
1511 g_cond_clear (&dfsm->sync_cond);
1512
1513 if (dfsm->results)
1514 g_hash_table_destroy(dfsm->results);
1515
1516 if (dfsm->msg_queue) {
1517 dfsm_free_message_queue(dfsm);
1518 g_sequence_free(dfsm->msg_queue);
1519 }
1520
1521 if (dfsm->sync_info)
1522 g_free(dfsm->sync_info);
1523
1524 if (dfsm->cpg_handle)
1525 cpg_finalize(dfsm->cpg_handle);
1526
1527 if (dfsm->members)
1528 g_hash_table_destroy(dfsm->members);
1529
1530 g_free(dfsm);
1531 }
1532
/* Per-service context that ties a cfs_service_t to the dfsm it drives. */
typedef struct {
	dfsm_t *dfsm; /* set by service_dfsm_new(); not freed by service_dfsm_destroy() */
} service_dfsm_private_t;
1536
1537 static gboolean
1538 service_dfsm_finalize(
1539 cfs_service_t *service,
1540 gpointer context)
1541 {
1542 g_return_val_if_fail(service != NULL, FALSE);
1543 g_return_val_if_fail(context != NULL, FALSE);
1544
1545 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1546 dfsm_t *dfsm = private->dfsm;
1547
1548 g_return_val_if_fail(dfsm != NULL, FALSE);
1549
1550 return dfsm_finalize(dfsm);
1551 }
1552
1553 static int
1554 service_dfsm_initialize(
1555 cfs_service_t *service,
1556 gpointer context)
1557 {
1558 g_return_val_if_fail(service != NULL, -1);
1559 g_return_val_if_fail(context != NULL, -1);
1560
1561 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1562 dfsm_t *dfsm = private->dfsm;
1563
1564 g_return_val_if_fail(dfsm != NULL, -1);
1565
1566 /* serious internal error - don't try to recover */
1567 if (!dfsm_restartable(dfsm))
1568 return -1;
1569
1570 int fd = -1;
1571
1572 cs_error_t result;
1573 if ((result = dfsm_initialize(dfsm, &fd)) != CS_OK)
1574 return -1;
1575
1576 result = dfsm_join(dfsm);
1577 if (result != CS_OK) {
1578 /* we can't dispatch if not joined, so we need to finalize */
1579 dfsm_finalize(dfsm);
1580 return -1;
1581 }
1582
1583 return fd;
1584 }
1585
1586 static gboolean
1587 service_dfsm_dispatch(
1588 cfs_service_t *service,
1589 gpointer context)
1590 {
1591 g_return_val_if_fail(service != NULL, FALSE);
1592 g_return_val_if_fail(context != NULL, FALSE);
1593
1594 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1595 dfsm_t *dfsm = private->dfsm;
1596
1597 g_return_val_if_fail(dfsm != NULL, FALSE);
1598 g_return_val_if_fail(dfsm->cpg_handle != 0, FALSE);
1599
1600 cs_error_t result;
1601
1602 result = dfsm_dispatch(dfsm, CS_DISPATCH_ONE);
1603 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1604 goto finalize;
1605 if (result != CS_OK)
1606 goto fail;
1607
1608 dfsm_mode_t mode = dfsm_get_mode(dfsm);
1609 if (mode >= DFSM_ERROR_MODE_START) {
1610 if (dfsm->joined) {
1611 result = dfsm_leave(dfsm);
1612 if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
1613 goto finalize;
1614 if (result != CS_OK)
1615 goto finalize;
1616 } else {
1617 if (!dfsm->we_are_member)
1618 return FALSE;
1619 }
1620 }
1621
1622 return TRUE;
1623
1624 finalize:
1625 dfsm_finalize(dfsm);
1626 fail:
1627 cfs_service_set_restartable(service, dfsm_restartable(dfsm));
1628 return FALSE;
1629 }
1630
1631 static void
1632 service_dfsm_timer(
1633 cfs_service_t *service,
1634 gpointer context)
1635 {
1636 g_return_if_fail(service != NULL);
1637 g_return_if_fail(context != NULL);
1638
1639 service_dfsm_private_t *private = (service_dfsm_private_t *)context;
1640 dfsm_t *dfsm = private->dfsm;
1641
1642 g_return_if_fail(dfsm != NULL);
1643
1644 dfsm_verify_request(dfsm);
1645 }
1646
1647 static cfs_service_callbacks_t cfs_dfsm_callbacks = {
1648 .cfs_service_initialize_fn = service_dfsm_initialize,
1649 .cfs_service_finalize_fn = service_dfsm_finalize,
1650 .cfs_service_dispatch_fn = service_dfsm_dispatch,
1651 .cfs_service_timer_fn = service_dfsm_timer,
1652 };
1653
1654 cfs_service_t *
1655 service_dfsm_new(dfsm_t *dfsm)
1656 {
1657 cfs_service_t *service;
1658
1659 g_return_val_if_fail(dfsm != NULL, NULL);
1660
1661 service_dfsm_private_t *private = g_new0(service_dfsm_private_t, 1);
1662 if (!private)
1663 return NULL;
1664
1665 private->dfsm = dfsm;
1666
1667 service = cfs_service_new(&cfs_dfsm_callbacks, dfsm->log_domain, private);
1668
1669 return service;
1670 }
1671
1672 void
1673 service_dfsm_destroy(cfs_service_t *service)
1674 {
1675 g_return_if_fail(service != NULL);
1676
1677 service_dfsm_private_t *private =
1678 (service_dfsm_private_t *)cfs_service_get_context(service);
1679
1680 g_free(private);
1681 g_free(service);
1682 }
1683
1684
1685
1686