1
2 #include "common.h"
3
4 #include "log.h"
5
6 #include "seaf-db.h"
7
8 #include <stdarg.h>
9 #ifdef HAVE_MYSQL
10 #include <mysql.h>
11 #endif
12 #include <sqlite3.h>
13 #include <pthread.h>
14
15 struct DBConnPool {
16 GPtrArray *connections;
17 pthread_mutex_t lock;
18 int max_connections;
19 };
20 typedef struct DBConnPool DBConnPool;
21
/*
 * Backend-independent part of a database handle. The concrete backends in
 * this file embed it as the first member of their own structure.
 */
struct SeafDB {
    /* Backend tag; the constructors store SEAF_DB_TYPE_MYSQL or SEAF_DB_TYPE_SQLITE. */
    int type;
    /* Connection pool; assigned by seaf_db_new_mysql(). */
    DBConnPool *pool;
};
26
27 typedef struct DBConnection {
28 gboolean is_available;
29 DBConnPool *pool;
30 } DBConnection;
31
/*
 * Opaque result-row base type. It carries no data of its own; a backend row
 * structure embeds it as its first member and is reached by casting.
 */
struct SeafDBRow {
    /* Empty */
};
35
/*
 * State of one open transaction.
 */
struct SeafDBTrans {
    /* Connection the transaction runs on; released by seaf_db_trans_close(). */
    DBConnection *conn;
    /* Set to TRUE once any statement on `conn` failed; passed on to
     * release_connection() when the transaction is closed. */
    gboolean need_close;
};
40
41 typedef struct DBOperations {
42 DBConnection* (*get_connection)(SeafDB *db);
43 void (*release_connection)(DBConnection *conn, gboolean need_close);
44 int (*execute_sql_no_stmt)(DBConnection *conn, const char *sql);
45 int (*execute_sql)(DBConnection *conn, const char *sql,
46 int n, va_list args);
47 int (*query_foreach_row)(DBConnection *conn,
48 const char *sql, SeafDBRowFunc callback, void *data,
49 int n, va_list args);
50 int (*row_get_column_count)(SeafDBRow *row);
51 const char* (*row_get_column_string)(SeafDBRow *row, int idx);
52 int (*row_get_column_int)(SeafDBRow *row, int idx);
53 gint64 (*row_get_column_int64)(SeafDBRow *row, int idx);
54 } DBOperations;
55
56 static DBOperations db_ops;
57
58 #ifdef HAVE_MYSQL
59
60 /* MySQL Ops */
61 static SeafDB *
62 mysql_db_new (const char *host,
63 int port,
64 const char *user,
65 const char *password,
66 const char *db_name,
67 const char *unix_socket,
68 gboolean use_ssl,
69 const char *charset);
70 static DBConnection *
71 mysql_db_get_connection (SeafDB *db);
72 static void
73 mysql_db_release_connection (DBConnection *vconn);
74 static int
75 mysql_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql);
76 static int
77 mysql_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args);
78 static int
79 mysql_db_query_foreach_row (DBConnection *vconn, const char *sql,
80 SeafDBRowFunc callback, void *data,
81 int n, va_list args);
82 static int
83 mysql_db_row_get_column_count (SeafDBRow *row);
84 static const char *
85 mysql_db_row_get_column_string (SeafDBRow *row, int idx);
86 static int
87 mysql_db_row_get_column_int (SeafDBRow *row, int idx);
88 static gint64
89 mysql_db_row_get_column_int64 (SeafDBRow *row, int idx);
90 static gboolean
91 mysql_db_connection_ping (DBConnection *vconn);
92
93 static DBConnPool *
init_conn_pool_common(int max_connections)94 init_conn_pool_common (int max_connections)
95 {
96 DBConnPool *pool = g_new0(DBConnPool, 1);
97 pool->connections = g_ptr_array_sized_new (max_connections);
98 pthread_mutex_init (&pool->lock, NULL);
99 pool->max_connections = max_connections;
100
101 return pool;
102 }
103
104 static DBConnection *
mysql_conn_pool_get_connection(SeafDB * db)105 mysql_conn_pool_get_connection (SeafDB *db)
106 {
107 DBConnPool *pool = db->pool;
108 DBConnection *conn = NULL;
109
110 if (pool->max_connections == 0) {
111 conn = mysql_db_get_connection (db);
112 conn->pool = pool;
113 return conn;
114 }
115
116 pthread_mutex_lock (&pool->lock);
117
118 guint i, size = pool->connections->len;
119 for (i = 0; i < size; ++i) {
120 conn = g_ptr_array_index (pool->connections, i);
121 if (conn->is_available && mysql_db_connection_ping (conn)) {
122 conn->is_available = FALSE;
123 goto out;
124 }
125 }
126 conn = NULL;
127 if (size < pool->max_connections) {
128 conn = mysql_db_get_connection (db);
129 if (conn) {
130 conn->pool = pool;
131 conn->is_available = FALSE;
132 g_ptr_array_add (pool->connections, conn);
133 }
134 }
135
136 out:
137 pthread_mutex_unlock (&pool->lock);
138 return conn;
139 }
140
141 static void
mysql_conn_pool_release_connection(DBConnection * conn,gboolean need_close)142 mysql_conn_pool_release_connection (DBConnection *conn, gboolean need_close)
143 {
144 if (!conn)
145 return;
146
147 if (conn->pool->max_connections == 0) {
148 mysql_db_release_connection (conn);
149 return;
150 }
151
152 if (need_close) {
153 pthread_mutex_lock (&conn->pool->lock);
154 g_ptr_array_remove (conn->pool->connections, conn);
155 pthread_mutex_unlock (&conn->pool->lock);
156 mysql_db_release_connection (conn);
157 return;
158 }
159
160 pthread_mutex_lock (&conn->pool->lock);
161 conn->is_available = TRUE;
162 pthread_mutex_unlock (&conn->pool->lock);
163 }
164
165 #define KEEPALIVE_INTERVAL 30
166 static void *
mysql_conn_keepalive(void * arg)167 mysql_conn_keepalive (void *arg)
168 {
169 DBConnPool *pool = arg;
170 DBConnection *conn = NULL;
171
172 while (1) {
173 pthread_mutex_lock (&pool->lock);
174
175 guint i, size = pool->connections->len;
176 for (i = 0; i < size; ++i) {
177 conn = g_ptr_array_index (pool->connections, i);
178 if (conn->is_available) {
179 mysql_db_connection_ping (conn);
180 }
181 }
182 pthread_mutex_unlock (&pool->lock);
183
184 sleep (KEEPALIVE_INTERVAL);
185 }
186
187 return NULL;
188 }
189
190 SeafDB *
seaf_db_new_mysql(const char * host,int port,const char * user,const char * passwd,const char * db_name,const char * unix_socket,gboolean use_ssl,const char * charset,int max_connections)191 seaf_db_new_mysql (const char *host,
192 int port,
193 const char *user,
194 const char *passwd,
195 const char *db_name,
196 const char *unix_socket,
197 gboolean use_ssl,
198 const char *charset,
199 int max_connections)
200 {
201 SeafDB *db;
202
203 db = mysql_db_new (host, port, user, passwd, db_name, unix_socket, use_ssl, charset);
204 if (!db)
205 return NULL;
206 db->type = SEAF_DB_TYPE_MYSQL;
207
208 db_ops.get_connection = mysql_conn_pool_get_connection;
209 db_ops.release_connection = mysql_conn_pool_release_connection;
210 db_ops.execute_sql_no_stmt = mysql_db_execute_sql_no_stmt;
211 db_ops.execute_sql = mysql_db_execute_sql;
212 db_ops.query_foreach_row = mysql_db_query_foreach_row;
213 db_ops.row_get_column_count = mysql_db_row_get_column_count;
214 db_ops.row_get_column_string = mysql_db_row_get_column_string;
215 db_ops.row_get_column_int = mysql_db_row_get_column_int;
216 db_ops.row_get_column_int64 = mysql_db_row_get_column_int64;
217
218 db->pool = init_conn_pool_common (max_connections);
219
220 pthread_t tid;
221 int ret = pthread_create (&tid, NULL, mysql_conn_keepalive, db->pool);
222 if (ret != 0) {
223 seaf_warning ("Failed to create mysql connection keepalive thread.\n");
224 return NULL;
225 }
226 pthread_detach (tid);
227
228 return db;
229 }
230
231 #endif
232
233 /* SQLite Ops */
234 static SeafDB *
235 sqlite_db_new (const char *db_path);
236 static DBConnection *
237 sqlite_db_get_connection (SeafDB *db);
238 static void
239 sqlite_db_release_connection (DBConnection *vconn, gboolean need_close);
240 static int
241 sqlite_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql);
242 static int
243 sqlite_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args);
244 static int
245 sqlite_db_query_foreach_row (DBConnection *vconn, const char *sql,
246 SeafDBRowFunc callback, void *data,
247 int n, va_list args);
248 static int
249 sqlite_db_row_get_column_count (SeafDBRow *row);
250 static const char *
251 sqlite_db_row_get_column_string (SeafDBRow *row, int idx);
252 static int
253 sqlite_db_row_get_column_int (SeafDBRow *row, int idx);
254 static gint64
255 sqlite_db_row_get_column_int64 (SeafDBRow *row, int idx);
256
257 SeafDB *
seaf_db_new_sqlite(const char * db_path,int max_connections)258 seaf_db_new_sqlite (const char *db_path, int max_connections)
259 {
260 SeafDB *db;
261
262 db = sqlite_db_new (db_path);
263 if (!db)
264 return NULL;
265 db->type = SEAF_DB_TYPE_SQLITE;
266
267 db_ops.get_connection = sqlite_db_get_connection;
268 db_ops.release_connection = sqlite_db_release_connection;
269 db_ops.execute_sql_no_stmt = sqlite_db_execute_sql_no_stmt;
270 db_ops.execute_sql = sqlite_db_execute_sql;
271 db_ops.query_foreach_row = sqlite_db_query_foreach_row;
272 db_ops.row_get_column_count = sqlite_db_row_get_column_count;
273 db_ops.row_get_column_string = sqlite_db_row_get_column_string;
274 db_ops.row_get_column_int = sqlite_db_row_get_column_int;
275 db_ops.row_get_column_int64 = sqlite_db_row_get_column_int64;
276
277 return db;
278 }
279
280 int
seaf_db_type(SeafDB * db)281 seaf_db_type (SeafDB *db)
282 {
283 return db->type;
284 }
285
286 int
seaf_db_query(SeafDB * db,const char * sql)287 seaf_db_query (SeafDB *db, const char *sql)
288 {
289 DBConnection *conn = db_ops.get_connection (db);
290 if (!conn)
291 return -1;
292
293 int ret;
294 ret = db_ops.execute_sql_no_stmt (conn, sql);
295
296 db_ops.release_connection (conn, ret < 0);
297 return ret;
298 }
299
300 gboolean
seaf_db_check_for_existence(SeafDB * db,const char * sql,gboolean * db_err)301 seaf_db_check_for_existence (SeafDB *db, const char *sql, gboolean *db_err)
302 {
303 return seaf_db_statement_exists (db, sql, db_err, 0);
304 }
305
306 int
seaf_db_foreach_selected_row(SeafDB * db,const char * sql,SeafDBRowFunc callback,void * data)307 seaf_db_foreach_selected_row (SeafDB *db, const char *sql,
308 SeafDBRowFunc callback, void *data)
309 {
310 return seaf_db_statement_foreach_row (db, sql, callback, data, 0);
311 }
312
313 const char *
seaf_db_row_get_column_text(SeafDBRow * row,guint32 idx)314 seaf_db_row_get_column_text (SeafDBRow *row, guint32 idx)
315 {
316 g_return_val_if_fail (idx < db_ops.row_get_column_count(row), NULL);
317
318 return db_ops.row_get_column_string (row, idx);
319 }
320
321 int
seaf_db_row_get_column_int(SeafDBRow * row,guint32 idx)322 seaf_db_row_get_column_int (SeafDBRow *row, guint32 idx)
323 {
324 g_return_val_if_fail (idx < db_ops.row_get_column_count(row), -1);
325
326 return db_ops.row_get_column_int (row, idx);
327 }
328
329 gint64
seaf_db_row_get_column_int64(SeafDBRow * row,guint32 idx)330 seaf_db_row_get_column_int64 (SeafDBRow *row, guint32 idx)
331 {
332 g_return_val_if_fail (idx < db_ops.row_get_column_count(row), -1);
333
334 return db_ops.row_get_column_int64 (row, idx);
335 }
336
337 int
seaf_db_get_int(SeafDB * db,const char * sql)338 seaf_db_get_int (SeafDB *db, const char *sql)
339 {
340 return seaf_db_statement_get_int (db, sql, 0);
341 }
342
343 gint64
seaf_db_get_int64(SeafDB * db,const char * sql)344 seaf_db_get_int64 (SeafDB *db, const char *sql)
345 {
346 return seaf_db_statement_get_int64 (db, sql, 0);
347 }
348
349 char *
seaf_db_get_string(SeafDB * db,const char * sql)350 seaf_db_get_string (SeafDB *db, const char *sql)
351 {
352 return seaf_db_statement_get_string (db, sql, 0);
353 }
354
355 int
seaf_db_statement_query(SeafDB * db,const char * sql,int n,...)356 seaf_db_statement_query (SeafDB *db, const char *sql, int n, ...)
357 {
358 int ret;
359 DBConnection *conn = NULL;
360
361 conn = db_ops.get_connection (db);
362 if (!conn)
363 return -1;
364
365 va_list args;
366 va_start (args, n);
367 ret = db_ops.execute_sql (conn, sql, n, args);
368 va_end (args);
369
370 db_ops.release_connection (conn, ret < 0);
371
372 return ret;
373 }
374
375 gboolean
seaf_db_statement_exists(SeafDB * db,const char * sql,gboolean * db_err,int n,...)376 seaf_db_statement_exists (SeafDB *db, const char *sql, gboolean *db_err, int n, ...)
377 {
378 int n_rows;
379 DBConnection *conn = NULL;
380
381 conn = db_ops.get_connection(db);
382 if (!conn) {
383 *db_err = TRUE;
384 return FALSE;
385 }
386
387 va_list args;
388 va_start (args, n);
389 n_rows = db_ops.query_foreach_row (conn, sql, NULL, NULL, n, args);
390 va_end (args);
391
392 db_ops.release_connection(conn, n_rows < 0);
393
394 if (n_rows < 0) {
395 *db_err = TRUE;
396 return FALSE;
397 } else {
398 *db_err = FALSE;
399 return (n_rows != 0);
400 }
401 }
402
403 int
seaf_db_statement_foreach_row(SeafDB * db,const char * sql,SeafDBRowFunc callback,void * data,int n,...)404 seaf_db_statement_foreach_row (SeafDB *db, const char *sql,
405 SeafDBRowFunc callback, void *data,
406 int n, ...)
407 {
408 int ret;
409 DBConnection *conn = NULL;
410
411 conn = db_ops.get_connection (db);
412 if (!conn)
413 return -1;
414
415 va_list args;
416 va_start (args, n);
417 ret = db_ops.query_foreach_row (conn, sql, callback, data, n, args);
418 va_end (args);
419
420 db_ops.release_connection (conn, ret < 0);
421
422 return ret;
423 }
424
425 static gboolean
get_int_cb(SeafDBRow * row,void * data)426 get_int_cb (SeafDBRow *row, void *data)
427 {
428 int *pret = (int*)data;
429
430 *pret = seaf_db_row_get_column_int (row, 0);
431
432 return FALSE;
433 }
434
435 int
seaf_db_statement_get_int(SeafDB * db,const char * sql,int n,...)436 seaf_db_statement_get_int (SeafDB *db, const char *sql, int n, ...)
437 {
438 int ret = -1;
439 int rc;
440 DBConnection *conn = NULL;
441
442 conn = db_ops.get_connection (db);
443 if (!conn)
444 return -1;
445
446 va_list args;
447 va_start (args, n);
448 rc = db_ops.query_foreach_row (conn, sql, get_int_cb, &ret, n, args);
449 va_end (args);
450
451 db_ops.release_connection (conn, rc < 0);
452
453 if (rc < 0)
454 return -1;
455
456 return ret;
457 }
458
459 static gboolean
get_int64_cb(SeafDBRow * row,void * data)460 get_int64_cb (SeafDBRow *row, void *data)
461 {
462 gint64 *pret = (gint64*)data;
463
464 *pret = seaf_db_row_get_column_int64 (row, 0);
465
466 return FALSE;
467 }
468
469 gint64
seaf_db_statement_get_int64(SeafDB * db,const char * sql,int n,...)470 seaf_db_statement_get_int64 (SeafDB *db, const char *sql, int n, ...)
471 {
472 gint64 ret = -1;
473 int rc;
474 DBConnection *conn = NULL;
475
476 conn = db_ops.get_connection (db);
477 if (!conn)
478 return -1;
479
480 va_list args;
481 va_start (args, n);
482 rc = db_ops.query_foreach_row (conn, sql, get_int64_cb, &ret, n, args);
483 va_end(args);
484
485 db_ops.release_connection (conn, rc < 0);
486
487 if (rc < 0)
488 return -1;
489
490 return ret;
491 }
492
493 static gboolean
get_string_cb(SeafDBRow * row,void * data)494 get_string_cb (SeafDBRow *row, void *data)
495 {
496 char **pret = (char**)data;
497
498 *pret = g_strdup(seaf_db_row_get_column_text (row, 0));
499
500 return FALSE;
501 }
502
503 char *
seaf_db_statement_get_string(SeafDB * db,const char * sql,int n,...)504 seaf_db_statement_get_string (SeafDB *db, const char *sql, int n, ...)
505 {
506 char *ret = NULL;
507 int rc;
508 DBConnection *conn = NULL;
509
510 conn = db_ops.get_connection (db);
511 if (!conn)
512 return NULL;
513
514 va_list args;
515 va_start (args, n);
516 rc = db_ops.query_foreach_row (conn, sql, get_string_cb, &ret, n, args);
517 va_end(args);
518
519 db_ops.release_connection (conn, rc < 0);
520
521 if (rc < 0)
522 return NULL;
523
524 return ret;
525 }
526
527 /* Transaction */
528
529 SeafDBTrans *
seaf_db_begin_transaction(SeafDB * db)530 seaf_db_begin_transaction (SeafDB *db)
531 {
532 SeafDBTrans *trans = NULL;
533 DBConnection *conn = db_ops.get_connection(db);
534 if (!conn) {
535 return trans;
536 }
537
538 if (db_ops.execute_sql_no_stmt (conn, "BEGIN") < 0) {
539 db_ops.release_connection (conn, TRUE);
540 return trans;
541 }
542
543 trans = g_new0 (SeafDBTrans, 1);
544 trans->conn = conn;
545
546 return trans;
547 }
548
549 void
seaf_db_trans_close(SeafDBTrans * trans)550 seaf_db_trans_close (SeafDBTrans *trans)
551 {
552 db_ops.release_connection (trans->conn, trans->need_close);
553 g_free (trans);
554 }
555
556 int
seaf_db_commit(SeafDBTrans * trans)557 seaf_db_commit (SeafDBTrans *trans)
558 {
559 DBConnection *conn = trans->conn;
560
561 if (db_ops.execute_sql_no_stmt (conn, "COMMIT") < 0) {
562 trans->need_close = TRUE;
563 return -1;
564 }
565
566 return 0;
567 }
568
569 int
seaf_db_rollback(SeafDBTrans * trans)570 seaf_db_rollback (SeafDBTrans *trans)
571 {
572 DBConnection *conn = trans->conn;
573
574 if (db_ops.execute_sql_no_stmt (conn, "ROLLBACK") < 0) {
575 trans->need_close = TRUE;
576 return -1;
577 }
578
579 return 0;
580 }
581
582 int
seaf_db_trans_query(SeafDBTrans * trans,const char * sql,int n,...)583 seaf_db_trans_query (SeafDBTrans *trans, const char *sql, int n, ...)
584 {
585 int ret;
586
587 va_list args;
588 va_start (args, n);
589 ret = db_ops.execute_sql (trans->conn, sql, n, args);
590 va_end (args);
591
592 if (ret < 0)
593 trans->need_close = TRUE;
594
595 return ret;
596 }
597
598 gboolean
seaf_db_trans_check_for_existence(SeafDBTrans * trans,const char * sql,gboolean * db_err,int n,...)599 seaf_db_trans_check_for_existence (SeafDBTrans *trans,
600 const char *sql,
601 gboolean *db_err,
602 int n, ...)
603 {
604 int n_rows;
605
606 va_list args;
607 va_start (args, n);
608 n_rows = db_ops.query_foreach_row (trans->conn, sql, NULL, NULL, n, args);
609 va_end (args);
610
611 if (n_rows < 0) {
612 trans->need_close = TRUE;
613 *db_err = TRUE;
614 return FALSE;
615 } else {
616 *db_err = FALSE;
617 return (n_rows != 0);
618 }
619 }
620
621 int
seaf_db_trans_foreach_selected_row(SeafDBTrans * trans,const char * sql,SeafDBRowFunc callback,void * data,int n,...)622 seaf_db_trans_foreach_selected_row (SeafDBTrans *trans, const char *sql,
623 SeafDBRowFunc callback, void *data,
624 int n, ...)
625 {
626 int ret;
627
628 va_list args;
629 va_start (args, n);
630 ret = db_ops.query_foreach_row (trans->conn, sql, callback, data, n, args);
631 va_end (args);
632
633 if (ret < 0)
634 trans->need_close = TRUE;
635
636 return ret;
637 }
638
639 int
seaf_db_row_get_column_count(SeafDBRow * row)640 seaf_db_row_get_column_count (SeafDBRow *row)
641 {
642 return db_ops.row_get_column_count(row);
643 }
644
645 #ifdef HAVE_MYSQL
646
647 /* MySQL DB */
648
649 typedef struct MySQLDB {
650 struct SeafDB parent;
651 char *host;
652 char *user;
653 char *password;
654 unsigned int port;
655 char *db_name;
656 char *unix_socket;
657 gboolean use_ssl;
658 char *charset;
659 } MySQLDB;
660
661 typedef struct MySQLDBConnection {
662 struct DBConnection parent;
663 MYSQL *db_conn;
664 } MySQLDBConnection;
665
666 static gboolean
mysql_db_connection_ping(DBConnection * vconn)667 mysql_db_connection_ping (DBConnection *vconn)
668 {
669 MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
670
671 return (mysql_ping (conn->db_conn) == 0);
672 }
673
674 static SeafDB *
mysql_db_new(const char * host,int port,const char * user,const char * password,const char * db_name,const char * unix_socket,gboolean use_ssl,const char * charset)675 mysql_db_new (const char *host,
676 int port,
677 const char *user,
678 const char *password,
679 const char *db_name,
680 const char *unix_socket,
681 gboolean use_ssl,
682 const char *charset)
683 {
684 MySQLDB *db = g_new0 (MySQLDB, 1);
685
686 db->host = g_strdup (host);
687 db->user = g_strdup (user);
688 db->password = g_strdup (password);
689 db->port = port;
690 db->db_name = g_strdup(db_name);
691 db->unix_socket = g_strdup(unix_socket);
692 db->use_ssl = use_ssl;
693 db->charset = g_strdup(charset);
694
695 mysql_library_init (0, NULL, NULL);
696
697 return (SeafDB *)db;
698 }
699
700 typedef char my_bool;
701
702 static DBConnection *
mysql_db_get_connection(SeafDB * vdb)703 mysql_db_get_connection (SeafDB *vdb)
704 {
705 MySQLDB *db = (MySQLDB *)vdb;
706 my_bool yes = 1;
707 int conn_timeout = 1;
708 MYSQL *db_conn;
709 MySQLDBConnection *conn = NULL;
710
711 db_conn = mysql_init (NULL);
712 if (!db_conn) {
713 seaf_warning ("Failed to init mysql connection object.\n");
714 return NULL;
715 }
716
717 if (db->use_ssl)
718 mysql_ssl_set(db_conn, 0,0,0,0,0);
719
720 if (db->charset)
721 mysql_options(db_conn, MYSQL_SET_CHARSET_NAME, db->charset);
722
723 mysql_options(db_conn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&conn_timeout);
724 mysql_options(db_conn, MYSQL_OPT_RECONNECT, (const char*)&yes);
725
726 if (!mysql_real_connect(db_conn, db->host, db->user, db->password,
727 db->db_name, db->port,
728 db->unix_socket, CLIENT_MULTI_STATEMENTS)) {
729 seaf_warning ("Failed to connect to MySQL: %s\n", mysql_error(db_conn));
730 mysql_close (db_conn);
731 return NULL;
732 }
733
734 conn = g_new0 (MySQLDBConnection, 1);
735 conn->db_conn = db_conn;
736
737 return (DBConnection *)conn;
738 }
739
740 static void
mysql_db_release_connection(DBConnection * vconn)741 mysql_db_release_connection (DBConnection *vconn)
742 {
743 if (!vconn)
744 return;
745
746 MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
747
748 mysql_close (conn->db_conn);
749
750 g_free (conn);
751 }
752
753 static int
mysql_db_execute_sql_no_stmt(DBConnection * vconn,const char * sql)754 mysql_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql)
755 {
756 MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
757
758 if (mysql_query (conn->db_conn, sql) != 0) {
759 seaf_warning ("Failed to execute sql %s: %s\n", sql, mysql_error(conn->db_conn));
760 return -1;
761 }
762
763 return 0;
764 }
765
766 static MYSQL_STMT *
_prepare_stmt_mysql(MYSQL * db,const char * sql)767 _prepare_stmt_mysql (MYSQL *db, const char *sql)
768 {
769 MYSQL_STMT *stmt;
770
771 stmt = mysql_stmt_init (db);
772 if (!stmt) {
773 seaf_warning ("mysql_stmt_init failed.\n");
774 return NULL;
775 }
776
777 if (mysql_stmt_prepare (stmt, sql, strlen(sql)) != 0) {
778 seaf_warning ("Failed to prepare sql %s: %s\n", sql, mysql_stmt_error(stmt));
779 mysql_stmt_close (stmt);
780 return NULL;
781 }
782
783 return stmt;
784 }
785
786 static int
_bind_params_mysql(MYSQL_STMT * stmt,MYSQL_BIND * params,int n,va_list args)787 _bind_params_mysql (MYSQL_STMT *stmt, MYSQL_BIND *params, int n, va_list args)
788 {
789 int i;
790 const char *type;
791
792 for (i = 0; i < n; ++i) {
793 type = va_arg (args, const char *);
794 if (strcmp(type, "int") == 0) {
795 int x = va_arg (args, int);
796 int *pval = g_new (int, 1);
797 *pval = x;
798 params[i].buffer_type = MYSQL_TYPE_LONG;
799 params[i].buffer = pval;
800 params[i].is_null = 0;
801 } else if (strcmp (type, "int64") == 0) {
802 gint64 x = va_arg (args, gint64);
803 gint64 *pval = g_new (gint64, 1);
804 *pval = x;
805 params[i].buffer_type = MYSQL_TYPE_LONGLONG;
806 params[i].buffer = pval;
807 params[i].is_null = 0;
808 } else if (strcmp (type, "string") == 0) {
809 const char *s = va_arg (args, const char *);
810 static my_bool yes = TRUE;
811 params[i].buffer_type = MYSQL_TYPE_STRING;
812 params[i].buffer = g_strdup(s);
813 unsigned long *plen = g_new (unsigned long, 1);
814 params[i].length = plen;
815 if (!s) {
816 *plen = 0;
817 params[i].buffer_length = 0;
818 params[i].is_null = &yes;
819 } else {
820 *plen = strlen(s);
821 params[i].buffer_length = *plen + 1;
822 params[i].is_null = 0;
823 }
824 } else {
825 seaf_warning ("BUG: invalid prep stmt parameter type %s.\n", type);
826 g_return_val_if_reached (-1);
827 }
828 }
829
830 if (mysql_stmt_bind_param (stmt, params) != 0) {
831 return -1;
832 }
833
834 return 0;
835 }
836
837 static int
mysql_db_execute_sql(DBConnection * vconn,const char * sql,int n,va_list args)838 mysql_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args)
839 {
840 MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
841 MYSQL *db = conn->db_conn;
842 MYSQL_STMT *stmt = NULL;
843 MYSQL_BIND *params = NULL;
844 int ret = 0;
845
846 stmt = _prepare_stmt_mysql (db, sql);
847 if (!stmt) {
848 return -1;
849 }
850
851 if (n > 0) {
852 params = g_new0 (MYSQL_BIND, n);
853 if (_bind_params_mysql (stmt, params, n, args) < 0) {
854 seaf_warning ("Failed to bind parameters for %s: %s.\n",
855 sql, mysql_stmt_error(stmt));
856 ret = -1;
857 goto out;
858 }
859 }
860
861 if (mysql_stmt_execute (stmt) != 0) {
862 seaf_warning ("Failed to execute sql %s: %s\n", sql, mysql_stmt_error(stmt));
863 ret = -1;
864 goto out;
865 }
866
867 out:
868 if (stmt)
869 mysql_stmt_close (stmt);
870 if (params) {
871 int i;
872 for (i = 0; i < n; ++i) {
873 g_free (params[i].buffer);
874 g_free (params[i].length);
875 }
876 g_free (params);
877 }
878 return ret;
879 }
880
881 typedef struct MySQLDBRow {
882 SeafDBRow parent;
883 int column_count;
884 MYSQL_STMT *stmt;
885 MYSQL_BIND *results;
886 /* Used when returned columns are truncated. */
887 MYSQL_BIND *new_binds;
888 } MySQLDBRow;
889
890 #define DEFAULT_MYSQL_COLUMN_SIZE 1024
891
892 static int
mysql_db_query_foreach_row(DBConnection * vconn,const char * sql,SeafDBRowFunc callback,void * data,int n,va_list args)893 mysql_db_query_foreach_row (DBConnection *vconn, const char *sql,
894 SeafDBRowFunc callback, void *data,
895 int n, va_list args)
896 {
897 MySQLDBConnection *conn = (MySQLDBConnection *)vconn;
898 MYSQL *db = conn->db_conn;
899 MYSQL_STMT *stmt = NULL;
900 MYSQL_BIND *params = NULL;
901 MySQLDBRow row;
902 int nrows = 0;
903 int i;
904
905 memset (&row, 0, sizeof(row));
906
907 stmt = _prepare_stmt_mysql (db, sql);
908 if (!stmt) {
909 return -1;
910 }
911
912 if (n > 0) {
913 params = g_new0 (MYSQL_BIND, n);
914 if (_bind_params_mysql (stmt, params, n, args) < 0) {
915 nrows = -1;
916 goto out;
917 }
918 }
919
920 if (mysql_stmt_execute (stmt) != 0) {
921 seaf_warning ("Failed to execute sql %s: %s\n", sql, mysql_stmt_error(stmt));
922 nrows = -1;
923 goto out;
924 }
925
926 row.column_count = mysql_stmt_field_count (stmt);
927 row.stmt = stmt;
928 row.results = g_new0 (MYSQL_BIND, row.column_count);
929 for (i = 0; i < row.column_count; ++i) {
930 row.results[i].buffer = g_malloc (DEFAULT_MYSQL_COLUMN_SIZE + 1);
931 /* Ask MySQL to convert fields to string, to avoid the trouble of
932 * checking field types.
933 */
934 row.results[i].buffer_type = MYSQL_TYPE_STRING;
935 row.results[i].buffer_length = DEFAULT_MYSQL_COLUMN_SIZE;
936 row.results[i].length = g_new0 (unsigned long, 1);
937 row.results[i].is_null = g_new0 (my_bool, 1);
938 }
939 row.new_binds = g_new0 (MYSQL_BIND, row.column_count);
940
941 if (mysql_stmt_bind_result (stmt, row.results) != 0) {
942 seaf_warning ("Failed to bind result for sql %s: %s\n", sql, mysql_stmt_error(stmt));
943 nrows = -1;
944 goto out;
945 }
946
947 int rc;
948 gboolean next_row = TRUE;
949 while (1) {
950 rc = mysql_stmt_fetch (stmt);
951 if (rc == 1) {
952 seaf_warning ("Failed to fetch result for sql %s: %s\n",
953 sql, mysql_stmt_error(stmt));
954 nrows = -1;
955 goto out;
956 }
957 if (rc == MYSQL_NO_DATA)
958 break;
959
960 /* rc == 0 or rc == MYSQL_DATA_TRUNCATED */
961
962 ++nrows;
963 if (callback)
964 next_row = callback ((SeafDBRow *)&row, data);
965
966 for (i = 0; i < row.column_count; ++i) {
967 g_free (row.new_binds[i].buffer);
968 g_free (row.new_binds[i].length);
969 g_free (row.new_binds[i].is_null);
970 memset (&row.new_binds[i], 0, sizeof(MYSQL_BIND));
971 }
972
973 if (!next_row)
974 break;
975 }
976
977 out:
978 if (stmt) {
979 mysql_stmt_free_result (stmt);
980 mysql_stmt_close (stmt);
981 }
982 if (params) {
983 for (i = 0; i < n; ++i) {
984 g_free (params[i].buffer);
985 g_free (params[i].length);
986 }
987 g_free (params);
988 }
989 if (row.results) {
990 for (i = 0; i < row.column_count; ++i) {
991 g_free (row.results[i].buffer);
992 g_free (row.results[i].length);
993 g_free (row.results[i].is_null);
994 }
995 g_free (row.results);
996 }
997 if (row.new_binds) {
998 for (i = 0; i < row.column_count; ++i) {
999 g_free (row.new_binds[i].buffer);
1000 g_free (row.new_binds[i].length);
1001 g_free (row.new_binds[i].is_null);
1002 }
1003 g_free (row.new_binds);
1004 }
1005 return nrows;
1006 }
1007
1008 static int
mysql_db_row_get_column_count(SeafDBRow * vrow)1009 mysql_db_row_get_column_count (SeafDBRow *vrow)
1010 {
1011 MySQLDBRow *row = (MySQLDBRow *)vrow;
1012 return row->column_count;
1013 }
1014
1015 static const char *
mysql_db_row_get_column_string(SeafDBRow * vrow,int i)1016 mysql_db_row_get_column_string (SeafDBRow *vrow, int i)
1017 {
1018 MySQLDBRow *row = (MySQLDBRow *)vrow;
1019
1020 if (*(row->results[i].is_null)) {
1021 return NULL;
1022 }
1023
1024 char *ret = NULL;
1025 unsigned long real_length = *(row->results[i].length);
1026 /* If column size is larger then allocated buffer size, re-allocate a new buffer
1027 * and fetch the column directly.
1028 */
1029 if (real_length > row->results[i].buffer_length) {
1030 row->new_binds[i].buffer = g_malloc (real_length + 1);
1031 row->new_binds[i].buffer_type = MYSQL_TYPE_STRING;
1032 row->new_binds[i].buffer_length = real_length;
1033 row->new_binds[i].length = g_new0 (unsigned long, 1);
1034 row->new_binds[i].is_null = g_new0 (my_bool, 1);
1035 if (mysql_stmt_fetch_column (row->stmt, &row->new_binds[i], i, 0) != 0) {
1036 seaf_warning ("Faield to fetch column: %s\n", mysql_stmt_error(row->stmt));
1037 return NULL;
1038 }
1039
1040 ret = row->new_binds[i].buffer;
1041 } else {
1042 ret = row->results[i].buffer;
1043 }
1044 ret[real_length] = 0;
1045
1046 return ret;
1047 }
1048
1049 static int
mysql_db_row_get_column_int(SeafDBRow * vrow,int idx)1050 mysql_db_row_get_column_int (SeafDBRow *vrow, int idx)
1051 {
1052 const char *str;
1053 char *e;
1054 int ret;
1055
1056 str = mysql_db_row_get_column_string (vrow, idx);
1057 if (!str) {
1058 return 0;
1059 }
1060
1061 errno = 0;
1062 ret = strtol (str, &e, 10);
1063 if (errno || (e == str)) {
1064 seaf_warning ("Number conversion failed.\n");
1065 return -1;
1066 }
1067
1068 return ret;
1069 }
1070
1071 static gint64
mysql_db_row_get_column_int64(SeafDBRow * vrow,int idx)1072 mysql_db_row_get_column_int64 (SeafDBRow *vrow, int idx)
1073 {
1074 const char *str;
1075 char *e;
1076 gint64 ret;
1077
1078 str = mysql_db_row_get_column_string (vrow, idx);
1079 if (!str) {
1080 return 0;
1081 }
1082
1083 errno = 0;
1084 ret = strtoll (str, &e, 10);
1085 if (errno || (e == str)) {
1086 seaf_warning ("Number conversion failed.\n");
1087 return -1;
1088 }
1089
1090 return ret;
1091 }
1092
1093 #endif /* HAVE_MYSQL */
1094
1095 /* SQLite DB */
1096
/* SQLite thread synchronization routines.
1098 * See https://www.sqlite.org/unlock_notify.html
1099 */
1100
/*
 * Rendezvous between a thread waiting for a SQLite lock to clear and the
 * unlock-notify callback that wakes it.
 */
