1# -*- coding: utf-8 -*-
2"""Test audit columns"""
3
4import pytest
5
6from pyrseas.testutils import AugmentToMapTestCase
7
8CREATE_STMT = "CREATE TABLE t1 (c1 integer, c2 text)"
9FUNC_SRC1 = """
10BEGIN
11  NEW.modified_by_user = SESSION_USER;
12  NEW.modified_timestamp = CURRENT_TIMESTAMP;
13  RETURN NEW;
14END"""
15
16FUNC_SRC2 = """
17BEGIN
18  NEW.updated = CURRENT_TIMESTAMP;
19  RETURN NEW;
20END"""
21
22
23class AuditColumnsTestCase(AugmentToMapTestCase):
24    """Test mapping of audit column augmentations"""
25
26    def test_predef_column(self):
27        "Add predefined audit column"
28        augmap = {'schema sd': {'table t1': {
29            'audit_columns': 'created_date_only'}}}
30        dbmap = self.to_map([CREATE_STMT], augmap)
31        expmap = {'columns': [
32            {'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
33            {'created_date': {'type': 'date', 'not_null': True,
34                              'default': "('now'::text)::date"}}]}
35        assert expmap == dbmap['schema sd']['table t1']
36
37    def test_unknown_table(self):
38        "Error on non-existent table"
39        augmap = {'schema sd': {'table t2': {
40            'audit_columns': 'created_date_only'}}}
41        with pytest.raises(KeyError):
42            self.to_map([CREATE_STMT], augmap)
43
44    def test_bad_audit_spec(self):
45        "Error on bad audit column specification"
46        augmap = {'schema sd': {'table t1': {
47            'audit_column': 'created_date_only'}}}
48        with pytest.raises(KeyError):
49            self.to_map([CREATE_STMT], augmap)
50
51    def test_unknown_audit_spec(self):
52        "Error on non-existent audit column specification"
53        augmap = {'schema sd': {'table t1': {
54            'audit_columns': 'created_date'}}}
55        with pytest.raises(KeyError):
56            self.to_map([CREATE_STMT], augmap)
57
58    def test_new_column(self):
59        "Add new (non-predefined) audit column"
60        augmap = {'augmenter': {'columns': {
61            'modified_date': {'type': 'date', 'not_null': True,
62                              'default': "('now'::text)::date"}},
63            'audit_columns': {'modified_date_only': {
64                'columns': ['modified_date']}}},
65            'schema sd': {'table t1': {
66                'audit_columns': 'modified_date_only'}}}
67        dbmap = self.to_map([CREATE_STMT], augmap)
68        expmap = {'columns': [
69            {'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
70            {'modified_date': {'type': 'date', 'not_null': True,
71                               'default': "('now'::text)::date"}}]}
72        assert expmap == dbmap['schema sd']['table t1']
73
74    def test_rename_column(self):
75        "Add predefined audit column but with new name"
76        augmap = {'augmenter': {'columns': {
77            'modified_timestamp': {'name': 'updated'}}},
78            'schema sd': {'table t1': {
79                'audit_columns': 'modified_only'}}}
80        dbmap = self.to_map([CREATE_STMT], augmap)
81        colmap = {'columns': [
82            {'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
83            {'updated': {'type': 'timestamp with time zone',
84                         'not_null': True}}],
85            'triggers': {'t1_20_audit_modified_only': {
86                'events': ['insert', 'update'], 'level': 'row',
87                'procedure': 'sd.audit_modified', 'timing': 'before'}}}
88        funcmap = {'language': 'plpgsql', 'returns': 'trigger',
89                   'security_definer': True, 'description':
90                   'Provides modified_timestamp values for audit columns.',
91                   'source': FUNC_SRC2}
92        assert dbmap['schema sd']['table t1'] == colmap
93        assert dbmap['schema sd']['function audit_modified()'] == funcmap
94
95    def test_change_column_type(self):
96        "Add predefined audit column but with changed datatype"
97        augmap = {'augmenter': {'columns': {'created_date': {'type': 'text'}}},
98                  'schema sd': {'table t1': {
99                      'audit_columns': 'created_date_only'}}}
100        dbmap = self.to_map([CREATE_STMT], augmap)
101        expmap = {'columns': [
102            {'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
103            {'created_date': {'type': 'text', 'not_null': True,
104                              'default': "('now'::text)::date"}}]}
105        assert expmap == dbmap['schema sd']['table t1']
106
107    def test_columns_with_trigger(self):
108        "Add predefined audit columns with trigger"
109        augmap = {'schema sd': {'table t1': {'audit_columns': 'default'}}}
110        dbmap = self.to_map([CREATE_STMT], augmap)
111        expmap = {'columns': [
112            {'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
113            {'modified_by_user': {'type': 'character varying(63)',
114                                  'not_null': True}},
115            {'modified_timestamp': {'type': 'timestamp with time zone',
116                                    'not_null': True}}],
117            'triggers': {'t1_20_audit_default': {
118                'events': ['update'], 'level': 'row',
119                'procedure': 'sd.audit_default', 'timing': 'before'}}}
120        assert expmap == dbmap['schema sd']['table t1']
121        assert dbmap['schema sd']['function audit_default()'][
122            'returns'] == 'trigger'
123        assert dbmap['schema sd']['function audit_default()'][
124            'source'] == FUNC_SRC1
125
126    def test_nondefault_schema_with_trigger(self):
127        "Add predefined audit columns with trigger in a non-default schema"
128        stmts = ["CREATE SCHEMA s1",
129                 "CREATE TABLE s1.t1 (c1 integer, c2 text)"]
130        augmap = {'schema s1': {'table t1': {'audit_columns': 'default'}}}
131        dbmap = self.to_map(stmts, augmap)
132        expmap = {'columns': [
133            {'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
134            {'modified_by_user': {'type': 'character varying(63)',
135                                  'not_null': True}},
136            {'modified_timestamp': {'type': 'timestamp with time zone',
137                                    'not_null': True}}],
138            'triggers': {'t1_20_audit_default': {
139                'events': ['update'], 'level': 'row',
140                'procedure': 's1.audit_default', 'timing': 'before'}}}
141        assert expmap == dbmap['schema s1']['table t1']
142        assert dbmap['schema s1']['function audit_default()']['returns'] == \
143            'trigger'
144        assert dbmap['schema s1']['function audit_default()'][
145            'source'] == FUNC_SRC1
146
147    def test_skip_existing_columns(self):
148        "Do not add already existing audit columns"
149        stmts = [CREATE_STMT,
150                 "ALTER TABLE t1 ADD modified_by_user varchar(63) NOT NULL",
151                 "ALTER TABLE t1 ADD modified_timestamp "
152                 "timestamp with time zone NOT NULL"]
153        augmap = {'schema sd': {'table t1': {
154            'audit_columns': 'default'}}}
155        dbmap = self.to_map(stmts, augmap)
156        expmap = [{'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
157                  {'modified_by_user': {'type': 'character varying(63)',
158                                        'not_null': True}},
159                  {'modified_timestamp': {'type': 'timestamp with time zone',
160                                          'not_null': True}}]
161        assert expmap == dbmap['schema sd']['table t1']['columns']
162
163    def test_change_existing_columns(self):
164        "Change already existing audit columns"
165        stmts = [CREATE_STMT, "ALTER TABLE t1 ADD modified_by_user text ",
166                 "ALTER TABLE t1 ADD modified_timestamp "
167                 "timestamp with time zone NOT NULL"]
168        augmap = {'schema sd': {'table t1': {'audit_columns': 'default'}}}
169        dbmap = self.to_map(stmts, augmap)
170        expmap = [{'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
171                  {'modified_by_user': {'type': 'character varying(63)',
172                                        'not_null': True}},
173                  {'modified_timestamp': {'type': 'timestamp with time zone',
174                                          'not_null': True}}]
175        assert expmap == dbmap['schema sd']['table t1']['columns']
176
177    def test_custom_function_template(self):
178        "Add new (non-predefined) audit trigger using a function template"
179        template = """
180        BEGIN
181          NEW.{{modified_by_user}} = SESSION_USER;
182          NEW.{{modified_timestamp}} = CURRENT_TIMESTAMP::timestamp(0);
183          RETURN NEW;
184        END"""
185        source = """
186        BEGIN
187          NEW.modified_by_user = SESSION_USER;
188          NEW.modified_timestamp = CURRENT_TIMESTAMP::timestamp(0);
189          RETURN NEW;
190        END"""
191        augmap = {
192            'augmenter': {
193                'audit_columns': {'custom': {
194                    'columns': ['modified_by_user', 'modified_timestamp'],
195                    'triggers': ['custom_audit']}},
196                'function_templates': {'custom_template': template},
197                'functions': {'custom_audit()': {
198                    'description': 'Maintain custom audit columns',
199                    'language': 'plpgsql',
200                    'returns': 'trigger',
201                    'security_definer': True,
202                    'source': '{{custom_template}}'}},
203                'triggers': {'custom_audit': {
204                    'events': ['insert', 'update'],
205                    'level': 'row',
206                    'name': '{{table_name}}_20_custom_audit',
207                    'procedure': 'custom_audit',
208                    'timing': 'before'}}},
209            'schema sd': {'table t1': {
210                'audit_columns': 'custom'}}}
211        dbmap = self.to_map([CREATE_STMT], augmap)
212        expmap = {'columns': [
213            {'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
214            {'modified_by_user': {'type': 'character varying(63)',
215                                  'not_null': True}},
216            {'modified_timestamp': {'type': 'timestamp with time zone',
217                                    'not_null': True}}],
218            'triggers': {'t1_20_custom_audit': {
219                'events': ['insert', 'update'], 'level': 'row',
220                'procedure': 'sd.custom_audit', 'timing': 'before'}}}
221        assert expmap == dbmap['schema sd']['table t1']
222        assert dbmap['schema sd']['function custom_audit()'][
223            'returns'] == 'trigger'
224        assert dbmap['schema sd']['function custom_audit()'][
225            'source'] == source
226
227    def test_custom_function_inline_with_column_substitution(self):
228        "Add new (non-predefined) audit trigger using an inline definition"
229        template = """
230        BEGIN
231          NEW.{{modified_by_user}} = SESSION_USER;
232          NEW.{{modified_timestamp}} = CURRENT_TIMESTAMP::timestamp(0);
233          RETURN NEW;
234        END"""
235        source = """
236        BEGIN
237          NEW.modified_by_user = SESSION_USER;
238          NEW.modified_timestamp = CURRENT_TIMESTAMP::timestamp(0);
239          RETURN NEW;
240        END"""
241        augmap = {
242            'augmenter': {
243                'audit_columns': {'custom': {
244                    'columns': ['modified_by_user', 'modified_timestamp'],
245                    'triggers': ['custom_audit']}},
246                'functions': {'custom_audit()': {
247                    'description': 'Maintain custom audit columns',
248                    'language': 'plpgsql',
249                    'returns': 'trigger',
250                    'security_definer': True,
251                    'source': template}},
252                'triggers': {'custom_audit': {
253                    'events': ['insert', 'update'],
254                    'level': 'row',
255                    'name': '{{table_name}}_20_custom_audit',
256                    'procedure': 'custom_audit',
257                    'timing': 'before'}}},
258            'schema sd': {'table t1': {
259                'audit_columns': 'custom'}}}
260        dbmap = self.to_map([CREATE_STMT], augmap)
261        expmap = {'columns': [
262            {'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}},
263            {'modified_by_user': {'type': 'character varying(63)',
264                                  'not_null': True}},
265            {'modified_timestamp': {'type': 'timestamp with time zone',
266                                    'not_null': True}}],
267            'triggers': {'t1_20_custom_audit': {
268                'events': ['insert', 'update'], 'level': 'row',
269                'procedure': 'sd.custom_audit', 'timing': 'before'}}}
270        assert expmap == dbmap['schema sd']['table t1']
271        assert dbmap['schema sd']['function custom_audit()'][
272            'returns'] == 'trigger'
273        assert dbmap['schema sd']['function custom_audit()'][
274            'source'] == source
275