]>
git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/test/cpp/src/StressTestNonBlocking.cpp
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <thrift/concurrency/ThreadManager.h>
21 #include <thrift/concurrency/ThreadFactory.h>
22 #include <thrift/concurrency/Monitor.h>
23 #include <thrift/concurrency/Mutex.h>
24 #include <thrift/protocol/TBinaryProtocol.h>
25 #include <thrift/server/TSimpleServer.h>
26 #include <thrift/server/TThreadPoolServer.h>
27 #include <thrift/server/TThreadedServer.h>
28 #include <thrift/server/TNonblockingServer.h>
29 #include <thrift/transport/TServerSocket.h>
30 #include <thrift/transport/TSocket.h>
31 #include <thrift/transport/TNonblockingServerSocket.h>
32 #include <thrift/transport/TTransportUtils.h>
33 #include <thrift/transport/TFileTransport.h>
34 #include <thrift/TLogging.h>
44 #include <thrift/windows/TWinsockSingleton.h>
49 using namespace apache::thrift
;
50 using namespace apache::thrift::protocol
;
51 using namespace apache::thrift::transport
;
52 using namespace apache::thrift::server
;
53 using namespace apache::thrift::concurrency
;
55 using namespace test::stress
;
58 bool operator()(const char* s1
, const char* s2
) const { return strcmp(s1
, s2
) == 0; }
62 bool operator()(const char* s1
, const char* s2
) const { return strcmp(s1
, s2
) < 0; }
65 // typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map;
66 typedef map
<const char*, int, ltstr
> count_map
;
68 class Server
: public ServiceIf
{
72 void count(const char* method
) {
74 int ct
= counts_
[method
];
75 counts_
[method
] = ++ct
;
78 void echoVoid() override
{
80 // Sleep to simulate work
85 count_map
getCount() {
90 int8_t echoByte(const int8_t arg
) override
{ return arg
; }
91 int32_t echoI32(const int32_t arg
) override
{ return arg
; }
92 int64_t echoI64(const int64_t arg
) override
{ return arg
; }
93 void echoString(string
& out
, const string
& arg
) override
{
95 T_ERROR_ABORT("WRONG STRING (%s)!!!!", arg
.c_str());
99 void echoList(vector
<int8_t>& out
, const vector
<int8_t>& arg
) override
{ out
= arg
; }
100 void echoSet(set
<int8_t>& out
, const set
<int8_t>& arg
) override
{ out
= arg
; }
101 void echoMap(map
<int8_t, int8_t>& out
, const map
<int8_t, int8_t>& arg
) override
{ out
= arg
; }
108 class ClientThread
: public Runnable
{
110 ClientThread(std::shared_ptr
<TTransport
> transport
,
111 std::shared_ptr
<ServiceClient
> client
,
116 : _transport(transport
),
119 _workerCount(workerCount
),
120 _loopCount(loopCount
),
121 _loopType(loopType
) {}
123 void run() override
{
125 // Wait for all worker threads to start
128 Synchronized
s(_monitor
);
129 while (_workerCount
== 0) {
134 _startTime
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
155 cerr
<< "Unexpected loop type" << _loopType
<< endl
;
159 _endTime
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
166 Synchronized
s(_monitor
);
170 if (_workerCount
== 0) {
177 void loopEchoVoid() {
178 for (size_t ix
= 0; ix
< _loopCount
; ix
++) {
183 void loopEchoByte() {
184 for (size_t ix
= 0; ix
< _loopCount
; ix
++) {
187 result
= _client
->echoByte(arg
);
189 assert(result
== arg
);
194 for (size_t ix
= 0; ix
< _loopCount
; ix
++) {
197 result
= _client
->echoI32(arg
);
199 assert(result
== arg
);
204 for (size_t ix
= 0; ix
< _loopCount
; ix
++) {
207 result
= _client
->echoI64(arg
);
209 assert(result
== arg
);
213 void loopEchoString() {
214 for (size_t ix
= 0; ix
< _loopCount
; ix
++) {
215 string arg
= "hello";
217 _client
->echoString(result
, arg
);
218 assert(result
== arg
);
222 std::shared_ptr
<TTransport
> _transport
;
223 std::shared_ptr
<ServiceClient
> _client
;
225 size_t& _workerCount
;
234 int main(int argc
, char** argv
) {
236 transport::TWinsockSingleton::create();
240 string serverType
= "simple";
241 string protocolType
= "binary";
242 uint32_t workerCount
= 4;
243 uint32_t clientCount
= 20;
244 uint32_t loopCount
= 1000;
245 TType loopType
= T_VOID
;
246 string callName
= "echoVoid";
247 bool runServer
= true;
248 bool logRequests
= false;
249 string requestLogPath
= "./requestlog.tlog";
250 bool replayRequests
= false;
254 usage
<< argv
[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] "
255 "[--protocol-type=<protocol-type>] [--workers=<worker-count>] "
256 "[--clients=<client-count>] [--loop=<loop-count>]" << endl
257 << "\tclients Number of client threads to create - 0 implies no clients, i.e. "
258 "server only. Default is " << clientCount
<< endl
259 << "\thelp Prints this help text." << endl
260 << "\tcall Service method to call. Default is " << callName
<< endl
261 << "\tloop The number of remote thrift calls each client makes. Default is "
262 << loopCount
<< endl
<< "\tport The port the server and clients should bind to "
263 "for thrift network connections. Default is " << port
<< endl
264 << "\tserver Run the Thrift server in this process. Default is " << runServer
265 << endl
<< "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is "
266 << serverType
<< endl
267 << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is "
268 << protocolType
<< endl
269 << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests
270 << endl
<< "\treplay-request Replay requests from log file (./requestlog.tlog) Default is "
271 << replayRequests
<< endl
<< "\tworkers Number of thread pools workers. Only valid "
272 "for thread-pool server type. Default is " << workerCount
275 map
<string
, string
> args
;
277 for (int ix
= 1; ix
< argc
; ix
++) {
279 string
arg(argv
[ix
]);
281 if (arg
.compare(0, 2, "--") == 0) {
283 size_t end
= arg
.find_first_of("=", 2);
285 string key
= string(arg
, 2, end
- 2);
287 if (end
!= string::npos
) {
288 args
[key
] = string(arg
, end
+ 1);
293 throw invalid_argument("Unexcepted command line token: " + arg
);
299 if (!args
["clients"].empty()) {
300 clientCount
= atoi(args
["clients"].c_str());
303 if (!args
["help"].empty()) {
308 if (!args
["loop"].empty()) {
309 loopCount
= atoi(args
["loop"].c_str());
312 if (!args
["call"].empty()) {
313 callName
= args
["call"];
316 if (!args
["port"].empty()) {
317 port
= atoi(args
["port"].c_str());
320 if (!args
["server"].empty()) {
321 runServer
= args
["server"] == "true";
324 if (!args
["log-request"].empty()) {
325 logRequests
= args
["log-request"] == "true";
328 if (!args
["replay-request"].empty()) {
329 replayRequests
= args
["replay-request"] == "true";
332 if (!args
["server-type"].empty()) {
333 serverType
= args
["server-type"];
336 if (!args
["workers"].empty()) {
337 workerCount
= atoi(args
["workers"].c_str());
340 } catch (std::exception
& e
) {
341 cerr
<< e
.what() << endl
;
345 std::shared_ptr
<ThreadFactory
> threadFactory
346 = std::shared_ptr
<ThreadFactory
>(new ThreadFactory());
349 std::shared_ptr
<Server
> serviceHandler(new Server());
351 if (replayRequests
) {
352 std::shared_ptr
<Server
> serviceHandler(new Server());
353 std::shared_ptr
<ServiceProcessor
> serviceProcessor(new ServiceProcessor(serviceHandler
));
356 std::shared_ptr
<TFileTransport
> fileTransport(new TFileTransport(requestLogPath
));
357 fileTransport
->setChunkSize(2 * 1024 * 1024);
358 fileTransport
->setMaxEventSize(1024 * 16);
359 fileTransport
->seekToEnd();
362 std::shared_ptr
<TProtocolFactory
> protocolFactory(new TBinaryProtocolFactory());
364 TFileProcessor
fileProcessor(serviceProcessor
, protocolFactory
, fileTransport
);
366 fileProcessor
.process(0, true);
372 std::shared_ptr
<ServiceProcessor
> serviceProcessor(new ServiceProcessor(serviceHandler
));
375 std::shared_ptr
<TProtocolFactory
> protocolFactory(new TBinaryProtocolFactory());
378 std::shared_ptr
<TTransportFactory
> transportFactory
;
381 // initialize the log file
382 std::shared_ptr
<TFileTransport
> fileTransport(new TFileTransport(requestLogPath
));
383 fileTransport
->setChunkSize(2 * 1024 * 1024);
384 fileTransport
->setMaxEventSize(1024 * 16);
387 = std::shared_ptr
<TTransportFactory
>(new TPipedTransportFactory(fileTransport
));
390 std::shared_ptr
<Thread
> serverThread
;
391 std::shared_ptr
<Thread
> serverThread2
;
392 std::shared_ptr
<transport::TNonblockingServerSocket
> nbSocket1
;
393 std::shared_ptr
<transport::TNonblockingServerSocket
> nbSocket2
;
395 if (serverType
== "simple") {
397 nbSocket1
.reset(new transport::TNonblockingServerSocket(port
));
398 serverThread
= threadFactory
->newThread(std::shared_ptr
<TServer
>(
399 new TNonblockingServer(serviceProcessor
, protocolFactory
, nbSocket1
)));
400 nbSocket2
.reset(new transport::TNonblockingServerSocket(port
+ 1));
401 serverThread2
= threadFactory
->newThread(std::shared_ptr
<TServer
>(
402 new TNonblockingServer(serviceProcessor
, protocolFactory
, nbSocket2
)));
404 } else if (serverType
== "thread-pool") {
406 std::shared_ptr
<ThreadManager
> threadManager
407 = ThreadManager::newSimpleThreadManager(workerCount
);
409 threadManager
->threadFactory(threadFactory
);
410 threadManager
->start();
411 nbSocket1
.reset(new transport::TNonblockingServerSocket(port
));
412 serverThread
= threadFactory
->newThread(std::shared_ptr
<TServer
>(
413 new TNonblockingServer(serviceProcessor
, protocolFactory
, nbSocket1
, threadManager
)));
414 nbSocket2
.reset(new transport::TNonblockingServerSocket(port
+ 1));
415 serverThread2
= threadFactory
->newThread(std::shared_ptr
<TServer
>(
416 new TNonblockingServer(serviceProcessor
, protocolFactory
, nbSocket2
, threadManager
)));
419 cerr
<< "Starting the server on port " << port
<< " and " << (port
+ 1) << endl
;
420 serverThread
->start();
421 serverThread2
->start();
423 // If we aren't running clients, just wait forever for external clients
425 if (clientCount
== 0) {
426 serverThread
->join();
427 serverThread2
->join();
432 if (clientCount
> 0) {
436 size_t threadCount
= 0;
438 set
<std::shared_ptr
<Thread
> > clientThreads
;
440 if (callName
== "echoVoid") {
442 } else if (callName
== "echoByte") {
444 } else if (callName
== "echoI32") {
446 } else if (callName
== "echoI64") {
448 } else if (callName
== "echoString") {
451 throw invalid_argument("Unknown service call " + callName
);
454 for (uint32_t ix
= 0; ix
< clientCount
; ix
++) {
456 std::shared_ptr
<TSocket
> socket(new TSocket("127.0.0.1", port
+ (ix
% 2)));
457 std::shared_ptr
<TFramedTransport
> framedSocket(new TFramedTransport(socket
));
458 std::shared_ptr
<TProtocol
> protocol(new TBinaryProtocol(framedSocket
));
459 std::shared_ptr
<ServiceClient
> serviceClient(new ServiceClient(protocol
));
461 clientThreads
.insert(threadFactory
->newThread(std::shared_ptr
<ClientThread
>(
462 new ClientThread(socket
, serviceClient
, monitor
, threadCount
, loopCount
, loopType
))));
465 for (auto thread
= clientThreads
.begin();
466 thread
!= clientThreads
.end();
475 Synchronized
s(monitor
);
476 threadCount
= clientCount
;
478 cerr
<< "Launch " << clientCount
<< " client threads" << endl
;
480 time00
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
484 while (threadCount
> 0) {
488 time01
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
491 int64_t firstTime
= 9223372036854775807LL;
492 int64_t lastTime
= 0;
494 double averageTime
= 0;
495 int64_t minTime
= 9223372036854775807LL;
498 for (auto ix
= clientThreads
.begin();
499 ix
!= clientThreads
.end();
502 std::shared_ptr
<ClientThread
> client
503 = std::dynamic_pointer_cast
<ClientThread
>((*ix
)->runnable());
505 int64_t delta
= client
->_endTime
- client
->_startTime
;
509 if (client
->_startTime
< firstTime
) {
510 firstTime
= client
->_startTime
;
513 if (client
->_endTime
> lastTime
) {
514 lastTime
= client
->_endTime
;
517 if (delta
< minTime
) {
521 if (delta
> maxTime
) {
525 averageTime
+= delta
;
528 averageTime
/= clientCount
;
530 cout
<< "workers :" << workerCount
<< ", client : " << clientCount
<< ", loops : " << loopCount
531 << ", rate : " << (clientCount
* loopCount
* 1000) / ((double)(time01
- time00
)) << endl
;
533 count_map count
= serviceHandler
->getCount();
534 count_map::iterator iter
;
535 for (iter
= count
.begin(); iter
!= count
.end(); ++iter
) {
536 printf("%s => %d\n", iter
->first
, iter
->second
);
538 cerr
<< "done." << endl
;