<?php
/* Icinga Web 2 Elasticsearch Module (c) 2017 Icinga Development Team | GPLv2+ */

namespace Icinga\Module\Elasticsearch;

use RuntimeException;
use Icinga\Data\Paginatable;
use Icinga\Data\Queryable;
use Icinga\Util\Json;
use iplx\Http\Client;
use iplx\Http\Request;
use iplx\Http\Uri;

/**
 * Search query against an Elasticsearch index
 *
 * Builds a `_search` request from the configured index, fields, filter, limit and offset, sends it once
 * and keeps the decoded response for all subsequent accessors ({@link count()}, {@link fetchAll()}, ...).
 */
class Query implements Queryable, Paginatable
{
    /**
     * Upper bound reported by {@link count()}
     *
     * NOTE(review): presumably mirrors Elasticsearch's default `index.max_result_window` - confirm.
     */
    const MAX_RESULT_WINDOW = 10000;

    /** @var Elastic Provides the connection configuration */
    protected $elastic;

    /** @var array Fields to request in `_source`, in addition to `@timestamp` */
    protected $fields;

    /** @var mixed Value sent as the `query` part of the search body */
    protected $filter;

    /** @var string Index (URI path segment) to search in */
    protected $index;

    /** @var int|null Value sent as `size` */
    protected $limit;

    /** @var int|null Value sent as `from` */
    protected $offset;

    /** @var array|null Decoded search response, null until the query has been executed */
    protected $response;

    /** @var array Search body parts merged over the default body */
    protected $patch = [];

    /**
     * Create a new query
     *
     * @param   Elastic $elastic
     * @param   array   $fields     Fields to request in `_source`
     */
    public function __construct(Elastic $elastic, array $fields = [])
    {
        $this->elastic = $elastic;

        $this->fields = $fields;
    }

    /**
     * {@inheritdoc}
     *
     * @return $this
     */
    public function from($target, array $fields = null)
    {
        $this->index = $target;

        // An empty or omitted field list keeps the fields passed to the constructor
        if (! empty($fields)) {
            $this->fields = $fields;
        }

        return $this;
    }

    /**
     * Set the limit and offset of the query
     *
     * @param   int|null    $count
     * @param   int|null    $offset
     *
     * @return  $this
     */
    public function limit($count = null, $offset = null)
    {
        $this->limit = $count;
        $this->offset = $offset;

        return $this;
    }

    /**
     * Get whether a limit is set
     *
     * @return  bool
     */
    public function hasLimit()
    {
        return $this->limit !== null;
    }

    /**
     * Get the limit
     *
     * @return  int|null
     */
    public function getLimit()
    {
        return $this->limit;
    }

    /**
     * Get whether an offset is set
     *
     * @return  bool
     */
    public function hasOffset()
    {
        return $this->offset !== null;
    }

    /**
     * Get the offset
     *
     * @return  int|null
     */
    public function getOffset()
    {
        return $this->offset;
    }

    /**
     * Get the total number of hits, capped at {@link MAX_RESULT_WINDOW}
     *
     * Executes the query if that has not happened yet.
     *
     * @return  int
     */
    public function count()
    {
        $this->execute();

        $total = $this->response['hits']['total'];

        // Elasticsearch 7+ reports the total as an object ({"value": N, "relation": ...}),
        // earlier versions as a plain integer
        if (is_array($total) && isset($total['value'])) {
            $total = $total['value'];
        }

        if ($total > self::MAX_RESULT_WINDOW) {
            return self::MAX_RESULT_WINDOW;
        }

        return $total;
    }

    /**
     * Set the filter, sent as the `query` part of the search body
     *
     * @param   mixed   $filter
     *
     * @return  $this
     */
    public function filter($filter)
    {
        $this->filter = $filter;

        return $this;
    }

    /**
     * Send the search request unless a response is already available
     *
     * @throws  RuntimeException    If Elasticsearch responds with an error
     */
    protected function execute()
    {
        if ($this->response !== null) {
            return;
        }

        $search = array_merge([
            '_source'   => array_merge(['@timestamp'], $this->fields),
            'query'     => $this->filter,
            'from'      => $this->getOffset(),
            'size'      => $this->getLimit(),
            'sort'      => ['@timestamp' => 'desc']
        ], $this->patch);

        // Parts which are not set (null) must not be part of the request body at all
        $body = json_encode(array_filter($search, function ($part) {
            return $part !== null;
        }));

        $this->response = $this->sendRequest("{$this->index}/_search", $body);
    }

