1# -*- coding: utf-8 -*-
2from __future__ import absolute_import, print_function, division
3
4
5# standard library dependencies
6import logging
7from petl.compat import next, text_type, string_types
8
9
10# internal dependencies
11from petl.errors import ArgumentError
12from petl.util.base import Table
13from petl.io.db_utils import _is_dbapi_connection, _is_dbapi_cursor, \
14    _is_sqlalchemy_connection, _is_sqlalchemy_engine, _is_sqlalchemy_session, \
15    _quote, _placeholders
16from petl.io.db_create import drop_table, create_table
17
18
# module-level logger, plus shorthands for the two log levels used below
logger = logging.getLogger(__name__)
debug = logger.debug
warning = logger.warning
22
23
def fromdb(dbo, query, *args, **kwargs):
    """Construct a table view over the results of an SQL query.

    The query is executed against `dbo` whenever the returned table is
    iterated. Any additional positional and keyword arguments are passed
    through to the underlying ``execute()`` call.

    Using a DB-API 2.0 connection from :mod:`sqlite3`::

        >>> import petl as etl
        >>> import sqlite3
        >>> connection = sqlite3.connect('example.db')
        >>> table = etl.fromdb(connection, 'SELECT * FROM example')

    Using :mod:`psycopg2` (which needs to be installed first)::

        >>> import petl as etl
        >>> import psycopg2
        >>> connection = psycopg2.connect('dbname=example user=postgres')
        >>> table = etl.fromdb(connection, 'SELECT * FROM example')

    Using :mod:`pymysql` (which needs to be installed first)::

        >>> import petl as etl
        >>> import pymysql
        >>> connection = pymysql.connect(password='moonpie', database='thangs')
        >>> table = etl.fromdb(connection, 'SELECT * FROM example')

    `dbo` can also be a function that creates a cursor. N.B., the function
    must return a new cursor every time it is called. E.g.::

        >>> import petl as etl
        >>> import psycopg2
        >>> import psycopg2.extras
        >>> connection = psycopg2.connect('dbname=example user=postgres')
        >>> mkcursor = lambda: connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        >>> table = etl.fromdb(mkcursor, 'SELECT * FROM example')

    `dbo` can also be an SQLAlchemy engine, session or connection object.

    `dbo` can also be a string, which is then taken to be the name of a file
    containing an :mod:`sqlite3` database.

    Be aware that most database servers and clients default to transferring
    the complete result set of a query from the server to the client. For a
    query with a large result set this can mean significant memory usage at
    the client. Some databases offer server-side cursors, which allow client
    libraries to fetch results incrementally and so reduce client memory
    usage.

    A server-side cursor can be used with a PostgreSQL database like so::

        >>> import petl as etl
        >>> import psycopg2
        >>> connection = psycopg2.connect('dbname=example user=postgres')
        >>> table = etl.fromdb(lambda: connection.cursor(name='arbitrary'),
        ...                    'SELECT * FROM example')

    Further information on server-side cursors can be found at:

        * http://initd.org/psycopg/docs/usage.html#server-side-cursors
        * http://mysql-python.sourceforge.net/MySQLdb.html#using-and-extending

    """

    if isinstance(dbo, string_types):
        # a bare string is shorthand for the path of an sqlite3 database file
        from sqlite3 import connect
        dbo = connect(dbo)

    return DbView(dbo, query, *args, **kwargs)
