# Copyright (C) 2003-2014 Yann Leboulanger <asterix AT lagaule.org>
# Copyright (C) 2005-2006 Dimitur Kirov <dkirov AT gmail.com>
#                         Nikos Kouremenos <kourem AT gmail.com>
# Copyright (C) 2006 Alex Mauer <hawke AT hawkesnest.net>
# Copyright (C) 2006-2007 Travis Shirk <travis AT pobox.com>
# Copyright (C) 2006-2008 Jean-Marie Traissard <jim AT lapin.org>
# Copyright (C) 2007 Lukas Petrovicky <lukas AT petrovicky.net>
#                    James Newton <redshodan AT gmail.com>
#                    Julien Pivotto <roidelapluie AT gmail.com>
# Copyright (C) 2007-2008 Stephan Erb <steve-e AT h3c.de>
# Copyright (C) 2008 Brendan Taylor <whateley AT gmail.com>
#                    Jonathan Schleifer <js-gajim AT webkeks.org>
#
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
27 28from typing import Any # pylint: disable=unused-import 29from typing import Dict # pylint: disable=unused-import 30 31import sys 32import re 33import os 34import subprocess 35import base64 36import hashlib 37import shlex 38import socket 39import logging 40import json 41import copy 42import collections 43import platform 44import functools 45from collections import defaultdict 46import random 47import weakref 48import inspect 49import string 50import webbrowser 51from string import Template 52import urllib 53from urllib.parse import unquote 54from encodings.punycode import punycode_encode 55from functools import wraps 56from pathlib import Path 57from packaging.version import Version as V 58 59from nbxmpp.namespaces import Namespace 60from nbxmpp.const import Role 61from nbxmpp.const import ConnectionProtocol 62from nbxmpp.const import ConnectionType 63from nbxmpp.structs import ProxyData 64from nbxmpp.protocol import JID 65from nbxmpp.protocol import InvalidJid 66from OpenSSL.crypto import load_certificate 67from OpenSSL.crypto import FILETYPE_PEM 68from gi.repository import Gio 69from gi.repository import GLib 70import precis_i18n.codec # pylint: disable=unused-import 71 72from gajim.common import app 73from gajim.common import configpaths 74from gajim.common.i18n import Q_ 75from gajim.common.i18n import _ 76from gajim.common.i18n import ngettext 77from gajim.common.i18n import get_rfc5646_lang 78from gajim.common.const import ShowConstant 79from gajim.common.const import Display 80from gajim.common.const import URIType 81from gajim.common.const import URIAction 82from gajim.common.const import GIO_TLS_ERRORS 83from gajim.common.const import SHOW_LIST 84from gajim.common.regex import INVALID_XML_CHARS_REGEX 85from gajim.common.regex import STH_AT_STH_DOT_STH_REGEX 86from gajim.common.structs import URI 87 88 89log = logging.getLogger('gajim.c.helpers') 90 91special_groups = (_('Transports'), 92 _('Not in contact list'), 93 _('Observers'), 94 _('Group chats')) 
URL_REGEX = re.compile(
    r"(www\.(?!\.)|[a-z][a-z0-9+.-]*://)[^\s<>'\"]+[^!,\.\s<>\)'\"\]]")


class InvalidFormat(Exception):
    """
    Raised by parse_jid() and parse_resource() when validation fails
    """


def parse_jid(jidstring):
    """
    Validate jidstring with validate_jid() and return it as a string

    Any exception raised during validation is re-raised as InvalidFormat.
    """
    try:
        return str(validate_jid(jidstring))
    except Exception as error:
        raise InvalidFormat(error) from error


def idn_to_ascii(host):
    """
    Convert IDN (Internationalized Domain Names) to ACE (ASCII-compatible
    encoding)
    """
    from encodings import idna
    labels = idna.dots.split(host)
    converted_labels = []
    for label in labels:
        if label:
            converted_labels.append(idna.ToASCII(label).decode('utf-8'))
        else:
            # Keep empty labels (e.g. a trailing dot) untouched
            converted_labels.append('')
    return ".".join(converted_labels)


def ascii_to_idn(host):
    """
    Convert ACE (ASCII-compatible encoding) to IDN (Internationalized Domain
    Names)
    """
    from encodings import idna
    labels = idna.dots.split(host)
    return ".".join(idna.ToUnicode(label) for label in labels)


def puny_encode_url(url):
    """
    Return url with its hostname converted by idn_to_ascii()

    Returns False if the url cannot be parsed or the hostname cannot be
    converted.
    """
    _url = url
    if '//' not in _url:
        # urlparse() only recognises a hostname after '//'
        _url = '//' + _url
    try:
        o = urllib.parse.urlparse(_url)
        p_loc = idn_to_ascii(o.hostname)
    except Exception:
        log.debug('urlparse failed: %s', url)
        return False
    return url.replace(o.hostname, p_loc)


def parse_resource(resource):
    """
    Perform stringprep on resource and return it

    Returns None for an empty resource, raises InvalidFormat if the
    resource cannot be encoded with the 'OpaqueString' codec.
    """
    if not resource:
        return None

    try:
        return resource.encode('OpaqueString').decode('utf-8')
    except UnicodeError as error:
        raise InvalidFormat('Invalid character in resource.') from error


def windowsify(word):
    """
    Return word capitalized when running on Windows, unchanged otherwise
    """
    if os.name == 'nt':
        return word.capitalize()
    return word


def get_uf_show(show, use_mnemonic=False):
    """
    Return a userfriendly string for dnd/xa/chat and make all strings
    translatable

    If use_mnemonic is True, it adds _ so GUI should call with True for
    accessibility issues
    """
    if isinstance(show, ShowConstant):
        show = show.name.lower()

    if show == 'dnd':
        if use_mnemonic:
            uf_show = _('_Busy')
        else:
            uf_show = _('Busy')
    elif show == 'xa':
        if use_mnemonic:
            uf_show = _('_Not Available')
        else:
            uf_show = _('Not Available')
    elif show == 'chat':
        if use_mnemonic:
            uf_show = _('_Free for Chat')
        else:
            uf_show = _('Free for Chat')
    elif show == 'online':
        if use_mnemonic:
            uf_show = Q_('?user status:_Available')
        else:
            uf_show = Q_('?user status:Available')
    elif show == 'connecting':
        uf_show = _('Connecting')
    elif show == 'away':
        if use_mnemonic:
            uf_show = _('A_way')
        else:
            uf_show = _('Away')
    elif show == 'offline':
        if use_mnemonic:
            uf_show = _('_Offline')
        else:
            uf_show = _('Offline')
    elif show == 'not in roster':
        uf_show = _('Not in contact list')
    elif show == 'requested':
        uf_show = Q_('?contact has status:Unknown')
    else:
        uf_show = Q_('?contact has status:Has errors')
    return uf_show


