1<?php
2
3namespace Basho\Riak\Api;
4
5use Basho\Riak\Api;
6use Basho\Riak\ApiInterface;
7use Basho\Riak\Bucket;
8use Basho\Riak\Command;
9use Basho\Riak\DataType\Counter;
10use Basho\Riak\DataType\Map;
11use Basho\Riak\DataType\Set;
12use Basho\Riak\DataType\Hll;
13use Basho\Riak\Location;
14use Basho\Riak\Node;
15use Basho\Riak\DataObject;
16use Basho\Riak\Search\Doc;
17use Basho\Riak\TimeSeries\Cell;
18
19/**
20 * Handles communications between end user app & Riak via Riak HTTP API using cURL
21 *
22 * @author Christopher Mancini <cmancini at basho d0t com>
23 */
class Http extends Api implements ApiInterface
{
    // Header keys
    const VCLOCK_KEY = 'X-Riak-Vclock';
    const CONTENT_TYPE_KEY = 'Content-Type';
    const METADATA_PREFIX = 'X-Riak-Meta-';
    const LAST_MODIFIED_KEY = 'Last-Modified';

    // Content Types
    const CONTENT_TYPE_JSON = 'application/json';
    const CONTENT_TYPE_XML = 'application/xml';

    // Path prefix used for every Time Series endpoint built in buildPath()
    const TS_API_PREFIX = '/ts/v1';

    /**
     * Request body to be sent
     *
     * @var string
     */
    protected $requestBody = '';

    /**
     * Request url the request is being sent to
     *
     * @var string
     */
    protected $requestURL = '';

    /**
     * Response headers returned from request
     *
     * A header that occurs more than once is stored as a list of its values.
     *
     * @var array
     */
    protected $responseHeaders = [];

    /**
     * Response body returned from request
     *
     * @var string
     */
    protected $responseBody = '';

    /**
     * HTTP Status Code from response
     *
     * @var int
     */
    protected $statusCode = 0;

    /**
     * cURL connection handle, null until openConnection() has been called
     *
     * @var resource|\CurlHandle|null
     */
    protected $connection = null;

    /**
     * API path
     *
     * @var string
     */
    protected $path = '';

    /**
     * Query string
     *
     * @var string
     */
    protected $query = '';

    /**
     * cURL options collected by the prepare*() methods and applied to the handle in send()
     *
     * @var array
     */
    private $options = [];

    /**
     * Pending request headers keyed by header name
     *
     * Filled by addHeader() and buildPath(), then consumed and cleared by prepareRequestHeaders().
     *
     * @var array
     */
    protected $headers = [];

    /**
     * @return int
     */
    public function getStatusCode()
    {
        return $this->statusCode;
    }

    /**
     * @return string
     */
    public function getResponseBody()
    {
        return $this->responseBody;
    }

    /**
     * @return array
     */
    public function getResponseHeaders()
    {
        return $this->responseHeaders;
    }

    /**
     * Returns the outgoing header data recorded by send() followed by the request body
     *
     * NOTE(review): $this->request is assigned in send() but not declared in this class; presumably it is
     * declared on the parent Api class - confirm.
     *
     * @return string
     */
    public function getRequest()
    {
        return $this->request . $this->requestBody;
    }

    /**
     * Closes the cURL handle, if one is open
     */
    public function closeConnection()
    {
        if ($this->connection) {
            curl_close($this->connection);
            $this->connection = null;
        }
    }

    /**
     * Prepare request to be sent
     *
     * @param Command $command
     * @param Node $node
     *
     * @return $this
     */
    public function prepare(Command $command, Node $node)
    {
        // an existing handle still carries the state of the previous request
        if ($this->connection) {
            $this->resetConnection();
        }

        // call parent prepare method to setup object members
        parent::prepare($command, $node);

        // set the API path to be used
        $this->buildPath();

        // general connection preparation
        $this->prepareConnection();

        // request specific connection preparation
        $this->prepareRequest();

        return $this;
    }

