1# -*- coding: utf-8 -*- 2# Copyright 2014 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Media helper functions and classes for Google Cloud Storage JSON API.""" 16 17from __future__ import absolute_import 18from __future__ import print_function 19from __future__ import division 20from __future__ import unicode_literals 21 22import copy 23import logging 24import re 25import socket 26import types 27 28import six 29from six.moves import http_client 30from six.moves import urllib 31from six.moves import cStringIO 32 33from apitools.base.py import exceptions as apitools_exceptions 34 35from gslib.cloud_api import BadRequestException 36from gslib.lazy_wrapper import LazyWrapper 37from gslib.progress_callback import ProgressCallbackWithTimeout 38from gslib.utils.constants import DEBUGLEVEL_DUMP_REQUESTS 39from gslib.utils.constants import SSL_TIMEOUT_SEC 40from gslib.utils.constants import TRANSFER_BUFFER_SIZE 41from gslib.utils.constants import UTF8 42from gslib.utils import text_util 43import httplib2 44from httplib2 import parse_uri 45 46if six.PY3: 47 long = int 48 49# A regex for matching any series of decimal digits. 50DECIMAL_REGEX = LazyWrapper(lambda: (re.compile(r'\d+'))) 51 52 53class BytesTransferredContainer(object): 54 """Container class for passing number of bytes transferred to lower layers. 
class BytesTransferredContainer(object):
  """Mutable holder for a running count of transferred bytes.

  A connection class may have to be rebuilt in the middle of a transfer (for
  example on resume), and for uploads the number of bytes already on the
  server is only known after the connection class has been handed to
  httplib2. Sharing one instance of this container lets the caller update the
  count later while Upload/DownloadCallbackConnection hold a reference to it.
  """

  def __init__(self):
    # Nothing has been transferred when the container is created.
    self.__bytes_transferred = 0

  def _ReadBytesTransferred(self):
    return self.__bytes_transferred

  def _WriteBytesTransferred(self, value):
    self.__bytes_transferred = value

  # Read/write attribute exposing the stored count; no validation is applied.
  bytes_transferred = property(_ReadBytesTransferred, _WriteBytesTransferred)
class UploadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks and disable dumping the upload
  payload during debug statements. It can later be used to provide on-the-fly
  hash digestion during upload.
  """

  def __init__(self,
               bytes_uploaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE,
               total_size=0,
               progress_callback=None,
               logger=None,
               debug=0):
    """Stores the settings captured by the generated connection class.

    Args:
      bytes_uploaded_container: Object exposing bytes_transferred; that value
          is reported as the initial progress on the first send.
      buffer_size: Maximum number of bytes read from the payload per send.
      total_size: Total size handed to ProgressCallbackWithTimeout.
      progress_callback: Optional callback handed to
          ProgressCallbackWithTimeout; progress is not reported without it.
      logger: Optional logger used for request debug output.
      debug: Debug level; debug output is only logged when it equals
          DEBUGLEVEL_DUMP_REQUESTS.
    """
    self.bytes_uploaded_container = bytes_uploaded_container
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback
    self.logger = logger
    self.debug = debug

  def GetConnectionClass(self):
    """Returns a connection class that overrides send."""
    outer_bytes_uploaded_container = self.bytes_uploaded_container
    outer_buffer_size = self.buffer_size
    outer_total_size = self.total_size
    outer_progress_callback = self.progress_callback
    outer_logger = self.logger
    outer_debug = self.debug

    class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for uploads."""
      bytes_uploaded_container = outer_bytes_uploaded_container
      # After we instantiate this class, apitools will check with the server
      # to find out how many bytes remain for a resumable upload.  This allows
      # us to update our progress once based on that number.
      processed_initial_bytes = False
      GCS_JSON_BUFFER_SIZE = outer_buffer_size
      callback_processor = None
      size = outer_total_size
      # Values recorded by putheader() and used to compute size_modifier.
      header_encoding = ''
      header_length = None
      header_range = None
      # Multiplier applied to the sent byte count before reporting progress.
      size_modifier = 1.0

      def __init__(self, *args, **kwargs):
        # Always use the SSL timeout, overriding any caller-supplied value.
        kwargs['timeout'] = SSL_TIMEOUT_SEC
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      # Override httplib.HTTPConnection._send_output for debug logging.
      # Because the distinction between headers and message body occurs
      # only in this httplib function, we can only differentiate them here.
      def _send_output(self, message_body=None, encode_chunked=False):
        r"""Send the currently buffered request and clear the buffer.

        Appends an extra \r\n to the buffer.

        Args:
          message_body: if specified, this is appended to the request.
          encode_chunked: accepted for signature compatibility; not used by
              this override.
        """
        # TODO: Presently, apitools will set http2lib2.debuglevel to 0
        # (no prints) or 4 (dump upload payload, httplib prints to stdout).
        # Refactor to allow our media-handling functions to handle
        # debuglevel == 4 and print messages to stderr.
        self._buffer.extend((b'', b''))
        if six.PY2:
          items = self._buffer
        else:
          # Buffered header lines may be text; the joined message must be
          # bytes.
          items = [
              item if isinstance(item, bytes) else item.encode(UTF8)
              for item in self._buffer
          ]
        msg = b'\r\n'.join(items)
        # Everything buffered so far is request metadata, not payload; send()
        # excludes this many bytes from the reported progress.
        num_metadata_bytes = len(msg)
        if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
          outer_logger.debug('send: %s' % msg)
        del self._buffer[:]
        # If msg and message_body are sent in a single send() call,
        # it will avoid performance problems caused by the interaction
        # between delayed ack and the Nagle algorithm.
        if isinstance(message_body, str):
          msg += message_body
          message_body = None
        self.send(msg, num_metadata_bytes=num_metadata_bytes)
        if message_body is not None:
          # message_body was not a string (i.e. it is a file) and
          # we must run the risk of Nagle
          self.send(message_body)

      def putheader(self, header, *values):
        """Overrides HTTPConnection.putheader.

        Send a request header line to the server. For example:
        h.putheader('Accept', 'text/html').

        This override records the content encoding, length, and range of the
        payload. For uploads where the content-range difference does not match
        the content-length, progress printing will under-report progress. These
        headers are used to calculate a multiplier to correct the progress.

        For example: the content-length for gzip transport encoded data
        represents the compressed size of the data while the content-range
        difference represents the uncompressed size. Dividing the
        content-range difference by the content-length gives the ratio to
        multiply the progress by to correctly report the relative progress.

        Args:
          header: The header.
          *values: A set of values for the header.
        """
        if header == 'content-encoding':
          value = ''.join([str(v) for v in values])
          self.header_encoding = value
          if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
            outer_logger.debug(
                'send: Using gzip transport encoding for the request.')
        elif header == 'content-length':
          try:
            value = int(''.join([str(v) for v in values]))
            self.header_length = value
          except ValueError:
            # A non-numeric length is ignored; no modifier can be computed.
            pass
        elif header == 'content-range':
          try:
            # There are 3 valid header formats:
            #  '*/%d', '%d-%d/*', and '%d-%d/%d'
            value = ''.join([str(v) for v in values])
            ranges = DECIMAL_REGEX().findall(value)
            # If there are 2 or more range values, they will always
            # correspond to the start and end ranges in the header.
            if len(ranges) > 1:
              # Subtract the start position from the end position. Both ends
              # are inclusive, hence the + 1.
              self.header_range = (int(ranges[1]) - int(ranges[0])) + 1
          except ValueError:
            pass
        # If the content header is gzip, and a range and length are set,
        # update the modifier.
        if (self.header_encoding == 'gzip' and self.header_length and
            self.header_range):
          # Update the modifier
          self.size_modifier = self.header_range / float(self.header_length)
          # Reset the headers
          self.header_encoding = ''
          self.header_length = None
          self.header_range = None
          # Log debug information to catch in tests.
          if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
            outer_logger.debug('send: Setting progress modifier to %s.' %
                               (self.size_modifier))
        # Propagate header values.
        http_client.HTTPSConnection.putheader(self, header, *values)

      def send(self, data, num_metadata_bytes=0):
        """Overrides HTTPConnection.send.

        Sends data in chunks of at most GCS_JSON_BUFFER_SIZE and reports the
        number of payload bytes in each chunk to the progress callback.

        Args:
          data: string or file-like object (implements read()) of data to send.
          num_metadata_bytes: number of bytes that consist of metadata
              (headers, etc.) not representing the data being uploaded.
        """
        if not self.processed_initial_bytes:
          self.processed_initial_bytes = True
          if outer_progress_callback:
            self.callback_processor = ProgressCallbackWithTimeout(
                outer_total_size, outer_progress_callback)
            # Report the bytes that were already transferred before this
            # connection started sending.
            self.callback_processor.Progress(
                self.bytes_uploaded_container.bytes_transferred)
        # httplib.HTTPConnection.send accepts either a string or a file-like
        # object (anything that implements read()).
        if isinstance(data, six.text_type):
          # Encode text once up front so chunking and progress accounting
          # both operate on the bytes that actually go over the wire.
          data = data.encode(UTF8)
        if isinstance(data, six.binary_type):
          full_buffer = six.BytesIO(data)
        else:
          full_buffer = data
        partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
        while partial_buffer:
          if isinstance(partial_buffer, six.text_type):
            # A file-like object may still yield text; encode it before
            # measuring so the count below is in bytes, not characters.
            partial_buffer = partial_buffer.encode(UTF8)
          httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
          sent_data_bytes = len(partial_buffer)
          if num_metadata_bytes:
            # Consume the metadata allowance first so that header bytes are
            # never reported as upload progress.
            if num_metadata_bytes <= sent_data_bytes:
              sent_data_bytes -= num_metadata_bytes
              num_metadata_bytes = 0
            else:
              num_metadata_bytes -= sent_data_bytes
              sent_data_bytes = 0
          if self.callback_processor:
            # Modify the sent data bytes by the size modifier. These are
            # stored as floats, so the result should be floored.
            sent_data_bytes = int(sent_data_bytes * self.size_modifier)
            # TODO: We can't differentiate the multipart upload
            # metadata in the request body from the actual upload bytes, so we
            # will actually report slightly more bytes than desired to the
            # callback handler. Get the number of multipart upload metadata
            # bytes from apitools and subtract from sent_data_bytes.
            self.callback_processor.Progress(sent_data_bytes)
          partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)

    return UploadCallbackConnection
