]> git.proxmox.com Git - mirror_corosync.git/commitdiff
Implement thread saftey in corosync trunk.
authorSteven Dake <sdake@redhat.com>
Wed, 29 Apr 2009 07:21:21 +0000 (07:21 +0000)
committerSteven Dake <sdake@redhat.com>
Wed, 29 Apr 2009 07:21:21 +0000 (07:21 +0000)
git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2165 fd59a12c-fef9-0310-b244-a6a79926bd2f

include/corosync/coroipcc.h
lib/cfg.c
lib/confdb.c
lib/coroipcc.c
lib/cpg.c
lib/evs.c
lib/pload.c
lib/quorum.c
lib/sa-confdb.c
lib/votequorum.c

index 4fa42cd5828f85a8e8504fa621d05023fd75ad78..bc912ca1b6a02f813618b8c6e3fe586b31c0c6a9 100644 (file)
@@ -40,6 +40,7 @@
 #include <sys/poll.h>
 #include <sys/socket.h>
 #include <corosync/corotypes.h>
+#include <corosync/hdb.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -52,65 +53,70 @@ coroipcc_service_connect (
        size_t request_size,
        size_t respnse__size,
        size_t dispatch_size,
-       void **ipc_context);
+       hdb_handle_t *handle);
 
 extern cs_error_t
 coroipcc_service_disconnect (
-       void *ipc_context);
+       hdb_handle_t handle);
 
-extern int
+extern cs_error_t
 coroipcc_fd_get (
-       void *ipc_context);
+       hdb_handle_t handle,
+       int *fd);
 
-extern int
+extern cs_error_t
 coroipcc_dispatch_get (
-       void *ipc_context,
+       hdb_handle_t handle,
        void **buf,
        int timeout);
 
-extern int
+extern cs_error_t
 coroipcc_dispatch_put (
-       void *ipc_context);
+       hdb_handle_t handle);
 
-extern int
+extern cs_error_t
 coroipcc_dispatch_flow_control_get (
-       void *ipc_context);
+       hdb_handle_t handle,
+       unsigned int *flow_control_state);
 
 extern cs_error_t
 coroipcc_msg_send_reply_receive (
-       void *ipc_context,
+       hdb_handle_t handle,
        const struct iovec *iov,
        unsigned int iov_len,
        void *res_msg,
        size_t res_len);
 
 extern cs_error_t
-coroipcc_msg_send_reply_receive_in_buf (
-       void *ipc_context,
+coroipcc_msg_send_reply_receive_in_buf_get (
+       hdb_handle_t handle,
        const struct iovec *iov,
        unsigned int iov_len,
        void **res_msg);
 
+extern cs_error_t
+coroipcc_msg_send_reply_receive_in_buf_put (
+       hdb_handle_t handle);
+
 extern cs_error_t
 coroipcc_zcb_alloc (
-       void *ipc_context,
+       hdb_handle_t handle,
        void **buffer,
        size_t size,
         size_t header_size);
 
 extern cs_error_t
 coroipcc_zcb_free (
-       void *ipc_context,
+       hdb_handle_t handle,
        void *buffer);
 
 extern cs_error_t
 coroipcc_zcb_msg_send_reply_receive (
-       void *ipc_context,
+       hdb_handle_t handle,
        void *msg,
        void *res_msg,
        size_t res_len);
 
-
 #ifdef __cplusplus
 }
 #endif
index 9b67f36ea9e3f72c06e70ea8ea9f8b80feefba6f..4cd0c92ff72a34db94b3abf5c3307cd5651d5a63 100644 (file)
--- a/lib/cfg.c
+++ b/lib/cfg.c
  * Data structure for instance data
  */
 struct cfg_instance {
-       void *ipc_ctx;
+       hdb_handle_t handle;
        corosync_cfg_callbacks_t callbacks;
        cs_name_t comp_name;
        int comp_registered;
        int finalize;
-       pthread_mutex_t response_mutex;
-       pthread_mutex_t dispatch_mutex;
 };
 
-static void cfg_handle_instance_destructor (void *);
-
 /*
  * All instances in one database
  */
-DECLARE_HDB_DATABASE (cfg_hdb,cfg_handle_instance_destructor);
+DECLARE_HDB_DATABASE (cfg_hdb,NULL);
 
 /*
  * Implementation
  */
-void cfg_handle_instance_destructor (void *instance)
-{
-       struct cfg_instance *cfg_instance = instance;
-
-       pthread_mutex_destroy (&cfg_instance->response_mutex);
-       pthread_mutex_destroy (&cfg_instance->dispatch_mutex);
-}
 
 cs_error_t
 corosync_cfg_initialize (
@@ -114,7 +103,7 @@ corosync_cfg_initialize (
                IPC_REQUEST_SIZE,
                IPC_RESPONSE_SIZE,
                IPC_DISPATCH_SIZE,
-               &cfg_instance->ipc_ctx);
+               &cfg_instance->handle);
        if (error != CS_OK) {
                goto error_put_destroy;
        }
@@ -123,10 +112,6 @@ corosync_cfg_initialize (
        memcpy (&cfg_instance->callbacks, cfg_callbacks, sizeof (corosync_cfg_callbacks_t));
        }
 
-       pthread_mutex_init (&cfg_instance->response_mutex, NULL);
-
-       pthread_mutex_init (&cfg_instance->dispatch_mutex, NULL);
-
        (void)hdb_handle_put (&cfg_hdb, *cfg_handle);
 
        return (CS_OK);
@@ -152,10 +137,10 @@ corosync_cfg_fd_get (
                return (error);
        }
 
-       *selection_fd = coroipcc_fd_get (cfg_instance->ipc_ctx);
+       error = coroipcc_fd_get (cfg_instance->handle, selection_fd);
 
        (void)hdb_handle_put (&cfg_hdb, cfg_handle);
-       return (CS_OK);
+       return (error);
 }
 
 cs_error_t
