#endif /* HAVE_CONFIG_H */
#include <sys/types.h>
+#include <inttypes.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
} dfsm_queued_message_t;
struct dfsm {
- char *log_domain;
+ const char *log_domain;
cpg_callbacks_t *cpg_callbacks;
dfsm_callbacks_t *dfsm_callbacks;
cpg_handle_t cpg_handle;
gboolean joined;
/* mode is protected with mode_mutex */
- GMutex *mode_mutex;
+ GMutex mode_mutex;
dfsm_mode_t mode;
GHashTable *members; /* contains dfsm_node_info_t pointers */
GList *sync_queue;
/* synchrounous message transmission, protected with sync_mutex */
- GMutex *sync_mutex;
- GCond *sync_cond;
+ GMutex sync_mutex;
+ GCond sync_cond;
GHashTable *results;
uint64_t msgcount;
uint64_t msgcount_rcvd;
dfsm_send_sync_message_abort(dfsm_t *dfsm)
{
g_return_if_fail(dfsm != NULL);
- g_return_if_fail(dfsm->sync_mutex != NULL);
- g_return_if_fail(dfsm->sync_cond != NULL);
- g_mutex_lock (dfsm->sync_mutex);
+ g_mutex_lock (&dfsm->sync_mutex);
dfsm->msgcount_rcvd = dfsm->msgcount;
- g_cond_broadcast (dfsm->sync_cond);
- g_mutex_unlock (dfsm->sync_mutex);
+ g_cond_broadcast (&dfsm->sync_cond);
+ g_mutex_unlock (&dfsm->sync_mutex);
}
static void
gboolean processed)
{
g_return_if_fail(dfsm != NULL);
- g_return_if_fail(dfsm->sync_mutex != NULL);
- g_return_if_fail(dfsm->sync_cond != NULL);
g_return_if_fail(dfsm->results != NULL);
- g_mutex_lock (dfsm->sync_mutex);
+ g_mutex_lock (&dfsm->sync_mutex);
dfsm_result_t *rp = (dfsm_result_t *)g_hash_table_lookup(dfsm->results, &msg_count);
if (rp) {
rp->result = msg_result;
rp->processed = processed;
}
dfsm->msgcount_rcvd = msg_count;
- g_cond_broadcast (dfsm->sync_cond);
- g_mutex_unlock (dfsm->sync_mutex);
+ g_cond_broadcast (&dfsm->sync_cond);
+ g_mutex_unlock (&dfsm->sync_mutex);
}
-static cpg_error_t
+static cs_error_t
dfsm_send_message_full(
dfsm_t *dfsm,
struct iovec *iov,
g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
- cpg_error_t result;
+ cs_error_t result;
int retries = 0;
loop:
result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
- if (retry && result == CPG_ERR_TRY_AGAIN) {
+ if (retry && result == CS_ERR_TRY_AGAIN) {
nanosleep(&tvreq, NULL);
++retries;
if ((retries % 10) == 0)
cfs_dom_message(dfsm->log_domain, "cpg_send_message retried %d times", retries);
if (result != CS_OK &&
- (!retry || result != CPG_ERR_TRY_AGAIN))
+ (!retry || result != CS_ERR_TRY_AGAIN))
cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
return result;
}
-static cpg_error_t
+static cs_error_t
dfsm_send_state_message_full(
dfsm_t *dfsm,
uint16_t type,
return dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
}
-cpg_error_t
+cs_error_t
dfsm_send_update(
dfsm_t *dfsm,
struct iovec *iov,
return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE, iov, len);
}
-cpg_error_t
+cs_error_t
dfsm_send_update_complete(dfsm_t *dfsm)
{
return dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_UPDATE_COMPLETE, NULL, 0);
}
-cpg_error_t
+cs_error_t
dfsm_send_message(
dfsm_t *dfsm,
uint16_t msgtype,
return dfsm_send_message_sync(dfsm, msgtype, iov, len, NULL);
}
-cpg_error_t
+cs_error_t
dfsm_send_message_sync(
dfsm_t *dfsm,
uint16_t msgtype,
dfsm_result_t *rp)
{
g_return_val_if_fail(dfsm != NULL, CS_ERR_INVALID_PARAM);
- g_return_val_if_fail(dfsm->sync_mutex != NULL, CS_ERR_INVALID_PARAM);
g_return_val_if_fail(!len || iov != NULL, CS_ERR_INVALID_PARAM);
- g_mutex_lock (dfsm->sync_mutex);
+ g_mutex_lock (&dfsm->sync_mutex);
/* note: hold lock until message is sent - to guarantee ordering */
uint64_t msgcount = ++dfsm->msgcount;
if (rp) {
for (int i = 0; i < len; i++)
real_iov[i + 1] = iov[i];
- cpg_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
+ cs_error_t result = dfsm_send_message_full(dfsm, real_iov, len + 1, 1);
- g_mutex_unlock (dfsm->sync_mutex);
+ g_mutex_unlock (&dfsm->sync_mutex);
if (result != CS_OK) {
cfs_dom_critical(dfsm->log_domain, "cpg_send_message failed: %d", result);
if (rp) {
- g_mutex_lock (dfsm->sync_mutex);
+ g_mutex_lock (&dfsm->sync_mutex);
g_hash_table_remove(dfsm->results, &rp->msgcount);
- g_mutex_unlock (dfsm->sync_mutex);
+ g_mutex_unlock (&dfsm->sync_mutex);
}
return result;
}
if (rp) {
- g_mutex_lock (dfsm->sync_mutex);
+ g_mutex_lock (&dfsm->sync_mutex);
while (dfsm->msgcount_rcvd < msgcount)
- g_cond_wait (dfsm->sync_cond, dfsm->sync_mutex);
+ g_cond_wait (&dfsm->sync_cond, &dfsm->sync_mutex);
g_hash_table_remove(dfsm->results, &rp->msgcount);
- g_mutex_unlock (dfsm->sync_mutex);
+ g_mutex_unlock (&dfsm->sync_mutex);
return rp->processed ? CS_OK : CS_ERR_FAILED_OPERATION;
}
cfs_debug("dfsm_set_mode - set mode to %d", new_mode);
int changed = 0;
- g_mutex_lock (dfsm->mode_mutex);
+ g_mutex_lock (&dfsm->mode_mutex);
if (dfsm->mode != new_mode) {
if (new_mode < DFSM_ERROR_MODE_START ||
(dfsm->mode < DFSM_ERROR_MODE_START || new_mode >= dfsm->mode)) {
changed = 1;
}
}
- g_mutex_unlock (dfsm->mode_mutex);
+ g_mutex_unlock (&dfsm->mode_mutex);
if (!changed)
return;
{
g_return_val_if_fail(dfsm != NULL, DFSM_MODE_ERROR);
- g_mutex_lock (dfsm->mode_mutex);
+ g_mutex_lock (&dfsm->mode_mutex);
dfsm_mode_t mode = dfsm->mode;
- g_mutex_unlock (dfsm->mode_mutex);
+ g_mutex_unlock (&dfsm->mode_mutex);
return mode;
}
}
if (msg_len < sizeof(dfsm_message_header_t)) {
- cfs_dom_critical(dfsm->log_domain, "received short message (%ld bytes)", msg_len);
+ cfs_dom_critical(dfsm->log_domain, "received short message (%zd bytes)", msg_len);
goto leave;
}
dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
if (msg_len < sizeof(dfsm_message_normal_header_t)) {
- cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %ld bytes)",
+ cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %zd bytes)",
base_header->type, base_header->subtype, msg_len);
goto leave;
}
if (mode != DFSM_MODE_SYNCED) {
- cfs_dom_debug(dfsm->log_domain, "queue message %zu (subtype = %d, length = %ld)",
+ cfs_dom_debug(dfsm->log_domain, "queue message %" PRIu64 " (subtype = %d, length = %zd)",
header->count, base_header->subtype, msg_len);
if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
if (msg_len < sizeof(dfsm_message_state_header_t)) {
- cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %ld bytes)",
+ cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %zd bytes)",
base_header->type, base_header->subtype, msg_len);
goto leave;
}
} else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
if (msg_len != sizeof(dfsm->csum_counter)) {
- cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len, nodeid, pid);
+ cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%zd bytes) form node %d/%d", msg_len, nodeid, pid);
goto leave;
}
uint64_t csum_id = *((uint64_t *)msg);
msg += 8; msg_len -= 8;
- cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016zX", nodeid, csum_id);
+ cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016" PRIX64, nodeid, csum_id);
if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
- cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%ld bytes)", msg_len);
+ cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%zd bytes)", msg_len);
goto leave;
}
if (dfsm->csum_id == csum_id &&
(memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
- cfs_dom_critical(dfsm->log_domain, "wrong checksum %016zX != %016zX - restarting",
+ cfs_dom_critical(dfsm->log_domain, "wrong checksum %016" PRIX64 " != %016" PRIX64 " - restarting",
*(uint64_t *)msg, *(uint64_t *)dfsm->csum);
goto leave;
} else {
cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
} else {
- cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %ld bytes)",
+ cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %zd bytes)",
base_header->type, msg_len);
goto leave;
}
g_sequence_get(cur);
if (qm->nodeid == dfsm->nodeid && qm->pid == dfsm->pid) {
- cpg_error_t result;
+ cs_error_t result;
struct iovec iov[1];
iov[0].iov_base = qm->msg;
iov[0].iov_len = qm->msg_len;
if ((dfsm = g_new0(dfsm_t, 1)) == NULL)
return NULL;
- if (!(dfsm->sync_mutex = g_mutex_new()))
- goto err;
-
- if (!(dfsm->sync_cond = g_cond_new()))
- goto err;
+ g_mutex_init(&dfsm->sync_mutex);
+
+ g_cond_init(&dfsm->sync_cond);
if (!(dfsm->results = g_hash_table_new(g_int64_hash, g_int64_equal)))
goto err;
if (!(dfsm->msg_queue = g_sequence_new(NULL)))
goto err;
- dfsm->log_domain = g_strdup(log_domain);
+ dfsm->log_domain = log_domain;
dfsm->data = data;
dfsm->mode = DFSM_MODE_START;
dfsm->protocol_version = protocol_version;
if (!dfsm->members)
goto err;
- if ((dfsm->mode_mutex = g_mutex_new()) == NULL)
- goto err;
+ g_mutex_init(&dfsm->mode_mutex);
return dfsm;
return NULL;
}
+gboolean
+dfsm_is_initialized(dfsm_t *dfsm)
+{
+ g_return_val_if_fail(dfsm != NULL, FALSE);
+
+ return (dfsm->cpg_handle != 0) ? TRUE : FALSE;
+}
+
gboolean
dfsm_lowest_nodeid(dfsm_t *dfsm)
{
struct iovec iov[len];
if (dfsm->csum_counter != dfsm->csum_id) {
- g_message("delay verify request %016zX", dfsm->csum_counter + 1);
+ g_message("delay verify request %016" PRIX64, dfsm->csum_counter + 1);
return CS_OK;
};
iov[0].iov_base = (char *)&dfsm->csum_counter;
iov[0].iov_len = sizeof(dfsm->csum_counter);
- cfs_debug("send verify request %016zX", dfsm->csum_counter);
+ cfs_debug("send verify request %016" PRIX64, dfsm->csum_counter);
cs_error_t result;
result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
int retries = 0;
loop:
result = cpg_dispatch(dfsm->cpg_handle, dispatch_types);
- if (result == CPG_ERR_TRY_AGAIN) {
+ if (result == CS_ERR_TRY_AGAIN) {
nanosleep(&tvreq, NULL);
++retries;
if ((retries % 10) == 0)
if (dfsm->cpg_handle == 0) {
if ((result = cpg_initialize(&dfsm->cpg_handle, dfsm->cpg_callbacks)) != CS_OK) {
cfs_dom_critical(dfsm->log_domain, "cpg_initialize failed: %d", result);
- dfsm->cpg_handle = 0;
- goto fail;
+ goto err_no_finalize;
}
if ((result = cpg_local_get(dfsm->cpg_handle, &dfsm->nodeid)) != CS_OK) {
cfs_dom_critical(dfsm->log_domain, "cpg_local_get failed: %d", result);
- goto fail;
+ goto err_finalize;
}
dfsm->pid = getpid();
result = cpg_context_set(dfsm->cpg_handle, dfsm);
if (result != CS_OK) {
cfs_dom_critical(dfsm->log_domain, "cpg_context_set failed: %d", result);
- goto fail;
+ goto err_finalize;
}
}
result = cpg_fd_get(dfsm->cpg_handle, fd);
if (result != CS_OK) {
cfs_dom_critical(dfsm->log_domain, "cpg_fd_get failed: %d", result);
- goto fail;
+ goto err_finalize;
}
return CS_OK;
-fail:
- if (dfsm->cpg_handle)
- cpg_finalize(dfsm->cpg_handle);
+ err_finalize:
+ cpg_finalize(dfsm->cpg_handle);
+ err_no_finalize:
dfsm->cpg_handle = 0;
return result;
}
int retries = 0;
loop:
result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name);
- if (result == CPG_ERR_TRY_AGAIN) {
+ if (result == CS_ERR_TRY_AGAIN) {
nanosleep(&tvreq, NULL);
++retries;
if ((retries % 10) == 0)
int retries = 0;
loop:
result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
- if (result == CPG_ERR_TRY_AGAIN) {
+ if (result == CS_ERR_TRY_AGAIN) {
nanosleep(&tvreq, NULL);
++retries;
if ((retries % 10) == 0)
dfsm_free_sync_queue(dfsm);
- if (dfsm->mode_mutex)
- g_mutex_free (dfsm->mode_mutex);
+ g_mutex_clear (&dfsm->mode_mutex);
- if (dfsm->sync_mutex)
- g_mutex_free (dfsm->sync_mutex);
+ g_mutex_clear (&dfsm->sync_mutex);
- if (dfsm->sync_cond)
- g_cond_free (dfsm->sync_cond);
+ g_cond_clear (&dfsm->sync_cond);
if (dfsm->results)
g_hash_table_destroy(dfsm->results);
if (dfsm->members)
g_hash_table_destroy(dfsm->members);
- if (dfsm->log_domain)
- g_free(dfsm->log_domain);
-
g_free(dfsm);
}
cs_error_t result;
- result = dfsm_dispatch(dfsm, CPG_DISPATCH_ONE);
+ result = dfsm_dispatch(dfsm, CS_DISPATCH_ONE);
if (result == CS_ERR_LIBRARY || result == CS_ERR_BAD_HANDLE)
goto finalize;
if (result != CS_OK)
return TRUE;
+finalize:
+ dfsm_finalize(dfsm);
fail:
cfs_service_set_restartable(service, dfsm_restartable(dfsm));
return FALSE;
-
-finalize:
- dfsm_finalize(dfsm);
- goto fail;
}
static void