1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #include "perfdata/gelfwriter.hpp"
4 #include "perfdata/gelfwriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/notification.hpp"
7 #include "icinga/checkcommand.hpp"
8 #include "icinga/macroprocessor.hpp"
9 #include "icinga/compatutility.hpp"
10 #include "base/tcpsocket.hpp"
11 #include "base/configtype.hpp"
12 #include "base/objectlock.hpp"
13 #include "base/logger.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/context.hpp"
20 #include "base/exception.hpp"
21 #include "base/json.hpp"
22 #include "base/statsfunction.hpp"
23 #include <boost/algorithm/string/replace.hpp>
24 #include <utility>
25 #include "base/io-engine.hpp"
26 #include <boost/asio/write.hpp>
27 #include <boost/asio/buffer.hpp>
28 #include <boost/system/error_code.hpp>
29 #include <boost/asio/error.hpp>
30 
31 using namespace icinga;
32 
/* Registers the GelfWriter type. */
REGISTER_TYPE(GelfWriter);

/* Registers GelfWriter::StatsFunc() as the stats function for GelfWriter objects. */
REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
36 
OnConfigLoaded()37 void GelfWriter::OnConfigLoaded()
38 {
39 	ObjectImpl<GelfWriter>::OnConfigLoaded();
40 
41 	m_WorkQueue.SetName("GelfWriter, " + GetName());
42 
43 	if (!GetEnableHa()) {
44 		Log(LogDebug, "GelfWriter")
45 			<< "HA functionality disabled. Won't pause connection: " << GetName();
46 
47 		SetHAMode(HARunEverywhere);
48 	} else {
49 		SetHAMode(HARunOnce);
50 	}
51 }
52 
StatsFunc(const Dictionary::Ptr & status,const Array::Ptr & perfdata)53 void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
54 {
55 	DictionaryData nodes;
56 
57 	for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
58 		size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
59 		double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
60 
61 		nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
62 			{ "work_queue_items", workQueueItems },
63 			{ "work_queue_item_rate", workQueueItemRate },
64 			{ "connected", gelfwriter->GetConnected() },
65 			{ "source", gelfwriter->GetSource() }
66 		}));
67 
68 		perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
69 		perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
70 	}
71 
72 	status->Set("gelfwriter", new Dictionary(std::move(nodes)));
73 }
74 
Resume()75 void GelfWriter::Resume()
76 {
77 	ObjectImpl<GelfWriter>::Resume();
78 
79 	Log(LogInformation, "GelfWriter")
80 		<< "'" << GetName() << "' resumed.";
81 
82 	/* Register exception handler for WQ tasks. */
83 	m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
84 
85 	/* Timer for reconnecting */
86 	m_ReconnectTimer = new Timer();
87 	m_ReconnectTimer->SetInterval(10);
88 	m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
89 	m_ReconnectTimer->Start();
90 	m_ReconnectTimer->Reschedule(0);
91 
92 	/* Register event handlers. */
93 	Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
94 		CheckResultHandler(checkable, cr);
95 	});
96 	Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification, const Checkable::Ptr& checkable,
97 		const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr, const String& author,
98 		const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
99 		NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
100 	});
101 	Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type,
102 		const MessageOrigin::Ptr&) {
103 		StateChangeHandler(checkable, cr, type);
104 	});
105 }
106 
107 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
Pause()108 void GelfWriter::Pause()
109 {
110 	m_ReconnectTimer.reset();
111 
112 	try {
113 		ReconnectInternal();
114 	} catch (const std::exception&) {
115 		Log(LogInformation, "GelfWriter")
116 			<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
117 
118 		ObjectImpl<GelfWriter>::Pause();
119 		return;
120 	}
121 
122 	m_WorkQueue.Join();
123 	DisconnectInternal();
124 
125 	Log(LogInformation, "GelfWriter")
126 		<< "'" << GetName() << "' paused.";
127 
128 	ObjectImpl<GelfWriter>::Pause();
129 }
130 
/**
 * Asserts that the caller is running on this writer's work queue thread.
 * NOTE(review): whether this is checked in release builds depends on how ASSERT is defined.
 */
