1# Copyright (C) 2015 Kristoffer Gronlund <kgronlund@suse.com>
2# See COPYING for license information.
3
4import os
5import re
6import subprocess
7import getpass
8import time
9import shutil
10import socket
11import random
12from copy import deepcopy
13from glob import glob
14from lxml import etree
15
16try:
17    import json
18except ImportError:
19    import simplejson as json
20
21
22try:
23    import parallax
24    has_parallax = True
25except ImportError:
26    has_parallax = False
27
28
29from . import config
30from . import handles
31from . import options
32from . import userdir
33from . import utils
34from .msg import err_buf, common_debug
35
36
# Map of script name -> script file path. Stays None until
# build_script_cache() populates it on first use.
_script_cache = None
# Script format version handled by this module. _parse_yaml() passes
# documents declaring any other version through _upgrade_yaml().
_script_version = 2.2
# NOTE(review): not referenced in the visible part of this file;
# presumably toggles strict handle resolution -- confirm against users.
_strict_handles = False

# Fallback short descriptions, keyed by action name. Actions.parse()
# uses these when an action does not define its own 'shortdesc'.
_action_shortdescs = {
    'cib': 'Configure cluster resources',
    'install': 'Install packages',
    'service': 'Manage system services',
    'call': 'Run command on nodes',
    'copy': 'Install file on nodes',
    'crm': 'Run crm command',
    'collect': 'Collect data from nodes',
    'verify': 'Verify collected data',
    'apply': 'Apply changes to nodes',
    'apply_local': 'Apply changes to cluster'
}
53
54
class Text(object):
    """
    Lazily resolved text value.

    Idea: Replace all fields that may contain
    references to data with Text objects, that
    lazily resolve when asked to.
    Context needed is the script in which this
    Text resolves. What we do is that we install
    the parameter values in the script (under the
    '__values__' key), so we can get it from here.

    This is also responsible for the various kinds
    of output cleanup/formatting (desc, cib, etc),
    selected by the `kind` given at construction.

    Note: since __eq__ is defined without __hash__,
    instances are not hashable.
    """
    DESC = 1
    CIB = 2
    SHORTDESC = 3

    @staticmethod
    def shortdesc(script, text):
        "Create a Text that resolves as a short description."
        return Text(script, text, kind=Text.SHORTDESC)

    @staticmethod
    def desc(script, text):
        "Create a Text that resolves as a (wrapped) long description."
        return Text(script, text, kind=Text.DESC)

    @staticmethod
    def cib(script, text):
        "Create a Text that resolves as cleaned up CIB text."
        return Text(script, text, kind=Text.CIB)

    @staticmethod
    def isa(obj):
        "True if obj is a plain string or a Text object."
        return isinstance(obj, (str, Text))

    def __init__(self, script, text, kind=None):
        """
        script: script (dict) whose '__values__' are used to resolve handles
        text: raw value; another Text is unwrapped to its raw value
        kind: None, or one of DESC / CIB / SHORTDESC
        """
        self.script = script
        if isinstance(text, Text):
            self.text = text.text
        else:
            self.text = text
        self._kind = kind

    def _parse(self):
        """
        Resolve the raw value to a string.
        Booleans become "true"/"false", other non-string
        values are passed through str(), and strings have
        their handles resolved against the script values.
        """
        val = self.text
        # Test the type, not the value: `val in (True, False)` also
        # matches 0 and 1, which turned numeric values into booleans.
        if isinstance(val, bool):
            return "true" if val else "false"
        if not isinstance(val, str):
            return str(val)
        return handles.parse(val, self.script.get('__values__', {})).strip()

    def __repr__(self):
        return repr(self.text)

    def __str__(self):
        if self._kind == self.DESC:
            return format_desc(self._parse())
        elif self._kind == self.SHORTDESC:
            return self._parse()
        elif self._kind == self.CIB:
            return format_cib(self._parse())
        return self._parse()

    def __eq__(self, obj):
        # Compare by resolved text, so a Text equals
        # the plain string it resolves to.
        return str(self) == str(obj)
119
120
class WhenExpr(object):
    """
    Lazily evaluated 'when' expression of an action.

    script: script (dict) whose '__values__' form the evaluation environment
    prog: expression text, may contain handles
    """
    def __init__(self, script, prog):
        self.script = script
        self.prog = prog

    def __repr__(self):
        return repr(self.prog)

    def __str__(self):
        """
        Resolve handles in the expression, evaluate it and return
        the lowercased string form of the result.
        Raises ValueError if the expression is invalid.
        """
        lenv = self.script.get('__values__', {})
        inp = handles.parse(self.prog, lenv).strip()
        # Import outside of the try block: if the import fails inside it,
        # the except clause references an unbound InvalidExpression and
        # hides the real ImportError.
        from .minieval import minieval, InvalidExpression
        try:
            return str(minieval(inp, lenv)).lower()
        except InvalidExpression as err:
            raise ValueError(str(err)) from err
137
138
139def _strip(desc):
140    if desc is None:
141        return None
142    return desc.strip()
143
144
def format_desc(desc):
    """
    Re-wrap a description: paragraphs are separated by
    blank lines, blank paragraphs are dropped and each
    remaining paragraph is filled by textwrap.
    """
    from textwrap import fill
    paragraphs = (chunk for chunk in desc.split('\n\n') if chunk.strip())
    return '\n\n'.join(fill(chunk) for chunk in paragraphs)
148
149
def format_cib(text):
    """
    Clean up CIB text for display: collapse runs of spaces,
    turn indentation after a newline into a single tab and
    remove lines that consist of only that tab.
    """
    collapsed = re.sub(r'[ ]+', ' ', text)
    text = re.sub(r'\n[ \t\f\v]+', '\n\t', collapsed)
    blank = '\n\t\n'
    pos = text.find(blank)
    while pos >= 0:
        # cut the leading "\n\t", keeping the final newline
        text = text[:pos] + text[pos + 2:]
        pos = text.find(blank)
    return text
160
161
def space_cib(text):
    """
    After merging CIB commands, space separate lines out:
    runs of line breaks are collapsed to one newline, then a
    blank line is inserted before every line that does not
    start with a tab.
    """
    single_spaced = re.sub(r'[\n\r]+', r'\n', text)
    return re.sub(r'\n([^\t])', r'\n\n\1', single_spaced)
167
168
class Actions(object):
    """
    Each method in this class handles a particular action.

    Note: every attribute of this class whose name does not
    start with an underscore is collected into the module
    level action table, so helpers added here must be private.
    """
    @staticmethod
    def parse(script, action):
        """
        action: action data (dict)
        params: flat list of parameter values
        values: processed list of parameter values (for handles.parse)

        Converts {'cib': "primitive..."} into {"name": "cib", "value": "primitive..."}
        Each action has two values: "value" may be a non-textual object
        depending on the type of action. "text" is visual context to display
        to a user (so a cleaned up CIB, or the list of packages to install)

        The action dict is modified in place. It must already
        contain a 'name' key naming the action key to convert.
        """
        name = action['name']
        action['value'] = action[name]
        del action[name]
        action['text'] = ''
        value = action['value']
        if name == 'install':
            if Text.isa(value):
                action['value'] = str(value).split()
            action['text'] = ' '.join(action['value'])
        # service takes a list of objects with a single key;
        # mapping service: state
        # the text field will be converted to lines where
        # each line is <service> -> <state>
        elif name == 'service':
            if Text.isa(value):
                value = [dict([v.split(':', 1)]) for v in str(value).split()]
                action['value'] = value

            def arrow(v):
                return ' -> '.join(list(v.items())[0])
            action['text'] = '\n'.join([arrow(x) for x in value])
        elif name in ('cib', 'crm'):
            action['text'] = str(Text.cib(script, value))
            action['value'] = _remove_empty_lines(action['text'])
        elif name == 'call':
            action['value'] = Text(script, value)
        elif name == 'copy':
            action['value'] = Text(script, value)
            action['template'] = _make_boolean(action.get('template', False))
            action['to'] = Text(script, action.get('to', action['value']))
            action['text'] = "%s -> %s" % (action['value'], action['to'])

        if 'shortdesc' not in action:
            action['shortdesc'] = _action_shortdescs.get(name, '')
        else:
            action['shortdesc'] = Text.shortdesc(script, action['shortdesc'])
        if 'longdesc' not in action:
            action['longdesc'] = ''
        else:
            action['longdesc'] = Text.desc(script, action['longdesc'])

        hre = handles.headmatcher
        ident_re = re.compile(r'([a-z_-][a-z0-9_-]*)$', re.IGNORECASE)

        if 'when' in action:
            when = action['when']
            if ident_re.match(when):
                # a bare identifier is shorthand for the value of that parameter
                action['when'] = Text(script, '{{%s}}' % (when))
            elif when:
                action['when'] = WhenExpr(script, when)
            else:
                del action['when']
        # Resolve every textual field now. Only values are replaced,
        # never keys, so assigning while iterating is safe.
        for k, v in action.items():
            if isinstance(v, str) and hre.search(v):
                v = Text(script, v)
            if Text.isa(v):
                action[k] = str(v).strip()

    @staticmethod
    def mergeable(action):
        "True if neighbouring actions of this kind can be merged."
        return action['name'] in ('cib', 'crm', 'install', 'service')

    @staticmethod
    def merge(into, new):
        """
        Merge neighbour actions.
        Note: When this is called, all text values
        should already be "reduced", that is, any
        variable references already resolved.

        Returns False (leaving both untouched) if the actions
        target different nodes, else merges new into into
        and returns True.
        """
        if into.get('nodes') != new.get('nodes'):
            return False
        if into['name'] in ('cib', 'crm'):
            into['value'] = '\n'.join([str(into['value']), str(new['value'])])
            into['text'] = space_cib('\n'.join([str(into['text']), str(new['text'])]))
        elif into['name'] == 'service':
            into['value'].extend(new['value'])
            into['text'] = '\n'.join([str(into['text']), str(new['text'])])
        elif into['name'] == 'install':
            into['value'].extend(new['value'])
            into['text'] = ' '.join([str(into['text']), str(new['text'])])
        # a non-empty description on the later action wins
        if new['shortdesc']:
            newd = str(new['shortdesc'])
            if newd != str(into['shortdesc']):
                into['shortdesc'] = _strip(newd)
        if new['longdesc']:
            newd = str(new['longdesc'])
            if newd != str(into['longdesc']):
                into['longdesc'] = newd
        return True

    @staticmethod
    def needs_sudo(action):
        """
        True(ish) if running the action needs elevated rights:
        'call' actions do when flagged with 'sudo' or when not
        restricted to the 'local' node.
        """
        if action['name'] == 'call':
            return action.get('sudo') or action.get('nodes') != 'local'
        return action['name'] in ('apply', 'apply_local', 'install', 'service')

    def __init__(self, run, action):
        """
        run: object used to execute the action
        action: parsed action (dict), see parse()
        """
        self._run = run
        self._action = action
        self._value = action['value']
        # list values (install, service) are kept as lists
        if not isinstance(self._value, list):
            self._value = str(self._value)
        self._text = str(action['text'])
        self._nodes = str(action.get('nodes', ''))

    def collect(self):
        "input: shell command"
        self._run.run_command(self._nodes or 'all', self._value, True)
        self._run.record_json()

    def validate(self):
        "input: shell command"
        self._run.run_command(None, self._value, True)
        self._run.validate_json()

    def apply(self):
        "input: shell command"
        self._run.run_command(self._nodes or 'all', self._value, True)
        self._run.record_json()

    def apply_local(self):
        "input: shell command"
        self._run.run_command(None, self._value, True)
        self._run.record_json()

    def report(self):
        "input: shell command"
        self._run.run_command(None, self._value, False)
        self._run.report_result()

    def call(self):
        """
        input: shell command / script

        TODO: actually allow script here
        """
        self._run.call(self._nodes, self._value)

    def copy(self):
        """
        copy: <from>
        to: <path>
        template: true|false

        Raises ValueError if the source file does not exist.

        TODO: FIXME: Verify that it works...
        TODO: FIXME: Error handling
        """
        if not os.path.exists(self._value):
            raise ValueError("File not found: %s" % (self._value))
        if self._action['template']:
            # read via a context manager so the file handle is not leaked
            with open(self._value) as template_file:
                content = template_file.read()
            fn = self._run.str2tmp(str(Text.cib(self._run.script, content)))
            self._value = fn
        self._run.copy_file(self._nodes, self._value, str(self._action['to']))

    def _crm_do(self, act):
        "Write the action value to a temporary file and feed it to crm."
        fn = self._run.str2tmp(_join_script_lines(self._value))
        if config.core.debug:
            args = '-d --wait --no'
        else:
            args = '--wait --no'
        if self._action.get('force'):
            args = args + ' --force'
        self._run.call(None, 'crm %s %s %s' % (args, act, fn))

    def crm(self):
        """
        input: crm command sequence
        """
        return self._crm_do('-f')

    def cib(self):
        "input: cli configuration script"
        return self._crm_do('configure load update')

    def install(self):
        """
        input: list of packages
        or: map of <os>: <packages>
        """
        self._run.execute_shell(self._nodes or 'all', '''#!/usr/bin/env python3
import crm_script
import crm_init

crm_init.install_packages(%s)
crm_script.exit_ok(True)
        ''' % (self._value))

    def service(self):
        "input: list of single-key dicts mapping service to state"
        values = []
        for s in self._value:
            for v in s.items():
                values.append(v)
        # each (service, state) tuple is rendered as a call to crm_script.service
        services = "\n".join([('crm_script.service%s' % repr(v)) for v in values])
        self._run.execute_shell(self._nodes or 'all', '''#!/usr/bin/env python3
import crm_script
import crm_init

%s
crm_script.exit_ok(True)
''' % (services))

    def include(self):
        """
        Treated differently: at parse time,
        the include actions should disappear
        and be replaced with actions generated
        from the include. Either from an included
        script, or a cib generated from an agent
        include.
        """