def get_uf_sub(sub):
    """
    Return a translated, userfriendly name for a subscription state
    """
    if sub == 'none':
        uf_sub = Q_('?Subscription we already have:None')
    elif sub == 'to':
        uf_sub = _('To')
    elif sub == 'from':
        uf_sub = _('From')
    elif sub == 'both':
        uf_sub = _('Both')
    else:
        uf_sub = _('Unknown')

    return uf_sub


def get_uf_ask(ask):
    """
    Return a translated, userfriendly name for an 'ask' state

    Values other than None and 'subscribe' are returned unchanged.
    """
    if ask is None:
        uf_ask = Q_('?Ask (for Subscription):None')
    elif ask == 'subscribe':
        uf_ask = _('Subscribe')
    else:
        uf_ask = ask

    return uf_ask


def get_uf_role(role, plural=False):
    """
    Return a translated name for a group chat role

    role is either a string or an object with a string 'value' attribute.
    plural determines if you get Moderators or Moderator.
    A role without a translation is returned as its raw string value.
    """
    if not isinstance(role, str):
        role = role.value

    # Fallback, so an unexpected role does not end in UnboundLocalError
    role_name = role
    if role == 'none':
        role_name = Q_('?Group Chat Contact Role:None')
    elif role == 'moderator':
        if plural:
            role_name = _('Moderators')
        else:
            role_name = _('Moderator')
    elif role == 'participant':
        if plural:
            role_name = _('Participants')
        else:
            role_name = _('Participant')
    elif role == 'visitor':
        if plural:
            role_name = _('Visitors')
        else:
            role_name = _('Visitor')
    return role_name


def get_uf_affiliation(affiliation, plural=False):
    """
    Get a nice and translated affiliation for muc

    affiliation is either a string or an object with a string 'value'
    attribute. An affiliation without a translation (e.g. 'outcast') is
    returned as its raw string value.
    """
    if not isinstance(affiliation, str):
        affiliation = affiliation.value

    # Fallback, so an unexpected affiliation does not end in
    # UnboundLocalError
    affiliation_name = affiliation
    if affiliation == 'none':
        affiliation_name = Q_('?Group Chat Contact Affiliation:None')
    elif affiliation == 'owner':
        if plural:
            affiliation_name = _('Owners')
        else:
            affiliation_name = _('Owner')
    elif affiliation == 'admin':
        if plural:
            affiliation_name = _('Administrators')
        else:
            affiliation_name = _('Administrator')
    elif affiliation == 'member':
        if plural:
            affiliation_name = _('Members')
        else:
            affiliation_name = _('Member')
    return affiliation_name


def get_sorted_keys(adict):
    """
    Return the keys of adict as a sorted list
    """
    return sorted(adict)


def to_one_line(msg):
    """
    Escape backslashes and newlines so msg fits on a single line

    Example: 'test\\ntest\\\\ntest' (a real newline, then a literal
    backslash + n) becomes r'test\\ntest\\\\ntest'.
    """
    msg = msg.replace('\\', '\\\\')
    msg = msg.replace('\n', '\\n')
    return msg


def from_one_line(msg):
    """
    Reverse the escaping done by to_one_line()
    """
    # (?<!\\) is a lookbehind assertion which asks anything but '\'
    # to match the regexp that follows it

    # So here match '\\n' but not if you have a '\' before that
    expr = re.compile(r'(?<!\\)\\n')
    msg = expr.sub('\n', msg)
    msg = msg.replace('\\\\', '\\')
    return msg


def get_uf_chatstate(chatstate):
    """
    Remove chatstate jargon and returns user friendly messages

    Returns an empty string for an unknown chatstate.
    """
    if chatstate == 'active':
        return _('is paying attention to the conversation')
    if chatstate == 'inactive':
        return _('is doing something else')
    if chatstate == 'composing':
        return _('is composing a message…')
    if chatstate == 'paused':
        # paused means he or she was composing but has stopped for a while
        return _('paused composing a message')
    if chatstate == 'gone':
        return _('has closed the chat window or tab')
    return ''


def exec_command(command, use_shell=False, posix=True):
    """
    execute a command. if use_shell is True, we run the command as is it was
    typed in a console. So it may be dangerous if you are not sure about what
    is executed.
    """
    if use_shell:
        # NOTE(review): the command string is handed to the shell as is;
        # callers must never pass untrusted input with use_shell=True
        subprocess.Popen('%s &' % command, shell=True).wait()
    else:
        args = shlex.split(command, posix=posix)
        process = subprocess.Popen(args)
        app.thread_interface(process.wait)


def build_command(executable, parameter):
    """
    Return 'executable "parameter"' with double quotes in parameter escaped
    """
    # we add to the parameter (can hold path with spaces)
    # "" so we have good parsing from shell
    parameter = parameter.replace('"', '\\"')  # but first escape "
    command = '%s "%s"' % (executable, parameter)
    return command


def get_file_path_from_dnd_dropped_uri(uri):
    """
    Return the file path contained in a 'file:' uri from a drag and drop
    """
    path = urllib.parse.unquote(uri)  # escape special chars
    path = path.strip('\r\n\x00')  # remove \r\n and NULL
    # get the path to file
    if re.match('^file:///[a-zA-Z]:/', path):  # windows
        path = path[8:]  # 8 is len('file:///')
    elif path.startswith('file://'):  # nautilus, rox
        path = path[7:]  # 7 is len('file://')
    elif path.startswith('file:'):  # xffm
        path = path[5:]  # 5 is len('file:')
    return path


