1# -*- coding: utf-8 -*- 2from __future__ import absolute_import, print_function, division 3 4 5# standard library dependencies 6import logging 7from petl.compat import next, text_type, string_types 8 9 10# internal dependencies 11from petl.errors import ArgumentError 12from petl.util.base import Table 13from petl.io.db_utils import _is_dbapi_connection, _is_dbapi_cursor, \ 14 _is_sqlalchemy_connection, _is_sqlalchemy_engine, _is_sqlalchemy_session, \ 15 _quote, _placeholders 16from petl.io.db_create import drop_table, create_table 17 18 19logger = logging.getLogger(__name__) 20debug = logger.debug 21warning = logger.warning 22 23 24def fromdb(dbo, query, *args, **kwargs): 25 """Provides access to data from any DB-API 2.0 connection via a given query. 26 E.g., using :mod:`sqlite3`:: 27 28 >>> import petl as etl 29 >>> import sqlite3 30 >>> connection = sqlite3.connect('example.db') 31 >>> table = etl.fromdb(connection, 'SELECT * FROM example') 32 33 E.g., using :mod:`psycopg2` (assuming you've installed it first):: 34 35 >>> import petl as etl 36 >>> import psycopg2 37 >>> connection = psycopg2.connect('dbname=example user=postgres') 38 >>> table = etl.fromdb(connection, 'SELECT * FROM example') 39 40 E.g., using :mod:`pymysql` (assuming you've installed it first):: 41 42 >>> import petl as etl 43 >>> import pymysql 44 >>> connection = pymysql.connect(password='moonpie', database='thangs') 45 >>> table = etl.fromdb(connection, 'SELECT * FROM example') 46 47 The `dbo` argument may also be a function that creates a cursor. N.B., each 48 call to the function should return a new cursor. E.g.:: 49 50 >>> import petl as etl 51 >>> import psycopg2 52 >>> connection = psycopg2.connect('dbname=example user=postgres') 53 >>> mkcursor = lambda: connection.cursor(cursor_factory=psycopg2.extras.DictCursor) 54 >>> table = etl.fromdb(mkcursor, 'SELECT * FROM example') 55 56 The parameter `dbo` may also be an SQLAlchemy engine, session or 57 connection object. 58 59 The parameter `dbo` may also be a string, in which case it is interpreted as 60 the name of a file containing an :mod:`sqlite3` database. 61 62 Note that the default behaviour of most database servers and clients is for 63 the entire result set for each query to be sent from the server to the 64 client. If your query returns a large result set this can result in 65 significant memory usage at the client. Some databases support server-side 66 cursors which provide a means for client libraries to fetch result sets 67 incrementally, reducing memory usage at the client. 68 69 To use a server-side cursor with a PostgreSQL database, e.g.:: 70 71 >>> import petl as etl 72 >>> import psycopg2 73 >>> connection = psycopg2.connect('dbname=example user=postgres') 74 >>> table = etl.fromdb(lambda: connection.cursor(name='arbitrary'), 75 ... 'SELECT * FROM example') 76 77 For more information on server-side cursors see the following links: 78 79 * http://initd.org/psycopg/docs/usage.html#server-side-cursors 80 * http://mysql-python.sourceforge.net/MySQLdb.html#using-and-extending 81 82 """ 83 84 # convenience for working with sqlite3 85 if isinstance(dbo, string_types): 86 import sqlite3 87 dbo = sqlite3.connect(dbo) 88 89 return DbView(dbo, query, *args, **kwargs) 90 91 92class DbView(Table): 93 94 def __init__(self, dbo, query, *args, **kwargs): 95 self.dbo = dbo 96 self.query = query 97 self.args = args 98 self.kwargs = kwargs 99 100 def __iter__(self): 101 102 # does it quack like a standard DB-API 2.0 connection? 103 if _is_dbapi_connection(self.dbo): 104 debug('assuming %r is standard DB-API 2.0 connection', self.dbo) 105 _iter = _iter_dbapi_connection 106 107 # does it quack like a standard DB-API 2.0 cursor? 108 elif _is_dbapi_cursor(self.dbo): 109 debug('assuming %r is standard DB-API 2.0 cursor') 110 warning('using a DB-API cursor with fromdb() is not recommended ' 111 'and may lead to unexpected results, a DB-API connection ' 112 'is better') 113 _iter = _iter_dbapi_cursor 114 115 # does it quack like an SQLAlchemy engine? 116 elif _is_sqlalchemy_engine(self.dbo): 117 debug('assuming %r instance of sqlalchemy.engine.base.Engine', 118 self.dbo) 119 _iter = _iter_sqlalchemy_engine 120 121 # does it quack like an SQLAlchemy session? 122 elif _is_sqlalchemy_session(self.dbo): 123 debug('assuming %r instance of sqlalchemy.orm.session.Session', 124 self.dbo) 125 _iter = _iter_sqlalchemy_session 126 127 # does it quack like an SQLAlchemy connection? 128 elif _is_sqlalchemy_connection(self.dbo): 129 debug('assuming %r instance of sqlalchemy.engine.base.Connection', 130 self.dbo) 131 _iter = _iter_sqlalchemy_connection 132 133 elif callable(self.dbo): 134 debug('assuming %r is a function returning a cursor', self.dbo) 135 _iter = _iter_dbapi_mkcurs 136 137 # some other sort of duck... 138 else: 139 raise ArgumentError('unsupported database object type: %r' % self.dbo) 140 141 return _iter(self.dbo, self.query, *self.args, **self.kwargs) 142 143 144def _iter_dbapi_mkcurs(mkcurs, query, *args, **kwargs): 145 cursor = mkcurs() 146 try: 147 for row in _iter_dbapi_cursor(cursor, query, *args, **kwargs): 148 yield row 149 finally: 150 cursor.close() 151 152 153def _iter_dbapi_connection(connection, query, *args, **kwargs): 154 cursor = connection.cursor() 155 try: 156 for row in _iter_dbapi_cursor(cursor, query, *args, **kwargs): 157 yield row 158 finally: 159 cursor.close() 160 161 162def _iter_dbapi_cursor(cursor, query, *args, **kwargs): 163 cursor.execute(query, *args, **kwargs) 164 # fetch one row before iterating, to force population of cursor.description 165 # which may be postponed if using server-side cursors 166 # not all database drivers populate cursor after execute so we call fetchall 167 try: 168 it = iter(cursor) 169 except TypeError: 170 it = iter(cursor.fetchall()) 171 try: 172 first_row = next(it) 173 except StopIteration: 174 first_row = None 175 # fields should be available now 176 hdr = [d[0] for d in cursor.description] 177 yield tuple(hdr) 178 if first_row is None: 179 return 180 yield first_row 181 for row in it: 182 yield row # don't wrap, return whatever the database engine returns 183 184 185def _iter_sqlalchemy_engine(engine, query, *args, **kwargs): 186 return _iter_sqlalchemy_connection(engine.connect(), query, 187 *args, **kwargs) 188 189 190def _iter_sqlalchemy_connection(connection, query, *args, **kwargs): 191 debug('connection: %r', connection) 192 results = connection.execute(query, *args, **kwargs) 193 hdr = results.keys() 194 yield tuple(hdr) 195 for row in results: 196 yield row 197 198 199def _iter_sqlalchemy_session(session, query, *args, **kwargs): 200 results = session.execute(query, *args, **kwargs) 201 hdr = results.keys() 202 yield tuple(hdr) 203 for row in results: 204 yield row 205 206 207def todb(table, dbo, tablename, schema=None, commit=True, 208 create=False, drop=False, constraints=True, metadata=None, 209 dialect=None, sample=1000): 210 """ 211 Load data into an existing database table via a DB-API 2.0 212 connection or cursor. Note that the database table will be truncated, 213 i.e., all existing rows will be deleted prior to inserting the new data. 214 E.g.:: 215 216 >>> import petl as etl 217 >>> table = [['foo', 'bar'], 218 ... ['a', 1], 219 ... ['b', 2], 220 ... ['c', 2]] 221 >>> # using sqlite3 222 ... import sqlite3 223 >>> connection = sqlite3.connect('example.db') 224 >>> # assuming table "foobar" already exists in the database 225 ... etl.todb(table, connection, 'foobar') 226 >>> # using psycopg2 227 >>> import psycopg2 228 >>> connection = psycopg2.connect('dbname=example user=postgres') 229 >>> # assuming table "foobar" already exists in the database 230 ... etl.todb(table, connection, 'foobar') 231 >>> # using pymysql 232 >>> import pymysql 233 >>> connection = pymysql.connect(password='moonpie', database='thangs') 234 >>> # tell MySQL to use standard quote character 235 ... connection.cursor().execute('SET SQL_MODE=ANSI_QUOTES') 236 >>> # load data, assuming table "foobar" already exists in the database 237 ... etl.todb(table, connection, 'foobar') 238 239 N.B., for MySQL the statement ``SET SQL_MODE=ANSI_QUOTES`` is required to 240 ensure MySQL uses SQL-92 standard quote characters. 241 242 A cursor can also be provided instead of a connection, e.g.:: 243 244 >>> import psycopg2 245 >>> connection = psycopg2.connect('dbname=example user=postgres') 246 >>> cursor = connection.cursor() 247 >>> etl.todb(table, cursor, 'foobar') 248 249 The parameter `dbo` may also be an SQLAlchemy engine, session or 250 connection object. 251 252 The parameter `dbo` may also be a string, in which case it is interpreted 253 as the name of a file containing an :mod:`sqlite3` database. 254 255 If ``create=True`` this function will attempt to automatically create a 256 database table before loading the data. This functionality requires 257 `SQLAlchemy <http://www.sqlalchemy.org/>`_ to be installed. 258 259 **Keyword arguments:** 260 261 table : table container 262 Table data to load 263 dbo : database object 264 DB-API 2.0 connection, callable returning a DB-API 2.0 cursor, or 265 SQLAlchemy connection, engine or session 266 tablename : string 267 Name of the table in the database 268 schema : string 269 Name of the database schema to find the table in 270 commit : bool 271 If True commit the changes 272 create : bool 273 If True attempt to create the table before loading, inferring types 274 from a sample of the data (requires SQLAlchemy) 275 drop : bool 276 If True attempt to drop the table before recreating (only relevant if 277 create=True) 278 constraints : bool 279 If True use length and nullable constraints (only relevant if 280 create=True) 281 metadata : sqlalchemy.MetaData 282 Custom table metadata (only relevant if create=True) 283 dialect : string 284 One of {'access', 'sybase', 'sqlite', 'informix', 'firebird', 'mysql', 285 'oracle', 'maxdb', 'postgresql', 'mssql'} (only relevant if 286 create=True) 287 sample : int 288 Number of rows to sample when inferring types etc. Set to 0 to use the 289 whole table (only relevant if create=True) 290 291 .. note:: 292 293 This function is in principle compatible with any DB-API 2.0 294 compliant database driver. However, at the time of writing some DB-API 295 2.0 implementations, including cx_Oracle and MySQL's 296 Connector/Python, are not compatible with this function, because they 297 only accept a list argument to the cursor.executemany() function 298 called internally by :mod:`petl`. This can be worked around by 299 proxying the cursor objects, e.g.:: 300 301 >>> import cx_Oracle 302 >>> connection = cx_Oracle.Connection(...) 303 >>> class CursorProxy(object): 304 ... def __init__(self, cursor): 305 ... self._cursor = cursor 306 ... def executemany(self, statement, parameters, **kwargs): 307 ... # convert parameters to a list 308 ... parameters = list(parameters) 309 ... # pass through to proxied cursor 310 ... return self._cursor.executemany(statement, parameters, **kwargs) 311 ... def __getattr__(self, item): 312 ... return getattr(self._cursor, item) 313 ... 314 >>> def get_cursor(): 315 ... return CursorProxy(connection.cursor()) 316 ... 317 >>> import petl as etl 318 >>> etl.todb(tbl, get_cursor, ...) 319 320 Note however that this does imply loading the entire table into 321 memory as a list prior to inserting into the database. 322 323 """ 324 325 needs_closing = False 326 327 # convenience for working with sqlite3 328 if isinstance(dbo, string_types): 329 import sqlite3 330 dbo = sqlite3.connect(dbo) 331 needs_closing = True 332 333 try: 334 if create: 335 if drop: 336 drop_table(dbo, tablename, schema=schema, commit=commit) 337 create_table(table, dbo, tablename, schema=schema, commit=commit, 338 constraints=constraints, metadata=metadata, 339 dialect=dialect, sample=sample) 340 _todb(table, dbo, tablename, schema=schema, commit=commit, 341 truncate=True) 342 343 finally: 344 if needs_closing: 345 dbo.close() 346 347 348Table.todb = todb 349 350 351def _todb(table, dbo, tablename, schema=None, commit=True, truncate=False): 352 353 # need to deal with polymorphic dbo argument 354 # what sort of duck is it? 355 356 # does it quack like a standard DB-API 2.0 connection? 357 if _is_dbapi_connection(dbo): 358 debug('assuming %r is standard DB-API 2.0 connection', dbo) 359 _todb_dbapi_connection(table, dbo, tablename, schema=schema, 360 commit=commit, truncate=truncate) 361 362 # does it quack like a standard DB-API 2.0 cursor? 363 elif _is_dbapi_cursor(dbo): 364 debug('assuming %r is standard DB-API 2.0 cursor') 365 _todb_dbapi_cursor(table, dbo, tablename, schema=schema, commit=commit, 366 truncate=truncate) 367 368 # does it quack like an SQLAlchemy engine? 369 elif _is_sqlalchemy_engine(dbo): 370 debug('assuming %r instance of sqlalchemy.engine.base.Engine', dbo) 371 _todb_sqlalchemy_engine(table, dbo, tablename, schema=schema, 372 commit=commit, truncate=truncate) 373 374 # does it quack like an SQLAlchemy session? 375 elif _is_sqlalchemy_session(dbo): 376 debug('assuming %r instance of sqlalchemy.orm.session.Session', dbo) 377 _todb_sqlalchemy_session(table, dbo, tablename, schema=schema, 378 commit=commit, truncate=truncate) 379 380 # does it quack like an SQLAlchemy connection? 381 elif _is_sqlalchemy_connection(dbo): 382 debug('assuming %r instance of sqlalchemy.engine.base.Connection', dbo) 383 _todb_sqlalchemy_connection(table, dbo, tablename, schema=schema, 384 commit=commit, truncate=truncate) 385 386 elif callable(dbo): 387 debug('assuming %r is a function returning standard DB-API 2.0 cursor ' 388 'objects', dbo) 389 _todb_dbapi_mkcurs(table, dbo, tablename, schema=schema, commit=commit, 390 truncate=truncate) 391 392 # some other sort of duck... 393 else: 394 raise ArgumentError('unsupported database object type: %r' % dbo) 395 396 397SQL_TRUNCATE_QUERY = 'DELETE FROM %s' 398SQL_INSERT_QUERY = 'INSERT INTO %s (%s) VALUES (%s)' 399 400 401def _todb_dbapi_connection(table, connection, tablename, schema=None, 402 commit=True, truncate=False): 403 404 # sanitise table name 405 tablename = _quote(tablename) 406 if schema is not None: 407 tablename = _quote(schema) + '.' + tablename 408 debug('tablename: %r', tablename) 409 410 # sanitise field names 411 it = iter(table) 412 hdr = next(it) 413 flds = list(map(text_type, hdr)) 414 colnames = [_quote(n) for n in flds] 415 debug('column names: %r', colnames) 416 417 # determine paramstyle and build placeholders string 418 placeholders = _placeholders(connection, colnames) 419 debug('placeholders: %r', placeholders) 420 421 # get a cursor 422 cursor = connection.cursor() 423 424 if truncate: 425 # TRUNCATE is not supported in some databases and causing locks with 426 # MySQL used via SQLAlchemy, fall back to DELETE FROM for now 427 truncatequery = SQL_TRUNCATE_QUERY % tablename 428 debug('truncate the table via query %r', truncatequery) 429 cursor.execute(truncatequery) 430 # just in case, close and resurrect cursor 431 cursor.close() 432 cursor = connection.cursor() 433 434 insertcolnames = ', '.join(colnames) 435 insertquery = SQL_INSERT_QUERY % (tablename, insertcolnames, placeholders) 436 debug('insert data via query %r' % insertquery) 437 cursor.executemany(insertquery, it) 438 439 # finish up 440 debug('close the cursor') 441 cursor.close() 442 443 if commit: 444 debug('commit transaction') 445 connection.commit() 446 447 448def _todb_dbapi_mkcurs(table, mkcurs, tablename, schema=None, commit=True, 449 truncate=False): 450 451 # sanitise table name 452 tablename = _quote(tablename) 453 if schema is not None: 454 tablename = _quote(schema) + '.' + tablename 455 debug('tablename: %r', tablename) 456 457 # sanitise field names 458 it = iter(table) 459 hdr = next(it) 460 flds = list(map(text_type, hdr)) 461 colnames = [_quote(n) for n in flds] 462 debug('column names: %r', colnames) 463 464 debug('obtain cursor and connection') 465 cursor = mkcurs() 466 # N.B., we depend on this optional DB-API 2.0 attribute being implemented 467 assert hasattr(cursor, 'connection'), \ 468 'could not obtain connection via cursor' 469 connection = cursor.connection 470 471 # determine paramstyle and build placeholders string 472 placeholders = _placeholders(connection, colnames) 473 debug('placeholders: %r', placeholders) 474 475 if truncate: 476 # TRUNCATE is not supported in some databases and causing locks with 477 # MySQL used via SQLAlchemy, fall back to DELETE FROM for now 478 truncatequery = SQL_TRUNCATE_QUERY % tablename 479 debug('truncate the table via query %r', truncatequery) 480 cursor.execute(truncatequery) 481 # N.B., may be server-side cursor, need to resurrect 482 cursor.close() 483 cursor = mkcurs() 484 485 insertcolnames = ', '.join(colnames) 486 insertquery = SQL_INSERT_QUERY % (tablename, insertcolnames, placeholders) 487 debug('insert data via query %r' % insertquery) 488 cursor.executemany(insertquery, it) 489 cursor.close() 490 491 if commit: 492 debug('commit transaction') 493 connection.commit() 494 495 496def _todb_dbapi_cursor(table, cursor, tablename, schema=None, commit=True, 497 truncate=False): 498 499 # sanitise table name 500 tablename = _quote(tablename) 501 if schema is not None: 502 tablename = _quote(schema) + '.' + tablename 503 debug('tablename: %r', tablename) 504 505 # sanitise field names 506 it = iter(table) 507 hdr = next(it) 508 flds = list(map(text_type, hdr)) 509 colnames = [_quote(n) for n in flds] 510 debug('column names: %r', colnames) 511 512 debug('obtain connection via cursor') 513 # N.B., we depend on this optional DB-API 2.0 attribute being implemented 514 assert hasattr(cursor, 'connection'), \ 515 'could not obtain connection via cursor' 516 connection = cursor.connection 517 518 # determine paramstyle and build placeholders string 519 placeholders = _placeholders(connection, colnames) 520 debug('placeholders: %r', placeholders) 521 522 if truncate: 523 # TRUNCATE is not supported in some databases and causing locks with 524 # MySQL used via SQLAlchemy, fall back to DELETE FROM for now 525 truncatequery = SQL_TRUNCATE_QUERY % tablename 526 debug('truncate the table via query %r', truncatequery) 527 cursor.execute(truncatequery) 528 529 insertcolnames = ', '.join(colnames) 530 insertquery = SQL_INSERT_QUERY % (tablename, insertcolnames, placeholders) 531 debug('insert data via query %r' % insertquery) 532 cursor.executemany(insertquery, it) 533 534 # N.B., don't close the cursor, leave that to the application 535 536 if commit: 537 debug('commit transaction') 538 connection.commit() 539 540 541def _todb_sqlalchemy_engine(table, engine, tablename, schema=None, commit=True, 542 truncate=False): 543 544 _todb_sqlalchemy_connection(table, engine.connect(), tablename, 545 schema=schema, commit=commit, truncate=truncate) 546 547 548def _todb_sqlalchemy_connection(table, connection, tablename, schema=None, 549 commit=True, truncate=False): 550 551 debug('connection: %r', connection) 552 553 # sanitise table name 554 tablename = _quote(tablename) 555 if schema is not None: 556 tablename = _quote(schema) + '.' + tablename 557 debug('tablename: %r', tablename) 558 559 # sanitise field names 560 it = iter(table) 561 hdr = next(it) 562 flds = list(map(text_type, hdr)) 563 colnames = [_quote(n) for n in flds] 564 debug('column names: %r', colnames) 565 566 # N.B., we need to obtain a reference to the underlying DB-API connection so 567 # we can import the module and determine the paramstyle 568 proxied_raw_connection = connection.connection 569 actual_raw_connection = proxied_raw_connection.connection 570 571 # determine paramstyle and build placeholders string 572 placeholders = _placeholders(actual_raw_connection, colnames) 573 debug('placeholders: %r', placeholders) 574 575 if commit: 576 debug('begin transaction') 577 trans = connection.begin() 578 579 if truncate: 580 # TRUNCATE is not supported in some databases and causing locks with 581 # MySQL used via SQLAlchemy, fall back to DELETE FROM for now 582 truncatequery = SQL_TRUNCATE_QUERY % tablename 583 debug('truncate the table via query %r', truncatequery) 584 connection.execute(truncatequery) 585 586 insertcolnames = ', '.join(colnames) 587 insertquery = SQL_INSERT_QUERY % (tablename, insertcolnames, placeholders) 588 debug('insert data via query %r' % insertquery) 589 for row in it: 590 connection.execute(insertquery, row) 591 592 # finish up 593 594 if commit: 595 debug('commit transaction') 596 trans.commit() 597 598 # N.B., don't close connection, leave that to the application 599 600 601def _todb_sqlalchemy_session(table, session, tablename, schema=None, 602 commit=True, truncate=False): 603 604 _todb_sqlalchemy_connection(table, session.connection(), tablename, 605 schema=schema, commit=commit, 606 truncate=truncate) 607 608 609def appenddb(table, dbo, tablename, schema=None, commit=True): 610 """ 611 Load data into an existing database table via a DB-API 2.0 612 connection or cursor. As :func:`petl.io.db.todb` except that the database 613 table will be appended, i.e., the new data will be inserted into the 614 table, and any existing rows will remain. 615 616 """ 617 618 needs_closing = False 619 620 # convenience for working with sqlite3 621 if isinstance(dbo, string_types): 622 import sqlite3 623 dbo = sqlite3.connect(dbo) 624 needs_closing = True 625 626 try: 627 _todb(table, dbo, tablename, schema=schema, commit=commit, 628 truncate=False) 629 630 finally: 631 if needs_closing: 632 dbo.close() 633 634 635Table.appenddb = appenddb 636