1# Copyright (C) 2013 Kristoffer Gronlund <kgronlund@suse.com>
2# See COPYING for license information.
3'''
4Functions that abstract creating and editing the corosync.conf
5configuration file, and also the corosync-* utilities.
6'''
7
8import os
9import re
10import socket
11from . import utils
12from . import tmpfiles
13from . import parallax
14from .msg import err_buf, common_debug
15
16
def conf():
    """
    Return the path of the corosync configuration file.

    The COROSYNC_MAIN_CONFIG_FILE environment variable wins when it is
    set; otherwise the built-in default location is returned.
    """
    default_path = '/usr/local/etc/corosync/corosync.conf'
    return os.environ.get('COROSYNC_MAIN_CONFIG_FILE', default_path)
19
20
def check_tools():
    """
    Tell whether every corosync command line utility is available.

    Returns True only if utils.is_program() gives a truthy answer for
    each of the required tools; stops at the first missing one.
    """
    required = ('corosync-cfgtool', 'corosync-quorumtool', 'corosync-cmapctl')
    for tool in required:
        if not utils.is_program(tool):
            return False
    return True
24
25
def cfgtool(*args):
    """
    Run corosync-cfgtool with the given arguments (no shell involved).

    Returns whatever utils.get_stdout() returns for that command.
    """
    argv = ['corosync-cfgtool']
    argv.extend(args)
    return utils.get_stdout(argv, shell=False)
28
29
def quorumtool(*args):
    """
    Run corosync-quorumtool with the given arguments (no shell involved).

    Returns whatever utils.get_stdout() returns for that command.
    """
    argv = ['corosync-quorumtool']
    argv.extend(args)
    return utils.get_stdout(argv, shell=False)
32
33
def query_status(status_type):
    """
    Dispatch a corosync status query to the matching query_* function

    status_type must be one of ring/quorum/qdevice/qnetd; any other
    value raises ValueError.
    """
    handlers = {
        "ring": query_ring_status,
        "quorum": query_quorum_status,
        "qdevice": query_qdevice_status,
        "qnetd": query_qnetd_status,
    }
    handler = handlers.get(status_type)
    if handler is None:
        raise ValueError("Wrong type \"{}\" to query status".format(status_type))
    handler()
50
51
def query_ring_status():
    """
    Print the output of "corosync-cfgtool -s"

    Raises ValueError carrying the stderr text when the command exits
    non-zero and wrote something to stderr.
    """
    rc, out, err = utils.get_stdout_stderr("corosync-cfgtool -s")
    if rc == 0:
        if out:
            print(out)
    elif err:
        raise ValueError(err)
61
62
def query_quorum_status():
    """
    Print the cluster nodes followed by the output of
    "corosync-quorumtool -s"

    Raises ValueError carrying the stderr text when the command exits
    non-zero and wrote something to stderr.
    """
    utils.print_cluster_nodes()
    rc, out, err = utils.get_stdout_stderr("corosync-quorumtool -s")
    failed = rc != 0
    if failed and err:
        raise ValueError(err)
    # corosync-quorumtool exits with 2 when nothing went wrong but the
    # node is not quorate, so that output is still shown
    if out and rc in (0, 2):
        print(out)
76
77
def query_qdevice_status():
    """
    Print the cluster nodes followed by the output of
    "corosync-qdevice-tool -sv"

    Raises ValueError when qdevice is not configured.
    """
    configured = utils.is_qdevice_configured()
    if not configured:
        raise ValueError("QDevice/QNetd not configured!")
    # the tool is queried before anything is printed, so a failing
    # command produces no partial output
    status_text = utils.get_stdout_or_raise_error("corosync-qdevice-tool -sv")
    utils.print_cluster_nodes()
    print(status_text)
88
89
def query_qnetd_status():
    """
    Print the cluster nodes and the status reported by the qnetd server

    Reads totem.cluster_name and quorum.device.net.host from the
    configuration and runs corosync-qnetd-tool on the qnetd host.
    Raises ValueError when qdevice, the cluster name or the qnetd host
    is not configured, or when copying the ssh key fails.
    """
    if not utils.is_qdevice_configured():
        raise ValueError("QDevice/QNetd not configured!")

    name = get_value('totem.cluster_name')
    if not name:
        raise ValueError("cluster_name not configured!")

    host = get_value('quorum.device.net.host')
    if not host:
        raise ValueError("host for qnetd not configured!")

    # Set up passwordless ssh to the qnetd host when a password would
    # otherwise be required
    if utils.check_ssh_passwd_need(host):
        print("Copy ssh key to qnetd node({})".format(host))
        copy_key_cmd = "ssh-copy-id -i /root/.ssh/id_rsa.pub root@{}".format(host)
        rc, _, err = utils.get_stdout_stderr(copy_key_cmd)
        if rc != 0:
            raise ValueError(err)

    remote_cmd = "corosync-qnetd-tool -lv -c {}".format(name)
    results = parallax.parallax_call([host], remote_cmd)
    # only the first host's result is used; its second field unpacks
    # into three values of which the middle one is the stdout
    _, remote_stdout, _ = results[0][1]
    if not remote_stdout:
        return
    utils.print_cluster_nodes()
    print(utils.to_ascii(remote_stdout))
116
117
def add_nodelist_from_cmaptool():
    """
    Add a unicast node entry for every node reported by
    utils.get_nodeinfo_from_cmaptool()

    Nodes whose addresses are already configured are skipped silently.
    """
    nodeinfo = utils.get_nodeinfo_from_cmaptool()
    for nodeid, iplist in nodeinfo.items():
        try:
            add_node_ucast(iplist, nodeid)
        except IPAlreadyConfiguredError:
            # already present in the configuration; nothing to do
            pass
124
125
def is_unicast():
    """
    Return True when totem.transport is configured as "udpu"
    """
    transport = get_value("totem.transport")
    return transport == "udpu"
128
129
# Token kinds produced by the tokenizer:
#   _tCOMMENT - a comment line
#   _tBEGIN   - opening of a section ("name {")
#   _tEND     - closing of a section ("}")
#   _tVALUE   - a "key: value" entry
_tCOMMENT, _tBEGIN, _tEND, _tVALUE = range(4)
134
135
class Token(object):
    """
    One element of a tokenized corosync configuration.

    token is one of the _t* kind constants, path is stored as the
    dot-joined form of the given path components, key is the comment
    text / section name / value name and value is only meaningful for
    value tokens.
    """
    def __init__(self, token, path, key=None, value=None):
        self.token = token
        self.path = '.'.join(path)
        self.value = value
        self.key = key

    def __repr__(self):
        # Render the token the way it appears in the configuration file
        kind = self.token
        if kind == _tCOMMENT:
            return self.key
        if kind == _tBEGIN:
            return "%s {" % (self.key)
        if kind == _tEND:
            return '}'
        return '%s: %s' % (self.key, self.value)
