/* Copyright (C) 2019 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */


#include "Synchronizer.h"
#include "Cache.h"
#include "IOCoordinator.h"
#include "MetadataFile.h"
#include "Utilities.h"
#include <boost/thread/mutex.hpp>

#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>

using namespace std;

namespace
{
    storagemanager::Synchronizer *instance = NULL;
    boost::mutex inst_mutex;
}

namespace bf = boost::filesystem;
namespace storagemanager
{

Synchronizer * Synchronizer::get()
{
    if (instance)
        return instance;
    boost::unique_lock<boost::mutex> lock(inst_mutex);
    if (instance)
        return instance;
    instance = new Synchronizer();
    return instance;
}
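
/* Usage sketch (hypothetical caller; the instance is created lazily, shared,
   and never deleted):
       Synchronizer *sync = Synchronizer::get();
       sync->newJournalEntry(prefix, key, size);
   The second 'if (instance)' check above keeps the fast path lock-free once
   the instance exists. */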

Synchronizer::Synchronizer() : maxUploads(0)
{
    Config *config = Config::get();
    logger = SMLogging::get();
    cache = Cache::get();
    replicator = Replicator::get();
    ioc = IOCoordinator::get();
    cs = CloudStorage::get();

    numBytesRead = numBytesWritten = numBytesUploaded = numBytesDownloaded = mergeDiff =
        flushesTriggeredBySize = flushesTriggeredByTimer = journalsMerged =
        objectsSyncedWithNoJournal = bytesReadBySync = bytesReadBySyncWithJournal = 0;

    journalPath = cache->getJournalPath();
    cachePath = cache->getCachePath();
    threadPool.reset(new ThreadPool());
    configListener();
    config->addConfigListener(this);
    die = false;
    journalSizeThreshold = cache->getMaxCacheSize() / 2;
    blockNewJobs = false;
    syncThread = boost::thread([this] () { this->periodicSync(); });
}

Synchronizer::~Synchronizer()
{
    /* should this wait until all pending work is done,
        or save the list it's working on.....
        For milestone 2, this will do the safe thing and finish working first.
        Later we can get fancy. */
    Config::get()->removeConfigListener(this);
    forceFlush();
    die = true;
    syncThread.join();
    threadPool.reset();
}

enum OpFlags
{
    NOOP = 0,
    JOURNAL = 0x1,
    DELETE = 0x2,
    NEW_OBJECT = 0x4,
};
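
/* The flags combine bitwise: an object that is written to and then deleted
   before its upload runs carries (JOURNAL | DELETE), and process() tests
   DELETE first, so the pointless merge & upload is skipped. */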

/* XXXPAT.  Need to revisit this later.  To keep the multiple-prefix functionality as minimal as possible,
I limited the changes to key string manipulation where possible.  The keys managed here have the prefix they
belong to prepended, so key 12345 in prefix p1 becomes p1/12345.  This is not the most elegant or performant
option, just the least invasive.
*/
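/* A sketch of the resulting key handling (this is what process() and
   synchronize() do to split a prefixed key back apart):
       size_t pos = key.find_first_of('/');     // key == "p1/12345"
       bf::path prefix = key.substr(0, pos);    // "p1"
       string cloudKey = key.substr(pos + 1);   // "12345"
*/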

void Synchronizer::newPrefix(const bf::path &p)
{
    uncommittedJournalSize[p] = 0;
}

void Synchronizer::dropPrefix(const bf::path &p)
{
    syncNow(p);
    boost::unique_lock<boost::mutex> s(mutex);
    uncommittedJournalSize.erase(p);
}

void Synchronizer::_newJournalEntry(const bf::path &prefix, const string &_key, size_t size)
{
    string key = (prefix/_key).string();
    uncommittedJournalSize[prefix] += size;
    auto it = pendingOps.find(key);
    if (it != pendingOps.end())
    {
        it->second->opFlags |= JOURNAL;
        return;
    }
    //makeJob(key);
    pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
}

void Synchronizer::newJournalEntry(const bf::path &prefix, const string &_key, size_t size)
{
    boost::unique_lock<boost::mutex> s(mutex);
    _newJournalEntry(prefix, _key, size);
    if (uncommittedJournalSize[prefix] > journalSizeThreshold)
    {
        uncommittedJournalSize[prefix] = 0;
        s.unlock();
        forceFlush();
    }
}

void Synchronizer::newJournalEntries(const bf::path &prefix, const vector<pair<string, size_t> > &keys)
{
    boost::unique_lock<boost::mutex> s(mutex);
    for (auto &keysize : keys)
        _newJournalEntry(prefix, keysize.first, keysize.second);
    if (uncommittedJournalSize[prefix] > journalSizeThreshold)
    {
        uncommittedJournalSize[prefix] = 0;
        s.unlock();
        forceFlush();
    }
}
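
/* Journal writes accumulate per prefix; once the uncommitted total for a
   prefix passes journalSizeThreshold (half the max cache size, set in the
   constructor), the counter resets and forceFlush() wakes the sync thread
   early rather than waiting for the timer. */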

void Synchronizer::newObjects(const bf::path &prefix, const vector<string> &keys)
{
    boost::unique_lock<boost::mutex> s(mutex);

    for (const string &_key : keys)
    {
        bf::path key(prefix/_key);
        assert(pendingOps.find(key.string()) == pendingOps.end());
        //makeJob(key);
        pendingOps[key.string()] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
    }
}

void Synchronizer::deletedObjects(const bf::path &prefix, const vector<string> &keys)
{
    boost::unique_lock<boost::mutex> s(mutex);

    for (const string &_key : keys)
    {
        bf::path key(prefix/_key);
        auto it = pendingOps.find(key.string());
        if (it != pendingOps.end())
            it->second->opFlags |= DELETE;
        else
            pendingOps[key.string()] = boost::shared_ptr<PendingOps>(new PendingOps(DELETE));
    }
    // would be good to signal to the things in opsInProgress that these were deleted.  That would
    // quiet down the logging somewhat.  How to do that efficiently, and w/o gaps or deadlock...
}

void Synchronizer::flushObject(const bf::path &prefix, const string &_key)
{
    string key = (prefix/_key).string();

    while (blockNewJobs)
        boost::this_thread::sleep_for(boost::chrono::seconds(1));

    boost::unique_lock<boost::mutex> s(mutex);

    // If there is something to do on key, it should be either in pendingOps or opsInProgress.
    // If it is in pendingOps, start the job now.  If it is in progress, wait for it to finish.
    // If there are no jobs to do on key, verify that it exists in cloud storage.
    // The existence check is currently necessary on a startup where the cache is populated but
    // synchronizer isn't.
    bool noExistingJob = false;
    auto it = pendingOps.find(key);
    if (it != pendingOps.end())
    {
        objNames.push_front(key);
        auto nameIt = objNames.begin();
        s.unlock();
        process(nameIt);
        s.lock();
    }
    else
    {
        auto op = opsInProgress.find(key);
        // it's already in progress
        if (op != opsInProgress.end()) {
            boost::shared_ptr<PendingOps> tmp = op->second;
            tmp->wait(&mutex);
        }
        else
        {
            // it's not in either one, trigger existence check
            noExistingJob = true;
        }
    }

    if (!noExistingJob)
        return;

    // check whether this key is in cloud storage
    bool keyExists, journalExists;
    int err;
    do {
        err = cs->exists(_key.c_str(), &keyExists);
        if (err)
        {
            char buf[80];
            logger->log(LOG_CRIT, "Sync::flushObject(): cloud existence check failed, got '%s'", strerror_r(errno, buf, 80));
            sleep(5);
        }
    } while (err);
    journalExists = bf::exists(journalPath/(key + ".journal"));

    if (journalExists)
    {
        logger->log(LOG_DEBUG, "Sync::flushObject(): %s has a journal, "
            "and there is no job for it.  Merging & uploading now.", key.c_str());
        pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
        objNames.push_front(key);
        auto nameIt = objNames.begin();
        s.unlock();
        process(nameIt);
    }
    else if (!keyExists)
    {
        logger->log(LOG_DEBUG, "Sync::flushObject(): %s does not exist in cloud storage, "
            "and there is no job for it.  Uploading it now.", key.c_str());
        pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
        objNames.push_front(key);
        auto nameIt = objNames.begin();
        s.unlock();
        process(nameIt);
    }
}

void Synchronizer::periodicSync()
{
    boost::unique_lock<boost::mutex> lock(mutex);
    while (!die)
    {
        lock.unlock();
        bool wasTriggeredBySize = false;
        try
        {
            boost::this_thread::sleep_for(syncInterval);
        }
        catch (const boost::thread_interrupted &)
        {
            //logger->log(LOG_DEBUG,"Synchronizer Force Flush.");
            wasTriggeredBySize = true;
        }
        lock.lock();
        if (blockNewJobs)
            continue;
        if (!pendingOps.empty())
        {
            if (wasTriggeredBySize)
                ++flushesTriggeredBySize;
            else
                ++flushesTriggeredByTimer;
        }
        //cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
        //    threadPool.currentQueueSize() << endl;
        for (auto &job : pendingOps)
            makeJob(job.first);
        for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
            it->second = 0;
    }
}
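
/* forceFlush() (below) works by interrupting the sleep above; the
   thread_interrupted handler marks the pass as size-triggered, which is why
   threshold-driven flushes are counted under flushesTriggeredBySize in
   printKPIs(). */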

void Synchronizer::syncNow(const bf::path &prefix)
{
    boost::unique_lock<boost::mutex> lock(mutex);

    // This ensures that all pendingOps have been added as jobs, and waits
    // for them to complete, until pendingOps is empty.
    // Eventually this should be redone to only process items of the given prefix.

    blockNewJobs = true;
    while (pendingOps.size() != 0 || opsInProgress.size() != 0)
    {
        for (auto &job : pendingOps)
            makeJob(job.first);
        for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
            it->second = 0;
        lock.unlock();
        while (opsInProgress.size() > 0)
            boost::this_thread::sleep_for(boost::chrono::seconds(1));
        lock.lock();
    }
    blockNewJobs = false;
}

void Synchronizer::syncNow()
{
    boost::unique_lock<boost::mutex> lock(mutex);

    // This ensures that all pendingOps have been added as jobs, and waits
    // for them to complete, until pendingOps is empty.
    // Used by the mcsadmin command suspendDatabaseWrites, leaving S3 storage
    // and the local metadata directories sync'd for snapshot backups.

    blockNewJobs = true;
    while (pendingOps.size() != 0 || opsInProgress.size() != 0)
    {
        for (auto &job : pendingOps)
            makeJob(job.first);
        for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
            it->second = 0;
        lock.unlock();
        while (opsInProgress.size() > 0)
            boost::this_thread::sleep_for(boost::chrono::seconds(1));
        lock.lock();
    }
    blockNewJobs = false;
}


void Synchronizer::forceFlush()
{
    boost::unique_lock<boost::mutex> lock(mutex);

    syncThread.interrupt();
}

void Synchronizer::makeJob(const string &key)
{
    objNames.push_front(key);

    boost::shared_ptr<Job> j(new Job(this, objNames.begin()));
    threadPool->addJob(j);
}
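
/* objNames is a std::list, so the iterator handed to each Job stays valid
   while other keys are pushed or erased around it; process() erases its own
   node when it is done with the key. */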

void Synchronizer::process(list<string>::iterator name)
{
    /*
        check if there is a pendingOp for name
        if yes, start processing it
        if no,
            check if there is an ongoing op and block on it
            if not, return
    */

    boost::unique_lock<boost::mutex> s(mutex);

    string &key = *name;
    auto it = pendingOps.find(key);
    if (it == pendingOps.end())
    {
        auto op = opsInProgress.find(key);
        // it's already in progress
        if (op != opsInProgress.end())
        {
            boost::shared_ptr<PendingOps> tmp = op->second;
            tmp->wait(&mutex);
            objNames.erase(name);
            return;
        }
        else
        {
            // it's not in pending or opsinprogress, nothing to do
            objNames.erase(name);
            return;
        }
    }

    boost::shared_ptr<PendingOps> pending = it->second;
    bool inserted = opsInProgress.insert(*it).second;
    if (!inserted)
    {
        objNames.erase(name);
        return;    // the one in pending will have to wait until the next time to avoid clobbering waiting threads
    }

    // Because of the prefix-ownership changes and the odd set of changes they
    // required, we need to strip the prefix from key here.
    size_t first_slash_pos = key.find_first_of('/');
    string realKey = key.substr(first_slash_pos + 1);
    string sourceFile = MetadataFile::getSourceFromKey(realKey);
    pendingOps.erase(it);
    s.unlock();

    bool success = false;
    int retryCount = 0;
    while (!success)
    {
        assert(!s.owns_lock());
        try {
            // Exceptions should only happen b/c of cloud service errors that can't be retried.
            // This code is intentionally racy to avoid having to grab big locks.
            // In particular, it's possible that by the time synchronize() runs,
            // the file to sync has already been deleted.  When one of these functions
            // encounters a state that doesn't make sense, such as being told to upload a file
            // that doesn't exist, it will return successfully under the assumption that
            // things are working as they should upstream, and a syncDelete() call will be coming
            // shortly.
            if (pending->opFlags & DELETE)
                synchronizeDelete(sourceFile, name);
            else if (pending->opFlags & JOURNAL)
                synchronizeWithJournal(sourceFile, name);
            else if (pending->opFlags & NEW_OBJECT)
                synchronize(sourceFile, name);
            else
                throw logic_error("Synchronizer::process(): got an unknown op flag");
            s.lock();
            pending->notify();
            success = true;
        }
        catch(exception &e) {
            // These errors are often self-resolving, so suppress logging for the first
            // 10 iterations, then escalate to error, then to crit.
            if (++retryCount >= 10)
                logger->log((retryCount < 20 ? LOG_ERR : LOG_CRIT), "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'.  Retrying...", key.c_str(),
                    pending->opFlags, e.what());
            success = false;
            sleep(1);
            continue;
            /*  TODO:  Need to think about this requeue logic again; it is currently
                unreachable because of the continue above.  The potential problem is that
                there may be threads waiting for this job to finish.  If the insert doesn't happen because
                there is already a job in pendingOps for the same file, then the threads waiting on this
                job never get woken, right??  Or, can that never happen for some reason?
            */
            s.lock();
            auto inserted = pendingOps.insert(pair<string, boost::shared_ptr<PendingOps> >(key, pending));
            if (!inserted.second)
                inserted.first->second->opFlags |= pending->opFlags;
            opsInProgress.erase(key);
            makeJob(key);
            objNames.erase(name);
            return;
        }
    }

    opsInProgress.erase(*name);
    objNames.erase(name);
}

void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator &it)
{
    ScopedReadLock s(ioc, sourceFile);

    string key = *it;
    size_t pos = key.find_first_of('/');
    bf::path prefix = key.substr(0, pos);
    string cloudKey = key.substr(pos + 1);
    char buf[80];
    bool exists = false;
    int err;
    bf::path objectPath = cachePath/key;
    MetadataFile md(sourceFile, MetadataFile::no_create_t(), true);

    if (!md.exists())
    {
        logger->log(LOG_DEBUG, "synchronize(): no metadata found for %s.  It must have been deleted.", sourceFile.c_str());
        try
        {
            if (!bf::exists(objectPath))
                return;
            size_t size = bf::file_size(objectPath);
            replicator->remove(objectPath);
            cache->deletedObject(prefix, cloudKey, size);
            cs->deleteObject(cloudKey);
        }
        catch (exception &e)
        {
            logger->log(LOG_DEBUG, "synchronize(): failed to remove orphaned object '%s' from the cache, got %s",
                objectPath.string().c_str(), e.what());
        }
        return;
    }

    metadataObject mdEntry;
    bool entryExists = md.getEntry(MetadataFile::getOffsetFromKey(cloudKey), &mdEntry);
    if (!entryExists || cloudKey != mdEntry.key)
    {
        logger->log(LOG_DEBUG, "synchronize(): %s does not exist in metadata for %s.  This suggests truncation.", key.c_str(), sourceFile.c_str());
        return;
    }

    //assert(key == mdEntry->key);  <-- This could fail b/c of truncation + a write/append before this job runs.

    err = cs->exists(cloudKey, &exists);
    if (err)
        throw runtime_error(string("synchronize(): checking existence of ") + key + ", got " +
            strerror_r(errno, buf, 80));
    if (exists)
        return;

    exists = cache->exists(prefix, cloudKey);
    if (!exists)
    {
        logger->log(LOG_DEBUG, "synchronize(): was told to upload %s but it does not exist locally", key.c_str());
        return;
    }

    err = cs->putObject(objectPath.string(), cloudKey);
    if (err)
        throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));

    numBytesRead += mdEntry.length;
    bytesReadBySync += mdEntry.length;
    numBytesUploaded += mdEntry.length;
    ++objectsSyncedWithNoJournal;
    replicator->remove(objectPath, Replicator::NO_LOCAL);
}

void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
{
    ScopedWriteLock s(ioc, sourceFile);
    string cloudKey = it->substr(it->find('/') + 1);
    cs->deleteObject(cloudKey);
}
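
/* A note on key naming, as relied on below: the cloud key encodes the offset
   of the object within its source file and the object's length, which is why
   MetadataFile::getOffsetFromKey() and getLengthFromKey() can parse them back
   out, and why a merged object whose length changed gets a fresh name from
   getNewKeyFromOldKey().  (The exact key format lives in MetadataFile; this
   is just the contract used here.) */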

void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
{
    ScopedWriteLock s(ioc, sourceFile);

    char buf[80];
    string key = *lit;
    size_t pos = key.find_first_of('/');
    bf::path prefix = key.substr(0, pos);
    string cloudKey = key.substr(pos + 1);

    MetadataFile md(sourceFile, MetadataFile::no_create_t(), true);

    if (!md.exists())
    {
        logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s.  It must have been deleted.", sourceFile.c_str());
        try
        {
            bf::path objectPath = cachePath/key;
            if (bf::exists(objectPath))
            {
                size_t objSize = bf::file_size(objectPath);
                replicator->remove(objectPath);
                cache->deletedObject(prefix, cloudKey, objSize);
                cs->deleteObject(cloudKey);
            }
            bf::path jPath = journalPath/(key + ".journal");
            if (bf::exists(jPath))
            {
                size_t jSize = bf::file_size(jPath);
                replicator->remove(jPath);
                cache->deletedJournal(prefix, jSize);
            }
        }
        catch(exception &e)
        {
            logger->log(LOG_DEBUG, "synchronizeWithJournal(): failed to remove orphaned object '%s' from the cache, got %s",
                (cachePath/key).string().c_str(), e.what());
        }
        return;
    }

    metadataObject mdEntry;
    bool metaExists = md.getEntry(MetadataFile::getOffsetFromKey(cloudKey), &mdEntry);
    if (!metaExists || cloudKey != mdEntry.key)
    {
        logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s does not exist in metadata for %s.  This suggests truncation.", key.c_str(), sourceFile.c_str());
        return;
    }
    //assert(key == mdEntry->key);   <--- I suspect this can happen in a truncate + write situation + a deep sync queue

    bf::path oldCachePath = cachePath / key;
    string journalName = (journalPath/(key + ".journal")).string();

    if (!bf::exists(journalName))
    {
        logger->log(LOG_DEBUG, "synchronizeWithJournal(): no journal file found for %s", key.c_str());

        // sanity check + add'l info.  Test whether the object exists in cloud storage.  If so, complain,
        // and run synchronize() instead.
        bool existsOnCloud;
        int err = cs->exists(cloudKey, &existsOnCloud);
        if (err)
            throw runtime_error(string("Synchronizer: cs->exists() failed: ") + strerror_r(errno, buf, 80));
        if (!existsOnCloud)
        {
            if (cache->exists(prefix, cloudKey))
            {
                logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal and does not exist in the cloud, calling "
                    "synchronize() instead.  Need to explain how this happens.", key.c_str());
                s.unlock();
                synchronize(sourceFile, lit);
            }
            else
                logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal, and does not exist in the cloud or in "
                    "the local cache.  Need to explain how this happens.", key.c_str());
            return;
        }
        else
            logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal, but it does exist in the cloud. "
                "This indicates that an overlapping syncWithJournal() call handled it properly.", key.c_str());

        return;
    }

    int err;
    boost::shared_array<uint8_t> data;
    size_t count = 0, size = mdEntry.length, originalSize = 0;

    bool oldObjIsCached = cache->exists(prefix, cloudKey);

    // get the base object if it is not already cached
    // merge it with its journal file
    if (!oldObjIsCached)
    {
        err = cs->getObject(cloudKey, &data, &size);
        if (err)
        {
            if (errno == ENOENT)
            {
                logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s does not exist in cache nor in cloud storage", key.c_str());
                return;
            }
            throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80));
        }

        numBytesDownloaded += size;
        originalSize += size;

        //TODO!!  This sucks.  Need a way to pass in a larger array to cloud storage, and also have it not
        // do any add'l alloc'ing or copying
        if (size < mdEntry.length)
        {
            boost::shared_array<uint8_t> tmp(new uint8_t[mdEntry.length]());
            memcpy(tmp.get(), data.get(), size);
            memset(&tmp[size], 0, mdEntry.length - size);
            data.swap(tmp);
        }
        size = mdEntry.length;    // reset length.  Using the metadata as a source of truth (truncation), not actual file length.

        size_t _bytesRead = 0;
        err = ioc->mergeJournalInMem(data, size, journalName.c_str(), &_bytesRead);
        if (err)
        {
            if (!bf::exists(journalName))
                logger->log(LOG_DEBUG, "synchronizeWithJournal(): journal %s was deleted mid-operation, check locking",
                    journalName.c_str());
            else
                logger->log(LOG_ERR, "synchronizeWithJournal(): unexpected error merging journal for %s", key.c_str());
            return;
        }
        numBytesRead += _bytesRead;
        bytesReadBySyncWithJournal += _bytesRead;
        originalSize += _bytesRead;
    }
    else
    {
        size_t _bytesRead = 0;
        data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, size, &_bytesRead);
        if (!data)
        {
            if (!bf::exists(journalName))
                logger->log(LOG_DEBUG, "synchronizeWithJournal(): journal %s was deleted mid-operation, check locking",
                    journalName.c_str());
            else
                logger->log(LOG_ERR, "synchronizeWithJournal(): unexpected error merging journal for %s", key.c_str());
            return;
        }
        numBytesRead += _bytesRead;
        bytesReadBySyncWithJournal += _bytesRead;
        originalSize = _bytesRead;
    }

    // originalSize here should be == object size + journal size

    // get a new key for the resolved version & upload it
    string newCloudKey = MetadataFile::getNewKeyFromOldKey(cloudKey, size);
    string newKey = (prefix/newCloudKey).string();
    err = cs->putObject(data, size, newCloudKey);
    if (err)
    {
        // try to delete it in cloud storage... unlikely it is there in the first place, and if it is
        // this probably won't work
        int l_errno = errno;
        cs->deleteObject(newCloudKey);
        throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(l_errno, buf, 80));
    }
    numBytesUploaded += size;

    // if the object was cached...
    //   write the new data to disk,
    //   tell the cache about the rename
    //   rename the file in any pending ops in Synchronizer

    if (oldObjIsCached)
    {
        // Is this the only thing outside of Replicator that writes files?
        // If so move this write loop to Replicator.
        bf::path newCachePath = cachePath / newKey;
        int newFD = ::open(newCachePath.string().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
        if (newFD < 0)
            throw runtime_error(string("Synchronizer: Failed to open a new object in local storage!  Got ")
              + strerror_r(errno, buf, 80));
        ScopedCloser scloser(newFD);

        while (count < size)
        {
            err = ::write(newFD, &data[count], size - count);
            if (err < 0)
            {
                ::unlink(newCachePath.string().c_str());
                throw runtime_error(string("Synchronizer: Failed to write to a new object in local storage!  Got ")
                  + strerror_r(errno, buf, 80));
            }
            count += err;
        }
        numBytesWritten += size;

        size_t oldSize = bf::file_size(oldCachePath);

        cache->rename(prefix, cloudKey, newCloudKey, size - oldSize);
        replicator->remove(oldCachePath);

        // This condition is probably irrelevant for correct functioning now,
        // but it should be very rare so what the hell.
        if (oldSize != MetadataFile::getLengthFromKey(cloudKey))
        {
            ostringstream oss;
            oss << "Synchronizer::synchronizeWithJournal(): detected a mismatch between file size and " <<
                "length stored in the object name. object name = " << cloudKey << " length-in-name = " <<
                MetadataFile::getLengthFromKey(cloudKey) << " real-length = " << oldSize;
            logger->log(LOG_WARNING, oss.str().c_str());
        }
    }

    mergeDiff += size - originalSize;
    ++journalsMerged;
    // update the metadata for the source file

    md.updateEntry(MetadataFile::getOffsetFromKey(cloudKey), newCloudKey, size);
    replicator->updateMetadata(md);

    rename(key, newKey);

    // delete the old object & journal file
    cache->deletedJournal(prefix, bf::file_size(journalName));
    replicator->remove(journalName);
    cs->deleteObject(cloudKey);
}

void Synchronizer::rename(const string &oldKey, const string &newKey)
{
    boost::unique_lock<boost::mutex> s(mutex);

    auto it = pendingOps.find(oldKey);
    if (it != pendingOps.end())
    {
        pendingOps[newKey] = it->second;
        pendingOps.erase(it);
    }
    it = opsInProgress.find(oldKey);
    if (it != opsInProgress.end())
    {
        opsInProgress[newKey] = it->second;
        opsInProgress.erase(it);
    }

    for (auto &name: objNames)
        if (name == oldKey)
            name = newKey;
}

bf::path Synchronizer::getJournalPath()
{
    return journalPath;
}

bf::path Synchronizer::getCachePath()
{
    return cachePath;
}

void Synchronizer::printKPIs() const
{
    cout << "Synchronizer" << endl;
    cout << "\tnumBytesRead: " << numBytesRead << endl;
    cout << "\tbytesReadBySync: " << bytesReadBySync << endl;
    cout << "\tbytesReadBySyncWithJournal: " << bytesReadBySyncWithJournal << endl;
    cout << "\tnumBytesWritten: " << numBytesWritten << endl;
    cout << "\tnumBytesUploaded: " << numBytesUploaded << endl;
    cout << "\tnumBytesDownloaded: " << numBytesDownloaded << endl;
    cout << "\tmergeDiff: " << mergeDiff << endl;
    cout << "\tflushesTriggeredBySize: " << flushesTriggeredBySize << endl;
    cout << "\tflushesTriggeredByTimer: " << flushesTriggeredByTimer << endl;
    cout << "\tjournalsMerged: " << journalsMerged << endl;
    cout << "\tobjectsSyncedWithNoJournal: " << objectsSyncedWithNoJournal << endl;
}

/* The helper objects & fcns */

Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), waiters(0), finished(false)
{
}

Synchronizer::PendingOps::~PendingOps()
{
    assert(waiters == 0);
}

void Synchronizer::PendingOps::notify()
{
    finished = true;
    condvar.notify_all();
}

void Synchronizer::PendingOps::wait(boost::mutex *m)
{
    while (!finished)
    {
        waiters++;
        condvar.wait(*m);
        waiters--;
    }
}
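
/* wait() must be entered with *m held; condvar.wait() releases the mutex
   while blocked and re-acquires it before returning, so 'waiters' and
   'finished' are only touched under the lock.  The destructor's
   assert(waiters == 0) checks that no thread is still blocked here when the
   op is torn down. */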

Synchronizer::Job::Job(Synchronizer *s, std::list<std::string>::iterator i) : sync(s), it(i)
{ }

void Synchronizer::Job::operator()()
{
    sync->process(it);
}

void Synchronizer::configListener()
{
    // Uploader threads
    string stmp = Config::get()->getValue("ObjectStorage", "max_concurrent_uploads");
    if (maxUploads == 0)
        maxUploads = 20;
    if (stmp.empty())
    {
        logger->log(LOG_CRIT, "max_concurrent_uploads is not set. Using current value = %u", maxUploads);
        return;
    }
    try
    {
        uint newValue = stoul(stmp);
        if (newValue != maxUploads)
        {
            maxUploads = newValue;
            threadPool->setMaxThreads(maxUploads);
            logger->log(LOG_INFO, "max_concurrent_uploads = %u", maxUploads);
        }
    }
    catch (invalid_argument &)
    {
        logger->log(LOG_CRIT, "max_concurrent_uploads is not a number. Using current value = %u", maxUploads);
    }
}
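
/* Sketch of the corresponding config entry (section and key names as read
   above; the file location depends on the StorageManager deployment):
       [ObjectStorage]
       max_concurrent_uploads = 20
*/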
}