]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | # |
2 | # Licensed to the Apache Software Foundation (ASF) under one | |
3 | # or more contributor license agreements. See the NOTICE file | |
4 | # distributed with this work for additional information | |
5 | # regarding copyright ownership. The ASF licenses this file | |
6 | # to you under the Apache License, Version 2.0 (the | |
7 | # "License"); you may not use this file except in compliance | |
8 | # with the License. You may obtain a copy of the License at | |
9 | # | |
10 | # http://www.apache.org/licenses/LICENSE-2.0 | |
11 | # | |
12 | # Unless required by applicable law or agreed to in writing, | |
13 | # software distributed under the License is distributed on an | |
14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | # KIND, either express or implied. See the License for the | |
16 | # specific language governing permissions and limitations | |
17 | # under the License. | |
18 | # | |
19 | ||
20 | """TZlibTransport provides a compressed transport and transport factory | |
21 | class, using the python standard library zlib module to implement | |
22 | data compression. | |
23 | """ | |
24 | ||
25 | from __future__ import division | |
26 | import zlib | |
27 | from .TTransport import TTransportBase, CReadableTransport | |
28 | from ..compat import BufferIO | |
29 | ||
30 | ||
class TZlibTransportFactory(object):
    """Factory transport that builds zlib compressed transports.

    This factory caches the last single client/transport that it was passed
    and returns the same TZlibTransport object that was created for it.

    This caching means the TServer class will get the _same_ transport
    object for both input and output transports from this factory.
    (For non-threaded scenarios only, since the cache only holds one object)

    The purpose of this caching is to allocate only one TZlibTransport where
    only one is really needed (a single TZlibTransport already keeps separate
    read and write buffers), and it makes the statistics from
    getCompSavings() and getCompRatio() easier to understand.
    """
    # Defaults for the one-entry cache. getTransport() assigns through
    # ``self``, so once populated the cache lives on the factory instance.
    _last_trans = None
    _last_z = None

    def getTransport(self, trans, compresslevel=9):
        """Wrap a transport, trans, with the TZlibTransport
        compressed transport class, returning a new
        transport to the caller.

        If C{trans} is the very same object passed on the previous call and
        the requested compression level matches, the previously created
        TZlibTransport is returned instead of a new one.

        @param trans: the transport to wrap.
        @type trans: TTransport
        @param compresslevel: The zlib compression level, ranging
        from 0 (no compression) to 9 (best compression). Defaults to 9.
        @type compresslevel: int

        This method returns a TZlibTransport which wraps the
        passed C{trans} TTransport derived instance.
        """
        cached = self._last_z
        # Identity (not equality) is what "the same transport" means here.
        # The level must match too, otherwise a caller asking for a
        # different compression level would silently get the old one.
        if (cached is not None and trans is self._last_trans
                and cached.compresslevel == compresslevel):
            return cached
        ztrans = TZlibTransport(trans, compresslevel)
        self._last_trans = trans
        self._last_z = ztrans
        return ztrans
