1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #include "livestatus/livestatusquery.hpp"
4 #include "livestatus/countaggregator.hpp"
5 #include "livestatus/sumaggregator.hpp"
6 #include "livestatus/minaggregator.hpp"
7 #include "livestatus/maxaggregator.hpp"
8 #include "livestatus/avgaggregator.hpp"
9 #include "livestatus/stdaggregator.hpp"
10 #include "livestatus/invsumaggregator.hpp"
11 #include "livestatus/invavgaggregator.hpp"
12 #include "livestatus/attributefilter.hpp"
13 #include "livestatus/negatefilter.hpp"
14 #include "livestatus/orfilter.hpp"
15 #include "livestatus/andfilter.hpp"
16 #include "icinga/externalcommandprocessor.hpp"
17 #include "base/debug.hpp"
18 #include "base/convert.hpp"
19 #include "base/objectlock.hpp"
20 #include "base/logger.hpp"
21 #include "base/exception.hpp"
22 #include "base/utility.hpp"
23 #include "base/json.hpp"
24 #include "base/serializer.hpp"
25 #include "base/timer.hpp"
26 #include "base/initialize.hpp"
27 #include <boost/algorithm/string/replace.hpp>
28 #include <boost/algorithm/string/join.hpp>
29 
30 using namespace icinga;
31 
namespace
{

/* Guards access to l_ExternalCommands. */
std::mutex l_QueryMutex;

/* Counter of COMMAND queries executed through Livestatus; see GetExternalCommands(). */
int l_ExternalCommands = 0;

}
34 
LivestatusQuery(const std::vector<String> & lines,const String & compat_log_path)35 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
36 	: m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1), m_ErrorCode(0),
37 	m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
38 {
39 	if (lines.size() == 0) {
40 		m_Verb = "ERROR";
41 		m_ErrorCode = LivestatusErrorQuery;
42 		m_ErrorMessage = "Empty Query. Aborting.";
43 		return;
44 	}
45 
46 	String msg;
47 	for (const String& line : lines) {
48 		msg += line + "\n";
49 	}
50 	Log(LogDebug, "LivestatusQuery", msg);
51 
52 	m_CompatLogPath = compat_log_path;
53 
54 	/* default separators */
55 	m_Separators.emplace_back("\n");
56 	m_Separators.emplace_back(";");
57 	m_Separators.emplace_back(",");
58 	m_Separators.emplace_back("|");
59 
60 	String line = lines[0];
61 
62 	size_t sp_index = line.FindFirstOf(" ");
63 
64 	if (sp_index == String::NPos)
65 		BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
66 
67 	String verb = line.SubStr(0, sp_index);
68 	String target = line.SubStr(sp_index + 1);
69 
70 	m_Verb = verb;
71 
72 	if (m_Verb == "COMMAND") {
73 		m_KeepAlive = true;
74 		m_Command = target;
75 	} else if (m_Verb == "GET") {
76 		m_Table = target;
77 	} else {
78 		m_Verb = "ERROR";
79 		m_ErrorCode = LivestatusErrorQuery;
80 		m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
81 		return;
82 	}
83 
84 	std::deque<Filter::Ptr> filters, stats;
85 	std::deque<Aggregator::Ptr> aggregators;
86 
87 	for (unsigned int i = 1; i < lines.size(); i++) {
88 		line = lines[i];
89 
90 		size_t col_index = line.FindFirstOf(":");
91 		String header = line.SubStr(0, col_index);
92 		String params;
93 
94 		//OutputFormat:json or OutputFormat: json
95 		if (line.GetLength() > col_index + 1)
96 			params = line.SubStr(col_index + 1).Trim();
97 
98 		if (header == "ResponseHeader")
99 			m_ResponseHeader = params;
100 		else if (header == "OutputFormat")
101 			m_OutputFormat = params;
102 		else if (header == "KeepAlive")
103 			m_KeepAlive = (params == "on");
104 		else if (header == "Columns") {
105 			m_ColumnHeaders = false; // Might be explicitly re-enabled later on
106 			m_Columns = params.Split(" ");
107 		} else if (header == "Separators") {
108 			std::vector<String> separators = params.Split(" ");
109 
110 			/* ugly ascii long to char conversion, but works */
111 			if (separators.size() > 0)
112 				m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
113 			if (separators.size() > 1)
114 				m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
115 			if (separators.size() > 2)
116 				m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
117 			if (separators.size() > 3)
118 				m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
119 		} else if (header == "ColumnHeaders")
120 			m_ColumnHeaders = (params == "on");
121 		else if (header == "Limit")
122 			m_Limit = Convert::ToLong(params);
123 		else if (header == "Filter") {
124 			Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
125 
126 			if (!filter) {
127 				m_Verb = "ERROR";
128 				m_ErrorCode = LivestatusErrorQuery;
129 				m_ErrorMessage = "Invalid filter specification: " + line;
130 				return;
131 			}
132 
133 			filters.push_back(filter);
134 		} else if (header == "Stats") {
135 			m_ColumnHeaders = false; // Might be explicitly re-enabled later on
136 
137 			std::vector<String> tokens = params.Split(" ");
138 
139 			if (tokens.size() < 2) {
140 				m_Verb = "ERROR";
141 				m_ErrorCode = LivestatusErrorQuery;
142 				m_ErrorMessage = "Missing aggregator column name: " + line;
143 				return;
144 			}
145 
146 			String aggregate_arg = tokens[0];
147 			String aggregate_attr = tokens[1];
148 
149 			Aggregator::Ptr aggregator;
150 			Filter::Ptr filter;
151 
152 			if (aggregate_arg == "sum") {
153 				aggregator = new SumAggregator(aggregate_attr);
154 			} else if (aggregate_arg == "min") {
155 				aggregator = new MinAggregator(aggregate_attr);
156 			} else if (aggregate_arg == "max") {
157 				aggregator = new MaxAggregator(aggregate_attr);
158 			} else if (aggregate_arg == "avg") {
159 				aggregator = new AvgAggregator(aggregate_attr);
160 			} else if (aggregate_arg == "std") {
161 				aggregator = new StdAggregator(aggregate_attr);
162 			} else if (aggregate_arg == "suminv") {
163 				aggregator = new InvSumAggregator(aggregate_attr);
164 			} else if (aggregate_arg == "avginv") {
165 				aggregator = new InvAvgAggregator(aggregate_attr);
166 			} else {
167 				filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
168 
169 				if (!filter) {
170 					m_Verb = "ERROR";
171 					m_ErrorCode = LivestatusErrorQuery;
172 					m_ErrorMessage = "Invalid filter specification: " + line;
173 					return;
174 				}
175 
176 				aggregator = new CountAggregator();
177 			}
178 
179 			aggregator->SetFilter(filter);
180 			aggregators.push_back(aggregator);
181 
182 			stats.push_back(filter);
183 		} else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
184 			std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
185 
186 			unsigned int num = Convert::ToLong(params);
187 			CombinerFilter::Ptr filter;
188 
189 			if (header == "Or" || header == "StatsOr") {
190 				filter = new OrFilter();
191 				Log(LogDebug, "LivestatusQuery")
192 					<< "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
193 			} else {
194 				filter = new AndFilter();
195 				Log(LogDebug, "LivestatusQuery")
196 					<< "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
197 			}
198 
199 			if (num > deq.size()) {
200 				m_Verb = "ERROR";
201 				m_ErrorCode = 451;
202 				m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
203 				return;
204 			}
205 
206 			while (num > 0 && num--) {
207 				filter->AddSubFilter(deq.back());
208 				Log(LogDebug, "LivestatusQuery")
209 					<< "Add " << num << " filter.";
210 				deq.pop_back();
211 				if (&deq == &stats)
212 					aggregators.pop_back();
213 			}
214 
215 			deq.emplace_back(filter);
216 			if (&deq == &stats) {
217 				Aggregator::Ptr aggregator = new CountAggregator();
218 				aggregator->SetFilter(filter);
219 				aggregators.push_back(aggregator);
220 			}
221 		} else if (header == "Negate" || header == "StatsNegate") {
222 			std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
223 
224 			if (deq.empty()) {
225 				m_Verb = "ERROR";
226 				m_ErrorCode = 451;
227 				m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
228 				return;
229 			}
230 
231 			Filter::Ptr filter = deq.back();
232 			deq.pop_back();
233 
234 			if (!filter) {
235 				m_Verb = "ERROR";
236 				m_ErrorCode = 451;
237 				m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
238 				return;
239 			}
240 
241 			deq.push_back(new NegateFilter(filter));
242 
243 			if (deq == stats) {
244 				Aggregator::Ptr aggregator = aggregators.back();
245 				aggregator->SetFilter(filter);
246 			}
247 		}
248 	}
249 
250 	/* Combine all top-level filters into a single filter. */
251 	AndFilter::Ptr top_filter = new AndFilter();
252 
253 	for (const Filter::Ptr& filter : filters) {
254 		top_filter->AddSubFilter(filter);
255 	}
256 
257 	m_Filter = top_filter;
258 	m_Aggregators.swap(aggregators);
259 }
260 
GetExternalCommands()261 int LivestatusQuery::GetExternalCommands()
262 {
263 	std::unique_lock<std::mutex> lock(l_QueryMutex);
264 
265 	return l_ExternalCommands;
266 }
267 
ParseFilter(const String & params,unsigned long & from,unsigned long & until)268 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
269 {
270 	/*
271 	 * time >= 1382696656
272 	 * type = SERVICE FLAPPING ALERT
273 	 */
274 	std::vector<String> tokens;
275 	size_t sp_index;
276 	String temp_buffer = params;
277 
278 	/* extract attr and op */
279 	for (int i = 0; i < 2; i++) {
280 		sp_index = temp_buffer.FindFirstOf(" ");
281 
282 		/* check if this is the last argument */
283 		if (sp_index == String::NPos) {
284 			/* 'attr op' or 'attr op val' is valid */
285 			if (i < 1)
286 				BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
287 
288 			break;
289 		}
290 
291 		tokens.emplace_back(temp_buffer.SubStr(0, sp_index));
292 		temp_buffer = temp_buffer.SubStr(sp_index + 1);
293 	}
294 
295 	/* add the rest as value */
296 	tokens.emplace_back(std::move(temp_buffer));
297 
298 	if (tokens.size() == 2)
299 		tokens.emplace_back("");
300 
301 	if (tokens.size() < 3)
302 		return nullptr;
303 
304 	bool negate = false;
305 	String attr = tokens[0];
306 	String op = tokens[1];
307 	String val = tokens[2];
308 
309 	if (op == "!=") {
310 		op = "=";
311 		negate = true;
312 	} else if (op == "!~") {
313 		op = "~";
314 		negate = true;
315 	} else if (op == "!=~") {
316 		op = "=~";
317 		negate = true;
318 	} else if (op == "!~~") {
319 		op = "~~";
320 		negate = true;
321 	}
322 
323 	Filter::Ptr filter = new AttributeFilter(attr, op, val);
324 
325 	if (negate)
326 		filter = new NegateFilter(filter);
327 
328 	/* pre-filter log time duration */
329 	if (attr == "time") {
330 		if (op == "<" || op == "<=") {
331 			until = Convert::ToLong(val);
332 		} else if (op == ">" || op == ">=") {
333 			from = Convert::ToLong(val);
334 		}
335 	}
336 
337 	Log(LogDebug, "LivestatusQuery")
338 		<< "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
339 
340 	return filter;
341 }
342 
BeginResultSet(std::ostream & fp) const343 void LivestatusQuery::BeginResultSet(std::ostream& fp) const
344 {
345 	if (m_OutputFormat == "json" || m_OutputFormat == "python")
346 		fp << "[";
347 }
348 
EndResultSet(std::ostream & fp) const349 void LivestatusQuery::EndResultSet(std::ostream& fp) const
350 {
351 	if (m_OutputFormat == "json" || m_OutputFormat == "python")
352 		fp << "]";
353 }
354 
AppendResultRow(std::ostream & fp,const Array::Ptr & row,bool & first_row) const355 void LivestatusQuery::AppendResultRow(std::ostream& fp, const Array::Ptr& row, bool& first_row) const
356 {
357 	if (m_OutputFormat == "csv") {
358 		bool first = true;
359 
360 		ObjectLock rlock(row);
361 		for (const Value& value : row) {
362 			if (first)
363 				first = false;
364 			else
365 				fp << m_Separators[1];
366 
367 			if (value.IsObjectType<Array>())
368 				PrintCsvArray(fp, value, 0);
369 			else
370 				fp << value;
371 		}
372 
373 		fp << m_Separators[0];
374 	} else if (m_OutputFormat == "json") {
375 		if (!first_row)
376 			fp << ", ";
377 
378 		fp << JsonEncode(row);
379 	} else if (m_OutputFormat == "python") {
380 		if (!first_row)
381 			fp << ", ";
382 
383 		PrintPythonArray(fp, row);
384 	}
385 
386 	first_row = false;
387 }
388 
PrintCsvArray(std::ostream & fp,const Array::Ptr & array,int level) const389 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
390 {
391 	bool first = true;
392 
393 	ObjectLock olock(array);
394 	for (const Value& value : array) {
395 		if (first)
396 			first = false;
397 		else
398 			fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
399 
400 		if (value.IsObjectType<Array>())
401 			PrintCsvArray(fp, value, level + 1);
402 		else if (value.IsBoolean())
403 			fp << Convert::ToLong(value);
404 		else
405 			fp << value;
406 	}
407 }
408 
PrintPythonArray(std::ostream & fp,const Array::Ptr & rs) const409 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
410 {
411 	fp << "[ ";
412 
413 	bool first = true;
414 
415 	for (const Value& value : rs) {
416 		if (first)
417 			first = false;
418 		else
419 			fp << ", ";
420 
421 		if (value.IsObjectType<Array>())
422 			PrintPythonArray(fp, value);
423 		else if (value.IsNumber())
424 			fp << value;
425 		else
426 			fp << QuoteStringPython(value);
427 	}
428 
429 	fp << " ]";
430 }
431 
/**
 * Formats a string as a Python raw string literal.
 *
 * Every double quote is prefixed with a backslash and the result is wrapped
 * in r"...".
 *
 * @param str The string to quote.
 * @returns The quoted string.
 */
