1 /*
2  * Copyright (C) Tildeslash Ltd. All rights reserved.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 3.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
14  *
15  * In addition, as a special exception, the copyright holders give
16  * permission to link the code of portions of this program with the
17  * OpenSSL library under certain conditions as described in each
18  * individual source file, and distribute linked combinations
19  * including the two.
20  *
21  * You must obey the GNU General Public License in all respects
22  * for all of the code used other than OpenSSL.
23  */
24 
25 
26 #include "Config.h"
27 
28 #include <stdio.h>
29 #include <string.h>
30 
31 #include "URL.h"
32 #include "Thread.h"
33 #include "system/Time.h"
34 #include "Vector.h"
35 #include "ResultSet.h"
36 #include "PreparedStatement.h"
37 #include "Connection.h"
38 #include "ConnectionPool.h"
39 
40 
41 /**
42  * Implementation of the ConnectionPool interface
43  *
44  * @file
45  */
46 
47 
48 /* ----------------------------------------------------------- Definitions */
49 
50 
51 #define T ConnectionPool_T
52 struct ConnectionPool_S {
53         URL_T url;
54         int filled;
55         int doSweep;
56         char *error;
57         Sem_T alarm;
58 	Mutex_T mutex;
59 	Vector_T pool;
60         Thread_T reaper;
61         int sweepInterval;
62 	int maxConnections;
63         volatile int stopped;
64         int connectionTimeout;
65 	int initialConnections;
66 };
67 
/* Library debug switch, off by default. Presumably consulted by the
   DEBUG macro, which is defined elsewhere -- confirm. */
int ZBDEBUG = false;

/* Handler installed via ConnectionPool_setAbortHandler(); NULL until
   one is set. Hidden from the exported symbol table when the package
   is built with PACKAGE_PROTECTED. */
#ifdef PACKAGE_PROTECTED
#pragma GCC visibility push(hidden)
#endif
void (*AbortHandler)(const char *error) = NULL;
#ifdef PACKAGE_PROTECTED
#pragma GCC visibility pop
#endif
76 
77 
78 /* ------------------------------------------------------- Private methods */
79 
80 
/**
 * Pop every connection off the pool vector and free it, leaving the
 * vector empty. Callers in this file hold P->mutex when calling.
 */
static void _drainPool(T P) {
        for (;;) {
                if (Vector_isEmpty(P->pool))
                        break;
                Connection_T victim = Vector_pop(P->pool);
                Connection_free(&victim);
        }
}
87 
88 
/**
 * Create up to P->initialConnections connections and add them to the pool.
 *
 * @param P The connection pool
 * @return false only if the very first connection could not be created
 * (P->error is left set for the caller); true otherwise. A failure after
 * at least one connection was created is logged, P->error is released
 * and the partially filled pool is accepted.
 */
static bool _fillPool(T P) {
        int created = 0;
        while (created < P->initialConnections) {
                Connection_T con = Connection_new(P, &P->error);
                if (! con) {
                        if (created == 0)
                                return false;
                        DEBUG("Failed to fill the pool with initial connections -- %s\n", P->error);
                        FREE(P->error);
                        break;
                }
                Vector_push(P->pool, con);
                created++;
        }
        return true;
}
104 
105 
/**
 * Count the connections in the pool that are not available, i.e.
 * currently handed out.
 *
 * @param P The connection pool
 * @return Number of connections for which Connection_isAvailable() is false
 */
static int _getActive(T P) {
        int busy = 0;
        const int total = Vector_size(P->pool);
        for (int idx = 0; idx < total; idx++) {
                Connection_T con = Vector_get(P->pool, idx);
                if (! Connection_isAvailable(con))
                        busy++;
        }
        return busy;
}
113 
114 
/**
 * Remove and free available connections that have been idle longer than
 * P->connectionTimeout or that no longer answer a ping. At most
 * (pool size - active - initialConnections) connections are removed.
 *
 * @param P The connection pool
 * @return Number of connections removed
 */