90
91
class DbView(Table):
    """Table view that runs `query` against `dbo` each time it is iterated.

    `dbo` is inspected lazily, at iteration time, to decide which kind of
    database object it is (DB-API 2.0 connection or cursor, SQLAlchemy
    engine, session or connection, or a callable returning a cursor). Extra
    positional and keyword arguments are forwarded to the underlying
    ``execute()`` call.

    """

    def __init__(self, dbo, query, *args, **kwargs):
        self.dbo = dbo
        self.query = query
        self.args = args
        self.kwargs = kwargs

    def __iter__(self):
        """Return an iterator yielding the header followed by the data rows.

        Raises :class:`ArgumentError` if the type of `dbo` is not supported.

        """

        # does it quack like a standard DB-API 2.0 connection?
        if _is_dbapi_connection(self.dbo):
            debug('assuming %r is standard DB-API 2.0 connection', self.dbo)
            _iter = _iter_dbapi_connection

        # does it quack like a standard DB-API 2.0 cursor?
        elif _is_dbapi_cursor(self.dbo):
            # N.B., the object must be passed as an argument, otherwise the
            # '%r' placeholder is logged literally
            debug('assuming %r is standard DB-API 2.0 cursor', self.dbo)
            warning('using a DB-API cursor with fromdb() is not recommended '
                    'and may lead to unexpected results, a DB-API connection '
                    'is better')
            _iter = _iter_dbapi_cursor

        # does it quack like an SQLAlchemy engine?
        elif _is_sqlalchemy_engine(self.dbo):
            debug('assuming %r instance of sqlalchemy.engine.base.Engine',
                  self.dbo)
            _iter = _iter_sqlalchemy_engine

        # does it quack like an SQLAlchemy session?
        elif _is_sqlalchemy_session(self.dbo):
            debug('assuming %r instance of sqlalchemy.orm.session.Session',
                  self.dbo)
            _iter = _iter_sqlalchemy_session

        # does it quack like an SQLAlchemy connection?
        elif _is_sqlalchemy_connection(self.dbo):
            debug('assuming %r instance of sqlalchemy.engine.base.Connection',
                  self.dbo)
            _iter = _iter_sqlalchemy_connection

        # anything else callable is taken to be a cursor factory
        elif callable(self.dbo):
            debug('assuming %r is a function returning a cursor', self.dbo)
            _iter = _iter_dbapi_mkcurs

        # some other sort of duck...
        else:
            raise ArgumentError('unsupported database object type: %r' % self.dbo)

        return _iter(self.dbo, self.query, *self.args, **self.kwargs)
142
143
def _iter_dbapi_mkcurs(mkcurs, query, *args, **kwargs):
    """Iterate over query results using a cursor obtained by calling `mkcurs`.

    The cursor is closed once iteration finishes or the generator is
    closed.

    """
    cur = mkcurs()
    records = _iter_dbapi_cursor(cur, query, *args, **kwargs)
    try:
        for record in records:
            yield record
    finally:
        # we created the cursor, so we are responsible for closing it
        cur.close()
151
152
def _iter_dbapi_connection(connection, query, *args, **kwargs):
    """Iterate over query results using a fresh cursor from `connection`.

    The cursor is closed once iteration finishes or the generator is
    closed; the connection itself is left open.

    """
    cur = connection.cursor()
    records = _iter_dbapi_cursor(cur, query, *args, **kwargs)
    try:
        for record in records:
            yield record
    finally:
        # we created the cursor, so we are responsible for closing it
        cur.close()
160
161
162def _iter_dbapi_cursor(cursor, query, *args, **kwargs):
163    cursor.execute(query, *args, **kwargs)
164    # fetch one row before iterating, to force population of cursor.description
165    # which may be postponed if using server-side cursors
166    # not all database drivers populate cursor after execute so we call fetchall
167    try:
168        it = iter(cursor)
169    except TypeError:
170        it = iter(cursor.fetchall())
171    try:
172        first_row = next(it)
173    except StopIteration:
174        first_row = None
175    # fields should be available now
176    hdr = [d[0] for d in cursor.description]
177    yield tuple(hdr)
178    if first_row is None:
179        return
180    yield first_row
181    for row in it:
182        yield row  # don't wrap, return whatever the database engine returns
183
184
def _iter_sqlalchemy_engine(engine, query, *args, **kwargs):
    """Iterate over query results using a connection obtained from `engine`.

    The connection is created here, so it is also closed here once
    iteration finishes or the generator is closed, rather than being left
    open until garbage collection.

    """
    connection = engine.connect()
    try:
        for row in _iter_sqlalchemy_connection(connection, query,
                                               *args, **kwargs):
            yield row
    finally:
        connection.close()