396
397
# Table of action name -> handler, built from every public attribute of Actions.
_actions = {attr: getattr(Actions, attr) for attr in dir(Actions) if not attr.startswith('_')}
399
400
def _find_action(action):
    """
    Return the first known action name that is a key
    of the given action, or None if there is none.
    """
    return next((known for known in list(_actions.keys()) if known in action), None)
407
408
def _make_options(params):
    "Setup parallax options; params['timeout'] is used as timeout."
    # non-interactive ssh: never prompt for passwords or host keys
    ssh_settings = [
        'KbdInteractiveAuthentication=no',
        'PreferredAuthentications=gssapi-with-mic,gssapi-keyex,hostbased,publickey',
        'PasswordAuthentication=no',
        'StrictHostKeyChecking=no',
        'ControlPersist=no']
    opts = parallax.Options()
    opts.inline = True
    opts.timeout = int(params['timeout'])
    opts.recursive = True
    opts.ssh_options += ssh_settings
    if options.regression_tests:
        opts.ssh_extra += ['-vvv']
    return opts
424
425
def _parse_yaml(scriptname, scriptfile):
    """
    Load a script from a yaml file.

    scriptname: name to store in the resulting script
    scriptfile: path of the yaml file

    If the document is a list, its first element is used.
    Documents with a version other than the current one are
    passed through _upgrade_yaml. Top-level 'parameters' are
    turned into a single step.

    Returns the script (dict).
    Raises ValueError if the yaml module is missing, the file
    can't be parsed, or the document is not a mapping (for
    example, an empty file).
    """
    try:
        import yaml
    except ImportError as e:
        raise ValueError("Failed to load yaml module: %s" % (e))

    try:
        with open(scriptfile) as f:
            data = yaml.load(f, Loader=yaml.SafeLoader)
        if isinstance(data, list):
            data = data[0]
    except Exception as e:
        raise ValueError("Failed to parse script main: %s" % (e))

    # An empty file loads as None and a scalar document as a plain
    # value; fail clearly instead of with a TypeError further down.
    if not isinstance(data, dict):
        raise ValueError("Failed to parse script main: expected a mapping, got %s" %
                         (type(data).__name__))

    if data:
        ver = data.get('version')
        if ver is None or str(ver) != str(_script_version):
            data = _upgrade_yaml(data)

    if 'parameters' in data:
        data['steps'] = [{'parameters': data['parameters']}]
        del data['parameters']
    elif 'steps' not in data:
        data['steps'] = []
    data['name'] = scriptname
    data['dir'] = os.path.dirname(scriptfile)
    return data
452
453
454def _rename(obj, key, to):
455    if key in obj:
456        obj[to] = obj[key]
457        del obj[key]
458
459
def _upgrade_yaml(data):
    """
    Upgrade a parsed yaml document from
    an older version, in place.

    The old 'steps' become the 'actions', and the old
    'parameters' become the only step. Raises ValueError
    if the document claims a newer version than supported.
    """
    if 'version' in data and data['version'] > _script_version:
        raise ValueError("Unknown version (expected < %s, got %s)" % (_script_version, data['version']))

    data['version'] = _script_version
    data.setdefault('category', 'Legacy')
    for old, new in (('name', 'shortdesc'), ('description', 'longdesc')):
        _rename(data, old, new)

    data['actions'] = data.get('steps', [])
    parameters = data.pop('parameters', [])
    data['steps'] = [{'parameters': parameters}]

    for param in parameters:
        _rename(param, 'description', 'shortdesc')
        _rename(param, 'default', 'value')
        # parameters without a default value must be given
        param.setdefault('required', 'value' not in param)

    for action in data['actions']:
        _rename(action, 'name', 'shortdesc')

    return data
489
490
# Parsed hawk template XML roots, keyed by template file path
# (filled by _parse_hawk_template).
_hawk_template_cache = {}
492
493
def _parse_hawk_template(workflow, name, kind, step, actions):
    """
    Convert a hawk template into steps + a cib action

    workflow: path of the workflow file, templates are looked
              up in ../templates relative to its directory
    name: scope name used for handles in the generated cib
    kind: template type (file name without .xml)
    step: step (dict) to fill in with descriptions and parameters
    actions: list to append the generated cib action to

    Raises ValueError if the template file does not exist.
    """
    path = os.path.join(os.path.dirname(workflow), '../templates', kind + '.xml')
    if path not in _hawk_template_cache:
        if not os.path.isfile(path):
            raise ValueError("Template does not exist: %s" % (path))
        parsed = etree.parse(path).getroot()
        common_debug("Found matching template: %s" % (path))
        _hawk_template_cache[path] = parsed
    root = _hawk_template_cache[path]

    step['shortdesc'] = _strip(''.join(root.xpath('./shortdesc/text()')))
    step['longdesc'] = ''.join(root.xpath('./longdesc/text()'))

    crm_script = root.xpath('./crm_script')[0]
    actions.append({'cib': _hawk_to_handles(name, crm_script)})

    for node in root.xpath('./parameters/parameter'):
        content = next(node.iter('content'))
        default = content.get('default', content.get('value', None))
        entry = {
            'name': node.get('name'),
            'required': node.get('required', False),
            'type': content.get('type', 'string'),
        }
        if default:
            entry['value'] = default
        entry['shortdesc'] = _strip(''.join(node.xpath('./shortdesc/text()')))
        entry['longdesc'] = ''.join(node.xpath('./longdesc/text()'))
        step['parameters'].append(entry)