    /**
     * Clears the per-request state and resets the cURL handle
     *
     * Pending headers in $this->headers are intentionally left alone so that headers registered through
     * addHeader() before prepare() survive the reset.
     */
    public function resetConnection()
    {
        $this->command = null;
        $this->options = [];
        $this->path = '';
        $this->query = '';
        $this->requestBody = '';
        $this->requestURL = '';
        $this->responseHeaders = [];
        $this->responseBody = '';

        // this method is public, so it can be called before any handle was opened; there is nothing to
        // reset on the cURL side in that case
        if (!$this->connection) {
            return;
        }

        if (version_compare(PHP_VERSION, '5.5.0') >= 0) {
            curl_reset($this->connection);
        } else {
            // curl_reset() is unavailable; drop the handle so getConnection() opens a fresh one
            curl_close($this->connection);
            $this->connection = null;
        }
    }

    /**
     * Sets the API path for the command
     *
     * Dispatches on the concrete command class. Several cases fall through on purpose: the first case of a
     * group registers a request header, then shares the path of the cases below it.
     *
     * @return $this
     * @throws Api\Exception if the command is an invalid 2i query.
     */
    protected function buildPath()
    {
        $key = '';

        $bucket = $this->command->getBucket();

        $location = $this->command->getLocation();
        if (!empty($location) && $location instanceof Location) {
            $key = $location->getKey();
        }

        switch (get_class($this->command)) {
            /** @noinspection PhpMissingBreakStatementInspection */
            case 'Basho\Riak\Command\Bucket\Store':
                $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON;
            case 'Basho\Riak\Command\Bucket\Fetch':
            case 'Basho\Riak\Command\Bucket\Delete':
                $this->path = sprintf('/types/%s/buckets/%s/props', $bucket->getType(), $bucket->getName());
                break;
            /** @noinspection PhpMissingBreakStatementInspection */
            case 'Basho\Riak\Command\KVObject\Fetch':
                $this->headers['Accept'] = '*/*, multipart/mixed';
            case 'Basho\Riak\Command\KVObject\Store':
            case 'Basho\Riak\Command\KVObject\Delete':
                $this->path = sprintf('/types/%s/buckets/%s/keys/%s', $bucket->getType(), $bucket->getName(), $key);
                break;
            case 'Basho\Riak\Command\KVObject\Keys\Fetch':
                $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON;
                $this->path = sprintf('/types/%s/buckets/%s/keys', $bucket->getType(), $bucket->getName());
                break;
            /** @noinspection PhpMissingBreakStatementInspection */
            case 'Basho\Riak\Command\DataType\Counter\Store':
            case 'Basho\Riak\Command\DataType\GSet\Store':
            case 'Basho\Riak\Command\DataType\Set\Store':
            case 'Basho\Riak\Command\DataType\Map\Store':
            case 'Basho\Riak\Command\DataType\Hll\Store':
                $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON;
            case 'Basho\Riak\Command\DataType\Counter\Fetch':
            case 'Basho\Riak\Command\DataType\Set\Fetch':
            case 'Basho\Riak\Command\DataType\Map\Fetch':
            case 'Basho\Riak\Command\DataType\Hll\Fetch':
                $this->path = sprintf('/types/%s/buckets/%s/datatypes/%s', $bucket->getType(), $bucket->getName(), $key);
                break;
            /** @noinspection PhpMissingBreakStatementInspection */
            case 'Basho\Riak\Command\Search\Index\Store':
                $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON;
            case 'Basho\Riak\Command\Search\Index\Fetch':
            case 'Basho\Riak\Command\Search\Index\Delete':
                // the command object itself is formatted with %s, i.e. converted to a string
                $this->path = sprintf('/search/index/%s', $this->command);
                break;
            /** @noinspection PhpMissingBreakStatementInspection */
            case 'Basho\Riak\Command\Search\Schema\Store':
                $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_XML;
            case 'Basho\Riak\Command\Search\Schema\Fetch':
                $this->path = sprintf('/search/schema/%s', $this->command);
                break;
            case 'Basho\Riak\Command\Search\Fetch':
                $this->path = sprintf('/search/query/%s', $this->command);
                break;
            case 'Basho\Riak\Command\MapReduce\Fetch':
                $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON;
                $this->path = sprintf('/%s', $this->config['mapred_prefix']);
                break;
            case 'Basho\Riak\Command\Indexes\Query':
                $this->path = $this->createIndexQueryPath($bucket);
                break;
            case 'Basho\Riak\Command\Ping':
                $this->path = '/ping';
                break;
            case 'Basho\Riak\Command\Stats':
                $this->path = '/stats';
                break;
            case 'Basho\Riak\Command\KVObject\FetchPreflist':
                $this->path = sprintf('/types/%s/buckets/%s/keys/%s/preflist', $bucket->getType(), $bucket->getName(), $key);
                break;
            case 'Basho\Riak\Command\TimeSeries\Fetch':
            case 'Basho\Riak\Command\TimeSeries\Delete':
                /** @var $command Command\TimeSeries\Fetch */
                $command = $this->command;
                $this->path = sprintf('%s/tables/%s/keys/%s', static::TS_API_PREFIX, $command->getTable(), implode("/", $command->getData()));
                break;
            case 'Basho\Riak\Command\TimeSeries\Store':
                /** @var $command Command\TimeSeries\Store */
                $command = $this->command;
                $this->path = sprintf('%s/tables/%s/keys', static::TS_API_PREFIX, $command->getTable());
                break;
            case 'Basho\Riak\Command\TimeSeries\Query\Fetch':
                $this->path = sprintf('%s/query', static::TS_API_PREFIX);
                break;
            default:
                $this->path = '';
        }

        return $this;
    }

