#!/usr/local/bin/python3.8
# vim:fileencoding=utf-8
# License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>

import datetime
import http
import json
import os
import re
import shutil
import signal
import tempfile
import zipfile
from contextlib import suppress
from typing import Any, Callable, Dict, Iterator, Match, Optional, Tuple, Union, Type
from urllib.error import HTTPError
from urllib.request import Request, urlopen

from kitty.config import atomic_save, parse_config
from kitty.constants import cache_dir, config_dir
from kitty.options.types import Options as KittyOptions
from kitty.rgb import Color
from kitty.utils import reload_conf_in_all_kitties

from ..choose.match import match

# Escape sequences wrapped around each matched character by Themes.apply_search()
MARK_BEFORE = '\033[33m'
MARK_AFTER = '\033[39m'


def patch_conf(raw: str, theme_name: str) -> str:
    '''
    Return the config text ``raw`` patched to include ``current-theme.conf``.

    An existing ``# BEGIN_KITTY_THEME … # END_KITTY_THEME`` section is replaced,
    otherwise a new section is appended. Afterwards every line that starts
    (after optional indentation) with one of the known color settings is
    commented out.
    '''
    addition = f'# BEGIN_KITTY_THEME\n# {theme_name}\ninclude current-theme.conf\n# END_KITTY_THEME'
    nraw, num = re.subn(r'^# BEGIN_KITTY_THEME.+?# END_KITTY_THEME', addition, raw, flags=re.MULTILINE | re.DOTALL)
    if not num:
        if raw:
            raw += '\n\n'
        nraw = raw + addition
    # comment out all existing color definitions
    color_conf_items = [f'color{i}' for i in range(256)] + list(filter(None, '''
foreground
background
selection_foreground
selection_background

cursor
cursor_text_color

url_color

active_border_color
inactive_border_color
bell_border_color

wayland_titlebar_color
macos_titlebar_color

active_tab_foreground
active_tab_background
inactive_tab_foreground
inactive_tab_background
tab_bar_background

mark1_foreground
mark1_background
mark2_foreground
mark2_background
mark3_foreground
mark3_background
'''.splitlines()))
    # Only horizontal whitespace may precede the setting name: \s would also
    # match newlines and so swallow the blank lines above a color definition.
    pat = fr'^[ \t]*({"|".join(color_conf_items)})\b'
    return re.sub(pat, r'# \1', nraw, flags=re.MULTILINE)


def set_comment_in_zip_file(path: str, data: str) -> None:
    '''Store ``data`` (UTF-8 encoded) as the archive comment of the zip file at ``path``.'''
    with zipfile.ZipFile(path, 'a') as zf:
        zf.comment = data.encode('utf-8')


class NoCacheFound(ValueError):
    '''Raised by fetch_themes() when only the cache may be used but it is not usable.'''
    pass


def fetch_themes(
    name: str = 'kitty-themes',
    url: str = 'https://codeload.github.com/kovidgoyal/kitty-themes/zip/master',
    cache_age: float = 1,
) -> str:
    '''
    Make sure a themes zip archive is present in the cache directory and
    return its path.

    The archive comment of the cached zip holds JSON metadata (``etag`` and
    ``timestamp``). The cached file is returned as is when ``cache_age`` is
    negative or the cached timestamp is younger than ``cache_age`` days.
    Otherwise ``url`` is requested, sending the cached ETag (if any) as
    ``If-None-Match``; on a 304 response only the timestamp is refreshed.

    Raises NoCacheFound when ``cache_age`` is negative and the cached archive
    or its metadata could not be read. Errors from the HTTP request other than
    the handled 304 are propagated.
    '''
    now = datetime.datetime.now(datetime.timezone.utc)
    cache_age_delta = datetime.timedelta(days=cache_age)

    class Metadata:
        def __init__(self) -> None:
            self.etag = ''
            self.timestamp = now

        def __str__(self) -> str:
            return json.dumps({'etag': self.etag, 'timestamp': self.timestamp.isoformat()})

    dest_path = os.path.join(cache_dir(), f'{name}.zip')
    m = Metadata()
    # Any failure to open the zip or parse its metadata is treated as "no usable cache"
    with suppress(Exception), zipfile.ZipFile(dest_path, 'r') as zf:
        q = json.loads(zf.comment)
        m.etag = str(q.get('etag') or '')
        m.timestamp = datetime.datetime.fromisoformat(q['timestamp'])
        if cache_age < 0 or (now - m.timestamp) < cache_age_delta:
            return dest_path
    if cache_age < 0:
        raise NoCacheFound('No local themes cache found and negative cache age specified, aborting')

    rq = Request(url)
    m.timestamp = now
    if m.etag:
        rq.add_header('If-None-Match', m.etag)
    try:
        res = urlopen(rq, timeout=30)
    except HTTPError as e:
        if m.etag and e.code == http.HTTPStatus.NOT_MODIFIED:
            # Cached copy is still current, just record the new check time
            set_comment_in_zip_file(dest_path, str(m))
            return dest_path
        raise

    # Close the HTTP response once the download is done, whatever the outcome
    with res:
        m.etag = res.headers.get('etag') or ''

        needs_delete = False
        try:
            # Download into a temporary file in the same directory, then move
            # it over the destination with os.replace()
            with tempfile.NamedTemporaryFile(suffix='-' + os.path.basename(dest_path), dir=os.path.dirname(dest_path), delete=False) as f:
                needs_delete = True
                shutil.copyfileobj(res, f)
                f.flush()
                set_comment_in_zip_file(f.name, str(m))
                os.replace(f.name, dest_path)
                needs_delete = False
        finally:
            if needs_delete:
                os.unlink(f.name)
    return dest_path


