1# -*- perl -*-
2#
3# Author: Christopher Browne
4# Copyright 2004-2009 Afilias Canada
5
6use POSIX;
7use Errno;
8use File::Temp qw/ tempfile tempdir /;
9
# Registers one node of the cluster.
#
# Arguments are a flat list of key => value pairs (host, dbname, port,
# user, node, password, parent, noforward, sslmode, options, config) that
# override the defaults listed below.
#
# The node number is appended to @NODES, each setting that has a true
# value is stored in the matching per-node global array (@HOST, @DBNAME,
# @USER, @PORT, @PASSWORD, @SSLMODE, @PARENT, @NOFORWARD, @OPTIONS,
# @CONFIG) and the assembled connection string is stored in @DSN.
#
# Dies if no node number, host name or port number is available.
sub add_node {
  my %setting = (
    host      => undef,
    dbname    => 'template1',
    port      => 5432,
    user      => 'postgres',
    node      => undef,
    password  => undef,
    parent    => undef,
    noforward => undef,
    sslmode   => undef,
    options   => undef,
    config    => undef,
  );

  # Arguments are consumed pairwise; the first false key ends the list.
  while (my $key = shift @_) {
    $setting{$key} = shift @_;
  }

  my $node = $setting{'node'};
  die ("I need a node number") unless $node;
  push @NODES, $node;

  # The host is mandatory and always opens the connection string.
  my $host = $setting{'host'};
  die("I need a host name") unless $host;
  my $conninfo = "host=$host";
  $HOST[$node] = $host;

  my $dbname = $setting{'dbname'};
  if ($dbname) {
    $conninfo .= " dbname=$dbname";
    $DBNAME[$node] = $dbname;
  }

  # The user is appended unconditionally, even when it is empty.
  my $user = $setting{'user'};
  $conninfo .= " user=$user";
  $USER[$node] = $user;

  my $port = $setting{'port'};
  die ("I need a port number") unless $port;
  $conninfo .= " port=$port";
  $PORT[$node] = $port;

  my $password = $setting{'password'};
  if ($password) {
    $conninfo .= " password=$password";
    $PASSWORD[$node] = $password;
  }

  my $sslmode = $setting{'sslmode'};
  if ($sslmode) {
    $conninfo .= " sslmode=$sslmode";
    $SSLMODE[$node] = $sslmode;
  }

  $DSN[$node] = $conninfo;

  # The remaining settings are not part of the connection string; they are
  # only recorded when they carry a true value.
  my $parent = $setting{'parent'};
  $PARENT[$node] = $parent if $parent;

  my $noforward = $setting{'noforward'};
  $NOFORWARD[$node] = $noforward if $noforward;

  my $options = $setting{'options'};
  $OPTIONS[$node] = $options if $options;

  my $config = $setting{'config'};
  $CONFIG[$node] = $config if $config;
}
82
# Builds the preamble that starts a slonik script: the cluster name
# declaration followed by one "admin conninfo" line for every node in
# @NODES that has a connection string in @DSN.  Nodes without a
# connection string are left out.  Returns the preamble as one string.
sub genheader {
  my @lines = ("cluster name = $CLUSTER_NAME;\n");
  for my $id (@NODES) {
    my $conninfo = $DSN[$id];
    next unless $conninfo;
    push @lines, " node $id admin conninfo='$conninfo';\n";
  }
  return join('', @lines);
}
95
# Appends a copy of the given slonik script, under a timestamped banner,
# to $LOGDIR/slonik_scripts.log and then prints the script on standard
# output.  The script is neither executed nor deleted here.
#
# Logging is best effort: if the log file cannot be opened a warning is
# issued and the script is still printed.
sub run_slonik_script {
  my ($script) = @_;
  my $logfile = "$LOGDIR/slonik_scripts.log";
  my $now = `date`;
  chomp $now;

  # Three-argument open with a lexical handle: the file name can never be
  # mistaken for an open mode and the handle cannot clash with a global OUT.
  if (open(my $log, '>>', $logfile)) {
    print $log "# -------------------------------------------------------------\n";
    print $log "# Script: $script submitted at $now \n";
    print $log "# -------------------------------------------------------------\n";
    print $log $script;
    close($log) or warn "Could not close $logfile: $!\n";
  } else {
    warn "Could not append to $logfile: $!\n";
  }
  print $script;
}
110
# Returns the ps command line that produces a full, untruncated process
# listing on the current platform, chosen by the output of `uname`.
sub ps_args {
  my %ps_for = (
    Linux   => "/bin/ps auxww",
    FreeBSD => "/bin/ps -auxww",
    SunOS   => "/usr/ucb/ps -auxww",
    AIX     => "/usr/bin/ps auxww",
    Darwin  => "/bin/ps auxww",
  );

  my $platform = `uname`;
  chomp $platform;    # drop the trailing newline

  return $ps_for{$platform} if exists $ps_for{$platform};

  # This may be questionable for other systems; extend the table as needed!
  return "/usr/bin/ps -auxww";
}
127
# Looks up the process id recorded in the pid file of a node.
#
# $node is a node number, optionally prefixed with "node" (e.g. 3 or
# "node3").  The pid file is
# $PIDFILE_DIR/${PIDFILE_PREFIX}_node<N>.pid; $PIDFILE_DIR defaults to
# /var/run/slony1 and $PIDFILE_PREFIX to $CLUSTER_NAME (both globals are
# set as a side effect when they are empty).
#
# Returns:
#   ''     if $node is not a valid node name or the pid file cannot be opened
#   0      if the pid file holds no usable pid or no such process exists
#   <pid>  otherwise (the last line of the pid file)
sub get_pid {
  my ($node) = @_;

  $PIDFILE_DIR ||= '/var/run/slony1';
  $PIDFILE_PREFIX ||= $CLUSTER_NAME;

  # $1 is only meaningful after a successful match; after a failed one it
  # would still hold a capture from some earlier, unrelated match.
  return '' unless defined $node and $node =~ /^(?:node)?(\d+)$/;
  my $nodenum = $1;

  my $pidfile = "$PIDFILE_DIR/$PIDFILE_PREFIX" . "_node$nodenum.pid";

  open my $in, '<' , $pidfile or return '';

  # The last line of the file wins.
  my $pid;
  while (my $line = <$in>) {
    $pid = $line;
  }
  close $in;

  # Insist on a positive integer: kill() treats 0 and negative numbers as
  # process groups, which is not what is being asked here.
  return 0 unless defined $pid and $pid =~ /^\s*(\d+)\s*$/ and $1 > 0;
  $pid = $1;

  # Signal 0 only probes for existence.  $! is meaningful only when kill
  # reports failure, so it is consulted in that case alone.  Any other
  # failure (e.g. EPERM) still means that the process exists.
  if (!kill(0, $pid) and $! == Errno::ESRCH()) {
    return 0;
  }

  return $pid;
}
159
# Starts a slon daemon for the given node number in the background.
#
# The per-node settings are read from @DSN, @DBNAME, @OPTIONS and @CONFIG.
# When a config file is registered for the node, slon is started with
# "-f <config>"; otherwise the sync interval, debug level, extra options,
# cluster name and connection string are passed on the command line.
#
# Output goes to $LOGDIR/node<N>/<dbname>-<suffix>.log, where the suffix
# is $LOG_NAME_SUFFIX expanded by strftime; if $APACHE_ROTATOR is set the
# output is piped through that program instead.  The log directory is
# created if necessary.  Empty globals ($SYNC_CHECK_INTERVAL, $DEBUGLEVEL,
# $LOG_NAME_SUFFIX, $PIDFILE_DIR, $PIDFILE_PREFIX) receive defaults as a
# side effect.
#
# The function prints the command, launches it and then sleeps for three
# seconds so that the daemon can write its pid file.
sub start_slon {
  my ($nodenum) = @_;
  my ($dsn, $dbname, $opts, $config) = ($DSN[$nodenum], $DBNAME[$nodenum], $OPTIONS[$nodenum], $CONFIG[$nodenum]);
  $SYNC_CHECK_INTERVAL ||= 1000;
  $DEBUGLEVEL ||= 0;
  $LOG_NAME_SUFFIX ||= '%Y-%m-%d';
  $PIDFILE_DIR ||= '/var/run/slony1';
  $PIDFILE_PREFIX ||= $CLUSTER_NAME;

  # system("mkdir -p $PIDFILE_DIR" );
  system("mkdir -p $LOGDIR/node$nodenum");

  # The parentheses matter: "my $cmd,$pidfile;" would declare only $cmd
  # and leave $pidfile as a package global.
  my ($cmd, $pidfile);

  $pidfile = "$PIDFILE_DIR/$PIDFILE_PREFIX" . "_node$nodenum.pid";

  if ($config) {
     $cmd = "@@SLONBINDIR@@/slon -p $pidfile -f $config ";
  } else {
     $cmd = "@@SLONBINDIR@@/slon -p $pidfile -s $SYNC_CHECK_INTERVAL -d$DEBUGLEVEL $opts $CLUSTER_NAME '$dsn' ";
  }
  my $logfilesuffix = POSIX::strftime( "$LOG_NAME_SUFFIX",localtime );
  chomp $logfilesuffix;   # in case the configured format ends in a newline

  # The command is run through the shell on purpose: it needs the
  # redirection or pipe, and the trailing "&" to go into the background.
  if ($APACHE_ROTATOR) {
    $cmd .= "2>&1 | $APACHE_ROTATOR \"$LOGDIR/node$nodenum/" . $dbname . "-$logfilesuffix.log\" 10M &";
  } else {
    $cmd .= "> $LOGDIR/node$nodenum/$dbname-$logfilesuffix.log 2>&1 &";
  }
  print "Invoke slon for node $nodenum - $cmd\n";
  system ($cmd);
  # give time to slon daemon start and create pid file
  sleep 3;
}
194
195
$killafter="00:20:00";  # Restart slon after this interval, if there is no activity

