1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #include "db_ido_mysql/idomysqlconnection.hpp"
4 #include "db_ido_mysql/idomysqlconnection-ti.cpp"
5 #include "db_ido/dbtype.hpp"
6 #include "db_ido/dbvalue.hpp"
7 #include "base/logger.hpp"
8 #include "base/objectlock.hpp"
9 #include "base/convert.hpp"
10 #include "base/utility.hpp"
11 #include "base/perfdatavalue.hpp"
12 #include "base/application.hpp"
13 #include "base/configtype.hpp"
14 #include "base/exception.hpp"
15 #include "base/statsfunction.hpp"
16 #include "base/defer.hpp"
17 #include <utility>
18 
19 using namespace icinga;
20 
21 REGISTER_TYPE(IdoMysqlConnection);
22 REGISTER_STATSFUNCTION(IdoMysqlConnection, &IdoMysqlConnection::StatsFunc);
23 
GetLatestSchemaVersion() const24 const char * IdoMysqlConnection::GetLatestSchemaVersion() const noexcept
25 {
26 	return "1.15.1";
27 }
28 
GetCompatSchemaVersion() const29 const char * IdoMysqlConnection::GetCompatSchemaVersion() const noexcept
30 {
31 	return "1.14.3";
32 }
33 
OnConfigLoaded()34 void IdoMysqlConnection::OnConfigLoaded()
35 {
36 	ObjectImpl<IdoMysqlConnection>::OnConfigLoaded();
37 
38 	m_QueryQueue.SetName("IdoMysqlConnection, " + GetName());
39 
40 	Library shimLibrary{"mysql_shim"};
41 
42 	auto create_mysql_shim = shimLibrary.GetSymbolAddress<create_mysql_shim_ptr>("create_mysql_shim");
43 
44 	m_Mysql.reset(create_mysql_shim());
45 
46 	std::swap(m_Library, shimLibrary);
47 }
48 
StatsFunc(const Dictionary::Ptr & status,const Array::Ptr & perfdata)49 void IdoMysqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
50 {
51 	DictionaryData nodes;
52 
53 	for (const IdoMysqlConnection::Ptr& idomysqlconnection : ConfigType::GetObjectsByType<IdoMysqlConnection>()) {
54 		size_t queryQueueItems = idomysqlconnection->m_QueryQueue.GetLength();
55 		double queryQueueItemRate = idomysqlconnection->m_QueryQueue.GetTaskCount(60) / 60.0;
56 
57 		nodes.emplace_back(idomysqlconnection->GetName(), new Dictionary({
58 			{ "version", idomysqlconnection->GetSchemaVersion() },
59 			{ "instance_name", idomysqlconnection->GetInstanceName() },
60 			{ "connected", idomysqlconnection->GetConnected() },
61 			{ "query_queue_items", queryQueueItems },
62 			{ "query_queue_item_rate", queryQueueItemRate }
63 		}));
64 
65 		perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_rate", idomysqlconnection->GetQueryCount(60) / 60.0));
66 		perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_1min", idomysqlconnection->GetQueryCount(60)));
67 		perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_5mins", idomysqlconnection->GetQueryCount(5 * 60)));
68 		perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_15mins", idomysqlconnection->GetQueryCount(15 * 60)));
69 		perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_items", queryQueueItems));
70 		perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_item_rate", queryQueueItemRate));
71 	}
72 
73 	status->Set("idomysqlconnection", new Dictionary(std::move(nodes)));
74 }
75 
Resume()76 void IdoMysqlConnection::Resume()
77 {
78 	Log(LogInformation, "IdoMysqlConnection")
79 		<< "'" << GetName() << "' resumed.";
80 
81 	SetConnected(false);
82 
83 	m_QueryQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
84 
85 	/* Immediately try to connect on Resume() without timer. */
86 	m_QueryQueue.Enqueue([this]() { Reconnect(); }, PriorityImmediate);
87 
88 	m_TxTimer = new Timer();
89 	m_TxTimer->SetInterval(1);
90 	m_TxTimer->OnTimerExpired.connect([this](const Timer * const&) { NewTransaction(); });
91 	m_TxTimer->Start();
92 
93 	m_ReconnectTimer = new Timer();
94 	m_ReconnectTimer->SetInterval(10);
95 	m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&){ ReconnectTimerHandler(); });
96 	m_ReconnectTimer->Start();
97 
98 	/* Start with queries after connect. */
99 	DbConnection::Resume();
100 
101 	ASSERT(m_Mysql->thread_safe());
102 }
103 
Pause()104 void IdoMysqlConnection::Pause()
105 {
106 	Log(LogDebug, "IdoMysqlConnection")
107 		<< "Attempting to pause '" << GetName() << "'.";
108 
109 	DbConnection::Pause();
110 
111 	m_ReconnectTimer.reset();
112 
113 #ifdef I2_DEBUG /* I2_DEBUG */
114 	Log(LogDebug, "IdoMysqlConnection")
115 		<< "Rescheduling disconnect task.";
116 #endif /* I2_DEBUG */
117 
118 	Log(LogInformation, "IdoMysqlConnection")
119 		<< "'" << GetName() << "' paused.";
120 
121 }
122 
ExceptionHandler(boost::exception_ptr exp)123 void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
124 {
125 	Log(LogCritical, "IdoMysqlConnection", "Exception during database operation: Verify that your database is operational!");
126 
127 	Log(LogDebug, "IdoMysqlConnection")
128 		<< "Exception during database operation: " << DiagnosticInformation(std::move(exp));
129 
130 	if (GetConnected()) {
131 		m_Mysql->close(&m_Connection);
132 
133 		SetConnected(false);
134 	}
135 }
136 
AssertOnWorkQueue()137 void IdoMysqlConnection::AssertOnWorkQueue()
138 {
139 	ASSERT(m_QueryQueue.IsWorkerThread());
140 }
141 
Disconnect()142 void IdoMysqlConnection::Disconnect()
143 {
144 	AssertOnWorkQueue();
145 
146 	if (!GetConnected())
147 		return;
148 
149 	Query("COMMIT");
150 	m_Mysql->close(&m_Connection);
151 
152 	SetConnected(false);
153 
154 	Log(LogInformation, "IdoMysqlConnection")
155 		<< "Disconnected from '" << GetName() << "' database '" << GetDatabase() << "'.";
156 }
157 
NewTransaction()158 void IdoMysqlConnection::NewTransaction()
159 {
160 	if (IsPaused() && GetPauseCalled())
161 		return;
162 
163 #ifdef I2_DEBUG /* I2_DEBUG */
164 	Log(LogDebug, "IdoMysqlConnection")
165 		<< "Scheduling new transaction and finishing async queries.";
166 #endif /* I2_DEBUG */
167 
168 	m_QueryQueue.Enqueue([this]() { InternalNewTransaction(); }, PriorityHigh);
169 }
170 
InternalNewTransaction()171 void IdoMysqlConnection::InternalNewTransaction()
172 {
173 	AssertOnWorkQueue();
174 
175 	if (!GetConnected())
176 		return;
177 
178 	IncreasePendingQueries(2);
179 
180 	AsyncQuery("COMMIT");
181 	AsyncQuery("BEGIN");
182 
183 	FinishAsyncQueries();
184 }
185 
ReconnectTimerHandler()186 void IdoMysqlConnection::ReconnectTimerHandler()
187 {
188 #ifdef I2_DEBUG /* I2_DEBUG */
189 	Log(LogDebug, "IdoMysqlConnection")
190 		<< "Scheduling reconnect task.";
191 #endif /* I2_DEBUG */
192 
193 	/* Only allow Reconnect events with high priority. */
194 	m_QueryQueue.Enqueue([this]() { Reconnect(); }, PriorityImmediate);
195 }
196 
Reconnect()197 void IdoMysqlConnection::Reconnect()
198 {
199 	AssertOnWorkQueue();
200 
201 	if (!IsActive())
202 		return;
203 
204 	CONTEXT("Reconnecting to MySQL IDO database '" + GetName() + "'");
205 
206 	double startTime = Utility::GetTime();
207 
208 	SetShouldConnect(true);
209 
210 	bool reconnect = false;
211 
212 	/* Ensure to close old connections first. */
213 	if (GetConnected()) {
214 		/* Check if we're really still connected */
215 		if (m_Mysql->ping(&m_Connection) == 0)
216 			return;
217 
218 		m_Mysql->close(&m_Connection);
219 		SetConnected(false);
220 		reconnect = true;
221 	}
222 
223 	Log(LogDebug, "IdoMysqlConnection")
224 		<< "Reconnect: Clearing ID cache.";
225 
226 	ClearIDCache();
227 
228 	String ihost, isocket_path, iuser, ipasswd, idb;
229 	String isslKey, isslCert, isslCa, isslCaPath, isslCipher;
230 	const char *host, *socket_path, *user , *passwd, *db;
231 	const char *sslKey, *sslCert, *sslCa, *sslCaPath, *sslCipher;
232 	bool enableSsl;
233 	long port;
234 
235 	ihost = GetHost();
236 	isocket_path = GetSocketPath();
237 	iuser = GetUser();
238 	ipasswd = GetPassword();
239 	idb = GetDatabase();
240 
241 	enableSsl = GetEnableSsl();
242 	isslKey = GetSslKey();
243 	isslCert = GetSslCert();
244 	isslCa = GetSslCa();
245 	isslCaPath = GetSslCapath();
246 	isslCipher = GetSslCipher();
247 
248 	host = (!ihost.IsEmpty()) ? ihost.CStr() : nullptr;
249 	port = GetPort();
250 	socket_path = (!isocket_path.IsEmpty()) ? isocket_path.CStr() : nullptr;
251 	user = (!iuser.IsEmpty()) ? iuser.CStr() : nullptr;
252 	passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : nullptr;
253 	db = (!idb.IsEmpty()) ? idb.CStr() : nullptr;
254 
255 	sslKey = (!isslKey.IsEmpty()) ? isslKey.CStr() : nullptr;
256 	sslCert = (!isslCert.IsEmpty()) ? isslCert.CStr() : nullptr;
257 	sslCa = (!isslCa.IsEmpty()) ? isslCa.CStr() : nullptr;
258 	sslCaPath = (!isslCaPath.IsEmpty()) ? isslCaPath.CStr() : nullptr;
259 	sslCipher = (!isslCipher.IsEmpty()) ? isslCipher.CStr() : nullptr;
260 
261 	/* connection */
262 	if (!m_Mysql->init(&m_Connection)) {
263 		Log(LogCritical, "IdoMysqlConnection")
264 			<< "mysql_init() failed: out of memory";
265 
266 		BOOST_THROW_EXCEPTION(std::bad_alloc());
267 	}
268 
269 	if (enableSsl)
270 		m_Mysql->ssl_set(&m_Connection, sslKey, sslCert, sslCa, sslCaPath, sslCipher);
271 
272 	if (!m_Mysql->real_connect(&m_Connection, host, user, passwd, db, port, socket_path, CLIENT_FOUND_ROWS | CLIENT_MULTI_STATEMENTS)) {
273 		Log(LogCritical, "IdoMysqlConnection")
274 			<< "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
275 			<< "' " << (enableSsl ? "(SSL enabled) " : "") << "failed: \"" << m_Mysql->error(&m_Connection) << "\"";
276 
277 		BOOST_THROW_EXCEPTION(std::runtime_error(m_Mysql->error(&m_Connection)));
278 	}
279 
280 	Log(LogNotice, "IdoMysqlConnection")
281 		<< "Reconnect: '" << GetName() << "' is now connected to database '" << GetDatabase() << "'.";
282 
283 	SetConnected(true);
284 
285 	IdoMysqlResult result = Query("SELECT @@global.max_allowed_packet AS max_allowed_packet");
286 
287 	Dictionary::Ptr row = FetchRow(result);
288 
289 	if (row)
290 		m_MaxPacketSize = row->Get("max_allowed_packet");
291 	else
292 		m_MaxPacketSize = 64 * 1024;
293 
294 	DiscardRows(result);
295 
296 	String dbVersionName = "idoutils";
297 	result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
298 
299 	row = FetchRow(result);
300 
301 	if (!row) {
302 		m_Mysql->close(&m_Connection);
303 		SetConnected(false);
304 
305 		Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
306 
307 		BOOST_THROW_EXCEPTION(std::runtime_error("Invalid schema."));
308 	}
309 
310 	DiscardRows(result);
311 
312 	String version = row->Get("version");
313 
314 	SetSchemaVersion(version);
315 
316 	if (Utility::CompareVersion(GetCompatSchemaVersion(), version) < 0) {
317 		m_Mysql->close(&m_Connection);
318 		SetConnected(false);
319 
320 		Log(LogCritical, "IdoMysqlConnection")
321 			<< "Schema version '" << version << "' does not match the required version '"
322 			<< GetCompatSchemaVersion() << "' (or newer)! Please check the upgrade documentation at "
323 			<< "https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/#upgrading-mysql-db";
324 
325 		BOOST_THROW_EXCEPTION(std::runtime_error("Schema version mismatch."));
326 	}
327 
328 	String instanceName = GetInstanceName();
329 
330 	result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
331 	row = FetchRow(result);
332 
333 	if (!row) {
334 		Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
335 		m_InstanceID = GetLastInsertID();
336 	} else {
337 		m_InstanceID = DbReference(row->Get("instance_id"));
338 	}
339 
340 	DiscardRows(result);
341 
342 	Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
343 
344 	/* we have an endpoint in a cluster setup, so decide if we can proceed here */
345 	if (my_endpoint && GetHAMode() == HARunOnce) {
346 		/* get the current endpoint writing to programstatus table */
347 		result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
348 			GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
349 		row = FetchRow(result);
350 		DiscardRows(result);
351 
352 		String endpoint_name;
353 
354 		if (row)
355 			endpoint_name = row->Get("endpoint_name");
356 		else
357 			Log(LogNotice, "IdoMysqlConnection", "Empty program status table");
358 
359 		/* if we did not write into the database earlier, another instance is active */
360 		if (endpoint_name != my_endpoint->GetName()) {
361 			double status_update_time;
362 
363 			if (row)
364 				status_update_time = row->Get("status_update_time");
365 			else
366 				status_update_time = 0;
367 
368 			double now = Utility::GetTime();
369 
370 			double status_update_age = now - status_update_time;
371 			double failoverTimeout = GetFailoverTimeout();
372 
373 			if (status_update_age < failoverTimeout) {
374 				Log(LogInformation, "IdoMysqlConnection")
375 					<< "Last update by endpoint '" << endpoint_name << "' was "
376 					<< status_update_age << "s ago (< failover timeout of " << failoverTimeout << "s). Retrying.";
377 
378 				m_Mysql->close(&m_Connection);
379 				SetConnected(false);
380 				SetShouldConnect(false);
381 
382 				return;
383 			}
384 
385 			/* activate the IDO only, if we're authoritative in this zone */
386 			if (IsPaused()) {
387 				Log(LogNotice, "IdoMysqlConnection")
388 					<< "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
389 
390 				m_Mysql->close(&m_Connection);
391 				SetConnected(false);
392 
393 				return;
394 			}
395 
396 			SetLastFailover(now);
397 
398 			Log(LogInformation, "IdoMysqlConnection")
399 				<< "Last update by endpoint '" << endpoint_name << "' was "
400 				<< status_update_age << "s ago. Taking over '" << GetName() << "' in HA zone '" << Zone::GetLocalZone()->GetName() << "'.";
401 		}
402 
403 		Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection in HA zone.");
404 	}
405 
406 	Log(LogInformation, "IdoMysqlConnection")
407 		<< "MySQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
408 
409 	/* set session time zone to utc */
410 	Query("SET SESSION TIME_ZONE='+00:00'");
411 
412 	Query("SET SESSION SQL_MODE='NO_AUTO_VALUE_ON_ZERO'");
413 
414 	Query("BEGIN");
415 
416 	/* update programstatus table */
417 	UpdateProgramStatus();
418 
419 	/* record connection */
420 	Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
421 		"(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
422 		+ Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetAppVersion())
423 		+ "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
424 
425 	/* clear config tables for the initial config dump */
426 	PrepareDatabase();
427 
428 	std::ostringstream q1buf;
429 	q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
430 	result = Query(q1buf.str());
431 
432 	std::vector<DbObject::Ptr> activeDbObjs;
433 
434 	while ((row = FetchRow(result))) {
435 		DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
436 
437 		if (!dbtype)
438 			continue;
439 
440 		DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
441 		SetObjectID(dbobj, DbReference(row->Get("object_id")));
442 		bool active = row->Get("is_active");
443 		SetObjectActive(dbobj, active);
444 
445 		if (active)
446 			activeDbObjs.emplace_back(std::move(dbobj));
447 	}
448 
449 	SetIDCacheValid(true);
450 
451 	EnableActiveChangedHandler();
452 
453 	for (const DbObject::Ptr& dbobj : activeDbObjs) {
454 		if (dbobj->GetObject())
455 			continue;
456 
457 		Log(LogNotice, "IdoMysqlConnection")
458 			<< "Deactivate deleted object name1: '" << dbobj->GetName1()
459 			<< "' name2: '" << dbobj->GetName2() + "'.";
460 		DeactivateObject(dbobj);
461 	}
462 
463 	UpdateAllObjects();
464 
465 #ifdef I2_DEBUG /* I2_DEBUG */
466 	Log(LogDebug, "IdoMysqlConnection")
467 		<< "Scheduling session table clear and finish connect task.";
468 #endif /* I2_DEBUG */
469 
470 	m_QueryQueue.Enqueue([this]() { ClearTablesBySession(); }, PriorityNormal);
471 
472 	m_QueryQueue.Enqueue([this, startTime]() { FinishConnect(startTime); }, PriorityNormal);
473 }
474 
FinishConnect(double startTime)475 void IdoMysqlConnection::FinishConnect(double startTime)
476 {
477 	AssertOnWorkQueue();
478 
479 	if (!GetConnected() || IsPaused())
480 		return;
481 
482 	FinishAsyncQueries();
483 
484 	Log(LogInformation, "IdoMysqlConnection")
485 		<< "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in "
486 		<< std::setw(2) << Utility::GetTime() - startTime << " second(s).";
487 
488 	Query("COMMIT");
489 	Query("BEGIN");
490 }
491 
ClearTablesBySession()492 void IdoMysqlConnection::ClearTablesBySession()
493 {
494 	/* delete all comments and downtimes without current session token */
495 	ClearTableBySession("comments");
496 	ClearTableBySession("scheduleddowntime");
497 }
498 
ClearTableBySession(const String & table)499 void IdoMysqlConnection::ClearTableBySession(const String& table)
500 {
501 	Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
502 		Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
503 		Convert::ToString(GetSessionToken()));
504 }
505 
AsyncQuery(const String & query,const std::function<void (const IdoMysqlResult &)> & callback)506 void IdoMysqlConnection::AsyncQuery(const String& query, const std::function<void (const IdoMysqlResult&)>& callback)
507 {
508 	AssertOnWorkQueue();
509 
510 	IdoAsyncQuery aq;
511 	aq.Query = query;
512 	/* XXX: Important: The callback must not immediately execute a query, but enqueue it!
513 	 * See https://github.com/Icinga/icinga2/issues/4603 for details.
514 	 */
515 	aq.Callback = callback;
516 	m_AsyncQueries.emplace_back(std::move(aq));
517 }
518 
FinishAsyncQueries()519 void IdoMysqlConnection::FinishAsyncQueries()
520 {
521 	std::vector<IdoAsyncQuery> queries;
522 	m_AsyncQueries.swap(queries);
523 
524 	std::vector<IdoAsyncQuery>::size_type offset = 0;
525 
526 	// This will be executed if there is a problem with executing the queries,
527 	// at which point this function throws an exception and the queries should
528 	// not be listed as still pending in the queue.
529 	Defer decreaseQueries ([this, &offset, &queries]() {
530 		auto lostQueries = queries.size() - offset;
531 
532 		if (lostQueries > 0) {
533 			DecreasePendingQueries(lostQueries);
534 		}
535 	});
536 
537 	while (offset < queries.size()) {
538 		std::ostringstream querybuf;
539 
540 		std::vector<IdoAsyncQuery>::size_type count = 0;
541 		size_t num_bytes = 0;
542 
543 		Defer decreaseQueries ([this, &offset, &count]() {
544 			offset += count;
545 			DecreasePendingQueries(count);
546 			m_UncommittedAsyncQueries += count;
547 		});
548 
549 		for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
550 			const IdoAsyncQuery& aq = queries[i];
551 
552 			size_t size_query = aq.Query.GetLength() + 1;
553 
554 			if (count > 0) {
555 				if (num_bytes + size_query > m_MaxPacketSize - 512)
556 					break;
557 
558 				querybuf << ";";
559 			}
560 
561 			IncreaseQueryCount();
562 			count++;
563 
564 			Log(LogDebug, "IdoMysqlConnection")
565 				<< "Query: " << aq.Query;
566 
567 			querybuf << aq.Query;
568 			num_bytes += size_query;
569 		}
570 
571 		String query = querybuf.str();
572 
573 		if (m_Mysql->query(&m_Connection, query.CStr()) != 0) {
574 			std::ostringstream msgbuf;
575 			String message = m_Mysql->error(&m_Connection);
576 			msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
577 			Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
578 
579 			BOOST_THROW_EXCEPTION(
580 				database_error()
581 				<< errinfo_message(m_Mysql->error(&m_Connection))
582 				<< errinfo_database_query(query)
583 			);
584 		}
585 
586 		for (std::vector<IdoAsyncQuery>::size_type i = offset; i < offset + count; i++) {
587 			const IdoAsyncQuery& aq = queries[i];
588 
589 			MYSQL_RES *result = m_Mysql->store_result(&m_Connection);
590 
591 			m_AffectedRows = m_Mysql->affected_rows(&m_Connection);
592 
593 			IdoMysqlResult iresult;
594 
595 			if (!result) {
596 				if (m_Mysql->field_count(&m_Connection) > 0) {
597 					std::ostringstream msgbuf;
598 					String message = m_Mysql->error(&m_Connection);
599 					msgbuf << "Error \"" << message << "\" when executing query \"" << aq.Query << "\"";
600 					Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
601 
602 					BOOST_THROW_EXCEPTION(
603 						database_error()
604 						<< errinfo_message(m_Mysql->error(&m_Connection))
605 						<< errinfo_database_query(query)
606 					);
607 				}
608 			} else
609 				iresult = IdoMysqlResult(result, [this](MYSQL_RES* result) { m_Mysql->free_result(result); });
610 
611 			if (aq.Callback)
612 				aq.Callback(iresult);
613 
614 			if (m_Mysql->next_result(&m_Connection) > 0) {
615 				std::ostringstream msgbuf;
616 				String message = m_Mysql->error(&m_Connection);
617 				msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
618 				Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
619 
620 				BOOST_THROW_EXCEPTION(
621 					database_error()
622 					<< errinfo_message(m_Mysql->error(&m_Connection))
623 					<< errinfo_database_query(query)
624 				);
625 			}
626 		}
627 	}
628 
629 	if (m_UncommittedAsyncQueries > 25000) {
630 		m_UncommittedAsyncQueries = 0;
631 
632 		Query("COMMIT");
633 		Query("BEGIN");
634 	}
635 }
636 
Query(const String & query)637 IdoMysqlResult IdoMysqlConnection::Query(const String& query)
638 {
639 	AssertOnWorkQueue();
640 
641 	IncreasePendingQueries(1);
642 	Defer decreaseQueries ([this]() { DecreasePendingQueries(1); });
643 
644 	/* finish all async queries to maintain the right order for queries */
645 	FinishAsyncQueries();
646 
647 	Log(LogDebug, "IdoMysqlConnection")
648 		<< "Query: " << query;
649 
650 	IncreaseQueryCount();
651 
652 	if (m_Mysql->query(&m_Connection, query.CStr()) != 0) {
653 		std::ostringstream msgbuf;
654 		String message = m_Mysql->error(&m_Connection);
655 		msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
656 		Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
657 
658 		BOOST_THROW_EXCEPTION(
659 			database_error()
660 			<< errinfo_message(m_Mysql->error(&m_Connection))
661 			<< errinfo_database_query(query)
662 		);
663 	}
664 
665 	MYSQL_RES *result = m_Mysql->store_result(&m_Connection);
666 
667 	m_AffectedRows = m_Mysql->affected_rows(&m_Connection);
668 
669 	if (!result) {
670 		if (m_Mysql->field_count(&m_Connection) > 0) {
671 			std::ostringstream msgbuf;
672 			String message = m_Mysql->error(&m_Connection);
673 			msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
674 			Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
675 
676 			BOOST_THROW_EXCEPTION(
677 				database_error()
678 				<< errinfo_message(m_Mysql->error(&m_Connection))
679 				<< errinfo_database_query(query)
680 			);
681 		}
682 
683 		return IdoMysqlResult();
684 	}
685 
686 	return IdoMysqlResult(result, [this](MYSQL_RES* result) { m_Mysql->free_result(result); });
687 }
688 
GetLastInsertID()689 DbReference IdoMysqlConnection::GetLastInsertID()
690 {
691 	AssertOnWorkQueue();
692 
693 	return {static_cast<long>(m_Mysql->insert_id(&m_Connection))};
694 }
695 
GetAffectedRows()696 int IdoMysqlConnection::GetAffectedRows()
697 {
698 	AssertOnWorkQueue();
699 
700 	return m_AffectedRows;
701 }
702 
/**
 * Escapes a string for use inside a quoted SQL string literal.
 * Must be called on the query work queue.
 *
 * The input is first passed through Utility::ValidateUTF8() and then
 * escaped with the connection's real_escape_string().
 *
 * @param s The raw string.
 * @returns The escaped string (without surrounding quotes).
 */
