1# -*- coding: utf-8 -*-
2"""Tools for diff'ing two xonsh history files in a meaningful fashion."""
3import difflib
4import datetime
5import itertools
6import argparse
7
8from xonsh.lazyjson import LazyJSON
9from xonsh.tools import print_color
10
# Color markup tokens spliced into the diff text. They are emitted verbatim;
# the finished text is handed to print_color by dh_main_action.
NO_COLOR_S = "{NO_COLOR}"
RED_S, BOLD_RED_S = "{RED}", "{BOLD_RED}"
GREEN_S, BOLD_GREEN_S = "{GREEN}", "{BOLD_GREEN}"

# Opcode tags that the results of SequenceMatcher.get_opcodes() are compared to.
EQUAL_S, INSERT_S, DELETE_S, REPLACE_S = "equal", "insert", "delete", "replace"
22
23
def bold_str_diff(a, b, sm=None):
    """Return a two-line, color-marked diff of the strings ``a`` and ``b``.

    The first line starts with "- " in red and shows ``a``; the second starts
    with "+ " in green and shows ``b``. Stretches that differ between the two
    are wrapped in the corresponding bold color token.

    Parameters
    ----------
    a, b : str
        The strings to compare.
    sm : difflib.SequenceMatcher, optional
        Matcher to reuse; its sequences are overwritten with ``a`` and ``b``.
        A fresh matcher is created when omitted.

    Returns
    -------
    str
        Both lines, each terminated by the no-color token and a newline.

    Raises
    ------
    RuntimeError
        If the matcher yields an opcode tag that is not handled here.
    """
    matcher = difflib.SequenceMatcher() if sm is None else sm
    minus = [RED_S, "- "]
    plus = [GREEN_S, "+ "]
    matcher.set_seqs(a, b)
    for op, alo, ahi, blo, bhi in matcher.get_opcodes():
        if op == EQUAL_S:
            minus.append(a[alo:ahi])
            plus.append(b[blo:bhi])
            continue
        if op not in (REPLACE_S, DELETE_S, INSERT_S):
            raise RuntimeError("tag not understood")
        # replace touches both lines, delete only the "-" line, insert only
        # the "+" line.
        if op != INSERT_S:
            minus += [BOLD_RED_S, a[alo:ahi], RED_S]
        if op != DELETE_S:
            plus += [BOLD_GREEN_S, b[blo:bhi], GREEN_S]
    minus += [NO_COLOR_S, "\n"]
    plus += [NO_COLOR_S, "\n"]
    return "".join(minus + plus)
44
45
def redline(line):
    """Format ``line`` as a "- " line wrapped in the red and no-color tokens."""
    return "{0}- {1}{2}\n".format(RED_S, line, NO_COLOR_S)
48
49
def greenline(line):
    """Format ``line`` as a "+ " line wrapped in the green and no-color tokens."""
    return "{0}+ {1}{2}\n".format(GREEN_S, line, NO_COLOR_S)
54
55
def highlighted_ndiff(a, b):
    """Return a highlighted line diff, with bold characters where different.

    Parameters
    ----------
    a, b : sequence of str
        The lines of the two texts, without trailing newlines.

    Returns
    -------
    str
        One output line per input line: unchanged lines are indented by two
        spaces, removed lines go through ``redline``, added lines through
        ``greenline``, and lines paired up inside a replaced region are
        rendered by ``bold_str_diff``.

    Raises
    ------
    RuntimeError
        If the matcher yields an opcode tag that is not handled here.
    """
    pieces = []
    line_matcher = difflib.SequenceMatcher(None, a, b)
    # One matcher is shared by every character-level comparison.
    char_matcher = difflib.SequenceMatcher()
    for op, alo, ahi, blo, bhi in line_matcher.get_opcodes():
        if op == EQUAL_S:
            pieces.extend("  " + text + "\n" for text in a[alo:ahi])
        elif op == INSERT_S:
            pieces.extend(greenline(text) for text in b[blo:bhi])
        elif op == DELETE_S:
            pieces.extend(redline(text) for text in a[alo:ahi])
        elif op == REPLACE_S:
            # Pair lines positionally; the longer side's leftovers are shown
            # as plain removals or additions.
            for old, new in itertools.zip_longest(a[alo:ahi], b[blo:bhi]):
                if new is None:
                    pieces.append(redline(old))
                elif old is None:
                    pieces.append(greenline(new))
                else:
                    pieces.append(bold_str_diff(old, new, sm=char_matcher))
        else:
            raise RuntimeError("tag not understood")
    return "".join(pieces)