    /**
     * Generates the URL path for a 2i Query
     *
     * @param Bucket $bucket
     * @return string
     * @throws Api\Exception if 2i query is invalid.
     */
    private function createIndexQueryPath(Bucket $bucket)
    {
        /**  @var Command\Indexes\Query $command */
        $command = $this->command;

        // exact match: .../index/<name>/<value>
        if ($command->isMatchQuery()) {
            return sprintf(
                '/types/%s/buckets/%s/index/%s/%s',
                $bucket->getType(),
                $bucket->getName(),
                $command->getIndexName(),
                $command->getMatchValue()
            );
        }

        // range: .../index/<name>/<lower>/<upper>
        if ($command->isRangeQuery()) {
            return sprintf(
                '/types/%s/buckets/%s/index/%s/%s/%s',
                $bucket->getType(),
                $bucket->getName(),
                $command->getIndexName(),
                $command->getLowerBound(),
                $command->getUpperBound()
            );
        }

        throw new Api\Exception("Invalid Secondary Index Query.");
    }

    /**
     * Prepare Connection
     *
     * Sets general connection options that are used with every request
     *
     * @return $this
     * @throws Api\Exception if TLS is enabled but neither a CA file nor a CA directory is configured
     */
    protected function prepareConnection()
    {
        // record outgoing headers
        $this->options[CURLINFO_HEADER_OUT] = 1;

        if ($this->command->getConnectionTimeout()) {
            $this->options[CURLOPT_TIMEOUT] = $this->command->getConnectionTimeout();
        }

        if ($this->node->useTls()) {
            // CA File takes precedence over a CA directory
            if ($this->node->getCaFile()) {
                $this->options[CURLOPT_CAINFO] = $this->node->getCaFile();
            } elseif ($this->node->getCaDirectory()) {
                $this->options[CURLOPT_CAPATH] = $this->node->getCaDirectory();
            } else {
                throw new Api\Exception('A Certificate Authority file is required for secure connections.');
            }

            // verify the peer certificate against the configured CA
            $this->options[CURLOPT_SSL_VERIFYPEER] = true;

            // verify host common name; 2 makes cURL check that the certificate matches the host being
            // connected to (0 would disable the check and accept a CA-signed certificate for any host)
            $this->options[CURLOPT_SSL_VERIFYHOST] = 2;

            if ($this->node->getUserName()) {
                $this->options[CURLOPT_USERPWD] = sprintf('%s:%s', $this->node->getUserName(),
                    $this->node->getPassword());
            } elseif ($this->node->getCertificate()) {
                /*
                 * NOT CURRENTLY SUPPORTED ON HTTP
                 *
                $this->options[CURLOPT_SSLCERT] = $this->node->getCertificate();
                $this->options[CURLOPT_SSLCERTTYPE] = 'P12';
                if ($this->node->getCertificatePassword()) {
                    $this->options[CURLOPT_SSLCERTPASSWD] = $this->node->getCertificatePassword();
                }
                if ($this->node->getPrivateKey()) {
                    $this->options[CURLOPT_SSLKEY] = $this->node->getPrivateKey();
                    if ($this->node->getPrivateKeyPassword()) {
                        $this->options[CURLOPT_SSLKEYPASSWD] = $this->node->getPrivateKeyPassword();
                    }
                }
                */
            }
        }

        return $this;
    }

