1<?php
2namespace Aws\S3;
3
4use Aws;
5use Aws\CommandInterface;
6use Aws\Exception\AwsException;
7use GuzzleHttp\Promise;
8use GuzzleHttp\Promise\PromisorInterface;
9use Iterator;
10
11/**
12 * Transfers files from the local filesystem to S3 or from S3 to the local
13 * filesystem.
14 *
15 * This class does not support copying from the local filesystem to somewhere
16 * else on the local filesystem or from one S3 bucket to another.
17 */
class Transfer implements PromisorInterface
{
    /** @var S3ClientInterface Client used to create and execute S3 commands. */
    private $client;

    /** @var mixed Cached result of the first promise() call (null until then). */
    private $promise;

    /** @var string|Iterator The source exactly as given to the constructor. */
    private $source;

    /** @var array 'path' and 'scheme' of the source (or of base_dir for iterators). */
    private $sourceMetadata;

    /** @var array 'path' and 'scheme' of the destination. */
    private $destination;

    /** @var int Concurrency passed to the command pool / upload limiter. */
    private $concurrency;

    /** @var int File size in bytes at which a multipart upload is used. */
    private $mupThreshold;

    /** @var callable|null Callback invoked with each command before it is sent. */
    private $before;

    /** @var array 'Bucket' (and optionally 'Key') parsed from an s3 destination. */
    private $s3Args = [];

    /**
     * When providing the $source argument, you may provide a string referencing
     * the path to a directory on disk to upload, an s3 scheme URI that contains
     * the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
     * that yields strings containing filenames that are the path to a file on
     * disk or an s3 scheme URI. The bucket portion of the s3 URI may be an S3
     * access point ARN. The "/key" portion of an s3 URI is optional.
     *
     * When providing an iterator for the $source argument, you must also
     * provide a 'base_dir' key value pair in the $options argument.
     *
     * The $dest argument can be the path to a directory on disk or an s3
     * scheme URI (e.g., "s3://bucket/key").
     *
     * The options array can contain the following key value pairs:
     *
     * - base_dir: (string) Base directory of the source, if $source is an
     *   iterator. If the $source option is not an iterator, then this option
     *   is ignored.
     * - before: (callable) A callback to invoke before each transfer. The
     *   callback accepts a single argument: Aws\CommandInterface $command.
     *   The provided command will be either a GetObject, PutObject,
     *   CreateMultipartUpload, UploadPart, or CompleteMultipartUpload command.
     * - mup_threshold: (int) Size in bytes in which a multipart upload should
     *   be used instead of PutObject. Defaults to 16777216 (16 MB). Must not
     *   be smaller than MultipartUploader::PART_MIN_SIZE.
     * - concurrency: (int) Number of files to transfer concurrently. Defaults
     *   to MultipartUploader::DEFAULT_CONCURRENCY. The ideal concurrency value
     *   will vary based on the number of files being uploaded and the average
     *   size of each file. Generally speaking, smaller files benefit from a
     *   higher concurrency while larger files will not.
     * - debug: (bool|resource) Set to true to print out debug information for
     *   transfers to PHP's output stream (php://output). Set to an fopen()
     *   resource to write to a specific stream instead.
     *
     * @param S3ClientInterface $client  Client used for transfers.
     * @param string|Iterator   $source  Where the files are transferred from.
     * @param string            $dest    Where the files are transferred to.
     * @param array             $options Hash of options.
     *
     * @throws \InvalidArgumentException If the source is neither a string nor
     *                                   an Iterator, if an Iterator is given
     *                                   without base_dir, if a scheme is not
     *                                   "s3" or "file", if source and
     *                                   destination share the same scheme, if
     *                                   mup_threshold is too small, or if
     *                                   before is not callable.
     */
    public function __construct(
        S3ClientInterface $client,
        $source,
        $dest,
        array $options = []
    ) {
        $this->client = $client;

        // Prepare the destination.
        $this->destination = $this->prepareTarget($dest);
        if ($this->destination['scheme'] === 's3') {
            $this->s3Args = $this->getS3Args($this->destination['path']);
        }

        // Prepare the source.
        if (is_string($source)) {
            $this->sourceMetadata = $this->prepareTarget($source);
            $this->source = $source;
        } elseif ($source instanceof Iterator) {
            if (empty($options['base_dir'])) {
                throw new \InvalidArgumentException('You must provide the source'
                    . ' argument as a string or provide the "base_dir" option.');
            }

            // For iterators the scheme and base path come from base_dir.
            $this->sourceMetadata = $this->prepareTarget($options['base_dir']);
            $this->source = $source;
        } else {
            throw new \InvalidArgumentException('source must be the path to a '
                . 'directory or an iterator that yields file names.');
        }

        // Validate schemes: only file -> s3 and s3 -> file are supported.
        if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
            throw new \InvalidArgumentException("You cannot copy from"
                . " {$this->sourceMetadata['scheme']} to"
                . " {$this->destination['scheme']}."
            );
        }

