package Bitflu::SourcesBitTorrent;

################################################################################################
#
# This file is part of 'Bitflu' - (C) 2008-2011 Adrian Ulrich
#
# Released under the terms of The "Artistic License 2.0".
# http://www.opensource.org/licenses/artistic-license-2.0.php
#
#
# This plugin implements simple BitTorrent tracker (client!) support.
# Note: This plugin does mess with the internals of Bitflu::DownloadBitTorrent !
# Maybe some kind person offers to rewrite this code mess? :-)
#
################################################################################################

use strict;
use List::Util;

# All plugin-wide constants, declared in one list-form statement.
use constant {
    _BITFLU_APIVERSION   => 20120529,
    TORRENT_RUN          => 3,   # How often shall we check for work
    TRACKER_TIMEOUT      => 40,  # How long do we wait for the tracker to drop the connection
    TRACKER_MIN_INTERVAL => 360, # Minimal interval value for Tracker replys
    TRACKER_SKEW         => 20,  # Avoid storm at startup

    SBT_NOTHING_SENT_YET => 0,   # => 'started' will be the next event sent to the tracker
    SBT_SENT_START       => 1,   # => 'completed' will be the next event if we completed just now
    SBT_SENT_COMPLETE    => 2,   # => download is done, do not send any events to tracker
    MAX_ROWFAIL          => 3,   # How often can a tracker fail in a row?
};
use constant PERTORRENT_TRACKERBL => '_trackerbl';

################################################################################################
# Register this plugin
#
# Creates the plugin object, writes defaults for configuration keys that are not set yet,
# creates the TCP and UDP tracker transports and adds the plugin as a runner of $mainclass.
# Returns the blessed plugin object.
sub register {
    my($class, $mainclass) = @_;

    my $self = bless({
        super      => $mainclass,
        bittorrent => undef,
        p_tcp      => undef,
        p_udp      => undef,
        secret     => int(rand(0xFFFFFF)),
        torrents   => {},
    }, $class);

    my $bindto   = ($self->{super}->Configuration->GetValue('torrent_bind') || 0); # May be null
    my $defaults = { torrent_trackerblacklist=>'', torrent_tracker_udpport=>6689, torrent_tracker_autoudp=>1 };

    # Only keys without a value receive their default
    foreach my $key (keys(%$defaults)) {
        next if defined($mainclass->Configuration->GetValue($key));
        $mainclass->Configuration->SetValue($key, $defaults->{$key});
    }

    $mainclass->Configuration->RuntimeLockValue('torrent_tracker_udpport');

    $self->{p_tcp} = Bitflu::SourcesBitTorrent::TCP->new(_super=>$self, Bind=>$bindto);
    $self->{p_udp} = Bitflu::SourcesBitTorrent::UDP->new(_super=>$self, Bind=>$bindto,
                                                         Port=>$mainclass->Configuration->GetValue('torrent_tracker_udpport'));

    $mainclass->AddRunner($self) or $self->panic("Unable to add runner");

    return $self;
}

################################################################################################
# Init plugin
#
# Looks up the Bitflu::DownloadBitTorrent runner, remembers it in $self->{bittorrent} and
# registers the 'tracker' admin command. Panics if the runner cannot be found.
sub init {
    my($self) = @_;

    # Search DownloadBitTorrent hook:
    my $hookit = $self->{super}->GetRunnerTarget('Bitflu::DownloadBitTorrent');

    unless(defined($hookit)) {
        return $self->panic("Unable to find BitTorrent plugin!");
    }

    $self->debug("Using '$hookit' to communicate with BitTorrent plugin.");
    $self->{bittorrent} = $hookit;

    my $help = [
        [undef, "Usage: tracker queue_id [show | set TRACKERLIST | blacklist REGEXP ]" ],
        [undef, ""],
        [undef, "tracker queue_id show : Display tracker information"],
        [undef, "tracker queue_id blacklist \.com : Skip all trackers matching /\\.com/ (<-- regexp!)"],
        [undef, "tracker queue_id set TRACKERLIST : Change trackers of given torrent"],
        [undef, "tracker queue_id set default : Revert to default trackerlist (provided by torrent)"],
        [undef,""],
        [1 ,"trackerlist example:"],
        [1 , "',' seperates trackers, '!' forms groups. Also see 'help create_torrent'"],
        [1 , ""],
        [1 , "Example: Configure 3 Trackers: t1 , t2 and t3 (t1 and t2 are in the same group)"],
        [3 , " bitflu> tracker queue_id set t1!t2,t3"],
    ];

    $self->{bittorrent}->{super}->Admin->RegisterCommand('tracker' , $self, '_Command_Tracker', 'Displays information about tracker', $help);
    return 1;
}



