1 /*
2  *
3  * (C) 2010-2020 Anope Team
4  * Contact us at team@anope.org
5  *
6  * Please read COPYING and README for further details.
7  */
8 
9 #include "module.h"
10 #include "modules/sql.h"
11 #define NO_CLIENT_LONG_LONG
12 #ifdef WIN32
13 # include <mysql.h>
14 #else
15 # include <mysql/mysql.h>
16 #endif
17 
18 using namespace SQL;
19 
20 /** Non blocking threaded MySQL API, based loosely from InspIRCd's m_mysql.cpp
21  *
22  * This module spawns a single thread that is used to execute blocking MySQL queries.
23  * When a module requests a query to be executed it is added to a list for the thread
 * (which never stops looping and sleeping) to pick up and execute, the result of which
 * is inserted into another queue to be picked up by the main thread. The main thread
26  * uses Pipe to become notified through the socket engine when there are results waiting
27  * to be sent back to the modules requesting the query
28  */
29 
30 class MySQLService;
31 
32 /** A query request
33  */
34 struct QueryRequest
35 {
36 	/* The connection to the database */
37 	MySQLService *service;
38 	/* The interface to use once we have the result to send the data back */
39 	Interface *sqlinterface;
40 	/* The actual query */
41 	Query query;
42 
QueryRequestQueryRequest43 	QueryRequest(MySQLService *s, Interface *i, const Query &q) : service(s), sqlinterface(i), query(q) { }
44 };
45 
46 /** A query result */
47 struct QueryResult
48 {
49 	/* The interface to send the data back on */
50 	Interface *sqlinterface;
51 	/* The result */
52 	Result result;
53 
QueryResultQueryResult54 	QueryResult(Interface *i, Result &r) : sqlinterface(i), result(r) { }
55 };
56 
57 /** A MySQL result
58  */
59 class MySQLResult : public Result
60 {
61 	MYSQL_RES *res;
62 
63  public:
MySQLResult(unsigned int i,const Query & q,const Anope::string & fq,MYSQL_RES * r)64 	MySQLResult(unsigned int i, const Query &q, const Anope::string &fq, MYSQL_RES *r) : Result(i, q, fq), res(r)
65 	{
66 		unsigned num_fields = res ? mysql_num_fields(res) : 0;
67 
68 		/* It is not thread safe to log anything here using Log(this->owner) now :( */
69 
70 		if (!num_fields)
71 			return;
72 
73 		for (MYSQL_ROW row; (row = mysql_fetch_row(res));)
74 		{
75 			MYSQL_FIELD *fields = mysql_fetch_fields(res);
76 
77 			if (fields)
78 			{
79 				std::map<Anope::string, Anope::string> items;
80 
81 				for (unsigned field_count = 0; field_count < num_fields; ++field_count)
82 				{
83 					Anope::string column = (fields[field_count].name ? fields[field_count].name : "");
84 					Anope::string data = (row[field_count] ? row[field_count] : "");
85 
86 					items[column] = data;
87 				}
88 
89 				this->entries.push_back(items);
90 			}
91 		}
92 	}
93 
MySQLResult(const Query & q,const Anope::string & fq,const Anope::string & err)94 	MySQLResult(const Query &q, const Anope::string &fq, const Anope::string &err) : Result(0, q, fq, err), res(NULL)
95 	{
96 	}
97 
~MySQLResult()98 	~MySQLResult()
99 	{
100 		if (this->res)
101 			mysql_free_result(this->res);
102 	}
103 };
104 
105 /** A MySQL connection, there can be multiple
106  */
107 class MySQLService : public Provider
108 {
109 	std::map<Anope::string, std::set<Anope::string> > active_schema;
110 
111 	Anope::string database;
112 	Anope::string server;
113 	Anope::string user;
114 	Anope::string password;
115 	int port;
116 
117 	MYSQL *sql;
118 
119 	/** Escape a query.
120 	 * Note the mutex must be held!
121 	 */
122 	Anope::string Escape(const Anope::string &query);
123 
124  public:
125 	/* Locked by the SQL thread when a query is pending on this database,
126 	 * prevents us from deleting a connection while a query is executing
127 	 * in the thread
128 	 */
129 	Mutex Lock;
130 
131 	MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po);
132 
133 	~MySQLService();
134 
135 	void Run(Interface *i, const Query &query) anope_override;
136 
137 	Result RunQuery(const Query &query) anope_override;
138 
139 	std::vector<Query> CreateTable(const Anope::string &table, const Data &data) anope_override;
140 
141 	Query BuildInsert(const Anope::string &table, unsigned int id, Data &data) anope_override;
142 
143 	Query GetTables(const Anope::string &prefix) anope_override;
144 
145 	void Connect();
146 
147 	bool CheckConnection();
148 
149 	Anope::string BuildQuery(const Query &q);
150 
151 	Anope::string FromUnixtime(time_t);
152 };
153 
154 /** The SQL thread used to execute queries
155  */
156 class DispatcherThread : public Thread, public Condition
157 {
158  public:
DispatcherThread()159 	DispatcherThread() : Thread() { }
160 
161 	void Run() anope_override;
162 };
163 
164 class ModuleSQL;
165 static ModuleSQL *me;
166 class ModuleSQL : public Module, public Pipe
167 {
168 	/* SQL connections */
169 	std::map<Anope::string, MySQLService *> MySQLServices;
170  public:
171 	/* Pending query requests */
172 	std::deque<QueryRequest> QueryRequests;
173 	/* Pending finished requests with results */
174 	std::deque<QueryResult> FinishedRequests;
175 	/* The thread used to execute queries */
176 	DispatcherThread *DThread;
177 
ModuleSQL(const Anope::string & modname,const Anope::string & creator)178 	ModuleSQL(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR)
179 	{
180 		me = this;
181 
182 
183 		DThread = new DispatcherThread();
184 		DThread->Start();
185 	}
186 
~ModuleSQL()187 	~ModuleSQL()
188 	{
189 		for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end(); ++it)
190 			delete it->second;
191 		MySQLServices.clear();
192 
193 		DThread->SetExitState();
194 		DThread->Wakeup();
195 		DThread->Join();
196 		delete DThread;
197 	}
198 
OnReload(Configuration::Conf * conf)199 	void OnReload(Configuration::Conf *conf) anope_override
200 	{
201 		Configuration::Block *config = conf->GetModule(this);
202 
203 		for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end();)
204 		{
205 			const Anope::string &cname = it->first;
206 			MySQLService *s = it->second;
207 			int i;
208 
209 			++it;
210 
211 			for (i = 0; i < config->CountBlock("mysql"); ++i)
212 				if (config->GetBlock("mysql", i)->Get<const Anope::string>("name", "mysql/main") == cname)
213 					break;
214 
215 			if (i == config->CountBlock("mysql"))
216 			{
217 				Log(LOG_NORMAL, "mysql") << "MySQL: Removing server connection " << cname;
218 
219 				delete s;
220 				this->MySQLServices.erase(cname);
221 			}
222 		}
223 
224 		for (int i = 0; i < config->CountBlock("mysql"); ++i)
225 		{
226 			Configuration::Block *block = config->GetBlock("mysql", i);
227 			const Anope::string &connname = block->Get<const Anope::string>("name", "mysql/main");
228 
229 			if (this->MySQLServices.find(connname) == this->MySQLServices.end())
230 			{
231 				const Anope::string &database = block->Get<const Anope::string>("database", "anope");
232 				const Anope::string &server = block->Get<const Anope::string>("server", "127.0.0.1");
233 				const Anope::string &user = block->Get<const Anope::string>("username", "anope");
234 				const Anope::string &password = block->Get<const Anope::string>("password");
235 				int port = block->Get<int>("port", "3306");
236 
237 				try
238 				{
239 					MySQLService *ss = new MySQLService(this, connname, database, server, user, password, port);
240 					this->MySQLServices.insert(std::make_pair(connname, ss));
241 
242 					Log(LOG_NORMAL, "mysql") << "MySQL: Successfully connected to server " << connname << " (" << server << ")";
243 				}
244 				catch (const SQL::Exception &ex)
245 				{
246 					Log(LOG_NORMAL, "mysql") << "MySQL: " << ex.GetReason();
247 				}
248 			}
249 		}
250 	}
251 
OnModuleUnload(User *,Module * m)252 	void OnModuleUnload(User *, Module *m) anope_override
253 	{
254 		this->DThread->Lock();
255 
256 		for (unsigned i = this->QueryRequests.size(); i > 0; --i)
257 		{
258 			QueryRequest &r = this->QueryRequests[i - 1];
259 
260 			if (r.sqlinterface && r.sqlinterface->owner == m)
261 			{
262 				if (i == 1)
263 				{
264 					r.service->Lock.Lock();
265 					r.service->Lock.Unlock();
266 				}
267 
268 				this->QueryRequests.erase(this->QueryRequests.begin() + i - 1);
269 			}
270 		}
271 
272 		this->DThread->Unlock();
273 
274 		this->OnNotify();
275 	}
276 
OnNotify()277 	void OnNotify() anope_override
278 	{
279 		this->DThread->Lock();
280 		std::deque<QueryResult> finishedRequests = this->FinishedRequests;
281 		this->FinishedRequests.clear();
282 		this->DThread->Unlock();
283 
284 		for (std::deque<QueryResult>::const_iterator it = finishedRequests.begin(), it_end = finishedRequests.end(); it != it_end; ++it)
285 		{
286 			const QueryResult &qr = *it;
287 
288 			if (!qr.sqlinterface)
289 				throw SQL::Exception("NULL qr.sqlinterface in MySQLPipe::OnNotify() ?");
290 
291 			if (qr.result.GetError().empty())
292 				qr.sqlinterface->OnResult(qr.result);
293 			else
294 				qr.sqlinterface->OnError(qr.result);
295 		}
296 	}
297 };
298 
MySQLService(Module * o,const Anope::string & n,const Anope::string & d,const Anope::string & s,const Anope::string & u,const Anope::string & p,int po)299 MySQLService::MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po)
300 : Provider(o, n), database(d), server(s), user(u), password(p), port(po), sql(NULL)
301 {
302 	Connect();
303 }
304 
~MySQLService()305 MySQLService::~MySQLService()
306 {
307 	me->DThread->Lock();
308 	this->Lock.Lock();
309 	mysql_close(this->sql);
310 	this->sql = NULL;
311 
312 	for (unsigned i = me->QueryRequests.size(); i > 0; --i)
313 	{
314 		QueryRequest &r = me->QueryRequests[i - 1];
315 
316 		if (r.service == this)
317 		{
318 			if (r.sqlinterface)
319 				r.sqlinterface->OnError(Result(0, r.query, "SQL Interface is going away"));
320 			me->QueryRequests.erase(me->QueryRequests.begin() + i - 1);
321 		}
322 	}
323 	this->Lock.Unlock();
324 	me->DThread->Unlock();
325 }
326 
Run(Interface * i,const Query & query)327 void MySQLService::Run(Interface *i, const Query &query)
328 {
329 	me->DThread->Lock();
330 	me->QueryRequests.push_back(QueryRequest(this, i, query));
331 	me->DThread->Unlock();
332 	me->DThread->Wakeup();
333 }
334 
RunQuery(const Query & query)335 Result MySQLService::RunQuery(const Query &query)
336 {
337 	this->Lock.Lock();
338 
339 	Anope::string real_query = this->BuildQuery(query);
340 
341 	if (this->CheckConnection() && !mysql_real_query(this->sql, real_query.c_str(), real_query.length()))
342 	{
343 		MYSQL_RES *res = mysql_store_result(this->sql);
344 		unsigned int id = mysql_insert_id(this->sql);
345 
346 		/* because we enabled CLIENT_MULTI_RESULTS in our options
347 		 * a multiple statement or a procedure call can return
348 		 * multiple result sets.
349 		 * we must process them all before the next query.
350 		 */
351 
352 		while (!mysql_next_result(this->sql))
353 			mysql_free_result(mysql_store_result(this->sql));
354 
355 		this->Lock.Unlock();
356 		return MySQLResult(id, query, real_query, res);
357 	}
358 	else
359 	{
360 		Anope::string error = mysql_error(this->sql);
361 		this->Lock.Unlock();
362 		return MySQLResult(query, real_query, error);
363 	}
364 }
365 
CreateTable(const Anope::string & table,const Data & data)366 std::vector<Query> MySQLService::CreateTable(const Anope::string &table, const Data &data)
367 {
368 	std::vector<Query> queries;
369 	std::set<Anope::string> &known_cols = this->active_schema[table];
370 
371 	if (known_cols.empty())
372 	{
373 		Log(LOG_DEBUG) << "m_mysql: Fetching columns for " << table;
374 
375 		Result columns = this->RunQuery("SHOW COLUMNS FROM `" + table + "`");
376 		for (int i = 0; i < columns.Rows(); ++i)
377 		{
378 			const Anope::string &column = columns.Get(i, "Field");
379 
380 			Log(LOG_DEBUG) << "m_mysql: Column #" << i << " for " << table << ": " << column;
381 			known_cols.insert(column);
382 		}
383 	}
384 
385 	if (known_cols.empty())
386 	{
387 		Anope::string query_text = "CREATE TABLE `" + table + "` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT,"
388 			" `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP";
389 		for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
390 		{
391 			known_cols.insert(it->first);
392 
393 			query_text += ", `" + it->first + "` ";
394 			if (data.GetType(it->first) == Serialize::Data::DT_INT)
395 				query_text += "int(11)";
396 			else
397 				query_text += "text";
398 		}
399 		query_text += ", PRIMARY KEY (`id`), KEY `timestamp_idx` (`timestamp`))";
400 		queries.push_back(query_text);
401 	}
402 	else
403 		for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
404 		{
405 			if (known_cols.count(it->first) > 0)
406 				continue;
407 
408 			known_cols.insert(it->first);
409 
410 			Anope::string query_text = "ALTER TABLE `" + table + "` ADD `" + it->first + "` ";
411 			if (data.GetType(it->first) == Serialize::Data::DT_INT)
412 				query_text += "int(11)";
413 			else
414 				query_text += "text";
415 
416 			queries.push_back(query_text);
417 		}
418 
419 	return queries;
420 }
421 
BuildInsert(const Anope::string & table,unsigned int id,Data & data)422 Query MySQLService::BuildInsert(const Anope::string &table, unsigned int id, Data &data)
423 {
424 	/* Empty columns not present in the data set */
425 	const std::set<Anope::string> &known_cols = this->active_schema[table];
426 	for (std::set<Anope::string>::iterator it = known_cols.begin(), it_end = known_cols.end(); it != it_end; ++it)
427 		if (*it != "id" && *it != "timestamp" && data.data.count(*it) == 0)
428 			data[*it] << "";
429 
430 	Anope::string query_text = "INSERT INTO `" + table + "` (`id`";
431 	for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
432 		query_text += ",`" + it->first + "`";
433 	query_text += ") VALUES (" + stringify(id);
434 	for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
435 		query_text += ",@" + it->first + "@";
436 	query_text += ") ON DUPLICATE KEY UPDATE ";
437 	for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
438 		query_text += "`" + it->first + "`=VALUES(`" + it->first + "`),";
439 	query_text.erase(query_text.end() - 1);
440 
441 	Query query(query_text);
442 	for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
443 	{
444 		Anope::string buf;
445 		*it->second >> buf;
446 
447 		bool escape = true;
448 		if (buf.empty())
449 		{
450 			buf = "NULL";
451 			escape = false;
452 		}
453 
454 		query.SetValue(it->first, buf, escape);
455 	}
456 
457 	return query;
458 }
459 
GetTables(const Anope::string & prefix)460 Query MySQLService::GetTables(const Anope::string &prefix)
461 {
462 	return Query("SHOW TABLES LIKE '" + prefix + "%';");
463 }
464 
Connect()465 void MySQLService::Connect()
466 {
467 	this->sql = mysql_init(this->sql);
468 
469 	const unsigned int timeout = 1;
470 	mysql_options(this->sql, MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout));
471 
472 	bool connect = mysql_real_connect(this->sql, this->server.c_str(), this->user.c_str(), this->password.c_str(), this->database.c_str(), this->port, NULL, CLIENT_MULTI_RESULTS);
473 
474 	if (!connect)
475 		throw SQL::Exception("Unable to connect to MySQL service " + this->name + ": " + mysql_error(this->sql));
476 
477 	Log(LOG_DEBUG) << "Successfully connected to MySQL service " << this->name << " at " << this->server << ":" << this->port;
478 }
479 
480 
CheckConnection()481 bool MySQLService::CheckConnection()
482 {
483 	if (!this->sql || mysql_ping(this->sql))
484 	{
485 		try
486 		{
487 			this->Connect();
488 		}
489 		catch (const SQL::Exception &)
490 		{
491 			return false;
492 		}
493 	}
494 
495 	return true;
496 }
497 
/* Escapes a string through the connection so it can be embedded in a query.
 * The output buffer is sized for the worst case of two bytes per input byte
 * plus the terminator.
 */
