]> git.proxmox.com Git - mirror_corosync.git/commitdiff
- sync_abort is called if there is a new config change during synchronization
authorHans Feldt <hans.feldt@ericsson.com>
Tue, 24 Oct 2006 06:30:50 +0000 (06:30 +0000)
committerHans Feldt <hans.feldt@ericsson.com>
Tue, 24 Oct 2006 06:30:50 +0000 (06:30 +0000)
- a new function sync_request() that can be called by a user to execute
synchronization on request of a specified service.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1280 fd59a12c-fef9-0310-b244-a6a79926bd2f

exec/sync.c
exec/sync.h

index f1cd879b09ed86f9160645e0f8aca97a79539ba7..2666c56bc322273aa3978257cf4da24abc26feab 100644 (file)
@@ -1,9 +1,11 @@
 /*
  * Copyright (c) 2005-2006 MontaVista Software, Inc.
+ * Copyright (c) 2006 Ericsson AB.
+ * Author: Steven Dake (sdake@mvista.com)
+ * Author: Hans Feldt
  *
  * All rights reserved.
  *
- * Author: Steven Dake (sdake@mvista.com)
  *
  * This software licensed under BSD license, the text of which follows:
  * 
 #include "vsf.h"
 #include "../lcr/lcr_ifact.h"
 #include "print.h"
+#include "util.h"
 
 #define MESSAGE_REQ_SYNC_BARRIER 0
+#define MESSAGE_REQ_SYNC_REQUEST 1
 
 struct barrier_data {
        unsigned int nodeid;
@@ -69,7 +73,8 @@ static struct memb_ring_id *sync_ring_id;
 
 static int vsf_none = 0;
 
-static int (*sync_callbacks_retrieve) (int sync_id, struct sync_callbacks *callack);
+static int (*sync_callbacks_retrieve) (int sync_id,
+       struct sync_callbacks *callack);
 
 static struct sync_callbacks sync_callbacks;
 
@@ -80,10 +85,7 @@ static void (*sync_synchronization_completed) (void);
 static int sync_recovery_index = 0;
 
 static void *sync_callback_token_handle = 0;
-
-static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX];
-
-static int barrier_data_confchg_entries;
+static void *sync_request_token_handle;
 
 static struct barrier_data barrier_data_process[PROCESSOR_COUNT_MAX];
 
@@ -91,11 +93,13 @@ static struct openais_vsf_iface_ver0 *vsf_iface;
 
 static int sync_barrier_send (struct memb_ring_id *ring_id);
 
-static int sync_start_process (enum totem_callback_token_type type, void *data);
+static int sync_start_process (
+       enum totem_callback_token_type type, void *data);
 
 static void sync_service_init (struct memb_ring_id *ring_id);
 
-static int sync_service_process (enum totem_callback_token_type type, void *data);
+static int sync_service_process (
+       enum totem_callback_token_type type, void *data);
 
 static void sync_deliver_fn (
        unsigned int nodeid,
@@ -117,52 +121,71 @@ static void sync_primary_callback_fn (
        struct memb_ring_id *ring_id);
 
 static struct totempg_group sync_group = {
-    .group      = "sync",
-    .group_len  = 4
+       .group      = "sync",
+       .group_len  = 4
 };
 
 static totempg_groups_handle sync_group_handle;
+static char *service_name;
+static struct memb_ring_id deliver_ring_id;
+static unsigned int current_members[PROCESSOR_COUNT_MAX];
+static unsigned int current_members_cnt;
+
+struct sync_barrier_start {
+};
 
-struct req_exec_sync_barrier_start {
+struct sync_request {
+       uint32_t name_len;
+       char name[0] __attribute__((aligned(8)));
+};
+
+typedef struct sync_msg {
        mar_req_header_t header;
        struct memb_ring_id ring_id;
-};
+       union {
+               struct sync_barrier_start sync_barrier_start;
+               struct sync_request sync_request;
+       };
+} sync_msg_t;
 
 /*
  * Send a barrier data structure
  */
 static int sync_barrier_send (struct memb_ring_id *ring_id)
 {
-       struct req_exec_sync_barrier_start req_exec_sync_barrier_start;
+       sync_msg_t msg;
        struct iovec iovec;
        int res;
 
-       req_exec_sync_barrier_start.header.size = sizeof (struct req_exec_sync_barrier_start);
-       req_exec_sync_barrier_start.header.id = MESSAGE_REQ_SYNC_BARRIER;
+       msg.header.size = sizeof (sync_msg_t);
+       msg.header.id = MESSAGE_REQ_SYNC_BARRIER;
 
-       memcpy (&req_exec_sync_barrier_start.ring_id, ring_id,
-               sizeof (struct memb_ring_id));
+       memcpy (&msg.ring_id, ring_id, sizeof (struct memb_ring_id));
 
-       iovec.iov_base = (char *)&req_exec_sync_barrier_start;
-       iovec.iov_len = sizeof (req_exec_sync_barrier_start);
+       iovec.iov_base = (char *)&msg;
+       iovec.iov_len = sizeof (msg);
 
-       res = totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
+       res = totempg_groups_mcast_joined (
+               sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
 
        return (res);
 }
 
-void sync_start_init (struct memb_ring_id *ring_id)
+static void sync_start_init (struct memb_ring_id *ring_id)
 {
+       ENTER("");
        totempg_callback_token_create (
                &sync_callback_token_handle,
                TOTEM_CALLBACK_TOKEN_SENT,
                0, /* don't delete after callback */
                sync_start_process,
                (void *)ring_id);
+       LEAVE("");
 }
 
 static void sync_service_init (struct memb_ring_id *ring_id)
 {
+       ENTER("");
        sync_callbacks.sync_init ();
        totempg_callback_token_destroy (&sync_callback_token_handle);
 
@@ -175,13 +198,16 @@ static void sync_service_init (struct memb_ring_id *ring_id)
                0, /* don't delete after callback */
                sync_service_process,
                (void *)ring_id);
+       LEAVE("");
 }
 
