1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <atomic> 20 #include <chrono> 21 #include <condition_variable> 22 #include <future> 23 #include <map> 24 #include <thread> 25 26 #include <folly/Executor.h> 27 #include <folly/dynamic.h> 28 #include <folly/synchronization/SaturatingSemaphore.h> 29 #include <wangle/client/persistence/LRUInMemoryCache.h> 30 #include <wangle/client/persistence/PersistentCache.h> 31 #include <wangle/client/persistence/PersistentCacheCommon.h> 32 33 namespace wangle { 34 35 /** 36 * The underlying persistence layer interface. Implementations may 37 * write to file, db, /dev/null, etc. 38 */ 39 class CachePersistence { 40 public: 41 virtual ~CachePersistence() = default; 42 43 /** 44 * Persist a folly::dynamic array of key value pairs at the 45 * specified version. Returns true if persistence succeeded. 46 */ persistVersionedData(const folly::dynamic & kvPairs,const CacheDataVersion & version)47 bool persistVersionedData( 48 const folly::dynamic& kvPairs, 49 const CacheDataVersion& version) { 50 auto result = persist(kvPairs); 51 if (result) { 52 persistedVersion_ = version; 53 } 54 return result; 55 } 56 57 /** 58 * Get the last version of the data that was successfully persisted. 59 */ getLastPersistedVersion()60 virtual CacheDataVersion getLastPersistedVersion() const { 61 return persistedVersion_; 62 } 63 64 /** 65 * Force set a persisted version. This is primarily for when a persistence 66 * layer acts as the initial source of data for some version tracking cache. 67 */ setPersistedVersion(CacheDataVersion version)68 virtual void setPersistedVersion(CacheDataVersion version) noexcept { 69 persistedVersion_ = version; 70 } 71 72 /** 73 * Persist a folly::dynamic array of key value pairs. 74 * Returns true on success. 75 */ 76 virtual bool persist(const folly::dynamic& kvPairs) noexcept = 0; 77 78 /** 79 * Returns a list of key value pairs that are present in this 80 * persistence store. 81 */ 82 virtual folly::Optional<folly::dynamic> load() noexcept = 0; 83 84 /** 85 * Clears Persistent cache 86 */ 87 virtual void clear() = 0; 88 89 private: 90 CacheDataVersion persistedVersion_{kDefaultInitCacheDataVersion}; 91 }; 92 93 /** 94 * A PersistentCache implementation that used a CachePersistence for 95 * storage. In memory structure fronts the persistence and the cache 96 * operations happen on it. Loading from and syncing to persistence are 97 * hidden from clients. Sync to persistence happens asynchronously on 98 * a separate thread at a configurable interval. Syncs to persistence 99 * on destruction as well. 100 * 101 * The in memory structure is an EvictingCacheMap which causes this class 102 * to evict entries in an LRU fashion. 103 * 104 * NOTE NOTE NOTE: Although this class aims to be a cache for arbitrary, 105 * it relies heavily on folly::toJson, folly::dynamic and convertTo for 106 * serialization and deserialization. So It may not suit your need until 107 * true support arbitrary types is written. 108 */ 109 template <typename K, typename V, typename MutexT = std::mutex> 110 class LRUPersistentCache 111 : public PersistentCache<K, V>, 112 public std::enable_shared_from_this<LRUPersistentCache<K, V, MutexT>> { 113 public: 114 using Ptr = std::shared_ptr<LRUPersistentCache<K, V, MutexT>>; 115 116 /** 117 * LRUPersistentCache constructor 118 * 119 * On write failures, the sync will happen again up to nSyncRetries times. 120 * Once failed nSyncRetries amount of time, then it will give up and not 121 * attempt to sync again until another update occurs. 122 * 123 * On reaching capacity limit, LRU items are evicted. 124 */ 125 explicit LRUPersistentCache( 126 PersistentCacheConfig config, 127 std::unique_ptr<CachePersistence> persistence = nullptr); 128 129 /** 130 * LRUPersistentCache Destructor 131 * 132 * Signals the syncer thread to stop, waits for any pending syncs to 133 * be done. 134 */ 135 ~LRUPersistentCache() override; 136 137 /** 138 * Loads the cache inline on the calling thread, and starts of the syncer 139 * thread that periodically syncs the cache to persistence if the cache is 140 * running thread mode. 141 * 142 * If persistence is specified from constructor, the cache is initially loaded 143 * with the contents from it. If load fails, then cache starts empty. 144 */ 145 void init(); 146 147 /** 148 * Check if there are updates that need to be synced to persistence 149 */ 150 bool hasPendingUpdates(); 151 152 /** 153 * PersistentCache operations 154 */ get(const K & key)155 folly::Optional<V> get(const K& key) override { 156 return blockingAccessInMemCache().get(key); 157 } 158 159 void put(const K& key, const V& val) override; 160 remove(const K & key)161 bool remove(const K& key) override { 162 return blockingAccessInMemCache().remove(key); 163 } 164 165 void clear(bool clearPersistence = false) override { 166 blockingAccessInMemCache().clear(); 167 if (clearPersistence) { 168 auto persistence = getPersistence(); 169 if (persistence) { 170 persistence->clear(); 171 } 172 } 173 } 174 size()175 size_t size() override { 176 return blockingAccessInMemCache().size(); 177 } 178 179 private: 180 LRUPersistentCache(const LRUPersistentCache&) = delete; 181 LRUPersistentCache& operator=(const LRUPersistentCache&) = delete; 182 183 /** 184 * Helper to set persistence that will load the persistence data 185 * into memory and optionally sync versions 186 */ 187 void setPersistenceHelper(bool syncVersion) noexcept; 188 189 /** 190 * Load the contents of the persistence passed to constructor in to the 191 * in-memory cache. Failure to read will result in no changes to the 192 * in-memory data. That is, if in-memory entries exist, and loading 193 * fails, the in-memory data remains and will sync down to the underlying 194 * persistence layer on the next sync. 195 * 196 * Failure to read inclues IO errors and deserialization errors. 197 * 198 * @returns the in memory cache's new version 199 */ 200 folly::Optional<CacheDataVersion> load( 201 CachePersistence& persistence) noexcept; 202 203 /** 204 * The syncer thread's function. Syncs to the persistence, if necessary, 205 * after every syncInterval_ seconds. 206 */ 207 void sync(); 208 void oneShotSync(); 209 static void* syncThreadMain(void* arg); 210 211 /** 212 * Helper to sync routine above that actualy does the serialization 213 * and writes to persistence. 214 * 215 * @returns boolean, true on successful serialization and write to 216 * persistence, false otherwise 217 */ 218 bool syncNow(CachePersistence& persistence); 219 220 /** 221 * Helper to get the persistence layer under lock since it will be called 222 * by syncer thread and setters call from any thread. 223 */ 224 std::shared_ptr<CachePersistence> getPersistence(); 225 226 /** 227 * Block the caller thread until persistence has been loaded into the 228 * in-memory cache. Return cache_ after persistence loading is done. 229 */ 230 LRUInMemoryCache<K, V, MutexT>& blockingAccessInMemCache(); 231 232 private: 233 // Our threadsafe in memory cache 234 LRUInMemoryCache<K, V, MutexT> cache_; 235 236 // used to signal syncer thread 237 bool stopSyncer_{false}; 238 // mutex used to synchronize syncer_ on destruction, tied to stopSyncerCV_ 239 std::mutex stopSyncerMutex_; 240 // condvar used to wakeup syncer on exit 241 std::condition_variable stopSyncerCV_; 242 243 // We do not schedule the same task more than one to the executor 244 std::atomic_flag executorScheduled_ = ATOMIC_FLAG_INIT; 245 246 // sync interval in milliseconds 247 const std::chrono::milliseconds syncInterval_{ 248 client::persistence::DEFAULT_CACHE_SYNC_INTERVAL}; 249 // limit on no. of sync attempts 250 const int nSyncRetries_{client::persistence::DEFAULT_CACHE_SYNC_RETRIES}; 251 // Sync try count across executor tasks 252 int nSyncTries_{0}; 253 std::chrono::steady_clock::time_point lastExecutorScheduleTime_; 254 255 // persistence layer 256 // we use a shared pointer since the syncer thread might be operating on 257 // it when the client decides to set a new one 258 std::shared_ptr<CachePersistence> persistence_; 259 // for locking access to persistence set/get 260 MutexT persistenceLock_; 261 262 // thread for periodic sync 263 std::thread syncer_; 264 265 // executor for periodic sync. 266 std::shared_ptr<folly::Executor> executor_; 267 268 // Semaphore to synchronize persistence loading and operations on the cache. 269 folly::SaturatingSemaphore<true /* MayBlock */> persistenceLoadedSemaphore_; 270 271 // Whether the persistence will be loaded inline. 272 const bool inlinePersistenceLoading_; 273 }; 274 275 } // namespace wangle 276 277 #include <wangle/client/persistence/LRUPersistentCache-inl.h> 278