1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10#   http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20"""TZlibTransport provides a compressed transport and transport factory
21class, using the python standard library zlib module to implement
22data compression.
23"""
24
25from __future__ import division
26import zlib
27from .TTransport import TTransportBase, CReadableTransport
28from ..compat import BufferIO
29
30
class TZlibTransportFactory(object):
    """Factory transport that builds zlib compressed transports.

    This factory caches the last single client/transport that it was passed
    and returns the same TZlibTransport object that was created.

    This caching means the TServer class will get the _same_ transport
    object for both input and output transports from this factory.
    (For non-threaded scenarios only, since the cache only holds one object)

    The purpose of this caching is to allocate only one TZlibTransport where
    only one is really needed (since it must have separate read/write buffers),
    and makes the statistics from getCompSavings() and getCompRatio()
    easier to understand.
    """
    # Cache of the last transport given and the zlib transport returned for
    # it.  These class attributes only supply the initial (empty) state:
    # getTransport() assigns through self, so every factory instance ends up
    # with its own single-entry cache.
    _last_trans = None
    _last_z = None

    def getTransport(self, trans, compresslevel=9):
        """Wrap a transport, trans, with the TZlibTransport
        compressed transport class, returning a new
        transport to the caller.

        If C{trans} is the very same object that was passed on the previous
        call, the TZlibTransport built for it then is returned again; in
        that case C{compresslevel} is ignored and the cached transport keeps
        the level it was created with.

        @param trans: The transport to wrap.
        @type trans: TTransport
        @param compresslevel: The zlib compression level, ranging
        from 0 (no compression) to 9 (best compression).  Defaults to 9.
        @type compresslevel: int

        This method returns a TZlibTransport which wraps the
        passed C{trans} TTransport derived instance.
        """
        cached = self._last_z
        # Compare by identity, not equality: the cached wrapper is bound to
        # one specific transport object, so a merely "equal" transport must
        # not be handed a wrapper around a different object.  Requiring a
        # cached wrapper also keeps an empty cache from reporting a hit.
        if cached is not None and trans is self._last_trans:
            return cached
        ztrans = TZlibTransport(trans, compresslevel)
        self._last_trans = trans
        self._last_z = ztrans
        return ztrans
68
69
class TZlibTransport(TTransportBase, CReadableTransport):
    """Class that wraps a transport with zlib, compressing writes
    and decompresses reads, using the python standard
    library zlib module.

    Statistics are kept in four public counters:
      - bytes_in:       decompressed bytes produced by reads
      - bytes_in_comp:  compressed bytes read from the wrapped transport
      - bytes_out:      uncompressed bytes handed to flush()
      - bytes_out_comp: compressed bytes written to the wrapped transport
    """
    # Read buffer size for the python fastbinary C extension,
    # the TBinaryProtocolAccelerated class.
    DEFAULT_BUFFSIZE = 4096

    def __init__(self, trans, compresslevel=9):
        """Create a new TZlibTransport, wrapping C{trans}, another
        TTransport derived object.

        @param trans: A thrift transport object, i.e. a TSocket() object.
        @type trans: TTransport
        @param compresslevel: The zlib compression level, ranging
        from 0 (no compression) to 9 (best compression).  Default is 9.
        @type compresslevel: int
        """
        self.__trans = trans
        self.compresslevel = compresslevel
        self.__rbuf = BufferIO()
        self.__wbuf = BufferIO()
        self._init_zlib()
        self._init_stats()

    def _reinit_buffers(self):
        """Internal method to initialize/reset the internal BufferIO objects
        for read and write buffers.
        """
        self.__rbuf = BufferIO()
        self.__wbuf = BufferIO()

    def _init_stats(self):
        """Internal method to reset the internal statistics counters
        for compression ratios and bandwidth savings.
        """
        self.bytes_in = 0
        self.bytes_out = 0
        self.bytes_in_comp = 0
        self.bytes_out_comp = 0

    def _init_zlib(self):
        """Internal method for setting up the zlib compression and
        decompression objects.
        """
        self._zcomp_read = zlib.decompressobj()
        self._zcomp_write = zlib.compressobj(self.compresslevel)

    def getCompRatio(self):
        """Get the current measured compression ratios (in,out) from
        this transport.

        Returns a tuple of:
        (inbound_compression_ratio, outbound_compression_ratio)

        The compression ratios are computed as:
            compressed / uncompressed

        E.g., data that compresses by 10x will have a ratio of: 0.10
        and data that compresses to half of its original size will
        have a ratio of 0.5

        None is returned if no bytes have yet been processed in
        a particular direction.
        """
        r_percent, w_percent = (None, None)
        if self.bytes_in > 0:
            r_percent = self.bytes_in_comp / self.bytes_in
        if self.bytes_out > 0:
            w_percent = self.bytes_out_comp / self.bytes_out
        return (r_percent, w_percent)

    def getCompSavings(self):
        """Get the current count of saved bytes due to data
        compression.

        Returns a tuple of:
        (inbound_saved_bytes, outbound_saved_bytes)

        Note: if compression is actually expanding your
        data (only likely with very tiny thrift objects), then
        the values returned will be negative.
        """
        r_saved = self.bytes_in - self.bytes_in_comp
        w_saved = self.bytes_out - self.bytes_out_comp
        return (r_saved, w_saved)

    def isOpen(self):
        """Return the underlying transport's open status"""
        return self.__trans.isOpen()

    def open(self):
        """Reset the statistics counters and open the underlying transport"""
        self._init_stats()
        return self.__trans.open()

    def listen(self):
        """Invoke the underlying transport's listen() method"""
        self.__trans.listen()

    def accept(self):
        """Accept connections on the underlying transport"""
        return self.__trans.accept()

    def close(self):
        """Discard buffered data, reset the zlib state and close the
        underlying transport.
        """
        self._reinit_buffers()
        self._init_zlib()
        return self.__trans.close()

    def read(self, sz):
        """Read up to sz bytes from the decompressed bytes buffer, and
        read from the underlying transport if the decompression
        buffer is empty.

        An empty result is returned when the underlying transport hands
        back no data at all (for instance at end of stream), instead of
        polling it forever.
        """
        ret = self.__rbuf.read(sz)
        if len(ret) > 0:
            return ret
        # Keep pulling compressed data until some decompressed output shows
        # up: a short compressed read may not yet decode to any bytes.
        while True:
            got_input, got_output = self._fill_rbuf(sz)
            if got_output:
                break
            if not got_input:
                # Nothing came from the transport and nothing is buffered:
                # retrying would spin without making progress.
                return ret
        return self.__rbuf.read(sz)

    def _fill_rbuf(self, sz):
        """Internal method: read up to sz compressed bytes from the
        underlying transport, decompress them and append the result to the
        read buffer, updating the inbound statistics.

        Returns a tuple (got_input, got_output): whether the transport
        returned any bytes, and whether the read buffer holds any
        decompressed data afterwards.
        """
        zbuf = self.__trans.read(sz)
        pending = self._zcomp_read.unconsumed_tail + zbuf
        buf = self._zcomp_read.decompress(pending)
        # Same convention as flush(): the *_comp counter holds the
        # compressed (wire) size, the plain counter the uncompressed size.
        self.bytes_in_comp += len(zbuf)
        self.bytes_in += len(buf)
        old = self.__rbuf.read()
        self.__rbuf = BufferIO(old + buf)
        return (len(zbuf) > 0, (len(old) + len(buf)) > 0)

    def readComp(self, sz):
        """Read compressed data from the underlying transport, then
        decompress it and append it to the internal BufferIO read buffer.

        Returns True if the read buffer holds any data afterwards,
        False otherwise.
        """
        return self._fill_rbuf(sz)[1]

    def write(self, buf):
        """Write some bytes, putting them into the internal write
        buffer for eventual compression.
        """
        self.__wbuf.write(buf)

    def flush(self):
        """Flush any queued up data in the write buffer and ensure the
        compression buffer is flushed out to the underlying transport
        """
        wout = self.__wbuf.getvalue()
        if len(wout) > 0:
            zbuf = self._zcomp_write.compress(wout)
            self.bytes_out += len(wout)
            self.bytes_out_comp += len(zbuf)
        else:
            # Must be bytes: it is concatenated with zlib output below, and
            # mixing text with bytes raises TypeError on Python 3.
            zbuf = b''
        ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH)
        self.bytes_out_comp += len(ztail)
        if (len(zbuf) + len(ztail)) > 0:
            self.__wbuf = BufferIO()
            self.__trans.write(zbuf + ztail)
        self.__trans.flush()

    @property
    def cstringio_buf(self):
        """Implement the CReadableTransport interface"""
        return self.__rbuf

    def cstringio_refill(self, partialread, reqlen):
        """Implement the CReadableTransport interface for refill

        Builds a new read buffer that starts with C{partialread} and holds
        at least C{reqlen} bytes, and returns it.  Raises EOFError if the
        transport runs out of data before C{reqlen} bytes are available.
        """
        retstring = partialread
        if reqlen < self.DEFAULT_BUFFSIZE:
            retstring += self.read(self.DEFAULT_BUFFSIZE)
        while len(retstring) < reqlen:
            chunk = self.read(reqlen - len(retstring))
            if len(chunk) == 0:
                # read() only comes back empty when no more data is
                # available; looping again could never satisfy reqlen.
                raise EOFError()
            retstring += chunk
        self.__rbuf = BufferIO(retstring)
        return self.__rbuf
249