1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
4
5import atexit
6import json
7import numbers
8import sys
9from collections import namedtuple
10from itertools import repeat
11
12from qt.core import QApplication, QEventLoop, pyqtSignal, sip
13from qt.webengine import (
14    QWebEnginePage, QWebEngineProfile, QWebEngineScript
15)
16
17from calibre import detect_ncpus as cpu_count, prints
18from calibre.ebooks.oeb.polish.check.base import ERROR, WARN, BaseError
19from calibre.gui2 import must_use_qt
20from calibre.gui2.webengine import secure_webengine
21
22
class CSSParseError(BaseError):
    """Error-level report for CSS that could not be parsed or processed.

    Created by ``message_to_error`` for CSSLint messages of type ``error``
    whose rule is named ``Parsing Errors``, and by ``check_css`` when a
    stylesheet produced no result dict at all.
    """

    level = ERROR
    # Not read anywhere in this file; presumably consumers of the error use
    # it to tell parse failures apart from rule violations -- TODO confirm.
    is_parsing_error = True
26
27
class CSSError(BaseError):
    """Error-level report for a CSSLint message of type ``error`` whose rule
    is not the ``Parsing Errors`` rule (see ``message_to_error``)."""

    level = ERROR
30
31
class CSSWarning(BaseError):
    """Warning-level report, used by ``message_to_error`` for every CSSLint
    message whose type is not ``error``."""

    level = WARN
34
35
def as_int_or_none(x):
    """Return *x* as an integer, or None when that is not possible.

    None and values that are already integral are returned unchanged.
    Anything else is passed through ``int()``; if the conversion raises,
    None is returned instead.
    """
    if x is None or isinstance(x, numbers.Integral):
        return x
    try:
        return int(x)
    except Exception:
        return None
43
44
def message_to_error(message, name, line_offset=0):
    """Convert one CSSLint message dict into an error object.

    :param message: Message dict; the keys read here are ``rule``, ``type``,
        ``message``, ``line`` and ``col``.
    :param name: Passed through to the error constructor as its name argument.
    :param line_offset: Added to the reported line number when there is one.
    :return: A CSSParseError, CSSError or CSSWarning carrying the message
        text, the position, the rule description (``HELP``) and the rule id
        (``css_rule_id``).
    """
    rule = message.get('rule', {})
    rule_id = rule.get('id') or ''

    # Only messages of type 'error' become errors; among those, the
    # 'Parsing Errors' rule gets its own class.
    if message.get('type') != 'error':
        error_class = CSSWarning
    elif rule.get('name') == 'Parsing Errors':
        error_class = CSSParseError
    else:
        error_class = CSSError

    title = message.get('message') or _('Unknown error')

    line = as_int_or_none(message.get('line'))
    col = as_int_or_none(message.get('col'))
    if col is not None:
        # Stored one less than reported (presumably 1-based to 0-based).
        col -= 1
    if line is not None:
        line += line_offset

    ans = error_class(title, name, line, col)

    # The link to the detailed description is only appended when there is a
    # description to append it to.
    help_text = rule.get('desc') or ''
    if help_text and 'url' in rule:
        help_text += ' ' + _('See <a href="{}">detailed description</a>.').format(rule['url'])
    ans.HELP = help_text
    ans.css_rule_id = rule_id
    return ans
64
65
def csslint_js():
    """Return the JavaScript injected into every worker page.

    The script is the bundled ``csslint.js`` resource followed by a small
    driver that defines ``window.check_css(src)``. The driver enables every
    CSSLint rule whose ``browsers`` is ``"All"`` and that is not in its
    ignore list, giving the rules in ``error_rules`` level 2 and all others
    level 1, then returns the result of ``CSSLint.verify``. Finally it sets
    the document title to ``ready``, which ``Worker.title_changed`` waits for.

    The assembled source is cached on the function object, so the resource is
    only loaded on the first call.
    """
    cached = getattr(csslint_js, 'ans', None)
    if cached is not None:
        return cached
    linter_source = P('csslint.js', data=True, allow_user_override=False).decode('utf-8')
    source = csslint_js.ans = linter_source + '''

        window.check_css =  function(src) {
            var rules = CSSLint.getRules();
            var ruleset = {};
            var ignored_rules = {
                'order-alphabetical': 1,
                'font-sizes': 1,
                'zero-units': 1,
                'bulletproof-font-face': 1,
                'import': 1,
                'box-model': 1,
                'adjoining-classes': 1,
                'box-sizing': 1,
                'compatible-vendor-prefixes': 1,
                'text-indent': 1,
                'unique-headings': 1,
                'fallback-colors': 1,
                'font-faces': 1,
                'regex-selectors': 1,
                'universal-selector': 1,
                'unqualified-attributes': 1,
                'overqualified-elements': 1,
                'shorthand': 1,
                'duplicate-background-images': 1,
                'floats': 1,
                'ids': 1,
                'gradients': 1
            };
            var error_rules = {
                'known-properties': 1,
                'duplicate-properties': 1,
                'vendor-prefix': 1
            };

            for (var i = 0; i < rules.length; i++) {
                var rule = rules[i];
                if (!ignored_rules[rule.id] && rule.browsers === "All") ruleset[rule.id] = error_rules[rule.id] ? 2 : 1;
            }
            var result = CSSLint.verify(src, ruleset);
            return result;
        }
        document.title = 'ready';
        '''
    return source
