1 /*-------------------------------------------------------------------------
2 *
3 * libpq_fetch.c
4 * Functions for fetching files from a remote server.
5 *
6 * Copyright (c) 2013-2017, PostgreSQL Global Development Group
7 *
8 *-------------------------------------------------------------------------
9 */
10 #include "postgres_fe.h"
11
12 #include <sys/stat.h>
13 #include <dirent.h>
14 #include <fcntl.h>
15 #include <unistd.h>
16
17 /* for ntohl/htonl */
18 #include <netinet/in.h>
19 #include <arpa/inet.h>
20
21 #include "pg_rewind.h"
22 #include "datapagemap.h"
23 #include "fetch.h"
24 #include "file_ops.h"
25 #include "filemap.h"
26 #include "logging.h"
27
28 #include "libpq-fe.h"
29 #include "catalog/catalog.h"
30 #include "catalog/pg_type.h"
31 #include "fe_utils/connect.h"
32
/*
 * Connection to the source server. It is opened by libpqConnect() and used
 * by every query and COPY that this file sends.
 */
static PGconn *conn = NULL;

/*
 * Files are fetched at most CHUNKSIZE bytes at a time.
 *
 * (This only applies to files that are copied in whole, or to truncated
 * files where we copy the tail. Relation files, where we know the individual
 * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
 *
 * fetch_file_range() keeps a chunk length in an unsigned int, so this value
 * must not exceed UINT32_MAX.
 */
#define CHUNKSIZE 1000000

