]> git.proxmox.com Git - pve-cluster.git/blobdiff - data/src/dfsm.c
bump version to 8.0.6
[pve-cluster.git] / data / src / dfsm.c
diff --git a/data/src/dfsm.c b/data/src/dfsm.c
deleted file mode 100644 (file)
index 7922a2e..0000000
+++ /dev/null
@@ -1,1694 +0,0 @@
-/*
-  Copyright (C) 2010 Proxmox Server Solutions GmbH
-
-  This program is free software: you can redistribute it and/or modify
-  it under the terms of the GNU Affero General Public License as published by
-  the Free Software Foundation, either version 3 of the License, or
-  (at your option) any later version.
-
-  This program is distributed in the hope that it will be useful,
-  but WITHOUT ANY WARRANTY; without even the implied warranty of
-  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-  GNU Affero General Public License for more details.
-
-  You should have received a copy of the GNU Affero General Public License
-  along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-  Author: Dietmar Maurer <dietmar@proxmox.com>
-
-*/
-
-
-/* NOTE: we try to keep the CPG handle as long as possible, because
- * calling cpg_initialize/cpg_finalize multiple times from the
- * same process confuses corosync.
- * NOTE: CS_ERR_LIBRARY is returned when corosync has died.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif /* HAVE_CONFIG_H */
-
-#include <sys/types.h>
-#include <unistd.h>
-#include <string.h>
-#include <stdlib.h>
-
-#include <corosync/corotypes.h>
-#include <corosync/cpg.h>
-#include <glib.h>
-
-#include "cfs-utils.h"
-#include "dfsm.h"
-
-static cpg_callbacks_t cpg_callbacks; /* declared early without initializer: its
- * address is compared with dfsm->cpg_callbacks in dfsm_cpg_deliver_callback()
- * to validate the context pointer obtained from cpg_context_get() */
-
-typedef enum { /* operating modes of a dfsm instance (stored in dfsm->mode) */
-       DFSM_MODE_START = 0,      /* logged as "start cluster connection" */
-       DFSM_MODE_START_SYNC = 1, /* data synchronisation is starting */
-       DFSM_MODE_SYNCED = 2,     /* all data is up to date */
-       DFSM_MODE_UPDATE = 3,     /* waiting for updates from the leader */
-
-       /* values >= 128 indicate abnormal/error conditions */
-       DFSM_ERROR_MODE_START = 128,   /* threshold used in '<' / '>=' comparisons */
-       DFSM_MODE_LEAVE = 253,         /* leaving the CPG group */
-       DFSM_MODE_VERSION_ERROR = 254, /* a peer speaks a newer protocol version */
-       DFSM_MODE_ERROR = 255,         /* serious internal error */
-} dfsm_mode_t;
-
-typedef enum { /* values of dfsm_message_header_t.type */
-       DFSM_MESSAGE_NORMAL = 0,          /* application message, uses dfsm_message_normal_header_t */
-       DFSM_MESSAGE_SYNC_START = 1,      /* first type accepted by DFSM_VALID_STATE_MESSAGE() */
-       DFSM_MESSAGE_STATE = 2,           /* carries the blob returned by dfsm_get_state_fn */
-       DFSM_MESSAGE_UPDATE = 3,          /* sent by dfsm_send_update() */
-       DFSM_MESSAGE_UPDATE_COMPLETE = 4, /* sent by dfsm_send_update_complete() */
-       DFSM_MESSAGE_VERIFY_REQUEST = 5,  /* payload: one 64 bit checksum id */
-       DFSM_MESSAGE_VERIFY = 6,          /* sent by dfsm_send_checksum(): checksum id + checksum */
-} dfsm_message_t;
-
-/* True when mt is one of the state message types, i.e. lies in the range
- * DFSM_MESSAGE_SYNC_START .. DFSM_MESSAGE_VERIFY. The argument and the whole
- * expansion are parenthesised so that expression arguments expand correctly.
- * NOTE: mt is evaluated twice - do not pass expressions with side effects. */
-#define DFSM_VALID_STATE_MESSAGE(mt) ((mt) >= DFSM_MESSAGE_SYNC_START && (mt) <= DFSM_MESSAGE_VERIFY)
-
-typedef struct { /* header that starts every dfsm message */
-       uint16_t type;             /* a dfsm_message_t value */
-       uint16_t subtype;          /* caller's msgtype for NORMAL messages, 0 for state messages */
-       uint32_t protocol_version; /* sender's dfsm->protocol_version */
-       uint32_t time;             /* sender's time(NULL), stored in 32 bits */
-       uint32_t reserved;         /* always sent as 0 */
-} dfsm_message_header_t;
-
-typedef struct { /* epoch tag of state messages; compared bytewise with memcmp() */
-       uint32_t epoch; // per process (not globally unique) 
-       uint32_t time;
-       uint32_t nodeid;
-       uint32_t pid;
-} dfsm_sync_epoch_t;
-
-typedef struct { /* header of all state messages (types accepted by DFSM_VALID_STATE_MESSAGE) */
-       dfsm_message_header_t base;
-       dfsm_sync_epoch_t epoch; /* sender's dfsm->sync_epoch at send time */
-} dfsm_message_state_header_t;
-
-typedef struct { /* header of DFSM_MESSAGE_NORMAL messages */
-       dfsm_message_header_t base;
-       uint64_t count; /* sender's message counter (dfsm->msgcount after increment) */
-} dfsm_message_normal_header_t;
-
-typedef struct { /* a received message parked in msg_queue or sync_queue */
-       uint32_t nodeid;    /* sender node id */
-       uint32_t pid;       /* sender process id */
-       uint64_t msg_count; /* counter taken from the message header; sort key of msg_queue */
-       void *msg;          /* private g_memdup() copy, released by dfsm_free_queue_entry() */
-       int msg_len; // fixme: unsigned?
-} dfsm_queued_message_t;
-
-struct dfsm { /* complete state of one dfsm instance */
-       char *log_domain; /* domain passed to the cfs_dom_* logging calls */
-       cpg_callbacks_t *cpg_callbacks; /* checked against &cpg_callbacks to validate a CPG context */
-       dfsm_callbacks_t *dfsm_callbacks; /* user supplied hooks (deliver, get_state, commit, ...) */
-       cpg_handle_t cpg_handle; /* handle passed to cpg_mcast_joined() */
-       struct cpg_name cpg_group_name;
-       uint32_t nodeid; /* nodeid + pid identify this instance, see dfsm_nodeid_is_local() */
-       uint32_t pid;
-       int we_are_member;
-
-       guint32 protocol_version; /* stamped into every outgoing message header */
-       gpointer data; /* opaque pointer handed to the dfsm_callbacks hooks */
-
-       gboolean joined;
-
-       /* mode is protected with mode_mutex */
-       GMutex *mode_mutex;
-       dfsm_mode_t mode;
-
-       GHashTable *members; /* contains dfsm_node_info_t pointers  */
-       dfsm_sync_info_t *sync_info; /* owns the node array the members table points into */
-       uint32_t local_epoch_counter;
-       dfsm_sync_epoch_t sync_epoch; /* epoch copied into outgoing state messages */
-       uint32_t lowest_nodeid; 
-       GSequence *msg_queue; /* queued messages, kept sorted by msg_count */
-       GList *sync_queue; /* messages from synced nodes that arrive in UPDATE mode */
-
-       /* synchronous message transmission, protected with sync_mutex */
-       GMutex *sync_mutex;
-       GCond *sync_cond;
-       GHashTable *results; /* pending dfsm_result_t entries, keyed by pointer to their msgcount */
-       uint64_t msgcount; /* counter of the last message sent */
-       uint64_t msgcount_rcvd; /* counter of the last own message recorded as delivered */
-
-       /* state verification */
-       guchar csum[32];
-       dfsm_sync_epoch_t csum_epoch;
-       uint64_t csum_id;
-       uint64_t csum_counter;
-};
-
-static gboolean dfsm_deliver_queue(dfsm_t *dfsm); /* forward declaration; callers treat FALSE as failure */
-static gboolean dfsm_deliver_sync_queue(dfsm_t *dfsm); /* forward declaration; callers treat FALSE as failure */
-
-/* Tell whether the pair (nodeid, pid) identifies this dfsm instance itself.
- * Returns FALSE (after a glib warning) when dfsm is NULL. */
-gboolean
-dfsm_nodeid_is_local(dfsm_t *dfsm, uint32_t nodeid, uint32_t pid)
-{
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-
-       if (dfsm->nodeid != nodeid)
-               return FALSE;
-
-       return dfsm->pid == pid;
-}
-
-
-/* Wake up every thread waiting on sync_cond: the received counter is set to
- * the last sent counter, so the wait loop in dfsm_send_message_sync() ends. */
-static void
-dfsm_send_sync_message_abort(dfsm_t *dfsm)
-{
-       g_return_if_fail(dfsm != NULL);
-       g_return_if_fail(dfsm->sync_mutex != NULL);
-       g_return_if_fail(dfsm->sync_cond != NULL);
-
-       GMutex *lock = dfsm->sync_mutex;
-
-       g_mutex_lock(lock);
-       /* mark everything sent so far as received */
-       dfsm->msgcount_rcvd = dfsm->msgcount;
-       g_cond_broadcast(dfsm->sync_cond);
-       g_mutex_unlock(lock);
-}
-
-/* Store the outcome of a delivered message and wake up waiting senders.
- *
- * msg_count  - counter of the delivered message; used as lookup key in
- *              dfsm->results and stored as the new msgcount_rcvd
- * msg_result - value copied into the pending result entry, if one exists
- * processed  - flag copied into the pending result entry, if one exists
- *
- * The whole update is done while holding sync_mutex. */
-static void
-dfsm_record_local_result(
-       dfsm_t *dfsm,
-       uint64_t msg_count,
-       int msg_result,
-       gboolean processed)
-{
-       g_return_if_fail(dfsm != NULL);
-       g_return_if_fail(dfsm->sync_mutex != NULL);
-       g_return_if_fail(dfsm->sync_cond != NULL);
-       g_return_if_fail(dfsm->results != NULL);
-
-       g_mutex_lock(dfsm->sync_mutex);
-
-       /* an entry only exists when the sender asked for a result */
-       dfsm_result_t *pending = g_hash_table_lookup(dfsm->results, &msg_count);
-       if (pending != NULL) {
-               pending->result = msg_result;
-               pending->processed = processed;
-       }
-
-       dfsm->msgcount_rcvd = msg_count;
-       g_cond_broadcast(dfsm->sync_cond);
-
-       g_mutex_unlock(dfsm->sync_mutex);
-}
-
-/* Multicast the iovec array to the CPG group with CPG_TYPE_AGREED.
- *
- * iov/len - data to send (iov may be NULL only when len is 0)
- * retry   - when non-zero, a CPG_ERR_TRY_AGAIN result is retried after a
- *           100ms sleep, up to 100 times; every 10th retry is logged
- *
- * Returns the result of the last cpg_mcast_joined() call. Failures are
- * logged as critical here, except a CPG_ERR_TRY_AGAIN that is left over
- * after the retries are used up. */
-static cpg_error_t
-dfsm_send_message_full(
-       dfsm_t *dfsm,
-       struct iovec *iov,
-       unsigned int len,
-       int retry)
-{
-       g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
-
-       struct timespec delay = { .tv_sec = 0, .tv_nsec = 100000000 };
-       cpg_error_t result;
-       int retries = 0;
-
-       for (;;) {
-               result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
-               if (!retry || result != CPG_ERR_TRY_AGAIN)
-                       break;
-
-               nanosleep(&delay, NULL);
-               retries++;
-
-               if ((retries % 10) == 0)
-                       cfs_dom_message(dfsm->log_domain, "cpg_send_message retry %d", retries);
-
-               if (retries >= 100)
-                       break;
-       }
-
-       if (retries)
-               cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries);
-
-       if (result != CS_OK &&
-           (!retry || result != CPG_ERR_TRY_AGAIN))
-               cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
-
-       return result;
-}
-
-/* Send a state message: a dfsm_message_state_header_t (given type, subtype 0,
- * current protocol version, current time and dfsm->sync_epoch) is put in
- * front of the caller's iovec array and the result is sent with retries
- * enabled.
- *
- * type must satisfy DFSM_VALID_STATE_MESSAGE(); iov may be NULL only when
- * len is 0. Returns the result of dfsm_send_message_full(). */
-static cpg_error_t
-dfsm_send_state_message_full(
-       dfsm_t *dfsm,
-       uint16_t type,
-       struct iovec *iov,
-       unsigned int len)
-{
-       g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type), CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
-
-       dfsm_message_state_header_t header = {
-               .base = {
-                       .type = type,
-                       .subtype = 0,
-                       .protocol_version = dfsm->protocol_version,
-                       .time = time(NULL),
-                       .reserved = 0,
-               },
-               .epoch = dfsm->sync_epoch,
-       };
-
-       /* slot 0 holds the header, the caller's entries follow */
-       unsigned int total = len + 1;
-       struct iovec real_iov[total];
-
-       real_iov[0].iov_base = (char *)&header;
-       real_iov[0].iov_len = sizeof(header);
-
-       for (unsigned int slot = 1; slot < total; slot++)
-               real_iov[slot] = iov[slot - 1];
-
-       return dfsm_send_message_full(dfsm, real_iov, total, 1);
-}
-
-/* Send the given iovec array as a DFSM_MESSAGE_UPDATE state message.
- * Returns the result of dfsm_send_state_message_full(). */
-cpg_error_t
-dfsm_send_update(dfsm_t *dfsm, struct iovec *iov, unsigned int len)
-{
-       cpg_error_t result;
-
-       result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len);
-
-       return result;
-}
-
-/* Send a DFSM_MESSAGE_UPDATE_COMPLETE state message without payload.
- * Returns the result of dfsm_send_state_message_full(). */
-cpg_error_t
-dfsm_send_update_complete(dfsm_t *dfsm)
-{
-       cpg_error_t result;
-
-       result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0);
-
-       return result;
-}
-
-
-/* Send a normal message without asking for a result: same as
- * dfsm_send_message_sync() with a NULL result pointer. */
-cpg_error_t
-dfsm_send_message(dfsm_t *dfsm, uint16_t msgtype, struct iovec *iov, int len)
-{
-       cpg_error_t result;
-
-       result = dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL);
-
-       return result;
-}
-
-/* Send a DFSM_MESSAGE_NORMAL message and optionally wait for its result.
- *
- * msgtype - stored in the header's subtype field
- * iov/len - payload; iov may be NULL only when len is 0, len must not be
- *           negative (it sizes a variable length array below)
- * rp      - NULL for fire-and-forget. Otherwise the entry is registered in
- *           dfsm->results under the new message counter and the call blocks
- *           until msgcount_rcvd reaches that counter.
- *
- * Returns CS_ERR_INVALID_PARAM on bad arguments, the send error if sending
- * failed, CS_OK when no result was requested, and otherwise CS_OK or
- * CS_ERR_FAILED_OPERATION depending on rp->processed. */
-cpg_error_t
-dfsm_send_message_sync(
-       dfsm_t *dfsm,
-       uint16_t msgtype,
-       struct iovec *iov,
-       int len,
-       dfsm_result_t *rp)
-{
-       g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(dfsm->sync_mutex != NULL, CS_ERR_INVALID_PARAM);
-       /* a negative len would make real_iov[len + 1] a non-positive sized VLA */
-       g_return_val_if_fail(len >= 0, CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
-
-       g_mutex_lock (dfsm->sync_mutex);
-       /* note: hold lock until message is sent - to guarantee ordering */
-       uint64_t msgcount = ++dfsm->msgcount;
-       if (rp) {
-               rp->msgcount = msgcount;
-               rp->processed = 0;
-               /* the key points into rp itself, so rp must stay valid until removed */
-               g_hash_table_replace(dfsm->results, &rp->msgcount, rp);
-       }
-
-       dfsm_message_normal_header_t header;
-       header.base.type = DFSM_MESSAGE_NORMAL;
-       header.base.subtype = msgtype;
-       header.base.protocol_version = dfsm->protocol_version;
-       header.base.time = time(NULL);
-       header.base.reserved = 0;
-       header.count = msgcount;
-
-       /* slot 0 holds the header, the caller's entries follow */
-       struct iovec real_iov[len + 1];
-
-       real_iov[0].iov_base = (char *)&header;
-       real_iov[0].iov_len = sizeof(header);
-
-       for (int i = 0; i < len; i++)
-               real_iov[i + 1] = iov[i];
-
-       cpg_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
-
-       g_mutex_unlock (dfsm->sync_mutex);
-
-       if (result != CS_OK) {
-               cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
-
-               if (rp) {
-                       /* nothing will be delivered for this counter - unregister */
-                       g_mutex_lock (dfsm->sync_mutex);
-                       g_hash_table_remove(dfsm->results, &rp->msgcount);
-                       g_mutex_unlock (dfsm->sync_mutex);
-               }
-               return result;
-       }
-
-       if (rp) {
-               g_mutex_lock (dfsm->sync_mutex);
-
-               /* woken by dfsm_record_local_result() or dfsm_send_sync_message_abort() */
-               while (dfsm->msgcount_rcvd < msgcount)
-                       g_cond_wait (dfsm->sync_cond, dfsm->sync_mutex);
-
-               g_hash_table_remove(dfsm->results, &rp->msgcount);
-
-               g_mutex_unlock (dfsm->sync_mutex);
-
-               return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION;
-       }
-
-       return CS_OK;
-}
-
-/* Send the stored checksum id followed by the stored checksum as a
- * DFSM_MESSAGE_VERIFY state message. Returns TRUE when the send result
- * is CS_OK. */
-static gboolean
-dfsm_send_checksum(dfsm_t *dfsm)
-{
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-
-       struct iovec parts[2] = {
-               { .iov_base = (char *)&dfsm->csum_id, .iov_len = sizeof(dfsm->csum_id) },
-               { .iov_base = dfsm->csum, .iov_len = sizeof(dfsm->csum) },
-       };
-
-       cpg_error_t result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY, parts, 2);
-
-       return result == CS_OK;
-}
-
-/* Release one dfsm_queued_message_t: first its message copy, then the
- * entry itself. data must point to a valid entry. */
-static void
-dfsm_free_queue_entry(gpointer data)
-{
-       dfsm_queued_message_t *entry = data;
-
-       g_free(entry->msg);
-       g_free(entry);
-}
-
-/* Empty dfsm->msg_queue: every queued entry is freed with
- * dfsm_free_queue_entry() and then removed from the sequence. The
- * sequence object itself is kept. */
-static void
-dfsm_free_message_queue(dfsm_t *dfsm)
-{
-       g_return_if_fail(dfsm != NULL);
-       g_return_if_fail(dfsm->msg_queue != NULL);
-
-       GSequenceIter *pos = g_sequence_get_begin_iter(dfsm->msg_queue);
-       GSequenceIter *stop = g_sequence_get_end_iter(dfsm->msg_queue);
-
-       for (GSequenceIter *next; pos != stop; pos = next) {
-               /* fetch the successor first - pos is removed below */
-               next = g_sequence_iter_next(pos);
-
-               dfsm_free_queue_entry(g_sequence_get(pos));
-               g_sequence_remove(pos);
-       }
-}
-
-/* Empty dfsm->sync_queue: all queued entries are freed, then the list
- * cells, and the list head is reset to NULL. */
-static void
-dfsm_free_sync_queue(dfsm_t *dfsm)
-{
-       g_return_if_fail(dfsm != NULL);
-
-       for (GList *cell = dfsm->sync_queue; cell != NULL; cell = g_list_next(cell))
-               dfsm_free_queue_entry(cell->data);
-
-       g_list_free(dfsm->sync_queue);
-       dfsm->sync_queue = NULL;
-}
-
-/* Compare function for the message queue: orders dfsm_queued_message_t
- * entries by ascending msg_count.
- *
- * Returns -1, 0 or 1. The counters are compared explicitly instead of
- * being subtracted: the difference of two uint64_t values does not fit
- * into the gint return value, so a subtraction yields a wrong sign (or 0
- * for distinct counters) once the counters are far enough apart.
- * user_data is unused. */
-static gint
-message_queue_sort_fn(
-       gconstpointer a,
-       gconstpointer b,
-       gpointer user_data)
-{
-       uint64_t count_a = ((const dfsm_queued_message_t *)a)->msg_count;
-       uint64_t count_b = ((const dfsm_queued_message_t *)b)->msg_count;
-
-       if (count_a < count_b)
-               return -1;
-
-       if (count_a > count_b)
-               return 1;
-
-       return 0;
-}
-
-/* Look up the member entry for (nodeid, pid) in dfsm->members.
- * Returns the stored dfsm_node_info_t, or NULL when there is no such
- * member or an argument check fails. */
-static dfsm_node_info_t *
-dfsm_node_info_lookup(dfsm_t *dfsm, uint32_t nodeid, uint32_t pid)
-{
-       g_return_val_if_fail(dfsm != NULL, NULL);
-       g_return_val_if_fail(dfsm->members != NULL, NULL);
-
-       /* temporary key: only nodeid and pid are set, the rest is zeroed */
-       dfsm_node_info_t key = { .nodeid = nodeid, .pid = pid };
-       dfsm_node_info_t *found = g_hash_table_lookup(dfsm->members, &key);
-
-       return found;
-}
-
-/* Queue a copy of a received message for later delivery.
- *
- * nodeid/pid - sender; must be a known member
- * msg_count  - counter taken from the message header
- * msg/len    - message data, copied with g_memdup()
- *
- * Messages from members marked as synced that arrive while the mode is
- * DFSM_MODE_UPDATE are appended to sync_queue; everything else is inserted
- * into msg_queue, sorted by msg_count.
- *
- * Returns the new queue entry, or NULL when an argument check fails or the
- * sender is not a member. */
-static dfsm_queued_message_t *
-dfsm_queue_add_message(
-       dfsm_t *dfsm,
-       uint32_t nodeid,
-       uint32_t pid,
-       uint64_t msg_count,
-       const void *msg,
-       size_t msg_len)
-{
-       g_return_val_if_fail(dfsm != NULL, NULL);
-       g_return_val_if_fail(msg != NULL, NULL);
-       g_return_val_if_fail(msg_len != 0, NULL);
-
-       dfsm_node_info_t *sender = dfsm_node_info_lookup(dfsm, nodeid, pid);
-       if (sender == NULL) {
-               cfs_dom_critical(dfsm->log_domain, "dfsm_node_info_lookup failed");
-               return NULL;
-       }
-
-       dfsm_queued_message_t *qm = g_new0(dfsm_queued_message_t, 1);
-       g_return_val_if_fail(qm != NULL, NULL);
-
-       qm->nodeid = nodeid;
-       qm->pid = pid;
-       qm->msg = g_memdup(msg, msg_len);
-       qm->msg_len = msg_len;
-       qm->msg_count = msg_count;
-
-       if (dfsm->mode != DFSM_MODE_UPDATE || !sender->synced) {
-               /* NOTE: we only need to sort the queue because we resend all
-                * queued messages sometimes.
-                */
-               g_sequence_insert_sorted(dfsm->msg_queue, qm, message_queue_sort_fn, NULL);
-       } else {
-               dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
-       }
-
-       return qm;
-}
-
-/* Hash function for dfsm_node_info_t keys: the sum of the g_int_hash()
- * values of the nodeid and pid fields. */
-static guint
-dfsm_sync_info_hash(gconstpointer key)
-{
-       dfsm_node_info_t *node = (dfsm_node_info_t *)key;
-
-       guint node_part = g_int_hash(&node->nodeid);
-       guint pid_part = g_int_hash(&node->pid);
-
-       return node_part + pid_part;
-}
-
-/* Equality function for dfsm_node_info_t keys: two entries are equal when
- * both nodeid and pid match. */
-static gboolean
-dfsm_sync_info_equal(gconstpointer v1, gconstpointer v2)
-{
-       dfsm_node_info_t *lhs = (dfsm_node_info_t *)v1;
-       dfsm_node_info_t *rhs = (dfsm_node_info_t *)v2;
-
-       if (lhs->nodeid != rhs->nodeid)
-               return FALSE;
-
-       if (lhs->pid != rhs->pid)
-               return FALSE;
-
-       return TRUE;
-}
-
-/* qsort() compare function for dfsm_node_info_t entries: orders by nodeid
- * first, then by pid, both ascending.
- *
- * Returns -1, 0 or 1. The ids are compared explicitly instead of being
- * subtracted: converting the difference of two ids to int gives the wrong
- * sign when the ids are more than INT_MAX apart, which makes the ordering
- * inconsistent.
- * NOTE(review): for such id pairs the sort order differs from the old
- * subtraction based result - confirm that all cluster members use the
- * same ordering. */
-static int
-dfsm_sync_info_compare(
-       gconstpointer v1,
-       gconstpointer v2)
-{
-       const dfsm_node_info_t *info1 = (const dfsm_node_info_t *)v1;
-       const dfsm_node_info_t *info2 = (const dfsm_node_info_t *)v2;
-
-       if (info1->nodeid != info2->nodeid)
-               return (info1->nodeid < info2->nodeid) ? -1 : 1;
-
-       if (info1->pid != info2->pid)
-               return (info1->pid < info2->pid) ? -1 : 1;
-
-       return 0;
-}
-
-/* Switch the state machine to new_mode and log the transition.
- *
- * The switch is done under mode_mutex. It is refused when the current mode
- * is an error mode (>= DFSM_ERROR_MODE_START) and new_mode is an error mode
- * with a lower value. A non-error new_mode is always accepted. Nothing is
- * logged when the mode does not change. On entering DFSM_MODE_SYNCED the
- * dfsm_synced_fn callback is called, if set. */
-static void
-dfsm_set_mode(dfsm_t *dfsm, dfsm_mode_t new_mode)
-{
-       g_return_if_fail(dfsm != NULL);
-
-       cfs_debug("dfsm_set_mode - set mode to %d", new_mode);
-
-       gboolean switched = FALSE;
-
-       g_mutex_lock(dfsm->mode_mutex);
-       dfsm_mode_t old_mode = dfsm->mode;
-       if (old_mode != new_mode &&
-           (new_mode < DFSM_ERROR_MODE_START ||
-            old_mode < DFSM_ERROR_MODE_START ||
-            new_mode >= old_mode)) {
-               dfsm->mode = new_mode;
-               switched = TRUE;
-       }
-       g_mutex_unlock(dfsm->mode_mutex);
-
-       if (!switched)
-               return;
-
-       switch (new_mode) {
-       case DFSM_MODE_START:
-               cfs_dom_message(dfsm->log_domain, "start cluster connection");
-               break;
-       case DFSM_MODE_START_SYNC:
-               cfs_dom_message(dfsm->log_domain, "starting data syncronisation");
-               break;
-       case DFSM_MODE_SYNCED:
-               cfs_dom_message(dfsm->log_domain, "all data is up to date");
-               if (dfsm->dfsm_callbacks->dfsm_synced_fn)
-                       dfsm->dfsm_callbacks->dfsm_synced_fn(dfsm);
-               break;
-       case DFSM_MODE_UPDATE:
-               cfs_dom_message(dfsm->log_domain, "waiting for updates from leader");
-               break;
-       case DFSM_MODE_LEAVE:
-               cfs_dom_critical(dfsm->log_domain, "leaving CPG group");
-               break;
-       case DFSM_MODE_ERROR:
-               cfs_dom_critical(dfsm->log_domain, "serious internal error - stop cluster connection");
-               break;
-       case DFSM_MODE_VERSION_ERROR:
-               cfs_dom_critical(dfsm->log_domain, "detected newer protocol - please update this node");
-               break;
-       default:
-               /* no message for other values */
-               break;
-       }
-}
-
-/* Read the current mode while holding mode_mutex.
- * Returns DFSM_MODE_ERROR when dfsm is NULL. */
-static dfsm_mode_t
-dfsm_get_mode(dfsm_t *dfsm)
-{
-       g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR);
-
-       dfsm_mode_t current;
-
-       g_mutex_lock(dfsm->mode_mutex);
-       current = dfsm->mode;
-       g_mutex_unlock(dfsm->mode_mutex);
-
-       return current;
-}
-
-/* Tell whether the current mode is neither DFSM_MODE_ERROR nor
- * DFSM_MODE_VERSION_ERROR. */
-gboolean
-dfsm_restartable(dfsm_t *dfsm)
-{
-       dfsm_mode_t current = dfsm_get_mode(dfsm);
-
-       return current != DFSM_MODE_ERROR &&
-               current != DFSM_MODE_VERSION_ERROR;
-}
-
-/* Public way to request DFSM_MODE_ERROR; the transition rules of
- * dfsm_set_mode() apply. */
-void
-dfsm_set_errormode(dfsm_t *dfsm)
-{
-       const dfsm_mode_t target = DFSM_MODE_ERROR;
-
-       dfsm_set_mode(dfsm, target);
-}
-
-/* Release the per-sync data and, optionally, rebuild the member table.
- *
- * Always: if sync_info exists, the dfsm_cleanup_fn callback is called for
- * its data pointer (when both are set) and every saved node state is freed.
- *
- * When member_list is not NULL: the members hash table is emptied, the old
- * sync_info is freed and a new, zeroed one is allocated with one
- * dfsm_node_info_t per member. The nodes are sorted with
- * dfsm_sync_info_compare(), inserted into the members table (key and value
- * are the same pointer into the node array) and sync_info->local is set to
- * the entry describing this instance.
- *
- * member_list may be NULL only when member_list_entries is 0. */
-static void
-dfsm_release_sync_resources(
-       dfsm_t *dfsm,
-       const struct cpg_address *member_list,
-       size_t member_list_entries)
-{
-       g_return_if_fail(dfsm != NULL);
-       g_return_if_fail(dfsm->members != NULL);
-       g_return_if_fail(!member_list_entries || member_list != NULL);
-
-       cfs_debug("enter dfsm_release_sync_resources");
-
-       if (dfsm->sync_info) {
-
-               if (dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) {
-                       dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
-                       dfsm->sync_info->data = NULL;
-               }
-
-               for (int i = 0; i < dfsm->sync_info->node_count; i++) {
-                       if (dfsm->sync_info->nodes[i].state) {
-                               g_free(dfsm->sync_info->nodes[i].state);
-                               dfsm->sync_info->nodes[i].state = NULL;
-                               dfsm->sync_info->nodes[i].state_len = 0;
-                       }
-               }
-       }
-
-       if (!member_list)
-               return;
-
-       /* the table entries point into the old node array - drop them first */
-       g_hash_table_remove_all(dfsm->members);
-
-       g_free(dfsm->sync_info);
-
-       /* header plus one dfsm_node_info_t per member (the element type that
-        * the qsort() call below and the node loop use) */
-       size_t size = sizeof(dfsm_sync_info_t) +
-               member_list_entries * sizeof(dfsm_node_info_t);
-       dfsm_sync_info_t *sync_info = dfsm->sync_info = g_malloc0(size);
-       sync_info->node_count = member_list_entries;
-
-       for (size_t i = 0; i < member_list_entries; i++) {
-               sync_info->nodes[i].nodeid = member_list[i].nodeid;
-               sync_info->nodes[i].pid = member_list[i].pid;
-       }
-
-       qsort(sync_info->nodes, member_list_entries, sizeof(dfsm_node_info_t),
-             dfsm_sync_info_compare);
-
-       /* insert only after sorting, so the stored pointers stay valid */
-       for (size_t i = 0; i < member_list_entries; i++) {
-               dfsm_node_info_t *info = &sync_info->nodes[i];
-               g_hash_table_insert(dfsm->members, info, info);
-               if (info->nodeid == dfsm->nodeid && info->pid == dfsm->pid)
-                       sync_info->local = info;
-       }
-}
-
-static void 
-dfsm_cpg_deliver_callback(
-       cpg_handle_t handle,
-       const struct cpg_name *group_name,
-       uint32_t nodeid,
-       uint32_t pid,
-       void *msg,
-       size_t msg_len)
-{
-       cs_error_t result;
-
-       dfsm_t *dfsm = NULL;
-       result = cpg_context_get(handle, (gpointer *)&dfsm);
-       if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
-               cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
-               return; /* we have no valid dfsm pointer, so we can just ignore this */
-       }
-       dfsm_mode_t mode = dfsm_get_mode(dfsm);
-
-       cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
-
-       if (mode >= DFSM_ERROR_MODE_START) {
-               cfs_dom_debug(dfsm->log_domain, "error mode - ignoring message");
-               goto leave;
-       }
-
-       if (!dfsm->sync_info) {
-               cfs_dom_critical(dfsm->log_domain, "no dfsm_sync_info - internal error");
-               goto leave;
-       }
-
-       if (msg_len < sizeof(dfsm_message_header_t)) {
-               cfs_dom_critical(dfsm->log_domain, "received short message (%ld bytes)", msg_len);
-               goto leave;
-       }
-
-       dfsm_message_header_t *base_header = (dfsm_message_header_t *)msg;
-       
-       if (base_header->protocol_version > dfsm->protocol_version) {
-               cfs_dom_critical(dfsm->log_domain, "received message with protocol version %d", 
-                                base_header->protocol_version);
-               dfsm_set_mode(dfsm, DFSM_MODE_VERSION_ERROR);
-               return;
-       } else if (base_header->protocol_version < dfsm->protocol_version) {
-               cfs_dom_message(dfsm->log_domain, "ignore message with wrong protocol version %d", 
-                               base_header->protocol_version);
-               return;
-       }
-
-       if (base_header->type == DFSM_MESSAGE_NORMAL) {
-
-               dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
-
-               if (msg_len < sizeof(dfsm_message_normal_header_t)) {
-                       cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %ld bytes)",
-                                        base_header->type, base_header->subtype, msg_len);
-                       goto leave;
-               }
-
-               if (mode != DFSM_MODE_SYNCED) {
-                       cfs_dom_debug(dfsm->log_domain, "queue message %zu (subtype = %d, length = %ld)", 
-                                     header->count, base_header->subtype, msg_len); 
-
-                       if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
-                               goto leave;
-               } else {
-
-                       int msg_res = -1;
-                       int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
-                               dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype, 
-                               base_header->time, msg + sizeof(dfsm_message_normal_header_t), 
-                               msg_len - sizeof(dfsm_message_normal_header_t));
-
-                       if (nodeid == dfsm->nodeid && pid == dfsm->pid)
-                               dfsm_record_local_result(dfsm, header->count, msg_res, res);
-
-                       if (res < 0)
-                               goto leave;
-               }
-
-               return;
-       } 
-
-       /* state related messages
-        * we needs right epoch - else we simply discard the message 
-        */
-
-       dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
-
-       if (msg_len < sizeof(dfsm_message_state_header_t)) {
-               cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %ld bytes)",
-                                base_header->type, base_header->subtype, msg_len);
-               goto leave;
-       }
-
-       if (base_header->type != DFSM_MESSAGE_SYNC_START && 
-           (memcmp(&header->epoch, &dfsm->sync_epoch, sizeof(dfsm_sync_epoch_t)) != 0)) {
-               cfs_dom_debug(dfsm->log_domain, "ignore message (msg_type == %d) with "
-                             "wrong epoch (epoch %d/%d/%08X)", base_header->type, 
-                             header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
-               return;
-       }
-
-       msg += sizeof(dfsm_message_state_header_t);
-       msg_len -= sizeof(dfsm_message_state_header_t);
-
-       if (mode == DFSM_MODE_SYNCED) {
-               if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
-
-                       for (int i = 0; i < dfsm->sync_info->node_count; i++)
-                               dfsm->sync_info->nodes[i].synced = 1;
-
-                       if (!dfsm_deliver_queue(dfsm))
-                               goto leave;
-
-                       return;
-
-               } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
-
-                       if (msg_len != sizeof(dfsm->csum_counter)) {
-                               cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len, nodeid, pid);
-                               goto leave;
-                       }
-
-                       uint64_t csum_id = *((uint64_t *)msg);
-                       msg += 8; msg_len -= 8;
-
-                       cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016zX", nodeid, csum_id);
-
-                       if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
-                               if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
-                                           dfsm, dfsm->data, dfsm->csum, sizeof(dfsm->csum))) {
-                                       cfs_dom_critical(dfsm->log_domain, "unable to compute data checksum");
-                                       goto leave;
-                               }
-
-                               dfsm->csum_epoch = header->epoch;
-                               dfsm->csum_id = csum_id;
-
-                               if (nodeid == dfsm->nodeid && pid == dfsm->pid) {
-                                       if (!dfsm_send_checksum(dfsm)) 
-                                               goto leave;
-                               }
-                       }
-
-                       return;
-
-               } else if (base_header->type == DFSM_MESSAGE_VERIFY) {
-               
-                       cfs_dom_debug(dfsm->log_domain, "received verify message");
-                       
-                       if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
-
-                               if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
-                                       cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%ld bytes)", msg_len);
-                                       goto leave;
-                               }
-
-                               uint64_t csum_id = *((uint64_t *)msg);
-                               msg += 8; msg_len -= 8;
-                               
-                               if (dfsm->csum_id == csum_id &&
-                                   (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
-                                       if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
-                                               cfs_dom_critical(dfsm->log_domain, "wrong checksum %016zX != %016zX - restarting",
-                                                                *(uint64_t *)msg, *(uint64_t *)dfsm->csum);
-                                               goto leave;
-                                       } else {
-                                               cfs_dom_message(dfsm->log_domain, "data verification successful");
-                                       }
-                               } else {
-                                       cfs_dom_message(dfsm->log_domain, "skip verification - no checksum saved");
-                               }
-                       }
-
-                       return;
-
-               } else {
-                       /* ignore (we already got all required updates, or we are leader) */
-                       cfs_dom_debug(dfsm->log_domain, "ignore state sync message %d", 
-                                     base_header->type);
-                       return;
-               }
-               
-       } else if (mode == DFSM_MODE_START_SYNC) {
-
-               if (base_header->type == DFSM_MESSAGE_SYNC_START) {
-
-                       if (nodeid != dfsm->lowest_nodeid) {
-                               cfs_dom_critical(dfsm->log_domain, "ignore sync request from wrong member %d/%d",
-                                                nodeid, pid);
-                       }
-
-                       cfs_dom_message(dfsm->log_domain, "received sync request (epoch %d/%d/%08X)",
-                                       header->epoch.nodeid, header->epoch.pid, header->epoch.epoch);
-
-                       dfsm->sync_epoch = header->epoch;
-
-                       dfsm_release_sync_resources(dfsm, NULL, 0);
-
-                       unsigned int state_len = 0;
-                       gpointer state = NULL;
-                       
-                       state = dfsm->dfsm_callbacks->dfsm_get_state_fn(dfsm, dfsm->data, &state_len);
-
-                       if (!(state && state_len)) {
-                               cfs_dom_critical(dfsm->log_domain, "dfsm_get_state_fn failed");
-                               goto leave;
-                       }
-
-                       struct iovec iov[1];
-                       iov[0].iov_base = state;
-                       iov[0].iov_len = state_len;
-
-                       result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_STATE, iov, 1);
-
-                       if (state)
-                               g_free(state);
-
-                       if (result != CS_OK)
-                               goto leave;
-
-                       return;
-
-               } else if (base_header->type == DFSM_MESSAGE_STATE) {
-
-                       dfsm_node_info_t *ni;
-                       
-                       if (!(ni = dfsm_node_info_lookup(dfsm, nodeid, pid))) {
-                               cfs_dom_critical(dfsm->log_domain, "received state for non-member %d/%d", nodeid, pid);
-                               goto leave;
-                       }
-
-                       if (ni->state) {
-                               cfs_dom_critical(dfsm->log_domain, "received duplicate state for member %d/%d", nodeid, pid);
-                               goto leave;
-                       }
-
-                       ni->state = g_memdup(msg, msg_len);
-                       ni->state_len = msg_len;
-
-                       int received_all = 1;
-                       for (int i = 0; i < dfsm->sync_info->node_count; i++) {
-                               if (!dfsm->sync_info->nodes[i].state) {
-                                       received_all = 0;
-                                       break;
-                               }
-                       }
-
-                       if (received_all) {
-                               cfs_dom_message(dfsm->log_domain, "received all states");
-
-                               int res = dfsm->dfsm_callbacks->dfsm_process_state_update_fn(dfsm, dfsm->data, dfsm->sync_info);
-                               if (res < 0)
-                                       goto leave;
-
-                               if (dfsm->sync_info->local->synced)  {
-                                       dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
-                                       dfsm_release_sync_resources(dfsm, NULL, 0);
-
-                                       if (!dfsm_deliver_queue(dfsm))
-                                               goto leave;
-                                       
-                               } else {
-                                       dfsm_set_mode(dfsm, DFSM_MODE_UPDATE);
-
-                                       if (!dfsm_deliver_queue(dfsm))
-                                               goto leave;
-                               }
-
-                       }
-
-                       return;
-               }
-
-       } else if (mode == DFSM_MODE_UPDATE) {
-
-               if (base_header->type == DFSM_MESSAGE_UPDATE) {
-                               
-                       int res = dfsm->dfsm_callbacks->dfsm_process_update_fn(
-                               dfsm, dfsm->data, dfsm->sync_info, nodeid, pid, msg, msg_len);
-
-                       if (res < 0)
-                               goto leave;
-
-                       return;
-
-               } else if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) {
-
-
-                       int res = dfsm->dfsm_callbacks->dfsm_commit_fn(dfsm, dfsm->data, dfsm->sync_info);
-
-                       if (res < 0)
-                               goto leave;
-
-                       for (int i = 0; i < dfsm->sync_info->node_count; i++)
-                               dfsm->sync_info->nodes[i].synced = 1;
-
-                       dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
-
-                       if (!dfsm_deliver_sync_queue(dfsm))
-                               goto leave;
-
-                       if (!dfsm_deliver_queue(dfsm))
-                               goto leave;
-
-                       dfsm_release_sync_resources(dfsm, NULL, 0);
-
-                       return;
-               }
-
-       } else {
-               cfs_dom_critical(dfsm->log_domain, "internal error - unknown mode %d", mode);
-               goto leave;
-       }
-
-       if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST ||
-           base_header->type == DFSM_MESSAGE_VERIFY) {
-
-               cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
-    
-       } else {
-               cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %ld bytes)", 
-                                base_header->type, msg_len);
-               goto leave;
-       }
-
-leave:
-       dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
-       dfsm_release_sync_resources(dfsm, NULL, 0);
-       return;
-}
-
-/*
- * Re-send every queued message that carries our own nodeid/pid and then
- * drop the complete message queue.
- *
- * Sending stops at the first message for which dfsm_send_message_full()
- * does not return CS_OK; the queue is freed in either case.
- *
- * Returns TRUE when every send returned CS_OK, FALSE otherwise (or when
- * dfsm or its message queue is NULL).
- */
-static gboolean 
-dfsm_resend_queue(dfsm_t *dfsm)
-{
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-       g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
-
-       GSequenceIter *pos = g_sequence_get_begin_iter(dfsm->msg_queue);
-       GSequenceIter *stop = g_sequence_get_end_iter(dfsm->msg_queue);
-       gboolean all_sent = TRUE;
-
-       while (pos != stop) {
-               dfsm_queued_message_t *entry = (dfsm_queued_message_t *)
-                       g_sequence_get(pos);
-
-               /* step forward before the entry is processed */
-               pos = g_sequence_iter_next(pos);
-
-               /* only messages that originate from this process are re-sent */
-               if (entry->nodeid != dfsm->nodeid || entry->pid != dfsm->pid)
-                       continue;
-
-               struct iovec vec[1];
-               vec[0].iov_base = entry->msg;
-               vec[0].iov_len = entry->msg_len;
-
-               if (dfsm_send_message_full(dfsm, vec, 1, 1) != CS_OK) {
-                       all_sent = FALSE;
-                       break;
-               }
-       }
-
-       dfsm_free_message_queue(dfsm);
-
-       return all_sent;
-}
-
-/*
- * Deliver and release everything that is parked on dfsm->sync_queue.
- *
- * Each entry is handed to dfsm_cpg_deliver_callback() as long as the dfsm
- * is in DFSM_MODE_SYNCED and no earlier entry had to be skipped. Entries
- * that cannot be delivered are dropped. All entries and the list itself
- * are freed, and dfsm->sync_queue is reset to NULL.
- *
- * Returns TRUE if the queue was empty or every entry was delivered,
- * FALSE otherwise.
- */
-static gboolean
-dfsm_deliver_sync_queue(dfsm_t *dfsm) 
-{
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-
-       if (dfsm->sync_queue == NULL)
-               return TRUE;
-
-       gboolean delivered_all = TRUE;
-
-       // fixme: cfs_debug
-       cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, 
-                       g_list_length(dfsm->sync_queue));
-
-       GList *next = NULL;
-       for (GList *node = dfsm->sync_queue; node != NULL; node = next) {
-               dfsm_queued_message_t *entry = (dfsm_queued_message_t *)node->data;
-               next = g_list_next(node);
-
-               /* the mode is re-checked for every entry */
-               if (!delivered_all || dfsm->mode != DFSM_MODE_SYNCED) {
-                       delivered_all = FALSE;
-               } else {
-                       dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
-                                                 entry->nodeid, entry->pid, entry->msg, entry->msg_len);
-               }
-
-               dfsm_free_queue_entry(entry);
-       }
-
-       g_list_free(dfsm->sync_queue);
-       dfsm->sync_queue = NULL;
-
-       return delivered_all;
-}
-
-/*
- * Walk dfsm->msg_queue and process the queued messages according to the
- * current mode:
- *
- *  - messages whose sender is not found by dfsm_node_info_lookup() are
- *    logged, freed and removed;
- *  - in DFSM_MODE_SYNCED, messages from synced senders are passed to
- *    dfsm_cpg_deliver_callback(), then freed and removed;
- *  - in DFSM_MODE_UPDATE, messages from synced senders are moved (not
- *    freed) to dfsm->sync_queue;
- *  - messages from senders that are not synced stay in the queue;
- *  - in any other mode the walk stops and FALSE is returned.
- *
- * Returns TRUE if the queue is empty or the walk reached its end.
- */
-static gboolean 
-dfsm_deliver_queue(dfsm_t *dfsm)
-{
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-       g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE);
-
-       int qlen = g_sequence_get_length(dfsm->msg_queue);
-       if (!qlen)
-               return TRUE;
-
-       GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue);
-       GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue);
-       gboolean res = TRUE;
-
-       // fixme: cfs_debug
-       cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, qlen);
-
-       while (iter != end) {
-               /* advance first: 'cur' may be removed from the sequence below */
-               GSequenceIter *cur = iter; 
-               iter = g_sequence_iter_next(iter);
-
-               dfsm_queued_message_t *qm = (dfsm_queued_message_t *)
-                       g_sequence_get(cur);
-       
-               dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, qm->nodeid, qm->pid);
-               if (!ni) {
-                       cfs_dom_message(dfsm->log_domain, "remove message from non-member %d/%d", 
-                                       qm->nodeid, qm->pid);
-                       dfsm_free_queue_entry(qm);
-                       g_sequence_remove(cur);
-                       continue;
-               }
-
-               /* dfsm->mode is read again for every entry */
-               if (dfsm->mode == DFSM_MODE_SYNCED) {
-                       if (ni->synced) {
-                               dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name,
-                                                         qm->nodeid, qm->pid, qm->msg, qm->msg_len);
-                               dfsm_free_queue_entry(qm);
-                               g_sequence_remove(cur);
-                       }
-               } else if (dfsm->mode == DFSM_MODE_UPDATE) {
-                       if (ni->synced) {
-                               /* the entry changes owner: it is not freed here */
-                               dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm);
-                               g_sequence_remove(cur);
-                       }
-               } else {
-                       res = FALSE;
-                       break;
-               }
-       }
-
-       return res;
-}
-
-/*
- * CPG configuration change callback (registered as cpg_confchg_fn in
- * cpg_callbacks).
- *
- * Starts a new sync epoch, invalidates the saved checksum and drops the
- * sync queue. Depending on the mode and the new member list it then
- *  - ignores the event when the dfsm is already in an error mode,
- *  - ignores the event when we are not a member and still in
- *    DFSM_MODE_START,
- *  - switches to DFSM_MODE_LEAVE when we are no longer a member,
- *  - with more than one member: re-sends our queued messages if somebody
- *    joined, enters DFSM_MODE_START_SYNC and, on the member with the
- *    lowest nodeid, sends DFSM_MESSAGE_SYNC_START,
- *  - as the only member: enters DFSM_MODE_SYNCED and delivers the queue.
- *
- * On success the optional dfsm_confchg_fn user callback is called with
- * the new member list. Any failure switches to DFSM_MODE_LEAVE.
- */
-static void 
-dfsm_cpg_confchg_callback(
-       cpg_handle_t handle,
-       const struct cpg_name *group_name,
-       const struct cpg_address *member_list, 
-       size_t member_list_entries,
-       const struct cpg_address *left_list, 
-       size_t left_list_entries,
-       const struct cpg_address *joined_list, 
-       size_t joined_list_entries)
-{
-       cs_error_t result;
-
-       dfsm_t *dfsm = NULL;
-       result = cpg_context_get(handle, (gpointer *)&dfsm);
-       if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
-               cfs_critical("cpg_context_get error: %d (%p)", result, (void *)dfsm);
-               return; /* we have no valid dfsm pointer, so we can just ignore this */
-       }
-
-       dfsm->we_are_member = 0;
-
-       /* create new epoch */
-       dfsm->local_epoch_counter++;
-       dfsm->sync_epoch.epoch = dfsm->local_epoch_counter;
-       dfsm->sync_epoch.nodeid = dfsm->nodeid;
-       dfsm->sync_epoch.pid = dfsm->pid;
-       dfsm->sync_epoch.time = time(NULL);
-
-       /* invalidate saved checksum */
-       dfsm->csum_id = dfsm->csum_counter;
-       memset(&dfsm->csum_epoch, 0, sizeof(dfsm->csum_epoch));
-
-       dfsm_free_sync_queue(dfsm);
-
-       dfsm_mode_t mode = dfsm_get_mode(dfsm);
-
-       cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode);
-
-       if (mode >= DFSM_ERROR_MODE_START) {
-               cfs_dom_debug(dfsm->log_domain, "already left group - ignore message");
-               return;
-       }
-
-       /* build the "members: ..." log line and find the lowest nodeid */
-       int lowest_nodeid = 0;
-       GString *str = g_string_new("members: ");
-       for (size_t i = 0; i < member_list_entries; i++) {
-
-               g_string_append_printf(str, i ? ", %d/%d" : "%d/%d",
-                                      member_list[i].nodeid, member_list[i].pid);
-
-               if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
-                       lowest_nodeid =  member_list[i].nodeid;
-
-               if (member_list[i].nodeid == dfsm->nodeid &&
-                   member_list[i].pid == dfsm->pid)
-                       dfsm->we_are_member = 1;
-       }
-
-       /* never pass assembled text as the format string itself */
-       if ((dfsm->we_are_member || mode != DFSM_MODE_START))
-               cfs_dom_message(dfsm->log_domain, "%s", str->str);
-
-       g_string_free(str, TRUE);
-
-       dfsm->lowest_nodeid = lowest_nodeid;
-
-       /* NOTE: one node can be in left and joined list at the same time,
-          so it is better to query member list. Also JOIN/LEAVE list are
-          different on different nodes!
-       */
-
-       dfsm_release_sync_resources(dfsm, member_list, member_list_entries);
-
-       if (!dfsm->we_are_member) {
-               if (mode == DFSM_MODE_START) {
-                       cfs_dom_debug(dfsm->log_domain, "ignore leave message");
-                       return;
-               }
-               cfs_dom_message(dfsm->log_domain, "we (%d/%d) left the process group", 
-                               dfsm->nodeid, dfsm->pid);
-               goto leave;
-       }
-
-       if (member_list_entries > 1) {
-
-               int qlen = g_sequence_get_length(dfsm->msg_queue);
-               if (joined_list_entries && qlen) {
-                       /* we need to make sure that all members have the same queue. */
-                       cfs_dom_message(dfsm->log_domain, "queue not emtpy - resening %d messages", qlen);
-                       if (!dfsm_resend_queue(dfsm)) {
-                               cfs_dom_critical(dfsm->log_domain, "dfsm_resend_queue failed");
-                               goto leave;
-                       }
-               }
-
-               dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
-               if (lowest_nodeid == dfsm->nodeid) {
-                       /* dfsm_verify_request() treats the result of this helper as a
-                        * cs_error_t, so compare against CS_OK: a plain '!' test can
-                        * never detect a failure because CS_OK itself is non-zero. */
-                       if (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0) != CS_OK) {
-                               cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
-                               goto leave;
-                       }
-               }
-       } else {
-               dfsm_set_mode(dfsm, DFSM_MODE_SYNCED);
-               dfsm->sync_info->local->synced = 1;
-               if (!dfsm_deliver_queue(dfsm))
-                       goto leave;
-       }
-
-       if (dfsm->dfsm_callbacks->dfsm_confchg_fn) 
-               dfsm->dfsm_callbacks->dfsm_confchg_fn(dfsm, dfsm->data, member_list, member_list_entries);
-
-       return;
-leave:
-       dfsm_set_mode(dfsm, DFSM_MODE_LEAVE);
-       return;
-}
-
-/*
- * CPG callback table. dfsm_new() stores its address in dfsm->cpg_callbacks,
- * which dfsm_initialize() passes to cpg_initialize(). The confchg callback
- * also compares that stored address against this table before it accepts
- * a context pointer as a dfsm_t.
- */
-static cpg_callbacks_t cpg_callbacks = {
-       .cpg_deliver_fn = dfsm_cpg_deliver_callback,
-       .cpg_confchg_fn = dfsm_cpg_confchg_callback,
-};
-
-/*
- * Allocate and set up a new dfsm instance in DFSM_MODE_START.
- *
- * data             - opaque pointer stored in dfsm->data and passed back
- *                    to the user callbacks
- * group_name       - CPG group name; must not be NULL and must fit,
- *                    including its terminating NUL, into cpg_name.value
- * log_domain       - log domain; a copy is stored
- * protocol_version - stored in dfsm->protocol_version
- * callbacks        - user callbacks; the pointer is stored, not copied.
- *                    The deliver, get_state, process_state_update,
- *                    process_update and commit callbacks are mandatory.
- *
- * No CPG connection is opened here.
- *
- * Returns the new instance, or NULL when a precondition fails or a
- * resource cannot be created.
- */
-dfsm_t *
-dfsm_new(
-       gpointer data, 
-       const char *group_name, 
-       const char *log_domain,
-       guint32 protocol_version, 
-       dfsm_callbacks_t *callbacks)
-{
-       /* the message headers are wire formats with fixed sizes */
-       g_return_val_if_fail(sizeof(dfsm_message_header_t) == 16, NULL);
-       g_return_val_if_fail(sizeof(dfsm_message_state_header_t) == 32, NULL);
-       g_return_val_if_fail(sizeof(dfsm_message_normal_header_t) == 24, NULL); 
-
-       g_return_val_if_fail(callbacks != NULL, NULL);
-       g_return_val_if_fail(callbacks->dfsm_deliver_fn != NULL, NULL);
-
-       g_return_val_if_fail(callbacks->dfsm_get_state_fn != NULL, NULL);
-       g_return_val_if_fail(callbacks->dfsm_process_state_update_fn != NULL, NULL);
-       g_return_val_if_fail(callbacks->dfsm_process_update_fn != NULL, NULL);
-       g_return_val_if_fail(callbacks->dfsm_commit_fn != NULL, NULL);
-  
-       dfsm_t *dfsm;
-
-       /* reject names that would overflow the fixed-size cpg_name buffer;
-        * sizeof does not evaluate its operand, so dfsm need not be set yet */
-       g_return_val_if_fail(group_name != NULL, NULL);
-       size_t group_name_len = strlen(group_name) + 1;
-       g_return_val_if_fail(group_name_len <= sizeof(dfsm->cpg_group_name.value), NULL);
-
-       if ((dfsm = g_new0(dfsm_t, 1)) == NULL)
-               return NULL;
-
-       if (!(dfsm->sync_mutex = g_mutex_new()))
-               goto err;
-
-       if (!(dfsm->sync_cond = g_cond_new())) 
-               goto err;
-
-       if (!(dfsm->results = g_hash_table_new(g_int64_hash, g_int64_equal)))
-               goto err;
-
-       if (!(dfsm->msg_queue = g_sequence_new(NULL))) 
-               goto err;
-               
-       dfsm->log_domain = g_strdup(log_domain);
-       dfsm->data = data;
-       dfsm->mode = DFSM_MODE_START;
-       dfsm->protocol_version = protocol_version;
-
-       /* length counts the terminating NUL, exactly as before */
-       memcpy(dfsm->cpg_group_name.value, group_name, group_name_len);
-       dfsm->cpg_group_name.length = group_name_len;
-
-       dfsm->cpg_callbacks = &cpg_callbacks;
-       dfsm->dfsm_callbacks = callbacks;
-
-       dfsm->members = g_hash_table_new(dfsm_sync_info_hash, dfsm_sync_info_equal);
-       if (!dfsm->members)
-               goto err;
-
-       if ((dfsm->mode_mutex = g_mutex_new()) == NULL)
-               goto err;
-
-       return dfsm;
-
-err:
-       dfsm_destroy(dfsm);
-       return NULL;
-}
-
-/*
- * Returns TRUE when a lowest nodeid is recorded (non-zero) and it equals
- * our own nodeid, FALSE otherwise.
- */
-gboolean 
-dfsm_lowest_nodeid(dfsm_t *dfsm)
-{
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-
-       if (!dfsm->lowest_nodeid)
-               return FALSE;
-
-       return (dfsm->lowest_nodeid == dfsm->nodeid) ? TRUE : FALSE;
-}
-
-/*
- * Send a DFSM_MESSAGE_VERIFY_REQUEST that carries the next checksum
- * counter value.
- *
- * Nothing is sent (and CS_OK is returned) unless we hold the lowest
- * nodeid, the dfsm is in DFSM_MODE_SYNCED and csum_counter equals csum_id.
- *
- * Returns CS_ERR_INVALID_PARAM for a NULL dfsm, otherwise CS_OK or the
- * result of dfsm_send_state_message_full().
- */
-cs_error_t 
-dfsm_verify_request(dfsm_t *dfsm)
-{
-       g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
-
-       /* only do when we have lowest nodeid */
-       gboolean have_lowest = dfsm->lowest_nodeid && (dfsm->lowest_nodeid == dfsm->nodeid);
-       if (!have_lowest)
-               return CS_OK;
-
-       if (dfsm_get_mode(dfsm) != DFSM_MODE_SYNCED)
-               return CS_OK;
-
-       if (dfsm->csum_counter != dfsm->csum_id) {
-               g_message("delay verify request %016zX", dfsm->csum_counter + 1);
-               return CS_OK;
-       }
-
-       dfsm->csum_counter++;
-
-       /* the payload is the raw counter value */
-       struct iovec request[1];
-       request[0].iov_base = (char *)&dfsm->csum_counter;
-       request[0].iov_len = sizeof(dfsm->csum_counter);
-
-       cfs_debug("send verify request %016zX", dfsm->csum_counter);
-
-       cs_error_t status = dfsm_send_state_message_full(
-               dfsm, DFSM_MESSAGE_VERIFY_REQUEST, request, 1);
-
-       if (status != CS_OK)
-               cfs_dom_critical(dfsm->log_domain, "failed to send VERIFY_REQUEST message");
-
-       return status;
-}
-
-
-/*
- * Run cpg_dispatch() on the dfsm's CPG handle.
- *
- * While cpg_dispatch() reports CPG_ERR_TRY_AGAIN the call is repeated
- * after a 100ms sleep, without an upper bound; every tenth retry is
- * logged.
- *
- * Returns CS_ERR_INVALID_PARAM for a NULL dfsm or a zero handle,
- * otherwise the final cpg_dispatch() result.
- */
-cs_error_t
-dfsm_dispatch(
-       dfsm_t *dfsm, 
-       cs_dispatch_flags_t dispatch_types) 
-{
-       g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_INVALID_PARAM);
-
-       struct timespec retry_delay = { .tv_sec = 0, .tv_nsec = 100000000 };
-       cs_error_t status;
-       int attempts = 0;
-
-       while ((status = cpg_dispatch(dfsm->cpg_handle, dispatch_types)) == CPG_ERR_TRY_AGAIN) {
-               nanosleep(&retry_delay, NULL);
-               attempts++;
-               if (attempts % 10 == 0)
-                       cfs_dom_message(dfsm->log_domain, "cpg_dispatch retry %d", attempts);
-       }
-
-       if (status != CS_OK && status != CS_ERR_TRY_AGAIN) {
-               cfs_dom_critical(dfsm->log_domain, "cpg_dispatch failed: %d", status);
-       }
-
-       return status;
-}
-
-
-/*
- * Reset the dfsm to DFSM_MODE_START and make sure a CPG handle exists.
- *
- * Queued messages are dropped and dfsm_send_sync_message_abort() is
- * called. When no handle exists yet, cpg_initialize() is called, the
- * local nodeid and pid are recorded and the dfsm is stored as the handle
- * context. An existing handle is reused.
- *
- * fd receives the descriptor reported by cpg_fd_get().
- *
- * Returns CS_OK on success. On failure the handle is finalized and
- * reset to 0, and the error of the failing cpg call is returned.
- */
-cs_error_t
-dfsm_initialize(dfsm_t *dfsm, int *fd) 
-{
-       g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(fd != NULL, CS_ERR_INVALID_PARAM);
-
-       /* remove old messages */
-       dfsm_free_message_queue(dfsm);
-       dfsm_send_sync_message_abort(dfsm);
-
-       dfsm->joined = FALSE;
-       dfsm->we_are_member = 0;
-
-       dfsm_set_mode(dfsm, DFSM_MODE_START);
-
-       cs_error_t status;
-
-       if (!dfsm->cpg_handle) {
-               status = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks);
-               if (status != CS_OK) {
-                       cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", status);
-                       dfsm->cpg_handle = 0;
-                       goto fail;
-               }
-
-               status = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid);
-               if (status != CS_OK) {
-                       cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", status);
-                       goto fail;
-               }
-
-               dfsm->pid = getpid();
-
-               status = cpg_context_set(dfsm->cpg_handle, dfsm);
-               if (status != CS_OK) {
-                       cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", status);
-                       goto fail;
-               }
-       }
-
-       status = cpg_fd_get(dfsm->cpg_handle, fd);
-       if (status == CS_OK)
-               return CS_OK;
-
-       cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", status);
-
-fail:
-       if (dfsm->cpg_handle) 
-               cpg_finalize(dfsm->cpg_handle);
-       dfsm->cpg_handle = 0;
-       return status;
-}
-
-/*
- * Join the CPG group named by dfsm->cpg_group_name.
- *
- * While cpg_join() reports CPG_ERR_TRY_AGAIN the call is repeated after
- * a 100ms sleep, without an upper bound; every tenth retry is logged.
- *
- * Returns CS_OK on success (dfsm->joined is then TRUE), the cpg_join()
- * error otherwise. Precondition failures return CS_ERR_INVALID_PARAM
- * (NULL dfsm), CS_ERR_LIBRARY (no handle) or CS_ERR_EXIST (already joined).
- */
-cs_error_t
-dfsm_join(dfsm_t *dfsm) 
-{
-       g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(dfsm->cpg_handle != 0, CS_ERR_LIBRARY);
-       g_return_val_if_fail(dfsm->joined == 0, CS_ERR_EXIST);
-
-       cs_error_t result;
-
-       struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
-       int retries = 0;
-
-       while ((result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name)) == CPG_ERR_TRY_AGAIN) {
-               nanosleep(&tvreq, NULL);
-               ++retries;
-               if ((retries % 10) == 0)
-                       cfs_dom_message(dfsm->log_domain, "cpg_join retry %d", retries);
-       }
-
-       if (result != CS_OK) {
-               cfs_dom_critical(dfsm->log_domain, "cpg_join failed: %d", result);
-               return result;
-       }
-
-       dfsm->joined = TRUE;
-
-       /* report success as a cs_error_t, not as a gboolean */
-       return CS_OK;
-}
-
-/*
- * Leave the CPG group named by dfsm->cpg_group_name.
- *
- * While cpg_leave() reports CPG_ERR_TRY_AGAIN the call is repeated after
- * a 100ms sleep, without an upper bound; every tenth retry is logged.
- *
- * Returns CS_OK on success (dfsm->joined is then FALSE), the cpg_leave()
- * error otherwise. Precondition failures return CS_ERR_INVALID_PARAM
- * (NULL dfsm) or CS_ERR_NOT_EXIST (not joined).
- */
-cs_error_t
-dfsm_leave (dfsm_t *dfsm)
-{
-       g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
-       g_return_val_if_fail(dfsm->joined, CS_ERR_NOT_EXIST);
-
-       cs_error_t result;
-
-       struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
-       int retries = 0;
-
-       while ((result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name)) == CPG_ERR_TRY_AGAIN) {
-               nanosleep(&tvreq, NULL);
-               ++retries;
-               if ((retries % 10) == 0)
-                       cfs_dom_message(dfsm->log_domain, "cpg_leave retry %d", retries);
-       }
-
-       if (result != CS_OK) {
-               cfs_dom_critical(dfsm->log_domain, "cpg_leave failed: %d", result);
-               return result;
-       }
-
-       dfsm->joined = FALSE;
-
-       /* report success as a cs_error_t, not as a gboolean */
-       return CS_OK;
-}
-
-/*
- * Shut down the CPG connection of a dfsm.
- *
- * Calls dfsm_send_sync_message_abort(), leaves the group if joined (the
- * result of dfsm_leave() is ignored) and finalizes the CPG handle if one
- * exists, resetting handle, joined and we_are_member.
- *
- * Returns FALSE for a NULL dfsm, TRUE otherwise.
- */
-gboolean 
-dfsm_finalize(dfsm_t *dfsm)
-{
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-
-       dfsm_send_sync_message_abort(dfsm);
-
-       if (dfsm->joined)
-               dfsm_leave(dfsm);
-
-       /* nothing more to do without a handle */
-       if (!dfsm->cpg_handle)
-               return TRUE;
-
-       cpg_finalize(dfsm->cpg_handle);
-       dfsm->cpg_handle = 0;
-       dfsm->joined = FALSE;
-       dfsm->we_are_member = 0;
-
-       return TRUE;
-}
-
-/*
- * Finalize a dfsm and free everything it owns.
- *
- * After dfsm_finalize() the optional dfsm_cleanup_fn callback is called
- * if sync data is present. Then the sync queue, mutexes, condition,
- * result table, message queue, sync info, member table, log domain and
- * the dfsm itself are released. Also used by dfsm_new() on its error
- * path, so every member may still be unset.
- */
-void 
-dfsm_destroy(dfsm_t *dfsm)
-{
-       g_return_if_fail(dfsm != NULL);
-
-       dfsm_finalize(dfsm);
-
-       /* dfsm_callbacks is only dereferenced when sync data exists */
-       gboolean has_sync_data = dfsm->sync_info && dfsm->sync_info->data;
-       if (has_sync_data && dfsm->dfsm_callbacks->dfsm_cleanup_fn)
-               dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info);
-
-       dfsm_free_sync_queue(dfsm);
-
-       if (dfsm->mode_mutex != NULL)
-               g_mutex_free (dfsm->mode_mutex);
-
-       if (dfsm->sync_mutex != NULL)
-               g_mutex_free (dfsm->sync_mutex);
-
-       if (dfsm->sync_cond != NULL)
-               g_cond_free (dfsm->sync_cond);
-
-       if (dfsm->results != NULL)
-               g_hash_table_destroy(dfsm->results);
-
-       if (dfsm->msg_queue != NULL) {
-               dfsm_free_message_queue(dfsm);
-               g_sequence_free(dfsm->msg_queue);
-       }
-
-       /* g_free() accepts NULL */
-       g_free(dfsm->sync_info);
-
-       if (dfsm->cpg_handle)
-               cpg_finalize(dfsm->cpg_handle);
-
-       if (dfsm->members != NULL)
-               g_hash_table_destroy(dfsm->members);
-
-       g_free(dfsm->log_domain);
-
-       g_free(dfsm);
-}
-
-/*
- * Context handed to the cfs service callbacks below: it only wraps the
- * dfsm instance the service operates on.
- */
-typedef struct {
-       dfsm_t *dfsm; /* set by service_dfsm_new(); not freed by service_dfsm_destroy() */
-} service_dfsm_private_t;
-
-/*
- * Service finalize callback: runs dfsm_finalize() on the wrapped dfsm.
- * Returns FALSE when service, context or the wrapped dfsm is NULL.
- */
-static gboolean 
-service_dfsm_finalize(
-       cfs_service_t *service,
-       gpointer context)
-{
-       g_return_val_if_fail(service != NULL, FALSE);
-       g_return_val_if_fail(context != NULL, FALSE);
-
-       dfsm_t *dfsm = ((service_dfsm_private_t *)context)->dfsm;
-
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-
-       return dfsm_finalize(dfsm);
-}
-
-/*
- * Service initialize callback: initializes the wrapped dfsm and joins
- * the process group.
- *
- * Returns the descriptor obtained from dfsm_initialize() on success and
- * -1 on any failure, including a dfsm that is not restartable.
- */
-static int 
-service_dfsm_initialize(
-       cfs_service_t *service,
-       gpointer context)
-{
-       g_return_val_if_fail(service != NULL, -1);
-       g_return_val_if_fail(context != NULL, -1);
-
-       dfsm_t *dfsm = ((service_dfsm_private_t *)context)->dfsm;
-
-       g_return_val_if_fail(dfsm != NULL, -1);
-
-       /* serious internal error - don't try to recover */
-       if (!dfsm_restartable(dfsm))
-               return -1;
-
-       int cpg_fd = -1;
-
-       if (dfsm_initialize(dfsm, &cpg_fd) != CS_OK)
-               return -1;
-
-       if (dfsm_join(dfsm) != CS_OK) {
-               /* we can't dispatch if not joined, so we need to finalize */
-               dfsm_finalize(dfsm);
-               return -1;
-       }
-
-       return cpg_fd;
-}
-
-/*
- * Service dispatch callback: dispatches one CPG event on the wrapped dfsm.
- *
- * Returns TRUE when the dispatch succeeded and the dfsm is in a normal
- * mode, or in an error mode and either the group was left cleanly or we
- * are still listed as a member. Returns FALSE otherwise.
- *
- * On a dispatch or leave error the service's restartable flag is updated
- * from dfsm_restartable(). The dfsm is finalized first for CS_ERR_LIBRARY
- * or CS_ERR_BAD_HANDLE from dispatch and for any dfsm_leave() failure.
- */
-static gboolean 
-service_dfsm_dispatch(
-       cfs_service_t *service,
-       gpointer context)
-{
-       g_return_val_if_fail(service != NULL, FALSE);
-       g_return_val_if_fail(context != NULL, FALSE);
-
-       dfsm_t *dfsm = ((service_dfsm_private_t *)context)->dfsm;
-
-       g_return_val_if_fail(dfsm != NULL, FALSE);
-       g_return_val_if_fail(dfsm->cpg_handle != 0, FALSE);
-
-       cs_error_t status = dfsm_dispatch(dfsm, CPG_DISPATCH_ONE);
-       if (status == CS_ERR_LIBRARY || status == CS_ERR_BAD_HANDLE)
-               goto finalize;
-       if (status != CS_OK)
-               goto fail;
-
-       /* normal operation */
-       if (dfsm_get_mode(dfsm) < DFSM_ERROR_MODE_START)
-               return TRUE;
-
-       /* error mode and the group was already left */
-       if (!dfsm->joined)
-               return dfsm->we_are_member ? TRUE : FALSE;
-
-       /* error mode while still joined: leave the group */
-       if (dfsm_leave(dfsm) != CS_OK)
-               goto finalize;
-
-       return TRUE;
-
-finalize:
-       dfsm_finalize(dfsm);
-fail:
-       cfs_service_set_restartable(service, dfsm_restartable(dfsm));
-       return FALSE;
-}
-
-/*
- * Service timer callback: calls dfsm_verify_request() on the wrapped
- * dfsm and ignores its result.
- */
-static void 
-service_dfsm_timer(
-       cfs_service_t *service,
-       gpointer context)
-{
-       g_return_if_fail(service != NULL);
-       g_return_if_fail(context != NULL);
-
-       dfsm_t *dfsm = ((service_dfsm_private_t *)context)->dfsm;
-
-       g_return_if_fail(dfsm != NULL);
-
-       dfsm_verify_request(dfsm);
-}
-
-/* Callback table passed to cfs_service_new() by service_dfsm_new(). */
-static cfs_service_callbacks_t cfs_dfsm_callbacks = {
-       .cfs_service_initialize_fn =  service_dfsm_initialize,
-       .cfs_service_finalize_fn = service_dfsm_finalize,
-       .cfs_service_dispatch_fn = service_dfsm_dispatch,
-       .cfs_service_timer_fn = service_dfsm_timer,
-};
-
-/*
- * Create a cfs service that drives the given dfsm.
- *
- * A private context wrapping the dfsm pointer is allocated and passed to
- * cfs_service_new() together with the dfsm callback table and the dfsm's
- * log domain. The dfsm itself is only referenced, not copied.
- *
- * Returns the new service, or NULL if dfsm is NULL or the service could
- * not be created.
- */
-cfs_service_t *
-service_dfsm_new(dfsm_t *dfsm)
-{
-       cfs_service_t *service;
-
-       g_return_val_if_fail(dfsm != NULL, NULL);
-
-       service_dfsm_private_t *private = g_new0(service_dfsm_private_t, 1);
-       if (!private)
-               return NULL;
-
-       private->dfsm = dfsm;
-
-       service = cfs_service_new(&cfs_dfsm_callbacks, dfsm->log_domain, private); 
-
-       /* Without a service nobody can reach the context again, so free it
-        * here instead of leaking it.
-        * NOTE(review): assumes cfs_service_new() keeps no reference to the
-        * context when it returns NULL - confirm against its implementation. */
-       if (!service)
-               g_free(private);
-
-       return service;
-}
-
-/*
- * Free a service created by service_dfsm_new(): first its private
- * context, then the service object. The wrapped dfsm is not touched.
- */
-void 
-service_dfsm_destroy(cfs_service_t *service) 
-{
-       g_return_if_fail(service != NULL);
-
-       gpointer context = cfs_service_get_context(service);
-
-       g_free((service_dfsm_private_t *)context);
-       g_free(service);
-}
-
-
-
-