1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <folly/DynamicConverter.h>
20 #include <folly/FileUtil.h>
21 #include <folly/ScopeGuard.h>
22 #include <folly/json.h>
23 #include <folly/portability/SysTime.h>
24 #include <folly/system/ThreadName.h>
25 #include <atomic>
26 #include <cerrno>
27 #include <functional>
28
29 namespace wangle {
30
31 template <typename K, typename V, typename MutexT>
LRUPersistentCache(PersistentCacheConfig config,std::unique_ptr<CachePersistence> persistence)32 LRUPersistentCache<K, V, MutexT>::LRUPersistentCache(
33 PersistentCacheConfig config,
34 std::unique_ptr<CachePersistence> persistence)
35 : cache_(config.capacity),
36 syncInterval_(config.syncInterval),
37 nSyncRetries_(config.nSyncRetries),
38 executor_(std::move(config.executor)),
39 inlinePersistenceLoading_(config.inlinePersistenceLoading) {
40 if (persistence) {
41 std::shared_ptr<CachePersistence> sharedPersistence(std::move(persistence));
42 {
43 typename wangle::CacheLockGuard<MutexT>::Write writeLock(
44 persistenceLock_);
45 std::swap(persistence_, sharedPersistence);
46 }
47 }
48 }
49
50 template <typename K, typename V, typename MutexT>
~LRUPersistentCache()51 LRUPersistentCache<K, V, MutexT>::~LRUPersistentCache() {
52 if (executor_) {
53 oneShotSync();
54 return;
55 }
56 {
57 // tell syncer to wake up and quit
58 std::lock_guard<std::mutex> lock(stopSyncerMutex_);
59 stopSyncer_ = true;
60 stopSyncerCV_.notify_all();
61 }
62 if (syncer_.joinable()) {
63 syncer_.join();
64 }
65 }
66
67 template <typename K, typename V, typename MutexT>
init()68 void LRUPersistentCache<K, V, MutexT>::init() {
69 if (inlinePersistenceLoading_) {
70 // load the cache. be silent if load fails, we just drop the cache
71 // and start from scratch.
72 setPersistenceHelper(true);
73 }
74 if (!executor_) {
75 // start the syncer thread. done at the end of construction so that the
76 // cache is fully initialized before being passed to the syncer thread.
77 syncer_ =
78 std::thread(&LRUPersistentCache<K, V, MutexT>::syncThreadMain, this);
79 return;
80 }
81 executor_->add([self = folly::to_weak_ptr(this->shared_from_this())]() {
82 if (auto sharedSelf = self.lock()) {
83 sharedSelf->setPersistenceHelper(true);
84 }
85 });
86 }
87
88 template <typename K, typename V, typename MutexT>
put(const K & key,const V & val)89 void LRUPersistentCache<K, V, MutexT>::put(const K& key, const V& val) {
90 blockingAccessInMemCache().put(key, val);
91
92 if (!executor_) {
93 return;
94 }
95
96 // Within the same time interval as the last sync
97 if (std::chrono::steady_clock::now() - lastExecutorScheduleTime_ <
98 syncInterval_) {
99 return;
100 }
101
102 // Sync already scheduled
103 if (executorScheduled_.test_and_set()) {
104 return;
105 }
106
107 lastExecutorScheduleTime_ = std::chrono::steady_clock::now();
108 std::weak_ptr<LRUPersistentCache<K, V, MutexT>> weakSelf =
109 this->shared_from_this();
110 executor_->add([self = std::move(weakSelf)]() {
111 if (auto sharedSelf = self.lock()) {
112 sharedSelf->oneShotSync();
113 }
114 });
115 }
116
117 template <typename K, typename V, typename MutexT>
hasPendingUpdates()118 bool LRUPersistentCache<K, V, MutexT>::hasPendingUpdates() {
119 typename wangle::CacheLockGuard<MutexT>::Read readLock(persistenceLock_);
120 if (!persistence_) {
121 return false;
122 }
123 return blockingAccessInMemCache().hasChangedSince(
124 persistence_->getLastPersistedVersion());
125 }
126
127 template <typename K, typename V, typename MutexT>
syncThreadMain(void * arg)128 void* LRUPersistentCache<K, V, MutexT>::syncThreadMain(void* arg) {
129 folly::setThreadName("lru-sync-thread");
130
131 auto self = static_cast<LRUPersistentCache<K, V, MutexT>*>(arg);
132 self->sync();
133 return nullptr;
134 }
135
136 template <typename K, typename V, typename MutexT>
oneShotSync()137 void LRUPersistentCache<K, V, MutexT>::oneShotSync() {
138 // load the cache. be silent if load fails, we just drop the cache
139 // and start from scratch.
140 setPersistenceHelper(true);
141 auto persistence = getPersistence();
142 if (persistence && !syncNow(*persistence)) {
143 // track failures and give up if we tried too many times
144 ++nSyncTries_;
145 if (nSyncTries_ == nSyncRetries_) {
146 persistence->setPersistedVersion(cache_.getVersion());
147 nSyncTries_ = 0;
148 }
149 } else {
150 nSyncTries_ = 0;
151 }
152
153 executorScheduled_.clear();
154 }
155
156 template <typename K, typename V, typename MutexT>
sync()157 void LRUPersistentCache<K, V, MutexT>::sync() {
158 // load the cache. be silent if load fails, we just drop the cache
159 // and start from scratch.
160 setPersistenceHelper(true);
161 // keep running as long the destructor signals to stop or
162 // there are pending updates that are not synced yet
163 std::unique_lock<std::mutex> stopSyncerLock(stopSyncerMutex_);
164 int nSyncFailures = 0;
165 while (true) {
166 auto persistence = getPersistence();
167 if (stopSyncer_) {
168 if (!persistence ||
169 !cache_.hasChangedSince(persistence->getLastPersistedVersion())) {
170 break;
171 }
172 }
173
174 if (persistence && !syncNow(*persistence)) {
175 // track failures and give up if we tried too many times
176 ++nSyncFailures;
177 if (nSyncFailures == nSyncRetries_) {
178 persistence->setPersistedVersion(cache_.getVersion());
179 nSyncFailures = 0;
180 }
181 } else {
182 nSyncFailures = 0;
183 }
184
185 if (!stopSyncer_) {
186 stopSyncerCV_.wait_for(stopSyncerLock, syncInterval_);
187 }
188 }
189 }
190
191 template <typename K, typename V, typename MutexT>
syncNow(CachePersistence & persistence)192 bool LRUPersistentCache<K, V, MutexT>::syncNow(CachePersistence& persistence) {
193 // check if we need to sync. There is a chance that someone can
194 // update cache_ between this check and the convert below, but that
195 // is ok. The persistence layer would have needed to update anyway
196 // and will just get the latest version.
197 if (!cache_.hasChangedSince(persistence.getLastPersistedVersion())) {
198 // nothing to do
199 return true;
200 }
201
202 // serialize the current contents of cache under lock
203 auto serializedCacheAndVersion = cache_.convertToKeyValuePairs();
204 if (!serializedCacheAndVersion) {
205 LOG(ERROR) << "Failed to convert cache for serialization.";
206 return false;
207 }
208
209 auto& kvPairs = std::get<0>(serializedCacheAndVersion.value());
210 auto& version = std::get<1>(serializedCacheAndVersion.value());
211 auto persisted =
212 persistence.persistVersionedData(std::move(kvPairs), version);
213
214 return persisted;
215 }
216
217 template <typename K, typename V, typename MutexT>
218 std::shared_ptr<CachePersistence>
getPersistence()219 LRUPersistentCache<K, V, MutexT>::getPersistence() {
220 typename wangle::CacheLockGuard<MutexT>::Read readLock(persistenceLock_);
221 return persistence_;
222 }
223
224 template <typename K, typename V, typename MutexT>
setPersistenceHelper(bool syncVersion)225 void LRUPersistentCache<K, V, MutexT>::setPersistenceHelper(
226 bool syncVersion) noexcept {
227 typename wangle::CacheLockGuard<MutexT>::Write writeLock(persistenceLock_);
228 if (persistenceLoadedSemaphore_.ready()) {
229 return;
230 }
231 // load the persistence data into memory
232 if (persistence_) {
233 auto version = load(*persistence_);
234 if (version && syncVersion) {
235 persistence_->setPersistedVersion(*version);
236 }
237 }
238 persistenceLoadedSemaphore_.post();
239 }
240
241 template <typename K, typename V, typename MutexT>
242 LRUInMemoryCache<K, V, MutexT>&
blockingAccessInMemCache()243 LRUPersistentCache<K, V, MutexT>::blockingAccessInMemCache() {
244 persistenceLoadedSemaphore_.wait();
245 return cache_;
246 }
247
248 template <typename K, typename V, typename MutexT>
load(CachePersistence & persistence)249 folly::Optional<CacheDataVersion> LRUPersistentCache<K, V, MutexT>::load(
250 CachePersistence& persistence) noexcept {
251 auto kvPairs = persistence.load();
252 if (!kvPairs) {
253 return folly::none;
254 }
255 return cache_.loadData(kvPairs.value());
256 }
257 } // namespace wangle
258