1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/StorageManager.h"
8 
9 #include "td/telegram/ConfigShared.h"
10 #include "td/telegram/DialogId.h"
11 #include "td/telegram/files/FileGcWorker.h"
12 #include "td/telegram/files/FileStatsWorker.h"
13 #include "td/telegram/Global.h"
14 #include "td/telegram/logevent/LogEvent.h"
15 #include "td/telegram/MessagesManager.h"
16 #include "td/telegram/TdDb.h"
17 #include "td/telegram/TdParameters.h"
18 
19 #include "td/db/SqliteDb.h"
20 
21 #include "td/utils/algorithm.h"
22 #include "td/utils/logging.h"
23 #include "td/utils/misc.h"
24 #include "td/utils/port/Clocks.h"
25 #include "td/utils/port/Stat.h"
26 #include "td/utils/Random.h"
27 #include "td/utils/Slice.h"
28 #include "td/utils/Time.h"
29 
30 namespace td {
31 
get_database_statistics_object() const32 tl_object_ptr<td_api::databaseStatistics> DatabaseStats::get_database_statistics_object() const {
33   return make_tl_object<td_api::databaseStatistics>(debug);
34 }
35 
StorageManager(ActorShared<> parent,int32 scheduler_id)36 StorageManager::StorageManager(ActorShared<> parent, int32 scheduler_id)
37     : parent_(std::move(parent)), scheduler_id_(scheduler_id) {
38 }
39 
start_up()40 void StorageManager::start_up() {
41   load_last_gc_timestamp();
42   schedule_next_gc();
43 
44   load_fast_stat();
45 }
46 
on_new_file(int64 size,int64 real_size,int32 cnt)47 void StorageManager::on_new_file(int64 size, int64 real_size, int32 cnt) {
48   LOG(INFO) << "Add " << cnt << " file of size " << size << " with real size " << real_size
49             << " to fast storage statistics";
50   fast_stat_.cnt += cnt;
51 #if TD_WINDOWS
52   auto add_size = size;
53 #else
54   auto add_size = real_size;
55 #endif
56   fast_stat_.size += add_size;
57 
58   if (fast_stat_.cnt < 0 || fast_stat_.size < 0) {
59     LOG(ERROR) << "Wrong fast stat after adding size " << add_size << " and cnt " << cnt;
60     fast_stat_ = FileTypeStat();
61   }
62   save_fast_stat();
63 }
64 
get_storage_stats(bool need_all_files,int32 dialog_limit,Promise<FileStats> promise)65 void StorageManager::get_storage_stats(bool need_all_files, int32 dialog_limit, Promise<FileStats> promise) {
66   if (is_closed_) {
67     return promise.set_error(Global::request_aborted_error());
68   }
69   if (!pending_storage_stats_.empty()) {
70     if (stats_dialog_limit_ == dialog_limit && need_all_files == stats_need_all_files_) {
71       pending_storage_stats_.emplace_back(std::move(promise));
72       return;
73     }
74     //TODO group same queries
75     close_stats_worker();
76   }
77   if (!pending_run_gc_[0].empty() || !pending_run_gc_[1].empty()) {
78     close_gc_worker();
79   }
80   stats_dialog_limit_ = dialog_limit;
81   stats_need_all_files_ = need_all_files;
82   pending_storage_stats_.emplace_back(std::move(promise));
83 
84   create_stats_worker();
85   send_closure(stats_worker_, &FileStatsWorker::get_stats, need_all_files, stats_dialog_limit_ != 0,
86                PromiseCreator::lambda(
87                    [actor_id = actor_id(this), stats_generation = stats_generation_](Result<FileStats> file_stats) {
88                      send_closure(actor_id, &StorageManager::on_file_stats, std::move(file_stats), stats_generation);
89                    }));
90 }
91 
get_storage_stats_fast(Promise<FileStatsFast> promise)92 void StorageManager::get_storage_stats_fast(Promise<FileStatsFast> promise) {
93   promise.set_value(FileStatsFast(fast_stat_.size, fast_stat_.cnt, get_database_size(),
94                                   get_language_pack_database_size(), get_log_size()));
95 }
96 
get_database_stats(Promise<DatabaseStats> promise)97 void StorageManager::get_database_stats(Promise<DatabaseStats> promise) {
98   //TODO: use another thread
99   auto r_stats = G()->td_db()->get_stats();
100   if (r_stats.is_error()) {
101     promise.set_error(r_stats.move_as_error());
102   } else {
103     promise.set_value(DatabaseStats(r_stats.move_as_ok()));
104   }
105 }
106 
update_use_storage_optimizer()107 void StorageManager::update_use_storage_optimizer() {
108   schedule_next_gc();
109 }
110 
run_gc(FileGcParameters parameters,bool return_deleted_file_statistics,Promise<FileStats> promise)111 void StorageManager::run_gc(FileGcParameters parameters, bool return_deleted_file_statistics,
112                             Promise<FileStats> promise) {
113   if (is_closed_) {
114     return promise.set_error(Global::request_aborted_error());
115   }
116   if (!pending_run_gc_[0].empty() || !pending_run_gc_[1].empty()) {
117     close_gc_worker();
118   }
119 
120   bool split_by_owner_dialog_id = !parameters.owner_dialog_ids.empty() ||
121                                   !parameters.exclude_owner_dialog_ids.empty() || parameters.dialog_limit != 0;
122   get_storage_stats(
123       true /*need_all_files*/, split_by_owner_dialog_id,
124       PromiseCreator::lambda(
125           [actor_id = actor_id(this), parameters = std::move(parameters)](Result<FileStats> file_stats) mutable {
126             send_closure(actor_id, &StorageManager::on_all_files, std::move(parameters), std::move(file_stats));
127           }));
128 
129   //NB: get_storage_stats will cancel all garbage collection queries, so promise needs to be added after the call
130   pending_run_gc_[return_deleted_file_statistics].push_back(std::move(promise));
131 }
132 
on_file_stats(Result<FileStats> r_file_stats,uint32 generation)133 void StorageManager::on_file_stats(Result<FileStats> r_file_stats, uint32 generation) {
134   if (generation != stats_generation_) {
135     return;
136   }
137   if (r_file_stats.is_error()) {
138     auto promises = std::move(pending_storage_stats_);
139     for (auto &promise : promises) {
140       promise.set_error(r_file_stats.error().clone());
141     }
142     return;
143   }
144 
145   update_fast_stats(r_file_stats.ok());
146   send_stats(r_file_stats.move_as_ok(), stats_dialog_limit_, std::move(pending_storage_stats_));
147 }
148 
create_stats_worker()149 void StorageManager::create_stats_worker() {
150   CHECK(!is_closed_);
151   if (stats_worker_.empty()) {
152     stats_worker_ =
153         create_actor_on_scheduler<FileStatsWorker>("FileStatsWorker", scheduler_id_, create_reference(),
154                                                    stats_cancellation_token_source_.get_cancellation_token());
155   }
156 }
157 
on_all_files(FileGcParameters gc_parameters,Result<FileStats> r_file_stats)158 void StorageManager::on_all_files(FileGcParameters gc_parameters, Result<FileStats> r_file_stats) {
159   int32 dialog_limit = gc_parameters.dialog_limit;
160   if (is_closed_ && r_file_stats.is_ok()) {
161     r_file_stats = Global::request_aborted_error();
162   }
163   if (r_file_stats.is_error()) {
164     return on_gc_finished(dialog_limit, r_file_stats.move_as_error());
165   }
166 
167   create_gc_worker();
168 
169   send_closure(gc_worker_, &FileGcWorker::run_gc, std::move(gc_parameters), r_file_stats.ok_ref().get_all_files(),
170                PromiseCreator::lambda([actor_id = actor_id(this), dialog_limit](Result<FileGcResult> r_file_gc_result) {
171                  send_closure(actor_id, &StorageManager::on_gc_finished, dialog_limit, std::move(r_file_gc_result));
172                }));
173 }
174 
get_file_size(CSlice path)175 int64 StorageManager::get_file_size(CSlice path) {
176   auto r_info = stat(path);
177   if (r_info.is_error()) {
178     return 0;
179   }
180 
181   auto size = r_info.ok().real_size_;
182   LOG(DEBUG) << "Add file \"" << path << "\" of size " << size << " to fast storage statistics";
183   return size;
184 }
185 
get_database_size()186 int64 StorageManager::get_database_size() {
187   int64 size = 0;
188   G()->td_db()->with_db_path([&size](CSlice path) { size += get_file_size(path); });
189   return size;
190 }
191 
get_language_pack_database_size()192 int64 StorageManager::get_language_pack_database_size() {
193   int64 size = 0;
194   auto path = G()->shared_config().get_option_string("language_pack_database_path");
195   if (!path.empty()) {
196     SqliteDb::with_db_path(path, [&size](CSlice path) { size += get_file_size(path); });
197   }
198   return size;
199 }
200 
get_log_size()201 int64 StorageManager::get_log_size() {
202   int64 size = 0;
203   for (auto &log_path : log_interface->get_file_paths()) {
204     size += get_file_size(log_path);
205   }
206   return size;
207 }
208 
create_gc_worker()209 void StorageManager::create_gc_worker() {
210   CHECK(!is_closed_);
211   if (gc_worker_.empty()) {
212     gc_worker_ = create_actor_on_scheduler<FileGcWorker>("FileGcWorker", scheduler_id_, create_reference(),
213                                                          gc_cancellation_token_source_.get_cancellation_token());
214   }
215 }
216 
on_gc_finished(int32 dialog_limit,Result<FileGcResult> r_file_gc_result)217 void StorageManager::on_gc_finished(int32 dialog_limit, Result<FileGcResult> r_file_gc_result) {
218   if (r_file_gc_result.is_error()) {
219     if (r_file_gc_result.error().code() != 500) {
220       LOG(ERROR) << "GC failed: " << r_file_gc_result.error();
221     }
222     auto promises = std::move(pending_run_gc_[0]);
223     append(promises, std::move(pending_run_gc_[1]));
224     pending_run_gc_[0].clear();
225     pending_run_gc_[1].clear();
226     for (auto &promise : promises) {
227       promise.set_error(r_file_gc_result.error().clone());
228     }
229     return;
230   }
231 
232   update_fast_stats(r_file_gc_result.ok().kept_file_stats_);
233 
234   auto kept_file_promises = std::move(pending_run_gc_[0]);
235   auto removed_file_promises = std::move(pending_run_gc_[1]);
236   send_stats(std::move(r_file_gc_result.ok_ref().kept_file_stats_), dialog_limit, std::move(kept_file_promises));
237   send_stats(std::move(r_file_gc_result.ok_ref().removed_file_stats_), dialog_limit, std::move(removed_file_promises));
238 }
239 
save_fast_stat()240 void StorageManager::save_fast_stat() {
241   G()->td_db()->get_binlog_pmc()->set("fast_file_stat", log_event_store(fast_stat_).as_slice().str());
242 }
243 
load_fast_stat()244 void StorageManager::load_fast_stat() {
245   auto status = log_event_parse(fast_stat_, G()->td_db()->get_binlog_pmc()->get("fast_file_stat"));
246   if (status.is_error()) {
247     fast_stat_ = FileTypeStat();
248   }
249   LOG(INFO) << "Loaded fast storage statistics with " << fast_stat_.cnt << " files of total size " << fast_stat_.size;
250 }
251 
update_fast_stats(const FileStats & stats)252 void StorageManager::update_fast_stats(const FileStats &stats) {
253   fast_stat_ = stats.get_total_nontemp_stat();
254   LOG(INFO) << "Recalculate fast storage statistics to " << fast_stat_.cnt << " files of total size "
255             << fast_stat_.size;
256   save_fast_stat();
257 }
258 
send_stats(FileStats && stats,int32 dialog_limit,std::vector<Promise<FileStats>> && promises)259 void StorageManager::send_stats(FileStats &&stats, int32 dialog_limit, std::vector<Promise<FileStats>> &&promises) {
260   if (promises.empty()) {
261     return;
262   }
263 
264   stats.apply_dialog_limit(dialog_limit);
265   auto dialog_ids = stats.get_dialog_ids();
266 
267   auto promise = PromiseCreator::lambda(
268       [promises = std::move(promises), stats = std::move(stats)](vector<DialogId> dialog_ids) mutable {
269         stats.apply_dialog_ids(dialog_ids);
270         for (auto &promise : promises) {
271           promise.set_value(FileStats(stats));
272         }
273       });
274 
275   send_closure(G()->messages_manager(), &MessagesManager::load_dialogs, std::move(dialog_ids), std::move(promise));
276 }
277 
create_reference()278 ActorShared<> StorageManager::create_reference() {
279   ref_cnt_++;
280   return actor_shared(this, 1);
281 }
282 
hangup_shared()283 void StorageManager::hangup_shared() {
284   ref_cnt_--;
285   if (ref_cnt_ == 0) {
286     stop();
287   }
288 }
289 
close_stats_worker()290 void StorageManager::close_stats_worker() {
291   auto promises = std::move(pending_storage_stats_);
292   pending_storage_stats_.clear();
293   for (auto &promise : promises) {
294     promise.set_error(Global::request_aborted_error());
295   }
296   stats_generation_++;
297   stats_worker_.reset();
298   stats_cancellation_token_source_.cancel();
299 }
300 
close_gc_worker()301 void StorageManager::close_gc_worker() {
302   auto promises = std::move(pending_run_gc_[0]);
303   append(promises, std::move(pending_run_gc_[1]));
304   pending_run_gc_[0].clear();
305   pending_run_gc_[1].clear();
306   for (auto &promise : promises) {
307     promise.set_error(Global::request_aborted_error());
308   }
309   gc_worker_.reset();
310   gc_cancellation_token_source_.cancel();
311 }
312 
hangup()313 void StorageManager::hangup() {
314   is_closed_ = true;
315   close_stats_worker();
316   close_gc_worker();
317   hangup_shared();
318 }
319 
load_last_gc_timestamp()320 uint32 StorageManager::load_last_gc_timestamp() {
321   last_gc_timestamp_ = to_integer<uint32>(G()->td_db()->get_binlog_pmc()->get("files_gc_ts"));
322   return last_gc_timestamp_;
323 }
324 
save_last_gc_timestamp()325 void StorageManager::save_last_gc_timestamp() {
326   last_gc_timestamp_ = static_cast<uint32>(Clocks::system());
327   G()->td_db()->get_binlog_pmc()->set("files_gc_ts", to_string(last_gc_timestamp_));
328 }
329 
schedule_next_gc()330 void StorageManager::schedule_next_gc() {
331   if (!G()->shared_config().get_option_boolean("use_storage_optimizer") &&
332       !G()->parameters().enable_storage_optimizer) {
333     next_gc_at_ = 0;
334     cancel_timeout();
335     LOG(INFO) << "No next file clean up is scheduled";
336     return;
337   }
338   auto sys_time = static_cast<uint32>(Clocks::system());
339 
340   auto next_gc_at = last_gc_timestamp_ + GC_EACH;
341   if (next_gc_at < sys_time) {
342     next_gc_at = sys_time;
343   }
344   if (next_gc_at > sys_time + GC_EACH) {
345     next_gc_at = sys_time + GC_EACH;
346   }
347   next_gc_at += Random::fast(GC_DELAY, GC_DELAY + GC_RAND_DELAY);
348   CHECK(next_gc_at >= sys_time);
349   auto next_gc_in = next_gc_at - sys_time;
350 
351   LOG(INFO) << "Schedule next file clean up in " << next_gc_in;
352   next_gc_at_ = Time::now() + next_gc_in;
353   set_timeout_at(next_gc_at_);
354 }
355 
timeout_expired()356 void StorageManager::timeout_expired() {
357   if (next_gc_at_ == 0) {
358     return;
359   }
360   if (!pending_run_gc_[0].empty() || !pending_run_gc_[1].empty() || !pending_storage_stats_.empty()) {
361     set_timeout_in(60);
362     return;
363   }
364   next_gc_at_ = 0;
365   run_gc({}, false, PromiseCreator::lambda([actor_id = actor_id(this)](Result<FileStats> r_stats) {
366            if (!r_stats.is_error() || r_stats.error().code() != 500) {
367              // do not save garbage collection timestamp if request was canceled
368              send_closure(actor_id, &StorageManager::save_last_gc_timestamp);
369            }
370            send_closure(actor_id, &StorageManager::schedule_next_gc);
371          }));
372 }
373 
374 }  // namespace td
375