# -*- coding: utf-8 -*-

## Amazon S3 manager
## Author: Michal Ludvig <michal@logix.cz>
##         http://www.logix.cz/michal
## License: GPL Version 2
## Copyright: TGRMN Software and contributors

from __future__ import absolute_import, division

import sys
import os
import time
import errno
import mimetypes
import io
import pprint
from xml.sax import saxutils
from socket import timeout as SocketTimeoutException
from logging import debug, info, warning, error
from stat import ST_SIZE
try:
    # Python 2 support
    from urlparse import urlparse
except ImportError:
    # Python 3 support
    from urllib.parse import urlparse
try:
    # Python 2 support
    from base64 import encodestring
except ImportError:
    # Python 3 support ("encodestring" was removed in Python 3.9)
    from base64 import encodebytes as encodestring

import select

try:
    from hashlib import md5
except ImportError:
    from md5 import md5

from .BaseUtils import (getListFromXml, getTextFromXml, getRootTagName,
                        decode_from_s3, encode_to_s3, s3_quote)
from .Utils import (convertHeaderTupleListToDict, hash_file_md5, unicodise,
                    deunicodise, check_bucket_name,
                    check_bucket_name_dns_support, getHostnameFromBucket,
                    calculateChecksum)
from .SortedDict import SortedDict
from .AccessLog import AccessLog
from .ACL import ACL, GranteeLogDelivery
from .BidirMap import BidirMap
from .Config import Config
from .Exceptions import *
from .MultiPart import MultiPartUpload
from .S3Uri import S3Uri
from .ConnMan import ConnMan
from .Crypto import (sign_request_v2, sign_request_v4, checksum_sha256_file,
                     checksum_sha256_buffer, format_param_str)

try:
    from ctypes import ArgumentError
    import magic
    try:
        ## https://github.com/ahupp/python-magic
        ## Always expects unicode for python 2
        ## (has Magic class but no "open()" function)
        magic_ = magic.Magic(mime=True)
        def mime_magic_file(file):
            return magic_.from_file(file)
    except TypeError:
        try:
            ## file-5.11 built-in python bindings
            ## Sources: http://www.darwinsys.com/file/
            ## Expects unicode since version 5.19, encoded strings before.
            ## We can't tell if a given copy of the magic library will take a
            ## filesystem-encoded string or a unicode value, so try first
            ## with the unicode, then with the encoded string.
            ## (has Magic class and "open()" function)
            magic_ = magic.open(magic.MAGIC_MIME)
            magic_.load()
            def mime_magic_file(file):
                try:
                    return magic_.file(file)
                except (UnicodeDecodeError, UnicodeEncodeError, ArgumentError):
                    return magic_.file(deunicodise(file))
        except AttributeError:
            ## http://pypi.python.org/pypi/filemagic
            ## Accepts both unicode and encoded strings gracefully
            ## (has Magic class but no "mime" argument and no "open()" function)
            magic_ = magic.Magic(flags=magic.MAGIC_MIME)
            def mime_magic_file(file):
                return magic_.id_filename(file)

    except AttributeError:
        ## Older python-magic versions don't have a "Magic" class
        ## and only accept encoded strings
        ## (has no Magic class but an "open()" function)
        magic_ = magic.open(magic.MAGIC_MIME)
        magic_.load()
        def mime_magic_file(file):
            return magic_.file(deunicodise(file))

except (ImportError, OSError) as e:
    error_str = str(e)
    if 'magic' in error_str:
        magic_message = "Module python-magic is not available."
    else:
        magic_message = "Module python-magic can't be used (%s)." % error_str
    magic_message += " Guessing MIME types based on file extensions."
    magic_warned = False
    def mime_magic_file(file):
        global magic_warned
        if not magic_warned:
            warning(magic_message)
            magic_warned = True
        return mimetypes.guess_type(file)[0]

def mime_magic(file):
    ## NOTE: So far in the code, the "file" var is already unicode
    def _mime_magic(file):
        magictype = mime_magic_file(file)
        return magictype

    result = _mime_magic(file)
    if result is not None:
        if isinstance(result, str):
            if ';' in result:
                mimetype, charset = result.split(';', 1)
                ## Keep only the value of the "charset=..." parameter
                charset = charset.split('=', 1)[-1].strip()
                result = (mimetype, charset)
            else:
                result = (result, None)
    if result is None:
        result = (None, None)
    return result
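
## Illustrative example (values depend on the local magic database):
## mime_magic(u'index.html') typically returns something like
## ('text/html', 'us-ascii') when libmagic is available, while the
## mimetypes-based fallback yields ('text/html', None).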


EXPECT_CONTINUE_TIMEOUT = 2
SIZE_1MB = 1024 * 1024

__all__ = []

class S3Request(object):
    region_map = {}
    ## S3 sometimes sends HTTP-301 or HTTP-307 redirect responses
    redir_map = {}

    def __init__(self, s3, method_string, resource, headers, body, params = None):
        self.s3 = s3
        self.headers = SortedDict(headers or {}, ignore_case = True)
        if len(self.s3.config.access_token) > 0:
            self.s3.config.role_refresh()
            self.headers['x-amz-security-token'] = self.s3.config.access_token
        self.resource = resource
        self.method_string = method_string
        self.params = params or {}
        self.body = body
        self.requester_pays()

    def requester_pays(self):
        if self.s3.config.requester_pays and self.method_string in ("GET", "POST", "PUT", "HEAD"):
            self.headers['x-amz-request-payer'] = 'requester'

    def update_timestamp(self):
        if "date" in self.headers:
            del self.headers["date"]
        self.headers["x-amz-date"] = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime())

    def use_signature_v2(self):
        if self.s3.endpoint_requires_signature_v4:
            return False

        if self.s3.config.signature_v2 or self.s3.fallback_to_signature_v2:
            return True

        return False

    def sign(self):
        bucket_name = self.resource.get('bucket')

        if self.use_signature_v2():
            debug("Using signature v2")
            if bucket_name:
                resource_uri = "/%s%s" % (bucket_name, self.resource['uri'])
            else:
                resource_uri = self.resource['uri']

            self.headers = sign_request_v2(self.method_string, resource_uri, self.params, self.headers)
        else:
            debug("Using signature v4")
            hostname = self.s3.get_hostname(self.resource['bucket'])

            ## Default to the bucket being part of the DNS name.
            ## If the bucket can't be part of DNS, assume path style to complete the request.
            ## As for format_uri, take care that a redirection could point to a base path.
            if bucket_name and (
                (bucket_name in S3Request.redir_map
                 and not S3Request.redir_map.get(bucket_name, '').startswith("%s." % bucket_name))
                or (bucket_name not in S3Request.redir_map
                 and not check_bucket_name_dns_support(Config().host_bucket, bucket_name))
            ):
                resource_uri = "/%s%s" % (bucket_name, self.resource['uri'])
            else:
                resource_uri = self.resource['uri']

            bucket_region = S3Request.region_map.get(self.resource['bucket'], Config().bucket_location)
            ## Sign the data.
            self.headers = sign_request_v4(self.method_string, hostname, resource_uri, self.params,
                                           bucket_region, self.headers, self.body)

    def get_triplet(self):
        self.update_timestamp()
        self.sign()

        resource = dict(self.resource)  ## take a copy

        # URL Encode the uri for the http request
        resource['uri'] = s3_quote(resource['uri'], quote_backslashes=False, unicode_output=True)
        # Get the final uri by adding the uri parameters
        resource['uri'] += format_param_str(self.params)
        return (self.method_string, resource, self.headers)
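
    ## Illustrative example (hypothetical request): for a "GET" on
    ## s3://mybucket/somekey with no extra params, get_triplet() returns
    ## roughly:
    ##   ('GET',
    ##    {'bucket': 'mybucket', 'uri': '/somekey'},
    ##    <SortedDict with 'x-amz-date' and 'Authorization' headers>)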

class S3(object):
    http_methods = BidirMap(
        GET = 0x01,
        PUT = 0x02,
        HEAD = 0x04,
        DELETE = 0x08,
        POST = 0x10,
        MASK = 0x1F,
    )

    targets = BidirMap(
        SERVICE = 0x0100,
        BUCKET = 0x0200,
        OBJECT = 0x0400,
        BATCH = 0x0800,
        ## MASK covers all of the target bits above, including BATCH
        MASK = 0x0F00,
    )

    operations = BidirMap(
        UNDEFINED = 0x0000,
        LIST_ALL_BUCKETS = targets["SERVICE"] | http_methods["GET"],
        BUCKET_CREATE = targets["BUCKET"] | http_methods["PUT"],
        BUCKET_LIST = targets["BUCKET"] | http_methods["GET"],
        BUCKET_DELETE = targets["BUCKET"] | http_methods["DELETE"],
        OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"],
        OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
        OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
        OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
        OBJECT_POST = targets["OBJECT"] | http_methods["POST"],
        BATCH_DELETE = targets["BATCH"] | http_methods["POST"],
    )
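
    ## Illustrative arithmetic: each operation code packs a target and an
    ## HTTP method into one integer, e.g.
    ##   OBJECT_PUT == 0x0400 | 0x02 == 0x0402
    ## so the method name can be recovered with
    ##   http_methods.getkey(0x0402 & http_methods["MASK"])  # -> "PUT"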

    codes = {
        "NoSuchBucket" : "Bucket '%s' does not exist",
        "AccessDenied" : "Access to bucket '%s' was denied",
        "BucketAlreadyExists" : "Bucket '%s' already exists",
    }

    ## Maximum number of attempts at re-issuing failed requests
    _max_retries = 5

    def __init__(self, config):
        self.config = config
        self.fallback_to_signature_v2 = False
        self.endpoint_requires_signature_v4 = False
        self.expect_continue_not_supported = False

    def storage_class(self):
        # Note - you cannot specify GLACIER here
        # https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html
        cls = 'STANDARD'
        if self.config.storage_class != "":
            return self.config.storage_class
        if self.config.reduced_redundancy:
            cls = 'REDUCED_REDUNDANCY'
        return cls

    def get_hostname(self, bucket):
        if bucket and bucket in S3Request.redir_map:
            host = S3Request.redir_map[bucket]
        elif bucket and check_bucket_name_dns_support(self.config.host_bucket, bucket):
            host = getHostnameFromBucket(bucket)
        else:
            host = self.config.host_base.lower()
        # The following hack is needed because it looks like some servers
        # don't respect the HTTP spec and fail the signature check
        # if the port is specified in the "Host" header for default ports.
        # STUPIDEST THING EVER FOR A SERVER...
        # See: https://github.com/minio/minio/issues/9169
        if self.config.use_https:
            if host.endswith(':443'):
                host = host[:-4]
        elif host.endswith(':80'):
            host = host[:-3]

        debug('get_hostname(%s): %s' % (bucket, host))
        return host
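
    ## Illustrative example (assuming the default host_bucket template
    ## "%(bucket)s.s3.amazonaws.com"): get_hostname('my-bucket') returns
    ## 'my-bucket.s3.amazonaws.com', while a name that can't be part of DNS
    ## (e.g. 'My_Bucket') falls back to the plain host_base endpoint.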

    def set_hostname(self, bucket, redir_hostname):
        S3Request.redir_map[bucket] = redir_hostname.lower()

    def format_uri(self, resource, base_path=None):
        bucket_name = resource.get('bucket')
        if bucket_name and (
            (bucket_name in S3Request.redir_map
             and not S3Request.redir_map.get(bucket_name, '').startswith("%s." % bucket_name))
            or (bucket_name not in S3Request.redir_map
                and not check_bucket_name_dns_support(self.config.host_bucket, bucket_name))
        ):
            uri = "/%s%s" % (s3_quote(bucket_name, quote_backslashes=False,
                                      unicode_output=True),
                             resource['uri'])
        else:
            uri = resource['uri']
        if base_path:
            uri = "%s%s" % (base_path, uri)
        if self.config.proxy_host != "" and not self.config.use_https:
            uri = "http://%s%s" % (self.get_hostname(bucket_name), uri)
        debug('format_uri(): ' + uri)
        return uri
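
    ## Illustrative example: a DNS-compatible bucket keeps an object-only
    ## path ('/key') since the bucket is carried by the Host header, while
    ## a non-DNS-compatible bucket produces the path-style form
    ## '/My_Bucket/key'.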

    ## Commands / Actions
    def list_all_buckets(self):
        request = self.create_request("LIST_ALL_BUCKETS")
        response = self.send_request(request)
        response["list"] = getListFromXml(response["data"], "Bucket")
        return response

    def bucket_list(self, bucket, prefix = None, recursive = None, uri_params = None, limit = -1):
        item_list = []
        prefixes = []
        for truncated, dirs, objects in self.bucket_list_streaming(bucket, prefix, recursive, uri_params, limit):
            item_list.extend(objects)
            prefixes.extend(dirs)

        response = {}
        response['list'] = item_list
        response['common_prefixes'] = prefixes
        response['truncated'] = truncated
        return response

    def bucket_list_streaming(self, bucket, prefix = None, recursive = None, uri_params = None, limit = -1):
        """ Generator that yields (truncated, <dir_list>, <object_list>) triples for the content of a specified bucket. """
        def _list_truncated(data):
            ## <IsTruncated> can either be "true" or "false" or be missing completely
            is_truncated = getTextFromXml(data, ".//IsTruncated") or "false"
            return is_truncated.lower() != "false"

        def _get_contents(data):
            return getListFromXml(data, "Contents")

        def _get_common_prefixes(data):
            return getListFromXml(data, "CommonPrefixes")

        def _get_next_marker(data, current_list):
            return getTextFromXml(data, "NextMarker") or current_list[-1]["Key"]

        uri_params = uri_params and uri_params.copy() or {}
        truncated = True

        num_objects = 0
        num_prefixes = 0
        max_keys = limit
        while truncated:
            response = self.bucket_list_noparse(bucket, prefix, recursive,
                                                uri_params, max_keys)
            current_list = _get_contents(response["data"])
            current_prefixes = _get_common_prefixes(response["data"])
            num_objects += len(current_list)
            num_prefixes += len(current_prefixes)
            if limit > num_objects + num_prefixes:
                max_keys = limit - (num_objects + num_prefixes)
            truncated = _list_truncated(response["data"])
            if truncated:
                if limit == -1 or num_objects + num_prefixes < limit:
                    if current_list:
                        uri_params['marker'] = \
                            _get_next_marker(response["data"], current_list)
                    elif current_prefixes:
                        uri_params['marker'] = current_prefixes[-1]["Prefix"]
                    else:
                        # Unexpectedly, the server lied, and so the previous
                        # response was not truncated. So, no new key to get.
                        yield False, current_prefixes, current_list
                        break
                    debug("Listing continues after '%s'" % uri_params['marker'])
                else:
                    yield truncated, current_prefixes, current_list
                    break

            yield truncated, current_prefixes, current_list
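
    ## Illustrative usage of the streaming lister above (hypothetical
    ## bucket): iterate without holding the whole listing in memory:
    ##   for truncated, dirs, objects in s3.bucket_list_streaming(
    ##           'mybucket', prefix='logs/'):
    ##       for entry in objects:
    ##           print(entry['Key'])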

    def bucket_list_noparse(self, bucket, prefix = None, recursive = None, uri_params = None, max_keys = -1):
        if uri_params is None:
            uri_params = {}
        if prefix:
            uri_params['prefix'] = prefix
        if not self.config.recursive and not recursive:
            uri_params['delimiter'] = "/"
        if max_keys != -1:
            uri_params['max-keys'] = str(max_keys)
        request = self.create_request("BUCKET_LIST", bucket = bucket, uri_params = uri_params)
        response = self.send_request(request)
        #debug(response)
        return response

    def bucket_create(self, bucket, bucket_location = None, extra_headers = None):
        headers = SortedDict(ignore_case = True)
        if extra_headers:
            headers.update(extra_headers)

        body = ""
        if bucket_location and bucket_location.strip().upper() != "US" and bucket_location.strip().lower() != "us-east-1":
            bucket_location = bucket_location.strip()
            if bucket_location.upper() == "EU":
                bucket_location = bucket_location.upper()
            body = "<CreateBucketConfiguration><LocationConstraint>"
            body += bucket_location
            body += "</LocationConstraint></CreateBucketConfiguration>"
            debug("bucket_location: " + body)
            check_bucket_name(bucket, dns_strict = True)
        else:
            check_bucket_name(bucket, dns_strict = False)
        if self.config.acl_public:
            headers["x-amz-acl"] = "public-read"

        request = self.create_request("BUCKET_CREATE", bucket = bucket, headers = headers, body = body)
        response = self.send_request(request)
        return response
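
    ## Illustrative example (hypothetical bucket): bucket_create('mybucket',
    ## 'eu-west-1') PUTs the body
    ##   <CreateBucketConfiguration><LocationConstraint>eu-west-1</LocationConstraint></CreateBucketConfiguration>
    ## whereas the default us-east-1 location sends an empty body.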

    def bucket_delete(self, bucket):
        request = self.create_request("BUCKET_DELETE", bucket = bucket)
        response = self.send_request(request)
        return response

    def get_bucket_location(self, uri, force_us_default=False):
        bucket = uri.bucket()
        request = self.create_request("BUCKET_LIST", bucket = uri.bucket(),
                                      uri_params = {'location': None})

        saved_redir_map = S3Request.redir_map.get(bucket, '')
        saved_region_map = S3Request.region_map.get(bucket, '')

        try:
            if force_us_default and not (saved_redir_map and saved_region_map):
                S3Request.redir_map[bucket] = self.config.host_base
                S3Request.region_map[bucket] = 'us-east-1'

            response = self.send_request(request)
        finally:
            ## Restore the saved redirection/region entries; an empty saved
            ## value means there was no entry before, so clear it instead.
            if saved_redir_map:
                S3Request.redir_map[bucket] = saved_redir_map
            elif bucket in S3Request.redir_map:
                del S3Request.redir_map[bucket]

            if saved_region_map:
                S3Request.region_map[bucket] = saved_region_map
            elif bucket in S3Request.region_map:
                del S3Request.region_map[bucket]

        location = getTextFromXml(response['data'], "LocationConstraint")
        if not location or location in [ "", "US" ]:
            location = "us-east-1"
        elif location == "EU":
            location = "eu-west-1"
        return location

    def get_bucket_requester_pays(self, uri):
        request = self.create_request("BUCKET_LIST", bucket=uri.bucket(),
                                      uri_params={'requestPayment': None})
        response = self.send_request(request)
        resp_data = response.get('data', '')
        if resp_data:
            payer = getTextFromXml(response['data'], "Payer")
        else:
            payer = None
        return payer

    def bucket_info(self, uri):
        response = {}
        response['bucket-location'] = self.get_bucket_location(uri)
        try:
            response['requester-pays'] = self.get_bucket_requester_pays(uri)
        except S3Error:
            response['requester-pays'] = None
        return response

    def website_info(self, uri, bucket_location = None):
        bucket = uri.bucket()

        request = self.create_request("BUCKET_LIST", bucket = bucket,
                                      uri_params = {'website': None})
        try:
            response = self.send_request(request)
            response['index_document'] = getTextFromXml(response['data'], ".//IndexDocument//Suffix")
            response['error_document'] = getTextFromXml(response['data'], ".//ErrorDocument//Key")
            response['website_endpoint'] = self.config.website_endpoint % {
                "bucket" : uri.bucket(),
                "location" : self.get_bucket_location(uri)}
            return response
        except S3Error as e:
            if e.status == 404:
                debug("Could not get /?website - website probably not configured for this bucket")
                return None
            raise

    def website_create(self, uri, bucket_location = None):
        bucket = uri.bucket()
        body = '<WebsiteConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
        body += '  <IndexDocument>'
        body += ('    <Suffix>%s</Suffix>' % self.config.website_index)
        body += '  </IndexDocument>'
        if self.config.website_error:
            body += '  <ErrorDocument>'
            body += ('    <Key>%s</Key>' % self.config.website_error)
            body += '  </ErrorDocument>'
        body += '</WebsiteConfiguration>'

        request = self.create_request("BUCKET_CREATE", bucket = bucket, body = body,
                                      uri_params = {'website': None})
        response = self.send_request(request)
        debug("Received response '%s'" % (response))

        return response

    def website_delete(self, uri, bucket_location = None):
        bucket = uri.bucket()

        request = self.create_request("BUCKET_DELETE", bucket = bucket,
                                      uri_params = {'website': None})
        response = self.send_request(request)
        debug("Received response '%s'" % (response))

        if response['status'] != 204:
            raise S3ResponseError("Expected status 204: %s" % response)

        return response

    def expiration_info(self, uri, bucket_location = None):
        bucket = uri.bucket()

        request = self.create_request("BUCKET_LIST", bucket=bucket,
                                      uri_params={'lifecycle': None})
        try:
            response = self.send_request(request)
        except S3Error as e:
            if e.status == 404:
                debug("Could not get /?lifecycle - lifecycle probably not "
                      "configured for this bucket")
                return None
            elif e.status == 501:
                debug("Could not get /?lifecycle - lifecycle support not "
                      "implemented by the server")
                return None
            raise

        root_tag_name = getRootTagName(response['data'])
        if root_tag_name != "LifecycleConfiguration":
            debug("Could not get /?lifecycle - unexpected xml response: "
                  "%s", root_tag_name)
            return None
        response['prefix'] = getTextFromXml(response['data'],
                                            ".//Rule//Prefix")
        response['date'] = getTextFromXml(response['data'],
                                          ".//Rule//Expiration//Date")
        response['days'] = getTextFromXml(response['data'],
                                          ".//Rule//Expiration//Days")
        return response

    def expiration_set(self, uri, bucket_location = None):
        if self.config.expiry_date and self.config.expiry_days:
            raise ParameterError("Expect either --expiry-days or --expiry-date")
        if not (self.config.expiry_date or self.config.expiry_days):
            if self.config.expiry_prefix:
                raise ParameterError("Expect either --expiry-days or --expiry-date")
            debug("del bucket lifecycle")
            bucket = uri.bucket()
            request = self.create_request("BUCKET_DELETE", bucket = bucket,
                                          uri_params = {'lifecycle': None})
        else:
            request = self._expiration_set(uri)
        response = self.send_request(request)
        debug("Received response '%s'" % (response))
        return response

    def _expiration_set(self, uri):
        debug("put bucket lifecycle")
        body = '<LifecycleConfiguration>'
        body += '  <Rule>'
        body += ('    <Prefix>%s</Prefix>' % self.config.expiry_prefix)
        body += ('    <Status>Enabled</Status>')
        body += ('    <Expiration>')
        if self.config.expiry_date:
            body += ('    <Date>%s</Date>' % self.config.expiry_date)
        elif self.config.expiry_days:
            body += ('    <Days>%s</Days>' % self.config.expiry_days)
        body += ('    </Expiration>')
        body += '  </Rule>'
        body += '</LifecycleConfiguration>'

        headers = SortedDict(ignore_case = True)
        headers['content-md5'] = compute_content_md5(body)
        bucket = uri.bucket()
        request = self.create_request("BUCKET_CREATE", bucket = bucket,
                                      headers = headers, body = body,
                                      uri_params = {'lifecycle': None})
        return request

    def _guess_content_type(self, filename):
        content_type = self.config.default_mime_type
        content_charset = None

        if filename == "-" and not self.config.default_mime_type:
            raise ParameterError("You must specify --mime-type or --default-mime-type for files uploaded from stdin.")

        if self.config.guess_mime_type:
            if self.config.follow_symlinks:
                filename = unicodise(os.path.realpath(deunicodise(filename)))
            if self.config.use_mime_magic:
                (content_type, content_charset) = mime_magic(filename)
            else:
                (content_type, content_charset) = mimetypes.guess_type(filename)
        if not content_type:
            content_type = self.config.default_mime_type
        return (content_type, content_charset)

    def stdin_content_type(self):
        content_type = self.config.mime_type
        if not content_type:
            content_type = self.config.default_mime_type

        content_type += "; charset=" + self.config.encoding.upper()
        return content_type

    def content_type(self, filename=None):
        # explicit command line argument always wins
        content_type = self.config.mime_type
        content_charset = None

        if filename == u'-':
            return self.stdin_content_type()
        if not content_type:
            (content_type, content_charset) = self._guess_content_type(filename)

        ## add charset to content type
        if not content_charset:
            content_charset = self.config.encoding.upper()
        if self.add_encoding(filename, content_type) and content_charset is not None:
            content_type = content_type + "; charset=" + content_charset

        return content_type
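
    ## Illustrative example (assuming default config): content_type(u'a.css')
    ## guesses 'text/css', and if 'css' appears in add_encoding_exts the
    ## result becomes 'text/css; charset=UTF-8' (charset taken from the
    ## configured encoding).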

    def add_encoding(self, filename, content_type):
        if 'charset=' in content_type:
            return False
        exts = self.config.add_encoding_exts.split(',')
        if exts[0] == '':
            return False
        parts = filename.rsplit('.', 2)
        if len(parts) < 2:
            return False
        ext = parts[1]
        return ext in exts
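
    ## Illustrative example: with add_encoding_exts set to 'css,js',
    ## add_encoding(u'theme.css', 'text/css') returns True, while any
    ## content type already carrying 'charset=' returns False.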

    def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
        # TODO TODO
        # Make it consistent with stream-oriented object_get()
        if uri.type != "s3":
            raise ValueError("Expected URI type 's3', got '%s'" % uri.type)

        if filename != "-" and not os.path.isfile(deunicodise(filename)):
            raise InvalidFileError(u"Not a regular file")
        try:
            if filename == "-":
                src_stream = io.open(sys.stdin.fileno(), mode='rb', closefd=False)
                src_stream.stream_name = u'<stdin>'
                size = 0
            else:
                src_stream = io.open(deunicodise(filename), mode='rb')
                src_stream.stream_name = filename
                size = os.stat(deunicodise(filename))[ST_SIZE]
        except (IOError, OSError) as e:
            raise InvalidFileError(u"%s" % e.strerror)

        headers = SortedDict(ignore_case = True)
        if extra_headers:
            headers.update(extra_headers)

        ## Set server side encryption
        if self.config.server_side_encryption:
            headers["x-amz-server-side-encryption"] = "AES256"

        ## Set kms headers
        if self.config.kms_key:
            headers['x-amz-server-side-encryption'] = 'aws:kms'
            headers['x-amz-server-side-encryption-aws-kms-key-id'] = self.config.kms_key

        ## MIME-type handling
        headers["content-type"] = self.content_type(filename=filename)

        ## Other Amazon S3 attributes
        if self.config.acl_public:
            headers["x-amz-acl"] = "public-read"
        headers["x-amz-storage-class"] = self.storage_class()

        ## Multipart decision
        multipart = False
        if not self.config.enable_multipart and filename == "-":
            raise ParameterError("Multi-part upload is required to upload from stdin")
        if self.config.enable_multipart:
            if size > self.config.multipart_chunk_size_mb * SIZE_1MB or filename == "-":
                multipart = True
                if size > self.config.multipart_max_chunks * self.config.multipart_chunk_size_mb * SIZE_1MB:
                    raise ParameterError("Chunk size %d MB results in more than %d chunks. Please increase --multipart-chunk-size-mb" % \
                          (self.config.multipart_chunk_size_mb, self.config.multipart_max_chunks))
        if multipart:
            # Multipart requests are quite different... drop here
            return self.send_file_multipart(src_stream, headers, uri, size, extra_label)

        ## Not multipart...
        if self.config.put_continue:
            # Note: if the input was stdin, we would be performing a multipart
            # upload. So this will always work as long as the file already
            # uploaded was not uploaded via a multipart upload, in which case
            # its ETag will not be an md5.
            try:
                info = self.object_info(uri)
            except Exception:
                info = None

            if info is not None:
                remote_size = int(info['headers']['content-length'])
                remote_checksum = info['headers']['etag'].strip('"\'')
                if size == remote_size:
                    checksum = calculateChecksum('', src_stream, 0, size, self.config.send_chunk)
                    if remote_checksum == checksum:
                        warning("Put: size and md5sum match for %s, skipping." % uri)
                        return
                    else:
                        warning("Put: checksum (%s vs %s) does not match for %s, reuploading."
                                % (remote_checksum, checksum, uri))
                else:
                    warning("Put: size (%d vs %d) does not match for %s, reuploading."
                            % (remote_size, size, uri))

        headers["content-length"] = str(size)
        request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
        labels = { 'source' : filename, 'destination' : uri.uri(), 'extra' : extra_label }
        response = self.send_file(request, src_stream, labels)
        return response
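
    ## Illustrative arithmetic for the multipart decision above (assuming
    ## the defaults multipart_chunk_size_mb=15 and multipart_max_chunks=10000):
    ## files larger than 15 MiB are uploaded via multipart, and anything over
    ## 15 MiB * 10000 (~146 GiB) triggers the ParameterError asking for a
    ## bigger chunk size.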

    def object_get(self, uri, stream, dest_name, start_position = 0, extra_label = ""):
        if uri.type != "s3":
            raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
        request = self.create_request("OBJECT_GET", uri = uri)
        labels = { 'source' : uri.uri(), 'destination' : dest_name, 'extra' : extra_label }
        response = self.recv_file(request, stream, labels, start_position)
        return response

    def object_batch_delete(self, remote_list):
        """ Batch delete given a remote_list """
        uris = [remote_list[item]['object_uri_str'] for item in remote_list]
        self.object_batch_delete_uri_strs(uris)

    def object_batch_delete_uri_strs(self, uris):
        """ Batch delete given a list of object uris """
        def compose_batch_del_xml(bucket, key_list):
            body = u"<?xml version=\"1.0\" encoding=\"UTF-8\"?><Delete>"
            for key in key_list:
                uri = S3Uri(key)
                if uri.type != "s3":
                    raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
                if not uri.has_object():
                    raise ValueError("URI '%s' has no object" % key)
                if uri.bucket() != bucket:
                    raise ValueError("The batch should contain keys from the same bucket")
                object = saxutils.escape(uri.object())
                body += u"<Object><Key>%s</Key></Object>" % object
            body += u"</Delete>"
            body = encode_to_s3(body)
            return body

        batch = uris
        if len(batch) == 0:
            raise ValueError("Key list is empty")
        bucket = S3Uri(batch[0]).bucket()
        request_body = compose_batch_del_xml(bucket, batch)
        headers = SortedDict({'content-md5': compute_content_md5(request_body),
                              'content-type': 'application/xml'}, ignore_case=True)
        request = self.create_request("BATCH_DELETE", bucket = bucket,
                                      headers = headers, body = request_body,
                                      uri_params = {'delete': None})
        response = self.send_request(request)
        return response
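
    ## Illustrative example (hypothetical keys): batch-deleting
    ## ['s3://mybucket/a', 's3://mybucket/b'] POSTs to /?delete with a body
    ## of the form
    ##   <?xml version="1.0" encoding="UTF-8"?>
    ##   <Delete><Object><Key>a</Key></Object><Object><Key>b</Key></Object></Delete>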

    def object_delete(self, uri):
        if uri.type != "s3":
            raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
        request = self.create_request("OBJECT_DELETE", uri = uri)
        response = self.send_request(request)
        return response

    def object_restore(self, uri):
        if uri.type != "s3":
            raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
        if self.config.restore_days < 1:
            raise ParameterError("You must restore a file for 1 or more days")
        if self.config.restore_priority not in ['Standard', 'Expedited', 'Bulk']:
            raise ParameterError("Valid restoration priorities: Standard, Expedited, Bulk")
        body =   '<RestoreRequest xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
        body += ('  <Days>%s</Days>' % self.config.restore_days)
        body +=  '  <GlacierJobParameters>'
        body += ('    <Tier>%s</Tier>' % self.config.restore_priority)
        body +=  '  </GlacierJobParameters>'
        body +=  '</RestoreRequest>'
        request = self.create_request("OBJECT_POST", uri = uri, body = body,
                                      uri_params = {'restore': None})
        response = self.send_request(request)
        debug("Received response '%s'" % (response))
        return response

    def _sanitize_headers(self, headers):
        to_remove = [
            # from http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
            'date',
            'content-length',
            'last-modified',
            'content-md5',
            'x-amz-version-id',
            'x-amz-delete-marker',
            # other headers returned from object_info() we don't want to send
            'accept-ranges',
            'connection',
            'etag',
            'server',
            'x-amz-id-2',
            'x-amz-request-id',
            # other headers that are not copied by a direct copy
            'x-amz-storage-class',
            ## We should probably also add server-side encryption headers
        ]

        for h in to_remove + self.config.remove_headers:
            if h.lower() in headers:
                del headers[h.lower()]
        return headers

    def object_copy(self, src_uri, dst_uri, extra_headers=None,
                    src_size=None, extra_label="", replace_meta=False):
        """Remotely copy an object and optionally set its metadata

        Note: A short memo on the performance nightmare here:
        ** For AWS, 2 cases:
        - COPY copies the metadata of the source to the destination;
        you can't modify it, and any additional header is ignored.
        - REPLACE sets the additional metadata headers that are provided,
        but does not copy any of the source headers.
        So, to add to the existing metadata during a copy, you have to do an
        object_info to get the original source headers, modify them, and then
        use REPLACE for the copy operation.

        ** For Minio and maybe other implementations:
        - if additional headers are sent, they are set on the destination
        on top of the source's original metadata, in both the COPY and
        REPLACE cases. A nice behavior, except that it differs from the
        AWS one.

        And as if that weren't enough, there is another catch:
        in all cases, for multipart copies, metadata is never copied
        from the source.
        """
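        ## Illustrative consequence of the AWS semantics above (hypothetical
        ## header): to add "x-amz-meta-foo: bar" while keeping existing
        ## metadata, the flow is roughly
        ##   headers = self.object_info(src_uri)['headers']  # original meta
        ##   headers['x-amz-meta-foo'] = 'bar'               # add/modify
        ## followed by a copy with "x-amz-metadata-directive: REPLACE".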
        if src_uri.type != "s3":
            raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
        if dst_uri.type != "s3":
            raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
        if self.config.acl_public is None:
            try:
                acl = self.get_acl(src_uri)
            except S3Error as exc:
                # Ignore the exception and don't fail the copy
                # if the server doesn't support getting ACLs
                if exc.status != 501:
                    raise exc
                acl = None

        multipart = False
        headers = None

        if extra_headers or self.config.mime_type:
            # Force replace, which will force getting meta with object_info()
            replace_meta = True

        if replace_meta:
            src_info = self.object_info(src_uri)
            headers = src_info['headers']
            src_size = int(headers["content-length"])

        if self.config.enable_multipart:
            # Get the size of the remote source only if multipart is enabled
            # and no size info was provided
            src_headers = headers
            if src_size is None:
                src_info = self.object_info(src_uri)
                src_headers = src_info['headers']
                src_size = int(src_headers["content-length"])

            # If we are over the grand maximum size for a normal copy/modify
            # (> 5GB) go nuclear and use multipart copy as the only option to
            # modify an object.
            # Reason is an aws s3 design bug. See:
            # https://github.com/aws/aws-sdk-java/issues/367
            if src_uri is dst_uri:
                # optimisation in the case of modify
                threshold = MultiPartUpload.MAX_CHUNK_SIZE_MB * SIZE_1MB
            else:
                threshold = self.config.multipart_copy_chunk_size_mb * SIZE_1MB

            if src_size > threshold:
                # Sadly, s3 has a bad logic as metadata will not be copied for
                # multipart copy unlike what is done for direct copies.
                # TODO: Optimize by re-using the object_info request done
                # earlier at the fetch remote stage, and preserve headers.
                if src_headers is None:
                    src_info = self.object_info(src_uri)
                    src_headers = src_info['headers']
                    src_size = int(src_headers["content-length"])
                headers = src_headers
                multipart = True

        if headers:
            self._sanitize_headers(headers)
            headers = SortedDict(headers, ignore_case=True)
        else:
            headers = SortedDict(ignore_case=True)

        # The following metadata is updated even in COPY mode by aws
        if self.config.acl_public:
            headers["x-amz-acl"] = "public-read"

        headers["x-amz-storage-class"] = self.storage_class()

        ## Set server side encryption
        if self.config.server_side_encryption:
            headers["x-amz-server-side-encryption"] = "AES256"

        ## Set kms headers
        if self.config.kms_key:
            headers['x-amz-server-side-encryption'] = 'aws:kms'
            headers['x-amz-server-side-encryption-aws-kms-key-id'] = \
                self.config.kms_key

        # The following metadata is not updated in a simple COPY by aws.
        if extra_headers:
            headers.update(extra_headers)

        if self.config.mime_type:
            headers["content-type"] = self.config.mime_type

        # "COPY" or "REPLACE"
        if not replace_meta:
            headers['x-amz-metadata-directive'] = "COPY"
        else:
            headers['x-amz-metadata-directive'] = "REPLACE"

        if multipart:
            # Multipart decision. Only do multipart copy for remote s3 files
            # bigger than the multipart copy threshold.

            # Multipart requests are quite different... delegate
            response = self.copy_file_multipart(src_uri, dst_uri, src_size,
                                                headers, extra_label)
        else:
            # Not multipart... direct request
            headers['x-amz-copy-source'] = s3_quote(
                "/%s/%s" % (src_uri.bucket(), src_uri.object()),
                quote_backslashes=False, unicode_output=True)

            request = self.create_request("OBJECT_PUT", uri=dst_uri,
                                          headers=headers)
            response = self.send_request(request)

        if response["data"] and getRootTagName(response["data"]) == "Error":
            # http://doc.s3.amazonaws.com/proposals/copy.html
            # Error during copy: the status will still be 200, so force the
            # error code to 500 instead
            response["status"] = 500
            error("Server error during the COPY operation. Overriding the "
                  "response status to 500")
            raise S3Error(response)

        if self.config.acl_public is None and acl:
            try:
                self.set_acl(dst_uri, acl)
            except S3Error as exc:
                # Ignore the exception and don't fail the copy
                # if the server doesn't support setting ACLs
                if exc.status != 501:
                    raise exc
        return response

    def object_modify(self, src_uri, dst_uri, extra_headers=None,
                      src_size=None, extra_label=""):
        # dst_uri = src_uri: object_copy() optimizes this "modify" case by
        # falling back to multipart only in the worst case
        return self.object_copy(src_uri, src_uri, extra_headers, src_size,
                                extra_label, replace_meta=True)

    def object_move(self, src_uri, dst_uri, extra_headers=None,
                    src_size=None, extra_label=""):
        response_copy = self.object_copy(src_uri, dst_uri, extra_headers,
                                         src_size, extra_label)
        debug("Object %s copied to %s" % (src_uri, dst_uri))
        if not response_copy["data"] \
           or getRootTagName(response_copy["data"]) \
           in ["CopyObjectResult", "CompleteMultipartUploadResult"]:
            self.object_delete(src_uri)
            debug("Object '%s' deleted", src_uri)
        else:
            warning("Object '%s' NOT deleted because of an unexpected "
                    "response data content.", src_uri)
        return response_copy

    def object_info(self, uri):
        request = self.create_request("OBJECT_HEAD", uri=uri)
        response = self.send_request(request)
        return response

    def get_acl(self, uri):
        if uri.has_object():
            request = self.create_request("OBJECT_GET", uri=uri,
                                          uri_params={'acl': None})
        else:
            request = self.create_request("BUCKET_LIST", bucket=uri.bucket(),
                                          uri_params={'acl': None})

        response = self.send_request(request)
        acl = ACL(response['data'])
        return acl

    def set_acl(self, uri, acl):
        body = u"%s" % acl
        debug(u"set_acl(%s): acl-xml: %s" % (uri, body))

        headers = SortedDict({'content-type': 'application/xml'}, ignore_case = True)
        if uri.has_object():
            request = self.create_request("OBJECT_PUT", uri = uri,
                                          headers = headers, body = body,
                                          uri_params = {'acl': None})
        else:
            request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(),
                                          headers = headers, body = body,
                                          uri_params = {'acl': None})

        response = self.send_request(request)
        return response

    def get_policy(self, uri):
        request = self.create_request("BUCKET_LIST", bucket = uri.bucket(),
                                      uri_params = {'policy': None})
        response = self.send_request(request)
        return decode_from_s3(response['data'])

    def set_policy(self, uri, policy):
        headers = SortedDict(ignore_case = True)
        # TODO: check that policy is a proper json string
        headers['content-type'] = 'application/json'
        request = self.create_request("BUCKET_CREATE", uri = uri,
                                      headers=headers, body = policy,
                                      uri_params = {'policy': None})
        response = self.send_request(request)
        return response

    def delete_policy(self, uri):
        request = self.create_request("BUCKET_DELETE", uri = uri,
                                      uri_params = {'policy': None})
        debug(u"delete_policy(%s)" % uri)
        response = self.send_request(request)
        return response

    def get_cors(self, uri):
        request = self.create_request("BUCKET_LIST", bucket = uri.bucket(),
                                      uri_params = {'cors': None})
        response = self.send_request(request)
        return decode_from_s3(response['data'])

    def set_cors(self, uri, cors):
        headers = SortedDict(ignore_case = True)
        # TODO: check that cors is a proper xml string
        headers['content-type'] = 'application/xml'
        headers['content-md5'] = compute_content_md5(cors)
        request = self.create_request("BUCKET_CREATE", uri = uri,
                                      headers=headers, body = cors,
                                      uri_params = {'cors': None})
        response = self.send_request(request)
        return response

    def delete_cors(self, uri):
        request = self.create_request("BUCKET_DELETE", uri = uri,
                                      uri_params = {'cors': None})
        debug(u"delete_cors(%s)" % uri)
        response = self.send_request(request)
        return response

    def set_lifecycle_policy(self, uri, policy):
        headers = SortedDict(ignore_case = True)
        headers['content-md5'] = compute_content_md5(policy)
        request = self.create_request("BUCKET_CREATE", uri = uri,
                                      headers=headers, body = policy,
                                      uri_params = {'lifecycle': None})
        debug(u"set_lifecycle_policy(%s): policy-xml: %s" % (uri, policy))
        response = self.send_request(request)
        return response

    def set_payer(self, uri):
        headers = SortedDict(ignore_case = True)
        headers['content-type'] = 'application/xml'
        body = '<RequestPaymentConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">\n'
        if self.config.requester_pays:
            body += '<Payer>Requester</Payer>\n'
        else:
            body += '<Payer>BucketOwner</Payer>\n'
        body += '</RequestPaymentConfiguration>\n'
        request = self.create_request("BUCKET_CREATE", uri = uri, body = body,
                                      uri_params = {'requestPayment': None})
        response = self.send_request(request)
        return response

    def get_lifecycle_policy(self, uri):
        request = self.create_request("BUCKET_LIST", bucket = uri.bucket(),
                                      uri_params = {'lifecycle': None})
        debug(u"get_lifecycle_policy(%s)" % uri)
        response = self.send_request(request)

        debug(u"%s: Got Lifecycle Policy" % response['status'])
        return response

    def delete_lifecycle_policy(self, uri):
        request = self.create_request("BUCKET_DELETE", uri = uri,
                                      uri_params = {'lifecycle': None})
        debug(u"delete_lifecycle_policy(%s)" % uri)
        response = self.send_request(request)
        return response

    def get_multipart(self, uri, uri_params=None, limit=-1):
        upload_list = []
        for truncated, uploads in self.get_multipart_streaming(uri,
                                                               uri_params,
                                                               limit):
            upload_list.extend(uploads)

        return upload_list

    def get_multipart_streaming(self, uri, uri_params=None, limit=-1):
        uri_params = uri_params and uri_params.copy() or {}
        bucket = uri.bucket()

        truncated = True
        num_objects = 0
        max_keys = limit

        # It is the "uploads: None" entry in uri_params that changes the
        # behavior of bucket_list to return multipart uploads instead of keys
        uri_params['uploads'] = None
        while truncated:
            response = self.bucket_list_noparse(bucket, recursive=True,
                                                uri_params=uri_params,
                                                max_keys=max_keys)

            xml_data = response["data"]
            # extract the list of info about the uploads
            upload_list = getListFromXml(xml_data, "Upload")
            num_objects += len(upload_list)
            if limit > num_objects:
                max_keys = limit - num_objects

            xml_truncated = getTextFromXml(xml_data, ".//IsTruncated")
            if not xml_truncated or xml_truncated.lower() == "false":
                truncated = False

            if truncated:
                if limit == -1 or num_objects < limit:
                    if upload_list:
                        next_key = getTextFromXml(xml_data, "NextKeyMarker")
                        if not next_key:
                            next_key = upload_list[-1]["Key"]
                        uri_params['KeyMarker'] = next_key

                        upload_id_marker = getTextFromXml(
                            xml_data, "NextUploadIdMarker")
                        if upload_id_marker:
                            uri_params['UploadIdMarker'] = upload_id_marker
                        elif 'UploadIdMarker' in uri_params:
                            # Clear any pre-existing value
                            del uri_params['UploadIdMarker']
                    else:
                        # Unexpectedly, the server lied, and so the previous
                        # response was not truncated. So, no new key to get.
                        yield False, upload_list
                        break
                    debug("Listing continues after '%s'" %
                          uri_params['KeyMarker'])
                else:
                    yield truncated, upload_list
                    break
            yield truncated, upload_list
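
    ## Illustrative pagination (hypothetical markers): when a truncated
    ## listing returns NextKeyMarker 'k42' and NextUploadIdMarker 'id99',
    ## the next request carries ?uploads&KeyMarker=k42&UploadIdMarker=id99.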
1209
1210    def list_multipart(self, uri, upload_id, uri_params=None, limit=-1):
1211        part_list = []
1212        for truncated, parts in self.list_multipart_streaming(uri,
1213                                                              upload_id,
1214                                                              uri_params,
1215                                                              limit):
1216            part_list.extend(parts)
1217
1218        return part_list
1219
1220    def list_multipart_streaming(self, uri, upload_id, uri_params=None,
1221                                 limit=-1):
        uri_params = uri_params.copy() if uri_params else {}

        truncated = True
        num_objects = 0
        max_parts = limit

        while truncated:
            response = self.list_multipart_noparse(uri, upload_id,
                                                   uri_params, max_parts)

            xml_data = response["data"]
            # extract the list of multipart upload parts
            part_list = getListFromXml(xml_data, "Part")
            num_objects += len(part_list)
            if limit > num_objects:
                max_parts = limit - num_objects

            xml_truncated = getTextFromXml(xml_data, ".//IsTruncated")
            if not xml_truncated or xml_truncated.lower() == "false":
                truncated = False

            if truncated:
                if limit == -1 or num_objects < limit:
                    if part_list:
                        next_part_number = getTextFromXml(
                            xml_data, "NextPartNumberMarker")
                        if not next_part_number:
                            next_part_number = part_list[-1]["PartNumber"]
                        uri_params['part-number-marker'] = next_part_number
                    else:
                        # The server claimed the listing was truncated but
                        # returned no entries, so there is no new part to
                        # continue from.
                        yield False, part_list
                        break
                    debug("Listing continues after Part '%s'" %
                          uri_params['part-number-marker'])
                else:
                    yield truncated, part_list
                    break
            yield truncated, part_list

    def list_multipart_noparse(self, uri, upload_id, uri_params=None,
                               max_parts=-1):
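        """Issue a single ListParts request (GET with the 'uploadId' query
        parameter) and return the raw response dict, leaving the XML body
        unparsed for the caller.
        """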
        if uri_params is None:
            uri_params = {}
        if max_parts != -1:
            uri_params['max-parts'] = str(max_parts)
        uri_params['uploadId'] = upload_id
        request = self.create_request("OBJECT_GET", uri=uri,
                                      uri_params=uri_params)
        response = self.send_request(request)
        return response

    def abort_multipart(self, uri, id):
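        """Abort a multipart upload by deleting its 'uploadId' subresource.

        A minimal usage sketch (the upload_id value is hypothetical):

            s3.abort_multipart(S3Uri(u's3://bucket/key'), upload_id)
        """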
        request = self.create_request("OBJECT_DELETE", uri = uri,
                                      uri_params = {'uploadId': id})
        response = self.send_request(request)
        return response

    def get_accesslog(self, uri):
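        """Fetch the bucket's '?logging' subresource and parse it into an
        AccessLog object.
        """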
        request = self.create_request("BUCKET_LIST", bucket = uri.bucket(),
                                      uri_params = {'logging': None})
        response = self.send_request(request)
        accesslog = AccessLog(response['data'])
        return accesslog

    def set_accesslog_acl(self, uri):
        acl = self.get_acl(uri)
        debug("Current ACL(%s): %s" % (uri.uri(), acl))
        acl.appendGrantee(GranteeLogDelivery("READ_ACP"))
        acl.appendGrantee(GranteeLogDelivery("WRITE"))
        debug("Updated ACL(%s): %s" % (uri.uri(), acl))
        self.set_acl(uri, acl)

    def set_accesslog(self, uri, enable, log_target_prefix_uri = None, acl_public = False):
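        """Enable or disable server access logging for a bucket.

        A minimal usage sketch (the bucket names are hypothetical):

            s3.set_accesslog(S3Uri(u's3://mybucket'), enable=True,
                             log_target_prefix_uri=S3Uri(u's3://logbucket/logs/'))
        """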
        accesslog = AccessLog()
        if enable:
            accesslog.enableLogging(log_target_prefix_uri)
            accesslog.setAclPublic(acl_public)
        else:
            accesslog.disableLogging()

        body = "%s" % accesslog
        debug(u"set_accesslog(%s): accesslog-xml: %s" % (uri, body))

        request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(),
                                      body = body, uri_params = {'logging': None})
        try:
            response = self.send_request(request)
        except S3Error as e:
            if e.info['Code'] == "InvalidTargetBucketForLogging":
                info("Setting up log-delivery ACL for target bucket.")
                self.set_accesslog_acl(S3Uri(u"s3://%s" % log_target_prefix_uri.bucket()))
                response = self.send_request(request)
            else:
                raise
        return accesslog, response

    def create_request(self, operation, uri = None, bucket = None, object = None, headers = None, body = "", uri_params = None):
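        """Build an S3Request for 'operation', addressed either by a parsed
        S3Uri or by explicit bucket/object values (the two forms are
        mutually exclusive).

        A minimal usage sketch (the uri value is hypothetical):

            request = self.create_request("OBJECT_GET",
                                          uri=S3Uri(u's3://bucket/key'))
        """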
        resource = { 'bucket' : None, 'uri' : "/" }

        if uri and (bucket or object):
            raise ValueError("Both 'uri' and either 'bucket' or 'object' parameters supplied")
        ## If a URI is given, use it instead of the bucket/object parameters
        if uri:
            bucket = uri.bucket()
            object = uri.has_object() and uri.object() or None

        if bucket:
            resource['bucket'] = bucket
            if object:
                resource['uri'] = "/" + object

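        # Map the operation to its HTTP verb: mask the operation's flags
        # down to the method bits and look the verb up in the http_methods
        # BidirMap.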
        method_string = S3.http_methods.getkey(S3.operations[operation] & S3.http_methods["MASK"])

        request = S3Request(self, method_string, resource, headers, body, uri_params)

        debug("CreateRequest: resource[uri]=%s", resource['uri'])
        return request

    def _fail_wait(self, retries):
        # Wait a few seconds. The more it fails, the more we wait.
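        # E.g. with _max_retries == 5, the successive waits as 'retries'
        # counts down are 3, 6, 9, 12 and 15 seconds (illustrative
        # arithmetic only; the actual ceiling depends on _max_retries).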
        return (self._max_retries - retries + 1) * 3

    def _http_redirection_handler(self, request, response, fn, *args, **kwargs):
        # Region info might already be available through the x-amz-bucket-region header
        redir_region = response['headers'].get('x-amz-bucket-region')

        if 'data' in response and len(response['data']) > 0:
            redir_bucket = getTextFromXml(response['data'], ".//Bucket")
            redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
            self.set_hostname(redir_bucket, redir_hostname)
            info(u'Redirected to: %s', redir_hostname)
            if redir_region:
                S3Request.region_map[redir_bucket] = redir_region
                info(u'Redirected to region: %s', redir_region)
            return fn(*args, **kwargs)
        elif request.method_string == 'HEAD':
            # HEAD is a special case: redirection info is usually in the
            # body, but a HEAD response has no body.
            location_url = response['headers'].get('location')
            if location_url:
                # Sometimes a "location" HTTP header is available and helps
                # us deduce the redirection endpoint.
                # That is the case for "dns-style" requests, but not for
                # "path-style" ones.
                if location_url.startswith("http://"):
                    location_url = location_url[7:]
                elif location_url.startswith("https://"):
                    location_url = location_url[8:]
                location_url = urlparse('https://' + location_url).hostname
                redir_bucket = request.resource['bucket']
                self.set_hostname(redir_bucket, location_url)
                info(u'Redirected to: %s', location_url)
                if redir_region:
                    S3Request.region_map[redir_bucket] = redir_region
                    info(u'Redirected to region: %s', redir_region)
                return fn(*args, **kwargs)
            warning(u'Redirection error: the server gave no indication of where the (HEAD) request should be forwarded. (Hint target region: %s)', redir_region)

        raise S3Error(response)

    def _http_400_handler(self, request, response, fn, *args, **kwargs):
        """Handle the known, recoverable causes of HTTP 400 errors.

        Returns the result of retrying the request, or None if no handler
        is available for the specific error code.
        """
        # An AWS "AuthorizationHeaderMalformed" response means we sent the
        # request to the wrong region: extract the right region from the
        # response and resend the request there.
        if 'data' in response and len(response['data']) > 0:
            failureCode = getTextFromXml(response['data'], 'Code')
            if failureCode == 'AuthorizationHeaderMalformed':
                # we sent the request to the wrong region
                region = getTextFromXml(response['data'], 'Region')
                if region is not None:
                    S3Request.region_map[request.resource['bucket']] = region
                    info('Forwarding request to %s', region)
                    return fn(*args, **kwargs)
                else:
                    warning(u'Could not determine the bucket location. Please consider using the --region parameter.')

            elif failureCode == 'InvalidRequest':
                message = getTextFromXml(response['data'], 'Message')
                if message == 'The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.':
                    debug(u'Endpoint requires signature v4')
                    self.endpoint_requires_signature_v4 = True
                    return fn(*args, **kwargs)

            elif failureCode == 'InvalidArgument':
                # returned by DreamObjects on send_request and send_file,
                # which doesn't support signature v4. Retry with signature v2.
                if not request.use_signature_v2() and not self.fallback_to_signature_v2: # have not tried with v2 yet
                    debug(u'Falling back to signature v2')
                    self.fallback_to_signature_v2 = True
                    return fn(*args, **kwargs)
        else:
            # returned by DreamObjects on recv_file, which doesn't support
            # signature v4. Retry with signature v2.
            if not request.use_signature_v2() and not self.fallback_to_signature_v2:
                # have not tried with v2 yet
                debug(u'Falling back to signature v2')
                self.fallback_to_signature_v2 = True
                return fn(*args, **kwargs)

        return None

    def _http_403_handler(self, request, response, fn, *args, **kwargs):
        if 'data' in response and len(response['data']) > 0:
            failureCode = getTextFromXml(response['data'], 'Code')
            if failureCode == 'AccessDenied':
                # traditional HTTP 403
                message = getTextFromXml(response['data'], 'Message')
                if message == 'AWS authentication requires a valid Date or x-amz-date header': # message from an Eucalyptus walrus server
                    if not request.use_signature_v2() and not self.fallback_to_signature_v2: # have not tried with v2 yet
                        debug(u'Falling back to signature v2')
                        self.fallback_to_signature_v2 = True
                        return fn(*args, **kwargs)

        raise S3Error(response)

    def update_region_inner_request(self, request):
        """Get and update the region for the request if needed.

        Signature v4 needs the region of the bucket, or the request will
        fail with an indication of the correct region.
        We try to avoid that failure by pre-emptively fetching the correct
        region to use, if not provided by the user.
        """
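        # Only worth doing for v4-signed requests whose bucket still maps
        # to the default "US" placeholder region; anything else is either
        # v2-signed or already resolved.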
        if request.resource.get('bucket') and not request.use_signature_v2() \
           and S3Request.region_map.get(
                request.resource['bucket'], Config().bucket_location
               ) == "US":
            debug("===== SEND Inner request to determine the bucket region "
                  "=====")
            try:
                s3_uri = S3Uri(u's3://' + request.resource['bucket'])
                # "force_us_default" should prevent infinite recursion
                # because it will populate the region_map dict.
                region = self.get_bucket_location(s3_uri, force_us_default=True)
                if region is not None:
                    S3Request.region_map[request.resource['bucket']] = region
                debug("===== SUCCESS Inner request to determine the bucket "
                      "region (%r) =====", region)
            except Exception as exc:
                # Ignore errors; this is just an optimisation, nothing critical
                debug("getlocation inner request failure reason: %s", exc)
                debug("===== FAILED Inner request to determine the bucket "
                      "region =====")

    def send_request(self, request, retries = _max_retries):
        self.update_region_inner_request(request)

        request.body = encode_to_s3(request.body)
        headers = request.headers

        method_string, resource, headers = request.get_triplet()
        response = {}
        debug("Processing request, please wait...")

        conn = None
        try:
            conn = ConnMan.get(self.get_hostname(resource['bucket']))
            # TODO: Check what the intended usage of conn.path was here.
            # Currently it is always None, as it is not defined in ConnMan.
            uri = self.format_uri(resource, conn.path)
            debug("Sending request method_string=%r, uri=%r, headers=%r, body=(%i bytes)" % (method_string, uri, headers, len(request.body or "")))
            conn.c.request(method_string, uri, request.body, headers)
            http_response = conn.c.getresponse()
            response["status"] = http_response.status
            response["reason"] = http_response.reason
            response["headers"] = convertHeaderTupleListToDict(http_response.getheaders())
            response["data"] = http_response.read()
            if "x-amz-meta-s3cmd-attrs" in response["headers"]:
                attrs = parse_attrs_header(response["headers"]["x-amz-meta-s3cmd-attrs"])
                response["s3cmd-attrs"] = attrs
            ConnMan.put(conn)
        except (S3SSLError, S3SSLCertificateError):
            # If certificate validation fails for an SSL connection, there
            # is no need to retry; abort immediately.
            raise
        except (IOError, Exception) as e:
            debug("Response:\n" + pprint.pformat(response))
            if ((hasattr(e, 'errno') and e.errno
                 and e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT))
                or "[Errno 104]" in str(e)
                or "[Errno 32]" in str(e)
               ) and not isinstance(e, SocketTimeoutException):
                raise
            # When the connection is broken, BadStatusLine is raised with py2
            # and RemoteDisconnected is raised by py3, with a trap:
            # RemoteDisconnected has an errno field with a None value.

            # close the connection and re-establish
            ConnMan.close(conn)
            if retries:
                warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
                warning("Waiting %d sec..." % self._fail_wait(retries))
                time.sleep(self._fail_wait(retries))
                return self.send_request(request, retries - 1)
            else:
                raise S3RequestError("Request failed for: %s" % resource['uri'])

        except:
            # Only KeyboardInterrupt and SystemExit will not be covered by Exception
            debug("Response:\n" + pprint.pformat(response))
            raise

        debug("Response:\n" + pprint.pformat(response))

        if response["status"] in [301, 307]:
            ## RedirectTemporary or RedirectPermanent
            return self._http_redirection_handler(request, response, self.send_request, request)

        if response["status"] == 400:
            handler_result = self._http_400_handler(request, response, self.send_request, request)
            if handler_result:
                return handler_result
            err = S3Error(response)
            if retries and err.code in ['BadDigest', 'OperationAborted',
                                        'TokenRefreshRequired', 'RequestTimeout']:
                warning(u"Retrying failed request: %s (%s)" % (resource['uri'], err))
                warning("Waiting %d sec..." % self._fail_wait(retries))
                time.sleep(self._fail_wait(retries))
                return self.send_request(request, retries - 1)
            raise err

        if response["status"] == 403:
            return self._http_403_handler(request, response, self.send_request, request)
        if response["status"] == 405: # Method Not Allowed. Don't retry.
            raise S3Error(response)

        if response["status"] >= 500 or response["status"] == 429:
            e = S3Error(response)

            if response["status"] == 501:
                ## NotImplemented server error - no need to retry
                retries = 0

            if retries:
                warning(u"Retrying failed request: %s (%s)" % (resource['uri'], e))
                warning("Waiting %d sec..." % self._fail_wait(retries))
                time.sleep(self._fail_wait(retries))
                return self.send_request(request, retries - 1)
            else:
                raise e

        if response["status"] < 200 or response["status"] > 299:
            raise S3Error(response)

        return response

    def send_request_with_progress(self, request, labels, operation_size=0):
        """Wrapper around send_request for slow requests.

        Allows showing progress even for small requests.
        """
        if not self.config.progress_meter:
            info("Sending slow request, please wait...")
            return self.send_request(request)

        if 'action' not in labels:
            labels[u'action'] = u'request'
        progress = self.config.progress_class(labels, operation_size)

        try:
            response = self.send_request(request)
        except Exception:
            progress.done("failed")
            raise

        progress.update(current_position=operation_size)
        progress.done("done")

        return response

    def send_file(self, request, stream, labels, buffer = '', throttle = 0,
                  retries = _max_retries, offset = 0, chunk_size = -1,
                  use_expect_continue = None):
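        """Send the request and upload its body from 'stream' (or 'buffer').

        The caller is expected to have set headers["content-length"].
        Handles rate limiting, the HTTP Expect/100-continue handshake,
        retries with progressively longer waits, and MD5 verification of
        the returned ETag.
        """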
        self.update_region_inner_request(request)

        if use_expect_continue is None:
            use_expect_continue = self.config.use_http_expect
        if self.expect_continue_not_supported and use_expect_continue:
            use_expect_continue = False

        headers = request.headers

        size_left = size_total = int(headers["content-length"])

        filename = stream.stream_name
        if self.config.progress_meter:
            labels[u'action'] = u'upload'
            progress = self.config.progress_class(labels, size_total)
        else:
            info("Sending file '%s', please wait..." % filename)
        timestamp_start = time.time()

        if buffer:
            sha256_hash = checksum_sha256_buffer(buffer, offset, size_total)
        else:
            sha256_hash = checksum_sha256_file(filename, offset, size_total)
        request.body = sha256_hash

        if use_expect_continue:
            if not size_total:
                use_expect_continue = False
            else:
                headers['expect'] = '100-continue'

        method_string, resource, headers = request.get_triplet()
        try:
            conn = ConnMan.get(self.get_hostname(resource['bucket']))
            conn.c.putrequest(method_string, self.format_uri(resource, conn.path))
            for header in headers.keys():
                conn.c.putheader(encode_to_s3(header), encode_to_s3(headers[header]))
            conn.c.endheaders()
        except ParameterError:
            raise
        except Exception as e:
            if self.config.progress_meter:
                progress.done("failed")
            if retries:
                warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
                warning("Waiting %d sec..." % self._fail_wait(retries))
                time.sleep(self._fail_wait(retries))
                # Connection error -> same throttle value
                return self.send_file(request, stream, labels, buffer, throttle, retries - 1, offset, chunk_size)
            else:
                raise S3UploadError("Upload failed for: %s" % resource['uri'])
        if buffer == '':
            stream.seek(offset)
        md5_hash = md5()

        try:
            http_response = None
            if use_expect_continue:
                # Wait for the 100-Continue response before sending the content
                readable, writable, exceptional = select.select([conn.c.sock], [], [], EXPECT_CONTINUE_TIMEOUT)
                if readable:
                    # 100-CONTINUE STATUS RECEIVED, get it before continuing.
                    http_response = conn.c.getresponse()
                elif not writable and not exceptional:
                    warning("HTTP Expect-Continue feature disabled because the server did not reply within %.2fs.", EXPECT_CONTINUE_TIMEOUT)
                    self.expect_continue_not_supported = True
                    use_expect_continue = False

            if not use_expect_continue or (http_response and http_response.status == ConnMan.CONTINUE):
                if http_response:
                    # CONTINUE case. Reset the response
                    http_response.read()
                    conn.c._HTTPConnection__state = ConnMan._CS_REQ_SENT

                while size_left > 0:
                    #debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, filename, size_left))
                    l = min(self.config.send_chunk, size_left)
                    if buffer == '':
                        data = stream.read(l)
                    else:
                        data = buffer

                    if not data:
                        raise InvalidFileError("File smaller than expected. Was the file truncated?")

                    if self.config.limitrate > 0:
                        start_time = time.time()

                    md5_hash.update(data)

                    conn.c.wrapper_send_body(data)
                    if self.config.progress_meter:
                        progress.update(delta_position = len(data))
                    size_left -= len(data)

                    # throttle
                    limitrate_throttle = throttle
                    if self.config.limitrate > 0:
                        real_duration = time.time() - start_time
                        expected_duration = float(l) / self.config.limitrate
                        limitrate_throttle = max(expected_duration - real_duration, limitrate_throttle)
                    if limitrate_throttle:
                        time.sleep(min(limitrate_throttle, self.config.throttle_max))

                md5_computed = md5_hash.hexdigest()
                http_response = conn.c.getresponse()

            response = {}
            response["status"] = http_response.status
            response["reason"] = http_response.reason
            response["headers"] = convertHeaderTupleListToDict(http_response.getheaders())
            response["data"] = http_response.read()
            response["size"] = size_total
            ConnMan.put(conn)
            debug(u"Response:\n" + pprint.pformat(response))
        except ParameterError:
            raise
        except InvalidFileError:
            if self.config.progress_meter:
                progress.done("failed")
            raise
        except Exception as e:
            if self.config.progress_meter:
                progress.done("failed")
            if retries:
                known_error = False
                if ((hasattr(e, 'errno') and e.errno
                     and e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT))
                    or "[Errno 104]" in str(e) or "[Errno 32]" in str(e)
                   ) and not isinstance(e, SocketTimeoutException):
                    # We have to detect "Connection reset by peer" and
                    # "Broken pipe" errors by looking at the error string.
                    # The server may have closed the connection early with an
                    # error response (as in an HTTP Expect-Continue exchange)
                    # even though nothing was asked of it.
                    try:
                        http_response = conn.c.getresponse()
                        response = {}
                        response["status"] = http_response.status
                        response["reason"] = http_response.reason
                        response["headers"] = convertHeaderTupleListToDict(http_response.getheaders())
                        response["data"] = http_response.read()
                        response["size"] = size_total
                        known_error = True
                    except Exception:
                        error("Cannot retrieve any response status before encountering an EPIPE or ECONNRESET exception")
                if not known_error:
                    warning("Upload failed: %s (%s)" % (resource['uri'], e))
                    warning("Waiting %d sec..." % self._fail_wait(retries))
                    time.sleep(self._fail_wait(retries))
                    # Connection error -> same throttle value
                    return self.send_file(request, stream, labels, buffer, throttle,
                                          retries - 1, offset, chunk_size, use_expect_continue)
            else:
                debug("Giving up on '%s' %s" % (filename, e))
                raise S3UploadError("Upload failed for: %s" % resource['uri'])

        timestamp_end = time.time()
        response["elapsed"] = timestamp_end - timestamp_start
        response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)

        if self.config.progress_meter:
            ## Finalising the upload takes some time -> update() the progress
            ## meter to correct the average speed. Otherwise people will
            ## complain that 'progress' and response["speed"] are
            ## inconsistent ;-)
            progress.update()
            progress.done("done")

        if response["status"] in [301, 307]:
            ## RedirectTemporary or RedirectPermanent
            return self._http_redirection_handler(
                request, response, self.send_file, request, stream, labels,
                buffer, offset=offset, chunk_size=chunk_size,
                use_expect_continue=use_expect_continue)

        if response["status"] == 400:
            handler_result = self._http_400_handler(
                request, response, self.send_file, request, stream, labels,
                buffer, offset=offset, chunk_size=chunk_size,
                use_expect_continue=use_expect_continue)
            if handler_result:
                return handler_result
            err = S3Error(response)
            if err.code not in ['BadDigest', 'OperationAborted',
                                'TokenRefreshRequired', 'RequestTimeout']:
                raise err
            # else the error will be handled later with a retry

        if response["status"] == 403:
            return self._http_403_handler(
                request, response, self.send_file, request, stream, labels,
                buffer, offset=offset, chunk_size=chunk_size,
                use_expect_continue=use_expect_continue)

        if response["status"] == 417 and retries:
            # Expect 100-continue not supported by proxy/server
            self.expect_continue_not_supported = True
            return self.send_file(request, stream, labels, buffer, throttle,
                                  retries - 1, offset, chunk_size, use_expect_continue = False)

        # S3 from time to time doesn't send the ETag back in a response :-(
        # Default it to an empty value, which forces a re-upload below.
        if 'etag' not in response['headers']:
            response['headers']['etag'] = ''

        if response["status"] < 200 or response["status"] > 299:
            try_retry = False
            if response["status"] >= 500:
                # AWS internal error - retry
                try_retry = True
                if response["status"] == 503:
                    ## SlowDown error
                    throttle = throttle and throttle * 5 or 0.01
            elif response["status"] == 429:
                # Not an AWS error, but a possible error from an
                # S3-compatible server: TooManyRequests/Busy/SlowDown
                try_retry = True
                throttle = throttle and throttle * 5 or 0.01
            elif response["status"] >= 400:
                err = S3Error(response)
                ## Retriable client error?
                if err.code in ['BadDigest', 'OperationAborted', 'TokenRefreshRequired', 'RequestTimeout']:
                    try_retry = True

            if try_retry:
                if retries:
                    warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
                    if throttle:
                        warning("Retrying at lower speed (throttle=%0.2f)" % throttle)
                    warning("Waiting %d sec..." % self._fail_wait(retries))
                    time.sleep(self._fail_wait(retries))
                    return self.send_file(request, stream, labels, buffer, throttle,
                                          retries - 1, offset, chunk_size, use_expect_continue)
                else:
                    warning("Too many failures. Giving up on '%s'" % filename)
                    raise S3UploadError("Too many failures. Giving up on '%s'"
                                        % filename)

            ## Non-recoverable error
            raise S3Error(response)

        debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"].get('etag', '').strip('"\'')))
        ## when using KMS encryption, the MD5 ETag value will not match
        md5_from_s3 = response["headers"].get("etag", "").strip('"\'')
        if ('-' not in md5_from_s3) and (md5_from_s3 != md5_hash.hexdigest()) and response["headers"].get("x-amz-server-side-encryption") != 'aws:kms':
            warning("MD5 sums don't match!")
            if retries:
                warning("Retrying upload of %s" % (filename))
                return self.send_file(request, stream, labels, buffer, throttle,
                                      retries - 1, offset, chunk_size, use_expect_continue)
            else:
                warning("Too many failures. Giving up on '%s'" % (filename))
                raise S3UploadError("Too many failures. Giving up on '%s'"
                                    % filename)

        return response

    def send_file_multipart(self, stream, headers, uri, size, extra_label=""):
        timestamp_start = time.time()
        upload = MultiPartUpload(self, stream, uri, headers, size)
        upload.upload_all_parts(extra_label)
        response = upload.complete_multipart_upload()
        timestamp_end = time.time()
        response["elapsed"] = timestamp_end - timestamp_start
        response["size"] = size
        response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
        if response["data"] and getRootTagName(response["data"]) == "Error":
            # http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html
            # A CompleteMultipartUpload request may fail with an Error body
            # even though the HTTP status is 200, so raise S3UploadError.
            raise S3UploadError(getTextFromXml(response["data"], 'Message'))
        return response

    def copy_file_multipart(self, src_uri, dst_uri, size, headers,
                            extra_label=""):
        return self.send_file_multipart(src_uri, headers, dst_uri, size,
                                        extra_label)

    def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
        self.update_region_inner_request(request)

        method_string, resource, headers = request.get_triplet()
        filename = stream.stream_name
        if self.config.progress_meter:
            labels[u'action'] = u'download'
            progress = self.config.progress_class(labels, 0)
        else:
            info("Receiving file '%s', please wait..." % filename)
        timestamp_start = time.time()

        conn = None
        try:
            conn = ConnMan.get(self.get_hostname(resource['bucket']))
            conn.c.putrequest(method_string, self.format_uri(resource, conn.path))
            for header in headers.keys():
                conn.c.putheader(encode_to_s3(header), encode_to_s3(headers[header]))
            if start_position > 0:
                debug("Requesting Range: %d .. end" % start_position)
                conn.c.putheader("Range", "bytes=%d-" % start_position)
            conn.c.endheaders()
            response = {}
            http_response = conn.c.getresponse()
            response["status"] = http_response.status
            response["reason"] = http_response.reason
            response["headers"] = convertHeaderTupleListToDict(http_response.getheaders())
            if "x-amz-meta-s3cmd-attrs" in response["headers"]:
                attrs = parse_attrs_header(response["headers"]["x-amz-meta-s3cmd-attrs"])
                response["s3cmd-attrs"] = attrs
            debug("Response:\n" + pprint.pformat(response))
        except ParameterError:
            raise
        except (IOError, Exception) as e:
            if self.config.progress_meter:
                progress.done("failed")
            if ((hasattr(e, 'errno') and e.errno and
                 e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT))
                or "[Errno 104]" in str(e) or "[Errno 32]" in str(e)
               ) and not isinstance(e, SocketTimeoutException):
                raise

            # close the connection and re-establish
            ConnMan.close(conn)

            if retries:
                warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
                warning("Waiting %d sec..." % self._fail_wait(retries))
                time.sleep(self._fail_wait(retries))
                # Connection error -> same throttle value
                return self.recv_file(request, stream, labels, start_position, retries - 1)
            else:
                raise S3DownloadError("Download failed for: %s" % resource['uri'])

        if response["status"] in [301, 307]:
            ## RedirectPermanent or RedirectTemporary
            response['data'] = http_response.read()
            return self._http_redirection_handler(request, response,
                                                  self.recv_file, request,
                                                  stream, labels, start_position)

        if response["status"] == 400:
            response['data'] = http_response.read()
            handler_result = self._http_400_handler(request, response, self.recv_file,
                                                    request, stream, labels, start_position)
            if handler_result:
                return handler_result
            raise S3Error(response)

        if response["status"] == 403:
            response['data'] = http_response.read()
            return self._http_403_handler(request, response, self.recv_file,
                                          request, stream, labels, start_position)

        if response["status"] == 405: # Method Not Allowed. Don't retry.
            response['data'] = http_response.read()
            raise S3Error(response)

        if response["status"] < 200 or response["status"] > 299:
            response['data'] = http_response.read()
            raise S3Error(response)

        if start_position == 0:
            # Only compute the MD5 on the fly if we're downloading from the
            # beginning; otherwise the result would be nonsense.
            md5_hash = md5()
        size_left = int(response["headers"]["content-length"])
        size_total = start_position + size_left
        current_position = start_position

        if self.config.progress_meter:
            progress.total_size = size_total
            progress.initial_position = current_position
            progress.current_position = current_position

        try:
            # Fix for issue #432. Even when the content size is 0, httplib
            # expects the response to be read.
            if size_left == 0:
                data = http_response.read(1)
                # No data is supposed to be returned in that case.
                assert(len(data) == 0)
            while (current_position < size_total):
                this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left

                if self.config.limitrate > 0:
                    start_time = time.time()

                data = http_response.read(this_chunk)
                if len(data) == 0:
                    raise S3ResponseError("EOF from S3!")

                # throttle
                if self.config.limitrate > 0:
                    real_duration = time.time() - start_time
                    expected_duration = float(this_chunk) / self.config.limitrate
                    if expected_duration > real_duration:
                        time.sleep(expected_duration - real_duration)

                stream.write(data)
                if start_position == 0:
                    md5_hash.update(data)
                current_position += len(data)
                ## Call the progress meter from here...
                if self.config.progress_meter:
                    progress.update(delta_position = len(data))
            ConnMan.put(conn)
        except OSError:
            raise
        except (IOError, Exception) as e:
            if self.config.progress_meter:
                progress.done("failed")
            if ((hasattr(e, 'errno') and e.errno
                 and e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT))
                or "[Errno 104]" in str(e) or "[Errno 32]" in str(e)
               ) and not isinstance(e, SocketTimeoutException):
                raise
            # close the connection and re-establish
            ConnMan.close(conn)

            if retries:
                warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
                warning("Waiting %d sec..." % self._fail_wait(retries))
                time.sleep(self._fail_wait(retries))
                # Connection error -> same throttle value
                return self.recv_file(request, stream, labels, current_position, retries - 1)
            else:
                raise S3DownloadError("Download failed for: %s" % resource['uri'])

        stream.flush()
        timestamp_end = time.time()

        if self.config.progress_meter:
            ## The above stream.flush() may take some time -> update() the
            ## progress meter to correct the average speed. Otherwise people
            ## will complain that 'progress' and response["speed"] are
            ## inconsistent ;-)
            progress.update()
            progress.done("done")

        md5_from_s3 = response["headers"].get("etag", "").strip('"\'')
        if 'x-amz-meta-s3tools-gpgenc' not in response["headers"]:
            # For gpg-encrypted objects, the stored md5 was computed before
            # encryption and so describes the original file, not the
            # uploaded content; only prefer the stored value when the
            # object is not encrypted.
            try:
                md5_from_s3 = response["s3cmd-attrs"]["md5"]
            except KeyError:
                pass
        # Multipart ETags (containing '-') are not real MD5 sums, so only
        # bother with the calculation when we have something to compare
        # against.
        if '-' not in md5_from_s3:
            if start_position == 0:
                # Only compute the MD5 on the fly if we were downloading
                # from the beginning
                response["md5"] = md5_hash.hexdigest()
            else:
                # Otherwise try to compute the MD5 of the output file
                try:
                    response["md5"] = hash_file_md5(filename)
                except IOError as e:
                    if e.errno != errno.ENOENT:
                        warning("Unable to open file: %s: %s" % (filename, e))
                    warning("Unable to verify MD5. Assuming it matches.")

        response["md5match"] = response.get("md5") == md5_from_s3
        response["elapsed"] = timestamp_end - timestamp_start
        response["size"] = current_position
        response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
        if response["size"] != start_position + int(response["headers"]["content-length"]):
            warning("Reported size (%s) does not match received size (%s)" % (
                start_position + int(response["headers"]["content-length"]), response["size"]))
        debug("ReceiveFile: Computed MD5 = %s" % response.get("md5"))
        # avoid ETags from multipart uploads that aren't the real md5
        if ('-' not in md5_from_s3 and not response["md5match"]) and (response["headers"].get("x-amz-server-side-encryption") != 'aws:kms'):
            warning("MD5 signatures do not match: computed=%s, received=%s" % (
                response.get("md5"), md5_from_s3))
        return response

__all__.append("S3")

def parse_attrs_header(attrs_header):
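    """Parse an 'x-amz-meta-s3cmd-attrs' header value into a dict.

    The value is a '/'-separated list of 'key:value' pairs, e.g. (a
    purely illustrative value):

        "uid:1000/gid:1000/mode:33188" -> {'uid': '1000', 'gid': '1000',
                                           'mode': '33188'}
    """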
    attrs = {}
    for attr in attrs_header.split("/"):
        key, val = attr.split(":")
        attrs[key] = val
    return attrs

def compute_content_md5(body):
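    """Return the base64-encoded MD5 digest of 'body', without the
    trailing newline added by encodestring()/encodebytes(), as expected
    in a Content-MD5 header.
    """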
    m = md5(encode_to_s3(body))
    base64md5 = encodestring(m.digest())
    base64md5 = decode_from_s3(base64md5)
    # base64md5 was already decoded to a native string above; just strip
    # the trailing newline added by encodestring()/encodebytes().
    if base64md5[-1] == '\n':
        base64md5 = base64md5[0:-1]
    return base64md5
# vim:et:ts=4:sts=4:ai