/* Forward declarations of static helpers that are defined further down. */
static void receiveFileChunks(const char *sql);
static void execute_pagemap(datapagemap_t *pagemap, const char *path);
static char *run_simple_query(const char *sql);
static void run_simple_command(const char *sql);
48
49 void
libpqConnect(const char * connstr)50 libpqConnect(const char *connstr)
51 {
52 char *str;
53 PGresult *res;
54
55 conn = PQconnectdb(connstr);
56 if (PQstatus(conn) == CONNECTION_BAD)
57 pg_fatal("%s", PQerrorMessage(conn));
58
59 pg_log(PG_PROGRESS, "connected to server\n");
60
61 /* disable all types of timeouts */
62 run_simple_command("SET statement_timeout = 0");
63 run_simple_command("SET lock_timeout = 0");
64 run_simple_command("SET idle_in_transaction_session_timeout = 0");
65
66 res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
67 if (PQresultStatus(res) != PGRES_TUPLES_OK)
68 pg_fatal("could not clear search_path: %s",
69 PQresultErrorMessage(res));
70 PQclear(res);
71
72 /*
73 * Check that the server is not in hot standby mode. There is no
74 * fundamental reason that couldn't be made to work, but it doesn't
75 * currently because we use a temporary table. Better to check for it
76 * explicitly than error out, for a better error message.
77 */
78 str = run_simple_query("SELECT pg_is_in_recovery()");
79 if (strcmp(str, "f") != 0)
80 pg_fatal("source server must not be in recovery mode\n");
81 pg_free(str);
82
83 /*
84 * Also check that full_page_writes is enabled. We can get torn pages if
85 * a page is modified while we read it with pg_read_binary_file(), and we
86 * rely on full page images to fix them.
87 */
88 str = run_simple_query("SHOW full_page_writes");
89 if (strcmp(str, "on") != 0)
90 pg_fatal("full_page_writes must be enabled in the source server\n");
91 pg_free(str);
92
93 /*
94 * Although we don't do any "real" updates, we do work with a temporary
95 * table. We don't care about synchronous commit for that. It doesn't
96 * otherwise matter much, but if the server is using synchronous
97 * replication, and replication isn't working for some reason, we don't
98 * want to get stuck, waiting for it to start working again.
99 */
100 run_simple_command("SET synchronous_commit = off");
101 }
102
103 /*
104 * Runs a query that returns a single value.
105 * The result should be pg_free'd after use.
106 */
107 static char *
run_simple_query(const char * sql)108 run_simple_query(const char *sql)
109 {
110 PGresult *res;
111 char *result;
112
113 res = PQexec(conn, sql);
114
115 if (PQresultStatus(res) != PGRES_TUPLES_OK)
116 pg_fatal("error running query (%s) in source server: %s",
117 sql, PQresultErrorMessage(res));
118
119 /* sanity check the result set */
120 if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
121 pg_fatal("unexpected result set from query\n");
122
123 result = pg_strdup(PQgetvalue(res, 0, 0));
124
125 PQclear(res);
126
127 return result;
128 }
129
130 /*
131 * Runs a command.
132 * In the event of a failure, exit immediately.
133 */
134 static void
run_simple_command(const char * sql)135 run_simple_command(const char *sql)
136 {
137 PGresult *res;
138
139 res = PQexec(conn, sql);
140
141 if (PQresultStatus(res) != PGRES_COMMAND_OK)
142 pg_fatal("error running query (%s) in source server: %s",
143 sql, PQresultErrorMessage(res));
144
145 PQclear(res);
146 }
147
148 /*
149 * Calls pg_current_wal_insert_lsn() function
150 */
151 XLogRecPtr
libpqGetCurrentXlogInsertLocation(void)152 libpqGetCurrentXlogInsertLocation(void)
153 {
154 XLogRecPtr result;
155 uint32 hi;
156 uint32 lo;
157 char *val;
158
159 val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
160
161 if (sscanf(val, "%X/%X", &hi, &lo) != 2)
162 pg_fatal("unrecognized result \"%s\" for current WAL insert location\n", val);
163
164 result = ((uint64) hi) << 32 | lo;
165
166 pg_free(val);
167
168 return result;
169 }
170
171 /*
172 * Get a list of all files in the data directory.
173 */
174 void
libpqProcessFileList(void)175 libpqProcessFileList(void)
176 {
177 PGresult *res;
178 const char *sql;
179 int i;
180
181 /*
182 * Create a recursive directory listing of the whole data directory.
183 *
184 * The WITH RECURSIVE part does most of the work. The second part gets the
185 * targets of the symlinks in pg_tblspc directory.
186 *
187 * XXX: There is no backend function to get a symbolic link's target in
188 * general, so if the admin has put any custom symbolic links in the data
189 * directory, they won't be copied correctly.
190 */
191 sql =
192 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
193 " SELECT '' AS path, filename, size, isdir FROM\n"
194 " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
195 " pg_stat_file(fn.filename, true) AS this\n"
196 " UNION ALL\n"
197 " SELECT parent.path || parent.filename || '/' AS path,\n"
198 " fn, this.size, this.isdir\n"
199 " FROM files AS parent,\n"
200 " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
201 " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
202 " WHERE parent.isdir = 't'\n"
203 ")\n"
204 "SELECT path || filename, size, isdir,\n"
205 " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
206 "FROM files\n"
207 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
208 " AND oid::text = files.filename\n";
209 res = PQexec(conn, sql);
210
211 if (PQresultStatus(res) != PGRES_TUPLES_OK)
212 pg_fatal("could not fetch file list: %s",
213 PQresultErrorMessage(res));
214
215 /* sanity check the result set */
216 if (PQnfields(res) != 4)
217 pg_fatal("unexpected result set while fetching file list\n");
218
219 /* Read result to local variables */
220 for (i = 0; i < PQntuples(res); i++)
221 {
222 char *path;
223 int64 filesize;
224 bool isdir;
225 char *link_target;
226 file_type_t type;
227
228 if (PQgetisnull(res, i, 1))
229 {
230 /*
231 * The file was removed from the server while the query was
232 * running. Ignore it.
233 */
234 continue;
235 }
236
237 path = PQgetvalue(res, i, 0);
238 filesize = atol(PQgetvalue(res, i, 1));
239 isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
240 link_target = PQgetvalue(res, i, 3);
241
242 if (link_target[0])
243 type = FILE_TYPE_SYMLINK;
244 else if (isdir)
245 type = FILE_TYPE_DIRECTORY;
246 else
247 type = FILE_TYPE_REGULAR;
248
249 process_source_file(path, type, filesize, link_target);
250 }
251 PQclear(res);
252 }
253
254 /*
255 * Converts an int64 from network byte order to native format.
256 */
257 static int64
pg_recvint64(int64 value)258 pg_recvint64(int64 value)
259 {
260 union
261 {
262 int64 i64;
263 uint32 i32[2];
264 } swap;
265 int64 result;
266
267 swap.i64 = value;
268
269 result = (uint32) ntohl(swap.i32[0]);
270 result <<= 32;
271 result |= (uint32) ntohl(swap.i32[1]);
272
273 return result;
274 }
275
276 /*----
277 * Runs a query, which returns pieces of files from the remote source data
278 * directory, and overwrites the corresponding parts of target files with
279 * the received parts. The result set is expected to be of format:
280 *
281 * path text -- path in the data directory, e.g "base/1/123"
282 * begin int8 -- offset within the file
283 * chunk bytea -- file content
284 *----
285 */
286 static void
receiveFileChunks(const char * sql)287 receiveFileChunks(const char *sql)
288 {
289 PGresult *res;
290
291 if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
292 pg_fatal("could not send query: %s", PQerrorMessage(conn));
293
294 pg_log(PG_DEBUG, "getting file chunks\n");
295
296 if (PQsetSingleRowMode(conn) != 1)
297 pg_fatal("could not set libpq connection to single row mode\n");
298
299 while ((res = PQgetResult(conn)) != NULL)
300 {
301 char *filename;
302 int filenamelen;
303 int64 chunkoff;
304 char chunkoff_str[32];
305 int chunksize;
306 char *chunk;
307
308 switch (PQresultStatus(res))
309 {
310 case PGRES_SINGLE_TUPLE:
311 break;
312
313 case PGRES_TUPLES_OK:
314 PQclear(res);
315 continue; /* final zero-row result */
316
317 default:
318 pg_fatal("unexpected result while fetching remote files: %s",
319 PQresultErrorMessage(res));
320 }
321
322 /* sanity check the result set */
323 if (PQnfields(res) != 3 || PQntuples(res) != 1)
324 pg_fatal("unexpected result set size while fetching remote files\n");
325
326 if (PQftype(res, 0) != TEXTOID ||
327 PQftype(res, 1) != INT8OID ||
328 PQftype(res, 2) != BYTEAOID)
329 {
330 pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u\n",
331 PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
332 }
333
334 if (PQfformat(res, 0) != 1 &&
335 PQfformat(res, 1) != 1 &&
336 PQfformat(res, 2) != 1)
337 {
338 pg_fatal("unexpected result format while fetching remote files\n");
339 }
340
341 if (PQgetisnull(res, 0, 0) ||
342 PQgetisnull(res, 0, 1))
343 {
344 pg_fatal("unexpected null values in result while fetching remote files\n");
345 }
346
347 if (PQgetlength(res, 0, 1) != sizeof(int64))
348 pg_fatal("unexpected result length while fetching remote files\n");
349
350 /* Read result set to local variables */
351 memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
352 chunkoff = pg_recvint64(chunkoff);
353 chunksize = PQgetlength(res, 0, 2);
354
355 filenamelen = PQgetlength(res, 0, 0);
356 filename = pg_malloc(filenamelen + 1);
357 memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
358 filename[filenamelen] = '\0';
359
360 chunk = PQgetvalue(res, 0, 2);
361
362 /*
363 * If a file has been deleted on the source, remove it on the target
364 * as well. Note that multiple unlink() calls may happen on the same
365 * file if multiple data chunks are associated with it, hence ignore
366 * unconditionally anything missing. If this file is not a relation
367 * data file, then it has been already truncated when creating the
368 * file chunk list at the previous execution of the filemap.
369 */
370 if (PQgetisnull(res, 0, 2))
371 {
372 pg_log(PG_DEBUG,
373 "received null value for chunk for file \"%s\", file has been deleted\n",
374 filename);
375 remove_target_file(filename, true);
376 pg_free(filename);
377 PQclear(res);
378 continue;
379 }
380
381 /*
382 * Separate step to keep platform-dependent format code out of
383 * translatable strings.
384 */
385 snprintf(chunkoff_str, sizeof(chunkoff_str), INT64_FORMAT, chunkoff);
386 pg_log(PG_DEBUG, "received chunk for file \"%s\", offset %s, size %d\n",
387 filename, chunkoff_str, chunksize);
388
389 open_target_file(filename, false);
390
391 write_target_range(chunk, chunkoff, chunksize);
392
393 pg_free(filename);
394
395 PQclear(res);
396 }
397 }
398
399 /*
400 * Receive a single file as a malloc'd buffer.
401 */
402 char *
libpqGetFile(const char * filename,size_t * filesize)403 libpqGetFile(const char *filename, size_t *filesize)
404 {
405 PGresult *res;
406 char *result;
407 int len;
408 const char *paramValues[1];
409
410 paramValues[0] = filename;
411 res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
412 1, NULL, paramValues, NULL, NULL, 1);
413
414 if (PQresultStatus(res) != PGRES_TUPLES_OK)
415 pg_fatal("could not fetch remote file \"%s\": %s",
416 filename, PQresultErrorMessage(res));
417
418 /* sanity check the result set */
419 if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
420 pg_fatal("unexpected result set while fetching remote file \"%s\"\n",
421 filename);
422
423 /* Read result to local variables */
424 len = PQgetlength(res, 0, 0);
425 result = pg_malloc(len + 1);
426 memcpy(result, PQgetvalue(res, 0, 0), len);
427 result[len] = '\0';
428
429 PQclear(res);
430
431 pg_log(PG_DEBUG, "fetched file \"%s\", length %d\n", filename, len);
432
433 if (filesize)
434 *filesize = len;
435 return result;
436 }
437
438 /*
439 * Write a file range to a temporary table in the server.
440 *
441 * The range is sent to the server as a COPY formatted line, to be inserted
442 * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
443 * function to actually fetch the data.
444 */
445 static void
fetch_file_range(const char * path,uint64 begin,uint64 end)446 fetch_file_range(const char *path, uint64 begin, uint64 end)
447 {
448 char linebuf[MAXPGPATH + 23];
449
450 /* Split the range into CHUNKSIZE chunks */
451 while (end - begin > 0)
452 {
453 unsigned int len;
454
455 /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
456 if (end - begin > CHUNKSIZE)
457 len = CHUNKSIZE;
458 else
459 len = (unsigned int) (end - begin);
460
461 snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
462
463 if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
464 pg_fatal("could not send COPY data: %s",
465 PQerrorMessage(conn));
466
467 begin += len;
468 }
469 }
470
471 /*
472 * Fetch all changed blocks from remote source data directory.
473 */
474 void
libpq_executeFileMap(filemap_t * map)475 libpq_executeFileMap(filemap_t *map)
476 {
477 file_entry_t *entry;
478 const char *sql;
479 PGresult *res;
480 int i;
481
482 /*
483 * First create a temporary table, and load it with the blocks that we
484 * need to fetch.
485 */
486 sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
487 run_simple_command(sql);
488
489 sql = "COPY fetchchunks FROM STDIN";
490 res = PQexec(conn, sql);
491
492 if (PQresultStatus(res) != PGRES_COPY_IN)
493 pg_fatal("could not send file list: %s",
494 PQresultErrorMessage(res));
495 PQclear(res);
496
497 for (i = 0; i < map->narray; i++)
498 {
499 entry = map->array[i];
500
501 /* If this is a relation file, copy the modified blocks */
502 execute_pagemap(&entry->pagemap, entry->path);
503
504 switch (entry->action)
505 {
506 case FILE_ACTION_NONE:
507 /* nothing else to do */
508 break;
509
510 case FILE_ACTION_COPY:
511 /* Truncate the old file out of the way, if any */
512 open_target_file(entry->path, true);
513 fetch_file_range(entry->path, 0, entry->newsize);
514 break;
515
516 case FILE_ACTION_TRUNCATE:
517 truncate_target_file(entry->path, entry->newsize);
518 break;
519
520 case FILE_ACTION_COPY_TAIL:
521 fetch_file_range(entry->path, entry->oldsize, entry->newsize);
522 break;
523
524 case FILE_ACTION_REMOVE:
525 remove_target(entry);
526 break;
527
528 case FILE_ACTION_CREATE:
529 create_target(entry);
530 break;
531 }
532 }
533
534 if (PQputCopyEnd(conn, NULL) != 1)
535 pg_fatal("could not send end-of-COPY: %s",
536 PQerrorMessage(conn));
537
538 while ((res = PQgetResult(conn)) != NULL)
539 {
540 if (PQresultStatus(res) != PGRES_COMMAND_OK)
541 pg_fatal("unexpected result while sending file list: %s",
542 PQresultErrorMessage(res));
543 PQclear(res);
544 }
545
546 /*
547 * We've now copied the list of file ranges that we need to fetch to the
548 * temporary table. Now, actually fetch all of those ranges.
549 */
550 sql =
551 "SELECT path, begin,\n"
552 " pg_read_binary_file(path, begin, len, true) AS chunk\n"
553 "FROM fetchchunks\n";
554
555 receiveFileChunks(sql);
556 }
557
558 static void
execute_pagemap(datapagemap_t * pagemap,const char * path)559 execute_pagemap(datapagemap_t *pagemap, const char *path)
560 {
561 datapagemap_iterator_t *iter;
562 BlockNumber blkno;
563 off_t offset;
564
565 iter = datapagemap_iterate(pagemap);
566 while (datapagemap_next(iter, &blkno))
567 {
568 offset = blkno * BLCKSZ;
569
570 fetch_file_range(path, offset, offset + BLCKSZ);
571 }
572 pg_free(iter);
573 }
574