X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=data%2Fsrc%2Fdfsm.c;h=cdf473e8226ab9706d693a457ae70c0809afa0fa;hb=HEAD;hp=7922a2ec03ed7603a6c1e409b382272f8065122d;hpb=d3eeade6cb9e5b56cb10edae86b2d9fd27e64ccc;p=pve-cluster.git diff --git a/data/src/dfsm.c b/data/src/dfsm.c deleted file mode 100644 index 7922a2e..0000000 --- a/data/src/dfsm.c +++ /dev/null @@ -1,1694 +0,0 @@ -/* - Copyright (C) 2010 Proxmox Server Solutions GmbH - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. - - Author: Dietmar Maurer - -*/ - - -/* NOTE: we try to keep the CPG handle as long as possible, because - * calling cpg_initialize/cpg_finalize multiple times from the - * same process confuses corosync. 
- * Note: CS_ERR_LIBRARY is returned when corosync died - */ - -#ifdef HAVE_CONFIG_H -#include -#endif /* HAVE_CONFIG_H */ - -#include -#include -#include -#include - -#include -#include -#include - -#include "cfs-utils.h" -#include "dfsm.h" - -static cpg_callbacks_t cpg_callbacks; - -typedef enum { - DFSM_MODE_START = 0, - DFSM_MODE_START_SYNC = 1, - DFSM_MODE_SYNCED = 2, - DFSM_MODE_UPDATE = 3, - - /* values >= 128 indicates abnormal/error conditions */ - DFSM_ERROR_MODE_START = 128, - DFSM_MODE_LEAVE = 253, - DFSM_MODE_VERSION_ERROR = 254, - DFSM_MODE_ERROR = 255, -} dfsm_mode_t; - -typedef enum { - DFSM_MESSAGE_NORMAL = 0, - DFSM_MESSAGE_SYNC_START = 1, - DFSM_MESSAGE_STATE = 2, - DFSM_MESSAGE_UPDATE = 3, - DFSM_MESSAGE_UPDATE_COMPLETE = 4, - DFSM_MESSAGE_VERIFY_REQUEST = 5, - DFSM_MESSAGE_VERIFY = 6, -} dfsm_message_t; - -#define DFSM_VALID_STATE_MESSAGE(mt) (mt >= DFSM_MESSAGE_SYNC_START && mt <= DFSM_MESSAGE_VERIFY) - -typedef struct { - uint16_t type; - uint16_t subtype; - uint32_t protocol_version; - uint32_t time; - uint32_t reserved; -} dfsm_message_header_t; - -typedef struct { - uint32_t epoch; // per process (not globally unique) - uint32_t time; - uint32_t nodeid; - uint32_t pid; -} dfsm_sync_epoch_t; - -typedef struct { - dfsm_message_header_t base; - dfsm_sync_epoch_t epoch; -} dfsm_message_state_header_t; - -typedef struct { - dfsm_message_header_t base; - uint64_t count; -} dfsm_message_normal_header_t; - -typedef struct { - uint32_t nodeid; - uint32_t pid; - uint64_t msg_count; - void *msg; - int msg_len; // fixme: unsigned? 
-} dfsm_queued_message_t; - -struct dfsm { - char *log_domain; - cpg_callbacks_t *cpg_callbacks; - dfsm_callbacks_t *dfsm_callbacks; - cpg_handle_t cpg_handle; - struct cpg_name cpg_group_name; - uint32_t nodeid; - uint32_t pid; - int we_are_member; - - guint32 protocol_version; - gpointer data; - - gboolean joined; - - /* mode is protected with mode_mutex */ - GMutex *mode_mutex; - dfsm_mode_t mode; - - GHashTable *members; /* contains dfsm_node_info_t pointers */ - dfsm_sync_info_t *sync_info; - uint32_t local_epoch_counter; - dfsm_sync_epoch_t sync_epoch; - uint32_t lowest_nodeid; - GSequence *msg_queue; - GList *sync_queue; - - /* synchrounous message transmission, protected with sync_mutex */ - GMutex *sync_mutex; - GCond *sync_cond; - GHashTable *results; - uint64_t msgcount; - uint64_t msgcount_rcvd; - - /* state verification */ - guchar csum[32]; - dfsm_sync_epoch_t csum_epoch; - uint64_t csum_id; - uint64_t csum_counter; -}; - -static gboolean dfsm_deliver_queue(dfsm_t *dfsm); -static gboolean dfsm_deliver_sync_queue(dfsm_t *dfsm); - -gboolean -dfsm_nodeid_is_local( - dfsm_t *dfsm, - uint32_t nodeid, - uint32_t pid) -{ - g_return_val_if_fail(dfsm != NULL, FALSE); - - return (nodeid == dfsm->nodeid && pid == dfsm->pid); -} - - -static void -dfsm_send_sync_message_abort(dfsm_t *dfsm) -{ - g_return_if_fail(dfsm != NULL); - g_return_if_fail(dfsm->sync_mutex != NULL); - g_return_if_fail(dfsm->sync_cond != NULL); - - g_mutex_lock (dfsm->sync_mutex); - dfsm->msgcount_rcvd = dfsm->msgcount; - g_cond_broadcast (dfsm->sync_cond); - g_mutex_unlock (dfsm->sync_mutex); -} - -static void -dfsm_record_local_result( - dfsm_t *dfsm, - uint64_t msg_count, - int msg_result, - gboolean processed) -{ - g_return_if_fail(dfsm != NULL); - g_return_if_fail(dfsm->sync_mutex != NULL); - g_return_if_fail(dfsm->sync_cond != NULL); - g_return_if_fail(dfsm->results != NULL); - - g_mutex_lock (dfsm->sync_mutex); - dfsm_result_t *rp = (dfsm_result_t *)g_hash_table_lookup(dfsm->results, 
&msg_count); - if (rp) { - rp->result = msg_result; - rp->processed = processed; - } - dfsm->msgcount_rcvd = msg_count; - g_cond_broadcast (dfsm->sync_cond); - g_mutex_unlock (dfsm->sync_mutex); -} - -static cpg_error_t -dfsm_send_message_full( - dfsm_t *dfsm, - struct iovec *iov, - unsigned int len, - int retry) -{ - g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM); - g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM); - - struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 }; - cpg_error_t result; - int retries = 0; -loop: - result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len); - if (retry && result == CPG_ERR_TRY_AGAIN) { - nanosleep(&tvreq, NULL); - ++retries; - if ((retries % 10) == 0) - cfs_dom_message(dfsm->log_domain, "cpg_send_message retry %d", retries); - if (retries < 100) - goto loop; - } - - if (retries) - cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries); - - if (result != CS_OK && - (!retry || result != CPG_ERR_TRY_AGAIN)) - cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result); - - return result; -} - -static cpg_error_t -dfsm_send_state_message_full( - dfsm_t *dfsm, - uint16_t type, - struct iovec *iov, - unsigned int len) -{ - g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM); - g_return_val_if_fail(DFSM_VALID_STATE_MESSAGE(type), CS_ERR_INVALID_PARAM); - g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM); - - dfsm_message_state_header_t header; - header.base.type = type; - header.base.subtype = 0; - header.base.protocol_version = dfsm->protocol_version; - header.base.time = time(NULL); - header.base.reserved = 0; - - header.epoch = dfsm->sync_epoch; - - struct iovec real_iov[len + 1]; - - real_iov[0].iov_base = (char *)&header; - real_iov[0].iov_len = sizeof(header); - - for (int i = 0; i < len; i++) - real_iov[i + 1] = iov[i]; - - return dfsm_send_message_full(dfsm, real_iov, len + 1, 1); -} - -cpg_error_t -dfsm_send_update( - 
dfsm_t *dfsm, - struct iovec *iov, - unsigned int len) -{ - return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len); -} - -cpg_error_t -dfsm_send_update_complete(dfsm_t *dfsm) -{ - return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0); -} - - -cpg_error_t -dfsm_send_message( - dfsm_t *dfsm, - uint16_t msgtype, - struct iovec *iov, - int len) -{ - return dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL); -} - -cpg_error_t -dfsm_send_message_sync( - dfsm_t *dfsm, - uint16_t msgtype, - struct iovec *iov, - int len, - dfsm_result_t *rp) -{ - g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM); - g_return_val_if_fail(dfsm->sync_mutex != NULL, CS_ERR_INVALID_PARAM); - g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM); - - g_mutex_lock (dfsm->sync_mutex); - /* note: hold lock until message is sent - to guarantee ordering */ - uint64_t msgcount = ++dfsm->msgcount; - if (rp) { - rp->msgcount = msgcount; - rp->processed = 0; - g_hash_table_replace(dfsm->results, &rp->msgcount, rp); - } - - dfsm_message_normal_header_t header; - header.base.type = DFSM_MESSAGE_NORMAL; - header.base.subtype = msgtype; - header.base.protocol_version = dfsm->protocol_version; - header.base.time = time(NULL); - header.base.reserved = 0; - header.count = msgcount; - - struct iovec real_iov[len + 1]; - - real_iov[0].iov_base = (char *)&header; - real_iov[0].iov_len = sizeof(header); - - for (int i = 0; i < len; i++) - real_iov[i + 1] = iov[i]; - - cpg_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1); - - g_mutex_unlock (dfsm->sync_mutex); - - if (result != CS_OK) { - cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result); - - if (rp) { - g_mutex_lock (dfsm->sync_mutex); - g_hash_table_remove(dfsm->results, &rp->msgcount); - g_mutex_unlock (dfsm->sync_mutex); - } - return result; - } - - if (rp) { - g_mutex_lock (dfsm->sync_mutex); - - while (dfsm->msgcount_rcvd < msgcount) - g_cond_wait 
(dfsm->sync_cond, dfsm->sync_mutex); - - - g_hash_table_remove(dfsm->results, &rp->msgcount); - - g_mutex_unlock (dfsm->sync_mutex); - - return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION; - } - - return CS_OK; -} - -static gboolean -dfsm_send_checksum(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, FALSE); - - int len = 2; - struct iovec iov[len]; - - iov[0].iov_base = (char *)&dfsm->csum_id; - iov[0].iov_len = sizeof(dfsm->csum_id); - iov[1].iov_base = dfsm->csum; - iov[1].iov_len = sizeof(dfsm->csum); - - gboolean res = (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY, iov, len) == CS_OK); - - return res; -} - -static void -dfsm_free_queue_entry(gpointer data) -{ - dfsm_queued_message_t *qm = (dfsm_queued_message_t *)data; - g_free (qm->msg); - g_free (qm); -} - -static void -dfsm_free_message_queue(dfsm_t *dfsm) -{ - g_return_if_fail(dfsm != NULL); - g_return_if_fail(dfsm->msg_queue != NULL); - - GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue); - GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue); - while (iter != end) { - GSequenceIter *cur = iter; - iter = g_sequence_iter_next(iter); - dfsm_queued_message_t *qm = (dfsm_queued_message_t *) - g_sequence_get(cur); - dfsm_free_queue_entry(qm); - g_sequence_remove(cur); - } -} - -static void -dfsm_free_sync_queue(dfsm_t *dfsm) -{ - g_return_if_fail(dfsm != NULL); - - GList *iter = dfsm->sync_queue; - while (iter) { - dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data; - iter = g_list_next(iter); - dfsm_free_queue_entry(qm); - } - - g_list_free(dfsm->sync_queue); - dfsm->sync_queue = NULL; -} - -static gint -message_queue_sort_fn( - gconstpointer a, - gconstpointer b, - gpointer user_data) -{ - return ((dfsm_queued_message_t *)a)->msg_count - - ((dfsm_queued_message_t *)b)->msg_count; -} - -static dfsm_node_info_t * -dfsm_node_info_lookup( - dfsm_t *dfsm, - uint32_t nodeid, - uint32_t pid) -{ - g_return_val_if_fail(dfsm != NULL, NULL); - 
g_return_val_if_fail(dfsm->members != NULL, NULL); - - dfsm_node_info_t info = { .nodeid = nodeid, .pid = pid }; - - return (dfsm_node_info_t *)g_hash_table_lookup(dfsm->members, &info); -} - -static dfsm_queued_message_t * -dfsm_queue_add_message( - dfsm_t *dfsm, - uint32_t nodeid, - uint32_t pid, - uint64_t msg_count, - const void *msg, - size_t msg_len) -{ - g_return_val_if_fail(dfsm != NULL, NULL); - g_return_val_if_fail(msg != NULL, NULL); - g_return_val_if_fail(msg_len != 0, NULL); - - dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, nodeid, pid); - if (!ni) { - cfs_dom_critical(dfsm->log_domain, "dfsm_node_info_lookup failed"); - return NULL; - } - - dfsm_queued_message_t *qm = g_new0(dfsm_queued_message_t, 1); - g_return_val_if_fail(qm != NULL, NULL); - - qm->nodeid = nodeid; - qm->pid = pid; - qm->msg = g_memdup (msg, msg_len); - qm->msg_len = msg_len; - qm->msg_count = msg_count; - - if (dfsm->mode == DFSM_MODE_UPDATE && ni->synced) { - dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm); - } else { - /* NOTE: we only need to sort the queue because we resend all - * queued messages sometimes. 
- */ - g_sequence_insert_sorted(dfsm->msg_queue, qm, message_queue_sort_fn, NULL); - } - - return qm; -} - -static guint -dfsm_sync_info_hash(gconstpointer key) -{ - dfsm_node_info_t *info = (dfsm_node_info_t *)key; - - return g_int_hash(&info->nodeid) + g_int_hash(&info->pid); -} - -static gboolean -dfsm_sync_info_equal( - gconstpointer v1, - gconstpointer v2) -{ - dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1; - dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2; - - if (info1->nodeid == info2->nodeid && - info1->pid == info2->pid) - return TRUE; - - return FALSE; -} - -static int -dfsm_sync_info_compare( - gconstpointer v1, - gconstpointer v2) -{ - dfsm_node_info_t *info1 = (dfsm_node_info_t *)v1; - dfsm_node_info_t *info2 = (dfsm_node_info_t *)v2; - - if (info1->nodeid != info2->nodeid) - return info1->nodeid - info2->nodeid; - - return info1->pid - info2->pid; -} - -static void -dfsm_set_mode( - dfsm_t *dfsm, - dfsm_mode_t new_mode) -{ - g_return_if_fail(dfsm != NULL); - - cfs_debug("dfsm_set_mode - set mode to %d", new_mode); - - int changed = 0; - g_mutex_lock (dfsm->mode_mutex); - if (dfsm->mode != new_mode) { - if (new_mode < DFSM_ERROR_MODE_START || - (dfsm->mode < DFSM_ERROR_MODE_START || new_mode >= dfsm->mode)) { - dfsm->mode = new_mode; - changed = 1; - } - } - g_mutex_unlock (dfsm->mode_mutex); - - if (!changed) - return; - - if (new_mode == DFSM_MODE_START) { - cfs_dom_message(dfsm->log_domain, "start cluster connection"); - } else if (new_mode == DFSM_MODE_START_SYNC) { - cfs_dom_message(dfsm->log_domain, "starting data syncronisation"); - } else if (new_mode == DFSM_MODE_SYNCED) { - cfs_dom_message(dfsm->log_domain, "all data is up to date"); - if (dfsm->dfsm_callbacks->dfsm_synced_fn) - dfsm->dfsm_callbacks->dfsm_synced_fn(dfsm); - } else if (new_mode == DFSM_MODE_UPDATE) { - cfs_dom_message(dfsm->log_domain, "waiting for updates from leader"); - } else if (new_mode == DFSM_MODE_LEAVE) { - cfs_dom_critical(dfsm->log_domain, "leaving CPG group"); 
- } else if (new_mode == DFSM_MODE_ERROR) { - cfs_dom_critical(dfsm->log_domain, "serious internal error - stop cluster connection"); - } else if (new_mode == DFSM_MODE_VERSION_ERROR) { - cfs_dom_critical(dfsm->log_domain, "detected newer protocol - please update this node"); - } -} - -static dfsm_mode_t -dfsm_get_mode(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR); - - g_mutex_lock (dfsm->mode_mutex); - dfsm_mode_t mode = dfsm->mode; - g_mutex_unlock (dfsm->mode_mutex); - - return mode; -} - -gboolean -dfsm_restartable(dfsm_t *dfsm) -{ - dfsm_mode_t mode = dfsm_get_mode(dfsm); - - return !(mode == DFSM_MODE_ERROR || - mode == DFSM_MODE_VERSION_ERROR); -} - -void -dfsm_set_errormode(dfsm_t *dfsm) -{ - dfsm_set_mode(dfsm, DFSM_MODE_ERROR); -} - -static void -dfsm_release_sync_resources( - dfsm_t *dfsm, - const struct cpg_address *member_list, - size_t member_list_entries) -{ - g_return_if_fail(dfsm != NULL); - g_return_if_fail(dfsm->members != NULL); - g_return_if_fail(!member_list_entries || member_list != NULL); - - cfs_debug("enter dfsm_release_sync_resources"); - - if (dfsm->sync_info) { - - if (dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) { - dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info); - dfsm->sync_info->data = NULL; - } - - for (int i = 0; i < dfsm->sync_info->node_count; i++) { - if (dfsm->sync_info->nodes[i].state) { - g_free(dfsm->sync_info->nodes[i].state); - dfsm->sync_info->nodes[i].state = NULL; - dfsm->sync_info->nodes[i].state_len = 0; - } - } - } - - if (member_list) { - - g_hash_table_remove_all(dfsm->members); - - if (dfsm->sync_info) - g_free(dfsm->sync_info); - - int size = sizeof(dfsm_sync_info_t) + - member_list_entries*sizeof(dfsm_sync_info_t); - dfsm_sync_info_t *sync_info = dfsm->sync_info = g_malloc0(size); - sync_info->node_count = member_list_entries; - - for (int i = 0; i < member_list_entries; i++) { - sync_info->nodes[i].nodeid = member_list[i].nodeid; - 
sync_info->nodes[i].pid = member_list[i].pid; - } - - qsort(sync_info->nodes, member_list_entries, sizeof(dfsm_node_info_t), - dfsm_sync_info_compare); - - for (int i = 0; i < member_list_entries; i++) { - dfsm_node_info_t *info = &sync_info->nodes[i]; - g_hash_table_insert(dfsm->members, info, info); - if (info->nodeid == dfsm->nodeid && info->pid == dfsm->pid) - sync_info->local = info; - } - } -} - -static void -dfsm_cpg_deliver_callback( - cpg_handle_t handle, - const struct cpg_name *group_name, - uint32_t nodeid, - uint32_t pid, - void *msg, - size_t msg_len) -{ - cs_error_t result; - - dfsm_t *dfsm = NULL; - result = cpg_context_get(handle, (gpointer *)&dfsm); - if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) { - cfs_critical("cpg_context_get error: %d (%p)", result, dfsm); - return; /* we have no valid dfsm pointer, so we can just ignore this */ - } - dfsm_mode_t mode = dfsm_get_mode(dfsm); - - cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode); - - if (mode >= DFSM_ERROR_MODE_START) { - cfs_dom_debug(dfsm->log_domain, "error mode - ignoring message"); - goto leave; - } - - if (!dfsm->sync_info) { - cfs_dom_critical(dfsm->log_domain, "no dfsm_sync_info - internal error"); - goto leave; - } - - if (msg_len < sizeof(dfsm_message_header_t)) { - cfs_dom_critical(dfsm->log_domain, "received short message (%ld bytes)", msg_len); - goto leave; - } - - dfsm_message_header_t *base_header = (dfsm_message_header_t *)msg; - - if (base_header->protocol_version > dfsm->protocol_version) { - cfs_dom_critical(dfsm->log_domain, "received message with protocol version %d", - base_header->protocol_version); - dfsm_set_mode(dfsm, DFSM_MODE_VERSION_ERROR); - return; - } else if (base_header->protocol_version < dfsm->protocol_version) { - cfs_dom_message(dfsm->log_domain, "ignore message with wrong protocol version %d", - base_header->protocol_version); - return; - } - - if (base_header->type == DFSM_MESSAGE_NORMAL) { - - dfsm_message_normal_header_t 
*header = (dfsm_message_normal_header_t *)msg; - - if (msg_len < sizeof(dfsm_message_normal_header_t)) { - cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %ld bytes)", - base_header->type, base_header->subtype, msg_len); - goto leave; - } - - if (mode != DFSM_MODE_SYNCED) { - cfs_dom_debug(dfsm->log_domain, "queue message %zu (subtype = %d, length = %ld)", - header->count, base_header->subtype, msg_len); - - if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len)) - goto leave; - } else { - - int msg_res = -1; - int res = dfsm->dfsm_callbacks->dfsm_deliver_fn( - dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype, - base_header->time, msg + sizeof(dfsm_message_normal_header_t), - msg_len - sizeof(dfsm_message_normal_header_t)); - - if (nodeid == dfsm->nodeid && pid == dfsm->pid) - dfsm_record_local_result(dfsm, header->count, msg_res, res); - - if (res < 0) - goto leave; - } - - return; - } - - /* state related messages - * we needs right epoch - else we simply discard the message - */ - - dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg; - - if (msg_len < sizeof(dfsm_message_state_header_t)) { - cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %ld bytes)", - base_header->type, base_header->subtype, msg_len); - goto leave; - } - - if (base_header->type != DFSM_MESSAGE_SYNC_START && - (memcmp(&header->epoch, &dfsm->sync_epoch, sizeof(dfsm_sync_epoch_t)) != 0)) { - cfs_dom_debug(dfsm->log_domain, "ignore message (msg_type == %d) with " - "wrong epoch (epoch %d/%d/%08X)", base_header->type, - header->epoch.nodeid, header->epoch.pid, header->epoch.epoch); - return; - } - - msg += sizeof(dfsm_message_state_header_t); - msg_len -= sizeof(dfsm_message_state_header_t); - - if (mode == DFSM_MODE_SYNCED) { - if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) { - - for (int i = 0; i < dfsm->sync_info->node_count; i++) - 
dfsm->sync_info->nodes[i].synced = 1; - - if (!dfsm_deliver_queue(dfsm)) - goto leave; - - return; - - } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) { - - if (msg_len != sizeof(dfsm->csum_counter)) { - cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len, nodeid, pid); - goto leave; - } - - uint64_t csum_id = *((uint64_t *)msg); - msg += 8; msg_len -= 8; - - cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016zX", nodeid, csum_id); - - if (dfsm->dfsm_callbacks->dfsm_checksum_fn) { - if (!dfsm->dfsm_callbacks->dfsm_checksum_fn( - dfsm, dfsm->data, dfsm->csum, sizeof(dfsm->csum))) { - cfs_dom_critical(dfsm->log_domain, "unable to compute data checksum"); - goto leave; - } - - dfsm->csum_epoch = header->epoch; - dfsm->csum_id = csum_id; - - if (nodeid == dfsm->nodeid && pid == dfsm->pid) { - if (!dfsm_send_checksum(dfsm)) - goto leave; - } - } - - return; - - } else if (base_header->type == DFSM_MESSAGE_VERIFY) { - - cfs_dom_debug(dfsm->log_domain, "received verify message"); - - if (dfsm->dfsm_callbacks->dfsm_checksum_fn) { - - if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) { - cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%ld bytes)", msg_len); - goto leave; - } - - uint64_t csum_id = *((uint64_t *)msg); - msg += 8; msg_len -= 8; - - if (dfsm->csum_id == csum_id && - (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) { - if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) { - cfs_dom_critical(dfsm->log_domain, "wrong checksum %016zX != %016zX - restarting", - *(uint64_t *)msg, *(uint64_t *)dfsm->csum); - goto leave; - } else { - cfs_dom_message(dfsm->log_domain, "data verification successful"); - } - } else { - cfs_dom_message(dfsm->log_domain, "skip verification - no checksum saved"); - } - } - - return; - - } else { - /* ignore (we already got all required updates, or we are leader) */ - 
cfs_dom_debug(dfsm->log_domain, "ignore state sync message %d", - base_header->type); - return; - } - - } else if (mode == DFSM_MODE_START_SYNC) { - - if (base_header->type == DFSM_MESSAGE_SYNC_START) { - - if (nodeid != dfsm->lowest_nodeid) { - cfs_dom_critical(dfsm->log_domain, "ignore sync request from wrong member %d/%d", - nodeid, pid); - } - - cfs_dom_message(dfsm->log_domain, "received sync request (epoch %d/%d/%08X)", - header->epoch.nodeid, header->epoch.pid, header->epoch.epoch); - - dfsm->sync_epoch = header->epoch; - - dfsm_release_sync_resources(dfsm, NULL, 0); - - unsigned int state_len = 0; - gpointer state = NULL; - - state = dfsm->dfsm_callbacks->dfsm_get_state_fn(dfsm, dfsm->data, &state_len); - - if (!(state && state_len)) { - cfs_dom_critical(dfsm->log_domain, "dfsm_get_state_fn failed"); - goto leave; - } - - struct iovec iov[1]; - iov[0].iov_base = state; - iov[0].iov_len = state_len; - - result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_STATE, iov, 1); - - if (state) - g_free(state); - - if (result != CS_OK) - goto leave; - - return; - - } else if (base_header->type == DFSM_MESSAGE_STATE) { - - dfsm_node_info_t *ni; - - if (!(ni = dfsm_node_info_lookup(dfsm, nodeid, pid))) { - cfs_dom_critical(dfsm->log_domain, "received state for non-member %d/%d", nodeid, pid); - goto leave; - } - - if (ni->state) { - cfs_dom_critical(dfsm->log_domain, "received duplicate state for member %d/%d", nodeid, pid); - goto leave; - } - - ni->state = g_memdup(msg, msg_len); - ni->state_len = msg_len; - - int received_all = 1; - for (int i = 0; i < dfsm->sync_info->node_count; i++) { - if (!dfsm->sync_info->nodes[i].state) { - received_all = 0; - break; - } - } - - if (received_all) { - cfs_dom_message(dfsm->log_domain, "received all states"); - - int res = dfsm->dfsm_callbacks->dfsm_process_state_update_fn(dfsm, dfsm->data, dfsm->sync_info); - if (res < 0) - goto leave; - - if (dfsm->sync_info->local->synced) { - dfsm_set_mode(dfsm, DFSM_MODE_SYNCED); - 
dfsm_release_sync_resources(dfsm, NULL, 0); - - if (!dfsm_deliver_queue(dfsm)) - goto leave; - - } else { - dfsm_set_mode(dfsm, DFSM_MODE_UPDATE); - - if (!dfsm_deliver_queue(dfsm)) - goto leave; - } - - } - - return; - } - - } else if (mode == DFSM_MODE_UPDATE) { - - if (base_header->type == DFSM_MESSAGE_UPDATE) { - - int res = dfsm->dfsm_callbacks->dfsm_process_update_fn( - dfsm, dfsm->data, dfsm->sync_info, nodeid, pid, msg, msg_len); - - if (res < 0) - goto leave; - - return; - - } else if (base_header->type == DFSM_MESSAGE_UPDATE_COMPLETE) { - - - int res = dfsm->dfsm_callbacks->dfsm_commit_fn(dfsm, dfsm->data, dfsm->sync_info); - - if (res < 0) - goto leave; - - for (int i = 0; i < dfsm->sync_info->node_count; i++) - dfsm->sync_info->nodes[i].synced = 1; - - dfsm_set_mode(dfsm, DFSM_MODE_SYNCED); - - if (!dfsm_deliver_sync_queue(dfsm)) - goto leave; - - if (!dfsm_deliver_queue(dfsm)) - goto leave; - - dfsm_release_sync_resources(dfsm, NULL, 0); - - return; - } - - } else { - cfs_dom_critical(dfsm->log_domain, "internal error - unknown mode %d", mode); - goto leave; - } - - if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST || - base_header->type == DFSM_MESSAGE_VERIFY) { - - cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type); - - } else { - cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %ld bytes)", - base_header->type, msg_len); - goto leave; - } - -leave: - dfsm_set_mode(dfsm, DFSM_MODE_LEAVE); - dfsm_release_sync_resources(dfsm, NULL, 0); - return; -} - -static gboolean -dfsm_resend_queue(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, FALSE); - g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE); - - GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue); - GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue); - gboolean res = TRUE; - - while (iter != end) { - GSequenceIter *cur = iter; - iter = g_sequence_iter_next(iter); - - 
dfsm_queued_message_t *qm = (dfsm_queued_message_t *) - g_sequence_get(cur); - - if (qm->nodeid == dfsm->nodeid && qm->pid == dfsm->pid) { - cpg_error_t result; - struct iovec iov[1]; - iov[0].iov_base = qm->msg; - iov[0].iov_len = qm->msg_len; - - if ((result = dfsm_send_message_full(dfsm, iov, 1, 1)) != CS_OK) { - res = FALSE; - break; - } - } - } - - dfsm_free_message_queue(dfsm); - - return res; -} - -static gboolean -dfsm_deliver_sync_queue(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, FALSE); - - if (!dfsm->sync_queue) - return TRUE; - - gboolean res = TRUE; - - // fixme: cfs_debug - cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, - g_list_length(dfsm->sync_queue)); - - GList *iter = dfsm->sync_queue; - while (iter) { - dfsm_queued_message_t *qm = (dfsm_queued_message_t *)iter->data; - iter = g_list_next(iter); - - if (res && dfsm->mode == DFSM_MODE_SYNCED) { - dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name, - qm->nodeid, qm->pid, qm->msg, qm->msg_len); - } else { - res = FALSE; - } - - dfsm_free_queue_entry(qm); - } - g_list_free(dfsm->sync_queue); - dfsm->sync_queue = NULL; - - return res; -} - -static gboolean -dfsm_deliver_queue(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, FALSE); - g_return_val_if_fail(dfsm->msg_queue != NULL, FALSE); - - int qlen = g_sequence_get_length(dfsm->msg_queue); - if (!qlen) - return TRUE; - - GSequenceIter *iter = g_sequence_get_begin_iter(dfsm->msg_queue); - GSequenceIter *end = g_sequence_get_end_iter(dfsm->msg_queue); - gboolean res = TRUE; - - // fixme: cfs_debug - cfs_dom_message(dfsm->log_domain, "%s: queue length %d", __func__, qlen); - - while (iter != end) { - GSequenceIter *cur = iter; - iter = g_sequence_iter_next(iter); - - dfsm_queued_message_t *qm = (dfsm_queued_message_t *) - g_sequence_get(cur); - - dfsm_node_info_t *ni = dfsm_node_info_lookup(dfsm, qm->nodeid, qm->pid); - if (!ni) { - cfs_dom_message(dfsm->log_domain, "remove message from non-member 
%d/%d", - qm->nodeid, qm->pid); - dfsm_free_queue_entry(qm); - g_sequence_remove(cur); - continue; - } - - if (dfsm->mode == DFSM_MODE_SYNCED) { - if (ni->synced) { - dfsm_cpg_deliver_callback(dfsm->cpg_handle, &dfsm->cpg_group_name, - qm->nodeid, qm->pid, qm->msg, qm->msg_len); - dfsm_free_queue_entry(qm); - g_sequence_remove(cur); - } - } else if (dfsm->mode == DFSM_MODE_UPDATE) { - if (ni->synced) { - dfsm->sync_queue = g_list_append(dfsm->sync_queue, qm); - g_sequence_remove(cur); - } - } else { - res = FALSE; - break; - } - } - - return res; -} - -static void -dfsm_cpg_confchg_callback( - cpg_handle_t handle, - const struct cpg_name *group_name, - const struct cpg_address *member_list, - size_t member_list_entries, - const struct cpg_address *left_list, - size_t left_list_entries, - const struct cpg_address *joined_list, - size_t joined_list_entries) -{ - cs_error_t result; - - dfsm_t *dfsm = NULL; - result = cpg_context_get(handle, (gpointer *)&dfsm); - if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) { - cfs_critical("cpg_context_get error: %d (%p)", result, dfsm); - return; /* we have no valid dfsm pointer, so we can just ignore this */ - } - - dfsm->we_are_member = 0; - - /* create new epoch */ - dfsm->local_epoch_counter++; - dfsm->sync_epoch.epoch = dfsm->local_epoch_counter; - dfsm->sync_epoch.nodeid = dfsm->nodeid; - dfsm->sync_epoch.pid = dfsm->pid; - dfsm->sync_epoch.time = time(NULL); - - /* invalidate saved checksum */ - dfsm->csum_id = dfsm->csum_counter; - memset(&dfsm->csum_epoch, 0, sizeof(dfsm->csum_epoch)); - - dfsm_free_sync_queue(dfsm); - - dfsm_mode_t mode = dfsm_get_mode(dfsm); - - cfs_dom_debug(dfsm->log_domain, "dfsm mode is %d", mode); - - if (mode >= DFSM_ERROR_MODE_START) { - cfs_dom_debug(dfsm->log_domain, "already left group - ignore message"); - return; - } - - int lowest_nodeid = 0; - GString *str = g_string_new("members: "); - for (int i = 0; i < member_list_entries; i++) { - - g_string_append_printf(str, i 
? ", %d/%d" : "%d/%d", - member_list[i].nodeid, member_list[i].pid); - - if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid) - lowest_nodeid = member_list[i].nodeid; - - if (member_list[i].nodeid == dfsm->nodeid && - member_list[i].pid == dfsm->pid) - dfsm->we_are_member = 1; - } - - - if ((dfsm->we_are_member || mode != DFSM_MODE_START)) - cfs_dom_message(dfsm->log_domain, str->str); - - g_string_free(str, 1); - - dfsm->lowest_nodeid = lowest_nodeid; - - /* NOTE: one node can be in left and joined list at the same time, - so it is better to query member list. Also JOIN/LEAVE list are - different on different nodes! - */ - - dfsm_release_sync_resources(dfsm, member_list, member_list_entries); - - if (!dfsm->we_are_member) { - if (mode == DFSM_MODE_START) { - cfs_dom_debug(dfsm->log_domain, "ignore leave message"); - return; - } - cfs_dom_message(dfsm->log_domain, "we (%d/%d) left the process group", - dfsm->nodeid, dfsm->pid); - goto leave; - } - - if (member_list_entries > 1) { - - int qlen = g_sequence_get_length(dfsm->msg_queue); - if (joined_list_entries && qlen) { - /* we need to make sure that all members have the same queue. 
*/ - cfs_dom_message(dfsm->log_domain, "queue not emtpy - resening %d messages", qlen); - if (!dfsm_resend_queue(dfsm)) { - cfs_dom_critical(dfsm->log_domain, "dfsm_resend_queue failed"); - goto leave; - } - } - - dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC); - if (lowest_nodeid == dfsm->nodeid) { - if (!dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0)) { - cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message"); - goto leave; - } - } - } else { - dfsm_set_mode(dfsm, DFSM_MODE_SYNCED); - dfsm->sync_info->local->synced = 1; - if (!dfsm_deliver_queue(dfsm)) - goto leave; - } - - if (dfsm->dfsm_callbacks->dfsm_confchg_fn) - dfsm->dfsm_callbacks->dfsm_confchg_fn(dfsm, dfsm->data, member_list, member_list_entries); - - return; -leave: - dfsm_set_mode(dfsm, DFSM_MODE_LEAVE); - return; -} - -static cpg_callbacks_t cpg_callbacks = { - .cpg_deliver_fn = dfsm_cpg_deliver_callback, - .cpg_confchg_fn = dfsm_cpg_confchg_callback, -}; - -dfsm_t * -dfsm_new( - gpointer data, - const char *group_name, - const char *log_domain, - guint32 protocol_version, - dfsm_callbacks_t *callbacks) -{ - g_return_val_if_fail(sizeof(dfsm_message_header_t) == 16, NULL); - g_return_val_if_fail(sizeof(dfsm_message_state_header_t) == 32, NULL); - g_return_val_if_fail(sizeof(dfsm_message_normal_header_t) == 24, NULL); - - g_return_val_if_fail(callbacks != NULL, NULL); - g_return_val_if_fail(callbacks->dfsm_deliver_fn != NULL, NULL); - - g_return_val_if_fail(callbacks->dfsm_get_state_fn != NULL, NULL); - g_return_val_if_fail(callbacks->dfsm_process_state_update_fn != NULL, NULL); - g_return_val_if_fail(callbacks->dfsm_process_update_fn != NULL, NULL); - g_return_val_if_fail(callbacks->dfsm_commit_fn != NULL, NULL); - - dfsm_t *dfsm; - - if ((dfsm = g_new0(dfsm_t, 1)) == NULL) - return NULL; - - if (!(dfsm->sync_mutex = g_mutex_new())) - goto err; - - if (!(dfsm->sync_cond = g_cond_new())) - goto err; - - if (!(dfsm->results = g_hash_table_new(g_int64_hash, 
g_int64_equal))) - goto err; - - if (!(dfsm->msg_queue = g_sequence_new(NULL))) - goto err; - - dfsm->log_domain = g_strdup(log_domain); - dfsm->data = data; - dfsm->mode = DFSM_MODE_START; - dfsm->protocol_version = protocol_version; - strcpy (dfsm->cpg_group_name.value, group_name); - dfsm->cpg_group_name.length = strlen (group_name) + 1; - - dfsm->cpg_callbacks = &cpg_callbacks; - dfsm->dfsm_callbacks = callbacks; - - dfsm->members = g_hash_table_new(dfsm_sync_info_hash, dfsm_sync_info_equal); - if (!dfsm->members) - goto err; - - if ((dfsm->mode_mutex = g_mutex_new()) == NULL) - goto err; - - return dfsm; - -err: - dfsm_destroy(dfsm); - return NULL; -} - -gboolean -dfsm_lowest_nodeid(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, FALSE); - - if (dfsm->lowest_nodeid && (dfsm->lowest_nodeid == dfsm->nodeid)) - return TRUE; - - return FALSE; -} - -cs_error_t -dfsm_verify_request(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM); - - /* only do when we have lowest nodeid */ - if (!dfsm->lowest_nodeid || (dfsm->lowest_nodeid != dfsm->nodeid)) - return CS_OK; - - dfsm_mode_t mode = dfsm_get_mode(dfsm); - if (mode != DFSM_MODE_SYNCED) - return CS_OK; - - int len = 1; - struct iovec iov[len]; - - if (dfsm->csum_counter != dfsm->csum_id) { - g_message("delay verify request %016zX", dfsm->csum_counter + 1); - return CS_OK; - }; - - dfsm->csum_counter++; - iov[0].iov_base = (char *)&dfsm->csum_counter; - iov[0].iov_len = sizeof(dfsm->csum_counter); - - cfs_debug("send verify request %016zX", dfsm->csum_counter); - - cs_error_t result; - result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len); - - if (result != CS_OK) - cfs_dom_critical(dfsm->log_domain, "failed to send VERIFY_REQUEST message"); - - return result; -} - - -cs_error_t -dfsm_dispatch( - dfsm_t *dfsm, - cs_dispatch_flags_t dispatch_types) -{ - g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM); - g_return_val_if_fail(dfsm->cpg_handle != 0, 
CS_ERR_INVALID_PARAM); - - cs_error_t result; - - struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 }; - int retries = 0; -loop: - result = cpg_dispatch(dfsm->cpg_handle, dispatch_types); - if (result == CPG_ERR_TRY_AGAIN) { - nanosleep(&tvreq, NULL); - ++retries; - if ((retries % 10) == 0) - cfs_dom_message(dfsm->log_domain, "cpg_dispatch retry %d", retries); - goto loop; - } - - if (!(result == CS_OK || result == CS_ERR_TRY_AGAIN)) { - cfs_dom_critical(dfsm->log_domain, "cpg_dispatch failed: %d", result); - } - - return result; -} - - -cs_error_t -dfsm_initialize(dfsm_t *dfsm, int *fd) -{ - g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM); - g_return_val_if_fail(fd != NULL, CS_ERR_INVALID_PARAM); - - /* remove old messages */ - dfsm_free_message_queue(dfsm); - dfsm_send_sync_message_abort(dfsm); - - dfsm->joined = FALSE; - dfsm->we_are_member = 0; - - dfsm_set_mode(dfsm, DFSM_MODE_START); - - cs_error_t result; - - if (dfsm->cpg_handle == 0) { - if ((result = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks)) != CS_OK) { - cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", result); - dfsm->cpg_handle = 0; - goto fail; - } - - if ((result = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid)) != CS_OK) { - cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", result); - goto fail; - } - - dfsm->pid = getpid(); - - result = cpg_context_set(dfsm->cpg_handle, dfsm); - if (result != CS_OK) { - cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", result); - goto fail; - } - } - - result = cpg_fd_get(dfsm->cpg_handle, fd); - if (result != CS_OK) { - cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", result); - goto fail; - } - - return CS_OK; - -fail: - if (dfsm->cpg_handle) - cpg_finalize(dfsm->cpg_handle); - dfsm->cpg_handle = 0; - return result; -} - -cs_error_t -dfsm_join(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM); - g_return_val_if_fail(dfsm->cpg_handle != 0, 
CS_ERR_LIBRARY); - g_return_val_if_fail(dfsm->joined == 0, CS_ERR_EXIST); - - cs_error_t result; - - struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 }; - int retries = 0; -loop: - result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name); - if (result == CPG_ERR_TRY_AGAIN) { - nanosleep(&tvreq, NULL); - ++retries; - if ((retries % 10) == 0) - cfs_dom_message(dfsm->log_domain, "cpg_join retry %d", retries); - goto loop; - } - - if (result != CS_OK) { - cfs_dom_critical(dfsm->log_domain, "cpg_join failed: %d", result); - return result; - } - - dfsm->joined = TRUE; - return TRUE; -} - -cs_error_t -dfsm_leave (dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM); - g_return_val_if_fail(dfsm->joined, CS_ERR_NOT_EXIST); - - cs_error_t result; - - struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 }; - int retries = 0; -loop: - result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name); - if (result == CPG_ERR_TRY_AGAIN) { - nanosleep(&tvreq, NULL); - ++retries; - if ((retries % 10) == 0) - cfs_dom_message(dfsm->log_domain, "cpg_leave retry %d", retries); - goto loop; - } - - if (result != CS_OK) { - cfs_dom_critical(dfsm->log_domain, "cpg_leave failed: %d", result); - return result; - } - - dfsm->joined = FALSE; - - return TRUE; -} - -gboolean -dfsm_finalize(dfsm_t *dfsm) -{ - g_return_val_if_fail(dfsm != NULL, FALSE); - - dfsm_send_sync_message_abort(dfsm); - - if (dfsm->joined) - dfsm_leave(dfsm); - - if (dfsm->cpg_handle) { - cpg_finalize(dfsm->cpg_handle); - dfsm->cpg_handle = 0; - dfsm->joined = FALSE; - dfsm->we_are_member = 0; - } - - return TRUE; -} - -void -dfsm_destroy(dfsm_t *dfsm) -{ - g_return_if_fail(dfsm != NULL); - - dfsm_finalize(dfsm); - - if (dfsm->sync_info && dfsm->sync_info->data && dfsm->dfsm_callbacks->dfsm_cleanup_fn) - dfsm->dfsm_callbacks->dfsm_cleanup_fn(dfsm, dfsm->data, dfsm->sync_info); - - dfsm_free_sync_queue(dfsm); - - if (dfsm->mode_mutex) - g_mutex_free (dfsm->mode_mutex); - - if 
(dfsm->sync_mutex) - g_mutex_free (dfsm->sync_mutex); - - if (dfsm->sync_cond) - g_cond_free (dfsm->sync_cond); - - if (dfsm->results) - g_hash_table_destroy(dfsm->results); - - if (dfsm->msg_queue) { - dfsm_free_message_queue(dfsm); - g_sequence_free(dfsm->msg_queue); - } - - if (dfsm->sync_info) - g_free(dfsm->sync_info); - - if (dfsm->cpg_handle) - cpg_finalize(dfsm->cpg_handle); - - if (dfsm->members) - g_hash_table_destroy(dfsm->members); - - if (dfsm->log_domain) - g_free(dfsm->log_domain); - - g_free(dfsm); -} - -typedef struct { - dfsm_t *dfsm; -} service_dfsm_private_t; - -static gboolean -service_dfsm_finalize( - cfs_service_t *service, - gpointer context) -{ - g_return_val_if_fail(service != NULL, FALSE); - g_return_val_if_fail(context != NULL, FALSE); - - service_dfsm_private_t *private = (service_dfsm_private_t *)context; - dfsm_t *dfsm = private->dfsm; - - g_return_val_if_fail(dfsm != NULL, FALSE); - - return dfsm_finalize(dfsm); -} - -static int -service_dfsm_initialize( - cfs_service_t *service, - gpointer context) -{ - g_return_val_if_fail(service != NULL, -1); - g_return_val_if_fail(context != NULL, -1); - - service_dfsm_private_t *private = (service_dfsm_private_t *)context; - dfsm_t *dfsm = private->dfsm; - - g_return_val_if_fail(dfsm != NULL, -1); - - /* serious internal error - don't try to recover */ - if (!dfsm_restartable(dfsm)) - return -1; - - int fd = -1; - - cs_error_t result; - if ((result = dfsm_initialize(dfsm, &fd)) != CS_OK) - return -1; - - result = dfsm_join(dfsm); - if (result != CS_OK) { - /* we can't dispatch if not joined, so we need to finalize */ - dfsm_finalize(dfsm); - return -1; - } - - return fd; -} - -static gboolean -service_dfsm_dispatch( - cfs_service_t *service, - gpointer context) -{ - g_return_val_if_fail(service != NULL, FALSE); - g_return_val_if_fail(context != NULL, FALSE); - - service_dfsm_private_t *private = (service_dfsm_private_t *)context; - dfsm_t *dfsm = private->dfsm; - - g_return_val_if_fail(dfsm != 
NULL, FALSE); - g_return_val_if_fail(dfsm->cpg_handle != 0, FALSE); - - cs_error_t result; - - result = dfsm_dispatch(dfsm, CPG_DISPATCH_ONE); - if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE) - goto finalize; - if (result != CS_OK) - goto fail; - - dfsm_mode_t mode = dfsm_get_mode(dfsm); - if (mode >= DFSM_ERROR_MODE_START) { - if (dfsm->joined) { - result = dfsm_leave(dfsm); - if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE) - goto finalize; - if (result != CS_OK) - goto finalize; - } else { - if (!dfsm->we_are_member) - return FALSE; - } - } - - return TRUE; - -fail: - cfs_service_set_restartable(service, dfsm_restartable(dfsm)); - return FALSE; - -finalize: - dfsm_finalize(dfsm); - goto fail; -} - -static void -service_dfsm_timer( - cfs_service_t *service, - gpointer context) -{ - g_return_if_fail(service != NULL); - g_return_if_fail(context != NULL); - - service_dfsm_private_t *private = (service_dfsm_private_t *)context; - dfsm_t *dfsm = private->dfsm; - - g_return_if_fail(dfsm != NULL); - - dfsm_verify_request(dfsm); -} - -static cfs_service_callbacks_t cfs_dfsm_callbacks = { - .cfs_service_initialize_fn = service_dfsm_initialize, - .cfs_service_finalize_fn = service_dfsm_finalize, - .cfs_service_dispatch_fn = service_dfsm_dispatch, - .cfs_service_timer_fn = service_dfsm_timer, -}; - -cfs_service_t * -service_dfsm_new(dfsm_t *dfsm) -{ - cfs_service_t *service; - - g_return_val_if_fail(dfsm != NULL, NULL); - - service_dfsm_private_t *private = g_new0(service_dfsm_private_t, 1); - if (!private) - return NULL; - - private->dfsm = dfsm; - - service = cfs_service_new(&cfs_dfsm_callbacks, dfsm->log_domain, private); - - return service; -} - -void -service_dfsm_destroy(cfs_service_t *service) -{ - g_return_if_fail(service != NULL); - - service_dfsm_private_t *private = - (service_dfsm_private_t *)cfs_service_get_context(service); - - g_free(private); - g_free(service); -} - - - -