1 /* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "array.h"
5 #include "llist.h"
6 #include "ioloop.h"
7 #include "sql-api-private.h"
8 
9 #include <time.h>
10 
11 #define QUERY_TIMEOUT_SECS 6
12 
13 /* sqlpool events are separate from category:sql, because
14    they are usually not very interesting, and would only
15    make logging too noisy. They can be enabled explicitly.
16 */
17 static struct event_category event_category_sqlpool = {
18 	.name = "sqlpool",
19 };
20 
21 struct sqlpool_host {
22 	char *connect_string;
23 
24 	unsigned int connection_count;
25 };
26 
27 struct sqlpool_connection {
28 	struct sql_db *db;
29 	unsigned int host_idx;
30 };
31 
32 struct sqlpool_db {
33 	struct sql_db api;
34 
35 	pool_t pool;
36 	const struct sql_db *driver;
37 	unsigned int connection_limit;
38 
39 	ARRAY(struct sqlpool_host) hosts;
40 	/* all connections from all hosts */
41 	ARRAY(struct sqlpool_connection) all_connections;
42 	/* index of last connection in all_connections that was used to
43 	   send a query. */
44 	unsigned int last_query_conn_idx;
45 
46 	/* queued requests */
47 	struct sqlpool_request *requests_head, *requests_tail;
48 	struct timeout *request_to;
49 };
50 
51 struct sqlpool_request {
52 	struct sqlpool_request *prev, *next;
53 
54 	struct sqlpool_db *db;
55 	time_t created;
56 
57 	unsigned int host_idx;
58 	unsigned int retry_count;
59 
60 	struct event *event;
61 
62 	/* requests are a) queries */
63 	char *query;
64 	sql_query_callback_t *callback;
65 	void *context;
66 
67 	/* b) transaction waiters */
68 	struct sqlpool_transaction_context *trans;
69 };
70 
71 struct sqlpool_transaction_context {
72 	struct sql_transaction_context ctx;
73 
74 	sql_commit_callback_t *callback;
75 	void *context;
76 
77 	pool_t query_pool;
78 	struct sqlpool_request *commit_request;
79 };
80 
81 extern struct sql_db driver_sqlpool_db;
82 
83 static struct sqlpool_connection *
84 sqlpool_add_connection(struct sqlpool_db *db, struct sqlpool_host *host,
85 		       unsigned int host_idx);
86 static void
87 driver_sqlpool_query_callback(struct sql_result *result,
88 			      struct sqlpool_request *request);
89 static void
90 driver_sqlpool_commit_callback(const struct sql_commit_result *result,
91 			       struct sqlpool_transaction_context *ctx);
92 static void driver_sqlpool_deinit(struct sql_db *_db);
93 
94 static struct sqlpool_request * ATTR_NULL(2)
sqlpool_request_new(struct sqlpool_db * db,const char * query)95 sqlpool_request_new(struct sqlpool_db *db, const char *query)
96 {
97 	struct sqlpool_request *request;
98 
99 	request = i_new(struct sqlpool_request, 1);
100 	request->db = db;
101 	request->created = time(NULL);
102 	request->query = i_strdup(query);
103 	request->event = event_create(db->api.event);
104 	return request;
105 }
106 
107 static void
sqlpool_request_free(struct sqlpool_request ** _request)108 sqlpool_request_free(struct sqlpool_request **_request)
109 {
110 	struct sqlpool_request *request = *_request;
111 
112 	*_request = NULL;
113 
114 	i_assert(request->prev == NULL && request->next == NULL);
115 	event_unref(&request->event);
116 	i_free(request->query);
117 	i_free(request);
118 }
119 
120 static void
sqlpool_request_abort(struct sqlpool_request ** _request)121 sqlpool_request_abort(struct sqlpool_request **_request)
122 {
123 	struct sqlpool_request *request = *_request;
124 
125 	*_request = NULL;
126 
127 	if (request->callback != NULL)
128 		request->callback(&sql_not_connected_result, request->context);
129 
130 	i_assert(request->prev != NULL ||
131 		 request->db->requests_head == request);
132 	DLLIST2_REMOVE(&request->db->requests_head,
133 		       &request->db->requests_tail, request);
134 	sqlpool_request_free(&request);
135 }
136 
137 static struct sql_transaction_context *
driver_sqlpool_new_conn_trans(struct sqlpool_transaction_context * trans,struct sql_db * conndb)138 driver_sqlpool_new_conn_trans(struct sqlpool_transaction_context *trans,
139 			      struct sql_db *conndb)
140 {
141 	struct sql_transaction_context *conn_trans;
142 	struct sql_transaction_query *query;
143 
144 	conn_trans = sql_transaction_begin(conndb);
145 	/* backend will use our queries list (we might still append more
146 	   queries to the list) */
147 	conn_trans->head = trans->ctx.head;
148 	conn_trans->tail = trans->ctx.tail;
149 	for (query = conn_trans->head; query != NULL; query = query->next)
150 		query->trans = conn_trans;
151 	return conn_trans;
152 }
153 
154 static void
sqlpool_request_handle_transaction(struct sql_db * conndb,struct sqlpool_transaction_context * trans)155 sqlpool_request_handle_transaction(struct sql_db *conndb,
156 				   struct sqlpool_transaction_context *trans)
157 {
158 	struct sql_transaction_context *conn_trans;
159 
160 	sqlpool_request_free(&trans->commit_request);
161 	conn_trans = driver_sqlpool_new_conn_trans(trans, conndb);
162 	sql_transaction_commit(&conn_trans,
163 			       driver_sqlpool_commit_callback, trans);
164 }
165 
166 static void
sqlpool_request_send_next(struct sqlpool_db * db,struct sql_db * conndb)167 sqlpool_request_send_next(struct sqlpool_db *db, struct sql_db *conndb)
168 {
169 	struct sqlpool_request *request;
170 
171 	if (db->requests_head == NULL || !SQL_DB_IS_READY(conndb))
172 		return;
173 
174 	request = db->requests_head;
175 	DLLIST2_REMOVE(&db->requests_head, &db->requests_tail, request);
176 	timeout_reset(db->request_to);
177 
178 	if (request->query != NULL) {
179 		sql_query(conndb, request->query,
180 			  driver_sqlpool_query_callback, request);
181 	} else if (request->trans != NULL) {
182 		sqlpool_request_handle_transaction(conndb, request->trans);
183 	} else {
184 		i_unreached();
185 	}
186 }
187 
sqlpool_reconnect(struct sql_db * conndb)188 static void sqlpool_reconnect(struct sql_db *conndb)
189 {
190 	timeout_remove(&conndb->to_reconnect);
191 	(void)sql_connect(conndb);
192 }
193 
194 static struct sqlpool_host *
sqlpool_find_host_with_least_connections(struct sqlpool_db * db,unsigned int * host_idx_r)195 sqlpool_find_host_with_least_connections(struct sqlpool_db *db,
196 					 unsigned int *host_idx_r)
197 {
198 	struct sqlpool_host *hosts, *min = NULL;
199 	unsigned int i, count;
200 
201 	hosts = array_get_modifiable(&db->hosts, &count);
202 	i_assert(count > 0);
203 
204 	min = &hosts[0];
205 	*host_idx_r = 0;
206 
207 	for (i = 1; i < count; i++) {
208 		if (min->connection_count > hosts[i].connection_count) {
209 			min = &hosts[i];
210 			*host_idx_r = i;
211 		}
212 	}
213 	return min;
214 }
215 
sqlpool_have_successful_connections(struct sqlpool_db * db)216 static bool sqlpool_have_successful_connections(struct sqlpool_db *db)
217 {
218 	const struct sqlpool_connection *conn;
219 
220 	array_foreach(&db->all_connections, conn) {
221 		if (conn->db->state >= SQL_DB_STATE_IDLE)
222 			return TRUE;
223 	}
224 	return FALSE;
225 }
226 
227 static void
sqlpool_handle_connect_failed(struct sqlpool_db * db,struct sql_db * conndb)228 sqlpool_handle_connect_failed(struct sqlpool_db *db, struct sql_db *conndb)
229 {
230 	struct sqlpool_host *host;
231 	unsigned int host_idx;
232 
233 	if (conndb->connect_failure_count > 0) {
234 		/* increase delay between reconnections to this
235 		   server */
236 		conndb->connect_delay *= 5;
237 		if (conndb->connect_delay > SQL_CONNECT_MAX_DELAY)
238 			conndb->connect_delay = SQL_CONNECT_MAX_DELAY;
239 	}
240 	conndb->connect_failure_count++;
241 
242 	/* reconnect after the delay */
243 	timeout_remove(&conndb->to_reconnect);
244 	conndb->to_reconnect = timeout_add(conndb->connect_delay * 1000,
245 					   sqlpool_reconnect, conndb);
246 
247 	/* if we have zero successful hosts and there still are hosts
248 	   without connections, connect to one of them. */
249 	if (!sqlpool_have_successful_connections(db)) {
250 		host = sqlpool_find_host_with_least_connections(db, &host_idx);
251 		if (host->connection_count == 0)
252 			(void)sqlpool_add_connection(db, host, host_idx);
253 	}
254 }
255 
256 static void
sqlpool_state_changed(struct sql_db * conndb,enum sql_db_state prev_state,void * context)257 sqlpool_state_changed(struct sql_db *conndb, enum sql_db_state prev_state,
258 		      void *context)
259 {
260 	struct sqlpool_db *db = context;
261 
262 	if (conndb->state == SQL_DB_STATE_IDLE) {
263 		conndb->connect_failure_count = 0;
264 		conndb->connect_delay = SQL_CONNECT_MIN_DELAY;
265 		sqlpool_request_send_next(db, conndb);
266 	}
267 
268 	if (prev_state == SQL_DB_STATE_CONNECTING &&
269 	    conndb->state == SQL_DB_STATE_DISCONNECTED &&
270 	    !conndb->no_reconnect)
271 		sqlpool_handle_connect_failed(db, conndb);
272 }
273 
274 static struct sqlpool_connection *
sqlpool_add_connection(struct sqlpool_db * db,struct sqlpool_host * host,unsigned int host_idx)275 sqlpool_add_connection(struct sqlpool_db *db, struct sqlpool_host *host,
276 		       unsigned int host_idx)
277 {
278 	struct sql_db *conndb;
279 	struct sqlpool_connection *conn;
280 	const char *error;
281 	int ret = 0;
282 
283 	host->connection_count++;
284 
285 	e_debug(db->api.event, "Creating new connection");
286 
287 	if (db->driver->v.init_full == NULL) {
288 		conndb = db->driver->v.init(host->connect_string);
289 	} else {
290 		struct sql_settings set = {
291 			.connect_string = host->connect_string,
292 			.event_parent = event_get_parent(db->api.event),
293 		};
294 		ret = db->driver->v.init_full(&set, &conndb, &error);
295 	}
296 	if (ret < 0)
297 		i_fatal("sqlpool: %s", error);
298 
299 	sql_init_common(conndb);
300 
301 	conndb->state_change_callback = sqlpool_state_changed;
302 	conndb->state_change_context = db;
303 	conndb->connect_delay = SQL_CONNECT_MIN_DELAY;
304 
305 	conn = array_append_space(&db->all_connections);
306 	conn->host_idx = host_idx;
307 	conn->db = conndb;
308 	return conn;
309 }
310 
311 static struct sqlpool_connection *
sqlpool_add_new_connection(struct sqlpool_db * db)312 sqlpool_add_new_connection(struct sqlpool_db *db)
313 {
314 	struct sqlpool_host *host;
315 	unsigned int host_idx;
316 
317 	host = sqlpool_find_host_with_least_connections(db, &host_idx);
318 	if (host->connection_count >= db->connection_limit)
319 		return NULL;
320 	else
321 		return sqlpool_add_connection(db, host, host_idx);
322 }
323 
324 static const struct sqlpool_connection *
sqlpool_find_available_connection(struct sqlpool_db * db,unsigned int unwanted_host_idx,bool * all_disconnected_r)325 sqlpool_find_available_connection(struct sqlpool_db *db,
326 				  unsigned int unwanted_host_idx,
327 				  bool *all_disconnected_r)
328 {
329 	const struct sqlpool_connection *conns;
330 	unsigned int i, count;
331 
332 	*all_disconnected_r = TRUE;
333 
334 	conns = array_get(&db->all_connections, &count);
335 	for (i = 0; i < count; i++) {
336 		unsigned int idx = (i + db->last_query_conn_idx + 1) % count;
337 		struct sql_db *conndb = conns[idx].db;
338 
339 		if (conns[idx].host_idx == unwanted_host_idx)
340 			continue;
341 
342 		if (!SQL_DB_IS_READY(conndb) && conndb->to_reconnect == NULL) {
343 			/* see if we could reconnect to it immediately */
344 			(void)sql_connect(conndb);
345 		}
346 		if (SQL_DB_IS_READY(conndb)) {
347 			db->last_query_conn_idx = idx;
348 			*all_disconnected_r = FALSE;
349 			return &conns[idx];
350 		}
351 		if (conndb->state != SQL_DB_STATE_DISCONNECTED)
352 			*all_disconnected_r = FALSE;
353 	}
354 	return NULL;
355 }
356 
357 static bool
driver_sqlpool_get_connection(struct sqlpool_db * db,unsigned int unwanted_host_idx,const struct sqlpool_connection ** conn_r)358 driver_sqlpool_get_connection(struct sqlpool_db *db,
359 			      unsigned int unwanted_host_idx,
360 			      const struct sqlpool_connection **conn_r)
361 {
362 	const struct sqlpool_connection *conn, *conns;
363 	unsigned int i, count;
364 	bool all_disconnected;
365 
366 	conn = sqlpool_find_available_connection(db, unwanted_host_idx,
367 						 &all_disconnected);
368 	if (conn == NULL && unwanted_host_idx != UINT_MAX) {
369 		/* maybe there are no wanted hosts. use any of them. */
370 		conn = sqlpool_find_available_connection(db, UINT_MAX,
371 							 &all_disconnected);
372 	}
373 	if (conn == NULL && all_disconnected) {
374 		/* no connected connections. connect_delays may have gotten too
375 		   high, reset all of them to see if some are still alive. */
376 		conns = array_get(&db->all_connections, &count);
377 		for (i = 0; i < count; i++) {
378 			struct sql_db *conndb = conns[i].db;
379 
380 			if (conndb->connect_delay > SQL_CONNECT_RESET_DELAY)
381 				conndb->connect_delay = SQL_CONNECT_RESET_DELAY;
382 		}
383 		conn = sqlpool_find_available_connection(db, UINT_MAX,
384 							 &all_disconnected);
385 	}
386 	if (conn == NULL) {
387 		/* still nothing. try creating new connections */
388 		conn = sqlpool_add_new_connection(db);
389 		if (conn != NULL)
390 			(void)sql_connect(conn->db);
391 		if (conn == NULL || !SQL_DB_IS_READY(conn->db))
392 			return FALSE;
393 	}
394 	*conn_r = conn;
395 	return TRUE;
396 }
397 
398 static bool
driver_sqlpool_get_sync_connection(struct sqlpool_db * db,const struct sqlpool_connection ** conn_r)399 driver_sqlpool_get_sync_connection(struct sqlpool_db *db,
400 				   const struct sqlpool_connection **conn_r)
401 {
402 	const struct sqlpool_connection *conns;
403 	unsigned int i, count;
404 
405 	if (driver_sqlpool_get_connection(db, UINT_MAX, conn_r))
406 		return TRUE;
407 
408 	/* no idling connections, but maybe we can find one that's trying to
409 	   connect to server, and we can use it once it's finished */
410 	conns = array_get(&db->all_connections, &count);
411 	for (i = 0; i < count; i++) {
412 		if (conns[i].db->state == SQL_DB_STATE_CONNECTING) {
413 			*conn_r = &conns[i];
414 			return TRUE;
415 		}
416 	}
417 	return FALSE;
418 }
419 
420 static bool
driver_sqlpool_get_connected_flags(struct sqlpool_db * db,enum sql_db_flags * flags_r)421 driver_sqlpool_get_connected_flags(struct sqlpool_db *db,
422 				   enum sql_db_flags *flags_r)
423 {
424 	const struct sqlpool_connection *conn;
425 
426 	array_foreach(&db->all_connections, conn) {
427 		if (conn->db->state > SQL_DB_STATE_CONNECTING) {
428 			*flags_r = sql_get_flags(conn->db);
429 			return TRUE;
430 		}
431 	}
432 	return FALSE;
433 }
434 
driver_sqlpool_get_flags(struct sql_db * _db)435 static enum sql_db_flags driver_sqlpool_get_flags(struct sql_db *_db)
436 {
437 	struct sqlpool_db *db = (struct sqlpool_db *)_db;
438 	const struct sqlpool_connection *conn;
439 	enum sql_db_flags flags;
440 
441 	/* try to use a connected db */
442 	if (driver_sqlpool_get_connected_flags(db, &flags))
443 		return flags;
444 
445 	if (!driver_sqlpool_get_sync_connection(db, &conn)) {
446 		/* Failed to connect to database. Just use the first
447 		   connection. */
448 		conn = array_idx(&db->all_connections, 0);
449 	}
450 	return sql_get_flags(conn->db);
451 }
452 
453 static int
driver_sqlpool_parse_hosts(struct sqlpool_db * db,const char * connect_string,const char ** error_r)454 driver_sqlpool_parse_hosts(struct sqlpool_db *db, const char *connect_string,
455 			   const char **error_r)
456 {
457 	const char *const *args, *key, *value, *hostname;
458 	struct sqlpool_host *host;
459 	ARRAY_TYPE(const_string) hostnames, connect_args;
460 
461 	t_array_init(&hostnames, 8);
462 	t_array_init(&connect_args, 32);
463 
464 	/* connect string is a space separated list. it may contain
465 	   backend-specific strings which we'll pass as-is. we'll only care
466 	   about our own settings, plus the host settings. */
467 	args = t_strsplit_spaces(connect_string, " ");
468 	for (; *args != NULL; args++) {
469 		value = strchr(*args, '=');
470 		if (value == NULL) {
471 			key = *args;
472 			value = "";
473 		} else {
474 			key = t_strdup_until(*args, value);
475 			value++;
476 		}
477 
478 		if (strcmp(key, "maxconns") == 0) {
479 			if (str_to_uint(value, &db->connection_limit) < 0) {
480 				*error_r = t_strdup_printf("Invalid value for maxconns: %s",
481 					value);
482 				return -1;
483 			}
484 		} else if (strcmp(key, "host") == 0) {
485 			array_push_back(&hostnames, &value);
486 		} else {
487 			array_push_back(&connect_args, args);
488 		}
489 	}
490 
491 	/* build a new connect string without our settings or hosts */
492 	array_append_zero(&connect_args);
493 	connect_string = t_strarray_join(array_front(&connect_args), " ");
494 
495 	if (array_count(&hostnames) == 0) {
496 		/* no hosts specified. create a default one. */
497 		host = array_append_space(&db->hosts);
498 		host->connect_string = i_strdup(connect_string);
499 	} else {
500 		if (*connect_string == '\0')
501 			connect_string = NULL;
502 
503 		array_foreach_elem(&hostnames, hostname) {
504 			host = array_append_space(&db->hosts);
505 			host->connect_string =
506 				i_strconcat("host=", hostname, " ",
507 					    connect_string, NULL);
508 		}
509 	}
510 
511 	if (db->connection_limit == 0)
512 		db->connection_limit = SQL_DEFAULT_CONNECTION_LIMIT;
513 	return 0;
514 }
515 
sqlpool_add_all_once(struct sqlpool_db * db)516 static void sqlpool_add_all_once(struct sqlpool_db *db)
517 {
518 	struct sqlpool_host *host;
519 	unsigned int host_idx;
520 
521 	for (;;) {
522 		host = sqlpool_find_host_with_least_connections(db, &host_idx);
523 		if (host->connection_count > 0)
524 			break;
525 		(void)sqlpool_add_connection(db, host, host_idx);
526 	}
527 }
528 
driver_sqlpool_init_full(const struct sql_settings * set,const struct sql_db * driver,struct sql_db ** db_r,const char ** error_r)529 int driver_sqlpool_init_full(const struct sql_settings *set, const struct sql_db *driver,
530 			     struct sql_db **db_r, const char **error_r)
531 {
532 	struct sqlpool_db *db;
533 	int ret;
534 
535 	db = i_new(struct sqlpool_db, 1);
536 	db->driver = driver;
537 	db->api = driver_sqlpool_db;
538 	db->api.flags = driver->flags;
539 	db->api.event = event_create(set->event_parent);
540 	event_add_category(db->api.event, &event_category_sqlpool);
541 	event_set_append_log_prefix(db->api.event,
542 				    t_strdup_printf("sqlpool(%s): ", driver->name));
543 	i_array_init(&db->hosts, 8);
544 
545 	T_BEGIN {
546 		ret = driver_sqlpool_parse_hosts(db, set->connect_string,
547 						 error_r);
548 	} T_END_PASS_STR_IF(ret < 0, error_r);
549 
550 	if (ret < 0) {
551 		driver_sqlpool_deinit(&db->api);
552 		return ret;
553 	}
554 	i_array_init(&db->all_connections, 16);
555 	/* connect to all databases so we can do load balancing immediately */
556 	sqlpool_add_all_once(db);
557 
558 	*db_r = &db->api;
559 	return 0;
560 }
561 
driver_sqlpool_abort_requests(struct sqlpool_db * db)562 static void driver_sqlpool_abort_requests(struct sqlpool_db *db)
563 {
564 	while (db->requests_head != NULL) {
565 		struct sqlpool_request *request = db->requests_head;
566 
567 		sqlpool_request_abort(&request);
568 	}
569 	timeout_remove(&db->request_to);
570 }
571 
driver_sqlpool_deinit(struct sql_db * _db)572 static void driver_sqlpool_deinit(struct sql_db *_db)
573 {
574 	struct sqlpool_db *db = (struct sqlpool_db *)_db;
575 	struct sqlpool_host *host;
576 	struct sqlpool_connection *conn;
577 
578 	array_foreach_modifiable(&db->all_connections, conn)
579 		sql_unref(&conn->db);
580 	array_clear(&db->all_connections);
581 
582 	driver_sqlpool_abort_requests(db);
583 
584 	array_foreach_modifiable(&db->hosts, host)
585 		i_free(host->connect_string);
586 
587 	i_assert(array_count(&db->all_connections) == 0);
588 	array_free(&db->hosts);
589 	array_free(&db->all_connections);
590 	array_free(&_db->module_contexts);
591 	event_unref(&_db->event);
592 	i_free(db);
593 }
594 
driver_sqlpool_connect(struct sql_db * _db)595 static int driver_sqlpool_connect(struct sql_db *_db)
596 {
597 	struct sqlpool_db *db = (struct sqlpool_db *)_db;
598 	const struct sqlpool_connection *conn;
599 	int ret = -1, ret2;
600 
601 	array_foreach(&db->all_connections, conn) {
602 		ret2 = conn->db->to_reconnect != NULL ? -1 :
603 			sql_connect(conn->db);
604 		if (ret2 > 0)
605 			ret = 1;
606 		else if (ret2 == 0 && ret < 0)
607 			ret = 0;
608 	}
609 	return ret;
610 }
611 
driver_sqlpool_disconnect(struct sql_db * _db)612 static void driver_sqlpool_disconnect(struct sql_db *_db)
613 {
614 	struct sqlpool_db *db = (struct sqlpool_db *)_db;
615 	const struct sqlpool_connection *conn;
616 
617 	array_foreach(&db->all_connections, conn)
618 		sql_disconnect(conn->db);
619 	driver_sqlpool_abort_requests(db);
620 }
621 
622 static const char *
driver_sqlpool_escape_string(struct sql_db * _db,const char * string)623 driver_sqlpool_escape_string(struct sql_db *_db, const char *string)
624 {
625 	struct sqlpool_db *db = (struct sqlpool_db *)_db;
626 	const struct sqlpool_connection *conns;
627 	unsigned int i, count;
628 
629 	/* use the first ready connection */
630 	conns = array_get(&db->all_connections, &count);
631 	for (i = 0; i < count; i++) {
632 		if (SQL_DB_IS_READY(conns[i].db))
633 			return sql_escape_string(conns[i].db, string);
634 	}
635 	/* no ready connections. just use the first one (we're guaranteed
636 	   to always have one) */
637 	return sql_escape_string(conns[0].db, string);
638 }
639 
driver_sqlpool_timeout(struct sqlpool_db * db)640 static void driver_sqlpool_timeout(struct sqlpool_db *db)
641 {
642 	int duration;
643 
644 	while (db->requests_head != NULL) {
645 		struct sqlpool_request *request = db->requests_head;
646 
647 		if (request->created + SQL_QUERY_TIMEOUT_SECS > ioloop_time)
648 			break;
649 
650 
651 		if (request->query != NULL) {
652 			e_error(sql_query_finished_event(&db->api, request->event,
653 							 request->query, FALSE,
654 							 &duration)->
655 					add_str("error", "Query timed out")->
656 					event(),
657 				SQL_QUERY_FINISHED_FMT": Query timed out "
658 	                        "(no free connections for %u secs)",
659 				request->query, duration,
660 				(unsigned int)(ioloop_time - request->created));
661 		} else {
662 			e_error(event_create_passthrough(request->event)->
663 					add_str("error", "Timed out")->
664 					set_name(SQL_TRANSACTION_FINISHED)->event(),
665 				"Transaction timed out "
666 				"(no free connections for %u secs)",
667 				(unsigned int)(ioloop_time - request->created));
668 		}
669 		sqlpool_request_abort(&request);
670 	}
671 
672 	if (db->requests_head == NULL)
673 		timeout_remove(&db->request_to);
674 }
675 
676 static void
driver_sqlpool_prepend_request(struct sqlpool_db * db,struct sqlpool_request * request)677 driver_sqlpool_prepend_request(struct sqlpool_db *db,
678 			       struct sqlpool_request *request)
679 {
680 	DLLIST2_PREPEND(&db->requests_head, &db->requests_tail, request);
681 	if (db->request_to == NULL) {
682 		db->request_to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
683 					     driver_sqlpool_timeout, db);
684 	}
685 }
686 
687 static void
driver_sqlpool_append_request(struct sqlpool_db * db,struct sqlpool_request * request)688 driver_sqlpool_append_request(struct sqlpool_db *db,
689 			      struct sqlpool_request *request)
690 {
691 	DLLIST2_APPEND(&db->requests_head, &db->requests_tail, request);
692 	if (db->request_to == NULL) {
693 		db->request_to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
694 					     driver_sqlpool_timeout, db);
695 	}
696 }
697 
698 static void
driver_sqlpool_query_callback(struct sql_result * result,struct sqlpool_request * request)699 driver_sqlpool_query_callback(struct sql_result *result,
700 			      struct sqlpool_request *request)
701 {
702 	struct sqlpool_db *db = request->db;
703 	const struct sqlpool_connection *conn = NULL;
704 	struct sql_db *conndb;
705 
706 	if (result->failed_try_retry &&
707 	    request->retry_count < array_count(&db->hosts)) {
708 		e_warning(db->api.event, "Query failed, retrying: %s",
709 			  sql_result_get_error(result));
710 		request->retry_count++;
711 		driver_sqlpool_prepend_request(db, request);
712 
713 		if (driver_sqlpool_get_connection(request->db,
714 						  request->host_idx, &conn)) {
715 			request->host_idx = conn->host_idx;
716 			sqlpool_request_send_next(db, conn->db);
717 		}
718 	} else {
719 		if (result->failed) {
720 			e_error(db->api.event, "Query failed, aborting: %s",
721 				request->query);
722 		}
723 		conndb = result->db;
724 
725 		if (request->callback != NULL)
726 			request->callback(result, request->context);
727 		sqlpool_request_free(&request);
728 
729 		sqlpool_request_send_next(db, conndb);
730 	}
731 }
732 
733 static void ATTR_NULL(3, 4)
driver_sqlpool_query(struct sql_db * _db,const char * query,sql_query_callback_t * callback,void * context)734 driver_sqlpool_query(struct sql_db *_db, const char *query,
735 		     sql_query_callback_t *callback, void *context)
736 {
737         struct sqlpool_db *db = (struct sqlpool_db *)_db;
738 	struct sqlpool_request *request;
739 	const struct sqlpool_connection *conn;
740 
741 	request = sqlpool_request_new(db, query);
742 	request->callback = callback;
743 	request->context = context;
744 
745 	if (!driver_sqlpool_get_connection(db, UINT_MAX, &conn))
746 		driver_sqlpool_append_request(db, request);
747 	else {
748 		request->host_idx = conn->host_idx;
749 		sql_query(conn->db, query, driver_sqlpool_query_callback,
750 			  request);
751 	}
752 }
753 
driver_sqlpool_exec(struct sql_db * _db,const char * query)754 static void driver_sqlpool_exec(struct sql_db *_db, const char *query)
755 {
756 	driver_sqlpool_query(_db, query, NULL, NULL);
757 }
758 
759 static struct sql_result *
driver_sqlpool_query_s(struct sql_db * _db,const char * query)760 driver_sqlpool_query_s(struct sql_db *_db, const char *query)
761 {
762         struct sqlpool_db *db = (struct sqlpool_db *)_db;
763 	const struct sqlpool_connection *conn;
764 	struct sql_result *result;
765 
766 	if (!driver_sqlpool_get_sync_connection(db, &conn)) {
767 		sql_not_connected_result.refcount++;
768 		return &sql_not_connected_result;
769 	}
770 
771 	result = sql_query_s(conn->db, query);
772 	if (result->failed_try_retry) {
773 		if (!driver_sqlpool_get_sync_connection(db, &conn))
774 			return result;
775 
776 		sql_result_unref(result);
777 		result = sql_query_s(conn->db, query);
778 	}
779 	return result;
780 }
781 
782 static struct sql_transaction_context *
driver_sqlpool_transaction_begin(struct sql_db * _db)783 driver_sqlpool_transaction_begin(struct sql_db *_db)
784 {
785 	struct sqlpool_transaction_context *ctx;
786 
787 	ctx = i_new(struct sqlpool_transaction_context, 1);
788 	ctx->ctx.db = _db;
789 
790 	/* queue changes until commit. even if we did have a free connection
791 	   now, don't use it or multiple open transactions could tie up all
792 	   connections. */
793 	ctx->query_pool = pool_alloconly_create("sqlpool transaction", 1024);
794 	return &ctx->ctx;
795 }
796 
797 static void
driver_sqlpool_transaction_free(struct sqlpool_transaction_context * ctx)798 driver_sqlpool_transaction_free(struct sqlpool_transaction_context *ctx)
799 {
800 	if (ctx->commit_request != NULL)
801 		sqlpool_request_abort(&ctx->commit_request);
802 	pool_unref(&ctx->query_pool);
803 	i_free(ctx);
804 }
805 
806 static void
driver_sqlpool_commit_callback(const struct sql_commit_result * result,struct sqlpool_transaction_context * ctx)807 driver_sqlpool_commit_callback(const struct sql_commit_result *result,
808 			       struct sqlpool_transaction_context *ctx)
809 {
810 	ctx->callback(result, ctx->context);
811 	driver_sqlpool_transaction_free(ctx);
812 }
813 
814 static void
driver_sqlpool_transaction_commit(struct sql_transaction_context * _ctx,sql_commit_callback_t * callback,void * context)815 driver_sqlpool_transaction_commit(struct sql_transaction_context *_ctx,
816 				  sql_commit_callback_t *callback,
817 				  void *context)
818 {
819 	struct sqlpool_transaction_context *ctx =
820 		(struct sqlpool_transaction_context *)_ctx;
821 	struct sqlpool_db *db = (struct sqlpool_db *)_ctx->db;
822 	const struct sqlpool_connection *conn;
823 
824 	ctx->callback = callback;
825 	ctx->context = context;
826 
827 	ctx->commit_request = sqlpool_request_new(db, NULL);
828 	ctx->commit_request->trans = ctx;
829 
830 	if (driver_sqlpool_get_connection(db, UINT_MAX, &conn))
831 		sqlpool_request_handle_transaction(conn->db, ctx);
832 	else
833 		driver_sqlpool_append_request(db, ctx->commit_request);
834 }
835 
836 static int
driver_sqlpool_transaction_commit_s(struct sql_transaction_context * _ctx,const char ** error_r)837 driver_sqlpool_transaction_commit_s(struct sql_transaction_context *_ctx,
838 				    const char **error_r)
839 {
840 	struct sqlpool_transaction_context *ctx =
841 		(struct sqlpool_transaction_context *)_ctx;
842         struct sqlpool_db *db = (struct sqlpool_db *)_ctx->db;
843 	const struct sqlpool_connection *conn;
844 	struct sql_transaction_context *conn_trans;
845 	int ret;
846 
847 	*error_r = NULL;
848 
849 	if (!driver_sqlpool_get_sync_connection(db, &conn)) {
850 		*error_r = SQL_ERRSTR_NOT_CONNECTED;
851 		driver_sqlpool_transaction_free(ctx);
852 		return -1;
853 	}
854 
855 	conn_trans = driver_sqlpool_new_conn_trans(ctx, conn->db);
856 	ret = sql_transaction_commit_s(&conn_trans, error_r);
857 	driver_sqlpool_transaction_free(ctx);
858 	return ret;
859 }
860 
861 static void
driver_sqlpool_transaction_rollback(struct sql_transaction_context * _ctx)862 driver_sqlpool_transaction_rollback(struct sql_transaction_context *_ctx)
863 {
864 	struct sqlpool_transaction_context *ctx =
865 		(struct sqlpool_transaction_context *)_ctx;
866 
867 	driver_sqlpool_transaction_free(ctx);
868 }
869 
870 static void
driver_sqlpool_update(struct sql_transaction_context * _ctx,const char * query,unsigned int * affected_rows)871 driver_sqlpool_update(struct sql_transaction_context *_ctx, const char *query,
872 		      unsigned int *affected_rows)
873 {
874 	struct sqlpool_transaction_context *ctx =
875 		(struct sqlpool_transaction_context *)_ctx;
876 
877 	/* we didn't get a connection for transaction immediately.
878 	   queue updates until commit transfers all of these */
879 	sql_transaction_add_query(&ctx->ctx, ctx->query_pool,
880 				  query, affected_rows);
881 }
882 
883 static const char *
driver_sqlpool_escape_blob(struct sql_db * _db,const unsigned char * data,size_t size)884 driver_sqlpool_escape_blob(struct sql_db *_db,
885 			   const unsigned char *data, size_t size)
886 {
887 	struct sqlpool_db *db = (struct sqlpool_db *)_db;
888 	const struct sqlpool_connection *conns;
889 	unsigned int i, count;
890 
891 	/* use the first ready connection */
892 	conns = array_get(&db->all_connections, &count);
893 	for (i = 0; i < count; i++) {
894 		if (SQL_DB_IS_READY(conns[i].db))
895 			return sql_escape_blob(conns[i].db, data, size);
896 	}
897 	/* no ready connections. just use the first one (we're guaranteed
898 	   to always have one) */
899 	return sql_escape_blob(conns[0].db, data, size);
900 }
901 
driver_sqlpool_wait(struct sql_db * _db)902 static void driver_sqlpool_wait(struct sql_db *_db)
903 {
904 	struct sqlpool_db *db = (struct sqlpool_db *)_db;
905 	const struct sqlpool_connection *conn;
906 
907 	array_foreach(&db->all_connections, conn)
908 		sql_wait(conn->db);
909 }
910 
911 struct sql_db driver_sqlpool_db = {
912 	"",
913 
914 	.v = {
915 		.get_flags = driver_sqlpool_get_flags,
916 		.deinit = driver_sqlpool_deinit,
917 		.connect = driver_sqlpool_connect,
918 		.disconnect = driver_sqlpool_disconnect,
919 		.escape_string = driver_sqlpool_escape_string,
920 		.exec = driver_sqlpool_exec,
921 		.query = driver_sqlpool_query,
922 		.query_s = driver_sqlpool_query_s,
923 		.wait = driver_sqlpool_wait,
924 
925 		.transaction_begin = driver_sqlpool_transaction_begin,
926 		.transaction_commit = driver_sqlpool_transaction_commit,
927 		.transaction_commit_s = driver_sqlpool_transaction_commit_s,
928 		.transaction_rollback = driver_sqlpool_transaction_rollback,
929 
930 		.update = driver_sqlpool_update,
931 
932 		.escape_blob = driver_sqlpool_escape_blob,
933 	}
934 };
935