1# -*- coding: utf-8 -*-
2
3# Copyright (c) 2011 - 2021 Detlev Offenbach <detlev@die-offenbachs.de>
4#
5
6"""
7Module implementing the code style checker.
8"""
9
10import queue
11import ast
12import sys
13import multiprocessing
14import contextlib
15
16import pycodestyle
17from Naming.NamingStyleChecker import NamingStyleChecker
18
19# register the name checker
20pycodestyle.register_check(NamingStyleChecker, NamingStyleChecker.Codes)
21
22from DocStyle.DocStyleChecker import DocStyleChecker
23from Miscellaneous.MiscellaneousChecker import MiscellaneousChecker
24from Complexity.ComplexityChecker import ComplexityChecker
25from Security.SecurityChecker import SecurityChecker
26from PathLib.PathlibChecker import PathlibChecker
27from Simplify.SimplifyChecker import SimplifyChecker
28
29
30def initService():
31    """
32    Initialize the service and return the entry point.
33
34    @return the entry point for the background client (function)
35    """
36    return codeStyleCheck
37
38
39def initBatchService():
40    """
41    Initialize the batch service and return the entry point.
42
43    @return the entry point for the background client (function)
44    """
45    return codeStyleBatchCheck
46
47
48class CodeStyleCheckerReport(pycodestyle.BaseReport):
49    """
50    Class implementing a special report to be used with our dialog.
51    """
52    def __init__(self, options):
53        """
54        Constructor
55
56        @param options options for the report (optparse.Values)
57        """
58        super().__init__(options)
59
60        self.__repeat = options.repeat
61        self.errors = []
62
63    def error_args(self, line_number, offset, code, check, *args):
64        """
65        Public method to collect the error messages.
66
67        @param line_number line number of the issue (integer)
68        @param offset position within line of the issue (integer)
69        @param code message code (string)
70        @param check reference to the checker function (function)
71        @param args arguments for the message (list)
72        @return error code (string)
73        """
74        code = super().error_args(
75            line_number, offset, code, check, *args)
76        if code and (self.counters[code] == 1 or self.__repeat):
77            self.errors.append(
78                {
79                    "file": self.filename,
80                    "line": line_number,
81                    "offset": offset,
82                    "code": code,
83                    "args": args,
84                }
85            )
86        return code
87
88
89def extractLineFlags(line, startComment="#", endComment="", flagsLine=False):
90    """
91    Function to extract flags starting and ending with '__' from a line
92    comment.
93
94    @param line line to extract flags from (string)
95    @param startComment string identifying the start of the comment (string)
96    @param endComment string identifying the end of a comment (string)
97    @param flagsLine flag indicating to check for a flags only line (bool)
98    @return list containing the extracted flags (list of strings)
99    """
100    flags = []
101
102    if not flagsLine or (
103       flagsLine and line.strip().startswith(startComment)):
104        pos = line.rfind(startComment)
105        if pos >= 0:
106            comment = line[pos + len(startComment):].strip()
107            if endComment:
108                endPos = line.rfind(endComment)
109                if endPos >= 0:
110                    comment = comment[:endPos]
111            flags = [f.strip() for f in comment.split()
112                     if (f.startswith("__") and f.endswith("__"))]
113            flags += [f.strip().lower() for f in comment.split()
114                      if f in ("noqa", "NOQA",
115                               "nosec", "NOSEC",
116                               "secok", "SECOK")]
117    return flags
118
119
120def ignoreCode(code, lineFlags):
121    """
122    Function to check, if the given code should be ignored as per line flags.
123
124    @param code error code to be checked
125    @type str
126    @param lineFlags list of line flags to check against
127    @type list of str
128    @return flag indicating to ignore the code
129    @rtype bool
130    """
131    if lineFlags:
132
133        if (
134            "__IGNORE_WARNING__" in lineFlags or
135            "noqa" in lineFlags or
136            "nosec" in lineFlags
137        ):
138            # ignore all warning codes
139            return True
140
141        for flag in lineFlags:
142            # check individual warning code
143            if flag.startswith("__IGNORE_WARNING_"):
144                ignoredCode = flag[2:-2].rsplit("_", 1)[-1]
145                if code.startswith(ignoredCode):
146                    return True
147
148    return False
149
150
151def securityOk(code, lineFlags):
152    """
153    Function to check, if the given code is an acknowledged security report.
154
155    @param code error code to be checked
156    @type str
157    @param lineFlags list of line flags to check against
158    @type list of str
159    @return flag indicating an acknowledged security report
160    @rtype bool
161    """
162    if lineFlags:
163        return "secok" in lineFlags
164
165    return False
166
167
168def codeStyleCheck(filename, source, args):
169    """
170    Do the code style check and/or fix found errors.
171
172    @param filename source filename
173    @type str
174    @param source string containing the code to check
175    @type str
176    @param args arguments used by the codeStyleCheck function (list of
177        excludeMessages, includeMessages, repeatMessages, fixCodes,
178        noFixCodes, fixIssues, maxLineLength, maxDocLineLength, blankLines,
179        hangClosing, docType, codeComplexityArgs, miscellaneousArgs, errors,
180        eol, encoding, backup)
181    @type list of (str, str, bool, str, str, bool, int, list of (int, int),
182        bool, str, dict, dict, list of str, str, str, bool)
183    @return tuple of statistics (dict) and list of results (tuple for each
184        found violation of style (lineno, position, text, ignored, fixed,
185        autofixing, fixedMsg))
186    @rtype tuple of (dict, list of tuples of (int, int, str, bool, bool, bool,
187        str))
188    """
189    return __checkCodeStyle(filename, source, args)
190
191
192def codeStyleBatchCheck(argumentsList, send, fx, cancelled, maxProcesses=0):
193    """
194    Module function to check code style for a batch of files.
195
196    @param argumentsList list of arguments tuples as given for codeStyleCheck
197    @type list
198    @param send reference to send function
199    @type func
200    @param fx registered service name
201    @type str
202    @param cancelled reference to function checking for a cancellation
203    @type func
204    @param maxProcesses number of processes to be used
205    @type int
206    """
207    if maxProcesses == 0:
208        # determine based on CPU count
209        try:
210            NumberOfProcesses = multiprocessing.cpu_count()
211            if NumberOfProcesses >= 1:
212                NumberOfProcesses -= 1
213        except NotImplementedError:
214            NumberOfProcesses = 1
215    else:
216        NumberOfProcesses = maxProcesses
217
218    # Create queues
219    taskQueue = multiprocessing.Queue()
220    doneQueue = multiprocessing.Queue()
221
222    # Submit tasks (initially two time number of processes
223    initialTasks = 2 * NumberOfProcesses
224    for task in argumentsList[:initialTasks]:
225        taskQueue.put(task)
226
227    # Start worker processes
228    for _ in range(NumberOfProcesses):
229        multiprocessing.Process(
230            target=worker, args=(taskQueue, doneQueue)
231        ).start()
232
233    # Get and send results
234    endIndex = len(argumentsList) - initialTasks
235    for i in range(len(argumentsList)):
236        resultSent = False
237        wasCancelled = False
238
239        while not resultSent:
240            try:
241                # get result (waiting max. 3 seconds and send it to frontend
242                filename, result = doneQueue.get(timeout=3)
243                send(fx, filename, result)
244                resultSent = True
245            except queue.Empty:
246                # ignore empty queue, just carry on
247                if cancelled():
248                    wasCancelled = True
249                    break
250
251        if wasCancelled or cancelled():
252            # just exit the loop ignoring the results of queued tasks
253            break
254
255        if i < endIndex:
256            taskQueue.put(argumentsList[i + initialTasks])
257
258    # Tell child processes to stop
259    for _ in range(NumberOfProcesses):
260        taskQueue.put('STOP')
261
262
263def worker(inputQueue, outputQueue):
264    """
265    Module function acting as the parallel worker for the style check.
266
267    @param inputQueue input queue (multiprocessing.Queue)
268    @param outputQueue output queue (multiprocessing.Queue)
269    """
270    for filename, source, args in iter(inputQueue.get, 'STOP'):
271        result = __checkCodeStyle(filename, source, args)
272        outputQueue.put((filename, result))
273
274
275def __checkSyntax(filename, source):
276    """
277    Private module function to perform a syntax check.
278
279    @param filename source filename
280    @type str
281    @param source string containing the code to check
282    @type str
283    @return tuple containing the error dictionary with syntax error details,
284        a statistics dictionary and None or a tuple containing two None and
285        the generated AST tree
286    @rtype tuple of (dict, dict, None) or tuple of (None, None, ast.Module)
287    """
288    src = "".join(source)
289
290    try:
291        tree = (
292            ast.parse(src, filename, 'exec', type_comments=True)
293            # need the 'type_comments' parameter to include type annotations
294            if sys.version_info >= (3, 8) else
295            ast.parse(src, filename, 'exec')
296        )
297        return None, None, tree
298    except (SyntaxError, TypeError):
299        exc_type, exc = sys.exc_info()[:2]
300        if len(exc.args) > 1:
301            offset = exc.args[1]
302            if len(offset) > 2:
303                offset = offset[1:3]
304        else:
305            offset = (1, 0)
306        return (
307            {
308                "file": filename,
309                "line": offset[0],
310                "offset": offset[1],
311                "code": "E901",
312                "args": [exc_type.__name__, exc.args[0]],
313            }, {
314                "E901": 1,
315            },
316            None
317        )
318
319
320def __checkCodeStyle(filename, source, args):
321    """
322    Private module function to perform the code style check and/or fix
323    found errors.
324
325    @param filename source filename
326    @type str
327    @param source string containing the code to check
328    @type str
329    @param args arguments used by the codeStyleCheck function (list of
330        excludeMessages, includeMessages, repeatMessages, fixCodes,
331        noFixCodes, fixIssues, maxLineLength, maxDocLineLength, blankLines,
332        hangClosing, docType, codeComplexityArgs, miscellaneousArgs,
333        annotationArgs, securityArgs, errors, eol, encoding, backup)
334    @type list of (str, str, bool, str, str, bool, int, list of (int, int),
335        bool, str, dict, dict, dict, list of str, str, str, bool)
336    @return tuple of statistics data and list of result dictionaries with
337        keys:
338        <ul>
339        <li>file: file name</li>
340        <li>line: line_number</li>
341        <li>offset: offset within line</li>
342        <li>code: message code</li>
343        <li>args: list of arguments to format the message</li>
344        <li>ignored: flag indicating this issue was ignored</li>
345        <li>fixed: flag indicating this issue was fixed</li>
346        <li>autofixing: flag indicating that a fix can be done</li>
347        <li>fixcode: message code for the fix</li>
348        <li>fixargs: list of arguments to format the fix message</li>
349        </ul>
350    @rtype tuple of (dict, list of dict)
351    """
352    (excludeMessages, includeMessages, repeatMessages, fixCodes, noFixCodes,
353     fixIssues, maxLineLength, maxDocLineLength, blankLines, hangClosing,
354     docType, codeComplexityArgs, miscellaneousArgs, annotationArgs,
355     securityArgs, errors, eol, encoding, backup) = args
356
357    stats = {}
358
359    if fixIssues:
360        from CodeStyleFixer import CodeStyleFixer
361        fixer = CodeStyleFixer(
362            filename, source, fixCodes, noFixCodes,
363            maxLineLength, blankLines, True, eol, backup)
364        # always fix in place
365    else:
366        fixer = None
367
368    if not errors:
369        if includeMessages:
370            select = [s.strip() for s in
371                      includeMessages.split(',') if s.strip()]
372        else:
373            select = []
374        if excludeMessages:
375            ignore = [i.strip() for i in
376                      excludeMessages.split(',') if i.strip()]
377        else:
378            ignore = []
379
380        syntaxError, syntaxStats, tree = __checkSyntax(filename, source)
381
382        # perform the checks only, if syntax is ok and AST tree was generated
383        if tree:
384            # check coding style
385            pycodestyle.BLANK_LINES_CONFIG = {
386                # Top level class and function.
387                'top_level': blankLines[0],
388                # Methods and nested class and function.
389                'method': blankLines[1],
390            }
391            styleGuide = pycodestyle.StyleGuide(
392                reporter=CodeStyleCheckerReport,
393                repeat=repeatMessages,
394                select=select,
395                ignore=ignore,
396                max_line_length=maxLineLength,
397                max_doc_length=maxDocLineLength,
398                hang_closing=hangClosing,
399            )
400            report = styleGuide.check_files([filename])
401            stats.update(report.counters)
402            errors = report.errors
403
404            # check documentation style
405            docStyleChecker = DocStyleChecker(
406                source, filename, select, ignore, [], repeatMessages,
407                maxLineLength=maxDocLineLength, docType=docType)
408            docStyleChecker.run()
409            stats.update(docStyleChecker.counters)
410            errors += docStyleChecker.errors
411
412            # miscellaneous additional checks
413            miscellaneousChecker = MiscellaneousChecker(
414                source, filename, tree, select, ignore, [], repeatMessages,
415                miscellaneousArgs)
416            miscellaneousChecker.run()
417            stats.update(miscellaneousChecker.counters)
418            errors += miscellaneousChecker.errors
419
420            # check code complexity
421            complexityChecker = ComplexityChecker(
422                source, filename, tree, select, ignore, codeComplexityArgs)
423            complexityChecker.run()
424            stats.update(complexityChecker.counters)
425            errors += complexityChecker.errors
426
427            # check function annotations
428            if sys.version_info >= (3, 8, 0):
429                # annotations with type comments are supported from
430                # Python 3.8 on
431                from Annotations.AnnotationsChecker import AnnotationsChecker
432                annotationsChecker = AnnotationsChecker(
433                    source, filename, tree, select, ignore, [], repeatMessages,
434                    annotationArgs)
435                annotationsChecker.run()
436                stats.update(annotationsChecker.counters)
437                errors += annotationsChecker.errors
438
439            # check for security issues
440            securityChecker = SecurityChecker(
441                source, filename, tree, select, ignore, [], repeatMessages,
442                securityArgs)
443            securityChecker.run()
444            stats.update(securityChecker.counters)
445            errors += securityChecker.errors
446
447            # check for pathlib usage
448            pathlibChecker = PathlibChecker(
449                source, filename, tree, select, ignore, [], repeatMessages)
450            pathlibChecker.run()
451            stats.update(pathlibChecker.counters)
452            errors += pathlibChecker.errors
453
454            # check for code simplifications
455            simplifyChecker = SimplifyChecker(
456                source, filename, tree, select, ignore, [], repeatMessages)
457            simplifyChecker.run()
458            stats.update(simplifyChecker.counters)
459            errors += simplifyChecker.errors
460
461        elif syntaxError:
462            errors = [syntaxError]
463            stats.update(syntaxStats)
464
465    errorsDict = {}
466    for error in errors:
467        if error["line"] > len(source):
468            error["line"] = len(source)
469        # inverse processing of messages and fixes
470        errorLine = errorsDict.setdefault(error["line"], [])
471        errorLine.append((error["offset"], error))
472    deferredFixes = {}
473    results = []
474    for lineno, errorsList in errorsDict.items():
475        errorsList.sort(key=lambda x: x[0], reverse=True)
476        for _, error in errorsList:
477            error.update({
478                "ignored": False,
479                "fixed": False,
480                "autofixing": False,
481                "fixcode": "",
482                "fixargs": [],
483                "securityOk": False,
484            })
485
486            if source:
487                code = error["code"]
488                lineFlags = extractLineFlags(source[lineno - 1].strip())
489                with contextlib.suppress(IndexError):
490                    lineFlags += extractLineFlags(source[lineno].strip(),
491                                                  flagsLine=True)
492
493                if securityOk(code, lineFlags):
494                    error["securityOk"] = True
495
496                if ignoreCode(code, lineFlags):
497                    error["ignored"] = True
498                else:
499                    if fixer:
500                        res, fixcode, fixargs, id_ = fixer.fixIssue(
501                            lineno, error["offset"], code)
502                        if res == -1:
503                            deferredFixes[id_] = error
504                        else:
505                            error.update({
506                                "fixed": res == 1,
507                                "autofixing": True,
508                                "fixcode": fixcode,
509                                "fixargs": fixargs,
510                            })
511
512            results.append(error)
513
514    if fixer:
515        deferredResults = fixer.finalize()
516        for id_ in deferredResults:
517            fixed, fixcode, fixargs = deferredResults[id_]
518            error = deferredFixes[id_]
519            error.update({
520                "ignored": False,
521                "fixed": fixed == 1,
522                "autofixing": True,
523                "fixcode": fixcode,
524                "fixargs": fixargs,
525            })
526
527        saveError = fixer.saveFile(encoding)
528        if saveError:
529            for error in results:
530                error.update({
531                    "fixcode": saveError[0],
532                    "fixargs": saveError[1],
533                })
534
535    return stats, results
536