1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
4
5import datetime
6import http
7import json
8import os
9import re
10import shutil
11import signal
12import tempfile
13import zipfile
14from contextlib import suppress
15from typing import Any, Callable, Dict, Iterator, Match, Optional, Tuple, Union, Type
16from urllib.error import HTTPError
17from urllib.request import Request, urlopen
18
19from kitty.config import atomic_save, parse_config
20from kitty.constants import cache_dir, config_dir
21from kitty.options.types import Options as KittyOptions
22from kitty.rgb import Color
23from kitty.utils import reload_conf_in_all_kitties
24
25from ..choose.match import match
26
27MARK_BEFORE = '\033[33m'
28MARK_AFTER = '\033[39m'
29
30
31def patch_conf(raw: str, theme_name: str) -> str:
32    addition = f'# BEGIN_KITTY_THEME\n# {theme_name}\ninclude current-theme.conf\n# END_KITTY_THEME'
33    nraw, num = re.subn(r'^# BEGIN_KITTY_THEME.+?# END_KITTY_THEME', addition, raw, flags=re.MULTILINE | re.DOTALL)
34    if not num:
35        if raw:
36            raw += '\n\n'
37        nraw = raw + addition
38    # comment out all existing color definitions
39    color_conf_items = [f'color{i}' for i in range(256)] + list(filter(None, '''
40foreground
41background
42selection_foreground
43selection_background
44
45cursor
46cursor_text_color
47
48url_color
49
50active_border_color
51inactive_border_color
52bell_border_color
53
54wayland_titlebar_color
55macos_titlebar_color
56
57active_tab_foreground
58active_tab_background
59inactive_tab_foreground
60inactive_tab_background
61tab_bar_background
62
63mark1_foreground
64mark1_background
65mark2_foreground
66mark2_background
67mark3_foreground
68mark3_background
69'''.splitlines()))
70    pat = fr'^\s*({"|".join(color_conf_items)})\b'
71    return re.sub(pat, r'# \1', nraw, flags=re.MULTILINE)
72
73
74def set_comment_in_zip_file(path: str, data: str) -> None:
75    with zipfile.ZipFile(path, 'a') as zf:
76        zf.comment = data.encode('utf-8')
77
78
79class NoCacheFound(ValueError):
80    pass
81
82
83def fetch_themes(
84    name: str = 'kitty-themes',
85    url: str = 'https://codeload.github.com/kovidgoyal/kitty-themes/zip/master',
86    cache_age: float = 1,
87) -> str:
88    now = datetime.datetime.now(datetime.timezone.utc)
89    cache_age_delta = datetime.timedelta(days=cache_age)
90
91    class Metadata:
92        def __init__(self) -> None:
93            self.etag = ''
94            self.timestamp = now
95
96        def __str__(self) -> str:
97            return json.dumps({'etag': self.etag, 'timestamp': self.timestamp.isoformat()})
98
99    dest_path = os.path.join(cache_dir(), f'{name}.zip')
100    m = Metadata()
101    with suppress(Exception), zipfile.ZipFile(dest_path, 'r') as zf:
102        q = json.loads(zf.comment)
103        m.etag = str(q.get('etag') or '')
104        m.timestamp = datetime.datetime.fromisoformat(q['timestamp'])
105        if cache_age < 0 or (now - m.timestamp) < cache_age_delta:
106            return dest_path
107    if cache_age < 0:
108        raise NoCacheFound('No local themes cache found and negative cache age specified, aborting')
109
110    rq = Request(url)
111    m.timestamp = now
112    if m.etag:
113        rq.add_header('If-None-Match', m.etag)
114    try:
115        res = urlopen(rq, timeout=30)
116    except HTTPError as e:
117        if m.etag and e.code == http.HTTPStatus.NOT_MODIFIED:
118            set_comment_in_zip_file(dest_path, str(m))
119            return dest_path
120        raise
121    m.etag = res.headers.get('etag') or ''
122
123    needs_delete = False
124    try:
125        with tempfile.NamedTemporaryFile(suffix='-' + os.path.basename(dest_path), dir=os.path.dirname(dest_path), delete=False) as f:
126            needs_delete = True
127            shutil.copyfileobj(res, f)
128            f.flush()
129            set_comment_in_zip_file(f.name, str(m))
130            os.replace(f.name, dest_path)
131            needs_delete = False
132    finally:
133        if needs_delete:
134            os.unlink(f.name)
135    return dest_path
136
137
138def zip_file_loader(path_to_zip: str, theme_file_name: str, file_name: str) -> Callable[[], str]:
139
140    name = os.path.join(os.path.dirname(theme_file_name), file_name)
141
142    def zip_loader() -> str:
143        with zipfile.ZipFile(path_to_zip, 'r') as zf, zf.open(name) as f:
144            return f.read().decode('utf-8')
145
146    return zip_loader
147
148
149def theme_name_from_file_name(fname: str) -> str:
150    ans = fname.rsplit('.', 1)[0]
151    ans = ans.replace('_', ' ')
152
153    def camel_case(m: Match) -> str:
154        return str(m.group(1) + ' ' + m.group(2))
155
156    ans = re.sub(r'([a-z])([A-Z])', camel_case, ans)
157    return ' '.join(x.capitalize() for x in filter(None, ans.split()))
158
159
160class LineParser:
161
162    def __init__(self) -> None:
163        self.in_metadata = False
164        self.in_blurb = False
165        self.keep_going = True
166
167    def __call__(self, line: str, ans: Dict[str, Any]) -> None:
168        is_block = line.startswith('## ')
169        if self.in_metadata and not is_block:
170            self.keep_going = False
171            return
172        if not self.in_metadata and is_block:
173            self.in_metadata = True
174        if not self.in_metadata:
175            return
176        line = line[3:]
177        if self.in_blurb:
178            ans['blurb'] += ' ' + line
179            return
180        try:
181            key, val = line.split(':', 1)
182        except Exception:
183            self.keep_going = False
184            return
185        key = key.strip().lower()
186        val = val.strip()
187        if val:
188            ans[key] = val
189        if key == 'blurb':
190            self.in_blurb = True
191
192
193def parse_theme(fname: str, raw: str, exc_class: Type[BaseException] = SystemExit) -> Dict[str, Any]:
194    lines = raw.splitlines()
195    conf = parse_config(lines)
196    bg = conf.get('background', Color())
197    is_dark = max(bg) < 115
198    ans: Dict[str, Any] = {'name': theme_name_from_file_name(fname)}
199    parser = LineParser()
200    for i, line in enumerate(raw.splitlines()):
201        line = line.strip()
202        if not line:
203            continue
204        try:
205            parser(line, ans)
206        except Exception as e:
207            raise exc_class(
208                f'Failed to parse {fname} line {i+1} with error: {e}')
209        if not parser.keep_going:
210            break
211    if is_dark:
212        ans['is_dark'] = True
213    ans['num_settings'] = len(conf) - len(parse_config(()))
214    if ans['num_settings'] < 1 and fname != 'default.conf':
215        raise exc_class(f'The theme {fname} has no settings')
216    return ans
217
218
219def update_theme_file(path: str) -> bool:
220    with open(path) as f:
221        raw = f.read()
222    td = parse_theme(os.path.basename(path), raw, exc_class=ValueError)
223    if 'upstream' not in td:
224        return False
225    nraw = urlopen(td['upstream']).read().decode('utf-8')
226    if raw == nraw:
227        return False
228    atomic_save(nraw.encode('utf-8'), path)
229    return True
230
231
232class Theme:
233    name: str = ''
234    author: str = ''
235    license: str = ''
236    is_dark: bool = False
237    blurb: str = ''
238    num_settings: int = 0
239
240    def apply_dict(self, d: Dict[str, Any]) -> None:
241        self.name = str(d['name'])
242        for x in ('author', 'license', 'blurb'):
243            a = d.get(x)
244            if isinstance(a, str):
245                setattr(self, x, a)
246        for x in ('is_dark', 'num_settings'):
247            a = d.get(x)
248            if isinstance(a, int):
249                setattr(self, x, a)
250
251    def __init__(self, loader: Callable[[], str]):
252        self._loader = loader
253        self._raw: Optional[str] = None
254        self._opts: Optional[KittyOptions] = None
255
256    @property
257    def raw(self) -> str:
258        if self._raw is None:
259            self._raw = self._loader()
260        return self._raw
261
262    @property
263    def kitty_opts(self) -> KittyOptions:
264        if self._opts is None:
265            self._opts = KittyOptions(options_dict=parse_config(self.raw.splitlines()))
266        return self._opts
267
268    def save_in_dir(self, dirpath: str) -> None:
269        atomic_save(self.raw.encode('utf-8'), os.path.join(dirpath, f'{self.name}.conf'))
270
271    def save_in_conf(self, confdir: str, reload_in: str) -> None:
272        atomic_save(self.raw.encode('utf-8'), os.path.join(confdir, 'current-theme.conf'))
273        confpath = os.path.realpath(os.path.join(confdir, 'kitty.conf'))
274        try:
275            with open(confpath) as f:
276                raw = f.read()
277        except FileNotFoundError:
278            raw = ''
279        nraw = patch_conf(raw, self.name)
280        if raw:
281            with open(confpath + '.bak', 'w') as f:
282                f.write(raw)
283        atomic_save(nraw.encode('utf-8'), confpath)
284        if reload_in == 'parent':
285            if 'KITTY_PID' in os.environ:
286                os.kill(int(os.environ['KITTY_PID']), signal.SIGUSR1)
287        elif reload_in == 'all':
288            reload_conf_in_all_kitties()
289
290
291class Themes:
292
293    def __init__(self) -> None:
294        self.themes: Dict[str, Theme] = {}
295        self.index_map: Tuple[str, ...] = ()
296
297    def __len__(self) -> int:
298        return len(self.themes)
299
300    def __iter__(self) -> Iterator[Theme]:
301        return iter(self.themes.values())
302
303    def __getitem__(self, key: Union[int, str]) -> Theme:
304        if isinstance(key, str):
305            return self.themes[key]
306        if key < 0:
307            key += len(self.index_map)
308        return self.themes[self.index_map[key]]
309
310    def load_from_zip(self, path_to_zip: str) -> None:
311        with zipfile.ZipFile(path_to_zip, 'r') as zf:
312            for name in zf.namelist():
313                if os.path.basename(name) == 'themes.json':
314                    theme_file_name = name
315                    with zf.open(theme_file_name) as f:
316                        items = json.loads(f.read())
317                    break
318            else:
319                raise ValueError(f'No themes.json found in {path_to_zip}')
320
321            for item in items:
322                t = Theme(zip_file_loader(path_to_zip, theme_file_name, item['file']))
323                t.apply_dict(item)
324                if t.name:
325                    self.themes[t.name] = t
326
327    def load_from_dir(self, path: str) -> None:
328        if not os.path.isdir(path):
329            return
330        for name in os.listdir(path):
331            if name.endswith('.conf'):
332                with open(os.path.join(path, name), 'rb') as f:
333                    raw = f.read().decode()
334                try:
335                    d = parse_theme(name, raw)
336                except (Exception, SystemExit):
337                    continue
338                t = Theme(raw.__str__)
339                t.apply_dict(d)
340                if t.name:
341                    self.themes[t.name] = t
342
343    def filtered(self, is_ok: Callable[[Theme], bool]) -> 'Themes':
344        ans = Themes()
345
346        def sort_key(k: Tuple[str, Theme]) -> str:
347            return k[1].name.lower()
348
349        ans.themes = {k: v for k, v in sorted(self.themes.items(), key=sort_key) if is_ok(v)}
350        ans.index_map = tuple(ans.themes)
351        return ans
352
353    def copy(self) -> 'Themes':
354        ans = Themes()
355        ans.themes = self.themes.copy()
356        ans.index_map = self.index_map
357        return ans
358
359    def apply_search(
360        self, expression: str, mark_before: str = MARK_BEFORE, mark_after: str = MARK_AFTER
361    ) -> Iterator[str]:
362        raw = '\n'.join(self.themes)
363        results = match(raw, expression, positions=True, level1=' ')
364        themes: Dict[str, Theme] = {}
365        for r in results:
366            pos, k = r.split(':', 1)
367            positions = tuple(map(int, pos.split(',')))
368            text = k
369            for p in reversed(positions):
370                text = text[:p] + mark_before + text[p] + mark_after + text[p+1:]
371            themes[k] = self.themes[k]
372            yield text
373        self.themes = themes
374        self.index_map = tuple(self.themes)
375
376
377def load_themes(cache_age: float = 1., ignore_no_cache: bool = False) -> Themes:
378    ans = Themes()
379    try:
380        fetched = fetch_themes(cache_age=cache_age)
381    except NoCacheFound:
382        if not ignore_no_cache:
383            raise
384    ans.load_from_zip(fetched)
385    ans.load_from_dir(os.path.join(config_dir, 'themes'))
386    ans.index_map = tuple(ans.themes)
387    return ans
388