151
152
class QDevice(object):
    """
    Manage the qdevice configuration, its certificates and the qnetd service

    Certificate workflow on the init node:
    Step 1:  init_db_on_qnetd
    Step 2:  fetch_qnetd_crt_from_qnetd
    Step 3:  copy_qnetd_crt_to_cluster
    Step 4:  init_db_on_cluster
    Step 5:  create_ca_request
    Step 6:  copy_crq_to_qnetd
    Step 7:  sign_crq_on_qnetd
    Step 8:  fetch_cluster_crt_from_qnetd
    Step 9:  import_cluster_crt
    Step 10: copy_p12_to_cluster
    Step 11: import_p12_on_cluster

    Certificate workflow on a joining node:
    Step 1:  fetch_qnetd_crt_from_cluster
    Step 2:  init_db_on_local
    Step 3:  fetch_p12_from_cluster
    Step 4:  import_p12_on_local
    """
    # systemd unit managed on the qnetd host
    qnetd_service = "corosync-qnetd.service"
    # file names used throughout the certificate workflow
    qnetd_cacert_filename = "qnetd-cacert.crt"
    qdevice_crq_filename = "qdevice-net-node.crq"
    qdevice_p12_filename = "qdevice-net-node.p12"
    # base directories on the qnetd host and on the cluster nodes
    qnetd_path = "/usr/local/etc/corosync/qnetd"
    qdevice_path = "/usr/local/etc/corosync/qdevice/net"
    qdevice_db_path = "/usr/local/etc/corosync/qdevice/net/nssdb"

    def __init__(self, qnetd_addr, port=5403, algo="ffsplit", tie_breaker="lowest",
                 tls="on", cluster_node=None, cmds=None, mode=None):
        """
        Remember the qdevice options; nothing is validated or executed here
        """
        # how to reach the qnetd server
        self.qnetd_addr = qnetd_addr
        self.port = port
        self.tls = tls
        # quorum decision options
        self.algo = algo
        self.tie_breaker = tie_breaker
        # heuristics commands (separated by ';') and heuristics mode
        self.cmds = cmds
        self.mode = mode
        # node the certificates are fetched from in the join workflow
        self.cluster_node = cluster_node
        # switched on by valid_qnetd() when ssh to qnetd needs a password
        self.askpass = False

    @property
    def qnetd_cacert_on_qnetd(self):
        """
        Location of qnetd-cacert.crt on the qnetd node
        """
        template = "{}/nssdb/{}"
        return template.format(self.qnetd_path, self.qnetd_cacert_filename)

    @property
    def qnetd_cacert_on_local(self):
        """
        Location of qnetd-cacert.crt on the local node
        """
        template = "{}/{}/{}"
        return template.format(self.qdevice_path, self.qnetd_addr, self.qnetd_cacert_filename)

    @property
    def qnetd_cacert_on_cluster(self):
        """
        Location of qnetd-cacert.crt as fetched from the cluster node
        """
        template = "{}/{}/{}"
        return template.format(self.qdevice_path, self.cluster_node, self.qnetd_cacert_filename)

    @property
    def qdevice_crq_on_qnetd(self):
        """
        Location of qdevice-net-node.crq on the qnetd node
        """
        template = "{}/nssdb/{}"
        return template.format(self.qnetd_path, self.qdevice_crq_filename)

    @property
    def qdevice_crq_on_local(self):
        """
        Location of qdevice-net-node.crq on the local node
        """
        template = "{}/nssdb/{}"
        return template.format(self.qdevice_path, self.qdevice_crq_filename)

    @property
    def qnetd_cluster_crt_on_qnetd(self):
        """
        Location of cluster-<cluster_name>.crt on the qnetd node

        Reads self.cluster_name, which is assigned by create_ca_request()
        """
        template = "{}/nssdb/cluster-{}.crt"
        return template.format(self.qnetd_path, self.cluster_name)

    @property
    def qnetd_cluster_crt_on_local(self):
        """
        Location of cluster-<cluster_name>.crt on the local node
        """
        crt_name = os.path.basename(self.qnetd_cluster_crt_on_qnetd)
        return "{}/{}/{}".format(self.qdevice_path, self.qnetd_addr, crt_name)

    @property
    def qdevice_p12_on_local(self):
        """
        Location of qdevice-net-node.p12 on the local node
        """
        template = "{}/nssdb/{}"
        return template.format(self.qdevice_path, self.qdevice_p12_filename)

    @property
    def qdevice_p12_on_cluster(self):
        """
        Location of qdevice-net-node.p12 as fetched from the cluster node
        """
        template = "{}/{}/{}"
        return template.format(self.qdevice_path, self.cluster_node, self.qdevice_p12_filename)

    def valid_attr(self):
        """
        Check the qdevice related options

        Raises ValueError on the first check that fails.
        """
        if not utils.package_is_installed("corosync-qdevice"):
            raise ValueError("Package \"corosync-qdevice\" not installed on this node")

        try:
            # socket.getaddrinfo handles both ipv4 and ipv6 addresses.
            # It returns a list of 5-tuples
            # (family, type, proto, canonname, sockaddr)
            # where sockaddr depends on the family:
            # (address, port) for AF_INET and
            # (address, port, flow info, scope id) for AF_INET6.
            # Either way the address is the first item of the last field.
            addrinfo = socket.getaddrinfo(self.qnetd_addr, None)
            qnetd_ip = addrinfo[0][-1][0]
        except socket.error:
            raise ValueError("host \"{}\" is unreachable".format(self.qnetd_addr))

        utils.ping_node(self.qnetd_addr)

        if utils.InterfacesInfo.ip_in_local(qnetd_ip):
            raise ValueError("host for qnetd must be a remote one")

        ssh_reachable = utils.check_port_open(qnetd_ip, 22)
        if not ssh_reachable:
            raise ValueError("ssh service on \"{}\" not available".format(self.qnetd_addr))

        if not utils.valid_port(self.port):
            raise ValueError("invalid qdevice port range(1024 - 65535)")

        if self.algo not in ("ffsplit", "lms"):
            raise ValueError("invalid ALGORITHM choice: '{}' (choose from 'ffsplit', 'lms')".format(self.algo))

        if not (self.tie_breaker in ("lowest", "highest") or utils.valid_nodeid(self.tie_breaker)):
            raise ValueError("invalid qdevice tie_breaker(lowest/highest/valid_node_id)")

        if self.tls not in ("on", "off", "required"):
            raise ValueError("invalid TLS choice: '{}' (choose from 'on', 'off', 'required')".format(self.tls))

        if not self.cmds:
            return
        # every heuristics command must be an absolute path to an
        # existing executable
        for cmd in self.cmds.strip(';').split(';'):
            if not cmd.startswith('/'):
                raise ValueError("commands for heuristics should be absolute path")
            executable = cmd.split()[0]
            if not os.path.exists(executable):
                raise ValueError("command {} not exist".format(cmd.split()[0]))

    def valid_qnetd(self):
        """
        Check the qnetd node

        Turns self.askpass on when ssh to qnetd needs a password.
        Raises ValueError when the qnetd host runs pacemaker or lacks the
        corosync-qnetd package.
        """
        if utils.check_ssh_passwd_need(self.qnetd_addr):
            self.askpass = True

        if utils.service_is_active("pacemaker", self.qnetd_addr):
            problem = "host for qnetd must be a non-cluster node"
            advice = "change to another host or stop cluster service on {}".format(self.qnetd_addr)
        elif not utils.package_is_installed("corosync-qnetd", self.qnetd_addr):
            problem = "Package \"corosync-qnetd\" not installed on {}".format(self.qnetd_addr)
            advice = "install \"corosync-qnetd\" on {}".format(self.qnetd_addr)
        else:
            return

        hint = "\nCluster service already successfully started on this node except qdevice service\nIf you still want to use qdevice, {}\nThen run command \"crm cluster init\" with \"qdevice\" stage, like:\n  crm cluster init qdevice qdevice_related_options\nThat command will setup qdevice separately".format(advice)
        raise ValueError(problem + hint)

    def manage_qnetd(self, action):
        """
        Run "systemctl <action>" for the qnetd service on the qnetd node
        """
        systemctl_cmd = "systemctl {} {}".format(action, self.qnetd_service)
        if self.askpass:
            print("{} {} on {}".format(action.capitalize(), self.qnetd_service, self.qnetd_addr))
        parallax.parallax_call([self.qnetd_addr], systemctl_cmd, self.askpass)

    def enable_qnetd(self):
        """
        Enable the qnetd service on the qnetd node
        """
        self.manage_qnetd("enable")

    def disable_qnetd(self):
        """
        Disable the qnetd service on the qnetd node
        """
        self.manage_qnetd("disable")

    def start_qnetd(self):
        """
        Start the qnetd service on the qnetd node
        """
        self.manage_qnetd("start")

    def stop_qnetd(self):
        """
        Stop the qnetd service on the qnetd node
        """
        self.manage_qnetd("stop")

    def debug_and_log_to_bootstrap(self, msg):
        """
        Send msg to the debug output and, prefixed with "# ", to the
        bootstrap log
        """
        # imported here rather than at module level
        from . import bootstrap
        common_debug(msg)
        bootstrap.log("# " + msg)

    def init_db_on_qnetd(self):
        """
        Certificate process for init
        Step 1
        Initialize the database on the QNetd server by running
        corosync-qnetd-certutil -i, unless the CA certificate is
        already there
        """
        probe_cmd = "test -f {}".format(self.qnetd_cacert_on_qnetd)
        if self.askpass:
            print("Test whether {} exists on QNetd server({})".format(self.qnetd_cacert_on_qnetd, self.qnetd_addr))
        try:
            parallax.parallax_call([self.qnetd_addr], probe_cmd, self.askpass)
        except ValueError:
            # the probe failed, i.e. the target file does not exist
            cacert_present = False
        else:
            cacert_present = True
        if cacert_present:
            return

        init_cmd = "corosync-qnetd-certutil -i"
        desc = "Step 1: Initialize database on {}".format(self.qnetd_addr)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        parallax.parallax_call([self.qnetd_addr], init_cmd, self.askpass)

    def fetch_qnetd_crt_from_qnetd(self):
        """
        Certificate process for init
        Step 2
        Fetch the QNetd CA certificate (qnetd-cacert.crt) from the QNetd
        server, unless a local copy already exists
        """
        already_fetched = os.path.exists(self.qnetd_cacert_on_local)
        if already_fetched:
            return

        desc = "Step 2: Fetch {} from {}".format(self.qnetd_cacert_filename, self.qnetd_addr)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        parallax.parallax_slurp(
            [self.qnetd_addr],
            self.qdevice_path,
            self.qnetd_cacert_on_qnetd,
            self.askpass)

    def copy_qnetd_crt_to_cluster(self):
        """
        Certificate process for init
        Step 3
        Copy the exported QNetd CA certificate (qnetd-cacert.crt) to
        every other cluster node; nothing to do on a one-node cluster
        """
        other_nodes = utils.list_cluster_nodes_except_me()
        if not other_nodes:
            return

        desc = "Step 3: Copy exported {} to {}".format(self.qnetd_cacert_filename, other_nodes)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        # the directory holding the local copy is what gets transferred
        source_dir = os.path.dirname(self.qnetd_cacert_on_local)
        parallax.parallax_copy(other_nodes, source_dir, self.qdevice_path, self.askpass)

    def init_db_on_cluster(self):
        """
        Certificate process for init
        Step 4
        Initialize the database on the cluster nodes by running
        /usr/sbin/corosync-qdevice-net-certutil -i -c qnetd-cacert.crt
        """
        all_nodes = utils.list_cluster_nodes()
        init_cmd = "corosync-qdevice-net-certutil -i -c {}".format(self.qnetd_cacert_on_local)
        desc = "Step 4: Initialize database on {}".format(all_nodes)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        parallax.parallax_call(all_nodes, init_cmd, self.askpass)

    def create_ca_request(self):
        """
        Certificate process for init
        Step 5
        Generate the certificate request:
        /usr/sbin/corosync-qdevice-net-certutil -r -n Cluster
        (the cluster name is taken from totem.cluster_name and also
        stored in self.cluster_name for the later steps)
        """
        self.debug_and_log_to_bootstrap("Step 5: Generate certificate request {}".format(self.qdevice_crq_filename))
        self.cluster_name = get_value('totem.cluster_name')
        if not self.cluster_name:
            raise ValueError("No cluster_name found in {}".format(conf()))
        request_cmd = "corosync-qdevice-net-certutil -r -n {}".format(self.cluster_name)
        rc, _, err = utils.get_stdout_stderr(request_cmd)
        if rc != 0:
            raise ValueError(err)

    def copy_crq_to_qnetd(self):
        """
        Certificate process for init
        Step 6
        Copy the exported CRQ to the QNetd server
        """
        desc = "Step 6: Copy {} to {}".format(self.qdevice_crq_filename, self.qnetd_addr)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        remote_dir = os.path.dirname(self.qdevice_crq_on_qnetd)
        parallax.parallax_copy(
            [self.qnetd_addr],
            self.qdevice_crq_on_local,
            remote_dir,
            self.askpass)

    def sign_crq_on_qnetd(self):
        """
        Certificate process for init
        Step 7
        On the QNetd server sign and export the cluster certificate by
        running corosync-qnetd-certutil -s -c qdevice-net-node.crq -n Cluster
        """
        desc = "Step 7: Sign and export cluster certificate on {}".format(self.qnetd_addr)
        self.debug_and_log_to_bootstrap(desc)
        sign_cmd = "corosync-qnetd-certutil -s -c {} -n {}".format(
            self.qdevice_crq_on_qnetd, self.cluster_name)
        if self.askpass:
            print(desc)
        parallax.parallax_call([self.qnetd_addr], sign_cmd, self.askpass)

    def fetch_cluster_crt_from_qnetd(self):
        """
        Certificate process for init
        Step 8
        Copy the exported CRT to the node where the certificate request
        was created
        """
        crt_name = os.path.basename(self.qnetd_cluster_crt_on_qnetd)
        desc = "Step 8: Fetch {} from {}".format(crt_name, self.qnetd_addr)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        parallax.parallax_slurp(
            [self.qnetd_addr],
            self.qdevice_path,
            self.qnetd_cluster_crt_on_qnetd,
            self.askpass)

    def import_cluster_crt(self):
        """
        Certificate process for init
        Step 9
        Import the certificate on the node where the certificate request
        was created by running
        /usr/sbin/corosync-qdevice-net-certutil -M -c cluster-Cluster.crt
        """
        crt_name = os.path.basename(self.qnetd_cluster_crt_on_local)
        self.debug_and_log_to_bootstrap("Step 9: Import certificate file {} on local".format(crt_name))
        import_cmd = "corosync-qdevice-net-certutil -M -c {}".format(self.qnetd_cluster_crt_on_local)
        rc, _, err = utils.get_stdout_stderr(import_cmd)
        if rc != 0:
            raise ValueError(err)

    def copy_p12_to_cluster(self):
        """
        Certificate process for init
        Step 10
        Copy the generated qdevice-net-node.p12 to all other cluster
        nodes; nothing to do on a one-node cluster
        """
        other_nodes = utils.list_cluster_nodes_except_me()
        if not other_nodes:
            return

        desc = "Step 10: Copy {} to {}".format(self.qdevice_p12_filename, other_nodes)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        # the file lands in the same directory on the remote nodes
        remote_dir = os.path.dirname(self.qdevice_p12_on_local)
        parallax.parallax_copy(other_nodes, self.qdevice_p12_on_local, remote_dir, self.askpass)

    def import_p12_on_cluster(self):
        """
        Certificate process for init
        Step 11
        Import the cluster certificate and key on all other cluster nodes:
        /usr/sbin/corosync-qdevice-net-certutil -m -c qdevice-net-node.p12
        """
        other_nodes = utils.list_cluster_nodes_except_me()
        if not other_nodes:
            return

        desc = "Step 11: Import {} on {}".format(self.qdevice_p12_filename, other_nodes)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        import_cmd = "corosync-qdevice-net-certutil -m -c {}".format(self.qdevice_p12_on_local)
        parallax.parallax_call(other_nodes, import_cmd, self.askpass)

    def certificate_process_on_init(self):
        """
        Run the eleven certificate steps of the init node, in order
        """
        steps = (
            self.init_db_on_qnetd,
            self.fetch_qnetd_crt_from_qnetd,
            self.copy_qnetd_crt_to_cluster,
            self.init_db_on_cluster,
            self.create_ca_request,
            self.copy_crq_to_qnetd,
            self.sign_crq_on_qnetd,
            self.fetch_cluster_crt_from_qnetd,
            self.import_cluster_crt,
            self.copy_p12_to_cluster,
            self.import_p12_on_cluster,
        )
        for step in steps:
            step()

    def fetch_qnetd_crt_from_cluster(self):
        """
        Certificate process for join
        Step 1
        Fetch the QNetd CA certificate (qnetd-cacert.crt) from the init
        node, unless it has been fetched already
        """
        already_fetched = os.path.exists(self.qnetd_cacert_on_cluster)
        if already_fetched:
            return

        desc = "Step 1: Fetch {} from {}".format(self.qnetd_cacert_filename, self.cluster_node)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        parallax.parallax_slurp(
            [self.cluster_node],
            self.qdevice_path,
            self.qnetd_cacert_on_local,
            self.askpass)

    def init_db_on_local(self):
        """
        Certificate process for join
        Step 2
        Initialize the database by running
        /usr/sbin/corosync-qdevice-net-certutil -i -c qnetd-cacert.crt
        An existing database directory is removed first.
        """
        db_dir = self.qdevice_db_path
        if os.path.exists(db_dir):
            utils.rmdir_r(db_dir)

        self.debug_and_log_to_bootstrap("Step 2: Initialize database on local")
        init_cmd = "corosync-qdevice-net-certutil -i -c {}".format(self.qnetd_cacert_on_cluster)
        rc, _, err = utils.get_stdout_stderr(init_cmd)
        if rc != 0:
            raise ValueError(err)

    def fetch_p12_from_cluster(self):
        """
        Certificate process for join
        Step 3
        Fetch the p12 key file from the init node, unless it has been
        fetched already
        """
        already_fetched = os.path.exists(self.qdevice_p12_on_cluster)
        if already_fetched:
            return

        desc = "Step 3: Fetch {} from {}".format(self.qdevice_p12_filename, self.cluster_node)
        self.debug_and_log_to_bootstrap(desc)
        if self.askpass:
            print(desc)
        parallax.parallax_slurp(
            [self.cluster_node],
            self.qdevice_path,
            self.qdevice_p12_on_local,
            self.askpass)

    def import_p12_on_local(self):
        """
        Certificate process for join
        Step 4
        Import the cluster certificate and key
        """
        self.debug_and_log_to_bootstrap("Step 4: Import cluster certificate and key")
        import_cmd = "corosync-qdevice-net-certutil -m -c {}".format(self.qdevice_p12_on_cluster)
        rc, _, err = utils.get_stdout_stderr(import_cmd)
        if rc != 0:
            raise ValueError(err)

    def certificate_process_on_join(self):
        """
        Run the four certificate steps of a joining node, in order
        """
        steps = (
            self.fetch_qnetd_crt_from_cluster,
            self.init_db_on_local,
            self.fetch_p12_from_cluster,
            self.import_p12_on_local,
        )
        for step in steps:
            step()

    def write_qdevice_config(self):
        """
        Replace the quorum.device section of the config file with one
        built from this object's attributes
        """
        with open(conf()) as config_file:
            parser = Parser(config_file.read())

        # start from a clean, empty quorum.device section
        parser.remove("quorum.device")
        parser.add('quorum', make_section('quorum.device', []))
        parser.set('quorum.device.votes', '1')
        parser.set('quorum.device.model', 'net')

        parser.add('quorum.device', make_section('quorum.device.net', []))
        net_options = (
            ('quorum.device.net.tls', self.tls),
            ('quorum.device.net.host', self.qnetd_addr),
            ('quorum.device.net.port', self.port),
            ('quorum.device.net.algorithm', self.algo),
            ('quorum.device.net.tie_breaker', self.tie_breaker),
        )
        for option, setting in net_options:
            parser.set(option, setting)

        if self.cmds:
            parser.add('quorum.device', make_section('quorum.device.heuristics', []))
            parser.set('quorum.device.heuristics.mode', self.mode)
            for index, cmd in enumerate(self.cmds.strip(';').split(';')):
                # key is exec_<program name with . and - turned into _><index>
                program = os.path.basename(cmd.split()[0])
                cmd_name = re.sub("[.-]", "_", program)
                exec_name = "exec_{}{}".format(cmd_name, index)
                parser.set('quorum.device.heuristics.{}'.format(exec_name), cmd)

        utils.str2file(parser.to_string(), conf())

    def remove_qdevice_config(self):
        """
        Drop the quorum.device section from the config file
        """
        with open(conf()) as config_file:
            parser = Parser(config_file.read())
        parser.remove("quorum.device")
        utils.str2file(parser.to_string(), conf())

    def remove_qdevice_db(self):
        """
        Remove the qdevice database on all cluster nodes

        Does nothing when the local database directory does not exist.
        """
        if not os.path.exists(self.qdevice_db_path):
            return
        all_nodes = utils.list_cluster_nodes()
        # everything below qdevice_path is removed, not only the nssdb
        cleanup_cmd = "rm -rf {}/*".format(self.qdevice_path)
        if self.askpass:
            print("Remove database on cluster nodes")
        parallax.parallax_call(all_nodes, cleanup_cmd, self.askpass)
