1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/DialogDb.h"
8 
9 #include "td/telegram/Version.h"
10 
11 #include "td/db/SqliteConnectionSafe.h"
12 #include "td/db/SqliteDb.h"
13 #include "td/db/SqliteKeyValue.h"
14 #include "td/db/SqliteStatement.h"
15 
16 #include "td/actor/actor.h"
17 #include "td/actor/SchedulerLocalStorage.h"
18 
19 #include "td/utils/common.h"
20 #include "td/utils/format.h"
21 #include "td/utils/logging.h"
22 #include "td/utils/misc.h"
23 #include "td/utils/ScopeGuard.h"
24 #include "td/utils/SliceBuilder.h"
25 #include "td/utils/Time.h"
26 
27 namespace td {
28 // NB: must happen inside a transaction
init_dialog_db(SqliteDb & db,int32 version,KeyValueSyncInterface & binlog_pmc,bool & was_created)29 Status init_dialog_db(SqliteDb &db, int32 version, KeyValueSyncInterface &binlog_pmc, bool &was_created) {
30   LOG(INFO) << "Init dialog database " << tag("version", version);
31   was_created = false;
32 
33   // Check if database exists
34   TRY_RESULT(has_table, db.has_table("dialogs"));
35   if (!has_table) {
36     version = 0;
37   }
38 
39   if (version < static_cast<int32>(DbVersion::DialogDbCreated) || version > current_db_version()) {
40     TRY_STATUS(drop_dialog_db(db, version));
41     version = 0;
42   }
43 
44   auto create_notification_group_table = [&db] {
45     return db.exec(
46         "CREATE TABLE IF NOT EXISTS notification_groups (notification_group_id INT4 PRIMARY KEY, dialog_id "
47         "INT8, last_notification_date INT4)");
48   };
49 
50   auto create_last_notification_date_index = [&db] {
51     return db.exec(
52         "CREATE INDEX IF NOT EXISTS notification_group_by_last_notification_date ON notification_groups "
53         "(last_notification_date, dialog_id, notification_group_id) WHERE last_notification_date IS NOT NULL");
54   };
55 
56   auto add_dialogs_in_folder_index = [&db] {
57     return db.exec(
58         "CREATE INDEX IF NOT EXISTS dialog_in_folder_by_dialog_order ON dialogs (folder_id, dialog_order, dialog_id) "
59         "WHERE folder_id IS NOT NULL");
60   };
61 
62   if (version == 0) {
63     LOG(INFO) << "Create new dialog database";
64     was_created = true;
65     TRY_STATUS(
66         db.exec("CREATE TABLE IF NOT EXISTS dialogs (dialog_id INT8 PRIMARY KEY, dialog_order INT8, data BLOB, "
67                 "folder_id INT4)"));
68     TRY_STATUS(create_notification_group_table());
69     TRY_STATUS(create_last_notification_date_index());
70     TRY_STATUS(add_dialogs_in_folder_index());
71     version = current_db_version();
72   }
73   if (version < static_cast<int32>(DbVersion::AddNotificationsSupport)) {
74     TRY_STATUS(create_notification_group_table());
75     TRY_STATUS(create_last_notification_date_index());
76   }
77   if (version < static_cast<int32>(DbVersion::AddFolders)) {
78     TRY_STATUS(db.exec("DROP INDEX IF EXISTS dialog_by_dialog_order"));
79     TRY_STATUS(db.exec("ALTER TABLE dialogs ADD COLUMN folder_id INT4"));
80     TRY_STATUS(add_dialogs_in_folder_index());
81     TRY_STATUS(db.exec("UPDATE dialogs SET folder_id = 0 WHERE dialog_id < -1500000000000 AND dialog_order > 0"));
82   }
83   if (version < static_cast<int32>(DbVersion::StorePinnedDialogsInBinlog)) {
84     // 9221294780217032704 == get_dialog_order(Auto(), MIN_PINNED_DIALOG_DATE - 1)
85     TRY_RESULT(get_pinned_dialogs_stmt,
86                db.get_statement("SELECT dialog_id FROM dialogs WHERE folder_id == ?1 AND dialog_order > "
87                                 "9221294780217032704 ORDER BY dialog_order DESC, dialog_id DESC"));
88     for (auto folder_id = 0; folder_id < 2; folder_id++) {
89       vector<string> pinned_dialog_ids;
90       TRY_STATUS(get_pinned_dialogs_stmt.bind_int32(1, folder_id));
91       TRY_STATUS(get_pinned_dialogs_stmt.step());
92       while (get_pinned_dialogs_stmt.has_row()) {
93         pinned_dialog_ids.push_back(PSTRING() << get_pinned_dialogs_stmt.view_int64(0));
94         TRY_STATUS(get_pinned_dialogs_stmt.step());
95       }
96       get_pinned_dialogs_stmt.reset();
97 
98       binlog_pmc.set(PSTRING() << "pinned_dialog_ids" << folder_id, implode(pinned_dialog_ids, ','));
99     }
100   }
101 
102   return Status::OK();
103 }
104 
105 // NB: must happen inside a transaction
drop_dialog_db(SqliteDb & db,int version)106 Status drop_dialog_db(SqliteDb &db, int version) {
107   if (version < static_cast<int32>(DbVersion::DialogDbCreated)) {
108     if (version != 0) {
109       LOG(WARNING) << "Drop old pmc dialog_db";
110     }
111     SqliteKeyValue kv;
112     kv.init_with_connection(db.clone(), "common").ensure();
113     kv.erase_by_prefix("di");
114   }
115 
116   if (version != 0) {
117     LOG(WARNING) << "Drop dialog_db " << tag("version", version) << tag("current_db_version", current_db_version());
118   }
119   auto status = db.exec("DROP TABLE IF EXISTS dialogs");
120   TRY_STATUS(db.exec("DROP TABLE IF EXISTS notification_groups"));
121   return status;
122 }
123 
124 class DialogDbImpl final : public DialogDbSyncInterface {
125  public:
DialogDbImpl(SqliteDb db)126   explicit DialogDbImpl(SqliteDb db) : db_(std::move(db)) {
127     init().ensure();
128   }
129 
init()130   Status init() {
131     TRY_RESULT_ASSIGN(add_dialog_stmt_, db_.get_statement("INSERT OR REPLACE INTO dialogs VALUES(?1, ?2, ?3, ?4)"));
132     TRY_RESULT_ASSIGN(add_notification_group_stmt_,
133                       db_.get_statement("INSERT OR REPLACE INTO notification_groups VALUES(?1, ?2, ?3)"));
134     TRY_RESULT_ASSIGN(delete_notification_group_stmt_,
135                       db_.get_statement("DELETE FROM notification_groups WHERE notification_group_id = ?1"));
136     TRY_RESULT_ASSIGN(get_dialog_stmt_, db_.get_statement("SELECT data FROM dialogs WHERE dialog_id = ?1"));
137     TRY_RESULT_ASSIGN(
138         get_dialogs_stmt_,
139         db_.get_statement("SELECT data, dialog_id, dialog_order FROM dialogs WHERE "
140                           "folder_id == ?1 AND (dialog_order < ?2 OR (dialog_order = ?2 AND dialog_id < ?3)) ORDER "
141                           "BY dialog_order DESC, dialog_id DESC LIMIT ?4"));
142     TRY_RESULT_ASSIGN(
143         get_notification_groups_by_last_notification_date_stmt_,
144         db_.get_statement("SELECT notification_group_id, dialog_id, last_notification_date FROM notification_groups "
145                           "WHERE last_notification_date < ?1 OR (last_notification_date = ?1 "
146                           "AND (dialog_id < ?2 OR (dialog_id = ?2 AND notification_group_id < ?3))) ORDER BY "
147                           "last_notification_date DESC, dialog_id DESC LIMIT ?4"));
148     //                          "WHERE (last_notification_date, dialog_id, notification_group_id) < (?1, ?2, ?3) ORDER BY "
149     //                          "last_notification_date DESC, dialog_id DESC, notification_group_id DESC LIMIT ?4"));
150     TRY_RESULT_ASSIGN(
151         get_notification_group_stmt_,
152         db_.get_statement(
153             "SELECT dialog_id, last_notification_date FROM notification_groups WHERE notification_group_id = ?1"));
154     TRY_RESULT_ASSIGN(
155         get_secret_chat_count_stmt_,
156         db_.get_statement(
157             "SELECT COUNT(*) FROM dialogs WHERE folder_id = ?1 AND dialog_order > 0 AND dialog_id < -1500000000000"));
158 
159     // LOG(ERROR) << get_dialog_stmt_.explain().ok();
160     // LOG(ERROR) << get_dialogs_stmt_.explain().ok();
161     // LOG(ERROR) << get_notification_groups_by_last_notification_date_stmt_.explain().ok();
162     // LOG(ERROR) << get_notification_group_stmt_.explain().ok();
163     // LOG(FATAL) << "EXPLAINED";
164 
165     return Status::OK();
166   }
167 
add_dialog(DialogId dialog_id,FolderId folder_id,int64 order,BufferSlice data,vector<NotificationGroupKey> notification_groups)168   Status add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
169                     vector<NotificationGroupKey> notification_groups) final {
170     SCOPE_EXIT {
171       add_dialog_stmt_.reset();
172     };
173     add_dialog_stmt_.bind_int64(1, dialog_id.get()).ensure();
174     add_dialog_stmt_.bind_int64(2, order).ensure();
175     add_dialog_stmt_.bind_blob(3, data.as_slice()).ensure();
176     if (order > 0) {
177       add_dialog_stmt_.bind_int32(4, folder_id.get()).ensure();
178     } else {
179       add_dialog_stmt_.bind_null(4).ensure();
180     }
181 
182     TRY_STATUS(add_dialog_stmt_.step());
183 
184     for (auto &to_add : notification_groups) {
185       if (to_add.dialog_id.is_valid()) {
186         SCOPE_EXIT {
187           add_notification_group_stmt_.reset();
188         };
189         add_notification_group_stmt_.bind_int32(1, to_add.group_id.get()).ensure();
190         add_notification_group_stmt_.bind_int64(2, to_add.dialog_id.get()).ensure();
191         if (to_add.last_notification_date != 0) {
192           add_notification_group_stmt_.bind_int32(3, to_add.last_notification_date).ensure();
193         } else {
194           add_notification_group_stmt_.bind_null(3).ensure();
195         }
196         TRY_STATUS(add_notification_group_stmt_.step());
197       } else {
198         SCOPE_EXIT {
199           delete_notification_group_stmt_.reset();
200         };
201         delete_notification_group_stmt_.bind_int32(1, to_add.group_id.get()).ensure();
202         TRY_STATUS(delete_notification_group_stmt_.step());
203       }
204     }
205     return Status::OK();
206   }
207 
get_dialog(DialogId dialog_id)208   Result<BufferSlice> get_dialog(DialogId dialog_id) final {
209     SCOPE_EXIT {
210       get_dialog_stmt_.reset();
211     };
212 
213     get_dialog_stmt_.bind_int64(1, dialog_id.get()).ensure();
214     TRY_STATUS(get_dialog_stmt_.step());
215     if (!get_dialog_stmt_.has_row()) {
216       return Status::Error("Not found");
217     }
218     return BufferSlice(get_dialog_stmt_.view_blob(0));
219   }
220 
get_notification_group(NotificationGroupId notification_group_id)221   Result<NotificationGroupKey> get_notification_group(NotificationGroupId notification_group_id) final {
222     SCOPE_EXIT {
223       get_notification_group_stmt_.reset();
224     };
225     get_notification_group_stmt_.bind_int32(1, notification_group_id.get()).ensure();
226     TRY_STATUS(get_notification_group_stmt_.step());
227     if (!get_notification_group_stmt_.has_row()) {
228       return Status::Error("Not found");
229     }
230     return NotificationGroupKey(notification_group_id, DialogId(get_notification_group_stmt_.view_int64(0)),
231                                 get_last_notification_date(get_notification_group_stmt_, 1));
232   }
233 
get_secret_chat_count(FolderId folder_id)234   Result<int32> get_secret_chat_count(FolderId folder_id) final {
235     SCOPE_EXIT {
236       get_secret_chat_count_stmt_.reset();
237     };
238     get_secret_chat_count_stmt_.bind_int32(1, folder_id.get()).ensure();
239     TRY_STATUS(get_secret_chat_count_stmt_.step());
240     CHECK(get_secret_chat_count_stmt_.has_row());
241     return get_secret_chat_count_stmt_.view_int32(0);
242   }
243 
get_dialogs(FolderId folder_id,int64 order,DialogId dialog_id,int32 limit)244   Result<DialogDbGetDialogsResult> get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit) final {
245     SCOPE_EXIT {
246       get_dialogs_stmt_.reset();
247     };
248 
249     get_dialogs_stmt_.bind_int32(1, folder_id.get()).ensure();
250     get_dialogs_stmt_.bind_int64(2, order).ensure();
251     get_dialogs_stmt_.bind_int64(3, dialog_id.get()).ensure();
252     get_dialogs_stmt_.bind_int32(4, limit).ensure();
253 
254     DialogDbGetDialogsResult result;
255     TRY_STATUS(get_dialogs_stmt_.step());
256     while (get_dialogs_stmt_.has_row()) {
257       BufferSlice data(get_dialogs_stmt_.view_blob(0));
258       result.next_dialog_id = DialogId(get_dialogs_stmt_.view_int64(1));
259       result.next_order = get_dialogs_stmt_.view_int64(2);
260       LOG(INFO) << "Load " << result.next_dialog_id << " with order " << result.next_order;
261       result.dialogs.emplace_back(std::move(data));
262       TRY_STATUS(get_dialogs_stmt_.step());
263     }
264 
265     return std::move(result);
266   }
267 
get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key,int32 limit)268   Result<vector<NotificationGroupKey>> get_notification_groups_by_last_notification_date(
269       NotificationGroupKey notification_group_key, int32 limit) final {
270     auto &stmt = get_notification_groups_by_last_notification_date_stmt_;
271     SCOPE_EXIT {
272       stmt.reset();
273     };
274 
275     stmt.bind_int32(1, notification_group_key.last_notification_date).ensure();
276     stmt.bind_int64(2, notification_group_key.dialog_id.get()).ensure();
277     stmt.bind_int32(3, notification_group_key.group_id.get()).ensure();
278     stmt.bind_int32(4, limit).ensure();
279 
280     vector<NotificationGroupKey> notification_groups;
281     TRY_STATUS(stmt.step());
282     while (stmt.has_row()) {
283       notification_groups.emplace_back(NotificationGroupId(stmt.view_int32(0)), DialogId(stmt.view_int64(1)),
284                                        get_last_notification_date(stmt, 2));
285       TRY_STATUS(stmt.step());
286     }
287 
288     return std::move(notification_groups);
289   }
290 
begin_read_transaction()291   Status begin_read_transaction() final {
292     return db_.begin_read_transaction();
293   }
begin_write_transaction()294   Status begin_write_transaction() final {
295     return db_.begin_write_transaction();
296   }
commit_transaction()297   Status commit_transaction() final {
298     return db_.commit_transaction();
299   }
300 
301  private:
302   SqliteDb db_;
303 
304   SqliteStatement add_dialog_stmt_;
305   SqliteStatement add_notification_group_stmt_;
306   SqliteStatement delete_notification_group_stmt_;
307   SqliteStatement get_dialog_stmt_;
308   SqliteStatement get_dialogs_stmt_;
309   SqliteStatement get_notification_groups_by_last_notification_date_stmt_;
310   SqliteStatement get_notification_group_stmt_;
311   SqliteStatement get_secret_chat_count_stmt_;
312 
get_last_notification_date(SqliteStatement & stmt,int id)313   static int32 get_last_notification_date(SqliteStatement &stmt, int id) {
314     if (stmt.view_datatype(id) == SqliteStatement::Datatype::Null) {
315       return 0;
316     }
317     return stmt.view_int32(id);
318   }
319 };
320 
create_dialog_db_sync(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)321 std::shared_ptr<DialogDbSyncSafeInterface> create_dialog_db_sync(
322     std::shared_ptr<SqliteConnectionSafe> sqlite_connection) {
323   class DialogDbSyncSafe final : public DialogDbSyncSafeInterface {
324    public:
325     explicit DialogDbSyncSafe(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)
326         : lsls_db_([safe_connection = std::move(sqlite_connection)] {
327           return make_unique<DialogDbImpl>(safe_connection->get().clone());
328         }) {
329     }
330     DialogDbSyncInterface &get() final {
331       return *lsls_db_.get();
332     }
333 
334    private:
335     LazySchedulerLocalStorage<unique_ptr<DialogDbSyncInterface>> lsls_db_;
336   };
337   return std::make_shared<DialogDbSyncSafe>(std::move(sqlite_connection));
338 }
339 
340 class DialogDbAsync final : public DialogDbAsyncInterface {
341  public:
DialogDbAsync(std::shared_ptr<DialogDbSyncSafeInterface> sync_db,int32 scheduler_id)342   DialogDbAsync(std::shared_ptr<DialogDbSyncSafeInterface> sync_db, int32 scheduler_id) {
343     impl_ = create_actor_on_scheduler<Impl>("DialogDbActor", scheduler_id, std::move(sync_db));
344   }
345 
add_dialog(DialogId dialog_id,FolderId folder_id,int64 order,BufferSlice data,vector<NotificationGroupKey> notification_groups,Promise<> promise)346   void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
347                   vector<NotificationGroupKey> notification_groups, Promise<> promise) final {
348     send_closure(impl_, &Impl::add_dialog, dialog_id, folder_id, order, std::move(data), std::move(notification_groups),
349                  std::move(promise));
350   }
351 
get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key,int32 limit,Promise<vector<NotificationGroupKey>> promise)352   void get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key, int32 limit,
353                                                          Promise<vector<NotificationGroupKey>> promise) final {
354     send_closure(impl_, &Impl::get_notification_groups_by_last_notification_date, notification_group_key, limit,
355                  std::move(promise));
356   }
357 
get_notification_group(NotificationGroupId notification_group_id,Promise<NotificationGroupKey> promise)358   void get_notification_group(NotificationGroupId notification_group_id, Promise<NotificationGroupKey> promise) final {
359     send_closure(impl_, &Impl::get_notification_group, notification_group_id, std::move(promise));
360   }
361 
get_secret_chat_count(FolderId folder_id,Promise<int32> promise)362   void get_secret_chat_count(FolderId folder_id, Promise<int32> promise) final {
363     send_closure(impl_, &Impl::get_secret_chat_count, folder_id, std::move(promise));
364   }
365 
get_dialog(DialogId dialog_id,Promise<BufferSlice> promise)366   void get_dialog(DialogId dialog_id, Promise<BufferSlice> promise) final {
367     send_closure_later(impl_, &Impl::get_dialog, dialog_id, std::move(promise));
368   }
369 
get_dialogs(FolderId folder_id,int64 order,DialogId dialog_id,int32 limit,Promise<DialogDbGetDialogsResult> promise)370   void get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit,
371                    Promise<DialogDbGetDialogsResult> promise) final {
372     send_closure_later(impl_, &Impl::get_dialogs, folder_id, order, dialog_id, limit, std::move(promise));
373   }
374 
close(Promise<> promise)375   void close(Promise<> promise) final {
376     send_closure_later(impl_, &Impl::close, std::move(promise));
377   }
378 
379  private:
380   class Impl final : public Actor {
381    public:
Impl(std::shared_ptr<DialogDbSyncSafeInterface> sync_db_safe)382     explicit Impl(std::shared_ptr<DialogDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
383     }
384 
add_dialog(DialogId dialog_id,FolderId folder_id,int64 order,BufferSlice data,vector<NotificationGroupKey> notification_groups,Promise<> promise)385     void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
386                     vector<NotificationGroupKey> notification_groups, Promise<> promise) {
387       add_write_query([this, dialog_id, folder_id, order, promise = std::move(promise), data = std::move(data),
388                        notification_groups = std::move(notification_groups)](Unit) mutable {
389         on_write_result(std::move(promise), sync_db_->add_dialog(dialog_id, folder_id, order, std::move(data),
390                                                                  std::move(notification_groups)));
391       });
392     }
393 
on_write_result(Promise<> promise,Status status)394     void on_write_result(Promise<> promise, Status status) {
395       // We are inside a transaction and don't know how to handle the error
396       status.ensure();
397       pending_write_results_.emplace_back(std::move(promise), std::move(status));
398     }
399 
get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key,int32 limit,Promise<vector<NotificationGroupKey>> promise)400     void get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key, int32 limit,
401                                                            Promise<vector<NotificationGroupKey>> promise) {
402       add_read_query();
403       promise.set_result(sync_db_->get_notification_groups_by_last_notification_date(notification_group_key, limit));
404     }
405 
get_notification_group(NotificationGroupId notification_group_id,Promise<NotificationGroupKey> promise)406     void get_notification_group(NotificationGroupId notification_group_id, Promise<NotificationGroupKey> promise) {
407       add_read_query();
408       promise.set_result(sync_db_->get_notification_group(notification_group_id));
409     }
410 
get_secret_chat_count(FolderId folder_id,Promise<int32> promise)411     void get_secret_chat_count(FolderId folder_id, Promise<int32> promise) {
412       add_read_query();
413       promise.set_result(sync_db_->get_secret_chat_count(folder_id));
414     }
415 
get_dialog(DialogId dialog_id,Promise<BufferSlice> promise)416     void get_dialog(DialogId dialog_id, Promise<BufferSlice> promise) {
417       add_read_query();
418       promise.set_result(sync_db_->get_dialog(dialog_id));
419     }
420 
get_dialogs(FolderId folder_id,int64 order,DialogId dialog_id,int32 limit,Promise<DialogDbGetDialogsResult> promise)421     void get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit,
422                      Promise<DialogDbGetDialogsResult> promise) {
423       add_read_query();
424       promise.set_result(sync_db_->get_dialogs(folder_id, order, dialog_id, limit));
425     }
426 
close(Promise<> promise)427     void close(Promise<> promise) {
428       do_flush();
429       sync_db_safe_.reset();
430       sync_db_ = nullptr;
431       promise.set_value(Unit());
432       stop();
433     }
434 
435    private:
436     std::shared_ptr<DialogDbSyncSafeInterface> sync_db_safe_;
437     DialogDbSyncInterface *sync_db_ = nullptr;
438 
439     static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
440     static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
441 
442     //NB: order is important, destructor of pending_writes_ will change pending_write_results_
443     std::vector<std::pair<Promise<>, Status>> pending_write_results_;
444     vector<Promise<>> pending_writes_;
445     double wakeup_at_ = 0;
446 
447     template <class F>
add_write_query(F && f)448     void add_write_query(F &&f) {
449       pending_writes_.push_back(PromiseCreator::lambda(std::forward<F>(f), PromiseCreator::Ignore()));
450       if (pending_writes_.size() > MAX_PENDING_QUERIES_COUNT) {
451         do_flush();
452         wakeup_at_ = 0;
453       } else if (wakeup_at_ == 0) {
454         wakeup_at_ = Time::now_cached() + MAX_PENDING_QUERIES_DELAY;
455       }
456       if (wakeup_at_ != 0) {
457         set_timeout_at(wakeup_at_);
458       }
459     }
460 
add_read_query()461     void add_read_query() {
462       do_flush();
463     }
464 
do_flush()465     void do_flush() {
466       if (pending_writes_.empty()) {
467         return;
468       }
469       sync_db_->begin_write_transaction().ensure();
470       for (auto &query : pending_writes_) {
471         query.set_value(Unit());
472       }
473       sync_db_->commit_transaction().ensure();
474       pending_writes_.clear();
475       for (auto &p : pending_write_results_) {
476         p.first.set_result(std::move(p.second));
477       }
478       pending_write_results_.clear();
479       cancel_timeout();
480     }
481 
timeout_expired()482     void timeout_expired() final {
483       do_flush();
484     }
485 
start_up()486     void start_up() final {
487       sync_db_ = &sync_db_safe_->get();
488     }
489   };
490   ActorOwn<Impl> impl_;
491 };
492 
create_dialog_db_async(std::shared_ptr<DialogDbSyncSafeInterface> sync_db,int32 scheduler_id)493 std::shared_ptr<DialogDbAsyncInterface> create_dialog_db_async(std::shared_ptr<DialogDbSyncSafeInterface> sync_db,
494                                                                int32 scheduler_id) {
495   return std::make_shared<DialogDbAsync>(std::move(sync_db), scheduler_id);
496 }
497 
498 }  // namespace td
499