1 /*
2    Bacula(R) - The Network Backup Solution
3 
4    Copyright (C) 2000-2020 Kern Sibbald
5 
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8 
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13 
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16 
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  * Bacula Catalog Database routines specific to PostgreSQL
21  *   These are PostgreSQL specific routines
22  *
23  *    Dan Langille, December 2003
24  *    based upon work done by Kern Sibbald, March 2000
25  *
26  * Note: at one point, this file was changed to class based by a certain
27  *  programmer, and other than "wrapping" in a class, which is a trivial
28  *  change for a C++ programmer, nothing substantial was done, yet all the
29  *  code was recommitted under this programmer's name.  Consequently, we
30  *  undo those changes here.  Unfortunately, it is too difficult to put
31  *  back the original author's name (Dan Langille) on the parts he wrote.
32  */
33 
34 #include "bacula.h"
35 
36 #ifdef HAVE_POSTGRESQL
37 
38 #include  "cats.h"
39 
40 /* Note in this file, we want these for Postgresql not Bacula */
41 #undef PACKAGE_BUGREPORT
42 #undef PACKAGE_NAME
43 #undef PACKAGE_STRING
44 #undef PACKAGE_TARNAME
45 #undef PACKAGE_VERSION
46 
47 #include  "libpq-fe.h"
48 #include  "postgres_ext.h"       /* needed for NAMEDATALEN */
49 #include  "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
50 #include  "pg_config.h"          /* for PG_VERSION_NUM */
51 #define __BDB_POSTGRESQL_H_ 1
52 #include "bdb_postgresql.h"
53 
/*
 * Debug levels used by this file: the DT_SQL tag combined with a numeric
 * verbosity.  The expansions are parenthesized so they stay a single
 * operand wherever they are used.
 */
