/*
 * Copyright (c) 2003-2005 MontaVista Software, Inc.
 * Copyright (c) 2005 OSDL.
 * Copyright (c) 2006-2012 Red Hat, Inc.
 *
 * All rights reserved.
 *
 * Author: Steven Dake (sdake@redhat.com)
 * Author: Mark Haverkamp (markh@osdl.org)
 *
 * This software licensed under BSD license, the text of which follows:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * FRAGMENTATION AND PACKING ALGORITHM:
 *
 * Assemble the entire message into one buffer
 * if full fragment
 *	store fragment into lengths list
 *	for each full fragment
 *		multicast fragment
 *		set length and fragment fields of pg message
 * store remaining multicast into head of fragmentation data and set lens field
 *
 * If a message exceeds the maximum packet size allowed by the totem
 * single ring protocol, the protocol could lose forward progress.
 * Statically calculating the allowed data amount doesn't work because
 * the amount of data allowed depends on the number of fragments in
 * each message.  In this implementation, the maximum fragment size
 * is dynamically calculated for each fragment added to the message.
 *
 * It is possible for a message to be two bytes short of the maximum
 * packet size.  This occurs when a message or collection of
 * messages + the mcast header + the lens are two bytes short of the
 * end of the packet.  Since another len field consumes two bytes, the
 * len field would consume the rest of the packet without room for data.
 *
 * One optimization would be to forgo the final len field and determine
 * it from the size of the udp datagram.  Then this condition would no
 * longer occur.
 */
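
/*
 * Illustrative on-wire layout (editorial sketch, not in the original
 * source), assuming a hypothetical net_mtu of 1500.  Each multicast
 * datagram produced by the packing code consists of:
 *
 *	struct totempg_mcast	header (version, type, fragmented,
 *				continuation, msg_count = N)
 *	unsigned short		msg_len[N]	packed message lengths
 *	unsigned char		data[]		the N messages, back to back
 *
 * so the space left for message data in one datagram is
 * net_mtu - sizeof (struct totempg_mcast) - N * sizeof (unsigned short).
 */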

/*
 * ASSEMBLY AND UNPACKING ALGORITHM:
 *
 * copy incoming packet into assembly data buffer indexed by current
 * location of end of fragment
 *
 * if not fragmented
 *	deliver all messages in assembly data buffer
 * else
 *	if msg_count > 1 and fragmented
 *		deliver all messages except last message in assembly data buffer
 *		copy last fragmented section to start of assembly data buffer
 *	else
 *		if msg_count = 1 and fragmented
 *			do nothing
 */
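
/*
 * Worked example of the assembly algorithm (editorial, with made-up
 * sizes): node 5 sends message A (3000 bytes) then message B (100 bytes)
 * over a 1500-byte MTU.  The datagrams arrive roughly as:
 *
 *	pkt 1: fragmented=1, continuation=0, msg_count=1, { A[first part]  }
 *	pkt 2: fragmented=2, continuation=1, msg_count=1, { A[middle part] }
 *	pkt 3: fragmented=0, continuation=2, msg_count=2, { A[tail], B     }
 *
 * Packets 1 and 2 only advance assembly->index; packet 3 has
 * fragmented == 0, so A and B are both delivered and the assembly
 * buffer is returned to the free list.
 */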

#include <config.h>

#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/uio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <errno.h>
#include <limits.h>

#include <corosync/swab.h>
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>
#include <corosync/totem/totempg.h>
#define LOGSYS_UTILS_ONLY 1
#include <corosync/logsys.h>

#include "util.h"
#include "totemsrp.h"

struct totempg_mcast_header {
	short version;
	short type;
};

#if !(defined(__i386__) || defined(__x86_64__))
/*
 * Alignment is needed on architectures other than i386 or x86_64
 */
#define TOTEMPG_NEED_ALIGN 1
#endif

/*
 * totempg_mcast structure
 *
 * header:        Identify the mcast.
 * fragmented:    Set if this message continues into next message
 * continuation:  Set if this message is a continuation from last message
 * msg_count      Indicates how many packed messages are contained
 *                in the mcast.
 * Also, the size of each packed message and the messages themselves are
 * appended to the end of this structure when sent.
 */
struct totempg_mcast {
	struct totempg_mcast_header header;
	unsigned char fragmented;
	unsigned char continuation;
	unsigned short msg_count;
	/*
	 * short msg_len[msg_count];
	 */
	/*
	 * data for messages
	 */
};

/*
 * Maximum packet size for totem pg messages
 */
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))
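
/*
 * Example (editorial, assuming the compiler adds no padding to
 * struct totempg_mcast): with the common net_mtu of 1500 the header is
 * 8 bytes (2 + 2 + 1 + 1 + 2), so TOTEMPG_PACKET_SIZE evaluates to 1492
 * bytes shared between the packed length array and the message data.
 */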

/*
 * Local variables used for packing small messages
 */
static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];

static int mcast_packed_msg_count = 0;

static int totempg_reserved = 1;

static unsigned int totempg_size_limit;

static totem_queue_level_changed_fn totem_queue_level_changed = NULL;

static uint32_t totempg_threaded_mode = 0;

static void *totemsrp_context;

/*
 * Function and data used to log messages
 */
static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	int level,
	int subsys,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 6, 7)));

struct totem_config *totempg_totem_config;

static totempg_stats_t totempg_stats;

enum throw_away_mode {
	THROW_AWAY_INACTIVE,
	THROW_AWAY_ACTIVE
};

struct assembly {
	unsigned int nodeid;
	unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE];
	int index;
	unsigned char last_frag_num;
	enum throw_away_mode throw_away_mode;
	struct qb_list_head list;
};
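
/*
 * Editorial note (not in the original source): data[] is sized to hold one
 * fully reassembled message (MESSAGE_SIZE_MAX) plus one additional incoming
 * datagram (KNET_MAX_PACKET_SIZE), which is what allows totempg_deliver_fn()
 * to assert that assembly->index + msg_len stays within sizeof(assembly->data)
 * while a trailing fragment is still pending.
 */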

static void assembly_deref (struct assembly *assembly);

static int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data);

QB_LIST_DECLARE(assembly_list_inuse);

/*
 * Free list is used both for transitional and operational assemblies
 */
QB_LIST_DECLARE(assembly_list_free);

QB_LIST_DECLARE(assembly_list_inuse_trans);

QB_LIST_DECLARE(totempg_groups_list);

/*
 * Staging buffer for packed messages.  Messages are staged in this buffer
 * before sending.  Multiple messages may fit which cuts down on the
 * number of mcasts sent.  If a message doesn't completely fit, then
 * the mcast header has a fragment bit set that says that there is more
 * data to follow.  fragment_size is an index into the buffer.  It indicates
 * the size of message data and where to place new message data.
 * fragment_continuation indicates whether the first packed message in
 * the buffer is a continuation of a previously packed fragment.
 */
static unsigned char *fragmentation_data;

static int fragment_size = 0;

static int fragment_continuation = 0;
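
/*
 * Editorial note (not in the original source): mcast.fragmented carries a
 * small wrapping sequence number (next_fragment, declared below) rather
 * than a simple flag.  The receiver in totempg_deliver_fn() only appends
 * a continuation when the datagram's continuation field matches the
 * fragmented value of the previous datagram from that node; on a mismatch
 * it enters THROW_AWAY_ACTIVE and discards data until the next message
 * boundary.
 */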

static int totempg_waiting_transack = 0;

struct totempg_group_instance {
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;

	int groups_cnt;
	int32_t q_level;

	struct qb_list_head list;
};

static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;

#define log_printf(level, format, args...)		\
do {							\
	totempg_log_printf(level,			\
		totempg_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)

static int msg_count_send_ok (int msg_count);

static int byte_count_send_ok (int byte_count);

static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
{
	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
	totempg_waiting_transack = waiting_trans_ack;
}

static struct assembly *assembly_ref (unsigned int nodeid)
{
	struct assembly *assembly;
	struct qb_list_head *list;
	struct qb_list_head *active_assembly_list_inuse;

	if (totempg_waiting_transack) {
		active_assembly_list_inuse = &assembly_list_inuse_trans;
	} else {
		active_assembly_list_inuse = &assembly_list_inuse;
	}

	/*
	 * Search inuse list for node id and return assembly buffer if found
	 */
	qb_list_for_each(list, active_assembly_list_inuse) {
		assembly = qb_list_entry (list, struct assembly, list);

		if (nodeid == assembly->nodeid) {
			return (assembly);
		}
	}

	/*
	 * Nothing found in inuse list, get one from free list if available
	 */
	if (qb_list_empty (&assembly_list_free) == 0) {
		assembly = qb_list_first_entry (&assembly_list_free, struct assembly, list);
		qb_list_del (&assembly->list);
		qb_list_add (&assembly->list, active_assembly_list_inuse);
		assembly->nodeid = nodeid;
		assembly->index = 0;
		assembly->last_frag_num = 0;
		assembly->throw_away_mode = THROW_AWAY_INACTIVE;
		return (assembly);
	}

	/*
	 * Nothing available in inuse or free list, so allocate a new one
	 */
	assembly = malloc (sizeof (struct assembly));
	/*
	 * TODO handle memory allocation failure here
	 */
	assert (assembly);
	assembly->nodeid = nodeid;
	assembly->data[0] = 0;
	assembly->index = 0;
	assembly->last_frag_num = 0;
	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
	qb_list_init (&assembly->list);
	qb_list_add (&assembly->list, active_assembly_list_inuse);

	return (assembly);
}

static void assembly_deref (struct assembly *assembly)
{
	qb_list_del (&assembly->list);
	qb_list_add (&assembly->list, &assembly_list_free);
}

static void assembly_deref_from_normal_and_trans (int nodeid)
{
	int j;
	struct qb_list_head *list, *tmp_iter;
	struct qb_list_head *active_assembly_list_inuse;
	struct assembly *assembly;

	for (j = 0; j < 2; j++) {
		if (j == 0) {
			active_assembly_list_inuse = &assembly_list_inuse;
		} else {
			active_assembly_list_inuse = &assembly_list_inuse_trans;
		}

		qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) {
			assembly = qb_list_entry (list, struct assembly, list);

			if (nodeid == assembly->nodeid) {
				qb_list_del (&assembly->list);
				qb_list_add (&assembly->list, &assembly_list_free);
			}
		}
	}
}

static inline void app_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	int i;
	struct totempg_group_instance *instance;
	struct qb_list_head *list;

	/*
	 * For every leaving processor, add to free list.
	 * This also has the side effect of clearing out the dataset
	 * in the leaving processor's assembly buffer.
	 */
	for (i = 0; i < left_list_entries; i++) {
		assembly_deref_from_normal_and_trans (left_list[i]);
	}

	qb_list_for_each(list, &totempg_groups_list) {
		instance = qb_list_entry (list, struct totempg_group_instance, list);

		if (instance->confchg_fn) {
			instance->confchg_fn (
				configuration_type,
				member_list,
				member_list_entries,
				left_list,
				left_list_entries,
				joined_list,
				joined_list_entries,
				ring_id);
		}
	}
}

static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *group_len;
	int i;
	char *aligned_msg;

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	if ((size_t)msg % 4 != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);
	} else {
		aligned_msg = msg;
	}
#else
	aligned_msg = msg;
#endif

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);
	for (i = 1; i < group_len[0] + 1; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
}

static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
#ifdef TOTEMPG_NEED_ALIGN
	struct iovec iovec_aligned = { NULL, 0 };
#endif

	assert (iov_len == 1);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}
#endif

	group_len = (unsigned short *)iovec->iov_base;
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);

	/*
	 * Calculate amount to adjust the iovec by before delivering to app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
			    (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		group_name += group_len[i];
	}
	return (0);
}
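
/*
 * Layout sketch of the group prefix parsed above (editorial, using a
 * hypothetical message addressed to groups "cpg" and "evt"):
 *
 *	group_len[0] = 2	number of group names
 *	group_len[1] = 3	length of "cpg"
 *	group_len[2] = 3	length of "evt"
 *	"cpgevt"		names packed with no terminators
 *	...payload...
 *
 * adjust_iovec then comes out as 3 * sizeof (unsigned short) + 6 = 12,
 * the offset of the payload that app_deliver_fn() hands to subscribers.
 */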

static inline void app_deliver_fn (
	unsigned int nodeid,
	void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_group_instance *instance;
	struct iovec stripped_iovec;
	unsigned int adjust_iovec;
	struct iovec *iovec;
	struct qb_list_head *list;

	struct iovec aligned_iovec = { NULL, 0 };

	if (endian_conversion_required) {
		group_endian_convert (msg, msg_len);
	}

	/*
	 * TODO: segmentation/assembly need to be redesigned to provide aligned access
	 * in all cases to avoid memory copies on non-i386 archs.  Probably broke
	 * backwards compatibility.
	 */

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	aligned_iovec.iov_base = alloca(msg_len);
	aligned_iovec.iov_len = msg_len;
	memcpy(aligned_iovec.iov_base, msg, msg_len);
#else
	aligned_iovec.iov_base = msg;
	aligned_iovec.iov_len = msg_len;
#endif

	iovec = &aligned_iovec;

	qb_list_for_each(list, &totempg_groups_list) {
		instance = qb_list_entry (list, struct totempg_group_instance, list);
		if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
			stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
			stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;

#ifdef TOTEMPG_NEED_ALIGN
			/*
			 * Align data structure for architectures other than i386 or x86_64
			 */
			if (((size_t)iovec->iov_base + adjust_iovec) % 4 != 0) {
				/*
				 * Deal with misalignment
				 */
				stripped_iovec.iov_base =
					alloca (stripped_iovec.iov_len);
				memcpy (stripped_iovec.iov_base,
					(char *)iovec->iov_base + adjust_iovec,
					stripped_iovec.iov_len);
			}
#endif
			instance->deliver_fn (
				nodeid,
				stripped_iovec.iov_base,
				stripped_iovec.iov_len,
				endian_conversion_required);
		}
	}
}

static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
// TODO optimize this
	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
		ring_id);
}

static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;
	struct iovec iov_delv;
	size_t expected_msg_len;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	if (msg_len < sizeof(struct totempg_mcast)) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast) received from node " CS_PRI_NODE_ID " is too short... Ignoring.", nodeid);

		return ;
	}

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */

	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}

	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	if (msg_len < datasize) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast datasize) received from node " CS_PRI_NODE_ID
			" is too short... Ignoring.", nodeid);

		return ;
	}

	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	expected_msg_len = datasize;
	for (i = 0; i < mcast->msg_count; i++) {
		if (endian_conversion_required) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}

		expected_msg_len += msg_lens[i];
	}

	if (msg_len != expected_msg_len) {
		log_printf(LOG_WARNING,
			"Message (totempg_mcast) received from node " CS_PRI_NODE_ID
			" doesn't have expected length of %zu (has %u) bytes... Ignoring.",
			nodeid, expected_msg_len, msg_len);

		return ;
	}

	assert((assembly->index+msg_len) < sizeof(assembly->data));
	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment, then we
	 * can't deliver it.  We'll first deliver the full messages,
	 * then adjust the assembly buffer so we can add the rest of the
	 * fragment when it arrives.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, that it
	 * matches the sequence number of the previous fragment.
	 * Also, if the first packed message is a continuation
	 * of a previous message, but the assembly buffer
	 * is empty, then we need to discard it since we can't
	 * assemble a complete message.  Likewise, if this message isn't a
	 * continuation and the assembly buffer is empty, we have to discard
	 * the continued message.
	 */
	start = 0;

	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/* Throw away the first msg block */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;

			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
				continuation, assembly->last_frag_num);
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference assembly struct
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented, keep around assembly list
		 */
		if (mcast->msg_count > 1) {
			memmove (&assembly->data[0],
				&assembly->data[assembly->index],
				msg_lens[msg_count]);

			assembly->index = 0;
		}
		assembly->index += msg_lens[msg_count];
	}
}

/*
 * Totem Process Group Abstraction
 * depends on poll abstraction, POSIX, IPV4
 */

void *callback_token_received_handle;

int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data)
{
	struct totempg_mcast mcast;
	struct iovec iovecs[3];

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	if (mcast_packed_msg_count == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	if (totemsrp_avail(totemsrp_context) == 0) {
		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return (0);
	}
	mcast.header.version = 0;
	mcast.header.type = 0;
	mcast.fragmented = 0;

	/*
	 * Was the first message in this buffer a continuation of a
	 * fragmented message?
	 */
	mcast.continuation = fragment_continuation;
	fragment_continuation = 0;

	mcast.msg_count = mcast_packed_msg_count;

	iovecs[0].iov_base = (void *)&mcast;
	iovecs[0].iov_len = sizeof (struct totempg_mcast);
	iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
	iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
	iovecs[2].iov_base = (void *)&fragmentation_data[0];
	iovecs[2].iov_len = fragment_size;
	(void)totemsrp_mcast (totemsrp_context, iovecs, 3, 0);

	mcast_packed_msg_count = 0;
	fragment_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (0);
}
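
/*
 * Editorial note (not in the original source): this callback is registered
 * in totempg_initialize() for TOTEM_CALLBACK_TOKEN_RECEIVED, so any small
 * messages still staged in fragmentation_data are flushed onto the ring
 * each time this node holds the token, bounding how long a packed message
 * can wait in the staging buffer.
 */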

/*
 * Initialize the totem process group abstraction
 */
int totempg_initialize (
	qb_loop_t *poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;
	totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
	totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
	totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
	totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
	totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
	totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
	totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;

	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == 0) {
		return (-1);
	}

	totemsrp_net_mtu_adjust (totem_config);

	res = totemsrp_initialize (
		poll_handle,
		&totemsrp_context,
		totem_config,
		&totempg_stats,
		totempg_deliver_fn,
		totempg_confchg_fn,
		totempg_waiting_trans_ack_cb);

	if (res == -1) {
		goto error_exit;
	}

	totemsrp_callback_token_create (
		totemsrp_context,
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	totempg_size_limit = (totemsrp_avail(totemsrp_context) - 1) *
		(totempg_totem_config->net_mtu -
		sizeof (struct totempg_mcast) - 16);

	qb_list_init (&totempg_groups_list);

error_exit:
	return (res);
}

void totempg_finalize (void)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}
	totemsrp_finalize (totemsrp_context);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
}

/*
 * Multicast a message
 */
static int mcast_msg (
	struct iovec *iovec_in,
	unsigned int iov_len,
	int guarantee)
{
	int res = 0;
	struct totempg_mcast mcast;
	struct iovec iovecs[3];
	struct iovec iovec[64];
	int i;
	int dest, src;
	int max_packet_size = 0;
	int copy_len = 0;
	int copy_base = 0;
	int total_size = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	totemsrp_event_signal (totemsrp_context, TOTEM_EVENT_NEW_MSG, 1);

	/*
	 * Remove zero length iovectors from the list
	 */
	assert (iov_len < 64);
	for (dest = 0, src = 0; src < iov_len; src++) {
		if (iovec_in[src].iov_len) {
			memcpy (&iovec[dest++], &iovec_in[src],
				sizeof (struct iovec));
		}
	}
	iov_len = dest;

	max_packet_size = TOTEMPG_PACKET_SIZE -
		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));

	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;

	/*
	 * Check if we would overwrite new message queue
	 */
	for (i = 0; i < iov_len; i++) {
		total_size += iovec[i].iov_len;
	}

	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
		(mcast_packed_msg_count)) == 0) {

		if (totempg_threaded_mode == 1) {
			pthread_mutex_unlock (&mcast_msg_mutex);
		}
		return(-1);
	}

	memset(&mcast, 0, sizeof(mcast));

	mcast.header.version = 0;
	for (i = 0; i < iov_len; ) {
		mcast.fragmented = 0;
		mcast.continuation = fragment_continuation;
		copy_len = iovec[i].iov_len - copy_base;

		/*
		 * If it all fits with room left over, copy it in.
		 * We need to leave at least sizeof(short) + 1 bytes in the
		 * fragment_buffer on exit so that max_packet_size + fragment_size
		 * doesn't exceed the size of the fragment_buffer on the next call.
		 */
		if ((iovec[i].iov_len + fragment_size) <
			(max_packet_size - sizeof (unsigned short))) {

			memcpy (&fragmentation_data[fragment_size],
				(char *)iovec[i].iov_base + copy_base, copy_len);
			fragment_size += copy_len;
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
			next_fragment = 1;
			copy_len = 0;
			copy_base = 0;
			i++;
			continue;

		/*
		 * If it just fits or is too big, then send out what fits.
		 */
		} else {
			unsigned char *data_ptr;

			copy_len = min(copy_len, max_packet_size - fragment_size);
			if (copy_len == max_packet_size) {
				data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
			} else {
				data_ptr = fragmentation_data;
			}

			memcpy (&fragmentation_data[fragment_size],
				(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
			mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;

			/*
			 * If we're not on the last iovec or the iovec is too large to
			 * fit, then indicate a fragment.  This also means that the next
			 * message will have the continuation of this one.
			 */
			if ((i < (iov_len - 1)) ||
				((copy_base + copy_len) < iovec[i].iov_len)) {
				if (!next_fragment) {
					next_fragment++;
				}
				fragment_continuation = next_fragment;
				mcast.fragmented = next_fragment++;
				assert(fragment_continuation != 0);
				assert(mcast.fragmented != 0);
			} else {
				fragment_continuation = 0;
			}

			/*
			 * Assemble the message and send it
			 */
			mcast.msg_count = ++mcast_packed_msg_count;
			iovecs[0].iov_base = (void *)&mcast;
			iovecs[0].iov_len = sizeof(struct totempg_mcast);
			iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
			iovecs[1].iov_len = mcast_packed_msg_count *
				sizeof(unsigned short);
			iovecs[2].iov_base = (void *)data_ptr;
			iovecs[2].iov_len = fragment_size + copy_len;
			assert (totemsrp_avail(totemsrp_context) > 0);
			res = totemsrp_mcast (totemsrp_context, iovecs, 3, guarantee);
			if (res == -1) {
				goto error_exit;
			}

			/*
			 * Recalculate counts and indexes for the next.
			 */
			mcast_packed_msg_lens[0] = 0;
			mcast_packed_msg_count = 0;
			fragment_size = 0;
			max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));

			/*
			 * If the iovec all fit, go to the next iovec
			 */
			if ((copy_base + copy_len) == iovec[i].iov_len) {
				copy_len = 0;
				copy_base = 0;
				i++;

			/*
			 * Continue with the rest of the current iovec.
			 */
			} else {
				copy_base += copy_len;
			}
		}
	}

	/*
	 * Bump only if we added message data.  This may be zero if
	 * the last buffer just fit into the fragmentation_data buffer
	 * and we were at the last iovec.
	 */
	if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
		mcast_packed_msg_count++;
	}

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
	}
	return (res);
}

/*
 * Determine if a message of msg_size could be queued
 */
static int msg_count_send_ok (
	int msg_count)
{
	int avail = 0;

	avail = totemsrp_avail (totemsrp_context);
	totempg_stats.msg_queue_avail = avail;

	return ((avail - totempg_reserved) > msg_count);
}

static int byte_count_send_ok (
	int byte_count)
{
	unsigned int msg_count = 0;
	int avail = 0;

	avail = totemsrp_avail (totemsrp_context);

	msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;

	return (avail >= msg_count);
}

static int send_reserve (
	int msg_size)
{
	unsigned int msg_count = 0;

	msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
	totempg_reserved += msg_count;
	totempg_stats.msg_reserved = totempg_reserved;

	return (msg_count);
}
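
/*
 * Illustrative numbers (editorial): with net_mtu 1500 and an 8-byte
 * struct totempg_mcast, the per-datagram budget used by the reservation
 * math above is 1500 - 8 - 16 = 1476 bytes, so reserving a 4000-byte
 * message accounts for 4000 / 1476 + 1 = 3 queue slots.
 */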

static void send_release (
	int msg_count)
{
	totempg_reserved -= msg_count;
	totempg_stats.msg_reserved = totempg_reserved;
}

#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX ((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */

static uint32_t q_level_percent_used(void)
{
	return (100 - (((totemsrp_avail(totemsrp_context) - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
}
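
/*
 * Worked example (editorial, with made-up values): if MESSAGE_QUEUE_MAX
 * works out to 2048 slots, totemsrp_avail() reports 1024 free and
 * totempg_reserved is 512, the expression above yields
 * 100 - ((1024 - 512) * 100) / 2048 = 75, so check_q_level() below
 * reports TOTEM_Q_LEVEL_CRITICAL.
 */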

int totempg_callback_token_create (
	void **handle_out,
	enum totem_callback_token_type type,
	int delete,
	int (*callback_fn) (enum totem_callback_token_type type, const void *),
	const void *data)
{
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	res = totemsrp_callback_token_create (totemsrp_context, handle_out, type, delete,
		callback_fn, data);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
	return (res);
}

void totempg_callback_token_destroy (
	void *handle_out)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&callback_token_mutex);
	}
	totemsrp_callback_token_destroy (totemsrp_context, handle_out);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&callback_token_mutex);
	}
}

/*
 * vi: set autoindent tabstop=4 shiftwidth=4 :
 */

int totempg_groups_initialize (
	void **totempg_groups_instance,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	instance = malloc (sizeof (struct totempg_group_instance));
	if (instance == NULL) {
		goto error_exit;
	}

	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	instance->groups = 0;
	instance->groups_cnt = 0;
	instance->q_level = QB_LOOP_MED;
	qb_list_init (&instance->list);
	qb_list_add (&instance->list, &totempg_groups_list);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	*totempg_groups_instance = instance;
	return (0);

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (-1);
}
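
/*
 * Usage sketch (editorial; my_deliver_fn/my_confchg_fn are hypothetical
 * callbacks, not part of this file):
 *
 *	void *instance;
 *	struct totempg_group group = { .group = "cpg", .group_len = 3 };
 *	struct iovec iov = { .iov_base = payload, .iov_len = payload_len };
 *
 *	totempg_groups_initialize (&instance, my_deliver_fn, my_confchg_fn);
 *	totempg_groups_join (instance, &group, 1);
 *	totempg_groups_mcast_joined (instance, &iov, 1, 0);
 */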

int totempg_groups_join (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	struct totempg_group *new_groups;
	int res = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	new_groups = realloc (instance->groups,
		sizeof (struct totempg_group) *
		(instance->groups_cnt + group_cnt));
	if (new_groups == 0) {
		res = -1;
		goto error_exit;
	}
	memcpy (&new_groups[instance->groups_cnt],
		groups, group_cnt * sizeof (struct totempg_group));
	instance->groups = new_groups;
	instance->groups_cnt += group_cnt;

error_exit:
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

int totempg_groups_leave (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t group_cnt)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (0);
}

#define MAX_IOVECS_FROM_APP 32
#define MAX_GROUPS_PER_MSG 32

int totempg_groups_mcast_joined (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len,
	int guarantee)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = instance->groups_cnt;
	for (i = 0; i < instance->groups_cnt; i++) {
		group_len[i + 1] = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
	}
	iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}

	return (res);
}

static void check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	int32_t old_level = instance->q_level;
	int32_t percent_used = q_level_percent_used();

	if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
		instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
	} else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
		instance->q_level = TOTEM_Q_LEVEL_LOW;
	} else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
		instance->q_level = TOTEM_Q_LEVEL_GOOD;
	} else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
		instance->q_level = TOTEM_Q_LEVEL_HIGH;
	}
	if (totem_queue_level_changed && old_level != instance->q_level) {
		totem_queue_level_changed(instance->q_level);
	}
}

void totempg_check_q_level(
	void *totempg_groups_instance)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;

	check_q_level(instance);
}

int totempg_groups_joined_reserve (
	void *totempg_groups_instance,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
	unsigned int size = 0;
	unsigned int i;
	unsigned int reserved = 0;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}

	for (i = 0; i < instance->groups_cnt; i++) {
		size += instance->groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	if (size >= totempg_size_limit) {
		reserved = -1;
		goto error_exit;
	}

	if (byte_count_send_ok (size)) {
		reserved = send_reserve (size);
	} else {
		reserved = 0;
	}

error_exit:
	check_q_level(instance);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (reserved);
}


int totempg_groups_joined_release (int msg_count)
{
	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
		pthread_mutex_lock (&mcast_msg_mutex);
	}
	send_release (msg_count);
	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&mcast_msg_mutex);
		pthread_mutex_unlock (&totempg_mutex);
	}
	return 0;
}

int totempg_groups_mcast_groups (
	void *totempg_groups_instance,
	int guarantee,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
	struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
	int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	/*
	 * Build group_len structure and the iovec_mcast structure
	 */
	group_len[0] = groups_cnt;
	for (i = 0; i < groups_cnt; i++) {
		group_len[i + 1] = groups[i].group_len;
		iovec_mcast[i + 1].iov_len = groups[i].group_len;
		iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
	}
	iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
	iovec_mcast[0].iov_base = group_len;
	for (i = 0; i < iov_len; i++) {
		iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
		iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
	}

	res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

/*
 * Returns -1 if error, 0 if can't send, 1 if can send the message
 */
int totempg_groups_send_ok_groups (
	void *totempg_groups_instance,
	const struct totempg_group *groups,
	size_t groups_cnt,
	const struct iovec *iovec,
	unsigned int iov_len)
{
	unsigned int size = 0;
	unsigned int i;
	unsigned int res;

	if (totempg_threaded_mode == 1) {
		pthread_mutex_lock (&totempg_mutex);
	}

	for (i = 0; i < groups_cnt; i++) {
		size += groups[i].group_len;
	}
	for (i = 0; i < iov_len; i++) {
		size += iovec[i].iov_len;
	}

	res = msg_count_send_ok (size);

	if (totempg_threaded_mode == 1) {
		pthread_mutex_unlock (&totempg_mutex);
	}
	return (res);
}

int totempg_iface_set (
	struct totem_ip_address *interface_addr,
	unsigned short ip_port,
	unsigned int iface_no)
{
	int res;

	res = totemsrp_iface_set (
		totemsrp_context,
		interface_addr,
		ip_port,
		iface_no);

	return (res);
}

int totempg_ifaces_get (
	unsigned int nodeid,
	unsigned int *interface_id,
	struct totem_ip_address *interfaces,
	unsigned int interfaces_size,
	char ***status,
	unsigned int *iface_count)
{
	int res;

	res = totemsrp_ifaces_get (
		totemsrp_context,
		nodeid,
		interface_id,
		interfaces,
		interfaces_size,
		status,
		iface_count);

	return (res);
}

void totempg_event_signal (enum totem_event_type type, int value)
{
	totemsrp_event_signal (totemsrp_context, type, value);
}

void* totempg_get_stats (void)
{
	return &totempg_stats;
}

int totempg_crypto_set (
	const char *cipher_type,
	const char *hash_type)
{
	int res;

	res = totemsrp_crypto_set (totemsrp_context, cipher_type, hash_type);

	return (res);
}

#define ONE_IFACE_LEN 63
const char *totempg_ifaces_print (unsigned int nodeid)
{
	static char iface_string[256 * INTERFACE_MAX];
	char one_iface[ONE_IFACE_LEN+1];
	struct totem_ip_address interfaces[INTERFACE_MAX];
	unsigned int iface_count;
	unsigned int iface_ids[INTERFACE_MAX];
	unsigned int i;
	int res;

	iface_string[0] = '\0';

	res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
	if (res == -1) {
		return ("no interface found for nodeid");
	}

	for (i = 0; i < iface_count; i++) {
		if (!interfaces[i].family) {
			continue;
		}
		snprintf (one_iface, ONE_IFACE_LEN,
			"r(%d) ip(%s) ",
			i, totemip_print (&interfaces[i]));
		strcat (iface_string, one_iface);
	}
	return (iface_string);
}

unsigned int totempg_my_nodeid_get (void)
{
	return (totemsrp_my_nodeid_get(totemsrp_context));
}

int totempg_my_family_get (void)
{
	return (totemsrp_my_family_get(totemsrp_context));
}

extern void totempg_service_ready_register (
	void (*totem_service_ready) (void))
{
	totemsrp_service_ready_register (totemsrp_context, totem_service_ready);
}

void totempg_queue_level_register_callback (totem_queue_level_changed_fn fn)
{
	totem_queue_level_changed = fn;
}

extern int totempg_member_add (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemsrp_member_add (totemsrp_context, member, ring_no);
}

extern int totempg_member_remove (
	const struct totem_ip_address *member,
	int ring_no)
{
	return totemsrp_member_remove (totemsrp_context, member, ring_no);
}

extern int totempg_reconfigure (void)
{
	return totemsrp_reconfigure (totemsrp_context, totempg_totem_config);
}

extern void totempg_stats_clear (int flags)
{
	if (flags & TOTEMPG_STATS_CLEAR_TOTEM) {
		totempg_stats.msg_reserved = 0;
		totempg_stats.msg_queue_avail = 0;
	}
	totemsrp_stats_clear (totemsrp_context, flags);
}

void totempg_threaded_mode_enable (void)
{
	totempg_threaded_mode = 1;
	totemsrp_threaded_mode_enable (totemsrp_context);
}

void totempg_trans_ack (void)
{
	totemsrp_trans_ack (totemsrp_context);
}

void totempg_force_gather (void)
{
	totemsrp_force_gather(totemsrp_context);
}

/* Assumes ->orig_interfaces is already allocated */
void totempg_get_config(struct totem_config *config)
{
	struct totem_interface *temp_if = config->orig_interfaces;

	memcpy(config, totempg_totem_config, sizeof(struct totem_config));
	config->orig_interfaces = temp_if;
	memcpy(config->orig_interfaces, totempg_totem_config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
	config->interfaces = NULL;
}

void totempg_put_config(struct totem_config *config)
{
	struct totem_interface *temp_if = totempg_totem_config->interfaces;

	/* Preserve the existing interfaces[] array as transports might have pointers saved */
	memcpy(totempg_totem_config->interfaces, config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
	memcpy(totempg_totem_config, config, sizeof(struct totem_config));
	totempg_totem_config->interfaces = temp_if;
}