1 /***********************************************************************************************************************************
2 Archive Get Command
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5 
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 #include "command/archive/common.h"
13 #include "command/archive/get/file.h"
14 #include "command/archive/get/protocol.h"
15 #include "command/command.h"
16 #include "common/debug.h"
17 #include "common/log.h"
18 #include "common/memContext.h"
19 #include "common/regExp.h"
20 #include "common/wait.h"
21 #include "config/config.h"
22 #include "config/exec.h"
23 #include "info/infoArchive.h"
24 #include "postgres/interface.h"
25 #include "protocol/helper.h"
26 #include "protocol/parallel.h"
27 #include "storage/helper.h"
28 #include "storage/write.intern.h"
29 
30 /***********************************************************************************************************************************
31 Constants for log messages that are used multiple times to keep them consistent
32 ***********************************************************************************************************************************/
33 #define FOUND_IN_ARCHIVE_MSG                                        "found %s in the archive"
34 #define FOUND_IN_REPO_ARCHIVE_MSG                                   "found %s in the repo%u: %s archive"
35 #define UNABLE_TO_FIND_IN_ARCHIVE_MSG                               "unable to find %s in the archive"
36 #define UNABLE_TO_FIND_VALID_REPO_MSG                               "unable to find a valid repository"
37 #define REPO_INVALID_OR_ERR_MSG                                     "some repositories were invalid or encountered errors"
38 
39 /***********************************************************************************************************************************
40 Check for a list of archive files in the repository
41 ***********************************************************************************************************************************/
42 typedef struct ArchiveFileMap
43 {
44     const String *request;                                          // Archive file requested by archive_command
45     List *actualList;                                               // Actual files in various repos/archiveIds
46     StringList *warnList;                                           // Warnings that need to be reported by the async process
47 } ArchiveFileMap;
48 
49 typedef struct ArchiveGetCheckResult
50 {
51     List *archiveFileMapList;                                       // List of mapped archive files, i.e. found in the repo
52 
53     // Global error that affects all repos
54     const ErrorType *errorType;                                     // Error type if there was an error
55     const String *errorFile;                                        // Error file if there was an error
56     const String *errorMessage;                                     // Error message if there was an error
57     const StringList *warnList;                                     // Warnings that need to be reported by the async process
58 } ArchiveGetCheckResult;
59 
60 // Helper to add an error to an error list and warn if the error is not already in the list
61 static void
archiveGetErrorAdd(StringList * warnList,bool log,unsigned int repoIdx,const ErrorType * type,const String * message)62 archiveGetErrorAdd(StringList *warnList, bool log, unsigned int repoIdx, const ErrorType *type, const String *message)
63 {
64     const String *warn = strNewFmt(
65         "repo%u: [%s] %s", cfgOptionGroupIdxToKey(cfgOptGrpRepo, repoIdx), errorTypeName(type), strZ(message));
66 
67     if (!strLstExists(warnList, warn))
68     {
69         if (log)
70             LOG_WARN(strZ(warn));
71 
72         strLstAdd(warnList, warn);
73     }
74 }
75 
76 // Helper to find a single archive file in the repository using a cache to speed up the process and minimize storageListP() calls
77 typedef struct ArchiveGetFindCachePath
78 {
79     const String *path;                                             // Cached path in the archiveId
80     const StringList *fileList;                                     // List of files in the cache path
81 } ArchiveGetFindCachePath;
82 
83 typedef struct ArchiveGetFindCacheArchive
84 {
85     const String *archiveId;                                        // ArchiveId in the repo
86     List *pathList;                                                 // List of paths cached for archiveId
87 } ArchiveGetFindCacheArchive;
88 
89 typedef struct ArchiveGetFindCacheRepo
90 {
91     unsigned int repoIdx;
92     CipherType cipherType;                                          // Repo cipher type
93     const String *cipherPassArchive;                                // Repo archive cipher pass
94     List *archiveList;                                              // Cached list of archiveIds and associated paths
95     StringList *warnList;                                           // Track repo warnings so each is only reported once
96 } ArchiveGetFindCacheRepo;
97 
98 static bool
archiveGetFind(const String * archiveFileRequest,ArchiveGetCheckResult * getCheckResult,List * cacheRepoList,const StringList * warnList,bool single)99 archiveGetFind(
100     const String *archiveFileRequest, ArchiveGetCheckResult *getCheckResult, List *cacheRepoList, const StringList *warnList,
101     bool single)
102 {
103     FUNCTION_LOG_BEGIN(logLevelDebug);
104         FUNCTION_LOG_PARAM(STRING, archiveFileRequest);
105         FUNCTION_LOG_PARAM_P(VOID, getCheckResult);
106         FUNCTION_LOG_PARAM(LIST, cacheRepoList);
107         FUNCTION_LOG_PARAM(STRING_LIST, warnList);
108         FUNCTION_LOG_PARAM(BOOL, single);
109     FUNCTION_LOG_END();
110 
111     ASSERT(archiveFileRequest != NULL);
112     ASSERT(getCheckResult != NULL);
113     ASSERT(cacheRepoList != NULL);
114 
115     bool result = false;
116 
117     MEM_CONTEXT_TEMP_BEGIN()
118     {
119         // Is the archive file a WAL segment?
120         bool isSegment = walIsSegment(archiveFileRequest);
121 
122         // Get the WAL segment path
123         const String *path = isSegment ? strSubN(archiveFileRequest, 0, 16) : NULL;
124 
125         // List to hold matches for the requested file
126         List *matchList = lstNewP(sizeof(ArchiveGetFile));
127 
128         // List of file level warnings
129         StringList *fileWarnList = strLstDup(warnList);
130 
131         // Errored repo total to track if all repos errored
132         unsigned int repoErrorTotal = 0;
133 
134         // Check each repo
135         for (unsigned int repoCacheIdx = 0; repoCacheIdx < lstSize(cacheRepoList); repoCacheIdx++)
136         {
137             ArchiveGetFindCacheRepo *cacheRepo = lstGet(cacheRepoList, repoCacheIdx);
138 
139             TRY_BEGIN()
140             {
141                 // Check each archiveId
142                 for (unsigned int archiveCacheIdx = 0; archiveCacheIdx < lstSize(cacheRepo->archiveList); archiveCacheIdx++)
143                 {
144                     ArchiveGetFindCacheArchive *cacheArchive = lstGet(cacheRepo->archiveList, archiveCacheIdx);
145 
146                     // If a WAL segment then search among the possible file names
147                     if (isSegment)
148                     {
149                         StringList *segmentList = NULL;
150 
151                         // If a single file is requested then optimize by adding a restrictive expression to reduce bandwidth
152                         if (single)
153                         {
154                             segmentList = storageListP(
155                                 storageRepoIdx(cacheRepo->repoIdx),
156                                 strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(cacheArchive->archiveId), strZ(path)),
157                                 .expression = strNewFmt(
158                                     "^%s%s-[0-f]{40}" COMPRESS_TYPE_REGEXP "{0,1}$", strZ(strSubN(archiveFileRequest, 0, 24)),
159                                         walIsPartial(archiveFileRequest) ? WAL_SEGMENT_PARTIAL_EXT : ""));
160                         }
161                         // Else multiple files will be requested so cache list results
162                         else
163                         {
164                             // Partial files cannot be in a list with multiple requests
165                             ASSERT(!walIsPartial(archiveFileRequest));
166 
167                             // If the path does not exist in the cache then fetch it
168                             const ArchiveGetFindCachePath *cachePath = lstFind(cacheArchive->pathList, &path);
169 
170                             if (cachePath == NULL)
171                             {
172                                 MEM_CONTEXT_BEGIN(lstMemContext(cacheArchive->pathList))
173                                 {
174                                     cachePath = lstAdd(
175                                         cacheArchive->pathList,
176                                         &(ArchiveGetFindCachePath)
177                                         {
178                                             .path = strDup(path),
179                                             .fileList = storageListP(
180                                                 storageRepoIdx(cacheRepo->repoIdx),
181                                                 strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(cacheArchive->archiveId), strZ(path)),
182                                                 .expression = strNewFmt(
183                                                     "^%s[0-F]{8}-[0-f]{40}" COMPRESS_TYPE_REGEXP "{0,1}$", strZ(path))),
184                                         });
185                                 }
186                                 MEM_CONTEXT_END();
187                             }
188 
189                             // Get a list of all WAL segments that match
190                             segmentList = strLstNew();
191 
192                             for (unsigned int fileIdx = 0; fileIdx < strLstSize(cachePath->fileList); fileIdx++)
193                             {
194                                 if (strBeginsWith(strLstGet(cachePath->fileList, fileIdx), archiveFileRequest))
195                                     strLstAdd(segmentList, strLstGet(cachePath->fileList, fileIdx));
196                             }
197                         }
198 
199                         // Add segments to match list
200                         for (unsigned int segmentIdx = 0; segmentIdx < strLstSize(segmentList); segmentIdx++)
201                         {
202                             MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
203                             {
204                                 lstAdd(
205                                     matchList,
206                                     &(ArchiveGetFile)
207                                     {
208                                         .file = strNewFmt(
209                                             "%s/%s/%s", strZ(cacheArchive->archiveId), strZ(path),
210                                             strZ(strLstGet(segmentList, segmentIdx))),
211                                         .repoIdx = cacheRepo->repoIdx,
212                                         .archiveId = cacheArchive->archiveId,
213                                         .cipherType = cacheRepo->cipherType,
214                                         .cipherPassArchive = cacheRepo->cipherPassArchive,
215                                     });
216                             }
217                             MEM_CONTEXT_END();
218                         }
219                     }
220                     // Else if not a WAL segment, see if it exists in the archiveId path
221                     else if (storageExistsP(
222                         storageRepoIdx(cacheRepo->repoIdx), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(cacheArchive->archiveId),
223                         strZ(archiveFileRequest))))
224                     {
225                         MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
226                         {
227                             lstAdd(
228                                 matchList,
229                                 &(ArchiveGetFile)
230                                 {
231                                     .file = strNewFmt("%s/%s", strZ(cacheArchive->archiveId), strZ(archiveFileRequest)),
232                                     .repoIdx = cacheRepo->repoIdx,
233                                     .archiveId = cacheArchive->archiveId,
234                                     .cipherType = cacheRepo->cipherType,
235                                     .cipherPassArchive = cacheRepo->cipherPassArchive,
236                                 });
237                         }
238                         MEM_CONTEXT_END();
239                     }
240                 }
241             }
242             // Log errors as warnings and continue
243             CATCH_ANY()
244             {
245                 repoErrorTotal++;
246                 archiveGetErrorAdd(cacheRepo->warnList, true, cacheRepo->repoIdx, errorType(), STR(errorMessage()));
247                 archiveGetErrorAdd(fileWarnList, false, cacheRepo->repoIdx, errorType(), STR(errorMessage()));
248             }
249             TRY_END();
250         }
251 
252         // If all repos errored out then set the global error since processing cannot continue past this segment
253         ASSERT(repoErrorTotal <= lstSize(cacheRepoList));
254 
255         if (repoErrorTotal == lstSize(cacheRepoList))
256         {
257             ASSERT(!strLstEmpty(fileWarnList));
258 
259             MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
260             {
261                 getCheckResult->errorType = &RepoInvalidError;
262                 getCheckResult->errorFile = strDup(archiveFileRequest);
263                 getCheckResult->errorMessage = strNewZ(UNABLE_TO_FIND_VALID_REPO_MSG);
264                 getCheckResult->warnList = strLstMove(fileWarnList, memContextCurrent());
265             }
266             MEM_CONTEXT_END();
267 
268         }
269         // Else if a file was found
270         else if (!lstEmpty(matchList))
271         {
272             bool error = false;
273 
274             // If a segment match list is > 1 then check for duplicates
275             if (isSegment && lstSize(matchList) > 1)
276             {
277                 // Count the number of unique hashes
278                 StringList *hashList = strLstNew();
279 
280                 for (unsigned int matchIdx = 0; matchIdx < lstSize(matchList); matchIdx++)
281                     strLstAddIfMissing(hashList, strSubN(((ArchiveGetFile *)lstGet(matchList, matchIdx))->file, 25, 40));
282 
283                 // If there is more than one unique hash then there are duplicates
284                 if (strLstSize(hashList) > 1)
285                 {
286                     // Build list of duplicates
287                     unsigned int repoKeyLast = 0;
288                     String *message = strNew();
289                     bool first = true;
290 
291                     for (unsigned int matchIdx = 0; matchIdx < lstSize(matchList); matchIdx++)
292                     {
293                         ArchiveGetFile *file = lstGet(matchList, matchIdx);
294                         unsigned int repoKey = cfgOptionGroupIdxToKey(cfgOptGrpRepo, file->repoIdx);
295 
296                         if (repoKey != repoKeyLast)
297                         {
298                             strCatFmt(message, "\nrepo%u:", repoKey);
299                             repoKeyLast = repoKey;
300                             first = true;
301                         }
302 
303                         if (first)
304                             first = false;
305                         else
306                             strCatChr(message, ',');
307 
308                         strCatFmt(message, " %s", strZ(file->file));
309                     }
310 
311                     // Set as global error since processing cannot continue past this segment
312                     MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
313                     {
314                         getCheckResult->errorType = &ArchiveDuplicateError;
315                         getCheckResult->errorFile = strDup(archiveFileRequest);
316                         getCheckResult->errorMessage = strNewFmt(
317                             "duplicates found for WAL segment %s:%s\n"
318                                 "HINT: are multiple primaries archiving to this stanza?",
319                             strZ(archiveFileRequest), strZ(message));
320                         getCheckResult->warnList = strLstMove(fileWarnList, memContextCurrent());
321                     }
322                     MEM_CONTEXT_END();
323 
324                     error = true;
325                 }
326             }
327 
328             // Files are valid so add them to the map
329             if (!error)
330             {
331                 MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
332                 {
333                     ArchiveFileMap map =
334                     {
335                         .request = strDup(archiveFileRequest),
336                         .actualList = lstNewP(sizeof(ArchiveGetFile)),
337                         .warnList = strLstMove(fileWarnList, memContextCurrent()),
338                     };
339 
340                     for (unsigned int matchIdx = 0; matchIdx < lstSize(matchList); matchIdx++)
341                         lstAdd(map.actualList, lstGet(matchList, matchIdx));
342 
343                     lstAdd(getCheckResult->archiveFileMapList, &map);
344                 }
345                 MEM_CONTEXT_END();
346 
347                 result = true;
348             }
349         }
350     }
351     MEM_CONTEXT_TEMP_END();
352 
353     FUNCTION_LOG_RETURN(BOOL, result);
354 }
355 
356 static ArchiveGetCheckResult
archiveGetCheck(const StringList * archiveRequestList)357 archiveGetCheck(const StringList *archiveRequestList)
358 {
359     FUNCTION_LOG_BEGIN(logLevelDebug);
360         FUNCTION_LOG_PARAM(STRING_LIST, archiveRequestList);
361     FUNCTION_LOG_END();
362 
363     ASSERT(archiveRequestList != NULL);
364     ASSERT(!strLstEmpty(archiveRequestList));
365 
366     ArchiveGetCheckResult result = {.archiveFileMapList = lstNewP(sizeof(ArchiveFileMap), .comparator = lstComparatorStr)};
367 
368     MEM_CONTEXT_TEMP_BEGIN()
369     {
370         // List of warnings
371         StringList *warnList = strLstNew();
372 
373         // Get pg control info
374         PgControl controlInfo = pgControlFromFile(storagePg());
375 
376         // Build list of repos/archiveIds where WAL may be found
377         List *cacheRepoList = lstNewP(sizeof(ArchiveGetFindCacheRepo));
378 
379         for (unsigned int repoIdx = 0; repoIdx < cfgOptionGroupIdxTotal(cfgOptGrpRepo); repoIdx++)
380         {
381             // If a repo was specified then skip all other repos
382             if (cfgOptionTest(cfgOptRepo) && cfgOptionUInt(cfgOptRepo) != cfgOptionGroupIdxToKey(cfgOptGrpRepo, repoIdx))
383                 continue;
384 
385             TRY_BEGIN()
386             {
387                 // Get the repo storage in case it is remote and encryption settings need to be pulled down
388                 storageRepoIdx(repoIdx);
389 
390                 ArchiveGetFindCacheRepo cacheRepo =
391                 {
392                     .repoIdx = repoIdx,
393                     .cipherType = cfgOptionIdxStrId(cfgOptRepoCipherType, repoIdx),
394                     .archiveList = lstNewP(sizeof(ArchiveGetFindCacheArchive)),
395                     .warnList = strLstNew(),
396                 };
397 
398                 // Attempt to load the archive info file
399                 InfoArchive *info = infoArchiveLoadFile(
400                     storageRepoIdx(repoIdx), INFO_ARCHIVE_PATH_FILE_STR, cacheRepo.cipherType,
401                     cfgOptionIdxStrNull(cfgOptRepoCipherPass, repoIdx));
402 
403                 // Copy cipher pass into the result list context once rather than making a copy per candidate file later
404                 MEM_CONTEXT_BEGIN(lstMemContext(result.archiveFileMapList))
405                 {
406                     cacheRepo.cipherPassArchive = strDup(infoArchiveCipherPass(info));
407                 }
408                 MEM_CONTEXT_END();
409 
410                 // Loop through pg history and determine which archiveIds to use
411                 StringList *archivePathList = NULL;
412 
413                 for (unsigned int pgIdx = 0; pgIdx < infoPgDataTotal(infoArchivePg(info)); pgIdx++)
414                 {
415                     InfoPgData pgData = infoPgData(infoArchivePg(info), pgIdx);
416 
417                     // Only use the archive id if it matches the current cluster
418                     if (pgData.systemId == controlInfo.systemId && pgData.version == controlInfo.version)
419                     {
420                         const String *archiveId = infoPgArchiveId(infoArchivePg(info), pgIdx);
421                         bool found = true;
422 
423                         // If the archiveId is in the past make sure the path exists
424                         if (pgIdx != 0)
425                         {
426                             // Get list of archiveId paths in the archive path
427                             if (archivePathList == NULL)
428                                 archivePathList = storageListP(storageRepoIdx(repoIdx), STORAGE_REPO_ARCHIVE_STR);
429 
430                             if (!strLstExists(archivePathList, archiveId))
431                                 found = false;
432                         }
433 
434                         // If the archiveId is most recent or has files then add it
435                         if (found)
436                         {
437                             ArchiveGetFindCacheArchive cacheArchive =
438                             {
439                                 .pathList = lstNewP(sizeof(ArchiveGetFindCachePath), .comparator = lstComparatorStr),
440                             };
441 
442                             // Copy archiveId into the result list context once rather than making a copy per candidate file later
443                             MEM_CONTEXT_BEGIN(lstMemContext(result.archiveFileMapList))
444                             {
445                                 cacheArchive.archiveId = strDup(archiveId);
446                             }
447                             MEM_CONTEXT_END();
448 
449                             lstAdd(cacheRepo.archiveList, &cacheArchive);
450                         }
451                     }
452                 }
453 
454                 // Error if no archive id was found -- this indicates a mismatch with the current cluster
455                 if (lstEmpty(cacheRepo.archiveList))
456                 {
457                     archiveGetErrorAdd(
458                         warnList, true, repoIdx, &ArchiveMismatchError,
459                         strNewFmt(
460                             "unable to retrieve the archive id for database version '%s' and system-id '%" PRIu64 "'",
461                             strZ(pgVersionToStr(controlInfo.version)), controlInfo.systemId));
462                 }
463                 // Else add repo to list
464                 else
465                     lstAdd(cacheRepoList, &cacheRepo);
466             }
467             // Log errors as warnings and continue
468             CATCH_ANY()
469             {
470                 archiveGetErrorAdd(warnList, true, repoIdx, errorType(), STR(errorMessage()));
471             }
472             TRY_END();
473         }
474 
475         // Error if there are no repos to check
476         if (lstEmpty(cacheRepoList))
477         {
478             ASSERT(!strLstEmpty(warnList));
479 
480             // Set as global error since processing cannot continue past this segment
481             MEM_CONTEXT_BEGIN(lstMemContext(result.archiveFileMapList))
482             {
483                 result.errorType = &RepoInvalidError;
484                 result.errorMessage = strNewZ(UNABLE_TO_FIND_VALID_REPO_MSG);
485                 result.warnList = strLstMove(warnList, memContextCurrent());
486             }
487             MEM_CONTEXT_END();
488         }
489         else
490         {
491             // Any remaining errors will be reported as warnings since at least one repo is valid
492             MEM_CONTEXT_BEGIN(lstMemContext(result.archiveFileMapList))
493             {
494                 result.warnList = strLstMove(warnList, memContextCurrent());
495             }
496             MEM_CONTEXT_END();
497 
498             // Find files in the list
499             for (unsigned int archiveRequestIdx = 0; archiveRequestIdx < strLstSize(archiveRequestList); archiveRequestIdx++)
500             {
501                 if (!archiveGetFind(
502                         strLstGet(archiveRequestList, archiveRequestIdx), &result, cacheRepoList, warnList,
503                         strLstSize(archiveRequestList) == 1))
504                 {
505                     break;
506                 }
507             }
508 
509             // Sort the list to make searching for files faster
510             lstSort(result.archiveFileMapList, sortOrderAsc);
511         }
512     }
513     MEM_CONTEXT_TEMP_END();
514 
515     FUNCTION_LOG_RETURN_STRUCT(result);
516 }
517 
518 /***********************************************************************************************************************************
519 Clean the queue and prepare a list of WAL segments that the async process should get
520 ***********************************************************************************************************************************/
521 static StringList *
queueNeed(const String * walSegment,bool found,uint64_t queueSize,size_t walSegmentSize,unsigned int pgVersion)522 queueNeed(const String *walSegment, bool found, uint64_t queueSize, size_t walSegmentSize, unsigned int pgVersion)
523 {
524     FUNCTION_LOG_BEGIN(logLevelDebug);
525         FUNCTION_LOG_PARAM(STRING, walSegment);
526         FUNCTION_LOG_PARAM(BOOL, found);
527         FUNCTION_LOG_PARAM(UINT64, queueSize);
528         FUNCTION_LOG_PARAM(SIZE, walSegmentSize);
529         FUNCTION_LOG_PARAM(UINT, pgVersion);
530     FUNCTION_LOG_END();
531 
532     ASSERT(walSegment != NULL);
533 
534     StringList *result = strLstNew();
535 
536     MEM_CONTEXT_TEMP_BEGIN()
537     {
538         // Determine the first WAL segment for the async process to get.  If the WAL segment requested by
539         // PostgreSQL was not found then use that.  If the segment was found but the queue is not full then
540         // start with the next segment.
541         const String *walSegmentFirst =
542             found ? walSegmentNext(walSegment, walSegmentSize, pgVersion) : walSegment;
543 
544         // Determine how many WAL segments should be in the queue.  The queue total must be at least 2 or it doesn't make sense to
545         // have async turned on at all.
546         unsigned int walSegmentQueueTotal = (unsigned int)(queueSize / walSegmentSize);
547 
548         if (walSegmentQueueTotal < 2)
549             walSegmentQueueTotal = 2;
550 
551         // Build the ideal queue -- the WAL segments we want in the queue after the async process has run
552         StringList *idealQueue = strLstSort(
553             walSegmentRange(walSegmentFirst, walSegmentSize, pgVersion, walSegmentQueueTotal), sortOrderAsc);
554 
555         // Get the list of files actually in the queue
556         StringList *actualQueue = strLstSort(
557             storageListP(storageSpool(), STORAGE_SPOOL_ARCHIVE_IN_STR, .errorOnMissing = true), sortOrderAsc);
558 
559         // Build a list of WAL segments that are being kept so we can later make a list of what is needed
560         StringList *keepQueue = strLstNew();
561 
562         for (unsigned int actualQueueIdx = 0; actualQueueIdx < strLstSize(actualQueue); actualQueueIdx++)
563         {
564             // Get file from actual queue
565             const String *file = strLstGet(actualQueue, actualQueueIdx);
566 
567             // Does this match a file we want to preserve?
568             if (strLstExists(idealQueue, file))
569             {
570                 strLstAdd(keepQueue, file);
571             }
572             // Else delete if it does not match an ok file for a WAL segment that has already been preserved. If an ok file exists
573             // in addition to the segment then it contains warnings which need to be preserved.
574             else if (
575                 !strEndsWithZ(file, STATUS_EXT_OK) ||
576                 !strLstExists(actualQueue, strSubN(file, 0, strSize(file) - STATUS_EXT_OK_SIZE)))
577             {
578                 storageRemoveP(storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(file)), .errorOnMissing = true);
579             }
580         }
581 
582         // Generate a list of the WAL that are needed by removing kept WAL from the ideal queue
583         strLstSort(keepQueue, sortOrderAsc);
584 
585         for (unsigned int idealQueueIdx = 0; idealQueueIdx < strLstSize(idealQueue); idealQueueIdx++)
586         {
587             if (!strLstExists(keepQueue, strLstGet(idealQueue, idealQueueIdx)))
588                 strLstAdd(result, strLstGet(idealQueue, idealQueueIdx));
589         }
590     }
591     MEM_CONTEXT_TEMP_END();
592 
593     FUNCTION_LOG_RETURN(STRING_LIST, result);
594 }
595 
596 /**********************************************************************************************************************************/
// Entry point for the archive-get command. Exactly two command parameters are required: the WAL segment (or other archive file) to
// get and the path it should be copied to. Returns 0 when the file was found and delivered to the destination and 1 when it could
// not be found. Parameter problems, an async timeout, and errors reported by archiveGetCheck() are thrown rather than returned.
int
cmdArchiveGet(void)
{
    FUNCTION_LOG_VOID(logLevelDebug);

    // PostgreSQL must be local
    pgIsLocalVerify();

    // Set the result assuming the archive file will not be found
    int result = 1;

    MEM_CONTEXT_TEMP_BEGIN()
    {
        // Check the parameters -- anything other than exactly two is an error, with a specific message for each case
        const StringList *commandParam = cfgCommandParam();

        if (strLstSize(commandParam) != 2)
        {
            if (strLstEmpty(commandParam))
                THROW(ParamRequiredError, "WAL segment to get required");

            if (strLstSize(commandParam) == 1)
                THROW(ParamRequiredError, "path to copy WAL segment required");

            THROW(ParamInvalidError, "extra parameters found");
        }

        // Get the segment name (only the base name of the first parameter is used)
        String *walSegment = strBase(strLstGet(commandParam, 0));

        // Destination is wherever we were told to move the WAL segment
        const String *walDestination =
            walPath(strLstGet(commandParam, 1), cfgOptionStr(cfgOptPgPath), STR(cfgCommandName()));

        // Async get can only be performed on WAL segments, history or other files must use synchronous mode
        if (cfgOptionBool(cfgOptArchiveAsync) && walIsSegment(walSegment))
        {
            bool first = true;                                          // Is this the first time the loop has run?
            bool found = false;                                         // Has the WAL segment been found yet?
            bool foundOk = false;                                       // Was an OK file found which confirms the file was missing?
            bool queueFull = false;                                     // Is the estimated queue size more than half the maximum?
            bool forked = false;                                        // Has the async process been forked yet?

            // Loop and wait for the WAL segment to be pushed. The wait is bounded by the archive-timeout option.
            Wait *wait = waitNew(cfgOptionUInt64(cfgOptArchiveTimeout));

            do
            {
                // Check if the WAL segment is already in the queue
                found = storageExistsP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(walSegment)));

                // Check for errors or missing files. For archive-get ok indicates that the process succeeded but there is no WAL
                // file to download, or that there was a warning. Do not error on the first run so the async process can be spawned
                // to correct any errors from a previous run. Do not warn on the first run if the segment was not found so the async
                // process can be spawned to check for the file again.
                if (archiveAsyncStatus(archiveModeGet, walSegment, !first, found || !first))
                {
                    // The ok file has been consumed so remove it. It must exist at this point, hence errorOnMissing.
                    storageRemoveP(
                        storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s" STATUS_EXT_OK, strZ(walSegment)),
                        .errorOnMissing = true);

                    // Break if an ok file was found but no segment exists, which means the segment was missing. However, don't
                    // break if this is the first time through the loop since this means the ok file was written by an async process
                    // spawned by a prior archive-get execution, which means we should spawn the async process again to see if the
                    // file exists now. This also prevents spool files from a previous recovery interfering with the current
                    // recovery.
                    if (!found && !first)
                    {
                        foundOk = true;
                        break;
                    }
                }

                // If found then move the WAL segment to the destination directory
                if (found)
                {
                    // Source is the WAL segment in the spool queue
                    StorageRead *source = storageNewReadP(
                        storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(walSegment)));

                    // A move will be attempted but if the spool queue and the WAL path are on different file systems then a copy
                    // will be performed instead.
                    //
                    // It looks scary that we are disabling syncs and atomicity (in case we need to copy instead of move) but this
                    // is safe because if the system crashes Postgres will not try to reuse a restored WAL segment but will instead
                    // request it again using the restore_command. In the case of a move this hardly matters since path syncs are
                    // cheap but if a copy is required we could save a lot of writes.
                    StorageWrite *destination = storageNewWriteP(
                        storageLocalWrite(), walDestination, .noCreatePath = true, .noSyncFile = true, .noSyncPath = true,
                        .noAtomic = true);

                    // Move (or copy if required) the file
                    storageMoveP(storageSpoolWrite(), source, destination);

                    // Return success
                    LOG_INFO_FMT(FOUND_IN_ARCHIVE_MSG " asynchronously", strZ(walSegment));
                    result = 0;

                    // Get a list of WAL segments left in the queue
                    StringList *queue = storageListP(
                        storageSpool(), STORAGE_SPOOL_ARCHIVE_IN_STR, .expression = WAL_SEGMENT_REGEXP_STR, .errorOnMissing = true);

                    if (!strLstEmpty(queue))
                    {
                        // Get size of the WAL segment (measured from the file that was just delivered to the destination)
                        uint64_t walSegmentSize = storageInfoP(storageLocal(), walDestination).size;

                        // Use WAL segment size to estimate queue size and determine if the async process should be launched. The
                        // queue counts as full when the estimate is strictly greater than half of the configured queue maximum.
                        queueFull = strLstSize(queue) * walSegmentSize > cfgOptionUInt64(cfgOptArchiveGetQueueMax) / 2;
                    }
                }

                // Start the async process if the WAL segment has not been found yet, or if it was found but the queue is not
                // considered full.  There's no point in forking the async process off more than once so track that as well.  Use
                // an archive lock to prevent forking if the async process was launched by another process.
                if (!forked && (!found || !queueFull)  &&
                    lockAcquire(
                        cfgOptionStr(cfgOptLockPath), cfgOptionStr(cfgOptStanza), cfgOptionStr(cfgOptExecId), cfgLockType(), 0,
                        false))
                {
                    // Get control info (provides the WAL segment size and PostgreSQL version passed to queueNeed() below)
                    PgControl pgControl = pgControlFromFile(storagePg());

                    // Create the queue
                    storagePathCreateP(storageSpoolWrite(), STORAGE_SPOOL_ARCHIVE_IN_STR);

                    // The async process should not output on the console at all
                    KeyValue *optionReplace = kvNew();

                    kvPut(optionReplace, VARSTRDEF(CFGOPT_LOG_LEVEL_CONSOLE), VARSTRDEF("off"));
                    kvPut(optionReplace, VARSTRDEF(CFGOPT_LOG_LEVEL_STDERR), VARSTRDEF("off"));

                    // Generate command options and put the executable first so the list forms a complete command line
                    StringList *commandExec = cfgExecParam(cfgCmdArchiveGet, cfgCmdRoleAsync, optionReplace, true, false);
                    strLstInsert(commandExec, 0, cfgExe());

                    // Clean the current queue using the list of WAL that we ideally want in the queue.  queueNeed()
                    // will return the list of WAL needed to fill the queue and this will be passed to the async process.
                    const StringList *queue = queueNeed(
                        walSegment, found, cfgOptionUInt64(cfgOptArchiveGetQueueMax), pgControl.walSegmentSize,
                        pgControl.version);

                    for (unsigned int queueIdx = 0; queueIdx < strLstSize(queue); queueIdx++)
                        strLstAdd(commandExec, strLstGet(queue, queueIdx));

                    // Clear errors for the current wal segment
                    archiveAsyncErrorClear(archiveModeGet, walSegment);

                    // Release the lock so the child process can acquire it
                    lockRelease(true);

                    // Execute the async process
                    archiveAsyncExec(archiveModeGet, commandExec);

                    // Mark the async process as forked so it doesn't get forked again.  A single run of the async process should be
                    // enough to do the job, running it again won't help anything.
                    forked = true;
                }

                // Exit loop if WAL was found
                if (found)
                    break;

                // No longer the first run, so errors will be thrown and missing files will be reported
                first = false;
            }
            while (waitMore(wait));

            // If the WAL segment was not found
            if (!found)
            {
                // If no ok file was found then something may be wrong with the async process. It's better to throw an error here
                // than report not found for debugging purposes. Either way PostgreSQL will halt if it has not reached consistency.
                if (!foundOk)
                {
                    // The timeout option is divided by MSEC_PER_SEC so the message reports seconds
                    THROW_FMT(
                        ArchiveTimeoutError, "unable to get WAL file '%s' from the archive asynchronously after %s second(s)",
                        strZ(walSegment), strZ(strNewDbl((double)cfgOptionInt64(cfgOptArchiveTimeout) / MSEC_PER_SEC)));
                }
                // Else report that the WAL segment could not be found
                else
                    LOG_INFO_FMT(UNABLE_TO_FIND_IN_ARCHIVE_MSG " asynchronously", strZ(walSegment));
            }
        }
        // Else perform synchronous get
        else
        {
            // Check for the archive file. The check takes a list of requests so wrap the single file in a list.
            StringList *archiveRequestList = strLstNew();
            strLstAdd(archiveRequestList, walSegment);

            ArchiveGetCheckResult checkResult = archiveGetCheck(archiveRequestList);

            // If there was an error then throw it
            if (checkResult.errorType != NULL)
                THROW_CODE(errorTypeCode(checkResult.errorType), strZ(checkResult.errorMessage));

            // Get the archive file
            if (!lstEmpty(checkResult.archiveFileMapList))
            {
                // There can only be one file mapping since only one file was requested
                ASSERT(lstSize(checkResult.archiveFileMapList) == 1);
                const ArchiveFileMap *fileMap = lstGet(checkResult.archiveFileMapList, 0);

                // Get the file
                ArchiveGetFileResult fileResult = archiveGetFile(
                    storageLocalWrite(), fileMap->request, fileMap->actualList, walDestination);

                // Output file warnings
                for (unsigned int warnIdx = 0; warnIdx < strLstSize(fileResult.warnList); warnIdx++)
                    LOG_WARN(strZ(strLstGet(fileResult.warnList, warnIdx)));

                // If there was no error then the file existed. actualIdx selects which of the candidate files was retrieved.
                ArchiveGetFile *file = lstGet(fileMap->actualList, fileResult.actualIdx);
                ASSERT(file != NULL);

                LOG_INFO_FMT(
                    FOUND_IN_REPO_ARCHIVE_MSG, strZ(walSegment), cfgOptionGroupIdxToKey(cfgOptGrpRepo, file->repoIdx),
                    strZ(file->archiveId));

                result = 0;
            }
            // Else log that the file was not found
            else
                LOG_INFO_FMT(UNABLE_TO_FIND_IN_ARCHIVE_MSG, strZ(walSegment));
        }
    }
    MEM_CONTEXT_TEMP_END();

    FUNCTION_LOG_RETURN(INT, result);
}
828 
829 /**********************************************************************************************************************************/
// State shared between cmdArchiveGetAsync() and archiveGetAsyncCallback() so jobs can be handed out one list entry at a time
typedef struct ArchiveGetAsyncData
{
    const List *const archiveFileMapList;                           // List of wal segments to process (ArchiveFileMap entries)
    unsigned int archiveFileIdx;                                    // Index of the next list entry to hand out as a job
} ArchiveGetAsyncData;
835 
archiveGetAsyncCallback(void * data,unsigned int clientIdx)836 static ProtocolParallelJob *archiveGetAsyncCallback(void *data, unsigned int clientIdx)
837 {
838     FUNCTION_TEST_BEGIN();
839         FUNCTION_TEST_PARAM_P(VOID, data);
840         FUNCTION_TEST_PARAM(UINT, clientIdx);
841     FUNCTION_TEST_END();
842 
843     ProtocolParallelJob *result = NULL;
844 
845     MEM_CONTEXT_TEMP_BEGIN()
846     {
847         // No special logic based on the client, we'll just get the next job
848         (void)clientIdx;
849 
850         // Get a new job if there are any left
851         ArchiveGetAsyncData *jobData = data;
852 
853         if (jobData->archiveFileIdx < lstSize(jobData->archiveFileMapList))
854         {
855             const ArchiveFileMap *archiveFileMap = lstGet(jobData->archiveFileMapList, jobData->archiveFileIdx);
856             jobData->archiveFileIdx++;
857 
858             ProtocolCommand *const command = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_GET_FILE);
859             PackWrite *const param = protocolCommandParam(command);
860 
861             pckWriteStrP(param, archiveFileMap->request);
862 
863             // Add actual files to get
864             for (unsigned int actualIdx = 0; actualIdx < lstSize(archiveFileMap->actualList); actualIdx++)
865             {
866                 const ArchiveGetFile *const actual = lstGet(archiveFileMap->actualList, actualIdx);
867 
868                 pckWriteStrP(param, actual->file);
869                 pckWriteU32P(param, actual->repoIdx);
870                 pckWriteStrP(param, actual->archiveId);
871                 pckWriteU64P(param, actual->cipherType);
872                 pckWriteStrP(param, actual->cipherPassArchive);
873             }
874 
875             MEM_CONTEXT_PRIOR_BEGIN()
876             {
877                 result = protocolParallelJobNew(VARSTR(archiveFileMap->request), command);
878             }
879             MEM_CONTEXT_PRIOR_END();
880         }
881     }
882     MEM_CONTEXT_TEMP_END();
883 
884     FUNCTION_TEST_RETURN(result);
885 }
886 
// Entry point for the async role of archive-get. The command parameters are the WAL segments to get. Segments reported as available
// by archiveGetCheck() are fetched through a parallel executor and the outcome for each is recorded in the spool (renamed segment
// and/or ok status file on success, error status file on failure). An error from the check, or the first missing segment, is
// recorded after the fetches complete. Any error thrown along the way is written to a single global error file and rethrown.
void
cmdArchiveGetAsync(void)
{
    FUNCTION_LOG_VOID(logLevelDebug);

    MEM_CONTEXT_TEMP_BEGIN()
    {
        TRY_BEGIN()
        {
            // PostgreSQL must be local
            pgIsLocalVerify();

            // Check the parameters
            if (strLstSize(cfgCommandParam()) < 1)
                THROW(ParamInvalidError, "at least one wal segment is required");

            // Log the number of requested files along with the first name and, when there is more than one, the last name
            LOG_INFO_FMT(
                "get %u WAL file(s) from archive: %s%s",
                strLstSize(cfgCommandParam()), strZ(strLstGet(cfgCommandParam(), 0)),
                strLstSize(cfgCommandParam()) == 1 ?
                    "" : strZ(strNewFmt("...%s", strZ(strLstGet(cfgCommandParam(), strLstSize(cfgCommandParam()) - 1)))));

            // Check for archive files
            ArchiveGetCheckResult checkResult = archiveGetCheck(cfgCommandParam());

            // If any files are missing get the first one (used to construct the "unable to find" warning).
            // NOTE(review): indexing the parameters by the number of found files assumes archiveGetCheck() returns mappings for a
            // leading run of the requested files and stops at the first one that is missing -- confirm against archiveGetCheck().
            const String *archiveFileMissing = NULL;

            if (lstSize(checkResult.archiveFileMapList) < strLstSize(cfgCommandParam()))
                archiveFileMissing = strLstGet(cfgCommandParam(), lstSize(checkResult.archiveFileMapList));

            // Get archive files that were found
            if (!lstEmpty(checkResult.archiveFileMapList))
            {
                // Create the parallel executor. Jobs are produced by archiveGetAsyncCallback() from jobData and the executor is
                // given half of the protocol timeout.
                ArchiveGetAsyncData jobData = {.archiveFileMapList = checkResult.archiveFileMapList};

                ProtocolParallel *parallelExec = protocolParallelNew(
                    cfgOptionUInt64(cfgOptProtocolTimeout) / 2, archiveGetAsyncCallback, &jobData);

                // Add one local client per process, numbered from 1 up to the process-max option
                for (unsigned int processIdx = 1; processIdx <= cfgOptionUInt(cfgOptProcessMax); processIdx++)
                    protocolParallelClientAdd(parallelExec, protocolLocalGet(protocolStorageTypeRepo, 0, processIdx));

                // Process jobs
                MEM_CONTEXT_TEMP_RESET_BEGIN()
                {
                    do
                    {
                        unsigned int completed = protocolParallelProcess(parallelExec);

                        for (unsigned int jobIdx = 0; jobIdx < completed; jobIdx++)
                        {
                            // Get the job
                            ProtocolParallelJob *job = protocolParallelResult(parallelExec);
                            unsigned int processId = protocolParallelJobProcessId(job);

                            // Get wal segment name and archive file map. The job key is the requested segment name, which is also
                            // used to look up the matching file map.
                            const String *walSegment = varStr(protocolParallelJobKey(job));
                            const ArchiveFileMap *fileMap = lstFind(checkResult.archiveFileMapList, &walSegment);
                            ASSERT(fileMap != NULL);

                            // Build warnings for status file, starting with any warnings attached to the file map
                            String *warning = strNew();

                            if (!strLstEmpty(fileMap->warnList))
                                strCatFmt(warning, "%s", strZ(strLstJoin(fileMap->warnList, "\n")));

                            // The job was successful
                            if (protocolParallelJobErrorCode(job) == 0)
                            {
                                // Get the actual file retrieved. The first value in the job result is an index into actualList.
                                PackRead *const fileResult = protocolParallelJobResult(job);
                                ArchiveGetFile *file = lstGet(fileMap->actualList, pckReadU32P(fileResult));
                                ASSERT(file != NULL);

                                // Output file warnings
                                StringList *fileWarnList = pckReadStrLstP(fileResult);

                                for (unsigned int warnIdx = 0; warnIdx < strLstSize(fileWarnList); warnIdx++)
                                    LOG_WARN_PID(processId, strZ(strLstGet(fileWarnList, warnIdx)));

                                // Build file warnings for status file, separated from earlier warnings by a linefeed
                                if (!strLstEmpty(fileWarnList))
                                {
                                    strCatFmt(
                                        warning, "%s%s", strSize(warning) == 0 ? "" : "\n", strZ(strLstJoin(fileWarnList, "\n")));
                                }

                                // An ok file is only written on success when there are warnings to record
                                if (strSize(warning) != 0)
                                    archiveAsyncStatusOkWrite(archiveModeGet, walSegment, warning);

                                LOG_DETAIL_PID_FMT(
                                    processId, FOUND_IN_REPO_ARCHIVE_MSG, strZ(walSegment),
                                    cfgOptionGroupIdxToKey(cfgOptGrpRepo, file->repoIdx), strZ(file->archiveId));

                                // Rename temp WAL segment to actual name. This is done after the ok file is written so the ok file
                                // is guaranteed to exist before the foreground process finds the WAL segment.
                                storageMoveP(
                                    storageSpoolWrite(),
                                    storageNewReadP(
                                        storageSpool(),
                                        strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s." STORAGE_FILE_TEMP_EXT, strZ(walSegment))),
                                    storageNewWriteP(
                                        storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(walSegment))));
                            }
                            // Else the job errored
                            else
                            {
                                LOG_WARN_PID_FMT(
                                    processId, "[%s] %s", errorTypeName(errorTypeFromCode(protocolParallelJobErrorCode(job))),
                                    strZ(protocolParallelJobErrorMessage(job)));

                                // Write an error file for this segment containing the error message followed by any warnings
                                archiveAsyncStatusErrorWrite(
                                    archiveModeGet, walSegment, protocolParallelJobErrorCode(job),
                                    strNewFmt(
                                        "%s%s", strZ(protocolParallelJobErrorMessage(job)),
                                        strSize(warning) == 0 ? "" : strZ(strNewFmt("\n%s", strZ(warning)))));
                            }

                            protocolParallelJobFree(job);
                        }

                        // Reset the memory context occasionally so we don't use too much memory or slow down processing
                        MEM_CONTEXT_TEMP_RESET(1000);
                    }
                    while (!protocolParallelDone(parallelExec));
                }
                MEM_CONTEXT_TEMP_END();
            }

            // Log an error from archiveGetCheck() after any existing files have been fetched. This ordering is important because we
            // need to fetch as many valid files as possible before throwing an error.
            if (checkResult.errorType != NULL)
            {
                LOG_WARN_FMT("[%s] %s", errorTypeName(checkResult.errorType), strZ(checkResult.errorMessage));

                // The error file contains the error message followed by any warnings from the check
                String *message = strDup(checkResult.errorMessage);

                if (!strLstEmpty(checkResult.warnList))
                    strCatFmt(message, "\n%s", strZ(strLstJoin(checkResult.warnList, "\n")));

                archiveAsyncStatusErrorWrite(
                    archiveModeGet, checkResult.errorFile, errorTypeCode(checkResult.errorType), message);
            }
            // If any files were missing write an ok file for the first missing file and add any warnings. It is important that this
            // happen right before the async process exits so the main process can immediately respawn the async process to retry
            // missing files.
            else if (archiveFileMissing != NULL)
            {
                LOG_DETAIL_FMT(UNABLE_TO_FIND_IN_ARCHIVE_MSG, strZ(archiveFileMissing));

                // The message stays NULL when there are no warnings to record
                String *message = NULL;

                if (!strLstEmpty(checkResult.warnList))
                    message = strLstJoin(checkResult.warnList, "\n");

                archiveAsyncStatusOkWrite(archiveModeGet, archiveFileMissing, message);
            }
        }
        // On any global error write a single error file to cover all unprocessed files
        CATCH_ANY()
        {
            // NULL is passed in place of a WAL segment name since the error is not tied to a single file
            archiveAsyncStatusErrorWrite(archiveModeGet, NULL, errorCode(), STR(errorMessage()));
            RETHROW();
        }
        TRY_END();
    }
    MEM_CONTEXT_TEMP_END();

    FUNCTION_LOG_RETURN_VOID();
}
1058