################################################################################################
# Mainloop
#
# Pass 1 creates per-protocol state (v4/v6) for torrents seen for the first time and refreshes
# the stamp of known ones. Pass 2 drops state of torrents that were not refreshed (or are
# flagged 'kill'), handles timed out requests and queries trackers that are due.
# Returns TORRENT_RUN.
sub run {
    my($self,$NOW) = @_;

    my $fresh = 0; # Number of torrents added in this run, staggers their first query
    foreach my $sha1 ($self->{bittorrent}->Torrent->GetTorrents) {
        my $torrent = $self->{bittorrent}->Torrent->GetTorrent($sha1);

        next if $torrent->IsPaused; # Skip paused torrent

        if(defined($self->{torrents}->{$sha1})) {
            # Just refresh the stamp
            $self->{torrents}->{$sha1}->{stamp} = $NOW;
            next;
        }

        # Cache data for new torrent
        my $trackers = $self->GetTrackers($torrent);

        my $state4 = { cttlist=>[], cstlist=>[], info_hash=>$sha1, skip_until=>$NOW+($fresh++*TORRENT_RUN), last_query=>0,
                       tracker=>'', rowfail=>0, trackers=>$trackers, waiting=>0, timeout_at=>0, proto=>4 };
        my $state6 = $self->{super}->Tools->DeepCopy($state4);
        $state6->{proto} = 6;

        # Remove tracker for unsupported protocols:
        $state4 = undef unless $self->{super}->Network->HaveIPv4;
        $state6 = undef unless $self->{super}->Network->HaveIPv6;

        $self->{torrents}->{$sha1} = { v4=>$state4, v6=>$state6, stamp=>$NOW, kill=>0 };
    }


    # Loop for cached torrents
    foreach my $sha1 (List::Util::shuffle(keys(%{$self->{torrents}}))) {
        my $xobj   = $self->{torrents}->{$sha1};
        my @active = grep { $_ } ($xobj->{v4}, $xobj->{v6}); # Protocols we actually track

        if($xobj->{stamp} != $NOW or $xobj->{kill}) {
            # Whoops, this torrent vanished from main plugin -> drop it
            $self->info("$sha1: Aborting tracker requests");

            foreach my $obj (@active) {
                $self->MarkTrackerAsBroken($obj); # fail and stop current activity (if any)
            }

            delete($self->{torrents}->{$sha1});
            next;
        }

        foreach my $obj (@active) {
            if($obj->{waiting}) { # Tracker has been contacted
                next if $obj->{timeout_at} >= $NOW; # ..and may still answer
                $self->info("$sha1: IPv$obj->{proto} tracker '$obj->{tracker}' timed out");
                $self->MarkTrackerAsBroken($obj, Softfail=>1);
                $obj->{skip_until} = $NOW + int( 5+rand(TRACKER_SKEW) )*$obj->{rowfail}; # fast retry
            }
            elsif($obj->{skip_until} <= $NOW) {
                $self->QueryTracker($obj);
            }
            # else: Nothing to do.
        }
    }

    return TORRENT_RUN;
}

################################################################################################
# Read currently configured trackers.
# Fetches a list from torrent if _trackers is empty/does not exist
#
# $tref is the torrent object (->Storage and ->GetSha1 are used).
# Returns an arrayref of tracker groups, each group being an arrayref of announce URIs.
# The result always holds at least one group: [''] is used if no tracker is known.
sub GetTrackers {
    my($self, $tref) = @_;

    my $torrent_buffer = ($tref->Storage->GetSetting('_torrent')     or ''); # Empty if this is a magnet link
    my $tracker_buffer = ($tref->Storage->GetSetting('_trackers')    or ''); # Cached/Configured trackerlist as bencoded string
    my $tracker_hint   = ($tref->Storage->GetSetting('_trackerhint') or ''); # Enforced trackers (set by Magnet links)

    if(length($tracker_buffer) == 0) {
        # No trackers (yet): try to grab some
        my $taref = [];

        if(length($tracker_hint) != 0) {
            $self->debug("trackerhint string: $tracker_hint");
            foreach my $chunk (split("\x00", $tracker_hint)) {
                push(@$taref, [ $chunk ]);
                $self->debug(" -> $chunk");
            }
        }
        elsif(length($torrent_buffer) != 0) {
            # -> This is a full torrent (= not magnet) - check announce list
            $self->debug($tref->GetSha1.": no trackerfile: Reading data from raw torrent");
            my $decoded = $self->{super}->Tools->BencDecode($torrent_buffer);
            $decoded = {} if ref($decoded) ne "HASH"; # Never dereference a non-dictionary

            if(exists($decoded->{'announce-list'}) && ref($decoded->{'announce-list'}) eq "ARRAY") {
                $self->debug($tref->GetSha1." Using announce-list");
                $taref = $self->_LoadAnnounceList($decoded->{'announce-list'});
            }

            # -> no tracker-list? try to get a single tracker
            if(int(@$taref) == 0 && $decoded->{announce}) {
                $self->debug($tref->GetSha1." Using announce");
                push(@$taref, [ scalar($decoded->{announce}) ]);
            }
        }

        $self->StoreTrackers($tref, $taref);
        # Reload _trackers
        $tracker_buffer = ($tref->Storage->GetSetting('_trackers') or '');
    }

    my $tlist_array = $self->_LoadAnnounceList($self->{super}->Tools->BencDecode($tracker_buffer));
    # must have at least one entry
    push(@$tlist_array, ['']) if int(@$tlist_array) == 0;

    return $tlist_array;
}


################################################################################################
# Load (and somewhat verify) an announce list
#
# Returns a new arrayref that holds a copy of every arrayref member of $source; false
# members of a group are replaced by ''. Non-array members are skipped and a $source
# that is not an arrayref yields an empty list.
sub _LoadAnnounceList {
    my($self,$source) = @_;
    my $target = [];

    return $target if ref($source) ne "ARRAY";

    foreach my $this_item (@$source) {
        next if ref($this_item) ne "ARRAY";
        push(@$target, [map( scalar($_||''), @$this_item)] );
    }

    return $target;
}



################################################################################################
# Writeout _trackers file
#
# Stores $tracker_ref (arrayref of tracker groups) bencoded in the '_trackers' setting of
# $tref. Always returns 1.
sub StoreTrackers {
    my($self,$tref, $tracker_ref) = @_;
    $self->debug($tref->GetSha1.": storing new trackerlist");
    my $benc = $self->{super}->Tools->BencEncode($tracker_ref);
    $benc = '' if $benc eq 'le'; # '' and 'le' are the same (empty) -> normalize
    $tref->Storage->SetSetting('_trackers',$benc);
    return 1;
}


