# Copyright (C) 2016 Kristoffer Gronlund <kgronlund@suse.com>
# See COPYING for license information.
#
# Bootstrap:
#
# Supersedes and replaces both the init/add/remove cluster scripts,
# and the ha-cluster-bootstrap scripts.
#
# Implemented as a straight-forward set of python functions for
# simplicity and flexibility.
#
# TODO: Make csync2 usage optional
# TODO: Configuration file for bootstrap?

import os
import sys
import random
import re
import time
import readline
import shutil
from string import Template
from lxml import etree
from pathlib import Path
from enum import Enum
from contextlib import contextmanager
from . import config
from . import utils
from . import xmlutil
from .cibconfig import mkset_obj, cib_factory
from . import corosync
from . import tmpfiles
from . import clidisplay
from . import term
from . import lock
from . import userdir
from .constants import SSH_OPTION, QDEVICE_HELP_INFO
from . import ocfs2


LOG_FILE = "/var/log/crmsh/ha-cluster-bootstrap.log"
CSYNC2_KEY = "/etc/csync2/key_hagroup"
CSYNC2_CFG = "/etc/csync2/csync2.cfg"
COROSYNC_AUTH = "/etc/corosync/authkey"
SYSCONFIG_SBD = "/etc/sysconfig/sbd"
SYSCONFIG_PCMK = "/etc/sysconfig/pacemaker"
SYSCONFIG_NFS = "/etc/sysconfig/nfs"
SYSCONFIG_FW = "/etc/sysconfig/SuSEfirewall2"
SYSCONFIG_FW_CLUSTER = "/etc/sysconfig/SuSEfirewall2.d/services/cluster"
PCMK_REMOTE_AUTH = "/etc/pacemaker/authkey"
COROSYNC_CONF_ORIG = tmpfiles.create()[1]
SERVICES_STOP_LIST = ["corosync-qdevice.service", "corosync.service", "hawk.service"]
USER_LIST = ["root", "hacluster"]
QDEVICE_ADD = "add"
QDEVICE_REMOVE = "remove"
WATCHDOG_CFG = "/etc/modules-load.d/watchdog.conf"
BOOTH_DIR = "/etc/booth"
BOOTH_CFG = "/etc/booth/booth.conf"
BOOTH_AUTH = "/etc/booth/authkey"
FILES_TO_SYNC = (BOOTH_DIR, corosync.conf(), COROSYNC_AUTH, CSYNC2_CFG, CSYNC2_KEY, "/etc/ctdb/nodes",
                 "/etc/drbd.conf", "/etc/drbd.d", "/etc/ha.d/ldirectord.cf", "/etc/lvm/lvm.conf", "/etc/multipath.conf",
                 "/etc/samba/smb.conf", SYSCONFIG_NFS, SYSCONFIG_PCMK, SYSCONFIG_SBD, PCMK_REMOTE_AUTH, WATCHDOG_CFG)

INIT_STAGES = ("ssh", "ssh_remote", "csync2", "csync2_remote", "corosync", "sbd", "cluster", "ocfs2", "admin", "qdevice")


class QdevicePolicy(Enum):
    QDEVICE_RELOAD = 0
    QDEVICE_RESTART = 1
    QDEVICE_RESTART_LATER = 2


class Context(object):
    """
    Context object used to avoid having to pass these variables
    to every bootstrap method.
    """

    def __init__(self):
        """
        Initialize attributes
        """
        self.type = None  # init or join
        self.quiet = None
        self.yes_to_all = None
        self.cluster_name = None
        self.watchdog = None
        self.no_overwrite_sshkey = None
        self.nic_list = None
        self.unicast = None
        self.admin_ip = None
        self.second_heartbeat = None
        self.ipv6 = None
        self.qdevice_inst = None
        self.qnetd_addr = None
        self.qdevice_port = None
        self.qdevice_algo = None
        self.qdevice_tie_breaker = None
        self.qdevice_tls = None
        self.qdevice_heuristics = None
        self.qdevice_heuristics_mode = None
        self.qdevice_rm_flag = None
        self.qdevice_reload_policy = QdevicePolicy.QDEVICE_RESTART
        self.ocfs2_devices = []
        self.use_cluster_lvm2 = None
        self.mount_point = None
        self.cluster_node = None
        self.cluster_node_ip = None
        self.force = None
        self.arbitrator = None
        self.clusters = None
        self.tickets = None
        self.sbd_manager = None
        self.sbd_devices = None
        self.diskless_sbd = None
        self.stage = None
        self.args = None
        self.ui_context = None
        self.interfaces_inst = None
        self.with_other_user = True
        self.cluster_is_running = None
        self.default_nic_list = []
        self.default_ip_list = []
        self.local_ip_list = []
        self.local_network_list = []
        self.rm_list = [SYSCONFIG_SBD, CSYNC2_CFG, corosync.conf(), CSYNC2_KEY,
                        COROSYNC_AUTH, "/var/lib/heartbeat/crm/*", "/var/lib/pacemaker/cib/*"]

    @classmethod
    def set_context(cls, options):
        ctx = cls()
        for opt in vars(options):
            setattr(ctx, opt, getattr(options, opt))
        return ctx

    def initialize_qdevice(self):
        """
        Initialize the qdevice instance
        """
        if not self.qnetd_addr:
            return
        self.qdevice_inst = corosync.QDevice(
            self.qnetd_addr,
            port=self.qdevice_port,
            algo=self.qdevice_algo,
            tie_breaker=self.qdevice_tie_breaker,
            tls=self.qdevice_tls,
            cmds=self.qdevice_heuristics,
            mode=self.qdevice_heuristics_mode)

    def _validate_sbd_option(self):
        """
        Validate SBD options
        """
        if self.sbd_devices and self.diskless_sbd:
            error("Can't use -s and -S options together")
        if self.stage == "sbd":
            if not self.sbd_devices and not self.diskless_sbd and self.yes_to_all:
                error("Stage sbd requires a device specified with -s, or diskless SBD with -S")
            if utils.service_is_active("sbd.service"):
                error("Cannot configure stage sbd: sbd.service already running!")
            if self.cluster_is_running:
                utils.check_all_nodes_reachable()

    def validate_option(self):
        """
        Validate options
        """
        if self.admin_ip:
            Validation.valid_admin_ip(self.admin_ip)
        if self.qdevice_inst:
            self.qdevice_inst.valid_attr()
        if self.nic_list:
            if len(self.nic_list) > 2:
                error("Maximum number of interfaces is 2")
            if len(self.nic_list) != len(set(self.nic_list)):
                error("Duplicated input")
        if self.no_overwrite_sshkey:
            warn("--no-overwrite-sshkey option is deprecated since crmsh does not overwrite ssh keys by default anymore and will be removed in future versions")
        if self.type == "join" and self.watchdog:
            warn("-w option is deprecated and will be removed in future versions")
        if self.ocfs2_devices or self.stage == "ocfs2":
            ocfs2.OCFS2Manager.verify_ocfs2(self)
        self._validate_sbd_option()

    def init_sbd_manager(self):
        self.sbd_manager = SBDManager(self.sbd_devices, self.diskless_sbd)
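
# Typical driver flow (sketch): the CLI layer builds an options namespace and
# hands it to Context; `options` below stands for a hypothetical argparse-style
# object carrying the bootstrap flags:
#
#   _context = Context.set_context(options)
#   _context.initialize_qdevice()
#   _context.validate_option()
#   _context.init_sbd_manager()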


class SBDManager(object):
    """
    Class to manage sbd configuration and services
    """
    SYSCONFIG_SBD_TEMPLATE = "/usr/share/fillup-templates/sysconfig.sbd"
    SBD_STATUS_DESCRIPTION = """
Configure SBD:
  If you have shared storage, for example a SAN or iSCSI target,
  you can use it to avoid split-brain scenarios by configuring SBD.
  This requires a 1 MB partition, accessible to all nodes in the
  cluster. The device path must be persistent and consistent
  across all nodes in the cluster, so /dev/disk/by-id/* devices
  are a good choice. Note that all data on the partition you
  specify here will be destroyed.
"""
    SBD_WARNING = "Not configuring SBD - STONITH will be disabled."
    DISKLESS_SBD_WARNING = """Diskless SBD requires a cluster with three or more nodes.
If you want to use diskless SBD in a two-node cluster, it should be combined with QDevice."""
    PARSE_RE = "[; ]"
    DISKLESS_CRM_CMD = "crm configure property stonith-enabled=true stonith-watchdog-timeout={} stonith-timeout={}"
    SBD_WATCHDOG_TIMEOUT_DEFAULT_WITH_QDEVICE = 35
    STONITH_WATCHDOG_TIMEOUT_DEFAULT = "10s"
    STONITH_WATCHDOG_TIMEOUT_DEFAULT_S390 = "30s"

    def __init__(self, sbd_devices=None, diskless_sbd=False):
        """
        Init function

        sbd_devices is provided by the '-s' option on the init process
        diskless_sbd is provided by the '-S' option on the init process
        """
        self.sbd_devices_input = sbd_devices
        self.diskless_sbd = diskless_sbd
        self._sbd_devices = None
        self._watchdog_inst = None
        self._stonith_watchdog_timeout = self.STONITH_WATCHDOG_TIMEOUT_DEFAULT
        self._stonith_timeout = 60
        self._sbd_watchdog_timeout = 0
        self._is_s390 = "390" in os.uname().machine

    @staticmethod
    def _get_device_uuid(dev, node=None):
        """
        Get the UUID for a specific device, optionally on a remote node
        """
        cmd = "sbd -d {} dump".format(dev)
        if node:
            cmd = "ssh {} root@{} '{}'".format(SSH_OPTION, node, cmd)

        rc, out, err = utils.get_stdout_stderr(cmd)
        if rc != 0 and err:
            raise ValueError("Cannot dump sbd meta-data: {}".format(err))
        if rc == 0 and out:
            res = re.search(r"UUID\s*:\s*(.*)\n", out)
            if not res:
                raise ValueError("Cannot find sbd device UUID for {}".format(dev))
            return res.group(1)

    def _compare_device_uuid(self, dev, node_list):
        """
        Compare the local sbd device UUID with other nodes' sbd device UUIDs
        """
        if not node_list:
            return
        local_uuid = self._get_device_uuid(dev)
        for node in node_list:
            remote_uuid = self._get_device_uuid(dev, node)
            if local_uuid != remote_uuid:
                raise ValueError("Device {} doesn't have the same UUID on {}".format(dev, node))

    def _verify_sbd_device(self, dev_list, compare_node_list=[]):
        """
        Verify sbd devices
        """
        if len(dev_list) > 3:
            raise ValueError("Maximum number of SBD devices is 3")
        for dev in dev_list:
            if not utils.is_block_device(dev):
                raise ValueError("{} doesn't look like a block device".format(dev))
            self._compare_device_uuid(dev, compare_node_list)

    def _get_sbd_device_interactive(self):
        """
        Get sbd device in interactive mode
        """
        if _context.yes_to_all:
            warn(self.SBD_WARNING)
            return

        status(self.SBD_STATUS_DESCRIPTION)

        if not confirm("Do you wish to use SBD?"):
            warn(self.SBD_WARNING)
            return

        configured_dev_list = self._get_sbd_device_from_config()
        if configured_dev_list and not confirm("SBD is already configured to use {} - overwrite?".format(';'.join(configured_dev_list))):
            return configured_dev_list

        dev_list = []
        dev_looks_sane = False
        while not dev_looks_sane:
            dev = prompt_for_string('Path to storage device (e.g. /dev/disk/by-id/...), or "none" for diskless sbd, use ";" as separator for multi path', r'none|\/.*')
            if not dev:
                continue
            if dev == "none":
                self.diskless_sbd = True
                return
            dev_list = utils.re_split_string(self.PARSE_RE, dev)
            try:
                self._verify_sbd_device(dev_list)
            except ValueError as err_msg:
                print_error_msg(str(err_msg))
                continue
            for dev_item in dev_list:
                warn("All data on {} will be destroyed!".format(dev_item))
                if confirm('Are you sure you wish to use this device?'):
                    dev_looks_sane = True
                else:
                    dev_looks_sane = False
                    break

        return dev_list

    def _get_sbd_device(self):
        """
        Get sbd device from options or interactive mode
        """
        dev_list = []
        if self.sbd_devices_input:
            dev_list = utils.parse_append_action_argument(self.sbd_devices_input)
            self._verify_sbd_device(dev_list)
        elif not self.diskless_sbd:
            dev_list = self._get_sbd_device_interactive()
        self._sbd_devices = dev_list

    def _initialize_sbd(self):
        """
        Initialize the SBD devices
        """
        if self.diskless_sbd:
            return
        for dev in self._sbd_devices:
            rc, _, err = invoke("sbd -d {} create".format(dev))
            if not rc:
                error("Failed to initialize SBD device {}: {}".format(dev, err))

    def _update_configuration(self):
        """
        Update /etc/sysconfig/sbd
        """
        shutil.copyfile(self.SYSCONFIG_SBD_TEMPLATE, SYSCONFIG_SBD)
        self._determine_sbd_watchdog_timeout()
        sbd_config_dict = {
            "SBD_PACEMAKER": "yes",
            "SBD_STARTMODE": "always",
            "SBD_DELAY_START": "no",
            "SBD_WATCHDOG_DEV": self._watchdog_inst.watchdog_device_name
        }
        if self._sbd_watchdog_timeout > 0:
            sbd_config_dict["SBD_WATCHDOG_TIMEOUT"] = str(self._sbd_watchdog_timeout)
        if self._sbd_devices:
            sbd_config_dict["SBD_DEVICE"] = ';'.join(self._sbd_devices)
        utils.sysconfig_set(SYSCONFIG_SBD, **sbd_config_dict)
        csync2_update(SYSCONFIG_SBD)

    def _determine_sbd_watchdog_timeout(self):
        """
        When using diskless SBD, determine the value of SBD_WATCHDOG_TIMEOUT
        """
        if not self.diskless_sbd:
            return
        # add sbd after qdevice is already started
        if utils.is_qdevice_configured() and utils.service_is_active("corosync-qdevice.service"):
            qdevice_sync_timeout = utils.get_qdevice_sync_timeout()
            self._sbd_watchdog_timeout = qdevice_sync_timeout + 5
            if self._is_s390 and self._sbd_watchdog_timeout < 15:
                self._sbd_watchdog_timeout = 15
            self._stonith_timeout = self.calculate_stonith_timeout(self._sbd_watchdog_timeout)
        # add sbd and qdevice together from the beginning
        elif _context.qdevice_inst:
            self._sbd_watchdog_timeout = self.SBD_WATCHDOG_TIMEOUT_DEFAULT_WITH_QDEVICE
            self._stonith_timeout = self.calculate_stonith_timeout(self._sbd_watchdog_timeout)

    def _determine_stonith_watchdog_timeout(self):
        """
        Determine the value of stonith-watchdog-timeout
        """
        res = SBDManager.get_sbd_value_from_config("SBD_WATCHDOG_TIMEOUT")
        if res:
            self._stonith_watchdog_timeout = -1
        elif self._is_s390:
            self._stonith_watchdog_timeout = self.STONITH_WATCHDOG_TIMEOUT_DEFAULT_S390

    def _get_sbd_device_from_config(self):
        """
        Gets the currently configured SBD device, i.e.
        what's in /etc/sysconfig/sbd
        """
        res = SBDManager.get_sbd_value_from_config("SBD_DEVICE")
        if res:
            return utils.re_split_string(self.PARSE_RE, res)
        else:
            return None

    def _restart_cluster_and_configure_sbd_ra(self):
        """
        Try to configure the sbd resource, restarting the cluster if needed
        """
        if not utils.has_resource_running():
            status("Restarting cluster service")
            utils.cluster_run_cmd("crm cluster restart")
            wait_for_cluster()
            self.configure_sbd_resource()
        else:
            warn("To start sbd.service, the cluster service needs to be restarted manually on each node")
            if self.diskless_sbd:
                cmd = self.DISKLESS_CRM_CMD.format(self._stonith_watchdog_timeout, str(self._stonith_timeout)+"s")
                warn("Then run \"{}\" on any node".format(cmd))
            else:
                self.configure_sbd_resource()

    def _enable_sbd_service(self):
        """
        Try to enable the sbd service
        """
        if _context.cluster_is_running:
            # in the sbd stage, enable sbd.service cluster-wide
            utils.cluster_run_cmd("systemctl enable sbd.service")
            self._restart_cluster_and_configure_sbd_ra()
        else:
            # in the init process
            invoke("systemctl enable sbd.service")

    def _warn_diskless_sbd(self, peer=None):
        """
        Give a warning when configuring diskless sbd
        """
        # When in the sbd stage or the join process
        if (self.diskless_sbd and _context.cluster_is_running) or peer:
            vote_dict = utils.get_quorum_votes_dict(peer)
            expected_vote = int(vote_dict['Expected'])
            if (expected_vote < 2 and peer) or (expected_vote < 3 and not peer):
                warn(self.DISKLESS_SBD_WARNING)
        # When in the init process
        elif self.diskless_sbd:
            warn(self.DISKLESS_SBD_WARNING)

    def sbd_init(self):
        """
        Function sbd_init includes these steps:
        1. Get sbd device from options or interactive mode
        2. Initialize sbd device
        3. Write the config file /etc/sysconfig/sbd
        """
        from .watchdog import Watchdog

        if not utils.package_is_installed("sbd"):
            return
        self._watchdog_inst = Watchdog(_input=_context.watchdog)
        self._watchdog_inst.init_watchdog()
        self._get_sbd_device()
        if not self._sbd_devices and not self.diskless_sbd:
            invoke("systemctl disable sbd.service")
            return
        self._warn_diskless_sbd()
        with status_long("Initializing {}SBD...".format("diskless " if self.diskless_sbd else "")):
            self._initialize_sbd()
            self._update_configuration()
        self._determine_stonith_watchdog_timeout()
        self._enable_sbd_service()

    def configure_sbd_resource(self):
        """
        Configure the stonith-sbd resource and the stonith-enabled property
        """
        if not utils.package_is_installed("sbd") or \
                not utils.service_is_enabled("sbd.service") or \
                utils.has_resource_configured("stonith:external/sbd"):
            return

        if self._get_sbd_device_from_config():
            if not invokerc("crm configure primitive stonith-sbd stonith:external/sbd pcmk_delay_max=30s"):
                error("Can't create stonith-sbd primitive")
            if not invokerc("crm configure property stonith-enabled=true"):
                error("Can't enable STONITH for SBD")
        else:
            cmd = self.DISKLESS_CRM_CMD.format(self._stonith_watchdog_timeout, str(self._stonith_timeout)+"s")
            if not invokerc(cmd):
                error("Can't enable STONITH for diskless SBD")

    def join_sbd(self, peer_host):
        """
        Function join_sbd runs during the join process only.
        On joining, check whether the peer node has enabled sbd.service;
        if so, check the prerequisites of SBD and verify the sbd device on the joining node
        """
        from .watchdog import Watchdog

        if not utils.package_is_installed("sbd"):
            return
        if not os.path.exists(SYSCONFIG_SBD) or not utils.service_is_enabled("sbd.service", peer_host):
            invoke("systemctl disable sbd.service")
            return
        self._watchdog_inst = Watchdog(peer_host=peer_host)
        self._watchdog_inst.join_watchdog()
        dev_list = self._get_sbd_device_from_config()
        if dev_list:
            self._verify_sbd_device(dev_list, [peer_host])
        else:
            self._warn_diskless_sbd(peer_host)
        status("Got {}SBD configuration".format("" if dev_list else "diskless "))
        invoke("systemctl enable sbd.service")

    @classmethod
    def verify_sbd_device(cls):
        """
        This classmethod is for verifying the sbd device on a running cluster.
        Raises ValueError on failure
        """
        inst = cls()
        dev_list = inst._get_sbd_device_from_config()
        if not dev_list:
            raise ValueError("No sbd device configured")
        inst._verify_sbd_device(dev_list, utils.list_cluster_nodes_except_me())

    @classmethod
    def get_sbd_device_from_config(cls):
        """
        Get the sbd device list from the config
        """
        inst = cls()
        return inst._get_sbd_device_from_config()

    @classmethod
    def is_using_diskless_sbd(cls):
        """
        Check if diskless SBD is in use
        """
        inst = cls()
        dev_list = inst._get_sbd_device_from_config()
        if not dev_list and utils.service_is_active("sbd.service"):
            return True
        return False

    @staticmethod
    def update_configuration(sbd_config_dict):
        """
        Update and sync the sbd configuration
        """
        utils.sysconfig_set(SYSCONFIG_SBD, **sbd_config_dict)
        csync2_update(SYSCONFIG_SBD)

    @staticmethod
    def calculate_stonith_timeout(sbd_watchdog_timeout):
        """
        Calculate the stonith timeout
        """
        return int(sbd_watchdog_timeout * 2 * 1.2)
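
    # Worked example: with the qdevice default SBD_WATCHDOG_TIMEOUT of 35s,
    # calculate_stonith_timeout(35) == int(35 * 2 * 1.2) == 84 seconds,
    # i.e. twice the watchdog timeout plus a 20% safety margin.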
    @staticmethod
    def get_sbd_value_from_config(key):
        """
        Get a value from /etc/sysconfig/sbd
        """
        conf = utils.parse_sysconfig(SYSCONFIG_SBD)
        res = conf.get(key)
        return res


_context = None


def die(*args):
    """
    Broken out as a special case for log() failure. Ordinarily you
    should just use error() to terminate.
    """
    raise ValueError(" ".join([str(arg) for arg in args]))


def error(*args):
    """
    Log an error message and raise ValueError to bail out of
    the bootstrap process.
    """
    log("ERROR: {}".format(" ".join([str(arg) for arg in args])))
    die(*args)


def print_error_msg(msg):
    """
    Just print an error message
    """
    print(term.render(clidisplay.error("ERROR:")) + " {}".format(msg))


def warn(*args):
    """
    Log and display a warning message.
    """
    log("WARNING: {}".format(" ".join(str(arg) for arg in args)))
    print(term.render(clidisplay.warn("WARNING: {}".format(" ".join(str(arg) for arg in args)))))


@utils.memoize
def log_file_fallback():
    """
    If the standard log location isn't writable,
    just log to the nearest temp dir.
    """
    return os.path.join(utils.get_tempdir(), "ha-cluster-bootstrap.log")


def log(*args):
    global LOG_FILE
    try:
        Path(os.path.dirname(LOG_FILE)).mkdir(parents=True, exist_ok=True)
        with open(LOG_FILE, "ab") as logfile:
            text = " ".join([utils.to_ascii(arg) for arg in args]) + "\n"
            logfile.write(text.encode('ascii', 'backslashreplace'))
    except IOError:
        if LOG_FILE != log_file_fallback():
            LOG_FILE = log_file_fallback()
            log(*args)
        else:
            die("Can't append to {} - aborting".format(LOG_FILE))


def drop_last_history():
    hlen = readline.get_current_history_length()
    if hlen > 0:
        readline.remove_history_item(hlen - 1)


def prompt_for_string(msg, match=None, default='', valid_func=None, prev_value=[]):
    if _context.yes_to_all:
        return default

    while True:
        disable_completion()
        val = utils.multi_input(' %s [%s]' % (msg, default))
        enable_completion()
        if not val:
            val = default
        else:
            drop_last_history()

        if not val:
            return None
        if not match and not valid_func:
            return val
        if match and not re.match(match, val):
            print_error_msg("Invalid value entered")
            continue
        if valid_func:
            try:
                valid_func(val, prev_value)
            except ValueError as err:
                print_error_msg(str(err))
                continue

        return val


def confirm(msg):
    if _context.yes_to_all:
        return True
    disable_completion()
    rc = utils.ask(msg)
    enable_completion()
    drop_last_history()
    return rc


def disable_completion():
    if _context.ui_context:
        _context.ui_context.disable_completion()


def enable_completion():
    if _context.ui_context:
        _context.ui_context.setup_readline()


def invoke(*args):
    """
    Log command execution to the log file.
    Log output from the command to the log file.
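    The arguments are joined with spaces and executed as a single
    shell command line via utils.get_stdout_stderr().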
    Return (boolean, stdout, stderr)
    """
    log("+ " + " ".join(args))
    rc, stdout, stderr = utils.get_stdout_stderr(" ".join(args))
    if stdout:
        log(stdout)
    if stderr:
        log(stderr)
    return rc == 0, stdout, stderr


def invokerc(*args):
    """
    Call invoke() and return only True/False
    """
    rc, _, _ = invoke(*args)
    return rc


def crm_configure_load(action, configuration):
    log(": loading crm config (%s), content is:" % (action))
    log(configuration)
    if not cib_factory.initialize():
        error("Failed to load cluster configuration")
    set_obj = mkset_obj()
    if action == 'replace':
        cib_factory.erase()
    if not set_obj.save(configuration, remove=False, method=action):
        error("Failed to load cluster configuration")
    if not cib_factory.commit():
        error("Failed to commit cluster configuration")


def wait_for_resource(message, resource):
    """
    Wait until the given resource has started
    """
    with status_long(message):
        while True:
            # -r here to display inactive resources
            # -R here to display individual clone instances
            _rc, out, err = utils.get_stdout_stderr("crm_mon -1rR")
            # Make sure clone instances are also started (no Stopped or Starting instances)
            if re.search(r"{}\s+.*:\s+Started\s".format(resource), out) and \
                    not re.search(r"{}\s+.*:\s+(Stopped|Starting)".format(resource), out):
                break
            status_progress()
            sleep(1)


def wait_for_cluster():
    with status_long("Waiting for cluster"):
        while True:
            _rc, out, _err = utils.get_stdout_stderr("crm_mon -1")
            if is_online(out):
                break
            status_progress()
            sleep(2)


def get_cluster_node_hostname():
    """
    Get the hostname of the cluster node (the init node, when joining)
    """
    peer_node = None
    if _context.cluster_node:
        rc, out, err = utils.get_stdout_stderr("ssh {} {} crm_node --name".format(SSH_OPTION, _context.cluster_node))
        if rc != 0:
            error(err)
        peer_node = out
    return peer_node


def is_online(crm_mon_txt):
    """
    Check whether the local node is online.
    Besides that, in the join process, check whether the init node is online
    """
    if not re.search("Online: .* {} ".format(utils.this_node()), crm_mon_txt):
        return False

    # if peer_node is None, this is the init process
    peer_node = get_cluster_node_hostname()
    if peer_node is None:
        return True
    # In the join process,
    # if the joining node is already online but can't find the init node,
    # the communication IP may be misconfigured
    if not re.search("Online: .* {} ".format(peer_node), crm_mon_txt):
        shutil.copy(COROSYNC_CONF_ORIG, corosync.conf())
        csync2_update(corosync.conf())
        utils.stop_service("corosync")
        print()
        error("Cannot see peer node \"{}\", please check the communication IP".format(peer_node))
    return True


def pick_default_value(default_list, prev_list):
    """
    Provide a default value for function 'prompt_for_string'.
    Make sure to give a different default value in multi-ring mode.

    Parameters:
    * default_list - default value list for the config item
    * prev_list    - previous values for the config item in multi-ring mode
    """
    for value in default_list:
        if value not in prev_list:
            return value
    return ""


def sleep(t):
    """
    Sleep for t seconds.
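    t may be fractional, since it is passed through float() before
    time.sleep().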
    """
    t = float(t)
    time.sleep(t)


def status(msg):
    log("# " + msg)
    if not _context.quiet:
        print(" {}".format(msg))


@contextmanager
def status_long(msg):
    log("# {}...".format(msg))
    if not _context.quiet:
        sys.stdout.write(" {}...".format(msg))
        sys.stdout.flush()
    try:
        yield
    except:
        raise
    else:
        status_done()


def status_progress():
    if not _context.quiet:
        sys.stdout.write(".")
        sys.stdout.flush()


def status_done():
    log("# done")
    if not _context.quiet:
        print("done")


def partprobe():
    # This function uses sfdisk to create a list of valid devices for probing
    # with partprobe. This prevents partprobe from failing on read-only mounted
    # devices such as /dev/sr0 (etc) that might cause it to return an error when
    # it exits. This allows partprobe to run without forcing _die to bail out.
    #   - Brandon Heaton
    #     ATT Training Engineer
    #     Data Center Engineer
    #     bheaton@suse.com
    _rc, out, _err = utils.get_stdout_stderr("sfdisk -l")
    disks = re.findall(r'^Disk\s*(/.+):', out, re.M)
    invoke("partprobe", *disks)


def probe_partitions():
    # Need to do this if the second (or subsequent) node happens to be up and
    # connected to storage while it's being repartitioned on the first node.
    with status_long("Probing for new partitions"):
        partprobe()
        sleep(5)


def check_tty():
    """
    Check for pseudo-tty: Cannot display read prompts without a TTY (bnc#892702)
    """
    if _context.yes_to_all:
        return
    if not sys.stdin.isatty():
        error("No pseudo-tty detected! Use -t option to ssh if calling remotely.")


def my_hostname_resolves():
    import socket
    hostname = utils.this_node()
    try:
        socket.gethostbyname(hostname)
        return True
    except socket.error:
        return False


def check_prereqs(stage):
    warned = False

    if not my_hostname_resolves():
        warn("Hostname '{}' is unresolvable. {}".format(
            utils.this_node(),
            "Please add an entry to /etc/hosts or configure DNS."))
        warned = True

    timekeepers = ('chronyd.service', 'ntp.service', 'ntpd.service')
    timekeeper = None
    for tk in timekeepers:
        if utils.service_is_available(tk):
            timekeeper = tk
            break

    if timekeeper is None:
        warn("No NTP service found.")
        warned = True
    elif not utils.service_is_enabled(timekeeper):
        warn("{} is not configured to start at system boot.".format(timekeeper))
        warned = True

    if warned:
        if not confirm("Do you want to continue anyway?"):
            return False

    firewall_open_basic_ports()
    return True


def log_start():
    """
    Convenient side-effect: this will die immediately if the log file
    is not writable (e.g.
    if not running as root)
    """
    # Reload rsyslog to make sure it logs with the correct hostname
    if utils.service_is_active("rsyslog.service"):
        invoke("systemctl reload rsyslog.service")
    datestr = utils.get_stdout("date --rfc-3339=seconds")[1]
    log('================================================================')
    log("%s %s" % (datestr, " ".join(sys.argv)))
    log('----------------------------------------------------------------')


def init_network():
    """
    Get all needed network information through utils.InterfacesInfo
    """
    interfaces_inst = utils.InterfacesInfo(_context.ipv6, _context.second_heartbeat, _context.nic_list)
    interfaces_inst.get_interfaces_info()
    _context.default_nic_list = interfaces_inst.get_default_nic_list_from_route()
    _context.default_ip_list = interfaces_inst.get_default_ip_list()

    # local_ip_list and local_network_list are for validation
    _context.local_ip_list = interfaces_inst.ip_list
    _context.local_network_list = interfaces_inst.network_list
    _context.interfaces_inst = interfaces_inst
    # using two "-i" options is equivalent to using the "-M" option
    if len(_context.default_nic_list) == 2 and not _context.second_heartbeat:
        _context.second_heartbeat = True


def configure_firewall(tcp=None, udp=None):
    if tcp is None:
        tcp = []
    if udp is None:
        udp = []

    def init_firewall_suse(tcp, udp):
        if os.path.exists(SYSCONFIG_FW_CLUSTER):
            cluster = utils.parse_sysconfig(SYSCONFIG_FW_CLUSTER)
            tcpcurr = set(cluster.get("TCP", "").split())
            tcpcurr.update(tcp)
            tcp = list(tcpcurr)
            udpcurr = set(cluster.get("UDP", "").split())
            udpcurr.update(udp)
            udp = list(udpcurr)

        utils.sysconfig_set(SYSCONFIG_FW_CLUSTER, TCP=" ".join(tcp), UDP=" ".join(udp))

        ext = ""
        if os.path.exists(SYSCONFIG_FW):
            fw = utils.parse_sysconfig(SYSCONFIG_FW)
            ext = fw.get("FW_CONFIGURATIONS_EXT", "")
            if "cluster" not in ext.split():
                ext = ext + " cluster"
        utils.sysconfig_set(SYSCONFIG_FW, FW_CONFIGURATIONS_EXT=ext)

        # No need to do anything else if the firewall is inactive
        if not utils.service_is_active("SuSEfirewall2"):
            return

        # Firewall is active, either restart or complain if we couldn't tweak it
        status("Restarting firewall (tcp={}, udp={})".format(" ".join(tcp), " ".join(udp)))
        if not invokerc("rcSuSEfirewall2 restart"):
            error("Failed to restart firewall (SuSEfirewall2)")

    def init_firewall_firewalld(tcp, udp):
        has_firewalld = utils.service_is_active("firewalld")
        cmdbase = 'firewall-cmd --zone=public --permanent ' if has_firewalld else 'firewall-offline-cmd --zone=public '

        def cmd(args):
            if not invokerc(cmdbase + args):
                error("Failed to configure firewall.")

        for p in tcp:
            cmd("--add-port={}/tcp".format(p))

        for p in udp:
            cmd("--add-port={}/udp".format(p))

        if has_firewalld:
            if not invokerc("firewall-cmd --reload"):
                error("Failed to reload firewall configuration.")

    def init_firewall_ufw(tcp, udp):
        """
        Try configuring the firewall with ufw
        """
        for p in tcp:
            if not invokerc("ufw allow {}/tcp".format(p)):
                error("Failed to configure firewall (ufw)")
        for p in udp:
            if not invokerc("ufw allow {}/udp".format(p)):
                error("Failed to configure firewall (ufw)")

    if utils.package_is_installed("firewalld"):
        init_firewall_firewalld(tcp, udp)
    elif utils.package_is_installed("SuSEfirewall2"):
        init_firewall_suse(tcp, udp)
    elif utils.package_is_installed("ufw"):
        init_firewall_ufw(tcp, udp)


def firewall_open_basic_ports():
    """
    Open ports for csync2, mgmtd, hawk & dlm respectively
    """
    configure_firewall(tcp=["30865", "5560", "7630", "21064"])


def firewall_open_corosync_ports():
    """
    Have to do this separately, as we need the general firewall config early
    so csync2 works, but need the corosync config *after* corosync.conf has
    been created/updated.

    Please note corosync uses two UDP ports: mcastport (for mcast
    receives) and mcastport - 1 (for mcast sends).

    Also open the QNetd/QDevice port if configured.
    """
    # all mcastports defined in the corosync config
    udp = corosync.get_values("totem.interface.mcastport")
    udp.extend([str(int(p) - 1) for p in udp])

    tcp = corosync.get_values("totem.quorum.device.net.port")

    configure_firewall(tcp=tcp, udp=udp)


def init_cluster_local():
    # Caller should check this, but I'm paranoid...
    if utils.service_is_active("corosync.service"):
        error("corosync service is running!")

    firewall_open_corosync_ports()

    # reset password, but only if it's not already set
    _rc, outp = utils.get_stdout("passwd -S hacluster")
    ps = outp.strip().split()[1]
    pass_msg = ""
    if ps not in ("P", "PS"):
        log(': Resetting password of hacluster user')
        rc, outp, errp = utils.get_stdout_stderr("passwd hacluster", input_s=b"linux\nlinux\n")
        if rc != 0:
            warn("Failed to reset password of hacluster user: %s" % (outp + errp))
        else:
            pass_msg = ", password 'linux'"

    # evil, but necessary
    invoke("rm -f /var/lib/heartbeat/crm/* /var/lib/pacemaker/cib/*")

    # only try to start hawk if hawk is installed
    if utils.service_is_available("hawk.service"):
        utils.start_service("hawk.service", enable=True)
        status("Hawk cluster interface is now running. To see cluster status, open:")
        status(" https://{}:7630/".format(_context.default_ip_list[0]))
        status("Log in with username 'hacluster'{}".format(pass_msg))
    else:
        warn("Hawk not installed - not configuring web management interface.")

    if pass_msg:
        warn("You should change the hacluster password to something more secure!")

    utils.start_service("pacemaker.service", enable=True)
    wait_for_cluster()


def install_tmp(tmpfile, to):
    with open(tmpfile, "r") as src:
        with utils.open_atomic(to, "w") as dst:
            for line in src:
                dst.write(line)


def append(fromfile, tofile):
    log("+ cat %s >> %s" % (fromfile, tofile))
    with open(tofile, "a") as tf:
        with open(fromfile, "r") as ff:
            tf.write(ff.read())


def append_unique(fromfile, tofile):
    """
    Append the content of fromfile to tofile,
    but only if it is not already included
    """
    if not utils.check_file_content_included(fromfile, tofile):
        append(fromfile, tofile)


def rmfile(path, ignore_errors=False):
    """
    Try to remove the given file, and
    report an error on failure
    """
    try:
        os.remove(path)
    except os.error as err:
        if not ignore_errors:
            error("Failed to remove {}: {}".format(path, err))


def mkdirs_owned(dirs, mode=0o777, uid=-1, gid=-1):
    """
    Create directory path, setting the mode and
    ownership of the leaf directory to mode/uid/gid.
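
    For example, init_remote_auth() below calls
    mkdirs_owned(pcmk_remote_dir, mode=0o750, gid="haclient")
    to create /etc/pacemaker owned by group haclient.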
    """
    if not os.path.exists(dirs):
        try:
            os.makedirs(dirs, mode)
        except OSError as err:
            error("Failed to create {}: {}".format(dirs, err))
        if uid != -1 or gid != -1:
            utils.chown(dirs, uid, gid)


def init_ssh():
    """
    Configure passwordless SSH.
    """
    utils.start_service("sshd.service", enable=True)
    for user in USER_LIST:
        configure_local_ssh_key(user)


def key_files(user):
    """
    Find the home directory for user and return the key files with abspath
    """
    keyfile_dict = {}
    home_dir = userdir.gethomedir(user)
    keyfile_dict['private'] = "{}/.ssh/id_rsa".format(home_dir)
    keyfile_dict['public'] = "{}/.ssh/id_rsa.pub".format(home_dir)
    keyfile_dict['authorized'] = "{}/.ssh/authorized_keys".format(home_dir)
    return keyfile_dict


def is_nologin(user):
    """
    Check if the user's shell is /sbin/nologin
    """
    with open("/etc/passwd") as f:
        return re.search("{}:.*:/sbin/nologin".format(user), f.read())


def change_user_shell(user):
    """
    Change the user's login shell
    """
    if user != "root" and is_nologin(user):
        if not _context.yes_to_all:
            status("""
The login shell of user {} will be changed to /bin/bash, and
passwordless ssh access will be set up among the cluster nodes""".format(user))
            if not confirm("Continue?"):
                _context.with_other_user = False
                return
        invoke("usermod -s /bin/bash {}".format(user))


def configure_local_ssh_key(user="root"):
    """
    Configure ssh rsa key locally

    If <home_dir>/.ssh/id_rsa doesn't exist, generate a new one.
    Add <home_dir>/.ssh/id_rsa.pub to <home_dir>/.ssh/authorized_keys anyway,
    to make sure the node is authorized to itself
    """
    change_user_shell(user)

    private_key, public_key, authorized_file = key_files(user).values()
    if not os.path.exists(private_key):
        status("Generating SSH key for {}".format(user))
        cmd = "ssh-keygen -q -f {} -C 'Cluster Internal on {}' -N ''".format(private_key, utils.this_node())
        cmd = utils.add_su(cmd, user)
        rc, _, err = invoke(cmd)
        if not rc:
            error("Failed to generate ssh key for {}: {}".format(user, err))

    if not os.path.exists(authorized_file):
        open(authorized_file, 'w').close()
    append_unique(public_key, authorized_file)


def init_ssh_remote():
    """
    Called by ha-cluster-join
    """
    authorized_keys_file = "/root/.ssh/authorized_keys"
    if not os.path.exists(authorized_keys_file):
        open(authorized_keys_file, 'w').close()
    with open(authorized_keys_file) as authkeys:
        authkeys_data = authkeys.read()
    for key in ("id_rsa", "id_dsa", "id_ecdsa", "id_ed25519"):
        fn = os.path.join("/root/.ssh", key)
        if not os.path.exists(fn):
            continue
        with open(fn + ".pub") as pubkey_file:
            keydata = pubkey_file.read()
        if keydata not in authkeys_data:
            append(fn + ".pub", authorized_keys_file)


def append_to_remote_file(fromfile, remote_node, tofile):
    """
    Append the content of fromfile to tofile on remote_node
    """
    err_details_string = """
    crmsh cannot set up passwordless ssh among the nodes automatically in this case.
    As a hint: most likely, `PasswordAuthentication` is set to 'no' in /etc/ssh/sshd_config.
    Either set up passwordless ssh beforehand, or change it to 'yes' and manage passwords properly.
    """
    cmd = "cat {} | ssh {} root@{} 'cat >> {}'".format(fromfile, SSH_OPTION, remote_node, tofile)
    rc, _, err = invoke(cmd)
    if not rc:
        error("Failed to append contents of {} to {}:\n\"{}\"\n{}".format(fromfile, remote_node, err, err_details_string))


def init_csync2():
    status("Configuring csync2")
    if os.path.exists(CSYNC2_KEY):
        if not confirm("csync2 is already configured - overwrite?"):
            return

    invoke("rm", "-f", CSYNC2_KEY)
    with status_long("Generating csync2 shared key (this may take a while)"):
        if not invokerc("csync2", "-k", CSYNC2_KEY):
            error("Can't create csync2 key {}".format(CSYNC2_KEY))

    csync2_file_list = ""
    for f in FILES_TO_SYNC:
        csync2_file_list += "include {};\n".format(f)

    utils.str2file("""group ha_group
{{
key /etc/csync2/key_hagroup;
host {};
{}
}}
    """.format(utils.this_node(), csync2_file_list), CSYNC2_CFG)

    utils.start_service("csync2.socket", enable=True)
    with status_long("csync2 checking files"):
        invoke("csync2", "-cr", "/")
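
# For reference, the csync2.cfg generated above looks roughly like this
# (the host list grows as nodes join; see init_csync2_remote below):
#
#   group ha_group
#   {
#   key /etc/csync2/key_hagroup;
#   host <this-node>;
#   include /etc/booth;
#   include /etc/corosync/corosync.conf;
#   ...
#   }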

def csync2_update(path):
    '''
    Sync path to all peers

    If there was a conflict, use '-f' to force this side to win
    '''
    invoke("csync2 -rm {}".format(path))
    if invokerc("csync2 -rxv {}".format(path)):
        return
    invoke("csync2 -rf {}".format(path))
    if not invokerc("csync2 -rxv {}".format(path)):
        warn("{} was not synced".format(path))


def init_csync2_remote():
    """
    It would be nice if we could just have csync2.cfg include a directory,
    which in turn included one file per node which would be referenced via
    something like "group ha_group { ... config: /etc/csync2/hosts/*; }"
    That way, adding a new node would just mean adding a single new file
    to that directory. Unfortunately, the 'config' statement only allows
    inclusion of specific individual files, not multiple files via wildcard.
    So we have this function, which is called by ha-cluster-join to add the new
    remote node to the csync2 config on some existing node. It is intentionally
    not documented in ha-cluster-init's user-visible usage information.
    """
    newhost = _context.cluster_node
    if not newhost:
        error("Hostname not specified")

    curr_cfg = open(CSYNC2_CFG).read()

    was_quiet = _context.quiet
    try:
        _context.quiet = True
        # if the host doesn't already exist in the csync2 config, add it
        if not re.search(r"^\s*host.*\s+%s\s*;" % (newhost), curr_cfg, flags=re.M):
            curr_cfg = re.sub(r"\bhost.*\s+\S+\s*;", r"\g<0>\n\thost %s;" % (utils.doublequote(newhost)), curr_cfg, count=1)
            utils.str2file(curr_cfg, CSYNC2_CFG)
            csync2_update("/")
        else:
            log(": Not updating %s - remote host %s already exists" % (CSYNC2_CFG, newhost))
    finally:
        _context.quiet = was_quiet


def init_corosync_auth():
    """
    Generate the corosync authkey
    """
    if os.path.exists(COROSYNC_AUTH):
        if not confirm("%s already exists - overwrite?" % (COROSYNC_AUTH)):
            return
        rmfile(COROSYNC_AUTH)
    invoke("corosync-keygen -l -k {}".format(COROSYNC_AUTH))


def init_remote_auth():
    """
    Generate the pacemaker-remote authkey
    """
    if os.path.exists(PCMK_REMOTE_AUTH):
        if not confirm("%s already exists - overwrite?" % (PCMK_REMOTE_AUTH)):
            return
        rmfile(PCMK_REMOTE_AUTH)

    pcmk_remote_dir = os.path.dirname(PCMK_REMOTE_AUTH)
    mkdirs_owned(pcmk_remote_dir, mode=0o750, gid="haclient")
    if not invokerc("dd if=/dev/urandom of={} bs=4096 count=1".format(PCMK_REMOTE_AUTH)):
        warn("Failed to create pacemaker authkey: {}".format(PCMK_REMOTE_AUTH))
    utils.chown(PCMK_REMOTE_AUTH, "hacluster", "haclient")
    os.chmod(PCMK_REMOTE_AUTH, 0o640)


class Validation(object):
    """
    Class to validate values from interactive inputs
    """

    def __init__(self, value, prev_value_list=[]):
        """
        Init function
        """
        self.value = value
        self.prev_value_list = prev_value_list
        if self.value in self.prev_value_list:
            raise ValueError("Already in use: {}".format(self.value))

    def _is_mcast_addr(self):
        """
        Check whether the address is a multicast address
        """
        if not utils.IP.is_mcast(self.value):
            raise ValueError("{} is not a multicast address".format(self.value))

    def _is_local_addr(self, local_addr_list):
        """
        Check whether the address is a local address
        """
        if self.value not in local_addr_list:
            raise ValueError("Address must be a local address (one of {})".format(local_addr_list))

    def _is_valid_port(self):
        """
        Check whether the port is valid
        """
        if self.prev_value_list and abs(int(self.value) - int(self.prev_value_list[0])) <= 1:
            raise ValueError("Port {} is already in use by corosync. Leave a gap between multiple rings.".format(self.value))
        if int(self.value) <= 1024 or int(self.value) > 65535:
            raise ValueError("Valid port range should be 1025-65535")

    @classmethod
    def valid_mcast_address(cls, addr, prev_value_list=[]):
        """
        Check whether the address is already in use and whether it is a multicast address
        """
        cls_inst = cls(addr, prev_value_list)
        cls_inst._is_mcast_addr()

    @classmethod
    def valid_ucast_ip(cls, addr, prev_value_list=[]):
        """
        Check whether the address is already in use and whether it exists locally
        """
        cls_inst = cls(addr, prev_value_list)
        cls_inst._is_local_addr(_context.local_ip_list)

    @classmethod
    def valid_mcast_ip(cls, addr, prev_value_list=[]):
        """
        Check whether the address is already in use and whether it exists
        among the local addresses and networks
        """
        cls_inst = cls(addr, prev_value_list)
        cls_inst._is_local_addr(_context.local_ip_list + _context.local_network_list)

    @classmethod
    def valid_port(cls, port, prev_value_list=[]):
        """
        Check whether the port is valid
        """
        cls_inst = cls(port, prev_value_list)
        cls_inst._is_valid_port()

    @staticmethod
    def valid_admin_ip(addr, prev_value_list=[]):
        """
        Validate the admin IP address
        """
        ipv6 = utils.IP.is_ipv6(addr)

        # Check whether this IP is already configured in the cluster
        ping_cmd = "ping6" if ipv6 else "ping"
        if invokerc("{} -c 1 {}".format(ping_cmd, addr)):
            raise ValueError("Address already in use: {}".format(addr))


def init_corosync_unicast():

    if _context.yes_to_all:
        status("Configuring corosync (unicast)")
    else:
        status("""
Configure Corosync (unicast):
  This will configure the cluster messaging layer. You will need
  to specify a network address over which to communicate (default
  is {}'s network, but you can use the network address of any
  active interface).
""".format(_context.default_nic_list[0]))
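    # Prompt for up to two rings: each ring needs a distinct local IP and a
    # distinct port (Validation.valid_port rejects ports within 1 of an
    # existing ring's port, so the defaults 5405/5407 leave the required gap).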
    ringXaddr_res = []
    mcastport_res = []
    default_ports = ["5405", "5407"]
    two_rings = False

    for i in range(2):
        ringXaddr = prompt_for_string(
                'Address for ring{}'.format(i),
                default=pick_default_value(_context.default_ip_list, ringXaddr_res),
                valid_func=Validation.valid_ucast_ip,
                prev_value=ringXaddr_res)
        if not ringXaddr:
            error("No value for ring{}".format(i))
        ringXaddr_res.append(ringXaddr)

        mcastport = prompt_for_string(
                'Port for ring{}'.format(i),
                match='[0-9]+',
                default=pick_default_value(default_ports, mcastport_res),
                valid_func=Validation.valid_port,
                prev_value=mcastport_res)
        if not mcastport:
            error("Expected a multicast port for ring{}".format(i))
        mcastport_res.append(mcastport)

        if i == 1 or \
                not _context.second_heartbeat or \
                not confirm("\nAdd another heartbeat line?"):
            break
        two_rings = True

    corosync.create_configuration(
            clustername=_context.cluster_name,
            ringXaddr=ringXaddr_res,
            mcastport=mcastport_res,
            transport="udpu",
            ipv6=_context.ipv6,
            two_rings=two_rings)
    csync2_update(corosync.conf())


def init_corosync_multicast():
    def gen_mcastaddr():
        if _context.ipv6:
            return "ff3e::%s:%d" % (
                ''.join([random.choice('0123456789abcdef') for _ in range(4)]),
                random.randint(0, 9))
        return "239.%d.%d.%d" % (
            random.randint(0, 255),
            random.randint(0, 255),
            random.randint(1, 255))

    if _context.yes_to_all:
        status("Configuring corosync")
    else:
        status("""
Configure Corosync:
  This will configure the cluster messaging layer. You will need
  to specify a network address over which to communicate (default
  is {}'s network, but you can use the network address of any
  active interface).
""".format(_context.default_nic_list[0]))
    bindnetaddr_res = []
    mcastaddr_res = []
    mcastport_res = []
    default_ports = ["5405", "5407"]
    two_rings = False

    for i in range(2):
        bindnetaddr = prompt_for_string(
                'IP or network address to bind to',
                default=pick_default_value(_context.default_ip_list, bindnetaddr_res),
                valid_func=Validation.valid_mcast_ip,
                prev_value=bindnetaddr_res)
        if not bindnetaddr:
            error("No value for bindnetaddr")
        bindnetaddr_res.append(bindnetaddr)

        mcastaddr = prompt_for_string(
                'Multicast address',
                default=gen_mcastaddr(),
                valid_func=Validation.valid_mcast_address,
                prev_value=mcastaddr_res)
        if not mcastaddr:
            error("No value for mcastaddr")
        mcastaddr_res.append(mcastaddr)

        mcastport = prompt_for_string(
                'Multicast port',
                match='[0-9]+',
                default=pick_default_value(default_ports, mcastport_res),
                valid_func=Validation.valid_port,
                prev_value=mcastport_res)
        if not mcastport:
            error("No value for mcastport")
        mcastport_res.append(mcastport)

        if i == 1 or \
                not _context.second_heartbeat or \
                not confirm("\nConfigure a second multicast ring?"):
            break
        two_rings = True

    nodeid = None
    if _context.ipv6:
        nodeid = utils.gen_nodeid_from_ipv6(_context.default_ip_list[0])

    corosync.create_configuration(
            clustername=_context.cluster_name,
            bindnetaddr=bindnetaddr_res,
            mcastaddr=mcastaddr_res,
            mcastport=mcastport_res,
            ipv6=_context.ipv6,
            nodeid=nodeid,
            two_rings=two_rings)
    csync2_update(corosync.conf())


def init_corosync():
    """
    Configure corosync (unicast or multicast, encrypted?)
    """
    def requires_unicast():
        host = utils.detect_cloud()
        if host is not None:
            status("Detected cloud platform: {}".format(host))
        return host is not None

    init_corosync_auth()

    if os.path.exists(corosync.conf()):
        if not confirm("%s already exists - overwrite?" % (corosync.conf())):
            return

    if _context.unicast or requires_unicast():
        init_corosync_unicast()
    else:
        init_corosync_multicast()


def init_sbd():
    """
    Configure SBD (storage-based fencing).

    SBD can also run in diskless mode if no device
    is configured.
    """
    _context.sbd_manager.sbd_init()


def init_ocfs2():
    """
    OCFS2 configuration process
    """
    if not _context.ocfs2_devices:
        return
    ocfs2_manager = ocfs2.OCFS2Manager(_context)
    ocfs2_manager.init_ocfs2()


def init_cluster():
    """
    Initial cluster configuration.
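
    Steps: start the local cluster stack (init_cluster_local), verify this
    node is the only cluster member, load the initial CIB defaults, and
    configure the SBD resource if SBD is enabled.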
    """
    init_cluster_local()

    _rc, nnodes = utils.get_stdout("crm_node -l")
    nnodes = len(nnodes.splitlines())
    if nnodes < 1:
        error("No nodes found in cluster")
    if nnodes > 1:
        error("Joined existing cluster - will not reconfigure.")

    status("Loading initial cluster configuration")

    crm_configure_load("update", """
property cib-bootstrap-options: stonith-enabled=false
op_defaults op-options: timeout=600 record-pending=true
rsc_defaults rsc-options: resource-stickiness=1 migration-threshold=3
""")

    _context.sbd_manager.configure_sbd_resource()


def init_admin():
    # Skip this section when -y is passed,
    # unless an administration IP address has been specified
    adminaddr = _context.admin_ip
    if _context.yes_to_all and not adminaddr:
        return

    if not adminaddr:
        status("""
Configure Administration IP Address:
  Optionally configure an administration virtual IP
  address. The purpose of this IP address is to
  provide a single IP that can be used to interact
  with the cluster, rather than using the IP address
  of any specific cluster node.
""")
        if not confirm("Do you wish to configure a virtual IP address?"):
            return

        adminaddr = prompt_for_string('Virtual IP', valid_func=Validation.valid_admin_ip)
        if not adminaddr:
            error("Expected an IP address")

    crm_configure_load("update", 'primitive admin-ip IPaddr2 ip=%s op monitor interval=10 timeout=20' % (utils.doublequote(adminaddr)))
    wait_for_resource("Configuring virtual IP ({})".format(adminaddr), "admin-ip")


def evaluate_qdevice_quorum_effect(mode, diskless_sbd=False):
    """
    While adding/removing qdevice, compare the current expected votes with
    the total votes after the change, to determine whether the cluster will
    still be quorate, and return the matching QdevicePolicy
    """
    quorum_votes_dict = utils.get_quorum_votes_dict()
    expected_votes = int(quorum_votes_dict["Expected"])
    actual_votes = int(quorum_votes_dict["Total"])
    if mode == QDEVICE_ADD:
        expected_votes += 1
    elif mode == QDEVICE_REMOVE:
        actual_votes -= 1

    if utils.is_quorate(expected_votes, actual_votes) and not diskless_sbd:
        # safe to use reload
        return QdevicePolicy.QDEVICE_RELOAD
    elif utils.has_resource_running():
        # will lose quorum, and with an RA running:
        # no reload, no restart of the cluster service,
        # just leave a warning
        return QdevicePolicy.QDEVICE_RESTART_LATER
    else:
        # will lose quorum, but without an RA running:
        # safe to restart the cluster service
        return QdevicePolicy.QDEVICE_RESTART
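
# Worked example (a sketch, assuming utils.is_quorate() implements the usual
# majority check): a healthy two-node cluster reports Expected=2, Total=2.
# Adding a qdevice makes expected_votes=3 versus actual_votes=2, still a
# majority, so evaluate_qdevice_quorum_effect() returns QDEVICE_RELOAD and
# corosync can simply be reloaded. Removing it instead gives 2 versus 1,
# quorum would be lost, and one of the restart policies is returned.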

def configure_qdevice_interactive():
    """
    Configure qdevice in interactive mode
    """
    if _context.yes_to_all:
        return
    status("\nConfigure Qdevice/Qnetd:\n" + QDEVICE_HELP_INFO + "\n")
    if not confirm("Do you want to configure QDevice?"):
        return
    qnetd_addr = prompt_for_string("HOST or IP of the QNetd server to be used")
    if not qnetd_addr:
        error("Address of QNetd is required")
    qdevice_port = prompt_for_string("TCP PORT of QNetd server", default=5403)
    qdevice_algo = prompt_for_string("QNetd decision ALGORITHM (ffsplit/lms)", default="ffsplit")
    qdevice_tie_breaker = prompt_for_string("QNetd TIE_BREAKER (lowest/highest/valid node id)", default="lowest")
    qdevice_tls = prompt_for_string("Whether using TLS on QDevice/QNetd (on/off/required)", default="on")
    qdevice_heuristics = prompt_for_string("Heuristics COMMAND to run with absolute path; For multiple commands, use \";\" to separate")
    qdevice_heuristics_mode = prompt_for_string("MODE of operation of heuristics (on/sync/off)", default="sync") if qdevice_heuristics else None
    _context.qdevice_inst = corosync.QDevice(
            qnetd_addr,
            port=qdevice_port,
            algo=qdevice_algo,
            tie_breaker=qdevice_tie_breaker,
            tls=qdevice_tls,
            cmds=qdevice_heuristics,
            mode=qdevice_heuristics_mode)
    _context.qdevice_inst.valid_attr()


def init_qdevice():
    """
    Set up the qdevice and qnetd services
    """
    if not _context.qdevice_inst:
        configure_qdevice_interactive()
    # If the user chooses not to configure qdevice, return
    if not _context.qdevice_inst:
        utils.disable_service("corosync-qdevice.service")
        return
    if _context.stage == "qdevice":
        utils.check_all_nodes_reachable()
        using_diskless_sbd = SBDManager.is_using_diskless_sbd()
        _context.qdevice_reload_policy = evaluate_qdevice_quorum_effect(QDEVICE_ADD, using_diskless_sbd)
        # add qdevice after diskless sbd has started
        if using_diskless_sbd:
            res = SBDManager.get_sbd_value_from_config("SBD_WATCHDOG_TIMEOUT")
            if res:
                sbd_watchdog_timeout = max(int(res), SBDManager.SBD_WATCHDOG_TIMEOUT_DEFAULT_WITH_QDEVICE)
            else:
                sbd_watchdog_timeout = SBDManager.SBD_WATCHDOG_TIMEOUT_DEFAULT_WITH_QDEVICE
            stonith_timeout = SBDManager.calculate_stonith_timeout(sbd_watchdog_timeout)
            SBDManager.update_configuration({"SBD_WATCHDOG_TIMEOUT": str(sbd_watchdog_timeout)})
            invokerc("crm configure property stonith-watchdog-timeout=-1 stonith-timeout={}s".format(stonith_timeout))

    status("""
Configure Qdevice/Qnetd:""")
    qdevice_inst = _context.qdevice_inst
    qnetd_addr = qdevice_inst.qnetd_addr
    # Configure passwordless ssh to the qnetd node if a password is detected to be needed
    if utils.check_ssh_passwd_need(qnetd_addr):
        status("Copy ssh key to qnetd node ({})".format(qnetd_addr))
        rc, _, err = invoke("ssh-copy-id -i /root/.ssh/id_rsa.pub root@{}".format(qnetd_addr))
        if not rc:
            error("Failed to copy ssh key: {}".format(err))
    # Start the qdevice service if qdevice is already configured
    if utils.is_qdevice_configured() and not confirm("Qdevice is already configured - overwrite?"):
        start_qdevice_service()
        return

    # Validate the qnetd node
    qdevice_inst.valid_qnetd()
    # Configure qdevice
    config_qdevice()
    # Execute the certificate process when the tls flag is on
    if utils.is_qdevice_tls_on():
        with status_long("Qdevice certification process"):
            qdevice_inst.certificate_process_on_init()

    start_qdevice_service()


def start_qdevice_service():
    """
    Start the qdevice and qnetd services
    """
    qdevice_inst = _context.qdevice_inst
    qnetd_addr = qdevice_inst.qnetd_addr

    status("Enable corosync-qdevice.service in cluster")
    utils.cluster_run_cmd("systemctl enable corosync-qdevice")
    if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD:
        status("Starting corosync-qdevice.service in cluster")
        utils.cluster_run_cmd("systemctl restart corosync-qdevice")
    elif _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RESTART:
        status("Restarting cluster service")
        utils.cluster_run_cmd("crm cluster restart")
        wait_for_cluster()
    else:
        warn("To use the qdevice service, the cluster service needs to be restarted manually on each node")
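
    # Regardless of the reload policy chosen above, qnetd itself is enabled
    # and started on the QNetd server rather than on the cluster nodes.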
status("Enable corosync-qnetd.service on {}".format(qnetd_addr)) 1757 qdevice_inst.enable_qnetd() 1758 status("Starting corosync-qnetd.service on {}".format(qnetd_addr)) 1759 qdevice_inst.start_qnetd() 1760 1761 1762def config_qdevice(): 1763 """ 1764 Process of config qdevice 1765 """ 1766 qdevice_inst = _context.qdevice_inst 1767 1768 qdevice_inst.remove_qdevice_db() 1769 qdevice_inst.write_qdevice_config() 1770 if not corosync.is_unicast(): 1771 corosync.add_nodelist_from_cmaptool() 1772 with status_long("Update configuration"): 1773 update_expected_votes() 1774 if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD: 1775 utils.cluster_run_cmd("crm corosync reload") 1776 1777 1778def init(): 1779 """ 1780 Basic init 1781 """ 1782 log_start() 1783 init_network() 1784 1785 1786def join_ssh(seed_host): 1787 """ 1788 SSH configuration for joining node. 1789 """ 1790 if not seed_host: 1791 error("No existing IP/hostname specified (use -c option)") 1792 1793 utils.start_service("sshd.service", enable=True) 1794 for user in USER_LIST: 1795 configure_local_ssh_key(user) 1796 swap_public_ssh_key(seed_host, user) 1797 1798 # This makes sure the seed host has its own SSH keys in its own 1799 # authorized_keys file (again, to help with the case where the 1800 # user has done manual initial setup without the assistance of 1801 # ha-cluster-init). 1802 rc, _, err = invoke("ssh {} root@{} crm cluster init -i {} ssh_remote".format(SSH_OPTION, seed_host, _context.default_nic_list[0])) 1803 if not rc: 1804 error("Can't invoke crm cluster init -i {} ssh_remote on {}: {}".format(_context.default_nic_list[0], seed_host, err)) 1805 1806 1807def swap_public_ssh_key(remote_node, user="root"): 1808 """ 1809 Swap public ssh key between remote_node and local 1810 """ 1811 if user != "root" and not _context.with_other_user: 1812 return 1813 1814 _, public_key, authorized_file = key_files(user).values() 1815 # Detect whether need password to login to remote_node 1816 if utils.check_ssh_passwd_need(remote_node, user): 1817 # If no passwordless configured, paste /root/.ssh/id_rsa.pub to remote_node's /root/.ssh/authorized_keys 1818 status("Configuring SSH passwordless with {}@{}".format(user, remote_node)) 1819 # After this, login to remote_node is passwordless 1820 append_to_remote_file(public_key, remote_node, authorized_file) 1821 1822 try: 1823 # Fetch public key file from remote_node 1824 public_key_file_remote = fetch_public_key_from_remote_node(remote_node, user) 1825 except ValueError as err: 1826 warn(err) 1827 return 1828 # Append public key file from remote_node to local's /root/.ssh/authorized_keys 1829 # After this, login from remote_node is passwordless 1830 # Should do this step even passwordless is True, to make sure we got two-way passwordless 1831 append_unique(public_key_file_remote, authorized_file) 1832 1833 1834def fetch_public_key_from_remote_node(node, user="root"): 1835 """ 1836 Fetch public key file from remote node 1837 Return a temp file contains public key 1838 Return None if no key exist 1839 """ 1840 1841 # For dsa, might need to add PubkeyAcceptedKeyTypes=+ssh-dss to config file, see 1842 # https://superuser.com/questions/1016989/ssh-dsa-keys-no-longer-work-for-password-less-authentication 1843 home_dir = userdir.gethomedir(user) 1844 for key in ("id_rsa", "id_ecdsa", "id_ed25519", "id_dsa"): 1845 public_key_file = "{}/.ssh/{}.pub".format(home_dir, key) 1846 cmd = "ssh {} root@{} 'test -f {}'".format(SSH_OPTION, node, public_key_file) 1847 if not invokerc(cmd): 1848 continue 
    _, public_key, authorized_file = key_files(user).values()
    # Detect whether a password is needed to log in to remote_node
    if utils.check_ssh_passwd_need(remote_node, user):
        # If passwordless ssh is not configured yet, append the local public
        # key to remote_node's authorized_keys file
        status("Configuring SSH passwordless with {}@{}".format(user, remote_node))
        # After this, login to remote_node is passwordless
        append_to_remote_file(public_key, remote_node, authorized_file)

    try:
        # Fetch the public key file from remote_node
        public_key_file_remote = fetch_public_key_from_remote_node(remote_node, user)
    except ValueError as err:
        warn(err)
        return
    # Append the public key fetched from remote_node to the local
    # authorized_keys file. After this, login from remote_node is
    # passwordless. Do this even when login was already passwordless,
    # to make sure passwordless ssh works in both directions.
    append_unique(public_key_file_remote, authorized_file)


def fetch_public_key_from_remote_node(node, user="root"):
    """
    Fetch a public key file from the remote node
    Return a temp file containing the public key,
    or raise ValueError if no key exists
    """

    # For dsa, might need to add PubkeyAcceptedKeyTypes=+ssh-dss to config file, see
    # https://superuser.com/questions/1016989/ssh-dsa-keys-no-longer-work-for-password-less-authentication
    home_dir = userdir.gethomedir(user)
    for key in ("id_rsa", "id_ecdsa", "id_ed25519", "id_dsa"):
        public_key_file = "{}/.ssh/{}.pub".format(home_dir, key)
        cmd = "ssh {} root@{} 'test -f {}'".format(SSH_OPTION, node, public_key_file)
        if not invokerc(cmd):
            continue
        _, temp_public_key_file = tmpfiles.create()
        cmd = "scp {} root@{}:{} {}".format(SSH_OPTION, node, public_key_file, temp_public_key_file)
        rc, _, err = invoke(cmd)
        if not rc:
            error("Failed to run \"{}\": {}".format(cmd, err))
        return temp_public_key_file
    raise ValueError("No ssh key exists on {}".format(node))


def join_csync2(seed_host):
    """
    Csync2 configuration for the joining node.
    """
    if not seed_host:
        error("No existing IP/hostname specified (use -c option)")
    with status_long("Configuring csync2"):

        # Necessary if re-running join on a node that's been configured before.
        rmfile("/var/lib/csync2/{}.db3".format(utils.this_node()), ignore_errors=True)

        # Not automatically updating /etc/hosts - risky in the general case.
        # etc_hosts_add_me
        # local hosts_line=$(etc_hosts_get_me)
        # [ -n "$hosts_line" ] || error "No valid entry for $(hostname) in /etc/hosts - csync2 can't work"

        # If we *were* updating /etc/hosts, the next line would have "\"$hosts_line\"" as
        # the last arg (but this requires re-enabling this functionality in ha-cluster-init)
        cmd = "crm cluster init -i {} csync2_remote {}".format(_context.default_nic_list[0], utils.this_node())
        rc, _, err = invoke("ssh {} root@{} {}".format(SSH_OPTION, seed_host, cmd))
        if not rc:
            error("Can't invoke \"{}\" on {}: {}".format(cmd, seed_host, err))

        # This is necessary if syncing /etc/hosts (to ensure everyone's got the
        # same list of hosts)
        # local tmp_conf=/etc/hosts.$$
        # invoke scp root@seed_host:/etc/hosts $tmp_conf \
        #     || error "Can't retrieve /etc/hosts from seed_host"
        # install_tmp $tmp_conf /etc/hosts
        rc, _, err = invoke("scp root@%s:'/etc/csync2/{csync2.cfg,key_hagroup}' /etc/csync2" % (seed_host))
        if not rc:
            error("Can't retrieve csync2 config from {}: {}".format(seed_host, err))

        utils.start_service("csync2.socket", enable=True)

        # Sync the new config out. This goes to all hosts; csync2.cfg definitely
        # needs to go to all hosts (else hosts other than the seed and the
        # joining host won't have the joining host in their config yet).
        # Strictly, the rest of the files need only go to the new host, which
        # could theoretically be effected using `csync2 -xv -P $(hostname)`,
        # but this still leaves all the other files in a dirty state (because
        # they haven't gone to all nodes in the cluster), which means a
        # subsequent join of another node can fail its sync of corosync.conf
        # when it updates expected_votes. Grrr...
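        # Best-effort reading of the csync2 flags used below: "-rm /"
        # recursively marks the managed files as dirty, "-rxv" runs a verbose
        # sync, and on failure "-rf /" forces the local copies to win conflict
        # resolution before the sync is retried.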
        if not invokerc('ssh {} root@{} "csync2 -rm /; csync2 -rxv || csync2 -rf / && csync2 -rxv"'.format(SSH_OPTION, seed_host)):
            print("")
            warn("csync2 run failed - some files may not be sync'd")


def join_ssh_merge(_cluster_node):
    status("Merging known_hosts")

    me = utils.this_node()
    hosts = [m.group(1)
             for m in re.finditer(r"^\s*host\s*([^ ;]+)\s*;", open(CSYNC2_CFG).read(), re.M)
             if m.group(1) != me]
    if not hosts:
        hosts = [_cluster_node]
        warn("Unable to extract host list from %s" % (CSYNC2_CFG))

    try:
        import parallax
    except ImportError:
        error("parallax python library is missing")

    opts = parallax.Options()
    opts.ssh_options = ['StrictHostKeyChecking=no']

    # The act of using pssh to connect to every host (without strict host key
    # checking) ensures that at least *this* host has every other host in its
    # known_hosts
    known_hosts_new = set()
    cat_cmd = "[ -e /root/.ssh/known_hosts ] && cat /root/.ssh/known_hosts || true"
    log("parallax.call {} : {}".format(hosts, cat_cmd))
    results = parallax.call(hosts, cat_cmd, opts)
    for host, result in results.items():
        if isinstance(result, parallax.Error):
            warn("Failed to get known_hosts from {}: {}".format(host, str(result)))
        else:
            if result[1]:
                known_hosts_new.update((utils.to_ascii(result[1]) or "").splitlines())
    if known_hosts_new:
        hoststxt = "\n".join(sorted(known_hosts_new))
        tmpf = utils.str2tmp(hoststxt)
        log("parallax.copy {} : {}".format(hosts, hoststxt))
        results = parallax.copy(hosts, tmpf, "/root/.ssh/known_hosts")
        for host, result in results.items():
            if isinstance(result, parallax.Error):
                warn("scp to {} failed ({}), known_hosts update may be incomplete".format(host, str(result)))


def update_expected_votes():
    # Get a list of nodes, excluding remote nodes
    nodelist = None
    loop_count = 0
    device_votes = 0
    nodecount = 0
    expected_votes = 0
    while True:
        rc, nodelist_text = utils.get_stdout("cibadmin -Ql --xpath '/cib/status/node_state'")
        if rc == 0:
            try:
                nodelist_xml = etree.fromstring(nodelist_text)
                nodelist = [n.get('uname') for n in nodelist_xml.xpath('//node_state') if n.get('remote_node') != 'true']
                if len(nodelist) >= 2:
                    break
            except Exception:
                break
        # timeout: 10 seconds
        if loop_count == 10:
            break
        loop_count += 1
        time.sleep(1)

    # Increase expected_votes
    # TODO: wait to adjust expected_votes until after cluster join,
    # so that we can ask the cluster for the current membership list
    # Have to check if a qnetd device is configured and increase
    # expected_votes in that case
    is_qdevice_configured = utils.is_qdevice_configured()
    if nodelist is None:
        for v in corosync.get_values("quorum.expected_votes"):
            expected_votes = int(v)

        # For nodecount >= 2, expected_votes = nodecount + device_votes.
        # Assuming nodecount is N: for ffsplit, qdevice has only one vote,
        # i.e. device_votes is 1 and expected_votes = N + 1;
        # for lms, qdevice has N - 1 votes, i.e. expected_votes = N + (N - 1).
        # Update quorum.device.net.algorithm based on device_votes.
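        # Worked example (illustrative): with expected_votes=5 under lms,
        # device_votes = int((5 - 1) / 2) = 2 and nodecount = 3; the join
        # then bumps nodecount to 4 and device_votes to 3, giving a new
        # expected_votes of 7.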

        if corosync.get_value("quorum.device.net.algorithm") == "lms":
            device_votes = int((expected_votes - 1) / 2)
            nodecount = expected_votes - device_votes
            # nodecount will increase by 1, and since device_votes is
            # nodecount - 1, device_votes also increases by 1
            device_votes += 1
        elif corosync.get_value("quorum.device.net.algorithm") == "ffsplit":
            device_votes = 1
            nodecount = expected_votes - device_votes
        elif is_qdevice_configured:
            device_votes = 0
            nodecount = int(v)

        nodecount += 1
        expected_votes = nodecount + device_votes
        corosync.set_value("quorum.expected_votes", str(expected_votes))
    else:
        nodecount = len(nodelist)
        expected_votes = 0
        # For nodecount >= 2, expected_votes = nodecount + device_votes.
        # Assuming nodecount is N: for ffsplit, qdevice has only one vote,
        # i.e. device_votes is 1 and expected_votes = N + 1;
        # for lms, qdevice has N - 1 votes, i.e. expected_votes = N + (N - 1).
        if corosync.get_value("quorum.device.net.algorithm") == "ffsplit":
            device_votes = 1
        if corosync.get_value("quorum.device.net.algorithm") == "lms":
            device_votes = nodecount - 1

        if nodecount > 1:
            expected_votes = nodecount + device_votes

        if corosync.get_value("quorum.expected_votes"):
            corosync.set_value("quorum.expected_votes", str(expected_votes))
        if is_qdevice_configured:
            corosync.set_value("quorum.device.votes", device_votes)
        corosync.set_value("quorum.two_node", 1 if expected_votes == 2 else 0)

    csync2_update(corosync.conf())


def setup_passwordless_with_other_nodes(init_node):
    """
    Set up passwordless ssh with the other cluster nodes

    Fetch the node list from the init node, then swap keys with each node
    """
    # Fetch the cluster node list
    cmd = "ssh {} root@{} crm_node -l".format(SSH_OPTION, init_node)
    rc, out, err = utils.get_stdout_stderr(cmd)
    if rc != 0:
        error("Can't fetch cluster nodes list from {}: {}".format(init_node, err))
    cluster_nodes_list = []
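    # Each line of "crm_node -l" is expected to be an "ID NAME STATE" triple,
    # e.g. "1084780001 alpha member" (illustrative); nodes not reported as
    # "member" are skipped.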
2071 """ 2072 def get_local_nodeid(): 2073 # for IPv6 2074 return utils.gen_nodeid_from_ipv6(_context.local_ip_list[0]) 2075 2076 def update_nodeid(nodeid, node=None): 2077 # for IPv6 2078 if node and node != utils.this_node(): 2079 cmd = "crm corosync set totem.nodeid %d" % nodeid 2080 invoke("crm cluster run '{}' {}".format(cmd, node)) 2081 else: 2082 corosync.set_value("totem.nodeid", nodeid) 2083 2084 shutil.copy(corosync.conf(), COROSYNC_CONF_ORIG) 2085 2086 # check if use IPv6 2087 ipv6_flag = False 2088 ipv6 = corosync.get_value("totem.ip_version") 2089 if ipv6 and ipv6 == "ipv6": 2090 ipv6_flag = True 2091 _context.ipv6 = ipv6_flag 2092 2093 init_network() 2094 2095 # check whether have two rings 2096 rrp_flag = False 2097 rrp = corosync.get_value("totem.rrp_mode") 2098 if rrp in ('active', 'passive'): 2099 rrp_flag = True 2100 2101 # It would be massively useful at this point if new nodes could come 2102 # up in standby mode, so we could query the CIB locally to see if 2103 # there was any further local setup that needed doing, e.g.: creating 2104 # mountpoints for clustered filesystems. Unfortunately we don't have 2105 # that yet, so the following crawling horror takes a punt on the seed 2106 # node being up, then asks it for a list of mountpoints... 2107 if _context.cluster_node: 2108 _rc, outp, _ = utils.get_stdout_stderr("ssh {} root@{} 'cibadmin -Q --xpath \"//primitive\"'".format(SSH_OPTION, seed_host)) 2109 if outp: 2110 xml = etree.fromstring(outp) 2111 mountpoints = xml.xpath(' and '.join(['//primitive[@class="ocf"', 2112 '@provider="heartbeat"', 2113 '@type="Filesystem"]']) + 2114 '/instance_attributes/nvpair[@name="directory"]/@value') 2115 for m in mountpoints: 2116 invoke("mkdir -p {}".format(m)) 2117 else: 2118 status("No existing IP/hostname specified - skipping mountpoint detection/creation") 2119 2120 # Bump expected_votes in corosync.conf 2121 # TODO(must): this is rather fragile (see related code in ha-cluster-remove) 2122 2123 # If corosync.conf() doesn't exist or is empty, we will fail here. (bsc#943227) 2124 if not os.path.exists(corosync.conf()): 2125 error("{} is not readable. 
    is_unicast = corosync.is_unicast()
    if is_unicast:
        ringXaddr_res = []
        for i in 0, 1:
            while True:
                ringXaddr = prompt_for_string(
                        'Address for ring{}'.format(i),
                        default=pick_default_value(_context.default_ip_list, ringXaddr_res),
                        valid_func=Validation.valid_ucast_ip,
                        prev_value=ringXaddr_res)
                if not ringXaddr:
                    error("No value for ring{}".format(i))
                ringXaddr_res.append(ringXaddr)
                break
            if not rrp_flag:
                break
        print("")
        invoke("rm -f /var/lib/heartbeat/crm/* /var/lib/pacemaker/cib/*")
        try:
            corosync.add_node_ucast(ringXaddr_res)
        except corosync.IPAlreadyConfiguredError as e:
            warn(e)
        csync2_update(corosync.conf())
        invoke("ssh {} root@{} corosync-cfgtool -R".format(SSH_OPTION, seed_host))

    _context.sbd_manager.join_sbd(seed_host)

    if ipv6_flag and not is_unicast:
        # for ipv6 mcast
        # IPv6 requires a configured nodeid
        local_nodeid = get_local_nodeid()
        update_nodeid(local_nodeid)

    is_qdevice_configured = utils.is_qdevice_configured()
    if is_qdevice_configured and not is_unicast:
        # expected_votes may still be "0" here; set it to "3" to make sure
        # the cluster can start
        corosync.set_value("quorum.expected_votes", "3")

    # Initialize the cluster before adjusting quorum. This is so
    # that we can query the cluster to find out how many nodes
    # there are (so as not to adjust multiple times if a previous
    # attempt to join the cluster failed)
    init_cluster_local()

    with status_long("Reloading cluster configuration"):

        if ipv6_flag and not is_unicast:
            # for ipv6 mcast
            nodeid_dict = {}
            _rc, outp, _ = utils.get_stdout_stderr("crm_node -l")
            if _rc == 0:
                for line in outp.split('\n'):
                    tmp = line.split()
                    nodeid_dict[tmp[1]] = tmp[0]

        # apply the nodelist in the cluster
        if is_unicast or is_qdevice_configured:
            invoke("crm cluster run 'crm corosync reload'")

        update_expected_votes()
        # Trigger a corosync config reload to ensure expected_votes is propagated
        invoke("corosync-cfgtool -R")

        # Ditch no-quorum-policy=ignore
        _rc, outp = utils.get_stdout("crm configure show")
        if re.search('no-quorum-policy=.*ignore', outp):
            invoke("crm_attribute --attr-name no-quorum-policy --delete-attr")

        # if unicast, we need to reload the corosync configuration
        # on the other nodes
        if is_unicast:
            invoke("crm cluster run 'crm corosync reload'")

        if ipv6_flag and not is_unicast:
            # for ipv6 mcast
            # after csync2_update, all config files are the same,
            # but nodeid must be unique
            for node in list(nodeid_dict.keys()):
                if node == utils.this_node():
                    continue
                update_nodeid(int(nodeid_dict[node]), node)
            update_nodeid(local_nodeid)

    sync_files_to_disk()

    if is_qdevice_configured:
        start_qdevice_on_join_node(seed_host)
    else:
        utils.disable_service("corosync-qdevice.service")
reload") 2228 if utils.is_qdevice_tls_on(): 2229 qnetd_addr = corosync.get_value("quorum.device.net.host") 2230 qdevice_inst = corosync.QDevice(qnetd_addr, cluster_node=seed_host) 2231 qdevice_inst.certificate_process_on_join() 2232 utils.start_service("corosync-qdevice.service", enable=True) 2233 2234 2235def set_cluster_node_ip(): 2236 """ 2237 ringx_addr might be hostname or IP 2238 _context.cluster_node by now is always hostname 2239 2240 If ring0_addr is IP, we should get the configured iplist which belong _context.cluster_node 2241 Then filter out which one is configured as ring0_addr 2242 At last assign that ip to _context.cluster_node_ip which will be removed later 2243 """ 2244 node = _context.cluster_node 2245 addr_list = corosync.get_values('nodelist.node.ring0_addr') 2246 if node in addr_list: 2247 return 2248 2249 ip_list = utils.get_iplist_from_name(node) 2250 for ip in ip_list: 2251 if ip in addr_list: 2252 _context.cluster_node_ip = ip 2253 break 2254 2255 2256def stop_services(stop_list, remote_addr=None): 2257 """ 2258 Stop cluster related service 2259 """ 2260 for service in stop_list: 2261 if utils.service_is_active(service, remote_addr=remote_addr): 2262 status("Stopping the {}".format(service)) 2263 utils.stop_service(service, disable=True, remote_addr=remote_addr) 2264 2265 2266def remove_node_from_cluster(): 2267 """ 2268 Remove node from running cluster and the corosync / pacemaker configuration. 2269 """ 2270 node = _context.cluster_node 2271 set_cluster_node_ip() 2272 2273 stop_services(SERVICES_STOP_LIST, remote_addr=node) 2274 2275 # delete configuration files from the node to be removed 2276 rc, _, err = invoke('ssh {} root@{} "bash -c \\\"rm -f {}\\\""'.format(SSH_OPTION, node, " ".join(_context.rm_list))) 2277 if not rc: 2278 error("Deleting the configuration files failed: {}".format(err)) 2279 2280 # execute the command : crm node delete $HOSTNAME 2281 status("Removing the node {}".format(node)) 2282 if not invokerc("crm node delete {}".format(node)): 2283 error("Failed to remove {}".format(node)) 2284 2285 if not invokerc("sed -i /{}/d {}".format(node, CSYNC2_CFG)): 2286 error("Removing the node {} from {} failed".format(node, CSYNC2_CFG)) 2287 2288 # Remove node from nodelist 2289 if corosync.get_values("nodelist.node.ring0_addr"): 2290 del_target = _context.cluster_node_ip or node 2291 corosync.del_node(del_target) 2292 2293 decrease_expected_votes() 2294 2295 status("Propagating configuration changes across the remaining nodes") 2296 csync2_update(CSYNC2_CFG) 2297 csync2_update(corosync.conf()) 2298 2299 # Trigger corosync config reload to ensure expected_votes is propagated 2300 invoke("corosync-cfgtool -R") 2301 2302 2303def decrease_expected_votes(): 2304 ''' 2305 Decrement expected_votes in corosync.conf 2306 ''' 2307 vote = corosync.get_value("quorum.expected_votes") 2308 if not vote: 2309 return 2310 quorum = int(vote) 2311 new_quorum = quorum - 1 2312 if utils.is_qdevice_configured(): 2313 new_nodecount = 0 2314 device_votes = 0 2315 nodecount = 0 2316 2317 if corosync.get_value("quorum.device.net.algorithm") == "lms": 2318 nodecount = int((quorum + 1)/2) 2319 new_nodecount = nodecount - 1 2320 device_votes = new_nodecount - 1 2321 2322 elif corosync.get_value("quorum.device.net.algorithm") == "ffsplit": 2323 device_votes = 1 2324 nodecount = quorum - device_votes 2325 new_nodecount = nodecount - 1 2326 2327 if new_nodecount > 1: 2328 new_quorum = new_nodecount + device_votes 2329 else: 2330 new_quorum = 0 2331 2332 
corosync.set_value("quorum.device.votes", device_votes) 2333 else: 2334 corosync.set_value("quorum.two_node", 1 if new_quorum == 2 else 0) 2335 corosync.set_value("quorum.expected_votes", str(new_quorum)) 2336 2337 2338def bootstrap_init(context): 2339 """ 2340 Init cluster process 2341 """ 2342 global _context 2343 _context = context 2344 2345 init() 2346 2347 stage = _context.stage 2348 if stage is None: 2349 stage = "" 2350 2351 # vgfs stage requires running cluster, everything else requires inactive cluster, 2352 # except ssh and csync2 (which don't care) and csync2_remote (which mustn't care, 2353 # just in case this breaks ha-cluster-join on another node). 2354 corosync_active = utils.service_is_active("corosync.service") 2355 if stage in ("vgfs", "admin", "qdevice", "ocfs2"): 2356 if not corosync_active: 2357 error("Cluster is inactive - can't run %s stage" % (stage)) 2358 elif stage == "": 2359 if corosync_active: 2360 error("Cluster is currently active - can't run") 2361 elif stage not in ("ssh", "ssh_remote", "csync2", "csync2_remote", "sbd", "ocfs2"): 2362 if corosync_active: 2363 error("Cluster is currently active - can't run %s stage" % (stage)) 2364 2365 _context.initialize_qdevice() 2366 _context.validate_option() 2367 _context.init_sbd_manager() 2368 2369 # Need hostname resolution to work, want NTP (but don't block ssh_remote or csync2_remote) 2370 if stage not in ('ssh_remote', 'csync2_remote'): 2371 check_tty() 2372 if not check_prereqs(stage): 2373 return 2374 elif stage == 'csync2_remote': 2375 args = _context.args 2376 log("args: {}".format(args)) 2377 if len(args) != 2: 2378 error("Expected NODE argument to csync2_remote") 2379 _context.cluster_node = args[1] 2380 2381 if stage != "": 2382 globals()["init_" + stage]() 2383 else: 2384 init_ssh() 2385 init_csync2() 2386 init_corosync() 2387 init_remote_auth() 2388 init_sbd() 2389 2390 lock_inst = lock.Lock() 2391 try: 2392 with lock_inst.lock(): 2393 init_cluster() 2394 init_admin() 2395 init_qdevice() 2396 init_ocfs2() 2397 except lock.ClaimLockError as err: 2398 error(err) 2399 2400 status("Done (log saved to %s)" % (LOG_FILE)) 2401 2402 2403def bootstrap_join(context): 2404 """ 2405 Join cluster process 2406 """ 2407 global _context 2408 _context = context 2409 2410 init() 2411 _context.init_sbd_manager() 2412 _context.validate_option() 2413 2414 check_tty() 2415 2416 corosync_active = utils.service_is_active("corosync.service") 2417 if corosync_active and _context.stage != "ssh": 2418 error("Abort: Cluster is currently active. Run this command on a node joining the cluster.") 2419 2420 if not check_prereqs("join"): 2421 return 2422 2423 cluster_node = _context.cluster_node 2424 if _context.stage != "": 2425 globals()["join_" + _context.stage](cluster_node) 2426 else: 2427 if not _context.yes_to_all and cluster_node is None: 2428 status("""Join This Node to Cluster: 2429 You will be asked for the IP address of an existing node, from which 2430 configuration will be copied. If you have not already configured 2431 passwordless ssh between nodes, you will be prompted for the root 2432 password of the existing node. 
2433""") 2434 cluster_node = prompt_for_string("IP address or hostname of existing node (e.g.: 192.168.1.1)", ".+") 2435 _context.cluster_node = cluster_node 2436 2437 utils.ping_node(cluster_node) 2438 2439 join_ssh(cluster_node) 2440 2441 if not utils.service_is_active("pacemaker.service", cluster_node): 2442 error("Cluster is inactive on {}".format(cluster_node)) 2443 2444 lock_inst = lock.RemoteLock(cluster_node) 2445 try: 2446 with lock_inst.lock(): 2447 setup_passwordless_with_other_nodes(cluster_node) 2448 join_remote_auth(cluster_node) 2449 join_csync2(cluster_node) 2450 join_ssh_merge(cluster_node) 2451 probe_partitions() 2452 join_ocfs2(cluster_node) 2453 join_cluster(cluster_node) 2454 except (lock.SSHError, lock.ClaimLockError) as err: 2455 error(err) 2456 2457 status("Done (log saved to %s)" % (LOG_FILE)) 2458 2459 2460def join_ocfs2(peer_host): 2461 """ 2462 If init node configured OCFS2 device, verify that device on join node 2463 """ 2464 ocfs2_inst = ocfs2.OCFS2Manager(_context) 2465 ocfs2_inst.join_ocfs2(peer_host) 2466 2467 2468def join_remote_auth(node): 2469 if os.path.exists(PCMK_REMOTE_AUTH): 2470 rmfile(PCMK_REMOTE_AUTH) 2471 pcmk_remote_dir = os.path.dirname(PCMK_REMOTE_AUTH) 2472 mkdirs_owned(pcmk_remote_dir, mode=0o750, gid="haclient") 2473 invoke("touch {}".format(PCMK_REMOTE_AUTH)) 2474 2475 2476def remove_qdevice(): 2477 """ 2478 Remove qdevice service and configuration from cluster 2479 """ 2480 if not utils.is_qdevice_configured(): 2481 error("No QDevice configuration in this cluster") 2482 if not confirm("Removing QDevice service and configuration from cluster: Are you sure?"): 2483 return 2484 2485 utils.check_all_nodes_reachable() 2486 _context.qdevice_reload_policy = evaluate_qdevice_quorum_effect(QDEVICE_REMOVE) 2487 2488 status("Disable corosync-qdevice.service") 2489 invoke("crm cluster run 'systemctl disable corosync-qdevice'") 2490 if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD: 2491 status("Stopping corosync-qdevice.service") 2492 invoke("crm cluster run 'systemctl stop corosync-qdevice'") 2493 2494 with status_long("Removing QDevice configuration from cluster"): 2495 qnetd_host = corosync.get_value('quorum.device.net.host') 2496 qdevice_inst = corosync.QDevice(qnetd_host) 2497 qdevice_inst.remove_qdevice_config() 2498 qdevice_inst.remove_qdevice_db() 2499 update_expected_votes() 2500 if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD: 2501 invoke("crm cluster run 'crm corosync reload'") 2502 elif _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RESTART: 2503 status("Restarting cluster service") 2504 utils.cluster_run_cmd("crm cluster restart") 2505 wait_for_cluster() 2506 else: 2507 warn("To remove qdevice service, need to restart cluster service manually on each node") 2508 2509 2510def bootstrap_remove(context): 2511 """ 2512 Remove node from cluster, or remove qdevice configuration 2513 """ 2514 global _context 2515 _context = context 2516 force_flag = config.core.force or _context.force 2517 2518 init() 2519 2520 if not utils.service_is_active("corosync.service"): 2521 error("Cluster is not active - can't execute removing action") 2522 2523 if _context.qdevice_rm_flag and _context.cluster_node: 2524 error("Either remove node or qdevice") 2525 2526 if _context.qdevice_rm_flag: 2527 remove_qdevice() 2528 return 2529 2530 if not _context.yes_to_all and _context.cluster_node is None: 2531 status("""Remove This Node from Cluster: 2532 You will be asked for the IP address or name of an existing node, 2533 
        lock_inst = lock.RemoteLock(cluster_node)
        try:
            with lock_inst.lock():
                setup_passwordless_with_other_nodes(cluster_node)
                join_remote_auth(cluster_node)
                join_csync2(cluster_node)
                join_ssh_merge(cluster_node)
                probe_partitions()
                join_ocfs2(cluster_node)
                join_cluster(cluster_node)
        except (lock.SSHError, lock.ClaimLockError) as err:
            error(err)

    status("Done (log saved to %s)" % (LOG_FILE))


def join_ocfs2(peer_host):
    """
    If the init node configured an OCFS2 device, verify that device
    on the joining node
    """
    ocfs2_inst = ocfs2.OCFS2Manager(_context)
    ocfs2_inst.join_ocfs2(peer_host)


def join_remote_auth(node):
    if os.path.exists(PCMK_REMOTE_AUTH):
        rmfile(PCMK_REMOTE_AUTH)
    pcmk_remote_dir = os.path.dirname(PCMK_REMOTE_AUTH)
    mkdirs_owned(pcmk_remote_dir, mode=0o750, gid="haclient")
    invoke("touch {}".format(PCMK_REMOTE_AUTH))


def remove_qdevice():
    """
    Remove the qdevice service and configuration from the cluster
    """
    if not utils.is_qdevice_configured():
        error("No QDevice configuration in this cluster")
    if not confirm("Removing QDevice service and configuration from cluster: Are you sure?"):
        return

    utils.check_all_nodes_reachable()
    _context.qdevice_reload_policy = evaluate_qdevice_quorum_effect(QDEVICE_REMOVE)

    status("Disable corosync-qdevice.service")
    invoke("crm cluster run 'systemctl disable corosync-qdevice'")
    if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD:
        status("Stopping corosync-qdevice.service")
        invoke("crm cluster run 'systemctl stop corosync-qdevice'")

    with status_long("Removing QDevice configuration from cluster"):
        qnetd_host = corosync.get_value('quorum.device.net.host')
        qdevice_inst = corosync.QDevice(qnetd_host)
        qdevice_inst.remove_qdevice_config()
        qdevice_inst.remove_qdevice_db()
        update_expected_votes()
    if _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RELOAD:
        invoke("crm cluster run 'crm corosync reload'")
    elif _context.qdevice_reload_policy == QdevicePolicy.QDEVICE_RESTART:
        status("Restarting cluster service")
        utils.cluster_run_cmd("crm cluster restart")
        wait_for_cluster()
    else:
        warn("To finish removing the qdevice service, restart the cluster service manually on each node")


def bootstrap_remove(context):
    """
    Remove a node from the cluster, or remove the qdevice configuration
    """
    global _context
    _context = context
    force_flag = config.core.force or _context.force

    init()

    if not utils.service_is_active("corosync.service"):
        error("Cluster is not active - can't execute removing action")

    if _context.qdevice_rm_flag and _context.cluster_node:
        error("Remove either a cluster node or the qdevice configuration, not both")

    if _context.qdevice_rm_flag:
        remove_qdevice()
        return

    if not _context.yes_to_all and _context.cluster_node is None:
        status("""Remove This Node from Cluster:
  You will be asked for the IP address or name of an existing node,
  which will be removed from the cluster. This command must be
  executed from a different node in the cluster.
""")
        _context.cluster_node = prompt_for_string("IP address or hostname of cluster node (e.g.: 192.168.1.1)", ".+")

    if not _context.cluster_node:
        error("No existing IP/hostname specified (use -c option)")

    _context.cluster_node = get_cluster_node_hostname()

    if not force_flag and not confirm("Removing node \"{}\" from the cluster: Are you sure?".format(_context.cluster_node)):
        return

    if _context.cluster_node == utils.this_node():
        if not force_flag:
            error("Removing self requires --force")
        remove_self()
        return

    if _context.cluster_node in xmlutil.listnodes():
        remove_node_from_cluster()
    else:
        error("Specified node {} is not configured in cluster! Unable to remove.".format(_context.cluster_node))


def remove_self():
    me = _context.cluster_node
    yes_to_all = _context.yes_to_all
    nodes = xmlutil.listnodes(include_remote_nodes=False)
    othernode = next((x for x in nodes if x != me), None)
    if othernode is not None:
        # remove this node from the other node
        cmd = "crm cluster remove{} -c {}".format(" -y" if yes_to_all else "", me)
        rc = utils.ext_cmd_nosudo("ssh{} {} {} '{}'".format("" if yes_to_all else " -t", SSH_OPTION, othernode, cmd))
        if rc != 0:
            error("Failed to remove this node from {}".format(othernode))
    else:
        # This is the last node: disable and stop the cluster, then remove
        # all traces of the cluster from this node by deleting the
        # configuration files
        stop_services(SERVICES_STOP_LIST)
        if not invokerc('bash -c "rm -f {}"'.format(" ".join(_context.rm_list))):
            error("Deleting the configuration files failed")


def init_common_geo():
    """
    Tasks to do on both the first and the other geo nodes.
    """
    if not utils.package_is_installed("booth"):
        error("Booth not installed - Not configurable as a geo cluster node.")


def init_csync2_geo():
    """
    TODO: Configure csync2 for the geo cluster.
    That is, create a second sync group which
    syncs the geo configuration across the whole
    geo cluster.
    """


def create_booth_authkey():
    status("Create authentication key for booth")
    if os.path.exists(BOOTH_AUTH):
        rmfile(BOOTH_AUTH)
    rc, _, err = invoke("booth-keygen {}".format(BOOTH_AUTH))
    if not rc:
        error("Failed to generate booth authkey: {}".format(err))


def create_booth_config(arbitrator, clusters, tickets):
    status("Configure booth")

    config_template = """# The booth configuration file is "/etc/booth/booth.conf". You need to
# prepare the same booth configuration file on each arbitrator and
# each node in the cluster sites where the booth daemon can be launched.

# "transport" means which transport layer booth daemon will use.
# Currently only "UDP" is supported.
transport="UDP"
port="9929"
"""
    cfg = [config_template]
    if arbitrator is not None:
        cfg.append("arbitrator=\"{}\"".format(arbitrator))
    for s in clusters.values():
        cfg.append("site=\"{}\"".format(s))
    cfg.append("authfile=\"{}\"".format(BOOTH_AUTH))
    for t in tickets:
        cfg.append("ticket=\"{}\"\nexpire=\"600\"".format(t))
    cfg = "\n".join(cfg) + "\n"

    if os.path.exists(BOOTH_CFG):
        rmfile(BOOTH_CFG)
    utils.str2file(cfg, BOOTH_CFG)
    utils.chown(BOOTH_CFG, "hacluster", "haclient")
    os.chmod(BOOTH_CFG, 0o644)
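
# For example (illustrative values only), create_booth_config("10.0.0.9",
# {"alpha": "10.0.0.10", "beta": "10.0.0.20"}, ["ticket-A"]) appends to the
# template above:
#   arbitrator="10.0.0.9"
#   site="10.0.0.10"
#   site="10.0.0.20"
#   authfile="/etc/booth/authkey"
#   ticket="ticket-A"
#   expire="600"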
2613transport="UDP" 2614port="9929" 2615""" 2616 cfg = [config_template] 2617 if arbitrator is not None: 2618 cfg.append("arbitrator=\"{}\"".format(arbitrator)) 2619 for s in clusters.values(): 2620 cfg.append("site=\"{}\"".format(s)) 2621 cfg.append("authfile=\"{}\"".format(BOOTH_AUTH)) 2622 for t in tickets: 2623 cfg.append("ticket=\"{}\"\nexpire=\"600\"".format(t)) 2624 cfg = "\n".join(cfg) + "\n" 2625 2626 if os.path.exists(BOOTH_CFG): 2627 rmfile(BOOTH_CFG) 2628 utils.str2file(cfg, BOOTH_CFG) 2629 utils.chown(BOOTH_CFG, "hacluster", "haclient") 2630 os.chmod(BOOTH_CFG, 0o644) 2631 2632 2633def bootstrap_init_geo(context): 2634 """ 2635 Configure as a geo cluster member. 2636 """ 2637 global _context 2638 _context = context 2639 2640 if os.path.exists(BOOTH_CFG) and not confirm("This will overwrite {} - continue?".format(BOOTH_CFG)): 2641 return 2642 if os.path.exists(BOOTH_AUTH) and not confirm("This will overwrite {} - continue?".format(BOOTH_AUTH)): 2643 return 2644 2645 init_common_geo() 2646 2647 # TODO: 2648 # in /etc/drbd.conf or /etc/drbd.d/global_common.conf 2649 # set common.startup.wfc-timeout 100 2650 # set common.startup.degr-wfc-timeout 120 2651 2652 create_booth_authkey() 2653 create_booth_config(_context.arbitrator, _context.clusters, _context.tickets) 2654 status("Sync booth configuration across cluster") 2655 csync2_update(BOOTH_DIR) 2656 init_csync2_geo() 2657 geo_cib_config(_context.clusters) 2658 2659 2660def geo_fetch_config(node): 2661 # TODO: clean this up 2662 status("Retrieving configuration - This may prompt for root@%s:" % (node)) 2663 tmpdir = tmpfiles.create_dir() 2664 rc, _, err = invoke("scp -oStrictHostKeyChecking=no root@{}:'{}/*' {}/".format(node, BOOTH_DIR, tmpdir)) 2665 if not rc: 2666 error("Failed to retrieve configuration: {}".format(err)) 2667 try: 2668 if os.path.isfile("%s/authkey" % (tmpdir)): 2669 invoke("mv %s/authkey %s" % (tmpdir, BOOTH_AUTH)) 2670 os.chmod(BOOTH_AUTH, 0o600) 2671 if os.path.isfile("%s/booth.conf" % (tmpdir)): 2672 invoke("mv %s/booth.conf %s" % (tmpdir, BOOTH_CFG)) 2673 os.chmod(BOOTH_CFG, 0o644) 2674 except OSError as err: 2675 raise ValueError("Problem encountered with booth configuration from {}: {}".format(node, err)) 2676 2677 2678def geo_cib_config(clusters): 2679 cluster_name = corosync.get_values('totem.cluster_name')[0] 2680 if cluster_name not in list(clusters.keys()): 2681 error("Local cluster name is {}, expected {}".format(cluster_name, "|".join(list(clusters.keys())))) 2682 2683 status("Configure cluster resources for booth") 2684 crm_template = Template(""" 2685primitive booth-ip ocf:heartbeat:IPaddr2 $iprules 2686primitive booth-site ocf:pacemaker:booth-site \ 2687 meta resource-stickiness="INFINITY" \ 2688 params config=booth op monitor interval="10s" 2689group g-booth booth-ip booth-site meta target-role=Stopped 2690""") 2691 iprule = 'params rule #cluster-name eq {} ip="{}"' 2692 2693 crm_configure_load("update", crm_template.substitute(iprules=" ".join(iprule.format(k, v) for k, v in clusters.items()))) 2694 2695 2696def bootstrap_join_geo(context): 2697 """ 2698 Run on second cluster to add to a geo configuration. 2699 It fetches its booth configuration from the other node (cluster node or arbitrator). 
2700 """ 2701 global _context 2702 _context = context 2703 init_common_geo() 2704 check_tty() 2705 geo_fetch_config(_context.cluster_node) 2706 status("Sync booth configuration across cluster") 2707 csync2_update(BOOTH_DIR) 2708 geo_cib_config(_context.clusters) 2709 2710 2711def bootstrap_arbitrator(context): 2712 """ 2713 Configure this machine as an arbitrator. 2714 It fetches its booth configuration from a cluster node already in the cluster. 2715 """ 2716 global _context 2717 _context = context 2718 node = _context.cluster_node 2719 2720 init_common_geo() 2721 check_tty() 2722 geo_fetch_config(node) 2723 if not os.path.isfile(BOOTH_CFG): 2724 error("Failed to copy {} from {}".format(BOOTH_CFG, node)) 2725 # TODO: verify that the arbitrator IP in the configuration is us? 2726 status("Enabling and starting the booth arbitrator service") 2727 utils.start_service("booth@booth", enable=True) 2728 2729# EOF 2730