1# 2# Licensed to the Apache Software Foundation (ASF) under one 3# or more contributor license agreements. See the NOTICE file 4# distributed with this work for additional information 5# regarding copyright ownership. The ASF licenses this file 6# to you under the Apache License, Version 2.0 (the 7# "License"); you may not use this file except in compliance 8# with the License. You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, 13# software distributed under the License is distributed on an 14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15# KIND, either express or implied. See the License for the 16# specific language governing permissions and limitations 17# under the License. 18# 19 20"""TZlibTransport provides a compressed transport and transport factory 21class, using the python standard library zlib module to implement 22data compression. 23""" 24 25from __future__ import division 26import zlib 27from .TTransport import TTransportBase, CReadableTransport 28from ..compat import BufferIO 29 30 31class TZlibTransportFactory(object): 32 """Factory transport that builds zlib compressed transports. 33 34 This factory caches the last single client/transport that it was passed 35 and returns the same TZlibTransport object that was created. 36 37 This caching means the TServer class will get the _same_ transport 38 object for both input and output transports from this factory. 39 (For non-threaded scenarios only, since the cache only holds one object) 40 41 The purpose of this caching is to allocate only one TZlibTransport where 42 only one is really needed (since it must have separate read/write buffers), 43 and makes the statistics from getCompSavings() and getCompRatio() 44 easier to understand. 45 """ 46 # class scoped cache of last transport given and zlibtransport returned 47 _last_trans = None 48 _last_z = None 49 50 def getTransport(self, trans, compresslevel=9): 51 """Wrap a transport, trans, with the TZlibTransport 52 compressed transport class, returning a new 53 transport to the caller. 54 55 @param compresslevel: The zlib compression level, ranging 56 from 0 (no compression) to 9 (best compression). Defaults to 9. 57 @type compresslevel: int 58 59 This method returns a TZlibTransport which wraps the 60 passed C{trans} TTransport derived instance. 61 """ 62 if trans == self._last_trans: 63 return self._last_z 64 ztrans = TZlibTransport(trans, compresslevel) 65 self._last_trans = trans 66 self._last_z = ztrans 67 return ztrans 68 69 70class TZlibTransport(TTransportBase, CReadableTransport): 71 """Class that wraps a transport with zlib, compressing writes 72 and decompresses reads, using the python standard 73 library zlib module. 74 """ 75 # Read buffer size for the python fastbinary C extension, 76 # the TBinaryProtocolAccelerated class. 77 DEFAULT_BUFFSIZE = 4096 78 79 def __init__(self, trans, compresslevel=9): 80 """Create a new TZlibTransport, wrapping C{trans}, another 81 TTransport derived object. 82 83 @param trans: A thrift transport object, i.e. a TSocket() object. 84 @type trans: TTransport 85 @param compresslevel: The zlib compression level, ranging 86 from 0 (no compression) to 9 (best compression). Default is 9. 87 @type compresslevel: int 88 """ 89 self.__trans = trans 90 self.compresslevel = compresslevel 91 self.__rbuf = BufferIO() 92 self.__wbuf = BufferIO() 93 self._init_zlib() 94 self._init_stats() 95 96 def _reinit_buffers(self): 97 """Internal method to initialize/reset the internal StringIO objects 98 for read and write buffers. 99 """ 100 self.__rbuf = BufferIO() 101 self.__wbuf = BufferIO() 102 103 def _init_stats(self): 104 """Internal method to reset the internal statistics counters 105 for compression ratios and bandwidth savings. 106 """ 107 self.bytes_in = 0 108 self.bytes_out = 0 109 self.bytes_in_comp = 0 110 self.bytes_out_comp = 0 111 112 def _init_zlib(self): 113 """Internal method for setting up the zlib compression and 114 decompression objects. 115 """ 116 self._zcomp_read = zlib.decompressobj() 117 self._zcomp_write = zlib.compressobj(self.compresslevel) 118 119 def getCompRatio(self): 120 """Get the current measured compression ratios (in,out) from 121 this transport. 122 123 Returns a tuple of: 124 (inbound_compression_ratio, outbound_compression_ratio) 125 126 The compression ratios are computed as: 127 compressed / uncompressed 128 129 E.g., data that compresses by 10x will have a ratio of: 0.10 130 and data that compresses to half of ts original size will 131 have a ratio of 0.5 132 133 None is returned if no bytes have yet been processed in 134 a particular direction. 135 """ 136 r_percent, w_percent = (None, None) 137 if self.bytes_in > 0: 138 r_percent = self.bytes_in_comp / self.bytes_in 139 if self.bytes_out > 0: 140 w_percent = self.bytes_out_comp / self.bytes_out 141 return (r_percent, w_percent) 142 143 def getCompSavings(self): 144 """Get the current count of saved bytes due to data 145 compression. 146 147 Returns a tuple of: 148 (inbound_saved_bytes, outbound_saved_bytes) 149 150 Note: if compression is actually expanding your 151 data (only likely with very tiny thrift objects), then 152 the values returned will be negative. 153 """ 154 r_saved = self.bytes_in - self.bytes_in_comp 155 w_saved = self.bytes_out - self.bytes_out_comp 156 return (r_saved, w_saved) 157 158 def isOpen(self): 159 """Return the underlying transport's open status""" 160 return self.__trans.isOpen() 161 162 def open(self): 163 """Open the underlying transport""" 164 self._init_stats() 165 return self.__trans.open() 166 167 def listen(self): 168 """Invoke the underlying transport's listen() method""" 169 self.__trans.listen() 170 171 def accept(self): 172 """Accept connections on the underlying transport""" 173 return self.__trans.accept() 174 175 def close(self): 176 """Close the underlying transport,""" 177 self._reinit_buffers() 178 self._init_zlib() 179 return self.__trans.close() 180 181 def read(self, sz): 182 """Read up to sz bytes from the decompressed bytes buffer, and 183 read from the underlying transport if the decompression 184 buffer is empty. 185 """ 186 ret = self.__rbuf.read(sz) 187 if len(ret) > 0: 188 return ret 189 # keep reading from transport until something comes back 190 while True: 191 if self.readComp(sz): 192 break 193 ret = self.__rbuf.read(sz) 194 return ret 195 196 def readComp(self, sz): 197 """Read compressed data from the underlying transport, then 198 decompress it and append it to the internal StringIO read buffer 199 """ 200 zbuf = self.__trans.read(sz) 201 zbuf = self._zcomp_read.unconsumed_tail + zbuf 202 buf = self._zcomp_read.decompress(zbuf) 203 self.bytes_in += len(zbuf) 204 self.bytes_in_comp += len(buf) 205 old = self.__rbuf.read() 206 self.__rbuf = BufferIO(old + buf) 207 if len(old) + len(buf) == 0: 208 return False 209 return True 210 211 def write(self, buf): 212 """Write some bytes, putting them into the internal write 213 buffer for eventual compression. 214 """ 215 self.__wbuf.write(buf) 216 217 def flush(self): 218 """Flush any queued up data in the write buffer and ensure the 219 compression buffer is flushed out to the underlying transport 220 """ 221 wout = self.__wbuf.getvalue() 222 if len(wout) > 0: 223 zbuf = self._zcomp_write.compress(wout) 224 self.bytes_out += len(wout) 225 self.bytes_out_comp += len(zbuf) 226 else: 227 zbuf = '' 228 ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH) 229 self.bytes_out_comp += len(ztail) 230 if (len(zbuf) + len(ztail)) > 0: 231 self.__wbuf = BufferIO() 232 self.__trans.write(zbuf + ztail) 233 self.__trans.flush() 234 235 @property 236 def cstringio_buf(self): 237 """Implement the CReadableTransport interface""" 238 return self.__rbuf 239 240 def cstringio_refill(self, partialread, reqlen): 241 """Implement the CReadableTransport interface for refill""" 242 retstring = partialread 243 if reqlen < self.DEFAULT_BUFFSIZE: 244 retstring += self.read(self.DEFAULT_BUFFSIZE) 245 while len(retstring) < reqlen: 246 retstring += self.read(reqlen - len(retstring)) 247 self.__rbuf = BufferIO(retstring) 248 return self.__rbuf 249