525
526
527def _mkhandle(pfx, scope, text):
528    if scope:
529        return '{{%s%s:%s}}' % (pfx, scope, text)
530    else:
531        return '{{%s%s}}' % (pfx, text)
532
533
534def _hawk_to_handles(context, tag):
535    """
536    input: a context name to prefix variable references with (may be empty)
537    and a crm_script tag
538    output: text with {{handles}}
539    """
540    s = ""
541    s += tag.text
542    for c in tag:
543        if c.tag == 'if':
544            cond = c.get('set')
545            if cond:
546                s += _mkhandle('#', context, cond)
547                s += _hawk_to_handles(context, c)
548                s += _mkhandle('/', context, cond)
549        elif c.tag == 'insert':
550            param = c.get('param')
551            src = c.get('from_template') or context
552            s += _mkhandle('', src, param)
553        s += c.tail
554    return s
555
556
def _parse_hawk_workflow(scriptname, scriptfile):
    """
    Reads a hawk workflow into a script.

    scriptname: name to store in the resulting script
    scriptfile: path of the workflow XML file

    Returns the script (dict). Raises ValueError if the
    root element of the file is not a workflow.

    TODO: Parse hawk workflows that invoke legacy cluster scripts?
    """
    root = etree.parse(scriptfile).getroot()
    if root.tag != "workflow":
        raise ValueError("Not a hawk workflow: %s" % (scriptfile))

    def joined(node, query):
        return ''.join(node.xpath(query))

    # the parameters together form a step with an optional shortdesc
    # then each template becomes an additional step with an optional shortdesc
    base_step = {
        'shortdesc': _strip(joined(root, './parameters/stepdesc/text()')),
        'parameters': []
    }
    actions = []
    data = {
        'version': 2.2,
        'name': scriptname,
        'shortdesc': _strip(joined(root, './shortdesc/text()')),
        'longdesc': joined(root, './longdesc/text()'),
        'category': joined(root, './@category') or 'Wizard',
        'dir': None,
        'steps': [base_step],
        'actions': actions,
    }

    for node in root.xpath('./parameters/parameter'):
        content = next(node.iter('content'))
        default = content.get('default', content.get('value', None))
        entry = {
            'name': node.get('name'),
            'required': node.get('required', False),
            'unique': node.get('unique', False),
            'type': content.get('type', 'string'),
        }
        if default is not None:
            entry['value'] = default
        entry['shortdesc'] = _strip(joined(node, './shortdesc/text()'))
        entry['longdesc'] = joined(node, './longdesc/text()')
        base_step['parameters'].append(entry)

    for tmpl in root.xpath('./templates/template'):
        tmpl_name = tmpl.get('name')
        tmpl_step = {
            'shortdesc': _strip(joined(tmpl, './stepdesc/text()')),
            'name': tmpl_name,
            # Optional steps in the legacy wizards was broken (!?)
            'required': True,  # tmpl.get('required'),
            'parameters': []
        }
        data['steps'].append(tmpl_step)

        _parse_hawk_template(scriptfile, tmpl_name, tmpl.get('type', tmpl_name),
                             tmpl_step, actions)
        # an override fixes the value of the first parameter with that name
        for override in tmpl.xpath('./override'):
            wanted = override.get("name")
            target = next((p for p in tmpl_step['parameters'] if p['name'] == wanted), None)
            if target is not None:
                target['value'] = override.get("value")
                target['required'] = False

    # the cib of the workflow itself comes after those of its templates
    actions.append({'cib': _hawk_to_handles('', root.xpath('./crm_script')[0])})

    if config.core.debug:
        import pprint
        print("Parsed hawk workflow:")
        pprint.pprint(data)
    return data
627
628
def build_script_cache():
    """
    Build the map of script name -> script file, unless
    it has been built already.

    Looks in each script directory for <name>/main.yml,
    <name>.yml and workflows/<name>.xml, in that order.
    The first file found for a name wins.
    """
    global _script_cache
    if _script_cache is not None:
        return
    found = {}
    for d in _script_dirs():
        if not d:
            continue
        # glob() results already include the directory: store them
        # as they are, joining them onto d again would double the
        # prefix for relative directories.
        for s in glob(os.path.join(d, '*/main.yml')):
            found.setdefault(os.path.basename(os.path.dirname(s)), s)
        for pattern in ('*.yml', 'workflows/*.xml'):
            for s in glob(os.path.join(d, pattern)):
                found.setdefault(os.path.splitext(os.path.basename(s))[0], s)
    # publish only when complete, so that a failure above
    # doesn't leave a partial cache behind for later calls
    _script_cache = found
648
649
def list_scripts():
    '''
    List the available cluster installation scripts.
    Returns the sorted list of script names.
    '''
    build_script_cache()
    names = list(_script_cache.keys())
    names.sort()
    return names
657
658
659def _meta_text(meta, tag):
660    for c in meta.iterchildren(tag):
661        return c.text
662    return ''
663
664
665def _listfind(needle, haystack, keyfn):
666    for x in haystack:
667        if keyfn(x) == needle:
668            return x
669    return None
670
671
672def _listfindpend(needle, haystack, keyfn, orfn):
673    for x in haystack:
674        if keyfn(x) == needle:
675            return x
676    x = orfn()
677    haystack.append(x)
678    return x
679
680
681def _make_cib_for_agent(name, agent, data, ops):
682    aid = "{{%s:id}}" % (name) if name else "{{id}}"
683    template = ['primitive %s %s' % (aid, agent)]
684    params = []
685    ops = [op.strip() for op in ops.split('\n') if op.strip()]
686    for param in data['parameters']:
687        paramname = param['name']
688        if paramname == 'id':
689            # FIXME: What if the resource actually has a parameter named id?
690            continue
691        path = ':'.join((name, paramname)) if name else paramname
692        params.append('{{#%s}}%s="{{%s}}"{{/%s}}' % (path, paramname, path, path))
693    ret = '\n\t'.join(template + params + ops)
694    return ret
695
696
697def _merge_objects(o1, o2):
698    for key, value in o2.items():
699        o1[key] = value
700
701
702def _lookup_step(name, steps, stepname):
703    for step in steps:
704        if step.get('name', '') == stepname:
705            return step
706    if not stepname and len(steps) == 1:
707        return steps[0]
708    if not stepname:
709        raise ValueError("Parameter '%s' not found" % (name))
710    raise ValueError("Referenced step '%s' not found in '%s'" % (stepname, name))
711
712
def _process_agent_include(script, include):
    """
    Process the include of a resource agent: adds (or updates)
    a parameter step for the agent in script['steps'], and
    replaces the matching 'include' actions in script['actions']
    with the cib generated for the agent.

    script: the including script (dict), modified in place
    include: the include (dict); 'agent' is required, and
             'name' is filled in if missing

    Raises ValueError if the agent has no meta-data.
    """
    from . import ra
    agent = include['agent']
    info = ra.get_ra(agent)
    meta = info.meta()
    if meta is None:
        raise ValueError("No meta-data for agent: %s" % (agent))
    # name of the step: from the include, else from the
    # meta-data, else derived from the agent itself
    name = include.get('name', meta.get('name'))
    if not name:
        cls, provider, name = ra.disambiguate_ra_type(agent)
    if 'name' not in include:
        include['name'] = name
    # reuse the step with this name if the script defines one
    step = _listfindpend(name, script['steps'], lambda x: x.get('name'), lambda: {
        'name': name,
        'longdesc': '',
        'shortdesc': '',
        'parameters': [],
    })
    # descriptions given in the include win over the meta-data
    step['longdesc'] = include.get('longdesc') or _meta_text(meta, 'longdesc')
    step['shortdesc'] = _strip(include.get('shortdesc') or _meta_text(meta, 'shortdesc'))
    step['required'] = include.get('required', True)
    # NOTE(review): appended unconditionally, also when the step
    # already existed -- confirm that no 'id' can be present already
    step['parameters'].append({
        'name': 'id',
        'shortdesc': 'Identifier for the cluster resource',
        'longdesc': '',
        'required': True,
        'unique': True,
        'type': 'resource',
    })

    def newparamobj(param):
        # find-or-append the parameter of the step
        # with the same name as param
        pname = param.get('name')
        return _listfindpend(pname, step['parameters'], lambda x: x.get('name'), lambda: {'name': pname})

    # first, the parameters as declared by the agent meta-data
    for param in meta.xpath('./parameters/parameter'):
        pobj = newparamobj(param)
        pobj['required'] = _make_boolean(param.get('required', False))
        pobj['unique'] = _make_boolean(param.get('unique', False))
        pobj['longdesc'] = _meta_text(param, 'longdesc')
        pobj['shortdesc'] = _strip(_meta_text(param, 'shortdesc'))
        # set 'advanced' flag on all non-required agent parameters by default
        # a UI should hide these parameters unless "show advanced" is set
        pobj['advanced'] = not pobj['required']
        ctype = param.xpath('./content/@type')
        cexample = param.xpath('./content/@default')
        if ctype:
            pobj['type'] = ctype[0]
        if cexample:
            pobj['example'] = cexample[0]

    # then, the overrides from the include itself
    for param in include.get('parameters', []):
        pobj = newparamobj(param)
        # Make any overridden parameters non-advanced
        # unless explicitly set to advanced
        pobj['advanced'] = False
        for key, value in param.items():
            if key in ('shortdesc', 'longdesc'):
                pobj[key] = value
            elif key == 'value':
                pobj[key] = Text(script, value)
            else:
                pobj[key] = value
            # a parameter with a value is never required; checked after
            # each key, so a later 'required' key in param can reset it
            if 'value' in pobj:
                pobj['required'] = False

    # If the script doesn't have any base parameters
    # and the name of this step is the same as the
    # script name itself, then make this the base step
    hoist = False
    hoist_from = None
    if step['name'] == script['name']:
        zerostep = _listfind('', script['steps'], lambda x: x.get('name', ''))
        if not zerostep:
            hoist = True
        elif zerostep.get('parameters'):
            # only hoist if no parameter name of this step
            # clashes with a parameter of the base step
            zp = zerostep['parameters']
            for pname in [p['name'] for p in step['parameters']]:
                if _listfind(pname, zp, lambda x: x['name']):
                    break
            else:
                hoist, hoist_from = True, zerostep

    # use step['name'] here in case we did the zerostep hoist
    step['value'] = Text.cib(script, _make_cib_for_agent('' if hoist else step['name'],
                                                         agent, step, include.get('ops', '')))

    if hoist:
        step['name'] = ''
        if hoist_from:
            # this step takes over the parameters of the
            # base step, which is then dropped from the script
            step['parameters'] = hoist_from['parameters'] + step['parameters']
            script['steps'] = [s for s in script['steps'] if s != hoist_from]

    if not step['name']:
        del step['name']

    # this works despite possible hoist above,
    # since name is still the actual name
    for action in script['actions']:
        if 'include' in action and action['include'] == name:
            del action['include']
            action['cib'] = step['value']
814
815
def _process_script_include(script, include):
    """
    Process the include of another script: loads the script
    and appends a step to script['steps'] holding a copy of
    its steps, with the parameter overrides from the include
    applied.

    script: the including script (dict), modified in place
    include: the include (dict); 'script' is required, and
             'name' is filled in if missing

    Raises ValueError if an override refers to a step or
    parameter that the included script doesn't have.
    """
    script_name = include['script']
    include.setdefault('name', script_name)
    subscript = load_script(script_name)
    name = include['name']

    substeps = deepcopy(subscript['steps'])
    scriptstep = {
        'name': name,
        'shortdesc': subscript['shortdesc'],
        'longdesc': subscript['longdesc'],
        'required': _make_boolean(include.get('required', True)),
        'steps': substeps,
        'sub-script': subscript,
    }

    def apply_override(step, override):
        # copy each key of the override onto the first
        # parameter of the step with the same name
        target = next((p for p in step.get('parameters', [])
                       if p['name'] == override['name']), None)
        if target is None:
            raise ValueError("Referenced parameter '%s' not found in '%s'" % (override['name'], name))
        for key, value in override.items():
            if key == 'value' and Text.isa(value):
                # textual values resolve in the including script
                target[key] = Text(script, value)
            else:
                target[key] = value
        # a parameter with a value is never required
        if 'value' in target:
            target['required'] = False

    for incparam in include.get('parameters', []):
        if 'step' in incparam and 'name' not in incparam:
            # overrides for the parameters of a named step
            step = _lookup_step(name, substeps, incparam['step'])
            for override in incparam['parameters']:
                apply_override(step, override)
        else:
            apply_override(_lookup_step(name, substeps, ''), incparam)

    script['steps'].append(scriptstep)
