2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 module thrift_test_server;
22 import core.stdc.errno : errno;
23 import core.stdc.signal : signal, SIGINT, SIG_DFL, SIG_ERR;
24 import core.thread : dur, Thread;
26 import std.exception : enforce;
28 import std.parallelism : totalCPUs;
31 import std.typetuple : TypeTuple, staticMap;
33 import thrift.codegen.processor;
34 import thrift.protocol.base;
35 import thrift.protocol.binary;
36 import thrift.protocol.compact;
37 import thrift.protocol.json;
38 import thrift.server.base;
39 import thrift.server.transport.socket;
40 import thrift.server.transport.ssl;
41 import thrift.transport.base;
42 import thrift.transport.buffered;
43 import thrift.transport.framed;
44 import thrift.transport.http;
45 import thrift.transport.ssl;
46 import thrift.util.cancellation;
47 import thrift.util.hashset;
50 import thrift_test_common;
51 import thrift.test.ThriftTest_types;
52 import thrift.test.ThriftTest;
// Service handler deriving from ThriftTest (imported from thrift.test.ThriftTest).
// Every visible method prints a trace line when trace_ is set.
// NOTE(review): this view of the class is an incomplete extract - the
// constructor, the trace_ declaration, closing braces and most return
// statements are not visible, so the comments below describe only the
// statements that are shown.
54 class TestHandler : ThriftTest {
// Traces the call; no argument and no return value.
59 override void testVoid() {
60 if (trace_) writeln("testVoid()");
// Traces the string argument, wrapped in double quotes.
63 override string testString(string thing) {
64 if (trace_) writefln("testString(\"%s\")", thing);
// Traces the byte argument.
68 override byte testByte(byte thing) {
69 if (trace_) writefln("testByte(%s)", thing);
// Traces the 32-bit integer argument.
73 override int testI32(int thing) {
74 if (trace_) writefln("testI32(%s)", thing);
// Traces the 64-bit integer argument.
78 override long testI64(long thing) {
79 if (trace_) writefln("testI64(%s)", thing);
// Traces the double argument.
83 override double testDouble(double thing) {
84 if (trace_) writefln("testDouble(%s)", thing);
// Traces the binary argument, which is typed as string here and printed in quotes.
88 override string testBinary(string thing) {
89 if (trace_) writefln("testBinary(\"%s\")", thing);
// Traces the bool argument; note the trace format wraps the value in quotes.
93 override bool testBool(bool thing) {
94 if (trace_) writefln("testBool(\"%s\")", thing);
// Traces the four fields of the Xtruct argument (string, byte, i32, i64).
98 override Xtruct testStruct(ref const(Xtruct) thing) {
99 if (trace_) writefln("testStruct({\"%s\", %s, %s, %s})",
100 thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing);
// Traces the outer Xtruct2 fields together with the fields of its nested struct_thing.
104 override Xtruct2 testNest(ref const(Xtruct2) nest) {
105 auto thing = nest.struct_thing;
106 if (trace_) writefln("testNest({%s, {\"%s\", %s, %s, %s}, %s})",
107 nest.byte_thing, thing.string_thing, thing.byte_thing, thing.i32_thing,
108 thing.i64_thing, nest.i32_thing);
// Traces the int-to-int associative array argument.
112 override int[int] testMap(int[int] thing) {
113 if (trace_) writefln("testMap({%s})", thing);
// Traces the set argument as a comma-separated list of its elements.
117 override HashSet!int testSet(HashSet!int thing) {
118 if (trace_) writefln("testSet({%s})",
119 join(map!`to!string(a)`(thing[]), ", "));
// Traces the int array argument.
123 override int[] testList(int[] thing) {
124 if (trace_) writefln("testList(%s)", thing);
// Traces the enum argument.
128 override Numberz testEnum(Numberz thing) {
129 if (trace_) writefln("testEnum(%s)", thing);
// Traces the UserId argument.
133 override UserId testTypedef(UserId thing) {
134 if (trace_) writefln("testTypedef(%s)", thing);
// Traces the string-to-string associative array argument.
138 override string[string] testStringMap(string[string] thing) {
139 if (trace_) writefln("testStringMap(%s)", thing);
// Traces the argument and returns testMapMapReturn regardless of its value.
// NOTE(review): testMapMapReturn is not defined in the visible code;
// presumably it comes from thrift_test_common - verify.
143 override int[int][int] testMapMap(int hello) {
144 if (trace_) writefln("testMapMap(%s)", hello);
145 return testMapMapReturn;
// Builds nested maps of Insanity values from the argument.
// NOTE(review): the declaration of tmp, the population of ret and the
// return statement are not visible in this extract.
148 override Insanity[Numberz][UserId] testInsanity(ref const(Insanity) argument) {
149 if (trace_) writeln("testInsanity()");
150 Insanity[Numberz][UserId] ret;
151 Insanity[Numberz] m1;
152 Insanity[Numberz] m2;
// The cast drops the const qualifier of the by-reference argument.
154 tmp = cast(Insanity)argument;
// m1 holds the argument's value under both TWO and THREE.
155 m1[Numberz.TWO] = tmp;
156 m1[Numberz.THREE] = tmp;
// m2 holds a default-constructed Insanity under SIX.
157 m2[Numberz.SIX] = Insanity();
// Returns an Xtruct built from the literal "Hello2" and the first three
// arguments; arg3, arg4 and arg5 are not used in the visible lines.
163 override Xtruct testMulti(byte arg0, int arg1, long arg2, string[short] arg3,
164 Numberz arg4, UserId arg5)
166 if (trace_) writeln("testMulti()");
167 return Xtruct("Hello2", arg0, arg1, arg2);
// Selects an exception based on the value of arg.
// NOTE(review): the lines that fill in and throw the Xception are not visible.
170 override void testException(string arg) {
171 if (trace_) writefln("testException(%s)", arg);
172 if (arg == "Xception") {
173 auto e = new Xception();
177 } else if (arg == "TException") {
178 throw new TException();
// NOTE(review): this branch throws the same plain TException as the
// "TException" branch above - confirm that is intended.
179 } else if (arg == "ApplicationException") {
180 throw new TException();
// Selects an exception based on arg0; arg1 is only used in the trace line
// as far as the visible code shows.
// NOTE(review): the throw statements and the non-exception return path are
// not visible in this extract.
184 override Xtruct testMultiException(string arg0, string arg1) {
185 if (trace_) writefln("testMultiException(%s, %s)", arg0, arg1);
187 if (arg0 == "Xception") {
188 auto e = new Xception();
190 e.message = "This is an Xception";
192 } else if (arg0 == "Xception2") {
193 auto e = new Xception2();
195 e.struct_thing.string_thing = "This is an Xception2";
// Sleeps the calling thread for sleepFor seconds, tracing before and after.
202 override void testOneway(int sleepFor) {
203 if (trace_) writefln("testOneway(%s): Sleeping...", sleepFor);
204 Thread.sleep(dur!"seconds"(sleepFor));
205 if (trace_) writefln("testOneway(%s): done sleeping!", sleepFor);
// Module-level shared flag, initially false.
// NOTE(review): presumably set by handleSignal and polled by the shutdown
// thread - the lines doing so are not visible in this extract; confirm.
212 shared(bool) gShutdown = false;
// C-linkage signal handler, declared nothrow and @nogc; main installs it for
// SIGINT through signal(). Its body is not visible in this extract.
214 nothrow @nogc extern(C) void handleSignal(int sig) {
218 // Runs a thread that waits for shutdown to be
219 // signaled and then triggers cancellation,
220 // causing the server to stop. While we could
221 // use a signalfd for this purpose, we are instead
222 // opting for a busy waiting scheme for maximum
223 // portability since signalfd is a linux thing.
// Thread subclass that holds a TCancellationOrigin and triggers it.
// NOTE(review): the thread-function header and the loop around the sleep are
// not visible in this extract; only the statements below are shown.
225 class ShutdownThread : Thread {
// Stores the cancellation origin that will be triggered later.
226 this(TCancellationOrigin cancellation) {
227 cancellation_ = cancellation;
// Sleeps for 25 milliseconds (presumably between checks of the shutdown
// condition - the check itself is not visible; confirm).
234 Thread.sleep(dur!("msecs")(25));
// Triggers cancellation on the stored origin.
236 cancellation_.trigger();
// Cancellation origin supplied to the constructor.
239 TCancellationOrigin cancellation_;
// Program entry point: parses command-line options, then builds the protocol
// factory, processor, server socket, transport factory and server.
// NOTE(review): the declarations of port, ssl and trace, several closing
// braces and some branch headers are not visible in this extract.
242 void main(string[] args) {
244 ServerType serverType;
245 ProtocolType protocolType;
246 size_t numIOThreads = 1;
247 TransportType transportType;
// The task pool size defaults to the value of totalCPUs.
250 size_t taskPoolSize = totalCPUs;
// Recognised options: port, protocol, server-type, ssl, num-io-threads,
// task-pool-size, trace and transport.
252 getopt(args, "port", &port, "protocol", &protocolType, "server-type",
253 &serverType, "ssl", &ssl, "num-io-threads", &numIOThreads,
254 "task-pool-size", &taskPoolSize, "trace", &trace,
255 "transport", &transportType);
// The two non-blocking server types require the framed transport option and
// reject SSL; both checks throw through enforce when violated.
257 if (serverType == ServerType.nonblocking ||
258 serverType == ServerType.pooledNonblocking
260 enforce(transportType == TransportType.framed,
261 "Need to use framed transport with non-blocking server.");
262 enforce(!ssl, "The non-blocking server does not support SSL yet.");
264 // Don't wrap the contents into another layer of framing.
265 transportType = TransportType.raw;
// With the ThriftTestTemplates version identifier set, the transport and
// protocol type lists are populated; the two empty aliases further down are
// presumably the fallback branch (its header is not visible - confirm).
268 version (ThriftTestTemplates) {
269 // Only exercise the specialized template code paths if explicitly enabled
270 // to reduce memory consumption on regular test suite runs – there should
271 // not be much that can go wrong with that specifically anyway.
272 alias TypeTuple!(TBufferedTransport, TFramedTransport, TServerHttpTransport)
275 staticMap!(TBinaryProtocol, AvailableTransports),
276 staticMap!(TCompactProtocol, AvailableTransports)
277 ) AvailableProtocols;
279 alias TypeTuple!() AvailableTransports;
280 alias TypeTuple!() AvailableProtocols;
// Pick the protocol factory that matches the requested protocol type.
283 TProtocolFactory protocolFactory;
284 final switch (protocolType) {
285 case ProtocolType.binary:
286 protocolFactory = new TBinaryProtocolFactory!AvailableTransports;
288 case ProtocolType.compact:
289 protocolFactory = new TCompactProtocolFactory!AvailableTransports;
291 case ProtocolType.json:
292 protocolFactory = new TJsonProtocolFactory!AvailableTransports;
// The processor dispatches to a TestHandler constructed with the trace option.
296 auto processor = new TServiceProcessor!(ThriftTest, AvailableProtocols)(
297 new TestHandler(trace));
// Server socket: either an SSL socket configured with the test certificate,
// private key and cipher list, or a plain socket on the same port.
// NOTE(review): the condition choosing between the two is not visible;
// presumably it is the ssl option - confirm.
299 TServerSocket serverSocket;
301 auto sslContext = new TSSLContext();
302 sslContext.serverSide = true;
// Certificate and key paths are relative to the working directory.
303 sslContext.loadCertificate("../../../test/keys/server.crt");
304 sslContext.loadPrivateKey("../../../test/keys/server.key");
305 sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
306 serverSocket = new TSSLServerSocket(port, sslContext);
308 serverSocket = new TServerSocket(port);
// createTransportFactory and createServer are not defined in the visible code.
311 auto transportFactory = createTransportFactory(transportType);
313 auto server = createServer(serverType, numIOThreads, taskPoolSize,
314 processor, serverSocket, transportFactory, protocolFactory);
316 // Set up SIGINT signal handling
// Installs handleSignal as the SIGINT handler; enforce throws when signal()
// reports SIG_ERR. The failure message uses D's printf-style %s specifier so
// that the errno value is substituted into the text. A "{0}" placeholder is
// not understood by format and leaves the argument unconsumed, which makes
// format itself fail instead of producing the intended message.
317 enforce(signal(SIGINT, &handleSignal) != SIG_ERR,
318 "Could not replace the SIGINT signal handler: errno %s".format(errno()));
// Final part of main: creates the cancellation origin and the shutdown
// thread, runs the server on the current thread and restores the default
// SIGINT handling afterwards.
// NOTE(review): the call that starts the shutdown thread and any statements
// between serve() and the handler restore are not visible in this extract.
320 // Set up a server cancellation trigger
321 auto cancel = new TCancellationOrigin();
323 // Set up a listener for the shutdown condition - this will
324 // wake up when the signal occurs and trigger cancellation.
325 auto shutdown = new ShutdownThread(cancel);
328 // Serve from this thread; the signal will stop the server
329 // and control will return here
// The banner reports protocol, transport, server type, SSL usage and port.
330 writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType,
331 transportType, serverType, ssl ? "(using SSL) ": "", port);
// serve() receives the cancellation origin created above.
332 server.serve(cancel);
// Put the default SIGINT disposition back in place.
334 signal(SIGINT, SIG_DFL);