1# Copyright (C) 2013 Kristoffer Gronlund <kgronlund@suse.com> 2# See COPYING for license information. 3''' 4Functions that abstract creating and editing the corosync.conf 5configuration file, and also the corosync-* utilities. 6''' 7 8import os 9import re 10import socket 11from . import utils 12from . import tmpfiles 13from . import parallax 14from .msg import err_buf, common_debug 15 16 17def conf(): 18 return os.getenv('COROSYNC_MAIN_CONFIG_FILE', '/usr/local/etc/corosync/corosync.conf') 19 20 21def check_tools(): 22 return all(utils.is_program(p) 23 for p in ['corosync-cfgtool', 'corosync-quorumtool', 'corosync-cmapctl']) 24 25 26def cfgtool(*args): 27 return utils.get_stdout(['corosync-cfgtool'] + list(args), shell=False) 28 29 30def quorumtool(*args): 31 return utils.get_stdout(['corosync-quorumtool'] + list(args), shell=False) 32 33 34def query_status(status_type): 35 """ 36 Query status of corosync 37 38 Possible types could be ring/quorum/qdevice/qnetd 39 """ 40 status_func_dict = { 41 "ring": query_ring_status, 42 "quorum": query_quorum_status, 43 "qdevice": query_qdevice_status, 44 "qnetd": query_qnetd_status 45 } 46 if status_type in status_func_dict: 47 status_func_dict[status_type]() 48 else: 49 raise ValueError("Wrong type \"{}\" to query status".format(status_type)) 50 51 52def query_ring_status(): 53 """ 54 Query corosync ring status 55 """ 56 rc, out, err = utils.get_stdout_stderr("corosync-cfgtool -s") 57 if rc != 0 and err: 58 raise ValueError(err) 59 if rc == 0 and out: 60 print(out) 61 62 63def query_quorum_status(): 64 """ 65 Query corosync quorum status 66 67 """ 68 utils.print_cluster_nodes() 69 rc, out, err = utils.get_stdout_stderr("corosync-quorumtool -s") 70 if rc != 0 and err: 71 raise ValueError(err) 72 # If the return code of corosync-quorumtool is 2, 73 # that means no problem appeared but node is not quorate 74 if rc in [0, 2] and out: 75 print(out) 76 77 78def query_qdevice_status(): 79 """ 80 Query qdevice status 81 """ 82 if not utils.is_qdevice_configured(): 83 raise ValueError("QDevice/QNetd not configured!") 84 cmd = "corosync-qdevice-tool -sv" 85 out = utils.get_stdout_or_raise_error(cmd) 86 utils.print_cluster_nodes() 87 print(out) 88 89 90def query_qnetd_status(): 91 """ 92 Query qnetd status 93 """ 94 if not utils.is_qdevice_configured(): 95 raise ValueError("QDevice/QNetd not configured!") 96 cluster_name = get_value('totem.cluster_name') 97 if not cluster_name: 98 raise ValueError("cluster_name not configured!") 99 qnetd_addr = get_value('quorum.device.net.host') 100 if not qnetd_addr: 101 raise ValueError("host for qnetd not configured!") 102 103 # Configure ssh passwordless to qnetd if detect password is needed 104 if utils.check_ssh_passwd_need(qnetd_addr): 105 print("Copy ssh key to qnetd node({})".format(qnetd_addr)) 106 rc, _, err = utils.get_stdout_stderr("ssh-copy-id -i /root/.ssh/id_rsa.pub root@{}".format(qnetd_addr)) 107 if rc != 0: 108 raise ValueError(err) 109 110 cmd = "corosync-qnetd-tool -lv -c {}".format(cluster_name) 111 result = parallax.parallax_call([qnetd_addr], cmd) 112 _, qnetd_result_stdout, _ = result[0][1] 113 if qnetd_result_stdout: 114 utils.print_cluster_nodes() 115 print(utils.to_ascii(qnetd_result_stdout)) 116 117 118def add_nodelist_from_cmaptool(): 119 for nodeid, iplist in utils.get_nodeinfo_from_cmaptool().items(): 120 try: 121 add_node_ucast(iplist, nodeid) 122 except IPAlreadyConfiguredError: 123 continue 124 125 126def is_unicast(): 127 return get_value("totem.transport") == "udpu" 128 129 130_tCOMMENT = 0 131_tBEGIN = 1 132_tEND = 2 133_tVALUE = 3 134 135 136class Token(object): 137 def __init__(self, token, path, key=None, value=None): 138 self.token = token 139 self.path = '.'.join(path) 140 self.key = key 141 self.value = value 142 143 def __repr__(self): 144 if self.token == _tCOMMENT: 145 return self.key 146 elif self.token == _tBEGIN: 147 return "%s {" % (self.key) 148 elif self.token == _tEND: 149 return '}' 150 return '%s: %s' % (self.key, self.value) 151 152 153class QDevice(object): 154 """ 155 Class to manage qdevice configuration and services 156 157 Whole certification process: 158 For init 159 Step 1: init_db_on_qnetd 160 Step 2: fetch_qnetd_crt_from_qnetd 161 Step 3: copy_qnetd_crt_to_cluster 162 Step 4: init_db_on_cluster 163 Step 5: create_ca_request 164 Step 6: copy_crq_to_qnetd 165 Step 7: sign_crq_on_qnetd 166 Step 8: fetch_cluster_crt_from_qnetd 167 Step 9: import_cluster_crt 168 Step 10: copy_p12_to_cluster 169 Step 11: import_p12_on_cluster 170 171 For join 172 Step 1: fetch_qnetd_crt_from_cluster 173 Step 2: init_db_on_local 174 Step 3: fetch_p12_from_cluster 175 Step 4: import_p12_on_local 176 """ 177 qnetd_service = "corosync-qnetd.service" 178 qnetd_cacert_filename = "qnetd-cacert.crt" 179 qdevice_crq_filename = "qdevice-net-node.crq" 180 qdevice_p12_filename = "qdevice-net-node.p12" 181 qnetd_path = "/usr/local/etc/corosync/qnetd" 182 qdevice_path = "/usr/local/etc/corosync/qdevice/net" 183 qdevice_db_path = "/usr/local/etc/corosync/qdevice/net/nssdb" 184 185 def __init__(self, qnetd_addr, port=5403, algo="ffsplit", tie_breaker="lowest", 186 tls="on", cluster_node=None, cmds=None, mode=None): 187 """ 188 Init function 189 """ 190 self.qnetd_addr = qnetd_addr 191 self.port = port 192 self.algo = algo 193 self.tie_breaker = tie_breaker 194 self.tls = tls 195 self.cluster_node = cluster_node 196 self.askpass = False 197 self.cmds = cmds 198 self.mode = mode 199 200 @property 201 def qnetd_cacert_on_qnetd(self): 202 """ 203 Return path of qnetd-cacert.crt on qnetd node 204 """ 205 return "{}/nssdb/{}".format(self.qnetd_path, self.qnetd_cacert_filename) 206 207 @property 208 def qnetd_cacert_on_local(self): 209 """ 210 Return path of qnetd-cacert.crt on local node 211 """ 212 return "{}/{}/{}".format(self.qdevice_path, self.qnetd_addr, self.qnetd_cacert_filename) 213 214 @property 215 def qnetd_cacert_on_cluster(self): 216 """ 217 Return path of qnetd-cacert.crt on cluster node 218 """ 219 return "{}/{}/{}".format(self.qdevice_path, self.cluster_node, self.qnetd_cacert_filename) 220 221 @property 222 def qdevice_crq_on_qnetd(self): 223 """ 224 Return path of qdevice-net-node.crq on qnetd node 225 """ 226 return "{}/nssdb/{}".format(self.qnetd_path, self.qdevice_crq_filename) 227 228 @property 229 def qdevice_crq_on_local(self): 230 """ 231 Return path of qdevice-net-node.crq on local node 232 """ 233 return "{}/nssdb/{}".format(self.qdevice_path, self.qdevice_crq_filename) 234 235 @property 236 def qnetd_cluster_crt_on_qnetd(self): 237 """ 238 Return path of cluster-cluster_name.crt on qnetd node 239 """ 240 return "{}/nssdb/cluster-{}.crt".format(self.qnetd_path, self.cluster_name) 241 242 @property 243 def qnetd_cluster_crt_on_local(self): 244 """ 245 Return path of cluster-cluster_name.crt on local node 246 """ 247 return "{}/{}/{}".format(self.qdevice_path, self.qnetd_addr, os.path.basename(self.qnetd_cluster_crt_on_qnetd)) 248 249 @property 250 def qdevice_p12_on_local(self): 251 """ 252 Return path of qdevice-net-node.p12 on local node 253 """ 254 return "{}/nssdb/{}".format(self.qdevice_path, self.qdevice_p12_filename) 255 256 @property 257 def qdevice_p12_on_cluster(self): 258 """ 259 Return path of qdevice-net-node.p12 on cluster node 260 """ 261 return "{}/{}/{}".format(self.qdevice_path, self.cluster_node, self.qdevice_p12_filename) 262 263 def valid_attr(self): 264 """ 265 Validate qdevice related options 266 """ 267 qnetd_ip = None 268 269 if not utils.package_is_installed("corosync-qdevice"): 270 raise ValueError("Package \"corosync-qdevice\" not installed on this node") 271 272 try: 273 # socket.getaddrinfo works for both ipv4 and ipv6 address 274 # The function returns a list of 5-tuples with the following structure: 275 # (family, type, proto, canonname, sockaddr) 276 # sockaddr is a tuple describing a socket address, whose format depends on the returned family 277 # (a (address, port) 2-tuple for AF_INET, a (address, port, flow info, scope id) 4-tuple for AF_INET6) 278 res = socket.getaddrinfo(self.qnetd_addr, None) 279 qnetd_ip = res[0][-1][0] 280 except socket.error: 281 raise ValueError("host \"{}\" is unreachable".format(self.qnetd_addr)) 282 283 utils.ping_node(self.qnetd_addr) 284 285 if utils.InterfacesInfo.ip_in_local(qnetd_ip): 286 raise ValueError("host for qnetd must be a remote one") 287 288 if not utils.check_port_open(qnetd_ip, 22): 289 raise ValueError("ssh service on \"{}\" not available".format(self.qnetd_addr)) 290 291 if not utils.valid_port(self.port): 292 raise ValueError("invalid qdevice port range(1024 - 65535)") 293 294 if self.algo not in ("ffsplit", "lms"): 295 raise ValueError("invalid ALGORITHM choice: '{}' (choose from 'ffsplit', 'lms')".format(self.algo)) 296 297 if self.tie_breaker not in ("lowest", "highest") and not utils.valid_nodeid(self.tie_breaker): 298 raise ValueError("invalid qdevice tie_breaker(lowest/highest/valid_node_id)") 299 300 if self.tls not in ("on", "off", "required"): 301 raise ValueError("invalid TLS choice: '{}' (choose from 'on', 'off', 'required')".format(self.tls)) 302 303 if self.cmds: 304 for cmd in self.cmds.strip(';').split(';'): 305 if not cmd.startswith('/'): 306 raise ValueError("commands for heuristics should be absolute path") 307 if not os.path.exists(cmd.split()[0]): 308 raise ValueError("command {} not exist".format(cmd.split()[0])) 309 310 def valid_qnetd(self): 311 """ 312 Validate on qnetd node 313 """ 314 if utils.check_ssh_passwd_need(self.qnetd_addr): 315 self.askpass = True 316 317 exception_msg = "" 318 suggest = "" 319 if utils.service_is_active("pacemaker", self.qnetd_addr): 320 exception_msg = "host for qnetd must be a non-cluster node" 321 suggest = "change to another host or stop cluster service on {}".format(self.qnetd_addr) 322 elif not utils.package_is_installed("corosync-qnetd", self.qnetd_addr): 323 exception_msg = "Package \"corosync-qnetd\" not installed on {}".format(self.qnetd_addr) 324 suggest = "install \"corosync-qnetd\" on {}".format(self.qnetd_addr) 325 326 if exception_msg: 327 exception_msg += "\nCluster service already successfully started on this node except qdevice service\nIf you still want to use qdevice, {}\nThen run command \"crm cluster init\" with \"qdevice\" stage, like:\n crm cluster init qdevice qdevice_related_options\nThat command will setup qdevice separately".format(suggest) 328 raise ValueError(exception_msg) 329 330 def manage_qnetd(self, action): 331 cmd = "systemctl {} {}".format(action, self.qnetd_service) 332 if self.askpass: 333 print("{} {} on {}".format(action.capitalize(), self.qnetd_service, self.qnetd_addr)) 334 parallax.parallax_call([self.qnetd_addr], cmd, self.askpass) 335 336 def enable_qnetd(self): 337 self.manage_qnetd("enable") 338 339 def disable_qnetd(self): 340 self.manage_qnetd("disable") 341 342 def start_qnetd(self): 343 self.manage_qnetd("start") 344 345 def stop_qnetd(self): 346 self.manage_qnetd("stop") 347 348 def debug_and_log_to_bootstrap(self, msg): 349 from . import bootstrap 350 common_debug(msg) 351 bootstrap.log("# " + msg) 352 353 def init_db_on_qnetd(self): 354 """ 355 Certificate process for init 356 Step 1 357 Initialize database on QNetd server by running corosync-qnetd-certutil -i 358 """ 359 cmd = "test -f {}".format(self.qnetd_cacert_on_qnetd) 360 if self.askpass: 361 print("Test whether {} exists on QNetd server({})".format(self.qnetd_cacert_on_qnetd, self.qnetd_addr)) 362 try: 363 parallax.parallax_call([self.qnetd_addr], cmd, self.askpass) 364 except ValueError: 365 # target file not exist 366 pass 367 else: 368 return 369 370 cmd = "corosync-qnetd-certutil -i" 371 desc = "Step 1: Initialize database on {}".format(self.qnetd_addr) 372 self.debug_and_log_to_bootstrap(desc) 373 if self.askpass: 374 print(desc) 375 parallax.parallax_call([self.qnetd_addr], cmd, self.askpass) 376 377 def fetch_qnetd_crt_from_qnetd(self): 378 """ 379 Certificate process for init 380 Step 2 381 Fetch QNetd CA certificate(qnetd-cacert.crt) from QNetd server 382 """ 383 if os.path.exists(self.qnetd_cacert_on_local): 384 return 385 386 desc = "Step 2: Fetch {} from {}".format(self.qnetd_cacert_filename, self.qnetd_addr) 387 self.debug_and_log_to_bootstrap(desc) 388 if self.askpass: 389 print(desc) 390 parallax.parallax_slurp([self.qnetd_addr], self.qdevice_path, self.qnetd_cacert_on_qnetd, self.askpass) 391 392 def copy_qnetd_crt_to_cluster(self): 393 """ 394 Certificate process for init 395 Step 3 396 Copy exported QNetd CA certificate (qnetd-cacert.crt) to every node 397 """ 398 node_list = utils.list_cluster_nodes_except_me() 399 if not node_list: 400 return 401 402 desc = "Step 3: Copy exported {} to {}".format(self.qnetd_cacert_filename, node_list) 403 self.debug_and_log_to_bootstrap(desc) 404 if self.askpass: 405 print(desc) 406 parallax.parallax_copy( 407 node_list, 408 os.path.dirname(self.qnetd_cacert_on_local), 409 self.qdevice_path, 410 self.askpass) 411 412 def init_db_on_cluster(self): 413 """ 414 Certificate process for init 415 Step 4 416 On one of cluster node initialize database by running 417 /usr/sbin/corosync-qdevice-net-certutil -i -c qnetd-cacert.crt 418 """ 419 node_list = utils.list_cluster_nodes() 420 cmd = "corosync-qdevice-net-certutil -i -c {}".format(self.qnetd_cacert_on_local) 421 desc = "Step 4: Initialize database on {}".format(node_list) 422 self.debug_and_log_to_bootstrap(desc) 423 if self.askpass: 424 print(desc) 425 parallax.parallax_call(node_list, cmd, self.askpass) 426 427 def create_ca_request(self): 428 """ 429 Certificate process for init 430 Step 5 431 Generate certificate request: 432 /usr/sbin/corosync-qdevice-net-certutil -r -n Cluster 433 (Cluster name must match cluster_name key in the corosync.conf) 434 """ 435 self.debug_and_log_to_bootstrap("Step 5: Generate certificate request {}".format(self.qdevice_crq_filename)) 436 self.cluster_name = get_value('totem.cluster_name') 437 if not self.cluster_name: 438 raise ValueError("No cluster_name found in {}".format(conf())) 439 cmd = "corosync-qdevice-net-certutil -r -n {}".format(self.cluster_name) 440 rc, _, err = utils.get_stdout_stderr(cmd) 441 if rc != 0: 442 raise ValueError(err) 443 444 def copy_crq_to_qnetd(self): 445 """ 446 Certificate process for init 447 Step 6 448 Copy exported CRQ to QNetd server 449 """ 450 desc = "Step 6: Copy {} to {}".format(self.qdevice_crq_filename, self.qnetd_addr) 451 self.debug_and_log_to_bootstrap(desc) 452 if self.askpass: 453 print(desc) 454 parallax.parallax_copy( 455 [self.qnetd_addr], 456 self.qdevice_crq_on_local, 457 os.path.dirname(self.qdevice_crq_on_qnetd), 458 self.askpass) 459 460 def sign_crq_on_qnetd(self): 461 """ 462 Certificate process for init 463 Step 7 464 On QNetd server sign and export cluster certificate by running 465 corosync-qnetd-certutil -s -c qdevice-net-node.crq -n Cluster 466 """ 467 desc = "Step 7: Sign and export cluster certificate on {}".format(self.qnetd_addr) 468 self.debug_and_log_to_bootstrap(desc) 469 cmd = "corosync-qnetd-certutil -s -c {} -n {}".\ 470 format(self.qdevice_crq_on_qnetd, self.cluster_name) 471 if self.askpass: 472 print(desc) 473 parallax.parallax_call([self.qnetd_addr], cmd, self.askpass) 474 475 def fetch_cluster_crt_from_qnetd(self): 476 """ 477 Certificate process for init 478 Step 8 479 Copy exported CRT to node where certificate request was created 480 """ 481 desc = "Step 8: Fetch {} from {}".format(os.path.basename(self.qnetd_cluster_crt_on_qnetd), self.qnetd_addr) 482 self.debug_and_log_to_bootstrap(desc) 483 if self.askpass: 484 print(desc) 485 parallax.parallax_slurp( 486 [self.qnetd_addr], 487 self.qdevice_path, 488 self.qnetd_cluster_crt_on_qnetd, 489 self.askpass) 490 491 def import_cluster_crt(self): 492 """ 493 Certificate process for init 494 Step 9 495 Import certificate on node where certificate request was created by 496 running /usr/sbin/corosync-qdevice-net-certutil -M -c cluster-Cluster.crt 497 """ 498 self.debug_and_log_to_bootstrap("Step 9: Import certificate file {} on local".format(os.path.basename(self.qnetd_cluster_crt_on_local))) 499 cmd = "corosync-qdevice-net-certutil -M -c {}".format(self.qnetd_cluster_crt_on_local) 500 rc, _, err = utils.get_stdout_stderr(cmd) 501 if rc != 0: 502 raise ValueError(err) 503 504 def copy_p12_to_cluster(self): 505 """ 506 Certificate process for init 507 Step 10 508 Copy output qdevice-net-node.p12 to all other cluster nodes 509 """ 510 node_list = utils.list_cluster_nodes_except_me() 511 if not node_list: 512 return 513 514 desc = "Step 10: Copy {} to {}".format(self.qdevice_p12_filename, node_list) 515 self.debug_and_log_to_bootstrap(desc) 516 if self.askpass: 517 print(desc) 518 parallax.parallax_copy( 519 node_list, 520 self.qdevice_p12_on_local, 521 os.path.dirname(self.qdevice_p12_on_local), 522 self.askpass) 523 524 def import_p12_on_cluster(self): 525 """ 526 Certificate process for init 527 Step 11 528 Import cluster certificate and key on all other cluster nodes: 529 /usr/sbin/corosync-qdevice-net-certutil -m -c qdevice-net-node.p12 530 """ 531 node_list = utils.list_cluster_nodes_except_me() 532 if not node_list: 533 return 534 535 desc = "Step 11: Import {} on {}".format(self.qdevice_p12_filename, node_list) 536 self.debug_and_log_to_bootstrap(desc) 537 if self.askpass: 538 print(desc) 539 cmd = "corosync-qdevice-net-certutil -m -c {}".format(self.qdevice_p12_on_local) 540 parallax.parallax_call(node_list, cmd, self.askpass) 541 542 def certificate_process_on_init(self): 543 """ 544 The qdevice certificate process on init node 545 """ 546 self.init_db_on_qnetd() 547 self.fetch_qnetd_crt_from_qnetd() 548 self.copy_qnetd_crt_to_cluster() 549 self.init_db_on_cluster() 550 self.create_ca_request() 551 self.copy_crq_to_qnetd() 552 self.sign_crq_on_qnetd() 553 self.fetch_cluster_crt_from_qnetd() 554 self.import_cluster_crt() 555 self.copy_p12_to_cluster() 556 self.import_p12_on_cluster() 557 558 def fetch_qnetd_crt_from_cluster(self): 559 """ 560 Certificate process for join 561 Step 1 562 Fetch QNetd CA certificate(qnetd-cacert.crt) from init node 563 """ 564 if os.path.exists(self.qnetd_cacert_on_cluster): 565 return 566 567 desc = "Step 1: Fetch {} from {}".format(self.qnetd_cacert_filename, self.cluster_node) 568 self.debug_and_log_to_bootstrap(desc) 569 if self.askpass: 570 print(desc) 571 parallax.parallax_slurp( 572 [self.cluster_node], 573 self.qdevice_path, 574 self.qnetd_cacert_on_local, 575 self.askpass) 576 577 def init_db_on_local(self): 578 """ 579 Certificate process for join 580 Step 2 581 Initialize database by running 582 /usr/sbin/corosync-qdevice-net-certutil -i -c qnetd-cacert.crt 583 """ 584 if os.path.exists(self.qdevice_db_path): 585 utils.rmdir_r(self.qdevice_db_path) 586 587 self.debug_and_log_to_bootstrap("Step 2: Initialize database on local") 588 cmd = "corosync-qdevice-net-certutil -i -c {}".format(self.qnetd_cacert_on_cluster) 589 rc, _, err = utils.get_stdout_stderr(cmd) 590 if rc != 0: 591 raise ValueError(err) 592 593 def fetch_p12_from_cluster(self): 594 """ 595 Certificate process for join 596 Step 3 597 Fetch p12 key file from init node 598 """ 599 if os.path.exists(self.qdevice_p12_on_cluster): 600 return 601 602 desc = "Step 3: Fetch {} from {}".format(self.qdevice_p12_filename, self.cluster_node) 603 self.debug_and_log_to_bootstrap(desc) 604 if self.askpass: 605 print(desc) 606 parallax.parallax_slurp( 607 [self.cluster_node], 608 self.qdevice_path, 609 self.qdevice_p12_on_local, 610 self.askpass) 611 612 def import_p12_on_local(self): 613 """ 614 Certificate process for join 615 Step 4 616 Import cluster certificate and key 617 """ 618 self.debug_and_log_to_bootstrap("Step 4: Import cluster certificate and key") 619 cmd = "corosync-qdevice-net-certutil -m -c {}".format(self.qdevice_p12_on_cluster) 620 rc, _, err = utils.get_stdout_stderr(cmd) 621 if rc != 0: 622 raise ValueError(err) 623 624 def certificate_process_on_join(self): 625 """ 626 The qdevice certificate process on join node 627 """ 628 self.fetch_qnetd_crt_from_cluster() 629 self.init_db_on_local() 630 self.fetch_p12_from_cluster() 631 self.import_p12_on_local() 632 633 def write_qdevice_config(self): 634 """ 635 Write qdevice attributes to config file 636 """ 637 with open(conf()) as f: 638 p = Parser(f.read()) 639 640 p.remove("quorum.device") 641 p.add('quorum', make_section('quorum.device', [])) 642 p.set('quorum.device.votes', '1') 643 p.set('quorum.device.model', 'net') 644 p.add('quorum.device', make_section('quorum.device.net', [])) 645 p.set('quorum.device.net.tls', self.tls) 646 p.set('quorum.device.net.host', self.qnetd_addr) 647 p.set('quorum.device.net.port', self.port) 648 p.set('quorum.device.net.algorithm', self.algo) 649 p.set('quorum.device.net.tie_breaker', self.tie_breaker) 650 if self.cmds: 651 p.add('quorum.device', make_section('quorum.device.heuristics', [])) 652 p.set('quorum.device.heuristics.mode', self.mode) 653 for i, cmd in enumerate(self.cmds.strip(';').split(';')): 654 cmd_name = re.sub("[.-]", "_", os.path.basename(cmd.split()[0])) 655 exec_name = "exec_{}{}".format(cmd_name, i) 656 p.set('quorum.device.heuristics.{}'.format(exec_name), cmd) 657 utils.str2file(p.to_string(), conf()) 658 659 def remove_qdevice_config(self): 660 """ 661 Remove configuration of qdevice 662 """ 663 with open(conf()) as f: 664 p = Parser(f.read()) 665 p.remove("quorum.device") 666 utils.str2file(p.to_string(), conf()) 667 668 def remove_qdevice_db(self): 669 """ 670 Remove qdevice database 671 """ 672 if not os.path.exists(self.qdevice_db_path): 673 return 674 node_list = utils.list_cluster_nodes() 675 cmd = "rm -rf {}/*".format(self.qdevice_path) 676 if self.askpass: 677 print("Remove database on cluster nodes") 678 parallax.parallax_call(node_list, cmd, self.askpass) 679 680 681def corosync_tokenizer(stream): 682 """Parses the corosync config file into a token stream""" 683 section_re = re.compile(r'(\w+)\s*{') 684 value_re = re.compile(r'(\w+):\s*([\S ]+)') 685 path = [] 686 while stream: 687 stream = stream.lstrip() 688 if not stream: 689 break 690 if stream[0] == '#': 691 end = stream.find('\n') 692 t = Token(_tCOMMENT, [], stream[:end]) 693 stream = stream[end:] 694 yield t 695 continue 696 if stream[0] == '}': 697 t = Token(_tEND, []) 698 stream = stream[1:] 699 yield t 700 path = path[:-1] 701 continue 702 m = section_re.match(stream) 703 if m: 704 path.append(m.group(1)) 705 t = Token(_tBEGIN, path, m.group(1)) 706 stream = stream[m.end():] 707 yield t 708 continue 709 m = value_re.match(stream) 710 if m: 711 t = Token(_tVALUE, path + [m.group(1)], m.group(1), m.group(2)) 712 stream = stream[m.end():] 713 yield t 714 continue 715 raise ValueError("Parse error at [..%s..]" % (stream[:16])) 716 717 718def make_section(path, contents=None): 719 "Create a token sequence representing a section" 720 if not contents: 721 contents = [] 722 sp = path.split('.') 723 name = sp[-1] 724 for t in contents: 725 if t.path and not t.path.startswith(path): 726 raise ValueError("%s (%s) not in path %s" % (t.path, t.key, path)) 727 return [Token(_tBEGIN, sp, name)] + contents + [Token(_tEND, [])] 728 729 730def make_value(path, value): 731 "Create a token sequence representing a value" 732 sp = path.split('.') 733 name = sp[-1] 734 return [Token(_tVALUE, sp, name, value)] 735 736 737class Parser(object): 738 def __init__(self, data): 739 self._tokens = list(corosync_tokenizer(data)) 740 741 def find(self, name, start=0): 742 """Gets the index of the element with the given path""" 743 for i, t in enumerate(self._tokens[start:]): 744 if t.path == name: 745 return i + start 746 return -1 747 748 def find_bounds(self, name, start=0): 749 """find the (start, end) of the next instance of name found at start""" 750 i = self.find(name, start) 751 if i < 0: 752 return -1, -1 753 if self._tokens[i].token != _tBEGIN: 754 return i, i 755 e = i + 1 756 depth = 0 757 while e < len(self._tokens): 758 t = self._tokens[e] 759 if t.token == _tBEGIN: 760 depth += 1 761 if t.token == _tEND: 762 depth -= 1 763 if depth < 0: 764 break 765 e += 1 766 if e == len(self._tokens): 767 raise ValueError("Unclosed section") 768 return i, e 769 770 def get(self, path): 771 """Gets the value for the key (if any)""" 772 for t in self._tokens: 773 if t.token == _tVALUE and t.path == path: 774 return t.value 775 return None 776 777 def get_all(self, path): 778 """Returns all values matching path""" 779 ret = [] 780 for t in self._tokens: 781 if t.token == _tVALUE and t.path == path: 782 ret.append(t.value) 783 return ret 784 785 def all_paths(self): 786 """Returns all value paths""" 787 ret = [] 788 for t in self._tokens: 789 if t.token == _tVALUE: 790 ret.append(t.path) 791 return ret 792 793 def count(self, path): 794 """Returns the number of elements matching path""" 795 n = 0 796 for t in self._tokens: 797 if t.path == path: 798 n += 1 799 return n 800 801 def remove(self, path): 802 """Removes the given section or value""" 803 i, e = self.find_bounds(path) 804 if i < 0: 805 return 806 self._tokens = self._tokens[:i] + self._tokens[(e+1):] 807 808 def remove_section_where(self, path, key, value): 809 """ 810 Remove section which contains key: value 811 Used to remove node definitions 812 """ 813 nth = -1 814 start = 0 815 keypath = '.'.join([path, key]) 816 while True: 817 nth += 1 818 i, e = self.find_bounds(path, start) 819 start = e + 1 820 if i < 0: 821 break 822 k = self.find(keypath, i) 823 if k < 0 or k > e: 824 continue 825 vt = self._tokens[k] 826 if vt.token == _tVALUE and vt.value == value: 827 self._tokens = self._tokens[:i] + self._tokens[(e+1):] 828 return nth 829 return -1 830 831 def add(self, path, tokens): 832 """Adds tokens to a section""" 833 common_debug("corosync.add (%s) (%s)" % (path, tokens)) 834 if not path: 835 self._tokens += tokens 836 return 837 start = self.find(path) 838 if start < 0: 839 return None 840 depth = 0 841 end = None 842 for i, t in enumerate(self._tokens[start + 1:]): 843 if t.token == _tBEGIN: 844 depth += 1 845 elif t.token == _tEND: 846 depth -= 1 847 if depth < 0: 848 end = start + i + 1 849 break 850 if end is None: 851 raise ValueError("Unterminated section at %s" % (start)) 852 self._tokens = self._tokens[:end] + tokens + self._tokens[end:] 853 854 def set(self, path, value): 855 """Sets a key: value entry. sections are given 856 via dot-notation.""" 857 i = self.find(path) 858 if i < 0: 859 spath = path.split('.') 860 return self.add('.'.join(spath[:-1]), 861 make_value(path, value)) 862 if self._tokens[i].token != _tVALUE: 863 raise ValueError("%s is not a value" % (path)) 864 self._tokens[i].value = value 865 866 def to_string(self): 867 ''' 868 Serialize tokens into the corosync.conf 869 file format 870 ''' 871 def joiner(tstream): 872 indent = 0 873 last = None 874 while tstream: 875 t = tstream[0] 876 if indent and t.token == _tEND: 877 indent -= 1 878 s = '' 879 if t.token == _tCOMMENT and (last and last.token != _tCOMMENT): 880 s += '\n' 881 s += ('\t'*indent) + str(t) + '\n' 882 if t.token == _tEND: 883 s += '\n' 884 yield s 885 if t.token == _tBEGIN: 886 indent += 1 887 last = t 888 tstream = tstream[1:] 889 return ''.join(joiner(self._tokens)) 890 891 892def logfile(conftext): 893 ''' 894 Return corosync logfile (if set) 895 ''' 896 return Parser(conftext).get('logging.logfile') 897 898 899def push_configuration(nodes): 900 ''' 901 Push the local configuration to the list of remote nodes 902 ''' 903 return utils.cluster_copy_file(conf(), nodes) 904 905 906def pull_configuration(from_node): 907 ''' 908 Copy the configuration from the given node to this node 909 ''' 910 local_path = conf() 911 _, fname = tmpfiles.create() 912 print("Retrieving %s:%s..." % (from_node, local_path)) 913 cmd = ['scp', '-qC', 914 '-o', 'PasswordAuthentication=no', 915 '-o', 'StrictHostKeyChecking=no', 916 '%s:%s' % (from_node, local_path), 917 fname] 918 rc = utils.ext_cmd_nosudo(cmd, shell=False) 919 if rc == 0: 920 data = open(fname).read() 921 newhash = hash(data) 922 if os.path.isfile(local_path): 923 oldata = open(local_path).read() 924 oldhash = hash(oldata) 925 if newhash == oldhash: 926 print("No change.") 927 return 928 print("Writing %s:%s..." % (utils.this_node(), local_path)) 929 local_file = open(local_path, 'w') 930 local_file.write(data) 931 local_file.close() 932 else: 933 raise ValueError("Failed to retrieve %s from %s" % (local_path, from_node)) 934 935 936def diff_configuration(nodes, checksum=False): 937 local_path = conf() 938 this_node = utils.this_node() 939 nodes = list(nodes) 940 if checksum: 941 utils.remote_checksum(local_path, nodes, this_node) 942 elif len(nodes) == 1: 943 utils.remote_diff_this(local_path, nodes, this_node) 944 elif this_node in nodes: 945 nodes.remove(this_node) 946 utils.remote_diff_this(local_path, nodes, this_node) 947 elif nodes: 948 utils.remote_diff(local_path, nodes) 949 950 951def get_free_nodeid(parser): 952 ids = parser.get_all('nodelist.node.nodeid') 953 if not ids: 954 return 1 955 ids = [int(i) for i in ids] 956 max_id = max(ids) + 1 957 for i in range(1, max_id): 958 if i not in ids: 959 return i 960 return max_id 961 962 963def get_ip(node): 964 try: 965 return socket.gethostbyname(node) 966 except socket.error: 967 return None 968 969 970def get_all_paths(): 971 f = open(conf()).read() 972 p = Parser(f) 973 return p.all_paths() 974 975 976def get_value(path): 977 f = open(conf()).read() 978 p = Parser(f) 979 return p.get(path) 980 981 982def get_values(path): 983 f = open(conf()).read() 984 p = Parser(f) 985 return p.get_all(path) 986 987 988def set_value(path, value): 989 f = open(conf()).read() 990 p = Parser(f) 991 p.set(path, value) 992 utils.str2file(p.to_string(), conf()) 993 994 995class IPAlreadyConfiguredError(Exception): 996 pass 997 998 999def find_configured_ip(ip_list): 1000 """ 1001 find if the same IP already configured 1002 If so, raise IPAlreadyConfiguredError 1003 """ 1004 with open(conf()) as f: 1005 p = Parser(f.read()) 1006 1007 # get exist ip list from corosync.conf 1008 corosync_iplist = [] 1009 for path in set(p.all_paths()): 1010 if re.search('nodelist.node.ring[0-9]*_addr', path): 1011 corosync_iplist.extend(p.get_all(path)) 1012 1013 # all_possible_ip is a ip set to check whether one of them already configured 1014 all_possible_ip = set(ip_list) 1015 # get local ip list 1016 is_ipv6 = utils.IP.is_ipv6(ip_list[0]) 1017 local_ip_list = utils.InterfacesInfo.get_local_ip_list(is_ipv6) 1018 # extend all_possible_ip if ip_list contain local ip 1019 # to avoid this scenarios in join node: 1020 # eth0's ip already configured in corosync.conf 1021 # eth1's ip also want to add in nodelist 1022 # if this scenarios happened, raise IPAlreadyConfiguredError 1023 if bool(set(ip_list) & set(local_ip_list)): 1024 all_possible_ip |= set(local_ip_list) 1025 configured_ip = list(all_possible_ip & set(corosync_iplist)) 1026 if configured_ip: 1027 raise IPAlreadyConfiguredError("IP {} was already configured".format(','.join(configured_ip))) 1028 1029 1030def add_node_ucast(ip_list, node_id=None): 1031 1032 find_configured_ip(ip_list) 1033 1034 with open(conf()) as f: 1035 p = Parser(f.read()) 1036 1037 if node_id is None: 1038 node_id = get_free_nodeid(p) 1039 node_value = [] 1040 for i, addr in enumerate(ip_list): 1041 node_value += make_value('nodelist.node.ring{}_addr'.format(i), addr) 1042 node_value += make_value('nodelist.node.nodeid', str(node_id)) 1043 1044 if get_values("nodelist.node.ring0_addr") == []: 1045 p.add('', make_section('nodelist', [])) 1046 p.add('nodelist', make_section('nodelist.node', node_value)) 1047 1048 num_nodes = p.count('nodelist.node') 1049 p.set('quorum.two_node', '1' if num_nodes == 2 else '0') 1050 if p.get("quorum.device.model") == "net": 1051 p.set('quorum.two_node', '0') 1052 1053 utils.str2file(p.to_string(), conf()) 1054 1055 1056def add_node(addr, name=None): 1057 ''' 1058 Add node to corosync.conf 1059 ''' 1060 coronodes = None 1061 nodes = None 1062 nodenames = None 1063 coronodes = utils.list_corosync_nodes() 1064 nodenames = utils.list_corosync_node_names() 1065 try: 1066 nodes = utils.list_cluster_nodes() 1067 except Exception: 1068 nodes = [] 1069 ipaddr = get_ip(addr) 1070 if addr in nodenames + coronodes or (ipaddr and ipaddr in coronodes): 1071 err_buf.warning("%s already in corosync.conf" % (addr)) 1072 return 1073 if name and name in nodenames + coronodes: 1074 err_buf.warning("%s already in corosync.conf" % (name)) 1075 return 1076 if addr in nodes: 1077 err_buf.warning("%s already in configuration" % (addr)) 1078 return 1079 if name and name in nodes: 1080 err_buf.warning("%s already in configuration" % (name)) 1081 return 1082 1083 f = open(conf()).read() 1084 p = Parser(f) 1085 1086 node_addr = addr 1087 node_id = get_free_nodeid(p) 1088 node_name = name 1089 node_value = (make_value('nodelist.node.ring0_addr', node_addr) + 1090 make_value('nodelist.node.nodeid', str(node_id))) 1091 if node_name: 1092 node_value += make_value('nodelist.node.name', node_name) 1093 1094 p.add('nodelist', make_section('nodelist.node', node_value)) 1095 1096 num_nodes = p.count('nodelist.node') 1097 p.set('quorum.two_node', '1' if num_nodes == 2 else '0') 1098 if p.get("quorum.device.model") == "net": 1099 p.set('quorum.two_node', '0') 1100 1101 utils.str2file(p.to_string(), conf()) 1102 1103 # update running config (if any) 1104 if nodes: 1105 utils.ext_cmd(["corosync-cmapctl", 1106 "-s", "nodelist.node.%s.nodeid" % (num_nodes - 1), 1107 "u32", str(node_id)], shell=False) 1108 utils.ext_cmd(["corosync-cmapctl", 1109 "-s", "nodelist.node.%s.ring0_addr" % (num_nodes - 1), 1110 "str", node_addr], shell=False) 1111 if node_name: 1112 utils.ext_cmd(["corosync-cmapctl", 1113 "-s", "nodelist.node.%s.name" % (num_nodes - 1), 1114 "str", node_name], shell=False) 1115 1116 1117def del_node(addr): 1118 ''' 1119 Remove node from corosync 1120 ''' 1121 f = open(conf()).read() 1122 p = Parser(f) 1123 nth = p.remove_section_where('nodelist.node', 'ring0_addr', addr) 1124 if nth == -1: 1125 return 1126 1127 num_nodes = p.count('nodelist.node') 1128 p.set('quorum.two_node', '1' if num_nodes == 2 else '0') 1129 if p.get("quorum.device.model") == "net": 1130 p.set('quorum.two_node', '0') 1131 1132 utils.str2file(p.to_string(), conf()) 1133 1134 1135_COROSYNC_CONF_TEMPLATE_HEAD = """# Please read the corosync.conf.5 manual page 1136 1137totem { 1138 version: 2 1139 secauth: on 1140 crypto_hash: sha1 1141 crypto_cipher: aes256 1142 cluster_name: %(clustername)s 1143 clear_node_high_bit: yes 1144 1145 token: 5000 1146 token_retransmits_before_loss_const: 10 1147 join: 60 1148 consensus: 6000 1149 max_messages: 20 1150""" 1151_COROSYNC_CONF_TEMPLATE_TAIL = """ 1152 %(rrp_mode)s 1153 %(transport)s 1154 %(ipv6)s 1155 %(ipv6_nodeid)s 1156} 1157 1158logging { 1159 fileline: off 1160 to_stderr: no 1161 to_logfile: no 1162 logfile: /var/log/cluster/corosync.log 1163 to_syslog: yes 1164 debug: off 1165 timestamp: on 1166 logger_subsys { 1167 subsys: QUORUM 1168 debug: off 1169 } 1170} 1171 1172%(nodelist)s 1173%(quorum)s 1174""" 1175_COROSYNC_CONF_TEMPLATE_RING = """ 1176 interface { 1177 ringnumber: %(number)d 1178 %(bindnetaddr)s 1179%(mcast)s 1180 ttl: 1 1181 } 1182""" 1183 1184 1185def create_configuration(clustername="hacluster", 1186 bindnetaddr=None, 1187 mcastaddr=None, 1188 mcastport=None, 1189 ringXaddr=None, 1190 transport=None, 1191 ipv6=False, 1192 nodeid=None, 1193 two_rings=False, 1194 qdevice=None): 1195 1196 if transport == "udpu": 1197 ring_tmpl = "" 1198 for i in 0, 1: 1199 ring_tmpl += " ring{}_addr: {}\n".format(i, ringXaddr[i]) 1200 if not two_rings: 1201 break 1202 1203 nodelist_tmpl = """nodelist { 1204 node { 1205%(ringaddr)s 1206 nodeid: 1 1207 } 1208} 1209""" % {"ringaddr": ring_tmpl} 1210 else: 1211 nodelist_tmpl = "" 1212 1213 transport_tmpl = "" 1214 if transport is not None: 1215 transport_tmpl = "transport: {}\n".format(transport) 1216 1217 rrp_mode_tmp = "" 1218 if two_rings: 1219 rrp_mode_tmp = "rrp_mode: passive" 1220 1221 ipv6_tmpl = "" 1222 ipv6_nodeid = "" 1223 if ipv6: 1224 ipv6_tmpl = "ip_version: ipv6" 1225 if transport != "udpu": 1226 ipv6_nodeid = "nodeid: %d" % nodeid 1227 1228 quorum_tmpl = """quorum { 1229 # Enable and configure quorum subsystem (default: off) 1230 # see also corosync.conf.5 and votequorum.5 1231 provider: corosync_votequorum 1232 expected_votes: 1 1233 two_node: 0 1234} 1235""" 1236 if qdevice is not None: 1237 quorum_tmpl = """quorum { 1238 # Enable and configure quorum subsystem (default: off) 1239 # see also corosync.conf.5 and votequorum.5 1240 provider: corosync_votequorum 1241 expected_votes: 1 1242 two_node: 0 1243 device { 1244 votes: 0 1245 model: net 1246 net { 1247 tls: %(tls)s 1248 host: %(ip)s 1249 port: %(port)s 1250 algorithm: %(algo)s 1251 tie_breaker: %(tie_breaker)s 1252 } 1253 } 1254} 1255""" % qdevice.__dict__ 1256 1257 config_common = { 1258 "clustername": clustername, 1259 "nodelist": nodelist_tmpl, 1260 "quorum": quorum_tmpl, 1261 "ipv6": ipv6_tmpl, 1262 "ipv6_nodeid": ipv6_nodeid, 1263 "rrp_mode": rrp_mode_tmp, 1264 "transport": transport_tmpl 1265 } 1266 1267 _COROSYNC_CONF_TEMPLATE_RING_ALL = "" 1268 mcast_tmp = [] 1269 bindnetaddr_tmp = [] 1270 config_ring = [] 1271 for i in 0, 1: 1272 mcast_tmp.append("") 1273 if mcastaddr is not None: 1274 mcast_tmp[i] += " mcastaddr: {}\n".format(mcastaddr[i]) 1275 if mcastport is not None: 1276 mcast_tmp[i] += " mcastport: {}".format(mcastport[i]) 1277 1278 bindnetaddr_tmp.append("") 1279 if bindnetaddr is None: 1280 bindnetaddr_tmp[i] = "" 1281 else: 1282 bindnetaddr_tmp[i] = "bindnetaddr: {}".format(bindnetaddr[i]) 1283 1284 config_ring.append("") 1285 config_ring[i] = { 1286 "bindnetaddr": bindnetaddr_tmp[i], 1287 "mcast": mcast_tmp[i], 1288 "number": i 1289 } 1290 _COROSYNC_CONF_TEMPLATE_RING_ALL += _COROSYNC_CONF_TEMPLATE_RING % config_ring[i] 1291 1292 if not two_rings: 1293 break 1294 1295 _COROSYNC_CONF_TEMPLATE = _COROSYNC_CONF_TEMPLATE_HEAD + \ 1296 _COROSYNC_CONF_TEMPLATE_RING_ALL + \ 1297 _COROSYNC_CONF_TEMPLATE_TAIL 1298 utils.str2file(_COROSYNC_CONF_TEMPLATE % config_common, conf()) 1299