1# -*- coding: utf-8 -*-
2"""Tests the json history backend."""
3# pylint: disable=protected-access
4import os
5import shlex
6
7import pytest
8
9from xonsh.lazyjson import LazyJSON
10from xonsh.history.dummy import DummyHistory
11from xonsh.history.json import JsonHistory
12from xonsh.history.main import history_main, _xh_parse_args, construct_history
13
14
15CMDS = ["ls", "cat hello kitty", "abc", "def", "touch me", "grep from me"]
16
17
18@pytest.yield_fixture
19def hist():
20    h = JsonHistory(
21        filename="xonsh-HISTORY-TEST.json", here="yup", sessionid="SESSIONID", gc=False
22    )
23    yield h
24    os.remove(h.filename)
25
26
27def test_hist_init(hist):
28    """Test initialization of the shell history."""
29    with LazyJSON(hist.filename) as lj:
30        obs = lj["here"]
31    assert "yup" == obs
32
33
34def test_hist_append(hist, xonsh_builtins):
35    """Verify appending to the history works."""
36    xonsh_builtins.__xonsh_env__["HISTCONTROL"] = set()
37    hf = hist.append({"inp": "still alive", "rtn": 0})
38    assert hf is None
39    assert "still alive" == hist.buffer[0]["inp"]
40    assert 0 == hist.buffer[0]["rtn"]
41    assert 0 == hist.rtns[-1]
42    hf = hist.append({"inp": "dead now", "rtn": 1})
43    assert "dead now" == hist.buffer[1]["inp"]
44    assert 1 == hist.buffer[1]["rtn"]
45    assert 1 == hist.rtns[-1]
46    hf = hist.append({"inp": "reborn", "rtn": 0})
47    assert "reborn" == hist.buffer[2]["inp"]
48    assert 0 == hist.buffer[2]["rtn"]
49    assert 0 == hist.rtns[-1]
50
51
52def test_hist_flush(hist, xonsh_builtins):
53    """Verify explicit flushing of the history works."""
54    hf = hist.flush()
55    assert hf is None
56    xonsh_builtins.__xonsh_env__["HISTCONTROL"] = set()
57    hist.append({"inp": "still alive?", "rtn": 0, "out": "yes"})
58    hf = hist.flush()
59    assert hf is not None
60    while hf.is_alive():
61        pass
62    with LazyJSON(hist.filename) as lj:
63        assert len(lj["cmds"]) == 1
64        cmd = lj["cmds"][0]
65        assert cmd["inp"] == "still alive?"
66        assert not cmd.get("out", None)
67
68
69def test_hist_flush_with_store_stdout(hist, xonsh_builtins):
70    """Verify explicit flushing of the history works."""
71    hf = hist.flush()
72    assert hf is None
73    xonsh_builtins.__xonsh_env__["HISTCONTROL"] = set()
74    xonsh_builtins.__xonsh_env__["XONSH_STORE_STDOUT"] = True
75    hist.append({"inp": "still alive?", "rtn": 0, "out": "yes"})
76    hf = hist.flush()
77    assert hf is not None
78    while hf.is_alive():
79        pass
80    with LazyJSON(hist.filename) as lj:
81        assert len(lj["cmds"]) == 1
82        assert lj["cmds"][0]["inp"] == "still alive?"
83        assert lj["cmds"][0]["out"].strip() == "yes"
84
85
86def test_hist_flush_with_hist_control(hist, xonsh_builtins):
87    """Verify explicit flushing of the history works."""
88    hf = hist.flush()
89    assert hf is None
90    xonsh_builtins.__xonsh_env__["HISTCONTROL"] = "ignoredups,ignoreerr"
91    hist.append({"inp": "ls foo1", "rtn": 0})
92    hist.append({"inp": "ls foo1", "rtn": 1})
93    hist.append({"inp": "ls foo1", "rtn": 0})
94    hist.append({"inp": "ls foo2", "rtn": 2})
95    hist.append({"inp": "ls foo3", "rtn": 0})
96    hf = hist.flush()
97    assert hf is not None
98    while hf.is_alive():
99        pass
100    assert len(hist.buffer) == 0
101    with LazyJSON(hist.filename) as lj:
102        cmds = list(lj["cmds"])
103        assert len(cmds) == 2
104        assert [x["inp"] for x in cmds] == ["ls foo1", "ls foo3"]
105        assert [x["rtn"] for x in cmds] == [0, 0]
106
107
108def test_cmd_field(hist, xonsh_builtins):
109    # in-memory
110    xonsh_builtins.__xonsh_env__["HISTCONTROL"] = set()
111    hf = hist.append({"inp": "ls foo", "rtn": 1})
112    assert hf is None
113    assert 1 == hist.rtns[0]
114    assert 1 == hist.rtns[-1]
115    assert None == hist.outs[-1]
116    # slice
117    assert [1] == hist.rtns[:]
118    # on disk
119    hf = hist.flush()
120    assert hf is not None
121    assert 1 == hist.rtns[0]
122    assert 1 == hist.rtns[-1]
123    assert None == hist.outs[-1]
124
125
126@pytest.mark.parametrize(
127    "inp, commands, offset",
128    [
129        ("", CMDS, (0, 1)),
130        ("-r", list(reversed(CMDS)), (len(CMDS) - 1, -1)),
131        ("0", CMDS[0:1], (0, 1)),
132        ("1", CMDS[1:2], (1, 1)),
133        ("-2", CMDS[-2:-1], (len(CMDS) - 2, 1)),
134        ("1:3", CMDS[1:3], (1, 1)),
135        ("1::2", CMDS[1::2], (1, 2)),
136        ("-4:-2", CMDS[-4:-2], (len(CMDS) - 4, 1)),
137    ],
138)
139def test_show_cmd_numerate(inp, commands, offset, hist, xonsh_builtins, capsys):
140    """Verify that CLI history commands work."""
141    base_idx, step = offset
142    xonsh_builtins.__xonsh_history__ = hist
143    xonsh_builtins.__xonsh_env__["HISTCONTROL"] = set()
144    for ts, cmd in enumerate(CMDS):  # populate the shell history
145        hist.append({"inp": cmd, "rtn": 0, "ts": (ts + 1, ts + 1.5)})
146
147    exp = (
148        "{}: {}".format(base_idx + idx * step, cmd)
149        for idx, cmd in enumerate(list(commands))
150    )
151    exp = "\n".join(exp)
152
153    history_main(["show", "-n"] + shlex.split(inp))
154    out, err = capsys.readouterr()
155    assert out.rstrip() == exp
156
157
158def test_histcontrol(hist, xonsh_builtins):
159    """Test HISTCONTROL=ignoredups,ignoreerr"""
160
161    xonsh_builtins.__xonsh_env__["HISTCONTROL"] = "ignoredups,ignoreerr"
162    assert len(hist.buffer) == 0
163
164    # An error, buffer remains empty
165    hist.append({"inp": "ls foo", "rtn": 2})
166    assert len(hist.buffer) == 1
167    assert hist.rtns[-1] == 2
168    assert hist.inps[-1] == "ls foo"
169
170    # Success
171    hist.append({"inp": "ls foobazz", "rtn": 0})
172    assert len(hist.buffer) == 2
173    assert "ls foobazz" == hist.buffer[-1]["inp"]
174    assert 0 == hist.buffer[-1]["rtn"]
175    assert hist.rtns[-1] == 0
176    assert hist.inps[-1] == "ls foobazz"
177
178    # Error
179    hist.append({"inp": "ls foo", "rtn": 2})
180    assert len(hist.buffer) == 3
181    assert "ls foo" == hist.buffer[-1]["inp"]
182    assert 2 == hist.buffer[-1]["rtn"]
183    assert hist.rtns[-1] == 2
184    assert hist.inps[-1] == "ls foo"
185
186    # File now exists, success
187    hist.append({"inp": "ls foo", "rtn": 0})
188    assert len(hist.buffer) == 4
189    assert "ls foo" == hist.buffer[-1]["inp"]
190    assert 0 == hist.buffer[-1]["rtn"]
191    assert hist.rtns[-1] == 0
192    assert hist.inps[-1] == "ls foo"
193
194    # Success
195    hist.append({"inp": "ls", "rtn": 0})
196    assert len(hist.buffer) == 5
197    assert "ls" == hist.buffer[-1]["inp"]
198    assert 0 == hist.buffer[-1]["rtn"]
199    assert hist.rtns[-1] == 0
200    assert hist.inps[-1] == "ls"
201
202    # Dup
203    hist.append({"inp": "ls", "rtn": 0})
204    assert len(hist.buffer) == 6
205    assert hist.rtns[-1] == 0
206    assert hist.inps[-1] == "ls"
207
208    # Success
209    hist.append({"inp": "/bin/ls", "rtn": 0})
210    assert len(hist.buffer) == 7
211    assert "/bin/ls" == hist.buffer[-1]["inp"]
212    assert 0 == hist.buffer[-1]["rtn"]
213    assert hist.rtns[-1] == 0
214    assert hist.inps[-1] == "/bin/ls"
215
216    # Error
217    hist.append({"inp": "ls bazz", "rtn": 1})
218    assert len(hist.buffer) == 8
219    assert "ls bazz" == hist.buffer[-1]["inp"]
220    assert 1 == hist.buffer[-1]["rtn"]
221    assert hist.rtns[-1] == 1
222    assert hist.inps[-1] == "ls bazz"
223
224    # Error
225    hist.append({"inp": "ls bazz", "rtn": -1})
226    assert len(hist.buffer) == 9
227    assert "ls bazz" == hist.buffer[-1]["inp"]
228    assert -1 == hist.buffer[-1]["rtn"]
229    assert hist.rtns[-1] == -1
230    assert hist.inps[-1] == "ls bazz"
231
232
233@pytest.mark.parametrize("args", ["-h", "--help", "show -h", "show --help"])
234def test_parse_args_help(args, capsys):
235    with pytest.raises(SystemExit):
236        args = _xh_parse_args(shlex.split(args))
237    assert "show this help message and exit" in capsys.readouterr()[0]
238
239
240@pytest.mark.parametrize(
241    "args, exp",
242    [
243        ("", ("show", "session", [], False, False)),
244        ("1:5", ("show", "session", ["1:5"], False, False)),
245        ("show", ("show", "session", [], False, False)),
246        ("show 15", ("show", "session", ["15"], False, False)),
247        ("show bash 3:5 15:66", ("show", "bash", ["3:5", "15:66"], False, False)),
248        ("show -r", ("show", "session", [], False, True)),
249        ("show -rn bash", ("show", "bash", [], True, True)),
250        ("show -n -r -30:20", ("show", "session", ["-30:20"], True, True)),
251        ("show -n zsh 1:2:3", ("show", "zsh", ["1:2:3"], True, False)),
252    ],
253)
254def test_parser_show(args, exp):
255    # use dict instead of argparse.Namespace for pretty pytest diff
256    exp_ns = {
257        "action": exp[0],
258        "session": exp[1],
259        "slices": exp[2],
260        "numerate": exp[3],
261        "reverse": exp[4],
262        "start_time": None,
263        "end_time": None,
264        "datetime_format": None,
265        "timestamp": False,
266        "null_byte": False,
267    }
268    ns = _xh_parse_args(shlex.split(args))
269    assert ns.__dict__ == exp_ns
270
271
272@pytest.mark.parametrize(
273    "index, exp",
274    [
275        (-1, ("grep from me", "out", 0, (5, 6))),
276        (1, ("cat hello kitty", "out", 0, (1, 2))),
277        (
278            slice(1, 3),
279            [("cat hello kitty", "out", 0, (1, 2)), ("abc", "out", 0, (2, 3))],
280        ),
281    ],
282)
283def test_history_getitem(index, exp, hist, xonsh_builtins):
284    xonsh_builtins.__xonsh_env__["HISTCONTROL"] = set()
285    attrs = ("inp", "out", "rtn", "ts")
286
287    for ts, cmd in enumerate(CMDS):  # populate the shell history
288        entry = {k: v for k, v in zip(attrs, [cmd, "out", 0, (ts, ts + 1)])}
289        hist.append(entry)
290
291    entry = hist[index]
292    if isinstance(entry, list):
293        assert [(e.cmd, e.out, e.rtn, e.ts) for e in entry] == exp
294    else:
295        assert (entry.cmd, entry.out, entry.rtn, entry.ts) == exp
296
297
298def test_construct_history_str(xonsh_builtins):
299    xonsh_builtins.__xonsh_env__["XONSH_HISTORY_BACKEND"] = "dummy"
300    assert isinstance(construct_history(), DummyHistory)
301
302
303def test_construct_history_class(xonsh_builtins):
304    xonsh_builtins.__xonsh_env__["XONSH_HISTORY_BACKEND"] = DummyHistory
305    assert isinstance(construct_history(), DummyHistory)
306
307
308def test_construct_history_instance(xonsh_builtins):
309    xonsh_builtins.__xonsh_env__["XONSH_HISTORY_BACKEND"] = DummyHistory()
310    assert isinstance(construct_history(), DummyHistory)
311