1# -*- coding: utf-8 -*- 2 3## Amazon S3 manager 4## Author: Michal Ludvig <michal@logix.cz> 5## http://www.logix.cz/michal 6## License: GPL Version 2 7## Copyright: TGRMN Software and contributors 8 9from __future__ import absolute_import, division 10 11import sys 12import os 13import time 14import errno 15import mimetypes 16import io 17import pprint 18from xml.sax import saxutils 19from socket import timeout as SocketTimeoutException 20from logging import debug, info, warning, error 21from stat import ST_SIZE 22try: 23 # python 3 support 24 from urlparse import urlparse 25except ImportError: 26 from urllib.parse import urlparse 27try: 28 # Python 2 support 29 from base64 import encodestring 30except ImportError: 31 # Python 3.9.0+ support 32 from base64 import encodebytes as encodestring 33 34import select 35 36try: 37 from hashlib import md5 38except ImportError: 39 from md5 import md5 40 41from .BaseUtils import (getListFromXml, getTextFromXml, getRootTagName, 42 decode_from_s3, encode_to_s3, s3_quote) 43from .Utils import (convertHeaderTupleListToDict, hash_file_md5, unicodise, 44 deunicodise, check_bucket_name, 45 check_bucket_name_dns_support, getHostnameFromBucket, 46 calculateChecksum) 47from .SortedDict import SortedDict 48from .AccessLog import AccessLog 49from .ACL import ACL, GranteeLogDelivery 50from .BidirMap import BidirMap 51from .Config import Config 52from .Exceptions import * 53from .MultiPart import MultiPartUpload 54from .S3Uri import S3Uri 55from .ConnMan import ConnMan 56from .Crypto import (sign_request_v2, sign_request_v4, checksum_sha256_file, 57 checksum_sha256_buffer, format_param_str) 58 59try: 60 from ctypes import ArgumentError 61 import magic 62 try: 63 ## https://github.com/ahupp/python-magic 64 ## Always expect unicode for python 2 65 ## (has Magic class but no "open()" function) 66 magic_ = magic.Magic(mime=True) 67 def mime_magic_file(file): 68 return magic_.from_file(file) 69 except TypeError: 70 try: 71 ## file-5.11 built-in python bindings 72 ## Sources: http://www.darwinsys.com/file/ 73 ## Expects unicode since version 5.19, encoded strings before 74 ## we can't tell if a given copy of the magic library will take a 75 ## filesystem-encoded string or a unicode value, so try first 76 ## with the unicode, then with the encoded string. 77 ## (has Magic class and "open()" function) 78 magic_ = magic.open(magic.MAGIC_MIME) 79 magic_.load() 80 def mime_magic_file(file): 81 try: 82 return magic_.file(file) 83 except (UnicodeDecodeError, UnicodeEncodeError, ArgumentError): 84 return magic_.file(deunicodise(file)) 85 except AttributeError: 86 ## http://pypi.python.org/pypi/filemagic 87 ## Accept gracefully both unicode and encoded 88 ## (has Magic class but not "mime" argument and no "open()" function ) 89 magic_ = magic.Magic(flags=magic.MAGIC_MIME) 90 def mime_magic_file(file): 91 return magic_.id_filename(file) 92 93 except AttributeError: 94 ## Older python-magic versions doesn't have a "Magic" method 95 ## Only except encoded strings 96 ## (has no Magic class but "open()" function) 97 magic_ = magic.open(magic.MAGIC_MIME) 98 magic_.load() 99 def mime_magic_file(file): 100 return magic_.file(deunicodise(file)) 101 102except (ImportError, OSError) as e: 103 error_str = str(e) 104 if 'magic' in error_str: 105 magic_message = "Module python-magic is not available." 106 else: 107 magic_message = "Module python-magic can't be used (%s)." % error_str 108 magic_message += " Guessing MIME types based on file extensions." 109 magic_warned = False 110 def mime_magic_file(file): 111 global magic_warned 112 if (not magic_warned): 113 warning(magic_message) 114 magic_warned = True 115 return mimetypes.guess_type(file)[0] 116 117def mime_magic(file): 118 ## NOTE: So far in the code, "file" var is already unicode 119 def _mime_magic(file): 120 magictype = mime_magic_file(file) 121 return magictype 122 123 result = _mime_magic(file) 124 if result is not None: 125 if isinstance(result, str): 126 if ';' in result: 127 mimetype, charset = result.split(';') 128 charset = charset[len('charset'):] 129 result = (mimetype, charset) 130 else: 131 result = (result, None) 132 if result is None: 133 result = (None, None) 134 return result 135 136 137EXPECT_CONTINUE_TIMEOUT = 2 138SIZE_1MB = 1024 * 1024 139 140__all__ = [] 141 142class S3Request(object): 143 region_map = {} 144 ## S3 sometimes sends HTTP-301, HTTP-307 response 145 redir_map = {} 146 147 def __init__(self, s3, method_string, resource, headers, body, params = None): 148 self.s3 = s3 149 self.headers = SortedDict(headers or {}, ignore_case = True) 150 if len(self.s3.config.access_token)>0: 151 self.s3.config.role_refresh() 152 self.headers['x-amz-security-token']=self.s3.config.access_token 153 self.resource = resource 154 self.method_string = method_string 155 self.params = params or {} 156 self.body = body 157 self.requester_pays() 158 159 def requester_pays(self): 160 if self.s3.config.requester_pays and self.method_string in ("GET", "POST", "PUT", "HEAD"): 161 self.headers['x-amz-request-payer'] = 'requester' 162 163 def update_timestamp(self): 164 if "date" in self.headers: 165 del(self.headers["date"]) 166 self.headers["x-amz-date"] = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()) 167 168 def use_signature_v2(self): 169 if self.s3.endpoint_requires_signature_v4: 170 return False 171 172 if self.s3.config.signature_v2 or self.s3.fallback_to_signature_v2: 173 return True 174 175 return False 176 177 def sign(self): 178 bucket_name = self.resource.get('bucket') 179 180 if self.use_signature_v2(): 181 debug("Using signature v2") 182 if bucket_name: 183 resource_uri = "/%s%s" % (bucket_name, self.resource['uri']) 184 else: 185 resource_uri = self.resource['uri'] 186 187 self.headers = sign_request_v2(self.method_string, resource_uri, self.params, self.headers) 188 else: 189 debug("Using signature v4") 190 hostname = self.s3.get_hostname(self.resource['bucket']) 191 192 ## Default to bucket part of DNS. 193 ## If bucket is not part of DNS assume path style to complete the request. 194 ## Like for format_uri, take care that redirection could be to base path 195 if bucket_name and ( 196 (bucket_name in S3Request.redir_map 197 and not S3Request.redir_map.get(bucket_name, '').startswith("%s."% bucket_name)) 198 or (bucket_name not in S3Request.redir_map 199 and not check_bucket_name_dns_support(Config().host_bucket, bucket_name)) 200 ): 201 resource_uri = "/%s%s" % (bucket_name, self.resource['uri']) 202 else: 203 resource_uri = self.resource['uri'] 204 205 bucket_region = S3Request.region_map.get(self.resource['bucket'], Config().bucket_location) 206 ## Sign the data. 207 self.headers = sign_request_v4(self.method_string, hostname, resource_uri, self.params, 208 bucket_region, self.headers, self.body) 209 210 def get_triplet(self): 211 self.update_timestamp() 212 self.sign() 213 214 resource = dict(self.resource) ## take a copy 215 216 # URL Encode the uri for the http request 217 resource['uri'] = s3_quote(resource['uri'], quote_backslashes=False, unicode_output=True) 218 # Get the final uri by adding the uri parameters 219 resource['uri'] += format_param_str(self.params) 220 return (self.method_string, resource, self.headers) 221 222class S3(object): 223 http_methods = BidirMap( 224 GET = 0x01, 225 PUT = 0x02, 226 HEAD = 0x04, 227 DELETE = 0x08, 228 POST = 0x10, 229 MASK = 0x1F, 230 ) 231 232 targets = BidirMap( 233 SERVICE = 0x0100, 234 BUCKET = 0x0200, 235 OBJECT = 0x0400, 236 BATCH = 0x0800, 237 MASK = 0x0700, 238 ) 239 240 operations = BidirMap( 241 UNDFINED = 0x0000, 242 LIST_ALL_BUCKETS = targets["SERVICE"] | http_methods["GET"], 243 BUCKET_CREATE = targets["BUCKET"] | http_methods["PUT"], 244 BUCKET_LIST = targets["BUCKET"] | http_methods["GET"], 245 BUCKET_DELETE = targets["BUCKET"] | http_methods["DELETE"], 246 OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"], 247 OBJECT_GET = targets["OBJECT"] | http_methods["GET"], 248 OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"], 249 OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"], 250 OBJECT_POST = targets["OBJECT"] | http_methods["POST"], 251 BATCH_DELETE = targets["BATCH"] | http_methods["POST"], 252 ) 253 254 codes = { 255 "NoSuchBucket" : "Bucket '%s' does not exist", 256 "AccessDenied" : "Access to bucket '%s' was denied", 257 "BucketAlreadyExists" : "Bucket '%s' already exists", 258 } 259 260 ## Maximum attempts of re-issuing failed requests 261 _max_retries = 5 262 263 def __init__(self, config): 264 self.config = config 265 self.fallback_to_signature_v2 = False 266 self.endpoint_requires_signature_v4 = False 267 self.expect_continue_not_supported = False 268 269 def storage_class(self): 270 # Note - you cannot specify GLACIER here 271 # https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html 272 cls = 'STANDARD' 273 if self.config.storage_class != "": 274 return self.config.storage_class 275 if self.config.reduced_redundancy: 276 cls = 'REDUCED_REDUNDANCY' 277 return cls 278 279 def get_hostname(self, bucket): 280 if bucket and bucket in S3Request.redir_map: 281 host = S3Request.redir_map[bucket] 282 elif bucket and check_bucket_name_dns_support(self.config.host_bucket, bucket): 283 host = getHostnameFromBucket(bucket) 284 else: 285 host = self.config.host_base.lower() 286 # The following hack is needed because it looks like that some servers 287 # are not respecting the HTTP spec and so will fail the signature check 288 # if the port is specified in the "Host" header for default ports. 289 # STUPIDIEST THING EVER FOR A SERVER... 290 # See: https://github.com/minio/minio/issues/9169 291 if self.config.use_https: 292 if host.endswith(':443'): 293 host = host[:-4] 294 elif host.endswith(':80'): 295 host = host[:-3] 296 297 debug('get_hostname(%s): %s' % (bucket, host)) 298 return host 299 300 def set_hostname(self, bucket, redir_hostname): 301 S3Request.redir_map[bucket] = redir_hostname.lower() 302 303 def format_uri(self, resource, base_path=None): 304 bucket_name = resource.get('bucket') 305 if bucket_name and ( 306 (bucket_name in S3Request.redir_map 307 and not S3Request.redir_map.get(bucket_name, '').startswith("%s."% bucket_name)) 308 or (bucket_name not in S3Request.redir_map 309 and not check_bucket_name_dns_support(self.config.host_bucket, bucket_name)) 310 ): 311 uri = "/%s%s" % (s3_quote(bucket_name, quote_backslashes=False, 312 unicode_output=True), 313 resource['uri']) 314 else: 315 uri = resource['uri'] 316 if base_path: 317 uri = "%s%s" % (base_path, uri) 318 if self.config.proxy_host != "" and not self.config.use_https: 319 uri = "http://%s%s" % (self.get_hostname(bucket_name), uri) 320 debug('format_uri(): ' + uri) 321 return uri 322 323 ## Commands / Actions 324 def list_all_buckets(self): 325 request = self.create_request("LIST_ALL_BUCKETS") 326 response = self.send_request(request) 327 response["list"] = getListFromXml(response["data"], "Bucket") 328 return response 329 330 def bucket_list(self, bucket, prefix = None, recursive = None, uri_params = None, limit = -1): 331 item_list = [] 332 prefixes = [] 333 for truncated, dirs, objects in self.bucket_list_streaming(bucket, prefix, recursive, uri_params, limit): 334 item_list.extend(objects) 335 prefixes.extend(dirs) 336 337 response = {} 338 response['list'] = item_list 339 response['common_prefixes'] = prefixes 340 response['truncated'] = truncated 341 return response 342 343 def bucket_list_streaming(self, bucket, prefix = None, recursive = None, uri_params = None, limit = -1): 344 """ Generator that produces <dir_list>, <object_list> pairs of groups of content of a specified bucket. """ 345 def _list_truncated(data): 346 ## <IsTruncated> can either be "true" or "false" or be missing completely 347 is_truncated = getTextFromXml(data, ".//IsTruncated") or "false" 348 return is_truncated.lower() != "false" 349 350 def _get_contents(data): 351 return getListFromXml(data, "Contents") 352 353 def _get_common_prefixes(data): 354 return getListFromXml(data, "CommonPrefixes") 355 356 def _get_next_marker(data, current_list): 357 return getTextFromXml(response["data"], "NextMarker") or current_list[-1]["Key"] 358 359 uri_params = uri_params and uri_params.copy() or {} 360 truncated = True 361 prefixes = [] 362 363 num_objects = 0 364 num_prefixes = 0 365 max_keys = limit 366 while truncated: 367 response = self.bucket_list_noparse(bucket, prefix, recursive, 368 uri_params, max_keys) 369 current_list = _get_contents(response["data"]) 370 current_prefixes = _get_common_prefixes(response["data"]) 371 num_objects += len(current_list) 372 num_prefixes += len(current_prefixes) 373 if limit > num_objects + num_prefixes: 374 max_keys = limit - (num_objects + num_prefixes) 375 truncated = _list_truncated(response["data"]) 376 if truncated: 377 if limit == -1 or num_objects + num_prefixes < limit: 378 if current_list: 379 uri_params['marker'] = \ 380 _get_next_marker(response["data"], current_list) 381 elif current_prefixes: 382 uri_params['marker'] = current_prefixes[-1]["Prefix"] 383 else: 384 # Unexpectedly, the server lied, and so the previous 385 # response was not truncated. So, no new key to get. 386 yield False, current_prefixes, current_list 387 break 388 debug("Listing continues after '%s'" % uri_params['marker']) 389 else: 390 yield truncated, current_prefixes, current_list 391 break 392 393 yield truncated, current_prefixes, current_list 394 395 def bucket_list_noparse(self, bucket, prefix = None, recursive = None, uri_params = None, max_keys = -1): 396 if uri_params is None: 397 uri_params = {} 398 if prefix: 399 uri_params['prefix'] = prefix 400 if not self.config.recursive and not recursive: 401 uri_params['delimiter'] = "/" 402 if max_keys != -1: 403 uri_params['max-keys'] = str(max_keys) 404 request = self.create_request("BUCKET_LIST", bucket = bucket, uri_params = uri_params) 405 response = self.send_request(request) 406 #debug(response) 407 return response 408 409 def bucket_create(self, bucket, bucket_location = None, extra_headers = None): 410 headers = SortedDict(ignore_case = True) 411 if extra_headers: 412 headers.update(extra_headers) 413 414 body = "" 415 if bucket_location and bucket_location.strip().upper() != "US" and bucket_location.strip().lower() != "us-east-1": 416 bucket_location = bucket_location.strip() 417 if bucket_location.upper() == "EU": 418 bucket_location = bucket_location.upper() 419 body = "<CreateBucketConfiguration><LocationConstraint>" 420 body += bucket_location 421 body += "</LocationConstraint></CreateBucketConfiguration>" 422 debug("bucket_location: " + body) 423 check_bucket_name(bucket, dns_strict = True) 424 else: 425 check_bucket_name(bucket, dns_strict = False) 426 if self.config.acl_public: 427 headers["x-amz-acl"] = "public-read" 428 429 request = self.create_request("BUCKET_CREATE", bucket = bucket, headers = headers, body = body) 430 response = self.send_request(request) 431 return response 432 433 def bucket_delete(self, bucket): 434 request = self.create_request("BUCKET_DELETE", bucket = bucket) 435 response = self.send_request(request) 436 return response 437 438 def get_bucket_location(self, uri, force_us_default=False): 439 bucket = uri.bucket() 440 request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), 441 uri_params = {'location': None}) 442 443 saved_redir_map = S3Request.redir_map.get(bucket, '') 444 saved_region_map = S3Request.region_map.get(bucket, '') 445 446 try: 447 if force_us_default and not (saved_redir_map and saved_region_map): 448 S3Request.redir_map[bucket] = self.config.host_base 449 S3Request.region_map[bucket] = 'us-east-1' 450 451 response = self.send_request(request) 452 finally: 453 if bucket in saved_redir_map: 454 S3Request.redir_map[bucket] = saved_redir_map 455 elif bucket in S3Request.redir_map: 456 del S3Request.redir_map[bucket] 457 458 if bucket in saved_region_map: 459 S3Request.region_map[bucket] = saved_region_map 460 elif bucket in S3Request.region_map: 461 del S3Request.region_map[bucket] 462 463 464 location = getTextFromXml(response['data'], "LocationConstraint") 465 if not location or location in [ "", "US" ]: 466 location = "us-east-1" 467 elif location == "EU": 468 location = "eu-west-1" 469 return location 470 471 def get_bucket_requester_pays(self, uri): 472 request = self.create_request("BUCKET_LIST", bucket=uri.bucket(), 473 uri_params={'requestPayment': None}) 474 response = self.send_request(request) 475 resp_data = response.get('data', '') 476 if resp_data: 477 payer = getTextFromXml(response['data'], "Payer") 478 else: 479 payer = None 480 return payer 481 482 def bucket_info(self, uri): 483 response = {} 484 response['bucket-location'] = self.get_bucket_location(uri) 485 try: 486 response['requester-pays'] = self.get_bucket_requester_pays(uri) 487 except S3Error as e: 488 response['requester-pays'] = None 489 return response 490 491 def website_info(self, uri, bucket_location = None): 492 bucket = uri.bucket() 493 494 request = self.create_request("BUCKET_LIST", bucket = bucket, 495 uri_params = {'website': None}) 496 try: 497 response = self.send_request(request) 498 response['index_document'] = getTextFromXml(response['data'], ".//IndexDocument//Suffix") 499 response['error_document'] = getTextFromXml(response['data'], ".//ErrorDocument//Key") 500 response['website_endpoint'] = self.config.website_endpoint % { 501 "bucket" : uri.bucket(), 502 "location" : self.get_bucket_location(uri)} 503 return response 504 except S3Error as e: 505 if e.status == 404: 506 debug("Could not get /?website - website probably not configured for this bucket") 507 return None 508 raise 509 510 def website_create(self, uri, bucket_location = None): 511 bucket = uri.bucket() 512 body = '<WebsiteConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">' 513 body += ' <IndexDocument>' 514 body += (' <Suffix>%s</Suffix>' % self.config.website_index) 515 body += ' </IndexDocument>' 516 if self.config.website_error: 517 body += ' <ErrorDocument>' 518 body += (' <Key>%s</Key>' % self.config.website_error) 519 body += ' </ErrorDocument>' 520 body += '</WebsiteConfiguration>' 521 522 request = self.create_request("BUCKET_CREATE", bucket = bucket, body = body, 523 uri_params = {'website': None}) 524 response = self.send_request(request) 525 debug("Received response '%s'" % (response)) 526 527 return response 528 529 def website_delete(self, uri, bucket_location = None): 530 bucket = uri.bucket() 531 532 request = self.create_request("BUCKET_DELETE", bucket = bucket, 533 uri_params = {'website': None}) 534 response = self.send_request(request) 535 debug("Received response '%s'" % (response)) 536 537 if response['status'] != 204: 538 raise S3ResponseError("Expected status 204: %s" % response) 539 540 return response 541 542 def expiration_info(self, uri, bucket_location = None): 543 bucket = uri.bucket() 544 545 request = self.create_request("BUCKET_LIST", bucket=bucket, 546 uri_params={'lifecycle': None}) 547 try: 548 response = self.send_request(request) 549 except S3Error as e: 550 if e.status == 404: 551 debug("Could not get /?lifecycle - lifecycle probably not " 552 "configured for this bucket") 553 return None 554 elif e.status == 501: 555 debug("Could not get /?lifecycle - lifecycle support not " 556 "implemented by the server") 557 return None 558 raise 559 560 root_tag_name = getRootTagName(response['data']) 561 if root_tag_name != "LifecycleConfiguration": 562 debug("Could not get /?lifecycle - unexpected xml response: " 563 "%s", root_tag_name) 564 return None 565 response['prefix'] = getTextFromXml(response['data'], 566 ".//Rule//Prefix") 567 response['date'] = getTextFromXml(response['data'], 568 ".//Rule//Expiration//Date") 569 response['days'] = getTextFromXml(response['data'], 570 ".//Rule//Expiration//Days") 571 return response 572 573 def expiration_set(self, uri, bucket_location = None): 574 if self.config.expiry_date and self.config.expiry_days: 575 raise ParameterError("Expect either --expiry-day or --expiry-date") 576 if not (self.config.expiry_date or self.config.expiry_days): 577 if self.config.expiry_prefix: 578 raise ParameterError("Expect either --expiry-day or --expiry-date") 579 debug("del bucket lifecycle") 580 bucket = uri.bucket() 581 request = self.create_request("BUCKET_DELETE", bucket = bucket, 582 uri_params = {'lifecycle': None}) 583 else: 584 request = self._expiration_set(uri) 585 response = self.send_request(request) 586 debug("Received response '%s'" % (response)) 587 return response 588 589 def _expiration_set(self, uri): 590 debug("put bucket lifecycle") 591 body = '<LifecycleConfiguration>' 592 body += ' <Rule>' 593 body += (' <Prefix>%s</Prefix>' % self.config.expiry_prefix) 594 body += (' <Status>Enabled</Status>') 595 body += (' <Expiration>') 596 if self.config.expiry_date: 597 body += (' <Date>%s</Date>' % self.config.expiry_date) 598 elif self.config.expiry_days: 599 body += (' <Days>%s</Days>' % self.config.expiry_days) 600 body += (' </Expiration>') 601 body += ' </Rule>' 602 body += '</LifecycleConfiguration>' 603 604 headers = SortedDict(ignore_case = True) 605 headers['content-md5'] = compute_content_md5(body) 606 bucket = uri.bucket() 607 request = self.create_request("BUCKET_CREATE", bucket = bucket, 608 headers = headers, body = body, 609 uri_params = {'lifecycle': None}) 610 return (request) 611 612 def _guess_content_type(self, filename): 613 content_type = self.config.default_mime_type 614 content_charset = None 615 616 if filename == "-" and not self.config.default_mime_type: 617 raise ParameterError("You must specify --mime-type or --default-mime-type for files uploaded from stdin.") 618 619 if self.config.guess_mime_type: 620 if self.config.follow_symlinks: 621 filename = unicodise(os.path.realpath(deunicodise(filename))) 622 if self.config.use_mime_magic: 623 (content_type, content_charset) = mime_magic(filename) 624 else: 625 (content_type, content_charset) = mimetypes.guess_type(filename) 626 if not content_type: 627 content_type = self.config.default_mime_type 628 return (content_type, content_charset) 629 630 def stdin_content_type(self): 631 content_type = self.config.mime_type 632 if not content_type: 633 content_type = self.config.default_mime_type 634 635 content_type += "; charset=" + self.config.encoding.upper() 636 return content_type 637 638 def content_type(self, filename=None): 639 # explicit command line argument always wins 640 content_type = self.config.mime_type 641 content_charset = None 642 643 if filename == u'-': 644 return self.stdin_content_type() 645 if not content_type: 646 (content_type, content_charset) = self._guess_content_type(filename) 647 648 ## add charset to content type 649 if not content_charset: 650 content_charset = self.config.encoding.upper() 651 if self.add_encoding(filename, content_type) and content_charset is not None: 652 content_type = content_type + "; charset=" + content_charset 653 654 return content_type 655 656 def add_encoding(self, filename, content_type): 657 if 'charset=' in content_type: 658 return False 659 exts = self.config.add_encoding_exts.split(',') 660 if exts[0]=='': 661 return False 662 parts = filename.rsplit('.',2) 663 if len(parts) < 2: 664 return False 665 ext = parts[1] 666 if ext in exts: 667 return True 668 else: 669 return False 670 671 def object_put(self, filename, uri, extra_headers = None, extra_label = ""): 672 # TODO TODO 673 # Make it consistent with stream-oriented object_get() 674 if uri.type != "s3": 675 raise ValueError("Expected URI type 's3', got '%s'" % uri.type) 676 677 if filename != "-" and not os.path.isfile(deunicodise(filename)): 678 raise InvalidFileError(u"Not a regular file") 679 try: 680 if filename == "-": 681 src_stream = io.open(sys.stdin.fileno(), mode='rb', closefd=False) 682 src_stream.stream_name = u'<stdin>' 683 size = 0 684 else: 685 src_stream = io.open(deunicodise(filename), mode='rb') 686 src_stream.stream_name = filename 687 size = os.stat(deunicodise(filename))[ST_SIZE] 688 except (IOError, OSError) as e: 689 raise InvalidFileError(u"%s" % e.strerror) 690 691 headers = SortedDict(ignore_case = True) 692 if extra_headers: 693 headers.update(extra_headers) 694 695 ## Set server side encryption 696 if self.config.server_side_encryption: 697 headers["x-amz-server-side-encryption"] = "AES256" 698 699 ## Set kms headers 700 if self.config.kms_key: 701 headers['x-amz-server-side-encryption'] = 'aws:kms' 702 headers['x-amz-server-side-encryption-aws-kms-key-id'] = self.config.kms_key 703 704 ## MIME-type handling 705 headers["content-type"] = self.content_type(filename=filename) 706 707 ## Other Amazon S3 attributes 708 if self.config.acl_public: 709 headers["x-amz-acl"] = "public-read" 710 headers["x-amz-storage-class"] = self.storage_class() 711 712 ## Multipart decision 713 multipart = False 714 if not self.config.enable_multipart and filename == "-": 715 raise ParameterError("Multi-part upload is required to upload from stdin") 716 if self.config.enable_multipart: 717 if size > self.config.multipart_chunk_size_mb * SIZE_1MB or filename == "-": 718 multipart = True 719 if size > self.config.multipart_max_chunks * self.config.multipart_chunk_size_mb * SIZE_1MB: 720 raise ParameterError("Chunk size %d MB results in more than %d chunks. Please increase --multipart-chunk-size-mb" % \ 721 (self.config.multipart_chunk_size_mb, self.config.multipart_max_chunks)) 722 if multipart: 723 # Multipart requests are quite different... drop here 724 return self.send_file_multipart(src_stream, headers, uri, size, extra_label) 725 726 ## Not multipart... 727 if self.config.put_continue: 728 # Note, if input was stdin, we would be performing multipart upload. 729 # So this will always work as long as the file already uploaded was 730 # not uploaded via MultiUpload, in which case its ETag will not be 731 # an md5. 732 try: 733 info = self.object_info(uri) 734 except Exception: 735 info = None 736 737 if info is not None: 738 remote_size = int(info['headers']['content-length']) 739 remote_checksum = info['headers']['etag'].strip('"\'') 740 if size == remote_size: 741 checksum = calculateChecksum('', src_stream, 0, size, self.config.send_chunk) 742 if remote_checksum == checksum: 743 warning("Put: size and md5sum match for %s, skipping." % uri) 744 return 745 else: 746 warning("MultiPart: checksum (%s vs %s) does not match for %s, reuploading." 747 % (remote_checksum, checksum, uri)) 748 else: 749 warning("MultiPart: size (%d vs %d) does not match for %s, reuploading." 750 % (remote_size, size, uri)) 751 752 headers["content-length"] = str(size) 753 request = self.create_request("OBJECT_PUT", uri = uri, headers = headers) 754 labels = { 'source' : filename, 'destination' : uri.uri(), 'extra' : extra_label } 755 response = self.send_file(request, src_stream, labels) 756 return response 757 758 def object_get(self, uri, stream, dest_name, start_position = 0, extra_label = ""): 759 if uri.type != "s3": 760 raise ValueError("Expected URI type 's3', got '%s'" % uri.type) 761 request = self.create_request("OBJECT_GET", uri = uri) 762 labels = { 'source' : uri.uri(), 'destination' : dest_name, 'extra' : extra_label } 763 response = self.recv_file(request, stream, labels, start_position) 764 return response 765 766 def object_batch_delete(self, remote_list): 767 """ Batch delete given a remote_list """ 768 uris = [remote_list[item]['object_uri_str'] for item in remote_list] 769 self.object_batch_delete_uri_strs(uris) 770 771 def object_batch_delete_uri_strs(self, uris): 772 """ Batch delete given a list of object uris """ 773 def compose_batch_del_xml(bucket, key_list): 774 body = u"<?xml version=\"1.0\" encoding=\"UTF-8\"?><Delete>" 775 for key in key_list: 776 uri = S3Uri(key) 777 if uri.type != "s3": 778 raise ValueError("Expected URI type 's3', got '%s'" % uri.type) 779 if not uri.has_object(): 780 raise ValueError("URI '%s' has no object" % key) 781 if uri.bucket() != bucket: 782 raise ValueError("The batch should contain keys from the same bucket") 783 object = saxutils.escape(uri.object()) 784 body += u"<Object><Key>%s</Key></Object>" % object 785 body += u"</Delete>" 786 body = encode_to_s3(body) 787 return body 788 789 batch = uris 790 if len(batch) == 0: 791 raise ValueError("Key list is empty") 792 bucket = S3Uri(batch[0]).bucket() 793 request_body = compose_batch_del_xml(bucket, batch) 794 headers = SortedDict({'content-md5': compute_content_md5(request_body), 795 'content-type': 'application/xml'}, ignore_case=True) 796 request = self.create_request("BATCH_DELETE", bucket = bucket, 797 headers = headers, body = request_body, 798 uri_params = {'delete': None}) 799 response = self.send_request(request) 800 return response 801 802 def object_delete(self, uri): 803 if uri.type != "s3": 804 raise ValueError("Expected URI type 's3', got '%s'" % uri.type) 805 request = self.create_request("OBJECT_DELETE", uri = uri) 806 response = self.send_request(request) 807 return response 808 809 def object_restore(self, uri): 810 if uri.type != "s3": 811 raise ValueError("Expected URI type 's3', got '%s'" % uri.type) 812 if self.config.restore_days < 1: 813 raise ParameterError("You must restore a file for 1 or more days") 814 if self.config.restore_priority not in ['Standard', 'Expedited', 'Bulk']: 815 raise ParameterError("Valid restoration priorities: bulk, standard, expedited") 816 body = '<RestoreRequest xmlns="http://s3.amazonaws.com/doc/2006-03-01/">' 817 body += (' <Days>%s</Days>' % self.config.restore_days) 818 body += ' <GlacierJobParameters>' 819 body += (' <Tier>%s</Tier>' % self.config.restore_priority) 820 body += ' </GlacierJobParameters>' 821 body += '</RestoreRequest>' 822 request = self.create_request("OBJECT_POST", uri = uri, body = body, 823 uri_params = {'restore': None}) 824 response = self.send_request(request) 825 debug("Received response '%s'" % (response)) 826 return response 827 828 def _sanitize_headers(self, headers): 829 to_remove = [ 830 # from http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html 831 'date', 832 'content-length', 833 'last-modified', 834 'content-md5', 835 'x-amz-version-id', 836 'x-amz-delete-marker', 837 # other headers returned from object_info() we don't want to send 838 'accept-ranges', 839 'connection', 840 'etag', 841 'server', 842 'x-amz-id-2', 843 'x-amz-request-id', 844 # Other headers that are not copying by a direct copy 845 'x-amz-storage-class', 846 ## We should probably also add server-side encryption headers 847 ] 848 849 for h in to_remove + self.config.remove_headers: 850 if h.lower() in headers: 851 del headers[h.lower()] 852 return headers 853 854 def object_copy(self, src_uri, dst_uri, extra_headers=None, 855 src_size=None, extra_label="", replace_meta=False): 856 """Remote copy an object and eventually set metadata 857 858 Note: A little memo description of the nightmare for performance here: 859 ** FOR AWS, 2 cases: 860 - COPY will copy the metadata of the source to dest, but you can't 861 modify them. Any additional header will be ignored anyway. 862 - REPLACE will set the additional metadata headers that are provided 863 but will not copy any of the source headers. 864 So, to add to existing meta during copy, you have to do an object_info 865 to get original source headers, then modify, then use REPLACE for the 866 copy operation. 867 868 ** For Minio and maybe other implementations: 869 - if additional headers are sent, they will be set to the destination 870 on top of source original meta in all cases COPY and REPLACE. 871 It is a nice behavior except that it is different of the aws one. 872 873 As it was still too easy, there is another catch: 874 In all cases, for multipart copies, metadata data are never copied 875 from the source. 876 """ 877 if src_uri.type != "s3": 878 raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type) 879 if dst_uri.type != "s3": 880 raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type) 881 if self.config.acl_public is None: 882 try: 883 acl = self.get_acl(src_uri) 884 except S3Error as exc: 885 # Ignore the exception and don't fail the copy 886 # if the server doesn't support setting ACLs 887 if exc.status != 501: 888 raise exc 889 acl = None 890 891 multipart = False 892 headers = None 893 894 if extra_headers or self.config.mime_type: 895 # Force replace, that will force getting meta with object_info() 896 replace_meta = True 897 898 if replace_meta: 899 src_info = self.object_info(src_uri) 900 headers = src_info['headers'] 901 src_size = int(headers["content-length"]) 902 903 if self.config.enable_multipart: 904 # Get size of remote source only if multipart is enabled and that no 905 # size info was provided 906 src_headers = headers 907 if src_size is None: 908 src_info = self.object_info(src_uri) 909 src_headers = src_info['headers'] 910 src_size = int(src_headers["content-length"]) 911 912 # If we are over the grand maximum size for a normal copy/modify 913 # (> 5GB) go nuclear and use multipart copy as the only option to 914 # modify an object. 915 # Reason is an aws s3 design bug. See: 916 # https://github.com/aws/aws-sdk-java/issues/367 917 if src_uri is dst_uri: 918 # optimisation in the case of modify 919 threshold = MultiPartUpload.MAX_CHUNK_SIZE_MB * SIZE_1MB 920 else: 921 threshold = self.config.multipart_copy_chunk_size_mb * SIZE_1MB 922 923 if src_size > threshold: 924 # Sadly, s3 has a bad logic as metadata will not be copied for 925 # multipart copy unlike what is done for direct copies. 926 # TODO: Optimize by re-using the object_info request done 927 # earlier earlier at fetch remote stage, and preserve headers. 928 if src_headers is None: 929 src_info = self.object_info(src_uri) 930 src_headers = src_info['headers'] 931 src_size = int(src_headers["content-length"]) 932 headers = src_headers 933 multipart = True 934 935 if headers: 936 self._sanitize_headers(headers) 937 headers = SortedDict(headers, ignore_case=True) 938 else: 939 headers = SortedDict(ignore_case=True) 940 941 # Following meta data are updated even in COPY by aws 942 if self.config.acl_public: 943 headers["x-amz-acl"] = "public-read" 944 945 headers["x-amz-storage-class"] = self.storage_class() 946 947 ## Set server side encryption 948 if self.config.server_side_encryption: 949 headers["x-amz-server-side-encryption"] = "AES256" 950 951 ## Set kms headers 952 if self.config.kms_key: 953 headers['x-amz-server-side-encryption'] = 'aws:kms' 954 headers['x-amz-server-side-encryption-aws-kms-key-id'] = \ 955 self.config.kms_key 956 957 # Following meta data are not updated in simple COPY by aws. 958 if extra_headers: 959 headers.update(extra_headers) 960 961 if self.config.mime_type: 962 headers["content-type"] = self.config.mime_type 963 964 # "COPY" or "REPLACE" 965 if not replace_meta: 966 headers['x-amz-metadata-directive'] = "COPY" 967 else: 968 headers['x-amz-metadata-directive'] = "REPLACE" 969 970 if multipart: 971 # Multipart decision. Only do multipart copy for remote s3 files 972 # bigger than the multipart copy threshold. 973 974 # Multipart requests are quite different... delegate 975 response = self.copy_file_multipart(src_uri, dst_uri, src_size, 976 headers, extra_label) 977 else: 978 # Not multipart... direct request 979 headers['x-amz-copy-source'] = s3_quote( 980 "/%s/%s" % (src_uri.bucket(), src_uri.object()), 981 quote_backslashes=False, unicode_output=True) 982 983 request = self.create_request("OBJECT_PUT", uri=dst_uri, 984 headers=headers) 985 response = self.send_request(request) 986 987 if response["data"] and getRootTagName(response["data"]) == "Error": 988 # http://doc.s3.amazonaws.com/proposals/copy.html 989 # Error during copy, status will be 200, so force error code 500 990 response["status"] = 500 991 error("Server error during the COPY operation. Overwrite response " 992 "status to 500") 993 raise S3Error(response) 994 995 if self.config.acl_public is None and acl: 996 try: 997 self.set_acl(dst_uri, acl) 998 except S3Error as exc: 999 # Ignore the exception and don't fail the copy 1000 # if the server doesn't support setting ACLs 1001 if exc.status != 501: 1002 raise exc 1003 return response 1004 1005 def object_modify(self, src_uri, dst_uri, extra_headers=None, 1006 src_size=None, extra_label=""): 1007 # dst_uri = src_uri Will optimize by using multipart just in worst case 1008 return self.object_copy(src_uri, src_uri, extra_headers, src_size, 1009 extra_label, replace_meta=True) 1010 1011 def object_move(self, src_uri, dst_uri, extra_headers=None, 1012 src_size=None, extra_label=""): 1013 response_copy = self.object_copy(src_uri, dst_uri, extra_headers, 1014 src_size, extra_label) 1015 debug("Object %s copied to %s" % (src_uri, dst_uri)) 1016 if not response_copy["data"] \ 1017 or getRootTagName(response_copy["data"]) \ 1018 in ["CopyObjectResult", "CompleteMultipartUploadResult"]: 1019 self.object_delete(src_uri) 1020 debug("Object '%s' deleted", src_uri) 1021 else: 1022 warning("Object '%s' NOT deleted because of an unexpected " 1023 "response data content.", src_uri) 1024 return response_copy 1025 1026 def object_info(self, uri): 1027 request = self.create_request("OBJECT_HEAD", uri=uri) 1028 response = self.send_request(request) 1029 return response 1030 1031 def get_acl(self, uri): 1032 if uri.has_object(): 1033 request = self.create_request("OBJECT_GET", uri=uri, 1034 uri_params={'acl': None}) 1035 else: 1036 request = self.create_request("BUCKET_LIST", bucket=uri.bucket(), 1037 uri_params={'acl': None}) 1038 1039 response = self.send_request(request) 1040 acl = ACL(response['data']) 1041 return acl 1042 1043 def set_acl(self, uri, acl): 1044 body = u"%s"% acl 1045 debug(u"set_acl(%s): acl-xml: %s" % (uri, body)) 1046 1047 headers = SortedDict({'content-type': 'application/xml'}, ignore_case = True) 1048 if uri.has_object(): 1049 request = self.create_request("OBJECT_PUT", uri = uri, 1050 headers = headers, body = body, 1051 uri_params = {'acl': None}) 1052 else: 1053 request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(), 1054 headers = headers, body = body, 1055 uri_params = {'acl': None}) 1056 1057 response = self.send_request(request) 1058 return response 1059 1060 def get_policy(self, uri): 1061 request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), 1062 uri_params = {'policy': None}) 1063 response = self.send_request(request) 1064 return decode_from_s3(response['data']) 1065 1066 def set_policy(self, uri, policy): 1067 headers = SortedDict(ignore_case = True) 1068 # TODO check policy is proper json string 1069 headers['content-type'] = 'application/json' 1070 request = self.create_request("BUCKET_CREATE", uri = uri, 1071 headers=headers, body = policy, 1072 uri_params = {'policy': None}) 1073 response = self.send_request(request) 1074 return response 1075 1076 def delete_policy(self, uri): 1077 request = self.create_request("BUCKET_DELETE", uri = uri, 1078 uri_params = {'policy': None}) 1079 debug(u"delete_policy(%s)" % uri) 1080 response = self.send_request(request) 1081 return response 1082 1083 def get_cors(self, uri): 1084 request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), 1085 uri_params = {'cors': None}) 1086 response = self.send_request(request) 1087 return decode_from_s3(response['data']) 1088 1089 def set_cors(self, uri, cors): 1090 headers = SortedDict(ignore_case = True) 1091 # TODO check cors is proper json string 1092 headers['content-type'] = 'application/xml' 1093 headers['content-md5'] = compute_content_md5(cors) 1094 request = self.create_request("BUCKET_CREATE", uri = uri, 1095 headers=headers, body = cors, 1096 uri_params = {'cors': None}) 1097 response = self.send_request(request) 1098 return response 1099 1100 def delete_cors(self, uri): 1101 request = self.create_request("BUCKET_DELETE", uri = uri, 1102 uri_params = {'cors': None}) 1103 debug(u"delete_cors(%s)" % uri) 1104 response = self.send_request(request) 1105 return response 1106 1107 def set_lifecycle_policy(self, uri, policy): 1108 headers = SortedDict(ignore_case = True) 1109 headers['content-md5'] = compute_content_md5(policy) 1110 request = self.create_request("BUCKET_CREATE", uri = uri, 1111 headers=headers, body = policy, 1112 uri_params = {'lifecycle': None}) 1113 debug(u"set_lifecycle_policy(%s): policy-xml: %s" % (uri, policy)) 1114 response = self.send_request(request) 1115 return response 1116 1117 def set_payer(self, uri): 1118 headers = SortedDict(ignore_case = True) 1119 headers['content-type'] = 'application/xml' 1120 body = '<RequestPaymentConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">\n' 1121 if self.config.requester_pays: 1122 body += '<Payer>Requester</Payer>\n' 1123 else: 1124 body += '<Payer>BucketOwner</Payer>\n' 1125 body += '</RequestPaymentConfiguration>\n' 1126 request = self.create_request("BUCKET_CREATE", uri = uri, body = body, 1127 uri_params = {'requestPayment': None}) 1128 response = self.send_request(request) 1129 return response 1130 1131 def get_lifecycle_policy(self, uri): 1132 request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), 1133 uri_params = {'lifecycle': None}) 1134 debug(u"get_lifecycle_policy(%s)" % uri) 1135 response = self.send_request(request) 1136 1137 debug(u"%s: Got Lifecycle Policy" % response['status']) 1138 return response 1139 1140 def delete_lifecycle_policy(self, uri): 1141 request = self.create_request("BUCKET_DELETE", uri = uri, 1142 uri_params = {'lifecycle': None}) 1143 debug(u"delete_lifecycle_policy(%s)" % uri) 1144 response = self.send_request(request) 1145 return response 1146 1147 def get_multipart(self, uri, uri_params=None, limit=-1): 1148 upload_list = [] 1149 for truncated, uploads in self.get_multipart_streaming(uri, 1150 uri_params, 1151 limit): 1152 upload_list.extend(uploads) 1153 1154 return upload_list 1155 1156 def get_multipart_streaming(self, uri, uri_params=None, limit=-1): 1157 uri_params = uri_params and uri_params.copy() or {} 1158 bucket = uri.bucket() 1159 1160 truncated = True 1161 num_objects = 0 1162 max_keys = limit 1163 1164 # It is the "uploads: None" in uri_params that will change the 1165 # behavior of bucket_list to return multiparts instead of keys 1166 uri_params['uploads'] = None 1167 while truncated: 1168 response = self.bucket_list_noparse(bucket, recursive=True, 1169 uri_params=uri_params, 1170 max_keys=max_keys) 1171 1172 xml_data = response["data"] 1173 # extract list of info of uploads 1174 upload_list = getListFromXml(xml_data, "Upload") 1175 num_objects += len(upload_list) 1176 if limit > num_objects: 1177 max_keys = limit - num_objects 1178 1179 xml_truncated = getTextFromXml(xml_data, ".//IsTruncated") 1180 if not xml_truncated or xml_truncated.lower() == "false": 1181 truncated = False 1182 1183 if truncated: 1184 if limit == -1 or num_objects < limit: 1185 if upload_list: 1186 next_key = getTextFromXml(xml_data, "NextKeyMarker") 1187 if not next_key: 1188 next_key = upload_list[-1]["Key"] 1189 uri_params['KeyMarker'] = next_key 1190 1191 upload_id_marker = getTextFromXml( 1192 xml_data, "NextUploadIdMarker") 1193 if upload_id_marker: 1194 uri_params['UploadIdMarker'] = upload_id_marker 1195 elif 'UploadIdMarker' in uri_params: 1196 # Clear any pre-existing value 1197 del uri_params['UploadIdMarker'] 1198 else: 1199 # Unexpectedly, the server lied, and so the previous 1200 # response was not truncated. So, no new key to get. 1201 yield False, upload_list 1202 break 1203 debug("Listing continues after '%s'" % 1204 uri_params['KeyMarker']) 1205 else: 1206 yield truncated, upload_list 1207 break 1208 yield truncated, upload_list 1209 1210 def list_multipart(self, uri, upload_id, uri_params=None, limit=-1): 1211 part_list = [] 1212 for truncated, parts in self.list_multipart_streaming(uri, 1213 upload_id, 1214 uri_params, 1215 limit): 1216 part_list.extend(parts) 1217 1218 return part_list 1219 1220 def list_multipart_streaming(self, uri, upload_id, uri_params=None, 1221 limit=-1): 1222 uri_params = uri_params and uri_params.copy() or {} 1223 1224 truncated = True 1225 num_objects = 0 1226 max_parts = limit 1227 1228 while truncated: 1229 response = self.list_multipart_noparse(uri, upload_id, 1230 uri_params, max_parts) 1231 1232 xml_data = response["data"] 1233 # extract list of multipart upload parts 1234 part_list = getListFromXml(xml_data, "Part") 1235 num_objects += len(part_list) 1236 if limit > num_objects: 1237 max_parts = limit - num_objects 1238 1239 xml_truncated = getTextFromXml(xml_data, ".//IsTruncated") 1240 if not xml_truncated or xml_truncated.lower() == "false": 1241 truncated = False 1242 1243 if truncated: 1244 if limit == -1 or num_objects < limit: 1245 if part_list: 1246 next_part_number = getTextFromXml( 1247 xml_data, "NextPartNumberMarker") 1248 if not next_part_number: 1249 next_part_number = part_list[-1]["PartNumber"] 1250 uri_params['part-number-marker'] = next_part_number 1251 else: 1252 # Unexpectedly, the server lied, and so the previous 1253 # response was not truncated. So, no new part to get. 1254 yield False, part_list 1255 break 1256 debug("Listing continues after Part '%s'" % 1257 uri_params['part-number-marker']) 1258 else: 1259 yield truncated, part_list 1260 break 1261 yield truncated, part_list 1262 1263 def list_multipart_noparse(self, uri, upload_id, uri_params=None, 1264 max_parts=-1): 1265 if uri_params is None: 1266 uri_params = {} 1267 if max_parts != -1: 1268 uri_params['max-parts'] = str(max_parts) 1269 uri_params['uploadId'] = upload_id 1270 request = self.create_request("OBJECT_GET", uri=uri, 1271 uri_params=uri_params) 1272 response = self.send_request(request) 1273 return response 1274 1275 def abort_multipart(self, uri, id): 1276 request = self.create_request("OBJECT_DELETE", uri = uri, 1277 uri_params = {'uploadId': id}) 1278 response = self.send_request(request) 1279 return response 1280 1281 def get_accesslog(self, uri): 1282 request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), 1283 uri_params = {'logging': None}) 1284 response = self.send_request(request) 1285 accesslog = AccessLog(response['data']) 1286 return accesslog 1287 1288 def set_accesslog_acl(self, uri): 1289 acl = self.get_acl(uri) 1290 debug("Current ACL(%s): %s" % (uri.uri(), acl)) 1291 acl.appendGrantee(GranteeLogDelivery("READ_ACP")) 1292 acl.appendGrantee(GranteeLogDelivery("WRITE")) 1293 debug("Updated ACL(%s): %s" % (uri.uri(), acl)) 1294 self.set_acl(uri, acl) 1295 1296 def set_accesslog(self, uri, enable, log_target_prefix_uri = None, acl_public = False): 1297 accesslog = AccessLog() 1298 if enable: 1299 accesslog.enableLogging(log_target_prefix_uri) 1300 accesslog.setAclPublic(acl_public) 1301 else: 1302 accesslog.disableLogging() 1303 1304 body = "%s" % accesslog 1305 debug(u"set_accesslog(%s): accesslog-xml: %s" % (uri, body)) 1306 1307 request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(), 1308 body = body, uri_params = {'logging': None}) 1309 try: 1310 response = self.send_request(request) 1311 except S3Error as e: 1312 if e.info['Code'] == "InvalidTargetBucketForLogging": 1313 info("Setting up log-delivery ACL for target bucket.") 1314 self.set_accesslog_acl(S3Uri(u"s3://%s" % log_target_prefix_uri.bucket())) 1315 response = self.send_request(request) 1316 else: 1317 raise 1318 return accesslog, response 1319 1320 def create_request(self, operation, uri = None, bucket = None, object = None, headers = None, body = "", uri_params = None): 1321 resource = { 'bucket' : None, 'uri' : "/" } 1322 1323 if uri and (bucket or object): 1324 raise ValueError("Both 'uri' and either 'bucket' or 'object' parameters supplied") 1325 ## If URI is given use that instead of bucket/object parameters 1326 if uri: 1327 bucket = uri.bucket() 1328 object = uri.has_object() and uri.object() or None 1329 1330 if bucket: 1331 resource['bucket'] = bucket 1332 if object: 1333 resource['uri'] = "/" + object 1334 1335 method_string = S3.http_methods.getkey(S3.operations[operation] & S3.http_methods["MASK"]) 1336 1337 request = S3Request(self, method_string, resource, headers, body, uri_params) 1338 1339 debug("CreateRequest: resource[uri]=%s", resource['uri']) 1340 return request 1341 1342 def _fail_wait(self, retries): 1343 # Wait a few seconds. The more it fails the more we wait. 1344 return (self._max_retries - retries + 1) * 3 1345 1346 def _http_redirection_handler(self, request, response, fn, *args, **kwargs): 1347 # Region info might already be available through the x-amz-bucket-region header 1348 redir_region = response['headers'].get('x-amz-bucket-region') 1349 1350 if 'data' in response and len(response['data']) > 0: 1351 redir_bucket = getTextFromXml(response['data'], ".//Bucket") 1352 redir_hostname = getTextFromXml(response['data'], ".//Endpoint") 1353 self.set_hostname(redir_bucket, redir_hostname) 1354 info(u'Redirected to: %s', redir_hostname) 1355 if redir_region: 1356 S3Request.region_map[redir_bucket] = redir_region 1357 info(u'Redirected to region: %s', redir_region) 1358 return fn(*args, **kwargs) 1359 elif request.method_string == 'HEAD': 1360 # Head is a special case, redirection info usually are in the body 1361 # but there is no body for an HEAD request. 1362 location_url = response['headers'].get('location') 1363 if location_url: 1364 # Sometimes a "location" http header could be available and 1365 # can help us deduce the redirection path. 1366 # It is the case of "dns-style" syntax, but not for "path-style" syntax. 1367 if location_url.startswith("http://"): 1368 location_url = location_url[7:] 1369 elif location_url.startswith("https://"): 1370 location_url = location_url[8:] 1371 location_url = urlparse('https://' + location_url).hostname 1372 redir_bucket = request.resource['bucket'] 1373 self.set_hostname(redir_bucket, location_url) 1374 info(u'Redirected to: %s', location_url) 1375 if redir_region: 1376 S3Request.region_map[redir_bucket] = redir_region 1377 info(u'Redirected to region: %s', redir_region) 1378 return fn(*args, **kwargs) 1379 warning(u'Redirection error: No info provided by the server to where should be forwarded the request (HEAD request). (Hint target region: %s)', redir_region) 1380 1381 raise S3Error(response) 1382 1383 def _http_400_handler(self, request, response, fn, *args, **kwargs): 1384 """ 1385 Returns None if no handler available for the specific error code 1386 """ 1387 # AWS response AuthorizationHeaderMalformed means we sent the request to the wrong region 1388 # get the right region out of the response and send it there. 1389 if 'data' in response and len(response['data']) > 0: 1390 failureCode = getTextFromXml(response['data'], 'Code') 1391 if failureCode == 'AuthorizationHeaderMalformed': 1392 # we sent the request to the wrong region 1393 region = getTextFromXml(response['data'], 'Region') 1394 if region is not None: 1395 S3Request.region_map[request.resource['bucket']] = region 1396 info('Forwarding request to %s', region) 1397 return fn(*args, **kwargs) 1398 else: 1399 warning(u'Could not determine bucket the location. Please consider using the --region parameter.') 1400 1401 elif failureCode == 'InvalidRequest': 1402 message = getTextFromXml(response['data'], 'Message') 1403 if message == 'The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.': 1404 debug(u'Endpoint requires signature v4') 1405 self.endpoint_requires_signature_v4 = True 1406 return fn(*args, **kwargs) 1407 1408 elif failureCode == 'InvalidArgument': 1409 # returned by DreamObjects on send_request and send_file, 1410 # which doesn't support signature v4. Retry with signature v2 1411 if not request.use_signature_v2() and not self.fallback_to_signature_v2: # have not tried with v2 yet 1412 debug(u'Falling back to signature v2') 1413 self.fallback_to_signature_v2 = True 1414 return fn(*args, **kwargs) 1415 else: 1416 # returned by DreamObjects on recv_file, which doesn't support signature v4. Retry with signature v2 1417 if not request.use_signature_v2() and not self.fallback_to_signature_v2: 1418 # have not tried with v2 yet 1419 debug(u'Falling back to signature v2') 1420 self.fallback_to_signature_v2 = True 1421 return fn(*args, **kwargs) 1422 1423 return None 1424 1425 def _http_403_handler(self, request, response, fn, *args, **kwargs): 1426 if 'data' in response and len(response['data']) > 0: 1427 failureCode = getTextFromXml(response['data'], 'Code') 1428 if failureCode == 'AccessDenied': 1429 # traditional HTTP 403 1430 message = getTextFromXml(response['data'], 'Message') 1431 if message == 'AWS authentication requires a valid Date or x-amz-date header': # message from an Eucalyptus walrus server 1432 if not request.use_signature_v2() and not self.fallback_to_signature_v2: # have not tried with v2 yet 1433 debug(u'Falling back to signature v2') 1434 self.fallback_to_signature_v2 = True 1435 return fn(*args, **kwargs) 1436 1437 raise S3Error(response) 1438 1439 def update_region_inner_request(self, request): 1440 """Get and update region for the request if needed. 1441 1442 Signature v4 needs the region of the bucket or the request will fail 1443 with the indication of the correct region. 1444 We are trying to avoid this failure by pre-emptively getting the 1445 correct region to use, if not provided by the user. 1446 """ 1447 if request.resource.get('bucket') and not request.use_signature_v2() \ 1448 and S3Request.region_map.get( 1449 request.resource['bucket'], Config().bucket_location 1450 ) == "US": 1451 debug("===== SEND Inner request to determine the bucket region " 1452 "=====") 1453 try: 1454 s3_uri = S3Uri(u's3://' + request.resource['bucket']) 1455 # "force_us_default" should prevent infinite recursivity because 1456 # it will set the region_map dict. 1457 region = self.get_bucket_location(s3_uri, force_us_default=True) 1458 if region is not None: 1459 S3Request.region_map[request.resource['bucket']] = region 1460 debug("===== SUCCESS Inner request to determine the bucket " 1461 "region (%r) =====", region) 1462 except Exception as exc: 1463 # Ignore errors, it is just an optimisation, so nothing critical 1464 debug("getlocation inner request failure reason: %s", exc) 1465 debug("===== FAILED Inner request to determine the bucket " 1466 "region =====") 1467 1468 def send_request(self, request, retries = _max_retries): 1469 self.update_region_inner_request(request) 1470 1471 request.body = encode_to_s3(request.body) 1472 headers = request.headers 1473 1474 method_string, resource, headers = request.get_triplet() 1475 response = {} 1476 debug("Processing request, please wait...") 1477 1478 conn = None 1479 try: 1480 conn = ConnMan.get(self.get_hostname(resource['bucket'])) 1481 # TODO: Check what was supposed to be the usage of conn.path here 1482 # Currently this is always "None" all the time as not defined in ConnMan 1483 uri = self.format_uri(resource, conn.path) 1484 debug("Sending request method_string=%r, uri=%r, headers=%r, body=(%i bytes)" % (method_string, uri, headers, len(request.body or ""))) 1485 conn.c.request(method_string, uri, request.body, headers) 1486 http_response = conn.c.getresponse() 1487 response["status"] = http_response.status 1488 response["reason"] = http_response.reason 1489 response["headers"] = convertHeaderTupleListToDict(http_response.getheaders()) 1490 response["data"] = http_response.read() 1491 if "x-amz-meta-s3cmd-attrs" in response["headers"]: 1492 attrs = parse_attrs_header(response["headers"]["x-amz-meta-s3cmd-attrs"]) 1493 response["s3cmd-attrs"] = attrs 1494 ConnMan.put(conn) 1495 except (S3SSLError, S3SSLCertificateError): 1496 # In case of failure to validate the certificate for a ssl 1497 # connection,no need to retry, abort immediately 1498 raise 1499 except (IOError, Exception) as e: 1500 debug("Response:\n" + pprint.pformat(response)) 1501 if ((hasattr(e, 'errno') and e.errno 1502 and e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT)) 1503 or "[Errno 104]" in str(e) 1504 or "[Errno 32]" in str(e) 1505 ) and not isinstance(e, SocketTimeoutException): 1506 raise 1507 # When the connection is broken, BadStatusLine is raised with py2 1508 # and RemoteDisconnected is raised by py3 with a trap: 1509 # RemoteDisconnected has an errno field with a None value. 1510 1511 # close the connection and re-establish 1512 ConnMan.close(conn) 1513 if retries: 1514 warning("Retrying failed request: %s (%s)" % (resource['uri'], e)) 1515 warning("Waiting %d sec..." % self._fail_wait(retries)) 1516 time.sleep(self._fail_wait(retries)) 1517 return self.send_request(request, retries - 1) 1518 else: 1519 raise S3RequestError("Request failed for: %s" % resource['uri']) 1520 1521 except: 1522 # Only KeyboardInterrupt and SystemExit will not be covered by Exception 1523 debug("Response:\n" + pprint.pformat(response)) 1524 raise 1525 1526 debug("Response:\n" + pprint.pformat(response)) 1527 1528 if response["status"] in [301, 307]: 1529 ## RedirectTemporary or RedirectPermanent 1530 return self._http_redirection_handler(request, response, self.send_request, request) 1531 1532 if response["status"] == 400: 1533 handler_fn = self._http_400_handler(request, response, self.send_request, request) 1534 if handler_fn: 1535 return handler_fn 1536 err = S3Error(response) 1537 if retries and err.code in ['BadDigest', 'OperationAborted', 1538 'TokenRefreshRequired', 'RequestTimeout']: 1539 warning(u"Retrying failed request: %s (%s)" % (resource['uri'], err)) 1540 warning("Waiting %d sec..." % self._fail_wait(retries)) 1541 time.sleep(self._fail_wait(retries)) 1542 return self.send_request(request, retries - 1) 1543 raise err 1544 1545 if response["status"] == 403: 1546 return self._http_403_handler(request, response, self.send_request, request) 1547 if response["status"] == 405: # Method Not Allowed. Don't retry. 1548 raise S3Error(response) 1549 1550 if response["status"] >= 500 or response["status"] == 429: 1551 e = S3Error(response) 1552 1553 if response["status"] == 501: 1554 ## NotImplemented server error - no need to retry 1555 retries = 0 1556 1557 if retries: 1558 warning(u"Retrying failed request: %s (%s)" % (resource['uri'], e)) 1559 warning("Waiting %d sec..." % self._fail_wait(retries)) 1560 time.sleep(self._fail_wait(retries)) 1561 return self.send_request(request, retries - 1) 1562 else: 1563 raise e 1564 1565 if response["status"] < 200 or response["status"] > 299: 1566 raise S3Error(response) 1567 1568 return response 1569 1570 def send_request_with_progress(self, request, labels, operation_size=0): 1571 """Wrapper around send_request for slow requests. 1572 1573 To be able to show progression for small requests 1574 """ 1575 if not self.config.progress_meter: 1576 info("Sending slow request, please wait...") 1577 return self.send_request(request) 1578 1579 if 'action' not in labels: 1580 labels[u'action'] = u'request' 1581 progress = self.config.progress_class(labels, operation_size) 1582 1583 try: 1584 response = self.send_request(request) 1585 except Exception as exc: 1586 progress.done("failed") 1587 raise 1588 1589 progress.update(current_position=operation_size) 1590 progress.done("done") 1591 1592 return response 1593 1594 def send_file(self, request, stream, labels, buffer = '', throttle = 0, 1595 retries = _max_retries, offset = 0, chunk_size = -1, 1596 use_expect_continue = None): 1597 self.update_region_inner_request(request) 1598 1599 if use_expect_continue is None: 1600 use_expect_continue = self.config.use_http_expect 1601 if self.expect_continue_not_supported and use_expect_continue: 1602 use_expect_continue = False 1603 1604 headers = request.headers 1605 1606 size_left = size_total = int(headers["content-length"]) 1607 1608 filename = stream.stream_name 1609 if self.config.progress_meter: 1610 labels[u'action'] = u'upload' 1611 progress = self.config.progress_class(labels, size_total) 1612 else: 1613 info("Sending file '%s', please wait..." % filename) 1614 timestamp_start = time.time() 1615 1616 if buffer: 1617 sha256_hash = checksum_sha256_buffer(buffer, offset, size_total) 1618 else: 1619 sha256_hash = checksum_sha256_file(filename, offset, size_total) 1620 request.body = sha256_hash 1621 1622 if use_expect_continue: 1623 if not size_total: 1624 use_expect_continue = False 1625 else: 1626 headers['expect'] = '100-continue' 1627 1628 method_string, resource, headers = request.get_triplet() 1629 try: 1630 conn = ConnMan.get(self.get_hostname(resource['bucket'])) 1631 conn.c.putrequest(method_string, self.format_uri(resource, conn.path)) 1632 for header in headers.keys(): 1633 conn.c.putheader(encode_to_s3(header), encode_to_s3(headers[header])) 1634 conn.c.endheaders() 1635 except ParameterError as e: 1636 raise 1637 except Exception as e: 1638 if self.config.progress_meter: 1639 progress.done("failed") 1640 if retries: 1641 warning("Retrying failed request: %s (%s)" % (resource['uri'], e)) 1642 warning("Waiting %d sec..." % self._fail_wait(retries)) 1643 time.sleep(self._fail_wait(retries)) 1644 # Connection error -> same throttle value 1645 return self.send_file(request, stream, labels, buffer, throttle, retries - 1, offset, chunk_size) 1646 else: 1647 raise S3UploadError("Upload failed for: %s" % resource['uri']) 1648 if buffer == '': 1649 stream.seek(offset) 1650 md5_hash = md5() 1651 1652 try: 1653 http_response = None 1654 if use_expect_continue: 1655 # Wait for the 100-Continue before sending the content 1656 readable, writable, exceptional = select.select([conn.c.sock],[], [], EXPECT_CONTINUE_TIMEOUT) 1657 if readable: 1658 # 100-CONTINUE STATUS RECEIVED, get it before continuing. 1659 http_response = conn.c.getresponse() 1660 elif not writable and not exceptional: 1661 warning("HTTP Expect Continue feature disabled because of no reply of the server in %.2fs.", EXPECT_CONTINUE_TIMEOUT) 1662 self.expect_continue_not_supported = True 1663 use_expect_continue = False 1664 1665 if not use_expect_continue or (http_response and http_response.status == ConnMan.CONTINUE): 1666 if http_response: 1667 # CONTINUE case. Reset the response 1668 http_response.read() 1669 conn.c._HTTPConnection__state = ConnMan._CS_REQ_SENT 1670 1671 while size_left > 0: 1672 #debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, filename, size_left)) 1673 l = min(self.config.send_chunk, size_left) 1674 if buffer == '': 1675 data = stream.read(l) 1676 else: 1677 data = buffer 1678 1679 if not data: 1680 raise InvalidFileError("File smaller than expected. Was the file truncated?") 1681 1682 if self.config.limitrate > 0: 1683 start_time = time.time() 1684 1685 md5_hash.update(data) 1686 1687 conn.c.wrapper_send_body(data) 1688 if self.config.progress_meter: 1689 progress.update(delta_position = len(data)) 1690 size_left -= len(data) 1691 1692 #throttle 1693 limitrate_throttle = throttle 1694 if self.config.limitrate > 0: 1695 real_duration = time.time() - start_time 1696 expected_duration = float(l) / self.config.limitrate 1697 limitrate_throttle = max(expected_duration - real_duration, limitrate_throttle) 1698 if limitrate_throttle: 1699 time.sleep(min(limitrate_throttle, self.config.throttle_max)) 1700 1701 md5_computed = md5_hash.hexdigest() 1702 http_response = conn.c.getresponse() 1703 1704 response = {} 1705 response["status"] = http_response.status 1706 response["reason"] = http_response.reason 1707 response["headers"] = convertHeaderTupleListToDict(http_response.getheaders()) 1708 response["data"] = http_response.read() 1709 response["size"] = size_total 1710 ConnMan.put(conn) 1711 debug(u"Response:\n" + pprint.pformat(response)) 1712 except ParameterError as e: 1713 raise 1714 except InvalidFileError as e: 1715 if self.config.progress_meter: 1716 progress.done("failed") 1717 raise 1718 except Exception as e: 1719 if self.config.progress_meter: 1720 progress.done("failed") 1721 if retries: 1722 known_error = False 1723 if ((hasattr(e, 'errno') and e.errno 1724 and e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT)) 1725 or "[Errno 104]" in str(e) or "[Errno 32]" in str(e) 1726 ) and not isinstance(e, SocketTimeoutException): 1727 # We have to detect these errors by looking at the error string 1728 # Connection reset by peer and Broken pipe 1729 # The server broke the connection early with an error like 1730 # in a HTTP Expect Continue case even if asked nothing. 1731 try: 1732 http_response = conn.c.getresponse() 1733 response = {} 1734 response["status"] = http_response.status 1735 response["reason"] = http_response.reason 1736 response["headers"] = convertHeaderTupleListToDict(http_response.getheaders()) 1737 response["data"] = http_response.read() 1738 response["size"] = size_total 1739 known_error = True 1740 except Exception: 1741 error("Cannot retrieve any response status before encountering an EPIPE or ECONNRESET exception") 1742 if not known_error: 1743 warning("Upload failed: %s (%s)" % (resource['uri'], e)) 1744 warning("Waiting %d sec..." % self._fail_wait(retries)) 1745 time.sleep(self._fail_wait(retries)) 1746 # Connection error -> same throttle value 1747 return self.send_file(request, stream, labels, buffer, throttle, 1748 retries - 1, offset, chunk_size, use_expect_continue) 1749 else: 1750 debug("Giving up on '%s' %s" % (filename, e)) 1751 raise S3UploadError("Upload failed for: %s" % resource['uri']) 1752 1753 timestamp_end = time.time() 1754 response["elapsed"] = timestamp_end - timestamp_start 1755 response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) 1756 1757 if self.config.progress_meter: 1758 ## Finalising the upload takes some time -> update() progress meter 1759 ## to correct the average speed. Otherwise people will complain that 1760 ## 'progress' and response["speed"] are inconsistent ;-) 1761 progress.update() 1762 progress.done("done") 1763 1764 if response["status"] in [301, 307]: 1765 ## RedirectTemporary or RedirectPermanent 1766 return self._http_redirection_handler(request, response, 1767 self.send_file, request, stream, labels, buffer, offset = offset, chunk_size = chunk_size, use_expect_continue = use_expect_continue) 1768 1769 if response["status"] == 400: 1770 handler_fn = self._http_400_handler(request, response, 1771 self.send_file, request, stream, labels, buffer, offset = offset, chunk_size = chunk_size, use_expect_continue = use_expect_continue) 1772 if handler_fn: 1773 return handler_fn 1774 err = S3Error(response) 1775 if err.code not in ['BadDigest', 'OperationAborted', 1776 'TokenRefreshRequired', 'RequestTimeout']: 1777 raise err 1778 # else the error will be handled later with a retry 1779 1780 if response["status"] == 403: 1781 return self._http_403_handler(request, response, 1782 self.send_file, request, stream, labels, buffer, offset = offset, chunk_size = chunk_size, use_expect_continue = use_expect_continue) 1783 1784 if response["status"] == 417 and retries: 1785 # Expect 100-continue not supported by proxy/server 1786 self.expect_continue_not_supported = True 1787 return self.send_file(request, stream, labels, buffer, throttle, 1788 retries - 1, offset, chunk_size, use_expect_continue = False) 1789 1790 # S3 from time to time doesn't send ETag back in a response :-( 1791 # Force re-upload here. 1792 if 'etag' not in response['headers']: 1793 response['headers']['etag'] = '' 1794 1795 if response["status"] < 200 or response["status"] > 299: 1796 try_retry = False 1797 if response["status"] >= 500: 1798 # AWS internal error - retry 1799 try_retry = True 1800 if response["status"] == 503: 1801 ## SlowDown error 1802 throttle = throttle and throttle * 5 or 0.01 1803 elif response["status"] == 429: 1804 # Not an AWS error, but s3 compatible server possible error: 1805 # TooManyRequests/Busy/slowdown 1806 try_retry = True 1807 throttle = throttle and throttle * 5 or 0.01 1808 elif response["status"] >= 400: 1809 err = S3Error(response) 1810 ## Retriable client error? 1811 if err.code in ['BadDigest', 'OperationAborted', 'TokenRefreshRequired', 'RequestTimeout']: 1812 try_retry = True 1813 1814 if try_retry: 1815 if retries: 1816 warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response))) 1817 if throttle: 1818 warning("Retrying on lower speed (throttle=%0.2f)" % throttle) 1819 warning("Waiting %d sec..." % self._fail_wait(retries)) 1820 time.sleep(self._fail_wait(retries)) 1821 return self.send_file(request, stream, labels, buffer, throttle, 1822 retries - 1, offset, chunk_size, use_expect_continue) 1823 else: 1824 warning("Too many failures. Giving up on '%s'" % filename) 1825 raise S3UploadError("Too many failures. Giving up on '%s'" 1826 % filename) 1827 1828 ## Non-recoverable error 1829 raise S3Error(response) 1830 1831 debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"].get('etag', '').strip('"\''))) 1832 ## when using KMS encryption, MD5 etag value will not match 1833 md5_from_s3 = response["headers"].get("etag", "").strip('"\'') 1834 if ('-' not in md5_from_s3) and (md5_from_s3 != md5_hash.hexdigest()) and response["headers"].get("x-amz-server-side-encryption") != 'aws:kms': 1835 warning("MD5 Sums don't match!") 1836 if retries: 1837 warning("Retrying upload of %s" % (filename)) 1838 return self.send_file(request, stream, labels, buffer, throttle, 1839 retries - 1, offset, chunk_size, use_expect_continue) 1840 else: 1841 warning("Too many failures. Giving up on '%s'" % (filename)) 1842 raise S3UploadError("Too many failures. Giving up on '%s'" 1843 % filename) 1844 1845 return response 1846 1847 def send_file_multipart(self, stream, headers, uri, size, extra_label=""): 1848 timestamp_start = time.time() 1849 upload = MultiPartUpload(self, stream, uri, headers, size) 1850 upload.upload_all_parts(extra_label) 1851 response = upload.complete_multipart_upload() 1852 timestamp_end = time.time() 1853 response["elapsed"] = timestamp_end - timestamp_start 1854 response["size"] = size 1855 response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) 1856 if response["data"] and getRootTagName(response["data"]) == "Error": 1857 #http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html 1858 # Error Complete Multipart UPLOAD, status may be 200 1859 # raise S3UploadError 1860 raise S3UploadError(getTextFromXml(response["data"], 'Message')) 1861 return response 1862 1863 def copy_file_multipart(self, src_uri, dst_uri, size, headers, 1864 extra_label=""): 1865 return self.send_file_multipart(src_uri, headers, dst_uri, size, 1866 extra_label) 1867 1868 def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries): 1869 self.update_region_inner_request(request) 1870 1871 method_string, resource, headers = request.get_triplet() 1872 filename = stream.stream_name 1873 if self.config.progress_meter: 1874 labels[u'action'] = u'download' 1875 progress = self.config.progress_class(labels, 0) 1876 else: 1877 info("Receiving file '%s', please wait..." % filename) 1878 timestamp_start = time.time() 1879 1880 conn = None 1881 try: 1882 conn = ConnMan.get(self.get_hostname(resource['bucket'])) 1883 conn.c.putrequest(method_string, self.format_uri(resource, conn.path)) 1884 for header in headers.keys(): 1885 conn.c.putheader(encode_to_s3(header), encode_to_s3(headers[header])) 1886 if start_position > 0: 1887 debug("Requesting Range: %d .. end" % start_position) 1888 conn.c.putheader("Range", "bytes=%d-" % start_position) 1889 conn.c.endheaders() 1890 response = {} 1891 http_response = conn.c.getresponse() 1892 response["status"] = http_response.status 1893 response["reason"] = http_response.reason 1894 response["headers"] = convertHeaderTupleListToDict(http_response.getheaders()) 1895 if "x-amz-meta-s3cmd-attrs" in response["headers"]: 1896 attrs = parse_attrs_header(response["headers"]["x-amz-meta-s3cmd-attrs"]) 1897 response["s3cmd-attrs"] = attrs 1898 debug("Response:\n" + pprint.pformat(response)) 1899 except ParameterError as e: 1900 raise 1901 except (IOError, Exception) as e: 1902 if self.config.progress_meter: 1903 progress.done("failed") 1904 if ((hasattr(e, 'errno') and e.errno and 1905 e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT)) 1906 or "[Errno 104]" in str(e) or "[Errno 32]" in str(e) 1907 ) and not isinstance(e, SocketTimeoutException): 1908 raise 1909 1910 # close the connection and re-establish 1911 ConnMan.close(conn) 1912 1913 if retries: 1914 warning("Retrying failed request: %s (%s)" % (resource['uri'], e)) 1915 warning("Waiting %d sec..." % self._fail_wait(retries)) 1916 time.sleep(self._fail_wait(retries)) 1917 # Connection error -> same throttle value 1918 return self.recv_file(request, stream, labels, start_position, retries - 1) 1919 else: 1920 raise S3DownloadError("Download failed for: %s" % resource['uri']) 1921 1922 if response["status"] in [301, 307]: 1923 ## RedirectPermanent or RedirectTemporary 1924 response['data'] = http_response.read() 1925 return self._http_redirection_handler(request, response, 1926 self.recv_file, request, 1927 stream, labels, start_position) 1928 1929 if response["status"] == 400: 1930 response['data'] = http_response.read() 1931 handler_fn = self._http_400_handler(request, response, self.recv_file, 1932 request, stream, labels, start_position) 1933 if handler_fn: 1934 return handler_fn 1935 raise S3Error(response) 1936 1937 if response["status"] == 403: 1938 response['data'] = http_response.read() 1939 return self._http_403_handler(request, response, self.recv_file, 1940 request, stream, labels, start_position) 1941 1942 if response["status"] == 405: # Method Not Allowed. Don't retry. 1943 response['data'] = http_response.read() 1944 raise S3Error(response) 1945 1946 if response["status"] < 200 or response["status"] > 299: 1947 response['data'] = http_response.read() 1948 raise S3Error(response) 1949 1950 if start_position == 0: 1951 # Only compute MD5 on the fly if we're downloading from beginning 1952 # Otherwise we'd get a nonsense. 1953 md5_hash = md5() 1954 size_left = int(response["headers"]["content-length"]) 1955 size_total = start_position + size_left 1956 current_position = start_position 1957 1958 if self.config.progress_meter: 1959 progress.total_size = size_total 1960 progress.initial_position = current_position 1961 progress.current_position = current_position 1962 1963 try: 1964 # Fix for issue #432. Even when content size is 0, httplib expect the response to be read. 1965 if size_left == 0: 1966 data = http_response.read(1) 1967 # It is not supposed to be some data returned in that case 1968 assert(len(data) == 0) 1969 while (current_position < size_total): 1970 this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left 1971 1972 if self.config.limitrate > 0: 1973 start_time = time.time() 1974 1975 data = http_response.read(this_chunk) 1976 if len(data) == 0: 1977 raise S3ResponseError("EOF from S3!") 1978 1979 #throttle 1980 if self.config.limitrate > 0: 1981 real_duration = time.time() - start_time 1982 expected_duration = float(this_chunk) / self.config.limitrate 1983 if expected_duration > real_duration: 1984 time.sleep(expected_duration - real_duration) 1985 1986 stream.write(data) 1987 if start_position == 0: 1988 md5_hash.update(data) 1989 current_position += len(data) 1990 ## Call progress meter from here... 1991 if self.config.progress_meter: 1992 progress.update(delta_position = len(data)) 1993 ConnMan.put(conn) 1994 except OSError: 1995 raise 1996 except (IOError, Exception) as e: 1997 if self.config.progress_meter: 1998 progress.done("failed") 1999 if ((hasattr(e, 'errno') and e.errno 2000 and e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT)) 2001 or "[Errno 104]" in str(e) or "[Errno 32]" in str(e) 2002 ) and not isinstance(e, SocketTimeoutException): 2003 raise 2004 # close the connection and re-establish 2005 ConnMan.close(conn) 2006 2007 if retries: 2008 warning("Retrying failed request: %s (%s)" % (resource['uri'], e)) 2009 warning("Waiting %d sec..." % self._fail_wait(retries)) 2010 time.sleep(self._fail_wait(retries)) 2011 # Connection error -> same throttle value 2012 return self.recv_file(request, stream, labels, current_position, retries - 1) 2013 else: 2014 raise S3DownloadError("Download failed for: %s" % resource['uri']) 2015 2016 stream.flush() 2017 timestamp_end = time.time() 2018 2019 if self.config.progress_meter: 2020 ## The above stream.flush() may take some time -> update() progress meter 2021 ## to correct the average speed. Otherwise people will complain that 2022 ## 'progress' and response["speed"] are inconsistent ;-) 2023 progress.update() 2024 progress.done("done") 2025 2026 md5_from_s3 = response["headers"].get("etag", "").strip('"\'') 2027 if not 'x-amz-meta-s3tools-gpgenc' in response["headers"]: 2028 # we can't trust our stored md5 because we 2029 # encrypted the file after calculating it but before 2030 # uploading it. 2031 try: 2032 md5_from_s3 = response["s3cmd-attrs"]["md5"] 2033 except KeyError: 2034 pass 2035 # we must have something to compare against to bother with the calculation 2036 if '-' not in md5_from_s3: 2037 if start_position == 0: 2038 # Only compute MD5 on the fly if we were downloading from the beginning 2039 response["md5"] = md5_hash.hexdigest() 2040 else: 2041 # Otherwise try to compute MD5 of the output file 2042 try: 2043 response["md5"] = hash_file_md5(filename) 2044 except IOError as e: 2045 if e.errno != errno.ENOENT: 2046 warning("Unable to open file: %s: %s" % (filename, e)) 2047 warning("Unable to verify MD5. Assume it matches.") 2048 2049 response["md5match"] = response.get("md5") == md5_from_s3 2050 response["elapsed"] = timestamp_end - timestamp_start 2051 response["size"] = current_position 2052 response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) 2053 if response["size"] != start_position + int(response["headers"]["content-length"]): 2054 warning("Reported size (%s) does not match received size (%s)" % ( 2055 start_position + int(response["headers"]["content-length"]), response["size"])) 2056 debug("ReceiveFile: Computed MD5 = %s" % response.get("md5")) 2057 # avoid ETags from multipart uploads that aren't the real md5 2058 if ('-' not in md5_from_s3 and not response["md5match"]) and (response["headers"].get("x-amz-server-side-encryption") != 'aws:kms'): 2059 warning("MD5 signatures do not match: computed=%s, received=%s" % ( 2060 response.get("md5"), md5_from_s3)) 2061 return response 2062__all__.append("S3") 2063 2064def parse_attrs_header(attrs_header): 2065 attrs = {} 2066 for attr in attrs_header.split("/"): 2067 key, val = attr.split(":") 2068 attrs[key] = val 2069 return attrs 2070 2071def compute_content_md5(body): 2072 m = md5(encode_to_s3(body)) 2073 base64md5 = encodestring(m.digest()) 2074 base64md5 = decode_from_s3(base64md5) 2075 if base64md5[-1] == '\n': 2076 base64md5 = base64md5[0:-1] 2077 return decode_from_s3(base64md5) 2078# vim:et:ts=4:sts=4:ai 2079