1#!/usr/local/bin/python3.8 2 3import argparse 4import atexit 5import base64 6import bisect 7import collections 8import contextlib 9import errno 10import importlib 11import json 12import logging 13import os 14import pprint 15import re 16import shlex 17import shutil 18import signal as signal_ipc 19import subprocess 20import sys 21import tempfile 22import textwrap 23from datetime import datetime, timezone 24 25import urwid 26 27try: 28 from urwid_readline import ReadlineEdit 29 Edit = ReadlineEdit 30except ImportError: 31 Edit = urwid.Edit 32 33 34# ############################################################################# 35# constants 36# ############################################################################# 37 38 39DATA_FOLDER = os.getenv('XDG_DATA_HOME', os.path.expanduser('~/.local/share')) 40CFG_FOLDER = os.getenv('XDG_CONFIG_HOME', os.path.expanduser('~/.config')) 41 42SIGNALCLI_LEGACY_FOLDER = os.path.join(CFG_FOLDER, 'signal') 43SIGNALCLI_LEGACY_DATA_FOLDER = os.path.join(SIGNALCLI_LEGACY_FOLDER, 'data') 44SIGNALCLI_LEGACY_ATTACHMENT_FOLDER = os.path.join(SIGNALCLI_LEGACY_FOLDER, 'attachments') 45 46SIGNALCLI_FOLDER = os.path.join(DATA_FOLDER, 'signal-cli') 47SIGNALCLI_DATA_FOLDER = os.path.join(SIGNALCLI_FOLDER, 'data') 48SIGNALCLI_ATTACHMENT_FOLDER = os.path.join(SIGNALCLI_FOLDER, 'attachments') 49 50SCLI_DATA_FOLDER = os.path.join(DATA_FOLDER, 'scli') 51SCLI_ATTACHMENT_FOLDER = os.path.join(SCLI_DATA_FOLDER, 'attachments') 52SCLI_HISTORY_FILE = os.path.join(SCLI_DATA_FOLDER, 'history') 53SCLI_CFG_FILE = os.path.join(CFG_FOLDER, 'sclirc') 54SCLI_LOG_FILE = os.path.join(SCLI_DATA_FOLDER, 'log') 55 56SCLI_EXEC_FOLDER = os.path.dirname(os.path.realpath(__file__)) 57SCLI_README_FILE = os.path.join('/usr/local/share/doc/scli', 'README.md') 58 59 60# ############################################################################# 61# utility 62# ############################################################################# 63 64 65def noop(*_args, **_kwargs): 66 pass 67 68 69def get_nested(dct, *keys, default=None): 70 for key in keys: 71 try: 72 dct = dct[key] 73 except (KeyError, TypeError, IndexError): 74 return default 75 return dct 76 77 78def get_urls(txt): 79 return re.findall(r'(https?://[^\s]+)', txt) 80 81 82def callf(cmd, rmap=None, background=False, **subprocess_kwargs): 83 if rmap: 84 for key, val in rmap.items(): 85 if key not in cmd: 86 raise ValueError(f'Command string `{cmd}` should contain a replacement placeholder `{key}` (e.g. `some-cmd "{key}"`). See `--help`.') 87 cmd = cmd.replace(key, val) 88 89 if not subprocess_kwargs.get('shell'): 90 cmd = shlex.split(cmd) 91 logging.debug('callf: `%s`', cmd) 92 93 if background: 94 for arg in ('stdin', 'stdout', 'stderr'): 95 subprocess_kwargs.setdefault(arg, subprocess.DEVNULL) 96 proc = subprocess.Popen(cmd, **subprocess_kwargs) 97 return proc 98 99 subprocess_kwargs.setdefault('text', True) 100 proc = subprocess.run(cmd, **subprocess_kwargs) 101 102 if proc.returncode != 0: 103 logging.error( 104 'callf: %s: exit code: %d, stderr: %s', 105 proc.args, 106 proc.returncode, 107 proc.stderr 108 ) 109 elif proc.stdout: 110 logging.debug('callf: %s', proc.stdout) 111 112 return proc 113 114 115def get_prog_dir(): 116 return os.path.dirname(os.path.realpath(__file__)) 117 118 119def get_version(): 120 """Get this program's version. 121 122 Based on either `git describe`, or, if not available (e.g. for a release downloaded without the `.git` dir), use VERSION file populated during the creation of the release. 123 Does not output the leading `v` if it's present in git tag's name. 124 """ 125 126 # Do not use `logging` in this function, as it's called before logging.basicConfig(). 127 prog_dir = get_prog_dir() 128 git_dir = os.path.join(prog_dir, '.git') 129 git_cmd = ['git', '--git-dir', git_dir, 'describe'] 130 try: 131 proc = subprocess.run(git_cmd, capture_output=True, check=True, text=True) 132 return proc.stdout.strip('v\n') 133 except (FileNotFoundError, subprocess.CalledProcessError): 134 pass 135 136 version_file_path = os.path.join(prog_dir, 'VERSION') 137 try: 138 with open(version_file_path) as f: 139 version_str = f.readline() 140 except OSError: 141 return '?' 142 if version_str.startswith('$'): 143 # Neither a release, nor is there a `.git` dir (e.g. manually dl'ed blob) 144 return '?' 145 tag_re = re.compile(r"tag: v(.+?)[,)]") 146 # Assumes VERSION file is formatted with `$Format:%h %d$` 147 tag_re_match = tag_re.search(version_str) 148 if tag_re_match: 149 return tag_re_match.group(1) 150 return version_str.split(maxsplit=1)[0] # commit hash 151 152 153def get_default_editor(): 154 for env_var in ('VISUAL', 'EDITOR'): 155 ret = os.getenv(env_var) 156 if ret is not None: 157 return ret 158 for exe in ('sensible-editor', 'editor', 'nano', 'emacs', 'vi'): 159 ret = shutil.which(exe) 160 if ret is not None: 161 return ret 162 return ret 163 164 165PHONE_NUM_REGEX = re.compile('^\\+[1-9][0-9]{6,14}$') 166# https://github.com/signalapp/libsignal-service-java/blob/master/java/src/main/java/org/whispersystems/signalservice/api/util/PhoneNumberFormatter.java 167def is_number(number): 168 return bool(PHONE_NUM_REGEX.match(number)) 169 170 171def is_path(path): 172 return path.startswith(("/", "~/", "./")) 173 174 175PATH_RE = re.compile( 176 r""" 177 # Matches a path-like string, with whitespaces escaped or with the whole path in quotes. 178 ( 179 ( 180 \\\ | # escaped whitespace OR .. 181 [^'" ] # .. not a quote or space 182 )+ 183 ) # Path with escaped whitespace .. 184 | # .. OR .. 185 ( # .. path in quotes. 186 (?P<quote>['"]) # a quote char; name the capture 187 .+? # anything, non-greedily 188 (?P=quote) # matching quote 189 ) 190 """, 191 re.VERBOSE, 192) 193def split_path(string): 194 string = string.strip() 195 if not string: 196 return ['', ''] 197 re_match = PATH_RE.match(string) 198 if not re_match: 199 return ['', string] 200 path = re_match.group() 201 if re_match.group(1): # unquoted path 202 path = path.replace(r'\ ', ' ') 203 else: # path in quotes 204 path = path.strip('\'"') 205 rest = string[re_match.end() :].strip() 206 return [path, rest] if rest else [path] 207 208 209def utc2local(utc_dt): 210 return utc_dt.replace(tzinfo=timezone.utc).astimezone(tz=None) 211 212 213def strftimestamp(timestamp, strformat='%H:%M:%S (%Y-%m-%d)'): 214 try: 215 date = datetime.utcfromtimestamp(timestamp) 216 except ValueError: 217 date = datetime.utcfromtimestamp(timestamp / 1000) 218 return utc2local(date).strftime(strformat) 219 220 221def strip_non_printable_chars(string): 222 if string.isprintable(): 223 return string 224 return ''.join((c for c in string if c.isprintable())) 225 226 227# ############################################################################# 228# signal utility 229# ############################################################################# 230 231 232def get_contact_id(contact_dict): 233 return contact_dict.get('number') or contact_dict.get('groupId') 234 235 236def is_contact_group(contact_dict): 237 return 'groupId' in contact_dict 238 239 240def is_group_v2(group_dict): 241 gid = group_dict['groupId'] 242 return len(gid) == 44 243 244 245def get_envelope_data_val(envelope, *keys, default=None, return_tuple=False): 246 data_message_ret = get_nested(envelope, 'dataMessage', *keys, default=default) 247 sync_message_ret = get_nested(envelope, 'syncMessage', 'sentMessage', *keys, default=default) 248 if return_tuple: 249 return (data_message_ret, sync_message_ret) 250 else: 251 return data_message_ret or sync_message_ret 252 253 254def is_envelope_outgoing(envelope): 255 return ( 256 'target' in envelope 257 or get_nested(envelope, 'syncMessage', 'sentMessage') is not None 258 or get_nested(envelope, 'callMessage', 'answerMessage') is not None 259 ) 260 261 262def is_envelope_group_message(envelope): 263 return ( 264 get_envelope_data_val(envelope, 'groupInfo') is not None 265 or ('target' in envelope and not is_number(envelope['target'])) 266 or get_nested(envelope, 'typingMessage', 'groupId') is not None 267 ) 268 269 270def get_envelope_msg(envelope): 271 # If the `message` field is absent from the envelope: return None. If it is present but contains no text (since signal-cli v0.6.8, this is represented as `'message': null`): return ''. Otherwise: return the `message` field's value. 272 for msg in get_envelope_data_val(envelope, 'message', default=0, return_tuple=True): 273 if msg is None: 274 return '' 275 elif msg != 0: 276 return msg 277 return None 278 279 280def get_envelope_time(envelope): 281 return ( 282 envelope['timestamp'] 283 or get_envelope_data_val(envelope, 'timestamp') 284 ) 285 286 287def get_envelope_contact_id(envelope): 288 return ( 289 envelope.get('target') 290 or get_envelope_data_val(envelope, 'groupInfo', 'groupId') 291 or get_nested(envelope, 'syncMessage', 'sentMessage', 'destination') 292 or get_nested(envelope, 'typingMessage', 'groupId') 293 or envelope['source'] 294 ) 295 296 297def get_envelope_sender_id(envelope): 298 return envelope['source'] 299 300 301def get_envelope_quote(envelope): 302 return get_envelope_data_val(envelope, 'quote') 303 304 305def get_envelope_reaction(envelope): 306 return get_envelope_data_val(envelope, 'reaction') 307 308 309def get_envelope_mentions(envelope): 310 return get_envelope_data_val(envelope, 'mentions') 311 312 313def get_envelope_remote_delete(envelope): 314 return get_envelope_data_val(envelope, 'remoteDelete') 315 316 317def get_envelope_attachments(envelope): 318 return get_envelope_data_val(envelope, 'attachments') 319 320 321def get_attachment_name(attachment): 322 if isinstance(attachment, dict): 323 filename = attachment['filename'] 324 return filename if filename is not None else attachment['contentType'] 325 else: 326 return os.path.basename(attachment) 327 328 329def get_attachment_path(attachment): 330 try: 331 aid = attachment['id'] 332 except TypeError: 333 return attachment 334 received_attachment = os.path.join(SIGNALCLI_ATTACHMENT_FOLDER, aid) 335 if not os.path.exists(received_attachment): 336 received_attachment = os.path.join(SIGNALCLI_LEGACY_ATTACHMENT_FOLDER, aid) 337 return received_attachment 338 339 340def b64_to_bytearray(group_id): 341 return ','.join(str(i) for i in base64.b64decode(group_id.encode())) 342 343 344def b64_to_hex_str(group_id): 345 return base64.b64decode(group_id.encode()).hex() 346 347 348# ############################################################################# 349# clipboard 350# ############################################################################# 351 352 353class clip: 354 mime_order = ['image/png', 'image/jpeg', 'image/jpg', 'text/uri-list'] 355 tempfile_prefix = '_scli-tmp.' 356 357 @staticmethod 358 def xrun(mime): 359 try: 360 proc = subprocess.run( 361 ['xclip', '-selection', 'clipboard', '-t', mime, '-o'], 362 capture_output=True, 363 check=True, 364 ) 365 except (OSError, subprocess.CalledProcessError): 366 return None 367 return proc.stdout 368 369 @staticmethod 370 def xrun_lines(mime): 371 out = clip.xrun(mime) 372 if out: 373 return out.decode('utf-8').split('\n') 374 return None 375 376 @staticmethod 377 def xfiles(): 378 out = clip.xrun_lines('TARGETS') 379 if out is None: 380 return out 381 382 for otype in out: 383 for mtype in clip.mime_order: 384 if mtype == otype: 385 if mtype.startswith('image/'): 386 content = clip.xrun(mtype) 387 suffix = '.' + mtype.split('/')[1] 388 if cfg.save_history: 389 clip_file_path = os.path.join( 390 SCLI_ATTACHMENT_FOLDER, 391 f"clipboard_{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}{suffix}" 392 ) 393 clip_file = open(clip_file_path, 'w+b') 394 else: 395 clip_file = tempfile.NamedTemporaryFile( 396 mode='w+b', 397 prefix=clip.tempfile_prefix, 398 suffix=suffix, 399 delete=False, 400 ) 401 clip_file.write(content) 402 clip_file.flush() 403 clip_file.close() 404 return [clip_file.name] 405 elif mtype == 'text/uri-list': 406 content = clip.xrun_lines(mtype) 407 return [x.replace('file://', '') for x in content[1:]] 408 409 return None 410 411 @staticmethod 412 def xput(txt): 413 if not txt: 414 return 415 try: 416 proc = subprocess.Popen( 417 ['xclip', '-selection', 'clipboard'], 418 stdin=subprocess.PIPE, 419 text=True, 420 ) 421 except OSError: 422 return 423 proc.stdin.write(txt) 424 proc.stdin.close() 425 426 @staticmethod 427 def put(txt): 428 cmd = cfg.clipboard_put_command 429 if cmd is None: 430 return clip.xput(txt) 431 return callf(cmd, {'%s': txt}) 432 433 @staticmethod 434 def files(): 435 cmd = cfg.clipboard_get_command 436 if cmd is None: 437 return clip.xfiles() 438 439 return callf(cmd).split('\n') 440 441 442# ############################################################################# 443# AsyncProc & Daemon 444# ############################################################################# 445 446 447class CallbackQueue: 448 def __init__(self): 449 # The _queue is a list with each item a tuple: ({proc1, proc2, ..}, callback, callback_kwargs, proc_callback, proc_callback_kwargs) 450 self._queue = [] 451 self._accepting_new_procs_for_item = False 452 453 def new_item(self, callback, callback_kwargs, proc_callback, proc_callback_kwargs): 454 if callback is None and proc_callback is None: 455 return 456 self._queue.append( 457 (set(), callback, callback_kwargs, proc_callback, proc_callback_kwargs) 458 ) 459 self._accepting_new_procs_for_item = True 460 461 def add_proc(self, proc): 462 if not self._accepting_new_procs_for_item: 463 return 464 curr_item = self._queue[-1] 465 procs = curr_item[0] 466 procs.add(proc) 467 468 def finalize_item(self): 469 self._accepting_new_procs_for_item = False 470 if self._queue and not self._queue[-1][0]: 471 # All background procs have already finished, or there had been none started 472 self._pop_callback(-1) 473 474 def on_proc_done(self, proc): 475 for ind, item in enumerate(self._queue): 476 procs, *_ = item 477 if proc not in procs: 478 continue 479 self._run_proc_callback(ind, proc) 480 procs.remove(proc) 481 if not procs and not self._accepting_new_procs_for_item: 482 self._pop_callback(ind) 483 return 484 485 def _run_proc_callback(self, queue_index, proc): 486 *_, proc_callback, proc_callback_kwargs = self._queue[queue_index] 487 if proc_callback is not None: 488 proc_callback(proc, **proc_callback_kwargs) 489 490 def _pop_callback(self, queue_index): 491 _, callback, callback_kwargs, *_ = self._queue.pop(queue_index) 492 if callback is not None: 493 callback(**callback_kwargs) 494 495 496class AsyncProc: 497 def __init__(self, main_loop): 498 # The `main_loop` is an object like `urwid.MainLoop`, that implements `watch_pipe()` and `set_alarm_in()` methods. 499 self.main_loop = main_loop 500 self._callback_queue = CallbackQueue() 501 502 def run(self, args, callback=None, *callback_args, shell=False, **callback_kwargs): 503 """ Run the command composed of `args` in the background (asynchronously); run the `callback` function when it finishes """ 504 505 def watchpipe_handler(line): 506 # This function is run when the shell process returns (finishes execution). 507 # The `line` printed to watch pipe is of the form "b'<PID> <RETURN_CODE>\n'" 508 _proc_pid, return_code = [int(i) for i in line.decode().split()] 509 proc.wait() # reap the child process, to prevent zombies 510 511 proc.returncode = return_code # overwrite the 'wrapper' command return code (always 0) with the actual command return code 512 proc.output = proc.stderr.read().rstrip('\n') # stderr stream is not seekable, so can be read only once 513 514 if return_code != 0: 515 logging.error( 516 'proc: cmd:`%s`; return_code:%d; output:"%s"', 517 proc.args, 518 return_code, 519 proc.output, 520 ) 521 522 if callback is not None: 523 callback(proc, *callback_args, **callback_kwargs) 524 self._callback_queue.on_proc_done(proc) 525 526 os.close(watchpipe_fd) # Close the write end of watch pipe. 527 return False # Close the read end of watch pipe and remove the watch from event_loop. 528 529 watchpipe_fd = self.main_loop.watch_pipe(watchpipe_handler) 530 531 # If the command is run with Popen(.., shell=True), shlex.quote is needed to escape special chars in args. 532 sh_command = " ".join( 533 [shlex.quote(arg) for arg in args] if not shell else ['{', args, ';', '}'] 534 ) 535 # Redirect all the process's output to stderr, and write the process PID and exit status to the watch pipe. 536 sh_command += " 1>&2; echo $$ $?" 537 538 proc = subprocess.Popen( 539 sh_command, 540 shell=True, 541 stdout=watchpipe_fd, 542 stderr=subprocess.PIPE, 543 universal_newlines=True, 544 ) 545 atexit.register(proc.kill) # prevent orphaned processes surviving after the main program is stopped 546 self._callback_queue.add_proc(proc) 547 return proc 548 549 @contextlib.contextmanager 550 def callback_finally( 551 self, 552 callback=None, 553 proc_callback=None, 554 proc_callback_kwargs=None, 555 **callback_kwargs 556 ): 557 """Execute callback function after all background processes started inside this context have finished. 558 559 Optionally, run `proc_callback` after every background processes that exits. 560 """ 561 562 proc_callback_kwargs = proc_callback_kwargs or {} 563 try: 564 yield self._callback_queue.new_item( 565 callback, 566 callback_kwargs, 567 proc_callback, 568 proc_callback_kwargs, 569 ) 570 finally: 571 self._callback_queue.finalize_item() 572 573 574class Daemon(AsyncProc): 575 def __init__(self, main_loop, username): 576 super().__init__(main_loop) 577 self._username = username 578 self._buffer = b'' 579 self.callbacks = { 580 cb_name: noop for cb_name in [ 581 'daemon_started', 582 'daemon_log', 583 'receive_message', 584 'receive_sync_message', 585 'receive_receipt', 586 'receive_reaction', 587 'sending_message', 588 'sending_done', 589 'contact_typing', 590 'call_message', 591 'contacts_sync', 592 'remote_delete', 593 ] 594 } 595 596 def start(self): 597 stdout_fd = self.main_loop.watch_pipe(self._daemon_stdout_handler) 598 stderr_fd = self.main_loop.watch_pipe(self._daemon_stderr_handler) 599 try: 600 proc = callf( 601 cfg.daemon_command, 602 {'%u': self._username}, 603 background=True, 604 stdout=stdout_fd, 605 stderr=stderr_fd, 606 ) 607 except FileNotFoundError: 608 sys.exit( 609 f"ERROR: could not find `{cfg.daemon_command.split()[0]}` executable. " 610 "Make sure it is on system path." 611 ) 612 return proc 613 614 def _daemon_stdout_handler(self, bytes_recv): 615 bytes_recv = self._buffer + bytes_recv 616 lines = bytes_recv.split(b'\n') 617 if lines[-1] != b'': 618 # Not a complete message. Store in buffer 619 self._buffer = lines[-1] 620 else: 621 self._buffer = b'' 622 623 # The last item is either empty or an incomplete message, so we don't process it 624 for line in lines[:-1]: 625 if not line.strip(): 626 continue 627 try: 628 json_data = json.loads(line.decode('utf-8')) 629 envelope = json_data['envelope'] 630 except (json.JSONDecodeError, KeyError) as err: 631 logging.error('Could not parse daemon output: %s', line) 632 logging.exception(err) 633 else: 634 self._envelope_handler(envelope) 635 636 def _daemon_stderr_handler(self, bytes_recv): 637 line = bytes_recv.decode().strip() 638 if not line: 639 return 640 logging.info('daemon_log: %s', line) 641 self.callbacks['daemon_log'](line) 642 if line == "INFO DaemonCommand - Exported dbus object: /org/asamk/Signal": 643 self._run_when_dbus_service_started( 644 self.callbacks['daemon_started'] 645 ) 646 647 def _envelope_handler(self, envelope): 648 logging.debug("Daemon: envelope = \n%s", pprint.pformat(envelope)) 649 if get_envelope_msg(envelope) or get_envelope_attachments(envelope): 650 if get_nested(envelope, 'syncMessage', 'sentMessage') is not None: 651 self.callbacks['receive_sync_message'](envelope) 652 else: 653 self.callbacks['receive_message'](envelope) 654 elif envelope.get('receiptMessage') is not None: 655 # In signal-cli >=0.7.3, above check can be replaced with just 656 # 'receiptMessage' in envelope 657 # Keeping `is not None` for compatiability with envelopes in history from older signal-cli versions. 658 self.callbacks['receive_receipt'](envelope) 659 elif 'typingMessage' in envelope: 660 self.callbacks['contact_typing'](envelope) 661 elif get_envelope_reaction(envelope): 662 self.callbacks['receive_reaction'](envelope) 663 elif envelope.get('callMessage') is not None: 664 self.callbacks['call_message'](envelope) 665 elif get_nested(envelope, 'syncMessage', 'type') in ('CONTACTS_SYNC', 'GROUPS_SYNC'): 666 self.callbacks['contacts_sync']() 667 elif get_envelope_data_val(envelope, 'groupInfo', 'type') == 'UPDATE': 668 self.callbacks['contacts_sync']() 669 elif get_envelope_remote_delete(envelope): 670 self.callbacks['remote_delete'](envelope) 671 else: 672 logging.info('No action for received envelope: %s', pprint.pformat(envelope)) 673 674 def _dbus_send(self, args, *proc_args, async_proc=True, **proc_kwargs): 675 args = [ 676 'dbus-send', 677 '--session', 678 '--type=method_call', 679 '--print-reply', 680 *args 681 ] 682 if async_proc: 683 proc = self.run(args, *proc_args, **proc_kwargs) 684 else: 685 proc = subprocess.run(args, *proc_args, **proc_kwargs) 686 return proc 687 688 def _dbus_send_signal_cli(self, args, *proc_args, **proc_kwargs): 689 """ Send a command to signal-cli daemon through dbus """ 690 args = [ 691 '--dest=org.asamk.Signal', 692 '/org/asamk/Signal', 693 *args 694 ] 695 return self._dbus_send(args, *proc_args, **proc_kwargs) 696 697 def _send_message_dbus_cmd(self, message, attachments, recipient, is_group=False, *proc_args, **proc_kwargs): 698 args = [ 699 ('org.asamk.Signal.sendMessage' 700 if not is_group else 701 'org.asamk.Signal.sendGroupMessage'), 702 'string:' + message, 703 'array:string:' + ','.join(attachments), 704 ('string:' + recipient 705 if not is_group else 706 'array:byte:' + b64_to_bytearray(recipient)) 707 ] 708 709 self._dbus_send_signal_cli(args, *proc_args, **proc_kwargs) 710 711 def send_message(self, contact_id, message="", attachments=None): 712 is_group = not is_number(contact_id) 713 714 if attachments is None: 715 attachments = [] 716 attachment_paths = [os.path.expanduser(attachment) for attachment in attachments] 717 if not all(os.path.exists(attachment_path) for attachment_path in attachment_paths): 718 logging.warning('send_message: Attached file(s) does not exist.') 719 return 720 721 timestamp = int(datetime.now().timestamp() * 1000) 722 envelope = { 723 'source': self._username, 724 'target': contact_id, 725 'timestamp': timestamp, 726 'dataMessage': { 727 'message': message, 728 'attachments': attachments, 729 'timestamp': timestamp, 730 }, 731 } 732 733 def after_send_proc_returns(proc): 734 # Remove temproary attachments 735 for attachment in envelope['dataMessage']['attachments']: 736 if attachment.startswith( 737 os.path.join(tempfile.gettempdir(), clip.tempfile_prefix) 738 ): 739 os.remove(attachment) 740 741 # Check if send command succeeded 742 if proc.returncode != 0: 743 if is_group and 'Unregistered user' in proc.output: 744 # Workaround for 745 # https://github.com/AsamK/signal-cli/issues/348 746 # Can't check whether _all_ members in group have uninstalled signal (like in b912161), since in this function we don't have access to group's membership information. 747 output_lines = proc.output.strip().rsplit('\n') 748 timestamp_adj = int(output_lines[0].rsplit(':')[1]) 749 logging.warning("send_message: some group members have uninstalled signal: %s", output_lines[2:]) 750 # 2 lines for error explanation, rest are failed numbers 751 self.callbacks['sending_done'](envelope, 'ignore_receipts', timestamp_adj) 752 else: 753 self.callbacks['sending_done'](envelope, 'send_failed') 754 return 755 756 # Set envelope timestamp to that returned by signal-cli 757 try: 758 timestamp_adj = int(proc.output.rsplit(maxsplit=1)[1]) 759 except (IndexError, AttributeError) as err: 760 logging.error("send_message: Failed to get adjusted envelope timestamp") 761 logging.exception(err) 762 self.callbacks['sending_done'](envelope) 763 else: 764 self.callbacks['sending_done'](envelope, 'sent', timestamp_adj) 765 766 self._send_message_dbus_cmd( 767 message, 768 attachment_paths, 769 contact_id, 770 is_group, 771 callback=after_send_proc_returns, 772 ) 773 774 logging.info('send_message: %s', envelope) 775 self.callbacks['sending_message'](envelope) 776 777 def rename_contact(self, contact_id, new_name, is_group=False, *proc_args, **proc_kwargs): 778 """Rename a contact or group. 779 780 If a contact does not exist, it will be created. Changes to groups are sent to the server, changes to individual contacts are local. 781 """ 782 783 if not is_group: 784 args = [ 785 "org.asamk.Signal.setContactName", 786 "string:" + contact_id, 787 "string:" + new_name, 788 ] 789 else: 790 args = [ 791 "org.asamk.Signal.updateGroup", 792 "array:byte:" + b64_to_bytearray(contact_id), 793 "string:" + new_name, 794 "array:string:" + '', # members 795 "string:" + '' # avatar 796 ] 797 self._dbus_send_signal_cli(args, *proc_args, **proc_kwargs) 798 799 def get_group_name(self, group_id, callback, *cb_args, **cb_kwargs): 800 def proc_callback(proc): 801 try: 802 name = proc.output.split('\n')[1][11:-1] # Ad-hoc parsing of `dbus-send` output 803 except IndexError: 804 name = group_id[:10] + '[..]' 805 callback(name, *cb_args, **cb_kwargs) 806 args = [ 807 "org.asamk.Signal.getGroupName", 808 "array:byte:" + b64_to_bytearray(group_id) 809 ] 810 self._dbus_send_signal_cli(args, callback=proc_callback) 811 812 def get_group_members(self, group_id, callback, *cb_args, **cb_kwargs): 813 def proc_callback(proc): 814 members_ids = set() 815 for line in proc.output.split('\n')[2:-1]: 816 # Ad hoc parsing of `dbus-send` output 817 phone_num = line[14:-1] 818 members_ids.add(phone_num) 819 callback(members_ids, *cb_args, **cb_kwargs) 820 args = [ 821 "org.asamk.Signal.getGroupMembers", 822 "array:byte:" + b64_to_bytearray(group_id) 823 ] 824 self._dbus_send_signal_cli(args, callback=proc_callback) 825 826 def get_signal_cli_version(self, callback, *cb_args, **cb_kwargs): 827 def proc_callback(proc): 828 version_num = proc.output.split('\n')[1][11:-1] # Ad-hoc parsing of `dbus-send` output 829 version_string = "signal-cli " + version_num 830 callback(version_string, *cb_args, **cb_kwargs) 831 args = ["org.asamk.Signal.version"] 832 self._dbus_send_signal_cli(args, callback=proc_callback) 833 834 @property 835 def is_dbus_service_running(self): 836 args = [ 837 '--dest=org.freedesktop.DBus', 838 '/org/freedesktop/DBus', 839 'org.freedesktop.DBus.ListNames' 840 ] 841 proc = self._dbus_send(args, async_proc=False, capture_output=True, text=True, check=True) 842 signal_cli_str = 'string "org.asamk.Signal"' 843 return signal_cli_str in proc.stdout 844 845 def _run_when_dbus_service_started(self, callback): 846 poll_freq = 1 # seconds between polls 847 def set_alarm(main_loop, _user_data=None): 848 if self.is_dbus_service_running: 849 callback() 850 else: 851 main_loop.set_alarm_in(poll_freq, set_alarm) 852 set_alarm(self.main_loop) 853 854 855# ############################################################################# 856# signal-cli data 857# ############################################################################# 858 859 860class SignalData: 861 def __init__(self, username): 862 self._username = username 863 self._file_path = os.path.join(SIGNALCLI_DATA_FOLDER, self._username) 864 865 if not os.path.exists(self._file_path): 866 self._file_path = os.path.join(SIGNALCLI_LEGACY_DATA_FOLDER, self._username) 867 if not os.path.exists(self._file_path): 868 raise FileNotFoundError(self._username + " does not exist!") 869 870 self._data = None 871 872 def parse_data_file(self): 873 with open(self._file_path) as f: 874 self._data = json.load(f) 875 876 logging.debug("signal-cli account: %s device", "linked" if self.is_linked_device else "master") 877 878 try: 879 indivs = self._get_recipients() 880 except FileNotFoundError: 881 indivs = self._get_recipients_v1() 882 883 groups = [] 884 for g in get_nested(self._data, 'groupStore', 'groups', default=()): 885 if is_group_v2(g): 886 group_id = g['groupId'] 887 cached_name = self._get_group_v2_cache_name(group_id) 888 g['name'] = cached_name or group_id[:10] + '[..]' 889 if g.get('archived') or not g.get('name'): 890 continue 891 g['name'] = strip_non_printable_chars(g['name']) 892 groups.append(g) 893 894 return indivs, groups 895 896 def _get_group_v2_cache_name(self, group_id): 897 # This is a crude hack to extract a group name from the group-cache's protobuf binary file without dealing with protobuf or querying signal-cli's `getGroupName`. 898 # See 899 # https://github.com/AsamK/signal-cli/issues/386 900 cache = None 901 for filename in ( 902 group_id.replace("/", "_"), 903 b64_to_hex_str(group_id) 904 ): 905 # Since signal-cli v0.7.2, group-cache files' names are `group_id`s with `/` replaced with `_`. Existing group-cache filenames (hex-formatted) from v0.7.{0,1} are kept without renaming, so here we have to try both. 906 path = os.path.join(self._file_path + '.d', 'group-cache', filename) 907 try: 908 with open(path, 'br') as fo: 909 cache = fo.read() 910 except FileNotFoundError: 911 logging.debug( 912 "Could not open v2 group cache file\n\t" 913 "group_id = %s\n\t" 914 "filepath = %s", 915 group_id, path 916 ) 917 continue 918 else: 919 break 920 if cache is None: 921 logging.warning( 922 "Could not open v2 group cache file\n\t" 923 "group_id = %s", 924 group_id 925 ) 926 return None 927 cache_part = cache[2:] 928 for name_end_str in (b'"\x00', b'\x1aI'): 929 cache_part = cache_part.partition(name_end_str)[0] 930 if len(cache_part) == len(cache) - 2: 931 return None 932 try: 933 return cache_part.decode() 934 except UnicodeDecodeError: 935 return None 936 937 def _get_recipients(self): 938 recipients_store_file = os.path.join(self._file_path + '.d', 'recipients-store') 939 with open(recipients_store_file) as f: 940 recipients_data = json.load(f) 941 942 ret = [] 943 for recip in recipients_data['recipients']: 944 # Skip contact if the number is `null` 945 if not recip['number']: 946 continue 947 948 # Flatten contact's data dictionary to match signal-cli data file v1 structure 949 try: 950 for key, val in recip['contact'].items(): 951 recip[key] = val 952 except (KeyError, AttributeError): 953 pass 954 955 # Strip bidi chars (see #115) 956 try: 957 recip['name'] = strip_non_printable_chars(recip['name']) 958 except (KeyError, AttributeError): 959 pass 960 961 # Add profile name 962 profile_name = '' 963 for n in ('givenName', 'familyName'): 964 name_part = get_nested(recip, 'profile', n) 965 if name_part: 966 profile_name += name_part 967 recip['profile_name'] = profile_name or None 968 969 ret.append(recip) 970 return ret 971 972 def _get_recipients_v1(self): 973 profile_names = self._get_profile_names_v1() 974 975 indivs = [] 976 for cont in self._data['contactStore']['contacts']: 977 if not cont['number']: 978 continue 979 cont['profile_name'] = profile_names.get(get_contact_id(cont)) 980 # snake_case in key name, so it should never clash with signal-cli's own key names, in camelCase. 981 try: 982 cont['name'] = strip_non_printable_chars(cont['name']) 983 except (KeyError, AttributeError): 984 pass 985 indivs.append(cont) 986 987 return indivs 988 989 def _get_profile_names_v1(self): 990 profile_names = {} 991 try: 992 profiles = self._data['profileStore']['profiles'] 993 except KeyError: 994 return profile_names 995 for prof in profiles: 996 try: 997 num = prof['name'] 998 profile_name = prof['profile']['name'] 999 except (KeyError, TypeError): 1000 continue 1001 profile_names[num] = profile_name 1002 return profile_names 1003 1004 @property 1005 def own_num(self): 1006 return self._data['username'] 1007 1008 @property 1009 def is_linked_device(self): 1010 # Assuming master device must always get `deviceId == 1` 1011 return self._data['deviceId'] != 1 1012 1013 1014class Contact: 1015 1016 # A `Contact` can be either an individual contact or a group. 1017 # This class uses the _record dict with contact's details, which is what is obtained from contactsStore and groupsStore in signal-cli data file's json structure. 1018 1019 def __init__(self, record): 1020 self._record = record 1021 if self.is_group: 1022 self.members_ids = set() 1023 self.member_contacts = set() 1024 1025 def __getattr__(self, attr): 1026 # A helper function to access values in contact's dict `record`. 1027 return self._record.get(attr) 1028 1029 def update_record(self, update_dict): 1030 self._record.update(update_dict) 1031 1032 @property 1033 def is_group(self): 1034 return is_contact_group(self._record) 1035 1036 @property 1037 def is_group_v2(self): 1038 return is_group_v2(self._record) 1039 1040 @property 1041 def id(self): 1042 return get_contact_id(self._record) 1043 1044 @property 1045 def name_or_id(self): 1046 return self.name or self.profile_name or self.id 1047 1048 1049class Contacts: 1050 def __init__(self, sigdata): 1051 self._sigdata = sigdata 1052 self.reload() 1053 1054 def reload(self): 1055 indivs_dicts, groups_dicts = self._sigdata.parse_data_file() 1056 self.indivs = set(Contact(c) for c in indivs_dicts) 1057 self.groups = set(Contact(g) for g in groups_dicts) 1058 self.map = {c.id: c for c in self.indivs | self.groups} 1059 1060 def set_groups_membership(self): 1061 for group in self.groups: 1062 group.members_ids.discard(self._sigdata.own_num) 1063 group.member_contacts = self._get_group_members(group) 1064 # Naming: group.members == group._record['members'] (from signal-cli data) 1065 1066 def _get_group_members(self, group): 1067 members = set() 1068 for mid in group.members_ids: 1069 mem = self.map.get(mid) 1070 if mem is None: 1071 # Some members of a group might not be in my `contacts`, so they have no Contact obj associated with them. 1072 mem = Contact({"number": mid}) 1073 members.add(mem) 1074 return members 1075 1076 def get_by_id(self, contact_id): 1077 return self.map.get(contact_id) 1078 1079 1080# ############################################################################# 1081# chats data 1082# ############################################################################# 1083 1084 1085class Message: 1086 1087 _get_delivery_status = noop 1088 _get_contact = noop 1089 1090 @classmethod 1091 def set_class_functions(cls, get_delivery_status, get_contact): 1092 cls._get_delivery_status = get_delivery_status 1093 cls._get_contact = get_contact 1094 1095 __slots__ = ("envelope", "reactions", "remote_delete") 1096 1097 def __init__(self, envelope): 1098 self.envelope = envelope 1099 1100 def __eq__(self, other_msg): 1101 return self.envelope == other_msg.envelope 1102 1103 def __lt__(self, other_msg): 1104 return self.timestamp < other_msg.timestamp 1105 1106 @property 1107 def timestamp(self): 1108 return get_envelope_time(self.envelope) 1109 1110 @timestamp.setter 1111 def timestamp(self, ts_new): 1112 # NOTE: For Message in Chat, use Chat.adjust_timestamp(), rather then this setter directly, to ensure that Chat remains sorted. 1113 self.envelope['timestamp'] = self.envelope['dataMessage']['timestamp'] = ts_new 1114 1115 @property 1116 def text(self): 1117 if self.mentions: 1118 return self.text_w_mentions() 1119 else: 1120 return get_envelope_msg(self.envelope) 1121 1122 @property 1123 def attachments(self): 1124 return get_envelope_attachments(self.envelope) 1125 1126 @property 1127 def mentions(self): 1128 return get_envelope_mentions(self.envelope) 1129 1130 @property 1131 def delivery_status(self): 1132 if is_envelope_outgoing(self.envelope): 1133 return self._get_delivery_status(self.timestamp).str 1134 else: 1135 return 'received_by_me' 1136 1137 @property 1138 def delivery_status_detailed(self): 1139 return self._get_delivery_status(self.timestamp) 1140 1141 @property 1142 def contact_id(self): 1143 return get_envelope_contact_id(self.envelope) 1144 1145 @property 1146 def sender_num(self): 1147 return get_envelope_sender_id(self.envelope) 1148 1149 @property 1150 def sender(self): 1151 return self._get_contact(self.sender_num) 1152 1153 def add_reaction(self, envelope): 1154 self.reactions = getattr(self, 'reactions', {}) # pylint: disable=attribute-defined-outside-init 1155 # Don't want to add `reactions` attribute to every Message instance; only to those that actually have reactions. 1156 self.reactions[get_envelope_sender_id(envelope)] = envelope 1157 1158 def text_w_mentions(self, bracket_char=''): 1159 # See also: What is the Mention's "length" parameter? 1160 # https://github.com/AsamK/signal-cli/discussions/409 1161 ret = '' 1162 pos = 0 1163 text = get_envelope_msg(self.envelope) 1164 for mention in self.mentions: 1165 contact_num = mention['name'] 1166 contact = self._get_contact(contact_num) 1167 contact_name = contact.name_or_id if contact else contact_num 1168 start = mention['start'] 1169 ret = ''.join(( 1170 ret, 1171 text[pos:start], 1172 bracket_char, 1173 "@", contact_name, 1174 bracket_char, 1175 )) 1176 pos = start + 1 1177 ret += text[pos:] 1178 return ret 1179 1180 1181class Chat(urwid.MonitoredList): 1182 # An `urwid.MonitoredList` is a subclass of a regular `list`, that modifies the "mutating" (modifying `self`) methods, so that they call the `self._modified()` method at the end. 1183 # The `self._modified()` method is set to simply do `pass`, until a callback is assigned to it in ListWalker's __init__. 1184 1185 def index(self, msg): 1186 """More efficient way to locate an object in the sorted list than just using super().index() method. 1187 1188 Since Chat should always be sorted, a member object can be located faster using bisect.""" 1189 1190 try: 1191 msg_last = self[-1] 1192 except IndexError as exc: 1193 # Return "message-not-found" when chat history is blank. 1194 raise ValueError from exc 1195 if msg_last == msg: 1196 # First check the last msg before doing bisect_left, as bisect starts in the middle. See also comment in self.add() 1197 return len(self) - 1 1198 index = bisect.bisect_left(self, msg) 1199 if index != len(self) and self[index] == msg: 1200 return index 1201 raise ValueError 1202 1203 def index_ts(self, timestamp, sender_num=None): 1204 """Return an index of a message in Chat with a given timestamp, from a given phone number""" 1205 1206 def match_test(msg): 1207 return ( 1208 msg.timestamp == timestamp 1209 and 1210 (sender_num is None or msg.sender_num == sender_num) 1211 ) 1212 1213 try: 1214 msg = self[-1] 1215 except IndexError as exc: 1216 # Return "message-not-found" when chat history is blank. 1217 raise ValueError from exc 1218 if match_test(msg): 1219 # First check the last msg before doing bisect_left, as bisect starts in the middle. See also comment in self.add() 1220 return len(self) - 1 1221 dummy_message = Message({'timestamp': timestamp}) 1222 index = bisect.bisect_left(self, dummy_message) 1223 if index != len(self): 1224 msg = self[index] 1225 if match_test(msg): 1226 return index 1227 raise ValueError 1228 1229 def get_msg_for_envelope(self, envelope): 1230 dummy_message = Message(envelope) 1231 index = self.index(dummy_message) 1232 return self[index] 1233 1234 def get_msg_for_timestamp(self, timestamp, sender_num=None): 1235 ind = self.index_ts(timestamp, sender_num) 1236 return self[ind] 1237 1238 def add(self, msg): 1239 try: 1240 msg_last = self[-1] 1241 except IndexError: 1242 # The chat is empty 1243 self.append(msg) 1244 return 1245 if msg_last.timestamp < msg.timestamp: 1246 # Check first if the message should be appended at the end of Chat container. 1247 # This is the case for most of the messages. The exceptions might be the sync messages. 1248 # `bisect` starts searching for the place for new item from the middle of the container, which takes more steps. 1249 self.append(msg) 1250 else: 1251 bisect.insort(self, msg) 1252 1253 def adjust_timestamp(self, msg, timestamp_adj): 1254 """Adjust message's timestamp, ensuring that Chat remains sorted""" 1255 index = self.index(msg) 1256 msg.timestamp = timestamp_adj 1257 1258 # Ensure that Chat remains sorted 1259 # This should rarely be necessary, as signal-cli's timestamp adjustments are small enough (~50ms) to not modify the messages' order. 1260 try: 1261 if self[index-1].timestamp <= msg.timestamp <= self[index+1].timestamp: 1262 return 1263 except IndexError: 1264 # The `msg` is either the last or the only message in Chat. 1265 # In the former case, there's no need to move the msg in chat: if its timestamp moved forwards, the msg is still the latest; if it moved backwards far enough to now come before the previous message, the first clause of the `if` test above would fail, and the `IndexError` would not be raised. 1266 return 1267 logging.debug("Chat: moving msg to maintain sorted history.") 1268 del self[index] 1269 self.add(msg) 1270 1271 1272class Chats: 1273 def __init__(self): 1274 self._dict = collections.defaultdict(Chat) 1275 1276 def __getitem__(self, contact_id): 1277 return self._dict[contact_id] 1278 1279 def get_chat_for_envelope(self, envelope): 1280 return self._dict[get_envelope_contact_id(envelope)] 1281 1282 def get_msg_for_envelope(self, envelope): 1283 try: 1284 chat = self.get_chat_for_envelope(envelope) 1285 msg = chat.get_msg_for_envelope(envelope) 1286 return msg 1287 except (KeyError, ValueError, IndexError) as err: 1288 logging.error("get_msg_for_envelope(): envelope = %s", envelope) 1289 logging.exception(err) 1290 raise ValueError from err 1291 1292 def get_msg_for_timestamp(self, envelope, timestamp, sender_num=None): 1293 chat = self.get_chat_for_envelope(envelope) 1294 return chat.get_msg_for_timestamp(timestamp, sender_num) 1295 1296 def add_envelope(self, envelope): 1297 msg = Message(envelope) 1298 chat = self.get_chat_for_envelope(envelope) 1299 chat.add(msg) 1300 return msg 1301 1302 def add_reaction_envelope(self, envelope): 1303 reaction = get_envelope_reaction(envelope) 1304 try: 1305 msg = self.get_msg_for_timestamp( 1306 envelope, 1307 timestamp=reaction['targetSentTimestamp'], 1308 sender_num=reaction['targetAuthor'] 1309 ) 1310 except ValueError: 1311 logging.error("Message not found for reaction: %s", envelope) 1312 return None 1313 msg.add_reaction(envelope) 1314 return msg 1315 1316 def add_remote_delete_envelope(self, envelope): 1317 try: 1318 msg = self.get_msg_for_timestamp( 1319 envelope, 1320 timestamp=get_envelope_remote_delete(envelope)['timestamp'], 1321 sender_num=get_envelope_sender_id(envelope) 1322 ) 1323 except ValueError: 1324 logging.error("Message not found for remote delete envelope: %s", envelope) 1325 return None 1326 msg.remote_delete = envelope 1327 return msg 1328 1329 def delete_message(self, msg, index=None): 1330 # The `index` is optional, but if known, will save cpu cycles for finding the message in chat. 1331 chat = self.get_chat_for_envelope(msg.envelope) 1332 try: 1333 if index is None: 1334 index = chat.index(msg) 1335 del chat[index] 1336 except (ValueError, IndexError) as err: 1337 logging.info("delete_message(): message not found; envelope = %s", msg.envelope) 1338 logging.exception(err) 1339 raise ValueError from err 1340 1341 def serialize(self): 1342 envelopes = [] 1343 for chat in self._dict.values(): 1344 for msg in chat: 1345 envelope = msg.envelope 1346 if 'typingMessage' in envelope: 1347 continue 1348 envelopes.append(envelope) 1349 try: 1350 envelopes.extend(msg.reactions.values()) 1351 except AttributeError: 1352 pass 1353 try: 1354 # Currently, the "deleted" messages are saved in the history file. 1355 envelopes.append(msg.remote_delete) 1356 except AttributeError: 1357 pass 1358 return envelopes 1359 1360 1361class UnreadCounts(collections.defaultdict): 1362 def __init__(self, *args, **kwargs): 1363 super().__init__(int, *args, **kwargs) 1364 1365 @property 1366 def total(self): 1367 return sum(self.values()) 1368 1369 def serialize(self): 1370 return {contact_id: count for contact_id, count in self.items() if count != 0} 1371 1372 1373class DeliveryStatus: 1374 1375 DelivReadConts = collections.namedtuple('DelivReadConts', ['delivered', 'read']) 1376 1377 class DetailedStatus: 1378 1379 __slots__ = ("str", "when", "grp_memb_remain_un") 1380 1381 def __init__(self, status='', when=0, grp_memb_remain_un=None): 1382 self.str = status 1383 self.when = when 1384 if grp_memb_remain_un: 1385 self.grp_memb_remain_un = DeliveryStatus.DelivReadConts( 1386 *( 1387 set(contacts) if contacts else set() 1388 for contacts in grp_memb_remain_un 1389 ) 1390 ) 1391 1392 def set_grp_memb_status(self, grp_member, status): 1393 try: 1394 grp_memb_remain_un = self.grp_memb_remain_un 1395 except AttributeError: 1396 return None 1397 grp_memb_remaining = getattr(grp_memb_remain_un, status) 1398 try: 1399 grp_memb_remaining.remove(grp_member) 1400 except (KeyError, AttributeError): 1401 # This happens when 'read' receipt arrives before 'delivered', or after getting multiple copies of the same receipt message. 1402 grp_memb_remaining = grp_memb_remain_un.delivered 1403 try: 1404 grp_memb_remaining.remove(grp_member) 1405 except (KeyError, AttributeError): 1406 return None 1407 if not grp_memb_remain_un.delivered and grp_memb_remain_un.read: 1408 return 'delivered' 1409 1410 if status == 'delivered': 1411 remaining_unread = grp_memb_remain_un.read 1412 remaining_unread.add(grp_member) 1413 if grp_memb_remaining: 1414 return None 1415 return status 1416 1417 if any(grp_memb_remain_un): 1418 return None 1419 del self.grp_memb_remain_un 1420 return status 1421 1422 def serialize(self): 1423 ret = [] 1424 for attr in self.__slots__: 1425 val = getattr(self, attr, None) 1426 ret.append(val) 1427 1428 # Skip empty values at the end 1429 for ind, val in enumerate(reversed(ret)): 1430 if val: 1431 if ind != 0: 1432 ret = ret[:-ind] 1433 break 1434 else: 1435 ret = [] 1436 1437 return ret 1438 1439 def _make_markup_map(): # pylint: disable=no-method-argument 1440 status_text = { 1441 # Order matters: 'higher' status can't be 're-set' to a 'lower' one. 1442 '': '<<', 1443 'received_by_me': '>>', 1444 'sending': '', 1445 'send_failed': '✖', 1446 'sent': '✓', 1447 'delivered': '✓✓', 1448 'read': '✓✓', 1449 'ignore_receipts': '✓', 1450 } 1451 max_len = max([len(text) for text in status_text.values()]) 1452 markup_map = {} 1453 for status, text in status_text.items(): 1454 markup_map[status] = ( 1455 ('bold', text) 1456 if status not in ('read', 'ignore_receipts') 1457 else ('strikethrough', text) 1458 ) 1459 return (markup_map, max_len) 1460 1461 MARKUP_MAP, MARKUP_WIDTH = _make_markup_map() 1462 MAX_GROUP_SIZE = 15 1463 1464 def __init__(self): 1465 self._status_map = {} 1466 self._buffered = {} 1467 1468 self._status_order = {key: ind for ind, key in enumerate(self.MARKUP_MAP)} 1469 1470 self.on_status_changed = noop 1471 1472 def get_detailed(self, timestamp): 1473 return self._status_map.get(timestamp, self.DetailedStatus()) 1474 1475 def get_str(self, timestamp): 1476 return self.get_detailed(timestamp).str 1477 1478 def on_receive_receipt(self, envelope): 1479 receipt_contact = get_envelope_sender_id(envelope) 1480 receipt_message = envelope['receiptMessage'] 1481 if receipt_message['isDelivery']: 1482 status = 'delivered' 1483 elif receipt_message['isRead']: 1484 status = 'read' 1485 else: 1486 logging.error('on_receive_receipt: unknown receipt type in envelope %s', envelope) 1487 return 1488 timestamps = receipt_message['timestamps'] 1489 when = receipt_message['when'] 1490 for timestamp in timestamps: 1491 if timestamp not in self._status_map: 1492 # Receipt is received before 'sent' status set (e.g. because receipt received before a `sync` message for a message sent from another device) 1493 self._buffer_receipt(timestamp, status, receipt_contact) 1494 else: 1495 self._set(timestamp, status, when, receipt_contact) 1496 1497 def on_sending_message(self, envelope, group_members=None): 1498 timestamp = get_envelope_time(envelope) 1499 self._set(timestamp, 'sending') 1500 if group_members is not None: 1501 self._set_group_members(timestamp, group_members) 1502 1503 def on_sending_done(self, envelope, status='sent', timestamp_adj=None): 1504 timestamp = get_envelope_time(envelope) 1505 if timestamp not in self._status_map: 1506 logging.error("DeliveryStatus: on_sending_done(): no corresponding timestamp in _status_map for envelope = %s", envelope) 1507 return 1508 self._set(timestamp, status) 1509 if status == 'send_failed': 1510 return 1511 if timestamp_adj is not None: 1512 self._adjust_timestamp(timestamp, timestamp_adj) 1513 1514 def _adjust_timestamp(self, timestamp_orig, timestamp_adj): 1515 self._status_map[timestamp_adj] = self._status_map.pop(timestamp_orig) 1516 1517 def _set(self, timestamp, status, when=None, receipt_contact=None): 1518 curr_status_detailed = self._status_map.setdefault( 1519 timestamp, self.DetailedStatus() 1520 ) 1521 curr_status = curr_status_detailed.str 1522 1523 if self._status_order[status] <= self._status_order[curr_status]: 1524 return 1525 1526 is_group = getattr(curr_status_detailed, 'grp_memb_remain_un', False) 1527 if is_group and receipt_contact is not None: 1528 status = curr_status_detailed.set_grp_memb_status(receipt_contact, status) 1529 if status is None: 1530 return 1531 1532 logging.info("Setting status = `%s` for timestamp = %s", status, timestamp) 1533 curr_status_detailed.str = status 1534 if when is not None: 1535 curr_status_detailed.when = when 1536 self.on_status_changed(timestamp, status) 1537 1538 def _set_group_members(self, timestamp, group_members): 1539 status_detailed = self._status_map[timestamp] 1540 1541 if len(group_members) > self.MAX_GROUP_SIZE: 1542 self._set(timestamp, 'ignore_receipts') 1543 return 1544 1545 status_detailed.grp_memb_remain_un = self.DelivReadConts(set(group_members), set()) 1546 1547 def _buffer_receipt(self, timestamp, status, contact): 1548 logging.debug("DeliveryStatus: buffering timestamp = %s", timestamp) 1549 buffered = self._buffered.setdefault( 1550 timestamp, 1551 self.DelivReadConts( 1552 set(), set() 1553 ) 1554 ) 1555 buffered_contacts = getattr(buffered, status) 1556 buffered_contacts.add(contact) 1557 1558 def process_buffered_receipts(self, timestamp): 1559 buffered = self._buffered.get(timestamp) 1560 if buffered is None: 1561 return 1562 logging.debug("Processing buffered receipts: timestamp = %s, self._buffered = %s", timestamp, self._buffered) 1563 for status in buffered._fields: 1564 buffered_contacts = getattr(buffered, status) or [] 1565 for contact in buffered_contacts: 1566 self._set(timestamp, status, receipt_contact=contact) 1567 del self._buffered[timestamp] 1568 1569 def delete(self, timestamp): 1570 try: 1571 del self._status_map[timestamp] 1572 except KeyError: 1573 pass 1574 1575 def dump(self): 1576 ret = {} 1577 for timestamp, status_detailed in self._status_map.items(): 1578 status_serialized = status_detailed.serialize() 1579 if status_serialized: 1580 ret[timestamp] = status_serialized 1581 return ret 1582 1583 def load(self, status_map): 1584 for timestamp, status_detailed in status_map.items(): 1585 self._status_map[int(timestamp)] = self.DetailedStatus(*status_detailed) 1586 1587 1588class TypingIndicators: 1589 def __init__(self, chats): 1590 self._chats = chats 1591 self._map = {} 1592 self.set_alarm_in = self.remove_alarm = noop 1593 # In some cases the STOPPED typing message is never sent, e.g. when the last key is `backspace`. 1594 # https://github.com/AsamK/signal-cli/issues/458 1595 # So we need to manually set a timeout / alarm to remove the typing indicator in this case. 1596 1597 def on_typing_message(self, envelope): 1598 sender_num = get_envelope_sender_id(envelope) 1599 typing_event = get_nested(envelope, 'typingMessage', 'action') 1600 self.remove(sender_num) 1601 if typing_event == 'STARTED': 1602 self._add(sender_num, envelope) 1603 elif typing_event != 'STOPPED': 1604 logging.error("on_typing_message: unknown `action` type in %s", envelope) 1605 1606 def _add(self, sender_num, envelope): 1607 msg = self._chats.add_envelope(envelope) 1608 alarm = self.set_alarm_in(10, lambda *_: self.remove(sender_num)) 1609 self._map[sender_num] = (msg, alarm) 1610 1611 def remove(self, sender_num): 1612 try: 1613 msg, alarm = self._map.pop(sender_num) 1614 except KeyError: 1615 return 1616 self.remove_alarm(alarm) 1617 try: 1618 self._chats.delete_message(msg) 1619 except ValueError: 1620 logging.info("TypingIndicators: remove: index not found for envelope = %s", msg.envelope) 1621 1622 1623class ChatsData: 1624 def __init__(self, history_file): 1625 self.chats = Chats() 1626 self.unread_counts = UnreadCounts() 1627 self.delivery_status = DeliveryStatus() 1628 self.typing_indicators = TypingIndicators(self.chats) 1629 self._history = history_file 1630 self.current_contact = None 1631 1632 if self._history: 1633 self._load_history() 1634 atexit.register(self._save_history) 1635 1636 @property 1637 def current_chat(self): 1638 if self.current_contact: 1639 return self.chats[self.current_contact.id] 1640 return None 1641 1642 def _save_history(self): 1643 envelopes = self.chats.serialize() 1644 unread_counts = self.unread_counts.serialize() 1645 delivery_status = self.delivery_status.dump() 1646 items = { 1647 'envelopes': envelopes, 1648 'unread_counts': unread_counts, 1649 'delivery_status': delivery_status, 1650 } 1651 1652 class JSONSetEncoder(json.JSONEncoder): 1653 # Using a custom json encoder to encode `set`s from `DeliveryStatus` group_members. 1654 def default(self, o): 1655 try: 1656 return json.JSONEncoder.default(self, o) 1657 except TypeError: 1658 if isinstance(o, set): 1659 return tuple(o) 1660 raise 1661 1662 with open(self._history, 'w') as history_fileobj: 1663 json.dump(items, history_fileobj, ensure_ascii=False, cls=JSONSetEncoder, indent=2) 1664 1665 def _load_history(self): 1666 history_backup_filename = self._history + '.bak' 1667 for history_filename in (self._history, history_backup_filename): 1668 try: 1669 with open(history_filename, 'r') as history_fileobj: 1670 history = json.load(history_fileobj) 1671 except (FileNotFoundError, json.JSONDecodeError) as err: 1672 if isinstance(err, json.JSONDecodeError): 1673 logging.error("History file corrupted, attempting to read from backup.") 1674 continue 1675 else: 1676 break 1677 else: 1678 logging.warning("Could not read history from file.") 1679 return 1680 os.replace(history_filename, history_backup_filename) 1681 # If both `history` and `history.bak` are missing, the line above (amounting to `mv history.bak history.bak`) does not throw an error. 1682 1683 self.delivery_status.load(history.get('delivery_status', {})) 1684 1685 for envelope in history['envelopes']: 1686 if get_envelope_reaction(envelope): 1687 self.chats.add_reaction_envelope(envelope) 1688 elif get_envelope_remote_delete(envelope): 1689 self.chats.add_remote_delete_envelope(envelope) 1690 else: 1691 self.chats.add_envelope(envelope) 1692 1693 self.unread_counts = UnreadCounts(history.get('unread_counts', {})) 1694 1695 1696# ############################################################################# 1697# urwid palette 1698# ############################################################################# 1699 1700 1701PALETTE = [ 1702 ('bold', 'bold', ''), 1703 ('italic', 'italics', ''), 1704 ('bolditalic', 'bold,italics', ''), 1705 ('strikethrough', 'strikethrough', ''), 1706] 1707 1708REVERSED_FOCUS_MAP = { 1709 None: 'reversed', 1710} 1711 1712 1713def _fill_palette(): 1714 global PALETTE, REVERSED_FOCUS_MAP # pylint: disable=global-statement 1715 palette_reversed = [] 1716 for item in PALETTE: 1717 name, fg = item[0:2] 1718 name_rev = '_'.join(('reversed', name)) 1719 fg_rev = ','.join(('standout', fg)) 1720 palette_reversed.append((name_rev, fg_rev, '')) 1721 REVERSED_FOCUS_MAP[name] = name_rev 1722 PALETTE.extend(palette_reversed) 1723 PALETTE.append(('reversed', 'standout', '')) 1724 PALETTE.append(('line_focused', 'dark blue', '')) 1725 1726 1727_fill_palette() 1728 1729 1730class Color: 1731 1732 SIGNAL_COLORS_PALETTE = [ 1733 ('pink', 'dark magenta', '', None, '#f08', None), 1734 ('red', 'dark red', '', None, '#f00', None), 1735 ('orange', 'brown', '', None, '#f60', None), 1736 ('purple', 'dark magenta', '', None, '#a0f', None), 1737 ('indigo', 'dark blue', '', None, '#60f', None), 1738 ('blue_grey', 'brown', '', None, '#680', None), 1739 ('ultramarine', 'dark blue', '', None, '#06f', None), 1740 ('blue', 'dark cyan', '', None, '#06a', None), 1741 ('teal', 'dark cyan', '', None, '#086', None), 1742 ('green', 'dark green', '', None, '#0a0', None), 1743 ('light_green', 'dark green', '', None, '#0d0', None), 1744 ('brown', 'brown', '', None, '#880', None), 1745 ('grey', 'light gray', '', None, 'g52', None), 1746 ] 1747 1748 # The colors are defined in ..? 1749 # Signal-Android/app/src/main/java/org/thoughtcrime/securesms/contacts/avatars/ContactColorsLegacy.java 1750 # Signal-Android/app/src/main/res/values/material_colors.xml 1751 # Using `dark ...` colors, because many terminals show `light ...` as `bold`: 1752 # "Some terminals also will display bright colors in a bold font even if you don’t specify bold." 1753 # https://urwid.readthedocs.io/en/latest/manual/displayattributes.html#bold-underline-standout 1754 1755 HIGH_COLOR_RE = re.compile(r""" 1756 \#[0-9A-Fa-f]{3} 1757 | 1758 g\#[0-9A-Fa-f]{2} 1759 | 1760 g[0-9]{1,3} 1761 | 1762 h[0-9]{1,3} 1763 """, re.VERBOSE) 1764 # https://urwid.readthedocs.io/en/latest/reference/attrspec.html#urwid.AttrSpec 1765 1766 def __init__(self, args_color): 1767 self._args_color = args_color 1768 self.high_color_mode = False 1769 self._colors = self._set_color_palette() 1770 1771 def _exit(self): 1772 sys.exit("ERROR: could not parse the `color` argument: " + repr(self._args_color)) 1773 1774 def _is_high_color(self, color_str): 1775 # Test if `color_str` is a "high-color" (256 colors) value 1776 return self.HIGH_COLOR_RE.fullmatch(color_str) 1777 1778 def _add_palette_entry(self, name, val): 1779 if self._is_high_color(val): 1780 PALETTE.append((name, '', '', None, val, None)) 1781 self.high_color_mode = True 1782 else: 1783 PALETTE.append((name, val, '')) 1784 1785 def _set_color_palette(self): 1786 if self._args_color == 'high': 1787 self.high_color_mode = True 1788 1789 if self._args_color is True or self._args_color == 'high': 1790 PALETTE.extend(self.SIGNAL_COLORS_PALETTE) 1791 return self._args_color 1792 1793 try: 1794 color_spec = json.loads(self._args_color) 1795 except (TypeError, json.decoder.JSONDecodeError): 1796 self._exit() 1797 1798 if isinstance(color_spec, list) and len(color_spec) == 2: 1799 for sent_or_recv, col in zip( 1800 ('sent_color', 'recv_color'), 1801 color_spec, 1802 ): 1803 self._add_palette_entry(sent_or_recv, col) 1804 return color_spec 1805 elif isinstance(color_spec, dict): 1806 PALETTE.extend(self.SIGNAL_COLORS_PALETTE) 1807 # Adding a tuple to PALETTE that already has a tuple with the same "name" (i.e. the first item in tuple) overrides the old tuple. 1808 override_dict = {} 1809 for key, val in color_spec.items(): 1810 self._add_palette_entry(key, val) 1811 if is_number(key): 1812 override_dict[key] = key # sic 1813 return override_dict 1814 else: 1815 return self._exit() # `return` just to make pylint happy 1816 1817 def for_message(self, msg): 1818 try: 1819 return self._colors[msg.sender_num] 1820 except (TypeError, KeyError): 1821 pass 1822 if isinstance(self._colors, list): 1823 if is_envelope_outgoing(msg.envelope): 1824 return 'sent_color' 1825 else: 1826 return 'recv_color' 1827 if is_envelope_outgoing(msg.envelope): 1828 return 'default' 1829 try: 1830 return msg.sender.color 1831 except (TypeError, AttributeError): 1832 # In case `sender` is not in `Contacts` 1833 return 'default' 1834 1835 1836# ############################################################################# 1837# ui utility 1838# ############################################################################# 1839 1840 1841def markup_to_text(markup): 1842 # This is useful when we have only the markup; if we have the urwid.Text instance, can use its `.text` property instead. 1843 # Not currently used anywhere. 1844 if isinstance(markup, str): 1845 return markup 1846 elif isinstance(markup, tuple): 1847 return markup[1] 1848 else: 1849 return ''.join([markup_to_text(t) for t in markup]) 1850 1851 1852def get_text_markup(text_widget): 1853 """Get urwid.Text widget text, in markup format. 1854 1855 Like urwid.Text.get_text(), but returns a text markup that can be passed on to urwid.Text.set_text() or to urwid.Text() for creating a new text object""" 1856 1857 text, display_attributes = text_widget.get_text() 1858 if not display_attributes: 1859 return text 1860 markup = [] 1861 run_len_pos = 0 1862 for attr, attr_run_len in display_attributes: 1863 attr_run_end = run_len_pos + attr_run_len 1864 markup.append((attr, text[run_len_pos:attr_run_end])) 1865 run_len_pos = attr_run_end 1866 if run_len_pos != len(text): 1867 markup.append(text[run_len_pos:]) 1868 return markup 1869 1870 1871def listbox_set_body(listbox, body_new): 1872 # Can't just do `listbox.body = body_new`: 1873 # https://github.com/urwid/urwid/issues/428 1874 # pylint: disable=protected-access 1875 if body_new is listbox.body: 1876 return 1877 urwid.disconnect_signal(listbox.body, "modified", listbox._invalidate) 1878 listbox.body = body_new 1879 urwid.connect_signal(listbox.body, "modified", listbox._invalidate) 1880 1881 1882class LineBoxHighlight(urwid.WidgetWrap): 1883 def __init__(self, w, title=''): 1884 box_w = urwid.AttrMap( 1885 urwid.LineBox( 1886 urwid.AttrMap(w, ''), # need to set a "default" attribute, to not color all the contents in `w` 1887 title_align='center', 1888 title=title 1889 ), 1890 None, 1891 focus_map='line_focused', 1892 ) 1893 super().__init__(box_w) 1894 1895 1896class PopUpBox(urwid.WidgetWrap): 1897 1898 signals = ['closed'] 1899 1900 def __init__(self, widget, title='', buttons=True): 1901 self._buttons = buttons 1902 if buttons: 1903 def handle_click(_button): 1904 self._emit('closed') 1905 btn_close = urwid.Padding(urwid.Button('Close', on_press=handle_click), align='center', width=9) 1906 self._frame_w = urwid.Frame(widget, footer=btn_close, focus_part='footer') 1907 box_w = urwid.LineBox(self._frame_w, title) 1908 else: 1909 box_w = urwid.LineBox(widget) 1910 super().__init__(box_w) 1911 1912 def keypress(self, size, key): 1913 key = super().keypress(size, key) 1914 if key in ('esc', 'q'): 1915 self._emit('closed') 1916 elif key in ('tab', 'shift tab'): 1917 if self._buttons: 1918 if self._frame_w.focus_position == 'footer': 1919 self._frame_w.focus_position = 'body' 1920 else: 1921 self._frame_w.focus_position = 'footer' 1922 else: 1923 return key 1924 return None 1925 1926 1927class FocusableText(urwid.WidgetWrap): 1928 def __init__(self, markup, attr_map=None, **kwargs): 1929 self._text_w = urwid.Text(markup, **kwargs) 1930 w = urwid.AttrMap(self._text_w, attr_map, focus_map=REVERSED_FOCUS_MAP) 1931 1932 super().__init__(w) 1933 1934 def selectable(self): 1935 # Setting class variable `_selectable = True` does not work. Probably gets overwritten by the base class constructor. 1936 return True 1937 1938 def keypress(self, _size, key): # pylint: disable=no-self-use 1939 # When reimplementing selectable(), have to redefine keypress() too. 1940 # https://urwid.readthedocs.io/en/latest/reference/widget.html#urwid.Widget.selectable 1941 return key 1942 1943 def __getattr__(self, attr): 1944 return getattr(self._text_w, attr) 1945 1946 1947class LazyEvalListWalker(urwid.ListWalker): 1948 1949 """A ListWalker that creates widgets only as they come into view. 1950 1951 This ListWalker subclass saves resources by deferring widgets creation until they are actually visible. For large `contents` list, most of the items might not be viewed in a typical usage. 1952 1953 "If you need to display a large number of widgets you should implement your own list walker that manages creating widgets as they are requested and destroying them later to avoid excessive memory use." 1954 https://urwid.readthedocs.io/en/latest/manual/widgets.html#list-walkers 1955 """ 1956 1957 def __init__(self, contents, eval_func, init_focus_pos=0): 1958 if not getattr(contents, '__getitem__', None): 1959 raise urwid.ListWalkerError("ListWalker expecting list like object, got: %r" % (contents,)) 1960 self._init_focus_pos = init_focus_pos 1961 self._eval_func = eval_func 1962 self.contents = contents 1963 super().__init__() # Not really needed, just here to make pylint happy. 1964 1965 @property 1966 def contents(self): 1967 return self._contents 1968 1969 @contents.setter 1970 def contents(self, contents_new): 1971 self._remove_contents_modified_callback() 1972 self._contents = contents_new 1973 self._set_contents_modified_callback(self._modified) 1974 1975 if self._init_focus_pos < 0: 1976 self.focus = max(0, len(self.contents) + self._init_focus_pos) 1977 else: 1978 self.focus = self._init_focus_pos 1979 1980 self._modified() 1981 1982 def _set_contents_modified_callback(self, callback): 1983 try: 1984 self.contents.set_modified_callback(callback) 1985 except AttributeError: 1986 logging.warning( 1987 "Changes to object will not be automatically updated: %s", 1988 textwrap.shorten(str(self.contents), 150), 1989 ) 1990 1991 def _remove_contents_modified_callback(self): 1992 try: 1993 self.contents.set_modified_callback(noop) 1994 except AttributeError: 1995 pass 1996 1997 def _modified(self): 1998 if self.focus >= len(self.contents): 1999 # Making sure that if after some items are removed from `contents` it becomes shorter then the current `focus` position, we don't crash. 2000 self.focus = max(0, len(self.contents) - 1) 2001 super()._modified() 2002 2003 def __getitem__(self, position): 2004 item = self.contents[position] 2005 widget = self._eval_func(item, position) 2006 return widget 2007 2008 def next_position(self, position): 2009 if position >= len(self.contents) - 1: 2010 raise IndexError 2011 return position + 1 2012 2013 def prev_position(self, position): # pylint: disable=no-self-use 2014 if position <= 0: 2015 raise IndexError 2016 return position - 1 2017 2018 def set_focus(self, position): 2019 if position < 0 or position >= len(self.contents): 2020 # NOTE: there is crash in this method that I can not reliably recproduce: 2021 # Happens when I start a search through message widgets w `/` and mash the keyboard. Seems to only happen if I push many keys fast enough.. 2022 # This might well be an urwid bug 2023 raise IndexError 2024 self.focus = position 2025 self._modified() 2026 2027 def positions(self, reverse=False): 2028 ret = range(len(self.contents)) 2029 if reverse: 2030 ret = reversed(ret) 2031 return ret 2032 2033 2034class ListBoxPlus(urwid.ListBox): 2035 2036 """ListBox plus a few useful features. 2037 2038 - Vim bindings for common motions: j, k, g, G, ctrl+n/p. 2039 - Filter visible contents to the items passing test by a given function. 2040 - Updates to new `contents` are displayed automatically. Fixes an urwid bug (see listbox_set_body function). 2041 """ 2042 2043 _KEY_EQUIVS = { 2044 'k': 'up', 2045 'j': 'down', 2046 'g': 'home', 2047 'G': 'end', 2048 'ctrl p': 'up', 2049 'ctrl n': 'down', 2050 } 2051 2052 def __init__(self, body=None): 2053 if body is None: 2054 body = [] 2055 super().__init__(body) 2056 self._contents_pre_filter = self.contents 2057 2058 def _get_contents(self): 2059 try: 2060 return self.body.contents 2061 except AttributeError: 2062 return self.body 2063 2064 def _set_contents(self, contents_new): 2065 # This method does not change the self._contents_pre_filter, unlike self._set_contents_pre_filter() 2066 try: 2067 self.body.contents = contents_new 2068 except AttributeError: 2069 listbox_set_body(self, contents_new) 2070 2071 def _set_contents_pre_filter(self, contents_new): 2072 if type(contents_new) is list: # pylint: disable=unidiomatic-typecheck 2073 # If contents_new is a `list` (not one of the `ListWalker`s), make the new body the same type as the original (e.g. SimpleListWalker) 2074 # Shouldn't use `if isinstance(contents_new, list)` test: a ListWalker returns `True` for it too. 2075 contents_new = type(self.contents)(contents_new) 2076 self._set_contents(contents_new) 2077 self._contents_pre_filter = self.contents 2078 2079 contents = property(_get_contents, _set_contents_pre_filter) 2080 # Would be nice to override the base class's `body` property, so that this class can be easily replaced by any other `ListWalker`s. 2081 # However, overriding a property which is used in superclass's __init__ seems problematic. Need a way to delay the assignment of property. Maybe something like this is necessary: 2082 # https://code.activestate.com/recipes/408713-late-binding-properties-allowing-subclasses-to-ove/ 2083 2084 def try_set_focus(self, index): 2085 if index < 0: 2086 index = len(self.contents) + index 2087 try: 2088 self.focus_position = index 2089 except IndexError: 2090 pass 2091 2092 def filter_contents(self, test_function, scope=None): 2093 """Remove widgets not passing `test_function`. 2094 2095 Retain only the items in `self.contents` that return `True` when passed as arguments to `test_function`. Pre-filtered `contents` is stored before filtering and can be restored by running `filter_contents` again with `test_function=None`. 2096 The `scope` argument specifies the itarable to apply the filter to. By default, the scope is all the pre-filtered items. Passing `scope=self.contents' can be useful to further filter an already filtered contents. 2097 """ 2098 2099 # Note that if `contents` is modified directly elsewhere in the code while a filter is on, this modification applies only to the filtered contents. So, for instance the code for adding a new MessageWidget to ChatView shouldn't do `self.contents.append()`, but rather `current_chat.append()` (after doing `_set_contents_pre_filter(current_chat)`). That way the new msg will show up after the filter is removed. 2100 # Alternatively, can do `self._contents_pre_filter.append()`. That should work fine either with filter on or off. 2101 2102 if scope is None: 2103 scope = self._contents_pre_filter 2104 if test_function is None: 2105 self._set_contents(scope) 2106 else: 2107 contents_type = type(self.contents) 2108 matching_widgets = contents_type([w for w in scope if test_function(w)]) 2109 self._set_contents(matching_widgets) 2110 2111 @property 2112 def is_filter_on(self): 2113 return self.contents is not self._contents_pre_filter 2114 2115 def move_item(self, w, pos, pos_in_prefilter=None): 2116 def try_move(seq, w, pos): 2117 try: 2118 ind = seq.index(w) 2119 except ValueError: 2120 # Widget might be absent from `body` e.g. while doing a search on contacts, or if the contact is 'new' (i.e. not in Contacts yet) 2121 return 2122 if ind == pos: 2123 return 2124 seq.insert(pos, seq.pop(ind)) 2125 2126 try_move(self.contents, w, pos) 2127 2128 if self.is_filter_on: 2129 if pos_in_prefilter is None: 2130 pos_in_prefilter = pos 2131 try_move(self._contents_pre_filter, w, pos_in_prefilter) 2132 2133 def keypress(self, size, key): 2134 key = super().keypress(size, key) 2135 key_equiv = self._KEY_EQUIVS.get(key) 2136 if key_equiv: 2137 return super().keypress(size, key_equiv) 2138 return key 2139 2140 2141# ############################################################################# 2142# contacts widgets 2143# ############################################################################# 2144 2145 2146class ContactWidget(FocusableText): 2147 2148 SEND_FAILED_MARKUP = '✖' 2149 NOTE_TO_SELF_MARKUP = ('italic', ' (Self)') 2150 GROUP_MARKUP = ('italic', ' [GRP]') 2151 HIGHLIGHT_MARKUP_ATTR = 'bold' 2152 2153 def __init__(self, contact): 2154 self.contact = contact 2155 self._fail_mark_set = False 2156 self._highlight = False 2157 self._unread_count = 0 2158 self._name_markup = self._get_name_markup() 2159 super().__init__(self._name_markup) 2160 2161 def _get_name_markup(self): 2162 markup = [] 2163 name = self.contact.name 2164 if name: 2165 markup.append(name) 2166 else: 2167 name = self.contact.id 2168 markup.append(name) 2169 if name == cfg.username: 2170 markup.append(self.NOTE_TO_SELF_MARKUP) 2171 else: 2172 profile_name = self.contact.profile_name 2173 if profile_name: 2174 markup.append(('italic', ' ~' + profile_name)) 2175 if self.contact.is_group and not cfg.partition_contacts: 2176 markup.append(self.GROUP_MARKUP) 2177 return markup 2178 2179 @property 2180 def unread_count(self): 2181 return self._unread_count 2182 2183 @unread_count.setter 2184 def unread_count(self, count): 2185 if count == self._unread_count: 2186 return 2187 self._unread_count = count 2188 unread_count_markup = '' if not count else [('bold', f"({count})"), " "] 2189 self.set_text([unread_count_markup, self._name_markup]) 2190 2191 @property 2192 def fail_mark_set(self): 2193 return self._fail_mark_set 2194 2195 @fail_mark_set.setter 2196 def fail_mark_set(self, true_false): 2197 if self._fail_mark_set == true_false: 2198 return 2199 self._fail_mark_set = true_false 2200 fail_markup = [self.SEND_FAILED_MARKUP, " "] if self._fail_mark_set else [""] 2201 self.set_text(fail_markup + self._name_markup) 2202 2203 @property 2204 def highlight(self): 2205 return self._highlight 2206 2207 @highlight.setter 2208 def highlight(self, new_val): 2209 if self._highlight == new_val: 2210 return 2211 self._highlight = new_val 2212 highlight_attr = self.HIGHLIGHT_MARKUP_ATTR if self._highlight else None 2213 self.set_text((highlight_attr, self._name_markup)) 2214 # Note: this removes any additional markup the widget had (failed-send, unread count). But that's OK currently, as it's used only on switching between contacts, when those additional things should get cleared anyway. 2215 2216 2217class PartitionedContactsListWalker(urwid.SimpleListWalker): 2218 """Ensure that when `partition_contacts == True` only the ContactWidget objects can be in focus (not the headers or divider widgets). 2219 2220 If there are no ContactWidget objects it will focus on the last widget in `self.contents`. 2221 """ 2222 2223 def set_focus(self, position): 2224 # Overriding the base class's function to make sure only ContactWidget type objects may be in focus. 2225 # When the widget at `position` is not a ContactWidget, try the ones below it until we find one or reach the end. 2226 for pos in range(position, len(self)): 2227 w = self[pos] 2228 if type(w) is ContactWidget: # pylint: disable=unidiomatic-typecheck 2229 # Check that widget is of exactly ContactWidget type, not one of its base classes. 2230 return super().set_focus(pos) 2231 return None 2232 2233 def set_modified_callback(self, callback): 2234 # Abstract method, inherited from urwid.MonitoredList; has to be overriden in the concrete class. 2235 # See base class's docs: urwid.SimpleListWalker.set_modified_callback 2236 raise NotImplementedError( 2237 'Use connect_signal(list_walker, "modified", ...) instead.' 2238 ) 2239 2240 2241class ContactsListWidget(ListBoxPlus): 2242 signals = ['contact_selected'] 2243 2244 def __init__(self, contacts, chats_data): 2245 super().__init__( 2246 urwid.SimpleListWalker([]) 2247 if not cfg.partition_contacts else 2248 PartitionedContactsListWalker([]) 2249 ) 2250 self._contacts = contacts 2251 self._chats_data = chats_data 2252 self._contact_widgets_map = {} 2253 self.update() 2254 2255 def _get_sorted_contacts(self): 2256 def sorter(contact): 2257 contact_name = contact.name_or_id 2258 if cfg.contacts_sort_alpha: 2259 return contact_name.casefold() 2260 try: 2261 chat = self._chats_data.chats[contact.id] 2262 last_msg = chat[-1] 2263 except (KeyError, IndexError): 2264 return (0, contact_name.casefold()) 2265 return (-last_msg.timestamp, contact_name.casefold()) 2266 2267 if not cfg.partition_contacts: 2268 return sorted(self._contacts.map.values(), key=sorter) 2269 else: 2270 grps = sorted(self._contacts.groups, key=sorter) 2271 cnts = sorted(self._contacts.indivs, key=sorter) 2272 return (grps, cnts) 2273 2274 def update(self): 2275 sorted_contacts = self._get_sorted_contacts() 2276 if not cfg.partition_contacts: 2277 self.contents = [ContactWidget(contact) for contact in sorted_contacts] 2278 self._contact_widgets_map = {w.contact.id: w for w in self.contents} 2279 else: 2280 group_contact_widgets = [ContactWidget(contact) for contact in sorted_contacts[0]] 2281 indiv_contact_widgets = [ContactWidget(contact) for contact in sorted_contacts[1]] 2282 div_w = urwid.Divider('-') 2283 group_cont_section_title = urwid.Text(('bold', '~~ Groups ~~'), align='center') 2284 indiv_cont_section_title = urwid.Text(('bold', '~~ Contacts ~~'), align='center') 2285 widgets = ( 2286 [group_cont_section_title, div_w] 2287 + group_contact_widgets 2288 + [div_w, indiv_cont_section_title, div_w] 2289 + indiv_contact_widgets 2290 ) 2291 self._indiv_header_w = indiv_cont_section_title # Used in _move_contact_top() for getting its index position 2292 self.contents = widgets 2293 self._contact_widgets_map = {w.contact.id: w for w in group_contact_widgets + indiv_contact_widgets} 2294 self._set_all_ws_unread_counts() 2295 try: 2296 self._get_current_contact_widget().highlight = True 2297 except AttributeError: # current_contact is None 2298 pass 2299 self.try_set_focus(0) 2300 2301 def _set_all_ws_unread_counts(self): 2302 for contact_id, contact_widget in self._contact_widgets_map.items(): 2303 unread_count = self._chats_data.unread_counts.get(contact_id, 0) 2304 if unread_count: 2305 contact_widget.unread_count = unread_count 2306 2307 def update_contact_unread_count(self, contact_id): 2308 contact_widget = self._contact_widgets_map.get(contact_id) 2309 if contact_widget is not None: 2310 # The widget is None if received a msg from a 'new' contact (one not in the read signal-cli's data file) 2311 contact_widget.unread_count = self._chats_data.unread_counts[contact_id] 2312 2313 def on_new_message(self, msg): 2314 contact_widget = self._contact_widgets_map.get(msg.contact_id) 2315 if not cfg.contacts_sort_alpha and contact_widget is not None: 2316 self._move_contact_top(contact_widget) 2317 2318 def on_sending_done(self, envelope, status='sent', _timestamp_adj=None): 2319 # Show a "send failed" symbol next to the contact, but not if it's the "current" contact (whose chat is opened). 2320 if status != 'send_failed': 2321 return 2322 current_contact = self._chats_data.current_contact 2323 envelope_contact_id = get_envelope_contact_id(envelope) 2324 if current_contact.id == envelope_contact_id: 2325 return 2326 contact_widget = self._contact_widgets_map[envelope_contact_id] 2327 contact_widget.fail_mark_set = True 2328 2329 def _move_contact_top(self, w): 2330 pos_in_prefilter = None 2331 if not cfg.partition_contacts: 2332 pos_new = 0 2333 else: 2334 if w.contact.is_group: 2335 pos_new = 2 2336 elif not self.is_filter_on: 2337 pos_new = len(self._contacts.groups) + 5 # 2 for "Groups" header and 3 for "Contacts" 2338 else: 2339 pos_new = self.contents.index(self._indiv_header_w) + 2 2340 pos_in_prefilter = len(self._contacts.groups) + 5 2341 self.move_item(w, pos_new, pos_in_prefilter) 2342 self.focus_position = pos_new 2343 2344 def _get_current_contact_widget(self): 2345 current_contact = self._chats_data.current_contact 2346 if current_contact is None: 2347 return None 2348 return self._contact_widgets_map[current_contact.id] 2349 2350 def _unhighlight_current_contact_widget(self): 2351 # Remove highlighting from the "current" (not for long) contact's widget. 2352 try: 2353 self._get_current_contact_widget().highlight = False 2354 except AttributeError: # current_contact is None 2355 pass 2356 2357 def _select_focused_contact(self, focus_widget=None): 2358 # The `focus_widget` parameter is passed through from caller to emit_signal. It specifies whether the focus should be set on `input`, `chat` or `contacts` widgets after switching to a new contact. 2359 focused_contact_w = self.focus 2360 if focused_contact_w is None: 2361 # This can happen e.g. when searching through contacts returns no results. 2362 return 2363 if cfg.partition_contacts and not isinstance(focused_contact_w, ContactWidget): 2364 # Widget in focus is urwid.Text (a header) or urwid.Divider. They are normally not supposed to get the focus, but sometimes may: e.g. after pressing `home`, or after doing a search with `/`, or when there are no other widgets (e.g. no search results). 2365 return 2366 contact = focused_contact_w.contact 2367 focused_contact_w.fail_mark_set = False 2368 self._unhighlight_current_contact_widget() 2369 urwid.emit_signal(self, 'contact_selected', contact, focus_widget) 2370 focused_contact_w.highlight = True 2371 2372 def select_next_contact(self, reverse=False): 2373 current_contact = self._chats_data.current_contact 2374 if current_contact == self.focus.contact or current_contact is None: 2375 curr_position = self.focus_position 2376 else: 2377 contact_w = self._contact_widgets_map[current_contact.id] 2378 curr_position = self.contents.index(contact_w) 2379 try: 2380 focus_position_new = ( 2381 self.body.next_position(curr_position) 2382 if not reverse else 2383 self.body.prev_position(curr_position) 2384 ) 2385 except IndexError: 2386 return 2387 #focus_position_new = self.focus_position - int((reverse - 0.5) * 2) # Alternative way of obtaining the new position 2388 if (cfg.partition_contacts 2389 and reverse 2390 and not isinstance(self.contents[focus_position_new], ContactWidget) 2391 and focus_position_new != 1): 2392 # Jumping over the `~~ Contacts ~~` header when going up. 2393 focus_position_new -= 3 2394 try: 2395 self.set_focus(focus_position_new, coming_from='below' if reverse else 'above') 2396 except IndexError: 2397 return 2398 self._select_focused_contact() 2399 2400 def keypress(self, size, key): 2401 key = super().keypress(size, key) 2402 if key == 'enter': 2403 self._select_focused_contact(focus_widget='input') 2404 elif key == 'l': 2405 self._select_focused_contact() 2406 else: 2407 return key 2408 return None 2409 2410 2411class ContactsWindow(urwid.Frame): 2412 def __init__(self, contacts, chats_data): 2413 self.contacts_list_w = ContactsListWidget(contacts, chats_data) 2414 self._wsearch = Edit(('bold', '/ ')) 2415 2416 urwid.connect_signal(self._wsearch, 'postchange', self._on_search_text_changed) 2417 2418 super().__init__(self.contacts_list_w, footer=None) 2419 2420 if not cfg.partition_contacts: 2421 self.header = urwid.Pile([ 2422 urwid.Text(('bold', 'Contacts'), align='center'), 2423 urwid.Divider('-') 2424 ]) 2425 2426 def _start_search(self): 2427 self.footer = self._wsearch 2428 self.focus_position = 'footer' 2429 2430 def _remove_search(self): 2431 self._wsearch.set_edit_text('') 2432 self.focus_position = 'body' 2433 self.footer = None 2434 2435 def _on_search_text_changed(self, input_w, _old_text): 2436 def match_test(contact_w): 2437 try: 2438 contact = contact_w.contact 2439 except AttributeError: 2440 # Keep the `partition_contacts` headers / dividers 2441 return True 2442 return ( 2443 txt.casefold() in contact.name_or_id.casefold() 2444 or 2445 not contact.is_group and txt in contact.id 2446 ) 2447 txt = input_w.get_edit_text() 2448 match_test = None if not txt else match_test 2449 self.contacts_list_w.filter_contents(match_test) 2450 2451 def keypress(self, size, key): 2452 key = super().keypress(size, key) 2453 if key == '/': 2454 self._start_search() 2455 elif key == 'enter' and self.focus_position == 'footer': 2456 self.focus_position = 'body' 2457 self.contacts_list_w.try_set_focus(0) 2458 elif key == 'esc': 2459 self._remove_search() 2460 else: 2461 return key 2462 return None 2463 2464 2465# ############################################################################# 2466# input line 2467# ############################################################################# 2468 2469 2470class CommandsHistory: 2471 def __init__(self): 2472 self._history = [] 2473 self._index = 0 2474 self._stashed_input = None 2475 2476 def prev(self, curr_input): 2477 if (curr_input != self._stashed_input 2478 and self._history 2479 and curr_input != self._history[self._index]): 2480 # This check fixes the following unexpected behavior: 2481 # Type `:whatev`, press `up` a few times, then delete the input with e.g. `backspace`. Next time the history will be looked up from where it's been left this time. 2482 self._index = 0 2483 if self._index == 0: 2484 self._stashed_input = curr_input 2485 self._index -= 1 2486 try: 2487 return self._history[self._index] 2488 except IndexError: 2489 self._index += 1 2490 return curr_input 2491 2492 def next(self, curr_input): 2493 if self._index == 0: 2494 return curr_input 2495 self._index += 1 2496 if self._index == 0: 2497 return self._stashed_input 2498 return self._history[self._index] 2499 2500 def add(self, cmd): 2501 self._history.append(cmd) 2502 self._index = 0 2503 2504 2505class BracketedPasteEdit(Edit): 2506 def __init__(self, *args, multiline=False, **kwargs): 2507 super().__init__(*args, multiline=True, **kwargs) 2508 # Using `multiline=True` in super() and then passing on 'enter' keypress to it. A nicer alternative would be to pass '\n', but Edit does not handle it. 2509 self._multiline_arg = multiline 2510 self._paste_mode_on = False 2511 2512 def keypress(self, size, key): 2513 if key == 'begin paste': 2514 self._paste_mode_on = True 2515 elif key == 'end paste': 2516 self._paste_mode_on = False 2517 elif key == 'enter' and not (self._multiline_arg or self._paste_mode_on): 2518 return key 2519 elif key == 'meta enter': 2520 # Allow inserting new lines with Alt+Enter. This is not a part of "bracketed paste mode" functionality. 2521 return super().keypress(size, 'enter') 2522 else: 2523 return super().keypress(size, key) 2524 return None 2525 2526 2527class InputLine(BracketedPasteEdit): 2528 def __init__(self, **kwargs): 2529 self._cmd_history = CommandsHistory() 2530 self._cmds = None 2531 self._prompt = ('bold', '> ') # In urwid's parlance, this is called 'caption'. 2532 super().__init__(self._prompt, **kwargs) 2533 2534 def set_cmds(self, cmds): 2535 self._cmds = cmds 2536 2537 def _set_edit_text_move_cursor(self, txt, cursor_pos=-1): 2538 """Edit.set_edit_text() + Edit.set_edit_pos() 2539 2540 Like Edit.insert_text(), but istead of adding to the current edit_text, replace it with the provided argument. 2541 """ 2542 self.set_edit_text(txt) 2543 if cursor_pos == -1: 2544 cursor_pos = len(txt) 2545 self.set_edit_pos(cursor_pos) 2546 2547 def auto_complete_commands(self, txt): 2548 # See also: there is an autocomplete in rr-/urwid_readline 2549 splitted_txt = txt.split(' ') 2550 if len(splitted_txt) > 1: 2551 path, *messages = split_path(' '.join(splitted_txt[1:])) 2552 2553 # Check we are trying to complete a path 2554 if len(messages) > 0 or not is_path(path): 2555 return 2556 2557 fullpath = os.path.expanduser(path) 2558 dirname = os.path.dirname(fullpath) 2559 if not os.path.isdir(dirname): 2560 return 2561 2562 possible_paths = [x for x in os.listdir(dirname) if os.path.join(dirname, x).startswith(fullpath)] 2563 commonprefix = os.path.commonprefix(possible_paths) 2564 2565 action_request.set_status_line('|'.join(possible_paths)) 2566 2567 completion = '' 2568 if commonprefix != '': 2569 completion = os.path.join(os.path.dirname(path), commonprefix) 2570 if os.path.isdir(os.path.expanduser(completion)) and not completion.endswith('/'): 2571 completion = completion + '/' 2572 if ' ' in completion: 2573 completion = '"' + completion + '"' 2574 2575 if completion != '': 2576 self._set_edit_text_move_cursor(splitted_txt[0] + ' ' + completion) 2577 else: 2578 all_commands = [ 2579 cmd 2580 for cmd in [tupl[0][0] for tupl in self._cmds.cmd_mapping] 2581 if cmd.lower().startswith(txt[1:].lower()) 2582 ] 2583 commonprefix = os.path.commonprefix(all_commands) 2584 2585 action_request.set_status_line('{' + '|'.join(all_commands) + '}') 2586 2587 if len(all_commands) == 1: 2588 self._set_edit_text_move_cursor(':' + all_commands[0] + ' ') 2589 elif commonprefix != '': 2590 self._set_edit_text_move_cursor(':' + commonprefix) 2591 2592 def _keypress_cmd_mode(self, key, key_orig, txt): 2593 # Called when `txt.startswith(':')` 2594 if key == 'enter': 2595 if txt.strip() == ":": 2596 action_request.set_status_line('Command missing after `:`') 2597 return None 2598 cmd, *args = txt[1:].split(maxsplit=1) 2599 self._cmds.exec(cmd, *args) 2600 self._cmd_history.add(txt) 2601 self.set_edit_text('') 2602 self.set_caption(self._prompt) 2603 elif key == 'tab' and not self.get_edit_text().endswith(' '): 2604 self.auto_complete_commands(txt) 2605 elif key_orig in ('up', 'ctrl p'): 2606 # Since BracketedPasteEdit is based on Edit(multiline=True), the up / down / ctrl+p/n are consumed by the superclass, so need to check `key_orig`, before `super` method call. 2607 prev_cmd = self._cmd_history.prev(txt) # pylint: disable=not-callable; https://github.com/PyCQA/pylint/issues/3970 2608 self._set_edit_text_move_cursor(prev_cmd) 2609 elif key_orig in ('down', 'ctrl n'): 2610 next_cmd = self._cmd_history.next(txt) # pylint: disable=not-callable; https://github.com/PyCQA/pylint/issues/3970 2611 self._set_edit_text_move_cursor(next_cmd) 2612 else: 2613 return key 2614 return None 2615 2616 def keypress(self, size, key): 2617 key_orig = key 2618 key = super().keypress(size, key) 2619 txt = self.get_edit_text() 2620 2621 if not txt or txt.isspace(): 2622 self.set_caption(self._prompt) # restore normal prompt 2623 return key 2624 if txt.startswith(('/', ':')): 2625 self.set_caption('') # set "prompt" to '/' or ':' 2626 if key == 'esc': 2627 self.set_edit_text('') 2628 self.set_caption(self._prompt) 2629 return None 2630 else: 2631 self.set_caption(self._prompt) 2632 if txt.startswith(':'): 2633 return self._keypress_cmd_mode(key, key_orig, txt) 2634 elif key == 'enter': 2635 if txt.startswith('/'): 2636 return key 2637 action_request.send_message_curr_contact(txt) 2638 self.set_edit_text('') 2639 else: 2640 return key 2641 return None 2642 2643 2644# ############################################################################# 2645# conversation widgets 2646# ############################################################################# 2647 2648 2649class MessageWidget(urwid.WidgetWrap): 2650 2651 MAX_ATTACHS_SHOW = 4 2652 2653 TYPING_INDICATOR_MARKUP = '...' 2654 REMOTE_DELETE_MARKUP = ('italic', '[deleted]') 2655 2656 FORMAT_MAP = {'_': 'italic', '*': 'bold', '~': 'strikethrough'} 2657 MENTION_BRACKET_CHAR = chr(31) # arbitrary non-printable char 2658 FORMAT_MENTION = {MENTION_BRACKET_CHAR: 'italic'} 2659 FORMATTING_RE = None 2660 2661 @classmethod 2662 def set_formatting_consants(cls, use_formatting): 2663 if use_formatting: 2664 cls.FORMAT_MAP.update(cls.FORMAT_MENTION) 2665 else: 2666 cls.FORMAT_MAP = cls.FORMAT_MENTION 2667 cls.FORMATTING_RE = re.compile( 2668 # Match text like "_italicised_", where "_" is a char in FORMAT_MAP 2669 rf""" 2670 ( 2671 [{''.join(cls.FORMAT_MAP.keys())}] 2672 ) 2673 #.+? # bad with doubled format chars, e.g. ~~this~~ 2674 #[^\1]+ # can't use backreferences in character class 2675 (?: 2676 (?!\1). # consume a char and check it's not a format char 2677 )+ 2678 \1 2679 """, 2680 re.VERBOSE) 2681 2682 def __init__(self, msg): 2683 self.msg = msg 2684 self._align = ( 2685 'left' 2686 if (not is_envelope_outgoing(self.msg.envelope) 2687 or cfg.one_sided) 2688 else 'right' 2689 ) 2690 msg_markup = self._get_message_markup() 2691 self._text_w = FocusableText(msg_markup or '', align=self._align) 2692 # urwid.Text throws an error if given an empty list for markup. Not sure `msg_markup` can ever end up being empty though. 2693 msg_pad_w = urwid.Padding(self._text_w, self._align, width=cfg.wrap_at) 2694 status_markup = self._get_status_markup() 2695 self._status_w = urwid.Text(status_markup, self._align) 2696 status_w_valign = 'top' if self._align == 'left' else 'bottom' 2697 status_filler_w = urwid.Filler(self._status_w, status_w_valign) 2698 cols = [(DeliveryStatus.MARKUP_WIDTH, status_filler_w), msg_pad_w] 2699 box_columns = [0] 2700 if self._align == 'right': 2701 cols.reverse() 2702 box_columns = [1] 2703 columns_w = urwid.Columns(cols, dividechars=1, box_columns=box_columns) 2704 color = None if not cfg.color else cfg.color.for_message(msg) 2705 display_w = urwid.AttrMap(columns_w, color, focus_map=REVERSED_FOCUS_MAP) 2706 super().__init__(display_w) 2707 self._reactions_w = None 2708 self.update_reactions_w() 2709 2710 def _get_message_markup(self): 2711 if 'typingMessage' in self.msg.envelope: 2712 markups = ( 2713 self._get_sender_markup(), 2714 [self.TYPING_INDICATOR_MARKUP], 2715 ) 2716 elif self.msg.envelope.get('callMessage') is not None: 2717 markups = (self._get_call_message_markup(), ) 2718 elif getattr(self.msg, 'remote_delete', None): 2719 markups = ([self.REMOTE_DELETE_MARKUP], ) 2720 else: 2721 markups = ( 2722 self._get_sender_markup(), 2723 self._get_quote_markup(), 2724 self._get_text_markup(), 2725 self._get_attachments_markup(), 2726 ) 2727 2728 ret = [] 2729 for markup in markups: 2730 if markup: 2731 if ret: 2732 ret.append('\n') 2733 ret.extend(markup) 2734 return ret 2735 2736 def _get_text_markup(self, text=None): 2737 if text is None: 2738 text = self.msg.text 2739 if not text: 2740 return None 2741 if not (cfg.use_formatting or self.msg.mentions): 2742 return [text] 2743 if self.msg.mentions: 2744 text = self.msg.text_w_mentions(bracket_char=self.MENTION_BRACKET_CHAR) 2745 ret = [] 2746 pos = 0 2747 for match in self.FORMATTING_RE.finditer(text): 2748 if pos != match.start(): 2749 # Do not add empty strings. Urwid breaks on markup like: 2750 # [.., ('bold', 'txt1'), '', ('bold', 'txt2'), ...] 2751 ret.append(text[pos : match.start()]) 2752 ret.append((self.FORMAT_MAP[match[1]], match.group()[1:-1])) 2753 pos = match.end() 2754 if pos != len(text): 2755 ret.append(text[pos:]) 2756 return ret 2757 2758 def _get_attachments_markup(self, attachments=None): 2759 if attachments is None: 2760 attachments = self.msg.attachments 2761 if not attachments: 2762 return None 2763 attach_list = [get_attachment_name(attach) for attach in attachments] 2764 if len(attachments) > self.MAX_ATTACHS_SHOW: 2765 attach_list = attach_list[: self.MAX_ATTACHS_SHOW] 2766 attach_list.append(f'... ({len(attachments)-self.MAX_ATTACHS_SHOW} more)') 2767 attach_txt = ', '.join(attach_list) 2768 return ['[attached: ', ('italic', attach_txt), ']'] 2769 2770 def _get_sender_markup(self): 2771 envelope = self.msg.envelope 2772 is_group = is_envelope_group_message(envelope) 2773 if not (is_group or cfg.show_names): 2774 return None 2775 if is_envelope_outgoing(envelope): 2776 return [('bolditalic', 'You')] if cfg.show_names else None 2777 sender_name = action_request.get_contact_name(self.msg.sender_num) 2778 return [('bolditalic', sender_name)] 2779 2780 def _get_quote_markup(self): 2781 quote = get_envelope_quote(self.msg.envelope) 2782 if not quote: 2783 return None 2784 try: 2785 quote_author_num = quote['author'] 2786 quote_text = quote['text'] 2787 quote_attachments = quote['attachments'] 2788 except KeyError: 2789 logging.error("Failed to extract a quote from %s", self.msg.envelope) 2790 return None 2791 author_name = action_request.get_contact_name(quote_author_num) 2792 ret = ['| ', ('bolditalic', author_name)] 2793 if quote_text: 2794 ret.extend(['\n', '| ']) 2795 ret.extend(self._get_text_markup(quote_text)) 2796 if quote_attachments: 2797 ret.extend(['\n', '| ']) 2798 ret.extend(self._get_attachments_markup(quote_attachments)) 2799 if self._align == 'right': 2800 del ret[0] 2801 for ind, elem in enumerate(ret): 2802 if elem == '| ' and ret[ind-1] == '\n': 2803 del ret[ind] 2804 ret.insert(ind-1, ' |') 2805 ret.append(' |') 2806 return ret 2807 2808 def _get_call_message_markup(self): 2809 call_message = self.msg.envelope['callMessage'] 2810 if 'offerMessage' in call_message: 2811 return [' ', ('italic', 'Incoming call')] 2812 elif 'answerMessage' in call_message: 2813 return [('italic', 'Calling'), ' '] 2814 elif get_nested(call_message, 'hangupMessage', 'type') == 'NORMAL': 2815 # For accepted calls, `type: "ACCEPTED"` 2816 return [' ', ('italic', 'Hung up')] 2817 return None 2818 2819 def _get_status_markup(self): 2820 return DeliveryStatus.MARKUP_MAP[self.msg.delivery_status] 2821 2822 def update_status(self): 2823 status_markup_new = self._get_status_markup() 2824 self._status_w.set_text(status_markup_new) 2825 2826 def reload_markup(self): 2827 msg_markup = self._get_message_markup() 2828 self._text_w.set_text(msg_markup or '') 2829 2830 def update_reactions_w(self): 2831 try: 2832 reactions = self.msg.reactions 2833 except AttributeError: 2834 return 2835 emojis_markup = [] 2836 for envelope in reactions.values(): 2837 reaction = get_envelope_reaction(envelope) 2838 if not reaction.get('isRemove'): 2839 emojis_markup.append(reaction['emoji']) 2840 if not emojis_markup: 2841 self._remove_reactions_w() 2842 return 2843 try: 2844 self._reactions_w.set_text(emojis_markup) 2845 except AttributeError: 2846 self._add_reactions_w(emojis_markup) 2847 2848 def _add_reactions_w(self, emojis_markup): 2849 self._reactions_w = urwid.Text(emojis_markup, align=self._align) 2850 react_pad_w = urwid.Padding(self._reactions_w, self._align, width=cfg.wrap_at) 2851 react_sym_markup = '╰╴' if self._align == 'left' else '╶╯' 2852 react_sym_w = urwid.Text( 2853 react_sym_markup, 2854 align='right' if self._align == 'left' else 'left', 2855 ) 2856 cols = [ 2857 (DeliveryStatus.MARKUP_WIDTH + len(react_sym_markup), react_sym_w), 2858 react_pad_w, 2859 ] 2860 if self._align == 'right': 2861 cols.reverse() 2862 react_cols_w = urwid.Columns(cols) 2863 self._w.original_widget = urwid.Pile([self._w.original_widget, react_cols_w]) 2864 2865 def _remove_reactions_w(self): 2866 if self._reactions_w is None: 2867 return 2868 self._w.original_widget = self._w.original_widget.contents[0][0] 2869 self._reactions_w = None 2870 2871 2872class MessageWidgetsCache: 2873 """Create and cache widgets for LazyEvalMessageListWalker""" 2874 2875 def __init__(self): 2876 self._cache = {} 2877 #self._cache = weakref.WeakValueDictionary() 2878 # Using a weak reference dictionary would save memory, but at the cost of using cpu to (re)create MessageWidget objects after switching back and forth between the chats. 2879 2880 def get(self, msg, _position=None): 2881 key = self._hash(msg) 2882 try: 2883 # Not using 2884 # return self._cache.setdefault(key, MessageWidget(msg)) 2885 # insted of this try..except, because it would (re)create a new MessageWidget(msg) obj every time, even if it's already in the cache. 2886 w = self._cache[key] 2887 except KeyError: 2888 w = MessageWidget(msg) 2889 self._cache[key] = w 2890 return w 2891 2892 @staticmethod 2893 def _hash(msg): 2894 return hash((msg.sender_num, msg.timestamp)) 2895 2896 def on_delivery_status_changed(self, timestamp, _status): 2897 key = hash((cfg.username, timestamp)) 2898 try: 2899 msg_w = self._cache[key] 2900 except KeyError: 2901 # This is not necessarily an error: 2902 # Happens when the msg's delivery status is set before the message widget is created. For instance, when status = sending, or before the chat is opened and the widgets for it are created. 2903 return 2904 msg_w.update_status() 2905 2906 def adjust_timestamp(self, msg, timestamp_adj): 2907 """Save memory by purging entry with old timestamp from cache. 2908 2909 Also, saves cpu by not re-creating new MessageWidgets. 2910 """ 2911 # This method is not be needed if self._cache is a weakref dictionary. 2912 key = self._hash(msg) 2913 key_adj = hash((msg.sender_num, timestamp_adj)) 2914 try: 2915 # Theoretically, it's possible to get a race condition here if signal-cli returns adjusted timestamp before the msg with un-adjusted timestamp is added to the _cache. 2916 self._cache[key_adj] = self._cache.pop(key) 2917 except KeyError: 2918 pass 2919 2920 2921class LazyEvalMessageListWalker(LazyEvalListWalker): 2922 def __init__(self, contents, init_focus_pos=-1): 2923 self.msg_ws_cache = MessageWidgetsCache() 2924 super().__init__(contents, self.msg_ws_cache.get, init_focus_pos) 2925 2926 2927class ChatView(ListBoxPlus): 2928 def __init__(self): 2929 lw = LazyEvalMessageListWalker(urwid.MonitoredList()) 2930 super().__init__(lw) 2931 2932 def _update_search_results(self, txt, old_txt=''): 2933 if not txt: 2934 return 2935 scope = self.contents if old_txt in txt else None 2936 # Incremental search: only search through the current search results, rather then the whole chat. 2937 def test_match(msg): 2938 if not msg.text: 2939 return None 2940 return txt in msg.text.casefold() 2941 self.filter_contents(test_match, scope) 2942 self.try_set_focus(-1) 2943 2944 def _reset_search(self, keep_curr_focused=False): 2945 """Restore the pre-search contents. 2946 2947 If keep_curr_focused is false, the focus is restored to the widget that was in focus before the search was started. 2948 Otherwise, place the focus on the same message that was in focus before the search is removed. 2949 """ 2950 curr_focused_msg_w = self.focus 2951 self.filter_contents(None) 2952 if keep_curr_focused: 2953 focus_position = self.contents.index(curr_focused_msg_w.msg) 2954 self.try_set_focus(focus_position) 2955 2956 def on_input_line_change(self, input_line_w, old_text): 2957 txt = input_line_w.get_edit_text() 2958 if txt.startswith('/'): 2959 self._update_search_results(txt[1:], old_text[1:]) 2960 elif self.is_filter_on: 2961 self._reset_search() 2962 2963 def _delete_message(self, msg): 2964 index = self.focus_position if not self.is_filter_on else None 2965 action_request.delete_message(msg, index) 2966 if self.is_filter_on: 2967 del self.contents[self.focus_position] 2968 2969 def _resend_message(self, msg): 2970 focus_position = self.focus_position # Saving it because it'll shift after resend_message(). 2971 index = focus_position if not self.is_filter_on else None 2972 try: 2973 action_request.resend_message(msg, index) 2974 except TypeError: 2975 return 2976 if self.is_filter_on: 2977 del self.contents[focus_position] 2978 self.contents.append(self._contents_pre_filter[-1]) 2979 # The `_contents_pre_filter` for this class always points to the `current_chat` list. So after `resend()` action, its last element is the new message. 2980 self.try_set_focus(-1) 2981 2982 def keypress(self, size, key): 2983 key = super().keypress(size, key) 2984 message_widget = self.focus 2985 if message_widget is None: 2986 return key 2987 envelope = message_widget.msg.envelope 2988 2989 if key in ('enter', 'l'): 2990 if self.is_filter_on: 2991 self._reset_search(keep_curr_focused=True) 2992 elif get_envelope_msg(envelope) is not None: 2993 action_request.open_attach(envelope) or action_request.open_urls(envelope) # pylint: disable=expression-not-assigned 2994 elif key == 'o': 2995 action_request.open_urls(envelope) or action_request.open_attach(envelope) # pylint: disable=expression-not-assigned 2996 elif key == 'y': 2997 txt = get_envelope_msg(envelope) 2998 clip.put(txt) 2999 elif key == 'd': 3000 self._delete_message(message_widget.msg) 3001 elif key == 'r': 3002 self._resend_message(message_widget.msg) 3003 elif key == 'q': 3004 # Replying / quoting not supported by signal-cli 3005 # https://github.com/AsamK/signal-cli/issues/213 3006 pass 3007 else: 3008 return key 3009 return None 3010 3011 3012class ChatWindow(urwid.Frame): 3013 def __init__(self): 3014 self._title_widget = urwid.Text('', align='center') 3015 self.input_line_w = InputLine() 3016 self.chat_view = ChatView() 3017 title_w_div = urwid.Pile([self._title_widget, urwid.Divider('-')]) 3018 input_w_div = urwid.Pile([urwid.Divider('-'), self.input_line_w]) 3019 self._focusable_widgets = {'chat': 'body', 'input': 'footer'} 3020 super().__init__(self.chat_view, header=title_w_div, footer=input_w_div) 3021 urwid.connect_signal(self.input_line_w, 'postchange', self.chat_view.on_input_line_change) 3022 3023 @property 3024 def focus_widget_name(self): 3025 for widget_name, focus_pos in self._focusable_widgets.items(): 3026 if focus_pos == self.focus_position: 3027 return widget_name 3028 return None 3029 3030 @focus_widget_name.setter 3031 def focus_widget_name(self, widget_name): 3032 self.focus_position = self._focusable_widgets[widget_name] 3033 3034 def set_title(self, contact): 3035 name = contact.name_or_id 3036 markup = [('bold', name)] 3037 if not contact.is_group: 3038 num = contact.number 3039 if name != num: 3040 markup.extend([' (', num, ')']) 3041 else: 3042 memb_names = [memb.name_or_id for memb in contact.member_contacts] 3043 markup.append(' (') 3044 markup.extend(', '.join(memb_names)) 3045 markup.append(', ' if memb_names else 'only: ') 3046 markup.extend([('italic', 'You'), ')']) 3047 self._title_widget.set_text(markup) 3048 3049 def on_contact_selected(self, contact): 3050 self.set_title(contact) 3051 self.chat_view.try_set_focus(-1) 3052 3053 def keypress(self, size, key): 3054 key = super().keypress(size, key) 3055 if not self.input_line_w.edit_text.startswith('/'): 3056 return key 3057 if key == 'esc': 3058 return self.input_line_w.keypress(size, key) 3059 if key == 'enter' and self.focus_widget_name == 'input': 3060 if not self.chat_view.is_filter_on: 3061 # This clause is used for re-doing a search on a new chat contents after swtiching to a new contact. 3062 urwid.emit_signal(self.input_line_w, 'postchange', self.input_line_w, '/') 3063 if self.chat_view.contents: 3064 self.focus_widget_name = 'chat' 3065 return None 3066 else: 3067 return key 3068 return None 3069 3070 3071# ############################################################################# 3072# MainWindow 3073# ############################################################################# 3074 3075 3076class StatusLine(urwid.WidgetWrap): 3077 def __init__(self, unread_count=0): 3078 self._text = urwid.Text('') 3079 self._unreads_widget = urwid.Text([ 3080 "Unread messages count: ", 3081 ('bold', f"{unread_count}"), 3082 ]) 3083 cols = urwid.Columns([self._text, ('pack', self._unreads_widget)], dividechars=1) 3084 super().__init__(cols) 3085 3086 def set_text(self, new_text): 3087 self._text.set_text(new_text) 3088 3089 def set_unread_count(self, count): 3090 txt = str(count) if count else str() 3091 self._unreads_widget.set_text(('bold', txt)) 3092 3093 3094class MessageInfo(ListBoxPlus): 3095 3096 class OpenPath(FocusableText): 3097 """Open-able text: file or URL""" 3098 3099 def __init__(self, text, *args, fpath=None, **kwargs): 3100 super().__init__(text, *args, **kwargs) 3101 self.fpath = fpath 3102 3103 def get_path(self): 3104 return self.fpath or self.text 3105 3106 def open_path(self): 3107 if self.fpath: 3108 return action_request.open_file(self.fpath) 3109 return action_request.open_url(self.text) 3110 3111 def __init__(self, msg): 3112 self._msg = msg 3113 3114 name_w = FocusableText([ 3115 ('bold', 'Sender : '), 3116 action_request.get_contact_name(msg.sender_num), 3117 ]) 3118 num_w = FocusableText([('bold', 'Number : '), msg.sender_num]) 3119 date = strftimestamp(msg.timestamp) 3120 date_w = FocusableText([('bold', 'Date : '), date]) 3121 items = [name_w, num_w, date_w] 3122 3123 if msg.text: 3124 txt_w = FocusableText([('bold', 'Message: '), msg.text]) 3125 items.append(txt_w) 3126 3127 delivery_status_w = self._get_delivery_status_w() 3128 if delivery_status_w: 3129 items.append(delivery_status_w) 3130 3131 items.append(urwid.Divider()) 3132 3133 if msg.text: 3134 urls = get_urls(msg.text) 3135 if urls: 3136 items.extend(self._get_urls_ws(urls)) 3137 3138 attachments = get_envelope_attachments(msg.envelope) 3139 if attachments: 3140 items.extend(self._get_attachments_ws(attachments)) 3141 3142 reactions = getattr(msg, 'reactions', None) 3143 if reactions is not None: 3144 items.extend(self._get_reactions_ws(reactions)) 3145 3146 if cfg.debug: 3147 items.extend(self._get_debug_info()) 3148 3149 super().__init__(items) 3150 3151 def _get_delivery_status_w(self): 3152 status_detailed = self._msg.delivery_status_detailed 3153 status_str = status_detailed.str 3154 if not status_str: 3155 return None 3156 when_str = strftimestamp(status_detailed.when, strformat='%H:%M:%S %Y-%m-%d') 3157 status_when = f' ({when_str})' if status_detailed.when else '' 3158 return FocusableText([('bold', 'Status : '), status_str, status_when]) 3159 3160 def _get_urls_ws(self, urls): 3161 header_w = urwid.Text([('bold', 'Links')], align='center') 3162 ret = [header_w] 3163 for url in urls: 3164 url_w = self.OpenPath(url) 3165 ret.append(url_w) 3166 return ret 3167 3168 def _get_attachments_ws(self, attachments): 3169 header_w = urwid.Text(('bold', 'Attachments'), align='center') 3170 ret = [header_w] 3171 for atch in attachments: 3172 atch_w = self.OpenPath( 3173 text=get_attachment_name(atch), 3174 fpath=get_attachment_path(atch) 3175 ) 3176 ret.append(atch_w) 3177 return ret 3178 3179 @staticmethod 3180 def _get_reactions_ws(reactions): 3181 heading_w = urwid.Text([('bold', 'Reactions')], align='center') 3182 ret = [heading_w] 3183 for sender_num, envelope in reactions.items(): 3184 sender_name = action_request.get_contact_name(sender_num) 3185 reaction = get_envelope_reaction(envelope) 3186 if reaction.get('isRemove'): 3187 continue 3188 ret.append(FocusableText([ 3189 sender_name, 3190 ': ', 3191 reaction['emoji'], 3192 ' (', 3193 strftimestamp(get_envelope_time(envelope)), 3194 ')', 3195 ])) 3196 if ret == [heading_w]: 3197 return [] 3198 return ret 3199 3200 def _get_debug_info(self): 3201 ret = [ 3202 urwid.Divider(), 3203 urwid.Text(('bold', 'Debug info'), align='center'), 3204 urwid.Text('Envelope', align='center'), 3205 FocusableText(pprint.pformat(self._msg.envelope, width=-1)), 3206 ] 3207 return ret 3208 3209 def keypress(self, size, key): 3210 key = super().keypress(size, key) 3211 item = self.body[self.focus_position] 3212 if key == 'y': 3213 try: 3214 clip.put(item.get_path()) 3215 except AttributeError: 3216 markup = get_text_markup(item) 3217 if len(markup) == 2 and isinstance(markup[0], tuple): 3218 # The line is `Property : Value` type 3219 clip.put(markup[1]) 3220 else: 3221 clip.put(item.text) 3222 elif key in ('enter', 'o'): 3223 try: 3224 item.open_path() 3225 except AttributeError: 3226 pass 3227 else: 3228 return key 3229 return None 3230 3231 3232class HelpDialog(ListBoxPlus): 3233 def __init__(self): 3234 items = [ 3235 urwid.Divider(), 3236 urwid.Text( 3237 "Please see README for the full list of commands and key bindings.", 3238 align='center', 3239 ), 3240 urwid.Divider(), 3241 ] 3242 3243 def open_readme(_button): 3244 action_request.open_file(SCLI_README_FILE) 3245 3246 btn_open_readme = urwid.Button('Open README', on_press=open_readme) 3247 pad = urwid.Padding(btn_open_readme, align='center', width=len(btn_open_readme.label)+4) 3248 items.append(pad) 3249 3250 super().__init__(items) 3251 3252 3253class PopUpPlaceholder(urwid.WidgetPlaceholder): 3254 def __init__(self, w): 3255 super().__init__(w) 3256 self._orig_w = w 3257 # Urwid's terminology here might be confusing: "WidgetPlaceholder.original_widget" means "currently displayed widget", not the one it is originally initialized with. 3258 3259 def _show_pop_up(self, widget, title=''): 3260 pop_up_box = PopUpBox(widget, title) 3261 urwid.connect_signal(pop_up_box, 'closed', self._remove_pop_up) 3262 pop_up_overlay = urwid.Overlay( 3263 pop_up_box, 3264 self._orig_w, 3265 align='center', 3266 valign='middle', 3267 width=('relative', 85), 3268 height=('relative', 65), 3269 ) 3270 self.original_widget = pop_up_overlay 3271 3272 def _remove_pop_up(self, _sender=None): 3273 self.original_widget = self._orig_w 3274 3275 @property 3276 def _is_popup_shown(self): 3277 return self.original_widget is not self._orig_w 3278 3279 def show_help(self): 3280 self._show_pop_up(HelpDialog(), title='Help') 3281 3282 def show_message_info(self, message_widget): 3283 info = MessageInfo(message_widget.msg) 3284 fill = urwid.Filler(info, height=('relative', 100), top=1, bottom=1) 3285 self._show_pop_up(fill, title='Message info') 3286 3287 def keypress(self, size, key): 3288 key = super().keypress(size, key) 3289 if not self._is_popup_shown: 3290 # When popup is shown, do not pass keys to other widgets until it's closed 3291 return key 3292 return None 3293 3294 3295class MainWindow(urwid.WidgetWrap): 3296 def __init__(self, contacts, chats_data): 3297 self._chats_data = chats_data 3298 self.contacts_w = ContactsWindow(contacts, self._chats_data) 3299 3300 self.chat_w = ChatWindow() 3301 contacts_box = LineBoxHighlight(self.contacts_w) 3302 self._chat_win_box = LineBoxHighlight(self.chat_w) 3303 self._popup_ph = PopUpPlaceholder(self._chat_win_box) 3304 cols = [('weight', 1, contacts_box), ('weight', 3, self._popup_ph)] 3305 self._columns = urwid.Columns(cols) 3306 self._contacts_column = self._columns.contents[0] 3307 3308 total_unread_count = self._chats_data.unread_counts.total 3309 self.status_line = StatusLine(total_unread_count) 3310 3311 w = urwid.Frame(self._columns, footer=self.status_line) 3312 super().__init__(w) 3313 3314 @property 3315 def contacts_hidden(self): 3316 return self._contacts_column not in self._columns.contents 3317 3318 @contacts_hidden.setter 3319 def contacts_hidden(self, yes_no): 3320 if yes_no and not self.contacts_hidden: 3321 self._columns.contents.remove(self._contacts_column) 3322 elif not yes_no and self.contacts_hidden: 3323 self._columns.contents.insert(0, self._contacts_column) 3324 3325 @property 3326 def focus_widget_name(self): 3327 if self.contacts_hidden or self._columns.focus_position == 1: 3328 return self.chat_w.focus_widget_name 3329 return 'contacts' 3330 3331 @focus_widget_name.setter 3332 def focus_widget_name(self, widget_name): 3333 if widget_name == 'contacts': 3334 self.contacts_hidden = False 3335 self._columns.focus_position = 0 3336 else: 3337 if cfg.contacts_autohide and not self.contacts_hidden: 3338 self.contacts_hidden = True 3339 self._columns.focus_position = 0 if self.contacts_hidden else 1 3340 self.chat_w.focus_widget_name = widget_name 3341 3342 def _focus_next(self, reverse=False): 3343 wnames = ['contacts', 'chat', 'input'] 3344 curr_wname = self.focus_widget_name 3345 if not self.chat_w.chat_view.contents and not curr_wname == 'chat': 3346 # If there are no messages in current chat (either because no chat selected, or searching has filtered out all results), don't focus it. 3347 wnames.remove('chat') 3348 curr_focus_pos = wnames.index(curr_wname) 3349 incr = -1 if reverse else 1 3350 next_wname = wnames[(curr_focus_pos + incr) % len(wnames)] 3351 self.focus_widget_name = next_wname 3352 3353 def update_unread_count(self, contact_id): 3354 self.contacts_w.contacts_list_w.update_contact_unread_count(contact_id) 3355 self.status_line.set_unread_count(self._chats_data.unread_counts.total) 3356 3357 def on_contact_selected(self, contact, focus_widget): 3358 self.status_line.set_text('') 3359 # NOTE: for now not checking what's currently in the status line, just remove whatever text was there. 3360 self.update_unread_count(contact.id) 3361 self.chat_w.on_contact_selected(contact) 3362 if focus_widget: 3363 self.focus_widget_name = focus_widget 3364 3365 def show_help(self): 3366 self._popup_ph.show_help() 3367 self.focus_widget_name = 'chat' 3368 3369 def keypress(self, size, key): 3370 key = super().keypress(size, key) 3371 if key == 'tab': 3372 self._focus_next() 3373 elif key == 'shift tab': 3374 self._focus_next(reverse=True) 3375 elif key == ':': 3376 self.focus_widget_name = 'input' 3377 self.keypress(size, key) 3378 elif key == '/' and self.focus_widget_name == 'chat': 3379 self.focus_widget_name = 'input' 3380 self.keypress(size, key) 3381 elif key in ('meta j', 'meta down'): 3382 self.contacts_w.contacts_list_w.select_next_contact() 3383 elif key in ('meta k', 'meta up'): 3384 self.contacts_w.contacts_list_w.select_next_contact(reverse=True) 3385 elif key == '?': 3386 self.show_help() 3387 elif key == 'i': 3388 if self.focus_widget_name != 'chat': 3389 return key 3390 message_widget = self.chat_w.chat_view.focus 3391 if message_widget is None: 3392 return key 3393 self._popup_ph.show_message_info(message_widget) 3394 else: 3395 return key 3396 return None 3397 3398 3399class UrwidUI: 3400 def __init__(self, contacts, chats_data): 3401 self.main_w = MainWindow(contacts, chats_data) 3402 # FYI: to later get the topmost widget, can also use `urwid_main_loop.widget` 3403 self.loop = urwid.MainLoop(self.main_w, palette=PALETTE) 3404 if cfg.color and cfg.color.high_color_mode: 3405 self.loop.screen.set_terminal_properties(256) 3406 MessageWidget.set_formatting_consants(cfg.use_formatting) 3407 3408 # Shortcuts for deeply nested attributes 3409 self.contacts = self.main_w.contacts_w.contacts_list_w 3410 self.chat = self.main_w.chat_w.chat_view 3411 self.msg_ws_cache = self.chat.body.msg_ws_cache 3412 3413 3414# ############################################################################# 3415# commands 3416# ############################################################################# 3417 3418 3419class Commands: 3420 def __init__(self, actions): 3421 self._actions = actions 3422 self.cmd_mapping = [ 3423 (['attach', 'a'], self._actions.attach), 3424 (['edit', 'e'], self._actions.external_edit), 3425 (['read', 'r'], self._actions.read), 3426 (['attachClip', 'c'], self._actions.attach_clip), 3427 (['openAttach', 'o'], self._actions.open_last_attach), 3428 (['openUrl', 'u'], self._actions.open_last_url), 3429 (['toggleNotifications', 'n'], self._actions.toggle_notifications), 3430 (['toggleAutohide', 'h'], self._actions.toggle_autohide), 3431 (['toggleContactsSort', 's'], self._actions.toggle_sort_contacts), 3432 (['renameContact'], self._actions.rename_contact), 3433 (['addContact'], self._actions.add_contact), 3434 (['reload'], self._actions.reload), 3435 (['help'], self._actions.show_help), 3436 (['quit', 'q'], self._actions.quit), 3437 ] 3438 self._map = {cmd.lower(): fn for cmds, fn in self.cmd_mapping for cmd in cmds} 3439 3440 def exec(self, cmd, *args): 3441 fn = self._map.get(cmd.lower()) 3442 if fn is None: 3443 self._actions.set_status_line(f"Command `{cmd}` not found") 3444 return None 3445 if not self._actions.check_cmd_for_current_contact(fn): 3446 self._actions.set_status_line(f":{cmd} Error: no contact currently selected") 3447 return None 3448 try: 3449 return fn(*args) 3450 except TypeError as err: 3451 # Handle only the exceptions produced by giving the wrong number of arguments to `fn()`, not any exceptions produced inside executing `fn()` (i.e. deeper in the stack trace) 3452 if err.__traceback__.tb_next is not None: 3453 raise 3454 if re.search(r"missing \d+ required positional argument", str(err)): 3455 self._actions.set_status_line(f':{cmd} missing arguments') 3456 return None 3457 elif re.search(r"takes \d+ positional arguments? but \d+ were given", str(err)): 3458 self._actions.set_status_line(f':{cmd} extra arguments') 3459 return None 3460 else: 3461 raise 3462 3463 3464class Actions: 3465 def __init__(self, daemon, contacts, chats_data, urwid_ui): 3466 self._daemon = daemon 3467 self._contacts = contacts 3468 self._chats_data = chats_data 3469 self._urwid_ui = urwid_ui 3470 3471 def reload(self): 3472 self._contacts.reload() 3473 if self._daemon.is_dbus_service_running: 3474 self.update_groups_async() 3475 else: 3476 self._update_contacts_ui() 3477 # _update_contacts_ui() is included in update_groups_async() callback because it needs to run _after_ groups are updated. 3478 3479 def _update_contacts_ui(self): 3480 self._urwid_ui.contacts.update() 3481 # Updating the title text in chat widget: 3482 try: 3483 current_contact = self._contacts.map[self._chats_data.current_contact.id] 3484 # Need to re-obtain the contact object, since the one in _chats_data now points to an outdated object 3485 except (AttributeError, KeyError): 3486 return 3487 self._urwid_ui.main_w.chat_w.set_title(current_contact) 3488 3489 def set_status_line(self, text): 3490 self._urwid_ui.main_w.status_line.set_text(text) 3491 3492 def callf(self, *args, **kwargs): 3493 """Wrapper that logs and swallows the exceptions""" 3494 try: 3495 return callf(*args, **kwargs) 3496 except (OSError, ValueError) as err: 3497 logging.exception(err) 3498 self.set_status_line( 3499 '\n'.join([ 3500 str(err), 3501 'Full error traceback written to log.', 3502 ]) 3503 ) 3504 return None 3505 3506 def send_desktop_notification(self, sender, message): 3507 if not cfg.enable_notifications: 3508 return 3509 rmap = {} 3510 for token, text in (('%s', sender), ('%m', message)): 3511 text = text.replace(r"'", r"'\''") 3512 rmap[token] = text 3513 self.callf(cfg.notification_command, rmap, background=True) 3514 3515 def send_message_curr_contact(self, message="", attachments=None): 3516 if self._chats_data.current_contact is None: 3517 return 3518 self._daemon.send_message(self._chats_data.current_contact.id, message, attachments) 3519 3520 def external_edit(self, *args): 3521 if cfg.editor_command is None: 3522 self.set_status_line(":edit Error: no command for external editor set") 3523 return 3524 3525 filename = '' 3526 if args: 3527 filename, *message = split_path(*args) 3528 3529 if is_path(filename): 3530 msg_file_path = os.path.expanduser(filename) 3531 else: 3532 msg_file_path = tmpfile = tempfile.NamedTemporaryFile( 3533 suffix='.md', delete=False 3534 ).name 3535 message = args 3536 if message: 3537 with open(msg_file_path, "w") as msg_file: 3538 msg_file.write(*message) 3539 3540 self._daemon.main_loop.stop() 3541 cmd = " ".join((cfg.editor_command, shlex.quote(msg_file_path))) 3542 self.callf(cmd) 3543 self._daemon.main_loop.start() 3544 3545 with open(msg_file_path, 'r') as msg_file: 3546 msg = msg_file.read().strip() 3547 if msg: 3548 self.send_message_curr_contact(msg) 3549 3550 try: 3551 os.remove(tmpfile) 3552 except NameError: 3553 pass 3554 3555 def read(self, path_or_cmd): 3556 message = '' 3557 if is_path(path_or_cmd): 3558 try: 3559 with open(os.path.expanduser(path_or_cmd), 'r') as file: 3560 message = file.read() 3561 except OSError as err: 3562 logging.exception(err) 3563 self.set_status_line(str(err)) 3564 elif path_or_cmd.startswith('!'): 3565 proc = self.callf( 3566 path_or_cmd[1:].strip(), 3567 capture_output=True, 3568 ) 3569 if proc is not None: 3570 message = proc.stdout 3571 else: 3572 self.set_status_line(f"Error: could not read `{path_or_cmd}`") 3573 if message != '': 3574 self.send_message_curr_contact(message) 3575 3576 def attach(self, args): 3577 attachment, *message = split_path(args) 3578 attachment = os.path.expanduser(attachment) 3579 if not os.path.isfile(attachment): 3580 self.set_status_line('File does not exist: ' + attachment) 3581 return 3582 self.send_message_curr_contact(*message, attachments=[attachment]) 3583 3584 def attach_clip(self, *message): 3585 files = clip.files() 3586 if files: 3587 self.send_message_curr_contact(*message, attachments=files) 3588 else: 3589 self.set_status_line('Clipboard is empty.') 3590 3591 def open_file(self, path): 3592 if not os.path.exists(path): 3593 logging.warning("File does not exist: %s", path) 3594 return None 3595 return self.callf(cfg.open_command, {'%u': path}, background=True) 3596 3597 def open_attach(self, envelope): 3598 attachments = get_envelope_attachments(envelope) 3599 if not attachments: 3600 return attachments 3601 for attachment in attachments: 3602 file_path = get_attachment_path(attachment) 3603 if file_path: 3604 self.open_file(file_path) 3605 return attachments 3606 3607 def open_last_attach(self): 3608 for txt in reversed(self._chats_data.current_chat): 3609 if self.open_attach(txt.envelope): 3610 return 3611 3612 def open_url(self, url): 3613 return self.callf(cfg.open_command, {'%u': url}, background=True) 3614 3615 def open_urls(self, envelope): 3616 txt = get_envelope_msg(envelope) 3617 urls = get_urls(txt) if txt else [] 3618 for url in urls: 3619 self.open_url(url) 3620 return urls 3621 3622 def open_last_url(self): 3623 for txt in reversed(self._chats_data.current_chat): 3624 if self.open_urls(txt.envelope): 3625 return 3626 3627 # pylint: disable=attribute-defined-outside-init 3628 # `Config` class uses __setattr__ that forwards to argparser's `args` instance. 3629 @staticmethod 3630 def toggle_autohide(): 3631 cfg.contacts_autohide = not cfg.contacts_autohide 3632 3633 def toggle_sort_contacts(self): 3634 cfg.contacts_sort_alpha = not cfg.contacts_sort_alpha 3635 self.reload() 3636 3637 def toggle_notifications(self): 3638 cfg.enable_notifications = not cfg.enable_notifications 3639 notif = ''.join(( 3640 'Desktop notifications are ', 3641 'ON' if cfg.enable_notifications else 'OFF', 3642 '.' 3643 )) 3644 self.set_status_line(notif) 3645 # pylint: enable=attribute-defined-outside-init 3646 3647 def add_contact(self, args): 3648 """Add a new contact. 3649 3650 The syntax is 3651 :addContact +NUMBER [Contact Name] 3652 """ 3653 try: 3654 number, name = args.split(maxsplit=1) 3655 except ValueError: 3656 number, name = args, "" 3657 if not is_number(number): 3658 self.set_status_line(f':addContact "{number}": not a valid number') 3659 return 3660 self._daemon.rename_contact(number, name, is_group=False, callback=lambda *i: self.reload()) 3661 3662 def rename_contact(self, args): 3663 """Rename contact. 3664 3665 :renameContact +NUMBER new name here -> use +NUMBER number 3666 :renameContact "Old Name" new name here -> use contact named "Old Name" 3667 :renameContact new name here -> rename current contact or group 3668 """ 3669 try: 3670 number, new_name = split_path(args) 3671 if not is_number(number): 3672 for contact_id, contact in self._contacts.map.items(): 3673 if contact.name == number: 3674 is_group = contact.is_group 3675 break 3676 else: # contact with name `number` not found 3677 raise ValueError 3678 elif self._contacts.get_by_id(number) is None: 3679 self.set_status_line(f":renameContact Error: no contact with number {number} found") 3680 return 3681 else: 3682 is_group = False 3683 contact_id = number 3684 except ValueError: 3685 if self._chats_data.current_contact is None: 3686 self.set_status_line(":renameContact Error: no contact currently selected") 3687 return 3688 contact_id = self._chats_data.current_contact.id 3689 is_group = self._chats_data.current_contact.is_group 3690 new_name = args 3691 self._daemon.rename_contact(contact_id, new_name, is_group, lambda *i: self.reload()) 3692 3693 def delete_message(self, msg, index=None): 3694 self._chats_data.chats.delete_message(msg, index) 3695 self._chats_data.delivery_status.delete(msg.timestamp) 3696 3697 def resend_message(self, msg, index=None): 3698 if msg.delivery_status != 'send_failed': 3699 # Only allow re-sending previously failed-to-send messages 3700 raise TypeError 3701 self.delete_message(msg, index) 3702 self.set_status_line('') # remove 'send-failed' status line 3703 envelope = msg.envelope 3704 contact_id = get_envelope_contact_id(envelope) 3705 message = get_envelope_msg(envelope) 3706 attachments = get_envelope_attachments(envelope) 3707 self._daemon.send_message(contact_id, message, attachments) 3708 3709 def update_groups_async(self): 3710 def on_groups_updated(): 3711 self._contacts.set_groups_membership() 3712 self._update_contacts_ui() 3713 with self._daemon.callback_finally(on_groups_updated): 3714 for group in self._contacts.groups: 3715 self._daemon.get_group_name( 3716 group.id, 3717 callback=lambda name, group=group: group.update_record( 3718 {'name': strip_non_printable_chars(name)} 3719 ) 3720 ) 3721 self._daemon.get_group_members( 3722 group.id, 3723 callback=lambda members_ids, group=group: setattr( 3724 group, 'members_ids', members_ids 3725 ) 3726 ) 3727 3728 def show_new_msg_notifications(self, msg): 3729 sender_name = self.get_contact_name(msg.sender_num) 3730 3731 def get_msg_notif(): 3732 try: 3733 *_, reaction_envelope = msg.reactions.values() # the latest reaction envelope 3734 except (AttributeError, ValueError): 3735 pass 3736 else: 3737 reaction = get_envelope_reaction(reaction_envelope) 3738 if reaction.get('isRemove'): 3739 return None 3740 reaction_emoji = reaction['emoji'] 3741 return (reaction_emoji, 3742 ''.join(( 3743 'New reaction from ', repr(sender_name), ': ', 3744 reaction_emoji, 3745 ))) 3746 3747 msg_text = msg.text 3748 if msg_text: 3749 return (msg_text, 3750 ''.join(( 3751 # NOTE: this could be a list (urwid.Text markup type), except for the textwrap.shorten below 3752 'New message from ', repr(sender_name), ': ', repr(msg_text) 3753 )) 3754 ) 3755 3756 if msg.attachments: 3757 return('[attachments]', 3758 ''.join(( 3759 'New attachments message from: ', repr(sender_name) 3760 )) 3761 ) 3762 3763 incoming_call = get_nested(msg.envelope, 'callMessage', 'offerMessage') 3764 if incoming_call: 3765 txt = ' Incoming call' 3766 return(txt, 3767 ' '.join(( 3768 txt, 'from', repr(sender_name) 3769 )) 3770 ) 3771 3772 return None 3773 3774 try: 3775 msg_text, notif = get_msg_notif() 3776 except TypeError: 3777 return 3778 notif = textwrap.shorten(notif, 80) 3779 self.send_desktop_notification(sender_name, msg_text) 3780 if (self._chats_data.current_contact is None 3781 or msg.contact_id != self._chats_data.current_contact.id): 3782 self.set_status_line(notif) 3783 3784 def get_contact_name(self, contact_num): 3785 contact = self._contacts.get_by_id(contact_num) 3786 return contact.name_or_id if contact else contact_num 3787 3788 def check_cmd_for_current_contact(self, fn): 3789 return ( 3790 self._chats_data.current_contact is not None 3791 or fn not in ( 3792 self.external_edit, 3793 self.read, 3794 self.attach, 3795 self.attach_clip, 3796 self.open_last_attach, 3797 self.open_last_url, 3798 ) 3799 ) 3800 3801 def show_help(self): 3802 self._urwid_ui.main_w.show_help() 3803 3804 @staticmethod 3805 def quit(): 3806 raise urwid.ExitMainLoop() 3807 3808 3809class ActionRequest: 3810 # The idea of having this class & its instance is to make a *globally accessible* function for all UI classes to call to request an action (e.g. setting status line text), without having to pass `Actions` instances down the class stack to every UI class that needs (or might need) it. 3811 # There might be a better OO way of doing this though. 3812 3813 def __init__(self, actions=None): 3814 self._actions = actions 3815 3816 def set_actions(self, actions): 3817 self._actions = actions 3818 3819 def __getattr__(self, method): 3820 return getattr(self._actions, method) 3821 3822 3823action_request = ActionRequest() 3824 3825 3826# ############################################################################# 3827# Coordinate 3828# ############################################################################# 3829 3830 3831class Coordinate: 3832 def __init__(self): 3833 self._chats_data = ChatsData(cfg.save_history) 3834 sigdata = SignalData(cfg.username) 3835 self._contacts = Contacts(sigdata) 3836 self._ui = UrwidUI(self._contacts, self._chats_data) 3837 self.daemon = Daemon(self._ui.loop, cfg.username) 3838 self._actions = Actions(self.daemon, self._contacts, self._chats_data, self._ui) 3839 self._commands = Commands(self._actions) 3840 action_request.set_actions(self._actions) 3841 self._connect_methods() 3842 3843 def _connect_methods(self): 3844 for cb_name in self.daemon.callbacks: 3845 self.daemon.callbacks[cb_name] = getattr(self, "_on_" + cb_name) 3846 urwid.connect_signal(self._ui.contacts, 'contact_selected', self._on_contact_selected) 3847 cfg.on_modified = self._on_cfg_changed 3848 self._chats_data.delivery_status.on_status_changed = self._ui.msg_ws_cache.on_delivery_status_changed 3849 Message.set_class_functions( 3850 get_delivery_status=self._chats_data.delivery_status.get_detailed, 3851 get_contact=self._contacts.get_by_id, 3852 ) 3853 self._ui.main_w.chat_w.input_line_w.set_cmds(self._commands) 3854 self._chats_data.typing_indicators.set_alarm_in = self._ui.loop.set_alarm_in 3855 self._chats_data.typing_indicators.remove_alarm = self._ui.loop.remove_alarm 3856 3857 def _on_sending_message(self, envelope): 3858 group_members = None 3859 if is_envelope_group_message(envelope): 3860 group_id = get_envelope_contact_id(envelope) 3861 group = self._contacts.get_by_id(group_id) 3862 if group is not None: 3863 # Can happen if `group` is absent from the `groupStore` (for whatever reason), and we get a sync-ed message sent to `group` from another device. See #126. 3864 group_members = group.members_ids 3865 self._chats_data.delivery_status.on_sending_message(envelope, group_members) 3866 msg = self._chats_data.chats.add_envelope(envelope) 3867 self._ui.chat.try_set_focus(-1) 3868 self._ui.contacts.on_new_message(msg) 3869 3870 def _on_sending_done(self, envelope, status='sent', timestamp_adj=None): 3871 self._chats_data.delivery_status.on_sending_done(envelope, status, timestamp_adj) 3872 self._ui.contacts.on_sending_done(envelope, status, timestamp_adj) 3873 3874 try: 3875 msg = self._chats_data.chats.get_msg_for_envelope(envelope) 3876 except ValueError: 3877 return 3878 3879 if status == 'send_failed': 3880 msg_txt = textwrap.shorten(msg.text, 20) 3881 self._actions.set_status_line( 3882 f'Message "{msg_txt}" failed to send. ' 3883 'Press `r` on message to re-send.' 3884 ) 3885 return 3886 3887 if timestamp_adj is not None: 3888 self._ui.msg_ws_cache.adjust_timestamp(msg, timestamp_adj) 3889 chat = self._chats_data.chats.get_chat_for_envelope(envelope) 3890 chat.adjust_timestamp(msg, timestamp_adj) 3891 self._ui.chat.try_set_focus(-1) 3892 3893 self._chats_data.delivery_status.process_buffered_receipts(msg.timestamp) 3894 3895 def _on_receive_message(self, envelope): 3896 logging.info('MESSAGE: %s', envelope) 3897 contact_id = get_envelope_contact_id(envelope) 3898 if contact_id not in self._contacts.map: 3899 if not self._on_unknown_contact(envelope): 3900 return 3901 3902 sender_num = get_envelope_sender_id(envelope) 3903 self._chats_data.typing_indicators.remove(sender_num) 3904 3905 msg = self._chats_data.chats.add_envelope(envelope) 3906 self._on_new_message(msg) 3907 3908 def _on_unknown_contact(self, envelope): 3909 logging.info("Message from unknown contact: %s", envelope) 3910 self._actions.reload() 3911 contact_id = get_envelope_contact_id(envelope) 3912 try: 3913 return self._contacts.map[contact_id] 3914 except KeyError: 3915 pass 3916 logging.error("Message from unknown contact: %s", envelope) 3917 sender_num = get_envelope_sender_id(envelope) 3918 msg_text = get_envelope_msg(envelope) 3919 self._actions.set_status_line([ 3920 'Message from unknown chat ', 3921 repr(contact_id), 3922 '\n', 3923 (sender_num + ': ' if sender_num != contact_id else ''), 3924 repr(msg_text), 3925 ]) 3926 self._actions.send_desktop_notification(sender_num, msg_text) 3927 return None 3928 3929 def _on_receive_sync_message(self, envelope): 3930 self._on_sending_message(envelope) 3931 self._on_sending_done(envelope) 3932 3933 def _on_new_message(self, msg): 3934 self._ui.contacts.on_new_message(msg) 3935 contact_id = msg.contact_id 3936 if (self._chats_data.current_contact is not None 3937 and contact_id == self._chats_data.current_contact.id): 3938 self._ui.chat.try_set_focus(-1) 3939 else: 3940 self._chats_data.unread_counts[contact_id] += 1 3941 self._ui.main_w.update_unread_count(contact_id) 3942 self._actions.show_new_msg_notifications(msg) 3943 3944 def _on_receive_receipt(self, envelope): 3945 self._chats_data.delivery_status.on_receive_receipt(envelope) 3946 3947 def _on_receive_reaction(self, envelope): 3948 msg = self._chats_data.chats.add_reaction_envelope(envelope) 3949 if not msg: 3950 return 3951 msg_w = self._ui.msg_ws_cache.get(msg) 3952 msg_w.update_reactions_w() 3953 self._on_new_message(msg) 3954 3955 def _on_daemon_log(self, log_line): 3956 if log_line.startswith("ERROR") and not self.daemon.is_dbus_service_running: 3957 self._actions.set_status_line([ 3958 "signal-cli daemon has stopped:\n ", 3959 log_line, 3960 "\nRestart scli to restart the daemon." 3961 ]) 3962 elif "in use by another instance" in log_line: 3963 self._actions.set_status_line([ 3964 "signal-cli: Config file is in use by another instance, waiting…\n", 3965 "Stop previously launched signal-cli processes to continue.", 3966 ]) 3967 3968 def _on_daemon_started(self): 3969 logging.info("signal-cli dbus service started") 3970 self._actions.set_status_line("Initializing signal-cli daemon... Done") 3971 def clear_status_line(*_args): 3972 self._actions.set_status_line("") 3973 self._ui.loop.set_alarm_in(2, clear_status_line) 3974 self._actions.update_groups_async() 3975 self.daemon.get_signal_cli_version(callback=logging.info) 3976 3977 def _on_contact_selected(self, contact, focus_widget): 3978 self._chats_data.current_contact = contact 3979 self._ui.chat.contents = self._chats_data.current_chat 3980 self._chats_data.unread_counts[contact.id] = 0 3981 self._ui.main_w.on_contact_selected(contact, focus_widget) 3982 3983 def _on_cfg_changed(self, key, val): 3984 if key == 'contacts_autohide': 3985 self._ui.main_w.contacts_hidden = val 3986 3987 def _on_contact_typing(self, envelope): 3988 self._chats_data.typing_indicators.on_typing_message(envelope) 3989 self._ui.chat.try_set_focus(-1) 3990 3991 def _on_call_message(self, envelope): 3992 call_message = envelope['callMessage'] 3993 if ( 3994 'offerMessage' in call_message 3995 or 'answerMessage' in call_message 3996 or get_nested(call_message, 'hangupMessage', 'type') == 'NORMAL' 3997 ): 3998 msg = self._chats_data.chats.add_envelope(envelope) 3999 if 'offerMessage' in call_message: 4000 # Incoming call 4001 self._on_new_message(msg) 4002 4003 def _on_contacts_sync(self): 4004 logging.info("Received contacts sync message, reloading signal-cli contacts") 4005 self._actions.reload() 4006 4007 def _on_remote_delete(self, envelope): 4008 msg = self._chats_data.chats.add_remote_delete_envelope(envelope) 4009 if not msg: 4010 return 4011 msg_w = self._ui.msg_ws_cache.get(msg) 4012 msg_w.reload_markup() 4013 4014 4015# ############################################################################# 4016# config 4017# ############################################################################# 4018 4019 4020class Config: 4021 def __init__(self, cfg_obj): 4022 self._cfg_obj = cfg_obj 4023 self.on_modified = noop 4024 4025 def set(self, cfg_obj): 4026 self._cfg_obj = cfg_obj 4027 4028 def __getattr__(self, name): 4029 return getattr(self._cfg_obj, name) 4030 4031 def __setattr__(self, name, value): 4032 if name != '_cfg_obj' and hasattr(self._cfg_obj, name): 4033 setattr(self._cfg_obj, name, value) 4034 self.on_modified(name, value) 4035 else: 4036 super().__setattr__(name, value) 4037 4038 4039cfg = Config(None) 4040 4041 4042# ############################################################################# 4043# argparse 4044# ############################################################################# 4045 4046 4047class CustomDefaultsHelpFormatter(argparse.ArgumentDefaultsHelpFormatter): 4048 """Show default values in `--help` output for custom-set default values. 4049 4050 Modified `argparse.ArgumentDefaultsHelpFormatter` class that adds 4051 `(default: %(default)s)` 4052 to `--help` output, but only for the explicitly-set `default`s: not `True` for `action=store_true` arguments, and not `None` for `action=store` arguments (`action=store` is the default action for `argparse.add_argument()`, and `None` its default value). 4053 """ 4054 4055 def _get_help_string(self, action): 4056 if action.default in (None, False): 4057 return action.help 4058 return super()._get_help_string(action) 4059 4060 4061def make_arg_parser(): 4062 parser = argparse.ArgumentParser( 4063 formatter_class=CustomDefaultsHelpFormatter, 4064 ) 4065 4066 subparser = parser.add_subparsers( 4067 description='Use `%(prog)s <subcommand> -h` for additional help.', 4068 dest='subcommand', 4069 ) 4070 parser_link = subparser.add_parser( 4071 'link', 4072 help='Link to an existing device.', 4073 formatter_class=CustomDefaultsHelpFormatter, 4074 ) 4075 parser_link.add_argument( 4076 '-n', 4077 '--name', 4078 default='scli', 4079 help='Device name that will be shown in "Linked devices" list on master device.', 4080 ) 4081 4082 parser.add_argument( 4083 '-c', 4084 '--config-file', 4085 default=SCLI_CFG_FILE, 4086 help='Path to the config file. Arguments on the command line override settings in the file.', 4087 ) 4088 4089 parser.add_argument( 4090 '-u', 4091 '--username', 4092 help='Phone number starting with "+" followed by country code. If not given, %(prog)s will look for an existing profile in signal-cli\'s data dir.', 4093 ) 4094 4095 parser.add_argument( 4096 '-N', 4097 '--notification-command', 4098 default="notify-send scli '%s - %m'", 4099 help='Command to run when a new message arrives. %%m is replaced with the message, %%s is replaced with the sender.', 4100 ) 4101 4102 parser.add_argument( 4103 '-o', 4104 '--open-command', 4105 default='xdg-open "%u"', 4106 help='File/URL opener command. %%u is replaced with the path.', 4107 ) 4108 4109 parser.add_argument( 4110 '-e', 4111 '--editor-command', 4112 help='External text editor command. If not set, %(prog)s checks among `$VISUAL`, `$EDITOR`, `sensible-editor` etc.', 4113 ) 4114 4115 parser.add_argument( 4116 '-G', 4117 '--clipboard-get-command', 4118 help='Command used by `:attachClip` to get a list of files to send as attachments. Should return one absolute file path per line. If not set, `xclip` is used.', 4119 ) 4120 4121 parser.add_argument( 4122 '-P', 4123 '--clipboard-put-command', 4124 help='Command to put text on clipboard. %%s will be replaced with the text. If not set, `xclip` is used.', 4125 ) 4126 4127 parser.add_argument( 4128 '-s', 4129 '--save-history', 4130 nargs='?', 4131 const=SCLI_HISTORY_FILE, 4132 default=False, 4133 metavar='HISTORY_FILE', 4134 help='Enable conversations history. History is saved in plain text. (default %(metavar)s: %(const)s)', 4135 ) 4136 4137 parser.add_argument( 4138 '--log-file', 4139 default=SCLI_LOG_FILE, 4140 help='Path to the log file. If not explicitly specified, logs are written only if `--debug` or `--save-history` are on.', 4141 ) 4142 4143 parser.add_argument( 4144 '-n', 4145 '--enable-notifications', 4146 action='store_true', 4147 help='Enable desktop notifications. (Also see --notification-command)', 4148 ) 4149 4150 parser.add_argument( 4151 '-f', 4152 '--use-formatting', 4153 action='store_true', 4154 help='Show _italic_, *bold*, ~strikethrough~ formatting in messages.', 4155 ) 4156 4157 parser.add_argument( 4158 '--color', 4159 nargs='?', 4160 const=True, 4161 default=False, 4162 help="Colorize messages. See README for options.", 4163 ) 4164 4165 parser.add_argument( 4166 '-w', 4167 '--wrap-at', 4168 default='85%', 4169 help="Wrap messages' text at a given number of columns / percentage of available screen width.", 4170 ) 4171 4172 parser.add_argument( 4173 '--one-sided', 4174 action='store_true', 4175 help='Left-align both sent and received messages', 4176 ) 4177 4178 parser.add_argument( 4179 '--show-names', 4180 action='store_true', 4181 help="Show contacts' names next to messages, even in one-to-one conversations.", 4182 ) 4183 4184 parser.add_argument( 4185 '--group-contacts', 4186 action='store_true', 4187 help=argparse.SUPPRESS, 4188 # The option name can be confusing, e.g. in: 4189 # https://github.com/isamert/scli/issues/95#issuecomment-757502271 4190 # Keep for backwards compatiability, but don't show in `--help`. Use `--partition-contacts` instead. 4191 ) 4192 4193 parser.add_argument( 4194 '--partition-contacts', 4195 action='store_true', 4196 help='Separate groups and individual contacts in the contacts list.', 4197 ) 4198 4199 parser.add_argument( 4200 '--contacts-autohide', 4201 action='store_true', 4202 help='Autohide the contacts pane when it loses focus.', 4203 ) 4204 4205 parser.add_argument( 4206 '--contacts-sort-alpha', 4207 action='store_true', 4208 help='Sort contacts alphabetically. (default: sort by the most recent message)', 4209 ) 4210 4211 parser.add_argument( 4212 '--daemon-command', 4213 default='signal-cli -u %u --output=json daemon', 4214 help='Command for starting signal-cli daemon. The `%%u` in command will be replaced with username (phone number).', 4215 ) 4216 4217 parser.add_argument( 4218 '--no-daemon', 4219 action='store_true', 4220 help='Do not start signal-cli daemon. Only useful for debugging scli.', 4221 ) 4222 4223 parser.add_argument( 4224 '--debug', 4225 action='store_true', 4226 help='Verbose log output.', 4227 ) 4228 4229 parser.add_argument( 4230 '--version', 4231 action='version', 4232 version='%(prog)s ' + __version__, 4233 ) 4234 4235 return parser 4236 4237 4238def get_cfg_file_args(file_obj): 4239 # Alternatively, can override `ArgumentParser.convert_arg_line_to_args()`. 4240 # Could use `configparser` module if the syntax gets more complicated. 4241 ret = {} 4242 for line in file_obj: 4243 line = line.strip() 4244 if not line or line.startswith('#'): 4245 continue 4246 name, _, val = line.partition("=") 4247 ret[name.strip()] = val.strip() 4248 return ret 4249 4250 4251def get_opt_val_flags(parser): 4252 """Flags that optionally take values. 4253 4254 These are defined by 4255 ..., nargs='?', const=True, default=False, ... 4256 See 4257 https://docs.python.org/3/library/argparse.html#nargs 4258 4259 They allow any of the following forms on the command line: 4260 --color 4261 --color=high 4262 <nothing> (i.e. option omitted) 4263 In config file this corresponds to: 4264 color = true 4265 color = high 4266 color = false 4267 OR 4268 <nothing> (option not mentioned in config) 4269 """ 4270 4271 # For these arguments, checking for the `False` values in the config file (e.g. 'false', 'f', 'no', etc) needs to be done explicitly, unlike for the regular flags that interpret any config value that is not in ('true', 't', 'yes', etc) as `False`. 4272 4273 return frozenset( 4274 opt 4275 for a in parser._actions # pylint: disable=protected-access 4276 for opt in a.option_strings 4277 if ( 4278 a.nargs == argparse.OPTIONAL 4279 and isinstance(a.const, bool) 4280 and isinstance(a.default, bool) 4281 ) 4282 ) 4283 4284 4285def parse_cfg_file(parser, cli_args): 4286 cfg_file_path = os.path.expanduser(cli_args.config_file) 4287 try: 4288 with open(cfg_file_path) as cfg_f: 4289 cfg_f_args_dict = get_cfg_file_args(cfg_f) 4290 except FileNotFoundError: 4291 if cli_args.config_file == parser.get_default('config_file'): 4292 return cli_args 4293 sys.exit("ERROR: Config file not found: " + cfg_file_path) 4294 4295 opt_val_flags = get_opt_val_flags(parser) 4296 args_list = [] 4297 for arg_name, arg_val in cfg_f_args_dict.items(): 4298 arg_dest = arg_name.replace('-', '_') # Assuming `dest` has not been overriden. 4299 if arg_dest not in cli_args: 4300 print("WARNING: encountered an unrecognized argument while parsing config file:", arg_name, file=sys.stderr) 4301 continue 4302 arg_default = parser.get_default(arg_dest) 4303 arg_name = '--' + arg_name 4304 if isinstance(arg_default, bool): 4305 if arg_val.lower() in ('true', 't', 'yes', 'y'): 4306 args_list.append(arg_name) 4307 elif ( 4308 arg_name in opt_val_flags 4309 and arg_val.lower() not in ('false', 'f', 'no', 'n') 4310 ): 4311 args_list.extend((arg_name, arg_val)) 4312 else: 4313 args_list.extend((arg_name, arg_val)) 4314 # Need to actually parse the arguments (rather then simply updating args.__dict__), so that the `type`s would be set correctly. 4315 cfg_file_args = parser.parse_args(args_list) 4316 parser.parse_args(namespace=cfg_file_args) 4317 return cfg_file_args 4318 4319 4320def parse_wrap_at_arg(width): 4321 def bad_val(width): 4322 sys.exit( 4323 f"ERROR: Could not parse the width value: `{width}`.\n" 4324 "The value should be an `<int>` or a `<float>%` (`42` or `42.42%`).\n" 4325 "See `--help` for additional info." 4326 ) 4327 if width.endswith('%'): 4328 try: 4329 percent_width = float(width.rstrip('%')) 4330 except ValueError: 4331 bad_val(width) 4332 return ('relative', percent_width) 4333 else: 4334 try: 4335 return int(width) 4336 except ValueError: 4337 bad_val(width) 4338 4339 4340def parse_args(): 4341 parser = make_arg_parser() 4342 args = parser.parse_args() 4343 4344 if args.subcommand == 'link': 4345 link_device(args.name) 4346 sys.exit() 4347 4348 if args.config_file: 4349 args = parse_cfg_file(parser, args) 4350 if args.editor_command is None: 4351 args.editor_command = get_default_editor() 4352 if not args.username: 4353 args.username = detect_user_name() 4354 if args.color: 4355 args.color = Color(args.color) 4356 args.partition_contacts = args.partition_contacts or args.group_contacts 4357 del args.__dict__['group_contacts'] 4358 args.wrap_at = parse_wrap_at_arg(args.wrap_at) 4359 return args 4360 4361 4362# ############################################################################# 4363# main 4364# ############################################################################# 4365 4366 4367class BracketedPasteMode: 4368 """Context manager for enabling/disabling bracketed paste mode.""" 4369 # Same as tdryer's code 4370 # https://github.com/urwid/urwid/issues/119#issuecomment-761424363 4371 4372 def __enter__(self): 4373 sys.stdout.write('\x1b[?2004h') 4374 4375 def __exit__(self, exc_type, exc_value, traceback): 4376 sys.stdout.write('\x1b[?2004l') 4377 4378 4379def link_device(device_name): 4380 try: 4381 pyqrcode = importlib.import_module('pyqrcode') 4382 except ImportError: 4383 sys.exit( 4384 "ERROR: `pyqrcode` module not found. " 4385 "Please install it with `pip install pyqrcode`" 4386 ) 4387 print("Retrieving QR code, please wait...") 4388 cmd_link = ['signal-cli', 'link', '-n', device_name] 4389 pipe_link = subprocess.Popen(cmd_link, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 4390 line = pipe_link.stdout.readline().strip() 4391 4392 if line.startswith(('tsdevice:/', 'sgnl://linkdevice')): 4393 qr_obj = pyqrcode.create(line, version=10) 4394 print(qr_obj.terminal(module_color='black', background='white')) 4395 else: 4396 sys.exit( 4397 "ERROR: Encountered a problem while linking:\n" 4398 f"{line}\n" 4399 f"{pipe_link.stderr.read()}" 4400 ) 4401 4402 print( 4403 "Scan the QR code with Signal app on your phone and wait for the linking process to finish.\n" 4404 "You might need to zoom out for the QR code to display properly.\n" 4405 "This may take a moment..." 4406 ) 4407 4408 pipe_link.wait() 4409 if pipe_link.returncode != 0: 4410 sys.exit( 4411 "ERROR: Encountered a problem while linking:\n" 4412 f"{pipe_link.stderr.read()}" 4413 ) 4414 4415 print('Receiving data for the first time...') 4416 4417 cmd_receive = 'signal-cli -u {} receive'.format(detect_user_name()) 4418 pipe_receive = subprocess.Popen(cmd_receive.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 4419 4420 for receive_out in iter(pipe_receive.stdout.readline, ''): 4421 print(receive_out, end='') 4422 4423 pipe_receive.wait() 4424 if pipe_receive.returncode != 0: 4425 sys.exit( 4426 "ERROR: Encountered a problem while receiving:\n" 4427 f"{pipe_receive.stderr.read()}" 4428 ) 4429 4430 print('Done.') 4431 sys.exit(0) 4432 4433 4434def detect_user_name(): 4435 ulist = [] 4436 for folder in [SIGNALCLI_DATA_FOLDER, SIGNALCLI_LEGACY_DATA_FOLDER]: 4437 try: 4438 users = [x for x in os.listdir(folder) if os.path.isfile(os.path.join(folder, x))] 4439 ulist.extend(users) 4440 except FileNotFoundError: 4441 pass 4442 4443 if not ulist: 4444 sys.exit("ERROR: Could not find any registered accounts. " 4445 "Register a new one or link with an existing device (see README).") 4446 elif len(ulist) == 1: 4447 return ulist[0] 4448 else: 4449 sys.exit("ERROR: Multiple accounts found. Run one of:\n\t" 4450 + "\n\t".join((f"scli --username={u}" for u in ulist))) 4451 4452 4453def main(): 4454 try: 4455 os.makedirs(SCLI_ATTACHMENT_FOLDER) 4456 except OSError as exc: 4457 if not (exc.errno == errno.EEXIST and os.path.isdir(SCLI_DATA_FOLDER)): 4458 sys.exit("ERROR: Could not create a directory in " + SCLI_DATA_FOLDER) 4459 4460 args = parse_args() 4461 4462 if args.debug: 4463 logging.basicConfig(filename=args.log_file, level=logging.DEBUG) 4464 elif args.save_history or args.log_file != SCLI_LOG_FILE: 4465 logging.basicConfig(filename=args.log_file, level=logging.WARNING) 4466 else: 4467 logging.disable() 4468 logging.info("scli %s", __version__) 4469 4470 cfg.set(args) 4471 4472 coord = Coordinate() 4473 loop = coord.daemon.main_loop 4474 4475 if not args.no_daemon: 4476 proc = coord.daemon.start() 4477 atexit.register(proc.kill) 4478 action_request.set_status_line("Initializing signal-cli daemon... ") 4479 4480 for sig in {signal_ipc.SIGHUP, signal_ipc.SIGTERM}: 4481 signal_ipc.signal(sig, lambda signum, frame: action_request.quit()) 4482 4483 with BracketedPasteMode(): 4484 loop.run() 4485 4486 4487__version__ = get_version() 4488 4489if __name__ == "__main__": 4490 main() 4491