From ece8efaec9e837037e9d0a50c3c52c41ade9a213 Mon Sep 17 00:00:00 2001 From: Steven Dake Date: Mon, 21 Jul 2008 07:34:08 +0000 Subject: [PATCH] Add ability to track changes to queue groups in the messaqge service. git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1580 fd59a12c-fef9-0310-b244-a6a79926bd2f --- exec/msg.c | 458 +++++++++++++++++++++++++++++++++++++++++++--- include/ipc_msg.h | 4 + lib/msg.c | 89 ++++++++- 3 files changed, 525 insertions(+), 26 deletions(-) diff --git a/exec/msg.c b/exec/msg.c index 120a2032..817ced99 100644 --- a/exec/msg.c +++ b/exec/msg.c @@ -101,16 +101,18 @@ struct message_queue { struct queue_group { SaNameT name; + SaUint8T track_flags; + SaMsgQueueGroupPolicyT policy; struct list_head list; struct list_head message_queue_head; -}; +}; struct queue_group_entry { + SaMsgQueueGroupChangesT change; struct message_queue *message_queue; struct list_head list; }; - /* struct queue_cleanup { struct message_queue *queue; @@ -540,6 +542,7 @@ struct req_exec_msg_queuegroupcreate { mar_req_header_t header; mar_message_source_t source; SaNameT queue_group_name; + SaMsgQueueGroupPolicyT policy; }; struct req_exec_msg_queuegroupinsert { @@ -566,6 +569,8 @@ struct req_exec_msg_queuegrouptrack { mar_req_header_t header; mar_message_source_t source; SaNameT queue_group_name; + SaUint8T track_flags; + SaUint8T buffer_flag; }; struct req_exec_msg_queuegrouptrackstop { @@ -654,7 +659,25 @@ static void print_message_list (struct message_queue *queue) entry = list_entry (list, struct message_entry, list); log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_message_list (%s) (%llu)\n", - (char *)(entry->message.data), (unsigned long long)(entry->time)); + (char *)(entry->message.data), + (unsigned long long)(entry->time)); + } +} + +static void print_queue_group_list (struct queue_group *group) +{ + struct list_head *list; + struct queue_group_entry *entry; + + for (list = group->message_queue_head.next; + list != &group->message_queue_head; + list = list->next) + { + entry = list_entry (list, struct queue_group_entry, list); + + log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_queue_group_list (%s) (%u)\n", + (char *)(entry->message_queue->name.value), + (unsigned int)(entry->change)); } } @@ -679,41 +702,130 @@ static struct message_queue *queue_find (SaNameT *name) static struct queue_group *queue_group_find (SaNameT *name) { struct list_head *list; - struct queue_group *queue_group; + struct queue_group *group; for (list = queue_group_list_head.next; list != &queue_group_list_head; list = list->next) { - queue_group = list_entry (list, struct queue_group, list); + group = list_entry (list, struct queue_group, list); - if (name_match (name, &queue_group->name)) { - return (queue_group); + if (name_match (name, &group->name)) { + return (group); } } return (0); } -static struct queue_group_entry *queue_group_entry_find ( - struct queue_group *queue_group, - struct message_queue *queue) +static struct queue_group_entry *queue_group_entry_find (struct queue_group *group, struct message_queue *queue) { struct list_head *list; - struct queue_group_entry *queue_group_entry; + struct queue_group_entry *entry; - for (list = queue_group->message_queue_head.next; - list != &queue_group->message_queue_head; + for (list = group->message_queue_head.next; + list != &group->message_queue_head; list = list->next) { - queue_group_entry = list_entry (list, struct queue_group_entry, list); - - if (queue_group_entry->message_queue == queue) { - return (queue_group_entry); + entry = list_entry (list, struct queue_group_entry, list); + + if (entry->message_queue == queue) { + return (entry); } } return (0); } +static unsigned int queue_group_member_count (struct queue_group *group) +{ + struct list_head *list; + + unsigned int count = 0; + + for (list = group->message_queue_head.next; + list != &group->message_queue_head; + list = list->next) + { + count++; + } + return (count); +} + +static unsigned int queue_group_change_count (struct queue_group *group) +{ + struct list_head *list; + struct queue_group_entry *entry; + + unsigned int count = 0; + + for (list = group->message_queue_head.next; + list != &group->message_queue_head; + list = list->next) + { + entry = list_entry (list, struct queue_group_entry, list); + + if (entry->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) { + count++; + } + } + return (count); +} + +static unsigned int queue_group_track ( + struct queue_group *group, + unsigned int flags, + void *buffer) +{ + struct list_head *list; + struct queue_group_entry *entry; + + unsigned int i = 0; + + SaMsgQueueGroupNotificationT *notification = + (SaMsgQueueGroupNotificationT *) buffer; + + + switch (flags) { + + case SA_TRACK_CURRENT: + case SA_TRACK_CHANGES: + + for (list = group->message_queue_head.next; + list != &group->message_queue_head; + list = list->next) + { + entry = list_entry (list, struct queue_group_entry, list); + memcpy (¬ification[i].member.queueName, + &entry->message_queue->name, + sizeof (SaNameT)); + notification[i].change = entry->change; + i++; + } + break; + + case SA_TRACK_CHANGES_ONLY: + + for (list = group->message_queue_head.next; + list != &group->message_queue_head; + list = list->next) + { + entry = list_entry (list, struct queue_group_entry, list); + if (entry->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) { + memcpy (¬ification[i].member.queueName, + &entry->message_queue->name, + sizeof (SaNameT)); + notification[i].change = entry->change; + i++; + } + } + break; + + default: + break; + } + + return (i); +} + static int msg_exec_init_fn (struct objdb_iface_ver0 *objdb) { /* @@ -1016,10 +1128,16 @@ static void message_handler_req_exec_msg_queuegroupinsert ( struct req_exec_msg_queuegroupinsert *req_exec_msg_queuegroupinsert = (struct req_exec_msg_queuegroupinsert *)message; struct res_lib_msg_queuegroupinsert res_lib_msg_queuegroupinsert; + struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack; struct message_queue *queue; struct queue_group *queue_group; struct queue_group_entry *queue_group_entry; + SaMsgQueueGroupNotificationT *notification; SaAisErrorT error = SA_AIS_OK; + SaAisErrorT error_cb = SA_AIS_OK; + + unsigned int change_count = 0; + unsigned int member_count = 0; queue_group = queue_group_find (&req_exec_msg_queuegroupinsert->queue_group_name); @@ -1043,7 +1161,50 @@ static void message_handler_req_exec_msg_queuegroupinsert ( list_init (&queue_group_entry->list); list_add (&queue_group_entry->list, &queue_group->message_queue_head); list_add (&queue->list, &queue_list_head); + queue_group_entry->message_queue = queue; + queue_group_entry->change = SA_MSG_QUEUE_GROUP_ADDED; + + if (queue_group->track_flags & SA_TRACK_CHANGES) { + member_count = queue_group_member_count (queue_group); + change_count = queue_group_change_count (queue_group); + + notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count); + + if (notification == NULL) { + error_cb = SA_AIS_ERR_NO_MEMORY; + goto error_track; + } + + memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * member_count); + + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems = + queue_group_track (queue_group, + SA_TRACK_CHANGES, + (void *)(notification)); + } + + if (queue_group->track_flags & SA_TRACK_CHANGES_ONLY) { + member_count = queue_group_member_count (queue_group); + change_count = queue_group_change_count (queue_group); + + notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * change_count); + + if (notification == NULL) { + error_cb = SA_AIS_ERR_NO_MEMORY; + goto error_track; + } + + memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * change_count); + + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems = + queue_group_track (queue_group, + SA_TRACK_CHANGES_ONLY, + (void *)(notification)); + } + +error_track: + queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE; error_exit: if (message_source_is_local(&req_exec_msg_queuegroupinsert->source)) { @@ -1057,6 +1218,38 @@ error_exit: req_exec_msg_queuegroupinsert->source.conn, &res_lib_msg_queuegroupinsert, sizeof (struct res_lib_msg_queuegroupinsert)); + + /* + * Track changes (callback) if tracking is enabled + */ + + if ((queue_group->track_flags & SA_TRACK_CHANGES) || + (queue_group->track_flags & SA_TRACK_CHANGES_ONLY)) + { + res_lib_msg_queuegrouptrack.header.size = + (sizeof (struct res_lib_msg_queuegrouptrack) + + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems)); + res_lib_msg_queuegrouptrack.header.id = + MESSAGE_RES_MSG_QUEUEGROUPTRACK; + res_lib_msg_queuegrouptrack.header.error = error_cb; + res_lib_msg_queuegrouptrack.numberOfMembers = member_count; + + memcpy (&res_lib_msg_queuegrouptrack.queueGroupName, + &req_exec_msg_queuegroupinsert->queue_group_name, + sizeof (SaNameT)); + + openais_conn_send_response ( + openais_conn_partner_get (req_exec_msg_queuegroupinsert->source.conn), + &res_lib_msg_queuegrouptrack, + sizeof (struct res_lib_msg_queuegrouptrack)); + + openais_conn_send_response ( + openais_conn_partner_get (req_exec_msg_queuegroupinsert->source.conn), + notification, + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems)); + } } } @@ -1067,10 +1260,16 @@ static void message_handler_req_exec_msg_queuegroupremove ( struct req_exec_msg_queuegroupremove *req_exec_msg_queuegroupremove = (struct req_exec_msg_queuegroupremove *)message; struct res_lib_msg_queuegroupremove res_lib_msg_queuegroupremove; + struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack; struct queue_group *queue_group; struct message_queue *queue; struct queue_group_entry *queue_group_entry; + SaMsgQueueGroupNotificationT *notification; SaAisErrorT error = SA_AIS_OK; + SaAisErrorT error_cb = SA_AIS_OK; + + unsigned int change_count = 0; + unsigned int member_count = 0; queue_group = queue_group_find (&req_exec_msg_queuegroupremove->queue_group_name); if (queue_group == 0) { @@ -1090,6 +1289,49 @@ static void message_handler_req_exec_msg_queuegroupremove ( goto error_exit; } + queue_group_entry->change = SA_MSG_QUEUE_GROUP_REMOVED; + + if (queue_group->track_flags & SA_TRACK_CHANGES) { + member_count = queue_group_member_count (queue_group); + change_count = queue_group_change_count (queue_group); + + notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count); + + if (notification == NULL) { + error_cb = SA_AIS_ERR_NO_MEMORY; + goto error_track; + } + + memset (notification, 0, (sizeof (SaMsgQueueGroupNotificationT) * member_count)); + + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems = + queue_group_track (queue_group, + SA_TRACK_CHANGES, + (void *)(notification)); + } + + if (queue_group->track_flags & SA_TRACK_CHANGES_ONLY) { + member_count = queue_group_member_count (queue_group); + change_count = queue_group_change_count (queue_group); + + notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * change_count); + + if (notification == NULL) { + error_cb = SA_AIS_ERR_NO_MEMORY; + goto error_track; + } + + memset (notification, 0, (sizeof (SaMsgQueueGroupNotificationT) * change_count)); + + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems = + queue_group_track (queue_group, + SA_TRACK_CHANGES_ONLY, + (void *)(notification)); + } + +error_track: + queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE; + list_del (&queue_group_entry->list); error_exit: @@ -1104,6 +1346,38 @@ error_exit: req_exec_msg_queuegroupremove->source.conn, &res_lib_msg_queuegroupremove, sizeof (struct res_lib_msg_queuegroupremove)); + + /* + * Track changes (callback) if tracking is enabled + */ + + if ((queue_group->track_flags & SA_TRACK_CHANGES) || + (queue_group->track_flags & SA_TRACK_CHANGES_ONLY)) + { + res_lib_msg_queuegrouptrack.header.size = + (sizeof (struct res_lib_msg_queuegrouptrack) + + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems)); + res_lib_msg_queuegrouptrack.header.id = + MESSAGE_RES_MSG_QUEUEGROUPTRACK; + res_lib_msg_queuegrouptrack.header.error = error_cb; + res_lib_msg_queuegrouptrack.numberOfMembers = member_count; + + memcpy (&res_lib_msg_queuegrouptrack.queueGroupName, + &req_exec_msg_queuegroupremove->queue_group_name, + sizeof (SaNameT)); + + openais_conn_send_response ( + openais_conn_partner_get (req_exec_msg_queuegroupremove->source.conn), + &res_lib_msg_queuegrouptrack, + sizeof (struct res_lib_msg_queuegrouptrack)); + + openais_conn_send_response ( + openais_conn_partner_get (req_exec_msg_queuegroupremove->source.conn), + notification, + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems)); + } } } @@ -1144,22 +1418,153 @@ static void message_handler_req_exec_msg_queuegrouptrack ( void *message, unsigned int nodeid) { -#if 0 struct req_exec_msg_queuegrouptrack *req_exec_msg_queuegrouptrack = (struct req_exec_msg_queuegrouptrack *)message; struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack; -#endif + struct queue_group *queue_group; + SaAisErrorT error = SA_AIS_OK; + + unsigned int change_count = 0; + unsigned int member_count = 0; + + SaMsgQueueGroupNotificationT *notification; + + queue_group = queue_group_find (&req_exec_msg_queuegrouptrack->queue_group_name); + + if (queue_group == 0) { + error = SA_AIS_ERR_NOT_EXIST; + goto error_exit; + } + + member_count = queue_group_member_count (queue_group); + change_count = queue_group_change_count (queue_group); + + if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CURRENT) { + /* DEBUG */ + log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CURRENT\n"); + + notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count); + + if (notification == NULL) { + error = SA_AIS_ERR_NO_MEMORY; + goto error_exit; + } + + memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * member_count); + + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems = + queue_group_track (queue_group, SA_TRACK_CURRENT, (void *)(notification)); + } + + if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CHANGES) { + /* DEBUG */ + log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CHANGES\n"); + queue_group->track_flags = req_exec_msg_queuegrouptrack->track_flags; + } + + if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CHANGES_ONLY) { + /* DEBUG */ + log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CHANGES_ONLY\n"); + queue_group->track_flags = req_exec_msg_queuegrouptrack->track_flags; + } + +error_exit: + if (message_source_is_local(&req_exec_msg_queuegrouptrack->source)) { + res_lib_msg_queuegrouptrack.header.size = + sizeof (struct res_lib_msg_queuegrouptrack); + res_lib_msg_queuegrouptrack.header.id = + MESSAGE_RES_MSG_QUEUEGROUPTRACK; + res_lib_msg_queuegrouptrack.header.error = error; + res_lib_msg_queuegrouptrack.numberOfMembers = member_count; + + memcpy (&res_lib_msg_queuegrouptrack.queueGroupName, + &req_exec_msg_queuegrouptrack->queue_group_name, + sizeof (SaNameT)); + + if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CURRENT) { + if (req_exec_msg_queuegrouptrack->buffer_flag) { + res_lib_msg_queuegrouptrack.header.size += + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems); + + openais_conn_send_response ( + req_exec_msg_queuegrouptrack->source.conn, + &res_lib_msg_queuegrouptrack, + sizeof (struct res_lib_msg_queuegrouptrack)); + + openais_conn_send_response ( + req_exec_msg_queuegrouptrack->source.conn, + notification, + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems)); + } else { + openais_conn_send_response ( + req_exec_msg_queuegrouptrack->source.conn, + &res_lib_msg_queuegrouptrack, + sizeof (struct res_lib_msg_queuegrouptrack)); + + res_lib_msg_queuegrouptrack.header.size += + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems); + + openais_conn_send_response ( + openais_conn_partner_get (req_exec_msg_queuegrouptrack->source.conn), + &res_lib_msg_queuegrouptrack, + sizeof (struct res_lib_msg_queuegrouptrack)); + + openais_conn_send_response ( + openais_conn_partner_get (req_exec_msg_queuegrouptrack->source.conn), + notification, + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems)); + } + } else { + openais_conn_send_response ( + req_exec_msg_queuegrouptrack->source.conn, + &res_lib_msg_queuegrouptrack, + sizeof (struct res_lib_msg_queuegrouptrack)); + } + } } static void message_handler_req_exec_msg_queuegrouptrackstop ( void *message, unsigned int nodeid) { -#if 0 struct req_exec_msg_queuegrouptrackstop *req_exec_msg_queuegrouptrackstop = (struct req_exec_msg_queuegrouptrackstop *)message; struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop; -#endif + struct queue_group *queue_group; + SaAisErrorT error = SA_AIS_OK; + + queue_group = queue_group_find (&req_exec_msg_queuegrouptrackstop->queue_group_name); + + if (queue_group == 0) { + error = SA_AIS_ERR_NOT_EXIST; + goto error_exit; + } + + if ((queue_group->track_flags != SA_TRACK_CHANGES) && + (queue_group->track_flags != SA_TRACK_CHANGES_ONLY)) { + error = SA_AIS_ERR_NOT_EXIST; + goto error_exit; + } + + queue_group->track_flags = 0; + +error_exit: + if (message_source_is_local(&req_exec_msg_queuegrouptrackstop->source)) { + res_lib_msg_queuegrouptrackstop.header.size = + sizeof (struct res_lib_msg_queuegrouptrackstop); + res_lib_msg_queuegrouptrackstop.header.id = + MESSAGE_RES_MSG_QUEUEGROUPTRACKSTOP; + res_lib_msg_queuegrouptrackstop.header.error = error; + + openais_conn_send_response ( + req_exec_msg_queuegrouptrackstop->source.conn, + &res_lib_msg_queuegrouptrackstop, + sizeof (struct res_lib_msg_queuegrouptrackstop)); + } } static void message_handler_req_exec_msg_messagesend ( @@ -1209,9 +1614,6 @@ static void message_handler_req_exec_msg_messagesend ( list_add_tail (&entry->list, &queue->message_list_head); - /* DEBUG */ - print_message_list (queue); - error_exit: if (message_source_is_local(&req_exec_msg_messagesend->source)) { @@ -1536,6 +1938,9 @@ static void message_handler_req_lib_msg_queuegroupcreate ( memcpy (&req_exec_msg_queuegroupcreate.queue_group_name, &req_lib_msg_queuegroupcreate->queueGroupName, sizeof (SaNameT)); + req_exec_msg_queuegroupcreate.policy = + req_lib_msg_queuegroupcreate->queueGroupPolicy; + iovec.iov_base = (char *)&req_exec_msg_queuegroupcreate; iovec.iov_len = sizeof (req_exec_msg_queuegroupcreate); @@ -1656,6 +2061,11 @@ static void message_handler_req_lib_msg_queuegrouptrack ( memcpy (&req_exec_msg_queuegrouptrack.queue_group_name, &req_lib_msg_queuegrouptrack->queueGroupName, sizeof (SaNameT)); + req_exec_msg_queuegrouptrack.track_flags = + req_lib_msg_queuegrouptrack->trackFlags; + req_exec_msg_queuegrouptrack.buffer_flag = + req_lib_msg_queuegrouptrack->bufferFlag; + iovec.iov_base = (char *)&req_exec_msg_queuegrouptrack; iovec.iov_len = sizeof (req_exec_msg_queuegrouptrack); diff --git a/include/ipc_msg.h b/include/ipc_msg.h index 527f739f..de811c61 100644 --- a/include/ipc_msg.h +++ b/include/ipc_msg.h @@ -178,10 +178,14 @@ struct req_lib_msg_queuegrouptrack { mar_req_header_t header; SaNameT queueGroupName; SaUint8T trackFlags; + SaUint8T bufferFlag; }; struct res_lib_msg_queuegrouptrack { mar_res_header_t header; + SaNameT queueGroupName; + SaUint32T numberOfMembers; + SaMsgQueueGroupNotificationBufferT notificationBuffer; }; struct req_lib_msg_queuegrouptrackstop { diff --git a/lib/msg.c b/lib/msg.c index 14ae2000..c04ffb64 100644 --- a/lib/msg.c +++ b/lib/msg.c @@ -301,7 +301,7 @@ saMsgDispatch ( struct res_lib_msg_queueopenasync *res_lib_msg_queueopenasync; struct res_lib_msg_messagesendasync *res_lib_msg_messagesendasync; - + struct res_lib_msg_queuegrouptrack *res_lib_msg_queuegrouptrack; if (dispatchFlags != SA_DISPATCH_ONE && dispatchFlags != SA_DISPATCH_ALL && @@ -354,6 +354,7 @@ saMsgDispatch ( pthread_mutex_unlock(&msgInstance->dispatch_mutex); break; /* exit do while cont is 1 loop */ } else + if (dispatch_avail == 0) { pthread_mutex_unlock(&msgInstance->dispatch_mutex); continue; @@ -366,6 +367,7 @@ saMsgDispatch ( if (error != SA_AIS_OK) { goto error_unlock; } + if (dispatch_data.header.size > sizeof (mar_res_header_t)) { error = saRecvRetry (msgInstance->dispatch_fd, &dispatch_data.data, dispatch_data.header.size - sizeof (mar_res_header_t)); @@ -453,6 +455,26 @@ saMsgDispatch ( break; + case MESSAGE_RES_MSG_QUEUEGROUPTRACK: + + if (callbacks.saMsgQueueGroupTrackCallback == NULL) { + continue; + } + res_lib_msg_queuegrouptrack = + (struct res_lib_msg_queuegrouptrack *) &dispatch_data; + + res_lib_msg_queuegrouptrack->notificationBuffer.notification = + (SaMsgQueueGroupNotificationT *) + (((char *) &dispatch_data) + sizeof (struct res_lib_msg_queuegrouptrack)); + + callbacks.saMsgQueueGroupTrackCallback ( + &res_lib_msg_queuegrouptrack->queueGroupName, + &res_lib_msg_queuegrouptrack->notificationBuffer, + res_lib_msg_queuegrouptrack->numberOfMembers, + res_lib_msg_queuegrouptrack->header.error); + + break; + default: /* TODO */ break; @@ -1117,6 +1139,22 @@ saMsgQueueGroupTrack ( return (SA_AIS_ERR_INVALID_PARAM); } + if ((notificationBuffer != NULL) && + (notificationBuffer->notification != NULL) && + (notificationBuffer->numberOfItems == 0)) { + return (SA_AIS_ERR_INVALID_PARAM); + } + + if ((notificationBuffer != NULL) && + (notificationBuffer->notification == NULL)) { + notificationBuffer->numberOfItems = 0; + } + + if ((trackFlags & SA_TRACK_CHANGES) && + (trackFlags & SA_TRACK_CHANGES_ONLY)) { + return (SA_AIS_ERR_BAD_FLAGS); + } + /* DEBUG */ printf ("[DEBUG]: saMsgQueueGroupTrack { queueGroupName = %s }\n", (char *) queueGroupName->value); @@ -1133,20 +1171,67 @@ saMsgQueueGroupTrack ( MESSAGE_REQ_MSG_QUEUEGROUPTRACK; req_lib_msg_queuegrouptrack.trackFlags = trackFlags; + req_lib_msg_queuegrouptrack.bufferFlag = (notificationBuffer != NULL); + + /* DEBUG */ + printf ("[DEBUG]: saMsgQueueGroupTrack { bufferFlag = %d }\n", + (int)(req_lib_msg_queuegrouptrack.bufferFlag)); memcpy (&req_lib_msg_queuegrouptrack.queueGroupName, queueGroupName, sizeof (SaNameT)); pthread_mutex_lock (&msgInstance->response_mutex); + /* error = saSendReceiveReply (msgInstance->response_fd, &req_lib_msg_queuegrouptrack, sizeof (struct req_lib_msg_queuegrouptrack), &res_lib_msg_queuegrouptrack, sizeof (struct res_lib_msg_queuegrouptrack)); + */ - pthread_mutex_unlock (&msgInstance->response_mutex); + error = saSendRetry (msgInstance->response_fd, &req_lib_msg_queuegrouptrack, + sizeof (struct req_lib_msg_queuegrouptrack)); + if (error != SA_AIS_OK) { + goto error_exit; + } + + error = saRecvRetry (msgInstance->response_fd, &res_lib_msg_queuegrouptrack, + sizeof (struct res_lib_msg_queuegrouptrack)); + if (error != SA_AIS_OK) { + goto error_exit; + } + if ((trackFlags & SA_TRACK_CURRENT) && (notificationBuffer != NULL)) { + if (notificationBuffer->notification != NULL) { + if (notificationBuffer->numberOfItems < res_lib_msg_queuegrouptrack.numberOfMembers) { + error = SA_AIS_ERR_NO_SPACE; + goto error_exit; + } + } else { + notificationBuffer->notification = + malloc (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.numberOfMembers); + + if (notificationBuffer->notification == NULL) { + error = SA_AIS_ERR_NO_MEMORY; + goto error_exit; + } + + memset (notificationBuffer->notification, 0, + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.numberOfMembers)); + } + + error = saRecvRetry (msgInstance->response_fd, + notificationBuffer->notification, + (sizeof (SaMsgQueueGroupNotificationT) * + res_lib_msg_queuegrouptrack.numberOfMembers)); + } + +error_exit: + pthread_mutex_unlock (&msgInstance->response_mutex); +error_put_msg: saHandleInstancePut (&msgHandleDatabase, msgHandle); return (error == SA_AIS_OK ? res_lib_msg_queuegrouptrack.header.error : error); -- 2.39.5