]> git.proxmox.com Git - mirror_corosync.git/commitdiff
Use unnamed shared posix semaphores on platforms which support them.
authorSteven Dake <sdake@redhat.com>
Tue, 4 Aug 2009 00:22:41 +0000 (00:22 +0000)
committerSteven Dake <sdake@redhat.com>
Tue, 4 Aug 2009 00:22:41 +0000 (00:22 +0000)
git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2379 fd59a12c-fef9-0310-b244-a6a79926bd2f

exec/coroipcs.c
include/corosync/coroipc_ipc.h
lib/coroipcc.c

index f13f05a062859c6192f12eac1618288073ae3c35..f8948fbe7460e52220f83098014913eff7623248 100644 (file)
@@ -67,7 +67,7 @@
 #include <string.h>
 
 #include <sys/shm.h>
-#include <sys/sem.h>
+
 #include <corosync/corotypes.h>
 #include <corosync/list.h>
 
 #include <corosync/coroipcs.h>
 #include <corosync/coroipc_ipc.h>
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+#include <semaphore.h>
+#else
+#include <sys/sem.h>
+#endif
+
 #ifndef MSG_NOSIGNAL
 #define MSG_NOSIGNAL 0
 #endif
@@ -100,6 +106,7 @@ struct zcb_mapped {
        size_t size;
 };
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 #if defined(_SEM_SEMUN_UNDEFINED)
 union semun {
        int val;
@@ -108,6 +115,8 @@ union semun {
        struct seminfo *__buf;
 };
 #endif
+#endif
+
 
 enum conn_state {
        CONN_STATE_THREAD_INACTIVE = 0,
@@ -126,9 +135,10 @@ struct conn_info {
        enum conn_state state;
        int notify_flow_control_enabled;
        int refcount;
-       key_t shmkey;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        key_t semkey;
        int semid;
+#endif
        unsigned int pending_semops;
        pthread_mutex_t mutex;
        struct control_buffer *control_buffer;
@@ -159,6 +169,32 @@ static void ipc_disconnect (struct conn_info *conn_info);
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
                      int locked);
 
+static void sem_post_exit_thread (struct conn_info *conn_info)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+       struct sembuf sop;
+#endif
+       int res;
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semop:
+       res = sem_post (&conn_info->control_buffer->sem0);
+       if (res == -1 && errno == EINTR) {
+               goto retry_semop;
+       }
+#else
+       sop.sem_num = 1;
+       sop.sem_op = 1;
+       sop.sem_flg = 0;
+
+retry_semop:
+       res = semop (conn_info->semid, &sop, 1);
+       if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+               goto retry_semop;
+       }
+#endif
+}
+
 static int
 memory_map (
        const char *path,
@@ -392,7 +428,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
        }
 
        if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
-               pthread_kill (conn_info->thread, SIGUSR1);
+               sem_post_exit_thread (conn_info);
                return (0);
        }
 
@@ -419,13 +455,19 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
        list_del (&conn_info->list);
        pthread_mutex_unlock (&conn_info->mutex);
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       sem_destroy (&conn_info->control_buffer->sem0);
+       sem_destroy (&conn_info->control_buffer->sem1);
+       sem_destroy (&conn_info->control_buffer->sem2);
+#else
+       semctl (conn_info->semid, 0, IPC_RMID);
+#endif
        /*
         * Destroy shared memory segment and semaphore
         */
        res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
        res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
        res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