188
189
def _iter_sqlalchemy_connection(connection, query, *args, **kwargs):
    """Execute `query` on an SQLAlchemy connection, yielding header and rows.

    The header is a tuple built from the ``keys()`` of the result object.

    """
    debug('connection: %r', connection)
    result = connection.execute(query, *args, **kwargs)
    yield tuple(result.keys())
    for record in result:
        yield record
197
198
199def _iter_sqlalchemy_session(session, query, *args, **kwargs):
200    results = session.execute(query, *args, **kwargs)
201    hdr = results.keys()
202    yield tuple(hdr)
203    for row in results:
204        yield row
205
206
def todb(table, dbo, tablename, schema=None, commit=True,
         create=False, drop=False, constraints=True, metadata=None,
         dialect=None, sample=1000):
    """
    Load data into a database table, replacing its current contents.

    N.B., all rows already present in the database table are deleted before
    the new data are inserted. Unless ``create=True`` is given, the table
    must already exist. E.g.::

        >>> import petl as etl
        >>> table = [['foo', 'bar'],
        ...          ['a', 1],
        ...          ['b', 2],
        ...          ['c', 2]]
        >>> # using sqlite3
        ... import sqlite3
        >>> connection = sqlite3.connect('example.db')
        >>> # assuming table "foobar" already exists in the database
        ... etl.todb(table, connection, 'foobar')
        >>> # using psycopg2
        >>> import psycopg2
        >>> connection = psycopg2.connect('dbname=example user=postgres')
        >>> # assuming table "foobar" already exists in the database
        ... etl.todb(table, connection, 'foobar')
        >>> # using pymysql
        >>> import pymysql
        >>> connection = pymysql.connect(password='moonpie', database='thangs')
        >>> # tell MySQL to use standard quote character
        ... connection.cursor().execute('SET SQL_MODE=ANSI_QUOTES')
        >>> # load data, assuming table "foobar" already exists in the database
        ... etl.todb(table, connection, 'foobar')

    N.B., with MySQL the statement ``SET SQL_MODE=ANSI_QUOTES`` must be
    issued so that MySQL uses SQL-92 standard quote characters.

    A cursor may be given in place of a connection, e.g.::

        >>> import psycopg2
        >>> connection = psycopg2.connect('dbname=example user=postgres')
        >>> cursor = connection.cursor()
        >>> etl.todb(table, cursor, 'foobar')

    `dbo` can also be an SQLAlchemy engine, session or connection object.

    `dbo` can also be a string, which is then taken to be the name of a file
    containing an :mod:`sqlite3` database. A connection opened this way is
    closed again before this function returns.

    With ``create=True`` an attempt is made to create the database table
    automatically before the data are loaded. This feature requires
    `SQLAlchemy <http://www.sqlalchemy.org/>`_ to be installed.

    **Keyword arguments:**

    table : table container
        Table data to load
    dbo : database object
        DB-API 2.0 connection, callable returning a DB-API 2.0 cursor, or
        SQLAlchemy connection, engine or session
    tablename : string
        Name of the table in the database
    schema : string
        Name of the database schema to find the table in
    commit : bool
        If True commit the changes
    create : bool
        If True attempt to create the table before loading, inferring types
        from a sample of the data (requires SQLAlchemy)
    drop : bool
        If True attempt to drop the table before recreating (only relevant if
        create=True)
    constraints : bool
        If True use length and nullable constraints (only relevant if
        create=True)
    metadata : sqlalchemy.MetaData
        Custom table metadata (only relevant if create=True)
    dialect : string
        One of {'access', 'sybase', 'sqlite', 'informix', 'firebird', 'mysql',
        'oracle', 'maxdb', 'postgresql', 'mssql'} (only relevant if
        create=True)
    sample : int
        Number of rows to sample when inferring types etc. Set to 0 to use the
        whole table (only relevant if create=True)

    .. note::

        In principle this function works with any DB-API 2.0 compliant
        database driver. At the time of writing, however, some DB-API 2.0
        implementations, including cx_Oracle and MySQL's Connector/Python,
        cannot be used directly, because they only accept a list as the
        argument to the cursor.executemany() function that :mod:`petl` calls
        internally. A workaround is to proxy the cursor objects, e.g.::

            >>> import cx_Oracle
            >>> connection = cx_Oracle.Connection(...)
            >>> class CursorProxy(object):
            ...     def __init__(self, cursor):
            ...         self._cursor = cursor
            ...     def executemany(self, statement, parameters, **kwargs):
            ...         # convert parameters to a list
            ...         parameters = list(parameters)
            ...         # pass through to proxied cursor
            ...         return self._cursor.executemany(statement, parameters, **kwargs)
            ...     def __getattr__(self, item):
            ...         return getattr(self._cursor, item)
            ...
            >>> def get_cursor():
            ...     return CursorProxy(connection.cursor())
            ...
            >>> import petl as etl
            >>> etl.todb(tbl, get_cursor, ...)

        Bear in mind that this workaround loads the whole table into memory
        as a list before it is inserted into the database.

    """

    # a string is shorthand for an sqlite3 database file; a connection we
    # open ourselves is also ours to close
    opened_here = isinstance(dbo, string_types)
    if opened_here:
        from sqlite3 import connect
        dbo = connect(dbo)

    try:
        # dropping is only done as part of (re)creating the table
        if create and drop:
            drop_table(dbo, tablename, schema=schema, commit=commit)
        if create:
            create_table(table, dbo, tablename, schema=schema, commit=commit,
                         constraints=constraints, metadata=metadata,
                         dialect=dialect, sample=sample)
        _todb(table, dbo, tablename, schema=schema, commit=commit,
              truncate=True)
    finally:
        if opened_here:
            dbo.close()
