1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/gelfwriter.hpp"
4 #include "perfdata/gelfwriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/notification.hpp"
7 #include "icinga/checkcommand.hpp"
8 #include "icinga/macroprocessor.hpp"
9 #include "icinga/compatutility.hpp"
10 #include "base/tcpsocket.hpp"
11 #include "base/configtype.hpp"
12 #include "base/objectlock.hpp"
13 #include "base/logger.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/context.hpp"
20 #include "base/exception.hpp"
21 #include "base/json.hpp"
22 #include "base/statsfunction.hpp"
23 #include <boost/algorithm/string/replace.hpp>
24 #include <utility>
25 #include "base/io-engine.hpp"
26 #include <boost/asio/write.hpp>
27 #include <boost/asio/buffer.hpp>
28 #include <boost/system/error_code.hpp>
29 #include <boost/asio/error.hpp>
30
31 using namespace icinga;
32
/* Registers the GelfWriter type (REGISTER_TYPE macro from the base library). */
REGISTER_TYPE(GelfWriter);

/* Registers GelfWriter::StatsFunc() as the statistics callback for this type. */
REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
36
OnConfigLoaded()37 void GelfWriter::OnConfigLoaded()
38 {
39 ObjectImpl<GelfWriter>::OnConfigLoaded();
40
41 m_WorkQueue.SetName("GelfWriter, " + GetName());
42
43 if (!GetEnableHa()) {
44 Log(LogDebug, "GelfWriter")
45 << "HA functionality disabled. Won't pause connection: " << GetName();
46
47 SetHAMode(HARunEverywhere);
48 } else {
49 SetHAMode(HARunOnce);
50 }
51 }
52
StatsFunc(const Dictionary::Ptr & status,const Array::Ptr & perfdata)53 void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
54 {
55 DictionaryData nodes;
56
57 for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
58 size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
59 double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
60
61 nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
62 { "work_queue_items", workQueueItems },
63 { "work_queue_item_rate", workQueueItemRate },
64 { "connected", gelfwriter->GetConnected() },
65 { "source", gelfwriter->GetSource() }
66 }));
67
68 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
69 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
70 }
71
72 status->Set("gelfwriter", new Dictionary(std::move(nodes)));
73 }
74
Resume()75 void GelfWriter::Resume()
76 {
77 ObjectImpl<GelfWriter>::Resume();
78
79 Log(LogInformation, "GelfWriter")
80 << "'" << GetName() << "' resumed.";
81
82 /* Register exception handler for WQ tasks. */
83 m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
84
85 /* Timer for reconnecting */
86 m_ReconnectTimer = new Timer();
87 m_ReconnectTimer->SetInterval(10);
88 m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
89 m_ReconnectTimer->Start();
90 m_ReconnectTimer->Reschedule(0);
91
92 /* Register event handlers. */
93 Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
94 CheckResultHandler(checkable, cr);
95 });
96 Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification, const Checkable::Ptr& checkable,
97 const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr, const String& author,
98 const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
99 NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
100 });
101 Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type,
102 const MessageOrigin::Ptr&) {
103 StateChangeHandler(checkable, cr, type);
104 });
105 }
106
107 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
Pause()108 void GelfWriter::Pause()
109 {
110 m_ReconnectTimer.reset();
111
112 try {
113 ReconnectInternal();
114 } catch (const std::exception&) {
115 Log(LogInformation, "GelfWriter")
116 << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
117
118 ObjectImpl<GelfWriter>::Pause();
119 return;
120 }
121
122 m_WorkQueue.Join();
123 DisconnectInternal();
124
125 Log(LogInformation, "GelfWriter")
126 << "'" << GetName() << "' paused.";
127
128 ObjectImpl<GelfWriter>::Pause();
129 }
130
/**
 * Asserts that the caller runs on this writer's work queue thread.
 *
 * Used by the *Internal handlers, Reconnect() and Disconnect() so that
 * connection state is only touched from the work queue.
 * NOTE(review): whether the check is active in release builds depends on the
 * project's ASSERT macro definition. Confirm before relying on it.
 */
