# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import os

from s3transfer.manager import TransferManager

from awscli.customizations.s3.utils import (
    human_readable_size, MAX_UPLOAD_SIZE, find_bucket_key, relative_path,
    create_warning, NonSeekableStream)
from awscli.customizations.s3.transferconfig import \
    create_transfer_config_from_runtime_config
from awscli.customizations.s3.results import UploadResultSubscriber
from awscli.customizations.s3.results import DownloadResultSubscriber
from awscli.customizations.s3.results import CopyResultSubscriber
from awscli.customizations.s3.results import UploadStreamResultSubscriber
from awscli.customizations.s3.results import DownloadStreamResultSubscriber
from awscli.customizations.s3.results import DeleteResultSubscriber
from awscli.customizations.s3.results import QueuedResult
from awscli.customizations.s3.results import SuccessResult
from awscli.customizations.s3.results import FailureResult
from awscli.customizations.s3.results import DryRunResult
from awscli.customizations.s3.results import ResultRecorder
from awscli.customizations.s3.results import ResultPrinter
from awscli.customizations.s3.results import OnlyShowErrorsResultPrinter
from awscli.customizations.s3.results import NoProgressResultPrinter
from awscli.customizations.s3.results import ResultProcessor
from awscli.customizations.s3.results import CommandResultRecorder
from awscli.customizations.s3.utils import RequestParamsMapper
from awscli.customizations.s3.utils import StdoutBytesWriter
from awscli.customizations.s3.utils import ProvideSizeSubscriber
from awscli.customizations.s3.utils import ProvideUploadContentTypeSubscriber
from awscli.customizations.s3.utils import ProvideCopyContentTypeSubscriber
from awscli.customizations.s3.utils import ProvideLastModifiedTimeSubscriber
from awscli.customizations.s3.utils import DirectoryCreatorSubscriber
from awscli.customizations.s3.utils import DeleteSourceFileSubscriber
from awscli.customizations.s3.utils import DeleteSourceObjectSubscriber
from awscli.customizations.s3.utils import DeleteCopySourceObjectSubscriber
from awscli.compat import get_binary_stdin


LOGGER = logging.getLogger(__name__)


class S3TransferHandlerFactory(object):
    # Applied to both max_in_memory_upload_chunks and
    # max_in_memory_download_chunks on the transfer config in __call__(),
    # bounding how many chunks s3transfer may buffer in memory at once.
    MAX_IN_MEMORY_CHUNKS = 6

    def __init__(self, cli_params, runtime_config):
        """Factory for S3TransferHandlers

        :type cli_params: dict
        :param cli_params: The parameters provided to the CLI command

        :type runtime_config: RuntimeConfig
        :param runtime_config: The runtime config for the CLI command
            being run
        """
        self._cli_params = cli_params
        self._runtime_config = runtime_config

    def __call__(self, client, result_queue):
        """Creates a S3TransferHandler instance

        :type client: botocore.client.Client
        :param client: The client to power the S3TransferHandler

        :type result_queue: queue.Queue
        :param result_queue: The result queue to be used to process results
            for the S3TransferHandler

        :returns: A S3TransferHandler instance
        """
        transfer_config = create_transfer_config_from_runtime_config(
            self._runtime_config)
        transfer_config.max_in_memory_upload_chunks = self.MAX_IN_MEMORY_CHUNKS
        transfer_config.max_in_memory_download_chunks = \
            self.MAX_IN_MEMORY_CHUNKS

        transfer_manager = TransferManager(client, transfer_config)

        LOGGER.debug(
            "Using a multipart threshold of %s and a part size of %s",
            transfer_config.multipart_threshold,
            transfer_config.multipart_chunksize
        )
        # The recorder is always the first result handler so that any
        # printer added afterwards can read the recorded statistics.
        result_recorder = ResultRecorder()
        result_processor_handlers = [result_recorder]
        self._add_result_printer(result_recorder, result_processor_handlers)
        result_processor = ResultProcessor(
            result_queue, result_processor_handlers)
        command_result_recorder = CommandResultRecorder(
            result_queue, result_recorder, result_processor)

        return S3TransferHandler(
            transfer_manager, self._cli_params, command_result_recorder)

    def _add_result_printer(self, result_recorder, result_processor_handlers):
        """Append the printer matching the CLI params, if any.

        --quiet registers no printer at all; --only-show-errors and
        streaming transfers print errors only (presumably to keep stdout
        clean for streamed data -- confirm against the stream commands);
        no --progress selects the progress-free printer; otherwise the
        full progress printer is used.
        """
        if self._cli_params.get('quiet'):
            return
        elif self._cli_params.get('only_show_errors'):
            result_printer = OnlyShowErrorsResultPrinter(result_recorder)
        elif self._cli_params.get('is_stream'):
            result_printer = OnlyShowErrorsResultPrinter(result_recorder)
        elif not self._cli_params.get('progress'):
            result_printer = NoProgressResultPrinter(result_recorder)
        else:
            result_printer = ResultPrinter(result_recorder)
        result_processor_handlers.append(result_printer)


