From: Steven Dake Date: Wed, 22 Apr 2009 05:41:30 +0000 (+0000) Subject: Change shared memory to use mmap() system calls. X-Git-Tag: v1.3.0~556 X-Git-Url: https://git.proxmox.com/?a=commitdiff_plain;h=cee464489f0b5934f440077323160748f9860a53;p=mirror_corosync.git Change shared memory to use mmap() system calls. git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2115 fd59a12c-fef9-0310-b244-a6a79926bd2f --- diff --git a/exec/coroipcs.c b/exec/coroipcs.c index 7ae50767..10e158b5 100644 --- a/exec/coroipcs.c +++ b/exec/coroipcs.c @@ -125,12 +125,17 @@ struct conn_info { int refcount; key_t shmkey; key_t semkey; - int shmid; int semid; unsigned int pending_semops; pthread_mutex_t mutex; - struct shared_memory *mem; + struct control_buffer *control_buffer; + char *request_buffer; + char *response_buffer; char *dispatch_buffer; + size_t control_size; + size_t request_size; + size_t response_size; + size_t dispatch_size; struct list_head outq_head; void *private_data; struct list_head list; @@ -152,7 +157,10 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len, int locked); static int -memory_map (const char *path, void **buf, size_t bytes) +memory_map ( + const char *path, + size_t bytes, + void **buf) { int fd; void *addr_orig; @@ -188,7 +196,10 @@ memory_map (const char *path, void **buf, size_t bytes) } static int -circular_memory_map (const char *path, void **buf, size_t bytes) +circular_memory_map ( + const char *path, + size_t bytes, + void **buf) { int fd; void *addr_orig; @@ -301,8 +312,8 @@ static inline int zcb_alloc ( res = memory_map ( path_to_file, - addr, - size); + size, + addr); if (res == -1) { return (-1); } @@ -399,8 +410,9 @@ static inline int conn_info_destroy (struct conn_info *conn_info) /* * Destroy shared memory segment and semaphore */ - shmdt (conn_info->mem); - res = shmctl (conn_info->shmid, IPC_RMID, NULL); + res = munmap (conn_info->control_buffer, conn_info->control_size); + res = munmap (conn_info->request_buffer, conn_info->request_size); + res = munmap (conn_info->response_buffer, conn_info->response_size); semctl (conn_info->semid, 0, IPC_RMID); /* @@ -410,7 +422,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info) api->free (conn_info->private_data); } close (conn_info->fd); - res = circular_memory_unmap (conn_info->dispatch_buffer, DISPATCH_SIZE); + res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size); zcb_all_free (conn_info); api->free (conn_info); api->serialize_unlock (); @@ -450,7 +462,7 @@ static inline void zerocopy_operations_process ( { mar_req_header_t *header; - header = (mar_req_header_t *)conn_info->mem->req_buffer; + header = (mar_req_header_t *)conn_info->request_buffer; if (header->id == ZC_ALLOC_HEADER) { mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header; mar_res_header_t res_header; @@ -816,14 +828,25 @@ void coroipcs_ipc_exit (void) { struct list_head *list; struct conn_info *conn_info; + unsigned int res; for (list = conn_info_list_head.next; list != &conn_info_list_head; list = list->next) { conn_info = list_entry (list, struct conn_info, list); - shmdt (conn_info->mem); - shmctl (conn_info->shmid, IPC_RMID, NULL); + /* + * Unmap memory segments + */ + res = munmap (conn_info->control_buffer, + conn_info->control_size); + res = munmap (conn_info->request_buffer, + conn_info->request_size); + res = munmap (conn_info->response_buffer, + conn_info->response_size); + res = circular_memory_unmap (conn_info->dispatch_buffer, + conn_info->dispatch_size); + semctl (conn_info->semid, 0, IPC_RMID); pthread_kill (conn_info->thread, SIGUSR1); @@ -846,7 +869,7 @@ int coroipcs_response_send (void *conn, const void *msg, size_t mlen) struct sembuf sop; int res; - memcpy (conn_info->mem->res_buffer, msg, mlen); + memcpy (conn_info->response_buffer, msg, mlen); sop.sem_num = 1; sop.sem_op = 1; sop.sem_flg = 0; @@ -871,7 +894,8 @@ int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned in int i; for (i = 0; i < iov_len; i++) { - memcpy (&conn_info->mem->res_buffer[write_idx], iov[i].iov_base, iov[i].iov_len); + memcpy (&conn_info->response_buffer[write_idx], + iov[i].iov_base, iov[i].iov_len); write_idx += iov[i].iov_len; } @@ -896,11 +920,11 @@ static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info) unsigned int n_write; unsigned int bytes_left; - n_read = conn_info->mem->read; - n_write = conn_info->mem->write; + n_read = conn_info->control_buffer->read; + n_write = conn_info->control_buffer->write; if (n_read <= n_write) { - bytes_left = DISPATCH_SIZE - n_write + n_read; + bytes_left = conn_info->dispatch_size - n_write + n_read; } else { bytes_left = n_read - n_write; } @@ -911,10 +935,10 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l { unsigned int write_idx; - write_idx = conn_info->mem->write; + write_idx = conn_info->control_buffer->write; memcpy (&conn_info->dispatch_buffer[write_idx], msg, len); - conn_info->mem->write = (write_idx + len) % (DISPATCH_SIZE); + conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size; } static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len, @@ -1230,21 +1254,36 @@ int coroipcs_handler_dispatch ( return (0); } - conn_info->shmkey = req_setup->shmkey; conn_info->semkey = req_setup->semkey; + res = memory_map ( + req_setup->control_file, + req_setup->control_size, + (void *)&conn_info->control_buffer); + conn_info->control_size = req_setup->control_size; + + res = memory_map ( + req_setup->request_file, + req_setup->request_size, + (void *)&conn_info->request_buffer); + conn_info->request_size = req_setup->request_size; + + res = memory_map ( + req_setup->response_file, + req_setup->response_size, + (void *)&conn_info->response_buffer); + conn_info->response_size = req_setup->response_size; + res = circular_memory_map ( req_setup->dispatch_file, - (void *)&conn_info->dispatch_buffer, - DISPATCH_SIZE); + req_setup->dispatch_size, + (void *)&conn_info->dispatch_buffer); + conn_info->dispatch_size = req_setup->dispatch_size; conn_info->service = req_setup->service; conn_info->refcount = 0; conn_info->notify_flow_control_enabled = 0; conn_info->setup_bytes_read = 0; - conn_info->shmid = shmget (conn_info->shmkey, - sizeof (struct shared_memory), 0600); - conn_info->mem = shmat (conn_info->shmid, NULL, 0); conn_info->semid = semget (conn_info->semkey, 3, 0600); conn_info->pending_semops = 0; diff --git a/include/corosync/coroipcc.h b/include/corosync/coroipcc.h index d57d3f66..2c7fc2e8 100644 --- a/include/corosync/coroipcc.h +++ b/include/corosync/coroipcc.h @@ -47,7 +47,10 @@ extern cs_error_t coroipcc_service_connect ( const char *socket_name, - enum service_types service, + unsigned int service, + size_t request_size, + size_t respnse__size, + size_t dispatch_size, void **ipc_context); extern cs_error_t diff --git a/include/corosync/ipc_gen.h b/include/corosync/ipc_gen.h index 01fee2c6..7ec9eaae 100644 --- a/include/corosync/ipc_gen.h +++ b/include/corosync/ipc_gen.h @@ -64,13 +64,11 @@ enum req_init_types { #define MESSAGE_REQ_CHANGE_EUID 1 #define MESSAGE_REQ_OUTQ_FLUSH 2 -#define REQ_SIZE 1000000 -#define RES_SIZE 1000000 -#define DISPATCH_SIZE 8192*128 +#define IPC_REQUEST_SIZE 8192*128 +#define IPC_RESPONSE_SIZE 8192*128 +#define IPC_DISPATCH_SIZE 8192*128 -struct shared_memory { - unsigned char req_buffer[REQ_SIZE]; - unsigned char res_buffer[RES_SIZE]; +struct control_buffer { unsigned int read; unsigned int write; }; @@ -86,9 +84,15 @@ typedef struct { typedef struct { int service __attribute__((aligned(8))); - unsigned long long shmkey __attribute__((aligned(8))); unsigned long long semkey __attribute__((aligned(8))); - char dispatch_file[64]__attribute__((aligned(8))); + char control_file[64] __attribute__((aligned(8))); + char request_file[64] __attribute__((aligned(8))); + char response_file[64] __attribute__((aligned(8))); + char dispatch_file[64] __attribute__((aligned(8))); + size_t control_size __attribute__((aligned(8))); + size_t request_size __attribute__((aligned(8))); + size_t response_size __attribute__((aligned(8))); + size_t dispatch_size __attribute__((aligned(8))); } mar_req_setup_t __attribute__((aligned(8))); typedef struct { diff --git a/lib/cfg.c b/lib/cfg.c index 45eb047c..ea7e301b 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -105,7 +105,13 @@ corosync_cfg_initialize ( goto error_destroy; } - error = coroipcc_service_connect (IPC_SOCKET_NAME, CFG_SERVICE, &cfg_instance->ipc_ctx); + error = coroipcc_service_connect ( + IPC_SOCKET_NAME, + CFG_SERVICE, + IPC_REQUEST_SIZE, + IPC_RESPONSE_SIZE, + IPC_DISPATCH_SIZE, + &cfg_instance->ipc_ctx); if (error != CS_OK) { goto error_put_destroy; } diff --git a/lib/confdb.c b/lib/confdb.c index cbebb3c2..898cd097 100644 --- a/lib/confdb.c +++ b/lib/confdb.c @@ -157,7 +157,13 @@ cs_error_t confdb_initialize ( confdb_inst->standalone = 1; } else { - error = coroipcc_service_connect (IPC_SOCKET_NAME, CONFDB_SERVICE, &confdb_inst->ipc_ctx); + error = coroipcc_service_connect ( + IPC_SOCKET_NAME, + CONFDB_SERVICE, + IPC_REQUEST_SIZE, + IPC_RESPONSE_SIZE, + IPC_DISPATCH_SIZE, + &confdb_inst->ipc_ctx); } if (error != CS_OK) goto error_put_destroy; diff --git a/lib/coroipcc.c b/lib/coroipcc.c index 679072b8..22254b02 100644 --- a/lib/coroipcc.c +++ b/lib/coroipcc.c @@ -80,8 +80,14 @@ struct ipc_segment { int shmid; int semid; int flow_control_state; - struct shared_memory *shared_memory; - void *dispatch_buffer; + struct control_buffer *control_buffer; + char *request_buffer; + char *response_buffer; + char *dispatch_buffer; + size_t control_size; + size_t request_size; + size_t response_size; + size_t dispatch_size; uid_t euid; }; @@ -375,22 +381,28 @@ memory_map (char *path, const char *file, void **buf, size_t bytes) return (0); } -cs_error_t +extern cs_error_t coroipcc_service_connect ( const char *socket_name, - enum service_types service, - void **shmseg) + unsigned int service, + size_t request_size, + size_t response_size, + size_t dispatch_size, + void **ipc_context) + { int request_fd; struct sockaddr_un address; cs_error_t error; struct ipc_segment *ipc_segment; - key_t shmkey = 0; key_t semkey = 0; int res; mar_req_setup_t req_setup; mar_res_setup_t res_setup; union semun semun; + char control_map_path[128]; + char request_map_path[128]; + char response_map_path[128]; char dispatch_map_path[128]; res_setup.error = CS_ERR_LIBRARY; @@ -425,21 +437,6 @@ coroipcc_service_connect ( } bzero (ipc_segment, sizeof (struct ipc_segment)); - /* - * Allocate a shared memory segment - */ - while (1) { - shmkey = random(); - if ((ipc_segment->shmid - = shmget (shmkey, sizeof (struct shared_memory), - IPC_CREAT|IPC_EXCL|0600)) != -1) { - break; - } - if (errno != EEXIST) { - goto error_exit; - } - } - /* * Allocate a semaphore segment */ @@ -455,14 +452,6 @@ coroipcc_service_connect ( } } - /* - * Attach to shared memory segment - */ - ipc_segment->shared_memory = shmat (ipc_segment->shmid, NULL, 0); - if (ipc_segment->shared_memory == (void *)-1) { - goto error_exit; - } - semun.val = 0; res = semctl (ipc_segment->semid, 0, SETVAL, semun); if (res != 0) { @@ -474,14 +463,43 @@ coroipcc_service_connect ( goto error_exit; } - res = circular_memory_map (dispatch_map_path, - "dispatch_bufer-XXXXXX", - &ipc_segment->dispatch_buffer, DISPATCH_SIZE); - strcpy (req_setup.dispatch_file, dispatch_map_path); - req_setup.shmkey = shmkey; - req_setup.semkey = semkey; + res = memory_map ( + control_map_path, + "control_buffer-XXXXXX", + (void *)&ipc_segment->control_buffer, + 8192); + + res = memory_map ( + request_map_path, + "request_buffer-XXXXXX", + (void *)&ipc_segment->request_buffer, + request_size); + + res = memory_map ( + response_map_path, + "response_buffer-XXXXXX", + (void *)&ipc_segment->response_buffer, + response_size); + + res = circular_memory_map ( + dispatch_map_path, + "dispatch_buffer-XXXXXX", + (void *)&ipc_segment->dispatch_buffer, + dispatch_size); + /* + * Initialize IPC setup message + */ req_setup.service = service; + strcpy (req_setup.control_file, control_map_path); + strcpy (req_setup.request_file, request_map_path); + strcpy (req_setup.response_file, response_map_path); + strcpy (req_setup.dispatch_file, dispatch_map_path); + req_setup.control_size = 8192; + req_setup.request_size = request_size; + req_setup.response_size = response_size; + req_setup.dispatch_size = dispatch_size; + req_setup.semkey = semkey; error = coroipcc_send (request_fd, &req_setup, sizeof (mar_req_setup_t)); if (error != 0) { @@ -494,22 +512,21 @@ coroipcc_service_connect ( ipc_segment->fd = request_fd; ipc_segment->flow_control_state = 0; - *shmseg = ipc_segment; - /* - * Something go wrong with server - * Cleanup all - */ if (res_setup.error == CS_ERR_TRY_AGAIN) { goto error_exit; } + ipc_segment->control_size = 8192; + ipc_segment->request_size = request_size; + ipc_segment->response_size = response_size; + ipc_segment->dispatch_size = dispatch_size; + + *ipc_context = ipc_segment; return (res_setup.error); error_exit: close (request_fd); - if (ipc_segment->shmid > 0) - shmctl (ipc_segment->shmid, IPC_RMID, NULL); if (ipc_segment->semid > 0) semctl (ipc_segment->semid, 0, IPC_RMID); return (res_setup.error); @@ -523,11 +540,13 @@ coroipcc_service_disconnect ( shutdown (ipc_segment->fd, SHUT_RDWR); close (ipc_segment->fd); - shmdt (ipc_segment->shared_memory); /* * << 1 (or multiplied by 2) because this is a wrapped memory buffer */ - memory_unmap (ipc_segment->dispatch_buffer, (DISPATCH_SIZE) << 1); + memory_unmap (ipc_segment->control_buffer, ipc_segment->control_size); + memory_unmap (ipc_segment->request_buffer, ipc_segment->request_size); + memory_unmap (ipc_segment->response_buffer, ipc_segment->response_size); + memory_unmap (ipc_segment->dispatch_buffer, (ipc_segment->dispatch_size) << 1); free (ipc_segment); return (CS_OK); } @@ -614,7 +633,7 @@ retry_recv: data_addr = ipc_segment->dispatch_buffer; - data_addr = &data_addr[ipc_segment->shared_memory->read]; + data_addr = &data_addr[ipc_segment->control_buffer->read]; *data = (void *)data_addr; return (1); @@ -648,10 +667,10 @@ retry_semop: addr = ipc_segment->dispatch_buffer; - read_idx = ipc_segment->shared_memory->read; + read_idx = ipc_segment->control_buffer->read; header = (mar_res_header_t *) &addr[read_idx]; - ipc_segment->shared_memory->read = - (read_idx + header->size) % (DISPATCH_SIZE); + ipc_segment->control_buffer->read = + (read_idx + header->size) % ipc_segment->dispatch_size; return (0); } @@ -668,7 +687,7 @@ coroipcc_msg_send ( int req_buffer_idx = 0; for (i = 0; i < iov_len; i++) { - memcpy (&ipc_segment->shared_memory->req_buffer[req_buffer_idx], + memcpy (&ipc_segment->request_buffer[req_buffer_idx], iov[i].iov_base, iov[i].iov_len); req_buffer_idx += iov[i].iov_len; @@ -726,7 +745,7 @@ retry_semop: return (CS_ERR_LIBRARY); } - memcpy (res_msg, ipc_segment->shared_memory->res_buffer, res_len); + memcpy (res_msg, ipc_segment->response_buffer, res_len); return (CS_OK); } @@ -760,7 +779,7 @@ retry_semop: return (CS_ERR_LIBRARY); } - *res_msg = (char *)ipc_segment->shared_memory->res_buffer; + *res_msg = (char *)ipc_segment->response_buffer; return (CS_OK); } diff --git a/lib/cpg.c b/lib/cpg.c index e11175da..dd02161a 100644 --- a/lib/cpg.c +++ b/lib/cpg.c @@ -102,7 +102,13 @@ cs_error_t cpg_initialize ( goto error_destroy; } - error = coroipcc_service_connect (IPC_SOCKET_NAME, CPG_SERVICE, &cpg_inst->ipc_ctx); + error = coroipcc_service_connect ( + IPC_SOCKET_NAME, + CPG_SERVICE, + IPC_REQUEST_SIZE, + IPC_RESPONSE_SIZE, + IPC_DISPATCH_SIZE, + &cpg_inst->ipc_ctx); if (error != CS_OK) { goto error_put_destroy; } diff --git a/lib/evs.c b/lib/evs.c index 5b9174c8..32dbfc3e 100644 --- a/lib/evs.c +++ b/lib/evs.c @@ -110,7 +110,13 @@ evs_error_t evs_initialize ( goto error_destroy; } - error = coroipcc_service_connect (IPC_SOCKET_NAME, EVS_SERVICE, &evs_inst->ipc_ctx); + error = coroipcc_service_connect ( + IPC_SOCKET_NAME, + EVS_SERVICE, + IPC_REQUEST_SIZE, + IPC_RESPONSE_SIZE, + IPC_DISPATCH_SIZE, + &evs_inst->ipc_ctx); if (error != EVS_OK) { goto error_put_destroy; } diff --git a/lib/pload.c b/lib/pload.c index 9fb8f77e..b04eb949 100644 --- a/lib/pload.c +++ b/lib/pload.c @@ -101,7 +101,13 @@ unsigned int pload_initialize ( goto error_destroy; } - error = coroipcc_service_connect (IPC_SOCKET_NAME, PLOAD_SERVICE, &pload_inst->ipc_ctx); + error = coroipcc_service_connect ( + IPC_SOCKET_NAME, + PLOAD_SERVICE, + IPC_REQUEST_SIZE, + IPC_RESPONSE_SIZE, + IPC_DISPATCH_SIZE, + &pload_inst->ipc_ctx); if (error != CS_OK) { goto error_put_destroy; } diff --git a/lib/quorum.c b/lib/quorum.c index 7047a2ba..b8971513 100644 --- a/lib/quorum.c +++ b/lib/quorum.c @@ -92,7 +92,13 @@ cs_error_t quorum_initialize ( goto error_destroy; } - error = coroipcc_service_connect (IPC_SOCKET_NAME, QUORUM_SERVICE, &quorum_inst->ipc_ctx); + error = coroipcc_service_connect ( + IPC_SOCKET_NAME, + QUORUM_SERVICE, + IPC_REQUEST_SIZE, + IPC_RESPONSE_SIZE, + IPC_DISPATCH_SIZE, + &quorum_inst->ipc_ctx); if (error != CS_OK) { goto error_put_destroy; } diff --git a/lib/votequorum.c b/lib/votequorum.c index ac926a55..3b86156a 100644 --- a/lib/votequorum.c +++ b/lib/votequorum.c @@ -92,7 +92,13 @@ cs_error_t votequorum_initialize ( goto error_destroy; } - error = coroipcc_service_connect (IPC_SOCKET_NAME, VOTEQUORUM_SERVICE, &votequorum_inst->ipc_ctx); + error = coroipcc_service_connect ( + IPC_SOCKET_NAME, + VOTEQUORUM_SERVICE, + IPC_REQUEST_SIZE, + IPC_RESPONSE_SIZE, + IPC_DISPATCH_SIZE, + &votequorum_inst->ipc_ctx); if (error != CS_OK) { goto error_put_destroy; }