String LivestatusQuery::QuoteStringPython(const String& str)
{
	String escaped(str);
	boost::algorithm::replace_all(escaped, "\"", "\\\"");

	String quoted = "r\"" + escaped + "\"";

	return quoted;
}
437 
ExecuteGetHelper(const Stream::Ptr & stream)438 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
439 {
440 	Log(LogNotice, "LivestatusQuery")
441 		<< "Table: " << m_Table;
442 
443 	Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
444 
445 	if (!table) {
446 		SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
447 
448 		return;
449 	}
450 
451 	std::vector<LivestatusRowValue> objects = table->FilterRows(m_Filter, m_Limit);
452 	std::vector<String> columns;
453 
454 	if (m_Columns.size() > 0)
455 		columns = m_Columns;
456 	else
457 		columns = table->GetColumnNames();
458 
459 	std::ostringstream result;
460 	bool first_row = true;
461 	BeginResultSet(result);
462 
463 	if (m_Aggregators.empty()) {
464 		typedef std::pair<String, Column> ColumnPair;
465 
466 		std::vector<ColumnPair> column_objs;
467 		column_objs.reserve(columns.size());
468 
469 		for (const String& columnName : columns)
470 			column_objs.emplace_back(columnName, table->GetColumn(columnName));
471 
472 		ArrayData header;
473 
474 		for (const LivestatusRowValue& object : objects) {
475 			ArrayData row;
476 
477 			row.reserve(column_objs.size());
478 
479 			for (const ColumnPair& cv : column_objs) {
480 				if (m_ColumnHeaders)
481 					header.push_back(cv.first);
482 
483 				row.push_back(cv.second.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
484 			}
485 
486 			if (m_ColumnHeaders) {
487 				AppendResultRow(result, new Array(std::move(header)), first_row);
488 				m_ColumnHeaders = false;
489 			}
490 
491 			AppendResultRow(result, new Array(std::move(row)), first_row);
492 		}
493 	} else {
494 		std::map<std::vector<Value>, std::vector<AggregatorState *> > allStats;
495 
496 		/* add aggregated stats */
497 		for (const LivestatusRowValue& object : objects) {
498 			std::vector<Value> statsKey;
499 
500 			for (const String& columnName : m_Columns) {
501 				Column column = table->GetColumn(columnName);
502 				statsKey.emplace_back(column.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
503 			}
504 
505 			auto it = allStats.find(statsKey);
506 
507 			if (it == allStats.end()) {
508 				std::vector<AggregatorState *> newStats(m_Aggregators.size(), nullptr);
509 				it = allStats.insert(std::make_pair(statsKey, newStats)).first;
510 			}
511 
512 			auto& stats = it->second;
513 
514 			int index = 0;
515 
516 			for (const Aggregator::Ptr& aggregator : m_Aggregators) {
517 				aggregator->Apply(table, object.Row, &stats[index]);
518 				index++;
519 			}
520 		}
521 
522 		/* add column headers both for raw and aggregated data */
523 		if (m_ColumnHeaders) {
524 			ArrayData header;
525 
526 			for (const String& columnName : m_Columns) {
527 				header.push_back(columnName);
528 			}
529 
530 			for (size_t i = 1; i <= m_Aggregators.size(); i++) {
531 				header.push_back("stats_" + Convert::ToString(i));
532 			}
533 
534 			AppendResultRow(result, new Array(std::move(header)), first_row);
535 		}
536 
537 		for (const auto& kv : allStats) {
538 			ArrayData row;
539 
540 			row.reserve(m_Columns.size() + m_Aggregators.size());
541 
542 			for (const Value& keyPart : kv.first) {
543 				row.push_back(keyPart);
544 			}
545 
546 			auto& stats = kv.second;
547 
548 			for (size_t i = 0; i < m_Aggregators.size(); i++)
549 				row.push_back(m_Aggregators[i]->GetResultAndFreeState(stats[i]));
550 
551 			AppendResultRow(result, new Array(std::move(row)), first_row);
552 		}
553 
554 		/* add a bogus zero value if aggregated is empty*/
555 		if (allStats.empty()) {
556 			ArrayData row;
557 
558 			row.reserve(m_Aggregators.size());
559 
560 			for (size_t i = 1; i <= m_Aggregators.size(); i++) {
561 				row.push_back(0);
562 			}
563 
564 			AppendResultRow(result, new Array(std::move(row)), first_row);
565 		}
566 	}
567 
568 	EndResultSet(result);
569 
570 	SendResponse(stream, LivestatusErrorOK, result.str());
571 }
572 
ExecuteCommandHelper(const Stream::Ptr & stream)573 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
574 {
575 	{
576 		std::unique_lock<std::mutex> lock(l_QueryMutex);
577 
578 		l_ExternalCommands++;
579 	}
580 
581 	Log(LogNotice, "LivestatusQuery")
582 		<< "Executing command: " << m_Command;
583 	ExternalCommandProcessor::Execute(m_Command);
584 	SendResponse(stream, LivestatusErrorOK, "");
585 }
586 
ExecuteErrorHelper(const Stream::Ptr & stream)587 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
588 {
589 	Log(LogDebug, "LivestatusQuery")
590 		<< "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
591 	SendResponse(stream, m_ErrorCode, m_ErrorMessage);
592 }
593 
SendResponse(const Stream::Ptr & stream,int code,const String & data)594 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
595 {
596 	if (m_ResponseHeader == "fixed16")
597 		PrintFixed16(stream, code, data);
598 
599 	if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
600 		try {
601 			stream->Write(data.CStr(), data.GetLength());
602 		} catch (const std::exception&) {
603 			Log(LogCritical, "LivestatusQuery", "Cannot write query response to socket.");
604 		}
605 	}
606 }
607 
PrintFixed16(const Stream::Ptr & stream,int code,const String & data)608 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
609 {
610 	ASSERT(code >= 100 && code <= 999);
611 
612 	String sCode = Convert::ToString(code);
613 	String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
614 
615 	String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
616 
617 	try {
618 		stream->Write(header.CStr(), header.GetLength());
619 	} catch (const std::exception&) {
620 		Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
621 	}
622 }
623 
Execute(const Stream::Ptr & stream)624 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
625 {
626 	try {
627 		Log(LogNotice, "LivestatusQuery")
628 			<< "Executing livestatus query: " << m_Verb;
629 
630 		if (m_Verb == "GET")
631 			ExecuteGetHelper(stream);
632 		else if (m_Verb == "COMMAND")
633 			ExecuteCommandHelper(stream);
634 		else if (m_Verb == "ERROR")
635 			ExecuteErrorHelper(stream);
636 		else
637 			BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
638 	} catch (const std::exception& ex) {
639 		SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));
640 	}
641 
642 	if (!m_KeepAlive) {
643 		stream->Close();
644 		return false;
645 	}
646 
647 	return true;
648 }
649