# coding: utf-8

from __future__ import print_function

# partially from package six by Benjamin Peterson

import sys
import os
import types
import traceback
from abc import abstractmethod
from collections import OrderedDict  # type: ignore


# fmt: off
if False:  # MYPY
    from typing import Any, Dict, Optional, List, Union, BinaryIO, IO, Text, Tuple  # NOQA
    from typing import Optional  # NOQA
# fmt: on

_DEFAULT_YAML_VERSION = (1, 2)


class ordereddict(OrderedDict):  # type: ignore
    """OrderedDict that gains an ``insert`` method when the base class has none."""

    if not hasattr(OrderedDict, "insert"):

        def insert(self, pos, key, value):
            # type: (int, Any, Any) -> None
            """Place ``key: value`` at position ``pos``.

            A ``pos`` at or beyond the current length simply appends.  Otherwise
            the mapping is emptied and rebuilt from a copy, slotting the new key
            in just before the entry that used to be at index ``pos``.
            """
            if pos >= len(self):
                self[key] = value
                return
            od = ordereddict()
            od.update(self)
            for k in od:
                del self[k]
            for index, old_key in enumerate(od):
                if pos == index:
                    self[key] = value
                self[old_key] = od[old_key]


PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3


if PY3:

    def utf8(s):
        # type: (str) -> str
        """Return ``s`` unchanged (Python 3 branch)."""
        return s

    def to_str(s):
        # type: (str) -> str
        """Return ``s`` unchanged (Python 3 branch)."""
        return s

    def to_unicode(s):
        # type: (str) -> str
        """Return ``s`` unchanged (Python 3 branch)."""
        return s


else:
    if False:
        unicode = str

    def utf8(s):
        # type: (unicode) -> str
        """Encode ``s`` as UTF-8 (Python 2 branch)."""
        return s.encode("utf-8")

    def to_str(s):
        # type: (str) -> str
        """Convert ``s`` with ``str()`` (Python 2 branch)."""
        return str(s)

    def to_unicode(s):
        # type: (str) -> unicode
        """Convert ``s`` with ``unicode()`` (Python 2 branch)."""
        return unicode(s)  # NOQA


# Version-neutral aliases for the basic types and the in-memory stream classes.
if PY3:
    string_types = str
    integer_types = int
    class_types = type
    text_type = str
    binary_type = bytes

    MAXSIZE = sys.maxsize
    unichr = chr
    import io

    StringIO = io.StringIO
    BytesIO = io.BytesIO
    # have unlimited precision
    no_limit_int = int
    from collections.abc import (
        Hashable,
        MutableSequence,
        MutableMapping,
        Mapping,
    )  # NOQA

else:
    string_types = basestring  # NOQA
    integer_types = (int, long)  # NOQA
    class_types = (type, types.ClassType)
    text_type = unicode  # NOQA
    binary_type = str

    # to allow importing
    unichr = unichr
    from StringIO import StringIO as _StringIO

    StringIO = _StringIO
    import cStringIO

    BytesIO = cStringIO.StringIO
    # have unlimited precision
    no_limit_int = long  # NOQA not available on Python 3
    from collections import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA

if False:  # MYPY
    # StreamType = Union[BinaryIO, IO[str], IO[unicode], StringIO]
    # StreamType = Union[BinaryIO, IO[str], StringIO]  # type: ignore
    StreamType = Any

    StreamTextType = StreamType  # Union[Text, StreamType]
    VersionType = Union[List[int], str, Tuple[int, int]]

if PY3:
    builtins_module = "builtins"
else:
    builtins_module = "__builtin__"

UNICODE_SIZE = 4 if sys.maxunicode > 65535 else 2


def with_metaclass(meta, *bases):
    # type: (Any, Any) -> Any
    """Create a base class with a metaclass."""
    return meta("NewBase", bases, {})


# Bit flags meant to be combined with the debug level (see ``dbg``).
DBG_TOKEN = 1
DBG_EVENT = 2
DBG_NODE = 4


# Debug level: None until resolved, otherwise an int taken from the environment.
_debug = None  # type: Optional[int]
if "RUAMELDEBUG" in os.environ:
    _debugx = os.environ.get("RUAMELDEBUG")
    if _debugx is None:
        _debug = 0
    else:
        _debug = int(_debugx)


if bool(_debug):

    class ObjectCounter(object):
        """Tally how often each key is reported (only defined when debugging)."""

        def __init__(self):
            # type: () -> None
            self.map = {}  # type: Dict[Any, Any]

        def __call__(self, k):
            # type: (Any) -> None
            """Increment the counter kept for ``k``."""
            self.map[k] = self.map.get(k, 0) + 1

        def dump(self):
            # type: () -> None
            """Write every ``key -> count`` pair to stdout in sorted key order."""
            # NOTE(review): no separator is written between entries, so they
            # run together on one line -- confirm whether a newline is wanted.
            for k in sorted(self.map):
                sys.stdout.write("{} -> {}".format(k, self.map[k]))

    object_counter = ObjectCounter()


# used from yaml util when testing
def dbg(val=None):
    # type: (Any) -> Any
    """Return the debug level, or that level masked with ``val``.

    When the level has not been resolved yet it is read (once) from the
    ``YAMLDEBUG`` environment variable, defaulting to 0.
    """
    # NOTE(review): module import consults RUAMELDEBUG while this fallback
    # consults YAMLDEBUG -- confirm the differing names are intentional.
    global _debug
    if _debug is None:
        # set to true or false
        _debugx = os.environ.get("YAMLDEBUG")
        if _debugx is None:
            _debug = 0
        else:
            _debug = int(_debugx)
    if val is None:
        return _debug
    return _debug & val


