1# testing/schema.py
2# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8from . import exclusions
9from .. import schema, event
10from . import config
11
# Names exported by ``from <this module> import *``.
__all__ = 'Table', 'Column',

# Keyword arguments merged into the keyword arguments of every Table() call
# made through this module (via ``kw.update``, so same-named keys passed by
# the caller are overridden).  Empty by default.
table_options = {}
15
16
def Table(*args, **kw):
    """Build a ``schema.Table``, applying test-suite and dialect tweaks.

    Keyword arguments whose names start with ``test_`` are removed from
    ``kw`` and used only as hints here; they are not passed on to
    ``schema.Table``.  The module-level ``table_options`` are then merged
    into ``kw``.

    On MySQL, when neither ``mysql_engine`` nor ``mysql_type`` is present
    in ``kw``, ``mysql_engine`` is set to ``InnoDB`` if a ``test_needs_fk``
    or ``test_needs_acid`` hint was passed (regardless of its value) and
    to ``MyISAM`` otherwise.

    On Firebird, each ``ForeignKey`` on a positional ``Column`` argument
    that refers back to the table being defined gets ``CASCADE`` for
    ``ondelete`` and ``onupdate`` where those are unset.
    """

    # Split the test-only hints away from the real Table arguments.
    hints = {}
    for key in list(kw):
        if key.startswith('test_'):
            hints[key] = kw.pop(key)

    kw.update(table_options)

    if exclusions.against(config._current, 'mysql'):
        engine_given = 'mysql_engine' in kw or 'mysql_type' in kw
        if not engine_given:
            wants_innodb = 'test_needs_fk' in hints or \
                'test_needs_acid' in hints
            kw['mysql_engine'] = 'InnoDB' if wants_innodb else 'MyISAM'

    # Apply some default cascading rules for self-referential foreign keys.
    # MySQL InnoDB has some issues around selecting self-refs too.
    if exclusions.against(config._current, 'firebird'):
        own_name = args[0]
        split_identifiers = (config.db.dialect.
                             identifier_preparer.unformat_identifiers)

        # Only ForeignKeys attached to Columns are considered.  May need
        # to expand to ForeignKeyConstraint too.
        column_fks = []
        for arg in args:
            if isinstance(arg, schema.Column):
                column_fks.extend(arg.foreign_keys)

        for fk in column_fks:
            # look at the raw, unresolved spec of the foreign key
            target = fk._colspec
            if isinstance(target, schema.Column):
                target_table = target.table.name
            else:
                # On FB there cannot be a schema, so the first element
                # is always the table name, possibly followed by the
                # field name; keep just the table name.
                target_table = split_identifiers(target)[0]

            if target_table == own_name:
                if fk.ondelete is None:
                    fk.ondelete = 'CASCADE'
                if fk.onupdate is None:
                    fk.onupdate = 'CASCADE'

    return schema.Table(*args, **kw)
62
63
def Column(*args, **kw):
    """Build a ``schema.Column``, applying test-suite and dialect tweaks.

    Keyword arguments whose names start with ``test_`` are removed from
    ``kw`` and used only as hints here.  If the ``foreign_key_ddl``
    requirement is not enabled for the current config, positional
    ``schema.ForeignKey`` arguments are dropped before the column is
    created.

    When the ``test_needs_autoincrement`` hint and the ``primary_key``
    keyword are both truthy:

    * ``autoincrement`` is set to True if the column has neither a
      default nor a server default;
    * ``col.info['test_needs_autoincrement']`` is set to True;
    * on Firebird and Oracle, a listener is registered that adds an
      optional ``Sequence`` named ``<table>_<column>_seq`` (passed
      through ``_truncate_name``) once the column is attached to its
      parent.
    """

    # Split the test-only hints away from the real Column arguments.
    hints = {}
    for key in list(kw):
        if key.startswith('test_'):
            hints[key] = kw.pop(key)

    fk_ddl = config.requirements.foreign_key_ddl
    if not fk_ddl.enabled_for_config(config):
        args = [arg for arg in args
                if not isinstance(arg, schema.ForeignKey)]

    col = schema.Column(*args, **kw)

    wants_autoinc = hints.get('test_needs_autoincrement', False)
    if not (wants_autoinc and kw.get('primary_key', False)):
        return col

    if col.default is None and col.server_default is None:
        col.autoincrement = True

    # allow any test suite to pick up on this
    col.info['test_needs_autoincrement'] = True

    # hardcoded rule for firebird, oracle; this should
    # be moved out
    if exclusions.against(config._current, 'firebird', 'oracle'):
        def add_seq(c, tbl):
            seq_name = _truncate_name(
                config.db.dialect, tbl.name + '_' + c.name + '_seq')
            c._init_items(schema.Sequence(seq_name, optional=True))

        event.listen(col, 'after_parent_attach', add_seq, propagate=True)

    return col
94
95
96def _truncate_name(dialect, name):
97    if len(name) > dialect.max_identifier_length:
98        return name[0:max(dialect.max_identifier_length - 6, 0)] + \
99            "_" + hex(hash(name) % 64)[2:]
100    else:
101        return name
102