struct UnlockNotification {
    /* Set to 1 by the callback once the notification has been delivered. */
    int fired;
    /* Signalled by the callback after setting `fired`. */
    pthread_cond_t cond;
    /* Protects `fired`. */
    pthread_mutex_t mutex;
};
typedef struct UnlockNotification UnlockNotification;
1106
1107 static void
unlock_notify_cb(void ** ap_arg,int n_arg)1108 unlock_notify_cb(void **ap_arg, int n_arg)
1109 {
1110 int i;
1111
1112 for (i = 0; i < n_arg; i++) {
1113 UnlockNotification *p = (UnlockNotification *)ap_arg[i];
1114 pthread_mutex_lock (&p->mutex);
1115 p->fired = 1;
1116 pthread_cond_signal (&p->cond);
1117 pthread_mutex_unlock (&p->mutex);
1118 }
1119 }
1120
1121 static int
wait_for_unlock_notify(sqlite3 * db)1122 wait_for_unlock_notify(sqlite3 *db)
1123 {
1124 UnlockNotification un;
1125 un.fired = 0;
1126 pthread_mutex_init (&un.mutex, NULL);
1127 pthread_cond_init (&un.cond, NULL);
1128
1129 int rc = sqlite3_unlock_notify(db, unlock_notify_cb, (void *)&un);
1130
1131 if (rc == SQLITE_OK) {
1132 pthread_mutex_lock(&un.mutex);
1133 if (!un.fired)
1134 pthread_cond_wait (&un.cond, &un.mutex);
1135 pthread_mutex_unlock(&un.mutex);
1136 }
1137
1138 pthread_cond_destroy (&un.cond);
1139 pthread_mutex_destroy (&un.mutex);
1140
1141 return rc;
1142 }
1143
1144 static int
sqlite3_blocking_step(sqlite3_stmt * stmt)1145 sqlite3_blocking_step(sqlite3_stmt *stmt)
1146 {
1147 int rc;
1148 while (SQLITE_LOCKED == (rc = sqlite3_step(stmt))) {
1149 rc = wait_for_unlock_notify(sqlite3_db_handle(stmt));
1150 if (rc != SQLITE_OK)
1151 break;
1152 sqlite3_reset(stmt);
1153 }
1154 return rc;
1155 }
1156
1157 static int
sqlite3_blocking_prepare_v2(sqlite3 * db,const char * sql,int sql_len,sqlite3_stmt ** pstmt,const char ** pz)1158 sqlite3_blocking_prepare_v2(sqlite3 *db, const char *sql, int sql_len, sqlite3_stmt **pstmt, const char **pz)
1159 {
1160 int rc;
1161 while (SQLITE_LOCKED == (rc = sqlite3_prepare_v2(db, sql, sql_len, pstmt, pz))) {
1162 rc = wait_for_unlock_notify(db);
1163 if (rc != SQLITE_OK)
1164 break;
1165 }
1166 return rc;
1167 }
1168
1169 static int
sqlite3_blocking_exec(sqlite3 * db,const char * sql,int (* callback)(void *,int,char **,char **),void * arg,char ** errmsg)1170 sqlite3_blocking_exec(sqlite3 *db, const char *sql, int (*callback)(void *, int, char **, char **), void *arg, char **errmsg)
1171 {
1172 int rc;
1173 while (SQLITE_LOCKED == (rc = sqlite3_exec(db, sql, callback, arg, errmsg))) {
1174 rc = wait_for_unlock_notify(db);
1175 if (rc != SQLITE_OK)
1176 break;
1177 }
1178 return rc;
1179 }
1180
1181 typedef struct SQLiteDB {
1182 SeafDB parent;
1183 char *db_path;
1184 } SQLiteDB;
1185
1186 typedef struct SQLiteDBConnection {
1187 DBConnection parent;
1188 sqlite3 *db_conn;
1189 } SQLiteDBConnection;
1190
1191 static SeafDB *
sqlite_db_new(const char * db_path)1192 sqlite_db_new (const char *db_path)
1193 {
1194 SQLiteDB *db = g_new0 (SQLiteDB, 1);
1195 db->db_path = g_strdup(db_path);
1196
1197 return (SeafDB *)db;
1198 }
1199
1200 static DBConnection *
sqlite_db_get_connection(SeafDB * vdb)1201 sqlite_db_get_connection (SeafDB *vdb)
1202 {
1203 SQLiteDB *db = (SQLiteDB *)vdb;
1204 sqlite3 *db_conn;
1205 int result;
1206 const char *errmsg;
1207 SQLiteDBConnection *conn;
1208
1209 result = sqlite3_open_v2 (db->db_path, &db_conn, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_SHAREDCACHE, NULL);
1210 if (result != SQLITE_OK) {
1211 errmsg = sqlite3_errmsg(db_conn);
1212 seaf_warning ("Failed to open sqlite db: %s\n", errmsg ? errmsg : "no error given");
1213 return NULL;
1214 }
1215
1216 conn = g_new0 (SQLiteDBConnection, 1);
1217 conn->db_conn = db_conn;
1218
1219 return (DBConnection *)conn;
1220 }
1221
1222 static void
sqlite_db_release_connection(DBConnection * vconn,gboolean need_close)1223 sqlite_db_release_connection (DBConnection *vconn, gboolean need_close)
1224 {
1225 if (!vconn)
1226 return;
1227
1228 SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn;
1229
1230 sqlite3_close (conn->db_conn);
1231
1232 g_free (conn);
1233 }
1234
1235 static int
sqlite_db_execute_sql_no_stmt(DBConnection * vconn,const char * sql)1236 sqlite_db_execute_sql_no_stmt (DBConnection *vconn, const char *sql)
1237 {
1238 SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn;
1239 char *errmsg = NULL;
1240 int rc;
1241
1242 rc = sqlite3_blocking_exec (conn->db_conn, sql, NULL, NULL, &errmsg);
1243 if (rc != SQLITE_OK) {
1244 seaf_warning ("sqlite3_exec failed %s: %s", sql, errmsg ? errmsg : "no error given");
1245 if (errmsg)
1246 sqlite3_free (errmsg);
1247 return -1;
1248 }
1249
1250 return 0;
1251 }
1252
1253 static int
_bind_parameters_sqlite(sqlite3 * db,sqlite3_stmt * stmt,int n,va_list args)1254 _bind_parameters_sqlite (sqlite3 *db, sqlite3_stmt *stmt, int n, va_list args)
1255 {
1256 int i;
1257 const char *type;
1258
1259 for (i = 0; i < n; ++i) {
1260 type = va_arg (args, const char *);
1261 if (strcmp(type, "int") == 0) {
1262 int x = va_arg (args, int);
1263 if (sqlite3_bind_int (stmt, i+1, x) != SQLITE_OK) {
1264 seaf_warning ("sqlite3_bind_int failed: %s\n", sqlite3_errmsg(db));
1265 return -1;
1266 }
1267 } else if (strcmp (type, "int64") == 0) {
1268 gint64 x = va_arg (args, gint64);
1269 if (sqlite3_bind_int64 (stmt, i+1, x) != SQLITE_OK) {
1270 seaf_warning ("sqlite3_bind_int64 failed: %s\n", sqlite3_errmsg(db));
1271 return -1;
1272 }
1273 } else if (strcmp (type, "string") == 0) {
1274 const char *s = va_arg (args, const char *);
1275 if (sqlite3_bind_text (stmt, i+1, s, -1, SQLITE_TRANSIENT) != SQLITE_OK) {
1276 seaf_warning ("sqlite3_bind_text failed: %s\n", sqlite3_errmsg(db));
1277 return -1;
1278 }
1279 } else {
1280 seaf_warning ("BUG: invalid prep stmt parameter type %s.\n", type);
1281 g_return_val_if_reached (-1);
1282 }
1283 }
1284
1285 return 0;
1286 }
1287
1288 static int
sqlite_db_execute_sql(DBConnection * vconn,const char * sql,int n,va_list args)1289 sqlite_db_execute_sql (DBConnection *vconn, const char *sql, int n, va_list args)
1290 {
1291 SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn;
1292 sqlite3 *db = conn->db_conn;
1293 sqlite3_stmt *stmt;
1294 int rc;
1295 int ret = 0;
1296
1297 rc = sqlite3_blocking_prepare_v2 (db, sql, -1, &stmt, NULL);
1298 if (rc != SQLITE_OK) {
1299 seaf_warning ("sqlite3_prepare_v2 failed %s: %s", sql, sqlite3_errmsg(db));
1300 return -1;
1301 }
1302
1303 if (_bind_parameters_sqlite (db, stmt, n, args) < 0) {
1304 seaf_warning ("Failed to bind parameters for sql %s\n", sql);
1305 ret = -1;
1306 goto out;
1307 }
1308
1309 rc = sqlite3_blocking_step (stmt);
1310 if (rc != SQLITE_DONE) {
1311 seaf_warning ("sqlite3_step failed %s: %s", sql, sqlite3_errmsg(db));
1312 ret = -1;
1313 goto out;
1314 }
1315
1316 out:
1317 sqlite3_finalize (stmt);
1318 return ret;
1319 }
1320
1321 typedef struct SQLiteDBRow {
1322 SeafDBRow parent;
1323 int column_count;
1324 sqlite3 *db;
1325 sqlite3_stmt *stmt;
1326 } SQLiteDBRow;
1327
1328 static int
sqlite_db_query_foreach_row(DBConnection * vconn,const char * sql,SeafDBRowFunc callback,void * data,int n,va_list args)1329 sqlite_db_query_foreach_row (DBConnection *vconn, const char *sql,
1330 SeafDBRowFunc callback, void *data,
1331 int n, va_list args)
1332 {
1333 SQLiteDBConnection *conn = (SQLiteDBConnection *)vconn;
1334 sqlite3 *db = conn->db_conn;
1335 sqlite3_stmt *stmt;
1336 int rc;
1337 int nrows = 0;
1338
1339 rc = sqlite3_blocking_prepare_v2 (db, sql, -1, &stmt, NULL);
1340 if (rc != SQLITE_OK) {
1341 seaf_warning ("sqlite3_prepare_v2 failed %s: %s", sql, sqlite3_errmsg(db));
1342 return -1;
1343 }
1344
1345 if (_bind_parameters_sqlite (db, stmt, n, args) < 0) {
1346 seaf_warning ("Failed to bind parameters for sql %s\n", sql);
1347 nrows = -1;
1348 goto out;
1349 }
1350
1351 SQLiteDBRow row;
1352 memset (&row, 0, sizeof(row));
1353 row.db = db;
1354 row.stmt = stmt;
1355 row.column_count = sqlite3_column_count (stmt);
1356
1357 while (1) {
1358 rc = sqlite3_blocking_step (stmt);
1359 if (rc == SQLITE_ROW) {
1360 ++nrows;
1361 if (callback && !callback ((SeafDBRow *)&row, data))
1362 break;
1363 } else if (rc == SQLITE_DONE) {
1364 break;
1365 } else {
1366 seaf_warning ("sqlite3_step failed %s: %s\n", sql, sqlite3_errmsg(db));
1367 nrows = -1;
1368 goto out;
1369 }
1370 }
1371
1372 out:
1373 sqlite3_finalize (stmt);
1374 return nrows;
1375 }
1376
1377 static int
sqlite_db_row_get_column_count(SeafDBRow * vrow)1378 sqlite_db_row_get_column_count (SeafDBRow *vrow)
1379 {
1380 SQLiteDBRow *row = (SQLiteDBRow *)vrow;
1381
1382 return row->column_count;
1383 }
1384
1385 static const char *
sqlite_db_row_get_column_string(SeafDBRow * vrow,int idx)1386 sqlite_db_row_get_column_string (SeafDBRow *vrow, int idx)
1387 {
1388 SQLiteDBRow *row = (SQLiteDBRow *)vrow;
1389
1390 return (const char *)sqlite3_column_text (row->stmt, idx);
1391 }
1392
1393 static int
sqlite_db_row_get_column_int(SeafDBRow * vrow,int idx)1394 sqlite_db_row_get_column_int (SeafDBRow *vrow, int idx)
1395 {
1396 SQLiteDBRow *row = (SQLiteDBRow *)vrow;
1397
1398 return sqlite3_column_int (row->stmt, idx);
1399 }
1400
1401 static gint64
sqlite_db_row_get_column_int64(SeafDBRow * vrow,int idx)1402 sqlite_db_row_get_column_int64 (SeafDBRow *vrow, int idx)
1403 {
1404 SQLiteDBRow *row = (SQLiteDBRow *)vrow;
1405
1406 return sqlite3_column_int64 (row->stmt, idx);
1407 }
1408