<?php
namespace Aws\S3;

use Aws;
use Aws\CommandInterface;
use Aws\Exception\AwsException;
use GuzzleHttp\Promise;
use GuzzleHttp\Promise\PromisorInterface;
use Iterator;

/**
 * Transfers files from the local filesystem to S3 or from S3 to the local
 * filesystem.
 *
 * This class does not support copying from the local filesystem to somewhere
 * else on the local filesystem or from one S3 bucket to another.
 */
class Transfer implements PromisorInterface
{
    private $client;
    private $promise;
    private $source;
    private $sourceMetadata;
    private $destination;
    private $concurrency;
    private $mupThreshold;
    private $before;
    private $s3Args = [];

    /**
     * When providing the $source argument, you may provide a string referencing
     * the path to a directory on disk to upload, an s3 scheme URI that contains
     * the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
     * that yields strings containing filenames that are the path to a file on
     * disk or an s3 scheme URI. The bucket portion of the s3 URI may be an S3
     * access point ARN. The "/key" portion of an s3 URI is optional.
     *
     * When providing an iterator for the $source argument, you must also
     * provide a 'base_dir' key value pair in the $options argument.
     *
     * The $dest argument can be the path to a directory on disk or an s3
     * scheme URI (e.g., "s3://bucket/key").
     *
     * The options array can contain the following key value pairs:
     *
     * - base_dir: (string) Base directory of the source, if $source is an
     *   iterator. If $source is not an iterator, then this option is
     *   ignored.
     * - before: (callable) A callback to invoke before each transfer. The
     *   callback accepts a single argument: Aws\CommandInterface $command.
     *   The provided command will be a GetObject, PutObject,
     *   CreateMultipartUpload, UploadPart, or CompleteMultipartUpload
     *   command.
     * - mup_threshold: (int) File size in bytes at or above which a multipart
     *   upload is used instead of PutObject. Values smaller than
     *   MultipartUploader::PART_MIN_SIZE are rejected. Defaults to 16777216
     *   (16 MB).
     * - concurrency: (int) Number of files to transfer concurrently; the same
     *   value is also passed to each multipart upload. Defaults to
     *   MultipartUploader::DEFAULT_CONCURRENCY. The ideal concurrency value
     *   will vary based on the number of files being uploaded and the average
     *   size of each file. Generally speaking, smaller files benefit from a
     *   higher concurrency while larger files will not.
     * - debug: (bool) Set to true to print out debug information for
     *   transfers to php://output. Set to an fopen() resource to write to a
     *   specific stream instead.
     *
     * @param S3ClientInterface $client  Client used for transfers.
     * @param string|Iterator   $source  Where the files are transferred from.
     * @param string            $dest    Where the files are transferred to.
     * @param array             $options Hash of options.
     *
     * @throws \InvalidArgumentException When the source, destination, or an
     *                                   option is invalid.
     */
    public function __construct(
        S3ClientInterface $client,
        $source,
        $dest,
        array $options = []
    ) {
        $this->client = $client;

        // Prepare the destination.
        $this->destination = $this->prepareTarget($dest);
        if ($this->destination['scheme'] === 's3') {
            $this->s3Args = $this->getS3Args($this->destination['path']);
        }

        // Prepare the source.
        if (is_string($source)) {
            $this->sourceMetadata = $this->prepareTarget($source);
            $this->source = $source;
        } elseif ($source instanceof Iterator) {
            if (empty($options['base_dir'])) {
                throw new \InvalidArgumentException('You must provide the source'
                    . ' argument as a string or provide the "base_dir" option.');
            }

            $this->sourceMetadata = $this->prepareTarget($options['base_dir']);
            $this->source = $source;
        } else {
            throw new \InvalidArgumentException('source must be the path to a '
                . 'directory or an iterator that yields file names.');
        }

        // Validate schemes: exactly one side must be S3 and the other local.
        if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
            throw new \InvalidArgumentException("You cannot copy from"
                . " {$this->sourceMetadata['scheme']} to"
                . " {$this->destination['scheme']}."
            );
        }

