]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2013 CohortFS, LLC | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include <sys/types.h> | |
16 | ||
17 | #include <iostream> | |
18 | #include <string> | |
19 | ||
20 | using namespace std; | |
21 | ||
22 | #include "common/config.h" | |
23 | #include "msg/msg_types.h" | |
24 | #include "msg/xio/XioMessenger.h" | |
25 | #include "msg/xio/FastStrategy.h" | |
26 | #include "msg/xio/QueueStrategy.h" | |
27 | #include "msg/xio/XioMsg.h" | |
28 | #include "messages/MPing.h" | |
29 | #include "common/Timer.h" | |
30 | #include "common/ceph_argparse.h" | |
31 | #include "global/global_init.h" | |
32 | #include "perfglue/heap_profiler.h" | |
33 | #include "common/address_helper.h" | |
34 | #include "message_helper.h" | |
35 | #include "xio_dispatcher.h" | |
36 | #include "msg/xio/XioConnection.h" | |
37 | ||
38 | #define dout_subsys ceph_subsys_xio_client | |
39 | ||
40 | void usage(ostream& out) | |
41 | { | |
42 | out << "usage: xio_client [options]\n" | |
43 | "options:\n" | |
44 | " --addr X\n" | |
45 | " --port X\n" | |
46 | " --msgs X\n" | |
47 | " --dsize X\n" | |
48 | " --nfrags X\n" | |
49 | " --dfast\n" | |
50 | ; | |
51 | } | |
52 | ||
53 | int main(int argc, const char **argv) | |
54 | { | |
55 | vector<const char*> args; | |
56 | Messenger* messenger; | |
57 | XioDispatcher *dispatcher; | |
58 | std::vector<const char*>::iterator arg_iter; | |
59 | std::string val; | |
60 | entity_addr_t dest_addr; | |
61 | ConnectionRef conn; | |
62 | int r = 0; | |
63 | ||
64 | std::string addr = "localhost"; | |
65 | std::string port = "1234"; | |
66 | int n_msgs = 50; | |
67 | int n_dsize = 0; | |
68 | int n_nfrags = 1; | |
69 | bool dfast = false; | |
70 | ||
71 | struct timespec ts; | |
72 | ts.tv_sec = 5; | |
73 | ts.tv_nsec = 0; | |
74 | ||
75 | argv_to_vec(argc, argv, args); | |
7c673cae FG |
76 | |
77 | auto cct = global_init(NULL, args, | |
78 | CEPH_ENTITY_TYPE_ANY, | |
11fdf7f2 TL |
79 | CODE_ENVIRONMENT_UTILITY, |
80 | CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); | |
7c673cae FG |
81 | |
82 | for (arg_iter = args.begin(); arg_iter != args.end();) { | |
83 | if (ceph_argparse_witharg(args, arg_iter, &val, "--addr", | |
84 | (char*) NULL)) { | |
85 | addr = val; | |
86 | } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port", | |
87 | (char*) NULL)) { | |
88 | port = val; | |
89 | } else if (ceph_argparse_witharg(args, arg_iter, &val, "--msgs", | |
90 | (char*) NULL)) { | |
91 | n_msgs = atoi(val.c_str()); | |
92 | } else if (ceph_argparse_witharg(args, arg_iter, &val, "--dsize", | |
93 | (char*) NULL)) { | |
94 | n_dsize = atoi(val.c_str()); | |
95 | } else if (ceph_argparse_witharg(args, arg_iter, &val, "--nfrags", | |
96 | (char*) NULL)) { | |
97 | n_nfrags = atoi(val.c_str()); | |
98 | } else if (ceph_argparse_flag(args, arg_iter, "--dfast", | |
99 | (char*) NULL)) { | |
100 | dfast = true; | |
101 | } else { | |
102 | ++arg_iter; | |
103 | } | |
104 | }; | |
105 | ||
106 | if (!args.empty()) { | |
107 | cerr << "What is this? -- " << args[0] << std::endl; | |
108 | usage(cerr); | |
109 | exit(1); | |
110 | } | |
111 | ||
112 | DispatchStrategy* dstrategy; | |
113 | if (dfast) | |
114 | dstrategy = new FastStrategy(); | |
115 | else | |
116 | dstrategy = new QueueStrategy(2); | |
117 | ||
118 | messenger = new XioMessenger(g_ceph_context, | |
119 | entity_name_t::MON(-1), | |
120 | "xio_client", | |
121 | 0 /* nonce */, | |
122 | 0 /* cflags */, | |
123 | dstrategy); | |
124 | ||
125 | // enable timing prints | |
126 | static_cast<XioMessenger*>(messenger)->set_magic( | |
127 | MSG_MAGIC_REDUPE /* resubmit messages on delivery (REQUIRED) */ | | |
128 | MSG_MAGIC_TRACE_CTR /* timing prints */); | |
129 | ||
130 | // ensure we have a pool of sizeof(payload data) | |
131 | if (n_dsize) | |
132 | (void) static_cast<XioMessenger*>(messenger)->pool_hint(n_dsize); | |
133 | ||
134 | messenger->set_default_policy(Messenger::Policy::lossy_client(0)); | |
135 | ||
136 | string dest_str = "tcp://"; | |
137 | dest_str += addr; | |
138 | dest_str += ":"; | |
139 | dest_str += port; | |
140 | entity_addr_from_url(&dest_addr, dest_str.c_str()); | |
141 | entity_inst_t dest_server(entity_name_t::MON(-1), dest_addr); | |
142 | ||
143 | dispatcher = new XioDispatcher(messenger); | |
144 | messenger->add_dispatcher_head(dispatcher); | |
145 | ||
146 | dispatcher->set_active(); // this side is the pinger | |
147 | ||
148 | r = messenger->start(); | |
149 | if (r < 0) | |
150 | goto out; | |
151 | ||
152 | conn = messenger->get_connection(dest_server); | |
153 | ||
154 | // do stuff | |
155 | time_t t1, t2; | |
156 | t1 = time(NULL); | |
157 | ||
158 | int msg_ix; | |
159 | for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) { | |
160 | /* add a data payload if asked */ | |
161 | if (! n_dsize) { | |
162 | conn->send_message(new MPing()); | |
163 | } else { | |
164 | conn->send_message(new_simple_ping_with_data("xio_client", n_dsize, n_nfrags)); | |
165 | } | |
166 | } | |
167 | ||
168 | // do stuff | |
169 | while (conn->is_connected()) { | |
170 | nanosleep(&ts, NULL); | |
171 | } | |
172 | ||
173 | t2 = time(NULL); | |
174 | cout << "Processed " | |
175 | << static_cast<XioConnection*>(conn->get())->get_scount() | |
176 | << " one-way messages in " << t2-t1 << "s" | |
177 | << std::endl; | |
178 | ||
179 | conn->put(); | |
180 | ||
181 | // wait a bit for cleanup to finalize | |
182 | ts.tv_sec = 5; | |
183 | nanosleep(&ts, NULL); | |
184 | ||
185 | messenger->shutdown(); | |
186 | ||
187 | out: | |
188 | return r; | |
189 | } |