def zip_file_loader(path_to_zip: str, theme_file_name: str, file_name: str) -> Callable[[], str]:
    '''
    Return a callable that reads ``file_name`` from the zip at ``path_to_zip``.

    ``file_name`` is looked up in the same archive directory as
    ``theme_file_name`` and its contents are decoded as UTF-8. The archive is
    only opened when the returned callable is invoked.
    '''

    name = os.path.join(os.path.dirname(theme_file_name), file_name)

    def zip_loader() -> str:
        with zipfile.ZipFile(path_to_zip, 'r') as zf, zf.open(name) as f:
            return f.read().decode('utf-8')

    return zip_loader


def theme_name_from_file_name(fname: str) -> str:
    '''
    Derive a theme name from a file name.

    The last extension is dropped, underscores become spaces, a space is
    inserted at every lowercase/uppercase boundary and each resulting word is
    capitalized.
    '''
    ans = fname.rsplit('.', 1)[0]
    ans = ans.replace('_', ' ')

    def camel_case(m: Match) -> str:
        return str(m.group(1) + ' ' + m.group(2))

    ans = re.sub(r'([a-z])([A-Z])', camel_case, ans)
    return ' '.join(x.capitalize() for x in filter(None, ans.split()))


class LineParser:
    '''
    Line-at-a-time parser for the ``## key: value`` metadata block of a theme file.

    Call the instance once per line; parsing is finished when ``keep_going``
    becomes False.
    '''

    def __init__(self) -> None:
        self.in_metadata = False  # True once the first '## ' line has been seen
        self.in_blurb = False  # True after the blurb key, following lines extend it
        self.keep_going = True  # set to False when the metadata block has ended

    def __call__(self, line: str, ans: Dict[str, Any]) -> None:
        '''
        Consume one line, storing any parsed metadata in ``ans``.

        Lines before the first ``## `` line are ignored. The first line without
        the ``## `` prefix after the block has started, or a metadata line
        without a ``:`` (outside of the blurb), ends parsing.
        '''
        is_block = line.startswith('## ')
        if self.in_metadata and not is_block:
            self.keep_going = False
            return
        if not self.in_metadata and is_block:
            self.in_metadata = True
        if not self.in_metadata:
            return
        line = line[3:]
        if self.in_blurb:
            # The blurb key is absent when the 'blurb:' line itself carried no
            # text, in that case the continuation line starts the blurb.
            prev = ans.get('blurb')
            ans['blurb'] = f'{prev} {line}' if prev else line
            return
        try:
            key, val = line.split(':', 1)
        except ValueError:
            # no ':' separator on this line
            self.keep_going = False
            return
        key = key.strip().lower()
        val = val.strip()
        if val:
            ans[key] = val
        if key == 'blurb':
            self.in_blurb = True


def parse_theme(fname: str, raw: str, exc_class: Type[BaseException] = SystemExit) -> Dict[str, Any]:
    '''
    Parse the theme file contents ``raw`` into a metadata dictionary.

    The result always has ``name`` (derived from ``fname`` unless the metadata
    block overrides it) and ``num_settings``, has ``is_dark`` set to True for
    dark backgrounds, and carries whatever keys the ``## `` metadata block
    defines.

    Raises ``exc_class`` if a metadata line cannot be parsed or if the theme
    defines no settings (``default.conf`` is exempt from the latter check).
    '''
    lines = raw.splitlines()
    conf = parse_config(lines)
    bg = conf.get('background', Color())
    # dark when the largest value in the background color is below 115
    is_dark = max(bg) < 115
    ans: Dict[str, Any] = {'name': theme_name_from_file_name(fname)}
    parser = LineParser()
    for i, line in enumerate(raw.splitlines()):
        line = line.strip()
        if not line:
            continue
        try:
            parser(line, ans)
        except Exception as e:
            raise exc_class(
                f'Failed to parse {fname} line {i+1} with error: {e}') from e
        if not parser.keep_going:
            break
    if is_dark:
        ans['is_dark'] = True
    # number of entries beyond what parse_config() yields for empty input
    ans['num_settings'] = len(conf) - len(parse_config(()))
    if ans['num_settings'] < 1 and fname != 'default.conf':
        raise exc_class(f'The theme {fname} has no settings')
    return ans


