#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
#define MAXIOVS 5
#define RETRANSMIT_ENTRIES_MAX 30
-#define TOKEN_SIZE_MAX 64000 /* bytes */
/*
* Rollover handling:
static void memb_ring_id_create_or_load (struct totemsrp_instance *, struct memb_ring_id *);
static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
-static void memb_state_gather_enter (struct totemsrp_instance *instance, unsigned int gather_from);
+static void memb_state_gather_enter (struct totemsrp_instance *instance);
static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
int fcc_mcasts_allowed);
instance->my_commit_token_seq = SEQNO_START_TOKEN - 1;
- instance->orf_token_retransmit = malloc (TOKEN_SIZE_MAX);
+ instance->orf_token_retransmit = malloc (15000);
instance->memb_state = MEMB_STATE_OPERATIONAL;
memb_set_merge (no_consensus_list, no_consensus_list_entries,
instance->my_failed_list, &instance->my_failed_list_entries);
- memb_state_gather_enter (instance, 0);
+ memb_state_gather_enter (instance);
}
}
{
struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
+ log_printf (instance->totemsrp_log_level_notice,
+ "The token was lost in state %d from timer %p\n", instance->memb_state, data);
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
- log_printf (instance->totemsrp_log_level_notice,
- "The token was lost in the OPERATIONAL state.\n");
totemrrp_iface_check (instance->totemrrp_handle);
- memb_state_gather_enter (instance, 1);
+ memb_state_gather_enter (instance);
break;
case MEMB_STATE_GATHER:
- log_printf (instance->totemsrp_log_level_notice,
- "The consensus timeout expired.\n");
memb_state_consensus_timeout_expired (instance);
- memb_state_gather_enter (instance, 2);
+ memb_state_gather_enter (instance);
break;
case MEMB_STATE_COMMIT:
- log_printf (instance->totemsrp_log_level_notice,
- "The token was lost in the COMMIT state.\n");
- memb_state_gather_enter (instance, 3);
+ memb_state_gather_enter (instance);
break;
case MEMB_STATE_RECOVERY:
- log_printf (instance->totemsrp_log_level_notice,
- "The token was lost in the RECOVERY state.\n");
ring_state_restore (instance);
- memb_state_gather_enter (instance, 4);
+ memb_state_gather_enter (instance);
break;
}
}
return;
}
-static void memb_state_gather_enter (
- struct totemsrp_instance *instance,
- unsigned int gather_from)
+static void memb_state_gather_enter (struct totemsrp_instance *instance)
{
instance->my_commit_token_seq = SEQNO_START_TOKEN - 1;
memb_consensus_set (instance, &instance->my_id);
log_printf (instance->totemsrp_log_level_notice,
- "entering GATHER state %d.\n", gather_from);
+ "entering GATHER state.\n");
instance->memb_state = MEMB_STATE_GATHER;
case MEMB_STATE_GATHER:
break;
case MEMB_STATE_COMMIT:
+ break;
case MEMB_STATE_OPERATIONAL:
case MEMB_STATE_RECOVERY:
token_retransmit (instance);
iovec.iov_len = sizeof (struct memb_commit_token) +
((sizeof (struct srp_addr) +
sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
- /*
- * Make a copy for retransmission if necessary
- */
- memcpy (instance->orf_token_retransmit, commit_token, iovec.iov_len);
- instance->orf_token_retransmit_size = iovec.iov_len;
for (i = 0; i < instance->totem_config->interface_count; i++) {
totemrrp_token_target_set (
&iovec,
1);
- /*
- * Request retransmission of the commit token in case it is lost
- */
- reset_token_retransmit_timeout (instance);
return (0);
}
* Message Handlers
*/
-struct timeval tv_old;
/*
* message handler called when TOKEN message type received
*/
timersub (&tv_current, &tv_old, &tv_diff);
memcpy (&tv_old, &tv_current, sizeof (struct timeval));
- log_printf (instance->totemsrp_log_level_notice,
- "Time since last token %0.4f ms\n",
- (((float)tv_diff.tv_sec) * 1000) + ((float)tv_diff.tv_usec)
- / 1000.0);
+ if ((((float)tv_diff.tv_usec) / 100.0) > 5.0) {
+ printf ("OTHERS %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
+ }
#endif
#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
ring_state_restore (instance);
printf ("gather 1");
- memb_state_gather_enter (instance, 5);
+ memb_state_gather_enter (instance);
} else {
instance->my_token_seq = token->token_seq;
token->token_seq += 1;
gettimeofday (&tv_current, NULL);
timersub (&tv_current, &tv_old, &tv_diff);
memcpy (&tv_old, &tv_current, sizeof (struct timeval));
- log_printf (instance->totemsrp_log_level_notice,
- "I held %0.4f ms\n",
- ((float)tv_diff.tv_usec) / 1000.0);
+ if ((((float)tv_diff.tv_usec) / 100.0) > 5.0) {
+ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
+ }
#endif
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
messages_deliver_to_app (instance, 0,
1,
instance->my_deliver_memb_list,
instance->my_deliver_memb_entries) == 0) {
-
- instance->my_high_delivered = my_high_delivered_stored + i;
+ instance->my_high_delivered = my_high_delivered_stored + i;
continue;
}
&mcast_header.system_from, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
printf ("gather 2");
- memb_state_gather_enter (instance, 6);
+ memb_state_gather_enter (instance);
break;
case MEMB_STATE_GATHER:
memb_set_merge (&mcast_header.system_from, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
- memb_state_gather_enter (instance, 7);
+ memb_state_gather_enter (instance);
return (0);
}
break;
case MEMB_STATE_OPERATIONAL:
memb_set_merge (&memb_merge_detect->system_from, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
- memb_state_gather_enter (instance, 8);
+printf ("gather 3");
+ memb_state_gather_enter (instance);
break;
case MEMB_STATE_GATHER:
memb_set_merge (&memb_merge_detect->system_from, 1,
instance->my_proc_list, &instance->my_proc_list_entries);
- memb_state_gather_enter (instance, 9);
+printf ("gather 4");
+ memb_state_gather_enter (instance);
return (0);
}
break;
struct totemsrp_instance *instance,
struct memb_join *memb_join)
{
- unsigned char *commit_token_storage[TOKEN_SIZE_MAX];
+ unsigned char *commit_token_storage[32000];
struct memb_commit_token *my_commit_token =
(struct memb_commit_token *)commit_token_storage;
struct srp_addr *proc_list;
memb_join->failed_list_entries,
instance->my_failed_list, &instance->my_failed_list_entries);
}
- memb_state_gather_enter (instance, 10);
+ memb_state_gather_enter (instance);
return (1); /* gather entered */
}
return (0); /* gather not entered */
gather_entered = memb_join_process (instance,
memb_join);
if (gather_entered == 0) {
- memb_state_gather_enter (instance, 11);
+ memb_state_gather_enter (instance);
}
break;
memb_join->ring_seq >= instance->my_ring_id.seq) {
memb_join_process (instance, memb_join);
- memb_state_gather_enter (instance, 12);
+ memb_state_gather_enter (instance);
}
break;
ring_state_restore (instance);
memb_join_process (instance, memb_join);
- memb_state_gather_enter (instance, 13);
+ memb_state_gather_enter (instance);
}
break;
}
struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
- log_printf (instance->totemsrp_log_level_debug,
- "got commit token\n");
-
if (endian_conversion_needed) {
memb_commit_token = memb_commit_token_convert;
memb_commit_token_endian_convert (msg, memb_commit_token);
}
if (instance->iface_changes >= instance->totem_config->interface_count) {
- memb_state_gather_enter (instance, 14);
+ memb_state_gather_enter (instance);
}
}