-       semctl (conn_info->semid, 0, IPC_RMID);
 
        /*
         * Free allocated data needed to retry exiting library IPC connection
@@ -521,7 +563,9 @@ static inline void zerocopy_operations_process (
 static void *pthread_ipc_consumer (void *conn)
 {
        struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        struct sembuf sop;
+#endif
        int res;
        coroipc_request_header_t *header;
        coroipc_response_header_t coroipc_response_header;
@@ -536,6 +580,18 @@ static void *pthread_ipc_consumer (void *conn)
 #endif
 
        for (;;) {
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+               res = sem_wait (&conn_info->control_buffer->sem0);
+               if (ipc_thread_active (conn_info) == 0) {
+                       coroipcs_refcount_dec (conn_info);
+                       pthread_exit (0);
+               }
+               if ((res == -1) && (errno == EINTR)) {
+                       goto retry_semwait;
+               }
+#else
+
                sop.sem_num = 0;
                sop.sem_op = -1;
                sop.sem_flg = 0;
@@ -552,6 +608,7 @@ retry_semop:
                        coroipcs_refcount_dec (conn_info);
                        pthread_exit (0);
                }
+#endif
 
                zerocopy_operations_process (conn_info, &header, &new_message);
                /*
@@ -755,7 +812,7 @@ static void ipc_disconnect (struct conn_info *conn_info)
        conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
        pthread_mutex_unlock (&conn_info->mutex);
 
-       pthread_kill (conn_info->thread, SIGUSR1);
+       sem_post_exit_thread (conn_info);
 }
 
 static int conn_info_create (int fd)
@@ -874,6 +931,14 @@ void coroipcs_ipc_exit (void)
 
                conn_info = list_entry (list, struct conn_info, list);
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+               sem_destroy (&conn_info->control_buffer->sem0);
+               sem_destroy (&conn_info->control_buffer->sem1);
+               sem_destroy (&conn_info->control_buffer->sem2);
+#else
+               semctl (conn_info->semid, 0, IPC_RMID);
+#endif
+
                /*
                 * Unmap memory segments
                 */
@@ -886,9 +951,7 @@ void coroipcs_ipc_exit (void)
                res = circular_memory_unmap (conn_info->dispatch_buffer,
                        conn_info->dispatch_size);
 
-               semctl (conn_info->semid, 0, IPC_RMID);
-
-               pthread_kill (conn_info->thread, SIGUSR1);
+               sem_post_exit_thread (conn_info);
        }
 }
 
@@ -905,10 +968,19 @@ void *coroipcs_private_data_get (void *conn)
 int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 {
        struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        struct sembuf sop;
+#endif
        int res;
 
        memcpy (conn_info->response_buffer, msg, mlen);
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       res = sem_post (&conn_info->control_buffer->sem1);
+       if (res == -1) {
+               return (-1);
+       }
+#else
        sop.sem_num = 1;
        sop.sem_op = 1;
        sop.sem_flg = 0;
@@ -921,13 +993,16 @@ retry_semop:
        if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
                return (0);
        }
+#endif
        return (0);
 }
 
 int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
 {
        struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        struct sembuf sop;
+#endif
        int res;
        int write_idx = 0;
        int i;
@@ -938,6 +1013,12 @@ int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned in
                write_idx += iov[i].iov_len;
        }
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       res = sem_post (&conn_info->control_buffer->sem1);
+       if (res == -1) {
+               return (-1);
+       }
+#else
        sop.sem_num = 1;
        sop.sem_op = 1;
        sop.sem_flg = 0;
@@ -950,6 +1031,7 @@ retry_semop:
        if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
                return (0);
        }
+#endif
        return (0);
 }
 
@@ -984,7 +1066,9 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
                      int locked)
 {
        struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        struct sembuf sop;
+#endif
        int res;
        int i;
        char buf;
@@ -1009,6 +1093,9 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
        if (res == -1) {
                ipc_disconnect (conn_info);
        }
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       res = sem_post (&conn_info->control_buffer->sem2);
+#else
        sop.sem_num = 2;
        sop.sem_op = 1;
        sop.sem_flg = 0;
@@ -1021,6 +1108,7 @@ retry_semop:
        if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
                return;
        }
+#endif
 }
 
 static void outq_flush (struct conn_info *conn_info) {
@@ -1062,9 +1150,11 @@ static int priv_change (struct conn_info *conn_info)
 {
        mar_req_priv_change req_priv_change;
        unsigned int res;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        union semun semun;
        struct semid_ds ipc_set;
        int i;
+#endif
 
 retry_recv:
        res = recv (conn_info->fd, &req_priv_change,
@@ -1087,6 +1177,7 @@ retry_recv:
        }
 #endif
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        ipc_set.sem_perm.uid = req_priv_change.euid;
        ipc_set.sem_perm.gid = req_priv_change.egid;
        ipc_set.sem_perm.mode = 0600;
@@ -1099,6 +1190,7 @@ retry_recv:
                        return (-1);
                }
        }
+#endif
        return (0);
 }
 
@@ -1293,7 +1385,9 @@ int coroipcs_handler_dispatch (
                        return (0);
                }
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
                conn_info->semkey = req_setup->semkey;