class S3TransferHandler(object):
    def __init__(self, transfer_manager, cli_params, result_command_recorder):
        """Backend for performing S3 transfers

        :type transfer_manager: s3transfer.manager.TransferManager
        :param transfer_manager: Transfer manager to use for transfers

        :type cli_params: dict
        :param cli_params: The parameters passed to the CLI command in the
            form of a dictionary

        :type result_command_recorder: CommandResultRecorder
        :param result_command_recorder: The result command recorder to be
            used to get the final result of the transfer
        """
        self._transfer_manager = transfer_manager
        # TODO: Ideally the s3 transfer handler should not need to know
        # about the result command recorder. It really only needs an interface
        # for adding results to the queue. When all of the commands have
        # converted to use this transfer handler, an effort should be made
        # to replace the passing of a result command recorder with an
        # abstraction to enqueue results.
        self._result_command_recorder = result_command_recorder

        submitter_args = (
            self._transfer_manager, self._result_command_recorder.result_queue,
            cli_params
        )
        # Order matters: the first submitter whose can_submit() accepts a
        # FileInfo handles it (see call()). The stream submitters must
        # precede their non-stream counterparts because both match the same
        # operation name and the stream variants additionally require the
        # is_stream CLI parameter.
        self._submitters = [
            UploadStreamRequestSubmitter(*submitter_args),
            DownloadStreamRequestSubmitter(*submitter_args),
            UploadRequestSubmitter(*submitter_args),
            DownloadRequestSubmitter(*submitter_args),
            CopyRequestSubmitter(*submitter_args),
            DeleteRequestSubmitter(*submitter_args),
            LocalDeleteRequestSubmitter(*submitter_args)
        ]

    def call(self, fileinfos):
        """Process iterable of FileInfos for transfer

        :type fileinfos: iterable of FileInfos
        :param fileinfos: Set of FileInfos to submit to underlying transfer
            request submitters to make transfer API calls to S3

        :rtype: CommandResult
        :returns: The result of the command that specifies the number of
            failures and warnings encountered.
        """
        with self._result_command_recorder:
            with self._transfer_manager:
                total_submissions = 0
                for fileinfo in fileinfos:
                    for submitter in self._submitters:
                        if submitter.can_submit(fileinfo):
                            # Only the first matching submitter handles a
                            # FileInfo. submit() may decline (return falsy)
                            # e.g. when a warning handler signals a skip, in
                            # which case the submission is not counted.
                            if submitter.submit(fileinfo):
                                total_submissions += 1
                            break
                self._result_command_recorder.notify_total_submissions(
                    total_submissions)
        return self._result_command_recorder.get_command_result()


