1# Copyright (c) 2017-2021, Manfred Moitzi
2# License: MIT License
3from typing import (
4    TYPE_CHECKING, Iterable, List, Set, TextIO, Any, Dict, Optional, Callable,
5)
6import sys
7from enum import IntEnum
8from ezdxf.lldxf import const, validator
9from ezdxf.entities import factory, DXFEntity
10from ezdxf.math import NULLVEC
11from ezdxf.sections.table import table_key
12
13if TYPE_CHECKING:
14    from ezdxf.eztypes import (
15        DXFEntity, Drawing, DXFGraphic, BlocksSection, EntityDB,
16    )
17
18__all__ = ['Auditor', 'AuditError', 'audit', 'BlockCycleDetector']
19
20
21class AuditError(IntEnum):
22    # DXF structure errors:
23    MISSING_REQUIRED_ROOT_DICT_ENTRY = 1
24    DUPLICATE_TABLE_ENTRY_NAME = 2
25    POINTER_TARGET_NOT_EXIST = 3
26    TABLE_NOT_FOUND = 4
27    MISSING_SECTION_TAG = 5
28    MISSING_SECTION_NAME_TAG = 6
29    MISSING_ENDSEC_TAG = 7
30    FOUND_TAG_OUTSIDE_SECTION = 8
31    REMOVED_UNSUPPORTED_SECTION = 9
32    REMOVED_UNSUPPORTED_TABLE = 10
33
34    UNDEFINED_LINETYPE = 100
35    UNDEFINED_DIMENSION_STYLE = 101
36    UNDEFINED_TEXT_STYLE = 102
37    UNDEFINED_BLOCK = 103
38    INVALID_BLOCK_REFERENCE_CYCLE = 104
39    REMOVE_EMPTY_GROUP = 105
40    GROUP_ENTITIES_IN_DIFFERENT_LAYOUTS = 106
41    MISSING_REQUIRED_SEQEND = 107
42    ORPHANED_LAYOUT_ENTITY = 108
43    ORPHANED_PAPER_SPACE_BLOCK_RECORD_ENTITY = 109
44    INVALID_TABLE_HANDLE = 110
45    DECODING_ERROR = 111
46    CREATED_MISSING_OBJECT = 112
47    RESET_MLINE_STYLE = 113
48
49    # DXF entity property errors:
50    INVALID_ENTITY_HANDLE = 201
51    INVALID_OWNER_HANDLE = 202
52    INVALID_LAYER_NAME = 203
53    INVALID_COLOR_INDEX = 204
54    INVALID_LINEWEIGHT = 205
55    INVALID_MLINESTYLE_HANDLE = 206
56
57    # DXF entity geometry or content errors:
58    INVALID_EXTRUSION_VECTOR = 210
59    INVALID_MAJOR_AXIS = 211
60    INVALID_VERTEX_COUNT = 212
61    INVALID_DICTIONARY_ENTRY = 213
62    INVALID_CHARACTER = 214
63    INVALID_MLINE_VERTEX = 215
64    INVALID_MLINESTYLE_ELEMENT_COUNT = 216
65    INVALID_SPLINE_DEFINITION = 217
66    INVALID_SPLINE_CONTROL_POINT_COUNT = 218
67    INVALID_SPLINE_FIT_POINT_COUNT = 219
68    INVALID_SPLINE_KNOT_VALUE_COUNT = 220
69    INVALID_SPLINE_WEIGHT_COUNT = 221
70
71
72REQUIRED_ROOT_DICT_ENTRIES = ('ACAD_GROUP', 'ACAD_PLOTSTYLENAME')
73
74
75class ErrorEntry:
76    def __init__(self, code: int, message: str = '',
77                 dxf_entity: 'DXFEntity' = None, data: Any = None):
78        self.code: int = code  # error code AuditError()
79        self.entity: 'DXFEntity' = dxf_entity  # source entity of error
80        self.message: str = message  # error message
81        self.data: Any = data  # additional data as an arbitrary object
82
83
84class Auditor:
85    def __init__(self, doc: Optional['Drawing']):
86        self.doc: Optional['Drawing'] = doc
87        self._rootdict_handle = doc.rootdict.dxf.handle if doc else '0'
88        self.errors: List[ErrorEntry] = []
89        self.fixes: List[ErrorEntry] = []
90        self._trashcan: Optional['EntityDB.Trashcan'] = \
91            doc.entitydb.new_trashcan() if doc else None
92        self._post_audit_jobs = []
93
94    def reset(self) -> None:
95        self.errors = []
96        self.fixes = []
97        self.empty_trashcan()
98
99    def __len__(self) -> int:
100        """ Returns count of unfixed errors. """
101        return len(self.errors)
102
103    def __bool__(self) -> bool:
104        """ Returns ``True`` if any unfixed errors exist. """
105        return self.__len__() > 0
106
107    def __iter__(self) -> Iterable[ErrorEntry]:
108        """ Iterate over all unfixed errors. """
109        return iter(self.errors)
110
111    @property
112    def entitydb(self):
113        if self.doc:
114            return self.doc.entitydb
115        else:
116            return None
117
118    @property
119    def has_errors(self) -> bool:
120        return bool(self.errors)
121
122    @property
123    def has_fixes(self) -> bool:
124        return bool(self.fixes)
125
126    def print_error_report(self, errors: List[ErrorEntry] = None,
127                           stream: TextIO = None) -> None:
128        def entity_str(count, code, entity):
129            if entity is not None:
130                return f"{count:4d}. Issue [{code}] in {str(entity)}."
131            else:
132                return f"{count:4d}. Issue [{code}]."
133
134        if errors is None:
135            errors = self.errors
136        else:
137            errors = list(errors)
138
139        if stream is None:
140            stream = sys.stdout
141
142        if len(errors) == 0:
143            stream.write('No issues found.\n\n')
144        else:
145            stream.write(f'{len(errors)} issues found.\n\n')
146            for count, error in enumerate(errors):
147                stream.write(
148                    entity_str(count + 1, error.code, error.entity) + '\n')
149                stream.write('   ' + error.message + '\n\n')
150
151    def print_fixed_errors(self, stream: TextIO = None) -> None:
152        def entity_str(count, code, entity):
153            if entity is not None:
154                return f"{count:4d}. Issue [{code}] fixed in {str(entity)}."
155            else:
156                return f"{count:4d}. Issue [{code}] fixed."
157
158        if stream is None:
159            stream = sys.stdout
160
161        if len(self.fixes) == 0:
162            stream.write('No issues fixed.\n\n')
163        else:
164            stream.write(f'{len(self.fixes)} issues fixed.\n\n')
165            for count, error in enumerate(self.fixes):
166                stream.write(
167                    entity_str(count + 1, error.code, error.entity) + '\n')
168                stream.write('   ' + error.message + '\n\n')
169
170    def add_error(self, code: int, message: str = '',
171                  dxf_entity: 'DXFEntity' = None, data: Any = None) -> None:
172        self.errors.append(ErrorEntry(code, message, dxf_entity, data))
173
174    def fixed_error(self, code: int, message: str = '',
175                    dxf_entity: 'DXFEntity' = None, data: Any = None) -> None:
176        self.fixes.append(ErrorEntry(code, message, dxf_entity, data))
177
178    def purge(self, codes: Set[int]):
179        """ Remove error messages defined by integer error `codes`.
180
181        This is useful to remove errors which are not important for a specific
182        file usage.
183
184        """
185        self.errors = [err for err in self.errors if err.code in codes]
186
187    def run(self) -> List[ErrorEntry]:
188        # Check database integrity:
189        self.doc.entitydb.audit(self)
190        self.check_root_dict()
191        self.check_tables()
192        self.audit_all_database_entities()
193        self.doc.groups.audit(self)
194        self.check_block_reference_cycles()
195        self.doc.layouts.audit(self)
196        self.empty_trashcan()
197        return self.errors
198
199    def empty_trashcan(self):
200        if self.has_trashcan:
201            self._trashcan.clear()
202
203    def trash(self, entity: 'DXFEntity') -> None:
204        if entity is None or not entity.is_alive:
205            return
206        if self.has_trashcan:
207            self._trashcan.add(entity.dxf.handle)
208        else:
209            entity.destroy()
210
211    @property
212    def has_trashcan(self) -> bool:
213        return self._trashcan is not None
214
215    def add_post_audit_job(self, job: Callable):
216        self._post_audit_jobs.append(job)
217
218    def check_root_dict(self) -> None:
219        root_dict = self.doc.rootdict
220        for name in REQUIRED_ROOT_DICT_ENTRIES:
221            if name not in root_dict:
222                self.add_error(
223                    code=AuditError.MISSING_REQUIRED_ROOT_DICT_ENTRY,
224                    message=f'Missing root dict entry: {name}',
225                    dxf_entity=root_dict,
226                )
227
228    def check_tables(self) -> None:
229        def fix_table_head(table):
230            head = table.head
231            # Another exception for an invalid owner tag, but this usage is
232            # covered in Auditor.check_owner_exist():
233            head.dxf.owner = '0'
234            handle = head.dxf.handle
235            if handle is None or handle == '0':
236                # Entity database does not assign new handle:
237                head.dxf.handle = self.entitydb.next_handle()
238                self.entitydb.add(head)
239                self.fixed_error(
240                    code=AuditError.INVALID_TABLE_HANDLE,
241                    message=f'Fixed invalid table handle in {table.name}',
242                )
243            # Just to be sure owner handle is valid in every circumstance:
244            table.update_owner_handles()
245
246        table_section = self.doc.tables
247        fix_table_head(table_section.viewports)
248        fix_table_head(table_section.linetypes)
249        fix_table_head(table_section.layers)
250        fix_table_head(table_section.styles)
251        fix_table_head(table_section.views)
252        fix_table_head(table_section.ucs)
253        fix_table_head(table_section.appids)
254        fix_table_head(table_section.dimstyles)
255        fix_table_head(table_section.block_records)
256
257    def audit_all_database_entities(self) -> None:
258        """ Audit all entities stored in the entity database. """
259        # Destruction of entities can occur while auditing.
260        # Best practice to delete entities is to move them into the trashcan:
261        # Auditor.trash(entity)
262        db = self.doc.entitydb
263        db.locked = True
264        # To create new entities while auditing, add a post audit job by calling
265        # Auditor.app_post_audit_job() with a callable object or
266        # function as argument.
267        self._post_audit_jobs = []
268        for entity in db.values():
269            if entity.is_alive:
270                entity.audit(self)
271        db.locked = False
272        self.empty_trashcan()
273        self.exec_post_audit_jobs()
274
275    def exec_post_audit_jobs(self):
276        for call in self._post_audit_jobs:
277            call()
278        self._post_audit_jobs = []
279
280    def check_entity_linetype(self, entity: 'DXFEntity') -> None:
281        """ Check for usage of undefined line types. AutoCAD does not load
282        DXF files with undefined line types.
283        """
284        assert self.doc is entity.doc, 'Entity from different DXF document.'
285        if not entity.dxf.hasattr('linetype'):
286            return
287        linetype = table_key(entity.dxf.linetype)
288        # No table entry in linetypes required:
289        if linetype in ('bylayer', 'byblock'):
290            return
291
292        if linetype not in self.doc.linetypes:
293            # Defaults to 'BYLAYER'
294            entity.dxf.discard('linetype')
295            self.fixed_error(
296                code=AuditError.UNDEFINED_LINETYPE,
297                message=f'Removed undefined linetype {linetype} in {str(entity)}',
298                dxf_entity=entity,
299                data=linetype,
300            )
301
302    def check_text_style(self, entity: 'DXFEntity') -> None:
303        """ Check for usage of undefined text styles. """
304        assert self.doc is entity.doc, 'Entity from different DXF document.'
305        if not entity.dxf.hasattr('style'):
306            return
307        style = entity.dxf.style
308        if style not in self.doc.styles:
309            # Defaults to 'Standard'
310            entity.dxf.discard('style')
311            self.fixed_error(
312                code=AuditError.UNDEFINED_TEXT_STYLE,
313                message=f'Removed undefined text style "{style}" from {str(entity)}.',
314                dxf_entity=entity,
315                data=style,
316            )
317
318    def check_dimension_style(self, entity: 'DXFGraphic') -> None:
319        """  Check for usage of undefined dimension styles. """
320        assert self.doc is entity.doc, 'Entity from different DXF document.'
321        if not entity.dxf.hasattr('dimstyle'):
322            return
323        dimstyle = entity.dxf.dimstyle
324        if dimstyle not in self.doc.dimstyles:
325            # The dimstyle attribute is not optional:
326            entity.dxf.dimstyle = 'Standard'
327            self.fixed_error(
328                code=AuditError.UNDEFINED_DIMENSION_STYLE,
329                message=f'Replaced undefined dimstyle "{dimstyle}" in '
330                        f'{str(entity)} by "Standard".',
331                dxf_entity=entity,
332                data=dimstyle,
333            )
334
335    def check_for_valid_layer_name(self, entity: 'DXFEntity') -> None:
336        """ Check layer names for invalid characters: <>/\":;?*|=' """
337        name = entity.dxf.layer
338        if not validator.is_valid_layer_name(name):
339            # This error can't be fixed !?
340            self.add_error(
341                code=AuditError.INVALID_LAYER_NAME,
342                message=f'Invalid layer name "{name}" in {str(entity)}',
343                dxf_entity=entity,
344                data=name,
345            )
346
347    def check_entity_color_index(self, entity: 'DXFGraphic') -> None:
348        color = entity.dxf.color
349        # 0 == BYBLOCK
350        # 256 == BYLAYER
351        # 257 == BYOBJECT
352        if color < 0 or color > 257:
353            entity.dxf.discard('color')
354            self.fixed_error(
355                code=AuditError.INVALID_COLOR_INDEX,
356                message=f'Removed invalid color index of {str(entity)}.',
357                dxf_entity=entity,
358                data=color,
359            )
360
361    def check_entity_lineweight(self, entity: 'DXFGraphic') -> None:
362        weight = entity.dxf.lineweight
363        if weight not in const.VALID_DXF_LINEWEIGHT_VALUES:
364            entity.dxf.lineweight = validator.fix_lineweight(weight)
365            self.fixed_error(
366                code=AuditError.INVALID_LINEWEIGHT,
367                message=f'Fixed invalid lineweight of {str(entity)}.',
368                dxf_entity=entity,
369            )
370
371    def check_owner_exist(self, entity: 'DXFEntity') -> None:
372        assert self.doc is entity.doc, 'Entity from different DXF document.'
373        if not entity.dxf.hasattr('owner'):
374            return
375        doc = self.doc
376        owner_handle = entity.dxf.owner
377        handle = entity.dxf.get('handle', '0')
378        if owner_handle == '0':
379            # Root-Dictionary or Table-Head:
380            if handle == self._rootdict_handle or entity.dxftype() == 'TABLE':
381                return  # '0' handle as owner is valid
382        if owner_handle not in doc.entitydb:
383            if handle == self._rootdict_handle:
384                entity.dxf.owner = '0'
385                self.fixed_error(
386                    code=AuditError.INVALID_OWNER_HANDLE,
387                    message=f'Fixed invalid owner handle in root {str(self)}.',
388                )
389            elif entity.dxftype() == 'TABLE':
390                name = entity.dxf.get('name', 'UNKNOWN')
391                entity.dxf.owner = '0'
392                self.fixed_error(
393                    code=AuditError.INVALID_OWNER_HANDLE,
394                    message=f'Fixed invalid owner handle for {name} table.',
395                )
396            else:
397                self.fixed_error(
398                    code=AuditError.INVALID_OWNER_HANDLE,
399                    message=f'Deleted {str(entity)} entity with invalid owner '
400                            f'handle #{owner_handle}.',
401                )
402                self.trash(doc.entitydb.get(handle))
403
404    def check_extrusion_vector(self, entity: 'DXFEntity') -> None:
405        if NULLVEC.isclose(entity.dxf.extrusion):
406            entity.dxf.discard('extrusion')
407            self.fixed_error(
408                code=AuditError.INVALID_EXTRUSION_VECTOR,
409                message=f'Fixed extrusion vector for entity: {str(self)}.',
410                dxf_entity=entity,
411            )
412
413    def check_block_reference_cycles(self) -> None:
414        cycle_detector = BlockCycleDetector(self.doc)
415        for block in self.doc.blocks:
416            if cycle_detector.has_cycle(block.name):
417                self.add_error(
418                    code=AuditError.INVALID_BLOCK_REFERENCE_CYCLE,
419                    message=f'Invalid block reference cycle detected in '
420                            f'block "{block.name}".',
421                    dxf_entity=block.block_record,
422                )
423
424
425class BlockCycleDetector:
426    def __init__(self, doc: 'Drawing'):
427        self.key = doc.blocks.key
428        self.blocks = self._build_block_ledger(doc.blocks)
429
430    def _build_block_ledger(
431            self, blocks: 'BlocksSection') -> Dict[str, Set[str]]:
432        ledger = dict()
433        for block in blocks:
434            inserts = {self.key(insert.dxf.name) for insert in
435                       block.query('INSERT')}
436            ledger[self.key(block.name)] = inserts
437        return ledger
438
439    def has_cycle(self, block_name: str) -> bool:
440        def check(name):
441            # block 'name' does not exist: ignore this error, because it is not
442            # the task of this method to detect not existing block definitions
443            try:
444                inserts = self.blocks[name]
445            except KeyError:
446                return False  # Not existing blocks can't create cycles.
447            path.append(name)
448            for n in inserts:
449                if n in path:
450                    return True
451                elif check(n):
452                    return True
453            path.pop()
454            return False
455
456        path = []
457        block_name = self.key(block_name)
458        return check(block_name)
459
460
461def audit(entity: 'DXFEntity', doc: 'Drawing') -> Auditor:
462    """ Setup an :class:`Auditor` object, run the audit process for `entity`
463    and return result as :class:`Auditor` object.
464
465    Args:
466        entity: DXF entity to validate
467        doc: bounded DXF document of `entity`
468
469    """
470    if not entity.is_alive:
471        raise TypeError('Entity is destroyed.')
472
473    # Validation of unbound entities is possible, but it is not useful
474    # to validate entities against a different DXF document:
475    if entity.dxf.handle is not None and not factory.is_bound(entity, doc):
476        raise ValueError('Entity is bound to different DXF document.')
477
478    auditor = Auditor(doc)
479    entity.audit(auditor)
480    return auditor
481