1# coding: utf-8 2 3from __future__ import absolute_import 4from __future__ import print_function 5 6# Emitter expects events obeying the following grammar: 7# stream ::= STREAM-START document* STREAM-END 8# document ::= DOCUMENT-START node DOCUMENT-END 9# node ::= SCALAR | sequence | mapping 10# sequence ::= SEQUENCE-START node* SEQUENCE-END 11# mapping ::= MAPPING-START (node node)* MAPPING-END 12 13import sys 14from ruamel.yaml.error import YAMLError, YAMLStreamError 15from ruamel.yaml.events import * # NOQA 16 17# fmt: off 18from ruamel.yaml.compat import utf8, text_type, PY2, nprint, dbg, DBG_EVENT, \ 19 check_anchorname_char 20# fmt: on 21 22if False: # MYPY 23 from typing import Any, Dict, List, Union, Text, Tuple, Optional # NOQA 24 from ruamel.yaml.compat import StreamType # NOQA 25 26__all__ = ['Emitter', 'EmitterError'] 27 28 29class EmitterError(YAMLError): 30 pass 31 32 33class ScalarAnalysis(object): 34 def __init__( 35 self, 36 scalar, 37 empty, 38 multiline, 39 allow_flow_plain, 40 allow_block_plain, 41 allow_single_quoted, 42 allow_double_quoted, 43 allow_block, 44 ): 45 # type: (Any, Any, Any, bool, bool, bool, bool, bool) -> None 46 self.scalar = scalar 47 self.empty = empty 48 self.multiline = multiline 49 self.allow_flow_plain = allow_flow_plain 50 self.allow_block_plain = allow_block_plain 51 self.allow_single_quoted = allow_single_quoted 52 self.allow_double_quoted = allow_double_quoted 53 self.allow_block = allow_block 54 55 56class Indents(object): 57 # replacement for the list based stack of None/int 58 def __init__(self): 59 # type: () -> None 60 self.values = [] # type: List[Tuple[int, bool]] 61 62 def append(self, val, seq): 63 # type: (Any, Any) -> None 64 self.values.append((val, seq)) 65 66 def pop(self): 67 # type: () -> Any 68 return self.values.pop()[0] 69 70 def last_seq(self): 71 # type: () -> bool 72 # return the seq(uence) value for the element added before the last one 73 # in increase_indent() 74 try: 75 return self.values[-2][1] 76 except IndexError: 77 return False 78 79 def seq_flow_align(self, seq_indent, column): 80 # type: (int, int) -> int 81 # extra spaces because of dash 82 if len(self.values) < 2 or not self.values[-1][1]: 83 return 0 84 # -1 for the dash 85 base = self.values[-1][0] if self.values[-1][0] is not None else 0 86 return base + seq_indent - column - 1 87 88 def __len__(self): 89 # type: () -> int 90 return len(self.values) 91 92 93class Emitter(object): 94 # fmt: off 95 DEFAULT_TAG_PREFIXES = { 96 u'!': u'!', 97 u'tag:yaml.org,2002:': u'!!', 98 } 99 # fmt: on 100 101 MAX_SIMPLE_KEY_LENGTH = 128 102 103 def __init__( 104 self, 105 stream, 106 canonical=None, 107 indent=None, 108 width=None, 109 allow_unicode=None, 110 line_break=None, 111 block_seq_indent=None, 112 top_level_colon_align=None, 113 prefix_colon=None, 114 brace_single_entry_mapping_in_flow_sequence=None, 115 dumper=None, 116 ): 117 # type: (StreamType, Any, Optional[int], Optional[int], Optional[bool], Any, Optional[int], Optional[bool], Any, Optional[bool], Any) -> None # NOQA 118 self.dumper = dumper 119 if self.dumper is not None and getattr(self.dumper, '_emitter', None) is None: 120 self.dumper._emitter = self 121 self.stream = stream 122 123 # Encoding can be overriden by STREAM-START. 124 self.encoding = None # type: Optional[Text] 125 self.allow_space_break = None 126 127 # Emitter is a state machine with a stack of states to handle nested 128 # structures. 129 self.states = [] # type: List[Any] 130 self.state = self.expect_stream_start # type: Any 131 132 # Current event and the event queue. 133 self.events = [] # type: List[Any] 134 self.event = None # type: Any 135 136 # The current indentation level and the stack of previous indents. 137 self.indents = Indents() 138 self.indent = None # type: Optional[int] 139 140 # flow_context is an expanding/shrinking list consisting of '{' and '[' 141 # for each unclosed flow context. If empty list that means block context 142 self.flow_context = [] # type: List[Text] 143 144 # Contexts. 145 self.root_context = False 146 self.sequence_context = False 147 self.mapping_context = False 148 self.simple_key_context = False 149 150 # Characteristics of the last emitted character: 151 # - current position. 152 # - is it a whitespace? 153 # - is it an indention character 154 # (indentation space, '-', '?', or ':')? 155 self.line = 0 156 self.column = 0 157 self.whitespace = True 158 self.indention = True 159 self.compact_seq_seq = True # dash after dash 160 self.compact_seq_map = True # key after dash 161 # self.compact_ms = False # dash after key, only when excplicit key with ? 162 self.no_newline = None # type: Optional[bool] # set if directly after `- ` 163 164 # Whether the document requires an explicit document end indicator 165 self.open_ended = False 166 167 # colon handling 168 self.colon = u':' 169 self.prefixed_colon = self.colon if prefix_colon is None else prefix_colon + self.colon 170 # single entry mappings in flow sequence 171 self.brace_single_entry_mapping_in_flow_sequence = ( 172 brace_single_entry_mapping_in_flow_sequence # NOQA 173 ) 174 175 # Formatting details. 176 self.canonical = canonical 177 self.allow_unicode = allow_unicode 178 # set to False to get "\Uxxxxxxxx" for non-basic unicode like emojis 179 self.unicode_supplementary = sys.maxunicode > 0xFFFF 180 self.sequence_dash_offset = block_seq_indent if block_seq_indent else 0 181 self.top_level_colon_align = top_level_colon_align 182 self.best_sequence_indent = 2 183 self.requested_indent = indent # specific for literal zero indent 184 if indent and 1 < indent < 10: 185 self.best_sequence_indent = indent 186 self.best_map_indent = self.best_sequence_indent 187 # if self.best_sequence_indent < self.sequence_dash_offset + 1: 188 # self.best_sequence_indent = self.sequence_dash_offset + 1 189 self.best_width = 80 190 if width and width > self.best_sequence_indent * 2: 191 self.best_width = width 192 self.best_line_break = u'\n' # type: Any 193 if line_break in [u'\r', u'\n', u'\r\n']: 194 self.best_line_break = line_break 195 196 # Tag prefixes. 197 self.tag_prefixes = None # type: Any 198 199 # Prepared anchor and tag. 200 self.prepared_anchor = None # type: Any 201 self.prepared_tag = None # type: Any 202 203 # Scalar analysis and style. 204 self.analysis = None # type: Any 205 self.style = None # type: Any 206 207 self.scalar_after_indicator = True # write a scalar on the same line as `---` 208 209 @property 210 def stream(self): 211 # type: () -> Any 212 try: 213 return self._stream 214 except AttributeError: 215 raise YAMLStreamError('output stream needs to specified') 216 217 @stream.setter 218 def stream(self, val): 219 # type: (Any) -> None 220 if val is None: 221 return 222 if not hasattr(val, 'write'): 223 raise YAMLStreamError('stream argument needs to have a write() method') 224 self._stream = val 225 226 @property 227 def serializer(self): 228 # type: () -> Any 229 try: 230 if hasattr(self.dumper, 'typ'): 231 return self.dumper.serializer 232 return self.dumper._serializer 233 except AttributeError: 234 return self # cyaml 235 236 @property 237 def flow_level(self): 238 # type: () -> int 239 return len(self.flow_context) 240 241 def dispose(self): 242 # type: () -> None 243 # Reset the state attributes (to clear self-references) 244 self.states = [] 245 self.state = None 246 247 def emit(self, event): 248 # type: (Any) -> None 249 if dbg(DBG_EVENT): 250 nprint(event) 251 self.events.append(event) 252 while not self.need_more_events(): 253 self.event = self.events.pop(0) 254 self.state() 255 self.event = None 256 257 # In some cases, we wait for a few next events before emitting. 258 259 def need_more_events(self): 260 # type: () -> bool 261 if not self.events: 262 return True 263 event = self.events[0] 264 if isinstance(event, DocumentStartEvent): 265 return self.need_events(1) 266 elif isinstance(event, SequenceStartEvent): 267 return self.need_events(2) 268 elif isinstance(event, MappingStartEvent): 269 return self.need_events(3) 270 else: 271 return False 272 273 def need_events(self, count): 274 # type: (int) -> bool 275 level = 0 276 for event in self.events[1:]: 277 if isinstance(event, (DocumentStartEvent, CollectionStartEvent)): 278 level += 1 279 elif isinstance(event, (DocumentEndEvent, CollectionEndEvent)): 280 level -= 1 281 elif isinstance(event, StreamEndEvent): 282 level = -1 283 if level < 0: 284 return False 285 return len(self.events) < count + 1 286 287 def increase_indent(self, flow=False, sequence=None, indentless=False): 288 # type: (bool, Optional[bool], bool) -> None 289 self.indents.append(self.indent, sequence) 290 if self.indent is None: # top level 291 if flow: 292 # self.indent = self.best_sequence_indent if self.indents.last_seq() else \ 293 # self.best_map_indent 294 # self.indent = self.best_sequence_indent 295 self.indent = self.requested_indent 296 else: 297 self.indent = 0 298 elif not indentless: 299 self.indent += ( 300 self.best_sequence_indent if self.indents.last_seq() else self.best_map_indent 301 ) 302 # if self.indents.last_seq(): 303 # if self.indent == 0: # top level block sequence 304 # self.indent = self.best_sequence_indent - self.sequence_dash_offset 305 # else: 306 # self.indent += self.best_sequence_indent 307 # else: 308 # self.indent += self.best_map_indent 309 310 # States. 311 312 # Stream handlers. 313 314 def expect_stream_start(self): 315 # type: () -> None 316 if isinstance(self.event, StreamStartEvent): 317 if PY2: 318 if self.event.encoding and not getattr(self.stream, 'encoding', None): 319 self.encoding = self.event.encoding 320 else: 321 if self.event.encoding and not hasattr(self.stream, 'encoding'): 322 self.encoding = self.event.encoding 323 self.write_stream_start() 324 self.state = self.expect_first_document_start 325 else: 326 raise EmitterError('expected StreamStartEvent, but got %s' % (self.event,)) 327 328 def expect_nothing(self): 329 # type: () -> None 330 raise EmitterError('expected nothing, but got %s' % (self.event,)) 331 332 # Document handlers. 333 334 def expect_first_document_start(self): 335 # type: () -> Any 336 return self.expect_document_start(first=True) 337 338 def expect_document_start(self, first=False): 339 # type: (bool) -> None 340 if isinstance(self.event, DocumentStartEvent): 341 if (self.event.version or self.event.tags) and self.open_ended: 342 self.write_indicator(u'...', True) 343 self.write_indent() 344 if self.event.version: 345 version_text = self.prepare_version(self.event.version) 346 self.write_version_directive(version_text) 347 self.tag_prefixes = self.DEFAULT_TAG_PREFIXES.copy() 348 if self.event.tags: 349 handles = sorted(self.event.tags.keys()) 350 for handle in handles: 351 prefix = self.event.tags[handle] 352 self.tag_prefixes[prefix] = handle 353 handle_text = self.prepare_tag_handle(handle) 354 prefix_text = self.prepare_tag_prefix(prefix) 355 self.write_tag_directive(handle_text, prefix_text) 356 implicit = ( 357 first 358 and not self.event.explicit 359 and not self.canonical 360 and not self.event.version 361 and not self.event.tags 362 and not self.check_empty_document() 363 ) 364 if not implicit: 365 self.write_indent() 366 self.write_indicator(u'---', True) 367 if self.canonical: 368 self.write_indent() 369 self.state = self.expect_document_root 370 elif isinstance(self.event, StreamEndEvent): 371 if self.open_ended: 372 self.write_indicator(u'...', True) 373 self.write_indent() 374 self.write_stream_end() 375 self.state = self.expect_nothing 376 else: 377 raise EmitterError('expected DocumentStartEvent, but got %s' % (self.event,)) 378 379 def expect_document_end(self): 380 # type: () -> None 381 if isinstance(self.event, DocumentEndEvent): 382 self.write_indent() 383 if self.event.explicit: 384 self.write_indicator(u'...', True) 385 self.write_indent() 386 self.flush_stream() 387 self.state = self.expect_document_start 388 else: 389 raise EmitterError('expected DocumentEndEvent, but got %s' % (self.event,)) 390 391 def expect_document_root(self): 392 # type: () -> None 393 self.states.append(self.expect_document_end) 394 self.expect_node(root=True) 395 396 # Node handlers. 397 398 def expect_node(self, root=False, sequence=False, mapping=False, simple_key=False): 399 # type: (bool, bool, bool, bool) -> None 400 self.root_context = root 401 self.sequence_context = sequence # not used in PyYAML 402 self.mapping_context = mapping 403 self.simple_key_context = simple_key 404 if isinstance(self.event, AliasEvent): 405 self.expect_alias() 406 elif isinstance(self.event, (ScalarEvent, CollectionStartEvent)): 407 if ( 408 self.process_anchor(u'&') 409 and isinstance(self.event, ScalarEvent) 410 and self.sequence_context 411 ): 412 self.sequence_context = False 413 if ( 414 root 415 and isinstance(self.event, ScalarEvent) 416 and not self.scalar_after_indicator 417 ): 418 self.write_indent() 419 self.process_tag() 420 if isinstance(self.event, ScalarEvent): 421 # nprint('@', self.indention, self.no_newline, self.column) 422 self.expect_scalar() 423 elif isinstance(self.event, SequenceStartEvent): 424 # nprint('@', self.indention, self.no_newline, self.column) 425 i2, n2 = self.indention, self.no_newline # NOQA 426 if self.event.comment: 427 if self.event.flow_style is False and self.event.comment: 428 if self.write_post_comment(self.event): 429 self.indention = False 430 self.no_newline = True 431 if self.write_pre_comment(self.event): 432 self.indention = i2 433 self.no_newline = not self.indention 434 if ( 435 self.flow_level 436 or self.canonical 437 or self.event.flow_style 438 or self.check_empty_sequence() 439 ): 440 self.expect_flow_sequence() 441 else: 442 self.expect_block_sequence() 443 elif isinstance(self.event, MappingStartEvent): 444 if self.event.flow_style is False and self.event.comment: 445 self.write_post_comment(self.event) 446 if self.event.comment and self.event.comment[1]: 447 self.write_pre_comment(self.event) 448 if ( 449 self.flow_level 450 or self.canonical 451 or self.event.flow_style 452 or self.check_empty_mapping() 453 ): 454 self.expect_flow_mapping(single=self.event.nr_items == 1) 455 else: 456 self.expect_block_mapping() 457 else: 458 raise EmitterError('expected NodeEvent, but got %s' % (self.event,)) 459 460 def expect_alias(self): 461 # type: () -> None 462 if self.event.anchor is None: 463 raise EmitterError('anchor is not specified for alias') 464 self.process_anchor(u'*') 465 self.state = self.states.pop() 466 467 def expect_scalar(self): 468 # type: () -> None 469 self.increase_indent(flow=True) 470 self.process_scalar() 471 self.indent = self.indents.pop() 472 self.state = self.states.pop() 473 474 # Flow sequence handlers. 475 476 def expect_flow_sequence(self): 477 # type: () -> None 478 ind = self.indents.seq_flow_align(self.best_sequence_indent, self.column) 479 self.write_indicator(u' ' * ind + u'[', True, whitespace=True) 480 self.increase_indent(flow=True, sequence=True) 481 self.flow_context.append('[') 482 self.state = self.expect_first_flow_sequence_item 483 484 def expect_first_flow_sequence_item(self): 485 # type: () -> None 486 if isinstance(self.event, SequenceEndEvent): 487 self.indent = self.indents.pop() 488 popped = self.flow_context.pop() 489 assert popped == '[' 490 self.write_indicator(u']', False) 491 if self.event.comment and self.event.comment[0]: 492 # eol comment on empty flow sequence 493 self.write_post_comment(self.event) 494 elif self.flow_level == 0: 495 self.write_line_break() 496 self.state = self.states.pop() 497 else: 498 if self.canonical or self.column > self.best_width: 499 self.write_indent() 500 self.states.append(self.expect_flow_sequence_item) 501 self.expect_node(sequence=True) 502 503 def expect_flow_sequence_item(self): 504 # type: () -> None 505 if isinstance(self.event, SequenceEndEvent): 506 self.indent = self.indents.pop() 507 popped = self.flow_context.pop() 508 assert popped == '[' 509 if self.canonical: 510 self.write_indicator(u',', False) 511 self.write_indent() 512 self.write_indicator(u']', False) 513 if self.event.comment and self.event.comment[0]: 514 # eol comment on flow sequence 515 self.write_post_comment(self.event) 516 else: 517 self.no_newline = False 518 self.state = self.states.pop() 519 else: 520 self.write_indicator(u',', False) 521 if self.canonical or self.column > self.best_width: 522 self.write_indent() 523 self.states.append(self.expect_flow_sequence_item) 524 self.expect_node(sequence=True) 525 526 # Flow mapping handlers. 527 528 def expect_flow_mapping(self, single=False): 529 # type: (Optional[bool]) -> None 530 ind = self.indents.seq_flow_align(self.best_sequence_indent, self.column) 531 map_init = u'{' 532 if ( 533 single 534 and self.flow_level 535 and self.flow_context[-1] == '[' 536 and not self.canonical 537 and not self.brace_single_entry_mapping_in_flow_sequence 538 ): 539 # single map item with flow context, no curly braces necessary 540 map_init = u'' 541 self.write_indicator(u' ' * ind + map_init, True, whitespace=True) 542 self.flow_context.append(map_init) 543 self.increase_indent(flow=True, sequence=False) 544 self.state = self.expect_first_flow_mapping_key 545 546 def expect_first_flow_mapping_key(self): 547 # type: () -> None 548 if isinstance(self.event, MappingEndEvent): 549 self.indent = self.indents.pop() 550 popped = self.flow_context.pop() 551 assert popped == '{' # empty flow mapping 552 self.write_indicator(u'}', False) 553 if self.event.comment and self.event.comment[0]: 554 # eol comment on empty mapping 555 self.write_post_comment(self.event) 556 elif self.flow_level == 0: 557 self.write_line_break() 558 self.state = self.states.pop() 559 else: 560 if self.canonical or self.column > self.best_width: 561 self.write_indent() 562 if not self.canonical and self.check_simple_key(): 563 self.states.append(self.expect_flow_mapping_simple_value) 564 self.expect_node(mapping=True, simple_key=True) 565 else: 566 self.write_indicator(u'?', True) 567 self.states.append(self.expect_flow_mapping_value) 568 self.expect_node(mapping=True) 569 570 def expect_flow_mapping_key(self): 571 # type: () -> None 572 if isinstance(self.event, MappingEndEvent): 573 # if self.event.comment and self.event.comment[1]: 574 # self.write_pre_comment(self.event) 575 self.indent = self.indents.pop() 576 popped = self.flow_context.pop() 577 assert popped in [u'{', u''] 578 if self.canonical: 579 self.write_indicator(u',', False) 580 self.write_indent() 581 if popped != u'': 582 self.write_indicator(u'}', False) 583 if self.event.comment and self.event.comment[0]: 584 # eol comment on flow mapping, never reached on empty mappings 585 self.write_post_comment(self.event) 586 else: 587 self.no_newline = False 588 self.state = self.states.pop() 589 else: 590 self.write_indicator(u',', False) 591 if self.canonical or self.column > self.best_width: 592 self.write_indent() 593 if not self.canonical and self.check_simple_key(): 594 self.states.append(self.expect_flow_mapping_simple_value) 595 self.expect_node(mapping=True, simple_key=True) 596 else: 597 self.write_indicator(u'?', True) 598 self.states.append(self.expect_flow_mapping_value) 599 self.expect_node(mapping=True) 600 601 def expect_flow_mapping_simple_value(self): 602 # type: () -> None 603 self.write_indicator(self.prefixed_colon, False) 604 self.states.append(self.expect_flow_mapping_key) 605 self.expect_node(mapping=True) 606 607 def expect_flow_mapping_value(self): 608 # type: () -> None 609 if self.canonical or self.column > self.best_width: 610 self.write_indent() 611 self.write_indicator(self.prefixed_colon, True) 612 self.states.append(self.expect_flow_mapping_key) 613 self.expect_node(mapping=True) 614 615 # Block sequence handlers. 616 617 def expect_block_sequence(self): 618 # type: () -> None 619 if self.mapping_context: 620 indentless = not self.indention 621 else: 622 indentless = False 623 if not self.compact_seq_seq and self.column != 0: 624 self.write_line_break() 625 self.increase_indent(flow=False, sequence=True, indentless=indentless) 626 self.state = self.expect_first_block_sequence_item 627 628 def expect_first_block_sequence_item(self): 629 # type: () -> Any 630 return self.expect_block_sequence_item(first=True) 631 632 def expect_block_sequence_item(self, first=False): 633 # type: (bool) -> None 634 if not first and isinstance(self.event, SequenceEndEvent): 635 if self.event.comment and self.event.comment[1]: 636 # final comments on a block list e.g. empty line 637 self.write_pre_comment(self.event) 638 self.indent = self.indents.pop() 639 self.state = self.states.pop() 640 self.no_newline = False 641 else: 642 if self.event.comment and self.event.comment[1]: 643 self.write_pre_comment(self.event) 644 nonl = self.no_newline if self.column == 0 else False 645 self.write_indent() 646 ind = self.sequence_dash_offset # if len(self.indents) > 1 else 0 647 self.write_indicator(u' ' * ind + u'-', True, indention=True) 648 if nonl or self.sequence_dash_offset + 2 > self.best_sequence_indent: 649 self.no_newline = True 650 self.states.append(self.expect_block_sequence_item) 651 self.expect_node(sequence=True) 652 653 # Block mapping handlers. 654 655 def expect_block_mapping(self): 656 # type: () -> None 657 if not self.mapping_context and not (self.compact_seq_map or self.column == 0): 658 self.write_line_break() 659 self.increase_indent(flow=False, sequence=False) 660 self.state = self.expect_first_block_mapping_key 661 662 def expect_first_block_mapping_key(self): 663 # type: () -> None 664 return self.expect_block_mapping_key(first=True) 665 666 def expect_block_mapping_key(self, first=False): 667 # type: (Any) -> None 668 if not first and isinstance(self.event, MappingEndEvent): 669 if self.event.comment and self.event.comment[1]: 670 # final comments from a doc 671 self.write_pre_comment(self.event) 672 self.indent = self.indents.pop() 673 self.state = self.states.pop() 674 else: 675 if self.event.comment and self.event.comment[1]: 676 # final comments from a doc 677 self.write_pre_comment(self.event) 678 self.write_indent() 679 if self.check_simple_key(): 680 if not isinstance( 681 self.event, (SequenceStartEvent, MappingStartEvent) 682 ): # sequence keys 683 try: 684 if self.event.style == '?': 685 self.write_indicator(u'?', True, indention=True) 686 except AttributeError: # aliases have no style 687 pass 688 self.states.append(self.expect_block_mapping_simple_value) 689 self.expect_node(mapping=True, simple_key=True) 690 if isinstance(self.event, AliasEvent): 691 self.stream.write(u' ') 692 else: 693 self.write_indicator(u'?', True, indention=True) 694 self.states.append(self.expect_block_mapping_value) 695 self.expect_node(mapping=True) 696 697 def expect_block_mapping_simple_value(self): 698 # type: () -> None 699 if getattr(self.event, 'style', None) != '?': 700 # prefix = u'' 701 if self.indent == 0 and self.top_level_colon_align is not None: 702 # write non-prefixed colon 703 c = u' ' * (self.top_level_colon_align - self.column) + self.colon 704 else: 705 c = self.prefixed_colon 706 self.write_indicator(c, False) 707 self.states.append(self.expect_block_mapping_key) 708 self.expect_node(mapping=True) 709 710 def expect_block_mapping_value(self): 711 # type: () -> None 712 self.write_indent() 713 self.write_indicator(self.prefixed_colon, True, indention=True) 714 self.states.append(self.expect_block_mapping_key) 715 self.expect_node(mapping=True) 716 717 # Checkers. 718 719 def check_empty_sequence(self): 720 # type: () -> bool 721 return ( 722 isinstance(self.event, SequenceStartEvent) 723 and bool(self.events) 724 and isinstance(self.events[0], SequenceEndEvent) 725 ) 726 727 def check_empty_mapping(self): 728 # type: () -> bool 729 return ( 730 isinstance(self.event, MappingStartEvent) 731 and bool(self.events) 732 and isinstance(self.events[0], MappingEndEvent) 733 ) 734 735 def check_empty_document(self): 736 # type: () -> bool 737 if not isinstance(self.event, DocumentStartEvent) or not self.events: 738 return False 739 event = self.events[0] 740 return ( 741 isinstance(event, ScalarEvent) 742 and event.anchor is None 743 and event.tag is None 744 and event.implicit 745 and event.value == "" 746 ) 747 748 def check_simple_key(self): 749 # type: () -> bool 750 length = 0 751 if isinstance(self.event, NodeEvent) and self.event.anchor is not None: 752 if self.prepared_anchor is None: 753 self.prepared_anchor = self.prepare_anchor(self.event.anchor) 754 length += len(self.prepared_anchor) 755 if ( 756 isinstance(self.event, (ScalarEvent, CollectionStartEvent)) 757 and self.event.tag is not None 758 ): 759 if self.prepared_tag is None: 760 self.prepared_tag = self.prepare_tag(self.event.tag) 761 length += len(self.prepared_tag) 762 if isinstance(self.event, ScalarEvent): 763 if self.analysis is None: 764 self.analysis = self.analyze_scalar(self.event.value) 765 length += len(self.analysis.scalar) 766 return length < self.MAX_SIMPLE_KEY_LENGTH and ( 767 isinstance(self.event, AliasEvent) 768 or (isinstance(self.event, SequenceStartEvent) and self.event.flow_style is True) 769 or (isinstance(self.event, MappingStartEvent) and self.event.flow_style is True) 770 or ( 771 isinstance(self.event, ScalarEvent) 772 # if there is an explicit style for an empty string, it is a simple key 773 and not (self.analysis.empty and self.style and self.style not in '\'"') 774 and not self.analysis.multiline 775 ) 776 or self.check_empty_sequence() 777 or self.check_empty_mapping() 778 ) 779 780 # Anchor, Tag, and Scalar processors. 781 782 def process_anchor(self, indicator): 783 # type: (Any) -> bool 784 if self.event.anchor is None: 785 self.prepared_anchor = None 786 return False 787 if self.prepared_anchor is None: 788 self.prepared_anchor = self.prepare_anchor(self.event.anchor) 789 if self.prepared_anchor: 790 self.write_indicator(indicator + self.prepared_anchor, True) 791 # issue 288 792 self.no_newline = False 793 self.prepared_anchor = None 794 return True 795 796 def process_tag(self): 797 # type: () -> None 798 tag = self.event.tag 799 if isinstance(self.event, ScalarEvent): 800 if self.style is None: 801 self.style = self.choose_scalar_style() 802 if (not self.canonical or tag is None) and ( 803 (self.style == "" and self.event.implicit[0]) 804 or (self.style != "" and self.event.implicit[1]) 805 ): 806 self.prepared_tag = None 807 return 808 if self.event.implicit[0] and tag is None: 809 tag = u'!' 810 self.prepared_tag = None 811 else: 812 if (not self.canonical or tag is None) and self.event.implicit: 813 self.prepared_tag = None 814 return 815 if tag is None: 816 raise EmitterError('tag is not specified') 817 if self.prepared_tag is None: 818 self.prepared_tag = self.prepare_tag(tag) 819 if self.prepared_tag: 820 self.write_indicator(self.prepared_tag, True) 821 if ( 822 self.sequence_context 823 and not self.flow_level 824 and isinstance(self.event, ScalarEvent) 825 ): 826 self.no_newline = True 827 self.prepared_tag = None 828 829 def choose_scalar_style(self): 830 # type: () -> Any 831 if self.analysis is None: 832 self.analysis = self.analyze_scalar(self.event.value) 833 if self.event.style == '"' or self.canonical: 834 return '"' 835 if (not self.event.style or self.event.style == '?') and ( 836 self.event.implicit[0] or not self.event.implicit[2] 837 ): 838 if not ( 839 self.simple_key_context and (self.analysis.empty or self.analysis.multiline) 840 ) and ( 841 self.flow_level 842 and self.analysis.allow_flow_plain 843 or (not self.flow_level and self.analysis.allow_block_plain) 844 ): 845 return "" 846 self.analysis.allow_block = True 847 if self.event.style and self.event.style in '|>': 848 if ( 849 not self.flow_level 850 and not self.simple_key_context 851 and self.analysis.allow_block 852 ): 853 return self.event.style 854 if not self.event.style and self.analysis.allow_double_quoted: 855 if "'" in self.event.value or '\n' in self.event.value: 856 return '"' 857 if not self.event.style or self.event.style == "'": 858 if self.analysis.allow_single_quoted and not ( 859 self.simple_key_context and self.analysis.multiline 860 ): 861 return "'" 862 return '"' 863 864 def process_scalar(self): 865 # type: () -> None 866 if self.analysis is None: 867 self.analysis = self.analyze_scalar(self.event.value) 868 if self.style is None: 869 self.style = self.choose_scalar_style() 870 split = not self.simple_key_context 871 # if self.analysis.multiline and split \ 872 # and (not self.style or self.style in '\'\"'): 873 # self.write_indent() 874 # nprint('xx', self.sequence_context, self.flow_level) 875 if self.sequence_context and not self.flow_level: 876 self.write_indent() 877 if self.style == '"': 878 self.write_double_quoted(self.analysis.scalar, split) 879 elif self.style == "'": 880 self.write_single_quoted(self.analysis.scalar, split) 881 elif self.style == '>': 882 self.write_folded(self.analysis.scalar) 883 elif self.style == '|': 884 self.write_literal(self.analysis.scalar, self.event.comment) 885 else: 886 self.write_plain(self.analysis.scalar, split) 887 self.analysis = None 888 self.style = None 889 if self.event.comment: 890 self.write_post_comment(self.event) 891 892 # Analyzers. 893 894 def prepare_version(self, version): 895 # type: (Any) -> Any 896 major, minor = version 897 if major != 1: 898 raise EmitterError('unsupported YAML version: %d.%d' % (major, minor)) 899 return u'%d.%d' % (major, minor) 900 901 def prepare_tag_handle(self, handle): 902 # type: (Any) -> Any 903 if not handle: 904 raise EmitterError('tag handle must not be empty') 905 if handle[0] != u'!' or handle[-1] != u'!': 906 raise EmitterError("tag handle must start and end with '!': %r" % (utf8(handle))) 907 for ch in handle[1:-1]: 908 if not ( 909 u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' or ch in u'-_' 910 ): 911 raise EmitterError( 912 'invalid character %r in the tag handle: %r' % (utf8(ch), utf8(handle)) 913 ) 914 return handle 915 916 def prepare_tag_prefix(self, prefix): 917 # type: (Any) -> Any 918 if not prefix: 919 raise EmitterError('tag prefix must not be empty') 920 chunks = [] # type: List[Any] 921 start = end = 0 922 if prefix[0] == u'!': 923 end = 1 924 ch_set = u"-;/?:@&=+$,_.~*'()[]" 925 if self.dumper: 926 version = getattr(self.dumper, 'version', (1, 2)) 927 if version is None or version >= (1, 2): 928 ch_set += u'#' 929 while end < len(prefix): 930 ch = prefix[end] 931 if u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' or ch in ch_set: 932 end += 1 933 else: 934 if start < end: 935 chunks.append(prefix[start:end]) 936 start = end = end + 1 937 data = utf8(ch) 938 for ch in data: 939 chunks.append(u'%%%02X' % ord(ch)) 940 if start < end: 941 chunks.append(prefix[start:end]) 942 return "".join(chunks) 943 944 def prepare_tag(self, tag): 945 # type: (Any) -> Any 946 if not tag: 947 raise EmitterError('tag must not be empty') 948 if tag == u'!': 949 return tag 950 handle = None 951 suffix = tag 952 prefixes = sorted(self.tag_prefixes.keys()) 953 for prefix in prefixes: 954 if tag.startswith(prefix) and (prefix == u'!' or len(prefix) < len(tag)): 955 handle = self.tag_prefixes[prefix] 956 suffix = tag[len(prefix) :] 957 chunks = [] # type: List[Any] 958 start = end = 0 959 ch_set = u"-;/?:@&=+$,_.~*'()[]" 960 if self.dumper: 961 version = getattr(self.dumper, 'version', (1, 2)) 962 if version is None or version >= (1, 2): 963 ch_set += u'#' 964 while end < len(suffix): 965 ch = suffix[end] 966 if ( 967 u'0' <= ch <= u'9' 968 or u'A' <= ch <= u'Z' 969 or u'a' <= ch <= u'z' 970 or ch in ch_set 971 or (ch == u'!' and handle != u'!') 972 ): 973 end += 1 974 else: 975 if start < end: 976 chunks.append(suffix[start:end]) 977 start = end = end + 1 978 data = utf8(ch) 979 for ch in data: 980 chunks.append(u'%%%02X' % ord(ch)) 981 if start < end: 982 chunks.append(suffix[start:end]) 983 suffix_text = "".join(chunks) 984 if handle: 985 return u'%s%s' % (handle, suffix_text) 986 else: 987 return u'!<%s>' % suffix_text 988 989 def prepare_anchor(self, anchor): 990 # type: (Any) -> Any 991 if not anchor: 992 raise EmitterError('anchor must not be empty') 993 for ch in anchor: 994 if not check_anchorname_char(ch): 995 raise EmitterError( 996 'invalid character %r in the anchor: %r' % (utf8(ch), utf8(anchor)) 997 ) 998 return anchor 999 1000 def analyze_scalar(self, scalar): 1001 # type: (Any) -> Any 1002 # Empty scalar is a special case. 1003 if not scalar: 1004 return ScalarAnalysis( 1005 scalar=scalar, 1006 empty=True, 1007 multiline=False, 1008 allow_flow_plain=False, 1009 allow_block_plain=True, 1010 allow_single_quoted=True, 1011 allow_double_quoted=True, 1012 allow_block=False, 1013 ) 1014 1015 # Indicators and special characters. 1016 block_indicators = False 1017 flow_indicators = False 1018 line_breaks = False 1019 special_characters = False 1020 1021 # Important whitespace combinations. 1022 leading_space = False 1023 leading_break = False 1024 trailing_space = False 1025 trailing_break = False 1026 break_space = False 1027 space_break = False 1028 1029 # Check document indicators. 1030 if scalar.startswith(u'---') or scalar.startswith(u'...'): 1031 block_indicators = True 1032 flow_indicators = True 1033 1034 # First character or preceded by a whitespace. 1035 preceeded_by_whitespace = True 1036 1037 # Last character or followed by a whitespace. 1038 followed_by_whitespace = len(scalar) == 1 or scalar[1] in u'\0 \t\r\n\x85\u2028\u2029' 1039 1040 # The previous character is a space. 1041 previous_space = False 1042 1043 # The previous character is a break. 1044 previous_break = False 1045 1046 index = 0 1047 while index < len(scalar): 1048 ch = scalar[index] 1049 1050 # Check for indicators. 1051 if index == 0: 1052 # Leading indicators are special characters. 1053 if ch in u'#,[]{}&*!|>\'"%@`': 1054 flow_indicators = True 1055 block_indicators = True 1056 if ch in u'?:': # ToDo 1057 if self.serializer.use_version == (1, 1): 1058 flow_indicators = True 1059 elif len(scalar) == 1: # single character 1060 flow_indicators = True 1061 if followed_by_whitespace: 1062 block_indicators = True 1063 if ch == u'-' and followed_by_whitespace: 1064 flow_indicators = True 1065 block_indicators = True 1066 else: 1067 # Some indicators cannot appear within a scalar as well. 1068 if ch in u',[]{}': # http://yaml.org/spec/1.2/spec.html#id2788859 1069 flow_indicators = True 1070 if ch == u'?' and self.serializer.use_version == (1, 1): 1071 flow_indicators = True 1072 if ch == u':': 1073 if followed_by_whitespace: 1074 flow_indicators = True 1075 block_indicators = True 1076 if ch == u'#' and preceeded_by_whitespace: 1077 flow_indicators = True 1078 block_indicators = True 1079 1080 # Check for line breaks, special, and unicode characters. 1081 if ch in u'\n\x85\u2028\u2029': 1082 line_breaks = True 1083 if not (ch == u'\n' or u'\x20' <= ch <= u'\x7E'): 1084 if ( 1085 ch == u'\x85' 1086 or u'\xA0' <= ch <= u'\uD7FF' 1087 or u'\uE000' <= ch <= u'\uFFFD' 1088 or (self.unicode_supplementary and (u'\U00010000' <= ch <= u'\U0010FFFF')) 1089 ) and ch != u'\uFEFF': 1090 # unicode_characters = True 1091 if not self.allow_unicode: 1092 special_characters = True 1093 else: 1094 special_characters = True 1095 1096 # Detect important whitespace combinations. 1097 if ch == u' ': 1098 if index == 0: 1099 leading_space = True 1100 if index == len(scalar) - 1: 1101 trailing_space = True 1102 if previous_break: 1103 break_space = True 1104 previous_space = True 1105 previous_break = False 1106 elif ch in u'\n\x85\u2028\u2029': 1107 if index == 0: 1108 leading_break = True 1109 if index == len(scalar) - 1: 1110 trailing_break = True 1111 if previous_space: 1112 space_break = True 1113 previous_space = False 1114 previous_break = True 1115 else: 1116 previous_space = False 1117 previous_break = False 1118 1119 # Prepare for the next character. 1120 index += 1 1121 preceeded_by_whitespace = ch in u'\0 \t\r\n\x85\u2028\u2029' 1122 followed_by_whitespace = ( 1123 index + 1 >= len(scalar) or scalar[index + 1] in u'\0 \t\r\n\x85\u2028\u2029' 1124 ) 1125 1126 # Let's decide what styles are allowed. 1127 allow_flow_plain = True 1128 allow_block_plain = True 1129 allow_single_quoted = True 1130 allow_double_quoted = True 1131 allow_block = True 1132 1133 # Leading and trailing whitespaces are bad for plain scalars. 1134 if leading_space or leading_break or trailing_space or trailing_break: 1135 allow_flow_plain = allow_block_plain = False 1136 1137 # We do not permit trailing spaces for block scalars. 1138 if trailing_space: 1139 allow_block = False 1140 1141 # Spaces at the beginning of a new line are only acceptable for block 1142 # scalars. 1143 if break_space: 1144 allow_flow_plain = allow_block_plain = allow_single_quoted = False 1145 1146 # Spaces followed by breaks, as well as special character are only 1147 # allowed for double quoted scalars. 1148 if special_characters: 1149 allow_flow_plain = allow_block_plain = allow_single_quoted = allow_block = False 1150 elif space_break: 1151 allow_flow_plain = allow_block_plain = allow_single_quoted = False 1152 if not self.allow_space_break: 1153 allow_block = False 1154 1155 # Although the plain scalar writer supports breaks, we never emit 1156 # multiline plain scalars. 1157 if line_breaks: 1158 allow_flow_plain = allow_block_plain = False 1159 1160 # Flow indicators are forbidden for flow plain scalars. 1161 if flow_indicators: 1162 allow_flow_plain = False 1163 1164 # Block indicators are forbidden for block plain scalars. 1165 if block_indicators: 1166 allow_block_plain = False 1167 1168 return ScalarAnalysis( 1169 scalar=scalar, 1170 empty=False, 1171 multiline=line_breaks, 1172 allow_flow_plain=allow_flow_plain, 1173 allow_block_plain=allow_block_plain, 1174 allow_single_quoted=allow_single_quoted, 1175 allow_double_quoted=allow_double_quoted, 1176 allow_block=allow_block, 1177 ) 1178 1179 # Writers. 1180 1181 def flush_stream(self): 1182 # type: () -> None 1183 if hasattr(self.stream, 'flush'): 1184 self.stream.flush() 1185 1186 def write_stream_start(self): 1187 # type: () -> None 1188 # Write BOM if needed. 1189 if self.encoding and self.encoding.startswith('utf-16'): 1190 self.stream.write(u'\uFEFF'.encode(self.encoding)) 1191 1192 def write_stream_end(self): 1193 # type: () -> None 1194 self.flush_stream() 1195 1196 def write_indicator(self, indicator, need_whitespace, whitespace=False, indention=False): 1197 # type: (Any, Any, bool, bool) -> None 1198 if self.whitespace or not need_whitespace: 1199 data = indicator 1200 else: 1201 data = u' ' + indicator 1202 self.whitespace = whitespace 1203 self.indention = self.indention and indention 1204 self.column += len(data) 1205 self.open_ended = False 1206 if bool(self.encoding): 1207 data = data.encode(self.encoding) 1208 self.stream.write(data) 1209 1210 def write_indent(self): 1211 # type: () -> None 1212 indent = self.indent or 0 1213 if ( 1214 not self.indention 1215 or self.column > indent 1216 or (self.column == indent and not self.whitespace) 1217 ): 1218 if bool(self.no_newline): 1219 self.no_newline = False 1220 else: 1221 self.write_line_break() 1222 if self.column < indent: 1223 self.whitespace = True 1224 data = u' ' * (indent - self.column) 1225 self.column = indent 1226 if self.encoding: 1227 data = data.encode(self.encoding) 1228 self.stream.write(data) 1229 1230 def write_line_break(self, data=None): 1231 # type: (Any) -> None 1232 if data is None: 1233 data = self.best_line_break 1234 self.whitespace = True 1235 self.indention = True 1236 self.line += 1 1237 self.column = 0 1238 if bool(self.encoding): 1239 data = data.encode(self.encoding) 1240 self.stream.write(data) 1241 1242 def write_version_directive(self, version_text): 1243 # type: (Any) -> None 1244 data = u'%%YAML %s' % version_text 1245 if self.encoding: 1246 data = data.encode(self.encoding) 1247 self.stream.write(data) 1248 self.write_line_break() 1249 1250 def write_tag_directive(self, handle_text, prefix_text): 1251 # type: (Any, Any) -> None 1252 data = u'%%TAG %s %s' % (handle_text, prefix_text) 1253 if self.encoding: 1254 data = data.encode(self.encoding) 1255 self.stream.write(data) 1256 self.write_line_break() 1257 1258 # Scalar streams. 1259 1260 def write_single_quoted(self, text, split=True): 1261 # type: (Any, Any) -> None 1262 if self.root_context: 1263 if self.requested_indent is not None: 1264 self.write_line_break() 1265 if self.requested_indent != 0: 1266 self.write_indent() 1267 self.write_indicator(u"'", True) 1268 spaces = False 1269 breaks = False 1270 start = end = 0 1271 while end <= len(text): 1272 ch = None 1273 if end < len(text): 1274 ch = text[end] 1275 if spaces: 1276 if ch is None or ch != u' ': 1277 if ( 1278 start + 1 == end 1279 and self.column > self.best_width 1280 and split 1281 and start != 0 1282 and end != len(text) 1283 ): 1284 self.write_indent() 1285 else: 1286 data = text[start:end] 1287 self.column += len(data) 1288 if bool(self.encoding): 1289 data = data.encode(self.encoding) 1290 self.stream.write(data) 1291 start = end 1292 elif breaks: 1293 if ch is None or ch not in u'\n\x85\u2028\u2029': 1294 if text[start] == u'\n': 1295 self.write_line_break() 1296 for br in text[start:end]: 1297 if br == u'\n': 1298 self.write_line_break() 1299 else: 1300 self.write_line_break(br) 1301 self.write_indent() 1302 start = end 1303 else: 1304 if ch is None or ch in u' \n\x85\u2028\u2029' or ch == u"'": 1305 if start < end: 1306 data = text[start:end] 1307 self.column += len(data) 1308 if bool(self.encoding): 1309 data = data.encode(self.encoding) 1310 self.stream.write(data) 1311 start = end 1312 if ch == u"'": 1313 data = u"''" 1314 self.column += 2 1315 if bool(self.encoding): 1316 data = data.encode(self.encoding) 1317 self.stream.write(data) 1318 start = end + 1 1319 if ch is not None: 1320 spaces = ch == u' ' 1321 breaks = ch in u'\n\x85\u2028\u2029' 1322 end += 1 1323 self.write_indicator(u"'", False) 1324 1325 ESCAPE_REPLACEMENTS = { 1326 u'\0': u'0', 1327 u'\x07': u'a', 1328 u'\x08': u'b', 1329 u'\x09': u't', 1330 u'\x0A': u'n', 1331 u'\x0B': u'v', 1332 u'\x0C': u'f', 1333 u'\x0D': u'r', 1334 u'\x1B': u'e', 1335 u'"': u'"', 1336 u'\\': u'\\', 1337 u'\x85': u'N', 1338 u'\xA0': u'_', 1339 u'\u2028': u'L', 1340 u'\u2029': u'P', 1341 } 1342 1343 def write_double_quoted(self, text, split=True): 1344 # type: (Any, Any) -> None 1345 if self.root_context: 1346 if self.requested_indent is not None: 1347 self.write_line_break() 1348 if self.requested_indent != 0: 1349 self.write_indent() 1350 self.write_indicator(u'"', True) 1351 start = end = 0 1352 while end <= len(text): 1353 ch = None 1354 if end < len(text): 1355 ch = text[end] 1356 if ( 1357 ch is None 1358 or ch in u'"\\\x85\u2028\u2029\uFEFF' 1359 or not ( 1360 u'\x20' <= ch <= u'\x7E' 1361 or ( 1362 self.allow_unicode 1363 and (u'\xA0' <= ch <= u'\uD7FF' or u'\uE000' <= ch <= u'\uFFFD') 1364 ) 1365 ) 1366 ): 1367 if start < end: 1368 data = text[start:end] 1369 self.column += len(data) 1370 if bool(self.encoding): 1371 data = data.encode(self.encoding) 1372 self.stream.write(data) 1373 start = end 1374 if ch is not None: 1375 if ch in self.ESCAPE_REPLACEMENTS: 1376 data = u'\\' + self.ESCAPE_REPLACEMENTS[ch] 1377 elif ch <= u'\xFF': 1378 data = u'\\x%02X' % ord(ch) 1379 elif ch <= u'\uFFFF': 1380 data = u'\\u%04X' % ord(ch) 1381 else: 1382 data = u'\\U%08X' % ord(ch) 1383 self.column += len(data) 1384 if bool(self.encoding): 1385 data = data.encode(self.encoding) 1386 self.stream.write(data) 1387 start = end + 1 1388 if ( 1389 0 < end < len(text) - 1 1390 and (ch == u' ' or start >= end) 1391 and self.column + (end - start) > self.best_width 1392 and split 1393 ): 1394 data = text[start:end] + u'\\' 1395 if start < end: 1396 start = end 1397 self.column += len(data) 1398 if bool(self.encoding): 1399 data = data.encode(self.encoding) 1400 self.stream.write(data) 1401 self.write_indent() 1402 self.whitespace = False 1403 self.indention = False 1404 if text[start] == u' ': 1405 data = u'\\' 1406 self.column += len(data) 1407 if bool(self.encoding): 1408 data = data.encode(self.encoding) 1409 self.stream.write(data) 1410 end += 1 1411 self.write_indicator(u'"', False) 1412 1413 def determine_block_hints(self, text): 1414 # type: (Any) -> Any 1415 indent = 0 1416 indicator = u'' 1417 hints = u'' 1418 if text: 1419 if text[0] in u' \n\x85\u2028\u2029': 1420 indent = self.best_sequence_indent 1421 hints += text_type(indent) 1422 elif self.root_context: 1423 for end in ['\n---', '\n...']: 1424 pos = 0 1425 while True: 1426 pos = text.find(end, pos) 1427 if pos == -1: 1428 break 1429 try: 1430 if text[pos + 4] in ' \r\n': 1431 break 1432 except IndexError: 1433 pass 1434 pos += 1 1435 if pos > -1: 1436 break 1437 if pos > 0: 1438 indent = self.best_sequence_indent 1439 if text[-1] not in u'\n\x85\u2028\u2029': 1440 indicator = u'-' 1441 elif len(text) == 1 or text[-2] in u'\n\x85\u2028\u2029': 1442 indicator = u'+' 1443 hints += indicator 1444 return hints, indent, indicator 1445 1446 def write_folded(self, text): 1447 # type: (Any) -> None 1448 hints, _indent, _indicator = self.determine_block_hints(text) 1449 self.write_indicator(u'>' + hints, True) 1450 if _indicator == u'+': 1451 self.open_ended = True 1452 self.write_line_break() 1453 leading_space = True 1454 spaces = False 1455 breaks = True 1456 start = end = 0 1457 while end <= len(text): 1458 ch = None 1459 if end < len(text): 1460 ch = text[end] 1461 if breaks: 1462 if ch is None or ch not in u'\n\x85\u2028\u2029\a': 1463 if ( 1464 not leading_space 1465 and ch is not None 1466 and ch != u' ' 1467 and text[start] == u'\n' 1468 ): 1469 self.write_line_break() 1470 leading_space = ch == u' ' 1471 for br in text[start:end]: 1472 if br == u'\n': 1473 self.write_line_break() 1474 else: 1475 self.write_line_break(br) 1476 if ch is not None: 1477 self.write_indent() 1478 start = end 1479 elif spaces: 1480 if ch != u' ': 1481 if start + 1 == end and self.column > self.best_width: 1482 self.write_indent() 1483 else: 1484 data = text[start:end] 1485 self.column += len(data) 1486 if bool(self.encoding): 1487 data = data.encode(self.encoding) 1488 self.stream.write(data) 1489 start = end 1490 else: 1491 if ch is None or ch in u' \n\x85\u2028\u2029\a': 1492 data = text[start:end] 1493 self.column += len(data) 1494 if bool(self.encoding): 1495 data = data.encode(self.encoding) 1496 self.stream.write(data) 1497 if ch == u'\a': 1498 if end < (len(text) - 1) and not text[end + 2].isspace(): 1499 self.write_line_break() 1500 self.write_indent() 1501 end += 2 # \a and the space that is inserted on the fold 1502 else: 1503 raise EmitterError('unexcpected fold indicator \\a before space') 1504 if ch is None: 1505 self.write_line_break() 1506 start = end 1507 if ch is not None: 1508 breaks = ch in u'\n\x85\u2028\u2029' 1509 spaces = ch == u' ' 1510 end += 1 1511 1512 def write_literal(self, text, comment=None): 1513 # type: (Any, Any) -> None 1514 hints, _indent, _indicator = self.determine_block_hints(text) 1515 self.write_indicator(u'|' + hints, True) 1516 try: 1517 comment = comment[1][0] 1518 if comment: 1519 self.stream.write(comment) 1520 except (TypeError, IndexError): 1521 pass 1522 if _indicator == u'+': 1523 self.open_ended = True 1524 self.write_line_break() 1525 breaks = True 1526 start = end = 0 1527 while end <= len(text): 1528 ch = None 1529 if end < len(text): 1530 ch = text[end] 1531 if breaks: 1532 if ch is None or ch not in u'\n\x85\u2028\u2029': 1533 for br in text[start:end]: 1534 if br == u'\n': 1535 self.write_line_break() 1536 else: 1537 self.write_line_break(br) 1538 if ch is not None: 1539 if self.root_context: 1540 idnx = self.indent if self.indent is not None else 0 1541 self.stream.write(u' ' * (_indent + idnx)) 1542 else: 1543 self.write_indent() 1544 start = end 1545 else: 1546 if ch is None or ch in u'\n\x85\u2028\u2029': 1547 data = text[start:end] 1548 if bool(self.encoding): 1549 data = data.encode(self.encoding) 1550 self.stream.write(data) 1551 if ch is None: 1552 self.write_line_break() 1553 start = end 1554 if ch is not None: 1555 breaks = ch in u'\n\x85\u2028\u2029' 1556 end += 1 1557 1558 def write_plain(self, text, split=True): 1559 # type: (Any, Any) -> None 1560 if self.root_context: 1561 if self.requested_indent is not None: 1562 self.write_line_break() 1563 if self.requested_indent != 0: 1564 self.write_indent() 1565 else: 1566 self.open_ended = True 1567 if not text: 1568 return 1569 if not self.whitespace: 1570 data = u' ' 1571 self.column += len(data) 1572 if self.encoding: 1573 data = data.encode(self.encoding) 1574 self.stream.write(data) 1575 self.whitespace = False 1576 self.indention = False 1577 spaces = False 1578 breaks = False 1579 start = end = 0 1580 while end <= len(text): 1581 ch = None 1582 if end < len(text): 1583 ch = text[end] 1584 if spaces: 1585 if ch != u' ': 1586 if start + 1 == end and self.column > self.best_width and split: 1587 self.write_indent() 1588 self.whitespace = False 1589 self.indention = False 1590 else: 1591 data = text[start:end] 1592 self.column += len(data) 1593 if self.encoding: 1594 data = data.encode(self.encoding) 1595 self.stream.write(data) 1596 start = end 1597 elif breaks: 1598 if ch not in u'\n\x85\u2028\u2029': # type: ignore 1599 if text[start] == u'\n': 1600 self.write_line_break() 1601 for br in text[start:end]: 1602 if br == u'\n': 1603 self.write_line_break() 1604 else: 1605 self.write_line_break(br) 1606 self.write_indent() 1607 self.whitespace = False 1608 self.indention = False 1609 start = end 1610 else: 1611 if ch is None or ch in u' \n\x85\u2028\u2029': 1612 data = text[start:end] 1613 self.column += len(data) 1614 if self.encoding: 1615 data = data.encode(self.encoding) 1616 try: 1617 self.stream.write(data) 1618 except: # NOQA 1619 sys.stdout.write(repr(data) + '\n') 1620 raise 1621 start = end 1622 if ch is not None: 1623 spaces = ch == u' ' 1624 breaks = ch in u'\n\x85\u2028\u2029' 1625 end += 1 1626 1627 def write_comment(self, comment, pre=False): 1628 # type: (Any, bool) -> None 1629 value = comment.value 1630 # nprintf('{:02d} {:02d} {!r}'.format(self.column, comment.start_mark.column, value)) 1631 if not pre and value[-1] == '\n': 1632 value = value[:-1] 1633 try: 1634 # get original column position 1635 col = comment.start_mark.column 1636 if comment.value and comment.value.startswith('\n'): 1637 # never inject extra spaces if the comment starts with a newline 1638 # and not a real comment (e.g. if you have an empty line following a key-value 1639 col = self.column 1640 elif col < self.column + 1: 1641 ValueError 1642 except ValueError: 1643 col = self.column + 1 1644 # nprint('post_comment', self.line, self.column, value) 1645 try: 1646 # at least one space if the current column >= the start column of the comment 1647 # but not at the start of a line 1648 nr_spaces = col - self.column 1649 if self.column and value.strip() and nr_spaces < 1 and value[0] != '\n': 1650 nr_spaces = 1 1651 value = ' ' * nr_spaces + value 1652 try: 1653 if bool(self.encoding): 1654 value = value.encode(self.encoding) 1655 except UnicodeDecodeError: 1656 pass 1657 self.stream.write(value) 1658 except TypeError: 1659 raise 1660 if not pre: 1661 self.write_line_break() 1662 1663 def write_pre_comment(self, event): 1664 # type: (Any) -> bool 1665 comments = event.comment[1] 1666 if comments is None: 1667 return False 1668 try: 1669 start_events = (MappingStartEvent, SequenceStartEvent) 1670 for comment in comments: 1671 if isinstance(event, start_events) and getattr(comment, 'pre_done', None): 1672 continue 1673 if self.column != 0: 1674 self.write_line_break() 1675 self.write_comment(comment, pre=True) 1676 if isinstance(event, start_events): 1677 comment.pre_done = True 1678 except TypeError: 1679 sys.stdout.write('eventtt {} {}'.format(type(event), event)) 1680 raise 1681 return True 1682 1683 def write_post_comment(self, event): 1684 # type: (Any) -> bool 1685 if self.event.comment[0] is None: 1686 return False 1687 comment = event.comment[0] 1688 self.write_comment(comment) 1689 return True 1690