class BaseTransferRequestSubmitter(object):
    # Callable used by _do_submit() to map CLI params onto the API
    # extra_args dict; None means no request parameters are mapped.
    REQUEST_MAPPER_METHOD = None
    # Result subscriber appended last to every transfer by _do_submit();
    # None means no result subscriber is registered.
    RESULT_SUBSCRIBER_CLASS = None

    def __init__(self, transfer_manager, result_queue, cli_params):
        """Submits transfer requests to the TransferManager

        Given a FileInfo object and provided CLI parameters, it will add the
        necessary extra arguments and subscribers in making a call to the
        TransferManager.

        :type transfer_manager: s3transfer.manager.TransferManager
        :param transfer_manager: The underlying transfer manager

        :type result_queue: queue.Queue
        :param result_queue: The result queue to use

        :type cli_params: dict
        :param cli_params: The associated CLI parameters passed in to the
            command as a dictionary.
        """
        self._transfer_manager = transfer_manager
        self._result_queue = result_queue
        self._cli_params = cli_params

    def submit(self, fileinfo):
        """Submits a transfer request based on the FileInfo provided

        There is no guarantee that the transfer request will be made on
        behalf of the fileinfo as a fileinfo may be skipped based on
        circumstances in which the transfer is not possible.

        :type fileinfo: awscli.customizations.s3.fileinfo.FileInfo
        :param fileinfo: The FileInfo to be used to submit a transfer
            request to the underlying transfer manager.

        :rtype: s3transfer.futures.TransferFuture
        :returns: A TransferFuture representing the transfer if the
            transfer was submitted. If it was not submitted nothing
            is returned.
        """
        should_skip = self._warn_and_signal_if_skip(fileinfo)
        if not should_skip:
            return self._do_submit(fileinfo)

    def can_submit(self, fileinfo):
        """Checks whether it can submit a particular FileInfo

        :type fileinfo: awscli.customizations.s3.fileinfo.FileInfo
        :param fileinfo: The FileInfo to check if the transfer request
            submitter can handle.

        :returns: True if it can use the provided FileInfo to make a transfer
            request to the underlying transfer manager. False, otherwise.
        """
        raise NotImplementedError('can_submit()')

    def _do_submit(self, fileinfo):
        """Build extra_args and subscribers, then submit (or dry run)."""
        extra_args = {}
        if self.REQUEST_MAPPER_METHOD:
            self.REQUEST_MAPPER_METHOD(extra_args, self._cli_params)
        subscribers = []
        self._add_additional_subscribers(subscribers, fileinfo)
        # The result subscriber class should always be the last registered
        # subscriber to ensure it is not missing any information that
        # may have been added in a different subscriber such as size.
        if self.RESULT_SUBSCRIBER_CLASS:
            result_kwargs = {'result_queue': self._result_queue}
            if self._cli_params.get('is_move', False):
                result_kwargs['transfer_type'] = 'move'
            subscribers.append(self.RESULT_SUBSCRIBER_CLASS(**result_kwargs))

        if not self._cli_params.get('dryrun'):
            return self._submit_transfer_request(
                fileinfo, extra_args, subscribers)
        else:
            self._submit_dryrun(fileinfo)

    def _submit_dryrun(self, fileinfo):
        # No API call is made for a dry run; a DryRunResult describing the
        # would-be transfer is enqueued instead.
        transfer_type = fileinfo.operation_name
        if self._cli_params.get('is_move', False):
            transfer_type = 'move'
        src, dest = self._format_src_dest(fileinfo)
        self._result_queue.put(DryRunResult(
            transfer_type=transfer_type, src=src, dest=dest))

    def _add_additional_subscribers(self, subscribers, fileinfo):
        # Hook for subclasses to register transfer-specific subscribers
        # before the result subscriber is appended.
        pass

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        raise NotImplementedError('_submit_transfer_request()')

    def _warn_and_signal_if_skip(self, fileinfo):
        for warning_handler in self._get_warning_handlers():
            if warning_handler(fileinfo):
                # On the first warning handler that returns a signal to skip
                # immediately propagate this signal and no longer check
                # the other warning handlers as no matter what the file will
                # be skipped.
                return True

    def _get_warning_handlers(self):
        # Returns a list of warning handlers, which are callables that
        # take in a single parameter representing a FileInfo. It will then
        # add a warning to result_queue if needed and return True if
        # that FileInfo should be skipped.
        return []

    def _should_inject_content_type(self):
        # Only guess the content type when the user asked for guessing and
        # did not explicitly provide a content type.
        return (
            self._cli_params.get('guess_mime_type') and
            not self._cli_params.get('content_type')
        )

    def _warn_glacier(self, fileinfo):
        """Warn (and signal skip) for glacier-incompatible objects.

        Returns True (skip) unless the transfer is forced with
        --force-glacier-transfer; the warning itself can be silenced
        separately with --ignore-glacier-warnings.
        """
        if not self._cli_params.get('force_glacier_transfer'):
            if not fileinfo.is_glacier_compatible():
                LOGGER.debug(
                    'Encountered glacier object s3://%s. Not performing '
                    '%s on object.' % (fileinfo.src, fileinfo.operation_name))
                if not self._cli_params.get('ignore_glacier_warnings'):
                    warning = create_warning(
                        's3://'+fileinfo.src,
                        'Object is of storage class GLACIER. Unable to '
                        'perform %s operations on GLACIER objects. You must '
                        'restore the object to be able to perform the '
                        'operation. See aws s3 %s help for additional '
                        'parameter options to ignore or force these '
                        'transfers.' %
                        (fileinfo.operation_name, fileinfo.operation_name)
                    )
                    self._result_queue.put(warning)
                return True
        return False

    def _warn_parent_reference(self, fileinfo):
        """Warn (and signal skip) when a key would escape the target dir."""
        # normpath() will use the OS path separator so we
        # need to take that into account when checking for a parent prefix.
        parent_prefix = '..' + os.path.sep
        escapes_cwd = os.path.normpath(fileinfo.compare_key).startswith(
            parent_prefix)
        if escapes_cwd:
            warning = create_warning(
                fileinfo.compare_key, "File references a parent directory.")
            self._result_queue.put(warning)
            return True
        return False

    def _format_src_dest(self, fileinfo):
        """Returns formatted versions of a fileinfos source and destination."""
        raise NotImplementedError('_format_src_dest')

    def _format_local_path(self, path):
        return relative_path(path)

    def _format_s3_path(self, path):
        # Idempotent: paths that already carry the scheme pass through.
        if path.startswith('s3://'):
            return path
        return 's3://' + path


class UploadRequestSubmitter(BaseTransferRequestSubmitter):
    REQUEST_MAPPER_METHOD = RequestParamsMapper.map_put_object_params
    RESULT_SUBSCRIBER_CLASS = UploadResultSubscriber

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'upload'

    def _add_additional_subscribers(self, subscribers, fileinfo):
        subscribers.append(ProvideSizeSubscriber(fileinfo.size))
        if self._should_inject_content_type():
            subscribers.append(ProvideUploadContentTypeSubscriber())
        # For a move, delete the local source file once the upload is done.
        if self._cli_params.get('is_move', False):
            subscribers.append(DeleteSourceFileSubscriber())

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        bucket, key = find_bucket_key(fileinfo.dest)
        filein = self._get_filein(fileinfo)
        return self._transfer_manager.upload(
            fileobj=filein, bucket=bucket, key=key,
            extra_args=extra_args, subscribers=subscribers
        )

    def _get_filein(self, fileinfo):
        # Overridden by the stream variant to read from stdin instead.
        return fileinfo.src

    def _get_warning_handlers(self):
        return [self._warn_if_too_large]

    def _warn_if_too_large(self, fileinfo):
        # NOTE: this handler never returns True, and the warning is created
        # with skip_file=False, so the oversized upload is still attempted
        # (and left to fail server-side) rather than skipped.
        if getattr(fileinfo, 'size') and fileinfo.size > MAX_UPLOAD_SIZE:
            file_path = relative_path(fileinfo.src)
            warning_message = (
                "File %s exceeds s3 upload limit of %s." % (
                    file_path, human_readable_size(MAX_UPLOAD_SIZE)))
            warning = create_warning(
                file_path, warning_message, skip_file=False)
            self._result_queue.put(warning)

    def _format_src_dest(self, fileinfo):
        src = self._format_local_path(fileinfo.src)
        dest = self._format_s3_path(fileinfo.dest)
        return src, dest


class DownloadRequestSubmitter(BaseTransferRequestSubmitter):
    REQUEST_MAPPER_METHOD = RequestParamsMapper.map_get_object_params
    RESULT_SUBSCRIBER_CLASS = DownloadResultSubscriber

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'download'

    def _add_additional_subscribers(self, subscribers, fileinfo):
        subscribers.append(ProvideSizeSubscriber(fileinfo.size))
        subscribers.append(DirectoryCreatorSubscriber())
        subscribers.append(ProvideLastModifiedTimeSubscriber(
            fileinfo.last_update, self._result_queue))
        # For a move, delete the source S3 object once the download is done.
        if self._cli_params.get('is_move', False):
            subscribers.append(DeleteSourceObjectSubscriber(
                fileinfo.source_client))

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        bucket, key = find_bucket_key(fileinfo.src)
        fileout = self._get_fileout(fileinfo)
        return self._transfer_manager.download(
            fileobj=fileout, bucket=bucket, key=key,
            extra_args=extra_args, subscribers=subscribers
        )

    def _get_fileout(self, fileinfo):
        # Overridden by the stream variant to write to stdout instead.
        return fileinfo.dest

    def _get_warning_handlers(self):
        return [self._warn_glacier, self._warn_parent_reference]

    def _format_src_dest(self, fileinfo):
        src = self._format_s3_path(fileinfo.src)
        dest = self._format_local_path(fileinfo.dest)
        return src, dest


class CopyRequestSubmitter(BaseTransferRequestSubmitter):
    REQUEST_MAPPER_METHOD = RequestParamsMapper.map_copy_object_params
    RESULT_SUBSCRIBER_CLASS = CopyResultSubscriber

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'copy'

    def _add_additional_subscribers(self, subscribers, fileinfo):
        subscribers.append(ProvideSizeSubscriber(fileinfo.size))
        if self._should_inject_content_type():
            subscribers.append(ProvideCopyContentTypeSubscriber())
        # For a move, delete the copy source object once the copy is done.
        if self._cli_params.get('is_move', False):
            subscribers.append(DeleteCopySourceObjectSubscriber(
                fileinfo.source_client))

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        bucket, key = find_bucket_key(fileinfo.dest)
        source_bucket, source_key = find_bucket_key(fileinfo.src)
        copy_source = {'Bucket': source_bucket, 'Key': source_key}
        return self._transfer_manager.copy(
            bucket=bucket, key=key, copy_source=copy_source,
            extra_args=extra_args, subscribers=subscribers,
            source_client=fileinfo.source_client
        )

    def _get_warning_handlers(self):
        return [self._warn_glacier]

    def _format_src_dest(self, fileinfo):
        src = self._format_s3_path(fileinfo.src)
        dest = self._format_s3_path(fileinfo.dest)
        return src, dest