679
680
def corosync_tokenizer(stream):
    """Parses the corosync config file into a token stream

    stream is the complete configuration text. Token objects are
    yielded in file order: comments, section begins, section ends and
    "key: value" entries. The path of a section/value token is the list
    of enclosing section names plus its own name.

    Raises ValueError when the remaining text matches none of the
    known constructs.
    """
    section_re = re.compile(r'(\w+)\s*{')
    value_re = re.compile(r'(\w+):\s*([\S ]+)')
    path = []
    while stream:
        stream = stream.lstrip()
        if not stream:
            break
        if stream[0] == '#':
            end = stream.find('\n')
            if end < 0:
                # The comment is the last line of the text and has no
                # trailing newline. Without this, find()'s -1 would cut
                # the last character off the comment and feed it back
                # into the parser.
                end = len(stream)
            t = Token(_tCOMMENT, [], stream[:end])
            stream = stream[end:]
            yield t
            continue
        if stream[0] == '}':
            t = Token(_tEND, [])
            stream = stream[1:]
            yield t
            # leave the section that was just closed
            path = path[:-1]
            continue
        m = section_re.match(stream)
        if m:
            path.append(m.group(1))
            t = Token(_tBEGIN, path, m.group(1))
            stream = stream[m.end():]
            yield t
            continue
        m = value_re.match(stream)
        if m:
            t = Token(_tVALUE, path + [m.group(1)], m.group(1), m.group(2))
            stream = stream[m.end():]
            yield t
            continue
        raise ValueError("Parse error at [..%s..]" % (stream[:16]))
