1 /* TDSPool - Connection pooling for TDS based databases
2  * Copyright (C) 2001, 2002, 2003, 2004, 2005  Brian Bruns
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  *
18  */
19 
20 #include <config.h>
21 
22 #include <stdarg.h>
23 #include <stdio.h>
24 #include <assert.h>
25 
26 #if HAVE_STDLIB_H
27 #include <stdlib.h>
28 #endif /* HAVE_STDLIB_H */
29 
30 #if HAVE_STRING_H
31 #include <string.h>
32 #endif /* HAVE_STRING_H */
33 
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif /* HAVE_UNISTD_H */
37 
38 #if HAVE_SYS_PARAM_H
39 #include <sys/param.h>
40 #endif /* HAVE_SYS_PARAM_H */
41 
42 #if HAVE_SYS_SOCKET_H
43 #include <sys/socket.h>
44 #endif /* HAVE_SYS_SOCKET_H */
45 
46 #if HAVE_NETINET_IN_H
47 #include <netinet/in.h>
48 #endif /* HAVE_NETINET_IN_H */
49 
50 #if HAVE_ARPA_INET_H
51 #include <arpa/inet.h>
52 #endif /* HAVE_ARPA_INET_H */
53 
54 #include "pool.h"
55 #include <freetds/replacements.h>
56 #include <freetds/utils/string.h>
57 
58 #ifndef MAXHOSTNAMELEN
59 #define MAXHOSTNAMELEN 256
60 #endif /* MAXHOSTNAMELEN */
61 
62 static void
pool_mbr_free_socket(TDSSOCKET * tds)63 pool_mbr_free_socket(TDSSOCKET *tds)
64 {
65 	if (tds) {
66 		TDSCONTEXT *ctx = (TDSCONTEXT *) tds->conn->tds_ctx;
67 
68 		tds_free_socket(tds);
69 		tds_free_context(ctx);
70 	}
71 }
72 
73 /*
74  * pool_mbr_login open a single pool login, to be call at init time or
75  * to reconnect.
76  */
77 static TDSSOCKET *
pool_mbr_login(const TDS_POOL * pool,int tds_version)78 pool_mbr_login(const TDS_POOL * pool, int tds_version)
79 {
80 	TDSCONTEXT *context;
81 	TDSLOGIN *login;
82 	TDSSOCKET *tds;
83 	TDSLOGIN *connection;
84 	char hostname[MAXHOSTNAMELEN];
85 
86 	login = tds_alloc_login(1);
87 	if (!login) {
88 		fprintf(stderr, "out of memory");
89 		return NULL;
90 	}
91 	if (gethostname(hostname, MAXHOSTNAMELEN) < 0)
92 		strlcpy(hostname, "tdspool", MAXHOSTNAMELEN);
93 	if (!tds_set_passwd(login, pool->server_password)
94 	    || !tds_set_user(login, pool->server_user)
95 	    || !tds_set_app(login, "tdspool")
96 	    || !tds_set_host(login, hostname)
97 	    || !tds_set_library(login, "TDS-Library")
98 	    || !tds_set_server(login, pool->server)
99 	    || !tds_set_client_charset(login, "iso_1")
100 	    || !tds_set_language(login, "us_english")) {
101 		tds_free_login(login);
102 		return NULL;
103 	}
104 	if (tds_version > 0)
105 		login->tds_version = tds_version;
106 	if (pool->database && strlen(pool->database)) {
107 		if (!tds_dstr_copy(&login->database, pool->database)) {
108 			tds_free_login(login);
109 			return NULL;
110 		}
111 	}
112 	context = tds_alloc_context(NULL);
113 	if (!context) {
114 		fprintf(stderr, "Context cannot be null\n");
115 		return NULL;
116 	}
117 	tds = tds_alloc_socket(context, 512);
118 	if (!tds) {
119 		fprintf(stderr, "tds cannot be null\n");
120 		return NULL;
121 	}
122 	connection = tds_read_config_info(tds, login, context->locale);
123 	tds_free_login(login);
124 	if (!connection || TDS_FAILED(tds_connect_and_login(tds, connection))) {
125 		pool_mbr_free_socket(tds);
126 		tds_free_login(connection);
127 		/* what to do? */
128 		fprintf(stderr, "Could not open connection to server %s\n", pool->server);
129 		return NULL;
130 	}
131 	tds_free_login(connection);
132 
133 	if (pool->database && strlen(pool->database)) {
134 		if (strcasecmp(tds->conn->env.database, pool->database) != 0) {
135 			fprintf(stderr, "changing database failed\n");
136 			return NULL;
137 		}
138 	}
139 
140 	return tds;
141 }
142 
143 void
pool_assign_member(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr,TDS_POOL_USER * puser)144 pool_assign_member(TDS_POOL *pool, TDS_POOL_MEMBER * pmbr, TDS_POOL_USER *puser)
145 {
146 	assert(pmbr->current_user == NULL);
147 	if (pmbr->current_user) {
148 		pmbr->current_user->assigned_member = NULL;
149 	} else {
150 		dlist_member_remove(&pool->idle_members, pmbr);
151 		dlist_member_append(&pool->active_members, pmbr);
152 	}
153 	pmbr->current_user = puser;
154 	puser->assigned_member = pmbr;
155 }
156 
157 void
pool_deassign_member(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr)158 pool_deassign_member(TDS_POOL *pool, TDS_POOL_MEMBER * pmbr)
159 {
160 	if (pmbr->current_user) {
161 		pmbr->current_user->assigned_member = NULL;
162 		pmbr->current_user = NULL;
163 		dlist_member_remove(&pool->active_members, pmbr);
164 		dlist_member_append(&pool->idle_members, pmbr);
165 	}
166 	pmbr->sock.poll_send = false;
167 }
168 
169 /*
170  * if a dead connection on the client side left this member in a questionable
171  * state, let's bring in a correct one
172  * We are not sure what the client did so we must try to clean as much as
173  * possible.
174  * Use pool_free_member if the state is really broken.
175  */
176 void
pool_reset_member(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr)177 pool_reset_member(TDS_POOL * pool, TDS_POOL_MEMBER * pmbr)
178 {
179 	// FIXME not wait for server !!! asyncronous
180 	TDSSOCKET *tds = pmbr->sock.tds;
181 	TDS_POOL_USER *puser;
182 
183 	puser = pmbr->current_user;
184 	if (puser) {
185 		pool_deassign_member(pool, pmbr);
186 		pool_free_user(pool, puser);
187 	}
188 
189 	/* cancel whatever pending */
190 	tds_init_write_buf(tds);
191 	if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
192 		goto failure;
193 	tds->out_flag = TDS_CANCEL;
194 	if (TDS_FAILED(tds_flush_packet(tds)))
195 		goto failure;
196 	tds_set_state(tds, TDS_PENDING);
197 	tds->in_cancel = 2;
198 
199 	if (TDS_FAILED(tds_process_cancel(tds)))
200 		goto failure;
201 
202 	if (IS_TDS71_PLUS(tds->conn)) {
203 		/* this 0x9 final reset the state from mssql 2000 */
204 		if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
205 			goto failure;
206 		tds_start_query(tds, TDS_QUERY);
207 		tds_put_string(tds, "WHILE @@TRANCOUNT > 0 ROLLBACK SET TRANSACTION ISOLATION LEVEL READ COMMITTED", -1);
208 		tds_write_packet(tds, 0x9);
209 		tds_set_state(tds, TDS_PENDING);
210 
211 		if (TDS_FAILED(tds_process_simple_query(tds)))
212 			goto failure;
213 	}
214 	return;
215 
216 failure:
217 	pool_free_member(pool, pmbr);
218 }
219 
220 void
pool_free_member(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr)221 pool_free_member(TDS_POOL * pool, TDS_POOL_MEMBER * pmbr)
222 {
223 	TDSSOCKET *tds;
224 	TDS_POOL_USER *puser;
225 
226 	tds = pmbr->sock.tds;
227 	if (tds) {
228 		if (!IS_TDSDEAD(tds))
229 			tds_close_socket(tds);
230 		pool_mbr_free_socket(tds);
231 		pmbr->sock.tds = NULL;
232 	}
233 
234 	/*
235 	 * if he is allocated disconnect the client
236 	 * otherwise we end up with broken client.
237 	 */
238 	puser = pmbr->current_user;
239 	if (puser) {
240 		pool_deassign_member(pool, pmbr);
241 		pool_free_user(pool, puser);
242 	}
243 
244 	if (dlist_member_in_list(&pool->active_members, pmbr)) {
245 		pool->num_active_members--;
246 		dlist_member_remove(&pool->active_members, pmbr);
247 	}
248 	free(pmbr);
249 }
250 
251 void
pool_mbr_init(TDS_POOL * pool)252 pool_mbr_init(TDS_POOL * pool)
253 {
254 	TDS_POOL_MEMBER *pmbr;
255 
256 	/* allocate room for pool members */
257 
258 	pool->num_active_members = 0;
259 	dlist_member_init(&pool->active_members);
260 	dlist_member_init(&pool->idle_members);
261 
262 	/* open connections for each member */
263 	while (pool->num_active_members < pool->min_open_conn) {
264 		pmbr = tds_new0(TDS_POOL_MEMBER, 1);
265 		if (!pmbr) {
266 			fprintf(stderr, "Out of memory\n");
267 			exit(1);
268 		}
269 		pmbr->sock.poll_recv = true;
270 
271 		pmbr->sock.tds = pool_mbr_login(pool, 0);
272 		if (!pmbr->sock.tds) {
273 			fprintf(stderr, "Could not open initial connection\n");
274 			exit(1);
275 		}
276 		pmbr->last_used_tm = time(NULL);
277 		pool->num_active_members++;
278 		dlist_member_append(&pool->idle_members, pmbr);
279 		if (!IS_TDS71_PLUS(pmbr->sock.tds->conn)) {
280 			fprintf(stderr, "Current pool implementation does not support protocol versions former than 7.1\n");
281 			exit(1);
282 		}
283 		pool->member_logins++;
284 	}
285 }
286 
287 void
pool_mbr_destroy(TDS_POOL * pool)288 pool_mbr_destroy(TDS_POOL * pool)
289 {
290 	while (dlist_member_first(&pool->active_members))
291 		pool_free_member(pool, dlist_member_first(&pool->active_members));
292 	while (dlist_member_first(&pool->idle_members))
293 		pool_free_member(pool, dlist_member_first(&pool->idle_members));
294 
295 	assert(pool->num_active_members == 0);
296 	pool->num_active_members = 0;
297 }
298 
299 static bool
pool_process_data(TDS_POOL * pool,TDS_POOL_MEMBER * pmbr)300 pool_process_data(TDS_POOL *pool, TDS_POOL_MEMBER *pmbr)
301 {
302 	TDSSOCKET *tds = pmbr->sock.tds;
303 	TDS_POOL_USER *puser = NULL;
304 
305 	for (;;) {
306 		if (pool_packet_read(tds))
307 			break;
308 
309 		/* disconnected */
310 		if (tds->in_len == 0) {
311 			tdsdump_log(TDS_DBG_INFO1, "Uh oh! member disconnected\n");
312 			/* mark as dead */
313 			pool_free_member(pool, pmbr);
314 			return false;
315 		}
316 
317 		tdsdump_dump_buf(TDS_DBG_NETWORK, "Got packet from server:", tds->in_buf, tds->in_len);
318 		puser = pmbr->current_user;
319 		if (!puser)
320 			break;
321 
322 		tdsdump_log(TDS_DBG_INFO1, "writing it sock %d\n", tds_get_s(puser->sock.tds));
323 		if (!pool_write_data(&pmbr->sock, &puser->sock)) {
324 			tdsdump_log(TDS_DBG_ERROR, "member received error while writing\n");
325 			pool_free_user(pool, puser);
326 			return false;
327 		}
328 		if (tds->in_pos < tds->in_len)
329 			/* partial write, schedule a future write */
330 			break;
331 	}
332 	if (puser && !puser->sock.poll_send)
333 		tds_socket_flush(tds_get_s(puser->sock.tds));
334 	return true;
335 }
336 
337 /*
338  * pool_process_members
339  * check the fd_set for members returning data to the client, lookup the
340  * client holding this member and forward the results.
341  * @return Timeout you should call this function again or -1 for infinite
342  */
343 int
pool_process_members(TDS_POOL * pool,fd_set * rfds,fd_set * wfds)344 pool_process_members(TDS_POOL * pool, fd_set * rfds, fd_set * wfds)
345 {
346 	TDS_POOL_MEMBER *pmbr, *next;
347 	TDSSOCKET *tds;
348 	time_t age;
349 	time_t time_now;
350 	int min_expire_left = -1;
351 
352 	for (next = dlist_member_first(&pool->active_members); (pmbr = next) != NULL; ) {
353 		bool processed = false;
354 
355 		next = dlist_member_next(&pool->active_members, pmbr);
356 
357 		if (pmbr->doing_async)
358 			continue;
359 
360 		tds = pmbr->sock.tds;
361 		assert(tds);
362 
363 		time_now = time(NULL);
364 		if (pmbr->sock.poll_recv && FD_ISSET(tds_get_s(tds), rfds)) {
365 			if (!pool_process_data(pool, pmbr))
366 				continue;
367 			processed = true;
368 		}
369 		if (pmbr->sock.poll_send && FD_ISSET(tds_get_s(tds), wfds)) {
370 			if (!pool_write_data(&pmbr->current_user->sock, &pmbr->sock)) {
371 				pool_free_member(pool, pmbr);
372 				continue;
373 			}
374 			processed = true;
375 		}
376 		if (processed)
377 			pmbr->last_used_tm = time_now;
378 	}
379 
380 	if (pool->num_active_members <= pool->min_open_conn)
381 		return min_expire_left;
382 
383 	/* close old connections */
384 	time_now = time(NULL);
385 	for (next = dlist_member_first(&pool->idle_members); (pmbr = next) != NULL; ) {
386 
387 		next = dlist_member_next(&pool->idle_members, pmbr);
388 
389 		assert(pmbr->sock.tds);
390 		assert(!pmbr->current_user);
391 
392 		age = time_now - pmbr->last_used_tm;
393 		if (age >= pool->max_member_age) {
394 			tdsdump_log(TDS_DBG_INFO1, "member is %ld seconds old...closing\n", (long int) age);
395 			pool_free_member(pool, pmbr);
396 		} else {
397 			int left = (int) (pool->max_member_age - age);
398 			if (min_expire_left < 0 || left < min_expire_left)
399 				min_expire_left = left;
400 		}
401 	}
402 	return min_expire_left;
403 }
404 
405 static bool
compatible_versions(const TDSSOCKET * tds,const TDS_POOL_USER * user)406 compatible_versions(const TDSSOCKET *tds, const TDS_POOL_USER *user)
407 {
408 	if (tds->conn->tds_version != user->login->tds_version)
409 		return false;
410 	return true;
411 }
412 
413 typedef struct {
414 	TDS_POOL_EVENT common;
415 	TDS_POOL *pool;
416 	TDS_POOL_MEMBER *pmbr;
417 	int tds_version;
418 } CONNECT_EVENT;
419 
420 static void connect_execute_ok(TDS_POOL_EVENT *base_event);
421 static void connect_execute_ko(TDS_POOL_EVENT *base_event);
422 
TDS_THREAD_PROC_DECLARE(connect_proc,arg)423 static TDS_THREAD_PROC_DECLARE(connect_proc, arg)
424 {
425 	CONNECT_EVENT *ev = (CONNECT_EVENT *) arg;
426 	TDS_POOL_MEMBER *pmbr = ev->pmbr;
427 	TDS_POOL *pool = ev->pool;
428 
429 	for (;;) {
430 		pmbr->sock.tds = pool_mbr_login(pool, ev->tds_version);
431 		if (!pmbr->sock.tds) {
432 			tdsdump_log(TDS_DBG_ERROR, "Error opening a new connection to server\n");
433 			break;
434 		}
435 		if (!IS_TDS71_PLUS(pmbr->sock.tds->conn)) {
436 			tdsdump_log(TDS_DBG_ERROR, "Protocol server version not supported\n");
437 			break;
438 		}
439 
440 		/* if already attached to a user we can send login directly */
441 		if (pmbr->current_user)
442 			if (!pool_user_send_login_ack(pool, pmbr->current_user))
443 				break;
444 
445 		pool_event_add(pool, &ev->common, connect_execute_ok);
446 		return TDS_THREAD_RESULT(0);
447 	}
448 
449 	/* failure */
450 	pool_event_add(pool, &ev->common, connect_execute_ko);
451 	return TDS_THREAD_RESULT(0);
452 }
453 
454 static void
connect_execute_ko(TDS_POOL_EVENT * base_event)455 connect_execute_ko(TDS_POOL_EVENT *base_event)
456 {
457 	CONNECT_EVENT *ev = (CONNECT_EVENT *) base_event;
458 
459 	pool_free_member(ev->pool, ev->pmbr);
460 }
461 
462 static void
connect_execute_ok(TDS_POOL_EVENT * base_event)463 connect_execute_ok(TDS_POOL_EVENT *base_event)
464 {
465 	CONNECT_EVENT *ev = (CONNECT_EVENT *) base_event;
466 	TDS_POOL_MEMBER *pmbr = ev->pmbr;
467 	TDS_POOL_USER *puser = pmbr->current_user;
468 
469 	ev->pool->member_logins++;
470 	pmbr->doing_async = false;
471 
472 	pmbr->last_used_tm = time(NULL);
473 
474 	if (puser) {
475 		pmbr->sock.poll_recv = true;
476 		puser->sock.poll_recv = true;
477 
478 		puser->user_state = TDS_SRV_QUERY;
479 	}
480 }
481 
482 /*
483  * pool_assign_idle_member
484  * assign a member to the user specified
485  */
486 TDS_POOL_MEMBER *
pool_assign_idle_member(TDS_POOL * pool,TDS_POOL_USER * puser)487 pool_assign_idle_member(TDS_POOL * pool, TDS_POOL_USER *puser)
488 {
489 	TDS_POOL_MEMBER *pmbr;
490 	CONNECT_EVENT *ev;
491 
492 	puser->sock.poll_recv = false;
493 	puser->sock.poll_send = false;
494 
495 	DLIST_FOREACH(dlist_member, &pool->idle_members, pmbr) {
496 		assert(pmbr->current_user == NULL);
497 		assert(!pmbr->doing_async);
498 
499 		assert(pmbr->sock.tds);
500 
501 		if (!compatible_versions(pmbr->sock.tds, puser))
502 			continue;
503 
504 		pool_assign_member(pool, pmbr, puser);
505 
506 		/*
507 		 * make sure member wasn't idle more that the timeout
508 		 * otherwise it'll send the query and close leaving a
509 		 * hung client
510 		 */
511 		pmbr->last_used_tm = time(NULL);
512 		pmbr->sock.poll_recv = false;
513 		pmbr->sock.poll_send = false;
514 
515 		pool_user_finish_login(pool, puser);
516 		return pmbr;
517 	}
518 
519 	/* if we can open a new connection open it */
520 	if (pool->num_active_members >= pool->max_open_conn) {
521 		fprintf(stderr, "No idle members left, increase \"max pool conn\"\n");
522 		return NULL;
523 	}
524 
525 	pmbr = tds_new0(TDS_POOL_MEMBER, 1);
526 	if (!pmbr) {
527 		fprintf(stderr, "Out of memory\n");
528 		return NULL;
529 	}
530 
531 	tdsdump_log(TDS_DBG_INFO1, "No open connections left, opening new member\n");
532 
533 	ev = tds_new0(CONNECT_EVENT, 1);
534 	if (!ev) {
535 		free(pmbr);
536 		fprintf(stderr, "Out of memory\n");
537 		return NULL;
538 	}
539 	ev->pmbr = pmbr;
540 	ev->pool = pool;
541 	ev->tds_version = puser->login->tds_version;
542 
543 	if (tds_thread_create_detached(connect_proc, ev) != 0) {
544 		free(pmbr);
545 		free(ev);
546 		fprintf(stderr, "error creating thread\n");
547 		return NULL;
548 	}
549 	pmbr->doing_async = true;
550 
551 	pool->num_active_members++;
552 	dlist_member_append(&pool->idle_members, pmbr);
553 
554 	pool_assign_member(pool, pmbr, puser);
555 	puser->sock.poll_send = false;
556 	puser->sock.poll_recv = false;
557 
558 	return pmbr;
559 }
560