1# -*- coding: utf-8 -*-
2from __future__ import absolute_import, print_function, division
3import sys
4import logging
5
6
7import petl as etl
8from petl.test.helpers import ieq
9
10
11logger = logging.getLogger(__name__)
12debug = logger.debug
13
14
15def _test_dbo(write_dbo, read_dbo=None):
16    if read_dbo is None:
17        read_dbo = write_dbo
18
19    expect_empty = (('foo', 'bar'),)
20    expect = (('foo', 'bar'),
21              ('a', 1),
22              ('b', 2))
23    expect_appended = (('foo', 'bar'),
24                       ('a', 1),
25                       ('b', 2),
26                       ('a', 1),
27                       ('b', 2))
28    actual = etl.fromdb(read_dbo, 'SELECT * FROM test')
29
30    debug('verify empty to start with...')
31    debug(etl.look(actual))
32    ieq(expect_empty, actual)
33
34    debug('write some data and verify...')
35    etl.todb(expect, write_dbo, 'test')
36    debug(etl.look(actual))
37    ieq(expect, actual)
38
39    debug('append some data and verify...')
40    etl.appenddb(expect, write_dbo, 'test')
41    debug(etl.look(actual))
42    ieq(expect_appended, actual)
43
44    debug('overwrite and verify...')
45    etl.todb(expect, write_dbo, 'test')
46    debug(etl.look(actual))
47    ieq(expect, actual)
48
49    debug('cut, overwrite and verify')
50    etl.todb(etl.cut(expect, 'bar', 'foo'), write_dbo, 'test')
51    debug(etl.look(actual))
52    ieq(expect, actual)
53
54    debug('cut, append and verify')
55    etl.appenddb(etl.cut(expect, 'bar', 'foo'), write_dbo, 'test')
56    debug(etl.look(actual))
57    ieq(expect_appended, actual)
58
59    debug('try a single row')
60    etl.todb(etl.head(expect, 1), write_dbo, 'test')
61    debug(etl.look(actual))
62    ieq(etl.head(expect, 1), actual)
63
64
65def _test_with_schema(dbo, schema):
66
67    expect = (('foo', 'bar'),
68              ('a', 1),
69              ('b', 2))
70    expect_appended = (('foo', 'bar'),
71                       ('a', 1),
72                       ('b', 2),
73                       ('a', 1),
74                       ('b', 2))
75    actual = etl.fromdb(dbo, 'SELECT * FROM test')
76
77    print('write some data and verify...')
78    etl.todb(expect, dbo, 'test', schema=schema)
79    ieq(expect, actual)
80    print(etl.look(actual))
81
82    print('append some data and verify...')
83    etl.appenddb(expect, dbo, 'test', schema=schema)
84    ieq(expect_appended, actual)
85    print(etl.look(actual))
86
87
88def _test_unicode(dbo):
89    expect = ((u'name', u'id'),
90              (u'Արամ Խաչատրյան', 1),
91              (u'Johann Strauß', 2),
92              (u'Вагиф Сәмәдоғлу', 3),
93              (u'章子怡', 4),
94              )
95    actual = etl.fromdb(dbo, 'SELECT * FROM test_unicode')
96
97    print('write some data and verify...')
98    etl.todb(expect, dbo, 'test_unicode')
99    ieq(expect, actual)
100    print(etl.look(actual))
101
102
103def _setup_mysql(dbapi_connection):
104    # setup table
105    cursor = dbapi_connection.cursor()
106    # deal with quote compatibility
107    cursor.execute('SET SQL_MODE=ANSI_QUOTES')
108    cursor.execute('DROP TABLE IF EXISTS test')
109    cursor.execute('CREATE TABLE test (foo TEXT, bar INT)')
110    cursor.execute('DROP TABLE IF EXISTS test_unicode')
111    cursor.execute('CREATE TABLE test_unicode (name TEXT, id INT) '
112                   'CHARACTER SET utf8')
113    cursor.close()
114    dbapi_connection.commit()
115
116
117def _setup_postgresql(dbapi_connection):
118    # setup table
119    cursor = dbapi_connection.cursor()
120    cursor.execute('DROP TABLE IF EXISTS test')
121    cursor.execute('CREATE TABLE test (foo TEXT, bar INT)')
122    cursor.execute('DROP TABLE IF EXISTS test_unicode')
123    # assume character encoding UTF-8 already set at database level
124    cursor.execute('CREATE TABLE test_unicode (name TEXT, id INT)')
125    cursor.close()
126    dbapi_connection.commit()
127
128
129host, user, password, database = 'localhost', 'petl', 'test', 'petl'
130
131
132try:
133    import pymysql
134    import sqlalchemy
135    pymysql.connect(host=host,
136                    user=user,
137                    password=password,
138                    database=database)
139except Exception as e:
140    print('SKIP pymysql tests: %s' % e, file=sys.stderr)
141else:
142
143    def test_mysql():
144
145        import pymysql
146        connect = pymysql.connect
147
148        # assume database already created
149        dbapi_connection = connect(host=host,
150                                   user=user,
151                                   password=password,
152                                   database=database)
153
154        # exercise using a dbapi_connection
155        _setup_mysql(dbapi_connection)
156        _test_dbo(dbapi_connection)
157
158        # exercise using a dbapi_cursor
159        _setup_mysql(dbapi_connection)
160        dbapi_cursor = dbapi_connection.cursor()
161        _test_dbo(dbapi_cursor)
162        dbapi_cursor.close()
163
164        # exercise sqlalchemy dbapi_connection
165        _setup_mysql(dbapi_connection)
166        from sqlalchemy import create_engine
167        sqlalchemy_engine = create_engine('mysql+pymysql://%s:%s@%s/%s' %
168                                          (user, password, host, database))
169        sqlalchemy_connection = sqlalchemy_engine.connect()
170        sqlalchemy_connection.execute('SET SQL_MODE=ANSI_QUOTES')
171        _test_dbo(sqlalchemy_connection)
172        sqlalchemy_connection.close()
173
174        # exercise sqlalchemy session
175        _setup_mysql(dbapi_connection)
176        from sqlalchemy.orm import sessionmaker
177        Session = sessionmaker(bind=sqlalchemy_engine)
178        sqlalchemy_session = Session()
179        _test_dbo(sqlalchemy_session)
180        sqlalchemy_session.close()
181
182        # exercise sqlalchemy engine
183        _setup_mysql(dbapi_connection)
184        sqlalchemy_engine2 = create_engine('mysql+pymysql://%s:%s@%s/%s' %
185                                           (user, password, host, database))
186        sqlalchemy_engine2.execute('SET SQL_MODE=ANSI_QUOTES')
187        _test_dbo(sqlalchemy_engine2)
188        sqlalchemy_engine2.dispose()
189
190        # other exercises
191        _test_with_schema(dbapi_connection, database)
192        utf8_connection = connect(host=host, user=user,
193                                  password=password,
194                                  database=database,
195                                  charset='utf8')
196        utf8_connection.cursor().execute('SET SQL_MODE=ANSI_QUOTES')
197        _test_unicode(utf8_connection)
198
199
200try:
201    import MySQLdb
202    import sqlalchemy
203    MySQLdb.connect(host=host,
204                    user=user,
205                    passwd=password,
206                    db=database)
207except Exception as e:
208    print('SKIP MySQLdb tests: %s' % e, file=sys.stderr)
209else:
210
211    def test_mysql():
212
213        import MySQLdb
214        connect = MySQLdb.connect
215
216        # assume database already created
217        dbapi_connection = connect(host=host,
218                                   user=user,
219                                   passwd=password,
220                                   db=database)
221
222        # exercise using a dbapi_connection
223        _setup_mysql(dbapi_connection)
224        _test_dbo(dbapi_connection)
225
226        # exercise using a dbapi_cursor
227        _setup_mysql(dbapi_connection)
228        dbapi_cursor = dbapi_connection.cursor()
229        _test_dbo(dbapi_cursor)
230        dbapi_cursor.close()
231
232        # exercise sqlalchemy dbapi_connection
233        _setup_mysql(dbapi_connection)
234        from sqlalchemy import create_engine
235        sqlalchemy_engine = create_engine('mysql+mysqldb://%s:%s@%s/%s' %
236                                          (user, password, host, database))
237        sqlalchemy_connection = sqlalchemy_engine.connect()
238        sqlalchemy_connection.execute('SET SQL_MODE=ANSI_QUOTES')
239        _test_dbo(sqlalchemy_connection)
240        sqlalchemy_connection.close()
241
242        # exercise sqlalchemy session
243        _setup_mysql(dbapi_connection)
244        from sqlalchemy.orm import sessionmaker
245        Session = sessionmaker(bind=sqlalchemy_engine)
246        sqlalchemy_session = Session()
247        _test_dbo(sqlalchemy_session)
248        sqlalchemy_session.close()
249
250        # other exercises
251        _test_with_schema(dbapi_connection, database)
252        utf8_connection = connect(host=host, user=user,
253                                  passwd=password,
254                                  db=database,
255                                  charset='utf8')
256        utf8_connection.cursor().execute('SET SQL_MODE=ANSI_QUOTES')
257        _test_unicode(utf8_connection)
258
259
260try:
261    import psycopg2
262    import sqlalchemy
263    psycopg2.connect(
264        'host=%s dbname=%s user=%s password=%s'
265        % (host, database, user, password)
266    )
267except Exception as e:
268    print('SKIP psycopg2 tests: %s' % e, file=sys.stderr)
269else:
270
271    def test_postgresql():
272
273        import psycopg2
274        import psycopg2.extensions
275        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
276        psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
277
278        # assume database already created
279        dbapi_connection = psycopg2.connect(
280            'host=%s dbname=%s user=%s password=%s'
281            % (host, database, user, password)
282        )
283
284        # exercise using a dbapi_connection
285        _setup_postgresql(dbapi_connection)
286        _test_dbo(dbapi_connection)
287
288        # exercise using a dbapi_cursor
289        _setup_postgresql(dbapi_connection)
290        dbapi_cursor = dbapi_connection.cursor()
291        _test_dbo(dbapi_cursor)
292        dbapi_cursor.close()
293
294        # exercise sqlalchemy dbapi_connection
295        _setup_postgresql(dbapi_connection)
296        from sqlalchemy import create_engine
297        sqlalchemy_engine = create_engine('postgresql+psycopg2://%s:%s@%s/%s' %
298                                          (user, password, host, database))
299        sqlalchemy_connection = sqlalchemy_engine.connect()
300        _test_dbo(sqlalchemy_connection)
301        sqlalchemy_connection.close()
302
303        # exercise sqlalchemy session
304        _setup_postgresql(dbapi_connection)
305        from sqlalchemy.orm import sessionmaker
306        Session = sessionmaker(bind=sqlalchemy_engine)
307        sqlalchemy_session = Session()
308        _test_dbo(sqlalchemy_session)
309        sqlalchemy_session.close()
310
311        # other exercises
312        _test_dbo(dbapi_connection,
313                  lambda: dbapi_connection.cursor(name='arbitrary'))
314        _test_with_schema(dbapi_connection, 'public')
315        _test_unicode(dbapi_connection)
316