]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/py/src/server/TProcessPoolServer.py
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / py / src / server / TProcessPoolServer.py
CommitLineData
f67539c2
TL
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20
21import logging
22
23from multiprocessing import Process, Value, Condition
24
25from .TServer import TServer
26from thrift.transport.TTransport import TTransportException
27
28logger = logging.getLogger(__name__)
29
30
class TProcessPoolServer(TServer):
    """Server with a fixed size pool of worker subprocesses to service requests

    Every worker process calls ``accept()`` on the same listening server
    transport and serves each accepted client until its transport fails.

    Note that if you need shared state between the handlers - it's up to you!
    Written by Dvir Volk, doat.com
    """

    def __init__(self, *args):
        """Initialise the server; ``*args`` are passed straight to TServer."""
        TServer.__init__(self, *args)
        # Number of worker processes started by serve().
        self.numWorkers = 10
        # Process objects started by serve().
        self.workers = []
        # Shared flag (signed char) visible to the workers; they keep
        # accepting clients while it is true.
        self.isRunning = Value('b', False)
        # serve() blocks on this condition until stop() notifies it.
        self.stopCondition = Condition()
        # Optional callable run at the start of every worker process.
        self.postForkCallback = None

    def setPostForkCallback(self, callback):
        """Register a callable that each worker runs before accepting clients.

        Raises:
            TypeError: if ``callback`` is not callable.
        """
        if not callable(callback):
            raise TypeError("This is not a callback!")
        self.postForkCallback = callback

    def setNumWorkers(self, num):
        """Set the number of worker processes that should be created"""
        self.numWorkers = num

    def workerProcess(self):
        """Accept clients on the shared server transport and serve them.

        Runs inside a worker process. The loop ends when ``isRunning`` is
        cleared; ``KeyboardInterrupt``/``SystemExit`` end it immediately and
        make the method return 0. Any other exception is logged and the
        worker keeps going.
        """
        if self.postForkCallback:
            self.postForkCallback()

        while self.isRunning.value:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                self.serveClient(client)
            except (KeyboardInterrupt, SystemExit):
                return 0
            except Exception as x:
                logger.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # A transport error ends the session for this client; it is
            # deliberately not logged.
            pass
        except Exception as x:
            logger.exception(x)
        finally:
            # Close the transports even when KeyboardInterrupt/SystemExit
            # propagates out of the processing loop, so the client
            # connection is not leaked.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start the workers and block until stop() is called.

        Listens on the server transport, starts ``numWorkers`` daemon worker
        processes, then waits on ``stopCondition``. ``SystemExit`` and
        ``KeyboardInterrupt`` also end the wait. ``isRunning`` is cleared
        before returning so the workers stop looping.
        """
        # this is a shared state that can tell the workers to exit when False
        self.isRunning.value = True

        # first bind and listen to the port
        self.serverTransport.listen()

        # start the children; a failure to start one is logged and skipped
        for _ in range(self.numWorkers):
            try:
                w = Process(target=self.workerProcess)
                w.daemon = True
                w.start()
                self.workers.append(w)
            except Exception as x:
                logger.exception(x)

        # Wait until stop() clears the flag and notifies. The flag is checked
        # while holding the condition's lock, so a stop() issued just before
        # wait() is not missed, and the ``with`` block guarantees the lock is
        # released when serve() returns.
        with self.stopCondition:
            while self.isRunning.value:
                try:
                    self.stopCondition.wait()
                except (SystemExit, KeyboardInterrupt):
                    break
                except Exception as x:
                    logger.exception(x)

        self.isRunning.value = False

    def stop(self):
        """Clear the running flag and wake the process blocked in serve()."""
        self.isRunning.value = False
        with self.stopCondition:
            self.stopCondition.notify()