1#!/bin/sh
2# ----------
3# slony1_dump.sh
4#
5#
6#
7#	This script creates a special data only dump from a subscriber
8#	node. The stdout of this script, fed into psql for a database that
9#	has the user schema of the replicated database installed, will
10#	prepare that database for log archive application.
11# ----------
12
# ----
# Command line handling.
#
#	$1	name of the subscriber database to dump
#	$2	Slony-I cluster name (without the leading underscore)
#	$3	optional; when exactly three arguments are given it
#		must be the literal switch -omit_copy
#
# Anything else prints the usage line on stderr and exits with 1.
# ----
if [ $# -lt 2 ]; then
	echo "usage: $0 subscriber-dbname clustername [-omit_copy ]" >&2
	exit 1
fi

# ----
# Remember the call arguments. clname is the cluster's schema name
# as a double-quoted SQL identifier ("_<cluster>").
# ----
dbname=$1
cluster=$2
clname="\"_${cluster}\""
omit_copy=0

# The third argument is only inspected when the argument count is
# exactly three; the count is folded into the pattern to express that.
case "$#:$3" in
	3:-omit_copy)
		omit_copy=1
		;;
	3:*)
		echo "usage: $0 subscriber-dbname clustername [-omit_copy ]" >&2
		exit 1
		;;
esac
38
# ----
# pgc is the quoted name of the system catalog schema, used to
# schema-qualify built-in functions in the queries further down.
#
# nodeid is the ID of the Slony-I node living in the given database,
# as reported by the cluster's getLocalNodeId() function. It is
# interpolated into later queries, so the script stops right here if
# the lookup failed or returned anything but a plain number instead
# of going on to produce an unusable dump.
# ----
pgc="\"pg_catalog\""
nodeid=`psql -q -At -c "select \"_$cluster\".getLocalNodeId('_$cluster')" "$dbname"`
case "$nodeid" in
	''|*[!0-9]*)
		echo "$0: could not determine the local node id of cluster '$cluster' in database '$dbname'" >&2
		exit 1
		;;
esac
41
# ----
# slony_remember_name PREFIX ID NAME
#
#	Store NAME in the shell variable PREFIX_ID (for example
#	tabname_17). The value is assigned through a variable
#	reference, so it is never re-parsed as shell code: the double
#	quotes added by quote_ident() survive and characters such as
#	\$ or backticks inside an identifier are kept literally.
#	ID must consist of digits only; anything else is rejected with
#	a message on stderr and a return status of 1.
# ----
slony_remember_name () {
	case "$2" in
		''|*[!0-9]*)
			echo "$0: ignoring unexpected object id '$2'" >&2
			return 1
			;;
	esac
	eval "${1}_${2}=\$3"
}

# ----
# Get a list of all replicated table IDs this subscriber receives,
# and remember the schema-qualified, quoted table names in
# tabname_<id>.
# ----
tables=`psql -q -At -d "$dbname" -c \
		"select tab_id from $clname.sl_table, $clname.sl_set
				where tab_set = set_id
					and exists (select 1 from $clname.sl_subscribe
							where sub_set = set_id
								and sub_receiver = $nodeid)"`
# \$tables is left unquoted on purpose: one word per table ID.
for tab in $tables ; do
	objname=`psql -q -At -d "$dbname" -c \
			"select $pgc.quote_ident(tab_nspname) || '.' ||
					$pgc.quote_ident(tab_relname) from
					$clname.sl_table where tab_id = $tab"`
	slony_remember_name tabname "$tab" "$objname"
done

# ----
# Get a list of all replicated sequence IDs this subscriber receives,
# and remember the schema-qualified, quoted sequence names in
# seqname_<id>.
# ----
sequences=`psql -q -At -d "$dbname" -c \
		"select seq_id from $clname.sl_sequence, $clname.sl_set
				where seq_set = set_id
					and exists (select 1 from $clname.sl_subscribe
							where sub_set = set_id
								and sub_receiver = $nodeid)"`
