# -*- coding: utf-8 -*-
"""Test audit columns"""

import pytest

from pyrseas.testutils import AugmentToMapTestCase

CREATE_STMT = "CREATE TABLE t1 (c1 integer, c2 text)"

# NOTE(review): whitespace inside FUNC_SRC1/FUNC_SRC2 is significant, because
# the tests compare these literals verbatim with the generated function
# source.  The two-space body indentation below is presumed -- confirm it
# against the augmenter's built-in function template.
FUNC_SRC1 = """
BEGIN
  NEW.modified_by_user = SESSION_USER;
  NEW.modified_timestamp = CURRENT_TIMESTAMP;
  RETURN NEW;
END"""

FUNC_SRC2 = """
BEGIN
  NEW.updated = CURRENT_TIMESTAMP;
  RETURN NEW;
END"""

# Default expression expected on date-typed audit columns.
_TODAY_DEFAULT = "('now'::text)::date"

# Function body with column-name placeholders, used by the custom-function
# tests both as a named function template and as an inline function source.
_CUSTOM_TEMPLATE = """
BEGIN
  NEW.{{modified_by_user}} = SESSION_USER;
  NEW.{{modified_timestamp}} = CURRENT_TIMESTAMP::timestamp(0);
  RETURN NEW;
END"""

# Expected function source once the placeholders are substituted.  It is
# derived from the template (rather than kept as a second literal) so the
# two can never differ in anything but the placeholders.
_CUSTOM_SOURCE = (_CUSTOM_TEMPLATE
                  .replace('{{modified_by_user}}', 'modified_by_user')
                  .replace('{{modified_timestamp}}', 'modified_timestamp'))


def _columns(*audit_columns):
    """Return the expected column list: c1, c2, then *audit_columns*.

    A fresh list is built on every call so no test shares mutable state
    with another.
    """
    columns = [{'c1': {'type': 'integer'}}, {'c2': {'type': 'text'}}]
    columns.extend(audit_columns)
    return columns


def _default_audit_columns():
    """Return the expected columns of a table with the user/timestamp pair."""
    return _columns(
        {'modified_by_user': {'type': 'character varying(63)',
                              'not_null': True}},
        {'modified_timestamp': {'type': 'timestamp with time zone',
                                'not_null': True}})


def _row_trigger(name, events, procedure):
    """Return the expected 'triggers' map for one BEFORE row-level trigger."""
    return {name: {'events': events, 'level': 'row',
                   'procedure': procedure, 'timing': 'before'}}


def _augmap(spec, augmenter=None, schema='sd', table='t1'):
    """Return an augmentation map requesting audit columns *spec*.

    :param spec: name of the audit column specification to apply
    :param augmenter: optional 'augmenter' section (custom definitions)
    :param schema: schema holding the table to augment
    :param table: table to augment
    :return: a newly built augmentation map
    """
    augmap = {}
    # Keep 'augmenter' ahead of the schema entry, as in a hand-written map.
    if augmenter is not None:
        augmap['augmenter'] = augmenter
    augmap['schema ' + schema] = {'table ' + table: {'audit_columns': spec}}
    return augmap


def _custom_augmap(function_source, function_templates=None):
    """Return an augmentation map defining a custom audit specification.

    :param function_source: 'source' of the custom_audit() function
    :param function_templates: optional 'function_templates' section
    :return: a newly built augmentation map
    """
    augmenter = {
        'audit_columns': {'custom': {
            'columns': ['modified_by_user', 'modified_timestamp'],
            'triggers': ['custom_audit']}}}
    if function_templates is not None:
        augmenter['function_templates'] = function_templates
    augmenter['functions'] = {'custom_audit()': {
        'description': 'Maintain custom audit columns',
        'language': 'plpgsql',
        'returns': 'trigger',
        'security_definer': True,
        'source': function_source}}
    augmenter['triggers'] = {'custom_audit': {
        'events': ['insert', 'update'],
        'level': 'row',
        'name': '{{table_name}}_20_custom_audit',
        'procedure': 'custom_audit',
        'timing': 'before'}}
    return _augmap('custom', augmenter=augmenter)


def _assert_custom_audit(dbmap):
    """Check the table and function produced by the custom specification."""
    expmap = {
        'columns': _default_audit_columns(),
        'triggers': _row_trigger('t1_20_custom_audit', ['insert', 'update'],
                                 'sd.custom_audit')}
    schema = dbmap['schema sd']
    assert expmap == schema['table t1']
    function = schema['function custom_audit()']
    assert function['returns'] == 'trigger'
    assert function['source'] == _CUSTOM_SOURCE