def sanitize_filename(filename):
    """
    Make sure the filename we will write does contain only acceptable and latin
    characters, and is not too long (in that case hash it)
    """
    # 48 is the limit
    if len(filename) > 48:
        hash_ = hashlib.md5(filename.encode('utf-8'))
        filename = base64.b64encode(hash_.digest()).decode('utf-8')

    # make it latin chars only
    filename = punycode_encode(filename).decode('utf-8')
    filename = filename.replace('/', '_')
    if os.name == 'nt':
        filename = filename.replace('?', '_').replace(':', '_')\
            .replace('\\', '_').replace('"', "'").replace('|', '_')\
            .replace('*', '_').replace('<', '_').replace('>', '_')

    return filename


def reduce_chars_newlines(text, max_chars=0, max_lines=0):
    """
    Cut the chars after 'max_chars' on each line and show only the first
    'max_lines'

    If any of the params is not present (None or 0) the action on it is not
    performed
    """
    def _cut_if_long(string_):
        if len(string_) > max_chars:
            string_ = string_[:max_chars - 3] + '…'
        return string_

    if max_lines == 0:
        lines = text.split('\n')
    else:
        lines = text.split('\n', max_lines)[:max_lines]
    if max_chars > 0:
        if lines:
            lines = [_cut_if_long(e) for e in lines]
    if lines:
        reduced_text = '\n'.join(lines)
        # Signal that something was cut off
        if reduced_text != text:
            reduced_text += '…'
    else:
        reduced_text = ''
    return reduced_text


def get_account_status(account):
    """
    Return the first line of account['status_line'], cut to 100 chars
    """
    status = reduce_chars_newlines(account['status_line'], 100, 1)
    return status


def get_contact_dict_for_account(account):
    """
    Create a dict of jid, nick -> contact with all contacts of account.

    Can be used for completion lists
    """
    contacts_dict = {}
    for jid in app.contacts.get_jid_list(account):
        contact = app.contacts.get_contact_with_highest_priority(account, jid)
        contacts_dict[jid] = contact
        name = contact.name
        if name in contacts_dict:
            # Name is already taken, make both entries distinguishable
            # by appending the jid
            contact1 = contacts_dict[name]
            del contacts_dict[name]
            contacts_dict['%s (%s)' % (name, contact1.jid)] = contact1
            contacts_dict['%s (%s)' % (name, jid)] = contact
        elif contact.name:
            if contact.name == app.get_nick_from_jid(jid):
                del contacts_dict[jid]
            contacts_dict[name] = contact
    return contacts_dict


def play_sound(event):
    """
    Play the sound configured for event if the 'sounds_on' setting is set
    """
    if not app.settings.get('sounds_on'):
        return
    play_sound_file(app.settings.get_soundevent_settings(event)['path'])


def check_soundfile_path(file_, dirs=None):
    """
    Check if the sound file exists

    :param file_: the file to check, absolute or relative to 'dirs' path
    :param dirs: list of knows paths to fallback if the file doesn't exists
                 (eg: ~/.gajim/sounds/, DATADIR/sounds...).
    :return      the path to file or None if it doesn't exists.
    """
    if not file_:
        return None
    if Path(file_).exists():
        return Path(file_)

    if dirs is None:
        dirs = [configpaths.get('MY_DATA'),
                configpaths.get('DATA')]

    for dir_ in dirs:
        dir_ = dir_ / 'sounds' / file_
        if dir_.exists():
            return dir_
    return None


def strip_soundfile_path(file_, dirs=None, abs_=True):
    """
    Remove knowns paths from a sound file

    Filechooser returns an absolute path.
    If path is a known fallback path, we remove it.
    So config has no hardcoded path to DATA_DIR and text in textfield is
    shorther.
    param: file_: the filename to strip
    param: dirs: list of knowns paths from which the filename should be stripped
    param: abs_: force absolute path on dirs
    """

    if not file_:
        return None

    if dirs is None:
        dirs = [configpaths.get('MY_DATA'),
                configpaths.get('DATA')]

    file_ = Path(file_)
    name = file_.name
    for dir_ in dirs:
        dir_ = dir_ / 'sounds' / name
        if abs_:
            dir_ = dir_.absolute()
        if file_ == dir_:
            return name
    return file_


def play_sound_file(path_to_soundfile):
    """
    Play a sound file with the backend available on the current platform

    Does nothing if check_soundfile_path() cannot find the file.
    """
    path_to_soundfile = check_soundfile_path(path_to_soundfile)
    if path_to_soundfile is None:
        return

    path_to_soundfile = str(path_to_soundfile)
    if sys.platform == 'win32':
        import winsound
        try:
            winsound.PlaySound(path_to_soundfile,
                               winsound.SND_FILENAME | winsound.SND_ASYNC)
        except Exception:
            log.exception('Sound Playback Error')

    elif sys.platform == 'darwin':
        try:
            from AppKit import NSSound
        except ImportError:
            log.exception('Sound Playback Error')
            return

        sound = NSSound.alloc()
        sound.initWithContentsOfFile_byReference_(path_to_soundfile, True)
        sound.play()

    elif app.is_installed('GSOUND'):
        try:
            app.gsound_ctx.play_simple({'media.filename': path_to_soundfile})
        except GLib.Error as error:
            log.error('Could not play sound: %s', error.message)


def get_connection_status(account):
    """
    Return a status string for account derived from its connection state
    """
    con = app.connections[account]
    if con.state.is_reconnect_scheduled:
        return 'error'

    if con.state.is_connecting or con.state.is_connected:
        return 'connecting'

    if con.state.is_disconnected:
        return 'offline'
    return con.status


def get_global_show():
    """
    Return the status with the highest SHOW_LIST index among all accounts
    which have 'sync_with_global_status' set
    """
    maxi = 0
    for account in app.connections:
        if not app.settings.get_account_setting(account,
                                                'sync_with_global_status'):
            continue
        status = get_connection_status(account)
        index = SHOW_LIST.index(status)
        if index > maxi:
            maxi = index
    return SHOW_LIST[maxi]


