1# testing/schema.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8import sys
9
10from . import config
11from . import exclusions
12from .. import event
13from .. import schema
14from .. import types as sqltypes
15from ..util import OrderedDict
16
17
# Names exported by a star-import of this module.
__all__ = ["Table", "Column"]

# Keyword arguments merged into the keyword set of every call to the
# Table() wrapper defined in this module; empty unless populated elsewhere.
table_options = {}
21
22
def Table(*args, **kw):
    """Build a ``schema.Table``, applying dialect-specific test tweaks.

    Keyword arguments whose names start with ``test_`` are stripped from
    ``kw`` and used only as hints inside this wrapper.  The module-level
    ``table_options`` are then merged into ``kw`` and the result is handed
    to ``schema.Table`` together with the positional arguments.

    * On MySQL or MariaDB, when neither ``<dialect>_engine``,
      ``<dialect>_type`` nor ``autoload_with`` is present, the engine is
      set to ``InnoDB`` if ``test_needs_fk`` or ``test_needs_acid`` was
      passed, otherwise to ``MyISAM``.
    * On Firebird, column-level foreign keys that refer back to the table
      being defined get ``CASCADE`` for any unset ``ondelete`` /
      ``onupdate``.
    """

    test_opts = {}
    for key in list(kw):
        if key.startswith("test_"):
            test_opts[key] = kw.pop(key)

    kw.update(table_options)

    # MySQL is checked first; MariaDB is only consulted when the MySQL
    # check did not match.
    for backend in ("mysql", "mariadb"):
        if not exclusions.against(config._current, backend):
            continue

        caller_decided = (
            backend + "_engine" in kw
            or backend + "_type" in kw
            or "autoload_with" in kw
        )
        if not caller_decided:
            needs_innodb = (
                "test_needs_fk" in test_opts
                or "test_needs_acid" in test_opts
            )
            kw[backend + "_engine"] = "InnoDB" if needs_innodb else "MyISAM"
        break

    # Apply some default cascading rules for self-referential foreign keys.
    # MySQL InnoDB has some issues around selecting self-refs too.
    if exclusions.against(config._current, "firebird"):
        own_name = args[0]
        split_identifiers = (
            config.db.dialect.identifier_preparer.unformat_identifiers
        )

        # Only ForeignKey objects attached to Column arguments are
        # considered; ForeignKeyConstraint is not handled here.
        column_fks = []
        for arg in args:
            if isinstance(arg, schema.Column):
                column_fks.extend(arg.foreign_keys)

        for fk in column_fks:
            # inspect the raw, as-given column specification
            target = fk._colspec
            if isinstance(target, schema.Column):
                referred = target.table.name
            else:
                # Firebird has no schemas, so the first element of the
                # dotted spec is always the table name, optionally
                # followed by the column name.
                referred = split_identifiers(target)[0]

            if referred == own_name:
                if fk.ondelete is None:
                    fk.ondelete = "CASCADE"
                if fk.onupdate is None:
                    fk.onupdate = "CASCADE"

    return schema.Table(*args, **kw)
83
84
def Column(*args, **kw):
    """Build a ``schema.Column``, applying dialect-specific test tweaks.

    Keyword arguments whose names start with ``test_`` are stripped from
    ``kw`` and used only as hints inside this wrapper.  ``ForeignKey``
    positional arguments are dropped when the ``foreign_key_ddl``
    requirement is not enabled for the current config.

    When ``test_needs_autoincrement`` is passed with a true value and
    ``primary_key`` is passed with a true value:

    * ``autoincrement`` is set to ``True`` if the column has neither a
      default nor a server default;
    * ``col.info["test_needs_autoincrement"]`` is set to ``True``;
    * on Firebird and Oracle, an optional ``Sequence`` named
      ``<table>_<column>_seq`` (truncated to the dialect's identifier
      limit) is added once the column is attached to its parent.
    """

    test_opts = {}
    for key in list(kw):
        if key.startswith("test_"):
            test_opts[key] = kw.pop(key)

    if not config.requirements.foreign_key_ddl.enabled_for_config(config):
        args = [a for a in args if not isinstance(a, schema.ForeignKey)]

    col = schema.Column(*args, **kw)

    wants_autoincrement = test_opts.get("test_needs_autoincrement", False)
    if not (wants_autoincrement and kw.get("primary_key", False)):
        return col

    if col.default is None and col.server_default is None:
        col.autoincrement = True

    # allow any test suite to pick up on this
    col.info["test_needs_autoincrement"] = True

    # hardcoded rule for firebird, oracle; this should
    # be moved out
    if exclusions.against(config._current, "firebird", "oracle"):

        def add_seq(c, tbl):
            seq_name = _truncate_name(
                config.db.dialect, tbl.name + "_" + c.name + "_seq"
            )
            c._init_items(schema.Sequence(seq_name, optional=True))

        event.listen(col, "after_parent_attach", add_seq, propagate=True)

    return col
