1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
6 // Use of this source code is governed by a BSD-style license that can be
7 // found in the LICENSE file. See the AUTHORS file for names of contributors.
8 //
9 // WriteBatch holds a collection of updates to apply atomically to a DB.
10 //
11 // The updates are applied in the order in which they are added
12 // to the WriteBatch.  For example, the value of "key" will be "v3"
13 // after the following batch is written:
14 //
15 //    batch.Put("key", "v1");
16 //    batch.Delete("key");
17 //    batch.Put("key", "v2");
18 //    batch.Put("key", "v3");
19 //
20 // Multiple threads can invoke const methods on a WriteBatch without
21 // external synchronization, but if any of the threads may call a
22 // non-const method, all threads accessing the same WriteBatch must use
23 // external synchronization.
24 
25 #pragma once
26 
27 #include <stdint.h>
28 #include <atomic>
29 #include <memory>
30 #include <string>
31 #include <vector>
32 #include "rocksdb/status.h"
33 #include "rocksdb/write_batch_base.h"
34 
35 namespace rocksdb {
36 
37 class Slice;
38 class ColumnFamilyHandle;
39 struct SavePoints;
40 struct SliceParts;
41 
// Plain record of a position inside a write batch: the byte size of the
// serialized representation, the number of entries it holds, and the content
// flags that go with it. All-zero fields mean "cleared".
struct SavePoint {
  size_t size;  // size of rep_
  int count;    // count of elements in rep_
  uint32_t content_flags;

  // Default construction yields the cleared (all-zero) state.
  SavePoint() : SavePoint(0, 0, 0) {}

  // Builds a save point from explicit field values.
  SavePoint(size_t _size, int _count, uint32_t _flags)
      : size(_size), count(_count), content_flags(_flags) {}

  // Puts every field back to zero, i.e. back to the default-constructed state.
  void clear() { *this = SavePoint(); }

  // True only when all three fields are zero.
  bool is_cleared() const {
    return size == 0 && count == 0 && content_flags == 0;
  }
};
60 
61 class WriteBatch : public WriteBatchBase {
62  public:
63   explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0);
64   explicit WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz);
65   ~WriteBatch() override;
66 
67   using WriteBatchBase::Put;
68   // Store the mapping "key->value" in the database.
69   Status Put(ColumnFamilyHandle* column_family, const Slice& key,
70              const Slice& value) override;
Put(const Slice & key,const Slice & value)71   Status Put(const Slice& key, const Slice& value) override {
72     return Put(nullptr, key, value);
73   }
74 
75   // Variant of Put() that gathers output like writev(2).  The key and value
76   // that will be written to the database are concatenations of arrays of
77   // slices.
78   Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
79              const SliceParts& value) override;
Put(const SliceParts & key,const SliceParts & value)80   Status Put(const SliceParts& key, const SliceParts& value) override {
81     return Put(nullptr, key, value);
82   }
83 
84   using WriteBatchBase::Delete;
85   // If the database contains a mapping for "key", erase it.  Else do nothing.
86   Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
Delete(const Slice & key)87   Status Delete(const Slice& key) override { return Delete(nullptr, key); }
88 
89   // variant that takes SliceParts
90   Status Delete(ColumnFamilyHandle* column_family,
91                 const SliceParts& key) override;
Delete(const SliceParts & key)92   Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
93 
94   using WriteBatchBase::SingleDelete;
95   // WriteBatch implementation of DB::SingleDelete().  See db.h.
96   Status SingleDelete(ColumnFamilyHandle* column_family,
97                       const Slice& key) override;
SingleDelete(const Slice & key)98   Status SingleDelete(const Slice& key) override {
99     return SingleDelete(nullptr, key);
100   }
101 
102   // variant that takes SliceParts
103   Status SingleDelete(ColumnFamilyHandle* column_family,
104                       const SliceParts& key) override;
SingleDelete(const SliceParts & key)105   Status SingleDelete(const SliceParts& key) override {
106     return SingleDelete(nullptr, key);
107   }
108 
109   using WriteBatchBase::DeleteRange;
110   // WriteBatch implementation of DB::DeleteRange().  See db.h.
111   Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key,
112                      const Slice& end_key) override;
DeleteRange(const Slice & begin_key,const Slice & end_key)113   Status DeleteRange(const Slice& begin_key, const Slice& end_key) override {
114     return DeleteRange(nullptr, begin_key, end_key);
115   }
116 
117   // variant that takes SliceParts
118   Status DeleteRange(ColumnFamilyHandle* column_family,
119                      const SliceParts& begin_key,
120                      const SliceParts& end_key) override;
DeleteRange(const SliceParts & begin_key,const SliceParts & end_key)121   Status DeleteRange(const SliceParts& begin_key,
122                      const SliceParts& end_key) override {
123     return DeleteRange(nullptr, begin_key, end_key);
124   }
125 
126   using WriteBatchBase::Merge;
127   // Merge "value" with the existing value of "key" in the database.
128   // "key->merge(existing, value)"
129   Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
130                const Slice& value) override;
Merge(const Slice & key,const Slice & value)131   Status Merge(const Slice& key, const Slice& value) override {
132     return Merge(nullptr, key, value);
133   }
134 
135   // variant that takes SliceParts
136   Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key,
137                const SliceParts& value) override;
Merge(const SliceParts & key,const SliceParts & value)138   Status Merge(const SliceParts& key, const SliceParts& value) override {
139     return Merge(nullptr, key, value);
140   }
141 
142   using WriteBatchBase::PutLogData;
143   // Append a blob of arbitrary size to the records in this batch. The blob will
144   // be stored in the transaction log but not in any other file. In particular,
145   // it will not be persisted to the SST files. When iterating over this
146   // WriteBatch, WriteBatch::Handler::LogData will be called with the contents
147   // of the blob as it is encountered. Blobs, puts, deletes, and merges will be
148   // encountered in the same order in which they were inserted. The blob will
149   // NOT consume sequence number(s) and will NOT increase the count of the batch
150   //
151   // Example application: add timestamps to the transaction log for use in
152   // replication.
153   Status PutLogData(const Slice& blob) override;
154 
155   using WriteBatchBase::Clear;
156   // Clear all updates buffered in this batch.
157   void Clear() override;
158 
159   // Records the state of the batch for future calls to RollbackToSavePoint().
160   // May be called multiple times to set multiple save points.
161   void SetSavePoint() override;
162 
163   // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the
164   // most recent call to SetSavePoint() and removes the most recent save point.
165   // If there is no previous call to SetSavePoint(), Status::NotFound()
166   // will be returned.
167   // Otherwise returns Status::OK().
168   Status RollbackToSavePoint() override;
169 
170   // Pop the most recent save point.
171   // If there is no previous call to SetSavePoint(), Status::NotFound()
172   // will be returned.
173   // Otherwise returns Status::OK().
174   Status PopSavePoint() override;
175 
176   // Support for iterating over the contents of a batch.
177   class Handler {
178    public:
179     virtual ~Handler();
180     // All handler functions in this class provide default implementations so
181     // we won't break existing clients of Handler on a source code level when
182     // adding a new member function.
183 
184     // default implementation will just call Put without column family for
185     // backwards compatibility. If the column family is not default,
186     // the function is noop
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)187     virtual Status PutCF(uint32_t column_family_id, const Slice& key,
188                          const Slice& value) {
189       if (column_family_id == 0) {
190         // Put() historically doesn't return status. We didn't want to be
191         // backwards incompatible so we didn't change the return status
192         // (this is a public API). We do an ordinary get and return Status::OK()
193         Put(key, value);
194         return Status::OK();
195       }
196       return Status::InvalidArgument(
197           "non-default column family and PutCF not implemented");
198     }
Put(const Slice &,const Slice &)199     virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {}
200 
DeleteCF(uint32_t column_family_id,const Slice & key)201     virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
202       if (column_family_id == 0) {
203         Delete(key);
204         return Status::OK();
205       }
206       return Status::InvalidArgument(
207           "non-default column family and DeleteCF not implemented");
208     }
Delete(const Slice &)209     virtual void Delete(const Slice& /*key*/) {}
210 
SingleDeleteCF(uint32_t column_family_id,const Slice & key)211     virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) {
212       if (column_family_id == 0) {
213         SingleDelete(key);
214         return Status::OK();
215       }
216       return Status::InvalidArgument(
217           "non-default column family and SingleDeleteCF not implemented");
218     }
SingleDelete(const Slice &)219     virtual void SingleDelete(const Slice& /*key*/) {}
220 
DeleteRangeCF(uint32_t,const Slice &,const Slice &)221     virtual Status DeleteRangeCF(uint32_t /*column_family_id*/,
222                                  const Slice& /*begin_key*/,
223                                  const Slice& /*end_key*/) {
224       return Status::InvalidArgument("DeleteRangeCF not implemented");
225     }
226 
MergeCF(uint32_t column_family_id,const Slice & key,const Slice & value)227     virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
228                            const Slice& value) {
229       if (column_family_id == 0) {
230         Merge(key, value);
231         return Status::OK();
232       }
233       return Status::InvalidArgument(
234           "non-default column family and MergeCF not implemented");
235     }
Merge(const Slice &,const Slice &)236     virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {}
237 
PutBlobIndexCF(uint32_t,const Slice &,const Slice &)238     virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/,
239                                   const Slice& /*key*/,
240                                   const Slice& /*value*/) {
241       return Status::InvalidArgument("PutBlobIndexCF not implemented");
242     }
243 
244     // The default implementation of LogData does nothing.
245     virtual void LogData(const Slice& blob);
246 
247     virtual Status MarkBeginPrepare(bool = false) {
248       return Status::InvalidArgument("MarkBeginPrepare() handler not defined.");
249     }
250 
MarkEndPrepare(const Slice &)251     virtual Status MarkEndPrepare(const Slice& /*xid*/) {
252       return Status::InvalidArgument("MarkEndPrepare() handler not defined.");
253     }
254 
MarkNoop(bool)255     virtual Status MarkNoop(bool /*empty_batch*/) {
256       return Status::InvalidArgument("MarkNoop() handler not defined.");
257     }
258 
MarkRollback(const Slice &)259     virtual Status MarkRollback(const Slice& /*xid*/) {
260       return Status::InvalidArgument(
261           "MarkRollbackPrepare() handler not defined.");
262     }
263 
MarkCommit(const Slice &)264     virtual Status MarkCommit(const Slice& /*xid*/) {
265       return Status::InvalidArgument("MarkCommit() handler not defined.");
266     }
267 
268     // Continue is called by WriteBatch::Iterate. If it returns false,
269     // iteration is halted. Otherwise, it continues iterating. The default
270     // implementation always returns true.
271     virtual bool Continue();
272 
273    protected:
274     friend class WriteBatchInternal;
WriteAfterCommit()275     virtual bool WriteAfterCommit() const { return true; }
WriteBeforePrepare()276     virtual bool WriteBeforePrepare() const { return false; }
277   };
278   Status Iterate(Handler* handler) const;
279 
280   // Retrieve the serialized version of this batch.
Data()281   const std::string& Data() const { return rep_; }
282 
283   // Retrieve data size of the batch.
GetDataSize()284   size_t GetDataSize() const { return rep_.size(); }
285 
286   // Returns the number of updates in the batch
287   uint32_t Count() const;
288 
289   // Returns true if PutCF will be called during Iterate
290   bool HasPut() const;
291 
292   // Returns true if DeleteCF will be called during Iterate
293   bool HasDelete() const;
294 
295   // Returns true if SingleDeleteCF will be called during Iterate
296   bool HasSingleDelete() const;
297 
298   // Returns true if DeleteRangeCF will be called during Iterate
299   bool HasDeleteRange() const;
300 
301   // Returns true if MergeCF will be called during Iterate
302   bool HasMerge() const;
303 
304   // Returns true if MarkBeginPrepare will be called during Iterate
305   bool HasBeginPrepare() const;
306 
307   // Returns true if MarkEndPrepare will be called during Iterate
308   bool HasEndPrepare() const;
309 
310   // Returns trie if MarkCommit will be called during Iterate
311   bool HasCommit() const;
312 
313   // Returns trie if MarkRollback will be called during Iterate
314   bool HasRollback() const;
315 
316   // Assign timestamp to write batch
317   Status AssignTimestamp(const Slice& ts);
318 
319   // Assign timestamps to write batch
320   Status AssignTimestamps(const std::vector<Slice>& ts_list);
321 
322   using WriteBatchBase::GetWriteBatch;
GetWriteBatch()323   WriteBatch* GetWriteBatch() override { return this; }
324 
325   // Constructor with a serialized string object
326   explicit WriteBatch(const std::string& rep);
327   explicit WriteBatch(std::string&& rep);
328 
329   WriteBatch(const WriteBatch& src);
330   WriteBatch(WriteBatch&& src) noexcept;
331   WriteBatch& operator=(const WriteBatch& src);
332   WriteBatch& operator=(WriteBatch&& src);
333 
334   // marks this point in the WriteBatch as the last record to
335   // be inserted into the WAL, provided the WAL is enabled
336   void MarkWalTerminationPoint();
GetWalTerminationPoint()337   const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; }
338 
SetMaxBytes(size_t max_bytes)339   void SetMaxBytes(size_t max_bytes) override { max_bytes_ = max_bytes; }
340 
341  private:
342   friend class WriteBatchInternal;
343   friend class LocalSavePoint;
344   // TODO(myabandeh): this is needed for a hack to collapse the write batch and
345   // remove duplicate keys. Remove it when the hack is replaced with a proper
346   // solution.
347   friend class WriteBatchWithIndex;
348   std::unique_ptr<SavePoints> save_points_;
349 
350   // When sending a WriteBatch through WriteImpl we might want to
351   // specify that only the first x records of the batch be written to
352   // the WAL.
353   SavePoint wal_term_point_;
354 
355   // For HasXYZ.  Mutable to allow lazy computation of results
356   mutable std::atomic<uint32_t> content_flags_;
357 
358   // Performs deferred computation of content_flags if necessary
359   uint32_t ComputeContentFlags() const;
360 
361   // Maximum size of rep_.
362   size_t max_bytes_;
363 
364   // Is the content of the batch the application's latest state that meant only
365   // to be used for recovery? Refer to
366   // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery for
367   // more details.
368   bool is_latest_persistent_state_ = false;
369 
370  protected:
371   std::string rep_;  // See comment in write_batch.cc for the format of rep_
372   const size_t timestamp_size_;
373 
374   // Intentionally copyable
375 };
376 
377 }  // namespace rocksdb
378