1#!/usr/local/bin/perl -w 2# 3# This file is part of 'Bitflu' - (C) 2006-2011 Adrian Ulrich 4# 5# Released under the terms of The "Artistic License 2.0". 6# http://www.opensource.org/licenses/artistic-license-2.0.php 7# 8use strict; 9use Data::Dumper; 10use Getopt::Long; 11use Danga::Socket; 12 13my $bitflu_run = undef; # Start as not_running and not_killed 14my $getopts = { help => undef, config => "$ENV{HOME}/.bitflu.config", version => undef, quiet=>undef }; 15$SIG{PIPE} = $SIG{CHLD} = 'IGNORE'; 16$SIG{INT} = $SIG{HUP} = $SIG{TERM} = \&HandleShutdown; 17 18GetOptions($getopts, "help|h", "version", "plugins", "config=s", "daemon", "quiet|q") or exit 1; 19if($getopts->{help}) { 20 die << "EOF"; 21Usage: $0 [--help --version --plugins --config=s --daemon --quiet] 22 23 -h, --help print this help. 24 --version display the version of bitflu and exit 25 --plugins list all loaded plugins and exit 26 --config=file use specified configuration file (default: .bitflu.config) 27 --daemon run bitflu as a daemon 28 -q, --quiet disable logging to standard output 29 30Example: $0 --config=/etc/bitflu.config --daemon 31 32Mail bug reports and suggestions to <adrian\@blinkenlights.ch>. 33EOF 34} 35 36 37# -> Create bitflu object 38my $bitflu = Bitflu->new(configuration_file=>$getopts->{config}) or Carp::confess("Unable to create Bitflu Object"); 39if($getopts->{version}) { die $bitflu->_Command_Version->{MSG}->[0]->[1]."\n" } 40 41my @loaded_plugins = $bitflu->LoadPlugins('Bitflu'); 42 43if($getopts->{plugins}) { 44 print "# Loaded Plugins: (from ".$bitflu->Configuration->GetValue('plugindir').")\n"; 45 foreach (@loaded_plugins) { printf("File %-35s provides: %s\n", $_->{file}, $_->{package}); } 46 exit(0); 47} 48elsif($getopts->{daemon}) { 49 $bitflu->Daemonize(); 50} 51elsif($getopts->{quiet}) { 52 $bitflu->DisableConsoleOutput; 53} 54 55$bitflu->SysinitProcess(); 56$bitflu->SetupDirectories(); 57$bitflu->InitPlugins(); 58$bitflu->PreloopInit(); 59 60$bitflu_run = 1 if !defined($bitflu_run); # Enable mainloop and sighandler if we are still not_killed 61 62 63 64RunPlugins(); 65Danga::Socket->SetPostLoopCallback( sub { return $bitflu_run } ); 66Danga::Socket->EventLoop(); 67 68 69$bitflu->Storage->terminate; 70$bitflu->info("-> Shutdown completed after running for ".(int(time())-$^T)." seconds"); 71exit(0); 72 73 74 75 76 77sub RunPlugins { 78 my $NOW = $bitflu->Network->GetTime; 79 foreach my $rk (keys(%{$bitflu->{_Runners}})) { 80 my $rx = $bitflu->{_Runners}->{$rk} or $bitflu->panic("Runner $rk vanished"); 81 next if $rx->{runat} > $NOW; 82 $rx->{runat} = $NOW + $rx->{target}->run($NOW); 83 } 84 if($bitflu_run) { 85 Danga::Socket->AddTimer(0.1, sub { RunPlugins() }) 86 } 87} 88 89sub HandleShutdown { 90 my($sig) = @_; 91 if(defined($bitflu_run) && $bitflu_run == 1) { 92 # $bitflu is running, so we can use ->info 93 $bitflu->info("-> Starting shutdown... (signal $sig received)"); 94 } 95 else { 96 print "-> Starting shutdown... (signal $sig received), please wait...\n"; 97 } 98 $bitflu_run = 0; # set it to not_running and killed 99} 100 101 102 103package Bitflu; 104use strict; 105use Carp; 106use constant V_MAJOR => '1'; 107use constant V_MINOR => '52'; 108use constant V_STABLE => 1; 109use constant V_TYPE => ( V_STABLE ? 'stable' : 'devel' ); 110use constant VERSION => V_MAJOR.'.'.V_MINOR.'-'.V_TYPE; 111use constant APIVER => 20120529; 112use constant LOGBUFF => 0xFF; 113 114 ########################################################################## 115 # Create a new Bitflu-'Dispatcher' object 116 sub new { 117 my($class, %args) = @_; 118 my $self = {}; 119 bless($self, $class); 120 $self->{_LogFH} = *STDOUT; # Must be set ASAP 121 $self->{_LogBuff} = []; # Empty at startup 122 123 $self->{Core}->{"00_Tools"} = Bitflu::Tools->new(super => $self); # Tools is also loaded ASAP because ::Configuration needs it 124 $self->{Core}->{"01_Syscall"} = Bitflu::Syscall->new(super => $self); # -> this should be one of the first core plugin started 125 $self->{Core}->{"10_Admin"} = Bitflu::Admin->new(super => $self); 126 $self->{Core}->{"20_Config"} = Bitflu::Configuration->new(super=>$self, configuration_file => $args{configuration_file}); 127 $self->{Core}->{"30_Network"} = Bitflu::Network->new(super => $self); 128 $self->{Core}->{"99_QueueMgr"} = Bitflu::QueueMgr->new(super => $self); 129 130 $self->{_Runners} = {}; 131 $self->{_Plugins} = (); 132 return $self; 133 } 134 135 ########################################################################## 136 # Return internal version 137 sub GetVersion { 138 my($self) = @_; 139 return(V_MAJOR, V_MINOR, V_STABLE); 140 } 141 142 ########################################################################## 143 # Return internal version as string 144 sub GetVersionString { 145 my($self) = @_; 146 return VERSION; 147 } 148 149 ########################################################################## 150 # Call hardcoded configuration plugin 151 sub Configuration { 152 my($self) = @_; 153 return $self->{Core}->{"20_Config"}; 154 } 155 156 ########################################################################## 157 # Call hardcoded tools plugin 158 sub Tools { 159 my($self) = @_; 160 return $self->{Core}->{"00_Tools"}; 161 } 162 163 ########################################################################## 164 # Call hardcoded Network IO plugin 165 sub Network { 166 my($self) = @_; 167 return $self->{Core}->{"30_Network"}; 168 } 169 170 ########################################################################## 171 # Call hardcoded Admin plugin 172 sub Admin { 173 my($self) = @_; 174 return $self->{Core}->{"10_Admin"}; 175 } 176 177 ########################################################################## 178 # Call hardcoded Syscall plugin 179 sub Syscall { 180 my($self) = @_; 181 return $self->{Core}->{"01_Syscall"}; 182 } 183 184 ########################################################################## 185 # Call hardcoded Queue plugin 186 sub Queue { 187 my($self) = @_; 188 return $self->{Core}->{"99_QueueMgr"}; 189 } 190 191 ########################################################################## 192 # Call currently loaded storage plugin 193 sub Storage { 194 my($self) = @_; 195 return $self->{Plugin}->{Storage}; 196 } 197 198 ########################################################################## 199 # Let bitflu run the given target 200 sub AddRunner { 201 my($self,$target) = @_; 202 $self->{_Runners}->{$target} and $self->panic("$target is registered: wont overwrite it"); 203 $self->{_Runners}->{$target} = { target=>$target, runat=>0 }; 204 } 205 206 ########################################################################## 207 # Returns a list of bitflus _Runner array as reference hash 208 sub GetRunnerTarget { 209 my($self,$target) = @_; 210 foreach my $rx (values(%{$self->{_Runners}})) { 211 return $rx->{target} if ref($rx->{target}) eq $target; 212 } 213 return undef; 214 } 215 216 ########################################################################## 217 # Creates a new Simple-eXecution task 218 sub CreateSxTask { 219 my($self,%args) = @_; 220 $args{__SUPER_} = $self; 221 my $sx = Bitflu::SxTask->new(%args); 222 $self->AddRunner($sx); 223 #$self->debug("CreateSxTask returns <$sx>"); 224 #$self->info("SxTask : ".ref($sx->{super})."->$sx->{cback} created (id: $sx)"); 225 return $sx; 226 } 227 228 ########################################################################## 229 # Kills an SxTask 230 sub DestroySxTask { 231 my($self,$taskref) = @_; 232 delete($self->{_Runners}->{$taskref}) or $self->panic("Could not kill non-existing task $taskref"); 233 #$self->info("SxTask : $taskref terminated"); 234 return 1; 235 } 236 237 ########################################################################## 238 # Register the exclusive storage plugin 239 sub AddStorage { 240 my($self,$target) = @_; 241 if(defined($self->{Plugin}->{Storage})) { $self->panic("Unable to register additional storage driver '$target' !"); } 242 $self->{Plugin}->{Storage} = $target; 243 $self->debug("AddStorage($target)"); 244 return 1; 245 } 246 247 248 249 ########################################################################## 250 # Loads all plugins from 'plugins' directory but does NOT init them 251 sub LoadPlugins { 252 my($self,$xclass) = @_; 253 # 254 unshift(@INC, $self->Configuration->GetValue('plugindir')); 255 256 my $pdirpath = $self->Configuration->GetValue('plugindir')."/$xclass"; 257 my @plugins = (); 258 my %exclude = (map { $_ => 1} split(/;/,$self->Configuration->GetValue('pluginexclude'))); 259 260 opendir(PLUGINS, $pdirpath) or $self->stop("Unable to read directory '$pdirpath' : $!"); 261 foreach my $dirent (sort readdir(PLUGINS)) { 262 next unless my($pfile, $porder, $pmodname) = $dirent =~ /^((\d\d)_(.+)\.pm)$/i; 263 264 if($exclude{$pfile}) { 265 $self->info("Skipping disabled plugin '$pfile -> $pmodname'"); 266 } 267 elsif($porder eq '00' && $pmodname ne $self->Configuration->GetValue('storage')) { 268 $self->debug("Skipping unconfigured storage plugin '$dirent'"); 269 } 270 else { 271 push(@plugins, {file=>$pfile, order=>$porder, class=>$xclass, modname=>$pmodname, package=>$xclass."::".$3}); 272 $self->debug("Found plugin $plugins[-1]->{package} in folder $pdirpath"); 273 } 274 } 275 closedir(PLUGINS); 276 277 $self->{_Plugins} = \@plugins; 278 279 foreach my $plugin (@{$self->{_Plugins}}) { 280 my $fname = $plugin->{class}."/".$plugin->{file}; 281 $self->debug("Loading $fname"); 282 eval { require $fname; }; 283 if($@) { 284 my $perr = $@; chomp($perr); 285 $self->yell("Unable to load plugin '$fname', error was: '$perr'"); 286 $self->stop(" -> Please fix or remove this broken plugin file from $pdirpath"); 287 } 288 my $this_apiversion = $plugin->{package}->_BITFLU_APIVERSION; 289 if($this_apiversion != APIVER) { 290 $self->yell("Plugin '$fname' has an invalid API-Version ( (\$apivers = $this_apiversion) != (\$expected = ".APIVER.") )"); 291 $self->yell("HINT: Maybe you forgot to replace the plugins at $pdirpath while upgrading bitflu?!..."); 292 $self->stop("-> Exiting due to APIVER mismatch"); 293 } 294 } 295 return @plugins; 296 } 297 298 ########################################################################## 299 # Startup all plugins 300 sub InitPlugins { 301 my($self) = @_; 302 303 my @TO_INIT = (); 304 foreach my $plugin (@{$self->{_Plugins}}) { 305 $self->debug("Registering '$plugin->{package}'"); 306 my $this_plugin = $plugin->{package}->register($self) or $self->panic("Regsitering '$plugin' failed, aborting"); 307 push(@TO_INIT, {name=>$plugin->{package}, ref=>$this_plugin}); 308 } 309 foreach my $toinit (@TO_INIT) { 310 $self->debug("++ Starting Normal-Plugin '$toinit->{name}'"); 311 $toinit->{ref}->init() or $self->panic("Unable to init plugin : $!"); 312 } 313 foreach my $coreplug (sort keys(%{$self->{Core}})) { 314 $self->debug("++ Starting Core-Plugin '$coreplug'"); 315 $self->{Core}->{$coreplug}->init() or $self->panic("Unable to init Core-Plugin : $!"); 316 } 317 } 318 319 ########################################################################## 320 # Build some basic directory structure 321 sub SetupDirectories { 322 my($self) =@_; 323 my $workdir = $self->Configuration->GetValue('workdir') or $self->panic("No workdir configured"); 324 my $tmpdir = $self->Tools->GetTempdir or $self->panic("No tempdir configured"); 325 foreach my $this_dir ($workdir, $tmpdir) { 326 unless(-d $this_dir) { 327 $self->debug("mkdir($this_dir)"); 328 mkdir($this_dir) or $self->stop("Unable to create directory '$this_dir' : $!"); 329 } 330 } 331 } 332 333 ########################################################################## 334 # Change nice level, chroot and drop privileges 335 sub SysinitProcess { 336 my($self) = @_; 337 my $chroot = $self->Configuration->GetValue('chroot'); 338 my $chdir = $self->Configuration->GetValue('chdir'); 339 my $uid = int($self->Configuration->GetValue('runas_uid') || 0); 340 my $gid = int($self->Configuration->GetValue('runas_gid') || 0); 341 my $renice = int($self->Configuration->GetValue('renice') || 0); 342 my $outlog = ($self->Configuration->GetValue('logfile') || ''); 343 my $pidfile= ($self->Configuration->GetValue('pidfile') || ''); 344 345 if(length($outlog)) { 346 open(LFH, ">>", $outlog) or $self->stop("Cannot write to logfile '$outlog' : $!"); 347 $self->{_LogFH} = *LFH; 348 $self->{_LogFH}->autoflush(1); 349 $self->yell("Logging to '$outlog'"); 350 } 351 352 if(length($pidfile)) { 353 $self->info("Writing pidfile at '$pidfile'"); 354 open(PIDFILE, ">", $pidfile) or $self->stop("Cannot write to pidfile '$pidfile' : $!"); 355 print PIDFILE "$$\n"; 356 close(PIDFILE); 357 } 358 359 360 # Lock values because we cannot change them after we finished 361 foreach my $lockme (qw(runas_uid runas_gid chroot)) { 362 $self->Configuration->RuntimeLockValue($lockme); 363 } 364 365 # -> Adjust resolver settings (is this portable? does it work on *BSD?) 366 $ENV{RES_OPTIONS} = "timeout:1 attempts:1"; 367 368 369 # -> Set niceness (This is done before dropping root to get negative values working) 370 if($renice) { 371 $renice = ($renice > 19 ? 19 : ($renice < -20 ? -20 : $renice) ); # Stop funny stuff... 372 $self->info("Setting my own niceness to $renice"); 373 POSIX::nice($renice) or $self->warn("nice($renice) failed: $!"); 374 } 375 376 # -> Chroot 377 if(defined($chroot)) { 378 $self->info("Chrooting into '$chroot'"); 379 380 Carp::longmess("FULLY_LOADING_CARP"); # init CARP module 381 my $x = gethostbyname('localhost.localdomain'); # init DNS resolver 382 383 chdir($chroot) or $self->panic("Cannot change into directory '$chroot' : $!"); 384 chroot($chroot) or $self->panic("Cannot chroot into directory '$chroot' (are you root?) : $!"); 385 chdir('/') or $self->panic("Unable to change into new chroot topdir: $!"); 386 } 387 388 389 # -> Drop group privileges 390 if($gid) { 391 $self->info("Changing gid to $gid"); 392 393 $) = "$gid $gid"; # can fail with 'no such file or directory' 394 395 $! = undef; 396 $( = "$gid"; 397 $self->panic("Unable to set GID: $!") if $!; 398 } 399 400 # -> Drop user privileges 401 if($uid) { 402 $self->info("Changing uid to $uid"); 403 POSIX::setuid($uid) or $self->panic("Unable to change UID: $!"); 404 } 405 406 # -> Check if we are still root. We shouldn't. 407 if($> == 0 or $) == 0 or $( == 0) { 408 $self->warn("Refusing to run with root privileges. Do not start $0 as root unless you are using"); 409 $self->warn("the chroot option. In this case you must also specify the options runas_uid & runas_gid"); 410 $self->stop("Bitflu refuses to run as root"); 411 } 412 413 if($chdir) { 414 $self->info("Changing into directory '$chdir'"); 415 chdir($chdir) or $self->stop("chdir($chdir) failed: $!"); 416 } 417 418 $self->info("$0 is running with pid $$ ; uid = ($>|$<) / gid = ($)|$()"); 419 } 420 421 422 ########################################################################## 423 # This should get called after starting the mainloop 424 # The subroutine does the same as a 'init' in a plugin 425 sub PreloopInit { 426 my($self) = @_; 427 $self->Admin->RegisterCommand('die' , $self, '_Command_Shutdown' , 'Terminates bitflu'); 428 $self->Admin->RegisterCommand('version' , $self, '_Command_Version' , 'Displays bitflu version string'); 429 $self->Admin->RegisterCommand('date' , $self, '_Command_Date' , 'Displays current time and date'); 430 } 431 432 ########################################################################## 433 # Set _LogFh to undef if we are logging to stdout 434 # this disables logging to the console 435 sub DisableConsoleOutput { 436 my($self) = @_; 437 $self->debug("DisableConsoleOutput called"); 438 if($self->{_LogFH} eq *STDOUT) { 439 $self->debug("=> Setting _LogFH to undef"); 440 $self->{_LogFH} = ''; 441 } 442 # Do not printout any warnings to STDOUT 443 $SIG{__WARN__} = sub {}; 444 } 445 446 ########################################################################## 447 # Fork into background 448 sub Daemonize { 449 my($self) = @_; 450 my $child = fork(); 451 452 if(!defined($child)) { 453 die "Unable to fork: $!\n"; 454 } 455 elsif($child != 0) { 456 $self->debug("Bitflu is running with pid $child"); 457 exit(0); 458 } 459 460 $self->DisableConsoleOutput; 461 } 462 463 ########################################################################## 464 # bye! 465 sub _Command_Shutdown { 466 my($self) = @_; 467 kill(2,$$); 468 return {MSG=>[ [1, "Shutting down $0 (with pid $$)"] ], SCRAP=>[]}; 469 } 470 471 ########################################################################## 472 # Return version string 473 sub _Command_Version { 474 my($self) = @_; 475 my $uptime = ($self->Network->GetTime - $^T)/60; 476 return {MSG=>[ [1, sprintf("This is Bitflu %s (API:%s) running on %s with perl %vd. Uptime: %.3f minutes (%s)",$self->GetVersionString, 477 APIVER, $^O, $^V, $uptime, "".localtime($^T) )] ], SCRAP=>[]}; 478 } 479 480 ########################################################################## 481 # Return version string 482 sub _Command_Date { 483 my($self) = @_; 484 return {MSG=>[ [1, "".localtime()] ], SCRAP=>[]}; 485 } 486 487 ########################################################################## 488 # Printout logmessage 489 sub _xlog { 490 my($self, $msg, $force_stdout) = @_; 491 my $rmsg = localtime()." # $msg\n"; 492 my $xfh = $self->{_LogFH}; 493 my $lbuff = $self->{_LogBuff}; 494 495 print $xfh $rmsg if $xfh; 496 497 if($force_stdout && $xfh ne *STDOUT) { 498 print STDOUT $rmsg; 499 } 500 501 push(@$lbuff, $rmsg); 502 shift(@$lbuff) if int(@$lbuff) >= LOGBUFF; 503 } 504 505 sub info { my($self,$msg) = @_; return if $self->Configuration->GetValue('loglevel') < 4; $self->_xlog($msg); } 506 sub debug { my($self,$msg) = @_; return if $self->Configuration->GetValue('loglevel') < 10; $self->_xlog(" ** DEBUG ** $msg"); } 507 sub warn { my($self,$msg) = @_; return if $self->Configuration->GetValue('loglevel') < 2; $self->_xlog("** WARNING ** $msg"); } 508 sub yell { my($self,$msg) = @_; $self->_xlog($msg,1); } 509 sub stop { my($self,$msg) = @_; $self->yell("EXITING # $msg"); exit(1); } 510 sub panic { 511 my($self,$msg) = @_; 512 $self->yell("--------- BITFLU SOMEHOW MANAGED TO CRASH ITSELF; PANIC MESSAGE: ---------"); 513 $self->yell($msg); 514 $self->yell("--------- BACKTRACE START ---------"); 515 $self->yell(Carp::longmess()); 516 $self->yell("---------- BACKTRACE END ----------"); 517 518 $self->yell("SHA1-Module used : ".$self->Tools->{mname}); 519 $self->yell("Perl Version : ".sprintf("%vd", $^V)); 520 $self->yell("Perl Execname : ".$^X); 521 $self->yell("Bitflu Version : ".$self->GetVersionString); 522 $self->yell("OS-Name : ".$^O); 523 $self->yell("IPv6 ? : ".$self->Network->HaveIPv6); 524 $self->yell("Danga::Socket : ".$Danga::Socket::VERSION); 525 $self->yell("Running since : ".gmtime($^T)); 526 $self->yell("---------- LOADED PLUGINS ---------"); 527 foreach my $plug (@{$self->{_Plugins}}) { 528 $self->yell(sprintf("%-32s -> %s",$plug->{file}, $plug->{package})); 529 } 530 $self->yell("##################################"); 531 exit(1); 532 } 533 534 5351; 536 537 538#################################################################################################################################################### 539#################################################################################################################################################### 540# Bitflu Queue manager 541# 542package Bitflu::QueueMgr; 543 544use constant SHALEN => 40; 545use constant HPFX => 'history_'; 546use constant HIST_MAX => 100; 547use constant STATE_PAUSED => 1; 548use constant STATE_AUTOPAUSED => 2; 549 550 sub new { 551 my($class, %args) = @_; 552 my $self = {super=> $args{super}}; 553 bless($self,$class); 554 return $self; 555 } 556 557 ########################################################################## 558 # Inits plugin: This resumes all found storage items 559 sub init { 560 my($self) = @_; 561 my $queueIds = $self->{super}->Storage->GetStorageItems(); 562 my $toload = int(@$queueIds); 563 $self->info("Resuming $toload downloads, this may take a few seconds..."); 564 565 foreach my $sid (@$queueIds) { 566 my $this_storage = $self->{super}->Storage->OpenStorage($sid) or $self->panic("Unable to open storage for sid $sid"); 567 my $owner = $this_storage->GetSetting('owner'); 568 569 $self->info(sprintf("[%3d] Loading %s", $toload--, $sid)); 570 571 if(defined($owner) && (my $r_target = $self->{super}->GetRunnerTarget($owner)) ) { 572 $r_target->resume_this($sid); 573 } 574 else { 575 $self->stop("StorageObject $sid is owned by '$owner', but plugin is not loaded/registered correctly"); 576 } 577 } 578 579 $self->{super}->Admin->RegisterCommand('rename' , $self, 'admincmd_rename', 'Renames a download', 580 [ [undef, "Renames a download"], [undef, "Usage: rename queue_id \"New Name\""] ]); 581 $self->{super}->Admin->RegisterCommand('cancel' , $self, 'admincmd_cancel', 'Removes a file from the download queue', 582 [ [undef, "Removes a file from the download queue. Use --wipe to *remove* data of completed downloads"], [undef, "Usage: cancel [--wipe] queue_id [queue_id2 ...]"] ]); 583 584 $self->{super}->Admin->RegisterCommand('history' , $self, 'admincmd_history', 'Manages download history', 585 [ [undef, "Manages internal download history"], [undef, ''], 586 [undef, "Usage: history [ queue_id [show forget] ] [list|drop|cleanup]"], [undef, ''], 587 [undef, "history list : List all remembered downloads"], 588 [undef, "history drop : List and forget all remembered downloads"], 589 [undef, "history cleanup : Trim history size"], 590 [undef, "history queue_id show : Shows details about queue_id"], 591 [undef, "history queue_id forget : Removes history of queue_id"], 592 ]); 593 594 $self->{super}->Admin->RegisterCommand('pause' , $self, 'admincmd_pause', 'Stops a download', 595 [ [undef, "Stop given download"], [undef, ''], 596 [undef, "Usage: pause queue_id [queue_id2 ... | --all]"], [undef, ''], 597 ]); 598 599 $self->{super}->Admin->RegisterCommand('resume' , $self, 'admincmd_resume', 'Resumes a paused download', 600 [ [undef, "Resumes a paused download"], [undef, ''], 601 [undef, "Usage: resume queue_id [queue_id2 ... | --all]"], [undef, ''], 602 ]); 603 604 $self->info("--- startup completed: bitflu ".$self->{super}->GetVersionString." is ready ---"); 605 return 1; 606 } 607 608 609 ########################################################################## 610 # Pauses a download 611 sub admincmd_pause { 612 my($self, @args) = @_; 613 614 my @MSG = (); 615 my $NOEXEC = ''; 616 $self->{super}->Tools->GetOpts(\@args); 617 618 if(int(@args)) { 619 foreach my $cid (@args) { 620 my $so = $self->{super}->Storage->OpenStorage($cid); 621 if($so) { 622 $self->SetPaused($cid); 623 push(@MSG, [1, "$cid: download paused"]); 624 } 625 else { 626 push(@MSG, [2, "$cid: does not exist in queue, cannot pause"]); 627 } 628 } 629 } 630 else { 631 $NOEXEC .= 'Usage: pause queue_id'; 632 } 633 return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC}); 634 } 635 636 ########################################################################## 637 # Resumes a download 638 sub admincmd_resume { 639 my($self, @args) = @_; 640 641 my @MSG = (); 642 my $NOEXEC = ''; 643 $self->{super}->Tools->GetOpts(\@args); 644 645 if(int(@args)) { 646 foreach my $cid (@args) { 647 my $so = $self->{super}->Storage->OpenStorage($cid); 648 if($so) { 649 $self->SetUnPaused($cid); 650 push(@MSG, [1, "$cid: download resumed"]); 651 } 652 else { 653 push(@MSG, [2, "$cid: does not exist in queue, cannot resume"]); 654 } 655 } 656 } 657 else { 658 $NOEXEC .= 'Usage: resume queue_id'; 659 } 660 return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC}); 661 } 662 663 664 ########################################################################## 665 # Cancel a queue item 666 sub admincmd_cancel { 667 my($self, @args) = @_; 668 669 my @MSG = (); 670 my $NOEXEC = ''; 671 my $do_wipe = 0; 672 673 if(int(@args)) { 674 if($args[0] eq '--wipe') { 675 $do_wipe = 1; 676 shift(@args); # remove first parameter 677 } 678 679 foreach my $cid (@args) { 680 my $storage = $self->{super}->Storage->OpenStorage($cid); 681 if($storage) { 682 my $owner = $storage->GetSetting('owner'); 683 if(defined($owner) && (my $r_target = $self->{super}->GetRunnerTarget($owner)) ) { 684 $self->ModifyHistory($cid, Canceled=>''); 685 $storage->SetSetting('wipedata', 1) if $do_wipe; 686 $r_target->cancel_this($cid); 687 push(@MSG, [1, "'$cid' canceled"]); 688 } 689 else { 690 $self->panic("'$cid' has no owner, cannot cancel!"); 691 } 692 } 693 else { 694 push(@MSG, [2, "'$cid' not removed from queue: No such item"]); 695 } 696 } 697 } 698 else { 699 $NOEXEC .= 'Usage: cancel [--wipe] queue_id [queue_id2 ...]'; 700 } 701 702 703 return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC}); 704 } 705 706 ########################################################################## 707 # Rename a queue item 708 sub admincmd_rename { 709 my($self, @args) = @_; 710 711 my $sha = $args[0]; 712 my $name = $args[1]; 713 my @MSG = (); 714 my $NOEXEC = ''; 715 716 if(!defined($name)) { 717 $NOEXEC .= "Usage: rename queue_id \"New Name\""; 718 } 719 elsif(my $storage = $self->{super}->Storage->OpenStorage($sha)) { 720 $storage->SetSetting('name', $name); 721 push(@MSG, [1, "Renamed $sha into '$name'"]); 722 } 723 else { 724 push(@MSG, [2, "Unable to rename $sha: queue_id does not exist"]); 725 } 726 return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC}); 727 } 728 729 ########################################################################## 730 # Manages download history 731 sub admincmd_history { 732 my($self,@args) = @_; 733 734 my $sha = ($args[0] || ''); 735 my $cmd = ($args[1] || ''); 736 my @MSG = (); 737 my $NOEXEC = ''; 738 my $hpfx = HPFX; 739 my $hkey = $hpfx.$sha; 740 my $strg = $self->{super}->Storage; 741 742 if($sha eq 'list' or $sha eq 'drop' or $sha eq 'cleanup') { 743 my @cbl = $strg->ClipboardList; 744 my $cbi = 0; 745 my $drop = {}; 746 my $drop_c = 0; 747 748 foreach my $item (@cbl) { 749 if(my($this_sid) = $item =~ /^$hpfx(.+)$/) { 750 my $hr = $self->GetHistory($this_sid); # History Reference 751 my $ll = "$1 : ".substr(($hr->{Name}||''),0,64); # Telnet-Safe-Name 752 push(@MSG, [ ($strg->OpenStorage($this_sid) ? 1 : 5 ), $ll]); 753 $strg->ClipboardRemove($item) if $sha eq 'drop'; 754 push(@{$drop->{($hr->{FirstSeen}||0)}},$this_sid) if $sha eq 'cleanup'; # Create list with possible items to delete 755 $cbi++; 756 } 757 } 758 759 760 # Walk droplist (if any). From oldest to newest 761 foreach my $d_sid ( map(@{$drop->{$_}}, sort({$a<=>$b} keys(%$drop))) ) { 762 next if $strg->OpenStorage($d_sid); # do not even try to drop existing items (wouldn't do much harm but...) 763 last if ( $cbi-$drop_c++ ) <= HIST_MAX; # Abort if we reached our limit 764 $strg->ClipboardRemove(HPFX.$d_sid); # Still here ? -> ditch it. (fixme: HPFX concating is ugly) 765 } 766 767 768 push(@MSG, [1, ($sha eq 'drop' ? "History cleared" : "$cbi item".($cbi == 1 ? '' : 's')." stored in history")]); 769 } 770 elsif(length($sha)) { 771 if(my $ref = $self->GetHistory($sha)) { 772 if($cmd eq 'show') { 773 foreach my $k (sort keys(%$ref)) { 774 push(@MSG,[1, sprintf("%20s -> %s",$k,$ref->{$k})]); 775 } 776 } 777 elsif($cmd eq 'forget') { 778 $strg->ClipboardRemove($hkey); 779 push(@MSG, [1, "history for $sha has been removed"]); 780 } 781 else { 782 push(@MSG, [2, "unknown subcommand, see 'help history'"]); 783 } 784 } 785 else { 786 push(@MSG, [2,"queue item $sha has no history"]); 787 } 788 } 789 else { 790 push(@MSG, [2,"See 'help history'"]); 791 } 792 793 return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC}); 794 } 795 796 ########################################################################## 797 # Add a new item to queue (Also creates a new storage) 798 sub AddItem { 799 my($self, %args) = @_; 800 801 my $name = $args{Name}; 802 my $chunks = $args{Chunks} or $self->panic("No chunks?!"); 803 my $size = $args{Size}; 804 my $overst = $args{Overshoot}; 805 my $flayout = $args{FileLayout} or $self->panic("FileLayout missing"); 806 my $shaname = ($args{ShaName} || unpack("H*", $self->{super}->Tools->sha1($name))); 807 my $owner = ref($args{Owner}) or $self->panic("No owner?"); 808 my $sobj = 0; 809 my $history = $self->{super}->Configuration->GetValue('history'); 810 811 if($size == 0 && $chunks != 1) { 812 $self->panic("Sorry: You can not create a dynamic storage with multiple chunks ($chunks != 1)"); 813 } 814 if(!defined($name)) { 815 $self->panic("AddItem needs a name!"); 816 } 817 if(length($shaname) != SHALEN) { 818 $self->panic("Invalid shaname: $shaname"); 819 } 820 821 822 if($self->{super}->Storage->OpenStorage($shaname)) { 823 $@ = "$shaname: item exists in queue"; 824 } 825 elsif($history && $self->GetHistory($shaname)) { 826 $@ = "$shaname: has already been downloaded. Use 'history $shaname forget' if you want do re-download it"; 827 $self->warn($@); 828 } 829 elsif($sobj = $self->{super}->Storage->CreateStorage(StorageId => $shaname, Size=>$size, Chunks=>$chunks, Overshoot=>$overst, FileLayout=>$flayout)) { 830 $sobj->SetSetting('owner', $owner); 831 $sobj->SetSetting('name' , $name); 832 $sobj->SetSetting('createdat', $self->{super}->Network->GetTime); 833 if($history) { 834 $self->ModifyHistory($shaname, Name=>$name, Canceled=>'never', Started=>'', 835 Ended=>'never', Committed=>'never', FirstSeen=>$self->{super}->Network->GetTime); 836 } 837 } 838 else { 839 $self->panic("CreateStorage for $shaname failed"); 840 } 841 842 return $sobj; 843 } 844 845 ########################################################################## 846 # Removes an item from the queue + storage 847 sub RemoveItem { 848 my($self,$sid) = @_; 849 my $ret = $self->{super}->Storage->RemoveStorage($sid); 850 if(!$ret) { 851 $self->panic("Unable to remove storage-object $sid : $!"); 852 } 853 854 delete($self->{statistics}->{$sid}) or $self->panic("Cannot remove non-existing statistics for $sid"); 855 return 1; 856 } 857 858 ########################################################################## 859 # Updates/creates on-disk history of given sid 860 # Note: Strings with length == 0 are replaced with the current time. Awkward. 861 sub ModifyHistory { 862 my($self,$sid, %args) = @_; 863 if($self->{super}->Storage->OpenStorage($sid)) { 864 my $old_ref = $self->GetHistory($sid); 865 foreach my $k (keys(%args)) { 866 my $v = $args{$k}; 867 $v = "".localtime($self->{super}->Network->GetTime) if length($v) == 0; 868 $old_ref->{$k} = $v; 869 } 870 return $self->{super}->Storage->ClipboardSet(HPFX.$sid, $self->{super}->Tools->RefToCBx($old_ref)); 871 } 872 else { 873 return 0; 874 } 875 } 876 877 ########################################################################## 878 # Returns history of given sid 879 sub GetHistory { 880 my($self,$sid) = @_; 881 my $r = $self->{super}->Tools->CBxToRef($self->{super}->Storage->ClipboardGet(HPFX.$sid)); 882 return $r; 883 } 884 885 ########################################################################## 886 # Set private statistics 887 # You are supposed to set total_bytes, total_chunks, done_bytes, done_chunks, 888 # uploaded_bytes, clients, active_clients, last_recv 889 # ..and we do not save anything.. you'll need to do this on your own :-) 890 891 sub SetStats { 892 my($self, $id, $ref) = @_; 893 foreach my $xk (keys(%$ref)) { 894 $self->{statistics}->{$id}->{$xk} = $ref->{$xk}; 895 } 896 } 897 898 ########################################################################## 899 # Flushes all stats and sets all common fields to 0 900 sub InitializeStats { 901 my($self, $id) = @_; 902 return $self->SetStats($id, {total_bytes=>0, done_bytes=>0, uploaded_bytes=>0, active_clients=>0, 903 clients=>0, speed_upload =>0, speed_download => 0, last_recv => 0, 904 total_chunks=>0, done_chunks=>0}); 905 } 906 907 sub IncrementStats { 908 my($self, $id, $ref) = @_; 909 foreach my $xk (keys(%$ref)) { 910 $self->SetStats($id,{$xk => $self->GetStat($id,$xk)+$ref->{$xk}}); 911 } 912 } 913 sub DecrementStats { 914 my($self, $id, $ref) = @_; 915 foreach my $xk (keys(%$ref)) { 916 $self->SetStats($id,{$xk => $self->GetStat($id,$xk)-$ref->{$xk}}); 917 } 918 } 919 920 ########################################################################## 921 # Get private statistics 922 sub GetStats { 923 my($self,$id) = @_; 924 return $self->{statistics}->{$id}; 925 } 926 927 ########################################################################## 928 # Get single statistics key 929 sub GetStat { 930 my($self,$id,$key) = @_; 931 return $self->GetStats($id)->{$key}; 932 } 933 934 ########################################################################## 935 # Returns a list with all queue objects 936 sub GetQueueList { 937 my($self) = @_; 938 my $xh = (); 939 my $all_ids = $self->{super}->Storage->GetStorageItems(); 940 foreach my $id (@$all_ids) { 941 my $so = $self->{super}->Storage->OpenStorage($id) or $self->panic("Unable to open $id"); 942 my $name = $so->GetSetting('name'); 943 my $type = ($so->GetSetting('type') or "????"); 944 $xh->{$type}->{$id} = { name=>$name }; 945 } 946 return $xh; 947 } 948 949 ########################################################################## 950 # Returns true if download is marked as paused 951 sub IsPaused { 952 my($self,$sid) = @_; 953 my $so = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist"); 954 return ( $so->GetSetting('_paused') ? 1 : 0 ); 955 } 956 957 ########################################################################## 958 # Returns true if download is marked as *auto* paused 959 sub IsAutoPaused { 960 my($self,$sid) = @_; 961 my $so = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist"); 962 my $val = ($so->GetSetting('_paused') || 0); 963 return ( $val == STATE_AUTOPAUSED ? 1 : 0 ); 964 } 965 966 ########################################################################## 967 # Set autopause flag of specified SID 968 sub SetAutoPaused { 969 my($self,$sid) = @_; 970 my $so = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist"); 971 $so->SetSetting('_paused', STATE_AUTOPAUSED); 972 } 973 974 ########################################################################## 975 # Set normal pause flag of specified SID 976 sub SetPaused { 977 my($self,$sid) = @_; 978 my $so = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist"); 979 $so->SetSetting('_paused', STATE_PAUSED); 980 } 981 982 ########################################################################## 983 # Remove paused flag 984 sub SetUnPaused { 985 my($self,$sid) = @_; 986 my $so = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist"); 987 $so->SetSetting('_paused', 0); 988 } 989 990 sub debug { my($self, $msg) = @_; $self->{super}->debug("QueueMGR: ".$msg); } 991 sub info { my($self, $msg) = @_; $self->{super}->info("QueueMGR: ".$msg); } 992 sub warn { my($self, $msg) = @_; $self->{super}->warn("QueueMGR: ".$msg); } 993 sub panic { my($self, $msg) = @_; $self->{super}->panic("QueueMGR: ".$msg); } 994 sub stop { my($self, $msg) = @_; $self->{super}->stop("QueueMGR: ".$msg); } 995 9961; 997 998############################################################################################################### 999# Bitflu Sammelsurium 1000package Bitflu::Tools; 1001 1002 use MIME::Base64 (); 1003 1004 ########################################################################## 1005 # Create new object and try to load a module 1006 # Note: new() this gets called before ::Configuration is ready! 1007 # You can't use fancy stuff such as ->debug. ->stop should work 1008 sub new { 1009 my($class, %args) = @_; 1010 my $self = { super => $args{super}, ns => '', mname => '' }; 1011 bless($self,$class); 1012 1013 foreach my $mname (qw(Digest::SHA Digest::SHA1)) { 1014 my $code = "use $mname; \$self->{ns} = $mname->new; \$self->{mname} = \$mname"; 1015 eval $code; 1016 } 1017 1018 unless($self->{mname}) { 1019 $self->stop("No SHA1-Module found. Bitflu requires 'Digest::SHA' (http://search.cpan.org)"); 1020 } 1021 1022 return $self; 1023 } 1024 1025 sub init { return 1 } 1026 1027 ########################################################################## 1028 # Return hexed sha1 of $buff 1029 sub sha1_hex { 1030 my($self, $buff) = @_; 1031 $self->{ns}->add($buff); 1032 return $self->{ns}->hexdigest; 1033 } 1034 1035 ########################################################################## 1036 # Return sha1 of $buff 1037 sub sha1 { 1038 my($self,$buff) = @_; 1039 $self->{ns}->add($buff); 1040 return $self->{ns}->digest; 1041 } 1042 1043 ########################################################################## 1044 # Encode string into base32 1045 sub encode_b32 { 1046 my($self,$val) = @_; 1047 my $s = unpack("B*",$val); 1048 $s =~ s/(.{5})/000$1/g; # Convert 5-byte-chunks to 8-byte-chunks 1049 my $len = length($s); 1050 my $olen = $len % 8; 1051 1052 if($olen) { 1053 $s = substr($s,0,$len-$olen)."000".substr($s,-1*$olen).("0" x (5 - $olen)); 1054 } 1055 1056 $s = pack("B*",$s); 1057 $s =~ tr/\0-\37/A-Z2-7/; # Octal! 1058 return $s; 1059 } 1060 1061 ########################################################################## 1062 # Decode base32 into string 1063 sub decode_b32 { 1064 my($self,$val) = @_; 1065 my $s = uc($val); 1066 $s =~ tr/A-Z2-7/\0-\37/; 1067 $s = unpack("B*", $s); 1068 $s =~ s/000(.{5})/$1/g; 1069 if( my $olen = -1*(length($s)%8) ) { 1070 $s = substr($s,0,$olen); 1071 } 1072 return pack("B*",$s); 1073 } 1074 1075 ########################################################################## 1076 # Decode base64 into string 1077 sub decode_b64 { 1078 my($self,$val) = @_; 1079 return MIME::Base64::decode($val); 1080 } 1081 1082 1083 ########################################################################## 1084 # Parse a magnet link 1085 sub decode_magnet { 1086 my($self,$uri) = @_; 1087 my $xt = {}; 1088 if($uri =~ /^magnet:\?(.+)$/) { 1089 foreach my $item (split(/&/,$1)) { 1090 if($item =~ /^(([^=\.]+)(\.\d+)?)=(.+)$/) { 1091 my $mk = $2; 1092 my @it = split(/:/,$4); 1093 my $mv = pop(@it); 1094 my $sk = join(':',@it) || ":"; 1095 push(@{$xt->{$mk}}, {$sk => $mv}); 1096 } 1097 } 1098 } 1099 return $xt; 1100 } 1101 1102 sub ExpandRange { 1103 my($self,@a) = @_; 1104 my %dedupe = (); 1105 foreach my $chunk (@a) { 1106 if($chunk =~ /^(\d+)-(\d+)$/) { if($2 <= 0xFFFF) { for($1..$2) { $dedupe{$_} = 1; } } } 1107 elsif($chunk =~ /^(\d+)$/) { $dedupe{abs($1)} = 1; } 1108 } 1109 return \%dedupe; 1110 } 1111 1112 1113 ########################################################################## 1114 # Escape a HTTP-URI-Escaped string 1115 sub UriUnescape { 1116 my($self,$string) = @_; 1117 $string =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg; 1118 return $string; 1119 } 1120 1121 ########################################################################## 1122 # Escape string 1123 sub UriEscape { 1124 my($self,$string) = @_; 1125 $string =~ s/([^A-Za-z0-9\-_.!~*'()\/])/sprintf("%%%02X",ord($1))/eg; 1126 return $string; 1127 } 1128 1129 ########################################################################## 1130 # Converts a CBX into a hashref 1131 sub CBxToRef { 1132 my($self,$buff) = @_; 1133 my $r = undef; 1134 $buff ||= ''; 1135 foreach my $line (split(/\n/,$buff)) { 1136 chomp($line); 1137 if($line =~ /^#/ or $line =~ /^\s*$/) { 1138 next; # Comment or empty line 1139 } 1140 elsif($line =~ /^([a-zA-Z0-9_:\.]+)\s*=\s*(.*)$/) { 1141 $r->{$1} = $2; 1142 } 1143 else { 1144 # Ignore. Can't use panic anyway 1145 } 1146 } 1147 return $r; 1148 } 1149 1150 ########################################################################## 1151 # Convert hashref into CBX 1152 sub RefToCBx { 1153 my($self,$ref) = @_; 1154 my @caller = caller; 1155 my $buff = "# Written by $caller[0]\@$caller[2] on ".gmtime()."\n"; 1156 foreach my $key (sort(keys(%$ref))) { 1157 my $val = $ref->{$key}; 1158 $key =~ tr/a-zA-Z0-9_:\.//cd; 1159 $val =~ tr/\r\n//d; 1160 $buff .= sprintf("%-25s = %s\n",$key, $val); 1161 } 1162 return $buff."# EOF #\n"; 1163 } 1164 1165 ########################################################################## 1166 # Generates a 'find' like dirlist 1167 sub GenDirList { 1168 my($self,$dstruct, $dir) = @_; 1169 push(@{$dstruct->{_}},$dir); 1170 my $pfx = join('/',@{$dstruct->{_}}); 1171 opendir(my $DFH, $pfx); 1172 foreach my $dirent (readdir($DFH)) { 1173 my $fp = "$pfx/".$dirent; 1174 next if $dirent eq '.'; # No thanks 1175 next if $dirent eq '..'; # Ditto 1176 next if (-l $fp); # Won't look at symlinks 1177 push(@{$dstruct->{list}},$fp); 1178 $self->GenDirList($dstruct,$dirent) if -d $fp; 1179 } 1180 closedir($DFH); 1181 pop(@{$dstruct->{_}}); 1182 } 1183 1184 ########################################################################## 1185 # Getopts like support 1186 sub GetOpts { 1187 my($self,$args) = @_; 1188 my @leftovers = (); 1189 my $ctx = undef; 1190 my $argref = {}; 1191 my $getargs = 1; 1192 foreach my $this_arg (@$args) { 1193 1194 if($getargs) { 1195 if($this_arg eq '--') { 1196 $getargs = 0; 1197 } 1198 elsif($this_arg eq '--all') { 1199 my $ql = $self->{super}->Queue->GetQueueList; 1200 foreach my $protocol (keys(%$ql)) { 1201 push(@leftovers, keys(%{$ql->{$protocol}})); 1202 } 1203 } 1204 elsif($this_arg =~ /^--?(.+)/) { 1205 $ctx = $1; 1206 $argref->{$ctx} = 1 if !exists $argref->{$ctx}; 1207 } 1208 elsif(defined($ctx)) { 1209 $argref->{$ctx} = $this_arg; 1210 $ctx = undef; 1211 } 1212 else { 1213 push(@leftovers, $this_arg); 1214 } 1215 } 1216 else { 1217 push(@leftovers, $this_arg); 1218 } 1219 1220 } 1221 @$args = @leftovers; 1222 return $argref; 1223 } 1224 1225 ########################################################################## 1226 # Return exclusive name 1227 sub GetExclusiveDirectory { 1228 my($self,$base,$id) = @_; 1229 my $xname = undef; 1230 foreach my $sfx (0..0xFFFF) { 1231 $xname = $base."/".$id; 1232 $xname .= ".$sfx" if $sfx != 0; 1233 unless(-e $xname) { 1234 return $xname; 1235 } 1236 } 1237 return undef; 1238 } 1239 1240 ########################################################################## 1241 # Return exclusive name for a file 1242 sub GetExclusivePath { 1243 my($self, $basedir) = @_; 1244 my $dest = ''; 1245 my $i = 0; 1246 while(1) { 1247 $dest = sprintf("%s/%x-%x-%x.tmp_$i", $basedir, $$, int(rand(0xFFFFFF)), int(time())); 1248 return $dest if !(-e $dest); 1249 } 1250 return undef; 1251 } 1252 1253 ########################################################################## 1254 # Return path to non-existing file within tempdir 1255 sub GetExclusiveTempfile { 1256 my($self) = @_; 1257 return $self->GetExclusivePath($self->GetTempdir); 1258 } 1259 1260 ########################################################################## 1261 # Return path to non-existing directory within tempdir 1262 sub GetExclusiveTempdir { 1263 my($self,$id) = @_; 1264 1265 $id = time()."_".int(rand(0xFFFF)) unless defined $id; 1266 return $self->GetExclusiveDirectory($self->GetTempdir, $id); 1267 } 1268 1269 1270 ########################################################################## 1271 # Return temp directory 1272 sub GetTempdir { 1273 my($self) = @_; 1274 return $self->{super}->Configuration->GetValue('workdir')."/tmp"; 1275 } 1276 1277 ########################################################################## 1278 # looping sysread implementation 1279 # *BSD doesn't like big LENGTH values on sysread 1280 # This provides a crappy warper to 'fix' this problem 1281 # syswrite() doesn't seem to suffer the same problem ... 1282 sub Sysread { 1283 my($self, $fh, $ref, $bytes_needed) = @_; 1284 1285 my $bytes_left = $bytes_needed; 1286 my $buff = ''; 1287 1288 $self->panic("Cannot read $bytes_needed bytes") if $bytes_needed < 0; 1289 while($bytes_left > 0) { 1290 my $br = sysread($fh, $buff, $bytes_left); 1291 if($br) { ${$ref} .= $buff; $bytes_left -= $br; } # Data 1292 elsif(defined($br)) { last; } # EOF 1293 else { return undef; } # Error 1294 } 1295 return ($bytes_needed-$bytes_left); 1296 } 1297 1298 ######################################################################## 1299 # Decodes Compact IP-Chunks 1300 sub DecodeCompactIp { 1301 my($self, $compact_list) = @_; 1302 my @peers = (); 1303 for(my $i=0;$i<length($compact_list);$i+=6) { 1304 my $chunk = substr($compact_list, $i, 6); 1305 my($a,$b,$c,$d,$port) = unpack("CCCCn", $chunk); 1306 my $ip = "$a.$b.$c.$d"; 1307 push(@peers, {ip=>$ip, port=>$port, peer_id=>""}); 1308 } 1309 return @peers; 1310 } 1311 1312 ######################################################################## 1313 # Decodes IPv6 Chunks 1314 sub DecodeCompactIpV6 { 1315 my($self, $compact_list) = @_; 1316 my @peers = (); 1317 for(my $i=0;$i<length($compact_list);$i+=18) { 1318 my $chunk = substr($compact_list, $i, 18); 1319 my(@sx) = unpack("nnnnnnnnn", $chunk); 1320 my $port = pop(@sx); 1321 my $ip = join(':',map(sprintf("%x", $_),@sx)); # Must match ExpandIpv6, otherwise babies will cry and cats might even die. 1322 push(@peers, {ip=>$ip, port=>$port, peer_id=>""}); 1323 } 1324 return @peers; 1325 } 1326 1327 ################################################################################################ 1328 # Stolen from http://www.stonehenge.com/merlyn/UnixReview/col30.html 1329 sub DeepCopy { 1330 my($self,$this) = @_; 1331 if (not ref $this) { 1332 $this; 1333 } elsif (ref $this eq "ARRAY") { 1334 [map $self->DeepCopy($_), @$this]; 1335 } elsif (ref $this eq "HASH") { 1336 +{map { $_ => $self->DeepCopy($this->{$_}) } keys %$this}; 1337 } else { Carp::confess "what type is $_?" } 1338 } 1339 1340 ################################################################################################ 1341 # Decode bencoded data 1342 sub BencDecode { 1343 return Bitflu::Bencoder::decode($_[1]); 1344 } 1345 1346 ################################################################################################ 1347 # Serialize bencoded data 1348 sub BencEncode { 1349 return Bitflu::Bencoder::encode($_[1]); 1350 } 1351 1352 ################################################################################################ 1353 # Load file from disc and return both raw+decoded data 1354 sub BencfileToHash { 1355 my($self,$file) = @_; 1356 1357 open(BENC, "<", $file) or return {}; 1358 my $buff = ''; 1359 $self->Sysread(*BENC, \$buff, (2**25)); 1360 close(BENC); 1361 return {} if (!defined($buff) or length($buff)==0); # File too short 1362 1363 my $decoded = $self->BencDecode($buff); 1364 return {} if ref($decoded) ne 'HASH'; # Decoding failed 1365 1366 return { content=>$decoded, raw_content=>$buff }; # All ok! 1367 } 1368 1369 ################################################################################################ 1370 # Guess eta for given hash 1371 sub GetETA { 1372 my($self, $key) = @_; 1373 1374 my $so = $self->{super}->Storage->OpenStorage($key) or return undef; 1375 my $created = $so->GetSetting('createdat') or return undef; 1376 my $stats = $self->{super}->Queue->GetStats($key); 1377 my $age = time()-$created; 1378 1379 if($age > 0) { 1380 my $not_done = $stats->{total_bytes} - $stats->{done_bytes}; 1381 my $bps = $stats->{done_bytes} / $age; 1382 $bps = $stats->{speed_download} if $stats->{speed_download} > $bps; # we are optimistic ;-) 1383 my $eta_sec = ( $bps > 0 ? ($not_done / $bps) : undef ); 1384 return $eta_sec; 1385 } 1386 # else 1387 return undef; 1388 } 1389 1390 ################################################################################################ 1391 # Convert number of seconds into something for humans 1392 sub SecondsToHuman { 1393 my($self,$sec) = @_; 1394 1395 if(!defined($sec)) { 1396 return 'inf.'; 1397 } 1398 elsif($sec < 5) { 1399 return '-'; 1400 } 1401 elsif($sec < 60) { 1402 return int($sec)." sec"; 1403 } 1404 elsif($sec < 60*90) { 1405 return int($sec/60)."m"; 1406 } 1407 elsif($sec < 3600*48) { 1408 return sprintf("%.1fh", $sec/3600); 1409 } 1410 elsif($sec < 86400*10) { 1411 return sprintf("%.1fd", $sec/86400); 1412 } 1413 elsif($sec < 86400*31) { 1414 return sprintf("%.1fw", $sec/86400/7); 1415 } 1416 else { 1417 return ">4w" 1418 } 1419 1420 } 1421 1422 sub warn { my($self, $msg) = @_; $self->{super}->warn(ref($self).": ".$msg); } 1423 sub debug { my($self, $msg) = @_; $self->{super}->debug(ref($self).": ".$msg); } 1424 sub stop { my($self, $msg) = @_; $self->{super}->stop(ref($self).": ".$msg); } 1425 14261; 1427 1428 1429############################################################################################################### 1430# Bitflu Admin-Dispatcher : Release 20070319_1 1431package Bitflu::Admin; 1432 1433 ########################################################################## 1434 # Guess what? 1435 sub new { 1436 my($class, %args) = @_; 1437 my $self = {super=> $args{super}, cmdlist => {}, notifylist => {}, complist=>[] }; 1438 bless($self,$class); 1439 return $self; 1440 } 1441 1442 ########################################################################## 1443 # Init plugin 1444 sub init { 1445 my($self) = @_; 1446 $self->RegisterCommand("help", $self, 'admincmd_help' , 'Displays what you are reading now', 1447 [ [undef, "Use 'help' to get a list of all commands"], [undef, "Type 'help command' to get help about 'command'"] ]); 1448 $self->RegisterCommand("plugins", $self, 'admincmd_plugins', 'Displays all loaded plugins'); 1449 $self->RegisterCommand("useradmin",$self, 'admincmd_useradm', 'Create and modify accounts', 1450 [ [undef, "Usage: useradmin [set username password] [delete username] [list]"] ]); 1451 $self->RegisterNotify($self, 'receive_notify'); 1452 $self->RegisterCommand("log", $self, 'admincmd_log', 'Display last log output', 1453 [ [undef, "Usage: log [-limit]"], [undef, 'Example: log -10 # <-- displays the last 10 log entries'] ] ); 1454 return 1; 1455 } 1456 1457 ########################################################################## 1458 # Return logbuffer 1459 sub admincmd_log { 1460 my($self, @args) = @_; 1461 my @A = (); 1462 my @log = @{$self->{super}->{_LogBuff}}; 1463 my $opts = $self->{super}->Tools->GetOpts(\@args); 1464 my $limit = int(((keys(%$opts))[0]) || 0); 1465 my $logsize = int(@log); 1466 my $logat = ( $limit ? ( $logsize - $limit ) : 0 ); 1467 my $i = 0; 1468 1469 foreach my $ll (@log) { 1470 next if $i++ < $logat; 1471 chomp($ll); 1472 push(@A, [undef, $ll]); 1473 } 1474 return({MSG=>\@A, SCRAP=>[]}); 1475 } 1476 1477 ########################################################################## 1478 # Display registered plugins 1479 sub admincmd_plugins { 1480 my($self) = @_; 1481 1482 1483 my @A = ([1, "Known plugins:"]); 1484 1485 foreach my $pref (@{$self->{super}->{_Plugins}}) { 1486 push(@A, [undef, " ".$pref->{package}]); 1487 } 1488 1489 push(@A, [undef,''],[1,"Scheduler jobs:"]); 1490 foreach my $rref (values(%{$self->{super}->{_Runners}})) { 1491 push(@A, [undef, " ".$rref->{target}]); 1492 } 1493 1494 return({MSG=>\@A, SCRAP=>[]}); 1495 } 1496 1497 ########################################################################## 1498 # Notification handler, we are just going to print them out using the logging 1499 sub receive_notify { 1500 my($self,$msg) = @_; 1501 $self->info("#NOTIFICATION#: $msg"); 1502 } 1503 1504 ########################################################################## 1505 # BareBones help 1506 sub admincmd_help { 1507 my($self,$topic) = @_; 1508 my @A = (); 1509 1510 if($topic) { 1511 if(defined($self->GetCommands->{$topic})) { 1512 my @instances = @{$self->GetCommands->{$topic}}; 1513 1514 foreach my $ci (@instances) { 1515 push(@A, [3, "Command '$topic' (Provided by plugin $ci->{class})"]); 1516 if($ci->{longhelp}) { 1517 push(@A, @{$ci->{longhelp}}); 1518 } 1519 else { 1520 push(@A, [undef, $ci->{help}]); 1521 } 1522 push(@A, [undef, '']); 1523 } 1524 } 1525 else { 1526 push(@A, [2, "No help for '$topic', command does not exist"]); 1527 } 1528 } 1529 else { 1530 foreach my $xcmd (sort (keys %{$self->GetCommands})) { 1531 my $lb = sprintf("%-20s", $xcmd); 1532 my @hlps = (); 1533 foreach my $instance (@{$self->GetCommands->{$xcmd}}) { 1534 push(@hlps, "$instance->{help}");; 1535 } 1536 1537 $lb .= join(' / ',@hlps); 1538 1539 push(@A, [undef, $lb]); 1540 } 1541 } 1542 1543 1544 return({MSG=>\@A, SCRAP=>[]}); 1545 } 1546 1547 ########################################################################## 1548 # Handles useradm commands 1549 sub admincmd_useradm { 1550 my($self, @args) = @_; 1551 1552 my @A = (); 1553 my $ERR = ''; 1554 1555 my($cmd,$usr,$pass) = @args; 1556 1557 if($cmd eq 'set' && $pass) { 1558 $self->__useradm_modify(Inject => {User=>$usr, Pass=>$pass}); 1559 push(@A, [1, "Useraccount updated"]); 1560 } 1561 elsif($cmd eq 'delete' && $usr) { 1562 if(defined $self->__useradm_modify->{$usr}) { 1563 # -> Account exists 1564 $self->__useradm_modify(Drop => {User=>$usr}); 1565 push(@A, [1, "Account '$usr' removed"]); 1566 $self->panic("BUG") if (defined $self->__useradm_modify->{$usr}); # Paranoia check 1567 } 1568 else { 1569 push(@A, [2, "Account '$usr' does not exist"]); 1570 } 1571 } 1572 elsif($cmd eq 'list') { 1573 push(@A, [3, "Configured accounts:"]); 1574 foreach my $k (keys(%{$self->__useradm_modify})) { 1575 push(@A, [undef,$k]); 1576 } 1577 } 1578 else { 1579 $ERR .= "Usage error, type 'help useradmin' for more information"; 1580 } 1581 return({MSG=>\@A, SCRAP=>[], NOEXEC=>$ERR}); 1582 } 1583 1584 ########################################################################## 1585 # Create password entry 1586 sub __useradm_mkentry { 1587 my($self,$usr,$pass) = @_; 1588 $usr =~ tr/: ;=//d; 1589 return undef if length($usr) == 0; 1590 return $usr.":".$self->{super}->Tools->sha1_hex("$usr;$pass"); 1591 } 1592 1593 ########################################################################## 1594 # Modify current setting 1595 sub __useradm_modify { 1596 my($self,%args) = @_; 1597 my @result = (); 1598 my $allusr = {}; 1599 my $to_inject = ''; 1600 my $delta = 0; 1601 foreach my $entry (split(/;/,($self->{super}->Configuration->GetValue('useradm') || ''))) { 1602 if(my($user,$hash) = $entry =~ /^([^:]*):(.+)$/) { 1603 if ($user ne ($args{Inject}->{User} || '') && $user ne ($args{Drop}->{User} || '')) { 1604 push(@result,$entry); 1605 } 1606 else { 1607 $delta++; 1608 } 1609 $allusr->{$user} = $entry; 1610 } 1611 else { 1612 $self->warn("Useradmin: Wiping garbage entry: '$entry'"); 1613 } 1614 } 1615 1616 if(exists($args{Inject}->{User})) { 1617 $to_inject = $args{Inject}->{User}; 1618 $to_inject =~ tr/:; //d; 1619 $delta++; 1620 } 1621 1622 if(length($to_inject) > 0) { 1623 push(@result,$self->__useradm_mkentry($to_inject,$args{Inject}->{Pass})); 1624 } 1625 1626 $self->{super}->Configuration->SetValue('useradm', join(';', @result)) if $delta; 1627 return $allusr; 1628 } 1629 1630 1631 ########################################################################## 1632 # Register Notify handler 1633 sub RegisterNotify { 1634 my($self, $xref, $xcmd) = @_; 1635 $self->debug("RegisterNotify: Will notify $xref via $xref->$xcmd"); 1636 $self->{notifylist}->{$xref} = { class => $xref, cmd => $xcmd }; 1637 } 1638 1639 ########################################################################## 1640 # Send out notifications 1641 sub SendNotify { 1642 my($self,$msg) = @_; 1643 foreach my $kx (keys(%{$self->{notifylist}})) { 1644 my $nc = $self->{notifylist}->{$kx}; 1645 my $class = $nc->{class}; my $cmd = $nc->{cmd}; 1646 $class->$cmd($msg); 1647 } 1648 } 1649 1650 ########################################################################## 1651 # Registers a new command to be used with ExecuteCommand 1652 sub RegisterCommand { 1653 my($self,$name,$xref,$xcmd,$helptext,$longhelp) = @_; 1654 $self->debug("RegisterCommand: Hooking $name to $xref->$xcmd"); 1655 push(@{$self->{cmdlist}->{$name}}, {class=>$xref, cmd=>$xcmd, help=>$helptext, longhelp=>$longhelp}); 1656 $helptext or $self->panic("=> $xcmd ; $xref"); 1657 } 1658 1659 ########################################################################## 1660 # Returns the full cmdlist 1661 sub GetCommands { 1662 my($self) = @_; 1663 return $self->{cmdlist}; 1664 } 1665 1666 ########################################################################## 1667 # Register as completion service 1668 sub RegisterCompletion { 1669 my($self,$xref,$xcmd) = @_; 1670 $self->debug("RegisterCompletition: Will ask $xref->$xcmd for completion"); 1671 push(@{$self->{complist}}, { class=>$xref, cmd=>$xcmd }); 1672 } 1673 1674 ########################################################################## 1675 # Callss all completition services and returns the result 1676 sub GetCompletion { 1677 my($self,$hint) = @_; 1678 my @result = (); 1679 foreach my $xref (@{$self->{complist}}) { 1680 my $class = $xref->{class} or $self->panic("$xref has no class!"); 1681 my $cmd = $xref->{cmd} or $self->panic("$xref has no cmd!"); 1682 my @this = $class->$cmd($hint); 1683 push(@result,@this); 1684 } 1685 return @result; 1686 } 1687 1688 ########################################################################## 1689 # Execute a command! 1690 sub ExecuteCommand { 1691 my($self,$command,@args) = @_; 1692 1693 1694 my $plugin_hits = 0; 1695 my $plugin_fails = 0; 1696 my $plugin_ok = 0; 1697 my @plugin_msg = (); 1698 my @plugin_nex = (); 1699 1700 if(ref($self->GetCommands->{$command}) eq "ARRAY") { 1701 foreach my $ref (@{$self->GetCommands->{$command}}) { 1702 $plugin_hits++; 1703 my $class = $ref->{class}; 1704 my $cmd = $ref->{cmd}; 1705 my $bref = $class->$cmd(@args); 1706 my $SCRAP = $bref->{SCRAP} or $self->panic("$class -> $cmd returned no SCRAP"); 1707 my $MSG = $bref->{MSG} or $self->panic("$class -> $cmd returned no MSG"); 1708 my $ERR = $bref->{NOEXEC}; 1709 @args = @$SCRAP; 1710 1711 push(@plugin_msg, @$MSG); 1712 1713 if($ERR) { 1714 push(@plugin_nex, $ERR); # Plugin usage error 1715 } 1716 else { 1717 $plugin_ok++; # Plugin could do something 1718 } 1719 } 1720 } 1721 1722 if($plugin_hits == 0) { 1723 push(@plugin_msg, [2, "Unknown command '$command'"]); 1724 $plugin_fails++; 1725 } 1726 else { 1727 foreach my $leftover (@args) { 1728 push(@plugin_msg, [2, "Failed to execute '$command $leftover'"]); 1729 $plugin_fails++; 1730 } 1731 1732 if($plugin_ok == 0) { 1733 # Nothing executed, display all usage 'hints' 1734 foreach my $xerr (@plugin_nex) { 1735 push(@plugin_msg, [2, $xerr]); 1736 $plugin_fails++; 1737 } 1738 } 1739 } 1740 return({MSG=>\@plugin_msg, FAILS=>$plugin_fails}); 1741 } 1742 1743 ########################################################################## 1744 # Returns TRUE if Authentication was successful (or disabled (= no accounts)) 1745 sub AuthenticateUser { 1746 my($self,%args) = @_; 1747 my $numentry = int(keys(%{$self->__useradm_modify})); 1748 return 1 if $numentry == 0; # No users, no security 1749 1750 my $expect = $self->__useradm_mkentry($args{User}, $args{Pass}); 1751 my $have = $self->__useradm_modify->{$args{User}}; 1752 1753 if(defined($expect) && defined($have) && $have eq $expect ) { 1754 return 1; 1755 } 1756 else { 1757 return 0; 1758 } 1759 } 1760 1761 1762 sub warn { my($self, $msg) = @_; $self->{super}->warn("Admin : ".$msg); } 1763 sub debug { my($self, $msg) = @_; $self->{super}->debug("Admin : ".$msg); } 1764 sub info { my($self, $msg) = @_; $self->{super}->info("Admin : ".$msg); } 1765 sub panic { my($self, $msg) = @_; $self->{super}->panic("Admin : ".$msg); } 1766 17671; 1768 1769 1770############################################################################################################### 1771# Bitflu Network-IO Lib : Release 20090125_1 1772package Bitflu::Network; 1773 1774use strict; 1775use IO::Socket; 1776use POSIX; 1777use Danga::Socket; 1778use Hash::Util; # For chroot 1779 1780use constant DEVNULL => '/dev/null'; # Path to /dev/null 1781use constant MAXONWIRE => 1024*1024; # Do not buffer more than 1mb per client connection 1782use constant BF_BUFSIZ => 327680; # How much we shall read()/recv() from a socket per run 1783use constant NI_SIXHACK => 3; 1784use constant BPS_MIN => 16; # Minimal upload speed per socket 1785use constant NETSTATS => 2; # ReGen netstats each 2 seconds 1786use constant NETDEBUG => 0; 1787use constant BLIST_LIMIT => 1024; # NeverEver blacklist more than 1024 IPs per instance 1788use constant BLIST_TTL => 60*60; # BL entries are valid for 1 hour 1789use constant DNS_BLIST => 5; # How long shall we blacklist 'bad' dns entries (NOTE: DNS_BLIST**rowfail !) 1790use constant DNS_BLTTL => 60; # Purge any older DNS-Blacklist entries 1791use constant MAGIC_DSNUM => 1024*0.75; # Don't ask me why, but 0.75 makes our downspeed guess much better 1792use constant KILL_IPV4 => 0; # 'simulate' non-working ipv4 stack 1793 1794use fields qw( super NOWTIME avfds bpx_dn bpx_up _HANDLES _SOCKETS stagger up_q stats resolver_fail have_ipv6); 1795 1796 ########################################################################## 1797 # Creates a new Networking Object 1798 sub new { 1799 my($class, %args) = @_; 1800 my $ptype = {super=> $args{super}, NOWTIME => 0, avfds => 0, bpx_up=>BPS_MIN, stagger=>{}, up_q=>{}, bpx_dn=>undef, _HANDLES=>{}, _SOCKETS=>{}, 1801 stats => { sent=>0, recv=>0, raw_recv=>0, raw_sent=>0}, resolver_fail=>{}, have_ipv6=>0 }; 1802 1803 my $self = fields::new($class); 1804 map( $self->{$_} = delete($ptype->{$_}), keys(%$ptype) ); 1805 1806 $self->SetTime; # Adds its own Danga::Socket timer 1807 $self->{avfds} = $self->TestFileDescriptors; 1808 $self->debug("Reserved $self->{avfds} file descriptors for networking"); 1809 1810 ## danga socket tests: 1811 # check if we have >= 1.52 .. everything below does not implement ->cancel for timers 1812 eval "use Danga::Socket 1.52; 1;"; 1813 $self->stop("The installed version of Danga::Socket ($Danga::Socket::VERSION) is too old: bitflu will not work with this version.") if $@; 1814 1815 # Check if we can use ipv6 (if requested) 1816 if($self->{super}->Configuration->GetValue('ipv6')) { 1817 eval "use Danga::Socket 1.61; 1; "; # Check if at least 1.61 is installed 1818 if($@) { 1819 $self->warn("Danga::Socket 1.61 is required for IPv6 support."); 1820 $self->warn("Disabling IPv6 due to outdated Danga::Socked version"); 1821 } 1822 else { 1823 eval { 1824 require IO::Socket::INET6; 1825 require Socket6; 1826 $self->{have_ipv6} = 1; 1827 1828 if( (my $isiv = $IO::Socket::INET6::VERSION) < 2.56 ) { 1829 $self->warn("Detected outdated version of IO::Socket::INET6 ($isiv)"); 1830 $self->warn("Please upgrade to IO::Socket::INET6 >= 2.56 !"); 1831 $self->warn("IPv6 might not work correctly with version $isiv"); 1832 } 1833 1834 }; 1835 } 1836 } 1837 1838 if(KILL_IPV4) { 1839 $self->warn("IPv4 support is *DISABLED*"); 1840 } 1841 1842 return $self; 1843 } 1844 1845 ########################################################################## 1846 # Register Admin commands 1847 sub init { 1848 my($self) = @_; 1849 $self->info("IPv6 support is ".($self->HaveIPv6 ? 'enabled' : 'not active')); 1850 1851 $self->{super}->AddRunner($self); 1852 $self->{super}->Admin->RegisterCommand('blacklist', $self, '_Command_Blacklist', 'Display current in-memory blacklist'); 1853 $self->{super}->Admin->RegisterCommand('netstat', $self, '_Command_Netstat', 'Display networking statistics'); 1854 $self->{super}->Admin->RegisterCommand('dig', $self, '_Command_Dig', 'Resolve a hostname'); 1855 $self->{super}->CreateSxTask(Superclass=>$self,Callback=>'_UpdateNetstats', Interval=>NETSTATS, Args=>[]); 1856 return 1; 1857 } 1858 1859 sub run { 1860 my($self) = @_; 1861 1862 1863 my $ucnt = 4; 1864 foreach my $ukey (List::Util::shuffle(keys(%{$self->{up_q}}))) { 1865 my $aref = delete($self->{up_q}->{$ukey}); 1866 $self->_WriteReal(@$aref); 1867 last if --$ucnt < 0; 1868 } 1869 1870 1871 my $ds = undef; 1872 foreach my $val (List::Util::shuffle(values(%{$self->{stagger}}))) { 1873 $ds = $val; 1874 last; 1875 } 1876 1877 if($ds && (!$self->{bpx_dn} or $self->{bpx_dn} > 0) ) { 1878 $self->_TCP_Read($ds) if $ds->sock; 1879 } 1880 1881 return 0; # Cannot use '1' due to deadlock :-) 1882 } 1883 1884 ########################################################################## 1885 # Resolver debug 1886 sub _Command_Dig { 1887 my($self, $hostname) = @_; 1888 my @A = (); 1889 if($hostname) { 1890 push(@A, [1, "Resolver result for '$hostname'"]); 1891 foreach my $entry ($self->Resolve($hostname)) { 1892 push(@A, [0, " $entry"]); 1893 } 1894 } 1895 else { 1896 push(@A, [2, "Usage: dig hostname"]); 1897 } 1898 return({MSG=>\@A, SCRAP=>[]}); 1899 } 1900 1901 ########################################################################## 1902 # Display blacklist information 1903 sub _Command_Blacklist { 1904 my($self) = @_; 1905 1906 my @A = (); 1907 foreach my $this_handle (keys(%{$self->{_HANDLES}})) { 1908 push(@A, [4, "Blacklist for '$this_handle'"]); 1909 my $count = 0; 1910 while( my($k,$v) = each(%{$self->{_HANDLES}->{$this_handle}->{blacklist}->{bldb}}) ) { 1911 my $this_ttl = $v - $self->GetTime; 1912 next if $this_ttl <= 0; 1913 push(@A, [2, sprintf(" %-24s (expires in %d seconds)", $k, $this_ttl) ]); 1914 $count++; 1915 } 1916 push(@A, [3, "$count ip(s) are blacklisted"], [undef, '']); 1917 } 1918 return({MSG=>\@A, SCRAP=>[]}); 1919 } 1920 1921 ########################################################################## 1922 # Netstat command 1923 sub _Command_Netstat { 1924 my($self) = @_; 1925 1926 my @A = (); 1927 my $staggered = int(keys(%{$self->{stagger}})); 1928 my $sock_to_handle = {}; 1929 map($sock_to_handle->{$_}=0 ,keys(%{$self->{_HANDLES}})); 1930 map($sock_to_handle->{$_->{handle}}++, values(%{$self->{_SOCKETS}})); 1931 1932 push(@A, [4, "Socket information"]); 1933 foreach my $this_handle (sort keys(%$sock_to_handle)) { 1934 my $hxref = $self->{_HANDLES}->{$this_handle}; 1935 push(@A, [3, " Statistics for '$this_handle'"]); 1936 push(@A, [1, sprintf(" %-24s : %s", "Free sockets", (exists($hxref->{avpeers}) ? sprintf("%3d",$hxref->{avpeers}) : ' -') ) ]); 1937 push(@A, [1, sprintf(" %-24s : %3d", "Used sockets", $sock_to_handle->{$this_handle}) ]); 1938 } 1939 1940 push(@A, [0, '-' x 60]); 1941 push(@A, [0, sprintf(" >> Total: used=%d / watched=%d / free=%d / staggered=%d",int(keys(%{$self->{_SOCKETS}})), Danga::Socket->WatchedSockets(),$self->{avfds}, $staggered )]); 1942 1943 push(@A, [0,''],[4, "Resolver fail-list"]); 1944 while(my($k,$r) = each(%{$self->{resolver_fail}})) { 1945 push(@A, [2, sprintf(" %-32s : Row-Fails=>%3d, FirstFail=>%s", $k, $r->{rfail}, "".localtime($r->{first_fail}))]); 1946 } 1947 1948 return({MSG=>\@A, SCRAP=>[]}); 1949 } 1950 1951 1952 1953 ########################################################################## 1954 # Test how many filedescriptors this OS / env can handle 1955 sub TestFileDescriptors { 1956 my($self) = @_; 1957 my $i = 0; 1958 my @fdx = (); 1959 my $sysr = 32; # Reserve 32 FDs for OS & co. 1960 my $canhave = 0; 1961 1962 open(FAKE, DEVNULL) or $self->stop("Unable to open ".DEVNULL.": $!"); 1963 close(FAKE); 1964 1965 while($i++ < 2048) { last unless( open($fdx[$i], DEVNULL) ); } 1966 if($i > $sysr) { $canhave = $i - $sysr; } 1967 else { $self->panic("Sorry, bitfu can not run with only $i filedescriptors left"); } 1968 1969 while(--$i > 0) { 1970 close($fdx[$i]) or $self->panic("Unable to close TestFD # $i : $!"); 1971 } 1972 1973 return $canhave; 1974 } 1975 1976 ########################################################################## 1977 # Resolve hostnames 1978 sub Resolve { 1979 my($self,$name) = @_; 1980 my @iplist = (); 1981 1982 if($self->IsValidIPv4($name) or $self->IsValidIPv6($name)) { 1983 # Do not even try to resolve things looking like IPs 1984 push(@iplist,$name); 1985 } 1986 else { 1987 my $NOWTIME = $self->GetTime; # Current time guess 1988 my $blref = $self->{resolver_fail}->{$name}; # Refernce to blacklist entry for this name (can be undef) 1989 my $is_bad = 0; # true = do not try to resolve / false = call resolver 1990 1991 if($blref && ($blref->{first_fail}+(DNS_BLIST**$blref->{rfail})) > $NOWTIME) { 1992 # -> We got an entry and it is still makred as a fail: skip resolver 1993 $self->warn("Resolve: won't resolve blacklisted DNS-Entry '$name'"); 1994 } 1995 else { 1996 if($self->HaveIPv6) { 1997 my @addr_info = Socket6::getaddrinfo($name, defined); 1998 for(my $i=0;$i+3<int(@addr_info);$i+=5) { 1999 my ($addr,undef) = Socket6::getnameinfo($addr_info[$i+3], NI_SIXHACK); 2000 next if KILL_IPV4 && $self->IsNativeIPv4($addr); 2001 push(@iplist,$addr); 2002 } 2003 } 2004 else { 2005 my @result = gethostbyname($name); 2006 @iplist = map{ inet_ntoa($_) } @result[4..$#result]; 2007 } 2008 2009 # Take care of resolver_fail: 2010 if(int(@iplist)==0) { 2011 # -> No A records? -> Mark entry as failed: 2012 $self->{resolver_fail}->{$name} ||= { first_fail=>$NOWTIME, rfail=>0 }; # Creates a new reference if empty... 2013 $self->{resolver_fail}->{$name}->{rfail}++; # Adjust row-fail count (used for timeout calculation) 2014 2015 # purge old entries (FIXME: can we use map?) 2016 while(my($xname,$xref)=each(%{$self->{resolver_fail}})) { 2017 delete($self->{resolver_fail}->{$xname}) if $xref->{first_fail}+DNS_BLTTL < $NOWTIME; 2018 } 2019 2020 } 2021 else { 2022 # -> Lookup was okay: delete fail-entry (if any) 2023 delete($self->{resolver_fail}->{$name}); 2024 } 2025 } 2026 } 2027 2028 return List::Util::shuffle(@iplist); 2029 } 2030 2031 ########################################################################## 2032 # Returns IP sorted by protocol 2033 sub ResolveByProto { 2034 my($self,$name) = @_; 2035 2036 my @iplist = $self->Resolve($name); 2037 my $list = { 4=>[], 6=>[] }; 2038 foreach my $ip (@iplist) { 2039 if($self->IsNativeIPv4($ip)) { push(@{$list->{4}},$ip) } 2040 if($self->IsNativeIPv6($ip)) { push(@{$list->{6}},$ip) } 2041 } 2042 2043 return $list; 2044 } 2045 2046 ########################################################################## 2047 # Emulates getaddrinfo() in ipv6 mode 2048 sub _GetAddrFoo { 2049 my($self,$ip,$port,$af,$rqproto) = @_; 2050 2051 my ($family,$socktype,$proto,$sin) = undef; 2052 2053 $socktype = ($rqproto eq 'udp' ? SOCK_DGRAM : ($rqproto eq 'tcp' ? SOCK_STREAM : $self->panic("Invalid proto: $rqproto")) ); 2054 2055 if($self->HaveIPv6) { 2056 ($family, $socktype, $proto, $sin) = Socket6::getaddrinfo($ip,$port,$af,$socktype); 2057 } 2058 else { 2059 $family = IO::Socket::AF_INET; 2060 # 17 is UDP // 6 is TCP 2061 $proto = ($socktype == (SOCK_DGRAM) ? 17 : ($socktype == (SOCK_STREAM) ? 6 : $self->panic("Invalid socktype: $socktype"))); 2062 eval { $sin = sockaddr_in($port,inet_aton($ip)); }; 2063 } 2064 return($family,$socktype,$proto,$sin); 2065 } 2066 2067 ########################################################################## 2068 # Set O_NONBLOCK on socket 2069 sub Unblock { 2070 my($self, $cfh) = @_; 2071 $self->panic("No filehandle given") unless $cfh; 2072 my $flags = fcntl($cfh, F_GETFL, 0) 2073 or return undef; 2074 fcntl($cfh, F_SETFL, $flags | O_NONBLOCK) 2075 or return undef; 2076 return 1; 2077 } 2078 2079 ########################################################################## 2080 # Returns TRUE if we are running with IPv6 support 2081 sub HaveIPv6 { 2082 return $_[0]->{have_ipv6}; 2083 } 2084 2085 ########################################################################## 2086 # Returns TRUE if we are running with IPv4 support 2087 sub HaveIPv4 { 2088 return (KILL_IPV4 ? 0 : 1); 2089 } 2090 2091 ########################################################################## 2092 # Returns TRUE if given string represents a valid IPv4 peeraddr 2093 sub IsValidIPv4 { 2094 my($self,$str) = @_; 2095 if(defined($str) && $str =~ /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/) { 2096 return 1; 2097 } 2098 return 0; 2099 } 2100 2101 ########################################################################## 2102 # Returns TRUE if given string represents a valid IPv6 peeraddr 2103 sub IsValidIPv6 { 2104 my($self,$str) = @_; 2105 if(defined($str) && $str =~ /^[a-f0-9:]+$/i) { # This is a very BAD regexp.. 2106 return 1; 2107 } 2108 return 0; 2109 } 2110 2111 ########################################################################## 2112 # Convert Pseudo-IPv6 to real IPv4 2113 sub SixToFour { 2114 my($self,$str) = @_; 2115 if($str =~ /^::ffff:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})$/) { 2116 return $1; 2117 } 2118 return undef; 2119 } 2120 2121 ########################################################################## 2122 # Returns TRUE if given IP is a 'real' IPv6 IP 2123 sub IsNativeIPv6 { 2124 my($self,$str) = @_; 2125 2126 if($self->IsValidIPv6($str) && !$self->SixToFour($str)) { 2127 return 1; 2128 } 2129 return 0; 2130 } 2131 2132 ########################################################################## 2133 # Return TRUE if given string seems to be a *native* (non sixto4) ip 2134 sub IsNativeIPv4 { 2135 my($self,$str) = @_; 2136 return 1 if $str =~ /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/; 2137 return 0; 2138 } 2139 2140 2141 ######################################################################## 2142 # 'expands' a shorted IPv6 using sloppy code 2143 sub ExpandIpV6 { 2144 my($self,$ip) = @_; 2145 my $addrlen = 8; 2146 my @buff = (0,0,0,0,0,0,0,0); 2147 my @ipend = (); 2148 my $cnt = 0; 2149 my $drp = undef; 2150 foreach my $item (split(':',$ip)) { 2151 if(!defined $drp) { 2152 ( $item eq '' ? ($drp=$cnt): ($buff[$cnt] = hex($item)) ); 2153 } 2154 else { 2155 push(@ipend, hex($item)); 2156 } 2157 last if ++$cnt == $addrlen; 2158 } 2159 2160 if(defined($drp)) { 2161 # We got some piggyback data: 2162 my $e_len = int(@ipend); # Number of items 2163 my $offset = $addrlen-$e_len; # Offset to use 2164 if($offset >= 0) { 2165 for($offset..($addrlen-1)) { 2166 $buff[$_] = shift(@ipend); 2167 } 2168 } 2169 } 2170 return ( wantarray ? (@buff) : (join(':',map(sprintf("%x",$_),@buff))) ); # Do not change the sprintf() call as it must be consinstent with DecodeCompactIpV6 2171 } 2172 2173 2174 ########################################################################## 2175 # Refresh buffered time 2176 sub SetTime { 2177 my($self) = @_; 2178 my $NOW = time(); 2179 2180 if($NOW > $self->{NOWTIME}) { 2181 $self->{NOWTIME} = $NOW; 2182 $self->{bpx_dn} = ( $self->{super}->Configuration->GetValue('downspeed')*MAGIC_DSNUM || undef ); 2183 } 2184 elsif($NOW < $self->{NOWTIME}) { 2185 $self->warn("Clock jumped backwards! Returning last known good time..."); 2186 } 2187 2188 Danga::Socket->AddTimer(1, sub{$self->SetTime}); 2189 } 2190 2191 ########################################################################## 2192 # Returns buffered time 2193 sub GetTime { 2194 my($self) = @_; 2195 return $self->{NOWTIME}; 2196 } 2197 2198 ########################################################################## 2199 # Returns bandwidth statistics 2200 sub GetStats { 2201 my($self) = @_; 2202 return $self->{stats}; 2203 } 2204 2205 2206 ########################################################################## 2207 # Try to create a new listening socket 2208 # NewTcpListen(ID=>UniqueueRunnerId, Port=>PortToListen, Bind=>IPv4ToBind, Callbacks => {}) 2209 sub NewTcpListen { 2210 my($self,%args) = @_; 2211 2212 my $handle_id = $args{ID} or $self->panic("No Handle ID?"); 2213 my $maxpeers = $args{MaxPeers}; 2214 my $port = $args{Port}; 2215 my $bindto = $args{Bind}; 2216 my $cbacks = $args{Callbacks}; 2217 my $dnth = $args{DownThrottle}; 2218 my $socket = 0; 2219 2220 if(exists($self->{_HANDLES}->{$handle_id})) { 2221 $self->panic("Cannot register multiple versions of handle_id $handle_id"); 2222 } 2223 elsif($maxpeers < 1) { 2224 $self->panic("$handle_id: MaxPeers cannot be < 1 (is: $maxpeers)"); 2225 } 2226 elsif($port) { 2227 my %sargs = (LocalPort=>$port, LocalAddr=>$bindto, Proto=>'tcp', ReuseAddr=>1, Listen=>1024); 2228 $socket = ( $self->HaveIPv6 ? IO::Socket::INET6->new(%sargs) : IO::Socket::INET->new(%sargs) ) or return undef; 2229 } 2230 2231 $self->{_HANDLES}->{$handle_id} = { lsock => $socket, downthrottle=>$dnth, cbacks=>$cbacks, avpeers=>$maxpeers, blacklist=>{pointer=>0,array=>[],bldb=>{}} }; 2232 2233 if($socket) { 2234 my $dsock = Bitflu::Network::Danga->new(sock=>$socket, on_read_ready => sub { $self->_TCP_Accept(shift) } ) or $self->panic; 2235 $self->{_SOCKETS}->{$socket} = { dsock => $dsock, handle=>$handle_id }; 2236 } 2237 2238 return $socket; 2239 } 2240 2241 sub NewUdpListen { 2242 my($self,%args) = @_; 2243 my $handle_id = $args{ID} or $self->panic("No Handle ID?"); 2244 my $port = $args{Port}; 2245 my $bindto = $args{Bind}; 2246 my $cbacks = $args{Callbacks}; 2247 my $socket = 0; 2248 2249 if(exists($self->{_HANDLES}->{$handle_id})) { 2250 $self->panic("Cannot register multiple versions of handle_id $handle_id"); 2251 } 2252 elsif($port) { 2253 my %sargs = (LocalPort=>$port, LocalAddr=>$bindto, Proto=>'udp'); 2254 $socket = ( $self->HaveIPv6 ? IO::Socket::INET6->new(%sargs) : IO::Socket::INET->new(%sargs) ) or return undef; 2255 } 2256 2257 $self->{_HANDLES}->{$handle_id} = { lsock => $socket, cbacks=>$cbacks, blacklist=>{pointer=>0,array=>[],bldb=>{}} }; 2258 2259 if($socket) { 2260 my $dsock = Bitflu::Network::Danga->new(sock=>$socket, on_read_ready => sub { $self->_UDP_Read(shift) } ) or $self->panic; 2261 $self->{_SOCKETS}->{$socket} = { dsock => $dsock, handle=>$handle_id }; 2262 } 2263 2264 return $socket; 2265 } 2266 2267 sub SendUdp { 2268 my($self, $socket, %args) = @_; 2269 my $ip = $args{RemoteIp} or $self->panic("No IP given"); 2270 my $port = $args{Port} or $self->panic("No Port given"); 2271 my $id = $args{ID} or $self->panic("No ID given"); 2272 my $data = $args{Data}; 2273 2274 if(KILL_IPV4 && $self->IsNativeIPv4($ip)) { 2275 $self->debug("udp: will not send data to ipv4 $ip"); 2276 return undef; 2277 } 2278 else { 2279 my @af = $self->_GetAddrFoo($ip,$port,AF_UNSPEC,'udp'); 2280 my $sin = $af[3] or return undef; 2281 my $bs = send($socket,$data,0,$sin); 2282 return $bs; 2283 } 2284 } 2285 2286 sub RemoveSocket { 2287 my($self,$handle_id,$sock) = @_; 2288 $self->debug("RemoveSocket($sock)") if NETDEBUG; 2289 my $sref = delete($self->{_SOCKETS}->{$sock}) or $self->panic("$sock was not registered?!"); 2290 my $hxref= $self->{_HANDLES}->{$handle_id} or $self->panic("No handle reference for $handle_id !"); 2291 delete($self->{up_q}->{$sock}); 2292 delete($self->{stagger}->{$sref->{dsock}}); 2293 $sref->{dsock}->close; 2294 $self->{avfds}++; 2295 $hxref->{avpeers}++; 2296 return 1; 2297 } 2298 2299 sub WriteDataNow { 2300 my($self,$sock,$data) = @_; 2301 return $self->_WriteReal($sock,$data,1); 2302 } 2303 2304 sub WriteData { 2305 my($self,$sock,$data) = @_; 2306 return $self->_WriteReal($sock,$data,0); 2307 } 2308 2309 sub _WriteReal { 2310 my($self,$sock,$data,$fast) = @_; 2311 2312 my $sref = $self->{_SOCKETS}->{$sock} or $self->panic("$sock has no _SOCKET entry!"); 2313 my $this_len = length($data); 2314 2315 2316 if($self->GetQueueLen($sock) > MAXONWIRE) { 2317 $self->warn("Buffer overrun for <$sock>: Too much unsent data!"); 2318 return 1; 2319 } 2320 2321 2322 $sref->{writeq} .= $data; 2323 $sref->{qlen} += $this_len; 2324 2325 if($fast==0 && exists($self->{up_q}->{$sock})) { 2326 # -> Still waiting for a timer 2327 $self->debug("$sock is throttled - waiting for a flush") if NETDEBUG; 2328 } 2329 else { 2330 2331 if($sref->{dsock}->{write_buf_size} == 0 && defined($sref->{dsock}->peer_ip_string)) { # peer_ip_string is needed on FreeBSD 2332 # Socket is empty and ready 2333 my $bpx_up = ($fast? BF_BUFSIZ : $self->{bpx_up}); 2334 my $sendable = ($sref->{qlen} < $bpx_up ? $sref->{qlen} : $bpx_up ); 2335 my $chunk = substr($sref->{writeq},0,$sendable); 2336 $sref->{writeq} = substr($sref->{writeq},$sendable); 2337 $sref->{qlen} -= $sendable; 2338 2339 $self->{stats}->{raw_sent} += $sendable if $fast==0; 2340 2341 # Actually write data: 2342 $sref->{dsock}->write(\$chunk); 2343 2344 $self->debug("$sock has $sref->{qlen} bytes outstanding (sending: $sendable :: $fast ) ") if NETDEBUG; 2345 2346 if(!$sref->{dsock}->sock) { 2347 $self->debug("$sock went away while writing to it ($!) , scheduling kill timer"); 2348 # Fake a 'connection timeout' -> This goes trough the whole kill-chain so it should be save 2349 Danga::Socket->AddTimer(0, sub { $self->_TCP_LazyClose($sref->{dsock},$sock); }); 2350 return 1; # do not add a new timer (wouldn't hurt but it's of no use) 2351 } 2352 2353 } 2354 2355 if($self->GetQueueLen($sock)) { 2356 $self->{up_q}->{$sock} = [$sock,'',$fast]; 2357 } 2358 2359 } 2360 2361 return 1; 2362 } 2363 2364 2365 ########################################################################## 2366 # Returns TRUE if socket is an INCOMING connection 2367 sub IsIncoming { 2368 my($self,$socket) = @_; 2369 my $val = $self->{_SOCKETS}->{$socket}->{incoming}; 2370 $self->panic("$socket has no incoming value!") unless defined($val); 2371 return $val; 2372 } 2373 2374 ########################################################################## 2375 # Returns last IO for given socket 2376 sub GetLastIO { 2377 my($self,$socket) = @_; 2378 my $val = $self->{_SOCKETS}->{$socket}->{lastio}; 2379 $self->panic("$socket has no lastio value!") unless defined($val); 2380 return $val; 2381 } 2382 2383 sub GetQueueFree { 2384 my($self,$sock) = @_; 2385 my $xfree = ((MAXONWIRE)-$self->GetQueueLen($sock)); 2386 return ((MAXONWIRE)-$self->GetQueueLen($sock)); 2387 } 2388 2389 sub GetQueueLen { 2390 my($self,$sock) = @_; 2391 my $sref = $self->{_SOCKETS}->{$sock} or $self->panic("$sock has no _SOCKET entry!"); 2392 return $sref->{dsock}->{write_buf_size}+$sref->{qlen}; 2393 } 2394 2395 sub NewTcpConnection { 2396 my($self,%args) = @_; 2397 2398 my $handle_id = $args{ID} or $self->panic("No Handle ID?"); 2399 my $hxref = $self->{_HANDLES}->{$handle_id} or $self->panic("No Handle reference for $handle_id"); 2400 my $remote_ip = $args{RemoteIp}; 2401 my $port = $args{Port}; 2402 my $new_sock = undef; 2403 2404 if($self->{avfds} < 1) { 2405 return undef; # No more FDs :�-( 2406 } 2407 elsif($hxref->{avpeers} < 1) { 2408 return undef; # Handle is full 2409 } 2410 2411 if(exists($args{Hostname})) { 2412 # -> Resolve 2413 my @xresolved = $self->Resolve($args{Hostname}); 2414 unless( ($remote_ip = $xresolved[0] ) ) { 2415 $self->warn("Cannot resolve $args{Hostname}"); 2416 return undef; 2417 } 2418 } 2419 2420 if($self->IpIsBlacklisted($handle_id,$remote_ip)) { 2421 $self->debug("Won't connect to blacklisted IP $remote_ip"); 2422 return undef; 2423 } 2424 if(KILL_IPV4 && $self->IsNativeIPv4($remote_ip)) { 2425 $self->debug("Will not connect to IPv4 addr $remote_ip"); 2426 return undef; 2427 } 2428 2429 my($sx_family, $sx_socktype, $sx_proto, $sin) = $self->_GetAddrFoo($remote_ip,$port,AF_UNSPEC, 'tcp'); 2430 2431 if(defined($sin) && socket($new_sock, $sx_family, $sx_socktype, $sx_proto) ) { 2432 $self->Unblock($new_sock) or $self->panic("Failed to unblock <$new_sock> : $!"); 2433 connect($new_sock,$sin); 2434 } 2435 else { 2436 $self->warn("Unable to create socket for $remote_ip:$port (error: $!)"); 2437 return undef; 2438 } 2439 my $new_dsock = Bitflu::Network::Danga->new(sock=>$new_sock, on_read_ready => sub { $self->_TCP_Read(shift); }, 2440 on_fclose=> sub { $self->_TCP_LazyClose(shift,shift) } ) or $self->panic; 2441 $self->{_SOCKETS}->{$new_sock} = { dsock => $new_dsock, peerip=>$remote_ip, handle=>$handle_id, incoming=>0, lastio=>$self->GetTime, writeq=>'', qlen=>0 }; 2442 $self->{avfds}--; 2443 $hxref->{avpeers}--; 2444 $self->debug("<< $new_dsock -> $remote_ip ($new_sock)") if NETDEBUG; 2445 Danga::Socket->AddTimer(15, sub { $self->_TCP_LazyClose($new_dsock,$new_sock) }); 2446 2447 return $new_sock; 2448 } 2449 2450 sub _TCP_LazyClose { 2451 my($self,$dsock,$xglob) = @_; 2452 if( (!$dsock->sock or !$dsock->peer_ip_string) && exists($self->{_SOCKETS}->{$xglob} ) ) { 2453 $self->warn("<$xglob> is not connected yet, killing it : ".$dsock->sock) if NETDEBUG; 2454 my $sref = $self->{_SOCKETS}->{$xglob} or $self->panic("<$xglob> is not registered!"); 2455 my $handle_id = $sref->{handle} or $self->panic("$xglob has no handle!"); 2456 my $cbacks = $self->{_HANDLES}->{$handle_id}->{cbacks}; 2457 if(my $cbn = $cbacks->{Close}) { $handle_id->$cbn($xglob); } 2458 $self->RemoveSocket($handle_id,$xglob); 2459 } 2460 } 2461 2462 sub _TCP_Accept { 2463 my($self, $dsock) = @_; 2464 2465 my $new_sock = $dsock->sock->accept; 2466 my $new_ip = 0; 2467 my $handle_id = $self->{_SOCKETS}->{$dsock->sock}->{handle} or $self->panic("No handle id?"); 2468 my $hxref = $self->{_HANDLES}->{$handle_id} or $self->panic("No handle reference for $handle_id"); 2469 my $cbacks = $hxref->{cbacks}; 2470 2471 unless($new_sock) { 2472 $self->warn("accept() call failed?!"); 2473 } 2474 elsif(! ($new_ip = $new_sock->peerhost) ) { 2475 $self->debug("No IP for $new_sock"); 2476 $new_sock->close; 2477 } 2478 elsif( $self->SixToFour($new_ip) && ($new_ip = $self->SixToFour($new_ip)) && 0 ) { 2479 # nil -> convert $new_ip to native ipv4 and continue elsif in any case 2480 } 2481 elsif($self->IpIsBlacklisted($handle_id, $new_ip)) { 2482 $self->debug("Refusing incoming connection from blacklisted ip $new_ip"); 2483 $new_sock->close; 2484 } 2485 elsif(KILL_IPV4 && $self->IsNativeIPv4($new_ip)) { 2486 $self->debug("Dropping new IPv4 connection from $new_ip"); 2487 $new_sock->close; 2488 } 2489 elsif($self->{avfds} < 1) { 2490 $self->warn("running out of filedescriptors, refusing incoming TCP connection"); 2491 $new_sock->close; 2492 } 2493 elsif($hxref->{avpeers} < 1) { 2494 $self->warn("$handle_id : is full: won't accept new peers"); 2495 $new_sock->close; 2496 } 2497 elsif(!$self->Unblock($new_sock)) { 2498 $self->panic("Failed to unblock $new_sock : $!"); 2499 } 2500 else { 2501 my $new_dsock = Bitflu::Network::Danga->new(sock=>$new_sock, on_read_ready => sub { $self->_TCP_Read(shift); }, 2502 on_fclose=> sub { $self->_TCP_LazyClose(shift,shift) } ) or $self->panic; 2503 $self->warn(">> ".$new_dsock->sock." -> ".$new_ip) if NETDEBUG; 2504 $self->{_SOCKETS}->{$new_dsock->sock} = { dsock => $new_dsock, peerip=>$new_ip, handle=>$handle_id, incoming=>1, lastio=>$self->GetTime, writeq=>'', qlen=>0 }; 2505 $self->{avfds}--; 2506 $hxref->{avpeers}--; 2507 if(my $cbn = $cbacks->{Accept}) { $handle_id->$cbn($new_dsock->sock,$new_ip); } 2508 } 2509 } 2510 2511 2512 sub _TCP_Read { 2513 my($self, $dsock) = @_; 2514 2515 my $sref = $self->{_SOCKETS}->{$dsock->sock} or $self->panic("Sock ".$dsock->sock." on $dsock not registered?"); 2516 my $handle_id = $sref->{handle} or $self->panic("No handle id?"); 2517 my $cbacks = $self->{_HANDLES}->{$handle_id}->{cbacks}; 2518 my $dnth = $self->{_HANDLES}->{$handle_id}->{downthrottle}; 2519 my $overflow = ( defined($self->{bpx_dn}) && $self->{bpx_dn} < 1 ? 1 : 0 ); 2520 2521 2522 my $rref = $dsock->read(BF_BUFSIZ); 2523 2524 if(!defined($rref)) { 2525 $self->debug("Reading from $dsock returned nothing, closing! -> ".$dsock->sock); 2526 if(my $cbn = $cbacks->{Close}) { $handle_id->$cbn($dsock->sock); } 2527 $self->RemoveSocket($handle_id,$dsock->sock); 2528 } 2529 else { 2530 my $len = length($$rref); 2531 $sref->{lastio} = $self->GetTime; 2532 $self->{stats}->{raw_recv} += $len; 2533 $self->{bpx_dn} -= $len if defined($self->{bpx_dn}); 2534 $self->debug("RECV $len from ".$dsock->sock) if NETDEBUG; 2535 2536 # add for staggering if downthrottle is set. 2537 # note: must be BEFORE we do the callback! (cb could close the sock!) 2538 if($dnth) { 2539 if(exists($self->{stagger}->{$dsock})) { 2540 if($len==0 or (!$overflow && rand(10) > 7)) { 2541 delete($self->{stagger}->{$dsock}) or $self->panic; 2542 $dsock->watch_read(1); 2543 } 2544 } 2545 elsif($overflow) { 2546 $self->{stagger}->{$dsock} = $dsock; 2547 $dsock->watch_read(0); 2548 } 2549 } 2550 2551 if(my $cbn = $cbacks->{Data}) { $handle_id->$cbn($dsock->sock, $rref, $len); } 2552 } 2553 } 2554 2555 sub _UDP_Read { 2556 my($self,$dsock) = @_; 2557 2558 my $sock = $dsock->sock or $self->panic("No socket?"); 2559 my $sref = $self->{_SOCKETS}->{$sock} or $self->panic("$sock has no _SOCKETS entry!"); 2560 my $handle_id = $sref->{handle} or $self->panic("$sock has no handle in _SOCKETS!"); 2561 my $cbacks = $self->{_HANDLES}->{$handle_id}->{cbacks}; 2562 my $new_ip = ''; 2563 my $new_port = 0; 2564 my $buffer = undef; 2565 2566 $sock->recv($buffer,BF_BUFSIZ); 2567 2568 if(!($new_ip = $sock->peerhost)) { 2569 # Weirdo.. 2570 $self->debug("<$sock> had no peerhost, data dropped"); 2571 } 2572 elsif(!($new_port = $sock->peerport)) { 2573 $self->debug("<$sock> had no peerport, data dropped"); 2574 } 2575 elsif( $self->SixToFour($new_ip) && ($new_ip = $self->SixToFour($new_ip)) && 0 ) { 2576 # nil -> convert $new_ip to native ipv4 and continue elsif in any case 2577 } 2578 elsif(KILL_IPV4 && $self->IsNativeIPv4($new_ip)) { 2579 $self->debug("udp: dropping incoming datagram from $new_ip"); 2580 } 2581 elsif(my $cbn = $cbacks->{Data}) { 2582 $handle_id->$cbn($sock, \$buffer, $new_ip, $new_port); 2583 } 2584 } 2585 2586 sub _UpdateNetstats { 2587 my($self) = @_; 2588 2589 # calculate our current up/download speed 2590 foreach my $kw (qw(sent recv)) { 2591 $self->{stats}->{$kw} = ( ($self->{stats}->{"raw_$kw"}/NETSTATS)*2 + $self->{stats}->{$kw} ) / 3; 2592 $self->{stats}->{"raw_$kw"} = 0; 2593 } 2594 2595 2596 my $want_up = $self->{super}->Configuration->GetValue('upspeed') * 1024; 2597 my $got_up = ($self->{stats}->{sent} || 1); 2598 my $multiply = $want_up / $got_up; 2599 my $up_diff = abs($want_up - $got_up); 2600 2601 if($up_diff < 500) { # good enough - let it be as it is 2602 $multiply = 1; 2603 } 2604 elsif($multiply > 20) { # we are way off - allow doubling our upspeed 2605 $multiply = 2; 2606 } 2607 elsif($multiply > 1.3) { # more or less ok - but don't go up too fast 2608 $multiply = 1.3; 2609 } 2610 2611 $self->debug("got=$got_up, want=$want_up, mp=$multiply, bps=$self->{bpx_up}, diff=$up_diff"); 2612 2613 $self->{bpx_up} = int($self->{bpx_up} * $multiply); 2614 $self->{bpx_up} = BF_BUFSIZ if !$want_up or $self->{bpx_up} > BF_BUFSIZ; 2615 $self->{bpx_up} = BPS_MIN if $self->{bpx_up} < BPS_MIN; 2616 2617 2618 return 1; 2619 } 2620 2621 2622 sub BlacklistIp { 2623 my($self, $handle_id, $this_ip, $ttl) = @_; 2624 2625 my $xbl = $self->{_HANDLES}->{$handle_id}->{blacklist} or $self->panic("$handle_id was not registered!"); 2626 $ttl = BLIST_TTL unless $ttl; 2627 2628 if($self->IsNativeIPv6($this_ip)) { 2629 $this_ip = $self->ExpandIpV6($this_ip); 2630 } 2631 2632 unless($self->IpIsBlacklisted($handle_id, $this_ip)) { 2633 my $pointer = ( $xbl->{pointer} >= BLIST_LIMIT ? 0 : $xbl->{pointer}); 2634 # Ditch old entry 2635 my $oldkey = $xbl->{array}->[$pointer]; 2636 defined($oldkey) and delete($xbl->{bldb}->{$oldkey}); 2637 $xbl->{array}->[$pointer] = $this_ip; 2638 $xbl->{bldb}->{$this_ip} = $self->GetTime + $ttl; 2639 $xbl->{pointer} = 1+$pointer; 2640 } 2641 } 2642 2643 sub IpIsBlacklisted { 2644 my($self, $handle_id, $this_ip) = @_; 2645 2646 my $xbl = $self->{_HANDLES}->{$handle_id}->{blacklist} or $self->panic("$handle_id was not registered!"); 2647 2648 if($self->IsNativeIPv6($this_ip)) { 2649 $this_ip = $self->ExpandIpV6($this_ip); 2650 } 2651 2652 if(exists($xbl->{bldb}->{$this_ip}) && $self->GetTime < $xbl->{bldb}->{$this_ip}) { 2653 return 1; 2654 } 2655 else { 2656 return 0; 2657 } 2658 } 2659 2660 2661 sub debug { my($self, $msg) = @_; $self->{super}->debug("Network : ".$msg); } 2662 sub info { my($self, $msg) = @_; $self->{super}->info("Network : ".$msg); } 2663 sub warn { my($self, $msg) = @_; $self->{super}->warn("Network : ".$msg); } 2664 sub panic { my($self, $msg) = @_; $self->{super}->panic("Network : ".$msg); } 2665 sub stop { my($self, $msg) = @_; $self->{super}->stop("Network : ".$msg); } 26661; 2667 2668############################################################################################################### 2669# Danga-Socket event dispatcher 2670package Bitflu::Network::Danga; 2671 use strict; 2672 use base qw(Danga::Socket); 2673 use fields qw(on_read_ready on_error on_hup on_fclose); 2674 2675 sub new { 2676 my($self,%args) = @_; 2677 $self = fields::new($self) unless ref $self; 2678 $self->SUPER::new($args{sock}); 2679 2680 foreach my $field (qw(on_read_ready on_error on_hup on_fclose)) { 2681 $self->{$field} = $args{$field} if $args{$field}; 2682 } 2683 2684 $self->watch_read(1) if $args{on_read_ready}; # Watch out for read events 2685 return $self; 2686 } 2687 2688 sub event_read { 2689 my($self) = @_; 2690 if(my $cx = $self->{on_read_ready}) { 2691 return $cx->($self); 2692 } 2693 } 2694 2695 sub event_err { 2696 my($self) = @_; 2697 if(my $cx = ($self->{on_error}||$self->{on_read_ready})) { 2698 return $cx->($self); 2699 } 2700 } 2701 2702 sub event_hup { 2703 my($self) = @_; 2704 if(my $cx = ($self->{on_hup}||$self->{on_read_ready})) { 2705 return $cx->($self); 2706 } 2707 } 2708 2709 sub event_write { 2710 my($self) = @_; 2711 my $os = $self->sock; 2712 my $rv = $self->write(undef); 2713# warn "!! ZERO RETURN WHILE WRITING TO $self - sock was $os , sock is ".$self->sock."\n" if !$rv; 2714 if(!$self->sock && (my $cx = ($self->{on_fclose}))) { 2715 return $cx->($self,$os); 2716 } 2717 } 2718 27191; 2720 2721 2722############################################################################################################### 2723# Builtin Configuration parser / class 2724package Bitflu::Configuration; 2725use strict; 2726 2727 2728 sub new { 2729 my($class, %args) = @_; 2730 my $self = { configuration_file => $args{configuration_file}, super=> $args{super} }; 2731 bless($self, $class); 2732 2733 unless(-f $self->{configuration_file}) { 2734 warn("-> Creating configuration file '$self->{configuration_file}'"); 2735 open(CFGH, ">", $self->{configuration_file}) or die("Unable to create $self->{configuration_file}: $!\n"); 2736 close(CFGH); 2737 } 2738 2739 if (-f $self->{configuration_file}) { 2740 open(CFGH, "+<", $self->{configuration_file}) or die("Unable to open $self->{configuration_file} for writing: $!\n"); 2741 $self->{configuration_fh} = *CFGH; 2742 2743 if($< == 0) { 2744 if($self->_IsWritableByNonRoot($self->{configuration_fh})) { 2745 warn "\tWARNING: You started $0 as root, but the configuration\n"; 2746 warn "\tWARNING: file at '$self->{configuration_file}' is WRITABLE by\n"; 2747 warn "\tWARNING: other users! You should fix the permissions of the\n"; 2748 warn "\tWARNING: configuration file and it's directory.\n"; 2749 } 2750 } 2751 2752 # Try to create a backup 2753 if( open(BKUP, ">", $self->{configuration_file}.".backup") ) { 2754 while(<CFGH>) { print BKUP; } 2755 close(BKUP); 2756 } 2757 2758 } 2759 2760 # Load the configuration ASAP to get logging working: 2761 $self->Load; 2762 return $self; 2763 } 2764 2765 sub init { 2766 my($self) = @_; 2767 $self->{super}->Admin->RegisterCommand('config', $self, '_Command_config', 'Configure bitflu while running. Type \'help config\' for more information', 2768 [ [undef, "Usage: config show|get key|set key"], 2769 [undef, "config show : Display contents of .bitflu.config"], 2770 [undef, "config get key : Display value of 'key'. Example: 'config get upspeed'"], 2771 [undef, "config set key foo : Changes value of 'key' to 'foo'. Example: 'config set upspeed 45'"], 2772 [undef, ""], 2773 [1, "NOTE: Certain options (like telnet_port) would require a restart of $0 and cannot be changed"], 2774 [1, " using the 'config' command. To edit such options you'll need to stop bitflu and edit .bitflu.config"], 2775 [1, " using a text editor."], 2776 ] 2777 ); 2778 return 1; 2779 } 2780 2781 sub _Command_config { 2782 my($self,@args) = @_; 2783 my $msg = undef; 2784 my $action = $args[0]; 2785 my $key = $args[1]; 2786 my $value = $args[2]; 2787 my @A = (); 2788 my $NOEXEC = ''; 2789 if($action eq "show") { 2790 foreach my $k (sort keys(%{$self->{conf}})) { 2791 push(@A, [ ($self->IsRuntimeLocked($k) ? 3 : undef), sprintf("%-24s => %s",$k, $self->{conf}->{$k})]); 2792 } 2793 } 2794 elsif($action eq "get" && defined($key)) { 2795 my $xval = $self->GetValue($key); 2796 if(defined($xval)) { 2797 push(@A, [undef, "$key => $xval"]);; 2798 } 2799 else { 2800 push(@A, [2, "$key is not set"]); 2801 } 2802 } 2803 elsif($action eq "set" && defined($value)) { 2804 if(defined($self->GetValue($key))) { 2805 if($self->SetValue($key, $value)) { push(@A, [undef, "'$key' set to '$value'"]); $self->Save; } 2806 else { push(@A, [2, "Unable to change value of $key at runtime"]); } 2807 } 2808 else { 2809 push(@A, [2, "Option '$key' does not exist"]); 2810 } 2811 } 2812 else { 2813 $NOEXEC .= "Usage error, type 'help config' for more information"; 2814 } 2815 return{MSG=>\@A, SCRAP=>[], NOEXEC=>$NOEXEC}; 2816 } 2817 2818 2819 sub Load { 2820 my($self) = @_; 2821 $self->SetDefaults(); 2822 if(defined(my $cfh = $self->{configuration_fh})) { 2823 seek($cfh,0,0) or $self->panic("Unable to seek to beginning"); 2824 my $conf = $self->{super}->Tools->CBxToRef(join('', <$cfh>)); 2825 while(my($k,$v) = each(%$conf)) { 2826 $self->{conf}->{$k} = $v; 2827 } 2828 } 2829 2830 # Remove obsoleted/ignored config settings 2831 foreach my $legacy_setting (qw(telnet_view)) { 2832 delete($self->{conf}->{$legacy_setting}); 2833 } 2834 2835 } 2836 2837 sub SetDefaults { 2838 my($self) = @_; 2839 $self->{conf}->{plugindir} = '/usr/local/share/bitflu/plugins'; 2840 $self->{conf}->{pluginexclude} = ''; 2841 $self->{conf}->{workdir} = "$ENV{HOME}/.bitflu.workdir"; 2842 $self->{conf}->{upspeed} = 35; 2843 $self->{conf}->{downspeed} = 0; 2844 $self->{conf}->{loglevel} = 5; 2845 $self->{conf}->{renice} = 8; 2846 $self->{conf}->{logfile} = ''; 2847 $self->{conf}->{pidfile} = ''; 2848 $self->{conf}->{chdir} = ''; 2849 $self->{conf}->{history} = 1; 2850 $self->{conf}->{ipv6} = 1; 2851 $self->{conf}->{storage} = 'StorageVFS'; 2852 foreach my $opt (qw(ipv6 renice plugindir pluginexclude workdir logfile storage chdir pidfile)) { 2853 $self->RuntimeLockValue($opt); 2854 } 2855 } 2856 2857 sub Save { 2858 my($self) = @_; 2859 my $cfh = $self->{configuration_fh} or return undef; 2860 seek($cfh,0,0) or $self->panic("Unable to seek to beginning"); 2861 truncate($cfh,0) or $self->panic("Unable to truncate configuration file"); 2862 print $cfh $self->{super}->Tools->RefToCBx($self->{conf}); 2863 $cfh->autoflush(1); 2864 } 2865 2866 sub RuntimeLockValue { 2867 my($self,$xkey) = @_; 2868 return($self->{conf_setlock}->{$xkey} = 1); 2869 } 2870 2871 sub IsRuntimeLocked { 2872 my($self, $xkey) = @_; 2873 return(defined($self->{conf_setlock}->{$xkey})); 2874 } 2875 2876 sub GetKeys { 2877 my($self) = @_; 2878 return (sort keys(%{$self->{conf}})); 2879 } 2880 2881 sub GetValue { 2882 my($self,$xkey) = @_; 2883 return($self->{conf}->{$xkey}); 2884 } 2885 2886 sub SetValue { 2887 my($self,$xkey,$xval) = @_; 2888 return undef if defined($self->{conf_setlock}->{$xkey}); 2889 $self->{conf}->{$xkey} = $xval; 2890 $self->Save; 2891 return 1; 2892 } 2893 2894 sub _IsWritableByNonRoot { 2895 my($self,$file) = @_; 2896 my @stat = stat($file) or die "Could not stat '$file' : $!\n"; 2897 return 1 if ($stat[2]&0002 or ($stat[5]!=0 && $stat[2]&0020) or ($stat[4] !=0) ); # uid can always chmod -> bad! 2898 return 0; 2899 } 2900 2901 2902 sub debug { my($self, $msg) = @_; $self->{super}->debug("Config : ".$msg); } 2903 sub info { my($self, $msg) = @_; $self->{super}->info("Config : ".$msg); } 2904 sub warn { my($self, $msg) = @_; $self->{super}->warn("Config : ".$msg); } 2905 sub panic { my($self, $msg) = @_; $self->{super}->panic("Config : ".$msg); } 2906 29071; 2908 2909################################################################################################ 2910# Implements Simple-eXecution Tasks 2911package Bitflu::SxTask; 2912 2913 sub new { 2914 my($classname,%args) = @_; 2915 my $sx = { 2916 __super_=> $args{__SUPER_}, 2917 interval=> (exists($args{Interval}) ? $args{Interval} : 1), 2918 super => $args{Superclass}, 2919 cback => $args{Callback}, 2920 args => $args{Args}, 2921 }; 2922 bless($sx,$classname); 2923 $sx->{_} = \$sx; 2924 return $sx; 2925 } 2926 2927 sub run { 2928 my($self) = @_; 2929 my $cbx = $self->{cback}; 2930 my $rv = $self->{super}->$cbx(@{$self->{args}}); 2931 $self->destroy if $rv == 0; 2932 return $self->{interval}; 2933 } 2934 2935 sub destroy { 2936 my($self) = @_; 2937 $self->{__super_}->DestroySxTask($self); 2938 } 2939 29401; 2941 2942################################################################################################ 2943# Implement os-specific syscalls. 2944package Bitflu::Syscall; 2945 use strict; 2946 use POSIX; 2947 use Config; 2948 2949 sub new { 2950 my($classname, %args) = @_; 2951 2952 my $self = { super=>$args{super}, sc=>{} }; 2953 bless($self,$classname); 2954 return $self; 2955 } 2956 2957 sub init { 2958 my($self) = @_; 2959 # syscall 'prototype' 2960 my $syscalls = { 2961 'linux-x86_64' => { fallocate=>{ NR=>285, pfx=>[0,0] , pst=>[] }, statfs=>{ NR=>137, pack=>'Q', buff=>112, bsize=>1, total=>2, free=>4 } }, 2962 'linux-i386' => { fallocate=>{ NR=>324, pfx=>[0,0,0] , pst=>[0]}, statfs=>{ NR=>99 , pack=>'L', buff=>72, bsize=>1, total=>2, free=>4 } }, 2963 'freebsd-amd64' => { statfs=>{ NR=>396, pack=>'Q', buff=>64, bsize=>2, total=>4, free=>6 } }, 2964 'freebsd-i386' => { statfs=>{ NR=>396, pack=>'Q', buff=>64, bsize=>2, total=>4, free=>6 } }, 2965 }; 2966 2967 # try to detect runtime environment: 2968 my(undef, undef, undef, undef, $arch) = POSIX::uname(); 2969 my $osname = $^O; 2970 my $os_spec = ''; 2971 if ($osname eq "linux" or $osname eq "freebsd") { 2972 $arch = "i386" if $arch =~ /^i[3456]86$/; 2973 $arch = "i386" if ($arch eq 'x86_64' && $Config{ptrsize} == 4); #32bit perl on x86_64 2974 $os_spec = "$osname-$arch"; 2975 } 2976 2977 $self->{sc} = ( $syscalls->{$os_spec} || {} ); 2978 2979 $self->info("supported syscalls of $os_spec: ".( join(" ",keys(%{$self->{sc}})) || '(no syscalls supported!)' )); 2980 2981 return 1; 2982 } 2983 2984 ########################################################################## 2985 # Implements fallocate() syscall 2986 sub fallocate { 2987 my($self, $fh, $size) = @_; 2988 my $rv = undef; 2989 if(my $scr = $self->{sc}->{fallocate}) { 2990 $rv = syscall($scr->{NR},fileno($fh), @{$scr->{pfx}}, $size, @{$scr->{pst}}); 2991 } 2992 return $rv; 2993 } 2994 2995 ########################################################################## 2996 # Implements 'statfs' 2997 sub statfs { 2998 my($self, $path) = @_; 2999 3000 my $rv = undef; 3001 if(my $scr = $self->{sc}->{statfs}) { 3002 my $buff = '0' x $scr->{buff}; 3003 if( syscall($scr->{NR}, $path, $buff) == 0 ) { 3004 my @res = unpack("$scr->{pack}*", $buff); 3005 $rv = { bytes_total=>$res[$scr->{bsize}]*$res[$scr->{total}], bytes_free=>$res[$scr->{bsize}]*$res[$scr->{free}] }; 3006 } 3007 } 3008 return $rv; 3009 } 3010 3011 ########################################################################## 3012 # Shortcut to stat current set 'workdir' 3013 sub statworkdir { 3014 my($self) = @_; 3015 return $self->statfs($self->{super}->Configuration->GetValue('workdir')); 3016 } 3017 3018 3019 sub warn { my($self, $msg) = @_; $self->{super}->warn("Syscall : ".$msg); } 3020 sub debug { my($self, $msg) = @_; $self->{super}->debug("Syscall : ".$msg); } 3021 sub info { my($self, $msg) = @_; $self->{super}->info("Syscall : ".$msg); } 3022 30231; 3024 3025 3026################################################################################################ 3027# Bencoder lib 3028 3029package Bitflu::Bencoder; 3030 use strict; 3031 use constant DEBUG => 0; # debug decoder 3032 use constant MAX_STRSIZE => 1024*1024*5; # max stringsize that the decoder will do 3033 3034 3035 sub decode { 3036 $_ = $_[0]; 3037 3038 my $v = undef; 3039 goto PARSER_ERROR if length($_) == 0; 3040 $v = _decode(); 3041 3042 PARSER_ERROR: 3043 return $v; 3044 } 3045 3046 sub encode { 3047 my($ref) = @_; 3048 Carp::confess("encode(undef) called") unless $ref; 3049 return _encode($ref); 3050 } 3051 3052 3053 3054 sub _encode { 3055 my($ref) = @_; 3056 3057 Carp::cluck() unless defined $ref; 3058 3059 my $encoded = undef; 3060 my $reftype = ref($ref); 3061 3062 if($reftype eq "HASH") { 3063 $encoded .= "d"; 3064 foreach(sort keys(%$ref)) { 3065 $encoded .= length($_).":".$_; 3066 $encoded .= _encode($ref->{$_}); 3067 } 3068 $encoded .= "e"; 3069 } 3070 elsif($reftype eq "ARRAY") { 3071 $encoded .= "l"; 3072 foreach(@$ref) { 3073 $encoded .= _encode($_); 3074 } 3075 $encoded .= "e"; 3076 } 3077 elsif($ref =~ /^-?\d+$/) { 3078 $encoded .= "i".int($ref)."e"; 3079 } 3080 else { 3081 # -> String 3082 $ref = ${$ref} if $reftype eq "SCALAR"; # FORCED string 3083 $encoded .= length($ref).":".$ref; 3084 } 3085 return $encoded; 3086 } 3087 3088 3089 sub _decode { 3090 3091 print "ENTERING DX AT ".pos()." " if DEBUG; 3092 print ">> ".substr($_,pos(),1)."\n" if DEBUG; 3093 3094 if(m/\G\z/gc) { 3095 print "---- HIT EOF! ---\n" if DEBUG; 3096 goto PARSER_ERROR; 3097 } 3098 elsif(m/\G(0|[1-9]\d*):/gc) { # <LEN><:><VALUE> 3099 my $len = $1; 3100 my $pos = pos(); 3101 my $v = ( $len < MAX_STRSIZE ? substr($_, $pos, $len) : '' ); 3102 pos() = $pos+$len; 3103 goto PARSER_ERROR if length($v) != $len; 3104 return $v; 3105 } 3106 elsif(m/\Gd/gc) { #<d><DATA><e> 3107 3108 print "DICT AT ".pos()."\n" if DEBUG; 3109 3110 my $ref = {}; 3111 until(m/\Ge/gc) { 3112 my $k = scalar(_decode()); 3113 my $v = _decode(); 3114 print "+ DICT: $k = $v\n" if DEBUG; 3115 $ref->{$k} = $v; 3116 } 3117 return $ref; 3118 } 3119 elsif(m/\Gl/gc) { #<l><DATA><e> 3120 my @list = (); 3121 print "ARRAY AT ".pos()."\n" if DEBUG; 3122 3123 until(m/\Ge/gc) { 3124 print "+ ARRAY AT ".pos()."\n" if DEBUG; 3125 push(@list, _decode() ); 3126 } 3127 return \@list; 3128 } 3129 elsif(m/\Gi(0|-?[1-9]\d*)e/gc) { #<i><\d+><e> 3130 print "INTEGER AT ".pos()."\n" if DEBUG; 3131 return int($1); 3132 } 3133 else { 3134 print "No match at: ".pos()."\n" if DEBUG; 3135 goto PARSER_ERROR; 3136 } 3137 3138 } 3139 31401; 3141