1# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7#     http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import logging
14import os
15
16from s3transfer.manager import TransferManager
17
18from awscli.customizations.s3.utils import (
19    human_readable_size, MAX_UPLOAD_SIZE, find_bucket_key, relative_path,
20    create_warning, NonSeekableStream)
21from awscli.customizations.s3.transferconfig import \
22    create_transfer_config_from_runtime_config
23from awscli.customizations.s3.results import UploadResultSubscriber
24from awscli.customizations.s3.results import DownloadResultSubscriber
25from awscli.customizations.s3.results import CopyResultSubscriber
26from awscli.customizations.s3.results import UploadStreamResultSubscriber
27from awscli.customizations.s3.results import DownloadStreamResultSubscriber
28from awscli.customizations.s3.results import DeleteResultSubscriber
29from awscli.customizations.s3.results import QueuedResult
30from awscli.customizations.s3.results import SuccessResult
31from awscli.customizations.s3.results import FailureResult
32from awscli.customizations.s3.results import DryRunResult
33from awscli.customizations.s3.results import ResultRecorder
34from awscli.customizations.s3.results import ResultPrinter
35from awscli.customizations.s3.results import OnlyShowErrorsResultPrinter
36from awscli.customizations.s3.results import NoProgressResultPrinter
37from awscli.customizations.s3.results import ResultProcessor
38from awscli.customizations.s3.results import CommandResultRecorder
39from awscli.customizations.s3.utils import RequestParamsMapper
40from awscli.customizations.s3.utils import StdoutBytesWriter
41from awscli.customizations.s3.utils import ProvideSizeSubscriber
42from awscli.customizations.s3.utils import ProvideUploadContentTypeSubscriber
43from awscli.customizations.s3.utils import ProvideCopyContentTypeSubscriber
44from awscli.customizations.s3.utils import ProvideLastModifiedTimeSubscriber
45from awscli.customizations.s3.utils import DirectoryCreatorSubscriber
46from awscli.customizations.s3.utils import DeleteSourceFileSubscriber
47from awscli.customizations.s3.utils import DeleteSourceObjectSubscriber
48from awscli.customizations.s3.utils import DeleteCopySourceObjectSubscriber
49from awscli.compat import get_binary_stdin
50
51
# Module-level logger; the factory and submitters below emit debug
# messages through it.
LOGGER = logging.getLogger(__name__)
53
54
class S3TransferHandlerFactory(object):
    # Upper bound applied to both the upload and download in-memory chunk
    # limits of every transfer config this factory builds.
    MAX_IN_MEMORY_CHUNKS = 6

    def __init__(self, cli_params, runtime_config):
        """Build a factory that produces S3TransferHandler objects.

        :type cli_params: dict
        :param cli_params: The parameters provided to the CLI command

        :type runtime_config: RuntimeConfig
        :param runtime_config: The runtime config for the CLI command
            being run
        """
        self._cli_params = cli_params
        self._runtime_config = runtime_config

    def __call__(self, client, result_queue):
        """Create an S3TransferHandler bound to a client and result queue.

        :type client: botocore.client.Client
        :param client: The client to power the S3TransferHandler

        :type result_queue: queue.Queue
        :param result_queue: The result queue to be used to process results
            for the S3TransferHandler

        :returns: A S3TransferHandler instance
        """
        config = create_transfer_config_from_runtime_config(
            self._runtime_config)
        chunk_limit = self.MAX_IN_MEMORY_CHUNKS
        config.max_in_memory_upload_chunks = chunk_limit
        config.max_in_memory_download_chunks = chunk_limit

        manager = TransferManager(client, config)

        LOGGER.debug(
            "Using a multipart threshold of %s and a part size of %s",
            config.multipart_threshold,
            config.multipart_chunksize
        )

        # The recorder always comes first; a printer is appended after it
        # unless quiet mode is on.
        recorder = ResultRecorder()
        handlers = [recorder]
        self._add_result_printer(recorder, handlers)
        processor = ResultProcessor(result_queue, handlers)
        command_recorder = CommandResultRecorder(
            result_queue, recorder, processor)

        return S3TransferHandler(manager, self._cli_params, command_recorder)

    def _add_result_printer(self, result_recorder, result_processor_handlers):
        """Append the result printer matching the CLI output flags.

        Nothing is appended when ``quiet`` is set.
        """
        params = self._cli_params
        if params.get('quiet'):
            return
        if params.get('only_show_errors') or params.get('is_stream'):
            # Streams get the errors-only printer too; presumably so that
            # progress lines do not mix with data streamed via stdout.
            printer_cls = OnlyShowErrorsResultPrinter
        elif not params.get('progress'):
            printer_cls = NoProgressResultPrinter
        else:
            printer_cls = ResultPrinter
        result_processor_handlers.append(printer_cls(result_recorder))
