1"""
This module defines the steps for behavioral-style tests.
Each step is identified by the string passed to its decorator.
That string is what the "*.feature" files use to invoke the step.
5"""
6
7import pexpect
8import subprocess
9import tempfile
10
11from behave import when, then
12from textwrap import dedent
13import wrappers
14
15
@when("we list databases")
def step_list_databases(context):
    """
    Run `pgcli --list` from the package root and keep its raw output.

    The captured bytes are stored on ``context.cmd_output``.
    """
    context.cmd_output = subprocess.check_output(
        ["pgcli", "--list"], cwd=context.package_root
    )
20
21
@then("we see list of databases")
def step_see_list_databases(context):
    """
    Check the captured command output for the database listing.

    Both the listing header and the "postgres" entry must be present in
    ``context.cmd_output``; the stored output is cleared afterwards.
    """
    captured = context.cmd_output
    for expected in (b"List of databases", b"postgres"):
        assert expected in captured
    context.cmd_output = None
27
28
@when("we run dbcli")
def step_run_cli(context):
    """
    Start the CLI through `wrappers.run_cli` without extra arguments.
    """
    wrappers.run_cli(context)
32
33
@when("we launch dbcli using {arg}")
def step_run_cli_using_arg(context, arg):
    """
    Start the CLI with a single argument derived from the step's `arg`.

    `--username`, `--user` and `--port` get their value appended from
    ``context.conf``; `dsn_password` is translated into a service DSN plus
    `--password`; anything else (including `--password`) is passed as is.
    The prompt check is disabled for every variant.
    """
    database = None
    if arg == "--username":
        cli_arg = f"--username={context.conf['user']}"
    elif arg == "--user":
        cli_arg = f"--user={context.conf['user']}"
    elif arg == "--port":
        cli_arg = f"--port={context.conf['port']}"
    elif arg == "dsn_password":
        # This uses the mock_pg_service.conf file in fixtures folder.
        cli_arg = "service=mock_postgres --password"
        database = "postgres"
    else:
        # "--password" and any unrecognised value go through unchanged.
        cli_arg = arg
    wrappers.run_cli(
        context, run_args=[cli_arg], prompt_check=False, currentdb=database
    )
55
56
@when("we wait for prompt")
def step_wait_prompt(context):
    """
    Delegate to `wrappers.wait_prompt` to wait for the CLI prompt.
    """
    wrappers.wait_prompt(context)
60
61
@when('we send "ctrl + d"')
def step_ctrl_d(context):
    """
    Send Ctrl + D and wait for the CLI process to reach EOF.

    Sets ``context.exit_sent`` once EOF has been seen.
    """
    cli = context.cli
    # Send Ctrl + C first, then switch the pager off before exiting.
    cli.sendcontrol("c")
    cli.sendline(r"\pset pager off")
    wrappers.wait_prompt(context)
    # Ctrl + D should end the session; allow up to 15 seconds for EOF.
    cli.sendcontrol("d")
    cli.expect(pexpect.EOF, timeout=15)
    context.exit_sent = True
74
75
@when(r'we send "\?" command')
def step_send_help(context):
    r"""
    Send the \? help command to the CLI.
    """
    help_command = r"\?"
    context.cli.sendline(help_command)
82
83
@when("we send partial select command")
def step_send_partial_select_command(context):
    """
    Send the line `SELECT a` to the CLI.

    The "we see error message" step expects the resulting
    'column "a" does not exist' error.
    """
    partial_query = "SELECT a"
    context.cli.sendline(partial_query)
90
91
@then("we see error message")
def step_see_error_message(context):
    """
    Expect the missing-column error for column "a" within 2 seconds.
    """
    expected_error = 'column "a" does not exist'
    wrappers.expect_exact(context, expected_error, timeout=2)
95
96
@when("we send source command")
def step_send_source_command(context):
    r"""
    Write \? into a temporary file, source it with \i and wait for the pager.

    The open file object is stored on ``context.tmpfile_sql_help``;
    a NamedTemporaryFile is removed from disk once it is closed.
    """
    sql_file = tempfile.NamedTemporaryFile(prefix="pgcli_")
    context.tmpfile_sql_help = sql_file
    sql_file.write(br"\?")
    # Flush so the content is on disk before the CLI reads the file.
    sql_file.flush()
    context.cli.sendline(fr"\i {sql_file.name}")
    pager_start = context.conf["pager_boundary"] + "\r\n"
    wrappers.expect_exact(context, pager_start, timeout=5)
104
105
@when("we run query to check application_name")
def step_check_application_name(context):
    """
    Send a query that selects 'found' when pg_stat_activity has at least
    one row whose application_name is 'pgcli'.
    """
    query = "SELECT 'found' FROM pg_stat_activity WHERE application_name = 'pgcli' HAVING COUNT(*) > 0;"
    context.cli.sendline(query)
111
112
@then("we see found")
def step_see_found(context):
    """
    Expect a one-row result table containing 'found', wrapped in pager
    boundaries, within 5 seconds.
    """
    boundary = context.conf["pager_boundary"]
    # Each table line carries an explicit "\r" because the expected
    # output uses "\r\n" line endings.
    result_table = dedent(
        """
            +------------+\r
            | ?column?   |\r
            |------------|\r
            | found      |\r
            +------------+\r
            SELECT 1\r
        """
    )
    expected_output = boundary + "\r" + result_table + boundary
    wrappers.expect_exact(context, expected_output, timeout=5)
132
133
@then("we confirm the destructive warning")
def step_confirm_destructive_command(context):
    """
    Wait up to 2 seconds for the destructive-command prompt, then answer "y".
    """
    warning_prompt = (
        "You're about to run a destructive command.\r\nDo you want to proceed? (y/n):"
    )
    wrappers.expect_exact(context, warning_prompt, timeout=2)
    context.cli.sendline("y")
143
144
@then("we send password")
def step_send_password(context):
    """
    Wait up to 5 seconds for the password prompt and answer it.

    Sends ``context.conf["pass"]``, or a placeholder when that value is
    empty or None.
    """
    wrappers.expect_exact(context, "Password for", timeout=5)
    password = context.conf["pass"]
    if not password:
        password = "DOES NOT MATTER"
    context.cli.sendline(password)
149