# coding: utf-8

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.



import copy
import re

from nbdime import patch
from nbdime.diff_format import op_patch
from nbdime.merging.decisions import (
    apply_decisions, ensure_common_path, MergeDecision)

from nbdime import diff


def has_merge_conflicts(decisions):
    "Return whether there are conflicting entries or not."
    return any(item.conflict for item in decisions)


_r_is_int = re.compile("^[0-9]+$")


def is_int(n):
    """Return whether the string ``n`` matches ``^[0-9]+$``.

    That is: one or more ASCII digits (per the ``re`` semantics of ``$``,
    a single trailing newline is tolerated as well).
    """
    return bool(_r_is_int.match(n))


def pick_merge_decision(base, dec):
    """Apply the diff selected by one merge decision to ``base``.

    The diff is chosen from ``dec.action``:

    - ``None`` or ``"base"``: no diff, ``base`` is returned unchanged
    - ``"local"`` or ``"either"``: ``dec.local_diff``
    - ``"remote"``: ``dec.remote_diff``
    - ``"custom"``: ``dec.custom_diff``

    ``dec.common_path`` is a "/"-separated string locating the sub-object
    the diff applies to; empty path components are ignored. The chosen diff
    is wrapped in one ``op_patch`` per path component and the result is
    handed to ``patch`` together with ``base``.

    Returns whatever ``patch`` returns, or ``base`` itself when no diff is
    selected (including when the selected diff attribute is ``None``).

    Raises ``ValueError`` for any other action. Walking the path raises the
    usual lookup errors (``KeyError``/``IndexError``, or ``ValueError`` for
    a non-integer list index) if the path does not exist in ``base``.
    """
    if dec.action is None or dec.action == "base":
        di = None
    elif dec.action in ("local", "either"):
        di = dec.local_diff
    elif dec.action == "remote":
        di = dec.remote_diff
    elif dec.action == "custom":
        di = dec.custom_diff
    else:
        raise ValueError("Unknown action {}".format(dec.action))

    if di is None:
        return base

    # Walk the common path through base, e.g. "/cells" -> base["cells"].
    # This both checks that the path exists and decides the type of every
    # key: a key indexing a list becomes an int, a key indexing anything
    # else stays a string. Deciding by container type (rather than by
    # whether the key merely looks numeric) keeps dict keys such as "0"
    # intact.
    resolved_keys = []
    sub = base
    for k in dec.common_path.split("/"):
        if k == "":
            continue
        if isinstance(sub, list):
            k = int(k)
        sub = sub[k]
        resolved_keys.append(k)

    # Nest the diff from the innermost key outwards so that it addresses
    # the sub-object from the root of base.
    # NOTE(review): the op_patch() result is passed to patch() as-is, like
    # in the previous implementation -- confirm whether patch() expects a
    # list of entries here for non-empty paths.
    base_diff = di
    for k in reversed(resolved_keys):
        base_diff = op_patch(k, base_diff)

    return patch(base, base_diff)


def gather_merge_decisions(base, decisions):
    """Apply every decision in ``decisions`` to ``base`` in order.

    Asserts that none of the decisions is marked as a conflict, then feeds
    the result of each ``pick_merge_decision`` call into the next one and
    returns the final object.
    """
    assert not has_merge_conflicts(decisions)
    for dec in decisions:
        base = pick_merge_decision(base, dec)
    return base


def create_decision_item(action=None, common_path="", conflict=False,
                         local_diff=None, remote_diff=None, custom_diff=None):
    """Build a ``MergeDecision`` from the given fields.

    Performs a light sanity check that the diff(s) required by ``action``
    are present (truthy); actions not listed below are accepted without
    any check. The check uses ``assert`` and so raises ``AssertionError``.
    """
    # Some parameter validation here, but don't need to duplicate json schema
    if action is None:
        pass
    elif action == "local":
        assert local_diff
    elif action == "remote":
        assert remote_diff
    elif action == "custom":
        assert custom_diff
    elif action in ("either", "local_then_remote", "remote_then_local"):
        assert local_diff
        assert remote_diff
    else:
        pass

    item = MergeDecision({
        "action": action,
        "common_path": common_path,
        "conflict": conflict,
        "custom_diff": custom_diff,
        "local_diff": local_diff,
        "remote_diff": remote_diff,
    })
    return item


def _example_decisions():
    """Return a sample list holding a single "local" decision.

    NOTE(review): ``local_diff=""`` is falsy, so the ``assert local_diff``
    check in ``create_decision_item`` fails when this helper is called with
    assertions enabled -- supply a real diff before using it.
    """
    decisions = [
        create_decision_item(
            action="local",
            local_diff=""
        )
    ]
    return decisions


def test_apply_merge_empty():
    """Applying an empty list of decisions leaves the base equal to itself."""
    decisions = []
    base = {"hello": "world"}
    assert base == apply_decisions(base, decisions)


def test_apply_merge_on_dicts():
    """A "remote" decision on nested dicts reproduces the remote object."""
    base = {
        "metadata": {
            "a": {"ting": 123},
            "b": {"tang": 456}
        }
    }

    # Local and remote change the same value in opposite directions.
    local = copy.deepcopy(base)
    local["metadata"]["a"]["ting"] += 1

    remote = copy.deepcopy(base)
    remote["metadata"]["a"]["ting"] -= 1

    bld = diff(base, local)
    brd = diff(base, remote)

    # Replace the two diffs by the path/diffs pair ensure_common_path returns.
    path, (bld, brd) = ensure_common_path((), [bld, brd])

    merge_decisions = [
        create_decision_item(
            action="remote",
            common_path=path,
            local_diff=bld,
            remote_diff=brd)
    ]

    assert remote == apply_decisions(base, merge_decisions)

# Merge decisions with common path "cells" can modify cells/* indices.
# Merge decisions with common path "cells/*" only edit exactly one of the
# cells/* objects.
# Applying cells/* before cells means editing first (no indices modified),
# then moving things around.