1import os 2 3import pytest 4import pymysql 5 6from .utils import run, dbtest, set_expanded_output, is_expanded_output 7 8 9def assert_result_equal(result, title=None, rows=None, headers=None, 10 status=None, auto_status=True, assert_contains=False): 11 """Assert that an sqlexecute.run() result matches the expected values.""" 12 if status is None and auto_status and rows: 13 status = '{} row{} in set'.format( 14 len(rows), 's' if len(rows) > 1 else '') 15 fields = {'title': title, 'rows': rows, 'headers': headers, 16 'status': status} 17 18 if assert_contains: 19 # Do a loose match on the results using the *in* operator. 20 for key, field in fields.items(): 21 if field: 22 assert field in result[0][key] 23 else: 24 # Do an exact match on the fields. 25 assert result == [fields] 26 27 28@dbtest 29def test_conn(executor): 30 run(executor, '''create table test(a text)''') 31 run(executor, '''insert into test values('abc')''') 32 results = run(executor, '''select * from test''') 33 34 assert_result_equal(results, headers=['a'], rows=[('abc',)]) 35 36 37@dbtest 38def test_bools(executor): 39 run(executor, '''create table test(a boolean)''') 40 run(executor, '''insert into test values(True)''') 41 results = run(executor, '''select * from test''') 42 43 assert_result_equal(results, headers=['a'], rows=[(1,)]) 44 45 46@dbtest 47def test_binary(executor): 48 run(executor, '''create table bt(geom linestring NOT NULL)''') 49 run(executor, "INSERT INTO bt VALUES " 50 "(ST_GeomFromText('LINESTRING(116.37604 39.73979,116.375 39.73965)'));") 51 results = run(executor, '''select * from bt''') 52 53 geom = (b'\x00\x00\x00\x00\x01\x02\x00\x00\x00\x02\x00\x00\x009\x7f\x13\n' 54 b'\x11\x18]@4\xf4Op\xb1\xdeC@\x00\x00\x00\x00\x00\x18]@B>\xe8\xd9' 55 b'\xac\xdeC@') 56 57 assert_result_equal(results, headers=['geom'], rows=[(geom,)]) 58 59 60@dbtest 61def test_table_and_columns_query(executor): 62 run(executor, "create table a(x text, y text)") 63 run(executor, "create table b(z text)") 64 65 assert set(executor.tables()) == set([('a',), ('b',)]) 66 assert set(executor.table_columns()) == set( 67 [('a', 'x'), ('a', 'y'), ('b', 'z')]) 68 69 70@dbtest 71def test_database_list(executor): 72 databases = executor.databases() 73 assert '_test_db' in databases 74 75 76@dbtest 77def test_invalid_syntax(executor): 78 with pytest.raises(pymysql.ProgrammingError) as excinfo: 79 run(executor, 'invalid syntax!') 80 assert 'You have an error in your SQL syntax;' in str(excinfo.value) 81 82 83@dbtest 84def test_invalid_column_name(executor): 85 with pytest.raises(pymysql.err.OperationalError) as excinfo: 86 run(executor, 'select invalid command') 87 assert "Unknown column 'invalid' in 'field list'" in str(excinfo.value) 88 89 90@dbtest 91def test_unicode_support_in_output(executor): 92 run(executor, "create table unicodechars(t text)") 93 run(executor, u"insert into unicodechars (t) values ('é')") 94 95 # See issue #24, this raises an exception without proper handling 96 results = run(executor, u"select * from unicodechars") 97 assert_result_equal(results, headers=['t'], rows=[(u'é',)]) 98 99 100@dbtest 101def test_multiple_queries_same_line(executor): 102 results = run(executor, "select 'foo'; select 'bar'") 103 104 expected = [{'title': None, 'headers': ['foo'], 'rows': [('foo',)], 105 'status': '1 row in set'}, 106 {'title': None, 'headers': ['bar'], 'rows': [('bar',)], 107 'status': '1 row in set'}] 108 assert expected == results 109 110 111@dbtest 112def test_multiple_queries_same_line_syntaxerror(executor): 113 with pytest.raises(pymysql.ProgrammingError) as excinfo: 114 run(executor, "select 'foo'; invalid syntax") 115 assert 'You have an error in your SQL syntax;' in str(excinfo.value) 116 117 118@dbtest 119def test_favorite_query(executor): 120 set_expanded_output(False) 121 run(executor, "create table test(a text)") 122 run(executor, "insert into test values('abc')") 123 run(executor, "insert into test values('def')") 124 125 results = run(executor, "\\fs test-a select * from test where a like 'a%'") 126 assert_result_equal(results, status='Saved.') 127 128 results = run(executor, "\\f test-a") 129 assert_result_equal(results, 130 title="> select * from test where a like 'a%'", 131 headers=['a'], rows=[('abc',)], auto_status=False) 132 133 results = run(executor, "\\fd test-a") 134 assert_result_equal(results, status='test-a: Deleted') 135 136 137@dbtest 138def test_favorite_query_multiple_statement(executor): 139 set_expanded_output(False) 140 run(executor, "create table test(a text)") 141 run(executor, "insert into test values('abc')") 142 run(executor, "insert into test values('def')") 143 144 results = run(executor, 145 "\\fs test-ad select * from test where a like 'a%'; " 146 "select * from test where a like 'd%'") 147 assert_result_equal(results, status='Saved.') 148 149 results = run(executor, "\\f test-ad") 150 expected = [{'title': "> select * from test where a like 'a%'", 151 'headers': ['a'], 'rows': [('abc',)], 'status': None}, 152 {'title': "> select * from test where a like 'd%'", 153 'headers': ['a'], 'rows': [('def',)], 'status': None}] 154 assert expected == results 155 156 results = run(executor, "\\fd test-ad") 157 assert_result_equal(results, status='test-ad: Deleted') 158 159 160@dbtest 161def test_favorite_query_expanded_output(executor): 162 set_expanded_output(False) 163 run(executor, '''create table test(a text)''') 164 run(executor, '''insert into test values('abc')''') 165 166 results = run(executor, "\\fs test-ae select * from test") 167 assert_result_equal(results, status='Saved.') 168 169 results = run(executor, "\\f test-ae \\G") 170 assert is_expanded_output() is True 171 assert_result_equal(results, title='> select * from test', 172 headers=['a'], rows=[('abc',)], auto_status=False) 173 174 set_expanded_output(False) 175 176 results = run(executor, "\\fd test-ae") 177 assert_result_equal(results, status='test-ae: Deleted') 178 179 180@dbtest 181def test_special_command(executor): 182 results = run(executor, '\\?') 183 assert_result_equal(results, rows=('quit', '\\q', 'Quit.'), 184 headers='Command', assert_contains=True, 185 auto_status=False) 186 187 188@dbtest 189def test_cd_command_without_a_folder_name(executor): 190 results = run(executor, 'system cd') 191 assert_result_equal(results, status='No folder name was provided.') 192 193 194@dbtest 195def test_system_command_not_found(executor): 196 results = run(executor, 'system xyz') 197 assert_result_equal(results, status='OSError: No such file or directory', 198 assert_contains=True) 199 200 201@dbtest 202def test_system_command_output(executor): 203 test_dir = os.path.abspath(os.path.dirname(__file__)) 204 test_file_path = os.path.join(test_dir, 'test.txt') 205 results = run(executor, 'system cat {0}'.format(test_file_path)) 206 assert_result_equal(results, status='mycli rocks!\n') 207 208 209@dbtest 210def test_cd_command_current_dir(executor): 211 test_path = os.path.abspath(os.path.dirname(__file__)) 212 run(executor, 'system cd {0}'.format(test_path)) 213 assert os.getcwd() == test_path 214 215 216@dbtest 217def test_unicode_support(executor): 218 results = run(executor, u"SELECT '日本語' AS japanese;") 219 assert_result_equal(results, headers=['japanese'], rows=[(u'日本語',)]) 220 221 222@dbtest 223def test_timestamp_null(executor): 224 run(executor, '''create table ts_null(a timestamp null)''') 225 run(executor, '''insert into ts_null values(null)''') 226 results = run(executor, '''select * from ts_null''') 227 assert_result_equal(results, headers=['a'], 228 rows=[(None,)]) 229 230 231@dbtest 232def test_datetime_null(executor): 233 run(executor, '''create table dt_null(a datetime null)''') 234 run(executor, '''insert into dt_null values(null)''') 235 results = run(executor, '''select * from dt_null''') 236 assert_result_equal(results, headers=['a'], 237 rows=[(None,)]) 238 239 240@dbtest 241def test_date_null(executor): 242 run(executor, '''create table date_null(a date null)''') 243 run(executor, '''insert into date_null values(null)''') 244 results = run(executor, '''select * from date_null''') 245 assert_result_equal(results, headers=['a'], rows=[(None,)]) 246 247 248@dbtest 249def test_time_null(executor): 250 run(executor, '''create table time_null(a time null)''') 251 run(executor, '''insert into time_null values(null)''') 252 results = run(executor, '''select * from time_null''') 253 assert_result_equal(results, headers=['a'], rows=[(None,)]) 254 255 256@dbtest 257def test_multiple_results(executor): 258 query = '''CREATE PROCEDURE dmtest() 259 BEGIN 260 SELECT 1; 261 SELECT 2; 262 END''' 263 executor.conn.cursor().execute(query) 264 265 results = run(executor, 'call dmtest;') 266 expected = [ 267 {'title': None, 'rows': [(1,)], 'headers': ['1'], 268 'status': '1 row in set'}, 269 {'title': None, 'rows': [(2,)], 'headers': ['2'], 270 'status': '1 row in set'} 271 ] 272 assert results == expected 273