/*
 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/AsyncJobCalls.h"
#include "fs/rock/RockDbCell.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "fs_io.h"
#include "globals.h"
#include "ipc/StoreMap.h"
#include "md5.h"
#include "SquidTime.h"
#include "Store.h"
#include "store_rebuild.h"
#include "tools.h"

#include <cerrno>

CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
/**
 \defgroup RockFsRebuild Rock Store Rebuild
 \ingroup Filesystems
 *
 \section Overview Overview
 *  Several layers of information are manipulated during the rebuild:
 \par
 *  Store Entry: Response message plus all the metainformation associated with
 *  it. Identified by store key. At any given time, from the Squid point
 *  of view, there is only one entry with a given key, but several
 *  different entries with the same key can be observed in any historical
 *  archive (such as an access log or a store database).
 \par
 *  Slot chain: A sequence of db slots representing a Store Entry state at
 *  some point in time. Identified by key+version combination. Due to
 *  transaction aborts, crashes, and idle periods, some chains may contain
 *  incomplete or stale information. We assume that no two different chains
 *  have the same key and version. If that assumption fails, we may serve a
 *  hodgepodge entry during rebuild, until "extra" slots are loaded/noticed.
 \par
 *  iNode: The very first db slot in an entry slot chain. This slot contains
 *  at least the beginning of Store Entry metadata, but most 32KB inodes contain
 *  the entire metadata, HTTP headers, and HTTP body.
 \par
 *  Db slot: A db record containing a piece of a single store entry and linked
 *  to other slots with the same key and version fields, forming a chain.
 *  Slots are identified by their absolute position in the database file,
 *  which is naturally unique.
 \par
 *  When information from the newly loaded db slot contradicts the entry-level
 *  information collected so far (e.g., the versions do not match or the total
 *  chain size after the slot contribution exceeds the expected number), the
 *  whole entry (and not just the chain or the slot!) is declared corrupted.
 \par
 *  Why invalidate the whole entry? Rock Store is written for high-load
 *  environments with large caches, where there are usually very few idle slots
 *  in the database. The space occupied by a purged entry is usually immediately
 *  reclaimed. A Squid crash or a transaction abort is rather unlikely to
 *  leave a relatively large number of stale slots in the database. Thus, the
 *  number of potentially corrupted entries is relatively small. On the other
 *  hand, the damage from serving a single hodgepodge entry may be significant
 *  to the user. In such an environment, invalidating the whole entry has
 *  negligible performance impact but saves us from high-damage bugs.
 */

namespace Rock
{

/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
class LoadingFlags
{
public:
    LoadingFlags(): state(0), anchored(0), mapped(0), finalized(0), freed(0) {}

    /* for LoadingEntry */
    uint8_t state:3; ///< current entry state (one of the LoadingEntry::State values)
    uint8_t anchored:1; ///< whether we loaded the inode slot for this entry

    /* for LoadingSlot */
    uint8_t mapped:1; ///< whether the slot was added to a mapped entry
    uint8_t finalized:1; ///< whether finalizeOrThrow() has scanned the slot
    uint8_t freed:1; ///< whether the slot was given to the map as free space
};

/// smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingEntry
{
public:
    LoadingEntry(const sfileno fileNo, LoadingParts &source);

    uint64_t &size; ///< payload seen so far
    uint32_t &version; ///< DbCellHeader::version to distinguish same-URL chains

    /// possible store entry states during index rebuild
    typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State;

    /* LoadingFlags::state */
    State state() const { return static_cast<State>(flags.state); }
    void state(State aState) const { flags.state = aState; }

    /* LoadingFlags::anchored */
    bool anchored() const { return flags.anchored; }
    void anchored(const bool beAnchored) { flags.anchored = beAnchored; }

private:
    LoadingFlags &flags; ///< entry flags (see the above accessors) are ours
};

/// smart db slot-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingSlot
{
public:
    LoadingSlot(const SlotId slotId, LoadingParts &source);

    /// another slot in some chain belonging to the same entry (unordered!)
    Ipc::StoreMapSliceId &more;

    /* LoadingFlags::mapped */
    bool mapped() const { return flags.mapped; }
    void mapped(const bool beMapped) { flags.mapped = beMapped; }

    /* LoadingFlags::finalized */
    bool finalized() const { return flags.finalized; }
    void finalized(const bool beFinalized) { flags.finalized = beFinalized; }

    /* LoadingFlags::freed */
    bool freed() const { return flags.freed; }
    void freed(const bool beFreed) { flags.freed = beFreed; }

    bool used() const { return freed() || mapped() || more != -1; }

private:
    LoadingFlags &flags; ///< slot flags (see the above accessors) are ours
};

/// information about store entries being loaded from disk (and their slots)
/// used for identifying partially stored/loaded entries
class LoadingParts
{
public:
    LoadingParts(int dbEntryLimit, int dbSlotLimit);
    LoadingParts(LoadingParts&&) = delete; // paranoid (often too huge to copy)

private:
    friend class LoadingEntry;
    friend class LoadingSlot;

    /* Anti-padding storage. With millions of entries, padding matters! */

    /* indexed by sfileno */
    std::vector<uint64_t> sizes; ///< LoadingEntry::size for all entries
    std::vector<uint32_t> versions; ///< LoadingEntry::version for all entries

    /* indexed by SlotId */
    std::vector<Ipc::StoreMapSliceId> mores; ///< LoadingSlot::more for all slots

    /* entry flags are indexed by sfileno; slot flags -- by SlotId */
    std::vector<LoadingFlags> flags; ///< all LoadingEntry and LoadingSlot flags
};

} /* namespace Rock */

/* LoadingEntry */

Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source):
    size(source.sizes.at(fileNo)),
    version(source.versions.at(fileNo)),
    flags(source.flags.at(fileNo))
{
}

/* LoadingSlot */

Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source):
    more(source.mores.at(slotId)),
    flags(source.flags.at(slotId))
{
}

/* LoadingParts */

Rock::LoadingParts::LoadingParts(const int dbEntryLimit, const int dbSlotLimit):
    sizes(dbEntryLimit, 0),
    versions(dbEntryLimit, 0),
    mores(dbSlotLimit, -1),
    flags(dbSlotLimit)
{
    assert(sizes.size() == versions.size()); // every entry has both fields
    assert(sizes.size() <= mores.size()); // every entry needs slot(s)
    assert(mores.size() == flags.size()); // every slot needs a set of flags
}

/* Rebuild */

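/// captures the cache_dir db geometry to be scanned; the scan itself starts in start()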
Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
    sd(dir),
    parts(nullptr),
    dbSize(0),
    dbSlotSize(0),
    dbSlotLimit(0),
    dbEntryLimit(0),
    fd(-1),
    dbOffset(0),
    loadingPos(0),
    validationPos(0)
{
    assert(sd);
    dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
    dbSlotSize = sd->slotSize;
    dbEntryLimit = sd->entryLimitActual();
    dbSlotLimit = sd->slotLimitActual();
    assert(dbEntryLimit <= dbSlotLimit);
}

Rock::Rebuild::~Rebuild()
{
    if (fd >= 0)
        file_close(fd);
    delete parts;
}

/// prepares and initiates entry loading sequence
void
Rock::Rebuild::start()
{
    // in SMP mode, only the disker is responsible for populating the map
    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47, 2, "Non-disker skips rebuilding of cache_dir #" <<
               sd->index << " from " << sd->filePath);
        mustStop("non-disker");
        return;
    }

    debugs(47, DBG_IMPORTANT, "Loading cache_dir #" << sd->index <<
           " from " << sd->filePath);

    fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
    if (fd < 0)
        failure("cannot open db", errno);

    char hdrBuf[SwapDir::HeaderSize];
    if (read(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize)
        failure("cannot read db header", errno);

    // slot prefix of SM_PAGE_SIZE should fit both core entry header and ours
    assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);

    dbOffset = SwapDir::HeaderSize;

    parts = new LoadingParts(dbEntryLimit, dbSlotLimit);

    checkpoint();
}

/// continues after a pause if not done
void
Rock::Rebuild::checkpoint()
{
    if (!done())
        eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
}

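/// whether all db slots have been scanned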
bool
Rock::Rebuild::doneLoading() const
{
    return loadingPos >= dbSlotLimit;
}

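/// whether all entries have been validated (and, with squid -S, all slots double-checked)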
bool
Rock::Rebuild::doneValidating() const
{
    // paranoid slot checking is only enabled with squid -S
    return validationPos >= dbEntryLimit +
           (opt_store_doublecheck ? dbSlotLimit : 0);
}

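/// whether both the loading and validation phases are complete (AsyncJob API)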
bool
Rock::Rebuild::doneAll() const
{
    return doneLoading() && doneValidating() && AsyncJob::doneAll();
}

void
Rock::Rebuild::Steps(void *data)
{
    // use async call to enable job call protection that time events lack
    CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
}

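/// performs a batch of loading or validation work and schedules the next batch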
void
Rock::Rebuild::steps()
{
    if (!doneLoading())
        loadingSteps();
    else
        validationSteps();

    checkpoint();
}

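/// loads more db slots, pausing after a limited time quota (unless rebuilding in the foreground)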
void
Rock::Rebuild::loadingSteps()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    // Balance our desire to maximize the number of entries processed at once
    // (and, hence, minimize overheads and total rebuild time) with a
    // requirement to also process Coordinator events, disk I/Os, etc.
    const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
    const timeval loopStart = current_time;

    int loaded = 0;
    while (!doneLoading()) {
        loadOneSlot();
        dbOffset += dbSlotSize;
        ++loadingPos;
        ++loaded;

        if (counts.scancount % 1000 == 0)
            storeRebuildProgress(sd->index, dbSlotLimit, counts.scancount);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, HERE << "pausing after " << loaded << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per entry");
            break;
        }
    }
}

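/// loading state of the store entry at the given db position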
Rock::LoadingEntry
Rock::Rebuild::loadingEntry(const sfileno fileNo)
{
    Must(0 <= fileNo && fileNo < dbEntryLimit);
    return LoadingEntry(fileNo, *parts);
}

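/// loading state of the given (already scanned) db slot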
Rock::LoadingSlot
Rock::Rebuild::loadingSlot(const SlotId slotId)
{
    Must(0 <= slotId && slotId < dbSlotLimit);
    Must(slotId <= loadingPos); // cannot look ahead
    return LoadingSlot(slotId, *parts);
}

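/// loads and parses a single db slot at the current loading position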
void
Rock::Rebuild::loadOneSlot()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    ++counts.scancount;

    if (lseek(fd, dbOffset, SEEK_SET) < 0)
        failure("cannot seek to db entry", errno);

    buf.reset();

    if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
        return;

    const SlotId slotId = loadingPos;

    // get our header
    DbCellHeader header;
    if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring truncated " << buf.contentSize() << "-byte " <<
               "cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    memcpy(&header, buf.content(), sizeof(header));
    if (header.empty()) {
        freeUnusedSlot(slotId, false);
        return;
    }
    if (!header.sane(dbSlotSize, dbSlotLimit)) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring malformed cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    buf.consume(sizeof(header)); // optimize to avoid memmove()

    useNewSlot(slotId, header);
}

/// parse StoreEntry basics and add them to the map, returning true on success
bool
Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    cache_key key[SQUID_MD5_DIGEST_LENGTH];
    StoreEntry loadedE;
    const uint64_t knownSize = header.entrySize > 0 ?
                               header.entrySize : anchor.basics.swap_file_sz.load();
    if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize))
        return false;

    // the entry size may be unknown, but if it is known, it is authoritative

    debugs(47, 8, "importing basics for entry " << fileno <<
           " inode.entrySize: " << header.entrySize <<
           " swap_file_sz: " << loadedE.swap_file_sz);
    anchor.set(loadedE);

    // we have not validated whether all db cells for this entry were loaded
    EBIT_CLR(anchor.basics.flags, ENTRY_VALIDATED);

    // loadedE->dump(5);

    return true;
}

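/// validates more loaded entries and slots, pausing after a limited time quota (unless rebuilding in the foreground)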
void
Rock::Rebuild::validationSteps()
{
    debugs(47, 5, sd->index << " validating from " << validationPos);

    // see loadingSteps() for the rationale; TODO: avoid duplication
    const int maxSpentMsec = 50; // keep small: validation does not do I/O
    const timeval loopStart = current_time;

    int validated = 0;
    while (!doneValidating()) {
        if (validationPos < dbEntryLimit)
            validateOneEntry(validationPos);
        else
            validateOneSlot(validationPos - dbEntryLimit);
        ++validationPos;
        ++validated;

        if (validationPos % 1000 == 0)
            debugs(20, 2, "validated: " << validationPos);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, "pausing after " << validated << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry");
            break;
        }
    }
}

/// Either make the entry accessible to all or throw.
/// This method assumes it is called only when no more entry slots are expected.
void
Rock::Rebuild::finalizeOrThrow(const sfileno fileNo, LoadingEntry &le)
{
    // walk all map-linked slots, starting from inode, and mark each
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileNo);
    Must(le.size > 0); // paranoid
    uint64_t mappedSize = 0;
    SlotId slotId = anchor.start;
    while (slotId >= 0 && mappedSize < le.size) {
        LoadingSlot slot = loadingSlot(slotId); // throws if we have not loaded that slot
        Must(!slot.finalized()); // no loops or stealing from other entries
        Must(slot.mapped()); // all our slots should be in the sd->map
        Must(!slot.freed()); // all our slots should still be present
        slot.finalized(true);

        Ipc::StoreMapSlice &mapSlice = sd->map->writeableSlice(fileNo, slotId);
        Must(mapSlice.size > 0); // paranoid
        mappedSize += mapSlice.size;
        slotId = mapSlice.next;
    }
    /* no hodgepodge entries: one entry - one full chain and no leftovers */
    Must(slotId < 0);
    Must(mappedSize == le.size);

    if (!anchor.basics.swap_file_sz)
        anchor.basics.swap_file_sz = le.size;
    EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
    le.state(LoadingEntry::leLoaded);
    sd->map->closeForWriting(fileNo);
    ++counts.objcount;
}

/// Either make the entry accessible to all or free it.
/// This method must only be called when no more entry slots are expected.
void
Rock::Rebuild::finalizeOrFree(const sfileno fileNo, LoadingEntry &le)
{
    try {
        finalizeOrThrow(fileNo, le);
    } catch (const std::exception &ex) {
        freeBadEntry(fileNo, ex.what());
    }
}

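/// finalizes or discards a loaded entry, depending on its state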
void
Rock::Rebuild::validateOneEntry(const sfileno fileNo)
{
    LoadingEntry entry = loadingEntry(fileNo);
    switch (entry.state()) {

    case LoadingEntry::leLoading:
        finalizeOrFree(fileNo, entry);
        break;

    case LoadingEntry::leEmpty: // no entry hashed to this position
    case LoadingEntry::leLoaded: // we have already unlocked this entry
    case LoadingEntry::leCorrupted: // we have already removed this entry
    case LoadingEntry::leIgnored: // we have already discarded this entry
        break;
    }
}

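/// a paranoid (squid -S) check that the given slot was fully processed during loading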
void
Rock::Rebuild::validateOneSlot(const SlotId slotId)
{
    const LoadingSlot slot = loadingSlot(slotId);
    // there should not be any unprocessed slots left
    Must(slot.freed() || (slot.mapped() && slot.finalized()));
}

/// Marks remaining bad entry slots as free and unlocks the entry. The map
/// cannot do this because Loading entries may have holes in the slots chain.
void
Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
{
    debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
           " entry " << fileno << " is ignored during rebuild");

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leCorrupted);

    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
    assert(anchor.start < 0 || le.size > 0);
    for (SlotId slotId = anchor.start; slotId >= 0;) {
        const SlotId next = loadingSlot(slotId).more;
        freeSlot(slotId, true);
        slotId = next;
    }

    sd->map->forgetWritingEntry(fileno);
}

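/// reports rebuild statistics and decrements the global rebuilding-dirs counter (AsyncJob API)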
void
Rock::Rebuild::swanSong()
{
    debugs(47,3, HERE << "cache_dir #" << sd->index << " rebuild level: " <<
           StoreController::store_dirs_rebuilding);
    --StoreController::store_dirs_rebuilding;
    storeRebuildComplete(&counts);
}

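/// reports a fatal rebuild error and quits Squid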
void
Rock::Rebuild::failure(const char *msg, int errNo)
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    if (errNo)
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir rebuild failure: " << xstrerr(errNo));
    debugs(47, DBG_CRITICAL, "Do you need to run 'squid -z' to initialize storage?");

    assert(sd);
    fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.",
           sd->index, sd->filePath, msg);
}

/// adds slot to the free slot index
void
Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
{
    debugs(47,5, sd->index << " frees slot " << slotId);
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.freed());
    slot.freed(true);

    if (invalid) {
        ++counts.invalid;
        //sd->unlink(fileno); leave garbage on disk, it should not hurt
    }

    Ipc::Mem::PageId pageId;
    pageId.pool = sd->index+1;
    pageId.number = slotId+1;
    sd->freeSlots->push(pageId);
}

/// freeSlot() for never-been-mapped slots
void
Rock::Rebuild::freeUnusedSlot(const SlotId slotId, const bool invalid)
{
    LoadingSlot slot = loadingSlot(slotId);
    // mapped slots must be freed via freeBadEntry() to keep the map in sync
    assert(!slot.mapped());
    freeSlot(slotId, invalid);
}

/// adds slot to the entry chain in the map
void
Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
{
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.mapped());
    assert(!slot.freed());
    slot.mapped(true);

    Ipc::StoreMapSlice slice;
    slice.next = header.nextSlot;
    slice.size = header.payloadSize;
    sd->map->importSlice(slotId, slice);
}

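/// prepends the "to" slot to the (possibly empty) chain of slots currently starting at "from"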
template <class SlotIdType> // accommodates atomic and simple SlotIds.
void
Rock::Rebuild::chainSlots(SlotIdType &from, const SlotId to)
{
    LoadingSlot slot = loadingSlot(to);
    assert(slot.more < 0);
    slot.more = from; // may still be unset
    from = to;
}

/// adds slot to an existing entry chain; caller must check that the slot
/// belongs to the chain it is being added to
void
Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    LoadingEntry le = loadingEntry(fileno);
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);

    debugs(47,9, "adding " << slotId << " to entry " << fileno);
    // we do not need to preserve the order
    if (le.anchored()) {
        LoadingSlot inode = loadingSlot(anchor.start);
        chainSlots(inode.more, slotId);
    } else {
        chainSlots(anchor.start, slotId);
    }

    le.size += header.payloadSize; // must precede freeBadEntry() calls

    if (header.firstSlot == slotId) {
        debugs(47,5, "added inode");

        if (le.anchored()) { // we have already added another inode slot
            freeBadEntry(fileno, "inode conflict");
            ++counts.clashcount;
            return;
        }

        le.anchored(true);

        if (!importEntry(anchor, fileno, header)) {
            freeBadEntry(fileno, "corrupted metainfo");
            return;
        }

        // set total entry size and/or check it for consistency
        if (const uint64_t totalSize = header.entrySize) {
            assert(totalSize != static_cast<uint64_t>(-1));
            if (!anchor.basics.swap_file_sz) {
                anchor.basics.swap_file_sz = totalSize;
                assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
            } else if (totalSize != anchor.basics.swap_file_sz) {
                freeBadEntry(fileno, "size mismatch");
                return;
            }
        }
    }

    const uint64_t totalSize = anchor.basics.swap_file_sz; // may be 0/unknown

    if (totalSize > 0 && le.size > totalSize) { // overflow
        debugs(47, 8, "overflow: " << le.size << " > " << totalSize);
        freeBadEntry(fileno, "overflowing");
        return;
    }

    mapSlot(slotId, header);
    if (totalSize > 0 && le.size == totalSize)
        finalizeOrFree(fileno, le); // entry is probably fully loaded now
}

/// initialize housekeeping information for a newly accepted entry
void
Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
    assert(header.firstSlot >= 0);
    anchor.start = -1; // addSlotToEntry() will set it

    assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leLoading);
    le.version = header.version;
    le.size = 0;
}

/// handle a slot from an entry that we have not seen before
void
Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    // A miss may have been stored at our fileno while we were loading other
    // slots from disk. We ought to preserve that entry because it is fresher.
    const bool overwriteExisting = false;
    if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) {
        primeNewEntry(*anchor, fileno, header);
        addSlotToEntry(fileno, slotId, header); // may fail
        assert(anchor->basics.swap_file_sz != static_cast<uint64_t>(-1));
    } else {
        // A new from-network entry is occupying our map slot; let it be, but
        // save us from the trouble of going through the above motions again.
        LoadingEntry le = loadingEntry(fileno);
        le.state(LoadingEntry::leIgnored);
        freeUnusedSlot(slotId, false);
    }
}

/// does the header belong to the fileno entry being loaded?
bool
Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
{
    // Header updates always result in multi-start chains and often
    // result in multi-version chains so we can only compare the keys.
    const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
    return anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
}

/// handle freshly loaded (and validated) db slot header
void
Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
{
    const cache_key *const key =
        reinterpret_cast<const cache_key*>(header.key);
    const sfileno fileno = sd->map->fileNoByKey(key);
    assert(0 <= fileno && fileno < dbEntryLimit);

    LoadingEntry le = loadingEntry(fileno);
    debugs(47,9, "entry " << fileno << " state: " << le.state() << ", inode: " <<
           header.firstSlot << ", size: " << header.payloadSize);

    switch (le.state()) {

    case LoadingEntry::leEmpty: {
        startNewEntry(fileno, slotId, header);
        break;
    }

    case LoadingEntry::leLoading: {
        if (sameEntry(fileno, header)) {
            addSlotToEntry(fileno, slotId, header); // may fail
        } else {
            // either the loading chain or this slot is stale;
            // be conservative and ignore both (and any future ones)
            freeBadEntry(fileno, "duplicated");
            freeUnusedSlot(slotId, true);
            ++counts.dupcount;
        }
        break;
    }

    case LoadingEntry::leLoaded: {
        // either the previously loaded chain or this slot is stale;
        // be conservative and ignore both (and any future ones)
        le.state(LoadingEntry::leCorrupted);
        sd->map->freeEntry(fileno); // may not be immediately successful
        freeUnusedSlot(slotId, true);
        ++counts.dupcount;
        break;
    }

    case LoadingEntry::leCorrupted: {
        // previously seen slots messed things up so we must ignore this one
        freeUnusedSlot(slotId, true);
        break;
    }

    case LoadingEntry::leIgnored: {
        // already replaced by a fresher or colliding from-network entry
        freeUnusedSlot(slotId, false);
        break;
    }
    }
}