1 /*
2 * InspIRCd -- Internet Relay Chat Daemon
3 *
4 * Copyright (C) 2016 Adam <Adam@anope.org>
5 * Copyright (C) 2015 Daniel Vassdal <shutter@canternet.org>
6 * Copyright (C) 2013, 2016-2020 Sadie Powell <sadie@witchery.services>
7 * Copyright (C) 2012-2015 Attila Molnar <attilamolnar@hush.com>
8 * Copyright (C) 2012 Robby <robby@chatbelgie.be>
9 * Copyright (C) 2009-2010 Daniel De Graaf <danieldg@inspircd.org>
10 * Copyright (C) 2009 Uli Schlachter <psychon@inspircd.org>
11 * Copyright (C) 2008 Thomas Stagner <aquanight@inspircd.org>
12 * Copyright (C) 2007, 2009-2010 Craig Edwards <brain@inspircd.org>
13 * Copyright (C) 2007, 2009 Dennis Friis <peavey@inspircd.org>
14 * Copyright (C) 2007 Robin Burchell <robin+git@viroteck.net>
15 * Copyright (C) 2006 Oliver Lupton <om@inspircd.org>
16 *
17 * This file is part of InspIRCd. InspIRCd is free software: you can
18 * redistribute it and/or modify it under the terms of the GNU General Public
19 * License as published by the Free Software Foundation, version 2.
20 *
21 * This program is distributed in the hope that it will be useful, but WITHOUT
22 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
23 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
24 * details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 /// $CompilerFlags: -Iexecute("pg_config --includedir" "POSTGRESQL_INCLUDE_DIR")
31 /// $LinkerFlags: -Lexecute("pg_config --libdir" "POSTGRESQL_LIBRARY_DIR") -lpq
32
33 /// $PackageInfo: require_system("arch") postgresql-libs
34 /// $PackageInfo: require_system("centos") postgresql-devel
35 /// $PackageInfo: require_system("darwin") postgresql
36 /// $PackageInfo: require_system("debian") libpq-dev
37 /// $PackageInfo: require_system("ubuntu") libpq-dev
38
39
40 #include "inspircd.h"
41 #include <cstdlib>
42 #include <libpq-fe.h>
43 #include "modules/sql.h"
44
45 /* SQLConn rewritten by peavey to
46 * use EventHandler instead of
47 * BufferedSocket. This is much neater
48 * and gives total control of destroy
49 * and delete of resources.
50 */
51
52 /* Forward declare, so we can have the typedef neatly at the top */
53 class SQLConn;
54 class ModulePgSQL;
55
56 typedef insp::flat_map<std::string, SQLConn*> ConnMap;
57
58 enum SQLstatus
59 {
60 // The connection has died.
61 DEAD,
62
63 // Connecting and wants read event.
64 CREAD,
65
66 // Connecting and wants write event.
67 CWRITE,
68
69 // Connected/working and wants read event.
70 WREAD,
71
72 // Connected/working and wants write event.
73 WWRITE
74 };
75
76 class ReconnectTimer : public Timer
77 {
78 private:
79 ModulePgSQL* mod;
80 public:
ReconnectTimer(ModulePgSQL * m)81 ReconnectTimer(ModulePgSQL* m) : Timer(5, false), mod(m)
82 {
83 }
84 bool Tick(time_t TIME) CXX11_OVERRIDE;
85 };
86
87 struct QueueItem
88 {
89 SQL::Query* c;
90 std::string q;
QueueItemQueueItem91 QueueItem(SQL::Query* C, const std::string& Q) : c(C), q(Q) {}
92 };
93
94 /** PgSQLresult is a subclass of the mostly-pure-virtual class SQLresult.
95 * All SQL providers must create their own subclass and define it's methods using that
96 * database library's data retrieval functions. The aim is to avoid a slow and inefficient process
97 * of converting all data to a common format before it reaches the result structure. This way
98 * data is passes to the module nearly as directly as if it was using the API directly itself.
99 */
100
101 class PgSQLresult : public SQL::Result
102 {
103 PGresult* res;
104 int currentrow;
105 int rows;
106 std::vector<std::string> colnames;
107
getColNames()108 void getColNames()
109 {
110 colnames.resize(PQnfields(res));
111 for(unsigned int i=0; i < colnames.size(); i++)
112 {
113 colnames[i] = PQfname(res, i);
114 }
115 }
116 public:
PgSQLresult(PGresult * result)117 PgSQLresult(PGresult* result) : res(result), currentrow(0)
118 {
119 rows = PQntuples(res);
120 if (!rows)
121 rows = ConvToNum<int>(PQcmdTuples(res));
122 }
123
~PgSQLresult()124 ~PgSQLresult()
125 {
126 PQclear(res);
127 }
128
Rows()129 int Rows() CXX11_OVERRIDE
130 {
131 return rows;
132 }
133
GetCols(std::vector<std::string> & result)134 void GetCols(std::vector<std::string>& result) CXX11_OVERRIDE
135 {
136 if (colnames.empty())
137 getColNames();
138 result = colnames;
139 }
140
HasColumn(const std::string & column,size_t & index)141 bool HasColumn(const std::string& column, size_t& index) CXX11_OVERRIDE
142 {
143 if (colnames.empty())
144 getColNames();
145
146 for (size_t i = 0; i < colnames.size(); ++i)
147 {
148 if (colnames[i] == column)
149 {
150 index = i;
151 return true;
152 }
153 }
154 return false;
155 }
156
GetValue(int row,int column)157 SQL::Field GetValue(int row, int column)
158 {
159 char* v = PQgetvalue(res, row, column);
160 if (!v || PQgetisnull(res, row, column))
161 return SQL::Field();
162
163 return SQL::Field(std::string(v, PQgetlength(res, row, column)));
164 }
165
GetRow(SQL::Row & result)166 bool GetRow(SQL::Row& result) CXX11_OVERRIDE
167 {
168 if (currentrow >= PQntuples(res))
169 return false;
170 int ncols = PQnfields(res);
171
172 for(int i = 0; i < ncols; i++)
173 {
174 result.push_back(GetValue(currentrow, i));
175 }
176 currentrow++;
177
178 return true;
179 }
180 };
181
182 /** SQLConn represents one SQL session.
183 */
184 class SQLConn : public SQL::Provider, public EventHandler
185 {
186 public:
187 reference<ConfigTag> conf; /* The <database> entry */
188 std::deque<QueueItem> queue;
189 PGconn* sql; /* PgSQL database connection handle */
190 SQLstatus status; /* PgSQL database connection status */
191 QueueItem qinprog; /* If there is currently a query in progress */
192
SQLConn(Module * Creator,ConfigTag * tag)193 SQLConn(Module* Creator, ConfigTag* tag)
194 : SQL::Provider(Creator, tag->getString("id"))
195 , conf(tag)
196 , sql(NULL)
197 , status(CWRITE)
198 , qinprog(NULL, "")
199 {
200 if (!DoConnect())
201 DelayReconnect();
202 }
203
cull()204 CullResult cull() CXX11_OVERRIDE
205 {
206 this->SQL::Provider::cull();
207 ServerInstance->Modules->DelService(*this);
208 return this->EventHandler::cull();
209 }
210
~SQLConn()211 ~SQLConn()
212 {
213 SQL::Error err(SQL::BAD_DBID);
214 if (qinprog.c)
215 {
216 qinprog.c->OnError(err);
217 delete qinprog.c;
218 }
219 for(std::deque<QueueItem>::iterator i = queue.begin(); i != queue.end(); i++)
220 {
221 SQL::Query* q = i->c;
222 q->OnError(err);
223 delete q;
224 }
225 Close();
226 }
227
OnEventHandlerRead()228 void OnEventHandlerRead() CXX11_OVERRIDE
229 {
230 DoEvent();
231 }
232
OnEventHandlerWrite()233 void OnEventHandlerWrite() CXX11_OVERRIDE
234 {
235 DoEvent();
236 }
237
OnEventHandlerError(int errornum)238 void OnEventHandlerError(int errornum) CXX11_OVERRIDE
239 {
240 DelayReconnect();
241 }
242
GetDSN()243 std::string GetDSN()
244 {
245 std::ostringstream conninfo("connect_timeout = '5'");
246 std::string item;
247
248 if (conf->readString("host", item))
249 conninfo << " host = '" << item << "'";
250
251 if (conf->readString("port", item))
252 conninfo << " port = '" << item << "'";
253
254 if (conf->readString("name", item))
255 conninfo << " dbname = '" << item << "'";
256
257 if (conf->readString("user", item))
258 conninfo << " user = '" << item << "'";
259
260 if (conf->readString("pass", item))
261 conninfo << " password = '" << item << "'";
262
263 if (conf->getBool("ssl"))
264 conninfo << " sslmode = 'require'";
265 else
266 conninfo << " sslmode = 'disable'";
267
268 return conninfo.str();
269 }
270
HandleConnectError(const char * reason)271 bool HandleConnectError(const char* reason)
272 {
273 ServerInstance->Logs->Log(MODNAME, LOG_DEFAULT, "Could not connect to the \"%s\" database: %s",
274 GetId().c_str(), reason);
275 return false;
276 }
277
DoConnect()278 bool DoConnect()
279 {
280 sql = PQconnectStart(GetDSN().c_str());
281 if (!sql)
282 return HandleConnectError("PQconnectStart returned NULL");
283
284 if(PQstatus(sql) == CONNECTION_BAD)
285 return HandleConnectError("connection status is bad");
286
287 if(PQsetnonblocking(sql, 1) == -1)
288 return HandleConnectError("unable to mark fd as non-blocking");
289
290 /* OK, we've initialised the connection, now to get it hooked into the socket engine
291 * and then start polling it.
292 */
293 SetFd(PQsocket(sql));
294 if(!HasFd())
295 return HandleConnectError("PQsocket returned an invalid fd");
296
297 if (!SocketEngine::AddFd(this, FD_WANT_NO_WRITE | FD_WANT_NO_READ))
298 return HandleConnectError("could not add the pgsql socket to the socket engine");
299
300 /* Socket all hooked into the engine, now to tell PgSQL to start connecting */
301 if (!DoPoll())
302 return HandleConnectError("could not poll the connection state");
303
304 return true;
305 }
306
DoPoll()307 bool DoPoll()
308 {
309 switch(PQconnectPoll(sql))
310 {
311 case PGRES_POLLING_WRITING:
312 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_WRITE | FD_WANT_NO_READ);
313 status = CWRITE;
314 return true;
315 case PGRES_POLLING_READING:
316 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
317 status = CREAD;
318 return true;
319 case PGRES_POLLING_FAILED:
320 SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
321 status = DEAD;
322 return false;
323 case PGRES_POLLING_OK:
324 SocketEngine::ChangeEventMask(this, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
325 status = WWRITE;
326 DoConnectedPoll();
327 return true;
328 default:
329 return true;
330 }
331 }
332
DoConnectedPoll()333 void DoConnectedPoll()
334 {
335 restart:
336 while (qinprog.q.empty() && !queue.empty())
337 {
338 /* There's no query currently in progress, and there's queries in the queue. */
339 DoQuery(queue.front());
340 queue.pop_front();
341 }
342
343 if (PQconsumeInput(sql))
344 {
345 if (PQisBusy(sql))
346 {
347 /* Nothing happens here */
348 }
349 else if (qinprog.c)
350 {
351 /* Fetch the result.. */
352 PGresult* result = PQgetResult(sql);
353
354 /* PgSQL would allow a query string to be sent which has multiple
355 * queries in it, this isn't portable across database backends and
356 * we don't want modules doing it. But just in case we make sure we
357 * drain any results there are and just use the last one.
358 * If the module devs are behaving there will only be one result.
359 */
360 while (PGresult* temp = PQgetResult(sql))
361 {
362 PQclear(result);
363 result = temp;
364 }
365
366 /* ..and the result */
367 PgSQLresult reply(result);
368 switch(PQresultStatus(result))
369 {
370 case PGRES_EMPTY_QUERY:
371 case PGRES_BAD_RESPONSE:
372 case PGRES_FATAL_ERROR:
373 {
374 SQL::Error err(SQL::QREPLY_FAIL, PQresultErrorMessage(result));
375 qinprog.c->OnError(err);
376 break;
377 }
378 default:
379 /* Other values are not errors */
380 qinprog.c->OnResult(reply);
381 }
382
383 delete qinprog.c;
384 qinprog = QueueItem(NULL, "");
385 goto restart;
386 }
387 else
388 {
389 qinprog.q.clear();
390 }
391 }
392 else
393 {
394 /* I think we'll assume this means the server died...it might not,
395 * but I think that any error serious enough we actually get here
396 * deserves to reconnect [/excuse]
397 * Returning true so the core doesn't try and close the connection.
398 */
399 DelayReconnect();
400 }
401 }
402
403 void DelayReconnect();
404
DoEvent()405 void DoEvent()
406 {
407 if((status == CREAD) || (status == CWRITE))
408 {
409 DoPoll();
410 }
411 else if (status == WREAD || status == WWRITE)
412 {
413 DoConnectedPoll();
414 }
415 }
416
Submit(SQL::Query * req,const std::string & q)417 void Submit(SQL::Query *req, const std::string& q) CXX11_OVERRIDE
418 {
419 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "Executing PostgreSQL query: " + q);
420 if (qinprog.q.empty())
421 {
422 DoQuery(QueueItem(req,q));
423 }
424 else
425 {
426 // wait your turn.
427 queue.push_back(QueueItem(req,q));
428 }
429 }
430
Submit(SQL::Query * req,const std::string & q,const SQL::ParamList & p)431 void Submit(SQL::Query *req, const std::string& q, const SQL::ParamList& p) CXX11_OVERRIDE
432 {
433 std::string res;
434 unsigned int param = 0;
435 for(std::string::size_type i = 0; i < q.length(); i++)
436 {
437 if (q[i] != '?')
438 res.push_back(q[i]);
439 else
440 {
441 if (param < p.size())
442 {
443 std::string parm = p[param++];
444 std::vector<char> buffer(parm.length() * 2 + 1);
445 int error;
446 size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
447 if (error)
448 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
449 res.append(&buffer[0], escapedsize);
450 }
451 }
452 }
453 Submit(req, res);
454 }
455
Submit(SQL::Query * req,const std::string & q,const SQL::ParamMap & p)456 void Submit(SQL::Query *req, const std::string& q, const SQL::ParamMap& p) CXX11_OVERRIDE
457 {
458 std::string res;
459 for(std::string::size_type i = 0; i < q.length(); i++)
460 {
461 if (q[i] != '$')
462 res.push_back(q[i]);
463 else
464 {
465 std::string field;
466 i++;
467 while (i < q.length() && isalnum(q[i]))
468 field.push_back(q[i++]);
469 i--;
470
471 SQL::ParamMap::const_iterator it = p.find(field);
472 if (it != p.end())
473 {
474 std::string parm = it->second;
475 std::vector<char> buffer(parm.length() * 2 + 1);
476 int error;
477 size_t escapedsize = PQescapeStringConn(sql, &buffer[0], parm.data(), parm.length(), &error);
478 if (error)
479 ServerInstance->Logs->Log(MODNAME, LOG_DEBUG, "BUG: Apparently PQescapeStringConn() failed");
480 res.append(&buffer[0], escapedsize);
481 }
482 }
483 }
484 Submit(req, res);
485 }
486
DoQuery(const QueueItem & req)487 void DoQuery(const QueueItem& req)
488 {
489 if (status != WREAD && status != WWRITE)
490 {
491 // whoops, not connected...
492 SQL::Error err(SQL::BAD_CONN);
493 req.c->OnError(err);
494 delete req.c;
495 return;
496 }
497
498 if(PQsendQuery(sql, req.q.c_str()))
499 {
500 qinprog = req;
501 }
502 else
503 {
504 SQL::Error err(SQL::QSEND_FAIL, PQerrorMessage(sql));
505 req.c->OnError(err);
506 delete req.c;
507 }
508 }
509
Close()510 void Close()
511 {
512 status = DEAD;
513
514 if (HasFd() && SocketEngine::HasFd(GetFd()))
515 SocketEngine::DelFd(this);
516
517 if(sql)
518 {
519 PQfinish(sql);
520 sql = NULL;
521 }
522 }
523 };
524
525 class ModulePgSQL : public Module
526 {
527 public:
528 ConnMap connections;
529 ReconnectTimer* retimer;
530
ModulePgSQL()531 ModulePgSQL()
532 : retimer(NULL)
533 {
534 }
535
~ModulePgSQL()536 ~ModulePgSQL()
537 {
538 delete retimer;
539 ClearAllConnections();
540 }
541
ReadConfig(ConfigStatus & status)542 void ReadConfig(ConfigStatus& status) CXX11_OVERRIDE
543 {
544 ReadConf();
545 }
546
ReadConf()547 void ReadConf()
548 {
549 ConnMap conns;
550 ConfigTagList tags = ServerInstance->Config->ConfTags("database");
551 for(ConfigIter i = tags.first; i != tags.second; i++)
552 {
553 if (!stdalgo::string::equalsci(i->second->getString("module"), "pgsql"))
554 continue;
555 std::string id = i->second->getString("id");
556 ConnMap::iterator curr = connections.find(id);
557 if (curr == connections.end())
558 {
559 SQLConn* conn = new SQLConn(this, i->second);
560 if (conn->status != DEAD)
561 {
562 conns.insert(std::make_pair(id, conn));
563 ServerInstance->Modules->AddService(*conn);
564 }
565 // If the connection is dead it has already been queued for culling
566 // at the end of the main loop so we don't need to delete it here.
567 }
568 else
569 {
570 conns.insert(*curr);
571 connections.erase(curr);
572 }
573 }
574 ClearAllConnections();
575 conns.swap(connections);
576 }
577
ClearAllConnections()578 void ClearAllConnections()
579 {
580 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
581 {
582 i->second->cull();
583 delete i->second;
584 }
585 connections.clear();
586 }
587
OnUnloadModule(Module * mod)588 void OnUnloadModule(Module* mod) CXX11_OVERRIDE
589 {
590 SQL::Error err(SQL::BAD_DBID);
591 for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
592 {
593 SQLConn* conn = i->second;
594 if (conn->qinprog.c && conn->qinprog.c->creator == mod)
595 {
596 conn->qinprog.c->OnError(err);
597 delete conn->qinprog.c;
598 conn->qinprog.c = NULL;
599 }
600 std::deque<QueueItem>::iterator j = conn->queue.begin();
601 while (j != conn->queue.end())
602 {
603 SQL::Query* q = j->c;
604 if (q->creator == mod)
605 {
606 q->OnError(err);
607 delete q;
608 j = conn->queue.erase(j);
609 }
610 else
611 j++;
612 }
613 }
614 }
615
GetVersion()616 Version GetVersion() CXX11_OVERRIDE
617 {
618 return Version("Provides the ability for SQL modules to query a PostgreSQL database.", VF_VENDOR);
619 }
620 };
621
Tick(time_t time)622 bool ReconnectTimer::Tick(time_t time)
623 {
624 mod->retimer = NULL;
625 mod->ReadConf();
626 delete this;
627 return false;
628 }
629
DelayReconnect()630 void SQLConn::DelayReconnect()
631 {
632 status = DEAD;
633 ModulePgSQL* mod = (ModulePgSQL*)(Module*)creator;
634
635 ConnMap::iterator it = mod->connections.find(conf->getString("id"));
636 if (it != mod->connections.end())
637 mod->connections.erase(it);
638 ServerInstance->GlobalCulls.AddItem((EventHandler*)this);
639 if (!mod->retimer)
640 {
641 mod->retimer = new ReconnectTimer(mod);
642 ServerInstance->Timers.AddTimer(mod->retimer);
643 }
644 }
645
646 MODULE_INIT(ModulePgSQL)
647