def get_global_status_message():
    """
    Return the status message of the synced account whose status has the
    highest SHOW_LIST index

    Returns an empty string if no synced account has a status with an
    index above 0.
    """
    maxi = 0
    # Default, so the function does not fail with UnboundLocalError if
    # no account qualifies
    status_message = ''
    for account in app.connections:
        if not app.settings.get_account_setting(account,
                                                'sync_with_global_status'):
            continue
        status = app.connections[account].status
        index = SHOW_LIST.index(status)
        if index > maxi:
            maxi = index
            status_message = app.connections[account].status_message
    return status_message


def statuses_unified():
    """
    Test if all statuses are the same
    """
    reference = None
    for account in app.connections:
        if not app.settings.get_account_setting(account,
                                                'sync_with_global_status'):
            continue
        if reference is None:
            reference = app.connections[account].status
        elif reference != app.connections[account].status:
            return False
    return True


def get_icon_name_to_show(contact, account=None):
    """
    Get the icon name to show in online, away, requested, etc
    """
    if account:
        if app.events.get_nb_roster_events(account, contact.jid):
            return 'event'
        if app.events.get_nb_roster_events(account, contact.get_full_jid()):
            return 'event'
        minimized = app.interface.minimized_controls
        if (account in minimized and
                contact.jid in minimized[account] and
                minimized[account][contact.jid].get_nb_unread_pm() > 0):
            return 'event'
        if contact.jid in app.gc_connected[account]:
            if app.gc_connected[account][contact.jid]:
                return 'muc-active'
            return 'muc-inactive'
    if contact.jid.find('@') <= 0:  # if not '@' or '@' starts the jid ==> agent
        return contact.show
    if contact.sub in ('both', 'to'):
        return contact.show
    if contact.ask == 'subscribe':
        return 'requested'
    transport = app.get_transport_name_from_jid(contact.jid)
    if transport:
        return contact.show
    if contact.show in SHOW_LIST:
        return contact.show
    return 'notinroster'


def get_full_jid_from_iq(iq_obj):
    """
    Return the full jid (with resource) from an iq

    Returns None if the iq has no sender.
    """
    jid = iq_obj.getFrom()
    if jid is None:
        return None
    return parse_jid(str(jid))


def get_jid_from_iq(iq_obj):
    """
    Return the jid (without resource) from an iq
    """
    jid = get_full_jid_from_iq(iq_obj)
    return app.get_jid_without_resource(jid)


def get_auth_sha(sid, initiator, target):
    """
    Return sha of sid + initiator + target used for proxy auth
    """
    return hashlib.sha1(("%s%s%s" % (sid, initiator, target)).encode('utf-8')).\
        hexdigest()


def remove_invalid_xml_chars(string_):
    """
    Remove everything matched by INVALID_XML_CHARS_REGEX from string_

    Falsy input (None, empty string) is returned unchanged.
    """
    if string_:
        string_ = re.sub(INVALID_XML_CHARS_REGEX, '', string_)
    return string_


def get_random_string(count=16):
    """
    Create random string of count length

    WARNING: Don't use this for security purposes
    """
    allowed = string.ascii_uppercase + string.digits
    return ''.join(random.choice(allowed) for _index in range(count))


@functools.lru_cache(maxsize=1)
def get_os_info():
    """
    Return a short description of the operating system, 'N/A' if unknown

    The result is cached after the first call.
    """
    info = 'N/A'
    if sys.platform in ('win32', 'darwin'):
        info = f'{platform.system()} {platform.release()}'

    elif sys.platform == 'linux':
        try:
            import distro
            info = distro.name(pretty=True)
        except ImportError:
            info = platform.system()
    return info


def allow_showing_notification(account):
    """
    Is it allowed to show a notification for account?
    """
    if not app.settings.get('show_notifications'):
        return False
    if app.settings.get('autopopupaway'):
        return True
    if app.account_is_available(account):
        return True
    return False


def allow_popup_window(account):
    """
    Is it allowed to popup windows?
    """
    autopopup = app.settings.get('autopopup')
    autopopupaway = app.settings.get('autopopupaway')
    if autopopup and (autopopupaway or
                      app.connections[account].status in ('online', 'chat')):
        return True
    return False


def allow_sound_notification(account, sound_event):
    """
    Is it allowed to play the sound for sound_event on account?
    """
    # NOTE(review): 'and' binds tighter than 'or', so with 'sounddnd' set
    # the per-event 'enabled' flag is never consulted. The parentheses
    # only make the existing evaluation order explicit - confirm intent.
    if (app.settings.get('sounddnd') or
            (app.connections[account].status != 'dnd' and
             app.settings.get_soundevent_settings(sound_event)['enabled'])):
        return True
    return False


def get_notification_icon_tooltip_dict():
    """
    Return the list created by get_accounts_info(), where every account
    dict additionally has an 'event_lines' key holding the list of text
    lines to show in the tooltip
    """
    # How many events before we show summarized, not per-user
    max_ungrouped_events = 10

    accounts = get_accounts_info()

    # Gather events. (With accounts, when there are more.)
    for account in accounts:
        account_name = account['name']
        account['event_lines'] = []
        # Gather events per-account
        pending_events = app.events.get_events(account=account_name)
        messages, non_messages = {}, {}
        total_messages, total_non_messages = 0, 0
        for jid in pending_events:
            for event in pending_events[jid]:
                if 'file' in event.type_:
                    # This is a non-message event.
                    non_messages[jid] = non_messages.get(jid, 0) + 1
                    total_non_messages = total_non_messages + 1
                else:
                    # This is a message.
                    messages[jid] = messages.get(jid, 0) + 1
                    total_messages = total_messages + 1
        # Display unread messages numbers, if any
        if total_messages > 0:
            if total_messages > max_ungrouped_events:
                text = ngettext('%d message pending',
                                '%d messages pending',
                                total_messages,
                                total_messages,
                                total_messages)
                account['event_lines'].append(text)
            else:
                for jid in messages:
                    text = ngettext('%d message pending',
                                    '%d messages pending',
                                    messages[jid],
                                    messages[jid],
                                    messages[jid])
                    contact = app.contacts.get_first_contact_from_jid(
                        account['name'], jid)
                    text += ' '
                    if jid in app.gc_connected[account['name']]:
                        text += _('from group chat %s') % (jid)
                    elif contact:
                        name = contact.get_shown_name()
                        text += _('from user %s') % (name)
                    else:
                        text += _('from %s') % (jid)
                    account['event_lines'].append(text)

        # Display unseen events numbers, if any
        if total_non_messages > 0:
            if total_non_messages > max_ungrouped_events:
                text = ngettext('%d event pending',
                                '%d events pending',
                                total_non_messages,
                                total_non_messages,
                                total_non_messages)
                account['event_lines'].append(text)
            else:
                for jid in non_messages:
                    text = ngettext('%d event pending',
                                    '%d events pending',
                                    non_messages[jid],
                                    non_messages[jid],
                                    non_messages[jid])
                    text += ' ' + _('from user %s') % (jid)
                    account['event_lines'].append(text)

    return accounts