    /**
     * Prepare request
     *
     * Sets connection options that are specific to this request. The order matters: the query string must
     * be built before the URL that embeds it.
     *
     * @return $this
     */
    protected function prepareRequest()
    {
        return $this->prepareRequestMethod()
            ->prepareRequestHeaders()
            ->prepareRequestParameters()
            ->prepareRequestUrl()
            ->prepareRequestData();
    }

    /**
     * Prepare request data
     *
     * @return $this
     */
    protected function prepareRequestData()
    {
        // only POST and PUT requests carry the command's encoded data as a body
        if (in_array($this->command->getMethod(), ['POST', 'PUT'])) {
            $this->requestBody = $this->command->getEncodedData();
            $this->options[CURLOPT_POSTFIELDS] = $this->requestBody;
        }

        return $this;
    }

    /**
     * Prepares the complete request URL
     *
     * The "?" separator is always emitted, even when the query string is empty.
     *
     * @return $this
     */
    protected function prepareRequestUrl()
    {
        $protocol = $this->node->useTls() ? 'https' : 'http';
        $this->requestURL = sprintf('%s://%s%s?%s', $protocol, $this->node->getUri(), $this->path, $this->query);

        // set the built request URL on the connection
        $this->options[CURLOPT_URL] = $this->requestURL;

        return $this;
    }

    /**
     * Prepare request parameters
     *
     * @return $this
     */
    protected function prepareRequestParameters()
    {
        if ($this->command->hasParameters()) {
            // build query using RFC 3986 (spaces become %20 instead of '+')
            $this->query = http_build_query($this->command->getParameters(), '', '&', PHP_QUERY_RFC3986);
        }

        return $this;
    }

    /**
     * Prepares the request headers
     *
     * Combines the pending headers with the headers derived from the command's object (vclock, content
     * type, secondary indexes, metadata) and stores them as the CURLOPT_HTTPHEADER option.
     *
     * @return $this
     */
    protected function prepareRequestHeaders()
    {
        $curl_headers = [];

        foreach ($this->headers as $key => $value) {
            $curl_headers[] = sprintf('%s: %s', $key, $value);
        }

        // if we have an object, set appropriate object headers
        $object = $this->command->getObject();
        if ($object) {
            if ($object->getVclock()) {
                $curl_headers[] = sprintf('%s: %s', static::VCLOCK_KEY, $object->getVclock());
            }

            if ($object->getContentType()) {
                $charset = '';
                if ($object->getCharset()) {
                    $charset = sprintf('; charset=%s', $object->getCharset());
                }
                // third placeholder appends the charset suffix; without it sprintf() ignores the argument
                $curl_headers[] = sprintf('%s: %s%s', static::CONTENT_TYPE_KEY, $object->getContentType(), $charset);
            }

            // setup index headers
            $translator = new Api\Http\Translator\SecondaryIndex();
            $indexHeaders = $translator->createHeadersFromIndexes($object->getIndexes());
            foreach ($indexHeaders as $value) {
                $curl_headers[] = sprintf('%s: %s', $value[0], $value[1]);
            }

            // setup metadata headers
            foreach ($object->getMetaData() as $key => $value) {
                $curl_headers[] = sprintf('%s%s: %s', static::METADATA_PREFIX, $key, $value);
            }
        }

        // set the request headers on the connection
        $this->options[CURLOPT_HTTPHEADER] = $curl_headers;

        // dump local headers to start fresh
        $this->headers = [];

        return $this;
    }

    /**
     * Prepare the request method
     *
     * @return $this
     */
    protected function prepareRequestMethod()
    {
        switch ($this->command->getMethod()) {
            case "POST":
                $this->options[CURLOPT_POST] = 1;
                break;
            case "PUT":
                $this->options[CURLOPT_CUSTOMREQUEST] = 'PUT';
                break;
            case "DELETE":
                $this->options[CURLOPT_CUSTOMREQUEST] = 'DELETE';
                break;
            case "HEAD":
                $this->options[CURLOPT_NOBODY] = 1;
                break;
            default:
                // reset http method to get in case its changed
                $this->options[CURLOPT_HTTPGET] = 1;
        }

        return $this;
    }