class AuditColumnsTestCase(AugmentToMapTestCase):
    """Test mapping of audit column augmentations"""

    def test_predef_column(self):
        """Add predefined audit column"""
        augmap = _augmap('created_date_only')
        dbmap = self.to_map([CREATE_STMT], augmap)
        expmap = {'columns': _columns(
            {'created_date': {'type': 'date', 'not_null': True,
                              'default': _TODAY_DEFAULT}})}
        assert expmap == dbmap['schema sd']['table t1']

    def test_unknown_table(self):
        """Error on non-existent table"""
        # CREATE_STMT only creates t1, so augmenting t2 must fail.
        augmap = _augmap('created_date_only', table='t2')
        with pytest.raises(KeyError):
            self.to_map([CREATE_STMT], augmap)

    def test_bad_audit_spec(self):
        """Error on bad audit column specification"""
        # The key is deliberately misspelled: 'audit_column', not
        # 'audit_columns'.
        augmap = {'schema sd': {'table t1': {
            'audit_column': 'created_date_only'}}}
        with pytest.raises(KeyError):
            self.to_map([CREATE_STMT], augmap)

    def test_unknown_audit_spec(self):
        """Error on non-existent audit column specification"""
        # 'created_date' is not a specification name; the predefined
        # specification used elsewhere in this class is 'created_date_only'.
        augmap = _augmap('created_date')
        with pytest.raises(KeyError):
            self.to_map([CREATE_STMT], augmap)

    def test_new_column(self):
        """Add new (non-predefined) audit column"""
        augmap = _augmap('modified_date_only', augmenter={
            'columns': {
                'modified_date': {'type': 'date', 'not_null': True,
                                  'default': _TODAY_DEFAULT}},
            'audit_columns': {'modified_date_only': {
                'columns': ['modified_date']}}})
        dbmap = self.to_map([CREATE_STMT], augmap)
        expmap = {'columns': _columns(
            {'modified_date': {'type': 'date', 'not_null': True,
                               'default': _TODAY_DEFAULT}})}
        assert expmap == dbmap['schema sd']['table t1']

    def test_rename_column(self):
        """Add predefined audit column but with new name"""
        augmap = _augmap('modified_only', augmenter={
            'columns': {'modified_timestamp': {'name': 'updated'}}})
        dbmap = self.to_map([CREATE_STMT], augmap)
        colmap = {
            'columns': _columns(
                {'updated': {'type': 'timestamp with time zone',
                             'not_null': True}}),
            'triggers': _row_trigger(
                't1_20_audit_modified_only', ['insert', 'update'],
                'sd.audit_modified')}
        # The generated function must assign to the renamed column (see
        # FUNC_SRC2) while its description keeps the original column name.
        funcmap = {
            'language': 'plpgsql', 'returns': 'trigger',
            'security_definer': True,
            'description':
                'Provides modified_timestamp values for audit columns.',
            'source': FUNC_SRC2}
        assert dbmap['schema sd']['table t1'] == colmap
        assert dbmap['schema sd']['function audit_modified()'] == funcmap

    def test_change_column_type(self):
        """Add predefined audit column but with changed datatype"""
        augmap = _augmap('created_date_only', augmenter={
            'columns': {'created_date': {'type': 'text'}}})
        dbmap = self.to_map([CREATE_STMT], augmap)
        # Only the type is overridden; not_null and default are unchanged.
        expmap = {'columns': _columns(
            {'created_date': {'type': 'text', 'not_null': True,
                              'default': _TODAY_DEFAULT}})}
        assert expmap == dbmap['schema sd']['table t1']

    def test_columns_with_trigger(self):
        """Add predefined audit columns with trigger"""
        augmap = _augmap('default')
        dbmap = self.to_map([CREATE_STMT], augmap)
        expmap = {
            'columns': _default_audit_columns(),
            'triggers': _row_trigger('t1_20_audit_default', ['update'],
                                     'sd.audit_default')}
        assert expmap == dbmap['schema sd']['table t1']
        assert dbmap['schema sd']['function audit_default()'][
            'returns'] == 'trigger'
        assert dbmap['schema sd']['function audit_default()'][
            'source'] == FUNC_SRC1

    def test_nondefault_schema_with_trigger(self):
        """Add predefined audit columns with trigger in a non-default schema"""
        stmts = ["CREATE SCHEMA s1",
                 "CREATE TABLE s1.t1 (c1 integer, c2 text)"]
        augmap = _augmap('default', schema='s1')
        dbmap = self.to_map(stmts, augmap)
        # The trigger procedure is expected in the table's own schema (s1).
        expmap = {
            'columns': _default_audit_columns(),
            'triggers': _row_trigger('t1_20_audit_default', ['update'],
                                     's1.audit_default')}
        assert expmap == dbmap['schema s1']['table t1']
        assert dbmap['schema s1']['function audit_default()'][
            'returns'] == 'trigger'
        assert dbmap['schema s1']['function audit_default()'][
            'source'] == FUNC_SRC1

    def test_skip_existing_columns(self):
        """Do not add already existing audit columns"""
        stmts = [CREATE_STMT,
                 "ALTER TABLE t1 ADD modified_by_user varchar(63) NOT NULL",
                 "ALTER TABLE t1 ADD modified_timestamp "
                 "timestamp with time zone NOT NULL"]
        augmap = _augmap('default')
        dbmap = self.to_map(stmts, augmap)
        # Each audit column must appear exactly once.
        expmap = _default_audit_columns()
        assert expmap == dbmap['schema sd']['table t1']['columns']

    def test_change_existing_columns(self):
        """Change already existing audit columns"""
        # modified_by_user starts out as a nullable text column and is
        # expected to end up as varchar(63) NOT NULL.
        stmts = [CREATE_STMT, "ALTER TABLE t1 ADD modified_by_user text ",
                 "ALTER TABLE t1 ADD modified_timestamp "
                 "timestamp with time zone NOT NULL"]
        augmap = _augmap('default')
        dbmap = self.to_map(stmts, augmap)
        expmap = _default_audit_columns()
        assert expmap == dbmap['schema sd']['table t1']['columns']

    def test_custom_function_template(self):
        """Add new (non-predefined) audit trigger using a function template"""
        # The function source only names the template; the body comes from
        # the 'function_templates' section.
        augmap = _custom_augmap(
            '{{custom_template}}',
            function_templates={'custom_template': _CUSTOM_TEMPLATE})
        dbmap = self.to_map([CREATE_STMT], augmap)
        _assert_custom_audit(dbmap)

    def test_custom_function_inline_with_column_substitution(self):
        """Add new (non-predefined) audit trigger using an inline definition"""
        # The templated body is given directly as the function source.
        augmap = _custom_augmap(_CUSTOM_TEMPLATE)
        dbmap = self.to_map([CREATE_STMT], augmap)
        _assert_custom_audit(dbmap)