# coding: utf-8

from __future__ import absolute_import

from ruamel.yaml.error import YAMLError
from ruamel.yaml.compat import nprint, DBG_NODE, dbg, string_types, nprintf  # NOQA
from ruamel.yaml.util import RegExp

from ruamel.yaml.events import (
    StreamStartEvent,
    StreamEndEvent,
    MappingStartEvent,
    MappingEndEvent,
    SequenceStartEvent,
    SequenceEndEvent,
    AliasEvent,
    ScalarEvent,
    DocumentStartEvent,
    DocumentEndEvent,
)
from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode

if False:  # MYPY
    from typing import Any, Dict, Union, Text, Optional  # NOQA
    from ruamel.yaml.compat import VersionType  # NOQA

__all__ = ['Serializer', 'SerializerError']


class SerializerError(YAMLError):
    """Raised when the serializer is used in the wrong open/closed state."""

    pass


class Serializer(object):
    """Walk a node graph and feed the corresponding events to the emitter.

    The serializer goes through three states, tracked in ``self.closed``:
    ``None`` (never opened), ``False`` (open) and ``True`` (closed).
    """

    # 'id' and 3+ numbers, but not 000
    ANCHOR_TEMPLATE = u'id%03d'
    ANCHOR_RE = RegExp(u'id(?!000$)\\d{3,}')

    def __init__(
        self,
        encoding=None,
        explicit_start=None,
        explicit_end=None,
        version=None,
        tags=None,
        dumper=None,
    ):
        # type: (Any, Optional[bool], Optional[bool], Optional[VersionType], Any, Any) -> None  # NOQA
        """Store the dump options and register this serializer on ``dumper``.

        ``version`` may be given as a dotted string (e.g. ``'1.2'``), in which
        case it is converted to a tuple of ints; any other value is stored
        unchanged.
        """
        self.dumper = dumper
        if self.dumper is not None:
            self.dumper._serializer = self
        self.use_encoding = encoding
        self.use_explicit_start = explicit_start
        self.use_explicit_end = explicit_end
        if isinstance(version, string_types):
            self.use_version = tuple(map(int, version.split('.')))
        else:
            self.use_version = version  # type: ignore
        self.use_tags = tags
        # nodes for which events have already been emitted in this document
        self.serialized_nodes = {}  # type: Dict[Any, Any]
        # node -> anchor name (None while the node has been seen only once)
        self.anchors = {}  # type: Dict[Any, Any]
        self.last_anchor_id = 0
        self.closed = None  # type: Optional[bool]
        self._templated_id = None

    @property
    def emitter(self):
        # type: () -> Any
        """Return the emitter of the dumper this serializer is attached to."""
        if hasattr(self.dumper, 'typ'):
            return self.dumper.emitter
        return self.dumper._emitter

    @property
    def resolver(self):
        # type: () -> Any
        """Return the resolver of the dumper this serializer is attached to."""
        if hasattr(self.dumper, 'typ'):
            # NOTE(review): the value of this attribute access is discarded;
            # presumably it is done for its side effect (making sure
            # `_resolver` exists on the dumper) -- confirm against the dumper.
            self.dumper.resolver
        return self.dumper._resolver

    def open(self):
        # type: () -> None
        """Emit the stream start event and mark the serializer as open.

        Raises SerializerError if the serializer was already opened or closed.
        """
        if self.closed is None:
            self.emitter.emit(StreamStartEvent(encoding=self.use_encoding))
            self.closed = False
        elif self.closed:
            raise SerializerError('serializer is closed')
        else:
            raise SerializerError('serializer is already opened')

    def close(self):
        # type: () -> None
        """Emit the stream end event and mark the serializer as closed.

        Closing an already closed serializer is a no-op.
        Raises SerializerError if the serializer was never opened.
        """
        if self.closed is None:
            raise SerializerError('serializer is not opened')
        elif not self.closed:
            self.emitter.emit(StreamEndEvent())
            self.closed = True

    # def __del__(self):
    #     self.close()

    def serialize(self, node):
        # type: (Any) -> None
        """Emit one complete document for the node graph rooted at ``node``.

        Raises SerializerError if the serializer is not currently open.
        """
        if dbg(DBG_NODE):
            nprint('Serializing nodes')
            node.dump()
        if self.closed is None:
            raise SerializerError('serializer is not opened')
        elif self.closed:
            raise SerializerError('serializer is closed')
        self.emitter.emit(
            DocumentStartEvent(
                explicit=self.use_explicit_start, version=self.use_version, tags=self.use_tags
            )
        )
        # first pass decides which nodes need an anchor, second pass emits
        self.anchor_node(node)
        self.serialize_node(node, None, None)
        self.emitter.emit(DocumentEndEvent(explicit=self.use_explicit_end))
        # per-document state: reset so the next document starts fresh
        self.serialized_nodes = {}
        self.anchors = {}
        self.last_anchor_id = 0

    def anchor_node(self, node):
        # type: (Any) -> None
        """Record ``node`` (and, recursively, its children) in ``self.anchors``.

        A node encountered for the first time is stored with its own anchor
        value when ``node.anchor.always_dump`` is true, otherwise with None.
        A node encountered again that still has no anchor gets one from
        generate_anchor(), so that later occurrences can be emitted as aliases.
        """
        if node in self.anchors:
            if self.anchors[node] is None:
                self.anchors[node] = self.generate_anchor(node)
        else:
            anchor = None
            try:
                if node.anchor.always_dump:
                    anchor = node.anchor.value
            except Exception:
                # best effort: the node has no usable anchor object; unlike a
                # bare `except:` this lets KeyboardInterrupt/SystemExit through
                pass
            self.anchors[node] = anchor
            if isinstance(node, SequenceNode):
                for item in node.value:
                    self.anchor_node(item)
            elif isinstance(node, MappingNode):
                for key, value in node.value:
                    self.anchor_node(key)
                    self.anchor_node(value)

    def generate_anchor(self, node):
        # type: (Any) -> Any
        """Return the anchor name to use for ``node``.

        The node's own ``anchor.value`` is used when it is available and not
        None; otherwise the next name from ANCHOR_TEMPLATE is generated.
        """
        try:
            anchor = node.anchor.value
        except Exception:
            # best effort: no usable anchor object on the node; unlike a bare
            # `except:` this lets KeyboardInterrupt/SystemExit through
            anchor = None
        if anchor is None:
            self.last_anchor_id += 1
            return self.ANCHOR_TEMPLATE % self.last_anchor_id
        return anchor

    def serialize_node(self, node, parent, index):
        # type: (Any, Any, Any) -> None
        """Emit the events for ``node``, recursing into sequences and mappings.

        A node that has already been serialized in this document is emitted as
        an alias. ``parent`` and ``index`` are handed to the resolver's
        descend_resolver().
        """
        alias = self.anchors[node]
        if node in self.serialized_nodes:
            self.emitter.emit(AliasEvent(alias))
        else:
            self.serialized_nodes[node] = True
            self.resolver.descend_resolver(parent, index)
            if isinstance(node, ScalarNode):
                # here check if the node.tag equals the one that would result from parsing
                # if not equal quoting is necessary for strings
                detected_tag = self.resolver.resolve(ScalarNode, node.value, (True, False))
                default_tag = self.resolver.resolve(ScalarNode, node.value, (False, True))
                implicit = (
                    (node.tag == detected_tag),
                    (node.tag == default_tag),
                    node.tag.startswith('tag:yaml.org,2002:'),
                )
                self.emitter.emit(
                    ScalarEvent(
                        alias,
                        node.tag,
                        implicit,
                        node.value,
                        style=node.style,
                        comment=node.comment,
                    )
                )
            elif isinstance(node, SequenceNode):
                implicit = node.tag == self.resolver.resolve(SequenceNode, node.value, True)
                comment = node.comment
                end_comment = None
                seq_comment = None
                if node.flow_style is True:
                    if comment:  # eol comment on flow style sequence
                        seq_comment = comment[0]
                        # comment[0] = None
                if comment and len(comment) > 2:
                    end_comment = comment[2]
                self.emitter.emit(
                    SequenceStartEvent(
                        alias,
                        node.tag,
                        implicit,
                        flow_style=node.flow_style,
                        comment=node.comment,
                    )
                )
                for index, item in enumerate(node.value):
                    self.serialize_node(item, node, index)
                self.emitter.emit(SequenceEndEvent(comment=[seq_comment, end_comment]))
            elif isinstance(node, MappingNode):
                implicit = node.tag == self.resolver.resolve(MappingNode, node.value, True)
                comment = node.comment
                end_comment = None
                map_comment = None
                if node.flow_style is True:
                    if comment:  # eol comment on flow style mapping
                        map_comment = comment[0]
                        # comment[0] = None
                if comment and len(comment) > 2:
                    end_comment = comment[2]
                self.emitter.emit(
                    MappingStartEvent(
                        alias,
                        node.tag,
                        implicit,
                        flow_style=node.flow_style,
                        comment=node.comment,
                        nr_items=len(node.value),
                    )
                )
                for key, value in node.value:
                    # keys are resolved with index None, values with their key
                    self.serialize_node(key, node, None)
                    self.serialize_node(value, node, key)
                self.emitter.emit(MappingEndEvent(comment=[map_comment, end_comment]))
            self.resolver.ascend_resolver()


def templated_id(s):
    # type: (Text) -> Any
    """Return the result of matching ``s`` against Serializer.ANCHOR_RE.

    The pattern accepts 'id' followed by three or more digits, except 'id000'.
    """
    return Serializer.ANCHOR_RE.match(s)