+#endif
                res = memory_map (
                        req_setup->control_file,
                        req_setup->control_size,
@@ -1323,7 +1417,9 @@ int coroipcs_handler_dispatch (
                conn_info->notify_flow_control_enabled = 0;
                conn_info->setup_bytes_read = 0;
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
                conn_info->semid = semget (conn_info->semkey, 3, 0600);
+#endif
                conn_info->pending_semops = 0;
 
                /*
index 96aef14a795c9bdd582004a97b80fc62d652262a..07f35e918c59407721d48239a236b9d9b0370e93 100644 (file)
 #ifndef COROIPC_IPC_H_DEFINED
 #define COROIPC_IPC_H_DEFINED
 
+#include <unistd.h>
+#include "config.h"
+
+/*
+ * Darwin claims to support process shared synchronization
+ * but it really does not.  The unistd.h header file is wrong.
+ */
+#ifdef COROSYNC_DARWIN
+#undef _POSIX_THREAD_PROCESS_SHARED
+#define _POSIX_THREAD_PROCESS_SHARED -1
+#endif
+
+#ifndef _POSIX_THREAD_PROCESS_SHARED
+#define _POSIX_THREAD_PROCESS_SHARED -1
+#endif
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+#include <semaphore.h>
+#endif
+
 enum req_init_types {
        MESSAGE_REQ_RESPONSE_INIT = 0,
        MESSAGE_REQ_DISPATCH_INIT = 1
@@ -45,6 +65,11 @@ enum req_init_types {
 struct control_buffer {
        unsigned int read;
        unsigned int write;
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       sem_t sem0;
+       sem_t sem1;
+       sem_t sem2;
+#endif
 };
 
 enum res_init_types {
index b6f05eec22f1b24977d8555bb5e92758e1503f68..31de65607b5e291604e9b854fe18648a36d0830a 100644 (file)
@@ -55,7 +55,6 @@
 #include <netinet/in.h>
 #include <assert.h>
 #include <sys/shm.h>
-#include <sys/sem.h>
 #include <sys/mman.h>
 
 #include <corosync/corotypes.h>
 #include <corosync/coroipcc.h>
 #include <corosync/hdb.h>
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+#include <semaphore.h>
+#else
+#include <sys/sem.h>
+#endif
+
 #include "util.h"
 
 struct ipc_instance {
        int fd;
-       int shmid;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        int semid;
+#endif
        int flow_control_state;
        struct control_buffer *control_buffer;
        char *request_buffer;
@@ -258,6 +264,7 @@ priv_change_send (struct ipc_instance *ipc_instance)
        return (0);
 }
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 #if defined(_SEM_SEMUN_UNDEFINED)
 union semun {
         int val;
@@ -266,6 +273,7 @@ union semun {
         struct seminfo *__buf;
 };
 #endif
+#endif
 
 static int
 circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
@@ -391,7 +399,10 @@ msg_send (
        const struct iovec *iov,
        unsigned int iov_len)
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        struct sembuf sop;
+#endif
+
        int i;
        int res;
        int req_buffer_idx = 0;
@@ -402,6 +413,13 @@ msg_send (
                        iov[i].iov_len);
                req_buffer_idx += iov[i].iov_len;
        }
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       res = sem_post (&ipc_instance->control_buffer->sem0);
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+#else 
        /*
         * Signal semaphore #0 indicting a new message from client
         * to server request queue
@@ -422,6 +440,7 @@ retry_semop:
        if (res == -1) {
                return (CS_ERR_LIBRARY);
        }
+#endif
        return (CS_OK);
 }
 
@@ -431,10 +450,19 @@ reply_receive (
        void *res_msg,
        size_t res_len)
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        struct sembuf sop;
+#endif
        coroipc_response_header_t *response_header;
        int res;
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+       res = sem_wait (&ipc_instance->control_buffer->sem1);
+       if (res == -1 && errno == EINTR) {
+               goto retry_semwait;
+       }
+#else
        /*
         * Wait for semaphore #1 indicating a new message from server
         * to client in the response queue
@@ -455,6 +483,7 @@ retry_semop:
        if (res == -1) {
                return (CS_ERR_LIBRARY);
        }
+#endif
 
        response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
        if (response_header->error == CS_ERR_TRY_AGAIN) {
@@ -470,9 +499,18 @@ reply_receive_in_buf (
        struct ipc_instance *ipc_instance,
        void **res_msg)
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        struct sembuf sop;
+#endif
        int res;
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+       res = sem_wait (&ipc_instance->control_buffer->sem1);
+       if (res == -1 && errno == EINTR) {
+               goto retry_semwait;
+       }
+#else
        /*
         * Wait for semaphore #1 indicating a new message from server
         * to client in the response queue
@@ -493,6 +531,7 @@ retry_semop:
        if (res == -1) {
                return (CS_ERR_LIBRARY);
        }
+#endif
 
        *res_msg = (char *)ipc_instance->response_buffer;
        return (CS_OK);
@@ -515,11 +554,13 @@ coroipcc_service_connect (
        struct sockaddr_un address;
        cs_error_t res;
        struct ipc_instance *ipc_instance;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        key_t semkey = 0;
+       union semun semun;
+#endif
        int sys_res;
        mar_req_setup_t req_setup;
        mar_res_setup_t res_setup;
-       union semun semun;
        char control_map_path[128];
        char request_map_path[128];
        char response_map_path[128];
@@ -568,6 +609,36 @@ coroipcc_service_connect (
                return (CS_ERR_TRY_AGAIN);
        }
 
+       res = memory_map (
+               control_map_path,
+               "control_buffer-XXXXXX",
+               (void *)&ipc_instance->control_buffer,
+               8192);
+
+       res = memory_map (
+               request_map_path,
+               "request_buffer-XXXXXX",
+               (void *)&ipc_instance->request_buffer,
+               request_size);
+
+       res = memory_map (
+               response_map_path,
+               "response_buffer-XXXXXX",
+               (void *)&ipc_instance->response_buffer,
+               response_size);
+
+       res = circular_memory_map (
+               dispatch_map_path,
+               "dispatch_buffer-XXXXXX",
+               (void *)&ipc_instance->dispatch_buffer,
+               dispatch_size);
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
+       sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
+       sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+#else
+
        /*
         * Allocate a semaphore segment
         */
@@ -600,30 +671,7 @@ coroipcc_service_connect (
        if (res != 0) {
                goto res_exit;
        }
-
-       res = memory_map (
-               control_map_path,
-               "control_buffer-XXXXXX",
-               (void *)&ipc_instance->control_buffer,
-               8192);
-
-       res = memory_map (
-               request_map_path,
-               "request_buffer-XXXXXX",
-               (void *)&ipc_instance->request_buffer,
-               request_size);
-
-       res = memory_map (
-               response_map_path,
-               "response_buffer-XXXXXX",
-               (void *)&ipc_instance->response_buffer,
-               response_size);
-
-       res = circular_memory_map (
-               dispatch_map_path,
-               "dispatch_buffer-XXXXXX",
-               (void *)&ipc_instance->dispatch_buffer,
-               dispatch_size);
+#endif
 
        /*
         * Initialize IPC setup message
@@ -637,7 +685,10 @@ coroipcc_service_connect (
        req_setup.request_size = request_size;
        req_setup.response_size = response_size;
        req_setup.dispatch_size = dispatch_size;
+
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        req_setup.semkey = semkey;
+#endif
 
        res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
        if (res != CS_OK) {
@@ -668,8 +719,10 @@ coroipcc_service_connect (
 
 res_exit:
        close (request_fd);
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        if (ipc_instance->semid > 0)
                semctl (ipc_instance->semid, 0, IPC_RMID);
+#endif
        return (res_setup.error);
 }
 
@@ -821,7 +874,9 @@ error_put:
 cs_error_t
 coroipcc_dispatch_put (hdb_handle_t handle)
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        struct sembuf sop;
+#endif
        coroipc_response_header_t *header;
        struct ipc_instance *ipc_instance;
        int res;
@@ -832,6 +887,13 @@ coroipcc_dispatch_put (hdb_handle_t handle)
        if (res != CS_OK) {
                return (res);
        }
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+       res = sem_wait (&ipc_instance->control_buffer->sem2);
+       if (res == -1 && errno == EINTR) {
+               goto retry_semwait;
+       }
+#else
        sop.sem_num = 2;
        sop.sem_op = -1;
        sop.sem_flg = 0;
@@ -847,6 +909,7 @@ retry_semop:
        if (res == -1) {
                return (CS_ERR_LIBRARY);
        }
+#endif
 
        addr = ipc_instance->dispatch_buffer;