1#!/usr/local/bin/perl -w
2# =============================================================================
3# CD-HIT
4# http://cd-hit.org/
5# http://bioinformatics.burnham.org/cd-hi
6#
7# program written by
8#                                      Weizhong Li
9#                                      UCSD, San Diego Supercomputer Center
10#                                      La Jolla, CA, 92093
11#                                      Email liwz@sdsc.edu
12# =============================================================================
13
14use strict;
15no strict "refs";
16
17my $script_name = $0;
18my $script_dir = $0;
19   $script_dir =~ s/[^\/]+$//;
20   $script_dir = "./" unless ($script_dir);
21   chop($script_dir);
22
23my $arg;
24my $in;
25my $indb2;
26my $out;
27my $arg_pass          = "";
28my $para              = "";
29my $host_no           = 0;
30my @hosts             = ();
31my $cd_hit_div_exe    = "$script_dir/cd-hit-div";
32my $cd_hit_div_pl     = "$script_dir/cd-hit-div.pl";
33my $cd_hit_2d_exe     = "$script_dir/cd-hit-2d";
34my $cd_hit_est_2d_exe = "$script_dir/cd-hit-est-2d";
35my $clstr_merge_exe   = "$script_dir/clstr_merge.pl";
36my $seg_no            = 2;
37my $seg_no2           = 8;
38my $restart_in        = "";
39my $queue             = 0;
40my $local_cpu         = 0;
41my $queue_type        = "PBS";
42my $prog              = "cd-hit-2d";
43my %stable_files = ();
44
45while ($arg=shift) {
46  if    ($arg eq "-h" )  { print_usage();}
47  elsif ($arg eq "-i" )  { $in         = shift; }
48  elsif ($arg eq "-i2")  { $indb2      = shift; }
49  elsif ($arg eq "-o" )  { $out        = shift; }
50  elsif ($arg eq "--B")  { $para       = shift; }
51  elsif ($arg eq "--L")  { $local_cpu  = shift; }
52  elsif ($arg eq "--P")  { $prog       = shift; }
53  elsif ($arg eq "--S")  { $seg_no     = shift; }
54  elsif ($arg eq "--S2") { $seg_no2    = shift; }
55  elsif ($arg eq "--Q" ) { $queue      = shift; }
56  elsif ($arg eq "--T" ) { $queue_type = shift; }
57  elsif ($arg eq "--R" ) { $restart_in = shift; }
58  else  {$arg_pass         .= " $arg "; }
59}
60($in and $out) || print_usage();
61if (not ($seg_no2 >1)) {
62  die "You are not dividing 2nd input db, parallel mode won't help";
63}
64
65if ($prog eq "cd-hit-est") {
66  $cd_hit_2d_exe     = $cd_hit_est_2d_exe;
67}
68
69my $pwd            = `pwd`; chop($pwd);
70my $work_dir       = "$out.cd-hit-para-tmp";
71my $restart_file   = "$out.restart";
72my $indiv          = "$work_dir/$in.div";
73my $indiv2         = "$work_dir/$indb2.div";
74my @commands       = ();
75my @command_status = ();
76my $command_no     = 0;
77my $cmd;
78my ($i, $j, $k, $i1, $j1, $k1);
79
80# readin a list of hosts
81if ($para) {
82  open(PARA, "$para") || die "can not open $para";
83  while(my $ll= <PARA>){
84    chop($ll); $ll =~ s/\s//g;
85    next unless ($ll);
86    push(@hosts, $ll); $host_no++;
87  }
88  close(PARA);
89}
90if ($queue) {
91  for ($i=0; $i<$queue; $i++) {
92    push(@hosts, "queue_host.$i");
93  }
94  $host_no = $queue;
95}
96if ($local_cpu) {
97  for ($i=0; $i<$local_cpu; $i++) {
98    push(@hosts, "localhost.$i");
99  }
100  $host_no = $local_cpu;
101}
102
103die "no host" unless $host_no;
104if ($host_no > $seg_no2) {
105  print "$indb2 was divided into $seg_no2 pieces, ";
106  print "Your number of hosts ($host_no) is more than $seg_no, ";
107  print "So, not all of hosts will be used\n";
108}
109
110if (-e $restart_in) {
111  read_restart();
112}
113else {
114  $cmd = `mkdir $work_dir`;
115  assign_commands();
116  write_restart();
117}
118
119
120#dbdiv run on master node?
121if ($command_status[0] eq "wait") {
122  $cmd = `$commands[0]`;
123  $command_status[0] = "done";
124  write_restart();
125
126  for ($i=0; $i<$seg_no; $i++) {
127    my $idb    = "$indiv-$i";
128    $stable_files{$idb} = 1;
129  }
130}
131
132if ($command_status[1] eq "wait") {
133  $cmd = `$commands[1]`;
134  $command_status[1] = "done";
135  write_restart();
136
137  for ($i=0; $i<$seg_no2; $i++) {
138    my $idb    = "$indiv2-$i";
139    $stable_files{$idb} = 1;
140  }
141}
142
143
144
145#main runing loop
146my $sleep_time = 1;
147while(1) {
148  #refresh job status by checking output files
149  #check whether all jobs are done or not
150  my $finish_flag = 1;
151  my $status_change = 0;
152  for ($i=2; $i<$command_no; $i++) {
153    next if ($command_status[$i] eq "done");
154    $finish_flag = 0;
155    my $tcmd = $commands[$i];
156    my $output = "";
157    if ($tcmd =~ / -o\s+(\S+)/) {
158      $output = $1;
159      if ((-s $output) or (-s "$output.clstr")) {
160        $command_status[$i] = "done";
161        $status_change = 1;
162      }
163    }
164  }
165  if ($status_change) {
166    write_restart();
167  }
168  else {
169    sleep($sleep_time); print ".";
170  }
171  last if $finish_flag;
172
173  my $job_sent = 0;
174  for ($i=2; $i<$command_no; $i++) {
175    next if ($command_status[$i] eq "done");
176    next if ($command_status[$i] eq "run");
177    my $tcmd = $commands[$i];
178    my $in1 = "";
179    my $in2 = "";
180    my $out_done = "";
181    if ($tcmd =~ / -i\s+(\S+)/ ) {$in1 = $1;}
182    if ($tcmd =~ / -i2\s+(\S+)/) {$in2 = $1;}
183    if ($tcmd =~ / -o\s+(\S+)/ ) {$out_done = "$1.done";}
184    my $input_flag = 0;
185
186    if (($in1 =~ /\S/) and ($in2 =~ /\S/)) {
187      $input_flag = 1 if ((-s $in1) and (-s $in2));
188    }
189    elsif ($in1 =~ /\S/) {
190      $input_flag = 1 if (-s $in1);
191    }
192    else {
193      die "Error at $tcmd\n";
194    }
195    next unless $input_flag;
196
197    #now input files are ready, wait
198    wait_stable_file($in1);
199    wait_stable_file($in2);
200
201    my $thost_idx = wait_for_available_host();
202    my $thost     = $hosts[$thost_idx];
203    my $tsh   = "$work_dir/$out.$$.$thost_idx.sh";
204    my $tlock = "$work_dir/$out.$$.$thost_idx.lock";
205    my $trm   = "";
206       $trm   = "rm -f $in2" if ($in2 =~ /\S/);
207    open(TSH, "> $tsh") || die;
208    $cmd = `date > $tlock`;
209    print TSH <<EOD;
210date > $tlock
211$tcmd
212$trm
213rm -f $tlock
214date > $out_done
215EOD
216    close(TSH);
217    if ($local_cpu) {
218      $cmd = `sh $tsh  >/dev/null 2>&1 &`;
219      $command_status[$i] = "run";
220      print "run at $thost $tcmd\n";
221    }
222    elsif ($queue and ($queue_type eq "PBS")) {
223      my $t = "para-$thost_idx";
224      open(QUEUE,"| qsub -N $t -o $t.log -e $t.err");
225      print QUEUE "cd $pwd; sh $tsh";
226      close(QUEUE);
227      $command_status[$i] = "run";
228    }
229    elsif ($queue and ($queue_type eq "SGE")) {
230      my $t = "para-$thost_idx";
231      open(QUEUE,"| qsub -N $t");
232      print QUEUE <<EOD;
233#!/bin/sh
234#$ -S /usr/local/bin/bash
235#$ -v PATH
236
237cd $pwd
238sh $tsh
239
240EOD
241      close(QUEUE);
242      $command_status[$i] = "run";
243    }
244    else {
245      $cmd = `ssh -xqf $thost 'cd $pwd; sh $tsh  >/dev/null 2>&1 &'`;
246      $command_status[$i] = "run";
247      print "run at $thost $tcmd\n";
248    }
249    $sleep_time = 1;
250    $job_sent = 1;
251    last;
252  }
253
254  if ((not $job_sent) and ($sleep_time < 60)) {
255    $sleep_time +=5;
256  }
257
258} ############ main run loop
259
260
261######## merge all .clstr file
262my $out_clstr = "$out.clstr";
263if (not -s $out_clstr) {
264
265  my @indb2_left = ();
266  for ($i=0; $i<$seg_no; $i++) {
267    my @t_clstr = ();
268    for ($j=0; $j<$seg_no2; $j++) {
269      my $tclstr = "$indiv2-$j.vs.$i.clstr";
270      if (-s $tclstr) {push(@t_clstr,$tclstr); }
271      else {die "No file $tclstr\n";}
272
273      if ($i==($seg_no-1)) {
274        push(@indb2_left, "$indiv2-$j.vs.$i");
275      }
276    }
277
278    #merge
279    my $tclstrs = join(" ", @t_clstr);
280    if (@t_clstr > 1) {
281      print  "$clstr_merge_exe $tclstrs >> $out_clstr\n";
282      $cmd = `$clstr_merge_exe $tclstrs >> $out_clstr`;
283    }
284    else {
285      print  "cat $tclstrs >> $out_clstr";
286      $cmd = `cat $tclstrs >> $out_clstr`;
287
288    }
289  }
290
291  my $out_clstr_ren = "$out.clstr.$$";
292  open(TMP, $out_clstr) || die;
293  open(OTMP, "> $out_clstr_ren") || die;
294  my $no = 0;
295  my $cno;
296  my $ll;
297  while($ll=<TMP>){
298    if ($ll =~ /^>Cluster (\d+)/) {
299      print OTMP ">Cluster $no\n"; $no++;
300      $cno  = 0;
301    }
302    else {
303      $ll =~ s/^\d+/$cno/;
304      print OTMP $ll;
305      $cno++;
306    }
307  }
308  close(TMP);
309  close(OTMP);
310  sleep(10);
311  $cmd = `mv $out_clstr_ren $out_clstr`;
312
313  my $reps = join(" ", @indb2_left);
314  $cmd = `cat $reps > $out`;
315}
316
317if (1) {
318  $cmd = `grep CPU $work_dir/*log`;
319  my @lls = split(/\n/, $cmd);
320  my $cpu = 0;
321  my $ll;
322  foreach $ll (@lls) {
323    if ($ll =~ /CPU\s+time\s+(\d+)/) {
324      $cpu += $1;
325    }
326  }
327  print "Total CPU time: $cpu\n";
328}
329
330sub wait_for_available_host {
331  my ($i, $j, $k);
332  my $sleep = 30;
333  while(1) {
334
335    for ($i=0; $i<$host_no; $i++) {
336      my $thost = $hosts[$i];
337      my $tlock = "$work_dir/$out.$$.$i.lock";
338      next if (-e $tlock);
339      return $i;
340    }
341    sleep($sleep);
342    $sleep +=30;
343    if ($sleep >= 300) { $sleep = 30; }
344  }
345}
346########## END wait_for_available_host
347
348
349sub wait_stable_file {
350  my ($i, $j, $k);
351  my $f = shift;
352  return if ($stable_files{$f});
353  return unless (-e $f);
354
355  if (-e "$f.done") { $stable_files{$f} = 1; return; }
356  my $size0 = -s $f;
357  while(1) {
358    sleep(10);
359    my $size1 = -s $f;
360    if ($size0 == $size1) { $stable_files{$f} = 1; last; }
361    else {$size0 = $size1; }
362  }
363}
364########## END wait_stable_file
365
366
367sub write_restart {
368  my ($i, $j, $k);
369  open(RES, "> $restart_file") || die;
370
371  for ($i=0; $i<$command_no; $i++) {
372    print RES "$commands[$i]\n$command_status[$i]\n";
373  }
374  close(RES);
375}
376########## END write_restart
377
378sub assign_commands {
379  my ($i, $j, $k);
380  my $cmd;
381  my ($idb, $idbo, $jdb, $jdbo, $idbout, $idblog, $jdblog);
382
383  $command_no = 0;
384  if    ($seg_no >1) {$cmd = "$cd_hit_div_exe -i $in -o $indiv -div $seg_no";}
385  elsif ($seg_no==1) {$cmd = "ln -s ../$in $indiv-0";}
386  else  {die "Wrong --S option";}
387  push(@commands,         $cmd);
388  push(@command_status, "wait");
389  $command_no++;
390
391  if    ($seg_no2 >1){$cmd = "$cd_hit_div_pl $indb2 $indiv2 $seg_no2";}
392  elsif ($seg_no2==1){$cmd = "ln -s ../$indb2 $indiv2-0"; }
393  push(@commands,         $cmd);
394  push(@command_status, "wait");
395  $command_no++;
396
397  for ($j=0; $j<$seg_no2; $j++) {
398    #j for db2
399    $jdb    = "$indiv2-$j";
400    $jdblog = "$jdb.log";
401
402    for ($i=0; $i<$seg_no; $i++) {
403      $idb    = "$indiv-$i";
404      $jdbo   = "$indiv2-$j.vs.$i";
405      $cmd = "$cd_hit_2d_exe -i $idb -i2 $jdb -o $jdbo $arg_pass >> $jdblog";
406      push(@commands,         $cmd);
407      push(@command_status, "wait");
408      $command_no++;
409      $jdb = $jdbo;
410    }
411  }
412} #### END assign_commands
413
414
415sub read_restart {
416  $command_no = 0;
417  open(RRRR, "$restart_in") || die;
418  my $ll;
419  while ($ll = <RRRR>) {
420    chop($ll);
421    push(@commands, $ll);
422    $ll = <RRRR>;
423    chop($ll);
424    push(@command_status, $ll);
425    $command_no++;
426  }
427  close(RRRR);
428}
429########## END read_restart
430
431
432sub print_usage {
433  print <<EOD;
434Usage: $script_name options
435        This script divide a big clustering job into pieces and submit
436        jobs to remote computers over a network to make it parallel.
437        After all the jobs finished, the script merge the clustering
438        results as if you just run a single cd-hit-2d or cd-hit-est-2d.
439
440        You can also use it to divide big jobs on a single computer if
441        your computer does not have enough RAM (with -L option).
442
443Requirements:
444      1 When run this script over a network, the directory where you
445        run the scripts and the input files must be available on
446        all the remote hosts with identical path.
447      2 If you choose "ssh" to submit jobs, you have to have
448        passwordless ssh to any remote host, see ssh manual to
449        know how to set up passwordless ssh.
450      3 I suggest to use queuing system instead of ssh,
451        I currently support PBS and SGE
452      4 cd-hit-2d cd-hit-est-2d cd-hit-div cd-hit-div.pl must be
453        in same directory where this script is in.
454
455Options
456
457     -i  input filename for 1st db in fasta format, required
458     -i2 input filename for 2nd db in fasta format, required
459     -o  output filename, required
460    --P  program, "cd-hit-2d" or "cd-hit-est-2d", default "cd-hit-2d"
461    --B  filename of list of hosts,
462         requred unless -Q or -L option is supplied
463    --L  number of cpus on local computer, default $local_cpu
464         when you are not running it over a cluster, you can use
465         this option to divide a big clustering jobs into small
466         pieces, I suggest you just use "--L 1" unless you have
467         enough RAM for each cpu
468    --S  Number of segments to split 1st db into, default $seg_no
469    --S2 Number of segments to split 2nd db into, default $seg_no2
470    --Q  number of jobs to submit to queue queuing system, default $queue
471         by default, the program use ssh mode to submit remote jobs
472    --T  type of queuing system, "PBS", "SGE" are supported, default $queue_type
473    --R  restart file, used after a crash of run
474     -h  print this help
475
476More cd-hit-2d/cd-hit-est-2d options can be speicified in command line
477
478    Questions, bugs, contact Weizhong Li at liwz\@sdsc.edu
479
480EOD
481  exit;
482}
483#### END print_usage
484