]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | # |
2 | # Licensed to the Apache Software Foundation (ASF) under one | |
3 | # or more contributor license agreements. See the NOTICE file | |
4 | # distributed with this work for additional information | |
5 | # regarding copyright ownership. The ASF licenses this file | |
6 | # to you under the Apache License, Version 2.0 (the | |
7 | # "License"); you may not use this file except in compliance | |
8 | # with the License. You may obtain a copy of the License at | |
9 | # | |
10 | # http://www.apache.org/licenses/LICENSE-2.0 | |
11 | # | |
12 | # Unless required by applicable law or agreed to in writing, | |
13 | # software distributed under the License is distributed on an | |
14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | # KIND, either express or implied. See the License for the | |
16 | # specific language governing permissions and limitations | |
17 | # under the License. | |
18 | # | |
19 | ||
20 | ||
21 | import logging | |
22 | ||
23 | from multiprocessing import Process, Value, Condition | |
24 | ||
25 | from .TServer import TServer | |
26 | from thrift.transport.TTransport import TTransportException | |
27 | ||
28 | logger = logging.getLogger(__name__) | |
29 | ||
30 | ||
31 | class TProcessPoolServer(TServer): | |
32 | """Server with a fixed size pool of worker subprocesses to service requests | |
33 | ||
34 | Note that if you need shared state between the handlers - it's up to you! | |
35 | Written by Dvir Volk, doat.com | |
36 | """ | |
37 | def __init__(self, *args): | |
38 | TServer.__init__(self, *args) | |
39 | self.numWorkers = 10 | |
40 | self.workers = [] | |
41 | self.isRunning = Value('b', False) | |
42 | self.stopCondition = Condition() | |
43 | self.postForkCallback = None | |
44 | ||
45 | def setPostForkCallback(self, callback): | |
46 | if not callable(callback): | |
47 | raise TypeError("This is not a callback!") | |
48 | self.postForkCallback = callback | |
49 | ||
50 | def setNumWorkers(self, num): | |
51 | """Set the number of worker threads that should be created""" | |
52 | self.numWorkers = num | |
53 | ||
54 | def workerProcess(self): | |
55 | """Loop getting clients from the shared queue and process them""" | |
56 | if self.postForkCallback: | |
57 | self.postForkCallback() | |
58 | ||
59 | while self.isRunning.value: | |
60 | try: | |
61 | client = self.serverTransport.accept() | |
62 | if not client: | |
63 | continue | |
64 | self.serveClient(client) | |
65 | except (KeyboardInterrupt, SystemExit): | |
66 | return 0 | |
67 | except Exception as x: | |
68 | logger.exception(x) | |
69 | ||
70 | def serveClient(self, client): | |
71 | """Process input/output from a client for as long as possible""" | |
72 | itrans = self.inputTransportFactory.getTransport(client) | |
73 | otrans = self.outputTransportFactory.getTransport(client) | |
74 | iprot = self.inputProtocolFactory.getProtocol(itrans) | |
75 | oprot = self.outputProtocolFactory.getProtocol(otrans) | |
76 | ||
77 | try: | |
78 | while True: | |
79 | self.processor.process(iprot, oprot) | |
80 | except TTransportException: | |
81 | pass | |
82 | except Exception as x: | |
83 | logger.exception(x) | |
84 | ||
85 | itrans.close() | |
86 | otrans.close() | |
87 | ||
88 | def serve(self): | |
89 | """Start workers and put into queue""" | |
90 | # this is a shared state that can tell the workers to exit when False | |
91 | self.isRunning.value = True | |
92 | ||
93 | # first bind and listen to the port | |
94 | self.serverTransport.listen() | |
95 | ||
96 | # fork the children | |
97 | for i in range(self.numWorkers): | |
98 | try: | |
99 | w = Process(target=self.workerProcess) | |
100 | w.daemon = True | |
101 | w.start() | |
102 | self.workers.append(w) | |
103 | except Exception as x: | |
104 | logger.exception(x) | |
105 | ||
106 | # wait until the condition is set by stop() | |
107 | while True: | |
108 | self.stopCondition.acquire() | |
109 | try: | |
110 | self.stopCondition.wait() | |
111 | break | |
112 | except (SystemExit, KeyboardInterrupt): | |
113 | break | |
114 | except Exception as x: | |
115 | logger.exception(x) | |
116 | ||
117 | self.isRunning.value = False | |
118 | ||
119 | def stop(self): | |
120 | self.isRunning.value = False | |
121 | self.stopCondition.acquire() | |
122 | self.stopCondition.notify() | |
123 | self.stopCondition.release() |