-- Regression transcript for replication origins used together with the
-- test_decoding logical decoding plugin. Each statement is followed by the
-- output recorded for it.
-- predictability: run the whole session with synchronous commits
SET synchronous_commit = on;
CREATE TABLE origin_tbl(id serial primary key, data text);
CREATE TABLE target_tbl(id serial primary key, data text);
SELECT pg_replication_origin_create('test_decoding: regression_slot');
 pg_replication_origin_create 
------------------------------
                            1
(1 row)

-- ensure duplicate creations fail (origin names are unique)
SELECT pg_replication_origin_create('test_decoding: regression_slot');
ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL:  Key (roname)=(test_decoding: regression_slot) already exists.
-- ensure deletions work (once): the first drop succeeds, the second drop of
-- the same origin must fail
SELECT pg_replication_origin_create('test_decoding: temp');
 pg_replication_origin_create 
------------------------------
                            2
(1 row)

SELECT pg_replication_origin_drop('test_decoding: temp');
 pg_replication_origin_drop 
----------------------------
 
(1 row)

SELECT pg_replication_origin_drop('test_decoding: temp');
ERROR:  replication origin "test_decoding: temp" does not exist
-- various failure checks for undefined replication origins
-- ('test_decoding: temp' was dropped above, so each call must report that the
-- origin does not exist)
select pg_replication_origin_advance('test_decoding: temp', '0/1');
ERROR:  replication origin "test_decoding: temp" does not exist
select pg_replication_origin_session_setup('test_decoding: temp');
ERROR:  replication origin "test_decoding: temp" does not exist
select pg_replication_origin_progress('test_decoding: temp', true);
ERROR:  replication origin "test_decoding: temp" does not exist
-- create the logical slot that the rest of the test decodes changes from
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 ?column? 
----------
 init
(1 row)

-- origin tx: a plain local change, made without any replication origin set up
INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
-- "replay" the decoded change by storing the slot's output in target_tbl
INSERT INTO target_tbl(data)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- as is normal, the insert into target_tbl shows up
-- (no origin was set up for the replay, so it is decoded like any other change)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                                                                    data                                                                                    
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 BEGIN
 table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
 table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
 table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
 COMMIT
(5 rows)

INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
-- mark session as replaying
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 pg_replication_origin_session_setup 
-------------------------------------
 
(1 row)

-- ensure we prevent duplicate setup
SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
ERROR:  cannot setup replication origin when one is already setup
-- emit a logical message while the session origin is set up; as its text
-- says, it is expected not to show up in the decoded output further down
SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
 ?column? 
----------
 
(1 row)

BEGIN;
-- setup transaction origin
-- (the progress checks after COMMIT report this LSN back as 0/AABBCCDD)
SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
 pg_replication_origin_xact_setup 
----------------------------------
 
(1 row)

-- replay again, this time with the origin set up and asking the slot for
-- 'only-local' changes
INSERT INTO target_tbl(data)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
COMMIT;
-- check replication progress for the session is correct
-- (checked with the boolean argument set to both false and true)
SELECT pg_replication_origin_session_progress(false);
 pg_replication_origin_session_progress 
----------------------------------------
 0/AABBCCDD
(1 row)

SELECT pg_replication_origin_session_progress(true);
 pg_replication_origin_session_progress 
----------------------------------------
 0/AABBCCDD
(1 row)

-- stop replaying: detach the origin from this session
SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset 
-------------------------------------
 
(1 row)

-- the status view should show the remote LSN set above; local_lsn varies
-- between runs, so only check that it is not 0/0
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
 local_id |          external_id           | remote_lsn | ?column? 
----------+--------------------------------+------------+----------
        1 | test_decoding: regression_slot | 0/AABBCCDD | t
(1 row)

-- check replication progress identified by name is correct
SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
 pg_replication_origin_progress 
--------------------------------
 0/AABBCCDD
(1 row)

SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
 pg_replication_origin_progress 
--------------------------------
 0/AABBCCDD
(1 row)

-- ensure reset requires previously setup state
-- (the origin was already reset above, so a second reset must fail)
SELECT pg_replication_origin_session_reset();
ERROR:  no replication origin is configured
-- and magically the replayed xact will be filtered!
-- (it was committed with an origin set up, so 'only-local' returns no rows)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
 data 
------
(0 rows)

-- but new original changes still show up
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
                                      data                                      
--------------------------------------------------------------------------------
 BEGIN
 table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
 COMMIT
(3 rows)

-- cleanup: drop the slot, then the remaining replication origin
SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot 
--------------------------
 
(1 row)

SELECT pg_replication_origin_drop('test_decoding: regression_slot');
 pg_replication_origin_drop 
----------------------------
 
(1 row)