861
862
863def _process_include(script, include):
864    """
865    includes add parameter steps and actions
866    an agent include works like a hawk template:
867    it adds a parameter step
868    a script include however adds any number of
869    parameter steps and actions
870
871    OK. here's what to do: Don't rescope the steps
872    and actions. Instead, keep the actions attached
873    to script step 0, as above. And for each step, add
874    a scope which states its scope. Then, when evaluating
875    handles, build custom environments for those scopes to
876    pass into handles.parse.
877
878    This is just for scripts, no need to do this for agents.
879    Of course, how about scripts that include other scripts?
880    _scope has to be a list which gets expanded...
881    """
882    if 'agent' in include:
883        return _process_agent_include(script, include)
884
885    elif 'script' in include:
886        return _process_script_include(script, include)
887    else:
888        raise ValueError("Unknown include type: %s" % (', '.join(list(include.keys()))))
889
890
def _postprocess_script_step(script, step):
    """
    Normalize a step (and, recursively, its sub-steps) in
    place: fills in defaults for the descriptions and flags
    of the step and of each of its parameters.

    Raises ValueError if a parameter has no name.
    """
    if 'name' in step and not step['name']:
        del step['name']
    step['required'] = _make_boolean(step.get('required', True))
    step['shortdesc'] = _strip(step.get('shortdesc', ''))
    step['longdesc'] = step.get('longdesc', '')
    for param in step.get('parameters', []):
        if 'name' not in param:
            raise ValueError("Parameter has no name: %s" % (list(param.keys())))
        param['shortdesc'] = _strip(param.get('shortdesc', ''))
        param['longdesc'] = param.get('longdesc', '')
        # 'default' is accepted as an alias for 'value'
        if 'default' in param and 'value' not in param:
            param['value'] = param.pop('default')
        if 'value' in param:
            value = param['value']
            if value is None:
                del param['value']
            elif isinstance(value, str):
                param['value'] = Text(script, value)
        # flags default to False when not given
        for flag in ('required', 'advanced', 'unique'):
            param[flag] = _make_boolean(param[flag]) if flag in param else False
        if param.get('type', '') == '':
            param['type'] = 'resource' if param['name'] == 'id' else 'string'
    for substep in step.get('steps', []):
        _postprocess_script_step(script, substep)
929
930
931def _postprocess_script_steps(script):
932    def empty(step):
933        if 'parameters' in step and len(step['parameters']) > 0:
934            return False
935        if 'steps' in step and len(step['steps']) > 0:
936            return False
937        return True
938
939    script['steps'] = [step for step in script['steps'] if not empty(step)]
940
941    for step in script['steps']:
942        _postprocess_script_step(script, step)
943
944
def _postprocess_script(script):
    """
    Turn the parsed script into its executable form: check
    the version, fill in defaults, post-process the steps,
    process the includes and settle the descriptions.

    Returns the script, which is modified in place.
    Raises ValueError if the version is not supported, or if
    an action refers to an include the script doesn't have.
    """
    version = script.get('version')
    if version is None or str(version) != str(_script_version):
        raise ValueError("Unsupported script version (expected %s, got %s)" % (_script_version, repr(version)))

    script.setdefault('category', 'Custom')

    if 'actions' not in script:
        script['actions'] = []

        # if we include subscripts but have no defined actions, assume that's a
        # mistake and generate include actions for all includes
        script['actions'].extend([{"include": inc['name']} for inc in script.get('include', [])])

    _postprocess_script_steps(script)

    # Includes may add steps, or modify parameters,
    # but assume that any included data is already
    # postprocessed. To run this before the
    # step processing would risk replacing Text() objects
    # with references to other scripts with references
    # to this script.
    for include in script.get('include', []):
        _process_include(script, include)

    for action in script['actions']:
        if 'include' not in action:
            continue
        known = [inc['name'] for inc in script.get('include', [])]
        if action['include'] not in known:
            raise ValueError("Script references '%s', but only includes: %s" %
                             (action['include'], ', '.join(known)))

    script.pop('include', None)

    # a script without its own description takes over
    # the description of its first step
    for field in ('shortdesc', 'longdesc'):
        text = script.get(field)
        if text is None:
            text = ''
        if not text and script['steps']:
            first_step = script['steps'][0]
            if first_step[field]:
                text = first_step[field]
                first_step[field] = ''
        script[field] = text

    return script
1000
1001
1002def _join_script_lines(txt):
1003    s = ""
1004    current_line = ""
1005    for line in [line for line in txt.split('\n')]:
1006        if not line.strip():
1007            pass
1008        elif re.match(r'^\s+\S', line):
1009            current_line += line
1010        else:
1011            if current_line.strip():
1012                s += current_line + "\n"
1013            current_line = line
1014    if current_line:
1015        s += current_line + "\n"
1016    return s
1017
1018
def load_script_file(script, filename):
    """
    Parse, post-process and cache the script found in filename.

    script: name of the script, passed on to the parser and used
            as cache key unless the parsed script has its own name
    filename: .yml files are parsed as YAML scripts, .xml files
              as hawk workflows

    Returns the post-processed script.
    Raises ValueError if the file has an unknown extension or if
    the parser returns None.
    """
    # stays None for unknown extensions, so those are reported
    # as a parse failure below instead of an unbound variable
    parsed = None
    if filename.endswith('.yml'):
        parsed = _parse_yaml(script, filename)
    elif filename.endswith('.xml'):
        parsed = _parse_hawk_workflow(script, filename)
    if parsed is None:
        raise ValueError("Failed to parse script: %s (%s)" % (script, filename))
    obj = _postprocess_script(parsed)
    if 'name' in obj:
        script = obj['name']
    # only fill in missing entries and entries which are still a
    # string (not yet loaded); loaded scripts are never replaced
    if script not in _script_cache or isinstance(_script_cache[script], str):
        _script_cache[script] = obj
    return obj
1032
1033
def load_script_string(script, yml):
    """
    Load a script from the YAML text in yml and cache it.

    If the document is a list, its first entry is used. Top-level
    parameters are wrapped in a single step. The script is named
    script and has no directory.

    Returns the post-processed script.
    """
    build_script_cache()
    import io
    import yaml
    stream = io.StringIO(yml)
    data = yaml.load(stream, Loader=yaml.SafeLoader)
    if isinstance(data, list):
        data = data[0]
    if 'parameters' in data:
        # top-level parameters replace any steps given
        data['steps'] = [{'parameters': data['parameters']}]
        del data['parameters']
    elif 'steps' not in data:
        data['steps'] = []
    data.update({'name': script, 'dir': None})

    obj = _postprocess_script(data)
    cache_key = obj['name'] if 'name' in obj else script
    _script_cache[cache_key] = obj
    return obj
1054
1055
def load_script(script):
    """
    Return the script with the given name, loading it from
    its file if the cache only holds the filename so far.

    Raises ValueError if the script is unknown or fails to load.
    """
    build_script_cache()
    if script not in _script_cache:
        common_debug("cache: %s" % (list(_script_cache.keys())))
        raise ValueError("Script not found: %s" % (script))
    cached = _script_cache[script]
    if not isinstance(cached, str):
        # already loaded
        return cached
    try:
        return load_script_file(script, cached)
    except KeyError as err:
        raise ValueError("Error when loading script %s: Expected key %s not found" % (script, err))
    except Exception as err:
        raise ValueError("Error when loading script %s: %s" % (script, err))
1070
1071
def _script_dirs():
    """
    List of directories that may contain cluster scripts:
    the existing directories from the scriptdir option
    (separated by ';'), followed by the user, shared and
    hawk wizard locations.
    """
    configured = [entry for entry in options.scriptdir.split(';')
                  if entry and os.path.isdir(entry)]
    standard = [os.path.join(userdir.CONFIG_HOME, 'scripts'),
                os.path.join(config.path.sharedir, 'scripts'),
                config.path.hawk_wizards]
    return configured + standard
1078
1079
def _check_control_persist():
    '''
    Checks if ControlPersist is available. If so,
    we'll use it to make things faster.

    Runs "ssh -o ControlPersist" and returns True unless
    ssh complains about a bad configuration option.
    '''
    cmd = 'ssh -o ControlPersist'.split()
    if options.regression_tests:
        print((".EXT", cmd))
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # the pipes are binary, so communicate() returns bytes:
    # the needle has to be bytes as well
    (_, err) = proc.communicate()
    return b"Bad configuration option" not in err
1093
1094
def _parallax_call(printer, hosts, cmd, opts):
    """
    parallax.call with debug logging.

    Logs the hosts and the command through printer.debug,
    then returns the result of parallax.call(hosts, cmd, opts).
    """
    printer.debug("parallax.call(%s, %s)" % (repr(hosts), cmd))
    return parallax.call(hosts, cmd, opts)
1099
1100
def _resolve_script(name):
    """
    Find the file for the script called name among the files
    returned by list_scripts(): either a main.yml in a directory
    called name, or a .yml/.xml file with name as its base name.

    Returns the path of the first match, or None.
    """
    for path in list_scripts():
        if path.endswith('main.yml') and os.path.dirname(path).endswith('/' + name):
            return path
        if path.endswith(('.yml', '.xml')):
            stem = os.path.splitext(os.path.basename(path))[0]
            if stem == name:
                return path
    return None
1110
1111
def _parallax_copy(printer, hosts, src, dst, opts):
    """
    parallax.copy with debug logging.

    Logs the hosts, source and destination through printer.debug,
    then returns the result of parallax.copy(hosts, src, dst, opts).
    """
    printer.debug("parallax.copy(%s, %s, %s)" % (repr(hosts), src, dst))
    return parallax.copy(hosts, src, dst, opts)