# \$sequences is left unquoted on purpose: one word per sequence ID.
for seq in $sequences ; do
	objname=`psql -q -At -d "$dbname" -c \
			"select $pgc.quote_ident(seq_nspname) || '.' ||
					$pgc.quote_ident(seq_relname) from
					$clname.sl_sequence where seq_id = $seq"`
	slony_remember_name seqname "$seq" "$objname"
done
75
76
# ----
# emit_offline_objects
#
#	Write to stdout the SQL that sets up the Slony-I specific
#	objects in the database the dump is going to be loaded into.
#	The generated SQL opens a transaction and then creates:
#
#	  - the cluster schema ($clname)
#	  - table sl_archive_tracking
#	  - functions sequenceSetValue_offline, finishTableAfterCopy
#	    and archiveTracking_offline
#	  - table sl_log_archive together with the helper function
#	    slon_quote_brute, the trigger function log_apply and the
#	    trigger apply_trigger, which is enabled as a replica
#	    trigger
#
#	and finally sets session_replication_role to 'replica'.
#	The here-document is expanded by the shell: \$clname is
#	substituted, while \\\$ keeps a literal dollar sign in the SQL.
# ----
emit_offline_objects () {
cat <<_EOF_
start transaction;


-- ----------------------------------------------------------------------
-- SCHEMA $clname
-- ----------------------------------------------------------------------
create schema $clname;


-- ----------------------------------------------------------------------
-- TABLE sl_archive_tracking
-- ----------------------------------------------------------------------
create table $clname.sl_archive_tracking (
	at_counter			bigint,
	at_created			timestamp,
	at_applied			timestamp
);

-- -----------------------------------------------------------------------------
-- FUNCTION sequenceSetValue_offline (seq_id, last_value)
-- -----------------------------------------------------------------------------
create or replace function $clname.sequenceSetValue_offline(text,text, int8) returns int4
as '
declare
	p_seq_nsp			alias for \$1;
    p_seq_name          alias for \$2;
	p_last_value		alias for \$3;

begin


	-- ----
	-- Update it to the new value
	-- ----
	execute '' select setval(''''''|| p_seq_nsp || ''.'' || 
			p_seq_name || '''''', '''''' || p_last_value || '''''')'';
	return 0;
end;
' language plpgsql;
-- ---------------------------------------------------------------------------------------
-- FUNCTION finishTableAfterCopy(table_id)
-- ---------------------------------------------------------------------------------------
-- This can just be a simple stub function; it does not need to do anything...
-- ---------------------------------------------------------------------------------------
create or replace function $clname.finishTableAfterCopy(int4) returns int4 as
  'select 1'
language sql;

-- ---------------------------------------------------------------------------------------
-- FUNCTION archiveTracking_offline (new_counter, created_timestamp)
-- ---------------------------------------------------------------------------------------
create or replace function $clname.archiveTracking_offline(int8, timestamp) returns int8
as '
declare
	p_new_seq	alias for \$1;
	p_created	alias for \$2;
	v_exp_seq	int8;
	v_old_seq	int8;
begin
	select at_counter into v_old_seq from $clname.sl_archive_tracking;
	if not found then
		raise exception ''Slony-I: current archive tracking status not found'';
	end if;

	v_exp_seq := p_new_seq - 1;
	if v_old_seq <> v_exp_seq then
		raise exception ''Slony-I: node is on archive counter %, this archive log expects %'', 
			v_old_seq, v_exp_seq;
	end if;
	raise notice ''Slony-I: Process archive with counter % created %'', p_new_seq, p_created;

	update $clname.sl_archive_tracking
		set at_counter = p_new_seq,
			at_created = p_created,
			at_applied = CURRENT_TIMESTAMP;
	return p_new_seq;
end;
' language plpgsql;
create table $clname.sl_log_archive (
	log_origin			int4,
	log_txid			bigint,
	log_tableid			int4,
	log_actionseq		int8,
	log_tablenspname	text,
	log_tablerelname	text,
	log_cmdtype			char,
	log_cmdupdncols		int4,
	log_cmdargs			text[]
) WITHOUT OIDS;



create or replace function $clname.slon_quote_brute(p_tab_fqname text) returns text
as \$\$
declare	
    v_fqname text default '';
begin
    v_fqname := '"' || replace(p_tab_fqname,'"','""') || '"';
    return v_fqname;
end;
\$\$ language plpgsql immutable;


create or replace function $clname.log_apply() returns trigger
as \$\$
declare
	v_command	text = 'not implemented yet';
	v_list1		text = '';
	v_list2		text = '';
	v_comma		text = '';
	v_and		text = '';
	v_idx		integer = 1;
	v_nargs		integer;
	v_i			integer = 0;
begin
	v_nargs = array_upper(NEW.log_cmdargs, 1);

	if NEW.log_cmdtype = 'I' then
		while v_idx < v_nargs loop
			v_list1 = v_list1 || v_comma ||
				$clname.slon_quote_brute(NEW.log_cmdargs[v_idx]);
			v_idx = v_idx + 1;
			if NEW.log_cmdargs[v_idx] is null then
			   v_list2 = v_list2 || v_comma || 'null';
			else 
			     v_list2 = v_list2 || v_comma ||
			     	pg_catalog.quote_literal(NEW.log_cmdargs[v_idx]);			end if;
			v_idx = v_idx + 1;

			v_comma = ',';
		end loop;

		v_command = 'INSERT INTO ' || 
			$clname.slon_quote_brute(NEW.log_tablenspname) || '.' ||
			$clname.slon_quote_brute(NEW.log_tablerelname) || ' (' ||
			v_list1 || ') VALUES (' || v_list2 || ')';

		execute v_command;
	end if;
	if NEW.log_cmdtype = 'U' then
		v_command = 'UPDATE ONLY ' ||
			$clname.slon_quote_brute(NEW.log_tablenspname) || '.' ||
			$clname.slon_quote_brute(NEW.log_tablerelname) || ' SET ';
		while v_i < NEW.log_cmdupdncols loop		      	
			v_command = v_command || v_comma ||
				$clname.slon_quote_brute(NEW.log_cmdargs[v_idx]) || '=';
			v_idx = v_idx + 1;
			if NEW.log_cmdargs[v_idx] is null then
			   v_command = v_command || 'null';
			else 
			     v_command = v_command ||
				pg_catalog.quote_literal(NEW.log_cmdargs[v_idx]);
			end if;
			v_idx = v_idx + 1;
			v_comma = ',';
			v_i = v_i + 1;
		end loop;
		if NEW.log_cmdupdncols = 0 then
			v_command = v_command ||
				$clname.slon_quote_brute(NEW.log_cmdargs[1]) || '=' ||
				$clname.slon_quote_brute(NEW.log_cmdargs[1]);
		end if;
		v_command = v_command || ' WHERE ';
		while v_idx < v_nargs loop
			v_command = v_command || v_and ||
				$clname.slon_quote_brute(NEW.log_cmdargs[v_idx]) || '=';
			v_idx = v_idx + 1;
			if NEW.log_cmdargs[v_idx] is null then
			   v_command = v_command || 'null';
			else
				v_command = v_command ||
					  pg_catalog.quote_literal(NEW.log_cmdargs[v_idx]);
			end if;
			v_idx = v_idx + 1;

			v_and = ' AND ';
		end loop;
		execute v_command;
	end if;

	if NEW.log_cmdtype = 'D' then
		v_command = 'DELETE FROM ONLY ' ||
			$clname.slon_quote_brute(NEW.log_tablenspname) || '.' ||
			$clname.slon_quote_brute(NEW.log_tablerelname) || ' WHERE ';
		while v_idx < v_nargs loop
			v_command = v_command || v_and ||
				$clname.slon_quote_brute(NEW.log_cmdargs[v_idx]) || '=';
			v_idx = v_idx + 1;
			if NEW.log_cmdargs[v_idx] is null then
			   v_command = v_command || 'null';
			else
				v_command = v_command ||
					  pg_catalog.quote_literal(NEW.log_cmdargs[v_idx]);
			end if;
			v_idx = v_idx + 1;

			v_and = ' AND ';
		end loop;

		execute v_command;
	end if;
    if NEW.log_cmdtype = 'S' then
          execute 'set session_replication_role to local;';
          execute NEW.log_cmdargs[1];  
          execute 'set session_replication_role to replica;';

    end if;
	if NEW.log_cmdtype = 'T' then
		execute 'TRUNCATE TABLE ONLY ' ||
			$clname.slon_quote_brute(NEW.log_tablenspname) || '.' ||
			$clname.slon_quote_brute(NEW.log_tablerelname) || ' CASCADE';
	end if;

	return NULL;
end;
\$\$ language plpgsql;
create trigger apply_trigger
		before INSERT on $clname.sl_log_archive
		for each row execute procedure $clname.log_apply();
alter table $clname.sl_log_archive
	  enable replica trigger apply_trigger;
set session_replication_role='replica';

_EOF_
}

