git.proxmox.com Git - pve-cluster.git/blobdiff - data/src/dfsm.c
pmxcfs: migrate to g_memdup2
[pve-cluster.git] / data / src / dfsm.c
index fd3517455106cadf3cbf3f8320daed8332858a57..84f7df7b253fcc4ff6371fb2d491e3dc973985a0 100644 (file)
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2010 Proxmox Server Solutions GmbH
+  Copyright (C) 2010 - 2020 Proxmox Server Solutions GmbH
 
   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU Affero General Public License as published by
@@ -30,6 +30,7 @@
 #endif /* HAVE_CONFIG_H */
 
 #include <sys/types.h>
+#include <inttypes.h>
 #include <unistd.h>
 #include <string.h>
 #include <stdlib.h>
@@ -106,6 +107,7 @@ struct dfsm {
        cpg_callbacks_t *cpg_callbacks;
        dfsm_callbacks_t *dfsm_callbacks;
        cpg_handle_t cpg_handle;
+       GMutex cpg_mutex;
        struct cpg_name cpg_group_name;
        uint32_t nodeid;
        uint32_t pid;
@@ -203,7 +205,9 @@ dfsm_send_message_full(
        cs_error_t result;
        int retries = 0;
 loop:
+       g_mutex_lock (&dfsm->cpg_mutex);
        result = cpg_mcast_joined(dfsm->cpg_handle, CPG_TYPE_AGREED, iov, len);
+       g_mutex_unlock (&dfsm->cpg_mutex);
        if (retry && result == CS_ERR_TRY_AGAIN) {
                nanosleep(&tvreq, NULL);
                ++retries;
@@ -456,7 +460,7 @@ dfsm_queue_add_message(
                
        qm->nodeid = nodeid;
        qm->pid = pid;
-       qm->msg = g_memdup (msg, msg_len);
+       qm->msg = g_memdup2 (msg, msg_len);
        qm->msg_len = msg_len;
        qm->msg_count =  msg_count;
 
@@ -649,7 +653,7 @@ dfsm_cpg_deliver_callback(
        dfsm_t *dfsm = NULL;
        result = cpg_context_get(handle, (gpointer *)&dfsm);
        if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
-               cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
+               cfs_critical("cpg_context_get error: %d (%p)", result, (void *) dfsm);
                return; /* we have no valid dfsm pointer, so we can just ignore this */
        }
        dfsm_mode_t mode = dfsm_get_mode(dfsm);
@@ -667,7 +671,7 @@ dfsm_cpg_deliver_callback(
        }
 
        if (msg_len < sizeof(dfsm_message_header_t)) {
-               cfs_dom_critical(dfsm->log_domain, "received short message (%ld bytes)", msg_len);
+               cfs_dom_critical(dfsm->log_domain, "received short message (%zd bytes)", msg_len);
                goto leave;
        }
 
@@ -689,13 +693,13 @@ dfsm_cpg_deliver_callback(
                dfsm_message_normal_header_t *header = (dfsm_message_normal_header_t *)msg;
 
                if (msg_len < sizeof(dfsm_message_normal_header_t)) {
-                       cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %ld bytes)",
+                       cfs_dom_critical(dfsm->log_domain, "received short message (type = %d, subtype = %d, %zd bytes)",
                                         base_header->type, base_header->subtype, msg_len);
                        goto leave;
                }
 
                if (mode != DFSM_MODE_SYNCED) {
-                       cfs_dom_debug(dfsm->log_domain, "queue message %zu (subtype = %d, length = %ld)", 
+                       cfs_dom_debug(dfsm->log_domain, "queue message %" PRIu64 " (subtype = %d, length = %zd)",
                                      header->count, base_header->subtype, msg_len); 
 
                        if (!dfsm_queue_add_message(dfsm, nodeid, pid, header->count, msg, msg_len))
@@ -705,7 +709,7 @@ dfsm_cpg_deliver_callback(
                        int msg_res = -1;
                        int res = dfsm->dfsm_callbacks->dfsm_deliver_fn(
                                dfsm, dfsm->data, &msg_res, nodeid, pid, base_header->subtype, 
-                               base_header->time, msg + sizeof(dfsm_message_normal_header_t), 
+                               base_header->time, (uint8_t *)msg + sizeof(dfsm_message_normal_header_t),
                                msg_len - sizeof(dfsm_message_normal_header_t));
 
                        if (nodeid == dfsm->nodeid && pid == dfsm->pid)
@@ -725,7 +729,7 @@ dfsm_cpg_deliver_callback(
        dfsm_message_state_header_t *header = (dfsm_message_state_header_t *)msg;
 
        if (msg_len < sizeof(dfsm_message_state_header_t)) {
-               cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %ld bytes)",
+               cfs_dom_critical(dfsm->log_domain, "received short state message (type = %d, subtype = %d, %zd bytes)",
                                 base_header->type, base_header->subtype, msg_len);
                goto leave;
        }
@@ -738,7 +742,7 @@ dfsm_cpg_deliver_callback(
                return;
        }
 
-       msg += sizeof(dfsm_message_state_header_t);
+       msg = (uint8_t *) msg + sizeof(dfsm_message_state_header_t);
        msg_len -= sizeof(dfsm_message_state_header_t);
 
        if (mode == DFSM_MODE_SYNCED) {
@@ -755,14 +759,14 @@ dfsm_cpg_deliver_callback(
                } else if (base_header->type == DFSM_MESSAGE_VERIFY_REQUEST) {
 
                        if (msg_len != sizeof(dfsm->csum_counter)) {
-                               cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%ld bytes) form node %d/%d", msg_len, nodeid, pid);
+                               cfs_dom_critical(dfsm->log_domain, "cpg received verify request with wrong length (%zd bytes) form node %d/%d", msg_len, nodeid, pid);
                                goto leave;
                        }
 
                        uint64_t csum_id = *((uint64_t *)msg);
-                       msg += 8; msg_len -= 8;
+                       msg = (uint8_t *) msg + 8; msg_len -= 8;
 
-                       cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016zX", nodeid, csum_id);
+                       cfs_dom_debug(dfsm->log_domain, "got verify request from node %d %016" PRIX64, nodeid, csum_id);
 
                        if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
                                if (!dfsm->dfsm_callbacks->dfsm_checksum_fn(
@@ -789,17 +793,17 @@ dfsm_cpg_deliver_callback(
                        if (dfsm->dfsm_callbacks->dfsm_checksum_fn) {
 
                                if (msg_len != (sizeof(dfsm->csum_id) + sizeof(dfsm->csum))) {
-                                       cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%ld bytes)", msg_len);
+                                       cfs_dom_critical(dfsm->log_domain, "cpg received verify message with wrong length (%zd bytes)", msg_len);
                                        goto leave;
                                }
 
                                uint64_t csum_id = *((uint64_t *)msg);
-                               msg += 8; msg_len -= 8;
+                               msg = (uint8_t *) msg + 8; msg_len -= 8;
                                
                                if (dfsm->csum_id == csum_id &&
                                    (memcmp(&dfsm->csum_epoch, &header->epoch, sizeof(dfsm_sync_epoch_t)) == 0)) {
                                        if (memcmp(msg, dfsm->csum, sizeof(dfsm->csum)) != 0) {
-                                               cfs_dom_critical(dfsm->log_domain, "wrong checksum %016zX != %016zX - restarting",
+                                               cfs_dom_critical(dfsm->log_domain, "wrong checksum %016" PRIX64 " != %016" PRIX64 " - restarting",
                                                                 *(uint64_t *)msg, *(uint64_t *)dfsm->csum);
                                                goto leave;
                                        } else {
@@ -873,7 +877,7 @@ dfsm_cpg_deliver_callback(
                                goto leave;
                        }
 
-                       ni->state = g_memdup(msg, msg_len);
+                       ni->state = g_memdup2(msg, msg_len);
                        ni->state_len = msg_len;
 
                        int received_all = 1;
@@ -957,7 +961,7 @@ dfsm_cpg_deliver_callback(
                cfs_dom_debug(dfsm->log_domain, "ignore verify message %d while not synced", base_header->type);
     
        } else {
-               cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %ld bytes)", 
+               cfs_dom_critical(dfsm->log_domain, "received unknown state message type (type = %d, %zd bytes)",
                                 base_header->type, msg_len);
                goto leave;
        }
@@ -1107,7 +1111,7 @@ dfsm_cpg_confchg_callback(
        dfsm_t *dfsm = NULL;
        result = cpg_context_get(handle, (gpointer *)&dfsm);
        if (result != CS_OK || !dfsm || dfsm->cpg_callbacks != &cpg_callbacks) {
-               cfs_critical("cpg_context_get error: %d (%p)", result, dfsm);
+               cfs_critical("cpg_context_get error: %d (%p)", result, (void *) dfsm);
                return; /* we have no valid dfsm pointer, so we can just ignore this */
        }
 
@@ -1136,10 +1140,10 @@ dfsm_cpg_confchg_callback(
        }
 
        int lowest_nodeid = 0;
-       GString *str = g_string_new("members: ");
+       GString *member_ids = g_string_new(NULL);
        for (int i = 0; i < member_list_entries; i++) {
 
-               g_string_append_printf(str, i ? ", %d/%d" : "%d/%d",
+               g_string_append_printf(member_ids, i ? ", %d/%d" : "%d/%d",
                                       member_list[i].nodeid, member_list[i].pid);
 
                if (lowest_nodeid == 0 || lowest_nodeid > member_list[i].nodeid)
@@ -1152,9 +1156,9 @@ dfsm_cpg_confchg_callback(
 
 
        if ((dfsm->we_are_member || mode != DFSM_MODE_START))
-               cfs_dom_message(dfsm->log_domain, str->str);
+               cfs_dom_message(dfsm->log_domain, "members: %s",  member_ids->str);
 
-       g_string_free(str, 1);
+       g_string_free(member_ids, 1);
 
        dfsm->lowest_nodeid = lowest_nodeid;
 
@@ -1189,7 +1193,7 @@ dfsm_cpg_confchg_callback(
 
                dfsm_set_mode(dfsm, DFSM_MODE_START_SYNC);
                if (lowest_nodeid == dfsm->nodeid) {
-                       if (!dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0)) {
+                       if (dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_SYNC_START, NULL, 0) != CS_OK) {
                                cfs_dom_critical(dfsm->log_domain, "failed to send SYNC_START message");
                                goto leave;
                        }
@@ -1249,7 +1253,9 @@ dfsm_new(
 
        if (!(dfsm->msg_queue = g_sequence_new(NULL))) 
                goto err;
-               
+
+       g_mutex_init(&dfsm->cpg_mutex);
+
        dfsm->log_domain = log_domain;
        dfsm->data = data;
        dfsm->mode = DFSM_MODE_START;
@@ -1309,7 +1315,7 @@ dfsm_verify_request(dfsm_t *dfsm)
        struct iovec iov[len];
 
        if (dfsm->csum_counter != dfsm->csum_id) {
-               g_message("delay verify request %016zX", dfsm->csum_counter + 1);
+               g_message("delay verify request %016" PRIX64, dfsm->csum_counter + 1);
                return CS_OK;
        };
 
@@ -1317,7 +1323,7 @@ dfsm_verify_request(dfsm_t *dfsm)
        iov[0].iov_base = (char *)&dfsm->csum_counter;
        iov[0].iov_len = sizeof(dfsm->csum_counter);
        
-       cfs_debug("send verify request %016zX", dfsm->csum_counter);
+       cfs_debug("send verify request %016" PRIX64, dfsm->csum_counter);
 
        cs_error_t result;
        result = dfsm_send_state_message_full(dfsm, DFSM_MESSAGE_VERIFY_REQUEST, iov, len);
@@ -1423,7 +1429,9 @@ dfsm_join(dfsm_t *dfsm)
        struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
        int retries = 0;
 loop:
+       g_mutex_lock (&dfsm->cpg_mutex);
        result = cpg_join(dfsm->cpg_handle, &dfsm->cpg_group_name); 
+       g_mutex_unlock (&dfsm->cpg_mutex);
        if (result == CS_ERR_TRY_AGAIN) {
                nanosleep(&tvreq, NULL);
                ++retries;
@@ -1452,7 +1460,9 @@ dfsm_leave (dfsm_t *dfsm)
        struct timespec tvreq = { .tv_sec = 0, .tv_nsec = 100000000 };
        int retries = 0;
 loop:
+       g_mutex_lock (&dfsm->cpg_mutex);
        result = cpg_leave(dfsm->cpg_handle, &dfsm->cpg_group_name);
+       g_mutex_unlock (&dfsm->cpg_mutex);
        if (result == CS_ERR_TRY_AGAIN) {
                nanosleep(&tvreq, NULL);
                ++retries;
@@ -1508,6 +1518,8 @@ dfsm_destroy(dfsm_t *dfsm)
        g_mutex_clear (&dfsm->sync_mutex);
 
        g_cond_clear (&dfsm->sync_cond);
+
+       g_mutex_clear (&dfsm->cpg_mutex);
  
        if (dfsm->results)
                g_hash_table_destroy(dfsm->results);