def WrapUploadHttpRequest(upload_http):
  """Limits the custom connection_type to PUT and POST requests.

  Replaces upload_http.request with a closure that forwards every argument to
  the original request method unchanged, except that the caller-supplied
  connection_type is only passed through when the method is PUT or POST. For
  every other method connection_type=None is forwarded instead.

  Args:
    upload_http: httplib2.Http instance to wrap; it is modified in place.
  """
  wrapped_request = upload_http.request

  def NewRequest(uri,
                 method='GET',
                 body=None,
                 headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    # Only PUT and POST keep the custom connection class.
    effective_connection_type = (connection_type
                                 if method in ('PUT', 'POST') else None)
    return wrapped_request(uri,
                           method=method,
                           body=body,
                           headers=headers,
                           redirections=redirections,
                           connection_type=effective_connection_type)

  # Replace the request method with our own closure.
  upload_http.request = NewRequest
class DownloadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks, disable dumping the download
  payload during debug statements, and provide on-the-fly hash digestion during
  download. On-the-fly digestion is particularly important because httplib2
  will decompress gzipped content on-the-fly, thus this class provides our
  only opportunity to calculate the correct hash for an object that has a
  gzip hash in the cloud.
  """

  def __init__(self,
               bytes_downloaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE,
               total_size=0,
               progress_callback=None,
               digesters=None):
    """Stores the settings captured by the generated connection class.

    Args:
      bytes_downloaded_container: Object exposing bytes_transferred; that
          value is reported as the initial progress on the first read.
      buffer_size: Stored on the factory; the generated connection class does
          not read it.
      total_size: Total size handed to ProgressCallbackWithTimeout.
      progress_callback: Optional callback handed to
          ProgressCallbackWithTimeout; progress is not reported without it.
      digesters: Optional dict whose values expose update(); each is fed
          every chunk returned by the wrapped read.
    """
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback
    self.digesters = digesters
    self.bytes_downloaded_container = bytes_downloaded_container

  def GetConnectionClass(self):
    """Returns a connection class that overrides getresponse."""

    class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for downloads."""
      outer_total_size = self.total_size
      outer_digesters = self.digesters
      outer_progress_callback = self.progress_callback
      outer_bytes_downloaded_container = self.bytes_downloaded_container
      processed_initial_bytes = False
      callback_processor = None

      def __init__(self, *args, **kwargs):
        # Always use the SSL timeout, overriding any caller-supplied value.
        kwargs['timeout'] = SSL_TIMEOUT_SEC
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def getresponse(self, buffering=False):
        """Wraps an HTTPResponse to perform callbacks and hashing.

        In this function, self is a DownloadCallbackConnection.

        Args:
          buffering: Unused. This function uses a local buffer.

        Returns:
          HTTPResponse object with wrapped read function. Responses whose
          status is neither OK nor PARTIAL_CONTENT are returned unwrapped.
        """
        orig_response = http_client.HTTPConnection.getresponse(self)
        if orig_response.status not in (http_client.OK,
                                        http_client.PARTIAL_CONTENT):
          return orig_response
        orig_read_func = orig_response.read

        def read(amt=None):  # pylint: disable=invalid-name
          """Overrides HTTPConnection.getresponse.read.

          This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.

          Args:
            amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
                 keyword argument to match the read function it overrides,
                 but it is required.

          Returns:
            Data read from HTTPConnection.

          Raises:
            BadRequestException: if amt is missing, zero, or larger than
                TRANSFER_BUFFER_SIZE.
          """
          if not amt or amt > TRANSFER_BUFFER_SIZE:
            raise BadRequestException(
                'Invalid HTTP read size %s during download, expected %s.' %
                (amt, TRANSFER_BUFFER_SIZE))

          if not self.processed_initial_bytes:
            self.processed_initial_bytes = True
            if self.outer_progress_callback:
              self.callback_processor = ProgressCallbackWithTimeout(
                  self.outer_total_size, self.outer_progress_callback)
              # Report the bytes that were already transferred before this
              # connection started reading.
              self.callback_processor.Progress(
                  self.outer_bytes_downloaded_container.bytes_transferred)

          data = orig_read_func(amt)
          if self.callback_processor:
            self.callback_processor.Progress(len(data))
          if self.outer_digesters:
            for digester in self.outer_digesters.values():
              digester.update(data)
          return data

        orig_response.read = read

        return orig_response

    return DownloadCallbackConnection
def WrapDownloadHttpRequest(download_http):
  """Overrides download request functions for an httplib2.Http object.

  Two attributes of download_http are replaced: _request, with a copy of
  httplib2's redirect-following logic that preserves the connection class
  across redirects, and request, with a closure that drops the custom
  connection_type for POST requests.

  Args:
    download_http: httplib2.Http.object to wrap / override.

  Returns:
    Wrapped / overridden httplib2.Http object.
  """

  # httplib2 has a bug (https://github.com/httplib2/httplib2/issues/75) where
  # custom connection_type is not respected after redirects.  This function is
  # copied from httplib2 and overrides the request function so that the
  # connection_type is properly passed through (everything here should be
  # identical to the _request method in httplib2, with the exception of the line
  # below marked by the "BUGFIX" comment).
  # pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
  # pylint: disable=g-equals-none,g-doc-return-or-yield
  # pylint: disable=g-short-docstring-punctuation,g-doc-args
  # pylint: disable=too-many-statements
  # yapf: disable
  def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
                      body, headers, redirections, cachekey):
    """Do the actual request using the connection object.
    Also follow one level of redirects if necessary.
    """

    auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
              if auth.inscope(host, request_uri)])
    auth = auths and sorted(auths)[0][1] or None
    if auth:
      auth.request(method, request_uri, headers, body)

    (response, content) = self._conn_request(conn, request_uri, method, body,
                                             headers)

    if auth:
      if auth.response(response, body):
        auth.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        response._stale_digest = 1

    if response.status == 401:
      for authorization in self._auth_from_challenge(
          host, request_uri, headers, response, content):
        authorization.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        if response.status != 401:
          self.authorizations.append(authorization)
          authorization.response(response, body)
          break

    if (self.follow_all_redirects or (method in ["GET", "HEAD"])
        or response.status == 303):
      if self.follow_redirects and response.status in [300, 301, 302,
                                                       303, 307]:
        # Pick out the location header and basically start from the beginning
        # remembering first to strip the ETag header and decrement our 'depth'
        if redirections:
          if 'location' not in response and response.status != 300:
            raise httplib2.RedirectMissingLocation(
                "Redirected but the response is missing a Location: header.",
                response, content)
          # Fix-up relative redirects (which violate an RFC 2616 MUST)
          if 'location' in response:
            location = response['location']
            (scheme, authority, path, query, fragment) = parse_uri(location)
            if authority is None:
              response['location'] = urllib.parse.urljoin(absolute_uri, location)
          if response.status == 301 and method in ["GET", "HEAD"]:
            response['-x-permanent-redirect-url'] = response['location']
            if 'content-location' not in response:
              response['content-location'] = absolute_uri
            httplib2._updateCache(headers, response, content, self.cache,
                                  cachekey)
          if 'if-none-match' in headers:
            del headers['if-none-match']
          if 'if-modified-since' in headers:
            del headers['if-modified-since']
          if ('authorization' in headers and
              not self.forward_authorization_headers):
            del headers['authorization']
          if 'location' in response:
            location = response['location']
            old_response = copy.deepcopy(response)
            if 'content-location' not in old_response:
              old_response['content-location'] = absolute_uri
            redirect_method = method
            if response.status in [302, 303]:
              redirect_method = "GET"
              body = None
            (response, content) = self.request(
                location, redirect_method, body=body, headers=headers,
                redirections=redirections-1,
                # BUGFIX (see comments at the top of this function):
                connection_type=conn.__class__)
            response.previous = old_response
        else:
          raise httplib2.RedirectLimit(
              "Redirected more times than redirection_limit allows.",
              response, content)
      elif response.status in [200, 203] and method in ["GET", "HEAD"]:
        # Don't cache 206's since we aren't going to handle byte range
        # requests
        # Only fill in content-location when the server did not supply one,
        # exactly as the redirect branch above does; a server-provided value
        # must not be overwritten.
        if 'content-location' not in response:
          response['content-location'] = absolute_uri
        httplib2._updateCache(headers, response, content, self.cache,
                              cachekey)

    return (response, content)

  # Wrap download_http so we do not use our custom connection_type
  # on POSTS, which are used to refresh oauth tokens. We don't want to
  # process the data received in those requests.
  request_orig = download_http.request
  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    if method == 'POST':
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=None)
    else:
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=connection_type)

  # Replace the request methods with our own closures.
  download_http._request = types.MethodType(OverrideRequest, download_http)
  download_http.request = NewRequest

  return download_http