1116
1117
1118def _tempname(prefix):
1119    return '%s-%s%s' % (prefix,
1120                        hex(int(time.time()))[2:],
1121                        hex(random.randint(0, 2**48))[2:])
1122
1123
def _generate_workdir_name():
    '''
    Generate the name of a temporary folder to use
    while running the script. The folder is not created.

    Raises ValueError if a directory with the generated
    name already exists.
    '''
    # TODO: make use of /tmp configurable
    leaf = _tempname('crm-tmp')
    candidate = os.path.join(utils.get_tempdir(), leaf)
    if not os.path.isdir(candidate):
        return candidate
    raise ValueError("Invalid temporary workdir %s" % (candidate))
1135
1136
1137def _print_debug(printer, local_node, hosts, workdir, opts):
1138    "Print debug output (if any)"
1139    dbglog = os.path.join(workdir, 'crm_script.debug')
1140    if hosts:
1141        for host, result in _parallax_call(printer, hosts,
1142                                           "if [ -f '%s' ]; then cat '%s'; fi" % (dbglog, dbglog),
1143                                           opts).items():
1144            if isinstance(result, parallax.Error):
1145                printer.error(host, result)
1146            else:
1147                printer.output(host, *result)
1148    if os.path.isfile(dbglog):
1149        f = open(dbglog).read()
1150        printer.output(local_node, 0, f, '')
1151
1152
1153def _cleanup_local(workdir):
1154    "clean up the local tmp dir"
1155    if workdir and os.path.isdir(workdir):
1156        cleanscript = os.path.join(workdir, 'crm_clean.py')
1157        if os.path.isfile(cleanscript):
1158            if subprocess.call([cleanscript, workdir], shell=False) != 0:
1159                shutil.rmtree(workdir)
1160        else:
1161            shutil.rmtree(workdir)
1162
1163
def _run_cleanup(printer, has_remote_actions, local_node, hosts, workdir, opts):
    """
    Clean up after the cluster script: run crm_clean.py from
    the workdir on the remote hosts (if remote actions were
    involved), then clean up the local workdir.
    """
    if has_remote_actions and hosts and workdir:
        cleanscript = os.path.join(workdir, 'crm_clean.py')
        command = "%s %s" % (cleanscript, workdir)
        results = _parallax_call(printer, hosts, command, opts)
        for host, result in results.items():
            if not isinstance(result, parallax.Error):
                printer.output(host, *result)
            else:
                printer.error(host, "Clean: %s" % (result))
    _cleanup_local(workdir)
1177
1178
def _extract_localnode(hosts):
    """
    Remove the local node from the hosts list,
    so we can treat it separately.

    hosts: list of (host, port, user) entries

    Returns the entry for the local node (None if
    it is not in the list) and the remaining hosts.
    """
    this_node = utils.this_node()
    local_node = None
    remote_hosts = []
    for host, port, user in hosts:
        entry = (host, port, user)
        if host == this_node:
            local_node = entry
        else:
            remote_hosts.append(entry)
    remote_names = ', '.join(h[0] for h in remote_hosts)
    err_buf.debug("Local node: %s, Remote hosts: %s" % (local_node, remote_names))
    return local_node, remote_hosts
1196
1197
1198# TODO: remove common params?
1199# Pass them in a separate list of options?
1200# Right now these names are basically reserved..
def common_params():
    """
    Parameters common to all cluster scripts, as a list
    of (name, default value, description) tuples.
    """
    default_user = config.core.user or None
    sudo_help = 'If set, crm will prompt for a sudo password and use sudo when appropriate'
    return [
        ('nodes', None, 'List of nodes to execute the script for'),
        ('dry_run', 'no', 'If set, simulate execution only'),
        ('action', None, 'If set, only execute a single action (index, as returned by verify)'),
        ('statefile', None, 'When single-stepping, the state is saved in the given file'),
        ('user', default_user, 'Run script as the given user'),
        ('sudo', 'no', sudo_help),
        ('port', None, 'Port to connect on'),
        ('timeout', '600', 'Execution timeout in seconds'),
    ]
1212
1213
def _common_param_default(name):
    """
    Default value of the common parameter called name,
    or None if there is no such common parameter.
    """
    defaults = (default for param, default, _ in common_params() if param == name)
    return next(defaults, None)
1219
1220
1221def _filter_dict(d, name, fn, *args):
1222    'filter the given element in the dict through the function fn'
1223    d[name] = fn(d[name], *args)
1224
1225
1226def _filter_nodes(nodes, user, port):
1227    'filter function for the nodes element'
1228    if nodes:
1229        nodes = nodes.replace(',', ' ').split()
1230    else:
1231        nodes = utils.list_cluster_nodes()
1232    if not nodes:
1233        raise ValueError("No hosts")
1234    nodes = [(node, port or None, user or None) for node in nodes]
1235    return nodes
1236
1237
1238def _scoped_param(context, name):
1239    if context:
1240        return ':'.join(context) + ':' + name
1241    return name
1242
1243
1244def _find_by_name(params, name):
1245    try:
1246        return next(x for x in params if x.get('name') == name)
1247    except StopIteration:
1248        return None
1249
1250
# Matches a valid resource identifier: the first character is
# a letter, a digit or one of _ # $ -, and the rest contains
# neither whitespace nor '='. Case is ignored.
_IDENT_RE = re.compile(r'^([a-z0-9_#$-][^\s=]*)$', re.IGNORECASE)
1252
1253
def is_valid_ipv4_address(address):
    """
    True if address is a valid IPv4 address.

    Where socket.inet_pton is not available, inet_aton is
    used together with a check that there are four parts.
    """
    try:
        socket.inet_pton(socket.AF_INET, address)
        return True
    except AttributeError:
        # no inet_pton: use the fallback below
        pass
    except socket.error:  # not a valid address
        return False

    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return address.count('.') == 3
1267
1268
def is_valid_ipv6_address(address):
    """
    True if address is a valid IPv6 address
    according to socket.inet_pton.
    """
    try:
        socket.inet_pton(socket.AF_INET6, address)
    except socket.error:  # not a valid address
        return False
    else:
        return True
1275
1276# Types:
1277# OCF types
1278#
1279# string
1280# integer
1281# boolean
1282#
1283# Propose to add
1284# resource ==> a valid resource identifier
1285# ip_address ==> a valid ipv4 or ipv6 address
1286# ip_network ==> a valid ipv4 or ipv6 network (or address without /XX)
1287# port ==> integer between 0 and 65535
1288# email ==> a valid email address
1289
1290# node ==> name of a node in the cluster
1291# select <value>, <value>, <value>, ... ==> any of the values in the list.
1292# range <n> <m> ==> integer in range
1293# rx <rx> ==> anything matching the regular expression.
1294
1295
1296def _valid_integer(value):
1297    try:
1298        return True, int(value, base=0)
1299    except ValueError:
1300        return False, value
1301
1302
def _valid_ip(value):
    """
    True if value is a valid IPv4 or IPv6 address.
    """
    if is_valid_ipv4_address(value):
        return True
    return is_valid_ipv6_address(value)
1305
1306
1307def _verify_type(param, value, errors):
1308    if value is None:
1309        value = ''
1310    vtype = param.get('type')
1311    if not vtype:
1312        return value
1313    elif vtype == 'integer':
1314        ok, _ = _valid_integer(value)
1315        if not ok:
1316            errors.append("%s=%s is not an integer" % (param.get('name'), value))
1317    elif vtype == 'string':
1318        return value
1319    elif vtype == 'boolean':
1320        return "true" if _make_boolean(value) else "false"
1321    elif vtype == 'resource':
1322        try:
1323            if not _IDENT_RE.match(value):
1324                errors.append("%s=%s invalid resource identifier" % (param.get('name'), value))
1325        except TypeError as e:
1326            errors.append("%s=%s %s" % (param.get('name'), value, str(e)))
1327    elif vtype == 'enum':
1328        if 'values' not in param:
1329            errors.append("%s=%s enum without list of values" % (param.get('name'), value))
1330        else:
1331            opts = param['values']
1332            if isinstance(opts, str):
1333                opts = opts.replace(',', ' ').split(' ')
1334            for v in opts:
1335                if value.lower() == v.lower():
1336                    return v
1337            errors.append("%s=%s does not match '%s'" % (param.get('name'), value, "|".join(opts)))
1338    elif vtype == 'ip_address':
1339        if not _valid_ip(value):
1340            errors.append("%s=%s is not an IP address" % (param.get('name'), value))
1341    elif vtype == 'ip_network':
1342        sp = value.rsplit('/', 1)
1343        if len(sp) == 1 and not (is_valid_ipv4_address(value) or is_valid_ipv6_address(value)):
1344            errors.append("%s=%s is not a valid IP network" % (param.get('name'), value))
1345        elif len(sp) == 2 and (not _valid_ip(sp[0]) or not _valid_integer(sp[1])):
1346            errors.append("%s=%s is not a valid IP network" % (param.get('name'), value))
1347        else:
1348            errors.append("%s=%s is not a valid IP network" % (param.get('name'), value))
1349    elif vtype == 'port':
1350        ok, ival = _valid_integer(value)
1351        if not ok:
1352            errors.append("%s=%s is not a valid port" % (param.get('name'), value))
1353        if ival < 0 or ival > 65535:
1354            errors.append("%s=%s is out of port range" % (param.get('name'), value))
1355    elif vtype == 'email':
1356        if not re.match(r'[^@]+@[^@]+', value):
1357            errors.append("%s=%s is not a valid email address" % (param.get('name'), value))
1358    else:
1359        errors.append("%s=%s is unknown type %s" % (param.get('name'), value, vtype))
1360    return value
1361
1362
# Sentinel returned by the _resolve_* helpers when a name
# matches neither a parameter nor a step. A unique object is
# used since any other value could be a valid result.
_NO_RESOLVE = object()
1364
1365
def _resolve_direct(step, pname, pvalue, path, errors):
    """
    Resolve pname among the parameters and substeps of step.

    A parameter is type checked and its value returned. For a
    substep, pvalue holds the parameters of that step, which
    are resolved recursively. Parameters are tried first.

    Returns _NO_RESOLVE if pname is neither.
    """
    param = _find_by_name(step.get('parameters', []), pname)
    if param is not None:
        # resolved to a parameter... now verify the value type?
        return _verify_type(param, pvalue, errors)

    substep = _find_by_name(step.get('steps', []), pname)
    if substep is None:
        return _NO_RESOLVE
    # resolved to a step... recurse
    return _resolve_params(substep, pvalue, path + [pname], errors)