83
84
class HistoryDiffer(object):
    """This class helps diff two xonsh history files.

    Each file is wrapped in a LazyJSON object. The methods read the
    ``sessionid``, ``locked``, ``ts``, ``env`` and ``cmds`` entries of a file
    and the ``inp``, ``rtn`` and optional ``out`` entries of each command.
    """

    def __init__(self, afile, bfile, reopen=False, verbose=False):
        """
        Parameters
        ----------
        afile : file handle or str
            The first file to diff
        bfile : file handle or str
            The second file to diff
        reopen : bool, optional
            Whether or not to reopen the file handles each time. The default here is
            opposite from the LazyJSON default because we know that we will be doing
            a lot of reading so it is best to keep the handles open.
        verbose : bool, optional
            Whether to print a verbose amount of information.
        """
        self.a = LazyJSON(afile, reopen=reopen)
        self.b = LazyJSON(bfile, reopen=reopen)
        self.verbose = verbose
        self.sm = difflib.SequenceMatcher(autojunk=False)

    def __del__(self):
        """Close whichever of the two history files were actually opened."""
        # __init__ may have raised before one or both attributes were set, so
        # only close what exists instead of failing inside the finalizer.
        for name in ("a", "b"):
            handle = getattr(self, name, None)
            if handle is not None:
                handle.close()

    def __str__(self):
        return self.format()

    def _header_line(self, lj):
        """Describe one history file.

        The line holds the underlying file's name (when it has one), the
        session id, the lock state, the start time and, if a stop time is
        recorded, the stop time and the runtime.
        """
        s = getattr(lj._f, "name", "")
        s += " (" + lj["sessionid"] + ")"
        s += " [locked]" if lj["locked"] else " [unlocked]"
        ts = lj["ts"].load()
        ts0 = datetime.datetime.fromtimestamp(ts[0])
        s += " started: " + ts0.isoformat(" ")
        if ts[1] is not None:
            ts1 = datetime.datetime.fromtimestamp(ts[1])
            s += " stopped: " + ts1.isoformat(" ") + " runtime: " + str(ts1 - ts0)
        return s

    def header(self):
        """Computes a header string difference."""
        s = "{red}--- {aline}{no_color}\n" "{green}+++ {bline}{no_color}"
        s = s.format(
            aline=self._header_line(self.a),
            bline=self._header_line(self.b),
            red=RED_S,
            green=GREEN_S,
            no_color=NO_COLOR_S,
        )
        return s

    def _env_both_diff(self, in_both, aenv, benv):
        """Diff the values of the variables present in both environments.

        Returns an empty string when every shared variable has the same value.
        """
        sm = self.sm
        parts = []
        for key in sorted(in_both):
            aval = aenv[key]
            bval = benv[key]
            if aval == bval:
                continue
            parts.append("{0!r} is in both, but differs\n".format(key))
            parts.append(bold_str_diff(aval, bval, sm=sm) + "\n")
        return "".join(parts)

    def _env_in_one_diff(self, x, y, color, xid, xenv):
        """List the variables whose keys are in ``x`` but not in ``y``.

        In verbose mode the values (looked up in ``xenv``) are listed as well.
        Returns an empty string when there are no such variables.
        """
        only_x = sorted(x - y)
        if not only_x:
            return ""
        if self.verbose:
            xstr = ",\n".join(
                ["    {0!r}: {1!r}".format(key, xenv[key]) for key in only_x]
            )
            xstr = "\n" + xstr
        else:
            xstr = ", ".join(["{0!r}".format(key) for key in only_x])
        in_x = "These vars are only in {color}{xid}{no_color}: {{{xstr}}}\n\n"
        return in_x.format(xid=xid, color=color, no_color=NO_COLOR_S, xstr=xstr)

    def envdiff(self):
        """Computes the difference between the environments.

        Returns an empty string when both environments are identical.
        """
        aenv = self.a["env"].load()
        benv = self.b["env"].load()
        akeys = frozenset(aenv)
        bkeys = frozenset(benv)
        keydiff = self._env_both_diff(akeys & bkeys, aenv, benv)
        if akeys == bkeys:
            if not keydiff:
                return ""
            in_a = in_b = ""
        else:
            in_a = self._env_in_one_diff(akeys, bkeys, RED_S, self.a["sessionid"], aenv)
            in_b = self._env_in_one_diff(
                bkeys, akeys, GREEN_S, self.b["sessionid"], benv
            )
        return "Environment\n-----------\n" + in_a + keydiff + in_b

    def _cmd_in_one_diff(self, inp, i, xlj, xid, color):
        """Report command ``i``, which exists in only one of the files.

        Parameters
        ----------
        inp : str
            The command's input text.
        i : int
            Index of the command within ``xlj["cmds"]``.
        xlj : LazyJSON
            The history file that contains the command.
        xid : str
            Session id shown in the report.
        color : str
            Color token used for the session id and the prompts.
        """
        s = "cmd #{i} only in {color}{xid}{no_color}:\n"
        s = s.format(i=i, color=color, xid=xid, no_color=NO_COLOR_S)
        # An empty input has no lines at all; still show a bare prompt for it
        # rather than failing on lines[0].
        lines = inp.splitlines() or [""]
        lt = "{color}{pre}{no_color} {line}\n"
        s += lt.format(color=color, no_color=NO_COLOR_S, line=lines[0], pre=">>>")
        for line in lines[1:]:
            s += lt.format(color=color, no_color=NO_COLOR_S, line=line, pre="...")
        if not self.verbose:
            return s + "\n"
        # Show the output stored for this very command (index i), not the
        # output of the first command in the file.
        out = xlj["cmds"][i].get("out", "Note: no output stored")
        s += out.rstrip() + "\n\n"
        return s

    def _cmd_out_and_rtn_diff(self, i, j):
        """Diff the stored output and return value of a's cmd ``i`` and b's cmd ``j``.

        Returns an empty string when neither differs (or no output is stored
        on either side and the return values agree).
        """
        s = ""
        acmd = self.a["cmds"][i]
        bcmd = self.b["cmds"][j]
        aout = acmd.get("out", None)
        bout = bcmd.get("out", None)
        if aout is None and bout is None:
            # Neither side stored its output: nothing to report.
            pass
        elif bout is None:
            aid = self.a["sessionid"]
            s += "Note: only {red}{aid}{no_color} output stored\n".format(
                red=RED_S, aid=aid, no_color=NO_COLOR_S
            )
        elif aout is None:
            bid = self.b["sessionid"]
            s += "Note: only {green}{bid}{no_color} output stored\n".format(
                green=GREEN_S, bid=bid, no_color=NO_COLOR_S
            )
        elif aout != bout:
            s += "Outputs differ\n"
            s += highlighted_ndiff(aout.splitlines(), bout.splitlines())
        artn = acmd["rtn"]
        brtn = bcmd["rtn"]
        if artn != brtn:
            s += (
                "Return vals {red}{artn}{no_color} & {green}{brtn}{no_color} differ\n"
            ).format(
                red=RED_S, green=GREEN_S, no_color=NO_COLOR_S, artn=artn, brtn=brtn
            )
        return s

    def _cmd_replace_diff(self, i, ainp, aid, j, binp, bid):
        """Report that a's cmd ``i`` was replaced by b's cmd ``j``.

        The inputs are always diffed; outputs and return values are diffed
        only in verbose mode.
        """
        s = (
            "cmd #{i} in {red}{aid}{no_color} is replaced by \n"
            "cmd #{j} in {green}{bid}{no_color}:\n"
        )
        s = s.format(
            i=i, aid=aid, j=j, bid=bid, red=RED_S, green=GREEN_S, no_color=NO_COLOR_S
        )
        s += highlighted_ndiff(ainp.splitlines(), binp.splitlines())
        if not self.verbose:
            return s + "\n"
        s += self._cmd_out_and_rtn_diff(i, j)
        return s + "\n"

    def cmdsdiff(self):
        """Computes the difference of the commands themselves.

        Returns an empty string when there is nothing to report.

        Raises
        ------
        RuntimeError
            If the matcher yields an opcode tag that is not handled here.
        """
        aid = self.a["sessionid"]
        bid = self.b["sessionid"]
        ainps = [c["inp"] for c in self.a["cmds"]]
        binps = [c["inp"] for c in self.b["cmds"]]
        sm = self.sm
        sm.set_seqs(ainps, binps)
        parts = []
        for tag, i1, i2, j1, j2 in sm.get_opcodes():
            if tag == REPLACE_S:
                # Pair the commands positionally; when one side of the
                # replaced region is longer, its leftovers have no partner.
                for i, ainp, j, binp in itertools.zip_longest(
                    range(i1, i2), ainps[i1:i2], range(j1, j2), binps[j1:j2]
                ):
                    if j is None:
                        parts.append(
                            self._cmd_in_one_diff(ainp, i, self.a, aid, RED_S)
                        )
                    elif i is None:
                        parts.append(
                            self._cmd_in_one_diff(binp, j, self.b, bid, GREEN_S)
                        )
                    else:
                        # The report must be kept; dropping it would hide
                        # every replaced command from the result.
                        parts.append(
                            self._cmd_replace_diff(i, ainp, aid, j, binp, bid)
                        )
            elif tag == DELETE_S:
                for i, inp in enumerate(ainps[i1:i2], i1):
                    parts.append(self._cmd_in_one_diff(inp, i, self.a, aid, RED_S))
            elif tag == INSERT_S:
                for j, inp in enumerate(binps[j1:j2], j1):
                    parts.append(self._cmd_in_one_diff(inp, j, self.b, bid, GREEN_S))
            elif tag == EQUAL_S:
                for i, j in zip(range(i1, i2), range(j1, j2)):
                    odiff = self._cmd_out_and_rtn_diff(i, j)
                    if not odiff:
                        continue
                    h = (
                        "cmd #{i} in {red}{aid}{no_color} input is the same as \n"
                        "cmd #{j} in {green}{bid}{no_color}, but output differs:\n"
                    )
                    parts.append(
                        h.format(
                            i=i,
                            aid=aid,
                            j=j,
                            bid=bid,
                            red=RED_S,
                            green=GREEN_S,
                            no_color=NO_COLOR_S,
                        )
                    )
                    parts.append(odiff + "\n")
            else:
                raise RuntimeError("tag not understood")
        s = "".join(parts)
        if not s:
            return s
        return "Commands\n--------\n" + s

    def format(self):
        """Formats the difference between the two history files.

        The result is the header followed by the environment and command
        sections (each only when non-empty), with trailing whitespace removed.
        """
        s = self.header()
        ed = self.envdiff()
        if ed:
            s += "\n\n" + ed
        cd = self.cmdsdiff()
        if cd:
            s += "\n\n" + cd
        return s.rstrip()
