1 /* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "array.h"
5 #include "llist.h"
6 #include "ioloop.h"
7 #include "sql-api-private.h"
8
9 #include <time.h>
10
11 #define QUERY_TIMEOUT_SECS 6
12
13 /* sqlpool events are separate from category:sql, because
14 they are usually not very interesting, and would only
15 make logging too noisy. They can be enabled explicitly.
16 */
17 static struct event_category event_category_sqlpool = {
18 .name = "sqlpool",
19 };
20
21 struct sqlpool_host {
22 char *connect_string;
23
24 unsigned int connection_count;
25 };
26
27 struct sqlpool_connection {
28 struct sql_db *db;
29 unsigned int host_idx;
30 };
31
32 struct sqlpool_db {
33 struct sql_db api;
34
35 pool_t pool;
36 const struct sql_db *driver;
37 unsigned int connection_limit;
38
39 ARRAY(struct sqlpool_host) hosts;
40 /* all connections from all hosts */
41 ARRAY(struct sqlpool_connection) all_connections;
42 /* index of last connection in all_connections that was used to
43 send a query. */
44 unsigned int last_query_conn_idx;
45
46 /* queued requests */
47 struct sqlpool_request *requests_head, *requests_tail;
48 struct timeout *request_to;
49 };
50
51 struct sqlpool_request {
52 struct sqlpool_request *prev, *next;
53
54 struct sqlpool_db *db;
55 time_t created;
56
57 unsigned int host_idx;
58 unsigned int retry_count;
59
60 struct event *event;
61
62 /* requests are a) queries */
63 char *query;
64 sql_query_callback_t *callback;
65 void *context;
66
67 /* b) transaction waiters */
68 struct sqlpool_transaction_context *trans;
69 };
70
71 struct sqlpool_transaction_context {
72 struct sql_transaction_context ctx;
73
74 sql_commit_callback_t *callback;
75 void *context;
76
77 pool_t query_pool;
78 struct sqlpool_request *commit_request;
79 };
80
81 extern struct sql_db driver_sqlpool_db;
82
83 static struct sqlpool_connection *
84 sqlpool_add_connection(struct sqlpool_db *db, struct sqlpool_host *host,
85 unsigned int host_idx);
86 static void
87 driver_sqlpool_query_callback(struct sql_result *result,
88 struct sqlpool_request *request);
89 static void
90 driver_sqlpool_commit_callback(const struct sql_commit_result *result,
91 struct sqlpool_transaction_context *ctx);
92 static void driver_sqlpool_deinit(struct sql_db *_db);
93
94 static struct sqlpool_request * ATTR_NULL(2)
sqlpool_request_new(struct sqlpool_db * db,const char * query)95 sqlpool_request_new(struct sqlpool_db *db, const char *query)
96 {
97 struct sqlpool_request *request;
98
99 request = i_new(struct sqlpool_request, 1);
100 request->db = db;
101 request->created = time(NULL);
102 request->query = i_strdup(query);
103 request->event = event_create(db->api.event);
104 return request;
105 }
106
107 static void
sqlpool_request_free(struct sqlpool_request ** _request)108 sqlpool_request_free(struct sqlpool_request **_request)
109 {
110 struct sqlpool_request *request = *_request;
111
112 *_request = NULL;
113
114 i_assert(request->prev == NULL && request->next == NULL);
115 event_unref(&request->event);
116 i_free(request->query);
117 i_free(request);
118 }
119
120 static void
sqlpool_request_abort(struct sqlpool_request ** _request)121 sqlpool_request_abort(struct sqlpool_request **_request)
122 {
123 struct sqlpool_request *request = *_request;
124
125 *_request = NULL;
126
127 if (request->callback != NULL)
128 request->callback(&sql_not_connected_result, request->context);
129
130 i_assert(request->prev != NULL ||
131 request->db->requests_head == request);
132 DLLIST2_REMOVE(&request->db->requests_head,
133 &request->db->requests_tail, request);
134 sqlpool_request_free(&request);
135 }
136
137 static struct sql_transaction_context *
driver_sqlpool_new_conn_trans(struct sqlpool_transaction_context * trans,struct sql_db * conndb)138 driver_sqlpool_new_conn_trans(struct sqlpool_transaction_context *trans,
139 struct sql_db *conndb)
140 {
141 struct sql_transaction_context *conn_trans;
142 struct sql_transaction_query *query;
143
144 conn_trans = sql_transaction_begin(conndb);
145 /* backend will use our queries list (we might still append more
146 queries to the list) */
147 conn_trans->head = trans->ctx.head;
148 conn_trans->tail = trans->ctx.tail;
149 for (query = conn_trans->head; query != NULL; query = query->next)
150 query->trans = conn_trans;
151 return conn_trans;
152 }
153
154 static void
sqlpool_request_handle_transaction(struct sql_db * conndb,struct sqlpool_transaction_context * trans)155 sqlpool_request_handle_transaction(struct sql_db *conndb,
156 struct sqlpool_transaction_context *trans)
157 {
158 struct sql_transaction_context *conn_trans;
159
160 sqlpool_request_free(&trans->commit_request);
161 conn_trans = driver_sqlpool_new_conn_trans(trans, conndb);
162 sql_transaction_commit(&conn_trans,
163 driver_sqlpool_commit_callback, trans);
164 }
165
166 static void
sqlpool_request_send_next(struct sqlpool_db * db,struct sql_db * conndb)167 sqlpool_request_send_next(struct sqlpool_db *db, struct sql_db *conndb)
168 {
169 struct sqlpool_request *request;
170
171 if (db->requests_head == NULL || !SQL_DB_IS_READY(conndb))
172 return;
173
174 request = db->requests_head;
175 DLLIST2_REMOVE(&db->requests_head, &db->requests_tail, request);
176 timeout_reset(db->request_to);
177
178 if (request->query != NULL) {
179 sql_query(conndb, request->query,
180 driver_sqlpool_query_callback, request);
181 } else if (request->trans != NULL) {
182 sqlpool_request_handle_transaction(conndb, request->trans);
183 } else {
184 i_unreached();
185 }
186 }
187
sqlpool_reconnect(struct sql_db * conndb)188 static void sqlpool_reconnect(struct sql_db *conndb)
189 {
190 timeout_remove(&conndb->to_reconnect);
191 (void)sql_connect(conndb);
192 }
193
194 static struct sqlpool_host *
sqlpool_find_host_with_least_connections(struct sqlpool_db * db,unsigned int * host_idx_r)195 sqlpool_find_host_with_least_connections(struct sqlpool_db *db,
196 unsigned int *host_idx_r)
197 {
198 struct sqlpool_host *hosts, *min = NULL;
199 unsigned int i, count;
200
201 hosts = array_get_modifiable(&db->hosts, &count);
202 i_assert(count > 0);
203
204 min = &hosts[0];
205 *host_idx_r = 0;
206
207 for (i = 1; i < count; i++) {
208 if (min->connection_count > hosts[i].connection_count) {
209 min = &hosts[i];
210 *host_idx_r = i;
211 }
212 }
213 return min;
214 }
215
sqlpool_have_successful_connections(struct sqlpool_db * db)216 static bool sqlpool_have_successful_connections(struct sqlpool_db *db)
217 {
218 const struct sqlpool_connection *conn;
219
220 array_foreach(&db->all_connections, conn) {
221 if (conn->db->state >= SQL_DB_STATE_IDLE)
222 return TRUE;
223 }
224 return FALSE;
225 }
226
227 static void
sqlpool_handle_connect_failed(struct sqlpool_db * db,struct sql_db * conndb)228 sqlpool_handle_connect_failed(struct sqlpool_db *db, struct sql_db *conndb)
229 {
230 struct sqlpool_host *host;
231 unsigned int host_idx;
232
233 if (conndb->connect_failure_count > 0) {
234 /* increase delay between reconnections to this
235 server */
236 conndb->connect_delay *= 5;
237 if (conndb->connect_delay > SQL_CONNECT_MAX_DELAY)
238 conndb->connect_delay = SQL_CONNECT_MAX_DELAY;
239 }
240 conndb->connect_failure_count++;
241
242 /* reconnect after the delay */
243 timeout_remove(&conndb->to_reconnect);
244 conndb->to_reconnect = timeout_add(conndb->connect_delay * 1000,
245 sqlpool_reconnect, conndb);
246
247 /* if we have zero successful hosts and there still are hosts
248 without connections, connect to one of them. */
249 if (!sqlpool_have_successful_connections(db)) {
250 host = sqlpool_find_host_with_least_connections(db, &host_idx);
251 if (host->connection_count == 0)
252 (void)sqlpool_add_connection(db, host, host_idx);
253 }
254 }
255
256 static void
sqlpool_state_changed(struct sql_db * conndb,enum sql_db_state prev_state,void * context)257 sqlpool_state_changed(struct sql_db *conndb, enum sql_db_state prev_state,
258 void *context)
259 {
260 struct sqlpool_db *db = context;
261
262 if (conndb->state == SQL_DB_STATE_IDLE) {
263 conndb->connect_failure_count = 0;
264 conndb->connect_delay = SQL_CONNECT_MIN_DELAY;
265 sqlpool_request_send_next(db, conndb);
266 }
267
268 if (prev_state == SQL_DB_STATE_CONNECTING &&
269 conndb->state == SQL_DB_STATE_DISCONNECTED &&
270 !conndb->no_reconnect)
271 sqlpool_handle_connect_failed(db, conndb);
272 }
273
274 static struct sqlpool_connection *
sqlpool_add_connection(struct sqlpool_db * db,struct sqlpool_host * host,unsigned int host_idx)275 sqlpool_add_connection(struct sqlpool_db *db, struct sqlpool_host *host,
276 unsigned int host_idx)
277 {
278 struct sql_db *conndb;
279 struct sqlpool_connection *conn;
280 const char *error;
281 int ret = 0;
282
283 host->connection_count++;
284
285 e_debug(db->api.event, "Creating new connection");
286
287 if (db->driver->v.init_full == NULL) {
288 conndb = db->driver->v.init(host->connect_string);
289 } else {
290 struct sql_settings set = {
291 .connect_string = host->connect_string,
292 .event_parent = event_get_parent(db->api.event),
293 };
294 ret = db->driver->v.init_full(&set, &conndb, &error);
295 }
296 if (ret < 0)
297 i_fatal("sqlpool: %s", error);
298
299 sql_init_common(conndb);
300
301 conndb->state_change_callback = sqlpool_state_changed;
302 conndb->state_change_context = db;
303 conndb->connect_delay = SQL_CONNECT_MIN_DELAY;
304
305 conn = array_append_space(&db->all_connections);
306 conn->host_idx = host_idx;
307 conn->db = conndb;
308 return conn;
309 }
310
311 static struct sqlpool_connection *
sqlpool_add_new_connection(struct sqlpool_db * db)312 sqlpool_add_new_connection(struct sqlpool_db *db)
313 {
314 struct sqlpool_host *host;
315 unsigned int host_idx;
316
317 host = sqlpool_find_host_with_least_connections(db, &host_idx);
318 if (host->connection_count >= db->connection_limit)
319 return NULL;
320 else
321 return sqlpool_add_connection(db, host, host_idx);
322 }
323
324 static const struct sqlpool_connection *
sqlpool_find_available_connection(struct sqlpool_db * db,unsigned int unwanted_host_idx,bool * all_disconnected_r)325 sqlpool_find_available_connection(struct sqlpool_db *db,
326 unsigned int unwanted_host_idx,
327 bool *all_disconnected_r)
328 {
329 const struct sqlpool_connection *conns;
330 unsigned int i, count;
331
332 *all_disconnected_r = TRUE;
333
334 conns = array_get(&db->all_connections, &count);
335 for (i = 0; i < count; i++) {
336 unsigned int idx = (i + db->last_query_conn_idx + 1) % count;
337 struct sql_db *conndb = conns[idx].db;
338
339 if (conns[idx].host_idx == unwanted_host_idx)
340 continue;
341
342 if (!SQL_DB_IS_READY(conndb) && conndb->to_reconnect == NULL) {
343 /* see if we could reconnect to it immediately */
344 (void)sql_connect(conndb);
345 }
346 if (SQL_DB_IS_READY(conndb)) {
347 db->last_query_conn_idx = idx;
348 *all_disconnected_r = FALSE;
349 return &conns[idx];
350 }
351 if (conndb->state != SQL_DB_STATE_DISCONNECTED)
352 *all_disconnected_r = FALSE;
353 }
354 return NULL;
355 }
356
357 static bool
driver_sqlpool_get_connection(struct sqlpool_db * db,unsigned int unwanted_host_idx,const struct sqlpool_connection ** conn_r)358 driver_sqlpool_get_connection(struct sqlpool_db *db,
359 unsigned int unwanted_host_idx,
360 const struct sqlpool_connection **conn_r)
361 {
362 const struct sqlpool_connection *conn, *conns;
363 unsigned int i, count;
364 bool all_disconnected;
365
366 conn = sqlpool_find_available_connection(db, unwanted_host_idx,
367 &all_disconnected);
368 if (conn == NULL && unwanted_host_idx != UINT_MAX) {
369 /* maybe there are no wanted hosts. use any of them. */
370 conn = sqlpool_find_available_connection(db, UINT_MAX,
371 &all_disconnected);
372 }
373 if (conn == NULL && all_disconnected) {
374 /* no connected connections. connect_delays may have gotten too
375 high, reset all of them to see if some are still alive. */
376 conns = array_get(&db->all_connections, &count);
377 for (i = 0; i < count; i++) {
378 struct sql_db *conndb = conns[i].db;
379
380 if (conndb->connect_delay > SQL_CONNECT_RESET_DELAY)
381 conndb->connect_delay = SQL_CONNECT_RESET_DELAY;
382 }
383 conn = sqlpool_find_available_connection(db, UINT_MAX,
384 &all_disconnected);
385 }
386 if (conn == NULL) {
387 /* still nothing. try creating new connections */
388 conn = sqlpool_add_new_connection(db);
389 if (conn != NULL)
390 (void)sql_connect(conn->db);
391 if (conn == NULL || !SQL_DB_IS_READY(conn->db))
392 return FALSE;
393 }
394 *conn_r = conn;
395 return TRUE;
396 }
397
398 static bool
driver_sqlpool_get_sync_connection(struct sqlpool_db * db,const struct sqlpool_connection ** conn_r)399 driver_sqlpool_get_sync_connection(struct sqlpool_db *db,
400 const struct sqlpool_connection **conn_r)
401 {
402 const struct sqlpool_connection *conns;
403 unsigned int i, count;
404
405 if (driver_sqlpool_get_connection(db, UINT_MAX, conn_r))
406 return TRUE;
407
408 /* no idling connections, but maybe we can find one that's trying to
409 connect to server, and we can use it once it's finished */
410 conns = array_get(&db->all_connections, &count);
411 for (i = 0; i < count; i++) {
412 if (conns[i].db->state == SQL_DB_STATE_CONNECTING) {
413 *conn_r = &conns[i];
414 return TRUE;
415 }
416 }
417 return FALSE;
418 }
419
420 static bool
driver_sqlpool_get_connected_flags(struct sqlpool_db * db,enum sql_db_flags * flags_r)421 driver_sqlpool_get_connected_flags(struct sqlpool_db *db,
422 enum sql_db_flags *flags_r)
423 {
424 const struct sqlpool_connection *conn;
425
426 array_foreach(&db->all_connections, conn) {
427 if (conn->db->state > SQL_DB_STATE_CONNECTING) {
428 *flags_r = sql_get_flags(conn->db);
429 return TRUE;
430 }
431 }
432 return FALSE;
433 }
434
driver_sqlpool_get_flags(struct sql_db * _db)435 static enum sql_db_flags driver_sqlpool_get_flags(struct sql_db *_db)
436 {
437 struct sqlpool_db *db = (struct sqlpool_db *)_db;
438 const struct sqlpool_connection *conn;
439 enum sql_db_flags flags;
440
441 /* try to use a connected db */
442 if (driver_sqlpool_get_connected_flags(db, &flags))
443 return flags;
444
445 if (!driver_sqlpool_get_sync_connection(db, &conn)) {
446 /* Failed to connect to database. Just use the first
447 connection. */
448 conn = array_idx(&db->all_connections, 0);
449 }
450 return sql_get_flags(conn->db);
451 }
452
453 static int
driver_sqlpool_parse_hosts(struct sqlpool_db * db,const char * connect_string,const char ** error_r)454 driver_sqlpool_parse_hosts(struct sqlpool_db *db, const char *connect_string,
455 const char **error_r)
456 {
457 const char *const *args, *key, *value, *hostname;
458 struct sqlpool_host *host;
459 ARRAY_TYPE(const_string) hostnames, connect_args;
460
461 t_array_init(&hostnames, 8);
462 t_array_init(&connect_args, 32);
463
464 /* connect string is a space separated list. it may contain
465 backend-specific strings which we'll pass as-is. we'll only care
466 about our own settings, plus the host settings. */
467 args = t_strsplit_spaces(connect_string, " ");
468 for (; *args != NULL; args++) {
469 value = strchr(*args, '=');
470 if (value == NULL) {
471 key = *args;
472 value = "";
473 } else {
474 key = t_strdup_until(*args, value);
475 value++;
476 }
477
478 if (strcmp(key, "maxconns") == 0) {
479 if (str_to_uint(value, &db->connection_limit) < 0) {
480 *error_r = t_strdup_printf("Invalid value for maxconns: %s",
481 value);
482 return -1;
483 }
484 } else if (strcmp(key, "host") == 0) {
485 array_push_back(&hostnames, &value);
486 } else {
487 array_push_back(&connect_args, args);
488 }
489 }
490
491 /* build a new connect string without our settings or hosts */
492 array_append_zero(&connect_args);
493 connect_string = t_strarray_join(array_front(&connect_args), " ");
494
495 if (array_count(&hostnames) == 0) {
496 /* no hosts specified. create a default one. */
497 host = array_append_space(&db->hosts);
498 host->connect_string = i_strdup(connect_string);
499 } else {
500 if (*connect_string == '\0')
501 connect_string = NULL;
502
503 array_foreach_elem(&hostnames, hostname) {
504 host = array_append_space(&db->hosts);
505 host->connect_string =
506 i_strconcat("host=", hostname, " ",
507 connect_string, NULL);
508 }
509 }
510
511 if (db->connection_limit == 0)
512 db->connection_limit = SQL_DEFAULT_CONNECTION_LIMIT;
513 return 0;
514 }
515
sqlpool_add_all_once(struct sqlpool_db * db)516 static void sqlpool_add_all_once(struct sqlpool_db *db)
517 {
518 struct sqlpool_host *host;
519 unsigned int host_idx;
520
521 for (;;) {
522 host = sqlpool_find_host_with_least_connections(db, &host_idx);
523 if (host->connection_count > 0)
524 break;
525 (void)sqlpool_add_connection(db, host, host_idx);
526 }
527 }
528
driver_sqlpool_init_full(const struct sql_settings * set,const struct sql_db * driver,struct sql_db ** db_r,const char ** error_r)529 int driver_sqlpool_init_full(const struct sql_settings *set, const struct sql_db *driver,
530 struct sql_db **db_r, const char **error_r)
531 {
532 struct sqlpool_db *db;
533 int ret;
534
535 db = i_new(struct sqlpool_db, 1);
536 db->driver = driver;
537 db->api = driver_sqlpool_db;
538 db->api.flags = driver->flags;
539 db->api.event = event_create(set->event_parent);
540 event_add_category(db->api.event, &event_category_sqlpool);
541 event_set_append_log_prefix(db->api.event,
542 t_strdup_printf("sqlpool(%s): ", driver->name));
543 i_array_init(&db->hosts, 8);
544
545 T_BEGIN {
546 ret = driver_sqlpool_parse_hosts(db, set->connect_string,
547 error_r);
548 } T_END_PASS_STR_IF(ret < 0, error_r);
549
550 if (ret < 0) {
551 driver_sqlpool_deinit(&db->api);
552 return ret;
553 }
554 i_array_init(&db->all_connections, 16);
555 /* connect to all databases so we can do load balancing immediately */
556 sqlpool_add_all_once(db);
557
558 *db_r = &db->api;
559 return 0;
560 }
561
driver_sqlpool_abort_requests(struct sqlpool_db * db)562 static void driver_sqlpool_abort_requests(struct sqlpool_db *db)
563 {
564 while (db->requests_head != NULL) {
565 struct sqlpool_request *request = db->requests_head;
566
567 sqlpool_request_abort(&request);
568 }
569 timeout_remove(&db->request_to);
570 }
571
driver_sqlpool_deinit(struct sql_db * _db)572 static void driver_sqlpool_deinit(struct sql_db *_db)
573 {
574 struct sqlpool_db *db = (struct sqlpool_db *)_db;
575 struct sqlpool_host *host;
576 struct sqlpool_connection *conn;
577
578 array_foreach_modifiable(&db->all_connections, conn)
579 sql_unref(&conn->db);
580 array_clear(&db->all_connections);
581
582 driver_sqlpool_abort_requests(db);
583
584 array_foreach_modifiable(&db->hosts, host)
585 i_free(host->connect_string);
586
587 i_assert(array_count(&db->all_connections) == 0);
588 array_free(&db->hosts);
589 array_free(&db->all_connections);
590 array_free(&_db->module_contexts);
591 event_unref(&_db->event);
592 i_free(db);
593 }
594
driver_sqlpool_connect(struct sql_db * _db)595 static int driver_sqlpool_connect(struct sql_db *_db)
596 {
597 struct sqlpool_db *db = (struct sqlpool_db *)_db;
598 const struct sqlpool_connection *conn;
599 int ret = -1, ret2;
600
601 array_foreach(&db->all_connections, conn) {
602 ret2 = conn->db->to_reconnect != NULL ? -1 :
603 sql_connect(conn->db);
604 if (ret2 > 0)
605 ret = 1;
606 else if (ret2 == 0 && ret < 0)
607 ret = 0;
608 }
609 return ret;
610 }
611
driver_sqlpool_disconnect(struct sql_db * _db)612 static void driver_sqlpool_disconnect(struct sql_db *_db)
613 {
614 struct sqlpool_db *db = (struct sqlpool_db *)_db;
615 const struct sqlpool_connection *conn;
616
617 array_foreach(&db->all_connections, conn)
618 sql_disconnect(conn->db);
619 driver_sqlpool_abort_requests(db);
620 }
621
622 static const char *
driver_sqlpool_escape_string(struct sql_db * _db,const char * string)623 driver_sqlpool_escape_string(struct sql_db *_db, const char *string)
624 {
625 struct sqlpool_db *db = (struct sqlpool_db *)_db;
626 const struct sqlpool_connection *conns;
627 unsigned int i, count;
628
629 /* use the first ready connection */
630 conns = array_get(&db->all_connections, &count);
631 for (i = 0; i < count; i++) {
632 if (SQL_DB_IS_READY(conns[i].db))
633 return sql_escape_string(conns[i].db, string);
634 }
635 /* no ready connections. just use the first one (we're guaranteed
636 to always have one) */
637 return sql_escape_string(conns[0].db, string);
638 }
639
driver_sqlpool_timeout(struct sqlpool_db * db)640 static void driver_sqlpool_timeout(struct sqlpool_db *db)
641 {
642 int duration;
643
644 while (db->requests_head != NULL) {
645 struct sqlpool_request *request = db->requests_head;
646
647 if (request->created + SQL_QUERY_TIMEOUT_SECS > ioloop_time)
648 break;
649
650
651 if (request->query != NULL) {
652 e_error(sql_query_finished_event(&db->api, request->event,
653 request->query, FALSE,
654 &duration)->
655 add_str("error", "Query timed out")->
656 event(),
657 SQL_QUERY_FINISHED_FMT": Query timed out "
658 "(no free connections for %u secs)",
659 request->query, duration,
660 (unsigned int)(ioloop_time - request->created));
661 } else {
662 e_error(event_create_passthrough(request->event)->
663 add_str("error", "Timed out")->
664 set_name(SQL_TRANSACTION_FINISHED)->event(),
665 "Transaction timed out "
666 "(no free connections for %u secs)",
667 (unsigned int)(ioloop_time - request->created));
668 }
669 sqlpool_request_abort(&request);
670 }
671
672 if (db->requests_head == NULL)
673 timeout_remove(&db->request_to);
674 }
675
676 static void
driver_sqlpool_prepend_request(struct sqlpool_db * db,struct sqlpool_request * request)677 driver_sqlpool_prepend_request(struct sqlpool_db *db,
678 struct sqlpool_request *request)
679 {
680 DLLIST2_PREPEND(&db->requests_head, &db->requests_tail, request);
681 if (db->request_to == NULL) {
682 db->request_to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
683 driver_sqlpool_timeout, db);
684 }
685 }
686
687 static void
driver_sqlpool_append_request(struct sqlpool_db * db,struct sqlpool_request * request)688 driver_sqlpool_append_request(struct sqlpool_db *db,
689 struct sqlpool_request *request)
690 {
691 DLLIST2_APPEND(&db->requests_head, &db->requests_tail, request);
692 if (db->request_to == NULL) {
693 db->request_to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
694 driver_sqlpool_timeout, db);
695 }
696 }
697
698 static void
driver_sqlpool_query_callback(struct sql_result * result,struct sqlpool_request * request)699 driver_sqlpool_query_callback(struct sql_result *result,
700 struct sqlpool_request *request)
701 {
702 struct sqlpool_db *db = request->db;
703 const struct sqlpool_connection *conn = NULL;
704 struct sql_db *conndb;
705
706 if (result->failed_try_retry &&
707 request->retry_count < array_count(&db->hosts)) {
708 e_warning(db->api.event, "Query failed, retrying: %s",
709 sql_result_get_error(result));
710 request->retry_count++;
711 driver_sqlpool_prepend_request(db, request);
712
713 if (driver_sqlpool_get_connection(request->db,
714 request->host_idx, &conn)) {
715 request->host_idx = conn->host_idx;
716 sqlpool_request_send_next(db, conn->db);
717 }
718 } else {
719 if (result->failed) {
720 e_error(db->api.event, "Query failed, aborting: %s",
721 request->query);
722 }
723 conndb = result->db;
724
725 if (request->callback != NULL)
726 request->callback(result, request->context);
727 sqlpool_request_free(&request);
728
729 sqlpool_request_send_next(db, conndb);
730 }
731 }
732
733 static void ATTR_NULL(3, 4)
driver_sqlpool_query(struct sql_db * _db,const char * query,sql_query_callback_t * callback,void * context)734 driver_sqlpool_query(struct sql_db *_db, const char *query,
735 sql_query_callback_t *callback, void *context)
736 {
737 struct sqlpool_db *db = (struct sqlpool_db *)_db;
738 struct sqlpool_request *request;
739 const struct sqlpool_connection *conn;
740
741 request = sqlpool_request_new(db, query);
742 request->callback = callback;
743 request->context = context;
744
745 if (!driver_sqlpool_get_connection(db, UINT_MAX, &conn))
746 driver_sqlpool_append_request(db, request);
747 else {
748 request->host_idx = conn->host_idx;
749 sql_query(conn->db, query, driver_sqlpool_query_callback,
750 request);
751 }
752 }
753
driver_sqlpool_exec(struct sql_db * _db,const char * query)754 static void driver_sqlpool_exec(struct sql_db *_db, const char *query)
755 {
756 driver_sqlpool_query(_db, query, NULL, NULL);
757 }
758
759 static struct sql_result *
driver_sqlpool_query_s(struct sql_db * _db,const char * query)760 driver_sqlpool_query_s(struct sql_db *_db, const char *query)
761 {
762 struct sqlpool_db *db = (struct sqlpool_db *)_db;
763 const struct sqlpool_connection *conn;
764 struct sql_result *result;
765
766 if (!driver_sqlpool_get_sync_connection(db, &conn)) {
767 sql_not_connected_result.refcount++;
768 return &sql_not_connected_result;
769 }
770
771 result = sql_query_s(conn->db, query);
772 if (result->failed_try_retry) {
773 if (!driver_sqlpool_get_sync_connection(db, &conn))
774 return result;
775
776 sql_result_unref(result);
777 result = sql_query_s(conn->db, query);
778 }
779 return result;
780 }
781
782 static struct sql_transaction_context *
driver_sqlpool_transaction_begin(struct sql_db * _db)783 driver_sqlpool_transaction_begin(struct sql_db *_db)
784 {
785 struct sqlpool_transaction_context *ctx;
786
787 ctx = i_new(struct sqlpool_transaction_context, 1);
788 ctx->ctx.db = _db;
789
790 /* queue changes until commit. even if we did have a free connection
791 now, don't use it or multiple open transactions could tie up all
792 connections. */
793 ctx->query_pool = pool_alloconly_create("sqlpool transaction", 1024);
794 return &ctx->ctx;
795 }
796
797 static void
driver_sqlpool_transaction_free(struct sqlpool_transaction_context * ctx)798 driver_sqlpool_transaction_free(struct sqlpool_transaction_context *ctx)
799 {
800 if (ctx->commit_request != NULL)
801 sqlpool_request_abort(&ctx->commit_request);
802 pool_unref(&ctx->query_pool);
803 i_free(ctx);
804 }
805
806 static void
driver_sqlpool_commit_callback(const struct sql_commit_result * result,struct sqlpool_transaction_context * ctx)807 driver_sqlpool_commit_callback(const struct sql_commit_result *result,
808 struct sqlpool_transaction_context *ctx)
809 {
810 ctx->callback(result, ctx->context);
811 driver_sqlpool_transaction_free(ctx);
812 }
813
814 static void
driver_sqlpool_transaction_commit(struct sql_transaction_context * _ctx,sql_commit_callback_t * callback,void * context)815 driver_sqlpool_transaction_commit(struct sql_transaction_context *_ctx,
816 sql_commit_callback_t *callback,
817 void *context)
818 {
819 struct sqlpool_transaction_context *ctx =
820 (struct sqlpool_transaction_context *)_ctx;
821 struct sqlpool_db *db = (struct sqlpool_db *)_ctx->db;
822 const struct sqlpool_connection *conn;
823
824 ctx->callback = callback;
825 ctx->context = context;
826
827 ctx->commit_request = sqlpool_request_new(db, NULL);
828 ctx->commit_request->trans = ctx;
829
830 if (driver_sqlpool_get_connection(db, UINT_MAX, &conn))
831 sqlpool_request_handle_transaction(conn->db, ctx);
832 else
833 driver_sqlpool_append_request(db, ctx->commit_request);
834 }
835
836 static int
driver_sqlpool_transaction_commit_s(struct sql_transaction_context * _ctx,const char ** error_r)837 driver_sqlpool_transaction_commit_s(struct sql_transaction_context *_ctx,
838 const char **error_r)
839 {
840 struct sqlpool_transaction_context *ctx =
841 (struct sqlpool_transaction_context *)_ctx;
842 struct sqlpool_db *db = (struct sqlpool_db *)_ctx->db;
843 const struct sqlpool_connection *conn;
844 struct sql_transaction_context *conn_trans;
845 int ret;
846
847 *error_r = NULL;
848
849 if (!driver_sqlpool_get_sync_connection(db, &conn)) {
850 *error_r = SQL_ERRSTR_NOT_CONNECTED;
851 driver_sqlpool_transaction_free(ctx);
852 return -1;
853 }
854
855 conn_trans = driver_sqlpool_new_conn_trans(ctx, conn->db);
856 ret = sql_transaction_commit_s(&conn_trans, error_r);
857 driver_sqlpool_transaction_free(ctx);
858 return ret;
859 }
860
861 static void
driver_sqlpool_transaction_rollback(struct sql_transaction_context * _ctx)862 driver_sqlpool_transaction_rollback(struct sql_transaction_context *_ctx)
863 {
864 struct sqlpool_transaction_context *ctx =
865 (struct sqlpool_transaction_context *)_ctx;
866
867 driver_sqlpool_transaction_free(ctx);
868 }
869
870 static void
driver_sqlpool_update(struct sql_transaction_context * _ctx,const char * query,unsigned int * affected_rows)871 driver_sqlpool_update(struct sql_transaction_context *_ctx, const char *query,
872 unsigned int *affected_rows)
873 {
874 struct sqlpool_transaction_context *ctx =
875 (struct sqlpool_transaction_context *)_ctx;
876
877 /* we didn't get a connection for transaction immediately.
878 queue updates until commit transfers all of these */
879 sql_transaction_add_query(&ctx->ctx, ctx->query_pool,
880 query, affected_rows);
881 }
882
883 static const char *
driver_sqlpool_escape_blob(struct sql_db * _db,const unsigned char * data,size_t size)884 driver_sqlpool_escape_blob(struct sql_db *_db,
885 const unsigned char *data, size_t size)
886 {
887 struct sqlpool_db *db = (struct sqlpool_db *)_db;
888 const struct sqlpool_connection *conns;
889 unsigned int i, count;
890
891 /* use the first ready connection */
892 conns = array_get(&db->all_connections, &count);
893 for (i = 0; i < count; i++) {
894 if (SQL_DB_IS_READY(conns[i].db))
895 return sql_escape_blob(conns[i].db, data, size);
896 }
897 /* no ready connections. just use the first one (we're guaranteed
898 to always have one) */
899 return sql_escape_blob(conns[0].db, data, size);
900 }
901
driver_sqlpool_wait(struct sql_db * _db)902 static void driver_sqlpool_wait(struct sql_db *_db)
903 {
904 struct sqlpool_db *db = (struct sqlpool_db *)_db;
905 const struct sqlpool_connection *conn;
906
907 array_foreach(&db->all_connections, conn)
908 sql_wait(conn->db);
909 }
910
911 struct sql_db driver_sqlpool_db = {
912 "",
913
914 .v = {
915 .get_flags = driver_sqlpool_get_flags,
916 .deinit = driver_sqlpool_deinit,
917 .connect = driver_sqlpool_connect,
918 .disconnect = driver_sqlpool_disconnect,
919 .escape_string = driver_sqlpool_escape_string,
920 .exec = driver_sqlpool_exec,
921 .query = driver_sqlpool_query,
922 .query_s = driver_sqlpool_query_s,
923 .wait = driver_sqlpool_wait,
924
925 .transaction_begin = driver_sqlpool_transaction_begin,
926 .transaction_commit = driver_sqlpool_transaction_commit,
927 .transaction_commit_s = driver_sqlpool_transaction_commit_s,
928 .transaction_rollback = driver_sqlpool_transaction_rollback,
929
930 .update = driver_sqlpool_update,
931
932 .escape_blob = driver_sqlpool_escape_blob,
933 }
934 };
935