################################################################################################
# Build trackerlist (if needed) and contact a tracker
#
# $obj is the per-protocol tracker state created by run(). cttlist holds the tracker groups
# that are still to be tried, cstlist the (shuffled) trackers of the current group.
sub QueryTracker {
    my($self, $obj) = @_;

    my $NOW = $self->{bittorrent}->{super}->Network->GetTime;
    $obj->{skip_until} = $NOW + TRACKER_MIN_INTERVAL; # Do not hammer the tracker if it closes the connection quickly

    # This construct is used to select new trackers
    if(int(@{$obj->{cttlist}}) == 0) {
        # Fillup
        $obj->{cttlist} = $self->{super}->Tools->DeepCopy($obj->{trackers});
    }
    if(int(@{$obj->{cstlist}}) == 0) {
        my $group   = ( shift(@{$obj->{cttlist}}) || [] ); # Empty group if there is nothing left
        my @rnd     = (List::Util::shuffle(@$group));
        my @fixed   = ();
        my $autoudp = $self->{super}->Configuration->GetValue('torrent_tracker_autoudp');

        foreach my $this_tracker (@rnd) {
            next if !$this_tracker; # Could be empty (funny torrent)
            my($proto,$host,$port,$base) = $self->ParseTrackerUri({tracker=>$this_tracker});
            # In autoudp mode, each http tracker is tried via udp first
            if($autoudp && $proto eq 'http') { push(@fixed, "udp://$host:$port/$base#bitflu-autoudp") }
            push(@fixed, $this_tracker);
        }
        $obj->{cstlist} = \@fixed;
        $self->debug("cstlist is: \n\t".join("\n\t",@fixed));
    }
    unless($obj->{tracker}) {
        # No selected tracker: get a newone
        $obj->{tracker} = ( shift(@{$obj->{cstlist}}) || '' ); # Grab next tracker
        $self->BlessTracker($obj);                             # Reset fails

        if($obj->{tracker} =~ /#bitflu-autoudp$/) {
            # -> only try once if in autoudp mode: use up all but one of the allowed fails
            foreach my $pass (1..MAX_ROWFAIL-1) {
                $self->MarkTrackerAsBroken($obj,Softfail=>1);
            }
        }

    }

    $self->ContactCurrentTracker($obj);
}

################################################################################################
# Contact tracker
#
# Starts a request to $obj->{tracker} via the TCP or UDP transport unless the tracker is
# empty, blacklisted or uses an unsupported protocol.
# The blacklist is a user supplied regexp: a pattern that fails to compile is reported
# via warn() and ignored instead of killing the caller.
sub ContactCurrentTracker {
    my($self, $obj) = @_;

    my $NOW       = $self->{bittorrent}->{super}->Network->GetTime;
    my $blacklist = $self->GetTrackerBlacklist($obj);
    my $sha1      = $obj->{info_hash} or $self->panic("No info hash");
    my $tracker   = $obj->{tracker};
    my $bl_regexp = undef; # Compiled blacklist, undef = nothing is blacklisted

    if(defined($blacklist) && length($blacklist)) {
        $bl_regexp = eval { qr/$blacklist/i };
        $self->warn("$sha1: Ignoring invalid tracker blacklist regexp '$blacklist'") unless defined($bl_regexp);
    }

    if(length($tracker) == 0) {
        $self->debug("$sha1: has currently no tracker");
    }
    elsif(defined($bl_regexp) && $tracker =~ $bl_regexp) {
        $self->debug("$sha1: Skipping blacklisted tracker '$tracker'");
        $self->MarkTrackerAsBroken($obj);
        $obj->{skip_until} = $NOW + int(rand(TRACKER_SKEW));
    }
    else {
        my ($proto) = $self->ParseTrackerUri($obj);
        # -> Not blacklisted
        if($proto eq 'http') {
            $obj->{timeout_at} = $NOW + TRACKER_TIMEOUT;        # Set response timeout
            $obj->{last_query} = $NOW;                          # Remember last querytime
            $obj->{waiting}    = $self->{p_tcp}->Start($obj);   # Start request via tcp/http
        }
        elsif($proto eq 'udp') {
            $obj->{timeout_at} = $NOW + TRACKER_TIMEOUT;        # Set response timeout
            $obj->{last_query} = $NOW;                          # Remember last querytime
            $obj->{waiting}    = $self->{p_udp}->Start($obj);   # Start request via udp
        }
        else {
            $self->info("$sha1: Protocol of tracker '$tracker' is not supported.");
            $self->MarkTrackerAsBroken($obj);
        }
    }
}

################################################################################################
# Advance to next request and stop in-flight transactions
#
# Increments $obj->{rowfail}. The current tracker is dropped if MAX_ROWFAIL is reached or
# if Softfail=>1 was not passed.
sub MarkTrackerAsBroken {
    my($self,$obj,%args) = @_;

    my $softfail = ($args{Softfail} ? 1 : 0);

    $self->debug("MarkTrackerAsBroken($self,$obj), waiting=$obj->{waiting}, softfail=$softfail");

    if($obj->{waiting}) {
        # 'waiting' holds the transport object that runs the request
        $obj->{waiting}->Stop($obj);
        $obj->{waiting} = 0;
    }

    if(++$obj->{rowfail} >= MAX_ROWFAIL or !$softfail) {
        $obj->{tracker} = '';
        # rowfail will be reseted while selecting a new tracker
    }
}

################################################################################################
# Mark current tracker as good
sub BlessTracker {
    my($self,$obj) = @_;
    $self->debug("Blessing $obj->{tracker}");
    $obj->{rowfail} = 0;
}

################################################################################################
# Returns the trackerblacklist for given object
#
# The per-torrent setting wins; the global 'torrent_trackerblacklist' value is used if it
# is unset or empty. Always returns a defined string ('' = no blacklist).
sub GetTrackerBlacklist {
    my($self, $obj) = @_;
    my $tbl  = '';
    my $sha1 = $obj->{info_hash} or $self->panic("$obj has no info_hash key!");

    if((my $torrent = $self->{bittorrent}->Torrent->GetTorrent($sha1))) {
        $tbl = $torrent->Storage->GetSetting(PERTORRENT_TRACKERBL);
    }
    if(!defined($tbl) || length($tbl) == 0) {
        $tbl = $self->{bittorrent}->{super}->Configuration->GetValue('torrent_trackerblacklist');
    }
    return (defined($tbl) ? $tbl : '');
}

