1# 2# Forked from m9dicts.{api,dicts}. 3# 4# Copyright (C) 2011 - 2015 Red Hat, Inc. 5# Copyright (C) 2011 - 2017 Satoru SATOH <ssato redhat.com> 6# License: MIT 7# 8r"""Utility functions to operate on mapping objects such as get, set and merge. 9 10.. versionadded: 0.8.3 11 define _update_* and merge functions based on classes in 12 :mod:`m9dicts.dicts` 13 14""" 15from __future__ import absolute_import 16import functools 17import operator 18import re 19import anyconfig.utils 20 21 22# Merge strategies: 23MS_REPLACE = "replace" 24MS_NO_REPLACE = "noreplace" 25MS_DICTS = "merge_dicts" 26MS_DICTS_AND_LISTS = "merge_dicts_and_lists" 27MERGE_STRATEGIES = (MS_REPLACE, MS_NO_REPLACE, MS_DICTS, MS_DICTS_AND_LISTS) 28 29PATH_SEPS = ('/', '.') 30 31_JSNP_GET_ARRAY_IDX_REG = re.compile(r"(?:0|[1-9][0-9]*)") 32_JSNP_SET_ARRAY_IDX = re.compile(r"(?:0|[1-9][0-9]*|-)") 33 34 35def _jsnp_unescape(jsn_s): 36 """ 37 Parse and decode given encoded JSON Pointer expression, convert ~1 to 38 / and ~0 to ~. 39 40 .. note:: JSON Pointer: http://tools.ietf.org/html/rfc6901 41 42 >>> _jsnp_unescape("/a~1b") 43 '/a/b' 44 >>> _jsnp_unescape("~1aaa~1~0bbb") 45 '/aaa/~bbb' 46 """ 47 return jsn_s.replace('~1', '/').replace('~0', '~') 48 49 50def _split_path(path, seps=PATH_SEPS): 51 """ 52 Parse path expression and return list of path items. 53 54 :param path: Path expression may contain separator chars. 55 :param seps: Separator char candidates. 56 :return: A list of keys to fetch object[s] later. 57 58 >>> assert _split_path('') == [] 59 >>> assert _split_path('/') == [''] # JSON Pointer spec expects this. 60 >>> for p in ('/a', '.a', 'a', 'a.'): 61 ... assert _split_path(p) == ['a'], p 62 >>> assert _split_path('/a/b/c') == _split_path('a.b.c') == ['a', 'b', 'c'] 63 >>> assert _split_path('abc') == ['abc'] 64 """ 65 if not path: 66 return [] 67 68 for sep in seps: 69 if sep in path: 70 if path == sep: # Special case, '/' or '.' only. 71 return [''] 72 return [x for x in path.split(sep) if x] 73 74 return [path] 75 76 77def mk_nested_dic(path, val, seps=PATH_SEPS): 78 """ 79 Make a nested dict iteratively. 80 81 :param path: Path expression to make a nested dict 82 :param val: Value to set 83 :param seps: Separator char candidates 84 85 >>> mk_nested_dic("a.b.c", 1) 86 {'a': {'b': {'c': 1}}} 87 >>> mk_nested_dic("/a/b/c", 1) 88 {'a': {'b': {'c': 1}}} 89 """ 90 ret = None 91 for key in reversed(_split_path(path, seps)): 92 ret = {key: val if ret is None else ret.copy()} 93 94 return ret 95 96 97def get(dic, path, seps=PATH_SEPS, idx_reg=_JSNP_GET_ARRAY_IDX_REG): 98 """getter for nested dicts. 99 100 :param dic: a dict[-like] object 101 :param path: Path expression to point object wanted 102 :param seps: Separator char candidates 103 :return: A tuple of (result_object, error_message) 104 105 >>> d = {'a': {'b': {'c': 0, 'd': [1, 2]}}, '': 3} 106 >>> assert get(d, '/') == (3, '') # key becomes '' (empty string). 107 >>> assert get(d, "/a/b/c") == (0, '') 108 >>> sorted(get(d, "a.b")[0].items()) 109 [('c', 0), ('d', [1, 2])] 110 >>> (get(d, "a.b.d"), get(d, "/a/b/d/1")) 111 (([1, 2], ''), (2, '')) 112 >>> get(d, "a.b.key_not_exist") # doctest: +ELLIPSIS 113 (None, "'...'") 114 >>> get(d, "/a/b/d/2") 115 (None, 'list index out of range') 116 >>> get(d, "/a/b/d/-") # doctest: +ELLIPSIS 117 (None, 'list indices must be integers...') 118 """ 119 items = [_jsnp_unescape(p) for p in _split_path(path, seps)] 120 if not items: 121 return (dic, '') 122 try: 123 if len(items) == 1: 124 return (dic[items[0]], '') 125 126 prnt = functools.reduce(operator.getitem, items[:-1], dic) 127 arr = anyconfig.utils.is_list_like(prnt) and idx_reg.match(items[-1]) 128 return (prnt[int(items[-1])], '') if arr else (prnt[items[-1]], '') 129 130 except (TypeError, KeyError, IndexError) as exc: 131 return (None, str(exc)) 132 133 134def set_(dic, path, val, seps=PATH_SEPS): 135 """setter for nested dicts. 136 137 :param dic: a dict[-like] object support recursive merge operations 138 :param path: Path expression to point object wanted 139 :param seps: Separator char candidates 140 141 >>> d = dict(a=1, b=dict(c=2, )) 142 >>> set_(d, 'a.b.d', 3) 143 >>> d['a']['b']['d'] 144 3 145 """ 146 merge(dic, mk_nested_dic(path, val, seps), ac_merge=MS_DICTS) 147 148 149def _are_list_like(*objs): 150 """ 151 >>> _are_list_like([], (), [x for x in range(10)], (x for x in range(4))) 152 True 153 >>> _are_list_like([], {}) 154 False 155 >>> _are_list_like([], "aaa") 156 False 157 """ 158 return all(anyconfig.utils.is_list_like(obj) for obj in objs) 159 160 161def _update_with_replace(self, other, key, val=None, **options): 162 """ 163 Replace value of a mapping object 'self' with 'other' has if both have same 164 keys on update. Otherwise, just keep the value of 'self'. 165 166 :param self: mapping object to update with 'other' 167 :param other: mapping object to update 'self' 168 :param key: key of mapping object to update 169 :param val: value to update self alternatively 170 171 :return: None but 'self' will be updated 172 """ 173 self[key] = other[key] if val is None else val 174 175 176def _update_wo_replace(self, other, key, val=None, **options): 177 """ 178 Never update (replace) the value of 'self' with 'other''s, that is, only 179 the values 'self' does not have its key will be added on update. 180 181 :param self: mapping object to update with 'other' 182 :param other: mapping object to update 'self' 183 :param key: key of mapping object to update 184 :param val: value to update self alternatively 185 186 :return: None but 'self' will be updated 187 """ 188 if key not in self: 189 _update_with_replace(self, other, key, val=val) 190 191 192def _merge_list(self, key, lst): 193 """ 194 :param key: self[key] will be updated 195 :param lst: Other list to merge 196 """ 197 self[key] += [x for x in lst if x not in self[key]] 198 199 200def _merge_other(self, key, val): 201 """ 202 :param key: self[key] will be updated 203 :param val: Other val to merge (update/replace) 204 """ 205 self[key] = val # Just overwrite it by default implementation. 206 207 208def _update_with_merge(self, other, key, val=None, merge_lists=False, 209 **options): 210 """ 211 Merge the value of self with other's recursively. Behavior of merge will be 212 vary depends on types of original and new values. 213 214 - mapping vs. mapping -> merge recursively 215 - list vs. list -> vary depends on 'merge_lists'. see its description. 216 217 :param other: a dict[-like] object or a list of (key, value) tuples 218 :param key: key of mapping object to update 219 :param val: value to update self[key] 220 :param merge_lists: 221 Merge not only dicts but also lists. For example, 222 223 [1, 2, 3], [3, 4] ==> [1, 2, 3, 4] 224 [1, 2, 2], [2, 4] ==> [1, 2, 2, 4] 225 226 :return: None but 'self' will be updated 227 """ 228 if val is None: 229 val = other[key] 230 231 if key in self: 232 val0 = self[key] # Original value 233 if anyconfig.utils.is_dict_like(val0): # It needs recursive updates. 234 merge(self[key], val, merge_lists=merge_lists, **options) 235 elif merge_lists and _are_list_like(val, val0): 236 _merge_list(self, key, val) 237 else: 238 _merge_other(self, key, val) 239 else: 240 self[key] = val 241 242 243def _update_with_merge_lists(self, other, key, val=None, **options): 244 """ 245 Similar to _update_with_merge but merge lists always. 246 247 :param self: mapping object to update with 'other' 248 :param other: mapping object to update 'self' 249 :param key: key of mapping object to update 250 :param val: value to update self alternatively 251 252 :return: None but 'self' will be updated 253 """ 254 _update_with_merge(self, other, key, val=val, merge_lists=True, **options) 255 256 257_MERGE_FNS = {MS_REPLACE: _update_with_replace, 258 MS_NO_REPLACE: _update_wo_replace, 259 MS_DICTS: _update_with_merge, 260 MS_DICTS_AND_LISTS: _update_with_merge_lists} 261 262 263def _get_update_fn(strategy): 264 """ 265 Select dict-like class based on merge strategy and orderness of keys. 266 267 :param merge: Specify strategy from MERGE_STRATEGIES of how to merge dicts. 268 :return: Callable to update objects 269 """ 270 if strategy is None: 271 strategy = MS_DICTS 272 try: 273 return _MERGE_FNS[strategy] 274 except KeyError: 275 if callable(strategy): 276 return strategy 277 278 raise ValueError("Wrong merge strategy: %r" % strategy) 279 280 281def merge(self, other, ac_merge=MS_DICTS, **options): 282 """ 283 Update (merge) a mapping object 'self' with other mapping object or an 284 iterable yields (key, value) tuples based on merge strategy 'ac_merge'. 285 286 :param others: a list of dict[-like] objects or (key, value) tuples 287 :param another: optional keyword arguments to update self more 288 :param ac_merge: Merge strategy to choose 289 """ 290 _update_fn = _get_update_fn(ac_merge) 291 292 if hasattr(other, "keys"): 293 for key in other: 294 _update_fn(self, other, key, **options) 295 else: 296 try: 297 for key, val in other: 298 _update_fn(self, other, key, val=val, **options) 299 except (ValueError, TypeError) as exc: # Re-raise w/ info. 300 raise type(exc)("%s other=%r" % (str(exc), other)) 301 302 303def _make_recur(obj, make_fn, ac_ordered=False, ac_dict=None, **options): 304 """ 305 :param obj: A mapping objects or other primitive object 306 :param make_fn: Function to make/convert to 307 :param ac_ordered: Use OrderedDict instead of dict to keep order of items 308 :param ac_dict: Callable to convert 'obj' to mapping object 309 :param options: Optional keyword arguments. 310 311 :return: Mapping object 312 """ 313 if ac_dict is None: 314 ac_dict = anyconfig.compat.OrderedDict if ac_ordered else dict 315 316 return ac_dict((k, None if v is None else make_fn(v, **options)) 317 for k, v in obj.items()) 318 319 320def _make_iter(obj, make_fn, **options): 321 """ 322 :param obj: A mapping objects or other primitive object 323 :param make_fn: Function to make/convert to 324 :param options: Optional keyword arguments. 325 326 :return: Mapping object 327 """ 328 return type(obj)(make_fn(v, **options) for v in obj) 329 330 331def convert_to(obj, ac_ordered=False, ac_dict=None, **options): 332 """ 333 Convert a mapping objects to a dict or object of 'to_type' recursively. 334 Borrowed basic idea and implementation from bunch.unbunchify. (bunch is 335 distributed under MIT license same as this.) 336 337 :param obj: A mapping objects or other primitive object 338 :param ac_ordered: Use OrderedDict instead of dict to keep order of items 339 :param ac_dict: Callable to convert 'obj' to mapping object 340 :param options: Optional keyword arguments. 341 342 :return: A dict or OrderedDict or object of 'cls' 343 344 >>> OD = anyconfig.compat.OrderedDict 345 >>> convert_to(OD((('a', 1) ,)), cls=dict) 346 {'a': 1} 347 >>> convert_to(OD((('a', OD((('b', OD((('c', 1), ))), ))), )), cls=dict) 348 {'a': {'b': {'c': 1}}} 349 """ 350 options.update(ac_ordered=ac_ordered, ac_dict=ac_dict) 351 if anyconfig.utils.is_dict_like(obj): 352 return _make_recur(obj, convert_to, **options) 353 if anyconfig.utils.is_list_like(obj): 354 return _make_iter(obj, convert_to, **options) 355 356 return obj 357 358# vim:sw=4:ts=4:et: 359