# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Media helper functions and classes for Google Cloud Storage JSON API."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

import copy
import logging
import re
import socket
import types

import six
from six.moves import http_client
from six.moves import urllib
from six.moves import cStringIO

from apitools.base.py import exceptions as apitools_exceptions

from gslib.cloud_api import BadRequestException
from gslib.lazy_wrapper import LazyWrapper
from gslib.progress_callback import ProgressCallbackWithTimeout
from gslib.utils.constants import DEBUGLEVEL_DUMP_REQUESTS
from gslib.utils.constants import SSL_TIMEOUT_SEC
from gslib.utils.constants import TRANSFER_BUFFER_SIZE
from gslib.utils.constants import UTF8
from gslib.utils import text_util
import httplib2
from httplib2 import parse_uri

if six.PY3:
  long = int

# A regex for matching any series of decimal digits.
DECIMAL_REGEX = LazyWrapper(lambda: (re.compile(r'\d+')))


class BytesTransferredContainer(object):
  """Container class for passing number of bytes transferred to lower layers.

  For resumed transfers or connection rebuilds in the middle of a transfer, we
  need to rebuild the connection class with how much we've transferred so far.
  For uploads, we don't know the total number of bytes uploaded until we've
  queried the server, but we need to create the connection class to pass to
  httplib2 before we can query the server. This container object allows us to
  pass a reference into Upload/DownloadCallbackConnection.
  """

  def __init__(self):
    self.__bytes_transferred = 0

  @property
  def bytes_transferred(self):
    return self.__bytes_transferred

  @bytes_transferred.setter
  def bytes_transferred(self, value):
    self.__bytes_transferred = value

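# A minimal resume sketch (hypothetical caller code, names assumed): the same
# container instance is shared with the connection classes built below, so a
# rebuilt connection reports progress starting from what was already sent.
#
#   bytes_container = BytesTransferredContainer()
#   # ... server reports that n bytes of the resumable upload already exist ...
#   bytes_container.bytes_transferred = n  # assumed value from a status query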

class UploadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks and disable dumping the upload
  payload during debug statements. It can later be used to provide on-the-fly
  hash digestion during upload.
  """

  def __init__(self,
               bytes_uploaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE,
               total_size=0,
               progress_callback=None,
               logger=None,
               debug=0):
    self.bytes_uploaded_container = bytes_uploaded_container
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback
    self.logger = logger
    self.debug = debug

  def GetConnectionClass(self):
    """Returns a connection class that overrides send."""
    outer_bytes_uploaded_container = self.bytes_uploaded_container
    outer_buffer_size = self.buffer_size
    outer_total_size = self.total_size
    outer_progress_callback = self.progress_callback
    outer_logger = self.logger
    outer_debug = self.debug

    class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for uploads."""
      bytes_uploaded_container = outer_bytes_uploaded_container
      # After we instantiate this class, apitools will check with the server
      # to find out how many bytes remain for a resumable upload.  This allows
      # us to update our progress once based on that number.
      processed_initial_bytes = False
      GCS_JSON_BUFFER_SIZE = outer_buffer_size
      callback_processor = None
      size = outer_total_size
      header_encoding = ''
      header_length = None
      header_range = None
      size_modifier = 1.0

      def __init__(self, *args, **kwargs):
        kwargs['timeout'] = SSL_TIMEOUT_SEC
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      # Override httplib.HTTPConnection._send_output for debug logging.
      # Because the distinction between headers and message body occurs
      # only in this httplib function, we can only differentiate them here.
      def _send_output(self, message_body=None, encode_chunked=False):
        r"""Send the currently buffered request and clear the buffer.

        Appends an extra \r\n to the buffer.

        Args:
          message_body: if specified, this is appended to the request.
          encode_chunked: unused; present to match the signature of
              http.client.HTTPConnection._send_output.
        """
        # TODO: Presently, apitools will set httplib2.debuglevel to 0
        # (no prints) or 4 (dump upload payload, httplib prints to stdout).
        # Refactor to allow our media-handling functions to handle
        # debuglevel == 4 and print messages to stderr.
        self._buffer.extend((b'', b''))
        if six.PY2:
          items = self._buffer
        else:
          items = []
          for item in self._buffer:
            if isinstance(item, bytes):
              items.append(item)
            else:
              items.append(item.encode(UTF8))
        msg = b'\r\n'.join(items)
        num_metadata_bytes = len(msg)
        if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
          outer_logger.debug('send: %s' % msg)
        del self._buffer[:]
        # If msg and message_body are sent in a single send() call,
        # it will avoid performance problems caused by the interaction
        # between delayed ack and the Nagle algorithm.
        if isinstance(message_body, str):
          msg += message_body
          message_body = None
        self.send(msg, num_metadata_bytes=num_metadata_bytes)
        if message_body is not None:
          # message_body was not a string (i.e. it is a file) and
          # we must run the risk of Nagle.
          self.send(message_body)

      def putheader(self, header, *values):
        """Overrides HTTPConnection.putheader.

        Send a request header line to the server. For example:
        h.putheader('Accept', 'text/html').

        This override records the content encoding, length, and range of the
        payload. For uploads where the content-range difference does not match
        the content-length, progress printing will under-report progress. These
        headers are used to calculate a multiplier to correct the progress.

        For example: the content-length for gzip transport encoded data
        represents the compressed size of the data while the content-range
        difference represents the uncompressed size. Dividing the
        content-range difference by the content-length gives the ratio to
        multiply the progress by to correctly report the relative progress.

        Args:
          header: The header.
          *values: One or more values for the header.
        """
        if header == 'content-encoding':
          value = ''.join([str(v) for v in values])
          self.header_encoding = value
          if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
            outer_logger.debug(
                'send: Using gzip transport encoding for the request.')
        elif header == 'content-length':
          try:
            value = int(''.join([str(v) for v in values]))
            self.header_length = value
          except ValueError:
            pass
        elif header == 'content-range':
          try:
            # There are 3 valid header formats:
            #  '*/%d', '%d-%d/*', and '%d-%d/%d'
            value = ''.join([str(v) for v in values])
            ranges = DECIMAL_REGEX().findall(value)
            # If there are 2 or more range values, they will always
            # correspond to the start and end byte positions in the header.
            if len(ranges) > 1:
              # The range size is the end position minus the start
              # position, plus one.
              self.header_range = (int(ranges[1]) - int(ranges[0])) + 1
          except ValueError:
            pass
        # If the content encoding is gzip, and a range and length are set,
        # update the modifier.
        if (self.header_encoding == 'gzip' and self.header_length and
            self.header_range):
          # Update the modifier
          self.size_modifier = self.header_range / float(self.header_length)
          # Reset the headers
          self.header_encoding = ''
          self.header_length = None
          self.header_range = None
          # Log debug information to catch in tests.
          if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
            outer_logger.debug('send: Setting progress modifier to %s.' %
                               (self.size_modifier))
        # Propagate header values.
        http_client.HTTPSConnection.putheader(self, header, *values)

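      # Worked example for the modifier above (hypothetical numbers): a
      # gzip-encoded request with 'content-range: bytes 0-1048575/*' and
      # 'content-length: 262144' yields header_range == 1048576 and
      # header_length == 262144, so size_modifier == 1048576 / 262144.0 == 4.0
      # and each compressed byte sent is counted as 4 bytes of upload progress.
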
      def send(self, data, num_metadata_bytes=0):
        """Overrides HTTPConnection.send.

        Args:
          data: string or file-like object (implements read()) of data to send.
          num_metadata_bytes: number of bytes that consist of metadata
              (headers, etc.) not representing the data being uploaded.
        """
        if not self.processed_initial_bytes:
          self.processed_initial_bytes = True
          if outer_progress_callback:
            self.callback_processor = ProgressCallbackWithTimeout(
                outer_total_size, outer_progress_callback)
            self.callback_processor.Progress(
                self.bytes_uploaded_container.bytes_transferred)
        # httplib.HTTPConnection.send accepts either a string or a file-like
        # object (anything that implements read()).
        if isinstance(data, six.text_type):
          full_buffer = cStringIO(data)
        elif isinstance(data, six.binary_type):
          full_buffer = six.BytesIO(data)
        else:
          full_buffer = data
        partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
        while partial_buffer:
          if six.PY2:
            httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
          else:
            if isinstance(partial_buffer, bytes):
              httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
            else:
              httplib2.HTTPSConnectionWithTimeout.send(
                  self, partial_buffer.encode(UTF8))
          sent_data_bytes = len(partial_buffer)
          if num_metadata_bytes:
            if num_metadata_bytes <= sent_data_bytes:
              sent_data_bytes -= num_metadata_bytes
              num_metadata_bytes = 0
            else:
              num_metadata_bytes -= sent_data_bytes
              sent_data_bytes = 0
          if self.callback_processor:
            # Scale the sent data bytes by the size modifier. The product is
            # a float, so it is truncated back to an int.
            sent_data_bytes = int(sent_data_bytes * self.size_modifier)
            # TODO: We can't differentiate the multipart upload
            # metadata in the request body from the actual upload bytes, so we
            # will actually report slightly more bytes than desired to the
            # callback handler. Get the number of multipart upload metadata
            # bytes from apitools and subtract from sent_data_bytes.
            self.callback_processor.Progress(sent_data_bytes)
          partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)

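      # Worked example for the metadata bookkeeping above (hypothetical
      # numbers): if _send_output passes num_metadata_bytes=300 and the first
      # chunk read from the buffer is 8192 bytes, only 8192 - 300 = 7892 bytes
      # are reported as upload progress; subsequent chunks are reported in
      # full.
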
    return UploadCallbackConnection


def WrapUploadHttpRequest(upload_http):
  """Wraps upload_http so our custom connection_type is used only on PUTs/POSTs.

  PUTs and POSTs are the methods that carry upload data; requests made with
  other methods should not be processed by the upload callback connection.

  Args:
    upload_http: httplib2.Http instance to wrap
  """
  request_orig = upload_http.request

  def NewRequest(uri,
                 method='GET',
                 body=None,
                 headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    if method == 'PUT' or method == 'POST':
      override_connection_type = connection_type
    else:
      override_connection_type = None
    return request_orig(uri,
                        method=method,
                        body=body,
                        headers=headers,
                        redirections=redirections,
                        connection_type=override_connection_type)

  # Replace the request method with our own closure.
  upload_http.request = NewRequest

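# A minimal wiring sketch (hypothetical caller code, names assumed): the
# factory's connection class is handed to the wrapped request method via
# connection_type, so only upload requests run through the callback logic.
#
#   upload_http = httplib2.Http()
#   WrapUploadHttpRequest(upload_http)
#   conn_class = UploadCallbackConnectionClassFactory(
#       bytes_container, total_size=size,
#       progress_callback=progress_cb).GetConnectionClass()
#   upload_http.request(upload_url, method='PUT', body=upload_stream,
#                       connection_type=conn_class)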

class DownloadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks, disable dumping the download
  payload during debug statements, and provide on-the-fly hash digestion during
  download. On-the-fly digestion is particularly important because httplib2
  will decompress gzipped content on the fly, so this class provides our
  only opportunity to calculate the correct hash for an object that has a
  gzip hash in the cloud.
  """

  def __init__(self,
               bytes_downloaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE,
               total_size=0,
               progress_callback=None,
               digesters=None):
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback
    self.digesters = digesters
    self.bytes_downloaded_container = bytes_downloaded_container

  def GetConnectionClass(self):
    """Returns a connection class that overrides getresponse."""

    class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for downloads."""
      outer_total_size = self.total_size
      outer_digesters = self.digesters
      outer_progress_callback = self.progress_callback
      outer_bytes_downloaded_container = self.bytes_downloaded_container
      processed_initial_bytes = False
      callback_processor = None

      def __init__(self, *args, **kwargs):
        kwargs['timeout'] = SSL_TIMEOUT_SEC
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def getresponse(self, buffering=False):
        """Wraps an HTTPResponse to perform callbacks and hashing.

        In this function, self is a DownloadCallbackConnection.

        Args:
          buffering: Unused. This function uses a local buffer.

        Returns:
          HTTPResponse object with wrapped read function.
        """
        orig_response = http_client.HTTPConnection.getresponse(self)
        if orig_response.status not in (http_client.OK,
                                        http_client.PARTIAL_CONTENT):
          return orig_response
        orig_read_func = orig_response.read

        def read(amt=None):  # pylint: disable=invalid-name
          """Overrides HTTPConnection.getresponse.read.

          This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.

          Args:
            amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
                 keyword argument to match the read function it overrides,
                 but it is required.

          Returns:
            Data read from HTTPConnection.
          """
          if not amt or amt > TRANSFER_BUFFER_SIZE:
            raise BadRequestException(
                'Invalid HTTP read size %s during download, expected %s.' %
                (amt, TRANSFER_BUFFER_SIZE))
          else:
            amt = amt or TRANSFER_BUFFER_SIZE

          if not self.processed_initial_bytes:
            self.processed_initial_bytes = True
            if self.outer_progress_callback:
              self.callback_processor = ProgressCallbackWithTimeout(
                  self.outer_total_size, self.outer_progress_callback)
              self.callback_processor.Progress(
                  self.outer_bytes_downloaded_container.bytes_transferred)

          data = orig_read_func(amt)
          read_length = len(data)
          if self.callback_processor:
            self.callback_processor.Progress(read_length)
          if self.outer_digesters:
            for alg in self.outer_digesters:
              self.outer_digesters[alg].update(data)
          return data

        orig_response.read = read

        return orig_response

    return DownloadCallbackConnection

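# A minimal digestion sketch (hypothetical caller code, names assumed): the
# digesters passed to the factory are updated inside read(), before httplib2
# decompresses gzip content, so the resulting hash matches the object bytes as
# stored in the cloud.
#
#   digesters = {'md5': hashlib.md5()}  # assumes hashlib is imported
#   conn_class = DownloadCallbackConnectionClassFactory(
#       bytes_container, total_size=size,
#       digesters=digesters).GetConnectionClass()
#   # ... perform the GET with connection_type=conn_class ...
#   local_md5 = digesters['md5'].hexdigest()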

def WrapDownloadHttpRequest(download_http):
  """Overrides download request functions for an httplib2.Http object.

  Args:
    download_http: httplib2.Http.object to wrap / override.

  Returns:
    Wrapped / overridden httplib2.Http object.
  """

  # httplib2 has a bug (https://github.com/httplib2/httplib2/issues/75) where
  # custom connection_type is not respected after redirects.  This function is
  # copied from httplib2 and overrides the request function so that the
  # connection_type is properly passed through (everything here should be
  # identical to the _request method in httplib2, with the exception of the line
  # below marked by the "BUGFIX" comment).
  # pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
  # pylint: disable=g-equals-none,g-doc-return-or-yield
  # pylint: disable=g-short-docstring-punctuation,g-doc-args
  # pylint: disable=too-many-statements
  # yapf: disable
  def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
                      body, headers, redirections, cachekey):
    """Do the actual request using the connection object.
    Also follow one level of redirects if necessary.
    """

    auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
              if auth.inscope(host, request_uri)])
    auth = auths and sorted(auths)[0][1] or None
    if auth:
      auth.request(method, request_uri, headers, body)

    (response, content) = self._conn_request(conn, request_uri, method, body,
                                             headers)

    if auth:
      if auth.response(response, body):
        auth.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        response._stale_digest = 1

    if response.status == 401:
      for authorization in self._auth_from_challenge(
          host, request_uri, headers, response, content):
        authorization.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        if response.status != 401:
          self.authorizations.append(authorization)
          authorization.response(response, body)
          break

    if (self.follow_all_redirects or (method in ["GET", "HEAD"])
        or response.status == 303):
      if self.follow_redirects and response.status in [300, 301, 302,
                                                       303, 307]:
        # Pick out the location header and basically start from the beginning
        # remembering first to strip the ETag header and decrement our 'depth'
        if redirections:
          if 'location' not in response and response.status != 300:
            raise httplib2.RedirectMissingLocation(
                "Redirected but the response is missing a Location: header.",
                response, content)
          # Fix-up relative redirects (which violate an RFC 2616 MUST)
          if 'location' in response:
            location = response['location']
            (scheme, authority, path, query, fragment) = parse_uri(location)
            if authority is None:
              response['location'] = urllib.parse.urljoin(absolute_uri, location)
          if response.status == 301 and method in ["GET", "HEAD"]:
            response['-x-permanent-redirect-url'] = response['location']
            if 'content-location' not in response:
              response['content-location'] = absolute_uri
            httplib2._updateCache(headers, response, content, self.cache,
                                  cachekey)
          if 'if-none-match' in headers:
            del headers['if-none-match']
          if 'if-modified-since' in headers:
            del headers['if-modified-since']
          if ('authorization' in headers and
              not self.forward_authorization_headers):
            del headers['authorization']
          if 'location' in response:
            location = response['location']
            old_response = copy.deepcopy(response)
            if 'content-location' not in old_response:
              old_response['content-location'] = absolute_uri
            redirect_method = method
            if response.status in [302, 303]:
              redirect_method = "GET"
              body = None
            (response, content) = self.request(
                location, redirect_method, body=body, headers=headers,
                redirections=redirections-1,
                # BUGFIX (see comments at the top of this function):
                connection_type=conn.__class__)
            response.previous = old_response
        else:
          raise httplib2.RedirectLimit(
              "Redirected more times than redirection_limit allows.",
              response, content)
      elif response.status in [200, 203] and method in ["GET", "HEAD"]:
        # Don't cache 206's since we aren't going to handle byte range
        # requests
        if 'content-location' in response:
          response['content-location'] = absolute_uri
        httplib2._updateCache(headers, response, content, self.cache,
                              cachekey)

    return (response, content)

  # Wrap download_http so we do not use our custom connection_type
  # on POSTs, which are used to refresh oauth tokens. We don't want to
  # process the data received in those requests.
  request_orig = download_http.request
  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    if method == 'POST':
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=None)
    else:
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=connection_type)

  # Replace the request methods with our own closures.
  download_http._request = types.MethodType(OverrideRequest, download_http)
  download_http.request = NewRequest

  return download_http


class HttpWithNoRetries(httplib2.Http):
  """httplib2.Http variant that does not retry.

  httplib2 automatically retries requests according to httplib2.RETRIES, but
  in certain cases httplib2 ignores the RETRIES value and forces a retry.
  Because httplib2 does not handle the case where the underlying request body
  is a stream, a retry may cause a non-idempotent write as the stream is
  partially consumed and not reset before the retry occurs.

  Here we override _conn_request to disable retries unequivocally, so that
  uploads may be retried at higher layers that properly handle stream request
  bodies.
  """

  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements

    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError('Unable to find the server at %s' %
                                         conn.host)
    except httplib2.ssl.SSLError:
      conn.close()
      raise
    except socket.error as e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
    except http_client.HTTPException:
      conn.close()
      raise
    try:
      response = conn.getresponse()
    except (socket.error, http_client.HTTPException):
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        conn.close()
      else:
        content = response.read()
      response = httplib2.Response(response)
      if method != 'HEAD':
        # pylint: disable=protected-access
        content = httplib2._decompressContent(response, content)
    return (response, content)


class HttpWithDownloadStream(httplib2.Http):
  """httplib2.Http variant that only pushes bytes through a stream.

  httplib2 handles media by storing entire chunks of responses in memory, which
  is undesirable particularly when multiple instances are used during
  multi-threaded/multi-process copy. This class copies and then overrides some
  httplib2 functions to use a streaming copy approach that uses small memory
  buffers.

  Also disables httplib2 retries (for reasons stated in the HttpWithNoRetries
  class doc).
  """

  def __init__(self, *args, **kwds):
    self._stream = None
    self._logger = logging.getLogger()
    super(HttpWithDownloadStream, self).__init__(*args, **kwds)

  @property
  def stream(self):
    return self._stream

  @stream.setter
  def stream(self, value):
    self._stream = value

  # pylint: disable=too-many-statements
  def _conn_request(self, conn, request_uri, method, body, headers):
    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError('Unable to find the server at %s' %
                                         conn.host)
    except httplib2.ssl.SSLError:
      conn.close()
      raise
    except socket.error as e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
    except http_client.HTTPException:
      # Just because the server closed the connection doesn't necessarily mean
      # that the server didn't send a response.
      conn.close()
      raise
    try:
      response = conn.getresponse()
    except (socket.error, http_client.HTTPException) as e:
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        conn.close()
        response = httplib2.Response(response)
      elif method == 'GET' and response.status in (http_client.OK,
                                                   http_client.PARTIAL_CONTENT):
        content_length = None
        if hasattr(response, 'msg'):
          content_length = response.getheader('content-length')
        http_stream = response
        bytes_read = 0
        while True:
          new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
          if new_data:
            if self.stream is None:
              raise apitools_exceptions.InvalidUserInputError(
                  'Cannot exercise HttpWithDownloadStream with no stream')
            text_util.write_to_fd(self.stream, new_data)
            bytes_read += len(new_data)
          else:
            break

        if (content_length is not None and
            long(bytes_read) != long(content_length)):
          # The input stream terminated before we were able to read the
          # entire contents, possibly due to a network condition. Set
          # content-length to indicate how many bytes we actually read.
          self._logger.log(
              logging.DEBUG, 'Only got %s bytes out of content-length %s '
              'for request URI %s. Resetting content-length to match '
              'bytes read.', bytes_read, content_length, request_uri)
          response.msg['content-length'] = str(bytes_read)
        response = httplib2.Response(response)
      else:
        # We fall back to the current httplib2 behavior if we're
        # not processing download bytes, e.g., it's a redirect, an
        # oauth2client POST to refresh an access token, or any HTTP
        # status code that doesn't include object content.
        content = response.read()
        response = httplib2.Response(response)
        # pylint: disable=protected-access
        content = httplib2._decompressContent(response, content)
    return (response, content)

  # pylint: enable=too-many-statements

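# A minimal streaming-download sketch (hypothetical caller code, names
# assumed): the caller attaches a writable stream before issuing the GET, and
# the response body is written to that stream instead of being returned.
#
#   stream_http = HttpWithDownloadStream()
#   with open(local_path, 'wb') as fp:
#     stream_http.stream = fp
#     response, _ = stream_http.request(object_url, method='GET')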