class HttpWithNoRetries(httplib2.Http):
  """httplib2.Http variant that does not retry.

  httplib2 automatically retries requests according to httplib2.RETRIES, but
  in certain cases httplib2 ignores the RETRIES value and forces a retry.
  Because httplib2 does not handle the case where the underlying request body
  is a stream, a retry may cause a non-idempotent write as the stream is
  partially consumed and not reset before the retry occurs.

  Here we override _conn_request to disable retries unequivocally, so that
  uploads may be retried at higher layers that properly handle stream request
  bodies.
  """

  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements
    """Issues a single request on conn and reads the response, without retry.

    Args:
      conn: Connection object used to send the request.
      request_uri: URI to request.
      method: HTTP method name.
      body: Request body passed to conn.request.
      headers: Request headers passed to conn.request.

    Returns:
      (response, content) tuple, where response is an httplib2.Response. For
      HEAD requests content is the empty string; otherwise it is the response
      body after httplib2's content decompression.
    """
    # Phase 1: connect if needed and send the request.
    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError('Unable to find the server at %s' %
                                         conn.host)
    except httplib2.ssl.SSLError:
      conn.close()
      raise
    except socket.error as e:
      error_code = getattr(e, 'args')[0] if hasattr(e, 'args') else e.errno
      # Only a refused connection is re-raised here; for any other socket
      # error execution continues to getresponse() below.
      if error_code == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
    except http_client.HTTPException:
      conn.close()
      raise

    # Phase 2: fetch the response.
    try:
      raw_response = conn.getresponse()
    except (socket.error, http_client.HTTPException):
      conn.close()
      raise

    if method == 'HEAD':
      # No body to read for HEAD; close the connection and return empty
      # content.
      conn.close()
      return (httplib2.Response(raw_response), '')

    payload = raw_response.read()
    wrapped_response = httplib2.Response(raw_response)
    # pylint: disable=protected-access
    payload = httplib2._decompressContent(wrapped_response, payload)
    return (wrapped_response, payload)
