/*
 * Copyright (c) 2003-2005 MontaVista Software, Inc.
 * Copyright (c) 2005 OSDL.
 * Copyright (c) 2006-2012 Red Hat, Inc.
 *
 * Author: Steven Dake (sdake@redhat.com)
 * Author: Mark Haverkamp (markh@osdl.org)
 *
 * This software licensed under BSD license, the text of which follows:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * FRAGMENTATION AND PACKING ALGORITHM:
 *
 * Assemble the entire message into one buffer
 * if full fragment
 *	store fragment into lengths list
 *	for each full fragment
 *		multicast fragment
 *		set length and fragment fields of pg message
 *	store remaining multicast into head of fragmentation data and set lens field
 *
 * If a message exceeds the maximum packet size allowed by the totem
 * single ring protocol, the protocol could lose forward progress.
 * Statically calculating the allowed data amount doesn't work because
 * the amount of data allowed depends on the number of fragments in
 * each message. In this implementation, the maximum fragment size
 * is dynamically calculated for each fragment added to the message.
 *
 * It is possible for a message to be two bytes short of the maximum
 * packet size. This occurs when a message or collection of
 * messages + the mcast header + the lens are two bytes short of the
 * end of the packet. Since another len field consumes two bytes, the
 * len field would consume the rest of the packet without room for data.
 *
 * One optimization would be to forgo the final len field and determine
 * it from the size of the udp datagram. Then this condition would no
 * longer occur.
 */
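/*
 * Illustrative sketch (not part of the original code; names are
 * hypothetical): the per-fragment payload capacity described above shrinks
 * by one two-byte length field for every message already packed, which is
 * why it must be recomputed per fragment rather than calculated statically.
 */
static inline int example_max_fragment_payload (int net_mtu, int header_size,
	int packed_msg_count)
{
	/* one extra length slot is reserved for the message being added */
	return (net_mtu - header_size -
		(int)(sizeof (unsigned short) * (packed_msg_count + 1)));
}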
/*
 * ASSEMBLY AND UNPACKING ALGORITHM:
 *
 * copy incoming packet into assembly data buffer indexed by current
 * location of end of fragment
 *
 * if not fragmented
 *	deliver all messages in assembly data buffer
 * else
 * if msg_count > 1 and fragmented
 *	deliver all messages except last message in assembly data buffer
 *	copy last fragmented section to start of assembly data buffer
 * else
 * if msg_count = 1 and fragmented
 *	do nothing
 */
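/*
 * Illustrative sketch (hypothetical helper, not in the original code): the
 * "copy last fragmented section to start of assembly data buffer" step above
 * is a slide of the unfinished tail to the front of the buffer, after which
 * the end-of-data index restarts at the tail length.
 */
static inline unsigned int example_keep_fragment_tail (unsigned char *buf,
	unsigned int delivered, unsigned int tail_len)
{
	unsigned int i;

	for (i = 0; i < tail_len; i++) {
		buf[i] = buf[delivered + i];	/* slide tail to buffer head */
	}
	return (tail_len);			/* new end-of-data index */
}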
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>

#include <corosync/swab.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>
#include <corosync/totem/totempg.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>

#include "totemsrp.h"
struct totempg_mcast_header {
	short version;
	short type;
};

#if !(defined(__i386__) || defined(__x86_64__))
/*
 * Need align on architectures different from i386 or x86_64
 */
#define TOTEMPG_NEED_ALIGN 1
#endif
/*
 * totempg_mcast structure
 *
 * header:        Identify the mcast.
 * fragmented:    Set if this message continues into next message
 * continuation:  Set if this message is a continuation from last message
 * msg_count:     Indicates how many packed messages are contained
 *                in the mcast.
 * Also, the size of each packed message and the messages themselves are
 * appended to the end of this structure when sent.
 */
struct totempg_mcast {
	struct totempg_mcast_header header;
	unsigned char fragmented;
	unsigned char continuation;
	unsigned short msg_count;
	/*
	 * short msg_len[msg_count];
	 */
	/*
	 * data for messages
	 */
};
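/*
 * Illustrative sketch (hypothetical helper, not in the original code): since
 * the length table sits directly behind the fixed header, it can be
 * addressed like this once the packet start is known to be aligned.
 */
static inline unsigned short *example_msg_lens (struct totempg_mcast *m)
{
	return ((unsigned short *)((char *)m + sizeof (struct totempg_mcast)));
}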
/*
 * Maximum packet size for totem pg messages
 */
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))
/*
 * Local variables used for packing small messages
 */
static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];

static int mcast_packed_msg_count = 0;

static int totempg_reserved = 1;

static unsigned int totempg_size_limit;

static totem_queue_level_changed_fn totem_queue_level_changed = NULL;

static uint32_t totempg_threaded_mode = 0;

static void *totemsrp_context;
/*
 * Function and data used to log messages
 */
static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	int level,
	int subsys,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 6, 7)));

struct totem_config *totempg_totem_config;

static totempg_stats_t totempg_stats;
enum throw_away_mode {
	THROW_AWAY_INACTIVE,
	THROW_AWAY_ACTIVE
};

struct assembly {
	unsigned int nodeid;
	unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE];
	int index;
	unsigned char last_frag_num;
	enum throw_away_mode throw_away_mode;
	struct qb_list_head list;
};
static void assembly_deref (struct assembly *assembly);

static int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data);

QB_LIST_DECLARE(assembly_list_inuse);

/*
 * Free list is used both for transitional and operational assemblies
 */
QB_LIST_DECLARE(assembly_list_free);

QB_LIST_DECLARE(assembly_list_inuse_trans);

QB_LIST_DECLARE(totempg_groups_list);
/*
 * Staging buffer for packed messages. Messages are staged in this buffer
 * before sending. Multiple messages may fit, which cuts down on the
 * number of mcasts sent. If a message doesn't completely fit, then
 * the mcast header has a fragment bit set that says that there is more
 * data to follow. fragment_size is an index into the buffer. It indicates
 * the size of message data and where to place new message data.
 * fragment_continuation indicates whether the first packed message in
 * the buffer is a continuation of a previously packed fragment.
 */
static unsigned char *fragmentation_data;

static int fragment_size = 0;

static int fragment_continuation = 0;

static int totempg_waiting_transack = 0;
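/*
 * Illustrative sketch (hypothetical, simplified, not in the original code):
 * staging a small message is a copy at fragment_size plus a length-table
 * update, which is all the fast path of mcast_msg() below does when a
 * message fits with room to spare.
 */
static inline void example_stage_message (unsigned char *stage, int *stage_len,
	unsigned short *lens, int slot, const unsigned char *msg, int msg_len)
{
	int i;

	for (i = 0; i < msg_len; i++) {
		stage[*stage_len + i] = msg[i];	/* copy body into staging area */
	}
	*stage_len += msg_len;
	lens[slot] += msg_len;	/* accumulates across partial copies */
}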
struct totempg_group_instance {
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;

	int groups_cnt;
	int32_t q_level;

	struct qb_list_head list;
};
static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
#define log_printf(level, format, args...)		\
do {							\
	totempg_log_printf(level,			\
		totempg_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0);
static int msg_count_send_ok (int msg_count);

static int byte_count_send_ok (int byte_count);
static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
{
	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
	totempg_waiting_transack = waiting_trans_ack;
}
static struct assembly *assembly_ref (unsigned int nodeid)
{
	struct assembly *assembly;
	struct qb_list_head *list;
	struct qb_list_head *active_assembly_list_inuse;

	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
	}

	/*
	 * Search inuse list for node id and return assembly buffer if found
	 */
	qb_list_for_each(list, active_assembly_list_inuse) {
		assembly = qb_list_entry (list, struct assembly, list);

		if (nodeid == assembly->nodeid) {
			return (assembly);
		}
	}

	/*
	 * Nothing found in inuse list, get one from free list if available
	 */
	if (qb_list_empty (&assembly_list_free) == 0) {
		assembly = qb_list_first_entry (&assembly_list_free, struct assembly, list);
		qb_list_del (&assembly->list);
		qb_list_add (&assembly->list, active_assembly_list_inuse);
		assembly->nodeid = nodeid;
		assembly->index = 0;
		assembly->last_frag_num = 0;
		assembly->throw_away_mode = THROW_AWAY_INACTIVE;
		return (assembly);
	}

	/*
	 * Nothing available in inuse or free list, so allocate a new one
	 */
	assembly = malloc (sizeof (struct assembly));
	/*
	 * TODO handle memory allocation failure here
	 */
	assert (assembly);
	assembly->nodeid = nodeid;
	assembly->data[0] = 0;
	assembly->index = 0;
	assembly->last_frag_num = 0;
	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
	qb_list_init (&assembly->list);
	qb_list_add (&assembly->list, active_assembly_list_inuse);

	return (assembly);
}
static void assembly_deref (struct assembly *assembly)
{
	qb_list_del (&assembly->list);
	qb_list_add (&assembly->list, &assembly_list_free);
}
static void assembly_deref_from_normal_and_trans (int nodeid)
{
	int j;
	struct qb_list_head *list, *tmp_iter;
	struct qb_list_head *active_assembly_list_inuse;
	struct assembly *assembly;

	for (j = 0; j < 2; j++) {
		if (j == 0) {
			active_assembly_list_inuse = &assembly_list_inuse;
		} else {
			active_assembly_list_inuse = &assembly_list_inuse_trans;
		}

		qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) {
			assembly = qb_list_entry (list, struct assembly, list);

			if (nodeid == assembly->nodeid) {
				qb_list_del (&assembly->list);
				qb_list_add (&assembly->list, &assembly_list_free);
			}
		}
	}
}
static inline void app_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int i;
	struct totempg_group_instance *instance;
	struct qb_list_head *list;

	/*
	 * For every leaving processor, add to free list.
	 * This also has the side effect of clearing out the dataset
	 * in the leaving processor's assembly buffer.
	 */
	for (i = 0; i < left_list_entries; i++) {
		assembly_deref_from_normal_and_trans (left_list[i]);
	}

	qb_list_for_each(list, &totempg_groups_list) {
		instance = qb_list_entry (list, struct totempg_group_instance, list);

		if (instance->confchg_fn) {
			instance->confchg_fn (
				configuration_type,
				member_list, member_list_entries,
				left_list, left_list_entries,
				joined_list, joined_list_entries,
				ring_id);
		}
	}
}
static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *group_len;
	int i;
	char *aligned_msg;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	if ((size_t)msg % sizeof(char *) != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);
	} else {
		aligned_msg = msg;
	}
#else
	aligned_msg = msg;
#endif

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);
	for (i = 1; i < group_len[0] + 1; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
}
static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	if ((size_t)iovec->iov_base % sizeof(char *) != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);

	/*
	 * Calculate amount to adjust the iovec by before delivering to app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
				(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		group_name += group_len[i];
	}
	return (0);
}
static inline void app_deliver_fn (
	unsigned int nodeid,
	void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_group_instance *instance;
	struct iovec stripped_iovec;
	unsigned int adjust_iovec;
	struct iovec *iovec;
	struct qb_list_head *list;

	struct iovec aligned_iovec = { NULL, 0 };

	if (endian_conversion_required) {
		group_endian_convert (msg, msg_len);
	}

	/*
	 * TODO: segmentation/assembly need to be redesigned to provide aligned access
	 * in all cases to avoid memory copies on non386 archs. Probably broke backwards
	 * compatibility.
	 */

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	aligned_iovec.iov_base = alloca(msg_len);
	aligned_iovec.iov_len = msg_len;
	memcpy(aligned_iovec.iov_base, msg, msg_len);
#else
	aligned_iovec.iov_base = msg;
	aligned_iovec.iov_len = msg_len;
#endif

	iovec = &aligned_iovec;

	qb_list_for_each(list, &totempg_groups_list) {
		instance = qb_list_entry (list, struct totempg_group_instance, list);
		if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
			stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
			stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;

#ifdef TOTEMPG_NEED_ALIGN
			/*
			 * Align data structure for architectures other than i386 or x86_64
			 */
			if ((uintptr_t)((char *)iovec->iov_base + adjust_iovec) % (sizeof(char *)) != 0) {
				/*
				 * Deal with misalignment
				 */
				stripped_iovec.iov_base =
					alloca (stripped_iovec.iov_len);
				memcpy (stripped_iovec.iov_base,
					(char *)iovec->iov_base + adjust_iovec,
					stripped_iovec.iov_len);
			}
#endif
			instance->deliver_fn (
				nodeid,
				stripped_iovec.iov_base,
				stripped_iovec.iov_len,
				endian_conversion_required);
		}
	}
}
static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	// TODO optimize this
	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
		ring_id);
}
static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;
	size_t expected_msg_len;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	if (msg_len < sizeof(struct totempg_mcast)) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast) received from node " CS_PRI_NODE_ID
			" is too short... Ignoring.", nodeid);

		return;
	}

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */
	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	if (msg_len < datasize) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast datasize) received from node " CS_PRI_NODE_ID
			" is too short... Ignoring.", nodeid);

		return;
	}

	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	expected_msg_len = datasize;
	for (i = 0; i < mcast->msg_count; i++) {
		if (endian_conversion_required) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}

		expected_msg_len += msg_lens[i];
	}

	if (msg_len != expected_msg_len) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast) received from node " CS_PRI_NODE_ID
			" doesn't have expected length of %zu (has %u) bytes... Ignoring.",
			nodeid, expected_msg_len, msg_len);

		return;
	}

	assert((assembly->index+msg_len) < sizeof(assembly->data));
	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment, then we
	 * can't deliver it. We'll first deliver the full messages
	 * then adjust the assembly buffer so we can add the rest of the
	 * fragment when it arrives.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, that it
	 * matches the sequence number of the previous fragment.
	 * Also, if the first packed message is a continuation
	 * of a previous message, but the assembly buffer
	 * is empty, then we need to discard it since we can't
	 * assemble a complete message. Likewise, if this message isn't a
	 * continuation and the assembly buffer is empty, we have to discard
	 * the continued message.
	 */
	start = 0;

	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/* Throw away the first msg block */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;

			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
				continuation, assembly->last_frag_num);
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference assembly struct
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented, keep around assembly list
		 */
		if (mcast->msg_count > 1) {
			memmove (&assembly->data[0],
				&assembly->data[assembly->index],
				msg_lens[msg_count]);

			assembly->index = 0;
		}
		assembly->index += msg_lens[msg_count];
	}
}
/*
 * Totem Process Group Abstraction
 * depends on poll abstraction, POSIX, IPV4
 */

void *callback_token_received_handle;

int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data)
{
	struct totempg_mcast mcast;
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	if (totemsrp_avail(totemsrp_context) == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	mcast.header.version = 0;
	mcast.header.type = 0;
	mcast.fragmented = 0;

	/*
	 * Was the first message in this buffer a continuation of a
	 * fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	iovecs[0].iov_base = (void *)&mcast;
	iovecs[0].iov_len = sizeof (struct totempg_mcast);
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;
	(void)totemsrp_mcast (totemsrp_context, iovecs, 3, 0);

	mcast_packed_msg_count = 0;
	fragment_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (0);
}
/*
 * Initialize the totem process group abstraction
 */
int totempg_initialize (
	qb_loop_t *poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;
	totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
	totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
	totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
	totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
	totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
	totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
	totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;

	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == 0) {
		return (-1);
	}

	totemsrp_net_mtu_adjust (totem_config);

	res = totemsrp_initialize (
		poll_handle,
		&totemsrp_context,
		totem_config,
		&totempg_stats,
		totempg_deliver_fn,
		totempg_confchg_fn,
		totempg_waiting_trans_ack_cb);
	if (res == -1) {
		goto error_exit;
	}

	totemsrp_callback_token_create (
		totemsrp_context,
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	totempg_size_limit = (totemsrp_avail(totemsrp_context) - 1) *
		(totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16);

	qb_list_init (&totempg_groups_list);

error_exit:
	return (res);
}
void totempg_finalize (void)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	totemsrp_finalize (totemsrp_context);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
}
/*
 * Multicast a message
 */
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,
	int guarantee)
{
	int res = 0;
	struct totempg_mcast mcast;
	struct iovec iovecs[3];
	struct iovec iovec[64];
	int i;
	int dest, src;
	int max_packet_size = 0;
	int copy_len = 0;
	int copy_base = 0;
	int total_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	totemsrp_event_signal (totemsrp_context, TOTEM_EVENT_NEW_MSG, 1);

	/*
	 * Remove zero length iovectors from the list
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}
	iov_len = dest;

	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;

	/*
	 * Check if we would overwrite new message queue
	 */
	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return(-1);
	}

	memset(&mcast, 0, sizeof(mcast));

	mcast.header.version = 0;
	for (i = 0; i < iov_len; ) {
		mcast.fragmented = 0;
		mcast.continuation = fragment_continuation;
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If it all fits with room left over, copy it in.
		 * We need to leave at least sizeof(short) + 1 bytes in the
		 * fragment_buffer on exit so that max_packet_size + fragment_size
		 * doesn't exceed the size of the fragment_buffer on the next call.
		 */
		if ((iovec[i].iov_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			next_fragment = 1;
			copy_len = 0;
			copy_base = 0;
			i++;
			continue;

		/*
		 * If it just fits or is too big, then send out what fits.
		 */
		} else {
			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			if (copy_len == max_packet_size) {
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			} else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			/*
			 * if we're not on the last iovec or the iovec is too large to
			 * fit, then indicate a fragment. This also means that the next
			 * message will have the continuation of this one.
			 */
			if ((i < (iov_len - 1)) ||
				((copy_base + copy_len) < iovec[i].iov_len)) {
				if (!next_fragment) {
					next_fragment++;
				}
				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);
			} else {
				fragment_continuation = 0;
			}

			/*
			 * assemble the message and send it
			 */
			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[0].iov_len = sizeof(struct totempg_mcast);
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof(unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = fragment_size + copy_len;
			assert (totemsrp_avail(totemsrp_context) > 0);
			res = totemsrp_mcast (totemsrp_context, iovecs, 3, guarantee);

			/*
			 * Recalculate counts and indexes for the next.
			 */
			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;
			fragment_size = 0;
			max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));

			/*
			 * If the iovec all fit, go to the next iovec
			 */
			if ((copy_base + copy_len) == iovec[i].iov_len) {
				copy_len = 0;
				copy_base = 0;
				i++;
			} else {
				/*
				 * Continue with the rest of the current iovec.
				 */
				copy_base += copy_len;
			}
		}
	}

	/*
	 * Bump only if we added message data. This may be zero if
	 * the last buffer just fit into the fragmentation_data buffer
	 * and we were at the last iovec.
	 */
	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (res);
}
/*
 * Determine if a message of msg_size could be queued
 */
static int msg_count_send_ok (
	int msg_count)
{
	int avail = 0;

	avail = totemsrp_avail (totemsrp_context);
	totempg_stats.msg_queue_avail = avail;

	return ((avail - totempg_reserved) > msg_count);
}
static int byte_count_send_ok (
	int byte_count)
{
	unsigned int msg_count = 0;
	int avail = 0;

	avail = totemsrp_avail (totemsrp_context);

	msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;

	return (avail >= msg_count);
}
static int send_reserve (
	int msg_size)
{
	unsigned int msg_count = 0;

	msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
	totempg_reserved += msg_count;
	totempg_stats.msg_reserved = totempg_reserved;

	return (msg_count);
}
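/*
 * Illustrative sketch (hypothetical helper, not in the original code): the
 * reservation math above charges one queue slot per usable-MTU chunk plus
 * one, e.g. msg_size = 4000 with roughly 1450 usable bytes per packet
 * reserves 3 slots.
 */
static inline unsigned int example_reserve_slots (unsigned int msg_size,
	unsigned int net_mtu, unsigned int header_size)
{
	/* same shape as send_reserve(): truncating division, then +1 */
	return ((msg_size / (net_mtu - header_size - 16)) + 1);
}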
static void send_release (
	int msg_count)
{
	totempg_reserved -= msg_count;
	totempg_stats.msg_reserved = totempg_reserved;
}
#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */

static uint32_t q_level_precent_used(void)
{
	return (100 - (((totemsrp_avail(totemsrp_context) - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
}
int totempg_callback_token_create (
	void **handle_out,
	enum totem_callback_token_type type,
	int delete,
	int (*callback_fn) (enum totem_callback_token_type type, const void *),
	const void *data)
{
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	res = totemsrp_callback_token_create (totemsrp_context, handle_out, type, delete,
		callback_fn, data);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
	return (res);
}
void totempg_callback_token_destroy (
	void *handle_out)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	totemsrp_callback_token_destroy (totemsrp_context, handle_out);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
}
/*
 * vi: set autoindent tabstop=4 shiftwidth=4 :
 */
int totempg_groups_initialize (
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	instance = malloc (sizeof (struct totempg_group_instance));
	if (instance == NULL) {
		goto error_exit;
	}

	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	instance->groups = 0;
	instance->groups_cnt = 0;
	instance->q_level = QB_LOOP_MED;
	qb_list_init (&instance->list);
	qb_list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;
	return (0);

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (-1);
}
int totempg_groups_join (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	struct totempg_group *new_groups;
	int res = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	new_groups = realloc (instance->groups,
		sizeof (struct totempg_group) *
		(instance->groups_cnt + group_cnt));
	if (new_groups == 0) {
		res = -1;
		goto error_exit;
	}
	memcpy (&new_groups[instance->groups_cnt],
		groups, group_cnt * sizeof (struct totempg_group));
	instance->groups = new_groups;
	instance->groups_cnt += group_cnt;

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}
int totempg_groups_leave (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (0);
}
#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32

int totempg_groups_mcast_joined (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len,
	int guarantee)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = instance->groups_cnt;
	for (i = 0; i < instance->groups_cnt; i++) {
		group_len[i + 1] = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
	}
	iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

	return (res);
}
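/*
 * Illustrative usage sketch (hypothetical group name and payload, not part
 * of the original code): a client joins a single group on an instance
 * obtained from totempg_groups_initialize() and multicasts one iovec to it.
 * Error handling is omitted for brevity.
 */
static inline void example_groups_usage (void *groups_instance)
{
	struct totempg_group group = { .group = "pg_demo", .group_len = 7 };
	struct iovec payload = { .iov_base = (void *)"hello", .iov_len = 5 };

	(void)totempg_groups_join (groups_instance, &group, 1);
	(void)totempg_groups_mcast_joined (groups_instance, &payload, 1, 0);
}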
static void check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_precent_used();

	if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
		instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
	} else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
		instance->q_level = TOTEM_Q_LEVEL_LOW;
	} else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
		instance->q_level = TOTEM_Q_LEVEL_GOOD;
	} else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
		instance->q_level = TOTEM_Q_LEVEL_HIGH;
	}

	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
}
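/*
 * Illustrative sketch (hypothetical helper, not in the original code): the
 * thresholds above deliberately leave gaps (30-40, 50-60 and 70-75 percent)
 * in which no transition fires, so the reported level does not flap while
 * queue utilisation hovers near a boundary.
 */
static inline int example_q_level_band (int32_t percent_used)
{
	if (percent_used >= 75) {
		return (TOTEM_Q_LEVEL_CRITICAL);
	}
	if (percent_used > 60 && percent_used < 70) {
		return (TOTEM_Q_LEVEL_HIGH);
	}
	if (percent_used > 40 && percent_used < 50) {
		return (TOTEM_Q_LEVEL_GOOD);
	}
	if (percent_used < 30) {
		return (TOTEM_Q_LEVEL_LOW);
	}
	return (-1);	/* dead zone: caller keeps its previous level */
}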
void totempg_check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;

	check_q_level(instance);
}
int totempg_groups_joined_reserve (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned int size = 0;
	unsigned int i;
	unsigned int reserved = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	for (i = 0; i < instance->groups_cnt; i++) {
		size += instance->groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	if (size >= totempg_size_limit) {
		reserved = -1;
		goto error_exit;
	}

	if (byte_count_send_ok (size)) {
		reserved = send_reserve (size);
	} else {
		reserved = 0;
	}

error_exit:
	check_q_level(instance);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (reserved);
}
int totempg_groups_joined_release (int msg_count)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	send_release (msg_count);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (0);
}
int totempg_groups_mcast_groups (
	void *totempg_groups_instance,
	int guarantee,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = groups_cnt;
	for (i = 0; i < groups_cnt; i++) {
		group_len[i + 1] = groups[i].group_len;
		iovec_mcast[i + 1].iov_len = groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
	}
	iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

	return (res);
}
/*
 * Returns -1 if error, 0 if can't send, 1 if can send the message
 */
int totempg_groups_send_ok_groups (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned int size = 0;
	unsigned int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	for (i = 0; i < groups_cnt; i++) {
		size += groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	res = msg_count_send_ok (size);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}
int totempg_iface_set (
	struct totem_ip_address *interface_addr,
	unsigned short ip_port,
	unsigned int iface_no)
{
	int res;

	res = totemsrp_iface_set (
		totemsrp_context,
		interface_addr,
		ip_port,
		iface_no);

	return (res);
}
int totempg_nodestatus_get (unsigned int nodeid,
	struct totem_node_status *node_status)
{
	memset(node_status, 0, sizeof(struct totem_node_status));
	return totemsrp_nodestatus_get (totemsrp_context, nodeid, node_status);
}
int totempg_ifaces_get (
	unsigned int nodeid,
	unsigned int *interface_id,
	struct totem_ip_address *interfaces,
	unsigned int interfaces_size,
	char ***status,
	unsigned int *iface_count)
{
	int res;

	res = totemsrp_ifaces_get (
		totemsrp_context,
		nodeid,
		interface_id,
		interfaces,
		interfaces_size,
		status,
		iface_count);

	return (res);
}
void totempg_event_signal (enum totem_event_type type, int value)
{
	totemsrp_event_signal (totemsrp_context, type, value);
}
void* totempg_get_stats (void)
{
	return &totempg_stats;
}
int totempg_crypto_set (
	const char *cipher_type,
	const char *hash_type)
{
	int res;

	res = totemsrp_crypto_set (totemsrp_context, cipher_type, hash_type);

	return (res);
}
#define ONE_IFACE_LEN 63
const char *totempg_ifaces_print (unsigned int nodeid)
{
	static char iface_string[256 * INTERFACE_MAX];
	char one_iface[ONE_IFACE_LEN+1];
	struct totem_ip_address interfaces[INTERFACE_MAX];
	unsigned int iface_count;
	unsigned int iface_ids[INTERFACE_MAX];
	unsigned int i;
	int res;

	iface_string[0] = '\0';

	res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
	if (res == -1) {
		return ("no interface found for nodeid");
	}
	res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);

	for (i = 0; i < iface_count; i++) {
		if (!interfaces[i].family) {
			continue;
		}
		snprintf (one_iface, ONE_IFACE_LEN,
			"r(%d) ip(%s) ", i, totemip_print (&interfaces[i]));
		strcat (iface_string, one_iface);
	}
	return (iface_string);
}
unsigned int totempg_my_nodeid_get (void)
{
	return (totemsrp_my_nodeid_get(totemsrp_context));
}

int totempg_my_family_get (void)
{
	return (totemsrp_my_family_get(totemsrp_context));
}
extern void totempg_service_ready_register (
	void (*totem_service_ready) (void))
{
	totemsrp_service_ready_register (totemsrp_context, totem_service_ready);
}

void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
{
	totem_queue_level_changed = fn;
}
extern int totempg_member_add (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemsrp_member_add (totemsrp_context, member, ring_no);
}

extern int totempg_member_remove (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemsrp_member_remove (totemsrp_context, member, ring_no);
}

extern int totempg_reconfigure (void)
{
	return totemsrp_reconfigure (totemsrp_context, totempg_totem_config);
}

extern int totempg_crypto_reconfigure_phase (cfg_message_crypto_reconfig_phase_t phase)
{
	return totemsrp_crypto_reconfigure_phase (totemsrp_context, totempg_totem_config, phase);
}
extern void totempg_stats_clear (int flags)
{
	if (flags & TOTEMPG_STATS_CLEAR_TOTEM) {
		totempg_stats.msg_reserved = 0;
		totempg_stats.msg_queue_avail = 0;
	}
	return totemsrp_stats_clear (totemsrp_context, flags);
}
void totempg_threaded_mode_enable (void)
{
	totempg_threaded_mode = 1;
	totemsrp_threaded_mode_enable (totemsrp_context);
}

void totempg_trans_ack (void)
{
	totemsrp_trans_ack (totemsrp_context);
}

void totempg_force_gather (void)
{
	totemsrp_force_gather(totemsrp_context);
}
/* Assumes ->orig_interfaces is already allocated */
void totempg_get_config(struct totem_config *config)
{
	struct totem_interface *temp_if = config->orig_interfaces;

	memcpy(config, totempg_totem_config, sizeof(struct totem_config));
	config->orig_interfaces = temp_if;
	memcpy(config->orig_interfaces, totempg_totem_config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
	config->interfaces = NULL;
}

void totempg_put_config(struct totem_config *config)
{
	struct totem_interface *temp_if = totempg_totem_config->interfaces;

	/* Preserve the existing interfaces[] array as transports might have pointers saved */
	memcpy(totempg_totem_config->interfaces, config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
	memcpy(totempg_totem_config, config, sizeof(struct totem_config));
	totempg_totem_config->interfaces = temp_if;
}