1 /*-------------------------------------------------------------------------
2 *
3 * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
4 * pg_recvlogical
5 *
6 * Author: Magnus Hagander <magnus@hagander.net>
7 *
8 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9 *
10 * IDENTIFICATION
11 * src/bin/pg_basebackup/streamutil.c
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres_fe.h"
16
17 #include <sys/time.h>
18 #include <unistd.h>
19
20 /* for ntohl/htonl */
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23
24 /* local includes */
25 #include "receivelog.h"
26 #include "streamutil.h"
27
28 #include "pqexpbuffer.h"
29 #include "common/fe_memutils.h"
30 #include "datatype/timestamp.h"
31 #include "fe_utils/connect.h"
32
33 #define ERRCODE_DUPLICATE_OBJECT "42710"
34
35 const char *progname;
36 char *connection_string = NULL;
37 char *dbhost = NULL;
38 char *dbuser = NULL;
39 char *dbport = NULL;
40 char *dbname = NULL;
41 int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
42 static bool have_password = false;
43 static char password[100];
44 PGconn *conn = NULL;
45
46 /*
47 * Connect to the server. Returns a valid PGconn pointer if connected,
48 * or NULL on non-permanent error. On permanent error, the function will
49 * call exit(1) directly.
50 */
51 PGconn *
GetConnection(void)52 GetConnection(void)
53 {
54 PGconn *tmpconn;
55 int argcount = 7; /* dbname, replication, fallback_app_name,
56 * host, user, port, password */
57 int i;
58 const char **keywords;
59 const char **values;
60 const char *tmpparam;
61 bool need_password;
62 PQconninfoOption *conn_opts = NULL;
63 PQconninfoOption *conn_opt;
64 char *err_msg = NULL;
65
66 /* pg_recvlogical uses dbname only; others use connection_string only. */
67 Assert(dbname == NULL || connection_string == NULL);
68
69 /*
70 * Merge the connection info inputs given in form of connection string,
71 * options and default values (dbname=replication, replication=true, etc.)
72 * Explicitly discard any dbname value in the connection string;
73 * otherwise, PQconnectdbParams() would interpret that value as being
74 * itself a connection string.
75 */
76 i = 0;
77 if (connection_string)
78 {
79 conn_opts = PQconninfoParse(connection_string, &err_msg);
80 if (conn_opts == NULL)
81 {
82 fprintf(stderr, "%s: %s", progname, err_msg);
83 exit(1);
84 }
85
86 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
87 {
88 if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
89 strcmp(conn_opt->keyword, "dbname") != 0)
90 argcount++;
91 }
92
93 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
94 values = pg_malloc0((argcount + 1) * sizeof(*values));
95
96 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
97 {
98 if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
99 strcmp(conn_opt->keyword, "dbname") != 0)
100 {
101 keywords[i] = conn_opt->keyword;
102 values[i] = conn_opt->val;
103 i++;
104 }
105 }
106 }
107 else
108 {
109 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
110 values = pg_malloc0((argcount + 1) * sizeof(*values));
111 }
112
113 keywords[i] = "dbname";
114 values[i] = dbname == NULL ? "replication" : dbname;
115 i++;
116 keywords[i] = "replication";
117 values[i] = dbname == NULL ? "true" : "database";
118 i++;
119 keywords[i] = "fallback_application_name";
120 values[i] = progname;
121 i++;
122
123 if (dbhost)
124 {
125 keywords[i] = "host";
126 values[i] = dbhost;
127 i++;
128 }
129 if (dbuser)
130 {
131 keywords[i] = "user";
132 values[i] = dbuser;
133 i++;
134 }
135 if (dbport)
136 {
137 keywords[i] = "port";
138 values[i] = dbport;
139 i++;
140 }
141
142 /* If -W was given, force prompt for password, but only the first time */
143 need_password = (dbgetpassword == 1 && !have_password);
144
145 do
146 {
147 /* Get a new password if appropriate */
148 if (need_password)
149 {
150 simple_prompt("Password: ", password, sizeof(password), false);
151 have_password = true;
152 need_password = false;
153 }
154
155 /* Use (or reuse, on a subsequent connection) password if we have it */
156 if (have_password)
157 {
158 keywords[i] = "password";
159 values[i] = password;
160 }
161 else
162 {
163 keywords[i] = NULL;
164 values[i] = NULL;
165 }
166
167 tmpconn = PQconnectdbParams(keywords, values, true);
168
169 /*
170 * If there is too little memory even to allocate the PGconn object
171 * and PQconnectdbParams returns NULL, we call exit(1) directly.
172 */
173 if (!tmpconn)
174 {
175 fprintf(stderr, _("%s: could not connect to server\n"),
176 progname);
177 exit(1);
178 }
179
180 /* If we need a password and -w wasn't given, loop back and get one */
181 if (PQstatus(tmpconn) == CONNECTION_BAD &&
182 PQconnectionNeedsPassword(tmpconn) &&
183 dbgetpassword != -1)
184 {
185 PQfinish(tmpconn);
186 need_password = true;
187 }
188 }
189 while (need_password);
190
191 if (PQstatus(tmpconn) != CONNECTION_OK)
192 {
193 fprintf(stderr, _("%s: %s"), progname, PQerrorMessage(tmpconn));
194 PQfinish(tmpconn);
195 free(values);
196 free(keywords);
197 if (conn_opts)
198 PQconninfoFree(conn_opts);
199 return NULL;
200 }
201
202 /* Connection ok! */
203 free(values);
204 free(keywords);
205 if (conn_opts)
206 PQconninfoFree(conn_opts);
207
208 /*
209 * Set always-secure search path, so malicious users can't get control.
210 * The capacity to run normal SQL queries was added in PostgreSQL
211 * 10, so the search path cannot be changed (by us or attackers) on
212 * earlier versions.
213 */
214 if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
215 {
216 PGresult *res;
217
218 res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
219 if (PQresultStatus(res) != PGRES_TUPLES_OK)
220 {
221 fprintf(stderr, _("%s: could not clear search_path: %s"),
222 progname, PQerrorMessage(tmpconn));
223 PQclear(res);
224 PQfinish(tmpconn);
225 exit(1);
226 }
227 PQclear(res);
228 }
229
230 /*
231 * Ensure we have the same value of integer_datetimes (now always "on") as
232 * the server we are connecting to.
233 */
234 tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
235 if (!tmpparam)
236 {
237 fprintf(stderr,
238 _("%s: could not determine server setting for integer_datetimes\n"),
239 progname);
240 PQfinish(tmpconn);
241 exit(1);
242 }
243
244 if (strcmp(tmpparam, "on") != 0)
245 {
246 fprintf(stderr,
247 _("%s: integer_datetimes compile flag does not match server\n"),
248 progname);
249 PQfinish(tmpconn);
250 exit(1);
251 }
252
253 return tmpconn;
254 }
255
256 /*
257 * Run IDENTIFY_SYSTEM through a given connection and give back to caller
258 * some result information if requested:
259 * - System identifier
260 * - Current timeline ID
261 * - Start LSN position
262 * - Database name (NULL in servers prior to 9.4)
263 */
264 bool
RunIdentifySystem(PGconn * conn,char ** sysid,TimeLineID * starttli,XLogRecPtr * startpos,char ** db_name)265 RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
266 XLogRecPtr *startpos, char **db_name)
267 {
268 PGresult *res;
269 uint32 hi,
270 lo;
271
272 /* Check connection existence */
273 Assert(conn != NULL);
274
275 res = PQexec(conn, "IDENTIFY_SYSTEM");
276 if (PQresultStatus(res) != PGRES_TUPLES_OK)
277 {
278 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
279 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
280
281 PQclear(res);
282 return false;
283 }
284 if (PQntuples(res) != 1 || PQnfields(res) < 3)
285 {
286 fprintf(stderr,
287 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
288 progname, PQntuples(res), PQnfields(res), 1, 3);
289
290 PQclear(res);
291 return false;
292 }
293
294 /* Get system identifier */
295 if (sysid != NULL)
296 *sysid = pg_strdup(PQgetvalue(res, 0, 0));
297
298 /* Get timeline ID to start streaming from */
299 if (starttli != NULL)
300 *starttli = atoi(PQgetvalue(res, 0, 1));
301
302 /* Get LSN start position if necessary */
303 if (startpos != NULL)
304 {
305 if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
306 {
307 fprintf(stderr,
308 _("%s: could not parse write-ahead log location \"%s\"\n"),
309 progname, PQgetvalue(res, 0, 2));
310
311 PQclear(res);
312 return false;
313 }
314 *startpos = ((uint64) hi) << 32 | lo;
315 }
316
317 /* Get database name, only available in 9.4 and newer versions */
318 if (db_name != NULL)
319 {
320 *db_name = NULL;
321 if (PQserverVersion(conn) >= 90400)
322 {
323 if (PQnfields(res) < 4)
324 {
325 fprintf(stderr,
326 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
327 progname, PQntuples(res), PQnfields(res), 1, 4);
328
329 PQclear(res);
330 return false;
331 }
332 if (!PQgetisnull(res, 0, 3))
333 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
334 }
335 }
336
337 PQclear(res);
338 return true;
339 }
340
341 /*
342 * Create a replication slot for the given connection. This function
343 * returns true in case of success.
344 */
345 bool
CreateReplicationSlot(PGconn * conn,const char * slot_name,const char * plugin,bool is_physical,bool slot_exists_ok)346 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
347 bool is_physical, bool slot_exists_ok)
348 {
349 PQExpBuffer query;
350 PGresult *res;
351
352 query = createPQExpBuffer();
353
354 Assert((is_physical && plugin == NULL) ||
355 (!is_physical && plugin != NULL));
356 Assert(slot_name != NULL);
357
358 /* Build query */
359 if (is_physical)
360 appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
361 slot_name);
362 else
363 {
364 appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
365 slot_name, plugin);
366 if (PQserverVersion(conn) >= 100000)
367 /* pg_recvlogical doesn't use an exported snapshot, so suppress */
368 appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
369 }
370
371 res = PQexec(conn, query->data);
372 if (PQresultStatus(res) != PGRES_TUPLES_OK)
373 {
374 const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
375
376 if (slot_exists_ok &&
377 sqlstate &&
378 strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
379 {
380 destroyPQExpBuffer(query);
381 PQclear(res);
382 return true;
383 }
384 else
385 {
386 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
387 progname, query->data, PQerrorMessage(conn));
388
389 destroyPQExpBuffer(query);
390 PQclear(res);
391 return false;
392 }
393 }
394
395 if (PQntuples(res) != 1 || PQnfields(res) != 4)
396 {
397 fprintf(stderr,
398 _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
399 progname, slot_name,
400 PQntuples(res), PQnfields(res), 1, 4);
401
402 destroyPQExpBuffer(query);
403 PQclear(res);
404 return false;
405 }
406
407 destroyPQExpBuffer(query);
408 PQclear(res);
409 return true;
410 }
411
412 /*
413 * Drop a replication slot for the given connection. This function
414 * returns true in case of success.
415 */
416 bool
DropReplicationSlot(PGconn * conn,const char * slot_name)417 DropReplicationSlot(PGconn *conn, const char *slot_name)
418 {
419 PQExpBuffer query;
420 PGresult *res;
421
422 Assert(slot_name != NULL);
423
424 query = createPQExpBuffer();
425
426 /* Build query */
427 appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
428 slot_name);
429 res = PQexec(conn, query->data);
430 if (PQresultStatus(res) != PGRES_COMMAND_OK)
431 {
432 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
433 progname, query->data, PQerrorMessage(conn));
434
435 destroyPQExpBuffer(query);
436 PQclear(res);
437 return false;
438 }
439
440 if (PQntuples(res) != 0 || PQnfields(res) != 0)
441 {
442 fprintf(stderr,
443 _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
444 progname, slot_name,
445 PQntuples(res), PQnfields(res), 0, 0);
446
447 destroyPQExpBuffer(query);
448 PQclear(res);
449 return false;
450 }
451
452 destroyPQExpBuffer(query);
453 PQclear(res);
454 return true;
455 }
456
457
458 /*
459 * Frontend version of GetCurrentTimestamp(), since we are not linked with
460 * backend code.
461 */
462 TimestampTz
feGetCurrentTimestamp(void)463 feGetCurrentTimestamp(void)
464 {
465 TimestampTz result;
466 struct timeval tp;
467
468 gettimeofday(&tp, NULL);
469
470 result = (TimestampTz) tp.tv_sec -
471 ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
472 result = (result * USECS_PER_SEC) + tp.tv_usec;
473
474 return result;
475 }
476
477 /*
478 * Frontend version of TimestampDifference(), since we are not linked with
479 * backend code.
480 */
481 void
feTimestampDifference(TimestampTz start_time,TimestampTz stop_time,long * secs,int * microsecs)482 feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
483 long *secs, int *microsecs)
484 {
485 TimestampTz diff = stop_time - start_time;
486
487 if (diff <= 0)
488 {
489 *secs = 0;
490 *microsecs = 0;
491 }
492 else
493 {
494 *secs = (long) (diff / USECS_PER_SEC);
495 *microsecs = (int) (diff % USECS_PER_SEC);
496 }
497 }
498
499 /*
500 * Frontend version of TimestampDifferenceExceeds(), since we are not
501 * linked with backend code.
502 */
503 bool
feTimestampDifferenceExceeds(TimestampTz start_time,TimestampTz stop_time,int msec)504 feTimestampDifferenceExceeds(TimestampTz start_time,
505 TimestampTz stop_time,
506 int msec)
507 {
508 TimestampTz diff = stop_time - start_time;
509
510 return (diff >= msec * INT64CONST(1000));
511 }
512
513 /*
514 * Converts an int64 to network byte order.
515 */
516 void
fe_sendint64(int64 i,char * buf)517 fe_sendint64(int64 i, char *buf)
518 {
519 uint32 n32;
520
521 /* High order half first, since we're doing MSB-first */
522 n32 = (uint32) (i >> 32);
523 n32 = htonl(n32);
524 memcpy(&buf[0], &n32, 4);
525
526 /* Now the low order half */
527 n32 = (uint32) i;
528 n32 = htonl(n32);
529 memcpy(&buf[4], &n32, 4);
530 }
531
532 /*
533 * Converts an int64 from network byte order to native format.
534 */
535 int64
fe_recvint64(char * buf)536 fe_recvint64(char *buf)
537 {
538 int64 result;
539 uint32 h32;
540 uint32 l32;
541
542 memcpy(&h32, buf, 4);
543 memcpy(&l32, buf + 4, 4);
544 h32 = ntohl(h32);
545 l32 = ntohl(l32);
546
547 result = h32;
548 result <<= 32;
549 result |= l32;
550
551 return result;
552 }
553