1378
1379
def _resolve_unnamed_step(step, pname, pvalue, path, errors):
    """
    Resolve pname in the substep of step whose name is
    the empty string, if there is one.

    Returns _NO_RESOLVE if there is no such substep,
    or if pname doesn't resolve in it.
    """
    unnamed = _find_by_name(step.get('steps', []), '')
    if unnamed is None:
        return _NO_RESOLVE
    return _resolve_direct(unnamed, pname, pvalue, path, errors)
1386
1387
def _resolve_single_step(step, pname, pvalue, path, errors):
    """
    Resolve pname in the first substep of step,
    with the name of that substep added to the path.

    Returns _NO_RESOLVE if step has no substeps,
    or if pname doesn't resolve in the first one.
    """
    substeps = step.get('steps', [])
    if not len(substeps) >= 1:
        return _NO_RESOLVE
    head = substeps[0]
    return _resolve_direct(head, pname, pvalue, path + [head.get('name')], errors)
1394
1395
def _resolve_params(step, params, path, errors):
    """
    Resolve each of the given parameters in step. A name is
    looked up in the step itself, then in its unnamed substep
    and finally in its first substep.

    Any parameter that doesn't resolve is an error.

    Returns a dict of the resolved values. A value found in
    the first substep is stored both under the name of that
    substep and directly under the name of the parameter.
    """
    ret = {}

    for pname, pvalue in params.items():
        resolved = _NO_RESOLVE
        for resolver in (_resolve_direct, _resolve_unnamed_step):
            resolved = resolver(step, pname, pvalue, path, errors)
            if resolved is not _NO_RESOLVE:
                break
        if resolved is not _NO_RESOLVE:
            ret[pname] = resolved
            continue

        resolved = _resolve_single_step(step, pname, pvalue, path, errors)
        if resolved is _NO_RESOLVE:
            errors.append("Unknown parameter %s" % (':'.join(path + [pname])))
            continue

        stepname = step['steps'][0].get('name', '')
        ret.setdefault(stepname, {})[pname] = resolved
        ret[pname] = resolved

    return ret
1425
1426
def _check_parameters(script, params):
    '''
    Resolve and complete the parameters given for a script.

    1. Fill in values where none are supplied and there's a value
    in the step data
    2. Check missing values
    3. For each input parameter: look it up and adjust the path

    The common parameters are split off before resolving, and
    added back (with their defaults) to the result.

    Returns the resolved parameters.
    Raises ValueError if parameters fail to resolve, or if
    required parameters are missing.
    '''
    errors = []
    # params = deepcopy(params)
    # recursively resolve parameters: report
    # error if a parameter can't be resolved
    # TODO: move "common params" out of the params dict completely
    # pass as flags to command line

    def _split_commons(params):
        # returns (script parameters, common parameters), where the
        # common parameters start out as their default values
        ret, cdict = {}, dict([(c, d) for c, d, _ in common_params()])
        for key, value in params.items():
            if key in cdict:
                cdict[key] = value
            else:
                ret[key] = deepcopy(value)
        return ret, cdict

    params, commons = _split_commons(params)
    params = _resolve_params(script, params, [], errors)

    # at this point, errors are unknown or invalid parameters
    if errors:
        raise ValueError('\n'.join(errors))

    for key, value in commons.items():
        params[key] = value

    def _fill_values(path, into, source, srcreq):
        """
        Copy values into into while checking for missing required parameters.
        If into has content, all required parameters ARE required, even if the
        whole step is not required (since we're supplying it). This is checked
        by checking if the step is not required, but there are some parameters
        set by the user anyway.

        path: names of the steps leading to source
        into: the values at this level
        source: the script or step supplying parameters and substeps
        srcreq: true if missing required parameters should be reported
        """
        if 'required' in source:
            srcreq = (source['required'] and srcreq) or (into and srcreq)

        for param in source.get('parameters', []):
            if param['name'] not in into:
                if 'value' in param:
                    into[param['name']] = param['value']
                elif srcreq and param['required']:
                    # errors collects the scoped names of the missing parameters
                    errors.append(_scoped_param(path, param['name']))

        for step in source.get('steps', []):
            required = step.get('required', True)
            # NOTE(review): step['name'] is read here before the
            # 'name' not in step check below, so a step which is
            # not required and has no name would raise KeyError -
            # confirm if such steps can occur
            if not required and step['name'] not in into:
                continue
            if not required and step['name'] in into and into[step['name']]:
                required = True
            if 'name' not in step:
                # unnamed steps share the values of the enclosing level
                _fill_values(path, into, step, required and srcreq)
            else:
                if step['name'] not in into:
                    into[step['name']] = {}
                _fill_values(path + [step['name']], into[step['name']], step, required and srcreq)

    _fill_values([], params, script, True)

    if errors:
        raise ValueError("Missing required parameter(s): %s" % (', '.join(errors)))

    # if config.core.debug:
    #    from pprint import pprint
    #    print("Checked script parameters:")
    #    pprint(params)
    return params
1500
1501
def _handles_values(ret, script, params, subactions):
    """
    Generate a values structure that the handles
    templates understands.

    ret: dict which is filled in with the values
    script: the script, which supplies the step structure
    params: parameter values, with the values for a named
            step nested under the name of the step
    subactions: looked up by sub-script name for steps that
                have a sub-script
    """
    def _process(to, context, params):
        """
        to: level writing to
        context: source step
        params: values for step
        """
        # plain values are copied as is; nested dicts
        # are handled step by step below
        for key, value in params.items():
            if not isinstance(value, dict):
                to[key] = value

        for step in context.get('steps', []):
            name = step.get('name', '')
            if name:
                # optional steps are only included if
                # there are values for them
                if step['required'] or name in params:
                    obj = {}
                    # presumably wraps obj so that the step has a
                    # value of its own as well as members - confirm
                    # against handles.value
                    vobj = handles.value(obj, '')
                    to[name] = vobj
                    subaction = None
                    if step.get('sub-script'):
                        subaction = subactions.get(step['sub-script']['name'])
                    # the value of the step is the last action of the
                    # sub-script if that is a cib action, or else the
                    # value of the step itself
                    if subaction and subaction[-1]['name'] == 'cib':
                        vobj.value = Text.cib(script, subaction[-1]['value'])
                    else:
                        vobj.value = Text.cib(script, step.get('value', vobj.value))

                    _process(obj, step, params.get(name, {}))
            else:
                # unnamed steps write to the level of the parent
                _process(to, step, params)

    _process(ret, script, params)
1537
1538
1539def _has_remote_actions(actions):
1540    """
1541    True if any actions execute on remote nodes
1542    """
1543    for action in actions:
1544        if action['name'] in ('collect', 'apply', 'install', 'service', 'copy'):
1545            return True
1546        if action.get('nodes') == 'all':
1547            return True
1548    return False
1549
1550
def _set_controlpersist(opts):
    """
    Placeholder for enabling ssh ControlPersist in opts.
    Currently does nothing: see below for the reason.
    """
    # _has_controlpersist = _check_control_persist()
    # if _has_controlpersist:
    #    opts.ssh_options += ["ControlMaster=auto",
    #                         "ControlPersist=30s",
    #                         "ControlPath=/tmp/crm-ssh-%r@%h:%p"]
    # unfortunately, due to bad interaction between parallax and ssh,
    # ControlPersist is broken
    # See: http://code.google.com/p/parallel-ssh/issues/detail?id=67
    # Supposedly fixed in openssh 6.3, but isn't: This may be an
    # issue in parallel-ssh, not openssh
    pass
1563
1564
1565def _flatten_parameters(steps):
1566    pret = []
1567    for step in steps:
1568        stepname = step.get('name', '')
1569        for param in step.get('parameters', []):
1570            if stepname:
1571                pret.append('%s:%s' % (stepname, param['name']))
1572            else:
1573                pret.append(param['name'])
1574    return pret
1575
1576
def param_completion_list(name):
    """
    Returns completions for the given script: the name of
    each parameter of the top-level steps of the script,
    followed by an equals sign.

    Completion is best effort, so the list is empty if
    the script fails to load.
    """
    try:
        script = load_script(name)
        # the flattened parameters are the names
        # themselves, not the parameter definitions
        params = _flatten_parameters(script.get('steps', []))
        return [p + '=' for p in params]
    except Exception:
        return []
1588
1589
def _create_script_workdir(script, workdir):
    """
    Create workdir and copy contents of scriptdir into it.

    If the script is a directory (that is, the directory of the
    script is named after the script), only the parent of workdir
    is created, and the script directory is then copied to workdir.
    Otherwise, workdir is created empty.

    Raises ValueError on failure.
    """
    scriptdir = script['dir']
    try:
        # only copytree if script is a dir
        copy_script = scriptdir is not None and os.path.basename(scriptdir) == script['name']
        # copytree creates workdir itself
        target = os.path.dirname(workdir) if copy_script else workdir
        cmd = ["mkdir", "-p", target]
        if options.regression_tests:
            print(".EXT", cmd)
        if subprocess.call(cmd, shell=False) != 0:
            raise ValueError("Failed to create temporary working directory")
        if copy_script:
            shutil.copytree(scriptdir, workdir)
    except (IOError, OSError) as e:
        raise ValueError(e)
1614
1615
def _copy_utils(dst):
    '''
    Copy run utils (the python files in the utils
    folder of the shared directory) to the destination
    directory.

    Raises ValueError if the copying fails.
    '''
    try:
        import glob
        pattern = os.path.join(config.path.sharedir, 'utils/*.py')
        for util in glob.glob(pattern):
            shutil.copy(util, dst)
    except (IOError, OSError) as e:
        raise ValueError(e)
1626
1627
def _create_remote_workdirs(printer, hosts, path, opts):
    """
    Create workdirs on remote hosts. To be precise, the
    parent directory of path is created on each host.

    Raises ValueError if this fails for any of the hosts.
    """
    command = "mkdir -p %s" % (os.path.dirname(path))
    results = _parallax_call(printer, hosts, command, opts)
    failed = False
    for host, result in results.items():
        if isinstance(result, parallax.Error):
            printer.error(host, "Start: %s" % (result))
            failed = True
    if failed:
        names = ', '.join(h[0] for h in hosts)
        raise ValueError("Failed to connect to one or more of these hosts via SSH: %s" % (names))
