1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "db_ido/dbconnection.hpp"
4 #include "db_ido/dbconnection-ti.cpp"
5 #include "db_ido/dbvalue.hpp"
6 #include "icinga/icingaapplication.hpp"
7 #include "icinga/host.hpp"
8 #include "icinga/service.hpp"
9 #include "base/configtype.hpp"
10 #include "base/convert.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/utility.hpp"
13 #include "base/logger.hpp"
14 #include "base/exception.hpp"
15
16 using namespace icinga;
17
/* Type registration macro invocation for DbConnection. */
REGISTER_TYPE(DbConnection);

/* Static timer member; it is created and started in InitializeDbTimer(). */
Timer::Ptr DbConnection::m_ProgramStatusTimer;
/* Flag passed to boost::call_once() in OnConfigLoaded() when invoking InitializeDbTimer(). */
boost::once_flag DbConnection::m_OnceFlag = BOOST_ONCE_INIT;
22
OnConfigLoaded()23 void DbConnection::OnConfigLoaded()
24 {
25 ConfigObject::OnConfigLoaded();
26
27 Value categories = GetCategories();
28
29 SetCategoryFilter(FilterArrayToInt(categories, DbQuery::GetCategoryFilterMap(), DbCatEverything));
30
31 if (!GetEnableHa()) {
32 Log(LogDebug, "DbConnection")
33 << "HA functionality disabled. Won't pause IDO connection: " << GetName();
34
35 SetHAMode(HARunEverywhere);
36 }
37
38 boost::call_once(m_OnceFlag, InitializeDbTimer);
39 }
40
Start(bool runtimeCreated)41 void DbConnection::Start(bool runtimeCreated)
42 {
43 ObjectImpl<DbConnection>::Start(runtimeCreated);
44
45 Log(LogInformation, "DbConnection")
46 << "'" << GetName() << "' started.";
47
48 DbObject::OnQuery.connect([this](const DbQuery& query) { ExecuteQuery(query); });
49 DbObject::OnMultipleQueries.connect([this](const std::vector<DbQuery>& multiQueries) { ExecuteMultipleQueries(multiQueries); });
50 }
51
Stop(bool runtimeRemoved)52 void DbConnection::Stop(bool runtimeRemoved)
53 {
54 Log(LogInformation, "DbConnection")
55 << "'" << GetName() << "' stopped.";
56
57 ObjectImpl<DbConnection>::Stop(runtimeRemoved);
58 }
59
EnableActiveChangedHandler()60 void DbConnection::EnableActiveChangedHandler()
61 {
62 if (!m_ActiveChangedHandler) {
63 ConfigObject::OnActiveChanged.connect([this](const ConfigObject::Ptr& object, const Value&) { UpdateObject(object); });
64 m_ActiveChangedHandler = true;
65 }
66 }
67
Resume()68 void DbConnection::Resume()
69 {
70 ConfigObject::Resume();
71
72 Log(LogInformation, "DbConnection")
73 << "Resuming IDO connection: " << GetName();
74
75 m_CleanUpTimer = new Timer();
76 m_CleanUpTimer->SetInterval(60);
77 m_CleanUpTimer->OnTimerExpired.connect([this](const Timer * const&) { CleanUpHandler(); });
78 m_CleanUpTimer->Start();
79
80 m_LogStatsTimeout = 0;
81
82 m_LogStatsTimer = new Timer();
83 m_LogStatsTimer->SetInterval(10);
84 m_LogStatsTimer->OnTimerExpired.connect([this](const Timer * const&) { LogStatsHandler(); });
85 m_LogStatsTimer->Start();
86 }
87
Pause()88 void DbConnection::Pause()
89 {
90 Log(LogInformation, "DbConnection")
91 << "Pausing IDO connection: " << GetName();
92
93 m_CleanUpTimer.reset();
94
95 DbQuery query1;
96 query1.Table = "programstatus";
97 query1.IdColumn = "programstatus_id";
98 query1.Type = DbQueryUpdate;
99 query1.Category = DbCatProgramStatus;
100 query1.WhereCriteria = new Dictionary({
101 { "instance_id", 0 } /* DbConnection class fills in real ID */
102 });
103
104 query1.Fields = new Dictionary({
105 { "instance_id", 0 }, /* DbConnection class fills in real ID */
106 { "program_end_time", DbValue::FromTimestamp(Utility::GetTime()) },
107 { "is_currently_running", 0 },
108 { "process_id", Empty }
109 });
110
111 query1.Priority = PriorityHigh;
112
113 ExecuteQuery(query1);
114
115 NewTransaction();
116
117 m_QueryQueue.Enqueue([this]() { Disconnect(); }, PriorityLow);
118
119 /* Work on remaining tasks but never delete the threads, for HA resuming later. */
120 m_QueryQueue.Join();
121
122 ConfigObject::Pause();
123 }
124
InitializeDbTimer()125 void DbConnection::InitializeDbTimer()
126 {
127 m_ProgramStatusTimer = new Timer();
128 m_ProgramStatusTimer->SetInterval(10);
129 m_ProgramStatusTimer->OnTimerExpired.connect([](const Timer * const&) { UpdateProgramStatus(); });
130 m_ProgramStatusTimer->Start();
131 }
132
InsertRuntimeVariable(const String & key,const Value & value)133 void DbConnection::InsertRuntimeVariable(const String& key, const Value& value)
134 {
135 DbQuery query;
136 query.Table = "runtimevariables";
137 query.Type = DbQueryInsert;
138 query.Category = DbCatProgramStatus;
139 query.Fields = new Dictionary({
140 { "instance_id", 0 }, /* DbConnection class fills in real ID */
141 { "varname", key },
142 { "varvalue", value }
143 });
144 DbObject::OnQuery(query);
145 }
146
UpdateProgramStatus()147 void DbConnection::UpdateProgramStatus()
148 {
149 IcingaApplication::Ptr icingaApplication = IcingaApplication::GetInstance();
150
151 if (!icingaApplication)
152 return;
153
154 Log(LogNotice, "DbConnection")
155 << "Updating programstatus table.";
156
157 std::vector<DbQuery> queries;
158
159 DbQuery query1;
160 query1.Type = DbQueryNewTransaction;
161 query1.Priority = PriorityImmediate;
162 queries.emplace_back(std::move(query1));
163
164 DbQuery query2;
165 query2.Table = "programstatus";
166 query2.IdColumn = "programstatus_id";
167 query2.Type = DbQueryInsert | DbQueryDelete;
168 query2.Category = DbCatProgramStatus;
169
170 query2.Fields = new Dictionary({
171 { "instance_id", 0 }, /* DbConnection class fills in real ID */
172 { "program_version", Application::GetAppVersion() },
173 { "status_update_time", DbValue::FromTimestamp(Utility::GetTime()) },
174 { "program_start_time", DbValue::FromTimestamp(Application::GetStartTime()) },
175 { "is_currently_running", 1 },
176 { "endpoint_name", icingaApplication->GetNodeName() },
177 { "process_id", Utility::GetPid() },
178 { "daemon_mode", 1 },
179 { "last_command_check", DbValue::FromTimestamp(Utility::GetTime()) },
180 { "notifications_enabled", (icingaApplication->GetEnableNotifications() ? 1 : 0) },
181 { "active_host_checks_enabled", (icingaApplication->GetEnableHostChecks() ? 1 : 0) },
182 { "passive_host_checks_enabled", 1 },
183 { "active_service_checks_enabled", (icingaApplication->GetEnableServiceChecks() ? 1 : 0) },
184 { "passive_service_checks_enabled", 1 },
185 { "event_handlers_enabled", (icingaApplication->GetEnableEventHandlers() ? 1 : 0) },
186 { "flap_detection_enabled", (icingaApplication->GetEnableFlapping() ? 1 : 0) },
187 { "process_performance_data", (icingaApplication->GetEnablePerfdata() ? 1 : 0) }
188 });
189
190 query2.WhereCriteria = new Dictionary({
191 { "instance_id", 0 } /* DbConnection class fills in real ID */
192 });
193
194 queries.emplace_back(std::move(query2));
195
196 DbQuery query3;
197 query3.Type = DbQueryNewTransaction;
198 queries.emplace_back(std::move(query3));
199
200 DbObject::OnMultipleQueries(queries);
201
202 DbQuery query4;
203 query4.Table = "runtimevariables";
204 query4.Type = DbQueryDelete;
205 query4.Category = DbCatProgramStatus;
206 query4.WhereCriteria = new Dictionary({
207 { "instance_id", 0 } /* DbConnection class fills in real ID */
208 });
209 DbObject::OnQuery(query4);
210
211 InsertRuntimeVariable("total_services", ConfigType::Get<Service>()->GetObjectCount());
212 InsertRuntimeVariable("total_scheduled_services", ConfigType::Get<Service>()->GetObjectCount());
213 InsertRuntimeVariable("total_hosts", ConfigType::Get<Host>()->GetObjectCount());
214 InsertRuntimeVariable("total_scheduled_hosts", ConfigType::Get<Host>()->GetObjectCount());
215 }
216
CleanUpHandler()217 void DbConnection::CleanUpHandler()
218 {
219 auto now = static_cast<long>(Utility::GetTime());
220
221 struct {
222 String name;
223 String time_column;
224 } tables[] = {
225 { "acknowledgements", "entry_time" },
226 { "commenthistory", "entry_time" },
227 { "contactnotifications", "start_time" },
228 { "contactnotificationmethods", "start_time" },
229 { "downtimehistory", "entry_time" },
230 { "eventhandlers", "start_time" },
231 { "externalcommands", "entry_time" },
232 { "flappinghistory", "event_time" },
233 { "hostchecks", "start_time" },
234 { "logentries", "logentry_time" },
235 { "notifications", "start_time" },
236 { "processevents", "event_time" },
237 { "statehistory", "state_time" },
238 { "servicechecks", "start_time" },
239 { "systemcommands", "start_time" }
240 };
241
242 for (auto& table : tables) {
243 double max_age = GetCleanup()->Get(table.name + "_age");
244
245 if (max_age == 0)
246 continue;
247
248 CleanUpExecuteQuery(table.name, table.time_column, now - max_age);
249 Log(LogNotice, "DbConnection")
250 << "Cleanup (" << table.name << "): " << max_age
251 << " now: " << now
252 << " old: " << now - max_age;
253 }
254
255 }
256
LogStatsHandler()257 void DbConnection::LogStatsHandler()
258 {
259 if (!GetConnected() || IsPaused())
260 return;
261
262 auto pending = m_PendingQueries.load();
263
264 auto now = Utility::GetTime();
265 bool timeoutReached = m_LogStatsTimeout < now;
266
267 if (pending == 0u && !timeoutReached) {
268 return;
269 }
270
271 auto output = round(m_OutputQueries.CalculateRate(now, 10));
272
273 if (pending < output * 5 && !timeoutReached) {
274 return;
275 }
276
277 auto input = round(m_InputQueries.CalculateRate(now, 10));
278
279 Log(LogInformation, GetReflectionType()->GetName())
280 << "Pending queries: " << pending << " (Input: " << input
281 << "/s; Output: " << output << "/s)";
282
283 /* Reschedule next log entry in 5 minutes. */
284 if (timeoutReached) {
285 m_LogStatsTimeout = now + 60 * 5;
286 }
287 }
288
CleanUpExecuteQuery(const String &,const String &,double)289 void DbConnection::CleanUpExecuteQuery(const String&, const String&, double)
290 {
291 /* Default handler does nothing. */
292 }
293
SetConfigHash(const DbObject::Ptr & dbobj,const String & hash)294 void DbConnection::SetConfigHash(const DbObject::Ptr& dbobj, const String& hash)
295 {
296 SetConfigHash(dbobj->GetType(), GetObjectID(dbobj), hash);
297 }
298
SetConfigHash(const DbType::Ptr & type,const DbReference & objid,const String & hash)299 void DbConnection::SetConfigHash(const DbType::Ptr& type, const DbReference& objid, const String& hash)
300 {
301 if (!objid.IsValid())
302 return;
303
304 if (!hash.IsEmpty())
305 m_ConfigHashes[std::make_pair(type, objid)] = hash;
306 else
307 m_ConfigHashes.erase(std::make_pair(type, objid));
308 }
309
GetConfigHash(const DbObject::Ptr & dbobj) const310 String DbConnection::GetConfigHash(const DbObject::Ptr& dbobj) const
311 {
312 return GetConfigHash(dbobj->GetType(), GetObjectID(dbobj));
313 }
314
GetConfigHash(const DbType::Ptr & type,const DbReference & objid) const315 String DbConnection::GetConfigHash(const DbType::Ptr& type, const DbReference& objid) const
316 {
317 if (!objid.IsValid())
318 return String();
319
320 auto it = m_ConfigHashes.find(std::make_pair(type, objid));
321
322 if (it == m_ConfigHashes.end())
323 return String();
324
325 return it->second;
326 }
327
SetObjectID(const DbObject::Ptr & dbobj,const DbReference & dbref)328 void DbConnection::SetObjectID(const DbObject::Ptr& dbobj, const DbReference& dbref)
329 {
330 if (dbref.IsValid())
331 m_ObjectIDs[dbobj] = dbref;
332 else
333 m_ObjectIDs.erase(dbobj);
334 }
335
GetObjectID(const DbObject::Ptr & dbobj) const336 DbReference DbConnection::GetObjectID(const DbObject::Ptr& dbobj) const
337 {
338 auto it = m_ObjectIDs.find(dbobj);
339
340 if (it == m_ObjectIDs.end())
341 return {};
342
343 return it->second;
344 }
345
SetInsertID(const DbObject::Ptr & dbobj,const DbReference & dbref)346 void DbConnection::SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref)
347 {
348 SetInsertID(dbobj->GetType(), GetObjectID(dbobj), dbref);
349 }
350
SetInsertID(const DbType::Ptr & type,const DbReference & objid,const DbReference & dbref)351 void DbConnection::SetInsertID(const DbType::Ptr& type, const DbReference& objid, const DbReference& dbref)
352 {
353 if (!objid.IsValid())
354 return;
355
356 if (dbref.IsValid())
357 m_InsertIDs[std::make_pair(type, objid)] = dbref;
358 else
359 m_InsertIDs.erase(std::make_pair(type, objid));
360 }
361
GetInsertID(const DbObject::Ptr & dbobj) const362 DbReference DbConnection::GetInsertID(const DbObject::Ptr& dbobj) const
363 {
364 return GetInsertID(dbobj->GetType(), GetObjectID(dbobj));
365 }
366
GetInsertID(const DbType::Ptr & type,const DbReference & objid) const367 DbReference DbConnection::GetInsertID(const DbType::Ptr& type, const DbReference& objid) const
368 {
369 if (!objid.IsValid())
370 return {};
371
372 auto it = m_InsertIDs.find(std::make_pair(type, objid));
373
374 if (it == m_InsertIDs.end())
375 return DbReference();
376
377 return it->second;
378 }
379
SetObjectActive(const DbObject::Ptr & dbobj,bool active)380 void DbConnection::SetObjectActive(const DbObject::Ptr& dbobj, bool active)
381 {
382 if (active)
383 m_ActiveObjects.insert(dbobj);
384 else
385 m_ActiveObjects.erase(dbobj);
386 }
387
GetObjectActive(const DbObject::Ptr & dbobj) const388 bool DbConnection::GetObjectActive(const DbObject::Ptr& dbobj) const
389 {
390 return (m_ActiveObjects.find(dbobj) != m_ActiveObjects.end());
391 }
392
ClearIDCache()393 void DbConnection::ClearIDCache()
394 {
395 SetIDCacheValid(false);
396
397 m_ObjectIDs.clear();
398 m_InsertIDs.clear();
399 m_ActiveObjects.clear();
400 m_ConfigUpdates.clear();
401 m_StatusUpdates.clear();
402 m_ConfigHashes.clear();
403 }
404
SetConfigUpdate(const DbObject::Ptr & dbobj,bool hasupdate)405 void DbConnection::SetConfigUpdate(const DbObject::Ptr& dbobj, bool hasupdate)
406 {
407 if (hasupdate)
408 m_ConfigUpdates.insert(dbobj);
409 else
410 m_ConfigUpdates.erase(dbobj);
411 }
412
GetConfigUpdate(const DbObject::Ptr & dbobj) const413 bool DbConnection::GetConfigUpdate(const DbObject::Ptr& dbobj) const
414 {
415 return (m_ConfigUpdates.find(dbobj) != m_ConfigUpdates.end());
416 }
417
SetStatusUpdate(const DbObject::Ptr & dbobj,bool hasupdate)418 void DbConnection::SetStatusUpdate(const DbObject::Ptr& dbobj, bool hasupdate)
419 {
420 if (hasupdate)
421 m_StatusUpdates.insert(dbobj);
422 else
423 m_StatusUpdates.erase(dbobj);
424 }
425
GetStatusUpdate(const DbObject::Ptr & dbobj) const426 bool DbConnection::GetStatusUpdate(const DbObject::Ptr& dbobj) const
427 {
428 return (m_StatusUpdates.find(dbobj) != m_StatusUpdates.end());
429 }
430
UpdateObject(const ConfigObject::Ptr & object)431 void DbConnection::UpdateObject(const ConfigObject::Ptr& object)
432 {
433 bool isShuttingDown = Application::IsShuttingDown();
434 bool isRestarting = Application::IsRestarting();
435
436 #ifdef I2_DEBUG
437 if (isShuttingDown || isRestarting) {
438 //Log(LogDebug, "DbConnection")
439 // << "Updating object '" << object->GetName() << "' \t\t active '" << Convert::ToLong(object->IsActive())
440 // << "' shutting down '" << Convert::ToLong(isShuttingDown) << "' restarting '" << Convert::ToLong(isRestarting) << "'.";
441 }
442 #endif /* I2_DEBUG */
443
444 /* Wait until a database connection is established on reconnect. */
445 if (!GetConnected())
446 return;
447
448 /* Don't update inactive objects during shutdown/reload/restart.
449 * They would be marked as deleted. This gets triggered with ConfigObject::StopObjects().
450 * During startup/reconnect this is fine, the handler is not active there.
451 */
452 if (isShuttingDown || isRestarting)
453 return;
454
455 DbObject::Ptr dbobj = DbObject::GetOrCreateByObject(object);
456
457 if (dbobj) {
458 bool dbActive = GetObjectActive(dbobj);
459 bool active = object->IsActive();
460
461 if (active) {
462 if (!dbActive)
463 ActivateObject(dbobj);
464
465 Dictionary::Ptr configFields = dbobj->GetConfigFields();
466 String configHash = dbobj->CalculateConfigHash(configFields);
467 ASSERT(configHash.GetLength() <= 64);
468 configFields->Set("config_hash", configHash);
469
470 String cachedHash = GetConfigHash(dbobj);
471
472 if (cachedHash != configHash) {
473 dbobj->SendConfigUpdateHeavy(configFields);
474 dbobj->SendStatusUpdate();
475 } else {
476 dbobj->SendConfigUpdateLight();
477 }
478 } else if (!active) {
479 /* This may happen on reload/restart actions too
480 * and is blocked above already.
481 *
482 * Deactivate the deleted object no matter
483 * which state it had in the database.
484 */
485 DeactivateObject(dbobj);
486 }
487 }
488 }
489
UpdateAllObjects()490 void DbConnection::UpdateAllObjects()
491 {
492 for (const Type::Ptr& type : Type::GetAllTypes()) {
493 auto *dtype = dynamic_cast<ConfigType *>(type.get());
494
495 if (!dtype)
496 continue;
497
498 for (const ConfigObject::Ptr& object : dtype->GetObjects()) {
499 m_QueryQueue.Enqueue([this, object](){ UpdateObject(object); }, PriorityHigh);
500 }
501 }
502 }
503
PrepareDatabase()504 void DbConnection::PrepareDatabase()
505 {
506 for (const DbType::Ptr& type : DbType::GetAllTypes()) {
507 FillIDCache(type);
508 }
509 }
510
ValidateFailoverTimeout(const Lazy<double> & lvalue,const ValidationUtils & utils)511 void DbConnection::ValidateFailoverTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
512 {
513 ObjectImpl<DbConnection>::ValidateFailoverTimeout(lvalue, utils);
514
515 if (lvalue() < 30)
516 BOOST_THROW_EXCEPTION(ValidationError(this, { "failover_timeout" }, "Failover timeout minimum is 30s."));
517 }
518
ValidateCategories(const Lazy<Array::Ptr> & lvalue,const ValidationUtils & utils)519 void DbConnection::ValidateCategories(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils)
520 {
521 ObjectImpl<DbConnection>::ValidateCategories(lvalue, utils);
522
523 int filter = FilterArrayToInt(lvalue(), DbQuery::GetCategoryFilterMap(), 0);
524
525 if (filter != DbCatEverything && (filter & ~(DbCatInvalid | DbCatEverything | DbCatConfig | DbCatState |
526 DbCatAcknowledgement | DbCatComment | DbCatDowntime | DbCatEventHandler | DbCatExternalCommand |
527 DbCatFlapping | DbCatLog | DbCatNotification | DbCatProgramStatus | DbCatRetention |
528 DbCatStateHistory)) != 0)
529 BOOST_THROW_EXCEPTION(ValidationError(this, { "categories" }, "categories filter is invalid."));
530 }
531
IncreaseQueryCount()532 void DbConnection::IncreaseQueryCount()
533 {
534 double now = Utility::GetTime();
535
536 std::unique_lock<std::mutex> lock(m_StatsMutex);
537 m_QueryStats.InsertValue(now, 1);
538 }
539
GetQueryCount(RingBuffer::SizeType span)540 int DbConnection::GetQueryCount(RingBuffer::SizeType span)
541 {
542 std::unique_lock<std::mutex> lock(m_StatsMutex);
543 return m_QueryStats.UpdateAndGetValues(Utility::GetTime(), span);
544 }
545
IsIDCacheValid() const546 bool DbConnection::IsIDCacheValid() const
547 {
548 return m_IDCacheValid;
549 }
550
SetIDCacheValid(bool valid)551 void DbConnection::SetIDCacheValid(bool valid)
552 {
553 m_IDCacheValid = valid;
554 }
555
GetSessionToken()556 int DbConnection::GetSessionToken()
557 {
558 return Application::GetStartTime();
559 }
560
IncreasePendingQueries(int count)561 void DbConnection::IncreasePendingQueries(int count)
562 {
563 m_PendingQueries.fetch_add(count);
564 m_InputQueries.InsertValue(Utility::GetTime(), count);
565 }
566
DecreasePendingQueries(int count)567 void DbConnection::DecreasePendingQueries(int count)
568 {
569 m_PendingQueries.fetch_sub(count);
570 m_OutputQueries.InsertValue(Utility::GetTime(), count);
571 }
572