346
347
# make todb() available as a method on table objects
Table.todb = todb
349
350
def _todb(table, dbo, tablename, schema=None, commit=True, truncate=False):
    """Load `table` into the database, dispatching on the kind of `dbo`.

    `dbo` may be a DB-API 2.0 connection or cursor, an SQLAlchemy engine,
    session or connection, or a callable returning DB-API 2.0 cursors. The
    remaining arguments are passed through unchanged to the matching loader.

    Raises :class:`ArgumentError` if the type of `dbo` is not supported.

    """

    # need to deal with polymorphic dbo argument
    # what sort of duck is it?

    # does it quack like a standard DB-API 2.0 connection?
    if _is_dbapi_connection(dbo):
        debug('assuming %r is standard DB-API 2.0 connection', dbo)
        _todb_dbapi_connection(table, dbo, tablename, schema=schema,
                               commit=commit, truncate=truncate)

    # does it quack like a standard DB-API 2.0 cursor?
    elif _is_dbapi_cursor(dbo):
        # N.B., the object must be passed as an argument, otherwise the '%r'
        # placeholder is logged literally
        debug('assuming %r is standard DB-API 2.0 cursor', dbo)
        _todb_dbapi_cursor(table, dbo, tablename, schema=schema, commit=commit,
                           truncate=truncate)

    # does it quack like an SQLAlchemy engine?
    elif _is_sqlalchemy_engine(dbo):
        debug('assuming %r instance of sqlalchemy.engine.base.Engine', dbo)
        _todb_sqlalchemy_engine(table, dbo, tablename, schema=schema,
                                commit=commit, truncate=truncate)

    # does it quack like an SQLAlchemy session?
    elif _is_sqlalchemy_session(dbo):
        debug('assuming %r instance of sqlalchemy.orm.session.Session', dbo)
        _todb_sqlalchemy_session(table, dbo, tablename, schema=schema,
                                 commit=commit, truncate=truncate)

    # does it quack like an SQLAlchemy connection?
    elif _is_sqlalchemy_connection(dbo):
        debug('assuming %r instance of sqlalchemy.engine.base.Connection', dbo)
        _todb_sqlalchemy_connection(table, dbo, tablename, schema=schema,
                                    commit=commit, truncate=truncate)

    # anything else callable is taken to be a cursor factory
    elif callable(dbo):
        debug('assuming %r is a function returning standard DB-API 2.0 cursor '
              'objects', dbo)
        _todb_dbapi_mkcurs(table, dbo, tablename, schema=schema, commit=commit,
                           truncate=truncate)

    # some other sort of duck...
    else:
        raise ArgumentError('unsupported database object type: %r' % dbo)
