1#!/usr/local/bin/perl -w
2#
3# This file is part of 'Bitflu' - (C) 2006-2011 Adrian Ulrich
4#
5# Released under the terms of The "Artistic License 2.0".
6# http://www.opensource.org/licenses/artistic-license-2.0.php
7#
8use strict;
9use Data::Dumper;
10use Getopt::Long;
11use Danga::Socket;
12
13my $bitflu_run           = undef;     # Start as not_running and not_killed
14my $getopts              = { help => undef, config => "$ENV{HOME}/.bitflu.config", version => undef, quiet=>undef };
15$SIG{PIPE}  = $SIG{CHLD} = 'IGNORE';
16$SIG{INT}   = $SIG{HUP}  = $SIG{TERM} = \&HandleShutdown;
17
18GetOptions($getopts, "help|h", "version", "plugins", "config=s", "daemon", "quiet|q") or exit 1;
19if($getopts->{help}) {
20	die << "EOF";
21Usage: $0 [--help --version --plugins --config=s --daemon --quiet]
22
23  -h, --help         print this help.
24      --version      display the version of bitflu and exit
25      --plugins      list all loaded plugins and exit
26      --config=file  use specified configuration file (default: .bitflu.config)
27      --daemon       run bitflu as a daemon
28  -q, --quiet        disable logging to standard output
29
30Example: $0 --config=/etc/bitflu.config --daemon
31
32Mail bug reports and suggestions to <adrian\@blinkenlights.ch>.
33EOF
34}
35
36
37# -> Create bitflu object
38my $bitflu = Bitflu->new(configuration_file=>$getopts->{config}) or Carp::confess("Unable to create Bitflu Object");
39if($getopts->{version}) { die $bitflu->_Command_Version->{MSG}->[0]->[1]."\n" }
40
41my @loaded_plugins = $bitflu->LoadPlugins('Bitflu');
42
43if($getopts->{plugins}) {
44	print "# Loaded Plugins: (from ".$bitflu->Configuration->GetValue('plugindir').")\n";
45	foreach (@loaded_plugins) { printf("File %-35s provides: %s\n", $_->{file}, $_->{package}); }
46	exit(0);
47}
48elsif($getopts->{daemon}) {
49	$bitflu->Daemonize();
50}
51elsif($getopts->{quiet}) {
52	$bitflu->DisableConsoleOutput;
53}
54
55$bitflu->SysinitProcess();
56$bitflu->SetupDirectories();
57$bitflu->InitPlugins();
58$bitflu->PreloopInit();
59
60$bitflu_run = 1 if !defined($bitflu_run); # Enable mainloop and sighandler if we are still not_killed
61
62
63
64RunPlugins();
65Danga::Socket->SetPostLoopCallback( sub { return $bitflu_run } );
66Danga::Socket->EventLoop();
67
68
69$bitflu->Storage->terminate;
70$bitflu->info("-> Shutdown completed after running for ".(int(time())-$^T)." seconds");
71exit(0);
72
73
74
75
76
77sub RunPlugins {
78	my $NOW = $bitflu->Network->GetTime;
79	foreach my $rk (keys(%{$bitflu->{_Runners}})) {
80		my $rx = $bitflu->{_Runners}->{$rk} or $bitflu->panic("Runner $rk vanished");
81		next if $rx->{runat} > $NOW;
82		$rx->{runat} = $NOW + $rx->{target}->run($NOW);
83	}
84	if($bitflu_run) {
85		Danga::Socket->AddTimer(0.1, sub { RunPlugins() })
86	}
87}
88
89sub HandleShutdown {
90	my($sig) = @_;
91	if(defined($bitflu_run) && $bitflu_run == 1) {
92		# $bitflu is running, so we can use ->info
93		$bitflu->info("-> Starting shutdown... (signal $sig received)");
94	}
95	else {
96		print "-> Starting shutdown... (signal $sig received), please wait...\n";
97	}
98	$bitflu_run = 0; # set it to not_running and killed
99}
100
101
102
103package Bitflu;
104use strict;
105use Carp;
106use constant V_MAJOR  => '1';
107use constant V_MINOR  => '52';
108use constant V_STABLE => 1;
109use constant V_TYPE   => ( V_STABLE ? 'stable' : 'devel' );
110use constant VERSION  => V_MAJOR.'.'.V_MINOR.'-'.V_TYPE;
111use constant APIVER   => 20120529;
112use constant LOGBUFF  => 0xFF;
113
114	##########################################################################
115	# Create a new Bitflu-'Dispatcher' object
116	sub new {
117		my($class, %args) = @_;
118		my $self = {};
119		bless($self, $class);
120		$self->{_LogFH}                 = *STDOUT;                            # Must be set ASAP
121		$self->{_LogBuff}               = [];                                 # Empty at startup
122
123		$self->{Core}->{"00_Tools"}     = Bitflu::Tools->new(super => $self);   # Tools is also loaded ASAP because ::Configuration needs it
124		$self->{Core}->{"01_Syscall"}   = Bitflu::Syscall->new(super => $self); # -> this should be one of the first core plugin started
125		$self->{Core}->{"10_Admin"}     = Bitflu::Admin->new(super => $self);
126		$self->{Core}->{"20_Config"}    = Bitflu::Configuration->new(super=>$self, configuration_file => $args{configuration_file});
127		$self->{Core}->{"30_Network"}   = Bitflu::Network->new(super => $self);
128		$self->{Core}->{"99_QueueMgr"}  = Bitflu::QueueMgr->new(super => $self);
129
130		$self->{_Runners}               = {};
131		$self->{_Plugins}               = ();
132		return $self;
133	}
134
135	##########################################################################
136	# Return internal version
137	sub GetVersion {
138		my($self) = @_;
139		return(V_MAJOR, V_MINOR, V_STABLE);
140	}
141
142	##########################################################################
143	# Return internal version as string
144	sub GetVersionString {
145		my($self) = @_;
146		return VERSION;
147	}
148
149	##########################################################################
150	# Call hardcoded configuration plugin
151	sub Configuration {
152		my($self) = @_;
153		return $self->{Core}->{"20_Config"};
154	}
155
156	##########################################################################
157	# Call hardcoded tools plugin
158	sub Tools {
159		my($self) = @_;
160		return $self->{Core}->{"00_Tools"};
161	}
162
163	##########################################################################
164	# Call hardcoded Network IO plugin
165	sub Network {
166		my($self) = @_;
167		return $self->{Core}->{"30_Network"};
168	}
169
170	##########################################################################
171	# Call hardcoded Admin plugin
172	sub Admin {
173		my($self) = @_;
174		return $self->{Core}->{"10_Admin"};
175	}
176
177	##########################################################################
178	# Call hardcoded Syscall plugin
179	sub Syscall {
180		my($self) = @_;
181		return $self->{Core}->{"01_Syscall"};
182	}
183
184	##########################################################################
185	# Call hardcoded Queue plugin
186	sub Queue {
187		my($self) = @_;
188		return $self->{Core}->{"99_QueueMgr"};
189	}
190
191	##########################################################################
192	# Call currently loaded storage plugin
193	sub Storage {
194		my($self) = @_;
195		return $self->{Plugin}->{Storage};
196	}
197
198	##########################################################################
199	# Let bitflu run the given target
200	sub AddRunner {
201		my($self,$target) = @_;
202		$self->{_Runners}->{$target} and $self->panic("$target is registered: wont overwrite it");
203		$self->{_Runners}->{$target} = { target=>$target, runat=>0 };
204	}
205
206	##########################################################################
207	# Returns a list of bitflus _Runner array as reference hash
208	sub GetRunnerTarget {
209		my($self,$target) = @_;
210		foreach my $rx (values(%{$self->{_Runners}})) {
211			return $rx->{target} if ref($rx->{target}) eq $target;
212		}
213		return undef;
214	}
215
216	##########################################################################
217	# Creates a new Simple-eXecution task
218	sub CreateSxTask {
219		my($self,%args) = @_;
220		$args{__SUPER_} = $self;
221		my $sx = Bitflu::SxTask->new(%args);
222		$self->AddRunner($sx);
223		#$self->debug("CreateSxTask returns <$sx>");
224		#$self->info("SxTask  : ".ref($sx->{super})."->$sx->{cback} created (id: $sx)");
225		return $sx;
226	}
227
228	##########################################################################
229	# Kills an SxTask
230	sub DestroySxTask {
231		my($self,$taskref) = @_;
232		delete($self->{_Runners}->{$taskref}) or $self->panic("Could not kill non-existing task $taskref");
233		#$self->info("SxTask  : $taskref terminated");
234		return 1;
235	}
236
237	##########################################################################
238	# Register the exclusive storage plugin
239	sub AddStorage {
240		my($self,$target) = @_;
241		if(defined($self->{Plugin}->{Storage})) { $self->panic("Unable to register additional storage driver '$target' !"); }
242		$self->{Plugin}->{Storage} = $target;
243		$self->debug("AddStorage($target)");
244		return 1;
245	}
246
247
248
249	##########################################################################
250	# Loads all plugins from 'plugins' directory but does NOT init them
251	sub LoadPlugins {
252		my($self,$xclass) = @_;
253		#
254		unshift(@INC, $self->Configuration->GetValue('plugindir'));
255
256		my $pdirpath = $self->Configuration->GetValue('plugindir')."/$xclass";
257		my @plugins  = ();
258		my %exclude  = (map { $_ => 1} split(/;/,$self->Configuration->GetValue('pluginexclude')));
259
260		opendir(PLUGINS, $pdirpath) or $self->stop("Unable to read directory '$pdirpath' : $!");
261		foreach my $dirent (sort readdir(PLUGINS)) {
262			next unless my($pfile, $porder, $pmodname) = $dirent =~ /^((\d\d)_(.+)\.pm)$/i;
263
264			if($exclude{$pfile}) {
265				$self->info("Skipping disabled plugin '$pfile -> $pmodname'");
266			}
267			elsif($porder eq '00' && $pmodname ne $self->Configuration->GetValue('storage')) {
268				$self->debug("Skipping unconfigured storage plugin '$dirent'");
269			}
270			else {
271				push(@plugins, {file=>$pfile, order=>$porder, class=>$xclass, modname=>$pmodname, package=>$xclass."::".$3});
272				$self->debug("Found plugin $plugins[-1]->{package} in folder $pdirpath");
273			}
274		}
275		closedir(PLUGINS);
276
277		$self->{_Plugins} = \@plugins;
278
279		foreach my $plugin (@{$self->{_Plugins}}) {
280			my $fname = $plugin->{class}."/".$plugin->{file};
281			$self->debug("Loading $fname");
282			eval { require $fname; };
283			if($@) {
284				my $perr = $@; chomp($perr);
285				$self->yell("Unable to load plugin '$fname', error was: '$perr'");
286				$self->stop(" -> Please fix or remove this broken plugin file from $pdirpath");
287			}
288			my $this_apiversion = $plugin->{package}->_BITFLU_APIVERSION;
289			if($this_apiversion != APIVER) {
290				$self->yell("Plugin '$fname' has an invalid API-Version ( (\$apivers = $this_apiversion) != (\$expected = ".APIVER.") )");
291				$self->yell("HINT: Maybe you forgot to replace the plugins at $pdirpath while upgrading bitflu?!...");
292				$self->stop("-> Exiting due to APIVER mismatch");
293			}
294		}
295		return @plugins;
296	}
297
298	##########################################################################
299	# Startup all plugins
300	sub InitPlugins {
301		my($self) = @_;
302
303		my @TO_INIT = ();
304		foreach my $plugin (@{$self->{_Plugins}}) {
305			$self->debug("Registering '$plugin->{package}'");
306			my $this_plugin = $plugin->{package}->register($self) or $self->panic("Regsitering '$plugin' failed, aborting");
307			push(@TO_INIT, {name=>$plugin->{package}, ref=>$this_plugin});
308		}
309		foreach my $toinit (@TO_INIT) {
310			$self->debug("++ Starting Normal-Plugin '$toinit->{name}'");
311			$toinit->{ref}->init() or $self->panic("Unable to init plugin : $!");
312		}
313		foreach my $coreplug (sort keys(%{$self->{Core}})) {
314			$self->debug("++ Starting Core-Plugin '$coreplug'");
315			$self->{Core}->{$coreplug}->init() or $self->panic("Unable to init Core-Plugin : $!");
316		}
317	}
318
319	##########################################################################
320	# Build some basic directory structure
321	sub SetupDirectories {
322		my($self) =@_;
323		my $workdir = $self->Configuration->GetValue('workdir') or $self->panic("No workdir configured");
324		my $tmpdir  = $self->Tools->GetTempdir                  or $self->panic("No tempdir configured");
325		foreach my $this_dir ($workdir, $tmpdir) {
326			unless(-d $this_dir) {
327				$self->debug("mkdir($this_dir)");
328				mkdir($this_dir) or $self->stop("Unable to create directory '$this_dir' : $!");
329			}
330		}
331	}
332
333	##########################################################################
334	# Change nice level, chroot and drop privileges
335	sub SysinitProcess {
336		my($self) = @_;
337		my $chroot = $self->Configuration->GetValue('chroot');
338		my $chdir  = $self->Configuration->GetValue('chdir');
339		my $uid    = int($self->Configuration->GetValue('runas_uid') || 0);
340		my $gid    = int($self->Configuration->GetValue('runas_gid') || 0);
341		my $renice = int($self->Configuration->GetValue('renice')    || 0);
342		my $outlog = ($self->Configuration->GetValue('logfile')      || '');
343		my $pidfile= ($self->Configuration->GetValue('pidfile')      || '');
344
345		if(length($outlog)) {
346			open(LFH, ">>", $outlog) or $self->stop("Cannot write to logfile '$outlog' : $!");
347			$self->{_LogFH} = *LFH;
348			$self->{_LogFH}->autoflush(1);
349			$self->yell("Logging to '$outlog'");
350		}
351
352		if(length($pidfile)) {
353			$self->info("Writing pidfile at '$pidfile'");
354			open(PIDFILE, ">", $pidfile) or $self->stop("Cannot write to pidfile '$pidfile' : $!");
355			print PIDFILE "$$\n";
356			close(PIDFILE);
357		}
358
359
360		# Lock values because we cannot change them after we finished
361		foreach my $lockme (qw(runas_uid runas_gid chroot)) {
362			$self->Configuration->RuntimeLockValue($lockme);
363		}
364
365		# -> Adjust resolver settings (is this portable? does it work on *BSD?)
366		$ENV{RES_OPTIONS} = "timeout:1 attempts:1";
367
368
369		# -> Set niceness (This is done before dropping root to get negative values working)
370		if($renice) {
371			$renice = ($renice > 19 ? 19 : ($renice < -20 ? -20 : $renice) ); # Stop funny stuff...
372			$self->info("Setting my own niceness to $renice");
373			POSIX::nice($renice) or $self->warn("nice($renice) failed: $!");
374		}
375
376		# -> Chroot
377		if(defined($chroot)) {
378			$self->info("Chrooting into '$chroot'");
379
380			Carp::longmess("FULLY_LOADING_CARP");            # init CARP module
381			my $x = gethostbyname('localhost.localdomain');  # init DNS resolver
382
383			chdir($chroot)  or $self->panic("Cannot change into directory '$chroot' : $!");
384			chroot($chroot) or $self->panic("Cannot chroot into directory '$chroot' (are you root?) : $!");
385			chdir('/')      or $self->panic("Unable to change into new chroot topdir: $!");
386		}
387
388
389		# -> Drop group privileges
390		if($gid) {
391			$self->info("Changing gid to $gid");
392
393			$) = "$gid $gid"; # can fail with 'no such file or directory'
394
395			$! = undef;
396			$( = "$gid";
397			$self->panic("Unable to set GID: $!")  if $!;
398		}
399
400		# -> Drop user privileges
401		if($uid) {
402			$self->info("Changing uid to $uid");
403			POSIX::setuid($uid) or $self->panic("Unable to change UID: $!");
404		}
405
406		# -> Check if we are still root. We shouldn't.
407		if($> == 0 or $) == 0 or $( == 0) {
408			$self->warn("Refusing to run with root privileges. Do not start $0 as root unless you are using");
409			$self->warn("the chroot option. In this case you must also specify the options runas_uid & runas_gid");
410			$self->stop("Bitflu refuses to run as root");
411		}
412
413		if($chdir) {
414			$self->info("Changing into directory '$chdir'");
415			chdir($chdir) or $self->stop("chdir($chdir) failed: $!");
416		}
417
418		$self->info("$0 is running with pid $$ ; uid = ($>|$<) / gid = ($)|$()");
419	}
420
421
422	##########################################################################
423	# This should get called after starting the mainloop
424	# The subroutine does the same as a 'init' in a plugin
425	sub PreloopInit {
426		my($self) = @_;
427		$self->Admin->RegisterCommand('die'      , $self, '_Command_Shutdown'     , 'Terminates bitflu');
428		$self->Admin->RegisterCommand('version'  , $self, '_Command_Version'      , 'Displays bitflu version string');
429		$self->Admin->RegisterCommand('date'     , $self, '_Command_Date'         , 'Displays current time and date');
430	}
431
432	##########################################################################
433	# Set _LogFh to undef if we are logging to stdout
434	# this disables logging to the console
435	sub DisableConsoleOutput {
436		my($self) = @_;
437		$self->debug("DisableConsoleOutput called");
438		if($self->{_LogFH} eq *STDOUT) {
439			$self->debug("=> Setting _LogFH to undef");
440			$self->{_LogFH} = '';
441		}
442		# Do not printout any warnings to STDOUT
443		$SIG{__WARN__} = sub {};
444	}
445
446	##########################################################################
447	# Fork into background
448	sub Daemonize {
449		my($self) = @_;
450		my $child = fork();
451
452		if(!defined($child)) {
453			die "Unable to fork: $!\n";
454		}
455		elsif($child != 0) {
456			$self->debug("Bitflu is running with pid $child");
457			exit(0);
458		}
459
460		$self->DisableConsoleOutput;
461	}
462
463	##########################################################################
464	# bye!
465	sub _Command_Shutdown {
466		my($self) = @_;
467		kill(2,$$);
468		return {MSG=>[ [1, "Shutting down $0 (with pid $$)"] ], SCRAP=>[]};
469	}
470
471	##########################################################################
472	# Return version string
473	sub _Command_Version {
474		my($self) = @_;
475		my $uptime = ($self->Network->GetTime - $^T)/60;
476		return {MSG=>[ [1, sprintf("This is Bitflu %s (API:%s) running on %s with perl %vd. Uptime: %.3f minutes (%s)",$self->GetVersionString,
477		                                         APIVER, $^O, $^V, $uptime, "".localtime($^T) )] ], SCRAP=>[]};
478	}
479
480	##########################################################################
481	# Return version string
482	sub _Command_Date {
483		my($self) = @_;
484		return {MSG=>[ [1, "".localtime()] ], SCRAP=>[]};
485	}
486
487	##########################################################################
488	# Printout logmessage
489	sub _xlog {
490		my($self, $msg, $force_stdout) = @_;
491		my $rmsg  = localtime()." # $msg\n";
492		my $xfh   = $self->{_LogFH};
493		my $lbuff = $self->{_LogBuff};
494
495		print $xfh $rmsg if $xfh;
496
497		if($force_stdout && $xfh ne *STDOUT) {
498			print STDOUT $rmsg;
499		}
500
501		push(@$lbuff, $rmsg);
502		shift(@$lbuff) if int(@$lbuff) >= LOGBUFF;
503	}
504
505	sub info  { my($self,$msg) = @_; return if $self->Configuration->GetValue('loglevel') < 4;  $self->_xlog($msg);                 }
506	sub debug { my($self,$msg) = @_; return if $self->Configuration->GetValue('loglevel') < 10; $self->_xlog(" ** DEBUG **  $msg"); }
507	sub warn  { my($self,$msg) = @_; return if $self->Configuration->GetValue('loglevel') < 2;  $self->_xlog("** WARNING ** $msg"); }
508	sub yell  { my($self,$msg) = @_; $self->_xlog($msg,1);                                                                          }
509	sub stop  { my($self,$msg) = @_; $self->yell("EXITING # $msg"); exit(1); }
510	sub panic {
511		my($self,$msg) = @_;
512		$self->yell("--------- BITFLU SOMEHOW MANAGED TO CRASH ITSELF; PANIC MESSAGE: ---------");
513		$self->yell($msg);
514		$self->yell("--------- BACKTRACE START ---------");
515		$self->yell(Carp::longmess());
516		$self->yell("---------- BACKTRACE END ----------");
517
518		$self->yell("SHA1-Module used : ".$self->Tools->{mname});
519		$self->yell("Perl Version     : ".sprintf("%vd", $^V));
520		$self->yell("Perl Execname    : ".$^X);
521		$self->yell("Bitflu Version   : ".$self->GetVersionString);
522		$self->yell("OS-Name          : ".$^O);
523		$self->yell("IPv6 ?           : ".$self->Network->HaveIPv6);
524		$self->yell("Danga::Socket    : ".$Danga::Socket::VERSION);
525		$self->yell("Running since    : ".gmtime($^T));
526		$self->yell("---------- LOADED PLUGINS ---------");
527		foreach my $plug (@{$self->{_Plugins}}) {
528			$self->yell(sprintf("%-32s -> %s",$plug->{file}, $plug->{package}));
529		}
530		$self->yell("##################################");
531		exit(1);
532	}
533
534
5351;
536
537
538####################################################################################################################################################
539####################################################################################################################################################
540# Bitflu Queue manager
541#
542package Bitflu::QueueMgr;
543
544use constant SHALEN           => 40;
545use constant HPFX             => 'history_';
546use constant HIST_MAX         => 100;
547use constant STATE_PAUSED     => 1;
548use constant STATE_AUTOPAUSED => 2;
549
550	sub new {
551		my($class, %args) = @_;
552		my $self = {super=> $args{super}};
553		bless($self,$class);
554		return $self;
555	}
556
557	##########################################################################
558	# Inits plugin: This resumes all found storage items
559	sub init {
560		my($self) = @_;
561		my $queueIds = $self->{super}->Storage->GetStorageItems();
562		my $toload   = int(@$queueIds);
563		$self->info("Resuming $toload downloads, this may take a few seconds...");
564
565		foreach my $sid (@$queueIds) {
566			my $this_storage = $self->{super}->Storage->OpenStorage($sid) or $self->panic("Unable to open storage for sid $sid");
567			my $owner        = $this_storage->GetSetting('owner');
568
569			$self->info(sprintf("[%3d] Loading %s", $toload--, $sid));
570
571			if(defined($owner) && (my $r_target = $self->{super}->GetRunnerTarget($owner)) ) {
572				$r_target->resume_this($sid);
573			}
574			else {
575				$self->stop("StorageObject $sid is owned by '$owner', but plugin is not loaded/registered correctly");
576			}
577		}
578
579		$self->{super}->Admin->RegisterCommand('rename'  , $self, 'admincmd_rename', 'Renames a download',
580		         [ [undef, "Renames a download"], [undef, "Usage: rename queue_id \"New Name\""] ]);
581		$self->{super}->Admin->RegisterCommand('cancel'  , $self, 'admincmd_cancel', 'Removes a file from the download queue',
582		         [ [undef, "Removes a file from the download queue. Use --wipe to *remove* data of completed downloads"], [undef, "Usage: cancel [--wipe] queue_id [queue_id2 ...]"] ]);
583
584		$self->{super}->Admin->RegisterCommand('history' , $self, 'admincmd_history', 'Manages download history',
585		        [  [undef, "Manages internal download history"], [undef, ''],
586		           [undef, "Usage: history [ queue_id [show forget] ] [list|drop|cleanup]"], [undef, ''],
587		           [undef, "history list            : List all remembered downloads"],
588		           [undef, "history drop            : List and forget all remembered downloads"],
589		           [undef, "history cleanup         : Trim history size"],
590		           [undef, "history queue_id show   : Shows details about queue_id"],
591		           [undef, "history queue_id forget : Removes history of queue_id"],
592		        ]);
593
594		$self->{super}->Admin->RegisterCommand('pause' , $self, 'admincmd_pause', 'Stops a download',
595		        [  [undef, "Stop given download"], [undef, ''],
596		           [undef, "Usage: pause queue_id [queue_id2 ... | --all]"], [undef, ''],
597		        ]);
598
599		$self->{super}->Admin->RegisterCommand('resume' , $self, 'admincmd_resume', 'Resumes a paused download',
600		        [  [undef, "Resumes a paused download"], [undef, ''],
601		           [undef, "Usage: resume queue_id [queue_id2 ... | --all]"], [undef, ''],
602		        ]);
603
604		$self->info("--- startup completed: bitflu ".$self->{super}->GetVersionString." is ready ---");
605		return 1;
606	}
607
608
609	##########################################################################
610	# Pauses a download
611	sub admincmd_pause {
612		my($self, @args) = @_;
613
614		my @MSG    = ();
615		my $NOEXEC = '';
616		$self->{super}->Tools->GetOpts(\@args);
617
618		if(int(@args)) {
619			foreach my $cid (@args) {
620				my $so = $self->{super}->Storage->OpenStorage($cid);
621				if($so) {
622					$self->SetPaused($cid);
623					push(@MSG, [1, "$cid: download paused"]);
624				}
625				else {
626					push(@MSG, [2, "$cid: does not exist in queue, cannot pause"]);
627				}
628			}
629		}
630		else {
631			$NOEXEC .= 'Usage: pause queue_id';
632		}
633		return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC});
634	}
635
636	##########################################################################
637	# Resumes a download
638	sub admincmd_resume {
639		my($self, @args) = @_;
640
641		my @MSG    = ();
642		my $NOEXEC = '';
643		$self->{super}->Tools->GetOpts(\@args);
644
645		if(int(@args)) {
646			foreach my $cid (@args) {
647				my $so = $self->{super}->Storage->OpenStorage($cid);
648				if($so) {
649					$self->SetUnPaused($cid);
650					push(@MSG, [1, "$cid: download resumed"]);
651				}
652				else {
653					push(@MSG, [2, "$cid: does not exist in queue, cannot resume"]);
654				}
655			}
656		}
657		else {
658			$NOEXEC .= 'Usage: resume queue_id';
659		}
660		return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC});
661	}
662
663
664	##########################################################################
665	# Cancel a queue item
666	sub admincmd_cancel {
667		my($self, @args) = @_;
668
669		my @MSG     = ();
670		my $NOEXEC  = '';
671		my $do_wipe = 0;
672
673		if(int(@args)) {
674			if($args[0] eq '--wipe') {
675				$do_wipe = 1;
676				shift(@args); # remove first parameter
677			}
678
679			foreach my $cid (@args) {
680				my $storage = $self->{super}->Storage->OpenStorage($cid);
681				if($storage) {
682					my $owner = $storage->GetSetting('owner');
683					if(defined($owner) && (my $r_target = $self->{super}->GetRunnerTarget($owner)) ) {
684						$self->ModifyHistory($cid, Canceled=>'');
685						$storage->SetSetting('wipedata', 1) if $do_wipe;
686						$r_target->cancel_this($cid);
687						push(@MSG, [1, "'$cid' canceled"]);
688					}
689					else {
690						$self->panic("'$cid' has no owner, cannot cancel!");
691					}
692				}
693				else {
694					push(@MSG, [2, "'$cid' not removed from queue: No such item"]);
695				}
696			}
697		}
698		else {
699			$NOEXEC .= 'Usage: cancel [--wipe] queue_id [queue_id2 ...]';
700		}
701
702
703		return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC});
704	}
705
706	##########################################################################
707	# Rename a queue item
708	sub admincmd_rename {
709		my($self, @args) = @_;
710
711		my $sha    = $args[0];
712		my $name   = $args[1];
713		my @MSG    = ();
714		my $NOEXEC = '';
715
716		if(!defined($name)) {
717			$NOEXEC .= "Usage: rename queue_id \"New Name\"";
718		}
719		elsif(my $storage = $self->{super}->Storage->OpenStorage($sha)) {
720			$storage->SetSetting('name', $name);
721			push(@MSG, [1, "Renamed $sha into '$name'"]);
722		}
723		else {
724			push(@MSG, [2, "Unable to rename $sha: queue_id does not exist"]);
725		}
726		return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC});
727	}
728
729	##########################################################################
730	# Manages download history
731	sub admincmd_history {
732		my($self,@args) = @_;
733
734		my $sha = ($args[0] || '');
735		my $cmd = ($args[1] || '');
736		my @MSG    = ();
737		my $NOEXEC = '';
738		my $hpfx   = HPFX;
739		my $hkey   = $hpfx.$sha;
740		my $strg   = $self->{super}->Storage;
741
742		if($sha eq 'list' or $sha eq 'drop' or $sha eq 'cleanup') {
743			my @cbl    = $strg->ClipboardList;
744			my $cbi    = 0;
745			my $drop   = {};
746			my $drop_c = 0;
747
748			foreach my $item (@cbl) {
749				if(my($this_sid) = $item =~ /^$hpfx(.+)$/) {
750					my $hr = $self->GetHistory($this_sid);      # History Reference
751					my $ll = "$1 : ".substr(($hr->{Name}||''),0,64);  # Telnet-Safe-Name
752					push(@MSG, [ ($strg->OpenStorage($this_sid) ? 1 : 5 ), $ll]);
753					$strg->ClipboardRemove($item)                     if $sha eq 'drop';
754					push(@{$drop->{($hr->{FirstSeen}||0)}},$this_sid) if $sha eq 'cleanup'; # Create list with possible items to delete
755					$cbi++;
756				}
757			}
758
759
760			# Walk droplist (if any). From oldest to newest
761			foreach my $d_sid ( map(@{$drop->{$_}}, sort({$a<=>$b} keys(%$drop))) ) {
762				next if $strg->OpenStorage($d_sid);      # do not even try to drop existing items (wouldn't do much harm but...)
763				last if ( $cbi-$drop_c++ ) <= HIST_MAX;  # Abort if we reached our limit
764				$strg->ClipboardRemove(HPFX.$d_sid);     # Still here ? -> ditch it. (fixme: HPFX concating is ugly)
765			}
766
767
768			push(@MSG, [1, ($sha eq 'drop' ? "History cleared" : "$cbi item".($cbi == 1 ? '' : 's')." stored in history")]);
769		}
770		elsif(length($sha)) {
771			if(my $ref = $self->GetHistory($sha)) {
772				if($cmd eq 'show') {
773					foreach my $k (sort keys(%$ref)) {
774						push(@MSG,[1, sprintf("%20s -> %s",$k,$ref->{$k})]);
775					}
776				}
777				elsif($cmd eq 'forget') {
778					$strg->ClipboardRemove($hkey);
779					push(@MSG, [1, "history for $sha has been removed"]);
780				}
781				else {
782					push(@MSG, [2, "unknown subcommand, see 'help history'"]);
783				}
784			}
785			else {
786				push(@MSG, [2,"queue item $sha has no history"]);
787			}
788		}
789		else {
790			push(@MSG, [2,"See 'help history'"]);
791		}
792
793		return({MSG=>\@MSG, SCRAP=>[], NOEXEC=>$NOEXEC});
794	}
795
796	##########################################################################
797	# Add a new item to queue (Also creates a new storage)
798	sub AddItem {
799		my($self, %args) = @_;
800
801		my $name    = $args{Name};
802		my $chunks  = $args{Chunks} or $self->panic("No chunks?!");
803		my $size    = $args{Size};
804		my $overst  = $args{Overshoot};
805		my $flayout = $args{FileLayout} or $self->panic("FileLayout missing");
806		my $shaname = ($args{ShaName} || unpack("H*", $self->{super}->Tools->sha1($name)));
807		my $owner   = ref($args{Owner}) or $self->panic("No owner?");
808		my $sobj    = 0;
809		my $history = $self->{super}->Configuration->GetValue('history');
810
811		if($size == 0 && $chunks != 1) {
812			$self->panic("Sorry: You can not create a dynamic storage with multiple chunks ($chunks != 1)");
813		}
814		if(!defined($name)) {
815			$self->panic("AddItem needs a name!");
816		}
817		if(length($shaname) != SHALEN) {
818			$self->panic("Invalid shaname: $shaname");
819		}
820
821
822		if($self->{super}->Storage->OpenStorage($shaname)) {
823			$@ = "$shaname: item exists in queue";
824		}
825		elsif($history && $self->GetHistory($shaname)) {
826			$@ = "$shaname: has already been downloaded. Use 'history $shaname forget' if you want do re-download it";
827			$self->warn($@);
828		}
829		elsif($sobj = $self->{super}->Storage->CreateStorage(StorageId => $shaname, Size=>$size, Chunks=>$chunks, Overshoot=>$overst, FileLayout=>$flayout)) {
830			$sobj->SetSetting('owner', $owner);
831			$sobj->SetSetting('name' , $name);
832			$sobj->SetSetting('createdat', $self->{super}->Network->GetTime);
833			if($history) {
834				$self->ModifyHistory($shaname, Name=>$name, Canceled=>'never', Started=>'',
835				                               Ended=>'never', Committed=>'never', FirstSeen=>$self->{super}->Network->GetTime);
836			}
837		}
838		else {
839			$self->panic("CreateStorage for $shaname failed");
840		}
841
842		return $sobj;
843	}
844
845	##########################################################################
846	# Removes an item from the queue + storage
847	sub RemoveItem {
848		my($self,$sid) = @_;
849		my $ret = $self->{super}->Storage->RemoveStorage($sid);
850		if(!$ret) {
851			$self->panic("Unable to remove storage-object $sid : $!");
852		}
853
854		delete($self->{statistics}->{$sid}) or $self->panic("Cannot remove non-existing statistics for $sid");
855		return 1;
856	}
857
858	##########################################################################
859	# Updates/creates on-disk history of given sid
860	# Note: Strings with length == 0 are replaced with the current time. Awkward.
861	sub ModifyHistory {
862		my($self,$sid, %args) = @_;
863		if($self->{super}->Storage->OpenStorage($sid)) {
864			my $old_ref = $self->GetHistory($sid);
865			foreach my $k (keys(%args)) {
866				my $v = $args{$k};
867				$v = "".localtime($self->{super}->Network->GetTime) if length($v) == 0;
868				$old_ref->{$k} = $v;
869			}
870			return $self->{super}->Storage->ClipboardSet(HPFX.$sid, $self->{super}->Tools->RefToCBx($old_ref));
871		}
872		else {
873			return 0;
874		}
875	}
876
877	##########################################################################
878	# Returns history of given sid
879	sub GetHistory {
880		my($self,$sid) = @_;
881		my $r = $self->{super}->Tools->CBxToRef($self->{super}->Storage->ClipboardGet(HPFX.$sid));
882		return $r;
883	}
884
885	##########################################################################
886	# Set private statistics
887	# You are supposed to set total_bytes, total_chunks, done_bytes, done_chunks,
888	#                     uploaded_bytes, clients, active_clients, last_recv
889	# ..and we do not save anything.. you'll need to do this on your own :-)
890
891	sub SetStats {
892		my($self, $id, $ref) = @_;
893		foreach my $xk (keys(%$ref)) {
894			$self->{statistics}->{$id}->{$xk} = $ref->{$xk};
895		}
896	}
897
898	##########################################################################
899	# Flushes all stats and sets all common fields to 0
900	sub InitializeStats {
901		my($self, $id) = @_;
902		return $self->SetStats($id, {total_bytes=>0, done_bytes=>0, uploaded_bytes=>0, active_clients=>0,
903		                             clients=>0, speed_upload =>0, speed_download => 0, last_recv => 0,
904		                             total_chunks=>0, done_chunks=>0});
905	}
906
907	sub IncrementStats {
908		my($self, $id, $ref) = @_;
909		foreach my $xk (keys(%$ref)) {
910			$self->SetStats($id,{$xk => $self->GetStat($id,$xk)+$ref->{$xk}});
911		}
912	}
913	sub DecrementStats {
914		my($self, $id, $ref) = @_;
915		foreach my $xk (keys(%$ref)) {
916			$self->SetStats($id,{$xk => $self->GetStat($id,$xk)-$ref->{$xk}});
917		}
918	}
919
920	##########################################################################
921	# Get private statistics
922	sub GetStats {
923		my($self,$id) = @_;
924		return $self->{statistics}->{$id};
925	}
926
927	##########################################################################
928	# Get single statistics key
929	sub GetStat {
930		my($self,$id,$key) = @_;
931		return $self->GetStats($id)->{$key};
932	}
933
934	##########################################################################
935	# Returns a list with all queue objects
936	sub GetQueueList {
937		my($self) = @_;
938		my $xh = ();
939		my $all_ids = $self->{super}->Storage->GetStorageItems();
940		foreach my $id (@$all_ids) {
941			my $so = $self->{super}->Storage->OpenStorage($id) or $self->panic("Unable to open $id");
942			my $name =  $so->GetSetting('name');
943			my $type = ($so->GetSetting('type') or "????");
944			$xh->{$type}->{$id} = { name=>$name };
945		}
946		return $xh;
947	}
948
949	##########################################################################
950	# Returns true if download is marked as paused
951	sub IsPaused {
952		my($self,$sid) = @_;
953		my $so = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist");
954		return ( $so->GetSetting('_paused') ? 1 : 0 );
955	}
956
957	##########################################################################
958	# Returns true if download is marked as *auto* paused
959	sub IsAutoPaused {
960		my($self,$sid) = @_;
961		my $so  = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist");
962		my $val = ($so->GetSetting('_paused') || 0);
963		return ( $val == STATE_AUTOPAUSED ? 1 : 0 );
964	}
965
966	##########################################################################
967	# Set autopause flag of specified SID
968	sub SetAutoPaused {
969		my($self,$sid) = @_;
970		my $so  = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist");
971		$so->SetSetting('_paused', STATE_AUTOPAUSED);
972	}
973
974	##########################################################################
975	# Set normal pause flag of specified SID
976	sub SetPaused {
977		my($self,$sid) = @_;
978		my $so  = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist");
979		$so->SetSetting('_paused', STATE_PAUSED);
980	}
981
982	##########################################################################
983	# Remove paused flag
984	sub SetUnPaused {
985		my($self,$sid) = @_;
986		my $so  = $self->{super}->Storage->OpenStorage($sid) or $self->panic("$sid does not exist");
987		$so->SetSetting('_paused', 0);
988	}
989
990	sub debug { my($self, $msg) = @_; $self->{super}->debug("QueueMGR: ".$msg); }
991	sub info  { my($self, $msg) = @_; $self->{super}->info("QueueMGR: ".$msg);  }
992	sub warn  { my($self, $msg) = @_; $self->{super}->warn("QueueMGR: ".$msg);  }
993	sub panic { my($self, $msg) = @_; $self->{super}->panic("QueueMGR: ".$msg); }
994	sub stop  { my($self, $msg) = @_; $self->{super}->stop("QueueMGR: ".$msg);  }
995
9961;
997
998###############################################################################################################
999# Bitflu Sammelsurium
1000package Bitflu::Tools;
1001
1002	use MIME::Base64 ();
1003
1004	##########################################################################
1005	# Create new object and try to load a module
1006	# Note: new() this gets called before ::Configuration is ready!
1007	#       You can't use fancy stuff such as ->debug. ->stop should work
1008	sub new {
1009		my($class, %args) = @_;
1010		my $self = { super => $args{super}, ns => '', mname => '' };
1011		bless($self,$class);
1012
1013		foreach my $mname (qw(Digest::SHA Digest::SHA1)) {
1014			my $code = "use $mname; \$self->{ns} = $mname->new; \$self->{mname} = \$mname";
1015			eval $code;
1016		}
1017
1018		unless($self->{mname}) {
1019			$self->stop("No SHA1-Module found. Bitflu requires 'Digest::SHA' (http://search.cpan.org)");
1020		}
1021
1022		return $self;
1023	}
1024
1025	sub init { return 1 }
1026
1027	##########################################################################
1028	# Return hexed sha1 of $buff
1029	sub sha1_hex {
1030		my($self, $buff) = @_;
1031		$self->{ns}->add($buff);
1032		return $self->{ns}->hexdigest;
1033	}
1034
1035	##########################################################################
1036	# Return sha1 of $buff
1037	sub sha1 {
1038		my($self,$buff) = @_;
1039		$self->{ns}->add($buff);
1040		return $self->{ns}->digest;
1041	}
1042
1043	##########################################################################
1044	# Encode string into base32
1045	sub encode_b32 {
1046		my($self,$val) = @_;
1047		my $s = unpack("B*",$val);
1048		$s =~ s/(.{5})/000$1/g; # Convert 5-byte-chunks to 8-byte-chunks
1049		my $len  = length($s);
1050		my $olen = $len % 8;
1051
1052		if($olen) {
1053			$s = substr($s,0,$len-$olen)."000".substr($s,-1*$olen).("0" x (5 - $olen));
1054		}
1055
1056		$s = pack("B*",$s);
1057		$s =~ tr/\0-\37/A-Z2-7/; # Octal!
1058		return $s;
1059	}
1060
1061	##########################################################################
1062	# Decode base32 into string
1063	sub decode_b32 {
1064		my($self,$val) = @_;
1065		my $s = uc($val);
1066		$s =~ tr/A-Z2-7/\0-\37/;
1067		$s = unpack("B*", $s);
1068		$s =~ s/000(.{5})/$1/g;
1069		if( my $olen = -1*(length($s)%8) ) {
1070			$s = substr($s,0,$olen);
1071		}
1072		return pack("B*",$s);
1073	}
1074
1075	##########################################################################
1076	# Decode base64 into string
1077	sub decode_b64 {
1078		my($self,$val) = @_;
1079		return MIME::Base64::decode($val);
1080	}
1081
1082
1083	##########################################################################
1084	# Parse a magnet link
1085	sub decode_magnet {
1086		my($self,$uri) = @_;
1087		my $xt = {};
1088		if($uri =~ /^magnet:\?(.+)$/) {
1089			foreach my $item (split(/&/,$1)) {
1090				if($item =~ /^(([^=\.]+)(\.\d+)?)=(.+)$/) {
1091					my $mk = $2;
1092					my @it  = split(/:/,$4);
1093					my $mv = pop(@it);
1094					my $sk = join(':',@it) || ":";
1095					push(@{$xt->{$mk}}, {$sk => $mv});
1096				}
1097			}
1098		}
1099		return $xt;
1100	}
1101
1102	sub ExpandRange {
1103		my($self,@a) = @_;
1104		my %dedupe = ();
1105		foreach my $chunk (@a) {
1106			if($chunk =~ /^(\d+)-(\d+)$/) { if($2 <= 0xFFFF) { for($1..$2) { $dedupe{$_} = 1; } } }
1107			elsif($chunk =~ /^(\d+)$/)    { $dedupe{abs($1)} = 1;                                 }
1108		}
1109		return \%dedupe;
1110	}
1111
1112
1113	##########################################################################
1114	# Escape a HTTP-URI-Escaped string
1115	sub UriUnescape {
1116		my($self,$string) = @_;
1117		$string =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg;
1118		return $string;
1119	}
1120
1121	##########################################################################
1122	# Escape string
1123	sub UriEscape {
1124		my($self,$string) = @_;
1125		$string =~ s/([^A-Za-z0-9\-_.!~*'()\/])/sprintf("%%%02X",ord($1))/eg;
1126		return $string;
1127	}
1128
1129	##########################################################################
1130	# Converts a CBX into a hashref
1131	sub CBxToRef {
1132		my($self,$buff) = @_;
1133		my $r      = undef;
1134		   $buff ||= '';
1135		foreach my $line (split(/\n/,$buff)) {
1136			chomp($line);
1137			if($line =~ /^#/ or $line =~ /^\s*$/) {
1138				next; # Comment or empty line
1139			}
1140			elsif($line =~ /^([a-zA-Z0-9_:\.]+)\s*=\s*(.*)$/) {
1141				$r->{$1} = $2;
1142			}
1143			else {
1144				# Ignore. Can't use panic anyway
1145			}
1146		}
1147		return $r;
1148	}
1149
1150	##########################################################################
1151	# Convert hashref into CBX
1152	sub RefToCBx {
1153		my($self,$ref) = @_;
1154		my @caller = caller;
1155		my $buff   = "# Written by $caller[0]\@$caller[2] on ".gmtime()."\n";
1156		foreach my $key (sort(keys(%$ref))) {
1157			my $val = $ref->{$key};
1158			$key =~ tr/a-zA-Z0-9_:\.//cd;
1159			$val =~ tr/\r\n//d;
1160			$buff .= sprintf("%-25s = %s\n",$key, $val);
1161		}
1162		return $buff."# EOF #\n";
1163	}
1164
1165	##########################################################################
1166	# Generates a 'find' like dirlist
1167	sub GenDirList {
1168		my($self,$dstruct, $dir) = @_;
1169		push(@{$dstruct->{_}},$dir);
1170		my $pfx = join('/',@{$dstruct->{_}});
1171		opendir(my $DFH, $pfx);
1172		foreach my $dirent (readdir($DFH)) {
1173			my $fp = "$pfx/".$dirent;
1174			next if $dirent eq '.';   # No thanks
1175			next if $dirent eq '..';  # Ditto
1176			next if (-l $fp);         # Won't look at symlinks
1177			push(@{$dstruct->{list}},$fp);
1178			$self->GenDirList($dstruct,$dirent) if -d $fp;
1179		}
1180		closedir($DFH);
1181		pop(@{$dstruct->{_}});
1182	}
1183
1184	##########################################################################
1185	# Getopts like support
1186	sub GetOpts {
1187		my($self,$args) = @_;
1188		my @leftovers = ();
1189		my $ctx       = undef;
1190		my $argref    = {};
1191		my $getargs   = 1;
1192		foreach my $this_arg (@$args) {
1193
1194			if($getargs) {
1195				if($this_arg eq '--') {
1196					$getargs = 0;
1197				}
1198				elsif($this_arg eq '--all') {
1199					my $ql = $self->{super}->Queue->GetQueueList;
1200					foreach my $protocol (keys(%$ql)) {
1201						push(@leftovers, keys(%{$ql->{$protocol}}));
1202					}
1203				}
1204				elsif($this_arg =~ /^--?(.+)/) {
1205					$ctx = $1;
1206					$argref->{$ctx} = 1 if !exists $argref->{$ctx};
1207				}
1208				elsif(defined($ctx)) {
1209					$argref->{$ctx} = $this_arg;
1210					$ctx = undef;
1211				}
1212				else {
1213					push(@leftovers, $this_arg);
1214				}
1215			}
1216			else {
1217				push(@leftovers, $this_arg);
1218			}
1219
1220		}
1221		@$args = @leftovers;
1222		return $argref;
1223	}
1224
1225	##########################################################################
1226	# Return exclusive name
1227	sub GetExclusiveDirectory {
1228		my($self,$base,$id) = @_;
1229		my $xname = undef;
1230		foreach my $sfx (0..0xFFFF) {
1231			$xname = $base."/".$id;
1232			$xname .= ".$sfx" if $sfx != 0;
1233			unless(-e $xname) {
1234				return $xname;
1235			}
1236		}
1237		return undef;
1238	}
1239
1240	##########################################################################
1241	# Return exclusive name for a file
1242	sub GetExclusivePath {
1243		my($self, $basedir) = @_;
1244		my $dest = '';
1245		my $i    = 0;
1246		while(1) {
1247			$dest = sprintf("%s/%x-%x-%x.tmp_$i", $basedir, $$, int(rand(0xFFFFFF)), int(time()));
1248			return $dest if !(-e $dest);
1249		}
1250		return undef;
1251	}
1252
1253	##########################################################################
1254	# Return path to non-existing file within tempdir
1255	sub GetExclusiveTempfile {
1256		my($self) = @_;
1257		return $self->GetExclusivePath($self->GetTempdir);
1258	}
1259
1260	##########################################################################
1261	# Return path to non-existing directory within tempdir
1262	sub GetExclusiveTempdir {
1263		my($self,$id) = @_;
1264
1265		$id = time()."_".int(rand(0xFFFF)) unless defined $id;
1266		return $self->GetExclusiveDirectory($self->GetTempdir, $id);
1267	}
1268
1269
1270	##########################################################################
1271	# Return temp directory
1272	sub GetTempdir {
1273		my($self) = @_;
1274		return $self->{super}->Configuration->GetValue('workdir')."/tmp";
1275	}
1276
1277	##########################################################################
1278	# looping sysread implementation
1279	# *BSD doesn't like big LENGTH values on sysread
1280	# This provides a crappy warper to 'fix' this problem
1281	# syswrite() doesn't seem to suffer the same problem ...
1282	sub Sysread {
1283		my($self, $fh, $ref, $bytes_needed) = @_;
1284
1285		my $bytes_left = $bytes_needed;
1286		my $buff       = '';
1287
1288		$self->panic("Cannot read $bytes_needed bytes") if $bytes_needed < 0;
1289		while($bytes_left > 0) {
1290			my $br = sysread($fh, $buff, $bytes_left);
1291			if($br)             { ${$ref} .= $buff; $bytes_left -= $br; } # Data
1292			elsif(defined($br)) { last;                                 } # EOF
1293			else                { return undef;                         } # Error
1294		}
1295		return ($bytes_needed-$bytes_left);
1296	}
1297
1298	########################################################################
1299	# Decodes Compact IP-Chunks
1300	sub DecodeCompactIp {
1301		my($self, $compact_list) = @_;
1302		my @peers = ();
1303			for(my $i=0;$i<length($compact_list);$i+=6) {
1304				my $chunk = substr($compact_list, $i, 6);
1305				my($a,$b,$c,$d,$port) = unpack("CCCCn", $chunk);
1306				my $ip = "$a.$b.$c.$d";
1307				push(@peers, {ip=>$ip, port=>$port, peer_id=>""});
1308			}
1309		return @peers;
1310	}
1311
1312	########################################################################
1313	# Decodes IPv6 Chunks
1314	sub DecodeCompactIpV6 {
1315		my($self, $compact_list) = @_;
1316		my @peers = ();
1317			for(my $i=0;$i<length($compact_list);$i+=18) {
1318				my $chunk = substr($compact_list, $i, 18);
1319				my(@sx)   = unpack("nnnnnnnnn", $chunk);
1320				my $port  = pop(@sx);
1321				my $ip    = join(':',map(sprintf("%x", $_),@sx)); # Must match ExpandIpv6, otherwise babies will cry and cats might even die.
1322				push(@peers, {ip=>$ip, port=>$port, peer_id=>""});
1323			}
1324		return @peers;
1325	}
1326
1327	################################################################################################
1328	# Stolen from http://www.stonehenge.com/merlyn/UnixReview/col30.html
1329	sub DeepCopy {
1330		my($self,$this) = @_;
1331		if (not ref $this) {
1332			$this;
1333		} elsif (ref $this eq "ARRAY") {
1334			[map $self->DeepCopy($_), @$this];
1335		} elsif (ref $this eq "HASH") {
1336			+{map { $_ => $self->DeepCopy($this->{$_}) } keys %$this};
1337		} else { Carp::confess "what type is $_?" }
1338	}
1339
1340	################################################################################################
1341	# Decode bencoded data
1342	sub BencDecode {
1343		return Bitflu::Bencoder::decode($_[1]);
1344	}
1345
1346	################################################################################################
1347	# Serialize bencoded data
1348	sub BencEncode {
1349		return Bitflu::Bencoder::encode($_[1]);
1350	}
1351
1352	################################################################################################
1353	# Load file from disc and return both raw+decoded data
1354	sub BencfileToHash {
1355		my($self,$file) = @_;
1356
1357		open(BENC, "<", $file) or return {};
1358		my $buff = '';
1359		$self->Sysread(*BENC, \$buff, (2**25));
1360		close(BENC);
1361		return {} if (!defined($buff) or length($buff)==0); # File too short
1362
1363		my $decoded = $self->BencDecode($buff);
1364		return {} if ref($decoded) ne 'HASH';               # Decoding failed
1365
1366		return { content=>$decoded, raw_content=>$buff };   # All ok!
1367	}
1368
1369	################################################################################################
1370	# Guess eta for given hash
1371	sub GetETA {
1372		my($self, $key) = @_;
1373
1374		my $so      = $self->{super}->Storage->OpenStorage($key) or return undef;
1375		my $created = $so->GetSetting('createdat')               or return undef;
1376		my $stats   = $self->{super}->Queue->GetStats($key);
1377		my $age     = time()-$created;
1378
1379		if($age > 0) {
1380			my $not_done = $stats->{total_bytes} - $stats->{done_bytes};
1381			my $bps      = $stats->{done_bytes} / $age;
1382			   $bps      = $stats->{speed_download} if $stats->{speed_download} > $bps; # we are optimistic ;-)
1383			my $eta_sec  = ( $bps > 0 ? ($not_done / $bps) : undef );
1384			return $eta_sec;
1385		}
1386		# else
1387		return undef;
1388	}
1389
1390	################################################################################################
1391	# Convert number of seconds into something for humans
1392	sub SecondsToHuman {
1393		my($self,$sec) = @_;
1394
1395		if(!defined($sec)) {
1396			return 'inf.';
1397		}
1398		elsif($sec < 5) {
1399			return '-';
1400		}
1401		elsif($sec < 60) {
1402			return int($sec)." sec";
1403		}
1404		elsif($sec < 60*90) {
1405			return int($sec/60)."m";
1406		}
1407		elsif($sec < 3600*48) {
1408			return sprintf("%.1fh", $sec/3600);
1409		}
1410		elsif($sec < 86400*10) {
1411			return sprintf("%.1fd", $sec/86400);
1412		}
1413		elsif($sec < 86400*31) {
1414			return sprintf("%.1fw", $sec/86400/7);
1415		}
1416		else {
1417			return ">4w"
1418		}
1419
1420	}
1421
1422	sub warn   { my($self, $msg) = @_; $self->{super}->warn(ref($self).": ".$msg);  }
1423	sub debug  { my($self, $msg) = @_; $self->{super}->debug(ref($self).": ".$msg);  }
1424	sub stop   { my($self, $msg) = @_; $self->{super}->stop(ref($self).": ".$msg); }
1425
14261;
1427
1428
1429###############################################################################################################
1430# Bitflu Admin-Dispatcher : Release 20070319_1
1431package Bitflu::Admin;
1432
1433	##########################################################################
1434	# Guess what?
1435	sub new {
1436		my($class, %args) = @_;
1437		my $self = {super=> $args{super}, cmdlist => {}, notifylist => {}, complist=>[] };
1438		bless($self,$class);
1439		return $self;
1440	}
1441
1442	##########################################################################
1443	# Init plugin
1444	sub init {
1445		my($self) = @_;
1446		$self->RegisterCommand("help",    $self, 'admincmd_help'   , 'Displays what you are reading now',
1447		 [ [undef, "Use 'help' to get a list of all commands"], [undef, "Type 'help command' to get help about 'command'"] ]);
1448		$self->RegisterCommand("plugins",  $self, 'admincmd_plugins', 'Displays all loaded plugins');
1449		$self->RegisterCommand("useradmin",$self, 'admincmd_useradm', 'Create and modify accounts',
1450		 [ [undef, "Usage: useradmin [set username password] [delete username] [list]"] ]);
1451		$self->RegisterNotify($self, 'receive_notify');
1452		$self->RegisterCommand("log",  $self, 'admincmd_log', 'Display last log output',
1453		 [ [undef, "Usage: log [-limit]"], [undef, 'Example: log -10   # <-- displays the last 10 log entries'] ] );
1454		return 1;
1455	}
1456
1457	##########################################################################
1458	# Return logbuffer
1459	sub admincmd_log {
1460		my($self, @args) = @_;
1461		my @A       = ();
1462		my @log     = @{$self->{super}->{_LogBuff}};
1463		my $opts    = $self->{super}->Tools->GetOpts(\@args);
1464		my $limit   = int(((keys(%$opts))[0]) || 0);
1465		my $logsize = int(@log);
1466		my $logat   = ( $limit ? ( $logsize - $limit ) : 0 );
1467		my $i       = 0;
1468
1469		foreach my $ll (@log) {
1470			next if $i++ < $logat;
1471			chomp($ll);
1472			push(@A, [undef, $ll]);
1473		}
1474		return({MSG=>\@A, SCRAP=>[]});
1475	}
1476
1477	##########################################################################
1478	# Display registered plugins
1479	sub admincmd_plugins {
1480		my($self) = @_;
1481
1482
1483		my @A = ([1, "Known plugins:"]);
1484
1485		foreach my $pref (@{$self->{super}->{_Plugins}}) {
1486			push(@A, [undef, " ".$pref->{package}]);
1487		}
1488
1489		push(@A, [undef,''],[1,"Scheduler jobs:"]);
1490		foreach my $rref (values(%{$self->{super}->{_Runners}})) {
1491			push(@A, [undef, " ".$rref->{target}]);
1492		}
1493
1494		return({MSG=>\@A, SCRAP=>[]});
1495	}
1496
1497	##########################################################################
1498	# Notification handler, we are just going to print them out using the logging
1499	sub receive_notify {
1500		my($self,$msg) = @_;
1501		$self->info("#NOTIFICATION#: $msg");
1502	}
1503
1504	##########################################################################
1505	# BareBones help
1506	sub admincmd_help {
1507		my($self,$topic) = @_;
1508		my @A = ();
1509
1510		if($topic) {
1511			if(defined($self->GetCommands->{$topic})) {
1512				my @instances = @{$self->GetCommands->{$topic}};
1513
1514				foreach my $ci (@instances) {
1515					push(@A, [3, "Command '$topic' (Provided by plugin $ci->{class})"]);
1516					if($ci->{longhelp}) {
1517						push(@A, @{$ci->{longhelp}});
1518					}
1519					else {
1520						push(@A, [undef, $ci->{help}]);
1521					}
1522					push(@A, [undef, '']);
1523				}
1524			}
1525			else {
1526				push(@A, [2, "No help for '$topic', command does not exist"]);
1527			}
1528		}
1529		else {
1530			foreach my $xcmd (sort (keys %{$self->GetCommands})) {
1531				my $lb = sprintf("%-20s", $xcmd);
1532				my @hlps = ();
1533				foreach my $instance (@{$self->GetCommands->{$xcmd}}) {
1534					push(@hlps, "$instance->{help}");;
1535				}
1536
1537				$lb .= join(' / ',@hlps);
1538
1539				push(@A, [undef, $lb]);
1540			}
1541		}
1542
1543
1544		return({MSG=>\@A, SCRAP=>[]});
1545	}
1546
1547	##########################################################################
1548	# Handles useradm commands
1549	sub admincmd_useradm {
1550		my($self, @args) = @_;
1551
1552		my @A   = ();
1553		my $ERR = '';
1554
1555		my($cmd,$usr,$pass) = @args;
1556
1557		if($cmd eq 'set' && $pass) {
1558			$self->__useradm_modify(Inject => {User=>$usr, Pass=>$pass});
1559			push(@A, [1, "Useraccount updated"]);
1560		}
1561		elsif($cmd eq 'delete' && $usr) {
1562			if(defined $self->__useradm_modify->{$usr}) {
1563				# -> Account exists
1564				$self->__useradm_modify(Drop => {User=>$usr});
1565				push(@A, [1, "Account '$usr' removed"]);
1566				$self->panic("BUG") if (defined $self->__useradm_modify->{$usr}); # Paranoia check
1567			}
1568			else {
1569				push(@A, [2, "Account '$usr' does not exist"]);
1570			}
1571		}
1572		elsif($cmd eq 'list') {
1573			push(@A, [3, "Configured accounts:"]);
1574			foreach my $k (keys(%{$self->__useradm_modify})) {
1575				push(@A, [undef,$k]);
1576			}
1577		}
1578		else {
1579			$ERR .= "Usage error, type 'help useradmin' for more information";
1580		}
1581		return({MSG=>\@A, SCRAP=>[], NOEXEC=>$ERR});
1582	}
1583
1584	##########################################################################
1585	# Create password entry
1586	sub __useradm_mkentry {
1587		my($self,$usr,$pass) = @_;
1588		$usr =~ tr/: ;=//d;
1589		return undef if length($usr) == 0;
1590		return $usr.":".$self->{super}->Tools->sha1_hex("$usr;$pass");
1591	}
1592
1593	##########################################################################
1594	# Modify current setting
1595	sub __useradm_modify {
1596		my($self,%args) = @_;
1597		my @result    = ();
1598		my $allusr    = {};
1599		my $to_inject = '';
1600		my $delta     = 0;
1601		foreach my $entry (split(/;/,($self->{super}->Configuration->GetValue('useradm') || ''))) {
1602			if(my($user,$hash) = $entry =~ /^([^:]*):(.+)$/) {
1603				if ($user ne ($args{Inject}->{User} || '') && $user ne ($args{Drop}->{User} || '')) {
1604					push(@result,$entry);
1605				}
1606				else {
1607					$delta++;
1608				}
1609				$allusr->{$user} = $entry;
1610			}
1611			else {
1612				$self->warn("Useradmin: Wiping garbage entry: '$entry'");
1613			}
1614		}
1615
1616		if(exists($args{Inject}->{User})) {
1617			$to_inject = $args{Inject}->{User};
1618			$to_inject =~ tr/:; //d;
1619			$delta++;
1620		}
1621
1622		if(length($to_inject) > 0) {
1623			push(@result,$self->__useradm_mkentry($to_inject,$args{Inject}->{Pass}));
1624		}
1625
1626		$self->{super}->Configuration->SetValue('useradm', join(';', @result)) if $delta;
1627		return $allusr;
1628	}
1629
1630
1631	##########################################################################
1632	# Register Notify handler
1633	sub RegisterNotify {
1634		my($self, $xref, $xcmd) = @_;
1635		$self->debug("RegisterNotify: Will notify $xref via $xref->$xcmd");
1636		$self->{notifylist}->{$xref} = { class => $xref, cmd => $xcmd };
1637	}
1638
1639	##########################################################################
1640	# Send out notifications
1641	sub SendNotify {
1642		my($self,$msg) = @_;
1643		foreach my $kx (keys(%{$self->{notifylist}})) {
1644			my $nc = $self->{notifylist}->{$kx};
1645			my $class = $nc->{class}; my $cmd = $nc->{cmd};
1646			$class->$cmd($msg);
1647		}
1648	}
1649
1650	##########################################################################
1651	# Registers a new command to be used with ExecuteCommand
1652	sub RegisterCommand {
1653		my($self,$name,$xref,$xcmd,$helptext,$longhelp) = @_;
1654		$self->debug("RegisterCommand: Hooking $name to $xref->$xcmd");
1655		push(@{$self->{cmdlist}->{$name}}, {class=>$xref, cmd=>$xcmd, help=>$helptext, longhelp=>$longhelp});
1656		$helptext or $self->panic("=> $xcmd ; $xref");
1657	}
1658
1659	##########################################################################
1660	# Returns the full cmdlist
1661	sub GetCommands {
1662		my($self) = @_;
1663		return $self->{cmdlist};
1664	}
1665
1666	##########################################################################
1667	# Register as completion service
1668	sub RegisterCompletion {
1669		my($self,$xref,$xcmd) = @_;
1670		$self->debug("RegisterCompletition: Will ask $xref->$xcmd for completion");
1671		push(@{$self->{complist}}, { class=>$xref, cmd=>$xcmd });
1672	}
1673
1674	##########################################################################
1675	# Callss all completition services and returns the result
1676	sub GetCompletion {
1677		my($self,$hint) = @_;
1678		my @result = ();
1679		foreach my $xref (@{$self->{complist}}) {
1680			my $class = $xref->{class} or $self->panic("$xref has no class!");
1681			my $cmd   = $xref->{cmd}   or $self->panic("$xref has no cmd!");
1682			my @this  = $class->$cmd($hint);
1683			push(@result,@this);
1684		}
1685		return @result;
1686	}
1687
1688	##########################################################################
1689	# Execute a command!
1690	sub ExecuteCommand {
1691		my($self,$command,@args) = @_;
1692
1693
1694		my $plugin_hits  = 0;
1695		my $plugin_fails = 0;
1696		my $plugin_ok    = 0;
1697		my @plugin_msg   = ();
1698		my @plugin_nex   = ();
1699
1700		if(ref($self->GetCommands->{$command}) eq "ARRAY") {
1701			foreach my $ref (@{$self->GetCommands->{$command}}) {
1702				$plugin_hits++;
1703				my $class = $ref->{class};
1704				my $cmd   = $ref->{cmd};
1705				my $bref  = $class->$cmd(@args);
1706				my $SCRAP = $bref->{SCRAP}  or $self->panic("$class -> $cmd returned no SCRAP");
1707				my $MSG   = $bref->{MSG}    or $self->panic("$class -> $cmd returned no MSG");
1708				my $ERR   = $bref->{NOEXEC};
1709				@args     = @$SCRAP;
1710
1711				push(@plugin_msg, @$MSG);
1712
1713				if($ERR) {
1714					push(@plugin_nex, $ERR); # Plugin usage error
1715				}
1716				else {
1717					$plugin_ok++; # Plugin could do something
1718				}
1719			}
1720		}
1721
1722		if($plugin_hits == 0) {
1723			push(@plugin_msg, [2, "Unknown command '$command'"]);
1724			$plugin_fails++;
1725		}
1726		else {
1727			foreach my $leftover (@args) {
1728				push(@plugin_msg, [2, "Failed to execute '$command $leftover'"]);
1729				$plugin_fails++;
1730			}
1731
1732			if($plugin_ok == 0) {
1733				# Nothing executed, display all usage 'hints'
1734				foreach my $xerr (@plugin_nex) {
1735					push(@plugin_msg, [2, $xerr]);
1736					$plugin_fails++;
1737				}
1738			}
1739		}
1740		return({MSG=>\@plugin_msg, FAILS=>$plugin_fails});
1741	}
1742
1743	##########################################################################
1744	# Returns TRUE if Authentication was successful (or disabled (= no accounts))
1745	sub AuthenticateUser {
1746		my($self,%args) = @_;
1747		my $numentry = int(keys(%{$self->__useradm_modify}));
1748		return 1 if $numentry == 0; # No users, no security
1749
1750		my $expect = $self->__useradm_mkentry($args{User}, $args{Pass});
1751		my $have   = $self->__useradm_modify->{$args{User}};
1752
1753		if(defined($expect) && defined($have) && $have eq $expect ) {
1754			return 1;
1755		}
1756		else {
1757			return 0;
1758		}
1759	}
1760
1761
1762	sub warn  { my($self, $msg) = @_; $self->{super}->warn("Admin   : ".$msg);  }
1763	sub debug { my($self, $msg) = @_; $self->{super}->debug("Admin   : ".$msg); }
1764	sub info  { my($self, $msg) = @_; $self->{super}->info("Admin   : ".$msg);  }
1765	sub panic { my($self, $msg) = @_; $self->{super}->panic("Admin   : ".$msg); }
1766
17671;
1768
1769
1770###############################################################################################################
1771# Bitflu Network-IO Lib : Release 20090125_1
1772package Bitflu::Network;
1773
1774use strict;
1775use IO::Socket;
1776use POSIX;
1777use Danga::Socket;
1778use Hash::Util; # For chroot
1779
1780use constant DEVNULL      => '/dev/null';   # Path to /dev/null
1781use constant MAXONWIRE    => 1024*1024;     # Do not buffer more than 1mb per client connection
1782use constant BF_BUFSIZ    => 327680;         # How much we shall read()/recv() from a socket per run
1783use constant NI_SIXHACK   => 3;
1784use constant BPS_MIN      => 16;            # Minimal upload speed per socket
1785use constant NETSTATS     => 2;             # ReGen netstats each 2 seconds
1786use constant NETDEBUG     => 0;
1787use constant BLIST_LIMIT  => 1024;          # NeverEver blacklist more than 1024 IPs per instance
1788use constant BLIST_TTL    => 60*60;         # BL entries are valid for 1 hour
1789use constant DNS_BLIST    => 5;             # How long shall we blacklist 'bad' dns entries (NOTE: DNS_BLIST**rowfail !)
1790use constant DNS_BLTTL    => 60;            # Purge any older DNS-Blacklist entries
1791use constant MAGIC_DSNUM  => 1024*0.75;     # Don't ask me why, but 0.75 makes our downspeed guess much better
1792use constant KILL_IPV4    => 0;             # 'simulate' non-working ipv4 stack
1793
1794use fields qw( super NOWTIME avfds bpx_dn bpx_up _HANDLES _SOCKETS stagger up_q stats resolver_fail have_ipv6);
1795
1796	##########################################################################
1797	# Creates a new Networking Object
1798	sub new {
1799		my($class, %args) = @_;
1800		my $ptype = {super=> $args{super}, NOWTIME => 0, avfds => 0, bpx_up=>BPS_MIN, stagger=>{}, up_q=>{}, bpx_dn=>undef, _HANDLES=>{}, _SOCKETS=>{},
1801		             stats => { sent=>0, recv=>0, raw_recv=>0, raw_sent=>0}, resolver_fail=>{}, have_ipv6=>0 };
1802
1803		my $self = fields::new($class);
1804		map( $self->{$_} = delete($ptype->{$_}), keys(%$ptype) );
1805
1806		$self->SetTime; # Adds its own Danga::Socket timer
1807		$self->{avfds} = $self->TestFileDescriptors;
1808		$self->debug("Reserved $self->{avfds} file descriptors for networking");
1809
1810		## danga socket tests:
1811		# check if we have >= 1.52 .. everything below does not implement ->cancel for timers
1812		eval "use Danga::Socket 1.52; 1;";
1813		$self->stop("The installed version of Danga::Socket ($Danga::Socket::VERSION) is too old: bitflu will not work with this version.") if $@;
1814
1815		# Check if we can use ipv6 (if requested)
1816		if($self->{super}->Configuration->GetValue('ipv6')) {
1817			eval "use Danga::Socket 1.61; 1; "; # Check if at least 1.61 is installed
1818			if($@) {
1819				$self->warn("Danga::Socket 1.61 is required for IPv6 support.");
1820				$self->warn("Disabling IPv6 due to outdated Danga::Socked version");
1821			}
1822			else {
1823				eval {
1824					require IO::Socket::INET6;
1825					require Socket6;
1826					$self->{have_ipv6} = 1;
1827
1828					if( (my $isiv = $IO::Socket::INET6::VERSION) < 2.56 ) {
1829						$self->warn("Detected outdated version of IO::Socket::INET6 ($isiv)");
1830						$self->warn("Please upgrade to IO::Socket::INET6 >= 2.56 !");
1831						$self->warn("IPv6 might not work correctly with version $isiv");
1832					}
1833
1834				};
1835			}
1836		}
1837
1838		if(KILL_IPV4) {
1839			$self->warn("IPv4 support is *DISABLED*");
1840		}
1841
1842		return $self;
1843	}
1844
1845	##########################################################################
1846	# Register Admin commands
1847	sub init {
1848		my($self) = @_;
1849		$self->info("IPv6 support is ".($self->HaveIPv6 ? 'enabled' : 'not active'));
1850
1851		$self->{super}->AddRunner($self);
1852		$self->{super}->Admin->RegisterCommand('blacklist', $self, '_Command_Blacklist', 'Display current in-memory blacklist');
1853		$self->{super}->Admin->RegisterCommand('netstat',   $self, '_Command_Netstat',   'Display networking statistics');
1854		$self->{super}->Admin->RegisterCommand('dig',       $self, '_Command_Dig',       'Resolve a hostname');
1855		$self->{super}->CreateSxTask(Superclass=>$self,Callback=>'_UpdateNetstats', Interval=>NETSTATS, Args=>[]);
1856		return 1;
1857	}
1858
1859	sub run {
1860		my($self) = @_;
1861
1862
1863		my $ucnt = 4;
1864		foreach my $ukey (List::Util::shuffle(keys(%{$self->{up_q}}))) {
1865			my $aref = delete($self->{up_q}->{$ukey});
1866			$self->_WriteReal(@$aref);
1867			last if --$ucnt < 0;
1868		}
1869
1870
1871		my $ds = undef;
1872		foreach my $val (List::Util::shuffle(values(%{$self->{stagger}}))) {
1873			$ds = $val;
1874			last;
1875		}
1876
1877		if($ds && (!$self->{bpx_dn} or $self->{bpx_dn} > 0) ) {
1878				$self->_TCP_Read($ds) if $ds->sock;
1879		}
1880
1881		return 0; # Cannot use '1' due to deadlock :-)
1882	}
1883
1884	##########################################################################
1885	# Resolver debug
1886	sub _Command_Dig {
1887		my($self, $hostname) = @_;
1888		my @A = ();
1889		if($hostname) {
1890			push(@A, [1, "Resolver result for '$hostname'"]);
1891			foreach my $entry ($self->Resolve($hostname)) {
1892				push(@A, [0, "  $entry"]);
1893			}
1894		}
1895		else {
1896			push(@A, [2, "Usage: dig hostname"]);
1897		}
1898		return({MSG=>\@A, SCRAP=>[]});
1899	}
1900
1901	##########################################################################
1902	# Display blacklist information
1903	sub _Command_Blacklist {
1904		my($self) = @_;
1905
1906		my @A = ();
1907		foreach my $this_handle (keys(%{$self->{_HANDLES}})) {
1908			push(@A, [4, "Blacklist for '$this_handle'"]);
1909			my $count = 0;
1910			while( my($k,$v) = each(%{$self->{_HANDLES}->{$this_handle}->{blacklist}->{bldb}}) ) {
1911				my $this_ttl = $v - $self->GetTime;
1912				next if $this_ttl <= 0;
1913				push(@A, [2, sprintf(" %-24s (expires in %d seconds)", $k, $this_ttl) ]);
1914				$count++;
1915			}
1916			push(@A, [3, "$count ip(s) are blacklisted"], [undef, '']);
1917		}
1918		return({MSG=>\@A, SCRAP=>[]});
1919	}
1920
1921	##########################################################################
1922	# Netstat command
1923	sub _Command_Netstat {
1924		my($self) = @_;
1925
1926		my @A = ();
1927		my $staggered      = int(keys(%{$self->{stagger}}));
1928		my $sock_to_handle = {};
1929		map($sock_to_handle->{$_}=0           ,keys(%{$self->{_HANDLES}}));
1930		map($sock_to_handle->{$_->{handle}}++, values(%{$self->{_SOCKETS}}));
1931
1932		push(@A, [4, "Socket information"]);
1933		foreach my $this_handle (sort keys(%$sock_to_handle)) {
1934			my $hxref = $self->{_HANDLES}->{$this_handle};
1935			push(@A, [3, " Statistics for '$this_handle'"]);
1936			push(@A, [1, sprintf("  %-24s : %s", "Free sockets", (exists($hxref->{avpeers}) ? sprintf("%3d",$hxref->{avpeers}) : '  -') ) ]);
1937			push(@A, [1, sprintf("  %-24s : %3d", "Used sockets", $sock_to_handle->{$this_handle}) ]);
1938		}
1939
1940		push(@A, [0, '-' x 60]);
1941		push(@A, [0, sprintf(" >> Total: used=%d / watched=%d / free=%d / staggered=%d",int(keys(%{$self->{_SOCKETS}})), Danga::Socket->WatchedSockets(),$self->{avfds}, $staggered )]);
1942
1943		push(@A, [0,''],[4, "Resolver fail-list"]);
1944		while(my($k,$r) = each(%{$self->{resolver_fail}})) {
1945			push(@A, [2, sprintf(" %-32s : Row-Fails=>%3d, FirstFail=>%s", $k, $r->{rfail}, "".localtime($r->{first_fail}))]);
1946		}
1947
1948		return({MSG=>\@A, SCRAP=>[]});
1949	}
1950
1951
1952
1953	##########################################################################
1954	# Test how many filedescriptors this OS / env can handle
1955	sub TestFileDescriptors {
1956		my($self)   = @_;
1957		my $i       = 0;
1958		my @fdx     = ();
1959		my $sysr    = 32; # Reserve 32 FDs for OS & co.
1960		my $canhave = 0;
1961
1962		open(FAKE, DEVNULL) or $self->stop("Unable to open ".DEVNULL.": $!");
1963		close(FAKE);
1964
1965		while($i++ < 2048) { last unless( open($fdx[$i], DEVNULL) ); }
1966		if($i > $sysr) { $canhave = $i - $sysr; }
1967		else           { $self->panic("Sorry, bitfu can not run with only $i filedescriptors left"); }
1968
1969		while(--$i > 0) {
1970			close($fdx[$i]) or $self->panic("Unable to close TestFD # $i : $!");
1971		}
1972
1973		return $canhave;
1974	}
1975
1976	##########################################################################
1977	# Resolve hostnames
1978	sub Resolve {
1979		my($self,$name) = @_;
1980		my @iplist = ();
1981
1982		if($self->IsValidIPv4($name) or $self->IsValidIPv6($name)) {
1983			# Do not even try to resolve things looking like IPs
1984			push(@iplist,$name);
1985		}
1986		else {
1987			my $NOWTIME = $self->GetTime;                     # Current time guess
1988			my $blref   = $self->{resolver_fail}->{$name};    # Refernce to blacklist entry for this name (can be undef)
1989			my $is_bad  = 0;                                  # true = do not try to resolve / false = call resolver
1990
1991			if($blref && ($blref->{first_fail}+(DNS_BLIST**$blref->{rfail})) > $NOWTIME) {
1992				# -> We got an entry and it is still makred as a fail: skip resolver
1993				$self->warn("Resolve: won't resolve blacklisted DNS-Entry '$name'");
1994			}
1995			else {
1996				if($self->HaveIPv6) {
1997					my @addr_info = Socket6::getaddrinfo($name, defined);
1998					for(my $i=0;$i+3<int(@addr_info);$i+=5) {
1999						my ($addr,undef) = Socket6::getnameinfo($addr_info[$i+3], NI_SIXHACK);
2000						next if KILL_IPV4 && $self->IsNativeIPv4($addr);
2001						push(@iplist,$addr);
2002					}
2003				}
2004				else {
2005					my @result = gethostbyname($name);
2006					@iplist = map{ inet_ntoa($_) } @result[4..$#result];
2007				}
2008
2009				# Take care of resolver_fail:
2010				if(int(@iplist)==0) {
2011					# -> No A records? -> Mark entry as failed:
2012					$self->{resolver_fail}->{$name} ||= { first_fail=>$NOWTIME, rfail=>0 };  # Creates a new reference if empty...
2013					$self->{resolver_fail}->{$name}->{rfail}++;                              # Adjust row-fail count (used for timeout calculation)
2014
2015					# purge old entries (FIXME: can we use map?)
2016					while(my($xname,$xref)=each(%{$self->{resolver_fail}})) {
2017						delete($self->{resolver_fail}->{$xname}) if $xref->{first_fail}+DNS_BLTTL < $NOWTIME;
2018					}
2019
2020				}
2021				else {
2022					# -> Lookup was okay: delete fail-entry (if any)
2023					delete($self->{resolver_fail}->{$name});
2024				}
2025			}
2026		}
2027
2028		return List::Util::shuffle(@iplist);
2029	}
2030
2031	##########################################################################
2032	# Returns IP sorted by protocol
2033	sub ResolveByProto {
2034		my($self,$name) = @_;
2035
2036		my @iplist = $self->Resolve($name);
2037		my $list   = { 4=>[], 6=>[] };
2038		foreach my $ip (@iplist) {
2039			if($self->IsNativeIPv4($ip)) { push(@{$list->{4}},$ip) }
2040			if($self->IsNativeIPv6($ip)) { push(@{$list->{6}},$ip) }
2041		}
2042
2043		return $list;
2044	}
2045
2046	##########################################################################
2047	# Emulates getaddrinfo() in ipv6 mode
2048	sub _GetAddrFoo {
2049		my($self,$ip,$port,$af,$rqproto) = @_;
2050
2051		my ($family,$socktype,$proto,$sin) = undef;
2052
2053		$socktype = ($rqproto eq 'udp' ? SOCK_DGRAM : ($rqproto eq 'tcp' ? SOCK_STREAM : $self->panic("Invalid proto: $rqproto")) );
2054
2055		if($self->HaveIPv6) {
2056			($family, $socktype, $proto, $sin) = Socket6::getaddrinfo($ip,$port,$af,$socktype);
2057		}
2058		else {
2059			$family   = IO::Socket::AF_INET;
2060			# 17 is UDP // 6 is TCP
2061			$proto    = ($socktype == (SOCK_DGRAM) ? 17 : ($socktype == (SOCK_STREAM) ? 6 : $self->panic("Invalid socktype: $socktype")));
2062			eval { $sin = sockaddr_in($port,inet_aton($ip)); };
2063		}
2064		return($family,$socktype,$proto,$sin);
2065	}
2066
2067	##########################################################################
2068	# Set O_NONBLOCK on socket
2069	sub Unblock {
2070		my($self, $cfh) = @_;
2071		$self->panic("No filehandle given") unless $cfh;
2072		my $flags = fcntl($cfh, F_GETFL, 0)
2073								or return undef;
2074		fcntl($cfh, F_SETFL, $flags | O_NONBLOCK)
2075		or return undef;
2076		return 1;
2077	}
2078
2079	##########################################################################
2080	# Returns TRUE if we are running with IPv6 support
2081	sub HaveIPv6 {
2082		return $_[0]->{have_ipv6};
2083	}
2084
2085	##########################################################################
2086	# Returns TRUE if we are running with IPv4 support
2087	sub HaveIPv4 {
2088		return (KILL_IPV4 ? 0 : 1);
2089	}
2090
2091	##########################################################################
2092	# Returns TRUE if given string represents a valid IPv4 peeraddr
2093	sub IsValidIPv4 {
2094		my($self,$str) = @_;
2095		if(defined($str) && $str =~ /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/) {
2096			return 1;
2097		}
2098		return 0;
2099	}
2100
2101	##########################################################################
2102	# Returns TRUE if given string represents a valid IPv6 peeraddr
2103	sub IsValidIPv6 {
2104		my($self,$str) = @_;
2105		if(defined($str) && $str =~ /^[a-f0-9:]+$/i) { # This is a very BAD regexp..
2106			return 1;
2107		}
2108		return 0;
2109	}
2110
2111	##########################################################################
2112	# Convert Pseudo-IPv6 to real IPv4
2113	sub SixToFour {
2114		my($self,$str) = @_;
2115		if($str =~ /^::ffff:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})$/) {
2116			return $1;
2117		}
2118		return undef;
2119	}
2120
2121	##########################################################################
2122	# Returns TRUE if given IP is a 'real' IPv6 IP
2123	sub IsNativeIPv6 {
2124		my($self,$str) = @_;
2125
2126		if($self->IsValidIPv6($str) && !$self->SixToFour($str)) {
2127			return 1;
2128		}
2129		return 0;
2130	}
2131
2132	##########################################################################
2133	# Return TRUE if given string seems to be a *native* (non sixto4) ip
2134	sub IsNativeIPv4 {
2135		my($self,$str) = @_;
2136		return 1 if $str =~ /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/;
2137		return 0;
2138	}
2139
2140
2141	########################################################################
2142	# 'expands' a shorted IPv6 using sloppy code
2143	sub ExpandIpV6 {
2144		my($self,$ip) = @_;
2145		my $addrlen = 8;
2146		my @buff    = (0,0,0,0,0,0,0,0);
2147		my @ipend   = ();
2148		my $cnt     = 0;
2149		my $drp     = undef;
2150		foreach my $item (split(':',$ip)) {
2151			if(!defined $drp) {
2152				( $item eq '' ? ($drp=$cnt): ($buff[$cnt] = hex($item)) );
2153			}
2154			else {
2155				push(@ipend, hex($item));
2156			}
2157			last if ++$cnt == $addrlen;
2158		}
2159
2160		if(defined($drp)) {
2161			# We got some piggyback data:
2162			my $e_len  = int(@ipend);       # Number of items
2163			my $offset = $addrlen-$e_len;   # Offset to use
2164			if($offset >= 0) {
2165				for($offset..($addrlen-1)) {
2166					$buff[$_] = shift(@ipend);
2167				}
2168			}
2169		}
2170		return ( wantarray ? (@buff) : (join(':',map(sprintf("%x",$_),@buff))) ); # Do not change the sprintf() call as it must be consinstent with DecodeCompactIpV6
2171	}
2172
2173
2174	##########################################################################
2175	# Refresh buffered time
2176	sub SetTime {
2177		my($self) = @_;
2178		my $NOW = time();
2179
2180		if($NOW > $self->{NOWTIME}) {
2181			$self->{NOWTIME} = $NOW;
2182			$self->{bpx_dn} = ( $self->{super}->Configuration->GetValue('downspeed')*MAGIC_DSNUM || undef );
2183		}
2184		elsif($NOW < $self->{NOWTIME}) {
2185			$self->warn("Clock jumped backwards! Returning last known good time...");
2186		}
2187
2188		Danga::Socket->AddTimer(1, sub{$self->SetTime});
2189	}
2190
2191	##########################################################################
2192	# Returns buffered time
2193	sub GetTime {
2194		my($self) = @_;
2195		return $self->{NOWTIME};
2196	}
2197
2198	##########################################################################
2199	# Returns bandwidth statistics
2200	sub GetStats {
2201		my($self) = @_;
2202		return $self->{stats};
2203	}
2204
2205
2206	##########################################################################
2207	# Try to create a new listening socket
2208	# NewTcpListen(ID=>UniqueueRunnerId, Port=>PortToListen, Bind=>IPv4ToBind, Callbacks => {})
2209	sub NewTcpListen {
2210		my($self,%args) = @_;
2211
2212		my $handle_id = $args{ID} or $self->panic("No Handle ID?");
2213		my $maxpeers  = $args{MaxPeers};
2214		my $port      = $args{Port};
2215		my $bindto    = $args{Bind};
2216		my $cbacks    = $args{Callbacks};
2217		my $dnth      = $args{DownThrottle};
2218		my $socket    = 0;
2219
2220		if(exists($self->{_HANDLES}->{$handle_id})) {
2221			$self->panic("Cannot register multiple versions of handle_id $handle_id");
2222		}
2223		elsif($maxpeers < 1) {
2224			$self->panic("$handle_id: MaxPeers cannot be < 1 (is: $maxpeers)");
2225		}
2226		elsif($port) {
2227			my %sargs = (LocalPort=>$port, LocalAddr=>$bindto, Proto=>'tcp', ReuseAddr=>1, Listen=>1024);
2228			$socket = ( $self->HaveIPv6 ? IO::Socket::INET6->new(%sargs) : IO::Socket::INET->new(%sargs) ) or return undef;
2229		}
2230
2231		$self->{_HANDLES}->{$handle_id} = { lsock => $socket, downthrottle=>$dnth, cbacks=>$cbacks, avpeers=>$maxpeers, blacklist=>{pointer=>0,array=>[],bldb=>{}} };
2232
2233		if($socket) {
2234			my $dsock = Bitflu::Network::Danga->new(sock=>$socket, on_read_ready => sub { $self->_TCP_Accept(shift) } ) or $self->panic;
2235			$self->{_SOCKETS}->{$socket} = { dsock => $dsock, handle=>$handle_id };
2236		}
2237
2238		return $socket;
2239	}
2240
2241	sub NewUdpListen {
2242		my($self,%args) = @_;
2243		my $handle_id = $args{ID} or $self->panic("No Handle ID?");
2244		my $port      = $args{Port};
2245		my $bindto    = $args{Bind};
2246		my $cbacks    = $args{Callbacks};
2247		my $socket    = 0;
2248
2249		if(exists($self->{_HANDLES}->{$handle_id})) {
2250			$self->panic("Cannot register multiple versions of handle_id $handle_id");
2251		}
2252		elsif($port) {
2253			my %sargs = (LocalPort=>$port, LocalAddr=>$bindto, Proto=>'udp');
2254			$socket = ( $self->HaveIPv6 ? IO::Socket::INET6->new(%sargs) : IO::Socket::INET->new(%sargs) ) or return undef;
2255		}
2256
2257		$self->{_HANDLES}->{$handle_id} = { lsock => $socket, cbacks=>$cbacks, blacklist=>{pointer=>0,array=>[],bldb=>{}} };
2258
2259		if($socket) {
2260			my $dsock = Bitflu::Network::Danga->new(sock=>$socket, on_read_ready => sub { $self->_UDP_Read(shift) } ) or $self->panic;
2261			$self->{_SOCKETS}->{$socket} = { dsock => $dsock, handle=>$handle_id };
2262		}
2263
2264		return $socket;
2265	}
2266
2267	sub SendUdp {
2268		my($self, $socket, %args) = @_;
2269		my $ip   = $args{RemoteIp} or $self->panic("No IP given");
2270		my $port = $args{Port}     or $self->panic("No Port given");
2271		my $id   = $args{ID}       or $self->panic("No ID given");
2272		my $data = $args{Data};
2273
2274		if(KILL_IPV4 && $self->IsNativeIPv4($ip)) {
2275			$self->debug("udp: will not send data to ipv4 $ip");
2276			return undef;
2277		}
2278		else {
2279			my @af  = $self->_GetAddrFoo($ip,$port,AF_UNSPEC,'udp');
2280			my $sin = $af[3] or return undef;
2281			my $bs  = send($socket,$data,0,$sin);
2282			return $bs;
2283		}
2284	}
2285
2286	sub RemoveSocket {
2287		my($self,$handle_id,$sock) = @_;
2288		$self->debug("RemoveSocket($sock)") if NETDEBUG;
2289		my $sref = delete($self->{_SOCKETS}->{$sock}) or $self->panic("$sock was not registered?!");
2290		my $hxref= $self->{_HANDLES}->{$handle_id}    or $self->panic("No handle reference for $handle_id !");
2291		delete($self->{up_q}->{$sock});
2292		delete($self->{stagger}->{$sref->{dsock}});
2293		$sref->{dsock}->close;
2294		$self->{avfds}++;
2295		$hxref->{avpeers}++;
2296		return 1;
2297	}
2298
2299	sub WriteDataNow {
2300		my($self,$sock,$data) = @_;
2301		return $self->_WriteReal($sock,$data,1);
2302	}
2303
2304	sub WriteData {
2305		my($self,$sock,$data) = @_;
2306		return $self->_WriteReal($sock,$data,0);
2307	}
2308
2309	sub _WriteReal {
2310		my($self,$sock,$data,$fast) = @_;
2311
2312		my $sref         = $self->{_SOCKETS}->{$sock} or $self->panic("$sock has no _SOCKET entry!");
2313		my $this_len     = length($data);
2314
2315
2316		if($self->GetQueueLen($sock) > MAXONWIRE) {
2317			$self->warn("Buffer overrun for <$sock>: Too much unsent data!");
2318			return 1;
2319		}
2320
2321
2322		$sref->{writeq} .= $data;
2323		$sref->{qlen}   += $this_len;
2324
2325		if($fast==0 && exists($self->{up_q}->{$sock})) {
2326			# -> Still waiting for a timer
2327			$self->debug("$sock is throttled - waiting for a flush") if NETDEBUG;
2328		}
2329		else {
2330
2331			if($sref->{dsock}->{write_buf_size} == 0 && defined($sref->{dsock}->peer_ip_string)) { # peer_ip_string is needed on FreeBSD
2332				# Socket is empty and ready
2333				my $bpx_up         = ($fast? BF_BUFSIZ : $self->{bpx_up});
2334				my $sendable       = ($sref->{qlen} < $bpx_up ? $sref->{qlen} : $bpx_up );
2335				my $chunk          = substr($sref->{writeq},0,$sendable);
2336				$sref->{writeq}    = substr($sref->{writeq},$sendable);
2337				$sref->{qlen}     -= $sendable;
2338
2339				$self->{stats}->{raw_sent} += $sendable if $fast==0;
2340
2341				# Actually write data:
2342				$sref->{dsock}->write(\$chunk);
2343
2344				$self->debug("$sock has $sref->{qlen} bytes outstanding (sending: $sendable :: $fast ) ") if NETDEBUG;
2345
2346				if(!$sref->{dsock}->sock) {
2347					$self->debug("$sock went away while writing to it ($!) , scheduling kill timer");
2348					# Fake a 'connection timeout' -> This goes trough the whole kill-chain so it should be save
2349					Danga::Socket->AddTimer(0, sub { $self->_TCP_LazyClose($sref->{dsock},$sock); });
2350					return 1; # do not add a new timer (wouldn't hurt but it's of no use)
2351				}
2352
2353			}
2354
2355			if($self->GetQueueLen($sock)) {
2356				$self->{up_q}->{$sock} = [$sock,'',$fast];
2357			}
2358
2359		}
2360
2361		return 1;
2362	}
2363
2364
2365	##########################################################################
2366	# Returns TRUE if socket is an INCOMING connection
2367	sub IsIncoming {
2368		my($self,$socket) = @_;
2369		my $val = $self->{_SOCKETS}->{$socket}->{incoming};
2370		$self->panic("$socket has no incoming value!") unless defined($val);
2371		return $val;
2372	}
2373
2374	##########################################################################
2375	# Returns last IO for given socket
2376	sub GetLastIO {
2377		my($self,$socket) = @_;
2378		my $val = $self->{_SOCKETS}->{$socket}->{lastio};
2379		$self->panic("$socket has no lastio value!") unless defined($val);
2380		return $val;
2381	}
2382
2383	sub GetQueueFree {
2384		my($self,$sock) = @_;
2385		my $xfree = ((MAXONWIRE)-$self->GetQueueLen($sock));
2386		return ((MAXONWIRE)-$self->GetQueueLen($sock));
2387	}
2388
2389	sub GetQueueLen {
2390		my($self,$sock) = @_;
2391		my $sref = $self->{_SOCKETS}->{$sock} or $self->panic("$sock has no _SOCKET entry!");
2392		return $sref->{dsock}->{write_buf_size}+$sref->{qlen};
2393	}
2394
2395	sub NewTcpConnection {
2396		my($self,%args) = @_;
2397
2398		my $handle_id = $args{ID}                       or $self->panic("No Handle ID?");
2399		my $hxref     = $self->{_HANDLES}->{$handle_id} or $self->panic("No Handle reference for $handle_id");
2400		my $remote_ip = $args{RemoteIp};
2401		my $port      = $args{Port};
2402		my $new_sock  = undef;
2403
2404		if($self->{avfds} < 1) {
2405			return undef; # No more FDs :�-(
2406		}
2407		elsif($hxref->{avpeers} < 1) {
2408			return undef; # Handle is full
2409		}
2410
2411		if(exists($args{Hostname})) {
2412			# -> Resolve
2413			my @xresolved = $self->Resolve($args{Hostname});
2414			unless( ($remote_ip = $xresolved[0] ) ) {
2415				$self->warn("Cannot resolve $args{Hostname}");
2416				return undef;
2417			}
2418		}
2419
2420		if($self->IpIsBlacklisted($handle_id,$remote_ip)) {
2421			$self->debug("Won't connect to blacklisted IP $remote_ip");
2422			return undef;
2423		}
2424		if(KILL_IPV4 && $self->IsNativeIPv4($remote_ip)) {
2425			$self->debug("Will not connect to IPv4 addr $remote_ip");
2426			return undef;
2427		}
2428
2429		my($sx_family, $sx_socktype, $sx_proto, $sin) = $self->_GetAddrFoo($remote_ip,$port,AF_UNSPEC, 'tcp');
2430
2431		if(defined($sin) && socket($new_sock, $sx_family, $sx_socktype, $sx_proto) ) {
2432			$self->Unblock($new_sock) or $self->panic("Failed to unblock <$new_sock> : $!");
2433			connect($new_sock,$sin);
2434		}
2435		else {
2436			$self->warn("Unable to create socket for $remote_ip:$port (error: $!)");
2437			return undef;
2438		}
2439		my $new_dsock = Bitflu::Network::Danga->new(sock=>$new_sock, on_read_ready => sub { $self->_TCP_Read(shift); },
2440		                                            on_fclose=> sub { $self->_TCP_LazyClose(shift,shift) } ) or $self->panic;
2441		$self->{_SOCKETS}->{$new_sock} = { dsock => $new_dsock, peerip=>$remote_ip, handle=>$handle_id, incoming=>0, lastio=>$self->GetTime, writeq=>'', qlen=>0 };
2442		$self->{avfds}--;
2443		$hxref->{avpeers}--;
2444		$self->debug("<< $new_dsock -> $remote_ip ($new_sock)") if NETDEBUG;
2445		Danga::Socket->AddTimer(15, sub { $self->_TCP_LazyClose($new_dsock,$new_sock)  });
2446
2447		return $new_sock;
2448	}
2449
2450	sub _TCP_LazyClose {
2451		my($self,$dsock,$xglob) = @_;
2452		if( (!$dsock->sock or !$dsock->peer_ip_string) && exists($self->{_SOCKETS}->{$xglob} ) ) {
2453			$self->warn("<$xglob> is not connected yet, killing it : ".$dsock->sock) if NETDEBUG;
2454			my $sref      = $self->{_SOCKETS}->{$xglob} or $self->panic("<$xglob> is not registered!");
2455			my $handle_id = $sref->{handle}             or $self->panic("$xglob has no handle!");
2456			my $cbacks    = $self->{_HANDLES}->{$handle_id}->{cbacks};
2457			if(my $cbn = $cbacks->{Close}) { $handle_id->$cbn($xglob); }
2458			$self->RemoveSocket($handle_id,$xglob);
2459		}
2460	}
2461
2462	sub _TCP_Accept {
2463		my($self, $dsock) = @_;
2464
2465		my $new_sock  = $dsock->sock->accept;
2466		my $new_ip    = 0;
2467		my $handle_id = $self->{_SOCKETS}->{$dsock->sock}->{handle} or $self->panic("No handle id?");
2468		my $hxref     = $self->{_HANDLES}->{$handle_id}             or $self->panic("No handle reference for $handle_id");
2469		my $cbacks    = $hxref->{cbacks};
2470
2471		unless($new_sock) {
2472			$self->warn("accept() call failed?!");
2473		}
2474		elsif(! ($new_ip = $new_sock->peerhost) ) {
2475			$self->debug("No IP for $new_sock");
2476			$new_sock->close;
2477		}
2478		elsif( $self->SixToFour($new_ip) && ($new_ip = $self->SixToFour($new_ip)) && 0 ) {
2479			# nil -> convert $new_ip to native ipv4 and continue elsif in any case
2480		}
2481		elsif($self->IpIsBlacklisted($handle_id, $new_ip)) {
2482			$self->debug("Refusing incoming connection from blacklisted ip $new_ip");
2483			$new_sock->close;
2484		}
2485		elsif(KILL_IPV4 && $self->IsNativeIPv4($new_ip)) {
2486			$self->debug("Dropping new IPv4 connection from $new_ip");
2487			$new_sock->close;
2488		}
2489		elsif($self->{avfds} < 1) {
2490			$self->warn("running out of filedescriptors, refusing incoming TCP connection");
2491			$new_sock->close;
2492		}
2493		elsif($hxref->{avpeers} < 1) {
2494			$self->warn("$handle_id : is full: won't accept new peers");
2495			$new_sock->close;
2496		}
2497		elsif(!$self->Unblock($new_sock)) {
2498			$self->panic("Failed to unblock $new_sock : $!");
2499		}
2500		else {
2501			my $new_dsock = Bitflu::Network::Danga->new(sock=>$new_sock, on_read_ready => sub { $self->_TCP_Read(shift); },
2502			                                            on_fclose=> sub { $self->_TCP_LazyClose(shift,shift) } ) or $self->panic;
2503			$self->warn(">> ".$new_dsock->sock." -> ".$new_ip) if NETDEBUG;
2504			$self->{_SOCKETS}->{$new_dsock->sock} = { dsock => $new_dsock, peerip=>$new_ip, handle=>$handle_id, incoming=>1, lastio=>$self->GetTime, writeq=>'', qlen=>0 };
2505			$self->{avfds}--;
2506			$hxref->{avpeers}--;
2507			if(my $cbn = $cbacks->{Accept}) { $handle_id->$cbn($new_dsock->sock,$new_ip); }
2508		}
2509	}
2510
2511
2512	sub _TCP_Read {
2513		my($self, $dsock) = @_;
2514
2515		my $sref      = $self->{_SOCKETS}->{$dsock->sock} or $self->panic("Sock ".$dsock->sock." on $dsock not registered?");
2516		my $handle_id = $sref->{handle}                   or $self->panic("No handle id?");
2517		my $cbacks    = $self->{_HANDLES}->{$handle_id}->{cbacks};
2518		my $dnth      = $self->{_HANDLES}->{$handle_id}->{downthrottle};
2519		my $overflow  = ( defined($self->{bpx_dn}) && $self->{bpx_dn} < 1 ? 1 : 0 );
2520
2521
2522		my $rref = $dsock->read(BF_BUFSIZ);
2523
2524		if(!defined($rref)) {
2525			$self->debug("Reading from $dsock returned nothing, closing! -> ".$dsock->sock);
2526			if(my $cbn = $cbacks->{Close}) { $handle_id->$cbn($dsock->sock); }
2527			$self->RemoveSocket($handle_id,$dsock->sock);
2528		}
2529		else {
2530			my $len = length($$rref);
2531			$sref->{lastio}             = $self->GetTime;
2532			$self->{stats}->{raw_recv} += $len;
2533			$self->{bpx_dn}            -= $len if defined($self->{bpx_dn});
2534			$self->debug("RECV $len from ".$dsock->sock) if NETDEBUG;
2535
2536			# add for staggering if downthrottle is set.
2537			# note: must be BEFORE we do the callback! (cb could close the sock!)
2538			if($dnth) {
2539				if(exists($self->{stagger}->{$dsock})) {
2540					if($len==0 or (!$overflow && rand(10) > 7)) {
2541						delete($self->{stagger}->{$dsock}) or $self->panic;
2542						$dsock->watch_read(1);
2543					}
2544				}
2545				elsif($overflow) {
2546					$self->{stagger}->{$dsock} = $dsock;
2547					$dsock->watch_read(0);
2548				}
2549			}
2550
2551			if(my $cbn = $cbacks->{Data}) { $handle_id->$cbn($dsock->sock, $rref, $len); }
2552		}
2553	}
2554
2555	sub _UDP_Read {
2556		my($self,$dsock) = @_;
2557
2558		my $sock      = $dsock->sock               or $self->panic("No socket?");
2559		my $sref      = $self->{_SOCKETS}->{$sock} or $self->panic("$sock has no _SOCKETS entry!");
2560		my $handle_id = $sref->{handle}            or $self->panic("$sock has no handle in _SOCKETS!");
2561		my $cbacks    = $self->{_HANDLES}->{$handle_id}->{cbacks};
2562		my $new_ip    = '';
2563		my $new_port  = 0;
2564		my $buffer    = undef;
2565
2566		$sock->recv($buffer,BF_BUFSIZ);
2567
2568		if(!($new_ip = $sock->peerhost)) {
2569			# Weirdo..
2570			$self->debug("<$sock> had no peerhost, data dropped");
2571		}
2572		elsif(!($new_port = $sock->peerport)) {
2573			$self->debug("<$sock> had no peerport, data dropped");
2574		}
2575		elsif( $self->SixToFour($new_ip) && ($new_ip = $self->SixToFour($new_ip)) && 0 ) {
2576			# nil -> convert $new_ip to native ipv4 and continue elsif in any case
2577		}
2578		elsif(KILL_IPV4 && $self->IsNativeIPv4($new_ip)) {
2579			$self->debug("udp: dropping incoming datagram from $new_ip");
2580		}
2581		elsif(my $cbn = $cbacks->{Data}) {
2582				$handle_id->$cbn($sock, \$buffer, $new_ip, $new_port);
2583		}
2584	}
2585
2586	sub _UpdateNetstats {
2587		my($self) = @_;
2588
2589		# calculate our current up/download speed
2590		foreach my $kw (qw(sent recv)) {
2591			$self->{stats}->{$kw}       = ( ($self->{stats}->{"raw_$kw"}/NETSTATS)*2 + $self->{stats}->{$kw} ) / 3;
2592			$self->{stats}->{"raw_$kw"} = 0;
2593		}
2594
2595
2596		my $want_up  = $self->{super}->Configuration->GetValue('upspeed') * 1024;
2597		my $got_up   = ($self->{stats}->{sent} || 1);
2598		my $multiply = $want_up / $got_up;
2599		my $up_diff  = abs($want_up - $got_up);
2600
2601		if($up_diff < 500) { # good enough - let it be as it is
2602			$multiply = 1;
2603		}
2604		elsif($multiply > 20)  {  # we are way off - allow doubling our upspeed
2605			$multiply = 2;
2606		}
2607		elsif($multiply > 1.3) { # more or less ok - but don't go up too fast
2608			$multiply = 1.3;
2609		}
2610
2611		$self->debug("got=$got_up, want=$want_up, mp=$multiply, bps=$self->{bpx_up}, diff=$up_diff");
2612
2613		$self->{bpx_up} = int($self->{bpx_up} * $multiply);
2614		$self->{bpx_up} = BF_BUFSIZ if !$want_up or $self->{bpx_up} > BF_BUFSIZ;
2615		$self->{bpx_up} = BPS_MIN   if $self->{bpx_up} < BPS_MIN;
2616
2617
2618		return 1;
2619	}
2620
2621
2622	sub BlacklistIp {
2623		my($self, $handle_id, $this_ip, $ttl) = @_;
2624
2625		my $xbl = $self->{_HANDLES}->{$handle_id}->{blacklist} or $self->panic("$handle_id was not registered!");
2626		   $ttl = BLIST_TTL unless $ttl;
2627
2628		if($self->IsNativeIPv6($this_ip)) {
2629			$this_ip = $self->ExpandIpV6($this_ip);
2630		}
2631
2632		unless($self->IpIsBlacklisted($handle_id, $this_ip)) {
2633			my $pointer = ( $xbl->{pointer} >= BLIST_LIMIT ? 0 : $xbl->{pointer});
2634			# Ditch old entry
2635			my $oldkey = $xbl->{array}->[$pointer];
2636			defined($oldkey) and delete($xbl->{bldb}->{$oldkey});
2637			$xbl->{array}->[$pointer] = $this_ip;
2638			$xbl->{bldb}->{$this_ip}  = $self->GetTime + $ttl;
2639			$xbl->{pointer}           = 1+$pointer;
2640		}
2641	}
2642
2643	sub IpIsBlacklisted {
2644		my($self, $handle_id, $this_ip) = @_;
2645
2646		my $xbl = $self->{_HANDLES}->{$handle_id}->{blacklist} or $self->panic("$handle_id was not registered!");
2647
2648		if($self->IsNativeIPv6($this_ip)) {
2649			$this_ip = $self->ExpandIpV6($this_ip);
2650		}
2651
2652		if(exists($xbl->{bldb}->{$this_ip}) && $self->GetTime < $xbl->{bldb}->{$this_ip}) {
2653			return 1;
2654		}
2655		else {
2656			return 0;
2657		}
2658	}
2659
2660
2661	sub debug { my($self, $msg) = @_; $self->{super}->debug("Network : ".$msg); }
2662	sub info  { my($self, $msg) = @_; $self->{super}->info("Network : ".$msg);  }
2663	sub warn  { my($self, $msg) = @_; $self->{super}->warn("Network : ".$msg);  }
2664	sub panic { my($self, $msg) = @_; $self->{super}->panic("Network : ".$msg); }
2665	sub stop  { my($self, $msg) = @_; $self->{super}->stop("Network : ".$msg); }
26661;
2667
2668###############################################################################################################
2669# Danga-Socket event dispatcher
2670package Bitflu::Network::Danga;
2671	use strict;
2672	use base qw(Danga::Socket);
2673	use fields qw(on_read_ready on_error on_hup on_fclose);
2674
2675	sub new {
2676		my($self,%args) = @_;
2677		$self = fields::new($self) unless ref $self;
2678		$self->SUPER::new($args{sock});
2679
2680		foreach my $field (qw(on_read_ready on_error on_hup on_fclose)) {
2681			$self->{$field} = $args{$field} if $args{$field};
2682		}
2683
2684		$self->watch_read(1) if $args{on_read_ready}; # Watch out for read events
2685		return $self;
2686	}
2687
2688	sub event_read {
2689		my($self) = @_;
2690		if(my $cx = $self->{on_read_ready}) {
2691			return $cx->($self);
2692		}
2693	}
2694
2695	sub event_err {
2696		my($self) = @_;
2697		if(my $cx = ($self->{on_error}||$self->{on_read_ready})) {
2698			return $cx->($self);
2699		}
2700	}
2701
2702	sub event_hup {
2703		my($self) = @_;
2704		if(my $cx = ($self->{on_hup}||$self->{on_read_ready})) {
2705			return $cx->($self);
2706		}
2707	}
2708
2709	sub event_write {
2710		my($self) = @_;
2711		my $os = $self->sock;
2712		my $rv = $self->write(undef);
2713#		warn "!! ZERO RETURN WHILE WRITING TO $self - sock was $os , sock is ".$self->sock."\n" if !$rv;
2714		if(!$self->sock && (my $cx = ($self->{on_fclose}))) {
2715			return $cx->($self,$os);
2716		}
2717	}
2718
27191;
2720
2721
2722###############################################################################################################
2723# Builtin Configuration parser / class
2724package Bitflu::Configuration;
2725use strict;
2726
2727
2728	sub new {
2729		my($class, %args) = @_;
2730		my $self = { configuration_file => $args{configuration_file}, super=> $args{super} };
2731		bless($self, $class);
2732
2733		unless(-f $self->{configuration_file}) {
2734			warn("-> Creating configuration file '$self->{configuration_file}'");
2735			open(CFGH, ">", $self->{configuration_file}) or die("Unable to create $self->{configuration_file}: $!\n");
2736			close(CFGH);
2737		}
2738
2739		if (-f $self->{configuration_file}) {
2740			open(CFGH, "+<", $self->{configuration_file}) or die("Unable to open $self->{configuration_file} for writing: $!\n");
2741			$self->{configuration_fh} = *CFGH;
2742
2743			if($< == 0) {
2744				if($self->_IsWritableByNonRoot($self->{configuration_fh})) {
2745					warn "\tWARNING: You started $0 as root, but the configuration\n";
2746					warn "\tWARNING: file at '$self->{configuration_file}' is WRITABLE by\n";
2747					warn "\tWARNING: other users! You should fix the permissions of the\n";
2748					warn "\tWARNING: configuration file and it's directory.\n";
2749				}
2750			}
2751
2752			# Try to create a backup
2753			if( open(BKUP, ">", $self->{configuration_file}.".backup") ) {
2754				while(<CFGH>) { print BKUP; }
2755				close(BKUP);
2756			}
2757
2758		}
2759
2760		# Load the configuration ASAP to get logging working:
2761		$self->Load;
2762		return $self;
2763	}
2764
2765	sub init {
2766		my($self) = @_;
2767		$self->{super}->Admin->RegisterCommand('config', $self, '_Command_config', 'Configure bitflu while running. Type \'help config\' for more information',
2768		[ [undef, "Usage: config show|get key|set key"],
2769		  [undef, "config show          : Display contents of .bitflu.config"],
2770		  [undef, "config get key       : Display value of 'key'. Example: 'config get upspeed'"],
2771			[undef, "config set key foo   : Changes value of 'key' to 'foo'. Example: 'config set upspeed 45'"],
2772			[undef, ""],
2773			[1, "NOTE: Certain options (like telnet_port) would require a restart of $0 and cannot be changed"],
2774			[1, "      using the 'config' command. To edit such options you'll need to stop bitflu and edit .bitflu.config"],
2775			[1, "      using a text editor."],
2776		]
2777		);
2778		return 1;
2779	}
2780
2781	sub _Command_config {
2782		my($self,@args) = @_;
2783		my $msg    = undef;
2784		my $action = $args[0];
2785		my $key    = $args[1];
2786		my $value  = $args[2];
2787		my @A      = ();
2788		my $NOEXEC = '';
2789		if($action eq "show") {
2790			foreach my $k (sort keys(%{$self->{conf}})) {
2791				push(@A, [ ($self->IsRuntimeLocked($k) ? 3 : undef), sprintf("%-24s => %s",$k, $self->{conf}->{$k})]);
2792			}
2793		}
2794		elsif($action eq "get" && defined($key)) {
2795			my $xval = $self->GetValue($key);
2796			if(defined($xval)) {
2797				push(@A, [undef, "$key => $xval"]);;
2798			}
2799			else {
2800				push(@A, [2, "$key is not set"]);
2801			}
2802		}
2803		elsif($action eq "set" && defined($value)) {
2804			if(defined($self->GetValue($key))) {
2805				if($self->SetValue($key, $value)) { push(@A, [undef, "'$key' set to '$value'"]); $self->Save; }
2806				else                              { push(@A, [2, "Unable to change value of $key at runtime"]); }
2807			}
2808			else {
2809				push(@A, [2, "Option '$key' does not exist"]);
2810			}
2811		}
2812		else {
2813			$NOEXEC .= "Usage error, type 'help config' for more information";
2814		}
2815		return{MSG=>\@A, SCRAP=>[], NOEXEC=>$NOEXEC};
2816	}
2817
2818
2819	sub Load {
2820		my($self) = @_;
2821		$self->SetDefaults();
2822		if(defined(my $cfh = $self->{configuration_fh})) {
2823			seek($cfh,0,0) or $self->panic("Unable to seek to beginning");
2824			my $conf = $self->{super}->Tools->CBxToRef(join('', <$cfh>));
2825			while(my($k,$v) = each(%$conf)) {
2826				$self->{conf}->{$k} = $v;
2827			}
2828		}
2829
2830		# Remove obsoleted/ignored config settings
2831		foreach my $legacy_setting (qw(telnet_view)) {
2832			delete($self->{conf}->{$legacy_setting});
2833		}
2834
2835	}
2836
2837	sub SetDefaults {
2838		my($self) = @_;
2839		$self->{conf}->{plugindir}       = '/usr/local/share/bitflu/plugins';
2840		$self->{conf}->{pluginexclude}   = '';
2841		$self->{conf}->{workdir}         = "$ENV{HOME}/.bitflu.workdir";
2842		$self->{conf}->{upspeed}         = 35;
2843		$self->{conf}->{downspeed}       = 0;
2844		$self->{conf}->{loglevel}        = 5;
2845		$self->{conf}->{renice}          = 8;
2846		$self->{conf}->{logfile}         = '';
2847		$self->{conf}->{pidfile}         = '';
2848		$self->{conf}->{chdir}           = '';
2849		$self->{conf}->{history}         = 1;
2850		$self->{conf}->{ipv6}            = 1;
2851		$self->{conf}->{storage}         = 'StorageVFS';
2852		foreach my $opt (qw(ipv6 renice plugindir pluginexclude workdir logfile storage chdir pidfile)) {
2853			$self->RuntimeLockValue($opt);
2854		}
2855	}
2856
2857	sub Save {
2858		my($self) = @_;
2859		my $cfh = $self->{configuration_fh} or return undef;
2860		seek($cfh,0,0)   or $self->panic("Unable to seek to beginning");
2861		truncate($cfh,0) or $self->panic("Unable to truncate configuration file");
2862		print $cfh $self->{super}->Tools->RefToCBx($self->{conf});
2863		$cfh->autoflush(1);
2864	}
2865
2866	sub RuntimeLockValue {
2867		my($self,$xkey) = @_;
2868		return($self->{conf_setlock}->{$xkey} = 1);
2869	}
2870
2871	sub IsRuntimeLocked {
2872		my($self, $xkey) = @_;
2873		return(defined($self->{conf_setlock}->{$xkey}));
2874	}
2875
2876	sub GetKeys {
2877		my($self) = @_;
2878		return (sort keys(%{$self->{conf}}));
2879	}
2880
2881	sub GetValue {
2882		my($self,$xkey) = @_;
2883		return($self->{conf}->{$xkey});
2884	}
2885
2886	sub SetValue {
2887		my($self,$xkey,$xval) = @_;
2888		return undef if defined($self->{conf_setlock}->{$xkey});
2889		$self->{conf}->{$xkey} = $xval;
2890		$self->Save;
2891		return 1;
2892	}
2893
2894	sub _IsWritableByNonRoot {
2895		my($self,$file) = @_;
2896		my @stat = stat($file) or die "Could not stat '$file' : $!\n";
2897		return 1 if ($stat[2]&0002 or ($stat[5]!=0 && $stat[2]&0020) or ($stat[4] !=0) ); # uid can always chmod -> bad!
2898		return 0;
2899	}
2900
2901
2902	sub debug { my($self, $msg) = @_; $self->{super}->debug("Config  : ".$msg); }
2903	sub info  { my($self, $msg) = @_; $self->{super}->info("Config  : ".$msg);  }
2904	sub warn  { my($self, $msg) = @_; $self->{super}->warn("Config  : ".$msg);  }
2905	sub panic { my($self, $msg) = @_; $self->{super}->panic("Config  : ".$msg); }
2906
29071;
2908
2909################################################################################################
2910# Implements Simple-eXecution Tasks
2911package Bitflu::SxTask;
2912
2913	sub new {
2914		my($classname,%args) = @_;
2915		my $sx = {
2916		           __super_=> $args{__SUPER_},
2917		           interval=> (exists($args{Interval}) ? $args{Interval} : 1),
2918		           super   => $args{Superclass},
2919		           cback   => $args{Callback},
2920		           args    => $args{Args},
2921		         };
2922		bless($sx,$classname);
2923		$sx->{_} = \$sx;
2924		return $sx;
2925	}
2926
2927	sub run {
2928		my($self) = @_;
2929		my $cbx = $self->{cback};
2930		my $rv  = $self->{super}->$cbx(@{$self->{args}});
2931		$self->destroy if $rv == 0;
2932		return $self->{interval};
2933	}
2934
2935	sub destroy {
2936		my($self) = @_;
2937		$self->{__super_}->DestroySxTask($self);
2938	}
2939
29401;
2941
2942################################################################################################
2943# Implement os-specific syscalls.
2944package Bitflu::Syscall;
2945	use strict;
2946	use POSIX;
2947	use Config;
2948
2949	sub new {
2950		my($classname, %args) = @_;
2951
2952		my $self = { super=>$args{super}, sc=>{} };
2953		bless($self,$classname);
2954		return $self;
2955	}
2956
2957	sub init {
2958		my($self) = @_;
2959		# syscall 'prototype'
2960		my $syscalls = {
2961			'linux-x86_64'  => { fallocate=>{ NR=>285, pfx=>[0,0]     , pst=>[] }, statfs=>{ NR=>137, pack=>'Q', buff=>112, bsize=>1, total=>2, free=>4 }  },
2962			'linux-i386'    => { fallocate=>{ NR=>324, pfx=>[0,0,0]   , pst=>[0]}, statfs=>{ NR=>99 , pack=>'L', buff=>72,  bsize=>1, total=>2, free=>4 }  },
2963			'freebsd-amd64' => {                                                   statfs=>{ NR=>396, pack=>'Q', buff=>64,  bsize=>2, total=>4, free=>6 }  },
2964			'freebsd-i386'  => {                                                   statfs=>{ NR=>396, pack=>'Q', buff=>64,  bsize=>2, total=>4, free=>6 }  },
2965		};
2966
2967		# try to detect runtime environment:
2968		my(undef, undef, undef, undef, $arch) = POSIX::uname();
2969		my $osname  = $^O;
2970		my $os_spec = '';
2971		if ($osname eq "linux" or $osname eq "freebsd") {
2972			$arch    = "i386" if $arch =~ /^i[3456]86$/;
2973			$arch    = "i386" if ($arch eq 'x86_64' && $Config{ptrsize} == 4); #32bit perl on x86_64
2974			$os_spec = "$osname-$arch";
2975		}
2976
2977		$self->{sc} = ( $syscalls->{$os_spec} || {} );
2978
2979		$self->info("supported syscalls of $os_spec: ".( join(" ",keys(%{$self->{sc}})) || '(no syscalls supported!)' ));
2980
2981		return 1;
2982	}
2983
2984	##########################################################################
2985	# Implements fallocate() syscall
2986	sub fallocate {
2987		my($self, $fh, $size) = @_;
2988		my $rv = undef;
2989		if(my $scr = $self->{sc}->{fallocate}) {
2990			$rv = syscall($scr->{NR},fileno($fh), @{$scr->{pfx}}, $size, @{$scr->{pst}});
2991		}
2992		return $rv;
2993	}
2994
2995	##########################################################################
2996	# Implements 'statfs'
2997	sub statfs {
2998		my($self, $path) = @_;
2999
3000		my $rv = undef;
3001		if(my $scr = $self->{sc}->{statfs}) {
3002			my $buff  = '0' x $scr->{buff};
3003			if( syscall($scr->{NR}, $path, $buff) == 0 ) {
3004				my @res = unpack("$scr->{pack}*", $buff);
3005				$rv = { bytes_total=>$res[$scr->{bsize}]*$res[$scr->{total}], bytes_free=>$res[$scr->{bsize}]*$res[$scr->{free}] };
3006			}
3007		}
3008		return $rv;
3009	}
3010
3011	##########################################################################
3012	# Shortcut to stat current set 'workdir'
3013	sub statworkdir {
3014		my($self) = @_;
3015		return $self->statfs($self->{super}->Configuration->GetValue('workdir'));
3016	}
3017
3018
3019	sub warn   { my($self, $msg) = @_; $self->{super}->warn("Syscall : ".$msg);   }
3020	sub debug  { my($self, $msg) = @_; $self->{super}->debug("Syscall : ".$msg);  }
3021	sub info   { my($self, $msg) = @_; $self->{super}->info("Syscall : ".$msg);   }
3022
30231;
3024
3025
3026################################################################################################
3027# Bencoder lib
3028
3029package Bitflu::Bencoder;
3030	use strict;
3031	use constant DEBUG => 0;                    # debug decoder
3032	use constant MAX_STRSIZE => 1024*1024*5;    # max stringsize that the decoder will do
3033
3034
3035	sub decode {
3036		$_ = $_[0];
3037
3038		my $v = undef;
3039		goto PARSER_ERROR if length($_) == 0;
3040		$v = _decode();
3041
3042		PARSER_ERROR:
3043		return $v;
3044	}
3045
3046	sub encode {
3047		my($ref) = @_;
3048		Carp::confess("encode(undef) called") unless $ref;
3049		return _encode($ref);
3050	}
3051
3052
3053
3054	sub _encode {
3055		my($ref) = @_;
3056
3057		Carp::cluck() unless defined $ref;
3058
3059		my $encoded = undef;
3060		my $reftype = ref($ref);
3061
3062		if($reftype eq "HASH") {
3063			$encoded .= "d";
3064			foreach(sort keys(%$ref)) {
3065				$encoded .= length($_).":".$_;
3066				$encoded .= _encode($ref->{$_});
3067			}
3068			$encoded .= "e";
3069		}
3070		elsif($reftype eq "ARRAY") {
3071			$encoded .= "l";
3072			foreach(@$ref) {
3073				$encoded .= _encode($_);
3074			}
3075			$encoded .= "e";
3076		}
3077		elsif($ref =~ /^-?\d+$/) {
3078			$encoded .= "i".int($ref)."e";
3079		}
3080		else {
3081			# -> String
3082			$ref      = ${$ref} if $reftype eq "SCALAR"; # FORCED string
3083			$encoded .= length($ref).":".$ref;
3084		}
3085		return $encoded;
3086	}
3087
3088
3089	sub _decode {
3090
3091		print "ENTERING DX AT ".pos()."  "  if DEBUG;
3092		print ">> ".substr($_,pos(),1)."\n" if DEBUG;
3093
3094		if(m/\G\z/gc) {
3095			print "---- HIT EOF! ---\n" if DEBUG;
3096			goto PARSER_ERROR;
3097		}
3098		elsif(m/\G(0|[1-9]\d*):/gc) { # <LEN><:><VALUE>
3099			my $len = $1;
3100			my $pos = pos();
3101			my $v   = ( $len < MAX_STRSIZE ? substr($_, $pos, $len) : '' );
3102			pos()   = $pos+$len;
3103			goto PARSER_ERROR if length($v) != $len;
3104			return $v;
3105		}
3106		elsif(m/\Gd/gc) { #<d><DATA><e>
3107
3108			print "DICT AT ".pos()."\n" if DEBUG;
3109
3110			my $ref = {};
3111			until(m/\Ge/gc) {
3112				my $k = scalar(_decode());
3113				my $v = _decode();
3114				print "+ DICT: $k = $v\n" if DEBUG;
3115				$ref->{$k} = $v;
3116			}
3117			return $ref;
3118		}
3119		elsif(m/\Gl/gc) { #<l><DATA><e>
3120			my @list = ();
3121			print "ARRAY AT ".pos()."\n" if DEBUG;
3122
3123			until(m/\Ge/gc) {
3124				print "+ ARRAY AT ".pos()."\n" if DEBUG;
3125				push(@list, _decode() );
3126			}
3127			return \@list;
3128		}
3129		elsif(m/\Gi(0|-?[1-9]\d*)e/gc) { #<i><\d+><e>
3130			print "INTEGER AT ".pos()."\n" if DEBUG;
3131			return int($1);
3132		}
3133		else {
3134			print "No match at: ".pos()."\n" if DEBUG;
3135			goto PARSER_ERROR;
3136		}
3137
3138	}
3139
31401;
3141