    /**
     * Get the fields of the first hit
     *
     * Executes the query if that has not happened yet. The fields are collected by
     * {@link Elastic::extractFields()} from the `_source` of the first hit only.
     *
     * @return  array   Empty if there are no hits
     */
    public function getFields()
    {
        $this->execute();

        $events = $this->response['hits']['hits'];

        $fields = [];

        if (! empty($events)) {
            $event = reset($events);

            Elastic::extractFields($event['_source'], $fields);
        }

        return $fields;
    }

    /**
     * Get all hits of the response
     *
     * Executes the query if that has not happened yet.
     *
     * @return  array
     */
    public function fetchAll()
    {
        $this->execute();

        return $this->response['hits']['hits'];
    }

    /**
     * Set search body parts to merge over the default body
     *
     * Keys given here replace the default parts of the same name (`_source`, `query`, `from`, `size`, `sort`).
     * Parts set to null are removed from the request body.
     *
     * @param   array   $patch
     *
     * @return  $this
     */
    public function patch(array $patch)
    {
        $this->patch = $patch;

        return $this;
    }

    /**
     * Get the complete decoded response
     *
     * Executes the query if that has not happened yet.
     *
     * @return  array
     */
    public function getResponse()
    {
        $this->execute();

        return $this->response;
    }

    /**
     * Send a GET request for the given path below the configured URI
     *
     * Unlike the search itself, the response of this request is not kept.
     *
     * @param   string  $target     Path relative to the configured URI
     *
     * @return  array               The decoded response
     *
     * @throws  RuntimeException    If Elasticsearch responds with an error
     */
    public function get($target)
    {
        return $this->sendRequest($target);
    }

    /**
     * Create the cURL options for TLS verification and client authentication
     *
     * @param   object  $config     Connection configuration as returned by {@link Elastic::getConfig()}
     *
     * @return  array               cURL options indexed by CURLOPT_* constant
     */
    private function createCurlOptions($config)
    {
        $curl = [];

        if (! empty($config->ca)) {
            // A directory (or a link to one) is a CA path, anything else is treated as CA bundle file
            if (is_dir($config->ca)
                || (is_link($config->ca) && is_dir(readlink($config->ca)))
            ) {
                $curl[CURLOPT_CAPATH] = $config->ca;
            } else {
                $curl[CURLOPT_CAINFO] = $config->ca;
            }
        }

        if (! empty($config->client_certificate)) {
            $curl[CURLOPT_SSLCERT] = $config->client_certificate;
        }

        if (! empty($config->client_private_key)) {
            // The key has its own option; using CURLOPT_SSLCERT here would replace the certificate
            $curl[CURLOPT_SSLKEY] = $config->client_private_key;
        }

        return $curl;
    }

    /**
     * Send a GET request to Elasticsearch and decode the JSON response
     *
     * @param   string      $path   Path relative to the configured URI
     * @param   mixed|null  $body   Request body, omitted from the request if null
     *
     * @return  array               The decoded response
     *
     * @throws  RuntimeException    If Elasticsearch responds with an error
     */
    private function sendRequest($path, $body = null)
    {
        $config = $this->elastic->getConfig();

        $client = new Client();

        $uri = (new Uri("{$config->uri}/{$path}"))
            ->withUserInfo($config->user, $config->password);

        $headers = ['Content-Type' => 'application/json'];

        if ($body === null) {
            $request = new Request('GET', $uri, $headers);
        } else {
            $request = new Request('GET', $uri, $headers, $body);
        }

        $response = Json::decode(
            (string) $client->send($request, ['curl' => $this->createCurlOptions($config)])->getBody(),
            true
        );

        if (isset($response['error'])) {
            throw new RuntimeException(
                'Got error from Elasticsearch: '. $response['error']['type'] . ': ' . $response['error']['reason']
            );
        }

        return $response;
    }
}