/*
- Copyright (C) 2010 Proxmox Server Solutions GmbH
+ Copyright (C) 2010 - 2020 Proxmox Server Solutions GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
#endif /* HAVE_CONFIG_H */
#include <sys/types.h>
+#include <inttypes.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
cpg_callbacks_t *cpg_callbacks;
dfsm_callbacks_t *dfsm_callbacks;
cpg_handle_t cpg_handle;
+ GMutex cpg_mutex;
struct cpg_name cpg_group_name;
uint32_t nodeid;
uint32_t pid;
cs_error_t result;
int retries = 0;
loop:
+ g_mutex_lock (&dfsm->cpg_mutex);
result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
+ g_mutex_unlock (&dfsm->cpg_mutex);
if (retry && result == CS_ERR_TRY_AGAIN) {
nanosleep(&tvreq, NULL);
++retries;
qm->nodeid = nodeid;
qm->pid = pid;
- qm->msg = g_memdup (msg, msg_len);
+ qm->msg = g_memdup2 (msg, msg_len);
qm->msg_len = msg_len;
qm->msg_count = msg_count;
dfsm_t *dfsm = NULL;
result = cpg_context_get(handle, (gpointer *)&dfsm);
if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
- cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
+ cfs_critical("cpg_context_get error: %d (%p)", result, (void *) dfsm);
return; /* we have no valid dfsm pointer, so we can just ignore this */
}
dfsm_mode_t mode = dfsm_get_mode(dfsm);
}
if (msg_len < sizeof(dfsm_message_header_t)) {
- cfs_dom_critical(dfsm->log_domain, "received short message (%ld bytes)", msg_len);
+ cfs_dom_critical(dfsm->log_domain, "received short message (%zd bytes)", msg_len);
goto leave;
}
dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
if (msg_len < sizeof(dfsm_message_normal_header_t)) {
- cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %ld bytes)",
+ cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %zd bytes)",
base_header->type, base_header->subtype, msg_len);
goto leave;
}
if (mode != DFSM_MODE_SYNCED) {
- cfs_dom_debug(dfsm->log_domain, "queue message %zu (subtype = %d, length = %ld)",
+ cfs_dom_debug(dfsm->log_domain, "queue message %" PRIu64 " (subtype = %d, length = %zd)",
header->count, base_header->subtype, msg_len);
if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
int msg_res = -1;
int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype,
- base_header->time, msg + sizeof(dfsm_message_normal_header_t),
+ base_header->time, (uint8_t *)msg + sizeof(dfsm_message_normal_header_t),
msg_len - sizeof(dfsm_message_normal_header_t));
if (nodeid == dfsm->nodeid && pid == dfsm->pid)
dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
if (msg_len < sizeof(dfsm_message_state_header_t)) {
- cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %ld bytes)",
+ cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %zd bytes)",
base_header->type, base_header->subtype, msg_len);
goto leave;
}
return;
}
- msg += sizeof(dfsm_message_state_header_t);
+ msg = (uint8_t *) msg + sizeof(dfsm_message_state_header_t);
msg_len -= sizeof(dfsm_message_state_header_t);
if (mode == DFSM_MODE_SYNCED) {
} else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
if (msg_len != sizeof(dfsm->csum_counter)) {
- cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len, nodeid, pid);
+ cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%zd bytes) form node %d/%d", msg_len, nodeid, pid);
goto leave;
}
uint64_t csum_id = *((uint64_t *)msg);
- msg += 8; msg_len -= 8;
+ msg = (uint8_t *) msg + 8; msg_len -= 8;
- cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016zX", nodeid, csum_id);
+ cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016" PRIX64, nodeid, csum_id);
if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
- cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%ld bytes)", msg_len);
+ cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%zd bytes)", msg_len);
goto leave;
}
uint64_t csum_id = *((uint64_t *)msg);
- msg += 8; msg_len -= 8;
+ msg = (uint8_t *) msg + 8; msg_len -= 8;
if (dfsm->csum_id == csum_id &&
(memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
- cfs_dom_critical(dfsm->log_domain, "wrong checksum %016zX != %016zX - restarting",
+ cfs_dom_critical(dfsm->log_domain, "wrong checksum %016" PRIX64 " != %016" PRIX64 " - restarting",
*(uint64_t *)msg, *(uint64_t *)dfsm->csum);
goto leave;
} else {
goto leave;
}
- ni->state = g_memdup(msg, msg_len);
+ ni->state = g_memdup2(msg, msg_len);
ni->state_len = msg_len;
int received_all = 1;
cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
} else {
- cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %ld bytes)",
+ cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %zd bytes)",
base_header->type, msg_len);
goto leave;
}
dfsm_t *dfsm = NULL;
result = cpg_context_get(handle, (gpointer *)&dfsm);
if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
- cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
+ cfs_critical("cpg_context_get error: %d (%p)", result, (void *) dfsm);
return; /* we have no valid dfsm pointer, so we can just ignore this */
}
}
int lowest_nodeid = 0;
- GString *str = g_string_new("members: ");
+ GString *member_ids = g_string_new(NULL);
for (int i = 0; i < member_list_entries; i++) {
- g_string_append_printf(str, i ? ", %d/%d" : "%d/%d",
+ g_string_append_printf(member_ids, i ? ", %d/%d" : "%d/%d",
member_list[i].nodeid, member_list[i].pid);
if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
if ((dfsm->we_are_member || mode != DFSM_MODE_START))
- cfs_dom_message(dfsm->log_domain, str->str);
+ cfs_dom_message(dfsm->log_domain, "members: %s", member_ids->str);
- g_string_free(str, 1);
+ g_string_free(member_ids, 1);
dfsm->lowest_nodeid = lowest_nodeid;
dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
if (lowest_nodeid == dfsm->nodeid) {
- if (!dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0)) {
+ if (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0) != CS_OK) {
cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
goto leave;
}
if (!(dfsm->msg_queue = g_sequence_new(NULL)))
goto err;
-
+
+ g_mutex_init(&dfsm->cpg_mutex);
+
dfsm->log_domain = log_domain;
dfsm->data = data;
dfsm->mode = DFSM_MODE_START;
struct iovec iov[len];
if (dfsm->csum_counter != dfsm->csum_id) {
- g_message("delay verify request %016zX", dfsm->csum_counter + 1);
+ g_message("delay verify request %016" PRIX64, dfsm->csum_counter + 1);
return CS_OK;
};
iov[0].iov_base = (char *)&dfsm->csum_counter;
iov[0].iov_len = sizeof(dfsm->csum_counter);
- cfs_debug("send verify request %016zX", dfsm->csum_counter);
+ cfs_debug("send verify request %016" PRIX64, dfsm->csum_counter);
cs_error_t result;
result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
int retries = 0;
loop:
+ g_mutex_lock (&dfsm->cpg_mutex);
result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name);
+ g_mutex_unlock (&dfsm->cpg_mutex);
if (result == CS_ERR_TRY_AGAIN) {
nanosleep(&tvreq, NULL);
++retries;
struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
int retries = 0;
loop:
+ g_mutex_lock (&dfsm->cpg_mutex);
result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
+ g_mutex_unlock (&dfsm->cpg_mutex);
if (result == CS_ERR_TRY_AGAIN) {
nanosleep(&tvreq, NULL);
++retries;
g_mutex_clear (&dfsm->sync_mutex);
g_cond_clear (&dfsm->sync_cond);
+
+ g_mutex_clear (&dfsm->cpg_mutex);
if (dfsm->results)
g_hash_table_destroy(dfsm->results);