1 /* Copyright (C) 2019 MariaDB Corporation
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18
#include "Synchronizer.h"
#include "Cache.h"
#include "IOCoordinator.h"
#include "MetadataFile.h"
#include "Utilities.h"
#include <boost/thread/mutex.hpp>

#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>

#include <cerrno>
#include <stdexcept>
29
30 using namespace std;
31
32 namespace
33 {
34 storagemanager::Synchronizer *instance = NULL;
35 boost::mutex inst_mutex;
36 }
37
38 namespace bf = boost::filesystem;
39 namespace storagemanager
40 {
41
get()42 Synchronizer * Synchronizer::get()
43 {
44 if (instance)
45 return instance;
46 boost::unique_lock<boost::mutex> lock(inst_mutex);
47 if (instance)
48 return instance;
49 instance = new Synchronizer();
50 return instance;
51 }
52
Synchronizer()53 Synchronizer::Synchronizer() : maxUploads(0)
54 {
55 Config *config = Config::get();
56 logger = SMLogging::get();
57 cache = Cache::get();
58 replicator = Replicator::get();
59 ioc = IOCoordinator::get();
60 cs = CloudStorage::get();
61
62 numBytesRead = numBytesWritten = numBytesUploaded = numBytesDownloaded = mergeDiff =
63 flushesTriggeredBySize = flushesTriggeredByTimer = journalsMerged =
64 objectsSyncedWithNoJournal = bytesReadBySync = bytesReadBySyncWithJournal = 0;
65
66 journalPath = cache->getJournalPath();
67 cachePath = cache->getCachePath();
68 threadPool.reset(new ThreadPool());
69 configListener();
70 config->addConfigListener(this);
71 die = false;
72 journalSizeThreshold = cache->getMaxCacheSize() / 2;
73 blockNewJobs = false;
74 syncThread = boost::thread([this] () { this->periodicSync(); });
75 }
76
~Synchronizer()77 Synchronizer::~Synchronizer()
78 {
79 /* should this wait until all pending work is done,
80 or save the list it's working on.....
81 For milestone 2, this will do the safe thing and finish working first.
82 Later we can get fancy. */
83 Config::get()->removeConfigListener(this);
84 forceFlush();
85 die = true;
86 syncThread.join();
87 threadPool.reset();
88 }
89
// Bit flags stored in PendingOps::opFlags. Several ops on the same key are OR'ed together while the
// key waits in pendingOps; process() gives DELETE precedence over JOURNAL over NEW_OBJECT.
enum OpFlags
{
    NOOP       = 0,
    JOURNAL    = 1 << 0,
    DELETE     = 1 << 1,
    NEW_OBJECT = 1 << 2
};
97
/* XXXPAT. Need to revisit this later. To keep the multiple-prefix changes as small as possible,
   I limited them to key string manipulation wherever I could: every key this class manages has the
   prefix it belongs to prepended, so key 12345 in prefix p1 becomes p1/12345. This is not the most
   elegant or performant option, just the least invasive.
*/
103
newPrefix(const bf::path & p)104 void Synchronizer::newPrefix(const bf::path &p)
105 {
106 uncommittedJournalSize[p] = 0;
107 }
108
dropPrefix(const bf::path & p)109 void Synchronizer::dropPrefix(const bf::path &p)
110 {
111 syncNow(p);
112 boost::unique_lock<boost::mutex> s(mutex);
113 uncommittedJournalSize.erase(p);
114 }
115
_newJournalEntry(const bf::path & prefix,const string & _key,size_t size)116 void Synchronizer::_newJournalEntry(const bf::path &prefix, const string &_key, size_t size)
117 {
118 string key = (prefix/_key).string();
119 uncommittedJournalSize[prefix] += size;
120 auto it = pendingOps.find(key);
121 if (it != pendingOps.end())
122 {
123 it->second->opFlags |= JOURNAL;
124 return;
125 }
126 //makeJob(key);
127 pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
128 }
129
newJournalEntry(const bf::path & prefix,const string & _key,size_t size)130 void Synchronizer::newJournalEntry(const bf::path &prefix, const string &_key, size_t size)
131 {
132 boost::unique_lock<boost::mutex> s(mutex);
133 _newJournalEntry(prefix, _key, size);
134 if (uncommittedJournalSize[prefix] > journalSizeThreshold)
135 {
136 uncommittedJournalSize[prefix] = 0;
137 s.unlock();
138 forceFlush();
139 }
140 }
141
newJournalEntries(const bf::path & prefix,const vector<pair<string,size_t>> & keys)142 void Synchronizer::newJournalEntries(const bf::path &prefix, const vector<pair<string, size_t> > &keys)
143 {
144 boost::unique_lock<boost::mutex> s(mutex);
145 for (auto &keysize : keys)
146 _newJournalEntry(prefix, keysize.first, keysize.second);
147 if (uncommittedJournalSize[prefix] > journalSizeThreshold)
148 {
149 uncommittedJournalSize[prefix] = 0;
150 s.unlock();
151 forceFlush();
152 }
153 }
154
newObjects(const bf::path & prefix,const vector<string> & keys)155 void Synchronizer::newObjects(const bf::path &prefix, const vector<string> &keys)
156 {
157 boost::unique_lock<boost::mutex> s(mutex);
158
159 for (const string &_key : keys)
160 {
161 bf::path key(prefix/_key);
162 assert(pendingOps.find(key.string()) == pendingOps.end());
163 //makeJob(key);
164 pendingOps[key.string()] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
165 }
166 }
167
deletedObjects(const bf::path & prefix,const vector<string> & keys)168 void Synchronizer::deletedObjects(const bf::path &prefix, const vector<string> &keys)
169 {
170 boost::unique_lock<boost::mutex> s(mutex);
171
172 for (const string &_key : keys)
173 {
174 bf::path key(prefix/_key);
175 auto it = pendingOps.find(key.string());
176 if (it != pendingOps.end())
177 it->second->opFlags |= DELETE;
178 else
179 pendingOps[key.string()] = boost::shared_ptr<PendingOps>(new PendingOps(DELETE));
180 }
181 // would be good to signal to the things in opsInProgress that these were deleted. That would
182 // quiet down the logging somewhat. How to do that efficiently, and w/o gaps or deadlock...
183 }
184
flushObject(const bf::path & prefix,const string & _key)185 void Synchronizer::flushObject(const bf::path &prefix, const string &_key)
186 {
187 string key = (prefix/_key).string();
188
189 while (blockNewJobs)
190 boost::this_thread::sleep_for(boost::chrono::seconds(1));
191
192 boost::unique_lock<boost::mutex> s(mutex);
193
194 // if there is something to do on key, it should be either in pendingOps or opsInProgress
195 // if it is is pending ops, start the job now. If it is in progress, wait for it to finish.
196 // If there are no jobs to do on key, it will verify it exists in cloud storage.
197 // The existence check is currently necessary on a startup where the cache is populated but
198 // synchronizer isn't.
199
200 bool noExistingJob = false;
201 auto it = pendingOps.find(key);
202 if (it != pendingOps.end())
203 {
204 objNames.push_front(key);
205 auto nameIt = objNames.begin();
206 s.unlock();
207 process(nameIt);
208 s.lock();
209 }
210 else
211 {
212 auto op = opsInProgress.find(key);
213 // it's already in progress
214 if (op != opsInProgress.end()) {
215 boost::shared_ptr<PendingOps> tmp = op->second;
216 tmp->wait(&mutex);
217 }
218 else
219 {
220 // it's not in either one, trigger existence check
221 noExistingJob = true;
222 }
223 }
224
225 if (!noExistingJob)
226 return;
227
228 // check whether this key is in cloud storage
229 bool keyExists, journalExists;
230 int err;
231 do {
232 err = cs->exists(_key.c_str(), &keyExists);
233 if (err)
234 {
235 char buf[80];
236 logger->log(LOG_CRIT, "Sync::flushObject(): cloud existence check failed, got '%s'", strerror_r(errno, buf, 80));
237 sleep(5);
238 }
239 } while (err);
240 journalExists = bf::exists(journalPath/(key + ".journal"));
241
242 if (journalExists)
243 {
244 logger->log(LOG_DEBUG, "Sync::flushObject(): %s has a journal, "
245 "and there is no job for it. Merging & uploading now.", key.c_str());
246 pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
247 objNames.push_front(key);
248 auto nameIt = objNames.begin();
249 s.unlock();
250 process(nameIt);
251 }
252 else if (!keyExists)
253 {
254 logger->log(LOG_DEBUG, "Sync::flushObject(): %s does not exist in cloud storage, "
255 "and there is no job for it. Uploading it now.", key.c_str());
256 pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
257 objNames.push_front(key);
258 auto nameIt = objNames.begin();
259 s.unlock();
260 process(nameIt);
261 }
262 }
263
periodicSync()264 void Synchronizer::periodicSync()
265 {
266 boost::unique_lock<boost::mutex> lock(mutex);
267 while (!die)
268 {
269 lock.unlock();
270 bool wasTriggeredBySize = false;
271 try
272 {
273 boost::this_thread::sleep_for(syncInterval);
274 }
275 catch (const boost::thread_interrupted)
276 {
277 //logger->log(LOG_DEBUG,"Synchronizer Force Flush.");
278 wasTriggeredBySize = true;
279 }
280 lock.lock();
281 if (blockNewJobs)
282 continue;
283 if (!pendingOps.empty())
284 {
285 if (wasTriggeredBySize)
286 ++flushesTriggeredBySize;
287 else
288 ++flushesTriggeredByTimer;
289 }
290 //cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
291 // threadPool.currentQueueSize() << endl;
292 for (auto &job : pendingOps)
293 makeJob(job.first);
294 for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
295 it->second = 0;
296 }
297 }
298
syncNow(const bf::path & prefix)299 void Synchronizer::syncNow(const bf::path &prefix)
300 {
301 boost::unique_lock<boost::mutex> lock(mutex);
302
303 // This should ensure that all pendingOps have been added as jobs
304 // and waits for them to complete. until pendingOps is empty.
305 // this should be redone to only remove items of given prefix eventually
306
307 blockNewJobs = true;
308 while (pendingOps.size() != 0 || opsInProgress.size() != 0)
309 {
310 for (auto &job : pendingOps)
311 makeJob(job.first);
312 for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
313 it->second = 0;
314 lock.unlock();
315 while (opsInProgress.size() > 0)
316 boost::this_thread::sleep_for(boost::chrono::seconds(1));
317 lock.lock();
318 }
319 blockNewJobs = false;
320 }
321
syncNow()322 void Synchronizer::syncNow()
323 {
324 boost::unique_lock<boost::mutex> lock(mutex);
325
326 // This should ensure that all pendingOps have been added as jobs
327 // and waits for them to complete. until pendingOps is empty.
328 // Used by the mcsadmin command suspendDatabaseWrites.
329 // Leaving S3 storage and local metadata directories sync'd for snapshot backups.
330
331 blockNewJobs = true;
332 while (pendingOps.size() != 0 || opsInProgress.size() != 0)
333 {
334 for (auto &job : pendingOps)
335 makeJob(job.first);
336 for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
337 it->second = 0;
338 lock.unlock();
339 while (opsInProgress.size() > 0)
340 boost::this_thread::sleep_for(boost::chrono::seconds(1));
341 lock.lock();
342 }
343 blockNewJobs = false;
344 }
345
346
forceFlush()347 void Synchronizer::forceFlush()
348 {
349 boost::unique_lock<boost::mutex> lock(mutex);
350
351 syncThread.interrupt();
352 }
353
makeJob(const string & key)354 void Synchronizer::makeJob(const string &key)
355 {
356 objNames.push_front(key);
357
358 boost::shared_ptr<Job> j(new Job(this, objNames.begin()));
359 threadPool->addJob(j);
360 }
361
process(list<string>::iterator name)362 void Synchronizer::process(list<string>::iterator name)
363 {
364 /*
365 check if there is a pendingOp for name
366 if yes, start processing it
367 if no,
368 check if there is an ongoing op and block on it
369 if not, return
370 */
371
372 boost::unique_lock<boost::mutex> s(mutex);
373
374 string &key = *name;
375 auto it = pendingOps.find(key);
376 if (it == pendingOps.end())
377 {
378 auto op = opsInProgress.find(key);
379 // it's already in progress
380 if (op != opsInProgress.end())
381 {
382 boost::shared_ptr<PendingOps> tmp = op->second;
383 tmp->wait(&mutex);
384 objNames.erase(name);
385 return;
386 }
387 else
388 {
389 // it's not in pending or opsinprogress, nothing to do
390 objNames.erase(name);
391 return;
392 }
393 }
394
395 boost::shared_ptr<PendingOps> pending = it->second;
396 bool inserted = opsInProgress.insert(*it).second;
397 if (!inserted)
398 {
399 objNames.erase(name);
400 return; // the one in pending will have to wait until the next time to avoid clobbering waiting threads
401 }
402
403 // Because of the ownership thing and the odd set of changes that it required,
404 // we need to strip the prefix from key.
405 size_t first_slash_pos = key.find_first_of('/');
406 string realKey = key.substr(first_slash_pos + 1);
407 string sourceFile = MetadataFile::getSourceFromKey(realKey);
408 pendingOps.erase(it);
409 s.unlock();
410
411 bool success = false;
412 int retryCount = 0;
413 while (!success)
414 {
415 assert(!s.owns_lock());
416 try {
417 // Exceptions should only happen b/c of cloud service errors that can't be retried.
418 // This code is intentionally racy to avoid having to grab big locks.
419 // In particular, it's possible that by the time synchronize() runs,
420 // the file to sync has already been deleted. When one of these functions
421 // encounters a state that doesn't make sense, such as being told to upload a file
422 // that doesn't exist, it will return successfully under the assumption that
423 // things are working as they should upstream, and a syncDelete() call will be coming
424 // shortly.
425 if (pending->opFlags & DELETE)
426 synchronizeDelete(sourceFile, name);
427 else if (pending->opFlags & JOURNAL)
428 synchronizeWithJournal(sourceFile, name);
429 else if (pending->opFlags & NEW_OBJECT)
430 synchronize(sourceFile, name);
431 else
432 throw logic_error("Synchronizer::process(): got an unknown op flag");
433 s.lock();
434 pending->notify();
435 success = true;
436 }
437 catch(exception &e) {
438 // these are often self-resolving, so we will suppress logging it for 10 iterations, then escalate
439 // to error, then to crit
440 //if (++retryCount >= 10)
441 logger->log((retryCount < 20 ? LOG_ERR : LOG_CRIT), "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(),
442 pending->opFlags, e.what());
443 success = false;
444 sleep(1);
445 continue;
446 /* TODO: Need to think this about this requeue logic again. The potential problem is that
447 there may be threads waiting for this job to finish. If the insert doesn't happen because
448 there is already a job in pendingOps for the same file, then the threads waiting on this
449 job never get woken, right?? Or, can that never happen for some reason?
450 */
451 s.lock();
452 auto inserted = pendingOps.insert(pair<string, boost::shared_ptr<PendingOps> >(key, pending));
453 if (!inserted.second)
454 inserted.first->second->opFlags |= pending->opFlags;
455 opsInProgress.erase(key);
456 makeJob(key);
457 objNames.erase(name);
458 return;
459 }
460 }
461
462 opsInProgress.erase(*name);
463 objNames.erase(name);
464 }
465
synchronize(const string & sourceFile,list<string>::iterator & it)466 void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator &it)
467 {
468 ScopedReadLock s(ioc, sourceFile);
469
470 string key = *it;
471 size_t pos = key.find_first_of('/');
472 bf::path prefix = key.substr(0, pos);
473 string cloudKey = key.substr(pos + 1);
474 char buf[80];
475 bool exists = false;
476 int err;
477 bf::path objectPath = cachePath/key;
478 MetadataFile md(sourceFile, MetadataFile::no_create_t(),true);
479
480 if (!md.exists())
481 {
482 logger->log(LOG_DEBUG, "synchronize(): no metadata found for %s. It must have been deleted.", sourceFile.c_str());
483 try
484 {
485 if (!bf::exists(objectPath))
486 return;
487 size_t size = bf::file_size(objectPath);
488 replicator->remove(objectPath);
489 cache->deletedObject(prefix, cloudKey, size);
490 cs->deleteObject(cloudKey);
491 }
492 catch (exception &e)
493 {
494 logger->log(LOG_DEBUG, "synchronize(): failed to remove orphaned object '%s' from the cache, got %s",
495 objectPath.string().c_str(), e.what());
496 }
497 return;
498 }
499
500 metadataObject mdEntry;
501 bool entryExists = md.getEntry(MetadataFile::getOffsetFromKey(cloudKey), &mdEntry);
502 if (!entryExists || cloudKey != mdEntry.key)
503 {
504 logger->log(LOG_DEBUG, "synchronize(): %s does not exist in metadata for %s. This suggests truncation.", key.c_str(), sourceFile.c_str());
505 return;
506 }
507
508 //assert(key == mdEntry->key); <-- This could fail b/c of truncation + a write/append before this job runs.
509
510 err = cs->exists(cloudKey, &exists);
511 if (err)
512 throw runtime_error(string("synchronize(): checking existence of ") + key + ", got " +
513 strerror_r(errno, buf, 80));
514 if (exists)
515 return;
516
517 exists = cache->exists(prefix, cloudKey);
518 if (!exists)
519 {
520 logger->log(LOG_DEBUG, "synchronize(): was told to upload %s but it does not exist locally", key.c_str());
521 return;
522 }
523
524 err = cs->putObject(objectPath.string(), cloudKey);
525 if (err)
526 throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));
527
528 numBytesRead += mdEntry.length;
529 bytesReadBySync += mdEntry.length;
530 numBytesUploaded += mdEntry.length;
531 ++objectsSyncedWithNoJournal;
532 replicator->remove(objectPath, Replicator::NO_LOCAL);
533 }
534
synchronizeDelete(const string & sourceFile,list<string>::iterator & it)535 void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
536 {
537 ScopedWriteLock s(ioc, sourceFile);
538 string cloudKey = it->substr(it->find('/') + 1);
539 cs->deleteObject(cloudKey);
540 }
541
synchronizeWithJournal(const string & sourceFile,list<string>::iterator & lit)542 void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
543 {
544 ScopedWriteLock s(ioc, sourceFile);
545
546 char buf[80];
547 string key = *lit;
548 size_t pos = key.find_first_of('/');
549 bf::path prefix = key.substr(0, pos);
550 string cloudKey = key.substr(pos + 1);
551
552 MetadataFile md(sourceFile, MetadataFile::no_create_t(),true);
553
554 if (!md.exists())
555 {
556 logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s. It must have been deleted.", sourceFile.c_str());
557 try
558 {
559 bf::path objectPath = cachePath/key;
560 if (bf::exists(objectPath))
561 {
562 size_t objSize = bf::file_size(objectPath);
563 replicator->remove(objectPath);
564 cache->deletedObject(prefix, cloudKey, objSize);
565 cs->deleteObject(cloudKey);
566 }
567 bf::path jPath = journalPath/(key + ".journal");
568 if (bf::exists(jPath))
569 {
570 size_t jSize = bf::file_size(jPath);
571 replicator->remove(jPath);
572 cache->deletedJournal(prefix, jSize);
573 }
574 }
575 catch(exception &e)
576 {
577 logger->log(LOG_DEBUG, "synchronizeWithJournal(): failed to remove orphaned object '%s' from the cache, got %s",
578 (cachePath/key).string().c_str(), e.what());
579 }
580 return;
581 }
582
583 metadataObject mdEntry;
584 bool metaExists = md.getEntry(MetadataFile::getOffsetFromKey(cloudKey), &mdEntry);
585 if (!metaExists || cloudKey != mdEntry.key)
586 {
587 logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s does not exist in metadata for %s. This suggests truncation.", key.c_str(), sourceFile.c_str());
588 return;
589 }
590 //assert(key == mdEntry->key); <--- I suspect this can happen in a truncate + write situation + a deep sync queue
591
592 bf::path oldCachePath = cachePath / key;
593 string journalName = (journalPath/(key + ".journal")).string();
594
595 if (!bf::exists(journalName))
596 {
597 logger->log(LOG_DEBUG, "synchronizeWithJournal(): no journal file found for %s", key.c_str());
598
599 // sanity check + add'l info. Test whether the object exists in cloud storage. If so, complain,
600 // and run synchronize() instead.
601 bool existsOnCloud;
602 int err = cs->exists(cloudKey, &existsOnCloud);
603 if (err)
604 throw runtime_error(string("Synchronizer: cs->exists() failed: ") + strerror_r(errno, buf, 80));
605 if (!existsOnCloud)
606 {
607 if (cache->exists(prefix, cloudKey))
608 {
609 logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal and does not exist in the cloud, calling "
610 "synchronize() instead. Need to explain how this happens.", key.c_str());
611 s.unlock();
612 synchronize(sourceFile, lit);
613 }
614 else
615 logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal, and does not exist in the cloud or in "
616 " the local cache. Need to explain how this happens.", key.c_str());
617 return;
618 }
619 else
620 logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal, but it does exist in the cloud. "
621 " This indicates that an overlapping syncWithJournal() call handled it properly.", key.c_str());
622
623 return;
624 }
625
626 int err;
627 boost::shared_array<uint8_t> data;
628 size_t count = 0, size = mdEntry.length, originalSize = 0;
629
630 bool oldObjIsCached = cache->exists(prefix, cloudKey);
631
632 // get the base object if it is not already cached
633 // merge it with its journal file
634 if (!oldObjIsCached)
635 {
636 err = cs->getObject(cloudKey, &data, &size);
637 if (err)
638 {
639 if (errno == ENOENT)
640 {
641 logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s does not exist in cache nor in cloud storage", key.c_str());
642 return;
643 }
644 throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80));
645 }
646
647 numBytesDownloaded += size;
648 originalSize += size;
649
650 //TODO!! This sucks. Need a way to pass in a larger array to cloud storage, and also have it not
651 // do any add'l alloc'ing or copying
652 if (size < mdEntry.length)
653 {
654 boost::shared_array<uint8_t> tmp(new uint8_t[mdEntry.length]());
655 memcpy(tmp.get(), data.get(), size);
656 memset(&tmp[size], 0, mdEntry.length - size);
657 data.swap(tmp);
658 }
659 size = mdEntry.length; // reset length. Using the metadata as a source of truth (truncation), not actual file length.
660
661 size_t _bytesRead = 0;
662 err = ioc->mergeJournalInMem(data, size, journalName.c_str(), &_bytesRead);
663 if (err)
664 {
665 if (!bf::exists(journalName))
666 logger->log(LOG_DEBUG, "synchronizeWithJournal(): journal %s was deleted mid-operation, check locking",
667 journalName.c_str());
668 else
669 logger->log(LOG_ERR, "synchronizeWithJournal(): unexpected error merging journal for %s", key.c_str());
670 return;
671 }
672 numBytesRead += _bytesRead;
673 bytesReadBySyncWithJournal += _bytesRead;
674 originalSize += _bytesRead;
675 }
676 else
677 {
678 size_t _bytesRead = 0;
679 data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, size, &_bytesRead);
680 if (!data)
681 {
682 if (!bf::exists(journalName))
683 logger->log(LOG_DEBUG, "synchronizeWithJournal(): journal %s was deleted mid-operation, check locking",
684 journalName.c_str());
685 else
686 logger->log(LOG_ERR, "synchronizeWithJournal(): unexpected error merging journal for %s", key.c_str());
687 return;
688 }
689 numBytesRead += _bytesRead;
690 bytesReadBySyncWithJournal += _bytesRead;
691 originalSize = _bytesRead;
692 }
693
694 // original size here should be == objectsize + journalsize
695
696 // get a new key for the resolved version & upload it
697 string newCloudKey = MetadataFile::getNewKeyFromOldKey(cloudKey, size);
698 string newKey = (prefix/newCloudKey).string();
699 err = cs->putObject(data, size, newCloudKey);
700 if (err)
701 {
702 // try to delete it in cloud storage... unlikely it is there in the first place, and if it is
703 // this probably won't work
704 int l_errno = errno;
705 cs->deleteObject(newCloudKey);
706 throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(l_errno, buf, 80));
707 }
708 numBytesUploaded += size;
709
710 // if the object was cached...
711 // write the new data to disk,
712 // tell the cache about the rename
713 // rename the file in any pending ops in Synchronizer
714
715 if (oldObjIsCached)
716 {
717 // Is this the only thing outside of Replicator that writes files?
718 // If so move this write loop to Replicator.
719 bf::path newCachePath = cachePath / newKey;
720 int newFD = ::open(newCachePath.string().c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
721 if (newFD < 0)
722 throw runtime_error(string("Synchronizer: Failed to open a new object in local storage! Got ")
723 + strerror_r(errno, buf, 80));
724 ScopedCloser scloser(newFD);
725
726 while (count < size)
727 {
728 err = ::write(newFD, &data[count], size - count);
729 if (err < 0)
730 {
731 ::unlink(newCachePath.string().c_str());
732 throw runtime_error(string("Synchronizer: Failed to write to a new object in local storage! Got ")
733 + strerror_r(errno, buf, 80));
734 }
735 count += err;
736 }
737 numBytesWritten += size;
738
739 size_t oldSize = bf::file_size(oldCachePath);
740
741 cache->rename(prefix, cloudKey, newCloudKey, size - oldSize);
742 replicator->remove(oldCachePath);
743
744 // This condition is probably irrelevant for correct functioning now,
745 // but it should be very rare so what the hell.
746 if (oldSize != MetadataFile::getLengthFromKey(cloudKey))
747 {
748 ostringstream oss;
749 oss << "Synchronizer::synchronizeWithJournal(): detected a mismatch between file size and " <<
750 "length stored in the object name. object name = " << cloudKey << " length-in-name = " <<
751 MetadataFile::getLengthFromKey(cloudKey) << " real-length = " << oldSize;
752 logger->log(LOG_WARNING, oss.str().c_str());
753 }
754 }
755
756 mergeDiff += size - originalSize;
757 ++journalsMerged;
758 // update the metadata for the source file
759
760 md.updateEntry(MetadataFile::getOffsetFromKey(cloudKey), newCloudKey, size);
761 replicator->updateMetadata(md);
762
763 rename(key, newKey);
764
765 // delete the old object & journal file
766 cache->deletedJournal(prefix, bf::file_size(journalName));
767 replicator->remove(journalName);
768 cs->deleteObject(cloudKey);
769 }
770
rename(const string & oldKey,const string & newKey)771 void Synchronizer::rename(const string &oldKey, const string &newKey)
772 {
773 boost::unique_lock<boost::mutex> s(mutex);
774
775 auto it = pendingOps.find(oldKey);
776 if (it != pendingOps.end())
777 {
778 pendingOps[newKey] = it->second;
779 pendingOps.erase(it);
780 }
781 it = opsInProgress.find(oldKey);
782 if (it != opsInProgress.end())
783 {
784 opsInProgress[newKey] = it->second;
785 opsInProgress.erase(it);
786 }
787
788 for (auto &name: objNames)
789 if (name == oldKey)
790 name = newKey;
791 }
792
getJournalPath()793 bf::path Synchronizer::getJournalPath()
794 {
795 return journalPath;
796 }
797
getCachePath()798 bf::path Synchronizer::getCachePath()
799 {
800 return cachePath;
801 }
802
printKPIs() const803 void Synchronizer::printKPIs() const
804 {
805 cout << "Synchronizer" << endl;
806 cout << "\tnumBytesRead: " << numBytesRead << endl;
807 cout << "\tbytesReadBySync: " << bytesReadBySync << endl;
808 cout << "\tbytesReadBySyncWithJournal: " << bytesReadBySyncWithJournal << endl;
809 cout << "\tnumBytesWritten: " << numBytesWritten << endl;
810 cout << "\tnumBytesUploaded: " << numBytesUploaded << endl;
811 cout << "\tnumBytesDownloaded: " << numBytesDownloaded << endl;
812 cout << "\tmergeDiff: " << mergeDiff << endl;
813 cout << "\tflushesTriggeredBySize: " << flushesTriggeredBySize << endl;
814 cout << "\tflushesTriggeredByTimer: " << flushesTriggeredByTimer << endl;
815 cout << "\tjournalsMerged: " << journalsMerged << endl;
816 cout << "\tobjectsSyncedWithNoJournal: " << objectsSyncedWithNoJournal << endl;
817 }
818
819 /* The helper objects & fcns */
820
PendingOps(int flags)821 Synchronizer::PendingOps::PendingOps(int flags) : opFlags(flags), waiters(0), finished(false)
822 {
823 }
824
~PendingOps()825 Synchronizer::PendingOps::~PendingOps()
826 {
827 assert(waiters == 0);
828 }
829
notify()830 void Synchronizer::PendingOps::notify()
831 {
832 finished = true;
833 condvar.notify_all();
834 }
835
wait(boost::mutex * m)836 void Synchronizer::PendingOps::wait(boost::mutex *m)
837 {
838 while (!finished)
839 {
840 waiters++;
841 condvar.wait(*m);
842 waiters--;
843 }
844 }
845
Job(Synchronizer * s,std::list<std::string>::iterator i)846 Synchronizer::Job::Job(Synchronizer *s, std::list<std::string>::iterator i) : sync(s), it(i)
847 { }
848
operator ()()849 void Synchronizer::Job::operator()()
850 {
851 sync->process(it);
852 }
853
configListener()854 void Synchronizer::configListener()
855 {
856 // Uploader threads
857 string stmp = Config::get()->getValue("ObjectStorage", "max_concurrent_uploads");
858 if (maxUploads == 0)
859 maxUploads = 20;
860 if (stmp.empty())
861 {
862 logger->log(LOG_CRIT, "max_concurrent_uploads is not set. Using current value = %u",maxUploads);
863 }
864 try
865 {
866 uint newValue = stoul(stmp);
867 if (newValue != maxUploads)
868 {
869 maxUploads = newValue;
870 threadPool->setMaxThreads(maxUploads);
871 logger->log(LOG_INFO, "max_concurrent_uploads = %u",maxUploads);
872 }
873 }
874 catch (invalid_argument &)
875 {
876 logger->log(LOG_CRIT, "max_concurrent_uploads is not a number. Using current value = %u",maxUploads);
877 }
878 }
879 }
880