1 /*
2  * InspIRCd -- Internet Relay Chat Daemon
3  *
4  *   Copyright (C) 2016 Adam <Adam@anope.org>
5  *   Copyright (C) 2015 Daniel Vassdal <shutter@canternet.org>
6  *   Copyright (C) 2013, 2016-2020 Sadie Powell <sadie@witchery.services>
7  *   Copyright (C) 2012-2015 Attila Molnar <attilamolnar@hush.com>
8  *   Copyright (C) 2012 Robby <robby@chatbelgie.be>
9  *   Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
10  *   Copyright (C) 2009 Uli Schlachter <psychon@inspircd.org>
11  *   Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
12  *   Copyright (C) 2007, 2009-2010 Craig Edwards <brain@inspircd.org>
13  *   Copyright (C) 2007, 2009 Dennis Friis <peavey@inspircd.org>
14  *   Copyright (C) 2007 Robin Burchell <robin+git@viroteck.net>
15  *   Copyright (C) 2006 Oliver Lupton <om@inspircd.org>
16  *
17  * This file is part of InspIRCd.  InspIRCd is free software: you can
18  * redistribute it and/or modify it under the terms of the GNU General Public
19  * License as published by the Free Software Foundation, version 2.
20  *
21  * This program is distributed in the hope that it will be useful, but WITHOUT
22  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
23  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
24  * details.
25  *
26  * You should have received a copy of the GNU General Public License
27  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
28  */
29 
30 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
31 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
32 
33 /// $PackageInfo: require_system("arch") postgresql-libs
34 /// $PackageInfo: require_system("centos") postgresql-devel
35 /// $PackageInfo: require_system("darwin") postgresql
36 /// $PackageInfo: require_system("debian") libpq-dev
37 /// $PackageInfo: require_system("ubuntu") libpq-dev
38 
39 
40 #include "inspircd.h"
41 #include <cstdlib>
42 #include <libpq-fe.h>
43 #include "modules/sql.h"
44 
45 /* SQLConn rewritten by peavey to
46  * use EventHandler instead of
47  * BufferedSocket. This is much neater
48  * and gives total control of destroy
49  * and delete of resources.
50  */
51 
/* Forward declare, so we can have the typedef neatly at the top */
class SQLConn;
class ModulePgSQL;

/* Maps a connection id (the "id" value of a <database> tag, as used by
 * ModulePgSQL::ReadConf()) to the SQLConn that serves it.
 */
typedef insp::flat_map<std::string, SQLConn*> ConnMap;
57 
/** The states a PostgreSQL connection can be in. */
enum SQLstatus
{
	/** The connection has died. */
	DEAD = 0,

	/** Still connecting; waiting for a read event. */
	CREAD = 1,

	/** Still connecting; waiting for a write event. */
	CWRITE = 2,

	/** Connected/working; waiting for a read event. */
	WREAD = 3,

	/** Connected/working; waiting for a write event. */
	WWRITE = 4
};
75 
76 class ReconnectTimer : public Timer
77 {
78  private:
79 	ModulePgSQL* mod;
80  public:
ReconnectTimer(ModulePgSQL * m)81 	ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
82 	{
83 	}
84 	bool Tick(time_t TIME) CXX11_OVERRIDE;
85 };
86 
87 struct QueueItem
88 {
89 	SQL::Query* c;
90 	std::string q;
QueueItemQueueItem91 	QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
92 };
93 
/** PgSQLresult is a subclass of the mostly-pure-virtual class SQL::Result.
 * All SQL providers must create their own subclass and define its methods using that
 * database library's data retrieval functions. The aim is to avoid a slow and inefficient process
 * of converting all data to a common format before it reaches the result structure. This way
 * data is passed to the module nearly as directly as if it was using the API directly itself.
 */