# Asks the database of the given node whether replication events have
# been confirmed within the last $killafter interval.
#
# The connection parameters come from @PORT, @HOST, @DBNAME, @USER and
# @PASSWORD.  When a password is registered it is handed to psql through
# a temporary PGPASSFILE that is removed again afterwards.
#
# Returns the tuples-only output of psql with the trailing newline
# removed; an empty string if psql could not be started.
sub query_slony_status {
  my ($nodenum) = @_;

# Old query - basically looked at how far we are behind
#   my $query = qq{
#   select now() - ev_timestamp > '$killafter'::interval as event_old, now() - ev_timestamp as age,
#        ev_timestamp, ev_seqno, ev_origin as origin
# from _$CLUSTER_NAME.sl_event events, _$CLUSTER_NAME.sl_subscribe slony_master
#   where
#      events.ev_origin = slony_master.sub_provider and
#      not exists (select * from _$CLUSTER_NAME.sl_subscribe providers
#                   where providers.sub_receiver = slony_master.sub_provider and
#                         providers.sub_set = slony_master.sub_set and
#                         slony_master.sub_active = 't' and
#                         providers.sub_active = 't')
# order by ev_origin desc, ev_seqno desc limit 1;
# };

# New query: Looks to see if an event has been confirmed, for the set,
# for the master node, within the interval requested

  my $query = qq{
select * from
(select now() - con_timestamp < '$killafter'::interval, now() - con_timestamp as age,
       con_timestamp
from "_$CLUSTER_NAME".sl_confirm c, "_$CLUSTER_NAME".sl_subscribe slony_master
  where c.con_origin = slony_master.sub_provider and
             not exists (select * from "_$CLUSTER_NAME".sl_subscribe providers
                  where providers.sub_receiver = slony_master.sub_provider and
                        providers.sub_set = slony_master.sub_set and
                        slony_master.sub_active = 't' and
                        providers.sub_active = 't') and
        c.con_received = "_$CLUSTER_NAME".getLocalNodeId('_$CLUSTER_NAME') and
        now() - con_timestamp < '$killafter'::interval
limit 1) as slave_confirmed_events
union all (select
now() - con_timestamp < '$killafter'::interval, now() - con_timestamp as age,
       con_timestamp
from "_$CLUSTER_NAME".sl_confirm c, "_$CLUSTER_NAME".sl_subscribe slony_master
  where c.con_origin = "_$CLUSTER_NAME".getLocalNodeId('_$CLUSTER_NAME') and
             exists (select * from "_$CLUSTER_NAME".sl_subscribe providers
                  where providers.sub_provider = "_$CLUSTER_NAME".getLocalNodeId('_$CLUSTER_NAME') and
                        slony_master.sub_active = 't') and
        now() - con_timestamp < '$killafter'::interval
limit 1)
;
  };
  my ($port, $host, $dbname, $dbuser, $passwd)= ($PORT[$nodenum], $HOST[$nodenum], $DBNAME[$nodenum], $USER[$nodenum], $PASSWORD[$nodenum]);

  # psql is started without a shell (list-form pipe open).  Passing the
  # query inside a shell "..." string would let the shell strip the double
  # quotes around "_$CLUSTER_NAME", so the schema name would reach
  # PostgreSQL unquoted and be case-folded.
  my @psql_cmd = ("@@PGBINDIR@@/psql", '-p', $port, '-h', $host, '-U', $dbuser,
                  '-c', $query, '--tuples-only', $dbname);
  my $run_psql = sub {
    my $psql;
    unless (open($psql, '-|', @psql_cmd)) {
      warn "Could not run $psql_cmd[0]: $!\n";
      return '';
    }
    local $/;                 # slurp the complete output
    my $output = <$psql>;
    close $psql;              # leaves the exit status of psql in $?
    return defined $output ? $output : '';
  };

  my $result;
  if ($passwd) {
     my ($fh, $filename) = tempfile();
     chmod( 0600, $filename);
     # In a password file, ':' and '\' inside a field must be escaped
     # with a backslash.
     my @fields = map {
       my $field = defined $_ ? $_ : '';
       $field =~ s/([:\\])/\\$1/g;
       $field;
     } ($host, $port, $dbname, $dbuser, $passwd);
     print $fh join(':', @fields), "\n";
     close $fh;
     {
       # PGPASSFILE is visible to this one psql run only.
       local $ENV{PGPASSFILE} = $filename;
       $result = $run_psql->();
     }
     unlink $filename;
  } else {
     $result = $run_psql->();
  }
  chomp $result;
  #print "Query was: $query\n";
  #print "Result was: $result\n";
  return $result;
}
261
# This is a horrible function name, but it really *is* what it should
# be called.
#
# Resolves a set given by name (a key of %$SLONY_SETS) or by number
# ("3" or "set3") and loads its definition into the globals $SET_NAME,
# $SET_ORIGIN, $TABLE_ID, $SEQUENCE_ID, @PKEYEDTABLES, %KEYEDTABLES,
# @SEQUENCES and $FOLD_CASE.
#
# Returns the numeric set id, or 0 if the argument does not identify a
# set.  When $TABLE_ID is already set the configuration is not consulted
# at all; the argument is merely reduced to its number.
#
# Dies if the configuration defines no sets, or if the selected set
# still lists "serialtables".
sub get_set {
    my $set = shift();
    my $match;
    my $name;

    $set = '' unless defined $set;

    # If the variables are already set through $ENV{SLONYSET}, just
    # make sure we have an integer for the set id
    if ($TABLE_ID) {
        return 0 unless $set =~ /^(?:set)?(\d+)$/;
        return $1;
    }

    # Die if we don't have any sets defined in the configuration file.
    unless (defined $SLONY_SETS
            and ref($SLONY_SETS) eq "HASH"
            and keys %{$SLONY_SETS}) {
        die "There are no sets defined in your configuration file.";
    }

    # Is this a set name or number?
    if ($SLONY_SETS->{$set}) {
        $match = $SLONY_SETS->{$set};
        $name  = $set;
    }
    elsif ($set =~ /^(?:set)?(\d+)$/) {
        my $id = $1;
        ($name) = grep {
            defined $SLONY_SETS->{$_}->{"set_id"}
                and $SLONY_SETS->{$_}->{"set_id"} == $id
        } sort keys %{$SLONY_SETS};

        # An unknown set number must not wipe the variables of a set
        # that was loaded earlier.
        return 0 unless defined $name;
        $match = $SLONY_SETS->{$name};
    }
    else {
        return 0;
    }

    # Set the variables for this set.  The table and sequence lists are
    # optional in the configuration; a missing one counts as empty.
    $SET_NAME     = $name;
    $SET_ORIGIN   = ($match->{"origin"} or $MASTERNODE);
    $TABLE_ID     = $match->{"table_id"};
    $SEQUENCE_ID  = $match->{"sequence_id"};
    @PKEYEDTABLES = @{ $match->{"pkeyedtables"} || [] };
    %KEYEDTABLES  = %{ $match->{"keyedtables"}  || {} };
    @SEQUENCES    = @{ $match->{"sequences"}    || [] };
    $FOLD_CASE    = ($match->{"foldCase"} or 0);

    if (defined($match->{"serialtables"}) &&
        scalar(@{$match->{"serialtables"}}) > 0 ) {
        # slony generated primary keys have
        # been deprecated.
        die "primary keys generated by slony (serialtables) are no longer "
            . "supported by slony-I. Please remove serialtables "
            . "from your config file";
    }
    return $match->{"set_id"};
}
318
# This function checks to see if there is a still-in-progress subscription
# It does so by looking to see if there is a SUBSCRIBE_SET event corresponding
# to a sl_subscribe entry that is not yet active.
#
# The connection parameters of the node come from @PORT, @HOST, @DBNAME,
# @USER and @PASSWORD.  When a password is registered it is handed to
# psql through a temporary PGPASSFILE that is removed again afterwards.
#
# Returns the tuples-only output of psql with the trailing newline
# removed; an empty string if psql could not be started.
sub node_is_subscribing {
  my ($nodenum) = @_;
  my $query = qq{
select * from "_$CLUSTER_NAME".sl_event e, "_$CLUSTER_NAME".sl_subscribe s
where ev_origin = "_$CLUSTER_NAME".getlocalnodeid('_$CLUSTER_NAME') and  -- Event on local node
      ev_type = 'SUBSCRIBE_SET' and                            -- Event is SUBSCRIBE SET
      --- Then, match criteria against sl_subscribe
      sub_set::text = ev_data1 and sub_provider::text = ev_data2 and sub_receiver::text = ev_data3 and
      (case sub_forward when 'f' then 'f'::text when 't' then 't'::text end) = ev_data4
      --- And we're looking for a subscription that is not yet active
      and not sub_active
limit 1;   --- One such entry is sufficient...
};
  my ($port, $host, $dbname, $dbuser, $passwd)= ($PORT[$nodenum], $HOST[$nodenum], $DBNAME[$nodenum], $USER[$nodenum], $PASSWORD[$nodenum]);

  # psql is started without a shell (list-form pipe open).  Passing the
  # query inside a shell "..." string would let the shell strip the double
  # quotes around "_$CLUSTER_NAME", so the schema name would reach
  # PostgreSQL unquoted and be case-folded.
  my @psql_cmd = ("@@PGBINDIR@@/psql", '-p', $port, '-h', $host, '-U', $dbuser,
                  '-c', $query, '--tuples-only', $dbname);
  my $run_psql = sub {
    my $psql;
    unless (open($psql, '-|', @psql_cmd)) {
      warn "Could not run $psql_cmd[0]: $!\n";
      return '';
    }
    local $/;                 # slurp the complete output
    my $output = <$psql>;
    close $psql;              # leaves the exit status of psql in $?
    return defined $output ? $output : '';
  };

  my $result;
  if ($passwd) {
     my ($fh, $filename) = tempfile();
     chmod(0600,$filename);
     # In a password file, ':' and '\' inside a field must be escaped
     # with a backslash.
     my @fields = map {
       my $field = defined $_ ? $_ : '';
       $field =~ s/([:\\])/\\$1/g;
       $field;
     } ($host, $port, $dbname, $dbuser, $passwd);
     print $fh join(':', @fields), "\n";
     close $fh;
     {
       # PGPASSFILE is visible to this one psql run only.
       local $ENV{PGPASSFILE} = $filename;
       $result = $run_psql->();
     }
     unlink $filename;
  } else {
     $result = $run_psql->();
  }
  chomp $result;
  #print "Query was: $query\n";
  #print "Result was: $result\n";
  return $result;
}
352
3531;
354