<?php
namespace Aws;

use GuzzleHttp\Promise\PromisorInterface;
use GuzzleHttp\Promise\EachPromise;

/**
 * Sends an iterator of commands concurrently using a capped pool size.
 *
 * The pool will read command objects from an iterator until it is cancelled or
 * until the iterator is consumed.
 */
class CommandPool implements PromisorInterface
{
    /** @var EachPromise Aggregate that drives the lazily generated command promises. */
    private $each;

    /**
     * The CommandPool constructor accepts a hash of configuration options:
     *
     * - concurrency: (callable|int) Maximum number of commands to execute
     *   concurrently. Provide a function to resize the pool dynamically. The
     *   function will be provided the current number of pending requests and
     *   is expected to return an integer representing the new pool size limit.
     *   Defaults to 25 when not set.
     * - before: (callable) function to invoke before sending each command. The
     *   before function accepts the command and the key of the iterator of the
     *   command. You can mutate the command as needed in the before function
     *   before sending the command.
     * - fulfilled: (callable) Function to invoke when a promise is fulfilled.
     *   The function is provided the result object, id of the iterator that the
     *   result came from, and the aggregate promise that can be resolved/rejected
     *   if you need to short-circuit the pool.
     * - rejected: (callable) Function to invoke when a promise is rejected.
     *   The function is provided an AwsException object, id of the iterator that
     *   the exception came from, and the aggregate promise that can be
     *   resolved/rejected if you need to short-circuit the pool.
     * - preserve_iterator_keys: (bool) Retain the iterator key when generating
     *   the commands.
     *
     * @param AwsClientInterface $client   Client used to execute commands.
     * @param array|\Iterator    $commands Iterable that yields commands.
     * @param array              $config   Associative array of options.
     *
     * @throws \InvalidArgumentException if the "before" option is set but is
     *                                   not callable.
     */
    public function __construct(
        AwsClientInterface $client,
        $commands,
        array $config = []
    ) {
        if (!isset($config['concurrency'])) {
            $config['concurrency'] = 25;
        }

        // Validate "before" eagerly so a bad option fails at construction
        // time rather than when the generator below is first advanced.
        $before = $this->getBefore($config);

        // Generator that turns each command into a promise on demand; nothing
        // is sent until the consumer pulls the next value from it.
        $mapFn = function ($commands) use ($client, $before, $config) {
            foreach ($commands as $key => $command) {
                if (!($command instanceof CommandInterface)) {
                    throw new \InvalidArgumentException('Each value yielded by '
                        . 'the iterator must be an Aws\CommandInterface.');
                }
                if ($before) {
                    $before($command, $key);
                }
                if (!empty($config['preserve_iterator_keys'])) {
                    yield $key => $client->executeAsync($command);
                } else {
                    yield $client->executeAsync($command);
                }
            }
        };

        $this->each = new EachPromise($mapFn($commands), $config);
    }

    /**
     * Returns the aggregate promise of the underlying EachPromise.
     *
     * @return \GuzzleHttp\Promise\PromiseInterface
     */
    public function promise()
    {
        return $this->each->promise();
    }

    /**
     * Executes a pool synchronously and aggregates the results of the pool
     * into an indexed array in the same order as the passed in array.
     *
     * Both fulfilled values and rejection reasons are stored in the returned
     * array under the key of the command they belong to. Any "fulfilled" or
     * "rejected" callbacks supplied in $config are still invoked.
     *
     * @param AwsClientInterface $client   Client used to execute commands.
     * @param mixed              $commands Iterable that yields commands.
     * @param array              $config   Configuration options.
     *
     * @return array
     * @see \Aws\CommandPool::__construct for available configuration options.
     */
    public static function batch(
        AwsClientInterface $client,
        $commands,
        array $config = []
    ) {
        $results = [];
        self::cmpCallback($config, 'fulfilled', $results);
        self::cmpCallback($config, 'rejected', $results);

        return (new self($client, $commands, $config))
            ->promise()
            ->then(static function () use (&$results) {
                // Entries are recorded in completion order; sort by key so
                // the output follows the order of the input.
                ksort($results);
                return $results;
            })
            ->wait();
    }

    /**
     * Extracts and validates the optional "before" callback.
     *
     * @param array $config Pool configuration options.
     *
     * @return callable|null The callback, or null when "before" is not set.
     *
     * @throws \InvalidArgumentException if "before" is set but not callable.
     */
    private function getBefore(array $config)
    {
        if (!isset($config['before'])) {
            return null;
        }

        if (is_callable($config['before'])) {
            return $config['before'];
        }

        throw new \InvalidArgumentException('before must be callable');
    }

    /**
     * Adds an onFulfilled or onRejected callback that aggregates results into
     * an array. If a callback is already present, it is replaced with the
     * composed function.
     *
     * The composed function forwards every argument it receives (value, key
     * and aggregate promise) to the existing callback, so that callback can
     * still use the aggregate promise to short-circuit the pool.
     *
     * @param array  $config  Pool configuration, modified in place.
     * @param string $name    Option to wrap: "fulfilled" or "rejected".
     * @param array  $results Array that collects each value under its key.
     */
    private static function cmpCallback(array &$config, $name, array &$results)
    {
        $currentFn = isset($config[$name]) ? $config[$name] : null;

        $config[$name] = function ($v, $k, $aggregate = null) use (&$results, $currentFn) {
            if ($currentFn !== null) {
                // Pass the aggregate promise through; dropping it would take
                // away the documented ability to resolve/reject the pool.
                $currentFn($v, $k, $aggregate);
            }
            $results[$k] = $v;
        };
    }
}