100 
101 class PgSQLresult : public SQL::Result
102 {
103 	PGresult* res;
104 	int currentrow;
105 	int rows;
106 	std::vector<std::string> colnames;
107 
getColNames()108 	void getColNames()
109 	{
110 		colnames.resize(PQnfields(res));
111 		for(unsigned int i=0; i < colnames.size(); i++)
112 		{
113 			colnames[i] = PQfname(res, i);
114 		}
115 	}
116  public:
PgSQLresult(PGresult * result)117 	PgSQLresult(PGresult* result) : res(result), currentrow(0)
118 	{
119 		rows = PQntuples(res);
120 		if (!rows)
121 			rows = ConvToNum<int>(PQcmdTuples(res));
122 	}
123 
~PgSQLresult()124 	~PgSQLresult()
125 	{
126 		PQclear(res);
127 	}
128 
Rows()129 	int Rows() CXX11_OVERRIDE
130 	{
131 		return rows;
132 	}
133 
GetCols(std::vector<std::string> & result)134 	void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
135 	{
136 		if (colnames.empty())
137 			getColNames();
138 		result = colnames;
139 	}
140 
HasColumn(const std::string & column,size_t & index)141 	bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
142 	{
143 		if (colnames.empty())
144 			getColNames();
145 
146 		for (size_t i = 0; i < colnames.size(); ++i)
147 		{
148 			if (colnames[i] == column)
149 			{
150 				index = i;
151 				return true;
152 			}
153 		}
154 		return false;
155 	}
156 
GetValue(int row,int column)157 	SQL::Field GetValue(int row, int column)
158 	{
159 		char* v = PQgetvalue(res, row, column);
160 		if (!v || PQgetisnull(res, row, column))
161 			return SQL::Field();
162 
163 		return SQL::Field(std::string(v, PQgetlength(res, row, column)));
164 	}
165 
GetRow(SQL::Row & result)166 	bool GetRow(SQL::Row& result) CXX11_OVERRIDE
167 	{
168 		if (currentrow >= PQntuples(res))
169 			return false;
170 		int ncols = PQnfields(res);
171 
172 		for(int i = 0; i < ncols; i++)
173 		{
174 			result.push_back(GetValue(currentrow, i));
175 		}
176 		currentrow++;
177 
178 		return true;
179 	}
180 };
181 
182 /** SQLConn represents one SQL session.
183  */
184 class SQLConn : public SQL::Provider, public EventHandler
185 {
186  public:
187 	reference<ConfigTag> conf;	/* The <database> entry */
188 	std::deque<QueueItem> queue;
189 	PGconn*			sql;		/* PgSQL database connection handle */
190 	SQLstatus		status;		/* PgSQL database connection status */
191 	QueueItem		qinprog;	/* If there is currently a query in progress */
192 
SQLConn(Module * Creator,ConfigTag * tag)193 	SQLConn(Module* Creator, ConfigTag* tag)
194 		: SQL::Provider(Creator, tag->getString("id"))
195 		, conf(tag)
196 		, sql(NULL)
197 		, status(CWRITE)
198 		, qinprog(NULL, "")
199 	{
200 		if (!DoConnect())
201 			DelayReconnect();
202 	}
203 
cull()204 	CullResult cull() CXX11_OVERRIDE
205 	{
206 		this->SQL::Provider::cull();
207 		ServerInstance->Modules->DelService(*this);
208 		return this->EventHandler::cull();
209 	}
210 
~SQLConn()211 	~SQLConn()
212 	{
213 		SQL::Error err(SQL::BAD_DBID);
214 		if (qinprog.c)
215 		{
216 			qinprog.c->OnError(err);
217 			delete qinprog.c;
218 		}
219 		for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
220 		{
221 			SQL::Query* q = i->c;
222 			q->OnError(err);
223 			delete q;
224 		}
225 		Close();
226 	}
227 
OnEventHandlerRead()228 	void OnEventHandlerRead() CXX11_OVERRIDE
229 	{
230 		DoEvent();
231 	}
232 
OnEventHandlerWrite()233 	void OnEventHandlerWrite() CXX11_OVERRIDE
234 	{
235 		DoEvent();
236 	}
237 
OnEventHandlerError(int errornum)238 	void OnEventHandlerError(int errornum) CXX11_OVERRIDE
239 	{
240 		DelayReconnect();
241 	}
242 
GetDSN()243 	std::string GetDSN()
244 	{
245 		std::ostringstream conninfo("connect_timeout = '5'");
246 		std::string item;
247 
248 		if (conf->readString("host", item))
249 			conninfo << " host = '" << item << "'";
250 
251 		if (conf->readString("port", item))
252 			conninfo << " port = '" << item << "'";
253 
254 		if (conf->readString("name", item))
255 			conninfo << " dbname = '" << item << "'";
256 
257 		if (conf->readString("user", item))
258 			conninfo << " user = '" << item << "'";
259 
260 		if (conf->readString("pass", item))
261 			conninfo << " password = '" << item << "'";
262 
263 		if (conf->getBool("ssl"))
264 			conninfo << " sslmode = 'require'";
265 		else
266 			conninfo << " sslmode = 'disable'";
267 
268 		return conninfo.str();
269 	}
270 
HandleConnectError(const char * reason)271 	bool HandleConnectError(const char* reason)
272 	{
273 		ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "Could not connect to the \"%s\" database: %s",
274 			GetId().c_str(), reason);
275 		return false;
276 	}
277 
DoConnect()278 	bool DoConnect()
279 	{
280 		sql = PQconnectStart(GetDSN().c_str());
281 		if (!sql)
282 			return HandleConnectError("PQconnectStart returned NULL");
283 
284 		if(PQstatus(sql) == CONNECTION_BAD)
285 			return HandleConnectError("connection status is bad");
286 
287 		if(PQsetnonblocking(sql, 1) == -1)
288 			return HandleConnectError("unable to mark fd as non-blocking");
289 
290 		/* OK, we've initialised the connection, now to get it hooked into the socket engine
291 		* and then start polling it.
292 		*/
293 		SetFd(PQsocket(sql));
294 		if(!HasFd())
295 			return HandleConnectError("PQsocket returned an invalid fd");
296 
297 		if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
298 			return HandleConnectError("could not add the pgsql socket to the socket engine");
299 
300 		/* Socket all hooked into the engine, now to tell PgSQL to start connecting */
301 		if (!DoPoll())
302 			return HandleConnectError("could not poll the connection state");
303 
304 		return true;
305 	}
306 
DoPoll()307 	bool DoPoll()
308 	{
309 		switch(PQconnectPoll(sql))
310 		{
311 			case PGRES_POLLING_WRITING:
312 				SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
313 				status = CWRITE;
314 				return true;
315 			case PGRES_POLLING_READING:
316 				SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
317 				status = CREAD;
318 				return true;
319 			case PGRES_POLLING_FAILED:
320 				SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
321 				status = DEAD;
322 				return false;
323 			case PGRES_POLLING_OK:
324 				SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
325 				status = WWRITE;
326 				DoConnectedPoll();
327 				return true;
328 			default:
329 				return true;
330 		}
331 	}
332 
DoConnectedPoll()333 	void DoConnectedPoll()
334 	{
335 restart:
336 		while (qinprog.q.empty() && !queue.empty())
337 		{
338 			/* There's no query currently in progress, and there's queries in the queue. */
339 			DoQuery(queue.front());
340 			queue.pop_front();
341 		}
342 
343 		if (PQconsumeInput(sql))
344 		{
345 			if (PQisBusy(sql))
346 			{
347 				/* Nothing happens here */
348 			}
349 			else if (qinprog.c)
350 			{
351 				/* Fetch the result.. */
352 				PGresult* result = PQgetResult(sql);
353 
354 				/* PgSQL would allow a query string to be sent which has multiple
355 				 * queries in it, this isn't portable across database backends and
356 				 * we don't want modules doing it. But just in case we make sure we
357 				 * drain any results there are and just use the last one.
358 				 * If the module devs are behaving there will only be one result.
359 				 */
360 				while (PGresult* temp = PQgetResult(sql))
361 				{
362 					PQclear(result);
363 					result = temp;
364 				}
365 
366 				/* ..and the result */
367 				PgSQLresult reply(result);
368 				switch(PQresultStatus(result))
369 				{
370 					case PGRES_EMPTY_QUERY:
371 					case PGRES_BAD_RESPONSE:
372 					case PGRES_FATAL_ERROR:
373 					{
374 						SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
375 						qinprog.c->OnError(err);
376 						break;
377 					}
378 					default:
379 						/* Other values are not errors */
380 						qinprog.c->OnResult(reply);
381 				}
382 
383 				delete qinprog.c;
384 				qinprog = QueueItem(NULL, "");
385 				goto restart;
386 			}
387 			else
388 			{
389 				qinprog.q.clear();
390 			}
391 		}
392 		else
393 		{
394 			/* I think we'll assume this means the server died...it might not,
395 			 * but I think that any error serious enough we actually get here
396 			 * deserves to reconnect [/excuse]
397 			 * Returning true so the core doesn't try and close the connection.
398 			 */
399 			DelayReconnect();
400 		}
401 	}
402 
403 	void DelayReconnect();
404 
DoEvent()405 	void DoEvent()
406 	{
407 		if((status == CREAD) || (status == CWRITE))
408 		{
409 			DoPoll();
410 		}
411 		else if (status == WREAD || status == WWRITE)
412 		{
413 			DoConnectedPoll();
414 		}
415 	}
416 
Submit(SQL::Query * req,const std::string & q)417 	void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
418 	{
419 		ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing PostgreSQL query: " + q);
420 		if (qinprog.q.empty())
421 		{
422 			DoQuery(QueueItem(req,q));
423 		}
424 		else
425 		{
426 			// wait your turn.
427 			queue.push_back(QueueItem(req,q));
428 		}
429 	}
430 
Submit(SQL::Query * req,const std::string & q,const SQL::ParamList & p)431 	void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
432 	{
433 		std::string res;
434 		unsigned int param = 0;
435 		for(std::string::size_type i = 0; i < q.length(); i++)
436 		{
437 			if (q[i] != '?')
438 				res.push_back(q[i]);
439 			else
440 			{
441 				if (param < p.size())
442 				{
443 					std::string parm = p[param++];
444 					std::vector<char> buffer(parm.length() * 2 + 1);
445 					int error;
446 					size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
447 					if (error)
448 						ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
449 					res.append(&buffer[0], escapedsize);
450 				}
451 			}
452 		}
453 		Submit(req, res);
454 	}
455 
Submit(SQL::Query * req,const std::string & q,const SQL::ParamMap & p)456 	void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
457 	{
458 		std::string res;
459 		for(std::string::size_type i = 0; i < q.length(); i++)
460 		{
461 			if (q[i] != '$')
462 				res.push_back(q[i]);
463 			else
464 			{
465 				std::string field;
466 				i++;
467 				while (i < q.length() && isalnum(q[i]))
468 					field.push_back(q[i++]);
469 				i--;
470 
471 				SQL::ParamMap::const_iterator it = p.find(field);
472 				if (it != p.end())
473 				{
474 					std::string parm = it->second;
475 					std::vector<char> buffer(parm.length() * 2 + 1);
476 					int error;
477 					size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
478 					if (error)
479 						ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
480 					res.append(&buffer[0], escapedsize);
481 				}
482 			}
483 		}
484 		Submit(req, res);
485 	}
486 
DoQuery(const QueueItem & req)487 	void DoQuery(const QueueItem& req)
488 	{
489 		if (status != WREAD && status != WWRITE)
490 		{
491 			// whoops, not connected...
492 			SQL::Error err(SQL::BAD_CONN);
493 			req.c->OnError(err);
494 			delete req.c;
495 			return;
496 		}
497 
498 		if(PQsendQuery(sql, req.q.c_str()))
499 		{
500 			qinprog = req;
501 		}
502 		else
503 		{
504 			SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
505 			req.c->OnError(err);
506 			delete req.c;
507 		}
508 	}
509 
Close()510 	void Close()
511 	{
512 		status = DEAD;
513 
514 		if (HasFd() && SocketEngine::HasFd(GetFd()))
515 			SocketEngine::DelFd(this);
516 
517 		if(sql)
518 		{
519 			PQfinish(sql);
520 			sql = NULL;
521 		}
522 	}
523 };
524 
525 class ModulePgSQL : public Module
526 {
527  public:
528 	ConnMap connections;
529 	ReconnectTimer* retimer;
530 
ModulePgSQL()531 	ModulePgSQL()
532 		: retimer(NULL)
533 	{
534 	}
535 
~ModulePgSQL()536 	~ModulePgSQL()
537 	{
538 		delete retimer;
539 		ClearAllConnections();
540 	}
541 
ReadConfig(ConfigStatus & status)542 	void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
543 	{
544 		ReadConf();
545 	}
546 
ReadConf()547 	void ReadConf()
548 	{
549 		ConnMap conns;
550 		ConfigTagList tags = ServerInstance->Config->ConfTags("database");
551 		for(ConfigIter i = tags.first; i != tags.second; i++)
552 		{
553 			if (!stdalgo::string::equalsci(i->second->getString("module"), "pgsql"))
554 				continue;
555 			std::string id = i->second->getString("id");
556 			ConnMap::iterator curr = connections.find(id);
557 			if (curr == connections.end())
558 			{
559 				SQLConn* conn = new SQLConn(this, i->second);
560 				if (conn->status != DEAD)
561 				{
562 					conns.insert(std::make_pair(id, conn));
563 					ServerInstance->Modules->AddService(*conn);
564 				}
565 				// If the connection is dead it has already been queued for culling
566 				// at the end of the main loop so we don't need to delete it here.
567 			}
568 			else
569 			{
570 				conns.insert(*curr);
571 				connections.erase(curr);
572 			}
573 		}
574 		ClearAllConnections();
575 		conns.swap(connections);
576 	}
577 
ClearAllConnections()578 	void ClearAllConnections()
579 	{
580 		for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
581 		{
582 			i->second->cull();
583 			delete i->second;
584 		}
585 		connections.clear();
586 	}
587 
OnUnloadModule(Module * mod)588 	void OnUnloadModule(Module* mod) CXX11_OVERRIDE
589 	{
590 		SQL::Error err(SQL::BAD_DBID);
591 		for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
592 		{
593 			SQLConn* conn = i->second;
594 			if (conn->qinprog.c && conn->qinprog.c->creator == mod)
595 			{
596 				conn->qinprog.c->OnError(err);
597 				delete conn->qinprog.c;
598 				conn->qinprog.c = NULL;
599 			}
600 			std::deque<QueueItem>::iterator j = conn->queue.begin();
601 			while (j != conn->queue.end())
602 			{
603 				SQL::Query* q = j->c;
604 				if (q->creator == mod)
605 				{
606 					q->OnError(err);
607 					delete q;
608 					j = conn->queue.erase(j);
609 				}
610 				else
611 					j++;
612 			}
613 		}
614 	}
615 
GetVersion()616 	Version GetVersion() CXX11_OVERRIDE
617 	{
618 		return Version("Provides the ability for SQL modules to query a PostgreSQL database.", VF_VENDOR);
619 	}
620 };
621 
/** Fires when the reconnect delay has elapsed: makes the module re-read its
 * configuration and then destroys this timer.
 * @param time Unused.
 * @return Always false.
 */
bool ReconnectTimer::Tick(time_t time)
{
	// Detach from the module first so that a connection which fails again
	// during ReadConf() sees no pending timer and can schedule a new one.
	mod->retimer = NULL;
	mod->ReadConf();
	// The timer is single use; no member may be touched after this line.
	delete this;
	return false;
}
629 
DelayReconnect()630 void SQLConn::DelayReconnect()
631 {
632 	status = DEAD;
633 	ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
634 
635 	ConnMap::iterator it = mod->connections.find(conf->getString("id"));
636 	if (it != mod->connections.end())
637 		mod->connections.erase(it);
638 	ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
639 	if (!mod->retimer)
640 	{
641 		mod->retimer = new ReconnectTimer(mod);
642 		ServerInstance->Timers.AddTimer(mod->retimer);
643 	}
644 }
645 
// Presumably exports ModulePgSQL as this module's entry point; MODULE_INIT is
// defined by the InspIRCd headers, not in this file.
MODULE_INIT(ModulePgSQL)
647