1641
1642
def _copy_to_remote_dirs(printer, hosts, path, opts):
    """
    Copy a local folder to same location on remote hosts.

    Returns True.
    Raises ValueError if the copy fails for any of the hosts.
    """
    results = _parallax_copy(printer, hosts, path, path, opts)
    failures = 0
    for host, result in results.items():
        if not isinstance(result, parallax.Error):
            continue
        printer.debug("_copy_to_remote_dirs failed: %s, %s, %s" % (hosts, path, opts))
        printer.error(host, result)
        failures += 1
    if failures:
        raise ValueError("Failed when copying script data, aborting.")
    return True
1656
1657
1658def _copy_local(printer, workdir, local_node, src, dst):
1659    ok = True
1660    if local_node and not src.startswith(workdir):
1661        try:
1662            if os.path.abspath(src) != os.path.abspath(dst):
1663                if os.path.isfile(src):
1664                    shutil.copy(src, dst)
1665                else:
1666                    shutil.copytree(src, dst)
1667        except (IOError, OSError, shutil.Error) as e:
1668            printer.error(local_node, e)
1669            ok = False
1670    return ok
1671
1672
def _copy_to_all(printer, workdir, hosts, local_node, src, dst, opts):
    """
    Copy src to dst both locally and remotely. The local
    copy is only attempted if all the remote copies succeed.

    Returns True if everything was copied.
    """
    all_ok = True
    for host, result in _parallax_copy(printer, hosts, src, dst, opts).items():
        if isinstance(result, parallax.Error):
            failure = result
        else:
            rc, out, err = result
            if rc == 0:
                continue
            failure = err
        printer.error(host, failure)
        all_ok = False
    if not all_ok:
        return False
    return _copy_local(printer, workdir, local_node, src, dst)
1689
1690
1691def _clean_parameters(params):
1692    ret = []
1693    for param in params:
1694        rp = {}
1695        for elem in ('name', 'required', 'unique', 'advanced', 'type', 'example'):
1696            if elem in param:
1697                rp[elem] = param[elem]
1698        if 'shortdesc' in param:
1699            rp['shortdesc'] = _strip(param['shortdesc'])
1700        if 'longdesc' in param:
1701            rp['longdesc'] = format_desc(param['longdesc'])
1702        if 'value' in param:
1703            val = param['value']
1704            if isinstance(val, Text):
1705                val = val.text
1706            rp['value'] = val
1707        ret.append(rp)
1708    return ret
1709
1710
def clean_steps(steps):
    """
    Return cleaned up copies of the given steps (and their
    substeps), with only the known fields. Descriptions are
    stripped or formatted, and parameters are cleaned as well.
    """
    def _clean(step):
        cleaned = {}
        if 'name' in step:
            cleaned['name'] = step['name']
        if 'shortdesc' in step:
            cleaned['shortdesc'] = _strip(step['shortdesc'])
        if 'longdesc' in step:
            cleaned['longdesc'] = format_desc(step['longdesc'])
        if 'required' in step:
            cleaned['required'] = step['required']
        if 'parameters' in step:
            cleaned['parameters'] = _clean_parameters(step['parameters'])
        if 'steps' in step:
            cleaned['steps'] = clean_steps(step['steps'])
        return cleaned

    return [_clean(step) for step in steps]
1729
1730
def clean_run_params(params):
    """
    Replace the Text objects in params (including
    nested dicts) with strings. This is done in place.

    Returns params.
    """
    for key, value in params.items():
        if isinstance(value, dict):
            clean_run_params(value)
            continue
        if Text.isa(value):
            params[key] = str(value)
    return params
1738
1739
1740def _chmodx(path):
1741    "chmod +x <path>"
1742    mode = os.stat(path).st_mode
1743    mode |= (mode & 0o444) >> 2
1744    os.chmod(path, mode)
1745
1746
1747class RunActions(object):
    def __init__(self, printer, script, params, actions, local_node, hosts, opts, workdir):
        """
        printer: receives debug, error and regular output
        script: the script being run
        params: parameter values; Text values are replaced by
                strings in place
        actions: the actions to run
        local_node: host entry for the local node
        hosts: the remote hosts
        opts: options passed on to the parallax calls
        workdir: working directory for the run
        """
        self.printer = printer
        self.script = script
        # the data is a list with the parameters as first element
        self.data = [clean_run_params(params)]
        self.actions = actions
        self.local_node = local_node
        self.hosts = hosts
        self.opts = opts
        self.dry_run = params.get('dry_run', False)
        self.sudo = params.get('sudo', False)
        self.workdir = workdir
        # statefile and dstfile start out as the same file in the workdir
        self.statefile = os.path.join(self.workdir, 'script.input')
        self.dstfile = os.path.join(self.workdir, 'script.input')
        self.sudo_pass = None
        self.result = None
        self.output = None
        self.rc = False
