]>
git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/py/src/server/TServer.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import os
import threading

from six.moves import queue

from thrift.protocol import TBinaryProtocol
from thrift.protocol.THeaderProtocol import THeaderProtocolFactory
from thrift.transport import TTransport

# Module-level logger shared by every server class in this file.
logger = logging.getLogger(__name__)


class TServer(object):
    """Base interface for a server, which must have a serve() method.

    Three constructors for all servers:
    1) (processor, serverTransport)
    2) (processor, serverTransport, transportFactory, protocolFactory)
    3) (processor, serverTransport,
        inputTransportFactory, outputTransportFactory,
        inputProtocolFactory, outputProtocolFactory)
    """

    def __init__(self, *args):
        """Select one of the three constructor forms by argument count.

        Args:
            *args: 2, 4 or 6 positional arguments, as listed in the class
                docstring.

        Raises:
            TypeError: if the number of positional arguments is not 2, 4
                or 6. Previously such calls returned an object with none
                of its attributes set.
            ValueError: propagated from ``__initArgs__`` when only one of
                the two protocol factories is a THeaderProtocolFactory.
        """
        argc = len(args)
        # NOTE(review): the guard for the two-argument form was missing from
        # the extracted text; it is restored from the ``elif`` chain below and
        # form 1) of the class docstring.
        if argc == 2:
            # Form 1: default pass-through transports and binary protocol.
            self.__initArgs__(args[0], args[1],
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif argc == 4:
            # Form 2: one transport factory and one protocol factory, each
            # used for both the input and the output side.
            self.__initArgs__(args[0], args[1],
                              args[2], args[2], args[3], args[3])
        elif argc == 6:
            # Form 3: every factory given explicitly.
            self.__initArgs__(args[0], args[1],
                              args[2], args[3], args[4], args[5])
        else:
            raise TypeError(
                "%s expects 2, 4 or 6 positional arguments, got %d"
                % (type(self).__name__, argc))

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Store the processor, server transport and the four factories.

        Raises:
            ValueError: if exactly one of the two protocol factories is a
                THeaderProtocolFactory.
        """
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

        input_is_header = isinstance(self.inputProtocolFactory,
                                     THeaderProtocolFactory)
        output_is_header = isinstance(self.outputProtocolFactory,
                                      THeaderProtocolFactory)
        # Header protocol must be configured on both sides or on neither.
        if any((input_is_header, output_is_header)) and input_is_header != output_is_header:
            raise ValueError("THeaderProtocol servers require that both the input and "
                             "output protocols are THeaderProtocol.")


class TSimpleServer(TServer):
    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, *args):
        """Accept the same positional argument forms as TServer."""
        TServer.__init__(self, *args)

    def serve(self):
        """Listen, then handle accepted clients one at a time.

        NOTE(review): the ``def`` line, both loops, the ``try:`` and the
        handler/cleanup bodies were missing from the extracted text. They are
        restored around the statements and ``except`` clauses that were
        visible -- confirm against upstream Thrift.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                continue

            itrans = self.inputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)

            # for THeaderProtocol, we must use the same protocol instance for
            # input and output so that the response is in the same dialect that
            # the server detected the request was in.
            if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
                otrans = None
                oprot = iprot
            else:
                otrans = self.outputTransportFactory.getTransport(client)
                oprot = self.outputProtocolFactory.getProtocol(otrans)

            try:
                # Keep processing requests until the processor raises.
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Presumably the normal end of a connection; not logged.
                pass
            except Exception as x:
                logger.exception(x)

            itrans.close()
            if otrans:
                otrans.close()


class TThreadedServer(TServer):
    """Threaded server that spawns a new thread per each connection."""

    def __init__(self, *args, **kwargs):
        """Accept TServer's positional forms plus an optional ``daemon`` flag.

        Args:
            *args: forwarded unchanged to ``TServer.__init__``.
            **kwargs: only ``daemon`` is read (default False); it is applied
                to every connection thread.
        """
        TServer.__init__(self, *args)
        self.daemon = kwargs.get("daemon", False)

    def serve(self):
        """Listen, then start one thread running ``handle`` per client.

        NOTE(review): the ``def`` line, the loop, the ``try:``, the thread
        start and the handler bodies were missing from the extracted text;
        restored around the visible statements -- confirm against upstream
        Thrift.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                t = threading.Thread(target=self.handle, args=(client,))
                # Thread.setDaemon() is deprecated since Python 3.10; the
                # attribute form is equivalent.
                t.daemon = self.daemon
                t.start()
            except KeyboardInterrupt:
                raise
            except Exception as x:
                logger.exception(x)

    def handle(self, client):
        """Serve one client connection until its processor loop raises.

        NOTE(review): the ``if``/``else`` branch bodies, the ``try:``, the
        inner loop and the transport cleanup were missing from the extracted
        text and are restored here -- confirm against upstream Thrift.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)

        # for THeaderProtocol, we must use the same protocol instance for input
        # and output so that the response is in the same dialect that the
        # server detected the request was in.
        if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
            otrans = None
            oprot = iprot
        else:
            otrans = self.outputTransportFactory.getTransport(client)
            oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            # Presumably the normal end of a connection; not logged.
            pass
        except Exception as x:
            logger.exception(x)

        itrans.close()
        if otrans:
            otrans.close()