################################################################################################
# Parse an uri
#
# Splits $obj->{tracker} into (proto, host, port, base). proto and host are lowercased,
# port defaults to 80 and base to ''. proto and host are '' if the uri does not match.
sub ParseTrackerUri {
    my($self, $obj) = @_;
    my ($proto,$host,undef,$port,undef,$base) = $obj->{tracker} =~ /^([^:]+):\/\/([^\/:]+)(:(\d+))?($|\/(.*)$)/;
    $proto   = (defined($proto) ? lc($proto) : '');
    $host    = (defined($host)  ? lc($host)  : '');
    $port  ||= 80;
    $base  ||= '';

    $self->debug("ParseTrackerUri($obj->{tracker}) -> proto=$proto, host=$host, port=$port, base=$base");

    return($proto,$host,$port,$base);
}

################################################################################################
# Returns current tracker event
#
# Returns 'started', 'completed' or '' depending on the '_sbt_trackerstat' setting of the
# torrent and on its completion state.
sub GetTrackerEvent {
    my($self,$obj) = @_;
    my $sha1 = $obj->{info_hash} or $self->panic("No info_hash?");
    my $tobj = $self->{bittorrent}->Torrent->GetTorrent($sha1);

    my $current_setting = int($tobj->Storage->GetSetting('_sbt_trackerstat') || SBT_NOTHING_SENT_YET);

    if($current_setting == SBT_NOTHING_SENT_YET) {
        return 'started';
    }
    elsif($current_setting == SBT_SENT_START && $tobj->IsComplete) {
        return 'completed';
    }
    else {
        return '';
    }
}

################################################################################################
# Go to next tracker event
#
# Persists that the event returned by GetTrackerEvent() has been sent. Does nothing if
# there was no event to send.
sub AdvanceTrackerEvent {
    my($self,$obj) = @_;
    my $sha1 = $obj->{info_hash} or $self->panic("No info_hash?");
    my $tobj = $self->{bittorrent}->Torrent->GetTorrent($sha1);
    my $current_setting = $self->GetTrackerEvent($obj);
    my $nsetting = 0;

    if($current_setting eq 'started') {
        $nsetting = SBT_SENT_START;
    }
    elsif($current_setting eq 'completed') {
        $nsetting = SBT_SENT_COMPLETE;
    }

    $tobj->Storage->SetSetting('_sbt_trackerstat', $nsetting) if $nsetting != 0;
}




################################################################################################
# CLI Command
#
# Arguments: queue_id [show|list|debug | set TRACKERLIST | blacklist REGEXP]
# Returns a hashref with the keys MSG, SCRAP and NOEXEC.
sub _Command_Tracker {
    my($self,@args) = @_;

    my $sha1   = $args[0];
    my $cmd    = $args[1];
    my $value  = $args[2];
    my @MSG    = ();
    my @SCRAP  = ();
    my $NOEXEC = '';

    if(defined($sha1) && exists($self->{torrents}->{$sha1})) {
        if(!defined($cmd) or $cmd =~ /^(show|list|debug)$/ ) {
            my $xobj = $self->{torrents}->{$sha1};
            my $allt = ''; # List of all trackers
            my $tdbg = ''; # Debug list of all trackers
            foreach my $obj ($xobj->{v4}, $xobj->{v6}) {
                next unless $obj;
                push(@MSG, [3, "Trackers for $sha1 via IPv$obj->{proto}"]);
                push(@MSG, [undef, "Next Query : ".localtime($obj->{skip_until})]);
                push(@MSG, [undef, "Last Query : ".($obj->{last_query} ? localtime($obj->{last_query}) : 'Never contacted') ]);
                push(@MSG, [($obj->{waiting}?2:1) ,"Waiting for response : ".($obj->{waiting}?"Yes":"No")]);
                push(@MSG, [undef, "Current Tracker : $obj->{tracker}"]);
                push(@MSG, [undef, "Fails : $obj->{rowfail}"]);
                $allt = join(',', map( join('!',@$_), @{$obj->{trackers}})); # Doesn't matter if we use v4 or v6: it's the same anyway...
                if($cmd && $cmd eq 'debug') {
                    require Data::Dumper; # This file does not load Data::Dumper itself
                    $tdbg = Data::Dumper::Dumper($obj->{trackers});
                }
            }

            push(@MSG, [undef, "All Trackers : $allt"]);
            push(@MSG, [undef, "Tracker Blacklist : ".$self->GetTrackerBlacklist({info_hash=>$sha1})]); # Ieks: API-Abuse
            if($tdbg) {
                $tdbg =~ s/\n/\r\n/gm;
                push(@MSG, [3, $tdbg]);
            }
        }
        elsif($cmd eq "set" && $value) {
            # , seperates groups. ! seperates trackers
            my @tlist = ();
            if($value eq 'default') {
                @tlist = (); # Empty array -> no tracker
                push(@MSG, [1, "$sha1: Will use default trackerlist from torrent..."]);
            }
            else {
                foreach my $tracker_group (split(/,/,$value)) {
                    my @tg_members = split(/!/,$tracker_group);
                    push(@tlist,\@tg_members);
                }
            }

            $self->StoreTrackers($self->{bittorrent}->Torrent->GetTorrent($sha1), \@tlist);
            $self->{torrents}->{$sha1}->{kill} = 1; # run() drops the cached state -> trackers get reloaded
            push(@MSG, [1, "$sha1: Trackerlist updated, reload will happen in a few seconds."]);
        }
        elsif($cmd eq "blacklist") {
            $value = '' unless defined($value); # 'blacklist' without argument clears the per-torrent blacklist
            if(my $torrent = $self->{bittorrent}->Torrent->GetTorrent($sha1)) {
                $torrent->Storage->SetSetting(PERTORRENT_TRACKERBL, $value);
                $self->{torrents}->{$sha1}->{kill} = 1;
                push(@MSG, [1, "$sha1: Tracker blacklist set to '$value'"]);
            }
            else {
                push(@SCRAP, $sha1);
                $NOEXEC .= "$sha1: No such torrent";
            }
        }
        else {
            push(@MSG, [2, "Unknown subcommand '$cmd'"]);
        }
    }
    else {
        $NOEXEC .= "Usage error or no such torrent. type 'help tracker' for more information";
    }
    return({MSG=>\@MSG, SCRAP=>\@SCRAP, NOEXEC=>$NOEXEC});
}



