1# -*- coding: utf-8 -*- 2 3# Copyright(C) 2010-2012 Christophe Benz, Romain Bignon, Laurent Bachelier 4# 5# This file is part of weboob. 6# 7# weboob is free software: you can redistribute it and/or modify 8# it under the terms of the GNU Lesser General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# weboob is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU Lesser General Public License for more details. 16# 17# You should have received a copy of the GNU Lesser General Public License 18# along with weboob. If not, see <http://www.gnu.org/licenses/>. 19 20from __future__ import print_function 21 22import atexit 23import logging 24import os 25import re 26import shlex 27import signal 28import sys 29from cmd import Cmd 30from collections import OrderedDict 31from datetime import datetime 32from optparse import IndentedHelpFormatter, OptionGroup, OptionParser 33 34from weboob.capabilities.base import BaseObject, FieldNotFound, UserError, empty 35from weboob.capabilities.collection import BaseCollection, CapCollection, Collection, CollectionNotFound 36from weboob.core import CallErrors 37from weboob.exceptions import BrowserQuestion, BrowserRedirect, DecoupledValidation 38from weboob.tools.application.formatters.iformatter import MandatoryFieldsNotFound 39from weboob.tools.compat import basestring, range, unicode 40from weboob.tools.misc import to_unicode 41from weboob.tools.path import WorkingPath 42 43from .console import BackendNotGiven, ConsoleApplication 44from .formatters.load import FormatterLoadError, FormattersLoader 45from .results import ResultsCondition, ResultsConditionError 46 47__all__ = ['NotEnoughArguments', 'TooManyArguments', 'ArgSyntaxError', 48 'ReplApplication'] 49 50 51class NotEnoughArguments(Exception): 52 pass 53 54 55class TooManyArguments(Exception): 56 pass 57 58 59class ArgSyntaxError(Exception): 60 pass 61 62 63class ReplOptionParser(OptionParser): 64 def format_option_help(self, formatter=None): 65 if not formatter: 66 formatter = self.formatter 67 68 return '%s\n%s' % (formatter.format_commands(self.commands), 69 OptionParser.format_option_help(self, formatter)) 70 71 72class ReplOptionFormatter(IndentedHelpFormatter): 73 def format_commands(self, commands): 74 s = u'' 75 for section, cmds in commands.items(): 76 if len(cmds) == 0: 77 continue 78 if len(s) > 0: 79 s += '\n' 80 s += '%s Commands:\n' % section 81 for c in cmds: 82 c = c.split('\n')[0] 83 s += ' %s\n' % c 84 return s 85 86 87def defaultcount(default_count=10): 88 def deco(f): 89 def inner(self, *args, **kwargs): 90 oldvalue = self.options.count 91 if self._is_default_count: 92 self.options.count = default_count 93 94 try: 95 return f(self, *args, **kwargs) 96 finally: 97 self.options.count = oldvalue 98 99 inner.__doc__ = f.__doc__ 100 assert inner.__doc__ is not None, "A command must have a docstring" 101 inner.__doc__ += '\nDefault is limited to %s results.' % default_count 102 103 return inner 104 return deco 105 106 107class MyCmd(Cmd, object): 108 # Hack for Python 2, because Cmd doesn't inherit object, so its __init__ was not called when using super only 109 pass 110 111 112class ReplApplication(ConsoleApplication, MyCmd): 113 """ 114 Base application class for Repl applications. 115 """ 116 117 SYNOPSIS = 'Usage: %prog [-dqv] [-b backends] [-cnfs] [command [arguments..]]\n' 118 SYNOPSIS += ' %prog [--help] [--version]' 119 DISABLE_REPL = False 120 121 EXTRA_FORMATTERS = {} 122 DEFAULT_FORMATTER = 'multiline' 123 COMMANDS_FORMATTERS = {} 124 125 COLLECTION_OBJECTS = tuple() 126 """Objects to allow in do_ls / do_cd""" 127 128 weboob_commands = set(['backends', 'condition', 'count', 'formatter', 'logging', 'select', 'quit', 'ls', 'cd']) 129 hidden_commands = set(['EOF']) 130 131 def __init__(self): 132 super(ReplApplication, self).__init__(ReplOptionParser(self.SYNOPSIS, version=self._get_optparse_version())) 133 134 copyright = self.COPYRIGHT.replace('YEAR', '%d' % datetime.today().year) 135 self.intro = '\n'.join(('Welcome to %s%s%s v%s' % (self.BOLD, self.APPNAME, self.NC, self.VERSION), 136 '', 137 copyright, 138 'This program is free software: you can redistribute it and/or modify', 139 'it under the terms of the GNU Lesser General Public License as published by', 140 'the Free Software Foundation, either version 3 of the License, or', 141 '(at your option) any later version.', 142 '', 143 'Type "help" to display available commands.', 144 '', 145 )) 146 self.formatters_loader = FormattersLoader() 147 for key, klass in self.EXTRA_FORMATTERS.items(): 148 self.formatters_loader.register_formatter(key, klass) 149 self.formatter = None 150 self.commands_formatters = self.COMMANDS_FORMATTERS.copy() 151 152 commands_help = self.get_commands_doc() 153 self._parser.commands = commands_help 154 self._parser.formatter = ReplOptionFormatter() 155 156 results_options = OptionGroup(self._parser, 'Results Options') 157 results_options.add_option('-c', '--condition', help='filter result items to display given a boolean expression. See CONDITION section for the syntax') 158 results_options.add_option('-n', '--count', type='int', 159 help='limit number of results (from each backends)') 160 results_options.add_option('-s', '--select', help='select result item keys to display (comma separated)') 161 self._parser.add_option_group(results_options) 162 163 formatting_options = OptionGroup(self._parser, 'Formatting Options') 164 available_formatters = self.formatters_loader.get_available_formatters() 165 formatting_options.add_option('-f', '--formatter', choices=available_formatters, 166 help='select output formatter (%s)' % u', '.join(available_formatters)) 167 formatting_options.add_option('--no-header', dest='no_header', action='store_true', help='do not display header') 168 formatting_options.add_option('--no-keys', dest='no_keys', action='store_true', help='do not display item keys') 169 formatting_options.add_option('-O', '--outfile', dest='outfile', help='file to export result') 170 self._parser.add_option_group(formatting_options) 171 172 self._interactive = False 173 self.working_path = WorkingPath() 174 self._change_prompt() 175 176 @property 177 def interactive(self): 178 return self._interactive 179 180 def _change_prompt(self): 181 self.objects = [] 182 self.collections = [] 183 # XXX can't use bold prompt because: 184 # 1. it causes problems when trying to get history (lines don't start 185 # at the right place). 186 # 2. when typing a line longer than term width, cursor goes at start 187 # of the same line instead of new line. 188 #self.prompt = self.BOLD + '%s> ' % self.APPNAME + self.NC 189 if len(self.working_path.get()): 190 wp_enc = unicode(self.working_path) 191 self.prompt = '%s:%s> ' % (self.APPNAME, wp_enc) 192 else: 193 self.prompt = '%s> ' % (self.APPNAME) 194 195 def change_path(self, split_path): 196 self.working_path.location(split_path) 197 self._change_prompt() 198 199 def add_object(self, obj): 200 self.objects.append(obj) 201 202 def _complete_object(self): 203 return [obj.fullid for obj in self.objects] 204 205 def parse_id(self, id, unique_backend=False): 206 if self.interactive: 207 try: 208 obj = self.objects[int(id) - 1] 209 except (IndexError, ValueError): 210 # Try to find a shortcut in the cache 211 for obj in self.objects: 212 if id in obj.id: 213 id = obj.fullid 214 break 215 else: 216 if isinstance(obj, BaseObject): 217 id = obj.fullid 218 try: 219 return ConsoleApplication.parse_id(self, id, unique_backend) 220 except BackendNotGiven as e: 221 backend_name = None 222 while not backend_name: 223 print('This command works with an unique backend. Availables:') 224 for index, (name, backend) in enumerate(e.backends): 225 print('%s%d)%s %s%-15s%s %s' % (self.BOLD, index + 1, self.NC, self.BOLD, name, self.NC, 226 backend.DESCRIPTION)) 227 i = self.ask('Select a backend to proceed with "%s"' % id) 228 if not i.isdigit(): 229 if i not in dict(e.backends): 230 print('Error: %s is not a valid backend' % i, file=self.stderr) 231 continue 232 backend_name = i 233 else: 234 i = int(i) 235 if i < 0 or i > len(e.backends): 236 print('Error: %s is not a valid choice' % i, file=self.stderr) 237 continue 238 backend_name = e.backends[i-1][0] 239 240 return id, backend_name 241 242 def get_object(self, _id, method, fields=None, caps=None): 243 if self.interactive: 244 try: 245 obj = self.objects[int(_id) - 1] 246 except (IndexError, ValueError): 247 pass 248 else: 249 try: 250 backend = self.weboob.get_backend(obj.backend) 251 actual_method = getattr(backend, method, None) 252 if actual_method is None: 253 return None 254 else: 255 if callable(actual_method): 256 obj, = self.do('fillobj', obj, fields, backends=backend) 257 return obj 258 else: 259 return None 260 except UserError as e: 261 self.bcall_error_handler(backend, e, '') 262 263 _id, backend_name = self.parse_id(_id) 264 kargs = {} 265 if caps is not None: 266 kargs = {'caps': caps} 267 backend_names = (backend_name,) if backend_name is not None else self.enabled_backends 268 269 # if backend's service returns several objects, try to find the one 270 # with wanted ID. If not found, get the last not None object. 271 obj = None 272 273 # remove backends that do not have the required method 274 new_backend_names = [] 275 for backend in backend_names: 276 if isinstance(backend, (str, unicode)): 277 actual_backend = self.weboob.get_backend(backend) 278 else: 279 actual_backend = backend 280 if getattr(actual_backend, method, None) is not None: 281 new_backend_names.append(backend) 282 backend_names = tuple(new_backend_names) 283 try: 284 for objiter in self.do(method, _id, backends=backend_names, fields=fields, **kargs): 285 if objiter: 286 obj = objiter 287 if objiter.id == _id: 288 return obj 289 except CallErrors as e: 290 if obj is not None: 291 self.bcall_errors_handler(e) 292 else: 293 raise 294 295 return obj 296 297 def get_object_list(self, method=None, *args, **kwargs): 298 # return cache if not empty 299 if len(self.objects) > 0: 300 return self.objects 301 elif method is not None: 302 kwargs['backends'] = self.enabled_backends 303 for _object in self.do(method, *args, **kwargs): 304 self.add_object(_object) 305 return self.objects 306 # XXX: what can we do without method? 307 return tuple() 308 309 def unload_backends(self, *args, **kwargs): 310 self.objects = [] 311 self.collections = [] 312 return ConsoleApplication.unload_backends(self, *args, **kwargs) 313 314 def load_backends(self, *args, **kwargs): 315 self.objects = [] 316 self.collections = [] 317 return ConsoleApplication.load_backends(self, *args, **kwargs) 318 319 def main(self, argv): 320 cmd_args = argv[1:] 321 if cmd_args: 322 cmd_line = u' '.join(cmd_args) 323 cmds = cmd_line.split(';') 324 for cmd in cmds: 325 ret = self.onecmd(cmd) 326 if ret: 327 return ret 328 elif self.DISABLE_REPL: 329 self._parser.print_help() 330 self._parser.exit() 331 else: 332 try: 333 import readline 334 except ImportError: 335 pass 336 else: 337 # Remove '-' from delims 338 readline.set_completer_delims(readline.get_completer_delims().replace('-', '')) 339 340 history_filepath = os.path.join(self.weboob.workdir, '%s_history' % self.APPNAME) 341 try: 342 readline.read_history_file(history_filepath) 343 except IOError: 344 pass 345 346 def savehist(): 347 readline.write_history_file(history_filepath) 348 atexit.register(savehist) 349 350 self.intro += '\nLoaded backends: %s\n' % ', '.join(sorted(backend.name for backend in self.weboob.iter_backends())) 351 self._interactive = True 352 self.cmdloop() 353 354 def do(self, function, *args, **kwargs): 355 """ 356 Call Weboob.do(), passing count and selected fields given by user. 357 """ 358 backends = kwargs.pop('backends', None) 359 if backends is None: 360 kwargs['backends'] = [] 361 for backend in self.enabled_backends: 362 actual_function = getattr(backend, function, None) if isinstance(function, basestring) else function 363 364 if callable(actual_function): 365 kwargs['backends'].append(backend) 366 else: 367 kwargs['backends'] = backends 368 fields = kwargs.pop('fields', self.selected_fields) 369 if not fields and fields != []: 370 fields = self.selected_fields 371 372 fields = self.parse_fields(fields) 373 374 if fields and self.formatter.MANDATORY_FIELDS is not None: 375 missing_fields = set(self.formatter.MANDATORY_FIELDS) - set(fields) 376 # If a mandatory field is not selected, do not use the customized formatter 377 if missing_fields: 378 print('Warning: you do not select enough mandatory fields for the formatter. Fallback to another. Hint: use option -f', file=self.stderr) 379 self.formatter = self.formatters_loader.build_formatter(ReplApplication.DEFAULT_FORMATTER) 380 381 if self.formatter.DISPLAYED_FIELDS is not None: 382 if fields is None: 383 missing_fields = True 384 else: 385 missing_fields = set(fields) - set(self.formatter.DISPLAYED_FIELDS + self.formatter.MANDATORY_FIELDS) 386 # If a selected field is not displayed, do not use the customized formatter 387 if missing_fields: 388 print('Warning: some selected fields will not be displayed by the formatter. Fallback to another. Hint: use option -f', file=self.stderr) 389 self.formatter = self.formatters_loader.build_formatter(ReplApplication.DEFAULT_FORMATTER) 390 391 return self.weboob.do(self._do_complete, self.options.count, fields, function, *args, **kwargs) 392 393 def _do_and_retry(self, *args, **kwargs): 394 """ 395 This method is a wrapper around Weboob.do(), and handle interactive 396 errors which allow to retry. 397 398 List of handled errors: 399 - BrowserQuestion 400 - BrowserRedirect 401 - DecoupledValidation 402 """ 403 404 if self.stdout.isatty(): 405 # Set a non-None value to all backends's request_information 406 # 407 # - None indicates non-interactive: do not trigger 2FA challenges, 408 # raise NeedInteractive* exceptions before doing so 409 # - non-None indicates interactive: ok to trigger 2FA challenges, 410 # raise BrowserQuestion/AppValidation when facing one 411 # It should be a dict because when non-empty, it will contain HTTP 412 # headers for legal PSD2 AIS/PIS authentication. 413 for backend in self.enabled_backends: 414 key = 'request_information' 415 if key in backend.config and backend.config[key].get() is None: 416 backend.config[key].set({}) 417 418 try: 419 for obj in self.weboob.do(*args, **kwargs): 420 yield obj 421 except CallErrors as errors: 422 # Errors which are not handled here and which will be re-raised. 423 remaining_errors = [] 424 # Backends on which we will retry. 425 backends = set() 426 427 for backend, error, backtrace in errors.errors: 428 if isinstance(error, BrowserQuestion): 429 for field in error.fields: 430 v = self.ask(field) 431 backend.config[field.id].set(v) 432 elif isinstance(error, BrowserRedirect): 433 print(u'Open this URL in a browser:') 434 print(error.url) 435 print() 436 value = self.ask('Please enter the final URL') 437 backend.config['auth_uri'].set(value) 438 elif isinstance(error, DecoupledValidation): 439 print(error.message) 440 # FIXME we should reset this value, in case another DecoupledValidation occurs 441 key = 'resume' 442 if key in backend.config: 443 backend.config[key].set(True) 444 else: 445 # Not handled error. 446 remaining_errors.append((backend, error, backtrace)) 447 continue 448 449 backends.add(backend) 450 451 if backends: 452 # There is at least one backend on which we can retry, do it 453 # only on this ones. 454 kwargs['backends'] = backends 455 try: 456 for obj in self._do_and_retry(*args, **kwargs): 457 yield obj 458 except CallErrors as sub_errors: 459 # As we called _do_and_retry, these sub errors are not 460 # interactive ones, so we can add them to the remaining 461 # errors. 462 remaining_errors += sub_errors.errors 463 464 errors.errors = remaining_errors 465 if errors.errors: 466 # If there are remaining errors, raise them. 467 raise errors 468 469 # -- command tools ------------ 470 def parse_command_args(self, line, nb, req_n=None): 471 try: 472 if sys.version_info.major >= 3: 473 args = shlex.split(line) 474 else: 475 args = [arg.decode('utf-8') for arg in shlex.split(line.encode('utf-8'))] 476 except ValueError as e: 477 raise ArgSyntaxError(str(e)) 478 479 if nb < len(args): 480 raise TooManyArguments('Command takes at most %d arguments' % nb) 481 if req_n is not None and (len(args) < req_n): 482 raise NotEnoughArguments('Command needs %d arguments' % req_n) 483 484 if len(args) < nb: 485 args += tuple(None for i in range(nb - len(args))) 486 return args 487 488 # -- cmd.Cmd methods --------- 489 def postcmd(self, stop, line): 490 """ 491 This REPL method is overridden to return None instead of integers 492 to prevent stopping cmdloop(). 493 """ 494 if not isinstance(stop, bool): 495 stop = None 496 return stop 497 498 def parseline(self, line): 499 """ 500 This REPL method is overridden to search "short" alias of commands 501 """ 502 cmd, arg, ignored = Cmd.parseline(self, line) 503 504 if cmd is not None: 505 names = set(name for name in self.get_names() if name.startswith('do_')) 506 507 if 'do_' + cmd not in names: 508 long = set(name for name in names if name.startswith('do_' + cmd)) 509 # if more than one result, ambiguous command, do nothing (error will display suggestions) 510 if len(long) == 1: 511 cmd = long.pop()[3:] 512 513 return cmd, arg, ignored 514 515 def onecmd(self, line): 516 """ 517 This REPL method is overridden to catch some particular exceptions. 518 """ 519 line = to_unicode(line) 520 cmd, arg, ignored = self.parseline(line) 521 522 # Set the right formatter for the command. 523 try: 524 formatter_name = self.commands_formatters[cmd] 525 except KeyError: 526 formatter_name = self.DEFAULT_FORMATTER 527 self.set_formatter(formatter_name) 528 529 try: 530 try: 531 return super(ReplApplication, self).onecmd(line) 532 except CallErrors as e: 533 return self.bcall_errors_handler(e) 534 except BackendNotGiven as e: 535 print('Error: %s' % str(e), file=self.stderr) 536 return os.EX_DATAERR 537 except NotEnoughArguments as e: 538 print('Error: not enough arguments. %s' % str(e), file=self.stderr) 539 return os.EX_USAGE 540 except TooManyArguments as e: 541 print('Error: too many arguments. %s' % str(e), file=self.stderr) 542 return os.EX_USAGE 543 except ArgSyntaxError as e: 544 print('Error: invalid arguments. %s' % str(e), file=self.stderr) 545 return os.EX_USAGE 546 except (KeyboardInterrupt, EOFError): 547 # ^C during a command process doesn't exit application. 548 print('\nAborted.') 549 return signal.SIGINT + 128 550 finally: 551 self.flush() 552 553 def emptyline(self): 554 """ 555 By default, an emptyline repeats the previous command. 556 Overriding this function disables this behaviour. 557 """ 558 pass 559 560 def default(self, line): 561 print('Unknown command: "%s"' % line, file=self.stderr) 562 cmd, arg, ignore = Cmd.parseline(self, line) 563 if cmd is not None: 564 names = set(name[3:] for name in self.get_names() if name.startswith('do_' + cmd)) 565 if len(names) > 0: 566 print('Do you mean: %s?' % ', '.join(names), file=self.stderr) 567 return os.EX_USAGE 568 569 def completenames(self, text, *ignored): 570 return [name for name in Cmd.completenames(self, text, *ignored) if name not in self.hidden_commands] 571 572 def _shell_completion_items(self): 573 items = super(ReplApplication, self)._shell_completion_items() 574 items.update( 575 set(self.completenames('')) - 576 set(('debug', 'condition', 'count', 'formatter', 'logging', 'select', 'quit'))) 577 return items 578 579 def path_completer(self, arg): 580 dirname = os.path.dirname(arg) 581 try: 582 children = os.listdir(dirname or '.') 583 except OSError: 584 return () 585 l = [] 586 for child in children: 587 path = os.path.join(dirname, child) 588 if os.path.isdir(path): 589 child += '/' 590 l.append(child) 591 return l 592 593 def complete(self, text, state): 594 """ 595 Override of the Cmd.complete() method to: 596 597 * add a space at end of proposals 598 * display only proposals for words which match the 599 text already written by user. 600 """ 601 super(ReplApplication, self).complete(text, state) 602 603 # When state = 0, Cmd.complete() set the 'completion_matches' attribute by 604 # calling the completion function. Then, for other states, it only tries to 605 # get the right item in list. 606 # So that's the good place to rework the choices. 607 if state == 0: 608 self.completion_matches = [choice for choice in self.completion_matches if choice.startswith(text)] 609 610 try: 611 match = self.completion_matches[state] 612 except IndexError: 613 return None 614 else: 615 if match[-1] != '/': 616 return '%s ' % match 617 return match 618 619 # -- errors management ------------- 620 def bcall_error_handler(self, backend, error, backtrace): 621 """ 622 Handler for an exception inside the CallErrors exception. 623 624 This method can be overridden to support more exceptions types. 625 """ 626 return super(ReplApplication, self).bcall_error_handler(backend, error, backtrace) 627 628 def bcall_errors_handler(self, errors, ignore=()): 629 if self.interactive: 630 return super(ReplApplication, self).bcall_errors_handler(errors, 'Use "logging debug" option to print backtraces.', ignore) 631 else: 632 return super(ReplApplication, self).bcall_errors_handler(errors, ignore=ignore) 633 634 # -- options related methods ------------- 635 def _handle_options(self): 636 if self.options.formatter: 637 self.commands_formatters = {} 638 self.DEFAULT_FORMATTER = self.options.formatter 639 self.set_formatter(self.DEFAULT_FORMATTER) 640 641 if self.options.select: 642 self.selected_fields = self.options.select.split(',') 643 else: 644 self.selected_fields = ['$direct'] 645 646 647 if self.options.count is not None: 648 self._is_default_count = False 649 if self.options.count <= 0: 650 # infinite search 651 self.options.count = None 652 653 if self.options.condition: 654 self.condition = ResultsCondition(self.options.condition) 655 else: 656 self.condition = None 657 658 return super(ReplApplication, self)._handle_options() 659 660 def get_command_help(self, command, short=False): 661 try: 662 func = getattr(self, 'do_' + command) 663 except AttributeError: 664 return None 665 666 doc = func.__doc__ 667 assert doc is not None, "A command must have a docstring" 668 669 lines = [line.strip() for line in doc.strip().split('\n')] 670 if not lines[0].startswith(command): 671 lines = [command, ''] + lines 672 673 if short: 674 return lines[0] 675 676 return '\n'.join(lines) 677 678 def get_commands_doc(self): 679 names = set(name for name in self.get_names() if name.startswith('do_')) 680 appname = self.APPNAME.capitalize() 681 d = OrderedDict(((appname, []), ('Weboob', []))) 682 683 for name in sorted(names): 684 cmd = name[3:] 685 if cmd in self.hidden_commands.union(self.weboob_commands).union(['help']): 686 continue 687 688 d[appname].append(self.get_command_help(cmd)) 689 if not self.DISABLE_REPL: 690 for cmd in self.weboob_commands: 691 d['Weboob'].append(self.get_command_help(cmd)) 692 693 return d 694 695 # -- default REPL commands --------- 696 def do_quit(self, arg): 697 """ 698 Quit the application. 699 """ 700 return True 701 702 def do_EOF(self, arg): 703 """ 704 Quit the command line interpreter when ^D is pressed. 705 """ 706 # print empty line for the next shell prompt to appear on the first column of the terminal 707 print() 708 return self.do_quit(arg) 709 710 def do_help(self, arg=None): 711 """ 712 help [COMMAND] 713 714 List commands, or get information about a command. 715 """ 716 if arg: 717 cmd_names = set(name[3:] for name in self.get_names() if name.startswith('do_')) 718 if arg in cmd_names: 719 command_help = self.get_command_help(arg) 720 if command_help is None: 721 logging.warning(u'Command "%s" is undocumented' % arg) 722 else: 723 lines = command_help.split('\n') 724 lines[0] = '%s%s%s' % (self.BOLD, lines[0], self.NC) 725 self.stdout.write('%s\n' % '\n'.join(lines)) 726 else: 727 print('Unknown command: "%s"' % arg, file=self.stderr) 728 else: 729 cmds = self._parser.formatter.format_commands(self._parser.commands) 730 self.stdout.write('%s\n' % cmds) 731 self.stdout.write('Type "help <command>" for more info about a command.\n') 732 return 2 733 734 def complete_backends(self, text, line, begidx, endidx): 735 choices = [] 736 commands = ['enable', 'disable', 'only', 'list', 'add', 'register', 'edit', 'remove', 'list-modules'] 737 available_backends_names = set(backend.name for backend in self.weboob.iter_backends()) 738 enabled_backends_names = set(backend.name for backend in self.enabled_backends) 739 740 args = line.split(' ') 741 if len(args) == 2: 742 choices = commands 743 elif len(args) >= 3: 744 if args[1] == 'enable': 745 choices = sorted(available_backends_names - enabled_backends_names) 746 elif args[1] == 'only': 747 choices = sorted(available_backends_names) 748 elif args[1] == 'disable': 749 choices = sorted(enabled_backends_names) 750 elif args[1] in ('add', 'register') and len(args) == 3: 751 for name, module in sorted(self.weboob.repositories.get_all_modules_info(self.CAPS).items()): 752 choices.append(name) 753 elif args[1] == 'edit': 754 choices = sorted(available_backends_names) 755 elif args[1] == 'remove': 756 choices = sorted(available_backends_names) 757 758 return choices 759 760 def do_backends(self, line): 761 """ 762 backends [ACTION] [BACKEND_NAME]... 763 764 Select used backends. 765 766 ACTION is one of the following (default: list): 767 * enable enable given backends 768 * disable disable given backends 769 * only enable given backends and disable the others 770 * list list backends 771 * add add a backend 772 * register register a new account on a website 773 * edit edit a backend 774 * remove remove a backend 775 * list-modules list modules 776 """ 777 line = line.strip() 778 if line: 779 args = line.split() 780 else: 781 args = ['list'] 782 783 action = args[0] 784 given_backend_names = args[1:] 785 786 for backend_name in given_backend_names: 787 if action in ('add', 'register'): 788 minfo = self.weboob.repositories.get_module_info(backend_name) 789 if minfo is None: 790 print('Module "%s" does not exist.' % backend_name, file=self.stderr) 791 return 1 792 else: 793 if not minfo.has_caps(self.CAPS): 794 print('Module "%s" is not supported by this application => skipping.' % backend_name, file=self.stderr) 795 return 1 796 else: 797 if backend_name not in [backend.name for backend in self.weboob.iter_backends()]: 798 print('Backend "%s" does not exist => skipping.' % backend_name, file=self.stderr) 799 return 1 800 801 if action in ('enable', 'disable', 'only', 'add', 'register', 'edit', 'remove'): 802 if not given_backend_names: 803 print('Please give at least a backend name.', file=self.stderr) 804 return 2 805 806 given_backends = set(backend for backend in self.weboob.iter_backends() if backend.name in given_backend_names) 807 808 if action == 'enable': 809 for backend in given_backends: 810 self.enabled_backends.add(backend) 811 elif action == 'disable': 812 for backend in given_backends: 813 try: 814 self.enabled_backends.remove(backend) 815 except KeyError: 816 print('%s is not enabled' % backend.name, file=self.stderr) 817 elif action == 'only': 818 self.enabled_backends = set() 819 for backend in given_backends: 820 self.enabled_backends.add(backend) 821 elif action == 'list': 822 enabled_backends_names = set(backend.name for backend in self.enabled_backends) 823 disabled_backends_names = set(backend.name for backend in self.weboob.iter_backends()) - enabled_backends_names 824 print('Enabled: %s' % ', '.join(enabled_backends_names)) 825 if len(disabled_backends_names) > 0: 826 print('Disabled: %s' % ', '.join(disabled_backends_names)) 827 elif action == 'add': 828 for name in given_backend_names: 829 instname = self.add_backend(name, name) 830 if instname: 831 self.load_backends(names=[instname]) 832 elif action == 'register': 833 for name in given_backend_names: 834 instname = self.register_backend(name) 835 if isinstance(instname, basestring): 836 self.load_backends(names=[instname]) 837 elif action == 'edit': 838 for backend in given_backends: 839 enabled = backend in self.enabled_backends 840 self.unload_backends(names=[backend.name]) 841 self.edit_backend(backend.name) 842 for newb in self.load_backends(names=[backend.name]).values(): 843 if not enabled: 844 self.enabled_backends.remove(newb) 845 elif action == 'remove': 846 for backend in given_backends: 847 self.weboob.backends_config.remove_backend(backend.name) 848 self.unload_backends(backend.name) 849 elif action == 'list-modules': 850 modules = [] 851 print('Modules list:') 852 for name, info in sorted(self.weboob.repositories.get_all_modules_info().items()): 853 if not self.is_module_loadable(info): 854 continue 855 modules.append(name) 856 loaded = ' ' 857 for bi in self.weboob.iter_backends(): 858 if bi.NAME == name: 859 if loaded == ' ': 860 loaded = 'X' 861 elif loaded == 'X': 862 loaded = 2 863 else: 864 loaded += 1 865 print('[%s] %s%-15s%s %s' % (loaded, self.BOLD, name, self.NC, info.description)) 866 867 else: 868 print('Unknown action: "%s"' % action, file=self.stderr) 869 return 1 870 871 if len(self.enabled_backends) == 0: 872 print('Warning: no more backends are loaded. %s is probably unusable.' % self.APPNAME.capitalize(), file=self.stderr) 873 874 def complete_logging(self, text, line, begidx, endidx): 875 levels = ('debug', 'info', 'warning', 'error', 'quiet', 'default') 876 args = line.split(' ') 877 if len(args) == 2: 878 return levels 879 return () 880 881 def do_logging(self, line): 882 """ 883 logging [LEVEL] 884 885 Set logging level. 886 887 Availables: debug, info, warning, error. 888 * quiet is an alias for error 889 * default is an alias for warning 890 """ 891 args = self.parse_command_args(line, 1, 0) 892 levels = (('debug', logging.DEBUG), 893 ('info', logging.INFO), 894 ('warning', logging.WARNING), 895 ('error', logging.ERROR), 896 ('quiet', logging.ERROR), 897 ('default', logging.WARNING) 898 ) 899 900 if not args[0]: 901 current = None 902 for label, level in levels: 903 if logging.root.level == level: 904 current = label 905 break 906 print('Current level: %s' % current) 907 return 908 909 levels = dict(levels) 910 try: 911 level = levels[args[0]] 912 except KeyError: 913 print('Level "%s" does not exist.' % args[0], file=self.stderr) 914 print('Availables: %s' % ' '.join(levels), file=self.stderr) 915 return 2 916 else: 917 logging.root.setLevel(level) 918 for handler in logging.root.handlers: 919 handler.setLevel(level) 920 921 def do_condition(self, line): 922 """ 923 condition [EXPRESSION | off] 924 925 If an argument is given, set the condition expression used to filter the results. See CONDITION section for more details and the expression. 926 If the "off" value is given, conditional filtering is disabled. 927 928 If no argument is given, print the current condition expression. 929 """ 930 line = line.strip() 931 if line: 932 if line == 'off': 933 self.condition = None 934 else: 935 try: 936 self.condition = ResultsCondition(line) 937 except ResultsConditionError as e: 938 print('%s' % e, file=self.stderr) 939 return 2 940 else: 941 if self.condition is None: 942 print('No condition is set.') 943 else: 944 print(str(self.condition)) 945 946 def do_count(self, line): 947 """ 948 count [NUMBER | off] 949 950 If an argument is given, set the maximum number of results fetched. 951 NUMBER must be at least 1. 952 "off" value disables counting, and allows infinite searches. 953 954 If no argument is given, print the current count value. 955 """ 956 line = line.strip() 957 if line: 958 if line == 'off': 959 self.options.count = None 960 self._is_default_count = False 961 else: 962 try: 963 count = int(line) 964 except ValueError: 965 print('Could not interpret "%s" as a number.' % line, file=self.stderr) 966 return 2 967 else: 968 if count > 0: 969 self.options.count = count 970 self._is_default_count = False 971 else: 972 self.options.count = None 973 self._is_default_count = False 974 else: 975 if self.options.count is None: 976 print('Counting disabled.') 977 else: 978 print(self.options.count) 979 980 def complete_formatter(self, text, line, *ignored): 981 formatters = self.formatters_loader.get_available_formatters() 982 commands = ['list', 'option'] + formatters 983 options = ['header', 'keys'] 984 option_values = ['on', 'off'] 985 986 args = line.split(' ') 987 if len(args) == 2: 988 return commands 989 if args[1] == 'option': 990 if len(args) == 3: 991 return options 992 if len(args) == 4: 993 return option_values 994 elif args[1] in formatters: 995 return list(set(name[3:] for name in self.get_names() if name.startswith('do_'))) 996 997 def do_formatter(self, line): 998 """ 999 formatter [list | FORMATTER [COMMAND] | option OPTION_NAME [on | off]] 1000 1001 If a FORMATTER is given, set the formatter to use. 1002 You can add a COMMAND to apply the formatter change only to 1003 a given command. 1004 1005 If the argument is "list", print the available formatters. 1006 1007 If the argument is "option", set the formatter options. 1008 Valid options are: header, keys. 1009 If on/off value is given, set the value of the option. 1010 If not, print the current value for the option. 1011 1012 If no argument is given, print the current formatter. 1013 """ 1014 args = line.strip().split() 1015 if args: 1016 if args[0] == 'list': 1017 print(', '.join(self.formatters_loader.get_available_formatters())) 1018 elif args[0] == 'option': 1019 if len(args) > 1: 1020 if len(args) == 2: 1021 if args[1] == 'header': 1022 print('off' if self.options.no_header else 'on') 1023 elif args[1] == 'keys': 1024 print('off' if self.options.no_keys else 'on') 1025 else: 1026 if args[2] not in ('on', 'off'): 1027 print('Invalid value "%s". Please use "on" or "off" values.' % args[2], file=self.stderr) 1028 return 2 1029 else: 1030 if args[1] == 'header': 1031 self.options.no_header = True if args[2] == 'off' else False 1032 elif args[1] == 'keys': 1033 self.options.no_keys = True if args[2] == 'off' else False 1034 else: 1035 print('Don\'t know which option to set. Available options: header, keys.', file=self.stderr) 1036 return 2 1037 else: 1038 if args[0] in self.formatters_loader.get_available_formatters(): 1039 if len(args) > 1: 1040 self.commands_formatters[args[1]] = self.set_formatter(args[0]) 1041 else: 1042 self.commands_formatters = {} 1043 self.DEFAULT_FORMATTER = self.set_formatter(args[0]) 1044 else: 1045 print('Formatter "%s" is not available.\n' 1046 'Available formatters: %s.' % (args[0], ', '.join(self.formatters_loader.get_available_formatters())), file=self.stderr) 1047 return 1 1048 else: 1049 print('Default formatter: %s' % self.DEFAULT_FORMATTER) 1050 for key, klass in self.commands_formatters.items(): 1051 print('Command "%s": %s' % (key, klass)) 1052 1053 def do_select(self, line): 1054 """ 1055 select [FIELD_NAME]... | "$direct" | "$full" 1056 1057 If an argument is given, set the selected fields. 1058 $direct selects all fields loaded in one http request. 1059 $full selects all fields using as much http requests as necessary. 1060 1061 If no argument is given, print the currently selected fields. 1062 """ 1063 line = line.strip() 1064 if line: 1065 split = line.split() 1066 self.selected_fields = split 1067 else: 1068 print(' '.join(self.selected_fields)) 1069 1070 # First sort in alphabetical of backend 1071 # Second, sort with ID 1072 def comp_key(self, obj): 1073 return (obj.backend, obj.id) 1074 1075 @defaultcount(40) 1076 def do_ls(self, line): 1077 """ 1078 ls [-d] [-U] [PATH] 1079 1080 List objects in current path. 1081 If an argument is given, list the specified path. 1082 Use -U option to not sort results. It allows you to use a "fast path" to 1083 return results as soon as possible. 1084 Use -d option to display information about a collection (and to not 1085 display the content of it). It has the same behavior than the well 1086 known UNIX "ls" command. 1087 """ 1088 # TODO: real parsing of options 1089 path = line.strip() 1090 only = False 1091 sort = True 1092 1093 if '-U' in line.strip().partition(' '): 1094 path = line.strip().partition(' ')[-1] 1095 sort = False 1096 1097 if '-d' in line.strip().partition(' '): 1098 path = None 1099 only = line.strip().partition(' ')[-1] 1100 1101 if path: 1102 for _path in path.split('/'): 1103 # We have an argument, let's ch to the directory before the ls 1104 self.working_path.cd1(_path) 1105 1106 objects = [] 1107 collections = [] 1108 self.objects = [] 1109 1110 self.start_format() 1111 1112 for res in self._fetch_objects(objs=self.COLLECTION_OBJECTS): 1113 if isinstance(res, Collection): 1114 collections.append(res) 1115 if sort is False: 1116 self.formatter.format_collection(res, only) 1117 else: 1118 if sort: 1119 objects.append(res) 1120 else: 1121 self._format_obj(res, only) 1122 1123 if sort: 1124 objects.sort(key=self.comp_key) 1125 collections = self._merge_collections_with_same_path(collections) 1126 collections.sort(key=self.comp_key) 1127 for collection in collections: 1128 self.formatter.format_collection(collection, only) 1129 for obj in objects: 1130 self._format_obj(obj, only) 1131 1132 if path: 1133 for _path in path.split('/'): 1134 # Let's go back to the parent directory 1135 self.working_path.up() 1136 else: 1137 # Save collections only if we listed the current path. 1138 self.collections = collections 1139 1140 def _find_collection(self, collection, collections): 1141 for col in collections: 1142 if col.split_path == collection.split_path: 1143 return col 1144 return None 1145 1146 def _merge_collections_with_same_path(self, collections): 1147 to_return = [] 1148 for collection in collections: 1149 col = self._find_collection(collection, to_return) 1150 if col: 1151 col.backend += " %s" % collection.backend 1152 else: 1153 to_return.append(collection) 1154 return to_return 1155 1156 def _format_obj(self, obj, only): 1157 if only is False or not hasattr(obj, 'id') or obj.id in only: 1158 self.cached_format(obj) 1159 1160 1161 def do_cd(self, line): 1162 """ 1163 cd [PATH] 1164 1165 Follow a path. 1166 ".." is a special case and goes up one directory. 1167 "" is a special case and goes home. 1168 """ 1169 if not len(line.strip()): 1170 self.working_path.home() 1171 elif line.strip() == '..': 1172 self.working_path.up() 1173 else: 1174 self.working_path.cd1(line) 1175 1176 collections = [] 1177 try: 1178 for res in self.do('get_collection', objs=self.COLLECTION_OBJECTS, 1179 split_path=self.working_path.get(), 1180 caps=CapCollection): 1181 if res: 1182 collections.append(res) 1183 except CallErrors as errors: 1184 self.bcall_errors_handler(errors, CollectionNotFound) 1185 1186 if len(collections): 1187 # update the path from the collection if possible 1188 if len(collections) == 1: 1189 self.working_path.split_path = collections[0].split_path 1190 else: 1191 print(u"Path: %s not found" % unicode(self.working_path), file=self.stderr) 1192 self.working_path.restore() 1193 return 1 1194 1195 self._change_prompt() 1196 1197 def _fetch_objects(self, objs): 1198 split_path = self.working_path.get() 1199 1200 try: 1201 for res in self._do_and_retry( 1202 'iter_resources', 1203 objs=objs, 1204 split_path=split_path, 1205 caps=CapCollection 1206 ): 1207 yield res 1208 except CallErrors as errors: 1209 self.bcall_errors_handler(errors, CollectionNotFound) 1210 1211 1212 def all_collections(self): 1213 """ 1214 Get all objects that are collections: regular objects and fake dumb objects. 1215 """ 1216 obj_collections = [obj for obj in self.objects if isinstance(obj, BaseCollection)] 1217 return obj_collections + self.collections 1218 1219 def obj_to_filename(self, obj, dest=None, default=None): 1220 """ 1221 This method can be used to get a filename from an object, using a mask 1222 filled by information of this object. 1223 1224 All patterns are braces-enclosed, and are name of available fields in 1225 the object. 1226 1227 :param obj: object 1228 :type obj: BaseObject 1229 :param dest: dest given by user (default None) 1230 :type dest: str 1231 :param default: default file mask (if not given, this is '{id}-{title}.{ext}') 1232 :type default: str 1233 :rtype: str 1234 """ 1235 if default is None: 1236 default = '{id}-{title}.{ext}' 1237 if dest is None: 1238 dest = '.' 1239 if os.path.isdir(dest): 1240 dest = os.path.join(dest, default) 1241 1242 def repl(m): 1243 field = m.group(1) 1244 if hasattr(obj, field): 1245 value = getattr(obj, field) 1246 if empty(value): 1247 value = 'unknown' 1248 return re.sub('[?:/]', '-', '%s' % value) 1249 else: 1250 return m.group(0) 1251 return re.sub(r'\{(.+?)\}', repl, dest) 1252 1253 # for cd & ls 1254 def complete_path(self, text, line, begidx, endidx): 1255 directories = set() 1256 if len(self.working_path.get()): 1257 directories.add('..') 1258 mline = line.partition(' ')[2] 1259 offs = len(mline) - len(text) 1260 1261 # refresh only if needed 1262 if len(self.objects) == 0 and len(self.collections) == 0: 1263 try: 1264 self.objects, self.collections = self._fetch_objects(objs=self.COLLECTION_OBJECTS) 1265 except CallErrors as errors: 1266 self.bcall_errors_handler(errors, CollectionNotFound) 1267 1268 collections = self.all_collections() 1269 for collection in collections: 1270 directories.add(collection.basename) 1271 1272 return [s[offs:] for s in directories if s.startswith(mline)] 1273 1274 def complete_ls(self, text, line, begidx, endidx): 1275 return self.complete_path(text, line, begidx, endidx) 1276 1277 def complete_cd(self, text, line, begidx, endidx): 1278 return self.complete_path(text, line, begidx, endidx) 1279 1280 # -- formatting related methods ------------- 1281 def set_formatter(self, name): 1282 """ 1283 Set the current formatter from name. 1284 1285 It returns the name of the formatter which has been really set. 1286 """ 1287 try: 1288 self.formatter = self.formatters_loader.build_formatter(name) 1289 except FormatterLoadError as e: 1290 print('%s' % e, file=self.stderr) 1291 if self.DEFAULT_FORMATTER == name: 1292 self.DEFAULT_FORMATTER = ReplApplication.DEFAULT_FORMATTER 1293 print('Falling back to "%s".' % (self.DEFAULT_FORMATTER), file=self.stderr) 1294 self.formatter = self.formatters_loader.build_formatter(self.DEFAULT_FORMATTER) 1295 name = self.DEFAULT_FORMATTER 1296 if self.options.no_header: 1297 self.formatter.display_header = False 1298 if self.options.no_keys: 1299 self.formatter.display_keys = False 1300 if self.options.outfile: 1301 self.formatter.outfile = self.options.outfile 1302 if self.interactive: 1303 self.formatter.interactive = True 1304 return name 1305 1306 def set_formatter_header(self, string): 1307 pass 1308 1309 def start_format(self, **kwargs): 1310 self.formatter.start_format(**kwargs) 1311 1312 def cached_format(self, obj): 1313 self.add_object(obj) 1314 alias = None 1315 if self.interactive: 1316 alias = '%s' % len(self.objects) 1317 self.format(obj, alias=alias) 1318 1319 def parse_fields(self, fields): 1320 if '$direct' in fields: 1321 return [] 1322 if '$full' in fields: 1323 return None 1324 return fields 1325 1326 def format(self, result, alias=None): 1327 fields = self.parse_fields(self.selected_fields) 1328 try: 1329 self.formatter.format(obj=result, selected_fields=fields, alias=alias) 1330 except FieldNotFound as e: 1331 print(e, file=self.stderr) 1332 except MandatoryFieldsNotFound as e: 1333 print('%s Hint: select missing fields or use another formatter (ex: multiline).' % e, file=self.stderr) 1334 1335 def flush(self): 1336 self.formatter.flush() 1337 1338 def do_debug(self, line): 1339 """ 1340 debug 1341 1342 Launch a debug Python shell 1343 """ 1344 1345 from weboob.applications.weboobdebug import weboobdebug 1346 1347 app = weboobdebug.WeboobDebug() 1348 locs = dict(application=self, weboob=self.weboob) 1349 banner = ('Weboob debug shell\n\nAvailable variables:\n' 1350 + '\n'.join([' %s: %s' % (k, v) for k, v in locs.items()])) 1351 1352 funcs = [app.ipython, app.bpython, app.python] 1353 app.launch(funcs, locs, banner) 1354