# Copyright 2010 Google Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import errno
import httplib
import os
import random
import re
import socket
import time
import urlparse
from hashlib import md5
from boto import config, UserAgent
from boto.connection import AWSAuthConnection
from boto.exception import InvalidUriError
from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableUploadException
from boto.s3.keyfile import KeyFile

"""
Handler for Google Cloud Storage resumable uploads. See
http://code.google.com/apis/storage/docs/developer-guide.html#resumable
for details.

Resumable uploads will retry failed uploads, resuming at the byte
count completed by the last upload attempt. If too many retries happen with
no progress (per configurable num_retries param), the upload will be
aborted in the current process.

The caller can optionally specify a tracker_file_name param in the
ResumableUploadHandler constructor. If you do this, that file will
save the state needed to allow retrying later, in a separate process
(e.g., in a later run of gsutil).
"""


class ResumableUploadHandler(object):
    """
    Drives a single resumable upload, optionally persisting the tracker
    URI to a file so the upload can be resumed by a later process.
    """

    # Size of the chunks read from the source file per send() call.
    BUFFER_SIZE = 8192
    # Exceptions that indicate a transient failure worth retrying.
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    # (start, end) response indicating server has nothing (upload protocol uses
    # inclusive numbering).
    SERVER_HAS_NOTHING = (0, -1)

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each uploaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracker URI.
            If supplied and the current process fails the upload, it can be
            retried in a new process. If called with an existing file containing
            a valid tracker URI, we'll resume the upload from this URI; else
            we'll start a new resumable upload (and write the URI to this
            tracker file).

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable upload
            making no progress. (Count resets every time we get progress, so
            upload can span many more than this number of retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.server_has_bytes = 0  # Byte count at last server check.
        self.tracker_uri = None
        if tracker_file_name:
            self._load_tracker_uri_from_file()
        # Save upload_start_point in instance state so caller can find how
        # much was transferred by this ResumableUploadHandler (across retries).
        self.upload_start_point = None

    def _load_tracker_uri_from_file(self):
        """
        Loads a previously-saved tracker URI from the tracker file, if
        present and valid. Leaves self.tracker_uri as None (causing a new
        upload to be started) on any failure to read or validate the URI.
        """
        f = None
        try:
            f = open(self.tracker_file_name, 'r')
            uri = f.readline().strip()
            self._set_tracker_uri(uri)
        except IOError as e:
            # Ignore non-existent file (happens first time an upload
            # is attempted on a file), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because self.tracker_uri is None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'upload from scratch.' %
                      (self.tracker_file_name, e.strerror))
        except InvalidUriError as e:
            # Warn user, but proceed (will restart because
            # self.tracker_uri is None).
            print('Invalid tracker URI (%s) found in URI tracker file '
                  '(%s). Restarting upload from scratch.' %
                  (uri, self.tracker_file_name))
        finally:
            if f:
                f.close()

    def _save_tracker_uri_to_file(self):
        """
        Saves URI to tracker file if one was passed to constructor.

        Raises ResumableUploadException (ABORT) if the tracker file cannot
        be written (e.g., it lives in an unwritable directory).
        """
        if not self.tracker_file_name:
            return
        f = None
        try:
            # Open with mode 0600 since the tracker URI grants access to
            # the upload.
            with os.fdopen(os.open(self.tracker_file_name,
                                   os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
                f.write(self.tracker_uri)
        except IOError as e:
            raise ResumableUploadException(
                'Couldn\'t write URI tracker file (%s): %s.\nThis can happen'
                'if you\'re using an incorrectly configured upload tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _set_tracker_uri(self, uri):
        """
        Called when we start a new resumable upload or get a new tracker
        URI for the upload. Saves URI and resets upload state.

        Raises InvalidUriError if URI is syntactically invalid.
        """
        parse_result = urlparse.urlparse(uri)
        if (parse_result.scheme.lower() not in ['http', 'https'] or
                not parse_result.netloc):
            raise InvalidUriError('Invalid tracker URI (%s)' % uri)
        self.tracker_uri = uri
        self.tracker_uri_host = parse_result.netloc
        self.tracker_uri_path = '%s?%s' % (
            parse_result.path, parse_result.query)
        self.server_has_bytes = 0

    def get_tracker_uri(self):
        """
        Returns upload tracker URI, or None if the upload has not yet started.
        """
        return self.tracker_uri

    def get_upload_id(self):
        """
        Returns the upload ID for the resumable upload, or None if the upload
        has not yet started.
        """
        # We extract the upload_id from the tracker uri. We could retrieve the
        # upload_id from the headers in the response but this only works for
        # the case where we get the tracker uri from the service. In the case
        # where we get the tracker from the tracking file we need to do this
        # logic anyway.
        delim = '?upload_id='
        if self.tracker_uri and delim in self.tracker_uri:
            return self.tracker_uri[self.tracker_uri.index(delim) + len(delim):]
        else:
            return None

    def _remove_tracker_file(self):
        # Best-effort cleanup; only called once the upload completed or
        # was aborted.
        if (self.tracker_file_name and
                os.path.exists(self.tracker_file_name)):
            os.unlink(self.tracker_file_name)

    def _build_content_range_header(self, range_spec='*', length_spec='*'):
        # Builds e.g. 'bytes 0-99/100' or 'bytes */100' per the resumable
        # upload protocol.
        return 'bytes %s/%s' % (range_spec, length_spec)

    def _query_server_state(self, conn, file_length):
        """
        Queries server to find out state of given upload.

        Note that this method really just makes special case use of the
        fact that the upload server always returns the current start/end
        state whenever a PUT doesn't complete.

        Returns HTTP response from sending request.

        Raises ResumableUploadException if problem querying server.
        """
        # Send an empty PUT so that server replies with this resumable
        # transfer's state.
        put_headers = {}
        put_headers['Content-Range'] = (
            self._build_content_range_header('*', file_length))
        put_headers['Content-Length'] = '0'
        return AWSAuthConnection.make_request(conn, 'PUT',
                                              path=self.tracker_uri_path,
                                              auth_path=self.tracker_uri_path,
                                              headers=put_headers,
                                              host=self.tracker_uri_host)

    def _query_server_pos(self, conn, file_length):
        """
        Queries server to find out what bytes it currently has.

        Returns (server_start, server_end), where the values are inclusive.
        For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.

        Raises ResumableUploadException if problem querying server.
        """
        resp = self._query_server_state(conn, file_length)
        if resp.status == 200:
            # To handle the boundary condition where the server has the complete
            # file, we return (server_start, file_length-1). That way the
            # calling code can always simply read up through server_end. (If we
            # didn't handle this boundary condition here, the caller would have
            # to check whether server_end == file_length and read one fewer byte
            # in that case.)
            return (0, file_length - 1)  # Completed upload.
        if resp.status != 308:
            # This means the server didn't have any state for the given
            # upload ID, which can happen (for example) if the caller saved
            # the tracker URI to a file and then tried to restart the transfer
            # after that upload ID has gone stale. In that case we need to
            # start a new transfer (and the caller will then save the new
            # tracker URI to the tracker file).
            raise ResumableUploadException(
                'Got non-308 response (%s) from server state query' %
                resp.status, ResumableTransferDisposition.START_OVER)
        got_valid_response = False
        range_spec = resp.getheader('range')
        if range_spec:
            # Parse 'bytes=<from>-<to>' range_spec.
            m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
            if m:
                server_start = long(m.group(1))
                server_end = long(m.group(2))
                got_valid_response = True
        else:
            # No Range header, which means the server does not yet have
            # any bytes. Note that the Range header uses inclusive 'from'
            # and 'to' values. Since Range 0-0 would mean that the server
            # has byte 0, omitting the Range header is used to indicate that
            # the server doesn't have any bytes.
            return self.SERVER_HAS_NOTHING
        if not got_valid_response:
            raise ResumableUploadException(
                'Couldn\'t parse upload server state query response (%s)' %
                str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
        if conn.debug >= 1:
            print('Server has: Range: %d - %d.' % (server_start, server_end))
        return (server_start, server_end)

    def _start_new_resumable_upload(self, key, headers=None):
        """
        Starts a new resumable upload.

        Raises ResumableUploadException if any errors occur.
        """
        conn = key.bucket.connection
        if conn.debug >= 1:
            print('Starting new resumable upload.')
        self.server_has_bytes = 0

        # Start a new resumable upload by sending a POST request with an
        # empty body and the "X-Goog-Resumable: start" header. Include any
        # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
        # (and raise an exception if they tried to pass one, since it's
        # a semantic error to specify it at this point, and if we were to
        # include one now it would cause the server to expect that many
        # bytes; the POST doesn't include the actual file bytes). We set
        # the Content-Length in the subsequent PUT, based on the uploaded
        # file size.
        post_headers = {}
        # Guard against headers=None so this method is safe to call
        # directly without caller-provided headers.
        for k in (headers or {}):
            if k.lower() == 'content-length':
                raise ResumableUploadException(
                    'Attempt to specify Content-Length header (disallowed)',
                    ResumableTransferDisposition.ABORT)
            post_headers[k] = headers[k]
        post_headers[conn.provider.resumable_upload_header] = 'start'

        resp = conn.make_request(
            'POST', key.bucket.name, key.name, post_headers)
        # Get tracker URI from response 'Location' header.
        body = resp.read()

        # Check for various status conditions.
        if resp.status in [500, 503]:
            # Retry status 500 and 503 errors after a delay.
            raise ResumableUploadException(
                'Got status %d from attempt to start resumable upload. '
                'Will wait/retry' % resp.status,
                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
        elif resp.status != 200 and resp.status != 201:
            raise ResumableUploadException(
                'Got status %d from attempt to start resumable upload. '
                'Aborting' % resp.status,
                ResumableTransferDisposition.ABORT)

        # Else we got 200 or 201 response code, indicating the resumable
        # upload was created.
        tracker_uri = resp.getheader('Location')
        if not tracker_uri:
            raise ResumableUploadException(
                'No resumable tracker URI found in resumable initiation '
                'POST response (%s)' % body,
                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
        self._set_tracker_uri(tracker_uri)
        self._save_tracker_uri_to_file()

    def _upload_file_bytes(self, conn, http_conn, fp, file_length,
                           total_bytes_uploaded, cb, num_cb, headers):
        """
        Makes one attempt to upload file bytes, using an existing resumable
        upload connection.

        Returns (etag, generation, metageneration) from server upon success.

        Raises ResumableUploadException if any problems occur.
        """
        buf = fp.read(self.BUFFER_SIZE)
        if cb:
            # The cb_count represents the number of full buffers to send between
            # cb executions.
            if num_cb > 2:
                cb_count = file_length / self.BUFFER_SIZE / (num_cb - 2)
            elif num_cb < 0:
                cb_count = -1
            else:
                cb_count = 0
            i = 0
            cb(total_bytes_uploaded, file_length)

        # Build resumable upload headers for the transfer. Don't send a
        # Content-Range header if the file is 0 bytes long, because the
        # resumable upload protocol uses an *inclusive* end-range (so, sending
        # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
        if not headers:
            put_headers = {}
        else:
            put_headers = headers.copy()
        if file_length:
            if total_bytes_uploaded == file_length:
                range_header = self._build_content_range_header(
                    '*', file_length)
            else:
                range_header = self._build_content_range_header(
                    '%d-%d' % (total_bytes_uploaded, file_length - 1),
                    file_length)
            put_headers['Content-Range'] = range_header
        # Set Content-Length to the total bytes we'll send with this PUT.
        put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
        http_request = AWSAuthConnection.build_base_http_request(
            conn, 'PUT', path=self.tracker_uri_path, auth_path=None,
            headers=put_headers, host=self.tracker_uri_host)
        http_conn.putrequest('PUT', http_request.path)
        for k in put_headers:
            http_conn.putheader(k, put_headers[k])
        http_conn.endheaders()

        # Turn off debug on http connection so upload content isn't included
        # in debug stream.
        http_conn.set_debuglevel(0)
        while buf:
            http_conn.send(buf)
            for alg in self.digesters:
                self.digesters[alg].update(buf)
            total_bytes_uploaded += len(buf)
            if cb:
                i += 1
                if i == cb_count or cb_count == -1:
                    cb(total_bytes_uploaded, file_length)
                    i = 0
            buf = fp.read(self.BUFFER_SIZE)
        http_conn.set_debuglevel(conn.debug)
        if cb:
            cb(total_bytes_uploaded, file_length)
        if total_bytes_uploaded != file_length:
            # Abort (and delete the tracker file) so if the user retries
            # they'll start a new resumable upload rather than potentially
            # attempting to pick back up later where we left off.
            raise ResumableUploadException(
                'File changed during upload: EOF at %d bytes of %d byte file.' %
                (total_bytes_uploaded, file_length),
                ResumableTransferDisposition.ABORT)
        resp = http_conn.getresponse()
        # Restore http connection debug level.
        http_conn.set_debuglevel(conn.debug)

        if resp.status == 200:
            # Success.
            return (resp.getheader('etag'),
                    resp.getheader('x-goog-generation'),
                    resp.getheader('x-goog-metageneration'))
        # Retry timeout (408) and status 500 and 503 errors after a delay.
        elif resp.status in [408, 500, 503]:
            disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
        else:
            # Catch all for any other error codes.
            disposition = ResumableTransferDisposition.ABORT
        raise ResumableUploadException('Got response code %d while attempting '
                                       'upload (%s)' %
                                       (resp.status, resp.reason), disposition)

    def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
                                  num_cb):
        """
        Attempts a resumable upload.

        Returns (etag, generation, metageneration) from server upon success.

        Raises ResumableUploadException if any problems occur.
        """
        (server_start, server_end) = self.SERVER_HAS_NOTHING
        conn = key.bucket.connection
        if self.tracker_uri:
            # Try to resume existing resumable upload.
            try:
                (server_start, server_end) = (
                    self._query_server_pos(conn, file_length))
                self.server_has_bytes = server_start

                if server_end:
                    # If the server already has some of the content, we need to
                    # update the digesters with the bytes that have already been
                    # uploaded to ensure we get a complete hash in the end.
                    print('Catching up hash digest(s) for resumed upload')
                    fp.seek(0)
                    # Read local file's bytes through position server has. For
                    # example, if server has (0, 3) we want to read 3-0+1=4 bytes.
                    bytes_to_go = server_end + 1
                    while bytes_to_go:
                        chunk = fp.read(min(key.BufferSize, bytes_to_go))
                        if not chunk:
                            raise ResumableUploadException(
                                'Hit end of file during resumable upload hash '
                                'catchup. This should not happen under\n'
                                'normal circumstances, as it indicates the '
                                'server has more bytes of this transfer\nthan'
                                ' the current file size. Restarting upload.',
                                ResumableTransferDisposition.START_OVER)
                        for alg in self.digesters:
                            self.digesters[alg].update(chunk)
                        bytes_to_go -= len(chunk)

                if conn.debug >= 1:
                    print('Resuming transfer.')
            except ResumableUploadException as e:
                if conn.debug >= 1:
                    print('Unable to resume transfer (%s).' % e.message)
                self._start_new_resumable_upload(key, headers)
        else:
            self._start_new_resumable_upload(key, headers)

        # upload_start_point allows the code that instantiated the
        # ResumableUploadHandler to find out the point from which it started
        # uploading (e.g., so it can correctly compute throughput).
        if self.upload_start_point is None:
            self.upload_start_point = server_end

        total_bytes_uploaded = server_end + 1
        # Corner case: Don't attempt to seek if we've already uploaded the
        # entire file, because if the file is a stream (e.g., the KeyFile
        # wrapper around input key when copying between providers), attempting
        # to seek to the end of file would result in an InvalidRange error.
        # (Bug fix: the comparison was previously inverted, which seeked
        # exactly in the already-complete case and never in the resume case.)
        if total_bytes_uploaded < file_length:
            fp.seek(total_bytes_uploaded)
        conn = key.bucket.connection

        # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
        # pool connections) because httplib requires a new HTTP connection per
        # transaction. (Without this, calling http_conn.getresponse() would get
        # "ResponseNotReady".)
        http_conn = conn.new_http_connection(self.tracker_uri_host, conn.port,
                                             conn.is_secure)
        http_conn.set_debuglevel(conn.debug)

        # Make sure to close http_conn at end so if a local file read
        # failure occurs partway through server will terminate current upload
        # and can report that progress on next attempt.
        try:
            return self._upload_file_bytes(conn, http_conn, fp, file_length,
                                           total_bytes_uploaded, cb, num_cb,
                                           headers)
        except (ResumableUploadException, socket.error):
            resp = self._query_server_state(conn, file_length)
            if resp.status == 400:
                raise ResumableUploadException('Got 400 response from server '
                    'state query after failed resumable upload attempt. This '
                    'can happen for various reasons, including specifying an '
                    'invalid request (e.g., an invalid canned ACL) or if the '
                    'file size changed between upload attempts',
                    ResumableTransferDisposition.ABORT)
            else:
                raise
        finally:
            http_conn.close()

    def _check_final_md5(self, key, etag):
        """
        Checks that etag from server agrees with md5 computed before upload.
        This is important, since the upload could have spanned a number of
        hours and multiple processes (e.g., gsutil runs), and the user could
        change some of the file and not realize they have inconsistent data.
        """
        if key.bucket.connection.debug >= 1:
            print('Checking md5 against etag.')
        if key.md5 != etag.strip('"\''):
            # Call key.open_read() before attempting to delete the
            # (incorrect-content) key, so we perform that request on a
            # different HTTP connection. This is needed because httplib
            # will return a "Response not ready" error if you try to perform
            # a second transaction on the connection.
            key.open_read()
            key.close()
            key.delete()
            raise ResumableUploadException(
                'File changed during upload: md5 signature doesn\'t match etag '
                '(incorrect uploaded object deleted)',
                ResumableTransferDisposition.ABORT)

    def handle_resumable_upload_exception(self, e, debug):
        """
        Dispatches on the exception's disposition: re-raises for ABORT and
        ABORT_CUR_PROCESS (removing the tracker file only for ABORT), and
        returns (allowing a retry) for all other dispositions.
        """
        if (e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS):
            if debug >= 1:
                print('Caught non-retryable ResumableUploadException (%s); '
                      'aborting but retaining tracker file' % e.message)
            raise
        elif (e.disposition == ResumableTransferDisposition.ABORT):
            if debug >= 1:
                print('Caught non-retryable ResumableUploadException (%s); '
                      'aborting and removing tracker file' % e.message)
            self._remove_tracker_file()
            raise
        else:
            if debug >= 1:
                print('Caught ResumableUploadException (%s) - will retry' %
                      e.message)

    def track_progress_less_iterations(self, server_had_bytes_before_attempt,
                                       roll_back_md5=True, debug=0):
        """
        Tracks the number of consecutive attempts that made no progress,
        raising ResumableUploadException (ABORT_CUR_PROCESS) once
        self.num_retries progress-less attempts have occurred; otherwise
        sleeps with binary exponential backoff before the next attempt.
        """
        # At this point we had a re-tryable failure; see if made progress.
        if self.server_has_bytes > server_had_bytes_before_attempt:
            self.progress_less_iterations = 0  # If progress, reset counter.
        else:
            self.progress_less_iterations += 1
            if roll_back_md5:
                # Rollback any potential hash updates, as we did not
                # make any progress in this iteration.
                self.digesters = self.digesters_before_attempt

        if self.progress_less_iterations > self.num_retries:
            # Don't retry any longer in the current process.
            raise ResumableUploadException(
                'Too many resumable upload attempts failed without '
                'progress. You might try this upload again later',
                ResumableTransferDisposition.ABORT_CUR_PROCESS)

        # Use binary exponential backoff to desynchronize client requests.
        sleep_time_secs = random.random() * (2**self.progress_less_iterations)
        if debug >= 1:
            print('Got retryable failure (%d progress-less in a row).\n'
                  'Sleeping %3.1f seconds before re-trying' %
                  (self.progress_less_iterations, sleep_time_secs))
        time.sleep(sleep_time_secs)

    def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
        """
        Upload a file to a key into a bucket on GS, using GS resumable upload
        protocol.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object to which data is to be uploaded

        :type fp: file-like object
        :param fp: The file pointer to upload

        :type headers: dict
        :param headers: The headers to pass along with the PUT request

        :type cb: function
        :param cb: a callback function that will be called to report progress on
            the upload. The callback should accept two integer parameters, the
            first representing the number of bytes that have been successfully
            transmitted to GS, and the second representing the total number of
            bytes that need to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
            parameter, this parameter determines the granularity of the callback
            by defining the maximum number of times the callback will be called
            during the file transfer. Providing a negative integer will cause
            your callback to be called with each buffer read.

        :type hash_algs: dictionary
        :param hash_algs: (optional) Dictionary mapping hash algorithm
            descriptions to corresponding state-ful hashing objects that
            implement update(), digest(), and copy() (e.g. hashlib.md5()).
            Defaults to {'md5': md5()}.

        Raises ResumableUploadException if a problem occurs during the transfer.
        """

        if not headers:
            headers = {}
        # If Content-Type header is present and set to None, remove it.
        # This is gsutil's way of asking boto to refrain from auto-generating
        # that header.
        CT = 'Content-Type'
        if CT in headers and headers[CT] is None:
            del headers[CT]

        headers['User-Agent'] = UserAgent

        # Determine file size different ways for case where fp is actually a
        # wrapper around a Key vs an actual file.
        if isinstance(fp, KeyFile):
            file_length = fp.getkey().size
        else:
            fp.seek(0, os.SEEK_END)
            file_length = fp.tell()
            fp.seek(0)
        debug = key.bucket.connection.debug

        # Compute the MD5 checksum on the fly.
        if hash_algs is None:
            hash_algs = {'md5': md5}
        self.digesters = dict(
            (alg, hash_algs[alg]()) for alg in hash_algs or {})

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 6.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 6)
        self.progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            server_had_bytes_before_attempt = self.server_has_bytes
            # Snapshot digesters so a progress-less attempt can be rolled back.
            self.digesters_before_attempt = dict(
                (alg, self.digesters[alg].copy())
                for alg in self.digesters)
            try:
                # Save generation and metageneration in class state so caller
                # can find these values, for use in preconditions of future
                # operations on the uploaded object.
                (etag, self.generation, self.metageneration) = (
                    self._attempt_resumable_upload(key, fp, file_length,
                                                   headers, cb, num_cb))

                # Get the final digests for the uploaded content.
                for alg in self.digesters:
                    key.local_hashes[alg] = self.digesters[alg].digest()

                # Upload succeeded, so remove the tracker file (if have one).
                self._remove_tracker_file()
                self._check_final_md5(key, etag)
                key.generation = self.generation
                if debug >= 1:
                    print('Resumable upload complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % e.__repr__())
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close the connection before we resume
                    # the upload (which will cause a new connection to be
                    # opened the next time an HTTP request is sent).
                    key.bucket.connection.connection.close()
            except ResumableUploadException as e:
                self.handle_resumable_upload_exception(e, debug)

            self.track_progress_less_iterations(server_had_bytes_before_attempt,
                                                True, debug)