1# Copyright (c) 2020, Manfred Moitzi 2# License: MIT License 3from typing import ( 4 Iterable, cast, BinaryIO, Tuple, Dict, Optional, List, Set, Union, 5) 6from io import StringIO 7from pathlib import Path 8from ezdxf.lldxf.const import DXFStructureError 9from ezdxf.lldxf.extendedtags import ExtendedTags, DXFTag 10from ezdxf.lldxf.tagwriter import TagWriter 11from ezdxf.lldxf.tagger import tag_compiler, ascii_tags_loader 12from ezdxf.filemanagement import dxf_file_info 13from ezdxf.lldxf import fileindex 14 15from ezdxf.entities import DXFGraphic, DXFEntity 16from ezdxf.entities import factory 17from ezdxf.entities.subentity import entity_linker 18from ezdxf.tools.codepage import toencoding 19 20__all__ = ['opendxf', 'single_pass_modelspace', 'modelspace'] 21 22SUPPORTED_TYPES = { 23 'ARC', 'LINE', 'CIRCLE', 'ELLIPSE', 'POINT', 'LWPOLYLINE', 'SPLINE', 24 '3DFACE', 'SOLID', 'TRACE', 'SHAPE', 'POLYLINE', 'VERTEX', 'SEQEND', 'MESH', 25 'TEXT', 'MTEXT', 'HATCH', 'INSERT', 'ATTRIB', 'ATTDEF', 'RAY', 'XLINE', 26 'DIMENSION', 'LEADER', 'IMAGE', 'WIPEOUT', 'HELIX', 'MLINE', 'MLEADER', 27} 28 29Filename = Union[Path, str] 30 31 32class IterDXF: 33 """ Iterator for DXF entities stored in the modelspace. 34 35 Args: 36 name: filename, has to be a seekable file. 37 errors: specify decoding error handler 38 39 - "surrogateescape" to preserve possible binary data (default) 40 - "ignore" to use the replacement char U+FFFD "\ufffd" for invalid data 41 - "strict" to raise an :class:`UnicodeDecodeError`exception for invalid data 42 43 Raises: 44 DXFStructureError: invalid or incomplete DXF file 45 UnicodeDecodeError: if `errors` is "strict" and a decoding error occurs 46 47 """ 48 49 def __init__(self, name: Filename, errors: str = 'surrogateescape'): 50 self.structure, self.sections = self._load_index(name) 51 self.errors = errors 52 self.file: BinaryIO = open(name, mode='rb') 53 if 'ENTITIES' not in self.sections: 54 raise DXFStructureError('ENTITIES section not found.') 55 if self.structure.version > 'AC1009' and 'OBJECTS' not in self.sections: 56 raise DXFStructureError('OBJECTS section not found.') 57 58 def _load_index(self, name: str) -> Tuple[ 59 fileindex.FileStructure, Dict[str, int]]: 60 structure = fileindex.load(name) 61 sections: Dict[str, int] = dict() 62 new_index = [] 63 for e in structure.index: 64 if e.code == 0: 65 new_index.append(e) 66 elif e.code == 2: 67 sections[e.value] = len(new_index) - 1 68 # remove all other tags like handles (code == 5) 69 structure.index = new_index 70 return structure, sections 71 72 @property 73 def encoding(self): 74 return self.structure.encoding 75 76 @property 77 def dxfversion(self): 78 return self.structure.version 79 80 def export(self, name: Filename) -> 'IterDXFWriter': 81 """ Returns a companion object to export parts from the source DXF file 82 into another DXF file, the new file will have the same HEADER, CLASSES, 83 TABLES, BLOCKS and OBJECTS sections, which guarantees all necessary 84 dependencies are present in the new file. 85 86 Args: 87 name: filename, no special requirements 88 89 """ 90 doc = IterDXFWriter(name, self) 91 # Copy everything from start of source DXF until the first entity 92 # of the ENTITIES section to the new DXF. 93 location = self.structure.index[self.sections['ENTITIES'] + 1].location 94 self.file.seek(0) 95 data = self.file.read(location) 96 doc.write_data(data) 97 return doc 98 99 def copy_objects_section(self, f: BinaryIO) -> None: 100 start_index = self.sections['OBJECTS'] 101 try: 102 end_index = self.structure.get(0, 'ENDSEC', start_index) 103 except ValueError: 104 raise DXFStructureError(f'ENDSEC of OBJECTS section not found.') 105 106 start_location = self.structure.index[start_index].location 107 end_location = self.structure.index[end_index + 1].location 108 count = end_location - start_location 109 self.file.seek(start_location) 110 data = self.file.read(count) 111 f.write(data) 112 113 def modelspace(self, types: Iterable[str] = None) -> Iterable[DXFGraphic]: 114 """ Returns an iterator for all supported DXF entities in the 115 modelspace. These entities are regular :class:`~ezdxf.entities.DXFGraphic` 116 objects but without a valid document assigned. It is **not** 117 possible to add these entities to other `ezdxf` documents. 118 119 It is only possible to recreate the objects by factory functions base 120 on attributes of the source entity. 121 For MESH, POLYMESH and POLYFACE it is possible to use the 122 :class:`~ezdxf.render.MeshTransformer` class to render (recreate) this 123 objects as new entities in another document. 124 125 Args: 126 types: DXF types like ``['LINE', '3DFACE']`` which should be 127 returned, ``None`` returns all supported types. 128 129 """ 130 linked_entity = entity_linker() 131 queued = None 132 requested_types = _requested_types(types) 133 for entity in self.load_entities(self.sections['ENTITIES'] + 1, 134 requested_types): 135 if not linked_entity(entity) and entity.dxf.paperspace == 0: 136 # queue one entity for collecting linked entities: 137 # VERTEX, ATTRIB 138 if queued: 139 yield queued 140 queued = entity 141 if queued: 142 yield queued 143 144 def load_entities(self, start: int, 145 requested_types: Iterable[str] = None) -> Iterable[ 146 DXFGraphic]: 147 def to_str(data: bytes) -> str: 148 return data.decode( 149 self.encoding, errors=self.errors).replace('\r\n', '\n') 150 151 index = start 152 entry = self.structure.index[index] 153 self.file.seek(entry.location) 154 while entry.value != 'ENDSEC': 155 index += 1 156 next_entry = self.structure.index[index] 157 size = next_entry.location - entry.location 158 data = self.file.read(size) 159 if entry.value in requested_types: 160 xtags = ExtendedTags.from_text(to_str(data)) 161 yield factory.load(xtags) 162 entry = next_entry 163 164 def close(self): 165 """ Safe closing source DXF file. """ 166 self.file.close() 167 168 169class IterDXFWriter: 170 def __init__(self, name: Filename, loader: IterDXF): 171 self.name = str(name) 172 self.file: BinaryIO = open(name, mode='wb') 173 self.text = StringIO() 174 self.entity_writer = TagWriter(self.text, loader.dxfversion) 175 self.loader = loader 176 177 def write_data(self, data: bytes): 178 self.file.write(data) 179 180 def write(self, entity: DXFGraphic): 181 """ Write a DXF entity from the source DXF file to the export file. 182 183 Don't write entities from different documents than the source DXF file, 184 dependencies and resources will not match, maybe it will work once, but 185 not in a reliable way for different DXF documents. 186 187 """ 188 # Not necessary to remove this dependencies by copying 189 # them into the same document frame 190 # --------------------------------- 191 # remove all possible dependencies 192 # entity.xdata = None 193 # entity.appdata = None 194 # entity.extension_dict = None 195 # entity.reactors = None 196 # reset text stream 197 self.text.seek(0) 198 self.text.truncate() 199 200 if entity.dxf.handle is None: # DXF R12 without handles 201 self.entity_writer.write_handles = False 202 203 entity.export_dxf(self.entity_writer) 204 if entity.dxftype() == 'POLYLINE': 205 polyline = cast('Polyline', entity) 206 for vertex in polyline.vertices: 207 vertex.export_dxf(self.entity_writer) 208 polyline.seqend.export_dxf(self.entity_writer) 209 elif entity.dxftype() == 'INSERT': 210 insert = cast('Insert', entity) 211 if insert.attribs_follow: 212 for attrib in insert.attribs: 213 attrib.export_dxf(self.entity_writer) 214 insert.seqend.export_dxf(self.entity_writer) 215 data = self.text.getvalue().encode(self.loader.encoding) 216 self.file.write(data) 217 218 def close(self): 219 """ Safe closing of exported DXF file. Copying of OBJECTS section 220 happens only at closing the file, without closing the new DXF file is 221 invalid. 222 """ 223 self.file.write(b' 0\r\nENDSEC\r\n') # for ENTITIES section 224 if self.loader.dxfversion > 'AC1009': 225 self.loader.copy_objects_section(self.file) 226 self.file.write(b' 0\r\nEOF\r\n') 227 self.file.close() 228 229 230def opendxf(filename: Filename, errors: str = 'surrogateescape') -> IterDXF: 231 """ Open DXF file for iterating, be sure to open valid DXF files, no DXF 232 structure checks will be applied. 233 234 Use this function to split up big DXF files as shown in the example above. 235 236 Args: 237 filename: DXF filename of a seekable DXF file. 238 errors: specify decoding error handler 239 240 - "surrogateescape" to preserve possible binary data (default) 241 - "ignore" to use the replacement char U+FFFD "\ufffd" for invalid data 242 - "strict" to raise an :class:`UnicodeDecodeError` exception for invalid data 243 244 Raises: 245 DXFStructureError: invalid or incomplete DXF file 246 UnicodeDecodeError: if `errors` is "strict" and a decoding error occurs 247 248 """ 249 return IterDXF(filename, errors=errors) 250 251 252def modelspace(filename: Filename, 253 types: Iterable[str] = None, 254 errors: str = 'surrogateescape') -> Iterable[DXFGraphic]: 255 """ Iterate over all modelspace entities as :class:`DXFGraphic` objects of 256 a seekable file. 257 258 Use this function to iterate "quick" over modelspace entities of a DXF file, 259 filtering DXF types may speed up things if many entity types will be skipped. 260 261 Args: 262 filename: filename of a seekable DXF file 263 types: DXF types like ``['LINE', '3DFACE']`` which should be returned, 264 ``None`` returns all supported types. 265 errors: specify decoding error handler 266 267 - "surrogateescape" to preserve possible binary data (default) 268 - "ignore" to use the replacement char U+FFFD "\ufffd" for invalid data 269 - "strict" to raise an :class:`UnicodeDecodeError` exception for invalid data 270 271 Raises: 272 DXFStructureError: invalid or incomplete DXF file 273 UnicodeDecodeError: if `errors` is "strict" and a decoding error occurs 274 275 """ 276 info = dxf_file_info(filename) 277 prev_code: int = -1 278 prev_value: str = '' 279 entities = False 280 requested_types = _requested_types(types) 281 282 with open(filename, mode='rt', encoding=info.encoding, errors=errors) as fp: 283 tagger = ascii_tags_loader(fp) 284 queued: Optional[DXFEntity] = None 285 tags: List[DXFTag] = [] 286 linked_entity = entity_linker() 287 288 for tag in tag_compiler(tagger): 289 code = tag.code 290 value = tag.value 291 if entities: 292 if code == 0: 293 if len(tags) and tags[0].value in requested_types: 294 entity = factory.load(ExtendedTags(tags)) 295 if not linked_entity( 296 entity) and entity.dxf.paperspace == 0: 297 # queue one entity for collecting linked entities: 298 # VERTEX, ATTRIB 299 if queued: 300 yield queued 301 queued = entity 302 tags = [tag] 303 else: 304 tags.append(tag) 305 if code == 0 and value == 'ENDSEC': 306 if queued: 307 yield queued 308 return 309 continue # if entities - nothing else matters 310 elif code == 2 and prev_code == 0 and prev_value == 'SECTION': 311 entities = (value == 'ENTITIES') 312 313 prev_code = code 314 prev_value = value 315 316 317def single_pass_modelspace( 318 stream: BinaryIO, 319 types: Iterable[str] = None, 320 errors: str = 'surrogateescape') -> Iterable[DXFGraphic]: 321 """ Iterate over all modelspace entities as :class:`DXFGraphic` objects in 322 one single pass. 323 324 Use this function to 'quick' iterate over modelspace entities of a **not** 325 seekable binary DXF stream, filtering DXF types may speed up things if many 326 entity types will be skipped. 327 328 Args: 329 stream: (not seekable) binary DXF stream 330 types: DXF types like ``['LINE', '3DFACE']`` which should be returned, 331 ``None`` returns all supported types. 332 errors: specify decoding error handler 333 334 - "surrogateescape" to preserve possible binary data (default) 335 - "ignore" to use the replacement char U+FFFD "\ufffd" for invalid data 336 - "strict" to raise an :class:`UnicodeDecodeError` exception for invalid data 337 338 Raises: 339 DXFStructureError: Invalid or incomplete DXF file 340 UnicodeDecodeError: if `errors` is "strict" and a decoding error occurs 341 342 """ 343 fetch_header_var: Optional[str] = None 344 encoding = 'cp1252' 345 version = 'AC1009' 346 prev_code: int = -1 347 prev_value: str = '' 348 entities = False 349 requested_types = _requested_types(types) 350 351 for code, value in binary_tagger(stream): 352 if code == 0 and value == b'ENDSEC': 353 break 354 elif code == 2 and prev_code == 0 and value != b'HEADER': 355 # (0, SECTION), (2, name) 356 # First section is not the HEADER section 357 entities = (value == b'ENTITIES') 358 break 359 elif code == 9 and value == b'$DWGCODEPAGE': 360 fetch_header_var = 'ENCODING' 361 elif code == 9 and value == b'$ACADVER': 362 fetch_header_var = 'VERSION' 363 elif fetch_header_var == 'ENCODING': 364 encoding = toencoding(value.decode()) 365 fetch_header_var = None 366 elif fetch_header_var == 'VERSION': 367 version = value.decode() 368 fetch_header_var = None 369 prev_code = code 370 371 if version >= 'AC1021': 372 encoding = 'utf-8' 373 374 queued: Optional[DXFEntity] = None 375 tags: List[DXFTag] = [] 376 linked_entity = entity_linker() 377 378 for tag in tag_compiler(binary_tagger(stream, encoding, errors)): 379 code = tag.code 380 value = tag.value 381 if entities: 382 if code == 0 and value == 'ENDSEC': 383 if queued: 384 yield queued 385 return 386 if code == 0: 387 if len(tags) and tags[0].value in requested_types: 388 entity = factory.load(ExtendedTags(tags)) 389 if not linked_entity(entity) and entity.dxf.paperspace == 0: 390 # queue one entity for collecting linked entities: 391 # VERTEX, ATTRIB 392 if queued: 393 yield queued 394 queued = entity 395 tags = [tag] 396 else: 397 tags.append(tag) 398 continue # if entities - nothing else matters 399 elif code == 2 and prev_code == 0 and prev_value == 'SECTION': 400 entities = (value == 'ENTITIES') 401 402 prev_code = code 403 prev_value = value 404 405 406def binary_tagger(file: BinaryIO, encoding: str = None, 407 errors: str = 'surrogateescape') -> DXFTag: 408 while True: 409 try: 410 try: 411 code = int(file.readline()) 412 except ValueError: 413 raise DXFStructureError(f'Invalid group code') 414 value = file.readline().rstrip(b'\r\n') 415 yield DXFTag( 416 code, 417 value.decode(encoding, errors=errors) 418 if encoding else value) 419 except IOError: 420 return 421 422 423def _requested_types(types: Optional[Iterable[str]]) -> Set[str]: 424 if types: 425 requested = SUPPORTED_TYPES.intersection(set(types)) 426 if 'POLYLINE' in requested: 427 requested.add('SEQEND') 428 requested.add('VERTEX') 429 if 'INSERT' in requested: 430 requested.add('SEQEND') 431 requested.add('ATTRIB') 432 else: 433 requested = SUPPORTED_TYPES 434 return requested 435