def get_accounts_info():
    """
    Helper for notification icon tooltip

    Returns a list with one dict per account, sorted by account name.
    """
    accounts = []
    accounts_list = sorted(app.contacts.get_accounts())
    for account in accounts_list:

        status = get_connection_status(account)
        message = app.connections[account].status_message
        single_line = get_uf_show(status)
        if message is None:
            message = ''
        else:
            message = message.strip()
        if message != '':
            single_line += ': ' + message
        account_label = app.get_account_label(account)
        accounts.append({'name': account,
                         'account_label': account_label,
                         'status_line': single_line,
                         'show': status,
                         'message': message})
    return accounts


def get_current_show(account):
    """
    Return the status of account, 'offline' if there is no connection
    """
    if account not in app.connections:
        return 'offline'
    return app.connections[account].status


def get_optional_features(account):
    """
    Return the list of optional feature namespaces for account

    Which features are added depends on the account settings, the
    bookmark storage in use and whether 'AV' is installed.
    """
    features = []

    if app.settings.get_account_setting(account, 'request_user_data'):
        features.append(Namespace.MOOD + '+notify')
        features.append(Namespace.ACTIVITY + '+notify')
        features.append(Namespace.TUNE + '+notify')
        features.append(Namespace.LOCATION + '+notify')

    features.append(Namespace.NICK + '+notify')

    bookmarks = app.connections[account].get_module('Bookmarks')
    if bookmarks.nativ_bookmarks_used:
        features.append(Namespace.BOOKMARKS_1 + '+notify')
    elif bookmarks.pep_bookmarks_used:
        features.append(Namespace.BOOKMARKS + '+notify')
    if app.is_installed('AV'):
        features.append(Namespace.JINGLE_RTP)
        features.append(Namespace.JINGLE_RTP_AUDIO)
        features.append(Namespace.JINGLE_RTP_VIDEO)
        features.append(Namespace.JINGLE_ICE_UDP)

    # Give plugins the possibility to add their features
    app.plugin_manager.extension_point('update_caps', account, features)
    return features


def jid_is_blocked(account, jid):
    """
    Return True if jid is in the blocked list of the 'Blocking' module
    """
    con = app.connections[account]
    return jid in con.get_module('Blocking').blocked


def get_subscription_request_msg(account=None):
    """
    Return the message to send with a subscription request

    Uses the 'subscription_request_msg' account setting if set, a
    translated default text otherwise.
    """
    s = app.settings.get_account_setting(account, 'subscription_request_msg')
    if s:
        return s
    s = _('I would like to add you to my contact list.')
    if account:
        s = _('Hello, I am $name.') + ' ' + s
        s = Template(s).safe_substitute({'name': app.nicks[account]})
        return s


def get_user_proxy(account):
    """
    Return the proxy configured for account, None if there is none
    """
    proxy_name = app.settings.get_account_setting(account, 'proxy')
    if not proxy_name:
        return None
    return get_proxy(proxy_name)


def get_proxy(proxy_name):
    """
    Return a ProxyData for proxy_name

    Returns None if reading the proxy settings raises ValueError.
    """
    try:
        settings = app.settings.get_proxy_settings(proxy_name)
    except ValueError:
        return None

    username, password = None, None
    if settings['useauth']:
        username, password = settings['user'], settings['pass']

    return ProxyData(type=settings['type'],
                     host='%s:%s' % (settings['host'], settings['port']),
                     username=username,
                     password=password)


def version_condition(current_version, required_version):
    """
    Return False if current_version is lower than required_version
    """
    if V(current_version) < V(required_version):
        return False
    return True


def get_available_emoticon_themes():
    """
    Return the sorted names of all emoticon themes

    A theme is a .png file, the built in 'font' theme is always included.
    """
    files = []
    for folder in configpaths.get('EMOTICONS').iterdir():
        if not folder.is_dir():
            continue
        files += [theme for theme in folder.iterdir() if theme.is_file()]

    my_emots = configpaths.get('MY_EMOTS')
    if my_emots.is_dir():
        files += list(my_emots.iterdir())

    emoticons_themes = ['font']
    emoticons_themes += [file.stem for file in files if file.suffix == '.png']
    return sorted(emoticons_themes)


def call_counter(func):
    """
    Decorator which counts calls in self._connect_machine_calls

    Calling the decorated method with restart=True resets the counter
    before counting the current call.
    """
    def helper(self, restart=False):
        if restart:
            self._connect_machine_calls = 0
        self._connect_machine_calls += 1
        return func(self)
    return helper


def load_json(path, key=None, default=None):
    """
    Load the JSON file at path

    Returns the whole content, or the value for key if key is given.
    Returns default if the file cannot be read or parsed, or if key is
    missing.
    """
    try:
        with path.open('r') as file:
            json_dict = json.loads(file.read())
    except Exception:
        log.exception('Parsing error')
        return default

    if key is None:
        return json_dict
    return json_dict.get(key, default)


def ignore_contact(account, jid):
    """
    Return True if jid is unknown and the 'ignore_unknown_contacts'
    account setting is set
    """
    jid = str(jid)
    known_contact = app.contacts.get_contacts(account, jid)
    ignore = app.settings.get_account_setting(account,
                                              'ignore_unknown_contacts')
    if ignore and not known_contact:
        log.info('Ignore unknown contact %s', jid)
        return True
    return False


