# Copyright (C) 2003-2014 Yann Leboulanger <asterix AT lagaule.org>
# Copyright (C) 2004-2005 Vincent Hanquez <tab AT snarc.org>
# Copyright (C) 2005-2006 Nikos Kouremenos <kourem AT gmail.com>
# Copyright (C) 2006 Dimitur Kirov <dkirov AT gmail.com>
# Copyright (C) 2006-2008 Jean-Marie Traissard <jim AT lapin.org>
# Copyright (C) 2007 Tomasz Melcer <liori AT exroot.org>
#                    Julien Pivotto <roidelapluie AT gmail.com>
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.

import time
import datetime
import calendar
import functools
import json
import logging
import sqlite3 as sqlite
from collections import namedtuple

from gajim.common import app
from gajim.common import configpaths
from gajim.common.helpers import AdditionalDataDict
from gajim.common.const import ShowConstant
from gajim.common.const import KindConstant
from gajim.common.const import JIDConstant

from gajim.common.storage.base import SqliteStorage
from gajim.common.storage.base import timeit


CURRENT_USER_VERSION = 6

# Schema used when the archive database is created from scratch.
ARCHIVE_SQL_STATEMENT = '''
    CREATE TABLE jids(
            jid_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
            jid TEXT UNIQUE,
            type INTEGER
    );
    CREATE TABLE unread_messages(
            message_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
            jid_id INTEGER,
            shown BOOLEAN default 0
    );
    CREATE INDEX idx_unread_messages_jid_id ON unread_messages (jid_id);
    CREATE TABLE logs(
            log_line_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE,
            account_id INTEGER,
            jid_id INTEGER,
            contact_name TEXT,
            time INTEGER,
            kind INTEGER,
            show INTEGER,
            message TEXT,
            error TEXT,
            subject TEXT,
            additional_data TEXT,
            stanza_id TEXT,
            message_id TEXT,
            encryption TEXT,
            encryption_state TEXT,
            marker INTEGER
    );
    CREATE TABLE last_archive_message(
            jid_id INTEGER PRIMARY KEY UNIQUE,
            last_mam_id TEXT,
            oldest_mam_timestamp TEXT,
            last_muc_timestamp TEXT
    );
    CREATE INDEX idx_logs_jid_id_time ON logs (jid_id, time DESC);
    CREATE INDEX idx_logs_stanza_id ON logs (stanza_id);
    CREATE INDEX idx_logs_message_id ON logs (message_id);
    PRAGMA user_version=%s;
    ''' % CURRENT_USER_VERSION


log = logging.getLogger('gajim.c.storage.archive')


@functools.lru_cache(maxsize=64)
def _row_type(fields):
    """
    Return a namedtuple class called `Row` for the given column names.

    Creating a namedtuple class is comparatively expensive and the row
    factory runs for every fetched row, so one class is reused per
    distinct column layout.

    :param fields: A tuple of column names (must be hashable)
    """
    return namedtuple('Row', fields)