class TThreadPoolServer(TServer):
    """Server with a fixed size pool of threads which service requests."""

    def __init__(self, *args, **kwargs):
        """Accept TServer's positional forms plus an optional ``daemon`` flag.

        Args:
            *args: forwarded unchanged to ``TServer.__init__``.
            **kwargs: only ``daemon`` is read (default False); it is applied
                to every worker thread.
        """
        TServer.__init__(self, *args)
        # Accepted clients waiting for a worker thread.
        self.clients = queue.Queue()
        # NOTE(review): the initial value of ``threads`` was missing from the
        # extracted text; 10 is the upstream Thrift default -- confirm.
        self.threads = 10
        self.daemon = kwargs.get("daemon", False)

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        # NOTE(review): body restored; ``serve`` reads ``self.threads``.
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        # NOTE(review): the loop, ``try:`` and handler body were missing from
        # the extracted text and are restored here.
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                logger.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        # NOTE(review): the branch bodies, ``try:``, inner loop and transport
        # cleanup were missing from the extracted text and are restored here.
        itrans = self.inputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)

        # for THeaderProtocol, we must use the same protocol instance for input
        # and output so that the response is in the same dialect that the
        # server detected the request was in.
        if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
            otrans = None
            oprot = iprot
        else:
            otrans = self.outputTransportFactory.getTransport(client)
            oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            # Presumably the normal end of a connection; not logged.
            pass
        except Exception as x:
            logger.exception(x)

        itrans.close()
        if otrans:
            otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        # NOTE(review): the ``def`` line, ``try:`` lines, thread start, loop
        # and handler bodies were missing from the extracted text and are
        # restored here -- confirm against upstream Thrift.
        for _ in range(self.threads):
            try:
                t = threading.Thread(target=self.serveThread)
                # Thread.setDaemon() is deprecated since Python 3.10; the
                # attribute form is equivalent.
                t.daemon = self.daemon
                t.start()
            except Exception as x:
                logger.exception(x)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                self.clients.put(client)
            except Exception as x:
                logger.exception(x)


class TForkingServer(TServer):
    """A Thrift server that forks a new process for each request

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """
    # NOTE(review): the extracted text of this class had many lines missing
    # (the fork call, the parent/child branch, the exit path, loop and
    # ``try:`` lines, and two docstring lines). Everything not visible in the
    # extraction is restored from upstream Thrift -- confirm before relying
    # on it.

    def __init__(self, *args):
        """Accept the same positional argument forms as TServer."""
        TServer.__init__(self, *args)
        # Pids of forked children that have not been reaped yet.
        self.children = []

    def serve(self):
        """Listen, then fork one child process per accepted client."""
        def try_close(file):
            # Best-effort close: a failure is logged, never raised.
            try:
                file.close()
            except IOError as e:
                logger.warning(e, exc_info=True)

        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                continue
            try:
                pid = os.fork()

                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()

                    # Parent must close socket or the connection may not get
                    # closed promptly
                    itrans = self.inputTransportFactory.getTransport(client)
                    otrans = self.outputTransportFactory.getTransport(client)
                    try_close(itrans)
                    try_close(otrans)
                else:
                    itrans = self.inputTransportFactory.getTransport(client)
                    iprot = self.inputProtocolFactory.getProtocol(itrans)

                    # for THeaderProtocol, we must use the same protocol
                    # instance for input and output so that the response is in
                    # the same dialect that the server detected the request was
                    # in.
                    if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
                        otrans = None
                        oprot = iprot
                    else:
                        otrans = self.outputTransportFactory.getTransport(client)
                        oprot = self.outputProtocolFactory.getProtocol(otrans)

                    ecode = 0
                    try:
                        try:
                            while True:
                                self.processor.process(iprot, oprot)
                        except TTransport.TTransportException:
                            pass
                        except Exception as e:
                            logger.exception(e)
                            ecode = 1
                    finally:
                        try_close(itrans)
                        if otrans:
                            try_close(otrans)

                    # The child must not fall back into the accept loop.
                    os._exit(ecode)

            except TTransport.TTransportException:
                pass
            except Exception as x:
                logger.exception(x)

    def collect_children(self):
        """Reap finished child processes without blocking."""
        while self.children:
            try:
                pid, status = os.waitpid(0, os.WNOHANG)
            except OSError:
                pid = None

            if not pid:
                break
            # waitpid(0, ...) may report a child that this server did not
            # fork; only drop pids we are actually tracking, so remove()
            # cannot raise ValueError.
            if pid in self.children:
                self.children.remove(pid)