1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 #pragma once
7 
#include <atomic>
#include <deque>
#include <limits>
#include <list>
#include <memory>
#include <set>
#include <string>
#include <vector>

#include "db/dbformat.h"
#include "db/logs_with_prep_tracker.h"
#include "db/memtable.h"
#include "db/range_del_aggregator.h"
#include "file/filename.h"
#include "logging/log_buffer.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "util/autovector.h"
27 
namespace ROCKSDB_NAMESPACE {

// Forward declarations; only pointers/references to these types appear below.
class ColumnFamilyData;
class InternalKeyComparator;
class InstrumentedMutex;
class MergeIteratorBuilder;
class MemTableList;

struct FlushJobInfo;
37 
// keeps a list of immutable memtables in a vector. the list is immutable
// if refcount is bigger than one. It is used as a state for Get() and
// Iterator code paths
//
// This class is not thread-safe.  External synchronization is required
// (such as holding the db mutex or being on the write thread).
class MemTableListVersion {
 public:
  explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
                               MemTableListVersion* old = nullptr);
  explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
                               int max_write_buffer_number_to_maintain,
                               int64_t max_write_buffer_size_to_maintain);

  void Ref();
  // Drops one reference. Memtables that become unreferenced as a result are
  // appended to *to_delete (when non-null) for the caller to reclaim.
  void Unref(autovector<MemTable*>* to_delete = nullptr);

  // Search all the memtables starting from the most recent one.
  // Return the most recent value found, if any.
  //
  // If any operation was found for this key, its most recent sequence number
  // will be stored in *seq on success (regardless of whether true/false is
  // returned).  Otherwise, *seq will be set to kMaxSequenceNumber.
  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr);

  // Same as above, for callers that do not need the found sequence number.
  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr) {
    SequenceNumber seq;
    return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq,
               read_opts, callback, is_blob_index);
  }

  void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                ReadCallback* callback, bool* is_blob);

  // Returns all the merge operands corresponding to the key by searching all
  // memtables starting from the most recent one.
  bool GetMergeOperands(const LookupKey& key, Status* s,
                        MergeContext* merge_context,
                        SequenceNumber* max_covering_tombstone_seq,
                        const ReadOptions& read_opts);

  // Similar to Get(), but searches the Memtable history of memtables that
  // have already been flushed.  Should only be used from in-memory only
  // queries (such as Transaction validation) as the history may contain
  // writes that are also present in the SST files.
  bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
                      MergeContext* merge_context,
                      SequenceNumber* max_covering_tombstone_seq,
                      SequenceNumber* seq, const ReadOptions& read_opts,
                      bool* is_blob_index = nullptr);
  // Same as above, for callers that do not need the found sequence number.
  bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
                      MergeContext* merge_context,
                      SequenceNumber* max_covering_tombstone_seq,
                      const ReadOptions& read_opts,
                      bool* is_blob_index = nullptr) {
    SequenceNumber seq;
    return GetFromHistory(key, value, s, merge_context,
                          max_covering_tombstone_seq, &seq, read_opts,
                          is_blob_index);
  }

  Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
                                    RangeDelAggregator* range_del_agg);

  void AddIterators(const ReadOptions& options,
                    std::vector<InternalIterator*>* iterator_list,
                    Arena* arena);

  void AddIterators(const ReadOptions& options,
                    MergeIteratorBuilder* merge_iter_builder);

  uint64_t GetTotalNumEntries() const;

  uint64_t GetTotalNumDeletes() const;

  MemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
                                           const Slice& end_ikey);

  // Returns the value of MemTable::GetEarliestSequenceNumber() on the most
  // recent MemTable in this list or kMaxSequenceNumber if the list is empty.
  // If include_history=true, will also search Memtables in MemTableList
  // History.
  SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const;

 private:
  friend class MemTableList;

  friend Status InstallMemtableAtomicFlushResults(
      const autovector<MemTableList*>* imm_lists,
      const autovector<ColumnFamilyData*>& cfds,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      VersionSet* vset, InstrumentedMutex* mu,
      const autovector<FileMetaData*>& file_meta,
      autovector<MemTable*>* to_delete, Directory* db_directory,
      LogBuffer* log_buffer);

  // REQUIRE: m is an immutable memtable
  void Add(MemTable* m, autovector<MemTable*>* to_delete);
  // REQUIRE: m is an immutable memtable
  void Remove(MemTable* m, autovector<MemTable*>* to_delete);

  // Drops memtables from the history while the memory limit is exceeded.
  // `usage` is the current size of the mutable Memtable (see
  // MemtableLimitExceeded / ApproximateMemoryUsageExcludingLast).
  void TrimHistory(autovector<MemTable*>* to_delete, size_t usage);

  // Shared lookup used by Get() and GetFromHistory(): searches `list` from
  // the most recent memtable to the oldest; *seq semantics match Get().
  bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
                   std::string* value, Status* s, MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   ReadCallback* callback = nullptr,
                   bool* is_blob_index = nullptr);

  void AddMemTable(MemTable* m);

  void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);

  // Calculate the total amount of memory used by memlist_ and memlist_history_
  // excluding the last MemTable in memlist_history_. The reason for excluding
  // the last MemTable is to see if dropping the last MemTable will keep total
  // memory usage above or equal to max_write_buffer_size_to_maintain_
  size_t ApproximateMemoryUsageExcludingLast() const;

  // Whether this version contains flushed memtables that are only kept around
  // for transaction conflict checking.
  bool HasHistory() const { return !memlist_history_.empty(); }

  bool MemtableLimitExceeded(size_t usage);

  // Immutable MemTables that have not yet been flushed.
  std::list<MemTable*> memlist_;

  // MemTables that have already been flushed
  // (used during Transaction validation)
  std::list<MemTable*> memlist_history_;

  // Maximum number of MemTables to keep in memory (including both flushed
  // and not-yet-flushed tables).
  const int max_write_buffer_number_to_maintain_;
  // Maximum size of MemTables to keep in memory (including both flushed
  // and not-yet-flushed tables).
  const int64_t max_write_buffer_size_to_maintain_;

  // Number of references to this version; see Ref()/Unref().
  int refs_ = 0;

  // Points at the owning MemTableList's current_memory_usage_ (passed in by
  // the constructor) so that this version can keep the parent's total
  // up to date.
  size_t* parent_memtable_list_memory_usage_;
};
190 
// This class stores references to all the immutable memtables.
// The memtables are flushed to L0 as soon as possible and in
// any order. If there are more than one immutable memtable, their
// flushes can occur concurrently.  However, they are 'committed'
// to the manifest in FIFO order to maintain correctness and
// recoverability from a crash.
//
//
// Other than imm_flush_needed and imm_trim_needed, this class is not
// thread-safe and requires external synchronization (such as holding the db
// mutex or being on the write thread.)
class MemTableList {
 public:
  // A list of memtables.
  explicit MemTableList(int min_write_buffer_number_to_merge,
                        int max_write_buffer_number_to_maintain,
                        int64_t max_write_buffer_size_to_maintain)
      : imm_flush_needed(false),
        imm_trim_needed(false),
        min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
        current_(new MemTableListVersion(&current_memory_usage_,
                                         max_write_buffer_number_to_maintain,
                                         max_write_buffer_size_to_maintain)),
        num_flush_not_started_(0),
        commit_in_progress_(false),
        flush_requested_(false),
        current_memory_usage_(0),
        current_memory_usage_excluding_last_(0),
        current_has_history_(false) {
    current_->Ref();
  }

