1 /* TDSPool - Connection pooling for TDS based databases
2 * Copyright (C) 2001, 2002, 2003, 2004, 2005 Brian Bruns
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 */
19
20 #include <config.h>
21
22 #include <stdarg.h>
23 #include <stdio.h>
24 #include <assert.h>
25
26 #if HAVE_STDLIB_H
27 #include <stdlib.h>
28 #endif /* HAVE_STDLIB_H */
29
30 #if HAVE_STRING_H
31 #include <string.h>
32 #endif /* HAVE_STRING_H */
33
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif /* HAVE_UNISTD_H */
37
38 #if HAVE_SYS_PARAM_H
39 #include <sys/param.h>
40 #endif /* HAVE_SYS_PARAM_H */
41
42 #if HAVE_SYS_SOCKET_H
43 #include <sys/socket.h>
44 #endif /* HAVE_SYS_SOCKET_H */
45
46 #if HAVE_NETINET_IN_H
47 #include <netinet/in.h>
48 #endif /* HAVE_NETINET_IN_H */
49
50 #if HAVE_ARPA_INET_H
51 #include <arpa/inet.h>
52 #endif /* HAVE_ARPA_INET_H */
53
54 #include "pool.h"
55 #include <freetds/replacements.h>
56 #include <freetds/utils/string.h>
57
58 #ifndef MAXHOSTNAMELEN
59 #define MAXHOSTNAMELEN 256
60 #endif /* MAXHOSTNAMELEN */
61
62 static void
pool_mbr_free_socket(TDSSOCKET * tds)63 pool_mbr_free_socket(TDSSOCKET *tds)
64 {
65 if (tds) {
66 TDSCONTEXT *ctx = (TDSCONTEXT *) tds->conn->tds_ctx;
67
68 tds_free_socket(tds);
69 tds_free_context(ctx);
70 }
71 }
72
73 /*
74 * pool_mbr_login open a single pool login, to be call at init time or
75 * to reconnect.
76 */
77 static TDSSOCKET *
pool_mbr_login(const TDS_POOL * pool,int tds_version)78 pool_mbr_login(const TDS_POOL * pool, int tds_version)
79 {
80 TDSCONTEXT *context;
81 TDSLOGIN *login;
82 TDSSOCKET *tds;
83 TDSLOGIN *connection;
84 char hostname[MAXHOSTNAMELEN];
85
86 login = tds_alloc_login(1);
87 if (!login) {
88 fprintf(stderr, "out of memory");
89 return NULL;
90 }
91 if (gethostname(hostname, MAXHOSTNAMELEN) < 0)
92 strlcpy(hostname, "tdspool", MAXHOSTNAMELEN);
93 if (!tds_set_passwd(login, pool->server_password)
94 || !tds_set_user(login, pool->server_user)
95 || !tds_set_app(login, "tdspool")
96 || !tds_set_host(login, hostname)
97 || !tds_set_library(login, "TDS-Library")
98 || !tds_set_server(login, pool->server)
99 || !tds_set_client_charset(login, "iso_1")
100 || !tds_set_language(login, "us_english")) {
101 tds_free_login(login);
102 return NULL;
103 }
104 if (tds_version > 0)
105 login->tds_version = tds_version;
106 if (pool->database && strlen(pool->database)) {
107 if (!tds_dstr_copy(&login->database, pool->database)) {
108 tds_free_login(login);
109 return NULL;
110 }
111 }
112 context = tds_alloc_context(NULL);
113 if (!context) {
114 fprintf(stderr, "Context cannot be null\n");
115 return NULL;
116 }
117 tds = tds_alloc_socket(context, 512);
118 if (!tds) {
119 fprintf(stderr, "tds cannot be null\n");
120 return NULL;
121 }
122 connection = tds_read_config_info(tds, login, context->locale);
123 tds_free_login(login);
124 if (!connection || TDS_FAILED(tds_connect_and_login(tds, connection))) {
125 pool_mbr_free_socket(tds);
126 tds_free_login(connection);
127 /* what to do? */
128 fprintf(stderr, "Could not open connection to server %s\n", pool->server);
129 return NULL;
130 }
131 tds_free_login(connection);
132
133 if (pool->database && strlen(pool->database)) {
134 if (strcasecmp(tds->conn->env.database, pool->database) != 0) {
135 fprintf(stderr, "changing database failed\n");
136 return NULL;
137 }
138 }
139
140 return tds;
141 }
142
143 void
pool_assign_member(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr,TDS_POOL_USER * puser)144 pool_assign_member(TDS_POOL *pool, TDS_POOL_MEMBER * pmbr, TDS_POOL_USER *puser)
145 {
146 assert(pmbr->current_user == NULL);
147 if (pmbr->current_user) {
148 pmbr->current_user->assigned_member = NULL;
149 } else {
150 dlist_member_remove(&pool->idle_members, pmbr);
151 dlist_member_append(&pool->active_members, pmbr);
152 }
153 pmbr->current_user = puser;
154 puser->assigned_member = pmbr;
155 }
156
157 void
pool_deassign_member(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr)158 pool_deassign_member(TDS_POOL *pool, TDS_POOL_MEMBER * pmbr)
159 {
160 if (pmbr->current_user) {
161 pmbr->current_user->assigned_member = NULL;
162 pmbr->current_user = NULL;
163 dlist_member_remove(&pool->active_members, pmbr);
164 dlist_member_append(&pool->idle_members, pmbr);
165 }
166 pmbr->sock.poll_send = false;
167 }
168
169 /*
170 * if a dead connection on the client side left this member in a questionable
171 * state, let's bring in a correct one
172 * We are not sure what the client did so we must try to clean as much as
173 * possible.
174 * Use pool_free_member if the state is really broken.
175 */
176 void
pool_reset_member(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr)177 pool_reset_member(TDS_POOL * pool, TDS_POOL_MEMBER * pmbr)
178 {
179 // FIXME not wait for server !!! asyncronous
180 TDSSOCKET *tds = pmbr->sock.tds;
181 TDS_POOL_USER *puser;
182
183 puser = pmbr->current_user;
184 if (puser) {
185 pool_deassign_member(pool, pmbr);
186 pool_free_user(pool, puser);
187 }
188
189 /* cancel whatever pending */
190 tds_init_write_buf(tds);
191 if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
192 goto failure;
193 tds->out_flag = TDS_CANCEL;
194 if (TDS_FAILED(tds_flush_packet(tds)))
195 goto failure;
196 tds_set_state(tds, TDS_PENDING);
197 tds->in_cancel = 2;
198
199 if (TDS_FAILED(tds_process_cancel(tds)))
200 goto failure;
201
202 if (IS_TDS71_PLUS(tds->conn)) {
203 /* this 0x9 final reset the state from mssql 2000 */
204 if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
205 goto failure;
206 tds_start_query(tds, TDS_QUERY);
207 tds_put_string(tds, "WHILE @@TRANCOUNT > 0 ROLLBACK SET TRANSACTION ISOLATION LEVEL READ COMMITTED", -1);
208 tds_write_packet(tds, 0x9);
209 tds_set_state(tds, TDS_PENDING);
210
211 if (TDS_FAILED(tds_process_simple_query(tds)))
212 goto failure;
213 }
214 return;
215
216 failure:
217 pool_free_member(pool, pmbr);
218 }
219
220 void
pool_free_member(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr)221 pool_free_member(TDS_POOL * pool, TDS_POOL_MEMBER * pmbr)
222 {
223 TDSSOCKET *tds;
224 TDS_POOL_USER *puser;
225
226 tds = pmbr->sock.tds;
227 if (tds) {
228 if (!IS_TDSDEAD(tds))
229 tds_close_socket(tds);
230 pool_mbr_free_socket(tds);
231 pmbr->sock.tds = NULL;
232 }
233
234 /*
235 * if he is allocated disconnect the client
236 * otherwise we end up with broken client.
237 */
238 puser = pmbr->current_user;
239 if (puser) {
240 pool_deassign_member(pool, pmbr);
241 pool_free_user(pool, puser);
242 }
243
244 if (dlist_member_in_list(&pool->active_members, pmbr)) {
245 pool->num_active_members--;
246 dlist_member_remove(&pool->active_members, pmbr);
247 }
248 free(pmbr);
249 }
250
251 void
pool_mbr_init(TDS_POOL * pool)252 pool_mbr_init(TDS_POOL * pool)
253 {
254 TDS_POOL_MEMBER *pmbr;
255
256 /* allocate room for pool members */
257
258 pool->num_active_members = 0;
259 dlist_member_init(&pool->active_members);
260 dlist_member_init(&pool->idle_members);
261
262 /* open connections for each member */
263 while (pool->num_active_members < pool->min_open_conn) {
264 pmbr = tds_new0(TDS_POOL_MEMBER, 1);
265 if (!pmbr) {
266 fprintf(stderr, "Out of memory\n");
267 exit(1);
268 }
269 pmbr->sock.poll_recv = true;
270
271 pmbr->sock.tds = pool_mbr_login(pool, 0);
272 if (!pmbr->sock.tds) {
273 fprintf(stderr, "Could not open initial connection\n");
274 exit(1);
275 }
276 pmbr->last_used_tm = time(NULL);
277 pool->num_active_members++;
278 dlist_member_append(&pool->idle_members, pmbr);
279 if (!IS_TDS71_PLUS(pmbr->sock.tds->conn)) {
280 fprintf(stderr, "Current pool implementation does not support protocol versions former than 7.1\n");
281 exit(1);
282 }
283 pool->member_logins++;
284 }
285 }
286
287 void
pool_mbr_destroy(TDS_POOL * pool)288 pool_mbr_destroy(TDS_POOL * pool)
289 {
290 while (dlist_member_first(&pool->active_members))
291 pool_free_member(pool, dlist_member_first(&pool->active_members));
292 while (dlist_member_first(&pool->idle_members))
293 pool_free_member(pool, dlist_member_first(&pool->idle_members));
294
295 assert(pool->num_active_members == 0);
296 pool->num_active_members = 0;
297 }
298
299 static bool
pool_process_data(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr)300 pool_process_data(TDS_POOL *pool, TDS_POOL_MEMBER *pmbr)
301 {
302 TDSSOCKET *tds = pmbr->sock.tds;
303 TDS_POOL_USER *puser = NULL;
304
305 for (;;) {
306 if (pool_packet_read(tds))
307 break;
308
309 /* disconnected */
310 if (tds->in_len == 0) {
311 tdsdump_log(TDS_DBG_INFO1, "Uh oh! member disconnected\n");
312 /* mark as dead */
313 pool_free_member(pool, pmbr);
314 return false;
315 }
316
317 tdsdump_dump_buf(TDS_DBG_NETWORK, "Got packet from server:", tds->in_buf, tds->in_len);
318 puser = pmbr->current_user;
319 if (!puser)
320 break;
321
322 tdsdump_log(TDS_DBG_INFO1, "writing it sock %d\n", tds_get_s(puser->sock.tds));
323 if (!pool_write_data(&pmbr->sock, &puser->sock)) {
324 tdsdump_log(TDS_DBG_ERROR, "member received error while writing\n");
325 pool_free_user(pool, puser);
326 return false;
327 }
328 if (tds->in_pos < tds->in_len)
329 /* partial write, schedule a future write */
330 break;
331 }
332 if (puser && !puser->sock.poll_send)
333 tds_socket_flush(tds_get_s(puser->sock.tds));
334 return true;
335 }
336
337 /*
338 * pool_process_members
339 * check the fd_set for members returning data to the client, lookup the
340 * client holding this member and forward the results.
341 * @return Timeout you should call this function again or -1 for infinite
342 */
343 int
pool_process_members(TDS_POOL * pool,fd_set * rfds,fd_set * wfds)344 pool_process_members(TDS_POOL * pool, fd_set * rfds, fd_set * wfds)
345 {
346 TDS_POOL_MEMBER *pmbr, *next;
347 TDSSOCKET *tds;
348 time_t age;
349 time_t time_now;
350 int min_expire_left = -1;
351
352 for (next = dlist_member_first(&pool->active_members); (pmbr = next) != NULL; ) {
353 bool processed = false;
354
355 next = dlist_member_next(&pool->active_members, pmbr);
356
357 if (pmbr->doing_async)
358 continue;
359
360 tds = pmbr->sock.tds;
361 assert(tds);
362
363 time_now = time(NULL);
364 if (pmbr->sock.poll_recv && FD_ISSET(tds_get_s(tds), rfds)) {
365 if (!pool_process_data(pool, pmbr))
366 continue;
367 processed = true;
368 }
369 if (pmbr->sock.poll_send && FD_ISSET(tds_get_s(tds), wfds)) {
370 if (!pool_write_data(&pmbr->current_user->sock, &pmbr->sock)) {
371 pool_free_member(pool, pmbr);
372 continue;
373 }
374 processed = true;
375 }
376 if (processed)
377 pmbr->last_used_tm = time_now;
378 }
379
380 if (pool->num_active_members <= pool->min_open_conn)
381 return min_expire_left;
382
383 /* close old connections */
384 time_now = time(NULL);
385 for (next = dlist_member_first(&pool->idle_members); (pmbr = next) != NULL; ) {
386
387 next = dlist_member_next(&pool->idle_members, pmbr);
388
389 assert(pmbr->sock.tds);
390 assert(!pmbr->current_user);
391
392 age = time_now - pmbr->last_used_tm;
393 if (age >= pool->max_member_age) {
394 tdsdump_log(TDS_DBG_INFO1, "member is %ld seconds old...closing\n", (long int) age);
395 pool_free_member(pool, pmbr);
396 } else {
397 int left = (int) (pool->max_member_age - age);
398 if (min_expire_left < 0 || left < min_expire_left)
399 min_expire_left = left;
400 }
401 }
402 return min_expire_left;
403 }
404
405 static bool
compatible_versions(const TDSSOCKET * tds,const TDS_POOL_USER * user)406 compatible_versions(const TDSSOCKET *tds, const TDS_POOL_USER *user)
407 {
408 if (tds->conn->tds_version != user->login->tds_version)
409 return false;
410 return true;
411 }
412
413 typedef struct {
414 TDS_POOL_EVENT common;
415 TDS_POOL *pool;
416 TDS_POOL_MEMBER *pmbr;
417 int tds_version;
418 } CONNECT_EVENT;
419
420 static void connect_execute_ok(TDS_POOL_EVENT *base_event);
421 static void connect_execute_ko(TDS_POOL_EVENT *base_event);
422
TDS_THREAD_PROC_DECLARE(connect_proc,arg)423 static TDS_THREAD_PROC_DECLARE(connect_proc, arg)
424 {
425 CONNECT_EVENT *ev = (CONNECT_EVENT *) arg;
426 TDS_POOL_MEMBER *pmbr = ev->pmbr;
427 TDS_POOL *pool = ev->pool;
428
429 for (;;) {
430 pmbr->sock.tds = pool_mbr_login(pool, ev->tds_version);
431 if (!pmbr->sock.tds) {
432 tdsdump_log(TDS_DBG_ERROR, "Error opening a new connection to server\n");
433 break;
434 }
435 if (!IS_TDS71_PLUS(pmbr->sock.tds->conn)) {
436 tdsdump_log(TDS_DBG_ERROR, "Protocol server version not supported\n");
437 break;
438 }
439
440 /* if already attached to a user we can send login directly */
441 if (pmbr->current_user)
442 if (!pool_user_send_login_ack(pool, pmbr->current_user))
443 break;
444
445 pool_event_add(pool, &ev->common, connect_execute_ok);
446 return TDS_THREAD_RESULT(0);
447 }
448
449 /* failure */
450 pool_event_add(pool, &ev->common, connect_execute_ko);
451 return TDS_THREAD_RESULT(0);
452 }
453
454 static void
connect_execute_ko(TDS_POOL_EVENT * base_event)455 connect_execute_ko(TDS_POOL_EVENT *base_event)
456 {
457 CONNECT_EVENT *ev = (CONNECT_EVENT *) base_event;
458
459 pool_free_member(ev->pool, ev->pmbr);
460 }
461
462 static void
connect_execute_ok(TDS_POOL_EVENT * base_event)463 connect_execute_ok(TDS_POOL_EVENT *base_event)
464 {
465 CONNECT_EVENT *ev = (CONNECT_EVENT *) base_event;
466 TDS_POOL_MEMBER *pmbr = ev->pmbr;
467 TDS_POOL_USER *puser = pmbr->current_user;
468
469 ev->pool->member_logins++;
470 pmbr->doing_async = false;
471
472 pmbr->last_used_tm = time(NULL);
473
474 if (puser) {
475 pmbr->sock.poll_recv = true;
476 puser->sock.poll_recv = true;
477
478 puser->user_state = TDS_SRV_QUERY;
479 }
480 }
481
482 /*
483 * pool_assign_idle_member
484 * assign a member to the user specified
485 */
486 TDS_POOL_MEMBER *
pool_assign_idle_member(TDS_POOL * pool,TDS_POOL_USER * puser)487 pool_assign_idle_member(TDS_POOL * pool, TDS_POOL_USER *puser)
488 {
489 TDS_POOL_MEMBER *pmbr;
490 CONNECT_EVENT *ev;
491
492 puser->sock.poll_recv = false;
493 puser->sock.poll_send = false;
494
495 DLIST_FOREACH(dlist_member, &pool->idle_members, pmbr) {
496 assert(pmbr->current_user == NULL);
497 assert(!pmbr->doing_async);
498
499 assert(pmbr->sock.tds);
500
501 if (!compatible_versions(pmbr->sock.tds, puser))
502 continue;
503
504 pool_assign_member(pool, pmbr, puser);
505
506 /*
507 * make sure member wasn't idle more that the timeout
508 * otherwise it'll send the query and close leaving a
509 * hung client
510 */
511 pmbr->last_used_tm = time(NULL);
512 pmbr->sock.poll_recv = false;
513 pmbr->sock.poll_send = false;
514
515 pool_user_finish_login(pool, puser);
516 return pmbr;
517 }
518
519 /* if we can open a new connection open it */
520 if (pool->num_active_members >= pool->max_open_conn) {
521 fprintf(stderr, "No idle members left, increase \"max pool conn\"\n");
522 return NULL;
523 }
524
525 pmbr = tds_new0(TDS_POOL_MEMBER, 1);
526 if (!pmbr) {
527 fprintf(stderr, "Out of memory\n");
528 return NULL;
529 }
530
531 tdsdump_log(TDS_DBG_INFO1, "No open connections left, opening new member\n");
532
533 ev = tds_new0(CONNECT_EVENT, 1);
534 if (!ev) {
535 free(pmbr);
536 fprintf(stderr, "Out of memory\n");
537 return NULL;
538 }
539 ev->pmbr = pmbr;
540 ev->pool = pool;
541 ev->tds_version = puser->login->tds_version;
542
543 if (tds_thread_create_detached(connect_proc, ev) != 0) {
544 free(pmbr);
545 free(ev);
546 fprintf(stderr, "error creating thread\n");
547 return NULL;
548 }
549 pmbr->doing_async = true;
550
551 pool->num_active_members++;
552 dlist_member_append(&pool->idle_members, pmbr);
553
554 pool_assign_member(pool, pmbr, puser);
555 puser->sock.poll_send = false;
556 puser->sock.poll_recv = false;
557
558 return pmbr;
559 }
560