114
115
def create_profile():
    """Return the web engine profile shared by all workers.

    On the first call a QWebEngineProfile parented to the application is
    created and the CSSLint script is registered with it in the application
    script world. The profile is cached on the function object and returned
    as-is by every later call.
    """
    profile = getattr(create_profile, 'ans', None)
    if profile is not None:
        return profile

    # Cache the profile before configuring it, exactly once per process.
    profile = create_profile.ans = QWebEngineProfile(QApplication.instance())

    script = QWebEngineScript()
    script.setName('csslint.js')
    script.setSourceCode(csslint_js())
    script.setWorldId(QWebEngineScript.ScriptWorldId.ApplicationWorld)
    profile.scripts().insert(script)
    return profile
126
127
class Worker(QWebEnginePage):
    """A web page that lints one stylesheet at a time with CSSLint.

    The page is created from the shared profile, so the injected script runs
    when the (empty) document loads and sets the title to ``ready``. Work
    requested before that point is parked in ``pending`` and started from
    ``title_changed``.
    """

    # Emitted as (worker, result) when a check finishes.
    work_done = pyqtSignal(object, object)

    def __init__(self):
        must_use_qt()
        QWebEnginePage.__init__(self, create_profile(), QApplication.instance())
        self.titleChanged.connect(self.title_changed)
        secure_webengine(self.settings())
        self.console_messages = []
        self.ready = self.working = False
        self.pending = None
        # Loading an empty document triggers the injected script.
        self.setHtml('')

    def title_changed(self, new_title):
        """Mark the page ready and start any check queued before that."""
        if new_title != 'ready':
            return
        self.ready = True
        if self.pending is None:
            return
        self.check_css(self.pending)
        self.pending = None

    def javaScriptConsoleMessage(self, level, msg, lineno, source_id):
        """Record a JavaScript console message and echo it to stdout."""
        text = '{}:{}:{}'.format(source_id, lineno, msg)
        self.console_messages.append(text)
        try:
            print(text)
        except Exception:
            # Echoing is best effort; the message is already recorded.
            pass

    def check_css(self, src):
        """Start linting *src*; ``check_done`` receives the result."""
        self.working = True
        self.console_messages = []
        # json.dumps turns the CSS text into a JavaScript string literal.
        script = 'window.check_css({})'.format(json.dumps(src))
        self.runJavaScript(script, QWebEngineScript.ScriptWorldId.ApplicationWorld, self.check_done)

    def check_css_when_ready(self, src):
        """Lint *src* now if the page is ready, otherwise queue it."""
        if not self.ready:
            # Claim the worker immediately so it is not handed more work.
            self.working = True
            self.pending = src
            return
        self.check_css(src)

    def check_done(self, result):
        """Callback for runJavaScript: publish the result via work_done."""
        self.working = False
        self.work_done.emit(self, result)
