# -*- coding: utf-8 -*-
# Copyright (C) 2011-2012  Patrick Totzke <patricktotzke@gmail.com>
# Copyright © 2017-2018 Dylan Baker
# This file is released under the GNU GPL, version 3 or a later revision.
# For further details see the COPYING file
from datetime import timedelta
from datetime import datetime
from collections import deque
import logging
import mimetypes
import os
import re
import shlex
import subprocess
import email
# imported explicitly because mimewrap() uses email.encoders.encode_base64
import email.encoders
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.text import MIMEText
import asyncio

import urwid
import magic


def split_commandline(s, comments=False, posix=True):
    """
    splits semi-colon separated commandlines

    :param s: the command line to split
    :type s: str
    :param comments: if False (the default), comment parsing is disabled in
                     the underlying :class:`shlex.shlex` lexer
    :type comments: bool
    :param posix: passed on to :class:`shlex.shlex`
    :type posix: bool
    :returns: the individual commands
    :rtype: list of str
    """
    # shlex seems to remove unescaped quotes and backslashes
    s = s.replace('\\', '\\\\')
    s = s.replace('\'', '\\\'')
    s = s.replace('\"', '\\\"')
    lex = shlex.shlex(s, posix=posix)
    lex.whitespace_split = True
    # only ';' separates tokens, so ordinary whitespace stays inside a command
    lex.whitespace = ';'
    if not comments:
        lex.commenters = ''
    return list(lex)


def split_commandstring(cmdstring):
    """
    split command string into a list of strings to pass on to subprocess.Popen
    and the like. This simply calls :func:`shlex.split`.

    :param cmdstring: the command string to split
    :type cmdstring: str
    :returns: the parts of the command
    :rtype: list of str
    """
    assert isinstance(cmdstring, str)
    return shlex.split(cmdstring)


def string_sanitize(string, tab_width=8):
    r"""
    removes carriage returns and expands tabs to spaces

    Every tab is replaced by as many spaces as are needed to reach the next
    tab stop (a multiple of `tab_width`) on its line. Leading and trailing
    whitespace is left untouched.

    :param string: the text to sanitize
    :type string: str
    :param tab_width: distance between two tab stops
    :type tab_width: int

    >>> string_sanitize(' foo\rbar ', 8)
    ' foobar '
    >>> string_sanitize('foo\tbar', 8)
    'foo     bar'
    >>> string_sanitize('foo\t\tbar', 8)
    'foo             bar'
    """

    string = string.replace('\r', '')

    lines = list()
    for line in string.split('\n'):
        tab_count = line.count('\t')

        if tab_count > 0:
            line_length = 0
            new_line = list()
            for i, chunk in enumerate(line.split('\t')):
                line_length += len(chunk)
                new_line.append(chunk)

                # all chunks but the last one were followed by a tab
                if i < tab_count:
                    next_tab_stop_in = tab_width - (line_length % tab_width)
                    new_line.append(' ' * next_tab_stop_in)
                    line_length += next_tab_stop_in
            lines.append(''.join(new_line))
        else:
            lines.append(line)

    return '\n'.join(lines)


def string_decode(string, enc='ascii'):
    """
    safely decodes string to str, respecting `enc` as a hint.

    Undecodable bytes are replaced rather than raising. If `enc` is not a
    known codec, ASCII is used instead.

    :param string: the string to decode
    :type string: bytes or str
    :param enc: a hint what encoding is used in string ('ascii', 'utf-8', ...)
    :type enc: str or None
    :returns: the decoded input; input that is already a str is returned
              unchanged
    :rtype: str

    """

    if enc is None:
        enc = 'ascii'
    try:
        string = str(string, enc, errors='replace')
    except LookupError:  # malformed enc string
        string = string.decode('ascii', errors='replace')
    except TypeError:  # already str
        pass
    return string


def shorten(string, maxlen):
    """shortens string if longer than maxlen, appending ellipsis"""
    if 1 < maxlen < len(string):
        string = string[:maxlen - 1] + '…'
    return string[:maxlen]


