/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/DynamicConverter.h>
#include <folly/FileUtil.h>
#include <folly/ScopeGuard.h>
#include <folly/json.h>
#include <folly/portability/SysTime.h>
#include <folly/system/ThreadName.h>
#include <atomic>
#include <cerrno>
#include <functional>

namespace wangle {

template <typename K, typename V, typename MutexT>
LRUPersistentCache<K, V, MutexT>::LRUPersistentCache(
    PersistentCacheConfig config,
    std::unique_ptr<CachePersistence> persistence)
    : cache_(config.capacity),
      syncInterval_(config.syncInterval),
      nSyncRetries_(config.nSyncRetries),
      executor_(std::move(config.executor)),
      inlinePersistenceLoading_(config.inlinePersistenceLoading) {
  if (persistence) {
    std::shared_ptr<CachePersistence> sharedPersistence(std::move(persistence));
    {
      typename wangle::CacheLockGuard<MutexT>::Write writeLock(
          persistenceLock_);
      std::swap(persistence_, sharedPersistence);
    }
  }
}

template <typename K, typename V, typename MutexT>
LRUPersistentCache<K, V, MutexT>::~LRUPersistentCache() {
  if (executor_) {
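    // With an executor there is no dedicated syncer thread; perform one final
    // sync inline before destruction.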
    oneShotSync();
    return;
  }
  {
    // tell syncer to wake up and quit
    std::lock_guard<std::mutex> lock(stopSyncerMutex_);
    stopSyncer_ = true;
    stopSyncerCV_.notify_all();
  }
  if (syncer_.joinable()) {
    syncer_.join();
  }
}

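// init() must be called once after construction. Depending on configuration,
// it loads the persisted data inline, schedules the load on the executor, or
// starts the dedicated syncer thread.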
template <typename K, typename V, typename MutexT>
void LRUPersistentCache<K, V, MutexT>::init() {
  if (inlinePersistenceLoading_) {
    // Load the cache. Stay silent if the load fails; we just drop the
    // persisted data and start from scratch.
    setPersistenceHelper(true);
  }
  if (!executor_) {
    // Start the syncer thread. This happens after construction (in init()) so
    // the cache is fully initialized before the syncer thread touches it.
    syncer_ =
        std::thread(&LRUPersistentCache<K, V, MutexT>::syncThreadMain, this);
    return;
  }
  executor_->add([self = folly::to_weak_ptr(this->shared_from_this())]() {
    if (auto sharedSelf = self.lock()) {
      sharedSelf->setPersistenceHelper(true);
    }
  });
}

template <typename K, typename V, typename MutexT>
void LRUPersistentCache<K, V, MutexT>::put(const K& key, const V& val) {
  blockingAccessInMemCache().put(key, val);

  if (!executor_) {
    return;
  }

  // Skip if we are still within the sync interval since the last scheduled
  // sync.
  if (std::chrono::steady_clock::now() - lastExecutorScheduleTime_ <
      syncInterval_) {
    return;
  }

  // Sync already scheduled
  if (executorScheduled_.test_and_set()) {
    return;
  }

  lastExecutorScheduleTime_ = std::chrono::steady_clock::now();
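  // Capture only a weak reference so a queued sync does not extend the
  // cache's lifetime.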
  std::weak_ptr<LRUPersistentCache<K, V, MutexT>> weakSelf =
      this->shared_from_this();
  executor_->add([self = std::move(weakSelf)]() {
    if (auto sharedSelf = self.lock()) {
      sharedSelf->oneShotSync();
    }
  });
}

template <typename K, typename V, typename MutexT>
bool LRUPersistentCache<K, V, MutexT>::hasPendingUpdates() {
  typename wangle::CacheLockGuard<MutexT>::Read readLock(persistenceLock_);
  if (!persistence_) {
    return false;
  }
  return blockingAccessInMemCache().hasChangedSince(
      persistence_->getLastPersistedVersion());
}

template <typename K, typename V, typename MutexT>
void* LRUPersistentCache<K, V, MutexT>::syncThreadMain(void* arg) {
  folly::setThreadName("lru-sync-thread");

  auto self = static_cast<LRUPersistentCache<K, V, MutexT>*>(arg);
  self->sync();
  return nullptr;
}

template <typename K, typename V, typename MutexT>
void LRUPersistentCache<K, V, MutexT>::oneShotSync() {
  // Load the cache. Stay silent if the load fails; we just drop the
  // persisted data and start from scratch.
  setPersistenceHelper(true);
  auto persistence = getPersistence();
  if (persistence && !syncNow(*persistence)) {
    // track failures and give up if we tried too many times
    ++nSyncTries_;
    if (nSyncTries_ == nSyncRetries_) {
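      // Give up on this delta: mark the current version as persisted so we
      // stop retrying it.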
      persistence->setPersistedVersion(cache_.getVersion());
      nSyncTries_ = 0;
    }
  } else {
    nSyncTries_ = 0;
  }

  executorScheduled_.clear();
}

template <typename K, typename V, typename MutexT>
void LRUPersistentCache<K, V, MutexT>::sync() {
  // Load the cache. Stay silent if the load fails; we just drop the
  // persisted data and start from scratch.
  setPersistenceHelper(true);
  // Keep running until the destructor signals us to stop and there are no
  // pending updates left to sync.
  std::unique_lock<std::mutex> stopSyncerLock(stopSyncerMutex_);
  int nSyncFailures = 0;
  while (true) {
    auto persistence = getPersistence();
    if (stopSyncer_) {
      if (!persistence ||
          !cache_.hasChangedSince(persistence->getLastPersistedVersion())) {
        break;
      }
    }

    if (persistence && !syncNow(*persistence)) {
      // track failures and give up if we tried too many times
      ++nSyncFailures;
      if (nSyncFailures == nSyncRetries_) {
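        // Give up on this delta: mark the current version as persisted so we
        // stop retrying it.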
        persistence->setPersistedVersion(cache_.getVersion());
        nSyncFailures = 0;
      }
    } else {
      nSyncFailures = 0;
    }

    if (!stopSyncer_) {
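      // Sleep for up to the sync interval, waking early if the destructor
      // signals us to stop.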
      stopSyncerCV_.wait_for(stopSyncerLock, syncInterval_);
    }
  }
}

template <typename K, typename V, typename MutexT>
bool LRUPersistentCache<K, V, MutexT>::syncNow(CachePersistence& persistence) {
  // check if we need to sync.  There is a chance that someone can
  // update cache_ between this check and the convert below, but that
  // is ok.  The persistence layer would have needed to update anyway
  // and will just get the latest version.
  if (!cache_.hasChangedSince(persistence.getLastPersistedVersion())) {
    // nothing to do
    return true;
  }

  // serialize the current contents of cache under lock
  auto serializedCacheAndVersion = cache_.convertToKeyValuePairs();
  if (!serializedCacheAndVersion) {
    LOG(ERROR) << "Failed to convert cache for serialization.";
    return false;
  }

  auto& kvPairs = std::get<0>(serializedCacheAndVersion.value());
  auto& version = std::get<1>(serializedCacheAndVersion.value());
  auto persisted =
      persistence.persistVersionedData(std::move(kvPairs), version);

  return persisted;
}

template <typename K, typename V, typename MutexT>
std::shared_ptr<CachePersistence>
LRUPersistentCache<K, V, MutexT>::getPersistence() {
  typename wangle::CacheLockGuard<MutexT>::Read readLock(persistenceLock_);
  return persistence_;
}

template <typename K, typename V, typename MutexT>
void LRUPersistentCache<K, V, MutexT>::setPersistenceHelper(
    bool syncVersion) noexcept {
  typename wangle::CacheLockGuard<MutexT>::Write writeLock(persistenceLock_);
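  // Only the first call performs the load; later calls return immediately.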
  if (persistenceLoadedSemaphore_.ready()) {
    return;
  }
  // load the persistence data into memory
  if (persistence_) {
    auto version = load(*persistence_);
    if (version && syncVersion) {
      persistence_->setPersistedVersion(*version);
    }
  }
  persistenceLoadedSemaphore_.post();
}

template <typename K, typename V, typename MutexT>
LRUInMemoryCache<K, V, MutexT>&
LRUPersistentCache<K, V, MutexT>::blockingAccessInMemCache() {
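  // Block until the initial persistence load has completed.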
  persistenceLoadedSemaphore_.wait();
  return cache_;
}

template <typename K, typename V, typename MutexT>
folly::Optional<CacheDataVersion> LRUPersistentCache<K, V, MutexT>::load(
    CachePersistence& persistence) noexcept {
  auto kvPairs = persistence.load();
  if (!kvPairs) {
    return folly::none;
  }
  return cache_.loadData(kvPairs.value());
}
} // namespace wangle