1# coding: utf-8
2
3from __future__ import print_function
4
5# partially from package six by Benjamin Peterson
6
7import sys
8import os
9import types
10import traceback
11from abc import abstractmethod
12
13
14# fmt: off
15if False:  # MYPY
16    from typing import Any, Dict, Optional, List, Union, BinaryIO, IO, Text, Tuple  # NOQA
17    from typing import Optional  # NOQA
18# fmt: on
19
# YAML version expressed as a (major, minor) tuple.
# NOTE(review): presumably the version assumed when none is specified
# explicitly -- confirm against the modules that import this constant.
_DEFAULT_YAML_VERSION = (1, 2)
21
try:
    from ruamel.ordereddict import ordereddict
except:  # NOQA
    # The optional ruamel.ordereddict package could not be imported: build a
    # stand-in with the same name on top of a standard OrderedDict.
    try:
        from collections import OrderedDict
    except ImportError:
        from ordereddict import OrderedDict  # type: ignore
    # to get the right name import ... as ordereddict doesn't do that

    class ordereddict(OrderedDict):  # type: ignore
        """OrderedDict subclass that gains an ``insert`` method when the base lacks one."""

        if not hasattr(OrderedDict, 'insert'):

            def insert(self, pos, key, value):
                # type: (int, Any, Any) -> None
                """Place ``key: value`` at position ``pos`` in the ordering.

                ``pos`` follows ``list.insert`` conventions: a negative value
                counts from the end and out-of-range values are clamped to the
                nearest end.  If ``key`` is already present its old entry is
                dropped first, so the key ends up at ``pos`` carrying the new
                ``value`` (previously the new value could be overwritten by
                the old one, and a negative ``pos`` discarded the item).

                All changes go through ``self[k] = v`` / ``del self[k]`` so
                subclasses overriding item access keep seeing every mutation.
                """
                if key not in self and pos >= len(self):
                    # plain append, no need to rebuild the ordering
                    self[key] = value
                    return
                od = ordereddict()
                od.update(self)
                for k in od:
                    del self[k]
                # the new value must win over any existing entry for this key
                od.pop(key, None)
                old_keys = list(od)
                size = len(old_keys)
                if pos < 0:
                    pos = max(pos + size, 0)
                for index, old_key in enumerate(old_keys):
                    if pos == index:
                        self[key] = value
                    self[old_key] = od[old_key]
                if pos >= size:
                    # position at or beyond the end: the new item goes last
                    self[key] = value
47
48
# Interpreter major-version flags used to select the shims below.
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3


if not PY3:
    if False:
        # never executed; only gives the name `unicode` a binding for checkers
        unicode = str

    def utf8(s):
        # type: (unicode) -> str
        """Encode the text ``s`` as UTF-8."""
        encoded = s.encode('utf-8')
        return encoded

    def to_str(s):
        # type: (str) -> str
        """Convert ``s`` with ``str()``."""
        converted = str(s)
        return converted

    def to_unicode(s):
        # type: (str) -> unicode
        """Convert ``s`` with ``unicode()``."""
        converted = unicode(s)  # NOQA
        return converted


else:

    def utf8(s):
        # type: (str) -> str
        """Return ``s`` unchanged."""
        return s

    def to_str(s):
        # type: (str) -> str
        """Return ``s`` unchanged."""
        return s

    def to_unicode(s):
        # type: (str) -> str
        """Return ``s`` unchanged."""
        return s
83
84
# Version-neutral names for types, stream classes and ABCs.
if not PY3:
    from StringIO import StringIO as _StringIO
    import cStringIO
    from collections import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA

    string_types = basestring  # NOQA
    text_type = unicode  # NOQA
    binary_type = str
    integer_types = (int, long)  # NOQA
    class_types = (type, types.ClassType)

    # to allow importing
    unichr = unichr

    StringIO = _StringIO
    BytesIO = cStringIO.StringIO
    # have unlimited precision
    no_limit_int = long  # NOQA not available on Python 3

else:
    import io
    from collections.abc import Hashable, MutableSequence, MutableMapping, Mapping  # NOQA

    string_types = str
    text_type = str
    binary_type = bytes
    integer_types = int
    class_types = type

    MAXSIZE = sys.maxsize
    unichr = chr

    StringIO = io.StringIO
    BytesIO = io.BytesIO
    # have unlimited precision
    no_limit_int = int

if False:  # MYPY
    # Never executed: these aliases exist only for static type checking.
    # StreamType = Union[BinaryIO, IO[str], IO[unicode],  StringIO]
    # StreamType = Union[BinaryIO, IO[str], StringIO]  # type: ignore
    StreamType = Any

    StreamTextType = StreamType  # Union[Text, StreamType]
    VersionType = Union[List[int], str, Tuple[int, int]]