emit_offline_objects
306
307
# Unless -omit_copy was given, emit one "truncate ... cascade"
# statement per replicated table, using the names remembered in
# tabname_<id>.
case "$omit_copy" in
	1)
		;;
	*)
		for tab in $tables ; do
			eval "tabname=\$tabname_$tab"
			echo "truncate $tabname cascade;";
		done
		;;
esac
315
# ----
# The remainder of this script is written in a way that
# all output is generated by psql inside of one serializable
# transaction, so that we get a consistent snapshot of the
# replica.
#
# The subshell below only writes SQL text; that text is piped into a
# single psql session against the subscriber, and it is the *result*
# of those statements (printed unaligned, tuples only) that ends up
# on our stdout as part of the dump.
# ----

(
echo "start transaction;"
echo "set transaction isolation level serializable;"

# ----
# Provide initial values for all sequences.
# ----
for seq in $sequences ; do
	eval "seqname=\$seqname_$seq"
	# Split "schema.name" at the first dot. The value is quoted so it
	# is not subject to globbing, and -f2- keeps everything after the
	# first dot so a sequence name that itself contains a dot is not
	# cut short.
	schema=`printf '%s\n' "$seqname" | cut -d'.' -f1`
	name=`printf '%s\n' "$seqname" | cut -d'.' -f2-`
	printf '%s\n' "select E'select $clname.sequenceSetValue_offline(''$schema'',''$name'', ''' || last_value::text || E''');' from $seqname;"
done

# ----
# Fill the setsync tracking table with the current status
# ----
echo "select 'insert into $clname.sl_archive_tracking values (' ||
			ac_num::text || ', ''' || ac_timestamp::text ||
			''', CURRENT_TIMESTAMP);'
			from $clname.sl_archive_counter;";

# ----
# Now dump all the user table data
# ----
if [ "$omit_copy" != "1" ]; then
	for tab in $tables ; do
		eval "tabname=\$tabname_$tab"

		# Get fieldnames...
		# NOTE(review): copyfields() presumably returns the
		# parenthesized column list of the table; this lookup runs in
		# its own psql session, outside the snapshot transaction.
		fields=`psql -At -c "select $clname.copyfields($tab);" "$dbname"`
		printf '%s\n' "select 'copy $tabname $fields from stdin;';"
		printf '%s\n' "copy $tabname $fields to stdout;"
		# Makes psql print the \. line that terminates the copy data.
		printf "select E'\\\\\\\\.';"
	done
fi

# ----
# Commit the transaction here in the replica that provided us
# with the information.
# ----
echo "commit;"
) | psql -q -At -d "$dbname"

# The status of the pipeline is that of psql. If it could not do its
# job (for example the connection failed), stop here so that the
# incomplete dump is not finished off with a commit.
if test $? -ne 0 ; then
	echo "$0: psql failed while dumping database '$dbname'" >&2
	exit 1
fi
367
368
# ----
# Write the closing commit of the generated dump to stdout and
# report success.
# ----
printf '%s\n' "commit;"

exit 0
375
376