1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_impl/db_impl_readonly.h"
7
8 #include "db/arena_wrapped_db_iter.h"
9 #include "db/db_impl/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "util/cast_util.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
18 #ifndef ROCKSDB_LITE
19
DBImplReadOnly(const DBOptions & db_options,const std::string & dbname)20 DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
21 const std::string& dbname)
22 : DBImpl(db_options, dbname, /*seq_per_batch*/ false,
23 /*batch_per_txn*/ true, /*read_only*/ true) {
24 ROCKS_LOG_INFO(immutable_db_options_.info_log,
25 "Opening the db in read only mode");
26 LogFlush(immutable_db_options_.info_log);
27 }
28
~DBImplReadOnly()29 DBImplReadOnly::~DBImplReadOnly() {}
30
31 // Implementations of the DB interface
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val)32 Status DBImplReadOnly::Get(const ReadOptions& read_options,
33 ColumnFamilyHandle* column_family, const Slice& key,
34 PinnableSlice* pinnable_val) {
35 assert(pinnable_val != nullptr);
36 // TODO: stopwatch DB_GET needed?, perf timer needed?
37 PERF_TIMER_GUARD(get_snapshot_time);
38 Status s;
39 SequenceNumber snapshot = versions_->LastSequence();
40 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
41 auto cfd = cfh->cfd();
42 if (tracer_) {
43 InstrumentedMutexLock lock(&trace_mutex_);
44 if (tracer_) {
45 tracer_->Get(column_family, key);
46 }
47 }
48 SuperVersion* super_version = cfd->GetSuperVersion();
49 MergeContext merge_context;
50 SequenceNumber max_covering_tombstone_seq = 0;
51 LookupKey lkey(key, snapshot);
52 PERF_TIMER_STOP(get_snapshot_time);
53 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
54 /*timestamp=*/nullptr, &s, &merge_context,
55 &max_covering_tombstone_seq, read_options)) {
56 pinnable_val->PinSelf();
57 RecordTick(stats_, MEMTABLE_HIT);
58 } else {
59 PERF_TIMER_GUARD(get_from_output_files_time);
60 super_version->current->Get(read_options, lkey, pinnable_val,
61 /*timestamp=*/nullptr, &s, &merge_context,
62 &max_covering_tombstone_seq);
63 RecordTick(stats_, MEMTABLE_MISS);
64 }
65 RecordTick(stats_, NUMBER_KEYS_READ);
66 size_t size = pinnable_val->size();
67 RecordTick(stats_, BYTES_READ, size);
68 RecordInHistogram(stats_, BYTES_PER_READ, size);
69 PERF_COUNTER_ADD(get_read_bytes, size);
70 return s;
71 }
72
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)73 Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
74 ColumnFamilyHandle* column_family) {
75 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
76 auto cfd = cfh->cfd();
77 SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
78 SequenceNumber latest_snapshot = versions_->LastSequence();
79 SequenceNumber read_seq =
80 read_options.snapshot != nullptr
81 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
82 ->number_
83 : latest_snapshot;
84 ReadCallback* read_callback = nullptr; // No read callback provided.
85 auto db_iter = NewArenaWrappedDbIterator(
86 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
87 super_version->current, read_seq,
88 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
89 super_version->version_number, read_callback);
90 auto internal_iter = NewInternalIterator(
91 db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
92 db_iter->GetRangeDelAggregator(), read_seq,
93 /* allow_unprepared_value */ true);
94 db_iter->SetIterUnderDBIter(internal_iter);
95 return db_iter;
96 }
97
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)98 Status DBImplReadOnly::NewIterators(
99 const ReadOptions& read_options,
100 const std::vector<ColumnFamilyHandle*>& column_families,
101 std::vector<Iterator*>* iterators) {
102 ReadCallback* read_callback = nullptr; // No read callback provided.
103 if (iterators == nullptr) {
104 return Status::InvalidArgument("iterators not allowed to be nullptr");
105 }
106 iterators->clear();
107 iterators->reserve(column_families.size());
108 SequenceNumber latest_snapshot = versions_->LastSequence();
109 SequenceNumber read_seq =
110 read_options.snapshot != nullptr
111 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
112 ->number_
113 : latest_snapshot;
114
115 for (auto cfh : column_families) {
116 auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
117 auto* sv = cfd->GetSuperVersion()->Ref();
118 auto* db_iter = NewArenaWrappedDbIterator(
119 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
120 sv->current, read_seq,
121 sv->mutable_cf_options.max_sequential_skip_in_iterations,
122 sv->version_number, read_callback);
123 auto* internal_iter = NewInternalIterator(
124 db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
125 db_iter->GetRangeDelAggregator(), read_seq,
126 /* allow_unprepared_value */ true);
127 db_iter->SetIterUnderDBIter(internal_iter);
128 iterators->push_back(db_iter);
129 }
130
131 return Status::OK();
132 }
133
134 namespace {
135 // Return OK if dbname exists in the file system or create it if
136 // create_if_missing
OpenForReadOnlyCheckExistence(const DBOptions & db_options,const std::string & dbname)137 Status OpenForReadOnlyCheckExistence(const DBOptions& db_options,
138 const std::string& dbname) {
139 Status s;
140 if (!db_options.create_if_missing) {
141 // Attempt to read "CURRENT" file
142 const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
143 std::string manifest_path;
144 uint64_t manifest_file_number;
145 s = VersionSet::GetCurrentManifestPath(dbname, fs.get(), &manifest_path,
146 &manifest_file_number);
147 } else {
148 // Historic behavior that doesn't necessarily make sense
149 s = db_options.env->CreateDirIfMissing(dbname);
150 }
151 return s;
152 }
153 } // namespace
154
OpenForReadOnly(const Options & options,const std::string & dbname,DB ** dbptr,bool)155 Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
156 DB** dbptr, bool /*error_if_wal_file_exists*/) {
157 Status s = OpenForReadOnlyCheckExistence(options, dbname);
158 if (!s.ok()) {
159 return s;
160 }
161
162 *dbptr = nullptr;
163
164 // Try to first open DB as fully compacted DB
165 s = CompactedDBImpl::Open(options, dbname, dbptr);
166 if (s.ok()) {
167 return s;
168 }
169
170 DBOptions db_options(options);
171 ColumnFamilyOptions cf_options(options);
172 std::vector<ColumnFamilyDescriptor> column_families;
173 column_families.push_back(
174 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
175 std::vector<ColumnFamilyHandle*> handles;
176
177 s = DBImplReadOnly::OpenForReadOnlyWithoutCheck(
178 db_options, dbname, column_families, &handles, dbptr);
179 if (s.ok()) {
180 assert(handles.size() == 1);
181 // i can delete the handle since DBImpl is always holding a
182 // reference to default column family
183 delete handles[0];
184 }
185 return s;
186 }
187
OpenForReadOnly(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,bool error_if_wal_file_exists)188 Status DB::OpenForReadOnly(
189 const DBOptions& db_options, const std::string& dbname,
190 const std::vector<ColumnFamilyDescriptor>& column_families,
191 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
192 bool error_if_wal_file_exists) {
193 // If dbname does not exist in the file system, should not do anything
194 Status s = OpenForReadOnlyCheckExistence(db_options, dbname);
195 if (!s.ok()) {
196 return s;
197 }
198
199 return DBImplReadOnly::OpenForReadOnlyWithoutCheck(
200 db_options, dbname, column_families, handles, dbptr,
201 error_if_wal_file_exists);
202 }
203
OpenForReadOnlyWithoutCheck(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,bool error_if_wal_file_exists)204 Status DBImplReadOnly::OpenForReadOnlyWithoutCheck(
205 const DBOptions& db_options, const std::string& dbname,
206 const std::vector<ColumnFamilyDescriptor>& column_families,
207 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
208 bool error_if_wal_file_exists) {
209 *dbptr = nullptr;
210 handles->clear();
211
212 SuperVersionContext sv_context(/* create_superversion */ true);
213 DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
214 impl->mutex_.Lock();
215 Status s = impl->Recover(column_families, true /* read only */,
216 error_if_wal_file_exists);
217 if (s.ok()) {
218 // set column family handles
219 for (auto cf : column_families) {
220 auto cfd =
221 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
222 if (cfd == nullptr) {
223 s = Status::InvalidArgument("Column family not found", cf.name);
224 break;
225 }
226 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
227 }
228 }
229 if (s.ok()) {
230 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
231 sv_context.NewSuperVersion();
232 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
233 }
234 }
235 impl->mutex_.Unlock();
236 sv_context.Clean();
237 if (s.ok()) {
238 *dbptr = impl;
239 for (auto* h : *handles) {
240 impl->NewThreadStatusCfInfo(
241 static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
242 }
243 } else {
244 for (auto h : *handles) {
245 delete h;
246 }
247 handles->clear();
248 delete impl;
249 }
250 return s;
251 }
252
253 #else // !ROCKSDB_LITE
254
255 Status DB::OpenForReadOnly(const Options& /*options*/,
256 const std::string& /*dbname*/, DB** /*dbptr*/,
257 bool /*error_if_wal_file_exists*/) {
258 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
259 }
260
261 Status DB::OpenForReadOnly(
262 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
263 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
264 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
265 bool /*error_if_wal_file_exists*/) {
266 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
267 }
268 #endif // !ROCKSDB_LITE
269
270 } // namespace ROCKSDB_NAMESPACE
271