-static int sync_start_process (enum totem_callback_token_type type, void *data)
+static int sync_start_process (
+       enum totem_callback_token_type type, void *data)
 {
        int res;
        struct memb_ring_id *ring_id = (struct memb_ring_id *)data;
 
+       ENTER("");
        res = sync_barrier_send (ring_id);
        if (res == 0) {
                /*
@@ -189,13 +215,15 @@ static int sync_start_process (enum totem_callback_token_type type, void *data)
                 */
                totempg_callback_token_destroy (&sync_callback_token_handle);
        }
+       LEAVE("");
        return (0);
 }
 
-void sync_callbacks_load (void)
+static void sync_callbacks_load (void)
 {
        int res;
 
+       ENTER("");
 // TODO rewrite this to get rid of the for (;;)
        for (;;) {
                res = sync_callbacks_retrieve (sync_recovery_index, &sync_callbacks);
@@ -206,25 +234,33 @@ void sync_callbacks_load (void)
                        sync_processing = 0;
                        break;
                }
+               if ((service_name != NULL) &&
+                       strcmp ((char*)sync_callbacks.name, service_name) != 0) {
+                       sync_recovery_index += 1;
+                       continue;
+               }
                sync_recovery_index += 1;
                if (sync_callbacks.sync_init) {
                        break;
                }
        }
+       LEAVE("");
 }
 
