/*
 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/AsyncJobCalls.h"
#include "fs/rock/RockDbCell.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "fs_io.h"
#include "globals.h"
#include "ipc/StoreMap.h"
#include "md5.h"
#include "SquidTime.h"
#include "Store.h"
#include "store_rebuild.h"
#include "tools.h"

#include <cerrno>

CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);

/**
 \defgroup RockFsRebuild Rock Store Rebuild
 \ingroup Filesystems
 *
 \section Overview Overview
 *  Several layers of information are manipulated during the rebuild:
 \par
 *  Store Entry: Response message plus all the metainformation associated with
 *  it. Identified by store key. At any given time, from Squid's point
 *  of view, there is only one entry with a given key, but several
 *  different entries with the same key can be observed in any historical
 *  archive (such as an access log or a store database).
 \par
 *  Slot chain: A sequence of db slots representing a Store Entry state at
 *  some point in time. Identified by key+version combination. Due to
 *  transaction aborts, crashes, and idle periods, some chains may contain
 *  incomplete or stale information. We assume that no two different chains
 *  have the same key and version. If that assumption fails, we may serve a
 *  hodgepodge entry during rebuild, until "extra" slots are loaded/noticed.
 \par
 *  iNode: The very first db slot in an entry slot chain. This slot contains
 *  at least the beginning of Store Entry metadata, but most 32KB inodes contain
 *  the entire metadata, HTTP headers, and HTTP body.
 \par
 *  Db slot: A db record containing a piece of a single store entry and linked
 *  to other slots with the same key and version fields, forming a chain.
 *  Slots are identified by their absolute position in the database file,
 *  which is naturally unique.
 \par
 *  When information from the newly loaded db slot contradicts the entry-level
 *  information collected so far (e.g., the versions do not match or the total
 *  chain size after the slot contribution exceeds the expected number), the
 *  whole entry (and not just the chain or the slot!) is declared corrupted.
 \par
 *  Why invalidate the whole entry? Rock Store is written for high-load
 *  environments with large caches, where there are usually very few idle slots
 *  in the database. The space occupied by a purged entry is usually immediately
 *  reclaimed. A Squid crash or a transaction abort is rather unlikely to
 *  leave a relatively large number of stale slots in the database. Thus, the
 *  number of potentially corrupted entries is relatively small. On the other
 *  hand, the damage from serving a single hodgepodge entry may be significant
 *  to the user. In such an environment, invalidating the whole entry has
 *  negligible performance impact but saves us from high-damage bugs.
 */

namespace Rock
{

/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
class LoadingFlags
{
public:
    LoadingFlags(): state(0), anchored(0), mapped(0), finalized(0), freed(0) {}

    /* for LoadingEntry */
    uint8_t state:3;  ///< current entry state (one of the LoadingEntry::State values)
    uint8_t anchored:1;  ///< whether we loaded the inode slot for this entry

    /* for LoadingSlot */
    uint8_t mapped:1;  ///< whether the slot was added to a mapped entry
    uint8_t finalized:1;  ///< whether finalizeOrThrow() has scanned the slot
    uint8_t freed:1;  ///< whether the slot was given to the map as free space
};

/// smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingEntry
{
public:
    LoadingEntry(const sfileno fileNo, LoadingParts &source);

    uint64_t &size; ///< payload seen so far
    uint32_t &version; ///< DbCellHeader::version to distinguish same-URL chains

    /// possible store entry states during index rebuild
    typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State;

    /* LoadingFlags::state */
    State state() const { return static_cast<State>(flags.state); }
    void state(State aState) const { flags.state = aState; }

    /* LoadingFlags::anchored */
    bool anchored() const { return flags.anchored; }
    void anchored(const bool beAnchored) { flags.anchored = beAnchored; }

private:
    LoadingFlags &flags; ///< entry flags (see the above accessors) are ours
};

/// smart db slot-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingSlot
{
public:
    LoadingSlot(const SlotId slotId, LoadingParts &source);

    /// another slot in some chain belonging to the same entry (unordered!)
    Ipc::StoreMapSliceId &more;

    /* LoadingFlags::mapped */
    bool mapped() const { return flags.mapped; }
    void mapped(const bool beMapped) { flags.mapped = beMapped; }

    /* LoadingFlags::finalized */
    bool finalized() const { return flags.finalized; }
    void finalized(const bool beFinalized) { flags.finalized = beFinalized; }

    /* LoadingFlags::freed */
    bool freed() const { return flags.freed; }
    void freed(const bool beFreed) { flags.freed = beFreed; }

    bool used() const { return freed() || mapped() || more != -1; }

private:
    LoadingFlags &flags; ///< slot flags (see the above accessors) are ours
};

/// information about store entries being loaded from disk (and their slots)
/// used for identifying partially stored/loaded entries
class LoadingParts
{
public:
    LoadingParts(int dbEntryLimit, int dbSlotLimit);
    LoadingParts(LoadingParts&&) = delete; // paranoid (often too huge to copy)

private:
    friend class LoadingEntry;
    friend class LoadingSlot;

    /* Anti-padding storage. With millions of entries, padding matters! */

    /* indexed by sfileno */
    std::vector<uint64_t> sizes; ///< LoadingEntry::size for all entries
    std::vector<uint32_t> versions; ///< LoadingEntry::version for all entries

    /* indexed by SlotId */
    std::vector<Ipc::StoreMapSliceId> mores; ///< LoadingSlot::more for all slots

    /* entry flags are indexed by sfileno; slot flags -- by SlotId */
    std::vector<LoadingFlags> flags; ///< all LoadingEntry and LoadingSlot flags
};

} /* namespace Rock */

/* LoadingEntry */

Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source):
    size(source.sizes.at(fileNo)),
    version(source.versions.at(fileNo)),
    flags(source.flags.at(fileNo))
{
}

/* LoadingSlot */

Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source):
    more(source.mores.at(slotId)),
    flags(source.flags.at(slotId))
{
}

/* LoadingParts */

Rock::LoadingParts::LoadingParts(const int dbEntryLimit, const int dbSlotLimit):
    sizes(dbEntryLimit, 0),
    versions(dbEntryLimit, 0),
    mores(dbSlotLimit, -1),
    flags(dbSlotLimit)
{
    assert(sizes.size() == versions.size()); // every entry has both fields
    assert(sizes.size() <= mores.size()); // every entry needs slot(s)
    assert(mores.size() == flags.size()); // every slot needs a set of flags
}

/* Rebuild */

Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
    sd(dir),
    parts(nullptr),
    dbSize(0),
    dbSlotSize(0),
    dbSlotLimit(0),
    dbEntryLimit(0),
    fd(-1),
    dbOffset(0),
    loadingPos(0),
    validationPos(0)
{
    assert(sd);
    dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
    dbSlotSize = sd->slotSize;
    dbEntryLimit = sd->entryLimitActual();
    dbSlotLimit = sd->slotLimitActual();
    assert(dbEntryLimit <= dbSlotLimit);
}

Rock::Rebuild::~Rebuild()
{
    if (fd >= 0)
        file_close(fd);
    delete parts;
}

/// prepares and initiates entry loading sequence
void
Rock::Rebuild::start()
{
    // in SMP mode, only the disker is responsible for populating the map
    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47, 2, "Non-disker skips rebuilding of cache_dir #" <<
               sd->index << " from " << sd->filePath);
        mustStop("non-disker");
        return;
    }

    debugs(47, DBG_IMPORTANT, "Loading cache_dir #" << sd->index <<
           " from " << sd->filePath);

    fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
    if (fd < 0)
        failure("cannot open db", errno);

    char hdrBuf[SwapDir::HeaderSize];
    if (read(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize)
        failure("cannot read db header", errno);

    // slot prefix of SM_PAGE_SIZE should fit both core entry header and ours
    assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);

    dbOffset = SwapDir::HeaderSize;

    parts = new LoadingParts(dbEntryLimit, dbSlotLimit);

    checkpoint();
}

/// continues after a pause if not done
void
Rock::Rebuild::checkpoint()
{
    if (!done())
        eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
}

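/// whether all db slots have been scanned during the loading phase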
bool
Rock::Rebuild::doneLoading() const
{
    return loadingPos >= dbSlotLimit;
}

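/// whether all entries (and, with squid -S, all slots) have been validated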
bool
Rock::Rebuild::doneValidating() const
{
    // paranoid slot checking is only enabled with squid -S
    return validationPos >= dbEntryLimit +
           (opt_store_doublecheck ? dbSlotLimit : 0);
}

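/// whether this rebuild job has nothing left to do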
bool
Rock::Rebuild::doneAll() const
{
    return doneLoading() && doneValidating() && AsyncJob::doneAll();
}

void
Rock::Rebuild::Steps(void *data)
{
    // use async call to enable job call protection that time events lack
    CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
}

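/// a single rebuild step: loads or validates the next batch and reschedules itself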
void
Rock::Rebuild::steps()
{
    if (!doneLoading())
        loadingSteps();
    else
        validationSteps();

    checkpoint();
}

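/// loads db slots until loading is done or the per-step time quota is exceeded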
void
Rock::Rebuild::loadingSteps()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    // Balance our desire to maximize the number of entries processed at once
    // (and, hence, minimize overheads and total rebuild time) with a
    // requirement to also process Coordinator events, disk I/Os, etc.
    const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
    const timeval loopStart = current_time;

    int loaded = 0;
    while (!doneLoading()) {
        loadOneSlot();
        dbOffset += dbSlotSize;
        ++loadingPos;
        ++loaded;

        if (counts.scancount % 1000 == 0)
            storeRebuildProgress(sd->index, dbSlotLimit, counts.scancount);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, HERE << "pausing after " << loaded << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per entry");
            break;
        }
    }
}

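/// gives access to the loading state of the given entry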
Rock::LoadingEntry
Rock::Rebuild::loadingEntry(const sfileno fileNo)
{
    Must(0 <= fileNo && fileNo < dbEntryLimit);
    return LoadingEntry(fileNo, *parts);
}

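/// gives access to the loading state of an already scanned slot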
Rock::LoadingSlot
Rock::Rebuild::loadingSlot(const SlotId slotId)
{
    Must(0 <= slotId && slotId < dbSlotLimit);
    Must(slotId <= loadingPos); // cannot look ahead
    return LoadingSlot(slotId, *parts);
}

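/// reads the slot at loadingPos from disk and either uses or frees it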
void
Rock::Rebuild::loadOneSlot()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    ++counts.scancount;

    if (lseek(fd, dbOffset, SEEK_SET) < 0)
        failure("cannot seek to db entry", errno);

    buf.reset();

    if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
        return;

    const SlotId slotId = loadingPos;

    // get our header
    DbCellHeader header;
    if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring truncated " << buf.contentSize() << "-byte " <<
               "cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    memcpy(&header, buf.content(), sizeof(header));
    if (header.empty()) {
        freeUnusedSlot(slotId, false);
        return;
    }
    if (!header.sane(dbSlotSize, dbSlotLimit)) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring malformed cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    buf.consume(sizeof(header)); // optimize to avoid memmove()

    useNewSlot(slotId, header);
}

/// parse StoreEntry basics and add them to the map, returning true on success
bool
Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    cache_key key[SQUID_MD5_DIGEST_LENGTH];
    StoreEntry loadedE;
    const uint64_t knownSize = header.entrySize > 0 ?
                               header.entrySize : anchor.basics.swap_file_sz.load();
    if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize))
        return false;

    // the entry size may be unknown, but if it is known, it is authoritative

    debugs(47, 8, "importing basics for entry " << fileno <<
           " inode.entrySize: " << header.entrySize <<
           " swap_file_sz: " << loadedE.swap_file_sz);
    anchor.set(loadedE);

    // we have not validated whether all db cells for this entry were loaded
    EBIT_CLR(anchor.basics.flags, ENTRY_VALIDATED);

    // loadedE->dump(5);

    return true;
}

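/// validates entries (and, with squid -S, slots) until done or the per-step time quota is exceeded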
void
Rock::Rebuild::validationSteps()
{
    debugs(47, 5, sd->index << " validating from " << validationPos);

    // see loadingSteps() for the rationale; TODO: avoid duplication
    const int maxSpentMsec = 50; // keep small: validation does not do I/O
    const timeval loopStart = current_time;

    int validated = 0;
    while (!doneValidating()) {
        if (validationPos < dbEntryLimit)
            validateOneEntry(validationPos);
        else
            validateOneSlot(validationPos - dbEntryLimit);
        ++validationPos;
        ++validated;

        if (validationPos % 1000 == 0)
            debugs(20, 2, "validated: " << validationPos);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, "pausing after " << validated << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry");
            break;
        }
    }
}

/// Either make the entry accessible to all or throw.
/// This method assumes it is called only when no more entry slots are expected.
void
Rock::Rebuild::finalizeOrThrow(const sfileno fileNo, LoadingEntry &le)
{
    // walk all map-linked slots, starting from inode, and mark each
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileNo);
    Must(le.size > 0); // paranoid
    uint64_t mappedSize = 0;
    SlotId slotId = anchor.start;
    while (slotId >= 0 && mappedSize < le.size) {
        LoadingSlot slot = loadingSlot(slotId); // throws if we have not loaded that slot
        Must(!slot.finalized()); // no loops or stealing from other entries
        Must(slot.mapped()); // all our slots should be in the sd->map
        Must(!slot.freed()); // all our slots should still be present
        slot.finalized(true);

        Ipc::StoreMapSlice &mapSlice = sd->map->writeableSlice(fileNo, slotId);
        Must(mapSlice.size > 0); // paranoid
        mappedSize += mapSlice.size;
        slotId = mapSlice.next;
    }
    /* no hodgepodge entries: one entry - one full chain and no leftovers */
    Must(slotId < 0);
    Must(mappedSize == le.size);

    if (!anchor.basics.swap_file_sz)
        anchor.basics.swap_file_sz = le.size;
    EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
    le.state(LoadingEntry::leLoaded);
    sd->map->closeForWriting(fileNo);
    ++counts.objcount;
}

/// Either make the entry accessible to all or free it.
/// This method must only be called when no more entry slots are expected.
void
Rock::Rebuild::finalizeOrFree(const sfileno fileNo, LoadingEntry &le)
{
    try {
        finalizeOrThrow(fileNo, le);
    } catch (const std::exception &ex) {
        freeBadEntry(fileNo, ex.what());
    }
}

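/// finalizes or frees a still-loading entry; entries in other states need no action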
void
Rock::Rebuild::validateOneEntry(const sfileno fileNo)
{
    LoadingEntry entry = loadingEntry(fileNo);
    switch (entry.state()) {

    case LoadingEntry::leLoading:
        finalizeOrFree(fileNo, entry);
        break;

    case LoadingEntry::leEmpty: // no entry hashed to this position
    case LoadingEntry::leLoaded: // we have already unlocked this entry
    case LoadingEntry::leCorrupted: // we have already removed this entry
    case LoadingEntry::leIgnored: // we have already discarded this entry
        break;
    }
}

void
Rock::Rebuild::validateOneSlot(const SlotId slotId)
{
    const LoadingSlot slot = loadingSlot(slotId);
    // there should not be any unprocessed slots left
    Must(slot.freed() || (slot.mapped() && slot.finalized()));
}

/// Marks remaining bad entry slots as free and unlocks the entry. The map
/// cannot do this because Loading entries may have holes in the slots chain.
void
Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
{
    debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
           " entry " << fileno << " is ignored during rebuild");

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leCorrupted);

    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
    assert(anchor.start < 0 || le.size > 0);
    for (SlotId slotId = anchor.start; slotId >= 0;) {
        const SlotId next = loadingSlot(slotId).more;
        freeSlot(slotId, true);
        slotId = next;
    }

    sd->map->forgetWritingEntry(fileno);
}

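/// job finale: updates the global rebuild counter and reports rebuild statistics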
void
Rock::Rebuild::swanSong()
{
    debugs(47,3, HERE << "cache_dir #" << sd->index << " rebuild level: " <<
           StoreController::store_dirs_rebuilding);
    --StoreController::store_dirs_rebuilding;
    storeRebuildComplete(&counts);
}

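/// reports a fatal rebuild error and terminates Squid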
void
Rock::Rebuild::failure(const char *msg, int errNo)
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    if (errNo)
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir rebuild failure: " << xstrerr(errNo));
    debugs(47, DBG_CRITICAL, "Do you need to run 'squid -z' to initialize storage?");

    assert(sd);
    fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.",
           sd->index, sd->filePath, msg);
}

/// adds slot to the free slot index
void
Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
{
    debugs(47,5, sd->index << " frees slot " << slotId);
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.freed());
    slot.freed(true);

    if (invalid) {
        ++counts.invalid;
        //sd->unlink(fileno); leave garbage on disk, it should not hurt
    }

    Ipc::Mem::PageId pageId;
    pageId.pool = sd->index+1;
    pageId.number = slotId+1;
    sd->freeSlots->push(pageId);
}

/// freeSlot() for never-been-mapped slots
void
Rock::Rebuild::freeUnusedSlot(const SlotId slotId, const bool invalid)
{
    LoadingSlot slot = loadingSlot(slotId);
    // mapped slots must be freed via freeBadEntry() to keep the map in sync
    assert(!slot.mapped());
    freeSlot(slotId, invalid);
}

/// adds slot to the entry chain in the map
void
Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
{
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.mapped());
    assert(!slot.freed());
    slot.mapped(true);

    Ipc::StoreMapSlice slice;
    slice.next = header.nextSlot;
    slice.size = header.payloadSize;
    sd->map->importSlice(slotId, slice);
}

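/// prepends the "to" slot to the chain currently headed by "from"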
template <class SlotIdType> // accommodates atomic and simple SlotIds.
void
Rock::Rebuild::chainSlots(SlotIdType &from, const SlotId to)
{
    LoadingSlot slot = loadingSlot(to);
    assert(slot.more < 0);
    slot.more = from; // may still be unset
    from = to;
}

/// adds slot to an existing entry chain; caller must check that the slot
/// belongs to the chain it is being added to
void
Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    LoadingEntry le = loadingEntry(fileno);
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);

    debugs(47,9, "adding " << slotId << " to entry " << fileno);
    // we do not need to preserve the order
    if (le.anchored()) {
        LoadingSlot inode = loadingSlot(anchor.start);
        chainSlots(inode.more, slotId);
    } else {
        chainSlots(anchor.start, slotId);
    }

    le.size += header.payloadSize; // must precede freeBadEntry() calls

    if (header.firstSlot == slotId) {
        debugs(47,5, "added inode");

        if (le.anchored()) { // we have already added another inode slot
            freeBadEntry(fileno, "inode conflict");
            ++counts.clashcount;
            return;
        }

        le.anchored(true);

        if (!importEntry(anchor, fileno, header)) {
            freeBadEntry(fileno, "corrupted metainfo");
            return;
        }

        // set total entry size and/or check it for consistency
        if (const uint64_t totalSize = header.entrySize) {
            assert(totalSize != static_cast<uint64_t>(-1));
            if (!anchor.basics.swap_file_sz) {
                anchor.basics.swap_file_sz = totalSize;
                assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
            } else if (totalSize != anchor.basics.swap_file_sz) {
                freeBadEntry(fileno, "size mismatch");
                return;
            }
        }
    }

    const uint64_t totalSize = anchor.basics.swap_file_sz; // may be 0/unknown

    if (totalSize > 0 && le.size > totalSize) { // overflow
        debugs(47, 8, "overflow: " << le.size << " > " << totalSize);
        freeBadEntry(fileno, "overflowing");
        return;
    }

    mapSlot(slotId, header);
    if (totalSize > 0 && le.size == totalSize)
        finalizeOrFree(fileno, le); // entry is probably fully loaded now
}

/// initialize housekeeping information for a newly accepted entry
void
Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
    assert(header.firstSlot >= 0);
    anchor.start = -1; // addSlotToEntry() will set it

    assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leLoading);
    le.version = header.version;
    le.size = 0;
}

/// handle a slot from an entry that we have not seen before
void
Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    // A miss may have been stored at our fileno while we were loading other
    // slots from disk. We ought to preserve that entry because it is fresher.
    const bool overwriteExisting = false;
    if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) {
        primeNewEntry(*anchor, fileno, header);
        addSlotToEntry(fileno, slotId, header); // may fail
        assert(anchor->basics.swap_file_sz != static_cast<uint64_t>(-1));
    } else {
        // A new from-network entry is occupying our map slot; let it be, but
        // save us from the trouble of going through the above motions again.
        LoadingEntry le = loadingEntry(fileno);
        le.state(LoadingEntry::leIgnored);
        freeUnusedSlot(slotId, false);
    }
}

/// does the header belong to the fileno entry being loaded?
bool
Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
{
    // Header updates always result in multi-start chains and often
    // result in multi-version chains so we can only compare the keys.
    const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
    return anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
}

/// handle freshly loaded (and validated) db slot header
void
Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
{
    const cache_key *const key =
        reinterpret_cast<const cache_key*>(header.key);
    const sfileno fileno = sd->map->fileNoByKey(key);
    assert(0 <= fileno && fileno < dbEntryLimit);

    LoadingEntry le = loadingEntry(fileno);
    debugs(47,9, "entry " << fileno << " state: " << le.state() << ", inode: " <<
           header.firstSlot << ", size: " << header.payloadSize);

    switch (le.state()) {

    case LoadingEntry::leEmpty: {
        startNewEntry(fileno, slotId, header);
        break;
    }

    case LoadingEntry::leLoading: {
        if (sameEntry(fileno, header)) {
            addSlotToEntry(fileno, slotId, header); // may fail
        } else {
            // either the loading chain or this slot is stale;
            // be conservative and ignore both (and any future ones)
            freeBadEntry(fileno, "duplicated");
            freeUnusedSlot(slotId, true);
            ++counts.dupcount;
        }
        break;
    }

    case LoadingEntry::leLoaded: {
        // either the previously loaded chain or this slot is stale;
        // be conservative and ignore both (and any future ones)
        le.state(LoadingEntry::leCorrupted);
        sd->map->freeEntry(fileno); // may not be immediately successful
        freeUnusedSlot(slotId, true);
        ++counts.dupcount;
        break;
    }

    case LoadingEntry::leCorrupted: {
        // previously seen slots messed things up so we must ignore this one
        freeUnusedSlot(slotId, true);
        break;
    }

    case LoadingEntry::leIgnored: {
        // already replaced by a fresher or colliding from-network entry
        freeUnusedSlot(slotId, false);
        break;
    }
    }
}