395
396
# SQL statement templates used by the loaders below. The table is emptied with
# DELETE FROM rather than TRUNCATE; the single slot takes the quoted table
# name. The insert template takes the quoted table name, the comma-separated
# quoted column names and the parameter placeholders, in that order.
SQL_TRUNCATE_QUERY = 'DELETE FROM %s'
SQL_INSERT_QUERY = 'INSERT INTO %s (%s) VALUES (%s)'
399
400
def _todb_dbapi_connection(table, connection, tablename, schema=None,
                           commit=True, truncate=False):
    """Load `table` into a database table via a DB-API 2.0 connection.

    The first row of `table` supplies the column names; the remaining rows
    are inserted with a single ``executemany()`` call. If `truncate` is True
    all existing rows are deleted first. If `commit` is True the connection
    is committed once the inserts have succeeded.

    Every cursor created here is closed again, even if a statement fails.
    The connection itself is left open.

    """

    # sanitise table name
    tablename = _quote(tablename)
    if schema is not None:
        tablename = _quote(schema) + '.' + tablename
    debug('tablename: %r', tablename)

    # sanitise field names
    it = iter(table)
    hdr = next(it)
    flds = list(map(text_type, hdr))
    colnames = [_quote(n) for n in flds]
    debug('column names: %r', colnames)

    # determine paramstyle and build placeholders string
    placeholders = _placeholders(connection, colnames)
    debug('placeholders: %r', placeholders)

    if truncate:
        # TRUNCATE is not supported in some databases and causing locks with
        # MySQL used via SQLAlchemy, fall back to DELETE FROM for now
        truncatequery = SQL_TRUNCATE_QUERY % tablename
        debug('truncate the table via query %r', truncatequery)
        # use a dedicated cursor for the delete, the inserts get a fresh one
        cursor = connection.cursor()
        try:
            cursor.execute(truncatequery)
        finally:
            cursor.close()

    insertcolnames = ', '.join(colnames)
    insertquery = SQL_INSERT_QUERY % (tablename, insertcolnames, placeholders)
    debug('insert data via query %r', insertquery)
    cursor = connection.cursor()
    try:
        cursor.executemany(insertquery, it)
    finally:
        # finish up, making sure the cursor is released on failure too
        debug('close the cursor')
        cursor.close()

    if commit:
        debug('commit transaction')
        connection.commit()
