1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #pragma once
11 #include <vector>
12 #include "db/flush_scheduler.h"
13 #include "db/trim_history_scheduler.h"
14 #include "db/write_thread.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/options.h"
17 #include "rocksdb/types.h"
18 #include "rocksdb/write_batch.h"
19 #include "util/autovector.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 class MemTable;
24 class FlushScheduler;
25 class ColumnFamilyData;
26 
27 class ColumnFamilyMemTables {
28  public:
~ColumnFamilyMemTables()29   virtual ~ColumnFamilyMemTables() {}
30   virtual bool Seek(uint32_t column_family_id) = 0;
31   // returns true if the update to memtable should be ignored
32   // (useful when recovering from log whose updates have already
33   // been processed)
34   virtual uint64_t GetLogNumber() const = 0;
35   virtual MemTable* GetMemTable() const = 0;
36   virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
current()37   virtual ColumnFamilyData* current() { return nullptr; }
38 };
39 
40 class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
41  public:
ColumnFamilyMemTablesDefault(MemTable * mem)42   explicit ColumnFamilyMemTablesDefault(MemTable* mem)
43       : ok_(false), mem_(mem) {}
44 
Seek(uint32_t column_family_id)45   bool Seek(uint32_t column_family_id) override {
46     ok_ = (column_family_id == 0);
47     return ok_;
48   }
49 
GetLogNumber()50   uint64_t GetLogNumber() const override { return 0; }
51 
GetMemTable()52   MemTable* GetMemTable() const override {
53     assert(ok_);
54     return mem_;
55   }
56 
GetColumnFamilyHandle()57   ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }
58 
59  private:
60   bool ok_;
61   MemTable* mem_;
62 };
63 
64 // WriteBatchInternal provides static methods for manipulating a
65 // WriteBatch that we don't want in the public WriteBatch interface.
66 class WriteBatchInternal {
67  public:
68 
69   // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
70   static const size_t kHeader = 12;
71 
72   // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
73   static Status Put(WriteBatch* batch, uint32_t column_family_id,
74                     const Slice& key, const Slice& value);
75 
76   static Status Put(WriteBatch* batch, uint32_t column_family_id,
77                     const SliceParts& key, const SliceParts& value);
78 
79   static Status Delete(WriteBatch* batch, uint32_t column_family_id,
80                        const SliceParts& key);
81 
82   static Status Delete(WriteBatch* batch, uint32_t column_family_id,
83                        const Slice& key);
84 
85   static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
86                              const SliceParts& key);
87 
88   static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
89                              const Slice& key);
90 
91   static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
92                             const Slice& begin_key, const Slice& end_key);
93 
94   static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
95                             const SliceParts& begin_key,
96                             const SliceParts& end_key);
97 
98   static Status Merge(WriteBatch* batch, uint32_t column_family_id,
99                       const Slice& key, const Slice& value);
100 
101   static Status Merge(WriteBatch* batch, uint32_t column_family_id,
102                       const SliceParts& key, const SliceParts& value);
103 
104   static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
105                              const Slice& key, const Slice& value);
106 
107   static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
108                                const bool write_after_commit = true,
109                                const bool unprepared_batch = false);
110 
111   static Status MarkRollback(WriteBatch* batch, const Slice& xid);
112 
113   static Status MarkCommit(WriteBatch* batch, const Slice& xid);
114 
115   static Status InsertNoop(WriteBatch* batch);
116 
117   // Return the number of entries in the batch.
118   static uint32_t Count(const WriteBatch* batch);
119 
120   // Set the count for the number of entries in the batch.
121   static void SetCount(WriteBatch* batch, uint32_t n);
122 
123   // Return the sequence number for the start of this batch.
124   static SequenceNumber Sequence(const WriteBatch* batch);
125 
126   // Store the specified number as the sequence number for the start of
127   // this batch.
128   static void SetSequence(WriteBatch* batch, SequenceNumber seq);
129 
130   // Returns the offset of the first entry in the batch.
131   // This offset is only valid if the batch is not empty.
132   static size_t GetFirstOffset(WriteBatch* batch);
133 
Contents(const WriteBatch * batch)134   static Slice Contents(const WriteBatch* batch) {
135     return Slice(batch->rep_);
136   }
137 
ByteSize(const WriteBatch * batch)138   static size_t ByteSize(const WriteBatch* batch) {
139     return batch->rep_.size();
140   }
141 
142   static Status SetContents(WriteBatch* batch, const Slice& contents);
143 
144   static Status CheckSlicePartsLength(const SliceParts& key,
145                                       const SliceParts& value);
146 
147   // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive.
148   //
149   // If ignore_missing_column_families == true. WriteBatch
150   // referencing non-existing column family will be ignored.
151   // If ignore_missing_column_families == false, processing of the
152   // batches will be stopped if a reference is found to a non-existing
153   // column family and InvalidArgument() will be returned.  The writes
154   // in batches may be only partially applied at that point.
155   //
156   // If log_number is non-zero, the memtable will be updated only if
157   // memtables->GetLogNumber() >= log_number.
158   //
159   // If flush_scheduler is non-null, it will be invoked if the memtable
160   // should be flushed.
161   //
162   // Under concurrent use, the caller is responsible for making sure that
163   // the memtables object itself is thread-local.
164   static Status InsertInto(
165       WriteThread::WriteGroup& write_group, SequenceNumber sequence,
166       ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
167       TrimHistoryScheduler* trim_history_scheduler,
168       bool ignore_missing_column_families = false, uint64_t log_number = 0,
169       DB* db = nullptr, bool concurrent_memtable_writes = false,
170       bool seq_per_batch = false, bool batch_per_txn = true);
171 
172   // Convenience form of InsertInto when you have only one batch
173   // next_seq returns the seq after last sequence number used in MemTable insert
174   static Status InsertInto(
175       const WriteBatch* batch, ColumnFamilyMemTables* memtables,
176       FlushScheduler* flush_scheduler,
177       TrimHistoryScheduler* trim_history_scheduler,
178       bool ignore_missing_column_families = false, uint64_t log_number = 0,
179       DB* db = nullptr, bool concurrent_memtable_writes = false,
180       SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr,
181       bool seq_per_batch = false, bool batch_per_txn = true);
182 
183   static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
184                            ColumnFamilyMemTables* memtables,
185                            FlushScheduler* flush_scheduler,
186                            TrimHistoryScheduler* trim_history_scheduler,
187                            bool ignore_missing_column_families = false,
188                            uint64_t log_number = 0, DB* db = nullptr,
189                            bool concurrent_memtable_writes = false,
190                            bool seq_per_batch = false, size_t batch_cnt = 0,
191                            bool batch_per_txn = true,
192                            bool hint_per_batch = false);
193 
194   static Status Append(WriteBatch* dst, const WriteBatch* src,
195                        const bool WAL_only = false);
196 
197   // Returns the byte size of appending a WriteBatch with ByteSize
198   // leftByteSize and a WriteBatch with ByteSize rightByteSize
199   static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
200 
201   // Iterate over [begin, end) range of a write batch
202   static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
203                         size_t begin, size_t end);
204 
205   // This write batch includes the latest state that should be persisted. Such
206   // state meant to be used only during recovery.
207   static void SetAsLastestPersistentState(WriteBatch* b);
208   static bool IsLatestPersistentState(const WriteBatch* b);
209 };
210 
211 // LocalSavePoint is similar to a scope guard
212 class LocalSavePoint {
213  public:
LocalSavePoint(WriteBatch * batch)214   explicit LocalSavePoint(WriteBatch* batch)
215       : batch_(batch),
216         savepoint_(batch->GetDataSize(), batch->Count(),
217                    batch->content_flags_.load(std::memory_order_relaxed))
218 #ifndef NDEBUG
219         ,
220         committed_(false)
221 #endif
222   {
223   }
224 
225 #ifndef NDEBUG
~LocalSavePoint()226   ~LocalSavePoint() { assert(committed_); }
227 #endif
commit()228   Status commit() {
229 #ifndef NDEBUG
230     committed_ = true;
231 #endif
232     if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
233       batch_->rep_.resize(savepoint_.size);
234       WriteBatchInternal::SetCount(batch_, savepoint_.count);
235       batch_->content_flags_.store(savepoint_.content_flags,
236                                    std::memory_order_relaxed);
237       return Status::MemoryLimit();
238     }
239     return Status::OK();
240   }
241 
242  private:
243   WriteBatch* batch_;
244   SavePoint savepoint_;
245 #ifndef NDEBUG
246   bool committed_;
247 #endif
248 };
249 
250 }  // namespace ROCKSDB_NAMESPACE
251