class MessageArchiveStorage(SqliteStorage):
    """
    Message archive stored in the SQLite database at the LOG_DB path.

    Besides the message tables, the class keeps an in-memory copy of the
    `jids` table in two dicts so jid <-> jid_id lookups avoid a query.
    """

    def __init__(self):
        SqliteStorage.__init__(self,
                               log,
                               configpaths.get('LOG_DB'),
                               ARCHIVE_SQL_STATEMENT)

        # jid -> Row(jid_id, jid, type)
        self._jid_ids = {}
        # jid_id -> Row(jid_id, jid, type)
        self._jid_ids_reversed = {}

    def init(self, **kwargs):
        """
        Open the database and prepare the connection.

        Sets the journal mode to WAL and enables secure delete (base-class
        helpers), installs the namedtuple row factory and the SQL functions
        `like()` and `get_timeout()`, loads the jid cache and finally purges
        messages older than each account's `chat_history_max_age`.

        :param kwargs: Accepted for interface compatibility; not used.
        """
        SqliteStorage.init(self,
                           detect_types=sqlite.PARSE_COLNAMES)

        self._set_journal_mode('WAL')
        self._enable_secure_delete()

        self._con.row_factory = self._namedtuple_factory

        self._con.create_function("like", 1, self._like)
        self._con.create_function("get_timeout", 0, self._get_timeout)

        self._get_jid_ids_from_db()
        self._cleanup_chat_history()

    def _namedtuple_factory(self, cursor, row):
        """
        Row factory that returns every result row as a namedtuple.

        An `additional_data` column is decoded from JSON into an
        AdditionalDataDict (NULL/empty becomes an empty dict). If the query
        aliases `account_id` as `account`, that id is replaced by the
        account jid taken from the jid cache.
        """
        fields = tuple(col[0] for col in cursor.description)
        named_row = _row_type(fields)(*row)

        if 'additional_data' in fields:
            _dict = json.loads(named_row.additional_data or '{}')
            named_row = named_row._replace(
                additional_data=AdditionalDataDict(_dict))

        if 'account' in fields and named_row.account:
            # Relies on _jid_ids_reversed being complete, which
            # _cache_jid_row() guarantees for every jid we see.
            jid = self._jid_ids_reversed[named_row.account].jid
            named_row = named_row._replace(account=jid)
        return named_row

    def _migrate(self):
        """
        Upgrade an existing database schema step by step, based on its
        PRAGMA user_version.
        """
        # NOTE(review): CURRENT_USER_VERSION is 6, but the steps below only
        # bring an existing database up to version 5 - confirm whether a
        # version 6 step is missing. Also, "sync_threshold" is added here
        # (version 2) but is not part of ARCHIVE_SQL_STATEMENT, so freshly
        # created databases do not have that column.
        user_version = self.user_version
        if user_version == 0:
            # All migrations from 0.16.9 until 1.0.0
            statements = [
                'ALTER TABLE logs ADD COLUMN "account_id" INTEGER',
                'ALTER TABLE logs ADD COLUMN "stanza_id" TEXT',
                'ALTER TABLE logs ADD COLUMN "encryption" TEXT',
                'ALTER TABLE logs ADD COLUMN "encryption_state" TEXT',
                'ALTER TABLE logs ADD COLUMN "marker" INTEGER',
                'ALTER TABLE logs ADD COLUMN "additional_data" TEXT',
                '''CREATE TABLE IF NOT EXISTS last_archive_message(
                    jid_id INTEGER PRIMARY KEY UNIQUE,
                    last_mam_id TEXT,
                    oldest_mam_timestamp TEXT,
                    last_muc_timestamp TEXT
                    )''',

                '''CREATE INDEX IF NOT EXISTS idx_logs_stanza_id
                    ON logs(stanza_id)''',
                'PRAGMA user_version=1'
            ]

            self._execute_multiple(statements)

        if user_version < 2:
            statements = [
                'ALTER TABLE last_archive_message ADD COLUMN "sync_threshold" INTEGER',
                'PRAGMA user_version=2'
            ]
            self._execute_multiple(statements)

        if user_version < 3:
            statements = [
                'ALTER TABLE logs ADD COLUMN "message_id" TEXT',
                'PRAGMA user_version=3'
            ]
            self._execute_multiple(statements)

        if user_version < 4:
            statements = [
                'ALTER TABLE logs ADD COLUMN "error" TEXT',
                'PRAGMA user_version=4'
            ]
            self._execute_multiple(statements)

        if user_version < 5:
            statements = [
                'CREATE INDEX idx_logs_message_id ON logs (message_id)',
                'PRAGMA user_version=5'
            ]
            self._execute_multiple(statements)

    @staticmethod
    def dispatch(event, error):
        """
        Forward `event` together with the stringified `error` to
        app.ged.raise_event().
        """
        app.ged.raise_event(event, None, str(error))

    @staticmethod
    def _get_timeout():
        """
        Return the lower time bound (epoch seconds) used by the SQL
        function get_timeout().

        If the 'restore_timeout' setting (minutes) is positive, this is
        `now - restore_timeout * 60`. Otherwise the raw, non-positive
        setting value is returned, so `time > get_timeout()` effectively
        does not restrict by age.
        """
        timeout = app.settings.get('restore_timeout')

        now = int(time.time())
        if timeout > 0:
            timeout = now - (timeout * 60)
        return timeout

    @staticmethod
    def _like(search_str):
        """
        Implementation of the SQL function like(): wrap `search_str` in
        '%' so LIKE performs a substring match.
        """
        return '%{}%'.format(search_str)

    @staticmethod
    def _day_range(date):
        """
        Return (start, end) epoch timestamps for the 24 hours starting at
        `date`; `end` is the last microsecond of that span.

        :param date: datetime.datetime, normally midnight,
                     e.g. datetime.datetime(year, month, day)
        """
        end = date + datetime.timedelta(
            hours=23, minutes=59, seconds=59, microseconds=999999)
        return date.timestamp(), end.timestamp()

    def _cache_jid_row(self, row):
        """
        Store a `jids` row in both lookup dicts so they stay in sync.
        """
        self._jid_ids[row.jid] = row
        self._jid_ids_reversed[row.jid_id] = row

    @timeit
    def _get_jid_ids_from_db(self):
        """
        Load all jid/jid_id tuples into a dict for faster access
        """
        rows = self._con.execute(
            'SELECT jid_id, jid, type FROM jids').fetchall()
        for row in rows:
            self._cache_jid_row(row)

    def get_jids_in_db(self):
        """
        Return a view of all jids currently held in the jid cache.
        """
        return self._jid_ids.keys()

    def jid_is_from_pm(self, jid):
        """
        Guess whether `jid` is a private-message jid from a group chat.

        A full jid such as gajim@conf/nkour is ambiguous: it may be a MUC
        occupant (private message) or a normal contact with a resource.
        We decide by checking whether the bare part is stored as a room
        jid. This fails only if logging is disabled for the room but
        enabled for private messages. In that case the first PM is shown
        as if it were a message in the room's public chat, and later ones
        are handled correctly.

        :param jid: The jid as string

        returns True/False, or None if the bare jid is not in the cache
        """
        if '/' in jid:
            possible_room_jid = jid.split('/', 1)[0]
            return self.jid_is_room_jid(possible_room_jid)
        # it's not a full jid, so it's not a pm one
        return False

    def jid_is_room_jid(self, jid):
        """
        Return True if it's a room jid, False if it's not, None if we don't know
        """
        jid_ = self._jid_ids.get(jid)
        if jid_ is None:
            return None
        return jid_.type == JIDConstant.ROOM_TYPE

    @staticmethod
    def _get_family_jids(account, jid):
        """
        Get all jids of the metacontacts family

        :param account: The account

        :param jid: The JID

        returns a list of JIDs; just [jid] if there is no family
        """
        family = app.contacts.get_metacontacts_family(account, jid)
        if family:
            return [user['jid'] for user in family]
        return [jid]

    def get_account_id(self, account, type_=JIDConstant.NORMAL_TYPE):
        """
        Return the jid id of the account's own jid, inserting it with
        `type_` if it is not stored yet.
        """
        jid = app.get_jid_from_account(account)
        return self.get_jid_id(jid, type_=type_)

    @timeit
    def get_jid_id(self, jid, kind=None, type_=None):
        """
        Get the jid id from a jid.
        In case the jid id is not found create a new one.

        :param jid: The JID

        :param kind: The KindConstant; if given it overrides `type_`
                     (GC_MSG/GCSTATUS -> ROOM_TYPE, anything else ->
                     NORMAL_TYPE)

        :param type_: The JIDConstant used when a new row is inserted

        raises ValueError if the jid is unknown and no type is available

        return the jid id
        """
        if kind in (KindConstant.GC_MSG, KindConstant.GCSTATUS):
            type_ = JIDConstant.ROOM_TYPE
        elif kind is not None:
            type_ = JIDConstant.NORMAL_TYPE

        result = self._jid_ids.get(jid)
        if result is not None:
            return result.jid_id

        sql = 'SELECT jid_id, jid, type FROM jids WHERE jid = ?'
        row = self._con.execute(sql, [jid]).fetchone()
        if row is not None:
            self._cache_jid_row(row)
            return row.jid_id

        if type_ is None:
            raise ValueError(
                'Unable to insert new JID because type is missing')

        sql = 'INSERT INTO jids (jid, type) VALUES (?, ?)'
        lastrowid = self._con.execute(sql, (jid, type_)).lastrowid
        # Same row class as rows selected from `jids` above.
        row_class = _row_type(('jid_id', 'jid', 'type'))
        self._cache_jid_row(row_class(lastrowid, jid, type_))
        self._delayed_commit()
        return lastrowid

    @staticmethod
    def convert_show_values_to_db_api_values(show):
        """
        Convert from string style to constant ints for db

        None maps to ONLINE. Unknown values (e.g. 'invisible' in a group
        chat, an RFC violation) map to None instead of raising.
        """

        if show == 'online':
            return ShowConstant.ONLINE
        if show == 'chat':
            return ShowConstant.CHAT
        if show == 'away':
            return ShowConstant.AWAY
        if show == 'xa':
            return ShowConstant.XA
        if show == 'dnd':
            return ShowConstant.DND
        if show == 'offline':
            return ShowConstant.OFFLINE
        if show is None:
            return ShowConstant.ONLINE
        # invisible in GC when someone goes invisible
        # it's a RFC violation .... but we should not crash
        return None

    @timeit
    def insert_unread_events(self, message_id, jid_id):
        """
        Add unread message with id: message_id
        """
        sql = '''INSERT INTO unread_messages (message_id, jid_id, shown)
                 VALUES (?, ?, 0)'''
        self._con.execute(sql, (message_id, jid_id))
        self._delayed_commit()

    @timeit
    def set_read_messages(self, message_ids):
        """
        Mark all messages with ids in message_ids as read

        :param message_ids: Iterable of log_line_ids; rows with these ids
                            are deleted from `unread_messages`
        """
        params = [(message_id,) for message_id in message_ids]
        if not params:
            return
        # executemany with one bound value per row avoids both string-built
        # SQL and SQLite's limit on the number of host parameters.
        sql = 'DELETE FROM unread_messages WHERE message_id = ?'
        self._con.executemany(sql, params)
        self._delayed_commit()

    @timeit
    def set_shown_unread_msgs(self, msg_log_id):
        """
        Mark unread message as shown in GUI
        """
        sql = 'UPDATE unread_messages SET shown = 1 WHERE message_id = ?'
        self._con.execute(sql, (msg_log_id,))
        self._delayed_commit()

    @timeit
    def reset_shown_unread_messages(self):
        """
        Set shown field to False in unread_messages table
        """
        sql = 'UPDATE unread_messages SET shown = 0'
        self._con.execute(sql)
        self._delayed_commit()

    @timeit
    def get_unread_msgs(self):
        """
        Get all unread messages

        Entries whose log line no longer exists are removed from
        `unread_messages` on the way.

        returns a list of (row, shown) tuples
        """
        all_messages = []
        try:
            unread_results = self._con.execute(
                'SELECT message_id, shown from unread_messages').fetchall()
        except sqlite.Error as error:
            log.warning('Unable to read unread messages: %s', error)
            unread_results = []

        for message in unread_results:
            msg_log_id = message.message_id
            shown = message.shown
            # here we get infos for that message, and related jid from jids table
            # do NOT change order of SELECTed things, unless you change function(s)
            # that called this function
            result = self._con.execute('''
                SELECT logs.log_line_id, logs.message, logs.time, logs.subject,
                jids.jid, logs.additional_data
                FROM logs, jids
                WHERE logs.log_line_id = ? AND logs.jid_id = jids.jid_id
                ''', (msg_log_id,)).fetchone()
            if result is None:
                # Log line is no more in logs table.
                # remove it from unread_messages
                self.set_read_messages([msg_log_id])
                continue

            all_messages.append((result, shown))
        return all_messages

    @timeit
    def load_groupchat_messages(self, account, jid, limit=50):
        """
        Load the most recent group chat messages of `jid`.

        :param account: The account

        :param jid: The room jid

        :param limit: Maximum number of messages to load

        returns a list of namedtuples, oldest first
        """
        # The account's own jid is a normal jid, never a room.
        account_id = self.get_account_id(account)

        sql = '''
            SELECT time, contact_name, message, additional_data, message_id
            FROM logs NATURAL JOIN jids WHERE jid = ?
            AND account_id = ? AND kind = ?
            ORDER BY time DESC, log_line_id DESC LIMIT ?'''

        messages = self._con.execute(
            sql, (jid, account_id, KindConstant.GC_MSG, limit)).fetchall()

        messages.reverse()
        return messages

    @timeit
    def get_last_conversation_lines(self, account, jid, pending):
        """
        Get recent messages

        Pending messages are already in queue to be printed when the
        ChatControl is opened, so we don't want to request those messages.
        How many messages are requested depends on the 'restore_lines'
        config value. How far back in time messages are requested depends on
        _get_timeout().

        :param account: The account

        :param jid: The jid from which we request the conversation lines

        :param pending: How many messages are currently pending so we don't
                        request those messages

        returns a list of namedtuples, oldest first
        """

        restore = app.settings.get('restore_lines')
        if restore <= 0:
            return []

        kinds = map(str, [KindConstant.SINGLE_MSG_RECV,
                          KindConstant.SINGLE_MSG_SENT,
                          KindConstant.CHAT_MSG_RECV,
                          KindConstant.CHAT_MSG_SENT,
                          KindConstant.ERROR])

        jids = self._get_family_jids(account, jid)
        account_id = self.get_account_id(account)

        sql = '''
            SELECT time, kind, message, error as "error [common_error]",
                   subject, additional_data, marker as "marker [marker]",
                   message_id
            FROM logs NATURAL JOIN jids WHERE jid IN ({jids})
            AND account_id = ? AND kind IN ({kinds})
            AND time > get_timeout()
            ORDER BY time DESC, log_line_id DESC LIMIT ? OFFSET ?
            '''.format(jids=', '.join('?' * len(jids)),
                       kinds=', '.join(kinds))

        messages = self._con.execute(
            sql, tuple(jids) + (account_id, restore, pending)).fetchall()

        messages.reverse()
        return messages

    @timeit
    def get_conversation_for_date(self, account, jid, date):
        """
        Load the complete conversation with a given jid on a specific date

        :param account: The account

        :param jid: The jid for which we request the conversation

        :param date: datetime.datetime instance
            example: datetime.datetime(year, month, day)

        returns a list of namedtuples, oldest first
        """

        jids = self._get_family_jids(account, jid)
        start, end = self._day_range(date)

        sql = '''
            SELECT contact_name, time, kind, show, message, subject,
                   additional_data, log_line_id
            FROM logs NATURAL JOIN jids WHERE jid IN ({jids})
            AND time BETWEEN ? AND ?
            ORDER BY time, log_line_id
            '''.format(jids=', '.join('?' * len(jids)))

        return self._con.execute(
            sql, tuple(jids) + (start, end)).fetchall()

    @timeit
    def search_log(self, account, jid, query, date=None):
        """
        Search the conversation log for messages containing the `query` string.

        The search can either span the complete log for the given
        `account` and `jid` or be restricted to a single day by
        specifying `date`.

        :param account: The account

        :param jid: The jid for which we request the conversation

        :param query: A search string (substring match via LIKE)

        :param date: datetime.datetime instance
            example: datetime.datetime(year, month, day)

        returns a list of namedtuples, newest first
        """
        jids = self._get_family_jids(account, jid)
        params = tuple(jids) + (query,)

        date_search = ''
        if date:
            date_search = 'AND time BETWEEN ? AND ?'
            params += self._day_range(date)

        sql = '''
            SELECT contact_name, time, kind, show, message, subject,
                   additional_data, log_line_id
            FROM logs NATURAL JOIN jids WHERE jid IN ({jids})
            AND message LIKE like(?) {date_search}
            ORDER BY time DESC, log_line_id
            '''.format(jids=', '.join('?' * len(jids)),
                       date_search=date_search)

        return self._con.execute(sql, params).fetchall()

    @timeit
    def get_days_with_logs(self, account, jid, year, month):
        """
        Request the days in a month where we received messages
        for a given `jid`.

        Status entries (STATUS, GCSTATUS) are not counted. Days are
        computed in local time.

        :param account: The account

        :param jid: The jid for which we request the days

        :param year: The year

        :param month: The month

        returns a list of namedtuples with a single `day` field, ascending
        """
        jids = self._get_family_jids(account, jid)

        kinds = map(str, [KindConstant.STATUS,
                          KindConstant.GCSTATUS])

        # Calculate the start and end datetime of the month
        date = datetime.datetime(year, month, 1)
        days = calendar.monthrange(year, month)[1] - 1
        delta = datetime.timedelta(
            days=days, hours=23, minutes=59, seconds=59, microseconds=999999)

        # Order by the selected column: ordering a DISTINCT result by a
        # column that is not selected (time) is not well defined.
        sql = """
            SELECT DISTINCT
            CAST(strftime('%d', time, 'unixepoch', 'localtime') AS INTEGER)
            AS day FROM logs NATURAL JOIN jids WHERE jid IN ({jids})
            AND time BETWEEN ? AND ?
            AND kind NOT IN ({kinds})
            ORDER BY day
            """.format(jids=', '.join('?' * len(jids)),
                       kinds=', '.join(kinds))

        return self._con.execute(sql, tuple(jids) +
                                 (date.timestamp(),
                                  (date + delta).timestamp())).fetchall()

    @timeit
    def get_last_date_that_has_logs(self, account, jid):
        """
        Get the timestamp of the last message we received for the jid.

        :param account: The account

        :param jid: The jid for which we request the last timestamp

        returns a timestamp or None
        """
        jids = self._get_family_jids(account, jid)

        kinds = map(str, [KindConstant.STATUS,
                          KindConstant.GCSTATUS])

        sql = '''
            SELECT MAX(time) as time FROM logs
            NATURAL JOIN jids WHERE jid IN ({jids})
            AND kind NOT IN ({kinds})
            '''.format(jids=', '.join('?' * len(jids)),
                       kinds=', '.join(kinds))

        # fetchone() returns always at least one Row with all
        # attributes set to None because of the MAX() function
        return self._con.execute(sql, tuple(jids)).fetchone().time

    @timeit
    def get_first_date_that_has_logs(self, account, jid):
        """
        Get the timestamp of the first message we received for the jid.

        :param account: The account

        :param jid: The jid for which we request the first timestamp

        returns a timestamp or None
        """
        jids = self._get_family_jids(account, jid)

        kinds = map(str, [KindConstant.STATUS,
                          KindConstant.GCSTATUS])

        sql = '''
            SELECT MIN(time) as time FROM logs
            NATURAL JOIN jids WHERE jid IN ({jids})
            AND kind NOT IN ({kinds})
            '''.format(jids=', '.join('?' * len(jids)),
                       kinds=', '.join(kinds))

        # fetchone() returns always at least one Row with all
        # attributes set to None because of the MIN() function
        return self._con.execute(sql, tuple(jids)).fetchone().time

    @timeit
    def get_date_has_logs(self, account, jid, date):
        """
        Get single timestamp of a message we received for the jid
        in the time range of one day.

        :param account: The account

        :param jid: The jid for which we request the first timestamp

        :param date: datetime.datetime instance
            example: datetime.datetime(year, month, day)

        returns a Row with a `time` field, or None
        """
        jids = self._get_family_jids(account, jid)
        start, end = self._day_range(date)

        sql = '''
            SELECT time
            FROM logs NATURAL JOIN jids WHERE jid IN ({jids})
            AND time BETWEEN ? AND ?
            LIMIT 1
            '''.format(jids=', '.join('?' * len(jids)))

        return self._con.execute(
            sql, tuple(jids) + (start, end)).fetchone()

    @timeit
    def deduplicate_muc_message(self, account, jid, resource,
                                timestamp, message_id):
        """
        Check if a message is already in the `logs` table

        :param account: The account

        :param jid: The muc jid as string

        :param resource: The resource

        :param timestamp: The timestamp in UTC epoch

        :param message_id: The message-id

        returns True if a matching message within +/- 60 seconds exists
        """

        # Add 60 seconds around the timestamp
        start_time = timestamp - 60
        end_time = timestamp + 60

        account_id = self.get_account_id(account)
        log.debug('Search for MUC duplicate')
        log.debug('start: %s, end: %s, jid: %s, resource: %s, message-id: %s',
                  start_time, end_time, jid, resource, message_id)

        sql = '''
            SELECT log_line_id FROM logs
            NATURAL JOIN jids WHERE
            jid = ? AND
            contact_name = ? AND
            message_id = ? AND
            account_id = ? AND
            time BETWEEN ? AND ?
            LIMIT 1
            '''

        result = self._con.execute(sql, (jid,
                                         resource,
                                         message_id,
                                         account_id,
                                         start_time,
                                         end_time)).fetchone()

        if result is not None:
            log.debug('Found duplicate')
            return True
        return False

    @timeit
    def find_stanza_id(self, account, archive_jid, stanza_id, origin_id=None,
                       groupchat=False):
        """
        Checks if a stanza-id is already in the `logs` table

        :param account: The account

        :param archive_jid: The jid of the archive the stanza-id belongs to
                            only used if groupchat=True

        :param stanza_id: The stanza-id

        :param origin_id: The origin-id

        :param groupchat: stanza-id is from a groupchat

        return True if the stanza-id was found
        """
        ids = []
        if stanza_id is not None:
            ids.append(stanza_id)
        if origin_id is not None:
            ids.append(origin_id)

        if not ids:
            return False

        type_ = JIDConstant.NORMAL_TYPE
        if groupchat:
            type_ = JIDConstant.ROOM_TYPE

        archive_id = self.get_jid_id(archive_jid, type_=type_)
        account_id = self.get_account_id(account)

        if groupchat:
            # Stanza ID is only unique within a specific archive.
            # So a Stanza ID could be repeated in different MUCs, so we
            # filter also for the archive JID which is the bare MUC jid.

            # Use Unary-"+" operator for "jid_id", otherwise the
            # idx_logs_jid_id_time index is used instead of the much better
            # idx_logs_stanza_id index
            sql = '''
                SELECT stanza_id FROM logs
                WHERE stanza_id IN ({values})
                AND +jid_id = ? AND account_id = ? LIMIT 1
                '''.format(values=', '.join('?' * len(ids)))
            result = self._con.execute(
                sql, tuple(ids) + (archive_id, account_id)).fetchone()
        else:
            sql = '''
                SELECT stanza_id FROM logs
                WHERE stanza_id IN ({values})
                AND account_id = ? AND kind != ? LIMIT 1
                '''.format(values=', '.join('?' * len(ids)))
            result = self._con.execute(
                sql,
                tuple(ids) + (account_id, KindConstant.GC_MSG)).fetchone()

        if result is not None:
            log.info('Found duplicated message, stanza-id: %s, origin-id: %s, '
                     'archive-jid: %s, account: %s',
                     stanza_id, origin_id, archive_jid, account_id)
            return True
        return False

    def insert_jid(self, jid, kind=None, type_=JIDConstant.NORMAL_TYPE):
        """
        Insert a new jid into the `jids` table.
        This is an alias of get_jid_id() for better readability.

        :param jid: The jid as string

        :param kind: A KindConstant

        :param type_: A JIDConstant
        """
        return self.get_jid_id(jid, kind, type_)

    @timeit
    def insert_into_logs(self, account, jid, time_, kind,
                         unread=True, **kwargs):
        """
        Insert a new message into the `logs` table

        :param account: The account the message belongs to

        :param jid: The jid as string

        :param time_: The timestamp in UTC epoch

        :param kind: A KindConstant

        :param unread: If True the message is added to the `unread_messages`
                       table. Only if kind == CHAT_MSG_RECV

        :param kwargs: Every additional named argument must correspond to
                       a field in the `logs` table

        returns the log_line_id of the new row
        """
        jid_id = self.get_jid_id(jid, kind=kind)
        account_id = self.get_account_id(account)

        if 'additional_data' in kwargs:
            if not kwargs['additional_data']:
                del kwargs['additional_data']
            else:
                kwargs['additional_data'] = json.dumps(
                    kwargs['additional_data'].data)

        # Build the column list as a whole so the statement stays valid
        # when no optional fields are given.
        columns = ['account_id', 'jid_id', 'time', 'kind']
        columns.extend(kwargs.keys())
        values = [account_id, jid_id, time_, kind]
        values.extend(kwargs.values())

        sql = 'INSERT INTO logs ({columns}) VALUES ({placeholders})'.format(
            columns=', '.join(columns),
            placeholders=', '.join('?' * len(columns)))

        lastrowid = self._con.execute(sql, values).lastrowid

        log.info('Insert into DB: jid: %s, time: %s, kind: %s, stanza_id: %s',
                 jid, time_, kind, kwargs.get('stanza_id', None))

        if unread and kind == KindConstant.CHAT_MSG_RECV:
            sql = '''INSERT INTO unread_messages (message_id, jid_id)
                     VALUES (?, ?)'''
            self._con.execute(sql, (lastrowid, jid_id))

        self._delayed_commit()

        return lastrowid

    @timeit
    def set_message_error(self, account_jid, jid, message_id, error):
        """
        Update the corresponding message with the error

        Does nothing if `jid` is not known.

        :param account_jid: The jid of the account

        :param jid: The jid of the chat the message is logged under

        :param message_id: The id of the message

        :param error: The error stanza as string

        """

        account_id = self.get_jid_id(account_jid)
        try:
            jid_id = self.get_jid_id(str(jid))
        except ValueError:
            # Unknown JID
            return

        sql = '''
            UPDATE logs SET error = ?
            WHERE account_id = ? AND jid_id = ? AND message_id = ?
            '''
        self._con.execute(sql, (error, account_id, jid_id, message_id))
        self._delayed_commit()

    @timeit
    def set_marker(self, account_jid, jid, message_id, state):
        """
        Update the marker state of the corresponding message

        Does nothing if `jid` is not known.

        :param account_jid: The jid of the account

        :param jid: The jid of the chat the message is logged under

        :param message_id: The id of the message

        :param state: The state, 'received' or 'displayed'
                      (stored as 0 or 1)

        raises ValueError for any other state
        """
        if state not in ('received', 'displayed'):
            raise ValueError('Invalid marker state')

        account_id = self.get_jid_id(account_jid)
        try:
            jid_id = self.get_jid_id(str(jid))
        except ValueError:
            # Unknown JID
            return

        state = 0 if state == 'received' else 1

        sql = '''
            UPDATE logs SET marker = ?
            WHERE account_id = ? AND jid_id = ? AND message_id = ?
            '''
        self._con.execute(sql, (state, account_id, jid_id, message_id))
        self._delayed_commit()

    @timeit
    def get_archive_infos(self, jid):
        """
        Get the archive infos

        An unknown `jid` is inserted as a room jid.

        :param jid: The jid of the archive

        returns the `last_archive_message` row or None
        """
        jid_id = self.get_jid_id(jid, type_=JIDConstant.ROOM_TYPE)
        sql = '''SELECT * FROM last_archive_message WHERE jid_id = ?'''
        return self._con.execute(sql, (jid_id,)).fetchone()

    @timeit
    def set_archive_infos(self, jid, **kwargs):
        """
        Set archive infos

        Creates the row if needed; otherwise updates only the fields whose
        value is not None.

        :param jid: The jid of the archive

        :param last_mam_id: The last MAM result id

        :param oldest_mam_timestamp: The oldest date we requested MAM
                                     history for

        :param last_muc_timestamp: The timestamp of the last message we
                                   received in a MUC

        :param sync_threshold: The max days that we request from a
                               MUC archive (update path only)

        """
        jid_id = self.get_jid_id(jid)
        exists = self.get_archive_infos(jid)
        if not exists:
            sql = '''INSERT INTO last_archive_message
                     (jid_id, last_mam_id, oldest_mam_timestamp,
                     last_muc_timestamp)
                     VALUES (?, ?, ?, ?)'''
            self._con.execute(sql, (
                jid_id,
                kwargs.get('last_mam_id', None),
                kwargs.get('oldest_mam_timestamp', None),
                kwargs.get('last_muc_timestamp', None),
            ))
        else:
            kwargs = {key: value for key, value in kwargs.items()
                      if value is not None}
            if not kwargs:
                # Nothing to update; an empty SET clause is invalid SQL.
                return

            args = ', '.join('{} = ?'.format(key) for key in kwargs)
            sql = '''UPDATE last_archive_message SET {}
                     WHERE jid_id = ?'''.format(args)
            self._con.execute(sql, tuple(kwargs.values()) + (jid_id,))
        log.info('Set message archive info: %s %s', jid, kwargs)
        self._delayed_commit()

    @timeit
    def reset_archive_infos(self, jid):
        """
        Reset archive infos

        Sets last_mam_id, oldest_mam_timestamp and last_muc_timestamp of
        the archive's row to NULL.

        :param jid: The jid of the archive

        """
        jid_id = self.get_jid_id(jid)
        sql = '''UPDATE last_archive_message
                 SET last_mam_id = NULL, oldest_mam_timestamp = NULL,
                 last_muc_timestamp = NULL
                 WHERE jid_id = ?'''
        self._con.execute(sql, (jid_id,))
        log.info('Reset message archive info: %s', jid)
        self._delayed_commit()

    def _cleanup_chat_history(self):
        """
        Remove messages from account where messages are older than max_age

        max_age is the account setting 'chat_history_max_age' in seconds;
        -1 disables the cleanup for that account.
        """
        for account in app.settings.get_accounts():
            max_age = app.settings.get_account_setting(
                account, 'chat_history_max_age')
            if max_age == -1:
                continue
            account_id = self.get_account_id(account)
            now = time.time()
            point_in_time = now - int(max_age)

            sql = 'DELETE FROM logs WHERE account_id = ? AND time < ?'

            cursor = self._con.execute(sql, (account_id, point_in_time))
            self._delayed_commit()
            log.info('Removed %s old messages for %s', cursor.rowcount, account)