#include <string.h>
#include <sys/shm.h>
-#include <sys/sem.h>
+
#include <corosync/corotypes.h>
#include <corosync/list.h>
#include <corosync/coroipcs.h>
#include <corosync/coroipc_ipc.h>
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+#include <semaphore.h>
+#else
+#include <sys/sem.h>
+#endif
+
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
size_t size;
};
+#if _POSIX_THREAD_PROCESS_SHARED < 1
#if defined(_SEM_SEMUN_UNDEFINED)
union semun {
int val;
struct seminfo *__buf;
};
#endif
+#endif
+
enum conn_state {
CONN_STATE_THREAD_INACTIVE = 0,
enum conn_state state;
int notify_flow_control_enabled;
int refcount;
- key_t shmkey;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
key_t semkey;
int semid;
+#endif
unsigned int pending_semops;
pthread_mutex_t mutex;
struct control_buffer *control_buffer;
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
int locked);
+static void sem_post_exit_thread (struct conn_info *conn_info)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+ struct sembuf sop;
+#endif
+ int res;
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semop:
+ res = sem_post (&conn_info->control_buffer->sem0);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ }
+#else
+ sop.sem_num = 1;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (conn_info->semid, &sop, 1);
+ if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+ goto retry_semop;
+ }
+#endif
+}
+
static int
memory_map (
const char *path,
}
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
- pthread_kill (conn_info->thread, SIGUSR1);
+ sem_post_exit_thread (conn_info);
return (0);
}
list_del (&conn_info->list);
pthread_mutex_unlock (&conn_info->mutex);
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ sem_destroy (&conn_info->control_buffer->sem0);
+ sem_destroy (&conn_info->control_buffer->sem1);
+ sem_destroy (&conn_info->control_buffer->sem2);
+#else
+ semctl (conn_info->semid, 0, IPC_RMID);
+#endif
/*
* Destroy shared memory segment and semaphore
*/
res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
- semctl (conn_info->semid, 0, IPC_RMID);
/*
* Free allocated data needed to retry exiting library IPC connection
static void *pthread_ipc_consumer (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
struct sembuf sop;
+#endif
int res;
coroipc_request_header_t *header;
coroipc_response_header_t coroipc_response_header;
#endif
for (;;) {
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+ res = sem_wait (&conn_info->control_buffer->sem0);
+ if (ipc_thread_active (conn_info) == 0) {
+ coroipcs_refcount_dec (conn_info);
+ pthread_exit (0);
+ }
+ if ((res == -1) && (errno == EINTR)) {
+ goto retry_semwait;
+ }
+#else
+
sop.sem_num = 0;
sop.sem_op = -1;
sop.sem_flg = 0;
coroipcs_refcount_dec (conn_info);
pthread_exit (0);
}
+#endif
zerocopy_operations_process (conn_info, &header, &new_message);
/*
conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
pthread_mutex_unlock (&conn_info->mutex);
- pthread_kill (conn_info->thread, SIGUSR1);
+ sem_post_exit_thread (conn_info);
}
static int conn_info_create (int fd)
conn_info = list_entry (list, struct conn_info, list);
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ sem_destroy (&conn_info->control_buffer->sem0);
+ sem_destroy (&conn_info->control_buffer->sem1);
+ sem_destroy (&conn_info->control_buffer->sem2);
+#else
+ semctl (conn_info->semid, 0, IPC_RMID);
+#endif
+
/*
* Unmap memory segments
*/
res = circular_memory_unmap (conn_info->dispatch_buffer,
conn_info->dispatch_size);
- semctl (conn_info->semid, 0, IPC_RMID);
-
- pthread_kill (conn_info->thread, SIGUSR1);
+ sem_post_exit_thread (conn_info);
}
}
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
struct sembuf sop;
+#endif
int res;
memcpy (conn_info->response_buffer, msg, mlen);
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ res = sem_post (&conn_info->control_buffer->sem1);
+ if (res == -1) {
+ return (-1);
+ }
+#else
sop.sem_num = 1;
sop.sem_op = 1;
sop.sem_flg = 0;
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
return (0);
}
+#endif
return (0);
}
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
struct sembuf sop;
+#endif
int res;
int write_idx = 0;
int i;
write_idx += iov[i].iov_len;
}
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ res = sem_post (&conn_info->control_buffer->sem1);
+ if (res == -1) {
+ return (-1);
+ }
+#else
sop.sem_num = 1;
sop.sem_op = 1;
sop.sem_flg = 0;
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
return (0);
}
+#endif
return (0);
}
int locked)
{
struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
struct sembuf sop;
+#endif
int res;
int i;
char buf;
if (res == -1) {
ipc_disconnect (conn_info);
}
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ res = sem_post (&conn_info->control_buffer->sem2);
+#else
sop.sem_num = 2;
sop.sem_op = 1;
sop.sem_flg = 0;
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
return;
}
+#endif
}
static void outq_flush (struct conn_info *conn_info) {
{
mar_req_priv_change req_priv_change;
unsigned int res;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
union semun semun;
struct semid_ds ipc_set;
int i;
+#endif
retry_recv:
res = recv (conn_info->fd, &req_priv_change,
}
#endif
+#if _POSIX_THREAD_PROCESS_SHARED < 1
ipc_set.sem_perm.uid = req_priv_change.euid;
ipc_set.sem_perm.gid = req_priv_change.egid;
ipc_set.sem_perm.mode = 0600;
return (-1);
}
}
+#endif
return (0);
}
return (0);
}
+#if _POSIX_THREAD_PROCESS_SHARED < 1
conn_info->semkey = req_setup->semkey;
+#endif
res = memory_map (
req_setup->control_file,
req_setup->control_size,
conn_info->notify_flow_control_enabled = 0;
conn_info->setup_bytes_read = 0;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
conn_info->semid = semget (conn_info->semkey, 3, 0600);
+#endif
conn_info->pending_semops = 0;
/*
#include <netinet/in.h>
#include <assert.h>
#include <sys/shm.h>
-#include <sys/sem.h>
#include <sys/mman.h>
#include <corosync/corotypes.h>
#include <corosync/coroipcc.h>
#include <corosync/hdb.h>
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+#include <semaphore.h>
+#else
+#include <sys/sem.h>
+#endif
+
#include "util.h"
struct ipc_instance {
int fd;
- int shmid;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
int semid;
+#endif
int flow_control_state;
struct control_buffer *control_buffer;
char *request_buffer;
return (0);
}
+#if _POSIX_THREAD_PROCESS_SHARED < 1
#if defined(_SEM_SEMUN_UNDEFINED)
union semun {
int val;
struct seminfo *__buf;
};
#endif
+#endif
static int
circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
const struct iovec *iov,
unsigned int iov_len)
{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
struct sembuf sop;
+#endif
+
int i;
int res;
int req_buffer_idx = 0;
iov[i].iov_len);
req_buffer_idx += iov[i].iov_len;
}
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ res = sem_post (&ipc_instance->control_buffer->sem0);
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+#else
/*
* Signal semaphore #0 indicting a new message from client
* to server request queue
if (res == -1) {
return (CS_ERR_LIBRARY);
}
+#endif
return (CS_OK);
}
void *res_msg,
size_t res_len)
{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
struct sembuf sop;
+#endif
coroipc_response_header_t *response_header;
int res;
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+ res = sem_wait (&ipc_instance->control_buffer->sem1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semwait;
+ }
+#else
/*
* Wait for semaphore #1 indicating a new message from server
* to client in the response queue
if (res == -1) {
return (CS_ERR_LIBRARY);
}
+#endif
response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
if (response_header->error == CS_ERR_TRY_AGAIN) {
struct ipc_instance *ipc_instance,
void **res_msg)
{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
struct sembuf sop;
+#endif
int res;
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+ res = sem_wait (&ipc_instance->control_buffer->sem1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semwait;
+ }
+#else
/*
* Wait for semaphore #1 indicating a new message from server
* to client in the response queue
if (res == -1) {
return (CS_ERR_LIBRARY);
}
+#endif
*res_msg = (char *)ipc_instance->response_buffer;
return (CS_OK);
struct sockaddr_un address;
cs_error_t res;
struct ipc_instance *ipc_instance;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
key_t semkey = 0;
+ union semun semun;
+#endif
int sys_res;
mar_req_setup_t req_setup;
mar_res_setup_t res_setup;
- union semun semun;
char control_map_path[128];
char request_map_path[128];
char response_map_path[128];
return (CS_ERR_TRY_AGAIN);
}
+ res = memory_map (
+ control_map_path,
+ "control_buffer-XXXXXX",
+ (void *)&ipc_instance->control_buffer,
+ 8192);
+
+ res = memory_map (
+ request_map_path,
+ "request_buffer-XXXXXX",
+ (void *)&ipc_instance->request_buffer,
+ request_size);
+
+ res = memory_map (
+ response_map_path,
+ "response_buffer-XXXXXX",
+ (void *)&ipc_instance->response_buffer,
+ response_size);
+
+ res = circular_memory_map (
+ dispatch_map_path,
+ "dispatch_buffer-XXXXXX",
+ (void *)&ipc_instance->dispatch_buffer,
+ dispatch_size);
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+#else
+
/*
* Allocate a semaphore segment
*/
if (res != 0) {
goto res_exit;
}
-
- res = memory_map (
- control_map_path,
- "control_buffer-XXXXXX",
- (void *)&ipc_instance->control_buffer,
- 8192);
-
- res = memory_map (
- request_map_path,
- "request_buffer-XXXXXX",
- (void *)&ipc_instance->request_buffer,
- request_size);
-
- res = memory_map (
- response_map_path,
- "response_buffer-XXXXXX",
- (void *)&ipc_instance->response_buffer,
- response_size);
-
- res = circular_memory_map (
- dispatch_map_path,
- "dispatch_buffer-XXXXXX",
- (void *)&ipc_instance->dispatch_buffer,
- dispatch_size);
+#endif
/*
* Initialize IPC setup message
req_setup.request_size = request_size;
req_setup.response_size = response_size;
req_setup.dispatch_size = dispatch_size;
+
+#if _POSIX_THREAD_PROCESS_SHARED < 1
req_setup.semkey = semkey;
+#endif
res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
if (res != CS_OK) {
res_exit:
close (request_fd);
+#if _POSIX_THREAD_PROCESS_SHARED < 1
if (ipc_instance->semid > 0)
semctl (ipc_instance->semid, 0, IPC_RMID);
+#endif
return (res_setup.error);
}
cs_error_t
coroipcc_dispatch_put (hdb_handle_t handle)
{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
struct sembuf sop;
+#endif
coroipc_response_header_t *header;
struct ipc_instance *ipc_instance;
int res;
if (res != CS_OK) {
return (res);
}
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+ res = sem_wait (&ipc_instance->control_buffer->sem2);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semwait;
+ }
+#else
sop.sem_num = 2;
sop.sem_op = -1;
sop.sem_flg = 0;
if (res == -1) {
return (CS_ERR_LIBRARY);
}
+#endif
addr = ipc_instance->dispatch_buffer;