// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch holds a collection of updates to apply atomically to a DB.
//
// The updates are applied in the order in which they are added
// to the WriteBatch.  For example, the value of "key" will be "v3"
// after the following batch is written:
//
//    batch.Put("key", "v1");
//    batch.Delete("key");
//    batch.Put("key", "v2");
//    batch.Put("key", "v3");
//
// Multiple threads can invoke const methods on a WriteBatch without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same WriteBatch must use
// external synchronization.

24 25 #pragma once 26 27 #include <stdint.h> 28 #include <atomic> 29 #include <memory> 30 #include <string> 31 #include <vector> 32 #include "rocksdb/status.h" 33 #include "rocksdb/write_batch_base.h" 34 35 namespace rocksdb { 36 37 class Slice; 38 class ColumnFamilyHandle; 39 struct SavePoints; 40 struct SliceParts; 41 42 struct SavePoint { 43 size_t size; // size of rep_ 44 int count; // count of elements in rep_ 45 uint32_t content_flags; 46 SavePointSavePoint47 SavePoint() : size(0), count(0), content_flags(0) {} 48 SavePointSavePoint49 SavePoint(size_t _size, int _count, uint32_t _flags) 50 : size(_size), count(_count), content_flags(_flags) {} 51 clearSavePoint52 void clear() { 53 size = 0; 54 count = 0; 55 content_flags = 0; 56 } 57 is_clearedSavePoint58 bool is_cleared() const { return (size | count | content_flags) == 0; } 59 }; 60 61 class WriteBatch : public WriteBatchBase { 62 public: 63 explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0); 64 explicit WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz); 65 ~WriteBatch() override; 66 67 using WriteBatchBase::Put; 68 // Store the mapping "key->value" in the database. 69 Status Put(ColumnFamilyHandle* column_family, const Slice& key, 70 const Slice& value) override; Put(const Slice & key,const Slice & value)71 Status Put(const Slice& key, const Slice& value) override { 72 return Put(nullptr, key, value); 73 } 74 75 // Variant of Put() that gathers output like writev(2). The key and value 76 // that will be written to the database are concatenations of arrays of 77 // slices. 78 Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, 79 const SliceParts& value) override; Put(const SliceParts & key,const SliceParts & value)80 Status Put(const SliceParts& key, const SliceParts& value) override { 81 return Put(nullptr, key, value); 82 } 83 84 using WriteBatchBase::Delete; 85 // If the database contains a mapping for "key", erase it. Else do nothing. 
86 Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; Delete(const Slice & key)87 Status Delete(const Slice& key) override { return Delete(nullptr, key); } 88 89 // variant that takes SliceParts 90 Status Delete(ColumnFamilyHandle* column_family, 91 const SliceParts& key) override; Delete(const SliceParts & key)92 Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } 93 94 using WriteBatchBase::SingleDelete; 95 // WriteBatch implementation of DB::SingleDelete(). See db.h. 96 Status SingleDelete(ColumnFamilyHandle* column_family, 97 const Slice& key) override; SingleDelete(const Slice & key)98 Status SingleDelete(const Slice& key) override { 99 return SingleDelete(nullptr, key); 100 } 101 102 // variant that takes SliceParts 103 Status SingleDelete(ColumnFamilyHandle* column_family, 104 const SliceParts& key) override; SingleDelete(const SliceParts & key)105 Status SingleDelete(const SliceParts& key) override { 106 return SingleDelete(nullptr, key); 107 } 108 109 using WriteBatchBase::DeleteRange; 110 // WriteBatch implementation of DB::DeleteRange(). See db.h. 111 Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, 112 const Slice& end_key) override; DeleteRange(const Slice & begin_key,const Slice & end_key)113 Status DeleteRange(const Slice& begin_key, const Slice& end_key) override { 114 return DeleteRange(nullptr, begin_key, end_key); 115 } 116 117 // variant that takes SliceParts 118 Status DeleteRange(ColumnFamilyHandle* column_family, 119 const SliceParts& begin_key, 120 const SliceParts& end_key) override; DeleteRange(const SliceParts & begin_key,const SliceParts & end_key)121 Status DeleteRange(const SliceParts& begin_key, 122 const SliceParts& end_key) override { 123 return DeleteRange(nullptr, begin_key, end_key); 124 } 125 126 using WriteBatchBase::Merge; 127 // Merge "value" with the existing value of "key" in the database. 
128 // "key->merge(existing, value)" 129 Status Merge(ColumnFamilyHandle* column_family, const Slice& key, 130 const Slice& value) override; Merge(const Slice & key,const Slice & value)131 Status Merge(const Slice& key, const Slice& value) override { 132 return Merge(nullptr, key, value); 133 } 134 135 // variant that takes SliceParts 136 Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key, 137 const SliceParts& value) override; Merge(const SliceParts & key,const SliceParts & value)138 Status Merge(const SliceParts& key, const SliceParts& value) override { 139 return Merge(nullptr, key, value); 140 } 141 142 using WriteBatchBase::PutLogData; 143 // Append a blob of arbitrary size to the records in this batch. The blob will 144 // be stored in the transaction log but not in any other file. In particular, 145 // it will not be persisted to the SST files. When iterating over this 146 // WriteBatch, WriteBatch::Handler::LogData will be called with the contents 147 // of the blob as it is encountered. Blobs, puts, deletes, and merges will be 148 // encountered in the same order in which they were inserted. The blob will 149 // NOT consume sequence number(s) and will NOT increase the count of the batch 150 // 151 // Example application: add timestamps to the transaction log for use in 152 // replication. 153 Status PutLogData(const Slice& blob) override; 154 155 using WriteBatchBase::Clear; 156 // Clear all updates buffered in this batch. 157 void Clear() override; 158 159 // Records the state of the batch for future calls to RollbackToSavePoint(). 160 // May be called multiple times to set multiple save points. 161 void SetSavePoint() override; 162 163 // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the 164 // most recent call to SetSavePoint() and removes the most recent save point. 165 // If there is no previous call to SetSavePoint(), Status::NotFound() 166 // will be returned. 167 // Otherwise returns Status::OK(). 
168 Status RollbackToSavePoint() override; 169 170 // Pop the most recent save point. 171 // If there is no previous call to SetSavePoint(), Status::NotFound() 172 // will be returned. 173 // Otherwise returns Status::OK(). 174 Status PopSavePoint() override; 175 176 // Support for iterating over the contents of a batch. 177 class Handler { 178 public: 179 virtual ~Handler(); 180 // All handler functions in this class provide default implementations so 181 // we won't break existing clients of Handler on a source code level when 182 // adding a new member function. 183 184 // default implementation will just call Put without column family for 185 // backwards compatibility. If the column family is not default, 186 // the function is noop PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)187 virtual Status PutCF(uint32_t column_family_id, const Slice& key, 188 const Slice& value) { 189 if (column_family_id == 0) { 190 // Put() historically doesn't return status. We didn't want to be 191 // backwards incompatible so we didn't change the return status 192 // (this is a public API). 
We do an ordinary get and return Status::OK() 193 Put(key, value); 194 return Status::OK(); 195 } 196 return Status::InvalidArgument( 197 "non-default column family and PutCF not implemented"); 198 } Put(const Slice &,const Slice &)199 virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {} 200 DeleteCF(uint32_t column_family_id,const Slice & key)201 virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { 202 if (column_family_id == 0) { 203 Delete(key); 204 return Status::OK(); 205 } 206 return Status::InvalidArgument( 207 "non-default column family and DeleteCF not implemented"); 208 } Delete(const Slice &)209 virtual void Delete(const Slice& /*key*/) {} 210 SingleDeleteCF(uint32_t column_family_id,const Slice & key)211 virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) { 212 if (column_family_id == 0) { 213 SingleDelete(key); 214 return Status::OK(); 215 } 216 return Status::InvalidArgument( 217 "non-default column family and SingleDeleteCF not implemented"); 218 } SingleDelete(const Slice &)219 virtual void SingleDelete(const Slice& /*key*/) {} 220 DeleteRangeCF(uint32_t,const Slice &,const Slice &)221 virtual Status DeleteRangeCF(uint32_t /*column_family_id*/, 222 const Slice& /*begin_key*/, 223 const Slice& /*end_key*/) { 224 return Status::InvalidArgument("DeleteRangeCF not implemented"); 225 } 226 MergeCF(uint32_t column_family_id,const Slice & key,const Slice & value)227 virtual Status MergeCF(uint32_t column_family_id, const Slice& key, 228 const Slice& value) { 229 if (column_family_id == 0) { 230 Merge(key, value); 231 return Status::OK(); 232 } 233 return Status::InvalidArgument( 234 "non-default column family and MergeCF not implemented"); 235 } Merge(const Slice &,const Slice &)236 virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {} 237 PutBlobIndexCF(uint32_t,const Slice &,const Slice &)238 virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/, 239 const Slice& /*key*/, 240 const 
Slice& /*value*/) { 241 return Status::InvalidArgument("PutBlobIndexCF not implemented"); 242 } 243 244 // The default implementation of LogData does nothing. 245 virtual void LogData(const Slice& blob); 246 247 virtual Status MarkBeginPrepare(bool = false) { 248 return Status::InvalidArgument("MarkBeginPrepare() handler not defined."); 249 } 250 MarkEndPrepare(const Slice &)251 virtual Status MarkEndPrepare(const Slice& /*xid*/) { 252 return Status::InvalidArgument("MarkEndPrepare() handler not defined."); 253 } 254 MarkNoop(bool)255 virtual Status MarkNoop(bool /*empty_batch*/) { 256 return Status::InvalidArgument("MarkNoop() handler not defined."); 257 } 258 MarkRollback(const Slice &)259 virtual Status MarkRollback(const Slice& /*xid*/) { 260 return Status::InvalidArgument( 261 "MarkRollbackPrepare() handler not defined."); 262 } 263 MarkCommit(const Slice &)264 virtual Status MarkCommit(const Slice& /*xid*/) { 265 return Status::InvalidArgument("MarkCommit() handler not defined."); 266 } 267 268 // Continue is called by WriteBatch::Iterate. If it returns false, 269 // iteration is halted. Otherwise, it continues iterating. The default 270 // implementation always returns true. 271 virtual bool Continue(); 272 273 protected: 274 friend class WriteBatchInternal; WriteAfterCommit()275 virtual bool WriteAfterCommit() const { return true; } WriteBeforePrepare()276 virtual bool WriteBeforePrepare() const { return false; } 277 }; 278 Status Iterate(Handler* handler) const; 279 280 // Retrieve the serialized version of this batch. Data()281 const std::string& Data() const { return rep_; } 282 283 // Retrieve data size of the batch. 
GetDataSize()284 size_t GetDataSize() const { return rep_.size(); } 285 286 // Returns the number of updates in the batch 287 uint32_t Count() const; 288 289 // Returns true if PutCF will be called during Iterate 290 bool HasPut() const; 291 292 // Returns true if DeleteCF will be called during Iterate 293 bool HasDelete() const; 294 295 // Returns true if SingleDeleteCF will be called during Iterate 296 bool HasSingleDelete() const; 297 298 // Returns true if DeleteRangeCF will be called during Iterate 299 bool HasDeleteRange() const; 300 301 // Returns true if MergeCF will be called during Iterate 302 bool HasMerge() const; 303 304 // Returns true if MarkBeginPrepare will be called during Iterate 305 bool HasBeginPrepare() const; 306 307 // Returns true if MarkEndPrepare will be called during Iterate 308 bool HasEndPrepare() const; 309 310 // Returns trie if MarkCommit will be called during Iterate 311 bool HasCommit() const; 312 313 // Returns trie if MarkRollback will be called during Iterate 314 bool HasRollback() const; 315 316 // Assign timestamp to write batch 317 Status AssignTimestamp(const Slice& ts); 318 319 // Assign timestamps to write batch 320 Status AssignTimestamps(const std::vector<Slice>& ts_list); 321 322 using WriteBatchBase::GetWriteBatch; GetWriteBatch()323 WriteBatch* GetWriteBatch() override { return this; } 324 325 // Constructor with a serialized string object 326 explicit WriteBatch(const std::string& rep); 327 explicit WriteBatch(std::string&& rep); 328 329 WriteBatch(const WriteBatch& src); 330 WriteBatch(WriteBatch&& src) noexcept; 331 WriteBatch& operator=(const WriteBatch& src); 332 WriteBatch& operator=(WriteBatch&& src); 333 334 // marks this point in the WriteBatch as the last record to 335 // be inserted into the WAL, provided the WAL is enabled 336 void MarkWalTerminationPoint(); GetWalTerminationPoint()337 const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; } 338 SetMaxBytes(size_t max_bytes)339 void 
SetMaxBytes(size_t max_bytes) override { max_bytes_ = max_bytes; } 340 341 private: 342 friend class WriteBatchInternal; 343 friend class LocalSavePoint; 344 // TODO(myabandeh): this is needed for a hack to collapse the write batch and 345 // remove duplicate keys. Remove it when the hack is replaced with a proper 346 // solution. 347 friend class WriteBatchWithIndex; 348 std::unique_ptr<SavePoints> save_points_; 349 350 // When sending a WriteBatch through WriteImpl we might want to 351 // specify that only the first x records of the batch be written to 352 // the WAL. 353 SavePoint wal_term_point_; 354 355 // For HasXYZ. Mutable to allow lazy computation of results 356 mutable std::atomic<uint32_t> content_flags_; 357 358 // Performs deferred computation of content_flags if necessary 359 uint32_t ComputeContentFlags() const; 360 361 // Maximum size of rep_. 362 size_t max_bytes_; 363 364 // Is the content of the batch the application's latest state that meant only 365 // to be used for recovery? Refer to 366 // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery for 367 // more details. 368 bool is_latest_persistent_state_ = false; 369 370 protected: 371 std::string rep_; // See comment in write_batch.cc for the format of rep_ 372 const size_t timestamp_size_; 373 374 // Intentionally copyable 375 }; 376 377 } // namespace rocksdb 378