# name of the module that holds the builtins on the running interpreter
builtins_module = 'builtins' if PY3 else '__builtin__'

# 2 when sys.maxunicode does not exceed 65535, 4 otherwise
if sys.maxunicode > 65535:
    UNICODE_SIZE = 4
else:
    UNICODE_SIZE = 2
135
136
def with_metaclass(meta, *bases):
    # type: (Any, Any) -> Any
    """Return a class named 'NewBase' built by ``meta`` on top of ``bases``.

    The class is created by calling ``meta`` directly with an empty
    namespace, which avoids any version-specific metaclass syntax.
    """
    namespace = {}  # type: Dict[str, Any]
    new_base = meta('NewBase', bases, namespace)
    return new_base
141
142
# Bit flags; dbg() masks the active debug level with one of these.
DBG_TOKEN, DBG_EVENT, DBG_NODE = 1, 2, 4


# Debug level: stays None unless RUAMELDEBUG is present in the environment,
# in which case its value is parsed as an integer.
_debug = None  # type: Optional[int]
if 'RUAMELDEBUG' in os.environ:
    _debugx = os.environ.get('RUAMELDEBUG')
    _debug = 0 if _debugx is None else int(_debugx)


if _debug:

    class ObjectCounter(object):
        """Tally how many times each key has been reported."""

        def __init__(self):
            # type: () -> None
            self.map = {}  # type: Dict[Any, Any]

        def __call__(self, k):
            # type: (Any) -> None
            """Add one to the count kept for ``k``."""
            seen = self.map.get(k, 0)
            self.map[k] = seen + 1

        def dump(self):
            # type: () -> None
            """Write every 'key -> count' pair to stdout in sorted key order."""
            # NOTE(review): no separator or newline is written between entries.
            for key in sorted(self.map):
                entry = '{} -> {}'.format(key, self.map[key])
                sys.stdout.write(entry)

    object_counter = ObjectCounter()
174
175
176# used from yaml util when testing
def dbg(val=None):
    # type: (Any) -> Any
    """Return the debug level, or that level masked with ``val``.

    When the module-level ``_debug`` is still None it is first initialised
    from the YAMLDEBUG environment variable (0 when the variable is unset).
    """
    global _debug
    if _debug is None:
        # set to true or false
        _debugx = os.environ.get('YAMLDEBUG')
        _debug = int(_debugx) if _debugx is not None else 0
    if val is not None:
        return _debug & val
    return _debug
190
191
class Nprint(object):
    """Debug printer that stays silent unless the module-level ``_debug`` is truthy.

    Output goes to ``sys.stdout``, or is appended to ``file_name`` when one was
    given.  The file is opened for each call and is always closed again, also
    when printing fails or when the forced exit is triggered.
    """

    def __init__(self, file_name=None):
        # type: (Any) -> None
        self._max_print = None  # type: Any
        self._count = None  # type: Any
        self._file_name = file_name

    def __call__(self, *args, **kw):
        # type: (Any, Any) -> None
        """Print ``args`` like ``print()`` does; ``kw`` is passed on, except ``file``.

        Once a limit has been set with ``set_max_print`` and that many calls
        have produced output, the call reports 'forced exit', prints the
        current stack and terminates through ``sys.exit(0)``.
        """
        if not bool(_debug):
            return
        to_file = self._file_name is not None
        out = open(self._file_name, 'a') if to_file else sys.stdout
        try:
            dbgprint = print  # to fool checking for print statements by dv utility
            kw1 = kw.copy()
            kw1['file'] = out  # always overrides a caller-supplied file
            dbgprint(*args, **kw1)
            out.flush()
            if self._max_print is not None:
                if self._count is None:
                    # first output after set_max_print(): start the countdown
                    self._count = self._max_print
                self._count -= 1
                if self._count == 0:
                    dbgprint('forced exit\n')
                    traceback.print_stack()
                    out.flush()
                    sys.exit(0)
        finally:
            # runs for normal return, exceptions and SystemExit alike, so the
            # per-call file handle can never leak; stdout is left open
            if to_file:
                out.close()

    def set_max_print(self, i):
        # type: (int) -> None
        """Allow ``i`` more printing calls before the forced exit; restarts the count."""
        self._max_print = i
        self._count = None