def update_theme_file(path: str) -> bool:
    '''
    Refresh the theme file at ``path`` from the URL in its ``upstream`` metadata.

    Returns True if the file was rewritten, False if it has no ``upstream``
    entry or the downloaded text is identical. Raises ValueError if the
    existing file cannot be parsed as a theme.
    '''
    with open(path) as f:
        raw = f.read()
    td = parse_theme(os.path.basename(path), raw, exc_class=ValueError)
    if 'upstream' not in td:
        return False
    # use a context manager so the HTTP response is always closed
    with urlopen(td['upstream']) as res:
        nraw = res.read().decode('utf-8')
    if raw == nraw:
        return False
    atomic_save(nraw.encode('utf-8'), path)
    return True


class Theme:
    '''A single theme: its metadata plus a loader that fetches the theme text on demand.'''

    name: str = ''
    author: str = ''
    license: str = ''
    is_dark: bool = False
    blurb: str = ''
    num_settings: int = 0

    def apply_dict(self, d: Dict[str, Any]) -> None:
        '''
        Copy metadata from ``d`` onto this theme.

        ``name`` is required. The other known keys are copied only when
        present with the expected type (str or int).
        '''
        self.name = str(d['name'])
        for x in ('author', 'license', 'blurb'):
            a = d.get(x)
            if isinstance(a, str):
                setattr(self, x, a)
        for x in ('is_dark', 'num_settings'):
            a = d.get(x)
            if isinstance(a, int):
                setattr(self, x, a)

    def __init__(self, loader: Callable[[], str]):
        self._loader = loader
        self._raw: Optional[str] = None
        self._opts: Optional[KittyOptions] = None

    @property
    def raw(self) -> str:
        '''The theme file text, obtained from the loader on first access and then cached.'''
        if self._raw is None:
            self._raw = self._loader()
        return self._raw

    @property
    def kitty_opts(self) -> KittyOptions:
        '''Options built from parsing the theme text, created on first access and then cached.'''
        if self._opts is None:
            self._opts = KittyOptions(options_dict=parse_config(self.raw.splitlines()))
        return self._opts

    def save_in_dir(self, dirpath: str) -> None:
        '''Write the theme text to ``<name>.conf`` inside ``dirpath``.'''
        atomic_save(self.raw.encode('utf-8'), os.path.join(dirpath, f'{self.name}.conf'))

    def save_in_conf(self, confdir: str, reload_in: str) -> None:
        '''
        Make this theme the current one for the configuration in ``confdir``.

        Writes ``current-theme.conf``, patches ``kitty.conf`` via patch_conf()
        (saving the previous non-empty contents to ``kitty.conf.bak``) and then
        requests a reload: ``reload_in='parent'`` sends SIGUSR1 to the process
        named by the KITTY_PID environment variable, if it is set, and
        ``reload_in='all'`` calls reload_conf_in_all_kitties(). Any other value
        skips the reload.
        '''
        atomic_save(self.raw.encode('utf-8'), os.path.join(confdir, 'current-theme.conf'))
        confpath = os.path.realpath(os.path.join(confdir, 'kitty.conf'))
        try:
            with open(confpath) as f:
                raw = f.read()
        except FileNotFoundError:
            raw = ''
        nraw = patch_conf(raw, self.name)
        if raw:
            with open(confpath + '.bak', 'w') as f:
                f.write(raw)
        atomic_save(nraw.encode('utf-8'), confpath)
        if reload_in == 'parent':
            if 'KITTY_PID' in os.environ:
                os.kill(int(os.environ['KITTY_PID']), signal.SIGUSR1)
        elif reload_in == 'all':
            reload_conf_in_all_kitties()