String IdoMysqlConnection::Escape(const String& s)
{
	AssertOnWorkQueue();

	String utf8s = Utility::ValidateUTF8(s);

	size_t length = utf8s.GetLength();

	/* Worst case every byte is escaped to two bytes, plus the terminating NUL.
	 * The vector releases the buffer on every exit path and is zero-filled,
	 * so the result is an empty string should escaping write nothing. */
	std::vector<char> to(length * 2 + 1);

	m_Mysql->real_escape_string(&m_Connection, to.data(), utf8s.CStr(), length);

	return String(to.data());
}
720 
FetchRow(const IdoMysqlResult & result)721 Dictionary::Ptr IdoMysqlConnection::FetchRow(const IdoMysqlResult& result)
722 {
723 	AssertOnWorkQueue();
724 
725 	MYSQL_ROW row;
726 	MYSQL_FIELD *field;
727 	unsigned long *lengths, i;
728 
729 	row = m_Mysql->fetch_row(result.get());
730 
731 	if (!row)
732 		return nullptr;
733 
734 	lengths = m_Mysql->fetch_lengths(result.get());
735 
736 	if (!lengths)
737 		return nullptr;
738 
739 	Dictionary::Ptr dict = new Dictionary();
740 
741 	m_Mysql->field_seek(result.get(), 0);
742 	for (field = m_Mysql->fetch_field(result.get()), i = 0; field; field = m_Mysql->fetch_field(result.get()), i++)
743 		dict->Set(field->name, String(row[i], row[i] + lengths[i]));
744 
745 	return dict;
746 }
747 
DiscardRows(const IdoMysqlResult & result)748 void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
749 {
750 	Dictionary::Ptr row;
751 
752 	while ((row = FetchRow(result)))
753 		; /* empty loop body */
754 }
755 
ActivateObject(const DbObject::Ptr & dbobj)756 void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
757 {
758 	if (IsPaused())
759 		return;
760 
761 #ifdef I2_DEBUG /* I2_DEBUG */
762 	Log(LogDebug, "IdoMysqlConnection")
763 		<< "Scheduling object activation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
764 #endif /* I2_DEBUG */
765 
766 	m_QueryQueue.Enqueue([this, dbobj]() { InternalActivateObject(dbobj); }, PriorityNormal);
767 }
768 
InternalActivateObject(const DbObject::Ptr & dbobj)769 void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
770 {
771 	AssertOnWorkQueue();
772 
773 	if (IsPaused())
774 		return;
775 
776 	if (!GetConnected())
777 		return;
778 
779 	DbReference dbref = GetObjectID(dbobj);
780 	std::ostringstream qbuf;
781 
782 	if (!dbref.IsValid()) {
783 		if (!dbobj->GetName2().IsEmpty()) {
784 			qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
785 				<< static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
786 				<< "'" << Escape(dbobj->GetName1()) << "', '" << Escape(dbobj->GetName2()) << "', 1)";
787 		} else {
788 			qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, is_active) VALUES ("
789 				<< static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
790 				<< "'" << Escape(dbobj->GetName1()) << "', 1)";
791 		}
792 
793 		Query(qbuf.str());
794 		SetObjectID(dbobj, GetLastInsertID());
795 	} else {
796 		qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
797 		IncreasePendingQueries(1);
798 		AsyncQuery(qbuf.str());
799 	}
800 }
801 
DeactivateObject(const DbObject::Ptr & dbobj)802 void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
803 {
804 	if (IsPaused())
805 		return;
806 
807 #ifdef I2_DEBUG /* I2_DEBUG */
808 	Log(LogDebug, "IdoMysqlConnection")
809 		<< "Scheduling object deactivation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
810 #endif /* I2_DEBUG */
811 
812 	m_QueryQueue.Enqueue([this, dbobj]() { InternalDeactivateObject(dbobj); }, PriorityNormal);
813 }
814 
InternalDeactivateObject(const DbObject::Ptr & dbobj)815 void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
816 {
817 	AssertOnWorkQueue();
818 
819 	if (IsPaused())
820 		return;
821 
822 	if (!GetConnected())
823 		return;
824 
825 	DbReference dbref = GetObjectID(dbobj);
826 
827 	if (!dbref.IsValid())
828 		return;
829 
830 	std::ostringstream qbuf;
831 	qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
832 	IncreasePendingQueries(1);
833 	AsyncQuery(qbuf.str());
834 
835 	/* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
836 	 * because the object is still in the database. */
837 
838 	SetObjectActive(dbobj, false);
839 }
840 
FieldToEscapedString(const String & key,const Value & value,Value * result)841 bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
842 {
843 	if (key == "instance_id") {
844 		*result = static_cast<long>(m_InstanceID);
845 		return true;
846 	} else if (key == "session_token") {
847 		*result = GetSessionToken();
848 		return true;
849 	}
850 
851 	Value rawvalue = DbValue::ExtractValue(value);
852 
853 	if (rawvalue.GetType() == ValueEmpty) {
854 		*result = "NULL";
855 	} else if (rawvalue.IsObjectType<ConfigObject>()) {
856 		DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue);
857 
858 		if (!dbobjcol) {
859 			*result = 0;
860 			return true;
861 		}
862 
863 		if (!IsIDCacheValid())
864 			return false;
865 
866 		DbReference dbrefcol;
867 
868 		if (DbValue::IsObjectInsertID(value)) {
869 			dbrefcol = GetInsertID(dbobjcol);
870 
871 			if (!dbrefcol.IsValid())
872 				return false;
873 		} else {
874 			dbrefcol = GetObjectID(dbobjcol);
875 
876 			if (!dbrefcol.IsValid()) {
877 				InternalActivateObject(dbobjcol);
878 
879 				dbrefcol = GetObjectID(dbobjcol);
880 
881 				if (!dbrefcol.IsValid())
882 					return false;
883 			}
884 		}
885 
886 		*result = static_cast<long>(dbrefcol);
887 	} else if (DbValue::IsTimestamp(value)) {
888 		long ts = rawvalue;
889 		std::ostringstream msgbuf;
890 		msgbuf << "FROM_UNIXTIME(" << ts << ")";
891 		*result = Value(msgbuf.str());
892 	} else if (DbValue::IsObjectInsertID(value)) {
893 		auto id = static_cast<long>(rawvalue);
894 
895 		if (id <= 0)
896 			return false;
897 
898 		*result = id;
899 		return true;
900 	} else {
901 		Value fvalue;
902 
903 		if (rawvalue.IsBoolean())
904 			fvalue = Convert::ToLong(rawvalue);
905 		else
906 			fvalue = rawvalue;
907 
908 		*result = "'" + Escape(fvalue) + "'";
909 	}
910 
911 	return true;
912 }
913 
ExecuteQuery(const DbQuery & query)914 void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
915 {
916 	if (IsPaused() && GetPauseCalled())
917 		return;
918 
919 	ASSERT(query.Category != DbCatInvalid);
920 
921 #ifdef I2_DEBUG /* I2_DEBUG */
922 	Log(LogDebug, "IdoMysqlConnection")
923 		<< "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'.";
924 #endif /* I2_DEBUG */
925 
926 	IncreasePendingQueries(1);
927 	m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority, true);
928 }
929 
ExecuteMultipleQueries(const std::vector<DbQuery> & queries)930 void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
931 {
932 	if (IsPaused())
933 		return;
934 
935 	if (queries.empty())
936 		return;
937 
938 #ifdef I2_DEBUG /* I2_DEBUG */
939 	Log(LogDebug, "IdoMysqlConnection")
940 		<< "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'.";
941 #endif /* I2_DEBUG */
942 
943 	IncreasePendingQueries(queries.size());
944 	m_QueryQueue.Enqueue([this, queries]() { InternalExecuteMultipleQueries(queries); }, queries[0].Priority, true);
945 }
946 
CanExecuteQuery(const DbQuery & query)947 bool IdoMysqlConnection::CanExecuteQuery(const DbQuery& query)
948 {
949 	if (query.Object && !IsIDCacheValid())
950 		return false;
951 
952 	if (query.WhereCriteria) {
953 		ObjectLock olock(query.WhereCriteria);
954 		Value value;
955 
956 		for (const Dictionary::Pair& kv : query.WhereCriteria) {
957 			if (!FieldToEscapedString(kv.first, kv.second, &value))
958 				return false;
959 		}
960 	}
961 
962 	if (query.Fields) {
963 		ObjectLock olock(query.Fields);
964 
965 		for (const Dictionary::Pair& kv : query.Fields) {
966 			Value value;
967 
968 			if (!FieldToEscapedString(kv.first, kv.second, &value))
969 				return false;
970 		}
971 	}
972 
973 	return true;
974 }
975 
InternalExecuteMultipleQueries(const std::vector<DbQuery> & queries)976 void IdoMysqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuery>& queries)
977 {
978 	AssertOnWorkQueue();
979 
980 	if (IsPaused()) {
981 		DecreasePendingQueries(queries.size());
982 		return;
983 	}
984 
985 	if (!GetConnected()) {
986 		DecreasePendingQueries(queries.size());
987 		return;
988 	}
989 
990 
991 	for (const DbQuery& query : queries) {
992 		ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
993 
994 		if (!CanExecuteQuery(query)) {
995 
996 #ifdef I2_DEBUG /* I2_DEBUG */
997 			Log(LogDebug, "IdoMysqlConnection")
998 				<< "Scheduling multiple execute query task again: Cannot execute query now. Type '"
999 				<< query.Type << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
1000 #endif /* I2_DEBUG */
1001 
1002 			m_QueryQueue.Enqueue([this, queries]() { InternalExecuteMultipleQueries(queries); }, query.Priority);
1003 			return;
1004 		}
1005 	}
1006 
1007 	for (const DbQuery& query : queries) {
1008 		InternalExecuteQuery(query);
1009 	}
1010 }
1011 
InternalExecuteQuery(const DbQuery & query,int typeOverride)1012 void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOverride)
1013 {
1014 	AssertOnWorkQueue();
1015 
1016 	if (IsPaused() && GetPauseCalled()) {
1017 		DecreasePendingQueries(1);
1018 		return;
1019 	}
1020 
1021 	if (!GetConnected()) {
1022 		DecreasePendingQueries(1);
1023 		return;
1024 	}
1025 
1026 	if (query.Type == DbQueryNewTransaction) {
1027 		DecreasePendingQueries(1);
1028 		InternalNewTransaction();
1029 		return;
1030 	}
1031 
1032 	/* check whether we're allowed to execute the query first */
1033 	if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0) {
1034 		DecreasePendingQueries(1);
1035 		return;
1036 	}
1037 
1038 	if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) {
1039 		DecreasePendingQueries(1);
1040 		return;
1041 	}
1042 
1043 	/* check if there are missing object/insert ids and re-enqueue the query */
1044 	if (!CanExecuteQuery(query)) {
1045 
1046 #ifdef I2_DEBUG /* I2_DEBUG */
1047 		Log(LogDebug, "IdoMysqlConnection")
1048 			<< "Scheduling execute query task again: Cannot execute query now. Type '"
1049 			<< typeOverride << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
1050 #endif /* I2_DEBUG */
1051 
1052 		m_QueryQueue.Enqueue([this, query, typeOverride]() { InternalExecuteQuery(query, typeOverride); }, query.Priority);
1053 		return;
1054 	}
1055 
1056 	std::ostringstream qbuf, where;
1057 	int type;
1058 
1059 	if (query.WhereCriteria) {
1060 		where << " WHERE ";
1061 
1062 		ObjectLock olock(query.WhereCriteria);
1063 		Value value;
1064 		bool first = true;
1065 
1066 		for (const Dictionary::Pair& kv : query.WhereCriteria) {
1067 			if (!FieldToEscapedString(kv.first, kv.second, &value)) {
1068 
1069 #ifdef I2_DEBUG /* I2_DEBUG */
1070 				Log(LogDebug, "IdoMysqlConnection")
1071 					<< "Scheduling execute query task again: Cannot execute query now. Type '"
1072 					<< typeOverride << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
1073 #endif /* I2_DEBUG */
1074 
1075 				m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority);
1076 				return;
1077 			}
1078 
1079 			if (!first)
1080 				where << " AND ";
1081 
1082 			where << kv.first << " = " << value;
1083 
1084 			if (first)
1085 				first = false;
1086 		}
1087 	}
1088 
1089 	type = (typeOverride != -1) ? typeOverride : query.Type;
1090 
1091 	bool upsert = false;
1092 
1093 	if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
1094 		bool hasid = false;
1095 
1096 		if (query.Object) {
1097 			if (query.ConfigUpdate)
1098 				hasid = GetConfigUpdate(query.Object);
1099 			else if (query.StatusUpdate)
1100 				hasid = GetStatusUpdate(query.Object);
1101 		}
1102 
1103 		if (!hasid)
1104 			upsert = true;
1105 
1106 		type = DbQueryUpdate;
1107 	}
1108 
1109 	if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
1110 		std::ostringstream qdel;
1111 		qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
1112 		IncreasePendingQueries(1);
1113 		AsyncQuery(qdel.str());
1114 
1115 		type = DbQueryInsert;
1116 	}
1117 
1118 	switch (type) {
1119 		case DbQueryInsert:
1120 			qbuf << "INSERT INTO " << GetTablePrefix() << query.Table;
1121 			break;
1122 		case DbQueryUpdate:
1123 			qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET";
1124 			break;
1125 		case DbQueryDelete:
1126 			qbuf << "DELETE FROM " << GetTablePrefix() << query.Table;
1127 			break;
1128 		default:
1129 			VERIFY(!"Invalid query type.");
1130 	}
1131 
1132 	if (type == DbQueryInsert || type == DbQueryUpdate) {
1133 		std::ostringstream colbuf, valbuf;
1134 
1135 		if (type == DbQueryUpdate && query.Fields->GetLength() == 0)
1136 			return;
1137 
1138 		ObjectLock olock(query.Fields);
1139 
1140 		bool first = true;
1141 		for (const Dictionary::Pair& kv : query.Fields) {
1142 			Value value;
1143 
1144 			if (!FieldToEscapedString(kv.first, kv.second, &value)) {
1145 
1146 #ifdef I2_DEBUG /* I2_DEBUG */
1147 				Log(LogDebug, "IdoMysqlConnection")
1148 					<< "Scheduling execute query task again: Cannot extract required INSERT/UPDATE fields, key '"
1149 					<< kv.first << "', val '" << kv.second << "', type " << typeOverride << ", table '" << query.Table << "'.";
1150 #endif /* I2_DEBUG */
1151 
1152 				m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority);
1153 				return;
1154 			}
1155 
1156 			if (type == DbQueryInsert) {
1157 				if (!first) {
1158 					colbuf << ", ";
1159 					valbuf << ", ";
1160 				}
1161 
1162 				colbuf << kv.first;
1163 				valbuf << value;
1164 			} else {
1165 				if (!first)
1166 					qbuf << ", ";
1167 
1168 				qbuf << " " << kv.first << " = " << value;
1169 			}
1170 
1171 			if (first)
1172 				first = false;
1173 		}
1174 
1175 		if (type == DbQueryInsert)
1176 			qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")";
1177 	}
1178 
1179 	if (type != DbQueryInsert)
1180 		qbuf << where.str();
1181 
1182 	AsyncQuery(qbuf.str(), [this, query, type, upsert](const IdoMysqlResult&) { FinishExecuteQuery(query, type, upsert); });
1183 }
1184 
FinishExecuteQuery(const DbQuery & query,int type,bool upsert)1185 void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool upsert)
1186 {
1187 	if (upsert && GetAffectedRows() == 0) {
1188 
1189 #ifdef I2_DEBUG /* I2_DEBUG */
1190 		Log(LogDebug, "IdoMysqlConnection")
1191 			<< "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'.";
1192 #endif /* I2_DEBUG */
1193 
1194 		IncreasePendingQueries(1);
1195 		m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert); }, query.Priority);
1196 
1197 		return;
1198 	}
1199 
1200 	if (type == DbQueryInsert && query.Object) {
1201 		if (query.ConfigUpdate) {
1202 			SetInsertID(query.Object, GetLastInsertID());
1203 			SetConfigUpdate(query.Object, true);
1204 		} else if (query.StatusUpdate)
1205 			SetStatusUpdate(query.Object, true);
1206 	}
1207 
1208 	if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationInsertID)
1209 		query.NotificationInsertID->SetValue(static_cast<long>(GetLastInsertID()));
1210 }
1211 
CleanUpExecuteQuery(const String & table,const String & time_column,double max_age)1212 void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
1213 {
1214 	if (IsPaused())
1215 		return;
1216 
1217 #ifdef I2_DEBUG /* I2_DEBUG */
1218 		Log(LogDebug, "IdoMysqlConnection")
1219 			<< "Rescheduling cleanup query for table '" << table << "' and column '"
1220 			<< time_column << "'. max_age is set to '" << max_age << "'.";
1221 #endif /* I2_DEBUG */
1222 
1223 	IncreasePendingQueries(1);
1224 	m_QueryQueue.Enqueue([this, table, time_column, max_age]() { InternalCleanUpExecuteQuery(table, time_column, max_age); }, PriorityLow, true);
1225 }
1226 
InternalCleanUpExecuteQuery(const String & table,const String & time_column,double max_age)1227 void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
1228 {
1229 	AssertOnWorkQueue();
1230 
1231 	if (IsPaused()) {
1232 		DecreasePendingQueries(1);
1233 		return;
1234 	}
1235 
1236 	if (!GetConnected()) {
1237 		DecreasePendingQueries(1);
1238 		return;
1239 	}
1240 
1241 	AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
1242 		Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
1243 		" < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
1244 }
1245 
FillIDCache(const DbType::Ptr & type)1246 void IdoMysqlConnection::FillIDCache(const DbType::Ptr& type)
1247 {
1248 	String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
1249 	IdoMysqlResult result = Query(query);
1250 
1251 	Dictionary::Ptr row;
1252 
1253 	while ((row = FetchRow(result))) {
1254 		DbReference dbref(row->Get("object_id"));
1255 		SetInsertID(type, dbref, DbReference(row->Get(type->GetTable() + "_id")));
1256 		SetConfigHash(type, dbref, row->Get("config_hash"));
1257 	}
1258 }
1259 
GetPendingQueryCount() const1260 int IdoMysqlConnection::GetPendingQueryCount() const
1261 {
1262 	return m_QueryQueue.GetLength();
1263 }
1264