static int _reapConnections(T P) {
        int reaped = 0;
        const int quota = Vector_size(P->pool) - _getActive(P) - P->initialConnections;
        const time_t timedout = Time_now() - P->connectionTimeout;
        int idx = 0;
        while ((reaped < quota) && (idx < Vector_size(P->pool))) {
                Connection_T con = Vector_get(P->pool, idx);
                /* The ping is attempted only on available connections that have not timed out */
                bool expired = Connection_isAvailable(con)
                        && ((Connection_getLastAccessedTime(con) < timedout) || (! Connection_ping(con)));
                if (! expired) {
                        idx++;
                        continue;
                }
                /* Removal shifts later elements down, so idx stays where it is */
                Vector_remove(P->pool, idx);
                Connection_free(&con);
                reaped++;
        }
        return reaped;
}
132 
133 
_doSweep(void * args)134 static void *_doSweep(void *args) {
135         T P = args;
136         struct timespec wait = {};
137         Mutex_lock(P->mutex);
138         while (! P->stopped) {
139                 wait.tv_sec = Time_now() + P->sweepInterval;
140                 Sem_timeWait(P->alarm,  P->mutex, wait);
141                 if (P->stopped) break;
142                 _reapConnections(P);
143         }
144         Mutex_unlock(P->mutex);
145         DEBUG("Reaper thread stopped\n");
146         return NULL;
147 }
148 
149 
150 /* ---------------------------------------------------------------- Public */
151 
152 
ConnectionPool_new(URL_T url)153 T ConnectionPool_new(URL_T url) {
154         T P;
155 	assert(url);
156         System_init();
157 	NEW(P);
158         P->url = url;
159         Sem_init(P->alarm);
160 	Mutex_init(P->mutex);
161 	P->maxConnections = SQL_DEFAULT_MAX_CONNECTIONS;
162         P->pool = Vector_new(SQL_DEFAULT_MAX_CONNECTIONS);
163 	P->initialConnections = SQL_DEFAULT_INIT_CONNECTIONS;
164         P->connectionTimeout = SQL_DEFAULT_CONNECTION_TIMEOUT;
165 	return P;
166 }
167 
168 
/**
 * Destroy a connection pool. The pool is stopped first if it has not
 * been stopped already, then its vector, mutex, semaphore, error string
 * and the pool object itself are released.
 *
 * @param P Address of a connection pool reference; neither P nor *P
 * may be NULL
 */
void ConnectionPool_free(T *P) {
        assert(P && *P);
        T self = *P;
        /* Keep the vector handle in a local so it can be passed by address */
        Vector_T connections = self->pool;
        if (! self->stopped)
                ConnectionPool_stop(self);
        Vector_free(&connections);
        Mutex_destroy(self->mutex);
        Sem_destroy(self->alarm);
        FREE(self->error);
        FREE(*P);
}
181 
182 
183 /* ------------------------------------------------------------ Properties */
184 
185 
ConnectionPool_getURL(T P)186 URL_T ConnectionPool_getURL(T P) {
187         assert(P);
188         return P->url;
189 }
190 
191 
/**
 * Set the number of connections created when the pool is started.
 * The value is stored while holding the pool mutex.
 *
 * @param P The connection pool; must not be NULL
 * @param connections Number of initial connections; must be >= 0
 */
void ConnectionPool_setInitialConnections(T P, int connections) {
        assert(P);
        assert(connections >= 0);
        LOCK(P->mutex)
        {
                /* Read by _fillPool() and _reapConnections() */
                P->initialConnections = connections;
        }
        END_LOCK;
}
201 
202 
/**
 * Get the configured number of initial connections.
 *
 * @param P The connection pool; must not be NULL
 * @return The current initialConnections setting (read without locking)
 */
int ConnectionPool_getInitialConnections(T P) {
        assert(P);
        const int initial = P->initialConnections;
        return initial;
}
207 
208 
/**
 * Set the maximum number of connections the pool may hold.
 * The value is stored while holding the pool mutex.
 *
 * @param P The connection pool; must not be NULL
 * @param maxConnections New upper bound; must not be less than the
 * current number of initial connections
 */
