1# coding: utf-8
2
3from __future__ import print_function
4
5# partially from package six by Benjamin Peterson
6
7import sys
8import os
9import types
10import traceback
11from abc import abstractmethod
12from collections import OrderedDict  # type: ignore
13
14
15# fmt: off
16if False:  # MYPY
17    from typing import Any, Dict, Optional, List, Union, BinaryIO, IO, Text, Tuple  # NOQA
18    from typing import Optional  # NOQA
19# fmt: on
20
# Presumably the YAML specification version, as a (major, minor) tuple, that is
# assumed when nothing else is specified -- TODO confirm against the users of
# this constant.
_DEFAULT_YAML_VERSION = (1, 2)
22
23
class ordereddict(OrderedDict):  # type: ignore
    """OrderedDict that gains a positional ``insert`` when the base lacks one."""

    if not hasattr(OrderedDict, "insert"):

        def insert(self, pos, key, value):
            # type: (int, Any, Any) -> None
            """Insert ``key: value`` before the entry currently at index `pos`.

            `pos` follows ``list.insert`` conventions: a negative value counts
            from the end (clamped to 0) and a value at or beyond the current
            length appends.  If `key` is already present and `pos` is inside
            the mapping, the entry is moved to the requested position and gets
            the new `value`; when `pos` is at or beyond the length the existing
            entry is updated in place.
            """
            size = len(self)
            if pos < 0:
                # without this a negative pos never matched an index below and
                # the new entry was silently dropped
                pos = max(0, pos + size)
            if pos >= size:
                self[key] = value
                return
            # rebuild the mapping entry by entry; going through del/setitem on
            # self keeps any hooks defined by subclasses in play
            od = ordereddict()
            od.update(self)
            for k in od:
                del self[k]
            for index, old_key in enumerate(od):
                if pos == index:
                    self[key] = value
                # do not write the old value back over the freshly inserted one
                if old_key is not key and old_key != key:
                    self[old_key] = od[old_key]
40
41
# major-version flags of the running interpreter; they select the branches below
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3


if not PY3:
    # never executed: only gives static checkers a binding for ``unicode``
    if False:
        unicode = str

    def utf8(s):
        # type: (unicode) -> str
        """Return `s` encoded as a UTF-8 byte string."""
        return s.encode("utf-8")

    def to_str(s):
        # type: (str) -> str
        """Return ``str(s)``."""
        return str(s)

    def to_unicode(s):
        # type: (str) -> unicode
        """Return ``unicode(s)``."""
        return unicode(s)  # NOQA


else:
    # on Python 3 all three helpers hand their argument back unchanged

    def utf8(s):
        # type: (str) -> str
        """Return `s` unchanged."""
        return s

    def to_str(s):
        # type: (str) -> str
        """Return `s` unchanged."""
        return s

    def to_unicode(s):
        # type: (str) -> str
        """Return `s` unchanged."""
        return s
76
77
# Python 2/3 compatibility aliases: each name below is bound to the type,
# function or class of the running interpreter so the same spelling can be
# used on both major versions.
if PY3:
    # each alias is a single type here; the Python 2 branch uses tuples for
    # integer_types and class_types
    string_types = str
    integer_types = int
    class_types = type
    text_type = str
    binary_type = bytes

    # NOTE(review): MAXSIZE is only bound in this branch, the Python 2 branch
    # below does not define it
    MAXSIZE = sys.maxsize
    unichr = chr
    import io

    StringIO = io.StringIO
    BytesIO = io.BytesIO
    # have unlimited precision
    no_limit_int = int
    from collections.abc import (
        Hashable,
        MutableSequence,
        MutableMapping,
        Mapping,
    )  # NOQA

else:
    string_types = basestring  # NOQA
    integer_types = (int, long)  # NOQA
    class_types = (type, types.ClassType)
    text_type = unicode  # NOQA
    binary_type = str

    # to allow importing
    unichr = unichr
    from StringIO import StringIO as _StringIO

    StringIO = _StringIO
    import cStringIO

    # byte-oriented buffer comes from cStringIO on this branch
    BytesIO = cStringIO.StringIO
    # have unlimited precision
    no_limit_int = long  # NOQA not available on Python 3
    # the abstract base classes are imported from ``collections`` directly here
    from collections import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA
118
if False:  # MYPY
    # This branch never executes, so none of these aliases exist at runtime;
    # presumably they are only read by static type checkers through the
    # ``# type:`` comments used in this package -- confirm before relying on
    # them in executable code.
    # StreamType = Union[BinaryIO, IO[str], IO[unicode],  StringIO]
    # StreamType = Union[BinaryIO, IO[str], StringIO]  # type: ignore
    StreamType = Any

    StreamTextType = StreamType  # Union[Text, StreamType]
    # a version given as a list of ints, a string, or a pair of ints
    VersionType = Union[List[int], str, Tuple[int, int]]
