1# Copyright (C) 2016 Kristoffer Gronlund <kgronlund@suse.com>
2# See COPYING for license information.
3#
4# Bootstrap:
5#
6# Supersedes and replaces both the init/add/remove cluster scripts,
7# and the ha-cluster-bootstrap scripts.
8#
9# Implemented as a straight-forward set of python functions for
10# simplicity and flexibility.
11#
12# TODO: Make csync2 usage optional
13# TODO: Configuration file for bootstrap?
14
15import os
16import sys
17import random
18import re
19import time
20import readline
21import shutil
22from string import Template
23from lxml import etree
24from pathlib import Path
25from enum import Enum
26from contextlib import contextmanager
27from . import config
28from . import utils
29from . import xmlutil
30from .cibconfig import mkset_obj, cib_factory
31from . import corosync
32from . import tmpfiles
33from . import clidisplay
34from . import term
35from . import lock
36from . import userdir
37from .constants import SSH_OPTION, QDEVICE_HELP_INFO
38from . import ocfs2
39
40
41
42LOG_FILE = "/var/log/crmsh/ha-cluster-bootstrap.log"
43CSYNC2_KEY = "/etc/csync2/key_hagroup"
44CSYNC2_CFG = "/etc/csync2/csync2.cfg"
45COROSYNC_AUTH = "%%PREFIX%%/etc/corosync/authkey"
46SYSCONFIG_SBD = "/etc/sysconfig/sbd"
47SYSCONFIG_PCMK = "/etc/sysconfig/pacemaker"
48SYSCONFIG_NFS = "/etc/sysconfig/nfs"
49SYSCONFIG_FW = "/etc/sysconfig/SuSEfirewall2"
50SYSCONFIG_FW_CLUSTER = "/etc/sysconfig/SuSEfirewall2.d/services/cluster"
51PCMK_REMOTE_AUTH = "/etc/pacemaker/authkey"
52COROSYNC_CONF_ORIG = tmpfiles.create()[1]
53SERVICES_STOP_LIST = ["corosync-qdevice.service", "corosync.service", "hawk.service"]
54USER_LIST = ["root", "hacluster"]
55QDEVICE_ADD = "add"
56QDEVICE_REMOVE = "remove"
57WATCHDOG_CFG = "/etc/modules-load.d/watchdog.conf"
58BOOTH_DIR = "/etc/booth"
59BOOTH_CFG = "/etc/booth/booth.conf"
60BOOTH_AUTH = "/etc/booth/authkey"
61FILES_TO_SYNC = (BOOTH_DIR, corosync.conf(), COROSYNC_AUTH, CSYNC2_CFG, CSYNC2_KEY, "/etc/ctdb/nodes",
62        "/etc/drbd.conf", "/etc/drbd.d", "/etc/ha.d/ldirectord.cf", "/etc/lvm/lvm.conf", "/etc/multipath.conf",
63        "/etc/samba/smb.conf", SYSCONFIG_NFS, SYSCONFIG_PCMK, SYSCONFIG_SBD, PCMK_REMOTE_AUTH, WATCHDOG_CFG)
64
65INIT_STAGES = ("ssh", "ssh_remote", "csync2", "csync2_remote", "corosync", "sbd", "cluster", "ocfs2", "admin", "qdevice")
66
67class QdevicePolicy(Enum):
68    QDEVICE_RELOAD = 0
69    QDEVICE_RESTART = 1
70    QDEVICE_RESTART_LATER = 2
71
72
73class Context(object):
74    """
75    Context object used to avoid having to pass these variables
76    to every bootstrap method.
77    """
78    def __init__(self):
79        '''
80        Initialize attributes
81        '''
82        self.type = None # init or join
83        self.quiet = None
84        self.yes_to_all = None
85        self.cluster_name = None
86        self.watchdog = None
87        self.no_overwrite_sshkey = None
88        self.nic_list = None
89        self.unicast = None
90        self.admin_ip = None
91        self.second_heartbeat = None
92        self.ipv6 = None
93        self.qdevice_inst = None
94        self.qnetd_addr = None
95        self.qdevice_port = None
96        self.qdevice_algo = None
97        self.qdevice_tie_breaker = None
98        self.qdevice_tls = None
99        self.qdevice_heuristics = None
100        self.qdevice_heuristics_mode = None
101        self.qdevice_rm_flag = None
102        self.qdevice_reload_policy = QdevicePolicy.QDEVICE_RESTART
103        self.ocfs2_devices = []
104        self.use_cluster_lvm2 = None
105        self.mount_point = None
106        self.cluster_node = None
107        self.cluster_node_ip = None
108        self.force = None
109        self.arbitrator = None
110        self.clusters = None
111        self.tickets = None
112        self.sbd_manager = None
113        self.sbd_devices = None
114        self.diskless_sbd = None
115        self.stage = None
116        self.args = None
117        self.ui_context = None
118        self.interfaces_inst = None
119        self.with_other_user = True
120        self.cluster_is_running = None
121        self.default_nic_list = []
122        self.default_ip_list = []
123        self.local_ip_list = []
124        self.local_network_list = []
125        self.rm_list = [SYSCONFIG_SBD, CSYNC2_CFG, corosync.conf(), CSYNC2_KEY,
126                COROSYNC_AUTH, "/var/lib/heartbeat/crm/*", "/var/lib/pacemaker/cib/*"]
127
128    @classmethod
129    def set_context(cls, options):
130        ctx = cls()
131        for opt in vars(options):
132            setattr(ctx, opt, getattr(options, opt))
133        return ctx
134
135    def initialize_qdevice(self):
136        """
137        Initialize qdevice instance
138        """
139        if not self.qnetd_addr:
140            return
141        self.qdevice_inst = corosync.QDevice(
142                self.qnetd_addr,
143                port=self.qdevice_port,
144                algo=self.qdevice_algo,
145                tie_breaker=self.qdevice_tie_breaker,
146                tls=self.qdevice_tls,
147                cmds=self.qdevice_heuristics,
148                mode=self.qdevice_heuristics_mode)
149
150    def _validate_sbd_option(self):
151        """
152        Validate sbd options
153        """
154        if self.sbd_devices and self.diskless_sbd:
155            error("Can't use -s and -S options together")
156        if self.stage == "sbd":
157            if not self.sbd_devices and not self.diskless_sbd and self.yes_to_all:
158                error("Stage sbd should specify sbd device by -s or diskless sbd by -S option")
159            if utils.service_is_active("sbd.service"):
160                error("Cannot configure stage sbd: sbd.service already running!")
161            if self.cluster_is_running:
162                utils.check_all_nodes_reachable()
163
164    def validate_option(self):
165        """
166        Validate options
167        """
168        if self.admin_ip:
169            Validation.valid_admin_ip(self.admin_ip)
170        if self.qdevice_inst:
171            self.qdevice_inst.valid_attr()
172        if self.nic_list:
173            if len(self.nic_list) > 2:
174                error("Maximum number of interface is 2")
175            if len(self.nic_list) != len(set(self.nic_list)):
176                error("Duplicated input")
177        if self.no_overwrite_sshkey:
178            warn("--no-overwrite-sshkey option is deprecated since crmsh does not overwrite ssh keys by default anymore and will be removed in future versions")
179        if self.type == "join" and self.watchdog:
180            warn("-w option is deprecated and will be removed in future versions")
181        if self.ocfs2_devices or self.stage == "ocfs2":
182            ocfs2.OCFS2Manager.verify_ocfs2(self)
183        self._validate_sbd_option()
184
185    def init_sbd_manager(self):
186        self.sbd_manager = SBDManager(self.sbd_devices, self.diskless_sbd)
187
188
189class SBDManager(object):
190    """
191    Class to manage sbd configuration and services
192    """
193    SYSCONFIG_SBD_TEMPLATE = "/usr/share/fillup-templates/sysconfig.sbd"
194    SBD_STATUS_DESCRIPTION = """
195Configure SBD:
196  If you have shared storage, for example a SAN or iSCSI target,
197  you can use it avoid split-brain scenarios by configuring SBD.
198  This requires a 1 MB partition, accessible to all nodes in the
199  cluster.  The device path must be persistent and consistent
200  across all nodes in the cluster, so /dev/disk/by-id/* devices
201  are a good choice.  Note that all data on the partition you
202  specify here will be destroyed.
203"""
204    SBD_WARNING = "Not configuring SBD - STONITH will be disabled."
205    DISKLESS_SBD_WARNING = """Diskless SBD requires cluster with three or more nodes.
206If you want to use diskless SBD for two-nodes cluster, should be combined with QDevice."""
207    PARSE_RE = "[; ]"
208    DISKLESS_CRM_CMD = "crm configure property stonith-enabled=true stonith-watchdog-timeout={} stonith-timeout={}"
209    SBD_WATCHDOG_TIMEOUT_DEFAULT_WITH_QDEVICE = 35
210    STONITH_WATCHDOG_TIMEOUT_DEFAULT = "10s"
211    STONITH_WATCHDOG_TIMEOUT_DEFAULT_S390 = "30s"
212
213    def __init__(self, sbd_devices=None, diskless_sbd=False):
214        """
215        Init function
216
217        sbd_devices is provided by '-s' option on init process
218        diskless_sbd is provided by '-S' option on init process
219        """
220        self.sbd_devices_input = sbd_devices
221        self.diskless_sbd = diskless_sbd
222        self._sbd_devices = None
223        self._watchdog_inst = None
224        self._stonith_watchdog_timeout = self.STONITH_WATCHDOG_TIMEOUT_DEFAULT
225        self._stonith_timeout = 60
226        self._sbd_watchdog_timeout = 0
227        self._is_s390 = "390" in os.uname().machine
228
229    @staticmethod
230    def _get_device_uuid(dev, node=None):
231        """
232        Get UUID for specific device and node
233        """
234        cmd = "sbd -d {} dump".format(dev)
235        if node:
236            cmd = "ssh {} root@{} '{}'".format(SSH_OPTION, node, cmd)
237
238        rc, out, err = utils.get_stdout_stderr(cmd)
239        if rc != 0 and err:
240            raise ValueError("Cannot dump sbd meta-data: {}".format(err))
241        if rc == 0 and out:
242            res = re.search("UUID\s*:\s*(.*)\n", out)
243            if not res:
244                raise ValueError("Cannot find sbd device UUID for {}".format(dev))
245            return res.group(1)
246
247    def _compare_device_uuid(self, dev, node_list):
248        """
249        Compare local sbd device UUID with other node's sbd device UUID
250        """
251        if not node_list:
252            return
253        local_uuid = self._get_device_uuid(dev)
254        for node in node_list:
255            remote_uuid = self._get_device_uuid(dev, node)
256            if local_uuid != remote_uuid:
257                raise ValueError("Device {} doesn't have the same UUID with {}".format(dev, node))
258
259    def _verify_sbd_device(self, dev_list, compare_node_list=[]):
260        """
261        Verify sbd device
262        """
263        if len(dev_list) > 3:
264            raise ValueError("Maximum number of SBD device is 3")
265        for dev in dev_list:
266            if not utils.is_block_device(dev):
267                raise ValueError("{} doesn't look like a block device".format(dev))
268            self._compare_device_uuid(dev, compare_node_list)
269
270    def _get_sbd_device_interactive(self):
271        """
272        Get sbd device on interactive mode
273        """
274        if _context.yes_to_all:
275            warn(self.SBD_WARNING)
276            return
277
278        status(self.SBD_STATUS_DESCRIPTION)
279
280        if not confirm("Do you wish to use SBD?"):
281            warn(self.SBD_WARNING)
282            return
283
284        configured_dev_list = self._get_sbd_device_from_config()
285        if configured_dev_list and not confirm("SBD is already configured to use {} - overwrite?".format(';'.join(configured_dev_list))):
286            return configured_dev_list
287
288        dev_list = []
289        dev_looks_sane = False
290        while not dev_looks_sane:
291            dev = prompt_for_string('Path to storage device (e.g. /dev/disk/by-id/...), or "none" for diskless sbd, use ";" as separator for multi path', r'none|\/.*')
292            if not dev:
293                continue
294            if dev == "none":
295                self.diskless_sbd = True
296                return
297            dev_list = utils.re_split_string(self.PARSE_RE, dev)
298            try:
299                self._verify_sbd_device(dev_list)
300            except ValueError as err_msg:
301                print_error_msg(str(err_msg))
302                continue
303            for dev_item in dev_list:
304                warn("All data on {} will be destroyed!".format(dev_item))
305                if confirm('Are you sure you wish to use this device?'):
306                    dev_looks_sane = True
307                else:
308                    dev_looks_sane = False
309                    break
310
311        return dev_list
312
313    def _get_sbd_device(self):
314        """
315        Get sbd device from options or interactive mode
316        """
317        dev_list = []
318        if self.sbd_devices_input:
319            dev_list = utils.parse_append_action_argument(self.sbd_devices_input)
320            self._verify_sbd_device(dev_list)
321        elif not self.diskless_sbd:
322            dev_list = self._get_sbd_device_interactive()
323        self._sbd_devices = dev_list
324
325    def _initialize_sbd(self):
326        """
327        Initialize SBD device
328        """
329        if self.diskless_sbd:
330            return
331        for dev in self._sbd_devices:
332            rc, _, err = invoke("sbd -d {} create".format(dev))
333            if not rc:
334                error("Failed to initialize SBD device {}: {}".format(dev, err))
335
336    def _update_configuration(self):
337        """
338        Update /etc/sysconfig/sbd
339        """
340        shutil.copyfile(self.SYSCONFIG_SBD_TEMPLATE, SYSCONFIG_SBD)
341        self._determine_sbd_watchdog_timeout()
342        sbd_config_dict = {
343                "SBD_PACEMAKER": "yes",
344                "SBD_STARTMODE": "always",
345                "SBD_DELAY_START": "no",
346                "SBD_WATCHDOG_DEV": self._watchdog_inst.watchdog_device_name
347                }
348        if self._sbd_watchdog_timeout > 0:
349            sbd_config_dict["SBD_WATCHDOG_TIMEOUT"] = str(self._sbd_watchdog_timeout)
350        if self._sbd_devices:
351            sbd_config_dict["SBD_DEVICE"] = ';'.join(self._sbd_devices)
352        utils.sysconfig_set(SYSCONFIG_SBD, **sbd_config_dict)
353        csync2_update(SYSCONFIG_SBD)
354
355    def _determine_sbd_watchdog_timeout(self):
356        """
357        When using diskless SBD, determine value of SBD_WATCHDOG_TIMEOUT
358        """
359        if not self.diskless_sbd:
360            return
361        # add sbd after qdevice started
362        if utils.is_qdevice_configured() and utils.service_is_active("corosync-qdevice.service"):
363            qdevice_sync_timeout = utils.get_qdevice_sync_timeout()
364            self._sbd_watchdog_timeout = qdevice_sync_timeout + 5
365            if self._is_s390 and self._sbd_watchdog_timeout < 15:
366                self._sbd_watchdog_timeout = 15
367            self._stonith_timeout = self.calculate_stonith_timeout(self._sbd_watchdog_timeout)
368        # add sbd and qdevice together from beginning
369        elif _context.qdevice_inst:
370            self._sbd_watchdog_timeout = self.SBD_WATCHDOG_TIMEOUT_DEFAULT_WITH_QDEVICE
371            self._stonith_timeout = self.calculate_stonith_timeout(self._sbd_watchdog_timeout)
372
373    def _determine_stonith_watchdog_timeout(self):
374        """
375        Determine value of stonith-watchdog-timeout
376        """
377        res = SBDManager.get_sbd_value_from_config("SBD_WATCHDOG_TIMEOUT")
378        if res:
379            self._stonith_watchdog_timeout = -1
380        elif self._is_s390:
381            self._stonith_watchdog_timeout = self.STONITH_WATCHDOG_TIMEOUT_DEFAULT_S390
382
383    def _get_sbd_device_from_config(self):
384        """
385        Gets currently configured SBD device, i.e. what's in /etc/sysconfig/sbd
386        """
387        res = SBDManager.get_sbd_value_from_config("SBD_DEVICE")
388        if res:
389            return utils.re_split_string(self.PARSE_RE, res)
390        else:
391            return None
392
393    def _restart_cluster_and_configure_sbd_ra(self):
394        """
395        Try to configure sbd resource, restart cluster on needed
396        """
397        if not utils.has_resource_running():
398            status("Restarting cluster service")
399            utils.cluster_run_cmd("crm cluster restart")
400            wait_for_cluster()
401            self.configure_sbd_resource()
402        else:
403            warn("To start sbd.service, need to restart cluster service manually on each node")
404            if self.diskless_sbd:
405                cmd = self.DISKLESS_CRM_CMD.format(self._stonith_watchdog_timeout, str(self._stonith_timeout)+"s")
406                warn("Then run \"{}\" on any node".format(cmd))
407            else:
408                self.configure_sbd_resource()
409
410    def _enable_sbd_service(self):
411        """
412        Try to enable sbd service
413        """
414        if _context.cluster_is_running:
415            # in sbd stage, enable sbd.service on cluster wide
416            utils.cluster_run_cmd("systemctl enable sbd.service")
417            self._restart_cluster_and_configure_sbd_ra()
418        else:
419            # in init process
420            invoke("systemctl enable sbd.service")
421
422    def _warn_diskless_sbd(self, peer=None):
423        """
424        Give warning when configuring diskless sbd
425        """
426        # When in sbd stage or join process
427        if (self.diskless_sbd and _context.cluster_is_running) or peer:
428            vote_dict = utils.get_quorum_votes_dict(peer)
429            expected_vote = int(vote_dict['Expected'])
430            if (expected_vote < 2 and peer) or (expected_vote < 3 and not peer):
431                warn(self.DISKLESS_SBD_WARNING)
432        # When in init process
433        elif self.diskless_sbd:
434            warn(self.DISKLESS_SBD_WARNING)
435
436    def sbd_init(self):
437        """
438        Function sbd_init includes these steps:
439        1. Get sbd device from options or interactive mode
440        2. Initialize sbd device
441        3. Write config file /etc/sysconfig/sbd
442        """
443        from .watchdog import Watchdog
444
445        if not utils.package_is_installed("sbd"):
446            return
447        self._watchdog_inst = Watchdog(_input=_context.watchdog)
448        self._watchdog_inst.init_watchdog()
449        self._get_sbd_device()
450        if not self._sbd_devices and not self.diskless_sbd:
451            invoke("systemctl disable sbd.service")
452            return
453        self._warn_diskless_sbd()
454        with status_long("Initializing {}SBD...".format("diskless " if self.diskless_sbd else "")):
455            self._initialize_sbd()
456            self._update_configuration()
457        self._determine_stonith_watchdog_timeout()
458        self._enable_sbd_service()
459
460    def configure_sbd_resource(self):
461        """
462        Configure stonith-sbd resource and stonith-enabled property
463        """
464        if not utils.package_is_installed("sbd") or \
465                not utils.service_is_enabled("sbd.service") or \
466                utils.has_resource_configured("stonith:external/sbd"):
467            return
468
469        if self._get_sbd_device_from_config():
470            if not invokerc("crm configure primitive stonith-sbd stonith:external/sbd pcmk_delay_max=30s"):
471                error("Can't create stonith-sbd primitive")
472            if not invokerc("crm configure property stonith-enabled=true"):
473                error("Can't enable STONITH for SBD")
474        else:
475            cmd = self.DISKLESS_CRM_CMD.format(self._stonith_watchdog_timeout, str(self._stonith_timeout)+"s")
476            if not invokerc(cmd):
477                error("Can't enable STONITH for diskless SBD")
478
479    def join_sbd(self, peer_host):
480        """
481        Function join_sbd running on join process only
482        On joining process, check whether peer node has enabled sbd.service
483        If so, check prerequisites of SBD and verify sbd device on join node
484        """
485        from .watchdog import Watchdog
486
487        if not utils.package_is_installed("sbd"):
488            return
489        if not os.path.exists(SYSCONFIG_SBD) or not utils.service_is_enabled("sbd.service", peer_host):
490            invoke("systemctl disable sbd.service")
491            return
492        self._watchdog_inst = Watchdog(peer_host=peer_host)
493        self._watchdog_inst.join_watchdog()
494        dev_list = self._get_sbd_device_from_config()
495        if dev_list:
496            self._verify_sbd_device(dev_list, [peer_host])
497        else:
498            self._warn_diskless_sbd(peer_host)
499        status("Got {}SBD configuration".format("" if dev_list else "diskless "))
500        invoke("systemctl enable sbd.service")
501
502    @classmethod
503    def verify_sbd_device(cls):
504        """
505        This classmethod is for verifying sbd device on a running cluster
506        Raise ValueError for exceptions
507        """
508        inst = cls()
509        dev_list = inst._get_sbd_device_from_config()
510        if not dev_list:
511            raise ValueError("No sbd device configured")
512        inst._verify_sbd_device(dev_list, utils.list_cluster_nodes_except_me())
513
514    @classmethod
515    def get_sbd_device_from_config(cls):
516        """
517        Get sbd device list from config
518        """
519        inst = cls()
520        return inst._get_sbd_device_from_config()
521
522    @classmethod
523    def is_using_diskless_sbd(cls):
524        """
525        Check if using diskless SBD
526        """
527        inst = cls()
528        dev_list = inst._get_sbd_device_from_config()
529        if not dev_list and utils.service_is_active("sbd.service"):
530            return True
531        return False
532
533    @staticmethod
534    def update_configuration(sbd_config_dict):
535        """
536        Update and sync sbd configuration
537        """
538        utils.sysconfig_set(SYSCONFIG_SBD, **sbd_config_dict)
539        csync2_update(SYSCONFIG_SBD)
540
541    @staticmethod
542    def calculate_stonith_timeout(sbd_watchdog_timeout):
543        """
544        Calculate stonith timeout
545        """
546        return int(sbd_watchdog_timeout * 2 * 1.2)
547
548    @staticmethod
549    def get_sbd_value_from_config(key):
550        """
551        Get value from /etc/sysconfig/sbd
552        """
553        conf = utils.parse_sysconfig(SYSCONFIG_SBD)
554        res = conf.get(key)
555        return res
556
557
558_context = None
559
560
561def die(*args):
562    """
563    Broken out as special case for log() failure.  Ordinarily you
564    should just use error() to terminate.
565    """
566    raise ValueError(" ".join([str(arg) for arg in args]))
567
568
569def error(*args):
570    """
571    Log an error message and raise ValueError to bail out of
572    bootstrap process.
573    """
574    log("ERROR: {}".format(" ".join([str(arg) for arg in args])))
575    die(*args)
576
577
578def print_error_msg(msg):
579    """
580    Just print error message
581    """
582    print(term.render(clidisplay.error("ERROR:")) + " {}".format(msg))
583
584
585def warn(*args):
586    """
587    Log and display a warning message.
588    """
589    log("WARNING: {}".format(" ".join(str(arg) for arg in args)))
590    print(term.render(clidisplay.warn("WARNING: {}".format(" ".join(str(arg) for arg in args)))))
591
592
593@utils.memoize
594def log_file_fallback():
595    """
596    If the standard log location isn't writable,
597    just log to the nearest temp dir.
598    """
599    return os.path.join(utils.get_tempdir(), "ha-cluster-bootstrap.log")
600
601
602def log(*args):
603    global LOG_FILE
604    try:
605        Path(os.path.dirname(LOG_FILE)).mkdir(parents=True, exist_ok=True)
606        with open(LOG_FILE, "ab") as logfile:
607            text = " ".join([utils.to_ascii(arg) for arg in args]) + "\n"
608            logfile.write(text.encode('ascii', 'backslashreplace'))
609    except IOError:
610        if LOG_FILE != log_file_fallback():
611            LOG_FILE = log_file_fallback()
612            log(*args)
613        else:
614            die("Can't append to {} - aborting".format(LOG_FILE))
615
616
617def drop_last_history():
618    hlen = readline.get_current_history_length()
619    if hlen > 0:
620        readline.remove_history_item(hlen - 1)
621
622
623def prompt_for_string(msg, match=None, default='', valid_func=None, prev_value=[]):
624    if _context.yes_to_all:
625        return default
626
627    while True:
628        disable_completion()
629        val = utils.multi_input('  %s [%s]' % (msg, default))
630        enable_completion()
631        if not val:
632            val = default
633        else:
634            drop_last_history()
635
636        if not val:
637            return None
638        if not match and not valid_func:
639            return val
640        if match and not re.match(match, val):
641            print_error_msg("Invalid value entered")
642            continue
643        if valid_func:
644            try:
645                valid_func(val, prev_value)
646            except ValueError as err:
647                print_error_msg(err)
648                continue
649
650        return val
651
652
653def confirm(msg):
654    if _context.yes_to_all:
655        return True
656    disable_completion()
657    rc = utils.ask(msg)
658    enable_completion()
659    drop_last_history()
660    return rc
661
662
663def disable_completion():
664    if _context.ui_context:
665        _context.ui_context.disable_completion()
666
667
668def enable_completion():
669    if _context.ui_context:
670        _context.ui_context.setup_readline()
671
672
673def invoke(*args):
674    """
675    Log command execution to log file.
676    Log output from command to log file.
677    Return (boolean, stdout, stderr)
678    """
679    log("+ " + " ".join(args))
680    rc, stdout, stderr = utils.get_stdout_stderr(" ".join(args))
681    if stdout:
682        log(stdout)
683    if stderr:
684        log(stderr)
685    return rc == 0, stdout, stderr
686
687
688def invokerc(*args):
689    """
690    Calling invoke, return True/False
691    """
692    rc, _, _ = invoke(*args)
693    return rc
694
695
696def crm_configure_load(action, configuration):
697    log(": loading crm config (%s), content is:" % (action))
698    log(configuration)
699    if not cib_factory.initialize():
700        error("Failed to load cluster configuration")
701    set_obj = mkset_obj()
702    if action == 'replace':
703        cib_factory.erase()
704    if not set_obj.save(configuration, remove=False, method=action):
705        error("Failed to load cluster configuration")
706    if not cib_factory.commit():
707        error("Failed to commit cluster configuration")
708
709
710def wait_for_resource(message, resource):
711    """
712    Wait for resource started
713    """
714    with status_long(message):
715        while True:
716            # -r here to display inactive resources
717            # -R here to display individual clone instances
718            _rc, out, err = utils.get_stdout_stderr("crm_mon -1rR")
719            # Make sure clone instances also started(no Stopped instance)
720            if re.search(r"{}\s+.*:\s+Started\s".format(resource), out) and \
721                    not re.search(r"{}\s+.*:\s+(Stopped|Starting)".format(resource), out):
722                break
723            status_progress()
724            sleep(1)
725
726
727def wait_for_cluster():
728    with status_long("Waiting for cluster"):
729        while True:
730            _rc, out, _err = utils.get_stdout_stderr("crm_mon -1")
731            if is_online(out):
732                break
733            status_progress()
734            sleep(2)
735
736
737def get_cluster_node_hostname():
738    """
739    Get the hostname of the cluster node
740    """
741    peer_node = None
742    if _context.cluster_node:
743        rc, out, err = utils.get_stdout_stderr("ssh {} {} crm_node --name".format(SSH_OPTION, _context.cluster_node))
744        if rc != 0:
745            error(err)
746        peer_node = out
747    return peer_node
748
749
750def is_online(crm_mon_txt):
751    """
752    Check whether local node is online
753    Besides that, in join process, check whether init node is online
754    """
755    if not re.search("Online: .* {} ".format(utils.this_node()), crm_mon_txt):
756        return False
757
758    # if peer_node is None, this is in the init process
759    peer_node = get_cluster_node_hostname()
760    if peer_node is None:
761        return True
762    # In join process
763    # If the joining node is already online but can't find the init node
764    # The communication IP maybe mis-configured
765    if not re.search("Online: .* {} ".format(peer_node), crm_mon_txt):
766        shutil.copy(COROSYNC_CONF_ORIG, corosync.conf())
767        csync2_update(corosync.conf())
768        utils.stop_service("corosync")
769        print()
770        error("Cannot see peer node \"{}\", please check the communication IP".format(peer_node))
771    return True
772
773
774def pick_default_value(default_list, prev_list):
775    """
776    Provide default value for function 'prompt_for_string'.
777    Make sure give different default value in multi-ring mode.
778
779    Parameters:
780    * default_list - default value list for config item
781    * prev_list    - previous value for config item in multi-ring mode
782    """
783    for value in default_list:
784        if value not in prev_list:
785            return value
786    return ""
787
788
789def sleep(t):
790    """
791    Sleep for t seconds.
792    """
793    t = float(t)
794    time.sleep(t)
795
796
797def status(msg):
798    log("# " + msg)
799    if not _context.quiet:
800        print("  {}".format(msg))
801
802
803@contextmanager
804def status_long(msg):
805    log("# {}...".format(msg))
806    if not _context.quiet:
807        sys.stdout.write("  {}...".format(msg))
808        sys.stdout.flush()
809    try:
810        yield
811    except:
812        raise
813    else:
814        status_done()
815
816
817def status_progress():
818    if not _context.quiet:
819        sys.stdout.write(".")
820        sys.stdout.flush()
821
822
823def status_done():
824    log("# done")
825    if not _context.quiet:
826        print("done")
827
828
829def partprobe():
830    # This function uses fdisk to create a list of valid devices for probing
831    # with partprobe.  This prevents partprobe from failing on read-only mounted
832    # devices such as /dev/sr0 (etc) that might cause it to return an error when
833    # it exits.  This allows partprobe to run without forcing _die to bail out.
834    # -Brandon Heaton
835    #  ATT Training Engineer
836    #  Data Center Engineer
837    #  bheaton@suse.com
838    _rc, out, _err = utils.get_stdout_stderr("sfdisk -l")
839    disks = re.findall(r'^Disk\s*(/.+):', out, re.M)
840    invoke("partprobe", *disks)
841
842
843def probe_partitions():
844    # Need to do this if second (or subsequent) node happens to be up and
845    # connected to storage while it's being repartitioned on the first node.
846    with status_long("Probing for new partitions"):
847        partprobe()
848        sleep(5)
849
850
851def check_tty():
852    """
853    Check for pseudo-tty: Cannot display read prompts without a TTY (bnc#892702)
854    """
855    if _context.yes_to_all:
856        return
857    if not sys.stdin.isatty():
858        error("No pseudo-tty detected! Use -t option to ssh if calling remotely.")
859
860
861def my_hostname_resolves():
862    import socket
863    hostname = utils.this_node()
864    try:
865        socket.gethostbyname(hostname)
866        return True
867    except socket.error:
868        return False
869
870
871def check_prereqs(stage):
872    warned = False
873
874    if not my_hostname_resolves():
875        warn("Hostname '{}' is unresolvable. {}".format(
876            utils.this_node(),
877            "Please add an entry to /etc/hosts or configure DNS."))
878        warned = True
879
880    timekeepers = ('chronyd.service', 'ntp.service', 'ntpd.service')
881    timekeeper = None
882    for tk in timekeepers:
883        if utils.service_is_available(tk):
884            timekeeper = tk
885            break
886
887    if timekeeper is None:
888        warn("No NTP service found.")
889        warned = True
890    elif not utils.service_is_enabled(timekeeper):
891        warn("{} is not configured to start at system boot.".format(timekeeper))
892        warned = True
893
894    if warned:
895        if not confirm("Do you want to continue anyway?"):
896            return False
897
898    firewall_open_basic_ports()
899    return True
900
901
902def log_start():
903    """
904    Convenient side-effect: this will die immediately if the log file
905    is not writable (e.g. if not running as root)
906    """
907    # Reload rsyslog to make sure it logs with the correct hostname
908    if utils.service_is_active("rsyslog.service"):
909        invoke("systemctl reload rsyslog.service")
910    datestr = utils.get_stdout("date --rfc-3339=seconds")[1]
911    log('================================================================')
912    log("%s %s" % (datestr, " ".join(sys.argv)))
913    log('----------------------------------------------------------------')
914
915
916def init_network():
917    """
918    Get all needed network information through utils.InterfacesInfo
919    """
920    interfaces_inst = utils.InterfacesInfo(_context.ipv6, _context.second_heartbeat, _context.nic_list)
921    interfaces_inst.get_interfaces_info()
922    _context.default_nic_list = interfaces_inst.get_default_nic_list_from_route()
923    _context.default_ip_list = interfaces_inst.get_default_ip_list()
924
925    # local_ip_list and local_network_list are for validation
926    _context.local_ip_list = interfaces_inst.ip_list
927    _context.local_network_list = interfaces_inst.network_list
928    _context.interfaces_inst = interfaces_inst
929    # use two "-i" options equal to use "-M" option
930    if len(_context.default_nic_list) == 2 and not _context.second_heartbeat:
931        _context.second_heartbeat = True
932
933
934def configure_firewall(tcp=None, udp=None):
935    if tcp is None:
936        tcp = []
937    if udp is None:
938        udp = []
939
940    def init_firewall_suse(tcp, udp):
941        if os.path.exists(SYSCONFIG_FW_CLUSTER):
942            cluster = utils.parse_sysconfig(SYSCONFIG_FW_CLUSTER)
943            tcpcurr = set(cluster.get("TCP", "").split())
944            tcpcurr.update(tcp)
945            tcp = list(tcpcurr)
946            udpcurr = set(cluster.get("UDP", "").split())
947            udpcurr.update(udp)
948            udp = list(udpcurr)
949
950        utils.sysconfig_set(SYSCONFIG_FW_CLUSTER, TCP=" ".join(tcp), UDP=" ".join(udp))
951
952        ext = ""
953        if os.path.exists(SYSCONFIG_FW):
954            fw = utils.parse_sysconfig(SYSCONFIG_FW)
955            ext = fw.get("FW_CONFIGURATIONS_EXT", "")
956            if "cluster" not in ext.split():
957                ext = ext + " cluster"
958        utils.sysconfig_set(SYSCONFIG_FW, FW_CONFIGURATIONS_EXT=ext)
959
960        # No need to do anything else if the firewall is inactive
961        if not utils.service_is_active("SuSEfirewall2"):
962            return
963
964        # Firewall is active, either restart or complain if we couldn't tweak it
965        status("Restarting firewall (tcp={}, udp={})".format(" ".join(tcp), " ".join(udp)))
966        if not invokerc("rcSuSEfirewall2 restart"):
967            error("Failed to restart firewall (SuSEfirewall2)")
968
969    def init_firewall_firewalld(tcp, udp):
970        has_firewalld = utils.service_is_active("firewalld")
971        cmdbase = 'firewall-cmd --zone=public --permanent ' if has_firewalld else 'firewall-offline-cmd --zone=public '
972
973        def cmd(args):
974            if not invokerc(cmdbase + args):
975                error("Failed to configure firewall.")
976
977        for p in tcp:
978            cmd("--add-port={}/tcp".format(p))
979
980        for p in udp:
981            cmd("--add-port={}/udp".format(p))
982
983        if has_firewalld:
984            if not invokerc("firewall-cmd --reload"):
985                error("Failed to reload firewall configuration.")
986
987    def init_firewall_ufw(tcp, udp):
988        """
989        try configuring firewall with ufw
990        """
991        for p in tcp:
992            if not invokerc("ufw allow {}/tcp".format(p)):
993                error("Failed to configure firewall (ufw)")
994        for p in udp:
995            if not invokerc("ufw allow {}/udp".format(p)):
996                error("Failed to configure firewall (ufw)")
997
998    if utils.package_is_installed("firewalld"):
999        init_firewall_firewalld(tcp, udp)
1000    elif utils.package_is_installed("SuSEfirewall2"):
1001        init_firewall_suse(tcp, udp)
1002    elif utils.package_is_installed("ufw"):
1003        init_firewall_ufw(tcp, udp)
1004
1005
1006def firewall_open_basic_ports():
1007    """
1008    Open ports for csync2, mgmtd, hawk & dlm respectively
1009    """
1010    configure_firewall(tcp=["30865", "5560", "7630", "21064"])
1011
1012
1013def firewall_open_corosync_ports():
1014    """
1015    Have to do this separately, as we need general firewall config early
1016    so csync2 works, but need corosync config *after* corosync.conf has
1017    been created/updated.
1018
1019    Please note corosync uses two UDP ports mcastport (for mcast
1020    receives) and mcastport - 1 (for mcast sends).
1021
1022    Also open QNetd/QDevice port if configured.
1023    """
1024    # all mcastports defined in corosync config
1025    udp = corosync.get_values("totem.interface.mcastport")
1026    udp.extend([str(int(p) - 1) for p in udp])
1027
1028    tcp = corosync.get_values("totem.quorum.device.net.port")
1029
1030    configure_firewall(tcp=tcp, udp=udp)
1031
1032
1033def init_cluster_local():
1034    # Caller should check this, but I'm paranoid...
1035    if utils.service_is_active("corosync.service"):
1036        error("corosync service is running!")
1037
1038    firewall_open_corosync_ports()
1039
1040    # reset password, but only if it's not already set
1041    _rc, outp = utils.get_stdout("passwd -S hacluster")
1042    ps = outp.strip().split()[1]
1043    pass_msg = ""
1044    if ps not in ("P", "PS"):
1045        log(': Resetting password of hacluster user')
1046        rc, outp, errp = utils.get_stdout_stderr("passwd hacluster", input_s=b"linux\nlinux\n")
1047        if rc != 0:
1048            warn("Failed to reset password of hacluster user: %s" % (outp + errp))
1049        else:
1050            pass_msg = ", password 'linux'"
1051
1052    # evil, but necessary
1053    invoke("rm -f /var/lib/heartbeat/crm/* /var/lib/pacemaker/cib/*")
1054
1055    # only try to start hawk if hawk is installed
1056    if utils.service_is_available("hawk.service"):
1057        utils.start_service("hawk.service", enable=True)
1058        status("Hawk cluster interface is now running. To see cluster status, open:")
1059        status("  https://{}:7630/".format(_context.default_ip_list[0]))
1060        status("Log in with username 'hacluster'{}".format(pass_msg))
1061    else:
1062        warn("Hawk not installed - not configuring web management interface.")
1063
1064    if pass_msg:
1065        warn("You should change the hacluster password to something more secure!")
1066
1067    utils.start_service("pacemaker.service", enable=True)
1068    wait_for_cluster()
1069
1070
1071def install_tmp(tmpfile, to):
1072    with open(tmpfile, "r") as src:
1073        with utils.open_atomic(to, "w") as dst:
1074            for line in src:
1075                dst.write(line)
1076
1077
1078def append(fromfile, tofile):
1079    log("+ cat %s >> %s" % (fromfile, tofile))
1080    with open(tofile, "a") as tf:
1081        with open(fromfile, "r") as ff:
1082            tf.write(ff.read())
1083
1084
1085def append_unique(fromfile, tofile):
1086    """
1087    Append unique content from fromfile to tofile
1088    """
1089    if not utils.check_file_content_included(fromfile, tofile):
1090        append(fromfile, tofile)
1091
1092
1093def rmfile(path, ignore_errors=False):
1094    """
1095    Try to remove the given file, and
1096    report an error on failure
1097    """
1098    try:
1099        os.remove(path)
1100    except os.error as err:
1101        if not ignore_errors:
1102            error("Failed to remove {}: {}".format(path, err))
1103
1104
1105def mkdirs_owned(dirs, mode=0o777, uid=-1, gid=-1):
1106    """
1107    Create directory path, setting the mode and
1108    ownership of the leaf directory to mode/uid/gid.
1109    """
1110    if not os.path.exists(dirs):
1111        try:
1112            os.makedirs(dirs, mode)
1113        except OSError as err:
1114            error("Failed to create {}: {}".format(dirs, err))
1115        if uid != -1 or gid != -1:
1116            utils.chown(dirs, uid, gid)
1117
1118
1119def init_ssh():
1120    """
1121    Configure passwordless SSH.
1122    """
1123    utils.start_service("sshd.service", enable=True)
1124    for user in USER_LIST:
1125        configure_local_ssh_key(user)
1126
1127
1128def key_files(user):
1129    """
1130    Find home directory for user and return key files with abspath
1131    """
1132    keyfile_dict = {}
1133    home_dir = userdir.gethomedir(user)
1134    keyfile_dict['private'] = "{}/.ssh/id_rsa".format(home_dir)
1135    keyfile_dict['public'] = "{}/.ssh/id_rsa.pub".format(home_dir)
1136    keyfile_dict['authorized'] = "{}/.ssh/authorized_keys".format(home_dir)
1137    return keyfile_dict
1138
1139
1140def is_nologin(user):
1141    """
1142    Check if user's shell is /sbin/nologin
1143    """
1144    with open("/etc/passwd") as f:
1145        return re.search("{}:.*:/sbin/nologin".format(user), f.read())
1146
1147
1148def change_user_shell(user):
1149    """
1150    To change user's login shell
1151    """
1152    if user != "root" and is_nologin(user):
1153        if not _context.yes_to_all:
1154            status("""
1155User {} will be changed the login shell as /bin/bash, and
1156be setted up authorized ssh access among cluster nodes""".format(user))
1157            if not confirm("Continue?"):
1158                _context.with_other_user = False
1159                return
1160        invoke("usermod -s /bin/bash {}".format(user))
1161
1162
1163def configure_local_ssh_key(user="root"):
1164    """
1165    Configure ssh rsa key locally
1166
1167    If <home_dir>/.ssh/id_rsa not exist, generate a new one
1168    Add <home_dir>/.ssh/id_rsa.pub to <home_dir>/.ssh/authorized_keys anyway, make sure itself authorized
1169    """
1170    change_user_shell(user)
1171
1172    private_key, public_key, authorized_file = key_files(user).values()
1173    if not os.path.exists(private_key):
1174        status("Generating SSH key for {}".format(user))
1175        cmd = "ssh-keygen -q -f {} -C 'Cluster Internal on {}' -N ''".format(private_key, utils.this_node())
1176        cmd = utils.add_su(cmd, user)
1177        rc, _, err = invoke(cmd)
1178        if not rc:
1179            error("Failed to generate ssh key for {}: {}".format(user, err))
1180
1181    if not os.path.exists(authorized_file):
1182        open(authorized_file, 'w').close()
1183    append_unique(public_key, authorized_file)
1184
1185
1186def init_ssh_remote():
1187    """
1188    Called by ha-cluster-join
1189    """
1190    authorized_keys_file = "/root/.ssh/authorized_keys"
1191    if not os.path.exists(authorized_keys_file):
1192        open(authorized_keys_file, 'w').close()
1193    authkeys = open(authorized_keys_file, "r+")
1194    authkeys_data = authkeys.read()
1195    for key in ("id_rsa", "id_dsa", "id_ecdsa", "id_ed25519"):
1196        fn = os.path.join("/root/.ssh", key)
1197        if not os.path.exists(fn):
1198            continue
1199        keydata = open(fn + ".pub").read()
1200        if keydata not in authkeys_data:
1201            append(fn + ".pub", authorized_keys_file)
1202
1203
1204def append_to_remote_file(fromfile, remote_node, tofile):
1205    """
1206    Append content of fromfile to tofile on remote_node
1207    """
1208    err_details_string = """
1209    crmsh has no way to help you to setup up passwordless ssh among nodes at this time.
1210    As the hint, likely, `PasswordAuthentication` is 'no' in /etc/ssh/sshd_config.
1211    Given in this case, users must setup passwordless ssh beforehand, or change it to 'yes' and manage passwords properly
1212    """
1213    cmd = "cat {} | ssh {} root@{} 'cat >> {}'".format(fromfile, SSH_OPTION, remote_node, tofile)
1214    rc, _, err = invoke(cmd)
1215    if not rc:
1216        error("Failed to append contents of {} to {}:\n\"{}\"\n{}".format(fromfile, remote_node, err, err_details_string))
1217
1218
1219def init_csync2():
1220    status("Configuring csync2")
1221    if os.path.exists(CSYNC2_KEY):
1222        if not confirm("csync2 is already configured - overwrite?"):
1223            return
1224
1225    invoke("rm", "-f", CSYNC2_KEY)
1226    with status_long("Generating csync2 shared key (this may take a while)"):
1227        if not invokerc("csync2", "-k", CSYNC2_KEY):
1228            error("Can't create csync2 key {}".format(CSYNC2_KEY))
1229
1230    csync2_file_list = ""
1231    for f in FILES_TO_SYNC:
1232        csync2_file_list += "include {};\n".format(f)
1233
1234    utils.str2file("""group ha_group
1235{{
1236key /etc/csync2/key_hagroup;
1237host {};
1238{}
1239}}
1240    """.format(utils.this_node(), csync2_file_list), CSYNC2_CFG)
1241
1242    utils.start_service("csync2.socket", enable=True)
1243    with status_long("csync2 checking files"):
1244        invoke("csync2", "-cr", "/")
1245
1246
1247def csync2_update(path):
1248    '''
1249    Sync path to all peers
1250
1251    If there was a conflict, use '-f' to force this side to win
1252    '''
1253    invoke("csync2 -rm {}".format(path))
1254    if invokerc("csync2 -rxv {}".format(path)):
1255        return
1256    invoke("csync2 -rf {}".format(path))
1257    if not invokerc("csync2 -rxv {}".format(path)):
1258        warn("{} was not synced".format(path))
1259
1260
1261def init_csync2_remote():
1262    """
1263    It would be nice if we could just have csync2.cfg include a directory,
1264    which in turn included one file per node which would be referenced via
1265    something like "group ha_group { ... config: /etc/csync2/hosts/*; }"
1266    That way, adding a new node would just mean adding a single new file
1267    to that directory.  Unfortunately, the 'config' statement only allows
1268    inclusion of specific individual files, not multiple files via wildcard.
1269    So we have this function which is called by ha-cluster-join to add the new
1270    remote node to csync2 config on some existing node.  It is intentionally
1271    not documented in ha-cluster-init's user-visible usage information.
1272    """
1273    newhost = _context.cluster_node
1274    if not newhost:
1275        error("Hostname not specified")
1276
1277    curr_cfg = open(CSYNC2_CFG).read()
1278
1279    was_quiet = _context.quiet
1280    try:
1281        _context.quiet = True
1282        # if host doesn't already exist in csync2 config, add it
1283        if not re.search(r"^\s*host.*\s+%s\s*;" % (newhost), curr_cfg, flags=re.M):
1284            curr_cfg = re.sub(r"\bhost.*\s+\S+\s*;", r"\g<0>\n\thost %s;" % (utils.doublequote(newhost)), curr_cfg, count=1)
1285            utils.str2file(curr_cfg, CSYNC2_CFG)
1286            csync2_update("/")
1287        else:
1288            log(": Not updating %s - remote host %s already exists" % (CSYNC2_CFG, newhost))
1289    finally:
1290        _context.quiet = was_quiet
1291
1292
1293def init_corosync_auth():
1294    """
1295    Generate the corosync authkey
1296    """
1297    if os.path.exists(COROSYNC_AUTH):
1298        if not confirm("%s already exists - overwrite?" % (COROSYNC_AUTH)):
1299            return
1300        rmfile(COROSYNC_AUTH)
1301    invoke("corosync-keygen -l -k {}".format(COROSYNC_AUTH))
1302
1303
1304def init_remote_auth():
1305    """
1306    Generate the pacemaker-remote authkey
1307    """
1308    if os.path.exists(PCMK_REMOTE_AUTH):
1309        if not confirm("%s already exists - overwrite?" % (PCMK_REMOTE_AUTH)):
1310            return
1311        rmfile(PCMK_REMOTE_AUTH)
1312
1313    pcmk_remote_dir = os.path.dirname(PCMK_REMOTE_AUTH)
1314    mkdirs_owned(pcmk_remote_dir, mode=0o750, gid="haclient")
1315    if not invokerc("dd if=/dev/urandom of={} bs=4096 count=1".format(PCMK_REMOTE_AUTH)):
1316        warn("Failed to create pacemaker authkey: {}".format(PCMK_REMOTE_AUTH))
1317    utils.chown(PCMK_REMOTE_AUTH, "hacluster", "haclient")
1318    os.chmod(PCMK_REMOTE_AUTH, 0o640)
1319
1320
1321class Validation(object):
1322    """
1323    Class to validate values from interactive inputs
1324    """
1325
1326    def __init__(self, value, prev_value_list=[]):
1327        """
1328        Init function
1329        """
1330        self.value = value
1331        self.prev_value_list = prev_value_list
1332        if self.value in self.prev_value_list:
1333            raise ValueError("Already in use: {}".format(self.value))
1334
1335    def _is_mcast_addr(self):
1336        """
1337        Check whether the address is multicast address
1338        """
1339        if not utils.IP.is_mcast(self.value):
1340            raise ValueError("{} is not multicast address".format(self.value))
1341
1342    def _is_local_addr(self, local_addr_list):
1343        """
1344        Check whether the address is in local
1345        """
1346        if self.value not in local_addr_list:
1347            raise ValueError("Address must be a local address (one of {})".format(local_addr_list))
1348
1349    def _is_valid_port(self):
1350        """
1351        Check whether the port is valid
1352        """
1353        if self.prev_value_list and abs(int(self.value) - int(self.prev_value_list[0])) <= 1:
1354            raise ValueError("Port {} is already in use by corosync. Leave a gap between multiple rings.".format(self.value))
1355        if int(self.value) <= 1024 or int(self.value) > 65535:
1356            raise ValueError("Valid port range should be 1025-65535")
1357
1358    @classmethod
1359    def valid_mcast_address(cls, addr, prev_value_list=[]):
1360        """
1361        Check whether the address is already in use and whether the address is for multicast
1362        """
1363        cls_inst = cls(addr, prev_value_list)
1364        cls_inst._is_mcast_addr()
1365
1366    @classmethod
1367    def valid_ucast_ip(cls, addr, prev_value_list=[]):
1368        """
1369        Check whether the address is already in use and whether the address exists on local
1370        """
1371        cls_inst = cls(addr, prev_value_list)
1372        cls_inst._is_local_addr(_context.local_ip_list)
1373
1374    @classmethod
1375    def valid_mcast_ip(cls, addr, prev_value_list=[]):
1376        """
1377        Check whether the address is already in use and whether the address exists on local address and network
1378        """
1379        cls_inst = cls(addr, prev_value_list)
1380        cls_inst._is_local_addr(_context.local_ip_list + _context.local_network_list)
1381
1382    @classmethod
1383    def valid_port(cls, port, prev_value_list=[]):
1384        """
1385        Check whether the port is valid
1386        """
1387        cls_inst = cls(port, prev_value_list)
1388        cls_inst._is_valid_port()
1389
1390    @staticmethod
1391    def valid_admin_ip(addr, prev_value_list=[]):
1392        """
1393        Validate admin IP address
1394        """
1395        ipv6 = utils.IP.is_ipv6(addr)
1396
1397        # Check whether this IP already configured in cluster
1398        ping_cmd = "ping6" if ipv6 else "ping"
1399        if invokerc("{} -c 1 {}".format(ping_cmd, addr)):
1400            raise ValueError("Address already in use: {}".format(addr))
1401
1402
1403def init_corosync_unicast():
1404
1405    if _context.yes_to_all:
1406        status("Configuring corosync (unicast)")
1407    else:
1408        status("""
1409Configure Corosync (unicast):
1410  This will configure the cluster messaging layer.  You will need
1411  to specify a network address over which to communicate (default
1412  is {}'s network, but you can use the network address of any
1413  active interface).
1414""".format(_context.default_nic_list[0]))
1415
1416    ringXaddr_res = []
1417    mcastport_res = []
1418    default_ports = ["5405", "5407"]
1419    two_rings = False
1420
1421    for i in range(2):
1422        ringXaddr = prompt_for_string(
1423                'Address for ring{}'.format(i),
1424                default=pick_default_value(_context.default_ip_list, ringXaddr_res),
1425                valid_func=Validation.valid_ucast_ip,
1426                prev_value=ringXaddr_res)
1427        if not ringXaddr:
1428            error("No value for ring{}".format(i))
1429        ringXaddr_res.append(ringXaddr)
1430
1431        mcastport = prompt_for_string(
1432                'Port for ring{}'.format(i),
1433                match='[0-9]+',
1434                default=pick_default_value(default_ports, mcastport_res),
1435                valid_func=Validation.valid_port,
1436                prev_value=mcastport_res)
1437        if not mcastport:
1438            error("Expected a multicast port for ring{}".format(i))
1439        mcastport_res.append(mcastport)
1440
1441        if i == 1 or \
1442           not _context.second_heartbeat or \
1443           not confirm("\nAdd another heartbeat line?"):
1444            break
1445        two_rings = True
1446
1447    corosync.create_configuration(
1448            clustername=_context.cluster_name,
1449            ringXaddr=ringXaddr_res,
1450            mcastport=mcastport_res,
1451            transport="udpu",
1452            ipv6=_context.ipv6,
1453            two_rings=two_rings)
1454    csync2_update(corosync.conf())
1455
1456
1457def init_corosync_multicast():
1458    def gen_mcastaddr():
1459        if _context.ipv6:
1460            return "ff3e::%s:%d" % (
1461                ''.join([random.choice('0123456789abcdef') for _ in range(4)]),
1462                random.randint(0, 9))
1463        return "239.%d.%d.%d" % (
1464            random.randint(0, 255),
1465            random.randint(0, 255),
1466            random.randint(1, 255))
1467
1468    if _context.yes_to_all:
1469        status("Configuring corosync")
1470    else:
1471        status("""
1472Configure Corosync:
1473  This will configure the cluster messaging layer.  You will need
1474  to specify a network address over which to communicate (default
1475  is {}'s network, but you can use the network address of any
1476  active interface).
1477""".format(_context.default_nic_list[0]))
1478
1479    bindnetaddr_res = []
1480    mcastaddr_res = []
1481    mcastport_res = []
1482    default_ports = ["5405", "5407"]
1483    two_rings = False
1484
1485    for i in range(2):
1486        bindnetaddr = prompt_for_string(
1487                'IP or network address to bind to',
1488                default=pick_default_value(_context.default_ip_list, bindnetaddr_res),
1489                valid_func=Validation.valid_mcast_ip,
1490                prev_value=bindnetaddr_res)
1491        if not bindnetaddr:
1492            error("No value for bindnetaddr")
1493        bindnetaddr_res.append(bindnetaddr)
1494
1495        mcastaddr = prompt_for_string(
1496                'Multicast address',
1497                default=gen_mcastaddr(),
1498                valid_func=Validation.valid_mcast_address,
1499                prev_value=mcastaddr_res)
1500        if not mcastaddr:
1501            error("No value for mcastaddr")
1502        mcastaddr_res.append(mcastaddr)
1503
1504        mcastport = prompt_for_string(
1505                'Multicast port',
1506                match='[0-9]+',
1507                default=pick_default_value(default_ports, mcastport_res),
1508                valid_func=Validation.valid_port,
1509                prev_value=mcastport_res)
1510        if not mcastport:
1511            error("No value for mcastport")
1512        mcastport_res.append(mcastport)
1513
1514        if i == 1 or \
1515           not _context.second_heartbeat or \
1516           not confirm("\nConfigure a second multicast ring?"):
1517            break
1518        two_rings = True
1519
1520    nodeid = None
1521    if _context.ipv6:
1522        nodeid = utils.gen_nodeid_from_ipv6(_context.default_ip_list[0])
1523
1524    corosync.create_configuration(
1525        clustername=_context.cluster_name,
1526        bindnetaddr=bindnetaddr_res,
1527        mcastaddr=mcastaddr_res,
1528        mcastport=mcastport_res,
1529        ipv6=_context.ipv6,
1530        nodeid=nodeid,
1531        two_rings=two_rings)
1532    csync2_update(corosync.conf())
1533
1534
1535def init_corosync():
1536    """
1537    Configure corosync (unicast or multicast, encrypted?)
1538    """
1539    def requires_unicast():
1540        host = utils.detect_cloud()
1541        if host is not None:
1542            status("Detected cloud platform: {}".format(host))
1543        return host is not None
1544
1545    init_corosync_auth()
1546
1547    if os.path.exists(corosync.conf()):
1548        if not confirm("%s already exists - overwrite?" % (corosync.conf())):
1549            return
1550
1551    if _context.unicast or requires_unicast():
1552        init_corosync_unicast()
1553    else:
1554        init_corosync_multicast()
1555
1556
1557def init_sbd():
1558    """
1559    Configure SBD (Storage-based fencing).
1560
1561    SBD can also run in diskless mode if no device
1562    is configured.
1563    """
1564    _context.sbd_manager.sbd_init()
1565
1566
1567def init_ocfs2():
1568    """
1569    OCFS2 configure process
1570    """
1571    if not _context.ocfs2_devices:
1572        return
1573    ocfs2_manager = ocfs2.OCFS2Manager(_context)
1574    ocfs2_manager.init_ocfs2()
1575
1576
1577def init_cluster():
1578    """
1579    Initial cluster configuration.
1580    """
1581    init_cluster_local()
1582
1583    _rc, nnodes = utils.get_stdout("crm_node -l")
1584    nnodes = len(nnodes.splitlines())
1585    if nnodes < 1:
1586        error("No nodes found in cluster")
1587    if nnodes > 1:
1588        error("Joined existing cluster - will not reconfigure.")
1589
1590    status("Loading initial cluster configuration")
1591
1592    crm_configure_load("update", """
1593property cib-bootstrap-options: stonith-enabled=false
1594op_defaults op-options: timeout=600 record-pending=true
1595rsc_defaults rsc-options: resource-stickiness=1 migration-threshold=3
1596""")
1597
1598    _context.sbd_manager.configure_sbd_resource()
1599
1600
1601def init_admin():
    # Skip this section when -y is passed,
    # unless an administration IP was given on the command line
1604    adminaddr = _context.admin_ip
1605    if _context.yes_to_all and not adminaddr:
1606        return
1607
1608    if not adminaddr:
1609        status("""
1610Configure Administration IP Address:
1611  Optionally configure an administration virtual IP
1612  address. The purpose of this IP address is to
1613  provide a single IP that can be used to interact
1614  with the cluster, rather than using the IP address
1615  of any specific cluster node.
1616""")
1617        if not confirm("Do you wish to configure a virtual IP address?"):
1618            return
1619
1620        adminaddr = prompt_for_string('Virtual IP', valid_func=Validation.valid_admin_ip)
1621        if not adminaddr:
1622            error("Expected an IP address")
1623
1624    crm_configure_load("update", 'primitive admin-ip IPaddr2 ip=%s op monitor interval=10 timeout=20' % (utils.doublequote(adminaddr)))
1625    wait_for_resource("Configuring virtual IP ({})".format(adminaddr), "admin-ip")
1626
1627
1628def evaluate_qdevice_quorum_effect(mode, diskless_sbd=False):
1629    """
    When adding or removing a qdevice, take the current expected votes and
    actual total votes, work out whether the cluster will still have quorum
    afterwards, and return the appropriate QdevicePolicy.
1633    """
1634    quorum_votes_dict = utils.get_quorum_votes_dict()
1635    expected_votes = int(quorum_votes_dict["Expected"])
1636    actual_votes = int(quorum_votes_dict["Total"])
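    # Illustrative example (hypothetical values): on a healthy two-node
    # cluster the reported votes are Expected=2, Total=2.  Adding a qdevice
    # raises expected_votes to 3 while actual_votes stays at 2, which is
    # still a majority, so (assuming the usual majority check in
    # utils.is_quorate) QDEVICE_RELOAD is returned, unless diskless SBD is
    # in use.  If a node were offline (Total=1), quorum would be lost and
    # the policy falls back to QDEVICE_RESTART_LATER when resources are
    # running, or QDEVICE_RESTART otherwise.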
1637    if mode == QDEVICE_ADD:
1638        expected_votes += 1
1639    elif mode == QDEVICE_REMOVE:
1640        actual_votes -= 1
1641
1642    if utils.is_quorate(expected_votes, actual_votes) and not diskless_sbd:
1643        # safe to use reload
1644        return QdevicePolicy.QDEVICE_RELOAD
1645    elif utils.has_resource_running():
1646        # will lose quorum, and with RA running
1647        # no reload, no restart cluster service
1648        # just leave a warning
1649        return QdevicePolicy.QDEVICE_RESTART_LATER
1650    else:
1651        # will lose quorum, without RA running
1652        # safe to restart cluster service
1653        return QdevicePolicy.QDEVICE_RESTART
1654
1655
1656def configure_qdevice_interactive():
1657    """
    Configure qdevice in interactive mode
1659    """
1660    if _context.yes_to_all:
1661        return
1662    status("\nConfigure Qdevice/Qnetd:\n" + QDEVICE_HELP_INFO + "\n")
1663    if not confirm("Do you want to configure QDevice?"):
1664        return
1665    qnetd_addr = prompt_for_string("HOST or IP of the QNetd server to be used")
1666    if not qnetd_addr:
1667        error("Address of QNetd is required")
1668    qdevice_port = prompt_for_string("TCP PORT of QNetd server", default=5403)
1669    qdevice_algo = prompt_for_string("QNetd decision ALGORITHM (ffsplit/lms)", default="ffsplit")
1670    qdevice_tie_breaker = prompt_for_string("QNetd TIE_BREAKER (lowest/highest/valid node id)", default="lowest")
1671    qdevice_tls = prompt_for_string("Whether using TLS on QDevice/QNetd (on/off/required)", default="on")
1672    qdevice_heuristics = prompt_for_string("Heuristics COMMAND to run with absolute path; For multiple commands, use \";\" to separate")
1673    qdevice_heuristics_mode = prompt_for_string("MODE of operation of heuristics (on/sync/off)", default="sync") if qdevice_heuristics else None
1674    _context.qdevice_inst = corosync.QDevice(
1675            qnetd_addr,
1676            port=qdevice_port,
1677            algo=qdevice_algo,
1678            tie_breaker=qdevice_tie_breaker,
1679            tls=qdevice_tls,
1680            cmds=qdevice_heuristics,
1681            mode=qdevice_heuristics_mode)
1682    _context.qdevice_inst.valid_attr()
1683
1684
1685def init_qdevice():
1686    """
1687    Setup qdevice and qnetd service
1688    """
1689    if not _context.qdevice_inst:
1690        configure_qdevice_interactive()
    # If the user chose not to configure qdevice, disable its service and return
1692    if not _context.qdevice_inst:
1693        utils.disable_service("corosync-qdevice.service")
1694        return
1695    if _context.stage == "qdevice":
1696        utils.check_all_nodes_reachable()
1697        using_diskless_sbd = SBDManager.is_using_diskless_sbd()
1698        _context.qdevice_reload_policy = evaluate_qdevice_quorum_effect(QDEVICE_ADD, using_diskless_sbd)
        # When diskless SBD is in use, adjust the SBD watchdog and stonith
        # timeouts before adding qdevice
1700        if using_diskless_sbd:
1701            res = SBDManager.get_sbd_value_from_config("SBD_WATCHDOG_TIMEOUT")
1702            if res:
1703                sbd_watchdog_timeout = max(int(res), SBDManager.SBD_WATCHDOG_TIMEOUT_DEFAULT_WITH_QDEVICE)
1704            else:
1705                sbd_watchdog_timeout = SBDManager.SBD_WATCHDOG_TIMEOUT_DEFAULT_WITH_QDEVICE
1706            stonith_timeout = SBDManager.calculate_stonith_timeout(sbd_watchdog_timeout)
1707            SBDManager.update_configuration({"SBD_WATCHDOG_TIMEOUT": str(sbd_watchdog_timeout)})
1708            invokerc("crm configure property stonith-watchdog-timeout=-1 stonith-timeout={}s".format(stonith_timeout))
1709
1710    status("""
1711Configure Qdevice/Qnetd:""")
1712    qdevice_inst = _context.qdevice_inst
1713    qnetd_addr = qdevice_inst.qnetd_addr
    # Configure passwordless ssh to the qnetd node if a password is currently required
1715    if utils.check_ssh_passwd_need(qnetd_addr):
1716        status("Copy ssh key to qnetd node({})".format(qnetd_addr))
1717        rc, _, err = invoke("ssh-copy-id -i /root/.ssh/id_rsa.pub root@{}".format(qnetd_addr))
1718        if not rc:
1719            error("Failed to copy ssh key: {}".format(err))
    # If qdevice is already configured and should not be overwritten, just start its service
1721    if utils.is_qdevice_configured() and not confirm("Qdevice is already configured - overwrite?"):
1722        start_qdevice_service()
1723        return
1724
1725    # Validate qnetd node
1726    qdevice_inst.valid_qnetd()
1727    # Config qdevice
1728    config_qdevice()
1729    # Execute certificate process when tls flag is on
1730    if utils.is_qdevice_tls_on():
1731        with status_long("Qdevice certification process"):
1732            qdevice_inst.certificate_process_on_init()
1733
1734    start_qdevice_service()
1735
1736
1737def start_qdevice_service():
1738    """
1739    Start qdevice and qnetd service
1740    """
1741    qdevice_inst = _context.qdevice_inst
1742    qnetd_addr = qdevice_inst.qnetd_addr
1743
1744    status("Enable corosync-qdevice.service in cluster")
1745    utils.cluster_run_cmd("systemctl enable corosync-qdevice")
1746    if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD:
1747        status("Starting corosync-qdevice.service in cluster")
1748        utils.cluster_run_cmd("systemctl restart corosync-qdevice")
1749    elif _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RESTART:
1750        status("Restarting cluster service")
1751        utils.cluster_run_cmd("crm cluster restart")
1752        wait_for_cluster()
1753    else:
        warn("To use the qdevice service, the cluster service must be restarted manually on each node")
1755
1756    status("Enable corosync-qnetd.service on {}".format(qnetd_addr))
1757    qdevice_inst.enable_qnetd()
1758    status("Starting corosync-qnetd.service on {}".format(qnetd_addr))
1759    qdevice_inst.start_qnetd()
1760
1761
1762def config_qdevice():
1763    """
    Configure qdevice
1765    """
1766    qdevice_inst = _context.qdevice_inst
1767
1768    qdevice_inst.remove_qdevice_db()
1769    qdevice_inst.write_qdevice_config()
1770    if not corosync.is_unicast():
1771        corosync.add_nodelist_from_cmaptool()
1772    with status_long("Update configuration"):
1773        update_expected_votes()
1774        if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD:
1775            utils.cluster_run_cmd("crm corosync reload")
1776
1777
1778def init():
1779    """
1780    Basic init
1781    """
1782    log_start()
1783    init_network()
1784
1785
1786def join_ssh(seed_host):
1787    """
1788    SSH configuration for joining node.
1789    """
1790    if not seed_host:
1791        error("No existing IP/hostname specified (use -c option)")
1792
1793    utils.start_service("sshd.service", enable=True)
1794    for user in USER_LIST:
1795        configure_local_ssh_key(user)
1796        swap_public_ssh_key(seed_host, user)
1797
1798    # This makes sure the seed host has its own SSH keys in its own
1799    # authorized_keys file (again, to help with the case where the
1800    # user has done manual initial setup without the assistance of
1801    # ha-cluster-init).
1802    rc, _, err = invoke("ssh {} root@{} crm cluster init -i {} ssh_remote".format(SSH_OPTION, seed_host, _context.default_nic_list[0]))
1803    if not rc:
1804        error("Can't invoke crm cluster init -i {} ssh_remote on {}: {}".format(_context.default_nic_list[0], seed_host, err))
1805
1806
1807def swap_public_ssh_key(remote_node, user="root"):
1808    """
1809    Swap public ssh key between remote_node and local
1810    """
1811    if user != "root" and not _context.with_other_user:
1812        return
1813
1814    _, public_key, authorized_file = key_files(user).values()
    # Detect whether a password is needed to log in to remote_node
1816    if utils.check_ssh_passwd_need(remote_node, user):
        # If passwordless login is not yet configured, append the local public key to remote_node's authorized_keys
1818        status("Configuring SSH passwordless with {}@{}".format(user, remote_node))
        # After this, logging in to remote_node is passwordless
1820        append_to_remote_file(public_key, remote_node, authorized_file)
1821
1822    try:
1823        # Fetch public key file from remote_node
1824        public_key_file_remote = fetch_public_key_from_remote_node(remote_node, user)
1825    except ValueError as err:
1826        warn(err)
1827        return
    # Append the public key fetched from remote_node to the local authorized_keys file.
    # After this, logging in from remote_node is passwordless.
    # Do this even if passwordless login already works in one direction, to make sure it works both ways.
1831    append_unique(public_key_file_remote, authorized_file)
1832
1833
1834def fetch_public_key_from_remote_node(node, user="root"):
1835    """
    Fetch a public key file from the remote node.
    Return a temp file containing the public key.
    Raise ValueError if no key exists.
1839    """
1840
1841    # For dsa, might need to add PubkeyAcceptedKeyTypes=+ssh-dss to config file, see
1842    # https://superuser.com/questions/1016989/ssh-dsa-keys-no-longer-work-for-password-less-authentication
1843    home_dir = userdir.gethomedir(user)
1844    for key in ("id_rsa", "id_ecdsa", "id_ed25519", "id_dsa"):
1845        public_key_file = "{}/.ssh/{}.pub".format(home_dir, key)
1846        cmd = "ssh {} root@{} 'test -f {}'".format(SSH_OPTION, node, public_key_file)
1847        if not invokerc(cmd):
1848            continue
1849        _, temp_public_key_file = tmpfiles.create()
1850        cmd = "scp {} root@{}:{} {}".format(SSH_OPTION, node, public_key_file, temp_public_key_file)
1851        rc, _, err = invoke(cmd)
1852        if not rc:
1853            error("Failed to run \"{}\": {}".format(cmd, err))
1854        return temp_public_key_file
    raise ValueError("No ssh key exists on {}".format(node))
1856
1857
1858def join_csync2(seed_host):
1859    """
1860    Csync2 configuration for joining node.
1861    """
1862    if not seed_host:
1863        error("No existing IP/hostname specified (use -c option)")
1864    with status_long("Configuring csync2"):
1865
1866        # Necessary if re-running join on a node that's been configured before.
1867        rmfile("/var/lib/csync2/{}.db3".format(utils.this_node()), ignore_errors=True)
1868
1869        # Not automatically updating /etc/hosts - risky in the general case.
1870        # etc_hosts_add_me
1871        # local hosts_line=$(etc_hosts_get_me)
1872        # [ -n "$hosts_line" ] || error "No valid entry for $(hostname) in /etc/hosts - csync2 can't work"
1873
1874        # If we *were* updating /etc/hosts, the next line would have "\"$hosts_line\"" as
1875        # the last arg (but this requires re-enabling this functionality in ha-cluster-init)
1876        cmd = "crm cluster init -i {} csync2_remote {}".format(_context.default_nic_list[0], utils.this_node())
1877        rc, _, err = invoke("ssh {} root@{} {}".format(SSH_OPTION, seed_host, cmd))
1878        if not rc:
1879            error("Can't invoke \"{}\" on {}: {}".format(cmd, seed_host, err))
1880
1881        # This is necessary if syncing /etc/hosts (to ensure everyone's got the
1882        # same list of hosts)
1883        # local tmp_conf=/etc/hosts.$$
1884        # invoke scp root@seed_host:/etc/hosts $tmp_conf \
1885        #   || error "Can't retrieve /etc/hosts from seed_host"
1886        # install_tmp $tmp_conf /etc/hosts
1887        rc, _, err = invoke("scp root@%s:'/etc/csync2/{csync2.cfg,key_hagroup}' /etc/csync2" % (seed_host))
1888        if not rc:
1889            error("Can't retrieve csync2 config from {}: {}".format(seed_host, err))
1890
1891        utils.start_service("csync2.socket", enable=True)
1892
        # Sync new config out.  This goes to all hosts; csync2.cfg definitely
        # needs to go to all hosts (else hosts other than the seed and the
        # joining host won't have the joining host in their config yet).
        # Strictly, the rest of the files need only go to the new host, which
        # could theoretically be effected using `csync2 -xv -P $(hostname)`,
        # but this still leaves all the other files in a dirty state (because
        # they haven't gone to all nodes in the cluster), which means a
        # subsequent join of another node can fail its sync of corosync.conf
        # when it updates expected_votes.  Grrr...
1902        if not invokerc('ssh {} root@{} "csync2 -rm /; csync2 -rxv || csync2 -rf / && csync2 -rxv"'.format(SSH_OPTION, seed_host)):
1903            print("")
1904            warn("csync2 run failed - some files may not be sync'd")
1905
1906
1907def join_ssh_merge(_cluster_node):
1908    status("Merging known_hosts")
1909
1910    me = utils.this_node()
1911    hosts = [m.group(1)
1912             for m in re.finditer(r"^\s*host\s*([^ ;]+)\s*;", open(CSYNC2_CFG).read(), re.M)
1913             if m.group(1) != me]
1914    if not hosts:
1915        hosts = [_cluster_node]
1916        warn("Unable to extract host list from %s" % (CSYNC2_CFG))
1917
1918    try:
1919        import parallax
1920    except ImportError:
1921        error("parallax python library is missing")
1922
1923    opts = parallax.Options()
1924    opts.ssh_options = ['StrictHostKeyChecking=no']
1925
1926    # The act of using pssh to connect to every host (without strict host key
1927    # checking) ensures that at least *this* host has every other host in its
1928    # known_hosts
1929    known_hosts_new = set()
1930    cat_cmd = "[ -e /root/.ssh/known_hosts ] && cat /root/.ssh/known_hosts || true"
1931    log("parallax.call {} : {}".format(hosts, cat_cmd))
1932    results = parallax.call(hosts, cat_cmd, opts)
1933    for host, result in results.items():
1934        if isinstance(result, parallax.Error):
1935            warn("Failed to get known_hosts from {}: {}".format(host, str(result)))
1936        else:
1937            if result[1]:
1938                known_hosts_new.update((utils.to_ascii(result[1]) or "").splitlines())
1939    if known_hosts_new:
1940        hoststxt = "\n".join(sorted(known_hosts_new))
1941        tmpf = utils.str2tmp(hoststxt)
1942        log("parallax.copy {} : {}".format(hosts, hoststxt))
1943        results = parallax.copy(hosts, tmpf, "/root/.ssh/known_hosts")
1944        for host, result in results.items():
1945            if isinstance(result, parallax.Error):
1946                warn("scp to {} failed ({}), known_hosts update may be incomplete".format(host, str(result)))
1947
1948
1949def update_expected_votes():
1950    # get a list of nodes, excluding remote nodes
1951    nodelist = None
1952    loop_count = 0
1953    device_votes = 0
1954    nodecount = 0
1955    expected_votes = 0
1956    while True:
1957        rc, nodelist_text = utils.get_stdout("cibadmin -Ql --xpath '/cib/status/node_state'")
1958        if rc == 0:
1959            try:
1960                nodelist_xml = etree.fromstring(nodelist_text)
1961                nodelist = [n.get('uname') for n in nodelist_xml.xpath('//node_state') if n.get('remote_node') != 'true']
1962                if len(nodelist) >= 2:
1963                    break
1964            except Exception:
1965                break
1966        # timeout: 10 seconds
1967        if loop_count == 10:
1968            break
1969        loop_count += 1
1970        sleep(1)
1971
1972    # Increase expected_votes
1973    # TODO: wait to adjust expected_votes until after cluster join,
1974    # so that we can ask the cluster for the current membership list
1975    # Have to check if a qnetd device is configured and increase
1976    # expected_votes in that case
1977    is_qdevice_configured = utils.is_qdevice_configured()
1978    if nodelist is None:
1979        for v in corosync.get_values("quorum.expected_votes"):
1980            expected_votes = v
1981
            # For nodecount >= 2, expected_votes = nodecount + device_votes.
            # Assume nodecount is N: for ffsplit the qdevice has exactly one
            # vote, so device_votes is 1 and expected_votes = N + 1; for lms
            # the qdevice has N - 1 votes, so expected_votes = N + (N - 1).
            # device_votes is derived from quorum.device.net.algorithm.
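            # Worked example (hypothetical values): with two nodes already
            # configured and lms, the configured expected_votes is 3, so
            # device_votes goes (3 - 1) / 2 = 1 and then 2, nodecount goes
            # 2 -> 3, and the new expected_votes is 3 + 2 = 5.  With ffsplit
            # the same starting point (expected_votes = 3) yields 3 + 1 = 4.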
1987
1988            if corosync.get_value("quorum.device.net.algorithm") == "lms":
1989                device_votes = int((expected_votes - 1) / 2)
1990                nodecount = expected_votes - device_votes
1991                # as nodecount will increase 1, and device_votes is nodecount - 1
1992                # device_votes also increase 1
1993                device_votes += 1
1994            elif corosync.get_value("quorum.device.net.algorithm") == "ffsplit":
1995                device_votes = 1
1996                nodecount = expected_votes - device_votes
1997            elif is_qdevice_configured:
1998                device_votes = 0
1999                nodecount = v
2000
2001            nodecount += 1
2002            expected_votes = nodecount + device_votes
2003            corosync.set_value("quorum.expected_votes", str(expected_votes))
2004    else:
2005        nodecount = len(nodelist)
2006        expected_votes = 0
        # For nodecount >= 2, expected_votes = nodecount + device_votes.
        # Assume nodecount is N: for ffsplit the qdevice has exactly one
        # vote, so device_votes is 1 and expected_votes = N + 1; for lms
        # the qdevice has N - 1 votes, so expected_votes = N + (N - 1).
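        # Worked example (hypothetical values): with three nodes in the node
        # list, ffsplit gives expected_votes = 3 + 1 = 4 and lms gives
        # expected_votes = 3 + 2 = 5.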
2011        if corosync.get_value("quorum.device.net.algorithm") == "ffsplit":
2012            device_votes = 1
2013        if corosync.get_value("quorum.device.net.algorithm") == "lms":
2014            device_votes = nodecount - 1
2015
2016        if nodecount > 1:
2017            expected_votes = nodecount + device_votes
2018
2019        if corosync.get_value("quorum.expected_votes"):
2020            corosync.set_value("quorum.expected_votes", str(expected_votes))
2021    if is_qdevice_configured:
2022        corosync.set_value("quorum.device.votes", device_votes)
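    # corosync's two_node mode only applies when exactly two votes are expected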
2023    corosync.set_value("quorum.two_node", 1 if expected_votes == 2 else 0)
2024
2025    csync2_update(corosync.conf())
2026
2027
2028def setup_passwordless_with_other_nodes(init_node):
2029    """
2030    Setup passwordless with other cluster nodes
2031
2032    Should fetch the node list from init node, then swap the key
2033    """
2034    # Fetch cluster nodes list
2035    cmd = "ssh {} root@{} crm_node -l".format(SSH_OPTION, init_node)
2036    rc, out, err = utils.get_stdout_stderr(cmd)
2037    if rc != 0:
2038        error("Can't fetch cluster nodes list from {}: {}".format(init_node, err))
2039    cluster_nodes_list = []
2040    for line in out.splitlines():
2041        _, node, stat = line.split()
2042        if stat == "member":
2043            cluster_nodes_list.append(node)
2044
2045    # Filter out init node from cluster_nodes_list
2046    cmd = "ssh {} root@{} hostname".format(SSH_OPTION, init_node)
2047    rc, out, err = utils.get_stdout_stderr(cmd)
2048    if rc != 0:
2049        error("Can't fetch hostname of {}: {}".format(init_node, err))
2050    if out in cluster_nodes_list:
2051        cluster_nodes_list.remove(out)
2052
2053    # Swap ssh public key between join node and other cluster nodes
2054    for node in cluster_nodes_list:
2055        for user in USER_LIST:
2056            swap_public_ssh_key(node, user)
2057
2058
2059def sync_files_to_disk():
2060    """
    Flush the content of the synced configuration files to disk on all cluster nodes
2062    """
2063    files_string = ' '.join(filter(lambda f: os.path.isfile(f), FILES_TO_SYNC))
2064    if files_string:
2065        utils.cluster_run_cmd("sync {}".format(files_string.strip()))
2066
2067
2068def join_cluster(seed_host):
2069    """
2070    Cluster configuration for joining node.
2071    """
2072    def get_local_nodeid():
2073        # for IPv6
2074        return utils.gen_nodeid_from_ipv6(_context.local_ip_list[0])
2075
2076    def update_nodeid(nodeid, node=None):
2077        # for IPv6
2078        if node and node != utils.this_node():
2079            cmd = "crm corosync set totem.nodeid %d" % nodeid
2080            invoke("crm cluster run '{}' {}".format(cmd, node))
2081        else:
2082            corosync.set_value("totem.nodeid", nodeid)
2083
2084    shutil.copy(corosync.conf(), COROSYNC_CONF_ORIG)
2085
    # check whether IPv6 is in use
2087    ipv6_flag = False
2088    ipv6 = corosync.get_value("totem.ip_version")
2089    if ipv6 and ipv6 == "ipv6":
2090        ipv6_flag = True
2091    _context.ipv6 = ipv6_flag
2092
2093    init_network()
2094
    # check whether there are two rings
2096    rrp_flag = False
2097    rrp = corosync.get_value("totem.rrp_mode")
2098    if rrp in ('active', 'passive'):
2099        rrp_flag = True
2100
2101    # It would be massively useful at this point if new nodes could come
2102    # up in standby mode, so we could query the CIB locally to see if
2103    # there was any further local setup that needed doing, e.g.: creating
2104    # mountpoints for clustered filesystems.  Unfortunately we don't have
2105    # that yet, so the following crawling horror takes a punt on the seed
2106    # node being up, then asks it for a list of mountpoints...
2107    if _context.cluster_node:
2108        _rc, outp, _ = utils.get_stdout_stderr("ssh {} root@{} 'cibadmin -Q --xpath \"//primitive\"'".format(SSH_OPTION, seed_host))
2109        if outp:
2110            xml = etree.fromstring(outp)
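            # The assembled XPath expression is:
            #   //primitive[@class="ocf" and @provider="heartbeat" and @type="Filesystem"]
            #   /instance_attributes/nvpair[@name="directory"]/@value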
2111            mountpoints = xml.xpath(' and '.join(['//primitive[@class="ocf"',
2112                                                  '@provider="heartbeat"',
2113                                                  '@type="Filesystem"]']) +
2114                                    '/instance_attributes/nvpair[@name="directory"]/@value')
2115            for m in mountpoints:
2116                invoke("mkdir -p {}".format(m))
2117    else:
2118        status("No existing IP/hostname specified - skipping mountpoint detection/creation")
2119
2120    # Bump expected_votes in corosync.conf
2121    # TODO(must): this is rather fragile (see related code in ha-cluster-remove)
2122
2123    # If corosync.conf() doesn't exist or is empty, we will fail here. (bsc#943227)
2124    if not os.path.exists(corosync.conf()):
2125        error("{} is not readable. Please ensure that hostnames are resolvable.".format(corosync.conf()))
2126
    # if unicast, we need to add our node to corosync.conf()
2128    is_unicast = corosync.is_unicast()
2129    if is_unicast:
2130        ringXaddr_res = []
2131        for i in 0, 1:
2132            while True:
2133                ringXaddr = prompt_for_string(
2134                        'Address for ring{}'.format(i),
2135                        default=pick_default_value(_context.default_ip_list, ringXaddr_res),
2136                        valid_func=Validation.valid_ucast_ip,
2137                        prev_value=ringXaddr_res)
2138                if not ringXaddr:
2139                    error("No value for ring{}".format(i))
2140                ringXaddr_res.append(ringXaddr)
2141                break
2142            if not rrp_flag:
2143                break
2144        print("")
2145        invoke("rm -f /var/lib/heartbeat/crm/* /var/lib/pacemaker/cib/*")
2146        try:
2147            corosync.add_node_ucast(ringXaddr_res)
2148        except corosync.IPAlreadyConfiguredError as e:
2149            warn(e)
2150        csync2_update(corosync.conf())
2151        invoke("ssh {} root@{} corosync-cfgtool -R".format(SSH_OPTION, seed_host))
2152
2153    _context.sbd_manager.join_sbd(seed_host)
2154
2155    if ipv6_flag and not is_unicast:
        # for ipv6 mcast
        # using ipv6 requires nodeid to be configured
2158        local_nodeid = get_local_nodeid()
2159        update_nodeid(local_nodeid)
2160
2161    is_qdevice_configured = utils.is_qdevice_configured()
2162    if is_qdevice_configured and not is_unicast:
        # expected_votes here may be "0"; set it to "3" to make sure the cluster can start
2164        corosync.set_value("quorum.expected_votes", "3")
2165
2166    # Initialize the cluster before adjusting quorum. This is so
2167    # that we can query the cluster to find out how many nodes
2168    # there are (so as not to adjust multiple times if a previous
2169    # attempt to join the cluster failed)
2170    init_cluster_local()
2171
2172    with status_long("Reloading cluster configuration"):
2173
2174        if ipv6_flag and not is_unicast:
2175            # for ipv6 mcast
2176            nodeid_dict = {}
2177            _rc, outp, _ = utils.get_stdout_stderr("crm_node -l")
2178            if _rc == 0:
2179                for line in outp.split('\n'):
2180                    tmp = line.split()
2181                    nodeid_dict[tmp[1]] = tmp[0]
2182
2183        # apply nodelist in cluster
2184        if is_unicast or is_qdevice_configured:
2185            invoke("crm cluster run 'crm corosync reload'")
2186
2187        update_expected_votes()
2188        # Trigger corosync config reload to ensure expected_votes is propagated
2189        invoke("corosync-cfgtool -R")
2190
2191        # Ditch no-quorum-policy=ignore
2192        _rc, outp = utils.get_stdout("crm configure show")
2193        if re.search('no-quorum-policy=.*ignore', outp):
2194            invoke("crm_attribute --attr-name no-quorum-policy --delete-attr")
2195
2196        # if unicast, we need to reload the corosync configuration
2197        # on the other nodes
2198        if is_unicast:
2199            invoke("crm cluster run 'crm corosync reload'")
2200
2201        if ipv6_flag and not is_unicast:
2202            # for ipv6 mcast
            # after csync2_update, all config files are the same,
            # but nodeid must be unique
2205            for node in list(nodeid_dict.keys()):
2206                if node == utils.this_node():
2207                    continue
2208                update_nodeid(int(nodeid_dict[node]), node)
2209            update_nodeid(local_nodeid)
2210
2211        sync_files_to_disk()
2212
2213    if is_qdevice_configured:
2214        start_qdevice_on_join_node(seed_host)
2215    else:
2216        utils.disable_service("corosync-qdevice.service")
2217
2218
2219def start_qdevice_on_join_node(seed_host):
2220    """
    Run the qdevice certificate process and start the qdevice service on the joining node
2222    """
2223    with status_long("Starting corosync-qdevice.service"):
2224        if not corosync.is_unicast():
2225            corosync.add_nodelist_from_cmaptool()
2226            csync2_update(corosync.conf())
2227            invoke("crm corosync reload")
2228        if utils.is_qdevice_tls_on():
2229            qnetd_addr = corosync.get_value("quorum.device.net.host")
2230            qdevice_inst = corosync.QDevice(qnetd_addr, cluster_node=seed_host)
2231            qdevice_inst.certificate_process_on_join()
2232        utils.start_service("corosync-qdevice.service", enable=True)
2233
2234
2235def set_cluster_node_ip():
2236    """
    ringX_addr might be a hostname or an IP address, while
    _context.cluster_node is always a hostname at this point.

    If ring0_addr is an IP address, get the list of IPs configured for
    _context.cluster_node and find the one used as ring0_addr.
    Assign that IP to _context.cluster_node_ip so the matching entry can be removed later.
2243    """
2244    node = _context.cluster_node
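    # Illustrative example (hypothetical values): if nodelist.node.ring0_addr
    # contains "192.168.1.10" and _context.cluster_node is "node2", which
    # resolves to 192.168.1.10, that IP is stored in _context.cluster_node_ip
    # so the matching corosync nodelist entry can be removed later.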
2245    addr_list = corosync.get_values('nodelist.node.ring0_addr')
2246    if node in addr_list:
2247        return
2248
2249    ip_list = utils.get_iplist_from_name(node)
2250    for ip in ip_list:
2251        if ip in addr_list:
2252            _context.cluster_node_ip = ip
2253            break
2254
2255
2256def stop_services(stop_list, remote_addr=None):
2257    """
    Stop cluster-related services
2259    """
2260    for service in stop_list:
2261        if utils.service_is_active(service, remote_addr=remote_addr):
            status("Stopping {}".format(service))
2263            utils.stop_service(service, disable=True, remote_addr=remote_addr)
2264
2265
2266def remove_node_from_cluster():
2267    """
2268    Remove node from running cluster and the corosync / pacemaker configuration.
2269    """
2270    node = _context.cluster_node
2271    set_cluster_node_ip()
2272
2273    stop_services(SERVICES_STOP_LIST, remote_addr=node)
2274
2275    # delete configuration files from the node to be removed
2276    rc, _, err = invoke('ssh {} root@{} "bash -c \\\"rm -f {}\\\""'.format(SSH_OPTION, node, " ".join(_context.rm_list)))
2277    if not rc:
2278        error("Deleting the configuration files failed: {}".format(err))
2279
2280    # execute the command : crm node delete $HOSTNAME
2281    status("Removing the node {}".format(node))
2282    if not invokerc("crm node delete {}".format(node)):
2283        error("Failed to remove {}".format(node))
2284
2285    if not invokerc("sed -i /{}/d {}".format(node, CSYNC2_CFG)):
2286        error("Removing the node {} from {} failed".format(node, CSYNC2_CFG))
2287
2288    # Remove node from nodelist
2289    if corosync.get_values("nodelist.node.ring0_addr"):
2290        del_target = _context.cluster_node_ip or node
2291        corosync.del_node(del_target)
2292
2293    decrease_expected_votes()
2294
2295    status("Propagating configuration changes across the remaining nodes")
2296    csync2_update(CSYNC2_CFG)
2297    csync2_update(corosync.conf())
2298
2299    # Trigger corosync config reload to ensure expected_votes is propagated
2300    invoke("corosync-cfgtool -R")
2301
2302
2303def decrease_expected_votes():
2304    '''
2305    Decrement expected_votes in corosync.conf
2306    '''
2307    vote = corosync.get_value("quorum.expected_votes")
2308    if not vote:
2309        return
2310    quorum = int(vote)
2311    new_quorum = quorum - 1
2312    if utils.is_qdevice_configured():
2313        new_nodecount = 0
2314        device_votes = 0
2315        nodecount = 0
2316
2317        if corosync.get_value("quorum.device.net.algorithm") == "lms":
2318            nodecount = int((quorum + 1)/2)
2319            new_nodecount = nodecount - 1
2320            device_votes = new_nodecount - 1
2321
2322        elif corosync.get_value("quorum.device.net.algorithm") == "ffsplit":
2323            device_votes = 1
2324            nodecount = quorum - device_votes
2325            new_nodecount = nodecount - 1
2326
2327        if new_nodecount > 1:
2328            new_quorum = new_nodecount + device_votes
2329        else:
2330            new_quorum = 0
2331
2332        corosync.set_value("quorum.device.votes", device_votes)
2333    else:
2334        corosync.set_value("quorum.two_node", 1 if new_quorum == 2 else 0)
2335    corosync.set_value("quorum.expected_votes", str(new_quorum))
2336
2337
2338def bootstrap_init(context):
2339    """
2340    Init cluster process
2341    """
2342    global _context
2343    _context = context
2344
2345    init()
2346
2347    stage = _context.stage
2348    if stage is None:
2349        stage = ""
2350
    # The vgfs, admin, qdevice and ocfs2 stages require a running cluster;
    # everything else requires an inactive cluster, except ssh and csync2
    # (which don't care) and csync2_remote (which mustn't care, just in case
    # this breaks ha-cluster-join on another node).
2354    corosync_active = utils.service_is_active("corosync.service")
2355    if stage in ("vgfs", "admin", "qdevice", "ocfs2"):
2356        if not corosync_active:
2357            error("Cluster is inactive - can't run %s stage" % (stage))
2358    elif stage == "":
2359        if corosync_active:
2360            error("Cluster is currently active - can't run")
2361    elif stage not in ("ssh", "ssh_remote", "csync2", "csync2_remote", "sbd", "ocfs2"):
2362        if corosync_active:
2363            error("Cluster is currently active - can't run %s stage" % (stage))
2364
2365    _context.initialize_qdevice()
2366    _context.validate_option()
2367    _context.init_sbd_manager()
2368
2369    # Need hostname resolution to work, want NTP (but don't block ssh_remote or csync2_remote)
2370    if stage not in ('ssh_remote', 'csync2_remote'):
2371        check_tty()
2372        if not check_prereqs(stage):
2373            return
2374    elif stage == 'csync2_remote':
2375        args = _context.args
2376        log("args: {}".format(args))
2377        if len(args) != 2:
2378            error("Expected NODE argument to csync2_remote")
2379        _context.cluster_node = args[1]
2380
2381    if stage != "":
2382        globals()["init_" + stage]()
2383    else:
2384        init_ssh()
2385        init_csync2()
2386        init_corosync()
2387        init_remote_auth()
2388        init_sbd()
2389
2390        lock_inst = lock.Lock()
2391        try:
2392            with lock_inst.lock():
2393                init_cluster()
2394                init_admin()
2395                init_qdevice()
2396                init_ocfs2()
2397        except lock.ClaimLockError as err:
2398            error(err)
2399
2400    status("Done (log saved to %s)" % (LOG_FILE))
2401
2402
2403def bootstrap_join(context):
2404    """
2405    Join cluster process
2406    """
2407    global _context
2408    _context = context
2409
2410    init()
2411    _context.init_sbd_manager()
2412    _context.validate_option()
2413
2414    check_tty()
2415
2416    corosync_active = utils.service_is_active("corosync.service")
2417    if corosync_active and _context.stage != "ssh":
2418        error("Abort: Cluster is currently active. Run this command on a node joining the cluster.")
2419
2420    if not check_prereqs("join"):
2421        return
2422
2423    cluster_node = _context.cluster_node
2424    if _context.stage != "":
2425        globals()["join_" + _context.stage](cluster_node)
2426    else:
2427        if not _context.yes_to_all and cluster_node is None:
2428            status("""Join This Node to Cluster:
2429  You will be asked for the IP address of an existing node, from which
2430  configuration will be copied.  If you have not already configured
2431  passwordless ssh between nodes, you will be prompted for the root
2432  password of the existing node.
2433""")
2434            cluster_node = prompt_for_string("IP address or hostname of existing node (e.g.: 192.168.1.1)", ".+")
2435            _context.cluster_node = cluster_node
2436
2437        utils.ping_node(cluster_node)
2438
2439        join_ssh(cluster_node)
2440
2441        if not utils.service_is_active("pacemaker.service", cluster_node):
2442            error("Cluster is inactive on {}".format(cluster_node))
2443
2444        lock_inst = lock.RemoteLock(cluster_node)
2445        try:
2446            with lock_inst.lock():
2447                setup_passwordless_with_other_nodes(cluster_node)
2448                join_remote_auth(cluster_node)
2449                join_csync2(cluster_node)
2450                join_ssh_merge(cluster_node)
2451                probe_partitions()
2452                join_ocfs2(cluster_node)
2453                join_cluster(cluster_node)
2454        except (lock.SSHError, lock.ClaimLockError) as err:
2455            error(err)
2456
2457    status("Done (log saved to %s)" % (LOG_FILE))
2458
2459
2460def join_ocfs2(peer_host):
2461    """
    If the init node configured an OCFS2 device, verify that device on the joining node
2463    """
2464    ocfs2_inst = ocfs2.OCFS2Manager(_context)
2465    ocfs2_inst.join_ocfs2(peer_host)
2466
2467
2468def join_remote_auth(node):
2469    if os.path.exists(PCMK_REMOTE_AUTH):
2470        rmfile(PCMK_REMOTE_AUTH)
2471    pcmk_remote_dir = os.path.dirname(PCMK_REMOTE_AUTH)
2472    mkdirs_owned(pcmk_remote_dir, mode=0o750, gid="haclient")
2473    invoke("touch {}".format(PCMK_REMOTE_AUTH))
2474
2475
2476def remove_qdevice():
2477    """
2478    Remove qdevice service and configuration from cluster
2479    """
2480    if not utils.is_qdevice_configured():
2481        error("No QDevice configuration in this cluster")
2482    if not confirm("Removing QDevice service and configuration from cluster: Are you sure?"):
2483        return
2484
2485    utils.check_all_nodes_reachable()
2486    _context.qdevice_reload_policy = evaluate_qdevice_quorum_effect(QDEVICE_REMOVE)
2487
2488    status("Disable corosync-qdevice.service")
2489    invoke("crm cluster run 'systemctl disable corosync-qdevice'")
2490    if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD:
2491        status("Stopping corosync-qdevice.service")
2492        invoke("crm cluster run 'systemctl stop corosync-qdevice'")
2493
2494    with status_long("Removing QDevice configuration from cluster"):
2495        qnetd_host = corosync.get_value('quorum.device.net.host')
2496        qdevice_inst = corosync.QDevice(qnetd_host)
2497        qdevice_inst.remove_qdevice_config()
2498        qdevice_inst.remove_qdevice_db()
2499        update_expected_votes()
2500    if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD:
2501        invoke("crm cluster run 'crm corosync reload'")
2502    elif _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RESTART:
2503        status("Restarting cluster service")
2504        utils.cluster_run_cmd("crm cluster restart")
2505        wait_for_cluster()
2506    else:
        warn("To remove the qdevice service, the cluster service must be restarted manually on each node")
2508
2509
2510def bootstrap_remove(context):
2511    """
2512    Remove node from cluster, or remove qdevice configuration
2513    """
2514    global _context
2515    _context = context
2516    force_flag = config.core.force or _context.force
2517
2518    init()
2519
2520    if not utils.service_is_active("corosync.service"):
        error("Cluster is not active - can't execute the remove action")
2522
2523    if _context.qdevice_rm_flag and _context.cluster_node:
        error("Remove either a node or the qdevice configuration, not both")
2525
2526    if _context.qdevice_rm_flag:
2527        remove_qdevice()
2528        return
2529
2530    if not _context.yes_to_all and _context.cluster_node is None:
2531        status("""Remove This Node from Cluster:
2532  You will be asked for the IP address or name of an existing node,
2533  which will be removed from the cluster. This command must be
2534  executed from a different node in the cluster.
2535""")
2536        _context.cluster_node = prompt_for_string("IP address or hostname of cluster node (e.g.: 192.168.1.1)", ".+")
2537
2538    if not _context.cluster_node:
2539        error("No existing IP/hostname specified (use -c option)")
2540
2541    _context.cluster_node = get_cluster_node_hostname()
2542
2543    if not force_flag and not confirm("Removing node \"{}\" from the cluster: Are you sure?".format(_context.cluster_node)):
2544        return
2545
2546    if _context.cluster_node == utils.this_node():
2547        if not force_flag:
2548            error("Removing self requires --force")
2549        remove_self()
2550        return
2551
2552    if _context.cluster_node in xmlutil.listnodes():
2553        remove_node_from_cluster()
2554    else:
2555        error("Specified node {} is not configured in cluster! Unable to remove.".format(_context.cluster_node))
2556
2557
2558def remove_self():
2559    me = _context.cluster_node
2560    yes_to_all = _context.yes_to_all
2561    nodes = xmlutil.listnodes(include_remote_nodes=False)
2562    othernode = next((x for x in nodes if x != me), None)
2563    if othernode is not None:
2564        # remove from other node
2565        cmd = "crm cluster remove{} -c {}".format(" -y" if yes_to_all else "", me)
2566        rc = utils.ext_cmd_nosudo("ssh{} {} {} '{}'".format("" if yes_to_all else " -t", SSH_OPTION, othernode, cmd))
2567        if rc != 0:
2568            error("Failed to remove this node from {}".format(othernode))
2569    else:
2570        # disable and stop cluster
2571        stop_services(SERVICES_STOP_LIST)
2572        # remove all trace of cluster from this node
2573        # delete configuration files from the node to be removed
2574        if not invokerc('bash -c "rm -f {}"'.format(" ".join(_context.rm_list))):
2575            error("Deleting the configuration files failed")
2576
2577
2578def init_common_geo():
2579    """
2580    Tasks to do both on first and other geo nodes.
2581    """
2582    if not utils.package_is_installed("booth"):
        error("Booth not installed - not configurable as a geo cluster node.")
2584
2585
2586def init_csync2_geo():
2587    """
2588    TODO: Configure csync2 for geo cluster
2589    That is, create a second sync group which
2590    syncs the geo configuration across the whole
2591    geo cluster.
2592    """
2593
2594
2595def create_booth_authkey():
2596    status("Create authentication key for booth")
2597    if os.path.exists(BOOTH_AUTH):
2598        rmfile(BOOTH_AUTH)
2599    rc, _, err = invoke("booth-keygen {}".format(BOOTH_AUTH))
2600    if not rc:
2601        error("Failed to generate booth authkey: {}".format(err))
2602
2603
2604def create_booth_config(arbitrator, clusters, tickets):
2605    status("Configure booth")
2606
2607    config_template = """# The booth configuration file is "/etc/booth/booth.conf". You need to
2608# prepare the same booth configuration file on each arbitrator and
2609# each node in the cluster sites where the booth daemon can be launched.
2610
2611# "transport" means which transport layer booth daemon will use.
2612# Currently only "UDP" is supported.
2613transport="UDP"
2614port="9929"
2615"""
2616    cfg = [config_template]
2617    if arbitrator is not None:
2618        cfg.append("arbitrator=\"{}\"".format(arbitrator))
2619    for s in clusters.values():
2620        cfg.append("site=\"{}\"".format(s))
2621    cfg.append("authfile=\"{}\"".format(BOOTH_AUTH))
2622    for t in tickets:
2623        cfg.append("ticket=\"{}\"\nexpire=\"600\"".format(t))
2624    cfg = "\n".join(cfg) + "\n"
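    # Illustrative result (hypothetical values): with one arbitrator, two
    # sites and one ticket, the generated file ends with lines such as:
    #   arbitrator="10.0.0.1"
    #   site="10.0.1.100"
    #   site="10.0.2.100"
    #   authfile="/etc/booth/authkey"
    #   ticket="ticket-A"
    #   expire="600"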
2625
2626    if os.path.exists(BOOTH_CFG):
2627        rmfile(BOOTH_CFG)
2628    utils.str2file(cfg, BOOTH_CFG)
2629    utils.chown(BOOTH_CFG, "hacluster", "haclient")
2630    os.chmod(BOOTH_CFG, 0o644)
2631
2632
2633def bootstrap_init_geo(context):
2634    """
2635    Configure as a geo cluster member.
2636    """
2637    global _context
2638    _context = context
2639
2640    if os.path.exists(BOOTH_CFG) and not confirm("This will overwrite {} - continue?".format(BOOTH_CFG)):
2641        return
2642    if os.path.exists(BOOTH_AUTH) and not confirm("This will overwrite {} - continue?".format(BOOTH_AUTH)):
2643        return
2644
2645    init_common_geo()
2646
2647    # TODO:
2648    # in /etc/drbd.conf or /etc/drbd.d/global_common.conf
2649    # set common.startup.wfc-timeout 100
2650    # set common.startup.degr-wfc-timeout 120
2651
2652    create_booth_authkey()
2653    create_booth_config(_context.arbitrator, _context.clusters, _context.tickets)
2654    status("Sync booth configuration across cluster")
2655    csync2_update(BOOTH_DIR)
2656    init_csync2_geo()
2657    geo_cib_config(_context.clusters)
2658
2659
2660def geo_fetch_config(node):
2661    # TODO: clean this up
2662    status("Retrieving configuration - This may prompt for root@%s:" % (node))
2663    tmpdir = tmpfiles.create_dir()
2664    rc, _, err = invoke("scp -oStrictHostKeyChecking=no root@{}:'{}/*' {}/".format(node, BOOTH_DIR, tmpdir))
2665    if not rc:
2666        error("Failed to retrieve configuration: {}".format(err))
2667    try:
2668        if os.path.isfile("%s/authkey" % (tmpdir)):
2669            invoke("mv %s/authkey %s" % (tmpdir, BOOTH_AUTH))
2670            os.chmod(BOOTH_AUTH, 0o600)
2671        if os.path.isfile("%s/booth.conf" % (tmpdir)):
2672            invoke("mv %s/booth.conf %s" % (tmpdir, BOOTH_CFG))
2673            os.chmod(BOOTH_CFG, 0o644)
2674    except OSError as err:
2675        raise ValueError("Problem encountered with booth configuration from {}: {}".format(node, err))
2676
2677
2678def geo_cib_config(clusters):
2679    cluster_name = corosync.get_values('totem.cluster_name')[0]
2680    if cluster_name not in list(clusters.keys()):
2681        error("Local cluster name is {}, expected {}".format(cluster_name, "|".join(list(clusters.keys()))))
2682
2683    status("Configure cluster resources for booth")
2684    crm_template = Template("""
2685primitive booth-ip ocf:heartbeat:IPaddr2 $iprules
2686primitive booth-site ocf:pacemaker:booth-site \
2687  meta resource-stickiness="INFINITY" \
2688  params config=booth op monitor interval="10s"
2689group g-booth booth-ip booth-site meta target-role=Stopped
2690""")
2691    iprule = 'params rule #cluster-name eq {} ip="{}"'
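    # Illustrative rendering (hypothetical values): for
    # clusters = {"alpha": "10.0.1.100"} the iprules expand to
    #   params rule #cluster-name eq alpha ip="10.0.1.100"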
2692
2693    crm_configure_load("update", crm_template.substitute(iprules=" ".join(iprule.format(k, v) for k, v in clusters.items())))
2694
2695
2696def bootstrap_join_geo(context):
2697    """
2698    Run on second cluster to add to a geo configuration.
2699    It fetches its booth configuration from the other node (cluster node or arbitrator).
2700    """
2701    global _context
2702    _context = context
2703    init_common_geo()
2704    check_tty()
2705    geo_fetch_config(_context.cluster_node)
2706    status("Sync booth configuration across cluster")
2707    csync2_update(BOOTH_DIR)
2708    geo_cib_config(_context.clusters)
2709
2710
2711def bootstrap_arbitrator(context):
2712    """
2713    Configure this machine as an arbitrator.
2714    It fetches its booth configuration from a cluster node already in the cluster.
2715    """
2716    global _context
2717    _context = context
2718    node = _context.cluster_node
2719
2720    init_common_geo()
2721    check_tty()
2722    geo_fetch_config(node)
2723    if not os.path.isfile(BOOTH_CFG):
2724        error("Failed to copy {} from {}".format(BOOTH_CFG, node))
2725    # TODO: verify that the arbitrator IP in the configuration is us?
2726    status("Enabling and starting the booth arbitrator service")
2727    utils.start_service("booth@booth", enable=True)
2728
2729# EOF
2730