void GelfWriter::AssertOnWorkQueue()
{
	ASSERT(m_WorkQueue.IsWorkerThread());
}
135
ExceptionHandler(boost::exception_ptr exp)136 void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
137 {
138 Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);
139 Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);
140
141 DisconnectInternal();
142 }
143
Reconnect()144 void GelfWriter::Reconnect()
145 {
146 AssertOnWorkQueue();
147
148 if (IsPaused()) {
149 SetConnected(false);
150 return;
151 }
152
153 ReconnectInternal();
154 }
155
ReconnectInternal()156 void GelfWriter::ReconnectInternal()
157 {
158 double startTime = Utility::GetTime();
159
160 CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
161
162 SetShouldConnect(true);
163
164 if (GetConnected())
165 return;
166
167 Log(LogNotice, "GelfWriter")
168 << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
169
170 bool ssl = GetEnableTls();
171
172 if (ssl) {
173 Shared<boost::asio::ssl::context>::Ptr sslContext;
174
175 try {
176 sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
177 } catch (const std::exception& ex) {
178 Log(LogWarning, "GelfWriter")
179 << "Unable to create SSL context.";
180 throw;
181 }
182
183 m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
184
185 } else {
186 m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
187 }
188
189 try {
190 icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
191 } catch (const std::exception& ex) {
192 Log(LogWarning, "GelfWriter")
193 << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
194 throw;
195 }
196
197 if (ssl) {
198 auto& tlsStream (m_Stream.first->next_layer());
199
200 try {
201 tlsStream.handshake(tlsStream.client);
202 } catch (const std::exception& ex) {
203 Log(LogWarning, "GelfWriter")
204 << "TLS handshake with host '" << GetHost() << " failed.'";
205 throw;
206 }
207
208 if (!GetInsecureNoverify()) {
209 if (!tlsStream.GetPeerCertificate()) {
210 BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
211 }
212
213 if (!tlsStream.IsVerifyOK()) {
214 BOOST_THROW_EXCEPTION(std::runtime_error(
215 "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
216 ));
217 }
218 }
219 }
220
221 SetConnected(true);
222
223 Log(LogInformation, "GelfWriter")
224 << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
225 }
226
ReconnectTimerHandler()227 void GelfWriter::ReconnectTimerHandler()
228 {
229 m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal);
230 }
231
Disconnect()232 void GelfWriter::Disconnect()
233 {
234 AssertOnWorkQueue();
235
236 DisconnectInternal();
237 }
238
DisconnectInternal()239 void GelfWriter::DisconnectInternal()
240 {
241 if (!GetConnected())
242 return;
243
244 if (m_Stream.first) {
245 boost::system::error_code ec;
246 m_Stream.first->next_layer().shutdown(ec);
247
248 // https://stackoverflow.com/a/25703699
249 // As long as the error code's category is not an SSL category, then the protocol was securely shutdown
250 if (ec.category() == boost::asio::error::get_ssl_category()) {
251 Log(LogCritical, "GelfWriter")
252 << "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
253 }
254 } else if (m_Stream.second) {
255 m_Stream.second->close();
256 }
257
258 SetConnected(false);
259
260 }
261
CheckResultHandler(const Checkable::Ptr & checkable,const CheckResult::Ptr & cr)262 void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
263 {
264 if (IsPaused())
265 return;
266
267 m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
268 }
269
CheckResultHandlerInternal(const Checkable::Ptr & checkable,const CheckResult::Ptr & cr)270 void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
271 {
272 AssertOnWorkQueue();
273
274 CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
275
276 Log(LogDebug, "GelfWriter")
277 << "Processing check result for '" << checkable->GetName() << "'";
278
279 Host::Ptr host;
280 Service::Ptr service;
281 tie(host, service) = GetHostService(checkable);
282
283 Dictionary::Ptr fields = new Dictionary();
284
285 if (service) {
286 fields->Set("_service_name", service->GetShortName());
287 fields->Set("_service_state", Service::StateToString(service->GetState()));
288 fields->Set("_last_state", service->GetLastState());
289 fields->Set("_last_hard_state", service->GetLastHardState());
290 } else {
291 fields->Set("_last_state", host->GetLastState());
292 fields->Set("_last_hard_state", host->GetLastHardState());
293 }
294
295 fields->Set("_hostname", host->GetName());
296 fields->Set("_type", "CHECK RESULT");
297 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
298
299 fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
300 fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
301
302 fields->Set("_reachable", checkable->IsReachable());
303
304 CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
305
306 if (checkCommand)
307 fields->Set("_check_command", checkCommand->GetName());
308
309 double ts = Utility::GetTime();
310
311 if (cr) {
312 fields->Set("_latency", cr->CalculateLatency());
313 fields->Set("_execution_time", cr->CalculateExecutionTime());
314 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
315 fields->Set("full_message", cr->GetOutput());
316 fields->Set("_check_source", cr->GetCheckSource());
317 ts = cr->GetExecutionEnd();
318 }
319
320 if (cr && GetEnableSendPerfdata()) {
321 Array::Ptr perfdata = cr->GetPerformanceData();
322
323 if (perfdata) {
324 ObjectLock olock(perfdata);
325 for (const Value& val : perfdata) {
326 PerfdataValue::Ptr pdv;
327
328 if (val.IsObjectType<PerfdataValue>())
329 pdv = val;
330 else {
331 try {
332 pdv = PerfdataValue::Parse(val);
333 } catch (const std::exception&) {
334 Log(LogWarning, "GelfWriter")
335 << "Ignoring invalid perfdata for checkable '"
336 << checkable->GetName() << "' and command '"
337 << checkCommand->GetName() << "' with value: " << val;
338 continue;
339 }
340 }
341
342 String escaped_key = pdv->GetLabel();
343 boost::replace_all(escaped_key, " ", "_");
344 boost::replace_all(escaped_key, ".", "_");
345 boost::replace_all(escaped_key, "\\", "_");
346 boost::algorithm::replace_all(escaped_key, "::", ".");
347
348 fields->Set("_" + escaped_key, pdv->GetValue());
349
350 if (!pdv->GetMin().IsEmpty())
351 fields->Set("_" + escaped_key + "_min", pdv->GetMin());
352 if (!pdv->GetMax().IsEmpty())
353 fields->Set("_" + escaped_key + "_max", pdv->GetMax());
354 if (!pdv->GetWarn().IsEmpty())
355 fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
356 if (!pdv->GetCrit().IsEmpty())
357 fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
358
359 if (!pdv->GetUnit().IsEmpty())
360 fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
361 }
362 }
363 }
364
365 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
366 }
367
NotificationToUserHandler(const Notification::Ptr & notification,const Checkable::Ptr & checkable,const User::Ptr & user,NotificationType notificationType,CheckResult::Ptr const & cr,const String & author,const String & commentText,const String & commandName)368 void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
369 const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
370 const String& author, const String& commentText, const String& commandName)
371 {
372 if (IsPaused())
373 return;
374
375 m_WorkQueue.Enqueue([this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
376 NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr, author, commentText, commandName);
377 });
378 }
379
NotificationToUserHandlerInternal(const Notification::Ptr & notification,const Checkable::Ptr & checkable,const User::Ptr & user,NotificationType notificationType,CheckResult::Ptr const & cr,const String & author,const String & commentText,const String & commandName)380 void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
381 const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
382 const String& author, const String& commentText, const String& commandName)
383 {
384 AssertOnWorkQueue();
385
386 CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
387
388 Log(LogDebug, "GelfWriter")
389 << "Processing notification for '" << checkable->GetName() << "'";
390
391 Host::Ptr host;
392 Service::Ptr service;
393 tie(host, service) = GetHostService(checkable);
394
395 String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType); //TODO: Change that to our own types.
396
397 String authorComment = "";
398
399 if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
400 authorComment = author + ";" + commentText;
401 }
402
403 String output;
404 double ts = Utility::GetTime();
405
406 if (cr) {
407 output = CompatUtility::GetCheckResultOutput(cr);
408 ts = cr->GetExecutionEnd();
409 }
410
411 Dictionary::Ptr fields = new Dictionary();
412
413 if (service) {
414 fields->Set("_type", "SERVICE NOTIFICATION");
415 //TODO: fix this to _service_name
416 fields->Set("_service", service->GetShortName());
417 fields->Set("short_message", output);
418 } else {
419 fields->Set("_type", "HOST NOTIFICATION");
420 fields->Set("short_message", output);
421 }
422
423 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
424
425 fields->Set("_hostname", host->GetName());
426 fields->Set("_command", commandName);
427 fields->Set("_notification_type", notificationTypeString);
428 fields->Set("_comment", authorComment);
429
430 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
431
432 if (commandObj)
433 fields->Set("_check_command", commandObj->GetName());
434
435 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
436 }
437
StateChangeHandler(const Checkable::Ptr & checkable,const CheckResult::Ptr & cr,StateType type)438 void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
439 {
440 if (IsPaused())
441 return;
442
443 m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
444 }
445
StateChangeHandlerInternal(const Checkable::Ptr & checkable,const CheckResult::Ptr & cr,StateType type)446 void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
447 {
448 AssertOnWorkQueue();
449
450 CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
451
452 Log(LogDebug, "GelfWriter")
453 << "Processing state change for '" << checkable->GetName() << "'";
454
455 Host::Ptr host;
456 Service::Ptr service;
457 tie(host, service) = GetHostService(checkable);
458
459 Dictionary::Ptr fields = new Dictionary();
460
461 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
462 fields->Set("_type", "STATE CHANGE");
463 fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
464 fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
465 fields->Set("_hostname", host->GetName());
466
467 if (service) {
468 fields->Set("_service_name", service->GetShortName());
469 fields->Set("_service_state", Service::StateToString(service->GetState()));
470 fields->Set("_last_state", service->GetLastState());
471 fields->Set("_last_hard_state", service->GetLastHardState());
472 } else {
473 fields->Set("_last_state", host->GetLastState());
474 fields->Set("_last_hard_state", host->GetLastHardState());
475 }
476
477 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
478
479 if (commandObj)
480 fields->Set("_check_command", commandObj->GetName());
481
482 double ts = Utility::GetTime();
483
484 if (cr) {
485 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
486 fields->Set("full_message", cr->GetOutput());
487 fields->Set("_check_source", cr->GetCheckSource());
488 ts = cr->GetExecutionEnd();
489 }
490
491 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
492 }
493
ComposeGelfMessage(const Dictionary::Ptr & fields,const String & source,double ts)494 String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
495 {
496 fields->Set("version", "1.1");
497 fields->Set("host", source);
498 fields->Set("timestamp", ts);
499
500 return JsonEncode(fields);
501 }
502
SendLogMessage(const Checkable::Ptr & checkable,const String & gelfMessage)503 void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
504 {
505 std::ostringstream msgbuf;
506 msgbuf << gelfMessage;
507 msgbuf << '\0';
508
509 String log = msgbuf.str();
510
511 ObjectLock olock(this);
512
513 if (!GetConnected())
514 return;
515
516 try {
517 Log(LogDebug, "GelfWriter")
518 << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
519
520 if (m_Stream.first) {
521 boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
522 m_Stream.first->flush();
523 } else {
524 boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
525 m_Stream.second->flush();
526 }
527 } catch (const std::exception& ex) {
528 Log(LogCritical, "GelfWriter")
529 << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
530
531 throw ex;
532 }
533 }
534