1#!/bin/sh 2# ---------- 3# slony1_dump.sh 4# 5# 6# 7# This script creates a special data only dump from a subscriber 8# node. The stdout of this script, fed into psql for a database that 9# has the user schema of the replicated database installed, will 10# prepare that database for log archive application. 11# ---------- 12 13# ---- 14# Check for correct usage 15# ---- 16if test $# -lt 2 ; then 17 echo "usage: $0 subscriber-dbname clustername [-omit_copy ]" >&2 18 exit 1 19fi 20 21 22# ---- 23# Remember call arguments and get the nodeId of the DB specified 24# ---- 25dbname=$1 26cluster=$2 27clname="\"_$2\"" 28omit_copy=0 29if test $# -eq 3; then 30 if [ "$3" = "-omit_copy" ]; 31 then 32 omit_copy=1 33 else 34 echo "usage: $0 subscriber-dbname clustername [-omit_copy ]" >&2 35 exit 1 36 fi 37fi 38 39pgc="\"pg_catalog\"" 40nodeid=`psql -q -At -c "select \"_$cluster\".getLocalNodeId('_$cluster')" $dbname` 41 42# ---- 43# Get a list of all replicated table ID's this subscriber receives, 44# and remember the table names. 45# ---- 46tables=`psql -q -At -d $dbname -c \ 47 "select tab_id from $clname.sl_table, $clname.sl_set 48 where tab_set = set_id 49 and exists (select 1 from $clname.sl_subscribe 50 where sub_set = set_id 51 and sub_receiver = $nodeid)"` 52for tab in $tables ; do 53 eval tabname_$tab=`psql -q -At -d $dbname -c \ 54 "select $pgc.quote_ident(tab_nspname) || '.' || 55 $pgc.quote_ident(tab_relname) from 56 $clname.sl_table where tab_id = $tab"` 57done 58 59# ---- 60# Get a list of all replicated sequence ID's this subscriber receives, 61# and remember the sequence names. 62# ---- 63sequences=`psql -q -At -d $dbname -c \ 64 "select seq_id from $clname.sl_sequence, $clname.sl_set 65 where seq_set = set_id 66 and exists (select 1 from $clname.sl_subscribe 67 where sub_set = set_id 68 and sub_receiver = $nodeid)"` 69for seq in $sequences ; do 70 eval seqname_$seq=`psql -q -At -d $dbname -c \ 71 "select $pgc.quote_ident(seq_nspname) || '.' || 72 $pgc.quote_ident(seq_relname) from 73 $clname.sl_sequence where seq_id = $seq"` 74done 75 76 77# ---- 78# Emit SQL code to create the slony specific object required 79# in the remote database. 80# ---- 81cat <<_EOF_ 82start transaction; 83 84 85-- ---------------------------------------------------------------------- 86-- SCHEMA $clname 87-- ---------------------------------------------------------------------- 88create schema $clname; 89 90 91-- ---------------------------------------------------------------------- 92-- TABLE sl_archive_tracking 93-- ---------------------------------------------------------------------- 94create table $clname.sl_archive_tracking ( 95 at_counter bigint, 96 at_created timestamp, 97 at_applied timestamp 98); 99 100-- ----------------------------------------------------------------------------- 101-- FUNCTION sequenceSetValue_offline (seq_id, last_value) 102-- ----------------------------------------------------------------------------- 103create or replace function $clname.sequenceSetValue_offline(text,text, int8) returns int4 104as ' 105declare 106 p_seq_nsp alias for \$1; 107 p_seq_name alias for \$2; 108 p_last_value alias for \$3; 109 110begin 111 112 113 -- ---- 114 -- Update it to the new value 115 -- ---- 116 execute '' select setval(''''''|| p_seq_nsp || ''.'' || 117 p_seq_name || '''''', '''''' || p_last_value || '''''')''; 118 return 0; 119end; 120' language plpgsql; 121-- --------------------------------------------------------------------------------------- 122-- FUNCTION finishTableAfterCopy(table_id) 123-- --------------------------------------------------------------------------------------- 124-- This can just be a simple stub function; it does not need to do anything... 125-- --------------------------------------------------------------------------------------- 126create or replace function $clname.finishTableAfterCopy(int4) returns int4 as 127 'select 1' 128language sql; 129 130-- --------------------------------------------------------------------------------------- 131-- FUNCTION archiveTracking_offline (new_counter, created_timestamp) 132-- --------------------------------------------------------------------------------------- 133create or replace function $clname.archiveTracking_offline(int8, timestamp) returns int8 134as ' 135declare 136 p_new_seq alias for \$1; 137 p_created alias for \$2; 138 v_exp_seq int8; 139 v_old_seq int8; 140begin 141 select at_counter into v_old_seq from $clname.sl_archive_tracking; 142 if not found then 143 raise exception ''Slony-I: current archive tracking status not found''; 144 end if; 145 146 v_exp_seq := p_new_seq - 1; 147 if v_old_seq <> v_exp_seq then 148 raise exception ''Slony-I: node is on archive counter %, this archive log expects %'', 149 v_old_seq, v_exp_seq; 150 end if; 151 raise notice ''Slony-I: Process archive with counter % created %'', p_new_seq, p_created; 152 153 update $clname.sl_archive_tracking 154 set at_counter = p_new_seq, 155 at_created = p_created, 156 at_applied = CURRENT_TIMESTAMP; 157 return p_new_seq; 158end; 159' language plpgsql; 160create table $clname.sl_log_archive ( 161 log_origin int4, 162 log_txid bigint, 163 log_tableid int4, 164 log_actionseq int8, 165 log_tablenspname text, 166 log_tablerelname text, 167 log_cmdtype char, 168 log_cmdupdncols int4, 169 log_cmdargs text[] 170) WITHOUT OIDS; 171 172 173 174create or replace function $clname.slon_quote_brute(p_tab_fqname text) returns text 175as \$\$ 176declare 177 v_fqname text default ''; 178begin 179 v_fqname := '"' || replace(p_tab_fqname,'"','""') || '"'; 180 return v_fqname; 181end; 182\$\$ language plpgsql immutable; 183 184 185create or replace function $clname.log_apply() returns trigger 186as \$\$ 187declare 188 v_command text = 'not implemented yet'; 189 v_list1 text = ''; 190 v_list2 text = ''; 191 v_comma text = ''; 192 v_and text = ''; 193 v_idx integer = 1; 194 v_nargs integer; 195 v_i integer = 0; 196begin 197 v_nargs = array_upper(NEW.log_cmdargs, 1); 198 199 if NEW.log_cmdtype = 'I' then 200 while v_idx < v_nargs loop 201 v_list1 = v_list1 || v_comma || 202 $clname.slon_quote_brute(NEW.log_cmdargs[v_idx]); 203 v_idx = v_idx + 1; 204 if NEW.log_cmdargs[v_idx] is null then 205 v_list2 = v_list2 || v_comma || 'null'; 206 else 207 v_list2 = v_list2 || v_comma || 208 pg_catalog.quote_literal(NEW.log_cmdargs[v_idx]); end if; 209 v_idx = v_idx + 1; 210 211 v_comma = ','; 212 end loop; 213 214 v_command = 'INSERT INTO ' || 215 $clname.slon_quote_brute(NEW.log_tablenspname) || '.' || 216 $clname.slon_quote_brute(NEW.log_tablerelname) || ' (' || 217 v_list1 || ') VALUES (' || v_list2 || ')'; 218 219 execute v_command; 220 end if; 221 if NEW.log_cmdtype = 'U' then 222 v_command = 'UPDATE ONLY ' || 223 $clname.slon_quote_brute(NEW.log_tablenspname) || '.' || 224 $clname.slon_quote_brute(NEW.log_tablerelname) || ' SET '; 225 while v_i < NEW.log_cmdupdncols loop 226 v_command = v_command || v_comma || 227 $clname.slon_quote_brute(NEW.log_cmdargs[v_idx]) || '='; 228 v_idx = v_idx + 1; 229 if NEW.log_cmdargs[v_idx] is null then 230 v_command = v_command || 'null'; 231 else 232 v_command = v_command || 233 pg_catalog.quote_literal(NEW.log_cmdargs[v_idx]); 234 end if; 235 v_idx = v_idx + 1; 236 v_comma = ','; 237 v_i = v_i + 1; 238 end loop; 239 if NEW.log_cmdupdncols = 0 then 240 v_command = v_command || 241 $clname.slon_quote_brute(NEW.log_cmdargs[1]) || '=' || 242 $clname.slon_quote_brute(NEW.log_cmdargs[1]); 243 end if; 244 v_command = v_command || ' WHERE '; 245 while v_idx < v_nargs loop 246 v_command = v_command || v_and || 247 $clname.slon_quote_brute(NEW.log_cmdargs[v_idx]) || '='; 248 v_idx = v_idx + 1; 249 if NEW.log_cmdargs[v_idx] is null then 250 v_command = v_command || 'null'; 251 else 252 v_command = v_command || 253 pg_catalog.quote_literal(NEW.log_cmdargs[v_idx]); 254 end if; 255 v_idx = v_idx + 1; 256 257 v_and = ' AND '; 258 end loop; 259 execute v_command; 260 end if; 261 262 if NEW.log_cmdtype = 'D' then 263 v_command = 'DELETE FROM ONLY ' || 264 $clname.slon_quote_brute(NEW.log_tablenspname) || '.' || 265 $clname.slon_quote_brute(NEW.log_tablerelname) || ' WHERE '; 266 while v_idx < v_nargs loop 267 v_command = v_command || v_and || 268 $clname.slon_quote_brute(NEW.log_cmdargs[v_idx]) || '='; 269 v_idx = v_idx + 1; 270 if NEW.log_cmdargs[v_idx] is null then 271 v_command = v_command || 'null'; 272 else 273 v_command = v_command || 274 pg_catalog.quote_literal(NEW.log_cmdargs[v_idx]); 275 end if; 276 v_idx = v_idx + 1; 277 278 v_and = ' AND '; 279 end loop; 280 281 execute v_command; 282 end if; 283 if NEW.log_cmdtype = 'S' then 284 execute 'set session_replication_role to local;'; 285 execute NEW.log_cmdargs[1]; 286 execute 'set session_replication_role to replica;'; 287 288 end if; 289 if NEW.log_cmdtype = 'T' then 290 execute 'TRUNCATE TABLE ONLY ' || 291 $clname.slon_quote_brute(NEW.log_tablenspname) || '.' || 292 $clname.slon_quote_brute(NEW.log_tablerelname) || ' CASCADE'; 293 end if; 294 295 return NULL; 296end; 297\$\$ language plpgsql; 298create trigger apply_trigger 299 before INSERT on $clname.sl_log_archive 300 for each row execute procedure $clname.log_apply(); 301alter table $clname.sl_log_archive 302 enable replica trigger apply_trigger; 303set session_replication_role='replica'; 304 305_EOF_ 306 307 308if [ "$omit_copy" != "1" ]; then 309 for tab in $tables 310 do 311 eval tabname=\$tabname_$tab 312 echo "truncate $tabname cascade;"; 313 done 314fi 315 316# ---- 317# The remainder of this script is written in a way that 318# all output is generated by psql inside of one serializable 319# transaction, so that we get a consistent snapshot of the 320# replica. 321# ---- 322 323( 324echo "start transaction;" 325echo "set transaction isolation level serializable;" 326 327# ---- 328# Provide initial values for all sequences. 329# ---- 330for seq in $sequences ; do 331 eval seqname=\$seqname_$seq 332 schema=`echo $seqname|cut -d'.' -f1` 333 name=`echo $seqname|cut -d'.' -f2` 334 echo "select E'select $clname.sequenceSetValue_offline(''$schema'',''$name'', ''' || last_value::text || E''');' from $seqname;" 335done 336 337# ---- 338# Fill the setsync tracking table with the current status 339# ---- 340echo "select 'insert into $clname.sl_archive_tracking values (' || 341 ac_num::text || ', ''' || ac_timestamp::text || 342 ''', CURRENT_TIMESTAMP);' 343 from $clname.sl_archive_counter;"; 344 345# ---- 346# Now dump all the user table data 347# ---- 348if [ "$omit_copy" -eq "0" ]; then 349 system_type=`uname` 350 for tab in $tables ; do 351 eval tabname=\$tabname_$tab 352 353 # Get fieldnames... 354 fields=`psql -At -c "select $clname.copyfields($tab);" $dbname` 355 echo "select 'copy $tabname $fields from stdin;';" 356 echo "copy $tabname $fields to stdout;" 357 printf "select E'\\\\\\\\.';" 358 done 359fi 360 361# ---- 362# Commit the transaction here in the replica that provided us 363# with the information. 364# ---- 365echo "commit;" 366) | psql -q -At -d $dbname 367 368 369# ---- 370# Emit the commit for the dump to stdout. 371# ---- 372echo "commit;" 373 374exit 0 375 376