716
717
def make_section(path, contents=None):
    """
    Create a token sequence representing a section

    path is the dotted path of the section; contents, if given, are
    the tokens placed between the begin and end tokens. Raises
    ValueError when a content token carries a path that does not start
    with the section path.
    """
    body = contents if contents else []
    parts = path.split('.')
    for tok in body:
        # tokens with an empty path (comments, section ends) are exempt
        if tok.path and not tok.path.startswith(path):
            raise ValueError("%s (%s) not in path %s" % (tok.path, tok.key, path))
    opening = Token(_tBEGIN, parts, parts[-1])
    closing = Token(_tEND, [])
    return [opening] + body + [closing]
728
729
def make_value(path, value):
    """
    Create a token sequence representing a value

    Returns a one-element list holding a value token whose key is the
    last component of the dotted path.
    """
    parts = path.split('.')
    return [Token(_tVALUE, parts, parts[-1], value)]
735
736
class Parser(object):
    """
    Editable, in-memory view of a corosync configuration.

    The configuration text is tokenized once; all lookups and edits
    work on the token list, and to_string() turns it back into text.
    """
    def __init__(self, data):
        self._tokens = list(corosync_tokenizer(data))

    def find(self, name, start=0):
        """Return the index of the first token at or after start whose
        path equals name, or -1 when there is none"""
        for offset, tok in enumerate(self._tokens[start:]):
            if tok.path == name:
                return start + offset
        return -1

    def find_bounds(self, name, start=0):
        """Return (first, last) token indexes of the next element named
        name at or after start; (-1, -1) when not found.

        For a value both indexes are the same; for a section they are
        the begin token and its matching end token. Raises ValueError
        when the section is never closed."""
        begin = self.find(name, start)
        if begin < 0:
            return -1, -1
        if self._tokens[begin].token != _tBEGIN:
            return begin, begin
        depth = 0
        for end in range(begin + 1, len(self._tokens)):
            kind = self._tokens[end].token
            if kind == _tBEGIN:
                depth += 1
            elif kind == _tEND:
                depth -= 1
            if depth < 0:
                # this end token closes the section itself
                return begin, end
        raise ValueError("Unclosed section")

    def get(self, path):
        """Return the value of the first entry at path, or None"""
        matches = (tok.value for tok in self._tokens
                   if tok.token == _tVALUE and tok.path == path)
        return next(matches, None)

    def get_all(self, path):
        """Return the values of all entries at path"""
        return [tok.value for tok in self._tokens
                if tok.token == _tVALUE and tok.path == path]

    def all_paths(self):
        """Return the paths of all value entries, in file order"""
        return [tok.path for tok in self._tokens if tok.token == _tVALUE]

    def count(self, path):
        """Return how many tokens have exactly this path"""
        return sum(1 for tok in self._tokens if tok.path == path)

    def remove(self, path):
        """Remove the first section or value found at path, if any"""
        begin, end = self.find_bounds(path)
        if begin < 0:
            return
        remaining = self._tokens[:begin]
        remaining.extend(self._tokens[end + 1:])
        self._tokens = remaining

    def remove_section_where(self, path, key, value):
        """
        Remove the first section at path that contains key: value
        Used to remove node definitions

        Returns the zero-based position of the removed section among
        the sections at path, or -1 when no section matched.
        """
        keypath = '.'.join([path, key])
        position = 0
        search_from = 0
        while True:
            begin, end = self.find_bounds(path, search_from)
            if begin < 0:
                return -1
            search_from = end + 1
            hit = self.find(keypath, begin)
            # only a key located inside this very section counts
            if 0 <= hit <= end:
                candidate = self._tokens[hit]
                if candidate.token == _tVALUE and candidate.value == value:
                    self._tokens = self._tokens[:begin] + self._tokens[(end + 1):]
                    return position
            position += 1

    def add(self, path, tokens):
        """Insert tokens at the end of the section at path

        An empty path appends at top level. Nothing happens when the
        section does not exist; ValueError is raised when it is not
        terminated."""
        common_debug("corosync.add (%s) (%s)" % (path, tokens))
        if not path:
            self._tokens += tokens
            return
        start = self.find(path)
        if start < 0:
            return None
        depth = 0
        for pos in range(start + 1, len(self._tokens)):
            kind = self._tokens[pos].token
            if kind == _tBEGIN:
                depth += 1
            elif kind == _tEND:
                depth -= 1
            if depth < 0:
                # pos is the end token of the target section
                break
        else:
            raise ValueError("Unterminated section at %s" % (start))
        self._tokens = self._tokens[:pos] + tokens + self._tokens[pos:]

    def set(self, path, value):
        """Set a key: value entry, adding it to its parent section when
        it does not exist yet. Sections are given via dot-notation.

        Raises ValueError when path names something other than a value."""
        index = self.find(path)
        if index < 0:
            parent = '.'.join(path.split('.')[:-1])
            return self.add(parent, make_value(path, value))
        target = self._tokens[index]
        if target.token != _tVALUE:
            raise ValueError("%s is not a value" % (path))
        target.value = value

    def to_string(self):
        '''
        Serialize the tokens into the corosync.conf file format

        Nested content is indented with one tab per level, a section
        end is followed by a blank line, and a comment that follows a
        non-comment token is preceded by one.
        '''
        pieces = []
        indent = 0
        previous = None
        for tok in self._tokens:
            kind = tok.token
            if kind == _tEND and indent:
                indent -= 1
            if kind == _tCOMMENT and previous and previous.token != _tCOMMENT:
                pieces.append('\n')
            pieces.append(('\t' * indent) + str(tok) + '\n')
            if kind == _tEND:
                pieces.append('\n')
            elif kind == _tBEGIN:
                indent += 1
            previous = tok
        return ''.join(pieces)