void GelfWriter::AssertOnWorkQueue()
{
	ASSERT(m_WorkQueue.IsWorkerThread());
}
135 
ExceptionHandler(boost::exception_ptr exp)136 void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
137 {
138 	Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);
139 	Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);
140 
141 	DisconnectInternal();
142 }
143 
Reconnect()144 void GelfWriter::Reconnect()
145 {
146 	AssertOnWorkQueue();
147 
148 	if (IsPaused()) {
149 		SetConnected(false);
150 		return;
151 	}
152 
153 	ReconnectInternal();
154 }
155 
ReconnectInternal()156 void GelfWriter::ReconnectInternal()
157 {
158 	double startTime = Utility::GetTime();
159 
160 	CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
161 
162 	SetShouldConnect(true);
163 
164 	if (GetConnected())
165 		return;
166 
167 	Log(LogNotice, "GelfWriter")
168 		<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
169 
170 	bool ssl = GetEnableTls();
171 
172 	if (ssl) {
173 		Shared<boost::asio::ssl::context>::Ptr sslContext;
174 
175 		try {
176 			sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
177 		} catch (const std::exception& ex) {
178 			Log(LogWarning, "GelfWriter")
179 				<< "Unable to create SSL context.";
180 			throw;
181 		}
182 
183 		m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
184 
185 	} else {
186 		m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
187 	}
188 
189 	try {
190 		icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
191 	} catch (const std::exception& ex) {
192 		Log(LogWarning, "GelfWriter")
193 			<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
194 		throw;
195 	}
196 
197 	if (ssl) {
198 		auto& tlsStream (m_Stream.first->next_layer());
199 
200 		try {
201 			tlsStream.handshake(tlsStream.client);
202 		} catch (const std::exception& ex) {
203 			Log(LogWarning, "GelfWriter")
204 				<< "TLS handshake with host '" << GetHost() << " failed.'";
205 			throw;
206 		}
207 
208 		if (!GetInsecureNoverify()) {
209 			if (!tlsStream.GetPeerCertificate()) {
210 				BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
211 			}
212 
213 			if (!tlsStream.IsVerifyOK()) {
214 				BOOST_THROW_EXCEPTION(std::runtime_error(
215 					"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
216 				));
217 			}
218 		}
219 	}
220 
221 	SetConnected(true);
222 
223 	Log(LogInformation, "GelfWriter")
224 		<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
225 }
226 
ReconnectTimerHandler()227 void GelfWriter::ReconnectTimerHandler()
228 {
229 	m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal);
230 }
231 
/**
 * Closes the connection. Must be called from the work queue;
 * DisconnectInternal() is the variant without that check.
 */
