1# Copyright (C) 2015 Kristoffer Gronlund <kgronlund@suse.com> 2# See COPYING for license information. 3 4import os 5import re 6import subprocess 7import getpass 8import time 9import shutil 10import socket 11import random 12from copy import deepcopy 13from glob import glob 14from lxml import etree 15 16try: 17 import json 18except ImportError: 19 import simplejson as json 20 21 22try: 23 import parallax 24 has_parallax = True 25except ImportError: 26 has_parallax = False 27 28 29from . import config 30from . import handles 31from . import options 32from . import userdir 33from . import utils 34from .msg import err_buf, common_debug 35 36 37_script_cache = None 38_script_version = 2.2 39_strict_handles = False 40 41_action_shortdescs = { 42 'cib': 'Configure cluster resources', 43 'install': 'Install packages', 44 'service': 'Manage system services', 45 'call': 'Run command on nodes', 46 'copy': 'Install file on nodes', 47 'crm': 'Run crm command', 48 'collect': 'Collect data from nodes', 49 'verify': 'Verify collected data', 50 'apply': 'Apply changes to nodes', 51 'apply_local': 'Apply changes to cluster' 52} 53 54 55class Text(object): 56 """ 57 Idea: Replace all fields that may contain 58 references to data with Text objects, that 59 lazily resolve when asked to. 60 Context needed is the script in which this 61 Text resolves. What we do is that we install 62 the parameter values in the script, so we can 63 get it from here. 64 65 This can also then be responsible for the 66 various kinds of output cleanup/formatting 67 (desc, cib, etc) 68 """ 69 DESC = 1 70 CIB = 2 71 SHORTDESC = 3 72 73 @staticmethod 74 def shortdesc(script, text): 75 return Text(script, text, kind=Text.SHORTDESC) 76 77 @staticmethod 78 def desc(script, text): 79 return Text(script, text, kind=Text.DESC) 80 81 @staticmethod 82 def cib(script, text): 83 return Text(script, text, kind=Text.CIB) 84 85 @staticmethod 86 def isa(obj): 87 return isinstance(obj, str) or isinstance(obj, Text) 88 89 def __init__(self, script, text, kind=None): 90 self.script = script 91 if isinstance(text, Text): 92 self.text = text.text 93 else: 94 self.text = text 95 self._kind = kind 96 97 def _parse(self): 98 val = self.text 99 if val in (True, False): 100 return "true" if val else "false" 101 if not isinstance(val, str): 102 return str(val) 103 return handles.parse(val, self.script.get('__values__', {})).strip() 104 105 def __repr__(self): 106 return repr(self.text) 107 108 def __str__(self): 109 if self._kind == self.DESC: 110 return format_desc(self._parse()) 111 elif self._kind == self.SHORTDESC: 112 return self._parse() 113 elif self._kind == self.CIB: 114 return format_cib(self._parse()) 115 return self._parse() 116 117 def __eq__(self, obj): 118 return str(self) == str(obj) 119 120 121class WhenExpr(object): 122 def __init__(self, script, prog): 123 self.script = script 124 self.prog = prog 125 126 def __repr__(self): 127 return repr(self.prog) 128 129 def __str__(self): 130 lenv = self.script.get('__values__', {}) 131 inp = handles.parse(self.prog, lenv).strip() 132 try: 133 from .minieval import minieval, InvalidExpression 134 return str(minieval(inp, lenv)).lower() 135 except InvalidExpression as err: 136 raise ValueError(str(err)) 137 138 139def _strip(desc): 140 if desc is None: 141 return None 142 return desc.strip() 143 144 145def format_desc(desc): 146 import textwrap 147 return '\n\n'.join([textwrap.fill(para) for para in desc.split('\n\n') if para.strip()]) 148 149 150def format_cib(text): 151 text = re.sub(r'[ ]+', ' ', text) 152 text = re.sub(r'\n[ \t\f\v]+', '\n\t', text) 153 i = 0 154 while True: 155 i = text.find('\n\t\n') 156 if i < 0: 157 break 158 text = text[:i] + text[i+2:] 159 return text 160 161 162def space_cib(text): 163 """ 164 After merging CIB commands, space separate lines out 165 """ 166 return re.sub(r'\n([^\t])', r'\n\n\1', re.sub(r'[\n\r]+', r'\n', text)) 167 168 169class Actions(object): 170 """ 171 Each method in this class handles a particular action. 172 """ 173 @staticmethod 174 def parse(script, action): 175 """ 176 action: action data (dict) 177 params: flat list of parameter values 178 values: processed list of parameter values (for handles.parse) 179 180 Converts {'cib': "primitive..."} into {"name": "cib", "value": "primitive..."} 181 Each action has two values: "value" may be a non-textual object 182 depending on the type of action. "text" is visual context to display 183 to a user (so a cleaned up CIB, or the list of packages to install) 184 """ 185 name = action['name'] 186 action['value'] = action[name] 187 del action[name] 188 action['text'] = '' 189 value = action['value'] 190 if name == 'install': 191 if Text.isa(value): 192 action['value'] = str(value).split() 193 action['text'] = ' '.join(action['value']) 194 # service takes a list of objects with a single key; 195 # mapping service: state 196 # the text field will be converted to lines where 197 # each line is <service> -> <state> 198 elif name == 'service': 199 if Text.isa(value): 200 value = [dict([v.split(':', 1)]) for v in str(value).split()] 201 action['value'] = value 202 203 def arrow(v): 204 return ' -> '.join(list(v.items())[0]) 205 action['text'] = '\n'.join([arrow(x) for x in value]) 206 elif name == 'cib' or name == 'crm': 207 action['text'] = str(Text.cib(script, value)) 208 action['value'] = _remove_empty_lines(action['text']) 209 elif name == 'call': 210 action['value'] = Text(script, value) 211 elif name == 'copy': 212 action['value'] = Text(script, value) 213 action['template'] = _make_boolean(action.get('template', False)) 214 action['to'] = Text(script, action.get('to', action['value'])) 215 action['text'] = "%s -> %s" % (action['value'], action['to']) 216 217 if 'shortdesc' not in action: 218 action['shortdesc'] = _action_shortdescs.get(name, '') 219 else: 220 action['shortdesc'] = Text.shortdesc(script, action['shortdesc']) 221 if 'longdesc' not in action: 222 action['longdesc'] = '' 223 else: 224 action['longdesc'] = Text.desc(script, action['longdesc']) 225 226 hre = handles.headmatcher 227 ident_re = re.compile(r'([a-z_-][a-z0-9_-]*)$', re.IGNORECASE) 228 229 if 'when' in action: 230 when = action['when'] 231 if ident_re.match(when): 232 action['when'] = Text(script, '{{%s}}' % (when)) 233 elif when: 234 action['when'] = WhenExpr(script, when) 235 else: 236 del action['when'] 237 for k, v in action.items(): 238 if isinstance(v, str) and hre.search(v): 239 v = Text(script, v) 240 if Text.isa(v): 241 action[k] = str(v).strip() 242 243 @staticmethod 244 def mergeable(action): 245 return action['name'] in ('cib', 'crm', 'install', 'service') 246 247 @staticmethod 248 def merge(into, new): 249 """ 250 Merge neighbour actions. 251 Note: When this is called, all text values 252 should already be "reduced", that is, any 253 variable references already resolved. 254 """ 255 if into.get('nodes') != new.get('nodes'): 256 return False 257 if into['name'] in ('cib', 'crm'): 258 into['value'] = '\n'.join([str(into['value']), str(new['value'])]) 259 into['text'] = space_cib('\n'.join([str(into['text']), str(new['text'])])) 260 elif into['name'] == 'service': 261 into['value'].extend(new['value']) 262 into['text'] = '\n'.join([str(into['text']), str(new['text'])]) 263 elif into['name'] == 'install': 264 into['value'].extend(new['value']) 265 into['text'] = ' '.join([str(into['text']), str(new['text'])]) 266 if new['shortdesc']: 267 newd = str(new['shortdesc']) 268 if newd != str(into['shortdesc']): 269 into['shortdesc'] = _strip(newd) 270 if new['longdesc']: 271 newd = str(new['longdesc']) 272 if newd != str(into['longdesc']): 273 into['longdesc'] = newd 274 return True 275 276 @staticmethod 277 def needs_sudo(action): 278 if action['name'] == 'call': 279 return action.get('sudo') or action.get('nodes') != 'local' 280 return action['name'] in ('apply', 'apply_local', 'install', 'service') 281 282 def __init__(self, run, action): 283 self._run = run 284 self._action = action 285 self._value = action['value'] 286 if not isinstance(self._value, list): 287 self._value = str(self._value) 288 self._text = str(action['text']) 289 self._nodes = str(action.get('nodes', '')) 290 291 def collect(self): 292 "input: shell command" 293 self._run.run_command(self._nodes or 'all', self._value, True) 294 self._run.record_json() 295 296 def validate(self): 297 "input: shell command" 298 self._run.run_command(None, self._value, True) 299 self._run.validate_json() 300 301 def apply(self): 302 "input: shell command" 303 self._run.run_command(self._nodes or 'all', self._value, True) 304 self._run.record_json() 305 306 def apply_local(self): 307 "input: shell command" 308 self._run.run_command(None, self._value, True) 309 self._run.record_json() 310 311 def report(self): 312 "input: shell command" 313 self._run.run_command(None, self._value, False) 314 self._run.report_result() 315 316 def call(self): 317 """ 318 input: shell command / script 319 320 TODO: actually allow script here 321 """ 322 self._run.call(self._nodes, self._value) 323 324 def copy(self): 325 """ 326 copy: <from> 327 to: <path> 328 template: true|false 329 330 TODO: FIXME: Verify that it works... 331 TODO: FIXME: Error handling 332 """ 333 if not os.path.exists(self._value): 334 raise ValueError("File not found: %s" % (self._value)) 335 if self._action['template']: 336 fn = self._run.str2tmp(str(Text.cib(self._run.script, open(self._value).read()))) 337 self._value = fn 338 self._run.copy_file(self._nodes, self._value, str(self._action['to'])) 339 340 def _crm_do(self, act): 341 fn = self._run.str2tmp(_join_script_lines(self._value)) 342 if config.core.debug: 343 args = '-d --wait --no' 344 else: 345 args = '--wait --no' 346 if self._action.get('force'): 347 args = args + ' --force' 348 self._run.call(None, 'crm %s %s %s' % (args, act, fn)) 349 350 def crm(self): 351 """ 352 input: crm command sequence 353 """ 354 return self._crm_do('-f') 355 356 def cib(self): 357 "input: cli configuration script" 358 return self._crm_do('configure load update') 359 360 def install(self): 361 """ 362 input: list of packages 363 or: map of <os>: <packages> 364 """ 365 self._run.execute_shell(self._nodes or 'all', '''#!/usr/bin/env python3 366import crm_script 367import crm_init 368 369crm_init.install_packages(%s) 370crm_script.exit_ok(True) 371 ''' % (self._value)) 372 373 def service(self): 374 values = [] 375 for s in self._value: 376 for v in s.items(): 377 values.append(v) 378 services = "\n".join([('crm_script.service%s' % repr(v)) for v in values]) 379 self._run.execute_shell(self._nodes or 'all', '''#!/usr/bin/env python3 380import crm_script 381import crm_init 382 383%s 384crm_script.exit_ok(True) 385''' % (services)) 386 387 def include(self): 388 """ 389 Treated differently: at parse time, 390 the include actions should disappear 391 and be replaced with actions generated 392 from the include. Either from an included 393 script, or a cib generated from an agent 394 include. 395 """ 396 397 398_actions = dict([(n, getattr(Actions, n)) for n in dir(Actions) if not n.startswith('_')]) 399 400 401def _find_action(action): 402 """return name of action for action""" 403 for a in list(_actions.keys()): 404 if a in action: 405 return a 406 return None 407 408 409def _make_options(params): 410 "Setup parallax options." 411 opts = parallax.Options() 412 opts.inline = True 413 opts.timeout = int(params['timeout']) 414 opts.recursive = True 415 opts.ssh_options += [ 416 'KbdInteractiveAuthentication=no', 417 'PreferredAuthentications=gssapi-with-mic,gssapi-keyex,hostbased,publickey', 418 'PasswordAuthentication=no', 419 'StrictHostKeyChecking=no', 420 'ControlPersist=no'] 421 if options.regression_tests: 422 opts.ssh_extra += ['-vvv'] 423 return opts 424 425 426def _parse_yaml(scriptname, scriptfile): 427 data = None 428 try: 429 import yaml 430 with open(scriptfile) as f: 431 data = yaml.load(f, Loader=yaml.SafeLoader) 432 if isinstance(data, list): 433 data = data[0] 434 except ImportError as e: 435 raise ValueError("Failed to load yaml module: %s" % (e)) 436 except Exception as e: 437 raise ValueError("Failed to parse script main: %s" % (e)) 438 439 if data: 440 ver = data.get('version') 441 if ver is None or str(ver) != str(_script_version): 442 data = _upgrade_yaml(data) 443 444 if 'parameters' in data: 445 data['steps'] = [{'parameters': data['parameters']}] 446 del data['parameters'] 447 elif 'steps' not in data: 448 data['steps'] = [] 449 data['name'] = scriptname 450 data['dir'] = os.path.dirname(scriptfile) 451 return data 452 453 454def _rename(obj, key, to): 455 if key in obj: 456 obj[to] = obj[key] 457 del obj[key] 458 459 460def _upgrade_yaml(data): 461 """ 462 Upgrade a parsed yaml document from 463 an older version. 464 """ 465 if 'version' in data and data['version'] > _script_version: 466 raise ValueError("Unknown version (expected < %s, got %s)" % (_script_version, data['version'])) 467 468 data['version'] = _script_version 469 data['category'] = data.get('category', 'Legacy') 470 _rename(data, 'name', 'shortdesc') 471 _rename(data, 'description', 'longdesc') 472 473 data['actions'] = data.get('steps', []) 474 paramstep = {'parameters': data.get('parameters', [])} 475 data['steps'] = [paramstep] 476 if 'parameters' in data: 477 del data['parameters'] 478 479 for p in paramstep['parameters']: 480 _rename(p, 'description', 'shortdesc') 481 _rename(p, 'default', 'value') 482 if 'required' not in p: 483 p['required'] = 'value' not in p 484 485 for action in data['actions']: 486 _rename(action, 'name', 'shortdesc') 487 488 return data 489 490 491_hawk_template_cache = {} 492 493 494def _parse_hawk_template(workflow, name, kind, step, actions): 495 """ 496 Convert a hawk template into steps + a cib action 497 """ 498 path = os.path.join(os.path.dirname(workflow), '../templates', kind + '.xml') 499 if path in _hawk_template_cache: 500 xml = _hawk_template_cache[path] 501 elif os.path.isfile(path): 502 xml = etree.parse(path).getroot() 503 common_debug("Found matching template: %s" % (path)) 504 _hawk_template_cache[path] = xml 505 else: 506 raise ValueError("Template does not exist: %s" % (path)) 507 508 step['shortdesc'] = _strip(''.join(xml.xpath('./shortdesc/text()'))) 509 step['longdesc'] = ''.join(xml.xpath('./longdesc/text()')) 510 511 actions.append({'cib': _hawk_to_handles(name, xml.xpath('./crm_script')[0])}) 512 513 for item in xml.xpath('./parameters/parameter'): 514 obj = {} 515 obj['name'] = item.get('name') 516 obj['required'] = item.get('required', False) 517 content = next(item.iter('content')) 518 obj['type'] = content.get('type', 'string') 519 val = content.get('default', content.get('value', None)) 520 if val: 521 obj['value'] = val 522 obj['shortdesc'] = _strip(''.join(item.xpath('./shortdesc/text()'))) 523 obj['longdesc'] = ''.join(item.xpath('./longdesc/text()')) 524 step['parameters'].append(obj) 525 526 527def _mkhandle(pfx, scope, text): 528 if scope: 529 return '{{%s%s:%s}}' % (pfx, scope, text) 530 else: 531 return '{{%s%s}}' % (pfx, text) 532 533 534def _hawk_to_handles(context, tag): 535 """ 536 input: a context name to prefix variable references with (may be empty) 537 and a crm_script tag 538 output: text with {{handles}} 539 """ 540 s = "" 541 s += tag.text 542 for c in tag: 543 if c.tag == 'if': 544 cond = c.get('set') 545 if cond: 546 s += _mkhandle('#', context, cond) 547 s += _hawk_to_handles(context, c) 548 s += _mkhandle('/', context, cond) 549 elif c.tag == 'insert': 550 param = c.get('param') 551 src = c.get('from_template') or context 552 s += _mkhandle('', src, param) 553 s += c.tail 554 return s 555 556 557def _parse_hawk_workflow(scriptname, scriptfile): 558 """ 559 Reads a hawk workflow into a script. 560 561 TODO: Parse hawk workflows that invoke legacy cluster scripts? 562 """ 563 xml = etree.parse(scriptfile).getroot() 564 if xml.tag != "workflow": 565 raise ValueError("Not a hawk workflow: %s" % (scriptfile)) 566 data = { 567 'version': 2.2, 568 'name': scriptname, 569 'shortdesc': _strip(''.join(xml.xpath('./shortdesc/text()'))), 570 'longdesc': ''.join(xml.xpath('./longdesc/text()')), 571 'category': ''.join(xml.xpath('./@category')) or 'Wizard', 572 'dir': None, 573 'steps': [], 574 'actions': [], 575 } 576 577 # the parameters together form a step with an optional shortdesc 578 # then each template becomes an additional step with an optional shortdesc 579 paramstep = { 580 'shortdesc': _strip(''.join(xml.xpath('./parameters/stepdesc/text()'))), 581 'parameters': [] 582 } 583 data['steps'].append(paramstep) 584 for item in xml.xpath('./parameters/parameter'): 585 obj = {} 586 obj['name'] = item.get('name') 587 obj['required'] = item.get('required', False) 588 obj['unique'] = item.get('unique', False) 589 content = next(item.iter('content')) 590 obj['type'] = content.get('type', 'string') 591 val = content.get('default', content.get('value', None)) 592 if val is not None: 593 obj['value'] = val 594 obj['shortdesc'] = _strip(''.join(item.xpath('./shortdesc/text()'))) 595 obj['longdesc'] = ''.join(item.xpath('./longdesc/text()')) 596 paramstep['parameters'].append(obj) 597 598 data['actions'] = [] 599 600 for item in xml.xpath('./templates/template'): 601 templatestep = { 602 'shortdesc': _strip(''.join(item.xpath('./stepdesc/text()'))), 603 'name': item.get('name'), 604 # Optional steps in the legacy wizards was broken (!?) 605 'required': True, # item.get('required'), 606 'parameters': [] 607 } 608 data['steps'].append(templatestep) 609 610 _parse_hawk_template(scriptfile, item.get('name'), item.get('type', item.get('name')), 611 templatestep, data['actions']) 612 for override in item.xpath('./override'): 613 name = override.get("name") 614 for param in templatestep['parameters']: 615 if param['name'] == name: 616 param['value'] = override.get("value") 617 param['required'] = False 618 break 619 620 data['actions'].append({'cib': _hawk_to_handles('', xml.xpath('./crm_script')[0])}) 621 622 if config.core.debug: 623 import pprint 624 print("Parsed hawk workflow:") 625 pprint.pprint(data) 626 return data 627 628 629def build_script_cache(): 630 global _script_cache 631 if _script_cache is not None: 632 return 633 _script_cache = {} 634 for d in _script_dirs(): 635 if d: 636 for s in glob(os.path.join(d, '*/main.yml')): 637 name = os.path.dirname(s).split('/')[-1] 638 if name not in _script_cache: 639 _script_cache[name] = os.path.join(d, s) 640 for s in glob(os.path.join(d, '*.yml')): 641 name = os.path.splitext(os.path.basename(s))[0] 642 if name not in _script_cache: 643 _script_cache[name] = os.path.join(d, s) 644 for s in glob(os.path.join(d, 'workflows/*.xml')): 645 name = os.path.splitext(os.path.basename(s))[0] 646 if name not in _script_cache: 647 _script_cache[name] = os.path.join(d, s) 648 649 650def list_scripts(): 651 ''' 652 List the available cluster installation scripts. 653 Yields the names of the main script files. 654 ''' 655 build_script_cache() 656 return sorted(_script_cache.keys()) 657 658 659def _meta_text(meta, tag): 660 for c in meta.iterchildren(tag): 661 return c.text 662 return '' 663 664 665def _listfind(needle, haystack, keyfn): 666 for x in haystack: 667 if keyfn(x) == needle: 668 return x 669 return None 670 671 672def _listfindpend(needle, haystack, keyfn, orfn): 673 for x in haystack: 674 if keyfn(x) == needle: 675 return x 676 x = orfn() 677 haystack.append(x) 678 return x 679 680 681def _make_cib_for_agent(name, agent, data, ops): 682 aid = "{{%s:id}}" % (name) if name else "{{id}}" 683 template = ['primitive %s %s' % (aid, agent)] 684 params = [] 685 ops = [op.strip() for op in ops.split('\n') if op.strip()] 686 for param in data['parameters']: 687 paramname = param['name'] 688 if paramname == 'id': 689 # FIXME: What if the resource actually has a parameter named id? 690 continue 691 path = ':'.join((name, paramname)) if name else paramname 692 params.append('{{#%s}}%s="{{%s}}"{{/%s}}' % (path, paramname, path, path)) 693 ret = '\n\t'.join(template + params + ops) 694 return ret 695 696 697def _merge_objects(o1, o2): 698 for key, value in o2.items(): 699 o1[key] = value 700 701 702def _lookup_step(name, steps, stepname): 703 for step in steps: 704 if step.get('name', '') == stepname: 705 return step 706 if not stepname and len(steps) == 1: 707 return steps[0] 708 if not stepname: 709 raise ValueError("Parameter '%s' not found" % (name)) 710 raise ValueError("Referenced step '%s' not found in '%s'" % (stepname, name)) 711 712 713def _process_agent_include(script, include): 714 from . import ra 715 agent = include['agent'] 716 info = ra.get_ra(agent) 717 meta = info.meta() 718 if meta is None: 719 raise ValueError("No meta-data for agent: %s" % (agent)) 720 name = include.get('name', meta.get('name')) 721 if not name: 722 cls, provider, name = ra.disambiguate_ra_type(agent) 723 if 'name' not in include: 724 include['name'] = name 725 step = _listfindpend(name, script['steps'], lambda x: x.get('name'), lambda: { 726 'name': name, 727 'longdesc': '', 728 'shortdesc': '', 729 'parameters': [], 730 }) 731 step['longdesc'] = include.get('longdesc') or _meta_text(meta, 'longdesc') 732 step['shortdesc'] = _strip(include.get('shortdesc') or _meta_text(meta, 'shortdesc')) 733 step['required'] = include.get('required', True) 734 step['parameters'].append({ 735 'name': 'id', 736 'shortdesc': 'Identifier for the cluster resource', 737 'longdesc': '', 738 'required': True, 739 'unique': True, 740 'type': 'resource', 741 }) 742 743 def newparamobj(param): 744 pname = param.get('name') 745 return _listfindpend(pname, step['parameters'], lambda x: x.get('name'), lambda: {'name': pname}) 746 747 for param in meta.xpath('./parameters/parameter'): 748 pobj = newparamobj(param) 749 pobj['required'] = _make_boolean(param.get('required', False)) 750 pobj['unique'] = _make_boolean(param.get('unique', False)) 751 pobj['longdesc'] = _meta_text(param, 'longdesc') 752 pobj['shortdesc'] = _strip(_meta_text(param, 'shortdesc')) 753 # set 'advanced' flag on all non-required agent parameters by default 754 # a UI should hide these parameters unless "show advanced" is set 755 pobj['advanced'] = not pobj['required'] 756 ctype = param.xpath('./content/@type') 757 cexample = param.xpath('./content/@default') 758 if ctype: 759 pobj['type'] = ctype[0] 760 if cexample: 761 pobj['example'] = cexample[0] 762 763 for param in include.get('parameters', []): 764 pobj = newparamobj(param) 765 # Make any overriden parameters non-advanced 766 # unless explicitly set to advanced 767 pobj['advanced'] = False 768 for key, value in param.items(): 769 if key in ('shortdesc', 'longdesc'): 770 pobj[key] = value 771 elif key == 'value': 772 pobj[key] = Text(script, value) 773 else: 774 pobj[key] = value 775 if 'value' in pobj: 776 pobj['required'] = False 777 778 # If the script doesn't have any base parameters 779 # and the name of this step is the same as the 780 # script name itself, then make this the base step 781 hoist = False 782 hoist_from = None 783 if step['name'] == script['name']: 784 zerostep = _listfind('', script['steps'], lambda x: x.get('name', '')) 785 if not zerostep: 786 hoist = True 787 elif zerostep.get('parameters'): 788 zp = zerostep['parameters'] 789 for pname in [p['name'] for p in step['parameters']]: 790 if _listfind(pname, zp, lambda x: x['name']): 791 break 792 else: 793 hoist, hoist_from = True, zerostep 794 795 # use step['name'] here in case we did the zerostep hoist 796 step['value'] = Text.cib(script, _make_cib_for_agent('' if hoist else step['name'], 797 agent, step, include.get('ops', ''))) 798 799 if hoist: 800 step['name'] = '' 801 if hoist_from: 802 step['parameters'] = hoist_from['parameters'] + step['parameters'] 803 script['steps'] = [s for s in script['steps'] if s != hoist_from] 804 805 if not step['name']: 806 del step['name'] 807 808 # this works despite possible hoist above, 809 # since name is still the actual name 810 for action in script['actions']: 811 if 'include' in action and action['include'] == name: 812 del action['include'] 813 action['cib'] = step['value'] 814 815 816def _process_script_include(script, include): 817 script_name = include['script'] 818 if 'name' not in include: 819 include['name'] = script_name 820 subscript = load_script(script_name) 821 name = include['name'] 822 823 scriptstep = { 824 'name': name, 825 'shortdesc': subscript['shortdesc'], 826 'longdesc': subscript['longdesc'], 827 'required': _make_boolean(include.get('required', True)), 828 'steps': deepcopy(subscript['steps']), 829 'sub-script': subscript, 830 } 831 832 def _merge_step_params(step, params): 833 for param in params: 834 _merge_step_param(step, param) 835 836 def _merge_step_param(step, param): 837 for p in step.get('parameters', []): 838 if p['name'] == param['name']: 839 for key, value in param.items(): 840 if key in ('shortdesc', 'longdesc'): 841 p[key] = value 842 elif key == 'value' and Text.isa(value): 843 p[key] = Text(script, value) 844 else: 845 p[key] = value 846 if 'value' in p: 847 p['required'] = False 848 break 849 else: 850 raise ValueError("Referenced parameter '%s' not found in '%s'" % (param['name'], name)) 851 852 for incparam in include.get('parameters', []): 853 if 'step' in incparam and 'name' not in incparam: 854 _merge_step_params(_lookup_step(name, scriptstep.get('steps', []), incparam['step']), 855 incparam['parameters']) 856 else: 857 _merge_step_param(_lookup_step(name, scriptstep.get('steps', []), ''), 858 incparam) 859 860 script['steps'].append(scriptstep) 861 862 863def _process_include(script, include): 864 """ 865 includes add parameter steps and actions 866 an agent include works like a hawk template: 867 it adds a parameter step 868 a script include however adds any number of 869 parameter steps and actions 870 871 OK. here's what to do: Don't rescope the steps 872 and actions. Instead, keep the actions attached 873 to script step 0, as above. And for each step, add 874 a scope which states its scope. Then, when evaluating 875 handles, build custom environments for those scopes to 876 pass into handles.parse. 877 878 This is just for scripts, no need to do this for agents. 879 Of course, how about scripts that include other scripts? 880 _scope has to be a list which gets expanded... 881 """ 882 if 'agent' in include: 883 return _process_agent_include(script, include) 884 885 elif 'script' in include: 886 return _process_script_include(script, include) 887 else: 888 raise ValueError("Unknown include type: %s" % (', '.join(list(include.keys())))) 889 890 891def _postprocess_script_step(script, step): 892 if 'name' in step and not step['name']: 893 del step['name'] 894 step['required'] = _make_boolean(step.get('required', True)) 895 step['shortdesc'] = _strip(step.get('shortdesc', '')) 896 step['longdesc'] = step.get('longdesc', '') 897 for p in step.get('parameters', []): 898 if 'name' not in p: 899 raise ValueError("Parameter has no name: %s" % (list(p.keys()))) 900 p['shortdesc'] = _strip(p.get('shortdesc', '')) 901 p['longdesc'] = p.get('longdesc', '') 902 if 'default' in p and 'value' not in p: 903 p['value'] = p['default'] 904 del p['default'] 905 if 'value' in p: 906 if p['value'] is None: 907 del p['value'] 908 elif isinstance(p['value'], str): 909 p['value'] = Text(script, p['value']) 910 if 'required' not in p: 911 p['required'] = False 912 else: 913 p['required'] = _make_boolean(p['required']) 914 if 'advanced' in p: 915 p['advanced'] = _make_boolean(p['advanced']) 916 else: 917 p['advanced'] = False 918 if 'unique' in p: 919 p['unique'] = _make_boolean(p['unique']) 920 else: 921 p['unique'] = False 922 if 'type' not in p or p['type'] == '': 923 if p['name'] == 'id': 924 p['type'] = 'resource' 925 else: 926 p['type'] = 'string' 927 for s in step.get('steps', []): 928 _postprocess_script_step(script, s) 929 930 931def _postprocess_script_steps(script): 932 def empty(step): 933 if 'parameters' in step and len(step['parameters']) > 0: 934 return False 935 if 'steps' in step and len(step['steps']) > 0: 936 return False 937 return True 938 939 script['steps'] = [step for step in script['steps'] if not empty(step)] 940 941 for step in script['steps']: 942 _postprocess_script_step(script, step) 943 944 945def _postprocess_script(script): 946 """ 947 Post-process the parsed script into an executable 948 form. This means parsing all included agents and 949 scripts, merging parameters, steps and actions. 950 """ 951 ver = script.get('version') 952 if ver is None or str(ver) != str(_script_version): 953 raise ValueError("Unsupported script version (expected %s, got %s)" % (_script_version, repr(ver))) 954 955 if 'category' not in script: 956 script['category'] = 'Custom' 957 958 if 'actions' not in script: 959 script['actions'] = [] 960 961 # if we include subscripts but have no defined actions, assume that's a 962 # mistake and generate include actions for all includes 963 for inc in [{"include": inc['name']} for inc in script.get('include', [])]: 964 script['actions'].append(inc) 965 966 _postprocess_script_steps(script) 967 968 # Includes may add steps, or modify parameters, 969 # but assume that any included data is already 970 # postprocessed. To run this before the 971 # step processing would risk replacing Text() objects 972 # with references to other scripts with references 973 # to this script. 974 for inc in script.get('include', []): 975 _process_include(script, inc) 976 977 for action in script['actions']: 978 if 'include' in action: 979 includes = [inc['name'] for inc in script.get('include', [])] 980 if action['include'] not in includes: 981 raise ValueError("Script references '%s', but only includes: %s" % 982 (action['include'], ', '.join(includes))) 983 984 if 'include' in script: 985 del script['include'] 986 987 def _setdesc(name): 988 desc = script.get(name) 989 if desc is None: 990 desc = '' 991 if not desc: 992 if script['steps'] and script['steps'][0][name]: 993 desc = script['steps'][0][name] 994 script['steps'][0][name] = '' 995 script[name] = desc 996 _setdesc('shortdesc') 997 _setdesc('longdesc') 998 999 return script 1000 1001 1002def _join_script_lines(txt): 1003 s = "" 1004 current_line = "" 1005 for line in [line for line in txt.split('\n')]: 1006 if not line.strip(): 1007 pass 1008 elif re.match(r'^\s+\S', line): 1009 current_line += line 1010 else: 1011 if current_line.strip(): 1012 s += current_line + "\n" 1013 current_line = line 1014 if current_line: 1015 s += current_line + "\n" 1016 return s 1017 1018 1019def load_script_file(script, filename): 1020 if filename.endswith('.yml'): 1021 parsed = _parse_yaml(script, filename) 1022 elif filename.endswith('.xml'): 1023 parsed = _parse_hawk_workflow(script, filename) 1024 if parsed is None: 1025 raise ValueError("Failed to parse script: %s (%s)" % (script, filename)) 1026 obj = _postprocess_script(parsed) 1027 if 'name' in obj: 1028 script = obj['name'] 1029 if script not in _script_cache or isinstance(_script_cache[script], str): 1030 _script_cache[script] = obj 1031 return obj 1032 1033 1034def load_script_string(script, yml): 1035 build_script_cache() 1036 import io 1037 import yaml 1038 data = yaml.load(io.StringIO(yml), Loader=yaml.SafeLoader) 1039 if isinstance(data, list): 1040 data = data[0] 1041 if 'parameters' in data: 1042 data['steps'] = [{'parameters': data['parameters']}] 1043 del data['parameters'] 1044 elif 'steps' not in data: 1045 data['steps'] = [] 1046 data['name'] = script 1047 data['dir'] = None 1048 1049 obj = _postprocess_script(data) 1050 if 'name' in obj: 1051 script = obj['name'] 1052 _script_cache[script] = obj 1053 return obj 1054 1055 1056def load_script(script): 1057 build_script_cache() 1058 if script not in _script_cache: 1059 common_debug("cache: %s" % (list(_script_cache.keys()))) 1060 raise ValueError("Script not found: %s" % (script)) 1061 s = _script_cache[script] 1062 if isinstance(s, str): 1063 try: 1064 return load_script_file(script, s) 1065 except KeyError as err: 1066 raise ValueError("Error when loading script %s: Expected key %s not found" % (script, err)) 1067 except Exception as err: 1068 raise ValueError("Error when loading script %s: %s" % (script, err)) 1069 return s 1070 1071 1072def _script_dirs(): 1073 "list of directories that may contain cluster scripts" 1074 ret = [d for d in options.scriptdir.split(';') if d and os.path.isdir(d)] 1075 return ret + [os.path.join(userdir.CONFIG_HOME, 'scripts'), 1076 os.path.join(config.path.sharedir, 'scripts'), 1077 config.path.hawk_wizards] 1078 1079 1080def _check_control_persist(): 1081 ''' 1082 Checks if ControlPersist is available. If so, 1083 we'll use it to make things faster. 1084 ''' 1085 cmd = 'ssh -o ControlPersist'.split() 1086 if options.regression_tests: 1087 print((".EXT", cmd)) 1088 cmd = subprocess.Popen(cmd, 1089 stdout=subprocess.PIPE, 1090 stderr=subprocess.PIPE) 1091 (out, err) = cmd.communicate() 1092 return "Bad configuration option" not in err 1093 1094 1095def _parallax_call(printer, hosts, cmd, opts): 1096 "parallax.call with debug logging" 1097 printer.debug("parallax.call(%s, %s)" % (repr(hosts), cmd)) 1098 return parallax.call(hosts, cmd, opts) 1099 1100 1101def _resolve_script(name): 1102 for p in list_scripts(): 1103 if p.endswith('main.yml') and os.path.dirname(p).endswith('/' + name): 1104 return p 1105 elif p.endswith('.yml') and os.path.splitext(os.path.basename(p))[0] == name: 1106 return p 1107 elif p.endswith('.xml') and os.path.splitext(os.path.basename(p))[0] == name: 1108 return p 1109 return None 1110 1111 1112def _parallax_copy(printer, hosts, src, dst, opts): 1113 "parallax.copy with debug logging" 1114 printer.debug("parallax.copy(%s, %s, %s)" % (repr(hosts), src, dst)) 1115 return parallax.copy(hosts, src, dst, opts) 1116 1117 1118def _tempname(prefix): 1119 return '%s-%s%s' % (prefix, 1120 hex(int(time.time()))[2:], 1121 hex(random.randint(0, 2**48))[2:]) 1122 1123 1124def _generate_workdir_name(): 1125 ''' 1126 Generate a temporary folder name to use while 1127 running the script 1128 ''' 1129 # TODO: make use of /tmp configurable 1130 basefile = _tempname('crm-tmp') 1131 basetmp = os.path.join(utils.get_tempdir(), basefile) 1132 if os.path.isdir(basetmp): 1133 raise ValueError("Invalid temporary workdir %s" % (basetmp)) 1134 return basetmp 1135 1136 1137def _print_debug(printer, local_node, hosts, workdir, opts): 1138 "Print debug output (if any)" 1139 dbglog = os.path.join(workdir, 'crm_script.debug') 1140 if hosts: 1141 for host, result in _parallax_call(printer, hosts, 1142 "if [ -f '%s' ]; then cat '%s'; fi" % (dbglog, dbglog), 1143 opts).items(): 1144 if isinstance(result, parallax.Error): 1145 printer.error(host, result) 1146 else: 1147 printer.output(host, *result) 1148 if os.path.isfile(dbglog): 1149 f = open(dbglog).read() 1150 printer.output(local_node, 0, f, '') 1151 1152 1153def _cleanup_local(workdir): 1154 "clean up the local tmp dir" 1155 if workdir and os.path.isdir(workdir): 1156 cleanscript = os.path.join(workdir, 'crm_clean.py') 1157 if os.path.isfile(cleanscript): 1158 if subprocess.call([cleanscript, workdir], shell=False) != 0: 1159 shutil.rmtree(workdir) 1160 else: 1161 shutil.rmtree(workdir) 1162 1163 1164def _run_cleanup(printer, has_remote_actions, local_node, hosts, workdir, opts): 1165 "Clean up after the cluster script" 1166 if has_remote_actions and hosts and workdir: 1167 cleanscript = os.path.join(workdir, 'crm_clean.py') 1168 for host, result in _parallax_call(printer, hosts, 1169 "%s %s" % (cleanscript, 1170 workdir), 1171 opts).items(): 1172 if isinstance(result, parallax.Error): 1173 printer.error(host, "Clean: %s" % (result)) 1174 else: 1175 printer.output(host, *result) 1176 _cleanup_local(workdir) 1177 1178 1179def _extract_localnode(hosts): 1180 """ 1181 Remove loal node from hosts list, so 1182 we can treat it separately 1183 """ 1184 this_node = utils.this_node() 1185 hosts2 = [] 1186 local_node = None 1187 for h, p, u in hosts: 1188 if h != this_node: 1189 hosts2.append((h, p, u)) 1190 else: 1191 local_node = (h, p, u) 1192 err_buf.debug("Local node: %s, Remote hosts: %s" % ( 1193 local_node, 1194 ', '.join(h[0] for h in hosts2))) 1195 return local_node, hosts2 1196 1197 1198# TODO: remove common params? 1199# Pass them in a separate list of options? 1200# Right now these names are basically reserved.. 1201def common_params(): 1202 "Parameters common to all cluster scripts" 1203 return [('nodes', None, 'List of nodes to execute the script for'), 1204 ('dry_run', 'no', 'If set, simulate execution only'), 1205 ('action', None, 'If set, only execute a single action (index, as returned by verify)'), 1206 ('statefile', None, 'When single-stepping, the state is saved in the given file'), 1207 ('user', config.core.user or None, 'Run script as the given user'), 1208 ('sudo', 'no', 1209 'If set, crm will prompt for a sudo password and use sudo when appropriate'), 1210 ('port', None, 'Port to connect on'), 1211 ('timeout', '600', 'Execution timeout in seconds')] 1212 1213 1214def _common_param_default(name): 1215 for param, default, _ in common_params(): 1216 if param == name: 1217 return default 1218 return None 1219 1220 1221def _filter_dict(d, name, fn, *args): 1222 'filter the given element in the dict through the function fn' 1223 d[name] = fn(d[name], *args) 1224 1225 1226def _filter_nodes(nodes, user, port): 1227 'filter function for the nodes element' 1228 if nodes: 1229 nodes = nodes.replace(',', ' ').split() 1230 else: 1231 nodes = utils.list_cluster_nodes() 1232 if not nodes: 1233 raise ValueError("No hosts") 1234 nodes = [(node, port or None, user or None) for node in nodes] 1235 return nodes 1236 1237 1238def _scoped_param(context, name): 1239 if context: 1240 return ':'.join(context) + ':' + name 1241 return name 1242 1243 1244def _find_by_name(params, name): 1245 try: 1246 return next(x for x in params if x.get('name') == name) 1247 except StopIteration: 1248 return None 1249 1250 1251_IDENT_RE = re.compile(r'^([a-z0-9_#$-][^\s=]*)$', re.IGNORECASE) 1252 1253 1254def is_valid_ipv4_address(address): 1255 try: 1256 socket.inet_pton(socket.AF_INET, address) 1257 except AttributeError: 1258 try: 1259 socket.inet_aton(address) 1260 except socket.error: 1261 return False 1262 return address.count('.') == 3 1263 except socket.error: # not a valid address 1264 return False 1265 1266 return True 1267 1268 1269def is_valid_ipv6_address(address): 1270 try: 1271 socket.inet_pton(socket.AF_INET6, address) 1272 except socket.error: # not a valid address 1273 return False 1274 return True 1275 1276# Types: 1277# OCF types 1278# 1279# string 1280# integer 1281# boolean 1282# 1283# Propose to add 1284# resource ==> a valid resource identifier 1285# ip_address ==> a valid ipv4 or ipv6 address 1286# ip_network ==> a valid ipv4 or ipv6 network (or address without /XX) 1287# port ==> integer between 0 and 65535 1288# email ==> a valid email address 1289 1290# node ==> name of a node in the cluster 1291# select <value>, <value>, <value>, ... ==> any of the values in the list. 1292# range <n> <m> ==> integer in range 1293# rx <rx> ==> anything matching the regular expression. 1294 1295 1296def _valid_integer(value): 1297 try: 1298 return True, int(value, base=0) 1299 except ValueError: 1300 return False, value 1301 1302 1303def _valid_ip(value): 1304 return is_valid_ipv4_address(value) or is_valid_ipv6_address(value) 1305 1306 1307def _verify_type(param, value, errors): 1308 if value is None: 1309 value = '' 1310 vtype = param.get('type') 1311 if not vtype: 1312 return value 1313 elif vtype == 'integer': 1314 ok, _ = _valid_integer(value) 1315 if not ok: 1316 errors.append("%s=%s is not an integer" % (param.get('name'), value)) 1317 elif vtype == 'string': 1318 return value 1319 elif vtype == 'boolean': 1320 return "true" if _make_boolean(value) else "false" 1321 elif vtype == 'resource': 1322 try: 1323 if not _IDENT_RE.match(value): 1324 errors.append("%s=%s invalid resource identifier" % (param.get('name'), value)) 1325 except TypeError as e: 1326 errors.append("%s=%s %s" % (param.get('name'), value, str(e))) 1327 elif vtype == 'enum': 1328 if 'values' not in param: 1329 errors.append("%s=%s enum without list of values" % (param.get('name'), value)) 1330 else: 1331 opts = param['values'] 1332 if isinstance(opts, str): 1333 opts = opts.replace(',', ' ').split(' ') 1334 for v in opts: 1335 if value.lower() == v.lower(): 1336 return v 1337 errors.append("%s=%s does not match '%s'" % (param.get('name'), value, "|".join(opts))) 1338 elif vtype == 'ip_address': 1339 if not _valid_ip(value): 1340 errors.append("%s=%s is not an IP address" % (param.get('name'), value)) 1341 elif vtype == 'ip_network': 1342 sp = value.rsplit('/', 1) 1343 if len(sp) == 1 and not (is_valid_ipv4_address(value) or is_valid_ipv6_address(value)): 1344 errors.append("%s=%s is not a valid IP network" % (param.get('name'), value)) 1345 elif len(sp) == 2 and (not _valid_ip(sp[0]) or not _valid_integer(sp[1])): 1346 errors.append("%s=%s is not a valid IP network" % (param.get('name'), value)) 1347 else: 1348 errors.append("%s=%s is not a valid IP network" % (param.get('name'), value)) 1349 elif vtype == 'port': 1350 ok, ival = _valid_integer(value) 1351 if not ok: 1352 errors.append("%s=%s is not a valid port" % (param.get('name'), value)) 1353 if ival < 0 or ival > 65535: 1354 errors.append("%s=%s is out of port range" % (param.get('name'), value)) 1355 elif vtype == 'email': 1356 if not re.match(r'[^@]+@[^@]+', value): 1357 errors.append("%s=%s is not a valid email address" % (param.get('name'), value)) 1358 else: 1359 errors.append("%s=%s is unknown type %s" % (param.get('name'), value, vtype)) 1360 return value 1361 1362 1363_NO_RESOLVE = object() 1364 1365 1366def _resolve_direct(step, pname, pvalue, path, errors): 1367 step_parameters = step.get('parameters', []) 1368 step_steps = step.get('steps', []) 1369 param = _find_by_name(step_parameters, pname) 1370 if param is not None: 1371 # resolved to a parameter... now verify the value type? 1372 return _verify_type(param, pvalue, errors) 1373 substep = _find_by_name(step_steps, pname) 1374 if substep is not None: 1375 # resolved to a step... recurse 1376 return _resolve_params(substep, pvalue, path + [pname], errors) 1377 return _NO_RESOLVE 1378 1379 1380def _resolve_unnamed_step(step, pname, pvalue, path, errors): 1381 step_steps = step.get('steps', []) 1382 substep = _find_by_name(step_steps, '') 1383 if substep is not None: 1384 return _resolve_direct(substep, pname, pvalue, path, errors) 1385 return _NO_RESOLVE 1386 1387 1388def _resolve_single_step(step, pname, pvalue, path, errors): 1389 step_steps = step.get('steps', []) 1390 if len(step_steps) >= 1: 1391 first_step = step_steps[0] 1392 return _resolve_direct(first_step, pname, pvalue, path + [first_step.get('name')], errors) 1393 return _NO_RESOLVE 1394 1395 1396def _resolve_params(step, params, path, errors): 1397 """ 1398 any parameter that doesn't resolve is an error 1399 """ 1400 ret = {} 1401 1402 for pname, pvalue in params.items(): 1403 result = _resolve_direct(step, pname, pvalue, path, errors) 1404 if result is not _NO_RESOLVE: 1405 ret[pname] = result 1406 continue 1407 1408 result = _resolve_unnamed_step(step, pname, pvalue, path, errors) 1409 if result is not _NO_RESOLVE: 1410 ret[pname] = result 1411 continue 1412 1413 result = _resolve_single_step(step, pname, pvalue, path, errors) 1414 if result is not _NO_RESOLVE: 1415 stepname = step['steps'][0].get('name', '') 1416 if stepname not in ret: 1417 ret[stepname] = {} 1418 ret[stepname][pname] = result 1419 ret[pname] = result 1420 continue 1421 1422 errors.append("Unknown parameter %s" % (':'.join(path + [pname]))) 1423 1424 return ret 1425 1426 1427def _check_parameters(script, params): 1428 ''' 1429 1. Fill in values where none are supplied and there's a value 1430 in the step data 1431 2. Check missing values 1432 3. For each input parameter: look it up and adjust the path 1433 ''' 1434 errors = [] 1435 # params = deepcopy(params) 1436 # recursively resolve parameters: report 1437 # error if a parameter can't be resolved 1438 # TODO: move "common params" out of the params dict completely 1439 # pass as flags to command line 1440 1441 def _split_commons(params): 1442 ret, cdict = {}, dict([(c, d) for c, d, _ in common_params()]) 1443 for key, value in params.items(): 1444 if key in cdict: 1445 cdict[key] = value 1446 else: 1447 ret[key] = deepcopy(value) 1448 return ret, cdict 1449 1450 params, commons = _split_commons(params) 1451 params = _resolve_params(script, params, [], errors) 1452 1453 if errors: 1454 raise ValueError('\n'.join(errors)) 1455 1456 for key, value in commons.items(): 1457 params[key] = value 1458 1459 def _fill_values(path, into, source, srcreq): 1460 """ 1461 Copy values into into while checking for missing required parameters. 1462 If into has content, all required parameters ARE required, even if the 1463 whole step is not required (since we're supplying it). This is checked 1464 by checking if the step is not required, but there are some parameters 1465 set by the user anyway. 1466 """ 1467 if 'required' in source: 1468 srcreq = (source['required'] and srcreq) or (into and srcreq) 1469 1470 for param in source.get('parameters', []): 1471 if param['name'] not in into: 1472 if 'value' in param: 1473 into[param['name']] = param['value'] 1474 elif srcreq and param['required']: 1475 errors.append(_scoped_param(path, param['name'])) 1476 1477 for step in source.get('steps', []): 1478 required = step.get('required', True) 1479 if not required and step['name'] not in into: 1480 continue 1481 if not required and step['name'] in into and into[step['name']]: 1482 required = True 1483 if 'name' not in step: 1484 _fill_values(path, into, step, required and srcreq) 1485 else: 1486 if step['name'] not in into: 1487 into[step['name']] = {} 1488 _fill_values(path + [step['name']], into[step['name']], step, required and srcreq) 1489 1490 _fill_values([], params, script, True) 1491 1492 if errors: 1493 raise ValueError("Missing required parameter(s): %s" % (', '.join(errors))) 1494 1495 # if config.core.debug: 1496 # from pprint import pprint 1497 # print("Checked script parameters:") 1498 # pprint(params) 1499 return params 1500 1501 1502def _handles_values(ret, script, params, subactions): 1503 """ 1504 Generate a values structure that the handles 1505 templates understands. 1506 """ 1507 def _process(to, context, params): 1508 """ 1509 to: level writing to 1510 context: source step 1511 params: values for step 1512 """ 1513 for key, value in params.items(): 1514 if not isinstance(value, dict): 1515 to[key] = value 1516 1517 for step in context.get('steps', []): 1518 name = step.get('name', '') 1519 if name: 1520 if step['required'] or name in params: 1521 obj = {} 1522 vobj = handles.value(obj, '') 1523 to[name] = vobj 1524 subaction = None 1525 if step.get('sub-script'): 1526 subaction = subactions.get(step['sub-script']['name']) 1527 if subaction and subaction[-1]['name'] == 'cib': 1528 vobj.value = Text.cib(script, subaction[-1]['value']) 1529 else: 1530 vobj.value = Text.cib(script, step.get('value', vobj.value)) 1531 1532 _process(obj, step, params.get(name, {})) 1533 else: 1534 _process(to, step, params) 1535 1536 _process(ret, script, params) 1537 1538 1539def _has_remote_actions(actions): 1540 """ 1541 True if any actions execute on remote nodes 1542 """ 1543 for action in actions: 1544 if action['name'] in ('collect', 'apply', 'install', 'service', 'copy'): 1545 return True 1546 if action.get('nodes') == 'all': 1547 return True 1548 return False 1549 1550 1551def _set_controlpersist(opts): 1552 # _has_controlpersist = _check_control_persist() 1553 # if _has_controlpersist: 1554 # opts.ssh_options += ["ControlMaster=auto", 1555 # "ControlPersist=30s", 1556 # "ControlPath=/tmp/crm-ssh-%r@%h:%p"] 1557 # unfortunately, due to bad interaction between parallax and ssh, 1558 # ControlPersist is broken 1559 # See: http://code.google.com/p/parallel-ssh/issues/detail?id=67 1560 # Supposedly fixed in openssh 6.3, but isn't: This may be an 1561 # issue in parallel-ssh, not openssh 1562 pass 1563 1564 1565def _flatten_parameters(steps): 1566 pret = [] 1567 for step in steps: 1568 stepname = step.get('name', '') 1569 for param in step.get('parameters', []): 1570 if stepname: 1571 pret.append('%s:%s' % (stepname, param['name'])) 1572 else: 1573 pret.append(param['name']) 1574 return pret 1575 1576 1577def param_completion_list(name): 1578 """ 1579 Returns completions for the given script 1580 """ 1581 try: 1582 script = load_script(name) 1583 params = _flatten_parameters(script.get('steps', [])) 1584 ps = [p['name'] + '=' for p in params] 1585 return ps 1586 except Exception: 1587 return [] 1588 1589 1590def _create_script_workdir(script, workdir): 1591 "Create workdir and copy contents of scriptdir into it" 1592 scriptdir = script['dir'] 1593 try: 1594 if scriptdir is not None: 1595 if os.path.basename(scriptdir) == script['name']: 1596 cmd = ["mkdir", "-p", os.path.dirname(workdir)] 1597 else: 1598 cmd = ["mkdir", "-p", workdir] 1599 if options.regression_tests: 1600 print(".EXT", cmd) 1601 if subprocess.call(cmd, shell=False) != 0: 1602 raise ValueError("Failed to create temporary working directory") 1603 # only copytree if script is a dir 1604 if os.path.basename(scriptdir) == script['name']: 1605 shutil.copytree(scriptdir, workdir) 1606 else: 1607 cmd = ["mkdir", "-p", workdir] 1608 if options.regression_tests: 1609 print(".EXT", cmd) 1610 if subprocess.call(cmd, shell=False) != 0: 1611 raise ValueError("Failed to create temporary working directory") 1612 except (IOError, OSError) as e: 1613 raise ValueError(e) 1614 1615 1616def _copy_utils(dst): 1617 ''' 1618 Copy run utils to the destination directory 1619 ''' 1620 try: 1621 import glob 1622 for f in glob.glob(os.path.join(config.path.sharedir, 'utils/*.py')): 1623 shutil.copy(f, dst) 1624 except (IOError, OSError) as e: 1625 raise ValueError(e) 1626 1627 1628def _create_remote_workdirs(printer, hosts, path, opts): 1629 "Create workdirs on remote hosts" 1630 ok = True 1631 for host, result in _parallax_call(printer, hosts, 1632 "mkdir -p %s" % (os.path.dirname(path)), 1633 opts).items(): 1634 if isinstance(result, parallax.Error): 1635 printer.error(host, "Start: %s" % (result)) 1636 ok = False 1637 if not ok: 1638 msg = "Failed to connect to one or more of these hosts via SSH: %s" % ( 1639 ', '.join(h[0] for h in hosts)) 1640 raise ValueError(msg) 1641 1642 1643def _copy_to_remote_dirs(printer, hosts, path, opts): 1644 "Copy a local folder to same location on remote hosts" 1645 ok = True 1646 for host, result in _parallax_copy(printer, hosts, 1647 path, 1648 path, opts).items(): 1649 if isinstance(result, parallax.Error): 1650 printer.debug("_copy_to_remote_dirs failed: %s, %s, %s" % (hosts, path, opts)) 1651 printer.error(host, result) 1652 ok = False 1653 if not ok: 1654 raise ValueError("Failed when copying script data, aborting.") 1655 return ok 1656 1657 1658def _copy_local(printer, workdir, local_node, src, dst): 1659 ok = True 1660 if local_node and not src.startswith(workdir): 1661 try: 1662 if os.path.abspath(src) != os.path.abspath(dst): 1663 if os.path.isfile(src): 1664 shutil.copy(src, dst) 1665 else: 1666 shutil.copytree(src, dst) 1667 except (IOError, OSError, shutil.Error) as e: 1668 printer.error(local_node, e) 1669 ok = False 1670 return ok 1671 1672 1673def _copy_to_all(printer, workdir, hosts, local_node, src, dst, opts): 1674 """ 1675 Copy src to dst both locally and remotely 1676 """ 1677 ok = True 1678 ret = _parallax_copy(printer, hosts, src, dst, opts) 1679 for host, result in ret.items(): 1680 if isinstance(result, parallax.Error): 1681 printer.error(host, result) 1682 ok = False 1683 else: 1684 rc, out, err = result 1685 if rc != 0: 1686 printer.error(host, err) 1687 ok = False 1688 return ok and _copy_local(printer, workdir, local_node, src, dst) 1689 1690 1691def _clean_parameters(params): 1692 ret = [] 1693 for param in params: 1694 rp = {} 1695 for elem in ('name', 'required', 'unique', 'advanced', 'type', 'example'): 1696 if elem in param: 1697 rp[elem] = param[elem] 1698 if 'shortdesc' in param: 1699 rp['shortdesc'] = _strip(param['shortdesc']) 1700 if 'longdesc' in param: 1701 rp['longdesc'] = format_desc(param['longdesc']) 1702 if 'value' in param: 1703 val = param['value'] 1704 if isinstance(val, Text): 1705 val = val.text 1706 rp['value'] = val 1707 ret.append(rp) 1708 return ret 1709 1710 1711def clean_steps(steps): 1712 ret = [] 1713 for step in steps: 1714 rstep = {} 1715 if 'name' in step: 1716 rstep['name'] = step['name'] 1717 if 'shortdesc' in step: 1718 rstep['shortdesc'] = _strip(step['shortdesc']) 1719 if 'longdesc' in step: 1720 rstep['longdesc'] = format_desc(step['longdesc']) 1721 if 'required' in step: 1722 rstep['required'] = step['required'] 1723 if 'parameters' in step: 1724 rstep['parameters'] = _clean_parameters(step['parameters']) 1725 if 'steps' in step: 1726 rstep['steps'] = clean_steps(step['steps']) 1727 ret.append(rstep) 1728 return ret 1729 1730 1731def clean_run_params(params): 1732 for key, value in params.items(): 1733 if isinstance(value, dict): 1734 clean_run_params(value) 1735 elif Text.isa(value): 1736 params[key] = str(value) 1737 return params 1738 1739 1740def _chmodx(path): 1741 "chmod +x <path>" 1742 mode = os.stat(path).st_mode 1743 mode |= (mode & 0o444) >> 2 1744 os.chmod(path, mode) 1745 1746 1747class RunActions(object): 1748 def __init__(self, printer, script, params, actions, local_node, hosts, opts, workdir): 1749 self.printer = printer 1750 self.script = script 1751 self.data = [clean_run_params(params)] 1752 self.actions = actions 1753 self.local_node = local_node 1754 self.hosts = hosts 1755 self.opts = opts 1756 self.dry_run = params.get('dry_run', False) 1757 self.sudo = params.get('sudo', False) 1758 self.workdir = workdir 1759 self.statefile = os.path.join(self.workdir, 'script.input') 1760 self.dstfile = os.path.join(self.workdir, 'script.input') 1761 self.sudo_pass = None 1762 self.result = None 1763 self.output = None 1764 self.rc = False 1765 1766 def prepare(self, has_remote_actions): 1767 if not self.dry_run: 1768 _create_script_workdir(self.script, self.workdir) 1769 json.dump(self.data, open(self.statefile, 'w')) 1770 _copy_utils(self.workdir) 1771 if has_remote_actions: 1772 _create_remote_workdirs(self.printer, self.hosts, self.workdir, self.opts) 1773 _copy_to_remote_dirs(self.printer, self.hosts, self.workdir, self.opts) 1774 # make sure all path references are relative to the script directory 1775 os.chdir(self.workdir) 1776 1777 def single_action(self, action_index, statefile): 1778 self.statefile = statefile 1779 try: 1780 action_index = int(action_index) - 1 1781 except ValueError: 1782 raise ValueError("action parameter must be an index") 1783 if action_index < 0 or action_index >= len(self.actions): 1784 raise ValueError("action index out of range") 1785 1786 action = self.actions[action_index] 1787 common_debug("Execute: %s" % (action)) 1788 # if this is not the first action, load action data 1789 if action_index != 1: 1790 if not os.path.isfile(statefile): 1791 raise ValueError("No state for action: %s" % (action_index)) 1792 self.data = json.load(open(statefile)) 1793 if Actions.needs_sudo(action): 1794 self._check_sudo_pass() 1795 result = self._run_action(action) 1796 json.dump(self.data, open(self.statefile, 'w')) 1797 return result 1798 1799 def all_actions(self): 1800 # TODO: run asynchronously on remote nodes 1801 # run on remote nodes 1802 # run on local nodes 1803 # TODO: wait for remote results 1804 for action in self.actions: 1805 if Actions.needs_sudo(action): 1806 self._check_sudo_pass() 1807 if not self._run_action(action): 1808 return False 1809 return True 1810 1811 def _update_state(self): 1812 if self.dry_run: 1813 return True 1814 json.dump(self.data, open(self.statefile, 'w')) 1815 return _copy_to_all(self.printer, 1816 self.workdir, 1817 self.hosts, 1818 self.local_node, 1819 self.statefile, 1820 self.dstfile, 1821 self.opts) 1822 1823 def run_command(self, nodes, command, is_json_output): 1824 "called by Actions" 1825 cmdline = 'cd "%s"; ./%s' % (self.workdir, command) 1826 if not self._update_state(): 1827 raise ValueError("Failed when updating input, aborting.") 1828 self.call(nodes, cmdline, is_json_output) 1829 1830 def copy_file(self, nodes, src, dst): 1831 if not self._is_local(nodes): 1832 ok = _copy_to_all(self.printer, 1833 self.workdir, 1834 self.hosts, 1835 self.local_node, 1836 src, 1837 dst, 1838 self.opts) 1839 else: 1840 ok = _copy_local(self.printer, 1841 self.workdir, 1842 self.local_node, 1843 src, 1844 dst) 1845 self.result = '' if ok else None 1846 self.rc = ok 1847 1848 def record_json(self): 1849 "called by Actions" 1850 if self.result is not None: 1851 if not self.result: 1852 self.result = {} 1853 self.data.append(self.result) 1854 self.rc = True 1855 else: 1856 self.rc = False 1857 1858 def validate_json(self): 1859 "called by Actions" 1860 if self.dry_run: 1861 self.rc = True 1862 return 1863 1864 if self.result is not None: 1865 if not self.result: 1866 self.result = '' 1867 self.data.append(self.result) 1868 if isinstance(self.result, dict): 1869 for k, v in self.result.items(): 1870 self.data[0][k] = v 1871 self.rc = True 1872 else: 1873 self.rc = False 1874 1875 def report_result(self): 1876 "called by Actions" 1877 if self.result is not None: 1878 self.output = self.result 1879 self.rc = True 1880 else: 1881 self.rc = False 1882 1883 def _run_action(self, action): 1884 """ 1885 Execute a single action 1886 """ 1887 method = _actions[action['name']] 1888 self.printer.start(action) 1889 try: 1890 self.output = None 1891 self.result = None 1892 self.rc = False 1893 method(Actions(self, action)) 1894 self.printer.finish(action, self.rc, self.output) 1895 return self.rc 1896 finally: 1897 self.printer.flush() 1898 return False 1899 1900 def _check_sudo_pass(self): 1901 if self.sudo and not self.sudo_pass and userdir.getuser() != 'root': 1902 prompt = "sudo password: " 1903 self.sudo_pass = getpass.getpass(prompt=prompt) 1904 1905 def _is_local(self, nodes): 1906 islocal = False 1907 if nodes == 'all': 1908 pass 1909 elif nodes == 'local': 1910 islocal = True 1911 elif nodes is not None and nodes != []: 1912 islocal = nodes == [self.local_node_name()] 1913 else: 1914 islocal = True 1915 self.printer.debug("is_local (%s): %s" % (nodes, islocal)) 1916 return islocal 1917 1918 def call(self, nodes, cmdline, is_json_output=False): 1919 if cmdline.startswith("#!"): 1920 self.execute_shell(nodes or 'all', cmdline) 1921 else: 1922 if not self._is_local(nodes): 1923 self.result = self._process_remote(cmdline, is_json_output) 1924 else: 1925 self.result = self._process_local(cmdline, is_json_output) 1926 self.rc = self.result not in (False, None) 1927 1928 def execute_shell(self, nodes, cmdscript): 1929 """ 1930 execute the shell script... 1931 """ 1932 cmdscript = str(cmdscript).rstrip() + '\n' 1933 if self.dry_run: 1934 self.printer.print_command(nodes, cmdscript) 1935 self.result = '' 1936 self.rc = True 1937 return 1938 elif config.core.debug: 1939 self.printer.print_command(nodes, cmdscript) 1940 1941 tmpf = self.str2tmp(cmdscript) 1942 _chmodx(tmpf) 1943 if not self._is_local(nodes): 1944 ok = _copy_to_remote_dirs(self.printer, 1945 self.hosts, 1946 tmpf, 1947 self.opts) 1948 if not ok: 1949 self.result = False 1950 else: 1951 cmdline = 'cd "%s"; %s' % (self.workdir, tmpf) 1952 self.result = self._process_remote(cmdline, False) 1953 else: 1954 cmdline = 'cd "%s"; %s' % (self.workdir, tmpf) 1955 self.result = self._process_local(cmdline, False) 1956 self.rc = self.result not in (None, False) 1957 1958 def str2tmp(self, s): 1959 """ 1960 Create a temporary file in the temp workdir 1961 Returns path to file 1962 """ 1963 fn = os.path.join(self.workdir, _tempname('str2tmp')) 1964 if self.dry_run: 1965 self.printer.print_command(self.local_node_name(), 'temporary file <<END\n%s\nEND\n' % (s)) 1966 return fn 1967 elif config.core.debug: 1968 self.printer.print_command(self.local_node_name(), 'temporary file <<END\n%s\nEND\n' % (s)) 1969 try: 1970 with open(fn, "w") as f: 1971 f.write(s) 1972 if not s.endswith('\n'): 1973 f.write("\n") 1974 except IOError as msg: 1975 self.printer.error(self.local_node_name(), "Write failed: %s" % (msg)) 1976 return 1977 return fn 1978 1979 def _process_remote(self, cmdline, is_json_output): 1980 """ 1981 Handle an action that executes on all nodes 1982 """ 1983 ok = True 1984 action_result = {} 1985 1986 if self.sudo_pass: 1987 self.opts.input_stream = u'sudo: %s\n' % (self.sudo_pass) 1988 else: 1989 self.opts.input_stream = None 1990 1991 if self.dry_run: 1992 self.printer.print_command(self.hosts, cmdline) 1993 return {} 1994 elif config.core.debug: 1995 self.printer.print_command(self.hosts, cmdline) 1996 1997 for host, result in _parallax_call(self.printer, 1998 self.hosts, 1999 cmdline, 2000 self.opts).items(): 2001 if isinstance(result, parallax.Error): 2002 self.printer.error(host, "Remote error: %s" % (result)) 2003 ok = False 2004 else: 2005 rc, out, err = result 2006 out = utils.to_ascii(out) 2007 if rc != 0: 2008 self.printer.error(host, "Remote error (rc=%s) %s%s" % (rc, out, err)) 2009 ok = False 2010 elif is_json_output: 2011 action_result[host] = json.loads(out) 2012 else: 2013 action_result[host] = out 2014 if self.local_node: 2015 ret = self._process_local(cmdline, False) 2016 if ret is None: 2017 ok = False 2018 elif is_json_output: 2019 action_result[self.local_node_name()] = json.loads(ret) 2020 else: 2021 action_result[self.local_node_name()] = ret 2022 if ok: 2023 self.printer.debug("Result: %s" % repr(action_result)) 2024 return action_result 2025 return None 2026 2027 def _process_local(self, cmdline, is_json_output): 2028 """ 2029 Handle an action that executes locally 2030 """ 2031 if self.sudo_pass: 2032 input_s = u'sudo: %s\n' % (self.sudo_pass) 2033 else: 2034 input_s = None 2035 if self.dry_run: 2036 self.printer.print_command(self.local_node_name(), cmdline) 2037 return {} 2038 elif config.core.debug: 2039 self.printer.print_command(self.local_node_name(), cmdline) 2040 rc, out, err = utils.get_stdout_stderr(cmdline, input_s=input_s, shell=True) 2041 if rc != 0: 2042 self.printer.error(self.local_node_name(), "Error (%d): %s" % (rc, err)) 2043 return None 2044 self.printer.debug("Result(local): %s" % repr(out)) 2045 if is_json_output: 2046 if out != '': 2047 out = json.loads(out) 2048 return out 2049 2050 def local_node_name(self): 2051 if self.local_node: 2052 return self.local_node[0] 2053 return "localhost" 2054 2055 2056def run(script, params, printer): 2057 ''' 2058 Run the given script on the given set of hosts 2059 name: a cluster script is a folder <name> containing a main.yml or main.xml file 2060 params: a tree of parameters 2061 printer: Object that receives and formats output 2062 ''' 2063 workdir = _generate_workdir_name() 2064 # pull out the actions to perform based on the actual 2065 # parameter values (so discard actions conditional on 2066 # conditions that are false) 2067 params = _check_parameters(script, params) 2068 user = params['user'] 2069 port = params['port'] 2070 _filter_dict(params, 'nodes', _filter_nodes, user, port) 2071 _filter_dict(params, 'dry_run', _make_boolean) 2072 _filter_dict(params, 'sudo', _make_boolean) 2073 _filter_dict(params, 'statefile', lambda x: (x and os.path.abspath(x)) or x) 2074 if config.core.debug: 2075 params['debug'] = True 2076 actions = _process_actions(script, params) 2077 name = script['name'] 2078 hosts = params['nodes'] 2079 printer.print_header(script, params, hosts) 2080 local_node, hosts = _extract_localnode(hosts) 2081 opts = _make_options(params) 2082 _set_controlpersist(opts) 2083 2084 dry_run = params.get('dry_run', False) 2085 2086 has_remote_actions = _has_remote_actions(actions) 2087 2088 try: 2089 runner = RunActions(printer, script, params, actions, local_node, hosts, opts, workdir) 2090 runner.prepare(has_remote_actions) 2091 action = params['action'] 2092 statefile = params['statefile'] 2093 if action or statefile: 2094 if not action or not statefile: 2095 raise ValueError("Must set both action and statefile") 2096 return runner.single_action(action, statefile) 2097 else: 2098 return runner.all_actions() 2099 2100 except (OSError, IOError) as e: 2101 import traceback 2102 traceback.print_exc() 2103 raise ValueError("Internal error while running %s: %s" % (name, e)) 2104 finally: 2105 if not dry_run: 2106 if not config.core.debug: 2107 _run_cleanup(printer, has_remote_actions, local_node, hosts, workdir, opts) 2108 elif has_remote_actions: 2109 _print_debug(printer, local_node, hosts, workdir, opts) 2110 else: 2111 _print_debug(printer, local_node, None, workdir, opts) 2112 2113 2114def _remove_empty_lines(txt): 2115 return '\n'.join(line for line in txt.split('\n') if line.strip()) 2116 2117 2118def _process_actions(script, params): 2119 """ 2120 Given parameter values, we can process 2121 all the handles data and generate all the 2122 actions to perform, validate and check conditions. 2123 """ 2124 2125 subactions = {} 2126 values = {} 2127 script['__values__'] = values 2128 2129 for step in script['steps']: 2130 _handles_values(values, script, params, subactions) 2131 if not step.get('required', True) and not params.get(step['name']): 2132 continue 2133 obj = step.get('sub-script') 2134 if obj: 2135 try: 2136 subparams = params.get(step['name'], {}) 2137 subactions[step['name']] = _process_actions(obj, subparams) 2138 except ValueError as err: 2139 raise ValueError("Error in included script %s: %s" % (step['name'], err)) 2140 2141 _handles_values(values, script, params, subactions) 2142 actions = deepcopy(script['actions']) 2143 2144 ret = [] 2145 for action in actions: 2146 name = _find_action(action) 2147 if name is None: 2148 raise ValueError("Unknown action: %s" % (list(action.keys()))) 2149 action['name'] = name 2150 toadd = [] 2151 if name == 'include': 2152 if action['include'] in subactions: 2153 toadd.extend(subactions[action['include']]) 2154 else: 2155 Actions.parse(script, action) 2156 if 'when' in action: 2157 when = str(action['when']).strip() 2158 if when not in (False, None, '', 'false'): 2159 toadd.append(action) 2160 else: 2161 toadd.append(action) 2162 if ret: 2163 for add in toadd: 2164 if Actions.mergeable(add) and ret[-1]['name'] == add['name']: 2165 if not Actions.merge(ret[-1], add): 2166 ret.append(add) 2167 else: 2168 ret.append(add) 2169 else: 2170 ret.extend(toadd) 2171 return ret 2172 2173 2174def verify(script, params, external_check=True): 2175 """ 2176 Verify the given parameter values, reporting 2177 errors where such are detected. 2178 2179 Return a list of actions to perform. 2180 """ 2181 params = _check_parameters(script, params) 2182 actions = _process_actions(script, params) 2183 2184 if external_check and all(action['name'] == 'cib' for action in actions) and utils.is_program('crm'): 2185 errors = set([]) 2186 cmd = ["cib new"] 2187 for action in actions: 2188 cmd.append(_join_script_lines(action['value'])) 2189 cmd.extend(["verify", "commit", "\n"]) 2190 try: 2191 common_debug("Try executing %s" % ("\n".join(cmd))) 2192 rc, out = utils.filter_string(['crm', '-f', '-', 'configure'], "\n".join(cmd).encode('utf-8'), stderr_on='stdout', shell=False) 2193 errm = re.compile(r"^ERROR: \d+: (.*)$") 2194 outp = [] 2195 for l in (out or "").splitlines(): 2196 m = errm.match(l) 2197 if m: 2198 errors.add(m.group(1)) 2199 else: 2200 outp.append(l) 2201 if rc != 0 and len(errors) == 0: 2202 errors.add("Failed to verify (rc=%s): %s" % (rc, "\n".join(outp))) 2203 except OSError as e: 2204 errors.add(str(e)) 2205 if len(errors): 2206 raise ValueError("\n".join(errors)) 2207 2208 return actions 2209 2210 2211def _make_boolean(v): 2212 if isinstance(v, str): 2213 return utils.get_boolean(v) 2214 return v not in (0, False, None) 2215