1import os
2
3import pytest
4import pymysql
5
6from .utils import run, dbtest, set_expanded_output, is_expanded_output
7
8
9def assert_result_equal(result, title=None, rows=None, headers=None,
10                        status=None, auto_status=True, assert_contains=False):
11    """Assert that an sqlexecute.run() result matches the expected values."""
12    if status is None and auto_status and rows:
13        status = '{} row{} in set'.format(
14            len(rows), 's' if len(rows) > 1 else '')
15    fields = {'title': title, 'rows': rows, 'headers': headers,
16              'status': status}
17
18    if assert_contains:
19        # Do a loose match on the results using the *in* operator.
20        for key, field in fields.items():
21            if field:
22                assert field in result[0][key]
23    else:
24        # Do an exact match on the fields.
25        assert result == [fields]
26
27
28@dbtest
29def test_conn(executor):
30    run(executor, '''create table test(a text)''')
31    run(executor, '''insert into test values('abc')''')
32    results = run(executor, '''select * from test''')
33
34    assert_result_equal(results, headers=['a'], rows=[('abc',)])
35
36
37@dbtest
38def test_bools(executor):
39    run(executor, '''create table test(a boolean)''')
40    run(executor, '''insert into test values(True)''')
41    results = run(executor, '''select * from test''')
42
43    assert_result_equal(results, headers=['a'], rows=[(1,)])
44
45
46@dbtest
47def test_binary(executor):
48    run(executor, '''create table bt(geom linestring NOT NULL)''')
49    run(executor, "INSERT INTO bt VALUES "
50        "(ST_GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));")
51    results = run(executor, '''select * from bt''')
52
53    geom = (b'\x00\x00\x00\x00\x01\x02\x00\x00\x00\x02\x00\x00\x009\x7f\x13\n'
54            b'\x11\x18]@4\xf4Op\xb1\xdeC@\x00\x00\x00\x00\x00\x18]@B>\xe8\xd9'
55            b'\xac\xdeC@')
56
57    assert_result_equal(results, headers=['geom'], rows=[(geom,)])
58
59
60@dbtest
61def test_table_and_columns_query(executor):
62    run(executor, "create table a(x text, y text)")
63    run(executor, "create table b(z text)")
64
65    assert set(executor.tables()) == set([('a',), ('b',)])
66    assert set(executor.table_columns()) == set(
67        [('a', 'x'), ('a', 'y'), ('b', 'z')])
68
69
70@dbtest
71def test_database_list(executor):
72    databases = executor.databases()
73    assert '_test_db' in databases
74
75
76@dbtest
77def test_invalid_syntax(executor):
78    with pytest.raises(pymysql.ProgrammingError) as excinfo:
79        run(executor, 'invalid syntax!')
80    assert 'You have an error in your SQL syntax;' in str(excinfo.value)
81
82
83@dbtest
84def test_invalid_column_name(executor):
85    with pytest.raises(pymysql.err.OperationalError) as excinfo:
86        run(executor, 'select invalid command')
87    assert "Unknown column 'invalid' in 'field list'" in str(excinfo.value)
88
89
90@dbtest
91def test_unicode_support_in_output(executor):
92    run(executor, "create table unicodechars(t text)")
93    run(executor, u"insert into unicodechars (t) values ('é')")
94
95    # See issue #24, this raises an exception without proper handling
96    results = run(executor, u"select * from unicodechars")
97    assert_result_equal(results, headers=['t'], rows=[(u'é',)])
98
99
100@dbtest
101def test_multiple_queries_same_line(executor):
102    results = run(executor, "select 'foo'; select 'bar'")
103
104    expected = [{'title': None, 'headers': ['foo'], 'rows': [('foo',)],
105                 'status': '1 row in set'},
106                {'title': None, 'headers': ['bar'], 'rows': [('bar',)],
107                 'status': '1 row in set'}]
108    assert expected == results
109
110
111@dbtest
112def test_multiple_queries_same_line_syntaxerror(executor):
113    with pytest.raises(pymysql.ProgrammingError) as excinfo:
114        run(executor, "select 'foo'; invalid syntax")
115    assert 'You have an error in your SQL syntax;' in str(excinfo.value)
116
117
118@dbtest
119def test_favorite_query(executor):
120    set_expanded_output(False)
121    run(executor, "create table test(a text)")
122    run(executor, "insert into test values('abc')")
123    run(executor, "insert into test values('def')")
124
125    results = run(executor, "\\fs test-a select * from test where a like 'a%'")
126    assert_result_equal(results, status='Saved.')
127
128    results = run(executor, "\\f test-a")
129    assert_result_equal(results,
130                        title="> select * from test where a like 'a%'",
131                        headers=['a'], rows=[('abc',)], auto_status=False)
132
133    results = run(executor, "\\fd test-a")
134    assert_result_equal(results, status='test-a: Deleted')
135
136
137@dbtest
138def test_favorite_query_multiple_statement(executor):
139    set_expanded_output(False)
140    run(executor, "create table test(a text)")
141    run(executor, "insert into test values('abc')")
142    run(executor, "insert into test values('def')")
143
144    results = run(executor,
145                  "\\fs test-ad select * from test where a like 'a%'; "
146                  "select * from test where a like 'd%'")
147    assert_result_equal(results, status='Saved.')
148
149    results = run(executor, "\\f test-ad")
150    expected = [{'title': "> select * from test where a like 'a%'",
151                 'headers': ['a'], 'rows': [('abc',)], 'status': None},
152                {'title': "> select * from test where a like 'd%'",
153                 'headers': ['a'], 'rows': [('def',)], 'status': None}]
154    assert expected == results
155
156    results = run(executor, "\\fd test-ad")
157    assert_result_equal(results, status='test-ad: Deleted')
158
159
160@dbtest
161def test_favorite_query_expanded_output(executor):
162    set_expanded_output(False)
163    run(executor, '''create table test(a text)''')
164    run(executor, '''insert into test values('abc')''')
165
166    results = run(executor, "\\fs test-ae select * from test")
167    assert_result_equal(results, status='Saved.')
168
169    results = run(executor, "\\f test-ae \\G")
170    assert is_expanded_output() is True
171    assert_result_equal(results, title='> select * from test',
172                        headers=['a'], rows=[('abc',)], auto_status=False)
173
174    set_expanded_output(False)
175
176    results = run(executor, "\\fd test-ae")
177    assert_result_equal(results, status='test-ae: Deleted')
178
179
180@dbtest
181def test_special_command(executor):
182    results = run(executor, '\\?')
183    assert_result_equal(results, rows=('quit', '\\q', 'Quit.'),
184                        headers='Command', assert_contains=True,
185                        auto_status=False)
186
187
188@dbtest
189def test_cd_command_without_a_folder_name(executor):
190    results = run(executor, 'system cd')
191    assert_result_equal(results, status='No folder name was provided.')
192
193
194@dbtest
195def test_system_command_not_found(executor):
196    results = run(executor, 'system xyz')
197    assert_result_equal(results, status='OSError: No such file or directory',
198                        assert_contains=True)
199
200
201@dbtest
202def test_system_command_output(executor):
203    test_dir = os.path.abspath(os.path.dirname(__file__))
204    test_file_path = os.path.join(test_dir, 'test.txt')
205    results = run(executor, 'system cat {0}'.format(test_file_path))
206    assert_result_equal(results, status='mycli rocks!\n')
207
208
209@dbtest
210def test_cd_command_current_dir(executor):
211    test_path = os.path.abspath(os.path.dirname(__file__))
212    run(executor, 'system cd {0}'.format(test_path))
213    assert os.getcwd() == test_path
214
215
216@dbtest
217def test_unicode_support(executor):
218    results = run(executor, u"SELECT '日本語' AS japanese;")
219    assert_result_equal(results, headers=['japanese'], rows=[(u'日本語',)])
220
221
222@dbtest
223def test_timestamp_null(executor):
224    run(executor, '''create table ts_null(a timestamp null)''')
225    run(executor, '''insert into ts_null values(null)''')
226    results = run(executor, '''select * from ts_null''')
227    assert_result_equal(results, headers=['a'],
228                        rows=[(None,)])
229
230
231@dbtest
232def test_datetime_null(executor):
233    run(executor, '''create table dt_null(a datetime null)''')
234    run(executor, '''insert into dt_null values(null)''')
235    results = run(executor, '''select * from dt_null''')
236    assert_result_equal(results, headers=['a'],
237                        rows=[(None,)])
238
239
240@dbtest
241def test_date_null(executor):
242    run(executor, '''create table date_null(a date null)''')
243    run(executor, '''insert into date_null values(null)''')
244    results = run(executor, '''select * from date_null''')
245    assert_result_equal(results, headers=['a'], rows=[(None,)])
246
247
248@dbtest
249def test_time_null(executor):
250    run(executor, '''create table time_null(a time null)''')
251    run(executor, '''insert into time_null values(null)''')
252    results = run(executor, '''select * from time_null''')
253    assert_result_equal(results, headers=['a'], rows=[(None,)])
254
255
256@dbtest
257def test_multiple_results(executor):
258    query = '''CREATE PROCEDURE dmtest()
259        BEGIN
260          SELECT 1;
261          SELECT 2;
262        END'''
263    executor.conn.cursor().execute(query)
264
265    results = run(executor, 'call dmtest;')
266    expected = [
267        {'title': None, 'rows': [(1,)], 'headers': ['1'],
268         'status': '1 row in set'},
269        {'title': None, 'rows': [(2,)], 'headers': ['2'],
270         'status': '1 row in set'}
271    ]
272    assert results == expected
273