1 /* Copyright (C) 2019 MariaDB Corporation
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18
#include "PrefixCache.h"
#include "Cache.h"
#include "Config.h"
#include "Downloader.h"
#include "Synchronizer.h"
#include <iostream>
#include <unordered_set>
#include <syslog.h>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
31
32 using namespace std;
33 namespace bf = boost::filesystem;
34
35 namespace storagemanager
36 {
37
PrefixCache(const bf::path & prefix)38 PrefixCache::PrefixCache(const bf::path &prefix) : firstDir(prefix), currentCacheSize(0)
39 {
40 Config *conf = Config::get();
41 logger = SMLogging::get();
42 replicator = Replicator::get();
43 downloader = Cache::get()->getDownloader();
44
45 string stmp = conf->getValue("Cache", "cache_size");
46 if (stmp.empty())
47 {
48 logger->log(LOG_CRIT, "Cache/cache_size is not set");
49 throw runtime_error("Please set Cache/cache_size in the storagemanager.cnf file");
50 }
51 try
52 {
53 maxCacheSize = stoul(stmp);
54 }
55 catch (invalid_argument &)
56 {
57 logger->log(LOG_CRIT, "Cache/cache_size is not a number");
58 throw runtime_error("Please set Cache/cache_size to a number");
59 }
60 //cout << "Cache got cache size " << maxCacheSize << endl;
61
62 stmp = conf->getValue("ObjectStorage", "object_size");
63 if (stmp.empty())
64 {
65 logger->log(LOG_CRIT, "ObjectStorage/object_size is not set");
66 throw runtime_error("Please set ObjectStorage/object_size in the storagemanager.cnf file");
67 }
68 try
69 {
70 objectSize = stoul(stmp);
71 }
72 catch (invalid_argument &)
73 {
74 logger->log(LOG_CRIT, "ObjectStorage/object_size is not a number");
75 throw runtime_error("Please set ObjectStorage/object_size to a number");
76 }
77
78 cachePrefix = conf->getValue("Cache", "path");
79 if (cachePrefix.empty())
80 {
81 logger->log(LOG_CRIT, "Cache/path is not set");
82 throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
83 }
84 cachePrefix /= firstDir;
85
86 try
87 {
88 bf::create_directories(cachePrefix);
89 }
90 catch (exception &e)
91 {
92 logger->log(LOG_CRIT, "Failed to create %s, got: %s", cachePrefix.string().c_str(), e.what());
93 throw e;
94 }
95
96 stmp = conf->getValue("ObjectStorage", "journal_path");
97 if (stmp.empty())
98 {
99 logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
100 throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
101 }
102 journalPrefix = stmp;
103 journalPrefix /= firstDir;
104
105 bf::create_directories(journalPrefix);
106 try
107 {
108 bf::create_directories(journalPrefix);
109 }
110 catch (exception &e)
111 {
112 logger->log(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what());
113 throw e;
114 }
115
116 lru_mutex.lock(); // unlocked by populate() when it's done
117 // Ideally put this in background but has to be synchronous with write calls
118 populate();
119 //boost::thread t([this] { this->populate(); });
120 //t.detach();
121 }
122
~PrefixCache()123 PrefixCache::~PrefixCache()
124 {
125 /* This and shutdown() need to do whatever is necessary to leave cache contents in a safe
126 state on disk. Does anything need to be done toward that?
127 */
128 }
129
populate()130 void PrefixCache::populate()
131 {
132 Synchronizer *sync = Synchronizer::get();
133 bf::directory_iterator dir(cachePrefix);
134 bf::directory_iterator dend;
135 vector<string> newObjects;
136 while (dir != dend)
137 {
138 // put everything in lru & m_lru
139 const bf::path &p = dir->path();
140 if (bf::is_regular_file(p))
141 {
142 lru.push_back(p.filename().string());
143 auto last = lru.end();
144 m_lru.insert(--last);
145 currentCacheSize += bf::file_size(*dir);
146 newObjects.push_back(p.filename().string());
147 }
148 else if (p != cachePrefix/downloader->getTmpPath())
149 logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
150 ++dir;
151 }
152 sync->newObjects(firstDir, newObjects);
153 newObjects.clear();
154
155 // account for what's in the journal dir
156 vector<pair<string, size_t> > newJournals;
157 dir = bf::directory_iterator(journalPrefix);
158 while (dir != dend)
159 {
160 const bf::path &p = dir->path();
161 if (bf::is_regular_file(p))
162 {
163 if (p.extension() == ".journal")
164 {
165 size_t s = bf::file_size(*dir);
166 currentCacheSize += s;
167 newJournals.push_back(pair<string, size_t>(p.stem().string(), s));
168 }
169 else
170 logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
171 }
172 else
173 logger->log(LOG_WARNING, "Cache: found something in the journal dir that does not belong '%s'", p.string().c_str());
174 ++dir;
175 }
176 lru_mutex.unlock();
177 sync->newJournalEntries(firstDir, newJournals);
178 }
179
180 // be careful using this! SM should be idle. No ongoing reads or writes.
validateCacheSize()181 void PrefixCache::validateCacheSize()
182 {
183 boost::unique_lock<boost::mutex> s(lru_mutex);
184
185 if (!doNotEvict.empty() || !toBeDeleted.empty())
186 {
187 cout << "Not safe to use validateCacheSize() at the moment." << endl;
188 return;
189 }
190
191 size_t oldSize = currentCacheSize;
192 currentCacheSize = 0;
193 m_lru.clear();
194 lru.clear();
195 populate();
196
197 if (oldSize != currentCacheSize)
198 logger->log(LOG_DEBUG, "PrefixCache::validateCacheSize(): found a discrepancy. Actual size is %lld, had %lld.",
199 currentCacheSize, oldSize);
200 else
201 logger->log(LOG_DEBUG, "PrefixCache::validateCacheSize(): Cache size accounting agrees with reality for now.");
202 }
203
/* Prepares 'keys' for reading.

   Every key gets a do-not-evict (DNE) reference, which the caller releases later with
   doneReading(). Keys already in the LRU are moved to its back. Keys not in the LRU are
   downloaded into cachePrefix and added to the LRU unless another thread deleted them meanwhile.
   currentCacheSize grows by the bytes this call actually downloaded.
*/
void PrefixCache::read(const vector<string> &keys)
{
    /* Move existing keys to the back of the LRU, start downloading nonexistent keys.
    */
    vector<const string *> keysToFetch;
    vector<int> dlErrnos;
    vector<size_t> dlSizes;

    boost::unique_lock<boost::mutex> s(lru_mutex);

    M_LRU_t::iterator mit;
    for (const string &key : keys)
    {
        mit = m_lru.find(key);
        if (mit != m_lru.end())
        {
            // Already cached: pin it for this reader and make it the most recently used.
            addToDNE(mit->lit);
            lru.splice(lru.end(), lru, mit->lit); // move them to the back so they are last to pick for eviction
        }
        else
        {
            // There's window where the file has been downloaded but is not yet
            // added to the lru structs. However it is in the DNE. If it is in the DNE, then it is also
            // in Downloader's map. So, this thread needs to start the download if it's not in the
            // DNE or if there's an existing download that hasn't finished yet. Starting the download
            // includes waiting for an existing download to finish, which from this class's pov is the
            // same thing.
            if (doNotEvict.find(key) == doNotEvict.end() || downloader->inProgress(key))
                keysToFetch.push_back(&key);
            else
                cout << "Cache: detected and stopped a racey download" << endl;
            // Pinned in both branches; the reference is dropped by doneReading().
            addToDNE(key);
        }
    }
    if (keysToFetch.empty())
        return;

    // lru_mutex is handed to the downloader; presumably it is released while the
    // downloads run and re-acquired before returning -- TODO confirm against Downloader.
    downloader->download(keysToFetch, &dlErrnos, &dlSizes, cachePrefix, &lru_mutex);

    size_t sum_sizes = 0;
    for (uint i = 0; i < keysToFetch.size(); ++i)
    {
        // downloads with size 0 didn't actually happen, either because it
        // was a preexisting download (another read() call owns it), or because
        // there was an error downloading it. Use size == 0 as an indication of
        // what to add to the cache. Also needs to verify that the file was not deleted,
        // indicated by existence in doNotEvict.
        if (dlSizes[i] != 0)
        {
            if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end())
            {
                sum_sizes += dlSizes[i];
                lru.push_back(*keysToFetch[i]);
                LRU_t::iterator lit = lru.end();
                m_lru.insert(--lit); // I dislike this way of grabbing the last iterator in a list.
            }
            else // it was downloaded, but a deletion happened so we have to toss it
            {
                cout << "removing a file that was deleted by another thread during download" << endl;
                bf::remove(cachePrefix / (*keysToFetch[i]));
            }
        }
    }

    // move everything in keys to the back of the lru (yes, again)
    for (const string &key : keys)
    {
        mit = m_lru.find(key);
        if (mit != m_lru.end()) // all of the files exist, just not all of them are 'owned by' this thread.
            lru.splice(lru.end(), lru, mit->lit);
    }

    // fix cache size
    // Eviction is not attempted here; doneReading() calls _makeSpace(0) afterwards.
    //_makeSpace(sum_sizes);
    currentCacheSize += sum_sizes;
}
280
doneReading(const vector<string> & keys)281 void PrefixCache::doneReading(const vector<string> &keys)
282 {
283 boost::unique_lock<boost::mutex> s(lru_mutex);
284 for (const string &key : keys)
285 {
286 removeFromDNE(key);
287 // most should be in the map.
288 // debateable whether it's faster to look up the list iterator and use it
289 // or whether it's faster to bypass that and use strings only.
290
291 //const auto &it = m_lru.find(key);
292 //if (it != m_lru.end())
293 // removeFromDNE(it->lit);
294 }
295 _makeSpace(0);
296 }
297
doneWriting()298 void PrefixCache::doneWriting()
299 {
300 makeSpace(0);
301 }
302
DNEElement(const LRU_t::iterator & k)303 PrefixCache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
304 {
305 }
DNEElement(const string & k)306 PrefixCache::DNEElement::DNEElement(const string &k) : sKey(k), refCount(1)
307 {
308 }
309
addToDNE(const DNEElement & key)310 void PrefixCache::addToDNE(const DNEElement &key)
311 {
312 DNE_t::iterator it = doNotEvict.find(key);
313 if (it != doNotEvict.end())
314 {
315 DNEElement &dnee = const_cast<DNEElement &>(*it);
316 ++(dnee.refCount);
317 }
318 else
319 doNotEvict.insert(key);
320 }
321
removeFromDNE(const DNEElement & key)322 void PrefixCache::removeFromDNE(const DNEElement &key)
323 {
324 DNE_t::iterator it = doNotEvict.find(key);
325 if (it == doNotEvict.end())
326 return;
327 DNEElement &dnee = const_cast<DNEElement &>(*it);
328 if (--(dnee.refCount) == 0)
329 doNotEvict.erase(it);
330 }
331
getCachePath()332 const bf::path & PrefixCache::getCachePath()
333 {
334 return cachePrefix;
335 }
336
getJournalPath()337 const bf::path & PrefixCache::getJournalPath()
338 {
339 return journalPrefix;
340 }
341
exists(const vector<string> & keys,vector<bool> * out) const342 void PrefixCache::exists(const vector<string> &keys, vector<bool> *out) const
343 {
344 out->resize(keys.size());
345 boost::unique_lock<boost::mutex> s(lru_mutex);
346 for (uint i = 0; i < keys.size(); i++)
347 (*out)[i] = (m_lru.find(keys[i]) != m_lru.end());
348 }
349
exists(const string & key) const350 bool PrefixCache::exists(const string &key) const
351 {
352 boost::unique_lock<boost::mutex> s(lru_mutex);
353 return m_lru.find(key) != m_lru.end();
354 }
355
newObject(const string & key,size_t size)356 void PrefixCache::newObject(const string &key, size_t size)
357 {
358 boost::unique_lock<boost::mutex> s(lru_mutex);
359 assert(m_lru.find(key) == m_lru.end());
360 if (m_lru.find(key) != m_lru.end())
361 {
362 //This should never happen but was in MCOL-3499
363 //Remove this when PrefixCache ctor can call populate() synchronous with write calls
364 logger->log(LOG_ERR, "PrefixCache::newObject(): key exists in m_lru already %s",key.c_str());
365 }
366 //_makeSpace(size);
367 lru.push_back(key);
368 LRU_t::iterator back = lru.end();
369 m_lru.insert(--back);
370 currentCacheSize += size;
371 }
372
newJournalEntry(size_t size)373 void PrefixCache::newJournalEntry(size_t size)
374 {
375 boost::unique_lock<boost::mutex> s(lru_mutex);
376 //_makeSpace(size);
377 currentCacheSize += size;
378 }
379
deletedJournal(size_t size)380 void PrefixCache::deletedJournal(size_t size)
381 {
382 boost::unique_lock<boost::mutex> s(lru_mutex);
383
384 if (currentCacheSize >= size)
385 currentCacheSize -= size;
386 else
387 {
388 ostringstream oss;
389 oss << "PrefixCache::deletedJournal(): Detected an accounting error.";
390 logger->log(LOG_WARNING, oss.str().c_str());
391 currentCacheSize = 0;
392 }
393 }
394
deletedObject(const string & key,size_t size)395 void PrefixCache::deletedObject(const string &key, size_t size)
396 {
397 boost::unique_lock<boost::mutex> s(lru_mutex);
398
399 M_LRU_t::iterator mit = m_lru.find(key);
400 assert(mit != m_lru.end());
401
402 // if it's being flushed, let makeSpace() do the deleting
403 if (toBeDeleted.find(mit->lit) == toBeDeleted.end())
404 {
405 doNotEvict.erase(mit->lit);
406 lru.erase(mit->lit);
407 m_lru.erase(mit);
408 if (currentCacheSize >= size)
409 currentCacheSize -= size;
410 else
411 {
412 ostringstream oss;
413 oss << "PrefixCache::deletedObject(): Detected an accounting error.";
414 logger->log(LOG_WARNING, oss.str().c_str());
415 currentCacheSize = 0;
416 }
417 }
418 }
419
setMaxCacheSize(size_t size)420 void PrefixCache::setMaxCacheSize(size_t size)
421 {
422 boost::unique_lock<boost::mutex> s(lru_mutex);
423 if (size < maxCacheSize)
424 _makeSpace(maxCacheSize - size);
425 maxCacheSize = size;
426 }
427
makeSpace(size_t size)428 void PrefixCache::makeSpace(size_t size)
429 {
430 boost::unique_lock<boost::mutex> s(lru_mutex);
431 _makeSpace(size);
432 }
433
getMaxCacheSize() const434 size_t PrefixCache::getMaxCacheSize() const
435 {
436 return maxCacheSize;
437 }
438
439 // call this holding lru_mutex
_makeSpace(size_t size)440 void PrefixCache::_makeSpace(size_t size)
441 {
442 ssize_t thisMuch = currentCacheSize + size - maxCacheSize;
443 if (thisMuch <= 0)
444 return;
445
446 LRU_t::iterator it;
447 while (thisMuch > 0 && !lru.empty())
448 {
449 it = lru.begin();
450 // find the first element not being either read() right now or being processed by another
451 // makeSpace() call.
452 while (it != lru.end())
453 {
454 // make sure it's not currently being read or being flushed by another _makeSpace() call
455 if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end()))
456 break;
457 ++it;
458 }
459 if (it == lru.end())
460 {
461 // nothing can be deleted right now
462 return;
463 }
464
465 // ran into this a couple times, still happens as of commit 948ee1aa5
466 // BT: made this more visable in logging.
467 // likely related to MCOL-3499 and lru containing double entries.
468 if (!bf::exists(cachePrefix / *it))
469 logger->log(LOG_WARNING, "PrefixCache::makeSpace(): doesn't exist, %s/%s",cachePrefix.string().c_str(),((string)(*it)).c_str());
470 assert(bf::exists(cachePrefix / *it));
471 /*
472 tell Synchronizer that this key will be evicted
473 delete the file
474 remove it from our structs
475 update current size
476 */
477
478 //logger->log(LOG_WARNING, "Cache: flushing!");
479 toBeDeleted.insert(it);
480
481 string key = *it; // need to make a copy; it could get changed after unlocking.
482
483 lru_mutex.unlock();
484 try
485 {
486 Synchronizer::get()->flushObject(firstDir, key);
487 }
488 catch (...)
489 {
490 // it gets logged by Sync
491 lru_mutex.lock();
492 toBeDeleted.erase(it);
493 continue;
494 }
495 lru_mutex.lock();
496
497 // check doNotEvict again in case this object is now being read
498 if (doNotEvict.find(it) == doNotEvict.end())
499 {
500 bf::path cachedFile = cachePrefix / *it;
501 m_lru.erase(*it);
502 toBeDeleted.erase(it);
503 lru.erase(it);
504 size_t newSize = bf::file_size(cachedFile);
505 replicator->remove(cachedFile, Replicator::LOCAL_ONLY);
506 if (newSize < currentCacheSize)
507 {
508 currentCacheSize -= newSize;
509 thisMuch -= newSize;
510 }
511 else
512 {
513 logger->log(LOG_WARNING, "PrefixCache::makeSpace(): accounting error. Almost wrapped currentCacheSize on flush.");
514 currentCacheSize = 0;
515 thisMuch = 0;
516 }
517 }
518 else
519 toBeDeleted.erase(it);
520 }
521 }
522
/* Re-keys a cached object from oldKey to newKey and adjusts currentCacheSize by sizediff.

   The list node's string is rewritten in place. Because m_lru, doNotEvict and toBeDeleted all
   locate entries by that key text, each entry is removed before the string changes and
   reinserted afterwards. The DNE refCount and the toBeDeleted membership are preserved.
   Does nothing if oldKey is not in the LRU.
*/
void PrefixCache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
{
    // rename it in the LRU
    // erase/insert to rehash it everywhere else

    boost::unique_lock<boost::mutex> s(lru_mutex);
    auto it = m_lru.find(oldKey);
    if (it == m_lru.end())
        return;

    auto lit = it->lit;
    m_lru.erase(it);
    // Capture the reader pin count so it can be restored under the new key.
    int refCount = 0;
    auto dne_it = doNotEvict.find(lit);
    if (dne_it != doNotEvict.end())
    {
        refCount = dne_it->refCount;
        doNotEvict.erase(dne_it);
    }

    // toBeDeleted is ordered by key text (TBDLess), so the entry must be out of the set while the key changes.
    auto tbd_it = toBeDeleted.find(lit);
    bool hasTBDEntry = (tbd_it != toBeDeleted.end());
    if (hasTBDEntry)
        toBeDeleted.erase(tbd_it);

    *lit = newKey;

    if (hasTBDEntry)
        toBeDeleted.insert(lit);
    if (refCount != 0)
    {
        pair<DNE_t::iterator, bool> dne_tmp = doNotEvict.insert(lit);
        const_cast<DNEElement &>(*(dne_tmp.first)).refCount = refCount;
    }

    // NOTE(review): if newKey is already tracked, this insert is presumably rejected as a
    // duplicate and leaves two lru nodes with the same key -- confirm callers prevent that.
    m_lru.insert(lit);
    // sizediff may be negative; this relies on unsigned wraparound of currentCacheSize.
    currentCacheSize += sizediff;
}
561
ifExistsThenDelete(const string & key)562 int PrefixCache::ifExistsThenDelete(const string &key)
563 {
564 bf::path cachedPath = cachePrefix / key;
565 bf::path journalPath = journalPrefix / (key + ".journal");
566
567 boost::unique_lock<boost::mutex> s(lru_mutex);
568 bool objectExists = false;
569
570 auto it = m_lru.find(key);
571 if (it != m_lru.end())
572 {
573 if (toBeDeleted.find(it->lit) == toBeDeleted.end())
574 {
575 doNotEvict.erase(it->lit);
576 lru.erase(it->lit);
577 m_lru.erase(it);
578 objectExists = true;
579 }
580 else // let makeSpace() delete it if it's already in progress
581 return 0;
582 }
583 bool journalExists = bf::exists(journalPath);
584 //assert(objectExists == bf::exists(cachedPath));
585
586 size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0);
587 //size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0);
588 size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0);
589 currentCacheSize -= (objectSize + journalSize);
590
591 //assert(!objectExists || objectSize == bf::file_size(cachedPath));
592
593 return (objectExists ? 1 : 0) | (journalExists ? 2 : 0);
594 }
595
getCurrentCacheSize() const596 size_t PrefixCache::getCurrentCacheSize() const
597 {
598 return currentCacheSize;
599 }
600
getCurrentCacheElementCount() const601 size_t PrefixCache::getCurrentCacheElementCount() const
602 {
603 boost::unique_lock<boost::mutex> s(lru_mutex);
604 assert(m_lru.size() == lru.size());
605 return m_lru.size();
606 }
607
reset()608 void PrefixCache::reset()
609 {
610 boost::unique_lock<boost::mutex> s(lru_mutex);
611 m_lru.clear();
612 lru.clear();
613 toBeDeleted.clear();
614 doNotEvict.clear();
615
616 bf::directory_iterator dir;
617 bf::directory_iterator dend;
618 for (dir = bf::directory_iterator(cachePrefix); dir != dend; ++dir)
619 bf::remove_all(dir->path());
620
621 for (dir = bf::directory_iterator(journalPrefix); dir != dend; ++dir)
622 bf::remove_all(dir->path());
623 currentCacheSize = 0;
624 }
625
shutdown()626 void PrefixCache::shutdown()
627 {
628 /* Does this need to do something anymore? */
629 }
630
631 /* The helper classes */
632
M_LRU_element_t(const string * k)633 PrefixCache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k)
634 {}
635
M_LRU_element_t(const string & k)636 PrefixCache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k)
637 {}
638
M_LRU_element_t(const LRU_t::iterator & i)639 PrefixCache::M_LRU_element_t::M_LRU_element_t(const LRU_t::iterator &i) : key(&(*i)), lit(i)
640 {}
641
operator ()(const M_LRU_element_t & l) const642 inline size_t PrefixCache::KeyHasher::operator()(const M_LRU_element_t &l) const
643 {
644 return hash<string>()(*(l.key));
645 }
646
operator ()(const M_LRU_element_t & l1,const M_LRU_element_t & l2) const647 inline bool PrefixCache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const
648 {
649 return (*(l1.key) == *(l2.key));
650 }
651
operator ()(const DNEElement & l) const652 inline size_t PrefixCache::DNEHasher::operator()(const DNEElement &l) const
653 {
654 return (l.sKey.empty() ? hash<string>()(*(l.key)) : hash<string>()(l.sKey));
655 }
656
operator ()(const DNEElement & l1,const DNEElement & l2) const657 inline bool PrefixCache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const
658 {
659 const string *s1, *s2;
660 s1 = l1.sKey.empty() ? &(*(l1.key)) : &(l1.sKey);
661 s2 = l2.sKey.empty() ? &(*(l2.key)) : &(l2.sKey);
662
663 return (*s1 == *s2);
664 }
665
operator ()(const LRU_t::iterator & i1,const LRU_t::iterator & i2) const666 inline bool PrefixCache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const
667 {
668 return *i1 < *i2;
669 }
670
671 }
672
673
674
675
676