class Themes:
    '''A collection of Theme objects addressable by name or by position.'''

    def __init__(self) -> None:
        self.themes: Dict[str, Theme] = {}  # theme name -> Theme
        self.index_map: Tuple[str, ...] = ()  # theme names in positional order

    def __len__(self) -> int:
        return len(self.themes)

    def __iter__(self) -> Iterator[Theme]:
        return iter(self.themes.values())

    def __getitem__(self, key: Union[int, str]) -> Theme:
        '''Look up a theme by name (str) or by position in ``index_map`` (int, may be negative).'''
        if isinstance(key, str):
            return self.themes[key]
        if key < 0:
            key += len(self.index_map)
        return self.themes[self.index_map[key]]

    def load_from_zip(self, path_to_zip: str) -> None:
        '''
        Add the themes listed in the ``themes.json`` of the given zip archive.

        Theme files are not read here, each Theme gets a loader that reads its
        file from the archive when needed. Raises ValueError if the archive
        contains no ``themes.json``.
        '''
        with zipfile.ZipFile(path_to_zip, 'r') as zf:
            for name in zf.namelist():
                if os.path.basename(name) == 'themes.json':
                    theme_file_name = name
                    with zf.open(theme_file_name) as f:
                        items = json.loads(f.read())
                    break
            else:
                raise ValueError(f'No themes.json found in {path_to_zip}')

            for item in items:
                t = Theme(zip_file_loader(path_to_zip, theme_file_name, item['file']))
                t.apply_dict(item)
                if t.name:
                    self.themes[t.name] = t

    def load_from_dir(self, path: str) -> None:
        '''
        Add themes from the ``.conf`` files in the directory ``path``.

        Does nothing if ``path`` is not a directory. Files that fail to parse
        as a theme are skipped.
        '''
        if not os.path.isdir(path):
            return
        for name in os.listdir(path):
            if name.endswith('.conf'):
                with open(os.path.join(path, name), 'rb') as f:
                    raw = f.read().decode()
                try:
                    d = parse_theme(name, raw)
                except (Exception, SystemExit):
                    # parse_theme() raises SystemExit by default
                    continue
                # the text is already in memory, so the loader just returns it
                t = Theme(raw.__str__)
                t.apply_dict(d)
                if t.name:
                    self.themes[t.name] = t

    def filtered(self, is_ok: Callable[[Theme], bool]) -> 'Themes':
        '''Return a new collection with the themes accepted by ``is_ok``, sorted by lowercased name.'''
        ans = Themes()

        def sort_key(k: Tuple[str, Theme]) -> str:
            return k[1].name.lower()

        ans.themes = {k: v for k, v in sorted(self.themes.items(), key=sort_key) if is_ok(v)}
        ans.index_map = tuple(ans.themes)
        return ans

    def copy(self) -> 'Themes':
        '''Return a shallow copy of this collection.'''
        ans = Themes()
        ans.themes = self.themes.copy()
        ans.index_map = self.index_map
        return ans

    def apply_search(
        self, expression: str, mark_before: str = MARK_BEFORE, mark_after: str = MARK_AFTER
    ) -> Iterator[str]:
        '''
        Restrict this collection to the themes whose names match ``expression``.

        This is a generator: it yields each matching name with every matched
        character wrapped in ``mark_before``/``mark_after``. The collection
        itself is only replaced by the matches once the generator has been
        fully consumed.
        '''
        raw = '\n'.join(self.themes)
        results = match(raw, expression, positions=True, level1=' ')
        themes: Dict[str, Theme] = {}
        for r in results:
            # each result is parsed as 'pos1,pos2,...:theme name'
            pos, k = r.split(':', 1)
            positions = tuple(map(int, pos.split(',')))
            text = k
            # insert marks from the end so earlier positions stay valid
            for p in reversed(positions):
                text = text[:p] + mark_before + text[p] + mark_after + text[p+1:]
            themes[k] = self.themes[k]
            yield text
        self.themes = themes
        self.index_map = tuple(self.themes)


def load_themes(cache_age: float = 1., ignore_no_cache: bool = False) -> Themes:
    '''
    Load the themes from the cached/downloaded archive and from the ``themes``
    sub-directory of the config directory.

    ``cache_age`` is passed to fetch_themes(). If that raises NoCacheFound the
    error is propagated unless ``ignore_no_cache`` is True, in which case only
    the themes from the config directory are loaded.
    '''
    ans = Themes()
    try:
        fetched = fetch_themes(cache_age=cache_age)
    except NoCacheFound:
        if not ignore_no_cache:
            raise
    else:
        # only load the archive when fetch_themes() actually produced a path,
        # otherwise ``fetched`` would be unbound here
        ans.load_from_zip(fetched)
    ans.load_from_dir(os.path.join(config_dir, 'themes'))
    ans.index_map = tuple(ans.themes)
    return ans