68 | ||
69 | ||
class TZlibTransport(TTransportBase, CReadableTransport):
    """Class that wraps a transport with zlib, compressing writes
    and decompressing reads, using the python standard
    library zlib module.

    Writes are buffered until flush(), which compresses the buffered bytes
    and ends the chunk with a zlib Z_SYNC_FLUSH before handing it to the
    wrapped transport.
    """
    # Read buffer size for the python fastbinary C extension,
    # the TBinaryProtocolAccelerated class.
    DEFAULT_BUFFSIZE = 4096

    def __init__(self, trans, compresslevel=9):
        """Create a new TZlibTransport, wrapping C{trans}, another
        TTransport derived object.

        @param trans: A thrift transport object, i.e. a TSocket() object.
        @type trans: TTransport
        @param compresslevel: The zlib compression level, ranging
        from 0 (no compression) to 9 (best compression). Default is 9.
        @type compresslevel: int
        """
        self.__trans = trans
        self.compresslevel = compresslevel
        self._reinit_buffers()
        self._init_zlib()
        self._init_stats()

    def _reinit_buffers(self):
        """Internal method to initialize/reset the internal BufferIO objects
        for read and write buffers.
        """
        self.__rbuf = BufferIO()
        self.__wbuf = BufferIO()

    def _init_stats(self):
        """Internal method to reset the internal statistics counters
        for compression ratios and bandwidth savings.

        Counters:
          - bytes_in: decompressed bytes produced from reads
          - bytes_in_comp: compressed bytes read from the wrapped transport
          - bytes_out: uncompressed bytes compressed by flush()
          - bytes_out_comp: compressed bytes written to the wrapped transport
        """
        self.bytes_in = 0
        self.bytes_out = 0
        self.bytes_in_comp = 0
        self.bytes_out_comp = 0

    def _init_zlib(self):
        """Internal method for setting up (or resetting) the zlib
        compression and decompression objects.
        """
        self._zcomp_read = zlib.decompressobj()
        self._zcomp_write = zlib.compressobj(self.compresslevel)

    def getCompRatio(self):
        """Get the current measured compression ratios (in,out) from
        this transport.

        Returns a tuple of:
        (inbound_compression_ratio, outbound_compression_ratio)

        The compression ratios are computed as:
            compressed / uncompressed

        E.g., data that compresses by 10x will have a ratio of: 0.10
        and data that compresses to half of its original size will
        have a ratio of 0.5

        None is returned if no bytes have yet been processed in
        a particular direction.
        """
        r_percent, w_percent = (None, None)
        if self.bytes_in > 0:
            r_percent = self.bytes_in_comp / self.bytes_in
        if self.bytes_out > 0:
            w_percent = self.bytes_out_comp / self.bytes_out
        return (r_percent, w_percent)

    def getCompSavings(self):
        """Get the current count of saved bytes due to data
        compression.

        Returns a tuple of:
        (inbound_saved_bytes, outbound_saved_bytes)

        Note: if compression is actually expanding your
        data (only likely with very tiny thrift objects), then
        the values returned will be negative.
        """
        r_saved = self.bytes_in - self.bytes_in_comp
        w_saved = self.bytes_out - self.bytes_out_comp
        return (r_saved, w_saved)

    def isOpen(self):
        """Return the underlying transport's open status"""
        return self.__trans.isOpen()

    def open(self):
        """Reset the statistics counters and open the underlying transport"""
        self._init_stats()
        return self.__trans.open()

    def listen(self):
        """Invoke the underlying transport's listen() method"""
        self.__trans.listen()

    def accept(self):
        """Accept connections on the underlying transport"""
        return self.__trans.accept()

    def close(self):
        """Discard buffered data, reset the zlib state, and close the
        underlying transport.
        """
        self._reinit_buffers()
        self._init_zlib()
        return self.__trans.close()

    def read(self, sz):
        """Read up to sz bytes from the decompressed bytes buffer, and
        read from the underlying transport if the decompression
        buffer is empty.

        Returns b'' only when the buffer is empty and the underlying
        transport itself returns no bytes (end of stream), instead of
        looping forever.
        """
        ret = self.__rbuf.read(sz)
        if ret:
            return ret
        # Keep reading from the transport until something decompresses;
        # a partial zlib block can legitimately yield no output yet.
        while True:
            consumed_before = self.bytes_in_comp
            if self.readComp(sz):
                return self.__rbuf.read(sz)
            if self.bytes_in_comp == consumed_before:
                # The wrapped transport returned nothing at all: EOF.
                return b''

    def readComp(self, sz):
        """Read compressed data from the underlying transport, then
        decompress it and append it to the internal read buffer.

        Returns True if the read buffer holds any data afterwards,
        False otherwise.
        """
        zbuf = self.__trans.read(sz)
        # Count only bytes freshly taken from the wrapped transport.
        self.bytes_in_comp += len(zbuf)
        buf = self._zcomp_read.decompress(
            self._zcomp_read.unconsumed_tail + zbuf)
        self.bytes_in += len(buf)
        old = self.__rbuf.read()
        self.__rbuf = BufferIO(old + buf)
        return len(old) + len(buf) > 0

    def write(self, buf):
        """Write some bytes, putting them into the internal write
        buffer for eventual compression.
        """
        self.__wbuf.write(buf)

    def flush(self):
        """Flush any queued up data in the write buffer and ensure the
        compression buffer is flushed out to the underlying transport
        """
        wout = self.__wbuf.getvalue()
        if wout:
            zbuf = self._zcomp_write.compress(wout)
            self.bytes_out += len(wout)
            self.bytes_out_comp += len(zbuf)
            # wout now belongs to the compressor; never feed it twice.
            self.__wbuf = BufferIO()
        else:
            # Must be bytes: concatenating a str with the bytes tail
            # raises TypeError on Python 3.
            zbuf = b''
        # Z_SYNC_FLUSH emits all pending output so the peer can decode it.
        ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH)
        self.bytes_out_comp += len(ztail)
        payload = zbuf + ztail
        if payload:
            self.__trans.write(payload)
        self.__trans.flush()

    @property
    def cstringio_buf(self):
        """Implement the CReadableTransport interface"""
        return self.__rbuf

    def cstringio_refill(self, partialread, reqlen):
        """Implement the CReadableTransport interface for refill.

        Replaces the read buffer with partialread plus enough newly
        decompressed bytes to hold at least reqlen bytes.

        @raise EOFError: if the underlying transport reaches end of stream
        before reqlen bytes are available.
        """
        retstring = partialread
        if reqlen < self.DEFAULT_BUFFSIZE:
            # Read ahead so small requests do not trigger a refill each time.
            retstring += self.read(self.DEFAULT_BUFFSIZE)
        while len(retstring) < reqlen:
            chunk = self.read(reqlen - len(retstring))
            if not chunk:
                raise EOFError()
            retstring += chunk
        self.__rbuf = BufferIO(retstring)
        return self.__rbuf