void ConnectionPool_setMaxConnections(T P, int maxConnections) {
        assert(P);
        assert(P->initialConnections <= maxConnections);
        LOCK(P->mutex)
        {
                /* Checked by ConnectionPool_getConnection() before growing the pool */
                P->maxConnections = maxConnections;
        }
        END_LOCK;
}
218 
219 
/**
 * Get the maximum number of connections the pool may hold.
 *
 * @param P The connection pool; must not be NULL
 * @return The current maxConnections setting (read without locking)
 */
int ConnectionPool_getMaxConnections(T P) {
        assert(P);
        const int limit = P->maxConnections;
        return limit;
}
224 
225 
/**
 * Set the idle timeout after which an available connection may be
 * removed by the reaper. The value is subtracted from Time_now() in
 * _reapConnections(), so it is in the same unit as Time_now()
 * (presumably seconds -- confirm against system/Time.h).
 *
 * @param P The connection pool; must not be NULL
 * @param connectionTimeout The timeout; must be > 0
 */
void ConnectionPool_setConnectionTimeout(T P, int connectionTimeout) {
        assert(P);
        assert(connectionTimeout > 0);
        /* _reapConnections() reads connectionTimeout while holding P->mutex,
           possibly on the reaper thread. Store under the same mutex, as the
           other pool setters do, so the write cannot race with that read. */
        LOCK(P->mutex)
        {
                P->connectionTimeout = connectionTimeout;
        }
        END_LOCK;
}
231 
232 
/**
 * Get the configured connection idle timeout.
 *
 * @param P The connection pool; must not be NULL
 * @return The current connectionTimeout setting (read without locking)
 */
int ConnectionPool_getConnectionTimeout(T P) {
        assert(P);
        const int timeout = P->connectionTimeout;
        return timeout;
}
237 
238 
ConnectionPool_setAbortHandler(T P,void (* abortHandler)(const char * error))239 void ConnectionPool_setAbortHandler(T P, void(*abortHandler)(const char *error)) {
240         assert(P);
241         AbortHandler = abortHandler;
242 }
243 
244 
/**
 * Enable the reaper thread and set how long it waits between sweeps.
 * The thread itself is created by ConnectionPool_start() when it fills
 * the pool, so this only records the configuration.
 *
 * @param P The connection pool; must not be NULL
 * @param sweepInterval Interval between sweeps; must be > 0
 */
void ConnectionPool_setReaper(T P, int sweepInterval) {
        assert(P);
        assert(sweepInterval>0);
        LOCK(P->mutex)
        {
                P->sweepInterval = sweepInterval;
                P->doSweep = true;
        }
        END_LOCK;
}
255 
256 
/**
 * Get the number of connections currently held in the pool, both
 * available and handed out.
 *
 * @param P The connection pool; must not be NULL
 * @return Number of elements in the pool vector (read without locking)
 */
int ConnectionPool_size(T P) {
        assert(P);
        const int total = Vector_size(P->pool);
        return total;
}
261 
262 
/**
 * Get the number of connections currently handed out. The count is
 * taken while holding the pool mutex.
 *
 * @param P The connection pool; must not be NULL
 * @return Number of connections in the pool that are not available
 */
int ConnectionPool_active(T P) {
        assert(P);
        int busy = 0;
        LOCK(P->mutex)
        {
                busy = _getActive(P);
        }
        END_LOCK;
        return busy;
}
273 
274 
275 /* -------------------------------------------------------- Public methods */
276 
277 
/**
 * Start the pool: clear the stopped flag and, if the pool has not been
 * filled yet, create the initial connections. When filling succeeds and
 * a reaper was requested with ConnectionPool_setReaper(), the reaper
 * thread is created. Calling this on an already filled pool only clears
 * the stopped flag.
 *
 * @param P The connection pool; must not be NULL
 * @exception SQLException If the pool could not be filled
 */