126
# name of the module that holds the builtins on the running interpreter
builtins_module = "builtins" if PY3 else "__builtin__"

# 2 when sys.maxunicode is at most 65535, otherwise 4
UNICODE_SIZE = 2 if sys.maxunicode <= 65535 else 4
133
134
def with_metaclass(meta, *bases):
    # type: (Any, Any) -> Any
    """Return a class named "NewBase" built by calling `meta` with `bases`.

    The class is created with an empty namespace; deriving from the result
    gives a class whose metaclass is `meta`.
    """
    namespace = {}  # type: Dict[str, Any]
    new_base = meta("NewBase", bases, namespace)
    return new_base
139
140
# bit flags; a debug level is a bitwise OR of these values
DBG_TOKEN = 1 << 0
DBG_EVENT = 1 << 1
DBG_NODE = 1 << 2


# None means "not determined yet"; only set here when RUAMELDEBUG is present
_debug = None  # type: Optional[int]
if "RUAMELDEBUG" in os.environ:
    _debugx = os.environ.get("RUAMELDEBUG")
    _debug = 0 if _debugx is None else int(_debugx)
153
154
# the counter class and its shared instance only exist when debugging is on
if bool(_debug):

    class ObjectCounter(object):
        """Count how many times each key has been passed to the instance."""

        def __init__(self):
            # type: () -> None
            # key -> number of calls made with that key
            self.map = {}  # type: Dict[Any, Any]

        def __call__(self, k):
            # type: (Any) -> None
            """Increase the count stored for `k` by one."""
            self.map[k] = self.map.get(k, 0) + 1

        def dump(self):
            # type: () -> None
            """Write one ``key -> count`` line per key, in sorted key order."""
            for k in sorted(self.map):
                # write() adds no line terminator; without the explicit
                # newline all entries ran together on a single line
                sys.stdout.write("{} -> {}\n".format(k, self.map[k]))

    object_counter = ObjectCounter()
172
173
174# used from yaml util when testing
def dbg(val=None):
    # type: (Any) -> Any
    """Return the debug level, or that level masked with `val`.

    When the module-level ``_debug`` is still None it is initialised from the
    YAMLDEBUG environment variable (0 if the variable is unset) and kept for
    later calls.
    """
    # NOTE(review): this reads YAMLDEBUG whereas the module-level
    # initialisation of _debug reads RUAMELDEBUG -- confirm this is intended
    global _debug
    if _debug is None:
        # set to true or false
        from_env = os.environ.get("YAMLDEBUG")
        _debug = int(from_env) if from_env is not None else 0
    return _debug if val is None else _debug & val
188
189
class Nprint(object):
    """Debug printer that is a no-op unless the module-level ``_debug`` is truthy.

    Output goes to ``sys.stdout``, or is appended to `file_name` when one is
    given.  After ``set_max_print(n)`` the n-th subsequent print dumps the
    stack and exits the process.
    """

    def __init__(self, file_name=None):
        # type: (Any) -> None
        self._max_print = None  # type: Any
        self._count = None  # type: Any
        self._file_name = file_name

    def __call__(self, *args, **kw):
        # type: (Any, Any) -> None
        """Print `args` like ``print`` does; any ``file`` keyword is overridden.

        Raises SystemExit (code 0) when the limit set with ``set_max_print``
        is reached.
        """
        if not bool(_debug):
            return
        # one test decides both opening and closing, so a handle opened here
        # is always the one that gets closed
        to_file = self._file_name is not None
        out = open(self._file_name, "a") if to_file else sys.stdout
        try:
            dbgprint = print  # to fool checking for print statements by dv utility
            kw1 = kw.copy()
            kw1["file"] = out
            dbgprint(*args, **kw1)
            out.flush()
            if self._max_print is not None:
                if self._count is None:
                    self._count = self._max_print
                self._count -= 1
                if self._count == 0:
                    dbgprint("forced exit\n")
                    traceback.print_stack()
                    out.flush()
                    sys.exit(0)
        finally:
            # also reached when printing raises or on the forced exit above,
            # both of which previously left the log file open
            if to_file:
                out.close()

    def set_max_print(self, i):
        # type: (int) -> None
        """Allow `i` more prints before exiting; restarts the countdown."""
        self._max_print = i
        self._count = None