# Logging helpers: forward to the main object and prefix the message
sub debug { my($self, $msg) = @_; $self->{super}->debug("Tracker : ".$msg); }
sub info  { my($self, $msg) = @_; $self->{super}->info("Tracker : ".$msg);  }
sub warn  { my($self, $msg) = @_; $self->{super}->warn("Tracker : ".$msg);  }
sub panic { my($self, $msg) = @_; $self->{super}->panic("Tracker : ".$msg); }

1;




################################################################################################



package Bitflu::SourcesBitTorrent::TCP;

use strict;
use constant TRACKER_MAXPAYLOAD => 1024*256; # Tracker payload is limited to ~256 KB

################################################################################################
# Returns a new TCP-Object
#
# Arguments: _super => tracker plugin object, Bind => address passed to NewTcpListen
sub new {
    my($class, %args) = @_;
    my $self = { _super=>$args{_super}, super=>$args{_super}->{super}, net=>{bind=>$args{Bind}, port=>0, sock=>undef },
                 sockmap=>{} };
    bless($self,$class);

    my $sock = $self->{super}->Network->NewTcpListen(ID=>$self, Bind=>$self->{net}->{bind}, Port=>$self->{net}->{port},
                                                     DownThrottle=>0,
                                                     MaxPeers=>8, Callbacks => { Data  =>'_Network_Data',
                                                                                 Close =>'_Network_Close' } );
    $self->{net}->{sock} = $sock;
    return $self;
}

################################################################################################
# Starts a new request
#
# Builds the HTTP announce request for $obj and sends it to the tracker.
# Always returns $self, even if resolving or connecting failed: the caller stores the
# return value as 'waiting' marker and handles such a request via its timeout.
sub Start {
    my($self,$obj) = @_;

    my($proto,$host,$port,$base) = $self->{_super}->ParseTrackerUri($obj);

    my $sha1     = $obj->{info_hash} or $self->panic("No info_hash?");
    my $stats    = $self->{super}->Queue->GetStats($sha1);
    my $event    = $self->{_super}->GetTrackerEvent($obj);
    my $nextchar = "?";
    $nextchar    = "&" if ($base =~ /\?/); # base already carries a query string

    # Create good $key and $peer_id length
    my $key      = $self->_UriEscape(pack("H40",unpack("H40",$self->{_super}->{secret}.("x" x 20))));
    my $peer_id  = $self->_UriEscape(pack("H40",unpack("H40",$self->{_super}->{bittorrent}->{CurrentPeerId})));

    # Assemble HTTP-Request
    my $q  = "GET /".$base.$nextchar."info_hash=".$self->_UriEscape(pack("H40",$obj->{info_hash}));
       $q .= "&peer_id=".$peer_id;
       $q .= "&port=".int($self->{super}->Configuration->GetValue('torrent_port'));
       $q .= "&uploaded=".int($stats->{uploaded_bytes});
       $q .= "&downloaded=".int($stats->{done_bytes});
       $q .= "&left=".int($stats->{total_bytes}-$stats->{done_bytes});
       $q .= "&key=".$key;
       $q .= "&event=$event" if $event; # BEP-0003 would allow empty events, but some trackers do not like that
       $q .= "&compact=1";
       $q .= " HTTP/1.0\r\n";
       $q .= "User-Agent: Bitflu ".$self->{super}->GetVersionString."\r\n";
       $q .= "Host: $host:$port\r\n\r\n";

    $self->info("$sha1: Contacting $proto://$host:$port/$base via IPv$obj->{proto}...");

    my $remote_ip = $self->{super}->Network->ResolveByProto($host)->{$obj->{proto}}->[0];

    if($remote_ip) {
        my $tsock = $self->{super}->Network->NewTcpConnection(ID=>$self, Port=>$port, RemoteIp=>$remote_ip, Timeout=>5);
        if($tsock) {
            $self->{super}->Network->WriteDataNow($tsock, $q) or $self->panic("Unable to write data to $tsock !");
            $self->{sockmap}->{$tsock} = { obj=>$obj, socket=>$tsock, buffer=>'' };
        }
        else {
            # Request will timeout -> tracker will be marked as broken
            $self->warn("Failed to create a new connection to $host:$port : $!");
        }
    }
    else {
        $self->debug("Failed to resolve IPv$obj->{proto} record for $host");
    }

    return $self;
}

################################################################################################
# Append data to buffer (if still active)
sub _Network_Data {
    my($self,$sock,$buffref,$blen) = @_;
    if(exists($self->{sockmap}->{$sock}) && length($self->{sockmap}->{$sock}->{buffer}) < TRACKER_MAXPAYLOAD) {
        $self->{sockmap}->{$sock}->{buffer} .= ${$buffref}; # append data if socket still active
    }
}

