1# -*- coding: utf-8 -*- 2from __future__ import absolute_import, print_function, division 3import sys 4import logging 5 6 7import petl as etl 8from petl.test.helpers import ieq 9 10 11logger = logging.getLogger(__name__) 12debug = logger.debug 13 14 15def _test_dbo(write_dbo, read_dbo=None): 16 if read_dbo is None: 17 read_dbo = write_dbo 18 19 expect_empty = (('foo', 'bar'),) 20 expect = (('foo', 'bar'), 21 ('a', 1), 22 ('b', 2)) 23 expect_appended = (('foo', 'bar'), 24 ('a', 1), 25 ('b', 2), 26 ('a', 1), 27 ('b', 2)) 28 actual = etl.fromdb(read_dbo, 'SELECT * FROM test') 29 30 debug('verify empty to start with...') 31 debug(etl.look(actual)) 32 ieq(expect_empty, actual) 33 34 debug('write some data and verify...') 35 etl.todb(expect, write_dbo, 'test') 36 debug(etl.look(actual)) 37 ieq(expect, actual) 38 39 debug('append some data and verify...') 40 etl.appenddb(expect, write_dbo, 'test') 41 debug(etl.look(actual)) 42 ieq(expect_appended, actual) 43 44 debug('overwrite and verify...') 45 etl.todb(expect, write_dbo, 'test') 46 debug(etl.look(actual)) 47 ieq(expect, actual) 48 49 debug('cut, overwrite and verify') 50 etl.todb(etl.cut(expect, 'bar', 'foo'), write_dbo, 'test') 51 debug(etl.look(actual)) 52 ieq(expect, actual) 53 54 debug('cut, append and verify') 55 etl.appenddb(etl.cut(expect, 'bar', 'foo'), write_dbo, 'test') 56 debug(etl.look(actual)) 57 ieq(expect_appended, actual) 58 59 debug('try a single row') 60 etl.todb(etl.head(expect, 1), write_dbo, 'test') 61 debug(etl.look(actual)) 62 ieq(etl.head(expect, 1), actual) 63 64 65def _test_with_schema(dbo, schema): 66 67 expect = (('foo', 'bar'), 68 ('a', 1), 69 ('b', 2)) 70 expect_appended = (('foo', 'bar'), 71 ('a', 1), 72 ('b', 2), 73 ('a', 1), 74 ('b', 2)) 75 actual = etl.fromdb(dbo, 'SELECT * FROM test') 76 77 print('write some data and verify...') 78 etl.todb(expect, dbo, 'test', schema=schema) 79 ieq(expect, actual) 80 print(etl.look(actual)) 81 82 print('append some data and verify...') 83 etl.appenddb(expect, dbo, 'test', schema=schema) 84 ieq(expect_appended, actual) 85 print(etl.look(actual)) 86 87 88def _test_unicode(dbo): 89 expect = ((u'name', u'id'), 90 (u'Արամ Խաչատրյան', 1), 91 (u'Johann Strauß', 2), 92 (u'Вагиф Сәмәдоғлу', 3), 93 (u'章子怡', 4), 94 ) 95 actual = etl.fromdb(dbo, 'SELECT * FROM test_unicode') 96 97 print('write some data and verify...') 98 etl.todb(expect, dbo, 'test_unicode') 99 ieq(expect, actual) 100 print(etl.look(actual)) 101 102 103def _setup_mysql(dbapi_connection): 104 # setup table 105 cursor = dbapi_connection.cursor() 106 # deal with quote compatibility 107 cursor.execute('SET SQL_MODE=ANSI_QUOTES') 108 cursor.execute('DROP TABLE IF EXISTS test') 109 cursor.execute('CREATE TABLE test (foo TEXT, bar INT)') 110 cursor.execute('DROP TABLE IF EXISTS test_unicode') 111 cursor.execute('CREATE TABLE test_unicode (name TEXT, id INT) ' 112 'CHARACTER SET utf8') 113 cursor.close() 114 dbapi_connection.commit() 115 116 117def _setup_postgresql(dbapi_connection): 118 # setup table 119 cursor = dbapi_connection.cursor() 120 cursor.execute('DROP TABLE IF EXISTS test') 121 cursor.execute('CREATE TABLE test (foo TEXT, bar INT)') 122 cursor.execute('DROP TABLE IF EXISTS test_unicode') 123 # assume character encoding UTF-8 already set at database level 124 cursor.execute('CREATE TABLE test_unicode (name TEXT, id INT)') 125 cursor.close() 126 dbapi_connection.commit() 127 128 129host, user, password, database = 'localhost', 'petl', 'test', 'petl' 130 131 132try: 133 import pymysql 134 import sqlalchemy 135 pymysql.connect(host=host, 136 user=user, 137 password=password, 138 database=database) 139except Exception as e: 140 print('SKIP pymysql tests: %s' % e, file=sys.stderr) 141else: 142 143 def test_mysql(): 144 145 import pymysql 146 connect = pymysql.connect 147 148 # assume database already created 149 dbapi_connection = connect(host=host, 150 user=user, 151 password=password, 152 database=database) 153 154 # exercise using a dbapi_connection 155 _setup_mysql(dbapi_connection) 156 _test_dbo(dbapi_connection) 157 158 # exercise using a dbapi_cursor 159 _setup_mysql(dbapi_connection) 160 dbapi_cursor = dbapi_connection.cursor() 161 _test_dbo(dbapi_cursor) 162 dbapi_cursor.close() 163 164 # exercise sqlalchemy dbapi_connection 165 _setup_mysql(dbapi_connection) 166 from sqlalchemy import create_engine 167 sqlalchemy_engine = create_engine('mysql+pymysql://%s:%s@%s/%s' % 168 (user, password, host, database)) 169 sqlalchemy_connection = sqlalchemy_engine.connect() 170 sqlalchemy_connection.execute('SET SQL_MODE=ANSI_QUOTES') 171 _test_dbo(sqlalchemy_connection) 172 sqlalchemy_connection.close() 173 174 # exercise sqlalchemy session 175 _setup_mysql(dbapi_connection) 176 from sqlalchemy.orm import sessionmaker 177 Session = sessionmaker(bind=sqlalchemy_engine) 178 sqlalchemy_session = Session() 179 _test_dbo(sqlalchemy_session) 180 sqlalchemy_session.close() 181 182 # exercise sqlalchemy engine 183 _setup_mysql(dbapi_connection) 184 sqlalchemy_engine2 = create_engine('mysql+pymysql://%s:%s@%s/%s' % 185 (user, password, host, database)) 186 sqlalchemy_engine2.execute('SET SQL_MODE=ANSI_QUOTES') 187 _test_dbo(sqlalchemy_engine2) 188 sqlalchemy_engine2.dispose() 189 190 # other exercises 191 _test_with_schema(dbapi_connection, database) 192 utf8_connection = connect(host=host, user=user, 193 password=password, 194 database=database, 195 charset='utf8') 196 utf8_connection.cursor().execute('SET SQL_MODE=ANSI_QUOTES') 197 _test_unicode(utf8_connection) 198 199 200try: 201 import MySQLdb 202 import sqlalchemy 203 MySQLdb.connect(host=host, 204 user=user, 205 passwd=password, 206 db=database) 207except Exception as e: 208 print('SKIP MySQLdb tests: %s' % e, file=sys.stderr) 209else: 210 211 def test_mysql(): 212 213 import MySQLdb 214 connect = MySQLdb.connect 215 216 # assume database already created 217 dbapi_connection = connect(host=host, 218 user=user, 219 passwd=password, 220 db=database) 221 222 # exercise using a dbapi_connection 223 _setup_mysql(dbapi_connection) 224 _test_dbo(dbapi_connection) 225 226 # exercise using a dbapi_cursor 227 _setup_mysql(dbapi_connection) 228 dbapi_cursor = dbapi_connection.cursor() 229 _test_dbo(dbapi_cursor) 230 dbapi_cursor.close() 231 232 # exercise sqlalchemy dbapi_connection 233 _setup_mysql(dbapi_connection) 234 from sqlalchemy import create_engine 235 sqlalchemy_engine = create_engine('mysql+mysqldb://%s:%s@%s/%s' % 236 (user, password, host, database)) 237 sqlalchemy_connection = sqlalchemy_engine.connect() 238 sqlalchemy_connection.execute('SET SQL_MODE=ANSI_QUOTES') 239 _test_dbo(sqlalchemy_connection) 240 sqlalchemy_connection.close() 241 242 # exercise sqlalchemy session 243 _setup_mysql(dbapi_connection) 244 from sqlalchemy.orm import sessionmaker 245 Session = sessionmaker(bind=sqlalchemy_engine) 246 sqlalchemy_session = Session() 247 _test_dbo(sqlalchemy_session) 248 sqlalchemy_session.close() 249 250 # other exercises 251 _test_with_schema(dbapi_connection, database) 252 utf8_connection = connect(host=host, user=user, 253 passwd=password, 254 db=database, 255 charset='utf8') 256 utf8_connection.cursor().execute('SET SQL_MODE=ANSI_QUOTES') 257 _test_unicode(utf8_connection) 258 259 260try: 261 import psycopg2 262 import sqlalchemy 263 psycopg2.connect( 264 'host=%s dbname=%s user=%s password=%s' 265 % (host, database, user, password) 266 ) 267except Exception as e: 268 print('SKIP psycopg2 tests: %s' % e, file=sys.stderr) 269else: 270 271 def test_postgresql(): 272 273 import psycopg2 274 import psycopg2.extensions 275 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) 276 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) 277 278 # assume database already created 279 dbapi_connection = psycopg2.connect( 280 'host=%s dbname=%s user=%s password=%s' 281 % (host, database, user, password) 282 ) 283 284 # exercise using a dbapi_connection 285 _setup_postgresql(dbapi_connection) 286 _test_dbo(dbapi_connection) 287 288 # exercise using a dbapi_cursor 289 _setup_postgresql(dbapi_connection) 290 dbapi_cursor = dbapi_connection.cursor() 291 _test_dbo(dbapi_cursor) 292 dbapi_cursor.close() 293 294 # exercise sqlalchemy dbapi_connection 295 _setup_postgresql(dbapi_connection) 296 from sqlalchemy import create_engine 297 sqlalchemy_engine = create_engine('postgresql+psycopg2://%s:%s@%s/%s' % 298 (user, password, host, database)) 299 sqlalchemy_connection = sqlalchemy_engine.connect() 300 _test_dbo(sqlalchemy_connection) 301 sqlalchemy_connection.close() 302 303 # exercise sqlalchemy session 304 _setup_postgresql(dbapi_connection) 305 from sqlalchemy.orm import sessionmaker 306 Session = sessionmaker(bind=sqlalchemy_engine) 307 sqlalchemy_session = Session() 308 _test_dbo(sqlalchemy_session) 309 sqlalchemy_session.close() 310 311 # other exercises 312 _test_dbo(dbapi_connection, 313 lambda: dbapi_connection.cursor(name='arbitrary')) 314 _test_with_schema(dbapi_connection, 'public') 315 _test_unicode(dbapi_connection) 316