/*
 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47    Store Directory Routines */

#include "squid.h"
#include "cache_cf.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockHeaderUpdater.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>
#include <limits>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

const int64_t Rock::SwapDir::HeaderSize = 16*1024;

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
    waitingForPage(NULL)
{
}

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->createMemObject();
    anchorEntry(*e, filen, *slot);
    trackReferences(*e);
    return e;
}

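/// opens the stored entry for reading (if the map has it) and anchors the
/// given StoreEntry to it; inSync reports whether the two were synchronized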
bool
Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
            reinterpret_cast<cache_key*>(entry.key), filen);
    if (!slot)
        return false;

    anchorEntry(entry, filen, *slot);
    inSync = updateAnchoredWith(entry, *slot);
    return true; // even if inSync is false
}

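/// re-syncs a previously anchored entry with its map slot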
bool
Rock::SwapDir::updateAnchored(StoreEntry &entry)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    assert(entry.hasDisk(index));

    const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen);
    return updateAnchoredWith(entry, s);
}

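/// updates mutable entry fields (currently just swap_file_sz) from the anchor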
bool
Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor)
{
    entry.swap_file_sz = anchor.basics.swap_file_sz;
    return true;
}

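/// fills StoreEntry basics from the map anchor and attaches the entry to
/// this disk at the given file number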
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    anchor.exportInto(e);

    const bool complete = anchor.complete();
    e.store_status = complete ? STORE_OK : STORE_PENDING;
    // SWAPOUT_WRITING: even though another worker writes?
    e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);

    e.ping_status = PING_NONE;

    EBIT_SET(e.flags, ENTRY_VALIDATED);
}

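/// severs all ties between the entry and this cache_dir, releasing the
/// read or write lock on the entry's map slot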
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.hasDisk(index));

    ignoreReferences(e);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        e.detachFromDisk();
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
        Store::Root().stopSharing(e); // broadcasts after the change
    } else {
        map->closeForReading(e.swap_filen);
        e.detachFromDisk();
    }
}

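/// used disk space: total capacity minus the space still held by free slots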
uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
{
    // nothing to do
}

void
Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry)
{
    debugs(47, 5, entry);
    disconnect(entry); // calls abortWriting() to free the disk entry
}

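/// the absolute limit on the number of slots, dictated by the SlotId type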
int64_t
Rock::SwapDir::slotLimitAbsolute() const
{
    // the max value is an invalid one; all values must be below the limit
    assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
           std::numeric_limits<SlotId>::max());
    return std::numeric_limits<SlotId>::max();
}

int64_t
Rock::SwapDir::slotLimitActual() const
{
    const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
    const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
    const int64_t sLimitHi = slotLimitAbsolute();
    return min(max(sLimitLo, sWanted), sLimitHi);
}

int64_t
Rock::SwapDir::entryLimitActual() const
{
    return min(slotLimitActual(), entryLimitAbsolute());
}

// TODO: encapsulate as a tool
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs (47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}

// report Rock DB creation error and exit
void
Rock::SwapDir::createError(const char *const msg)
{
    int xerrno = errno; // XXX: where does errno come from?
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerr(xerrno));
    fatal("Rock Store db creation error");
}

void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit deletes.
    lock();

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}

bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/db
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}

void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOption *copt = ::SwapDir::getOptionTree();
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(copt);
    if (vector) {
        // if copt is actually a ConfigOptionVector
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    } else {
        // we don't know how to handle copt, as it's not a ConfigOptionVector.
        // free it (and return nullptr)
        delete copt;
        copt = nullptr;
    }
    return copt;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
{
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle size units and detect parsing errors better
    // parse into a signed type first: assigning a negative strtoll() result
    // directly to uint64_t would wrap it into a huge "valid" size
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << parsedValue);
        self_destruct();
    }

    const uint64_t newSize = static_cast<uint64_t>(parsedValue);
    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
        self_destruct();
    }

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);
    }

    return true;
}

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
{
    storeAppendPrintf(e, " slot-size=%" PRIu64, slotSize);
}

/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);

    // an entry consumes at least one slot; round up to reduce false warnings
    const int64_t blockSize = static_cast<int64_t>(slotSize);
    const int64_t maxObjSize = max(blockSize,
                                   ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);

    // Does the "sfileno*max-size" limit match configured db capacity?
    const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
    if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
               "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
               "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
    }

    // Does the "absolute slot count" limit match configured db capacity?
    const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
    if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(slotsMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
               "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
    }
}

void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    if (diskSpaceNeeded >= 0)
        diskSpaceNeeded += sizeof(DbCellHeader);
    if (!::SwapDir::canStore(e, diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start an I/O transaction if fewer than 10% of the shared I/O pages remain free.
    // TODO: reserve a page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}

StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

StoreIOState::Pointer
Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, theFile);
        return nullptr;
    }

    Must(update.fresh);
    Must(update.fresh.fileNo >= 0);

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = update.fresh.fileNo;
    sio->writeableAnchor_ = update.fresh.anchor;

    debugs(47,5, "dir " << index << " updating filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);
    return sio;
}

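/// the on-disk byte offset where the given slot begins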
int64_t
Rock::SwapDir::diskOffset(const SlotId sid) const
{
    assert(sid >= 0);
    return HeaderSize + slotSize*sid;
}

int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->sliceLimit());
}

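/// finds a free db slot (purging a cached entry if none are readily
/// available), prepares the corresponding map slice for writing, and
/// returns its slot ID; throws when the db has no slot to spare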
Rock::SlotId
Rock::SwapDir::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;

    if (freeSlots->pop(pageId)) {
        const auto slotId = pageId.number - 1;
        debugs(47, 5, "got a previously free slot: " << slotId);
        map->prepFreeSlice(slotId);
        return slotId;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        const auto slotId = pageId.number - 1;
        debugs(47, 5, "got a previously busy slot: " << slotId);
        map->prepFreeSlice(slotId);
        return slotId;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    throw TexcHere("ran out of free db slots");
}

bool
Rock::SwapDir::validSlotId(const SlotId slotId) const
{
    return 0 <= slotId && slotId < slotLimitActual();
}

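/// makes a freed map slice available to writers: either hands it to the
/// reserveSlotForWriting() caller waiting for a page or returns it to the
/// shared free slot collection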
void
Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    if (!e.hasDisk()) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start an I/O transaction if fewer than 10% of the shared I/O pages remain free.
    // TODO: reserve a page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but reading the entry we are swapping out is not supported.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
    // may still be zero and basics.swap_file_sz may grow.
    assert(slot->basics.swap_file_sz >= e.swap_file_sz);

    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error()) {
        int xerrno = errno; // XXX: where does errno come from?
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerr(xerrno));
    }

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes, " <<
           std::setw(7) << map->entryLimit() << " entries, and " <<
           std::setw(7) << map->sliceLimit() << " slots");

    rebuild();
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

void
Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;
    sio->handleReadCompletion(*request, rlen, errflag);
}

void
Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
{
    // TODO: Move details into IoState::handleWriteCompletion() after figuring
    // out how to deal with map access. See readCompleted().

    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        noteFreeMapSlice(request->sidCurrent);
        return;
    }

    debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);

    if (errflag != DISK_OK)
        handleWriteCompletionProblem(errflag, *request);
    else if (!sio.expectedReply(request->id))
        handleWriteCompletionProblem(DISK_ERROR, *request);
    else
        handleWriteCompletionSuccess(*request);

    if (sio.touchingStoreEntry())
        CollapsedForwarding::Broadcast(*sio.e);
}

/// code shared by writeCompleted() success handling cases
void
Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request)
{
    auto &sio = *(request.sio);
    sio.splicingPoint = request.sidCurrent;
    // do not increment sio.offset_ because we do it in sio->write()

    assert(sio.writeableAnchor_);
    if (sio.writeableAnchor_->start < 0) { // wrote the first slot
        Must(request.sidPrevious < 0);
        sio.writeableAnchor_->start = request.sidCurrent;
    } else {
        Must(request.sidPrevious >= 0);
        map->writeableSlice(sio.swap_filen, request.sidPrevious).next = request.sidCurrent;
    }

    // finalize the shared slice info after writing slice contents to disk;
    // the chain gets possession of the slice we were writing
    Ipc::StoreMap::Slice &slice =
        map->writeableSlice(sio.swap_filen, request.sidCurrent);
    slice.size = request.len - sizeof(DbCellHeader);
    Must(slice.next < 0);

    if (request.eof) {
        assert(sio.e);
        if (sio.touchingStoreEntry()) {
            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                      sio.offset_;

            map->switchWritingToReading(sio.swap_filen);
            // sio.e keeps the (now read) lock on the anchor
        }
        sio.writeableAnchor_ = NULL;
        sio.finishedWriting(DISK_OK);
    }
}

/// code shared by writeCompleted() error handling cases
void
Rock::SwapDir::handleWriteCompletionProblem(const int errflag, const WriteRequest &request)
{
    auto &sio = *request.sio;

    noteFreeMapSlice(request.sidCurrent);

    writeError(sio);
    sio.finishedWriting(errflag);
    // and hope that Core will call disconnect() to close the map entry
}

void
Rock::SwapDir::writeError(StoreIOState &sio)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(sio.swap_filen); // will mark as unusable, just in case

    if (sio.touchingStoreEntry())
        Store::Root().stopSharing(*sio.e);
    // else noop: a fresh entry update error does not affect stale entry readers

    // All callers must also call IoState callback, to propagate the error.
}

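/// starts a HeaderUpdater job to rewrite the on-disk entry metadata,
/// aborting the map update if the job cannot be started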
void
Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
{
    if (!map)
        return;

    Ipc::StoreMapUpdate update(updatedE);
    if (!map->openForUpdating(update, updatedE->swap_filen))
        return;

    try {
        AsyncJob::Start(new HeaderUpdater(this, update));
    } catch (const std::exception &ex) {
        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
        map->abortUpdating(update);
    }
}

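/// whether the db has run out of free slots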
bool
Rock::SwapDir::full() const
{
    return freeSlots != NULL && !freeSlots->size();
}

// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::evictIfFound(const cache_key *key)
{
    if (map)
        map->freeEntryByKey(key); // may not be there
}

void
Rock::SwapDir::evictCached(StoreEntry &e)
{
    debugs(47, 5, e);
    if (e.hasDisk(index)) {
        if (map->freeEntry(e.swap_filen))
            CollapsedForwarding::Broadcast(e);
        if (!e.locked())
            disconnect(e);
    } else if (const auto key = e.publicKey()) {
        evictIfFound(key);
    }
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    const int entryLimit = entryLimitActual();
    const int slotLimit = slotLimitActual();
    storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
    if (map && entryLimit > 0) {
        const int entryCount = map->entryCount();
        storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                          entryCount, (100.0 * entryCount / entryLimit));
    }

    storeAppendPrintf(&e, "Maximum slots:   %9d\n", slotLimit);
    if (map && slotLimit > 0) {
        const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
        if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
            const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
            storeAppendPrintf(&e, "Used slots:      %9d %.2f%%\n",
                              usedSlots, (100.0 * usedSlots / slotLimit));
        }
        if (slotLimit < 100) { // XXX: otherwise too expensive to count
            Ipc::ReadWriteLockStats stats;
            map->updateStats(stats);
            stats.dump(e);
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");
}

SBuf
Rock::SwapDir::inodeMapPath() const
{
    return Ipc::Mem::Segment::Name(SBuf(path), "map");
}

const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}

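/// whether the map has a readable entry with the given key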
bool
Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const
{
    return map->hasReadableEntry(reinterpret_cast<const cache_key*>(e.key));
}

namespace Rock
{
RunnerRegistrationEntry(SwapDirRr);
}

void Rock::SwapDirRr::create()
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->slotLimitActual();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity, 0);
            freeSlotsOwners.push_back(freeSlotsOwner);

            // TODO: add method to initialize PageStack with no free pages
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}