1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/telegram/MessagesDb.h"
8
9 #include "td/telegram/logevent/LogEvent.h"
10 #include "td/telegram/Version.h"
11
12 #include "td/db/SqliteConnectionSafe.h"
13 #include "td/db/SqliteDb.h"
14 #include "td/db/SqliteStatement.h"
15
16 #include "td/actor/actor.h"
17 #include "td/actor/PromiseFuture.h"
18 #include "td/actor/SchedulerLocalStorage.h"
19
20 #include "td/utils/format.h"
21 #include "td/utils/logging.h"
22 #include "td/utils/ScopeGuard.h"
23 #include "td/utils/Slice.h"
24 #include "td/utils/SliceBuilder.h"
25 #include "td/utils/StackAllocator.h"
26 #include "td/utils/StringBuilder.h"
27 #include "td/utils/Time.h"
28 #include "td/utils/tl_helpers.h"
29 #include "td/utils/unicode.h"
30 #include "td/utils/utf8.h"
31
32 #include <algorithm>
33 #include <array>
34 #include <iterator>
35 #include <limits>
36 #include <tuple>
37 #include <utility>
38
39 namespace td {
40
41 static constexpr int32 MESSAGES_DB_INDEX_COUNT = 30;
42 static constexpr int32 MESSAGES_DB_INDEX_COUNT_OLD = 9;
43
44 // NB: must happen inside a transaction
init_messages_db(SqliteDb & db,int32 version)45 Status init_messages_db(SqliteDb &db, int32 version) {
46 LOG(INFO) << "Init message database " << tag("version", version);
47
48 // Check if database exists
49 TRY_RESULT(has_table, db.has_table("messages"));
50 if (!has_table) {
51 version = 0;
52 } else if (version < static_cast<int32>(DbVersion::DialogDbCreated) || version > current_db_version()) {
53 TRY_STATUS(drop_messages_db(db, version));
54 version = 0;
55 }
56
57 auto add_media_indices = [&db](int begin, int end) {
58 for (int i = begin; i < end; i++) {
59 TRY_STATUS(db.exec(PSLICE() << "CREATE INDEX IF NOT EXISTS message_index_" << i
60 << " ON messages (dialog_id, message_id) WHERE (index_mask & " << (1 << i)
61 << ") != 0"));
62 }
63 return Status::OK();
64 };
65
66 auto add_fts = [&db] {
67 TRY_STATUS(
68 db.exec("CREATE INDEX IF NOT EXISTS message_by_search_id ON messages "
69 "(search_id) WHERE search_id IS NOT NULL"));
70
71 TRY_STATUS(
72 db.exec("CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(text, content='messages', "
73 "content_rowid='search_id', tokenize = \"unicode61 remove_diacritics 0 tokenchars '\a'\")"));
74 TRY_STATUS(db.exec(
75 "CREATE TRIGGER IF NOT EXISTS trigger_fts_delete BEFORE DELETE ON messages WHEN OLD.search_id IS NOT NULL"
76 " BEGIN INSERT INTO messages_fts(messages_fts, rowid, text) VALUES(\'delete\', OLD.search_id, OLD.text); END"));
77 TRY_STATUS(db.exec(
78 "CREATE TRIGGER IF NOT EXISTS trigger_fts_insert AFTER INSERT ON messages WHEN NEW.search_id IS NOT NULL"
79 " BEGIN INSERT INTO messages_fts(rowid, text) VALUES(NEW.search_id, NEW.text); END"));
80 //TRY_STATUS(db.exec(
81 //"CREATE TRIGGER IF NOT EXISTS trigger_fts_update AFTER UPDATE ON messages WHEN NEW.search_id IS NOT NULL OR "
82 //"OLD.search_id IS NOT NULL"
83 //" BEGIN "
84 //"INSERT INTO messages_fts(messages_fts, rowid, text) VALUES(\'delete\', OLD.search_id, OLD.text); "
85 //"INSERT INTO messages_fts(rowid, text) VALUES(NEW.search_id, NEW.text); "
86 //" END"));
87
88 return Status::OK();
89 };
90 auto add_call_index = [&db] {
91 for (int i = static_cast<int>(MessageSearchFilter::Call) - 1; i < static_cast<int>(MessageSearchFilter::MissedCall);
92 i++) {
93 TRY_STATUS(db.exec(PSLICE() << "CREATE INDEX IF NOT EXISTS full_message_index_" << i
94 << " ON messages (unique_message_id) WHERE (index_mask & " << (1 << i) << ") != 0"));
95 }
96 return Status::OK();
97 };
98 auto add_notification_id_index = [&db] {
99 return db.exec(
100 "CREATE INDEX IF NOT EXISTS message_by_notification_id ON messages (dialog_id, notification_id) WHERE "
101 "notification_id IS NOT NULL");
102 };
103 auto add_scheduled_messages_table = [&db] {
104 TRY_STATUS(
105 db.exec("CREATE TABLE IF NOT EXISTS scheduled_messages (dialog_id INT8, message_id INT8, "
106 "server_message_id INT4, data BLOB, PRIMARY KEY (dialog_id, message_id))"));
107
108 TRY_STATUS(
109 db.exec("CREATE INDEX IF NOT EXISTS message_by_server_message_id ON scheduled_messages "
110 "(dialog_id, server_message_id) WHERE server_message_id IS NOT NULL"));
111 return Status::OK();
112 };
113
114 if (version == 0) {
115 LOG(INFO) << "Create new message database";
116 TRY_STATUS(
117 db.exec("CREATE TABLE IF NOT EXISTS messages (dialog_id INT8, message_id INT8, unique_message_id INT4, "
118 "sender_user_id INT8, random_id INT8, data BLOB, ttl_expires_at INT4, index_mask INT4, search_id INT8, "
119 "text STRING, notification_id INT4, top_thread_message_id INT8, PRIMARY KEY (dialog_id, message_id))"));
120
121 TRY_STATUS(
122 db.exec("CREATE INDEX IF NOT EXISTS message_by_random_id ON messages (dialog_id, random_id) "
123 "WHERE random_id IS NOT NULL"));
124
125 TRY_STATUS(
126 db.exec("CREATE INDEX IF NOT EXISTS message_by_unique_message_id ON messages "
127 "(unique_message_id) WHERE unique_message_id IS NOT NULL"));
128
129 TRY_STATUS(
130 db.exec("CREATE INDEX IF NOT EXISTS message_by_ttl ON messages "
131 "(ttl_expires_at) WHERE ttl_expires_at IS NOT NULL"));
132
133 TRY_STATUS(add_media_indices(0, MESSAGES_DB_INDEX_COUNT));
134
135 TRY_STATUS(add_fts());
136
137 TRY_STATUS(add_call_index());
138
139 TRY_STATUS(add_notification_id_index());
140
141 TRY_STATUS(add_scheduled_messages_table());
142
143 version = current_db_version();
144 }
145 if (version < static_cast<int32>(DbVersion::MessagesDbMediaIndex)) {
146 TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN index_mask INT4"));
147 TRY_STATUS(add_media_indices(0, MESSAGES_DB_INDEX_COUNT_OLD));
148 }
149 if (version < static_cast<int32>(DbVersion::MessagesDb30MediaIndex)) {
150 TRY_STATUS(add_media_indices(MESSAGES_DB_INDEX_COUNT_OLD, MESSAGES_DB_INDEX_COUNT));
151 }
152 if (version < static_cast<int32>(DbVersion::MessagesDbFts)) {
153 TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN search_id INT8"));
154 TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN text STRING"));
155 TRY_STATUS(add_fts());
156 }
157 if (version < static_cast<int32>(DbVersion::MessagesCallIndex)) {
158 TRY_STATUS(add_call_index());
159 }
160 if (version < static_cast<int32>(DbVersion::AddNotificationsSupport)) {
161 TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN notification_id INT4"));
162 TRY_STATUS(add_notification_id_index());
163 }
164 if (version < static_cast<int32>(DbVersion::AddScheduledMessages)) {
165 TRY_STATUS(add_scheduled_messages_table());
166 }
167 if (version < static_cast<int32>(DbVersion::AddMessageThreadSupport)) {
168 TRY_STATUS(db.exec("ALTER TABLE messages ADD COLUMN top_thread_message_id INT8"));
169 }
170 return Status::OK();
171 }
172
173 // NB: must happen inside a transaction
drop_messages_db(SqliteDb & db,int32 version)174 Status drop_messages_db(SqliteDb &db, int32 version) {
175 LOG(WARNING) << "Drop message database " << tag("version", version)
176 << tag("current_db_version", current_db_version());
177 return db.exec("DROP TABLE IF EXISTS messages");
178 }
179
180 class MessagesDbImpl final : public MessagesDbSyncInterface {
181 public:
MessagesDbImpl(SqliteDb db)182 explicit MessagesDbImpl(SqliteDb db) : db_(std::move(db)) {
183 init().ensure();
184 }
185
init()186 Status init() {
187 TRY_RESULT_ASSIGN(
188 add_message_stmt_,
189 db_.get_statement("INSERT OR REPLACE INTO messages VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)"));
190 TRY_RESULT_ASSIGN(delete_message_stmt_,
191 db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
192 TRY_RESULT_ASSIGN(delete_all_dialog_messages_stmt_,
193 db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND message_id <= ?2"));
194 TRY_RESULT_ASSIGN(delete_dialog_messages_by_sender_stmt_,
195 db_.get_statement("DELETE FROM messages WHERE dialog_id = ?1 AND sender_user_id == ?2"));
196
197 TRY_RESULT_ASSIGN(
198 get_message_stmt_,
199 db_.get_statement("SELECT message_id, data FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
200 TRY_RESULT_ASSIGN(
201 get_message_by_random_id_stmt_,
202 db_.get_statement("SELECT message_id, data FROM messages WHERE dialog_id = ?1 AND random_id = ?2"));
203 TRY_RESULT_ASSIGN(
204 get_message_by_unique_message_id_stmt_,
205 db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id = ?1"));
206
207 TRY_RESULT_ASSIGN(
208 get_expiring_messages_stmt_,
209 db_.get_statement(
210 "SELECT dialog_id, message_id, data FROM messages WHERE ?1 < ttl_expires_at AND ttl_expires_at <= ?2"));
211 TRY_RESULT_ASSIGN(get_expiring_messages_helper_stmt_,
212 db_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM "
213 "messages WHERE ?1 < ttl_expires_at LIMIT ?2) AS T"));
214
215 TRY_RESULT_ASSIGN(get_messages_stmt_.asc_stmt_,
216 db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id > "
217 "?2 ORDER BY message_id ASC LIMIT ?3"));
218 TRY_RESULT_ASSIGN(get_messages_stmt_.desc_stmt_,
219 db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id < "
220 "?2 ORDER BY message_id DESC LIMIT ?3"));
221 TRY_RESULT_ASSIGN(get_scheduled_messages_stmt_,
222 db_.get_statement("SELECT data, message_id FROM scheduled_messages WHERE dialog_id = ?1 AND "
223 "message_id < ?2 ORDER BY message_id DESC LIMIT ?3"));
224 TRY_RESULT_ASSIGN(get_messages_from_notification_id_stmt_,
225 db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND "
226 "notification_id < ?2 ORDER BY notification_id DESC LIMIT ?3"));
227 TRY_RESULT_ASSIGN(get_messages_fts_stmt_,
228 db_.get_statement("SELECT dialog_id, message_id, data, search_id FROM messages WHERE search_id "
229 "IN (SELECT rowid FROM messages_fts WHERE messages_fts MATCH ?1 AND rowid < ?2 "
230 "ORDER BY rowid DESC LIMIT ?3) ORDER BY search_id DESC"));
231
232 for (int32 i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) {
233 TRY_RESULT_ASSIGN(
234 get_message_ids_stmts_[i],
235 db_.get_statement(
236 PSLICE() << "SELECT message_id FROM messages WHERE dialog_id = ?1 AND message_id < ?2 AND (index_mask & "
237 << (1 << i) << ") != 0 ORDER BY message_id DESC LIMIT 1000000"));
238
239 TRY_RESULT_ASSIGN(
240 get_messages_from_index_stmts_[i].desc_stmt_,
241 db_.get_statement(
242 PSLICE()
243 << "SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id < ?2 AND (index_mask & "
244 << (1 << i) << ") != 0 ORDER BY message_id DESC LIMIT ?3"));
245
246 TRY_RESULT_ASSIGN(
247 get_messages_from_index_stmts_[i].asc_stmt_,
248 db_.get_statement(
249 PSLICE()
250 << "SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id > ?2 AND (index_mask & "
251 << (1 << i) << ") != 0 ORDER BY message_id ASC LIMIT ?3"));
252
253 // LOG(ERROR) << get_messages_from_index_stmts_[i].desc_stmt_.explain().ok();
254 // LOG(ERROR) << get_messages_from_index_stmts_[i].asc_stmt_.explain().ok();
255 }
256
257 for (int i = static_cast<int>(MessageSearchFilter::Call) - 1, pos = 0;
258 i < static_cast<int>(MessageSearchFilter::MissedCall); i++, pos++) {
259 TRY_RESULT_ASSIGN(
260 get_calls_stmts_[pos],
261 db_.get_statement(
262 PSLICE()
263 << "SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id < ?1 AND (index_mask & "
264 << (1 << i) << ") != 0 ORDER BY unique_message_id DESC LIMIT ?2"));
265 }
266
267 TRY_RESULT_ASSIGN(add_scheduled_message_stmt_,
268 db_.get_statement("INSERT OR REPLACE INTO scheduled_messages VALUES(?1, ?2, ?3, ?4)"));
269 TRY_RESULT_ASSIGN(
270 get_scheduled_message_stmt_,
271 db_.get_statement("SELECT message_id, data FROM scheduled_messages WHERE dialog_id = ?1 AND message_id = ?2"));
272 TRY_RESULT_ASSIGN(
273 get_scheduled_server_message_stmt_,
274 db_.get_statement(
275 "SELECT message_id, data FROM scheduled_messages WHERE dialog_id = ?1 AND server_message_id = ?2"));
276 TRY_RESULT_ASSIGN(delete_scheduled_message_stmt_,
277 db_.get_statement("DELETE FROM scheduled_messages WHERE dialog_id = ?1 AND message_id = ?2"));
278 TRY_RESULT_ASSIGN(
279 delete_scheduled_server_message_stmt_,
280 db_.get_statement("DELETE FROM scheduled_messages WHERE dialog_id = ?1 AND server_message_id = ?2"));
281
282 // LOG(ERROR) << get_message_stmt_.explain().ok();
283 // LOG(ERROR) << get_messages_from_notification_id_stmt.explain().ok();
284 // LOG(ERROR) << get_message_by_random_id_stmt_.explain().ok();
285 // LOG(ERROR) << get_message_by_unique_message_id_stmt_.explain().ok();
286
287 // LOG(ERROR) << get_expiring_messages_stmt_.explain().ok();
288 // LOG(ERROR) << get_expiring_messages_helper_stmt_.explain().ok();
289
290 // LOG(FATAL) << "EXPLAINED";
291
292 return Status::OK();
293 }
294
add_message(FullMessageId full_message_id,ServerMessageId unique_message_id,DialogId sender_dialog_id,int64 random_id,int32 ttl_expires_at,int32 index_mask,int64 search_id,string text,NotificationId notification_id,MessageId top_thread_message_id,BufferSlice data)295 Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
296 int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
297 NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data) final {
298 LOG(INFO) << "Add " << full_message_id << " to database";
299 auto dialog_id = full_message_id.get_dialog_id();
300 auto message_id = full_message_id.get_message_id();
301 LOG_CHECK(dialog_id.is_valid()) << dialog_id << ' ' << message_id << ' ' << full_message_id;
302 CHECK(message_id.is_valid());
303 SCOPE_EXIT {
304 add_message_stmt_.reset();
305 };
306 add_message_stmt_.bind_int64(1, dialog_id.get()).ensure();
307 add_message_stmt_.bind_int64(2, message_id.get()).ensure();
308
309 if (unique_message_id.is_valid()) {
310 add_message_stmt_.bind_int32(3, unique_message_id.get()).ensure();
311 } else {
312 add_message_stmt_.bind_null(3).ensure();
313 }
314
315 if (sender_dialog_id.is_valid()) {
316 add_message_stmt_.bind_int64(4, sender_dialog_id.get()).ensure();
317 } else {
318 add_message_stmt_.bind_null(4).ensure();
319 }
320
321 if (random_id != 0) {
322 add_message_stmt_.bind_int64(5, random_id).ensure();
323 } else {
324 add_message_stmt_.bind_null(5).ensure();
325 }
326
327 add_message_stmt_.bind_blob(6, data.as_slice()).ensure();
328
329 if (ttl_expires_at != 0) {
330 add_message_stmt_.bind_int32(7, ttl_expires_at).ensure();
331 } else {
332 add_message_stmt_.bind_null(7).ensure();
333 }
334
335 if (index_mask != 0) {
336 add_message_stmt_.bind_int32(8, index_mask).ensure();
337 } else {
338 add_message_stmt_.bind_null(8).ensure();
339 }
340 if (search_id != 0) {
341 // add dialog_id to text
342 text += PSTRING() << " \a" << dialog_id.get();
343 if (index_mask != 0) {
344 for (int i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) {
345 if ((index_mask & (1 << i))) {
346 text += PSTRING() << " \a\a" << i;
347 }
348 }
349 }
350 add_message_stmt_.bind_int64(9, search_id).ensure();
351 } else {
352 text = "";
353 add_message_stmt_.bind_null(9).ensure();
354 }
355 if (!text.empty()) {
356 add_message_stmt_.bind_string(10, text).ensure();
357 } else {
358 add_message_stmt_.bind_null(10).ensure();
359 }
360 if (notification_id.is_valid()) {
361 add_message_stmt_.bind_int32(11, notification_id.get()).ensure();
362 } else {
363 add_message_stmt_.bind_null(11).ensure();
364 }
365 if (top_thread_message_id.is_valid()) {
366 add_message_stmt_.bind_int64(12, top_thread_message_id.get()).ensure();
367 } else {
368 add_message_stmt_.bind_null(12).ensure();
369 }
370
371 add_message_stmt_.step().ensure();
372
373 return Status::OK();
374 }
375
add_scheduled_message(FullMessageId full_message_id,BufferSlice data)376 Status add_scheduled_message(FullMessageId full_message_id, BufferSlice data) final {
377 LOG(INFO) << "Add " << full_message_id << " to database";
378 auto dialog_id = full_message_id.get_dialog_id();
379 auto message_id = full_message_id.get_message_id();
380 CHECK(dialog_id.is_valid());
381 CHECK(message_id.is_valid_scheduled());
382 SCOPE_EXIT {
383 add_scheduled_message_stmt_.reset();
384 };
385 add_scheduled_message_stmt_.bind_int64(1, dialog_id.get()).ensure();
386 add_scheduled_message_stmt_.bind_int64(2, message_id.get()).ensure();
387
388 if (message_id.is_scheduled_server()) {
389 add_scheduled_message_stmt_.bind_int32(3, message_id.get_scheduled_server_message_id().get()).ensure();
390 } else {
391 add_scheduled_message_stmt_.bind_null(3).ensure();
392 }
393
394 add_scheduled_message_stmt_.bind_blob(4, data.as_slice()).ensure();
395
396 add_scheduled_message_stmt_.step().ensure();
397
398 return Status::OK();
399 }
400
delete_message(FullMessageId full_message_id)401 Status delete_message(FullMessageId full_message_id) final {
402 LOG(INFO) << "Delete " << full_message_id << " from database";
403 auto dialog_id = full_message_id.get_dialog_id();
404 auto message_id = full_message_id.get_message_id();
405 CHECK(dialog_id.is_valid());
406 CHECK(message_id.is_valid() || message_id.is_valid_scheduled());
407 bool is_scheduled = message_id.is_scheduled();
408 bool is_scheduled_server = is_scheduled && message_id.is_scheduled_server();
409 auto &stmt = is_scheduled
410 ? (is_scheduled_server ? delete_scheduled_server_message_stmt_ : delete_scheduled_message_stmt_)
411 : delete_message_stmt_;
412 SCOPE_EXIT {
413 stmt.reset();
414 };
415 stmt.bind_int64(1, dialog_id.get()).ensure();
416 if (is_scheduled_server) {
417 stmt.bind_int32(2, message_id.get_scheduled_server_message_id().get()).ensure();
418 } else {
419 stmt.bind_int64(2, message_id.get()).ensure();
420 }
421 stmt.step().ensure();
422 return Status::OK();
423 }
424
delete_all_dialog_messages(DialogId dialog_id,MessageId from_message_id)425 Status delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) final {
426 LOG(INFO) << "Delete all messages in " << dialog_id << " up to " << from_message_id << " from database";
427 CHECK(dialog_id.is_valid());
428 CHECK(from_message_id.is_valid());
429 SCOPE_EXIT {
430 delete_all_dialog_messages_stmt_.reset();
431 };
432 delete_all_dialog_messages_stmt_.bind_int64(1, dialog_id.get()).ensure();
433 delete_all_dialog_messages_stmt_.bind_int64(2, from_message_id.get()).ensure();
434 auto status = delete_all_dialog_messages_stmt_.step();
435 if (status.is_error()) {
436 LOG(ERROR) << status;
437 }
438 return status;
439 }
440
delete_dialog_messages_by_sender(DialogId dialog_id,DialogId sender_dialog_id)441 Status delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) final {
442 LOG(INFO) << "Delete all messages in " << dialog_id << " sent by " << sender_dialog_id << " from database";
443 CHECK(dialog_id.is_valid());
444 CHECK(sender_dialog_id.is_valid());
445 SCOPE_EXIT {
446 delete_dialog_messages_by_sender_stmt_.reset();
447 };
448 delete_dialog_messages_by_sender_stmt_.bind_int64(1, dialog_id.get()).ensure();
449 delete_dialog_messages_by_sender_stmt_.bind_int64(2, sender_dialog_id.get()).ensure();
450 delete_dialog_messages_by_sender_stmt_.step().ensure();
451 return Status::OK();
452 }
453
get_message(FullMessageId full_message_id)454 Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) final {
455 auto dialog_id = full_message_id.get_dialog_id();
456 auto message_id = full_message_id.get_message_id();
457 CHECK(dialog_id.is_valid());
458 CHECK(message_id.is_valid() || message_id.is_valid_scheduled());
459 bool is_scheduled = message_id.is_scheduled();
460 bool is_scheduled_server = is_scheduled && message_id.is_scheduled_server();
461 auto &stmt = is_scheduled ? (is_scheduled_server ? get_scheduled_server_message_stmt_ : get_scheduled_message_stmt_)
462 : get_message_stmt_;
463 SCOPE_EXIT {
464 stmt.reset();
465 };
466
467 stmt.bind_int64(1, dialog_id.get()).ensure();
468 if (is_scheduled_server) {
469 stmt.bind_int32(2, message_id.get_scheduled_server_message_id().get()).ensure();
470 } else {
471 stmt.bind_int64(2, message_id.get()).ensure();
472 }
473 stmt.step().ensure();
474 if (!stmt.has_row()) {
475 return Status::Error("Not found");
476 }
477 MessageId received_message_id(stmt.view_int64(0));
478 Slice data = stmt.view_blob(1);
479 if (is_scheduled_server) {
480 CHECK(received_message_id.is_scheduled());
481 CHECK(received_message_id.is_scheduled_server());
482 CHECK(received_message_id.get_scheduled_server_message_id() == message_id.get_scheduled_server_message_id());
483 } else {
484 LOG_CHECK(received_message_id == message_id)
485 << received_message_id << ' ' << message_id << ' ' << get_message_info(received_message_id, data, true).first;
486 }
487 return MessagesDbDialogMessage{received_message_id, BufferSlice(data)};
488 }
489
get_message_by_unique_message_id(ServerMessageId unique_message_id)490 Result<MessagesDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) final {
491 if (!unique_message_id.is_valid()) {
492 return Status::Error("Invalid unique_message_id");
493 }
494 SCOPE_EXIT {
495 get_message_by_unique_message_id_stmt_.reset();
496 };
497 get_message_by_unique_message_id_stmt_.bind_int32(1, unique_message_id.get()).ensure();
498 get_message_by_unique_message_id_stmt_.step().ensure();
499 if (!get_message_by_unique_message_id_stmt_.has_row()) {
500 return Status::Error("Not found");
501 }
502 DialogId dialog_id(get_message_by_unique_message_id_stmt_.view_int64(0));
503 MessageId message_id(get_message_by_unique_message_id_stmt_.view_int64(1));
504 return MessagesDbMessage{dialog_id, message_id, BufferSlice(get_message_by_unique_message_id_stmt_.view_blob(2))};
505 }
506
get_message_by_random_id(DialogId dialog_id,int64 random_id)507 Result<MessagesDbDialogMessage> get_message_by_random_id(DialogId dialog_id, int64 random_id) final {
508 SCOPE_EXIT {
509 get_message_by_random_id_stmt_.reset();
510 };
511 get_message_by_random_id_stmt_.bind_int64(1, dialog_id.get()).ensure();
512 get_message_by_random_id_stmt_.bind_int64(2, random_id).ensure();
513 get_message_by_random_id_stmt_.step().ensure();
514 if (!get_message_by_random_id_stmt_.has_row()) {
515 return Status::Error("Not found");
516 }
517 MessageId message_id(get_message_by_random_id_stmt_.view_int64(0));
518 return MessagesDbDialogMessage{message_id, BufferSlice(get_message_by_random_id_stmt_.view_blob(1))};
519 }
520
get_dialog_message_by_date(DialogId dialog_id,MessageId first_message_id,MessageId last_message_id,int32 date)521 Result<MessagesDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
522 MessageId last_message_id, int32 date) final {
523 int64 left_message_id = first_message_id.get();
524 int64 right_message_id = last_message_id.get();
525 LOG_CHECK(left_message_id <= right_message_id) << first_message_id << " " << last_message_id;
526 TRY_RESULT(first_messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 1));
527 if (!first_messages.empty()) {
528 MessageId real_first_message_id;
529 int32 real_first_message_date;
530 std::tie(real_first_message_id, real_first_message_date) = get_message_info(first_messages[0]);
531 if (real_first_message_date <= date) {
532 // we definitely have at least one suitable message, let's do a binary search
533 left_message_id = real_first_message_id.get();
534
535 MessageId prev_found_message_id;
536 while (left_message_id <= right_message_id) {
537 auto middle_message_id = left_message_id + ((right_message_id - left_message_id) >> 1);
538 TRY_RESULT(messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, middle_message_id, 1));
539
540 MessageId message_id;
541 int32 message_date = std::numeric_limits<int32>::max();
542 if (!messages.empty()) {
543 std::tie(message_id, message_date) = get_message_info(messages[0]);
544 }
545 if (message_date <= date) {
546 left_message_id = message_id.get();
547 } else {
548 right_message_id = middle_message_id - 1;
549 }
550
551 if (prev_found_message_id == message_id) {
552 // we may be very close to the result, let's check
553 TRY_RESULT(left_messages,
554 get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 2));
555 CHECK(!left_messages.empty());
556 if (left_messages.size() == 1) {
557 // only one message has left, result is found
558 break;
559 }
560
561 MessageId next_message_id;
562 int32 next_message_date;
563 std::tie(next_message_id, next_message_date) = get_message_info(left_messages[1]);
564 if (next_message_date <= date) {
565 // next message has lesser date, adjusting left message
566 left_message_id = next_message_id.get();
567 } else {
568 // next message has bigger date, result is found
569 break;
570 }
571 }
572
573 prev_found_message_id = message_id;
574 }
575
576 // left_message_id is always an id of suitable message, let's return it
577 return get_message({dialog_id, MessageId(left_message_id)});
578 }
579 }
580
581 return Status::Error("Not found");
582 }
583
get_expiring_messages(int32 expires_from,int32 expires_till,int32 limit)584 Result<std::pair<vector<MessagesDbMessage>, int32>> get_expiring_messages(int32 expires_from, int32 expires_till,
585 int32 limit) final {
586 SCOPE_EXIT {
587 get_expiring_messages_stmt_.reset();
588 get_expiring_messages_helper_stmt_.reset();
589 };
590
591 vector<MessagesDbMessage> messages;
592 // load messages
593 if (expires_from <= expires_till) {
594 get_expiring_messages_stmt_.bind_int32(1, expires_from).ensure();
595 get_expiring_messages_stmt_.bind_int32(2, expires_till).ensure();
596 get_expiring_messages_stmt_.step().ensure();
597
598 while (get_expiring_messages_stmt_.has_row()) {
599 DialogId dialog_id(get_expiring_messages_stmt_.view_int64(0));
600 MessageId message_id(get_expiring_messages_stmt_.view_int64(1));
601 BufferSlice data(get_expiring_messages_stmt_.view_blob(2));
602 messages.push_back(MessagesDbMessage{dialog_id, message_id, std::move(data)});
603 get_expiring_messages_stmt_.step().ensure();
604 }
605 }
606
607 // calc next expires_till
608 get_expiring_messages_helper_stmt_.bind_int32(1, expires_till).ensure();
609 get_expiring_messages_helper_stmt_.bind_int32(2, limit).ensure();
610 get_expiring_messages_helper_stmt_.step().ensure();
611 CHECK(get_expiring_messages_helper_stmt_.has_row());
612 int32 count = get_expiring_messages_helper_stmt_.view_int32(1);
613 int32 next_expires_till = -1;
614 if (count != 0) {
615 next_expires_till = get_expiring_messages_helper_stmt_.view_int32(0);
616 }
617 return std::make_pair(std::move(messages), next_expires_till);
618 }
619
get_dialog_message_calendar(MessagesDbDialogCalendarQuery query)620 Result<MessagesDbCalendar> get_dialog_message_calendar(MessagesDbDialogCalendarQuery query) final {
621 auto &stmt = get_messages_from_index_stmts_[message_search_filter_index(query.filter)].desc_stmt_;
622 SCOPE_EXIT {
623 stmt.reset();
624 };
625 int32 limit = 1000;
626 stmt.bind_int64(1, query.dialog_id.get()).ensure();
627 stmt.bind_int64(2, query.from_message_id.get()).ensure();
628 stmt.bind_int32(3, limit).ensure();
629
630 vector<MessagesDbDialogMessage> messages;
631 vector<int32> total_counts;
632 stmt.step().ensure();
633 int32 current_day = std::numeric_limits<int32>::max();
634 while (stmt.has_row()) {
635 auto data_slice = stmt.view_blob(0);
636 MessageId message_id(stmt.view_int64(1));
637 auto info = get_message_info(message_id, data_slice, false);
638 auto day = (query.tz_offset + info.second) / 86400;
639 if (day >= current_day) {
640 CHECK(!total_counts.empty());
641 total_counts.back()++;
642 } else {
643 current_day = day;
644 messages.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
645 total_counts.push_back(1);
646 }
647 stmt.step().ensure();
648 }
649 return MessagesDbCalendar{std::move(messages), std::move(total_counts)};
650 }
651
get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query)652 Result<MessagesDbMessagePositions> get_dialog_sparse_message_positions(
653 MessagesDbGetDialogSparseMessagePositionsQuery query) final {
654 auto &stmt = get_message_ids_stmts_[message_search_filter_index(query.filter)];
655 SCOPE_EXIT {
656 stmt.reset();
657 };
658 stmt.bind_int64(1, query.dialog_id.get()).ensure();
659 stmt.bind_int64(2, query.from_message_id.get()).ensure();
660
661 vector<MessageId> message_ids;
662 stmt.step().ensure();
663 while (stmt.has_row()) {
664 message_ids.push_back(MessageId(stmt.view_int64(0)));
665 stmt.step().ensure();
666 }
667
668 int32 limit = min(query.limit, static_cast<int32>(message_ids.size()));
669 double delta = static_cast<double>(message_ids.size()) / limit;
670 MessagesDbMessagePositions positions;
671 positions.total_count = static_cast<int32>(message_ids.size());
672 positions.positions.reserve(limit);
673 for (int32 i = 0; i < limit; i++) {
674 auto position = static_cast<int32>((i + 0.5) * delta);
675 auto message_id = message_ids[position];
676 TRY_RESULT(message, get_message({query.dialog_id, message_id}));
677 auto date = get_message_info(message).second;
678 positions.positions.push_back(MessagesDbMessagePosition{position, date, message_id});
679 }
680 return positions;
681 }
682
get_messages(MessagesDbMessagesQuery query)683 Result<vector<MessagesDbDialogMessage>> get_messages(MessagesDbMessagesQuery query) final {
684 if (query.filter != MessageSearchFilter::Empty) {
685 return get_messages_from_index(query.dialog_id, query.from_message_id, query.filter, query.offset, query.limit);
686 }
687 return get_messages_impl(get_messages_stmt_, query.dialog_id, query.from_message_id, query.offset, query.limit);
688 }
689
get_scheduled_messages(DialogId dialog_id,int32 limit)690 Result<vector<MessagesDbDialogMessage>> get_scheduled_messages(DialogId dialog_id, int32 limit) final {
691 return get_messages_inner(get_scheduled_messages_stmt_, dialog_id, std::numeric_limits<int64>::max(), limit);
692 }
693
get_messages_from_notification_id(DialogId dialog_id,NotificationId from_notification_id,int32 limit)694 Result<vector<MessagesDbDialogMessage>> get_messages_from_notification_id(DialogId dialog_id,
695 NotificationId from_notification_id,
696 int32 limit) final {
697 auto &stmt = get_messages_from_notification_id_stmt_;
698 SCOPE_EXIT {
699 stmt.reset();
700 };
701 stmt.bind_int64(1, dialog_id.get()).ensure();
702 stmt.bind_int32(2, from_notification_id.get()).ensure();
703 stmt.bind_int32(3, limit).ensure();
704
705 vector<MessagesDbDialogMessage> result;
706 stmt.step().ensure();
707 while (stmt.has_row()) {
708 auto data_slice = stmt.view_blob(0);
709 MessageId message_id(stmt.view_int64(1));
710 result.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
711 LOG(INFO) << "Load " << message_id << " in " << dialog_id << " from database";
712 stmt.step().ensure();
713 }
714 return std::move(result);
715 }
716
prepare_query(Slice query)717 static string prepare_query(Slice query) {
718 auto is_word_character = [](uint32 a) {
719 switch (get_unicode_simple_category(a)) {
720 case UnicodeSimpleCategory::Letter:
721 case UnicodeSimpleCategory::DecimalNumber:
722 case UnicodeSimpleCategory::Number:
723 return true;
724 default:
725 return a == '_';
726 }
727 };
728
729 const size_t MAX_QUERY_SIZE = 1024;
730 query = utf8_truncate(query, MAX_QUERY_SIZE);
731 auto buf = StackAllocator::alloc(query.size() * 4 + 100);
732 StringBuilder sb(buf.as_slice());
733 bool in_word{false};
734
735 for (auto ptr = query.ubegin(), end = query.uend(); ptr < end;) {
736 uint32 code;
737 auto code_ptr = ptr;
738 ptr = next_utf8_unsafe(ptr, &code, "prepare_query");
739 if (is_word_character(code)) {
740 if (!in_word) {
741 in_word = true;
742 sb << "\"";
743 }
744 sb << Slice(code_ptr, ptr);
745 } else {
746 if (in_word) {
747 in_word = false;
748 sb << "\" ";
749 }
750 }
751 }
752 if (in_word) {
753 sb << "\" ";
754 }
755
756 if (sb.is_error()) {
757 LOG(ERROR) << "StringBuilder buffer overflow";
758 return "";
759 }
760
761 return sb.as_cslice().str();
762 }
763
get_messages_fts(MessagesDbFtsQuery query)764 Result<MessagesDbFtsResult> get_messages_fts(MessagesDbFtsQuery query) final {
765 SCOPE_EXIT {
766 get_messages_fts_stmt_.reset();
767 };
768
769 LOG(INFO) << tag("query", query.query) << query.dialog_id << tag("filter", query.filter)
770 << tag("from_search_id", query.from_search_id) << tag("limit", query.limit);
771 string words = prepare_query(query.query);
772 LOG(INFO) << tag("from", query.query) << tag("to", words);
773
774 // dialog_id kludge
775 if (query.dialog_id.is_valid()) {
776 words += PSTRING() << " \"\a" << query.dialog_id.get() << "\"";
777 }
778
779 // index_mask kludge
780 if (query.filter != MessageSearchFilter::Empty) {
781 words += PSTRING() << " \"\a\a" << message_search_filter_index(query.filter) << "\"";
782 }
783
784 auto &stmt = get_messages_fts_stmt_;
785 stmt.bind_string(1, words).ensure();
786 if (query.from_search_id == 0) {
787 query.from_search_id = std::numeric_limits<int64>::max();
788 }
789 stmt.bind_int64(2, query.from_search_id).ensure();
790 stmt.bind_int32(3, query.limit).ensure();
791 MessagesDbFtsResult result;
792 auto status = stmt.step();
793 if (status.is_error()) {
794 LOG(ERROR) << status;
795 return std::move(result);
796 }
797 while (stmt.has_row()) {
798 DialogId dialog_id(stmt.view_int64(0));
799 MessageId message_id(stmt.view_int64(1));
800 auto data_slice = stmt.view_blob(2);
801 auto search_id = stmt.view_int64(3);
802 result.next_search_id = search_id;
803 result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
804 stmt.step().ensure();
805 }
806 return std::move(result);
807 }
808
get_messages_from_index(DialogId dialog_id,MessageId from_message_id,MessageSearchFilter filter,int32 offset,int32 limit)809 Result<vector<MessagesDbDialogMessage>> get_messages_from_index(DialogId dialog_id, MessageId from_message_id,
810 MessageSearchFilter filter, int32 offset,
811 int32 limit) {
812 auto &stmt = get_messages_from_index_stmts_[message_search_filter_index(filter)];
813 return get_messages_impl(stmt, dialog_id, from_message_id, offset, limit);
814 }
815
get_calls(MessagesDbCallsQuery query)816 Result<MessagesDbCallsResult> get_calls(MessagesDbCallsQuery query) final {
817 int32 pos;
818 if (query.filter == MessageSearchFilter::Call) {
819 pos = 0;
820 } else if (query.filter == MessageSearchFilter::MissedCall) {
821 pos = 1;
822 } else {
823 return Status::Error(PSLICE() << "Filter is not Call or MissedCall: " << query.filter);
824 }
825
826 auto &stmt = get_calls_stmts_[pos];
827 SCOPE_EXIT {
828 stmt.reset();
829 };
830
831 stmt.bind_int32(1, query.from_unique_message_id).ensure();
832 stmt.bind_int32(2, query.limit).ensure();
833
834 MessagesDbCallsResult result;
835 stmt.step().ensure();
836 while (stmt.has_row()) {
837 DialogId dialog_id(stmt.view_int64(0));
838 MessageId message_id(stmt.view_int64(1));
839 auto data_slice = stmt.view_blob(2);
840 result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
841 stmt.step().ensure();
842 }
843 return std::move(result);
844 }
845
begin_write_transaction()846 Status begin_write_transaction() final {
847 return db_.begin_write_transaction();
848 }
commit_transaction()849 Status commit_transaction() final {
850 return db_.commit_transaction();
851 }
852
private:
// Database handle; write transactions are begun and committed through it
SqliteDb db_;

