]> git.proxmox.com Git - mirror_corosync.git/blame - test/cpghum.c
test: Fix cpgtest
[mirror_corosync.git] / test / cpghum.c
CommitLineData
3842ba60 1/*
ce188ff3 2 * Copyright (c) 2015-2017 Red Hat, Inc.
3842ba60
JF
3 *
4 * All rights reserved.
5 *
6 * Author: Christine Caulfield <ccaulfie@redhat.com>
7 *
8 * This software licensed under BSD license, the text of which follows:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
12 *
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the MontaVista Software, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <signal.h>
39#include <unistd.h>
0a9c3be5 40#include <assert.h>
3842ba60 41#include <errno.h>
3842ba60 42#include <time.h>
16a865e5 43#include <limits.h>
93d48c8c 44#include <ctype.h>
ce188ff3
CC
45#include <syslog.h>
46#include <stdarg.h>
5731af27 47#include <inttypes.h>
3842ba60
JF
48#include <sys/time.h>
49#include <sys/types.h>
50#include <sys/socket.h>
51#include <sys/select.h>
52#include <sys/uio.h>
53#include <sys/un.h>
54#include <netinet/in.h>
55#include <arpa/inet.h>
56#include <pthread.h>
57#include <zlib.h>
58#include <libgen.h>
93d48c8c 59#include <getopt.h>
3842ba60 60
3842ba60
JF
61#include <corosync/corotypes.h>
62#include <corosync/cpg.h>
63
64static cpg_handle_t handle;
65
66static pthread_t thread;
67
68#ifndef timersub
69#define timersub(a, b, result) \
70 do { \
71 (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
72 (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
73 if ((result)->tv_usec < 0) { \
74 --(result)->tv_sec; \
75 (result)->tv_usec += 1000000; \
76 } \
77 } while (0)
78#endif /* timersub */
79
80static int alarm_notice;
ce188ff3 81#define MAX_NODEID 65536
3842ba60
JF
82#define ONE_MEG 1048576
83#define DATASIZE (ONE_MEG*20)
84static char data[DATASIZE];
85static int send_counter = 0;
86static int do_syslog = 0;
87static int quiet = 0;
71707549 88static int report_rtt = 0;
40c246fb 89static int abort_on_error = 0;
ce188ff3
CC
90static int machine_readable = 0;
91static char delimiter = ',';
92static int to_stderr = 0;
71707549 93static unsigned int g_our_nodeid;
3842ba60 94static volatile int stopped;
93d48c8c
CC
95static unsigned int flood_start = 64;
96static unsigned int flood_multiplier = 5;
97static unsigned long flood_max = (ONE_MEG - 100);
3842ba60
JF
98
99// stats
100static unsigned int length_errors=0;
101static unsigned int crc_errors=0;
102static unsigned int sequence_errors=0;
103static unsigned int packets_sent=0;
104static unsigned int packets_recvd=0;
0be72f03 105static unsigned int packets_recvd1=0; /* For flood intermediates */
3842ba60
JF
106static unsigned int send_retries=0;
107static unsigned int send_fails=0;
71707549
CC
108static unsigned long avg_rtt=0;
109static unsigned long max_rtt=0;
16a865e5 110static unsigned long min_rtt=LONG_MAX;
57c4086f
CC
111static unsigned long interim_avg_rtt=0;
112static unsigned long interim_max_rtt=0;
113static unsigned long interim_min_rtt=LONG_MAX;
71707549
CC
114
115struct cpghum_header {
116 unsigned int counter;
117 unsigned int crc;
0be72f03 118 unsigned int size;
71707549
CC
119 struct timeval timestamp;
120};
3842ba60
JF
121
122static void cpg_bm_confchg_fn (
123 cpg_handle_t handle_in,
124 const struct cpg_name *group_name,
125 const struct cpg_address *member_list, size_t member_list_entries,
126 const struct cpg_address *left_list, size_t left_list_entries,
127 const struct cpg_address *joined_list, size_t joined_list_entries)
128{
129}
130
131static unsigned int g_recv_count;
132static unsigned int g_recv_length;
ce188ff3
CC
133static int g_recv_start[MAX_NODEID+1];
134static int g_recv_counter[MAX_NODEID+1];
135static int g_recv_size[MAX_NODEID+1];
136static int g_log_mask = 0xFFFF;
137typedef enum
138{
139 CPGH_LOG_INFO = 1,
140 CPGH_LOG_PERF = 2,
141 CPGH_LOG_RTT = 4,
142 CPGH_LOG_STATS = 8,
143 CPGH_LOG_ERR = 16
144} log_type_t;
145
0a9c3be5
JF
146static void cpgh_print_message(int syslog_level, const char *facility_name, const char *format, va_list ap)
147 __attribute__((format(printf, 3, 0)));
148
149static void cpgh_log_printf(log_type_t type, const char *format, ...)
150 __attribute__((format(printf, 2, 3)));
151
ce188ff3
CC
152static void cpgh_print_message(int syslog_level, const char *facility_name, const char *format, va_list ap)
153{
154 char msg[1024];
155 int start = 0;
156
157 if (machine_readable) {
efef3a90 158 snprintf(msg, sizeof(msg), "%s%c", facility_name, delimiter);
ce188ff3
CC
159 start = strlen(msg);
160 }
161
0a9c3be5
JF
162 assert(vsnprintf(msg+start, sizeof(msg)-start, format, ap) < sizeof(msg)-start);
163
ce188ff3
CC
164 if (to_stderr || (syslog_level <= LOG_ERR)) {
165 fprintf(stderr, "%s", msg);
166 }
167 else {
168 printf("%s", msg);
169 }
170 if (do_syslog) {
171 syslog(syslog_level, "%s", msg);
172 }
173}
174
175static void cpgh_log_printf(log_type_t type, const char *format, ...)
176{
177 va_list ap;
178
179 if (!(type & g_log_mask)) {
180 return;
181 }
182
183 va_start(ap, format);
184
185 switch (type) {
186 case CPGH_LOG_INFO:
187 cpgh_print_message(LOG_INFO, "[Info]", format, ap);
188 break;
189 case CPGH_LOG_PERF:
190 cpgh_print_message(LOG_INFO, "[Perf]", format, ap);
191 break;
192 case CPGH_LOG_RTT:
193 cpgh_print_message(LOG_INFO, "[RTT]", format, ap);
194 break;
195 case CPGH_LOG_STATS:
196 cpgh_print_message(LOG_INFO, "[Stats]", format, ap);
197 break;
198 case CPGH_LOG_ERR:
199 cpgh_print_message(LOG_ERR, "[Err]", format, ap);
200 break;
201 default:
202 break;
203 }
204
205 va_end(ap);
206}
3842ba60 207
57c4086f
CC
208static unsigned long update_rtt(struct timeval *header_timestamp, int packet_count,
209 unsigned long *rtt_min, unsigned long *rtt_avg, unsigned long *rtt_max)
210{
211 struct timeval tv1;
212 struct timeval rtt;
213 unsigned long rtt_usecs;
214
215 gettimeofday (&tv1, NULL);
216 timersub(&tv1, header_timestamp, &rtt);
217
218 rtt_usecs = rtt.tv_usec + rtt.tv_sec*1000000;
219 if (rtt_usecs > *rtt_max) {
220 *rtt_max = rtt_usecs;
221 }
222 if (rtt_usecs < *rtt_min) {
223 *rtt_min = rtt_usecs;
224 }
225
226 /* Don't start the average with 0 */
227 if (*rtt_avg == 0) {
228 *rtt_avg = rtt_usecs;
229 }
230 else {
231 *rtt_avg = ((*rtt_avg * packet_count) + rtt_usecs) / (packet_count+1);
232 }
233
234 return rtt_usecs;
235}
236
237
3842ba60
JF
238static void cpg_bm_deliver_fn (
239 cpg_handle_t handle_in,
240 const struct cpg_name *group_name,
241 uint32_t nodeid,
242 uint32_t pid,
243 void *msg,
244 size_t msg_len)
245{
3842ba60 246 uLong crc=0;
71707549
CC
247 struct cpghum_header *header = (struct cpghum_header *)msg;
248 uLong recv_crc = header->crc & 0xFFFFFFFF;
e6d0f87f 249 unsigned int *dataint = (unsigned int *)((char*)msg + sizeof(struct cpghum_header));
0be72f03 250 unsigned int datalen;
3842ba60 251
ce188ff3 252 if (nodeid > MAX_NODEID) {
5731af27 253 cpgh_log_printf(CPGH_LOG_ERR, "Got message from invalid nodeid " CS_PRI_NODE_ID " (too high for us). Quitting\n", nodeid);
ce188ff3
CC
254 exit(1);
255 }
256
3842ba60 257 packets_recvd++;
0be72f03 258 packets_recvd1++;
3842ba60 259 g_recv_length = msg_len;
0be72f03 260 datalen = header->size - sizeof(struct cpghum_header);
3842ba60 261
40c246fb
CC
262 // Report RTT first in case abort_on_error is set
263 if (nodeid == g_our_nodeid) {
ce188ff3 264 unsigned long rtt_usecs;
40c246fb 265
57c4086f
CC
266 // For flood
267 update_rtt(&header->timestamp, packets_recvd1, &interim_min_rtt, &interim_avg_rtt, &interim_max_rtt);
40c246fb 268
57c4086f 269 rtt_usecs = update_rtt(&header->timestamp, g_recv_counter[nodeid], &min_rtt, &avg_rtt, &max_rtt);
40c246fb
CC
270
271 if (report_rtt) {
ce188ff3 272 if (machine_readable) {
efef3a90 273 cpgh_log_printf(CPGH_LOG_RTT, "%ld%c%ld%c%ld%c%ld\n", rtt_usecs, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
ce188ff3
CC
274 }
275 else {
276 cpgh_log_printf(CPGH_LOG_RTT, "%s: RTT %ld uS (min/avg/max): %ld/%ld/%ld\n", group_name->value, rtt_usecs, min_rtt, avg_rtt, max_rtt);
40c246fb
CC
277 }
278 }
279 }
280
3842ba60 281 // Basic check, packets should all be the right size
0be72f03 282 if (msg_len != header->size) {
3842ba60 283 length_errors++;
5731af27 284 cpgh_log_printf(CPGH_LOG_ERR, "%s: message sizes don't match. got %zu, expected %u from node " CS_PRI_NODE_ID "\n", group_name->value, msg_len, header->size, nodeid);
40c246fb
CC
285
286 if (abort_on_error) {
eca52f67 287 exit(2);
40c246fb 288 }
3842ba60 289 }
ce188ff3 290 g_recv_size[nodeid] = msg_len;
3842ba60
JF
291
292 // Sequence counters are incrementing in step?
ce188ff3 293 if (header->counter != g_recv_counter[nodeid]) {
40c246fb 294
ce188ff3
CC
295 /* Don't report the first mismatch or a newly restarted sender, we're just catching up */
296 if (g_recv_counter[nodeid] && header->counter) {
297 sequence_errors++;
5731af27 298 cpgh_log_printf(CPGH_LOG_ERR, "%s: counters don't match. got %d, expected %d from node " CS_PRI_NODE_ID "\n", group_name->value, header->counter, g_recv_counter[nodeid], nodeid);
ce188ff3
CC
299
300 if (abort_on_error) {
301 exit(2);
302 }
303 }
304 else {
305 g_recv_start[nodeid] = header->counter;
40c246fb
CC
306 }
307
ce188ff3
CC
308 /* Catch up or we'll be printing errors for ever */
309 g_recv_counter[nodeid] = header->counter+1;
40c246fb
CC
310 }
311 else {
ce188ff3 312 g_recv_counter[nodeid]++;
3842ba60
JF
313 }
314
ce188ff3 315 /* Check crc */
3842ba60 316 crc = crc32(0, NULL, 0);
71707549 317 crc = crc32(crc, (Bytef *)dataint, datalen) & 0xFFFFFFFF;
3842ba60
JF
318 if (crc != recv_crc) {
319 crc_errors++;
5731af27 320 cpgh_log_printf(CPGH_LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx from nodeid " CS_PRI_NODE_ID "\n", group_name->value, recv_crc, crc, nodeid);
ce188ff3 321
40c246fb 322 if (abort_on_error) {
eca52f67 323 exit(2);
16a865e5 324 }
71707549 325
71707549
CC
326 }
327
3842ba60
JF
328 g_recv_count++;
329
330}
331
332static cpg_model_v1_data_t model1_data = {
333 .cpg_deliver_fn = cpg_bm_deliver_fn,
334 .cpg_confchg_fn = cpg_bm_confchg_fn,
335};
336
337static cpg_callbacks_t callbacks = {
338 .cpg_deliver_fn = cpg_bm_deliver_fn,
339 .cpg_confchg_fn = cpg_bm_confchg_fn
340};
341
342static struct cpg_name group_name = {
343 .value = "cpghum",
344 .length = 7
345};
346
0be72f03
CC
347static void set_packet(int write_size, int counter)
348{
349 struct cpghum_header *header = (struct cpghum_header *)data;
350 int i;
351 unsigned int *dataint = (unsigned int *)(data + sizeof(struct cpghum_header));
352 unsigned int datalen = write_size - sizeof(struct cpghum_header);
353 struct timeval tv1;
354 uLong crc;
355
356 header->counter = counter;
357 for (i=0; i<(datalen/4); i++) {
358 dataint[i] = rand();
359 }
360 crc = crc32(0, NULL, 0);
361 header->crc = crc32(crc, (Bytef*)&dataint[0], datalen);
362 header->size = write_size;
363
364 gettimeofday (&tv1, NULL);
365 memcpy(&header->timestamp, &tv1, sizeof(struct timeval));
366}
367
ce188ff3 368/* Basically this is cpgbench.c */
0be72f03
CC
369static void cpg_flood (
370 cpg_handle_t handle_in,
371 int write_size)
372{
373 struct timeval tv1, tv2, tv_elapsed;
374 struct iovec iov;
375 unsigned int res = CS_OK;
376
377 alarm_notice = 0;
378 iov.iov_base = data;
379 iov.iov_len = write_size;
380
381 alarm (10);
382 packets_recvd1 = 0;
57c4086f
CC
383 interim_avg_rtt = 0;
384 interim_max_rtt = 0;
385 interim_min_rtt = LONG_MAX;
0be72f03
CC
386
387 gettimeofday (&tv1, NULL);
388 do {
ce188ff3
CC
389 if (res == CS_OK) {
390 set_packet(write_size, send_counter);
0be72f03
CC
391 }
392
393 res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
394 if (res == CS_OK) {
ce188ff3 395 /* Only increment the packet counter if it was sucessfully sent */
0be72f03 396 packets_sent++;
ce188ff3 397 send_counter++;
0be72f03
CC
398 }
399 else {
400 if (res == CS_ERR_TRY_AGAIN) {
401 send_retries++;
402 }
403 else {
404 send_fails++;
405 }
406 }
407 } while (!stopped && alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN));
408 gettimeofday (&tv2, NULL);
409 timersub (&tv2, &tv1, &tv_elapsed);
410
411 if (!quiet) {
ce188ff3 412 if (machine_readable) {
efef3a90 413 cpgh_log_printf (CPGH_LOG_PERF, "%d%c%d%c%f%c%f%c%f%c%ld%c%ld%c%ld\n", packets_recvd1, delimiter, write_size, delimiter,
ce188ff3
CC
414 (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
415 ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
57c4086f
CC
416 ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0), delimiter,
417 interim_min_rtt, delimiter, interim_avg_rtt, delimiter, interim_max_rtt);
ce188ff3
CC
418 }
419 else {
420 cpgh_log_printf (CPGH_LOG_PERF, "%5d messages received ", packets_recvd1);
421 cpgh_log_printf (CPGH_LOG_PERF, "%5d bytes per write ", write_size);
422 cpgh_log_printf (CPGH_LOG_PERF, "%7.3f Seconds runtime ",
423 (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
424 cpgh_log_printf (CPGH_LOG_PERF, "%9.3f TP/s ",
425 ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
57c4086f 426 cpgh_log_printf (CPGH_LOG_PERF, "%7.3f MB/s ",
ce188ff3 427 ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
57c4086f
CC
428 cpgh_log_printf (CPGH_LOG_PERF, "RTT for this size (min/avg/max) %ld/%ld/%ld\n",
429 interim_min_rtt, interim_avg_rtt, interim_max_rtt);
ce188ff3 430 }
0be72f03 431 }
0be72f03
CC
432}
433
3842ba60
JF
434static void cpg_test (
435 cpg_handle_t handle_in,
436 int write_size,
437 int delay_time,
438 int print_time)
439{
3842ba60
JF
440 struct iovec iov;
441 unsigned int res;
3842ba60
JF
442
443 alarm_notice = 0;
444 iov.iov_base = data;
445 iov.iov_len = write_size;
446
447 g_recv_count = 0;
448 alarm (print_time);
449
3842ba60 450 do {
0be72f03 451 send_counter++;
3842ba60 452 resend:
0be72f03 453 set_packet(write_size, send_counter);
71707549 454
3842ba60
JF
455 res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
456 if (res == CS_ERR_TRY_AGAIN) {
457 usleep(10000);
458 send_retries++;
459 goto resend;
460 }
461 if (res != CS_OK) {
ce188ff3 462 cpgh_log_printf(CPGH_LOG_ERR, "send failed: %d\n", res);
3842ba60
JF
463 send_fails++;
464 }
465 else {
466 packets_sent++;
467 }
468 usleep(delay_time*1000);
469 } while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
3842ba60
JF
470
471 if (!quiet) {
ce188ff3 472 if (machine_readable) {
efef3a90 473 cpgh_log_printf(CPGH_LOG_RTT, "%d%c%ld%c%ld%c%ld\n", 0, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
ce188ff3
CC
474 }
475 else {
476 cpgh_log_printf(CPGH_LOG_PERF, "%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
477 cpgh_log_printf(CPGH_LOG_PERF, "%5d bytes per write. ", write_size);
478 cpgh_log_printf(CPGH_LOG_RTT, "RTT min/avg/max: %ld/%ld/%ld\n", min_rtt, avg_rtt, max_rtt);
479 }
3842ba60
JF
480 }
481
482}
483
484static void sigalrm_handler (int num)
485{
486 alarm_notice = 1;
487}
488
489static void sigint_handler (int num)
490{
491 stopped = 1;
492}
493
494static void* dispatch_thread (void *arg)
495{
496 cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
497 return NULL;
498}
499
500static void usage(char *cmd)
501{
502 fprintf(stderr, "%s [OPTIONS]\n", cmd);
503 fprintf(stderr, "\n");
504 fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
505 fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n");
506 fprintf(stderr, "corrupted messages will be detected and reported.\n");
507 fprintf(stderr, "\n");
508 fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
509 fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n");
510 fprintf(stderr, "\n");
3842ba60
JF
511 fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
512 fprintf(stderr, "different nodes by using the -n option.\n");
513 fprintf(stderr, "\n");
ce188ff3
CC
514 fprintf(stderr, "%s can handle more than 1 sender in the same CPG provided they are on\n", cmd);
515 fprintf(stderr, "different nodes.\n");
3842ba60 516 fprintf(stderr, "\n");
93d48c8c
CC
517 fprintf(stderr, " -w<num>, --size-bytes Write size in Kbytes, default 4\n");
518 fprintf(stderr, " -W<num>, --size-kb Write size in bytes, default 4096\n");
519 fprintf(stderr, " -n<name>, --name CPG name to use, default 'cpghum'\n");
520 fprintf(stderr, " -M Write machine-readable results\n");
521 fprintf(stderr, " -D<char> Delimiter for machine-readable results (default ',')\n");
522 fprintf(stderr, " -E Send normal output to stderr instead of stdout\n");
523 fprintf(stderr, " -d<num>, --delay Delay between sending packets (mS), default 1000\n");
524 fprintf(stderr, " -r<num> Number of repetitions, default 100\n");
525 fprintf(stderr, " -p<num> Delay between printing output (seconds), default 10s\n");
526 fprintf(stderr, " -l, --listen Listen and check CRCs only, don't send (^C to quit)\n");
527 fprintf(stderr, " -t, --rtt Report Round Trip Times for each packet.\n");
528 fprintf(stderr, " -m<num> cpg_initialise() model. Default 1.\n");
529 fprintf(stderr, " -s Also send errors to syslog.\n");
530 fprintf(stderr, " -f, --flood Flood test CPG (cpgbench). see --flood-* long options\n");
531 fprintf(stderr, " -a Abort on crc/length/sequence error\n");
532 fprintf(stderr, " -q, --quiet Quiet. Don't print messages every 10s (see also -p)\n");
533 fprintf(stderr, " -qq Very quiet. Don't print stats at the end\n");
534 fprintf(stderr, " --flood-start=bytes Start value for --flood\n");
535 fprintf(stderr, " --flood-mult=value Packet size multiplier value for --flood\n");
536 fprintf(stderr, " --flood-max=bytes Maximum packet size for --flood\n");
537 fprintf(stderr, "\n");
538 fprintf(stderr, " values for --flood* and -W can have K or M suffixes to indicate\n");
539 fprintf(stderr, " Kilobytes or Megabytes\n");
3842ba60 540 fprintf(stderr, "\n");
eca52f67
JF
541 fprintf(stderr, "%s exit code is 0 if no error happened, 1 on generic error and 2 on\n", cmd);
542 fprintf(stderr, "send/crc/length/sequence error");
543 fprintf(stderr, "\n");
3842ba60
JF
544}
545
93d48c8c
CC
546/* Parse a size, optionally ending in 'K', 'M' */
547static long parse_bytes(const char *valstring)
548{
549 unsigned int value;
550 int multiplier = 1;
551 char suffix = '\0';
552 int have_suffix = 0;
553
554 /* Suffix is optional */
555 if (sscanf(valstring, "%u%c", &value, &suffix) == 0) {
556 return 0;
557 }
558
559 if (toupper(suffix) == 'M') {
560 multiplier = 1024*1024;
561 have_suffix = 1;
562 }
563 if (toupper(suffix) == 'K') {
564 multiplier = 1024;
565 have_suffix = 1;
566 }
567
568 if (!have_suffix && suffix != '\0') {
569 fprintf(stderr, "Invalid suffix '%c', only K or M supported\n", suffix);
570 return 0;
571 }
572 return value * multiplier;
573}
574
575
3842ba60
JF
576int main (int argc, char *argv[]) {
577 int i;
578 unsigned int res;
579 uint32_t maxsize;
580 int opt;
581 int bs;
582 int write_size = 4096;
583 int delay_time = 1000;
584 int repetitions = 100;
585 int print_time = 10;
586 int have_size = 0;
587 int listen_only = 0;
0be72f03 588 int flood = 0;
3842ba60 589 int model = 1;
93d48c8c
CC
590 int option_index = 0;
591 struct option long_options[] = {
592 {"flood-start", required_argument, 0, 0 },
593 {"flood-mult", required_argument, 0, 0 },
594 {"flood-max", required_argument, 0, 0 },
595 {"size-kb", required_argument, 0, 'w' },
596 {"size-bytes", required_argument, 0, 'W' },
597 {"name", required_argument, 0, 'n' },
598 {"rtt", no_argument, 0, 't' },
599 {"flood", no_argument, 0, 'f' },
600 {"quiet", no_argument, 0, 'q' },
601 {"listen", no_argument, 0, 'l' },
602 {"help", no_argument, 0, '?' },
603 {0, 0, 0, 0 }
604 };
605
606 while ( (opt = getopt_long(argc, argv, "qlstafMEn:d:r:p:m:w:W:D:",
607 long_options, &option_index)) != -1 ) {
3842ba60 608 switch (opt) {
93d48c8c
CC
609 case 0: // Long-only options
610 if (strcmp(long_options[option_index].name, "flood-start") == 0) {
611 flood_start = parse_bytes(optarg);
612 if (flood_start == 0) {
613 fprintf(stderr, "flood-start value invalid\n");
614 exit(1);
615 }
616 }
617 if (strcmp(long_options[option_index].name, "flood-mult") == 0) {
618 flood_multiplier = parse_bytes(optarg);
619 if (flood_multiplier == 0) {
620 fprintf(stderr, "flood-mult value invalid\n");
621 exit(1);
622 }
623 }
624 if (strcmp(long_options[option_index].name, "flood-max") == 0) {
625 flood_max = parse_bytes(optarg);
626 if (flood_max == 0) {
627 fprintf(stderr, "flood-max value invalid\n");
628 exit(1);
629 }
630 }
631 break;
3842ba60
JF
632 case 'w': // Write size in K
633 bs = atoi(optarg);
634 if (bs > 0) {
635 write_size = bs*1024;
636 have_size = 1;
637 }
638 break;
93d48c8c
CC
639 case 'W': // Write size in bytes (or with a suffix)
640 bs = parse_bytes(optarg);
3842ba60
JF
641 if (bs > 0) {
642 write_size = bs;
643 have_size = 1;
644 }
645 break;
646 case 'n':
19250749
JF
647 if (strlen(optarg) >= CPG_MAX_NAME_LENGTH) {
648 fprintf(stderr, "CPG name too long\n");
649 exit(1);
650 }
651
3842ba60
JF
652 strcpy(group_name.value, optarg);
653 group_name.length = strlen(group_name.value);
654 break;
71707549
CC
655 case 't':
656 report_rtt = 1;
657 break;
ce188ff3
CC
658 case 'E':
659 to_stderr = 1;
660 break;
661 case 'M':
662 machine_readable = 1;
663 break;
0be72f03
CC
664 case 'f':
665 flood = 1;
0be72f03 666 break;
40c246fb
CC
667 case 'a':
668 abort_on_error = 1;
669 break;
3842ba60
JF
670 case 'd':
671 delay_time = atoi(optarg);
672 break;
ce188ff3
CC
673 case 'D':
674 delimiter = optarg[0];
675 break;
3842ba60
JF
676 case 'r':
677 repetitions = atoi(optarg);
678 break;
679 case 'p':
680 print_time = atoi(optarg);
681 break;
682 case 'l':
683 listen_only = 1;
684 break;
685 case 's':
686 do_syslog = 1;
687 break;
688 case 'q':
0be72f03 689 quiet++;
3842ba60
JF
690 break;
691 case 'm':
692 model = atoi(optarg);
693 if (model < 0 || model > 1) {
694 fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
695 exit(1);
696 }
697 break;
698 case '?':
699 usage(basename(argv[0]));
eca52f67 700 exit(1);
3842ba60
JF
701 }
702 }
703
0be72f03 704 if (!have_size && flood) {
93d48c8c 705 write_size = flood_start;
0be72f03
CC
706 }
707
3842ba60
JF
708 signal (SIGALRM, sigalrm_handler);
709 signal (SIGINT, sigint_handler);
710 switch (model) {
711 case 0:
712 res = cpg_initialize (&handle, &callbacks);
713 break;
714 case 1:
715 res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
716 break;
717 default:
718 res=999; // can't get here but it keeps the compiler happy
719 break;
720 }
721
722 if (res != CS_OK) {
ce188ff3 723 cpgh_log_printf(CPGH_LOG_ERR, "cpg_initialize failed with result %d\n", res);
3842ba60
JF
724 exit (1);
725 }
055add91
JF
726 res = cpg_local_get(handle, &g_our_nodeid);
727 if (res != CS_OK) {
728 cpgh_log_printf(CPGH_LOG_ERR, "cpg_local_get failed with result %d\n", res);
729 exit (1);
730 }
71707549 731
3842ba60
JF
732 pthread_create (&thread, NULL, dispatch_thread, NULL);
733
734 res = cpg_join (handle, &group_name);
735 if (res != CS_OK) {
ce188ff3 736 cpgh_log_printf(CPGH_LOG_ERR, "cpg_join failed with result %d\n", res);
3842ba60
JF
737 exit (1);
738 }
739
740 if (listen_only) {
19250749 741 int secs = 0;
3842ba60 742
3842ba60
JF
743 while (!stopped) {
744 sleep(1);
745 if (++secs > print_time && !quiet) {
ce188ff3
CC
746 int nodes_printed = 0;
747
748 if (!machine_readable) {
749 for (i=1; i<MAX_NODEID; i++) {
750 if (g_recv_counter[i]) {
5731af27 751 cpgh_log_printf(CPGH_LOG_INFO, "%s: %5d message%s of %d bytes received from node " CS_PRI_NODE_ID "\n",
ce188ff3
CC
752 group_name.value, g_recv_counter[i] - g_recv_start[i],
753 g_recv_counter[i]==1?"":"s",
754 g_recv_size[i], i);
755 nodes_printed++;
756 }
757 }
758 }
759
760 /* Separate list of nodes if more than one */
761 if (nodes_printed > 1) {
762 cpgh_log_printf(CPGH_LOG_INFO, "\n");
763 }
3842ba60 764 secs = 0;
3842ba60
JF
765 }
766 }
767 }
768 else {
769 cpg_max_atomic_msgsize_get (handle, &maxsize);
0be72f03 770 if (write_size > maxsize) {
3842ba60
JF
771 fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
772 write_size, maxsize);
773 }
ce188ff3
CC
774
775 /* The main job starts here */
0be72f03
CC
776 if (flood) {
777 for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */
778 cpg_flood (handle, write_size);
779 signal (SIGALRM, sigalrm_handler);
93d48c8c
CC
780 write_size *= flood_multiplier;
781 if (write_size > flood_max) {
0be72f03
CC
782 break;
783 }
784 }
785 }
786 else {
ce188ff3 787 send_counter = -1; /* So we start from zero to allow listeners to sync */
0be72f03
CC
788 for (i = 0; i < repetitions && !stopped; i++) {
789 cpg_test (handle, write_size, delay_time, print_time);
790 signal (SIGALRM, sigalrm_handler);
791 }
3842ba60
JF
792 }
793 }
794
795 res = cpg_finalize (handle);
796 if (res != CS_OK) {
ce188ff3 797 cpgh_log_printf(CPGH_LOG_ERR, "cpg_finalize failed with result %d\n", res);
3842ba60
JF
798 exit (1);
799 }
800
0be72f03 801 if (quiet < 2) {
ce188ff3
CC
802 /* Don't print LONG_MAX for min_rtt if we don't have a value */
803 if (min_rtt == LONG_MAX) {
804 min_rtt = 0L;
805 }
806
807 if (machine_readable) {
efef3a90 808 cpgh_log_printf(CPGH_LOG_STATS, "%d%c%d%c%d%c%d%c%d%c%d%c%d%c%ld%c%ld%c%ld\n",
ce188ff3
CC
809 packets_sent, delimiter,
810 send_fails, delimiter,
811 send_retries, delimiter,
812 length_errors, delimiter,
813 packets_recvd, delimiter,
814 sequence_errors, delimiter,
815 crc_errors, delimiter,
816 min_rtt, delimiter,
efef3a90
CC
817 avg_rtt, delimiter,
818 max_rtt);
0be72f03 819 }
ce188ff3
CC
820 else {
821 cpgh_log_printf(CPGH_LOG_STATS, "\n");
822 cpgh_log_printf(CPGH_LOG_STATS, "Stats:\n");
823 if (!listen_only) {
824 cpgh_log_printf(CPGH_LOG_STATS, " packets sent: %d\n", packets_sent);
825 cpgh_log_printf(CPGH_LOG_STATS, " send failures: %d\n", send_fails);
826 cpgh_log_printf(CPGH_LOG_STATS, " send retries: %d\n", send_retries);
827 }
828 cpgh_log_printf(CPGH_LOG_STATS, " length errors: %d\n", length_errors);
829 cpgh_log_printf(CPGH_LOG_STATS, " packets recvd: %d\n", packets_recvd);
830 cpgh_log_printf(CPGH_LOG_STATS, " sequence errors: %d\n", sequence_errors);
831 cpgh_log_printf(CPGH_LOG_STATS, " crc errors: %d\n", crc_errors);
832 if (!listen_only) {
833 cpgh_log_printf(CPGH_LOG_STATS, " min RTT: %ld\n", min_rtt);
834 cpgh_log_printf(CPGH_LOG_STATS, " max RTT: %ld\n", max_rtt);
835 cpgh_log_printf(CPGH_LOG_STATS, " avg RTT: %ld\n", avg_rtt);
836 }
837 cpgh_log_printf(CPGH_LOG_STATS, "\n");
0be72f03 838 }
3842ba60 839 }
eca52f67
JF
840
841 res = 0;
0be72f03 842
eca52f67
JF
843 if (send_fails > 0 || (have_size && length_errors > 0) || sequence_errors > 0 || crc_errors > 0) {
844 res = 2;
845 }
846
847 return (res);
3842ba60 848}