    /**
     * @return string
     */
    public function getPath()
    {
        return $this->path;
    }

    /**
     * @return string
     */
    public function getQuery()
    {
        return $this->query;
    }

    /**
     * Applies the collected options to the cURL handle, executes the request and parses the response
     *
     * @return bool
     */
    public function send()
    {
        // set the response header and body callback functions
        $this->options[CURLOPT_HEADERFUNCTION] = [$this, 'responseHeaderCallback'];
        $this->options[CURLOPT_WRITEFUNCTION] = [$this, 'responseBodyCallback'];
        if ($this->command->isVerbose()) {
            // set curls output to be the output buffer stream
            $this->options[CURLOPT_STDERR] = fopen('php://stdout', 'w+');
            $this->options[CURLOPT_VERBOSE] = 1;

            // there is a bug when verbose is enabled, header out causes no output
            // @see https://bugs.php.net/bug.php?id=65348
            unset($this->options[CURLINFO_HEADER_OUT]);

            echo "cURL Command:\n\tcurl -X{$this->command->getMethod()} {$this->requestURL} --data \"{$this->requestBody}\"\n";
        }

        $connection = $this->getConnection();

        // set all options on the resource
        curl_setopt_array($connection, $this->options);

        // execute the request
        $this->success = curl_exec($connection);
        if ($this->success === false) {
            $this->error = curl_error($connection);
        } elseif ($this->success === true) {
            $this->error = '';
        }

        // outgoing header data as recorded by cURL
        $this->request = curl_getinfo($connection, CURLINFO_HEADER_OUT);

        // set the response http code
        $this->statusCode = curl_getinfo($connection, CURLINFO_HTTP_CODE);

        $this->parseResponse();

        return $this->success;
    }

    /**
     * Add a custom header to the request
     *
     * @param $key
     * @param $value
     */
    public function addHeader($key, $value)
    {
        $this->headers[$key] = $value;
    }

    /**
     * Returns the cURL handle, opening one first if none exists yet
     *
     * @return resource
     */
    public function getConnection()
    {
        if (!$this->connection) {
            $this->openConnection();
        }

        return $this->connection;
    }

    /**
     * Creates a new cURL handle and stores it on the instance
     *
     * @return $this
     */
    public function openConnection()
    {
        $this->connection = curl_init();

        return $this;
    }

    /**
     * Response header callback
     *
     * Handles callback from curl when the response is received, it parses the headers into an array sets them as
     * member of the class.
     *
     * Has to be public for curl to be able to access it.
     *
     * @param $ch
     * @param $header
     *
     * @return int
     */
    public function responseHeaderCallback($ch, $header)
    {
        // lines without a colon, or with the colon in first position, are not "Name: value" pairs
        if (strpos($header, ':')) {
            list($key, $value) = explode(':', $header, 2);

            $value = trim($value);

            // compare with '' explicitly: empty() would also throw away legitimate "0" values
            if ($value !== '') {
                if (!isset($this->responseHeaders[$key])) {
                    $this->responseHeaders[$key] = $value;
                } elseif (is_array($this->responseHeaders[$key])) {
                    // third or later occurrence: append to the existing list
                    $this->responseHeaders[$key][] = $value;
                } else {
                    // second occurrence: promote the single value to a list
                    $this->responseHeaders[$key] = [$this->responseHeaders[$key], $value];
                }
            }
        }

        // cURL expects the number of bytes handled
        return strlen($header);
    }

    /**
     * Response body callback
     *
     * Handles callback from curl when the response is received, it sets the response body as a member of the class.
     *
     * Has to be public for curl to be able to access it.
     *
     * @param $ch
     * @param $body
     *
     * @return int
     */
    public function responseBodyCallback($ch, $body)
    {
        // appended rather than assigned, so repeated invocations accumulate
        $this->responseBody .= $body;
        return strlen($body);
    }

