1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 
8 #include <cassert>
9 
10 #include "db/blob/blob_garbage_meter.h"
11 #include "rocksdb/rocksdb_namespace.h"
12 #include "rocksdb/status.h"
13 #include "table/internal_iterator.h"
14 #include "test_util/sync_point.h"
15 
16 namespace ROCKSDB_NAMESPACE {
17 
18 // An internal iterator that passes each key-value encountered to
19 // BlobGarbageMeter as inflow in order to measure the total number and size of
20 // blobs in the compaction input on a per-blob file basis.
21 class BlobCountingIterator : public InternalIterator {
22  public:
BlobCountingIterator(InternalIterator * iter,BlobGarbageMeter * blob_garbage_meter)23   BlobCountingIterator(InternalIterator* iter,
24                        BlobGarbageMeter* blob_garbage_meter)
25       : iter_(iter), blob_garbage_meter_(blob_garbage_meter) {
26     assert(iter_);
27     assert(blob_garbage_meter_);
28 
29     UpdateAndCountBlobIfNeeded();
30   }
31 
Valid()32   bool Valid() const override { return iter_->Valid() && status_.ok(); }
33 
SeekToFirst()34   void SeekToFirst() override {
35     iter_->SeekToFirst();
36     UpdateAndCountBlobIfNeeded();
37   }
38 
SeekToLast()39   void SeekToLast() override {
40     iter_->SeekToLast();
41     UpdateAndCountBlobIfNeeded();
42   }
43 
Seek(const Slice & target)44   void Seek(const Slice& target) override {
45     iter_->Seek(target);
46     UpdateAndCountBlobIfNeeded();
47   }
48 
SeekForPrev(const Slice & target)49   void SeekForPrev(const Slice& target) override {
50     iter_->SeekForPrev(target);
51     UpdateAndCountBlobIfNeeded();
52   }
53 
Next()54   void Next() override {
55     assert(Valid());
56 
57     iter_->Next();
58     UpdateAndCountBlobIfNeeded();
59   }
60 
NextAndGetResult(IterateResult * result)61   bool NextAndGetResult(IterateResult* result) override {
62     assert(Valid());
63 
64     const bool res = iter_->NextAndGetResult(result);
65     UpdateAndCountBlobIfNeeded();
66     return res;
67   }
68 
Prev()69   void Prev() override {
70     assert(Valid());
71 
72     iter_->Prev();
73     UpdateAndCountBlobIfNeeded();
74   }
75 
key()76   Slice key() const override {
77     assert(Valid());
78     return iter_->key();
79   }
80 
user_key()81   Slice user_key() const override {
82     assert(Valid());
83     return iter_->user_key();
84   }
85 
value()86   Slice value() const override {
87     assert(Valid());
88     return iter_->value();
89   }
90 
status()91   Status status() const override { return status_; }
92 
PrepareValue()93   bool PrepareValue() override {
94     assert(Valid());
95     return iter_->PrepareValue();
96   }
97 
MayBeOutOfLowerBound()98   bool MayBeOutOfLowerBound() override {
99     assert(Valid());
100     return iter_->MayBeOutOfLowerBound();
101   }
102 
UpperBoundCheckResult()103   IterBoundCheck UpperBoundCheckResult() override {
104     assert(Valid());
105     return iter_->UpperBoundCheckResult();
106   }
107 
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)108   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
109     iter_->SetPinnedItersMgr(pinned_iters_mgr);
110   }
111 
IsKeyPinned()112   bool IsKeyPinned() const override {
113     assert(Valid());
114     return iter_->IsKeyPinned();
115   }
116 
IsValuePinned()117   bool IsValuePinned() const override {
118     assert(Valid());
119     return iter_->IsValuePinned();
120   }
121 
GetProperty(std::string prop_name,std::string * prop)122   Status GetProperty(std::string prop_name, std::string* prop) override {
123     return iter_->GetProperty(prop_name, prop);
124   }
125 
126  private:
UpdateAndCountBlobIfNeeded()127   void UpdateAndCountBlobIfNeeded() {
128     assert(!iter_->Valid() || iter_->status().ok());
129 
130     if (!iter_->Valid()) {
131       status_ = iter_->status();
132       return;
133     }
134 
135     TEST_SYNC_POINT(
136         "BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow");
137 
138     status_ = blob_garbage_meter_->ProcessInFlow(key(), value());
139   }
140 
141   InternalIterator* iter_;
142   BlobGarbageMeter* blob_garbage_meter_;
143   Status status_;
144 };
145 
146 }  // namespace ROCKSDB_NAMESPACE
147