class HttpWithDownloadStream(httplib2.Http):
  """httplib2.Http variant that only pushes bytes through a stream.

  httplib2 handles media by storing entire chunks of responses in memory, which
  is undesirable particularly when multiple instances are used during
  multi-threaded/multi-process copy. This class copies and then overrides some
  httplib2 functions to use a streaming copy approach that uses small memory
  buffers.

  Also disables httplib2 retries (for reasons stated in the HttpWithNoRetries
  class doc).
  """

  def __init__(self, *args, **kwds):
    # Destination for downloaded bytes; must be set through the stream
    # property before a GET that returns object content.
    self._stream = None
    self._logger = logging.getLogger()
    super(HttpWithDownloadStream, self).__init__(*args, **kwds)

  @property
  def stream(self):
    return self._stream

  @stream.setter
  def stream(self, value):
    self._stream = value

  # pylint: disable=too-many-statements
  def _conn_request(self, conn, request_uri, method, body, headers):
    """Issues a single request and streams successful GET bodies to stream.

    Args:
      conn: Connection object used to send the request.
      request_uri: URI to request.
      method: HTTP method name.
      body: Request body passed to conn.request.
      headers: Request headers passed to conn.request.

    Returns:
      (response, content) tuple, where response is an httplib2.Response. For
      HEAD requests, and for GET requests with status OK or PARTIAL_CONTENT,
      content is the empty string (GET data is written to self.stream
      instead). Otherwise content is the response body after httplib2's
      content decompression.

    Raises:
      apitools_exceptions.InvalidUserInputError: if download data arrives
          while no stream has been set.
    """
    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError('Unable to find the server at %s' %
                                         conn.host)
    except httplib2.ssl.SSLError:
      conn.close()
      raise
    except socket.error as e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      # Only a refused connection is re-raised here; for any other socket
      # error execution continues to getresponse() below.
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
    except http_client.HTTPException:
      # Just because the server closed the connection doesn't apparently mean
      # that the server didn't send a response.
      conn.close()
      raise
    try:
      response = conn.getresponse()
    except (socket.error, http_client.HTTPException):
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        conn.close()
        response = httplib2.Response(response)
      elif method == 'GET' and response.status in (http_client.OK,
                                                   http_client.PARTIAL_CONTENT):
        content_length = None
        if hasattr(response, 'msg'):
          content_length = response.getheader('content-length')
        http_stream = response
        bytes_read = 0
        while True:
          new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
          if new_data:
            if self.stream is None:
              raise apitools_exceptions.InvalidUserInputError(
                  'Cannot exercise HttpWithDownloadStream with no stream')
            text_util.write_to_fd(self.stream, new_data)
            bytes_read += len(new_data)
          else:
            break

        if (content_length is not None and
            long(bytes_read) != long(content_length)):
          # The input stream terminated before we were able to read the
          # entire contents, possibly due to a network condition. Set
          # content-length to indicate how many bytes we actually read.
          self._logger.log(
              logging.DEBUG, 'Only got %s bytes out of content-length %s '
              'for request URI %s. Resetting content-length to match '
              'bytes read.', bytes_read, content_length, request_uri)
          # Assigning a header on a message object appends a new field rather
          # than replacing the existing one, which would leave two
          # content-length headers. Delete the old value first so that only
          # the corrected length remains.
          del response.msg['content-length']
          response.msg['content-length'] = str(bytes_read)
        response = httplib2.Response(response)
      else:
        # We fall back to the current httplib2 behavior if we're
        # not processing download bytes, e.g., it's a redirect, an
        # oauth2client POST to refresh an access token, or any HTTP
        # status code that doesn't include object content.
        content = response.read()
        response = httplib2.Response(response)
        # pylint: disable=protected-access
        content = httplib2._decompressContent(response, content)
    return (response, content)

  # pylint: enable=too-many-statements