890
891
def logfile(conftext):
    '''
    Parse the given corosync.conf text and return what
    Parser.get() yields for 'logging.logfile'
    '''
    parsed = Parser(conftext)
    return parsed.get('logging.logfile')
897
898
def push_configuration(nodes):
    '''
    Copy the local corosync configuration file (see conf())
    to the given remote nodes, returning the result of
    utils.cluster_copy_file()
    '''
    config_path = conf()
    return utils.cluster_copy_file(config_path, nodes)
904
905
def pull_configuration(from_node):
    '''
    Copy the configuration from the given node to this node

    The remote file is fetched with scp into a temporary file. The
    local configuration is rewritten only if its content differs from
    the fetched copy (or if it does not exist yet).

    Raises ValueError if the scp command returns a non-zero code.
    '''
    local_path = conf()
    _, fname = tmpfiles.create()
    print("Retrieving %s:%s..." % (from_node, local_path))
    cmd = ['scp', '-qC',
           '-o', 'PasswordAuthentication=no',
           '-o', 'StrictHostKeyChecking=no',
           '%s:%s' % (from_node, local_path),
           fname]
    rc = utils.ext_cmd_nosudo(cmd, shell=False)
    if rc != 0:
        raise ValueError("Failed to retrieve %s from %s" % (local_path, from_node))

    with open(fname) as remote_copy:
        data = remote_copy.read()
    if os.path.isfile(local_path):
        with open(local_path) as current:
            # Compare the contents directly: equal hashes would not
            # guarantee equal contents.
            unchanged = current.read() == data
        if unchanged:
            print("No change.")
            return
    print("Writing %s:%s..." % (utils.this_node(), local_path))
    with open(local_path, 'w') as local_file:
        local_file.write(data)
