1# $Id: a619749fa4d80c458a3eefe2b419922edc0e16a8 $
2
3"""
4SQLite3 extended database driver.
5"""
6
7__docformat__ = "restructuredtext en"
8
9# ---------------------------------------------------------------------------
10# Imports
11# ---------------------------------------------------------------------------
12
13import os
14import sys
15import re
16
17from grizzled.db.base import (DBDriver, Error, Warning, TableMetadata,
18                              IndexMetadata, RDBMSMetadata)
19
20# ---------------------------------------------------------------------------
21# Exports
22# ---------------------------------------------------------------------------
23
24# ---------------------------------------------------------------------------
25# Classes
26# ---------------------------------------------------------------------------
27
class SQLite3Driver(DBDriver):
    """DB Driver for SQLite, using the pysqlite DB API module."""

    # Matches declared column types such as ``varchar(200)`` or ``CHAR(10)``,
    # capturing the base type name and the declared maximum length. SQLite
    # stores a column's declared type verbatim, so the match ignores case.
    _CHAR_FIELD_RE = re.compile(r'(varchar|char)\((\d+)\)', re.IGNORECASE)

    @staticmethod
    def _sqlite_quote(name):
        """
        Return ``name`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled, so names containing quotes,
        spaces or other special characters can be used safely as the
        argument of a ``PRAGMA`` statement.
        """
        return "'%s'" % name.replace("'", "''")

    def get_import(self):
        """Import and return the ``sqlite3`` DB API module."""
        import sqlite3
        return sqlite3

    def get_display_name(self):
        """Return the human-readable name of this driver."""
        return "SQLite3"

    def do_connect(self,
                   host=None,
                   port=None,
                   user='',
                   password='',
                   database='default'):
        """
        Open a connection to a SQLite database.

        :Parameters:
            host
                accepted but ignored
            port
                accepted but ignored
            user
                accepted but ignored
            password
                accepted but ignored
            database : str
                passed to ``sqlite3.connect()`` as the database to open

        :return: the connection object returned by ``sqlite3.connect()``
        """
        dbi = self.get_import()
        # isolation_level=None puts the sqlite3 module in autocommit mode.
        return dbi.connect(database=database, isolation_level=None)

    def get_rdbms_metadata(self, cursor):
        """
        Return an ``RDBMSMetadata`` object describing the SQLite library.

        The ``cursor`` argument is not used; the version comes from the
        ``sqlite3`` module's ``sqlite_version`` attribute.
        """
        dbi = self.get_import()
        return RDBMSMetadata('SQLite', 'SQLite 3', dbi.sqlite_version)

    def get_tables(self, cursor):
        """
        Return the names of the tables listed in ``sqlite_master``.

        :Parameters:
            cursor
                cursor on which to run the query

        :return: list of table names, in the order the query returns them
        """
        cursor.execute("select name from sqlite_master where type = 'table'")
        # iter(callable, sentinel) keeps calling fetchone() until it
        # returns None, i.e. until the result set is exhausted.
        return [row[0] for row in iter(cursor.fetchone, None)]

    def get_table_metadata(self, table, cursor):
        """
        Return one ``TableMetadata`` object per column of ``table``.

        :Parameters:
            table : str
                name of the table to describe
            cursor
                cursor on which to run the pragma

        :return: list of ``TableMetadata`` objects, in column order

        ``self._ensure_valid_table()`` (defined outside this class body) is
        called first and is expected to reject unknown tables.
        """
        self._ensure_valid_table(cursor, table)

        # The table_info pragma returns results looking something like this:
        #
        # cid name            type              notnull dflt_value pk
        # --- --------------- ----------------- ------- ---------- --
        # 0   id              integer           99      NULL       1
        # 1   action_time     datetime          99      NULL       0
        # 2   user_id         integer           99      NULL       0
        # 3   content_type_id integer           0       NULL       0
        # 4   object_id       text              0       NULL       0
        # 5   object_repr     varchar(200)      99      NULL       0
        # 6   action_flag     smallint unsigned 99      NULL       0
        # 7   change_message  text              99      NULL       0

        cursor.execute('PRAGMA table_info(%s)' % self._sqlite_quote(table))

        result = []
        for row in iter(cursor.fetchone, None):
            # Only the name, declared type and not-null flag are reported;
            # the column id, default value and primary-key flag are unused.
            (_cid, name, declared_type, not_null, _default, _is_primary) = row

            type_name = declared_type
            max_char_size = 0
            m = self._CHAR_FIELD_RE.match(declared_type or '')
            if m:
                # Split e.g. "varchar(200)" into the bare type and its size.
                # The size group is "\d+", so int() cannot fail here.
                type_name = m.group(1)
                max_char_size = int(m.group(2))

            result.append(TableMetadata(name, type_name, max_char_size,
                                        0, 0, not not_null))

        return result

    def get_index_metadata(self, table, cursor):
        """
        Return one ``IndexMetadata`` object per index defined on ``table``.

        :Parameters:
            table : str
                name of the table whose indexes are wanted
            cursor
                cursor on which to run the pragmas

        :return: list of ``IndexMetadata`` objects; the description is
                 ``'UNIQUE'`` for unique indexes and ``''`` otherwise

        ``self._ensure_valid_table()`` (defined outside this class body) is
        called first and is expected to reject unknown tables.
        """
        self._ensure_valid_table(cursor, table)

        # First, get the list of indexes for the table, using the appropriate
        # pragma. The pragma returns output like this:
        #
        # seq name    unique
        # --- ------- ------
        # 0   id        0
        # 1   name      0
        # 2   address   0

        cursor.execute('PRAGMA index_list(%s)' % self._sqlite_quote(table))
        # Drain the result set completely before reusing the cursor below.
        indexes = [(row[1], row[2]) for row in iter(cursor.fetchone, None)]

        # Now, get the data about each index, using another pragma. This
        # pragma returns data like this:
        #
        # seqno cid name
        # ----- --- ---------------
        # 0     3   content_type_id

        result = []
        for name, unique in indexes:
            cursor.execute('PRAGMA index_info(%s)' % self._sqlite_quote(name))
            columns = [row[2] for row in iter(cursor.fetchone, None)]

            description = 'UNIQUE' if unique else ''
            result.append(IndexMetadata(name, columns, description))

        return result
142