446
447
def _todb_dbapi_mkcurs(table, mkcurs, tablename, schema=None, commit=True,
                       truncate=False):
    """Load `table` into a database table via cursors created by `mkcurs`.

    The first row of `table` supplies the column names; the remaining rows
    are inserted with a single ``executemany()`` call. If `truncate` is True
    all existing rows are deleted first, after which a fresh cursor is
    obtained for the inserts. If `commit` is True the connection, reached
    through the cursor's ``connection`` attribute, is committed once the
    inserts have succeeded.

    Every cursor created here is closed again, even if a statement fails.

    """

    # sanitise table name
    tablename = _quote(tablename)
    if schema is not None:
        tablename = _quote(schema) + '.' + tablename
    debug('tablename: %r', tablename)

    # sanitise field names
    it = iter(table)
    hdr = next(it)
    flds = list(map(text_type, hdr))
    colnames = [_quote(n) for n in flds]
    debug('column names: %r', colnames)

    debug('obtain cursor and connection')
    cursor = mkcurs()
    try:
        # N.B., we depend on this optional DB-API 2.0 attribute being
        # implemented
        assert hasattr(cursor, 'connection'), \
            'could not obtain connection via cursor'
        connection = cursor.connection

        # determine paramstyle and build placeholders string
        placeholders = _placeholders(connection, colnames)
        debug('placeholders: %r', placeholders)

        if truncate:
            # TRUNCATE is not supported in some databases and causing locks
            # with MySQL used via SQLAlchemy, fall back to DELETE FROM for now
            truncatequery = SQL_TRUNCATE_QUERY % tablename
            debug('truncate the table via query %r', truncatequery)
            cursor.execute(truncatequery)
            # N.B., may be server-side cursor, need to resurrect; forget the
            # old cursor first so it is never closed a second time below
            stale, cursor = cursor, None
            stale.close()
            cursor = mkcurs()

        insertcolnames = ', '.join(colnames)
        insertquery = SQL_INSERT_QUERY % (tablename, insertcolnames,
                                          placeholders)
        debug('insert data via query %r', insertquery)
        cursor.executemany(insertquery, it)

    finally:
        # release whichever cursor is still open, on failure too
        if cursor is not None:
            cursor.close()

    if commit:
        debug('commit transaction')
        connection.commit()
494
495
def _todb_dbapi_cursor(table, cursor, tablename, schema=None, commit=True,
                       truncate=False):
    """Load `table` into a database table via an existing DB-API 2.0 cursor.

    The first row of `table` supplies the column names; the remaining rows
    are inserted with a single ``executemany()`` call. If `truncate` is True
    all existing rows are deleted first. If `commit` is True the connection,
    reached through the cursor's ``connection`` attribute, is committed
    after the inserts.

    The cursor is not closed; that is left to the application.

    """

    # sanitise table name
    tablename = _quote(tablename)
    if schema is not None:
        tablename = _quote(schema) + '.' + tablename
    debug('tablename: %r', tablename)

    # sanitise field names
    it = iter(table)
    hdr = next(it)
    flds = list(map(text_type, hdr))
    colnames = [_quote(n) for n in flds]
    debug('column names: %r', colnames)

    debug('obtain connection via cursor')
    # N.B., we depend on this optional DB-API 2.0 attribute being implemented
    assert hasattr(cursor, 'connection'), \
        'could not obtain connection via cursor'
    connection = cursor.connection

    # determine paramstyle and build placeholders string
    placeholders = _placeholders(connection, colnames)
    debug('placeholders: %r', placeholders)

    if truncate:
        # TRUNCATE is not supported in some databases and causing locks with
        # MySQL used via SQLAlchemy, fall back to DELETE FROM for now
        truncatequery = SQL_TRUNCATE_QUERY % tablename
        debug('truncate the table via query %r', truncatequery)
        cursor.execute(truncatequery)

    insertcolnames = ', '.join(colnames)
    insertquery = SQL_INSERT_QUERY % (tablename, insertcolnames, placeholders)
    # pass the query as a logging argument so it is only formatted if debug
    # logging is enabled, as for the other debug messages
    debug('insert data via query %r', insertquery)
    cursor.executemany(insertquery, it)

    # N.B., don't close the cursor, leave that to the application

    if commit:
        debug('commit transaction')
        connection.commit()
539
540
def _todb_sqlalchemy_engine(table, engine, tablename, schema=None, commit=True,
                            truncate=False):
    """Load `table` into a database table via an SQLAlchemy engine.

    A connection is obtained from the engine for the duration of the load
    and closed again afterwards, whether or not the load succeeds.

    """

    connection = engine.connect()
    try:
        _todb_sqlalchemy_connection(table, connection, tablename,
                                    schema=schema, commit=commit,
                                    truncate=truncate)
    finally:
        # we opened the connection, so we are responsible for closing it
        connection.close()