################################################################################################
# Connection finished: Parse data and add new peers
#
# The payload comes from the network and is not trusted: anything but a bencoded
# dictionary counts as a failed request and is never dereferenced.
sub _Network_Close {
    my($self,$sock) = @_;

    return unless exists($self->{sockmap}->{$sock}); # Not (or no longer) one of our requests

    my $smap    = $self->{sockmap}->{$sock};
    my $buffer  = $smap->{buffer};
    my $obj     = $smap->{obj}                   or $self->panic("Missing object!");
    my $sha1    = $obj->{info_hash}              or $self->panic("No info_hash?");
    my $bobj    = $self->{_super}->{bittorrent}  or $self->panic("No BT-Object?");
    my @nnodes  = ();    # NewNodes
    my $hdr_len = 0;     # HeaderLength
    my $decoded = undef; # Decoded data
    my $failed  = 0;     # Did the tracker fail?


    # Ditch existing HTTP-Header
    foreach my $line (split(/\n/,$buffer)) {
        $hdr_len += length($line)+1; # 1=\n
        if($line eq "\r") {
            last; # \n\r found
        }
        elsif(length($line) == 0) {
            # -> Huh? We just got \n\n while reading the header.
            #    The tracker seems to speak some sort of brokish-HTTP!
            $self->warn("$obj->{tracker} violates HTTP/1.0! Accepting broken HTTP header :-/");
            last;
        }
    }

    if(length($buffer) > $hdr_len) {
        $buffer  = substr($buffer,$hdr_len); # Throws the http header away
        $decoded = $self->{super}->Tools->BencDecode($buffer);
    }

    if(ref($decoded) ne "HASH") {
        $self->info("$sha1: received invalid response from IPv$obj->{proto} tracker. (http_header_len=$hdr_len)");
        $failed  = 1;
        $decoded = {}; # The lookups below must not die on a scalar or list payload
    }
    elsif(exists($decoded->{peers}) && ref($decoded->{peers}) eq "ARRAY") {
        # List of dictionaries
        foreach my $cref (@{$decoded->{peers}}) {
            next if ref($cref) ne "HASH"; # Skip garbage entries
            push(@nnodes , { ip=> $cref->{ip}, port=> $cref->{port}, peer_id=> $cref->{'peer id'} } );
        }
    }
    elsif(exists($decoded->{peers6})) {
        @nnodes = $self->{super}->Tools->DecodeCompactIpV6($decoded->{peers6});
    }
    elsif(exists($decoded->{peers})) {
        @nnodes = $self->{super}->Tools->DecodeCompactIp($decoded->{peers});
    }

    if(exists($decoded->{'failure reason'})) {
        # avoid messing up the terminal:
        # NOTE(review): the reason is only logged, the response is not counted as a failure - confirm this is intended
        my $clean_fr = $decoded->{'failure reason'};
        $clean_fr =~ tr/\x20-\x7e/?/c;
        $self->warn("$sha1: Error from tracker: $clean_fr");
    }

    # Calculate new Skiptime
    my $new_skip = $self->{super}->Network->GetTime + (abs(int($decoded->{interval}||0)));
    my $old_skip = $obj->{skip_until};
    $obj->{skip_until} = ( $new_skip > $old_skip ? $new_skip : $old_skip ); # Set new skip_until time
    $obj->{waiting}    = 0;                                                 # No open transaction
    delete($self->{sockmap}->{$sock}) or $self->panic;                      # Mark socket as down

    if($bobj->Torrent->ExistsTorrent($sha1) && !$failed) {
        # Torrent does still exist: add nodes
        $bobj->Torrent->GetTorrent($sha1)->AddNewPeers(@nnodes);
        $self->{_super}->AdvanceTrackerEvent($obj);
        $self->{_super}->BlessTracker($obj);
        $self->info("$sha1: IPv$obj->{proto} tracker returned ".int(@nnodes)." peers");
    }
    elsif($failed) {
        $self->{_super}->MarkTrackerAsBroken($obj, Softfail=>1)
    }
}

################################################################################################
# Aborts in-flight transactions
sub Stop {
    my($self,$obj) = @_;

    foreach my $snam (keys(%{$self->{sockmap}})) {
        if($self->{sockmap}->{$snam}->{obj} eq $obj) {
            my $socket = $self->{sockmap}->{$snam}->{socket};
            $self->_Network_Close($socket);                        # cleans sockmap
            $self->{super}->Network->RemoveSocket($self, $socket); # drop connection
        }
    }

}