    /**
     * Builds the command specific Response object from the received status, headers and body
     *
     * The result is stored in $this->response. A 500 status marks the request as failed and uses the
     * response body as the error message.
     */
    protected function parseResponse()
    {
        $body = $this->responseBody;

        $location = null;
        if ($this->getResponseHeader('Location')) {
            $location = Location::fromString($this->getResponseHeader('Location'));
        }

        if ($this->statusCode == 500) {
            $this->success = false;
            $this->error = $body;
        }

        switch (get_class($this->command)) {
            case 'Basho\Riak\Command\Bucket\Store':
            case 'Basho\Riak\Command\Bucket\Fetch':
                $bucket = null;
                $modified = $this->getResponseHeader(static::LAST_MODIFIED_KEY, '');
                $properties = json_decode($body, true);
                if (isset($properties['props']) && $this->command->getBucket()) {
                    $bucket = new Bucket($this->command->getBucket()->getName(), $this->command->getBucket()->getType(), $properties['props']);
                }
                $response = new Command\Bucket\Response($this->success, $this->statusCode, $this->error, $bucket, $modified);
                break;

            case 'Basho\Riak\Command\KVObject\Fetch':
            case 'Basho\Riak\Command\KVObject\Store':
                /** @var Command\KVObject $command */
                $command = $this->command;
                $objects = (new Api\Http\Translator\ObjectResponse($command, $this->statusCode))
                    ->parseResponse($body, $this->responseHeaders);
                $response = new Command\KVObject\Response($this->success, $this->statusCode, $this->error, $location, $objects);
                break;

            case 'Basho\Riak\Command\KVObject\FetchPreflist':
                $response = new Command\KVObject\Response($this->success, $this->statusCode, $this->error, $location, [new DataObject(json_decode($body))]);
                break;

            case 'Basho\Riak\Command\KVObject\Keys\Fetch':
                $data = json_decode($body);
                $keys = [];
                if ($data && isset($data->keys)) {
                    foreach ($data->keys as $key) {
                        $keys[] = new Location($key, $this->command->getBucket());
                    }
                }
                $response = new Command\KVObject\Keys\Response($this->success, $this->statusCode, $this->error, $keys);
                break;

            case 'Basho\Riak\Command\DataType\Counter\Store':
            case 'Basho\Riak\Command\DataType\Counter\Fetch':
                $counter = null;
                $json_object = json_decode($body);
                if ($json_object && isset($json_object->value)) {
                    $counter = new Counter($json_object->value);
                }
                $response = new Command\DataType\Counter\Response(
                    $this->success, $this->statusCode, $this->error, $location, $counter, $this->getResponseHeader('Date')
                );
                break;

            case 'Basho\Riak\Command\DataType\GSet\Store':
            case 'Basho\Riak\Command\DataType\Set\Store':
            case 'Basho\Riak\Command\DataType\Set\Fetch':
                $set = null;
                $json_object = json_decode($body);
                if ($json_object && isset($json_object->value)) {
                    $context = '';
                    if (isset($json_object->context)) {
                        $context = $json_object->context;
                    }
                    $set = new Set($json_object->value, $context);
                }
                $response = new Command\DataType\Set\Response(
                    $this->success, $this->statusCode, $this->error, $location, $set, $this->getResponseHeader('Date')
                );
                break;

            case 'Basho\Riak\Command\DataType\Map\Store':
            case 'Basho\Riak\Command\DataType\Map\Fetch':
                $map = null;
                $json_object = json_decode($body, true);
                if ($json_object && isset($json_object['value'])) {
                    // guard against a response without a context, the same way the Set branch does
                    $context = isset($json_object['context']) ? $json_object['context'] : '';
                    $map = new Map($json_object['value'], $context);
                }
                $response = new Command\DataType\Map\Response(
                    $this->success, $this->statusCode, $this->error, $location, $map, $this->getResponseHeader('Date')
                );
                break;

            case 'Basho\Riak\Command\DataType\Hll\Store':
            case 'Basho\Riak\Command\DataType\Hll\Fetch':
                $hll = null;
                $json_object = json_decode($body);
                if ($json_object && isset($json_object->value)) {
                    $hll = new Hll($json_object->value);
                }
                $response = new Command\DataType\Hll\Response(
                    $this->success, $this->statusCode, $this->error, $location, $hll, $this->getResponseHeader('Date')
                );
                break;

            case 'Basho\Riak\Command\Search\Fetch':
                $results = in_array($this->statusCode, [200, 204]) ? json_decode($body) : null;
                $docs = [];
                if (!empty($results->response->docs)) {
                    foreach ($results->response->docs as $doc) {
                        $docs[] = new Doc($doc);
                    }
                }
                $numFound = !empty($results->response->numFound) ? $results->response->numFound : 0;

                $response = new Command\Search\Response($this->success, $this->statusCode, $this->error, $numFound, $docs);
                break;

            case 'Basho\Riak\Command\Search\Index\Store':
            case 'Basho\Riak\Command\Search\Index\Fetch':
                $index = json_decode($body);
                $response = new Command\Search\Index\Response($this->success, $this->statusCode, $this->error, $index);
                break;

            case 'Basho\Riak\Command\Search\Schema\Store':
            case 'Basho\Riak\Command\Search\Schema\Fetch':
                // the body is handed over undecoded together with its content type
                $response = new Command\Search\Schema\Response(
                    $this->success, $this->statusCode, $this->error, $body, $this->getResponseHeader(static::CONTENT_TYPE_KEY)
                );
                break;

            case 'Basho\Riak\Command\MapReduce\Fetch':
                $results = in_array($this->statusCode, [200, 204]) ? json_decode($body) : null;
                $response = new Command\MapReduce\Response($this->success, $this->statusCode, $this->error, $results);
                break;

            case 'Basho\Riak\Command\Indexes\Query':
                $json_object = in_array($this->statusCode, [200, 204]) ? json_decode($body, true) : null;
                $results = [];
                $termsReturned = false;
                $continuation = null;
                $done = true;

                if (isset($json_object['keys'])) {
                    $results = $json_object['keys'];
                }

                // a "results" entry overrides "keys" and flags that terms were returned
                if (isset($json_object['results'])) {
                    $results = $json_object['results'];
                    $termsReturned = true;
                }

                // a continuation means the query is not done yet
                if (isset($json_object['continuation'])) {
                    $continuation = $json_object['continuation'];
                    $done = false;
                }

                $response = new Command\Indexes\Response(
                    $this->success, $this->statusCode, $this->error, $results, $termsReturned, $continuation, $done, $this->getResponseHeader('Date')
                );
                break;

            case 'Basho\Riak\Command\Stats':
                $response = new Command\Stats\Response($this->success, $this->statusCode, $this->error, json_decode($body, true));
                break;

            case 'Basho\Riak\Command\TimeSeries\Fetch':
                $row = in_array($this->statusCode, [200, 201, 204]) ? json_decode($body, true) : [];
                $response = new Command\TimeSeries\Response($this->success, $this->statusCode, $this->error, [$row]);
                break;

            case 'Basho\Riak\Command\TimeSeries\Query\Fetch':
                $results = in_array($this->statusCode, [200, 204]) ? json_decode($body) : [];
                $rows = [];
                if (isset($results->rows)) {
                    // re-key every row by column name using the position of each column
                    foreach ($results->rows as $row) {
                        $cells = [];
                        foreach ($results->columns as $index => $column) {
                            $cells[$column] = $row[$index];
                        }
                        $rows[] = $cells;
                    }
                }
                $response = new Command\TimeSeries\Query\Response($this->success, $this->statusCode, $this->error, $rows);
                break;

            // the commands below need nothing beyond the generic response
            case 'Basho\Riak\Command\TimeSeries\Store':
            case 'Basho\Riak\Command\TimeSeries\Delete':
            case 'Basho\Riak\Command\KVObject\Delete':
            case 'Basho\Riak\Command\Bucket\Delete':
            case 'Basho\Riak\Command\Search\Index\Delete':
            case 'Basho\Riak\Command\Ping':
            default:
                $response = new Command\Response($this->success, $this->statusCode, $this->error);
                break;
        }

        $this->response = $response;
    }

    /**
     * Returns a response header value, or the default when the header is missing or empty
     *
     * The lookup uses the header name exactly as it was received (case-sensitive array key).
     *
     * @param string $key
     * @param mixed $default
     *
     * @return mixed a string, or a list of strings when the header occurred more than once
     */
    protected function getResponseHeader($key, $default = '')
    {
        return !empty($this->responseHeaders[$key]) ? $this->responseHeaders[$key] : $default;
    }
}
859