1-- test that we can insert the result of a get_changes call into a 2-- logged relation. That's really not a good idea in practical terms, 3-- but provides a nice test. 4-- predictability 5SET synchronous_commit = on; 6SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); 7 ?column? 8---------- 9 init 10(1 row) 11 12-- slot works 13SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); 14 data 15------ 16(0 rows) 17 18-- create some changes 19CREATE TABLE somechange(id serial primary key); 20INSERT INTO somechange DEFAULT VALUES; 21CREATE TABLE changeresult AS 22 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); 23SELECT * FROM changeresult; 24 data 25------------------------------------------------ 26 BEGIN 27 table public.somechange: INSERT: id[integer]:1 28 COMMIT 29(3 rows) 30 31INSERT INTO changeresult 32 SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); 33INSERT INTO changeresult 34 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); 35SELECT * FROM changeresult; 36 data 37-------------------------------------------------------------------------------------------------------------------------------------------------- 38 BEGIN 39 table public.somechange: INSERT: id[integer]:1 40 COMMIT 41 BEGIN 42 table public.changeresult: INSERT: data[text]:'BEGIN' 43 table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1' 44 table public.changeresult: INSERT: data[text]:'COMMIT' 45 COMMIT 46 BEGIN 47 table public.changeresult: INSERT: data[text]:'BEGIN' 48 table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1' 49 table public.changeresult: INSERT: data[text]:'COMMIT' 50 COMMIT 51 BEGIN 52 table public.changeresult: INSERT: data[text]:'BEGIN' 53 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' 54 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1''' 55 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' 56 table public.changeresult: INSERT: data[text]:'COMMIT' 57 COMMIT 58(20 rows) 59 60DROP TABLE changeresult; 61DROP TABLE somechange; 62-- check calling logical decoding from pl/pgsql 63CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$ 64BEGIN 65 RETURN QUERY 66 SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); 67END$$ LANGUAGE plpgsql; 68SELECT * FROM slot_changes_wrapper('regression_slot'); 69 slot_changes_wrapper 70-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 71 BEGIN 72 table public.changeresult: INSERT: data[text]:'BEGIN' 73 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' 74 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1''' 75 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' 76 table public.changeresult: INSERT: data[text]:'COMMIT' 77 table public.changeresult: INSERT: data[text]:'BEGIN' 78 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' 79 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN''''''' 80 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1''''''' 81 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT''''''' 82 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' 83 table public.changeresult: INSERT: data[text]:'COMMIT' 84 COMMIT 85(14 rows) 86 87SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); 88 data 89-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 90 BEGIN 91 table public.changeresult: INSERT: data[text]:'BEGIN' 92 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' 93 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1''' 94 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' 95 table public.changeresult: INSERT: data[text]:'COMMIT' 96 table public.changeresult: INSERT: data[text]:'BEGIN' 97 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' 98 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN''''''' 99 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1''''''' 100 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT''''''' 101 table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' 102 table public.changeresult: INSERT: data[text]:'COMMIT' 103 COMMIT 104(14 rows) 105 106SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); 107 ?column? 108---------- 109 stop 110(1 row) 111 112