1# coding: utf-8 2 3from __future__ import print_function 4 5# partially from package six by Benjamin Peterson 6 7import sys 8import os 9import types 10import traceback 11from abc import abstractmethod 12 13 14# fmt: off 15if False: # MYPY 16 from typing import Any, Dict, Optional, List, Union, BinaryIO, IO, Text, Tuple # NOQA 17 from typing import Optional # NOQA 18# fmt: on 19 20_DEFAULT_YAML_VERSION = (1, 2) 21 22try: 23 from ruamel.ordereddict import ordereddict 24except: # NOQA 25 try: 26 from collections import OrderedDict 27 except ImportError: 28 from ordereddict import OrderedDict # type: ignore 29 # to get the right name import ... as ordereddict doesn't do that 30 31 class ordereddict(OrderedDict): # type: ignore 32 if not hasattr(OrderedDict, 'insert'): 33 34 def insert(self, pos, key, value): 35 # type: (int, Any, Any) -> None 36 if pos >= len(self): 37 self[key] = value 38 return 39 od = ordereddict() 40 od.update(self) 41 for k in od: 42 del self[k] 43 for index, old_key in enumerate(od): 44 if pos == index: 45 self[key] = value 46 self[old_key] = od[old_key] 47 48 49PY2 = sys.version_info[0] == 2 50PY3 = sys.version_info[0] == 3 51 52 53if PY3: 54 55 def utf8(s): 56 # type: (str) -> str 57 return s 58 59 def to_str(s): 60 # type: (str) -> str 61 return s 62 63 def to_unicode(s): 64 # type: (str) -> str 65 return s 66 67 68else: 69 if False: 70 unicode = str 71 72 def utf8(s): 73 # type: (unicode) -> str 74 return s.encode('utf-8') 75 76 def to_str(s): 77 # type: (str) -> str 78 return str(s) 79 80 def to_unicode(s): 81 # type: (str) -> unicode 82 return unicode(s) # NOQA 83 84 85if PY3: 86 string_types = str 87 integer_types = int 88 class_types = type 89 text_type = str 90 binary_type = bytes 91 92 MAXSIZE = sys.maxsize 93 unichr = chr 94 import io 95 96 StringIO = io.StringIO 97 BytesIO = io.BytesIO 98 # have unlimited precision 99 no_limit_int = int 100 from collections.abc import Hashable, MutableSequence, MutableMapping, Mapping # NOQA 101 102else: 103 string_types = basestring # NOQA 104 integer_types = (int, long) # NOQA 105 class_types = (type, types.ClassType) 106 text_type = unicode # NOQA 107 binary_type = str 108 109 # to allow importing 110 unichr = unichr 111 from StringIO import StringIO as _StringIO 112 113 StringIO = _StringIO 114 import cStringIO 115 116 BytesIO = cStringIO.StringIO 117 # have unlimited precision 118 no_limit_int = long # NOQA not available on Python 3 119 from collections import Hashable, MutableSequence, MutableMapping, Mapping # NOQA 120 121if False: # MYPY 122 # StreamType = Union[BinaryIO, IO[str], IO[unicode], StringIO] 123 # StreamType = Union[BinaryIO, IO[str], StringIO] # type: ignore 124 StreamType = Any 125 126 StreamTextType = StreamType # Union[Text, StreamType] 127 VersionType = Union[List[int], str, Tuple[int, int]] 128 129if PY3: 130 builtins_module = 'builtins' 131else: 132 builtins_module = '__builtin__' 133 134UNICODE_SIZE = 4 if sys.maxunicode > 65535 else 2 135 136 137def with_metaclass(meta, *bases): 138 # type: (Any, Any) -> Any 139 """Create a base class with a metaclass.""" 140 return meta('NewBase', bases, {}) 141 142 143DBG_TOKEN = 1 144DBG_EVENT = 2 145DBG_NODE = 4 146 147 148_debug = None # type: Optional[int] 149if 'RUAMELDEBUG' in os.environ: 150 _debugx = os.environ.get('RUAMELDEBUG') 151 if _debugx is None: 152 _debug = 0 153 else: 154 _debug = int(_debugx) 155 156 157if bool(_debug): 158 159 class ObjectCounter(object): 160 def __init__(self): 161 # type: () -> None 162 self.map = {} # type: Dict[Any, Any] 163 164 def __call__(self, k): 165 # type: (Any) -> None 166 self.map[k] = self.map.get(k, 0) + 1 167 168 def dump(self): 169 # type: () -> None 170 for k in sorted(self.map): 171 sys.stdout.write('{} -> {}'.format(k, self.map[k])) 172 173 object_counter = ObjectCounter() 174 175 176# used from yaml util when testing 177def dbg(val=None): 178 # type: (Any) -> Any 179 global _debug 180 if _debug is None: 181 # set to true or false 182 _debugx = os.environ.get('YAMLDEBUG') 183 if _debugx is None: 184 _debug = 0 185 else: 186 _debug = int(_debugx) 187 if val is None: 188 return _debug 189 return _debug & val 190 191 192class Nprint(object): 193 def __init__(self, file_name=None): 194 # type: (Any) -> None 195 self._max_print = None # type: Any 196 self._count = None # type: Any 197 self._file_name = file_name 198 199 def __call__(self, *args, **kw): 200 # type: (Any, Any) -> None 201 if not bool(_debug): 202 return 203 out = sys.stdout if self._file_name is None else open(self._file_name, 'a') 204 dbgprint = print # to fool checking for print statements by dv utility 205 kw1 = kw.copy() 206 kw1['file'] = out 207 dbgprint(*args, **kw1) 208 out.flush() 209 if self._max_print is not None: 210 if self._count is None: 211 self._count = self._max_print 212 self._count -= 1 213 if self._count == 0: 214 dbgprint('forced exit\n') 215 traceback.print_stack() 216 out.flush() 217 sys.exit(0) 218 if self._file_name: 219 out.close() 220 221 def set_max_print(self, i): 222 # type: (int) -> None 223 self._max_print = i 224 self._count = None 225 226 227nprint = Nprint() 228nprintf = Nprint('/var/tmp/ruamel.yaml.log') 229 230# char checkers following production rules 231 232 233def check_namespace_char(ch): 234 # type: (Any) -> bool 235 if u'\x21' <= ch <= u'\x7E': # ! to ~ 236 return True 237 if u'\xA0' <= ch <= u'\uD7FF': 238 return True 239 if (u'\uE000' <= ch <= u'\uFFFD') and ch != u'\uFEFF': # excl. byte order mark 240 return True 241 if u'\U00010000' <= ch <= u'\U0010FFFF': 242 return True 243 return False 244 245 246def check_anchorname_char(ch): 247 # type: (Any) -> bool 248 if ch in u',[]{}': 249 return False 250 return check_namespace_char(ch) 251 252 253def version_tnf(t1, t2=None): 254 # type: (Any, Any) -> Any 255 """ 256 return True if ruamel.yaml version_info < t1, None if t2 is specified and bigger else False 257 """ 258 from ruamel.yaml import version_info # NOQA 259 260 if version_info < t1: 261 return True 262 if t2 is not None and version_info < t2: 263 return None 264 return False 265 266 267class MutableSliceableSequence(MutableSequence): # type: ignore 268 __slots__ = () 269 270 def __getitem__(self, index): 271 # type: (Any) -> Any 272 if not isinstance(index, slice): 273 return self.__getsingleitem__(index) 274 return type(self)([self[i] for i in range(*index.indices(len(self)))]) # type: ignore 275 276 def __setitem__(self, index, value): 277 # type: (Any, Any) -> None 278 if not isinstance(index, slice): 279 return self.__setsingleitem__(index, value) 280 assert iter(value) 281 # nprint(index.start, index.stop, index.step, index.indices(len(self))) 282 if index.step is None: 283 del self[index.start : index.stop] 284 for elem in reversed(value): 285 self.insert(0 if index.start is None else index.start, elem) 286 else: 287 range_parms = index.indices(len(self)) 288 nr_assigned_items = (range_parms[1] - range_parms[0] - 1) // range_parms[2] + 1 289 # need to test before changing, in case TypeError is caught 290 if nr_assigned_items < len(value): 291 raise TypeError( 292 'too many elements in value {} < {}'.format(nr_assigned_items, len(value)) 293 ) 294 elif nr_assigned_items > len(value): 295 raise TypeError( 296 'not enough elements in value {} > {}'.format( 297 nr_assigned_items, len(value) 298 ) 299 ) 300 for idx, i in enumerate(range(*range_parms)): 301 self[i] = value[idx] 302 303 def __delitem__(self, index): 304 # type: (Any) -> None 305 if not isinstance(index, slice): 306 return self.__delsingleitem__(index) 307 # nprint(index.start, index.stop, index.step, index.indices(len(self))) 308 for i in reversed(range(*index.indices(len(self)))): 309 del self[i] 310 311 @abstractmethod 312 def __getsingleitem__(self, index): 313 # type: (Any) -> Any 314 raise IndexError 315 316 @abstractmethod 317 def __setsingleitem__(self, index, value): 318 # type: (Any, Any) -> None 319 raise IndexError 320 321 @abstractmethod 322 def __delsingleitem__(self, index): 323 # type: (Any) -> None 324 raise IndexError 325