nprint = Nprint()
nprintf = Nprint('/var/tmp/ruamel.yaml.log')
229
230# char checkers following production rules
231
232
def check_namespace_char(ch):
    # type: (Any) -> bool
    """Tell whether ``ch`` falls inside one of the accepted character ranges.

    Accepted are '!' through '~', U+00A0 through U+D7FF, U+E000 through
    U+FFFD (the byte order mark U+FEFF excluded) and U+10000 through
    U+10FFFF.
    """
    accepted_ranges = (
        (u'\x21', u'\x7E'),  # ! to ~
        (u'\xA0', u'\uD7FF'),
        (u'\uE000', u'\uFFFD'),
        (u'\U00010000', u'\U0010FFFF'),
    )
    for low, high in accepted_ranges:
        if low <= ch <= high:
            # only the third range contains the byte order mark
            return ch != u'\uFEFF'
    return False
244
245
def check_anchorname_char(ch):
    # type: (Any) -> bool
    """Like ``check_namespace_char`` but additionally rejects ``,``, ``[``, ``]``, ``{`` and ``}``."""
    excluded = u',[]{}'
    if ch not in excluded:
        return check_namespace_char(ch)
    return False
251
252
def version_tnf(t1, t2=None):
    # type: (Any, Any) -> Any
    """
    Compare the ruamel.yaml version_info against one or two thresholds.

    Returns True when version_info < t1; otherwise None when t2 is given
    and version_info < t2; otherwise False.
    """
    from ruamel.yaml import version_info  # NOQA

    if version_info < t1:
        return True
    if t2 is None:
        return False
    return None if version_info < t2 else False
265
266
class MutableSliceableSequence(MutableSequence):  # type: ignore
    """MutableSequence mixin that adds slice support to item access.

    Subclasses implement ``__getsingleitem__``, ``__setsingleitem__`` and
    ``__delsingleitem__`` for integer indices (plus ``__len__`` and ``insert``
    as required by MutableSequence); slices are translated here into a series
    of single-item operations.
    """

    __slots__ = ()

    def __getitem__(self, index):
        # type: (Any) -> Any
        """Return one item, or for a slice a new ``type(self)`` built from the selected items."""
        if not isinstance(index, slice):
            return self.__getsingleitem__(index)
        return type(self)([self[i] for i in range(*index.indices(len(self)))])  # type: ignore

    def __setitem__(self, index, value):
        # type: (Any, Any) -> None
        """Assign one item, or assign the items of ``value`` to a slice.

        A slice without a step may change the length, as with lists.  A slice
        with a step needs exactly as many values as it selects positions,
        otherwise TypeError is raised before anything is modified.
        """
        if not isinstance(index, slice):
            return self.__setsingleitem__(index, value)
        # Materialise once: raises TypeError for a non-iterable (also under
        # -O, unlike an assert) and makes one-shot iterables usable.
        value = list(value)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        start, stop, step = index.indices(len(self))
        if index.step is None:
            # Work with the normalised start: a negative or too large start
            # must be resolved against the length *before* items are removed.
            if stop < start:
                stop = start
            del self[start:stop]
            for elem in reversed(value):
                self.insert(start, elem)
        else:
            targets = range(start, stop, step)
            # len(range) is correct for negative steps and empty selections
            nr_assigned_items = len(targets)
            # need to test before changing, in case TypeError is caught
            if nr_assigned_items < len(value):
                raise TypeError(
                    'too many elements in value {} < {}'.format(nr_assigned_items, len(value))
                )
            elif nr_assigned_items > len(value):
                raise TypeError(
                    'not enough elements in value {} > {}'.format(
                        nr_assigned_items, len(value)
                    )
                )
            for i, elem in zip(targets, value):
                self[i] = elem

    def __delitem__(self, index):
        # type: (Any) -> None
        """Delete one item, or every item selected by a slice."""
        if not isinstance(index, slice):
            return self.__delsingleitem__(index)
        # nprint(index.start, index.stop, index.step, index.indices(len(self)))
        # Always remove the highest index first, whatever the sign of the
        # step, so indices still to be deleted are not shifted.
        for i in sorted(range(*index.indices(len(self))), reverse=True):
            del self[i]

    @abstractmethod
    def __getsingleitem__(self, index):
        # type: (Any) -> Any
        """Return the item at the integer ``index``."""
        raise IndexError

    @abstractmethod
    def __setsingleitem__(self, index, value):
        # type: (Any, Any) -> None
        """Replace the item at the integer ``index`` by ``value``."""
        raise IndexError

    @abstractmethod
    def __delsingleitem__(self, index):
        # type: (Any) -> None
        """Remove the item at the integer ``index``."""
        raise IndexError
325