1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 10 #pragma once 11 #include <vector> 12 #include "db/flush_scheduler.h" 13 #include "db/trim_history_scheduler.h" 14 #include "db/write_thread.h" 15 #include "rocksdb/db.h" 16 #include "rocksdb/options.h" 17 #include "rocksdb/types.h" 18 #include "rocksdb/write_batch.h" 19 #include "util/autovector.h" 20 21 namespace ROCKSDB_NAMESPACE { 22 23 class MemTable; 24 class FlushScheduler; 25 class ColumnFamilyData; 26 27 class ColumnFamilyMemTables { 28 public: ~ColumnFamilyMemTables()29 virtual ~ColumnFamilyMemTables() {} 30 virtual bool Seek(uint32_t column_family_id) = 0; 31 // returns true if the update to memtable should be ignored 32 // (useful when recovering from log whose updates have already 33 // been processed) 34 virtual uint64_t GetLogNumber() const = 0; 35 virtual MemTable* GetMemTable() const = 0; 36 virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; current()37 virtual ColumnFamilyData* current() { return nullptr; } 38 }; 39 40 class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { 41 public: ColumnFamilyMemTablesDefault(MemTable * mem)42 explicit ColumnFamilyMemTablesDefault(MemTable* mem) 43 : ok_(false), mem_(mem) {} 44 Seek(uint32_t column_family_id)45 bool Seek(uint32_t column_family_id) override { 46 ok_ = (column_family_id == 0); 47 return ok_; 48 } 49 GetLogNumber()50 uint64_t GetLogNumber() const override { return 0; } 51 GetMemTable()52 MemTable* GetMemTable() const override { 53 assert(ok_); 54 return mem_; 55 } 56 GetColumnFamilyHandle()57 ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; } 58 59 private: 60 bool ok_; 61 MemTable* mem_; 62 }; 63 64 // WriteBatchInternal provides static methods for manipulating a 65 // WriteBatch that we don't want in the public WriteBatch interface. 66 class WriteBatchInternal { 67 public: 68 69 // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. 70 static const size_t kHeader = 12; 71 72 // WriteBatch methods with column_family_id instead of ColumnFamilyHandle* 73 static Status Put(WriteBatch* batch, uint32_t column_family_id, 74 const Slice& key, const Slice& value); 75 76 static Status Put(WriteBatch* batch, uint32_t column_family_id, 77 const SliceParts& key, const SliceParts& value); 78 79 static Status Delete(WriteBatch* batch, uint32_t column_family_id, 80 const SliceParts& key); 81 82 static Status Delete(WriteBatch* batch, uint32_t column_family_id, 83 const Slice& key); 84 85 static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id, 86 const SliceParts& key); 87 88 static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id, 89 const Slice& key); 90 91 static Status DeleteRange(WriteBatch* b, uint32_t column_family_id, 92 const Slice& begin_key, const Slice& end_key); 93 94 static Status DeleteRange(WriteBatch* b, uint32_t column_family_id, 95 const SliceParts& begin_key, 96 const SliceParts& end_key); 97 98 static Status Merge(WriteBatch* batch, uint32_t column_family_id, 99 const Slice& key, const Slice& value); 100 101 static Status Merge(WriteBatch* batch, uint32_t column_family_id, 102 const SliceParts& key, const SliceParts& value); 103 104 static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, 105 const Slice& key, const Slice& value); 106 107 static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid, 108 const bool write_after_commit = true, 109 const bool unprepared_batch = false); 110 111 static Status MarkRollback(WriteBatch* batch, const Slice& xid); 112 113 static Status MarkCommit(WriteBatch* batch, const Slice& xid); 114 115 static Status InsertNoop(WriteBatch* batch); 116 117 // Return the number of entries in the batch. 118 static uint32_t Count(const WriteBatch* batch); 119 120 // Set the count for the number of entries in the batch. 121 static void SetCount(WriteBatch* batch, uint32_t n); 122 123 // Return the sequence number for the start of this batch. 124 static SequenceNumber Sequence(const WriteBatch* batch); 125 126 // Store the specified number as the sequence number for the start of 127 // this batch. 128 static void SetSequence(WriteBatch* batch, SequenceNumber seq); 129 130 // Returns the offset of the first entry in the batch. 131 // This offset is only valid if the batch is not empty. 132 static size_t GetFirstOffset(WriteBatch* batch); 133 Contents(const WriteBatch * batch)134 static Slice Contents(const WriteBatch* batch) { 135 return Slice(batch->rep_); 136 } 137 ByteSize(const WriteBatch * batch)138 static size_t ByteSize(const WriteBatch* batch) { 139 return batch->rep_.size(); 140 } 141 142 static Status SetContents(WriteBatch* batch, const Slice& contents); 143 144 static Status CheckSlicePartsLength(const SliceParts& key, 145 const SliceParts& value); 146 147 // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive. 148 // 149 // If ignore_missing_column_families == true. WriteBatch 150 // referencing non-existing column family will be ignored. 151 // If ignore_missing_column_families == false, processing of the 152 // batches will be stopped if a reference is found to a non-existing 153 // column family and InvalidArgument() will be returned. The writes 154 // in batches may be only partially applied at that point. 155 // 156 // If log_number is non-zero, the memtable will be updated only if 157 // memtables->GetLogNumber() >= log_number. 158 // 159 // If flush_scheduler is non-null, it will be invoked if the memtable 160 // should be flushed. 161 // 162 // Under concurrent use, the caller is responsible for making sure that 163 // the memtables object itself is thread-local. 164 static Status InsertInto( 165 WriteThread::WriteGroup& write_group, SequenceNumber sequence, 166 ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, 167 TrimHistoryScheduler* trim_history_scheduler, 168 bool ignore_missing_column_families = false, uint64_t log_number = 0, 169 DB* db = nullptr, bool concurrent_memtable_writes = false, 170 bool seq_per_batch = false, bool batch_per_txn = true); 171 172 // Convenience form of InsertInto when you have only one batch 173 // next_seq returns the seq after last sequence number used in MemTable insert 174 static Status InsertInto( 175 const WriteBatch* batch, ColumnFamilyMemTables* memtables, 176 FlushScheduler* flush_scheduler, 177 TrimHistoryScheduler* trim_history_scheduler, 178 bool ignore_missing_column_families = false, uint64_t log_number = 0, 179 DB* db = nullptr, bool concurrent_memtable_writes = false, 180 SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr, 181 bool seq_per_batch = false, bool batch_per_txn = true); 182 183 static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence, 184 ColumnFamilyMemTables* memtables, 185 FlushScheduler* flush_scheduler, 186 TrimHistoryScheduler* trim_history_scheduler, 187 bool ignore_missing_column_families = false, 188 uint64_t log_number = 0, DB* db = nullptr, 189 bool concurrent_memtable_writes = false, 190 bool seq_per_batch = false, size_t batch_cnt = 0, 191 bool batch_per_txn = true, 192 bool hint_per_batch = false); 193 194 static Status Append(WriteBatch* dst, const WriteBatch* src, 195 const bool WAL_only = false); 196 197 // Returns the byte size of appending a WriteBatch with ByteSize 198 // leftByteSize and a WriteBatch with ByteSize rightByteSize 199 static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); 200 201 // Iterate over [begin, end) range of a write batch 202 static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler, 203 size_t begin, size_t end); 204 205 // This write batch includes the latest state that should be persisted. Such 206 // state meant to be used only during recovery. 207 static void SetAsLastestPersistentState(WriteBatch* b); 208 static bool IsLatestPersistentState(const WriteBatch* b); 209 }; 210 211 // LocalSavePoint is similar to a scope guard 212 class LocalSavePoint { 213 public: LocalSavePoint(WriteBatch * batch)214 explicit LocalSavePoint(WriteBatch* batch) 215 : batch_(batch), 216 savepoint_(batch->GetDataSize(), batch->Count(), 217 batch->content_flags_.load(std::memory_order_relaxed)) 218 #ifndef NDEBUG 219 , 220 committed_(false) 221 #endif 222 { 223 } 224 225 #ifndef NDEBUG ~LocalSavePoint()226 ~LocalSavePoint() { assert(committed_); } 227 #endif commit()228 Status commit() { 229 #ifndef NDEBUG 230 committed_ = true; 231 #endif 232 if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) { 233 batch_->rep_.resize(savepoint_.size); 234 WriteBatchInternal::SetCount(batch_, savepoint_.count); 235 batch_->content_flags_.store(savepoint_.content_flags, 236 std::memory_order_relaxed); 237 return Status::MemoryLimit(); 238 } 239 return Status::OK(); 240 } 241 242 private: 243 WriteBatch* batch_; 244 SavePoint savepoint_; 245 #ifndef NDEBUG 246 bool committed_; 247 #endif 248 }; 249 250 } // namespace ROCKSDB_NAMESPACE 251