1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #include "db_ido/dbconnection.hpp"
4 #include "db_ido/dbconnection-ti.cpp"
5 #include "db_ido/dbvalue.hpp"
6 #include "icinga/icingaapplication.hpp"
7 #include "icinga/host.hpp"
8 #include "icinga/service.hpp"
9 #include "base/configtype.hpp"
10 #include "base/convert.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/utility.hpp"
13 #include "base/logger.hpp"
14 #include "base/exception.hpp"
15 
16 using namespace icinga;
17 
18 REGISTER_TYPE(DbConnection);
19 
20 Timer::Ptr DbConnection::m_ProgramStatusTimer;
21 boost::once_flag DbConnection::m_OnceFlag = BOOST_ONCE_INIT;
22 
OnConfigLoaded()23 void DbConnection::OnConfigLoaded()
24 {
25 	ConfigObject::OnConfigLoaded();
26 
27 	Value categories = GetCategories();
28 
29 	SetCategoryFilter(FilterArrayToInt(categories, DbQuery::GetCategoryFilterMap(), DbCatEverything));
30 
31 	if (!GetEnableHa()) {
32 		Log(LogDebug, "DbConnection")
33 			<< "HA functionality disabled. Won't pause IDO connection: " << GetName();
34 
35 		SetHAMode(HARunEverywhere);
36 	}
37 
38 	boost::call_once(m_OnceFlag, InitializeDbTimer);
39 }
40 
Start(bool runtimeCreated)41 void DbConnection::Start(bool runtimeCreated)
42 {
43 	ObjectImpl<DbConnection>::Start(runtimeCreated);
44 
45 	Log(LogInformation, "DbConnection")
46 		<< "'" << GetName() << "' started.";
47 
48 	DbObject::OnQuery.connect([this](const DbQuery& query) { ExecuteQuery(query); });
49 	DbObject::OnMultipleQueries.connect([this](const std::vector<DbQuery>& multiQueries) { ExecuteMultipleQueries(multiQueries); });
50 }
51 
Stop(bool runtimeRemoved)52 void DbConnection::Stop(bool runtimeRemoved)
53 {
54 	Log(LogInformation, "DbConnection")
55 		<< "'" << GetName() << "' stopped.";
56 
57 	ObjectImpl<DbConnection>::Stop(runtimeRemoved);
58 }
59 
EnableActiveChangedHandler()60 void DbConnection::EnableActiveChangedHandler()
61 {
62 	if (!m_ActiveChangedHandler) {
63 		ConfigObject::OnActiveChanged.connect([this](const ConfigObject::Ptr& object, const Value&) { UpdateObject(object); });
64 		m_ActiveChangedHandler = true;
65 	}
66 }
67 
Resume()68 void DbConnection::Resume()
69 {
70 	ConfigObject::Resume();
71 
72 	Log(LogInformation, "DbConnection")
73 		<< "Resuming IDO connection: " << GetName();
74 
75 	m_CleanUpTimer = new Timer();
76 	m_CleanUpTimer->SetInterval(60);
77 	m_CleanUpTimer->OnTimerExpired.connect([this](const Timer * const&) { CleanUpHandler(); });
78 	m_CleanUpTimer->Start();
79 
80 	m_LogStatsTimeout = 0;
81 
82 	m_LogStatsTimer = new Timer();
83 	m_LogStatsTimer->SetInterval(10);
84 	m_LogStatsTimer->OnTimerExpired.connect([this](const Timer * const&) { LogStatsHandler(); });
85 	m_LogStatsTimer->Start();
86 }
87 
Pause()88 void DbConnection::Pause()
89 {
90 	Log(LogInformation, "DbConnection")
91 		<< "Pausing IDO connection: " << GetName();
92 
93 	m_CleanUpTimer.reset();
94 
95 	DbQuery query1;
96 	query1.Table = "programstatus";
97 	query1.IdColumn = "programstatus_id";
98 	query1.Type = DbQueryUpdate;
99 	query1.Category = DbCatProgramStatus;
100 	query1.WhereCriteria = new Dictionary({
101 		{ "instance_id", 0 }  /* DbConnection class fills in real ID */
102 	});
103 
104 	query1.Fields = new Dictionary({
105 		{ "instance_id", 0 }, /* DbConnection class fills in real ID */
106 		{ "program_end_time", DbValue::FromTimestamp(Utility::GetTime()) },
107 		{ "is_currently_running", 0 },
108 		{ "process_id", Empty }
109 	});
110 
111 	query1.Priority = PriorityHigh;
112 
113 	ExecuteQuery(query1);
114 
115 	NewTransaction();
116 
117 	m_QueryQueue.Enqueue([this]() { Disconnect(); }, PriorityLow);
118 
119 	/* Work on remaining tasks but never delete the threads, for HA resuming later. */
120 	m_QueryQueue.Join();
121 
122 	ConfigObject::Pause();
123 }
124 
InitializeDbTimer()125 void DbConnection::InitializeDbTimer()
126 {
127 	m_ProgramStatusTimer = new Timer();
128 	m_ProgramStatusTimer->SetInterval(10);
129 	m_ProgramStatusTimer->OnTimerExpired.connect([](const Timer * const&) { UpdateProgramStatus(); });
130 	m_ProgramStatusTimer->Start();
131 }
132 
InsertRuntimeVariable(const String & key,const Value & value)133 void DbConnection::InsertRuntimeVariable(const String& key, const Value& value)
134 {
135 	DbQuery query;
136 	query.Table = "runtimevariables";
137 	query.Type = DbQueryInsert;
138 	query.Category = DbCatProgramStatus;
139 	query.Fields = new Dictionary({
140 		{ "instance_id", 0 }, /* DbConnection class fills in real ID */
141 		{ "varname", key },
142 		{ "varvalue", value }
143 	});
144 	DbObject::OnQuery(query);
145 }
146 
UpdateProgramStatus()147 void DbConnection::UpdateProgramStatus()
148 {
149 	IcingaApplication::Ptr icingaApplication = IcingaApplication::GetInstance();
150 
151 	if (!icingaApplication)
152 		return;
153 
154 	Log(LogNotice, "DbConnection")
155 		<< "Updating programstatus table.";
156 
157 	std::vector<DbQuery> queries;
158 
159 	DbQuery query1;
160 	query1.Type = DbQueryNewTransaction;
161 	query1.Priority = PriorityImmediate;
162 	queries.emplace_back(std::move(query1));
163 
164 	DbQuery query2;
165 	query2.Table = "programstatus";
166 	query2.IdColumn = "programstatus_id";
167 	query2.Type = DbQueryInsert | DbQueryDelete;
168 	query2.Category = DbCatProgramStatus;
169 
170 	query2.Fields = new Dictionary({
171 		{ "instance_id", 0 }, /* DbConnection class fills in real ID */
172 		{ "program_version", Application::GetAppVersion() },
173 		{ "status_update_time", DbValue::FromTimestamp(Utility::GetTime()) },
174 		{ "program_start_time", DbValue::FromTimestamp(Application::GetStartTime()) },
175 		{ "is_currently_running", 1 },
176 		{ "endpoint_name", icingaApplication->GetNodeName() },
177 		{ "process_id", Utility::GetPid() },
178 		{ "daemon_mode", 1 },
179 		{ "last_command_check", DbValue::FromTimestamp(Utility::GetTime()) },
180 		{ "notifications_enabled", (icingaApplication->GetEnableNotifications() ? 1 : 0) },
181 		{ "active_host_checks_enabled", (icingaApplication->GetEnableHostChecks() ? 1 : 0) },
182 		{ "passive_host_checks_enabled", 1 },
183 		{ "active_service_checks_enabled", (icingaApplication->GetEnableServiceChecks() ? 1 : 0) },
184 		{ "passive_service_checks_enabled", 1 },
185 		{ "event_handlers_enabled", (icingaApplication->GetEnableEventHandlers() ? 1 : 0) },
186 		{ "flap_detection_enabled", (icingaApplication->GetEnableFlapping() ? 1 : 0) },
187 		{ "process_performance_data", (icingaApplication->GetEnablePerfdata() ? 1 : 0) }
188 	});
189 
190 	query2.WhereCriteria = new Dictionary({
191 		{ "instance_id", 0 }  /* DbConnection class fills in real ID */
192 	});
193 
194 	queries.emplace_back(std::move(query2));
195 
196 	DbQuery query3;
197 	query3.Type = DbQueryNewTransaction;
198 	queries.emplace_back(std::move(query3));
199 
200 	DbObject::OnMultipleQueries(queries);
201 
202 	DbQuery query4;
203 	query4.Table = "runtimevariables";
204 	query4.Type = DbQueryDelete;
205 	query4.Category = DbCatProgramStatus;
206 	query4.WhereCriteria = new Dictionary({
207 		{ "instance_id", 0 } /* DbConnection class fills in real ID */
208 	});
209 	DbObject::OnQuery(query4);
210 
211 	InsertRuntimeVariable("total_services", ConfigType::Get<Service>()->GetObjectCount());
212 	InsertRuntimeVariable("total_scheduled_services", ConfigType::Get<Service>()->GetObjectCount());
213 	InsertRuntimeVariable("total_hosts", ConfigType::Get<Host>()->GetObjectCount());
214 	InsertRuntimeVariable("total_scheduled_hosts", ConfigType::Get<Host>()->GetObjectCount());
215 }
216 
CleanUpHandler()217 void DbConnection::CleanUpHandler()
218 {
219 	auto now = static_cast<long>(Utility::GetTime());
220 
221 	struct {
222 		String name;
223 		String time_column;
224 	} tables[] = {
225 		{ "acknowledgements", "entry_time" },
226 		{ "commenthistory", "entry_time" },
227 		{ "contactnotifications", "start_time" },
228 		{ "contactnotificationmethods", "start_time" },
229 		{ "downtimehistory", "entry_time" },
230 		{ "eventhandlers", "start_time" },
231 		{ "externalcommands", "entry_time" },
232 		{ "flappinghistory", "event_time" },
233 		{ "hostchecks", "start_time" },
234 		{ "logentries", "logentry_time" },
235 		{ "notifications", "start_time" },
236 		{ "processevents", "event_time" },
237 		{ "statehistory", "state_time" },
238 		{ "servicechecks", "start_time" },
239 		{ "systemcommands", "start_time" }
240 	};
241 
242 	for (auto& table : tables) {
243 		double max_age = GetCleanup()->Get(table.name + "_age");
244 
245 		if (max_age == 0)
246 			continue;
247 
248 		CleanUpExecuteQuery(table.name, table.time_column, now - max_age);
249 		Log(LogNotice, "DbConnection")
250 			<< "Cleanup (" << table.name << "): " << max_age
251 			<< " now: " << now
252 			<< " old: " << now - max_age;
253 	}
254 
255 }
256 
LogStatsHandler()257 void DbConnection::LogStatsHandler()
258 {
259 	if (!GetConnected() || IsPaused())
260 		return;
261 
262 	auto pending = m_PendingQueries.load();
263 
264 	auto now = Utility::GetTime();
265 	bool timeoutReached = m_LogStatsTimeout < now;
266 
267 	if (pending == 0u && !timeoutReached) {
268 		return;
269 	}
270 
271 	auto output = round(m_OutputQueries.CalculateRate(now, 10));
272 
273 	if (pending < output * 5 && !timeoutReached) {
274 		return;
275 	}
276 
277 	auto input = round(m_InputQueries.CalculateRate(now, 10));
278 
279 	Log(LogInformation, GetReflectionType()->GetName())
280 		<< "Pending queries: " << pending << " (Input: " << input
281 		<< "/s; Output: " << output << "/s)";
282 
283 	/* Reschedule next log entry in 5 minutes. */
284 	if (timeoutReached) {
285 		m_LogStatsTimeout = now + 60 * 5;
286 	}
287 }
288 
CleanUpExecuteQuery(const String &,const String &,double)289 void DbConnection::CleanUpExecuteQuery(const String&, const String&, double)
290 {
291 	/* Default handler does nothing. */
292 }
293 
SetConfigHash(const DbObject::Ptr & dbobj,const String & hash)294 void DbConnection::SetConfigHash(const DbObject::Ptr& dbobj, const String& hash)
295 {
296 	SetConfigHash(dbobj->GetType(), GetObjectID(dbobj), hash);
297 }
298 
SetConfigHash(const DbType::Ptr & type,const DbReference & objid,const String & hash)299 void DbConnection::SetConfigHash(const DbType::Ptr& type, const DbReference& objid, const String& hash)
300 {
301 	if (!objid.IsValid())
302 		return;
303 
304 	if (!hash.IsEmpty())
305 		m_ConfigHashes[std::make_pair(type, objid)] = hash;
306 	else
307 		m_ConfigHashes.erase(std::make_pair(type, objid));
308 }
309 
GetConfigHash(const DbObject::Ptr & dbobj) const310 String DbConnection::GetConfigHash(const DbObject::Ptr& dbobj) const
311 {
312 	return GetConfigHash(dbobj->GetType(), GetObjectID(dbobj));
313 }
314 
GetConfigHash(const DbType::Ptr & type,const DbReference & objid) const315 String DbConnection::GetConfigHash(const DbType::Ptr& type, const DbReference& objid) const
316 {
317 	if (!objid.IsValid())
318 		return String();
319 
320 	auto it = m_ConfigHashes.find(std::make_pair(type, objid));
321 
322 	if (it == m_ConfigHashes.end())
323 		return String();
324 
325 	return it->second;
326 }
327 
SetObjectID(const DbObject::Ptr & dbobj,const DbReference & dbref)328 void DbConnection::SetObjectID(const DbObject::Ptr& dbobj, const DbReference& dbref)
329 {
330 	if (dbref.IsValid())
331 		m_ObjectIDs[dbobj] = dbref;
332 	else
333 		m_ObjectIDs.erase(dbobj);
334 }
335 
GetObjectID(const DbObject::Ptr & dbobj) const336 DbReference DbConnection::GetObjectID(const DbObject::Ptr& dbobj) const
337 {
338 	auto it = m_ObjectIDs.find(dbobj);
339 
340 	if (it == m_ObjectIDs.end())
341 		return {};
342 
343 	return it->second;
344 }
345 
SetInsertID(const DbObject::Ptr & dbobj,const DbReference & dbref)346 void DbConnection::SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref)
347 {
348 	SetInsertID(dbobj->GetType(), GetObjectID(dbobj), dbref);
349 }
350 
SetInsertID(const DbType::Ptr & type,const DbReference & objid,const DbReference & dbref)351 void DbConnection::SetInsertID(const DbType::Ptr& type, const DbReference& objid, const DbReference& dbref)
352 {
353 	if (!objid.IsValid())
354 		return;
355 
356 	if (dbref.IsValid())
357 		m_InsertIDs[std::make_pair(type, objid)] = dbref;
358 	else
359 		m_InsertIDs.erase(std::make_pair(type, objid));
360 }
361 
GetInsertID(const DbObject::Ptr & dbobj) const362 DbReference DbConnection::GetInsertID(const DbObject::Ptr& dbobj) const
363 {
364 	return GetInsertID(dbobj->GetType(), GetObjectID(dbobj));
365 }
366 
GetInsertID(const DbType::Ptr & type,const DbReference & objid) const367 DbReference DbConnection::GetInsertID(const DbType::Ptr& type, const DbReference& objid) const
368 {
369 	if (!objid.IsValid())
370 		return {};
371 
372 	auto it = m_InsertIDs.find(std::make_pair(type, objid));
373 
374 	if (it == m_InsertIDs.end())
375 		return DbReference();
376 
377 	return it->second;
378 }
379 
SetObjectActive(const DbObject::Ptr & dbobj,bool active)380 void DbConnection::SetObjectActive(const DbObject::Ptr& dbobj, bool active)
381 {
382 	if (active)
383 		m_ActiveObjects.insert(dbobj);
384 	else
385 		m_ActiveObjects.erase(dbobj);
386 }
387 
GetObjectActive(const DbObject::Ptr & dbobj) const388 bool DbConnection::GetObjectActive(const DbObject::Ptr& dbobj) const
389 {
390 	return (m_ActiveObjects.find(dbobj) != m_ActiveObjects.end());
391 }
392 
ClearIDCache()393 void DbConnection::ClearIDCache()
394 {
395 	SetIDCacheValid(false);
396 
397 	m_ObjectIDs.clear();
398 	m_InsertIDs.clear();
399 	m_ActiveObjects.clear();
400 	m_ConfigUpdates.clear();
401 	m_StatusUpdates.clear();
402 	m_ConfigHashes.clear();
403 }
404 
SetConfigUpdate(const DbObject::Ptr & dbobj,bool hasupdate)405 void DbConnection::SetConfigUpdate(const DbObject::Ptr& dbobj, bool hasupdate)
406 {
407 	if (hasupdate)
408 		m_ConfigUpdates.insert(dbobj);
409 	else
410 		m_ConfigUpdates.erase(dbobj);
411 }
412 
GetConfigUpdate(const DbObject::Ptr & dbobj) const413 bool DbConnection::GetConfigUpdate(const DbObject::Ptr& dbobj) const
414 {
415 	return (m_ConfigUpdates.find(dbobj) != m_ConfigUpdates.end());
416 }
417 
SetStatusUpdate(const DbObject::Ptr & dbobj,bool hasupdate)418 void DbConnection::SetStatusUpdate(const DbObject::Ptr& dbobj, bool hasupdate)
419 {
420 	if (hasupdate)
421 		m_StatusUpdates.insert(dbobj);
422 	else
423 		m_StatusUpdates.erase(dbobj);
424 }
425 
GetStatusUpdate(const DbObject::Ptr & dbobj) const426 bool DbConnection::GetStatusUpdate(const DbObject::Ptr& dbobj) const
427 {
428 	return (m_StatusUpdates.find(dbobj) != m_StatusUpdates.end());
429 }
430 
UpdateObject(const ConfigObject::Ptr & object)431 void DbConnection::UpdateObject(const ConfigObject::Ptr& object)
432 {
433 	bool isShuttingDown = Application::IsShuttingDown();
434 	bool isRestarting = Application::IsRestarting();
435 
436 #ifdef I2_DEBUG
437 	if (isShuttingDown || isRestarting) {
438 		//Log(LogDebug, "DbConnection")
439 		//	<< "Updating object '" << object->GetName() << "' \t\t active '" << Convert::ToLong(object->IsActive())
440 		//	<< "' shutting down '" << Convert::ToLong(isShuttingDown) << "' restarting '" << Convert::ToLong(isRestarting) << "'.";
441 	}
442 #endif /* I2_DEBUG */
443 
444 	/* Wait until a database connection is established on reconnect. */
445 	if (!GetConnected())
446 		return;
447 
448 	/* Don't update inactive objects during shutdown/reload/restart.
449 	 * They would be marked as deleted. This gets triggered with ConfigObject::StopObjects().
450 	 * During startup/reconnect this is fine, the handler is not active there.
451 	 */
452 	if (isShuttingDown || isRestarting)
453 		return;
454 
455 	DbObject::Ptr dbobj = DbObject::GetOrCreateByObject(object);
456 
457 	if (dbobj) {
458 		bool dbActive = GetObjectActive(dbobj);
459 		bool active = object->IsActive();
460 
461 		if (active) {
462 			if (!dbActive)
463 				ActivateObject(dbobj);
464 
465 			Dictionary::Ptr configFields = dbobj->GetConfigFields();
466 			String configHash = dbobj->CalculateConfigHash(configFields);
467 			ASSERT(configHash.GetLength() <= 64);
468 			configFields->Set("config_hash", configHash);
469 
470 			String cachedHash = GetConfigHash(dbobj);
471 
472 			if (cachedHash != configHash) {
473 				dbobj->SendConfigUpdateHeavy(configFields);
474 				dbobj->SendStatusUpdate();
475 			} else {
476 				dbobj->SendConfigUpdateLight();
477 			}
478 		} else if (!active) {
479 			/* This may happen on reload/restart actions too
480 			 * and is blocked above already.
481 			 *
482 			 * Deactivate the deleted object no matter
483 			 * which state it had in the database.
484 			 */
485 			DeactivateObject(dbobj);
486 		}
487 	}
488 }
489 
UpdateAllObjects()490 void DbConnection::UpdateAllObjects()
491 {
492 	for (const Type::Ptr& type : Type::GetAllTypes()) {
493 		auto *dtype = dynamic_cast<ConfigType *>(type.get());
494 
495 		if (!dtype)
496 			continue;
497 
498 		for (const ConfigObject::Ptr& object : dtype->GetObjects()) {
499 			m_QueryQueue.Enqueue([this, object](){ UpdateObject(object); }, PriorityHigh);
500 		}
501 	}
502 }
503 
PrepareDatabase()504 void DbConnection::PrepareDatabase()
505 {
506 	for (const DbType::Ptr& type : DbType::GetAllTypes()) {
507 		FillIDCache(type);
508 	}
509 }
510 
ValidateFailoverTimeout(const Lazy<double> & lvalue,const ValidationUtils & utils)511 void DbConnection::ValidateFailoverTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
512 {
513 	ObjectImpl<DbConnection>::ValidateFailoverTimeout(lvalue, utils);
514 
515 	if (lvalue() < 30)
516 		BOOST_THROW_EXCEPTION(ValidationError(this, { "failover_timeout" }, "Failover timeout minimum is 30s."));
517 }
518 
ValidateCategories(const Lazy<Array::Ptr> & lvalue,const ValidationUtils & utils)519 void DbConnection::ValidateCategories(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils)
520 {
521 	ObjectImpl<DbConnection>::ValidateCategories(lvalue, utils);
522 
523 	int filter = FilterArrayToInt(lvalue(), DbQuery::GetCategoryFilterMap(), 0);
524 
525 	if (filter != DbCatEverything && (filter & ~(DbCatInvalid | DbCatEverything | DbCatConfig | DbCatState |
526 		DbCatAcknowledgement | DbCatComment | DbCatDowntime | DbCatEventHandler | DbCatExternalCommand |
527 		DbCatFlapping | DbCatLog | DbCatNotification | DbCatProgramStatus | DbCatRetention |
528 		DbCatStateHistory)) != 0)
529 		BOOST_THROW_EXCEPTION(ValidationError(this, { "categories" }, "categories filter is invalid."));
530 }
531 
IncreaseQueryCount()532 void DbConnection::IncreaseQueryCount()
533 {
534 	double now = Utility::GetTime();
535 
536 	std::unique_lock<std::mutex> lock(m_StatsMutex);
537 	m_QueryStats.InsertValue(now, 1);
538 }
539 
GetQueryCount(RingBuffer::SizeType span)540 int DbConnection::GetQueryCount(RingBuffer::SizeType span)
541 {
542 	std::unique_lock<std::mutex> lock(m_StatsMutex);
543 	return m_QueryStats.UpdateAndGetValues(Utility::GetTime(), span);
544 }
545 
IsIDCacheValid() const546 bool DbConnection::IsIDCacheValid() const
547 {
548 	return m_IDCacheValid;
549 }
550 
SetIDCacheValid(bool valid)551 void DbConnection::SetIDCacheValid(bool valid)
552 {
553 	m_IDCacheValid = valid;
554 }
555 
GetSessionToken()556 int DbConnection::GetSessionToken()
557 {
558 	return Application::GetStartTime();
559 }
560 
IncreasePendingQueries(int count)561 void DbConnection::IncreasePendingQueries(int count)
562 {
563 	m_PendingQueries.fetch_add(count);
564 	m_InputQueries.InsertValue(Utility::GetTime(), count);
565 }
566 
DecreasePendingQueries(int count)567 void DbConnection::DecreasePendingQueries(int count)
568 {
569 	m_PendingQueries.fetch_sub(count);
570 	m_OutputQueries.InsertValue(Utility::GetTime(), count);
571 }
572