/*
 * Copyright (c) 2003-2005 MontaVista Software, Inc.
 * Copyright (c) 2005 OSDL.
 * Copyright (c) 2006-2012 Red Hat, Inc.
 *
 * All rights reserved.
 *
 * Author: Steven Dake (sdake@redhat.com)
 * Author: Mark Haverkamp (markh@osdl.org)
 *
 * This software licensed under BSD license, the text of which follows:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * FRAGMENTATION AND PACKING ALGORITHM:
 *
 * Assemble the entire message into one buffer
 * if full fragment
 *	store fragment into lengths list
 *	for each full fragment
 *		multicast fragment
 *		set length and fragment fields of pg message
 * store remaining multicast into head of fragmentation data and set lens field
 *
 * If a message exceeds the maximum packet size allowed by the totem
 * single ring protocol, the protocol could lose forward progress.
 * Statically calculating the allowed data amount doesn't work because
 * the amount of data allowed depends on the number of fragments in
 * each message.  In this implementation, the maximum fragment size
 * is dynamically calculated for each fragment added to the message.
 *
 * It is possible for a message to be two bytes short of the maximum
 * packet size.  This occurs when a message or collection of
 * messages + the mcast header + the lens are two bytes short of the
 * end of the packet.  Since another len field consumes two bytes, the
 * len field would consume the rest of the packet without room for data.
 *
 * One optimization would be to forgo the final len field and determine
 * it from the size of the udp datagram.  Then this condition would no
 * longer occur.
 */
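
/*
 * Worked example (illustrative only; the exact numbers depend on the
 * configured net_mtu and on struct padding): with a 1500 byte net_mtu,
 * TOTEMPG_PACKET_SIZE is 1500 - sizeof(struct totempg_mcast).  Each
 * packed message also consumes one 2-byte entry in the lens array, so
 * packing three 400-byte messages costs 3 * (400 + 2) = 1206 bytes of
 * that budget, and a fourth message only fits if it is smaller than
 * the remaining space minus another 2-byte len entry.
 */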

/*
 * ASSEMBLY AND UNPACKING ALGORITHM:
 *
 * copy incoming packet into assembly data buffer indexed by current
 * location of end of fragment
 *
 * if not fragmented
 *	deliver all messages in assembly data buffer
 * else
 *	if msg_count > 1 and fragmented
 *		deliver all messages except last message in assembly data buffer
 *		copy last fragmented section to start of assembly data buffer
 *	else
 *		if msg_count = 1 and fragmented
 *			do nothing
 *
 */
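
/*
 * Example trace (hypothetical sizes): suppose a 3000 byte application
 * message is fragmented across two packets.  Packet 1 arrives with
 * fragmented set and msg_count = 1, so nothing is delivered and the
 * data stays at the head of the assembly buffer.  Packet 2 arrives
 * with continuation matching the previous fragment number and
 * fragmented clear; the message is now complete and is delivered
 * from the assembly buffer.
 */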

#include <config.h>

#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/uio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <errno.h>
#include <limits.h>

#include <corosync/swab.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>
#include <corosync/totem/totempg.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>

#include "util.h"
#include "totemsrp.h"

struct totempg_mcast_header {
	short version;
	short type;
};

#if !(defined(__i386__) || defined(__x86_64__))
/*
 * Alignment is needed on architectures other than i386 and x86_64
 */
#define TOTEMPG_NEED_ALIGN 1
#endif

/*
 * totempg_mcast structure
 *
 * header:        Identifies the mcast.
 * fragmented:    Set if this message continues into the next message.
 * continuation:  Set if this message is a continuation from the last message.
 * msg_count:     Indicates how many packed messages are contained
 *                in the mcast.
 * Also, the size of each packed message and the messages themselves are
 * appended to the end of this structure when sent.
 */
struct totempg_mcast {
	struct totempg_mcast_header header;
	unsigned char fragmented;
	unsigned char continuation;
	unsigned short msg_count;
	/*
	 * short msg_len[msg_count];
	 */
	/*
	 * data for messages
	 */
};
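
/*
 * Resulting on-wire layout of one packed mcast (illustrative, not a
 * compilable declaration; the lens array and data sizes are only known
 * at runtime):
 *
 *	struct totempg_mcast	header, fragmented, continuation, msg_count
 *	unsigned short		msg_len[msg_count]
 *	unsigned char		data[]  (msg_count messages, back to back)
 */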

/*
 * Maximum packet size for totem pg messages
 */
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))

/*
 * Local variables used for packing small messages
 */
static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];

static int mcast_packed_msg_count = 0;

static int totempg_reserved = 1;

static unsigned int totempg_size_limit;

static totem_queue_level_changed_fn totem_queue_level_changed = NULL;

static uint32_t totempg_threaded_mode = 0;

static void *totemsrp_context;

/*
 * Function and data used to log messages
 */
static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	int level,
	int subsys,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 6, 7)));

struct totem_config *totempg_totem_config;

static totempg_stats_t totempg_stats;

enum throw_away_mode {
	THROW_AWAY_INACTIVE,
	THROW_AWAY_ACTIVE
};

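/*
 * Per-node reassembly state (descriptive summary): data accumulates
 * packed message bytes for this nodeid, index is the current end of
 * valid data, last_frag_num remembers the fragment sequence number of
 * the last packet seen, and throw_away_mode marks whether the
 * remainder of a partially received message is being discarded.
 */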
struct assembly {
	unsigned int nodeid;
	unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE];
	int index;
	unsigned char last_frag_num;
	enum throw_away_mode throw_away_mode;
	struct qb_list_head list;
};

static void assembly_deref (struct assembly *assembly);

static int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data);

QB_LIST_DECLARE(assembly_list_inuse);

/*
 * Free list is used both for transitional and operational assemblies
 */
QB_LIST_DECLARE(assembly_list_free);

QB_LIST_DECLARE(assembly_list_inuse_trans);

QB_LIST_DECLARE(totempg_groups_list);

/*
 * Staging buffer for packed messages.  Messages are staged in this buffer
 * before sending.  Multiple messages may fit, which cuts down on the
 * number of mcasts sent.  If a message doesn't completely fit, then
 * the mcast header has a fragment bit set that says that there is more
 * data to follow.  fragment_size is an index into the buffer.  It indicates
 * the size of message data and where to place new message data.
 * fragment_continuation indicates whether the first packed message in
 * the buffer is a continuation of a previously packed fragment.
 */
static unsigned char *fragmentation_data;

static int fragment_size = 0;

static int fragment_continuation = 0;

static int totempg_waiting_transack = 0;

struct totempg_group_instance {
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;

	int groups_cnt;
	int32_t q_level;

	struct qb_list_head list;
};

static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;

#define log_printf(level, format, args...)		\
do {							\
	totempg_log_printf(level,			\
		totempg_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)

static int msg_count_send_ok (int msg_count);

static int byte_count_send_ok (int byte_count);

static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
{
	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
	totempg_waiting_transack = waiting_trans_ack;
}

static struct assembly *assembly_ref (unsigned int nodeid)
{
	struct assembly *assembly;
	struct qb_list_head *list;
	struct qb_list_head *active_assembly_list_inuse;

	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
	}

	/*
	 * Search inuse list for node id and return assembly buffer if found
	 */
	qb_list_for_each(list, active_assembly_list_inuse) {
		assembly = qb_list_entry (list, struct assembly, list);

		if (nodeid == assembly->nodeid) {
			return (assembly);
		}
	}

	/*
	 * Nothing found in inuse list, get one from free list if available
	 */
	if (qb_list_empty (&assembly_list_free) == 0) {
		assembly = qb_list_first_entry (&assembly_list_free, struct assembly, list);
		qb_list_del (&assembly->list);
		qb_list_add (&assembly->list, active_assembly_list_inuse);
		assembly->nodeid = nodeid;
		assembly->index = 0;
		assembly->last_frag_num = 0;
		assembly->throw_away_mode = THROW_AWAY_INACTIVE;
		return (assembly);
	}

	/*
	 * Nothing available in inuse or free list, so allocate a new one
	 */
	assembly = malloc (sizeof (struct assembly));
	/*
	 * TODO handle memory allocation failure here
	 */
	assert (assembly);
	assembly->nodeid = nodeid;
	assembly->data[0] = 0;
	assembly->index = 0;
	assembly->last_frag_num = 0;
	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
	qb_list_init (&assembly->list);
	qb_list_add (&assembly->list, active_assembly_list_inuse);

	return (assembly);
}

static void assembly_deref (struct assembly *assembly)
{
	qb_list_del (&assembly->list);
	qb_list_add (&assembly->list, &assembly_list_free);
}

static void assembly_deref_from_normal_and_trans (int nodeid)
{
	int j;
	struct qb_list_head *list, *tmp_iter;
	struct qb_list_head *active_assembly_list_inuse;
	struct assembly *assembly;

	for (j = 0; j < 2; j++) {
		if (j == 0) {
			active_assembly_list_inuse = &assembly_list_inuse;
		} else {
			active_assembly_list_inuse = &assembly_list_inuse_trans;
		}

		qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) {
			assembly = qb_list_entry (list, struct assembly, list);

			if (nodeid == assembly->nodeid) {
				qb_list_del (&assembly->list);
				qb_list_add (&assembly->list, &assembly_list_free);
			}
		}
	}

}

static inline void app_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int i;
	struct totempg_group_instance *instance;
	struct qb_list_head *list;

	/*
	 * For every leaving processor, add to free list.
	 * This also has the side effect of clearing out the dataset
	 * in the leaving processor's assembly buffer.
	 */
	for (i = 0; i < left_list_entries; i++) {
		assembly_deref_from_normal_and_trans (left_list[i]);
	}

	qb_list_for_each(list, &totempg_groups_list) {
		instance = qb_list_entry (list, struct totempg_group_instance, list);

		if (instance->confchg_fn) {
			instance->confchg_fn (
				configuration_type,
				member_list,
				member_list_entries,
				left_list,
				left_list_entries,
				joined_list,
				joined_list_entries,
				ring_id);
		}
	}
}

static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *group_len;
	int i;
	char *aligned_msg;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 and x86_64
	 */
	if ((size_t)msg % sizeof(char *) != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);
	} else {
		aligned_msg = msg;
	}
#else
	aligned_msg = msg;
#endif

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);
	for (i = 1; i < group_len[0] + 1; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
}

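/*
 * Each multicast carries a group-name prefix ahead of the payload
 * (built in totempg_groups_mcast_joined() below):
 *
 *	unsigned short group_len[0]       number of group names
 *	unsigned short group_len[1..n]    length of each name
 *	char           names[]            the names, concatenated
 *
 * group_matches() scans this prefix and computes adjust_iovec, the
 * offset at which the application payload begins.
 */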
static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 and x86_64
	 */
	if ((size_t)iovec->iov_base % sizeof(char *) != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);

	/*
	 * Calculate amount to adjust the iovec by before delivering to app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
				(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		group_name += group_len[i];
	}
	return (0);
}

static inline void app_deliver_fn (
	unsigned int nodeid,
	void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_group_instance *instance;
	struct iovec stripped_iovec;
	unsigned int adjust_iovec;
	struct iovec *iovec;
	struct qb_list_head *list;

	struct iovec aligned_iovec = { NULL, 0 };

	if (endian_conversion_required) {
		group_endian_convert (msg, msg_len);
	}

	/*
	 * TODO: segmentation/assembly needs to be redesigned to provide
	 * aligned access in all cases, to avoid memory copies on non-i386
	 * archs.  This probably broke backwards compatibility.
	 */

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 and x86_64
	 */
	aligned_iovec.iov_base = alloca(msg_len);
	aligned_iovec.iov_len = msg_len;
	memcpy(aligned_iovec.iov_base, msg, msg_len);
#else
	aligned_iovec.iov_base = msg;
	aligned_iovec.iov_len = msg_len;
#endif

	iovec = &aligned_iovec;

	qb_list_for_each(list, &totempg_groups_list) {
		instance = qb_list_entry (list, struct totempg_group_instance, list);
		if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
			stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
			stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;

#ifdef TOTEMPG_NEED_ALIGN
			/*
			 * Align data structure for architectures other than i386 and x86_64
			 */
			if ((uintptr_t)((char *)iovec->iov_base + adjust_iovec) % (sizeof(char *)) != 0) {
				/*
				 * Deal with misalignment
				 */
				stripped_iovec.iov_base =
					alloca (stripped_iovec.iov_len);
				memcpy (stripped_iovec.iov_base,
					(char *)iovec->iov_base + adjust_iovec,
					stripped_iovec.iov_len);
			}
#endif
			instance->deliver_fn (
				nodeid,
				stripped_iovec.iov_base,
				stripped_iovec.iov_len,
				endian_conversion_required);
		}
	}
}

static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	// TODO optimize this
	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
		ring_id);
}

static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;
	size_t expected_msg_len;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	if (msg_len < sizeof(struct totempg_mcast)) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast) received from node " CS_PRI_NODE_ID " is too short... Ignoring.", nodeid);

		return ;
	}

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */

	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	if (msg_len < datasize) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast datasize) received from node " CS_PRI_NODE_ID
			" is too short... Ignoring.", nodeid);

		return ;
	}

	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	expected_msg_len = datasize;
	for (i = 0; i < mcast->msg_count; i++) {
		if (endian_conversion_required) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}

		expected_msg_len += msg_lens[i];
	}

	if (msg_len != expected_msg_len) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast) received from node " CS_PRI_NODE_ID
			" doesn't have expected length of %zu (has %u) bytes... Ignoring.",
			nodeid, expected_msg_len, msg_len);

		return ;
	}

	assert((assembly->index+msg_len) < sizeof(assembly->data));
	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment, then we
	 * can't deliver it.  We'll first deliver the full messages,
	 * then adjust the assembly buffer so we can add the rest of the
	 * fragment when it arrives.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, it
	 * matches the sequence number of the previous fragment.
	 * Also, if the first packed message is a continuation
	 * of a previous message, but the assembly buffer
	 * is empty, then we need to discard it since we can't
	 * assemble a complete message.  Likewise, if this message isn't a
	 * continuation and the assembly buffer is empty, we have to discard
	 * the continued message.
	 */
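	/*
	 * Fragment numbers are assigned from next_fragment on the sender
	 * side; e.g. a packet sent with mcast->fragmented == 3 must be
	 * followed by a packet whose continuation is also 3, which is
	 * what the comparison against assembly->last_frag_num below
	 * verifies.
	 */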
	start = 0;

	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/* Throw away the first msg block */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;

			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
				continuation, assembly->last_frag_num);
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference assembly struct
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented, keep around assembly list
		 */
		if (mcast->msg_count > 1) {
			memmove (&assembly->data[0],
				&assembly->data[assembly->index],
				msg_lens[msg_count]);

			assembly->index = 0;
		}
		assembly->index += msg_lens[msg_count];
	}
}

/*
 * Totem Process Group Abstraction
 * depends on poll abstraction, POSIX, IPV4
 */

void *callback_token_received_handle;

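/*
 * Runs each time the token is received: flushes any messages staged in
 * fragmentation_data by mcast_msg() as a single packed mcast, provided
 * the srp queue has room.
 */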
int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data)
{
	struct totempg_mcast mcast;
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	if (totemsrp_avail(totemsrp_context) == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	mcast.header.version = 0;
	mcast.header.type = 0;
	mcast.fragmented = 0;

	/*
	 * Was the first message in this buffer a continuation of a
	 * fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	iovecs[0].iov_base = (void *)&mcast;
	iovecs[0].iov_len = sizeof (struct totempg_mcast);
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;
	(void)totemsrp_mcast (totemsrp_context, iovecs, 3, 0);

	mcast_packed_msg_count = 0;
	fragment_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (0);
}

/*
 * Initialize the totem process group abstraction
 */
int totempg_initialize (
	qb_loop_t *poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;
	totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
	totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
	totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
	totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
	totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
	totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
	totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;

	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == 0) {
		return (-1);
	}

	totemsrp_net_mtu_adjust (totem_config);

	res = totemsrp_initialize (
		poll_handle,
		&totemsrp_context,
		totem_config,
		&totempg_stats,
		totempg_deliver_fn,
		totempg_confchg_fn,
		totempg_waiting_trans_ack_cb);

	if (res == -1) {
		goto error_exit;
	}

	totemsrp_callback_token_create (
		totemsrp_context,
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	totempg_size_limit = (totemsrp_avail(totemsrp_context) - 1) *
		(totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16);

	qb_list_init (&totempg_groups_list);

error_exit:
	return (res);
}

void totempg_finalize (void)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}
	totemsrp_finalize (totemsrp_context);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
}

/*
 * Multicast a message
 */
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,
	int guarantee)
{
	int res = 0;
	struct totempg_mcast mcast;
	struct iovec iovecs[3];
	struct iovec iovec[64];
	int i;
	int dest, src;
	int max_packet_size = 0;
	int copy_len = 0;
	int copy_base = 0;
	int total_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	totemsrp_event_signal (totemsrp_context, TOTEM_EVENT_NEW_MSG, 1);

	/*
	 * Remove zero length iovectors from the list
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}
	iov_len = dest;

	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;

	/*
	 * Check if we would overwrite new message queue
	 */
	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return(-1);
	}

	memset(&mcast, 0, sizeof(mcast));

	mcast.header.version = 0;
	for (i = 0; i < iov_len; ) {
		mcast.fragmented = 0;
		mcast.continuation = fragment_continuation;
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If it all fits with room left over, copy it in.
		 * We need to leave at least sizeof(short) + 1 bytes in the
		 * fragment_buffer on exit so that max_packet_size + fragment_size
		 * doesn't exceed the size of the fragment_buffer on the next call.
		 */
		if ((iovec[i].iov_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			next_fragment = 1;
			copy_len = 0;
			copy_base = 0;
			i++;
			continue;

		/*
		 * If it just fits or is too big, then send out what fits.
		 */
		} else {
			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			if (copy_len == max_packet_size) {
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			} else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			/*
			 * If we're not on the last iovec or the iovec is too large to
			 * fit, then indicate a fragment.  This also means that the next
			 * message will have the continuation of this one.
			 */
			if ((i < (iov_len - 1)) ||
					((copy_base + copy_len) < iovec[i].iov_len)) {
				if (!next_fragment) {
					next_fragment++;
				}
				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);
			} else {
				fragment_continuation = 0;
			}

			/*
			 * Assemble the message and send it
			 */
			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[0].iov_len = sizeof(struct totempg_mcast);
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof(unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = fragment_size + copy_len;
			assert (totemsrp_avail(totemsrp_context) > 0);
			res = totemsrp_mcast (totemsrp_context, iovecs, 3, guarantee);
			if (res == -1) {
				goto error_exit;
			}

			/*
			 * Recalculate counts and indexes for the next.
			 */
			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;
			fragment_size = 0;
			max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));

			/*
			 * If the iovec all fit, go to the next iovec
			 */
			if ((copy_base + copy_len) == iovec[i].iov_len) {
				copy_len = 0;
				copy_base = 0;
				i++;

			/*
			 * Continue with the rest of the current iovec.
			 */
			} else {
				copy_base += copy_len;
			}
		}
	}

	/*
	 * Bump only if we added message data.  This may be zero if
	 * the last buffer just fit into the fragmentation_data buffer
	 * and we were at the last iovec.
	 */
	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (res);
}

/*
 * Determine if a message could be queued
 */
static int msg_count_send_ok (
	int msg_count)
{
	int avail = 0;

	avail = totemsrp_avail (totemsrp_context);
	totempg_stats.msg_queue_avail = avail;

	return ((avail - totempg_reserved) > msg_count);
}

static int byte_count_send_ok (
	int byte_count)
{
	unsigned int msg_count = 0;
	int avail = 0;

	avail = totemsrp_avail (totemsrp_context);

	msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;

	return (avail >= msg_count);
}
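
/*
 * Note: the "- 16" above (also used in send_reserve() and in the
 * totempg_size_limit calculation) reserves a small per-packet safety
 * margin on top of the totempg_mcast header when estimating how many
 * srp messages a given byte count will occupy.
 */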

static int send_reserve (
	int msg_size)
{
	unsigned int msg_count = 0;

	msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
	totempg_reserved += msg_count;
	totempg_stats.msg_reserved = totempg_reserved;

	return (msg_count);
}

static void send_release (
	int msg_count)
{
	totempg_reserved -= msg_count;
	totempg_stats.msg_reserved = totempg_reserved;
}

#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */

static uint32_t q_level_percent_used(void)
{
	return (100 - (((totemsrp_avail(totemsrp_context) - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
}

int totempg_callback_token_create (
	void **handle_out,
	enum totem_callback_token_type type,
	int delete,
	int (*callback_fn) (enum totem_callback_token_type type, const void *),
	const void *data)
{
	unsigned int res;
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	res = totemsrp_callback_token_create (totemsrp_context, handle_out, type, delete,
		callback_fn, data);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
	return (res);
}

void totempg_callback_token_destroy (
	void *handle_out)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	totemsrp_callback_token_destroy (totemsrp_context, handle_out);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
}

/*
 * vi: set autoindent tabstop=4 shiftwidth=4 :
 */

int totempg_groups_initialize (
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	instance = malloc (sizeof (struct totempg_group_instance));
	if (instance == NULL) {
		goto error_exit;
	}

	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	instance->groups = 0;
	instance->groups_cnt = 0;
	instance->q_level = QB_LOOP_MED;
	qb_list_init (&instance->list);
	qb_list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;
	return (0);

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (-1);
}

int totempg_groups_join (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	struct totempg_group *new_groups;
	int res = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	new_groups = realloc (instance->groups,
		sizeof (struct totempg_group) *
		(instance->groups_cnt + group_cnt));
	if (new_groups == 0) {
		res = -1;
		goto error_exit;
	}
	memcpy (&new_groups[instance->groups_cnt],
		groups, group_cnt * sizeof (struct totempg_group));
	instance->groups = new_groups;
	instance->groups_cnt += group_cnt;

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

int totempg_groups_leave (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (0);
}

#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32

int totempg_groups_mcast_joined (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len,
	int guarantee)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = instance->groups_cnt;
	for (i = 0; i < instance->groups_cnt; i++) {
		group_len[i + 1] = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
	}
	iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

	return (res);
}
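
/*
 * Typical call sequence from a service engine (sketch only; error
 * handling omitted and the group name is made up for illustration):
 *
 *	void *inst;
 *	struct totempg_group grp = { .group = "demo", .group_len = 4 };
 *	struct iovec iov = { .iov_base = payload, .iov_len = payload_len };
 *
 *	totempg_initialize (poll_handle, totem_config);
 *	totempg_groups_initialize (&inst, my_deliver_fn, my_confchg_fn);
 *	totempg_groups_join (inst, &grp, 1);
 *	totempg_groups_mcast_joined (inst, &iov, 1, 0);
 */
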
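/*
 * Map queue usage to a reported level.  The thresholds are asymmetric
 * on purpose, giving hysteresis: once CRITICAL (>= 75% used), the
 * level only drops again after usage falls into a lower band (60-70%
 * for HIGH, 40-50% for GOOD, below 30% for LOW).
 */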
static void check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_percent_used();

	if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
		instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
	} else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
		instance->q_level = TOTEM_Q_LEVEL_LOW;
	} else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
		instance->q_level = TOTEM_Q_LEVEL_GOOD;
	} else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
		instance->q_level = TOTEM_Q_LEVEL_HIGH;
	}
	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
}

void totempg_check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;

	check_q_level(instance);
}

int totempg_groups_joined_reserve (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned int size = 0;
	unsigned int i;
	unsigned int reserved = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	for (i = 0; i < instance->groups_cnt; i++) {
		size += instance->groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	if (size >= totempg_size_limit) {
		reserved = -1;
		goto error_exit;
	}

	if (byte_count_send_ok (size)) {
		reserved = send_reserve (size);
	} else {
		reserved = 0;
	}

error_exit:
	check_q_level(instance);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (reserved);
}

int totempg_groups_joined_release (int msg_count)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	send_release (msg_count);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return 0;
}

int totempg_groups_mcast_groups (
	void *totempg_groups_instance,
	int guarantee,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = groups_cnt;
	for (i = 0; i < groups_cnt; i++) {
		group_len[i + 1] = groups[i].group_len;
		iovec_mcast[i + 1].iov_len = groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
	}
	iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

/*
 * Returns -1 if error, 0 if can't send, 1 if can send the message
 */
int totempg_groups_send_ok_groups (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned int size = 0;
	unsigned int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	for (i = 0; i < groups_cnt; i++) {
		size += groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	res = msg_count_send_ok (size);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

int totempg_iface_set (
	struct totem_ip_address *interface_addr,
	unsigned short ip_port,
	unsigned int iface_no)
{
	int res;

	res = totemsrp_iface_set (
		totemsrp_context,
		interface_addr,
		ip_port,
		iface_no);

	return (res);
}

int totempg_nodestatus_get (unsigned int nodeid,
	struct totem_node_status *node_status)
{
	memset(node_status, 0, sizeof(struct totem_node_status));
	return totemsrp_nodestatus_get (totemsrp_context, nodeid, node_status);
}

int totempg_ifaces_get (
	unsigned int nodeid,
	unsigned int *interface_id,
	struct totem_ip_address *interfaces,
	unsigned int interfaces_size,
	char ***status,
	unsigned int *iface_count)
{
	int res;

	res = totemsrp_ifaces_get (
		totemsrp_context,
		nodeid,
		interface_id,
		interfaces,
		interfaces_size,
		status,
		iface_count);

	return (res);
}

void totempg_event_signal (enum totem_event_type type, int value)
{
	totemsrp_event_signal (totemsrp_context, type, value);
}

void* totempg_get_stats (void)
{
	return &totempg_stats;
}

int totempg_crypto_set (
	const char *cipher_type,
	const char *hash_type)
{
	int res;

	res = totemsrp_crypto_set (totemsrp_context, cipher_type, hash_type);

	return (res);
}

#define ONE_IFACE_LEN 63
const char *totempg_ifaces_print (unsigned int nodeid)
{
	static char iface_string[256 * INTERFACE_MAX];
	char one_iface[ONE_IFACE_LEN+1];
	struct totem_ip_address interfaces[INTERFACE_MAX];
	unsigned int iface_count;
	unsigned int iface_ids[INTERFACE_MAX];
	unsigned int i;
	int res;

	iface_string[0] = '\0';

	res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
	if (res == -1) {
		return ("no interface found for nodeid");
	}

	for (i = 0; i < iface_count; i++) {
		if (!interfaces[i].family) {
			continue;
		}
		snprintf (one_iface, ONE_IFACE_LEN,
			"r(%d) ip(%s) ",
			i, totemip_print (&interfaces[i]));
		strcat (iface_string, one_iface);
	}
	return (iface_string);
}

unsigned int totempg_my_nodeid_get (void)
{
	return (totemsrp_my_nodeid_get(totemsrp_context));
}

int totempg_my_family_get (void)
{
	return (totemsrp_my_family_get(totemsrp_context));
}

extern void totempg_service_ready_register (
	void (*totem_service_ready) (void))
{
	totemsrp_service_ready_register (totemsrp_context, totem_service_ready);
}

void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
{
	totem_queue_level_changed = fn;
}

extern int totempg_member_add (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemsrp_member_add (totemsrp_context, member, ring_no);
}

extern int totempg_member_remove (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemsrp_member_remove (totemsrp_context, member, ring_no);
}

extern int totempg_reconfigure (void)
{
	return totemsrp_reconfigure (totemsrp_context, totempg_totem_config);
}

extern int totempg_crypto_reconfigure_phase (cfg_message_crypto_reconfig_phase_t phase)
{
	return totemsrp_crypto_reconfigure_phase (totemsrp_context, totempg_totem_config, phase);
}

extern void totempg_stats_clear (int flags)
{
	if (flags & TOTEMPG_STATS_CLEAR_TOTEM) {
		totempg_stats.msg_reserved = 0;
		totempg_stats.msg_queue_avail = 0;
	}
	return totemsrp_stats_clear (totemsrp_context, flags);
}

void totempg_threaded_mode_enable (void)
{
	totempg_threaded_mode = 1;
	totemsrp_threaded_mode_enable (totemsrp_context);
}

void totempg_trans_ack (void)
{
	totemsrp_trans_ack (totemsrp_context);
}

void totempg_force_gather (void)
{
	totemsrp_force_gather(totemsrp_context);
}

/* Assumes ->orig_interfaces is already allocated */
void totempg_get_config(struct totem_config *config)
{
	struct totem_interface *temp_if = config->orig_interfaces;

	memcpy(config, totempg_totem_config, sizeof(struct totem_config));
	config->orig_interfaces = temp_if;
	memcpy(config->orig_interfaces, totempg_totem_config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
	config->interfaces = NULL;
}

void totempg_put_config(struct totem_config *config)
{
	struct totem_interface *temp_if = totempg_totem_config->interfaces;

	/* Preserve the existing interfaces[] array as transports might have pointers saved */
	memcpy(totempg_totem_config->interfaces, config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
	memcpy(totempg_totem_config, config, sizeof(struct totem_config));
	totempg_totem_config->interfaces = temp_if;
}