Anope::string MySQLService::Escape(const Anope::string &query)
{
	const std::vector<char>::size_type length = query.length();
	std::vector<char> escaped(length * 2 + 1);

	mysql_real_escape_string(this->sql, &escaped[0], query.c_str(), length);

	return Anope::string(&escaped[0]);
}
504 
BuildQuery(const Query & q)505 Anope::string MySQLService::BuildQuery(const Query &q)
506 {
507 	Anope::string real_query = q.query;
508 
509 	for (std::map<Anope::string, QueryData>::const_iterator it = q.parameters.begin(), it_end = q.parameters.end(); it != it_end; ++it)
510 		real_query = real_query.replace_all_cs("@" + it->first + "@", (it->second.escape ? ("'" + this->Escape(it->second.data) + "'") : it->second.data));
511 
512 	return real_query;
513 }
514 
FromUnixtime(time_t t)515 Anope::string MySQLService::FromUnixtime(time_t t)
516 {
517 	return "FROM_UNIXTIME(" + stringify(t) + ")";
518 }
519 
Run()520 void DispatcherThread::Run()
521 {
522 	this->Lock();
523 
524 	while (!this->GetExitState())
525 	{
526 		if (!me->QueryRequests.empty())
527 		{
528 			QueryRequest &r = me->QueryRequests.front();
529 			this->Unlock();
530 
531 			Result sresult = r.service->RunQuery(r.query);
532 
533 			this->Lock();
534 			if (!me->QueryRequests.empty() && me->QueryRequests.front().query == r.query)
535 			{
536 				if (r.sqlinterface)
537 					me->FinishedRequests.push_back(QueryResult(r.sqlinterface, sresult));
538 				me->QueryRequests.pop_front();
539 			}
540 		}
541 		else
542 		{
543 			if (!me->FinishedRequests.empty())
544 				me->Notify();
545 			this->Wait();
546 		}
547 	}
548 
549 	this->Unlock();
550 }
551 
/* Registers ModuleSQL as the module implemented by this file.
 * NOTE(review): MODULE_INIT is not defined in this file, presumably it comes from module.h and generates the module entry points - confirm.
 */
MODULE_INIT(ModuleSQL)
553