@@ -166,7 +151,6 @@ corosync_cfg_dispatch (
        int timeout = -1;
        cs_error_t error;
        int cont = 1; /* always continue do loop except when set to 0 */
-       int dispatch_avail;
        struct cfg_instance *cfg_instance;
        struct res_lib_cfg_testshutdown *res_lib_cfg_testshutdown;
        corosync_cfg_callbacks_t callbacks;
@@ -186,29 +170,21 @@ corosync_cfg_dispatch (
        }
 
        do {
-               pthread_mutex_lock (&cfg_instance->dispatch_mutex);
 
-               dispatch_avail = coroipcc_dispatch_get (
-                       cfg_instance->ipc_ctx,
+               error = coroipcc_dispatch_get (
+                       cfg_instance->handle,
                        (void **)&dispatch_data,
                        timeout);
-
-               /*
-                * Handle has been finalized in another thread
-                */
-               if (cfg_instance->finalize == 1) {
-                       error = CS_OK;
-                       pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
+               if (error != CS_OK) {
                        goto error_put;
                }
 
-               if (dispatch_avail == 0 && dispatch_flags == CS_DISPATCH_ALL) {
-                       pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
-                       break; /* exit do while cont is 1 loop */
-               } else
-               if (dispatch_avail == 0) {
-                       pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
-                       continue; /* next poll */
+               if (dispatch_data == NULL) {
+                       if (dispatch_flags == CPG_DISPATCH_ALL) {
+                               break; /* exit do while cont is 1 loop */
+                       } else {
+                               continue; /* next poll */
+                       }
                }
 
                /*
@@ -217,7 +193,6 @@ corosync_cfg_dispatch (
                 * operate at the same time that cfgFinalize has been called in another thread.
                 */
                memcpy (&callbacks, &cfg_instance->callbacks, sizeof (corosync_cfg_callbacks_t));
-               pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
 
                /*
                 * Dispatch incoming response
@@ -230,12 +205,12 @@ corosync_cfg_dispatch (
                        }
                        break;
                default:
-                       coroipcc_dispatch_put (cfg_instance->ipc_ctx);
+                       coroipcc_dispatch_put (cfg_instance->handle);
                        error = CS_ERR_LIBRARY;
                        goto error_nounlock;
                        break;
                }
-               coroipcc_dispatch_put (cfg_instance->ipc_ctx);
+               coroipcc_dispatch_put (cfg_instance->handle);
 
                /*
                 * Determine if more messages should be processed
@@ -269,31 +244,17 @@ corosync_cfg_finalize (
                return (error);
        }
 
-       pthread_mutex_lock (&cfg_instance->dispatch_mutex);
-
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
        /*
         * Another thread has already started finalizing
         */
        if (cfg_instance->finalize) {
-               pthread_mutex_unlock (&cfg_instance->response_mutex);
-               pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
                (void)hdb_handle_put (&cfg_hdb, cfg_handle);
                return (CS_ERR_BAD_HANDLE);
        }
 
        cfg_instance->finalize = 1;
 
-       coroipcc_service_disconnect (cfg_instance->ipc_ctx);
-
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
-       pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
-
-       pthread_mutex_destroy (&cfg_instance->response_mutex);
-
-       pthread_mutex_destroy (&cfg_instance->dispatch_mutex);
+       coroipcc_service_disconnect (cfg_instance->handle);
 
        (void)hdb_handle_destroy (&cfg_hdb, cfg_handle);
 
@@ -327,16 +288,12 @@ corosync_cfg_ring_status_get (
        iov.iov_base =  &req_lib_cfg_ringstatusget,
        iov.iov_len = sizeof (struct req_lib_cfg_ringstatusget),
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive(cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive(cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_ringstatusget,
                sizeof (struct res_lib_cfg_ringstatusget));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
        *interface_count = res_lib_cfg_ringstatusget.interface_count;
        *interface_names = malloc (sizeof (char *) * *interface_count);
        if (*interface_names == NULL) {
@@ -407,15 +364,12 @@ corosync_cfg_ring_reenable (
        iov.iov_base = &req_lib_cfg_ringreenable,
        iov.iov_len = sizeof (struct req_lib_cfg_ringreenable);
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_ringreenable,
                sizeof (struct res_lib_cfg_ringreenable));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
        (void)hdb_handle_put (&cfg_hdb, cfg_handle);
 
        return (error);
@@ -449,15 +403,12 @@ corosync_cfg_service_load (
        iov.iov_base = &req_lib_cfg_serviceload;
        iov.iov_len = sizeof (req_lib_cfg_serviceload);
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_serviceload,
                sizeof (struct res_lib_cfg_serviceload));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
        (void)hdb_handle_put (&cfg_hdb, cfg_handle);
 
        return (error);
@@ -491,15 +442,12 @@ corosync_cfg_service_unload (
        iov.iov_base = &req_lib_cfg_serviceunload;
        iov.iov_len = sizeof (req_lib_cfg_serviceunload);
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_serviceunload,
                sizeof (struct res_lib_cfg_serviceunload));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
        (void)hdb_handle_put (&cfg_hdb, cfg_handle);
 
        return (error);
@@ -530,16 +478,12 @@ corosync_cfg_state_track (
        iov.iov_base = &req_lib_cfg_statetrack,
        iov.iov_len = sizeof (struct req_lib_cfg_statetrack),
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_statetrack,
                sizeof (struct res_lib_cfg_statetrack));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
        (void)hdb_handle_put (&cfg_hdb, cfg_handle);
 
         return (error == CS_OK ? res_lib_cfg_statetrack.header.error : error);
@@ -566,16 +510,13 @@ corosync_cfg_state_track_stop (
 
        iov.iov_base = &req_lib_cfg_statetrackstop,
        iov.iov_len = sizeof (struct req_lib_cfg_statetrackstop),
-       pthread_mutex_lock (&cfg_instance->response_mutex);
 
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_statetrackstop,
                sizeof (struct res_lib_cfg_statetrackstop));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
        (void)hdb_handle_put (&cfg_hdb, cfg_handle);
 
         return (error == CS_OK ? res_lib_cfg_statetrackstop.header.error : error);
@@ -611,9 +552,7 @@ corosync_cfg_kill_node (
        iov.iov_base = &req_lib_cfg_killnode;
        iov.iov_len = sizeof (struct req_lib_cfg_killnode);
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_killnode,
@@ -621,8 +560,6 @@ corosync_cfg_kill_node (
 
        error = res_lib_cfg_killnode.header.error;
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
        (void)hdb_handle_put (&cfg_hdb, cfg_handle);
 
         return (error == CS_OK ? res_lib_cfg_killnode.header.error : error);
@@ -652,16 +589,12 @@ corosync_cfg_try_shutdown (
        iov.iov_base = &req_lib_cfg_tryshutdown;
        iov.iov_len = sizeof (req_lib_cfg_tryshutdown);
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_tryshutdown,
                sizeof (struct res_lib_cfg_tryshutdown));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
        (void)hdb_handle_put (&cfg_hdb, cfg_handle);
 
         return (error == CS_OK ? res_lib_cfg_tryshutdown.header.error : error);
@@ -691,16 +624,12 @@ corosync_cfg_replyto_shutdown (
        iov.iov_base = &req_lib_cfg_replytoshutdown;
        iov.iov_len = sizeof (struct req_lib_cfg_replytoshutdown);
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_replytoshutdown,
                sizeof (struct res_lib_cfg_replytoshutdown));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
        return (error);
 }
 
@@ -733,18 +662,15 @@ cs_error_t corosync_cfg_get_node_addrs (
        iov.iov_base = (char *)&req_lib_cfg_get_node_addrs;
        iov.iov_len = sizeof (req_lib_cfg_get_node_addrs);
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive_in_buf (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive_in_buf_get (
+               cfg_instance->handle,
                &iov,
                1,
                &return_address);
        res_lib_cfg_get_node_addrs = return_address;
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
        if (error != CS_OK) {
-               goto error_exit;
+               goto error_put;
        }
 
        if (res_lib_cfg_get_node_addrs->family == AF_INET)
@@ -772,7 +698,9 @@ cs_error_t corosync_cfg_get_node_addrs (
        *num_addrs = res_lib_cfg_get_node_addrs->num_addrs;
        errno = error = res_lib_cfg_get_node_addrs->header.error;
 
-error_exit:
+error_put:
+       error = coroipcc_msg_send_reply_receive_in_buf_put (cfg_instance->handle);
+       hdb_handle_put (&cfg_hdb, cfg_handle);
 
        return (error);
 }
@@ -798,17 +726,13 @@ cs_error_t corosync_cfg_local_get (
        iov.iov_base = &req_lib_cfg_local_get;
        iov.iov_len = sizeof (struct req_lib_cfg_local_get);
 
-       pthread_mutex_lock (&cfg_inst->response_mutex);
-
        error = coroipcc_msg_send_reply_receive (
-               cfg_inst->ipc_ctx,
+               cfg_inst->handle,
                &iov,
                1,
                &res_lib_cfg_local_get,
                sizeof (struct res_lib_cfg_local_get));
 
-       pthread_mutex_unlock (&cfg_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -847,16 +771,12 @@ corosync_cfg_crypto_set (
        iov.iov_base = &req_lib_cfg_crypto_set;
        iov.iov_len = sizeof (struct req_lib_cfg_crypto_set);
 
-       pthread_mutex_lock (&cfg_instance->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
                &iov,
                1,
                &res_lib_cfg_crypto_set,
                sizeof (struct res_lib_cfg_crypto_set));
 
-       pthread_mutex_unlock (&cfg_instance->response_mutex);
-
        if (error == CS_OK)
                error = res_lib_cfg_crypto_set.header.error;
 
index e6763727fccd4c57e3169019bd47992be2b4baa8..f35469f66d35b22256f8b08ff8db4fd13c39479a 100644 (file)
@@ -72,22 +72,18 @@ struct iter_context {
 };
 
 struct confdb_inst {
-       void *ipc_ctx;
+       hdb_handle_t handle;
        int finalize;
        int standalone;
        confdb_callbacks_t callbacks;
        const void *context;
-       pthread_mutex_t response_mutex;
-       pthread_mutex_t dispatch_mutex;
 
        struct list_head object_find_head;
        struct list_head object_iter_head;
        struct list_head key_iter_head;
 };
 
-static void confdb_instance_destructor (void *instance);
-
-DECLARE_HDB_DATABASE(confdb_handle_t_db,confdb_instance_destructor);
+DECLARE_HDB_DATABASE(confdb_handle_t_db,NULL);
 
 static cs_error_t do_find_destroy(struct confdb_inst *confdb_inst, hdb_handle_t find_handle);
 
@@ -107,17 +103,6 @@ static void free_context_list(struct confdb_inst *confdb_inst, struct list_head
        }
 }
 
-/*
- * Clean up function for a confdb instance (confdb_initialize) handle
- */
-static void confdb_instance_destructor (void *instance)
-{
-       struct confdb_inst *confdb_inst = instance;
-
-       pthread_mutex_destroy (&confdb_inst->response_mutex);
-       pthread_mutex_destroy (&confdb_inst->dispatch_mutex);
-}
-
 static struct iter_context *find_iter_context(struct list_head *list, hdb_handle_t object_handle)
 {
        struct iter_context *context;
@@ -168,16 +153,13 @@ cs_error_t confdb_initialize (
                        IPC_REQUEST_SIZE,
                        IPC_RESPONSE_SIZE,
                        IPC_DISPATCH_SIZE,
-                       &confdb_inst->ipc_ctx);
+                       &confdb_inst->handle);
        }
        if (error != CS_OK)
                goto error_put_destroy;
 
        memcpy (&confdb_inst->callbacks, callbacks, sizeof (confdb_callbacks_t));
 
-       pthread_mutex_init (&confdb_inst->response_mutex, NULL);
-       pthread_mutex_init (&confdb_inst->dispatch_mutex, NULL);
-
        list_init (&confdb_inst->object_find_head);
        list_init (&confdb_inst->object_iter_head);
        list_init (&confdb_inst->key_iter_head);
@@ -205,28 +187,23 @@ cs_error_t confdb_finalize (
                return (error);
        }
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
        /*
         * Another thread has already started finalizing
         */
        if (confdb_inst->finalize) {
-               pthread_mutex_unlock (&confdb_inst->response_mutex);
                (void)hdb_handle_put (&confdb_handle_t_db, handle);
                return (CS_ERR_BAD_HANDLE);
        }
 
        confdb_inst->finalize = 1;
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
-
        /* Free saved context handles */
        free_context_list(confdb_inst, &confdb_inst->object_find_head);
        free_context_list(confdb_inst, &confdb_inst->object_iter_head);
        free_context_list(confdb_inst, &confdb_inst->key_iter_head);
 
        if (!confdb_inst->standalone) {
-               coroipcc_service_disconnect (confdb_inst->ipc_ctx);
+               coroipcc_service_disconnect (confdb_inst->handle);
        }
 
        (void)hdb_handle_destroy (&confdb_handle_t_db, handle);
@@ -248,11 +225,11 @@ cs_error_t confdb_fd_get (
                return (error);
        }
 
-       *fd = coroipcc_fd_get (confdb_inst->ipc_ctx);
+       error = coroipcc_fd_get (confdb_inst->handle, fd);
 
        (void)hdb_handle_put (&confdb_handle_t_db, handle);
 
-       return (CS_OK);
+       return (error);
 }
 
 cs_error_t confdb_context_get (
@@ -300,7 +277,6 @@ cs_error_t confdb_dispatch (
        int timeout = -1;
        cs_error_t error;
        int cont = 1; /* always continue do loop except when set to 0 */
-       int dispatch_avail;
        struct confdb_inst *confdb_inst;
        confdb_callbacks_t callbacks;
        struct res_lib_confdb_key_change_callback *res_key_changed_pt;
@@ -327,31 +303,14 @@ cs_error_t confdb_dispatch (
        }
 
        do {
-               pthread_mutex_lock (&confdb_inst->dispatch_mutex);
-
-               dispatch_avail = coroipcc_dispatch_get (
-                       confdb_inst->ipc_ctx,
+               error = coroipcc_dispatch_get (
+                       confdb_inst->handle,
                        (void **)&dispatch_data,
                        timeout);
-
-               /*
-                * Handle has been finalized in another thread
-                */
-               if (confdb_inst->finalize == 1) {
-                       error = CS_OK;
-                       pthread_mutex_unlock (&confdb_inst->dispatch_mutex);
+               if (error != CS_OK) {
                        goto error_put;
                }
 
-               if (dispatch_avail == 0 && dispatch_types == CONFDB_DISPATCH_ALL) {
-                       pthread_mutex_unlock (&confdb_inst->dispatch_mutex);
-                       break; /* exit do while cont is 1 loop */
-               } else
-               if (dispatch_avail == 0) {
-                       pthread_mutex_unlock (&confdb_inst->dispatch_mutex);
-                       continue; /* next poll */
-               }
-
                /*
                 * Make copy of callbacks, message data, unlock instance, and call callback
                 * A risk of this dispatch method is that the callback routines may
@@ -359,7 +318,6 @@ cs_error_t confdb_dispatch (
                */
                memcpy (&callbacks, &confdb_inst->callbacks, sizeof (confdb_callbacks_t));
 
-               pthread_mutex_unlock (&confdb_inst->dispatch_mutex);
 
                /*
                 * Dispatch incoming message
@@ -400,12 +358,12 @@ cs_error_t confdb_dispatch (
                                break;
 
                        default:
-                               coroipcc_dispatch_put (confdb_inst->ipc_ctx);
+                               coroipcc_dispatch_put (confdb_inst->handle);
                                error = CS_ERR_LIBRARY;
                                goto error_noput;
                                break;
                }
-               coroipcc_dispatch_put (confdb_inst->ipc_ctx);
+               coroipcc_dispatch_put (confdb_inst->handle);
 
                /*
                 * Determine if more messages should be processed
@@ -464,16 +422,13 @@ cs_error_t confdb_object_create (
        iov.iov_base = (char *)&req_lib_confdb_object_create;
        iov.iov_len = sizeof (struct req_lib_confdb_object_create);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_object_create,
                sizeof (struct res_lib_confdb_object_create));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -517,16 +472,13 @@ cs_error_t confdb_object_destroy (
        iov.iov_base = (char *)&req_lib_confdb_object_destroy;
        iov.iov_len = sizeof (struct req_lib_confdb_object_destroy);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res,
                sizeof (coroipc_response_header_t));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -570,16 +522,13 @@ cs_error_t confdb_object_parent_get (
        iov.iov_base = (char *)&req_lib_confdb_object_parent_get;
        iov.iov_len = sizeof (struct req_lib_confdb_object_parent_get);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_object_parent_get,
                sizeof (struct res_lib_confdb_object_parent_get));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -620,16 +569,13 @@ static cs_error_t do_find_destroy(
        iov.iov_base = (char *)&req_lib_confdb_object_find_destroy;
        iov.iov_len = sizeof (struct req_lib_confdb_object_find_destroy);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res,
                sizeof (coroipc_response_header_t));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -730,16 +676,13 @@ cs_error_t confdb_key_create (
        iov.iov_base = (char *)&req_lib_confdb_key_create;
        iov.iov_len = sizeof (struct req_lib_confdb_key_create);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res,
                sizeof (res));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -792,16 +735,13 @@ cs_error_t confdb_key_delete (
        iov.iov_base = (char *)&req_lib_confdb_key_delete;
        iov.iov_len = sizeof (struct req_lib_confdb_key_delete);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res,
                sizeof (res));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -852,16 +792,13 @@ cs_error_t confdb_key_get (
        iov.iov_base = (char *)&req_lib_confdb_key_get;
        iov.iov_len = sizeof (struct req_lib_confdb_key_get);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_key_get,
                sizeof (struct res_lib_confdb_key_get));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -915,16 +852,13 @@ cs_error_t confdb_key_increment (
        iov.iov_base = (char *)&req_lib_confdb_key_get;
        iov.iov_len = sizeof (struct req_lib_confdb_key_get);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_key_incdec,
                sizeof (struct res_lib_confdb_key_incdec));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -977,16 +911,13 @@ cs_error_t confdb_key_decrement (
        iov.iov_base = (char *)&req_lib_confdb_key_get;
        iov.iov_len = sizeof (struct req_lib_confdb_key_get);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_key_incdec,
                sizeof (struct res_lib_confdb_key_incdec));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -1046,16 +977,13 @@ cs_error_t confdb_key_replace (
        iov.iov_base = (char *)&req_lib_confdb_key_replace;
        iov.iov_len = sizeof (struct req_lib_confdb_key_replace);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res,
                sizeof (res));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -1221,16 +1149,13 @@ cs_error_t confdb_object_find (
        iov.iov_base = (char *)&req_lib_confdb_object_find;
        iov.iov_len = sizeof (struct req_lib_confdb_object_find);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_object_find,
                sizeof (struct res_lib_confdb_object_find));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -1293,16 +1218,13 @@ cs_error_t confdb_object_iter (
        iov.iov_base = (char *)&req_lib_confdb_object_iter;
        iov.iov_len = sizeof (struct req_lib_confdb_object_iter);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_object_iter,
                sizeof (struct res_lib_confdb_object_iter));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -1368,16 +1290,13 @@ cs_error_t confdb_key_iter (
        iov.iov_base = (char *)&req_lib_confdb_key_iter;
        iov.iov_len = sizeof (struct req_lib_confdb_key_iter);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_key_iter,
                sizeof (struct res_lib_confdb_key_iter));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -1430,16 +1349,13 @@ cs_error_t confdb_write (
        iov.iov_base = (char *)&req;
        iov.iov_len = sizeof (coroipc_request_header_t);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_write,
                sizeof (struct res_lib_confdb_write));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                /* FIXME: set error_text */
                goto error_exit;
@@ -1491,17 +1407,13 @@ cs_error_t confdb_reload (
        iov.iov_base = (char *)&req_lib_confdb_reload;
        iov.iov_len = sizeof (req_lib_confdb_reload);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res_lib_confdb_reload,
                sizeof (struct res_lib_confdb_reload));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
-
        if (error != CS_OK) {
                /* FIXME: set error_text */
                goto error_exit;
@@ -1549,16 +1461,13 @@ cs_error_t confdb_track_changes (
        iov.iov_base = (char *)&req;
        iov.iov_len = sizeof (struct req_lib_confdb_object_track_start);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res,
                sizeof (coroipc_response_header_t));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -1595,16 +1504,13 @@ cs_error_t confdb_stop_track_changes (confdb_handle_t handle)
        iov.iov_base = (char *)&req;
        iov.iov_len = sizeof (coroipc_request_header_t);
 
-       pthread_mutex_lock (&confdb_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               confdb_inst->ipc_ctx,
+               confdb_inst->handle,
                &iov,
                1,
                 &res,
                sizeof (coroipc_response_header_t));
 
-       pthread_mutex_unlock (&confdb_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
index 0deaa5f7168c0cc074230cdf996bb8c95532b762..0178f11fbc1c109a108826ec6c31b24d8c1021ba 100644 (file)
 #include <corosync/coroipc_types.h>
 #include <corosync/coroipc_ipc.h>
 #include <corosync/coroipcc.h>
+#include <corosync/hdb.h>
 
-struct ipc_segment {
+#include "util.h"
+
+struct ipc_instance {
        int fd;
        int shmid;
        int semid;
@@ -77,8 +80,12 @@ struct ipc_segment {
        size_t response_size;
        size_t dispatch_size;
        uid_t euid;
+       pthread_mutex_t mutex;
 };
 
+void ipc_hdb_destructor (void *context);
+
+DECLARE_HDB_DATABASE(ipc_hdb,ipc_hdb_destructor);
 
 #if defined(COROSYNC_LINUX)
 /* SUN_LEN is broken for abstract namespace
@@ -100,12 +107,13 @@ void socket_nosigpipe(int s)
 #define MSG_NOSIGNAL 0
 #endif
 
-static int
-coroipcc_send (
+static cs_error_t
+socket_send (
        int s,
        void *msg,
        size_t len)
 {
+       cs_error_t res = CS_OK;
        int result;
        struct msghdr msg_send;
        struct iovec iov_send;
@@ -125,44 +133,18 @@ retry_send:
        iov_send.iov_len = len - processed;
 
        result = sendmsg (s, &msg_send, MSG_NOSIGNAL);
-
-       /*
-        * return immediately on any kind of syscall error that maps to
-        * CS_ERR if no part of message has been sent
-        */
-       if (result == -1 && processed == 0) {
-               if (errno == EINTR) {
-                       goto error_exit;
-               }
-               if (errno == EAGAIN) {
-                       goto error_exit;
-               }
-               if (errno == EFAULT) {
-                       goto error_exit;
-               }
-       }
-
-       /*
-        * retry read operations that are already started except
-        * for fault in that case, return ERR_LIBRARY
-        */
-       if (result == -1 && processed > 0) {
-               if (errno == EINTR) {
+       if (result == -1) {
+               switch (errno) {
+               case EINTR:
                        goto retry_send;
-               }
-               if (errno == EAGAIN) {
+                       break;
+               case EAGAIN:
                        goto retry_send;
+                       break;
+               default:
+                       res = CS_ERR_LIBRARY;
+                       goto res_exit;
                }
-               if (errno == EFAULT) {
-                       goto error_exit;
-               }
-       }
-
-       /*
-        * return ERR_LIBRARY on any other syscall error
-        */
-       if (result == -1) {
-               goto error_exit;
        }
 
        processed += result;
@@ -170,19 +152,19 @@ retry_send:
                goto retry_send;
        }
 
-       return (0);
+       return (CS_OK);
 
-error_exit:
-       return (-1);
+res_exit:
+       return (res);
 }
 
-static int
-coroipcc_recv (
+static cs_error_t
+socket_recv (
        int s,
        void *msg,
        size_t len)
 {
-       int error = 0;
+       cs_error_t res = CS_OK;
        int result;
        struct msghdr msg_recv;
        struct iovec iov_recv;
@@ -202,36 +184,40 @@ retry_recv:
        iov_recv.iov_len = len - processed;
 
        result = recvmsg (s, &msg_recv, MSG_NOSIGNAL|MSG_WAITALL);
-       if (result == -1 && errno == EINTR) {
-               goto retry_recv;
-       }
-       if (result == -1 && errno == EAGAIN) {
-               goto retry_recv;
+       if (result == -1) {
+               switch (errno) {
+               case EINTR:
+                       goto retry_recv;
+                       break;
+               case EAGAIN:
+                       goto retry_recv;
+                       break;
+               default:
+                       res = CS_ERR_LIBRARY;
+                       goto res_exit;
+               }
        }
 #if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
        /* On many OS poll never return POLLHUP or POLLERR.
         * EOF is detected when recvmsg return 0.
         */
        if (result == 0) {
-               error = -1;
-               goto error_exit;
+               res = CS_ERR_LIBRARY;
+               goto res_exit;
        }
 #endif
-       if (result == -1 || result == 0) {
-               error = -1;
-               goto error_exit;
-       }
+
        processed += result;
        if (processed != len) {
                goto retry_recv;
        }
        assert (processed == len);
-error_exit:
-       return (0);
+res_exit:
+       return (res);
 }
 
 static int
-priv_change_send (struct ipc_segment *ipc_segment)
+priv_change_send (struct ipc_instance *ipc_instance)
 {
        char buf_req;
        mar_req_priv_change req_priv_change;
@@ -241,24 +227,24 @@ priv_change_send (struct ipc_segment *ipc_segment)
        /*
         * Don't resend request unless euid has changed
        */
-       if (ipc_segment->euid == req_priv_change.euid) {
+       if (ipc_instance->euid == req_priv_change.euid) {
                return (0);
        }
        req_priv_change.egid = getegid();
 
        buf_req = MESSAGE_REQ_CHANGE_EUID;
-       res = coroipcc_send (ipc_segment->fd, &buf_req, 1);
+       res = socket_send (ipc_instance->fd, &buf_req, 1);
        if (res == -1) {
                return (-1);
        }
 
-       res = coroipcc_send (ipc_segment->fd, &req_priv_change,
+       res = socket_send (ipc_instance->fd, &req_priv_change,
                sizeof (req_priv_change));
        if (res == -1) {
                return (-1);
        }
 
-       ipc_segment->euid = req_priv_change.euid;
+       ipc_instance->euid = req_priv_change.euid;
        return (0);
 }
 
@@ -326,6 +312,17 @@ memory_unmap (void *addr, size_t bytes)
        res = munmap (addr, bytes);
 }
 
+void ipc_hdb_destructor (void *context ) {
+       struct ipc_instance *ipc_instance = (struct ipc_instance *)context;
+
+       /*
+        * << 1 (or multiplied by 2) because this is a wrapped memory buffer
+        */
+       memory_unmap (ipc_instance->control_buffer, ipc_instance->control_size);
+       memory_unmap (ipc_instance->request_buffer, ipc_instance->request_size);
+       memory_unmap (ipc_instance->response_buffer, ipc_instance->response_size);
+       memory_unmap (ipc_instance->dispatch_buffer, (ipc_instance->dispatch_size) << 1);
+}
 static int
 memory_map (char *path, const char *file, void **buf, size_t bytes)
 {
@@ -369,22 +366,138 @@ memory_map (char *path, const char *file, void **buf, size_t bytes)
        return (0);
 }
 
-extern cs_error_t
+static cs_error_t
+msg_send (
+       struct ipc_instance *ipc_instance,
+       const struct iovec *iov,
+       unsigned int iov_len)
+{
+       struct sembuf sop;
+       int i;
+       int res;
+       int req_buffer_idx = 0;
+
+       for (i = 0; i < iov_len; i++) {
+               memcpy (&ipc_instance->request_buffer[req_buffer_idx],
+                       iov[i].iov_base,
+                       iov[i].iov_len);
+               req_buffer_idx += iov[i].iov_len;
+       }
+       /*
+        * Signal semaphore #0 indicting a new message from client
+        * to server request queue
+        */
+       sop.sem_num = 0;
+       sop.sem_op = 1;
+       sop.sem_flg = 0;
+
+retry_semop:
+       res = semop (ipc_instance->semid, &sop, 1);
+       if (res == -1 && errno == EINTR) {
+               goto retry_semop;
+       } else
+       if (res == -1 && errno == EACCES) {
+               priv_change_send (ipc_instance);
+               goto retry_semop;
+       } else
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+       return (CS_OK);
+}
+
+static cs_error_t
+reply_receive (
+       struct ipc_instance *ipc_instance,
+       void *res_msg,
+       size_t res_len)
+{
+       struct sembuf sop;
+       coroipc_response_header_t *response_header;
+       int res;
+
+       /*
+        * Wait for semaphore #1 indicating a new message from server
+        * to client in the response queue
+        */
+       sop.sem_num = 1;
+       sop.sem_op = -1;
+       sop.sem_flg = 0;
+
+retry_semop:
+       res = semop (ipc_instance->semid, &sop, 1);
+       if (res == -1 && errno == EINTR) {
+               goto retry_semop;
+       } else
+       if (res == -1 && errno == EACCES) {
+               priv_change_send (ipc_instance);
+               goto retry_semop;
+       } else
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+
+       response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
+       if (response_header->error == CS_ERR_TRY_AGAIN) {
+               return (CS_ERR_TRY_AGAIN);
+       }
+
+       memcpy (res_msg, ipc_instance->response_buffer, res_len);
+       return (CS_OK);
+}
+
+static cs_error_t
+reply_receive_in_buf (
+       struct ipc_instance *ipc_instance,
+       void **res_msg)
+{
+       struct sembuf sop;
+       int res;
+
+       /*
+        * Wait for semaphore #1 indicating a new message from server
+        * to client in the response queue
+        */
+       sop.sem_num = 1;
+       sop.sem_op = -1;
+       sop.sem_flg = 0;
+
+retry_semop:
+       res = semop (ipc_instance->semid, &sop, 1);
+       if (res == -1 && errno == EINTR) {
+               goto retry_semop;
+       } else
+       if (res == -1 && errno == EACCES) {
+               priv_change_send (ipc_instance);
+               goto retry_semop;
+       } else
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+
+       *res_msg = (char *)ipc_instance->response_buffer;
+       return (CS_OK);
+}
+
+/*
+ * External API
+ */
+cs_error_t
 coroipcc_service_connect (
        const char *socket_name,
        unsigned int service,
        size_t request_size,
        size_t response_size,
        size_t dispatch_size,
-       void **ipc_context)
+       hdb_handle_t *handle)
 
 {
        int request_fd;
        struct sockaddr_un address;
-       cs_error_t error;
-       struct ipc_segment *ipc_segment;
+       cs_error_t res;
+       struct ipc_instance *ipc_instance;
        key_t semkey = 0;
-       int res;
+       int sys_res;
        mar_req_setup_t req_setup;
        mar_res_setup_t res_setup;
        union semun semun;
@@ -393,11 +506,22 @@ coroipcc_service_connect (
        char response_map_path[128];
        char dispatch_map_path[128];
 
+       res = hdb_error_to_cs (hdb_handle_create (&ipc_hdb,
+               sizeof (struct ipc_instance), handle));
+       if (res != CS_OK) {
+               return (res);
+       }
+
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, *handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
+       }
+
        res_setup.error = CS_ERR_LIBRARY;
 
        request_fd = socket (PF_UNIX, SOCK_STREAM, 0);
        if (request_fd == -1) {
-               return (-1);
+               return (CS_ERR_LIBRARY);
        }
 
        memset (&address, 0, sizeof (struct sockaddr_un));
@@ -411,68 +535,61 @@ coroipcc_service_connect (
 #else
        sprintf (address.sun_path, "%s/%s", SOCKETDIR, socket_name);
 #endif
-       res = connect (request_fd, (struct sockaddr *)&address,
+       sys_res = connect (request_fd, (struct sockaddr *)&address,
                AIS_SUN_LEN(&address));
-       if (res == -1) {
+       if (sys_res == -1) {
                close (request_fd);
                return (CS_ERR_TRY_AGAIN);
        }
 
-       ipc_segment = malloc (sizeof (struct ipc_segment));
-       if (ipc_segment == NULL) {
-               close (request_fd);
-               return (-1);
-       }
-       bzero (ipc_segment, sizeof (struct ipc_segment));
-
        /*
         * Allocate a semaphore segment
         */
        while (1) {
                semkey = random();
-               ipc_segment->euid = geteuid ();
-               if ((ipc_segment->semid
+               ipc_instance->euid = geteuid ();
+               if ((ipc_instance->semid
                     = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
                      break;
                }
                if (errno != EEXIST) {
-                       goto error_exit;
+                       goto res_exit;
                }
        }
 
        semun.val = 0;
-       res = semctl (ipc_segment->semid, 0, SETVAL, semun);
+       res = semctl (ipc_instance->semid, 0, SETVAL, semun);
        if (res != 0) {
-               goto error_exit;
+               goto res_exit;
        }
 
-       res = semctl (ipc_segment->semid, 1, SETVAL, semun);
+       res = semctl (ipc_instance->semid, 1, SETVAL, semun);
        if (res != 0) {
-               goto error_exit;
+               goto res_exit;
        }
 
        res = memory_map (
                control_map_path,
                "control_buffer-XXXXXX",
-               (void *)&ipc_segment->control_buffer,
+               (void *)&ipc_instance->control_buffer,
                8192);
 
        res = memory_map (
                request_map_path,
                "request_buffer-XXXXXX",
-               (void *)&ipc_segment->request_buffer,
+               (void *)&ipc_instance->request_buffer,
                request_size);
 
        res = memory_map (
                response_map_path,
                "response_buffer-XXXXXX",
-               (void *)&ipc_segment->response_buffer,
+               (void *)&ipc_instance->response_buffer,
                response_size);
 
        res = circular_memory_map (
                dispatch_map_path,
                "dispatch_buffer-XXXXXX",
-               (void *)&ipc_segment->dispatch_buffer,
+               (void *)&ipc_instance->dispatch_buffer,
                dispatch_size);
 
        /*
@@ -489,85 +606,120 @@ coroipcc_service_connect (
        req_setup.dispatch_size = dispatch_size;
        req_setup.semkey = semkey;
 
-       error = coroipcc_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
-       if (error != 0) {
-               goto error_exit;
+       res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
+       if (res != CS_OK) {
+               goto res_exit;
        }
-       error = coroipcc_recv (request_fd, &res_setup, sizeof (mar_res_setup_t));
-       if (error != 0) {
-               goto error_exit;
+       res = socket_recv (request_fd, &res_setup, sizeof (mar_res_setup_t));
+       if (res != CS_OK) {
+               goto res_exit;
        }
 
-       ipc_segment->fd = request_fd;
-       ipc_segment->flow_control_state = 0;
+       ipc_instance->fd = request_fd;
+       ipc_instance->flow_control_state = 0;
 
        if (res_setup.error == CS_ERR_TRY_AGAIN) {
-               goto error_exit;
+               goto res_exit;
        }
 
-       ipc_segment->control_size = 8192;
-       ipc_segment->request_size = request_size;
-       ipc_segment->response_size = response_size;
-       ipc_segment->dispatch_size = dispatch_size;
+       ipc_instance->control_size = 8192;
+       ipc_instance->request_size = request_size;
+       ipc_instance->response_size = response_size;
+       ipc_instance->dispatch_size = dispatch_size;
+
+       pthread_mutex_init (&ipc_instance->mutex, NULL);
+
+       hdb_handle_put (&ipc_hdb, *handle);
 
-       *ipc_context = ipc_segment;
        return (res_setup.error);
 
-error_exit:
+res_exit:
        close (request_fd);
-       if (ipc_segment->semid > 0)
-               semctl (ipc_segment->semid, 0, IPC_RMID);
+       if (ipc_instance->semid > 0)
+               semctl (ipc_instance->semid, 0, IPC_RMID);
        return (res_setup.error);
 }
 
 cs_error_t
 coroipcc_service_disconnect (
-       void *ipc_context)
+       hdb_handle_t handle)
 {
-       struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+       cs_error_t res;
+       struct ipc_instance *ipc_instance;
 
-       shutdown (ipc_segment->fd, SHUT_RDWR);
-       close (ipc_segment->fd);
-       /*
-        * << 1 (or multiplied by 2) because this is a wrapped memory buffer
-        */
-       memory_unmap (ipc_segment->control_buffer, ipc_segment->control_size);
-       memory_unmap (ipc_segment->request_buffer, ipc_segment->request_size);
-       memory_unmap (ipc_segment->response_buffer, ipc_segment->response_size);
-       memory_unmap (ipc_segment->dispatch_buffer, (ipc_segment->dispatch_size) << 1);
-       free (ipc_segment);
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
+       }
+
+       shutdown (ipc_instance->fd, SHUT_RDWR);
+       close (ipc_instance->fd);
+       hdb_handle_destroy (&ipc_hdb, handle);
+       hdb_handle_put (&ipc_hdb, handle);
        return (CS_OK);
 }
 
-int
+cs_error_t
 coroipcc_dispatch_flow_control_get (
-        void *ipc_context)
+       hdb_handle_t handle,
+       unsigned int *flow_control_state)
 {
-       struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+       struct ipc_instance *ipc_instance;
+       cs_error_t res;
+
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
+       }
+
+       *flow_control_state = ipc_instance->flow_control_state;
 
-       return (ipc_segment->flow_control_state);
+       hdb_handle_put (&ipc_hdb, handle);
+       return (res);
 }
 
-int
-coroipcc_fd_get (void *ipc_ctx)
+cs_error_t
+coroipcc_fd_get (
+       hdb_handle_t handle,
+       int *fd)
 {
-       struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+       struct ipc_instance *ipc_instance;
+       cs_error_t res;
 
-       return (ipc_segment->fd);
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
+       }
+
+       *fd = ipc_instance->fd;
+
+       hdb_handle_put (&ipc_hdb, handle);
+       return (res);
 }
 
-int
-coroipcc_dispatch_get (void *ipc_ctx, void **data, int timeout)
+cs_error_t
+coroipcc_dispatch_get (
+       hdb_handle_t handle,
+       void **data,
+       int timeout)
 {
        struct pollfd ufds;
        int poll_events;
        char buf;
-       struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+       struct ipc_instance *ipc_instance;
        int res;
        char buf_two = 1;
        char *data_addr;
+       cs_error_t error = CS_OK;
+
+       error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (error != CS_OK) {
+               return (error);
+       }
 
-       ufds.fd = ipc_segment->fd;
+       *data = NULL;
+
+       ufds.fd = ipc_instance->fd;
        ufds.events = POLLIN;
        ufds.revents = 0;
 
@@ -577,259 +729,193 @@ retry_poll:
                goto retry_poll;
        } else
        if (poll_events == -1) {
-               return (-1);
+               goto error_put;
        } else
        if (poll_events == 0) {
-               return (0);
+               goto error_put;
        }
        if (poll_events == 1 && (ufds.revents & (POLLERR|POLLHUP))) {
-               return (-1);
+               error = CS_ERR_LIBRARY;
+               goto error_put;
        }
 retry_recv:
-       res = recv (ipc_segment->fd, &buf, 1, 0);
+       res = recv (ipc_instance->fd, &buf, 1, 0);
        if (res == -1 && errno == EINTR) {
                goto retry_recv;
        } else
        if (res == -1) {
-               return (-1);
+               goto error_put;
        }
        if (res == 0) {
-               return (-1);
+               goto error_put;
        }
-       ipc_segment->flow_control_state = 0;
+       ipc_instance->flow_control_state = 0;
        if (buf == 1 || buf == 2) {
-               ipc_segment->flow_control_state = 1;
+               ipc_instance->flow_control_state = 1;
        }
        /*
         * Notify executive to flush any pending dispatch messages
         */
-       if (ipc_segment->flow_control_state) {
+       if (ipc_instance->flow_control_state) {
                buf_two = MESSAGE_REQ_OUTQ_FLUSH;
-               res = coroipcc_send (ipc_segment->fd, &buf_two, 1);
-               assert (res == 0); //TODO
+               res = socket_send (ipc_instance->fd, &buf_two, 1);
+               assert (res == CS_OK); /* TODO */
        }
        /*
         * This is just a notification of flow control starting at the addition
         * of a new pending message, not a message to dispatch
         */
        if (buf == 2) {
-               return (0);
+               goto error_put;
        }
        if (buf == 3) {
-               return (0);
+               goto error_put;
        }
 
-       data_addr = ipc_segment->dispatch_buffer;
+       data_addr = ipc_instance->dispatch_buffer;
 
-       data_addr = &data_addr[ipc_segment->control_buffer->read];
+       data_addr = &data_addr[ipc_instance->control_buffer->read];
 
        *data = (void *)data_addr;
-       return (1);
+
+       return (CS_OK);
+error_put:
+       hdb_handle_put (&ipc_hdb, handle);
+       return (error);
 }
 
-int
-coroipcc_dispatch_put (void *ipc_ctx)
+cs_error_t
+coroipcc_dispatch_put (hdb_handle_t handle)
 {
        struct sembuf sop;
        coroipc_response_header_t *header;
-       struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+       struct ipc_instance *ipc_instance;
        int res;
        char *addr;
        unsigned int read_idx;
 
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
+       }
        sop.sem_num = 2;
        sop.sem_op = -1;
        sop.sem_flg = 0;
 retry_semop:
-       res = semop (ipc_segment->semid, &sop, 1);
+       res = semop (ipc_instance->semid, &sop, 1);
        if (res == -1 && errno == EINTR) {
                goto retry_semop;
        } else
        if (res == -1 && errno == EACCES) {
-               priv_change_send (ipc_segment);
+               priv_change_send (ipc_instance);
                goto retry_semop;
        } else
        if (res == -1) {
-               return (-1);
+               return (CS_ERR_LIBRARY);
        }
 
-       addr = ipc_segment->dispatch_buffer;
+       addr = ipc_instance->dispatch_buffer;
 
-       read_idx = ipc_segment->control_buffer->read;
+       read_idx = ipc_instance->control_buffer->read;
        header = (coroipc_response_header_t *) &addr[read_idx];
-       ipc_segment->control_buffer->read =
-               (read_idx + header->size) % ipc_segment->dispatch_size;
-       return (0);
-}
-
-static cs_error_t
-coroipcc_msg_send (
-       void *ipc_context,
-       const struct iovec *iov,
-       unsigned int iov_len)
-{
-       struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
-       struct sembuf sop;
-       int i;
-       int res;
-       int req_buffer_idx = 0;
-
-       for (i = 0; i < iov_len; i++) {
-               memcpy (&ipc_segment->request_buffer[req_buffer_idx],
-                       iov[i].iov_base,
-                       iov[i].iov_len);
-               req_buffer_idx += iov[i].iov_len;
-       }
+       ipc_instance->control_buffer->read =
+               (read_idx + header->size) % ipc_instance->dispatch_size;
        /*
-        * Signal semaphore #0 indicting a new message from client
-        * to server request queue
+        * Put from dispatch get and also from this call's get
         */
-       sop.sem_num = 0;
-       sop.sem_op = 1;
-       sop.sem_flg = 0;
+       hdb_handle_put (&ipc_hdb, handle);
+       hdb_handle_put (&ipc_hdb, handle);
 
-retry_semop:
-       res = semop (ipc_segment->semid, &sop, 1);
-       if (res == -1 && errno == EINTR) {
-               goto retry_semop;
-       } else
-       if (res == -1 && errno == EACCES) {
-               priv_change_send (ipc_segment);
-               goto retry_semop;
-       } else
-       if (res == -1) {
-               return (CS_ERR_LIBRARY);
-       }
        return (CS_OK);
 }
 
-static cs_error_t
-coroipcc_reply_receive (
-       void *ipc_context,
+cs_error_t
+coroipcc_msg_send_reply_receive (
+       hdb_handle_t handle,
+       const struct iovec *iov,
+       unsigned int iov_len,
        void *res_msg,
        size_t res_len)
 {
-       struct sembuf sop;
-       struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
-       coroipc_response_header_t *response_header;
-       int res;
-
-       /*
-        * Wait for semaphore #1 indicating a new message from server
-        * to client in the response queue
-        */
-       sop.sem_num = 1;
-       sop.sem_op = -1;
-       sop.sem_flg = 0;
-
-retry_semop:
-       res = semop (ipc_segment->semid, &sop, 1);
-       if (res == -1 && errno == EINTR) {
-               goto retry_semop;
-       } else
-       if (res == -1 && errno == EACCES) {
-               priv_change_send (ipc_segment);
-               goto retry_semop;
-       } else
-       if (res == -1) {
-               return (CS_ERR_LIBRARY);
-       }
+       cs_error_t res;
+       struct ipc_instance *ipc_instance;
 
-       response_header = (coroipc_response_header_t *)ipc_segment->response_buffer;
-       if (response_header->error == CS_ERR_TRY_AGAIN) {
-               return (CS_ERR_TRY_AGAIN);
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
        }
 
-       memcpy (res_msg, ipc_segment->response_buffer, res_len);
-       return (CS_OK);
-}
+       pthread_mutex_lock (&ipc_instance->mutex);
 
-static cs_error_t
-coroipcc_reply_receive_in_buf (
-       void *ipc_context,
-       void **res_msg)
-{
-       struct sembuf sop;
-       struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
-       int res;
+       res = msg_send (ipc_instance, iov, iov_len);
+       if (res != CS_OK) {
+               goto error_exit;
+       }
 
-       /*
-        * Wait for semaphore #1 indicating a new message from server
-        * to client in the response queue
-        */
-       sop.sem_num = 1;
-       sop.sem_op = -1;
-       sop.sem_flg = 0;
+       res = reply_receive (ipc_instance, res_msg, res_len);
 
-retry_semop:
-       res = semop (ipc_segment->semid, &sop, 1);
-       if (res == -1 && errno == EINTR) {
-               goto retry_semop;
-       } else
-       if (res == -1 && errno == EACCES) {
-               priv_change_send (ipc_segment);
-               goto retry_semop;
-       } else
-       if (res == -1) {
-               return (CS_ERR_LIBRARY);
-       }
+error_exit:
+       hdb_handle_put (&ipc_hdb, handle);
+       pthread_mutex_unlock (&ipc_instance->mutex);
 
-       *res_msg = (char *)ipc_segment->response_buffer;
-       return (CS_OK);
+       return (res);
 }
 
 cs_error_t
-coroipcc_msg_send_reply_receive (
-       void *ipc_context,
+coroipcc_msg_send_reply_receive_in_buf_get (
+       hdb_handle_t handle,
        const struct iovec *iov,
        unsigned int iov_len,
-       void *res_msg,
-       size_t res_len)
+       void **res_msg)
 {
-       cs_error_t res;
+       unsigned int res;
+       struct ipc_instance *ipc_instance;
 
-       res = coroipcc_msg_send (ipc_context, iov, iov_len);
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
        if (res != CS_OK) {
                return (res);
        }
 
-       res = coroipcc_reply_receive (ipc_context, res_msg, res_len);
+       pthread_mutex_lock (&ipc_instance->mutex);
+
+       res = msg_send (ipc_instance, iov, iov_len);
        if (res != CS_OK) {
-               return (res);
+               goto error_exit;
        }
 
-       return (CS_OK);
+       res = reply_receive_in_buf (ipc_instance, res_msg);
+
+error_exit:
+       pthread_mutex_unlock (&ipc_instance->mutex);
+
+       return (res);
 }
 
 cs_error_t
-coroipcc_msg_send_reply_receive_in_buf (
-       void *ipc_context,
-       const struct iovec *iov,
-       unsigned int iov_len,
-       void **res_msg)
+coroipcc_msg_send_reply_receive_in_buf_put (
+       hdb_handle_t handle)
 {
        unsigned int res;
+       struct ipc_instance *ipc_instance;
 
-       res = coroipcc_msg_send (ipc_context, iov, iov_len);
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
        if (res != CS_OK) {
                return (res);
        }
+       hdb_handle_put (&ipc_hdb, handle);
+       hdb_handle_put (&ipc_hdb, handle);
 
-       res = coroipcc_reply_receive_in_buf (ipc_context, res_msg);
-       if (res != CS_OK) {
-               return (res);
-       }
-
-       return (CS_OK);
+       return (res);
 }
 
 cs_error_t
 coroipcc_zcb_alloc (
-       void *ipc_context,
+       hdb_handle_t handle,
        void **buffer,
        size_t size,
        size_t header_size)
 {
+       struct ipc_instance *ipc_instance;
        void *buf = NULL;
        char path[128];
        unsigned int res;
@@ -839,8 +925,12 @@ coroipcc_zcb_alloc (
        struct iovec iovec;
        struct coroipcs_zc_header *hdr;
 
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
+       }
        map_size = size + header_size + sizeof (struct coroipcs_zc_header);
-       res = memory_map (path, "cpg_zc-XXXXXX", &buf, size);
+       res = memory_map (path, "corosync_zerocopy-XXXXXX", &buf, size);
        assert (res != -1);
 
        req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
@@ -853,7 +943,7 @@ coroipcc_zcb_alloc (
        iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
 
        res = coroipcc_msg_send_reply_receive (
-               ipc_context,
+               handle,
                &iovec,
                1,
                &res_coroipcs_zc_alloc,
@@ -862,21 +952,28 @@ coroipcc_zcb_alloc (
        hdr = (struct coroipcs_zc_header *)buf;
        hdr->map_size = map_size;
        *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header);
-       return (CS_OK);
+
+       hdb_handle_put (&ipc_hdb, handle);
+       return (res);
 }
 
 cs_error_t
 coroipcc_zcb_free (
-       void *ipc_context,
+       hdb_handle_t handle,
        void *buffer)
 {
+       struct ipc_instance *ipc_instance;
        mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
        coroipc_response_header_t res_coroipcs_zc_free;
        struct iovec iovec;
        unsigned int res;
-
        struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header));
 
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
+       }
+
        req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
        req_coroipcc_zc_free.header.id = ZC_FREE_HEADER;
        req_coroipcc_zc_free.map_size = header->map_size;
@@ -886,7 +983,7 @@ coroipcc_zcb_free (
        iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
 
        res = coroipcc_msg_send_reply_receive (
-               ipc_context,
+               handle,
                &iovec,
                1,
                &res_coroipcs_zc_free,
@@ -894,21 +991,28 @@ coroipcc_zcb_free (
 
        munmap (header, header->map_size);
 
-       return (CS_OK);
+       hdb_handle_put (&ipc_hdb, handle);
+
+       return (res);
 }
 
 cs_error_t
 coroipcc_zcb_msg_send_reply_receive (
-        void *ipc_context,
+       hdb_handle_t handle,
         void *msg,
         void *res_msg,
         size_t res_len)
 {
+       struct ipc_instance *ipc_instance;
        mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
        struct coroipcs_zc_header *hdr;
        struct iovec iovec;
        cs_error_t res;
 
+       res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+       if (res != CS_OK) {
+               return (res);
+       }
        hdr = (struct coroipcs_zc_header *)(((char *)msg) - sizeof (struct coroipcs_zc_header));
 
        req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
@@ -919,11 +1023,12 @@ coroipcc_zcb_msg_send_reply_receive (
        iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
 
        res = coroipcc_msg_send_reply_receive (
-               ipc_context,
+               handle,
                &iovec,
                1,
                res_msg,
                res_len);
 
+       hdb_handle_put (&ipc_hdb, handle);
        return (res);
 }
index 6b15347dd2df5612cf65cf16a49d77144b86c4d4..05a81b0ccc4fdfb946b374563dd72c3f1862b10b 100644 (file)
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -42,7 +42,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <errno.h>
 #include "util.h"
 
 struct cpg_inst {
-       void *ipc_ctx;
+       hdb_handle_t handle;
        int finalize;
        cpg_callbacks_t callbacks;
        void *context;
-       pthread_mutex_t response_mutex;
-       pthread_mutex_t dispatch_mutex;
 };
 
-static void cpg_instance_destructor (void *instance);
-
-DECLARE_HDB_DATABASE(cpg_handle_t_db,cpg_instance_destructor);
-
-/*
- * Clean up function for a cpg instance (cpg_nitialize) handle
- */
-static void cpg_instance_destructor (void *instance)
-{
-       struct cpg_inst *cpg_inst = instance;
-
-       pthread_mutex_destroy (&cpg_inst->response_mutex);
-       pthread_mutex_destroy (&cpg_inst->dispatch_mutex);
-}
-
+DECLARE_HDB_DATABASE(cpg_handle_t_db,NULL);
 
 /**
  * @defgroup cpg_coroipcc The closed process group API
@@ -113,17 +96,13 @@ cs_error_t cpg_initialize (
                IPC_REQUEST_SIZE,
                IPC_RESPONSE_SIZE,
                IPC_DISPATCH_SIZE,
-               &cpg_inst->ipc_ctx);
+               &cpg_inst->handle);
        if (error != CS_OK) {
                goto error_put_destroy;
        }
 
        memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t));
 
-       pthread_mutex_init (&cpg_inst->response_mutex, NULL);
-
-       pthread_mutex_init (&cpg_inst->dispatch_mutex, NULL);
-
        hdb_handle_put (&cpg_handle_t_db, *handle);
 
        return (CS_OK);
@@ -147,22 +126,17 @@ cs_error_t cpg_finalize (
                return (error);
        }
 
-       pthread_mutex_lock (&cpg_inst->response_mutex);
-
        /*
         * Another thread has already started finalizing
         */
        if (cpg_inst->finalize) {
-               pthread_mutex_unlock (&cpg_inst->response_mutex);
                hdb_handle_put (&cpg_handle_t_db, handle);
                return (CPG_ERR_BAD_HANDLE);
        }
 
        cpg_inst->finalize = 1;
 
-       coroipcc_service_disconnect (cpg_inst->ipc_ctx);
-
-       pthread_mutex_unlock (&cpg_inst->response_mutex);
+       coroipcc_service_disconnect (cpg_inst->handle);
 
        hdb_handle_destroy (&cpg_handle_t_db, handle);
 
@@ -183,11 +157,11 @@ cs_error_t cpg_fd_get (
                return (error);
        }
 
-       *fd = coroipcc_fd_get (cpg_inst->ipc_ctx);
+       error = coroipcc_fd_get (cpg_inst->handle, fd);
 
        hdb_handle_put (&cpg_handle_t_db, handle);
 
-       return (CS_OK);
+       return (error);
 }
 
 cs_error_t cpg_context_get (
@@ -235,7 +209,6 @@ cs_error_t cpg_dispatch (
        int timeout = -1;
        cs_error_t error;
        int cont = 1; /* always continue do loop except when set to 0 */
-       int dispatch_avail;
        struct cpg_inst *cpg_inst;
        struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
        struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
@@ -264,30 +237,20 @@ cs_error_t cpg_dispatch (
        }
 
        do {
-               pthread_mutex_lock (&cpg_inst->dispatch_mutex);
-
-               dispatch_avail = coroipcc_dispatch_get (
-                       cpg_inst->ipc_ctx,
+               error = coroipcc_dispatch_get (
+                       cpg_inst->handle,
                        (void **)&dispatch_data,
                        timeout);
-
-               pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
-
-               if (dispatch_avail == 0 && dispatch_types == CPG_DISPATCH_ALL) {
-                       pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
-                       break; /* exit do while cont is 1 loop */
-               } else
-               if (dispatch_avail == 0) {
-                       pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
-                       continue; /* next poll */
+               if (error != CS_OK) {
+                       goto error_put;
                }
-               if (dispatch_avail == -1) {
-                       if (cpg_inst->finalize == 1) {
-                               error = CS_OK;
+
+               if (dispatch_data == NULL) {
+                       if (dispatch_types == CPG_DISPATCH_ALL) {
+                               break; /* exit do while cont is 1 loop */
                        } else {
-                               error = CS_ERR_LIBRARY;
+                               continue; /* next poll */
                        }
-                       goto error_put;
                }
 
                /*
@@ -350,12 +313,12 @@ cs_error_t cpg_dispatch (
                        break;
 
                default:
-                       coroipcc_dispatch_put (cpg_inst->ipc_ctx);
+                       coroipcc_dispatch_put (cpg_inst->handle);
                        error = CS_ERR_LIBRARY;
                        goto error_put;
                        break;
                }
-               coroipcc_dispatch_put (cpg_inst->ipc_ctx);
+               coroipcc_dispatch_put (cpg_inst->handle);
 
                /*
                 * Determine if more messages should be processed
@@ -409,13 +372,9 @@ cs_error_t cpg_join (
        iov[0].iov_len = sizeof (struct req_lib_cpg_join);
 
        do {
-               pthread_mutex_lock (&cpg_inst->response_mutex);
-
-               error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov, 1,
+               error = coroipcc_msg_send_reply_receive (cpg_inst->handle, iov, 1,
                        &res_lib_cpg_join, sizeof (struct res_lib_cpg_join));
 
-               pthread_mutex_unlock (&cpg_inst->response_mutex);
-
                if (error != CS_OK) {
                        goto error_exit;
                }
@@ -454,13 +413,9 @@ cs_error_t cpg_leave (
        iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
 
        do {
-               pthread_mutex_lock (&cpg_inst->response_mutex);
-
-               error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov, 1,
+               error = coroipcc_msg_send_reply_receive (cpg_inst->handle, iov, 1,
                        &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
 
-               pthread_mutex_unlock (&cpg_inst->response_mutex);
-
                if (error != CS_OK) {
                        goto error_exit;
                }
@@ -499,13 +454,9 @@ cs_error_t cpg_membership_get (
        iov.iov_len = sizeof (coroipc_request_header_t);
 
        do {
-               pthread_mutex_lock (&cpg_inst->response_mutex);
-
-               error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, &iov, 1,
+               error = coroipcc_msg_send_reply_receive (cpg_inst->handle, &iov, 1,
                        &res_lib_cpg_membership_get, sizeof (coroipc_response_header_t));
 
-               pthread_mutex_unlock (&cpg_inst->response_mutex);
-
                if (error != CS_OK) {
                        goto error_exit;
                }
@@ -551,13 +502,9 @@ cs_error_t cpg_local_get (
        iov.iov_base = &req_lib_cpg_local_get;
        iov.iov_len = sizeof (struct req_lib_cpg_local_get);
 
-       pthread_mutex_lock (&cpg_inst->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, &iov, 1,
+       error = coroipcc_msg_send_reply_receive (cpg_inst->handle, &iov, 1,
                &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
 
-       pthread_mutex_unlock (&cpg_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -584,7 +531,7 @@ cs_error_t cpg_flow_control_state_get (
                return (error);
        }
 
-       *flow_control_state = coroipcc_dispatch_flow_control_get (cpg_inst->ipc_ctx);
+       error = coroipcc_dispatch_flow_control_get (cpg_inst->handle, (unsigned int *)flow_control_state);
 
        hdb_handle_put (&cpg_handle_t_db, handle);
 
@@ -604,7 +551,7 @@ cs_error_t cpg_zcb_alloc (
                return (error);
        }
 
-       error = coroipcc_zcb_alloc (cpg_inst->ipc_ctx,
+       error = coroipcc_zcb_alloc (cpg_inst->handle,
                buffer,
                size,
                sizeof (struct req_lib_cpg_mcast));
@@ -627,7 +574,7 @@ cs_error_t cpg_zcb_free (
                return (error);
        }
 
-       coroipcc_zcb_free (cpg_inst->ipc_ctx, ((char *)buffer) - sizeof (struct req_lib_cpg_mcast));
+       coroipcc_zcb_free (cpg_inst->handle, ((char *)buffer) - sizeof (struct req_lib_cpg_mcast));
 
        hdb_handle_put (&cpg_handle_t_db, handle);
 
@@ -658,16 +605,12 @@ cs_error_t cpg_zcb_mcast_joined (
        req_lib_cpg_mcast->guarantee = guarantee;
        req_lib_cpg_mcast->msglen = msg_len;
 
-       pthread_mutex_lock (&cpg_inst->response_mutex);
-
        error = coroipcc_zcb_msg_send_reply_receive (
-               cpg_inst->ipc_ctx,
+               cpg_inst->handle,
                req_lib_cpg_mcast,
                &res_lib_cpg_mcast,
                sizeof (res_lib_cpg_mcast));
 
-       pthread_mutex_unlock (&cpg_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -714,13 +657,9 @@ cs_error_t cpg_mcast_joined (
        iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
        memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
 
-       pthread_mutex_lock (&cpg_inst->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov,
+       error = coroipcc_msg_send_reply_receive (cpg_inst->handle, iov,
                iov_len + 1, &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast));
 
-       pthread_mutex_unlock (&cpg_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
index 22fc9e9a07a879ad112b12a1916ec5721b3c4088..4c8d3218b9697db08750998ff49e7ccdd09f556d 100644 (file)
--- a/lib/evs.c
+++ b/lib/evs.c
@@ -43,7 +43,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <errno.h>
 #define MIN(x,y) ((x) < (y) ? (x) : (y))
 
 struct evs_inst {
-       void *ipc_ctx;
+       hdb_handle_t handle;
        int finalize;
        evs_callbacks_t callbacks;
-       pthread_mutex_t response_mutex;
-       pthread_mutex_t dispatch_mutex;
 };
 
-static void evs_instance_destructor (void *instance);
-
-DECLARE_HDB_DATABASE (evs_handle_t_db, evs_instance_destructor);
+DECLARE_HDB_DATABASE (evs_handle_t_db,NULL);
 
 /*
  * Clean up function for an evt instance (saEvtInitialize) handle
  */
-static void evs_instance_destructor (void *instance)
-{
-       struct evs_inst *evs_inst = instance;
-
-       pthread_mutex_destroy (&evs_inst->response_mutex);
-       pthread_mutex_destroy (&evs_inst->dispatch_mutex);
-}
 
 
 /**
@@ -122,17 +110,13 @@ evs_error_t evs_initialize (
                IPC_REQUEST_SIZE,
                IPC_RESPONSE_SIZE,
                IPC_DISPATCH_SIZE,
-               &evs_inst->ipc_ctx);
+               &evs_inst->handle);
        if (error != EVS_OK) {
                goto error_put_destroy;
        }
 
        memcpy (&evs_inst->callbacks, callbacks, sizeof (evs_callbacks_t));
 
-       pthread_mutex_init (&evs_inst->response_mutex, NULL);
-
-       pthread_mutex_init (&evs_inst->dispatch_mutex, NULL);
-
        hdb_handle_put (&evs_handle_t_db, *handle);
 
        return (CS_OK);
@@ -155,22 +139,18 @@ evs_error_t evs_finalize (
        if (error != CS_OK) {
                return (error);
        }
-       pthread_mutex_lock (&evs_inst->response_mutex);
 
        /*
         * Another thread has already started finalizing
         */
        if (evs_inst->finalize) {
-               pthread_mutex_unlock (&evs_inst->response_mutex);
                hdb_handle_put (&evs_handle_t_db, handle);
                return (EVS_ERR_BAD_HANDLE);
        }
 
        evs_inst->finalize = 1;
 
-       coroipcc_service_disconnect (evs_inst->ipc_ctx);
-
-       pthread_mutex_unlock (&evs_inst->response_mutex);
+       coroipcc_service_disconnect (evs_inst->handle);
 
        hdb_handle_destroy (&evs_handle_t_db, handle);
 
@@ -191,7 +171,7 @@ evs_error_t evs_fd_get (
                return (error);
        }
 
-       *fd = coroipcc_fd_get (evs_inst->ipc_ctx);
+       coroipcc_fd_get (evs_inst->handle, fd);
 
        hdb_handle_put (&evs_handle_t_db, handle);
 
@@ -205,7 +185,6 @@ evs_error_t evs_dispatch (
        int timeout = -1;
        cs_error_t error;
        int cont = 1; /* always continue do loop except when set to 0 */
-       int dispatch_avail;
        struct evs_inst *evs_inst;
        struct res_evs_confchg_callback *res_evs_confchg_callback;
        struct res_evs_deliver_callback *res_evs_deliver_callback;
@@ -227,28 +206,20 @@ evs_error_t evs_dispatch (
        }
 
        do {
-               pthread_mutex_lock (&evs_inst->dispatch_mutex);
-
-               dispatch_avail = coroipcc_dispatch_get (
-                       evs_inst->ipc_ctx,
+               error = coroipcc_dispatch_get (
+                       evs_inst->handle,
                        (void **)&dispatch_data,
                        timeout);
-
-               pthread_mutex_unlock (&evs_inst->dispatch_mutex);
-
-               if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) {
-                       break; /* exit do while cont is 1 loop */
-               } else
-               if (dispatch_avail == 0) {
-                       continue; /* next dispatch event */
+               if (error != CS_OK) {
+                       goto error_put;
                }
-               if (dispatch_avail == -1) {
-                       if (evs_inst->finalize == 1) {
-                               error = CS_OK;
+
+               if (dispatch_data == NULL) {
+                       if (dispatch_types == CPG_DISPATCH_ALL) {
+                               break; /* exit do while cont is 1 loop */
                        } else {
-                               error = CS_ERR_LIBRARY;
+                               continue; /* next poll */
                        }
-                       goto error_put;
                }
 
                /*
@@ -282,12 +253,12 @@ evs_error_t evs_dispatch (
                        break;
 
                default:
-                       coroipcc_dispatch_put (evs_inst->ipc_ctx);
+                       coroipcc_dispatch_put (evs_inst->handle);
                        error = CS_ERR_LIBRARY;
                        goto error_put;
                        break;
                }
-               coroipcc_dispatch_put (evs_inst->ipc_ctx);
+               coroipcc_dispatch_put (evs_inst->handle);
 
                /*
                 * Determine if more messages should be processed
@@ -341,13 +312,9 @@ evs_error_t evs_join (
        iov[1].iov_base = (void*) groups; /* cast away const */
        iov[1].iov_len = (group_entries * sizeof (struct evs_group));
 
-       pthread_mutex_lock (&evs_inst->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx, iov, 2,
+       error = coroipcc_msg_send_reply_receive (evs_inst->handle, iov, 2,
                &res_lib_evs_join, sizeof (struct res_lib_evs_join));
 
-       pthread_mutex_unlock (&evs_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -386,13 +353,9 @@ evs_error_t evs_leave (
        iov[1].iov_base = (void *) groups; /* cast away const */
        iov[1].iov_len = (group_entries * sizeof (struct evs_group));
 
-       pthread_mutex_lock (&evs_inst->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx, iov, 2,
+       error = coroipcc_msg_send_reply_receive (evs_inst->handle, iov, 2,
                &res_lib_evs_leave, sizeof (struct res_lib_evs_leave));
 
-       pthread_mutex_unlock (&evs_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -439,15 +402,11 @@ evs_error_t evs_mcast_joined (
        iov[0].iov_len = sizeof (struct req_lib_evs_mcast_joined);
        memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
 
-       pthread_mutex_lock (&evs_inst->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx, iov,
+       error = coroipcc_msg_send_reply_receive (evs_inst->handle, iov,
                iov_len + 1,
                &res_lib_evs_mcast_joined,
                sizeof (struct res_lib_evs_mcast_joined));
 
-       pthread_mutex_unlock (&evs_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -496,14 +455,11 @@ evs_error_t evs_mcast_groups (
        iov[1].iov_len = (group_entries * sizeof (struct evs_group));
        memcpy (&iov[2], iovec, iov_len * sizeof (struct iovec));
 
-       pthread_mutex_lock (&evs_inst->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx, iov,
+       error = coroipcc_msg_send_reply_receive (evs_inst->handle, iov,
                iov_len + 2,
                &res_lib_evs_mcast_groups,
                sizeof (struct res_lib_evs_mcast_groups));
 
-       pthread_mutex_unlock (&evs_inst->response_mutex);
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -539,16 +495,12 @@ evs_error_t evs_membership_get (
        iov.iov_base = &req_lib_evs_membership_get;
        iov.iov_len = sizeof (struct req_lib_evs_membership_get);
 
-       pthread_mutex_lock (&evs_inst->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive (evs_inst->handle,
                &iov,
                1,
                &res_lib_evs_membership_get,
                sizeof (struct res_lib_evs_membership_get));
 
-       pthread_mutex_unlock (&evs_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
index fc971dc97115dea5d67719024c52edc0c1bce773..6aa82dc4cca72d54948ec88b2b47e8cb039b8d46 100644 (file)
@@ -37,7 +37,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <errno.h>
 
 #include "util.h"
 
-static void pload_instance_destructor (void *instance);
-
 struct pload_inst {
-       void *ipc_ctx;
-       pthread_mutex_t response_mutex;
-       pthread_mutex_t dispatch_mutex;
+       hdb_handle_t handle;
        unsigned int finalize;
 };
 
-DECLARE_HDB_DATABASE(pload_handle_t_db,pload_instance_destructor);
-
-/*
- * Clean up function for an evt instance (saEvtInitialize) handle
- */
-static void pload_instance_destructor (void *instance)
-{
-       struct pload_inst *pload_inst = instance;
-
-       pthread_mutex_destroy (&pload_inst->response_mutex);
-       pthread_mutex_destroy (&pload_inst->dispatch_mutex);
-}
-
+DECLARE_HDB_DATABASE(pload_handle_t_db,NULL);
 
 /**
  * @defgroup pload_corosync The extended virtual synchrony passthrough API
@@ -111,15 +94,11 @@ unsigned int pload_initialize (
                IPC_REQUEST_SIZE,
                IPC_RESPONSE_SIZE,
                IPC_DISPATCH_SIZE,
-               &pload_inst->ipc_ctx);
+               &pload_inst->handle);
        if (error != CS_OK) {
                goto error_put_destroy;
        }
 
-       pthread_mutex_init (&pload_inst->response_mutex, NULL);
-
-       pthread_mutex_init (&pload_inst->dispatch_mutex, NULL);
-
        (void)hdb_handle_put (&pload_handle_t_db, *handle);
 
        return (CS_OK);
@@ -143,22 +122,17 @@ unsigned int pload_finalize (
                return (error);
        }
 
-       pthread_mutex_lock (&pload_inst->response_mutex);
-
        /*
         * Another thread has already started finalizing
         */
        if (pload_inst->finalize) {
-               pthread_mutex_unlock (&pload_inst->response_mutex);
                (void)hdb_handle_put (&pload_handle_t_db, handle);
                return (PLOAD_ERR_BAD_HANDLE);
        }
 
        pload_inst->finalize = 1;
 
-       coroipcc_service_disconnect(pload_inst->ipc_ctx);
-
-       pthread_mutex_unlock (&pload_inst->response_mutex);
+       coroipcc_service_disconnect(pload_inst->handle);
 
        (void)hdb_handle_destroy (&pload_handle_t_db, handle);
 
@@ -179,7 +153,7 @@ unsigned int pload_fd_get (
                return (error);
        }
 
-       *fd = coroipcc_fd_get (pload_inst->ipc_ctx);
+       coroipcc_fd_get (pload_inst->handle, fd);
 
        (void)hdb_handle_put (&pload_handle_t_db, handle);
 
@@ -212,16 +186,12 @@ unsigned int pload_start (
        iov.iov_base = (char *)&req_lib_pload_start;
        iov.iov_len = sizeof (struct req_lib_pload_start);
 
-       pthread_mutex_lock (&pload_inst->response_mutex);
-
-       error = coroipcc_msg_send_reply_receive(pload_inst->ipc_ctx,
+       error = coroipcc_msg_send_reply_receive(pload_inst->handle,
                &iov,
                1,
                &res_lib_pload_start,
                sizeof (struct res_lib_pload_start));
 
-       pthread_mutex_unlock (&pload_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
index 8ff3b1c1588c2aecc5f7a227bc073d8fb9c0c485..8f6028b8f14b5bba88123cd9054da41c4b51f7a2 100644 (file)
@@ -40,7 +40,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <errno.h>
 #include "util.h"
 
 struct quorum_inst {
-       void *ipc_ctx;
+       hdb_handle_t handle;
        int finalize;
        const void *context;
        quorum_callbacks_t callbacks;
-       pthread_mutex_t response_mutex;
-       pthread_mutex_t dispatch_mutex;
 };
 
-static void quorum_instance_destructor (void *instance);
-
-DECLARE_HDB_DATABASE(quorum_handle_t_db,quorum_instance_destructor);
-
-/*
- * Clean up function for a quorum instance (quorum_initialize) handle
- */
-static void quorum_instance_destructor (void *instance)
-{
-       struct quorum_inst *quorum_inst = instance;
-
-       pthread_mutex_destroy (&quorum_inst->response_mutex);
-}
+DECLARE_HDB_DATABASE(quorum_handle_t_db,NULL);
 
 cs_error_t quorum_initialize (
        quorum_handle_t *handle,
@@ -102,13 +87,11 @@ cs_error_t quorum_initialize (
                IPC_REQUEST_SIZE,
                IPC_RESPONSE_SIZE,
                IPC_DISPATCH_SIZE,
-               &quorum_inst->ipc_ctx);
+               &quorum_inst->handle);
        if (error != CS_OK) {
                goto error_put_destroy;
        }
 
-       pthread_mutex_init (&quorum_inst->response_mutex, NULL);
-       pthread_mutex_init (&quorum_inst->dispatch_mutex, NULL);
        if (callbacks)
                memcpy(&quorum_inst->callbacks, callbacks, sizeof (callbacks));
        else
@@ -137,22 +120,17 @@ cs_error_t quorum_finalize (
                return (error);
        }
 
-       pthread_mutex_lock (&quorum_inst->response_mutex);
-
        /*
         * Another thread has already started finalizing
         */
        if (quorum_inst->finalize) {
-               pthread_mutex_unlock (&quorum_inst->response_mutex);
                (void)hdb_handle_put (&quorum_handle_t_db, handle);
                return (CS_ERR_BAD_HANDLE);
        }
 
        quorum_inst->finalize = 1;
 
-       coroipcc_service_disconnect (quorum_inst->ipc_ctx);
-
-       pthread_mutex_unlock (&quorum_inst->response_mutex);
+       coroipcc_service_disconnect (quorum_inst->handle);
 
        (void)hdb_handle_destroy (&quorum_handle_t_db, handle);
 
@@ -176,8 +154,6 @@ cs_error_t quorum_getquorate (
                return (error);
        }
 
-       pthread_mutex_lock (&quorum_inst->response_mutex);
-
        req.size = sizeof (req);
        req.id = MESSAGE_REQ_QUORUM_GETQUORATE;
 
@@ -185,14 +161,12 @@ cs_error_t quorum_getquorate (
        iov.iov_len = sizeof (req);
 
        error = coroipcc_msg_send_reply_receive (
-               quorum_inst->ipc_ctx,
+               quorum_inst->handle,
                &iov,
                1,
                &res_lib_quorum_getquorate,
                sizeof (struct res_lib_quorum_getquorate));
 
-       pthread_mutex_unlock (&quorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -219,11 +193,11 @@ cs_error_t quorum_fd_get (
                return (error);
        }
 
-       *fd = coroipcc_fd_get (quorum_inst->ipc_ctx);
+       error = coroipcc_fd_get (quorum_inst->handle, fd);
 
        (void)hdb_handle_put (&quorum_handle_t_db, handle);
 
-       return (CS_OK);
+       return (error);
 }
 
 
@@ -281,8 +255,6 @@ cs_error_t quorum_trackstart (
                return (error);
        }
 
-       pthread_mutex_lock (&quorum_inst->response_mutex);
-
        req_lib_quorum_trackstart.header.size = sizeof (struct req_lib_quorum_trackstart);
        req_lib_quorum_trackstart.header.id = MESSAGE_REQ_QUORUM_TRACKSTART;
        req_lib_quorum_trackstart.track_flags = flags;
@@ -291,14 +263,12 @@ cs_error_t quorum_trackstart (
        iov.iov_len = sizeof (struct req_lib_quorum_trackstart);
 
        error = coroipcc_msg_send_reply_receive (
-               quorum_inst->ipc_ctx,
+               quorum_inst->handle,
                 &iov,
                 1,
                 &res,
                 sizeof (res));
 
-       pthread_mutex_unlock (&quorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -325,8 +295,6 @@ cs_error_t quorum_trackstop (
                return (error);
        }
 
-       pthread_mutex_lock (&quorum_inst->response_mutex);
-
        req.size = sizeof (req);
        req.id = MESSAGE_REQ_QUORUM_TRACKSTOP;
 
@@ -334,14 +302,12 @@ cs_error_t quorum_trackstop (
        iov.iov_len = sizeof (req);
 
        error = coroipcc_msg_send_reply_receive (
-               quorum_inst->ipc_ctx,
+               quorum_inst->handle,
                 &iov,
                 1,
                 &res,
                 sizeof (res));
 
-       pthread_mutex_unlock (&quorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -361,7 +327,6 @@ cs_error_t quorum_dispatch (
        int timeout = -1;
        cs_error_t error;
        int cont = 1; /* always continue do loop except when set to 0 */
-       int dispatch_avail;
        struct quorum_inst *quorum_inst;
        quorum_callbacks_t callbacks;
        coroipc_response_header_t *dispatch_data;
@@ -389,28 +354,12 @@ cs_error_t quorum_dispatch (
        }
 
        do {
-               pthread_mutex_lock (&quorum_inst->dispatch_mutex);
-
-               dispatch_avail = coroipcc_dispatch_get (
-                       quorum_inst->ipc_ctx,
+               error = coroipcc_dispatch_get (
+                       quorum_inst->handle,
                        (void **)&dispatch_data,
                        timeout);
-
-               /*
-                * Handle has been finalized in another thread
-                */
-               if (quorum_inst->finalize == 1) {
-                       error = CS_OK;
-                       goto error_unlock;
-               }
-
-               if (dispatch_avail == 0 && dispatch_types == CS_DISPATCH_ALL) {
-                       pthread_mutex_unlock (&quorum_inst->dispatch_mutex);
-                       break; /* exit do while cont is 1 loop */
-               } else
-               if (dispatch_avail == 0) {
-                       pthread_mutex_unlock (&quorum_inst->dispatch_mutex);
-                       continue; /* next poll */
+               if (error != CS_OK) {
+                       goto error_put;
                }
 
                /*
@@ -419,8 +368,6 @@ cs_error_t quorum_dispatch (
                 * operate at the same time that quorum_finalize has been called in another thread.
                 */
                memcpy (&callbacks, &quorum_inst->callbacks, sizeof (quorum_callbacks_t));
-               pthread_mutex_unlock (&quorum_inst->dispatch_mutex);
-
                /*
                 * Dispatch incoming message
                 */
@@ -440,12 +387,12 @@ cs_error_t quorum_dispatch (
                        break;
 
                default:
-                       coroipcc_dispatch_put (quorum_inst->ipc_ctx);
+                       coroipcc_dispatch_put (quorum_inst->handle);
                        error = CS_ERR_LIBRARY;
                        goto error_put;
                        break;
                }
-               coroipcc_dispatch_put (quorum_inst->ipc_ctx);
+               coroipcc_dispatch_put (quorum_inst->handle);
 
                /*
                 * Determine if more messages should be processed
@@ -463,9 +410,6 @@ cs_error_t quorum_dispatch (
 
        goto error_put;
 
-error_unlock:
-       pthread_mutex_unlock (&quorum_inst->dispatch_mutex);
-
 error_put:
        (void)hdb_handle_put (&quorum_handle_t_db, handle);
        return (error);
index 2b9628589eb13f2fd71bd8e704c7ffcda527ea01..1178cff676df739dd1255dfb99b68cc71c08c1c1 100644 (file)
@@ -41,7 +41,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <pthread.h>
 #include <sys/types.h>
 #include <errno.h>
 
index 4541db3d8a39691cb14461e70ed959430e2f000a..fd5a9241809212c4557b0bdb657f3de49622a055 100644 (file)
@@ -41,7 +41,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <errno.h>
 #include "util.h"
 
 struct votequorum_inst {
-       void *ipc_ctx;
+       hdb_handle_t handle;
        int finalize;
        void *context;
        votequorum_callbacks_t callbacks;
-       pthread_mutex_t response_mutex;
-       pthread_mutex_t dispatch_mutex;
 };
 
 static void votequorum_instance_destructor (void *instance);
 
-DECLARE_HDB_DATABASE(votequorum_handle_t_db,votequorum_instance_destructor);
-
-/*
- * Clean up function for a quorum instance (votequorum_initialize) handle
- */
-static void votequorum_instance_destructor (void *instance)
-{
-       struct votequorum_inst *votequorum_inst = instance;
-
-       pthread_mutex_destroy (&votequorum_inst->response_mutex);
-}
+DECLARE_HDB_DATABASE(votequorum_handle_t_db,NULL);
 
 cs_error_t votequorum_initialize (
        votequorum_handle_t *handle,
@@ -104,13 +91,11 @@ cs_error_t votequorum_initialize (
                IPC_REQUEST_SIZE,
                IPC_RESPONSE_SIZE,
                IPC_DISPATCH_SIZE,
-                &votequorum_inst->ipc_ctx);
+                &votequorum_inst->handle);
        if (error != CS_OK) {
                goto error_put_destroy;
        }
 
-       pthread_mutex_init (&votequorum_inst->response_mutex, NULL);
-       pthread_mutex_init (&votequorum_inst->dispatch_mutex, NULL);
        if (callbacks)
                memcpy(&votequorum_inst->callbacks, callbacks, sizeof (*callbacks));
        else
@@ -139,22 +124,17 @@ cs_error_t votequorum_finalize (
                return (error);
        }
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
        /*
         * Another thread has already started finalizing
         */
        if (votequorum_inst->finalize) {
-               pthread_mutex_unlock (&votequorum_inst->response_mutex);
                hdb_handle_put (&votequorum_handle_t_db, handle);
                return (CS_ERR_BAD_HANDLE);
        }
 
        votequorum_inst->finalize = 1;
 
-       coroipcc_service_disconnect (votequorum_inst->ipc_ctx);
-
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
+       coroipcc_service_disconnect (votequorum_inst->handle);
 
        hdb_handle_destroy (&votequorum_handle_t_db, handle);
 
@@ -187,17 +167,13 @@ cs_error_t votequorum_getinfo (
        iov.iov_base = (char *)&req_lib_votequorum_getinfo;
        iov.iov_len = sizeof (struct req_lib_votequorum_getinfo);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_getinfo,
                sizeof (struct res_lib_votequorum_getinfo));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -241,17 +217,13 @@ cs_error_t votequorum_setexpected (
        iov.iov_base = (char *)&req_lib_votequorum_setexpected;
        iov.iov_len = sizeof (struct req_lib_votequorum_setexpected);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -288,17 +260,13 @@ cs_error_t votequorum_setvotes (
        iov.iov_base = (char *)&req_lib_votequorum_setvotes;
        iov.iov_len = sizeof (struct req_lib_votequorum_setvotes);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -339,17 +307,13 @@ cs_error_t votequorum_qdisk_register (
        iov.iov_base = (char *)&req_lib_votequorum_qdisk_register;
        iov.iov_len = sizeof (struct req_lib_votequorum_qdisk_register);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -385,17 +349,13 @@ cs_error_t votequorum_qdisk_poll (
        iov.iov_base = (char *)&req_lib_votequorum_qdisk_poll;
        iov.iov_len = sizeof (struct req_lib_votequorum_qdisk_poll);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -422,8 +382,6 @@ cs_error_t votequorum_qdisk_unregister (
                return (error);
        }
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
        req_lib_votequorum_general.header.size = sizeof (struct req_lib_votequorum_general);
        req_lib_votequorum_general.header.id = MESSAGE_REQ_VOTEQUORUM_QDISK_UNREGISTER;
 
@@ -431,14 +389,12 @@ cs_error_t votequorum_qdisk_unregister (
        iov.iov_len = sizeof (struct req_lib_votequorum_general);
 
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -475,17 +431,13 @@ cs_error_t votequorum_qdisk_getinfo (
        iov.iov_base = (char *)&req_lib_votequorum_general;
        iov.iov_len = sizeof (struct req_lib_votequorum_general);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_qdisk_getinfo,
                sizeof (struct res_lib_votequorum_qdisk_getinfo));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -523,17 +475,13 @@ cs_error_t votequorum_setstate (
        iov.iov_base = (char *)&req_lib_votequorum_general;
        iov.iov_len = sizeof (struct req_lib_votequorum_general);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -567,17 +515,13 @@ cs_error_t votequorum_leaving (
        iov.iov_base = (char *)&req_lib_votequorum_general;
        iov.iov_len = sizeof (struct req_lib_votequorum_general);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -614,17 +558,13 @@ cs_error_t votequorum_trackstart (
        iov.iov_base = (char *)&req_lib_votequorum_trackstart;
        iov.iov_len = sizeof (struct req_lib_votequorum_trackstart);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -657,17 +597,13 @@ cs_error_t votequorum_trackstop (
        iov.iov_base = (char *)&req_lib_votequorum_general;
        iov.iov_len = sizeof (struct req_lib_votequorum_general);
 
-       pthread_mutex_lock (&votequorum_inst->response_mutex);
-
         error = coroipcc_msg_send_reply_receive (
-               votequorum_inst->ipc_ctx,
+               votequorum_inst->handle,
                &iov,
                1,
                 &res_lib_votequorum_status,
                sizeof (struct res_lib_votequorum_status));
 
-       pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
        if (error != CS_OK) {
                goto error_exit;
        }
@@ -732,11 +668,11 @@ cs_error_t votequorum_fd_get (
                 return (error);
         }
 
-       *fd = coroipcc_fd_get (votequorum_inst->ipc_ctx);
+       error = coroipcc_fd_get (votequorum_inst->handle, fd);
 
        (void)hdb_handle_put (&votequorum_handle_t_db, handle);
 
-       return (CS_OK);
+       return (error);
 }
 
 cs_error_t votequorum_dispatch (
@@ -746,7 +682,6 @@ cs_error_t votequorum_dispatch (
        int timeout = -1;
        cs_error_t error;
        int cont = 1; /* always continue do loop except when set to 0 */
-       int dispatch_avail;
        struct votequorum_inst *votequorum_inst;
        votequorum_callbacks_t callbacks;
        coroipc_response_header_t *dispatch_data;
@@ -775,28 +710,20 @@ cs_error_t votequorum_dispatch (
        }
 
        do {
-               pthread_mutex_lock (&votequorum_inst->dispatch_mutex);
-
-               dispatch_avail = coroipcc_dispatch_get (
-                       votequorum_inst->ipc_ctx,
+               error = coroipcc_dispatch_get (
+                       votequorum_inst->handle,
                        (void **)&dispatch_data,
                        timeout);
-
-               /*
-                * Handle has been finalized in another thread
-                */
-               if (votequorum_inst->finalize == 1) {
-                       error = CS_OK;
-                       goto error_unlock;
+               if (error != CS_OK) {
+                       goto error_put;
                }
 
-               if (dispatch_avail == 0 && dispatch_types == CS_DISPATCH_ALL) {
-                       pthread_mutex_unlock (&votequorum_inst->dispatch_mutex);
-                       break; /* exit do while cont is 1 loop */
-               } else
-               if (dispatch_avail == 0) {
-                       pthread_mutex_unlock (&votequorum_inst->dispatch_mutex);
-                       continue; /* next poll */
+               if (dispatch_data == NULL) {
+                       if (dispatch_types == CPG_DISPATCH_ALL) {
+                               break; /* exit do while cont is 1 loop */
+                       } else {
+                               continue; /* next poll */
+                       }
                }
 
                /*
@@ -805,7 +732,6 @@ cs_error_t votequorum_dispatch (
                 * operate at the same time that votequorum_finalize has been called in another thread.
                 */
                memcpy (&callbacks, &votequorum_inst->callbacks, sizeof (votequorum_callbacks_t));
-               pthread_mutex_unlock (&votequorum_inst->dispatch_mutex);
 
                /*
                 * Dispatch incoming message
@@ -838,12 +764,12 @@ cs_error_t votequorum_dispatch (
                        break;
 
                default:
-                       coroipcc_dispatch_put (votequorum_inst->ipc_ctx);
+                       coroipcc_dispatch_put (votequorum_inst->handle);
                        error = CS_ERR_LIBRARY;
                        goto error_put;
                        break;
                }
-               coroipcc_dispatch_put (votequorum_inst->ipc_ctx);
+               coroipcc_dispatch_put (votequorum_inst->handle);
 
                /*
                 * Determine if more messages should be processed
@@ -861,9 +787,6 @@ cs_error_t votequorum_dispatch (
 
        goto error_put;
 
-error_unlock:
-       pthread_mutex_unlock (&votequorum_inst->dispatch_mutex);
-
 error_put:
        hdb_handle_put (&votequorum_handle_t_db, handle);
        return (error);