1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
6 // Use of this source code is governed by a BSD-style license that can be
7 // found in the LICENSE file. See the AUTHORS file for names of contributors.
8 //
9 // WriteBatch holds a collection of updates to apply atomically to a DB.
10 //
11 // The updates are applied in the order in which they are added
12 // to the WriteBatch.  For example, the value of "key" will be "v3"
13 // after the following batch is written:
14 //
15 //    batch.Put("key", "v1");
16 //    batch.Delete("key");
17 //    batch.Put("key", "v2");
18 //    batch.Put("key", "v3");
19 //
20 // Multiple threads can invoke const methods on a WriteBatch without
21 // external synchronization, but if any of the threads may call a
22 // non-const method, all threads accessing the same WriteBatch must use
23 // external synchronization.
24 
25 #pragma once
26 
27 #include <stdint.h>
28 #include <atomic>
29 #include <memory>
30 #include <string>
31 #include <vector>
32 #include "rocksdb/status.h"
33 #include "rocksdb/write_batch_base.h"
34 
35 namespace ROCKSDB_NAMESPACE {
36 
37 class Slice;
38 class ColumnFamilyHandle;
39 struct SavePoints;
40 struct SliceParts;
41 
// Snapshot of a WriteBatch's state, used to restore the batch to an earlier
// point (save points) or to mark how much of it should reach the WAL.
// A default-constructed SavePoint has every field zero, which is also the
// state `clear()` restores and `is_cleared()` tests for.
struct SavePoint {
  size_t size;  // size of rep_
  int count;    // count of elements in rep_
  uint32_t content_flags;

  // Builds an all-zero ("cleared") save point.
  SavePoint() : SavePoint(0, 0, 0) {}

  // Builds a save point from an explicit rep_ size, element count and the
  // content flags that were in effect at that moment.
  SavePoint(size_t _size, int _count, uint32_t _flags)
      : size(_size), count(_count), content_flags(_flags) {}

  // Resets every field back to zero.
  void clear() { *this = SavePoint(); }

  // True exactly when size, count and content_flags are all zero.
  bool is_cleared() const {
    return size == 0 && count == 0 && content_flags == 0;
  }
};
60 
61 class WriteBatch : public WriteBatchBase {
62  public:
63   explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0);
64   // `protection_bytes_per_key` is the number of bytes used to store
65   // protection information for each key entry. Currently supported values are
66   // zero (disabled) and eight.
67   explicit WriteBatch(size_t reserved_bytes, size_t max_bytes,
68                       size_t protection_bytes_per_key);
69   ~WriteBatch() override;
70 
71   using WriteBatchBase::Put;
72   // Store the mapping "key->value" in the database.
73   // The following Put(..., const Slice& key, ...) API can also be used when
74   // user-defined timestamp is enabled as long as `key` points to a contiguous
75   // buffer with timestamp appended after user key. The caller is responsible
76   // for setting up the memory buffer pointed to by `key`.
77   Status Put(ColumnFamilyHandle* column_family, const Slice& key,
78              const Slice& value) override;
Put(const Slice & key,const Slice & value)79   Status Put(const Slice& key, const Slice& value) override {
80     return Put(nullptr, key, value);
81   }
82 
83   // Variant of Put() that gathers output like writev(2).  The key and value
84   // that will be written to the database are concatenations of arrays of
85   // slices.
86   // The following Put(..., const SliceParts& key, ...) API can be used when
87   // user-defined timestamp is enabled as long as the timestamp is the last
88   // Slice in `key`, a SliceParts (array of Slices). The caller is responsible
89   // for setting up the `key` SliceParts object.
90   Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
91              const SliceParts& value) override;
Put(const SliceParts & key,const SliceParts & value)92   Status Put(const SliceParts& key, const SliceParts& value) override {
93     return Put(nullptr, key, value);
94   }
95 
96   using WriteBatchBase::Delete;
97   // If the database contains a mapping for "key", erase it.  Else do nothing.
98   // The following Delete(..., const Slice& key) can be used when user-defined
99   // timestamp is enabled as long as `key` points to a contiguous buffer with
100   // timestamp appended after user key. The caller is responsible for setting
101   // up the memory buffer pointed to by `key`.
102   Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
Delete(const Slice & key)103   Status Delete(const Slice& key) override { return Delete(nullptr, key); }
104 
105   // variant that takes SliceParts
106   // These two variants of Delete(..., const SliceParts& key) can be used when
107   // user-defined timestamp is enabled as long as the timestamp is the last
108   // Slice in `key`, a SliceParts (array of Slices). The caller is responsible
109   // for setting up the `key` SliceParts object.
110   Status Delete(ColumnFamilyHandle* column_family,
111                 const SliceParts& key) override;
Delete(const SliceParts & key)112   Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
113 
114   using WriteBatchBase::SingleDelete;
115   // WriteBatch implementation of DB::SingleDelete().  See db.h.
116   Status SingleDelete(ColumnFamilyHandle* column_family,
117                       const Slice& key) override;
SingleDelete(const Slice & key)118   Status SingleDelete(const Slice& key) override {
119     return SingleDelete(nullptr, key);
120   }
121 
122   // variant that takes SliceParts
123   Status SingleDelete(ColumnFamilyHandle* column_family,
124                       const SliceParts& key) override;
SingleDelete(const SliceParts & key)125   Status SingleDelete(const SliceParts& key) override {
126     return SingleDelete(nullptr, key);
127   }
128 
129   using WriteBatchBase::DeleteRange;
130   // WriteBatch implementation of DB::DeleteRange().  See db.h.
131   Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key,
132                      const Slice& end_key) override;
DeleteRange(const Slice & begin_key,const Slice & end_key)133   Status DeleteRange(const Slice& begin_key, const Slice& end_key) override {
134     return DeleteRange(nullptr, begin_key, end_key);
135   }
136 
137   // variant that takes SliceParts
138   Status DeleteRange(ColumnFamilyHandle* column_family,
139                      const SliceParts& begin_key,
140                      const SliceParts& end_key) override;
DeleteRange(const SliceParts & begin_key,const SliceParts & end_key)141   Status DeleteRange(const SliceParts& begin_key,
142                      const SliceParts& end_key) override {
143     return DeleteRange(nullptr, begin_key, end_key);
144   }
145 
146   using WriteBatchBase::Merge;
147   // Merge "value" with the existing value of "key" in the database.
148   // "key->merge(existing, value)"
149   Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
150                const Slice& value) override;
Merge(const Slice & key,const Slice & value)151   Status Merge(const Slice& key, const Slice& value) override {
152     return Merge(nullptr, key, value);
153   }
154 
155   // variant that takes SliceParts
156   Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key,
157                const SliceParts& value) override;
Merge(const SliceParts & key,const SliceParts & value)158   Status Merge(const SliceParts& key, const SliceParts& value) override {
159     return Merge(nullptr, key, value);
160   }
161 
162   using WriteBatchBase::PutLogData;
163   // Append a blob of arbitrary size to the records in this batch. The blob will
164   // be stored in the transaction log but not in any other file. In particular,
165   // it will not be persisted to the SST files. When iterating over this
166   // WriteBatch, WriteBatch::Handler::LogData will be called with the contents
167   // of the blob as it is encountered. Blobs, puts, deletes, and merges will be
168   // encountered in the same order in which they were inserted. The blob will
169   // NOT consume sequence number(s) and will NOT increase the count of the batch
170   //
171   // Example application: add timestamps to the transaction log for use in
172   // replication.
173   Status PutLogData(const Slice& blob) override;
174 
175   using WriteBatchBase::Clear;
176   // Clear all updates buffered in this batch.
177   void Clear() override;
178 
179   // Records the state of the batch for future calls to RollbackToSavePoint().
180   // May be called multiple times to set multiple save points.
181   void SetSavePoint() override;
182 
183   // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the
184   // most recent call to SetSavePoint() and removes the most recent save point.
185   // If there is no previous call to SetSavePoint(), Status::NotFound()
186   // will be returned.
187   // Otherwise returns Status::OK().
188   Status RollbackToSavePoint() override;
189 
190   // Pop the most recent save point.
191   // If there is no previous call to SetSavePoint(), Status::NotFound()
192   // will be returned.
193   // Otherwise returns Status::OK().
194   Status PopSavePoint() override;
195 
196   // Support for iterating over the contents of a batch.
197   class Handler {
198    public:
199     virtual ~Handler();
200     // All handler functions in this class provide default implementations so
201     // we won't break existing clients of Handler on a source code level when
202     // adding a new member function.
203 
204     // default implementation will just call Put without column family for
205     // backwards compatibility. If the column family is not default,
206     // the function is noop
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)207     virtual Status PutCF(uint32_t column_family_id, const Slice& key,
208                          const Slice& value) {
209       if (column_family_id == 0) {
210         // Put() historically doesn't return status. We didn't want to be
211         // backwards incompatible so we didn't change the return status
212         // (this is a public API). We do an ordinary get and return Status::OK()
213         Put(key, value);
214         return Status::OK();
215       }
216       return Status::InvalidArgument(
217           "non-default column family and PutCF not implemented");
218     }
Put(const Slice &,const Slice &)219     virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {}
220 
DeleteCF(uint32_t column_family_id,const Slice & key)221     virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
222       if (column_family_id == 0) {
223         Delete(key);
224         return Status::OK();
225       }
226       return Status::InvalidArgument(
227           "non-default column family and DeleteCF not implemented");
228     }
Delete(const Slice &)229     virtual void Delete(const Slice& /*key*/) {}
230 
SingleDeleteCF(uint32_t column_family_id,const Slice & key)231     virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) {
232       if (column_family_id == 0) {
233         SingleDelete(key);
234         return Status::OK();
235       }
236       return Status::InvalidArgument(
237           "non-default column family and SingleDeleteCF not implemented");
238     }
SingleDelete(const Slice &)239     virtual void SingleDelete(const Slice& /*key*/) {}
240 
DeleteRangeCF(uint32_t,const Slice &,const Slice &)241     virtual Status DeleteRangeCF(uint32_t /*column_family_id*/,
242                                  const Slice& /*begin_key*/,
243                                  const Slice& /*end_key*/) {
244       return Status::InvalidArgument("DeleteRangeCF not implemented");
245     }
246 
MergeCF(uint32_t column_family_id,const Slice & key,const Slice & value)247     virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
248                            const Slice& value) {
249       if (column_family_id == 0) {
250         Merge(key, value);
251         return Status::OK();
252       }
253       return Status::InvalidArgument(
254           "non-default column family and MergeCF not implemented");
255     }
Merge(const Slice &,const Slice &)256     virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {}
257 
PutBlobIndexCF(uint32_t,const Slice &,const Slice &)258     virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/,
259                                   const Slice& /*key*/,
260                                   const Slice& /*value*/) {
261       return Status::InvalidArgument("PutBlobIndexCF not implemented");
262     }
263 
264     // The default implementation of LogData does nothing.
265     virtual void LogData(const Slice& blob);
266 
267     virtual Status MarkBeginPrepare(bool = false) {
268       return Status::InvalidArgument("MarkBeginPrepare() handler not defined.");
269     }
270 
MarkEndPrepare(const Slice &)271     virtual Status MarkEndPrepare(const Slice& /*xid*/) {
272       return Status::InvalidArgument("MarkEndPrepare() handler not defined.");
273     }
274 
MarkNoop(bool)275     virtual Status MarkNoop(bool /*empty_batch*/) {
276       return Status::InvalidArgument("MarkNoop() handler not defined.");
277     }
278 
MarkRollback(const Slice &)279     virtual Status MarkRollback(const Slice& /*xid*/) {
280       return Status::InvalidArgument(
281           "MarkRollbackPrepare() handler not defined.");
282     }
283 
MarkCommit(const Slice &)284     virtual Status MarkCommit(const Slice& /*xid*/) {
285       return Status::InvalidArgument("MarkCommit() handler not defined.");
286     }
287 
288     // Continue is called by WriteBatch::Iterate. If it returns false,
289     // iteration is halted. Otherwise, it continues iterating. The default
290     // implementation always returns true.
291     virtual bool Continue();
292 
293    protected:
294     friend class WriteBatchInternal;
WriteAfterCommit()295     virtual bool WriteAfterCommit() const { return true; }
WriteBeforePrepare()296     virtual bool WriteBeforePrepare() const { return false; }
297   };
298   Status Iterate(Handler* handler) const;
299 
300   // Retrieve the serialized version of this batch.
Data()301   const std::string& Data() const { return rep_; }
302 
303   // Retrieve data size of the batch.
GetDataSize()304   size_t GetDataSize() const { return rep_.size(); }
305 
306   // Returns the number of updates in the batch
307   uint32_t Count() const;
308 
309   // Returns true if PutCF will be called during Iterate
310   bool HasPut() const;
311 
312   // Returns true if DeleteCF will be called during Iterate
313   bool HasDelete() const;
314 
315   // Returns true if SingleDeleteCF will be called during Iterate
316   bool HasSingleDelete() const;
317 
318   // Returns true if DeleteRangeCF will be called during Iterate
319   bool HasDeleteRange() const;
320 
321   // Returns true if MergeCF will be called during Iterate
322   bool HasMerge() const;
323 
324   // Returns true if MarkBeginPrepare will be called during Iterate
325   bool HasBeginPrepare() const;
326 
327   // Returns true if MarkEndPrepare will be called during Iterate
328   bool HasEndPrepare() const;
329 
330   // Returns true if MarkCommit will be called during Iterate
331   bool HasCommit() const;
332 
333   // Returns true if MarkRollback will be called during Iterate
334   bool HasRollback() const;
335 
336   // Assign timestamp to write batch.
337   // This requires that all keys (possibly from multiple column families) in
338   // the write batch have timestamps of the same format.
339   Status AssignTimestamp(const Slice& ts);
340 
341   // Assign timestamps to write batch.
342   // This API allows the write batch to include keys from multiple column
343   // families whose timestamps' formats can differ. For example, some column
344   // families can enable timestamp, while others disable the feature.
345   // If key does not have timestamp, then put an empty Slice in ts_list as
346   // a placeholder.
347   Status AssignTimestamps(const std::vector<Slice>& ts_list);
348 
349   using WriteBatchBase::GetWriteBatch;
GetWriteBatch()350   WriteBatch* GetWriteBatch() override { return this; }
351 
352   // Constructor with a serialized string object
353   explicit WriteBatch(const std::string& rep);
354   explicit WriteBatch(std::string&& rep);
355 
356   WriteBatch(const WriteBatch& src);
357   WriteBatch(WriteBatch&& src) noexcept;
358   WriteBatch& operator=(const WriteBatch& src);
359   WriteBatch& operator=(WriteBatch&& src);
360 
361   // marks this point in the WriteBatch as the last record to
362   // be inserted into the WAL, provided the WAL is enabled
363   void MarkWalTerminationPoint();
GetWalTerminationPoint()364   const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; }
365 
SetMaxBytes(size_t max_bytes)366   void SetMaxBytes(size_t max_bytes) override { max_bytes_ = max_bytes; }
367 
368   struct ProtectionInfo;
369   size_t GetProtectionBytesPerKey() const;
370 
371  private:
372   friend class WriteBatchInternal;
373   friend class LocalSavePoint;
374   // TODO(myabandeh): this is needed for a hack to collapse the write batch and
375   // remove duplicate keys. Remove it when the hack is replaced with a proper
376   // solution.
377   friend class WriteBatchWithIndex;
378   std::unique_ptr<SavePoints> save_points_;
379 
380   // When sending a WriteBatch through WriteImpl we might want to
381   // specify that only the first x records of the batch be written to
382   // the WAL.
383   SavePoint wal_term_point_;
384 
385   // For HasXYZ.  Mutable to allow lazy computation of results
386   mutable std::atomic<uint32_t> content_flags_;
387 
388   // Performs deferred computation of content_flags if necessary
389   uint32_t ComputeContentFlags() const;
390 
391   // Maximum size of rep_.
392   size_t max_bytes_;
393 
394   // Is the content of the batch the application's latest state that meant only
395   // to be used for recovery? Refer to
396   // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery for
397   // more details.
398   bool is_latest_persistent_state_ = false;
399 
400   std::unique_ptr<ProtectionInfo> prot_info_;
401 
402  protected:
403   std::string rep_;  // See comment in write_batch.cc for the format of rep_
404 };
405 
406 }  // namespace ROCKSDB_NAMESPACE
407