1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/StorageManager.h"
8
9 #include "td/telegram/ConfigShared.h"
10 #include "td/telegram/DialogId.h"
11 #include "td/telegram/files/FileGcWorker.h"
12 #include "td/telegram/files/FileStatsWorker.h"
13 #include "td/telegram/Global.h"
14 #include "td/telegram/logevent/LogEvent.h"
15 #include "td/telegram/MessagesManager.h"
16 #include "td/telegram/TdDb.h"
17 #include "td/telegram/TdParameters.h"
18
19 #include "td/db/SqliteDb.h"
20
21 #include "td/utils/algorithm.h"
22 #include "td/utils/logging.h"
23 #include "td/utils/misc.h"
24 #include "td/utils/port/Clocks.h"
25 #include "td/utils/port/Stat.h"
26 #include "td/utils/Random.h"
27 #include "td/utils/Slice.h"
28 #include "td/utils/Time.h"
29
30 namespace td {
31
get_database_statistics_object() const32 tl_object_ptr<td_api::databaseStatistics> DatabaseStats::get_database_statistics_object() const {
33 return make_tl_object<td_api::databaseStatistics>(debug);
34 }
35
StorageManager(ActorShared<> parent,int32 scheduler_id)36 StorageManager::StorageManager(ActorShared<> parent, int32 scheduler_id)
37 : parent_(std::move(parent)), scheduler_id_(scheduler_id) {
38 }
39
start_up()40 void StorageManager::start_up() {
41 load_last_gc_timestamp();
42 schedule_next_gc();
43
44 load_fast_stat();
45 }
46
on_new_file(int64 size,int64 real_size,int32 cnt)47 void StorageManager::on_new_file(int64 size, int64 real_size, int32 cnt) {
48 LOG(INFO) << "Add " << cnt << " file of size " << size << " with real size " << real_size
49 << " to fast storage statistics";
50 fast_stat_.cnt += cnt;
51 #if TD_WINDOWS
52 auto add_size = size;
53 #else
54 auto add_size = real_size;
55 #endif
56 fast_stat_.size += add_size;
57
58 if (fast_stat_.cnt < 0 || fast_stat_.size < 0) {
59 LOG(ERROR) << "Wrong fast stat after adding size " << add_size << " and cnt " << cnt;
60 fast_stat_ = FileTypeStat();
61 }
62 save_fast_stat();
63 }
64
get_storage_stats(bool need_all_files,int32 dialog_limit,Promise<FileStats> promise)65 void StorageManager::get_storage_stats(bool need_all_files, int32 dialog_limit, Promise<FileStats> promise) {
66 if (is_closed_) {
67 return promise.set_error(Global::request_aborted_error());
68 }
69 if (!pending_storage_stats_.empty()) {
70 if (stats_dialog_limit_ == dialog_limit && need_all_files == stats_need_all_files_) {
71 pending_storage_stats_.emplace_back(std::move(promise));
72 return;
73 }
74 //TODO group same queries
75 close_stats_worker();
76 }
77 if (!pending_run_gc_[0].empty() || !pending_run_gc_[1].empty()) {
78 close_gc_worker();
79 }
80 stats_dialog_limit_ = dialog_limit;
81 stats_need_all_files_ = need_all_files;
82 pending_storage_stats_.emplace_back(std::move(promise));
83
84 create_stats_worker();
85 send_closure(stats_worker_, &FileStatsWorker::get_stats, need_all_files, stats_dialog_limit_ != 0,
86 PromiseCreator::lambda(
87 [actor_id = actor_id(this), stats_generation = stats_generation_](Result<FileStats> file_stats) {
88 send_closure(actor_id, &StorageManager::on_file_stats, std::move(file_stats), stats_generation);
89 }));
90 }
91
get_storage_stats_fast(Promise<FileStatsFast> promise)92 void StorageManager::get_storage_stats_fast(Promise<FileStatsFast> promise) {
93 promise.set_value(FileStatsFast(fast_stat_.size, fast_stat_.cnt, get_database_size(),
94 get_language_pack_database_size(), get_log_size()));
95 }
96
get_database_stats(Promise<DatabaseStats> promise)97 void StorageManager::get_database_stats(Promise<DatabaseStats> promise) {
98 //TODO: use another thread
99 auto r_stats = G()->td_db()->get_stats();
100 if (r_stats.is_error()) {
101 promise.set_error(r_stats.move_as_error());
102 } else {
103 promise.set_value(DatabaseStats(r_stats.move_as_ok()));
104 }
105 }
106
update_use_storage_optimizer()107 void StorageManager::update_use_storage_optimizer() {
108 schedule_next_gc();
109 }
110
run_gc(FileGcParameters parameters,bool return_deleted_file_statistics,Promise<FileStats> promise)111 void StorageManager::run_gc(FileGcParameters parameters, bool return_deleted_file_statistics,
112 Promise<FileStats> promise) {
113 if (is_closed_) {
114 return promise.set_error(Global::request_aborted_error());
115 }
116 if (!pending_run_gc_[0].empty() || !pending_run_gc_[1].empty()) {
117 close_gc_worker();
118 }
119
120 bool split_by_owner_dialog_id = !parameters.owner_dialog_ids.empty() ||
121 !parameters.exclude_owner_dialog_ids.empty() || parameters.dialog_limit != 0;
122 get_storage_stats(
123 true /*need_all_files*/, split_by_owner_dialog_id,
124 PromiseCreator::lambda(
125 [actor_id = actor_id(this), parameters = std::move(parameters)](Result<FileStats> file_stats) mutable {
126 send_closure(actor_id, &StorageManager::on_all_files, std::move(parameters), std::move(file_stats));
127 }));
128
129 //NB: get_storage_stats will cancel all garbage collection queries, so promise needs to be added after the call
130 pending_run_gc_[return_deleted_file_statistics].push_back(std::move(promise));
131 }
132
on_file_stats(Result<FileStats> r_file_stats,uint32 generation)133 void StorageManager::on_file_stats(Result<FileStats> r_file_stats, uint32 generation) {
134 if (generation != stats_generation_) {
135 return;
136 }
137 if (r_file_stats.is_error()) {
138 auto promises = std::move(pending_storage_stats_);
139 for (auto &promise : promises) {
140 promise.set_error(r_file_stats.error().clone());
141 }
142 return;
143 }
144
145 update_fast_stats(r_file_stats.ok());
146 send_stats(r_file_stats.move_as_ok(), stats_dialog_limit_, std::move(pending_storage_stats_));
147 }
148
create_stats_worker()149 void StorageManager::create_stats_worker() {
150 CHECK(!is_closed_);
151 if (stats_worker_.empty()) {
152 stats_worker_ =
153 create_actor_on_scheduler<FileStatsWorker>("FileStatsWorker", scheduler_id_, create_reference(),
154 stats_cancellation_token_source_.get_cancellation_token());
155 }
156 }
157
on_all_files(FileGcParameters gc_parameters,Result<FileStats> r_file_stats)158 void StorageManager::on_all_files(FileGcParameters gc_parameters, Result<FileStats> r_file_stats) {
159 int32 dialog_limit = gc_parameters.dialog_limit;
160 if (is_closed_ && r_file_stats.is_ok()) {
161 r_file_stats = Global::request_aborted_error();
162 }
163 if (r_file_stats.is_error()) {
164 return on_gc_finished(dialog_limit, r_file_stats.move_as_error());
165 }
166
167 create_gc_worker();
168
169 send_closure(gc_worker_, &FileGcWorker::run_gc, std::move(gc_parameters), r_file_stats.ok_ref().get_all_files(),
170 PromiseCreator::lambda([actor_id = actor_id(this), dialog_limit](Result<FileGcResult> r_file_gc_result) {
171 send_closure(actor_id, &StorageManager::on_gc_finished, dialog_limit, std::move(r_file_gc_result));
172 }));
173 }
174
get_file_size(CSlice path)175 int64 StorageManager::get_file_size(CSlice path) {
176 auto r_info = stat(path);
177 if (r_info.is_error()) {
178 return 0;
179 }
180
181 auto size = r_info.ok().real_size_;
182 LOG(DEBUG) << "Add file \"" << path << "\" of size " << size << " to fast storage statistics";
183 return size;
184 }
185
get_database_size()186 int64 StorageManager::get_database_size() {
187 int64 size = 0;
188 G()->td_db()->with_db_path([&size](CSlice path) { size += get_file_size(path); });
189 return size;
190 }
191
get_language_pack_database_size()192 int64 StorageManager::get_language_pack_database_size() {
193 int64 size = 0;
194 auto path = G()->shared_config().get_option_string("language_pack_database_path");
195 if (!path.empty()) {
196 SqliteDb::with_db_path(path, [&size](CSlice path) { size += get_file_size(path); });
197 }
198 return size;
199 }
200
get_log_size()201 int64 StorageManager::get_log_size() {
202 int64 size = 0;
203 for (auto &log_path : log_interface->get_file_paths()) {
204 size += get_file_size(log_path);
205 }
206 return size;
207 }
208
create_gc_worker()209 void StorageManager::create_gc_worker() {
210 CHECK(!is_closed_);
211 if (gc_worker_.empty()) {
212 gc_worker_ = create_actor_on_scheduler<FileGcWorker>("FileGcWorker", scheduler_id_, create_reference(),
213 gc_cancellation_token_source_.get_cancellation_token());
214 }
215 }
216
on_gc_finished(int32 dialog_limit,Result<FileGcResult> r_file_gc_result)217 void StorageManager::on_gc_finished(int32 dialog_limit, Result<FileGcResult> r_file_gc_result) {
218 if (r_file_gc_result.is_error()) {
219 if (r_file_gc_result.error().code() != 500) {
220 LOG(ERROR) << "GC failed: " << r_file_gc_result.error();
221 }
222 auto promises = std::move(pending_run_gc_[0]);
223 append(promises, std::move(pending_run_gc_[1]));
224 pending_run_gc_[0].clear();
225 pending_run_gc_[1].clear();
226 for (auto &promise : promises) {
227 promise.set_error(r_file_gc_result.error().clone());
228 }
229 return;
230 }
231
232 update_fast_stats(r_file_gc_result.ok().kept_file_stats_);
233
234 auto kept_file_promises = std::move(pending_run_gc_[0]);
235 auto removed_file_promises = std::move(pending_run_gc_[1]);
236 send_stats(std::move(r_file_gc_result.ok_ref().kept_file_stats_), dialog_limit, std::move(kept_file_promises));
237 send_stats(std::move(r_file_gc_result.ok_ref().removed_file_stats_), dialog_limit, std::move(removed_file_promises));
238 }
239
save_fast_stat()240 void StorageManager::save_fast_stat() {
241 G()->td_db()->get_binlog_pmc()->set("fast_file_stat", log_event_store(fast_stat_).as_slice().str());
242 }
243
load_fast_stat()244 void StorageManager::load_fast_stat() {
245 auto status = log_event_parse(fast_stat_, G()->td_db()->get_binlog_pmc()->get("fast_file_stat"));
246 if (status.is_error()) {
247 fast_stat_ = FileTypeStat();
248 }
249 LOG(INFO) << "Loaded fast storage statistics with " << fast_stat_.cnt << " files of total size " << fast_stat_.size;
250 }
251
update_fast_stats(const FileStats & stats)252 void StorageManager::update_fast_stats(const FileStats &stats) {
253 fast_stat_ = stats.get_total_nontemp_stat();
254 LOG(INFO) << "Recalculate fast storage statistics to " << fast_stat_.cnt << " files of total size "
255 << fast_stat_.size;
256 save_fast_stat();
257 }
258
send_stats(FileStats && stats,int32 dialog_limit,std::vector<Promise<FileStats>> && promises)259 void StorageManager::send_stats(FileStats &&stats, int32 dialog_limit, std::vector<Promise<FileStats>> &&promises) {
260 if (promises.empty()) {
261 return;
262 }
263
264 stats.apply_dialog_limit(dialog_limit);
265 auto dialog_ids = stats.get_dialog_ids();
266
267 auto promise = PromiseCreator::lambda(
268 [promises = std::move(promises), stats = std::move(stats)](vector<DialogId> dialog_ids) mutable {
269 stats.apply_dialog_ids(dialog_ids);
270 for (auto &promise : promises) {
271 promise.set_value(FileStats(stats));
272 }
273 });
274
275 send_closure(G()->messages_manager(), &MessagesManager::load_dialogs, std::move(dialog_ids), std::move(promise));
276 }
277
create_reference()278 ActorShared<> StorageManager::create_reference() {
279 ref_cnt_++;
280 return actor_shared(this, 1);
281 }
282
hangup_shared()283 void StorageManager::hangup_shared() {
284 ref_cnt_--;
285 if (ref_cnt_ == 0) {
286 stop();
287 }
288 }
289
close_stats_worker()290 void StorageManager::close_stats_worker() {
291 auto promises = std::move(pending_storage_stats_);
292 pending_storage_stats_.clear();
293 for (auto &promise : promises) {
294 promise.set_error(Global::request_aborted_error());
295 }
296 stats_generation_++;
297 stats_worker_.reset();
298 stats_cancellation_token_source_.cancel();
299 }
300
close_gc_worker()301 void StorageManager::close_gc_worker() {
302 auto promises = std::move(pending_run_gc_[0]);
303 append(promises, std::move(pending_run_gc_[1]));
304 pending_run_gc_[0].clear();
305 pending_run_gc_[1].clear();
306 for (auto &promise : promises) {
307 promise.set_error(Global::request_aborted_error());
308 }
309 gc_worker_.reset();
310 gc_cancellation_token_source_.cancel();
311 }
312
hangup()313 void StorageManager::hangup() {
314 is_closed_ = true;
315 close_stats_worker();
316 close_gc_worker();
317 hangup_shared();
318 }
319
load_last_gc_timestamp()320 uint32 StorageManager::load_last_gc_timestamp() {
321 last_gc_timestamp_ = to_integer<uint32>(G()->td_db()->get_binlog_pmc()->get("files_gc_ts"));
322 return last_gc_timestamp_;
323 }
324
save_last_gc_timestamp()325 void StorageManager::save_last_gc_timestamp() {
326 last_gc_timestamp_ = static_cast<uint32>(Clocks::system());
327 G()->td_db()->get_binlog_pmc()->set("files_gc_ts", to_string(last_gc_timestamp_));
328 }
329
schedule_next_gc()330 void StorageManager::schedule_next_gc() {
331 if (!G()->shared_config().get_option_boolean("use_storage_optimizer") &&
332 !G()->parameters().enable_storage_optimizer) {
333 next_gc_at_ = 0;
334 cancel_timeout();
335 LOG(INFO) << "No next file clean up is scheduled";
336 return;
337 }
338 auto sys_time = static_cast<uint32>(Clocks::system());
339
340 auto next_gc_at = last_gc_timestamp_ + GC_EACH;
341 if (next_gc_at < sys_time) {
342 next_gc_at = sys_time;
343 }
344 if (next_gc_at > sys_time + GC_EACH) {
345 next_gc_at = sys_time + GC_EACH;
346 }
347 next_gc_at += Random::fast(GC_DELAY, GC_DELAY + GC_RAND_DELAY);
348 CHECK(next_gc_at >= sys_time);
349 auto next_gc_in = next_gc_at - sys_time;
350
351 LOG(INFO) << "Schedule next file clean up in " << next_gc_in;
352 next_gc_at_ = Time::now() + next_gc_in;
353 set_timeout_at(next_gc_at_);
354 }
355
timeout_expired()356 void StorageManager::timeout_expired() {
357 if (next_gc_at_ == 0) {
358 return;
359 }
360 if (!pending_run_gc_[0].empty() || !pending_run_gc_[1].empty() || !pending_storage_stats_.empty()) {
361 set_timeout_in(60);
362 return;
363 }
364 next_gc_at_ = 0;
365 run_gc({}, false, PromiseCreator::lambda([actor_id = actor_id(this)](Result<FileStats> r_stats) {
366 if (!r_stats.is_error() || r_stats.error().code() != 500) {
367 // do not save garbage collection timestamp if request was canceled
368 send_closure(actor_id, &StorageManager::save_last_gc_timestamp);
369 }
370 send_closure(actor_id, &StorageManager::schedule_next_gc);
371 }));
372 }
373
374 } // namespace td
375