1# -*- coding: utf-8 -*-
2"""Tools to replay xonsh history files."""
3import json
4import time
5import builtins
6import collections.abc as cabc
7
8from xonsh.tools import swap
9from xonsh.lazyjson import LazyJSON
10from xonsh.environ import Env
11import xonsh.history.main as xhm
12
13
14DEFAULT_MERGE_ENVS = ("replay", "native")
15
16
17class Replayer(object):
18    """Replays a xonsh history file."""
19
20    def __init__(self, f, reopen=True):
21        """
22        Parameters
23        ----------
24        f : file handle or str
25            Path to xonsh history file.
26        reopen : bool, optional
27            Whether new file handle should be opened for each load, passed directly into
28            LazyJSON class.
29        """
30        self._lj = LazyJSON(f, reopen=reopen)
31
32    def __del__(self):
33        self._lj.close()
34
35    def replay(self, merge_envs=DEFAULT_MERGE_ENVS, target=None):
36        """Replays the history specified, returns the history object where the code
37        was executed.
38
39        Parameters
40        ----------
41        merge_env : tuple of str or Mappings, optional
42            Describes how to merge the environments, in order of increasing precedence.
43            Available strings are 'replay' and 'native'. The 'replay' env comes from the
44            history file that we are replaying. The 'native' env comes from what this
45            instance of xonsh was started up with. Instead of a string, a dict or other
46            mapping may be passed in as well. Defaults to ('replay', 'native').
47        target : str, optional
48            Path to new history file.
49        """
50        shell = builtins.__xonsh_shell__
51        re_env = self._lj["env"].load()
52        new_env = self._merge_envs(merge_envs, re_env)
53        new_hist = xhm.construct_history(
54            env=new_env.detype(),
55            locked=True,
56            ts=[time.time(), None],
57            gc=False,
58            filename=target,
59        )
60        with swap(builtins, "__xonsh_env__", new_env), swap(
61            builtins, "__xonsh_history__", new_hist
62        ):
63            for cmd in self._lj["cmds"]:
64                inp = cmd["inp"]
65                shell.default(inp)
66                if builtins.__xonsh_exit__:  # prevent premature exit
67                    builtins.__xonsh_exit__ = False
68        new_hist.flush(at_exit=True)
69        return new_hist
70
71    def _merge_envs(self, merge_envs, re_env):
72        new_env = {}
73        for e in merge_envs:
74            if e == "replay":
75                new_env.update(re_env)
76            elif e == "native":
77                new_env.update(builtins.__xonsh_env__)
78            elif isinstance(e, cabc.Mapping):
79                new_env.update(e)
80            else:
81                raise TypeError("Type of env not understood: {0!r}".format(e))
82        new_env = Env(**new_env)
83        return new_env
84
85
86_REPLAY_PARSER = None
87
88
89def replay_create_parser(p=None):
90    global _REPLAY_PARSER
91    p_was_none = p is None
92    if _REPLAY_PARSER is not None and p_was_none:
93        return _REPLAY_PARSER
94    if p_was_none:
95        from argparse import ArgumentParser
96
97        p = ArgumentParser("replay", description="replays a xonsh history file")
98    p.add_argument(
99        "--merge-envs",
100        dest="merge_envs",
101        default=DEFAULT_MERGE_ENVS,
102        nargs="+",
103        help="Describes how to merge the environments, in order of "
104        "increasing precedence. Available strings are 'replay' and "
105        "'native'. The 'replay' env comes from the history file that we "
106        "are replaying. The 'native' env comes from what this instance "
107        "of xonsh was started up with. One or more of these options may "
108        "be passed in. Defaults to '--merge-envs replay native'.",
109    )
110    p.add_argument(
111        "--json",
112        dest="json",
113        default=False,
114        action="store_true",
115        help="print history info in JSON format",
116    )
117    p.add_argument(
118        "-o", "--target", dest="target", default=None, help="path to new history file"
119    )
120    p.add_argument("path", help="path to replay history file")
121    if p_was_none:
122        _REPLAY_PARSER = p
123    return p
124
125
126def replay_main_action(h, ns, stdout=None, stderr=None):
127    replayer = Replayer(ns.path)
128    hist = replayer.replay(merge_envs=ns.merge_envs, target=ns.target)
129    print("----------------------------------------------------------------")
130    print("Just replayed history, new history has the following information")
131    print("----------------------------------------------------------------")
132    data = hist.info()
133    if ns.json:
134        s = json.dumps(data)
135        print(s, file=stdout)
136    else:
137        lines = ["{0}: {1}".format(k, v) for k, v in data.items()]
138        print("\n".join(lines), file=stdout)
139
140
141def replay_main(args, stdin=None):
142    """Acts as main function for replaying a xonsh history file."""
143    parser = replay_create_parser()
144    ns = parser.parse_args(args)
145    replay_main_action(ns)
146