2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <thrift/async/TAsyncBufferProcessor.h>
21 #include <thrift/async/TAsyncProtocolProcessor.h>
22 #include <thrift/async/TEvhttpServer.h>
23 #include <thrift/concurrency/ThreadFactory.h>
24 #include <thrift/concurrency/ThreadManager.h>
25 #include <thrift/processor/TMultiplexedProcessor.h>
26 #include <thrift/protocol/TBinaryProtocol.h>
27 #include <thrift/protocol/TCompactProtocol.h>
28 #include <thrift/protocol/THeaderProtocol.h>
29 #include <thrift/protocol/TJSONProtocol.h>
30 #include <thrift/server/TNonblockingServer.h>
31 #include <thrift/server/TSimpleServer.h>
32 #include <thrift/server/TThreadPoolServer.h>
33 #include <thrift/server/TThreadedServer.h>
34 #include <thrift/transport/THttpServer.h>
35 #include <thrift/transport/THttpTransport.h>
36 #include <thrift/transport/TNonblockingSSLServerSocket.h>
37 #include <thrift/transport/TNonblockingServerSocket.h>
38 #include <thrift/transport/TSSLServerSocket.h>
39 #include <thrift/transport/TSSLSocket.h>
40 #include <thrift/transport/TServerSocket.h>
41 #include <thrift/transport/TTransportUtils.h>
42 #include <thrift/transport/TZlibTransport.h>
44 #include "SecondService.h"
45 #include "ThriftTest.h"
50 #ifdef HAVE_INTTYPES_H
61 #include <boost/algorithm/string.hpp>
62 #include <boost/program_options.hpp>
63 #include <boost/filesystem.hpp>
66 #include <thrift/windows/TWinsockSingleton.h>
71 using namespace apache::thrift
;
72 using namespace apache::thrift::async
;
73 using namespace apache::thrift::concurrency
;
74 using namespace apache::thrift::protocol
;
75 using namespace apache::thrift::transport
;
76 using namespace apache::thrift::server
;
78 using namespace thrift::test
;
80 // to handle a controlled shutdown, signal handling is mandatory
82 apache::thrift::concurrency::Monitor gMonitor
;
83 void signal_handler(int signum
)
85 if (signum
== SIGINT
) {
91 class TestHandler
: public ThriftTestIf
{
93 TestHandler() = default;
95 void testVoid() override
{ printf("testVoid()\n"); }
97 void testString(string
& out
, const string
& thing
) override
{
98 printf("testString(\"%s\")\n", thing
.c_str());
102 bool testBool(const bool thing
) override
{
103 printf("testBool(%s)\n", thing
? "true" : "false");
107 int8_t testByte(const int8_t thing
) override
{
108 printf("testByte(%d)\n", (int)thing
);
112 int32_t testI32(const int32_t thing
) override
{
113 printf("testI32(%d)\n", thing
);
117 int64_t testI64(const int64_t thing
) override
{
118 printf("testI64(%" PRId64
")\n", thing
);
122 double testDouble(const double thing
) override
{
123 printf("testDouble(%f)\n", thing
);
127 void testBinary(std::string
& _return
, const std::string
& thing
) override
{
128 std::ostringstream hexstr
;
129 hexstr
<< std::hex
<< thing
;
130 printf("testBinary(%lu: %s)\n", safe_numeric_cast
<unsigned long>(thing
.size()), hexstr
.str().c_str());
134 void testStruct(Xtruct
& out
, const Xtruct
& thing
) override
{
135 printf("testStruct({\"%s\", %d, %d, %" PRId64
"})\n",
136 thing
.string_thing
.c_str(),
137 (int)thing
.byte_thing
,
143 void testNest(Xtruct2
& out
, const Xtruct2
& nest
) override
{
144 const Xtruct
& thing
= nest
.struct_thing
;
145 printf("testNest({%d, {\"%s\", %d, %d, %" PRId64
"}, %d})\n",
146 (int)nest
.byte_thing
,
147 thing
.string_thing
.c_str(),
148 (int)thing
.byte_thing
,
155 void testMap(map
<int32_t, int32_t>& out
, const map
<int32_t, int32_t>& thing
) override
{
157 map
<int32_t, int32_t>::const_iterator m_iter
;
159 for (m_iter
= thing
.begin(); m_iter
!= thing
.end(); ++m_iter
) {
165 printf("%d => %d", m_iter
->first
, m_iter
->second
);
171 void testStringMap(map
<std::string
, std::string
>& out
,
172 const map
<std::string
, std::string
>& thing
) override
{
174 map
<std::string
, std::string
>::const_iterator m_iter
;
176 for (m_iter
= thing
.begin(); m_iter
!= thing
.end(); ++m_iter
) {
182 printf("%s => %s", (m_iter
->first
).c_str(), (m_iter
->second
).c_str());
188 void testSet(set
<int32_t>& out
, const set
<int32_t>& thing
) override
{
190 set
<int32_t>::const_iterator s_iter
;
192 for (s_iter
= thing
.begin(); s_iter
!= thing
.end(); ++s_iter
) {
198 printf("%d", *s_iter
);
204 void testList(vector
<int32_t>& out
, const vector
<int32_t>& thing
) override
{
205 printf("testList({");
206 vector
<int32_t>::const_iterator l_iter
;
208 for (l_iter
= thing
.begin(); l_iter
!= thing
.end(); ++l_iter
) {
214 printf("%d", *l_iter
);
220 Numberz::type
testEnum(const Numberz::type thing
) override
{
221 printf("testEnum(%d)\n", thing
);
225 UserId
testTypedef(const UserId thing
) override
{
226 printf("testTypedef(%" PRId64
")\n", thing
);
230 void testMapMap(map
<int32_t, map
<int32_t, int32_t> >& mapmap
, const int32_t hello
) override
{
231 printf("testMapMap(%d)\n", hello
);
233 map
<int32_t, int32_t> pos
;
234 map
<int32_t, int32_t> neg
;
235 for (int i
= 1; i
< 5; i
++) {
236 pos
.insert(make_pair(i
, i
));
237 neg
.insert(make_pair(-i
, -i
));
240 mapmap
.insert(make_pair(4, pos
));
241 mapmap
.insert(make_pair(-4, neg
));
244 void testInsanity(map
<UserId
, map
<Numberz::type
, Insanity
> >& insane
, const Insanity
& argument
) override
{
245 printf("testInsanity()\n");
248 map
<Numberz::type
, Insanity
> first_map
;
249 map
<Numberz::type
, Insanity
> second_map
;
251 first_map
.insert(make_pair(Numberz::TWO
, argument
));
252 first_map
.insert(make_pair(Numberz::THREE
, argument
));
254 second_map
.insert(make_pair(Numberz::SIX
, looney
));
256 insane
.insert(make_pair(1, first_map
));
257 insane
.insert(make_pair(2, second_map
));
261 map
<UserId
, map
<Numberz::type
, Insanity
> >::const_iterator i_iter
;
262 for (i_iter
= insane
.begin(); i_iter
!= insane
.end(); ++i_iter
) {
263 printf("%" PRId64
" => {", i_iter
->first
);
264 map
<Numberz::type
, Insanity
>::const_iterator i2_iter
;
265 for (i2_iter
= i_iter
->second
.begin(); i2_iter
!= i_iter
->second
.end(); ++i2_iter
) {
266 printf("%d => {", i2_iter
->first
);
267 map
<Numberz::type
, UserId
> userMap
= i2_iter
->second
.userMap
;
268 map
<Numberz::type
, UserId
>::const_iterator um
;
270 for (um
= userMap
.begin(); um
!= userMap
.end(); ++um
) {
271 printf("%d => %" PRId64
", ", um
->first
, um
->second
);
275 vector
<Xtruct
> xtructs
= i2_iter
->second
.xtructs
;
276 vector
<Xtruct
>::const_iterator x
;
278 for (x
= xtructs
.begin(); x
!= xtructs
.end(); ++x
) {
279 printf("{\"%s\", %d, %d, %" PRId64
"}, ",
280 x
->string_thing
.c_str(),
294 void testMulti(Xtruct
& hello
,
298 const std::map
<int16_t, std::string
>& arg3
,
299 const Numberz::type arg4
,
300 const UserId arg5
) override
{
305 printf("testMulti()\n");
307 hello
.string_thing
= "Hello2";
308 hello
.byte_thing
= arg0
;
309 hello
.i32_thing
= arg1
;
310 hello
.i64_thing
= (int64_t)arg2
;
313 void testException(const std::string
& arg
) override
{
314 printf("testException(%s)\n", arg
.c_str());
315 if (arg
.compare("Xception") == 0) {
320 } else if (arg
.compare("TException") == 0) {
321 apache::thrift::TException e
;
325 result
.string_thing
= arg
;
330 void testMultiException(Xtruct
& result
,
331 const std::string
& arg0
,
332 const std::string
& arg1
) override
{
334 printf("testMultiException(%s, %s)\n", arg0
.c_str(), arg1
.c_str());
336 if (arg0
.compare("Xception") == 0) {
339 e
.message
= "This is an Xception";
341 } else if (arg0
.compare("Xception2") == 0) {
344 e
.struct_thing
.string_thing
= "This is an Xception2";
347 result
.string_thing
= arg1
;
352 void testOneway(const int32_t aNum
) override
{
353 printf("testOneway(%d): call received\n", aNum
);
357 class SecondHandler
: public SecondServiceIf
360 void secondtestString(std::string
& result
, const std::string
& thing
) override
361 { result
= "testString(\"" + thing
+ "\")"; }
364 class TestProcessorEventHandler
: public TProcessorEventHandler
{
365 void* getContext(const char* fn_name
, void* serverContext
) override
{
367 return new std::string(fn_name
);
369 void freeContext(void* ctx
, const char* fn_name
) override
{
371 delete static_cast<std::string
*>(ctx
);
373 void preRead(void* ctx
, const char* fn_name
) override
{ communicate("preRead", ctx
, fn_name
); }
374 void postRead(void* ctx
, const char* fn_name
, uint32_t bytes
) override
{
376 communicate("postRead", ctx
, fn_name
);
378 void preWrite(void* ctx
, const char* fn_name
) override
{ communicate("preWrite", ctx
, fn_name
); }
379 void postWrite(void* ctx
, const char* fn_name
, uint32_t bytes
) override
{
381 communicate("postWrite", ctx
, fn_name
);
383 void asyncComplete(void* ctx
, const char* fn_name
) override
{
384 communicate("asyncComplete", ctx
, fn_name
);
386 void handlerError(void* ctx
, const char* fn_name
) override
{
387 communicate("handlerError", ctx
, fn_name
);
390 void communicate(const char* event
, void* ctx
, const char* fn_name
) {
391 std::cout
<< event
<< ": " << *static_cast<std::string
*>(ctx
) << " = " << fn_name
<< std::endl
;
395 class TestHandlerAsync
: public ThriftTestCobSvIf
{
397 TestHandlerAsync(std::shared_ptr
<TestHandler
>& handler
) : _delegate(handler
) {}
398 ~TestHandlerAsync() override
= default;
400 void testVoid(std::function
<void()> cob
) override
{
401 _delegate
->testVoid();
405 void testString(std::function
<void(std::string
const& _return
)> cob
,
406 const std::string
& thing
) override
{
408 _delegate
->testString(res
, thing
);
412 void testBool(std::function
<void(bool const& _return
)> cob
, const bool thing
) override
{
413 bool res
= _delegate
->testBool(thing
);
417 void testByte(std::function
<void(int8_t const& _return
)> cob
, const int8_t thing
) override
{
418 int8_t res
= _delegate
->testByte(thing
);
422 void testI32(std::function
<void(int32_t const& _return
)> cob
, const int32_t thing
) override
{
423 int32_t res
= _delegate
->testI32(thing
);
427 void testI64(std::function
<void(int64_t const& _return
)> cob
, const int64_t thing
) override
{
428 int64_t res
= _delegate
->testI64(thing
);
432 void testDouble(std::function
<void(double const& _return
)> cob
, const double thing
) override
{
433 double res
= _delegate
->testDouble(thing
);
437 void testBinary(std::function
<void(std::string
const& _return
)> cob
,
438 const std::string
& thing
) override
{
440 _delegate
->testBinary(res
, thing
);
444 void testStruct(std::function
<void(Xtruct
const& _return
)> cob
, const Xtruct
& thing
) override
{
446 _delegate
->testStruct(res
, thing
);
450 void testNest(std::function
<void(Xtruct2
const& _return
)> cob
, const Xtruct2
& thing
) override
{
452 _delegate
->testNest(res
, thing
);
456 void testMap(std::function
<void(std::map
<int32_t, int32_t> const& _return
)> cob
,
457 const std::map
<int32_t, int32_t>& thing
) override
{
458 std::map
<int32_t, int32_t> res
;
459 _delegate
->testMap(res
, thing
);
464 std::function
<void(std::map
<std::string
, std::string
> const& _return
)> cob
,
465 const std::map
<std::string
, std::string
>& thing
) override
{
466 std::map
<std::string
, std::string
> res
;
467 _delegate
->testStringMap(res
, thing
);
471 void testSet(std::function
<void(std::set
<int32_t> const& _return
)> cob
,
472 const std::set
<int32_t>& thing
) override
{
473 std::set
<int32_t> res
;
474 _delegate
->testSet(res
, thing
);
478 void testList(std::function
<void(std::vector
<int32_t> const& _return
)> cob
,
479 const std::vector
<int32_t>& thing
) override
{
480 std::vector
<int32_t> res
;
481 _delegate
->testList(res
, thing
);
485 void testEnum(std::function
<void(Numberz::type
const& _return
)> cob
,
486 const Numberz::type thing
) override
{
487 Numberz::type res
= _delegate
->testEnum(thing
);
491 void testTypedef(std::function
<void(UserId
const& _return
)> cob
, const UserId thing
) override
{
492 UserId res
= _delegate
->testTypedef(thing
);
497 std::function
<void(std::map
<int32_t, std::map
<int32_t, int32_t> > const& _return
)> cob
,
498 const int32_t hello
) override
{
499 std::map
<int32_t, std::map
<int32_t, int32_t> > res
;
500 _delegate
->testMapMap(res
, hello
);
505 std::function
<void(std::map
<UserId
, std::map
<Numberz::type
, Insanity
> > const& _return
)> cob
,
506 const Insanity
& argument
) override
{
507 std::map
<UserId
, std::map
<Numberz::type
, Insanity
> > res
;
508 _delegate
->testInsanity(res
, argument
);
512 void testMulti(std::function
<void(Xtruct
const& _return
)> cob
,
516 const std::map
<int16_t, std::string
>& arg3
,
517 const Numberz::type arg4
,
518 const UserId arg5
) override
{
520 _delegate
->testMulti(res
, arg0
, arg1
, arg2
, arg3
, arg4
, arg5
);
525 std::function
<void()> cob
,
526 std::function
<void(::apache::thrift::TDelayedException
* _throw
)> exn_cob
,
527 const std::string
& arg
) override
{
529 _delegate
->testException(arg
);
530 } catch (const apache::thrift::TException
& e
) {
531 exn_cob(apache::thrift::TDelayedException::delayException(e
));
537 void testMultiException(
538 std::function
<void(Xtruct
const& _return
)> cob
,
539 std::function
<void(::apache::thrift::TDelayedException
* _throw
)> exn_cob
,
540 const std::string
& arg0
,
541 const std::string
& arg1
) override
{
544 _delegate
->testMultiException(res
, arg0
, arg1
);
545 } catch (const apache::thrift::TException
& e
) {
546 exn_cob(apache::thrift::TDelayedException::delayException(e
));
552 void testOneway(std::function
<void()> cob
, const int32_t secondsToSleep
) override
{
553 _delegate
->testOneway(secondsToSleep
);
558 std::shared_ptr
<TestHandler
> _delegate
;
561 namespace po
= boost::program_options
;
563 int main(int argc
, char** argv
) {
565 string testDir
= boost::filesystem::system_complete(argv
[0]).parent_path().parent_path().parent_path().string();
566 string certPath
= testDir
+ "/keys/server.crt";
567 string keyPath
= testDir
+ "/keys/server.key";
570 transport::TWinsockSingleton::create();
575 string transport_type
= "buffered";
576 string protocol_type
= "binary";
577 string server_type
= "simple";
578 string domain_socket
= "";
579 bool abstract_namespace
= false;
581 int string_limit
= 0;
582 int container_limit
= 0;
584 po::options_description
desc("Allowed options");
586 ("help,h", "produce help message")
587 ("port", po::value
<int>(&port
)->default_value(port
), "Port number to listen")
588 ("domain-socket", po::value
<string
>(&domain_socket
) ->default_value(domain_socket
), "Unix Domain Socket (e.g. /tmp/ThriftTest.thrift)")
589 ("abstract-namespace", "Create the domain socket in the Abstract Namespace (no connection with filesystem pathnames)")
590 ("server-type", po::value
<string
>(&server_type
)->default_value(server_type
), "type of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\"")
591 ("transport", po::value
<string
>(&transport_type
)->default_value(transport_type
), "transport: buffered, framed, http, zlib")
592 ("protocol", po::value
<string
>(&protocol_type
)->default_value(protocol_type
), "protocol: binary, compact, header, json, multi, multic, multih, multij")
593 ("ssl", "Encrypted Transport using SSL")
594 ("zlib", "Wrapped Transport using Zlib")
595 ("processor-events", "processor-events")
596 ("workers,n", po::value
<size_t>(&workers
)->default_value(workers
), "Number of thread pools workers. Only valid for thread-pool server type")
597 ("string-limit", po::value
<int>(&string_limit
))
598 ("container-limit", po::value
<int>(&container_limit
));
600 po::variables_map vm
;
601 po::store(po::parse_command_line(argc
, argv
, desc
), vm
);
604 if (vm
.count("help")) {
605 cout
<< desc
<< "\n";
610 if (!server_type
.empty()) {
611 if (server_type
== "simple") {
612 } else if (server_type
== "thread-pool") {
613 } else if (server_type
== "threaded") {
614 } else if (server_type
== "nonblocking") {
616 throw invalid_argument("Unknown server type " + server_type
);
620 if (!protocol_type
.empty()) {
621 if (protocol_type
== "binary") {
622 } else if (protocol_type
== "compact") {
623 } else if (protocol_type
== "json") {
624 } else if (protocol_type
== "header") {
625 } else if (protocol_type
== "multi") { // multiplexed binary
626 } else if (protocol_type
== "multic") { // multiplexed compact
627 } else if (protocol_type
== "multih") { // multiplexed header
628 } else if (protocol_type
== "multij") { // multiplexed json
630 throw invalid_argument("Unknown protocol type " + protocol_type
);
634 if (!transport_type
.empty()) {
635 if (transport_type
== "buffered") {
636 } else if (transport_type
== "framed") {
637 } else if (transport_type
== "http") {
638 } else if (transport_type
== "zlib") {
639 // crosstester will pass zlib as a flag and a transport right now...
641 throw invalid_argument("Unknown transport type " + transport_type
);
645 } catch (std::exception
& e
) {
646 cerr
<< e
.what() << endl
;
647 cout
<< desc
<< "\n";
651 if (vm
.count("ssl")) {
655 if (vm
.count("zlib")) {
659 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
661 signal(SIGPIPE
, SIG_IGN
); // for OpenSSL, otherwise we end abruptly
665 if (vm
.count("abstract-namespace")) {
666 abstract_namespace
= true;
670 std::shared_ptr
<TProtocolFactory
> protocolFactory
;
671 if (protocol_type
== "json" || protocol_type
== "multij") {
672 std::shared_ptr
<TProtocolFactory
> jsonProtocolFactory(new TJSONProtocolFactory());
673 protocolFactory
= jsonProtocolFactory
;
674 } else if (protocol_type
== "compact" || protocol_type
== "multic") {
675 auto *compactProtocolFactory
= new TCompactProtocolFactoryT
<TBufferBase
>();
676 compactProtocolFactory
->setContainerSizeLimit(container_limit
);
677 compactProtocolFactory
->setStringSizeLimit(string_limit
);
678 protocolFactory
.reset(compactProtocolFactory
);
679 } else if (protocol_type
== "header" || protocol_type
== "multih") {
680 std::shared_ptr
<TProtocolFactory
> headerProtocolFactory(new THeaderProtocolFactory());
681 protocolFactory
= headerProtocolFactory
;
683 auto* binaryProtocolFactory
= new TBinaryProtocolFactoryT
<TBufferBase
>();
684 binaryProtocolFactory
->setContainerSizeLimit(container_limit
);
685 binaryProtocolFactory
->setStringSizeLimit(string_limit
);
686 protocolFactory
.reset(binaryProtocolFactory
);
690 std::shared_ptr
<TestHandler
> testHandler(new TestHandler());
691 std::shared_ptr
<TProcessor
> testProcessor(new ThriftTestProcessor(testHandler
));
693 if (vm
.count("processor-events")) {
694 testProcessor
->setEventHandler(
695 std::shared_ptr
<TProcessorEventHandler
>(new TestProcessorEventHandler()));
699 std::shared_ptr
<TSSLSocketFactory
> sslSocketFactory
;
700 std::shared_ptr
<TServerSocket
> serverSocket
;
703 sslSocketFactory
= std::shared_ptr
<TSSLSocketFactory
>(new TSSLSocketFactory());
704 sslSocketFactory
->loadCertificate(certPath
.c_str());
705 sslSocketFactory
->loadPrivateKey(keyPath
.c_str());
706 sslSocketFactory
->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
707 if (server_type
!= "nonblocking") {
708 serverSocket
= std::shared_ptr
<TServerSocket
>(new TSSLServerSocket(port
, sslSocketFactory
));
711 if (domain_socket
!= "") {
712 if (abstract_namespace
) {
713 std::string
abstract_socket("\0", 1);
714 abstract_socket
+= domain_socket
;
715 serverSocket
= std::shared_ptr
<TServerSocket
>(new TServerSocket(abstract_socket
));
717 unlink(domain_socket
.c_str());
718 serverSocket
= std::shared_ptr
<TServerSocket
>(new TServerSocket(domain_socket
));
722 serverSocket
= std::shared_ptr
<TServerSocket
>(new TServerSocket(port
));
727 std::shared_ptr
<TTransportFactory
> transportFactory
;
729 if (transport_type
== "http" && server_type
!= "nonblocking") {
730 transportFactory
= std::make_shared
<THttpServerTransportFactory
>();
731 } else if (transport_type
== "framed") {
732 transportFactory
= std::make_shared
<TFramedTransportFactory
>();
734 transportFactory
= std::make_shared
<TBufferedTransportFactory
>();
738 // hmm.. doesn't seem to be a way to make it wrap the others...
739 transportFactory
= std::make_shared
<TZlibTransportFactory
>();
743 cout
<< "Starting \"" << server_type
<< "\" server (" << transport_type
<< "/" << protocol_type
745 if (abstract_namespace
) {
748 cout
<< domain_socket
;
754 // Multiplexed Processor if needed
755 if (boost::starts_with(protocol_type
, "multi")) {
756 std::shared_ptr
<SecondHandler
> secondHandler(new SecondHandler());
757 std::shared_ptr
<SecondServiceProcessor
> secondProcessor(new SecondServiceProcessor(secondHandler
));
759 std::shared_ptr
<TMultiplexedProcessor
> multiplexedProcessor(new TMultiplexedProcessor());
760 multiplexedProcessor
->registerDefault(testProcessor
); // non-multi clients go to the default processor (multi:binary, multic:compact, ...)
761 multiplexedProcessor
->registerProcessor("ThriftTest", testProcessor
);
762 multiplexedProcessor
->registerProcessor("SecondService", secondProcessor
);
763 testProcessor
= std::dynamic_pointer_cast
<TProcessor
>(multiplexedProcessor
);
767 std::shared_ptr
<apache::thrift::server::TServer
> server
;
769 if (server_type
== "simple") {
770 server
.reset(new TSimpleServer(testProcessor
, serverSocket
, transportFactory
, protocolFactory
));
771 } else if (server_type
== "thread-pool") {
773 std::shared_ptr
<ThreadFactory
> threadFactory
774 = std::shared_ptr
<ThreadFactory
>(new ThreadFactory());
776 std::shared_ptr
<ThreadManager
> threadManager
= ThreadManager::newSimpleThreadManager(workers
);
777 threadManager
->threadFactory(threadFactory
);
778 threadManager
->start();
780 server
.reset(new TThreadPoolServer(testProcessor
,
785 } else if (server_type
== "threaded") {
787 new TThreadedServer(testProcessor
, serverSocket
, transportFactory
, protocolFactory
));
788 } else if (server_type
== "nonblocking") {
789 if (transport_type
== "http") {
790 std::shared_ptr
<TestHandlerAsync
> testHandlerAsync(new TestHandlerAsync(testHandler
));
791 std::shared_ptr
<TAsyncProcessor
> testProcessorAsync(
792 new ThriftTestAsyncProcessor(testHandlerAsync
));
793 std::shared_ptr
<TAsyncBufferProcessor
> testBufferProcessor(
794 new TAsyncProtocolProcessor(testProcessorAsync
, protocolFactory
));
796 // not loading nonblockingServer into "server" because
797 // TEvhttpServer doesn't inherit from TServer, and doesn't
798 // provide a stop method.
799 TEvhttpServer
nonblockingServer(testBufferProcessor
, port
);
800 nonblockingServer
.serve();
801 } else if (transport_type
== "framed") {
802 std::shared_ptr
<transport::TNonblockingServerTransport
> nbSocket
;
804 ssl
? new transport::TNonblockingSSLServerSocket(port
, sslSocketFactory
)
805 : new transport::TNonblockingServerSocket(port
));
806 server
.reset(new TNonblockingServer(testProcessor
, protocolFactory
, nbSocket
));
808 cerr
<< "server-type nonblocking requires transport of http or framed" << endl
;
813 if (server
.get() != nullptr) {
814 if (protocol_type
== "header") {
815 // Tell the server to use the same protocol for input / output
817 server
->setOutputProtocolFactory(std::shared_ptr
<TProtocolFactory
>());
820 apache::thrift::concurrency::ThreadFactory factory
;
821 factory
.setDetached(false);
822 std::shared_ptr
<apache::thrift::concurrency::Runnable
> serverThreadRunner(server
);
823 std::shared_ptr
<apache::thrift::concurrency::Thread
> thread
824 = factory
.newThread(serverThreadRunner
);
827 signal(SIGINT
, signal_handler
);
831 gMonitor
.waitForever(); // wait for a shutdown signal
834 signal(SIGINT
, SIG_DFL
);
842 cout
<< "done." << endl
;