1 /***********************************************************************************************************************************
2 Archive Common
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5 
6 #include <stdint.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <sys/wait.h>
10 #include <unistd.h>
11 
12 #include "command/archive/common.h"
13 #include "common/debug.h"
14 #include "common/fork.h"
15 #include "common/log.h"
16 #include "common/memContext.h"
17 #include "common/regExp.h"
18 #include "common/wait.h"
19 #include "config/config.h"
20 #include "postgres/version.h"
21 #include "storage/helper.h"
22 #include "storage/helper.h"
23 
/***********************************************************************************************************************************
WAL segment constants

Each *_STR constant is built from the like-named regular expression macro, which is defined outside this file.
***********************************************************************************************************************************/
STRING_EXTERN(WAL_SEGMENT_REGEXP_STR,                               WAL_SEGMENT_REGEXP);
STRING_EXTERN(WAL_SEGMENT_PARTIAL_REGEXP_STR,                       WAL_SEGMENT_PARTIAL_REGEXP);
STRING_EXTERN(WAL_SEGMENT_DIR_REGEXP_STR,                           WAL_SEGMENT_DIR_REGEXP);
STRING_EXTERN(WAL_SEGMENT_FILE_REGEXP_STR,                          WAL_SEGMENT_FILE_REGEXP);
STRING_EXTERN(WAL_TIMELINE_HISTORY_REGEXP_STR,                      WAL_TIMELINE_HISTORY_REGEXP);
32 
33 /***********************************************************************************************************************************
34 Global error file constant
35 ***********************************************************************************************************************************/
36 #define STATUS_FILE_GLOBAL                                          "global"
37     STRING_STATIC(STATUS_FILE_GLOBAL_STR,                           STATUS_FILE_GLOBAL);
38 
39 #define STATUS_FILE_GLOBAL_ERROR                                    STATUS_FILE_GLOBAL STATUS_EXT_ERROR
40     STRING_STATIC(STATUS_FILE_GLOBAL_ERROR_STR,                         STATUS_FILE_GLOBAL_ERROR);
41 
42 /***********************************************************************************************************************************
43 Get the correct spool queue based on the archive mode
44 ***********************************************************************************************************************************/
45 static const String *
archiveAsyncSpoolQueue(ArchiveMode archiveMode)46 archiveAsyncSpoolQueue(ArchiveMode archiveMode)
47 {
48     FUNCTION_TEST_BEGIN();
49         FUNCTION_TEST_PARAM(STRING_ID, archiveMode);
50     FUNCTION_TEST_END();
51 
52     FUNCTION_TEST_RETURN((archiveMode == archiveModeGet ? STORAGE_SPOOL_ARCHIVE_IN_STR : STORAGE_SPOOL_ARCHIVE_OUT_STR));
53 }
54 
55 /**********************************************************************************************************************************/
56 void
archiveAsyncErrorClear(ArchiveMode archiveMode,const String * archiveFile)57 archiveAsyncErrorClear(ArchiveMode archiveMode, const String *archiveFile)
58 {
59     FUNCTION_LOG_BEGIN(logLevelDebug);
60         FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
61         FUNCTION_LOG_PARAM(STRING, archiveFile);
62     FUNCTION_LOG_END();
63 
64     ASSERT(archiveFile != NULL);
65 
66     storageRemoveP(storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s" STATUS_EXT_ERROR, strZ(archiveFile)));
67     storageRemoveP(storageSpoolWrite(), STRDEF(STORAGE_SPOOL_ARCHIVE_OUT "/" STATUS_FILE_GLOBAL_ERROR));
68 
69     FUNCTION_LOG_RETURN_VOID();
70 }
71 
72 /**********************************************************************************************************************************/
73 bool
archiveAsyncStatus(ArchiveMode archiveMode,const String * walSegment,bool throwOnError,bool warnOnOk)74 archiveAsyncStatus(ArchiveMode archiveMode, const String *walSegment, bool throwOnError, bool warnOnOk)
75 {
76     FUNCTION_LOG_BEGIN(logLevelDebug);
77         FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
78         FUNCTION_LOG_PARAM(STRING, walSegment);
79         FUNCTION_LOG_PARAM(BOOL, throwOnError);
80         FUNCTION_LOG_PARAM(BOOL, warnOnOk);
81     FUNCTION_LOG_END();
82 
83     ASSERT(walSegment != NULL);
84 
85     bool result = false;
86 
87     MEM_CONTEXT_TEMP_BEGIN()
88     {
89         const String *errorFile = NULL;
90         bool errorFileExists = false;
91 
92         const String *spoolQueue = archiveAsyncSpoolQueue(archiveMode);
93 
94         String *okFile = strNewFmt("%s" STATUS_EXT_OK, strZ(walSegment));
95         bool okFileExists = storageExistsP(storageSpool(), strNewFmt("%s/%s", strZ(spoolQueue), strZ(okFile)));
96 
97         // If the ok file does not exist then check to see if a file-specific or global error exists
98         if (!okFileExists)
99         {
100             // Check for a file-specific error first
101             errorFile = strNewFmt("%s" STATUS_EXT_ERROR, strZ(walSegment));
102             errorFileExists = storageExistsP(storageSpool(), strNewFmt("%s/%s", strZ(spoolQueue), strZ(errorFile)));
103 
104             // If that doesn't exist then check for a global error
105             if (!errorFileExists)
106             {
107                 errorFile = STATUS_FILE_GLOBAL_ERROR_STR;
108                 errorFileExists = storageExistsP(storageSpool(), strNewFmt("%s/%s", strZ(spoolQueue), strZ(errorFile)));
109             }
110         }
111 
112         // If either of them exists then check what happened and report back
113         if (okFileExists || errorFileExists)
114         {
115             // Get the status file content
116             const String *statusFile = okFileExists ? okFile: errorFile;
117 
118             String *content = strNewBuf(
119                 storageGetP(storageNewReadP(storageSpool(), strNewFmt("%s/%s", strZ(spoolQueue), strZ(statusFile)))));
120 
121             // Get the code and message if the file has content
122             int code = 0;
123             const String *message = NULL;
124 
125             if (strSize(content) != 0)
126             {
127                 // Find the line feed after the error code -- should be the first one
128                 const char *linefeedPtr = strchr(strZ(content), '\n');
129 
130                 // Error if linefeed not found
131                 if (linefeedPtr == NULL)
132                     THROW_FMT(FormatError, "%s content must have at least two lines", strZ(statusFile));
133 
134                 // Error if message is zero-length
135                 if (strlen(linefeedPtr + 1) == 0)
136                     THROW_FMT(FormatError, "%s message must be > 0", strZ(statusFile));
137 
138                 // Get contents
139                 code = varIntForce(VARSTR(strNewN(strZ(content), (size_t)(linefeedPtr - strZ(content)))));
140                 message = strTrim(strNewZ(linefeedPtr + 1));
141             }
142 
143             // Process OK files
144             if (okFileExists)
145             {
146                 // If there is content in the status file it is a warning
147                 if (strSize(content) != 0 && warnOnOk)
148                 {
149                     // If error code is not success, then this was a renamed error file
150                     if (code != 0)
151                     {
152                         message = strNewFmt(
153                             "WAL segment '%s' was not pushed due to error [%d] and was manually skipped: %s", strZ(walSegment),
154                             code, strZ(message));
155                     }
156 
157                     LOG_WARN(strZ(message));
158                 }
159 
160                 result = true;
161             }
162             else if (throwOnError)
163             {
164                 // Error status files must have content
165                 if (strSize(content) == 0)
166                     THROW_FMT(AssertError, "status file '%s' has no content", strZ(statusFile));
167 
168                 // Throw error using the code passed in the file
169                 THROW_CODE(code, strZ(message));
170             }
171         }
172     }
173     MEM_CONTEXT_TEMP_END();
174 
175     FUNCTION_LOG_RETURN(BOOL, result);
176 }
177 
178 /**********************************************************************************************************************************/
179 void
archiveAsyncStatusErrorWrite(ArchiveMode archiveMode,const String * walSegment,int code,const String * message)180 archiveAsyncStatusErrorWrite(ArchiveMode archiveMode, const String *walSegment, int code, const String *message)
181 {
182     FUNCTION_LOG_BEGIN(logLevelDebug);
183         FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
184         FUNCTION_LOG_PARAM(STRING, walSegment);
185         FUNCTION_LOG_PARAM(INT, code);
186         FUNCTION_LOG_PARAM(STRING, message);
187     FUNCTION_LOG_END();
188 
189     ASSERT(code != 0);
190     ASSERT(message != NULL);
191 
192     MEM_CONTEXT_TEMP_BEGIN()
193     {
194         const String *errorFile = walSegment == NULL ? STATUS_FILE_GLOBAL_STR : walSegment;
195 
196         storagePutP(
197             storageNewWriteP(
198                 storageSpoolWrite(),
199                 strNewFmt("%s/%s" STATUS_EXT_ERROR, strZ(archiveAsyncSpoolQueue(archiveMode)), strZ(errorFile))),
200             BUFSTR(strNewFmt("%d\n%s", code, strZ(message))));
201     }
202     MEM_CONTEXT_TEMP_END();
203 
204     FUNCTION_LOG_RETURN_VOID();
205 }
206 
207 /**********************************************************************************************************************************/
208 void
archiveAsyncStatusOkWrite(ArchiveMode archiveMode,const String * walSegment,const String * warning)209 archiveAsyncStatusOkWrite(ArchiveMode archiveMode, const String *walSegment, const String *warning)
210 {
211     FUNCTION_LOG_BEGIN(logLevelDebug);
212         FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
213         FUNCTION_LOG_PARAM(STRING, walSegment);
214         FUNCTION_LOG_PARAM(STRING, warning);
215     FUNCTION_LOG_END();
216 
217     ASSERT(walSegment != NULL);
218 
219     MEM_CONTEXT_TEMP_BEGIN()
220     {
221         // Write file
222         storagePutP(
223             storageNewWriteP(
224                 storageSpoolWrite(), strNewFmt("%s/%s" STATUS_EXT_OK, strZ(archiveAsyncSpoolQueue(archiveMode)), strZ(walSegment))),
225             warning == NULL ? NULL : BUFSTR(strNewFmt("0\n%s", strZ(warning))));
226     }
227     MEM_CONTEXT_TEMP_END();
228 
229     FUNCTION_LOG_RETURN_VOID();
230 }
231 
232 /**********************************************************************************************************************************/
233 void
archiveAsyncExec(ArchiveMode archiveMode,const StringList * commandExec)234 archiveAsyncExec(ArchiveMode archiveMode, const StringList *commandExec)
235 {
236     FUNCTION_LOG_BEGIN(logLevelDebug);
237         FUNCTION_LOG_PARAM(STRING_ID, archiveMode);
238         FUNCTION_LOG_PARAM(STRING_LIST, commandExec);
239     FUNCTION_LOG_END();
240 
241     ASSERT(commandExec != NULL);
242 
243     // Fork off the async process
244     pid_t pid = forkSafe();
245 
246     if (pid == 0)
247     {
248         // Disable logging and close log file
249         logClose();
250 
251         // Detach from parent process
252         forkDetach();
253 
254         // Close any open file descriptors above the standard three (stdin, stdout, stderr). Don't check the return value since we
255         // don't know which file descriptors are actually open (might be none). It's possible that there are open files >= 1024 but
256         // there is no easy way to detect that and this should give us enough descriptors to do our work.
257         for (int fd = 3; fd < 1024; fd++)
258             close(fd);
259 
260         // Execute the binary.  This statement will not return if it is successful.
261         THROW_ON_SYS_ERROR_FMT(
262             execvp(strZ(strLstGet(commandExec, 0)), (char ** const)strLstPtr(commandExec)) == -1, ExecuteError,
263             "unable to execute asynchronous '%s'", archiveMode == archiveModeGet ? CFGCMD_ARCHIVE_GET : CFGCMD_ARCHIVE_PUSH);
264     }
265 
266 #ifdef DEBUG_EXEC_TIME
267     // Get the time to measure how long it takes for the forked process to exit
268     TimeMSec timeBegin = timeMSec();
269 #endif
270 
271     // The process that was just forked should return immediately
272     int processStatus;
273 
274     THROW_ON_SYS_ERROR(waitpid(pid, &processStatus, 0) == -1, ExecuteError, "unable to wait for forked process");
275 
276     // The first fork should exit with success.  If not, something went wrong during the second fork.
277     CHECK(WIFEXITED(processStatus) && WEXITSTATUS(processStatus) == 0);
278 
279 #ifdef DEBUG_EXEC_TIME
280     // If the process does not exit immediately then something probably went wrong with the double fork.  It's possible that this
281     // test will fail on very slow systems so it may need to be tuned.  The idea is to make sure that the waitpid() above is not
282     // waiting on the async process.
283     ASSERT(timeMSec() - timeBegin < 10);
284 #endif
285 
286     FUNCTION_LOG_RETURN_VOID();
287 }
288 
289 /**********************************************************************************************************************************/
290 int
archiveIdComparator(const void * item1,const void * item2)291 archiveIdComparator(const void *item1, const void *item2)
292 {
293     StringList *archiveSort1 = strLstNewSplitZ(*(String **)item1, "-");
294     StringList *archiveSort2 = strLstNewSplitZ(*(String **)item2, "-");
295     int int1 = atoi(strZ(strLstGet(archiveSort1, 1)));
296     int int2 = atoi(strZ(strLstGet(archiveSort2, 1)));
297 
298     return (int1 - int2);
299 }
300 
301 /**********************************************************************************************************************************/
302 bool
walIsPartial(const String * walSegment)303 walIsPartial(const String *walSegment)
304 {
305     FUNCTION_LOG_BEGIN(logLevelTrace);
306         FUNCTION_LOG_PARAM(STRING, walSegment);
307     FUNCTION_LOG_END();
308 
309     ASSERT(walSegment != NULL);
310     ASSERT(walIsSegment(walSegment));
311 
312     FUNCTION_LOG_RETURN(BOOL, strEndsWithZ(walSegment, WAL_SEGMENT_PARTIAL_EXT));
313 }
314 
315 /**********************************************************************************************************************************/
316 String *
walPath(const String * walFile,const String * pgPath,const String * command)317 walPath(const String *walFile, const String *pgPath, const String *command)
318 {
319     FUNCTION_LOG_BEGIN(logLevelDebug);
320         FUNCTION_LOG_PARAM(STRING, walFile);
321         FUNCTION_LOG_PARAM(STRING, pgPath);
322         FUNCTION_LOG_PARAM(STRING, command);
323     FUNCTION_LOG_END();
324 
325     ASSERT(walFile != NULL);
326     ASSERT(command != NULL);
327 
328     String *result = NULL;
329 
330     if (!strBeginsWithZ(walFile, "/"))
331     {
332         // Error if walFile has a relative path and pgPath is not set
333         if (pgPath == NULL)
334         {
335             THROW_FMT(
336                 OptionRequiredError,
337                 "option '%s' must be specified when relative wal paths are used\n"
338                     "HINT: is %%f passed to %s instead of %%p?\n"
339                     "HINT: PostgreSQL may pass relative paths even with %%p depending on the environment.",
340                 cfgOptionName(cfgOptPgPath), strZ(command));
341         }
342 
343         // Get the working directory
344         char currentWorkDir[4096];
345         THROW_ON_SYS_ERROR(getcwd(currentWorkDir, sizeof(currentWorkDir)) == NULL, FormatError, "unable to get cwd");
346 
347         // Check if the working directory is the same as pgPath
348         if (!strEqZ(pgPath, currentWorkDir))
349         {
350             // If not we'll change the working directory to pgPath and see if that equals the working directory we got called with
351             THROW_ON_SYS_ERROR_FMT(chdir(strZ(pgPath)) != 0, PathMissingError, "unable to chdir() to '%s'", strZ(pgPath));
352 
353             // Get the new working directory
354             char newWorkDir[4096];
355             THROW_ON_SYS_ERROR(getcwd(newWorkDir, sizeof(newWorkDir)) == NULL, FormatError, "unable to get cwd");
356 
357             // Error if the new working directory is not equal to the original current working directory. This means that PostgreSQL
358             // and pgBackrest have a different idea about where the PostgreSQL data directory is located.
359             if (strcmp(currentWorkDir, newWorkDir) != 0)
360             {
361                 THROW_FMT(
362                     OptionInvalidValueError,
363                     PG_NAME " working directory '%s' is not the same as option %s '%s'\n"
364                         "HINT: is the " PG_NAME " data_directory configured the same as the %s option?",
365                     currentWorkDir, cfgOptionName(cfgOptPgPath), strZ(pgPath), cfgOptionName(cfgOptPgPath));
366             }
367         }
368 
369         result = strNewFmt("%s/%s", strZ(pgPath), strZ(walFile));
370     }
371     else
372         result = strDup(walFile);
373 
374     FUNCTION_LOG_RETURN(STRING, result);
375 }
376 
377 /**********************************************************************************************************************************/
378 bool
walIsSegment(const String * walSegment)379 walIsSegment(const String *walSegment)
380 {
381     FUNCTION_LOG_BEGIN(logLevelTrace);
382         FUNCTION_LOG_PARAM(STRING, walSegment);
383     FUNCTION_LOG_END();
384 
385     ASSERT(walSegment != NULL);
386 
387     // Create the regular expression to identify WAL segments if it does not already exist
388     static RegExp *regExpSegment = NULL;
389 
390     if (regExpSegment == NULL)
391     {
392         MEM_CONTEXT_BEGIN(memContextTop())
393         {
394             regExpSegment = regExpNew(WAL_SEGMENT_PARTIAL_REGEXP_STR);
395         }
396         MEM_CONTEXT_END();
397     }
398 
399     FUNCTION_LOG_RETURN(BOOL, regExpMatch(regExpSegment, walSegment));
400 }
401 
402 /**********************************************************************************************************************************/
403 String *
walSegmentFind(const Storage * storage,const String * archiveId,const String * walSegment,TimeMSec timeout)404 walSegmentFind(const Storage *storage, const String *archiveId, const String *walSegment, TimeMSec timeout)
405 {
406     FUNCTION_LOG_BEGIN(logLevelDebug);
407         FUNCTION_LOG_PARAM(STORAGE, storage);
408         FUNCTION_LOG_PARAM(STRING, archiveId);
409         FUNCTION_LOG_PARAM(STRING, walSegment);
410         FUNCTION_LOG_PARAM(TIME_MSEC, timeout);
411     FUNCTION_LOG_END();
412 
413     ASSERT(storage != NULL);
414     ASSERT(archiveId != NULL);
415     ASSERT(walSegment != NULL);
416     ASSERT(walIsSegment(walSegment));
417 
418     String *result = NULL;
419 
420     MEM_CONTEXT_TEMP_BEGIN()
421     {
422         Wait *wait = waitNew(timeout);
423 
424         do
425         {
426             // Get a list of all WAL segments that match
427             StringList *list = storageListP(
428                 storage, strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(archiveId), strZ(strSubN(walSegment, 0, 16))),
429                 .expression = strNewFmt(
430                     "^%s%s-[0-f]{40}" COMPRESS_TYPE_REGEXP "{0,1}$", strZ(strSubN(walSegment, 0, 24)),
431                         walIsPartial(walSegment) ? WAL_SEGMENT_PARTIAL_EXT : ""),
432                 .nullOnMissing = true);
433 
434             // If there are results
435             if (list != NULL && !strLstEmpty(list))
436             {
437                 // Error if there is more than one match
438                 if (strLstSize(list) > 1)
439                 {
440                     THROW_FMT(
441                         ArchiveDuplicateError,
442                         "duplicates found in archive for WAL segment %s: %s\n"
443                             "HINT: are multiple primaries archiving to this stanza?",
444                         strZ(walSegment), strZ(strLstJoin(strLstSort(list, sortOrderAsc), ", ")));
445                 }
446 
447                 // Copy file name of WAL segment found into the prior context
448                 MEM_CONTEXT_PRIOR_BEGIN()
449                 {
450                     result = strDup(strLstGet(list, 0));
451                 }
452                 MEM_CONTEXT_PRIOR_END();
453             }
454         }
455         while (result == NULL && waitMore(wait));
456     }
457     MEM_CONTEXT_TEMP_END();
458 
459     if (result == NULL && timeout != 0)
460     {
461         THROW_FMT(
462             ArchiveTimeoutError,
463             "WAL segment %s was not archived before the %" PRIu64 "ms timeout\n"
464                 "HINT: check the archive_command to ensure that all options are correct (especially --stanza).\n"
465                 "HINT: check the PostgreSQL server log for errors.\n"
466                 "HINT: run the 'start' command if the stanza was previously stopped.",
467             strZ(walSegment), timeout);
468     }
469 
470     FUNCTION_LOG_RETURN(STRING, result);
471 }
472 
473 /**********************************************************************************************************************************/
474 String *
walSegmentNext(const String * walSegment,size_t walSegmentSize,unsigned int pgVersion)475 walSegmentNext(const String *walSegment, size_t walSegmentSize, unsigned int pgVersion)
476 {
477     FUNCTION_LOG_BEGIN(logLevelTrace);
478         FUNCTION_LOG_PARAM(STRING, walSegment);
479         FUNCTION_LOG_PARAM(SIZE, walSegmentSize);
480         FUNCTION_LOG_PARAM(UINT, pgVersion);
481     FUNCTION_LOG_END();
482 
483     ASSERT(walSegment != NULL);
484     ASSERT(strSize(walSegment) == 24);
485     ASSERT(UINT32_MAX % walSegmentSize == walSegmentSize - 1);
486     ASSERT(pgVersion >= PG_VERSION_11 || walSegmentSize == 16 * 1024 * 1024);
487 
488     // Extract WAL parts
489     uint32_t timeline = 0;
490     uint32_t major = 0;
491     uint32_t minor = 0;
492 
493     MEM_CONTEXT_TEMP_BEGIN()
494     {
495         timeline = (uint32_t)strtol(strZ(strSubN(walSegment, 0, 8)), NULL, 16);
496         major = (uint32_t)strtol(strZ(strSubN(walSegment, 8, 8)), NULL, 16);
497         minor = (uint32_t)strtol(strZ(strSubN(walSegment, 16, 8)), NULL, 16);
498 
499         // Increment minor and adjust major dir on overflow
500         minor++;
501 
502         if (minor > UINT32_MAX / walSegmentSize)
503         {
504             major++;
505             minor = 0;
506         }
507 
508         // Special hack for PostgreSQL < 9.3 which skipped minor FF
509         if (minor == 0xFF && pgVersion < PG_VERSION_93)
510         {
511             major++;
512             minor = 0;
513         }
514     }
515     MEM_CONTEXT_TEMP_END();
516 
517     FUNCTION_LOG_RETURN(STRING, strNewFmt("%08X%08X%08X", timeline, major, minor));
518 }
519 
520 /**********************************************************************************************************************************/
521 StringList *
walSegmentRange(const String * walSegmentBegin,size_t walSegmentSize,unsigned int pgVersion,unsigned int range)522 walSegmentRange(const String *walSegmentBegin, size_t walSegmentSize, unsigned int pgVersion, unsigned int range)
523 {
524     FUNCTION_LOG_BEGIN(logLevelDebug);
525         FUNCTION_LOG_PARAM(STRING, walSegmentBegin);
526         FUNCTION_LOG_PARAM(SIZE, walSegmentSize);
527         FUNCTION_LOG_PARAM(UINT, pgVersion);
528     FUNCTION_LOG_END();
529 
530     ASSERT(range > 0);
531 
532     StringList *result = NULL;
533 
534     MEM_CONTEXT_TEMP_BEGIN()
535     {
536         result = strLstNew();
537         strLstAdd(result, walSegmentBegin);
538 
539         if (range > 1)
540         {
541             String *current = strDup(walSegmentBegin);
542 
543             for (unsigned int rangeIdx = 0; rangeIdx < range - 1; rangeIdx++)
544             {
545                 String *next = walSegmentNext(current, walSegmentSize, pgVersion);
546 
547                 strLstAdd(result, next);
548 
549                 strFree(current);
550                 current = next;
551             }
552         }
553 
554         strLstMove(result, memContextPrior());
555     }
556     MEM_CONTEXT_TEMP_END();
557 
558     FUNCTION_LOG_RETURN(STRING_LIST, result);
559 }
560