119
120
class S3TransferHandler(object):
    def __init__(self, transfer_manager, cli_params, result_command_recorder):
        """Backend for performing S3 transfers

        :type transfer_manager: s3transfer.manager.TransferManager
        :param transfer_manager: Transfer manager to use for transfers

        :type cli_params: dict
        :param cli_params: The parameters passed to the CLI command in the
            form of a dictionary

        :type result_command_recorder: CommandResultRecorder
        :param result_command_recorder: The command result recorder to be
            used to get the final result of the transfer. Its
            ``result_queue`` attribute is handed to every submitter.
        """
        self._transfer_manager = transfer_manager
        # TODO: Ideally the s3 transfer handler should not need to know
        # about the result command recorder. It really only needs an interface
        # for adding results to the queue. When all of the commands have
        # converted to use this transfer handler, an effort should be made
        # to replace the passing of a result command recorder with an
        # abstraction to enqueue results.
        self._result_command_recorder = result_command_recorder

        submitter_args = (
            self._transfer_manager, self._result_command_recorder.result_queue,
            cli_params
        )
        # Order matters: ``call`` hands each FileInfo to the *first*
        # submitter whose can_submit() accepts it. The stream submitters
        # accept a subset of what the plain upload/download submitters
        # accept, so they must be consulted first.
        self._submitters = [
            UploadStreamRequestSubmitter(*submitter_args),
            DownloadStreamRequestSubmitter(*submitter_args),
            UploadRequestSubmitter(*submitter_args),
            DownloadRequestSubmitter(*submitter_args),
            CopyRequestSubmitter(*submitter_args),
            DeleteRequestSubmitter(*submitter_args),
            LocalDeleteRequestSubmitter(*submitter_args)
        ]

    def call(self, fileinfos):
        """Process iterable of FileInfos for transfer

        :type fileinfos: iterable of FileInfos
        :param fileinfos: Set of FileInfos to submit to underlying transfer
            request submitters to make transfer API calls to S3. Each
            FileInfo is given to the first submitter that can handle it;
            a FileInfo that no submitter accepts is ignored.

        :rtype: CommandResult
        :returns: The result of the command that specifies the number of
            failures and warnings encountered.
        """
        with self._result_command_recorder:
            with self._transfer_manager:
                total_submissions = 0
                for fileinfo in fileinfos:
                    for submitter in self._submitters:
                        if submitter.can_submit(fileinfo):
                            # A falsy return means the FileInfo was skipped
                            # (or only dry-run), so it is not counted.
                            if submitter.submit(fileinfo):
                                total_submissions += 1
                            break
                self._result_command_recorder.notify_total_submissions(
                    total_submissions)
        return self._result_command_recorder.get_command_result()