  // Should not delete MemTableList without making sure MemTableList::current()
  // is Unref()'d.
  ~MemTableList() {}

  MemTableListVersion* current() const { return current_; }

  // Set to true when there is at least one memtable on which flush has not
  // yet started, so that background threads can detect whether there is
  // anything more to start flushing.
  std::atomic<bool> imm_flush_needed;

  // Set to true when the memtable history should be trimmed; see
  // MarkTrimHistoryNeeded()/ResetTrimHistoryNeeded().
  std::atomic<bool> imm_trim_needed;

  // Returns the total number of memtables in the list that haven't yet
  // been flushed and logged.
  int NumNotFlushed() const;

  // Returns total number of memtables in the list that have been
  // completely flushed and logged.
  int NumFlushed() const;

  // Returns true if there is at least one memtable on which flush has
  // not yet started.
  bool IsFlushPending() const;

  // Returns the earliest memtables that needs to be flushed. The returned
  // memtables are guaranteed to be in the ascending order of created time.
  void PickMemtablesToFlush(const uint64_t* max_memtable_id,
                            autovector<MemTable*>* mems);

  // Reset status of the given memtable list back to pending state so that
  // they can get picked up again on the next round of flush.
  void RollbackMemtableFlush(const autovector<MemTable*>& mems,
                             uint64_t file_number);

  // Try commit a successful flush in the manifest file. It might just return
  // Status::OK letting a concurrent flush do the actual recording.
  Status TryInstallMemtableFlushResults(
      ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
      const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
      VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
      autovector<MemTable*>* to_delete, Directory* db_directory,
      LogBuffer* log_buffer,
      std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info);

  // New memtables are inserted at the front of the list.
  // Takes ownership of the reference held on *m by the caller of Add().
  void Add(MemTable* m, autovector<MemTable*>* to_delete);

  // Returns an estimate of the number of bytes of data in use.
  size_t ApproximateMemoryUsage();

  // Returns the cached current_memory_usage_excluding_last_ value.
  size_t ApproximateMemoryUsageExcludingLast() const;

  // Returns the cached current_has_history_ value.
  bool HasHistory() const;

  // Updates current_memory_usage_excluding_last_ and current_has_history_
  // from MemTableListVersion. Must be called whenever InstallNewVersion is
  // called.
  void UpdateCachedValuesFromMemTableListVersion();

