# Copyright 2010 Google Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import errno
import httplib
import os
import random
import re
import socket
import time
import urlparse
from hashlib import md5
from boto import config, UserAgent
from boto.connection import AWSAuthConnection
from boto.exception import InvalidUriError
from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableUploadException
from boto.s3.keyfile import KeyFile

37"""
38Handler for Google Cloud Storage resumable uploads. See
39http://code.google.com/apis/storage/docs/developer-guide.html#resumable
40for details.
41
42Resumable uploads will retry failed uploads, resuming at the byte
43count completed by the last upload attempt. If too many retries happen with
44no progress (per configurable num_retries param), the upload will be
45aborted in the current process.
46
47The caller can optionally specify a tracker_file_name param in the
48ResumableUploadHandler constructor. If you do this, that file will
49save the state needed to allow retrying later, in a separate process
50(e.g., in a later run of gsutil).
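
Example usage (an illustrative sketch, not executable as-is: it assumes
a boto config with GS credentials and an existing bucket named
'my-bucket'):

    import boto
    from boto.gs.resumable_upload_handler import ResumableUploadHandler

    bucket = boto.connect_gs().get_bucket('my-bucket')
    key = bucket.new_key('my-object')
    handler = ResumableUploadHandler(
        tracker_file_name='/tmp/my_object_tracker', num_retries=6)
    with open('/path/to/local/file', 'rb') as fp:
        # set_contents_from_file delegates the transfer to
        # handler.send_file() when a res_upload_handler is passed.
        key.set_contents_from_file(fp, res_upload_handler=handler)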
"""


class ResumableUploadHandler(object):

    BUFFER_SIZE = 8192
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    # (start, end) response indicating server has nothing (upload protocol uses
    # inclusive numbering).
    SERVER_HAS_NOTHING = (0, -1)
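    # (With inclusive numbering, (0, 2) would mean the server has bytes 0, 1
    # and 2; an end one less than the start, as in (0, -1), encodes zero
    # bytes.)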

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each uploaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracker URI.
            If supplied and the current process fails the upload, it can be
            retried in a new process. If called with an existing file containing
            a valid tracker URI, we'll resume the upload from this URI; else
            we'll start a new resumable upload (and write the URI to this
            tracker file).

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable upload
            making no progress. (Count resets every time we get progress, so
            upload can span many more than this number of retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.server_has_bytes = 0  # Byte count at last server check.
        self.tracker_uri = None
        if tracker_file_name:
            self._load_tracker_uri_from_file()
        # Save upload_start_point in instance state so caller can find how
        # much was transferred by this ResumableUploadHandler (across retries).
        self.upload_start_point = None

    def _load_tracker_uri_from_file(self):
        f = None
        try:
            f = open(self.tracker_file_name, 'r')
            uri = f.readline().strip()
            self._set_tracker_uri(uri)
        except IOError as e:
            # Ignore non-existent file (happens first time an upload
            # is attempted on a file), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because self.tracker_uri is None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'upload from scratch.' %
                      (self.tracker_file_name, e.strerror))
        except InvalidUriError as e:
            # Warn user, but proceed (will restart because
            # self.tracker_uri is None).
            print('Invalid tracker URI (%s) found in URI tracker file '
                  '(%s). Restarting upload from scratch.' %
                  (uri, self.tracker_file_name))
        finally:
            if f:
                f.close()

    def _save_tracker_uri_to_file(self):
        """
        Saves URI to tracker file if one was passed to constructor.
        """
        if not self.tracker_file_name:
            return
        try:
            with os.fdopen(os.open(self.tracker_file_name,
                                   os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
                f.write(self.tracker_uri)
        except IOError as e:
            raise ResumableUploadException(
                'Couldn\'t write URI tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured upload tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _set_tracker_uri(self, uri):
        """
        Called when we start a new resumable upload or get a new tracker
        URI for the upload. Saves URI and resets upload state.

        Raises InvalidUriError if URI is syntactically invalid.
        """
        parse_result = urlparse.urlparse(uri)
        if (parse_result.scheme.lower() not in ['http', 'https'] or
                not parse_result.netloc):
            raise InvalidUriError('Invalid tracker URI (%s)' % uri)
        self.tracker_uri = uri
        self.tracker_uri_host = parse_result.netloc
        self.tracker_uri_path = '%s?%s' % (
            parse_result.path, parse_result.query)
        self.server_has_bytes = 0

    def get_tracker_uri(self):
        """
        Returns upload tracker URI, or None if the upload has not yet started.
        """
        return self.tracker_uri

    def get_upload_id(self):
        """
        Returns the upload ID for the resumable upload, or None if the upload
        has not yet started.
        """
        # We extract the upload_id from the tracker uri. We could retrieve the
        # upload_id from the headers in the response but this only works for
        # the case where we get the tracker uri from the service. In the case
        # where we get the tracker from the tracking file we need to do this
        # logic anyway.
        delim = '?upload_id='
        if self.tracker_uri and delim in self.tracker_uri:
            return self.tracker_uri[
                self.tracker_uri.index(delim) + len(delim):]
        else:
            return None

    def _remove_tracker_file(self):
        if (self.tracker_file_name and
                os.path.exists(self.tracker_file_name)):
            os.unlink(self.tracker_file_name)

    def _build_content_range_header(self, range_spec='*', length_spec='*'):
        return 'bytes %s/%s' % (range_spec, length_spec)
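
    # For reference, the Content-Range values this helper produces (byte
    # ranges in the resumable protocol are inclusive):
    #
    #   _build_content_range_header('0-99', 100)  ->  'bytes 0-99/100'
    #   _build_content_range_header('*', 100)     ->  'bytes */100'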

    def _query_server_state(self, conn, file_length):
        """
        Queries server to find out state of given upload.

        Note that this method really just makes special case use of the
        fact that the upload server always returns the current start/end
        state whenever a PUT doesn't complete.

        Returns HTTP response from sending request.

        Raises ResumableUploadException if problem querying server.
        """
        # Send an empty PUT so that server replies with this resumable
        # transfer's state.
        put_headers = {}
        put_headers['Content-Range'] = (
            self._build_content_range_header('*', file_length))
        put_headers['Content-Length'] = '0'
        return AWSAuthConnection.make_request(conn, 'PUT',
                                              path=self.tracker_uri_path,
                                              auth_path=self.tracker_uri_path,
                                              headers=put_headers,
                                              host=self.tracker_uri_host)

    def _query_server_pos(self, conn, file_length):
        """
        Queries server to find out what bytes it currently has.

        Returns (server_start, server_end), where the values are inclusive.
        For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.

        Raises ResumableUploadException if problem querying server.
        """
        resp = self._query_server_state(conn, file_length)
        if resp.status == 200:
            # To handle the boundary condition where the server has the complete
            # file, we return (server_start, file_length-1). That way the
            # calling code can always simply read up through server_end. (If we
            # didn't handle this boundary condition here, the caller would have
            # to check whether server_end == file_length and read one fewer byte
            # in that case.)
            return (0, file_length - 1)  # Completed upload.
        if resp.status != 308:
            # This means the server didn't have any state for the given
            # upload ID, which can happen (for example) if the caller saved
            # the tracker URI to a file and then tried to restart the transfer
            # after that upload ID has gone stale. In that case we need to
            # start a new transfer (and the caller will then save the new
            # tracker URI to the tracker file).
            raise ResumableUploadException(
                'Got non-308 response (%s) from server state query' %
                resp.status, ResumableTransferDisposition.START_OVER)
        got_valid_response = False
        range_spec = resp.getheader('range')
        if range_spec:
            # Parse 'bytes=<from>-<to>' range_spec.
            m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
            if m:
                server_start = long(m.group(1))
                server_end = long(m.group(2))
                got_valid_response = True
        else:
            # No Range header, which means the server does not yet have
            # any bytes. Note that the Range header uses inclusive 'from'
            # and 'to' values. Since Range 0-0 would mean that the server
            # has byte 0, omitting the Range header is used to indicate that
            # the server doesn't have any bytes.
            return self.SERVER_HAS_NOTHING
        if not got_valid_response:
            raise ResumableUploadException(
                'Couldn\'t parse upload server state query response (%s)' %
                str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
        if conn.debug >= 1:
            print('Server has: Range: %d - %d.' % (server_start, server_end))
        return (server_start, server_end)

    def _start_new_resumable_upload(self, key, headers=None):
        """
        Starts a new resumable upload.

        Raises ResumableUploadException if any errors occur.
        """
        conn = key.bucket.connection
        if conn.debug >= 1:
            print('Starting new resumable upload.')
        self.server_has_bytes = 0

        # Start a new resumable upload by sending a POST request with an
        # empty body and the "X-Goog-Resumable: start" header. Include any
        # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
        # (and raise an exception if they tried to pass one, since it's
        # a semantic error to specify it at this point, and if we were to
        # include one now it would cause the server to expect that many
        # bytes; the POST doesn't include the actual file bytes). We set
        # the Content-Length in the subsequent PUT, based on the uploaded
        # file size.
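        #
        # Roughly, the initiation exchange looks like this (a sketch of the
        # resumable upload protocol; values are illustrative):
        #
        #   POST /<bucket>/<object> HTTP/1.1
        #   X-Goog-Resumable: start
        #
        #   HTTP/1.1 201 Created
        #   Location: https://<host>/<bucket>/<object>?upload_id=<id>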
        post_headers = {}
        for k in headers:
            if k.lower() == 'content-length':
                raise ResumableUploadException(
                    'Attempt to specify Content-Length header (disallowed)',
                    ResumableTransferDisposition.ABORT)
            post_headers[k] = headers[k]
        post_headers[conn.provider.resumable_upload_header] = 'start'

        resp = conn.make_request(
            'POST', key.bucket.name, key.name, post_headers)
        # Get tracker URI from response 'Location' header.
        body = resp.read()

        # Check for various status conditions.
        if resp.status in [500, 503]:
            # Retry status 500 and 503 errors after a delay.
            raise ResumableUploadException(
                'Got status %d from attempt to start resumable upload. '
                'Will wait/retry' % resp.status,
                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
        elif resp.status != 200 and resp.status != 201:
            raise ResumableUploadException(
                'Got status %d from attempt to start resumable upload. '
                'Aborting' % resp.status,
                ResumableTransferDisposition.ABORT)

        # Else we got 200 or 201 response code, indicating the resumable
        # upload was created.
        tracker_uri = resp.getheader('Location')
        if not tracker_uri:
            raise ResumableUploadException(
                'No resumable tracker URI found in resumable initiation '
                'POST response (%s)' % body,
                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
        self._set_tracker_uri(tracker_uri)
        self._save_tracker_uri_to_file()

    def _upload_file_bytes(self, conn, http_conn, fp, file_length,
                           total_bytes_uploaded, cb, num_cb, headers):
        """
        Makes one attempt to upload file bytes, using an existing resumable
        upload connection.

        Returns (etag, generation, metageneration) from server upon success.

        Raises ResumableUploadException if any problems occur.
        """
        buf = fp.read(self.BUFFER_SIZE)
        if cb:
            # The cb_count represents the number of full buffers to send between
            # cb executions.
            if num_cb > 2:
                cb_count = file_length / self.BUFFER_SIZE / (num_cb-2)
            elif num_cb < 0:
                cb_count = -1
            else:
                cb_count = 0
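            # E.g., for an 80 MiB file with the 8 KiB BUFFER_SIZE and the
            # default num_cb=10, cb_count is 10240 / 8 = 1280 buffers, i.e.
            # the callback fires roughly every 10 MiB.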
            i = 0
            cb(total_bytes_uploaded, file_length)

        # Build resumable upload headers for the transfer. Don't send a
        # Content-Range header if the file is 0 bytes long, because the
        # resumable upload protocol uses an *inclusive* end-range (so, sending
        # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
        if not headers:
            put_headers = {}
        else:
            put_headers = headers.copy()
        if file_length:
            if total_bytes_uploaded == file_length:
                range_header = self._build_content_range_header(
                    '*', file_length)
            else:
                range_header = self._build_content_range_header(
                    '%d-%d' % (total_bytes_uploaded, file_length - 1),
                    file_length)
            put_headers['Content-Range'] = range_header
        # Set Content-Length to the total bytes we'll send with this PUT.
        put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
        http_request = AWSAuthConnection.build_base_http_request(
            conn, 'PUT', path=self.tracker_uri_path, auth_path=None,
            headers=put_headers, host=self.tracker_uri_host)
        http_conn.putrequest('PUT', http_request.path)
        for k in put_headers:
            http_conn.putheader(k, put_headers[k])
        http_conn.endheaders()

        # Turn off debug on http connection so upload content isn't included
        # in debug stream.
        http_conn.set_debuglevel(0)
        while buf:
            http_conn.send(buf)
            for alg in self.digesters:
                self.digesters[alg].update(buf)
            total_bytes_uploaded += len(buf)
            if cb:
                i += 1
                if i == cb_count or cb_count == -1:
                    cb(total_bytes_uploaded, file_length)
                    i = 0
            buf = fp.read(self.BUFFER_SIZE)
        http_conn.set_debuglevel(conn.debug)
        if cb:
            cb(total_bytes_uploaded, file_length)
        if total_bytes_uploaded != file_length:
            # Abort (and delete the tracker file) so if the user retries
            # they'll start a new resumable upload rather than potentially
            # attempting to pick back up later where we left off.
            raise ResumableUploadException(
                'File changed during upload: EOF at %d bytes of %d byte file.' %
                (total_bytes_uploaded, file_length),
                ResumableTransferDisposition.ABORT)
        resp = http_conn.getresponse()
        # Restore http connection debug level.
        http_conn.set_debuglevel(conn.debug)

        if resp.status == 200:
            # Success.
            return (resp.getheader('etag'),
                    resp.getheader('x-goog-generation'),
                    resp.getheader('x-goog-metageneration'))
        # Retry timeout (408) and status 500 and 503 errors after a delay.
        elif resp.status in [408, 500, 503]:
            disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
        else:
            # Catch all for any other error codes.
            disposition = ResumableTransferDisposition.ABORT
        raise ResumableUploadException('Got response code %d while attempting '
                                       'upload (%s)' %
                                       (resp.status, resp.reason), disposition)

    def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
                                  num_cb):
        """
        Attempts a resumable upload.

        Returns (etag, generation, metageneration) from server upon success.

        Raises ResumableUploadException if any problems occur.
        """
        (server_start, server_end) = self.SERVER_HAS_NOTHING
        conn = key.bucket.connection
        if self.tracker_uri:
            # Try to resume existing resumable upload.
            try:
                (server_start, server_end) = (
                    self._query_server_pos(conn, file_length))
                self.server_has_bytes = server_start

                if server_end:
                    # If the server already has some of the content, we need to
                    # update the digesters with the bytes that have already been
                    # uploaded to ensure we get a complete hash in the end.
                    print('Catching up hash digest(s) for resumed upload')
                    fp.seek(0)
                    # Read local file's bytes through position server has. For
                    # example, if server has (0, 3) we want to read 3-0+1=4
                    # bytes.
                    bytes_to_go = server_end + 1
                    while bytes_to_go:
                        chunk = fp.read(min(key.BufferSize, bytes_to_go))
                        if not chunk:
                            raise ResumableUploadException(
                                'Hit end of file during resumable upload hash '
                                'catchup. This should not happen under\n'
                                'normal circumstances, as it indicates the '
                                'server has more bytes of this transfer\nthan'
                                ' the current file size. Restarting upload.',
                                ResumableTransferDisposition.START_OVER)
                        for alg in self.digesters:
                            self.digesters[alg].update(chunk)
                        bytes_to_go -= len(chunk)

                if conn.debug >= 1:
                    print('Resuming transfer.')
            except ResumableUploadException as e:
                if conn.debug >= 1:
                    print('Unable to resume transfer (%s).' % e.message)
                self._start_new_resumable_upload(key, headers)
        else:
            self._start_new_resumable_upload(key, headers)

        # upload_start_point allows the code that instantiated the
        # ResumableUploadHandler to find out the point from which it started
        # uploading (e.g., so it can correctly compute throughput).
        if self.upload_start_point is None:
            self.upload_start_point = server_end

        total_bytes_uploaded = server_end + 1
        # Corner case: Don't attempt to seek if we've already uploaded the
        # entire file, because if the file is a stream (e.g., the KeyFile
        # wrapper around input key when copying between providers), attempting
        # to seek to the end of file would result in an InvalidRange error.
        if total_bytes_uploaded < file_length:
            fp.seek(total_bytes_uploaded)
        conn = key.bucket.connection

        # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
        # pool connections) because httplib requires a new HTTP connection per
        # transaction. (Without this, calling http_conn.getresponse() would get
        # "ResponseNotReady".)
        http_conn = conn.new_http_connection(self.tracker_uri_host, conn.port,
                                             conn.is_secure)
        http_conn.set_debuglevel(conn.debug)

        # Make sure to close http_conn at end so if a local file read
        # failure occurs partway through server will terminate current upload
        # and can report that progress on next attempt.
        try:
            return self._upload_file_bytes(conn, http_conn, fp, file_length,
                                           total_bytes_uploaded, cb, num_cb,
                                           headers)
        except (ResumableUploadException, socket.error):
            resp = self._query_server_state(conn, file_length)
            if resp.status == 400:
                raise ResumableUploadException('Got 400 response from server '
                    'state query after failed resumable upload attempt. This '
                    'can happen for various reasons, including specifying an '
                    'invalid request (e.g., an invalid canned ACL) or if the '
                    'file size changed between upload attempts',
                    ResumableTransferDisposition.ABORT)
            else:
                raise
        finally:
            http_conn.close()

    def _check_final_md5(self, key, etag):
        """
        Checks that etag from server agrees with md5 computed before upload.
        This is important, since the upload could have spanned a number of
        hours and multiple processes (e.g., gsutil runs), and the user could
        change some of the file and not realize they have inconsistent data.
        """
        if key.bucket.connection.debug >= 1:
            print('Checking md5 against etag.')
        if key.md5 != etag.strip('"\''):
            # Call key.open_read() before attempting to delete the
            # (incorrect-content) key, so we perform that request on a
            # different HTTP connection. This is needed because httplib
            # will return a "Response not ready" error if you try to perform
            # a second transaction on the connection.
            key.open_read()
            key.close()
            key.delete()
            raise ResumableUploadException(
                'File changed during upload: md5 signature doesn\'t match etag '
                '(incorrect uploaded object deleted)',
                ResumableTransferDisposition.ABORT)

    def handle_resumable_upload_exception(self, e, debug):
        if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
            if debug >= 1:
                print('Caught non-retryable ResumableUploadException (%s); '
                      'aborting but retaining tracker file' % e.message)
            raise
        elif e.disposition == ResumableTransferDisposition.ABORT:
            if debug >= 1:
                print('Caught non-retryable ResumableUploadException (%s); '
                      'aborting and removing tracker file' % e.message)
            self._remove_tracker_file()
            raise
        else:
            if debug >= 1:
                print('Caught ResumableUploadException (%s) - will retry' %
                      e.message)

    def track_progress_less_iterations(self, server_had_bytes_before_attempt,
                                       roll_back_md5=True, debug=0):
        # At this point we had a re-tryable failure; see if we made progress.
        if self.server_has_bytes > server_had_bytes_before_attempt:
            self.progress_less_iterations = 0   # If progress, reset counter.
        else:
            self.progress_less_iterations += 1
            if roll_back_md5:
                # Rollback any potential hash updates, as we did not
                # make any progress in this iteration.
                self.digesters = self.digesters_before_attempt

        if self.progress_less_iterations > self.num_retries:
            # Don't retry any longer in the current process.
            raise ResumableUploadException(
                'Too many resumable upload attempts failed without '
                'progress. You might try this upload again later',
                ResumableTransferDisposition.ABORT_CUR_PROCESS)

        # Use binary exponential backoff to desynchronize client requests.
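        # The sleep is drawn uniformly from [0, 2**n) seconds, where n is the
        # number of progress-less attempts so far; e.g., after three such
        # attempts we wait at most 8 seconds.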
        sleep_time_secs = random.random() * (2**self.progress_less_iterations)
        if debug >= 1:
            print('Got retryable failure (%d progress-less in a row).\n'
                  'Sleeping %3.1f seconds before re-trying' %
                  (self.progress_less_iterations, sleep_time_secs))
        time.sleep(sleep_time_secs)

    def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
        """
        Upload a file to a key into a bucket on GS, using GS resumable upload
        protocol.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object to which data is to be uploaded

        :type fp: file-like object
        :param fp: The file pointer to upload

        :type headers: dict
        :param headers: The headers to pass along with the PUT request

        :type cb: function
        :param cb: a callback function that will be called to report progress on
            the upload.  The callback should accept two integer parameters, the
            first representing the number of bytes that have been successfully
            transmitted to GS, and the second representing the total number of
            bytes that need to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
            parameter, this parameter determines the granularity of the callback
            by defining the maximum number of times the callback will be called
            during the file transfer. Providing a negative integer will cause
            your callback to be called with each buffer read.

        :type hash_algs: dictionary
        :param hash_algs: (optional) Dictionary mapping hash algorithm
            descriptions to corresponding state-ful hashing objects that
            implement update(), digest(), and copy() (e.g. hashlib.md5()).
            Defaults to {'md5': md5()}.

        Raises ResumableUploadException if a problem occurs during the transfer.
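
        Example (an illustrative sketch; the object name, local file name and
        progress callback are hypothetical, and 'bucket' is an existing
        boto.gs bucket):

            def show_progress(bytes_sent, bytes_total):
                print('%d of %d bytes' % (bytes_sent, bytes_total))

            key = bucket.new_key('big-object')
            handler = ResumableUploadHandler(
                tracker_file_name='/tmp/big_object_tracker')
            with open('big-file', 'rb') as fp:
                handler.send_file(key, fp, headers={}, cb=show_progress)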
        """

        if not headers:
            headers = {}
        # If Content-Type header is present and set to None, remove it.
        # This is gsutil's way of asking boto to refrain from auto-generating
        # that header.
        CT = 'Content-Type'
        if CT in headers and headers[CT] is None:
            del headers[CT]

        headers['User-Agent'] = UserAgent

        # Determine file size different ways for case where fp is actually a
        # wrapper around a Key vs an actual file.
        if isinstance(fp, KeyFile):
            file_length = fp.getkey().size
        else:
            fp.seek(0, os.SEEK_END)
            file_length = fp.tell()
            fp.seek(0)
        debug = key.bucket.connection.debug

        # Compute the MD5 checksum on the fly.
        if hash_algs is None:
            hash_algs = {'md5': md5}
        self.digesters = dict(
            (alg, hash_algs[alg]()) for alg in hash_algs)

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 6.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 6)
        self.progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            server_had_bytes_before_attempt = self.server_has_bytes
            self.digesters_before_attempt = dict(
                (alg, self.digesters[alg].copy())
                for alg in self.digesters)
            try:
                # Save generation and metageneration in class state so caller
                # can find these values, for use in preconditions of future
                # operations on the uploaded object.
                (etag, self.generation, self.metageneration) = (
                    self._attempt_resumable_upload(key, fp, file_length,
                                                   headers, cb, num_cb))

                # Get the final digests for the uploaded content.
                for alg in self.digesters:
                    key.local_hashes[alg] = self.digesters[alg].digest()

                # Upload succeeded, so remove the tracker file (if we have one).
                self._remove_tracker_file()
                self._check_final_md5(key, etag)
                key.generation = self.generation
                if debug >= 1:
                    print('Resumable upload complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % e.__repr__())
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close the connection before we resume
                    # the upload (which will cause a new connection to be
                    # opened the next time an HTTP request is sent).
                    key.bucket.connection.connection.close()
            except ResumableUploadException as e:
                self.handle_resumable_upload_exception(e, debug)

            self.track_progress_less_iterations(server_had_bytes_before_attempt,
                                                True, debug)