1 /***********************************************************************************************************************************
2 Archive Get Command
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11
12 #include "command/archive/common.h"
13 #include "command/archive/get/file.h"
14 #include "command/archive/get/protocol.h"
15 #include "command/command.h"
16 #include "common/debug.h"
17 #include "common/log.h"
18 #include "common/memContext.h"
19 #include "common/regExp.h"
20 #include "common/wait.h"
21 #include "config/config.h"
22 #include "config/exec.h"
23 #include "info/infoArchive.h"
24 #include "postgres/interface.h"
25 #include "protocol/helper.h"
26 #include "protocol/parallel.h"
27 #include "storage/helper.h"
28 #include "storage/write.intern.h"
29
30 /***********************************************************************************************************************************
31 Constants for log messages that are used multiple times to keep them consistent
32 ***********************************************************************************************************************************/
// Format for a found file; takes the file name. Used by the async path of cmdArchiveGet() with an " asynchronously" suffix.
#define FOUND_IN_ARCHIVE_MSG "found %s in the archive"
// Format for a found file in synchronous mode; takes the file name, the repo key, and the archive id
#define FOUND_IN_REPO_ARCHIVE_MSG "found %s in the repo%u: %s archive"
// Format for a file that could not be found; takes the file name
#define UNABLE_TO_FIND_IN_ARCHIVE_MSG "unable to find %s in the archive"
// Global error message set when no repository could be used (every repo was invalid or errored)
#define UNABLE_TO_FIND_VALID_REPO_MSG "unable to find a valid repository"
// NOTE(review): not referenced in the visible part of this file -- presumably used by the async command further down
#define REPO_INVALID_OR_ERR_MSG "some repositories were invalid or encountered errors"
38
39 /***********************************************************************************************************************************
40 Check for a list of archive files in the repository
41 ***********************************************************************************************************************************/
// Maps one requested archive file to the candidate files that were found for it
typedef struct ArchiveFileMap
{
    const String *request; // Archive file that was requested (first member, so it is what the list comparator sorts on)
    List *actualList; // Actual files in various repos/archiveIds (list of ArchiveGetFile)
    StringList *warnList; // Warnings that need to be reported by the async process
} ArchiveFileMap;
48
// Result of checking the repositories for a list of requested archive files. Either files were mapped, or a global error was
// recorded that stops processing at errorFile (errorType is NULL when there was no global error).
typedef struct ArchiveGetCheckResult
{
    List *archiveFileMapList; // List of mapped archive files, i.e. found in the repo (list of ArchiveFileMap)

    // Global error that affects all repos
    const ErrorType *errorType; // Error type if there was an error
    const String *errorFile; // Error file if there was an error (left unset when no repo at all could be used)
    const String *errorMessage; // Error message if there was an error
    const StringList *warnList; // Warnings that need to be reported by the async process
} ArchiveGetCheckResult;
59
60 // Helper to add an error to an error list and warn if the error is not already in the list
61 static void
archiveGetErrorAdd(StringList * warnList,bool log,unsigned int repoIdx,const ErrorType * type,const String * message)62 archiveGetErrorAdd(StringList *warnList, bool log, unsigned int repoIdx, const ErrorType *type, const String *message)
63 {
64 const String *warn = strNewFmt(
65 "repo%u: [%s] %s", cfgOptionGroupIdxToKey(cfgOptGrpRepo, repoIdx), errorTypeName(type), strZ(message));
66
67 if (!strLstExists(warnList, warn))
68 {
69 if (log)
70 LOG_WARN(strZ(warn));
71
72 strLstAdd(warnList, warn);
73 }
74 }
75
76 // Helper to find a single archive file in the repository using a cache to speed up the process and minimize storageListP() calls
// Cached listing of one WAL path within an archiveId, used by archiveGetFind() so the path is only listed once when multiple
// files are requested
typedef struct ArchiveGetFindCachePath
{
    const String *path; // Cached path in the archiveId (first member, used as the lookup key in the path list)
    const StringList *fileList; // List of files in the cache path
} ArchiveGetFindCachePath;
82
// Cache entry for one archiveId of a repo that may contain the requested files
typedef struct ArchiveGetFindCacheArchive
{
    const String *archiveId; // ArchiveId in the repo
    List *pathList; // List of paths cached for archiveId (list of ArchiveGetFindCachePath)
} ArchiveGetFindCacheArchive;
88
// Cache entry for one repo that will be searched for the requested files
typedef struct ArchiveGetFindCacheRepo
{
    unsigned int repoIdx; // Repo index (not the repo key)
    CipherType cipherType; // Repo cipher type
    const String *cipherPassArchive; // Repo archive cipher pass
    List *archiveList; // Cached list of archiveIds and associated paths (list of ArchiveGetFindCacheArchive)
    StringList *warnList; // Track repo warnings so each is only reported once
} ArchiveGetFindCacheRepo;
97
98 static bool
archiveGetFind(const String * archiveFileRequest,ArchiveGetCheckResult * getCheckResult,List * cacheRepoList,const StringList * warnList,bool single)99 archiveGetFind(
100 const String *archiveFileRequest, ArchiveGetCheckResult *getCheckResult, List *cacheRepoList, const StringList *warnList,
101 bool single)
102 {
103 FUNCTION_LOG_BEGIN(logLevelDebug);
104 FUNCTION_LOG_PARAM(STRING, archiveFileRequest);
105 FUNCTION_LOG_PARAM_P(VOID, getCheckResult);
106 FUNCTION_LOG_PARAM(LIST, cacheRepoList);
107 FUNCTION_LOG_PARAM(STRING_LIST, warnList);
108 FUNCTION_LOG_PARAM(BOOL, single);
109 FUNCTION_LOG_END();
110
111 ASSERT(archiveFileRequest != NULL);
112 ASSERT(getCheckResult != NULL);
113 ASSERT(cacheRepoList != NULL);
114
115 bool result = false;
116
117 MEM_CONTEXT_TEMP_BEGIN()
118 {
119 // Is the archive file a WAL segment?
120 bool isSegment = walIsSegment(archiveFileRequest);
121
122 // Get the WAL segment path
123 const String *path = isSegment ? strSubN(archiveFileRequest, 0, 16) : NULL;
124
125 // List to hold matches for the requested file
126 List *matchList = lstNewP(sizeof(ArchiveGetFile));
127
128 // List of file level warnings
129 StringList *fileWarnList = strLstDup(warnList);
130
131 // Errored repo total to track if all repos errored
132 unsigned int repoErrorTotal = 0;
133
134 // Check each repo
135 for (unsigned int repoCacheIdx = 0; repoCacheIdx < lstSize(cacheRepoList); repoCacheIdx++)
136 {
137 ArchiveGetFindCacheRepo *cacheRepo = lstGet(cacheRepoList, repoCacheIdx);
138
139 TRY_BEGIN()
140 {
141 // Check each archiveId
142 for (unsigned int archiveCacheIdx = 0; archiveCacheIdx < lstSize(cacheRepo->archiveList); archiveCacheIdx++)
143 {
144 ArchiveGetFindCacheArchive *cacheArchive = lstGet(cacheRepo->archiveList, archiveCacheIdx);
145
146 // If a WAL segment then search among the possible file names
147 if (isSegment)
148 {
149 StringList *segmentList = NULL;
150
151 // If a single file is requested then optimize by adding a restrictive expression to reduce bandwidth
152 if (single)
153 {
154 segmentList = storageListP(
155 storageRepoIdx(cacheRepo->repoIdx),
156 strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(cacheArchive->archiveId), strZ(path)),
157 .expression = strNewFmt(
158 "^%s%s-[0-f]{40}" COMPRESS_TYPE_REGEXP "{0,1}$", strZ(strSubN(archiveFileRequest, 0, 24)),
159 walIsPartial(archiveFileRequest) ? WAL_SEGMENT_PARTIAL_EXT : ""));
160 }
161 // Else multiple files will be requested so cache list results
162 else
163 {
164 // Partial files cannot be in a list with multiple requests
165 ASSERT(!walIsPartial(archiveFileRequest));
166
167 // If the path does not exist in the cache then fetch it
168 const ArchiveGetFindCachePath *cachePath = lstFind(cacheArchive->pathList, &path);
169
170 if (cachePath == NULL)
171 {
172 MEM_CONTEXT_BEGIN(lstMemContext(cacheArchive->pathList))
173 {
174 cachePath = lstAdd(
175 cacheArchive->pathList,
176 &(ArchiveGetFindCachePath)
177 {
178 .path = strDup(path),
179 .fileList = storageListP(
180 storageRepoIdx(cacheRepo->repoIdx),
181 strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(cacheArchive->archiveId), strZ(path)),
182 .expression = strNewFmt(
183 "^%s[0-F]{8}-[0-f]{40}" COMPRESS_TYPE_REGEXP "{0,1}$", strZ(path))),
184 });
185 }
186 MEM_CONTEXT_END();
187 }
188
189 // Get a list of all WAL segments that match
190 segmentList = strLstNew();
191
192 for (unsigned int fileIdx = 0; fileIdx < strLstSize(cachePath->fileList); fileIdx++)
193 {
194 if (strBeginsWith(strLstGet(cachePath->fileList, fileIdx), archiveFileRequest))
195 strLstAdd(segmentList, strLstGet(cachePath->fileList, fileIdx));
196 }
197 }
198
199 // Add segments to match list
200 for (unsigned int segmentIdx = 0; segmentIdx < strLstSize(segmentList); segmentIdx++)
201 {
202 MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
203 {
204 lstAdd(
205 matchList,
206 &(ArchiveGetFile)
207 {
208 .file = strNewFmt(
209 "%s/%s/%s", strZ(cacheArchive->archiveId), strZ(path),
210 strZ(strLstGet(segmentList, segmentIdx))),
211 .repoIdx = cacheRepo->repoIdx,
212 .archiveId = cacheArchive->archiveId,
213 .cipherType = cacheRepo->cipherType,
214 .cipherPassArchive = cacheRepo->cipherPassArchive,
215 });
216 }
217 MEM_CONTEXT_END();
218 }
219 }
220 // Else if not a WAL segment, see if it exists in the archiveId path
221 else if (storageExistsP(
222 storageRepoIdx(cacheRepo->repoIdx), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(cacheArchive->archiveId),
223 strZ(archiveFileRequest))))
224 {
225 MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
226 {
227 lstAdd(
228 matchList,
229 &(ArchiveGetFile)
230 {
231 .file = strNewFmt("%s/%s", strZ(cacheArchive->archiveId), strZ(archiveFileRequest)),
232 .repoIdx = cacheRepo->repoIdx,
233 .archiveId = cacheArchive->archiveId,
234 .cipherType = cacheRepo->cipherType,
235 .cipherPassArchive = cacheRepo->cipherPassArchive,
236 });
237 }
238 MEM_CONTEXT_END();
239 }
240 }
241 }
242 // Log errors as warnings and continue
243 CATCH_ANY()
244 {
245 repoErrorTotal++;
246 archiveGetErrorAdd(cacheRepo->warnList, true, cacheRepo->repoIdx, errorType(), STR(errorMessage()));
247 archiveGetErrorAdd(fileWarnList, false, cacheRepo->repoIdx, errorType(), STR(errorMessage()));
248 }
249 TRY_END();
250 }
251
252 // If all repos errored out then set the global error since processing cannot continue past this segment
253 ASSERT(repoErrorTotal <= lstSize(cacheRepoList));
254
255 if (repoErrorTotal == lstSize(cacheRepoList))
256 {
257 ASSERT(!strLstEmpty(fileWarnList));
258
259 MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
260 {
261 getCheckResult->errorType = &RepoInvalidError;
262 getCheckResult->errorFile = strDup(archiveFileRequest);
263 getCheckResult->errorMessage = strNewZ(UNABLE_TO_FIND_VALID_REPO_MSG);
264 getCheckResult->warnList = strLstMove(fileWarnList, memContextCurrent());
265 }
266 MEM_CONTEXT_END();
267
268 }
269 // Else if a file was found
270 else if (!lstEmpty(matchList))
271 {
272 bool error = false;
273
274 // If a segment match list is > 1 then check for duplicates
275 if (isSegment && lstSize(matchList) > 1)
276 {
277 // Count the number of unique hashes
278 StringList *hashList = strLstNew();
279
280 for (unsigned int matchIdx = 0; matchIdx < lstSize(matchList); matchIdx++)
281 strLstAddIfMissing(hashList, strSubN(((ArchiveGetFile *)lstGet(matchList, matchIdx))->file, 25, 40));
282
283 // If there is more than one unique hash then there are duplicates
284 if (strLstSize(hashList) > 1)
285 {
286 // Build list of duplicates
287 unsigned int repoKeyLast = 0;
288 String *message = strNew();
289 bool first = true;
290
291 for (unsigned int matchIdx = 0; matchIdx < lstSize(matchList); matchIdx++)
292 {
293 ArchiveGetFile *file = lstGet(matchList, matchIdx);
294 unsigned int repoKey = cfgOptionGroupIdxToKey(cfgOptGrpRepo, file->repoIdx);
295
296 if (repoKey != repoKeyLast)
297 {
298 strCatFmt(message, "\nrepo%u:", repoKey);
299 repoKeyLast = repoKey;
300 first = true;
301 }
302
303 if (first)
304 first = false;
305 else
306 strCatChr(message, ',');
307
308 strCatFmt(message, " %s", strZ(file->file));
309 }
310
311 // Set as global error since processing cannot continue past this segment
312 MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
313 {
314 getCheckResult->errorType = &ArchiveDuplicateError;
315 getCheckResult->errorFile = strDup(archiveFileRequest);
316 getCheckResult->errorMessage = strNewFmt(
317 "duplicates found for WAL segment %s:%s\n"
318 "HINT: are multiple primaries archiving to this stanza?",
319 strZ(archiveFileRequest), strZ(message));
320 getCheckResult->warnList = strLstMove(fileWarnList, memContextCurrent());
321 }
322 MEM_CONTEXT_END();
323
324 error = true;
325 }
326 }
327
328 // Files are valid so add them to the map
329 if (!error)
330 {
331 MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
332 {
333 ArchiveFileMap map =
334 {
335 .request = strDup(archiveFileRequest),
336 .actualList = lstNewP(sizeof(ArchiveGetFile)),
337 .warnList = strLstMove(fileWarnList, memContextCurrent()),
338 };
339
340 for (unsigned int matchIdx = 0; matchIdx < lstSize(matchList); matchIdx++)
341 lstAdd(map.actualList, lstGet(matchList, matchIdx));
342
343 lstAdd(getCheckResult->archiveFileMapList, &map);
344 }
345 MEM_CONTEXT_END();
346
347 result = true;
348 }
349 }
350 }
351 MEM_CONTEXT_TEMP_END();
352
353 FUNCTION_LOG_RETURN(BOOL, result);
354 }
355
// Check the repositories for a list of requested archive files.
//
// First builds the list of repos/archiveIds that match the current cluster (system id and version from pg_control), recording
// any repo errors as warnings. If no repo is usable a global error is set in the result. Otherwise each requested file is looked
// up with archiveGetFind() in request order, stopping at the first file that is not found or that sets a global error.
//
// archiveRequestList - archive files to find, must contain at least one entry
//
// Returns the mapped files (sorted by request name), the warnings, and the global error if there was one.
static ArchiveGetCheckResult
archiveGetCheck(const StringList *archiveRequestList)
{
    FUNCTION_LOG_BEGIN(logLevelDebug);
        FUNCTION_LOG_PARAM(STRING_LIST, archiveRequestList);
    FUNCTION_LOG_END();

    ASSERT(archiveRequestList != NULL);
    ASSERT(!strLstEmpty(archiveRequestList));

    // The map list is created outside the temp context and its mem context is used for everything that must survive this call
    ArchiveGetCheckResult result = {.archiveFileMapList = lstNewP(sizeof(ArchiveFileMap), .comparator = lstComparatorStr)};

    MEM_CONTEXT_TEMP_BEGIN()
    {
        // List of warnings
        StringList *warnList = strLstNew();

        // Get pg control info
        PgControl controlInfo = pgControlFromFile(storagePg());

        // Build list of repos/archiveIds where WAL may be found
        List *cacheRepoList = lstNewP(sizeof(ArchiveGetFindCacheRepo));

        for (unsigned int repoIdx = 0; repoIdx < cfgOptionGroupIdxTotal(cfgOptGrpRepo); repoIdx++)
        {
            // If a repo was specified then skip all other repos
            if (cfgOptionTest(cfgOptRepo) && cfgOptionUInt(cfgOptRepo) != cfgOptionGroupIdxToKey(cfgOptGrpRepo, repoIdx))
                continue;

            TRY_BEGIN()
            {
                // Get the repo storage in case it is remote and encryption settings need to be pulled down
                storageRepoIdx(repoIdx);

                ArchiveGetFindCacheRepo cacheRepo =
                {
                    .repoIdx = repoIdx,
                    .cipherType = cfgOptionIdxStrId(cfgOptRepoCipherType, repoIdx),
                    .archiveList = lstNewP(sizeof(ArchiveGetFindCacheArchive)),
                    .warnList = strLstNew(),
                };

                // Attempt to load the archive info file
                InfoArchive *info = infoArchiveLoadFile(
                    storageRepoIdx(repoIdx), INFO_ARCHIVE_PATH_FILE_STR, cacheRepo.cipherType,
                    cfgOptionIdxStrNull(cfgOptRepoCipherPass, repoIdx));

                // Copy cipher pass into the result list context once rather than making a copy per candidate file later
                MEM_CONTEXT_BEGIN(lstMemContext(result.archiveFileMapList))
                {
                    cacheRepo.cipherPassArchive = strDup(infoArchiveCipherPass(info));
                }
                MEM_CONTEXT_END();

                // Loop through pg history and determine which archiveIds to use
                StringList *archivePathList = NULL;

                for (unsigned int pgIdx = 0; pgIdx < infoPgDataTotal(infoArchivePg(info)); pgIdx++)
                {
                    InfoPgData pgData = infoPgData(infoArchivePg(info), pgIdx);

                    // Only use the archive id if it matches the current cluster
                    if (pgData.systemId == controlInfo.systemId && pgData.version == controlInfo.version)
                    {
                        const String *archiveId = infoPgArchiveId(infoArchivePg(info), pgIdx);
                        bool found = true;

                        // If the archiveId is in the past make sure the path exists
                        if (pgIdx != 0)
                        {
                            // Get list of archiveId paths in the archive path (listed lazily, at most once per repo)
                            if (archivePathList == NULL)
                                archivePathList = storageListP(storageRepoIdx(repoIdx), STORAGE_REPO_ARCHIVE_STR);

                            if (!strLstExists(archivePathList, archiveId))
                                found = false;
                        }

                        // If the archiveId is most recent or has files then add it
                        if (found)
                        {
                            ArchiveGetFindCacheArchive cacheArchive =
                            {
                                .pathList = lstNewP(sizeof(ArchiveGetFindCachePath), .comparator = lstComparatorStr),
                            };

                            // Copy archiveId into the result list context once rather than making a copy per candidate file later
                            MEM_CONTEXT_BEGIN(lstMemContext(result.archiveFileMapList))
                            {
                                cacheArchive.archiveId = strDup(archiveId);
                            }
                            MEM_CONTEXT_END();

                            lstAdd(cacheRepo.archiveList, &cacheArchive);
                        }
                    }
                }

                // Error if no archive id was found -- this indicates a mismatch with the current cluster
                if (lstEmpty(cacheRepo.archiveList))
                {
                    archiveGetErrorAdd(
                        warnList, true, repoIdx, &ArchiveMismatchError,
                        strNewFmt(
                            "unable to retrieve the archive id for database version '%s' and system-id '%" PRIu64 "'",
                            strZ(pgVersionToStr(controlInfo.version)), controlInfo.systemId));
                }
                // Else add repo to list
                else
                    lstAdd(cacheRepoList, &cacheRepo);
            }
            // Log errors as warnings and continue
            CATCH_ANY()
            {
                archiveGetErrorAdd(warnList, true, repoIdx, errorType(), STR(errorMessage()));
            }
            TRY_END();
        }

        // Error if there are no repos to check
        if (lstEmpty(cacheRepoList))
        {
            // Every skipped repo must have added a warning above
            ASSERT(!strLstEmpty(warnList));

            // Set as global error since processing cannot continue past this segment. Note that errorFile is not set here since
            // the error is not specific to any requested file.
            MEM_CONTEXT_BEGIN(lstMemContext(result.archiveFileMapList))
            {
                result.errorType = &RepoInvalidError;
                result.errorMessage = strNewZ(UNABLE_TO_FIND_VALID_REPO_MSG);
                result.warnList = strLstMove(warnList, memContextCurrent());
            }
            MEM_CONTEXT_END();
        }
        else
        {
            // Any remaining errors will be reported as warnings since at least one repo is valid
            MEM_CONTEXT_BEGIN(lstMemContext(result.archiveFileMapList))
            {
                result.warnList = strLstMove(warnList, memContextCurrent());
            }
            MEM_CONTEXT_END();

            // Find files in the list. Stop at the first file that is not found or errors since later files cannot be used
            // without it. The single file optimization is enabled when exactly one file was requested.
            // NOTE(review): warnList is still passed after being moved above -- presumably the move only changes the owning mem
            // context and the pointer remains valid; confirm against strLstMove().
            for (unsigned int archiveRequestIdx = 0; archiveRequestIdx < strLstSize(archiveRequestList); archiveRequestIdx++)
            {
                if (!archiveGetFind(
                        strLstGet(archiveRequestList, archiveRequestIdx), &result, cacheRepoList, warnList,
                        strLstSize(archiveRequestList) == 1))
                {
                    break;
                }
            }

            // Sort the list to make searching for files faster
            lstSort(result.archiveFileMapList, sortOrderAsc);
        }
    }
    MEM_CONTEXT_TEMP_END();

    FUNCTION_LOG_RETURN_STRUCT(result);
}
517
518 /***********************************************************************************************************************************
519 Clean the queue and prepare a list of WAL segments that the async process should get
520 ***********************************************************************************************************************************/
521 static StringList *
queueNeed(const String * walSegment,bool found,uint64_t queueSize,size_t walSegmentSize,unsigned int pgVersion)522 queueNeed(const String *walSegment, bool found, uint64_t queueSize, size_t walSegmentSize, unsigned int pgVersion)
523 {
524 FUNCTION_LOG_BEGIN(logLevelDebug);
525 FUNCTION_LOG_PARAM(STRING, walSegment);
526 FUNCTION_LOG_PARAM(BOOL, found);
527 FUNCTION_LOG_PARAM(UINT64, queueSize);
528 FUNCTION_LOG_PARAM(SIZE, walSegmentSize);
529 FUNCTION_LOG_PARAM(UINT, pgVersion);
530 FUNCTION_LOG_END();
531
532 ASSERT(walSegment != NULL);
533
534 StringList *result = strLstNew();
535
536 MEM_CONTEXT_TEMP_BEGIN()
537 {
538 // Determine the first WAL segment for the async process to get. If the WAL segment requested by
539 // PostgreSQL was not found then use that. If the segment was found but the queue is not full then
540 // start with the next segment.
541 const String *walSegmentFirst =
542 found ? walSegmentNext(walSegment, walSegmentSize, pgVersion) : walSegment;
543
544 // Determine how many WAL segments should be in the queue. The queue total must be at least 2 or it doesn't make sense to
545 // have async turned on at all.
546 unsigned int walSegmentQueueTotal = (unsigned int)(queueSize / walSegmentSize);
547
548 if (walSegmentQueueTotal < 2)
549 walSegmentQueueTotal = 2;
550
551 // Build the ideal queue -- the WAL segments we want in the queue after the async process has run
552 StringList *idealQueue = strLstSort(
553 walSegmentRange(walSegmentFirst, walSegmentSize, pgVersion, walSegmentQueueTotal), sortOrderAsc);
554
555 // Get the list of files actually in the queue
556 StringList *actualQueue = strLstSort(
557 storageListP(storageSpool(), STORAGE_SPOOL_ARCHIVE_IN_STR, .errorOnMissing = true), sortOrderAsc);
558
559 // Build a list of WAL segments that are being kept so we can later make a list of what is needed
560 StringList *keepQueue = strLstNew();
561
562 for (unsigned int actualQueueIdx = 0; actualQueueIdx < strLstSize(actualQueue); actualQueueIdx++)
563 {
564 // Get file from actual queue
565 const String *file = strLstGet(actualQueue, actualQueueIdx);
566
567 // Does this match a file we want to preserve?
568 if (strLstExists(idealQueue, file))
569 {
570 strLstAdd(keepQueue, file);
571 }
572 // Else delete if it does not match an ok file for a WAL segment that has already been preserved. If an ok file exists
573 // in addition to the segment then it contains warnings which need to be preserved.
574 else if (
575 !strEndsWithZ(file, STATUS_EXT_OK) ||
576 !strLstExists(actualQueue, strSubN(file, 0, strSize(file) - STATUS_EXT_OK_SIZE)))
577 {
578 storageRemoveP(storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(file)), .errorOnMissing = true);
579 }
580 }
581
582 // Generate a list of the WAL that are needed by removing kept WAL from the ideal queue
583 strLstSort(keepQueue, sortOrderAsc);
584
585 for (unsigned int idealQueueIdx = 0; idealQueueIdx < strLstSize(idealQueue); idealQueueIdx++)
586 {
587 if (!strLstExists(keepQueue, strLstGet(idealQueue, idealQueueIdx)))
588 strLstAdd(result, strLstGet(idealQueue, idealQueueIdx));
589 }
590 }
591 MEM_CONTEXT_TEMP_END();
592
593 FUNCTION_LOG_RETURN(STRING_LIST, result);
594 }
595
596 /**********************************************************************************************************************************/
// Get an archive file and write it to the requested destination.
//
// Expects exactly two command parameters: the archive file to get and the destination path. When archive-async is enabled and the
// file is a WAL segment, the segment is taken from the spool queue and an async process is spawned as needed to fill the queue.
// Otherwise the repositories are checked and the file is fetched synchronously.
//
// Returns 0 when the file was found and written to the destination, 1 when it was not found. Throws on invalid parameters, on a
// global error from the repository check, or when the async process does not report a result before the timeout.
int
cmdArchiveGet(void)
{
    FUNCTION_LOG_VOID(logLevelDebug);

    // PostgreSQL must be local
    pgIsLocalVerify();

    // Set the result assuming the archive file will not be found
    int result = 1;

    MEM_CONTEXT_TEMP_BEGIN()
    {
        // Check the parameters
        const StringList *commandParam = cfgCommandParam();

        if (strLstSize(commandParam) != 2)
        {
            if (strLstEmpty(commandParam))
                THROW(ParamRequiredError, "WAL segment to get required");

            if (strLstSize(commandParam) == 1)
                THROW(ParamRequiredError, "path to copy WAL segment required");

            THROW(ParamInvalidError, "extra parameters found");
        }

        // Get the segment name (only the base name of the first parameter is used)
        String *walSegment = strBase(strLstGet(commandParam, 0));

        // Destination is wherever we were told to move the WAL segment
        const String *walDestination =
            walPath(strLstGet(commandParam, 1), cfgOptionStr(cfgOptPgPath), STR(cfgCommandName()));

        // Async get can only be performed on WAL segments, history or other files must use synchronous mode
        if (cfgOptionBool(cfgOptArchiveAsync) && walIsSegment(walSegment))
        {
            bool first = true; // Is this the first time the loop has run?
            bool found = false; // Has the WAL segment been found yet?
            bool foundOk = false; // Was an OK file found which confirms the file was missing?
            bool queueFull = false; // Is the queue half or more full?
            bool forked = false; // Has the async process been forked yet?

            // Loop and wait for the WAL segment to be pushed
            Wait *wait = waitNew(cfgOptionUInt64(cfgOptArchiveTimeout));

            do
            {
                // Check if the WAL segment is already in the queue
                found = storageExistsP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(walSegment)));

                // Check for errors or missing files. For archive-get ok indicates that the process succeeded but there is no WAL
                // file to download, or that there was a warning. Do not error on the first run so the async process can be spawned
                // to correct any errors from a previous run. Do not warn on the first run if the segment was not found so the async
                // process can be spawned to check for the file again.
                if (archiveAsyncStatus(archiveModeGet, walSegment, !first, found || !first))
                {
                    // The ok file has been consumed so remove it
                    storageRemoveP(
                        storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s" STATUS_EXT_OK, strZ(walSegment)),
                        .errorOnMissing = true);

                    // Break if an ok file was found but no segment exists, which means the segment was missing. However, don't
                    // break if this is the first time through the loop since this means the ok file was written by an async process
                    // spawned by a prior archive-get execution, which means we should spawn the async process again to see if the
                    // file exists now. This also prevents spool files from a previous recovery interfering with the current
                    // recovery.
                    if (!found && !first)
                    {
                        foundOk = true;
                        break;
                    }
                }

                // If found then move the WAL segment to the destination directory
                if (found)
                {
                    // Source is the WAL segment in the spool queue
                    StorageRead *source = storageNewReadP(
                        storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(walSegment)));

                    // A move will be attempted but if the spool queue and the WAL path are on different file systems then a copy
                    // will be performed instead.
                    //
                    // It looks scary that we are disabling syncs and atomicity (in case we need to copy instead of move) but this
                    // is safe because if the system crashes Postgres will not try to reuse a restored WAL segment but will instead
                    // request it again using the restore_command. In the case of a move this hardly matters since path syncs are
                    // cheap but if a copy is required we could save a lot of writes.
                    StorageWrite *destination = storageNewWriteP(
                        storageLocalWrite(), walDestination, .noCreatePath = true, .noSyncFile = true, .noSyncPath = true,
                        .noAtomic = true);

                    // Move (or copy if required) the file
                    storageMoveP(storageSpoolWrite(), source, destination);

                    // Return success
                    LOG_INFO_FMT(FOUND_IN_ARCHIVE_MSG " asynchronously", strZ(walSegment));
                    result = 0;

                    // Get a list of WAL segments left in the queue
                    StringList *queue = storageListP(
                        storageSpool(), STORAGE_SPOOL_ARCHIVE_IN_STR, .expression = WAL_SEGMENT_REGEXP_STR, .errorOnMissing = true);

                    if (!strLstEmpty(queue))
                    {
                        // Get size of the WAL segment (taken from the segment that was just delivered)
                        uint64_t walSegmentSize = storageInfoP(storageLocal(), walDestination).size;

                        // Use WAL segment size to estimate queue size and determine if the async process should be launched
                        queueFull = strLstSize(queue) * walSegmentSize > cfgOptionUInt64(cfgOptArchiveGetQueueMax) / 2;
                    }
                }

                // If the WAL segment has not already been found then start the async process to get it. There's no point in
                // forking the async process off more than once so track that as well. Use an archive lock to prevent forking if
                // the async process was launched by another process.
                if (!forked && (!found || !queueFull) &&
                    lockAcquire(
                        cfgOptionStr(cfgOptLockPath), cfgOptionStr(cfgOptStanza), cfgOptionStr(cfgOptExecId), cfgLockType(), 0,
                        false))
                {
                    // Get control info
                    PgControl pgControl = pgControlFromFile(storagePg());

                    // Create the queue
                    storagePathCreateP(storageSpoolWrite(), STORAGE_SPOOL_ARCHIVE_IN_STR);

                    // The async process should not output on the console at all
                    KeyValue *optionReplace = kvNew();

                    kvPut(optionReplace, VARSTRDEF(CFGOPT_LOG_LEVEL_CONSOLE), VARSTRDEF("off"));
                    kvPut(optionReplace, VARSTRDEF(CFGOPT_LOG_LEVEL_STDERR), VARSTRDEF("off"));

                    // Generate command options
                    StringList *commandExec = cfgExecParam(cfgCmdArchiveGet, cfgCmdRoleAsync, optionReplace, true, false);
                    strLstInsert(commandExec, 0, cfgExe());

                    // Clean the current queue using the list of WAL that we ideally want in the queue. queueNeed()
                    // will return the list of WAL needed to fill the queue and this will be passed to the async process.
                    const StringList *queue = queueNeed(
                        walSegment, found, cfgOptionUInt64(cfgOptArchiveGetQueueMax), pgControl.walSegmentSize,
                        pgControl.version);

                    for (unsigned int queueIdx = 0; queueIdx < strLstSize(queue); queueIdx++)
                        strLstAdd(commandExec, strLstGet(queue, queueIdx));

                    // Clear errors for the current wal segment
                    archiveAsyncErrorClear(archiveModeGet, walSegment);

                    // Release the lock so the child process can acquire it
                    lockRelease(true);

                    // Execute the async process
                    archiveAsyncExec(archiveModeGet, commandExec);

                    // Mark the async process as forked so it doesn't get forked again. A single run of the async process should be
                    // enough to do the job, running it again won't help anything.
                    forked = true;
                }

                // Exit loop if WAL was found
                if (found)
                    break;

                // No longer the first run, so errors will be thrown and missing files will be reported
                first = false;
            }
            while (waitMore(wait));

            // If the WAL segment was not found
            if (!found)
            {
                // If no ok file was found then something may be wrong with the async process. It's better to throw an error here
                // than report not found for debugging purposes. Either way PostgreSQL will halt if it has not reached consistency.
                if (!foundOk)
                {
                    THROW_FMT(
                        ArchiveTimeoutError, "unable to get WAL file '%s' from the archive asynchronously after %s second(s)",
                        strZ(walSegment), strZ(strNewDbl((double)cfgOptionInt64(cfgOptArchiveTimeout) / MSEC_PER_SEC)));
                }
                // Else report that the WAL segment could not be found
                else
                    LOG_INFO_FMT(UNABLE_TO_FIND_IN_ARCHIVE_MSG " asynchronously", strZ(walSegment));
            }
        }
        // Else perform synchronous get
        else
        {
            // Check for the archive file
            StringList *archiveRequestList = strLstNew();
            strLstAdd(archiveRequestList, walSegment);

            ArchiveGetCheckResult checkResult = archiveGetCheck(archiveRequestList);

            // If there was an error then throw it
            if (checkResult.errorType != NULL)
                THROW_CODE(errorTypeCode(checkResult.errorType), strZ(checkResult.errorMessage));

            // Get the archive file
            if (!lstEmpty(checkResult.archiveFileMapList))
            {
                // There can only be one file mapping since only one file was requested
                ASSERT(lstSize(checkResult.archiveFileMapList) == 1);
                const ArchiveFileMap *fileMap = lstGet(checkResult.archiveFileMapList, 0);

                // Get the file
                ArchiveGetFileResult fileResult = archiveGetFile(
                    storageLocalWrite(), fileMap->request, fileMap->actualList, walDestination);

                // Output file warnings
                for (unsigned int warnIdx = 0; warnIdx < strLstSize(fileResult.warnList); warnIdx++)
                    LOG_WARN(strZ(strLstGet(fileResult.warnList, warnIdx)));

                // If there was no error then the file existed. actualIdx identifies which candidate was actually retrieved.
                ArchiveGetFile *file = lstGet(fileMap->actualList, fileResult.actualIdx);
                ASSERT(file != NULL);

                LOG_INFO_FMT(
                    FOUND_IN_REPO_ARCHIVE_MSG, strZ(walSegment), cfgOptionGroupIdxToKey(cfgOptGrpRepo, file->repoIdx),
                    strZ(file->archiveId));

                result = 0;
            }
            // Else log that the file was not found
            else
                LOG_INFO_FMT(UNABLE_TO_FIND_IN_ARCHIVE_MSG, strZ(walSegment));
        }
    }
    MEM_CONTEXT_TEMP_END();

    FUNCTION_LOG_RETURN(INT, result);
}
828
829 /**********************************************************************************************************************************/
// State shared with archiveGetAsyncCallback() to hand out one job per mapped archive file
typedef struct ArchiveGetAsyncData
{
    const List *const archiveFileMapList; // List of wal segments to process (list of ArchiveFileMap)
    unsigned int archiveFileIdx; // Current index in the list to be processed (incremented each time a job is created)
} ArchiveGetAsyncData;
835
archiveGetAsyncCallback(void * data,unsigned int clientIdx)836 static ProtocolParallelJob *archiveGetAsyncCallback(void *data, unsigned int clientIdx)
837 {
838 FUNCTION_TEST_BEGIN();
839 FUNCTION_TEST_PARAM_P(VOID, data);
840 FUNCTION_TEST_PARAM(UINT, clientIdx);
841 FUNCTION_TEST_END();
842
843 ProtocolParallelJob *result = NULL;
844
845 MEM_CONTEXT_TEMP_BEGIN()
846 {
847 // No special logic based on the client, we'll just get the next job
848 (void)clientIdx;
849
850 // Get a new job if there are any left
851 ArchiveGetAsyncData *jobData = data;
852
853 if (jobData->archiveFileIdx < lstSize(jobData->archiveFileMapList))
854 {
855 const ArchiveFileMap *archiveFileMap = lstGet(jobData->archiveFileMapList, jobData->archiveFileIdx);
856 jobData->archiveFileIdx++;
857
858 ProtocolCommand *const command = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_GET_FILE);
859 PackWrite *const param = protocolCommandParam(command);
860
861 pckWriteStrP(param, archiveFileMap->request);
862
863 // Add actual files to get
864 for (unsigned int actualIdx = 0; actualIdx < lstSize(archiveFileMap->actualList); actualIdx++)
865 {
866 const ArchiveGetFile *const actual = lstGet(archiveFileMap->actualList, actualIdx);
867
868 pckWriteStrP(param, actual->file);
869 pckWriteU32P(param, actual->repoIdx);
870 pckWriteStrP(param, actual->archiveId);
871 pckWriteU64P(param, actual->cipherType);
872 pckWriteStrP(param, actual->cipherPassArchive);
873 }
874
875 MEM_CONTEXT_PRIOR_BEGIN()
876 {
877 result = protocolParallelJobNew(VARSTR(archiveFileMap->request), command);
878 }
879 MEM_CONTEXT_PRIOR_END();
880 }
881 }
882 MEM_CONTEXT_TEMP_END();
883
884 FUNCTION_TEST_RETURN(result);
885 }
886
887 void
cmdArchiveGetAsync(void)888 cmdArchiveGetAsync(void)
889 {
890 FUNCTION_LOG_VOID(logLevelDebug);
891
892 MEM_CONTEXT_TEMP_BEGIN()
893 {
894 TRY_BEGIN()
895 {
896 // PostgreSQL must be local
897 pgIsLocalVerify();
898
899 // Check the parameters
900 if (strLstSize(cfgCommandParam()) < 1)
901 THROW(ParamInvalidError, "at least one wal segment is required");
902
903 LOG_INFO_FMT(
904 "get %u WAL file(s) from archive: %s%s",
905 strLstSize(cfgCommandParam()), strZ(strLstGet(cfgCommandParam(), 0)),
906 strLstSize(cfgCommandParam()) == 1 ?
907 "" : strZ(strNewFmt("...%s", strZ(strLstGet(cfgCommandParam(), strLstSize(cfgCommandParam()) - 1)))));
908
909 // Check for archive files
910 ArchiveGetCheckResult checkResult = archiveGetCheck(cfgCommandParam());
911
912 // If any files are missing get the first one (used to construct the "unable to find" warning)
913 const String *archiveFileMissing = NULL;
914
915 if (lstSize(checkResult.archiveFileMapList) < strLstSize(cfgCommandParam()))
916 archiveFileMissing = strLstGet(cfgCommandParam(), lstSize(checkResult.archiveFileMapList));
917
918 // Get archive files that were found
919 if (!lstEmpty(checkResult.archiveFileMapList))
920 {
921 // Create the parallel executor
922 ArchiveGetAsyncData jobData = {.archiveFileMapList = checkResult.archiveFileMapList};
923
924 ProtocolParallel *parallelExec = protocolParallelNew(
925 cfgOptionUInt64(cfgOptProtocolTimeout) / 2, archiveGetAsyncCallback, &jobData);
926
927 for (unsigned int processIdx = 1; processIdx <= cfgOptionUInt(cfgOptProcessMax); processIdx++)
928 protocolParallelClientAdd(parallelExec, protocolLocalGet(protocolStorageTypeRepo, 0, processIdx));
929
930 // Process jobs
931 MEM_CONTEXT_TEMP_RESET_BEGIN()
932 {
933 do
934 {
935 unsigned int completed = protocolParallelProcess(parallelExec);
936
937 for (unsigned int jobIdx = 0; jobIdx < completed; jobIdx++)
938 {
939 // Get the job
940 ProtocolParallelJob *job = protocolParallelResult(parallelExec);
941 unsigned int processId = protocolParallelJobProcessId(job);
942
943 // Get wal segment name and archive file map
944 const String *walSegment = varStr(protocolParallelJobKey(job));
945 const ArchiveFileMap *fileMap = lstFind(checkResult.archiveFileMapList, &walSegment);
946 ASSERT(fileMap != NULL);
947
948 // Build warnings for status file
949 String *warning = strNew();
950
951 if (!strLstEmpty(fileMap->warnList))
952 strCatFmt(warning, "%s", strZ(strLstJoin(fileMap->warnList, "\n")));
953
954 // The job was successful
955 if (protocolParallelJobErrorCode(job) == 0)
956 {
957 // Get the actual file retrieved
958 PackRead *const fileResult = protocolParallelJobResult(job);
959 ArchiveGetFile *file = lstGet(fileMap->actualList, pckReadU32P(fileResult));
960 ASSERT(file != NULL);
961
962 // Output file warnings
963 StringList *fileWarnList = pckReadStrLstP(fileResult);
964
965 for (unsigned int warnIdx = 0; warnIdx < strLstSize(fileWarnList); warnIdx++)
966 LOG_WARN_PID(processId, strZ(strLstGet(fileWarnList, warnIdx)));
967
968 // Build file warnings for status file
969 if (!strLstEmpty(fileWarnList))
970 {
971 strCatFmt(
972 warning, "%s%s", strSize(warning) == 0 ? "" : "\n", strZ(strLstJoin(fileWarnList, "\n")));
973 }
974
975 if (strSize(warning) != 0)
976 archiveAsyncStatusOkWrite(archiveModeGet, walSegment, warning);
977
978 LOG_DETAIL_PID_FMT(
979 processId, FOUND_IN_REPO_ARCHIVE_MSG, strZ(walSegment),
980 cfgOptionGroupIdxToKey(cfgOptGrpRepo, file->repoIdx), strZ(file->archiveId));
981
982 // Rename temp WAL segment to actual name. This is done after the ok file is written so the ok file
983 // is guaranteed to exist before the foreground process finds the WAL segment.
984 storageMoveP(
985 storageSpoolWrite(),
986 storageNewReadP(
987 storageSpool(),
988 strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s." STORAGE_FILE_TEMP_EXT, strZ(walSegment))),
989 storageNewWriteP(
990 storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(walSegment))));
991 }
992 // Else the job errored
993 else
994 {
995 LOG_WARN_PID_FMT(
996 processId, "[%s] %s", errorTypeName(errorTypeFromCode(protocolParallelJobErrorCode(job))),
997 strZ(protocolParallelJobErrorMessage(job)));
998
999 archiveAsyncStatusErrorWrite(
1000 archiveModeGet, walSegment, protocolParallelJobErrorCode(job),
1001 strNewFmt(
1002 "%s%s", strZ(protocolParallelJobErrorMessage(job)),
1003 strSize(warning) == 0 ? "" : strZ(strNewFmt("\n%s", strZ(warning)))));
1004 }
1005
1006 protocolParallelJobFree(job);
1007 }
1008
1009 // Reset the memory context occasionally so we don't use too much memory or slow down processing
1010 MEM_CONTEXT_TEMP_RESET(1000);
1011 }
1012 while (!protocolParallelDone(parallelExec));
1013 }
1014 MEM_CONTEXT_TEMP_END();
1015 }
1016
1017 // Log an error from archiveGetCheck() after any existing files have been fetched. This ordering is important because we
1018 // need to fetch as many valid files as possible before throwing an error.
1019 if (checkResult.errorType != NULL)
1020 {
1021 LOG_WARN_FMT("[%s] %s", errorTypeName(checkResult.errorType), strZ(checkResult.errorMessage));
1022
1023 String *message = strDup(checkResult.errorMessage);
1024
1025 if (!strLstEmpty(checkResult.warnList))
1026 strCatFmt(message, "\n%s", strZ(strLstJoin(checkResult.warnList, "\n")));
1027
1028 archiveAsyncStatusErrorWrite(
1029 archiveModeGet, checkResult.errorFile, errorTypeCode(checkResult.errorType), message);
1030 }
1031 // If any files were missing write an ok file for the first missing file and add any warnings. It is important that this
1032 // happen right before the async process exits so the main process can immediately respawn the async process to retry
1033 // missing files.
1034 else if (archiveFileMissing != NULL)
1035 {
1036 LOG_DETAIL_FMT(UNABLE_TO_FIND_IN_ARCHIVE_MSG, strZ(archiveFileMissing));
1037
1038 String *message = NULL;
1039
1040 if (!strLstEmpty(checkResult.warnList))
1041 message = strLstJoin(checkResult.warnList, "\n");
1042
1043 archiveAsyncStatusOkWrite(archiveModeGet, archiveFileMissing, message);
1044 }
1045 }
1046 // On any global error write a single error file to cover all unprocessed files
1047 CATCH_ANY()
1048 {
1049 archiveAsyncStatusErrorWrite(archiveModeGet, NULL, errorCode(), STR(errorMessage()));
1050 RETHROW();
1051 }
1052 TRY_END();
1053 }
1054 MEM_CONTEXT_TEMP_END();
1055
1056 FUNCTION_LOG_RETURN_VOID();
1057 }
1058