-static int sync_service_process (enum totem_callback_token_type type, void *data)
+static int sync_service_process (
+       enum totem_callback_token_type type, void *data)
 {
        int res;
        struct memb_ring_id *ring_id = (struct memb_ring_id *)data;
 
-       
+       ENTER("");
+
        /*
         * If process operation not from this ring id, then ignore it and stop
         * processing
         */
        if (memcmp (ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) {
-               return (0);
+               goto end;
        }
        
        /*
@@ -233,12 +269,14 @@ static int sync_service_process (enum totem_callback_token_type type, void *data
         */
        res = sync_callbacks.sync_process ();
        if (res != 0) {
-               return (0);
+               goto end;
        }
        totempg_callback_token_destroy (&sync_callback_token_handle);
 
        sync_start_init (ring_id);
 
+end:
+       LEAVE("");
        return (0);
 }
 
@@ -314,10 +352,16 @@ static void sync_primary_callback_fn (
 {
        int i;
 
+       ENTER("");
+
        if (primary_designated) {
-               log_printf (LOG_LEVEL_NOTICE, "This node is within the primary component and will provide service.\n");
+               log_printf (LOG_LEVEL_NOTICE,
+                       "This node is within the primary component and will provide"
+                       " service.\n");
        } else {
-               log_printf (LOG_LEVEL_NOTICE, "This node is within the non-primary component and will NOT provide any services.\n");
+               log_printf (LOG_LEVEL_NOTICE,
+                       "This node is within the non-primary component and will NOT"
+                       " provide any services.\n");
                return;
        }
 
@@ -329,23 +373,14 @@ static void sync_primary_callback_fn (
        totempg_callback_token_destroy (&sync_callback_token_handle);
 
        sync_recovery_index = 0;
-       memset (&barrier_data_confchg, 0, sizeof (barrier_data_confchg));
+
        for (i = 0; i < view_list_entries; i++) {
-               barrier_data_confchg[i].nodeid = view_list[i];
-               barrier_data_confchg[i].completed = 0;
+               barrier_data_process[i].nodeid = view_list[i];
+               barrier_data_process[i].completed = 0;
        }
-       memcpy (barrier_data_process, barrier_data_confchg,
-               sizeof (barrier_data_confchg));
-       barrier_data_confchg_entries = view_list_entries;
-       sync_start_init (sync_ring_id);
-}
-
-static struct memb_ring_id deliver_ring_id;
 
-void sync_endian_convert (struct req_exec_sync_barrier_start *req_exec_sync_barrier_start)
-{
-       /* XXX no swab on mar_req_header_t? */
-       swab_memb_ring_id_t (&req_exec_sync_barrier_start->ring_id);
+       sync_start_init (sync_ring_id);
+       LEAVE("");
 }
 
 static void sync_deliver_fn (
@@ -354,36 +389,67 @@ static void sync_deliver_fn (
        int iov_len,
        int endian_conversion_required)
 {
-       struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
-               (struct req_exec_sync_barrier_start *)iovec[0].iov_base;
-
        int i;
+       int barrier_completed;
+       sync_msg_t *msg = iovec[0].iov_base;
+
+       ENTER("type %d, len %d", msg->header.id, iovec[0].iov_len);
 
        if (endian_conversion_required) {
-               sync_endian_convert (req_exec_sync_barrier_start);
+               swab_mar_req_header_t (&msg->header);
+               swab_memb_ring_id_t (&msg->ring_id);
        }
 
-       int barrier_completed = 1;
-
-       memcpy (&deliver_ring_id, &req_exec_sync_barrier_start->ring_id,
-               sizeof (struct memb_ring_id));
-
        /*
-        * Is this barrier from this configuration, if not, ignore it
+        * If this message is not from this configuration, ignore it
         */
-       if (memcmp (&req_exec_sync_barrier_start->ring_id, sync_ring_id,
+       if (memcmp (&msg->ring_id, sync_ring_id,
                sizeof (struct memb_ring_id)) != 0) {
-               return;
+               goto end;
        }
 
+       if (msg->header.id == MESSAGE_REQ_SYNC_REQUEST) {
+               if (endian_conversion_required) {
+                       swab_mar_uint32_t (&msg->sync_request.name_len);
+               }       
+               /*
+                * If there is an ongoing sync, abort it. A requested sync is
+                * only allowed to abort other requested synchronizations,
+                * not full synchronizations.
+                */
+               if (sync_processing && sync_callbacks.sync_abort) {
+                       sync_callbacks.sync_abort();
+                       sync_callbacks.sync_activate = NULL;
+                       sync_processing = 0;
+                       assert (service_name != NULL);
+                       free (service_name);
+                       service_name = NULL;
+               }
+
+               service_name = malloc (msg->sync_request.name_len);
+               strcpy (service_name, msg->sync_request.name);
+
+               /* 
+                * Start requested synchronization 
+                */
+               sync_primary_callback_fn (current_members, current_members_cnt, 1,
+                       sync_ring_id);
+
+               goto end;
+       }
+
+       barrier_completed = 1;
+
+       memcpy (&deliver_ring_id, &msg->ring_id, sizeof (struct memb_ring_id));
+
        /*
         * Set completion for source_addr's address
         */
-       for (i = 0; i < barrier_data_confchg_entries; i++) {
+       for (i = 0; i < current_members_cnt; i++) {
                if (nodeid == barrier_data_process[i].nodeid) {
                        barrier_data_process[i].completed = 1;
                        log_printf (LOG_LEVEL_DEBUG,
-                               "Barrier Start Recieved From %d\n",
+                               "Barrier Start Received From %d\n",
                                barrier_data_process[i].nodeid);
                        break;
                }
@@ -392,37 +458,37 @@ static void sync_deliver_fn (
        /*
         * Test if barrier is complete
         */
-       for (i = 0; i < barrier_data_confchg_entries; i++) {
+       for (i = 0; i < current_members_cnt; i++) {
                log_printf (LOG_LEVEL_DEBUG,
                        "Barrier completion status for nodeid %d = %d. \n", 
                        barrier_data_process[i].nodeid,
                        barrier_data_process[i].completed);
+
                if (barrier_data_process[i].completed == 0) {
                        barrier_completed = 0;
                }
        }
-       if (barrier_completed) {
-               log_printf (LOG_LEVEL_DEBUG,
-                       "Synchronization barrier completed\n");
-       }
-       /*
-        * This sync is complete so activate and start next service sync
-        */
-       if (barrier_completed && sync_callbacks.sync_activate) {
-               sync_callbacks.sync_activate ();
-       
-               log_printf (LOG_LEVEL_DEBUG,
-                       "Committing synchronization for (%s)\n",
-                       sync_callbacks.name);
-       
-       }
 
-       /*
-        * Start synchronization if the barrier has completed
-        */
        if (barrier_completed) {
-               memcpy (barrier_data_process, barrier_data_confchg,
-                       sizeof (barrier_data_confchg));
+               log_printf (LOG_LEVEL_DEBUG, "Synchronization barrier completed\n");
+               /*
+                * This sync is complete so activate and start next service sync
+               */
+               if (sync_callbacks.sync_activate) {
+                       log_printf (LOG_LEVEL_DEBUG,
+                               "Committing synchronization for (%s)\n",
+                               sync_callbacks.name);
+
+                       sync_callbacks.sync_activate ();
+               }
+
+               /*
+                * Start synchronization if the barrier has completed
+               */
+               for (i = 0; i < current_members_cnt; i++) {
+                       barrier_data_process[i].nodeid = current_members[i];
+                       barrier_data_process[i].completed = 0;
+               }
 
                sync_callbacks_load();
 
@@ -434,9 +500,15 @@ static void sync_deliver_fn (
                                "Synchronization actions starting for (%s)\n",
                                sync_callbacks.name);
                        sync_service_init (&deliver_ring_id);
+               } else {
+                       if (service_name != NULL) {
+                               free (service_name);
+                               service_name = NULL;
+                       }
                }
        }
-       return;
+end:
+       LEAVE("");
 }
 
 static void sync_confchg_fn (
@@ -446,19 +518,94 @@ static void sync_confchg_fn (
        unsigned int *joined_list, int joined_list_entries,
        struct memb_ring_id *ring_id)
 {
+       int i;
+
+       ENTER("");
+
+       /*
+        * Save current members and ring ID for later use
+        */
+       for (i = 0; i < member_list_entries; i++) {
+               current_members[i] = member_list[i];
+       }
+       current_members_cnt = member_list_entries;
        sync_ring_id = ring_id;
 
        /*
-        * If no virtual synchrony filter configured, then start
-        * synchronization process
+        * If no virtual synchrony filter configured.
         */
        if (vsf_none == 1) {
+               /*
+                * If there is an ongoing synchronization, abort it.
+                */
+               if (sync_processing && sync_callbacks.sync_abort) {
+                       sync_callbacks.sync_abort();
+                       sync_callbacks.sync_activate = NULL;
+                       sync_processing = 0;
+                       if (service_name != NULL) {
+                               free (service_name);
+                               service_name = NULL;
+                       }
+               }
+
+               /*
+                * Start new synchronization process 
+                */
                sync_primary_callback_fn (
-                       member_list,
-                       member_list_entries,
-                       1,
-                       ring_id);
+                       member_list, member_list_entries,       1, ring_id);
+       }
+       LEAVE("");
+}
+
+/**
+ * TOTEM callback function used to multicast a sync_request
+ * message
+ * @param type
+ * @param _name
+ * 
+ * @return int
+ */
+static int sync_request_send (
+       enum totem_callback_token_type type, void *_name)
+{
+       int res;
+       char *name = _name;
+       sync_msg_t msg;
+       struct iovec iovec[2];
+       int name_len;
+
+       ENTER("'%s'", name);
+
+       name_len = strlen (name) + 1;
+       msg.header.size = sizeof (msg) + name_len;
+       msg.header.id = MESSAGE_REQ_SYNC_REQUEST;
+
+       memcpy (&msg.ring_id, sync_ring_id,     sizeof (struct memb_ring_id));
+       msg.sync_request.name_len = name_len;
+
+       iovec[0].iov_base = (char *)&msg;
+       iovec[0].iov_len = sizeof (msg);
+       iovec[1].iov_base = _name;
+       iovec[1].iov_len = name_len;
+
+       res = totempg_groups_mcast_joined (
+               sync_group_handle, iovec, 2, TOTEMPG_AGREED);
+
+       if (res == 0) {
+               /*
+                * We managed to multicast the message so delete the token callback
+                * for the sync request.
+                */
+               totempg_callback_token_destroy (&sync_request_token_handle);
        }
+
+       /*
+        * if we failed to multicast the message, this function will be called
+        * again.
+        */
+
+       LEAVE("");
+       return (0);
 }
 
 int sync_in_process (void)
@@ -474,3 +621,28 @@ int sync_primary_designated (void)
                return (vsf_iface->primary());
        }
 }
+
+/**
+ * Execute synchronization upon request for the named service
+ * @param name
+ * 
+ * @return int
+ */
+int sync_request (char *name)
+{
+       assert (name != NULL);
+
+       ENTER("'%s'", name);
+
+       if (sync_processing) {
+               return -1;
+       }
+
+       totempg_callback_token_create (&sync_request_token_handle,
+               TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */
+               sync_request_send, name);
+
+       LEAVE("");
+
+       return 0;
+}
index 8dce8ae49a21af9a21fabdffbdbf100aceeef43f..82ee8a18e412f11d438bcf896399d1b6a34feab7 100644 (file)
@@ -56,4 +56,12 @@ int sync_in_process (void);
 
 int sync_primary_designated (void);
 
+/**
+ * Execute synchronization upon request for the named service
+ * @param name service handler name to synchronize
+ * 
+ * @return int 0 OK, error code otherwise
+ */
+extern int sync_request (char *name);
+
 #endif /* SYNC_H_DEFINED */