class UploadStreamRequestSubmitter(UploadRequestSubmitter):
    RESULT_SUBSCRIBER_CLASS = UploadStreamResultSubscriber

    def can_submit(self, fileinfo):
        return (
            fileinfo.operation_name == 'upload' and
            self._cli_params.get('is_stream')
        )

    def _add_additional_subscribers(self, subscribers, fileinfo):
        # A stream has no stat-able size; use --expected-size when given.
        # Content-type guessing and move handling do not apply to stdin.
        expected_size = self._cli_params.get('expected_size', None)
        if expected_size is not None:
            subscribers.append(ProvideSizeSubscriber(int(expected_size)))

    def _get_filein(self, fileinfo):
        # Wrap stdin so s3transfer does not attempt to seek on it.
        binary_stdin = get_binary_stdin()
        return NonSeekableStream(binary_stdin)

    def _format_local_path(self, path):
        # Streams are conventionally displayed as '-'.
        return '-'


class DownloadStreamRequestSubmitter(DownloadRequestSubmitter):
    RESULT_SUBSCRIBER_CLASS = DownloadStreamResultSubscriber

    def can_submit(self, fileinfo):
        return (
            fileinfo.operation_name == 'download' and
            self._cli_params.get('is_stream')
        )

    def _add_additional_subscribers(self, subscribers, fileinfo):
        # None of the file-based subscribers (size, directory creation,
        # mtime, move-delete) apply when writing to stdout.
        pass

    def _get_fileout(self, fileinfo):
        return StdoutBytesWriter()

    def _format_local_path(self, path):
        # Streams are conventionally displayed as '-'.
        return '-'


class DeleteRequestSubmitter(BaseTransferRequestSubmitter):
    REQUEST_MAPPER_METHOD = RequestParamsMapper.map_delete_object_params
    RESULT_SUBSCRIBER_CLASS = DeleteResultSubscriber

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'delete' and \
            fileinfo.src_type == 's3'

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        bucket, key = find_bucket_key(fileinfo.src)
        return self._transfer_manager.delete(
            bucket=bucket, key=key, extra_args=extra_args,
            subscribers=subscribers)

    def _format_src_dest(self, fileinfo):
        # A delete has no destination.
        return self._format_s3_path(fileinfo.src), None


class LocalDeleteRequestSubmitter(BaseTransferRequestSubmitter):
    REQUEST_MAPPER_METHOD = None
    RESULT_SUBSCRIBER_CLASS = None

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'delete' and \
            fileinfo.src_type == 'local'

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        # This is quirky but essentially instead of relying on a built-in
        # method of s3transfer, the logic lives directly in the submitter.
        # The reason an explicit local file delete does not
        # live in s3transfer is because it is outside the scope of s3transfer;
        # it should only have interfaces for interacting with S3. Therefore,
        # the burden of this functionality should live in the CLI.

        # The main downside in doing this is that the delete and the result
        # creation happen in the main thread as opposed to a separate thread
        # in s3transfer. However, this is not too big of a downside because
        # deleting a local file only happens for sync --delete downloads and
        # is very fast compared to all of the other types of transfers.
        src, dest = self._format_src_dest(fileinfo)
        result_kwargs = {
            'transfer_type': 'delete',
            'src': src,
            'dest': dest
        }
        try:
            # The QueuedResult is emitted before attempting the delete so
            # that a failure still has a matching queued entry.
            self._result_queue.put(QueuedResult(
                total_transfer_size=0, **result_kwargs))
            os.remove(fileinfo.src)
            self._result_queue.put(SuccessResult(**result_kwargs))
        except Exception as e:
            self._result_queue.put(
                FailureResult(exception=e, **result_kwargs))
        finally:
            # Return True to indicate that the transfer was submitted.
            # Placing the return in the finally block guarantees True is
            # returned on both the success and the failure paths above;
            # failures are reported via the FailureResult, not by raising.
            return True

    def _format_src_dest(self, fileinfo):
        # A delete has no destination.
        return self._format_local_path(fileinfo.src), None