182
183
class BaseTransferRequestSubmitter(object):
    # Optional callable invoked as ``(extra_args, cli_params)`` to populate
    # the request's extra arguments; set by subclasses.
    REQUEST_MAPPER_METHOD = None
    # Optional subscriber class constructed with ``result_queue`` (plus
    # ``transfer_type='move'`` for moves) and registered last.
    RESULT_SUBSCRIBER_CLASS = None

    def __init__(self, transfer_manager, result_queue, cli_params):
        """Submits transfer requests to the TransferManager

        Given a FileInfo object and provided CLI parameters, it will add the
        necessary extra arguments and subscribers in making a call to the
        TransferManager.

        :type transfer_manager: s3transfer.manager.TransferManager
        :param transfer_manager: The underlying transfer manager

        :type result_queue: queue.Queue
        :param result_queue: The result queue to use

        :type cli_params: dict
        :param cli_params: The associated CLI parameters passed in to the
            command as a dictionary.
        """
        self._transfer_manager = transfer_manager
        self._result_queue = result_queue
        self._cli_params = cli_params

    def submit(self, fileinfo):
        """Submits a transfer request based on the FileInfo provided

        There is no guarantee that the transfer request will be made on
        behalf of the fileinfo as a fileinfo may be skipped based on
        circumstances in which the transfer is not possible.

        :type fileinfo: awscli.customizations.s3.fileinfo.FileInfo
        :param fileinfo: The FileInfo to be used to submit a transfer
            request to the underlying transfer manager.

        :rtype: s3transfer.futures.TransferFuture
        :returns: A TransferFuture (or another truthy value) representing
            the transfer if the transfer was submitted. If it was skipped
            or only dry-run, None is returned.
        """
        should_skip = self._warn_and_signal_if_skip(fileinfo)
        if not should_skip:
            return self._do_submit(fileinfo)
        return None

    def can_submit(self, fileinfo):
        """Checks whether it can submit a particular FileInfo

        :type fileinfo: awscli.customizations.s3.fileinfo.FileInfo
        :param fileinfo: The FileInfo to check if the transfer request
            submitter can handle.

        :returns: True if it can use the provided FileInfo to make a transfer
            request to the underlying transfer manager. False, otherwise.
        """
        raise NotImplementedError('can_submit()')

    def _do_submit(self, fileinfo):
        """Build extra args and subscribers, then submit or dry-run."""
        extra_args = {}
        if self.REQUEST_MAPPER_METHOD:
            self.REQUEST_MAPPER_METHOD(extra_args, self._cli_params)
        subscribers = []
        self._add_additional_subscribers(subscribers, fileinfo)
        # The result subscriber class should always be the last registered
        # subscriber to ensure it is not missing any information that
        # may have been added in a different subscriber such as size.
        if self.RESULT_SUBSCRIBER_CLASS:
            result_kwargs = {'result_queue': self._result_queue}
            if self._cli_params.get('is_move', False):
                result_kwargs['transfer_type'] = 'move'
            subscribers.append(self.RESULT_SUBSCRIBER_CLASS(**result_kwargs))

        if self._cli_params.get('dryrun'):
            # A dry run only reports what would happen; returning None
            # means the caller does not count it as a submission.
            self._submit_dryrun(fileinfo)
            return None
        return self._submit_transfer_request(
            fileinfo, extra_args, subscribers)

    def _submit_dryrun(self, fileinfo):
        """Enqueue a DryRunResult describing the transfer."""
        transfer_type = fileinfo.operation_name
        if self._cli_params.get('is_move', False):
            transfer_type = 'move'
        src, dest = self._format_src_dest(fileinfo)
        self._result_queue.put(DryRunResult(
            transfer_type=transfer_type, src=src, dest=dest))

    def _add_additional_subscribers(self, subscribers, fileinfo):
        """Hook for subclasses to append subscribers in place."""
        pass

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        raise NotImplementedError('_submit_transfer_request()')

    def _warn_and_signal_if_skip(self, fileinfo):
        """Run warning handlers in order; return True if the file is skipped.

        Evaluation stops at the first handler that signals a skip, because
        the file will be skipped regardless of the remaining handlers.
        """
        return any(
            handler(fileinfo) for handler in self._get_warning_handlers())

    def _get_warning_handlers(self):
        # Returns a list of warning handlers, which are callables that
        # take in a single parameter representing a FileInfo. It will then
        # add a warning to result_queue if needed and return True if
        # that FileInfo should be skipped.
        return []

    def _should_inject_content_type(self):
        """True when MIME guessing is on and no explicit type was given."""
        return (
            self._cli_params.get('guess_mime_type') and
            not self._cli_params.get('content_type')
        )

    def _warn_glacier(self, fileinfo):
        """Skip (and, unless ignored, warn about) GLACIER objects.

        Returns True when the object cannot be transferred and the file
        must be skipped; False otherwise.
        """
        if self._cli_params.get('force_glacier_transfer'):
            return False
        if fileinfo.is_glacier_compatible():
            return False
        # Pass arguments lazily so the message is only formatted when
        # debug logging is actually enabled.
        LOGGER.debug(
            'Encountered glacier object s3://%s. Not performing '
            '%s on object.', fileinfo.src, fileinfo.operation_name)
        if not self._cli_params.get('ignore_glacier_warnings'):
            warning = create_warning(
                's3://'+fileinfo.src,
                'Object is of storage class GLACIER. Unable to '
                'perform %s operations on GLACIER objects. You must '
                'restore the object to be able to perform the '
                'operation. See aws s3 %s help for additional '
                'parameter options to ignore or force these '
                'transfers.' %
                (fileinfo.operation_name, fileinfo.operation_name)
            )
            self._result_queue.put(warning)
        return True

    def _warn_parent_reference(self, fileinfo):
        """Skip (with a warning) keys that resolve to a parent directory."""
        normalized = os.path.normpath(fileinfo.compare_key)
        # normpath() will use the OS path separator so we
        # need to take that into account when checking for a parent prefix.
        # A key that normalizes to exactly '..' is the parent directory
        # itself and must be caught as well.
        escapes_cwd = (
            normalized == os.pardir or
            normalized.startswith(os.pardir + os.path.sep)
        )
        if escapes_cwd:
            warning = create_warning(
                fileinfo.compare_key, "File references a parent directory.")
            self._result_queue.put(warning)
            return True
        return False

    def _format_src_dest(self, fileinfo):
        """Returns formatted versions of a fileinfos source and destination."""
        raise NotImplementedError('_format_src_dest')

    def _format_local_path(self, path):
        return relative_path(path)

    def _format_s3_path(self, path):
        """Return ``path`` with an ``s3://`` prefix, adding it if missing."""
        if path.startswith('s3://'):
            return path
        return 's3://' + path
