]>
git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/py/src/server/TNonblockingServer.py
2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
19 """Implementation of non-blocking server.
21 The main idea of the server is to receive and send requests
22 only from the main thread.
24 The thread pool should be sized for concurrent tasks, not
34 from collections
import deque
35 from six
.moves
import queue
37 from thrift
.transport
import TTransport
38 from thrift
.protocol
.TBinaryProtocol
import TBinaryProtocolFactory
# Names exported by `from <module> import *`.
__all__ = ['TNonblockingServer']

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
45 class Worker(threading
.Thread
):
46 """Worker is a small helper to process incoming connection."""
48 def __init__(self
, queue
):
49 threading
.Thread
.__init
__(self
)
53 """Process queries from task queue, stop if processor is None."""
56 processor
, iprot
, oprot
, otrans
, callback
= self
.queue
.get()
59 processor
.process(iprot
, oprot
)
60 callback(True, otrans
.getvalue())
62 logger
.exception("Exception while processing request", exc_info
=True)
74 """Decorator which locks self.lock."""
75 def nested(self
, *args
, **kwargs
):
78 return func(self
, *args
, **kwargs
)
84 def socket_exception(func
):
85 """Decorator close object on socket.error."""
86 def read(self
, *args
, **kwargs
):
88 return func(self
, *args
, **kwargs
)
90 logger
.debug('ignoring socket exception', exc_info
=True)
95 class Message(object):
96 def __init__(self
, offset
, len_
, header
):
100 self
.is_header
= header
104 return self
.offset
+ self
.len
107 class Connection(object):
108 """Basic class representing a connection.
111 WAIT_LEN --- connection is reading request len.
112 WAIT_MESSAGE --- connection is reading request.
113 WAIT_PROCESS --- connection has just read whole request and
114 waits for call ready routine.
115 SEND_ANSWER --- connection is sending answer string (including length
117 CLOSED --- socket was closed and connection should be deleted.
119 def __init__(self
, new_socket
, wake_up
):
120 self
.socket
= new_socket
121 self
.socket
.setblocking(False)
122 self
.status
= WAIT_LEN
124 self
.received
= deque()
125 self
._reading
= Message(0, 4, True)
128 self
.lock
= threading
.Lock()
129 self
.wake_up
= wake_up
130 self
.remaining
= False
134 """Reads data from stream and switches state."""
135 assert self
.status
in (WAIT_LEN
, WAIT_MESSAGE
)
136 assert not self
.received
141 read
= self
.socket
.recv(buf_size
)
143 done
= rlen
< buf_size
145 if first
and rlen
== 0:
146 if self
.status
!= WAIT_LEN
or self
._rbuf
:
147 logger
.error('could not read frame from socket')
149 logger
.debug('read zero length. client might have disconnected')
151 while len(self
._rbuf
) >= self
._reading
.end
:
152 if self
._reading
.is_header
:
153 mlen
, = struct
.unpack('!i', self
._rbuf
[:4])
154 self
._reading
= Message(self
._reading
.end
, mlen
, False)
155 self
.status
= WAIT_MESSAGE
157 self
._reading
.buffer = self
._rbuf
158 self
.received
.append(self
._reading
)
159 self
._rbuf
= self
._rbuf
[self
._reading
.end
:]
160 self
._reading
= Message(0, 4, True)
163 self
.status
= WAIT_PROCESS
165 self
.remaining
= not done
169 """Writes data to socket and switches state."""
170 assert self
.status
== SEND_ANSWER
171 sent
= self
.socket
.send(self
._wbuf
)
172 if sent
== len(self
._wbuf
):
173 self
.status
= WAIT_LEN
177 self
._wbuf
= self
._wbuf
[sent
:]
180 def ready(self
, all_ok
, message
):
181 """Callback function for switching state and waking up main thread.
183 This function is the only function which can be called asynchronously.
185 The ready can switch Connection to three states:
186 WAIT_LEN if request was oneway.
187 SEND_ANSWER if request was processed in normal way.
188 CLOSED if request throws unexpected exception.
190 It also wakes up the main thread.
192 assert self
.status
== WAIT_PROCESS
198 if len(message
) == 0:
199 # it was a oneway request, do not write answer
201 self
.status
= WAIT_LEN
203 self
._wbuf
= struct
.pack('!i', len(message
)) + message
204 self
.status
= SEND_ANSWER
def is_writeable(self):
    """Return True if connection should be added to write list of select.

    The connection counts as writeable exactly when its status equals
    SEND_ANSWER.
    """
    # NOTE(review): this extract lost the original indentation and some
    # neighbouring lines; confirm the enclosing class and any decorator
    # against the upstream file before relying on this placement.
    return self.status == SEND_ANSWER
212 # it's not necessary, but...
def is_readable(self):
    """Return True if connection should be added to read list of select.

    The connection counts as readable when its status is WAIT_LEN or
    WAIT_MESSAGE.
    """
    # NOTE(review): this extract lost the original indentation and some
    # neighbouring lines; confirm the enclosing class and any decorator
    # against the upstream file before relying on this placement.
    return self.status in (WAIT_LEN, WAIT_MESSAGE)
220 """Returns True if connection is closed."""
221 return self
.status
== CLOSED
224 """Returns the file descriptor of the associated socket."""
225 return self
.socket
.fileno()
228 """Closes connection"""
233 class TNonblockingServer(object):
234 """Non-blocking server."""
239 inputProtocolFactory
=None,
240 outputProtocolFactory
=None,
242 self
.processor
= processor
243 self
.socket
= lsocket
244 self
.in_protocol
= inputProtocolFactory
or TBinaryProtocolFactory()
245 self
.out_protocol
= outputProtocolFactory
or self
.in_protocol
246 self
.threads
= int(threads
)
248 self
.tasks
= queue
.Queue()
249 self
._read
, self
._write
= socket
.socketpair()
250 self
.prepared
= False
253 def setNumThreads(self
, num
):
254 """Set the number of worker threads that should be created."""
255 # implement ThreadPool interface
256 assert not self
.prepared
, "Can't change number of threads after start"
260 """Prepares server for serve requests."""
264 for _
in range(self
.threads
):
265 thread
= Worker(self
.tasks
)
266 thread
.setDaemon(True)
271 """Wake up main thread.
273 The server usually waits in a select call, and we need to interrupt it.
274 The simplest way is using socketpair.
276 Select always wait to read from the first socket of socketpair.
278 In this case, we can just write anything to the second socket from
281 self
._write
.send(b
'1')
286 This method causes the serve() method to return. stop() may be invoked
287 from within your handler, or from another thread.
289 After stop() is called, serve() will return but the server will still
290 be listening on the socket. serve() may then be called again to resume
291 processing requests. Alternatively, close() may be called after
292 serve() returns to close the server socket and shutdown all worker
299 """Does select on open connections."""
300 readable
= [self
.socket
.handle
.fileno(), self
._read
.fileno()]
303 for i
, connection
in list(self
.clients
.items()):
304 if connection
.is_readable():
305 readable
.append(connection
.fileno())
306 if connection
.remaining
or connection
.received
:
307 remaining
.append(connection
.fileno())
308 if connection
.is_writeable():
309 writable
.append(connection
.fileno())
310 if connection
.is_closed():
313 return remaining
, [], [], False
315 return select
.select(readable
, writable
, readable
) + (True,)
320 WARNING! You must call prepare() BEFORE calling handle()
322 assert self
.prepared
, "You have to call prepare before handle"
323 rset
, wset
, xset
, selected
= self
._select
()
324 for readable
in rset
:
325 if readable
== self
._read
.fileno():
326 # don't care i just need to clean readable flag
327 self
._read
.recv(1024)
328 elif readable
== self
.socket
.handle
.fileno():
330 client
= self
.socket
.accept()
332 self
.clients
[client
.handle
.fileno()] = Connection(client
.handle
,
335 logger
.debug('error while accepting', exc_info
=True)
337 connection
= self
.clients
[readable
]
340 if connection
.received
:
341 connection
.status
= WAIT_PROCESS
342 msg
= connection
.received
.popleft()
343 itransport
= TTransport
.TMemoryBuffer(msg
.buffer, msg
.offset
)
344 otransport
= TTransport
.TMemoryBuffer()
345 iprot
= self
.in_protocol
.getProtocol(itransport
)
346 oprot
= self
.out_protocol
.getProtocol(otransport
)
347 self
.tasks
.put([self
.processor
, iprot
, oprot
,
348 otransport
, connection
.ready
])
349 for writeable
in wset
:
350 self
.clients
[writeable
].write()
352 self
.clients
[oob
].close()
353 del self
.clients
[oob
]
356 """Closes the server."""
357 for _
in range(self
.threads
):
358 self
.tasks
.put([None, None, None, None, None])
360 self
.prepared
= False
365 Serve requests forever, or until stop() is called.
369 while not self
._stop
: