1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 #include <chrono>
21 #include <condition_variable>
22 #include <future>
23 #include <map>
24 #include <thread>
25 
26 #include <folly/Executor.h>
27 #include <folly/dynamic.h>
28 #include <folly/synchronization/SaturatingSemaphore.h>
29 #include <wangle/client/persistence/LRUInMemoryCache.h>
30 #include <wangle/client/persistence/PersistentCache.h>
31 #include <wangle/client/persistence/PersistentCacheCommon.h>
32 
33 namespace wangle {
34 
35 /**
36  * The underlying persistence layer interface.  Implementations may
37  * write to file, db, /dev/null, etc.
38  */
39 class CachePersistence {
40  public:
41   virtual ~CachePersistence() = default;
42 
43   /**
44    * Persist a folly::dynamic array of key value pairs at the
45    * specified version.  Returns true if persistence succeeded.
46    */
persistVersionedData(const folly::dynamic & kvPairs,const CacheDataVersion & version)47   bool persistVersionedData(
48       const folly::dynamic& kvPairs,
49       const CacheDataVersion& version) {
50     auto result = persist(kvPairs);
51     if (result) {
52       persistedVersion_ = version;
53     }
54     return result;
55   }
56 
57   /**
58    * Get the last version of the data that was successfully persisted.
59    */
getLastPersistedVersion()60   virtual CacheDataVersion getLastPersistedVersion() const {
61     return persistedVersion_;
62   }
63 
64   /**
65    * Force set a persisted version.  This is primarily for when a persistence
66    * layer acts as the initial source of data for some version tracking cache.
67    */
setPersistedVersion(CacheDataVersion version)68   virtual void setPersistedVersion(CacheDataVersion version) noexcept {
69     persistedVersion_ = version;
70   }
71 
72   /**
73    * Persist a folly::dynamic array of key value pairs.
74    * Returns true on success.
75    */
76   virtual bool persist(const folly::dynamic& kvPairs) noexcept = 0;
77 
78   /**
79    * Returns a list of key value pairs that are present in this
80    * persistence store.
81    */
82   virtual folly::Optional<folly::dynamic> load() noexcept = 0;
83 
84   /**
85    * Clears Persistent cache
86    */
87   virtual void clear() = 0;
88 
89  private:
90   CacheDataVersion persistedVersion_{kDefaultInitCacheDataVersion};
91 };
92 
93 /**
94  * A PersistentCache implementation that used a CachePersistence for
95  * storage. In memory structure fronts the persistence and the cache
96  * operations happen on it. Loading from and syncing to persistence are
97  * hidden from clients. Sync to persistence happens asynchronously on
98  * a separate thread at a configurable interval. Syncs to persistence
99  * on destruction as well.
100  *
101  * The in memory structure is an EvictingCacheMap which causes this class
102  * to evict entries in an LRU fashion.
103  *
104  * NOTE NOTE NOTE: Although this class aims to be a cache for arbitrary,
105  * it relies heavily on folly::toJson, folly::dynamic and convertTo for
106  * serialization and deserialization. So It may not suit your need until
107  * true support arbitrary types is written.
108  */
109 template <typename K, typename V, typename MutexT = std::mutex>
110 class LRUPersistentCache
111     : public PersistentCache<K, V>,
112       public std::enable_shared_from_this<LRUPersistentCache<K, V, MutexT>> {
113  public:
114   using Ptr = std::shared_ptr<LRUPersistentCache<K, V, MutexT>>;
115 
116   /**
117    * LRUPersistentCache constructor
118    *
119    * On write failures, the sync will happen again up to nSyncRetries times.
120    * Once failed nSyncRetries amount of time, then it will give up and not
121    * attempt to sync again until another update occurs.
122    *
123    * On reaching capacity limit, LRU items are evicted.
124    */
125   explicit LRUPersistentCache(
126       PersistentCacheConfig config,
127       std::unique_ptr<CachePersistence> persistence = nullptr);
128 
129   /**
130    * LRUPersistentCache Destructor
131    *
132    * Signals the syncer thread to stop, waits for any pending syncs to
133    * be done.
134    */
135   ~LRUPersistentCache() override;
136 
137   /**
138    * Loads the cache inline on the calling thread, and starts of the syncer
139    * thread that periodically syncs the cache to persistence if the cache is
140    * running thread mode.
141    *
142    * If persistence is specified from constructor, the cache is initially loaded
143    * with the contents from it. If load fails, then cache starts empty.
144    */
145   void init();
146 
147   /**
148    * Check if there are updates that need to be synced to persistence
149    */
150   bool hasPendingUpdates();
151 
152   /**
153    * PersistentCache operations
154    */
get(const K & key)155   folly::Optional<V> get(const K& key) override {
156     return blockingAccessInMemCache().get(key);
157   }
158 
159   void put(const K& key, const V& val) override;
160 
remove(const K & key)161   bool remove(const K& key) override {
162     return blockingAccessInMemCache().remove(key);
163   }
164 
165   void clear(bool clearPersistence = false) override {
166     blockingAccessInMemCache().clear();
167     if (clearPersistence) {
168       auto persistence = getPersistence();
169       if (persistence) {
170         persistence->clear();
171       }
172     }
173   }
174 
size()175   size_t size() override {
176     return blockingAccessInMemCache().size();
177   }
178 
179  private:
180   LRUPersistentCache(const LRUPersistentCache&) = delete;
181   LRUPersistentCache& operator=(const LRUPersistentCache&) = delete;
182 
183   /**
184    * Helper to set persistence that will load the persistence data
185    * into memory and optionally sync versions
186    */
187   void setPersistenceHelper(bool syncVersion) noexcept;
188 
189   /**
190    * Load the contents of the persistence passed to constructor in to the
191    * in-memory cache. Failure to read will result in no changes to the
192    * in-memory data.  That is, if in-memory entries exist, and loading
193    * fails, the in-memory data remains and will sync down to the underlying
194    * persistence layer on the next sync.
195    *
196    * Failure to read inclues IO errors and deserialization errors.
197    *
198    * @returns the in memory cache's new version
199    */
200   folly::Optional<CacheDataVersion> load(
201       CachePersistence& persistence) noexcept;
202 
203   /**
204    * The syncer thread's function. Syncs to the persistence, if necessary,
205    * after every syncInterval_ seconds.
206    */
207   void sync();
208   void oneShotSync();
209   static void* syncThreadMain(void* arg);
210 
211   /**
212    * Helper to sync routine above that actualy does the serialization
213    * and writes to persistence.
214    *
215    * @returns boolean, true on successful serialization and write to
216    *                    persistence, false otherwise
217    */
218   bool syncNow(CachePersistence& persistence);
219 
220   /**
221    * Helper to get the persistence layer under lock since it will be called
222    * by syncer thread and setters call from any thread.
223    */
224   std::shared_ptr<CachePersistence> getPersistence();
225 
226   /**
227    * Block the caller thread until persistence has been loaded into the
228    * in-memory cache. Return cache_ after persistence loading is done.
229    */
230   LRUInMemoryCache<K, V, MutexT>& blockingAccessInMemCache();
231 
232  private:
233   // Our threadsafe in memory cache
234   LRUInMemoryCache<K, V, MutexT> cache_;
235 
236   // used to signal syncer thread
237   bool stopSyncer_{false};
238   // mutex used to synchronize syncer_ on destruction, tied to stopSyncerCV_
239   std::mutex stopSyncerMutex_;
240   // condvar used to wakeup syncer on exit
241   std::condition_variable stopSyncerCV_;
242 
243   // We do not schedule the same task more than one to the executor
244   std::atomic_flag executorScheduled_ = ATOMIC_FLAG_INIT;
245 
246   // sync interval in milliseconds
247   const std::chrono::milliseconds syncInterval_{
248       client::persistence::DEFAULT_CACHE_SYNC_INTERVAL};
249   // limit on no. of sync attempts
250   const int nSyncRetries_{client::persistence::DEFAULT_CACHE_SYNC_RETRIES};
251   // Sync try count across executor tasks
252   int nSyncTries_{0};
253   std::chrono::steady_clock::time_point lastExecutorScheduleTime_;
254 
255   // persistence layer
256   // we use a shared pointer since the syncer thread might be operating on
257   // it when the client decides to set a new one
258   std::shared_ptr<CachePersistence> persistence_;
259   // for locking access to persistence set/get
260   MutexT persistenceLock_;
261 
262   // thread for periodic sync
263   std::thread syncer_;
264 
265   // executor for periodic sync.
266   std::shared_ptr<folly::Executor> executor_;
267 
268   // Semaphore to synchronize persistence loading and operations on the cache.
269   folly::SaturatingSemaphore<true /* MayBlock */> persistenceLoadedSemaphore_;
270 
271   // Whether the persistence will be loaded inline.
272   const bool inlinePersistenceLoading_;
273 };
274 
275 } // namespace wangle
276 
277 #include <wangle/client/persistence/LRUPersistentCache-inl.h>
278