343
344
class UploadRequestSubmitter(BaseTransferRequestSubmitter):
    """Submits local-file-to-S3 uploads to the transfer manager."""
    REQUEST_MAPPER_METHOD = RequestParamsMapper.map_put_object_params
    RESULT_SUBSCRIBER_CLASS = UploadResultSubscriber

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'upload'

    def _add_additional_subscribers(self, subscribers, fileinfo):
        subscribers.append(ProvideSizeSubscriber(fileinfo.size))
        if self._should_inject_content_type():
            subscribers.append(ProvideUploadContentTypeSubscriber())
        if self._cli_params.get('is_move', False):
            # For ``mv`` the local source file is removed after upload.
            subscribers.append(DeleteSourceFileSubscriber())

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        bucket, key = find_bucket_key(fileinfo.dest)
        filein = self._get_filein(fileinfo)
        return self._transfer_manager.upload(
            fileobj=filein, bucket=bucket, key=key,
            extra_args=extra_args, subscribers=subscribers
        )

    def _get_filein(self, fileinfo):
        """Return what to upload from; overridden for streamed uploads."""
        return fileinfo.src

    def _get_warning_handlers(self):
        return [self._warn_if_too_large]

    def _warn_if_too_large(self, fileinfo):
        """Warn when the file exceeds MAX_UPLOAD_SIZE.

        The warning is created with ``skip_file=False`` and this handler
        never asks for the file to be skipped, so it always returns False.
        """
        # Use a default so a FileInfo without a ``size`` attribute (or with
        # a falsy size) is treated as "unknown" instead of raising.
        size = getattr(fileinfo, 'size', None)
        if size and size > MAX_UPLOAD_SIZE:
            file_path = relative_path(fileinfo.src)
            warning_message = (
                "File %s exceeds s3 upload limit of %s." % (
                    file_path, human_readable_size(MAX_UPLOAD_SIZE)))
            warning = create_warning(
                file_path, warning_message, skip_file=False)
            self._result_queue.put(warning)
        return False

    def _format_src_dest(self, fileinfo):
        src = self._format_local_path(fileinfo.src)
        dest = self._format_s3_path(fileinfo.dest)
        return src, dest