307
308
# Cache for the parser that dh_create_parser() builds when called without one.
_HD_PARSER = None


def dh_create_parser(p=None):
    """Register the diff-history arguments on a parser and return it.

    Parameters
    ----------
    p : argparse.ArgumentParser, optional
        Parser to add the arguments to. When omitted, a "diff-history" parser
        is created on the first call and the same object is returned by every
        later call made without ``p``.

    Returns
    -------
    argparse.ArgumentParser
        ``p`` itself, or the cached module-level parser.
    """
    global _HD_PARSER
    standalone = p is None
    if standalone:
        if _HD_PARSER is not None:
            return _HD_PARSER
        p = argparse.ArgumentParser(
            "diff-history", description="diffs two xonsh history files"
        )
    switches = (
        (("--reopen",), "reopen", "make lazy file loading reopen files each time"),
        (("-v", "--verbose"), "verbose", "whether to print even more information"),
    )
    for names, dest, text in switches:
        p.add_argument(
            *names, dest=dest, default=False, action="store_true", help=text
        )
    for name, text in (("a", "first file in diff"), ("b", "second file in diff")):
        p.add_argument(name, help=text)
    if standalone:
        _HD_PARSER = p
    return p
341
342
def dh_main_action(ns, hist=None, stdout=None, stderr=None):
    """Diff the two history files named by ``ns`` and print the result.

    Parameters
    ----------
    ns : namespace
        Must provide ``a``, ``b``, ``reopen`` and ``verbose``, the attributes
        that ``dh_create_parser`` defines.
    hist, stderr : optional
        Accepted but not used by this function.
    stdout : optional
        Passed to ``print_color`` as its ``file`` argument.
    """
    differ = HistoryDiffer(ns.a, ns.b, verbose=ns.verbose, reopen=ns.reopen)
    report = differ.format()
    print_color(report, file=stdout)
346