#define dbglvl_dbg   (DT_SQL|100)
#define dbglvl_info  (DT_SQL|50)
#define dbglvl_err   (DT_SQL|10)
57 
58 /* -----------------------------------------------------------------------
59  *
60  *   PostgreSQL dependent defines and subroutines
61  *
62  * -----------------------------------------------------------------------
63  */
64 
65 /* List of open databases */
66 static dlist *db_list = NULL;
67 
68 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
69 
BDB_POSTGRESQL()70 BDB_POSTGRESQL::BDB_POSTGRESQL(): BDB()
71 {
72    BDB_POSTGRESQL *mdb = this;
73 
74    if (db_list == NULL) {
75       db_list = New(dlist(mdb, &mdb->m_link));
76    }
77    mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
78    mdb->m_db_type = SQL_TYPE_POSTGRESQL;
79    mdb->m_db_driver = bstrdup("PostgreSQL");
80 
81    mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
82    mdb->errmsg[0] = 0;
83    mdb->cmd = get_pool_memory(PM_EMSG);    /* get command buffer */
84    mdb->cached_path = get_pool_memory(PM_FNAME);
85    mdb->cached_path_id = 0;
86    mdb->m_ref_count = 1;
87    mdb->fname = get_pool_memory(PM_FNAME);
88    mdb->path = get_pool_memory(PM_FNAME);
89    mdb->esc_name = get_pool_memory(PM_FNAME);
90    mdb->esc_path = get_pool_memory(PM_FNAME);
91    mdb->esc_obj = get_pool_memory(PM_FNAME);
92    mdb->m_use_fatal_jmsg = true;
93 
94    /* Initialize the private members. */
95    mdb->m_db_handle = NULL;
96    mdb->m_result = NULL;
97    mdb->m_buf =  get_pool_memory(PM_FNAME);
98 
99    db_list->append(this);
100 }
101 
~BDB_POSTGRESQL()102 BDB_POSTGRESQL::~BDB_POSTGRESQL()
103 {
104 }
105 
106 /*
107  * Initialize database data structure. In principal this should
108  * never have errors, or it is really fatal.
109  */
db_init_database(JCR * jcr,const char * db_driver,const char * db_name,const char * db_user,const char * db_password,const char * db_address,int db_port,const char * db_socket,const char * db_ssl_mode,const char * db_ssl_key,const char * db_ssl_cert,const char * db_ssl_ca,const char * db_ssl_capath,const char * db_ssl_cipher,bool mult_db_connections,bool disable_batch_insert)110 BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name,
111                        const char *db_user, const char *db_password,
112                        const char *db_address, int db_port,
113                        const char *db_socket,
114                        const char *db_ssl_mode, const char *db_ssl_key, const char *db_ssl_cert,
115                        const char *db_ssl_ca, const char *db_ssl_capath, const char *db_ssl_cipher,
116                        bool mult_db_connections, bool disable_batch_insert)
117 {
118    BDB_POSTGRESQL *mdb = NULL;
119 
120    if (!db_user) {
121       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
122       return NULL;
123    }
124    P(mutex);                          /* lock DB queue */
125    if (db_list && !mult_db_connections) {
126       /*
127        * Look to see if DB already open
128        */
129       foreach_dlist(mdb, db_list) {
130          if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
131             Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
132             mdb->increment_refcount();
133             goto get_out;
134          }
135       }
136    }
137    Dmsg0(dbglvl_info, "db_init_database first time\n");
138    /* Create the global Bacula db context */
139    mdb = New(BDB_POSTGRESQL());
140    if (!mdb) goto get_out;
141 
142    /*
143     * Initialize the parent class members.
144     */
145    mdb->m_db_name = bstrdup(db_name);
146    mdb->m_db_user = bstrdup(db_user);
147    if (db_password) {
148       mdb->m_db_password = bstrdup(db_password);
149    }
150    if (db_address) {
151       mdb->m_db_address = bstrdup(db_address);
152    }
153    if (db_socket) {
154       mdb->m_db_socket = bstrdup(db_socket);
155    }
156    if (db_ssl_mode) {
157       mdb->m_db_ssl_mode = bstrdup(db_ssl_mode);
158    } else {
159       mdb->m_db_ssl_mode = bstrdup("prefer");
160    }
161    if (db_ssl_key) {
162       mdb->m_db_ssl_key = bstrdup(db_ssl_key);
163    }
164    if (db_ssl_cert) {
165       mdb->m_db_ssl_cert = bstrdup(db_ssl_cert);
166    }
167    if (db_ssl_ca) {
168       mdb->m_db_ssl_ca = bstrdup(db_ssl_ca);
169    }
170    mdb->m_db_port = db_port;
171 
172    if (disable_batch_insert) {
173       mdb->m_disabled_batch_insert = true;
174       mdb->m_have_batch_insert = false;
175    } else {
176       mdb->m_disabled_batch_insert = false;
177 #ifdef USE_BATCH_FILE_INSERT
178 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
179 #ifdef HAVE_PQISTHREADSAFE
180       mdb->m_have_batch_insert = PQisthreadsafe();
181 #else
182       mdb->m_have_batch_insert = true;
183 #endif /* HAVE_PQISTHREADSAFE */
184 #else
185       mdb->m_have_batch_insert = true;
186 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
187 #else
188       mdb->m_have_batch_insert = false;
189 #endif /* USE_BATCH_FILE_INSERT */
190    }
191    mdb->m_allow_transactions = mult_db_connections;
192 
193    /* At this time, when mult_db_connections == true, this is for
194     * specific console command such as bvfs or batch mode, and we don't
195     * want to share a batch mode or bvfs. In the future, we can change
196     * the creation function to add this parameter.
197     */
198    mdb->m_dedicated = mult_db_connections;
199 
200 get_out:
201    V(mutex);
202    return mdb;
203 }
204 
205 
206 /* Check that the database corresponds to the encoding we want  */
pgsql_check_database_encoding(JCR * jcr,BDB_POSTGRESQL * mdb)207 static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
208 {
209    SQL_ROW row;
210    int ret = false;
211 
212    if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
213       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
214       return false;
215    }
216 
217    if ((row = mdb->sql_fetch_row()) == NULL) {
218       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
219       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
220    } else {
221       ret = bstrcmp(row[0], "SQL_ASCII");
222 
223       if (ret) {
224          /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
225          mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
226 
227       } else {
228          /* Something is wrong with database encoding */
229          Mmsg(mdb->errmsg,
230               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
231               mdb->get_db_name(), row[0]);
232          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
233          Dmsg1(dbglvl_err, "%s", mdb->errmsg);
234       }
235    }
236    return ret;
237 }
238 
239 /*
240  * Now actually open the database.  This can generate errors,
241  *   which are returned in the errmsg
242  *
243  *  DO NOT close the database or delete mdb here !!!!
244  */
bdb_open_database(JCR * jcr)245 bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
246 {
247    bool retval = false;
248    int errstat;
249    char buf[10], *port;
250    BDB_POSTGRESQL *mdb = this;
251 
252    P(mutex);
253    if (mdb->m_connected) {
254       retval = true;
255       goto get_out;
256    }
257 
258    if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
259       berrno be;
260       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
261             be.bstrerror(errstat));
262       goto get_out;
263    }
264 
265    if (mdb->m_db_port) {
266       bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
267       port = buf;
268    } else {
269       port = NULL;
270    }
271 
272    /* Tells libpq that the SSL library has already been initialized */
273    PQinitSSL(0);
274 
275    /* If connection fails, try at 5 sec intervals for 30 seconds. */
276    for (int retry=0; retry < 6; retry++) {
277 
278 #if PG_VERSION_NUM < 90000
279       /* connect to the database */
280       /* Old "depreciated" connection call */
281       mdb->m_db_handle = PQsetdbLogin(
282            mdb->m_db_address,         /* default = localhost */
283            port,                      /* default port */
284            NULL,                      /* pg options */
285            NULL,                      /* tty, ignored */
286            mdb->m_db_name,            /* database name */
287            mdb->m_db_user,            /* login name */
288            mdb->m_db_password);       /* password */
289 #else
290       /* Code for Postgresql 9.0 and greater */
291       const char *keywords[10] = {"host", "port",
292                                   "dbname", "user",
293                                   "password", "sslmode",
294                                   "sslkey", "sslcert",
295                                   "sslrootcert", NULL };
296       const char *values[10] = {mdb->m_db_address, /* default localhost */
297                                 port, /* default port */
298                                 mdb->m_db_name,
299                                 mdb->m_db_user,
300                                 mdb->m_db_password,
301                                 mdb->m_db_ssl_mode,
302                                 mdb->m_db_ssl_key,
303                                 mdb->m_db_ssl_cert,
304                                 mdb->m_db_ssl_ca,
305                                 NULL };
306       mdb->m_db_handle = PQconnectdbParams(keywords, values, 0);
307 #endif
308 
309       /* If no connect, try once more in case it is a timing problem */
310       if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
311          break;
312       }
313       bmicrosleep(5, 0);
314    }
315 
316    Dmsg0(dbglvl_info, "pg_real_connect done\n");
317    Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
318         mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
319 
320 #ifdef HAVE_OPENSSL
321    #define USE_OPENSSL 1
322    SSL *ssl;
323    if (PQgetssl(mdb->m_db_handle) != NULL) {
324       Dmsg0(dbglvl_info, "SSL in use\n");
325       ssl = (SSL *)PQgetssl(mdb->m_db_handle);
326       Dmsg2(dbglvl_info, "Version:%s Cipher:%s\n", SSL_get_version(ssl), SSL_get_cipher(ssl));
327    } else {
328       Dmsg0(dbglvl_info, "SSL not in use\n");
329    }
330 #endif
331 
332    if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
333       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
334          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
335          mdb->m_db_name, mdb->m_db_user);
336       goto get_out;
337    }
338 
339    mdb->m_connected = true;
340    if (!bdb_check_version(jcr)) {
341       goto get_out;
342    }
343 
344    sql_query("SET datestyle TO 'ISO, YMD'");
345    sql_query("SET cursor_tuple_fraction=1");
346    sql_query("SET client_min_messages TO WARNING");
347 
348    /*
349     * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
350     *   WARNING:  nonstandard use of \\ in a string literal
351     */
352    sql_query("SET standard_conforming_strings=on");
353 
354    /* Check that encoding is SQL_ASCII */
355    pgsql_check_database_encoding(jcr, mdb);
356 
357    retval = true;
358 
359 get_out:
360    V(mutex);
361    return retval;
362 }
363 
bdb_close_database(JCR * jcr)364 void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
365 {
366    BDB_POSTGRESQL *mdb = this;
367 
368    if (mdb->m_connected) {
369       bdb_end_transaction(jcr);
370    }
371    P(mutex);
372    mdb->m_ref_count--;
373    if (mdb->m_ref_count == 0) {
374       if (mdb->m_connected) {
375          sql_free_result();
376       }
377       db_list->remove(mdb);
378       if (mdb->m_connected && mdb->m_db_handle) {
379          PQfinish(mdb->m_db_handle);
380       }
381       if (is_rwl_valid(&mdb->m_lock)) {
382          rwl_destroy(&mdb->m_lock);
383       }
384       free_pool_memory(mdb->errmsg);
385       free_pool_memory(mdb->cmd);
386       free_pool_memory(mdb->cached_path);
387       free_pool_memory(mdb->fname);
388       free_pool_memory(mdb->path);
389       free_pool_memory(mdb->esc_name);
390       free_pool_memory(mdb->esc_path);
391       free_pool_memory(mdb->esc_obj);
392       free_pool_memory(mdb->m_buf);
393       if (mdb->m_db_driver) {
394          free(mdb->m_db_driver);
395       }
396       if (mdb->m_db_name) {
397          free(mdb->m_db_name);
398       }
399       if (mdb->m_db_user) {
400          free(mdb->m_db_user);
401       }
402       if (mdb->m_db_password) {
403          free(mdb->m_db_password);
404       }
405       if (mdb->m_db_address) {
406          free(mdb->m_db_address);
407       }
408       if (mdb->m_db_socket) {
409          free(mdb->m_db_socket);
410       }
411       if (mdb->m_db_ssl_mode) {
412          free(mdb->m_db_ssl_mode);
413       }
414       if (mdb->m_db_ssl_key) {
415          free(mdb->m_db_ssl_key);
416       }
417       if (mdb->m_db_ssl_cert) {
418          free(mdb->m_db_ssl_cert);
419       }
420       if (mdb->m_db_ssl_ca) {
421          free(mdb->m_db_ssl_ca);
422       }
423       delete mdb;
424       if (db_list->size() == 0) {
425          delete db_list;
426          db_list = NULL;
427       }
428    }
429    V(mutex);
430 }
431 
bdb_thread_cleanup(void)432 void BDB_POSTGRESQL::bdb_thread_cleanup(void)
433 {
434 }
435 
436 /*
437  * Escape strings so PostgreSQL is happy
438  *
439  *  len is the length of the old string. Your new
440  *    string must be long enough (max 2*old+1) to hold
441  *    the escaped output.
442  */
bdb_escape_string(JCR * jcr,char * snew,char * old,int len)443 void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
444 {
445    BDB_POSTGRESQL *mdb = this;
446    int failed;
447 
448    PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
449    if (failed) {
450       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
451       /* failed on encoding, probably invalid multibyte encoding in the source string
452          see PQescapeStringConn documentation for details. */
453       Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
454    }
455 }
456 
457 /*
458  * Escape binary so that PostgreSQL is happy
459  *
460  */
bdb_escape_object(JCR * jcr,char * old,int len)461 char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
462 {
463    size_t new_len;
464    unsigned char *obj;
465    BDB_POSTGRESQL *mdb = this;
466 
467    mdb->esc_obj[0] = 0;
468    obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
469    if (!obj) {
470       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
471    } else {
472       mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
473       memcpy(mdb->esc_obj, obj, new_len);
474       mdb->esc_obj[new_len] = 0;
475       PQfreemem(obj);
476    }
477    return (char *)mdb->esc_obj;
478 }
479 
480 /*
481  * Unescape binary object so that PostgreSQL is happy
482  *
483  */
bdb_unescape_object(JCR * jcr,char * from,int32_t expected_len,POOLMEM ** dest,int32_t * dest_len)484 void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
485                        POOLMEM **dest, int32_t *dest_len)
486 {
487    size_t new_len;
488    unsigned char *obj;
489 
490    if (!from) {
491       *dest[0] = 0;
492       *dest_len = 0;
493       return;
494    }
495 
496    obj = PQunescapeBytea((unsigned const char *)from, &new_len);
497 
498    if (!obj) {
499       Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
500    }
501 
502    *dest_len = new_len;
503    *dest = check_pool_memory_size(*dest, new_len+1);
504    memcpy(*dest, obj, new_len);
505    (*dest)[new_len]=0;
506 
507    PQfreemem(obj);
508 
509    Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
510 }
511 
512 /*
513  * Start a transaction. This groups inserts and makes things more efficient.
514  *  Usually started when inserting file attributes.
515  */
bdb_start_transaction(JCR * jcr)516 void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
517 {
518    BDB_POSTGRESQL *mdb = this;
519 
520    if (jcr) {
521       if (!jcr->attr) {
522          jcr->attr = get_pool_memory(PM_FNAME);
523       }
524       if (!jcr->ar) {
525          jcr->ar = (ATTR_DBR *)bmalloc(sizeof(ATTR_DBR));
526       }
527    }
528 
529    /*
530     * This is turned off because transactions break if
531     *  multiple simultaneous jobs are run.
532     */
533    if (!mdb->m_allow_transactions) {
534       return;
535    }
536 
537    bdb_lock();
538    /* Allow only 25,000 changes per transaction */
539    if (mdb->m_transaction && changes > 25000) {
540       bdb_end_transaction(jcr);
541    }
542    if (!mdb->m_transaction) {
543       sql_query("BEGIN");             /* begin transaction */
544       Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
545       mdb->m_transaction = true;
546    }
547    bdb_unlock();
548 }
549 
bdb_end_transaction(JCR * jcr)550 void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
551 {
552    BDB_POSTGRESQL *mdb = this;
553 
554    if (!mdb->m_allow_transactions) {
555       return;
556    }
557 
558    bdb_lock();
559    if (mdb->m_transaction) {
560       sql_query("COMMIT");            /* end transaction */
561       mdb->m_transaction = false;
562       Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
563    }
564    changes = 0;
565    bdb_unlock();
566 }
567 
568 
569 /*
570  * Submit a general SQL command, and for each row returned,
571  *  the result_handler is called with the ctx.
572  */
bdb_big_sql_query(const char * query,DB_RESULT_HANDLER * result_handler,void * ctx)573 bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
574                                        DB_RESULT_HANDLER *result_handler,
575                                        void *ctx)
576 {
577    BDB_POSTGRESQL *mdb = this;
578    SQL_ROW row;
579    bool retval = false;
580    bool in_transaction = mdb->m_transaction;
581 
582    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
583 
584    mdb->errmsg[0] = 0;
585    /* This code handles only SELECT queries */
586    if (strncasecmp(query, "SELECT", 6) != 0) {
587       return bdb_sql_query(query, result_handler, ctx);
588    }
589 
590    if (!result_handler) {       /* no need of big_query without handler */
591       return false;
592    }
593 
594    bdb_lock();
595 
596    if (!in_transaction) {       /* CURSOR needs transaction */
597       sql_query("BEGIN");
598    }
599 
600    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
601 
602    if (!sql_query(mdb->m_buf)) {
603       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
604       Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
605       goto get_out;
606    }
607 
608    do {
609       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
610          Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
611          Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
612          goto get_out;
613       }
614       while ((row = sql_fetch_row()) != NULL) {
615          Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
616          if (result_handler(ctx, mdb->m_num_fields, row))
617             break;
618       }
619       PQclear(mdb->m_result);
620       m_result = NULL;
621 
622    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
623 
624    sql_query("CLOSE _bac_cursor");
625 
626    Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
627    sql_free_result();
628    retval = true;
629 
630 get_out:
631    if (!in_transaction) {
632       sql_query("COMMIT");  /* end transaction */
633    }
634 
635    bdb_unlock();
636    return retval;
637 }
638 
639 /*
640  * Submit a general SQL command, and for each row returned,
641  *  the result_handler is called with the ctx.
642  */
bdb_sql_query(const char * query,DB_RESULT_HANDLER * result_handler,void * ctx)643 bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
644 {
645    SQL_ROW row;
646    bool retval = true;
647    BDB_POSTGRESQL *mdb = this;
648 
649    Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
650 
651    bdb_lock();
652    mdb->errmsg[0] = 0;
653    if (!sql_query(query, QF_STORE_RESULT)) {
654       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
655       Dmsg0(dbglvl_err, "db_sql_query failed\n");
656       retval = false;
657       goto get_out;
658    }
659 
660    Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
661 
662    if (result_handler) {
663       Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
664       while ((row = sql_fetch_row())) {
665          Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
666          if (result_handler(ctx, mdb->m_num_fields, row))
667             break;
668       }
669       sql_free_result();
670    }
671 
672    Dmsg0(dbglvl_info, "db_sql_query finished\n");
673 
674 get_out:
675    bdb_unlock();
676    return retval;
677 }
678 
679 /*
680  * If this routine returns false (failure), Bacula expects
681  *   that no result has been stored.
682  * This is where QueryDB calls to with Postgresql.
683  *
684  *  Returns: true  on success
685  *           false on failure
686  *
687  */
sql_query(const char * query,int flags)688 bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
689 {
690    int i;
691    bool retval = false;
692    BDB_POSTGRESQL *mdb = this;
693 
694    Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
695 
696    /* We are starting a new query. reset everything. */
697    mdb->m_num_rows     = -1;
698    mdb->m_row_number   = -1;
699    mdb->m_field_number = -1;
700 
701    if (mdb->m_result) {
702       PQclear(mdb->m_result);  /* hmm, someone forgot to free?? */
703       mdb->m_result = NULL;
704    }
705 
706    for (i = 0; i < 10; i++) {
707       mdb->m_result = PQexec(mdb->m_db_handle, query);
708       if (mdb->m_result) {
709          break;
710       }
711       bmicrosleep(5, 0);
712    }
713    if (!mdb->m_result) {
714       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
715       goto get_out;
716    }
717 
718    mdb->m_status = PQresultStatus(mdb->m_result);
719    if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
720       Dmsg0(dbglvl_dbg, "we have a result\n");
721 
722       /* How many fields in the set? */
723       mdb->m_num_fields = (int)PQnfields(mdb->m_result);
724       Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
725 
726       mdb->m_num_rows = PQntuples(mdb->m_result);
727       Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
728 
729       mdb->m_row_number = 0;      /* we can start to fetch something */
730       mdb->m_status = 0;          /* succeed */
731       retval = true;
732    } else {
733       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
734       goto get_out;
735    }
736 
737    Dmsg0(dbglvl_info, "sql_query finishing\n");
738    goto ok_out;
739 
740 get_out:
741    Dmsg0(dbglvl_err, "we failed\n");
742    PQclear(mdb->m_result);
743    mdb->m_result = NULL;
744    mdb->m_status = 1;                   /* failed */
745 
746 ok_out:
747    return retval;
748 }
749 
sql_free_result(void)750 void BDB_POSTGRESQL::sql_free_result(void)
751 {
752    BDB_POSTGRESQL *mdb = this;
753 
754    bdb_lock();
755    if (mdb->m_result) {
756       PQclear(mdb->m_result);
757       mdb->m_result = NULL;
758    }
759    if (mdb->m_rows) {
760       free(mdb->m_rows);
761       mdb->m_rows = NULL;
762    }
763    if (mdb->m_fields) {
764       free(mdb->m_fields);
765       mdb->m_fields = NULL;
766    }
767    mdb->m_num_rows = mdb->m_num_fields = 0;
768    bdb_unlock();
769 }
770 
sql_fetch_row(void)771 SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
772 {
773    SQL_ROW row = NULL;            /* by default, return NULL */
774    BDB_POSTGRESQL *mdb = this;
775 
776    Dmsg0(dbglvl_info, "sql_fetch_row start\n");
777 
778    if (mdb->m_num_fields == 0) {     /* No field, no row */
779       Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
780       return NULL;
781    }
782 
783    if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
784       if (mdb->m_rows) {
785          Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
786          free(mdb->m_rows);
787       }
788       Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
789       mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
790       mdb->m_rows_size = mdb->m_num_fields;
791 
792       /* Now reset the row_number now that we have the space allocated */
793       mdb->m_row_number = 0;
794    }
795 
796    /* If still within the result set */
797    if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
798       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
799 
800       /* Get each value from this row */
801       for (int j = 0; j < mdb->m_num_fields; j++) {
802          mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
803          Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
804       }
805       mdb->m_row_number++;  /* Increment the row number for the next call */
806       row = mdb->m_rows;
807    } else {
808       Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
809    }
810 
811    Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
812 
813    return row;
814 }
815 
sql_strerror(void)816 const char *BDB_POSTGRESQL::sql_strerror(void)
817 {
818    BDB_POSTGRESQL *mdb = this;
819    return PQerrorMessage(mdb->m_db_handle);
820 }
821 
sql_data_seek(int row)822 void BDB_POSTGRESQL::sql_data_seek(int row)
823 {
824    BDB_POSTGRESQL *mdb = this;
825    /* Set the row number to be returned on the next call to sql_fetch_row */
826    mdb->m_row_number = row;
827 }
828 
sql_affected_rows(void)829 int BDB_POSTGRESQL::sql_affected_rows(void)
830 {
831    BDB_POSTGRESQL *mdb = this;
832    return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
833 }
834 
sql_insert_autokey_record(const char * query,const char * table_name)835 uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
836 {
837    uint64_t id = 0;
838    char sequence[NAMEDATALEN-1];
839    char getkeyval_query[NAMEDATALEN+50];
840    PGresult *p_result;
841    BDB_POSTGRESQL *mdb = this;
842 
843    /* First execute the insert query and then retrieve the currval. */
844    if (!sql_query(query)) {
845       return 0;
846    }
847 
848    mdb->m_num_rows = sql_affected_rows();
849    if (mdb->m_num_rows != 1) {
850       return 0;
851    }
852    mdb->changes++;
853    /*
854     *  Obtain the current value of the sequence that
855     *  provides the serial value for primary key of the table.
856     *
857     *  currval is local to our session.  It is not affected by
858     *  other transactions.
859     *
860     *  Determine the name of the sequence.
861     *  PostgreSQL automatically creates a sequence using
862     *   <table>_<column>_seq.
863     *  At the time of writing, all tables used this format for
864     *   for their primary key: <table>id
865     *  Except for basefiles which has a primary key on baseid.
866     *  Therefore, we need to special case that one table.
867     *
868     *  everything else can use the PostgreSQL formula.
869     */
870    if (strcasecmp(table_name, "basefiles") == 0) {
871       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
872    } else {
873       bstrncpy(sequence, table_name, sizeof(sequence));
874       bstrncat(sequence, "_",        sizeof(sequence));
875       bstrncat(sequence, table_name, sizeof(sequence));
876       bstrncat(sequence, "id",       sizeof(sequence));
877    }
878 
879    bstrncat(sequence, "_seq", sizeof(sequence));
880    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
881 
882    Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
883    for (int i = 0; i < 10; i++) {
884       p_result = PQexec(mdb->m_db_handle, getkeyval_query);
885       if (p_result) {
886          break;
887       }
888       bmicrosleep(5, 0);
889    }
890    if (!p_result) {
891       Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
892       goto get_out;
893    }
894 
895    Dmsg0(dbglvl_dbg, "exec done");
896 
897    if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
898       Dmsg0(dbglvl_dbg, "getting value");
899       id = str_to_uint64(PQgetvalue(p_result, 0, 0));
900       Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
901    } else {
902       Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
903       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
904    }
905 
906 get_out:
907    PQclear(p_result);
908    return id;
909 }
910 
sql_fetch_field(void)911 SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
912 {
913    int max_len;
914    int this_len;
915    BDB_POSTGRESQL *mdb = this;
916 
917    Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
918 
919    if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
920       if (mdb->m_fields) {
921          free(mdb->m_fields);
922          mdb->m_fields = NULL;
923       }
924       Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
925       mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
926       mdb->m_fields_size = mdb->m_num_fields;
927 
928       for (int i = 0; i < mdb->m_num_fields; i++) {
929          Dmsg1(dbglvl_dbg, "filling field %d\n", i);
930          mdb->m_fields[i].name = PQfname(mdb->m_result, i);
931          mdb->m_fields[i].type = PQftype(mdb->m_result, i);
932          mdb->m_fields[i].flags = 0;
933 
934          /* For a given column, find the max length. */
935          max_len = 0;
936          for (int j = 0; j < mdb->m_num_rows; j++) {
937             if (PQgetisnull(mdb->m_result, j, i)) {
938                this_len = 4;         /* "NULL" */
939             } else {
940                this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
941             }
942 
943             if (max_len < this_len) {
944                max_len = this_len;
945             }
946          }
947          mdb->m_fields[i].max_length = max_len;
948 
949          Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
950                mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
951       }
952    }
953 
954    /* Increment field number for the next time around */
955    return &mdb->m_fields[mdb->m_field_number++];
956 }
957 
sql_field_is_not_null(int field_type)958 bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
959 {
960    if (field_type == 1) {
961       return true;
962    }
963    return false;
964 }
965 
sql_field_is_numeric(int field_type)966 bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
967 {
968     /*
969      * TEMP: the following is taken from select OID, typname from pg_type;
970      */
971     switch (field_type) {
972     case 20:
973     case 21:
974     case 23:
975     case 700:
976     case 701:
977        return true;
978     default:
979        return false;
980     }
981 }
982 
/*
 * Escape a string so that PostgreSQL accepts it as COPY input.
 *
 * Newline, carriage return, tab and backslash are each written as a
 *   backslash followed by 'n', 'r', 't' or '\\' respectively; every
 *   other character is copied unchanged.
 *
 * At most len characters of src are processed, and processing stops
 *   early at a NUL in src.  dest must be able to hold the worst case
 *   of 2*len+1 bytes.  The output is always NUL terminated.
 *
 * Returns a pointer to the terminating NUL written into dest.
 */