934
935
def diff_configuration(nodes, checksum=False):
    '''
    Compare the corosync configuration file across nodes.

    With checksum set, utils.remote_checksum() is used for all nodes.
    Otherwise the local file is diffed against the remote ones when a
    single node is given or when this node is part of the list (it is
    then removed from the list first); any other non-empty list is
    handed to utils.remote_diff().
    '''
    config_path = conf()
    me = utils.this_node()
    targets = list(nodes)

    if checksum:
        utils.remote_checksum(config_path, targets, me)
        return
    if len(targets) == 1:
        utils.remote_diff_this(config_path, targets, me)
        return
    if me in targets:
        targets.remove(me)
        utils.remote_diff_this(config_path, targets, me)
        return
    if targets:
        utils.remote_diff(config_path, targets)
949
950
def get_free_nodeid(parser):
    '''
    Return a node id not yet listed under nodelist.node.nodeid

    parser -- object providing get_all(path)

    Returns 1 when no ids are configured, otherwise the lowest id in
    1..max(ids) that is unused, or max(ids) + 1 when that range is
    fully taken.
    '''
    raw_ids = parser.get_all('nodelist.node.nodeid')
    if not raw_ids:
        return 1
    # A set keeps the membership tests below constant-time
    used = {int(i) for i in raw_ids}
    upper = max(used) + 1
    return next((i for i in range(1, upper) if i not in used), upper)
961
962
def get_ip(node):
    '''
    Resolve node with socket.gethostbyname(); return None
    if the lookup raises socket.error
    '''
    try:
        resolved = socket.gethostbyname(node)
    except socket.error:
        resolved = None
    return resolved