387
388
class DownloadRequestSubmitter(BaseTransferRequestSubmitter):
    """Submits S3-object-to-local-file downloads to the transfer manager."""
    REQUEST_MAPPER_METHOD = RequestParamsMapper.map_get_object_params
    RESULT_SUBSCRIBER_CLASS = DownloadResultSubscriber

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'download'

    def _add_additional_subscribers(self, subscribers, fileinfo):
        """Add size, directory-creation and mtime subscribers (plus delete
        of the source object for ``mv``)."""
        extra = [
            ProvideSizeSubscriber(fileinfo.size),
            DirectoryCreatorSubscriber(),
            ProvideLastModifiedTimeSubscriber(
                fileinfo.last_update, self._result_queue),
        ]
        if self._cli_params.get('is_move', False):
            extra.append(DeleteSourceObjectSubscriber(fileinfo.source_client))
        subscribers.extend(extra)

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        src_bucket, src_key = find_bucket_key(fileinfo.src)
        return self._transfer_manager.download(
            fileobj=self._get_fileout(fileinfo),
            bucket=src_bucket,
            key=src_key,
            extra_args=extra_args,
            subscribers=subscribers,
        )

    def _get_fileout(self, fileinfo):
        """Return the download target; overridden for streamed downloads."""
        return fileinfo.dest

    def _get_warning_handlers(self):
        return [self._warn_glacier, self._warn_parent_reference]

    def _format_src_dest(self, fileinfo):
        return (
            self._format_s3_path(fileinfo.src),
            self._format_local_path(fileinfo.dest),
        )
423
424
class CopyRequestSubmitter(BaseTransferRequestSubmitter):
    """Submits S3-to-S3 copies to the transfer manager."""
    REQUEST_MAPPER_METHOD = RequestParamsMapper.map_copy_object_params
    RESULT_SUBSCRIBER_CLASS = CopyResultSubscriber

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'copy'

    def _add_additional_subscribers(self, subscribers, fileinfo):
        """Add size and optional content-type / source-deletion subscribers."""
        extra = [ProvideSizeSubscriber(fileinfo.size)]
        if self._should_inject_content_type():
            extra.append(ProvideCopyContentTypeSubscriber())
        if self._cli_params.get('is_move', False):
            extra.append(
                DeleteCopySourceObjectSubscriber(fileinfo.source_client))
        subscribers.extend(extra)

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        dest_bucket, dest_key = find_bucket_key(fileinfo.dest)
        src_bucket, src_key = find_bucket_key(fileinfo.src)
        return self._transfer_manager.copy(
            bucket=dest_bucket,
            key=dest_key,
            copy_source={'Bucket': src_bucket, 'Key': src_key},
            extra_args=extra_args,
            subscribers=subscribers,
            source_client=fileinfo.source_client,
        )

    def _get_warning_handlers(self):
        return [self._warn_glacier]

    def _format_src_dest(self, fileinfo):
        return (
            self._format_s3_path(fileinfo.src),
            self._format_s3_path(fileinfo.dest),
        )
457
458
class UploadStreamRequestSubmitter(UploadRequestSubmitter):
    """Uploads data read from binary stdin when ``is_stream`` is set."""
    RESULT_SUBSCRIBER_CLASS = UploadStreamResultSubscriber

    def can_submit(self, fileinfo):
        is_upload = fileinfo.operation_name == 'upload'
        return is_upload and self._cli_params.get('is_stream')

    def _add_additional_subscribers(self, subscribers, fileinfo):
        """Provide a size only when an ``expected_size`` parameter is given."""
        expected_size = self._cli_params.get('expected_size')
        if expected_size is None:
            return
        subscribers.append(ProvideSizeSubscriber(int(expected_size)))

    def _get_filein(self, fileinfo):
        return NonSeekableStream(get_binary_stdin())

    def _format_local_path(self, path):
        # Streams have no local path; '-' is shown instead.
        return '-'