        // Handle multipart-related options.
        $this->concurrency = isset($options['concurrency'])
            ? $options['concurrency']
            : MultipartUploader::DEFAULT_CONCURRENCY;
        $this->mupThreshold = isset($options['mup_threshold'])
            ? $options['mup_threshold']
            : 16777216;
        if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
            throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
        }

        // Handle "before" callback option.
        if (isset($options['before'])) {
            $this->before = $options['before'];
            if (!is_callable($this->before)) {
                throw new \InvalidArgumentException('before must be a callable.');
            }
        }

        // Handle "debug" option. This must run after "before" is stored,
        // because the debug callback wraps whatever "before" currently holds.
        if (isset($options['debug'])) {
            if ($options['debug'] === true) {
                $options['debug'] = fopen('php://output', 'w');
            }
            if (is_resource($options['debug'])) {
                $this->addDebugToBefore($options['debug']);
            }
        }
    }

    /**
     * Transfers the files.
     *
     * The promise is created on the first call and cached; later calls return
     * the same promise. An upload is started when the source scheme is "file",
     * otherwise a download is started.
     *
     * @return mixed The promise produced by the upload or download pipeline.
     */
    public function promise()
    {
        // If the promise has been created, just return it.
        if (!$this->promise) {
            // Create an upload/download promise for the transfer.
            $this->promise = $this->sourceMetadata['scheme'] === 'file'
                ? $this->createUploadPromise()
                : $this->createDownloadPromise();
        }

        return $this->promise;
    }

    /**
     * Transfers the files synchronously by waiting on the transfer promise.
     */
    public function transfer()
    {
        $this->promise()->wait();
    }

    /**
     * Builds the path/scheme description of a source or destination.
     *
     * @param string $targetPath Local path or s3 scheme URI.
     *
     * @return array Array with the normalized 'path' and the 'scheme'.
     *
     * @throws \InvalidArgumentException If the scheme is not "s3" or "file".
     */
    private function prepareTarget($targetPath)
    {
        $target = [
            'path'   => $this->normalizePath($targetPath),
            'scheme' => $this->determineScheme($targetPath),
        ];

        if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') {
            throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
        }

        return $target;
    }

    /**
     * Creates an array that contains Bucket and Key by parsing the filename.
     *
     * Only a leading "s3://" is removed, so a key that itself contains the
     * text "s3://" is preserved unchanged. The 'Key' entry is present only
     * when the path contains a "/" after the bucket.
     *
     * @param string $path Path to parse.
     *
     * @return array
     */
    private function getS3Args($path)
    {
        $path = (string) $path;
        if (strpos($path, 's3://') === 0) {
            // Cast guards against substr() returning false on PHP < 8 when
            // nothing follows the scheme.
            $path = (string) substr($path, strlen('s3://'));
        }

        $parts = explode('/', $path, 2);
        $args = ['Bucket' => $parts[0]];
        if (isset($parts[1])) {
            $args['Key'] = $parts[1];
        }

        return $args;
    }

    /**
     * Parses the scheme from a filename.
     *
     * A path without "://" is treated as a local file. Because the check is
     * a falsy test on strpos(), a path that starts with "://" (offset 0) is
     * also treated as a local file.
     *
     * @param string $path Path to parse.
     *
     * @return string
     */
    private function determineScheme($path)
    {
        return !strpos($path, '://') ? 'file' : explode('://', $path)[0];
    }

    /**
     * Normalize a path so that it has UNIX-style directory separators and no trailing /
     *
     * @param string $path
     *
     * @return string
     */
    private function normalizePath($path)
    {
        return rtrim(str_replace('\\', '/', $path), '/');
    }

    /**
     * Collapses "." and ".." segments and empty segments of a "/"-separated
     * path without touching the filesystem.
     *
     * A ".." segment removes the previously kept segment, if any. A leading
     * "/" on the input is kept on the result.
     *
     * @param string $uri Non-empty path to resolve.
     *
     * @return string
     */
    private function resolveUri($uri)
    {
        $resolved = [];
        $sections = explode('/', $uri);
        foreach ($sections as $section) {
            if ($section === '.' || $section === '') {
                continue;
            }
            if ($section === '..') {
                array_pop($resolved);
            } else {
                $resolved []= $section;
            }
        }

        return ($uri[0] === '/' ? '/' : '')
            . implode('/', $resolved);
    }

    /**
     * Builds one GetObject command per object to download, creating the
     * local directories the sinks need, and returns the promise of a command
     * pool that executes them.
     *
     * @return mixed Promise returned by Aws\CommandPool::promise().
     *
     * @throws AwsException      If an object key would resolve to a location
     *                           outside of the destination directory.
     * @throws \RuntimeException If a sink directory cannot be created.
     */
    private function createDownloadPromise()
    {
        $parts = $this->getS3Args($this->sourceMetadata['path']);
        $prefix = "s3://{$parts['Bucket']}/"
            . (isset($parts['Key']) ? $parts['Key'] . '/' : '');
        $destinationPath = $this->destination['path'];

        $commands = [];
        foreach ($this->getDownloadsIterator() as $object) {
            // Prepare the sink: the object's key relative to the source prefix.
            $objectKey = preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object);

            $resolveSink = $destinationPath . '/';
            if (isset($parts['Key']) && strpos($objectKey, $parts['Key']) !== 0) {
                $resolveSink .= $parts['Key'] . '/';
            }
            $resolveSink .= $objectKey;
            $sink = $destinationPath . '/' . $objectKey;

            $command = $this->client->getCommand(
                'GetObject',
                $this->getS3Args($object) + ['@http'  => ['sink'  => $sink]]
            );

            // Reject keys whose ".." segments escape the destination. Both
            // sides get a trailing "/" so the comparison is made on a
            // directory boundary: "/tmp/foobar/x" must not be accepted for
            // the destination "/tmp/foo".
            $resolvedSink = $this->resolveUri($resolveSink);
            if (strpos($resolvedSink . '/', $destinationPath . '/') !== 0) {
                throw new AwsException(
                    'Cannot download key ' . $objectKey
                    . ', its relative path resolves outside the'
                    . ' parent directory', $command);
            }

            // Create the directory if needed. The second is_dir() check
            // tolerates the directory being created concurrently between the
            // first check and mkdir().
            $dir = dirname($sink);
            if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
                throw new \RuntimeException("Could not create dir: {$dir}");
            }

            // Queue the command.
            $commands []= $command;
        }

        // Create a GetObject command pool and return the promise.
        return (new Aws\CommandPool($this->client, $commands, [
            'concurrency' => $this->concurrency,
            'before'      => $this->before,
            'rejected'    => function ($reason, $idx, Promise\PromiseInterface $p) {
                // Fail the aggregate promise as soon as one download fails.
                $p->reject($reason);
            }
        ]))->promise();
    }

    /**
     * Maps every file to upload to an upload promise and limits how many of
     * them run at once.
     *
     * Files whose size is at least the multipart threshold are sent with a
     * multipart upload; all others are sent with a single PutObject.
     *
     * @return mixed Promise returned by Promise\each_limit_all().
     */
    private function createUploadPromise()
    {
        // Map each file into a promise that performs the actual transfer.
        $files = \Aws\map($this->getUploadsIterator(), function ($file) {
            return (filesize($file) >= $this->mupThreshold)
                ? $this->uploadMultipart($file)
                : $this->upload($file);
        });

        // Create an EachPromise, that will concurrently handle the upload
        // operations' yielded promises from the iterator.
        return Promise\each_limit_all($files, $this->concurrency);
    }

    /**
     * Returns the files to upload: the non-directory entries below the source
     * directory for a string source, or the caller's iterator as given.
     *
     * @return Iterator
     */
    private function getUploadsIterator()
    {
        if (is_string($this->source)) {
            return Aws\filter(
                Aws\recursive_dir_iterator($this->sourceMetadata['path']),
                function ($file) { return !is_dir($file); }
            );
        }

        return $this->source;
    }

    /**
     * Returns the s3:// URIs to download: the ListObjects results under the
     * source prefix for a string source (keys ending in "/" are skipped), or
     * the caller's iterator as given.
     *
     * @return Iterator
     */
    private function getDownloadsIterator()
    {
        if (is_string($this->source)) {
            $listArgs = $this->getS3Args($this->sourceMetadata['path']);
            if (isset($listArgs['Key'])) {
                // ListObjects takes a Prefix rather than a Key.
                $listArgs['Prefix'] = $listArgs['Key'] . '/';
                unset($listArgs['Key']);
            }

            $files = $this->client
                ->getPaginator('ListObjects', $listArgs)
                ->search('Contents[].Key');
            $files = Aws\map($files, function ($key) use ($listArgs) {
                return "s3://{$listArgs['Bucket']}/$key";
            });
            return Aws\filter($files, function ($key) {
                return substr($key, -1, 1) !== '/';
            });
        }

        return $this->source;
    }

    /**
     * Uploads a single file with one PutObject command, invoking the
     * "before" callback (if any) with the command first.
     *
     * @param string $filename Path of the local file to upload.
     *
     * @return mixed Result of S3ClientInterface::executeAsync().
     */
    private function upload($filename)
    {
        $args = $this->s3Args;
        $args['SourceFile'] = $filename;
        $args['Key'] = $this->createS3Key($filename);
        $command = $this->client->getCommand('PutObject', $args);
        if ($this->before) {
            call_user_func($this->before, $command);
        }

        return $this->client->executeAsync($command);
    }

    /**
     * Uploads a single file with a MultipartUploader, passing the "before"
     * callback for the initiate, upload and complete steps.
     *
     * @param string|\SplFileInfo $filename Local file to upload.
     *
     * @return mixed Result of MultipartUploader::promise().
     */
    private function uploadMultipart($filename)
    {
        $args = $this->s3Args;
        $args['Key'] = $this->createS3Key($filename);
        $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;

        return (new MultipartUploader($this->client, $filename, [
            'bucket'          => $args['Bucket'],
            'key'             => $args['Key'],
            'before_initiate' => $this->before,
            'before_upload'   => $this->before,
            'before_complete' => $this->before,
            'concurrency'     => $this->concurrency,
        ]))->promise();
    }

    /**
     * Builds the S3 object key for a local file: the file's path relative to
     * the source base path, prefixed with the destination key when one was
     * given.
     *
     * @param string $filename Path of the local file.
     *
     * @return string
     */
    private function createS3Key($filename)
    {
        $filename = $this->normalizePath($filename);

        // The delimiter is passed to preg_quote() so that a "#" in the source
        // path is escaped on every PHP version.
        $pattern = '#^' . preg_quote($this->sourceMetadata['path'], '#') . '#';
        $relative_file_path = ltrim(
            preg_replace($pattern, '', $filename),
            '/\\'
        );

        if (isset($this->s3Args['Key'])) {
            return rtrim($this->s3Args['Key'], '/').'/'.$relative_file_path;
        }

        return $relative_file_path;
    }

    /**
     * Replaces the "before" callback with one that first calls the existing
     * callback (if any) and then writes a "Transferring ..." line describing
     * the command to the given stream.
     *
     * @param resource $debug Stream the debug lines are written to.
     */
    private function addDebugToBefore($debug)
    {
        $before = $this->before;
        $sourcePath = $this->sourceMetadata['path'];
        $s3Args = $this->s3Args;

        $this->before = static function (
            CommandInterface $command
        ) use ($before, $debug, $sourcePath, $s3Args) {
            // Call the composed before function.
            if ($before) {
                $before($command);
            }

            // Determine the source and dest values based on operation.
            switch ($operation = $command->getName()) {
                case 'GetObject':
                    $source = "s3://{$command['Bucket']}/{$command['Key']}";
                    $dest = $command['@http']['sink'];
                    break;
                case 'PutObject':
                    $source = $command['SourceFile'];
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                case 'UploadPart':
                    $part = $command['PartNumber'];
                    // Intentional fall-through: parts share the multipart
                    // source/dest handling below.
                case 'CreateMultipartUpload':
                case 'CompleteMultipartUpload':
                    $sourceKey = $command['Key'];
                    if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) {
                        // Drop the destination key prefix and the "/" after it.
                        $sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1);
                    }
                    $source = "{$sourcePath}/{$sourceKey}";
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                default:
                    throw new \UnexpectedValueException(
                        "Transfer encountered an unexpected operation: {$operation}."
                    );
            }

            // Print the debugging message.
            $context = sprintf('%s -> %s (%s)', $source, $dest, $operation);
            if (isset($part)) {
                $context .= " : Part={$part}";
            }
            fwrite($debug, "Transferring {$context}\n");
        };
    }
}
433