void ConnectionPool_start(T P) {
        assert(P);
        LOCK(P->mutex)
        {
                P->stopped = false;
                if (! P->filled) {
                        P->filled = _fillPool(P);
                        if (P->filled && P->doSweep) {
                                DEBUG("Starting Database reaper thread\n");
                                Thread_create(P->reaper, _doSweep, P);
                        }
                }
        }
        END_LOCK;
        if (P->filled)
                return;
        /* _fillPool() left the reason in P->error */
        THROW(SQLException, "Failed to start connection pool -- %s", P->error);
}
297 
298 
/**
 * Stop the pool: mark it stopped and, if it was filled, free every
 * connection in it. If a reaper thread is in use it is woken through
 * the alarm and joined. The signal and join happen after the mutex has
 * been released, since the reaper holds the mutex whenever it is not
 * waiting.
 *
 * @param P The connection pool; must not be NULL
 */
void ConnectionPool_stop(T P) {
        assert(P);
        int joinReaper = false;
        LOCK(P->mutex)
        {
                P->stopped = true;
                if (P->filled) {
                        _drainPool(P);
                        P->filled = false;
                        joinReaper = (P->doSweep && P->reaper);
                }
        }
        END_LOCK;
        if (! joinReaper)
                return;
        DEBUG("Stopping Database reaper thread...\n");
        Sem_signal(P->alarm);
        Thread_join(P->reaper);
}
318 
319 
ConnectionPool_getConnection(T P)320 Connection_T ConnectionPool_getConnection(T P) {
321 	Connection_T con = NULL;
322 	assert(P);
323 	LOCK(P->mutex)
324         {
325                 int size = Vector_size(P->pool);
326                 for (int i = 0; i < size; i++) {
327                         con = Vector_get(P->pool, i);
328                         if (Connection_isAvailable(con)) {
329                                 if (Connection_ping(con)) {
330                                         Connection_setAvailable(con, false);
331                                         goto done;
332                                 }
333                         }
334                 }
335                 con = NULL;
336                 if (size < P->maxConnections) {
337                         con = Connection_new(P, &P->error);
338                         if (con) {
339                                 Connection_setAvailable(con, false);
340                                 Vector_push(P->pool, con);
341                         } else {
342                                 DEBUG("Failed to create connection -- %s\n", P->error);
343                                 FREE(P->error);
344                         }
345                 }
346         }
347 done:
348         END_LOCK;
349 	return con;
350 }
351 
352 
/* Roll back the connection's transaction; a failure is logged, not propagated */
static void _rollbackQuietly(Connection_T connection) {
        TRY
                Connection_rollback(connection);
        ELSE
                DEBUG("Failed to rollback transaction -- %s\n", Exception_frame.message);
        END_TRY;
}


/**
 * Return a connection to the pool. A transaction still in progress is
 * rolled back, the connection is cleared and then, under the pool
 * mutex, marked available again.
 *
 * @param P The connection pool; must not be NULL
 * @param connection The connection to return; must not be NULL
 */
void ConnectionPool_returnConnection(T P, Connection_T connection) {
        assert(P);
        assert(connection);
        if (Connection_isInTransaction(connection))
                _rollbackQuietly(connection);
        Connection_clear(connection);
        LOCK(P->mutex)
        {
                Connection_setAvailable(connection, true);
        }
        END_LOCK;
}
370 
371 
/**
 * Run one reap pass on demand: remove timed-out or unresponsive
 * available connections while holding the pool mutex.
 *
 * @param P The connection pool; must not be NULL
 * @return Number of connections removed
 */
int ConnectionPool_reapConnections(T P) {
        assert(P);
        int reaped = 0;
        LOCK(P->mutex)
        {
                reaped = _reapConnections(P);
        }
        END_LOCK;
        return reaped;
}
382 
383 
ConnectionPool_version(void)384 const char *ConnectionPool_version(void) {
385         return ABOUT;
386 }
387