// Prepared statements. Each query method binds its parameters, steps through the rows
// and resets the statement before returning.
SqliteStatement add_message_stmt_;

SqliteStatement delete_message_stmt_;
SqliteStatement delete_all_dialog_messages_stmt_;
SqliteStatement delete_dialog_messages_by_sender_stmt_;

SqliteStatement get_message_stmt_;
SqliteStatement get_message_by_random_id_stmt_;
SqliteStatement get_message_by_unique_message_id_stmt_;
SqliteStatement get_expiring_messages_stmt_;
SqliteStatement get_expiring_messages_helper_stmt_;

// A pair of statements used together by get_messages_impl:
// desc_stmt_ loads the part of the result starting at the requested message identifier,
// asc_stmt_ loads the other part, which is reversed and placed in front of the first one
struct GetMessagesStmt {
  SqliteStatement asc_stmt_;
  SqliteStatement desc_stmt_;
};
GetMessagesStmt get_messages_stmt_;
// executed through get_messages_inner with the maximum int64 value as the starting identifier
SqliteStatement get_scheduled_messages_stmt_;
// parameters: 1 - dialog identifier, 2 - notification identifier, 3 - limit
SqliteStatement get_messages_from_notification_id_stmt_;

std::array<SqliteStatement, MESSAGES_DB_INDEX_COUNT> get_message_ids_stmts_;
// indexed by message_search_filter_index(filter)
std::array<GetMessagesStmt, MESSAGES_DB_INDEX_COUNT> get_messages_from_index_stmts_;
// element 0 is used for MessageSearchFilter::Call, element 1 for MessageSearchFilter::MissedCall
std::array<SqliteStatement, 2> get_calls_stmts_;