class AdditionalDataDict(collections.UserDict):
    """
    Dict for nested data, addressed with a ':' separated path and a key
    """
    def __init__(self, initialdata=None):
        collections.UserDict.__init__(self, initialdata)

    @staticmethod
    def _get_path_childs(full_path):
        """
        Split full_path at ':' and return the list of path elements
        """
        path_childs = [full_path]
        if ':' in full_path:
            path_childs = full_path.split(':')
        return path_childs

    def set_value(self, full_path, key, value):
        """
        Store value under key at full_path, missing levels are created
        """
        path_childs = self._get_path_childs(full_path)
        _dict = self.data
        for path in path_childs:
            try:
                _dict = _dict[path]
            except KeyError:
                _dict[path] = {}
                _dict = _dict[path]
        _dict[key] = value

    def get_value(self, full_path, key, default=None):
        """
        Return the value for key at full_path, default if it is missing
        """
        path_childs = self._get_path_childs(full_path)
        _dict = self.data
        for path in path_childs:
            try:
                _dict = _dict[path]
            except KeyError:
                return default
        try:
            return _dict[key]
        except KeyError:
            return default

    def remove_value(self, full_path, key):
        """
        Delete key at full_path, do nothing if it is missing
        """
        path_childs = self._get_path_childs(full_path)
        _dict = self.data
        for path in path_childs:
            try:
                _dict = _dict[path]
            except KeyError:
                return
        try:
            del _dict[key]
        except KeyError:
            return

    def copy(self):
        """
        Return a deep copy
        """
        return copy.deepcopy(self)


def save_roster_position(window):
    """
    Store the position of window in the roster position settings

    Does nothing if 'save-roster-position' is not set or on Wayland.
    """
    if not app.settings.get('save-roster-position'):
        return
    if app.is_display(Display.WAYLAND):
        return
    x_pos, y_pos = window.get_position()
    log.debug('Save roster position: %s %s', x_pos, y_pos)
    app.settings.set('roster_x-position', x_pos)
    app.settings.set('roster_y-position', y_pos)


class Singleton(type):
    """
    Metaclass which creates only one instance per class

    Arguments of all calls after the first one are ignored.
    """
    _instances = {}  # type: Dict[Any, Any]

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(
                *args, **kwargs)
        return cls._instances[cls]


def delay_execution(milliseconds):
    """
    Decorator which delays the first call for `milliseconds` and ignores
    all other calls while the delay is active

    The decorated function always returns None.
    """
    def delay_execution_decorator(func):
        @wraps(func)
        def func_wrapper(*args, **kwargs):
            def timeout_wrapper():
                try:
                    func(*args, **kwargs)
                finally:
                    # Always end the delay, otherwise an exception in
                    # func would block all further calls
                    delattr(func_wrapper, 'source_id')

            if hasattr(func_wrapper, 'source_id'):
                return
            func_wrapper.source_id = GLib.timeout_add(
                milliseconds, timeout_wrapper)
        return func_wrapper
    return delay_execution_decorator


def event_filter(filter_):
    """
    Decorator for event handler methods, the handler is only called if
    all attributes in filter_ match

    An entry 'name' compares event.name with self.name, an entry 'a=b'
    compares event.a with self.b. If self lacks the attribute, the
    attribute prefixed with '_' is tried. On mismatch None is returned.
    """
    def event_filter_decorator(func):
        @wraps(func)
        def func_wrapper(self, event, *args, **kwargs):
            for attr in filter_:
                if '=' in attr:
                    attr1, attr2 = attr.split('=')
                else:
                    attr1, attr2 = attr, attr
                try:
                    if getattr(event, attr1) != getattr(self, attr2):
                        return None
                except AttributeError:
                    if getattr(event, attr1) != getattr(self, '_%s' % attr2):
                        return None

            return func(self, event, *args, **kwargs)
        return func_wrapper
    return event_filter_decorator


def catch_exceptions(func):
    """
    Decorator which logs any Exception raised by func and returns None
    in that case
    """
    @wraps(func)
    def func_wrapper(self, *args, **kwargs):
        try:
            result = func(self, *args, **kwargs)
        except Exception as error:
            log.exception(error)
            return None
        return result
    return func_wrapper


def parse_uri_actions(uri):
    """
    Split an 'xmpp:' uri into action and data

    Returns a tuple (action, data). data always holds 'jid' and may hold
    'subject', 'body' and 'thread'. Without a query part the action is
    'message'.
    """
    uri = uri[5:]  # 5 is len('xmpp:')
    if '?' not in uri:
        return 'message', {'jid': uri}

    jid, action = uri.split('?', 1)
    data = {'jid': jid}
    if ';' in action:
        action, keys = action.split(';', 1)
        action_keys = keys.split(';')
        for key in action_keys:
            if key.startswith('subject='):
                data['subject'] = unquote(key[8:])

            elif key.startswith('body='):
                data['body'] = unquote(key[5:])

            elif key.startswith('thread='):
                data['thread'] = key[7:]
    return action, data


def parse_uri(uri):
    """
    Return a URI struct describing the type and content of the uri string

    Everything which is not recognized is treated as URIType.WEB.
    """
    if uri.startswith('xmpp:'):
        action, data = parse_uri_actions(uri)
        try:
            validate_jid(data['jid'])
            return URI(type=URIType.XMPP,
                       action=URIAction(action),
                       data=data)
        except ValueError:
            # Unknown action
            return URI(type=URIType.UNKNOWN)

    if uri.startswith('mailto:'):
        uri = uri[7:]
        return URI(type=URIType.MAIL, data=uri)

    if uri.startswith('tel:'):
        uri = uri[4:]
        return URI(type=URIType.TEL, data=uri)

    if STH_AT_STH_DOT_STH_REGEX.match(uri):
        return URI(type=URIType.AT, data=uri)

    if uri.startswith('geo:'):
        location = uri[4:]
        # Don't use '_' for the separator, it would shadow gettext
        lat, _sep, lon = location.partition(',')
        if not lon:
            return URI(type=URIType.UNKNOWN, data=uri)

        if Gio.AppInfo.get_default_for_uri_scheme('geo'):
            return URI(type=URIType.GEO, data=uri)

        # No application handles 'geo:', fall back to a web map
        uri = geo_provider_from_location(lat, lon)
        return URI(type=URIType.GEO, data=uri)

    if uri.startswith('file://'):
        return URI(type=URIType.FILE, data=uri)

    return URI(type=URIType.WEB, data=uri)