static char *pgsql_copy_escape(char *dest, char *src, size_t len)
{
    char *out = dest;
    const char *in = src;

    for (size_t remaining = len; remaining > 0 && *in != '\0'; remaining--, in++) {
       char esc;

       /* Pick the letter that follows the backslash, or 0 for "copy as is" */
       switch (*in) {
       case '\\':
          esc = '\\';
          break;
       case '\t':
          esc = 't';
          break;
       case '\n':
          esc = 'n';
          break;
       case '\r':
          esc = 'r';
          break;
       default:
          esc = '\0';
          break;
       }

       if (esc != '\0') {
          *out++ = '\\';
          *out++ = esc;
       } else {
          *out++ = *in;
       }
    }

    *out = '\0';
    return out;
}
1029 
sql_batch_start(JCR * jcr)1030 bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
1031 {
1032    BDB_POSTGRESQL *mdb = this;
1033    const char *query = "COPY batch FROM STDIN";
1034 
1035    Dmsg0(dbglvl_info, "sql_batch_start started\n");
1036 
1037    if  (!sql_query("CREATE TEMPORARY TABLE batch ("
1038                           "FileIndex int,"
1039                           "JobId int,"
1040                           "Path varchar,"
1041                           "Name varchar,"
1042                           "LStat varchar,"
1043                           "Md5 varchar,"
1044                           "DeltaSeq smallint)")) {
1045       Dmsg0(dbglvl_err, "sql_batch_start failed\n");
1046       return false;
1047    }
1048 
1049    /* We are starting a new query.  reset everything. */
1050    mdb->m_num_rows     = -1;
1051    mdb->m_row_number   = -1;
1052    mdb->m_field_number = -1;
1053 
1054    sql_free_result();
1055 
1056    for (int i=0; i < 10; i++) {
1057       mdb->m_result = PQexec(mdb->m_db_handle, query);
1058       if (mdb->m_result) {
1059          break;
1060       }
1061       bmicrosleep(5, 0);
1062    }
1063    if (!mdb->m_result) {
1064       Dmsg1(dbglvl_err, "Query failed: %s\n", query);
1065       goto get_out;
1066    }
1067 
1068    mdb->m_status = PQresultStatus(mdb->m_result);
1069    if (mdb->m_status == PGRES_COPY_IN) {
1070       /* How many fields in the set? */
1071       mdb->m_num_fields = (int) PQnfields(mdb->m_result);
1072       mdb->m_num_rows = 0;
1073       mdb->m_status = 1;
1074    } else {
1075       Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
1076       goto get_out;
1077    }
1078 
1079    Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
1080 
1081    return true;
1082 
1083 get_out:
1084    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1085    mdb->m_status = 0;
1086    PQclear(mdb->m_result);
1087    mdb->m_result = NULL;
1088    return false;
1089 }
1090 
1091 /*
1092  * Set error to something to abort the operation
1093  */
sql_batch_end(JCR * jcr,const char * error)1094 bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
1095 {
1096    int res;
1097    int count=30;
1098    PGresult *p_result;
1099    BDB_POSTGRESQL *mdb = this;
1100 
1101    Dmsg0(dbglvl_info, "sql_batch_end started\n");
1102 
1103    do {
1104       res = PQputCopyEnd(mdb->m_db_handle, error);
1105    } while (res == 0 && --count > 0);
1106 
1107    if (res == 1) {
1108       Dmsg0(dbglvl_dbg, "ok\n");
1109       mdb->m_status = 0;
1110    }
1111 
1112    if (res <= 0) {
1113       mdb->m_status = 1;
1114       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1115       Dmsg1(dbglvl_err, "failure %s\n", errmsg);
1116    }
1117 
1118    /* Check command status and return to normal libpq state */
1119    p_result = PQgetResult(mdb->m_db_handle);
1120    if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
1121       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1122       mdb->m_status = 1;
1123    }
1124 
1125    /* Get some statistics to compute the best plan */
1126    sql_query("ANALYZE batch");
1127 
1128    PQclear(p_result);
1129 
1130    Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
1131    return true;
1132 }
1133 
sql_batch_insert(JCR * jcr,ATTR_DBR * ar)1134 bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1135 {
1136    int res;
1137    int count=30;
1138    size_t len;
1139    const char *digest;
1140    char ed1[50];
1141    BDB_POSTGRESQL *mdb = this;
1142 
1143    mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
1144    pgsql_copy_escape(mdb->esc_name, fname, fnl);
1145 
1146    mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
1147    pgsql_copy_escape(mdb->esc_path, path, pnl);
1148 
1149    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1150       digest = "0";
1151    } else {
1152       digest = ar->Digest;
1153    }
1154 
1155    len = Mmsg(mdb->cmd, "%d\t%s\t%s\t%s\t%s\t%s\t%u\n",
1156               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
1157               mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
1158 
1159    do {
1160       res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
1161    } while (res == 0 && --count > 0);
1162 
1163    if (res == 1) {
1164       Dmsg0(dbglvl_dbg, "ok\n");
1165       mdb->changes++;
1166       mdb->m_status = 1;
1167    }
1168 
1169    if (res <= 0) {
1170       mdb->m_status = 0;
1171       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
1172       Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
1173    }
1174 
1175    Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
1176 
1177    return true;
1178 }
1179 
1180 
1181 #endif /* HAVE_POSTGRESQL */
1182