def shorten_author_string(authors_string, maxlength):
    """
    Parse a list of authors concatenated as a text string (comma
    separated) and smartly adjust them to maxlength.

    1) If the complete list of sender names does not fit in maxlength, it
    tries to shorten names by using only the first part of each.

    2) If the list is still too long, hide authors according to the
    following priority:

      - First author is always shown (if too long is shorten with ellipsis)

      - If possible, last author is also shown (if too long, uses ellipsis)

      - If there are more than 2 authors in the thread, show the
        maximum of them. More recent senders have higher priority.

      - If it is finally necessary to hide any author, an ellipsis
        between first and next authors is added.

    :param authors_string: the authors, separated by ", "
    :type authors_string: str
    :param maxlength: the maximal length of the result
    :type maxlength: int
    :returns: the shortened author list
    :rtype: str
    """

    # I will create a list of authors by parsing author_string. I use
    # deque to do popleft without performance penalties
    authors = deque()

    # If author list is too long, it uses only the first part of each
    # name (gmail style)
    short_names = len(authors_string) > maxlength
    for au in authors_string.split(", "):
        if short_names:
            author_as_list = au.split()
            if author_as_list:
                authors.append(author_as_list[0])
        else:
            authors.append(au)

    # Author chain will contain the list of author strings to be
    # concatenated using commas for the final formatted author_string.
    authors_chain = deque()

    if not authors:
        return ''

    # reserve space for first author
    first_au = shorten(authors.popleft(), maxlength)
    remaining_length = maxlength - len(first_au)

    # Tries to add an ellipsis if no space to show more than 1 author
    if authors and maxlength > 3 and remaining_length < 3:
        first_au = shorten(first_au, maxlength - 3)
        remaining_length += 3

    # Tries to add as more authors as possible. It takes into account
    # that if any author will be hidden, and ellipsis should be added
    while authors and remaining_length >= 3:
        au = authors.pop()
        if len(au) > 1 and (remaining_length == 3 or (authors and
                                                      remaining_length < 7)):
            authors_chain.appendleft('…')
            break
        else:
            if authors:
                # 5= ellipsis + 2 x comma and space used as separators
                au_string = shorten(au, remaining_length - 5)
            else:
                # 2 = comma and space used as separator
                au_string = shorten(au, remaining_length - 2)
            remaining_length -= len(au_string) + 2
            authors_chain.appendleft(au_string)

    # Add the first author to the list and concatenate list
    authors_chain.appendleft(first_au)
    authorsstring = ', '.join(authors_chain)
    return authorsstring