1765
1766    def prepare(self, has_remote_actions):
1767        if not self.dry_run:
1768            _create_script_workdir(self.script, self.workdir)
1769            json.dump(self.data, open(self.statefile, 'w'))
1770            _copy_utils(self.workdir)
1771            if has_remote_actions:
1772                _create_remote_workdirs(self.printer, self.hosts, self.workdir, self.opts)
1773                _copy_to_remote_dirs(self.printer, self.hosts, self.workdir, self.opts)
1774            # make sure all path references are relative to the script directory
1775            os.chdir(self.workdir)
1776
1777    def single_action(self, action_index, statefile):
1778        self.statefile = statefile
1779        try:
1780            action_index = int(action_index) - 1
1781        except ValueError:
1782            raise ValueError("action parameter must be an index")
1783        if action_index < 0 or action_index >= len(self.actions):
1784            raise ValueError("action index out of range")
1785
1786        action = self.actions[action_index]
1787        common_debug("Execute: %s" % (action))
1788        # if this is not the first action, load action data
1789        if action_index != 1:
1790            if not os.path.isfile(statefile):
1791                raise ValueError("No state for action: %s" % (action_index))
1792            self.data = json.load(open(statefile))
1793        if Actions.needs_sudo(action):
1794            self._check_sudo_pass()
1795        result = self._run_action(action)
1796        json.dump(self.data, open(self.statefile, 'w'))
1797        return result
1798
1799    def all_actions(self):
1800        # TODO: run asynchronously on remote nodes
1801        # run on remote nodes
1802        # run on local nodes
1803        # TODO: wait for remote results
1804        for action in self.actions:
1805            if Actions.needs_sudo(action):
1806                self._check_sudo_pass()
1807            if not self._run_action(action):
1808                return False
1809        return True
1810
1811    def _update_state(self):
1812        if self.dry_run:
1813            return True
1814        json.dump(self.data, open(self.statefile, 'w'))
1815        return _copy_to_all(self.printer,
1816                            self.workdir,
1817                            self.hosts,
1818                            self.local_node,
1819                            self.statefile,
1820                            self.dstfile,
1821                            self.opts)
1822
1823    def run_command(self, nodes, command, is_json_output):
1824        "called by Actions"
1825        cmdline = 'cd "%s"; ./%s' % (self.workdir, command)
1826        if not self._update_state():
1827            raise ValueError("Failed when updating input, aborting.")
1828        self.call(nodes, cmdline, is_json_output)
1829
1830    def copy_file(self, nodes, src, dst):
1831        if not self._is_local(nodes):
1832            ok = _copy_to_all(self.printer,
1833                              self.workdir,
1834                              self.hosts,
1835                              self.local_node,
1836                              src,
1837                              dst,
1838                              self.opts)
1839        else:
1840            ok = _copy_local(self.printer,
1841                             self.workdir,
1842                             self.local_node,
1843                             src,
1844                             dst)
1845        self.result = '' if ok else None
1846        self.rc = ok
1847
1848    def record_json(self):
1849        "called by Actions"
1850        if self.result is not None:
1851            if not self.result:
1852                self.result = {}
1853            self.data.append(self.result)
1854            self.rc = True
1855        else:
1856            self.rc = False
1857
1858    def validate_json(self):
1859        "called by Actions"
1860        if self.dry_run:
1861            self.rc = True
1862            return
1863
1864        if self.result is not None:
1865            if not self.result:
1866                self.result = ''
1867            self.data.append(self.result)
1868            if isinstance(self.result, dict):
1869                for k, v in self.result.items():
1870                    self.data[0][k] = v
1871            self.rc = True
1872        else:
1873            self.rc = False
1874
1875    def report_result(self):
1876        "called by Actions"
1877        if self.result is not None:
1878            self.output = self.result
1879            self.rc = True
1880        else:
1881            self.rc = False
1882
1883    def _run_action(self, action):
1884        """
1885        Execute a single action
1886        """
1887        method = _actions[action['name']]
1888        self.printer.start(action)
1889        try:
1890            self.output = None
1891            self.result = None
1892            self.rc = False
1893            method(Actions(self, action))
1894            self.printer.finish(action, self.rc, self.output)
1895            return self.rc
1896        finally:
1897            self.printer.flush()
1898        return False
1899
1900    def _check_sudo_pass(self):
1901        if self.sudo and not self.sudo_pass and userdir.getuser() != 'root':
1902            prompt = "sudo password: "
1903            self.sudo_pass = getpass.getpass(prompt=prompt)
1904
1905    def _is_local(self, nodes):
1906        islocal = False
1907        if nodes == 'all':
1908            pass
1909        elif nodes == 'local':
1910            islocal = True
1911        elif nodes is not None and nodes != []:
1912            islocal = nodes == [self.local_node_name()]
1913        else:
1914            islocal = True
1915        self.printer.debug("is_local (%s): %s" % (nodes, islocal))
1916        return islocal
1917
1918    def call(self, nodes, cmdline, is_json_output=False):
1919        if cmdline.startswith("#!"):
1920            self.execute_shell(nodes or 'all', cmdline)
1921        else:
1922            if not self._is_local(nodes):
1923                self.result = self._process_remote(cmdline, is_json_output)
1924            else:
1925                self.result = self._process_local(cmdline, is_json_output)
1926            self.rc = self.result not in (False, None)
1927
1928    def execute_shell(self, nodes, cmdscript):
1929        """
1930        execute the shell script...
1931        """
1932        cmdscript = str(cmdscript).rstrip() + '\n'
1933        if self.dry_run:
1934            self.printer.print_command(nodes, cmdscript)
1935            self.result = ''
1936            self.rc = True
1937            return
1938        elif config.core.debug:
1939            self.printer.print_command(nodes, cmdscript)
1940
1941        tmpf = self.str2tmp(cmdscript)
1942        _chmodx(tmpf)
1943        if not self._is_local(nodes):
1944            ok = _copy_to_remote_dirs(self.printer,
1945                                      self.hosts,
1946                                      tmpf,
1947                                      self.opts)
1948            if not ok:
1949                self.result = False
1950            else:
1951                cmdline = 'cd "%s"; %s' % (self.workdir, tmpf)
1952                self.result = self._process_remote(cmdline, False)
1953        else:
1954            cmdline = 'cd "%s"; %s' % (self.workdir, tmpf)
1955            self.result = self._process_local(cmdline, False)
1956        self.rc = self.result not in (None, False)
1957
1958    def str2tmp(self, s):
1959        """
1960        Create a temporary file in the temp workdir
1961        Returns path to file
1962        """
1963        fn = os.path.join(self.workdir, _tempname('str2tmp'))
1964        if self.dry_run:
1965            self.printer.print_command(self.local_node_name(), 'temporary file <<END\n%s\nEND\n' % (s))
1966            return fn
1967        elif config.core.debug:
1968            self.printer.print_command(self.local_node_name(), 'temporary file <<END\n%s\nEND\n' % (s))
1969        try:
1970            with open(fn, "w") as f:
1971                f.write(s)
1972                if not s.endswith('\n'):
1973                    f.write("\n")
1974        except IOError as msg:
1975            self.printer.error(self.local_node_name(), "Write failed: %s" % (msg))
1976            return
1977        return fn
1978
1979    def _process_remote(self, cmdline, is_json_output):
1980        """
1981        Handle an action that executes on all nodes
1982        """
1983        ok = True
1984        action_result = {}
1985
1986        if self.sudo_pass:
1987            self.opts.input_stream = u'sudo: %s\n' % (self.sudo_pass)
1988        else:
1989            self.opts.input_stream = None
1990
1991        if self.dry_run:
1992            self.printer.print_command(self.hosts, cmdline)
1993            return {}
1994        elif config.core.debug:
1995            self.printer.print_command(self.hosts, cmdline)
1996
1997        for host, result in _parallax_call(self.printer,
1998                                           self.hosts,
1999                                           cmdline,
2000                                           self.opts).items():
2001            if isinstance(result, parallax.Error):
2002                self.printer.error(host, "Remote error: %s" % (result))
2003                ok = False
2004            else:
2005                rc, out, err = result
2006                out = utils.to_ascii(out)
2007                if rc != 0:
2008                    self.printer.error(host, "Remote error (rc=%s) %s%s" % (rc, out, err))
2009                    ok = False
2010                elif is_json_output:
2011                    action_result[host] = json.loads(out)
2012                else:
2013                    action_result[host] = out
2014        if self.local_node:
2015            ret = self._process_local(cmdline, False)
2016            if ret is None:
2017                ok = False
2018            elif is_json_output:
2019                action_result[self.local_node_name()] = json.loads(ret)
2020            else:
2021                action_result[self.local_node_name()] = ret
2022        if ok:
2023            self.printer.debug("Result: %s" % repr(action_result))
2024            return action_result
2025        return None
2026
2027    def _process_local(self, cmdline, is_json_output):
2028        """
2029        Handle an action that executes locally
2030        """
2031        if self.sudo_pass:
2032            input_s = u'sudo: %s\n' % (self.sudo_pass)
2033        else:
2034            input_s = None
2035        if self.dry_run:
2036            self.printer.print_command(self.local_node_name(), cmdline)
2037            return {}
2038        elif config.core.debug:
2039            self.printer.print_command(self.local_node_name(), cmdline)
2040        rc, out, err = utils.get_stdout_stderr(cmdline, input_s=input_s, shell=True)
2041        if rc != 0:
2042            self.printer.error(self.local_node_name(), "Error (%d): %s" % (rc, err))
2043            return None
2044        self.printer.debug("Result(local): %s" % repr(out))
2045        if is_json_output:
2046            if out != '':
2047                out = json.loads(out)
2048        return out
2049
2050    def local_node_name(self):
2051        if self.local_node:
2052            return self.local_node[0]
2053        return "localhost"
2054
2055
def run(script, params, printer):
    '''
    Run the given script on the given set of hosts

    script: the script object to run (its 'name' is used in error messages)
    params: a tree of parameters
    printer: Object that receives and formats output

    If both the 'action' and 'statefile' parameters are set, only
    that single action is run; if neither is set, all actions run.

    Returns the result of the action(s) that were run.
    Raises ValueError if only one of action and statefile is set,
    or if an OSError/IOError occurs while running.
    '''
    def _absolute(path):
        # empty or unset values are passed through unchanged
        return os.path.abspath(path) if path else path

    workdir = _generate_workdir_name()
    # pull out the actions to perform based on the actual
    # parameter values (so discard actions conditional on
    # conditions that are false)
    params = _check_parameters(script, params)
    _filter_dict(params, 'nodes', _filter_nodes, params['user'], params['port'])
    for flag in ('dry_run', 'sudo'):
        _filter_dict(params, flag, _make_boolean)
    _filter_dict(params, 'statefile', _absolute)
    if config.core.debug:
        params['debug'] = True
    actions = _process_actions(script, params)
    name = script['name']
    all_hosts = params['nodes']
    printer.print_header(script, params, all_hosts)
    local_node, hosts = _extract_localnode(all_hosts)
    opts = _make_options(params)
    _set_controlpersist(opts)

    dry_run = params.get('dry_run', False)
    has_remote_actions = _has_remote_actions(actions)

    try:
        runner = RunActions(printer, script, params, actions, local_node, hosts, opts, workdir)
        runner.prepare(has_remote_actions)
        action = params['action']
        statefile = params['statefile']
        if action and statefile:
            return runner.single_action(action, statefile)
        if action or statefile:
            # exactly one of the two was given
            raise ValueError("Must set both action and statefile")
        return runner.all_actions()

    except (OSError, IOError) as e:
        import traceback
        traceback.print_exc()
        raise ValueError("Internal error while running %s: %s" % (name, e))
    finally:
        if not dry_run:
            if config.core.debug:
                # in debug mode the work directories are reported, not cleaned up
                debug_hosts = hosts if has_remote_actions else None
                _print_debug(printer, local_node, debug_hosts, workdir, opts)
            else:
                _run_cleanup(printer, has_remote_actions, local_node, hosts, workdir, opts)
2112
2113
def _remove_empty_lines(txt):
    """
    Return txt without the lines that are empty or contain
    only whitespace. The remaining lines are left untouched.
    """
    kept = []
    for line in txt.split('\n'):
        if line.strip():
            kept.append(line)
    return '\n'.join(kept)
2116
2117
def _process_actions(script, params):
    """
    Turn a script and its parameter values into the list of
    actions to perform.

    Handle values are (re)computed via _handles_values before each
    step and once more after all steps. Sub-scripts of included
    steps are processed recursively, and their actions replace the
    matching 'include' actions. Actions whose 'when' condition is
    empty or 'false' are dropped, and mergeable actions are merged
    into the preceding action of the same name where possible.

    Raises ValueError for unknown actions and for errors in
    included scripts.
    """

    subactions = {}
    values = {}
    script['__values__'] = values

    for step in script['steps']:
        _handles_values(values, script, params, subactions)
        optional = not step.get('required', True)
        if optional and not params.get(step['name']):
            # optional step that was not enabled by the parameters
            continue
        subscript = step.get('sub-script')
        if not subscript:
            continue
        try:
            step_params = params.get(step['name'], {})
            subactions[step['name']] = _process_actions(subscript, step_params)
        except ValueError as err:
            raise ValueError("Error in included script %s: %s" % (step['name'], err))

    _handles_values(values, script, params, subactions)

    result = []
    for action in deepcopy(script['actions']):
        name = _find_action(action)
        if name is None:
            raise ValueError("Unknown action: %s" % (list(action.keys())))
        action['name'] = name

        pending = []
        if name == 'include':
            # an include expands to the actions of the named sub-script, if any
            pending.extend(subactions.get(action['include'], []))
        else:
            Actions.parse(script, action)
            enabled = True
            if 'when' in action:
                condition = str(action['when']).strip()
                enabled = condition not in (False, None, '', 'false')
            if enabled:
                pending.append(action)

        if not result:
            # nothing to merge into yet
            result.extend(pending)
            continue
        for candidate in pending:
            merged = (Actions.mergeable(candidate) and
                      result[-1]['name'] == candidate['name'] and
                      Actions.merge(result[-1], candidate))
            if not merged:
                result.append(candidate)
    return result
2172
2173
def verify(script, params, external_check=True):
    """
    Check the given parameter values and build the list of
    actions to perform.

    If external_check is set, all actions are cib actions and the
    crm program is available, the generated configuration is also
    passed to crm for verification.

    Return a list of actions to perform.
    Raises ValueError with the collected error messages if the
    external verification fails.
    """
    params = _check_parameters(script, params)
    actions = _process_actions(script, params)

    only_cib = external_check and all(act['name'] == 'cib' for act in actions)
    if not (only_cib and utils.is_program('crm')):
        return actions

    problems = set()
    lines = ["cib new"]
    lines.extend(_join_script_lines(act['value']) for act in actions)
    lines += ["verify", "commit", "\n"]
    try:
        crm_input = "\n".join(lines)
        common_debug("Try executing %s" % (crm_input))
        rc, out = utils.filter_string(['crm', '-f', '-', 'configure'], crm_input.encode('utf-8'), stderr_on='stdout', shell=False)
        error_line = re.compile(r"^ERROR: \d+: (.*)$")
        other_output = []
        for line in (out or "").splitlines():
            found = error_line.match(line)
            if found:
                problems.add(found.group(1))
            else:
                other_output.append(line)
        # a failure without any recognizable error line is reported
        # using the remaining output
        if rc != 0 and not problems:
            problems.add("Failed to verify (rc=%s): %s" % (rc, "\n".join(other_output)))
    except OSError as e:
        problems.add(str(e))
    if problems:
        raise ValueError("\n".join(problems))

    return actions
2209
2210
def _make_boolean(v):
    """
    Convert a parameter value to a boolean.

    Strings are interpreted by utils.get_boolean. Any other value
    is False if it equals 0, False or None, and True otherwise.
    """
    if not isinstance(v, str):
        false_values = (0, False, None)
        return v not in false_values
    return utils.get_boolean(v)
2215