        // Handle multipart-related options.
        $this->concurrency = isset($options['concurrency'])
            ? $options['concurrency']
            : MultipartUploader::DEFAULT_CONCURRENCY;
        $this->mupThreshold = isset($options['mup_threshold'])
            ? $options['mup_threshold']
            : 16777216;
        if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
            throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
        }

        // Handle "before" callback option.
        if (isset($options['before'])) {
            $this->before = $options['before'];
            if (!is_callable($this->before)) {
                throw new \InvalidArgumentException('before must be a callable.');
            }
        }

        // Handle "debug" option. This runs after "before" is set so that the
        // debug callback can wrap (and still invoke) the user's callback.
        if (isset($options['debug'])) {
            if ($options['debug'] === true) {
                $options['debug'] = fopen('php://output', 'w');
            }
            if (is_resource($options['debug'])) {
                $this->addDebugToBefore($options['debug']);
            }
        }
    }

    /**
     * Transfers the files.
     *
     * The promise is created lazily on the first call and the same promise is
     * returned on every later call.
     *
     * @return Promise\PromiseInterface
     */
    public function promise()
    {
        // If the promise has been created, just return it.
        if (!$this->promise) {
            // Create an upload/download promise for the transfer.
            $this->promise = $this->sourceMetadata['scheme'] === 'file'
                ? $this->createUploadPromise()
                : $this->createDownloadPromise();
        }

        return $this->promise;
    }

    /**
     * Transfers the files synchronously by waiting on {@see promise()}.
     */
    public function transfer()
    {
        $this->promise()->wait();
    }

    /**
     * Normalizes a source/destination and determines its scheme.
     *
     * @param string $targetPath Local path or s3:// URI.
     *
     * @return array Array with 'path' (normalized) and 'scheme' keys.
     *
     * @throws \InvalidArgumentException If the scheme is not "s3" or "file".
     */
    private function prepareTarget($targetPath)
    {
        $target = [
            'path'   => $this->normalizePath($targetPath),
            'scheme' => $this->determineScheme($targetPath),
        ];

        if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') {
            throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
        }

        return $target;
    }

    /**
     * Creates an array that contains Bucket and Key by parsing the filename.
     *
     * The Key entry is only present when the path has a "/key" portion.
     *
     * @param string $path Path to parse.
     *
     * @return array
     */
    private function getS3Args($path)
    {
        $parts = explode('/', str_replace('s3://', '', $path), 2);
        $args = ['Bucket' => $parts[0]];
        if (isset($parts[1])) {
            $args['Key'] = $parts[1];
        }

        return $args;
    }

    /**
     * Parses the scheme from a filename.
     *
     * Paths without "://" (and paths where "://" is at offset 0) are treated
     * as local "file" paths.
     *
     * @param string $path Path to parse.
     *
     * @return string
     */
    private function determineScheme($path)
    {
        return !strpos($path, '://') ? 'file' : explode('://', $path)[0];
    }

    /**
     * Normalize a path so that it has UNIX-style directory separators and no
     * trailing /.
     *
     * @param string $path
     *
     * @return string
     */
    private function normalizePath($path)
    {
        return rtrim(str_replace('\\', '/', $path), '/');
    }

    /**
     * Determines whether an object key, once appended to the destination
     * directory, would resolve to a location outside of that directory.
     *
     * The key is walked segment by segment. Empty and "." segments are
     * ignored, and a ".." segment that would climb above the destination
     * directory marks the key as escaping. "/" is always a separator. "\" is
     * also one when the local filesystem uses it (Windows), because the key
     * is written verbatim into the local sink path.
     *
     * @param string $objectKey Key relative to the destination directory.
     *
     * @return bool True if the key escapes the destination directory.
     */
    private function resolvesOutsideTargetDirectory($objectKey)
    {
        $separators = DIRECTORY_SEPARATOR === '\\' ? '#[/\\\\]#' : '#/#';
        $depth = 0;

        foreach (preg_split($separators, $objectKey) as $section) {
            if ($section === '' || $section === '.') {
                continue;
            }
            if ($section === '..') {
                $depth--;
                if ($depth < 0) {
                    return true;
                }
            } else {
                $depth++;
            }
        }

        return false;
    }

    /**
     * Builds a pool of GetObject commands that download every source object
     * into the destination directory.
     *
     * @return Promise\PromiseInterface
     *
     * @throws AwsException      If an object key would be written outside of
     *                           the destination directory.
     * @throws \RuntimeException If a local directory cannot be created.
     */
    private function createDownloadPromise()
    {
        $parts = $this->getS3Args($this->sourceMetadata['path']);
        $prefix = "s3://{$parts['Bucket']}/"
            . (isset($parts['Key']) ? $parts['Key'] . '/' : '');

        $commands = [];
        foreach ($this->getDownloadsIterator() as $object) {
            // Prepare the sink: strip the source prefix so the key becomes
            // relative to the destination directory.
            $objectKey = preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object);
            $sink = $this->destination['path'] . '/' . $objectKey;

            $command = $this->client->getCommand(
                'GetObject',
                $this->getS3Args($object) + ['@http' => ['sink' => $sink]]
            );

            // Validate the exact relative path that will be written, so that
            // keys such as "prefix/../x" cannot escape the destination.
            if ($this->resolvesOutsideTargetDirectory($objectKey)) {
                throw new AwsException(
                    'Cannot download key ' . $objectKey
                    . ', its relative path resolves outside the'
                    . ' parent directory', $command);
            }

            // Create the directory if needed.
            $dir = dirname($sink);
            if (!is_dir($dir) && !mkdir($dir, 0777, true)) {
                throw new \RuntimeException("Could not create dir: {$dir}");
            }

            $commands []= $command;
        }

        // Create a GetObject command pool and return the promise. A rejected
        // command rejects the aggregate promise.
        return (new Aws\CommandPool($this->client, $commands, [
            'concurrency' => $this->concurrency,
            'before'      => $this->before,
            'rejected'    => function ($reason, $idx, Promise\PromiseInterface $p) {
                $p->reject($reason);
            }
        ]))->promise();
    }

    /**
     * Builds a promise that uploads every source file, using a multipart
     * upload for files whose size is >= the mup_threshold.
     *
     * @return Promise\PromiseInterface
     */
    private function createUploadPromise()
    {
        // Map each file into a promise that performs the actual transfer.
        $files = \Aws\map($this->getUploadsIterator(), function ($file) {
            return (filesize($file) >= $this->mupThreshold)
                ? $this->uploadMultipart($file)
                : $this->upload($file);
        });

        // Create an EachPromise, that will concurrently handle the upload
        // operations' yielded promises from the iterator.
        return Promise\each_limit_all($files, $this->concurrency);
    }

    /**
     * Yields the local files to upload: every non-directory entry under the
     * source directory, or the user-supplied iterator as-is.
     *
     * @return Iterator
     */
    private function getUploadsIterator()
    {
        if (is_string($this->source)) {
            return Aws\filter(
                Aws\recursive_dir_iterator($this->sourceMetadata['path']),
                function ($file) { return !is_dir($file); }
            );
        }

        return $this->source;
    }

    /**
     * Yields the s3:// URIs of objects to download. For a string source, the
     * bucket is listed (under "<key>/" when a key is given) and keys ending
     * in "/" are skipped. An iterator source is returned as-is.
     *
     * @return Iterator
     */
    private function getDownloadsIterator()
    {
        if (is_string($this->source)) {
            $listArgs = $this->getS3Args($this->sourceMetadata['path']);
            if (isset($listArgs['Key'])) {
                $listArgs['Prefix'] = $listArgs['Key'] . '/';
                unset($listArgs['Key']);
            }

            $files = $this->client
                ->getPaginator('ListObjects', $listArgs)
                ->search('Contents[].Key');
            $files = Aws\map($files, function ($key) use ($listArgs) {
                return "s3://{$listArgs['Bucket']}/$key";
            });
            return Aws\filter($files, function ($key) {
                return substr($key, -1, 1) !== '/';
            });
        }

        return $this->source;
    }

    /**
     * Uploads a single file with PutObject.
     *
     * @param string|\SplFileInfo $filename Local file to upload.
     *
     * @return Promise\PromiseInterface
     */
    private function upload($filename)
    {
        // Accept SplFileInfo like uploadMultipart() does, so SourceFile is
        // always a plain path string.
        $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;

        $args = $this->s3Args;
        $args['SourceFile'] = $filename;
        $args['Key'] = $this->createS3Key($filename);
        $command = $this->client->getCommand('PutObject', $args);
        if ($this->before) {
            call_user_func($this->before, $command);
        }

        return $this->client->executeAsync($command);
    }

    /**
     * Uploads a single file with a MultipartUploader. The "before" callback
     * is installed for the initiate, upload-part, and complete steps.
     *
     * @param string|\SplFileInfo $filename Local file to upload.
     *
     * @return Promise\PromiseInterface
     */
    private function uploadMultipart($filename)
    {
        $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;

        $args = $this->s3Args;
        $args['Key'] = $this->createS3Key($filename);

        return (new MultipartUploader($this->client, $filename, [
            'bucket'          => $args['Bucket'],
            'key'             => $args['Key'],
            'before_initiate' => $this->before,
            'before_upload'   => $this->before,
            'before_complete' => $this->before,
            'concurrency'     => $this->concurrency,
        ]))->promise();
    }

    /**
     * Computes the destination S3 key for a local file: the file's path
     * relative to the source directory, prefixed with the destination key
     * (if any).
     *
     * @param string $filename Local file path.
     *
     * @return string
     */
    private function createS3Key($filename)
    {
        $filename = $this->normalizePath($filename);
        // Pass the '#' delimiter explicitly: before PHP 7.3, preg_quote()
        // did not escape '#' by default.
        $relative_file_path = ltrim(
            preg_replace('#^' . preg_quote($this->sourceMetadata['path'], '#') . '#', '', $filename),
            '/\\'
        );

        if (isset($this->s3Args['Key'])) {
            return rtrim($this->s3Args['Key'], '/') . '/' . $relative_file_path;
        }

        return $relative_file_path;
    }

    /**
     * Wraps the current "before" callback with one that also writes a
     * "Transferring <source> -> <dest> (<operation>)" line to $debug.
     *
     * @param resource $debug Stream to write debug lines to.
     *
     * @throws \UnexpectedValueException From the callback, if it receives an
     *                                   operation it does not recognize.
     */
    private function addDebugToBefore($debug)
    {
        $before = $this->before;
        $sourcePath = $this->sourceMetadata['path'];
        $s3Args = $this->s3Args;

        $this->before = static function (
            CommandInterface $command
        ) use ($before, $debug, $sourcePath, $s3Args) {
            // Call the composed before function.
            $before and $before($command);

            // Determine the source and dest values based on operation.
            switch ($operation = $command->getName()) {
                case 'GetObject':
                    $source = "s3://{$command['Bucket']}/{$command['Key']}";
                    $dest = $command['@http']['sink'];
                    break;
                case 'PutObject':
                    $source = $command['SourceFile'];
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                case 'UploadPart':
                    $part = $command['PartNumber'];
                    // Intentional fall-through: parts share the multipart
                    // source/dest computation below.
                case 'CreateMultipartUpload':
                case 'CompleteMultipartUpload':
                    // Reconstruct the local source path by removing the
                    // destination key prefix (and its "/") from the key.
                    $sourceKey = $command['Key'];
                    if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) {
                        $sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1);
                    }
                    $source = "{$sourcePath}/{$sourceKey}";
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                default:
                    throw new \UnexpectedValueException(
                        "Transfer encountered an unexpected operation: {$operation}."
                    );
            }

            // Print the debugging message.
            $context = sprintf('%s -> %s (%s)', $source, $dest, $operation);
            if (isset($part)) {
                $context .= " : Part={$part}";
            }
            fwrite($debug, "Transferring {$context}\n");
        };
    }
}