546
547
def _todb_sqlalchemy_connection(table, connection, tablename, schema=None,
                                commit=True, truncate=False):
    """Load `table` into a database table via an SQLAlchemy connection.

    The first row of `table` supplies the column names; each remaining row
    is inserted with its own ``execute()`` call. If `truncate` is True all
    existing rows are deleted first. If `commit` is True the work is wrapped
    in a transaction, which is committed on success and rolled back (with
    the error re-raised) if any statement fails.

    The connection is not closed; that is left to the application.

    """

    debug('connection: %r', connection)

    # sanitise table name
    tablename = _quote(tablename)
    if schema is not None:
        tablename = _quote(schema) + '.' + tablename
    debug('tablename: %r', tablename)

    # sanitise field names
    it = iter(table)
    hdr = next(it)
    flds = list(map(text_type, hdr))
    colnames = [_quote(n) for n in flds]
    debug('column names: %r', colnames)

    # N.B., we need to obtain a reference to the underlying DB-API connection so
    # we can import the module and determine the paramstyle
    proxied_raw_connection = connection.connection
    actual_raw_connection = proxied_raw_connection.connection

    # determine paramstyle and build placeholders string
    placeholders = _placeholders(actual_raw_connection, colnames)
    debug('placeholders: %r', placeholders)

    trans = None
    if commit:
        debug('begin transaction')
        trans = connection.begin()

    try:

        if truncate:
            # TRUNCATE is not supported in some databases and causing locks
            # with MySQL used via SQLAlchemy, fall back to DELETE FROM for now
            truncatequery = SQL_TRUNCATE_QUERY % tablename
            debug('truncate the table via query %r', truncatequery)
            connection.execute(truncatequery)

        insertcolnames = ', '.join(colnames)
        insertquery = SQL_INSERT_QUERY % (tablename, insertcolnames,
                                          placeholders)
        debug('insert data via query %r', insertquery)
        for row in it:
            connection.execute(insertquery, row)

    except BaseException:
        # don't leave a transaction we started dangling on the caller's
        # connection; undo the partial load and let the error propagate
        if trans is not None:
            debug('rollback transaction')
            trans.rollback()
        raise

    # finish up

    if trans is not None:
        debug('commit transaction')
        trans.commit()

    # N.B., don't close connection, leave that to the application
599
600
def _todb_sqlalchemy_session(table, session, tablename, schema=None,
                             commit=True, truncate=False):
    """Load `table` into a database table via an SQLAlchemy session.

    The load is carried out on the connection returned by
    ``session.connection()``.

    """

    conn = session.connection()
    _todb_sqlalchemy_connection(table, conn, tablename, schema=schema,
                                commit=commit, truncate=truncate)
607
608
def appenddb(table, dbo, tablename, schema=None, commit=True):
    """
    Append data to an existing database table via a DB-API 2.0 connection
    or cursor. Works like :func:`petl.io.db.todb`, except that existing rows
    in the database table are kept and the new data are inserted alongside
    them.

    The parameter `dbo` may also be a string, in which case it is interpreted
    as the name of a file containing an :mod:`sqlite3` database. A connection
    opened this way is closed again before this function returns.

    """

    # a string is shorthand for an sqlite3 database file; a connection we
    # open ourselves is also ours to close
    opened_here = isinstance(dbo, string_types)
    if opened_here:
        from sqlite3 import connect
        dbo = connect(dbo)

    try:
        _todb(table, dbo, tablename, schema=schema, commit=commit,
              truncate=False)
    finally:
        if opened_here:
            dbo.close()
633
634
# make appenddb() available as a method on table objects
Table.appenddb = appenddb
636