1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "components/services/storage/indexed_db/scopes/leveldb_scopes.h"
6 
7 #include <memory>
8 #include <string>
9 #include <utility>
10 
11 #include "base/barrier_closure.h"
12 #include "base/bind.h"
13 #include "base/callback_helpers.h"
14 #include "base/compiler_specific.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/optional.h"
17 #include "base/strings/strcat.h"
18 #include "base/strings/string_number_conversions.h"
19 #include "base/task/post_task.h"
20 #include "base/task/task_traits.h"
21 #include "base/task/thread_pool.h"
22 #include "base/task_runner_util.h"
23 #include "base/threading/sequenced_task_runner_handle.h"
24 #include "components/services/storage/indexed_db/leveldb/leveldb_state.h"
25 #include "components/services/storage/indexed_db/scopes/leveldb_scope.h"
26 #include "components/services/storage/indexed_db/scopes/leveldb_scopes_coding.h"
27 #include "components/services/storage/indexed_db/scopes/leveldb_scopes_tasks.h"
28 #include "components/services/storage/indexed_db/scopes/scopes_lock_manager.h"
29 #include "components/services/storage/indexed_db/scopes/scopes_metadata.pb.h"
30 #include "third_party/leveldatabase/src/include/leveldb/db.h"
31 #include "third_party/leveldatabase/src/include/leveldb/iterator.h"
32 #include "third_party/leveldatabase/src/include/leveldb/slice.h"
33 
34 namespace content {
35 
// |metadata_key_prefix| prefixes every metadata key this instance writes,
// which lets multiple scopes systems coexist in one database.
// |max_write_batch_size| caps, in bytes, the write batches used by the
// cleanup/revert tasks. |lock_manager| is held as a raw pointer, so it is
// presumably owned by the caller and must outlive this object - TODO confirm.
// |tear_down_callback| is run when a background task fails with a
// non-OK leveldb::Status.
LevelDBScopes::LevelDBScopes(std::vector<uint8_t> metadata_key_prefix,
                             size_t max_write_batch_size,
                             scoped_refptr<LevelDBState> level_db,
                             ScopesLockManager* lock_manager,
                             TearDownCallback tear_down_callback)
    : metadata_key_prefix_(std::move(metadata_key_prefix)),
      max_write_batch_size_bytes_(max_write_batch_size),
      level_db_(std::move(level_db)),
      lock_manager_(lock_manager),
      tear_down_callback_(std::move(tear_down_callback)) {}

LevelDBScopes::~LevelDBScopes() = default;
48 
// Loads the global scopes metadata (writing a fresh record on first run) and
// scans all per-scope metadata records still on disk. Scopes whose metadata
// has no lock ranges were committed and only need cleanup; scopes that still
// list lock ranges were interrupted mid-transaction and must be reverted.
// The queued work does not run until StartRecoveryAndCleanupTasks() is
// called, but the locks needed by the reverts are acquired here.
leveldb::Status LevelDBScopes::Initialize() {
#if DCHECK_IS_ON()
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(level_db_);
  DCHECK(!initialize_called_) << "Initialize() already called";
  initialize_called_ = true;
#endif  // DCHECK_IS_ON()

  leveldb::ReadOptions read_options;
  read_options.fill_cache = true;
  read_options.verify_checksums = true;
  leveldb::WriteOptions write_options;
  write_options.sync = false;
  ScopesEncoder key_encoder;
  leveldb::Status s;

  // This method loads the global metadata, reads in all of the scopes still on
  // disk, and stores their information for later cleanup or reverting. For all
  // reverting scopes, the appropriate locks are acquired now.

  // Step 1 - Load & initialize global metadata.
  std::string metadata_value;
  leveldb::Slice metadata_key =
      key_encoder.GlobalMetadataKey(metadata_key_prefix_);
  s = level_db_->db()->Get(read_options, metadata_key, &metadata_value);
  if (UNLIKELY(!s.ok() && !s.IsNotFound()))
    return s;

  LevelDBScopesMetadata metadata;
  if (s.IsNotFound()) {
    // First run for this database - create the initial metadata record.
    metadata.set_version(leveldb_scopes::kCurrentVersion);
    // This is the only 'write' operation that is done in this method, so a
    // leveldb::WriteBatch isn't necessary.
    s = level_db_->db()->Put(write_options, metadata_key,
                             metadata.SerializeAsString());
    if (UNLIKELY(!s.ok()))
      return s;
  } else {
    if (!metadata.ParseFromString(metadata_value)) {
      return leveldb::Status::Corruption(
          "Could not parse LevelDBScopes Metadata.");
    }
    // Reject databases written by an incompatible (too-old or newer) version.
    if (metadata.version() < leveldb_scopes::kMinSupportedVersion ||
        metadata.version() > leveldb_scopes::kCurrentVersion) {
      return leveldb::Status::Corruption(
          base::StrCat({"Unsupported scopes metadata version ",
                        base::NumberToString(metadata.version())}));
    }
  }

  // Step 2 - Load scopes metadata & queue up revert or cleanup tasks, to be run
  // when StartRecoveryAndCleanupTasks() is called. All locks for the revert
  // tasks are acquired now.

  DCHECK(startup_scopes_to_clean_.empty());
  DCHECK(startup_scopes_to_revert_.empty());
  const std::unique_ptr<leveldb::Iterator> iterator =
      base::WrapUnique(level_db_->db()->NewIterator(read_options));
  leveldb::Slice prefix_key =
      key_encoder.ScopeMetadataPrefix(metadata_key_prefix_);
  iterator->Seek(prefix_key);
  LevelDBScopesScopeMetadata scope_metadata;
  // Iterate every key under the scope-metadata prefix; each record is one
  // scope that had not been fully cleaned up when the database closed.
  for (; iterator->Valid() && iterator->key().starts_with(prefix_key);
       iterator->Next()) {
    // Parse the key (scope id) & value (serialized scope metadata).
    int64_t scope_id;
    bool success;
    std::tie(success, scope_id) = leveldb_scopes::ParseScopeMetadataId(
        iterator->key(), metadata_key_prefix_);
    if (UNLIKELY(!success)) {
      return leveldb::Status::Corruption(base::StrCat(
          {"Could not read scope metadata key: ", iterator->key().ToString()}));
    }
    if (UNLIKELY(!scope_metadata.ParseFromArray(iterator->value().data(),
                                                iterator->value().size()))) {
      return leveldb::Status::Corruption(base::StrCat(
          {"Could not parse scope value key: ", iterator->value().ToString()}));
    }

    // The 'commit point' is not having any lock ranges in |scope_metadata|. If
    // lock ranges aren't present then it was committed, and the scope only
    // needs to be cleaned up.
    if (LIKELY(scope_metadata.locks_size() == 0)) {
      startup_scopes_to_clean_.emplace_back(
          scope_id, scope_metadata.ignore_cleanup_tasks()
                        ? StartupCleanupType::kIgnoreCleanupTasks
                        : StartupCleanupType::kExecuteCleanupTasks);
      continue;
    }

    // The commit point isn't there, so that scope needs to be reverted.
    // Acquire all locks necessary to undo the scope to prevent user-created
    // scopes from reading or writing changes that will be undone.
    ScopeLockRange range;
    base::flat_set<ScopesLockManager::ScopeLockRequest> lock_requests;
    lock_requests.reserve(scope_metadata.locks().size());
    for (const auto& lock : scope_metadata.locks()) {
      range.begin = lock.range().begin();
      range.end = lock.range().end();
      lock_requests.emplace(lock.level(), range,
                            ScopesLockManager::LockType::kExclusive);
    }
    ScopesLocksHolder receiver;
    bool locks_acquired = lock_manager_->AcquireLocks(
        std::move(lock_requests), receiver.weak_factory.GetWeakPtr(),
        base::DoNothing());
    if (UNLIKELY(!locks_acquired))
      return leveldb::Status::Corruption("Invalid locks on disk.");

    // AcquireLocks should grant the locks synchronously because
    // 1. There should be no locks acquired before calling this method, and
    // 2. All locks being loaded from disk were previously 'held' by this
    //    system. If they conflict, this is an invalid state on disk.
    if (UNLIKELY(receiver.locks.empty()))
      return leveldb::Status::Corruption("Invalid lock ranges on disk.");

    startup_scopes_to_revert_.emplace_back(scope_id, std::move(receiver.locks));
  }
  // NOTE(review): if the iterator ended with an error, |recovery_finished_|
  // stays false but |s| still holds the earlier Get/Put status, so this can
  // return OK - confirm that callers rely on the recovery_finished_ DCHECKs
  // rather than this return value in that case.
  if (LIKELY(iterator->status().ok()))
    recovery_finished_ = true;
  return s;
}
171 
// Creates the task runners (or adopts the current sequence) and schedules the
// revert and cleanup work that Initialize() recorded. Revert tasks are
// scheduled first because they still hold locks that block user operations.
leveldb::Status LevelDBScopes::StartRecoveryAndCleanupTasks(
    TaskRunnerMode mode) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!revert_runner_) << "StartRecoveryAndCleanupTasks() already called.";
  DCHECK(!cleanup_runner_);

  // There are many choices for how to run these tasks. They technically could
  // be done on a threadpool, where each task is in its own thread. Because both
  // of these task types are triggered by the code on a webpage, it is dangerous
  // to let them completely fill up a threadpool.
  // The cleanup tasks are important to run because they will result in disk
  // space shrinkage, especially when they have compaction tasks. This affects
  // the webpage quota.
  // The revert tasks are very important because they still hold a lock to that
  // object store or database. This can completely block website database
  // operations from happening.
  // The compromise here is:
  // It is OK to mark these priorities as somewhat high (blocking and visible)
  // as long as each task type only uses one sequence. This makes sure that the
  // tasks cannot monopolize the entire thread pool, and that they will be run
  // reasonably soon.
  switch (mode) {
    case TaskRunnerMode::kNewCleanupAndRevertSequences:
      revert_runner_ = base::ThreadPool::CreateSequencedTaskRunner(
          {base::MayBlock(), base::WithBaseSyncPrimitives(),
           base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN,
           base::TaskPriority::USER_BLOCKING});
      cleanup_runner_ = base::ThreadPool::CreateSequencedTaskRunner(
          {base::MayBlock(), base::WithBaseSyncPrimitives(),
           base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN,
           base::TaskPriority::USER_VISIBLE});
      break;
    case TaskRunnerMode::kUseCurrentSequence:
      // A null |revert_runner_| makes Rollback() run revert tasks inline on
      // this sequence; cleanups are still posted, to the current runner.
      revert_runner_ = nullptr;
      cleanup_runner_ = base::SequencedTaskRunnerHandle::Get();
      break;
  }

  // Schedule all pending revert tasks ASAP. In kUseCurrentSequence mode these
  // run synchronously here; the last failing status (if any) is returned.
  leveldb::Status last_error;
  for (StartupScopeToRevert& revert_scope_data : startup_scopes_to_revert_) {
    leveldb::Status status =
        Rollback(revert_scope_data.first, std::move(revert_scope_data.second));
    if (!status.ok())
      last_error = status;
  }
  startup_scopes_to_revert_.clear();
  if (!last_error.ok())
    return last_error;

  // Schedule all committed scopes to be cleaned up. Failures surface via
  // OnCleanupTaskResult, which runs the tear-down callback.
  for (auto& cleanup_scope_data : startup_scopes_to_clean_) {
    auto cleanup_task = std::make_unique<CleanupScopeTask>(
        level_db_, metadata_key_prefix_, cleanup_scope_data.first,
        cleanup_scope_data.second == StartupCleanupType::kExecuteCleanupTasks
            ? CleanupScopeTask::CleanupMode::kExecuteCleanupTasks
            : CleanupScopeTask::CleanupMode::kIgnoreCleanupTasks,
        max_write_batch_size_bytes_);
    base::PostTaskAndReplyWithResult(
        cleanup_runner_.get(), FROM_HERE,
        base::BindOnce(&CleanupScopeTask::Run, std::move(cleanup_task)),
        base::BindOnce(&LevelDBScopes::OnCleanupTaskResult,
                       weak_factory_.GetWeakPtr(), base::OnceClosure()));
  }
  startup_scopes_to_clean_.clear();
  return last_error;
}
239 
// Creates a new scope that owns |locks|. |empty_ranges| is forwarded to the
// LevelDBScope - presumably key ranges known to be empty, enabling cheaper
// undo handling; the exact semantics live in LevelDBScope (TODO confirm).
// Must not be called before Initialize() has succeeded.
std::unique_ptr<LevelDBScope> LevelDBScopes::CreateScope(
    std::vector<ScopeLock> locks,
    std::vector<std::pair<std::string, std::string>> empty_ranges) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(recovery_finished_);
  int scope_id = next_scope_id_;
  ++next_scope_id_;
  // Only the weak pointer is bound here; the scope itself supplies |scope_id|
  // and |locks| if and when it invokes the rollback callback.
  auto rollback_callback = base::BindOnce(
      [](base::WeakPtr<LevelDBScopes> scopes, int64_t scope_id,
         std::vector<ScopeLock> locks) {
        // If the scopes system is already gone there is nothing to roll back.
        if (!scopes)
          return leveldb::Status::OK();
        return scopes->Rollback(scope_id, std::move(locks));
      },
      weak_factory_.GetWeakPtr());
  return base::WrapUnique(new LevelDBScope(
      scope_id, metadata_key_prefix_, max_write_batch_size_bytes_, level_db_,
      std::move(locks), std::move(empty_ranges), std::move(rollback_callback),
      tear_down_callback_));
}
260 
// Convenience overload of Commit() with no completion closure.
leveldb::Status LevelDBScopes::Commit(std::unique_ptr<LevelDBScope> scope,
                                      bool sync_on_commit) {
  return Commit(std::move(scope), sync_on_commit, base::OnceClosure());
}
265 
// Commits |scope|. If the scope wrote an undo log to disk, a cleanup task is
// posted to remove it (executing the scope's deferred cleanup operations),
// and |on_complete| runs after that task finishes. The returned status is the
// commit status only; cleanup failures are reported via the tear-down
// callback.
leveldb::Status LevelDBScopes::Commit(std::unique_ptr<LevelDBScope> scope,
                                      bool sync_on_commit,
                                      base::OnceClosure on_complete) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(recovery_finished_);
  DCHECK(cleanup_runner_);
  LevelDBScope::Mode scopes_mode;
  leveldb::Status s;
  std::tie(s, scopes_mode) = scope->Commit(sync_on_commit);
  if (scopes_mode == LevelDBScope::Mode::kUndoLogOnDisk) {
    auto task = std::make_unique<CleanupScopeTask>(
        level_db_, metadata_key_prefix_, scope->scope_id(),
        CleanupScopeTask::CleanupMode::kExecuteCleanupTasks,
        max_write_batch_size_bytes_);
    base::PostTaskAndReplyWithResult(
        cleanup_runner_.get(), FROM_HERE,
        base::BindOnce(&CleanupScopeTask::Run, std::move(task)),
        base::BindOnce(&LevelDBScopes::OnCleanupTaskResult,
                       weak_factory_.GetWeakPtr(), std::move(on_complete)));
  }
  // NOTE(review): when the commit did not use an on-disk undo log,
  // |on_complete| is destroyed without ever being run - confirm callers only
  // expect completion notification for the kUndoLogOnDisk path.
  return s;
}
288 
Rollback(int64_t scope_id,std::vector<ScopeLock> locks)289 leveldb::Status LevelDBScopes::Rollback(int64_t scope_id,
290                                         std::vector<ScopeLock> locks) {
291   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
292   auto task = std::make_unique<RevertScopeTask>(
293       level_db_, metadata_key_prefix_, scope_id, max_write_batch_size_bytes_);
294 
295   if (revert_runner_) {
296     base::PostTaskAndReplyWithResult(
297         revert_runner_.get(), FROM_HERE,
298         base::BindOnce(&RevertScopeTask::Run, std::move(task)),
299         base::BindOnce(&LevelDBScopes::OnRevertTaskResult,
300                        weak_factory_.GetWeakPtr(), scope_id, std::move(locks)));
301     return leveldb::Status::OK();
302   }
303   leveldb::Status result = task->Run();
304   if (LIKELY(result.ok()))
305     OnRevertTaskResult(scope_id, std::move(locks), result);
306   return result;
307 }
308 
OnCleanupTaskResult(base::OnceClosure on_complete,leveldb::Status result)309 void LevelDBScopes::OnCleanupTaskResult(base::OnceClosure on_complete,
310                                         leveldb::Status result) {
311   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
312   if (UNLIKELY(!result.ok()))
313     tear_down_callback_.Run(result);
314   if (on_complete)
315     std::move(on_complete).Run();
316 }
317 
// Called when a revert task finishes. On failure the tear-down callback runs
// and no further work is scheduled. On success, a cleanup task is posted to
// delete the reverted scope's metadata; the scope's own deferred cleanup
// tasks are ignored because the scope never committed.
void LevelDBScopes::OnRevertTaskResult(int64_t scope_id,
                                       std::vector<ScopeLock> locks,
                                       leveldb::Status result) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (UNLIKELY(!result.ok())) {
    tear_down_callback_.Run(result);
    return;
  }
  // |locks| is deliberately taken by value and otherwise unused - the scope's
  // locks stay held until this frame exits, i.e. until the revert is done.
  auto task = std::make_unique<CleanupScopeTask>(
      level_db_, metadata_key_prefix_, scope_id,
      CleanupScopeTask::CleanupMode::kIgnoreCleanupTasks,
      max_write_batch_size_bytes_);
  base::PostTaskAndReplyWithResult(
      cleanup_runner_.get(), FROM_HERE,
      base::BindOnce(&CleanupScopeTask::Run, std::move(task)),
      base::BindOnce(&LevelDBScopes::OnCleanupTaskResult,
                     weak_factory_.GetWeakPtr(), base::OnceClosure()));
}
336 
337 }  // namespace content
338