174
175
class Pool:
    """Distributes stylesheets over a bounded set of Worker pages.

    ``check_css`` blocks, processing Qt events, until every stylesheet has a
    result. Workers are created on demand, up to ``max_workers``, and are
    kept for reuse until ``shutdown`` is called.
    """

    def __init__(self):
        self.workers = []
        self.max_workers = cpu_count()
        # State of the current check_css() run; defined here so the
        # attributes exist before the first run.
        self.pending = []
        self.results = []
        self.working = False

    def add_worker(self):
        """Create a worker, hook up its completion signal and return it."""
        w = Worker()
        w.work_done.connect(self.work_done)
        self.workers.append(w)
        return w

    def check_css(self, css_sources):
        """Lint every stylesheet in *css_sources* and return the results.

        :param css_sources: Iterable of CSS source strings.
        :return: A list parallel to *css_sources*. Each entry is the dict
            returned by the linter, or the worker's list of console messages
            when the linter did not return a dict.
        """
        css_sources = list(css_sources)
        self.pending = list(enumerate(css_sources))
        self.results = list(repeat(None, len(css_sources)))
        if not self.pending:
            # Nothing to do. Entering the wait loop below would block forever
            # because no worker would ever report completion.
            self.working = False
            return self.results
        self.working = True
        self.assign_work()
        app = QApplication.instance()
        flags = QEventLoop.ProcessEventsFlag.WaitForMoreEvents | QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents
        while self.working:
            app.processEvents(flags)
        return self.results

    def assign_work(self):
        """Hand pending stylesheets to idle workers, creating workers as needed."""
        while self.pending:
            # Reuse an idle worker before paying for a new web page.
            worker = next((w for w in self.workers if not w.working), None)
            if worker is None:
                if len(self.workers) >= self.max_workers:
                    # Everyone is busy; work_done() calls back in here later.
                    break
                worker = self.add_worker()
            idx, src = self.pending.pop()
            worker.result_idx = idx
            # Marks the worker as working, so the next iteration skips it.
            worker.check_css_when_ready(src)

    def work_done(self, worker, result):
        """Store a finished worker's result and schedule further work."""
        if not isinstance(result, dict):
            # No usable linter output; keep the console messages instead.
            result = worker.console_messages
        self.results[worker.result_idx] = result
        self.assign_work()
        if not self.pending and not any(w.working for w in self.workers):
            self.working = False

    def shutdown(self):
        """Delete all worker pages that have not already been deleted."""

        def safe_delete(x):
            if not sip.isdeleted(x):
                sip.delete(x)

        for i in self.workers:
            safe_delete(i)
        self.workers = []
227
228
# Module-wide pool used by check_css(); created when the module is imported.
pool = Pool()
# Module-level alias for deleting the pool's workers.
shutdown = pool.shutdown
# Also delete the workers when the interpreter exits.
atexit.register(shutdown)
# One unit of work: a name, the CSS text, and the offset added to the line
# numbers reported for that text.
Job = namedtuple('Job', 'name css line_offset')
233
234
def create_job(name, css, line_offset=0, is_declaration=False):
    """Build a Job for :func:`check_css`.

    :param name: Name stored in the job.
    :param css: The CSS text to check.
    :param line_offset: Offset stored in the job for its line numbers.
    :param is_declaration: When true, *css* is wrapped in a ``div{ ... }``
        rule, and the offset is reduced by one to account for the extra
        line added before the text.
    """
    if not is_declaration:
        return Job(name, css, line_offset)
    wrapped = 'div{\n' + css + '\n}'
    return Job(name, wrapped, line_offset - 1)
240
241
def check_css(jobs):
    """Run every job through the pool and return the resulting error objects.

    :param jobs: Sequence of Job tuples; an empty or false value yields an
        empty list without touching the pool.
    :return: A list of errors. A dict result contributes one error per linter
        message. Any other result contributes a single CSSParseError, which
        quotes the worker's console messages when there are any.
    """
    if not jobs:
        return []

    errors = []
    results = pool.check_css([job.css for job in jobs])
    for job, result in zip(jobs, results):
        if isinstance(result, dict):
            for message in result['messages']:
                error = message_to_error(message, job.name, job.line_offset)
                if error is not None:
                    errors.append(error)
            continue

        # The linter produced nothing usable for this job.
        if isinstance(result, list) and result:
            text = _('Failed to process CSS in {name} with errors: {errors}').format(
                name=job.name, errors='\n'.join(result))
        else:
            text = _('Failed to process CSS in {name}').format(name=job.name)
        errors.append(CSSParseError(text, job.name))
    return errors
259
260
def main():
    """Lint the CSS file named by the last command line argument.

    The file is read as UTF-8 and every resulting error is printed.
    """
    path = sys.argv[-1]
    with open(path, 'rb') as f:
        raw = f.read()
    job = create_job(path, raw.decode('utf-8'))
    for error in check_css([job]):
        prints(error)
267
268
# When run as a script, lint the file given on the command line and delete
# the pool's workers afterwards, even if linting raised.
if __name__ == '__main__':
    try:
        main()
    finally:
        shutdown()
274