//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
#include "db/memtable_list.h"

#include <cinttypes>
#include <limits>
#include <queue>
#include <string>

#include "db/db_impl/db_impl.h"
#include "db/memtable.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "logging/log_buffer.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merging_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"

namespace rocksdb {

class InternalKeyComparator;
class Mutex;
class VersionSet;

void MemTableListVersion::AddMemTable(MemTable* m) {
  memlist_.push_front(m);
  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
}

void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
                                        MemTable* m) {
  if (m->Unref()) {
    to_delete->push_back(m);
    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
    *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
  }
}

MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
    : max_write_buffer_number_to_maintain_(
          old->max_write_buffer_number_to_maintain_),
      max_write_buffer_size_to_maintain_(
          old->max_write_buffer_size_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  if (old != nullptr) {
    memlist_ = old->memlist_;
    for (auto& m : memlist_) {
      m->Ref();
    }

    memlist_history_ = old->memlist_history_;
    for (auto& m : memlist_history_) {
      m->Ref();
    }
  }
}

MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain,
    int64_t max_write_buffer_size_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}

void MemTableListVersion::Ref() { ++refs_; }

// Called by SuperVersion::Cleanup().
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    // A null to_delete means the caller is confident that refs_ cannot
    // reach zero here.
    assert(to_delete != nullptr);
    for (const auto& m : memlist_) {
      UnrefMemTable(to_delete, m);
    }
    for (const auto& m : memlist_history_) {
      UnrefMemTable(to_delete, m);
    }
    delete this;
  }
}
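// Illustrative sketch (not part of the upstream file): how a caller drives
// the two-level reference counting above. Unref drops the version's own
// refcount; when it hits zero, each contained memtable is Unref'd and any
// memtable whose last reference was dropped is handed back in to_delete for
// the caller to free. Names and locking are simplified here.
//
//   autovector<MemTable*> to_delete;
//   version->Ref();                  // pin a snapshot of the list
//   // ... read through version->memlist_ ...
//   version->Unref(&to_delete);      // may `delete` the version itself
//   for (MemTable* m : to_delete) {
//     delete m;                      // last reference was dropped above
//   }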

int MemTableList::NumNotFlushed() const {
  int size = static_cast<int>(current_->memlist_.size());
  assert(num_flush_not_started_ <= size);
  return size;
}

int MemTableList::NumFlushed() const {
  return static_cast<int>(current_->memlist_history_.size());
}

// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// merge_context accumulates the merge operands found so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              Status* s, MergeContext* merge_context,
                              SequenceNumber* max_covering_tombstone_seq,
                              SequenceNumber* seq, const ReadOptions& read_opts,
                              ReadCallback* callback, bool* is_blob_index) {
  return GetFromList(&memlist_, key, value, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts, callback,
                     is_blob_index);
}

void MemTableListVersion::MultiGet(const ReadOptions& read_options,
                                   MultiGetRange* range, ReadCallback* callback,
                                   bool* is_blob) {
  for (auto memtable : memlist_) {
    memtable->MultiGet(read_options, range, callback, is_blob);
    if (range->empty()) {
      return;
    }
  }
}

bool MemTableListVersion::GetMergeOperands(
    const LookupKey& key, Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
  for (MemTable* memtable : memlist_) {
    bool done = memtable->Get(key, nullptr, s, merge_context,
                              max_covering_tombstone_seq, read_opts, nullptr,
                              nullptr, false);
    if (done) {
      return true;
    }
  }
  return false;
}

bool MemTableListVersion::GetFromHistory(
    const LookupKey& key, std::string* value, Status* s,
    MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
    SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
  return GetFromList(&memlist_history_, key, value, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts,
                     nullptr /*read_callback*/, is_blob_index);
}

bool MemTableListVersion::GetFromList(
    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
    Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  *seq = kMaxSequenceNumber;

  for (auto& memtable : *list) {
    SequenceNumber current_seq = kMaxSequenceNumber;

    bool done =
        memtable->Get(key, value, s, merge_context, max_covering_tombstone_seq,
                      &current_seq, read_opts, callback, is_blob_index);
    if (*seq == kMaxSequenceNumber) {
      // Store the most recent sequence number of any operation on this key.
      // Since we only care about the most recent change, we only need to
      // return the first operation found when searching memtables in
      // reverse-chronological order.
      // current_seq stays kMaxSequenceNumber if the value was skipped, which
      // allows seq to be assigned again when the next value is read.
      *seq = current_seq;
    }

    if (done) {
      assert(*seq != kMaxSequenceNumber || s->IsNotFound());
      return true;
    }
    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
      return false;
    }
  }
  return false;
}
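// Example of the *seq bookkeeping above (illustrative numbers): suppose key
// "k" was written at seq=100 in the newest memtable and at seq=50 in an
// older one. Searching newest-to-oldest, the first Get() that sees "k"
// reports current_seq=100; since *seq is still kMaxSequenceNumber at that
// point, it is latched to 100 and never overwritten by the older seq=50
// entry, so the caller observes the most recent operation on the key.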

Status MemTableListVersion::AddRangeTombstoneIterators(
    const ReadOptions& read_opts, Arena* /*arena*/,
    RangeDelAggregator* range_del_agg) {
  assert(range_del_agg != nullptr);
  // Except for snapshot read, using kMaxSequenceNumber is OK because these
  // are immutable memtables.
  SequenceNumber read_seq = read_opts.snapshot != nullptr
                                ? read_opts.snapshot->GetSequenceNumber()
                                : kMaxSequenceNumber;
  for (auto& m : memlist_) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        m->NewRangeTombstoneIterator(read_opts, read_seq));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  return Status::OK();
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
    Arena* arena) {
  for (auto& m : memlist_) {
    iterator_list->push_back(m->NewIterator(options, arena));
  }
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
  for (auto& m : memlist_) {
    merge_iter_builder->AddIterator(
        m->NewIterator(options, merge_iter_builder->GetArena()));
  }
}

uint64_t MemTableListVersion::GetTotalNumEntries() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_entries();
  }
  return total_num;
}

MemTable::MemTableStats MemTableListVersion::ApproximateStats(
    const Slice& start_ikey, const Slice& end_ikey) {
  MemTable::MemTableStats total_stats = {0, 0};
  for (auto& m : memlist_) {
    auto mStats = m->ApproximateStats(start_ikey, end_ikey);
    total_stats.size += mStats.size;
    total_stats.count += mStats.count;
  }
  return total_stats;
}

uint64_t MemTableListVersion::GetTotalNumDeletes() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_deletes();
  }
  return total_num;
}

SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
    bool include_history) const {
  if (include_history && !memlist_history_.empty()) {
    return memlist_history_.back()->GetEarliestSequenceNumber();
  } else if (!memlist_.empty()) {
    return memlist_.back()->GetEarliestSequenceNumber();
  } else {
    return kMaxSequenceNumber;
  }
}

// Caller is responsible for referencing m.
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  AddMemTable(m);

  TrimHistory(to_delete, m->ApproximateMemoryUsage());
}

// Removes m from the list of memtables not flushed. Caller should NOT Unref m.
void MemTableListVersion::Remove(MemTable* m,
                                 autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.remove(m);

  m->MarkFlushed();
  if (max_write_buffer_size_to_maintain_ > 0 ||
      max_write_buffer_number_to_maintain_ > 0) {
    memlist_history_.push_front(m);
    // The size of the mutable memtable is not available at this point; pass
    // 0 to TrimHistory as a best effort.
    TrimHistory(to_delete, 0);
  } else {
    UnrefMemTable(to_delete, m);
  }
}

// Returns the total memory usage assuming the oldest flushed memtable is
// dropped.
size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() const {
  size_t total_memtable_size = 0;
  for (auto& memtable : memlist_) {
    total_memtable_size += memtable->ApproximateMemoryUsage();
  }
  for (auto& memtable : memlist_history_) {
    total_memtable_size += memtable->ApproximateMemoryUsage();
  }
  if (!memlist_history_.empty()) {
    total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage();
  }
  return total_memtable_size;
}

bool MemTableListVersion::MemtableLimitExceeded(size_t usage) {
  if (max_write_buffer_size_to_maintain_ > 0) {
    // Calculate the total memory usage after dropping the oldest flushed
    // memtable and compare it with max_write_buffer_size_to_maintain_ to
    // decide whether to trim history.
    return ApproximateMemoryUsageExcludingLast() + usage >=
           static_cast<size_t>(max_write_buffer_size_to_maintain_);
  } else if (max_write_buffer_number_to_maintain_ > 0) {
    return memlist_.size() + memlist_history_.size() >
           static_cast<size_t>(max_write_buffer_number_to_maintain_);
  } else {
    return false;
  }
}
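// Worked example (illustrative numbers): with
// max_write_buffer_size_to_maintain_ = 64MB, two unflushed immutable
// memtables of 32MB each and one 16MB flushed memtable in history,
// ApproximateMemoryUsageExcludingLast() = 32 + 32 + 16 - 16 = 64MB, so
// 64MB + usage >= 64MB already holds and TrimHistory() below will keep
// dropping the oldest history entry until the inequality no longer holds
// or the history is empty.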

// Make sure we don't use up too much space in history.
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
                                      size_t usage) {
  while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) {
    MemTable* x = memlist_history_.back();
    memlist_history_.pop_back();

    UnrefMemTable(to_delete, x);
  }
}

// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ > 0) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}
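// Example (illustrative): with min_write_buffer_number_to_merge_ = 2, a
// single unflushed immutable memtable does not make a flush pending by
// itself; it becomes pending once either a second unflushed immutable
// memtable arrives or an explicit flush request sets flush_requested_.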

// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
                                        autovector<MemTable*>* ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  bool atomic_flush = false;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
      atomic_flush = true;
    }
    if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
      break;
    }
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
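  // Note: when any picked memtable belongs to an atomic flush
  // (atomic_flush_seqno_ is set), flush_requested_ is kept set below unless
  // nothing remains to pick, so the rest of the atomic group can still be
  // scheduled.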
  if (!atomic_flush || num_flush_not_started_ == 0) {
    flush_requested_ = false;  // start-flush request is complete
  }
}

void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
                                         uint64_t /*file_number*/) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
  assert(!mems.empty());

  // If the flush was not successful, then just reset state.
  // Maybe a succeeding attempt to flush will be successful.
  for (MemTable* m : mems) {
    assert(m->flush_in_progress_);
    assert(m->file_number_ == 0);

    m->flush_in_progress_ = false;
    m->flush_completed_ = false;
    m->edit_.Clear();
    num_flush_not_started_++;
  }
  imm_flush_needed.store(true, std::memory_order_release);
}

// Tries to record a successful flush in the manifest file. It might just
// return Status::OK, letting a concurrent flush do the actual recording.
Status MemTableList::TryInstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
    VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer,
    std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // Flush was successful.
  // Record the status on the memtable object. Either this call or a call by a
  // concurrent flush thread will read the status and write it to the manifest.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // If some other thread is already committing, then return.
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code.
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing the manifest, during which the mutex
  // is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // The back is the oldest; if its flush_completed_ is not set, it means
    // this thread was assigned a more recent memtable. The memtables' flushes
    // must be recorded in the manifest in order. The concurrent flush thread
    // that was assigned the oldest memtable will later wake up and perform
    // all the pending manifest writes, in order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // Scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    autovector<MemTable*> memtables_to_flush;
    // Enumerate from the last (earliest) element to see how many batches have
    // finished.
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
        memtables_to_flush.push_back(m);
#ifndef ROCKSDB_LITE
        std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
        if (info != nullptr) {
          committed_flush_jobs_info->push_back(std::move(info));
        }
#else
        (void)committed_flush_jobs_info;
#endif  // !ROCKSDB_LITE
      }
      batch_count++;
    }

    // TODO(myabandeh): Not sure how batch_count could be 0 here.
    if (batch_count > 0) {
      if (vset->db_options()->allow_2pc) {
        assert(edit_list.size() > 0);
        // We piggyback the information of the earliest log file to keep in
        // the manifest entry for the last file flushed.
        edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
            vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
      }

      // This can release and reacquire the mutex.
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);

      // We will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable.
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.

      // Commit new state only if the column family is NOT dropped.
      // The reason is as follows (refer to
      // ColumnFamilyTest.FlushAndDropRaceCondition).
      // If the column family is dropped, then according to LogAndApply, its
      // corresponding flush operation is NOT written to the MANIFEST. This
      // means the DB is not aware of the L0 files generated from the flush.
      // By committing the new state, we remove the memtable from the memtable
      // list. Creating an iterator on this column family will not be able to
      // read full data since the memtable is removed, and the DB is not aware
      // of the L0 files, leaving MergingIterator unable to build child
      // iterators. The RocksDB contract requires that an iterator can be
      // created on a dropped column family, and that we must be able to read
      // full data as long as the column family handle is not deleted, even if
      // the column family is dropped.
      if (s.ok() && !cfd->IsDropped()) {  // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          UpdateCachedValuesFromMemTableListVersion();
          ResetTrimHistoryNeeded();
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
          MemTable* m = *it;
          // Commit failed. Set up state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}

// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
  InstallNewVersion();
  // This method is used to move a mutable memtable into the immutable list.
  // Since the mutable memtable is already refcounted by the DBImpl, and we
  // do not unref it when moving it to the immutable list, we do not have to
  // ref the memtable here: we just take over the reference from the DBImpl.
  current_->Add(m, to_delete);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    imm_flush_needed.store(true, std::memory_order_release);
  }
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
  InstallNewVersion();
  current_->TrimHistory(to_delete, usage);
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
  size_t total_size = 0;
  for (auto& memtable : current_->memlist_) {
    total_size += memtable->ApproximateMemoryUsage();
  }
  return total_size;
}

size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }

size_t MemTableList::ApproximateMemoryUsageExcludingLast() const {
  const size_t usage =
      current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
  return usage;
}

bool MemTableList::HasHistory() const {
  const bool has_history = current_has_history_.load(std::memory_order_relaxed);
  return has_history;
}

void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
  const size_t total_memtable_size =
      current_->ApproximateMemoryUsageExcludingLast();
  current_memory_usage_excluding_last_.store(total_memtable_size,
                                             std::memory_order_relaxed);

  const bool has_history = current_->HasHistory();
  current_has_history_.store(has_history, std::memory_order_relaxed);
}

uint64_t MemTableList::ApproximateOldestKeyTime() const {
  if (!current_->memlist_.empty()) {
    return current_->memlist_.back()->ApproximateOldestKeyTime();
  }
  return std::numeric_limits<uint64_t>::max();
}

void MemTableList::InstallNewVersion() {
  if (current_->refs_ == 1) {
    // We're the only one using the version, so just keep using it.
  } else {
    // Somebody else holds the current version; we need to create a new one.
    MemTableListVersion* version = current_;
    current_ = new MemTableListVersion(&current_memory_usage_, current_);
    current_->Ref();
    version->Unref();
  }
}
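// Illustrative sketch of the copy-on-write scheme above (simplified; real
// callers hold the DB mutex and typically pin the list version through a
// SuperVersion rather than directly):
//
//   imm->InstallNewVersion();           // current_ now has refs_ == 1
//   imm->current_->Add(m, &to_delete);  // mutate only the unshared version
//
//   // A reader pins the version it sees; later mutations build a new
//   // version instead of touching the pinned one.
//   MemTableListVersion* v = imm->current();
//   v->Ref();
//   // ... read ...
//   v->Unref(&to_delete);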

uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
    const autovector<MemTable*>& memtables_to_flush) {
  uint64_t min_log = 0;

  for (auto& m : current_->memlist_) {
    // Assume the list is very short, so we can live with O(m*n). We can
    // optimize if the performance becomes a problem.
    bool should_skip = false;
    for (MemTable* m_to_flush : memtables_to_flush) {
      if (m == m_to_flush) {
        should_skip = true;
        break;
      }
    }
    if (should_skip) {
      continue;
    }

    auto log = m->GetMinLogContainingPrepSection();

    if (log > 0 && (min_log == 0 || log < min_log)) {
      min_log = log;
    }
  }

  return min_log;
}
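// Behavioral note (an interpretation, not from the original comments):
// min_log == 0 means no memtable outside memtables_to_flush still pins a WAL
// for a 2PC prepared section; a nonzero value is the oldest such WAL. This
// feeds the minimum-log-to-keep computation used above via
// PrecomputeMinLogNumberToKeep in TryInstallMemtableFlushResults.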

// Commit a successful atomic flush in the manifest file.
Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
    autovector<MemTable*>* to_delete, Directory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  size_t num = mems_list.size();
  assert(cfds.size() == num);
  if (imm_lists != nullptr) {
    assert(imm_lists->size() == num);
  }
  for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
    const auto* imm =
        (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    if (!mems_list[k]->empty()) {
      assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
    }
#endif
    assert(nullptr != file_metas[k]);
    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
      (*mems_list[k])[i]->SetFlushCompleted(true);
      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
    }
  }

  Status s;

  autovector<autovector<VersionEdit*>> edit_lists;
  uint32_t num_entries = 0;
  for (const auto mems : mems_list) {
    assert(mems != nullptr);
    autovector<VersionEdit*> edits;
    assert(!mems->empty());
    edits.emplace_back((*mems)[0]->GetEdits());
    ++num_entries;
    edit_lists.emplace_back(edits);
  }
  // Mark the version edits as an atomic group if the number of version edits
  // exceeds 1.
  if (cfds.size() > 1) {
    for (auto& edits : edit_lists) {
      assert(edits.size() == 1);
      edits[0]->MarkAtomicGroup(--num_entries);
    }
    assert(0 == num_entries);
  }
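  // Example of the countdown above (illustrative): with three column
  // families, the three edits are marked with remaining counts 2, 1 and 0;
  // a reader of the manifest knows the atomic group is complete when it
  // reaches the edit whose remaining count is 0.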

  // This can release and reacquire the mutex.
  s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
                        db_directory);

  for (size_t k = 0; k != cfds.size(); ++k) {
    auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    imm->InstallNewVersion();
  }

  if (s.ok() || s.IsColumnFamilyDropped()) {
    for (size_t i = 0; i != cfds.size(); ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        assert(m->GetFileNumber() > 0);
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " done",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        imm->current_->Remove(m, to_delete);
        imm->UpdateCachedValuesFromMemTableListVersion();
        imm->ResetTrimHistoryNeeded();
      }
    }
  } else {
    for (size_t i = 0; i != cfds.size(); ++i) {
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " failed",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        m->SetFlushCompleted(false);
        m->SetFlushInProgress(false);
        m->GetEdits()->Clear();
        m->SetFileNumber(0);
        imm->num_flush_not_started_++;
      }
      imm->imm_flush_needed.store(true, std::memory_order_release);
    }
  }

  return s;
}

void MemTableList::RemoveOldMemTables(uint64_t log_number,
                                      autovector<MemTable*>* to_delete) {
  assert(to_delete != nullptr);
  InstallNewVersion();
  auto& memlist = current_->memlist_;
  autovector<MemTable*> old_memtables;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* mem = *it;
    if (mem->GetNextLogNumber() > log_number) {
      break;
    }
    old_memtables.push_back(mem);
  }

  for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) {
    MemTable* mem = *it;
    current_->Remove(mem, to_delete);
    --num_flush_not_started_;
    if (0 == num_flush_not_started_) {
      imm_flush_needed.store(false, std::memory_order_release);
    }
  }

  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

}  // namespace rocksdb