1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/DialogDb.h"
8
9 #include "td/telegram/Version.h"
10
11 #include "td/db/SqliteConnectionSafe.h"
12 #include "td/db/SqliteDb.h"
13 #include "td/db/SqliteKeyValue.h"
14 #include "td/db/SqliteStatement.h"
15
16 #include "td/actor/actor.h"
17 #include "td/actor/SchedulerLocalStorage.h"
18
19 #include "td/utils/common.h"
20 #include "td/utils/format.h"
21 #include "td/utils/logging.h"
22 #include "td/utils/misc.h"
23 #include "td/utils/ScopeGuard.h"
24 #include "td/utils/SliceBuilder.h"
25 #include "td/utils/Time.h"
26
27 namespace td {
28 // NB: must happen inside a transaction
init_dialog_db(SqliteDb & db,int32 version,KeyValueSyncInterface & binlog_pmc,bool & was_created)29 Status init_dialog_db(SqliteDb &db, int32 version, KeyValueSyncInterface &binlog_pmc, bool &was_created) {
30 LOG(INFO) << "Init dialog database " << tag("version", version);
31 was_created = false;
32
33 // Check if database exists
34 TRY_RESULT(has_table, db.has_table("dialogs"));
35 if (!has_table) {
36 version = 0;
37 }
38
39 if (version < static_cast<int32>(DbVersion::DialogDbCreated) || version > current_db_version()) {
40 TRY_STATUS(drop_dialog_db(db, version));
41 version = 0;
42 }
43
44 auto create_notification_group_table = [&db] {
45 return db.exec(
46 "CREATE TABLE IF NOT EXISTS notification_groups (notification_group_id INT4 PRIMARY KEY, dialog_id "
47 "INT8, last_notification_date INT4)");
48 };
49
50 auto create_last_notification_date_index = [&db] {
51 return db.exec(
52 "CREATE INDEX IF NOT EXISTS notification_group_by_last_notification_date ON notification_groups "
53 "(last_notification_date, dialog_id, notification_group_id) WHERE last_notification_date IS NOT NULL");
54 };
55
56 auto add_dialogs_in_folder_index = [&db] {
57 return db.exec(
58 "CREATE INDEX IF NOT EXISTS dialog_in_folder_by_dialog_order ON dialogs (folder_id, dialog_order, dialog_id) "
59 "WHERE folder_id IS NOT NULL");
60 };
61
62 if (version == 0) {
63 LOG(INFO) << "Create new dialog database";
64 was_created = true;
65 TRY_STATUS(
66 db.exec("CREATE TABLE IF NOT EXISTS dialogs (dialog_id INT8 PRIMARY KEY, dialog_order INT8, data BLOB, "
67 "folder_id INT4)"));
68 TRY_STATUS(create_notification_group_table());
69 TRY_STATUS(create_last_notification_date_index());
70 TRY_STATUS(add_dialogs_in_folder_index());
71 version = current_db_version();
72 }
73 if (version < static_cast<int32>(DbVersion::AddNotificationsSupport)) {
74 TRY_STATUS(create_notification_group_table());
75 TRY_STATUS(create_last_notification_date_index());
76 }
77 if (version < static_cast<int32>(DbVersion::AddFolders)) {
78 TRY_STATUS(db.exec("DROP INDEX IF EXISTS dialog_by_dialog_order"));
79 TRY_STATUS(db.exec("ALTER TABLE dialogs ADD COLUMN folder_id INT4"));
80 TRY_STATUS(add_dialogs_in_folder_index());
81 TRY_STATUS(db.exec("UPDATE dialogs SET folder_id = 0 WHERE dialog_id < -1500000000000 AND dialog_order > 0"));
82 }
83 if (version < static_cast<int32>(DbVersion::StorePinnedDialogsInBinlog)) {
84 // 9221294780217032704 == get_dialog_order(Auto(), MIN_PINNED_DIALOG_DATE - 1)
85 TRY_RESULT(get_pinned_dialogs_stmt,
86 db.get_statement("SELECT dialog_id FROM dialogs WHERE folder_id == ?1 AND dialog_order > "
87 "9221294780217032704 ORDER BY dialog_order DESC, dialog_id DESC"));
88 for (auto folder_id = 0; folder_id < 2; folder_id++) {
89 vector<string> pinned_dialog_ids;
90 TRY_STATUS(get_pinned_dialogs_stmt.bind_int32(1, folder_id));
91 TRY_STATUS(get_pinned_dialogs_stmt.step());
92 while (get_pinned_dialogs_stmt.has_row()) {
93 pinned_dialog_ids.push_back(PSTRING() << get_pinned_dialogs_stmt.view_int64(0));
94 TRY_STATUS(get_pinned_dialogs_stmt.step());
95 }
96 get_pinned_dialogs_stmt.reset();
97
98 binlog_pmc.set(PSTRING() << "pinned_dialog_ids" << folder_id, implode(pinned_dialog_ids, ','));
99 }
100 }
101
102 return Status::OK();
103 }
104
105 // NB: must happen inside a transaction
drop_dialog_db(SqliteDb & db,int version)106 Status drop_dialog_db(SqliteDb &db, int version) {
107 if (version < static_cast<int32>(DbVersion::DialogDbCreated)) {
108 if (version != 0) {
109 LOG(WARNING) << "Drop old pmc dialog_db";
110 }
111 SqliteKeyValue kv;
112 kv.init_with_connection(db.clone(), "common").ensure();
113 kv.erase_by_prefix("di");
114 }
115
116 if (version != 0) {
117 LOG(WARNING) << "Drop dialog_db " << tag("version", version) << tag("current_db_version", current_db_version());
118 }
119 auto status = db.exec("DROP TABLE IF EXISTS dialogs");
120 TRY_STATUS(db.exec("DROP TABLE IF EXISTS notification_groups"));
121 return status;
122 }
123
124 class DialogDbImpl final : public DialogDbSyncInterface {
125 public:
DialogDbImpl(SqliteDb db)126 explicit DialogDbImpl(SqliteDb db) : db_(std::move(db)) {
127 init().ensure();
128 }
129
init()130 Status init() {
131 TRY_RESULT_ASSIGN(add_dialog_stmt_, db_.get_statement("INSERT OR REPLACE INTO dialogs VALUES(?1, ?2, ?3, ?4)"));
132 TRY_RESULT_ASSIGN(add_notification_group_stmt_,
133 db_.get_statement("INSERT OR REPLACE INTO notification_groups VALUES(?1, ?2, ?3)"));
134 TRY_RESULT_ASSIGN(delete_notification_group_stmt_,
135 db_.get_statement("DELETE FROM notification_groups WHERE notification_group_id = ?1"));
136 TRY_RESULT_ASSIGN(get_dialog_stmt_, db_.get_statement("SELECT data FROM dialogs WHERE dialog_id = ?1"));
137 TRY_RESULT_ASSIGN(
138 get_dialogs_stmt_,
139 db_.get_statement("SELECT data, dialog_id, dialog_order FROM dialogs WHERE "
140 "folder_id == ?1 AND (dialog_order < ?2 OR (dialog_order = ?2 AND dialog_id < ?3)) ORDER "
141 "BY dialog_order DESC, dialog_id DESC LIMIT ?4"));
142 TRY_RESULT_ASSIGN(
143 get_notification_groups_by_last_notification_date_stmt_,
144 db_.get_statement("SELECT notification_group_id, dialog_id, last_notification_date FROM notification_groups "
145 "WHERE last_notification_date < ?1 OR (last_notification_date = ?1 "
146 "AND (dialog_id < ?2 OR (dialog_id = ?2 AND notification_group_id < ?3))) ORDER BY "
147 "last_notification_date DESC, dialog_id DESC LIMIT ?4"));
148 // "WHERE (last_notification_date, dialog_id, notification_group_id) < (?1, ?2, ?3) ORDER BY "
149 // "last_notification_date DESC, dialog_id DESC, notification_group_id DESC LIMIT ?4"));
150 TRY_RESULT_ASSIGN(
151 get_notification_group_stmt_,
152 db_.get_statement(
153 "SELECT dialog_id, last_notification_date FROM notification_groups WHERE notification_group_id = ?1"));
154 TRY_RESULT_ASSIGN(
155 get_secret_chat_count_stmt_,
156 db_.get_statement(
157 "SELECT COUNT(*) FROM dialogs WHERE folder_id = ?1 AND dialog_order > 0 AND dialog_id < -1500000000000"));
158
159 // LOG(ERROR) << get_dialog_stmt_.explain().ok();
160 // LOG(ERROR) << get_dialogs_stmt_.explain().ok();
161 // LOG(ERROR) << get_notification_groups_by_last_notification_date_stmt_.explain().ok();
162 // LOG(ERROR) << get_notification_group_stmt_.explain().ok();
163 // LOG(FATAL) << "EXPLAINED";
164
165 return Status::OK();
166 }
167
add_dialog(DialogId dialog_id,FolderId folder_id,int64 order,BufferSlice data,vector<NotificationGroupKey> notification_groups)168 Status add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
169 vector<NotificationGroupKey> notification_groups) final {
170 SCOPE_EXIT {
171 add_dialog_stmt_.reset();
172 };
173 add_dialog_stmt_.bind_int64(1, dialog_id.get()).ensure();
174 add_dialog_stmt_.bind_int64(2, order).ensure();
175 add_dialog_stmt_.bind_blob(3, data.as_slice()).ensure();
176 if (order > 0) {
177 add_dialog_stmt_.bind_int32(4, folder_id.get()).ensure();
178 } else {
179 add_dialog_stmt_.bind_null(4).ensure();
180 }
181
182 TRY_STATUS(add_dialog_stmt_.step());
183
184 for (auto &to_add : notification_groups) {
185 if (to_add.dialog_id.is_valid()) {
186 SCOPE_EXIT {
187 add_notification_group_stmt_.reset();
188 };
189 add_notification_group_stmt_.bind_int32(1, to_add.group_id.get()).ensure();
190 add_notification_group_stmt_.bind_int64(2, to_add.dialog_id.get()).ensure();
191 if (to_add.last_notification_date != 0) {
192 add_notification_group_stmt_.bind_int32(3, to_add.last_notification_date).ensure();
193 } else {
194 add_notification_group_stmt_.bind_null(3).ensure();
195 }
196 TRY_STATUS(add_notification_group_stmt_.step());
197 } else {
198 SCOPE_EXIT {
199 delete_notification_group_stmt_.reset();
200 };
201 delete_notification_group_stmt_.bind_int32(1, to_add.group_id.get()).ensure();
202 TRY_STATUS(delete_notification_group_stmt_.step());
203 }
204 }
205 return Status::OK();
206 }
207
get_dialog(DialogId dialog_id)208 Result<BufferSlice> get_dialog(DialogId dialog_id) final {
209 SCOPE_EXIT {
210 get_dialog_stmt_.reset();
211 };
212
213 get_dialog_stmt_.bind_int64(1, dialog_id.get()).ensure();
214 TRY_STATUS(get_dialog_stmt_.step());
215 if (!get_dialog_stmt_.has_row()) {
216 return Status::Error("Not found");
217 }
218 return BufferSlice(get_dialog_stmt_.view_blob(0));
219 }
220
get_notification_group(NotificationGroupId notification_group_id)221 Result<NotificationGroupKey> get_notification_group(NotificationGroupId notification_group_id) final {
222 SCOPE_EXIT {
223 get_notification_group_stmt_.reset();
224 };
225 get_notification_group_stmt_.bind_int32(1, notification_group_id.get()).ensure();
226 TRY_STATUS(get_notification_group_stmt_.step());
227 if (!get_notification_group_stmt_.has_row()) {
228 return Status::Error("Not found");
229 }
230 return NotificationGroupKey(notification_group_id, DialogId(get_notification_group_stmt_.view_int64(0)),
231 get_last_notification_date(get_notification_group_stmt_, 1));
232 }
233
get_secret_chat_count(FolderId folder_id)234 Result<int32> get_secret_chat_count(FolderId folder_id) final {
235 SCOPE_EXIT {
236 get_secret_chat_count_stmt_.reset();
237 };
238 get_secret_chat_count_stmt_.bind_int32(1, folder_id.get()).ensure();
239 TRY_STATUS(get_secret_chat_count_stmt_.step());
240 CHECK(get_secret_chat_count_stmt_.has_row());
241 return get_secret_chat_count_stmt_.view_int32(0);
242 }
243
get_dialogs(FolderId folder_id,int64 order,DialogId dialog_id,int32 limit)244 Result<DialogDbGetDialogsResult> get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit) final {
245 SCOPE_EXIT {
246 get_dialogs_stmt_.reset();
247 };
248
249 get_dialogs_stmt_.bind_int32(1, folder_id.get()).ensure();
250 get_dialogs_stmt_.bind_int64(2, order).ensure();
251 get_dialogs_stmt_.bind_int64(3, dialog_id.get()).ensure();
252 get_dialogs_stmt_.bind_int32(4, limit).ensure();
253
254 DialogDbGetDialogsResult result;
255 TRY_STATUS(get_dialogs_stmt_.step());
256 while (get_dialogs_stmt_.has_row()) {
257 BufferSlice data(get_dialogs_stmt_.view_blob(0));
258 result.next_dialog_id = DialogId(get_dialogs_stmt_.view_int64(1));
259 result.next_order = get_dialogs_stmt_.view_int64(2);
260 LOG(INFO) << "Load " << result.next_dialog_id << " with order " << result.next_order;
261 result.dialogs.emplace_back(std::move(data));
262 TRY_STATUS(get_dialogs_stmt_.step());
263 }
264
265 return std::move(result);
266 }
267
get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key,int32 limit)268 Result<vector<NotificationGroupKey>> get_notification_groups_by_last_notification_date(
269 NotificationGroupKey notification_group_key, int32 limit) final {
270 auto &stmt = get_notification_groups_by_last_notification_date_stmt_;
271 SCOPE_EXIT {
272 stmt.reset();
273 };
274
275 stmt.bind_int32(1, notification_group_key.last_notification_date).ensure();
276 stmt.bind_int64(2, notification_group_key.dialog_id.get()).ensure();
277 stmt.bind_int32(3, notification_group_key.group_id.get()).ensure();
278 stmt.bind_int32(4, limit).ensure();
279
280 vector<NotificationGroupKey> notification_groups;
281 TRY_STATUS(stmt.step());
282 while (stmt.has_row()) {
283 notification_groups.emplace_back(NotificationGroupId(stmt.view_int32(0)), DialogId(stmt.view_int64(1)),
284 get_last_notification_date(stmt, 2));
285 TRY_STATUS(stmt.step());
286 }
287
288 return std::move(notification_groups);
289 }
290
begin_read_transaction()291 Status begin_read_transaction() final {
292 return db_.begin_read_transaction();
293 }
begin_write_transaction()294 Status begin_write_transaction() final {
295 return db_.begin_write_transaction();
296 }
commit_transaction()297 Status commit_transaction() final {
298 return db_.commit_transaction();
299 }
300
301 private:
302 SqliteDb db_;
303
304 SqliteStatement add_dialog_stmt_;
305 SqliteStatement add_notification_group_stmt_;
306 SqliteStatement delete_notification_group_stmt_;
307 SqliteStatement get_dialog_stmt_;
308 SqliteStatement get_dialogs_stmt_;
309 SqliteStatement get_notification_groups_by_last_notification_date_stmt_;
310 SqliteStatement get_notification_group_stmt_;
311 SqliteStatement get_secret_chat_count_stmt_;
312
get_last_notification_date(SqliteStatement & stmt,int id)313 static int32 get_last_notification_date(SqliteStatement &stmt, int id) {
314 if (stmt.view_datatype(id) == SqliteStatement::Datatype::Null) {
315 return 0;
316 }
317 return stmt.view_int32(id);
318 }
319 };
320
create_dialog_db_sync(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)321 std::shared_ptr<DialogDbSyncSafeInterface> create_dialog_db_sync(
322 std::shared_ptr<SqliteConnectionSafe> sqlite_connection) {
323 class DialogDbSyncSafe final : public DialogDbSyncSafeInterface {
324 public:
325 explicit DialogDbSyncSafe(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)
326 : lsls_db_([safe_connection = std::move(sqlite_connection)] {
327 return make_unique<DialogDbImpl>(safe_connection->get().clone());
328 }) {
329 }
330 DialogDbSyncInterface &get() final {
331 return *lsls_db_.get();
332 }
333
334 private:
335 LazySchedulerLocalStorage<unique_ptr<DialogDbSyncInterface>> lsls_db_;
336 };
337 return std::make_shared<DialogDbSyncSafe>(std::move(sqlite_connection));
338 }
339
340 class DialogDbAsync final : public DialogDbAsyncInterface {
341 public:
DialogDbAsync(std::shared_ptr<DialogDbSyncSafeInterface> sync_db,int32 scheduler_id)342 DialogDbAsync(std::shared_ptr<DialogDbSyncSafeInterface> sync_db, int32 scheduler_id) {
343 impl_ = create_actor_on_scheduler<Impl>("DialogDbActor", scheduler_id, std::move(sync_db));
344 }
345
add_dialog(DialogId dialog_id,FolderId folder_id,int64 order,BufferSlice data,vector<NotificationGroupKey> notification_groups,Promise<> promise)346 void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
347 vector<NotificationGroupKey> notification_groups, Promise<> promise) final {
348 send_closure(impl_, &Impl::add_dialog, dialog_id, folder_id, order, std::move(data), std::move(notification_groups),
349 std::move(promise));
350 }
351
get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key,int32 limit,Promise<vector<NotificationGroupKey>> promise)352 void get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key, int32 limit,
353 Promise<vector<NotificationGroupKey>> promise) final {
354 send_closure(impl_, &Impl::get_notification_groups_by_last_notification_date, notification_group_key, limit,
355 std::move(promise));
356 }
357
get_notification_group(NotificationGroupId notification_group_id,Promise<NotificationGroupKey> promise)358 void get_notification_group(NotificationGroupId notification_group_id, Promise<NotificationGroupKey> promise) final {
359 send_closure(impl_, &Impl::get_notification_group, notification_group_id, std::move(promise));
360 }
361
get_secret_chat_count(FolderId folder_id,Promise<int32> promise)362 void get_secret_chat_count(FolderId folder_id, Promise<int32> promise) final {
363 send_closure(impl_, &Impl::get_secret_chat_count, folder_id, std::move(promise));
364 }
365
get_dialog(DialogId dialog_id,Promise<BufferSlice> promise)366 void get_dialog(DialogId dialog_id, Promise<BufferSlice> promise) final {
367 send_closure_later(impl_, &Impl::get_dialog, dialog_id, std::move(promise));
368 }
369
get_dialogs(FolderId folder_id,int64 order,DialogId dialog_id,int32 limit,Promise<DialogDbGetDialogsResult> promise)370 void get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit,
371 Promise<DialogDbGetDialogsResult> promise) final {
372 send_closure_later(impl_, &Impl::get_dialogs, folder_id, order, dialog_id, limit, std::move(promise));
373 }
374
close(Promise<> promise)375 void close(Promise<> promise) final {
376 send_closure_later(impl_, &Impl::close, std::move(promise));
377 }
378
379 private:
380 class Impl final : public Actor {
381 public:
Impl(std::shared_ptr<DialogDbSyncSafeInterface> sync_db_safe)382 explicit Impl(std::shared_ptr<DialogDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
383 }
384
add_dialog(DialogId dialog_id,FolderId folder_id,int64 order,BufferSlice data,vector<NotificationGroupKey> notification_groups,Promise<> promise)385 void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
386 vector<NotificationGroupKey> notification_groups, Promise<> promise) {
387 add_write_query([this, dialog_id, folder_id, order, promise = std::move(promise), data = std::move(data),
388 notification_groups = std::move(notification_groups)](Unit) mutable {
389 on_write_result(std::move(promise), sync_db_->add_dialog(dialog_id, folder_id, order, std::move(data),
390 std::move(notification_groups)));
391 });
392 }
393
on_write_result(Promise<> promise,Status status)394 void on_write_result(Promise<> promise, Status status) {
395 // We are inside a transaction and don't know how to handle the error
396 status.ensure();
397 pending_write_results_.emplace_back(std::move(promise), std::move(status));
398 }
399
get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key,int32 limit,Promise<vector<NotificationGroupKey>> promise)400 void get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key, int32 limit,
401 Promise<vector<NotificationGroupKey>> promise) {
402 add_read_query();
403 promise.set_result(sync_db_->get_notification_groups_by_last_notification_date(notification_group_key, limit));
404 }
405
get_notification_group(NotificationGroupId notification_group_id,Promise<NotificationGroupKey> promise)406 void get_notification_group(NotificationGroupId notification_group_id, Promise<NotificationGroupKey> promise) {
407 add_read_query();
408 promise.set_result(sync_db_->get_notification_group(notification_group_id));
409 }
410
get_secret_chat_count(FolderId folder_id,Promise<int32> promise)411 void get_secret_chat_count(FolderId folder_id, Promise<int32> promise) {
412 add_read_query();
413 promise.set_result(sync_db_->get_secret_chat_count(folder_id));
414 }
415
get_dialog(DialogId dialog_id,Promise<BufferSlice> promise)416 void get_dialog(DialogId dialog_id, Promise<BufferSlice> promise) {
417 add_read_query();
418 promise.set_result(sync_db_->get_dialog(dialog_id));
419 }
420
get_dialogs(FolderId folder_id,int64 order,DialogId dialog_id,int32 limit,Promise<DialogDbGetDialogsResult> promise)421 void get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit,
422 Promise<DialogDbGetDialogsResult> promise) {
423 add_read_query();
424 promise.set_result(sync_db_->get_dialogs(folder_id, order, dialog_id, limit));
425 }
426
close(Promise<> promise)427 void close(Promise<> promise) {
428 do_flush();
429 sync_db_safe_.reset();
430 sync_db_ = nullptr;
431 promise.set_value(Unit());
432 stop();
433 }
434
435 private:
436 std::shared_ptr<DialogDbSyncSafeInterface> sync_db_safe_;
437 DialogDbSyncInterface *sync_db_ = nullptr;
438
439 static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
440 static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
441
442 //NB: order is important, destructor of pending_writes_ will change pending_write_results_
443 std::vector<std::pair<Promise<>, Status>> pending_write_results_;
444 vector<Promise<>> pending_writes_;
445 double wakeup_at_ = 0;
446
447 template <class F>
add_write_query(F && f)448 void add_write_query(F &&f) {
449 pending_writes_.push_back(PromiseCreator::lambda(std::forward<F>(f), PromiseCreator::Ignore()));
450 if (pending_writes_.size() > MAX_PENDING_QUERIES_COUNT) {
451 do_flush();
452 wakeup_at_ = 0;
453 } else if (wakeup_at_ == 0) {
454 wakeup_at_ = Time::now_cached() + MAX_PENDING_QUERIES_DELAY;
455 }
456 if (wakeup_at_ != 0) {
457 set_timeout_at(wakeup_at_);
458 }
459 }
460
add_read_query()461 void add_read_query() {
462 do_flush();
463 }
464
do_flush()465 void do_flush() {
466 if (pending_writes_.empty()) {
467 return;
468 }
469 sync_db_->begin_write_transaction().ensure();
470 for (auto &query : pending_writes_) {
471 query.set_value(Unit());
472 }
473 sync_db_->commit_transaction().ensure();
474 pending_writes_.clear();
475 for (auto &p : pending_write_results_) {
476 p.first.set_result(std::move(p.second));
477 }
478 pending_write_results_.clear();
479 cancel_timeout();
480 }
481
timeout_expired()482 void timeout_expired() final {
483 do_flush();
484 }
485
start_up()486 void start_up() final {
487 sync_db_ = &sync_db_safe_->get();
488 }
489 };
490 ActorOwn<Impl> impl_;
491 };
492
create_dialog_db_async(std::shared_ptr<DialogDbSyncSafeInterface> sync_db,int32 scheduler_id)493 std::shared_ptr<DialogDbAsyncInterface> create_dialog_db_async(std::shared_ptr<DialogDbSyncSafeInterface> sync_db,
494 int32 scheduler_id) {
495 return std::make_shared<DialogDbAsync>(std::move(sync_db), scheduler_id);
496 }
497
498 } // namespace td
499