  // `usage` is the current size of the mutable Memtable. When
  // max_write_buffer_size_to_maintain is used, total size of mutable and
  // immutable memtables is checked against it to decide whether to trim
  // memtable list.
  void TrimHistory(autovector<MemTable*>* to_delete, size_t usage);

  // Returns an estimate of the number of bytes of data used by
  // the unflushed mem-tables.
  size_t ApproximateUnflushedMemTablesMemoryUsage();

  // Returns an estimate of the timestamp of the earliest key.
  uint64_t ApproximateOldestKeyTime() const;

  // Request a flush of all existing memtables to storage.  This will
  // cause future calls to IsFlushPending() to return true if this list is
  // non-empty (regardless of the min_write_buffer_number_to_merge
  // parameter). This flush request will persist until the next time
  // PickMemtablesToFlush() is called.
  void FlushRequested() { flush_requested_ = true; }

  bool HasFlushRequested() { return flush_requested_; }

  // Returns true if a trim history should be scheduled and the caller should
  // be the one to schedule it
  bool MarkTrimHistoryNeeded() {
    auto expected = false;
    return imm_trim_needed.compare_exchange_strong(
        expected, true, std::memory_order_relaxed, std::memory_order_relaxed);
  }

  // Clears the trim-needed flag (no-op if it was not set).
  void ResetTrimHistoryNeeded() {
    auto expected = true;
    imm_trim_needed.compare_exchange_strong(
        expected, false, std::memory_order_relaxed, std::memory_order_relaxed);
  }

  // Copying allowed
  // MemTableList(const MemTableList&);
  // void operator=(const MemTableList&);

  size_t* current_memory_usage() { return &current_memory_usage_; }

  // Returns the min log containing the prep section after memtables listed in
  // `memtables_to_flush` are flushed and their status is persisted in manifest.
  uint64_t PrecomputeMinLogContainingPrepSection(
      const autovector<MemTable*>& memtables_to_flush);

  // ID of the oldest not-yet-flushed memtable (the back of memlist_), or
  // uint64_t max if there is none.
  uint64_t GetEarliestMemTableID() const {
    auto& memlist = current_->memlist_;
    if (memlist.empty()) {
      return std::numeric_limits<uint64_t>::max();
    }
    return memlist.back()->GetID();
  }

  // ID of the newest not-yet-flushed memtable (the front of memlist_), or 0
  // if there is none.
  uint64_t GetLatestMemTableID() const {
    auto& memlist = current_->memlist_;
    if (memlist.empty()) {
      return 0;
    }
    return memlist.front()->GetID();
  }

  // Assigns `seq` as the atomic flush sequence number of every memtable that
  // does not have one yet, scanning from newest to oldest.
  void AssignAtomicFlushSeq(const SequenceNumber& seq) {
    const auto& memlist = current_->memlist_;
    // Scan the memtable list from new to old
    for (auto it = memlist.begin(); it != memlist.end(); ++it) {
      MemTable* mem = *it;
      if (mem->atomic_flush_seqno_ == kMaxSequenceNumber) {
        mem->atomic_flush_seqno_ = seq;
      } else {
        // Earlier memtables must have been assigned an atomic flush seq, no
        // need to continue scan.
        break;
      }
    }
  }

  // Used only by DBImplSecondary during log replay.
  // Remove memtables whose data were written before the WAL with log_number
  // was created, i.e. mem->GetNextLogNumber() <= log_number. The memtables are
  // not freed, but put into a vector for future deref and reclamation.
  void RemoveOldMemTables(uint64_t log_number,
                          autovector<MemTable*>* to_delete);

 private:
  friend Status InstallMemtableAtomicFlushResults(
      const autovector<MemTableList*>* imm_lists,
      const autovector<ColumnFamilyData*>& cfds,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      VersionSet* vset, InstrumentedMutex* mu,
      const autovector<FileMetaData*>& file_meta,
      autovector<MemTable*>* to_delete, Directory* db_directory,
      LogBuffer* log_buffer);

  // DB mutex held
  void InstallNewVersion();

  const int min_write_buffer_number_to_merge_;

  MemTableListVersion* current_;

  // the number of elements that still need flushing
  int num_flush_not_started_;

  // committing in progress
  bool commit_in_progress_;

  // Requested a flush of memtables to storage. It's possible to request that
  // a subset of memtables be flushed.
  bool flush_requested_;

  // The current memory usage.
  size_t current_memory_usage_;

  // Cached value of current_->ApproximateMemoryUsageExcludingLast().
  std::atomic<size_t> current_memory_usage_excluding_last_;

  // Cached value of current_->HasHistory().
  std::atomic<bool> current_has_history_;
};
407 
// Installs memtable atomic flush results.
// In most cases, imm_lists is nullptr, and the function simply uses the
// immutable memtable lists associated with the cfds. There are unit tests
// that install flush results for external immutable memtable lists other
// than the cfds' own immutable memtable lists, e.g. MemTableListTest. In
// this case, the imm_lists parameter is not nullptr.
extern Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer);
422 }  // namespace ROCKSDB_NAMESPACE
423