// parameters: 1 - prepared query text, 2 - search identifier to start from, 3 - limit
SqliteStatement get_messages_fts_stmt_;

SqliteStatement add_scheduled_message_stmt_;
SqliteStatement get_scheduled_message_stmt_;
SqliteStatement get_scheduled_server_message_stmt_;
SqliteStatement delete_scheduled_message_stmt_;
SqliteStatement delete_scheduled_server_message_stmt_;
887
get_messages_impl(GetMessagesStmt & stmt,DialogId dialog_id,MessageId from_message_id,int32 offset,int32 limit)888 static Result<vector<MessagesDbDialogMessage>> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id,
889 MessageId from_message_id, int32 offset,
890 int32 limit) {
891 LOG_CHECK(dialog_id.is_valid()) << dialog_id;
892 CHECK(from_message_id.is_valid());
893
894 LOG(INFO) << "Loading messages in " << dialog_id << " from " << from_message_id << " with offset = " << offset
895 << " and limit = " << limit;
896
897 auto message_id = from_message_id.get();
898
899 if (message_id >= MessageId::max().get()) {
900 message_id--;
901 }
902
903 auto left_message_id = message_id;
904 auto left_cnt = limit + offset;
905
906 auto right_message_id = message_id - 1;
907 auto right_cnt = -offset;
908
909 vector<MessagesDbDialogMessage> left;
910 vector<MessagesDbDialogMessage> right;
911
912 if (left_cnt != 0) {
913 if (right_cnt == 1 && false) {
914 left_message_id++;
915 left_cnt++;
916 }
917
918 TRY_RESULT_ASSIGN(left, get_messages_inner(stmt.desc_stmt_, dialog_id, left_message_id, left_cnt));
919
920 if (right_cnt == 1 && !left.empty() && false /*get_message_id(left[0].as_slice()) == message_id*/) {
921 right_cnt = 0;
922 }
923 }
924 if (right_cnt != 0) {
925 TRY_RESULT_ASSIGN(right, get_messages_inner(stmt.asc_stmt_, dialog_id, right_message_id, right_cnt));
926 std::reverse(right.begin(), right.end());
927 }
928 if (left.empty()) {
929 return std::move(right);
930 }
931 if (right.empty()) {
932 return std::move(left);
933 }
934
935 right.reserve(right.size() + left.size());
936 std::move(left.begin(), left.end(), std::back_inserter(right));
937
938 return std::move(right);
939 }
940
get_messages_inner(SqliteStatement & stmt,DialogId dialog_id,int64 from_message_id,int32 limit)941 static Result<vector<MessagesDbDialogMessage>> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id,
942 int64 from_message_id, int32 limit) {
943 SCOPE_EXIT {
944 stmt.reset();
945 };
946 stmt.bind_int64(1, dialog_id.get()).ensure();
947 stmt.bind_int64(2, from_message_id).ensure();
948 stmt.bind_int32(3, limit).ensure();
949
950 LOG(INFO) << "Begin to load " << limit << " messages in " << dialog_id << " from " << MessageId(from_message_id)
951 << " from database";
952 vector<MessagesDbDialogMessage> result;
953 stmt.step().ensure();
954 while (stmt.has_row()) {
955 auto data_slice = stmt.view_blob(0);
956 MessageId message_id(stmt.view_int64(1));
957 result.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
958 LOG(INFO) << "Loaded " << message_id << " in " << dialog_id << " from database";
959 stmt.step().ensure();
960 }
961 return std::move(result);
962 }
963
get_message_info(const MessagesDbDialogMessage & message,bool from_data=false)964 static std::pair<MessageId, int32> get_message_info(const MessagesDbDialogMessage &message, bool from_data = false) {
965 return get_message_info(message.message_id, message.data.as_slice(), from_data);
966 }
967
get_message_info(MessageId message_id,Slice data,bool from_data)968 static std::pair<MessageId, int32> get_message_info(MessageId message_id, Slice data, bool from_data) {
969 LogEventParser message_date_parser(data);
970 int32 flags;
971 int32 flags2 = 0;
972 int32 flags3 = 0;
973 td::parse(flags, message_date_parser);
974 if ((flags & (1 << 29)) != 0) {
975 td::parse(flags2, message_date_parser);
976 if ((flags2 & (1 << 29)) != 0) {
977 td::parse(flags3, message_date_parser);
978 }
979 }
980 bool has_sender = (flags & (1 << 10)) != 0;
981 MessageId data_message_id;
982 td::parse(data_message_id, message_date_parser);
983 UserId sender_user_id;
984 if (has_sender) {
985 td::parse(sender_user_id, message_date_parser);
986 }
987 int32 date;
988 td::parse(date, message_date_parser);
989 LOG(INFO) << "Loaded " << message_id << "(aka " << data_message_id << ") sent at " << date << " by "
990 << sender_user_id;
991 return {from_data ? data_message_id : message_id, date};
992 }
993 };
994
create_messages_db_sync(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)995 std::shared_ptr<MessagesDbSyncSafeInterface> create_messages_db_sync(
996 std::shared_ptr<SqliteConnectionSafe> sqlite_connection) {
997 class MessagesDbSyncSafe final : public MessagesDbSyncSafeInterface {
998 public:
999 explicit MessagesDbSyncSafe(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)
1000 : lsls_db_([safe_connection = std::move(sqlite_connection)] {
1001 return make_unique<MessagesDbImpl>(safe_connection->get().clone());
1002 }) {
1003 }
1004 MessagesDbSyncInterface &get() final {
1005 return *lsls_db_.get();
1006 }
1007
1008 private:
1009 LazySchedulerLocalStorage<unique_ptr<MessagesDbSyncInterface>> lsls_db_;
1010 };
1011 return std::make_shared<MessagesDbSyncSafe>(std::move(sqlite_connection));
1012 }
1013
1014 class MessagesDbAsync final : public MessagesDbAsyncInterface {
1015 public:
MessagesDbAsync(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,int32 scheduler_id)1016 MessagesDbAsync(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db, int32 scheduler_id) {
1017 impl_ = create_actor_on_scheduler<Impl>("MessagesDbActor", scheduler_id, std::move(sync_db));
1018 }
1019
add_message(FullMessageId full_message_id,ServerMessageId unique_message_id,DialogId sender_dialog_id,int64 random_id,int32 ttl_expires_at,int32 index_mask,int64 search_id,string text,NotificationId notification_id,MessageId top_thread_message_id,BufferSlice data,Promise<> promise)1020 void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
1021 int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
1022 NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data,
1023 Promise<> promise) final {
1024 send_closure_later(impl_, &Impl::add_message, full_message_id, unique_message_id, sender_dialog_id, random_id,
1025 ttl_expires_at, index_mask, search_id, std::move(text), notification_id, top_thread_message_id,
1026 std::move(data), std::move(promise));
1027 }
add_scheduled_message(FullMessageId full_message_id,BufferSlice data,Promise<> promise)1028 void add_scheduled_message(FullMessageId full_message_id, BufferSlice data, Promise<> promise) final {
1029 send_closure_later(impl_, &Impl::add_scheduled_message, full_message_id, std::move(data), std::move(promise));
1030 }
1031
delete_message(FullMessageId full_message_id,Promise<> promise)1032 void delete_message(FullMessageId full_message_id, Promise<> promise) final {
1033 send_closure_later(impl_, &Impl::delete_message, full_message_id, std::move(promise));
1034 }
delete_all_dialog_messages(DialogId dialog_id,MessageId from_message_id,Promise<> promise)1035 void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) final {
1036 send_closure_later(impl_, &Impl::delete_all_dialog_messages, dialog_id, from_message_id, std::move(promise));
1037 }
delete_dialog_messages_by_sender(DialogId dialog_id,DialogId sender_dialog_id,Promise<> promise)1038 void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) final {
1039 send_closure_later(impl_, &Impl::delete_dialog_messages_by_sender, dialog_id, sender_dialog_id, std::move(promise));
1040 }
1041
get_message(FullMessageId full_message_id,Promise<MessagesDbDialogMessage> promise)1042 void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) final {
1043 send_closure_later(impl_, &Impl::get_message, full_message_id, std::move(promise));
1044 }
get_message_by_unique_message_id(ServerMessageId unique_message_id,Promise<MessagesDbMessage> promise)1045 void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessagesDbMessage> promise) final {
1046 send_closure_later(impl_, &Impl::get_message_by_unique_message_id, unique_message_id, std::move(promise));
1047 }
get_message_by_random_id(DialogId dialog_id,int64 random_id,Promise<MessagesDbDialogMessage> promise)1048 void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessagesDbDialogMessage> promise) final {
1049 send_closure_later(impl_, &Impl::get_message_by_random_id, dialog_id, random_id, std::move(promise));
1050 }
get_dialog_message_by_date(DialogId dialog_id,MessageId first_message_id,MessageId last_message_id,int32 date,Promise<MessagesDbDialogMessage> promise)1051 void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id, int32 date,
1052 Promise<MessagesDbDialogMessage> promise) final {
1053 send_closure_later(impl_, &Impl::get_dialog_message_by_date, dialog_id, first_message_id, last_message_id, date,
1054 std::move(promise));
1055 }
1056
get_dialog_message_calendar(MessagesDbDialogCalendarQuery query,Promise<MessagesDbCalendar> promise)1057 void get_dialog_message_calendar(MessagesDbDialogCalendarQuery query, Promise<MessagesDbCalendar> promise) final {
1058 send_closure_later(impl_, &Impl::get_dialog_message_calendar, std::move(query), std::move(promise));
1059 }
1060
get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,Promise<MessagesDbMessagePositions> promise)1061 void get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,
1062 Promise<MessagesDbMessagePositions> promise) final {
1063 send_closure_later(impl_, &Impl::get_dialog_sparse_message_positions, std::move(query), std::move(promise));
1064 }
1065
get_messages(MessagesDbMessagesQuery query,Promise<vector<MessagesDbDialogMessage>> promise)1066 void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) final {
1067 send_closure_later(impl_, &Impl::get_messages, std::move(query), std::move(promise));
1068 }
get_scheduled_messages(DialogId dialog_id,int32 limit,Promise<vector<MessagesDbDialogMessage>> promise)1069 void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) final {
1070 send_closure_later(impl_, &Impl::get_scheduled_messages, dialog_id, limit, std::move(promise));
1071 }
get_messages_from_notification_id(DialogId dialog_id,NotificationId from_notification_id,int32 limit,Promise<vector<MessagesDbDialogMessage>> promise)1072 void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
1073 Promise<vector<MessagesDbDialogMessage>> promise) final {
1074 send_closure_later(impl_, &Impl::get_messages_from_notification_id, dialog_id, from_notification_id, limit,
1075 std::move(promise));
1076 }
get_calls(MessagesDbCallsQuery query,Promise<MessagesDbCallsResult> promise)1077 void get_calls(MessagesDbCallsQuery query, Promise<MessagesDbCallsResult> promise) final {
1078 send_closure_later(impl_, &Impl::get_calls, std::move(query), std::move(promise));
1079 }
get_messages_fts(MessagesDbFtsQuery query,Promise<MessagesDbFtsResult> promise)1080 void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) final {
1081 send_closure_later(impl_, &Impl::get_messages_fts, std::move(query), std::move(promise));
1082 }
get_expiring_messages(int32 expires_from,int32 expires_till,int32 limit,Promise<std::pair<vector<MessagesDbMessage>,int32>> promise)1083 void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
1084 Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) final {
1085 send_closure_later(impl_, &Impl::get_expiring_messages, expires_from, expires_till, limit, std::move(promise));
1086 }
1087
close(Promise<> promise)1088 void close(Promise<> promise) final {
1089 send_closure_later(impl_, &Impl::close, std::move(promise));
1090 }
1091
force_flush()1092 void force_flush() final {
1093 send_closure_later(impl_, &Impl::force_flush);
1094 }
1095
1096 private:
1097 class Impl final : public Actor {
1098 public:
Impl(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe)1099 explicit Impl(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
1100 }
add_message(FullMessageId full_message_id,ServerMessageId unique_message_id,DialogId sender_dialog_id,int64 random_id,int32 ttl_expires_at,int32 index_mask,int64 search_id,string text,NotificationId notification_id,MessageId top_thread_message_id,BufferSlice data,Promise<> promise)1101 void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
1102 int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
1103 NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data,
1104 Promise<> promise) {
1105 add_write_query([this, full_message_id, unique_message_id, sender_dialog_id, random_id, ttl_expires_at,
1106 index_mask, search_id, text = std::move(text), notification_id, top_thread_message_id,
1107 data = std::move(data), promise = std::move(promise)](Unit) mutable {
1108 on_write_result(std::move(promise),
1109 sync_db_->add_message(full_message_id, unique_message_id, sender_dialog_id, random_id,
1110 ttl_expires_at, index_mask, search_id, std::move(text), notification_id,
1111 top_thread_message_id, std::move(data)));
1112 });
1113 }
add_scheduled_message(FullMessageId full_message_id,BufferSlice data,Promise<> promise)1114 void add_scheduled_message(FullMessageId full_message_id, BufferSlice data, Promise<> promise) {
1115 add_write_query([this, full_message_id, promise = std::move(promise), data = std::move(data)](Unit) mutable {
1116 on_write_result(std::move(promise), sync_db_->add_scheduled_message(full_message_id, std::move(data)));
1117 });
1118 }
1119
delete_message(FullMessageId full_message_id,Promise<> promise)1120 void delete_message(FullMessageId full_message_id, Promise<> promise) {
1121 add_write_query([this, full_message_id, promise = std::move(promise)](Unit) mutable {
1122 on_write_result(std::move(promise), sync_db_->delete_message(full_message_id));
1123 });
1124 }
on_write_result(Promise<> promise,Status status)1125 void on_write_result(Promise<> promise, Status status) {
1126 // We are inside a transaction and don't know how to handle the error
1127 status.ensure();
1128 pending_write_results_.emplace_back(std::move(promise), std::move(status));
1129 }
delete_all_dialog_messages(DialogId dialog_id,MessageId from_message_id,Promise<> promise)1130 void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) {
1131 add_read_query();
1132 promise.set_result(sync_db_->delete_all_dialog_messages(dialog_id, from_message_id));
1133 }
delete_dialog_messages_by_sender(DialogId dialog_id,DialogId sender_dialog_id,Promise<> promise)1134 void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) {
1135 add_read_query();
1136 promise.set_result(sync_db_->delete_dialog_messages_by_sender(dialog_id, sender_dialog_id));
1137 }
1138
get_message(FullMessageId full_message_id,Promise<MessagesDbDialogMessage> promise)1139 void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) {
1140 add_read_query();
1141 promise.set_result(sync_db_->get_message(full_message_id));
1142 }
get_message_by_unique_message_id(ServerMessageId unique_message_id,Promise<MessagesDbMessage> promise)1143 void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessagesDbMessage> promise) {
1144 add_read_query();
1145 promise.set_result(sync_db_->get_message_by_unique_message_id(unique_message_id));
1146 }
get_message_by_random_id(DialogId dialog_id,int64 random_id,Promise<MessagesDbDialogMessage> promise)1147 void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessagesDbDialogMessage> promise) {
1148 add_read_query();
1149 promise.set_result(sync_db_->get_message_by_random_id(dialog_id, random_id));
1150 }
get_dialog_message_by_date(DialogId dialog_id,MessageId first_message_id,MessageId last_message_id,int32 date,Promise<MessagesDbDialogMessage> promise)1151 void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id,
1152 int32 date, Promise<MessagesDbDialogMessage> promise) {
1153 add_read_query();
1154 promise.set_result(sync_db_->get_dialog_message_by_date(dialog_id, first_message_id, last_message_id, date));
1155 }
1156
get_dialog_message_calendar(MessagesDbDialogCalendarQuery query,Promise<MessagesDbCalendar> promise)1157 void get_dialog_message_calendar(MessagesDbDialogCalendarQuery query, Promise<MessagesDbCalendar> promise) {
1158 add_read_query();
1159 promise.set_result(sync_db_->get_dialog_message_calendar(std::move(query)));
1160 }
1161
get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,Promise<MessagesDbMessagePositions> promise)1162 void get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,
1163 Promise<MessagesDbMessagePositions> promise) {
1164 add_read_query();
1165 promise.set_result(sync_db_->get_dialog_sparse_message_positions(std::move(query)));
1166 }
1167
get_messages(MessagesDbMessagesQuery query,Promise<vector<MessagesDbDialogMessage>> promise)1168 void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) {
1169 add_read_query();
1170 promise.set_result(sync_db_->get_messages(std::move(query)));
1171 }
get_scheduled_messages(DialogId dialog_id,int32 limit,Promise<vector<MessagesDbDialogMessage>> promise)1172 void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) {
1173 add_read_query();
1174 promise.set_result(sync_db_->get_scheduled_messages(dialog_id, limit));
1175 }
get_messages_from_notification_id(DialogId dialog_id,NotificationId from_notification_id,int32 limit,Promise<vector<MessagesDbDialogMessage>> promise)1176 void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
1177 Promise<vector<MessagesDbDialogMessage>> promise) {
1178 add_read_query();
1179 promise.set_result(sync_db_->get_messages_from_notification_id(dialog_id, from_notification_id, limit));
1180 }
get_calls(MessagesDbCallsQuery query,Promise<MessagesDbCallsResult> promise)1181 void get_calls(MessagesDbCallsQuery query, Promise<MessagesDbCallsResult> promise) {
1182 add_read_query();
1183 promise.set_result(sync_db_->get_calls(std::move(query)));
1184 }
get_messages_fts(MessagesDbFtsQuery query,Promise<MessagesDbFtsResult> promise)1185 void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) {
1186 add_read_query();
1187 promise.set_result(sync_db_->get_messages_fts(std::move(query)));
1188 }
get_expiring_messages(int32 expires_from,int32 expires_till,int32 limit,Promise<std::pair<vector<MessagesDbMessage>,int32>> promise)1189 void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
1190 Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) {
1191 add_read_query();
1192 promise.set_result(sync_db_->get_expiring_messages(expires_from, expires_till, limit));
1193 }
1194
close(Promise<> promise)1195 void close(Promise<> promise) {
1196 do_flush();
1197 sync_db_safe_.reset();
1198 sync_db_ = nullptr;
1199 promise.set_value(Unit());
1200 stop();
1201 }
1202
force_flush()1203 void force_flush() {
1204 LOG(INFO) << "MessagesDb flushed";
1205 do_flush();
1206 }
1207
1208 private:
1209 std::shared_ptr<MessagesDbSyncSafeInterface> sync_db_safe_;
1210 MessagesDbSyncInterface *sync_db_ = nullptr;
1211
1212 static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
1213 static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
1214
1215 //NB: order is important, destructor of pending_writes_ will change pending_write_results_
1216 vector<std::pair<Promise<>, Status>> pending_write_results_;
1217 vector<Promise<>> pending_writes_;
1218 double wakeup_at_ = 0;
1219 template <class F>
add_write_query(F && f)1220 void add_write_query(F &&f) {
1221 pending_writes_.push_back(PromiseCreator::lambda(std::forward<F>(f), PromiseCreator::Ignore()));
1222 if (pending_writes_.size() > MAX_PENDING_QUERIES_COUNT) {
1223 do_flush();
1224 wakeup_at_ = 0;
1225 } else if (wakeup_at_ == 0) {
1226 wakeup_at_ = Time::now_cached() + MAX_PENDING_QUERIES_DELAY;
1227 }
1228 if (wakeup_at_ != 0) {
1229 set_timeout_at(wakeup_at_);
1230 }
1231 }
add_read_query()1232 void add_read_query() {
1233 do_flush();
1234 }
do_flush()1235 void do_flush() {
1236 if (pending_writes_.empty()) {
1237 return;
1238 }
1239 sync_db_->begin_write_transaction().ensure();
1240 for (auto &query : pending_writes_) {
1241 query.set_value(Unit());
1242 }
1243 sync_db_->commit_transaction().ensure();
1244 pending_writes_.clear();
1245 for (auto &p : pending_write_results_) {
1246 p.first.set_result(std::move(p.second));
1247 }
1248 pending_write_results_.clear();
1249 cancel_timeout();
1250 }
timeout_expired()1251 void timeout_expired() final {
1252 do_flush();
1253 }
1254
start_up()1255 void start_up() final {
1256 sync_db_ = &sync_db_safe_->get();
1257 }
1258 };
1259 ActorOwn<Impl> impl_;
1260 };
1261
create_messages_db_async(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,int32 scheduler_id)1262 std::shared_ptr<MessagesDbAsyncInterface> create_messages_db_async(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,
1263 int32 scheduler_id) {
1264 return std::make_shared<MessagesDbAsync>(std::move(sync_db), scheduler_id);
1265 }
1266
1267 } // namespace td
1268