1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/MessagesDb.h"
8 
9 #include "td/telegram/logevent/LogEvent.h"
10 #include "td/telegram/Version.h"
11 
12 #include "td/db/SqliteConnectionSafe.h"
13 #include "td/db/SqliteDb.h"
14 #include "td/db/SqliteStatement.h"
15 
16 #include "td/actor/actor.h"
17 #include "td/actor/PromiseFuture.h"
18 #include "td/actor/SchedulerLocalStorage.h"
19 
20 #include "td/utils/format.h"
21 #include "td/utils/logging.h"
22 #include "td/utils/ScopeGuard.h"
23 #include "td/utils/Slice.h"
24 #include "td/utils/SliceBuilder.h"
25 #include "td/utils/StackAllocator.h"
26 #include "td/utils/StringBuilder.h"
27 #include "td/utils/Time.h"
28 #include "td/utils/tl_helpers.h"
29 #include "td/utils/unicode.h"
30 #include "td/utils/utf8.h"
31 
32 #include <algorithm>
33 #include <array>
34 #include <iterator>
35 #include <limits>
36 #include <tuple>
37 #include <utility>
38 
39 namespace td {
40 
41 static constexpr int32 MESSAGES_DB_INDEX_COUNT = 30;
42 static constexpr int32 MESSAGES_DB_INDEX_COUNT_OLD = 9;
43 
44 // NB: must happen inside a transaction
init_messages_db(SqliteDb & db,int32 version)45 Status init_messages_db(SqliteDb &db, int32 version) {
46   LOG(INFO) << "Init message database " << tag("version", version);
47 
48   // Check if database exists
49   TRY_RESULT(has_table, db.has_table("messages"));
50   if (!has_table) {
51     version = 0;
52   } else if (version < static_cast<int32>(DbVersion::DialogDbCreated) || version > current_db_version()) {
53     TRY_STATUS(drop_messages_db(db, version));
54     version = 0;
55   }
56 
57   auto add_media_indices = [&db](int begin, int end) {
58     for (int i = begin; i < end; i++) {
59       TRY_STATUS(db.exec(PSLICE() << "CREATE INDEX IF NOT EXISTS message_index_" << i
60                                   << " ON messages (dialog_id, message_id) WHERE (index_mask & " << (1 << i)
61                                   << ") != 0"));
62     }
63     return Status::OK();
64   };
65 
66   auto add_fts = [&db] {
67     TRY_STATUS(
68         db.exec("CREATE INDEX IF NOT EXISTS message_by_search_id ON messages "
69                 "(search_id) WHERE search_id IS NOT NULL"));
70 
71     TRY_STATUS(
72         db.exec("CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(text, content='messages', "
73                 "content_rowid='search_id', tokenize = \"unicode61 remove_diacritics 0 tokenchars '\a'\")"));
74     TRY_STATUS(db.exec(
75         "CREATE TRIGGER IF NOT EXISTS trigger_fts_delete BEFORE DELETE ON messages WHEN OLD.search_id IS NOT NULL"
76         " BEGIN INSERT INTO messages_fts(messages_fts, rowid, text) VALUES(\'delete\', OLD.search_id, OLD.text); END"));
77     TRY_STATUS(db.exec(
78         "CREATE TRIGGER IF NOT EXISTS trigger_fts_insert AFTER INSERT ON messages WHEN NEW.search_id IS NOT NULL"
79         " BEGIN INSERT INTO messages_fts(rowid, text) VALUES(NEW.search_id, NEW.text); END"));
80     //TRY_STATUS(db.exec(
81     //"CREATE TRIGGER IF NOT EXISTS trigger_fts_update AFTER UPDATE ON messages WHEN NEW.search_id IS NOT NULL OR "
82     //"OLD.search_id IS NOT NULL"
83     //" BEGIN "
84     //"INSERT INTO messages_fts(messages_fts, rowid, text) VALUES(\'delete\', OLD.search_id, OLD.text); "
85     //"INSERT INTO messages_fts(rowid, text) VALUES(NEW.search_id, NEW.text); "
86     //" END"));
87 
88     return Status::OK();
89   };
90   auto add_call_index = [&db] {
91     for (int i = static_cast<int>(MessageSearchFilter::Call) - 1; i < static_cast<int>(MessageSearchFilter::MissedCall);
92          i++) {
93       TRY_STATUS(db.exec(PSLICE() << "CREATE INDEX IF NOT EXISTS full_message_index_" << i
94                                   << " ON messages (unique_message_id) WHERE (index_mask & " << (1 << i) << ") != 0"));
95     }
96     return Status::OK();
97   };
98   auto add_notification_id_index = [&db] {
99     return db.exec(
100         "CREATE INDEX IF NOT EXISTS message_by_notification_id ON messages (dialog_id, notification_id) WHERE "
101         "notification_id IS NOT NULL");
102   };
103   auto add_scheduled_messages_table = [&db] {
104     TRY_STATUS(
105         db.exec("CREATE TABLE IF NOT EXISTS scheduled_messages (dialog_id INT8, message_id INT8, "
106                 "server_message_id INT4, data BLOB, PRIMARY KEY (dialog_id, message_id))"));
107 
108     TRY_STATUS(
109         db.exec("CREATE INDEX IF NOT EXISTS message_by_server_message_id ON scheduled_messages "
110                 "(dialog_id, server_message_id) WHERE server_message_id IS NOT NULL"));
111     return Status::OK();
112   };
113 
114   if (version == 0) {
115     LOG(INFO) << "Create new message database";
116     TRY_STATUS(
117         db.exec("CREATE TABLE IF NOT EXISTS messages (dialog_id INT8, message_id INT8, unique_message_id INT4, "
118                 "sender_user_id INT8, random_id INT8, data BLOB, ttl_expires_at INT4, index_mask INT4, search_id INT8, "
119                 "text STRING, notification_id INT4, top_thread_message_id INT8, PRIMARY KEY (dialog_id, message_id))"));
120 
121     TRY_STATUS(
122         db.exec("CREATE INDEX IF NOT EXISTS message_by_random_id ON messages (dialog_id, random_id) "
123                 "WHERE random_id IS NOT NULL"));
124 
125     TRY_STATUS(
126         db.exec("CREATE INDEX IF NOT EXISTS message_by_unique_message_id ON messages "
127                 "(unique_message_id) WHERE unique_message_id IS NOT NULL"));
128 
129     TRY_STATUS(
130         db.exec("CREATE INDEX IF NOT EXISTS message_by_ttl ON messages "
131                 "(ttl_expires_at) WHERE ttl_expires_at IS NOT NULL"));
132 
133     TRY_STATUS(add_media_indices(0, MESSAGES_DB_INDEX_COUNT));
134 
135     TRY_STATUS(add_fts());
136 
137     TRY_STATUS(add_call_index());
138 
139     TRY_STATUS(add_notification_id_index());
140 
141     TRY_STATUS(add_scheduled_messages_table());
142 
143     version = current_db_version();
144   }
145   if (version < static_cast<int32>(DbVersion::MessagesDbMediaIndex)) {
146     TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN index_mask INT4"));
147     TRY_STATUS(add_media_indices(0, MESSAGES_DB_INDEX_COUNT_OLD));
148   }
149   if (version < static_cast<int32>(DbVersion::MessagesDb30MediaIndex)) {
150     TRY_STATUS(add_media_indices(MESSAGES_DB_INDEX_COUNT_OLD, MESSAGES_DB_INDEX_COUNT));
151   }
152   if (version < static_cast<int32>(DbVersion::MessagesDbFts)) {
153     TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN search_id INT8"));
154     TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN text STRING"));
155     TRY_STATUS(add_fts());
156   }
157   if (version < static_cast<int32>(DbVersion::MessagesCallIndex)) {
158     TRY_STATUS(add_call_index());
159   }
160   if (version < static_cast<int32>(DbVersion::AddNotificationsSupport)) {
161     TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN notification_id INT4"));
162     TRY_STATUS(add_notification_id_index());
163   }
164   if (version < static_cast<int32>(DbVersion::AddScheduledMessages)) {
165     TRY_STATUS(add_scheduled_messages_table());
166   }
167   if (version < static_cast<int32>(DbVersion::AddMessageThreadSupport)) {
168     TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN top_thread_message_id INT8"));
169   }
170   return Status::OK();
171 }
172 
173 // NB: must happen inside a transaction
drop_messages_db(SqliteDb & db,int32 version)174 Status drop_messages_db(SqliteDb &db, int32 version) {
175   LOG(WARNING) << "Drop message database " << tag("version", version)
176                << tag("current_db_version", current_db_version());
177   return db.exec("DROP TABLE IF EXISTS messages");
178 }
179 
180 class MessagesDbImpl final : public MessagesDbSyncInterface {
181  public:
MessagesDbImpl(SqliteDb db)182   explicit MessagesDbImpl(SqliteDb db) : db_(std::move(db)) {
183     init().ensure();
184   }
185 
init()186   Status init() {
187     TRY_RESULT_ASSIGN(
188         add_message_stmt_,
189         db_.get_statement("INSERT OR REPLACE INTO messages VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)"));
190     TRY_RESULT_ASSIGN(delete_message_stmt_,
191                       db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
192     TRY_RESULT_ASSIGN(delete_all_dialog_messages_stmt_,
193                       db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id <= ?2"));
194     TRY_RESULT_ASSIGN(delete_dialog_messages_by_sender_stmt_,
195                       db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND sender_user_id == ?2"));
196 
197     TRY_RESULT_ASSIGN(
198         get_message_stmt_,
199         db_.get_statement("SELECT message_id, data FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
200     TRY_RESULT_ASSIGN(
201         get_message_by_random_id_stmt_,
202         db_.get_statement("SELECT message_id, data FROM messages WHERE dialog_id = ?1 AND random_id = ?2"));
203     TRY_RESULT_ASSIGN(
204         get_message_by_unique_message_id_stmt_,
205         db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id = ?1"));
206 
207     TRY_RESULT_ASSIGN(
208         get_expiring_messages_stmt_,
209         db_.get_statement(
210             "SELECT dialog_id, message_id, data FROM messages WHERE ?1 < ttl_expires_at AND ttl_expires_at <= ?2"));
211     TRY_RESULT_ASSIGN(get_expiring_messages_helper_stmt_,
212                       db_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM "
213                                         "messages WHERE ?1 < ttl_expires_at LIMIT ?2) AS T"));
214 
215     TRY_RESULT_ASSIGN(get_messages_stmt_.asc_stmt_,
216                       db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id > "
217                                         "?2 ORDER BY message_id ASC LIMIT ?3"));
218     TRY_RESULT_ASSIGN(get_messages_stmt_.desc_stmt_,
219                       db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id < "
220                                         "?2 ORDER BY message_id DESC LIMIT ?3"));
221     TRY_RESULT_ASSIGN(get_scheduled_messages_stmt_,
222                       db_.get_statement("SELECT data, message_id FROM scheduled_messages WHERE dialog_id = ?1 AND "
223                                         "message_id < ?2 ORDER BY message_id DESC LIMIT ?3"));
224     TRY_RESULT_ASSIGN(get_messages_from_notification_id_stmt_,
225                       db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND "
226                                         "notification_id < ?2 ORDER BY notification_id DESC LIMIT ?3"));
227     TRY_RESULT_ASSIGN(get_messages_fts_stmt_,
228                       db_.get_statement("SELECT dialog_id, message_id, data, search_id FROM messages WHERE search_id "
229                                         "IN (SELECT rowid FROM messages_fts WHERE messages_fts MATCH ?1 AND rowid < ?2 "
230                                         "ORDER BY rowid DESC LIMIT ?3) ORDER BY search_id DESC"));
231 
232     for (int32 i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) {
233       TRY_RESULT_ASSIGN(
234           get_message_ids_stmts_[i],
235           db_.get_statement(
236               PSLICE() << "SELECT message_id FROM messages WHERE dialog_id = ?1 AND message_id < ?2 AND (index_mask & "
237                        << (1 << i) << ") != 0 ORDER BY message_id DESC LIMIT 1000000"));
238 
239       TRY_RESULT_ASSIGN(
240           get_messages_from_index_stmts_[i].desc_stmt_,
241           db_.get_statement(
242               PSLICE()
243               << "SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id < ?2 AND (index_mask & "
244               << (1 << i) << ") != 0 ORDER BY message_id DESC LIMIT ?3"));
245 
246       TRY_RESULT_ASSIGN(
247           get_messages_from_index_stmts_[i].asc_stmt_,
248           db_.get_statement(
249               PSLICE()
250               << "SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id > ?2 AND (index_mask & "
251               << (1 << i) << ") != 0 ORDER BY message_id ASC LIMIT ?3"));
252 
253       // LOG(ERROR) << get_messages_from_index_stmts_[i].desc_stmt_.explain().ok();
254       // LOG(ERROR) << get_messages_from_index_stmts_[i].asc_stmt_.explain().ok();
255     }
256 
257     for (int i = static_cast<int>(MessageSearchFilter::Call) - 1, pos = 0;
258          i < static_cast<int>(MessageSearchFilter::MissedCall); i++, pos++) {
259       TRY_RESULT_ASSIGN(
260           get_calls_stmts_[pos],
261           db_.get_statement(
262               PSLICE()
263               << "SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id < ?1 AND (index_mask & "
264               << (1 << i) << ") != 0 ORDER BY unique_message_id DESC LIMIT ?2"));
265     }
266 
267     TRY_RESULT_ASSIGN(add_scheduled_message_stmt_,
268                       db_.get_statement("INSERT OR REPLACE INTO scheduled_messages VALUES(?1, ?2, ?3, ?4)"));
269     TRY_RESULT_ASSIGN(
270         get_scheduled_message_stmt_,
271         db_.get_statement("SELECT message_id, data FROM scheduled_messages WHERE dialog_id = ?1 AND message_id = ?2"));
272     TRY_RESULT_ASSIGN(
273         get_scheduled_server_message_stmt_,
274         db_.get_statement(
275             "SELECT message_id, data FROM scheduled_messages WHERE dialog_id = ?1 AND server_message_id = ?2"));
276     TRY_RESULT_ASSIGN(delete_scheduled_message_stmt_,
277                       db_.get_statement("DELETE FROM scheduled_messages WHERE dialog_id = ?1 AND message_id = ?2"));
278     TRY_RESULT_ASSIGN(
279         delete_scheduled_server_message_stmt_,
280         db_.get_statement("DELETE FROM scheduled_messages WHERE dialog_id = ?1 AND server_message_id = ?2"));
281 
282     // LOG(ERROR) << get_message_stmt_.explain().ok();
283     // LOG(ERROR) << get_messages_from_notification_id_stmt.explain().ok();
284     // LOG(ERROR) << get_message_by_random_id_stmt_.explain().ok();
285     // LOG(ERROR) << get_message_by_unique_message_id_stmt_.explain().ok();
286 
287     // LOG(ERROR) << get_expiring_messages_stmt_.explain().ok();
288     // LOG(ERROR) << get_expiring_messages_helper_stmt_.explain().ok();
289 
290     // LOG(FATAL) << "EXPLAINED";
291 
292     return Status::OK();
293   }
294 
add_message(FullMessageId full_message_id,ServerMessageId unique_message_id,DialogId sender_dialog_id,int64 random_id,int32 ttl_expires_at,int32 index_mask,int64 search_id,string text,NotificationId notification_id,MessageId top_thread_message_id,BufferSlice data)295   Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
296                      int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
297                      NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data) final {
298     LOG(INFO) << "Add " << full_message_id << " to database";
299     auto dialog_id = full_message_id.get_dialog_id();
300     auto message_id = full_message_id.get_message_id();
301     LOG_CHECK(dialog_id.is_valid()) << dialog_id << ' ' << message_id << ' ' << full_message_id;
302     CHECK(message_id.is_valid());
303     SCOPE_EXIT {
304       add_message_stmt_.reset();
305     };
306     add_message_stmt_.bind_int64(1, dialog_id.get()).ensure();
307     add_message_stmt_.bind_int64(2, message_id.get()).ensure();
308 
309     if (unique_message_id.is_valid()) {
310       add_message_stmt_.bind_int32(3, unique_message_id.get()).ensure();
311     } else {
312       add_message_stmt_.bind_null(3).ensure();
313     }
314 
315     if (sender_dialog_id.is_valid()) {
316       add_message_stmt_.bind_int64(4, sender_dialog_id.get()).ensure();
317     } else {
318       add_message_stmt_.bind_null(4).ensure();
319     }
320 
321     if (random_id != 0) {
322       add_message_stmt_.bind_int64(5, random_id).ensure();
323     } else {
324       add_message_stmt_.bind_null(5).ensure();
325     }
326 
327     add_message_stmt_.bind_blob(6, data.as_slice()).ensure();
328 
329     if (ttl_expires_at != 0) {
330       add_message_stmt_.bind_int32(7, ttl_expires_at).ensure();
331     } else {
332       add_message_stmt_.bind_null(7).ensure();
333     }
334 
335     if (index_mask != 0) {
336       add_message_stmt_.bind_int32(8, index_mask).ensure();
337     } else {
338       add_message_stmt_.bind_null(8).ensure();
339     }
340     if (search_id != 0) {
341       // add dialog_id to text
342       text += PSTRING() << " \a" << dialog_id.get();
343       if (index_mask != 0) {
344         for (int i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) {
345           if ((index_mask & (1 << i))) {
346             text += PSTRING() << " \a\a" << i;
347           }
348         }
349       }
350       add_message_stmt_.bind_int64(9, search_id).ensure();
351     } else {
352       text = "";
353       add_message_stmt_.bind_null(9).ensure();
354     }
355     if (!text.empty()) {
356       add_message_stmt_.bind_string(10, text).ensure();
357     } else {
358       add_message_stmt_.bind_null(10).ensure();
359     }
360     if (notification_id.is_valid()) {
361       add_message_stmt_.bind_int32(11, notification_id.get()).ensure();
362     } else {
363       add_message_stmt_.bind_null(11).ensure();
364     }
365     if (top_thread_message_id.is_valid()) {
366       add_message_stmt_.bind_int64(12, top_thread_message_id.get()).ensure();
367     } else {
368       add_message_stmt_.bind_null(12).ensure();
369     }
370 
371     add_message_stmt_.step().ensure();
372 
373     return Status::OK();
374   }
375 
add_scheduled_message(FullMessageId full_message_id,BufferSlice data)376   Status add_scheduled_message(FullMessageId full_message_id, BufferSlice data) final {
377     LOG(INFO) << "Add " << full_message_id << " to database";
378     auto dialog_id = full_message_id.get_dialog_id();
379     auto message_id = full_message_id.get_message_id();
380     CHECK(dialog_id.is_valid());
381     CHECK(message_id.is_valid_scheduled());
382     SCOPE_EXIT {
383       add_scheduled_message_stmt_.reset();
384     };
385     add_scheduled_message_stmt_.bind_int64(1, dialog_id.get()).ensure();
386     add_scheduled_message_stmt_.bind_int64(2, message_id.get()).ensure();
387 
388     if (message_id.is_scheduled_server()) {
389       add_scheduled_message_stmt_.bind_int32(3, message_id.get_scheduled_server_message_id().get()).ensure();
390     } else {
391       add_scheduled_message_stmt_.bind_null(3).ensure();
392     }
393 
394     add_scheduled_message_stmt_.bind_blob(4, data.as_slice()).ensure();
395 
396     add_scheduled_message_stmt_.step().ensure();
397 
398     return Status::OK();
399   }
400 
delete_message(FullMessageId full_message_id)401   Status delete_message(FullMessageId full_message_id) final {
402     LOG(INFO) << "Delete " << full_message_id << " from database";
403     auto dialog_id = full_message_id.get_dialog_id();
404     auto message_id = full_message_id.get_message_id();
405     CHECK(dialog_id.is_valid());
406     CHECK(message_id.is_valid() || message_id.is_valid_scheduled());
407     bool is_scheduled = message_id.is_scheduled();
408     bool is_scheduled_server = is_scheduled && message_id.is_scheduled_server();
409     auto &stmt = is_scheduled
410                      ? (is_scheduled_server ? delete_scheduled_server_message_stmt_ : delete_scheduled_message_stmt_)
411                      : delete_message_stmt_;
412     SCOPE_EXIT {
413       stmt.reset();
414     };
415     stmt.bind_int64(1, dialog_id.get()).ensure();
416     if (is_scheduled_server) {
417       stmt.bind_int32(2, message_id.get_scheduled_server_message_id().get()).ensure();
418     } else {
419       stmt.bind_int64(2, message_id.get()).ensure();
420     }
421     stmt.step().ensure();
422     return Status::OK();
423   }
424 
delete_all_dialog_messages(DialogId dialog_id,MessageId from_message_id)425   Status delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) final {
426     LOG(INFO) << "Delete all messages in " << dialog_id << " up to " << from_message_id << " from database";
427     CHECK(dialog_id.is_valid());
428     CHECK(from_message_id.is_valid());
429     SCOPE_EXIT {
430       delete_all_dialog_messages_stmt_.reset();
431     };
432     delete_all_dialog_messages_stmt_.bind_int64(1, dialog_id.get()).ensure();
433     delete_all_dialog_messages_stmt_.bind_int64(2, from_message_id.get()).ensure();
434     auto status = delete_all_dialog_messages_stmt_.step();
435     if (status.is_error()) {
436       LOG(ERROR) << status;
437     }
438     return status;
439   }
440 
delete_dialog_messages_by_sender(DialogId dialog_id,DialogId sender_dialog_id)441   Status delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) final {
442     LOG(INFO) << "Delete all messages in " << dialog_id << " sent by " << sender_dialog_id << " from database";
443     CHECK(dialog_id.is_valid());
444     CHECK(sender_dialog_id.is_valid());
445     SCOPE_EXIT {
446       delete_dialog_messages_by_sender_stmt_.reset();
447     };
448     delete_dialog_messages_by_sender_stmt_.bind_int64(1, dialog_id.get()).ensure();
449     delete_dialog_messages_by_sender_stmt_.bind_int64(2, sender_dialog_id.get()).ensure();
450     delete_dialog_messages_by_sender_stmt_.step().ensure();
451     return Status::OK();
452   }
453 
get_message(FullMessageId full_message_id)454   Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) final {
455     auto dialog_id = full_message_id.get_dialog_id();
456     auto message_id = full_message_id.get_message_id();
457     CHECK(dialog_id.is_valid());
458     CHECK(message_id.is_valid() || message_id.is_valid_scheduled());
459     bool is_scheduled = message_id.is_scheduled();
460     bool is_scheduled_server = is_scheduled && message_id.is_scheduled_server();
461     auto &stmt = is_scheduled ? (is_scheduled_server ? get_scheduled_server_message_stmt_ : get_scheduled_message_stmt_)
462                               : get_message_stmt_;
463     SCOPE_EXIT {
464       stmt.reset();
465     };
466 
467     stmt.bind_int64(1, dialog_id.get()).ensure();
468     if (is_scheduled_server) {
469       stmt.bind_int32(2, message_id.get_scheduled_server_message_id().get()).ensure();
470     } else {
471       stmt.bind_int64(2, message_id.get()).ensure();
472     }
473     stmt.step().ensure();
474     if (!stmt.has_row()) {
475       return Status::Error("Not found");
476     }
477     MessageId received_message_id(stmt.view_int64(0));
478     Slice data = stmt.view_blob(1);
479     if (is_scheduled_server) {
480       CHECK(received_message_id.is_scheduled());
481       CHECK(received_message_id.is_scheduled_server());
482       CHECK(received_message_id.get_scheduled_server_message_id() == message_id.get_scheduled_server_message_id());
483     } else {
484       LOG_CHECK(received_message_id == message_id)
485           << received_message_id << ' ' << message_id << ' ' << get_message_info(received_message_id, data, true).first;
486     }
487     return MessagesDbDialogMessage{received_message_id, BufferSlice(data)};
488   }
489 
get_message_by_unique_message_id(ServerMessageId unique_message_id)490   Result<MessagesDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) final {
491     if (!unique_message_id.is_valid()) {
492       return Status::Error("Invalid unique_message_id");
493     }
494     SCOPE_EXIT {
495       get_message_by_unique_message_id_stmt_.reset();
496     };
497     get_message_by_unique_message_id_stmt_.bind_int32(1, unique_message_id.get()).ensure();
498     get_message_by_unique_message_id_stmt_.step().ensure();
499     if (!get_message_by_unique_message_id_stmt_.has_row()) {
500       return Status::Error("Not found");
501     }
502     DialogId dialog_id(get_message_by_unique_message_id_stmt_.view_int64(0));
503     MessageId message_id(get_message_by_unique_message_id_stmt_.view_int64(1));
504     return MessagesDbMessage{dialog_id, message_id, BufferSlice(get_message_by_unique_message_id_stmt_.view_blob(2))};
505   }
506 
get_message_by_random_id(DialogId dialog_id,int64 random_id)507   Result<MessagesDbDialogMessage> get_message_by_random_id(DialogId dialog_id, int64 random_id) final {
508     SCOPE_EXIT {
509       get_message_by_random_id_stmt_.reset();
510     };
511     get_message_by_random_id_stmt_.bind_int64(1, dialog_id.get()).ensure();
512     get_message_by_random_id_stmt_.bind_int64(2, random_id).ensure();
513     get_message_by_random_id_stmt_.step().ensure();
514     if (!get_message_by_random_id_stmt_.has_row()) {
515       return Status::Error("Not found");
516     }
517     MessageId message_id(get_message_by_random_id_stmt_.view_int64(0));
518     return MessagesDbDialogMessage{message_id, BufferSlice(get_message_by_random_id_stmt_.view_blob(1))};
519   }
520 
get_dialog_message_by_date(DialogId dialog_id,MessageId first_message_id,MessageId last_message_id,int32 date)521   Result<MessagesDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
522                                                              MessageId last_message_id, int32 date) final {
523     int64 left_message_id = first_message_id.get();
524     int64 right_message_id = last_message_id.get();
525     LOG_CHECK(left_message_id <= right_message_id) << first_message_id << " " << last_message_id;
526     TRY_RESULT(first_messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 1));
527     if (!first_messages.empty()) {
528       MessageId real_first_message_id;
529       int32 real_first_message_date;
530       std::tie(real_first_message_id, real_first_message_date) = get_message_info(first_messages[0]);
531       if (real_first_message_date <= date) {
532         // we definitely have at least one suitable message, let's do a binary search
533         left_message_id = real_first_message_id.get();
534 
535         MessageId prev_found_message_id;
536         while (left_message_id <= right_message_id) {
537           auto middle_message_id = left_message_id + ((right_message_id - left_message_id) >> 1);
538           TRY_RESULT(messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, middle_message_id, 1));
539 
540           MessageId message_id;
541           int32 message_date = std::numeric_limits<int32>::max();
542           if (!messages.empty()) {
543             std::tie(message_id, message_date) = get_message_info(messages[0]);
544           }
545           if (message_date <= date) {
546             left_message_id = message_id.get();
547           } else {
548             right_message_id = middle_message_id - 1;
549           }
550 
551           if (prev_found_message_id == message_id) {
552             // we may be very close to the result, let's check
553             TRY_RESULT(left_messages,
554                        get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 2));
555             CHECK(!left_messages.empty());
556             if (left_messages.size() == 1) {
557               // only one message has left, result is found
558               break;
559             }
560 
561             MessageId next_message_id;
562             int32 next_message_date;
563             std::tie(next_message_id, next_message_date) = get_message_info(left_messages[1]);
564             if (next_message_date <= date) {
565               // next message has lesser date, adjusting left message
566               left_message_id = next_message_id.get();
567             } else {
568               // next message has bigger date, result is found
569               break;
570             }
571           }
572 
573           prev_found_message_id = message_id;
574         }
575 
576         // left_message_id is always an id of suitable message, let's return it
577         return get_message({dialog_id, MessageId(left_message_id)});
578       }
579     }
580 
581     return Status::Error("Not found");
582   }
583 
get_expiring_messages(int32 expires_from,int32 expires_till,int32 limit)584   Result<std::pair<vector<MessagesDbMessage>, int32>> get_expiring_messages(int32 expires_from, int32 expires_till,
585                                                                             int32 limit) final {
586     SCOPE_EXIT {
587       get_expiring_messages_stmt_.reset();
588       get_expiring_messages_helper_stmt_.reset();
589     };
590 
591     vector<MessagesDbMessage> messages;
592     // load messages
593     if (expires_from <= expires_till) {
594       get_expiring_messages_stmt_.bind_int32(1, expires_from).ensure();
595       get_expiring_messages_stmt_.bind_int32(2, expires_till).ensure();
596       get_expiring_messages_stmt_.step().ensure();
597 
598       while (get_expiring_messages_stmt_.has_row()) {
599         DialogId dialog_id(get_expiring_messages_stmt_.view_int64(0));
600         MessageId message_id(get_expiring_messages_stmt_.view_int64(1));
601         BufferSlice data(get_expiring_messages_stmt_.view_blob(2));
602         messages.push_back(MessagesDbMessage{dialog_id, message_id, std::move(data)});
603         get_expiring_messages_stmt_.step().ensure();
604       }
605     }
606 
607     // calc next expires_till
608     get_expiring_messages_helper_stmt_.bind_int32(1, expires_till).ensure();
609     get_expiring_messages_helper_stmt_.bind_int32(2, limit).ensure();
610     get_expiring_messages_helper_stmt_.step().ensure();
611     CHECK(get_expiring_messages_helper_stmt_.has_row());
612     int32 count = get_expiring_messages_helper_stmt_.view_int32(1);
613     int32 next_expires_till = -1;
614     if (count != 0) {
615       next_expires_till = get_expiring_messages_helper_stmt_.view_int32(0);
616     }
617     return std::make_pair(std::move(messages), next_expires_till);
618   }
619 
get_dialog_message_calendar(MessagesDbDialogCalendarQuery query)620   Result<MessagesDbCalendar> get_dialog_message_calendar(MessagesDbDialogCalendarQuery query) final {
621     auto &stmt = get_messages_from_index_stmts_[message_search_filter_index(query.filter)].desc_stmt_;
622     SCOPE_EXIT {
623       stmt.reset();
624     };
625     int32 limit = 1000;
626     stmt.bind_int64(1, query.dialog_id.get()).ensure();
627     stmt.bind_int64(2, query.from_message_id.get()).ensure();
628     stmt.bind_int32(3, limit).ensure();
629 
630     vector<MessagesDbDialogMessage> messages;
631     vector<int32> total_counts;
632     stmt.step().ensure();
633     int32 current_day = std::numeric_limits<int32>::max();
634     while (stmt.has_row()) {
635       auto data_slice = stmt.view_blob(0);
636       MessageId message_id(stmt.view_int64(1));
637       auto info = get_message_info(message_id, data_slice, false);
638       auto day = (query.tz_offset + info.second) / 86400;
639       if (day >= current_day) {
640         CHECK(!total_counts.empty());
641         total_counts.back()++;
642       } else {
643         current_day = day;
644         messages.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
645         total_counts.push_back(1);
646       }
647       stmt.step().ensure();
648     }
649     return MessagesDbCalendar{std::move(messages), std::move(total_counts)};
650   }
651 
get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query)652   Result<MessagesDbMessagePositions> get_dialog_sparse_message_positions(
653       MessagesDbGetDialogSparseMessagePositionsQuery query) final {
654     auto &stmt = get_message_ids_stmts_[message_search_filter_index(query.filter)];
655     SCOPE_EXIT {
656       stmt.reset();
657     };
658     stmt.bind_int64(1, query.dialog_id.get()).ensure();
659     stmt.bind_int64(2, query.from_message_id.get()).ensure();
660 
661     vector<MessageId> message_ids;
662     stmt.step().ensure();
663     while (stmt.has_row()) {
664       message_ids.push_back(MessageId(stmt.view_int64(0)));
665       stmt.step().ensure();
666     }
667 
668     int32 limit = min(query.limit, static_cast<int32>(message_ids.size()));
669     double delta = static_cast<double>(message_ids.size()) / limit;
670     MessagesDbMessagePositions positions;
671     positions.total_count = static_cast<int32>(message_ids.size());
672     positions.positions.reserve(limit);
673     for (int32 i = 0; i < limit; i++) {
674       auto position = static_cast<int32>((i + 0.5) * delta);
675       auto message_id = message_ids[position];
676       TRY_RESULT(message, get_message({query.dialog_id, message_id}));
677       auto date = get_message_info(message).second;
678       positions.positions.push_back(MessagesDbMessagePosition{position, date, message_id});
679     }
680     return positions;
681   }
682 
get_messages(MessagesDbMessagesQuery query)683   Result<vector<MessagesDbDialogMessage>> get_messages(MessagesDbMessagesQuery query) final {
684     if (query.filter != MessageSearchFilter::Empty) {
685       return get_messages_from_index(query.dialog_id, query.from_message_id, query.filter, query.offset, query.limit);
686     }
687     return get_messages_impl(get_messages_stmt_, query.dialog_id, query.from_message_id, query.offset, query.limit);
688   }
689 
get_scheduled_messages(DialogId dialog_id,int32 limit)690   Result<vector<MessagesDbDialogMessage>> get_scheduled_messages(DialogId dialog_id, int32 limit) final {
691     return get_messages_inner(get_scheduled_messages_stmt_, dialog_id, std::numeric_limits<int64>::max(), limit);
692   }
693 
get_messages_from_notification_id(DialogId dialog_id,NotificationId from_notification_id,int32 limit)694   Result<vector<MessagesDbDialogMessage>> get_messages_from_notification_id(DialogId dialog_id,
695                                                                             NotificationId from_notification_id,
696                                                                             int32 limit) final {
697     auto &stmt = get_messages_from_notification_id_stmt_;
698     SCOPE_EXIT {
699       stmt.reset();
700     };
701     stmt.bind_int64(1, dialog_id.get()).ensure();
702     stmt.bind_int32(2, from_notification_id.get()).ensure();
703     stmt.bind_int32(3, limit).ensure();
704 
705     vector<MessagesDbDialogMessage> result;
706     stmt.step().ensure();
707     while (stmt.has_row()) {
708       auto data_slice = stmt.view_blob(0);
709       MessageId message_id(stmt.view_int64(1));
710       result.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
711       LOG(INFO) << "Load " << message_id << " in " << dialog_id << " from database";
712       stmt.step().ensure();
713     }
714     return std::move(result);
715   }
716 
prepare_query(Slice query)717   static string prepare_query(Slice query) {
718     auto is_word_character = [](uint32 a) {
719       switch (get_unicode_simple_category(a)) {
720         case UnicodeSimpleCategory::Letter:
721         case UnicodeSimpleCategory::DecimalNumber:
722         case UnicodeSimpleCategory::Number:
723           return true;
724         default:
725           return a == '_';
726       }
727     };
728 
729     const size_t MAX_QUERY_SIZE = 1024;
730     query = utf8_truncate(query, MAX_QUERY_SIZE);
731     auto buf = StackAllocator::alloc(query.size() * 4 + 100);
732     StringBuilder sb(buf.as_slice());
733     bool in_word{false};
734 
735     for (auto ptr = query.ubegin(), end = query.uend(); ptr < end;) {
736       uint32 code;
737       auto code_ptr = ptr;
738       ptr = next_utf8_unsafe(ptr, &code, "prepare_query");
739       if (is_word_character(code)) {
740         if (!in_word) {
741           in_word = true;
742           sb << "\"";
743         }
744         sb << Slice(code_ptr, ptr);
745       } else {
746         if (in_word) {
747           in_word = false;
748           sb << "\" ";
749         }
750       }
751     }
752     if (in_word) {
753       sb << "\" ";
754     }
755 
756     if (sb.is_error()) {
757       LOG(ERROR) << "StringBuilder buffer overflow";
758       return "";
759     }
760 
761     return sb.as_cslice().str();
762   }
763 
get_messages_fts(MessagesDbFtsQuery query)764   Result<MessagesDbFtsResult> get_messages_fts(MessagesDbFtsQuery query) final {
765     SCOPE_EXIT {
766       get_messages_fts_stmt_.reset();
767     };
768 
769     LOG(INFO) << tag("query", query.query) << query.dialog_id << tag("filter", query.filter)
770               << tag("from_search_id", query.from_search_id) << tag("limit", query.limit);
771     string words = prepare_query(query.query);
772     LOG(INFO) << tag("from", query.query) << tag("to", words);
773 
774     // dialog_id kludge
775     if (query.dialog_id.is_valid()) {
776       words += PSTRING() << " \"\a" << query.dialog_id.get() << "\"";
777     }
778 
779     // index_mask kludge
780     if (query.filter != MessageSearchFilter::Empty) {
781       words += PSTRING() << " \"\a\a" << message_search_filter_index(query.filter) << "\"";
782     }
783 
784     auto &stmt = get_messages_fts_stmt_;
785     stmt.bind_string(1, words).ensure();
786     if (query.from_search_id == 0) {
787       query.from_search_id = std::numeric_limits<int64>::max();
788     }
789     stmt.bind_int64(2, query.from_search_id).ensure();
790     stmt.bind_int32(3, query.limit).ensure();
791     MessagesDbFtsResult result;
792     auto status = stmt.step();
793     if (status.is_error()) {
794       LOG(ERROR) << status;
795       return std::move(result);
796     }
797     while (stmt.has_row()) {
798       DialogId dialog_id(stmt.view_int64(0));
799       MessageId message_id(stmt.view_int64(1));
800       auto data_slice = stmt.view_blob(2);
801       auto search_id = stmt.view_int64(3);
802       result.next_search_id = search_id;
803       result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
804       stmt.step().ensure();
805     }
806     return std::move(result);
807   }
808 
get_messages_from_index(DialogId dialog_id,MessageId from_message_id,MessageSearchFilter filter,int32 offset,int32 limit)809   Result<vector<MessagesDbDialogMessage>> get_messages_from_index(DialogId dialog_id, MessageId from_message_id,
810                                                                   MessageSearchFilter filter, int32 offset,
811                                                                   int32 limit) {
812     auto &stmt = get_messages_from_index_stmts_[message_search_filter_index(filter)];
813     return get_messages_impl(stmt, dialog_id, from_message_id, offset, limit);
814   }
815 
get_calls(MessagesDbCallsQuery query)816   Result<MessagesDbCallsResult> get_calls(MessagesDbCallsQuery query) final {
817     int32 pos;
818     if (query.filter == MessageSearchFilter::Call) {
819       pos = 0;
820     } else if (query.filter == MessageSearchFilter::MissedCall) {
821       pos = 1;
822     } else {
823       return Status::Error(PSLICE() << "Filter is not Call or MissedCall: " << query.filter);
824     }
825 
826     auto &stmt = get_calls_stmts_[pos];
827     SCOPE_EXIT {
828       stmt.reset();
829     };
830 
831     stmt.bind_int32(1, query.from_unique_message_id).ensure();
832     stmt.bind_int32(2, query.limit).ensure();
833 
834     MessagesDbCallsResult result;
835     stmt.step().ensure();
836     while (stmt.has_row()) {
837       DialogId dialog_id(stmt.view_int64(0));
838       MessageId message_id(stmt.view_int64(1));
839       auto data_slice = stmt.view_blob(2);
840       result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
841       stmt.step().ensure();
842     }
843     return std::move(result);
844   }
845 
begin_write_transaction()846   Status begin_write_transaction() final {
847     return db_.begin_write_transaction();
848   }
commit_transaction()849   Status commit_transaction() final {
850     return db_.commit_transaction();
851   }
852 
853  private:
854   SqliteDb db_;
855 
856   SqliteStatement add_message_stmt_;
857 
858   SqliteStatement delete_message_stmt_;
859   SqliteStatement delete_all_dialog_messages_stmt_;
860   SqliteStatement delete_dialog_messages_by_sender_stmt_;
861 
862   SqliteStatement get_message_stmt_;
863   SqliteStatement get_message_by_random_id_stmt_;
864   SqliteStatement get_message_by_unique_message_id_stmt_;
865   SqliteStatement get_expiring_messages_stmt_;
866   SqliteStatement get_expiring_messages_helper_stmt_;
867 
868   struct GetMessagesStmt {
869     SqliteStatement asc_stmt_;
870     SqliteStatement desc_stmt_;
871   };
872   GetMessagesStmt get_messages_stmt_;
873   SqliteStatement get_scheduled_messages_stmt_;
874   SqliteStatement get_messages_from_notification_id_stmt_;
875 
876   std::array<SqliteStatement, MESSAGES_DB_INDEX_COUNT> get_message_ids_stmts_;
877   std::array<GetMessagesStmt, MESSAGES_DB_INDEX_COUNT> get_messages_from_index_stmts_;
878   std::array<SqliteStatement, 2> get_calls_stmts_;
879 
880   SqliteStatement get_messages_fts_stmt_;
881 
882   SqliteStatement add_scheduled_message_stmt_;
883   SqliteStatement get_scheduled_message_stmt_;
884   SqliteStatement get_scheduled_server_message_stmt_;
885   SqliteStatement delete_scheduled_message_stmt_;
886   SqliteStatement delete_scheduled_server_message_stmt_;
887 
get_messages_impl(GetMessagesStmt & stmt,DialogId dialog_id,MessageId from_message_id,int32 offset,int32 limit)888   static Result<vector<MessagesDbDialogMessage>> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id,
889                                                                    MessageId from_message_id, int32 offset,
890                                                                    int32 limit) {
891     LOG_CHECK(dialog_id.is_valid()) << dialog_id;
892     CHECK(from_message_id.is_valid());
893 
894     LOG(INFO) << "Loading messages in " << dialog_id << " from " << from_message_id << " with offset = " << offset
895               << " and limit = " << limit;
896 
897     auto message_id = from_message_id.get();
898 
899     if (message_id >= MessageId::max().get()) {
900       message_id--;
901     }
902 
903     auto left_message_id = message_id;
904     auto left_cnt = limit + offset;
905 
906     auto right_message_id = message_id - 1;
907     auto right_cnt = -offset;
908 
909     vector<MessagesDbDialogMessage> left;
910     vector<MessagesDbDialogMessage> right;
911 
912     if (left_cnt != 0) {
913       if (right_cnt == 1 && false) {
914         left_message_id++;
915         left_cnt++;
916       }
917 
918       TRY_RESULT_ASSIGN(left, get_messages_inner(stmt.desc_stmt_, dialog_id, left_message_id, left_cnt));
919 
920       if (right_cnt == 1 && !left.empty() && false /*get_message_id(left[0].as_slice()) == message_id*/) {
921         right_cnt = 0;
922       }
923     }
924     if (right_cnt != 0) {
925       TRY_RESULT_ASSIGN(right, get_messages_inner(stmt.asc_stmt_, dialog_id, right_message_id, right_cnt));
926       std::reverse(right.begin(), right.end());
927     }
928     if (left.empty()) {
929       return std::move(right);
930     }
931     if (right.empty()) {
932       return std::move(left);
933     }
934 
935     right.reserve(right.size() + left.size());
936     std::move(left.begin(), left.end(), std::back_inserter(right));
937 
938     return std::move(right);
939   }
940 
get_messages_inner(SqliteStatement & stmt,DialogId dialog_id,int64 from_message_id,int32 limit)941   static Result<vector<MessagesDbDialogMessage>> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id,
942                                                                     int64 from_message_id, int32 limit) {
943     SCOPE_EXIT {
944       stmt.reset();
945     };
946     stmt.bind_int64(1, dialog_id.get()).ensure();
947     stmt.bind_int64(2, from_message_id).ensure();
948     stmt.bind_int32(3, limit).ensure();
949 
950     LOG(INFO) << "Begin to load " << limit << " messages in " << dialog_id << " from " << MessageId(from_message_id)
951               << " from database";
952     vector<MessagesDbDialogMessage> result;
953     stmt.step().ensure();
954     while (stmt.has_row()) {
955       auto data_slice = stmt.view_blob(0);
956       MessageId message_id(stmt.view_int64(1));
957       result.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
958       LOG(INFO) << "Loaded " << message_id << " in " << dialog_id << " from database";
959       stmt.step().ensure();
960     }
961     return std::move(result);
962   }
963 
get_message_info(const MessagesDbDialogMessage & message,bool from_data=false)964   static std::pair<MessageId, int32> get_message_info(const MessagesDbDialogMessage &message, bool from_data = false) {
965     return get_message_info(message.message_id, message.data.as_slice(), from_data);
966   }
967 
get_message_info(MessageId message_id,Slice data,bool from_data)968   static std::pair<MessageId, int32> get_message_info(MessageId message_id, Slice data, bool from_data) {
969     LogEventParser message_date_parser(data);
970     int32 flags;
971     int32 flags2 = 0;
972     int32 flags3 = 0;
973     td::parse(flags, message_date_parser);
974     if ((flags & (1 << 29)) != 0) {
975       td::parse(flags2, message_date_parser);
976       if ((flags2 & (1 << 29)) != 0) {
977         td::parse(flags3, message_date_parser);
978       }
979     }
980     bool has_sender = (flags & (1 << 10)) != 0;
981     MessageId data_message_id;
982     td::parse(data_message_id, message_date_parser);
983     UserId sender_user_id;
984     if (has_sender) {
985       td::parse(sender_user_id, message_date_parser);
986     }
987     int32 date;
988     td::parse(date, message_date_parser);
989     LOG(INFO) << "Loaded " << message_id << "(aka " << data_message_id << ") sent at " << date << " by "
990               << sender_user_id;
991     return {from_data ? data_message_id : message_id, date};
992   }
993 };
994 
create_messages_db_sync(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)995 std::shared_ptr<MessagesDbSyncSafeInterface> create_messages_db_sync(
996     std::shared_ptr<SqliteConnectionSafe> sqlite_connection) {
997   class MessagesDbSyncSafe final : public MessagesDbSyncSafeInterface {
998    public:
999     explicit MessagesDbSyncSafe(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)
1000         : lsls_db_([safe_connection = std::move(sqlite_connection)] {
1001           return make_unique<MessagesDbImpl>(safe_connection->get().clone());
1002         }) {
1003     }
1004     MessagesDbSyncInterface &get() final {
1005       return *lsls_db_.get();
1006     }
1007 
1008    private:
1009     LazySchedulerLocalStorage<unique_ptr<MessagesDbSyncInterface>> lsls_db_;
1010   };
1011   return std::make_shared<MessagesDbSyncSafe>(std::move(sqlite_connection));
1012 }
1013 
1014 class MessagesDbAsync final : public MessagesDbAsyncInterface {
1015  public:
MessagesDbAsync(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,int32 scheduler_id)1016   MessagesDbAsync(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db, int32 scheduler_id) {
1017     impl_ = create_actor_on_scheduler<Impl>("MessagesDbActor", scheduler_id, std::move(sync_db));
1018   }
1019 
add_message(FullMessageId full_message_id,ServerMessageId unique_message_id,DialogId sender_dialog_id,int64 random_id,int32 ttl_expires_at,int32 index_mask,int64 search_id,string text,NotificationId notification_id,MessageId top_thread_message_id,BufferSlice data,Promise<> promise)1020   void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
1021                    int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
1022                    NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data,
1023                    Promise<> promise) final {
1024     send_closure_later(impl_, &Impl::add_message, full_message_id, unique_message_id, sender_dialog_id, random_id,
1025                        ttl_expires_at, index_mask, search_id, std::move(text), notification_id, top_thread_message_id,
1026                        std::move(data), std::move(promise));
1027   }
add_scheduled_message(FullMessageId full_message_id,BufferSlice data,Promise<> promise)1028   void add_scheduled_message(FullMessageId full_message_id, BufferSlice data, Promise<> promise) final {
1029     send_closure_later(impl_, &Impl::add_scheduled_message, full_message_id, std::move(data), std::move(promise));
1030   }
1031 
delete_message(FullMessageId full_message_id,Promise<> promise)1032   void delete_message(FullMessageId full_message_id, Promise<> promise) final {
1033     send_closure_later(impl_, &Impl::delete_message, full_message_id, std::move(promise));
1034   }
delete_all_dialog_messages(DialogId dialog_id,MessageId from_message_id,Promise<> promise)1035   void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) final {
1036     send_closure_later(impl_, &Impl::delete_all_dialog_messages, dialog_id, from_message_id, std::move(promise));
1037   }
delete_dialog_messages_by_sender(DialogId dialog_id,DialogId sender_dialog_id,Promise<> promise)1038   void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) final {
1039     send_closure_later(impl_, &Impl::delete_dialog_messages_by_sender, dialog_id, sender_dialog_id, std::move(promise));
1040   }
1041 
get_message(FullMessageId full_message_id,Promise<MessagesDbDialogMessage> promise)1042   void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) final {
1043     send_closure_later(impl_, &Impl::get_message, full_message_id, std::move(promise));
1044   }
get_message_by_unique_message_id(ServerMessageId unique_message_id,Promise<MessagesDbMessage> promise)1045   void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessagesDbMessage> promise) final {
1046     send_closure_later(impl_, &Impl::get_message_by_unique_message_id, unique_message_id, std::move(promise));
1047   }
get_message_by_random_id(DialogId dialog_id,int64 random_id,Promise<MessagesDbDialogMessage> promise)1048   void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessagesDbDialogMessage> promise) final {
1049     send_closure_later(impl_, &Impl::get_message_by_random_id, dialog_id, random_id, std::move(promise));
1050   }
get_dialog_message_by_date(DialogId dialog_id,MessageId first_message_id,MessageId last_message_id,int32 date,Promise<MessagesDbDialogMessage> promise)1051   void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id, int32 date,
1052                                   Promise<MessagesDbDialogMessage> promise) final {
1053     send_closure_later(impl_, &Impl::get_dialog_message_by_date, dialog_id, first_message_id, last_message_id, date,
1054                        std::move(promise));
1055   }
1056 
get_dialog_message_calendar(MessagesDbDialogCalendarQuery query,Promise<MessagesDbCalendar> promise)1057   void get_dialog_message_calendar(MessagesDbDialogCalendarQuery query, Promise<MessagesDbCalendar> promise) final {
1058     send_closure_later(impl_, &Impl::get_dialog_message_calendar, std::move(query), std::move(promise));
1059   }
1060 
get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,Promise<MessagesDbMessagePositions> promise)1061   void get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,
1062                                            Promise<MessagesDbMessagePositions> promise) final {
1063     send_closure_later(impl_, &Impl::get_dialog_sparse_message_positions, std::move(query), std::move(promise));
1064   }
1065 
get_messages(MessagesDbMessagesQuery query,Promise<vector<MessagesDbDialogMessage>> promise)1066   void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) final {
1067     send_closure_later(impl_, &Impl::get_messages, std::move(query), std::move(promise));
1068   }
get_scheduled_messages(DialogId dialog_id,int32 limit,Promise<vector<MessagesDbDialogMessage>> promise)1069   void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) final {
1070     send_closure_later(impl_, &Impl::get_scheduled_messages, dialog_id, limit, std::move(promise));
1071   }
get_messages_from_notification_id(DialogId dialog_id,NotificationId from_notification_id,int32 limit,Promise<vector<MessagesDbDialogMessage>> promise)1072   void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
1073                                          Promise<vector<MessagesDbDialogMessage>> promise) final {
1074     send_closure_later(impl_, &Impl::get_messages_from_notification_id, dialog_id, from_notification_id, limit,
1075                        std::move(promise));
1076   }
get_calls(MessagesDbCallsQuery query,Promise<MessagesDbCallsResult> promise)1077   void get_calls(MessagesDbCallsQuery query, Promise<MessagesDbCallsResult> promise) final {
1078     send_closure_later(impl_, &Impl::get_calls, std::move(query), std::move(promise));
1079   }
get_messages_fts(MessagesDbFtsQuery query,Promise<MessagesDbFtsResult> promise)1080   void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) final {
1081     send_closure_later(impl_, &Impl::get_messages_fts, std::move(query), std::move(promise));
1082   }
get_expiring_messages(int32 expires_from,int32 expires_till,int32 limit,Promise<std::pair<vector<MessagesDbMessage>,int32>> promise)1083   void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
1084                              Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) final {
1085     send_closure_later(impl_, &Impl::get_expiring_messages, expires_from, expires_till, limit, std::move(promise));
1086   }
1087 
close(Promise<> promise)1088   void close(Promise<> promise) final {
1089     send_closure_later(impl_, &Impl::close, std::move(promise));
1090   }
1091 
force_flush()1092   void force_flush() final {
1093     send_closure_later(impl_, &Impl::force_flush);
1094   }
1095 
1096  private:
1097   class Impl final : public Actor {
1098    public:
Impl(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe)1099     explicit Impl(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
1100     }
add_message(FullMessageId full_message_id,ServerMessageId unique_message_id,DialogId sender_dialog_id,int64 random_id,int32 ttl_expires_at,int32 index_mask,int64 search_id,string text,NotificationId notification_id,MessageId top_thread_message_id,BufferSlice data,Promise<> promise)1101     void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
1102                      int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
1103                      NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data,
1104                      Promise<> promise) {
1105       add_write_query([this, full_message_id, unique_message_id, sender_dialog_id, random_id, ttl_expires_at,
1106                        index_mask, search_id, text = std::move(text), notification_id, top_thread_message_id,
1107                        data = std::move(data), promise = std::move(promise)](Unit) mutable {
1108         on_write_result(std::move(promise),
1109                         sync_db_->add_message(full_message_id, unique_message_id, sender_dialog_id, random_id,
1110                                               ttl_expires_at, index_mask, search_id, std::move(text), notification_id,
1111                                               top_thread_message_id, std::move(data)));
1112       });
1113     }
add_scheduled_message(FullMessageId full_message_id,BufferSlice data,Promise<> promise)1114     void add_scheduled_message(FullMessageId full_message_id, BufferSlice data, Promise<> promise) {
1115       add_write_query([this, full_message_id, promise = std::move(promise), data = std::move(data)](Unit) mutable {
1116         on_write_result(std::move(promise), sync_db_->add_scheduled_message(full_message_id, std::move(data)));
1117       });
1118     }
1119 
delete_message(FullMessageId full_message_id,Promise<> promise)1120     void delete_message(FullMessageId full_message_id, Promise<> promise) {
1121       add_write_query([this, full_message_id, promise = std::move(promise)](Unit) mutable {
1122         on_write_result(std::move(promise), sync_db_->delete_message(full_message_id));
1123       });
1124     }
on_write_result(Promise<> promise,Status status)1125     void on_write_result(Promise<> promise, Status status) {
1126       // We are inside a transaction and don't know how to handle the error
1127       status.ensure();
1128       pending_write_results_.emplace_back(std::move(promise), std::move(status));
1129     }
delete_all_dialog_messages(DialogId dialog_id,MessageId from_message_id,Promise<> promise)1130     void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) {
1131       add_read_query();
1132       promise.set_result(sync_db_->delete_all_dialog_messages(dialog_id, from_message_id));
1133     }
delete_dialog_messages_by_sender(DialogId dialog_id,DialogId sender_dialog_id,Promise<> promise)1134     void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) {
1135       add_read_query();
1136       promise.set_result(sync_db_->delete_dialog_messages_by_sender(dialog_id, sender_dialog_id));
1137     }
1138 
get_message(FullMessageId full_message_id,Promise<MessagesDbDialogMessage> promise)1139     void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) {
1140       add_read_query();
1141       promise.set_result(sync_db_->get_message(full_message_id));
1142     }
get_message_by_unique_message_id(ServerMessageId unique_message_id,Promise<MessagesDbMessage> promise)1143     void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessagesDbMessage> promise) {
1144       add_read_query();
1145       promise.set_result(sync_db_->get_message_by_unique_message_id(unique_message_id));
1146     }
get_message_by_random_id(DialogId dialog_id,int64 random_id,Promise<MessagesDbDialogMessage> promise)1147     void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessagesDbDialogMessage> promise) {
1148       add_read_query();
1149       promise.set_result(sync_db_->get_message_by_random_id(dialog_id, random_id));
1150     }
get_dialog_message_by_date(DialogId dialog_id,MessageId first_message_id,MessageId last_message_id,int32 date,Promise<MessagesDbDialogMessage> promise)1151     void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id,
1152                                     int32 date, Promise<MessagesDbDialogMessage> promise) {
1153       add_read_query();
1154       promise.set_result(sync_db_->get_dialog_message_by_date(dialog_id, first_message_id, last_message_id, date));
1155     }
1156 
get_dialog_message_calendar(MessagesDbDialogCalendarQuery query,Promise<MessagesDbCalendar> promise)1157     void get_dialog_message_calendar(MessagesDbDialogCalendarQuery query, Promise<MessagesDbCalendar> promise) {
1158       add_read_query();
1159       promise.set_result(sync_db_->get_dialog_message_calendar(std::move(query)));
1160     }
1161 
get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,Promise<MessagesDbMessagePositions> promise)1162     void get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,
1163                                              Promise<MessagesDbMessagePositions> promise) {
1164       add_read_query();
1165       promise.set_result(sync_db_->get_dialog_sparse_message_positions(std::move(query)));
1166     }
1167 
get_messages(MessagesDbMessagesQuery query,Promise<vector<MessagesDbDialogMessage>> promise)1168     void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) {
1169       add_read_query();
1170       promise.set_result(sync_db_->get_messages(std::move(query)));
1171     }
get_scheduled_messages(DialogId dialog_id,int32 limit,Promise<vector<MessagesDbDialogMessage>> promise)1172     void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) {
1173       add_read_query();
1174       promise.set_result(sync_db_->get_scheduled_messages(dialog_id, limit));
1175     }
get_messages_from_notification_id(DialogId dialog_id,NotificationId from_notification_id,int32 limit,Promise<vector<MessagesDbDialogMessage>> promise)1176     void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
1177                                            Promise<vector<MessagesDbDialogMessage>> promise) {
1178       add_read_query();
1179       promise.set_result(sync_db_->get_messages_from_notification_id(dialog_id, from_notification_id, limit));
1180     }
get_calls(MessagesDbCallsQuery query,Promise<MessagesDbCallsResult> promise)1181     void get_calls(MessagesDbCallsQuery query, Promise<MessagesDbCallsResult> promise) {
1182       add_read_query();
1183       promise.set_result(sync_db_->get_calls(std::move(query)));
1184     }
get_messages_fts(MessagesDbFtsQuery query,Promise<MessagesDbFtsResult> promise)1185     void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) {
1186       add_read_query();
1187       promise.set_result(sync_db_->get_messages_fts(std::move(query)));
1188     }
get_expiring_messages(int32 expires_from,int32 expires_till,int32 limit,Promise<std::pair<vector<MessagesDbMessage>,int32>> promise)1189     void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
1190                                Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) {
1191       add_read_query();
1192       promise.set_result(sync_db_->get_expiring_messages(expires_from, expires_till, limit));
1193     }
1194 
close(Promise<> promise)1195     void close(Promise<> promise) {
1196       do_flush();
1197       sync_db_safe_.reset();
1198       sync_db_ = nullptr;
1199       promise.set_value(Unit());
1200       stop();
1201     }
1202 
force_flush()1203     void force_flush() {
1204       LOG(INFO) << "MessagesDb flushed";
1205       do_flush();
1206     }
1207 
1208    private:
1209     std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe_;
1210     MessagesDbSyncInterface *sync_db_ = nullptr;
1211 
1212     static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
1213     static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
1214 
1215     //NB: order is important, destructor of pending_writes_ will change pending_write_results_
1216     vector<std::pair<Promise<>, Status>> pending_write_results_;
1217     vector<Promise<>> pending_writes_;
1218     double wakeup_at_ = 0;
1219     template <class F>
add_write_query(F && f)1220     void add_write_query(F &&f) {
1221       pending_writes_.push_back(PromiseCreator::lambda(std::forward<F>(f), PromiseCreator::Ignore()));
1222       if (pending_writes_.size() > MAX_PENDING_QUERIES_COUNT) {
1223         do_flush();
1224         wakeup_at_ = 0;
1225       } else if (wakeup_at_ == 0) {
1226         wakeup_at_ = Time::now_cached() + MAX_PENDING_QUERIES_DELAY;
1227       }
1228       if (wakeup_at_ != 0) {
1229         set_timeout_at(wakeup_at_);
1230       }
1231     }
add_read_query()1232     void add_read_query() {
1233       do_flush();
1234     }
do_flush()1235     void do_flush() {
1236       if (pending_writes_.empty()) {
1237         return;
1238       }
1239       sync_db_->begin_write_transaction().ensure();
1240       for (auto &query : pending_writes_) {
1241         query.set_value(Unit());
1242       }
1243       sync_db_->commit_transaction().ensure();
1244       pending_writes_.clear();
1245       for (auto &p : pending_write_results_) {
1246         p.first.set_result(std::move(p.second));
1247       }
1248       pending_write_results_.clear();
1249       cancel_timeout();
1250     }
timeout_expired()1251     void timeout_expired() final {
1252       do_flush();
1253     }
1254 
start_up()1255     void start_up() final {
1256       sync_db_ = &sync_db_safe_->get();
1257     }
1258   };
1259   ActorOwn<Impl> impl_;
1260 };
1261 
create_messages_db_async(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,int32 scheduler_id)1262 std::shared_ptr<MessagesDbAsyncInterface> create_messages_db_async(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,
1263                                                                    int32 scheduler_id) {
1264   return std::make_shared<MessagesDbAsync>(std::move(sync_db), scheduler_id);
1265 }
1266 
1267 }  // namespace td
1268