1import errno 2import shutil 3import os 4import platform 5from os.path import expanduser, exists, dirname 6import re 7from typing import TextIO 8from configobj import ConfigObj 9 10 11def config_location(): 12 if "XDG_CONFIG_HOME" in os.environ: 13 return "%s/pgcli/" % expanduser(os.environ["XDG_CONFIG_HOME"]) 14 elif platform.system() == "Windows": 15 return os.getenv("USERPROFILE") + "\\AppData\\Local\\dbcli\\pgcli\\" 16 else: 17 return expanduser("~/.config/pgcli/") 18 19 20def load_config(usr_cfg, def_cfg=None): 21 # avoid config merges when possible. For writing, we need an umerged config instance. 22 # see https://github.com/dbcli/pgcli/issues/1240 and https://github.com/DiffSK/configobj/issues/171 23 if def_cfg: 24 cfg = ConfigObj() 25 cfg.merge(ConfigObj(def_cfg, interpolation=False)) 26 cfg.merge(ConfigObj(expanduser(usr_cfg), interpolation=False, encoding="utf-8")) 27 else: 28 cfg = ConfigObj(expanduser(usr_cfg), interpolation=False, encoding="utf-8") 29 cfg.filename = expanduser(usr_cfg) 30 return cfg 31 32 33def ensure_dir_exists(path): 34 parent_dir = expanduser(dirname(path)) 35 os.makedirs(parent_dir, exist_ok=True) 36 37 38def write_default_config(source, destination, overwrite=False): 39 destination = expanduser(destination) 40 if not overwrite and exists(destination): 41 return 42 43 ensure_dir_exists(destination) 44 45 shutil.copyfile(source, destination) 46 47 48def upgrade_config(config, def_config): 49 cfg = load_config(config, def_config) 50 cfg.write() 51 52 53def get_config_filename(pgclirc_file=None): 54 return pgclirc_file or "%sconfig" % config_location() 55 56 57def get_config(pgclirc_file=None): 58 from pgcli import __file__ as package_root 59 60 package_root = os.path.dirname(package_root) 61 62 pgclirc_file = get_config_filename(pgclirc_file) 63 64 default_config = os.path.join(package_root, "pgclirc") 65 write_default_config(default_config, pgclirc_file) 66 67 return load_config(pgclirc_file, default_config) 68 69 70def get_casing_file(config): 71 casing_file = config["main"]["casing_file"] 72 if casing_file == "default": 73 casing_file = config_location() + "casing" 74 return casing_file 75 76 77def skip_initial_comment(f_stream: TextIO) -> int: 78 """ 79 Initial comment in ~/.pg_service.conf is not always marked with '#' 80 which crashes the parser. This function takes a file object and 81 "rewinds" it to the beginning of the first section, 82 from where on it can be parsed safely 83 84 :return: number of skipped lines 85 """ 86 section_regex = r"\s*\[" 87 pos = f_stream.tell() 88 lines_skipped = 0 89 while True: 90 line = f_stream.readline() 91 if line == "": 92 break 93 if re.match(section_regex, line) is not None: 94 f_stream.seek(pos) 95 break 96 else: 97 pos += len(line) 98 lines_skipped += 1 99 return lines_skipped 100