instance->last_msg_seq_num++;
- if (msg_create_node_list(&send_buffer->buffer, 1, instance->last_msg_seq_num,
+ if (msg_create_node_list(&send_buffer->buffer, instance->last_msg_seq_num,
(initial ? TLV_NODE_LIST_TYPE_INITIAL_CONFIG : TLV_NODE_LIST_TYPE_CHANGED_CONFIG),
0, NULL, send_config_version, config_version, 0, TLV_QUORATE_INQUORATE, &nlist) == 0) {
qdevice_net_log(LOG_ERR, "Can't allocate send buffer for config list msg");
send_config_version = qdevice_net_get_cmap_config_version(instance->cmap_handle,
&config_version);
- if (msg_create_node_list(&send_buffer->buffer, 1, instance->last_msg_seq_num,
+ if (msg_create_node_list(&send_buffer->buffer, instance->last_msg_seq_num,
TLV_NODE_LIST_TYPE_MEMBERSHIP,
- 1, ring_id, send_config_version, config_version, 0, TLV_QUORATE_INQUORATE, &nlist) == 0) {
+ 1, ring_id, send_config_version, config_version, 1, quorate, &nlist) == 0) {
qdevice_net_log(LOG_ERR, "Can't allocate send buffer for config list msg");
node_list_free(&nlist);
return (0);
}
+ if (!msg->seq_number_set) {
+ qnetd_log(LOG_ERR, "Received node list message without seq number set. "
+ "Sending error reply.");
+
+ if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
+ TLV_REPLY_ERROR_CODE_DOESNT_CONTAIN_REQUIRED_OPTION) != 0) {
+ return (-1);
+ }
+
+ return (0);
+ }
+
switch (msg->node_list_type) {
case TLV_NODE_LIST_TYPE_INITIAL_CONFIG:
- reply_error_code = qnetd_algorithm_config_node_list_received(client,
- &msg->nodes, 1, &result_vote);
- break;
case TLV_NODE_LIST_TYPE_CHANGED_CONFIG:
+ reply_error_code = qnetd_algorithm_config_node_list_received(client,
+ msg->seq_number, msg->config_version_set, msg->config_version,
+ &msg->nodes,
+ (msg->node_list_type == TLV_NODE_LIST_TYPE_INITIAL_CONFIG),
+ &result_vote);
break;
case TLV_NODE_LIST_TYPE_MEMBERSHIP:
+ if (!msg->ring_id_set) {
+ qnetd_log(LOG_ERR, "Received node list message without ring id number set. "
+ "Sending error reply.");
+
+ if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
+ TLV_REPLY_ERROR_CODE_DOESNT_CONTAIN_REQUIRED_OPTION) != 0) {
+ return (-1);
+ }
+
+ return (0);
+ }
+
+ if (!msg->quorate_set) {
+ qnetd_log(LOG_ERR, "Received node list message without quorate set. "
+ "Sending error reply.");
+
+ if (qnetd_client_send_err(client, msg->seq_number_set, msg->seq_number,
+ TLV_REPLY_ERROR_CODE_DOESNT_CONTAIN_REQUIRED_OPTION) != 0) {
+ return (-1);
+ }
+
+ return (0);
+ }
+
reply_error_code = qnetd_algorithm_membership_node_list_received(client,
- &msg->nodes, &result_vote);
+ msg->seq_number, msg->config_version_set, msg->config_version,
+ &msg->ring_id, msg->quorate, &msg->nodes, &result_vote);
break;
default:
errx(1, "qnetd_client_msg_received_node_list fatal error. "
return (0);
}
+ /*
+ * Store node list for future use
+ */
+ switch (msg->node_list_type) {
+ case TLV_NODE_LIST_TYPE_INITIAL_CONFIG:
+ case TLV_NODE_LIST_TYPE_CHANGED_CONFIG:
+ node_list_free(&client->configuration_node_list);
+ if (node_list_clone(&client->configuration_node_list, &msg->nodes) == -1) {
+ qnetd_log(LOG_ERR, "Can't alloc config node list clone. "
+ "Disconnecting client connection.");
+
+ return (-1);
+ }
+ break;
+ case TLV_NODE_LIST_TYPE_MEMBERSHIP:
+ node_list_free(&client->last_membership_node_list);
+ if (node_list_clone(&client->last_membership_node_list, &msg->nodes) == -1) {
+ qnetd_log(LOG_ERR, "Can't alloc membership node list clone. "
+ "Disconnecting client connection.");
+
+ return (-1);
+ }
+ break;
+ default:
+ errx(1, "qnetd_client_msg_received_node_list fatal error. "
+ "Unhandled node_list_type");
+ break;
+ }
+
send_buffer = send_buffer_list_get_new(&client->send_buffer_list);
if (send_buffer == NULL) {
qnetd_log(LOG_ERR, "Can't alloc node list reply msg from list. "
return (-1);
}
- if (msg_create_node_list_reply(&send_buffer->buffer, msg->seq_number_set, msg->seq_number,
- result_vote) == -1) {
+ if (msg_create_node_list_reply(&send_buffer->buffer, msg->seq_number, result_vote) == -1) {
qnetd_log(LOG_ERR, "Can't alloc node list reply msg. "
"Disconnecting client connection.");
}
static void
-qnetd_client_disconnect(struct qnetd_instance *instance, struct qnetd_client *client)
+qnetd_client_disconnect(struct qnetd_instance *instance, struct qnetd_client *client,
+ int server_going_down)
{
+ qnetd_algorithm_client_disconnect(client, server_going_down);
PR_Close(client->socket);
qnetd_client_list_del(&instance->clients, client);
}
* If client is scheduled for disconnect, disconnect it
*/
if (client_disconnect) {
- qnetd_client_disconnect(instance, client);
+ qnetd_client_disconnect(instance, client, 0);
}
}
}
while (client != NULL) {
client_next = TAILQ_NEXT(client, entries);
- qnetd_client_disconnect(instance, client);
+ qnetd_client_disconnect(instance, client, 1);
client = client_next;
}
}
size_t
-msg_create_node_list(struct dynar *msg, int add_msg_seq_number,
+msg_create_node_list(struct dynar *msg,
uint32_t msg_seq_number, enum tlv_node_list_type node_list_type,
int add_ring_id, const struct tlv_ring_id *ring_id,
int add_config_version, uint64_t config_version,
msg_add_type(msg, MSG_TYPE_NODE_LIST);
msg_add_len(msg);
- if (add_msg_seq_number) {
- if (tlv_add_msg_seq_number(msg, msg_seq_number) == -1) {
- goto small_buf_err;
- }
+ if (tlv_add_msg_seq_number(msg, msg_seq_number) == -1) {
+ goto small_buf_err;
}
if (tlv_add_node_list_type(msg, node_list_type) == -1) {
}
}
+ if (add_quorate) {
+ if (tlv_add_quorate(msg, quorate) == -1) {
+ goto small_buf_err;
+ }
+ }
+
TAILQ_FOREACH(node_info, nodes, entries) {
node_list_entry_to_tlv_node_info(node_info, &tlv_ni);
}
size_t
-msg_create_node_list_reply(struct dynar *msg, int add_msg_seq_number, uint32_t msg_seq_number,
+msg_create_node_list_reply(struct dynar *msg, uint32_t msg_seq_number,
enum tlv_vote vote)
{
msg_add_type(msg, MSG_TYPE_NODE_LIST_REPLY);
msg_add_len(msg);
- if (add_msg_seq_number) {
- if (tlv_add_msg_seq_number(msg, msg_seq_number) == -1) {
- goto small_buf_err;
- }
+ if (tlv_add_msg_seq_number(msg, msg_seq_number) == -1) {
+ goto small_buf_err;
}
if (tlv_add_vote(msg, vote) == -1) {
const struct dynar *echo_request_msg);
extern size_t msg_create_node_list(struct dynar *msg,
- int add_msg_seq_number, uint32_t msg_seq_number, enum tlv_node_list_type node_list_type,
+ uint32_t msg_seq_number, enum tlv_node_list_type node_list_type,
int add_ring_id, const struct tlv_ring_id *ring_id,
int add_config_version, uint64_t config_version,
int add_quorate, enum tlv_quorate quorate,
const struct node_list *nodes);
-extern size_t msg_create_node_list_reply(struct dynar *msg, int add_msg_seq_number,
- uint32_t msg_seq_number, enum tlv_vote vote);
+extern size_t msg_create_node_list_reply(struct dynar *msg, uint32_t msg_seq_number,
+ enum tlv_vote vote);
extern size_t msg_get_header_length(void);
node_info->node_state));
}
+int
+node_list_clone(struct node_list *dst_list, const struct node_list *src_list)
+{
+ struct node_list_entry *node_entry;
+
+ node_list_init(dst_list);
+
+ TAILQ_FOREACH(node_entry, src_list, entries) {
+ if (node_list_add(dst_list, node_entry->node_id, node_entry->data_center_id,
+ node_entry->node_state) == NULL) {
+ node_list_free(dst_list);
+
+ return (-1);
+ }
+ }
+
+ return (0);
+}
+
void
node_list_entry_to_tlv_node_info(const struct node_list_entry *node,
struct tlv_node_info *node_info)
extern struct node_list_entry *node_list_add_from_node_info(
struct node_list *list, const struct tlv_node_info *node_info);
+extern int node_list_clone(struct node_list *dst_list,
+ const struct node_list *src_list);
+
extern void node_list_free(struct node_list *list);
extern void node_list_del(struct node_list *list,
* Called after client sent configuration node list
* All client fields are already set. Nodes is actual node list, initial is used
* for distrinquish between initial node list and changed node list.
+ * msg_seq_num is 32-bit number set by client. If client sent config file version,
+ * config_version_set is set to 1 and config_version contains valid config file version.
*
* Function has to return result_vote. This can be one of ack/nack, ask_later (client
* should ask later for a vote) or wait_for_reply (client should wait for reply).
*/
enum tlv_reply_error_code
qnetd_algo_test_config_node_list_received(struct qnetd_client *client,
+ uint32_t msg_seq_num, int config_version_set, uint64_t config_version,
const struct node_list *nodes, int initial, enum tlv_vote *result_vote)
{
"sent %s node list.", client, client->cluster_name, client->node_id,
(initial ? "initial" : "changed"));
+ qnetd_log(LOG_INFO, "algo-test: msg seq num %"PRIu32, msg_seq_num);
+
+ if (config_version_set) {
+ qnetd_log(LOG_INFO, "algo-test: config version %"PRIu64, config_version);
+ }
+
qnetd_algo_dump_node_list(client, nodes);
*result_vote = TLV_VOTE_ACK;
return (TLV_REPLY_ERROR_CODE_NO_ERROR);
}
+/*
+ * Called after client sent membership node list.
+ * All client fields are already set. Nodes is actual node list.
+ * msg_seq_num is 32-bit number set by client. If client sent config file version,
+ * config_version_set is set to 1 and config_version contains valid config file version.
+ * ring_id and quorate are copied from client votequorum callback.
+ *
+ * Function has to return result_vote. This can be one of ack/nack, ask_later (client
+ * should ask later for a vote) or wait_for_reply (client should wait for reply).
+ *
+ * Return TLV_REPLY_ERROR_CODE_NO_ERROR on success, different TLV_REPLY_ERROR_CODE_*
+ * on failure (error is send back to client)
+ */
+
enum tlv_reply_error_code
qnetd_algo_test_membership_node_list_received(struct qnetd_client *client,
+ uint32_t msg_seq_num, int config_version_set, uint64_t config_version,
+ const struct tlv_ring_id *ring_id, enum tlv_quorate quorate,
const struct node_list *nodes, enum tlv_vote *result_vote)
{
qnetd_log(LOG_INFO, "algo-test: Client %p (cluster %s, node_id %"PRIx32") "
"sent membership node list.", client, client->cluster_name, client->node_id);
+ qnetd_log(LOG_INFO, "algo-test: msg seq num %"PRIu32, msg_seq_num);
+
+ if (config_version_set) {
+ qnetd_log(LOG_INFO, "algo-test: config version = %"PRIu64, config_version);
+ }
+
+ qnetd_log(LOG_INFO, "algo-test: ring id = (%"PRIx32".%"PRIx64")",
+ ring_id->node_id, ring_id->seq);
+ qnetd_log(LOG_INFO, "algo-test: quorate = %u", quorate);
+
qnetd_algo_dump_node_list(client, nodes);
*result_vote = TLV_VOTE_ACK;
return (TLV_REPLY_ERROR_CODE_NO_ERROR);
}
+
+void
+qnetd_algo_test_client_disconnect(struct qnetd_client *client, int server_going_down)
+{
+
+ qnetd_log(LOG_INFO, "algo-test: Client %p (cluster %s, node_id %"PRIx32") "
+ "disconnect", client, client->cluster_name, client->node_id);
+
+ qnetd_log(LOG_INFO, "algo-test: server going down %u", server_going_down);
+
+ free(client->algorithm_data);
+}
extern enum tlv_reply_error_code qnetd_algo_test_client_init(struct qnetd_client *client);
extern enum tlv_reply_error_code qnetd_algo_test_config_node_list_received(
- struct qnetd_client *client, const struct node_list *nodes, int initial,
+ struct qnetd_client *client, uint32_t msg_seq_num, int config_version_set,
+ uint64_t config_version, const struct node_list *nodes, int initial,
enum tlv_vote *result_vote);
extern enum tlv_reply_error_code qnetd_algo_test_membership_node_list_received(
- struct qnetd_client *client, const struct node_list *nodes, enum tlv_vote *result_vote);
+ struct qnetd_client *client, uint32_t msg_seq_num, int config_version_set,
+ uint64_t config_version, const struct tlv_ring_id *ring_id, enum tlv_quorate quorate,
+ const struct node_list *nodes, enum tlv_vote *result_vote);
+
+extern void qnetd_algo_test_client_disconnect(
+ struct qnetd_client *client, int server_going_down);
#ifdef __cplusplus
}
enum tlv_reply_error_code
qnetd_algorithm_config_node_list_received(struct qnetd_client *client,
+ uint32_t msg_seq_num, int config_version_set, uint64_t config_version,
const struct node_list *nodes, int initial, enum tlv_vote *result_vote)
{
switch (client->decision_algorithm) {
case TLV_DECISION_ALGORITHM_TYPE_TEST:
- return (qnetd_algo_test_config_node_list_received(client, nodes, initial,
- result_vote));
+ return (qnetd_algo_test_config_node_list_received(client, msg_seq_num,
+ config_version_set, config_version, nodes, initial, result_vote));
break;
default:
errx(1, "qnetd_algorithm_config_node_list_received unhandled "
enum tlv_reply_error_code
qnetd_algorithm_membership_node_list_received(struct qnetd_client *client,
+ uint32_t msg_seq_num, int config_version_set, uint64_t config_version,
+ const struct tlv_ring_id *ring_id, enum tlv_quorate quorate,
const struct node_list *nodes, enum tlv_vote *result_vote)
{
switch (client->decision_algorithm) {
case TLV_DECISION_ALGORITHM_TYPE_TEST:
- return (qnetd_algo_test_membership_node_list_received(client, nodes,
- result_vote));
+ return (qnetd_algo_test_membership_node_list_received(client, msg_seq_num,
+ config_version_set, config_version, ring_id, quorate, nodes, result_vote));
break;
default:
errx(1, "qnetd_algorithm_membership_node_list_received unhandled "
return (TLV_REPLY_ERROR_CODE_INTERNAL_ERROR);
}
+
+void
+qnetd_algorithm_client_disconnect(struct qnetd_client *client, int server_going_down)
+{
+
+ switch (client->decision_algorithm) {
+ case TLV_DECISION_ALGORITHM_TYPE_TEST:
+ qnetd_algo_test_client_disconnect(client, server_going_down);
+ break;
+ default:
+ errx(1, "qnetd_algorithm_client_disconnect unhandled decision algorithm");
+ break;
+ }
+}
extern enum tlv_reply_error_code qnetd_algorithm_client_init(struct qnetd_client *client);
extern enum tlv_reply_error_code qnetd_algorithm_config_node_list_received(
- struct qnetd_client *client, const struct node_list *nodes, int initial,
+ struct qnetd_client *client, uint32_t msg_seq_num, int config_version_set,
+ uint64_t config_version, const struct node_list *nodes, int initial,
enum tlv_vote *result_vote);
extern enum tlv_reply_error_code qnetd_algorithm_membership_node_list_received(
- struct qnetd_client *client, const struct node_list *nodes, enum tlv_vote *result_vote);
+ struct qnetd_client *client, uint32_t msg_seq_num, int config_version_set,
+ uint64_t config_version, const struct tlv_ring_id *ring_id, enum tlv_quorate quorate,
+ const struct node_list *nodes, enum tlv_vote *result_vote);
+
+extern void qnetd_algorithm_client_disconnect(
+ struct qnetd_client *client, int server_going_down);
#ifdef __cplusplus
}
memcpy(&client->addr, addr, sizeof(*addr));
dynar_init(&client->receive_buffer, max_receive_size);
send_buffer_list_init(&client->send_buffer_list, max_send_buffers, max_send_size);
+ node_list_init(&client->configuration_node_list);
+ node_list_init(&client->last_membership_node_list);
}
void
qnetd_client_destroy(struct qnetd_client *client)
{
- dynar_destroy(&client->receive_buffer);
+ node_list_free(&client->last_membership_node_list);
+ node_list_free(&client->configuration_node_list);
send_buffer_list_free(&client->send_buffer_list);
+ dynar_destroy(&client->receive_buffer);
}
uint32_t heartbeat_interval;
enum tlv_reply_error_code skipping_msg_reason;
void *algorithm_data;
- struct node_list configuration_nodes;
- struct node_list membership_nodes;
+ struct node_list configuration_node_list;
+ struct node_list last_membership_node_list;
TAILQ_ENTRY(qnetd_client) entries;
};