1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Bacula Catalog Database routines specific to PostgreSQL
21 * These are PostgreSQL specific routines
22 *
23 * Dan Langille, December 2003
24 * based upon work done by Kern Sibbald, March 2000
25 *
26 * Note: at one point, this file was changed to class based by a certain
27 * programmer, and other than "wrapping" in a class, which is a trivial
28 * change for a C++ programmer, nothing substantial was done, yet all the
29 * code was recommitted under this programmer's name. Consequently, we
30 * undo those changes here. Unfortunately, it is too difficult to put
31 * back the original author's name (Dan Langille) on the parts he wrote.
32 */
33
34 #include "bacula.h"
35
36 #ifdef HAVE_POSTGRESQL
37
38 #include "cats.h"
39
40 /* Note in this file, we want these for Postgresql not Bacula */
41 #undef PACKAGE_BUGREPORT
42 #undef PACKAGE_NAME
43 #undef PACKAGE_STRING
44 #undef PACKAGE_TARNAME
45 #undef PACKAGE_VERSION
46
47 #include "libpq-fe.h"
48 #include "postgres_ext.h" /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
50 #include "pg_config.h" /* for PG_VERSION_NUM */
51 #define __BDB_POSTGRESQL_H_ 1
52 #include "bdb_postgresql.h"
53
/*
 * Debug levels used by this driver (SQL debug tag OR'ed with a level).
 * The bodies are parenthesized so that the macros expand as a single
 * value even when used next to operators that bind tighter than '|'.
 */
