1#!/usr/local/bin/perl -w 2# ============================================================================= 3# CD-HIT 4# http://cd-hit.org/ 5# http://bioinformatics.burnham.org/cd-hi 6# 7# program written by 8# Weizhong Li 9# UCSD, San Diego Supercomputer Center 10# La Jolla, CA, 92093 11# Email liwz@sdsc.edu 12# ============================================================================= 13 14use strict; 15no strict "refs"; 16 17my $script_name = $0; 18my $script_dir = $0; 19 $script_dir =~ s/[^\/]+$//; 20 $script_dir = "./" unless ($script_dir); 21 chop($script_dir); 22 23my $arg; 24my $in; 25my $indb2; 26my $out; 27my $arg_pass = ""; 28my $para = ""; 29my $host_no = 0; 30my @hosts = (); 31my $cd_hit_div_exe = "$script_dir/cd-hit-div"; 32my $cd_hit_div_pl = "$script_dir/cd-hit-div.pl"; 33my $cd_hit_2d_exe = "$script_dir/cd-hit-2d"; 34my $cd_hit_est_2d_exe = "$script_dir/cd-hit-est-2d"; 35my $clstr_merge_exe = "$script_dir/clstr_merge.pl"; 36my $seg_no = 2; 37my $seg_no2 = 8; 38my $restart_in = ""; 39my $queue = 0; 40my $local_cpu = 0; 41my $queue_type = "PBS"; 42my $prog = "cd-hit-2d"; 43my %stable_files = (); 44 45while ($arg=shift) { 46 if ($arg eq "-h" ) { print_usage();} 47 elsif ($arg eq "-i" ) { $in = shift; } 48 elsif ($arg eq "-i2") { $indb2 = shift; } 49 elsif ($arg eq "-o" ) { $out = shift; } 50 elsif ($arg eq "--B") { $para = shift; } 51 elsif ($arg eq "--L") { $local_cpu = shift; } 52 elsif ($arg eq "--P") { $prog = shift; } 53 elsif ($arg eq "--S") { $seg_no = shift; } 54 elsif ($arg eq "--S2") { $seg_no2 = shift; } 55 elsif ($arg eq "--Q" ) { $queue = shift; } 56 elsif ($arg eq "--T" ) { $queue_type = shift; } 57 elsif ($arg eq "--R" ) { $restart_in = shift; } 58 else {$arg_pass .= " $arg "; } 59} 60($in and $out) || print_usage(); 61if (not ($seg_no2 >1)) { 62 die "You are not dividing 2nd input db, parallel mode won't help"; 63} 64 65if ($prog eq "cd-hit-est") { 66 $cd_hit_2d_exe = $cd_hit_est_2d_exe; 67} 68 69my $pwd = `pwd`; chop($pwd); 70my $work_dir = "$out.cd-hit-para-tmp"; 71my $restart_file = "$out.restart"; 72my $indiv = "$work_dir/$in.div"; 73my $indiv2 = "$work_dir/$indb2.div"; 74my @commands = (); 75my @command_status = (); 76my $command_no = 0; 77my $cmd; 78my ($i, $j, $k, $i1, $j1, $k1); 79 80# readin a list of hosts 81if ($para) { 82 open(PARA, "$para") || die "can not open $para"; 83 while(my $ll= <PARA>){ 84 chop($ll); $ll =~ s/\s//g; 85 next unless ($ll); 86 push(@hosts, $ll); $host_no++; 87 } 88 close(PARA); 89} 90if ($queue) { 91 for ($i=0; $i<$queue; $i++) { 92 push(@hosts, "queue_host.$i"); 93 } 94 $host_no = $queue; 95} 96if ($local_cpu) { 97 for ($i=0; $i<$local_cpu; $i++) { 98 push(@hosts, "localhost.$i"); 99 } 100 $host_no = $local_cpu; 101} 102 103die "no host" unless $host_no; 104if ($host_no > $seg_no2) { 105 print "$indb2 was divided into $seg_no2 pieces, "; 106 print "Your number of hosts ($host_no) is more than $seg_no, "; 107 print "So, not all of hosts will be used\n"; 108} 109 110if (-e $restart_in) { 111 read_restart(); 112} 113else { 114 $cmd = `mkdir $work_dir`; 115 assign_commands(); 116 write_restart(); 117} 118 119 120#dbdiv run on master node? 121if ($command_status[0] eq "wait") { 122 $cmd = `$commands[0]`; 123 $command_status[0] = "done"; 124 write_restart(); 125 126 for ($i=0; $i<$seg_no; $i++) { 127 my $idb = "$indiv-$i"; 128 $stable_files{$idb} = 1; 129 } 130} 131 132if ($command_status[1] eq "wait") { 133 $cmd = `$commands[1]`; 134 $command_status[1] = "done"; 135 write_restart(); 136 137 for ($i=0; $i<$seg_no2; $i++) { 138 my $idb = "$indiv2-$i"; 139 $stable_files{$idb} = 1; 140 } 141} 142 143 144 145#main runing loop 146my $sleep_time = 1; 147while(1) { 148 #refresh job status by checking output files 149 #check whether all jobs are done or not 150 my $finish_flag = 1; 151 my $status_change = 0; 152 for ($i=2; $i<$command_no; $i++) { 153 next if ($command_status[$i] eq "done"); 154 $finish_flag = 0; 155 my $tcmd = $commands[$i]; 156 my $output = ""; 157 if ($tcmd =~ / -o\s+(\S+)/) { 158 $output = $1; 159 if ((-s $output) or (-s "$output.clstr")) { 160 $command_status[$i] = "done"; 161 $status_change = 1; 162 } 163 } 164 } 165 if ($status_change) { 166 write_restart(); 167 } 168 else { 169 sleep($sleep_time); print "."; 170 } 171 last if $finish_flag; 172 173 my $job_sent = 0; 174 for ($i=2; $i<$command_no; $i++) { 175 next if ($command_status[$i] eq "done"); 176 next if ($command_status[$i] eq "run"); 177 my $tcmd = $commands[$i]; 178 my $in1 = ""; 179 my $in2 = ""; 180 my $out_done = ""; 181 if ($tcmd =~ / -i\s+(\S+)/ ) {$in1 = $1;} 182 if ($tcmd =~ / -i2\s+(\S+)/) {$in2 = $1;} 183 if ($tcmd =~ / -o\s+(\S+)/ ) {$out_done = "$1.done";} 184 my $input_flag = 0; 185 186 if (($in1 =~ /\S/) and ($in2 =~ /\S/)) { 187 $input_flag = 1 if ((-s $in1) and (-s $in2)); 188 } 189 elsif ($in1 =~ /\S/) { 190 $input_flag = 1 if (-s $in1); 191 } 192 else { 193 die "Error at $tcmd\n"; 194 } 195 next unless $input_flag; 196 197 #now input files are ready, wait 198 wait_stable_file($in1); 199 wait_stable_file($in2); 200 201 my $thost_idx = wait_for_available_host(); 202 my $thost = $hosts[$thost_idx]; 203 my $tsh = "$work_dir/$out.$$.$thost_idx.sh"; 204 my $tlock = "$work_dir/$out.$$.$thost_idx.lock"; 205 my $trm = ""; 206 $trm = "rm -f $in2" if ($in2 =~ /\S/); 207 open(TSH, "> $tsh") || die; 208 $cmd = `date > $tlock`; 209 print TSH <<EOD; 210date > $tlock 211$tcmd 212$trm 213rm -f $tlock 214date > $out_done 215EOD 216 close(TSH); 217 if ($local_cpu) { 218 $cmd = `sh $tsh >/dev/null 2>&1 &`; 219 $command_status[$i] = "run"; 220 print "run at $thost $tcmd\n"; 221 } 222 elsif ($queue and ($queue_type eq "PBS")) { 223 my $t = "para-$thost_idx"; 224 open(QUEUE,"| qsub -N $t -o $t.log -e $t.err"); 225 print QUEUE "cd $pwd; sh $tsh"; 226 close(QUEUE); 227 $command_status[$i] = "run"; 228 } 229 elsif ($queue and ($queue_type eq "SGE")) { 230 my $t = "para-$thost_idx"; 231 open(QUEUE,"| qsub -N $t"); 232 print QUEUE <<EOD; 233#!/bin/sh 234#$ -S /usr/local/bin/bash 235#$ -v PATH 236 237cd $pwd 238sh $tsh 239 240EOD 241 close(QUEUE); 242 $command_status[$i] = "run"; 243 } 244 else { 245 $cmd = `ssh -xqf $thost 'cd $pwd; sh $tsh >/dev/null 2>&1 &'`; 246 $command_status[$i] = "run"; 247 print "run at $thost $tcmd\n"; 248 } 249 $sleep_time = 1; 250 $job_sent = 1; 251 last; 252 } 253 254 if ((not $job_sent) and ($sleep_time < 60)) { 255 $sleep_time +=5; 256 } 257 258} ############ main run loop 259 260 261######## merge all .clstr file 262my $out_clstr = "$out.clstr"; 263if (not -s $out_clstr) { 264 265 my @indb2_left = (); 266 for ($i=0; $i<$seg_no; $i++) { 267 my @t_clstr = (); 268 for ($j=0; $j<$seg_no2; $j++) { 269 my $tclstr = "$indiv2-$j.vs.$i.clstr"; 270 if (-s $tclstr) {push(@t_clstr,$tclstr); } 271 else {die "No file $tclstr\n";} 272 273 if ($i==($seg_no-1)) { 274 push(@indb2_left, "$indiv2-$j.vs.$i"); 275 } 276 } 277 278 #merge 279 my $tclstrs = join(" ", @t_clstr); 280 if (@t_clstr > 1) { 281 print "$clstr_merge_exe $tclstrs >> $out_clstr\n"; 282 $cmd = `$clstr_merge_exe $tclstrs >> $out_clstr`; 283 } 284 else { 285 print "cat $tclstrs >> $out_clstr"; 286 $cmd = `cat $tclstrs >> $out_clstr`; 287 288 } 289 } 290 291 my $out_clstr_ren = "$out.clstr.$$"; 292 open(TMP, $out_clstr) || die; 293 open(OTMP, "> $out_clstr_ren") || die; 294 my $no = 0; 295 my $cno; 296 my $ll; 297 while($ll=<TMP>){ 298 if ($ll =~ /^>Cluster (\d+)/) { 299 print OTMP ">Cluster $no\n"; $no++; 300 $cno = 0; 301 } 302 else { 303 $ll =~ s/^\d+/$cno/; 304 print OTMP $ll; 305 $cno++; 306 } 307 } 308 close(TMP); 309 close(OTMP); 310 sleep(10); 311 $cmd = `mv $out_clstr_ren $out_clstr`; 312 313 my $reps = join(" ", @indb2_left); 314 $cmd = `cat $reps > $out`; 315} 316 317if (1) { 318 $cmd = `grep CPU $work_dir/*log`; 319 my @lls = split(/\n/, $cmd); 320 my $cpu = 0; 321 my $ll; 322 foreach $ll (@lls) { 323 if ($ll =~ /CPU\s+time\s+(\d+)/) { 324 $cpu += $1; 325 } 326 } 327 print "Total CPU time: $cpu\n"; 328} 329 330sub wait_for_available_host { 331 my ($i, $j, $k); 332 my $sleep = 30; 333 while(1) { 334 335 for ($i=0; $i<$host_no; $i++) { 336 my $thost = $hosts[$i]; 337 my $tlock = "$work_dir/$out.$$.$i.lock"; 338 next if (-e $tlock); 339 return $i; 340 } 341 sleep($sleep); 342 $sleep +=30; 343 if ($sleep >= 300) { $sleep = 30; } 344 } 345} 346########## END wait_for_available_host 347 348 349sub wait_stable_file { 350 my ($i, $j, $k); 351 my $f = shift; 352 return if ($stable_files{$f}); 353 return unless (-e $f); 354 355 if (-e "$f.done") { $stable_files{$f} = 1; return; } 356 my $size0 = -s $f; 357 while(1) { 358 sleep(10); 359 my $size1 = -s $f; 360 if ($size0 == $size1) { $stable_files{$f} = 1; last; } 361 else {$size0 = $size1; } 362 } 363} 364########## END wait_stable_file 365 366 367sub write_restart { 368 my ($i, $j, $k); 369 open(RES, "> $restart_file") || die; 370 371 for ($i=0; $i<$command_no; $i++) { 372 print RES "$commands[$i]\n$command_status[$i]\n"; 373 } 374 close(RES); 375} 376########## END write_restart 377 378sub assign_commands { 379 my ($i, $j, $k); 380 my $cmd; 381 my ($idb, $idbo, $jdb, $jdbo, $idbout, $idblog, $jdblog); 382 383 $command_no = 0; 384 if ($seg_no >1) {$cmd = "$cd_hit_div_exe -i $in -o $indiv -div $seg_no";} 385 elsif ($seg_no==1) {$cmd = "ln -s ../$in $indiv-0";} 386 else {die "Wrong --S option";} 387 push(@commands, $cmd); 388 push(@command_status, "wait"); 389 $command_no++; 390 391 if ($seg_no2 >1){$cmd = "$cd_hit_div_pl $indb2 $indiv2 $seg_no2";} 392 elsif ($seg_no2==1){$cmd = "ln -s ../$indb2 $indiv2-0"; } 393 push(@commands, $cmd); 394 push(@command_status, "wait"); 395 $command_no++; 396 397 for ($j=0; $j<$seg_no2; $j++) { 398 #j for db2 399 $jdb = "$indiv2-$j"; 400 $jdblog = "$jdb.log"; 401 402 for ($i=0; $i<$seg_no; $i++) { 403 $idb = "$indiv-$i"; 404 $jdbo = "$indiv2-$j.vs.$i"; 405 $cmd = "$cd_hit_2d_exe -i $idb -i2 $jdb -o $jdbo $arg_pass >> $jdblog"; 406 push(@commands, $cmd); 407 push(@command_status, "wait"); 408 $command_no++; 409 $jdb = $jdbo; 410 } 411 } 412} #### END assign_commands 413 414 415sub read_restart { 416 $command_no = 0; 417 open(RRRR, "$restart_in") || die; 418 my $ll; 419 while ($ll = <RRRR>) { 420 chop($ll); 421 push(@commands, $ll); 422 $ll = <RRRR>; 423 chop($ll); 424 push(@command_status, $ll); 425 $command_no++; 426 } 427 close(RRRR); 428} 429########## END read_restart 430 431 432sub print_usage { 433 print <<EOD; 434Usage: $script_name options 435 This script divide a big clustering job into pieces and submit 436 jobs to remote computers over a network to make it parallel. 437 After all the jobs finished, the script merge the clustering 438 results as if you just run a single cd-hit-2d or cd-hit-est-2d. 439 440 You can also use it to divide big jobs on a single computer if 441 your computer does not have enough RAM (with -L option). 442 443Requirements: 444 1 When run this script over a network, the directory where you 445 run the scripts and the input files must be available on 446 all the remote hosts with identical path. 447 2 If you choose "ssh" to submit jobs, you have to have 448 passwordless ssh to any remote host, see ssh manual to 449 know how to set up passwordless ssh. 450 3 I suggest to use queuing system instead of ssh, 451 I currently support PBS and SGE 452 4 cd-hit-2d cd-hit-est-2d cd-hit-div cd-hit-div.pl must be 453 in same directory where this script is in. 454 455Options 456 457 -i input filename for 1st db in fasta format, required 458 -i2 input filename for 2nd db in fasta format, required 459 -o output filename, required 460 --P program, "cd-hit-2d" or "cd-hit-est-2d", default "cd-hit-2d" 461 --B filename of list of hosts, 462 requred unless -Q or -L option is supplied 463 --L number of cpus on local computer, default $local_cpu 464 when you are not running it over a cluster, you can use 465 this option to divide a big clustering jobs into small 466 pieces, I suggest you just use "--L 1" unless you have 467 enough RAM for each cpu 468 --S Number of segments to split 1st db into, default $seg_no 469 --S2 Number of segments to split 2nd db into, default $seg_no2 470 --Q number of jobs to submit to queue queuing system, default $queue 471 by default, the program use ssh mode to submit remote jobs 472 --T type of queuing system, "PBS", "SGE" are supported, default $queue_type 473 --R restart file, used after a crash of run 474 -h print this help 475 476More cd-hit-2d/cd-hit-est-2d options can be speicified in command line 477 478 Questions, bugs, contact Weizhong Li at liwz\@sdsc.edu 479 480EOD 481 exit; 482} 483#### END print_usage 484