1 /***********************************************************************************************************************************
2 Archive Common
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5
6 #include <stdint.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <sys/wait.h>
10 #include <unistd.h>
11
12 #include "command/archive/common.h"
13 #include "common/debug.h"
14 #include "common/fork.h"
15 #include "common/log.h"
16 #include "common/memContext.h"
17 #include "common/regExp.h"
18 #include "common/wait.h"
19 #include "config/config.h"
20 #include "postgres/version.h"
21 #include "storage/helper.h"
22 #include "storage/helper.h"
23
24 /***********************************************************************************************************************************
25 WAL segment constants
26 ***********************************************************************************************************************************/
// Exported String constants wrapping the WAL_* regular expression literals. The patterns themselves are defined outside this file
// (presumably in command/archive/common.h -- confirm); these lines only make String versions available to other modules.
// WAL_SEGMENT_PARTIAL_REGEXP_STR is the pattern walIsSegment() matches against.
STRING_EXTERN(WAL_SEGMENT_REGEXP_STR, WAL_SEGMENT_REGEXP);
STRING_EXTERN(WAL_SEGMENT_PARTIAL_REGEXP_STR, WAL_SEGMENT_PARTIAL_REGEXP);
STRING_EXTERN(WAL_SEGMENT_DIR_REGEXP_STR, WAL_SEGMENT_DIR_REGEXP);
STRING_EXTERN(WAL_SEGMENT_FILE_REGEXP_STR, WAL_SEGMENT_FILE_REGEXP);
STRING_EXTERN(WAL_TIMELINE_HISTORY_REGEXP_STR, WAL_TIMELINE_HISTORY_REGEXP);
32
33 /***********************************************************************************************************************************
34 Global error file constant
35 ***********************************************************************************************************************************/
// Base name of the status file used for errors that are not tied to a specific WAL segment (archiveAsyncStatusErrorWrite() uses it
// when walSegment is NULL)
#define STATUS_FILE_GLOBAL "global"
STRING_STATIC(STATUS_FILE_GLOBAL_STR, STATUS_FILE_GLOBAL);