class Nprint(object):
    """Debug printer: a no-op unless the module debug level is truthy.

    Output goes to stdout, or is appended to ``file_name`` when one is given.
    """

    def __init__(self, file_name=None):
        # type: (Any) -> None
        self._max_print = None  # type: Any
        self._count = None  # type: Any
        self._file_name = file_name

    def __call__(self, *args, **kw):
        # type: (Any, Any) -> None
        """Print ``args`` like ``print`` does, honouring the print budget.

        Once a budget set via ``set_max_print`` is used up, a stack trace is
        printed and the process exits through ``sys.exit(0)``.
        """
        if not bool(_debug):
            return
        to_file = self._file_name is not None
        out = open(self._file_name, "a") if to_file else sys.stdout
        # try/finally so the per-call log file is closed even if printing
        # raises or the forced exit below is triggered.
        try:
            dbgprint = print  # to fool checking for print statements by dv utility
            kw1 = kw.copy()
            kw1["file"] = out
            dbgprint(*args, **kw1)
            out.flush()
            if self._max_print is not None:
                if self._count is None:
                    self._count = self._max_print
                self._count -= 1
                if self._count == 0:
                    dbgprint("forced exit\n")
                    traceback.print_stack()
                    out.flush()
                    sys.exit(0)
        finally:
            if to_file:
                out.close()

    def set_max_print(self, i):
        # type: (int) -> None
        """Allow ``i`` more prints before the forced exit; restarts the count."""
        self._max_print = i
        self._count = None


nprint = Nprint()
nprintf = Nprint("/var/tmp/srsly.ruamel_yaml.log")

# char checkers following production rules


def check_namespace_char(ch):
    # type: (Any) -> bool
    """Return True when ``ch`` lies in one of the accepted character ranges."""
    if u"\x21" <= ch <= u"\x7E":  # ! to ~
        return True
    if u"\xA0" <= ch <= u"\uD7FF":
        return True
    if (u"\uE000" <= ch <= u"\uFFFD") and ch != u"\uFEFF":  # excl. byte order mark
        return True
    if u"\U00010000" <= ch <= u"\U0010FFFF":
        return True
    return False


def check_anchorname_char(ch):
    # type: (Any) -> bool
    """Like ``check_namespace_char`` but additionally rejects ``,[]{}``."""
    if ch in u",[]{}":
        return False
    return check_namespace_char(ch)


def version_tnf(t1, t2=None):
    # type: (Any, Any) -> Any
    """
    return True if srsly.ruamel_yaml version_info < t1, None if t2 is specified and bigger else False
    """
    from srsly.ruamel_yaml import version_info  # NOQA

    if version_info < t1:
        return True
    if t2 is not None and version_info < t2:
        return None
    return False


class MutableSliceableSequence(MutableSequence):  # type: ignore
    """MutableSequence mixin that adds slice support on top of single-item hooks.

    Subclasses provide ``__getsingleitem__``, ``__setsingleitem__`` and
    ``__delsingleitem__`` (plus ``__len__`` and ``insert``); slices are
    decomposed here into calls to those hooks.
    """

    __slots__ = ()

    def __getitem__(self, index):
        # type: (Any) -> Any
        """Return one item, or a new ``type(self)`` built from the sliced items."""
        if not isinstance(index, slice):
            return self.__getsingleitem__(index)
        return type(self)(
            [self[i] for i in range(*index.indices(len(self)))]
        )  # type: ignore

    def __setitem__(self, index, value):
        # type: (Any, Any) -> None
        """Assign one item or a slice.

        A slice without a step may change the length of the sequence.  A slice
        with a step needs exactly as many values as positions selected and
        raises ``TypeError`` otherwise (checked before anything is modified).
        ``value`` may be any iterable; a non-iterable raises ``TypeError``.
        """
        if not isinstance(index, slice):
            return self.__setsingleitem__(index, value)
        # Snapshot the replacement: validates iterability even under -O,
        # accepts one-shot iterators, and supports len()/indexing below.
        value = list(value)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        if index.step is None:
            # Normalise before deleting: a negative or missing start must be
            # resolved against the original length, not the shortened one.
            start, stop, _ = index.indices(len(self))
            del self[start:stop]
            for elem in reversed(value):
                self.insert(start, elem)
        else:
            positions = range(*index.indices(len(self)))
            # len(range) is correct for negative steps and empty ranges too
            nr_assigned_items = len(positions)
            # need to test before changing, in case TypeError is caught
            if nr_assigned_items < len(value):
                raise TypeError(
                    "too many elements in value {} < {}".format(
                        nr_assigned_items, len(value)
                    )
                )
            elif nr_assigned_items > len(value):
                raise TypeError(
                    "not enough elements in value {} > {}".format(
                        nr_assigned_items, len(value)
                    )
                )
            for idx, i in enumerate(positions):
                self[i] = value[idx]

    def __delitem__(self, index):
        # type: (Any) -> None
        """Delete one item, or every position selected by a slice."""
        if not isinstance(index, slice):
            return self.__delsingleitem__(index)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        # walk backwards so earlier deletions do not shift later targets
        for i in reversed(range(*index.indices(len(self)))):
            del self[i]

    @abstractmethod
    def __getsingleitem__(self, index):
        # type: (Any) -> Any
        """Return the item at a non-slice ``index`` (subclass hook)."""
        raise IndexError

    @abstractmethod
    def __setsingleitem__(self, index, value):
        # type: (Any, Any) -> None
        """Store ``value`` at a non-slice ``index`` (subclass hook)."""
        raise IndexError

    @abstractmethod
    def __delsingleitem__(self, index):
        # type: (Any) -> None
        """Remove the item at a non-slice ``index`` (subclass hook)."""
        raise IndexError