120
121
class eq_type_affinity(object):
    """Comparison stand-in that matches types by their type affinity.

    An instance compares equal to any object whose ``_type_affinity`` is
    the very same object as the ``_type_affinity`` of the wrapped target,
    which makes it usable inside nested structures passed to ``eq_()``.

    E.g.::

        eq_(
            inspect(connection).get_columns("foo"),
            [
                {
                    "name": "id",
                    "type": testing.eq_type_affinity(sqltypes.INTEGER),
                    "nullable": False,
                    "default": None,
                    "autoincrement": False,
                },
                {
                    "name": "data",
                    "type": testing.eq_type_affinity(sqltypes.NullType),
                    "nullable": True,
                    "default": None,
                    "autoincrement": False,
                },
            ],
        )

    """

    def __init__(self, target):
        # the given type (class or instance) is passed through
        # sqltypes.to_instance() before being stored
        self.target = sqltypes.to_instance(target)

    def __eq__(self, other):
        # identity comparison of the two affinities, not equality
        expected = self.target._type_affinity
        actual = other._type_affinity
        return expected is actual

    def __ne__(self, other):
        expected = self.target._type_affinity
        actual = other._type_affinity
        return expected is not actual
157
158
class eq_clause_element(object):
    """Comparison stand-in that delegates equality to ``target.compare()``.

    ``==`` returns whatever ``target.compare(other)`` returns; ``!=``
    returns the boolean negation of that result.
    """

    def __init__(self, target):
        self.target = target

    def __eq__(self, other):
        outcome = self.target.compare(other)
        return outcome

    def __ne__(self, other):
        outcome = self.target.compare(other)
        return not outcome
170
171
172def _truncate_name(dialect, name):
173    if len(name) > dialect.max_identifier_length:
174        return (
175            name[0 : max(dialect.max_identifier_length - 6, 0)]
176            + "_"
177            + hex(hash(name) % 64)[2:]
178        )
179    else:
180        return name
181
182
def pep435_enum(name):
    """Create a minimal PEP 435-style enumeration class called ``name``.

    The returned class is instantiated once per member as
    ``cls(name, value, alias=None)``.  Each instantiation:

    * stores ``name`` and ``value`` on the member;
    * records the member in the class-level ``__members__`` ordered
      mapping under ``name`` (and under ``alias`` if one is given);
    * exposes the member as a class attribute under the same key(s);
    * registers the member for lookup by value via ``cls.get(value)``.

    ``cls.get(value)`` raises ``KeyError`` for an unknown value.

    :param name: the ``__name__`` of the generated class.
    :return: the newly created enumeration class.
    """
    # Implements PEP 435 in the minimal fashion needed by SQLAlchemy
    __members__ = OrderedDict()
    value_to_member = {}

    def __init__(self, name, value, alias=None):
        self.name = name
        self.value = value
        self.__members__[name] = self
        value_to_member[value] = self
        setattr(self.__class__, name, self)
        if alias:
            self.__members__[alias] = self
            setattr(self.__class__, alias, self)

    @classmethod
    def get(cls, value):
        return value_to_member[value]

    someenum = type(
        name,
        (object,),
        {"__members__": __members__, "__init__": __init__, "get": get},
    )

    # Same trick collections.namedtuple() uses so pickling can find the
    # class: report the caller's module as the class's __module__.
    try:
        module = sys._getframe(1).f_globals.get("__name__", "__main__")
    except (AttributeError, ValueError):
        # sys._getframe may be missing or the stack may be too shallow;
        # leave __module__ as set by type() instead of failing on an
        # unbound local below.
        module = None
    if module is not None:
        someenum.__module__ = module

    return someenum
219