#define dbglvl_dbg   (DT_SQL|100)
#define dbglvl_info  (DT_SQL|50)
#define dbglvl_err   (DT_SQL|10)
57
58 /* -----------------------------------------------------------------------
59 *
60 * PostgreSQL dependent defines and subroutines
61 *
62 * -----------------------------------------------------------------------
63 */
64
65 /* List of open databases */
66 static dlist *db_list = NULL;
67
68 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
69
BDB_POSTGRESQL()70 BDB_POSTGRESQL::BDB_POSTGRESQL(): BDB()
71 {
72 BDB_POSTGRESQL *mdb = this;
73
74 if (db_list == NULL) {
75 db_list = New(dlist(mdb, &mdb->m_link));
76 }
77 mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
78 mdb->m_db_type = SQL_TYPE_POSTGRESQL;
79 mdb->m_db_driver = bstrdup("PostgreSQL");
80
81 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
82 mdb->errmsg[0] = 0;
83 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
84 mdb->cached_path = get_pool_memory(PM_FNAME);
85 mdb->cached_path_id = 0;
86 mdb->m_ref_count = 1;
87 mdb->fname = get_pool_memory(PM_FNAME);
88 mdb->path = get_pool_memory(PM_FNAME);
89 mdb->esc_name = get_pool_memory(PM_FNAME);
90 mdb->esc_path = get_pool_memory(PM_FNAME);
91 mdb->esc_obj = get_pool_memory(PM_FNAME);
92 mdb->m_use_fatal_jmsg = true;
93
94 /* Initialize the private members. */
95 mdb->m_db_handle = NULL;
96 mdb->m_result = NULL;
97 mdb->m_buf = get_pool_memory(PM_FNAME);
98
99 db_list->append(this);
100 }
101
~BDB_POSTGRESQL()102 BDB_POSTGRESQL::~BDB_POSTGRESQL()
103 {
104 }
105
106 /*
107 * Initialize database data structure. In principal this should
108 * never have errors, or it is really fatal.
109 */
db_init_database(JCR * jcr,const char * db_driver,const char * db_name,const char * db_user,const char * db_password,const char * db_address,int db_port,const char * db_socket,const char * db_ssl_mode,const char * db_ssl_key,const char * db_ssl_cert,const char * db_ssl_ca,const char * db_ssl_capath,const char * db_ssl_cipher,bool mult_db_connections,bool disable_batch_insert)110 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name,
111 const char *db_user, const char *db_password,
112 const char *db_address, int db_port,
113 const char *db_socket,
114 const char *db_ssl_mode, const char *db_ssl_key, const char *db_ssl_cert,
115 const char *db_ssl_ca, const char *db_ssl_capath, const char *db_ssl_cipher,
116 bool mult_db_connections, bool disable_batch_insert)
117 {
118 BDB_POSTGRESQL *mdb = NULL;
119
120 if (!db_user) {
121 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
122 return NULL;
123 }
124 P(mutex); /* lock DB queue */
125 if (db_list && !mult_db_connections) {
126 /*
127 * Look to see if DB already open
128 */
129 foreach_dlist(mdb, db_list) {
130 if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
131 Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
132 mdb->increment_refcount();
133 goto get_out;
134 }
135 }
136 }
137 Dmsg0(dbglvl_info, "db_init_database first time\n");
138 /* Create the global Bacula db context */
139 mdb = New(BDB_POSTGRESQL());
140 if (!mdb) goto get_out;
141
142 /*
143 * Initialize the parent class members.
144 */
145 mdb->m_db_name = bstrdup(db_name);
146 mdb->m_db_user = bstrdup(db_user);
147 if (db_password) {
148 mdb->m_db_password = bstrdup(db_password);
149 }
150 if (db_address) {
151 mdb->m_db_address = bstrdup(db_address);
152 }
153 if (db_socket) {
154 mdb->m_db_socket = bstrdup(db_socket);
155 }
156 if (db_ssl_mode) {
157 mdb->m_db_ssl_mode = bstrdup(db_ssl_mode);
158 } else {
159 mdb->m_db_ssl_mode = bstrdup("prefer");
160 }
161 if (db_ssl_key) {
162 mdb->m_db_ssl_key = bstrdup(db_ssl_key);
163 }
164 if (db_ssl_cert) {
165 mdb->m_db_ssl_cert = bstrdup(db_ssl_cert);
166 }
167 if (db_ssl_ca) {
168 mdb->m_db_ssl_ca = bstrdup(db_ssl_ca);
169 }
170 mdb->m_db_port = db_port;
171
172 if (disable_batch_insert) {
173 mdb->m_disabled_batch_insert = true;
174 mdb->m_have_batch_insert = false;
175 } else {
176 mdb->m_disabled_batch_insert = false;
177 #ifdef USE_BATCH_FILE_INSERT
178 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
179 #ifdef HAVE_PQISTHREADSAFE
180 mdb->m_have_batch_insert = PQisthreadsafe();
181 #else
182 mdb->m_have_batch_insert = true;
183 #endif /* HAVE_PQISTHREADSAFE */
184 #else
185 mdb->m_have_batch_insert = true;
186 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
187 #else
188 mdb->m_have_batch_insert = false;
189 #endif /* USE_BATCH_FILE_INSERT */
190 }
191 mdb->m_allow_transactions = mult_db_connections;
192
193 /* At this time, when mult_db_connections == true, this is for
194 * specific console command such as bvfs or batch mode, and we don't
195 * want to share a batch mode or bvfs. In the future, we can change
196 * the creation function to add this parameter.
197 */
198 mdb->m_dedicated = mult_db_connections;
199
200 get_out:
201 V(mutex);
202 return mdb;
203 }
204
205
206 /* Check that the database corresponds to the encoding we want */
pgsql_check_database_encoding(JCR * jcr,BDB_POSTGRESQL * mdb)207 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
208 {
209 SQL_ROW row;
210 int ret = false;
211
212 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
213 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
214 return false;
215 }
216
217 if ((row = mdb->sql_fetch_row()) == NULL) {
218 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
219 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
220 } else {
221 ret = bstrcmp(row[0], "SQL_ASCII");
222
223 if (ret) {
224 /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
225 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
226
227 } else {
228 /* Something is wrong with database encoding */
229 Mmsg(mdb->errmsg,
230 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
231 mdb->get_db_name(), row[0]);
232 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
233 Dmsg1(dbglvl_err, "%s", mdb->errmsg);
234 }
235 }
236 return ret;
237 }
238
239 /*
240 * Now actually open the database. This can generate errors,
241 * which are returned in the errmsg
242 *
243 * DO NOT close the database or delete mdb here !!!!
244 */
bdb_open_database(JCR * jcr)245 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
246 {
247 bool retval = false;
248 int errstat;
249 char buf[10], *port;
250 BDB_POSTGRESQL *mdb = this;
251
252 P(mutex);
253 if (mdb->m_connected) {
254 retval = true;
255 goto get_out;
256 }
257
258 if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
259 berrno be;
260 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
261 be.bstrerror(errstat));
262 goto get_out;
263 }
264
265 if (mdb->m_db_port) {
266 bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
267 port = buf;
268 } else {
269 port = NULL;
270 }
271
272 /* Tells libpq that the SSL library has already been initialized */
273 PQinitSSL(0);
274
275 /* If connection fails, try at 5 sec intervals for 30 seconds. */
276 for (int retry=0; retry < 6; retry++) {
277
278 #if PG_VERSION_NUM < 90000
279 /* connect to the database */
280 /* Old "depreciated" connection call */
281 mdb->m_db_handle = PQsetdbLogin(
282 mdb->m_db_address, /* default = localhost */
283 port, /* default port */
284 NULL, /* pg options */
285 NULL, /* tty, ignored */
286 mdb->m_db_name, /* database name */
287 mdb->m_db_user, /* login name */
288 mdb->m_db_password); /* password */
289 #else
290 /* Code for Postgresql 9.0 and greater */
291 const char *keywords[10] = {"host", "port",
292 "dbname", "user",
293 "password", "sslmode",
294 "sslkey", "sslcert",
295 "sslrootcert", NULL };
296 const char *values[10] = {mdb->m_db_address, /* default localhost */
297 port, /* default port */
298 mdb->m_db_name,
299 mdb->m_db_user,
300 mdb->m_db_password,
301 mdb->m_db_ssl_mode,
302 mdb->m_db_ssl_key,
303 mdb->m_db_ssl_cert,
304 mdb->m_db_ssl_ca,
305 NULL };
306 mdb->m_db_handle = PQconnectdbParams(keywords, values, 0);
307 #endif
308
309 /* If no connect, try once more in case it is a timing problem */
310 if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
311 break;
312 }
313 bmicrosleep(5, 0);
314 }
315
316 Dmsg0(dbglvl_info, "pg_real_connect done\n");
317 Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
318 mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
319
320 #ifdef HAVE_OPENSSL
321 #define USE_OPENSSL 1
322 SSL *ssl;
323 if (PQgetssl(mdb->m_db_handle) != NULL) {
324 Dmsg0(dbglvl_info, "SSL in use\n");
325 ssl = (SSL *)PQgetssl(mdb->m_db_handle);
326 Dmsg2(dbglvl_info, "Version:%s Cipher:%s\n", SSL_get_version(ssl), SSL_get_cipher(ssl));
327 } else {
328 Dmsg0(dbglvl_info, "SSL not in use\n");
329 }
330 #endif
331
332 if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
333 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
334 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
335 mdb->m_db_name, mdb->m_db_user);
336 goto get_out;
337 }
338
339 mdb->m_connected = true;
340 if (!bdb_check_version(jcr)) {
341 goto get_out;
342 }
343
344 sql_query("SET datestyle TO 'ISO, YMD'");
345 sql_query("SET cursor_tuple_fraction=1");
346 sql_query("SET client_min_messages TO WARNING");
347
348 /*
349 * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
350 * WARNING: nonstandard use of \\ in a string literal
351 */
352 sql_query("SET standard_conforming_strings=on");
353
354 /* Check that encoding is SQL_ASCII */
355 pgsql_check_database_encoding(jcr, mdb);
356
357 retval = true;
358
359 get_out:
360 V(mutex);
361 return retval;
362 }
363
bdb_close_database(JCR * jcr)364 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
365 {
366 BDB_POSTGRESQL *mdb = this;
367
368 if (mdb->m_connected) {
369 bdb_end_transaction(jcr);
370 }
371 P(mutex);
372 mdb->m_ref_count--;
373 if (mdb->m_ref_count == 0) {
374 if (mdb->m_connected) {
375 sql_free_result();
376 }
377 db_list->remove(mdb);
378 if (mdb->m_connected && mdb->m_db_handle) {
379 PQfinish(mdb->m_db_handle);
380 }
381 if (is_rwl_valid(&mdb->m_lock)) {
382 rwl_destroy(&mdb->m_lock);
383 }
384 free_pool_memory(mdb->errmsg);
385 free_pool_memory(mdb->cmd);
386 free_pool_memory(mdb->cached_path);
387 free_pool_memory(mdb->fname);
388 free_pool_memory(mdb->path);
389 free_pool_memory(mdb->esc_name);
390 free_pool_memory(mdb->esc_path);
391 free_pool_memory(mdb->esc_obj);
392 free_pool_memory(mdb->m_buf);
393 if (mdb->m_db_driver) {
394 free(mdb->m_db_driver);
395 }
396 if (mdb->m_db_name) {
397 free(mdb->m_db_name);
398 }
399 if (mdb->m_db_user) {
400 free(mdb->m_db_user);
401 }
402 if (mdb->m_db_password) {
403 free(mdb->m_db_password);
404 }
405 if (mdb->m_db_address) {
406 free(mdb->m_db_address);
407 }
408 if (mdb->m_db_socket) {
409 free(mdb->m_db_socket);
410 }
411 if (mdb->m_db_ssl_mode) {
412 free(mdb->m_db_ssl_mode);
413 }
414 if (mdb->m_db_ssl_key) {
415 free(mdb->m_db_ssl_key);
416 }
417 if (mdb->m_db_ssl_cert) {
418 free(mdb->m_db_ssl_cert);
419 }
420 if (mdb->m_db_ssl_ca) {
421 free(mdb->m_db_ssl_ca);
422 }
423 delete mdb;
424 if (db_list->size() == 0) {
425 delete db_list;
426 db_list = NULL;
427 }
428 }
429 V(mutex);
430 }
431
bdb_thread_cleanup(void)432 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
433 {
434 }
435
436 /*
437 * Escape strings so PostgreSQL is happy
438 *
439 * len is the length of the old string. Your new
440 * string must be long enough (max 2*old+1) to hold
441 * the escaped output.
442 */
bdb_escape_string(JCR * jcr,char * snew,char * old,int len)443 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
444 {
445 BDB_POSTGRESQL *mdb = this;
446 int failed;
447
448 PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
449 if (failed) {
450 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
451 /* failed on encoding, probably invalid multibyte encoding in the source string
452 see PQescapeStringConn documentation for details. */
453 Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
454 }
455 }
456
457 /*
458 * Escape binary so that PostgreSQL is happy
459 *
460 */
bdb_escape_object(JCR * jcr,char * old,int len)461 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
462 {
463 size_t new_len;
464 unsigned char *obj;
465 BDB_POSTGRESQL *mdb = this;
466
467 mdb->esc_obj[0] = 0;
468 obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
469 if (!obj) {
470 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
471 } else {
472 mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
473 memcpy(mdb->esc_obj, obj, new_len);
474 mdb->esc_obj[new_len] = 0;
475 PQfreemem(obj);
476 }
477 return (char *)mdb->esc_obj;
478 }
479
480 /*
481 * Unescape binary object so that PostgreSQL is happy
482 *
483 */
bdb_unescape_object(JCR * jcr,char * from,int32_t expected_len,POOLMEM ** dest,int32_t * dest_len)484 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
485 POOLMEM **dest, int32_t *dest_len)
486 {
487 size_t new_len;
488 unsigned char *obj;
489
490 if (!from) {
491 *dest[0] = 0;
492 *dest_len = 0;
493 return;
494 }
495
496 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
497
498 if (!obj) {
499 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
500 }
501
502 *dest_len = new_len;
503 *dest = check_pool_memory_size(*dest, new_len+1);
504 memcpy(*dest, obj, new_len);
505 (*dest)[new_len]=0;
506
507 PQfreemem(obj);
508
509 Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
510 }
511
512 /*
513 * Start a transaction. This groups inserts and makes things more efficient.
514 * Usually started when inserting file attributes.
515 */
bdb_start_transaction(JCR * jcr)516 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
517 {
518 BDB_POSTGRESQL *mdb = this;
519
520 if (jcr) {
521 if (!jcr->attr) {
522 jcr->attr = get_pool_memory(PM_FNAME);
523 }
524 if (!jcr->ar) {
525 jcr->ar = (ATTR_DBR *)bmalloc(sizeof(ATTR_DBR));
526 }
527 }
528
529 /*
530 * This is turned off because transactions break if
531 * multiple simultaneous jobs are run.
532 */
533 if (!mdb->m_allow_transactions) {
534 return;
535 }
536
537 bdb_lock();
538 /* Allow only 25,000 changes per transaction */
539 if (mdb->m_transaction && changes > 25000) {
540 bdb_end_transaction(jcr);
541 }
542 if (!mdb->m_transaction) {
543 sql_query("BEGIN"); /* begin transaction */
544 Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
545 mdb->m_transaction = true;
546 }
547 bdb_unlock();
548 }
549
bdb_end_transaction(JCR * jcr)550 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
551 {
552 BDB_POSTGRESQL *mdb = this;
553
554 if (!mdb->m_allow_transactions) {
555 return;
556 }
557
558 bdb_lock();
559 if (mdb->m_transaction) {
560 sql_query("COMMIT"); /* end transaction */
561 mdb->m_transaction = false;
562 Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
563 }
564 changes = 0;
565 bdb_unlock();
566 }
567
568
569 /*
570 * Submit a general SQL command, and for each row returned,
571 * the result_handler is called with the ctx.
572 */
bdb_big_sql_query(const char * query,DB_RESULT_HANDLER * result_handler,void * ctx)573 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
574 DB_RESULT_HANDLER *result_handler,
575 void *ctx)
576 {
577 BDB_POSTGRESQL *mdb = this;
578 SQL_ROW row;
579 bool retval = false;
580 bool in_transaction = mdb->m_transaction;
581
582 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
583
584 mdb->errmsg[0] = 0;
585 /* This code handles only SELECT queries */
586 if (strncasecmp(query, "SELECT", 6) != 0) {
587 return bdb_sql_query(query, result_handler, ctx);
588 }
589
590 if (!result_handler) { /* no need of big_query without handler */
591 return false;
592 }
593
594 bdb_lock();
595
596 if (!in_transaction) { /* CURSOR needs transaction */
597 sql_query("BEGIN");
598 }
599
600 Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
601
602 if (!sql_query(mdb->m_buf)) {
603 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
604 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
605 goto get_out;
606 }
607
608 do {
609 if (!sql_query("FETCH 100 FROM _bac_cursor")) {
610 Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
611 Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
612 goto get_out;
613 }
614 while ((row = sql_fetch_row()) != NULL) {
615 Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
616 if (result_handler(ctx, mdb->m_num_fields, row))
617 break;
618 }
619 PQclear(mdb->m_result);
620 m_result = NULL;
621
622 } while (m_num_rows > 0); /* TODO: Can probably test against 100 */
623
624 sql_query("CLOSE _bac_cursor");
625
626 Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
627 sql_free_result();
628 retval = true;
629
630 get_out:
631 if (!in_transaction) {
632 sql_query("COMMIT"); /* end transaction */
633 }
634
635 bdb_unlock();
636 return retval;
637 }
638
639 /*
640 * Submit a general SQL command, and for each row returned,
641 * the result_handler is called with the ctx.
642 */
bdb_sql_query(const char * query,DB_RESULT_HANDLER * result_handler,void * ctx)643 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
644 {
645 SQL_ROW row;
646 bool retval = true;
647 BDB_POSTGRESQL *mdb = this;
648
649 Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
650
651 bdb_lock();
652 mdb->errmsg[0] = 0;
653 if (!sql_query(query, QF_STORE_RESULT)) {
654 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
655 Dmsg0(dbglvl_err, "db_sql_query failed\n");
656 retval = false;
657 goto get_out;
658 }
659
660 Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
661
662 if (result_handler) {
663 Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
664 while ((row = sql_fetch_row())) {
665 Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
666 if (result_handler(ctx, mdb->m_num_fields, row))
667 break;
668 }
669 sql_free_result();
670 }
671
672 Dmsg0(dbglvl_info, "db_sql_query finished\n");
673
674 get_out:
675 bdb_unlock();
676 return retval;
677 }
678
679 /*
680 * If this routine returns false (failure), Bacula expects
681 * that no result has been stored.
682 * This is where QueryDB calls to with Postgresql.
683 *
684 * Returns: true on success
685 * false on failure
686 *
687 */
sql_query(const char * query,int flags)688 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
689 {
690 int i;
691 bool retval = false;
692 BDB_POSTGRESQL *mdb = this;
693
694 Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
695
696 /* We are starting a new query. reset everything. */
697 mdb->m_num_rows = -1;
698 mdb->m_row_number = -1;
699 mdb->m_field_number = -1;
700
701 if (mdb->m_result) {
702 PQclear(mdb->m_result); /* hmm, someone forgot to free?? */
703 mdb->m_result = NULL;
704 }
705
706 for (i = 0; i < 10; i++) {
707 mdb->m_result = PQexec(mdb->m_db_handle, query);
708 if (mdb->m_result) {
709 break;
710 }
711 bmicrosleep(5, 0);
712 }
713 if (!mdb->m_result) {
714 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
715 goto get_out;
716 }
717
718 mdb->m_status = PQresultStatus(mdb->m_result);
719 if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
720 Dmsg0(dbglvl_dbg, "we have a result\n");
721
722 /* How many fields in the set? */
723 mdb->m_num_fields = (int)PQnfields(mdb->m_result);
724 Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
725
726 mdb->m_num_rows = PQntuples(mdb->m_result);
727 Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
728
729 mdb->m_row_number = 0; /* we can start to fetch something */
730 mdb->m_status = 0; /* succeed */
731 retval = true;
732 } else {
733 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
734 goto get_out;
735 }
736
737 Dmsg0(dbglvl_info, "sql_query finishing\n");
738 goto ok_out;
739
740 get_out:
741 Dmsg0(dbglvl_err, "we failed\n");
742 PQclear(mdb->m_result);
743 mdb->m_result = NULL;
744 mdb->m_status = 1; /* failed */
745
746 ok_out:
747 return retval;
748 }
749
sql_free_result(void)750 void BDB_POSTGRESQL::sql_free_result(void)
751 {
752 BDB_POSTGRESQL *mdb = this;
753
754 bdb_lock();
755 if (mdb->m_result) {
756 PQclear(mdb->m_result);
757 mdb->m_result = NULL;
758 }
759 if (mdb->m_rows) {
760 free(mdb->m_rows);
761 mdb->m_rows = NULL;
762 }
763 if (mdb->m_fields) {
764 free(mdb->m_fields);
765 mdb->m_fields = NULL;
766 }
767 mdb->m_num_rows = mdb->m_num_fields = 0;
768 bdb_unlock();
769 }
770
sql_fetch_row(void)771 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
772 {
773 SQL_ROW row = NULL; /* by default, return NULL */
774 BDB_POSTGRESQL *mdb = this;
775
776 Dmsg0(dbglvl_info, "sql_fetch_row start\n");
777
778 if (mdb->m_num_fields == 0) { /* No field, no row */
779 Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
780 return NULL;
781 }
782
783 if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
784 if (mdb->m_rows) {
785 Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
786 free(mdb->m_rows);
787 }
788 Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
789 mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
790 mdb->m_rows_size = mdb->m_num_fields;
791
792 /* Now reset the row_number now that we have the space allocated */
793 mdb->m_row_number = 0;
794 }
795
796 /* If still within the result set */
797 if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
798 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
799
800 /* Get each value from this row */
801 for (int j = 0; j < mdb->m_num_fields; j++) {
802 mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
803 Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
804 }
805 mdb->m_row_number++; /* Increment the row number for the next call */
806 row = mdb->m_rows;
807 } else {
808 Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
809 }
810
811 Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
812
813 return row;
814 }
815
sql_strerror(void)816 const char *BDB_POSTGRESQL::sql_strerror(void)
817 {
818 BDB_POSTGRESQL *mdb = this;
819 return PQerrorMessage(mdb->m_db_handle);
820 }
821
sql_data_seek(int row)822 void BDB_POSTGRESQL::sql_data_seek(int row)
823 {
824 BDB_POSTGRESQL *mdb = this;
825 /* Set the row number to be returned on the next call to sql_fetch_row */
826 mdb->m_row_number = row;
827 }
828
sql_affected_rows(void)829 int BDB_POSTGRESQL::sql_affected_rows(void)
830 {
831 BDB_POSTGRESQL *mdb = this;
832 return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
833 }
834
sql_insert_autokey_record(const char * query,const char * table_name)835 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
836 {
837 uint64_t id = 0;
838 char sequence[NAMEDATALEN-1];
839 char getkeyval_query[NAMEDATALEN+50];
840 PGresult *p_result;
841 BDB_POSTGRESQL *mdb = this;
842
843 /* First execute the insert query and then retrieve the currval. */
844 if (!sql_query(query)) {
845 return 0;
846 }
847
848 mdb->m_num_rows = sql_affected_rows();
849 if (mdb->m_num_rows != 1) {
850 return 0;
851 }
852 mdb->changes++;
853 /*
854 * Obtain the current value of the sequence that
855 * provides the serial value for primary key of the table.
856 *
857 * currval is local to our session. It is not affected by
858 * other transactions.
859 *
860 * Determine the name of the sequence.
861 * PostgreSQL automatically creates a sequence using
862 * <table>_<column>_seq.
863 * At the time of writing, all tables used this format for
864 * for their primary key: <table>id
865 * Except for basefiles which has a primary key on baseid.
866 * Therefore, we need to special case that one table.
867 *
868 * everything else can use the PostgreSQL formula.
869 */
870 if (strcasecmp(table_name, "basefiles") == 0) {
871 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
872 } else {
873 bstrncpy(sequence, table_name, sizeof(sequence));
874 bstrncat(sequence, "_", sizeof(sequence));
875 bstrncat(sequence, table_name, sizeof(sequence));
876 bstrncat(sequence, "id", sizeof(sequence));
877 }
878
879 bstrncat(sequence, "_seq", sizeof(sequence));
880 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
881
882 Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
883 for (int i = 0; i < 10; i++) {
884 p_result = PQexec(mdb->m_db_handle, getkeyval_query);
885 if (p_result) {
886 break;
887 }
888 bmicrosleep(5, 0);
889 }
890 if (!p_result) {
891 Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
892 goto get_out;
893 }
894
895 Dmsg0(dbglvl_dbg, "exec done");
896
897 if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
898 Dmsg0(dbglvl_dbg, "getting value");
899 id = str_to_uint64(PQgetvalue(p_result, 0, 0));
900 Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
901 } else {
902 Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
903 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
904 }
905
906 get_out:
907 PQclear(p_result);
908 return id;
909 }
910
sql_fetch_field(void)911 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
912 {
913 int max_len;
914 int this_len;
915 BDB_POSTGRESQL *mdb = this;
916
917 Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
918
919 if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
920 if (mdb->m_fields) {
921 free(mdb->m_fields);
922 mdb->m_fields = NULL;
923 }
924 Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
925 mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
926 mdb->m_fields_size = mdb->m_num_fields;
927
928 for (int i = 0; i < mdb->m_num_fields; i++) {
929 Dmsg1(dbglvl_dbg, "filling field %d\n", i);
930 mdb->m_fields[i].name = PQfname(mdb->m_result, i);
931 mdb->m_fields[i].type = PQftype(mdb->m_result, i);
932 mdb->m_fields[i].flags = 0;
933
934 /* For a given column, find the max length. */
935 max_len = 0;
936 for (int j = 0; j < mdb->m_num_rows; j++) {
937 if (PQgetisnull(mdb->m_result, j, i)) {
938 this_len = 4; /* "NULL" */
939 } else {
940 this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
941 }
942
943 if (max_len < this_len) {
944 max_len = this_len;
945 }
946 }
947 mdb->m_fields[i].max_length = max_len;
948
949 Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
950 mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
951 }
952 }
953
954 /* Increment field number for the next time around */
955 return &mdb->m_fields[mdb->m_field_number++];
956 }
957
sql_field_is_not_null(int field_type)958 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
959 {
960 if (field_type == 1) {
961 return true;
962 }
963 return false;
964 }
965
sql_field_is_numeric(int field_type)966 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
967 {
968 /*
969 * TEMP: the following is taken from select OID, typname from pg_type;
970 */
971 switch (field_type) {
972 case 20:
973 case 21:
974 case 23:
975 case 700:
976 case 701:
977 return true;
978 default:
979 return false;
980 }
981 }
982
/*
 * Escape strings so PostgreSQL is happy on COPY
 *
 *  len is the length of the old string. Your new
 *    string must be long enough (max 2*old+1) to hold
 *    the escaped output.
 *
 * Newline, tab, carriage return and backslash are written as a
 *  backslash followed by n, t, r or a second backslash; every other
 *  character is copied as is.  Copying stops after len characters or
 *  at the first NUL in src.
 *
 * Returns: pointer to the terminating NUL written into dest
 */