968
969
def get_all_paths():
    '''
    Parse the corosync configuration file (see conf()) and
    return the result of Parser.all_paths()
    '''
    # Close the file as soon as its content has been read
    with open(conf()) as f:
        p = Parser(f.read())
    return p.all_paths()
974
975
def get_value(path):
    '''
    Parse the corosync configuration file (see conf()) and
    return the result of Parser.get() for the given path
    '''
    # Close the file as soon as its content has been read
    with open(conf()) as f:
        p = Parser(f.read())
    return p.get(path)
980
981
def get_values(path):
    '''
    Parse the corosync configuration file (see conf()) and
    return the result of Parser.get_all() for the given path
    '''
    # Close the file as soon as its content has been read
    with open(conf()) as f:
        p = Parser(f.read())
    return p.get_all(path)
986
987
def set_value(path, value):
    '''
    Set path to value in the corosync configuration file
    (see conf()) and write the serialized result back
    '''
    # Close the file before it gets rewritten below
    with open(conf()) as f:
        p = Parser(f.read())
    p.set(path, value)
    utils.str2file(p.to_string(), conf())
993
994
class IPAlreadyConfiguredError(Exception):
    """
    Raised by find_configured_ip() when an IP address is
    already present in corosync.conf
    """
997
998
def find_configured_ip(ip_list):
    """
    Check whether any of the given IPs is already used in corosync.conf

    ip_list -- IP addresses to check; the first entry decides whether
               the local addresses are looked up as IPv6 or not

    Raises IPAlreadyConfiguredError naming the clashing addresses.
    """
    with open(conf()) as conf_file:
        parser = Parser(conf_file.read())

    # every ringX_addr value currently present in corosync.conf
    corosync_iplist = [
        ip
        for path in set(parser.all_paths())
        if re.search('nodelist.node.ring[0-9]*_addr', path)
        for ip in parser.get_all(path)
    ]

    # candidates holds all addresses that must not be configured yet
    candidates = set(ip_list)
    is_ipv6 = utils.IP.is_ipv6(ip_list[0])
    local_ips = utils.InterfacesInfo.get_local_ip_list(is_ipv6)
    # When ip_list contains one of this host's addresses, check all local
    # addresses as well. This catches the join scenario where:
    #   eth0's ip is already configured in corosync.conf
    #   eth1's ip is about to be added to the nodelist
    if set(ip_list) & set(local_ips):
        candidates |= set(local_ips)

    clashes = list(candidates & set(corosync_iplist))
    if not clashes:
        return
    raise IPAlreadyConfiguredError("IP {} was already configured".format(','.join(clashes)))
1028
1029
def add_node_ucast(ip_list, node_id=None):
    '''
    Add a node with one ringX_addr per entry of ip_list to corosync.conf

    ip_list -- addresses of the new node; entry i becomes ring<i>_addr
    node_id -- nodeid to use; a free one is picked when None

    Raises IPAlreadyConfiguredError (from find_configured_ip) when one
    of the addresses is already configured. quorum.two_node is set to
    '1' only if the resulting node count is 2 and the quorum device
    model is not "net".
    '''
    find_configured_ip(ip_list)

    with open(conf()) as f:
        p = Parser(f.read())

    if node_id is None:
        node_id = get_free_nodeid(p)
    node_value = []
    for i, addr in enumerate(ip_list):
        node_value += make_value('nodelist.node.ring{}_addr'.format(i), addr)
    node_value += make_value('nodelist.node.nodeid', str(node_id))

    # Query the configuration already parsed above instead of reading
    # and parsing the file a second time. Without any ring0_addr entry
    # an empty nodelist section is created first.
    if not p.get_all("nodelist.node.ring0_addr"):
        p.add('', make_section('nodelist', []))
    p.add('nodelist', make_section('nodelist.node', node_value))

    num_nodes = p.count('nodelist.node')
    p.set('quorum.two_node', '1' if num_nodes == 2 else '0')
    # two_node is always forced off when the quorum device model is "net"
    if p.get("quorum.device.model") == "net":
        p.set('quorum.two_node', '0')

    utils.str2file(p.to_string(), conf())
1054
1055
def add_node(addr, name=None):
    '''
    Add node to corosync.conf

    addr -- address of the node, written as ring0_addr
    name -- optional node name, written as name

    A warning is logged and nothing is changed when addr (or the IP it
    resolves to) or name is already known to corosync or to the
    cluster. When cluster nodes could be listed, the new entries are
    also pushed into the running configuration via corosync-cmapctl.
    '''
    coronodes = utils.list_corosync_nodes()
    nodenames = utils.list_corosync_node_names()
    try:
        nodes = utils.list_cluster_nodes()
    except Exception:
        # Best effort: carry on with an empty list if the cluster
        # nodes cannot be listed
        nodes = []
    ipaddr = get_ip(addr)
    if addr in nodenames + coronodes or (ipaddr and ipaddr in coronodes):
        err_buf.warning("%s already in corosync.conf" % (addr))
        return
    if name and name in nodenames + coronodes:
        err_buf.warning("%s already in corosync.conf" % (name))
        return
    if addr in nodes:
        err_buf.warning("%s already in configuration" % (addr))
        return
    if name and name in nodes:
        err_buf.warning("%s already in configuration" % (name))
        return

    # Close the file before it gets rewritten below
    with open(conf()) as f:
        p = Parser(f.read())

    node_addr = addr
    node_id = get_free_nodeid(p)
    node_name = name
    node_value = (make_value('nodelist.node.ring0_addr', node_addr) +
                  make_value('nodelist.node.nodeid', str(node_id)))
    if node_name:
        node_value += make_value('nodelist.node.name', node_name)

    p.add('nodelist', make_section('nodelist.node', node_value))

    num_nodes = p.count('nodelist.node')
    p.set('quorum.two_node', '1' if num_nodes == 2 else '0')
    # two_node is always forced off when the quorum device model is "net"
    if p.get("quorum.device.model") == "net":
        p.set('quorum.two_node', '0')

    utils.str2file(p.to_string(), conf())

    # update running config (if any)
    if nodes:
        # the new node is the last entry, hence index num_nodes - 1
        index = num_nodes - 1
        updates = [("nodeid", "u32", str(node_id)),
                   ("ring0_addr", "str", node_addr)]
        if node_name:
            updates.append(("name", "str", node_name))
        for key, cmap_type, cmap_value in updates:
            utils.ext_cmd(["corosync-cmapctl",
                           "-s", "nodelist.node.%s.%s" % (index, key),
                           cmap_type, cmap_value], shell=False)