@catch_exceptions
def open_uri(uri, account=None):
    """
    Open uri with the handler matching its type

    uri is either a URI struct or a string, which is parsed with
    parse_uri(). XMPP uris need an account.
    """
    if not isinstance(uri, URI):
        uri = parse_uri(uri)

    if uri.type == URIType.FILE:
        open_file(uri.data)

    elif uri.type == URIType.TEL:
        if sys.platform == 'win32':
            webbrowser.open(f'tel:{uri.data}')
        else:
            Gio.AppInfo.launch_default_for_uri(f'tel:{uri.data}')

    elif uri.type == URIType.MAIL:
        if sys.platform == 'win32':
            webbrowser.open(f'mailto:{uri.data}')
        else:
            Gio.AppInfo.launch_default_for_uri(f'mailto:{uri.data}')

    elif uri.type in (URIType.WEB, URIType.GEO):
        if sys.platform == 'win32':
            webbrowser.open(uri.data)
        else:
            Gio.AppInfo.launch_default_for_uri(uri.data)

    elif uri.type == URIType.AT:
        app.interface.new_chat_from_jid(account, uri.data)

    elif uri.type == URIType.XMPP:
        if account is None:
            log.warning('Account must be specified to open XMPP uri')
            return

        if uri.action == URIAction.JOIN:
            app.app.activate_action(
                'groupchat-join',
                GLib.Variant('as', [account, uri.data['jid']]))
        elif uri.action == URIAction.MESSAGE:
            app.interface.new_chat_from_jid(account, uri.data['jid'],
                                            message=uri.data.get('body'))
        else:
            log.warning('Cant open URI: %s', uri)

    else:
        log.warning('Cant open URI: %s', uri)


@catch_exceptions
def open_file(path):
    """
    Open path with the default application
    """
    if os.name == 'nt':
        os.startfile(path)
    else:
        # Call str() to make it work with pathlib.Path
        path = str(path)
        if not path.startswith('file://'):
            path = 'file://' + path
        Gio.AppInfo.launch_default_for_uri(path)


def geo_provider_from_location(lat, lon):
    """
    Return an openstreetmap.org url with a marker at lat, lon
    """
    return ('https://www.openstreetmap.org/?'
            'mlat=%s&mlon=%s&zoom=16') % (lat, lon)


def get_resource(account):
    """
    Return the resource of account with $hostname and $rand substituted

    The substituted resource is written back to the account settings.
    Returns None if no resource is configured.
    """
    resource = app.settings.get_account_setting(account, 'resource')
    if not resource:
        return None

    resource = Template(resource).safe_substitute(
        {'hostname': socket.gethostname(),
         'rand': get_random_string()})
    app.settings.set_account_setting(account, 'resource', resource)
    return resource


def get_default_muc_config():
    """
    Return a new dict with the default group chat configuration
    """
    return {
        # XEP-0045 options
        'muc#roomconfig_allowinvites': True,
        'muc#roomconfig_publicroom': False,
        'muc#roomconfig_membersonly': True,
        'muc#roomconfig_persistentroom': True,
        'muc#roomconfig_whois': 'anyone',
        'muc#roomconfig_moderatedroom': False,

        # Ejabberd options
        'allow_voice_requests': False,
        'public_list': False,

        # Prosody options
        '{http://prosody.im/protocol/muc}roomconfig_allowmemberinvites': True,
        'muc#roomconfig_enablearchiving': True,
    }


def validate_jid(jid, type_=None):
    """
    Parse jid and return it as JID object

    type_ can be 'bare', 'full' or 'domain' to require that kind of JID.
    Raises ValueError if jid is invalid or not of the required type.
    """
    try:
        jid = JID.from_string(str(jid))
    except InvalidJid as error:
        raise ValueError(error) from error

    if type_ is None:
        return jid
    if type_ == 'bare' and jid.is_bare:
        return jid
    if type_ == 'full' and jid.is_full:
        return jid
    if type_ == 'domain' and jid.is_domain:
        return jid

    raise ValueError('Not a %s JID' % type_)


def to_user_string(error):
    """
    Return a text describing error

    Prefers the text of the error in the language returned by
    get_rfc5646_lang(), falls back to the error condition.
    """
    text = error.get_text(get_rfc5646_lang())
    if text:
        return text

    condition = error.condition
    if error.app_condition is not None:
        return '%s (%s)' % (condition, error.app_condition)
    return condition


def get_groupchat_name(con, jid):
    name = con.get_module('Bookmarks').get_name_from_bookmark(jid)
    if name:
        return name

    disco_info = app.storage.cache.get_last_disco_info(jid)
    if disco_info is not None:
        if disco_info.muc_name:
            return