def pretty_datetime(d):
    """
    translates :class:`datetime` `d` to a "sup-style" human readable string.

    :param d: the point in time to describe; it is compared against
              :meth:`datetime.now`
    :type d: :class:`datetime`
    :returns: the human readable description
    :rtype: str

    >>> now = datetime.now()
    >>> now.strftime('%c')
    'Sat 31 Mar 2012 14:47:26 '
    >>> pretty_datetime(now)
    'just now'
    >>> pretty_datetime(now - timedelta(minutes=1))
    '1min ago'
    >>> pretty_datetime(now - timedelta(hours=5))
    '5h ago'
    >>> pretty_datetime(now - timedelta(hours=12))
    '02:54am'
    >>> pretty_datetime(now - timedelta(days=1))
    'yest 02pm'
    >>> pretty_datetime(now - timedelta(days=2))
    'Thu 02pm'
    >>> pretty_datetime(now - timedelta(days=7))
    'Mar 24'
    >>> pretty_datetime(now - timedelta(days=356))
    'Apr 2011'
    """
    # '%p' yields an empty string in locales without an AM/PM notion; fall
    # back to a 24h format in that case
    ampm = d.strftime('%p').lower()
    if ampm:
        hourfmt = '%I' + ampm
        hourminfmt = '%I:%M' + ampm
    else:
        hourfmt = '%Hh'
        hourminfmt = '%H:%M'

    now = datetime.now()
    today = now.date()
    if d.date() == today or d > now - timedelta(hours=6):
        # use the same instant as the condition above so that both agree
        delta = now - d
        if delta.seconds < 60:
            string = 'just now'
        elif delta.seconds < 3600:
            string = '%dmin ago' % (delta.seconds // 60)
        elif delta.seconds < 6 * 3600:
            string = '%dh ago' % (delta.seconds // 3600)
        else:
            string = d.strftime(hourminfmt)
    elif d.date() == today - timedelta(1):
        string = d.strftime('yest ' + hourfmt)
    elif d.date() > today - timedelta(7):
        string = d.strftime('%a ' + hourfmt)
    elif d.year != today.year:
        string = d.strftime('%b %Y')
    else:
        string = d.strftime('%b %d')
    return string_decode(string, 'UTF-8')


def call_cmd(cmdlist, stdin=None):
    """
    get a shell commands output, error message and return value and immediately
    return.

    .. warning::

        This returns with the first screen content for interactive commands.

    If the process cannot be started (:class:`OSError`), stdout is empty and
    the error message and errno of the exception are returned as stderr and
    return value.

    :param cmdlist: shellcommand to call, already splitted into a list accepted
                    by :meth:`subprocess.Popen`
    :type cmdlist: list of str
    :param stdin: string to pipe to the process
    :type stdin: str, bytes, or None
    :return: triple of stdout, stderr, return value of the shell command
    :rtype: str, str, int
    """
    termenc = urwid.util.detected_encoding
    if isinstance(stdin, str):
        stdin = stdin.encode(termenc)
    try:
        # lazy formatting: also safe if cmdlist happens to be a tuple
        logging.debug("Calling %s", cmdlist)
        proc = subprocess.Popen(
            cmdlist,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdin=subprocess.PIPE if stdin is not None else None)
    except OSError as e:
        out = b''
        err = e.strerror
        ret = e.errno
    else:
        out, err = proc.communicate(stdin)
        ret = proc.returncode

    out = string_decode(out, termenc)
    err = string_decode(err, termenc)
    return out, err, ret


async def call_cmd_async(cmdlist, stdin=None, env=None):
    """Given a command, call that command asynchronously and return the output.

    This function only handles `OSError` when creating the subprocess, any
    other exceptions raised either during subprocess creation or while
    exchanging data with the subprocess are the caller's responsibility to
    handle.

    If such an `OSError` is caught, then returncode will be set to 1, and the
    error value will be set to the str() value of the exception.

    :type cmdlist: list of str
    :param stdin: string to pipe to the process
    :type stdin: str
    :param env: variables to add to (or override in) a copy of the current
                process environment for the subprocess
    :type env: dict or None
    :return: Tuple of stdout, stderr, returncode
    :rtype: tuple[str, str, int]
    """
    termenc = urwid.util.detected_encoding
    cmdlist = [s.encode(termenc) for s in cmdlist]

    environment = os.environ.copy()
    if env is not None:
        environment.update(env)
    logging.debug('ENV = %s', environment)
    logging.debug('CMD = %s', cmdlist)
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmdlist,
            env=environment,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.PIPE if stdin else None)
    except OSError as e:
        return ('', str(e), 1)
    out, err = await proc.communicate(stdin.encode(termenc) if stdin else None)
    return (out.decode(termenc), err.decode(termenc), proc.returncode)


def guess_mimetype(blob):
    """
    uses file magic to determine the mime-type of the given data blob.

    :param blob: file content as read by file.read()
    :type blob: data
    :returns: mime-type, falls back to 'application/octet-stream'
    :rtype: str
    :raises Exception: if the installed `magic` module offers neither of the
                       two supported APIs
    """
    mimetype = 'application/octet-stream'
    # this is a bit of a hack to support different versions of python magic.
    # Hopefully at some point this will no longer be necessary
    #
    # the version with open() is the bindings shipped with the file source from
    # http://darwinsys.com/file/ - this is what is used by the python-magic
    # package on Debian/Ubuntu. However, it is not available on pypi/via pip.
    #
    # the version with from_buffer() is available at
    # https://github.com/ahupp/python-magic and directly installable via pip.
    #
    # for more detail see https://github.com/pazz/alot/pull/588
    if hasattr(magic, 'open'):
        m = magic.open(magic.MAGIC_MIME_TYPE)
        m.load()
        magictype = m.buffer(blob)
    elif hasattr(magic, 'from_buffer'):
        # cf. issue #841
        # fall back to the default if from_buffer() yields nothing usable
        magictype = magic.from_buffer(blob, mime=True) or mimetype
    else:
        raise Exception('Unknown magic API')

    # libmagic does not always return proper mimetype strings, cf. issue #459
    if re.match(r'\w+\/\w+', magictype):
        mimetype = magictype
    return mimetype


def guess_encoding(blob):
    """
    uses file magic to determine the encoding of the given data blob.

    :param blob: file content as read by file.read()
    :type blob: data
    :returns: encoding
    :rtype: str
    :raises Exception: if the installed `magic` module offers neither of the
                       two supported APIs
    """
    # this is a bit of a hack to support different versions of python magic.
    # Hopefully at some point this will no longer be necessary
    #
    # the version with open() is the bindings shipped with the file source from
    # http://darwinsys.com/file/ - this is what is used by the python-magic
    # package on Debian/Ubuntu. However it is not available on pypi/via pip.
    #
    # the version with from_buffer() is available at
    # https://github.com/ahupp/python-magic and directly installable via pip.
    #
    # for more detail see https://github.com/pazz/alot/pull/588
    if hasattr(magic, 'open'):
        m = magic.open(magic.MAGIC_MIME_ENCODING)
        m.load()
        return m.buffer(blob)
    elif hasattr(magic, 'from_buffer'):
        m = magic.Magic(mime_encoding=True)
        return m.from_buffer(blob)
    else:
        raise Exception('Unknown magic API')


def try_decode(blob):
    """Guess the encoding of blob and try to decode it into a str.

    :param bytes blob: The bytes to decode
    :returns: the decoded blob
    :rtype: str
    """
    assert isinstance(blob, bytes), 'cannot decode a str or non-bytes object'
    return blob.decode(guess_encoding(blob))


def libmagic_version_at_least(version):
    """
    checks if the libmagic library installed is more recent than a given
    version.

    :param version: minimum version expected in the form XYY (i.e. 5.14 -> 514)
                    with XYY >= 513
    :returns: True if the installed libmagic is at least `version`, False if
              it is older or if its version cannot be determined
    :rtype: bool
    :raises Exception: if the installed `magic` module offers neither of the
                       two supported APIs
    """
    if hasattr(magic, 'open'):
        magic_wrapper = magic._libraries['magic']
    elif hasattr(magic, 'from_buffer'):
        magic_wrapper = magic.libmagic
    else:
        raise Exception('Unknown magic API')

    if not hasattr(magic_wrapper, 'magic_version'):
        # The magic_version function has been introduced in libmagic 5.13,
        # if it's not present, we can't guess right, so let's assume False
        return False

    # Depending on the libmagic/ctypes version, magic_version is either a
    # callable or a plain value:
    if callable(magic_wrapper.magic_version):
        return magic_wrapper.magic_version() >= version

    return magic_wrapper.magic_version >= version


# TODO: make this work on blobs, not paths
def mimewrap(path, filename=None, ctype=None):
    """Take the contents of the given path and wrap them into an email MIME
    part according to the content type.  The content type is auto detected from
    the actual file contents and the file name if it is not given.

    :param path: the path to the file contents
    :type path: str
    :param filename: the file name to use in the generated MIME part
    :type filename: str or None
    :param ctype: the content type of the file contents in path
    :type ctype: str or None
    :returns: the message MIME part storing the data from path
    :rtype: subclasses of email.mime.base.MIMEBase
    """

    with open(path, 'rb') as f:
        content = f.read()
    if not ctype:
        ctype = guess_mimetype(content)
        # libmagic < 5.12 incorrectly detects excel/powerpoint files as
        # 'application/msword' (see #179 and #186 in libmagic bugtracker)
        # This is a workaround, based on file extension, useful as long
        # as distributions still ship libmagic 5.11.
        if (ctype == 'application/msword' and
                not libmagic_version_at_least(513)):
            mimetype, _ = mimetypes.guess_type(path)
            if mimetype:
                ctype = mimetype

    maintype, subtype = ctype.split('/', 1)
    if maintype == 'text':
        part = MIMEText(content.decode(guess_encoding(content), 'replace'),
                        _subtype=subtype,
                        _charset='utf-8')
    elif maintype == 'image':
        part = MIMEImage(content, _subtype=subtype)
    elif maintype == 'audio':
        part = MIMEAudio(content, _subtype=subtype)
    else:
        part = MIMEBase(maintype, subtype)
        part.set_payload(content)
        # Encode the payload using Base64
        email.encoders.encode_base64(part)
    # Set the filename parameter
    if not filename:
        filename = os.path.basename(path)
    part.add_header('Content-Disposition', 'attachment',
                    filename=filename)
    return part


def shell_quote(text):
    """Escape the given text for passing it to the shell for interpretation.
    The resulting string will be parsed into one "word" (in the sense used in
    the shell documentation, see sh(1)) by the shell.

    :param text: the text to quote
    :type text: str
    :returns: the quoted text
    :rtype: str
    """
    # wrap in single quotes; an embedded single quote closes the quoting,
    # is emitted inside double quotes and the single quoting is reopened
    return "'%s'" % text.replace("'", """'"'"'""")


def humanize_size(size):
    """Create a nice human readable representation of the given number
    (understood as bytes) using the "KiB" and "MiB" suffixes to indicate
    kibibytes and mebibytes. A kibibyte is defined as 1024 bytes (as opposed to
    a kilobyte which is 1000 bytes) and a mebibyte is 1024**2 bytes (as opposed
    to a megabyte which is 1000**2 bytes).

    :param size: the number to convert
    :type size: int
    :returns: the human readable representation of size
    :rtype: str
    """
    for factor, format_string in ((1, '%i'),
                                  (1024, '%iKiB'),
                                  (1024 * 1024, '%.1fMiB')):
        if size / factor < 1024:
            return format_string % (size / factor)
    # anything larger is still expressed in the largest unit (MiB)
    return format_string % (size / factor)


def parse_mailcap_nametemplate(tmplate='%s'):
    """this returns a prefix and suffix to be used
    in the tempfile module for a given mailcap nametemplate string

    :param tmplate: the nametemplate; if it does not contain exactly one
                    '%s' the whole string is used as suffix
    :type tmplate: str
    :returns: prefix and suffix
    :rtype: tuple(str, str)
    """
    nt_list = tmplate.split('%s')
    template_prefix = ''
    template_suffix = ''
    if len(nt_list) == 2:
        template_suffix = nt_list[1]
        template_prefix = nt_list[0]
    else:
        template_suffix = tmplate
    return (template_prefix, template_suffix)


def parse_mailto(mailto_str):
    """
    Interpret mailto-string

    :param mailto_str: the string to interpret. Must conform to :rfc:`2368`.
    :type mailto_str: str
    :return: the header fields and the body found in the mailto link as a tuple
        of length two; `(None, None)` if the string does not start with
        'mailto:'
    :rtype: tuple(dict(str->list(str)), str)
    """
    if mailto_str.startswith('mailto:'):
        import urllib.parse
        # [::2] picks the parts before and after the separator
        to_str, parms_str = mailto_str[7:].partition('?')[::2]
        headers = {}
        body = ''

        to = urllib.parse.unquote(to_str)
        if to:
            headers['To'] = [to]

        for s in parms_str.split('&'):
            key, value = s.partition('=')[::2]
            key = key.capitalize()
            if key == 'Body':
                body = urllib.parse.unquote(value)
            elif value:
                headers[key] = [urllib.parse.unquote(value)]
        return (headers, body)
    else:
        return (None, None)


def mailto_to_envelope(mailto_str):
    """
    Interpret mailto-string into a :class:`alot.db.envelope.Envelope`
    """
    from alot.db.envelope import Envelope
    headers, body = parse_mailto(mailto_str)
    return Envelope(bodytext=body, headers=headers)


def RFC3156_canonicalize(text):
    """
    Canonicalizes plain text (MIME-encoded usually) according to RFC3156.

    This function works as follows (in that order):

    1. Convert all line endings to \\\\r\\\\n (DOS line endings).
    2. Encode all occurrences of "From " at the beginning of a line
       to "From=20" in order to prevent other mail programs to replace
       this with "> From" (to avoid MBox conflicts) and thus invalidate
       the signature.

    :param text: text to canonicalize (already encoded as quoted-printable)
    :rtype: str
    """
    text = re.sub("\r?\n", "\r\n", text)
    text = re.sub("^From ", "From=20", text, flags=re.MULTILINE)
    return text


def get_xdg_env(env_name, fallback):
    """ Used for XDG_* env variables to return fallback if unset *or* empty """
    env = os.environ.get(env_name)
    return env if env else fallback