479
480
class DownloadStreamRequestSubmitter(DownloadRequestSubmitter):
    """Downloads an object to stdout when ``is_stream`` is set."""
    RESULT_SUBSCRIBER_CLASS = DownloadStreamResultSubscriber

    def can_submit(self, fileinfo):
        is_download = fileinfo.operation_name == 'download'
        return is_download and self._cli_params.get('is_stream')

    def _add_additional_subscribers(self, subscribers, fileinfo):
        """Intentionally add nothing.

        Unlike regular downloads, no size, directory-creation,
        last-modified-time or source-deletion subscribers are registered.
        """

    def _get_fileout(self, fileinfo):
        return StdoutBytesWriter()

    def _format_local_path(self, path):
        # Streams have no local path; '-' is shown instead.
        return '-'
498
499
class DeleteRequestSubmitter(BaseTransferRequestSubmitter):
    """Submits deletions of S3 objects to the transfer manager."""
    REQUEST_MAPPER_METHOD = RequestParamsMapper.map_delete_object_params
    RESULT_SUBSCRIBER_CLASS = DeleteResultSubscriber

    def can_submit(self, fileinfo):
        return (
            fileinfo.operation_name == 'delete' and
            fileinfo.src_type == 's3'
        )

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        target_bucket, target_key = find_bucket_key(fileinfo.src)
        return self._transfer_manager.delete(
            bucket=target_bucket,
            key=target_key,
            extra_args=extra_args,
            subscribers=subscribers,
        )

    def _format_src_dest(self, fileinfo):
        # Deletions have no destination.
        formatted_src = self._format_s3_path(fileinfo.src)
        return formatted_src, None
516
517
class LocalDeleteRequestSubmitter(BaseTransferRequestSubmitter):
    """Deletes local files directly, reporting results on the result queue."""
    REQUEST_MAPPER_METHOD = None
    RESULT_SUBSCRIBER_CLASS = None

    def can_submit(self, fileinfo):
        return fileinfo.operation_name == 'delete' and \
            fileinfo.src_type == 'local'

    def _submit_transfer_request(self, fileinfo, extra_args, subscribers):
        """Delete ``fileinfo.src`` locally and enqueue the outcome.

        Always returns True so the deletion is counted as a submission.
        Failures of the delete itself are reported as a FailureResult.
        Interpreter-level exceptions such as KeyboardInterrupt propagate.
        """
        # This is quirky but essentially instead of relying on a built-in
        # method of s3 transfer, the logic lives directly in the submitter.
        # The reason a explicit delete local file does not
        # live in s3transfer is because it is outside the scope of s3transfer;
        # it should only have interfaces for interacting with S3. Therefore,
        # the burden of this functionality should live in the CLI.

        # The main downsides in doing this is that delete and the result
        # creation happens in the main thread as opposed to a separate thread
        # in s3transfer. However, this is not too big of a downside because
        # deleting a local file only happens for sync --delete downloads and
        # is very fast compared to all of the other types of transfers.
        src, dest = self._format_src_dest(fileinfo)
        result_kwargs = {
            'transfer_type': 'delete',
            'src': src,
            'dest': dest
        }
        try:
            self._result_queue.put(QueuedResult(
                total_transfer_size=0, **result_kwargs))
            os.remove(fileinfo.src)
            self._result_queue.put(SuccessResult(**result_kwargs))
        except Exception as e:
            self._result_queue.put(
                FailureResult(exception=e, **result_kwargs))
        # Returning outside of ``finally`` keeps KeyboardInterrupt/SystemExit
        # (and errors while enqueuing the failure) from being silently
        # swallowed.
        # Return True to indicate that the transfer was submitted.
        return True

    def _format_src_dest(self, fileinfo):
        # Local deletions have no destination.
        return self._format_local_path(fileinfo.src), None
559