################################################################################################
# Primitive Escaping
#
# Returns $string with each character written as %XX. Returns '' for an empty string.
sub _UriEscape {
    my($self,$string) = @_;
    $string = '' unless defined($string);
    return join('', map { sprintf("%%%02X",ord($_)) } split(//,$string));
}

# Logging helpers: forward to the tracker plugin object
sub debug { my($self, $msg) = @_; $self->{_super}->debug($msg); }
sub info  { my($self, $msg) = @_; $self->{_super}->info($msg);  }
sub warn  { my($self, $msg) = @_; $self->{_super}->warn($msg);  }
sub panic { my($self, $msg) = @_; $self->{_super}->panic($msg); }

1;





package Bitflu::SourcesBitTorrent::UDP;
use constant OP_CONNECT   => 0; # Connection request
use constant OP_ANNOUNCE  => 1; # IPv4 announce
use constant OP_ERROR     => 3; # Error (only returned from tracker)
use constant OP_ANNOUNCE6 => 4; # IPv6 announce
################################################################################################
# Creates a new UDP object
#
# Arguments: _super => tracker plugin object, Bind => address, Port => local udp port.
# Panics if the udp socket could not be created.
sub new {
    my($class, %args) = @_;
    my $self = { _super=>$args{_super}, super=>$args{_super}->{super}, net=>{bind=>$args{Bind}, port=>$args{Port},
                 sock=>undef }, tmap=>{}, ccache=>[{t=>0},{t=>0},{t=>0},{t=>0}] };
    bless($self,$class);

    my $sock = $self->{super}->Network->NewUdpListen(ID=>$self, Bind=>$self->{net}->{bind}, Port=>$self->{net}->{port},
                                                     Callbacks => { Data =>'_Network_Data' } );
    $self->{net}->{sock} = $sock or $self->panic("Failed to bind to $self->{net}->{bind}:$self->{net}->{port}: $!");
    return $self;
}

################################################################################################
# Send a connect() request to current tracker
#
# Registers a transaction for $obj in tmap and sends an announce request if a connection id
# is cached for the tracker, a connection request otherwise. Nothing is sent if the
# tracker could not be resolved. Always returns $self.
sub Start {
    my($self,$obj) = @_;
    my $sha1                     = $obj->{info_hash};                                                  # Info Hash
    my($proto,$host,$port,$base) = $self->{_super}->ParseTrackerUri($obj);                             # Parsed Tracker URI
    my $ip                       = $self->{super}->Network->ResolveByProto($host)->{$obj->{proto}}->[0]; # IP Addr
    my $tid                      = $self->_GetFreeTxId();                                              # Obtain free Transaction ID (method call: the check needs our tmap)


    # Creates a new TransactionMap (tx) Object:
    my $tx_obj = $self->{tmap}->{$tid} = { id=>$tid, obj => $obj, ip=>$ip, port=>$port, trackerid=>"IPv$obj->{proto}://$host:$port" };

    if($ip && $port) {
        # -> Tracker is resolveable
        my $con_id = $self->_GetConnectionId($tx_obj);                            # Do we have a connection id for this tracker?
        if(defined($con_id)) { $self->_WriteAnnounceRequest($tx_obj,$con_id); }   # Yes -> Send an announce request
        else                 { $self->_WriteConnectionRequest($tx_obj);       }   # No  -> Obtain a new connection_id first
    }
    return $self;
}


################################################################################################
# Find a random transaction id
# $tid will be 'something' if this loop ends.
# This isn't such a big problem: we will just add wrong ips to
# the wrong peer (this will result in broken connections..)
sub _GetFreeTxId {
    my($self) = @_;

    # Must be called as method to be able to see the transactions in use.
    # A call without invocant is still accepted and then checks against nothing.
    my $in_use = ( (ref($self) && $self->{tmap}) || {} );
    my $tid    = 0;
    for(0..255){
        $tid = 1+int(rand(0xFFFFFE));
        last if !exists($in_use->{$tid});
    }
    return $tid;
}


################################################################################################
# Changes the id of given tx_obj
#
# Moves $tx_obj to a fresh key of tmap and returns the new id. Panics if $tx_obj has no id
# or is not stored in tmap.
sub _ChangeTransactionId {
    my($self,$tx_obj) = @_;

    my $new_id = $self->_GetFreeTxId(); # Method call: a plain function call would never detect ids in use
    my $old_id = $tx_obj->{id}               or $self->panic("\$tx_obj has no id!");
    delete($self->{tmap}->{$old_id})         or $self->panic("Could not delete $old_id from tmap!");
    $tx_obj->{id}             = $new_id;  # Fixup internal id
    $self->{tmap}->{$new_id}  = $tx_obj;  # Store in hash
    return $new_id;                       # Return new id
}

################################################################################################
# Invalidate transaction of $obj
#
# Removes the first transaction found that belongs to $obj.
sub Stop {
    my($self, $obj) = @_;
    foreach my $trans_id (keys(%{$self->{tmap}})) {
        my $t_obj = $self->{tmap}->{$trans_id}->{obj};
        if($t_obj eq $obj) {
            delete($self->{tmap}->{$trans_id});
            last;
        }
    }
}

################################################################################################
# Returns true if torrent still exists in queue
sub _TorrentExists {
    my($self, $obj) = @_;
    return $self->{_super}->{bittorrent}->Torrent->ExistsTorrent($obj->{info_hash});
}

################################################################################################
# Send a connection request to given tracker
#
# Packet: 8 byte fixed value (hex 0000041727101980), OP_CONNECT, transaction id
sub _WriteConnectionRequest {
    my($self,$tx_obj) = @_;
    $self->info("$tx_obj->{obj}->{info_hash}: Validating connection to IPv$tx_obj->{obj}->{proto} tracker $tx_obj->{obj}->{tracker}");
    my $payload = pack("H16", "0000041727101980").pack("NN",OP_CONNECT,$tx_obj->{id});
    $self->{super}->Network->SendUdp($self->{net}->{sock}, ID=>$self, RemoteIp=>$tx_obj->{ip}, Port=>$tx_obj->{port}, Data=>$payload);
}

################################################################################################
# Send an announce request (=request peers)
#
# $tx_obj is the transaction object, $con_id the connection id as hex string (16 digits).
# Nothing is sent if the torrent does not exist anymore.
# Downloaded, left and uploaded are 64 bit fields: both halves are filled in, so values
# of 4 GiB and more are reported correctly.
sub _WriteAnnounceRequest {
    my($self,$tx_obj,$con_id) = @_;

    my $obj   = $tx_obj->{obj};                 # Tracker object
    my $sha1  = $obj->{info_hash};              # Current info_hash
    my $btobj = $self->{_super}->{bittorrent};  # BitTorrent object

    return unless $self->_TorrentExists($obj);

    $self->info("$sha1: Requesting new peers from $obj->{tracker}");
    my $t_port  = int($self->{super}->Configuration->GetValue('torrent_port'));
    my $t_key   = $self->{_super}->{secret};
    my $t_pid   = $btobj->{CurrentPeerId};
    my $t_stats = $self->{super}->Queue->GetStats($sha1);
    my $t_estr  = $self->{_super}->GetTrackerEvent($obj);
    my $t_enum  = ($t_estr eq 'started' ? 2 : ($t_estr eq 'completed' ? 1 : 0 ) );
    my $opcode  = OP_ANNOUNCE;
    my $ipsize  = "N";

    # Packs a non-negative number as 64 bit big-endian value (two 32 bit words)
    my $pack64 = sub {
        my($num) = @_;
        $num = 0 if !defined($num) || $num < 0;
        return pack("NN", int($num / 4294967296), $num % 4294967296);
    };

    if(0&&$self->{super}->Network->IsNativeIPv6($tx_obj->{ip})) { # Disabled -> Not implemented in opentracker (yet?)
        $self->warn("Using IPv6 announce to $tx_obj->{ip}");
        $opcode = OP_ANNOUNCE6;
        $ipsize = "H32";
    }


    my $pkt  = pack("H16NN",$con_id,$opcode,$tx_obj->{id});                       # ConnectionId, Opcode, TransactionId
       $pkt .= pack("H40",$sha1).$t_pid;                                          # info_hash, peer-id (always 20)
       $pkt .= $pack64->($t_stats->{done_bytes});                                 # Downloaded
       $pkt .= $pack64->($t_stats->{total_bytes}-$t_stats->{done_bytes});         # Bytes left
       $pkt .= $pack64->($t_stats->{uploaded_bytes});                             # Uploaded data
       $pkt .= pack("N",$t_enum);                                                 # Event
       $pkt .= pack($ipsize,0);                                                   # IP(0)
       $pkt .= pack("NN",$t_key,50);                                              # Secret, NumWant(50)
       $pkt .= pack("n",$t_port);                                                 # Port used by BitTorrent
    $self->{super}->Network->SendUdp($self->{net}->{sock}, ID=>$self, RemoteIp=>$tx_obj->{ip},
                                     Port=>$tx_obj->{port}, Data=>$pkt);
}

################################################################################################
# Stores a connection id in cache
#
# The cache has a fixed number of slots: the oldest entry is dropped for the new one.
# Panics if the tracker of $tx_obj still has a valid cached id.
sub _CacheConnectionId {
    my($self,$tx_obj,$con_id) = @_;
    my $trackerid = $tx_obj->{trackerid} or $self->panic;

    $self->panic("$trackerid HAS a cached connection id!") if defined($self->_GetConnectionId($tx_obj));
    shift(@{$self->{ccache}});
    push(@{$self->{ccache}}, {t=>$self->{super}->Network->GetTime, trackerid=>$trackerid, id=>$con_id});
}

################################################################################################
# Fetches a connection-id from cache, return undef on cache-miss
sub _GetConnectionId {
    my($self,$tx_obj) = @_;

    my $ttl = $self->{super}->Network->GetTime-60; # BEP-15 limits the ttl to 60 seconds

    foreach my $cc (@{$self->{ccache}}) {
        # Entries older than $ttl are ignored (this includes the empty t=>0 slots)
        if($cc->{t} >= $ttl && $cc->{trackerid} eq $tx_obj->{trackerid}) {
            return $cc->{id};
        }
    }
    return undef; # Explicit undef: callers test the result with defined()
}

################################################################################################
# Handles incoming udp data
sub _Network_Data {
    my($self,$sock,$buffref) = @_;

    # $sock is not used here; $buffref is a reference to the received datagram.
    # Datagrams shorter than 16 bytes are ignored without any logging.
    my $datagram = ${$buffref};
    my $size     = length($datagram);

    if($size >= 16) {
        # udp 'header': action, transaction id and 8 more bytes read as hex string
        my($action,$trans_id,$con_id) = unpack("NNH16",$datagram);

        if(!exists($self->{tmap}->{$trans_id})) {
            $self->info("Received udp-packet with invalid transaction-id ($trans_id), dropping data");
        }
        else {
            # -> The reply belongs to one of our open transactions
            my $tx_obj  = $self->{tmap}->{$trans_id};
            my $tracker = $tx_obj->{obj};
            my $hash    = $tracker->{info_hash};
            my $bt      = $self->{_super}->{bittorrent};
            my $now     = $self->{super}->Network->GetTime;

            # paranoia check: a registered transaction must belong to a waiting tracker
            $self->panic("$trans_id was in non-wait state?!") unless $tracker->{waiting};

            if($action == OP_CONNECT && !defined($self->_GetConnectionId($tx_obj))) {
                # -> Connect reply and nothing cached for this tracker yet:
                #    remember the connection id, switch to a fresh transaction id and announce
                $self->_CacheConnectionId($tx_obj,$con_id);
                $self->_ChangeTransactionId($tx_obj);
                $self->_WriteAnnounceRequest($tx_obj,$con_id);
            }
            elsif($action == OP_ANNOUNCE && $size >= 20 && $self->_TorrentExists($tracker)) {
                # -> Announce reply for a torrent that is still around.
                #    Bytes 8..19 hold three counters, the compact peer list starts at byte 20
                my($interval,$reported_peers,$seeders) = unpack("x8NNN",$datagram);

                $self->{_super}->AdvanceTrackerEvent($tracker);  # current event has been delivered
                $self->{_super}->BlessTracker($tracker);         # tracker did answer

                my @iplist = $self->{super}->Tools->DecodeCompactIp(substr($datagram,20));
                $bt->Torrent->GetTorrent($hash)->AddNewPeers(@iplist);

                # skip_until may only move forward
                my $candidate = $now + (abs(int($interval||0)));
                my $current   = $tracker->{skip_until};
                if($candidate > $current) {
                    $tracker->{skip_until} = $candidate;
                }
                else {
                    $tracker->{skip_until} = $current;
                }
                $tracker->{waiting} = 0;   # transaction is finished
                $self->Stop($tracker);     # ..and gets removed from tmap

                my $received = scalar(@iplist);
                $self->info("$hash: Received $received peers (info: proto=IPv$tracker->{proto} peers=$reported_peers seeders=$seeders)");
            }
            elsif($action == OP_ERROR) {
                # Nothing else to do: the transaction stays open (the original note
                # here says it times out after 40 seconds and gets retried)
                $self->info("$hash: Tracker returned an error");
            }
            else {
                # Could be a late connection-id response
                $self->debug("Ignoring udp-packet with length=$size, action=$action");
            }
        }
    }
}

# Logging helpers: hand the message over to the parent plugin object
sub debug {
    my($self, $msg) = @_;
    return $self->{_super}->debug($msg);
}

sub info {
    my($self, $msg) = @_;
    return $self->{_super}->info($msg);
}

sub warn {
    my($self, $msg) = @_;
    return $self->{_super}->warn($msg);
}

sub panic {
    my($self, $msg) = @_;
    return $self->{_super}->panic($msg);
}

1;