// Full name of the global error status file: "global" followed by the error status extension. archiveAsyncStatus() falls back to
// this file when no segment-specific error file exists.
#define STATUS_FILE_GLOBAL_ERROR STATUS_FILE_GLOBAL STATUS_EXT_ERROR
STRING_STATIC(STATUS_FILE_GLOBAL_ERROR_STR, STATUS_FILE_GLOBAL_ERROR);
41
42 /***********************************************************************************************************************************
43 Get the correct spool queue based on the archive mode
44 ***********************************************************************************************************************************/
45 static const String *
archiveAsyncSpoolQueue(ArchiveMode archiveMode)46 archiveAsyncSpoolQueue(ArchiveMode archiveMode)
47 {
48 FUNCTION_TEST_BEGIN();
49 FUNCTION_TEST_PARAM(STRING_ID, archiveMode);
50 FUNCTION_TEST_END();
51
52 FUNCTION_TEST_RETURN((archiveMode == archiveModeGet ? STORAGE_SPOOL_ARCHIVE_IN_STR : STORAGE_SPOOL_ARCHIVE_OUT_STR));
53 }
54
55 /**********************************************************************************************************************************/
56 void
archiveAsyncErrorClear(ArchiveMode archiveMode,const String * archiveFile)57 archiveAsyncErrorClear(ArchiveMode archiveMode, const String *archiveFile)
58 {
59 FUNCTION_LOG_BEGIN(logLevelDebug);
60 FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
61 FUNCTION_LOG_PARAM(STRING, archiveFile);
62 FUNCTION_LOG_END();
63
64 ASSERT(archiveFile != NULL);
65
66 storageRemoveP(storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s" STATUS_EXT_ERROR, strZ(archiveFile)));
67 storageRemoveP(storageSpoolWrite(), STRDEF(STORAGE_SPOOL_ARCHIVE_OUT "/" STATUS_FILE_GLOBAL_ERROR));
68
69 FUNCTION_LOG_RETURN_VOID();
70 }
71
72 /**********************************************************************************************************************************/
73 bool
archiveAsyncStatus(ArchiveMode archiveMode,const String * walSegment,bool throwOnError,bool warnOnOk)74 archiveAsyncStatus(ArchiveMode archiveMode, const String *walSegment, bool throwOnError, bool warnOnOk)
75 {
76 FUNCTION_LOG_BEGIN(logLevelDebug);
77 FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
78 FUNCTION_LOG_PARAM(STRING, walSegment);
79 FUNCTION_LOG_PARAM(BOOL, throwOnError);
80 FUNCTION_LOG_PARAM(BOOL, warnOnOk);
81 FUNCTION_LOG_END();
82
83 ASSERT(walSegment != NULL);
84
85 bool result = false;
86
87 MEM_CONTEXT_TEMP_BEGIN()
88 {
89 const String *errorFile = NULL;
90 bool errorFileExists = false;
91
92 const String *spoolQueue = archiveAsyncSpoolQueue(archiveMode);
93
94 String *okFile = strNewFmt("%s" STATUS_EXT_OK, strZ(walSegment));
95 bool okFileExists = storageExistsP(storageSpool(), strNewFmt("%s/%s", strZ(spoolQueue), strZ(okFile)));
96
97 // If the ok file does not exist then check to see if a file-specific or global error exists
98 if (!okFileExists)
99 {
100 // Check for a file-specific error first
101 errorFile = strNewFmt("%s" STATUS_EXT_ERROR, strZ(walSegment));
102 errorFileExists = storageExistsP(storageSpool(), strNewFmt("%s/%s", strZ(spoolQueue), strZ(errorFile)));
103
104 // If that doesn't exist then check for a global error
105 if (!errorFileExists)
106 {
107 errorFile = STATUS_FILE_GLOBAL_ERROR_STR;
108 errorFileExists = storageExistsP(storageSpool(), strNewFmt("%s/%s", strZ(spoolQueue), strZ(errorFile)));
109 }
110 }
111
112 // If either of them exists then check what happened and report back
113 if (okFileExists || errorFileExists)
114 {
115 // Get the status file content
116 const String *statusFile = okFileExists ? okFile: errorFile;
117
118 String *content = strNewBuf(
119 storageGetP(storageNewReadP(storageSpool(), strNewFmt("%s/%s", strZ(spoolQueue), strZ(statusFile)))));
120
121 // Get the code and message if the file has content
122 int code = 0;
123 const String *message = NULL;
124
125 if (strSize(content) != 0)
126 {
127 // Find the line feed after the error code -- should be the first one
128 const char *linefeedPtr = strchr(strZ(content), '\n');
129
130 // Error if linefeed not found
131 if (linefeedPtr == NULL)
132 THROW_FMT(FormatError, "%s content must have at least two lines", strZ(statusFile));
133
134 // Error if message is zero-length
135 if (strlen(linefeedPtr + 1) == 0)
136 THROW_FMT(FormatError, "%s message must be > 0", strZ(statusFile));
137
138 // Get contents
139 code = varIntForce(VARSTR(strNewN(strZ(content), (size_t)(linefeedPtr - strZ(content)))));
140 message = strTrim(strNewZ(linefeedPtr + 1));
141 }
142
143 // Process OK files
144 if (okFileExists)
145 {
146 // If there is content in the status file it is a warning
147 if (strSize(content) != 0 && warnOnOk)
148 {
149 // If error code is not success, then this was a renamed error file
150 if (code != 0)
151 {
152 message = strNewFmt(
153 "WAL segment '%s' was not pushed due to error [%d] and was manually skipped: %s", strZ(walSegment),
154 code, strZ(message));
155 }
156
157 LOG_WARN(strZ(message));
158 }
159
160 result = true;
161 }
162 else if (throwOnError)
163 {
164 // Error status files must have content
165 if (strSize(content) == 0)
166 THROW_FMT(AssertError, "status file '%s' has no content", strZ(statusFile));
167
168 // Throw error using the code passed in the file
169 THROW_CODE(code, strZ(message));
170 }
171 }
172 }
173 MEM_CONTEXT_TEMP_END();
174
175 FUNCTION_LOG_RETURN(BOOL, result);
176 }
177
178 /**********************************************************************************************************************************/
179 void
archiveAsyncStatusErrorWrite(ArchiveMode archiveMode,const String * walSegment,int code,const String * message)180 archiveAsyncStatusErrorWrite(ArchiveMode archiveMode, const String *walSegment, int code, const String *message)
181 {
182 FUNCTION_LOG_BEGIN(logLevelDebug);
183 FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
184 FUNCTION_LOG_PARAM(STRING, walSegment);
185 FUNCTION_LOG_PARAM(INT, code);
186 FUNCTION_LOG_PARAM(STRING, message);
187 FUNCTION_LOG_END();
188
189 ASSERT(code != 0);
190 ASSERT(message != NULL);
191
192 MEM_CONTEXT_TEMP_BEGIN()
193 {
194 const String *errorFile = walSegment == NULL ? STATUS_FILE_GLOBAL_STR : walSegment;
195
196 storagePutP(
197 storageNewWriteP(
198 storageSpoolWrite(),
199 strNewFmt("%s/%s" STATUS_EXT_ERROR, strZ(archiveAsyncSpoolQueue(archiveMode)), strZ(errorFile))),
200 BUFSTR(strNewFmt("%d\n%s", code, strZ(message))));
201 }
202 MEM_CONTEXT_TEMP_END();
203
204 FUNCTION_LOG_RETURN_VOID();
205 }
206
207 /**********************************************************************************************************************************/
208 void
archiveAsyncStatusOkWrite(ArchiveMode archiveMode,const String * walSegment,const String * warning)209 archiveAsyncStatusOkWrite(ArchiveMode archiveMode, const String *walSegment, const String *warning)
210 {
211 FUNCTION_LOG_BEGIN(logLevelDebug);
212 FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
213 FUNCTION_LOG_PARAM(STRING, walSegment);
214 FUNCTION_LOG_PARAM(STRING, warning);
215 FUNCTION_LOG_END();
216
217 ASSERT(walSegment != NULL);
218
219 MEM_CONTEXT_TEMP_BEGIN()
220 {
221 // Write file
222 storagePutP(
223 storageNewWriteP(
224 storageSpoolWrite(), strNewFmt("%s/%s" STATUS_EXT_OK, strZ(archiveAsyncSpoolQueue(archiveMode)), strZ(walSegment))),
225 warning == NULL ? NULL : BUFSTR(strNewFmt("0\n%s", strZ(warning))));
226 }
227 MEM_CONTEXT_TEMP_END();
228
229 FUNCTION_LOG_RETURN_VOID();
230 }
231
232 /**********************************************************************************************************************************/
// Launch commandExec as an asynchronous process for archiveMode.
//
// commandExec[0] is the program passed to execvp() (so it is searched on PATH). The whole list, obtained via strLstPtr(), is used as
// argv -- presumably NULL-terminated; confirm against strLstPtr(). The calling process waits only for the directly forked child and
// raises ExecuteError if waitpid() fails. CHECK() fails if that child did not exit with status 0.
void
archiveAsyncExec(ArchiveMode archiveMode, const StringList *commandExec)
{
    FUNCTION_LOG_BEGIN(logLevelDebug);
        FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
        FUNCTION_LOG_PARAM(STRING_LIST, commandExec);
    FUNCTION_LOG_END();

    ASSERT(commandExec != NULL);

    // Fork off the async process. NOTE(review): there is no pid == -1 check here, so fork() failure is presumably handled inside
    // forkSafe() -- confirm.
    pid_t pid = forkSafe();

    // Child process
    if (pid == 0)
    {
        // Disable logging and close log file
        logClose();

        // Detach from parent process. NOTE(review): the comments on the CHECK() and DEBUG_EXEC_TIME assert below imply this performs
        // a second fork, so the process that continues to execvp() is not the one the parent waits on -- confirm in forkDetach().
        forkDetach();

        // Close any open file descriptors above the standard three (stdin, stdout, stderr). Don't check the return value since we
        // don't know which file descriptors are actually open (might be none). It's possible that there are open files >= 1024 but
        // there is no easy way to detect that and this should give us enough descriptors to do our work.
        for (int fd = 3; fd < 1024; fd++)
            close(fd);

        // Execute the binary. This statement will not return if it is successful. execvp() returns -1 only on failure, in which
        // case ExecuteError is thrown in this child process, so the child never falls through to the parent code below.
        THROW_ON_SYS_ERROR_FMT(
            execvp(strZ(strLstGet(commandExec, 0)), (char ** const)strLstPtr(commandExec)) == -1, ExecuteError,
            "unable to execute asynchronous '%s'", archiveMode == archiveModeGet ? CFGCMD_ARCHIVE_GET : CFGCMD_ARCHIVE_PUSH);
    }

#ifdef DEBUG_EXEC_TIME
    // Get the time to measure how long it takes for the forked process to exit
    TimeMSec timeBegin = timeMSec();
#endif

    // Parent process. The process that was just forked should return immediately.
    int processStatus;

    THROW_ON_SYS_ERROR(waitpid(pid, &processStatus, 0) == -1, ExecuteError, "unable to wait for forked process");

    // The first fork should exit with success. If not, something went wrong during the second fork.
    CHECK(WIFEXITED(processStatus) && WEXITSTATUS(processStatus) == 0);

#ifdef DEBUG_EXEC_TIME
    // If the process does not exit immediately then something probably went wrong with the double fork. It's possible that this
    // test will fail on very slow systems so it may need to be tuned. The idea is to make sure that the waitpid() above is not
    // waiting on the async process.
    ASSERT(timeMSec() - timeBegin < 10);
#endif

    FUNCTION_LOG_RETURN_VOID();
}
288
289 /**********************************************************************************************************************************/
290 int
archiveIdComparator(const void * item1,const void * item2)291 archiveIdComparator(const void *item1, const void *item2)
292 {
293 StringList *archiveSort1 = strLstNewSplitZ(*(String **)item1, "-");
294 StringList *archiveSort2 = strLstNewSplitZ(*(String **)item2, "-");
295 int int1 = atoi(strZ(strLstGet(archiveSort1, 1)));
296 int int2 = atoi(strZ(strLstGet(archiveSort2, 1)));
297
298 return (int1 - int2);
299 }
300
301 /**********************************************************************************************************************************/
302 bool
walIsPartial(const String * walSegment)303 walIsPartial(const String *walSegment)
304 {
305 FUNCTION_LOG_BEGIN(logLevelTrace);
306 FUNCTION_LOG_PARAM(STRING, walSegment);
307 FUNCTION_LOG_END();
308
309 ASSERT(walSegment != NULL);
310 ASSERT(walIsSegment(walSegment));
311
312 FUNCTION_LOG_RETURN(BOOL, strEndsWithZ(walSegment, WAL_SEGMENT_PARTIAL_EXT));
313 }
314
315 /**********************************************************************************************************************************/
316 String *
walPath(const String * walFile,const String * pgPath,const String * command)317 walPath(const String *walFile, const String *pgPath, const String *command)
318 {
319 FUNCTION_LOG_BEGIN(logLevelDebug);
320 FUNCTION_LOG_PARAM(STRING, walFile);
321 FUNCTION_LOG_PARAM(STRING, pgPath);
322 FUNCTION_LOG_PARAM(STRING, command);
323 FUNCTION_LOG_END();
324
325 ASSERT(walFile != NULL);
326 ASSERT(command != NULL);
327
328 String *result = NULL;
329
330 if (!strBeginsWithZ(walFile, "/"))
331 {
332 // Error if walFile has a relative path and pgPath is not set
333 if (pgPath == NULL)
334 {
335 THROW_FMT(
336 OptionRequiredError,
337 "option '%s' must be specified when relative wal paths are used\n"
338 "HINT: is %%f passed to %s instead of %%p?\n"
339 "HINT: PostgreSQL may pass relative paths even with %%p depending on the environment.",
340 cfgOptionName(cfgOptPgPath), strZ(command));
341 }
342
343 // Get the working directory
344 char currentWorkDir[4096];
345 THROW_ON_SYS_ERROR(getcwd(currentWorkDir, sizeof(currentWorkDir)) == NULL, FormatError, "unable to get cwd");
346
347 // Check if the working directory is the same as pgPath
348 if (!strEqZ(pgPath, currentWorkDir))
349 {
350 // If not we'll change the working directory to pgPath and see if that equals the working directory we got called with
351 THROW_ON_SYS_ERROR_FMT(chdir(strZ(pgPath)) != 0, PathMissingError, "unable to chdir() to '%s'", strZ(pgPath));
352
353 // Get the new working directory
354 char newWorkDir[4096];
355 THROW_ON_SYS_ERROR(getcwd(newWorkDir, sizeof(newWorkDir)) == NULL, FormatError, "unable to get cwd");
356
357 // Error if the new working directory is not equal to the original current working directory. This means that PostgreSQL
358 // and pgBackrest have a different idea about where the PostgreSQL data directory is located.
359 if (strcmp(currentWorkDir, newWorkDir) != 0)
360 {
361 THROW_FMT(
362 OptionInvalidValueError,
363 PG_NAME " working directory '%s' is not the same as option %s '%s'\n"
364 "HINT: is the " PG_NAME " data_directory configured the same as the %s option?",
365 currentWorkDir, cfgOptionName(cfgOptPgPath), strZ(pgPath), cfgOptionName(cfgOptPgPath));
366 }
367 }
368
369 result = strNewFmt("%s/%s", strZ(pgPath), strZ(walFile));
370 }
371 else
372 result = strDup(walFile);
373
374 FUNCTION_LOG_RETURN(STRING, result);
375 }
376
377 /**********************************************************************************************************************************/
378 bool
walIsSegment(const String * walSegment)379 walIsSegment(const String *walSegment)
380 {
381 FUNCTION_LOG_BEGIN(logLevelTrace);
382 FUNCTION_LOG_PARAM(STRING, walSegment);
383 FUNCTION_LOG_END();
384
385 ASSERT(walSegment != NULL);
386
387 // Create the regular expression to identify WAL segments if it does not already exist
388 static RegExp *regExpSegment = NULL;
389
390 if (regExpSegment == NULL)
391 {
392 MEM_CONTEXT_BEGIN(memContextTop())
393 {
394 regExpSegment = regExpNew(WAL_SEGMENT_PARTIAL_REGEXP_STR);
395 }
396 MEM_CONTEXT_END();
397 }
398
399 FUNCTION_LOG_RETURN(BOOL, regExpMatch(regExpSegment, walSegment));
400 }
401
402 /**********************************************************************************************************************************/
403 String *
walSegmentFind(const Storage * storage,const String * archiveId,const String * walSegment,TimeMSec timeout)404 walSegmentFind(const Storage *storage, const String *archiveId, const String *walSegment, TimeMSec timeout)
405 {
406 FUNCTION_LOG_BEGIN(logLevelDebug);
407 FUNCTION_LOG_PARAM(STORAGE, storage);
408 FUNCTION_LOG_PARAM(STRING, archiveId);
409 FUNCTION_LOG_PARAM(STRING, walSegment);
410 FUNCTION_LOG_PARAM(TIME_MSEC, timeout);
411 FUNCTION_LOG_END();
412
413 ASSERT(storage != NULL);
414 ASSERT(archiveId != NULL);
415 ASSERT(walSegment != NULL);
416 ASSERT(walIsSegment(walSegment));
417
418 String *result = NULL;
419
420 MEM_CONTEXT_TEMP_BEGIN()
421 {
422 Wait *wait = waitNew(timeout);
423
424 do
425 {
426 // Get a list of all WAL segments that match
427 StringList *list = storageListP(
428 storage, strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(archiveId), strZ(strSubN(walSegment, 0, 16))),
429 .expression = strNewFmt(
430 "^%s%s-[0-f]{40}" COMPRESS_TYPE_REGEXP "{0,1}$", strZ(strSubN(walSegment, 0, 24)),
431 walIsPartial(walSegment) ? WAL_SEGMENT_PARTIAL_EXT : ""),
432 .nullOnMissing = true);
433
434 // If there are results
435 if (list != NULL && !strLstEmpty(list))
436 {
437 // Error if there is more than one match
438 if (strLstSize(list) > 1)
439 {
440 THROW_FMT(
441 ArchiveDuplicateError,
442 "duplicates found in archive for WAL segment %s: %s\n"
443 "HINT: are multiple primaries archiving to this stanza?",
444 strZ(walSegment), strZ(strLstJoin(strLstSort(list, sortOrderAsc), ", ")));
445 }
446
447 // Copy file name of WAL segment found into the prior context
448 MEM_CONTEXT_PRIOR_BEGIN()
449 {
450 result = strDup(strLstGet(list, 0));
451 }
452 MEM_CONTEXT_PRIOR_END();
453 }
454 }
455 while (result == NULL && waitMore(wait));
456 }
457 MEM_CONTEXT_TEMP_END();
458
459 if (result == NULL && timeout != 0)
460 {
461 THROW_FMT(
462 ArchiveTimeoutError,
463 "WAL segment %s was not archived before the %" PRIu64 "ms timeout\n"
464 "HINT: check the archive_command to ensure that all options are correct (especially --stanza).\n"
465 "HINT: check the PostgreSQL server log for errors.\n"
466 "HINT: run the 'start' command if the stanza was previously stopped.",
467 strZ(walSegment), timeout);
468 }
469
470 FUNCTION_LOG_RETURN(STRING, result);
471 }
472
473 /**********************************************************************************************************************************/
474 String *
walSegmentNext(const String * walSegment,size_t walSegmentSize,unsigned int pgVersion)475 walSegmentNext(const String *walSegment, size_t walSegmentSize, unsigned int pgVersion)
476 {
477 FUNCTION_LOG_BEGIN(logLevelTrace);
478 FUNCTION_LOG_PARAM(STRING, walSegment);
479 FUNCTION_LOG_PARAM(SIZE, walSegmentSize);
480 FUNCTION_LOG_PARAM(UINT, pgVersion);
481 FUNCTION_LOG_END();
482
483 ASSERT(walSegment != NULL);
484 ASSERT(strSize(walSegment) == 24);
485 ASSERT(UINT32_MAX % walSegmentSize == walSegmentSize - 1);
486 ASSERT(pgVersion >= PG_VERSION_11 || walSegmentSize == 16 * 1024 * 1024);
487
488 // Extract WAL parts
489 uint32_t timeline = 0;
490 uint32_t major = 0;
491 uint32_t minor = 0;
492
493 MEM_CONTEXT_TEMP_BEGIN()
494 {
495 timeline = (uint32_t)strtol(strZ(strSubN(walSegment, 0, 8)), NULL, 16);
496 major = (uint32_t)strtol(strZ(strSubN(walSegment, 8, 8)), NULL, 16);
497 minor = (uint32_t)strtol(strZ(strSubN(walSegment, 16, 8)), NULL, 16);
498
499 // Increment minor and adjust major dir on overflow
500 minor++;
501
502 if (minor > UINT32_MAX / walSegmentSize)
503 {
504 major++;
505 minor = 0;
506 }
507
508 // Special hack for PostgreSQL < 9.3 which skipped minor FF
509 if (minor == 0xFF && pgVersion < PG_VERSION_93)
510 {
511 major++;
512 minor = 0;
513 }
514 }
515 MEM_CONTEXT_TEMP_END();
516
517 FUNCTION_LOG_RETURN(STRING, strNewFmt("%08X%08X%08X", timeline, major, minor));
518 }
519
520 /**********************************************************************************************************************************/
521 StringList *
walSegmentRange(const String * walSegmentBegin,size_t walSegmentSize,unsigned int pgVersion,unsigned int range)522 walSegmentRange(const String *walSegmentBegin, size_t walSegmentSize, unsigned int pgVersion, unsigned int range)
523 {
524 FUNCTION_LOG_BEGIN(logLevelDebug);
525 FUNCTION_LOG_PARAM(STRING, walSegmentBegin);
526 FUNCTION_LOG_PARAM(SIZE, walSegmentSize);
527 FUNCTION_LOG_PARAM(UINT, pgVersion);
528 FUNCTION_LOG_END();
529
530 ASSERT(range > 0);
531
532 StringList *result = NULL;
533
534 MEM_CONTEXT_TEMP_BEGIN()
535 {
536 result = strLstNew();
537 strLstAdd(result, walSegmentBegin);
538
539 if (range > 1)
540 {
541 String *current = strDup(walSegmentBegin);
542
543 for (unsigned int rangeIdx = 0; rangeIdx < range - 1; rangeIdx++)
544 {
545 String *next = walSegmentNext(current, walSegmentSize, pgVersion);
546
547 strLstAdd(result, next);
548
549 strFree(current);
550 current = next;
551 }
552 }
553
554 strLstMove(result, memContextPrior());
555 }
556 MEM_CONTEXT_TEMP_END();
557
558 FUNCTION_LOG_RETURN(STRING_LIST, result);
559 }
560