void GelfWriter::Disconnect()
{
	AssertOnWorkQueue();

	DisconnectInternal();
}
238 
DisconnectInternal()239 void GelfWriter::DisconnectInternal()
240 {
241 	if (!GetConnected())
242 		return;
243 
244 	if (m_Stream.first) {
245 		boost::system::error_code ec;
246 		m_Stream.first->next_layer().shutdown(ec);
247 
248 		// https://stackoverflow.com/a/25703699
249 		// As long as the error code's category is not an SSL category, then the protocol was securely shutdown
250 		if (ec.category() == boost::asio::error::get_ssl_category()) {
251 			Log(LogCritical, "GelfWriter")
252 				<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
253 		}
254 	} else if (m_Stream.second) {
255 		m_Stream.second->close();
256 	}
257 
258 	SetConnected(false);
259 
260 }
261 
CheckResultHandler(const Checkable::Ptr & checkable,const CheckResult::Ptr & cr)262 void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
263 {
264 	if (IsPaused())
265 		return;
266 
267 	m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
268 }
269 
CheckResultHandlerInternal(const Checkable::Ptr & checkable,const CheckResult::Ptr & cr)270 void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
271 {
272 	AssertOnWorkQueue();
273 
274 	CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
275 
276 	Log(LogDebug, "GelfWriter")
277 		<< "Processing check result for '" << checkable->GetName() << "'";
278 
279 	Host::Ptr host;
280 	Service::Ptr service;
281 	tie(host, service) = GetHostService(checkable);
282 
283 	Dictionary::Ptr fields = new Dictionary();
284 
285 	if (service) {
286 		fields->Set("_service_name", service->GetShortName());
287 		fields->Set("_service_state", Service::StateToString(service->GetState()));
288 		fields->Set("_last_state", service->GetLastState());
289 		fields->Set("_last_hard_state", service->GetLastHardState());
290 	} else {
291 		fields->Set("_last_state", host->GetLastState());
292 		fields->Set("_last_hard_state", host->GetLastHardState());
293 	}
294 
295 	fields->Set("_hostname", host->GetName());
296 	fields->Set("_type", "CHECK RESULT");
297 	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
298 
299 	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
300 	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
301 
302 	fields->Set("_reachable", checkable->IsReachable());
303 
304 	CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
305 
306 	if (checkCommand)
307 		fields->Set("_check_command", checkCommand->GetName());
308 
309 	double ts = Utility::GetTime();
310 
311 	if (cr) {
312 		fields->Set("_latency", cr->CalculateLatency());
313 		fields->Set("_execution_time", cr->CalculateExecutionTime());
314 		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
315 		fields->Set("full_message", cr->GetOutput());
316 		fields->Set("_check_source", cr->GetCheckSource());
317 		ts = cr->GetExecutionEnd();
318 	}
319 
320 	if (cr && GetEnableSendPerfdata()) {
321 		Array::Ptr perfdata = cr->GetPerformanceData();
322 
323 		if (perfdata) {
324 			ObjectLock olock(perfdata);
325 			for (const Value& val : perfdata) {
326 				PerfdataValue::Ptr pdv;
327 
328 				if (val.IsObjectType<PerfdataValue>())
329 					pdv = val;
330 				else {
331 					try {
332 						pdv = PerfdataValue::Parse(val);
333 					} catch (const std::exception&) {
334 						Log(LogWarning, "GelfWriter")
335 							<< "Ignoring invalid perfdata for checkable '"
336 							<< checkable->GetName() << "' and command '"
337 							<< checkCommand->GetName() << "' with value: " << val;
338 						continue;
339 					}
340 				}
341 
342 				String escaped_key = pdv->GetLabel();
343 				boost::replace_all(escaped_key, " ", "_");
344 				boost::replace_all(escaped_key, ".", "_");
345 				boost::replace_all(escaped_key, "\\", "_");
346 				boost::algorithm::replace_all(escaped_key, "::", ".");
347 
348 				fields->Set("_" + escaped_key, pdv->GetValue());
349 
350 				if (!pdv->GetMin().IsEmpty())
351 					fields->Set("_" + escaped_key + "_min", pdv->GetMin());
352 				if (!pdv->GetMax().IsEmpty())
353 					fields->Set("_" + escaped_key + "_max", pdv->GetMax());
354 				if (!pdv->GetWarn().IsEmpty())
355 					fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
356 				if (!pdv->GetCrit().IsEmpty())
357 					fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
358 
359 				if (!pdv->GetUnit().IsEmpty())
360 					fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
361 			}
362 		}
363 	}
364 
365 	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
366 }
367 
NotificationToUserHandler(const Notification::Ptr & notification,const Checkable::Ptr & checkable,const User::Ptr & user,NotificationType notificationType,CheckResult::Ptr const & cr,const String & author,const String & commentText,const String & commandName)368 void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
369 	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
370 	const String& author, const String& commentText, const String& commandName)
371 {
372 	if (IsPaused())
373 		return;
374 
375 	m_WorkQueue.Enqueue([this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
376 		NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr, author, commentText, commandName);
377 	});
378 }
379 
NotificationToUserHandlerInternal(const Notification::Ptr & notification,const Checkable::Ptr & checkable,const User::Ptr & user,NotificationType notificationType,CheckResult::Ptr const & cr,const String & author,const String & commentText,const String & commandName)380 void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
381 	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
382 	const String& author, const String& commentText, const String& commandName)
383 {
384 	AssertOnWorkQueue();
385 
386 	CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
387 
388 	Log(LogDebug, "GelfWriter")
389 		<< "Processing notification for '" << checkable->GetName() << "'";
390 
391 	Host::Ptr host;
392 	Service::Ptr service;
393 	tie(host, service) = GetHostService(checkable);
394 
395 	String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType); //TODO: Change that to our own types.
396 
397 	String authorComment = "";
398 
399 	if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
400 		authorComment = author + ";" + commentText;
401 	}
402 
403 	String output;
404 	double ts = Utility::GetTime();
405 
406 	if (cr) {
407 		output = CompatUtility::GetCheckResultOutput(cr);
408 		ts = cr->GetExecutionEnd();
409 	}
410 
411 	Dictionary::Ptr fields = new Dictionary();
412 
413 	if (service) {
414 		fields->Set("_type", "SERVICE NOTIFICATION");
415 		//TODO: fix this to _service_name
416 		fields->Set("_service", service->GetShortName());
417 		fields->Set("short_message", output);
418 	} else {
419 		fields->Set("_type", "HOST NOTIFICATION");
420 		fields->Set("short_message", output);
421 	}
422 
423 	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
424 
425 	fields->Set("_hostname", host->GetName());
426 	fields->Set("_command", commandName);
427 	fields->Set("_notification_type", notificationTypeString);
428 	fields->Set("_comment", authorComment);
429 
430 	CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
431 
432 	if (commandObj)
433 		fields->Set("_check_command", commandObj->GetName());
434 
435 	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
436 }
437 
StateChangeHandler(const Checkable::Ptr & checkable,const CheckResult::Ptr & cr,StateType type)438 void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
439 {
440 	if (IsPaused())
441 		return;
442 
443 	m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
444 }
445 
StateChangeHandlerInternal(const Checkable::Ptr & checkable,const CheckResult::Ptr & cr,StateType type)446 void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
447 {
448 	AssertOnWorkQueue();
449 
450 	CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
451 
452 	Log(LogDebug, "GelfWriter")
453 		<< "Processing state change for '" << checkable->GetName() << "'";
454 
455 	Host::Ptr host;
456 	Service::Ptr service;
457 	tie(host, service) = GetHostService(checkable);
458 
459 	Dictionary::Ptr fields = new Dictionary();
460 
461 	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
462 	fields->Set("_type", "STATE CHANGE");
463 	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
464 	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
465 	fields->Set("_hostname", host->GetName());
466 
467 	if (service) {
468 		fields->Set("_service_name", service->GetShortName());
469 		fields->Set("_service_state", Service::StateToString(service->GetState()));
470 		fields->Set("_last_state", service->GetLastState());
471 		fields->Set("_last_hard_state", service->GetLastHardState());
472 	} else {
473 		fields->Set("_last_state", host->GetLastState());
474 		fields->Set("_last_hard_state", host->GetLastHardState());
475 	}
476 
477 	CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
478 
479 	if (commandObj)
480 		fields->Set("_check_command", commandObj->GetName());
481 
482 	double ts = Utility::GetTime();
483 
484 	if (cr) {
485 		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
486 		fields->Set("full_message", cr->GetOutput());
487 		fields->Set("_check_source", cr->GetCheckSource());
488 		ts = cr->GetExecutionEnd();
489 	}
490 
491 	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
492 }
493 
ComposeGelfMessage(const Dictionary::Ptr & fields,const String & source,double ts)494 String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
495 {
496 	fields->Set("version", "1.1");
497 	fields->Set("host", source);
498 	fields->Set("timestamp", ts);
499 
500 	return JsonEncode(fields);
501 }
502 
SendLogMessage(const Checkable::Ptr & checkable,const String & gelfMessage)503 void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
504 {
505 	std::ostringstream msgbuf;
506 	msgbuf << gelfMessage;
507 	msgbuf << '\0';
508 
509 	String log = msgbuf.str();
510 
511 	ObjectLock olock(this);
512 
513 	if (!GetConnected())
514 		return;
515 
516 	try {
517 		Log(LogDebug, "GelfWriter")
518 			<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
519 
520 		if (m_Stream.first) {
521 			boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
522 			m_Stream.first->flush();
523 		} else {
524 			boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
525 			m_Stream.second->flush();
526 		}
527 	} catch (const std::exception& ex) {
528 		Log(LogCritical, "GelfWriter")
529 			<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
530 
531 		throw ex;
532 	}
533 }
534