223
224
# shared debug printers: nprint writes to sys.stdout, nprintf appends to the
# fixed log file below; both print nothing unless _debug is truthy
nprint = Nprint()
nprintf = Nprint("/var/tmp/srsly.ruamel_yaml.log")
227
228# char checkers following production rules
229
230
def check_namespace_char(ch):
    # type: (Any) -> bool
    """Return True when `ch` falls in one of the accepted character ranges.

    Accepted are "!" through "~", U+00A0 through U+D7FF, U+E000 through U+FFFD
    except the byte order mark U+FEFF, and U+10000 through U+10FFFF.
    """
    return (
        u"\x21" <= ch <= u"\x7E"  # ! to ~
        or u"\xA0" <= ch <= u"\uD7FF"
        or (u"\uE000" <= ch <= u"\uFFFD" and ch != u"\uFEFF")  # excl. byte order mark
        or u"\U00010000" <= ch <= u"\U0010FFFF"
    )
242
243
def check_anchorname_char(ch):
    # type: (Any) -> bool
    """Return True when `ch` passes check_namespace_char and is not in ",[]{}"."""
    return ch not in u",[]{}" and check_namespace_char(ch)
249
250
def version_tnf(t1, t2=None):
    # type: (Any, Any) -> Any
    """
    Compare srsly.ruamel_yaml's version_info against `t1` and, optionally, `t2`.

    Returns True when version_info < t1, None when `t2` is given and
    version_info < t2, and False in every other case.
    """
    from srsly.ruamel_yaml import version_info  # NOQA

    if version_info < t1:
        return True
    below_t2 = t2 is not None and version_info < t2
    return None if below_t2 else False
263
264
class MutableSliceableSequence(MutableSequence):  # type: ignore
    """MutableSequence base that implements slicing via single-item hooks.

    Integer indices are routed to ``__getsingleitem__``, ``__setsingleitem__``
    and ``__delsingleitem__``, which subclasses must implement (together with
    ``__len__`` and ``insert`` as required by ``MutableSequence``).  Slices are
    expanded here into repeated single-item operations.
    """

    __slots__ = ()

    def __getitem__(self, index):
        # type: (Any) -> Any
        """Return one item, or a new ``type(self)`` built from the sliced items."""
        if not isinstance(index, slice):
            return self.__getsingleitem__(index)
        return type(self)(
            [self[i] for i in range(*index.indices(len(self)))]
        )  # type: ignore

    def __setitem__(self, index, value):
        # type: (Any, Any) -> None
        """Assign one item, or replace the items selected by a slice.

        A slice without a step may change the length of the sequence.  A slice
        with a step requires exactly as many values as it selects positions
        and raises TypeError otherwise, before anything is modified.
        """
        if not isinstance(index, slice):
            return self.__setsingleitem__(index, value)
        # Snapshot the values: raises TypeError for a non-iterable (also under
        # -O, unlike an assert), accepts any iterable, and stays valid when
        # `value` is this very sequence.
        new_items = list(value)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        if index.step is None:
            # normalise negative / missing / out-of-range bounds first, so the
            # insert position is the same place the deletion started
            start, stop, _ = index.indices(len(self))
            del self[start:stop]
            for elem in reversed(new_items):
                self.insert(start, elem)
        else:
            # the concrete target positions; their number is correct for
            # negative steps and empty ranges as well
            targets = list(range(*index.indices(len(self))))
            nr_assigned_items = len(targets)
            # need to test before changing, in case TypeError is caught
            if nr_assigned_items < len(new_items):
                raise TypeError(
                    "too many elements in value {} < {}".format(
                        nr_assigned_items, len(new_items)
                    )
                )
            elif nr_assigned_items > len(new_items):
                raise TypeError(
                    "not enough elements in value {} > {}".format(
                        nr_assigned_items, len(new_items)
                    )
                )
            for i, elem in zip(targets, new_items):
                self[i] = elem

    def __delitem__(self, index):
        # type: (Any) -> None
        """Delete one item, or every item selected by a slice."""
        if not isinstance(index, slice):
            return self.__delsingleitem__(index)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        # always delete from the highest index down, whatever the sign of the
        # step, so earlier deletions cannot shift the positions still to go
        for i in sorted(range(*index.indices(len(self))), reverse=True):
            del self[i]

    @abstractmethod
    def __getsingleitem__(self, index):
        # type: (Any) -> Any
        """Return the item at integer `index`; to be provided by subclasses."""
        raise IndexError

    @abstractmethod
    def __setsingleitem__(self, index, value):
        # type: (Any, Any) -> None
        """Store `value` at integer `index`; to be provided by subclasses."""
        raise IndexError

    @abstractmethod
    def __delsingleitem__(self, index):
        # type: (Any) -> None
        """Remove the item at integer `index`; to be provided by subclasses."""
        raise IndexError
329