1package Bitflu::SourcesBitTorrent;
2
3################################################################################################
4#
5# This file is part of 'Bitflu' - (C) 2008-2011 Adrian Ulrich
6#
7# Released under the terms of The "Artistic License 2.0".
8# http://www.opensource.org/licenses/artistic-license-2.0.php
9#
10#
11# This plugin implements simple BitTorrent tracker (client!) support.
12# Note: This plugin does mess with the internals of Bitflu::DownloadBitTorrent !
13#       Maybe some kind person offers to rewrite this code mess? :-)
14#
15################################################################################################
16
17use strict;
18use List::Util;
19use constant _BITFLU_APIVERSION   => 20120529;
20use constant TORRENT_RUN          => 3;        # How often shall we check for work
21use constant TRACKER_TIMEOUT      => 40;       # How long do we wait for the tracker to drop the connection
22use constant TRACKER_MIN_INTERVAL => 360;      # Minimal interval value for Tracker replys
23use constant TRACKER_SKEW         => 20;       # Avoid storm at startup
24
25use constant SBT_NOTHING_SENT_YET => 0;        # => 'started' will be the next event sent to the tracker
26use constant SBT_SENT_START       => 1;        # => 'completed' will be the next event if we completed just now
27use constant SBT_SENT_COMPLETE    => 2;        # => download is done, do not send any events to tracker
28use constant MAX_ROWFAIL          => 3;        # How often can a tracker fail in a row?
29use constant PERTORRENT_TRACKERBL => '_trackerbl';
30
31################################################################################################
32# Register this plugin
33sub register {
34	my($class, $mainclass) = @_;
35	my $self = { super => $mainclass, bittorrent => undef, p_tcp=>undef, p_udp=>undef,
36	             secret => int(rand(0xFFFFFF)), torrents => {} };
37	bless($self,$class);
38
39	my $bindto = ($self->{super}->Configuration->GetValue('torrent_bind') || 0); # May be null
40	my $cproto = { torrent_trackerblacklist=>'', torrent_tracker_udpport=>6689, torrent_tracker_autoudp=>1 };
41
42	foreach my $k (keys(%$cproto)) {
43		my $cval = $mainclass->Configuration->GetValue($k);
44		unless(defined($cval)) {
45			$mainclass->Configuration->SetValue($k, $cproto->{$k});
46		}
47	}
48
49	$mainclass->Configuration->RuntimeLockValue('torrent_tracker_udpport');
50
51
52	$self->{p_tcp} = Bitflu::SourcesBitTorrent::TCP->new(_super=>$self, Bind=>$bindto);
53	$self->{p_udp} = Bitflu::SourcesBitTorrent::UDP->new(_super=>$self, Bind=>$bindto,
54	                                                         Port=>$mainclass->Configuration->GetValue('torrent_tracker_udpport'));
55
56	$mainclass->AddRunner($self) or $self->panic("Unable to add runner");
57
58	return $self;
59}
60
61################################################################################################
62# Init plugin
63sub init {
64	my($self) = @_;
65
66	# Search DownloadBitTorrent hook:
67	my $hookit = $self->{super}->GetRunnerTarget('Bitflu::DownloadBitTorrent');
68
69	if(defined($hookit)) {
70		$self->debug("Using '$hookit' to communicate with BitTorrent plugin.");
71		$self->{bittorrent} = $hookit;
72		$self->{bittorrent}->{super}->Admin->RegisterCommand('tracker'  , $self, '_Command_Tracker', 'Displays information about tracker',
73		   [
74		   [undef, "Usage: tracker queue_id [show | set TRACKERLIST | blacklist REGEXP ]" ],
75		   [undef, ""],
76		   [undef, "tracker queue_id show             : Display tracker information"],
77		   [undef, "tracker queue_id blacklist \.com   : Skip all trackers matching /\\.com/ (<-- regexp!)"],
78		   [undef, "tracker queue_id set TRACKERLIST  : Change trackers of given torrent"],
79		   [undef, "tracker queue_id set default      : Revert to default trackerlist (provided by torrent)"],
80		   [undef,""],
81		   [1    ,"trackerlist example:"],
82		   [1    , "',' seperates trackers, '!' forms groups. Also see 'help create_torrent'"],
83		   [1    , ""],
84		   [1    , "Example: Configure 3 Trackers: t1 , t2 and t3 (t1 and t2 are in the same group)"],
85		   [3    , " bitflu> tracker queue_id set t1!t2,t3"],
86		   ]);
87		return 1;
88	}
89	else {
90		$self->panic("Unable to find BitTorrent plugin!");
91	}
92}
93
94
95
96################################################################################################
97# Mainloop
98sub run {
99	my($self,$NOW) = @_;
100
101	my $cnt = 0;
102	foreach my $loading_torrent ($self->{bittorrent}->Torrent->GetTorrents) {
103		my $this_torrent = $self->{bittorrent}->Torrent->GetTorrent($loading_torrent);
104
105		if($this_torrent->IsPaused) {
106			# Skip paused torrent
107		}
108		elsif(!defined($self->{torrents}->{$loading_torrent})) {
109			# Cache data for new torrent
110			my $trackers = $self->GetTrackers($this_torrent);
111
112			my $r4 = { cttlist=>[], cstlist=>[], info_hash=>$loading_torrent, skip_until=>$NOW+($cnt++*TORRENT_RUN), last_query=>0,
113			          tracker=>'', rowfail=>0, trackers=>$trackers, waiting=>0, timeout_at=>0, proto=>4 };
114			my $r6 = $self->{super}->Tools->DeepCopy($r4);
115			$r6->{proto} = 6;
116
117			# Remove tracker for unsupported protocols:
118			$r4 = undef unless $self->{super}->Network->HaveIPv4;
119			$r6 = undef unless $self->{super}->Network->HaveIPv6;
120
121			$self->{torrents}->{$loading_torrent} = { v4=>$r4, v6=>$r6, stamp=>$NOW, kill=>0 };
122		}
123		else {
124			# Just refresh the stamp
125			$self->{torrents}->{$loading_torrent}->{stamp} = $NOW;
126		}
127	}
128
129
130	# Loop for cached torrents
131	foreach my $this_torrent (List::Util::shuffle(keys(%{$self->{torrents}}))) {
132		my $xobj = $self->{torrents}->{$this_torrent};
133
134		if($xobj->{stamp} != $NOW or $xobj->{kill}) {
135			# Whoops, this torrent vanished from main plugin -> drop it
136			$self->info("$this_torrent: Aborting tracker requests");
137
138			foreach my $obj ($xobj->{v4}, $xobj->{v6}) {
139				next unless $obj;
140				$self->MarkTrackerAsBroken($obj); # fail and stop current activity (if any)
141			}
142
143			delete($self->{torrents}->{$this_torrent});
144		}
145		else {
146			foreach my $obj ($xobj->{v4}, $xobj->{v6}) {
147				next unless $obj;
148				if($obj->{waiting}) { # Tracker has been contacted
149					if($obj->{timeout_at} < $NOW) {
150						$self->info("$this_torrent: IPv$obj->{proto} tracker '$obj->{tracker}' timed out");
151						$self->MarkTrackerAsBroken($obj, Softfail=>1);
152						$obj->{skip_until} = $NOW + int( 5+rand(TRACKER_SKEW) )*$obj->{rowfail}; # fast retry
153					}
154				}
155				elsif($obj->{skip_until} > $NOW) {
156					# Nothing to do.
157				}
158				else {
159					$self->QueryTracker($obj);
160				}
161			}
162		}
163	}
164
165	return TORRENT_RUN;
166}
167
168################################################################################################
169# Read currently configured trackers.
170# Fetches a list from torrent if _trackers is empty/does not exist
171sub GetTrackers {
172	my($self, $tref) = @_;
173
174	my $torrent_buffer = ($tref->Storage->GetSetting('_torrent')     or '');   # Empty if this is a magnet link
175	my $tracker_buffer = ($tref->Storage->GetSetting('_trackers')    or '');   # Cached/Configured trackerlist as bencoded string
176	my $tracker_hint   = ($tref->Storage->GetSetting('_trackerhint') or '');   # Enforced trackers (set by Magnet links)
177
178	if(length($tracker_buffer) == 0) {
179		# No trackers (yet): try to grab some
180		my $taref = [];
181
182		if(length($tracker_hint) != 0) {
183			$self->debug("trackerhint string: $tracker_hint");
184			foreach my $chunk (split("\x00", $tracker_hint)) {
185				push(@$taref, [ $chunk ]);
186				$self->debug(" -> $chunk");
187			}
188		}
189		elsif(length($torrent_buffer) != 0) {
190			# -> This is a full torrent (= not magnet) - check announce list
191			$self->debug($tref->GetSha1.": no trackerfile: Reading data from raw torrent");
192			my $decoded = $self->{super}->Tools->BencDecode($torrent_buffer);
193
194			if(exists($decoded->{'announce-list'}) && ref($decoded->{'announce-list'}) eq "ARRAY") {
195				$self->debug($tref->GetSha1." Using announce-list");
196				$taref = $self->_LoadAnnounceList($decoded->{'announce-list'});
197			}
198
199			# -> no tracker-list? try to get a single tracker
200			if(int(@$taref) == 0 && $decoded->{announce}) {
201				$self->debug($tref->GetSha1." Using announce");
202				push(@$taref, [ scalar($decoded->{announce}) ]);
203			}
204		}
205
206		$self->StoreTrackers($tref, $taref);
207		# Reload _trackers
208		$tracker_buffer = ($tref->Storage->GetSetting('_trackers') or '');
209	}
210
211	my $tlist_array = $self->_LoadAnnounceList($self->{super}->Tools->BencDecode($tracker_buffer));
212	# must have at least one entry
213	push(@$tlist_array, ['']) if int(@$tlist_array) == 0;
214
215	return $tlist_array;
216}
217
218
219################################################################################################
220# Load (and somewhat verify) an announce list
221sub _LoadAnnounceList {
222	my($self,$source) = @_;
223	my $target = [];
224
225	foreach my $this_item (@$source) {
226		next if ref($this_item) ne "ARRAY";
227		push(@$target, [map( scalar($_||''), @$this_item)] );
228	}
229
230	return $target;
231}
232
233
234
235################################################################################################
236# Writeout _trackers file
237sub StoreTrackers {
238	my($self,$tref, $tracker_ref) = @_;
239	$self->debug($tref->GetSha1.": storing new trackerlist");
240	my $benc = $self->{super}->Tools->BencEncode($tracker_ref);
241	   $benc = '' if $benc eq 'le'; # '' and 'le' are the same (empty) -> normalize
242	$tref->Storage->SetSetting('_trackers',$benc);
243	return 1;
244}
245
246
247################################################################################################
248# Build trackerlist (if needed) and contact a tracker
249sub QueryTracker {
250	my($self, $obj) = @_;
251
252	my $NOW            = $self->{bittorrent}->{super}->Network->GetTime;
253	my $sha1           = $obj->{info_hash};
254	$obj->{skip_until} = $NOW + TRACKER_MIN_INTERVAL; # Do not hammer the tracker if it closes the connection quickly
255
256	# This construct is used to select new trackers
257	if(int(@{$obj->{cttlist}}) == 0) {
258		# Fillup
259		$obj->{cttlist} = $self->{super}->Tools->DeepCopy($obj->{trackers});
260	}
261	if(int(@{$obj->{cstlist}}) == 0) {
262		my @rnd = (List::Util::shuffle(@{shift(@{$obj->{cttlist}})}));
263		my @fixed = ();
264		my $autoudp = $self->{super}->Configuration->GetValue('torrent_tracker_autoudp');
265
266		foreach my $this_tracker (@rnd) {
267			next if !$this_tracker; # Could be empty (funny torrent)
268			my($proto,$host,$port,$base) = $self->ParseTrackerUri({tracker=>$this_tracker});
269			if($autoudp && $proto eq 'http') { push(@fixed, "udp://$host:$port/$base#bitflu-autoudp") }
270			push(@fixed, $this_tracker);
271		}
272		$obj->{cstlist} = \@fixed;
273		$self->debug("cstlist is: \n\t".join("\n\t",@fixed));
274	}
275	unless($obj->{tracker}) {
276		# No selected tracker: get a newone
277		$obj->{tracker} = ( shift(@{$obj->{cstlist}}) || '' );  # Grab next tracker
278		$self->BlessTracker($obj);                              # Reset fails
279
280		if($obj->{tracker} =~ /#bitflu-autoudp$/) {
281			# -> only try once if in autoudp mode
282			map( $self->MarkTrackerAsBroken($obj,Softfail=>1), (1..MAX_ROWFAIL-1) );
283		}
284
285	}
286
287	$self->ContactCurrentTracker($obj);
288}
289
290################################################################################################
291# Concact tracker
292sub ContactCurrentTracker {
293	my($self, $obj) = @_;
294
295	my $NOW       = $self->{bittorrent}->{super}->Network->GetTime;
296	my $blacklist = $self->GetTrackerBlacklist($obj);
297	my $sha1      = $obj->{info_hash} or $self->panic("No info hash");
298	my $tracker   = $obj->{tracker};
299
300	if(length($tracker) == 0) {
301		$self->debug("$sha1: has currently no tracker");
302	}
303	elsif(length($blacklist) && $tracker =~ /$blacklist/i) {
304		$self->debug("$sha1: Skipping blacklisted tracker '$tracker'");
305		$self->MarkTrackerAsBroken($obj);
306		$obj->{skip_until} = $NOW + int(rand(TRACKER_SKEW));
307	}
308	else {
309		my ($proto)   = $self->ParseTrackerUri($obj);
310		# -> Not blacklisted
311		if($proto eq 'http') {
312			$obj->{timeout_at} = $NOW + TRACKER_TIMEOUT;       # Set response timeout
313			$obj->{last_query} = $NOW;                         # Remember last querytime
314			$obj->{waiting}    = $self->{p_tcp}->Start($obj);  # Start request via tcp/http
315		}
316		elsif($proto eq 'udp') {
317			$obj->{timeout_at} = $NOW + TRACKER_TIMEOUT;       # Set response timeout
318			$obj->{last_query} = $NOW;                         # Remember last querytime
319			$obj->{waiting}    = $self->{p_udp}->Start($obj);  # Start request via udp
320		}
321		else {
322			$self->info("$sha1: Protocol of tracker '$tracker' is not supported.");
323			$self->MarkTrackerAsBroken($obj);
324		}
325	}
326}
327
328################################################################################################
329# Advance to next request and stop in-flight transactions
330sub MarkTrackerAsBroken {
331	my($self,$obj,%args) = @_;
332
333	my $softfail = ($args{Softfail} ? 1 : 0);
334
335	$self->debug("MarkTrackerAsBroken($self,$obj), waiting=$obj->{waiting}, softfail=$softfail");
336
337	if($obj->{waiting}) {
338		$obj->{waiting}->Stop($obj);
339		$obj->{waiting} = 0;
340	}
341
342	if(++$obj->{rowfail} >= MAX_ROWFAIL or !$softfail) {
343		$obj->{tracker} = '';
344		# rowfail will be reseted while selecting a new tracker
345	}
346}
347
348################################################################################################
349# Mark current tracker as good
350sub BlessTracker {
351	my($self,$obj) = @_;
352	$self->debug("Blessing $obj->{tracker}");
353	$obj->{rowfail} = 0;
354}
355
356################################################################################################
357# Returns the trackerblacklist for given object
358sub GetTrackerBlacklist {
359	my($self, $obj) = @_;
360	my $tbl  = '';
361	my $sha1 = $obj->{info_hash} or $self->panic("$obj has no info_hash key!");
362
363	if((my $torrent = $self->{bittorrent}->Torrent->GetTorrent($sha1))) {
364		$tbl = $torrent->Storage->GetSetting(PERTORRENT_TRACKERBL);
365	}
366	if(!defined($tbl) || length($tbl) == 0) {
367		$tbl = $self->{bittorrent}->{super}->Configuration->GetValue('torrent_trackerblacklist');
368	}
369	return $tbl;
370}
371
372################################################################################################
373# Parse an uri
374sub ParseTrackerUri {
375	my($self, $obj) = @_;
376	my ($proto,$host,undef,$port,undef,$base) = $obj->{tracker} =~ /^([^:]+):\/\/([^\/:]+)(:(\d+))?($|\/(.*)$)/;
377	$proto  = lc($proto);
378	$host   = lc($host);
379	$port ||= 80;
380	$base ||= '';
381
382	$self->debug("ParseTrackerUri($obj->{tracker}) -> proto=$proto, host=$host, port=$port, base=$base");
383
384	return($proto,$host,$port,$base);
385}
386
387################################################################################################
388# Returns current tracker event
389sub GetTrackerEvent {
390	my($self,$obj) = @_;
391	my $sha1     = $obj->{info_hash} or $self->panic("No info_hash?");
392	my $tobj     = $self->{bittorrent}->Torrent->GetTorrent($sha1);
393
394	my $current_setting = int($tobj->Storage->GetSetting('_sbt_trackerstat') || SBT_NOTHING_SENT_YET);
395
396	if($current_setting == SBT_NOTHING_SENT_YET) {
397		return 'started';
398	}
399	elsif($current_setting == SBT_SENT_START && $tobj->IsComplete) {
400		return 'completed';
401	}
402	else {
403		return '';
404	}
405}
406
407################################################################################################
408# Go to next tracker event
409sub AdvanceTrackerEvent {
410	my($self,$obj) = @_;
411	my $sha1            = $obj->{info_hash} or $self->panic("No info_hash?");
412	my $tobj            = $self->{bittorrent}->Torrent->GetTorrent($sha1);
413	my $current_setting = $self->GetTrackerEvent($obj);
414	my $nsetting        = 0;
415
416	if($current_setting eq 'started') {
417		$nsetting = SBT_SENT_START;
418	}
419	elsif($current_setting eq 'completed') {
420		$nsetting = SBT_SENT_COMPLETE;
421	}
422
423	$tobj->Storage->SetSetting('_sbt_trackerstat', $nsetting) if $nsetting != 0;
424}
425
426
427
428
429################################################################################################
430# CLI Command
431sub _Command_Tracker {
432	my($self,@args) = @_;
433
434	my $sha1   = $args[0];
435	my $cmd    = $args[1];
436	my $value  = $args[2];
437	my @MSG    = ();
438	my @SCRAP  = ();
439	my $NOEXEC = '';
440
441	if(defined($sha1) && exists($self->{torrents}->{$sha1})) {
442		if(!defined($cmd) or $cmd =~ /^(show|list|debug)$/ ) {
443			my $xobj = $self->{torrents}->{$sha1};
444			my $allt = ''; # List of all trackers
445			my $tdbg = ''; # Debug list of all trackers
446			foreach my $obj ($xobj->{v4}, $xobj->{v6}) {
447				next unless $obj;
448				push(@MSG, [3, "Trackers for $sha1 via IPv$obj->{proto}"]);
449				push(@MSG, [undef, "Next Query           : ".localtime($obj->{skip_until})]);
450				push(@MSG, [undef, "Last Query           : ".($obj->{last_query} ? localtime($obj->{last_query}) : 'Never contacted') ]);
451				push(@MSG, [($obj->{waiting}?2:1) ,"Waiting for response : ".($obj->{waiting}?"Yes":"No")]);
452				push(@MSG, [undef, "Current Tracker      : $obj->{tracker}"]);
453				push(@MSG, [undef, "Fails                : $obj->{rowfail}"]);
454				$allt = join(',', map( join('!',@$_), @{$obj->{trackers}})); # Doesn't matter if we use v4 or v6: it's the same anyway...
455				$tdbg = Data::Dumper::Dumper($obj->{trackers}) if ($cmd && $cmd eq 'debug');
456			}
457
458			push(@MSG, [undef, "All Trackers         : $allt"]);
459			push(@MSG, [undef, "Tracker Blacklist    : ".$self->GetTrackerBlacklist({info_hash=>$sha1})]); # Ieks: API-Abuse
460			if($tdbg) {
461				$tdbg =~ s/\n/\r\n/gm;
462				push(@MSG, [3, $tdbg]);
463			}
464		}
465		elsif($cmd eq "set" && $value) {
466			# , seperates groups. ! seperates trackers
467			my @tlist = ();
468			if($value eq 'default') {
469				@tlist = (); # Empty array -> no tracker
470				push(@MSG, [1, "$sha1: Will use default trackerlist from torrent..."]);
471			}
472			else {
473				foreach my $tracker_group (split(/,/,$value)) {
474					my @tg_members = split(/!/,$tracker_group);
475					push(@tlist,\@tg_members);
476				}
477			}
478
479			$self->StoreTrackers($self->{bittorrent}->Torrent->GetTorrent($sha1), \@tlist);
480			$self->{torrents}->{$sha1}->{kill} = 1;
481			push(@MSG, [1, "$sha1: Trackerlist updated, reload will happen in a few seconds."]);
482		}
483		elsif($cmd eq "blacklist") {
484			if(my $torrent = $self->{bittorrent}->Torrent->GetTorrent($sha1)) {
485				$torrent->Storage->SetSetting(PERTORRENT_TRACKERBL, $value);
486				$self->{torrents}->{$sha1}->{kill} = 1;
487				push(@MSG, [1, "$sha1: Tracker blacklist set to '$value'"]);
488			}
489			else {
490				push(@SCRAP, $sha1);
491				$NOEXEC .= "$sha1: No such torrent";
492			}
493		}
494		else {
495			push(@MSG, [2, "Unknown subcommand '$cmd'"]);
496		}
497	}
498	else {
499		$NOEXEC .= "Usage error or no such torrent. type 'help tracker' for more information";
500	}
501	return({MSG=>\@MSG, SCRAP=>\@SCRAP, NOEXEC=>$NOEXEC});
502}
503
504
505
506sub debug { my($self, $msg) = @_; $self->{super}->debug("Tracker : ".$msg); }
507sub info  { my($self, $msg) = @_; $self->{super}->info("Tracker : ".$msg);  }
508sub warn  { my($self, $msg) = @_; $self->{super}->warn("Tracker : ".$msg);  }
509sub panic { my($self, $msg) = @_; $self->{super}->panic("Tracker : ".$msg); }
510
5111;
512
513
514
515
516################################################################################################
517
518
519
520package Bitflu::SourcesBitTorrent::TCP;
521
522	use strict;
523	use constant TRACKER_MAXPAYLOAD   => 1024*256; # Tracker payload is limited to ~256 KB
524
525	################################################################################################
526	# Returns a new TCP-Object
527	sub new {
528		my($class, %args) = @_;
529		my $self = { _super=>$args{_super}, super=>$args{_super}->{super}, net=>{bind=>$args{Bind}, port=>0, sock=>undef },
530		             sockmap=>{} };
531		bless($self,$class);
532
533		my $sock = $self->{super}->Network->NewTcpListen(ID=>$self, Bind=>$self->{net}->{bind}, Port=>$self->{net}->{port},
534		                                                 DownThrottle=>0,
535		                                                 MaxPeers=>8, Callbacks => { Data  =>'_Network_Data',
536		                                                                             Close =>'_Network_Close' } );
537		$self->{net}->{sock} = $sock;
538		return $self;
539	}
540
541	################################################################################################
542	# Starts a new request
543	sub Start {
544		my($self,$obj) = @_;
545
546		my($proto,$host,$port,$base) = $self->{_super}->ParseTrackerUri($obj);
547
548		my $sha1     = $obj->{info_hash} or $self->panic("No info_hash?");
549		my $stats    = $self->{super}->Queue->GetStats($sha1);
550		my $event    = $self->{_super}->GetTrackerEvent($obj);
551		my $nextchar = "?";
552		   $nextchar = "&" if ($base =~ /\?/);
553
554		# Create good $key and $peer_id length
555		my $key      = $self->_UriEscape(pack("H40",unpack("H40",$self->{_super}->{secret}.("x" x 20))));
556		my $peer_id  = $self->_UriEscape(pack("H40",unpack("H40",$self->{_super}->{bittorrent}->{CurrentPeerId})));
557
558		# Assemble HTTP-Request
559		my $q  = "GET /".$base.$nextchar."info_hash=".$self->_UriEscape(pack("H40",$obj->{info_hash}));
560		   $q .= "&peer_id=".$peer_id;
561		   $q .= "&port=".int($self->{super}->Configuration->GetValue('torrent_port'));
562		   $q .= "&uploaded=".int($stats->{uploaded_bytes});
563		   $q .= "&downloaded=".int($stats->{done_bytes});
564		   $q .= "&left=".int($stats->{total_bytes}-$stats->{done_bytes});
565		   $q .= "&key=".$key;
566		   $q .= "&event=$event" if $event; # BEP-0003 would allow empty events, but some trackers do not like that
567		   $q .= "&compact=1";
568		   $q .= " HTTP/1.0\r\n";
569		   $q .= "User-Agent: Bitflu ".$self->{super}->GetVersionString."\r\n";
570		   $q .= "Host: $host:$port\r\n\r\n";
571
572		$self->info("$sha1: Contacting $proto://$host:$port/$base via IPv$obj->{proto}...");
573
574		my $remote_ip = $self->{super}->Network->ResolveByProto($host)->{$obj->{proto}}->[0];
575
576		if($remote_ip) {
577			my $tsock = $self->{super}->Network->NewTcpConnection(ID=>$self, Port=>$port, RemoteIp=>$remote_ip, Timeout=>5);
578			if($tsock) {
579				$self->{super}->Network->WriteDataNow($tsock, $q) or $self->panic("Unable to write data to $tsock !");
580				$self->{sockmap}->{$tsock} = { obj=>$obj, socket=>$tsock, buffer=>'' };
581			}
582			else {
583				# Request will timeout -> tracker marked will be marked as broken
584				$self->warn("Failed to create a new connection to $host:$port : $!");
585			}
586		}
587		else {
588			$self->debug("Failed to resolve IPv$obj->{proto} record for $host");
589		}
590
591		return $self;
592	}
593
594	################################################################################################
595	# Append data to buffer (if still active)
596	sub _Network_Data {
597		my($self,$sock,$buffref,$blen) = @_;
598		if(exists($self->{sockmap}->{$sock}) && length($self->{sockmap}->{$sock}->{buffer}) < TRACKER_MAXPAYLOAD) {
599			$self->{sockmap}->{$sock}->{buffer} .= ${$buffref}; # append data if socket still active
600		}
601	}
602
603	################################################################################################
604	# Connection finished: Parse data and add new peers
605	sub _Network_Close {
606		my($self,$sock) = @_;
607		if(exists($self->{sockmap}->{$sock})) {
608			my $smap    = $self->{sockmap}->{$sock};
609			my $buffer  = $smap->{buffer};
610			my $obj     = $smap->{obj}                     or $self->panic("Missing object!");
611			my $sha1    = $obj->{info_hash}                or $self->panic("No info_hash?");
612			my $bobj    = $self->{_super}->{bittorrent}    or $self->panic("No BT-Object?");
613			my @nnodes  = ();       # NewNodes
614			my $hdr_len = 0;        # HeaderLength
615			my $decoded = undef;    # Decoded data
616			my $failed  = 0;        # Did the tracker fail?
617
618
619			# Ditch existing HTTP-Header
620			foreach my $line (split(/\n/,$buffer)) {
621				$hdr_len += length($line)+1; # 1=\n
622				if($line eq "\r")      {
623					last; # \n\r found
624				}
625				elsif(length($line) == 0) {
626					# -> Huh? We just got \n\n while reading the header.
627					# The tracker seems to speak some sort of brokish-HTTP!
628					$self->warn("$obj->{tracker} violates HTTP/1.0! Accepting broken HTTP header :-/");
629					last;
630				}
631			}
632
633			if(length($buffer) > $hdr_len) {
634				$buffer  = substr($buffer,$hdr_len); # Throws the http header away
635				$decoded = $self->{super}->Tools->BencDecode($buffer);
636			}
637
638			if(ref($decoded) ne "HASH") {
639				$self->info("$sha1: received invalid response from IPv$obj->{proto} tracker. (http_header_len=$hdr_len)");
640				$failed = 1;
641			}
642			elsif(exists($decoded->{peers}) && ref($decoded->{peers}) eq "ARRAY") {
643				foreach my $cref (@{$decoded->{peers}}) {
644					push(@nnodes , { ip=> $cref->{ip}, port=> $cref->{port}, peer_id=> $cref->{'peer id'} } );
645				}
646			}
647			elsif(exists($decoded->{peers6})) {
648				@nnodes = $self->{super}->Tools->DecodeCompactIpV6($decoded->{peers6});
649			}
650			elsif(exists($decoded->{peers})) {
651				@nnodes = $self->{super}->Tools->DecodeCompactIp($decoded->{peers});
652			}
653
654			if(exists($decoded->{'failure reason'})) {
655				# avoid messing up the terminal:
656				my $clean_fr = $decoded->{'failure reason'};
657				$clean_fr =~ tr/\x20-\x7e/?/c;
658				$self->warn("$sha1: Error from tracker: $clean_fr");
659			}
660
661			# Calculate new Skiptime
662			my $new_skip = $self->{super}->Network->GetTime + (abs(int($decoded->{interval}||0)));
663			my $old_skip = $obj->{skip_until};
664			$obj->{skip_until} = ( $new_skip > $old_skip ? $new_skip : $old_skip ); # Set new skip_until time
665			$obj->{waiting}    = 0;                                                 # No open transaction
666			delete($self->{sockmap}->{$sock}) or $self->panic;                      # Mark socket as down
667
668			if($bobj->Torrent->ExistsTorrent($sha1) && !$failed) {
669				# Torrent does still exist: add nodes
670				$bobj->Torrent->GetTorrent($sha1)->AddNewPeers(@nnodes);
671				$self->{_super}->AdvanceTrackerEvent($obj);
672				$self->{_super}->BlessTracker($obj);
673				$self->info("$sha1: IPv$obj->{proto} tracker returned ".int(@nnodes)." peers");
674			}
675			elsif($failed) {
676				$self->{_super}->MarkTrackerAsBroken($obj, Softfail=>1)
677			}
678
679		}
680	}
681
682	################################################################################################
683	# Aborts in-flight transactions
684	sub Stop {
685		my($self,$obj) = @_;
686
687		foreach my $snam (keys(%{$self->{sockmap}})) {
688			if($self->{sockmap}->{$snam}->{obj} eq $obj) {
689				my $socket = $self->{sockmap}->{$snam}->{socket};
690				$self->_Network_Close($socket);                        # cleans sockmap
691				$self->{super}->Network->RemoveSocket($self, $socket); # drop connection
692			}
693		}
694
695	}
696
697	################################################################################################
698	# Primitive Escaping
699	sub _UriEscape {
700		my($self,$string) = @_;
701		my $esc = undef;
702		foreach my $c (split(//,$string)) {
703			$esc .= sprintf("%%%02X",ord($c));
704		}
705		return $esc;
706	}
707
708	sub debug { my($self, $msg) = @_; $self->{_super}->debug($msg); }
709	sub info  { my($self, $msg) = @_; $self->{_super}->info($msg);  }
710	sub warn  { my($self, $msg) = @_; $self->{_super}->warn($msg);  }
711	sub panic { my($self, $msg) = @_; $self->{_super}->panic($msg); }
712
7131;
714
715
716
717
718
719package Bitflu::SourcesBitTorrent::UDP;
720	use constant OP_CONNECT   => 0;  # Connection request
721	use constant OP_ANNOUNCE  => 1;  # IPv4 announce
722	use constant OP_ERROR     => 3;  # Error (only returned from tracker)
723	use constant OP_ANNOUNCE6 => 4;  # IPv6 announce
724	################################################################################################
725	# Creates a new UDP object
726	sub new {
727		my($class, %args) = @_;
728		my $self = { _super=>$args{_super}, super=>$args{_super}->{super}, net=>{bind=>$args{Bind}, port=>$args{Port},
729		             sock=>undef }, tmap=>{}, ccache=>[{t=>0},{t=>0},{t=>0},{t=>0}] };
730		bless($self,$class);
731
732		my $sock = $self->{super}->Network->NewUdpListen(ID=>$self, Bind=>$self->{net}->{bind}, Port=>$self->{net}->{port},
733		                                                            Callbacks => {  Data  =>'_Network_Data' } );
734		$self->{net}->{sock} = $sock or $self->panic("Failed to bind to $self->{net}->{bind}:$self->{net}->{port}: $!");
735		return $self;
736	}
737
738	################################################################################################
739	# Send a connect() request to current tracker
740	sub Start {
741		my($self,$obj) = @_;
742		my $sha1                     = $obj->{info_hash};                                                    # Info Hash
743		my($proto,$host,$port,$base) = $self->{_super}->ParseTrackerUri($obj);                               # Parsed Tracker URI
744		my $ip                       = $self->{super}->Network->ResolveByProto($host)->{$obj->{proto}}->[0]; # IP Addr
745		my $tid                      = _GetFreeTxId();                                                       # Obtain free Transaction ID
746
747
748		# Creates a new TransactionMap (tx) Object:
749		my $tx_obj = $self->{tmap}->{$tid} = { id=>$tid, obj => $obj, ip=>$ip, port=>$port, trackerid=>"IPv$obj->{proto}://$host:$port" };
750
751		if($ip && $port) {
752			# -> Tracker is resolveable
753			my $con_id = $self->_GetConnectionId($tx_obj);                          # Do we have a connection id for this tracker?
754			if(defined($con_id)) { $self->_WriteAnnounceRequest($tx_obj,$con_id); } # Yes -> Send an announce request
755			else                 { $self->_WriteConnectionRequest($tx_obj);       } # No  -> Obtain a new connection_id first
756		}
757		return $self;
758	}
759
760
761	################################################################################################
762	# Find a random transaction id
763	# $tid will be 'something' if this loop ends.
764	# This isn't such a big problem: we will just add wrong ips to
765	# the a wrong peer (this will result in broken connections..)
766	sub _GetFreeTxId {
767		my($self) = @_;
768		my $tid = 0;
769		for(0..255){
770			$tid = 1+int(rand(0xFFFFFE));
771			last if !exists($self->{tmap}->{$tid});
772		}
773		return $tid;
774	}
775
776
777	################################################################################################
778	# Changes the id of given tx_obj
779	sub _ChangeTransactionId {
780		my($self,$tx_obj) = @_;
781
782		my $new_id = _GetFreeTxId();
783		my $old_id = $tx_obj->{id}       or $self->panic("\$tx_obj has no id!");
784		delete($self->{tmap}->{$old_id}) or $self->panic("Could not delete $old_id from tmap!");
785		$tx_obj->{id}            = $new_id; # Fixup internal id
786		$self->{tmap}->{$new_id} = $tx_obj; # Store in hash
787		return $new_id;                     # Return new id
788	}
789
790	################################################################################################
791	# Invalidate transaction of $obj
792	sub Stop {
793		my($self, $obj) = @_;
794		foreach my $trans_id (keys(%{$self->{tmap}})) {
795			my $t_obj = $self->{tmap}->{$trans_id}->{obj};
796			if($t_obj eq $obj) {
797				delete($self->{tmap}->{$trans_id});
798				last;
799			}
800		}
801	}
802
803	################################################################################################
804	# Returns true if torrent still exists in queue
805	sub _TorrentExists {
806		my($self, $obj) = @_;
807		return $self->{_super}->{bittorrent}->Torrent->ExistsTorrent($obj->{info_hash});
808	}
809
810	################################################################################################
811	# Send a connection request to given tracker
812	sub _WriteConnectionRequest {
813		my($self,$tx_obj) = @_;
814		$self->info("$tx_obj->{obj}->{info_hash}: Validating connection to IPv$tx_obj->{obj}->{proto} tracker $tx_obj->{obj}->{tracker}");
815		my $payload = pack("H16", "0000041727101980").pack("NN",OP_CONNECT,$tx_obj->{id});
816		$self->{super}->Network->SendUdp($self->{net}->{sock}, ID=>$self, RemoteIp=>$tx_obj->{ip}, Port=>$tx_obj->{port}, Data=>$payload);
817	}
818
819	################################################################################################
820	# Send an announce request (=request peers)
821	sub _WriteAnnounceRequest {
822		my($self,$tx_obj,$con_id) = @_;
823
824		my $obj    = $tx_obj->{obj};                             # Tracker object
825		my $sha1   = $obj->{info_hash};                          # Current info_hash
826		my $btobj  = $self->{_super}->{bittorrent};              # BitTorrent object
827
828		if($self->_TorrentExists($obj)) {
829			$self->info("$sha1: Requesting new peers from $obj->{tracker}");
830			my $t_port  = int($self->{super}->Configuration->GetValue('torrent_port'));
831			my $t_key   = $self->{_super}->{secret};
832			my $t_pid   = $btobj->{CurrentPeerId};
833			my $t_stats = $self->{super}->Queue->GetStats($sha1);
834			my $t_estr  = $self->{_super}->GetTrackerEvent($obj);
835			my $t_enum  = undef;
836			my $opcode  = OP_ANNOUNCE;
837			my $ipsize  = "N";
838			$t_enum     = ($t_estr eq 'started' ? 2 : ($t_estr eq 'completed' ? 1 : 0 ) );
839
840
841			if(0&&$self->{super}->Network->IsNativeIPv6($tx_obj->{ip})) { # Disabled -> Not implemented in opentracker (yet?)
842				$self->warn("Using IPv6 announce to $tx_obj->{ip}");
843				$opcode = OP_ANNOUNCE6;
844				$ipsize = "H32";
845			}
846
847
848			my $pkt  = pack("H16NN",$con_id,$opcode,$tx_obj->{id});                     # ConnectionId, Opcode, TransactionId
849			   $pkt .= pack("H40",$sha1).$t_pid;                                        # info_hash, peer-id (always 20)
850			   $pkt .= pack("NN",0,$t_stats->{done_bytes});                             # Downloaded
851			   $pkt .= pack("NN",0,($t_stats->{total_bytes}-$t_stats->{done_bytes}));   # Bytes left
852			   $pkt .= pack("NN",0,$t_stats->{uploaded_bytes});                         # Uploaded data
853			   $pkt .= pack("N",$t_enum);                                               # Event
854			   $pkt .= pack($ipsize,0);                                                 # IP(0)
855			   $pkt .= pack("NN",$t_key,50);                                            # Secret, NumWant(50)
856			   $pkt .= pack("n",$t_port);                                               # Port used by BitTorrent
857			$self->{super}->Network->SendUdp($self->{net}->{sock}, ID=>$self, RemoteIp=>$tx_obj->{ip},
858			                                                       Port=>$tx_obj->{port}, Data=>$pkt);
859		}
860	}
861
862	################################################################################################
863	# Stores a connection id in cache
864	sub _CacheConnectionId {
865		my($self,$tx_obj,$con_id) = @_;
866		my $trackerid = $tx_obj->{trackerid} or $self->panic;
867
868		$self->panic("$trackerid HAS a cached connection id!") if defined($self->_GetConnectionId($tx_obj));
869		shift(@{$self->{ccache}});
870		push(@{$self->{ccache}}, {t=>$self->{super}->Network->GetTime, trackerid=>$trackerid, id=>$con_id});
871	}
872
873	################################################################################################
874	# Fetches a connection-id from cache, return undef on cache-miss
875	sub _GetConnectionId {
876		my($self,$tx_obj) = @_;
877
878		my $ttl = $self->{super}->Network->GetTime-60;  # BEP-15 limits the ttl to 60 seconds
879
880		foreach my $cc (@{$self->{ccache}}) {
881			if($cc->{t} >= $ttl && $cc->{trackerid} eq $tx_obj->{trackerid}) {
882				return $cc->{id};
883			}
884		}
885		return undef;
886	}
887
888	################################################################################################
889	# Handles incoming udp data
890	sub _Network_Data {
891		my($self,$sock,$buffref) = @_;
892
893		my $buffer  = ${$buffref};
894		my $bufflen = length($buffer);
895
896		if($bufflen >= 16) {
897			my($action,$trans_id,$con_id) = unpack("NNH16",$buffer); # Parse udp 'header'
898
899			if(exists($self->{tmap}->{$trans_id})) {
900				# -> We got an 'open' transaction
901
902				my $tx_obj = $self->{tmap}->{$trans_id};             # Transaction object
903				my $obj    = $tx_obj->{obj};                         # Tracker object
904				my $sha1   = $obj->{info_hash};                      # Current info_hash
905				my $btobj  = $self->{_super}->{bittorrent};          # BitTorrent object
906				my $NOW    = $self->{super}->Network->GetTime;       # Current timestamp
907
908				$obj->{waiting} or $self->panic("$trans_id was in non-wait state?!"); # paranoia check
909
910				if($action == OP_CONNECT && !defined($self->_GetConnectionId($tx_obj))) {
911					# -> Connect response received. send an announce request
912					$self->_CacheConnectionId($tx_obj,$con_id);
913					$self->_ChangeTransactionId($tx_obj);
914					$self->_WriteAnnounceRequest($tx_obj,$con_id);
915				}
916				elsif($action == OP_ANNOUNCE && $bufflen >= 20 && $self->_TorrentExists($obj)) {
917					# -> Announce-Response for existing torrent
918
919					my(undef,undef,$interval,$peercount,$seeders) = unpack("NNNNN",$buffer);
920
921					$self->{_super}->AdvanceTrackerEvent($obj); # Mark current event as 'sent'
922					$self->{_super}->BlessTracker($obj);        # Mark current tracker as 'alive'
923
924					# Parse and add nodes
925					my @iplist = $self->{super}->Tools->DecodeCompactIp(substr($buffer,20));
926					$btobj->Torrent->GetTorrent($sha1)->AddNewPeers(@iplist);
927
928					my $new_skip = $NOW + (abs(int($interval||0)));
929					my $old_skip = $obj->{skip_until};
930					$obj->{skip_until} = ( $new_skip > $old_skip ? $new_skip : $old_skip ); # Set new skip_until time
931					$obj->{waiting}    = 0;                                                 # No open transaction
932					$self->Stop($obj);                                                      # Mark request as completed (invalidate tmap entry)
933
934					$self->info("$sha1: Received ".int(@iplist)." peers (info: proto=IPv$obj->{proto} peers=$peercount seeders=$seeders)");
935				}
936				elsif($action == OP_ERROR) {
937					# We will timeout after 40 seconds and retry
938					$self->info("$sha1: Tracker returned an error");
939				}
940				else {
941					$self->debug("Ignoring udp-packet with length=$bufflen, action=$action"); # Could be a late connection-id response
942				}
943			}
944			else {
945				$self->info("Received udp-packet with invalid transaction-id ($trans_id), dropping data");
946			}
947		}
948	}
949
950	sub debug { my($self, $msg) = @_; $self->{_super}->debug($msg); }
951	sub info  { my($self, $msg) = @_; $self->{_super}->info($msg);  }
952	sub warn  { my($self, $msg) = @_; $self->{_super}->warn($msg);  }
953	sub panic { my($self, $msg) = @_; $self->{_super}->panic($msg); }
9541;
955