1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/db_impl/db_impl_readonly.h"
7 
8 #include "db/arena_wrapped_db_iter.h"
9 #include "db/db_impl/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "util/cast_util.h"
15 
16 namespace ROCKSDB_NAMESPACE {
17 
18 #ifndef ROCKSDB_LITE
19 
DBImplReadOnly(const DBOptions & db_options,const std::string & dbname)20 DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
21                                const std::string& dbname)
22     : DBImpl(db_options, dbname, /*seq_per_batch*/ false,
23              /*batch_per_txn*/ true, /*read_only*/ true) {
24   ROCKS_LOG_INFO(immutable_db_options_.info_log,
25                  "Opening the db in read only mode");
26   LogFlush(immutable_db_options_.info_log);
27 }
28 
~DBImplReadOnly()29 DBImplReadOnly::~DBImplReadOnly() {}
30 
31 // Implementations of the DB interface
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val)32 Status DBImplReadOnly::Get(const ReadOptions& read_options,
33                            ColumnFamilyHandle* column_family, const Slice& key,
34                            PinnableSlice* pinnable_val) {
35   assert(pinnable_val != nullptr);
36   // TODO: stopwatch DB_GET needed?, perf timer needed?
37   PERF_TIMER_GUARD(get_snapshot_time);
38   Status s;
39   SequenceNumber snapshot = versions_->LastSequence();
40   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
41   auto cfd = cfh->cfd();
42   if (tracer_) {
43     InstrumentedMutexLock lock(&trace_mutex_);
44     if (tracer_) {
45       tracer_->Get(column_family, key);
46     }
47   }
48   SuperVersion* super_version = cfd->GetSuperVersion();
49   MergeContext merge_context;
50   SequenceNumber max_covering_tombstone_seq = 0;
51   LookupKey lkey(key, snapshot);
52   PERF_TIMER_STOP(get_snapshot_time);
53   if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
54                               /*timestamp=*/nullptr, &s, &merge_context,
55                               &max_covering_tombstone_seq, read_options)) {
56     pinnable_val->PinSelf();
57     RecordTick(stats_, MEMTABLE_HIT);
58   } else {
59     PERF_TIMER_GUARD(get_from_output_files_time);
60     super_version->current->Get(read_options, lkey, pinnable_val,
61                                 /*timestamp=*/nullptr, &s, &merge_context,
62                                 &max_covering_tombstone_seq);
63     RecordTick(stats_, MEMTABLE_MISS);
64   }
65   RecordTick(stats_, NUMBER_KEYS_READ);
66   size_t size = pinnable_val->size();
67   RecordTick(stats_, BYTES_READ, size);
68   RecordInHistogram(stats_, BYTES_PER_READ, size);
69   PERF_COUNTER_ADD(get_read_bytes, size);
70   return s;
71 }
72 
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)73 Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
74                                       ColumnFamilyHandle* column_family) {
75   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
76   auto cfd = cfh->cfd();
77   SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
78   SequenceNumber latest_snapshot = versions_->LastSequence();
79   SequenceNumber read_seq =
80       read_options.snapshot != nullptr
81           ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
82                 ->number_
83           : latest_snapshot;
84   ReadCallback* read_callback = nullptr;  // No read callback provided.
85   auto db_iter = NewArenaWrappedDbIterator(
86       env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
87       super_version->current, read_seq,
88       super_version->mutable_cf_options.max_sequential_skip_in_iterations,
89       super_version->version_number, read_callback);
90   auto internal_iter = NewInternalIterator(
91       db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
92       db_iter->GetRangeDelAggregator(), read_seq,
93       /* allow_unprepared_value */ true);
94   db_iter->SetIterUnderDBIter(internal_iter);
95   return db_iter;
96 }
97 
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)98 Status DBImplReadOnly::NewIterators(
99     const ReadOptions& read_options,
100     const std::vector<ColumnFamilyHandle*>& column_families,
101     std::vector<Iterator*>* iterators) {
102   ReadCallback* read_callback = nullptr;  // No read callback provided.
103   if (iterators == nullptr) {
104     return Status::InvalidArgument("iterators not allowed to be nullptr");
105   }
106   iterators->clear();
107   iterators->reserve(column_families.size());
108   SequenceNumber latest_snapshot = versions_->LastSequence();
109   SequenceNumber read_seq =
110       read_options.snapshot != nullptr
111           ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
112                 ->number_
113           : latest_snapshot;
114 
115   for (auto cfh : column_families) {
116     auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
117     auto* sv = cfd->GetSuperVersion()->Ref();
118     auto* db_iter = NewArenaWrappedDbIterator(
119         env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
120         sv->current, read_seq,
121         sv->mutable_cf_options.max_sequential_skip_in_iterations,
122         sv->version_number, read_callback);
123     auto* internal_iter = NewInternalIterator(
124         db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
125         db_iter->GetRangeDelAggregator(), read_seq,
126         /* allow_unprepared_value */ true);
127     db_iter->SetIterUnderDBIter(internal_iter);
128     iterators->push_back(db_iter);
129   }
130 
131   return Status::OK();
132 }
133 
134 namespace {
135 // Return OK if dbname exists in the file system or create it if
136 // create_if_missing
OpenForReadOnlyCheckExistence(const DBOptions & db_options,const std::string & dbname)137 Status OpenForReadOnlyCheckExistence(const DBOptions& db_options,
138                                      const std::string& dbname) {
139   Status s;
140   if (!db_options.create_if_missing) {
141     // Attempt to read "CURRENT" file
142     const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
143     std::string manifest_path;
144     uint64_t manifest_file_number;
145     s = VersionSet::GetCurrentManifestPath(dbname, fs.get(), &manifest_path,
146                                            &manifest_file_number);
147   } else {
148     // Historic behavior that doesn't necessarily make sense
149     s = db_options.env->CreateDirIfMissing(dbname);
150   }
151   return s;
152 }
153 }  // namespace
154 
OpenForReadOnly(const Options & options,const std::string & dbname,DB ** dbptr,bool)155 Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
156                            DB** dbptr, bool /*error_if_wal_file_exists*/) {
157   Status s = OpenForReadOnlyCheckExistence(options, dbname);
158   if (!s.ok()) {
159     return s;
160   }
161 
162   *dbptr = nullptr;
163 
164   // Try to first open DB as fully compacted DB
165   s = CompactedDBImpl::Open(options, dbname, dbptr);
166   if (s.ok()) {
167     return s;
168   }
169 
170   DBOptions db_options(options);
171   ColumnFamilyOptions cf_options(options);
172   std::vector<ColumnFamilyDescriptor> column_families;
173   column_families.push_back(
174       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
175   std::vector<ColumnFamilyHandle*> handles;
176 
177   s = DBImplReadOnly::OpenForReadOnlyWithoutCheck(
178       db_options, dbname, column_families, &handles, dbptr);
179   if (s.ok()) {
180     assert(handles.size() == 1);
181     // i can delete the handle since DBImpl is always holding a
182     // reference to default column family
183     delete handles[0];
184   }
185   return s;
186 }
187 
OpenForReadOnly(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,bool error_if_wal_file_exists)188 Status DB::OpenForReadOnly(
189     const DBOptions& db_options, const std::string& dbname,
190     const std::vector<ColumnFamilyDescriptor>& column_families,
191     std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
192     bool error_if_wal_file_exists) {
193   // If dbname does not exist in the file system, should not do anything
194   Status s = OpenForReadOnlyCheckExistence(db_options, dbname);
195   if (!s.ok()) {
196     return s;
197   }
198 
199   return DBImplReadOnly::OpenForReadOnlyWithoutCheck(
200       db_options, dbname, column_families, handles, dbptr,
201       error_if_wal_file_exists);
202 }
203 
OpenForReadOnlyWithoutCheck(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,bool error_if_wal_file_exists)204 Status DBImplReadOnly::OpenForReadOnlyWithoutCheck(
205     const DBOptions& db_options, const std::string& dbname,
206     const std::vector<ColumnFamilyDescriptor>& column_families,
207     std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
208     bool error_if_wal_file_exists) {
209   *dbptr = nullptr;
210   handles->clear();
211 
212   SuperVersionContext sv_context(/* create_superversion */ true);
213   DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
214   impl->mutex_.Lock();
215   Status s = impl->Recover(column_families, true /* read only */,
216                            error_if_wal_file_exists);
217   if (s.ok()) {
218     // set column family handles
219     for (auto cf : column_families) {
220       auto cfd =
221           impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
222       if (cfd == nullptr) {
223         s = Status::InvalidArgument("Column family not found", cf.name);
224         break;
225       }
226       handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
227     }
228   }
229   if (s.ok()) {
230     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
231       sv_context.NewSuperVersion();
232       cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
233     }
234   }
235   impl->mutex_.Unlock();
236   sv_context.Clean();
237   if (s.ok()) {
238     *dbptr = impl;
239     for (auto* h : *handles) {
240       impl->NewThreadStatusCfInfo(
241           static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
242     }
243   } else {
244     for (auto h : *handles) {
245       delete h;
246     }
247     handles->clear();
248     delete impl;
249   }
250   return s;
251 }
252 
253 #else   // !ROCKSDB_LITE
254 
255 Status DB::OpenForReadOnly(const Options& /*options*/,
256                            const std::string& /*dbname*/, DB** /*dbptr*/,
257                            bool /*error_if_wal_file_exists*/) {
258   return Status::NotSupported("Not supported in ROCKSDB_LITE.");
259 }
260 
261 Status DB::OpenForReadOnly(
262     const DBOptions& /*db_options*/, const std::string& /*dbname*/,
263     const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
264     std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
265     bool /*error_if_wal_file_exists*/) {
266   return Status::NotSupported("Not supported in ROCKSDB_LITE.");
267 }
268 #endif  // !ROCKSDB_LITE
269 
270 }  // namespace ROCKSDB_NAMESPACE
271