1import errno
2import shutil
3import os
4import platform
5from os.path import expanduser, exists, dirname
6import re
7from typing import TextIO
8from configobj import ConfigObj
9
10
def config_location():
    """Return the directory that holds pgcli's config files.

    The returned string always ends with a path separator so callers can
    append a file name directly.

    Resolution order:

    1. ``$XDG_CONFIG_HOME/pgcli/`` when ``XDG_CONFIG_HOME`` is set to a
       non-empty value (a leading ``~`` is expanded).
    2. On Windows, the ``AppData/Local/dbcli/pgcli`` folder below the user
       profile directory.
    3. ``~/.config/pgcli/`` otherwise.
    """
    # An empty XDG_CONFIG_HOME must be treated like an unset one; otherwise
    # the result would be the bogus absolute path "/pgcli/".
    xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
    if xdg_config_home:
        return "%s/pgcli/" % expanduser(xdg_config_home)

    if platform.system() == "Windows":
        # USERPROFILE can be missing in stripped-down environments; fall back
        # to the home directory instead of failing on ``None + str``.
        user_profile = os.getenv("USERPROFILE") or expanduser("~")
        return user_profile + "\\AppData\\Local\\dbcli\\pgcli\\"

    return expanduser("~/.config/pgcli/")
18
19
def load_config(usr_cfg, def_cfg=None):
    """Load the user's config, optionally layered on top of a default config.

    :param usr_cfg: path to the user's config file (``~`` is expanded)
    :param def_cfg: optional default config; when given, its values are
        merged first and the user's values are merged over them
    :return: a ``ConfigObj`` whose ``filename`` is the expanded user path
    """
    user_path = expanduser(usr_cfg)

    # Avoid config merges when possible. For writing, we need an unmerged
    # config instance.
    # see https://github.com/dbcli/pgcli/issues/1240 and
    # https://github.com/DiffSK/configobj/issues/171
    if not def_cfg:
        cfg = ConfigObj(user_path, interpolation=False, encoding="utf-8")
    else:
        defaults = ConfigObj(def_cfg, interpolation=False)
        overrides = ConfigObj(user_path, interpolation=False, encoding="utf-8")
        cfg = ConfigObj()
        for layer in (defaults, overrides):
            cfg.merge(layer)

    cfg.filename = user_path
    return cfg
31
32
def ensure_dir_exists(path):
    """Create the parent directory of ``path`` (and any missing ancestors).

    :param path: path to a file; only its directory part is created, with
        ``~`` expanded. Already-existing directories are left untouched.
    """
    parent_dir = expanduser(dirname(path))
    # A bare file name has an empty directory part, meaning "the current
    # directory"; os.makedirs("") would raise FileNotFoundError.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
36
37
def write_default_config(source, destination, overwrite=False):
    """Copy the config file ``source`` to ``destination``.

    :param source: path of the file to copy from
    :param destination: path to copy to (``~`` is expanded); its parent
        directory is created when missing
    :param overwrite: when false, an existing destination is left untouched
    """
    target = expanduser(destination)
    if overwrite or not exists(target):
        ensure_dir_exists(target)
        shutil.copyfile(source, target)
46
47
def upgrade_config(config, def_config):
    """Merge ``def_config`` with the user's ``config`` and write the result.

    The merged config is written to the file name set by ``load_config``,
    i.e. the expanded user config path.
    """
    merged = load_config(config, def_config)
    merged.write()
51
52
def get_config_filename(pgclirc_file=None):
    """Return the config file path to use.

    :param pgclirc_file: explicit config path; returned unchanged when truthy
    :return: ``pgclirc_file`` if given, otherwise the file named ``config``
        inside the directory returned by ``config_location()``
    """
    if pgclirc_file:
        return pgclirc_file
    return config_location() + "config"
55
56
def get_config(pgclirc_file=None):
    """Load the effective configuration, creating the user file if needed.

    The ``pgclirc`` file located next to the ``pgcli`` package is used as the
    default config: it is copied to the user's config path when no file
    exists there yet, and it is passed as the defaults when loading.

    :param pgclirc_file: optional explicit path to the user's config file
    :return: the config object produced by ``load_config``
    """
    from pgcli import __file__ as pgcli_module_path

    user_rc = get_config_filename(pgclirc_file)
    bundled_rc = os.path.join(os.path.dirname(pgcli_module_path), "pgclirc")

    write_default_config(bundled_rc, user_rc)
    return load_config(user_rc, bundled_rc)
68
69
def get_casing_file(config):
    """Return the casing file path selected by ``config``.

    :param config: mapping with a ``main`` section holding ``casing_file``
    :return: the configured value, or the file named ``casing`` inside the
        directory returned by ``config_location()`` when the value is the
        literal string ``"default"``
    """
    configured = config["main"]["casing_file"]
    if configured != "default":
        return configured
    return config_location() + "casing"
75
76
def skip_initial_comment(f_stream: TextIO) -> int:
    """
    Initial comment in ~/.pg_service.conf is not always marked with '#'
    which crashes the parser. This function takes a file object and
    "rewinds" it to the beginning of the first section,
    from where on it can be parsed safely.

    If no section header is found, the stream is left at end of file.

    :param f_stream: text stream positioned at the start of the content
    :return: number of skipped lines
    """
    section_start = re.compile(r"\s*\[")
    lines_skipped = 0
    while True:
        # Remember where this line begins. Text-mode positions are opaque
        # cookies, so they must come from tell(); adding up len(line) breaks
        # on multi-byte characters and translated newlines.
        line_start = f_stream.tell()
        line = f_stream.readline()
        if line == "":
            # End of file reached without finding a section header.
            break
        if section_start.match(line) is not None:
            # Rewind so the section header is the next line to be read.
            f_stream.seek(line_start)
            break
        lines_skipped += 1
    return lines_skipped
100