#include <sys/poll.h>
#include <sys/socket.h>
#include <corosync/corotypes.h>
+#include <corosync/hdb.h>
#ifdef __cplusplus
extern "C" {
size_t request_size,
size_t respnse__size,
size_t dispatch_size,
- void **ipc_context);
+ hdb_handle_t *handle);
extern cs_error_t
coroipcc_service_disconnect (
- void *ipc_context);
+ hdb_handle_t handle);
-extern int
+extern cs_error_t
coroipcc_fd_get (
- void *ipc_context);
+ hdb_handle_t handle,
+ int *fd);
-extern int
+extern cs_error_t
coroipcc_dispatch_get (
- void *ipc_context,
+ hdb_handle_t handle,
void **buf,
int timeout);
-extern int
+extern cs_error_t
coroipcc_dispatch_put (
- void *ipc_context);
+ hdb_handle_t handle);
-extern int
+extern cs_error_t
coroipcc_dispatch_flow_control_get (
- void *ipc_context);
+ hdb_handle_t handle,
+ unsigned int *flow_control_state);
extern cs_error_t
coroipcc_msg_send_reply_receive (
- void *ipc_context,
+ hdb_handle_t handle,
const struct iovec *iov,
unsigned int iov_len,
void *res_msg,
size_t res_len);
extern cs_error_t
-coroipcc_msg_send_reply_receive_in_buf (
- void *ipc_context,
+coroipcc_msg_send_reply_receive_in_buf_get (
+ hdb_handle_t handle,
const struct iovec *iov,
unsigned int iov_len,
void **res_msg);
+extern cs_error_t
+coroipcc_msg_send_reply_receive_in_buf_put (
+ hdb_handle_t handle);
+
extern cs_error_t
coroipcc_zcb_alloc (
- void *ipc_context,
+ hdb_handle_t handle,
void **buffer,
size_t size,
size_t header_size);
extern cs_error_t
coroipcc_zcb_free (
- void *ipc_context,
+ hdb_handle_t handle,
void *buffer);
extern cs_error_t
coroipcc_zcb_msg_send_reply_receive (
- void *ipc_context,
+ hdb_handle_t handle,
void *msg,
void *res_msg,
size_t res_len);
-
#ifdef __cplusplus
}
#endif
* Data structure for instance data
*/
struct cfg_instance {
- void *ipc_ctx;
+ hdb_handle_t handle;
corosync_cfg_callbacks_t callbacks;
cs_name_t comp_name;
int comp_registered;
int finalize;
- pthread_mutex_t response_mutex;
- pthread_mutex_t dispatch_mutex;
};
-static void cfg_handle_instance_destructor (void *);
-
/*
* All instances in one database
*/
-DECLARE_HDB_DATABASE (cfg_hdb,cfg_handle_instance_destructor);
+DECLARE_HDB_DATABASE (cfg_hdb,NULL);
/*
* Implementation
*/
-void cfg_handle_instance_destructor (void *instance)
-{
- struct cfg_instance *cfg_instance = instance;
-
- pthread_mutex_destroy (&cfg_instance->response_mutex);
- pthread_mutex_destroy (&cfg_instance->dispatch_mutex);
-}
cs_error_t
corosync_cfg_initialize (
IPC_REQUEST_SIZE,
IPC_RESPONSE_SIZE,
IPC_DISPATCH_SIZE,
- &cfg_instance->ipc_ctx);
+ &cfg_instance->handle);
if (error != CS_OK) {
goto error_put_destroy;
}
memcpy (&cfg_instance->callbacks, cfg_callbacks, sizeof (corosync_cfg_callbacks_t));
}
- pthread_mutex_init (&cfg_instance->response_mutex, NULL);
-
- pthread_mutex_init (&cfg_instance->dispatch_mutex, NULL);
-
(void)hdb_handle_put (&cfg_hdb, *cfg_handle);
return (CS_OK);
return (error);
}
- *selection_fd = coroipcc_fd_get (cfg_instance->ipc_ctx);
+ error = coroipcc_fd_get (cfg_instance->handle, selection_fd);
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
- return (CS_OK);
+ return (error);
}
cs_error_t
int timeout = -1;
cs_error_t error;
int cont = 1; /* always continue do loop except when set to 0 */
- int dispatch_avail;
struct cfg_instance *cfg_instance;
struct res_lib_cfg_testshutdown *res_lib_cfg_testshutdown;
corosync_cfg_callbacks_t callbacks;
}
do {
- pthread_mutex_lock (&cfg_instance->dispatch_mutex);
- dispatch_avail = coroipcc_dispatch_get (
- cfg_instance->ipc_ctx,
+ error = coroipcc_dispatch_get (
+ cfg_instance->handle,
(void **)&dispatch_data,
timeout);
-
- /*
- * Handle has been finalized in another thread
- */
- if (cfg_instance->finalize == 1) {
- error = CS_OK;
- pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
+ if (error != CS_OK) {
goto error_put;
}
- if (dispatch_avail == 0 && dispatch_flags == CS_DISPATCH_ALL) {
- pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
- break; /* exit do while cont is 1 loop */
- } else
- if (dispatch_avail == 0) {
- pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
- continue; /* next poll */
+ if (dispatch_data == NULL) {
+ if (dispatch_flags == CPG_DISPATCH_ALL) {
+ break; /* exit do while cont is 1 loop */
+ } else {
+ continue; /* next poll */
+ }
}
/*
* operate at the same time that cfgFinalize has been called in another thread.
*/
memcpy (&callbacks, &cfg_instance->callbacks, sizeof (corosync_cfg_callbacks_t));
- pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
/*
* Dispatch incoming response
}
break;
default:
- coroipcc_dispatch_put (cfg_instance->ipc_ctx);
+ coroipcc_dispatch_put (cfg_instance->handle);
error = CS_ERR_LIBRARY;
goto error_nounlock;
break;
}
- coroipcc_dispatch_put (cfg_instance->ipc_ctx);
+ coroipcc_dispatch_put (cfg_instance->handle);
/*
* Determine if more messages should be processed
return (error);
}
- pthread_mutex_lock (&cfg_instance->dispatch_mutex);
-
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
/*
* Another thread has already started finalizing
*/
if (cfg_instance->finalize) {
- pthread_mutex_unlock (&cfg_instance->response_mutex);
- pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
return (CS_ERR_BAD_HANDLE);
}
cfg_instance->finalize = 1;
- coroipcc_service_disconnect (cfg_instance->ipc_ctx);
-
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
- pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
-
- pthread_mutex_destroy (&cfg_instance->response_mutex);
-
- pthread_mutex_destroy (&cfg_instance->dispatch_mutex);
+ coroipcc_service_disconnect (cfg_instance->handle);
(void)hdb_handle_destroy (&cfg_hdb, cfg_handle);
iov.iov_base = &req_lib_cfg_ringstatusget,
iov.iov_len = sizeof (struct req_lib_cfg_ringstatusget),
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive(cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive(cfg_instance->handle,
&iov,
1,
&res_lib_cfg_ringstatusget,
sizeof (struct res_lib_cfg_ringstatusget));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
*interface_count = res_lib_cfg_ringstatusget.interface_count;
*interface_names = malloc (sizeof (char *) * *interface_count);
if (*interface_names == NULL) {
iov.iov_base = &req_lib_cfg_ringreenable,
iov.iov_len = sizeof (struct req_lib_cfg_ringreenable);
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_ringreenable,
sizeof (struct res_lib_cfg_ringreenable));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
return (error);
iov.iov_base = &req_lib_cfg_serviceload;
iov.iov_len = sizeof (req_lib_cfg_serviceload);
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_serviceload,
sizeof (struct res_lib_cfg_serviceload));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
return (error);
iov.iov_base = &req_lib_cfg_serviceunload;
iov.iov_len = sizeof (req_lib_cfg_serviceunload);
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_serviceunload,
sizeof (struct res_lib_cfg_serviceunload));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
return (error);
iov.iov_base = &req_lib_cfg_statetrack,
iov.iov_len = sizeof (struct req_lib_cfg_statetrack),
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_statetrack,
sizeof (struct res_lib_cfg_statetrack));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
return (error == CS_OK ? res_lib_cfg_statetrack.header.error : error);
iov.iov_base = &req_lib_cfg_statetrackstop,
iov.iov_len = sizeof (struct req_lib_cfg_statetrackstop),
- pthread_mutex_lock (&cfg_instance->response_mutex);
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_statetrackstop,
sizeof (struct res_lib_cfg_statetrackstop));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
return (error == CS_OK ? res_lib_cfg_statetrackstop.header.error : error);
iov.iov_base = &req_lib_cfg_killnode;
iov.iov_len = sizeof (struct req_lib_cfg_killnode);
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_killnode,
error = res_lib_cfg_killnode.header.error;
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
return (error == CS_OK ? res_lib_cfg_killnode.header.error : error);
iov.iov_base = &req_lib_cfg_tryshutdown;
iov.iov_len = sizeof (req_lib_cfg_tryshutdown);
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_tryshutdown,
sizeof (struct res_lib_cfg_tryshutdown));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
(void)hdb_handle_put (&cfg_hdb, cfg_handle);
return (error == CS_OK ? res_lib_cfg_tryshutdown.header.error : error);
iov.iov_base = &req_lib_cfg_replytoshutdown;
iov.iov_len = sizeof (struct req_lib_cfg_replytoshutdown);
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_replytoshutdown,
sizeof (struct res_lib_cfg_replytoshutdown));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
return (error);
}
iov.iov_base = (char *)&req_lib_cfg_get_node_addrs;
iov.iov_len = sizeof (req_lib_cfg_get_node_addrs);
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive_in_buf (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive_in_buf_get (
+ cfg_instance->handle,
&iov,
1,
&return_address);
res_lib_cfg_get_node_addrs = return_address;
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
if (error != CS_OK) {
- goto error_exit;
+ goto error_put;
}
if (res_lib_cfg_get_node_addrs->family == AF_INET)
*num_addrs = res_lib_cfg_get_node_addrs->num_addrs;
errno = error = res_lib_cfg_get_node_addrs->header.error;
-error_exit:
+error_put:
+ error = coroipcc_msg_send_reply_receive_in_buf_put (cfg_instance->handle);
+ hdb_handle_put (&cfg_hdb, cfg_handle);
return (error);
}
iov.iov_base = &req_lib_cfg_local_get;
iov.iov_len = sizeof (struct req_lib_cfg_local_get);
- pthread_mutex_lock (&cfg_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- cfg_inst->ipc_ctx,
+ cfg_inst->handle,
&iov,
1,
&res_lib_cfg_local_get,
sizeof (struct res_lib_cfg_local_get));
- pthread_mutex_unlock (&cfg_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = &req_lib_cfg_crypto_set;
iov.iov_len = sizeof (struct req_lib_cfg_crypto_set);
- pthread_mutex_lock (&cfg_instance->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cfg_instance->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (cfg_instance->handle,
&iov,
1,
&res_lib_cfg_crypto_set,
sizeof (struct res_lib_cfg_crypto_set));
- pthread_mutex_unlock (&cfg_instance->response_mutex);
-
if (error == CS_OK)
error = res_lib_cfg_crypto_set.header.error;
};
struct confdb_inst {
- void *ipc_ctx;
+ hdb_handle_t handle;
int finalize;
int standalone;
confdb_callbacks_t callbacks;
const void *context;
- pthread_mutex_t response_mutex;
- pthread_mutex_t dispatch_mutex;
struct list_head object_find_head;
struct list_head object_iter_head;
struct list_head key_iter_head;
};
-static void confdb_instance_destructor (void *instance);
-
-DECLARE_HDB_DATABASE(confdb_handle_t_db,confdb_instance_destructor);
+DECLARE_HDB_DATABASE(confdb_handle_t_db,NULL);
static cs_error_t do_find_destroy(struct confdb_inst *confdb_inst, hdb_handle_t find_handle);
}
}
-/*
- * Clean up function for a confdb instance (confdb_initialize) handle
- */
-static void confdb_instance_destructor (void *instance)
-{
- struct confdb_inst *confdb_inst = instance;
-
- pthread_mutex_destroy (&confdb_inst->response_mutex);
- pthread_mutex_destroy (&confdb_inst->dispatch_mutex);
-}
-
static struct iter_context *find_iter_context(struct list_head *list, hdb_handle_t object_handle)
{
struct iter_context *context;
IPC_REQUEST_SIZE,
IPC_RESPONSE_SIZE,
IPC_DISPATCH_SIZE,
- &confdb_inst->ipc_ctx);
+ &confdb_inst->handle);
}
if (error != CS_OK)
goto error_put_destroy;
memcpy (&confdb_inst->callbacks, callbacks, sizeof (confdb_callbacks_t));
- pthread_mutex_init (&confdb_inst->response_mutex, NULL);
- pthread_mutex_init (&confdb_inst->dispatch_mutex, NULL);
-
list_init (&confdb_inst->object_find_head);
list_init (&confdb_inst->object_iter_head);
list_init (&confdb_inst->key_iter_head);
return (error);
}
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
/*
* Another thread has already started finalizing
*/
if (confdb_inst->finalize) {
- pthread_mutex_unlock (&confdb_inst->response_mutex);
(void)hdb_handle_put (&confdb_handle_t_db, handle);
return (CS_ERR_BAD_HANDLE);
}
confdb_inst->finalize = 1;
- pthread_mutex_unlock (&confdb_inst->response_mutex);
-
/* Free saved context handles */
free_context_list(confdb_inst, &confdb_inst->object_find_head);
free_context_list(confdb_inst, &confdb_inst->object_iter_head);
free_context_list(confdb_inst, &confdb_inst->key_iter_head);
if (!confdb_inst->standalone) {
- coroipcc_service_disconnect (confdb_inst->ipc_ctx);
+ coroipcc_service_disconnect (confdb_inst->handle);
}
(void)hdb_handle_destroy (&confdb_handle_t_db, handle);
return (error);
}
- *fd = coroipcc_fd_get (confdb_inst->ipc_ctx);
+ error = coroipcc_fd_get (confdb_inst->handle, fd);
(void)hdb_handle_put (&confdb_handle_t_db, handle);
- return (CS_OK);
+ return (error);
}
cs_error_t confdb_context_get (
int timeout = -1;
cs_error_t error;
int cont = 1; /* always continue do loop except when set to 0 */
- int dispatch_avail;
struct confdb_inst *confdb_inst;
confdb_callbacks_t callbacks;
struct res_lib_confdb_key_change_callback *res_key_changed_pt;
}
do {
- pthread_mutex_lock (&confdb_inst->dispatch_mutex);
-
- dispatch_avail = coroipcc_dispatch_get (
- confdb_inst->ipc_ctx,
+ error = coroipcc_dispatch_get (
+ confdb_inst->handle,
(void **)&dispatch_data,
timeout);
-
- /*
- * Handle has been finalized in another thread
- */
- if (confdb_inst->finalize == 1) {
- error = CS_OK;
- pthread_mutex_unlock (&confdb_inst->dispatch_mutex);
+ if (error != CS_OK) {
goto error_put;
}
- if (dispatch_avail == 0 && dispatch_types == CONFDB_DISPATCH_ALL) {
- pthread_mutex_unlock (&confdb_inst->dispatch_mutex);
- break; /* exit do while cont is 1 loop */
- } else
- if (dispatch_avail == 0) {
- pthread_mutex_unlock (&confdb_inst->dispatch_mutex);
- continue; /* next poll */
- }
-
/*
* Make copy of callbacks, message data, unlock instance, and call callback
* A risk of this dispatch method is that the callback routines may
*/
memcpy (&callbacks, &confdb_inst->callbacks, sizeof (confdb_callbacks_t));
- pthread_mutex_unlock (&confdb_inst->dispatch_mutex);
/*
* Dispatch incoming message
break;
default:
- coroipcc_dispatch_put (confdb_inst->ipc_ctx);
+ coroipcc_dispatch_put (confdb_inst->handle);
error = CS_ERR_LIBRARY;
goto error_noput;
break;
}
- coroipcc_dispatch_put (confdb_inst->ipc_ctx);
+ coroipcc_dispatch_put (confdb_inst->handle);
/*
* Determine if more messages should be processed
iov.iov_base = (char *)&req_lib_confdb_object_create;
iov.iov_len = sizeof (struct req_lib_confdb_object_create);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_object_create,
sizeof (struct res_lib_confdb_object_create));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_object_destroy;
iov.iov_len = sizeof (struct req_lib_confdb_object_destroy);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res,
sizeof (coroipc_response_header_t));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_object_parent_get;
iov.iov_len = sizeof (struct req_lib_confdb_object_parent_get);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_object_parent_get,
sizeof (struct res_lib_confdb_object_parent_get));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_object_find_destroy;
iov.iov_len = sizeof (struct req_lib_confdb_object_find_destroy);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res,
sizeof (coroipc_response_header_t));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_key_create;
iov.iov_len = sizeof (struct req_lib_confdb_key_create);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res,
sizeof (res));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_key_delete;
iov.iov_len = sizeof (struct req_lib_confdb_key_delete);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res,
sizeof (res));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_key_get;
iov.iov_len = sizeof (struct req_lib_confdb_key_get);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_key_get,
sizeof (struct res_lib_confdb_key_get));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_key_get;
iov.iov_len = sizeof (struct req_lib_confdb_key_get);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_key_incdec,
sizeof (struct res_lib_confdb_key_incdec));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_key_get;
iov.iov_len = sizeof (struct req_lib_confdb_key_get);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_key_incdec,
sizeof (struct res_lib_confdb_key_incdec));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_key_replace;
iov.iov_len = sizeof (struct req_lib_confdb_key_replace);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res,
sizeof (res));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_object_find;
iov.iov_len = sizeof (struct req_lib_confdb_object_find);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_object_find,
sizeof (struct res_lib_confdb_object_find));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_object_iter;
iov.iov_len = sizeof (struct req_lib_confdb_object_iter);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_object_iter,
sizeof (struct res_lib_confdb_object_iter));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_confdb_key_iter;
iov.iov_len = sizeof (struct req_lib_confdb_key_iter);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_key_iter,
sizeof (struct res_lib_confdb_key_iter));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req;
iov.iov_len = sizeof (coroipc_request_header_t);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_write,
sizeof (struct res_lib_confdb_write));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
/* FIXME: set error_text */
goto error_exit;
iov.iov_base = (char *)&req_lib_confdb_reload;
iov.iov_len = sizeof (req_lib_confdb_reload);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res_lib_confdb_reload,
sizeof (struct res_lib_confdb_reload));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
-
if (error != CS_OK) {
/* FIXME: set error_text */
goto error_exit;
iov.iov_base = (char *)&req;
iov.iov_len = sizeof (struct req_lib_confdb_object_track_start);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res,
sizeof (coroipc_response_header_t));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req;
iov.iov_len = sizeof (coroipc_request_header_t);
- pthread_mutex_lock (&confdb_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- confdb_inst->ipc_ctx,
+ confdb_inst->handle,
&iov,
1,
&res,
sizeof (coroipc_response_header_t));
- pthread_mutex_unlock (&confdb_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
#include <corosync/coroipc_types.h>
#include <corosync/coroipc_ipc.h>
#include <corosync/coroipcc.h>
+#include <corosync/hdb.h>
-struct ipc_segment {
+#include "util.h"
+
+struct ipc_instance {
int fd;
int shmid;
int semid;
size_t response_size;
size_t dispatch_size;
uid_t euid;
+ pthread_mutex_t mutex;
};
+void ipc_hdb_destructor (void *context);
+
+DECLARE_HDB_DATABASE(ipc_hdb,ipc_hdb_destructor);
#if defined(COROSYNC_LINUX)
/* SUN_LEN is broken for abstract namespace
#define MSG_NOSIGNAL 0
#endif
-static int
-coroipcc_send (
+static cs_error_t
+socket_send (
int s,
void *msg,
size_t len)
{
+ cs_error_t res = CS_OK;
int result;
struct msghdr msg_send;
struct iovec iov_send;
iov_send.iov_len = len - processed;
result = sendmsg (s, &msg_send, MSG_NOSIGNAL);
-
- /*
- * return immediately on any kind of syscall error that maps to
- * CS_ERR if no part of message has been sent
- */
- if (result == -1 && processed == 0) {
- if (errno == EINTR) {
- goto error_exit;
- }
- if (errno == EAGAIN) {
- goto error_exit;
- }
- if (errno == EFAULT) {
- goto error_exit;
- }
- }
-
- /*
- * retry read operations that are already started except
- * for fault in that case, return ERR_LIBRARY
- */
- if (result == -1 && processed > 0) {
- if (errno == EINTR) {
+ if (result == -1) {
+ switch (errno) {
+ case EINTR:
goto retry_send;
- }
- if (errno == EAGAIN) {
+ break;
+ case EAGAIN:
goto retry_send;
+ break;
+ default:
+ res = CS_ERR_LIBRARY;
+ goto res_exit;
}
- if (errno == EFAULT) {
- goto error_exit;
- }
- }
-
- /*
- * return ERR_LIBRARY on any other syscall error
- */
- if (result == -1) {
- goto error_exit;
}
processed += result;
goto retry_send;
}
- return (0);
+ return (CS_OK);
-error_exit:
- return (-1);
+res_exit:
+ return (res);
}
-static int
-coroipcc_recv (
+static cs_error_t
+socket_recv (
int s,
void *msg,
size_t len)
{
- int error = 0;
+ cs_error_t res = CS_OK;
int result;
struct msghdr msg_recv;
struct iovec iov_recv;
iov_recv.iov_len = len - processed;
result = recvmsg (s, &msg_recv, MSG_NOSIGNAL|MSG_WAITALL);
- if (result == -1 && errno == EINTR) {
- goto retry_recv;
- }
- if (result == -1 && errno == EAGAIN) {
- goto retry_recv;
+ if (result == -1) {
+ switch (errno) {
+ case EINTR:
+ goto retry_recv;
+ break;
+ case EAGAIN:
+ goto retry_recv;
+ break;
+ default:
+ res = CS_ERR_LIBRARY;
+ goto res_exit;
+ }
}
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
/* On many OS poll never return POLLHUP or POLLERR.
* EOF is detected when recvmsg return 0.
*/
if (result == 0) {
- error = -1;
- goto error_exit;
+ res = CS_ERR_LIBRARY;
+ goto res_exit;
}
#endif
- if (result == -1 || result == 0) {
- error = -1;
- goto error_exit;
- }
+
processed += result;
if (processed != len) {
goto retry_recv;
}
assert (processed == len);
-error_exit:
- return (0);
+res_exit:
+ return (res);
}
static int
-priv_change_send (struct ipc_segment *ipc_segment)
+priv_change_send (struct ipc_instance *ipc_instance)
{
char buf_req;
mar_req_priv_change req_priv_change;
/*
* Don't resend request unless euid has changed
*/
- if (ipc_segment->euid == req_priv_change.euid) {
+ if (ipc_instance->euid == req_priv_change.euid) {
return (0);
}
req_priv_change.egid = getegid();
buf_req = MESSAGE_REQ_CHANGE_EUID;
- res = coroipcc_send (ipc_segment->fd, &buf_req, 1);
+ res = socket_send (ipc_instance->fd, &buf_req, 1);
if (res == -1) {
return (-1);
}
- res = coroipcc_send (ipc_segment->fd, &req_priv_change,
+ res = socket_send (ipc_instance->fd, &req_priv_change,
sizeof (req_priv_change));
if (res == -1) {
return (-1);
}
- ipc_segment->euid = req_priv_change.euid;
+ ipc_instance->euid = req_priv_change.euid;
return (0);
}
res = munmap (addr, bytes);
}
+void ipc_hdb_destructor (void *context ) {
+ struct ipc_instance *ipc_instance = (struct ipc_instance *)context;
+
+ /*
+ * << 1 (or multiplied by 2) because this is a wrapped memory buffer
+ */
+ memory_unmap (ipc_instance->control_buffer, ipc_instance->control_size);
+ memory_unmap (ipc_instance->request_buffer, ipc_instance->request_size);
+ memory_unmap (ipc_instance->response_buffer, ipc_instance->response_size);
+ memory_unmap (ipc_instance->dispatch_buffer, (ipc_instance->dispatch_size) << 1);
+}
static int
memory_map (char *path, const char *file, void **buf, size_t bytes)
{
return (0);
}
-extern cs_error_t
+static cs_error_t
+msg_send (
+ struct ipc_instance *ipc_instance,
+ const struct iovec *iov,
+ unsigned int iov_len)
+{
+ struct sembuf sop;
+ int i;
+ int res;
+ int req_buffer_idx = 0;
+
+ for (i = 0; i < iov_len; i++) {
+ memcpy (&ipc_instance->request_buffer[req_buffer_idx],
+ iov[i].iov_base,
+ iov[i].iov_len);
+ req_buffer_idx += iov[i].iov_len;
+ }
+ /*
+ * Signal semaphore #0 indicting a new message from client
+ * to server request queue
+ */
+ sop.sem_num = 0;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (ipc_instance->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ } else
+ if (res == -1 && errno == EACCES) {
+ priv_change_send (ipc_instance);
+ goto retry_semop;
+ } else
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+ return (CS_OK);
+}
+
+static cs_error_t
+reply_receive (
+ struct ipc_instance *ipc_instance,
+ void *res_msg,
+ size_t res_len)
+{
+ struct sembuf sop;
+ coroipc_response_header_t *response_header;
+ int res;
+
+ /*
+ * Wait for semaphore #1 indicating a new message from server
+ * to client in the response queue
+ */
+ sop.sem_num = 1;
+ sop.sem_op = -1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (ipc_instance->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ } else
+ if (res == -1 && errno == EACCES) {
+ priv_change_send (ipc_instance);
+ goto retry_semop;
+ } else
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+
+ response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
+ if (response_header->error == CS_ERR_TRY_AGAIN) {
+ return (CS_ERR_TRY_AGAIN);
+ }
+
+ memcpy (res_msg, ipc_instance->response_buffer, res_len);
+ return (CS_OK);
+}
+
+static cs_error_t
+reply_receive_in_buf (
+ struct ipc_instance *ipc_instance,
+ void **res_msg)
+{
+ struct sembuf sop;
+ int res;
+
+ /*
+ * Wait for semaphore #1 indicating a new message from server
+ * to client in the response queue
+ */
+ sop.sem_num = 1;
+ sop.sem_op = -1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (ipc_instance->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ } else
+ if (res == -1 && errno == EACCES) {
+ priv_change_send (ipc_instance);
+ goto retry_semop;
+ } else
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+
+ *res_msg = (char *)ipc_instance->response_buffer;
+ return (CS_OK);
+}
+
+/*
+ * External API
+ */
+cs_error_t
coroipcc_service_connect (
const char *socket_name,
unsigned int service,
size_t request_size,
size_t response_size,
size_t dispatch_size,
- void **ipc_context)
+ hdb_handle_t *handle)
{
int request_fd;
struct sockaddr_un address;
- cs_error_t error;
- struct ipc_segment *ipc_segment;
+ cs_error_t res;
+ struct ipc_instance *ipc_instance;
key_t semkey = 0;
- int res;
+ int sys_res;
mar_req_setup_t req_setup;
mar_res_setup_t res_setup;
union semun semun;
char response_map_path[128];
char dispatch_map_path[128];
+ res = hdb_error_to_cs (hdb_handle_create (&ipc_hdb,
+ sizeof (struct ipc_instance), handle));
+ if (res != CS_OK) {
+ return (res);
+ }
+
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, *handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
+ }
+
res_setup.error = CS_ERR_LIBRARY;
request_fd = socket (PF_UNIX, SOCK_STREAM, 0);
if (request_fd == -1) {
- return (-1);
+ return (CS_ERR_LIBRARY);
}
memset (&address, 0, sizeof (struct sockaddr_un));
#else
sprintf (address.sun_path, "%s/%s", SOCKETDIR, socket_name);
#endif
- res = connect (request_fd, (struct sockaddr *)&address,
+ sys_res = connect (request_fd, (struct sockaddr *)&address,
AIS_SUN_LEN(&address));
- if (res == -1) {
+ if (sys_res == -1) {
close (request_fd);
return (CS_ERR_TRY_AGAIN);
}
- ipc_segment = malloc (sizeof (struct ipc_segment));
- if (ipc_segment == NULL) {
- close (request_fd);
- return (-1);
- }
- bzero (ipc_segment, sizeof (struct ipc_segment));
-
/*
* Allocate a semaphore segment
*/
while (1) {
semkey = random();
- ipc_segment->euid = geteuid ();
- if ((ipc_segment->semid
+ ipc_instance->euid = geteuid ();
+ if ((ipc_instance->semid
= semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
break;
}
if (errno != EEXIST) {
- goto error_exit;
+ goto res_exit;
}
}
semun.val = 0;
- res = semctl (ipc_segment->semid, 0, SETVAL, semun);
+ res = semctl (ipc_instance->semid, 0, SETVAL, semun);
if (res != 0) {
- goto error_exit;
+ goto res_exit;
}
- res = semctl (ipc_segment->semid, 1, SETVAL, semun);
+ res = semctl (ipc_instance->semid, 1, SETVAL, semun);
if (res != 0) {
- goto error_exit;
+ goto res_exit;
}
res = memory_map (
control_map_path,
"control_buffer-XXXXXX",
- (void *)&ipc_segment->control_buffer,
+ (void *)&ipc_instance->control_buffer,
8192);
res = memory_map (
request_map_path,
"request_buffer-XXXXXX",
- (void *)&ipc_segment->request_buffer,
+ (void *)&ipc_instance->request_buffer,
request_size);
res = memory_map (
response_map_path,
"response_buffer-XXXXXX",
- (void *)&ipc_segment->response_buffer,
+ (void *)&ipc_instance->response_buffer,
response_size);
res = circular_memory_map (
dispatch_map_path,
"dispatch_buffer-XXXXXX",
- (void *)&ipc_segment->dispatch_buffer,
+ (void *)&ipc_instance->dispatch_buffer,
dispatch_size);
/*
req_setup.dispatch_size = dispatch_size;
req_setup.semkey = semkey;
- error = coroipcc_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
- if (error != 0) {
- goto error_exit;
+ res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
+ if (res != CS_OK) {
+ goto res_exit;
}
- error = coroipcc_recv (request_fd, &res_setup, sizeof (mar_res_setup_t));
- if (error != 0) {
- goto error_exit;
+ res = socket_recv (request_fd, &res_setup, sizeof (mar_res_setup_t));
+ if (res != CS_OK) {
+ goto res_exit;
}
- ipc_segment->fd = request_fd;
- ipc_segment->flow_control_state = 0;
+ ipc_instance->fd = request_fd;
+ ipc_instance->flow_control_state = 0;
if (res_setup.error == CS_ERR_TRY_AGAIN) {
- goto error_exit;
+ goto res_exit;
}
- ipc_segment->control_size = 8192;
- ipc_segment->request_size = request_size;
- ipc_segment->response_size = response_size;
- ipc_segment->dispatch_size = dispatch_size;
+ ipc_instance->control_size = 8192;
+ ipc_instance->request_size = request_size;
+ ipc_instance->response_size = response_size;
+ ipc_instance->dispatch_size = dispatch_size;
+
+ pthread_mutex_init (&ipc_instance->mutex, NULL);
+
+ hdb_handle_put (&ipc_hdb, *handle);
- *ipc_context = ipc_segment;
return (res_setup.error);
-error_exit:
+res_exit:
close (request_fd);
- if (ipc_segment->semid > 0)
- semctl (ipc_segment->semid, 0, IPC_RMID);
+ if (ipc_instance->semid > 0)
+ semctl (ipc_instance->semid, 0, IPC_RMID);
return (res_setup.error);
}
cs_error_t
coroipcc_service_disconnect (
- void *ipc_context)
+ hdb_handle_t handle)
{
- struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+ cs_error_t res;
+ struct ipc_instance *ipc_instance;
- shutdown (ipc_segment->fd, SHUT_RDWR);
- close (ipc_segment->fd);
- /*
- * << 1 (or multiplied by 2) because this is a wrapped memory buffer
- */
- memory_unmap (ipc_segment->control_buffer, ipc_segment->control_size);
- memory_unmap (ipc_segment->request_buffer, ipc_segment->request_size);
- memory_unmap (ipc_segment->response_buffer, ipc_segment->response_size);
- memory_unmap (ipc_segment->dispatch_buffer, (ipc_segment->dispatch_size) << 1);
- free (ipc_segment);
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
+ }
+
+ shutdown (ipc_instance->fd, SHUT_RDWR);
+ close (ipc_instance->fd);
+ hdb_handle_destroy (&ipc_hdb, handle);
+ hdb_handle_put (&ipc_hdb, handle);
return (CS_OK);
}
-int
+cs_error_t
coroipcc_dispatch_flow_control_get (
- void *ipc_context)
+ hdb_handle_t handle,
+ unsigned int *flow_control_state)
{
- struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
+ struct ipc_instance *ipc_instance;
+ cs_error_t res;
+
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
+ }
+
+ *flow_control_state = ipc_instance->flow_control_state;
- return (ipc_segment->flow_control_state);
+ hdb_handle_put (&ipc_hdb, handle);
+ return (res);
}
-int
-coroipcc_fd_get (void *ipc_ctx)
+cs_error_t
+coroipcc_fd_get (
+ hdb_handle_t handle,
+ int *fd)
{
- struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+ struct ipc_instance *ipc_instance;
+ cs_error_t res;
- return (ipc_segment->fd);
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
+ }
+
+ *fd = ipc_instance->fd;
+
+ hdb_handle_put (&ipc_hdb, handle);
+ return (res);
}
-int
-coroipcc_dispatch_get (void *ipc_ctx, void **data, int timeout)
+cs_error_t
+coroipcc_dispatch_get (
+ hdb_handle_t handle,
+ void **data,
+ int timeout)
{
struct pollfd ufds;
int poll_events;
char buf;
- struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+ struct ipc_instance *ipc_instance;
int res;
char buf_two = 1;
char *data_addr;
+ cs_error_t error = CS_OK;
+
+ error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (error != CS_OK) {
+ return (error);
+ }
- ufds.fd = ipc_segment->fd;
+ *data = NULL;
+
+ ufds.fd = ipc_instance->fd;
ufds.events = POLLIN;
ufds.revents = 0;
goto retry_poll;
} else
if (poll_events == -1) {
- return (-1);
+ goto error_put;
} else
if (poll_events == 0) {
- return (0);
+ goto error_put;
}
if (poll_events == 1 && (ufds.revents & (POLLERR|POLLHUP))) {
- return (-1);
+ error = CS_ERR_LIBRARY;
+ goto error_put;
}
retry_recv:
- res = recv (ipc_segment->fd, &buf, 1, 0);
+ res = recv (ipc_instance->fd, &buf, 1, 0);
if (res == -1 && errno == EINTR) {
goto retry_recv;
} else
if (res == -1) {
- return (-1);
+ goto error_put;
}
if (res == 0) {
- return (-1);
+ goto error_put;
}
- ipc_segment->flow_control_state = 0;
+ ipc_instance->flow_control_state = 0;
if (buf == 1 || buf == 2) {
- ipc_segment->flow_control_state = 1;
+ ipc_instance->flow_control_state = 1;
}
/*
* Notify executive to flush any pending dispatch messages
*/
- if (ipc_segment->flow_control_state) {
+ if (ipc_instance->flow_control_state) {
buf_two = MESSAGE_REQ_OUTQ_FLUSH;
- res = coroipcc_send (ipc_segment->fd, &buf_two, 1);
- assert (res == 0); //TODO
+ res = socket_send (ipc_instance->fd, &buf_two, 1);
+ assert (res == CS_OK); /* TODO */
}
/*
* This is just a notification of flow control starting at the addition
* of a new pending message, not a message to dispatch
*/
if (buf == 2) {
- return (0);
+ goto error_put;
}
if (buf == 3) {
- return (0);
+ goto error_put;
}
- data_addr = ipc_segment->dispatch_buffer;
+ data_addr = ipc_instance->dispatch_buffer;
- data_addr = &data_addr[ipc_segment->control_buffer->read];
+ data_addr = &data_addr[ipc_instance->control_buffer->read];
*data = (void *)data_addr;
- return (1);
+
+ return (CS_OK);
+error_put:
+ hdb_handle_put (&ipc_hdb, handle);
+ return (error);
}
-int
-coroipcc_dispatch_put (void *ipc_ctx)
+cs_error_t
+coroipcc_dispatch_put (hdb_handle_t handle)
{
struct sembuf sop;
coroipc_response_header_t *header;
- struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+ struct ipc_instance *ipc_instance;
int res;
char *addr;
unsigned int read_idx;
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
+ }
sop.sem_num = 2;
sop.sem_op = -1;
sop.sem_flg = 0;
retry_semop:
- res = semop (ipc_segment->semid, &sop, 1);
+ res = semop (ipc_instance->semid, &sop, 1);
if (res == -1 && errno == EINTR) {
goto retry_semop;
} else
if (res == -1 && errno == EACCES) {
- priv_change_send (ipc_segment);
+ priv_change_send (ipc_instance);
goto retry_semop;
} else
if (res == -1) {
- return (-1);
+ return (CS_ERR_LIBRARY);
}
- addr = ipc_segment->dispatch_buffer;
+ addr = ipc_instance->dispatch_buffer;
- read_idx = ipc_segment->control_buffer->read;
+ read_idx = ipc_instance->control_buffer->read;
header = (coroipc_response_header_t *) &addr[read_idx];
- ipc_segment->control_buffer->read =
- (read_idx + header->size) % ipc_segment->dispatch_size;
- return (0);
-}
-
-static cs_error_t
-coroipcc_msg_send (
- void *ipc_context,
- const struct iovec *iov,
- unsigned int iov_len)
-{
- struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
- struct sembuf sop;
- int i;
- int res;
- int req_buffer_idx = 0;
-
- for (i = 0; i < iov_len; i++) {
- memcpy (&ipc_segment->request_buffer[req_buffer_idx],
- iov[i].iov_base,
- iov[i].iov_len);
- req_buffer_idx += iov[i].iov_len;
- }
+ ipc_instance->control_buffer->read =
+ (read_idx + header->size) % ipc_instance->dispatch_size;
/*
- * Signal semaphore #0 indicting a new message from client
- * to server request queue
+ * Put from dispatch get and also from this call's get
*/
- sop.sem_num = 0;
- sop.sem_op = 1;
- sop.sem_flg = 0;
+ hdb_handle_put (&ipc_hdb, handle);
+ hdb_handle_put (&ipc_hdb, handle);
-retry_semop:
- res = semop (ipc_segment->semid, &sop, 1);
- if (res == -1 && errno == EINTR) {
- goto retry_semop;
- } else
- if (res == -1 && errno == EACCES) {
- priv_change_send (ipc_segment);
- goto retry_semop;
- } else
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
return (CS_OK);
}
-static cs_error_t
-coroipcc_reply_receive (
- void *ipc_context,
+cs_error_t
+coroipcc_msg_send_reply_receive (
+ hdb_handle_t handle,
+ const struct iovec *iov,
+ unsigned int iov_len,
void *res_msg,
size_t res_len)
{
- struct sembuf sop;
- struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
- coroipc_response_header_t *response_header;
- int res;
-
- /*
- * Wait for semaphore #1 indicating a new message from server
- * to client in the response queue
- */
- sop.sem_num = 1;
- sop.sem_op = -1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (ipc_segment->semid, &sop, 1);
- if (res == -1 && errno == EINTR) {
- goto retry_semop;
- } else
- if (res == -1 && errno == EACCES) {
- priv_change_send (ipc_segment);
- goto retry_semop;
- } else
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
+ cs_error_t res;
+ struct ipc_instance *ipc_instance;
- response_header = (coroipc_response_header_t *)ipc_segment->response_buffer;
- if (response_header->error == CS_ERR_TRY_AGAIN) {
- return (CS_ERR_TRY_AGAIN);
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
}
- memcpy (res_msg, ipc_segment->response_buffer, res_len);
- return (CS_OK);
-}
+ pthread_mutex_lock (&ipc_instance->mutex);
-static cs_error_t
-coroipcc_reply_receive_in_buf (
- void *ipc_context,
- void **res_msg)
-{
- struct sembuf sop;
- struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_context;
- int res;
+ res = msg_send (ipc_instance, iov, iov_len);
+ if (res != CS_OK) {
+ goto error_exit;
+ }
- /*
- * Wait for semaphore #1 indicating a new message from server
- * to client in the response queue
- */
- sop.sem_num = 1;
- sop.sem_op = -1;
- sop.sem_flg = 0;
+ res = reply_receive (ipc_instance, res_msg, res_len);
-retry_semop:
- res = semop (ipc_segment->semid, &sop, 1);
- if (res == -1 && errno == EINTR) {
- goto retry_semop;
- } else
- if (res == -1 && errno == EACCES) {
- priv_change_send (ipc_segment);
- goto retry_semop;
- } else
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
+error_exit:
+ hdb_handle_put (&ipc_hdb, handle);
+ pthread_mutex_unlock (&ipc_instance->mutex);
- *res_msg = (char *)ipc_segment->response_buffer;
- return (CS_OK);
+ return (res);
}
cs_error_t
-coroipcc_msg_send_reply_receive (
- void *ipc_context,
+coroipcc_msg_send_reply_receive_in_buf_get (
+ hdb_handle_t handle,
const struct iovec *iov,
unsigned int iov_len,
- void *res_msg,
- size_t res_len)
+ void **res_msg)
{
- cs_error_t res;
+ unsigned int res;
+ struct ipc_instance *ipc_instance;
- res = coroipcc_msg_send (ipc_context, iov, iov_len);
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
if (res != CS_OK) {
return (res);
}
- res = coroipcc_reply_receive (ipc_context, res_msg, res_len);
+ pthread_mutex_lock (&ipc_instance->mutex);
+
+ res = msg_send (ipc_instance, iov, iov_len);
if (res != CS_OK) {
- return (res);
+ goto error_exit;
}
- return (CS_OK);
+ res = reply_receive_in_buf (ipc_instance, res_msg);
+
+error_exit:
+ pthread_mutex_unlock (&ipc_instance->mutex);
+
+ return (res);
}
cs_error_t
-coroipcc_msg_send_reply_receive_in_buf (
- void *ipc_context,
- const struct iovec *iov,
- unsigned int iov_len,
- void **res_msg)
+coroipcc_msg_send_reply_receive_in_buf_put (
+ hdb_handle_t handle)
{
unsigned int res;
+ struct ipc_instance *ipc_instance;
- res = coroipcc_msg_send (ipc_context, iov, iov_len);
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
if (res != CS_OK) {
return (res);
}
+ hdb_handle_put (&ipc_hdb, handle);
+ hdb_handle_put (&ipc_hdb, handle);
- res = coroipcc_reply_receive_in_buf (ipc_context, res_msg);
- if (res != CS_OK) {
- return (res);
- }
-
- return (CS_OK);
+ return (res);
}
cs_error_t
coroipcc_zcb_alloc (
- void *ipc_context,
+ hdb_handle_t handle,
void **buffer,
size_t size,
size_t header_size)
{
+ struct ipc_instance *ipc_instance;
void *buf = NULL;
char path[128];
unsigned int res;
struct iovec iovec;
struct coroipcs_zc_header *hdr;
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
+ }
map_size = size + header_size + sizeof (struct coroipcs_zc_header);
- res = memory_map (path, "cpg_zc-XXXXXX", &buf, size);
+ res = memory_map (path, "corosync_zerocopy-XXXXXX", &buf, size);
assert (res != -1);
req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
res = coroipcc_msg_send_reply_receive (
- ipc_context,
+ handle,
&iovec,
1,
&res_coroipcs_zc_alloc,
hdr = (struct coroipcs_zc_header *)buf;
hdr->map_size = map_size;
*buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header);
- return (CS_OK);
+
+ hdb_handle_put (&ipc_hdb, handle);
+ return (res);
}
cs_error_t
coroipcc_zcb_free (
- void *ipc_context,
+ hdb_handle_t handle,
void *buffer)
{
+ struct ipc_instance *ipc_instance;
mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
coroipc_response_header_t res_coroipcs_zc_free;
struct iovec iovec;
unsigned int res;
-
struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header));
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
+ }
+
req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
req_coroipcc_zc_free.header.id = ZC_FREE_HEADER;
req_coroipcc_zc_free.map_size = header->map_size;
iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
res = coroipcc_msg_send_reply_receive (
- ipc_context,
+ handle,
&iovec,
1,
&res_coroipcs_zc_free,
munmap (header, header->map_size);
- return (CS_OK);
+ hdb_handle_put (&ipc_hdb, handle);
+
+ return (res);
}
cs_error_t
coroipcc_zcb_msg_send_reply_receive (
- void *ipc_context,
+ hdb_handle_t handle,
void *msg,
void *res_msg,
size_t res_len)
{
+ struct ipc_instance *ipc_instance;
mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
struct coroipcs_zc_header *hdr;
struct iovec iovec;
cs_error_t res;
+ res = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
+ if (res != CS_OK) {
+ return (res);
+ }
hdr = (struct coroipcs_zc_header *)(((char *)msg) - sizeof (struct coroipcs_zc_header));
req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
res = coroipcc_msg_send_reply_receive (
- ipc_context,
+ handle,
&iovec,
1,
res_msg,
res_len);
+ hdb_handle_put (&ipc_hdb, handle);
return (res);
}
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include "util.h"
struct cpg_inst {
- void *ipc_ctx;
+ hdb_handle_t handle;
int finalize;
cpg_callbacks_t callbacks;
void *context;
- pthread_mutex_t response_mutex;
- pthread_mutex_t dispatch_mutex;
};
-static void cpg_instance_destructor (void *instance);
-
-DECLARE_HDB_DATABASE(cpg_handle_t_db,cpg_instance_destructor);
-
-/*
- * Clean up function for a cpg instance (cpg_nitialize) handle
- */
-static void cpg_instance_destructor (void *instance)
-{
- struct cpg_inst *cpg_inst = instance;
-
- pthread_mutex_destroy (&cpg_inst->response_mutex);
- pthread_mutex_destroy (&cpg_inst->dispatch_mutex);
-}
-
+DECLARE_HDB_DATABASE(cpg_handle_t_db,NULL);
/**
* @defgroup cpg_coroipcc The closed process group API
IPC_REQUEST_SIZE,
IPC_RESPONSE_SIZE,
IPC_DISPATCH_SIZE,
- &cpg_inst->ipc_ctx);
+ &cpg_inst->handle);
if (error != CS_OK) {
goto error_put_destroy;
}
memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t));
- pthread_mutex_init (&cpg_inst->response_mutex, NULL);
-
- pthread_mutex_init (&cpg_inst->dispatch_mutex, NULL);
-
hdb_handle_put (&cpg_handle_t_db, *handle);
return (CS_OK);
return (error);
}
- pthread_mutex_lock (&cpg_inst->response_mutex);
-
/*
* Another thread has already started finalizing
*/
if (cpg_inst->finalize) {
- pthread_mutex_unlock (&cpg_inst->response_mutex);
hdb_handle_put (&cpg_handle_t_db, handle);
return (CPG_ERR_BAD_HANDLE);
}
cpg_inst->finalize = 1;
- coroipcc_service_disconnect (cpg_inst->ipc_ctx);
-
- pthread_mutex_unlock (&cpg_inst->response_mutex);
+ coroipcc_service_disconnect (cpg_inst->handle);
hdb_handle_destroy (&cpg_handle_t_db, handle);
return (error);
}
- *fd = coroipcc_fd_get (cpg_inst->ipc_ctx);
+ error = coroipcc_fd_get (cpg_inst->handle, fd);
hdb_handle_put (&cpg_handle_t_db, handle);
- return (CS_OK);
+ return (error);
}
cs_error_t cpg_context_get (
int timeout = -1;
cs_error_t error;
int cont = 1; /* always continue do loop except when set to 0 */
- int dispatch_avail;
struct cpg_inst *cpg_inst;
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
}
do {
- pthread_mutex_lock (&cpg_inst->dispatch_mutex);
-
- dispatch_avail = coroipcc_dispatch_get (
- cpg_inst->ipc_ctx,
+ error = coroipcc_dispatch_get (
+ cpg_inst->handle,
(void **)&dispatch_data,
timeout);
-
- pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
-
- if (dispatch_avail == 0 && dispatch_types == CPG_DISPATCH_ALL) {
- pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
- break; /* exit do while cont is 1 loop */
- } else
- if (dispatch_avail == 0) {
- pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
- continue; /* next poll */
+ if (error != CS_OK) {
+ goto error_put;
}
- if (dispatch_avail == -1) {
- if (cpg_inst->finalize == 1) {
- error = CS_OK;
+
+ if (dispatch_data == NULL) {
+ if (dispatch_types == CPG_DISPATCH_ALL) {
+ break; /* exit do while cont is 1 loop */
} else {
- error = CS_ERR_LIBRARY;
+ continue; /* next poll */
}
- goto error_put;
}
/*
break;
default:
- coroipcc_dispatch_put (cpg_inst->ipc_ctx);
+ coroipcc_dispatch_put (cpg_inst->handle);
error = CS_ERR_LIBRARY;
goto error_put;
break;
}
- coroipcc_dispatch_put (cpg_inst->ipc_ctx);
+ coroipcc_dispatch_put (cpg_inst->handle);
/*
* Determine if more messages should be processed
iov[0].iov_len = sizeof (struct req_lib_cpg_join);
do {
- pthread_mutex_lock (&cpg_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov, 1,
+ error = coroipcc_msg_send_reply_receive (cpg_inst->handle, iov, 1,
&res_lib_cpg_join, sizeof (struct res_lib_cpg_join));
- pthread_mutex_unlock (&cpg_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
do {
- pthread_mutex_lock (&cpg_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov, 1,
+ error = coroipcc_msg_send_reply_receive (cpg_inst->handle, iov, 1,
&res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
- pthread_mutex_unlock (&cpg_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_len = sizeof (coroipc_request_header_t);
do {
- pthread_mutex_lock (&cpg_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, &iov, 1,
+ error = coroipcc_msg_send_reply_receive (cpg_inst->handle, &iov, 1,
&res_lib_cpg_membership_get, sizeof (coroipc_response_header_t));
- pthread_mutex_unlock (&cpg_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = &req_lib_cpg_local_get;
iov.iov_len = sizeof (struct req_lib_cpg_local_get);
- pthread_mutex_lock (&cpg_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, &iov, 1,
+ error = coroipcc_msg_send_reply_receive (cpg_inst->handle, &iov, 1,
&res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
- pthread_mutex_unlock (&cpg_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
return (error);
}
- *flow_control_state = coroipcc_dispatch_flow_control_get (cpg_inst->ipc_ctx);
+ error = coroipcc_dispatch_flow_control_get (cpg_inst->handle, (unsigned int *)flow_control_state);
hdb_handle_put (&cpg_handle_t_db, handle);
return (error);
}
- error = coroipcc_zcb_alloc (cpg_inst->ipc_ctx,
+ error = coroipcc_zcb_alloc (cpg_inst->handle,
buffer,
size,
sizeof (struct req_lib_cpg_mcast));
return (error);
}
- coroipcc_zcb_free (cpg_inst->ipc_ctx, ((char *)buffer) - sizeof (struct req_lib_cpg_mcast));
+ coroipcc_zcb_free (cpg_inst->handle, ((char *)buffer) - sizeof (struct req_lib_cpg_mcast));
hdb_handle_put (&cpg_handle_t_db, handle);
req_lib_cpg_mcast->guarantee = guarantee;
req_lib_cpg_mcast->msglen = msg_len;
- pthread_mutex_lock (&cpg_inst->response_mutex);
-
error = coroipcc_zcb_msg_send_reply_receive (
- cpg_inst->ipc_ctx,
+ cpg_inst->handle,
req_lib_cpg_mcast,
&res_lib_cpg_mcast,
sizeof (res_lib_cpg_mcast));
- pthread_mutex_unlock (&cpg_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
- pthread_mutex_lock (&cpg_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov,
+ error = coroipcc_msg_send_reply_receive (cpg_inst->handle, iov,
iov_len + 1, &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast));
- pthread_mutex_unlock (&cpg_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#define MIN(x,y) ((x) < (y) ? (x) : (y))
struct evs_inst {
- void *ipc_ctx;
+ hdb_handle_t handle;
int finalize;
evs_callbacks_t callbacks;
- pthread_mutex_t response_mutex;
- pthread_mutex_t dispatch_mutex;
};
-static void evs_instance_destructor (void *instance);
-
-DECLARE_HDB_DATABASE (evs_handle_t_db, evs_instance_destructor);
+DECLARE_HDB_DATABASE (evs_handle_t_db,NULL);
/*
* Clean up function for an evt instance (saEvtInitialize) handle
*/
-static void evs_instance_destructor (void *instance)
-{
- struct evs_inst *evs_inst = instance;
-
- pthread_mutex_destroy (&evs_inst->response_mutex);
- pthread_mutex_destroy (&evs_inst->dispatch_mutex);
-}
/**
IPC_REQUEST_SIZE,
IPC_RESPONSE_SIZE,
IPC_DISPATCH_SIZE,
- &evs_inst->ipc_ctx);
+ &evs_inst->handle);
if (error != EVS_OK) {
goto error_put_destroy;
}
memcpy (&evs_inst->callbacks, callbacks, sizeof (evs_callbacks_t));
- pthread_mutex_init (&evs_inst->response_mutex, NULL);
-
- pthread_mutex_init (&evs_inst->dispatch_mutex, NULL);
-
hdb_handle_put (&evs_handle_t_db, *handle);
return (CS_OK);
if (error != CS_OK) {
return (error);
}
- pthread_mutex_lock (&evs_inst->response_mutex);
/*
* Another thread has already started finalizing
*/
if (evs_inst->finalize) {
- pthread_mutex_unlock (&evs_inst->response_mutex);
hdb_handle_put (&evs_handle_t_db, handle);
return (EVS_ERR_BAD_HANDLE);
}
evs_inst->finalize = 1;
- coroipcc_service_disconnect (evs_inst->ipc_ctx);
-
- pthread_mutex_unlock (&evs_inst->response_mutex);
+ coroipcc_service_disconnect (evs_inst->handle);
hdb_handle_destroy (&evs_handle_t_db, handle);
return (error);
}
- *fd = coroipcc_fd_get (evs_inst->ipc_ctx);
+ coroipcc_fd_get (evs_inst->handle, fd);
hdb_handle_put (&evs_handle_t_db, handle);
int timeout = -1;
cs_error_t error;
int cont = 1; /* always continue do loop except when set to 0 */
- int dispatch_avail;
struct evs_inst *evs_inst;
struct res_evs_confchg_callback *res_evs_confchg_callback;
struct res_evs_deliver_callback *res_evs_deliver_callback;
}
do {
- pthread_mutex_lock (&evs_inst->dispatch_mutex);
-
- dispatch_avail = coroipcc_dispatch_get (
- evs_inst->ipc_ctx,
+ error = coroipcc_dispatch_get (
+ evs_inst->handle,
(void **)&dispatch_data,
timeout);
-
- pthread_mutex_unlock (&evs_inst->dispatch_mutex);
-
- if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) {
- break; /* exit do while cont is 1 loop */
- } else
- if (dispatch_avail == 0) {
- continue; /* next dispatch event */
+ if (error != CS_OK) {
+ goto error_put;
}
- if (dispatch_avail == -1) {
- if (evs_inst->finalize == 1) {
- error = CS_OK;
+
+ if (dispatch_data == NULL) {
+ if (dispatch_types == CPG_DISPATCH_ALL) {
+ break; /* exit do while cont is 1 loop */
} else {
- error = CS_ERR_LIBRARY;
+ continue; /* next poll */
}
- goto error_put;
}
/*
break;
default:
- coroipcc_dispatch_put (evs_inst->ipc_ctx);
+ coroipcc_dispatch_put (evs_inst->handle);
error = CS_ERR_LIBRARY;
goto error_put;
break;
}
- coroipcc_dispatch_put (evs_inst->ipc_ctx);
+ coroipcc_dispatch_put (evs_inst->handle);
/*
* Determine if more messages should be processed
iov[1].iov_base = (void*) groups; /* cast away const */
iov[1].iov_len = (group_entries * sizeof (struct evs_group));
- pthread_mutex_lock (&evs_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx, iov, 2,
+ error = coroipcc_msg_send_reply_receive (evs_inst->handle, iov, 2,
&res_lib_evs_join, sizeof (struct res_lib_evs_join));
- pthread_mutex_unlock (&evs_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov[1].iov_base = (void *) groups; /* cast away const */
iov[1].iov_len = (group_entries * sizeof (struct evs_group));
- pthread_mutex_lock (&evs_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx, iov, 2,
+ error = coroipcc_msg_send_reply_receive (evs_inst->handle, iov, 2,
&res_lib_evs_leave, sizeof (struct res_lib_evs_leave));
- pthread_mutex_unlock (&evs_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov[0].iov_len = sizeof (struct req_lib_evs_mcast_joined);
memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
- pthread_mutex_lock (&evs_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx, iov,
+ error = coroipcc_msg_send_reply_receive (evs_inst->handle, iov,
iov_len + 1,
&res_lib_evs_mcast_joined,
sizeof (struct res_lib_evs_mcast_joined));
- pthread_mutex_unlock (&evs_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov[1].iov_len = (group_entries * sizeof (struct evs_group));
memcpy (&iov[2], iovec, iov_len * sizeof (struct iovec));
- pthread_mutex_lock (&evs_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx, iov,
+ error = coroipcc_msg_send_reply_receive (evs_inst->handle, iov,
iov_len + 2,
&res_lib_evs_mcast_groups,
sizeof (struct res_lib_evs_mcast_groups));
- pthread_mutex_unlock (&evs_inst->response_mutex);
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = &req_lib_evs_membership_get;
iov.iov_len = sizeof (struct req_lib_evs_membership_get);
- pthread_mutex_lock (&evs_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive (evs_inst->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive (evs_inst->handle,
&iov,
1,
&res_lib_evs_membership_get,
sizeof (struct res_lib_evs_membership_get));
- pthread_mutex_unlock (&evs_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include "util.h"
-static void pload_instance_destructor (void *instance);
-
struct pload_inst {
- void *ipc_ctx;
- pthread_mutex_t response_mutex;
- pthread_mutex_t dispatch_mutex;
+ hdb_handle_t handle;
unsigned int finalize;
};
-DECLARE_HDB_DATABASE(pload_handle_t_db,pload_instance_destructor);
-
-/*
- * Clean up function for an evt instance (saEvtInitialize) handle
- */
-static void pload_instance_destructor (void *instance)
-{
- struct pload_inst *pload_inst = instance;
-
- pthread_mutex_destroy (&pload_inst->response_mutex);
- pthread_mutex_destroy (&pload_inst->dispatch_mutex);
-}
-
+DECLARE_HDB_DATABASE(pload_handle_t_db,NULL);
/**
* @defgroup pload_corosync The extended virtual synchrony passthrough API
IPC_REQUEST_SIZE,
IPC_RESPONSE_SIZE,
IPC_DISPATCH_SIZE,
- &pload_inst->ipc_ctx);
+ &pload_inst->handle);
if (error != CS_OK) {
goto error_put_destroy;
}
- pthread_mutex_init (&pload_inst->response_mutex, NULL);
-
- pthread_mutex_init (&pload_inst->dispatch_mutex, NULL);
-
(void)hdb_handle_put (&pload_handle_t_db, *handle);
return (CS_OK);
return (error);
}
- pthread_mutex_lock (&pload_inst->response_mutex);
-
/*
* Another thread has already started finalizing
*/
if (pload_inst->finalize) {
- pthread_mutex_unlock (&pload_inst->response_mutex);
(void)hdb_handle_put (&pload_handle_t_db, handle);
return (PLOAD_ERR_BAD_HANDLE);
}
pload_inst->finalize = 1;
- coroipcc_service_disconnect(pload_inst->ipc_ctx);
-
- pthread_mutex_unlock (&pload_inst->response_mutex);
+ coroipcc_service_disconnect(pload_inst->handle);
(void)hdb_handle_destroy (&pload_handle_t_db, handle);
return (error);
}
- *fd = coroipcc_fd_get (pload_inst->ipc_ctx);
+ coroipcc_fd_get (pload_inst->handle, fd);
(void)hdb_handle_put (&pload_handle_t_db, handle);
iov.iov_base = (char *)&req_lib_pload_start;
iov.iov_len = sizeof (struct req_lib_pload_start);
- pthread_mutex_lock (&pload_inst->response_mutex);
-
- error = coroipcc_msg_send_reply_receive(pload_inst->ipc_ctx,
+ error = coroipcc_msg_send_reply_receive(pload_inst->handle,
&iov,
1,
&res_lib_pload_start,
sizeof (struct res_lib_pload_start));
- pthread_mutex_unlock (&pload_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include "util.h"
struct quorum_inst {
- void *ipc_ctx;
+ hdb_handle_t handle;
int finalize;
const void *context;
quorum_callbacks_t callbacks;
- pthread_mutex_t response_mutex;
- pthread_mutex_t dispatch_mutex;
};
-static void quorum_instance_destructor (void *instance);
-
-DECLARE_HDB_DATABASE(quorum_handle_t_db,quorum_instance_destructor);
-
-/*
- * Clean up function for a quorum instance (quorum_initialize) handle
- */
-static void quorum_instance_destructor (void *instance)
-{
- struct quorum_inst *quorum_inst = instance;
-
- pthread_mutex_destroy (&quorum_inst->response_mutex);
-}
+DECLARE_HDB_DATABASE(quorum_handle_t_db,NULL);
cs_error_t quorum_initialize (
quorum_handle_t *handle,
IPC_REQUEST_SIZE,
IPC_RESPONSE_SIZE,
IPC_DISPATCH_SIZE,
- &quorum_inst->ipc_ctx);
+ &quorum_inst->handle);
if (error != CS_OK) {
goto error_put_destroy;
}
- pthread_mutex_init (&quorum_inst->response_mutex, NULL);
- pthread_mutex_init (&quorum_inst->dispatch_mutex, NULL);
if (callbacks)
memcpy(&quorum_inst->callbacks, callbacks, sizeof (callbacks));
else
return (error);
}
- pthread_mutex_lock (&quorum_inst->response_mutex);
-
/*
* Another thread has already started finalizing
*/
if (quorum_inst->finalize) {
- pthread_mutex_unlock (&quorum_inst->response_mutex);
(void)hdb_handle_put (&quorum_handle_t_db, handle);
return (CS_ERR_BAD_HANDLE);
}
quorum_inst->finalize = 1;
- coroipcc_service_disconnect (quorum_inst->ipc_ctx);
-
- pthread_mutex_unlock (&quorum_inst->response_mutex);
+ coroipcc_service_disconnect (quorum_inst->handle);
(void)hdb_handle_destroy (&quorum_handle_t_db, handle);
return (error);
}
- pthread_mutex_lock (&quorum_inst->response_mutex);
-
req.size = sizeof (req);
req.id = MESSAGE_REQ_QUORUM_GETQUORATE;
iov.iov_len = sizeof (req);
error = coroipcc_msg_send_reply_receive (
- quorum_inst->ipc_ctx,
+ quorum_inst->handle,
&iov,
1,
&res_lib_quorum_getquorate,
sizeof (struct res_lib_quorum_getquorate));
- pthread_mutex_unlock (&quorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
return (error);
}
- *fd = coroipcc_fd_get (quorum_inst->ipc_ctx);
+ error = coroipcc_fd_get (quorum_inst->handle, fd);
(void)hdb_handle_put (&quorum_handle_t_db, handle);
- return (CS_OK);
+ return (error);
}
return (error);
}
- pthread_mutex_lock (&quorum_inst->response_mutex);
-
req_lib_quorum_trackstart.header.size = sizeof (struct req_lib_quorum_trackstart);
req_lib_quorum_trackstart.header.id = MESSAGE_REQ_QUORUM_TRACKSTART;
req_lib_quorum_trackstart.track_flags = flags;
iov.iov_len = sizeof (struct req_lib_quorum_trackstart);
error = coroipcc_msg_send_reply_receive (
- quorum_inst->ipc_ctx,
+ quorum_inst->handle,
&iov,
1,
&res,
sizeof (res));
- pthread_mutex_unlock (&quorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
return (error);
}
- pthread_mutex_lock (&quorum_inst->response_mutex);
-
req.size = sizeof (req);
req.id = MESSAGE_REQ_QUORUM_TRACKSTOP;
iov.iov_len = sizeof (req);
error = coroipcc_msg_send_reply_receive (
- quorum_inst->ipc_ctx,
+ quorum_inst->handle,
&iov,
1,
&res,
sizeof (res));
- pthread_mutex_unlock (&quorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
int timeout = -1;
cs_error_t error;
int cont = 1; /* always continue do loop except when set to 0 */
- int dispatch_avail;
struct quorum_inst *quorum_inst;
quorum_callbacks_t callbacks;
coroipc_response_header_t *dispatch_data;
}
do {
- pthread_mutex_lock (&quorum_inst->dispatch_mutex);
-
- dispatch_avail = coroipcc_dispatch_get (
- quorum_inst->ipc_ctx,
+ error = coroipcc_dispatch_get (
+ quorum_inst->handle,
(void **)&dispatch_data,
timeout);
-
- /*
- * Handle has been finalized in another thread
- */
- if (quorum_inst->finalize == 1) {
- error = CS_OK;
- goto error_unlock;
- }
-
- if (dispatch_avail == 0 && dispatch_types == CS_DISPATCH_ALL) {
- pthread_mutex_unlock (&quorum_inst->dispatch_mutex);
- break; /* exit do while cont is 1 loop */
- } else
- if (dispatch_avail == 0) {
- pthread_mutex_unlock (&quorum_inst->dispatch_mutex);
- continue; /* next poll */
+ if (error != CS_OK) {
+ goto error_put;
}
/*
* operate at the same time that quorum_finalize has been called in another thread.
*/
memcpy (&callbacks, &quorum_inst->callbacks, sizeof (quorum_callbacks_t));
- pthread_mutex_unlock (&quorum_inst->dispatch_mutex);
-
/*
* Dispatch incoming message
*/
break;
default:
- coroipcc_dispatch_put (quorum_inst->ipc_ctx);
+ coroipcc_dispatch_put (quorum_inst->handle);
error = CS_ERR_LIBRARY;
goto error_put;
break;
}
- coroipcc_dispatch_put (quorum_inst->ipc_ctx);
+ coroipcc_dispatch_put (quorum_inst->handle);
/*
* Determine if more messages should be processed
goto error_put;
-error_unlock:
- pthread_mutex_unlock (&quorum_inst->dispatch_mutex);
-
error_put:
(void)hdb_handle_put (&quorum_handle_t_db, handle);
return (error);
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-#include <pthread.h>
#include <sys/types.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include "util.h"
struct votequorum_inst {
- void *ipc_ctx;
+ hdb_handle_t handle;
int finalize;
void *context;
votequorum_callbacks_t callbacks;
- pthread_mutex_t response_mutex;
- pthread_mutex_t dispatch_mutex;
};
static void votequorum_instance_destructor (void *instance);
-DECLARE_HDB_DATABASE(votequorum_handle_t_db,votequorum_instance_destructor);
-
-/*
- * Clean up function for a quorum instance (votequorum_initialize) handle
- */
-static void votequorum_instance_destructor (void *instance)
-{
- struct votequorum_inst *votequorum_inst = instance;
-
- pthread_mutex_destroy (&votequorum_inst->response_mutex);
-}
+DECLARE_HDB_DATABASE(votequorum_handle_t_db,NULL);
cs_error_t votequorum_initialize (
votequorum_handle_t *handle,
IPC_REQUEST_SIZE,
IPC_RESPONSE_SIZE,
IPC_DISPATCH_SIZE,
- &votequorum_inst->ipc_ctx);
+ &votequorum_inst->handle);
if (error != CS_OK) {
goto error_put_destroy;
}
- pthread_mutex_init (&votequorum_inst->response_mutex, NULL);
- pthread_mutex_init (&votequorum_inst->dispatch_mutex, NULL);
if (callbacks)
memcpy(&votequorum_inst->callbacks, callbacks, sizeof (*callbacks));
else
return (error);
}
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
/*
* Another thread has already started finalizing
*/
if (votequorum_inst->finalize) {
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
hdb_handle_put (&votequorum_handle_t_db, handle);
return (CS_ERR_BAD_HANDLE);
}
votequorum_inst->finalize = 1;
- coroipcc_service_disconnect (votequorum_inst->ipc_ctx);
-
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
+ coroipcc_service_disconnect (votequorum_inst->handle);
hdb_handle_destroy (&votequorum_handle_t_db, handle);
iov.iov_base = (char *)&req_lib_votequorum_getinfo;
iov.iov_len = sizeof (struct req_lib_votequorum_getinfo);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_getinfo,
sizeof (struct res_lib_votequorum_getinfo));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_setexpected;
iov.iov_len = sizeof (struct req_lib_votequorum_setexpected);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_setvotes;
iov.iov_len = sizeof (struct req_lib_votequorum_setvotes);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_qdisk_register;
iov.iov_len = sizeof (struct req_lib_votequorum_qdisk_register);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_qdisk_poll;
iov.iov_len = sizeof (struct req_lib_votequorum_qdisk_poll);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
return (error);
}
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
req_lib_votequorum_general.header.size = sizeof (struct req_lib_votequorum_general);
req_lib_votequorum_general.header.id = MESSAGE_REQ_VOTEQUORUM_QDISK_UNREGISTER;
iov.iov_len = sizeof (struct req_lib_votequorum_general);
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_general;
iov.iov_len = sizeof (struct req_lib_votequorum_general);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_qdisk_getinfo,
sizeof (struct res_lib_votequorum_qdisk_getinfo));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_general;
iov.iov_len = sizeof (struct req_lib_votequorum_general);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_general;
iov.iov_len = sizeof (struct req_lib_votequorum_general);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_trackstart;
iov.iov_len = sizeof (struct req_lib_votequorum_trackstart);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
iov.iov_base = (char *)&req_lib_votequorum_general;
iov.iov_len = sizeof (struct req_lib_votequorum_general);
- pthread_mutex_lock (&votequorum_inst->response_mutex);
-
error = coroipcc_msg_send_reply_receive (
- votequorum_inst->ipc_ctx,
+ votequorum_inst->handle,
&iov,
1,
&res_lib_votequorum_status,
sizeof (struct res_lib_votequorum_status));
- pthread_mutex_unlock (&votequorum_inst->response_mutex);
-
if (error != CS_OK) {
goto error_exit;
}
return (error);
}
- *fd = coroipcc_fd_get (votequorum_inst->ipc_ctx);
+ error = coroipcc_fd_get (votequorum_inst->handle, fd);
(void)hdb_handle_put (&votequorum_handle_t_db, handle);
- return (CS_OK);
+ return (error);
}
cs_error_t votequorum_dispatch (
int timeout = -1;
cs_error_t error;
int cont = 1; /* always continue do loop except when set to 0 */
- int dispatch_avail;
struct votequorum_inst *votequorum_inst;
votequorum_callbacks_t callbacks;
coroipc_response_header_t *dispatch_data;
}
do {
- pthread_mutex_lock (&votequorum_inst->dispatch_mutex);
-
- dispatch_avail = coroipcc_dispatch_get (
- votequorum_inst->ipc_ctx,
+ error = coroipcc_dispatch_get (
+ votequorum_inst->handle,
(void **)&dispatch_data,
timeout);
-
- /*
- * Handle has been finalized in another thread
- */
- if (votequorum_inst->finalize == 1) {
- error = CS_OK;
- goto error_unlock;
+ if (error != CS_OK) {
+ goto error_put;
}
- if (dispatch_avail == 0 && dispatch_types == CS_DISPATCH_ALL) {
- pthread_mutex_unlock (&votequorum_inst->dispatch_mutex);
- break; /* exit do while cont is 1 loop */
- } else
- if (dispatch_avail == 0) {
- pthread_mutex_unlock (&votequorum_inst->dispatch_mutex);
- continue; /* next poll */
+ if (dispatch_data == NULL) {
+ if (dispatch_types == CPG_DISPATCH_ALL) {
+ break; /* exit do while cont is 1 loop */
+ } else {
+ continue; /* next poll */
+ }
}
/*
* operate at the same time that votequorum_finalize has been called in another thread.
*/
memcpy (&callbacks, &votequorum_inst->callbacks, sizeof (votequorum_callbacks_t));
- pthread_mutex_unlock (&votequorum_inst->dispatch_mutex);
/*
* Dispatch incoming message
break;
default:
- coroipcc_dispatch_put (votequorum_inst->ipc_ctx);
+ coroipcc_dispatch_put (votequorum_inst->handle);
error = CS_ERR_LIBRARY;
goto error_put;
break;
}
- coroipcc_dispatch_put (votequorum_inst->ipc_ctx);
+ coroipcc_dispatch_put (votequorum_inst->handle);
/*
* Determine if more messages should be processed
goto error_put;
-error_unlock:
- pthread_mutex_unlock (&votequorum_inst->dispatch_mutex);
-
error_put:
hdb_handle_put (&votequorum_handle_t_db, handle);
return (error);