1 /*
2 * Copyright (C) Tildeslash Ltd. All rights reserved.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 3.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 *
15 * In addition, as a special exception, the copyright holders give
16 * permission to link the code of portions of this program with the
17 * OpenSSL library under certain conditions as described in each
18 * individual source file, and distribute linked combinations
19 * including the two.
20 *
21 * You must obey the GNU General Public License in all respects
22 * for all of the code used other than OpenSSL.
23 */
24
25
26 #include "Config.h"
27
28 #include <stdio.h>
29 #include <string.h>
30
31 #include "URL.h"
32 #include "Thread.h"
33 #include "system/Time.h"
34 #include "Vector.h"
35 #include "ResultSet.h"
36 #include "PreparedStatement.h"
37 #include "Connection.h"
38 #include "ConnectionPool.h"
39
40
41 /**
42 * Implementation of the ConnectionPool interface
43 *
44 * @file
45 */
46
47
48 /* ----------------------------------------------------------- Definitions */
49
50
51 #define T ConnectionPool_T
52 struct ConnectionPool_S {
53 URL_T url;
54 int filled;
55 int doSweep;
56 char *error;
57 Sem_T alarm;
58 Mutex_T mutex;
59 Vector_T pool;
60 Thread_T reaper;
61 int sweepInterval;
62 int maxConnections;
63 volatile int stopped;
64 int connectionTimeout;
65 int initialConnections;
66 };
67
68 int ZBDEBUG = false;
69 #ifdef PACKAGE_PROTECTED
70 #pragma GCC visibility push(hidden)
71 #endif
72 void(*AbortHandler)(const char *error) = NULL;
73 #ifdef PACKAGE_PROTECTED
74 #pragma GCC visibility pop
75 #endif
76
77
78 /* ------------------------------------------------------- Private methods */
79
80
_drainPool(T P)81 static void _drainPool(T P) {
82 while (! Vector_isEmpty(P->pool)) {
83 Connection_T con = Vector_pop(P->pool);
84 Connection_free(&con);
85 }
86 }
87
88
_fillPool(T P)89 static bool _fillPool(T P) {
90 for (int i = 0; i < P->initialConnections; i++) {
91 Connection_T con = Connection_new(P, &P->error);
92 if (! con) {
93 if (i > 0) {
94 DEBUG("Failed to fill the pool with initial connections -- %s\n", P->error);
95 FREE(P->error);
96 return true;
97 }
98 return false;
99 }
100 Vector_push(P->pool, con);
101 }
102 return true;
103 }
104
105
_getActive(T P)106 static int _getActive(T P){
107 int i, n = 0, size = Vector_size(P->pool);
108 for (i = 0; i < size; i++)
109 if (! Connection_isAvailable(Vector_get(P->pool, i)))
110 n++;
111 return n;
112 }
113
114
_reapConnections(T P)115 static int _reapConnections(T P) {
116 int n = 0;
117 int x = Vector_size(P->pool) - _getActive(P) - P->initialConnections;
118 time_t timedout = Time_now() - P->connectionTimeout;
119 for (int i = 0; ((n < x) && (i < Vector_size(P->pool))); i++) {
120 Connection_T con = Vector_get(P->pool, i);
121 if (Connection_isAvailable(con)) {
122 if ((Connection_getLastAccessedTime(con) < timedout) || (! Connection_ping(con))) {
123 Vector_remove(P->pool, i);
124 Connection_free(&con);
125 n++;
126 i--;
127 }
128 }
129 }
130 return n;
131 }
132
133
_doSweep(void * args)134 static void *_doSweep(void *args) {
135 T P = args;
136 struct timespec wait = {};
137 Mutex_lock(P->mutex);
138 while (! P->stopped) {
139 wait.tv_sec = Time_now() + P->sweepInterval;
140 Sem_timeWait(P->alarm, P->mutex, wait);
141 if (P->stopped) break;
142 _reapConnections(P);
143 }
144 Mutex_unlock(P->mutex);
145 DEBUG("Reaper thread stopped\n");
146 return NULL;
147 }
148
149
150 /* ---------------------------------------------------------------- Public */
151
152
ConnectionPool_new(URL_T url)153 T ConnectionPool_new(URL_T url) {
154 T P;
155 assert(url);
156 System_init();
157 NEW(P);
158 P->url = url;
159 Sem_init(P->alarm);
160 Mutex_init(P->mutex);
161 P->maxConnections = SQL_DEFAULT_MAX_CONNECTIONS;
162 P->pool = Vector_new(SQL_DEFAULT_MAX_CONNECTIONS);
163 P->initialConnections = SQL_DEFAULT_INIT_CONNECTIONS;
164 P->connectionTimeout = SQL_DEFAULT_CONNECTION_TIMEOUT;
165 return P;
166 }
167
168
ConnectionPool_free(T * P)169 void ConnectionPool_free(T *P) {
170 Vector_T pool;
171 assert(P && *P);
172 pool = (*P)->pool;
173 if (! (*P)->stopped)
174 ConnectionPool_stop((*P));
175 Vector_free(&pool);
176 Mutex_destroy((*P)->mutex);
177 Sem_destroy((*P)->alarm);
178 FREE((*P)->error);
179 FREE(*P);
180 }
181
182
183 /* ------------------------------------------------------------ Properties */
184
185
ConnectionPool_getURL(T P)186 URL_T ConnectionPool_getURL(T P) {
187 assert(P);
188 return P->url;
189 }
190
191
ConnectionPool_setInitialConnections(T P,int connections)192 void ConnectionPool_setInitialConnections(T P, int connections) {
193 assert(P);
194 assert(connections >= 0);
195 LOCK(P->mutex)
196 {
197 P->initialConnections = connections;
198 }
199 END_LOCK;
200 }
201
202
ConnectionPool_getInitialConnections(T P)203 int ConnectionPool_getInitialConnections(T P) {
204 assert(P);
205 return P->initialConnections;
206 }
207
208
ConnectionPool_setMaxConnections(T P,int maxConnections)209 void ConnectionPool_setMaxConnections(T P, int maxConnections) {
210 assert(P);
211 assert(P->initialConnections <= maxConnections);
212 LOCK(P->mutex)
213 {
214 P->maxConnections = maxConnections;
215 }
216 END_LOCK;
217 }
218
219
ConnectionPool_getMaxConnections(T P)220 int ConnectionPool_getMaxConnections(T P) {
221 assert(P);
222 return P->maxConnections;
223 }
224
225
ConnectionPool_setConnectionTimeout(T P,int connectionTimeout)226 void ConnectionPool_setConnectionTimeout(T P, int connectionTimeout) {
227 assert(P);
228 assert(connectionTimeout > 0);
229 P->connectionTimeout = connectionTimeout;
230 }
231
232
ConnectionPool_getConnectionTimeout(T P)233 int ConnectionPool_getConnectionTimeout(T P) {
234 assert(P);
235 return P->connectionTimeout;
236 }
237
238
ConnectionPool_setAbortHandler(T P,void (* abortHandler)(const char * error))239 void ConnectionPool_setAbortHandler(T P, void(*abortHandler)(const char *error)) {
240 assert(P);
241 AbortHandler = abortHandler;
242 }
243
244
ConnectionPool_setReaper(T P,int sweepInterval)245 void ConnectionPool_setReaper(T P, int sweepInterval) {
246 assert(P);
247 assert(sweepInterval>0);
248 LOCK(P->mutex)
249 {
250 P->doSweep = true;
251 P->sweepInterval = sweepInterval;
252 }
253 END_LOCK;
254 }
255
256
ConnectionPool_size(T P)257 int ConnectionPool_size(T P) {
258 assert(P);
259 return Vector_size(P->pool);
260 }
261
262
ConnectionPool_active(T P)263 int ConnectionPool_active(T P) {
264 int n = 0;
265 assert(P);
266 LOCK(P->mutex)
267 {
268 n = _getActive(P);
269 }
270 END_LOCK;
271 return n;
272 }
273
274
275 /* -------------------------------------------------------- Public methods */
276
277
ConnectionPool_start(T P)278 void ConnectionPool_start(T P) {
279 assert(P);
280 LOCK(P->mutex)
281 {
282 P->stopped = false;
283 if (! P->filled) {
284 P->filled = _fillPool(P);
285 if (P->filled) {
286 if (P->doSweep) {
287 DEBUG("Starting Database reaper thread\n");
288 Thread_create(P->reaper, _doSweep, P);
289 }
290 }
291 }
292 }
293 END_LOCK;
294 if (! P->filled)
295 THROW(SQLException, "Failed to start connection pool -- %s", P->error);
296 }
297
298
ConnectionPool_stop(T P)299 void ConnectionPool_stop(T P) {
300 int stopSweep = false;
301 assert(P);
302 LOCK(P->mutex)
303 {
304 P->stopped = true;
305 if (P->filled) {
306 _drainPool(P);
307 P->filled = false;
308 stopSweep = (P->doSweep && P->reaper);
309 }
310 }
311 END_LOCK;
312 if (stopSweep) {
313 DEBUG("Stopping Database reaper thread...\n");
314 Sem_signal(P->alarm);
315 Thread_join(P->reaper);
316 }
317 }
318
319
ConnectionPool_getConnection(T P)320 Connection_T ConnectionPool_getConnection(T P) {
321 Connection_T con = NULL;
322 assert(P);
323 LOCK(P->mutex)
324 {
325 int size = Vector_size(P->pool);
326 for (int i = 0; i < size; i++) {
327 con = Vector_get(P->pool, i);
328 if (Connection_isAvailable(con)) {
329 if (Connection_ping(con)) {
330 Connection_setAvailable(con, false);
331 goto done;
332 }
333 }
334 }
335 con = NULL;
336 if (size < P->maxConnections) {
337 con = Connection_new(P, &P->error);
338 if (con) {
339 Connection_setAvailable(con, false);
340 Vector_push(P->pool, con);
341 } else {
342 DEBUG("Failed to create connection -- %s\n", P->error);
343 FREE(P->error);
344 }
345 }
346 }
347 done:
348 END_LOCK;
349 return con;
350 }
351
352
ConnectionPool_returnConnection(T P,Connection_T connection)353 void ConnectionPool_returnConnection(T P, Connection_T connection) {
354 assert(P);
355 assert(connection);
356 if (Connection_isInTransaction(connection)) {
357 TRY
358 Connection_rollback(connection);
359 ELSE
360 DEBUG("Failed to rollback transaction -- %s\n", Exception_frame.message);
361 END_TRY;
362 }
363 Connection_clear(connection);
364 LOCK(P->mutex)
365 {
366 Connection_setAvailable(connection, true);
367 }
368 END_LOCK;
369 }
370
371
ConnectionPool_reapConnections(T P)372 int ConnectionPool_reapConnections(T P) {
373 int n = 0;
374 assert(P);
375 LOCK(P->mutex)
376 {
377 n = _reapConnections(P);
378 }
379 END_LOCK;
380 return n;
381 }
382
383
ConnectionPool_version(void)384 const char *ConnectionPool_version(void) {
385 return ABOUT;
386 }
387