disco_info.muc_name 1229 1230 return jid.split('@')[0] 1231 1232 1233def is_affiliation_change_allowed(self_contact, contact, target_aff): 1234 if contact.affiliation.value == target_aff: 1235 # Contact has already the target affiliation 1236 return False 1237 1238 if self_contact.affiliation.is_owner: 1239 return True 1240 1241 if not self_contact.affiliation.is_admin: 1242 return False 1243 1244 if target_aff in ('admin', 'owner'): 1245 # Admin can’t edit admin/owner list 1246 return False 1247 return self_contact.affiliation > contact.affiliation 1248 1249 1250def is_role_change_allowed(self_contact, contact): 1251 if self_contact.role < Role.MODERATOR: 1252 return False 1253 return self_contact.affiliation >= contact.affiliation 1254 1255 1256def get_tls_error_phrase(tls_error): 1257 phrase = GIO_TLS_ERRORS.get(tls_error) 1258 if phrase is None: 1259 return GIO_TLS_ERRORS.get(Gio.TlsCertificateFlags.GENERIC_ERROR) 1260 return phrase 1261 1262 1263class Observable: 1264 def __init__(self, log_=None): 1265 self._log = log_ 1266 self._callbacks = defaultdict(list) 1267 1268 def disconnect_signals(self): 1269 self._callbacks = defaultdict(list) 1270 1271 def disconnect(self, object_): 1272 for signal_name, handlers in self._callbacks.items(): 1273 for handler in list(handlers): 1274 func = handler() 1275 if func is None or func.__self__ == object_: 1276 self._callbacks[signal_name].remove(handler) 1277 1278 def connect(self, signal_name, func): 1279 if inspect.ismethod(func): 1280 weak_func = weakref.WeakMethod(func) 1281 elif inspect.isfunction(func): 1282 weak_func = weakref.ref(func) 1283 1284 self._callbacks[signal_name].append(weak_func) 1285 1286 def notify(self, signal_name, *args, **kwargs): 1287 if self._log is not None: 1288 self._log.info('Signal: %s', signal_name) 1289 1290 callbacks = self._callbacks.get(signal_name, []) 1291 for func in list(callbacks): 1292 if func() is None: 1293 self._callbacks[signal_name].remove(func) 1294 continue 1295 
func()(self, signal_name, *args, **kwargs) 1296 1297 1298def write_file_async(path, data, callback, user_data=None): 1299 file = Gio.File.new_for_path(str(path)) 1300 file.create_async(Gio.FileCreateFlags.PRIVATE, 1301 GLib.PRIORITY_DEFAULT, 1302 None, 1303 _on_file_created, 1304 (callback, data, user_data)) 1305 1306def _on_file_created(file, result, user_data): 1307 callback, data, user_data = user_data 1308 try: 1309 outputstream = file.create_finish(result) 1310 except GLib.Error as error: 1311 callback(False, error, user_data) 1312 return 1313 1314 # Pass data as user_data to the callback, because 1315 # write_all_async() takes not reference to the data 1316 # and python gc collects it before the data are written 1317 outputstream.write_all_async(data, 1318 GLib.PRIORITY_DEFAULT, 1319 None, 1320 _on_write_finished, 1321 (callback, data, user_data)) 1322 1323def _on_write_finished(outputstream, result, user_data): 1324 callback, _data, user_data = user_data 1325 try: 1326 successful, _bytes_written = outputstream.write_all_finish(result) 1327 except GLib.Error as error: 1328 callback(False, error, user_data) 1329 else: 1330 callback(successful, None, user_data) 1331 1332 1333def load_file_async(path, callback, user_data=None): 1334 file = Gio.File.new_for_path(str(path)) 1335 file.load_contents_async(None, 1336 _on_load_finished, 1337 (callback, user_data)) 1338 1339 1340def _on_load_finished(file, result, user_data): 1341 callback, user_data = user_data 1342 try: 1343 _successful, contents, _etag = file.load_contents_finish(result) 1344 except GLib.Error as error: 1345 callback(None, error, user_data) 1346 else: 1347 callback(contents, None, user_data) 1348 1349 1350def convert_gio_to_openssl_cert(cert): 1351 cert = load_certificate(FILETYPE_PEM, cert.props.certificate_pem.encode()) 1352 return cert 1353 1354 1355def get_custom_host(account): 1356 if not app.settings.get_account_setting(account, 'use_custom_host'): 1357 return None 1358 host = 
app.settings.get_account_setting(account, 'custom_host') 1359 port = app.settings.get_account_setting(account, 'custom_port') 1360 type_ = app.settings.get_account_setting(account, 'custom_type') 1361 1362 if host.startswith('ws://') or host.startswith('wss://'): 1363 protocol = ConnectionProtocol.WEBSOCKET 1364 else: 1365 host = f'{host}:{port}' 1366 protocol = ConnectionProtocol.TCP 1367 1368 return (host, protocol, ConnectionType(type_)) 1369 1370 1371def warn_about_plain_connection(account, connection_types): 1372 warn = app.settings.get_account_setting( 1373 account, 'confirm_unencrypted_connection') 1374 for type_ in connection_types: 1375 if type_.is_plain and warn: 1376 return True 1377 return False 1378 1379 1380def get_idle_status_message(state, status_message): 1381 message = app.settings.get(f'auto{state}_message') 1382 if not message: 1383 message = status_message 1384 else: 1385 message = message.replace('$S', '%(status)s') 1386 message = message.replace('$T', '%(time)s') 1387 message = message % { 1388 'status': status_message, 1389 'time': app.settings.get(f'auto{state}time') 1390 } 1391 return message 1392 1393 1394def should_log(account, jid): 1395 """ 1396 Should conversations between a local account and a remote jid be logged? 
1397 """ 1398 no_log_for = app.settings.get_account_setting(account, 'no_log_for') 1399 1400 if not no_log_for: 1401 no_log_for = '' 1402 1403 no_log_for = no_log_for.split() 1404 1405 return (account not in no_log_for) and (jid not in no_log_for) 1406 1407 1408def ask_for_status_message(status, signin=False): 1409 if status is None: 1410 # We try to change the message 1411 return True 1412 1413 if signin: 1414 return app.settings.get('ask_online_status') 1415 1416 if status == 'offline': 1417 return app.settings.get('ask_offline_status') 1418 1419 return app.settings.get('always_ask_for_status_message') 1420 1421 1422def get_group_chat_nick(account, room_jid): 1423 nick = app.nicks[account] 1424 1425 client = app.get_client(account) 1426 1427 bookmark = client.get_module('Bookmarks').get_bookmark(room_jid) 1428 if bookmark is not None: 1429 if bookmark.nick is not None: 1430 nick = bookmark.nick 1431 1432 return nick 1433 1434 1435def get_muc_context(jid): 1436 disco_info = app.storage.cache.get_last_disco_info(jid) 1437 if disco_info is None: 1438 return None 1439 1440 if (disco_info.muc_is_members_only and disco_info.muc_is_nonanonymous): 1441 return 'private' 1442 return 'public' 1443