1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "db_ido_mysql/idomysqlconnection.hpp"
4 #include "db_ido_mysql/idomysqlconnection-ti.cpp"
5 #include "db_ido/dbtype.hpp"
6 #include "db_ido/dbvalue.hpp"
7 #include "base/logger.hpp"
8 #include "base/objectlock.hpp"
9 #include "base/convert.hpp"
10 #include "base/utility.hpp"
11 #include "base/perfdatavalue.hpp"
12 #include "base/application.hpp"
13 #include "base/configtype.hpp"
14 #include "base/exception.hpp"
15 #include "base/statsfunction.hpp"
16 #include "base/defer.hpp"
17 #include <utility>
18
19 using namespace icinga;
20
21 REGISTER_TYPE(IdoMysqlConnection);
22 REGISTER_STATSFUNCTION(IdoMysqlConnection, &IdoMysqlConnection::StatsFunc);
23
GetLatestSchemaVersion() const24 const char * IdoMysqlConnection::GetLatestSchemaVersion() const noexcept
25 {
26 return "1.15.1";
27 }
28
GetCompatSchemaVersion() const29 const char * IdoMysqlConnection::GetCompatSchemaVersion() const noexcept
30 {
31 return "1.14.3";
32 }
33
OnConfigLoaded()34 void IdoMysqlConnection::OnConfigLoaded()
35 {
36 ObjectImpl<IdoMysqlConnection>::OnConfigLoaded();
37
38 m_QueryQueue.SetName("IdoMysqlConnection, " + GetName());
39
40 Library shimLibrary{"mysql_shim"};
41
42 auto create_mysql_shim = shimLibrary.GetSymbolAddress<create_mysql_shim_ptr>("create_mysql_shim");
43
44 m_Mysql.reset(create_mysql_shim());
45
46 std::swap(m_Library, shimLibrary);
47 }
48
StatsFunc(const Dictionary::Ptr & status,const Array::Ptr & perfdata)49 void IdoMysqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
50 {
51 DictionaryData nodes;
52
53 for (const IdoMysqlConnection::Ptr& idomysqlconnection : ConfigType::GetObjectsByType<IdoMysqlConnection>()) {
54 size_t queryQueueItems = idomysqlconnection->m_QueryQueue.GetLength();
55 double queryQueueItemRate = idomysqlconnection->m_QueryQueue.GetTaskCount(60) / 60.0;
56
57 nodes.emplace_back(idomysqlconnection->GetName(), new Dictionary({
58 { "version", idomysqlconnection->GetSchemaVersion() },
59 { "instance_name", idomysqlconnection->GetInstanceName() },
60 { "connected", idomysqlconnection->GetConnected() },
61 { "query_queue_items", queryQueueItems },
62 { "query_queue_item_rate", queryQueueItemRate }
63 }));
64
65 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_rate", idomysqlconnection->GetQueryCount(60) / 60.0));
66 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_1min", idomysqlconnection->GetQueryCount(60)));
67 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_5mins", idomysqlconnection->GetQueryCount(5 * 60)));
68 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_15mins", idomysqlconnection->GetQueryCount(15 * 60)));
69 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_items", queryQueueItems));
70 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_item_rate", queryQueueItemRate));
71 }
72
73 status->Set("idomysqlconnection", new Dictionary(std::move(nodes)));
74 }
75
Resume()76 void IdoMysqlConnection::Resume()
77 {
78 Log(LogInformation, "IdoMysqlConnection")
79 << "'" << GetName() << "' resumed.";
80
81 SetConnected(false);
82
83 m_QueryQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
84
85 /* Immediately try to connect on Resume() without timer. */
86 m_QueryQueue.Enqueue([this]() { Reconnect(); }, PriorityImmediate);
87
88 m_TxTimer = new Timer();
89 m_TxTimer->SetInterval(1);
90 m_TxTimer->OnTimerExpired.connect([this](const Timer * const&) { NewTransaction(); });
91 m_TxTimer->Start();
92
93 m_ReconnectTimer = new Timer();
94 m_ReconnectTimer->SetInterval(10);
95 m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&){ ReconnectTimerHandler(); });
96 m_ReconnectTimer->Start();
97
98 /* Start with queries after connect. */
99 DbConnection::Resume();
100
101 ASSERT(m_Mysql->thread_safe());
102 }
103
Pause()104 void IdoMysqlConnection::Pause()
105 {
106 Log(LogDebug, "IdoMysqlConnection")
107 << "Attempting to pause '" << GetName() << "'.";
108
109 DbConnection::Pause();
110
111 m_ReconnectTimer.reset();
112
113 #ifdef I2_DEBUG /* I2_DEBUG */
114 Log(LogDebug, "IdoMysqlConnection")
115 << "Rescheduling disconnect task.";
116 #endif /* I2_DEBUG */
117
118 Log(LogInformation, "IdoMysqlConnection")
119 << "'" << GetName() << "' paused.";
120
121 }
122
ExceptionHandler(boost::exception_ptr exp)123 void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
124 {
125 Log(LogCritical, "IdoMysqlConnection", "Exception during database operation: Verify that your database is operational!");
126
127 Log(LogDebug, "IdoMysqlConnection")
128 << "Exception during database operation: " << DiagnosticInformation(std::move(exp));
129
130 if (GetConnected()) {
131 m_Mysql->close(&m_Connection);
132
133 SetConnected(false);
134 }
135 }
136
AssertOnWorkQueue()137 void IdoMysqlConnection::AssertOnWorkQueue()
138 {
139 ASSERT(m_QueryQueue.IsWorkerThread());
140 }
141
Disconnect()142 void IdoMysqlConnection::Disconnect()
143 {
144 AssertOnWorkQueue();
145
146 if (!GetConnected())
147 return;
148
149 Query("COMMIT");
150 m_Mysql->close(&m_Connection);
151
152 SetConnected(false);
153
154 Log(LogInformation, "IdoMysqlConnection")
155 << "Disconnected from '" << GetName() << "' database '" << GetDatabase() << "'.";
156 }
157
NewTransaction()158 void IdoMysqlConnection::NewTransaction()
159 {
160 if (IsPaused() && GetPauseCalled())
161 return;
162
163 #ifdef I2_DEBUG /* I2_DEBUG */
164 Log(LogDebug, "IdoMysqlConnection")
165 << "Scheduling new transaction and finishing async queries.";
166 #endif /* I2_DEBUG */
167
168 m_QueryQueue.Enqueue([this]() { InternalNewTransaction(); }, PriorityHigh);
169 }
170
InternalNewTransaction()171 void IdoMysqlConnection::InternalNewTransaction()
172 {
173 AssertOnWorkQueue();
174
175 if (!GetConnected())
176 return;
177
178 IncreasePendingQueries(2);
179
180 AsyncQuery("COMMIT");
181 AsyncQuery("BEGIN");
182
183 FinishAsyncQueries();
184 }
185
ReconnectTimerHandler()186 void IdoMysqlConnection::ReconnectTimerHandler()
187 {
188 #ifdef I2_DEBUG /* I2_DEBUG */
189 Log(LogDebug, "IdoMysqlConnection")
190 << "Scheduling reconnect task.";
191 #endif /* I2_DEBUG */
192
193 /* Only allow Reconnect events with high priority. */
194 m_QueryQueue.Enqueue([this]() { Reconnect(); }, PriorityImmediate);
195 }
196
Reconnect()197 void IdoMysqlConnection::Reconnect()
198 {
199 AssertOnWorkQueue();
200
201 if (!IsActive())
202 return;
203
204 CONTEXT("Reconnecting to MySQL IDO database '" + GetName() + "'");
205
206 double startTime = Utility::GetTime();
207
208 SetShouldConnect(true);
209
210 bool reconnect = false;
211
212 /* Ensure to close old connections first. */
213 if (GetConnected()) {
214 /* Check if we're really still connected */
215 if (m_Mysql->ping(&m_Connection) == 0)
216 return;
217
218 m_Mysql->close(&m_Connection);
219 SetConnected(false);
220 reconnect = true;
221 }
222
223 Log(LogDebug, "IdoMysqlConnection")
224 << "Reconnect: Clearing ID cache.";
225
226 ClearIDCache();
227
228 String ihost, isocket_path, iuser, ipasswd, idb;
229 String isslKey, isslCert, isslCa, isslCaPath, isslCipher;
230 const char *host, *socket_path, *user , *passwd, *db;
231 const char *sslKey, *sslCert, *sslCa, *sslCaPath, *sslCipher;
232 bool enableSsl;
233 long port;
234
235 ihost = GetHost();
236 isocket_path = GetSocketPath();
237 iuser = GetUser();
238 ipasswd = GetPassword();
239 idb = GetDatabase();
240
241 enableSsl = GetEnableSsl();
242 isslKey = GetSslKey();
243 isslCert = GetSslCert();
244 isslCa = GetSslCa();
245 isslCaPath = GetSslCapath();
246 isslCipher = GetSslCipher();
247
248 host = (!ihost.IsEmpty()) ? ihost.CStr() : nullptr;
249 port = GetPort();
250 socket_path = (!isocket_path.IsEmpty()) ? isocket_path.CStr() : nullptr;
251 user = (!iuser.IsEmpty()) ? iuser.CStr() : nullptr;
252 passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : nullptr;
253 db = (!idb.IsEmpty()) ? idb.CStr() : nullptr;
254
255 sslKey = (!isslKey.IsEmpty()) ? isslKey.CStr() : nullptr;
256 sslCert = (!isslCert.IsEmpty()) ? isslCert.CStr() : nullptr;
257 sslCa = (!isslCa.IsEmpty()) ? isslCa.CStr() : nullptr;
258 sslCaPath = (!isslCaPath.IsEmpty()) ? isslCaPath.CStr() : nullptr;
259 sslCipher = (!isslCipher.IsEmpty()) ? isslCipher.CStr() : nullptr;
260
261 /* connection */
262 if (!m_Mysql->init(&m_Connection)) {
263 Log(LogCritical, "IdoMysqlConnection")
264 << "mysql_init() failed: out of memory";
265
266 BOOST_THROW_EXCEPTION(std::bad_alloc());
267 }
268
269 if (enableSsl)
270 m_Mysql->ssl_set(&m_Connection, sslKey, sslCert, sslCa, sslCaPath, sslCipher);
271
272 if (!m_Mysql->real_connect(&m_Connection, host, user, passwd, db, port, socket_path, CLIENT_FOUND_ROWS | CLIENT_MULTI_STATEMENTS)) {
273 Log(LogCritical, "IdoMysqlConnection")
274 << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
275 << "' " << (enableSsl ? "(SSL enabled) " : "") << "failed: \"" << m_Mysql->error(&m_Connection) << "\"";
276
277 BOOST_THROW_EXCEPTION(std::runtime_error(m_Mysql->error(&m_Connection)));
278 }
279
280 Log(LogNotice, "IdoMysqlConnection")
281 << "Reconnect: '" << GetName() << "' is now connected to database '" << GetDatabase() << "'.";
282
283 SetConnected(true);
284
285 IdoMysqlResult result = Query("SELECT @@global.max_allowed_packet AS max_allowed_packet");
286
287 Dictionary::Ptr row = FetchRow(result);
288
289 if (row)
290 m_MaxPacketSize = row->Get("max_allowed_packet");
291 else
292 m_MaxPacketSize = 64 * 1024;
293
294 DiscardRows(result);
295
296 String dbVersionName = "idoutils";
297 result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
298
299 row = FetchRow(result);
300
301 if (!row) {
302 m_Mysql->close(&m_Connection);
303 SetConnected(false);
304
305 Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
306
307 BOOST_THROW_EXCEPTION(std::runtime_error("Invalid schema."));
308 }
309
310 DiscardRows(result);
311
312 String version = row->Get("version");
313
314 SetSchemaVersion(version);
315
316 if (Utility::CompareVersion(GetCompatSchemaVersion(), version) < 0) {
317 m_Mysql->close(&m_Connection);
318 SetConnected(false);
319
320 Log(LogCritical, "IdoMysqlConnection")
321 << "Schema version '" << version << "' does not match the required version '"
322 << GetCompatSchemaVersion() << "' (or newer)! Please check the upgrade documentation at "
323 << "https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/#upgrading-mysql-db";
324
325 BOOST_THROW_EXCEPTION(std::runtime_error("Schema version mismatch."));
326 }
327
328 String instanceName = GetInstanceName();
329
330 result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
331 row = FetchRow(result);
332
333 if (!row) {
334 Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
335 m_InstanceID = GetLastInsertID();
336 } else {
337 m_InstanceID = DbReference(row->Get("instance_id"));
338 }
339
340 DiscardRows(result);
341
342 Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
343
344 /* we have an endpoint in a cluster setup, so decide if we can proceed here */
345 if (my_endpoint && GetHAMode() == HARunOnce) {
346 /* get the current endpoint writing to programstatus table */
347 result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
348 GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
349 row = FetchRow(result);
350 DiscardRows(result);
351
352 String endpoint_name;
353
354 if (row)
355 endpoint_name = row->Get("endpoint_name");
356 else
357 Log(LogNotice, "IdoMysqlConnection", "Empty program status table");
358
359 /* if we did not write into the database earlier, another instance is active */
360 if (endpoint_name != my_endpoint->GetName()) {
361 double status_update_time;
362
363 if (row)
364 status_update_time = row->Get("status_update_time");
365 else
366 status_update_time = 0;
367
368 double now = Utility::GetTime();
369
370 double status_update_age = now - status_update_time;
371 double failoverTimeout = GetFailoverTimeout();
372
373 if (status_update_age < failoverTimeout) {
374 Log(LogInformation, "IdoMysqlConnection")
375 << "Last update by endpoint '" << endpoint_name << "' was "
376 << status_update_age << "s ago (< failover timeout of " << failoverTimeout << "s). Retrying.";
377
378 m_Mysql->close(&m_Connection);
379 SetConnected(false);
380 SetShouldConnect(false);
381
382 return;
383 }
384
385 /* activate the IDO only, if we're authoritative in this zone */
386 if (IsPaused()) {
387 Log(LogNotice, "IdoMysqlConnection")
388 << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
389
390 m_Mysql->close(&m_Connection);
391 SetConnected(false);
392
393 return;
394 }
395
396 SetLastFailover(now);
397
398 Log(LogInformation, "IdoMysqlConnection")
399 << "Last update by endpoint '" << endpoint_name << "' was "
400 << status_update_age << "s ago. Taking over '" << GetName() << "' in HA zone '" << Zone::GetLocalZone()->GetName() << "'.";
401 }
402
403 Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection in HA zone.");
404 }
405
406 Log(LogInformation, "IdoMysqlConnection")
407 << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
408
409 /* set session time zone to utc */
410 Query("SET SESSION TIME_ZONE='+00:00'");
411
412 Query("SET SESSION SQL_MODE='NO_AUTO_VALUE_ON_ZERO'");
413
414 Query("BEGIN");
415
416 /* update programstatus table */
417 UpdateProgramStatus();
418
419 /* record connection */
420 Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
421 "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
422 + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetAppVersion())
423 + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
424
425 /* clear config tables for the initial config dump */
426 PrepareDatabase();
427
428 std::ostringstream q1buf;
429 q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
430 result = Query(q1buf.str());
431
432 std::vector<DbObject::Ptr> activeDbObjs;
433
434 while ((row = FetchRow(result))) {
435 DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
436
437 if (!dbtype)
438 continue;
439
440 DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
441 SetObjectID(dbobj, DbReference(row->Get("object_id")));
442 bool active = row->Get("is_active");
443 SetObjectActive(dbobj, active);
444
445 if (active)
446 activeDbObjs.emplace_back(std::move(dbobj));
447 }
448
449 SetIDCacheValid(true);
450
451 EnableActiveChangedHandler();
452
453 for (const DbObject::Ptr& dbobj : activeDbObjs) {
454 if (dbobj->GetObject())
455 continue;
456
457 Log(LogNotice, "IdoMysqlConnection")
458 << "Deactivate deleted object name1: '" << dbobj->GetName1()
459 << "' name2: '" << dbobj->GetName2() + "'.";
460 DeactivateObject(dbobj);
461 }
462
463 UpdateAllObjects();
464
465 #ifdef I2_DEBUG /* I2_DEBUG */
466 Log(LogDebug, "IdoMysqlConnection")
467 << "Scheduling session table clear and finish connect task.";
468 #endif /* I2_DEBUG */
469
470 m_QueryQueue.Enqueue([this]() { ClearTablesBySession(); }, PriorityNormal);
471
472 m_QueryQueue.Enqueue([this, startTime]() { FinishConnect(startTime); }, PriorityNormal);
473 }
474
FinishConnect(double startTime)475 void IdoMysqlConnection::FinishConnect(double startTime)
476 {
477 AssertOnWorkQueue();
478
479 if (!GetConnected() || IsPaused())
480 return;
481
482 FinishAsyncQueries();
483
484 Log(LogInformation, "IdoMysqlConnection")
485 << "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in "
486 << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
487
488 Query("COMMIT");
489 Query("BEGIN");
490 }
491
ClearTablesBySession()492 void IdoMysqlConnection::ClearTablesBySession()
493 {
494 /* delete all comments and downtimes without current session token */
495 ClearTableBySession("comments");
496 ClearTableBySession("scheduleddowntime");
497 }
498
ClearTableBySession(const String & table)499 void IdoMysqlConnection::ClearTableBySession(const String& table)
500 {
501 Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
502 Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
503 Convert::ToString(GetSessionToken()));
504 }
505
AsyncQuery(const String & query,const std::function<void (const IdoMysqlResult &)> & callback)506 void IdoMysqlConnection::AsyncQuery(const String& query, const std::function<void (const IdoMysqlResult&)>& callback)
507 {
508 AssertOnWorkQueue();
509
510 IdoAsyncQuery aq;
511 aq.Query = query;
512 /* XXX: Important: The callback must not immediately execute a query, but enqueue it!
513 * See https://github.com/Icinga/icinga2/issues/4603 for details.
514 */
515 aq.Callback = callback;
516 m_AsyncQueries.emplace_back(std::move(aq));
517 }
518
FinishAsyncQueries()519 void IdoMysqlConnection::FinishAsyncQueries()
520 {
521 std::vector<IdoAsyncQuery> queries;
522 m_AsyncQueries.swap(queries);
523
524 std::vector<IdoAsyncQuery>::size_type offset = 0;
525
526 // This will be executed if there is a problem with executing the queries,
527 // at which point this function throws an exception and the queries should
528 // not be listed as still pending in the queue.
529 Defer decreaseQueries ([this, &offset, &queries]() {
530 auto lostQueries = queries.size() - offset;
531
532 if (lostQueries > 0) {
533 DecreasePendingQueries(lostQueries);
534 }
535 });
536
537 while (offset < queries.size()) {
538 std::ostringstream querybuf;
539
540 std::vector<IdoAsyncQuery>::size_type count = 0;
541 size_t num_bytes = 0;
542
543 Defer decreaseQueries ([this, &offset, &count]() {
544 offset += count;
545 DecreasePendingQueries(count);
546 m_UncommittedAsyncQueries += count;
547 });
548
549 for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
550 const IdoAsyncQuery& aq = queries[i];
551
552 size_t size_query = aq.Query.GetLength() + 1;
553
554 if (count > 0) {
555 if (num_bytes + size_query > m_MaxPacketSize - 512)
556 break;
557
558 querybuf << ";";
559 }
560
561 IncreaseQueryCount();
562 count++;
563
564 Log(LogDebug, "IdoMysqlConnection")
565 << "Query: " << aq.Query;
566
567 querybuf << aq.Query;
568 num_bytes += size_query;
569 }
570
571 String query = querybuf.str();
572
573 if (m_Mysql->query(&m_Connection, query.CStr()) != 0) {
574 std::ostringstream msgbuf;
575 String message = m_Mysql->error(&m_Connection);
576 msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
577 Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
578
579 BOOST_THROW_EXCEPTION(
580 database_error()
581 << errinfo_message(m_Mysql->error(&m_Connection))
582 << errinfo_database_query(query)
583 );
584 }
585
586 for (std::vector<IdoAsyncQuery>::size_type i = offset; i < offset + count; i++) {
587 const IdoAsyncQuery& aq = queries[i];
588
589 MYSQL_RES *result = m_Mysql->store_result(&m_Connection);
590
591 m_AffectedRows = m_Mysql->affected_rows(&m_Connection);
592
593 IdoMysqlResult iresult;
594
595 if (!result) {
596 if (m_Mysql->field_count(&m_Connection) > 0) {
597 std::ostringstream msgbuf;
598 String message = m_Mysql->error(&m_Connection);
599 msgbuf << "Error \"" << message << "\" when executing query \"" << aq.Query << "\"";
600 Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
601
602 BOOST_THROW_EXCEPTION(
603 database_error()
604 << errinfo_message(m_Mysql->error(&m_Connection))
605 << errinfo_database_query(query)
606 );
607 }
608 } else
609 iresult = IdoMysqlResult(result, [this](MYSQL_RES* result) { m_Mysql->free_result(result); });
610
611 if (aq.Callback)
612 aq.Callback(iresult);
613
614 if (m_Mysql->next_result(&m_Connection) > 0) {
615 std::ostringstream msgbuf;
616 String message = m_Mysql->error(&m_Connection);
617 msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
618 Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
619
620 BOOST_THROW_EXCEPTION(
621 database_error()
622 << errinfo_message(m_Mysql->error(&m_Connection))
623 << errinfo_database_query(query)
624 );
625 }
626 }
627 }
628
629 if (m_UncommittedAsyncQueries > 25000) {
630 m_UncommittedAsyncQueries = 0;
631
632 Query("COMMIT");
633 Query("BEGIN");
634 }
635 }
636
Query(const String & query)637 IdoMysqlResult IdoMysqlConnection::Query(const String& query)
638 {
639 AssertOnWorkQueue();
640
641 IncreasePendingQueries(1);
642 Defer decreaseQueries ([this]() { DecreasePendingQueries(1); });
643
644 /* finish all async queries to maintain the right order for queries */
645 FinishAsyncQueries();
646
647 Log(LogDebug, "IdoMysqlConnection")
648 << "Query: " << query;
649
650 IncreaseQueryCount();
651
652 if (m_Mysql->query(&m_Connection, query.CStr()) != 0) {
653 std::ostringstream msgbuf;
654 String message = m_Mysql->error(&m_Connection);
655 msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
656 Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
657
658 BOOST_THROW_EXCEPTION(
659 database_error()
660 << errinfo_message(m_Mysql->error(&m_Connection))
661 << errinfo_database_query(query)
662 );
663 }
664
665 MYSQL_RES *result = m_Mysql->store_result(&m_Connection);
666
667 m_AffectedRows = m_Mysql->affected_rows(&m_Connection);
668
669 if (!result) {
670 if (m_Mysql->field_count(&m_Connection) > 0) {
671 std::ostringstream msgbuf;
672 String message = m_Mysql->error(&m_Connection);
673 msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
674 Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
675
676 BOOST_THROW_EXCEPTION(
677 database_error()
678 << errinfo_message(m_Mysql->error(&m_Connection))
679 << errinfo_database_query(query)
680 );
681 }
682
683 return IdoMysqlResult();
684 }
685
686 return IdoMysqlResult(result, [this](MYSQL_RES* result) { m_Mysql->free_result(result); });
687 }
688
GetLastInsertID()689 DbReference IdoMysqlConnection::GetLastInsertID()
690 {
691 AssertOnWorkQueue();
692
693 return {static_cast<long>(m_Mysql->insert_id(&m_Connection))};
694 }
695
GetAffectedRows()696 int IdoMysqlConnection::GetAffectedRows()
697 {
698 AssertOnWorkQueue();
699
700 return m_AffectedRows;
701 }
702
Escape(const String & s)703 String IdoMysqlConnection::Escape(const String& s)
704 {
705 AssertOnWorkQueue();
706
707 String utf8s = Utility::ValidateUTF8(s);
708
709 size_t length = utf8s.GetLength();
710 auto *to = new char[utf8s.GetLength() * 2 + 1];
711
712 m_Mysql->real_escape_string(&m_Connection, to, utf8s.CStr(), length);
713
714 String result = String(to);
715
716 delete [] to;
717
718 return result;
719 }
720
FetchRow(const IdoMysqlResult & result)721 Dictionary::Ptr IdoMysqlConnection::FetchRow(const IdoMysqlResult& result)
722 {
723 AssertOnWorkQueue();
724
725 MYSQL_ROW row;
726 MYSQL_FIELD *field;
727 unsigned long *lengths, i;
728
729 row = m_Mysql->fetch_row(result.get());
730
731 if (!row)
732 return nullptr;
733
734 lengths = m_Mysql->fetch_lengths(result.get());
735
736 if (!lengths)
737 return nullptr;
738
739 Dictionary::Ptr dict = new Dictionary();
740
741 m_Mysql->field_seek(result.get(), 0);
742 for (field = m_Mysql->fetch_field(result.get()), i = 0; field; field = m_Mysql->fetch_field(result.get()), i++)
743 dict->Set(field->name, String(row[i], row[i] + lengths[i]));
744
745 return dict;
746 }
747
DiscardRows(const IdoMysqlResult & result)748 void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
749 {
750 Dictionary::Ptr row;
751
752 while ((row = FetchRow(result)))
753 ; /* empty loop body */
754 }
755
ActivateObject(const DbObject::Ptr & dbobj)756 void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
757 {
758 if (IsPaused())
759 return;
760
761 #ifdef I2_DEBUG /* I2_DEBUG */
762 Log(LogDebug, "IdoMysqlConnection")
763 << "Scheduling object activation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
764 #endif /* I2_DEBUG */
765
766 m_QueryQueue.Enqueue([this, dbobj]() { InternalActivateObject(dbobj); }, PriorityNormal);
767 }
768
InternalActivateObject(const DbObject::Ptr & dbobj)769 void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
770 {
771 AssertOnWorkQueue();
772
773 if (IsPaused())
774 return;
775
776 if (!GetConnected())
777 return;
778
779 DbReference dbref = GetObjectID(dbobj);
780 std::ostringstream qbuf;
781
782 if (!dbref.IsValid()) {
783 if (!dbobj->GetName2().IsEmpty()) {
784 qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
785 << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
786 << "'" << Escape(dbobj->GetName1()) << "', '" << Escape(dbobj->GetName2()) << "', 1)";
787 } else {
788 qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, is_active) VALUES ("
789 << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
790 << "'" << Escape(dbobj->GetName1()) << "', 1)";
791 }
792
793 Query(qbuf.str());
794 SetObjectID(dbobj, GetLastInsertID());
795 } else {
796 qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
797 IncreasePendingQueries(1);
798 AsyncQuery(qbuf.str());
799 }
800 }
801
DeactivateObject(const DbObject::Ptr & dbobj)802 void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
803 {
804 if (IsPaused())
805 return;
806
807 #ifdef I2_DEBUG /* I2_DEBUG */
808 Log(LogDebug, "IdoMysqlConnection")
809 << "Scheduling object deactivation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
810 #endif /* I2_DEBUG */
811
812 m_QueryQueue.Enqueue([this, dbobj]() { InternalDeactivateObject(dbobj); }, PriorityNormal);
813 }
814
InternalDeactivateObject(const DbObject::Ptr & dbobj)815 void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
816 {
817 AssertOnWorkQueue();
818
819 if (IsPaused())
820 return;
821
822 if (!GetConnected())
823 return;
824
825 DbReference dbref = GetObjectID(dbobj);
826
827 if (!dbref.IsValid())
828 return;
829
830 std::ostringstream qbuf;
831 qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
832 IncreasePendingQueries(1);
833 AsyncQuery(qbuf.str());
834
835 /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
836 * because the object is still in the database. */
837
838 SetObjectActive(dbobj, false);
839 }
840
FieldToEscapedString(const String & key,const Value & value,Value * result)841 bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
842 {
843 if (key == "instance_id") {
844 *result = static_cast<long>(m_InstanceID);
845 return true;
846 } else if (key == "session_token") {
847 *result = GetSessionToken();
848 return true;
849 }
850
851 Value rawvalue = DbValue::ExtractValue(value);
852
853 if (rawvalue.GetType() == ValueEmpty) {
854 *result = "NULL";
855 } else if (rawvalue.IsObjectType<ConfigObject>()) {
856 DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue);
857
858 if (!dbobjcol) {
859 *result = 0;
860 return true;
861 }
862
863 if (!IsIDCacheValid())
864 return false;
865
866 DbReference dbrefcol;
867
868 if (DbValue::IsObjectInsertID(value)) {
869 dbrefcol = GetInsertID(dbobjcol);
870
871 if (!dbrefcol.IsValid())
872 return false;
873 } else {
874 dbrefcol = GetObjectID(dbobjcol);
875
876 if (!dbrefcol.IsValid()) {
877 InternalActivateObject(dbobjcol);
878
879 dbrefcol = GetObjectID(dbobjcol);
880
881 if (!dbrefcol.IsValid())
882 return false;
883 }
884 }
885
886 *result = static_cast<long>(dbrefcol);
887 } else if (DbValue::IsTimestamp(value)) {
888 long ts = rawvalue;
889 std::ostringstream msgbuf;
890 msgbuf << "FROM_UNIXTIME(" << ts << ")";
891 *result = Value(msgbuf.str());
892 } else if (DbValue::IsObjectInsertID(value)) {
893 auto id = static_cast<long>(rawvalue);
894
895 if (id <= 0)
896 return false;
897
898 *result = id;
899 return true;
900 } else {
901 Value fvalue;
902
903 if (rawvalue.IsBoolean())
904 fvalue = Convert::ToLong(rawvalue);
905 else
906 fvalue = rawvalue;
907
908 *result = "'" + Escape(fvalue) + "'";
909 }
910
911 return true;
912 }
913
ExecuteQuery(const DbQuery & query)914 void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
915 {
916 if (IsPaused() && GetPauseCalled())
917 return;
918
919 ASSERT(query.Category != DbCatInvalid);
920
921 #ifdef I2_DEBUG /* I2_DEBUG */
922 Log(LogDebug, "IdoMysqlConnection")
923 << "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'.";
924 #endif /* I2_DEBUG */
925
926 IncreasePendingQueries(1);
927 m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority, true);
928 }
929
ExecuteMultipleQueries(const std::vector<DbQuery> & queries)930 void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
931 {
932 if (IsPaused())
933 return;
934
935 if (queries.empty())
936 return;
937
938 #ifdef I2_DEBUG /* I2_DEBUG */
939 Log(LogDebug, "IdoMysqlConnection")
940 << "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'.";
941 #endif /* I2_DEBUG */
942
943 IncreasePendingQueries(queries.size());
944 m_QueryQueue.Enqueue([this, queries]() { InternalExecuteMultipleQueries(queries); }, queries[0].Priority, true);
945 }
946
CanExecuteQuery(const DbQuery & query)947 bool IdoMysqlConnection::CanExecuteQuery(const DbQuery& query)
948 {
949 if (query.Object && !IsIDCacheValid())
950 return false;
951
952 if (query.WhereCriteria) {
953 ObjectLock olock(query.WhereCriteria);
954 Value value;
955
956 for (const Dictionary::Pair& kv : query.WhereCriteria) {
957 if (!FieldToEscapedString(kv.first, kv.second, &value))
958 return false;
959 }
960 }
961
962 if (query.Fields) {
963 ObjectLock olock(query.Fields);
964
965 for (const Dictionary::Pair& kv : query.Fields) {
966 Value value;
967
968 if (!FieldToEscapedString(kv.first, kv.second, &value))
969 return false;
970 }
971 }
972
973 return true;
974 }
975
InternalExecuteMultipleQueries(const std::vector<DbQuery> & queries)976 void IdoMysqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuery>& queries)
977 {
978 AssertOnWorkQueue();
979
980 if (IsPaused()) {
981 DecreasePendingQueries(queries.size());
982 return;
983 }
984
985 if (!GetConnected()) {
986 DecreasePendingQueries(queries.size());
987 return;
988 }
989
990
991 for (const DbQuery& query : queries) {
992 ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
993
994 if (!CanExecuteQuery(query)) {
995
996 #ifdef I2_DEBUG /* I2_DEBUG */
997 Log(LogDebug, "IdoMysqlConnection")
998 << "Scheduling multiple execute query task again: Cannot execute query now. Type '"
999 << query.Type << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
1000 #endif /* I2_DEBUG */
1001
1002 m_QueryQueue.Enqueue([this, queries]() { InternalExecuteMultipleQueries(queries); }, query.Priority);
1003 return;
1004 }
1005 }
1006
1007 for (const DbQuery& query : queries) {
1008 InternalExecuteQuery(query);
1009 }
1010 }
1011
InternalExecuteQuery(const DbQuery & query,int typeOverride)1012 void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOverride)
1013 {
1014 AssertOnWorkQueue();
1015
1016 if (IsPaused() && GetPauseCalled()) {
1017 DecreasePendingQueries(1);
1018 return;
1019 }
1020
1021 if (!GetConnected()) {
1022 DecreasePendingQueries(1);
1023 return;
1024 }
1025
1026 if (query.Type == DbQueryNewTransaction) {
1027 DecreasePendingQueries(1);
1028 InternalNewTransaction();
1029 return;
1030 }
1031
1032 /* check whether we're allowed to execute the query first */
1033 if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0) {
1034 DecreasePendingQueries(1);
1035 return;
1036 }
1037
1038 if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) {
1039 DecreasePendingQueries(1);
1040 return;
1041 }
1042
1043 /* check if there are missing object/insert ids and re-enqueue the query */
1044 if (!CanExecuteQuery(query)) {
1045
1046 #ifdef I2_DEBUG /* I2_DEBUG */
1047 Log(LogDebug, "IdoMysqlConnection")
1048 << "Scheduling execute query task again: Cannot execute query now. Type '"
1049 << typeOverride << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
1050 #endif /* I2_DEBUG */
1051
1052 m_QueryQueue.Enqueue([this, query, typeOverride]() { InternalExecuteQuery(query, typeOverride); }, query.Priority);
1053 return;
1054 }
1055
1056 std::ostringstream qbuf, where;
1057 int type;
1058
1059 if (query.WhereCriteria) {
1060 where << " WHERE ";
1061
1062 ObjectLock olock(query.WhereCriteria);
1063 Value value;
1064 bool first = true;
1065
1066 for (const Dictionary::Pair& kv : query.WhereCriteria) {
1067 if (!FieldToEscapedString(kv.first, kv.second, &value)) {
1068
1069 #ifdef I2_DEBUG /* I2_DEBUG */
1070 Log(LogDebug, "IdoMysqlConnection")
1071 << "Scheduling execute query task again: Cannot execute query now. Type '"
1072 << typeOverride << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
1073 #endif /* I2_DEBUG */
1074
1075 m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority);
1076 return;
1077 }
1078
1079 if (!first)
1080 where << " AND ";
1081
1082 where << kv.first << " = " << value;
1083
1084 if (first)
1085 first = false;
1086 }
1087 }
1088
1089 type = (typeOverride != -1) ? typeOverride : query.Type;
1090
1091 bool upsert = false;
1092
1093 if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
1094 bool hasid = false;
1095
1096 if (query.Object) {
1097 if (query.ConfigUpdate)
1098 hasid = GetConfigUpdate(query.Object);
1099 else if (query.StatusUpdate)
1100 hasid = GetStatusUpdate(query.Object);
1101 }
1102
1103 if (!hasid)
1104 upsert = true;
1105
1106 type = DbQueryUpdate;
1107 }
1108
1109 if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
1110 std::ostringstream qdel;
1111 qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
1112 IncreasePendingQueries(1);
1113 AsyncQuery(qdel.str());
1114
1115 type = DbQueryInsert;
1116 }
1117
1118 switch (type) {
1119 case DbQueryInsert:
1120 qbuf << "INSERT INTO " << GetTablePrefix() << query.Table;
1121 break;
1122 case DbQueryUpdate:
1123 qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET";
1124 break;
1125 case DbQueryDelete:
1126 qbuf << "DELETE FROM " << GetTablePrefix() << query.Table;
1127 break;
1128 default:
1129 VERIFY(!"Invalid query type.");
1130 }
1131
1132 if (type == DbQueryInsert || type == DbQueryUpdate) {
1133 std::ostringstream colbuf, valbuf;
1134
1135 if (type == DbQueryUpdate && query.Fields->GetLength() == 0)
1136 return;
1137
1138 ObjectLock olock(query.Fields);
1139
1140 bool first = true;
1141 for (const Dictionary::Pair& kv : query.Fields) {
1142 Value value;
1143
1144 if (!FieldToEscapedString(kv.first, kv.second, &value)) {
1145
1146 #ifdef I2_DEBUG /* I2_DEBUG */
1147 Log(LogDebug, "IdoMysqlConnection")
1148 << "Scheduling execute query task again: Cannot extract required INSERT/UPDATE fields, key '"
1149 << kv.first << "', val '" << kv.second << "', type " << typeOverride << ", table '" << query.Table << "'.";
1150 #endif /* I2_DEBUG */
1151
1152 m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority);
1153 return;
1154 }
1155
1156 if (type == DbQueryInsert) {
1157 if (!first) {
1158 colbuf << ", ";
1159 valbuf << ", ";
1160 }
1161
1162 colbuf << kv.first;
1163 valbuf << value;
1164 } else {
1165 if (!first)
1166 qbuf << ", ";
1167
1168 qbuf << " " << kv.first << " = " << value;
1169 }
1170
1171 if (first)
1172 first = false;
1173 }
1174
1175 if (type == DbQueryInsert)
1176 qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")";
1177 }
1178
1179 if (type != DbQueryInsert)
1180 qbuf << where.str();
1181
1182 AsyncQuery(qbuf.str(), [this, query, type, upsert](const IdoMysqlResult&) { FinishExecuteQuery(query, type, upsert); });
1183 }
1184
FinishExecuteQuery(const DbQuery & query,int type,bool upsert)1185 void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool upsert)
1186 {
1187 if (upsert && GetAffectedRows() == 0) {
1188
1189 #ifdef I2_DEBUG /* I2_DEBUG */
1190 Log(LogDebug, "IdoMysqlConnection")
1191 << "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'.";
1192 #endif /* I2_DEBUG */
1193
1194 IncreasePendingQueries(1);
1195 m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert); }, query.Priority);
1196
1197 return;
1198 }
1199
1200 if (type == DbQueryInsert && query.Object) {
1201 if (query.ConfigUpdate) {
1202 SetInsertID(query.Object, GetLastInsertID());
1203 SetConfigUpdate(query.Object, true);
1204 } else if (query.StatusUpdate)
1205 SetStatusUpdate(query.Object, true);
1206 }
1207
1208 if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationInsertID)
1209 query.NotificationInsertID->SetValue(static_cast<long>(GetLastInsertID()));
1210 }
1211
CleanUpExecuteQuery(const String & table,const String & time_column,double max_age)1212 void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
1213 {
1214 if (IsPaused())
1215 return;
1216
1217 #ifdef I2_DEBUG /* I2_DEBUG */
1218 Log(LogDebug, "IdoMysqlConnection")
1219 << "Rescheduling cleanup query for table '" << table << "' and column '"
1220 << time_column << "'. max_age is set to '" << max_age << "'.";
1221 #endif /* I2_DEBUG */
1222
1223 IncreasePendingQueries(1);
1224 m_QueryQueue.Enqueue([this, table, time_column, max_age]() { InternalCleanUpExecuteQuery(table, time_column, max_age); }, PriorityLow, true);
1225 }
1226
InternalCleanUpExecuteQuery(const String & table,const String & time_column,double max_age)1227 void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
1228 {
1229 AssertOnWorkQueue();
1230
1231 if (IsPaused()) {
1232 DecreasePendingQueries(1);
1233 return;
1234 }
1235
1236 if (!GetConnected()) {
1237 DecreasePendingQueries(1);
1238 return;
1239 }
1240
1241 AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
1242 Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
1243 " < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
1244 }
1245
FillIDCache(const DbType::Ptr & type)1246 void IdoMysqlConnection::FillIDCache(const DbType::Ptr& type)
1247 {
1248 String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
1249 IdoMysqlResult result = Query(query);
1250
1251 Dictionary::Ptr row;
1252
1253 while ((row = FetchRow(result))) {
1254 DbReference dbref(row->Get("object_id"));
1255 SetInsertID(type, dbref, DbReference(row->Get(type->GetTable() + "_id")));
1256 SetConfigHash(type, dbref, row->Get("config_hash"));
1257 }
1258 }
1259
GetPendingQueryCount() const1260 int IdoMysqlConnection::GetPendingQueryCount() const
1261 {
1262 return m_QueryQueue.GetLength();
1263 }
1264