static char *pgsql_copy_escape(char *dest, char *src, size_t len)
{
   char *out = dest;

   for (size_t i = 0; i < len && src[i] != '\0'; i++) {
      const char ch = src[i];
      char esc = '\0';               /* letter following the backslash */

      if (ch == '\n') {
         esc = 'n';
      } else if (ch == '\\') {
         esc = '\\';
      } else if (ch == '\t') {
         esc = 't';
      } else if (ch == '\r') {
         esc = 'r';
      }

      if (esc != '\0') {
         *out++ = '\\';
         *out++ = esc;
      } else {
         *out++ = ch;
      }
   }

   *out = '\0';
   return out;
}
1029
sql_batch_start(JCR * jcr)1030 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
1031 {
1032 BDB_POSTGRESQL *mdb = this;
1033 const char *query = "COPY batch FROM STDIN";
1034
1035 Dmsg0(dbglvl_info, "sql_batch_start started\n");
1036
1037 if (!sql_query("CREATE TEMPORARY TABLE batch ("
1038 "FileIndex int,"
1039 "JobId int,"
1040 "Path varchar,"
1041 "Name varchar,"
1042 "LStat varchar,"
1043 "Md5 varchar,"
1044 "DeltaSeq smallint)")) {
1045 Dmsg0(dbglvl_err, "sql_batch_start failed\n");
1046 return false;
1047 }
1048
1049 /* We are starting a new query. reset everything. */
1050 mdb->m_num_rows = -1;
1051 mdb->m_row_number = -1;
1052 mdb->m_field_number = -1;
1053
1054 sql_free_result();
1055
1056 for (int i=0; i < 10; i++) {
1057 mdb->m_result = PQexec(mdb->m_db_handle, query);
1058 if (mdb->m_result) {
1059 break;
1060 }
1061 bmicrosleep(5, 0);
1062 }
1063 if (!mdb->m_result) {
1064 Dmsg1(dbglvl_err, "Query failed: %s\n", query);
1065 goto get_out;
1066 }
1067
1068 mdb->m_status = PQresultStatus(mdb->m_result);
1069 if (mdb->m_status == PGRES_COPY_IN) {
1070 /* How many fields in the set? */
1071 mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1072 mdb->m_num_rows = 0;
1073 mdb->m_status = 1;
1074 } else {
1075 Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1076 goto get_out;
1077 }
1078
1079 Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1080
1081 return true;
1082
1083 get_out:
1084 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1085 mdb->m_status = 0;
1086 PQclear(mdb->m_result);
1087 mdb->m_result = NULL;
1088 return false;
1089 }
1090
1091 /*
1092 * Set error to something to abort the operation
1093 */
sql_batch_end(JCR * jcr,const char * error)1094 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1095 {
1096 int res;
1097 int count=30;
1098 PGresult *p_result;
1099 BDB_POSTGRESQL *mdb = this;
1100
1101 Dmsg0(dbglvl_info, "sql_batch_end started\n");
1102
1103 do {
1104 res = PQputCopyEnd(mdb->m_db_handle, error);
1105 } while (res == 0 && --count > 0);
1106
1107 if (res == 1) {
1108 Dmsg0(dbglvl_dbg, "ok\n");
1109 mdb->m_status = 0;
1110 }
1111
1112 if (res <= 0) {
1113 mdb->m_status = 1;
1114 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1115 Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1116 }
1117
1118 /* Check command status and return to normal libpq state */
1119 p_result = PQgetResult(mdb->m_db_handle);
1120 if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1121 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1122 mdb->m_status = 1;
1123 }
1124
1125 /* Get some statistics to compute the best plan */
1126 sql_query("ANALYZE batch");
1127
1128 PQclear(p_result);
1129
1130 Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1131 return true;
1132 }
1133
sql_batch_insert(JCR * jcr,ATTR_DBR * ar)1134 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1135 {
1136 int res;
1137 int count=30;
1138 size_t len;
1139 const char *digest;
1140 char ed1[50];
1141 BDB_POSTGRESQL *mdb = this;
1142
1143 mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1144 pgsql_copy_escape(mdb->esc_name, fname, fnl);
1145
1146 mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1147 pgsql_copy_escape(mdb->esc_path, path, pnl);
1148
1149 if (ar->Digest == NULL || ar->Digest[0] == 0) {
1150 digest = "0";
1151 } else {
1152 digest = ar->Digest;
1153 }
1154
1155 len = Mmsg(mdb->cmd, "%d\t%s\t%s\t%s\t%s\t%s\t%u\n",
1156 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1157 mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1158
1159 do {
1160 res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1161 } while (res == 0 && --count > 0);
1162
1163 if (res == 1) {
1164 Dmsg0(dbglvl_dbg, "ok\n");
1165 mdb->changes++;
1166 mdb->m_status = 1;
1167 }
1168
1169 if (res <= 0) {
1170 mdb->m_status = 0;
1171 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1172 Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1173 }
1174
1175 Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1176
1177 return true;
1178 }
1179
1180
1181 #endif /* HAVE_POSTGRESQL */
1182