1115
1116
def del_node(addr):
    '''
    Remove node from corosync

    Removes the nodelist.node section whose ring0_addr equals addr and
    rewrites the configuration file. The file is left untouched when
    no such section is found. quorum.two_node is set to '1' only if
    the remaining node count is 2 and the quorum device model is not
    "net".
    '''
    # Close the file before it gets rewritten below
    with open(conf()) as f:
        p = Parser(f.read())
    nth = p.remove_section_where('nodelist.node', 'ring0_addr', addr)
    if nth == -1:
        return

    num_nodes = p.count('nodelist.node')
    p.set('quorum.two_node', '1' if num_nodes == 2 else '0')
    if p.get("quorum.device.model") == "net":
        p.set('quorum.two_node', '0')

    utils.str2file(p.to_string(), conf())
1133
1134
# Templates used by create_configuration() to build corosync.conf.
# The final text is HEAD + one RING block per ring + TAIL, formatted
# with the % operator.
#
# HEAD: leading comment and the first part of the totem section. The
# section is left open here; it is closed in TAIL.
# Placeholder: %(clustername)s
_COROSYNC_CONF_TEMPLATE_HEAD = """# Please read the corosync.conf.5 manual page

totem {
    version:    2
    secauth:    on
    crypto_hash:    sha1
    crypto_cipher:  aes256
    cluster_name:   %(clustername)s
    clear_node_high_bit: yes

    token:      5000
    token_retransmits_before_loss_const: 10
    join:       60
    consensus:  6000
    max_messages:   20
"""
# TAIL: remaining totem options and the closing brace of the totem
# section, followed by the logging section and the nodelist and quorum
# sections.
# Placeholders: %(rrp_mode)s, %(transport)s, %(ipv6)s, %(ipv6_nodeid)s,
# %(nodelist)s, %(quorum)s
_COROSYNC_CONF_TEMPLATE_TAIL = """
    %(rrp_mode)s
    %(transport)s
    %(ipv6)s
    %(ipv6_nodeid)s
}

logging {
    fileline:   off
    to_stderr:  no
    to_logfile:     no
    logfile:    /var/log/cluster/corosync.log
    to_syslog:  yes
    debug:      off
    timestamp:  on
    logger_subsys {
        subsys:     QUORUM
        debug:  off
    }
}

%(nodelist)s
%(quorum)s
"""
# RING: one interface block inside the totem section, formatted once
# per ring.
# Placeholders: %(number)d, %(bindnetaddr)s, %(mcast)s
_COROSYNC_CONF_TEMPLATE_RING = """
    interface {
        ringnumber: %(number)d
        %(bindnetaddr)s
%(mcast)s
        ttl: 1
    }
"""
1183
1184
def create_configuration(clustername="hacluster",
                         bindnetaddr=None,
                         mcastaddr=None,
                         mcastport=None,
                         ringXaddr=None,
                         transport=None,
                         ipv6=False,
                         nodeid=None,
                         two_rings=False,
                         qdevice=None):
    '''
    Render a new corosync.conf from the module templates and write it
    to the path returned by conf().

    clustername -- value for totem.cluster_name
    bindnetaddr -- per-ring sequence of bindnetaddr values, or None
    mcastaddr   -- per-ring sequence of mcastaddr values, or None
    mcastport   -- per-ring sequence of mcastport values, or None
    ringXaddr   -- per-ring sequence of node addresses; only read when
                   transport is "udpu"
    transport   -- totem transport; "udpu" also produces a nodelist
    ipv6        -- when set, ip_version ipv6 is written
    nodeid      -- integer nodeid; only used with ipv6 when transport
                   is not "udpu"
    two_rings   -- configure rings 0 and 1 (with rrp_mode passive)
                   instead of ring 0 only
    qdevice     -- object whose __dict__ supplies tls, ip, port, algo
                   and tie_breaker for the quorum device section
    '''
    # Ring 0 is always configured; ring 1 only on request.
    ring_indexes = (0, 1) if two_rings else (0,)

    nodelist_text = ""
    if transport == "udpu":
        ring_addrs = "".join(
            "        ring{}_addr: {}\n".format(idx, ringXaddr[idx])
            for idx in ring_indexes)
        nodelist_text = """nodelist {
    node {
%(ringaddr)s
        nodeid: 1
    }
}
""" % {"ringaddr": ring_addrs}

    transport_text = "" if transport is None else "transport: {}\n".format(transport)
    rrp_mode_text = "rrp_mode:  passive" if two_rings else ""

    ip_version_text = ""
    nodeid_text = ""
    if ipv6:
        ip_version_text = "ip_version:  ipv6"
        if transport != "udpu":
            nodeid_text = "nodeid:  %d" % nodeid

    if qdevice is None:
        quorum_text = """quorum {
    # Enable and configure quorum subsystem (default: off)
    # see also corosync.conf.5 and votequorum.5
    provider: corosync_votequorum
    expected_votes: 1
    two_node: 0
}
"""
    else:
        quorum_text = """quorum {
    # Enable and configure quorum subsystem (default: off)
    # see also corosync.conf.5 and votequorum.5
    provider: corosync_votequorum
    expected_votes: 1
    two_node: 0
    device {
      votes: 0
      model: net
      net {
        tls: %(tls)s
        host: %(ip)s
        port: %(port)s
        algorithm: %(algo)s
        tie_breaker: %(tie_breaker)s
      }
    }
}
""" % qdevice.__dict__

    substitutions = {
        "clustername": clustername,
        "nodelist": nodelist_text,
        "quorum": quorum_text,
        "ipv6": ip_version_text,
        "ipv6_nodeid": nodeid_text,
        "rrp_mode": rrp_mode_text,
        "transport": transport_text
    }

    # Build one interface block per configured ring.
    ring_blocks = []
    for idx in ring_indexes:
        mcast_lines = ""
        if mcastaddr is not None:
            mcast_lines += "        mcastaddr:   {}\n".format(mcastaddr[idx])
        if mcastport is not None:
            mcast_lines += "        mcastport:   {}".format(mcastport[idx])

        if bindnetaddr is None:
            bind_line = ""
        else:
            bind_line = "bindnetaddr: {}".format(bindnetaddr[idx])

        ring_blocks.append(_COROSYNC_CONF_TEMPLATE_RING % {
            "bindnetaddr": bind_line,
            "mcast": mcast_lines,
            "number": idx
        })

    # The rendered ring blocks become part of the template that is
    # formatted with the common substitutions.
    full_template = (_COROSYNC_CONF_TEMPLATE_HEAD +
                     "".join(ring_blocks) +
                     _COROSYNC_CONF_TEMPLATE_TAIL)
    utils.str2file(full_template % substitutions, conf())
1299