1<?php 2 3namespace Basho\Riak\Api; 4 5use Basho\Riak\Api; 6use Basho\Riak\ApiInterface; 7use Basho\Riak\Bucket; 8use Basho\Riak\Command; 9use Basho\Riak\DataType\Counter; 10use Basho\Riak\DataType\Map; 11use Basho\Riak\DataType\Set; 12use Basho\Riak\DataType\Hll; 13use Basho\Riak\Location; 14use Basho\Riak\Node; 15use Basho\Riak\DataObject; 16use Basho\Riak\Search\Doc; 17use Basho\Riak\TimeSeries\Cell; 18 19/** 20 * Handles communications between end user app & Riak via Riak HTTP API using cURL 21 * 22 * @author Christopher Mancini <cmancini at basho d0t com> 23 */ 24class Http extends Api implements ApiInterface 25{ 26 // Header keys 27 const VCLOCK_KEY = 'X-Riak-Vclock'; 28 const CONTENT_TYPE_KEY = 'Content-Type'; 29 const METADATA_PREFIX = 'X-Riak-Meta-'; 30 const LAST_MODIFIED_KEY = 'Last-Modified'; 31 32 // Content Types 33 const CONTENT_TYPE_JSON = 'application/json'; 34 const CONTENT_TYPE_XML = 'application/xml'; 35 36 const TS_API_PREFIX = '/ts/v1'; 37 38 /** 39 * Request body to be sent 40 * 41 * @var string 42 */ 43 protected $requestBody = ''; 44 45 /** 46 * Request url the request is being sent to 47 * 48 * @var string 49 */ 50 protected $requestURL = ''; 51 52 /** 53 * Response headers returned from request 54 * 55 * @var array 56 */ 57 protected $responseHeaders = []; 58 59 /** 60 * Response body returned from request 61 * 62 * @var string 63 */ 64 protected $responseBody = ''; 65 66 /** 67 * HTTP Status Code from response 68 * 69 * @var int 70 */ 71 protected $statusCode = 0; 72 73 /** 74 * cURL connection handle 75 * 76 * @var null 77 */ 78 protected $connection = null; 79 80 /** 81 * API path 82 * 83 * @var string 84 */ 85 protected $path = ''; 86 87 /** 88 * Query string 89 * 90 * @var string 91 */ 92 protected $query = ''; 93 94 private $options = []; 95 96 protected $headers = []; 97 98 /** 99 * @return int 100 */ 101 public function getStatusCode() 102 { 103 return $this->statusCode; 104 } 105 106 /** 107 * @return string 108 */ 109 public function getResponseBody() 110 { 111 return $this->responseBody; 112 } 113 114 /** 115 * @return array 116 */ 117 public function getResponseHeaders() 118 { 119 return $this->responseHeaders; 120 } 121 122 /** 123 * @return string 124 */ 125 public function getRequest() 126 { 127 return $this->request . $this->requestBody; 128 } 129 130 public function closeConnection() 131 { 132 if ($this->connection) { 133 curl_close($this->connection); 134 $this->connection = null; 135 } 136 } 137 138 /** 139 * Prepare request to be sent 140 * 141 * @param Command $command 142 * @param Node $node 143 * 144 * @return $this 145 */ 146 public function prepare(Command $command, Node $node) 147 { 148 if ($this->connection) { 149 $this->resetConnection(); 150 } 151 152 // call parent prepare method to setup object members 153 parent::prepare($command, $node); 154 155 // set the API path to be used 156 $this->buildPath(); 157 158 // general connection preparation 159 $this->prepareConnection(); 160 161 // request specific connection preparation 162 $this->prepareRequest(); 163 164 return $this; 165 } 166 167 public function resetConnection() 168 { 169 $this->command = null; 170 $this->options = []; 171 $this->path = ''; 172 $this->query = ''; 173 $this->requestBody = ''; 174 $this->requestURL = ''; 175 $this->responseHeaders = []; 176 $this->responseBody = ''; 177 178 if (version_compare(PHP_VERSION, '5.5.0') >= 0) { 179 curl_reset($this->connection); 180 } else { 181 curl_close($this->connection); 182 $this->connection = null; 183 } 184 } 185 186 /** 187 * Sets the API path for the command 188 * 189 * @return $this 190 */ 191 protected function buildPath() 192 { 193 $bucket = null; 194 $key = ''; 195 196 $bucket = $this->command->getBucket(); 197 198 $location = $this->command->getLocation(); 199 if (!empty($location) && $location instanceof Location) { 200 $key = $location->getKey(); 201 } 202 switch (get_class($this->command)) { 203 /** @noinspection PhpMissingBreakStatementInspection */ 204 case 'Basho\Riak\Command\Bucket\Store': 205 $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON; 206 case 'Basho\Riak\Command\Bucket\Fetch': 207 case 'Basho\Riak\Command\Bucket\Delete': 208 $this->path = sprintf('/types/%s/buckets/%s/props', $bucket->getType(), $bucket->getName()); 209 break; 210 /** @noinspection PhpMissingBreakStatementInspection */ 211 case 'Basho\Riak\Command\KVObject\Fetch': 212 $this->headers['Accept'] = '*/*, multipart/mixed'; 213 case 'Basho\Riak\Command\KVObject\Store': 214 case 'Basho\Riak\Command\KVObject\Delete': 215 $this->path = sprintf('/types/%s/buckets/%s/keys/%s', $bucket->getType(), $bucket->getName(), $key); 216 break; 217 case 'Basho\Riak\Command\KVObject\Keys\Fetch': 218 $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON; 219 $this->path = sprintf('/types/%s/buckets/%s/keys', $bucket->getType(), $bucket->getName()); 220 break; 221 case 'Basho\Riak\Command\DataType\Counter\Store': 222 case 'Basho\Riak\Command\DataType\GSet\Store': 223 case 'Basho\Riak\Command\DataType\Set\Store': 224 /** @noinspection PhpMissingBreakStatementInspection */ 225 case 'Basho\Riak\Command\DataType\Map\Store': 226 case 'Basho\Riak\Command\DataType\Hll\Store': 227 $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON; 228 case 'Basho\Riak\Command\DataType\Counter\Fetch': 229 case 'Basho\Riak\Command\DataType\Set\Fetch': 230 case 'Basho\Riak\Command\DataType\Map\Fetch': 231 case 'Basho\Riak\Command\DataType\Hll\Fetch': 232 $this->path = sprintf('/types/%s/buckets/%s/datatypes/%s', $bucket->getType(), $bucket->getName(), $key); 233 break; 234 /** @noinspection PhpMissingBreakStatementInspection */ 235 case 'Basho\Riak\Command\Search\Index\Store': 236 $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON; 237 case 'Basho\Riak\Command\Search\Index\Fetch': 238 case 'Basho\Riak\Command\Search\Index\Delete': 239 $this->path = sprintf('/search/index/%s', $this->command); 240 break; 241 /** @noinspection PhpMissingBreakStatementInspection */ 242 case 'Basho\Riak\Command\Search\Schema\Store': 243 $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_XML; 244 case 'Basho\Riak\Command\Search\Schema\Fetch': 245 $this->path = sprintf('/search/schema/%s', $this->command); 246 break; 247 case 'Basho\Riak\Command\Search\Fetch': 248 $this->path = sprintf('/search/query/%s', $this->command); 249 break; 250 case 'Basho\Riak\Command\MapReduce\Fetch': 251 $this->headers[static::CONTENT_TYPE_KEY] = static::CONTENT_TYPE_JSON; 252 $this->path = sprintf('/%s', $this->config['mapred_prefix']); 253 break; 254 case 'Basho\Riak\Command\Indexes\Query': 255 $this->path = $this->createIndexQueryPath($bucket); 256 break; 257 case 'Basho\Riak\Command\Ping': 258 $this->path = '/ping'; 259 break; 260 case 'Basho\Riak\Command\Stats': 261 $this->path = '/stats'; 262 break; 263 case 'Basho\Riak\Command\KVObject\FetchPreflist': 264 $this->path = sprintf('/types/%s/buckets/%s/keys/%s/preflist', $bucket->getType(), $bucket->getName(), $key); 265 break; 266 case 'Basho\Riak\Command\TimeSeries\Fetch': 267 case 'Basho\Riak\Command\TimeSeries\Delete': 268 /** @var $command Command\TimeSeries\Fetch */ 269 $command = $this->command; 270 $this->path = sprintf('%s/tables/%s/keys/%s', static::TS_API_PREFIX, $command->getTable(), implode("/", $command->getData())); 271 break; 272 case 'Basho\Riak\Command\TimeSeries\Store': 273 /** @var $command Command\TimeSeries\Store */ 274 $command = $this->command; 275 $this->path = sprintf('%s/tables/%s/keys', static::TS_API_PREFIX, $command->getTable()); 276 break; 277 case 'Basho\Riak\Command\TimeSeries\Query\Fetch': 278 $this->path = sprintf('%s/query', static::TS_API_PREFIX); 279 break; 280 default: 281 $this->path = ''; 282 } 283 284 return $this; 285 } 286 287 /** 288 * Generates the URL path for a 2i Query 289 * 290 * @param Bucket $bucket 291 * @return string 292 * @throws Api\Exception if 2i query is invalid. 293 */ 294 private function createIndexQueryPath(Bucket $bucket) 295 { 296 /** @var Command\Indexes\Query $command */ 297 $command = $this->command; 298 299 if($command->isMatchQuery()) { 300 $path = sprintf('/types/%s/buckets/%s/index/%s/%s', $bucket->getType(), 301 $bucket->getName(), 302 $command->getIndexName(), 303 $command->getMatchValue()); 304 } 305 elseif($command->isRangeQuery()) { 306 $path = sprintf('/types/%s/buckets/%s/index/%s/%s/%s', $bucket->getType(), 307 $bucket->getName(), 308 $command->getIndexName(), 309 $command->getLowerBound(), 310 $command->getUpperBound()); 311 } 312 else 313 { 314 throw new Api\Exception("Invalid Secondary Index Query."); 315 } 316 317 return $path; 318 } 319 320 /** 321 * Prepare Connection 322 * 323 * Sets general connection options that are used with every request 324 * 325 * @return $this 326 * @throws Api\Exception 327 */ 328 protected function prepareConnection() 329 { 330 // record outgoing headers 331 $this->options[CURLINFO_HEADER_OUT] = 1; 332 333 if ($this->command->getConnectionTimeout()) { 334 $this->options[CURLOPT_TIMEOUT] = $this->command->getConnectionTimeout(); 335 } 336 337 if ($this->node->useTls()) { 338 // CA File 339 if ($this->node->getCaFile()) { 340 $this->options[CURLOPT_CAINFO] = $this->node->getCaFile(); 341 } elseif ($this->node->getCaDirectory()) { 342 $this->options[CURLOPT_CAPATH] = $this->node->getCaDirectory(); 343 } else { 344 throw new Api\Exception('A Certificate Authority file is required for secure connections.'); 345 } 346 347 // verify CA file 348 $this->options[CURLOPT_SSL_VERIFYPEER] = true; 349 350 // verify host common name 351 $this->options[CURLOPT_SSL_VERIFYHOST] = 0; 352 353 if ($this->node->getUserName()) { 354 $this->options[CURLOPT_USERPWD] = sprintf('%s:%s', $this->node->getUserName(), 355 $this->node->getPassword()); 356 } elseif ($this->node->getCertificate()) { 357 /* 358 * NOT CURRENTLY SUPPORTED ON HTTP 359 * 360 $this->options[CURLOPT_SSLCERT] = $this->node->getCertificate(); 361 $this->options[CURLOPT_SSLCERTTYPE] = 'P12'; 362 if ($this->node->getCertificatePassword()) { 363 $this->options[CURLOPT_SSLCERTPASSWD] = $this->node->getCertificatePassword(); 364 } 365 if ($this->node->getPrivateKey()) { 366 $this->options[CURLOPT_SSLKEY] = $this->node->getPrivateKey(); 367 if ($this->node->getPrivateKeyPassword()) { 368 $this->options[CURLOPT_SSLKEYPASSWD] = $this->node->getPrivateKeyPassword(); 369 } 370 } 371 */ 372 } 373 } 374 375 return $this; 376 } 377 378 /** 379 * Prepare request 380 * 381 * Sets connection options that are specific to this request 382 * 383 * @return $this 384 */ 385 protected function prepareRequest() 386 { 387 return $this->prepareRequestMethod() 388 ->prepareRequestHeaders() 389 ->prepareRequestParameters() 390 ->prepareRequestUrl() 391 ->prepareRequestData(); 392 } 393 394 /** 395 * Prepare request data 396 * 397 * @return $this 398 */ 399 protected function prepareRequestData() 400 { 401 // if POST or PUT, add parameters to post data, else add to uri 402 if (in_array($this->command->getMethod(), ['POST', 'PUT'])) { 403 $this->requestBody = $this->command->getEncodedData(); 404 $this->options[CURLOPT_POSTFIELDS] = $this->requestBody; 405 } 406 407 return $this; 408 } 409 410 /** 411 * Prepares the complete request URL 412 * 413 * @return $this 414 */ 415 protected function prepareRequestUrl() 416 { 417 $protocol = $this->node->useTls() ? 'https' : 'http'; 418 $this->requestURL = sprintf('%s://%s%s?%s', $protocol, $this->node->getUri(), $this->path, $this->query); 419 420 // set the built request URL on the connection 421 $this->options[CURLOPT_URL] = $this->requestURL; 422 423 return $this; 424 } 425 426 /** 427 * Prepare request parameters 428 * 429 * @return $this 430 */ 431 protected function prepareRequestParameters() 432 { 433 if ($this->command->hasParameters()) { 434 // build query using RFC 3986 (spaces become %20 instead of '+') 435 $this->query = http_build_query($this->command->getParameters(), '', '&', PHP_QUERY_RFC3986); 436 } 437 438 return $this; 439 } 440 441 /** 442 * Prepares the request headers 443 * 444 * @return $this 445 */ 446 protected function prepareRequestHeaders() 447 { 448 $curl_headers = []; 449 450 foreach ($this->headers as $key => $value) { 451 $curl_headers[] = sprintf('%s: %s', $key, $value); 452 } 453 454 // if we have an object, set appropriate object headers 455 $object = $this->command->getObject(); 456 if ($object) { 457 if ($object->getVclock()) { 458 $curl_headers[] = sprintf('%s: %s', static::VCLOCK_KEY, $object->getVclock()); 459 } 460 461 if ($object->getContentType()) { 462 $charset = ''; 463 if ($object->getCharset()) { 464 $charset = sprintf('; charset=%s', $object->getCharset()); 465 } 466 $curl_headers[] = sprintf('%s: %s', static::CONTENT_TYPE_KEY, $object->getContentType(), $charset); 467 } 468 469 // setup index headers 470 $translator = new Api\Http\Translator\SecondaryIndex(); 471 $indexHeaders = $translator->createHeadersFromIndexes($object->getIndexes()); 472 foreach ($indexHeaders as $value) { 473 $curl_headers[] = sprintf('%s: %s', $value[0], $value[1]); 474 } 475 476 // setup metadata headers 477 foreach($object->getMetaData() as $key => $value) { 478 $curl_headers[] = sprintf('%s%s: %s', static::METADATA_PREFIX, $key, $value); 479 } 480 } 481 482 // set the request headers on the connection 483 $this->options[CURLOPT_HTTPHEADER] = $curl_headers; 484 485 // dump local headers to start fresh 486 $this->headers = []; 487 488 return $this; 489 } 490 491 /** 492 * Prepare the request method 493 * 494 * @return $this 495 */ 496 protected function prepareRequestMethod() 497 { 498 switch ($this->command->getMethod()) { 499 case "POST": 500 $this->options[CURLOPT_POST] = 1; 501 break; 502 case "PUT": 503 $this->options[CURLOPT_CUSTOMREQUEST] = 'PUT'; 504 break; 505 case "DELETE": 506 $this->options[CURLOPT_CUSTOMREQUEST] = 'DELETE'; 507 break; 508 case "HEAD": 509 $this->options[CURLOPT_NOBODY] = 1; 510 break; 511 default: 512 // reset http method to get in case its changed 513 $this->options[CURLOPT_HTTPGET] = 1; 514 } 515 516 return $this; 517 } 518 519 /** 520 * @return string 521 */ 522 public function getPath() 523 { 524 return $this->path; 525 } 526 527 /** 528 * @return string 529 */ 530 public function getQuery() 531 { 532 return $this->query; 533 } 534 535 /** 536 * @return bool 537 */ 538 public function send() 539 { 540 // set the response header and body callback functions 541 $this->options[CURLOPT_HEADERFUNCTION] = [$this, 'responseHeaderCallback']; 542 $this->options[CURLOPT_WRITEFUNCTION] = [$this, 'responseBodyCallback']; 543 if ($this->command->isVerbose()) { 544 // set curls output to be the output buffer stream 545 $this->options[CURLOPT_STDERR] = fopen('php://stdout', 'w+'); 546 $this->options[CURLOPT_VERBOSE] = 1; 547 548 // there is a bug when verbose is enabled, header out causes no output 549 // @see https://bugs.php.net/bug.php?id=65348 550 unset($this->options[CURLINFO_HEADER_OUT]); 551 552 echo "cURL Command:\n\tcurl -X{$this->command->getMethod()} {$this->requestURL} --data \"{$this->requestBody}\"\n"; 553 } 554 555 // set all options on the resource 556 curl_setopt_array($this->getConnection(), $this->options); 557 558 // execute the request 559 $this->success = curl_exec($this->getConnection()); 560 if ($this->success === false) { 561 $this->error = curl_error($this->getConnection()); 562 } elseif ($this->success === true) { 563 $this->error = ''; 564 } 565 566 $this->request = curl_getinfo($this->getConnection(), CURLINFO_HEADER_OUT); 567 568 // set the response http code 569 $this->statusCode = curl_getinfo($this->getConnection(), CURLINFO_HTTP_CODE); 570 571 $this->parseResponse(); 572 573 return $this->success; 574 } 575 576 /** 577 * Add a custom header to the request 578 * 579 * @param $key 580 * @param $value 581 */ 582 public function addHeader($key, $value) 583 { 584 $this->headers[$key] = $value; 585 } 586 587 /** 588 * @return resource 589 */ 590 public function getConnection() 591 { 592 if (!$this->connection) { 593 $this->openConnection(); 594 } 595 596 return $this->connection; 597 } 598 599 public function openConnection() 600 { 601 $this->connection = curl_init(); 602 603 return $this; 604 } 605 606 /** 607 * Response header callback 608 * 609 * Handles callback from curl when the response is received, it parses the headers into an array sets them as 610 * member of the class. 611 * 612 * Has to be public for curl to be able to access it. 613 * 614 * @param $ch 615 * @param $header 616 * 617 * @return int 618 */ 619 public function responseHeaderCallback($ch, $header) 620 { 621 if (strpos($header, ':')) { 622 list ($key, $value) = explode(':', $header, 2); 623 624 $value = trim($value); 625 626 if (!empty($value)) { 627 if (!isset($this->responseHeaders[$key])) { 628 $this->responseHeaders[$key] = $value; 629 } elseif (is_array($this->responseHeaders[$key])) { 630 $this->responseHeaders[$key] = array_merge($this->responseHeaders[$key], [$value]); 631 } else { 632 $this->responseHeaders[$key] = array_merge([$this->responseHeaders[$key]], [$value]); 633 } 634 } 635 } 636 637 return strlen($header); 638 } 639 640 /** 641 * Response body callback 642 * 643 * Handles callback from curl when the response is received, it sets the response body as a member of the class. 644 * 645 * Has to be public for curl to be able to access it. 646 * 647 * @param $ch 648 * @param $body 649 * 650 * @return int 651 */ 652 public function responseBodyCallback($ch, $body) 653 { 654 $this->responseBody .= $body; 655 return strlen($body); 656 } 657 658 protected function parseResponse() 659 { 660 // trim leading / trailing whitespace 661 $body = $this->responseBody; 662 $location = null; 663 if ($this->getResponseHeader('Location')) { 664 $location = Location::fromString($this->getResponseHeader('Location')); 665 } 666 667 if ($this->statusCode == 500) { 668 $this->success = false; 669 $this->error = $body; 670 } 671 672 switch (get_class($this->command)) { 673 case 'Basho\Riak\Command\Bucket\Store': 674 case 'Basho\Riak\Command\Bucket\Fetch': 675 $bucket = null; 676 $modified = $this->getResponseHeader(static::LAST_MODIFIED_KEY, ''); 677 $properties = json_decode($body, true); 678 if (isset($properties['props']) && $this->command->getBucket()) { 679 $bucket = new Bucket($this->command->getBucket()->getName(), $this->command->getBucket()->getType(), $properties['props']); 680 } 681 $response = new Command\Bucket\Response($this->success, $this->statusCode, $this->error, $bucket, $modified); 682 break; 683 684 case 'Basho\Riak\Command\KVObject\Fetch': 685 case 'Basho\Riak\Command\KVObject\Store': 686 /** @var Command\KVObject $command */ 687 $command = $this->command; 688 $objects = (new Api\Http\Translator\ObjectResponse($command, $this->statusCode)) 689 ->parseResponse($body, $this->responseHeaders); 690 $response = new Command\KVObject\Response($this->success, $this->statusCode, $this->error, $location, $objects); 691 break; 692 693 case 'Basho\Riak\Command\KVObject\FetchPreflist': 694 $response = new Command\KVObject\Response($this->success, $this->statusCode, $this->error, $location, [new DataObject(json_decode($body))]); 695 break; 696 697 case 'Basho\Riak\Command\KVObject\Keys\Fetch': 698 $data = json_decode($body); 699 $keys = []; 700 if ($data && isset($data->keys)) { 701 foreach ($data->keys as $key) { 702 $keys[] = new Location($key, $this->command->getBucket()); 703 } 704 } 705 $response = new Command\KVObject\Keys\Response($this->success, $this->statusCode, $this->error, $keys); 706 break; 707 708 case 'Basho\Riak\Command\DataType\Counter\Store': 709 case 'Basho\Riak\Command\DataType\Counter\Fetch': 710 $counter = null; 711 $json_object = json_decode($body); 712 if ($json_object && isset($json_object->value)) { 713 $counter = new Counter($json_object->value); 714 } 715 $response = new Command\DataType\Counter\Response( 716 $this->success, $this->statusCode, $this->error, $location, $counter, $this->getResponseHeader('Date') 717 ); 718 break; 719 720 case 'Basho\Riak\Command\DataType\GSet\Store': 721 case 'Basho\Riak\Command\DataType\Set\Store': 722 case 'Basho\Riak\Command\DataType\Set\Fetch': 723 $set = null; 724 $json_object = json_decode($body); 725 if ($json_object && isset($json_object->value)) { 726 $context = ''; 727 if (isset($json_object->context)) { 728 $context = $json_object->context; 729 } 730 $set = new Set($json_object->value, $context); 731 } 732 $response = new Command\DataType\Set\Response( 733 $this->success, $this->statusCode, $this->error, $location, $set, $this->getResponseHeader('Date') 734 ); 735 break; 736 737 case 'Basho\Riak\Command\DataType\Map\Store': 738 case 'Basho\Riak\Command\DataType\Map\Fetch': 739 $map = null; 740 $json_object = json_decode($body, true); 741 if ($json_object && isset($json_object['value'])) { 742 $map = new Map($json_object['value'], $json_object['context']); 743 } 744 $response = new Command\DataType\Map\Response( 745 $this->success, $this->statusCode, $this->error, $location, $map, $this->getResponseHeader('Date') 746 ); 747 break; 748 749 case 'Basho\Riak\Command\DataType\Hll\Store': 750 case 'Basho\Riak\Command\DataType\Hll\Fetch': 751 $hll = null; 752 $json_object = json_decode($body); 753 if ($json_object && isset($json_object->value)) { 754 $hll = new Hll($json_object->value); 755 } 756 $response = new Command\DataType\Hll\Response( 757 $this->success, $this->statusCode, $this->error, $location, $hll, $this->getResponseHeader('Date') 758 ); 759 break; 760 761 case 'Basho\Riak\Command\Search\Fetch': 762 $results = in_array($this->statusCode, [200,204]) ? json_decode($body) : null; 763 $docs = []; 764 if (!empty($results->response->docs)) { 765 foreach ($results->response->docs as $doc) { 766 $docs[] = new Doc($doc); 767 } 768 } 769 $numFound = !empty($results->response->numFound) ? $results->response->numFound : 0; 770 771 $response = new Command\Search\Response($this->success, $this->statusCode, $this->error, $numFound, $docs); 772 break; 773 case 'Basho\Riak\Command\Search\Index\Store': 774 case 'Basho\Riak\Command\Search\Index\Fetch': 775 $index = json_decode($body); 776 $response = new Command\Search\Index\Response($this->success, $this->statusCode, $this->error, $index); 777 break; 778 779 case 'Basho\Riak\Command\Search\Schema\Store': 780 case 'Basho\Riak\Command\Search\Schema\Fetch': 781 $response = new Command\Search\Schema\Response( 782 $this->success, $this->statusCode, $this->error, $body, $this->getResponseHeader(static::CONTENT_TYPE_KEY) 783 ); 784 break; 785 786 case 'Basho\Riak\Command\MapReduce\Fetch': 787 $results = in_array($this->statusCode, [200,204]) ? json_decode($body) : null; 788 $response = new Command\MapReduce\Response($this->success, $this->statusCode, $this->error, $results); 789 break; 790 case 'Basho\Riak\Command\Indexes\Query': 791 $json_object = in_array($this->statusCode, [200,204]) ? json_decode($body, true) : null; 792 $results = []; 793 $termsReturned = false; 794 $continuation = null; 795 $done = true; 796 797 if (isset($json_object['keys'])) { 798 $results = $json_object['keys']; 799 } 800 801 if (isset($json_object['results'])) { 802 $results = $json_object['results']; 803 $termsReturned = true; 804 } 805 806 if (isset($json_object['continuation'])) { 807 $continuation = $json_object['continuation']; 808 $done = false; 809 } 810 811 $response = new Command\Indexes\Response( 812 $this->success, $this->statusCode, $this->error, $results, $termsReturned, $continuation, $done, $this->getResponseHeader('Date') 813 ); 814 break; 815 case 'Basho\Riak\Command\Stats': 816 $response = new Command\Stats\Response($this->success, $this->statusCode, $this->error, json_decode($body, true)); 817 break; 818 case 'Basho\Riak\Command\TimeSeries\Fetch': 819 $row = in_array($this->statusCode, ['200','201','204']) ? json_decode($body, true) : []; 820 $response = new Command\TimeSeries\Response($this->success, $this->statusCode, $this->error, [$row]); 821 break; 822 case 'Basho\Riak\Command\TimeSeries\Query\Fetch': 823 $results = in_array($this->statusCode, ['200','204']) ? json_decode($body) : []; 824 $rows = []; 825 if (isset($results->rows)) { 826 foreach ($results->rows as $row) { 827 $cells = []; 828 foreach ($results->columns as $index => $column) { 829 $cells[$column] = $row[$index]; 830 } 831 $rows[] = $cells; 832 } 833 } 834 $response = new Command\TimeSeries\Query\Response($this->success, $this->statusCode, $this->error, $rows); 835 break; 836 case 'Basho\Riak\Command\TimeSeries\Store': 837 case 'Basho\Riak\Command\TimeSeries\Delete': 838 case 'Basho\Riak\Command\KVObject\Delete': 839 case 'Basho\Riak\Command\Bucket\Delete': 840 case 'Basho\Riak\Command\Search\Index\Delete': 841 case 'Basho\Riak\Command\Ping': 842 default: 843 $response = new Command\Response($this->success, $this->statusCode, $this->error); 844 break; 845 } 846 847 $this->response = $response; 848 } 849 850 protected function getResponseHeader($key, $default = '') 851 { 852 if (!empty($this->responseHeaders[$key])) { 853 return $this->responseHeaders[$key]; 854 } else { 855 return $default; 856 } 857 } 858} 859