1 /* Copyright (C) 2019 MariaDB Corporation
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 
19 #include "PrefixCache.h"
20 #include "Cache.h"
21 #include "Config.h"
22 #include "Downloader.h"
23 #include "Synchronizer.h"
24 #include <iostream>
25 #include <syslog.h>
26 #include <boost/filesystem.hpp>
27 #include <boost/thread.hpp>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <unistd.h>
31 
32 using namespace std;
33 namespace bf = boost::filesystem;
34 
35 namespace storagemanager
36 {
37 
PrefixCache(const bf::path & prefix)38 PrefixCache::PrefixCache(const bf::path &prefix) : firstDir(prefix), currentCacheSize(0)
39 {
40     Config *conf = Config::get();
41     logger = SMLogging::get();
42     replicator = Replicator::get();
43     downloader = Cache::get()->getDownloader();
44 
45     string stmp = conf->getValue("Cache", "cache_size");
46     if (stmp.empty())
47     {
48         logger->log(LOG_CRIT, "Cache/cache_size is not set");
49         throw runtime_error("Please set Cache/cache_size in the storagemanager.cnf file");
50     }
51     try
52     {
53         maxCacheSize = stoul(stmp);
54     }
55     catch (invalid_argument &)
56     {
57         logger->log(LOG_CRIT, "Cache/cache_size is not a number");
58         throw runtime_error("Please set Cache/cache_size to a number");
59     }
60     //cout << "Cache got cache size " << maxCacheSize << endl;
61 
62     stmp = conf->getValue("ObjectStorage", "object_size");
63     if (stmp.empty())
64     {
65         logger->log(LOG_CRIT, "ObjectStorage/object_size is not set");
66         throw runtime_error("Please set ObjectStorage/object_size in the storagemanager.cnf file");
67     }
68     try
69     {
70         objectSize = stoul(stmp);
71     }
72     catch (invalid_argument &)
73     {
74         logger->log(LOG_CRIT, "ObjectStorage/object_size is not a number");
75         throw runtime_error("Please set ObjectStorage/object_size to a number");
76     }
77 
78     cachePrefix = conf->getValue("Cache", "path");
79     if (cachePrefix.empty())
80     {
81         logger->log(LOG_CRIT, "Cache/path is not set");
82         throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
83     }
84     cachePrefix /= firstDir;
85 
86     try
87     {
88         bf::create_directories(cachePrefix);
89     }
90     catch (exception &e)
91     {
92         logger->log(LOG_CRIT, "Failed to create %s, got: %s", cachePrefix.string().c_str(), e.what());
93         throw e;
94     }
95 
96     stmp = conf->getValue("ObjectStorage", "journal_path");
97     if (stmp.empty())
98     {
99         logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
100         throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
101     }
102     journalPrefix = stmp;
103     journalPrefix /= firstDir;
104 
105     bf::create_directories(journalPrefix);
106     try
107     {
108         bf::create_directories(journalPrefix);
109     }
110     catch (exception &e)
111     {
112         logger->log(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what());
113         throw e;
114     }
115 
116     lru_mutex.lock();    // unlocked by populate() when it's done
117     // Ideally put this in background but has to be synchronous with write calls
118     populate();
119     //boost::thread t([this] { this->populate(); });
120     //t.detach();
121 }
122 
~PrefixCache()123 PrefixCache::~PrefixCache()
124 {
125     /*  This and shutdown() need to do whatever is necessary to leave cache contents in a safe
126         state on disk.  Does anything need to be done toward that?
127     */
128 }
129 
populate()130 void PrefixCache::populate()
131 {
132     Synchronizer *sync = Synchronizer::get();
133     bf::directory_iterator dir(cachePrefix);
134     bf::directory_iterator dend;
135     vector<string> newObjects;
136     while (dir != dend)
137     {
138         // put everything in lru & m_lru
139         const bf::path &p = dir->path();
140         if (bf::is_regular_file(p))
141         {
142             lru.push_back(p.filename().string());
143             auto last = lru.end();
144             m_lru.insert(--last);
145             currentCacheSize += bf::file_size(*dir);
146             newObjects.push_back(p.filename().string());
147         }
148         else if (p != cachePrefix/downloader->getTmpPath())
149             logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
150         ++dir;
151     }
152     sync->newObjects(firstDir, newObjects);
153     newObjects.clear();
154 
155     // account for what's in the journal dir
156     vector<pair<string, size_t> > newJournals;
157     dir = bf::directory_iterator(journalPrefix);
158     while (dir != dend)
159     {
160         const bf::path &p = dir->path();
161         if (bf::is_regular_file(p))
162         {
163             if (p.extension() == ".journal")
164             {
165                 size_t s = bf::file_size(*dir);
166                 currentCacheSize += s;
167                 newJournals.push_back(pair<string, size_t>(p.stem().string(), s));
168             }
169             else
170                 logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
171         }
172         else
173             logger->log(LOG_WARNING, "Cache: found something in the journal dir that does not belong '%s'", p.string().c_str());
174         ++dir;
175     }
176     lru_mutex.unlock();
177     sync->newJournalEntries(firstDir, newJournals);
178 }
179 
180 // be careful using this!  SM should be idle.  No ongoing reads or writes.
validateCacheSize()181 void PrefixCache::validateCacheSize()
182 {
183     boost::unique_lock<boost::mutex> s(lru_mutex);
184 
185     if (!doNotEvict.empty() || !toBeDeleted.empty())
186     {
187         cout << "Not safe to use validateCacheSize() at the moment." << endl;
188         return;
189     }
190 
191     size_t oldSize = currentCacheSize;
192     currentCacheSize = 0;
193     m_lru.clear();
194     lru.clear();
195     populate();
196 
197     if (oldSize != currentCacheSize)
198         logger->log(LOG_DEBUG, "PrefixCache::validateCacheSize(): found a discrepancy.  Actual size is %lld, had %lld.",
199             currentCacheSize, oldSize);
200     else
201         logger->log(LOG_DEBUG, "PrefixCache::validateCacheSize(): Cache size accounting agrees with reality for now.");
202 }
203 
read(const vector<string> & keys)204 void PrefixCache::read(const vector<string> &keys)
205 {
206     /*  Move existing keys to the back of the LRU, start downloading nonexistant keys.
207     */
208     vector<const string *> keysToFetch;
209     vector<int> dlErrnos;
210     vector<size_t> dlSizes;
211 
212     boost::unique_lock<boost::mutex> s(lru_mutex);
213 
214     M_LRU_t::iterator mit;
215     for (const string &key : keys)
216     {
217         mit = m_lru.find(key);
218         if (mit != m_lru.end())
219         {
220             addToDNE(mit->lit);
221             lru.splice(lru.end(), lru, mit->lit);   // move them to the back so they are last to pick for eviction
222         }
223         else
224         {
225             // There's window where the file has been downloaded but is not yet
226             // added to the lru structs.  However it is in the DNE.  If it is in the DNE, then it is also
227             // in Downloader's map.  So, this thread needs to start the download if it's not in the
228             // DNE or if there's an existing download that hasn't finished yet.  Starting the download
229             // includes waiting for an existing download to finish, which from this class's pov is the
230             // same thing.
231             if (doNotEvict.find(key) == doNotEvict.end() || downloader->inProgress(key))
232                 keysToFetch.push_back(&key);
233             else
234                 cout << "Cache: detected and stopped a racey download" << endl;
235             addToDNE(key);
236         }
237     }
238     if (keysToFetch.empty())
239         return;
240 
241     downloader->download(keysToFetch, &dlErrnos, &dlSizes, cachePrefix, &lru_mutex);
242 
243     size_t sum_sizes = 0;
244     for (uint i = 0; i < keysToFetch.size(); ++i)
245     {
246         // downloads with size 0 didn't actually happen, either because it
247         // was a preexisting download (another read() call owns it), or because
248         // there was an error downloading it.  Use size == 0 as an indication of
249         // what to add to the cache.  Also needs to verify that the file was not deleted,
250         // indicated by existence in doNotEvict.
251         if (dlSizes[i] != 0)
252         {
253             if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end())
254             {
255                 sum_sizes += dlSizes[i];
256                 lru.push_back(*keysToFetch[i]);
257                 LRU_t::iterator lit = lru.end();
258                 m_lru.insert(--lit);  // I dislike this way of grabbing the last iterator in a list.
259             }
260             else    // it was downloaded, but a deletion happened so we have to toss it
261             {
262                 cout << "removing a file that was deleted by another thread during download" << endl;
263                 bf::remove(cachePrefix / (*keysToFetch[i]));
264             }
265         }
266     }
267 
268     // move everything in keys to the back of the lru (yes, again)
269     for (const string &key : keys)
270     {
271         mit = m_lru.find(key);
272         if (mit != m_lru.end())    // all of the files exist, just not all of them are 'owned by' this thread.
273             lru.splice(lru.end(), lru, mit->lit);
274     }
275 
276     // fix cache size
277     //_makeSpace(sum_sizes);
278     currentCacheSize += sum_sizes;
279 }
280 
doneReading(const vector<string> & keys)281 void PrefixCache::doneReading(const vector<string> &keys)
282 {
283     boost::unique_lock<boost::mutex> s(lru_mutex);
284     for (const string &key : keys)
285     {
286         removeFromDNE(key);
287         // most should be in the map.
288         // debateable whether it's faster to look up the list iterator and use it
289         // or whether it's faster to bypass that and use strings only.
290 
291         //const auto &it = m_lru.find(key);
292         //if (it != m_lru.end())
293         //    removeFromDNE(it->lit);
294     }
295     _makeSpace(0);
296 }
297 
doneWriting()298 void PrefixCache::doneWriting()
299 {
300     makeSpace(0);
301 }
302 
DNEElement(const LRU_t::iterator & k)303 PrefixCache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
304 {
305 }
DNEElement(const string & k)306 PrefixCache::DNEElement::DNEElement(const string &k) : sKey(k), refCount(1)
307 {
308 }
309 
addToDNE(const DNEElement & key)310 void PrefixCache::addToDNE(const DNEElement &key)
311 {
312     DNE_t::iterator it = doNotEvict.find(key);
313     if (it != doNotEvict.end())
314     {
315         DNEElement &dnee = const_cast<DNEElement &>(*it);
316         ++(dnee.refCount);
317     }
318     else
319         doNotEvict.insert(key);
320 }
321 
removeFromDNE(const DNEElement & key)322 void PrefixCache::removeFromDNE(const DNEElement &key)
323 {
324     DNE_t::iterator it = doNotEvict.find(key);
325     if (it == doNotEvict.end())
326         return;
327     DNEElement &dnee = const_cast<DNEElement &>(*it);
328     if (--(dnee.refCount) == 0)
329         doNotEvict.erase(it);
330 }
331 
getCachePath()332 const bf::path & PrefixCache::getCachePath()
333 {
334     return cachePrefix;
335 }
336 
getJournalPath()337 const bf::path & PrefixCache::getJournalPath()
338 {
339     return journalPrefix;
340 }
341 
exists(const vector<string> & keys,vector<bool> * out) const342 void PrefixCache::exists(const vector<string> &keys, vector<bool> *out) const
343 {
344     out->resize(keys.size());
345     boost::unique_lock<boost::mutex> s(lru_mutex);
346     for (uint i = 0; i < keys.size(); i++)
347         (*out)[i] = (m_lru.find(keys[i]) != m_lru.end());
348 }
349 
exists(const string & key) const350 bool PrefixCache::exists(const string &key) const
351 {
352     boost::unique_lock<boost::mutex> s(lru_mutex);
353     return m_lru.find(key) != m_lru.end();
354 }
355 
newObject(const string & key,size_t size)356 void PrefixCache::newObject(const string &key, size_t size)
357 {
358     boost::unique_lock<boost::mutex> s(lru_mutex);
359     assert(m_lru.find(key) == m_lru.end());
360     if (m_lru.find(key) != m_lru.end())
361     {
362         //This should never happen but was in MCOL-3499
363         //Remove this when PrefixCache ctor can call populate() synchronous with write calls
364         logger->log(LOG_ERR, "PrefixCache::newObject(): key exists in m_lru already %s",key.c_str());
365     }
366     //_makeSpace(size);
367     lru.push_back(key);
368     LRU_t::iterator back = lru.end();
369     m_lru.insert(--back);
370     currentCacheSize += size;
371 }
372 
newJournalEntry(size_t size)373 void PrefixCache::newJournalEntry(size_t size)
374 {
375     boost::unique_lock<boost::mutex> s(lru_mutex);
376     //_makeSpace(size);
377     currentCacheSize += size;
378 }
379 
deletedJournal(size_t size)380 void PrefixCache::deletedJournal(size_t size)
381 {
382     boost::unique_lock<boost::mutex> s(lru_mutex);
383 
384     if (currentCacheSize >= size)
385         currentCacheSize -= size;
386     else
387     {
388         ostringstream oss;
389         oss << "PrefixCache::deletedJournal(): Detected an accounting error.";
390         logger->log(LOG_WARNING, oss.str().c_str());
391         currentCacheSize = 0;
392     }
393 }
394 
deletedObject(const string & key,size_t size)395 void PrefixCache::deletedObject(const string &key, size_t size)
396 {
397     boost::unique_lock<boost::mutex> s(lru_mutex);
398 
399     M_LRU_t::iterator mit = m_lru.find(key);
400     assert(mit != m_lru.end());
401 
402     // if it's being flushed, let makeSpace() do the deleting
403     if (toBeDeleted.find(mit->lit) == toBeDeleted.end())
404     {
405         doNotEvict.erase(mit->lit);
406         lru.erase(mit->lit);
407         m_lru.erase(mit);
408         if (currentCacheSize >= size)
409             currentCacheSize -= size;
410         else
411         {
412             ostringstream oss;
413             oss << "PrefixCache::deletedObject(): Detected an accounting error.";
414             logger->log(LOG_WARNING, oss.str().c_str());
415             currentCacheSize = 0;
416         }
417     }
418 }
419 
setMaxCacheSize(size_t size)420 void PrefixCache::setMaxCacheSize(size_t size)
421 {
422     boost::unique_lock<boost::mutex> s(lru_mutex);
423     if (size < maxCacheSize)
424         _makeSpace(maxCacheSize - size);
425     maxCacheSize = size;
426 }
427 
makeSpace(size_t size)428 void PrefixCache::makeSpace(size_t size)
429 {
430     boost::unique_lock<boost::mutex> s(lru_mutex);
431     _makeSpace(size);
432 }
433 
getMaxCacheSize() const434 size_t PrefixCache::getMaxCacheSize() const
435 {
436     return maxCacheSize;
437 }
438 
439 // call this holding lru_mutex
_makeSpace(size_t size)440 void PrefixCache::_makeSpace(size_t size)
441 {
442     ssize_t thisMuch = currentCacheSize + size - maxCacheSize;
443     if (thisMuch <= 0)
444         return;
445 
446     LRU_t::iterator it;
447     while (thisMuch > 0 && !lru.empty())
448     {
449         it = lru.begin();
450         // find the first element not being either read() right now or being processed by another
451         // makeSpace() call.
452         while (it != lru.end())
453         {
454             // make sure it's not currently being read or being flushed by another _makeSpace() call
455             if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end()))
456                 break;
457             ++it;
458         }
459         if (it == lru.end())
460         {
461             // nothing can be deleted right now
462             return;
463         }
464 
465         // ran into this a couple times, still happens as of commit 948ee1aa5
466         // BT: made this more visable in logging.
467         //     likely related to MCOL-3499 and lru containing double entries.
468         if (!bf::exists(cachePrefix / *it))
469             logger->log(LOG_WARNING, "PrefixCache::makeSpace(): doesn't exist, %s/%s",cachePrefix.string().c_str(),((string)(*it)).c_str());
470         assert(bf::exists(cachePrefix / *it));
471         /*
472             tell Synchronizer that this key will be evicted
473             delete the file
474             remove it from our structs
475             update current size
476         */
477 
478         //logger->log(LOG_WARNING, "Cache:  flushing!");
479         toBeDeleted.insert(it);
480 
481         string key = *it;    // need to make a copy; it could get changed after unlocking.
482 
483         lru_mutex.unlock();
484         try
485         {
486             Synchronizer::get()->flushObject(firstDir, key);
487         }
488         catch (...)
489         {
490             // it gets logged by Sync
491             lru_mutex.lock();
492             toBeDeleted.erase(it);
493             continue;
494         }
495         lru_mutex.lock();
496 
497         // check doNotEvict again in case this object is now being read
498         if (doNotEvict.find(it) == doNotEvict.end())
499         {
500             bf::path cachedFile = cachePrefix / *it;
501             m_lru.erase(*it);
502             toBeDeleted.erase(it);
503             lru.erase(it);
504             size_t newSize = bf::file_size(cachedFile);
505             replicator->remove(cachedFile, Replicator::LOCAL_ONLY);
506             if (newSize < currentCacheSize)
507             {
508                 currentCacheSize -= newSize;
509                 thisMuch -= newSize;
510             }
511             else
512             {
513                 logger->log(LOG_WARNING, "PrefixCache::makeSpace(): accounting error.  Almost wrapped currentCacheSize on flush.");
514                 currentCacheSize = 0;
515                 thisMuch = 0;
516             }
517         }
518         else
519             toBeDeleted.erase(it);
520     }
521 }
522 
rename(const string & oldKey,const string & newKey,ssize_t sizediff)523 void PrefixCache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
524 {
525     // rename it in the LRU
526     // erase/insert to rehash it everywhere else
527 
528     boost::unique_lock<boost::mutex> s(lru_mutex);
529     auto it = m_lru.find(oldKey);
530     if (it == m_lru.end())
531         return;
532 
533     auto lit = it->lit;
534     m_lru.erase(it);
535     int refCount = 0;
536     auto dne_it = doNotEvict.find(lit);
537     if (dne_it != doNotEvict.end())
538     {
539         refCount = dne_it->refCount;
540         doNotEvict.erase(dne_it);
541     }
542 
543     auto tbd_it = toBeDeleted.find(lit);
544     bool hasTBDEntry = (tbd_it != toBeDeleted.end());
545     if (hasTBDEntry)
546         toBeDeleted.erase(tbd_it);
547 
548     *lit = newKey;
549 
550     if (hasTBDEntry)
551         toBeDeleted.insert(lit);
552     if (refCount != 0)
553     {
554         pair<DNE_t::iterator, bool> dne_tmp = doNotEvict.insert(lit);
555         const_cast<DNEElement &>(*(dne_tmp.first)).refCount = refCount;
556     }
557 
558     m_lru.insert(lit);
559     currentCacheSize += sizediff;
560 }
561 
ifExistsThenDelete(const string & key)562 int PrefixCache::ifExistsThenDelete(const string &key)
563 {
564     bf::path cachedPath = cachePrefix / key;
565     bf::path journalPath = journalPrefix / (key + ".journal");
566 
567     boost::unique_lock<boost::mutex> s(lru_mutex);
568     bool objectExists = false;
569 
570     auto it = m_lru.find(key);
571     if (it != m_lru.end())
572     {
573         if (toBeDeleted.find(it->lit) == toBeDeleted.end())
574         {
575             doNotEvict.erase(it->lit);
576             lru.erase(it->lit);
577             m_lru.erase(it);
578             objectExists = true;
579         }
580         else  // let makeSpace() delete it if it's already in progress
581             return 0;
582     }
583     bool journalExists = bf::exists(journalPath);
584     //assert(objectExists == bf::exists(cachedPath));
585 
586     size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0);
587     //size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0);
588     size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0);
589     currentCacheSize -= (objectSize + journalSize);
590 
591     //assert(!objectExists || objectSize == bf::file_size(cachedPath));
592 
593     return (objectExists ? 1 : 0) | (journalExists ? 2 : 0);
594 }
595 
getCurrentCacheSize() const596 size_t PrefixCache::getCurrentCacheSize() const
597 {
598     return currentCacheSize;
599 }
600 
getCurrentCacheElementCount() const601 size_t PrefixCache::getCurrentCacheElementCount() const
602 {
603     boost::unique_lock<boost::mutex> s(lru_mutex);
604     assert(m_lru.size() == lru.size());
605     return m_lru.size();
606 }
607 
reset()608 void PrefixCache::reset()
609 {
610     boost::unique_lock<boost::mutex> s(lru_mutex);
611     m_lru.clear();
612     lru.clear();
613     toBeDeleted.clear();
614     doNotEvict.clear();
615 
616     bf::directory_iterator dir;
617     bf::directory_iterator dend;
618     for (dir = bf::directory_iterator(cachePrefix); dir != dend; ++dir)
619         bf::remove_all(dir->path());
620 
621     for (dir = bf::directory_iterator(journalPrefix); dir != dend; ++dir)
622         bf::remove_all(dir->path());
623     currentCacheSize = 0;
624 }
625 
shutdown()626 void PrefixCache::shutdown()
627 {
628     /* Does this need to do something anymore? */
629 }
630 
631 /* The helper classes */
632 
M_LRU_element_t(const string * k)633 PrefixCache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k)
634 {}
635 
M_LRU_element_t(const string & k)636 PrefixCache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k)
637 {}
638 
M_LRU_element_t(const LRU_t::iterator & i)639 PrefixCache::M_LRU_element_t::M_LRU_element_t(const LRU_t::iterator &i) : key(&(*i)), lit(i)
640 {}
641 
operator ()(const M_LRU_element_t & l) const642 inline size_t PrefixCache::KeyHasher::operator()(const M_LRU_element_t &l) const
643 {
644     return hash<string>()(*(l.key));
645 }
646 
operator ()(const M_LRU_element_t & l1,const M_LRU_element_t & l2) const647 inline bool PrefixCache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const
648 {
649     return (*(l1.key) == *(l2.key));
650 }
651 
operator ()(const DNEElement & l) const652 inline size_t PrefixCache::DNEHasher::operator()(const DNEElement &l) const
653 {
654     return (l.sKey.empty() ? hash<string>()(*(l.key)) : hash<string>()(l.sKey));
655 }
656 
operator ()(const DNEElement & l1,const DNEElement & l2) const657 inline bool PrefixCache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const
658 {
659     const string *s1, *s2;
660     s1 = l1.sKey.empty() ? &(*(l1.key)) : &(l1.sKey);
661     s2 = l2.sKey.empty() ? &(*(l2.key)) : &(l2.sKey);
662 
663     return (*s1 == *s2);
664 }
665 
operator ()(const LRU_t::iterator & i1,const LRU_t::iterator & i2) const666 inline bool PrefixCache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const
667 {
668     return *i1 < *i2;
669 }
670 
671 }
672 
673 
674 
675 
676