1 /*
2  * Created on 15-Dec-2005
3  * Created by Paul Gardner
4  * Copyright (C) Azureus Software, Inc, All Rights Reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17  *
18  */
19 
20 package com.aelitis.azureus.plugins.extseed.impl;
21 
22 import java.util.*;
23 
24 import org.gudy.azureus2.core3.config.COConfigurationManager;
25 import org.gudy.azureus2.core3.config.ParameterListener;
26 import org.gudy.azureus2.core3.config.impl.TransferSpeedValidator;
27 import org.gudy.azureus2.core3.util.AENetworkClassifier;
28 import org.gudy.azureus2.core3.util.AESemaphore;
29 import org.gudy.azureus2.core3.util.Debug;
30 import org.gudy.azureus2.core3.util.HostNameToIPResolver;
31 import org.gudy.azureus2.core3.util.SystemTime;
32 import org.gudy.azureus2.plugins.PluginInterface;
33 import org.gudy.azureus2.plugins.clientid.ClientIDGenerator;
34 import org.gudy.azureus2.plugins.download.Download;
35 import org.gudy.azureus2.plugins.download.DownloadAnnounceResult;
36 import org.gudy.azureus2.plugins.peers.Peer;
37 import org.gudy.azureus2.plugins.peers.PeerManager;
38 import org.gudy.azureus2.plugins.peers.PeerManagerEvent;
39 import org.gudy.azureus2.plugins.peers.PeerManagerListener2;
40 import org.gudy.azureus2.plugins.peers.PeerReadRequest;
41 import org.gudy.azureus2.plugins.peers.PeerStats;
42 import org.gudy.azureus2.plugins.peers.Piece;
43 import org.gudy.azureus2.plugins.torrent.Torrent;
44 import org.gudy.azureus2.plugins.utils.Monitor;
45 import org.gudy.azureus2.plugins.utils.PooledByteBuffer;
46 import org.gudy.azureus2.plugins.utils.Semaphore;
47 import org.gudy.azureus2.pluginsimpl.local.PluginCoreUtils;
48 
49 import com.aelitis.azureus.core.util.CopyOnWriteSet;
50 import com.aelitis.azureus.plugins.extseed.ExternalSeedException;
51 import com.aelitis.azureus.plugins.extseed.ExternalSeedPeer;
52 import com.aelitis.azureus.plugins.extseed.ExternalSeedPlugin;
53 import com.aelitis.azureus.plugins.extseed.ExternalSeedReader;
54 import com.aelitis.azureus.plugins.extseed.ExternalSeedReaderListener;
55 import com.aelitis.azureus.plugins.extseed.util.ExternalSeedHTTPDownloaderListener;
56 
57 public abstract class
58 ExternalSeedReaderImpl
59 	implements ExternalSeedReader, PeerManagerListener2
60 {
61 	public static final int	RECONNECT_DEFAULT 			= 30*1000;
62 	public static final int INITIAL_DELAY				= 30*1000;
63 	public static final int STALLED_DOWNLOAD_SPEED		= 20*1024;
64 	public static final int STALLED_PEER_SPEED			= 5*1024;
65 
66 	public static final int TOP_PIECE_PRIORITY			= 100*1000;
67 
68 	private static boolean	use_avail_to_activate;
69 
70 	static{
71 		COConfigurationManager.addAndFireParameterListener(
72 				"webseed.activation.uses.availability",
73 				new ParameterListener()
74 				{
75 					public void
76 					parameterChanged(
77 						String name )
78 					{
79 						use_avail_to_activate = COConfigurationManager.getBooleanParameter( name );
80 					}
81 				});
82 	}
83 
84 	private ExternalSeedPlugin	plugin;
85 	private Torrent				torrent;
86 
87 	private final String	host;
88 	private final String	host_net;
89 
90 	private String			ip_use_accessor;
91 
92 	private String			status;
93 
94 	private boolean			active;
95 	private boolean			permanent_fail;
96 
97 	private long			last_failed_read;
98 	private int				consec_failures;
99 
100 	private String			user_agent;
101 
102 	private long			peer_manager_change_time;
103 
104 	private volatile PeerManager		current_manager;
105 
106 	private List<PeerReadRequest>			requests			= new LinkedList<PeerReadRequest>();
107 	private List<PeerReadRequest>			dangling_requests;
108 
109 	private Thread			request_thread;
110 	private Semaphore		request_sem;
111 	private Monitor			requests_mon;
112 
113 	private ExternalSeedReaderRequest	active_read_request;
114 
115 	private int[]		priority_offsets;
116 
117 	private boolean		fast_activate;
118 	private int			min_availability;
119 	private int			min_download_speed;
120 	private int			max_peer_speed;
121 	private long		valid_until;
122 	private boolean		transient_seed;
123 
124 	private int			reconnect_delay	= RECONNECT_DEFAULT;
125 
126 	private volatile ExternalSeedReaderRequest	current_request;
127 
128 	private List	listeners	= new ArrayList();
129 
130 	private AESemaphore			rate_sem = new AESemaphore( "ExternalSeedReaderRequest" );
131 	private int					rate_bytes_read;
132 	private int					rate_bytes_permitted;
133 
134 	private volatile CopyOnWriteSet<MutableInteger>		bad_pieces = new CopyOnWriteSet<MutableInteger>( true );
135 
136 	protected
ExternalSeedReaderImpl( ExternalSeedPlugin _plugin, Torrent _torrent, String _host, Map _params )137 	ExternalSeedReaderImpl(
138 		ExternalSeedPlugin 		_plugin,
139 		Torrent					_torrent,
140 		String					_host,
141 		Map						_params )
142 	{
143 		plugin	= _plugin;
144 		torrent	= _torrent;
145 		host	= _host;
146 
147 		host_net = AENetworkClassifier.categoriseAddress( host );
148 
149 		fast_activate 		= getBooleanParam( _params, "fast_start", false );
150 		min_availability 	= getIntParam( _params, "min_avail", 1 );	// default is avail based
151 		min_download_speed	= getIntParam( _params, "min_speed", 0 );
152 		max_peer_speed		= getIntParam( _params, "max_speed", 0 );
153 		valid_until			= getIntParam( _params, "valid_ms", 0 );
154 
155 		if ( valid_until > 0 ){
156 
157 			valid_until += getSystemTime();
158 		}
159 
160 		transient_seed		= getBooleanParam( _params, "transient", false );
161 
162 		requests_mon	= plugin.getPluginInterface().getUtilities().getMonitor();
163 		request_sem		= plugin.getPluginInterface().getUtilities().getSemaphore();
164 
165 		PluginInterface	pi = plugin.getPluginInterface();
166 
167 		user_agent = pi.getAzureusName();
168 
169 		try{
170 			Properties	props = new Properties();
171 
172 			pi.getClientIDManager().getGenerator().generateHTTPProperties( torrent.getHash(), props );
173 
174 			String ua = props.getProperty( ClientIDGenerator.PR_USER_AGENT );
175 
176 			if ( ua != null ){
177 
178 				user_agent	= ua;
179 			}
180 		}catch( Throwable e ){
181 		}
182 
183 		setActive( null, false );
184 	}
185 
186 	public String
getIP()187 	getIP()
188 	{
189 		synchronized( host ){
190 
191 			if ( ip_use_accessor == null ){
192 
193 				try{
194 					ip_use_accessor = HostNameToIPResolver.syncResolve( host ).getHostAddress();
195 
196 				}catch( Throwable e ){
197 
198 					ip_use_accessor = host;
199 
200 					Debug.out( e );
201 				}
202 			}
203 
204 			return( ip_use_accessor );
205 		}
206 	}
207 
208 	public Torrent
getTorrent()209 	getTorrent()
210 	{
211 		return( torrent );
212 	}
213 
214 	public String
getStatus()215 	getStatus()
216 	{
217 		return( status );
218 	}
219 
220 	public boolean
isTransient()221 	isTransient()
222 	{
223 		return( transient_seed );
224 	}
225 
226 	protected void
log( String str )227 	log(
228 		String	str )
229 	{
230 		plugin.log( str );
231 	}
232 
233 	protected String
getUserAgent()234 	getUserAgent()
235 	{
236 		return( user_agent );
237 	}
238 	protected long
getSystemTime()239 	getSystemTime()
240 	{
241 		return( plugin.getPluginInterface().getUtilities().getCurrentSystemTime());
242 	}
243 
244 	protected int
getFailureCount()245 	getFailureCount()
246 	{
247 		return( consec_failures );
248 	}
249 
250 	protected long
getLastFailTime()251 	getLastFailTime()
252 	{
253 		return( last_failed_read );
254 	}
255 
256 	public boolean
isPermanentlyUnavailable()257 	isPermanentlyUnavailable()
258 	{
259 		return( permanent_fail );
260 	}
261 
262 	protected void
setReconnectDelay( int delay, boolean reset_failures )263 	setReconnectDelay(
264 		int			delay,
265 		boolean		reset_failures )
266 	{
267 		reconnect_delay = delay;
268 
269 		if ( reset_failures ){
270 
271 			consec_failures = 0;
272 		}
273 	}
274 
275 	public void
eventOccurred( PeerManagerEvent event )276 	eventOccurred(
277 		PeerManagerEvent	event )
278 	{
279 		if ( event.getType() == PeerManagerEvent.ET_PEER_SENT_BAD_DATA ){
280 
281 			if ( event.getPeer().getIp().equals( getIP())){
282 
283 				if ( bad_pieces.size() > 128 ){
284 
285 					return;
286 				}
287 
288 				bad_pieces.add(new MutableInteger((Integer)event.getData()));
289 			}
290 		}
291 	}
292 
293 	protected boolean
readyToActivate( PeerManager peer_manager, Peer peer, long time_since_start )294 	readyToActivate(
295 		PeerManager	peer_manager,
296 		Peer		peer,
297 		long		time_since_start )
298 	{
299 		boolean	early_days = time_since_start < INITIAL_DELAY;
300 
301 		try{
302 			Download download = peer_manager.getDownload();
303 
304 				// first respect failure count
305 
306 			int	fail_count = getFailureCount();
307 
308 			if ( fail_count > 0 ){
309 
310 				int	delay	= reconnect_delay;
311 
312 				for (int i=1;i<fail_count;i++){
313 
314 					delay += delay;
315 
316 					if ( delay > 30*60*1000 ){
317 
318 						break;
319 					}
320 				}
321 
322 				long	now = getSystemTime();
323 
324 				long	last_fail = getLastFailTime();
325 
326 				if ( last_fail < now && now - last_fail < delay ){
327 
328 					return( false );
329 				}
330 			}
331 
332 				// next obvious things like validity and the fact that we're complete
333 
334 			if ( valid_until > 0 && getSystemTime() > valid_until ){
335 
336 				return( false );
337 			}
338 
339 			if ( download.getState() != Download.ST_DOWNLOADING ){
340 
341 				return( false );
342 			}
343 
344 				// check dnd completeness too
345 
346 			if ( download.isComplete()){
347 
348 				return( false );
349 			}
350 
351 			if ( !PluginCoreUtils.unwrap( download ).getDownloadState().isNetworkEnabled( host_net )){
352 
353 				return( false );
354 			}
355 
356 				// now the more interesting stuff
357 
358 			if ( transient_seed ){
359 
360 					// kick any existing peers that are running too slowly if the download appears
361 					// to be stalled
362 
363 				Peer[]	existing_peers = peer_manager.getPeers( getIP());
364 
365 				int	existing_peer_count = existing_peers.length;
366 
367 				int	global_limit	= TransferSpeedValidator.getGlobalDownloadRateLimitBytesPerSecond();
368 
369 				if ( global_limit > 0 ){
370 
371 						// if we have a global limit in force and we are near it then no point in
372 						// activating
373 
374 					int current_down = plugin.getGlobalDownloadRateBytesPerSec();
375 
376 					if ( global_limit - current_down < 5*1024 ){
377 
378 						return( false );
379 					}
380 				}
381 
382 				int	download_limit  = peer_manager.getDownloadRateLimitBytesPerSecond();
383 
384 				if ( global_limit > 0 && global_limit < download_limit ){
385 
386 					download_limit = global_limit;
387 				}
388 
389 				if ( 	( download_limit == 0 || download_limit > STALLED_DOWNLOAD_SPEED + 5*1024 ) &&
390 						peer_manager.getStats().getDownloadAverage() < STALLED_DOWNLOAD_SPEED ){
391 
392 					for (int i=0;i<existing_peers.length;i++){
393 
394 						Peer	existing_peer = existing_peers[i];
395 
396 							// no point in booting ourselves!
397 
398 						if ( existing_peer instanceof ExternalSeedPeer ){
399 
400 							continue;
401 						}
402 
403 						PeerStats stats = existing_peer.getStats();
404 
405 						if ( stats.getTimeSinceConnectionEstablished() > INITIAL_DELAY ){
406 
407 							if ( stats.getDownloadAverage() < STALLED_PEER_SPEED ){
408 
409 								existing_peer.close( "Replacing slow peer with web-seed", false, false );
410 
411 								existing_peer_count--;
412 							}
413 						}
414 					}
415 				}
416 
417 				if ( existing_peer_count == 0 ){
418 
419 					// check to see if we have pending connections to the same address
420 
421 					if ( peer_manager.getPendingPeers( getIP()).length == 0 ){
422 
423 						log( getName() + ": activating as transient seed and nothing blocking it" );
424 
425 						return( true );
426 					}
427 				}
428 			}
429 
430 				// availability and speed based stuff needs a little time before being applied
431 
432 			if ( !use_avail_to_activate ){
433 
434 				log( getName() + ": activating as availability-based activation disabled" );
435 
436 				return( true );
437 			}
438 
439 			if ( fast_activate || !early_days ){
440 
441 				if ( min_availability > 0 ){
442 
443 					float availability = download.getStats().getAvailability();
444 
445 					if ( availability < min_availability){
446 
447 						log( getName() + ": activating as availability is poor" );
448 
449 						return( true );
450 					}
451 				}
452 
453 				if ( min_download_speed > 0 ){
454 
455 					if ( peer_manager.getStats().getDownloadAverage() < min_download_speed ){
456 
457 						log( getName() + ": activating as speed is slow" );
458 
459 						return( true );
460 					}
461 				}
462 			}
463 
464 				// if we have an announce result and there are no seeds, or it failed then go for it
465 
466 			DownloadAnnounceResult ar = download.getLastAnnounceResult();
467 
468 			if ( ar != null ){
469 
470 				if ( ar.getResponseType() == DownloadAnnounceResult.RT_ERROR ){
471 
472 					log( getName() + ": activating as tracker unavailable" );
473 
474 					return( true );
475 				}
476 
477 				if ( ar.getSeedCount() == 0 ){
478 
479 					log( getName() + ": activating as no seeds" );
480 
481 					return( true );
482 				}
483 			}
484 		}catch( Throwable e ){
485 
486 			Debug.printStackTrace(e);
487 		}
488 
489 		return( false );
490 	}
491 
492 	protected boolean
readyToDeactivate( PeerManager peer_manager, Peer peer )493 	readyToDeactivate(
494 		PeerManager	peer_manager,
495 		Peer		peer )
496 	{
497 		try{
498 				// obvious stuff first
499 
500 			if ( valid_until > 0 && getSystemTime() > valid_until ){
501 
502 				return( true );
503 			}
504 
505 			if ( peer_manager.getDownload().getState() == Download.ST_SEEDING ){
506 
507 				return( true );
508 			}
509 
510 				// more interesting stuff
511 
512 			if ( transient_seed ){
513 
514 				return( false );
515 			}
516 
517 			boolean	deactivate = false;
518 			String	reason		= "";
519 
520 			if ( use_avail_to_activate ){
521 
522 				if ( min_availability > 0 ){
523 
524 					float availability = peer_manager.getDownload().getStats().getAvailability();
525 
526 					if ( availability >= min_availability + 1 ){
527 
528 						reason =  "availability is good";
529 
530 						deactivate = true;
531 					}
532 				}
533 
534 				if ( min_download_speed > 0 ){
535 
536 					long	my_speed 		= peer.getStats().getDownloadAverage();
537 
538 					long	overall_speed 	= peer_manager.getStats().getDownloadAverage();
539 
540 					if ( overall_speed - my_speed > 2 * min_download_speed ){
541 
542 						reason += (reason.length()==0?"":", ") + "speed is good";
543 
544 						deactivate = true;
545 
546 					}else{
547 
548 						deactivate = false;
549 					}
550 				}
551 			}
552 
553 			if ( deactivate ){
554 
555 				log( getName() + ": deactivating as " + reason );
556 
557 				return( true );
558 			}
559 		}catch( Throwable e ){
560 
561 			Debug.printStackTrace(e);
562 		}
563 
564 		return( false );
565 	}
566 
567 	public boolean
checkActivation( PeerManager peer_manager, Peer peer )568 	checkActivation(
569 		PeerManager		peer_manager,
570 		Peer			peer )
571 	{
572 		long now = getSystemTime();
573 
574 		if ( peer_manager == current_manager ){
575 
576 			if ( peer_manager_change_time > now ){
577 
578 				peer_manager_change_time	= now;
579 			}
580 
581 			long	time_since_started = now - peer_manager_change_time;
582 
583 
584 			if ( peer_manager != null ){
585 
586 				if ( active ){
587 
588 					if ( now - peer_manager_change_time > INITIAL_DELAY && readyToDeactivate( peer_manager, peer )){
589 
590 						setActive( peer_manager, false );
591 
592 					}else{
593 
594 						if ( max_peer_speed > 0 ){
595 
596 							PeerStats ps = peer.getStats();
597 
598 							if ( ps != null && ps.getDownloadRateLimit() != max_peer_speed ){
599 
600 								ps.setDownloadRateLimit( max_peer_speed );
601 							}
602 						}
603 					}
604 				}else{
605 
606 					if ( !isPermanentlyUnavailable()){
607 
608 						if ( readyToActivate( peer_manager, peer, time_since_started )){
609 
610 							if ( max_peer_speed > 0 ){
611 
612 								PeerStats ps = peer.getStats();
613 
614 								if ( ps != null ){
615 
616 									ps.setDownloadRateLimit( max_peer_speed );
617 								}
618 							}
619 
620 							setActive( peer_manager, true );
621 						}
622 					}
623 				}
624 			}
625 		}else{
626 
627 				// if the peer manager's changed then we always go inactive for a period to wait for
628 				// download status to stabilise a bit
629 
630 			peer_manager_change_time	= now;
631 
632 			PeerManager existing_manager = current_manager;
633 
634 			if ( current_manager != null ){
635 
636 				current_manager.removeListener( this );
637 			}
638 
639 			current_manager	= peer_manager;
640 
641 			if ( current_manager != null ){
642 
643 				current_manager.addListener( this );
644 			}
645 
646 			setActive( existing_manager, false );
647 		}
648 
649 		return( active );
650 	}
651 
652 	public void
deactivate( String reason )653 	deactivate(
654 		String	reason )
655 	{
656 		plugin.log( getName() + ": deactivating (" + reason  + ")" );
657 
658 		checkActivation( null, null );
659 	}
660 
661 	protected void
setActive( PeerManager _peer_manager, boolean _active )662 	setActive(
663 		PeerManager	_peer_manager,
664 		boolean		_active )
665 	{
666 		try{
667 			requests_mon.enter();
668 
669 			active	= _active;
670 
671 			status = active?"Active":"Idle";
672 
673 			rate_bytes_permitted		= 0;
674 			rate_bytes_read				= 0;
675 
676 			setActiveSupport( _peer_manager, _active );
677 
678 		}finally{
679 
680 			requests_mon.exit();
681 		}
682 	}
683 
684 	protected void
setActiveSupport( PeerManager _peer_manager, boolean _active )685 	setActiveSupport(
686 		PeerManager	_peer_manager,
687 		boolean		_active )
688 	{
689 		// overridden if needed
690 	}
691 
692 	public boolean
isActive()693 	isActive()
694 	{
695 		return( active );
696 	}
697 
698 	protected void
processRequests()699 	processRequests()
700 	{
701 		try{
702 			requests_mon.enter();
703 
704 			if ( request_thread != null ){
705 
706 				return;
707 			}
708 
709 			request_thread = Thread.currentThread();
710 
711 		}finally{
712 
713 			requests_mon.exit();
714 		}
715 
716 		while( true ){
717 
718 			try{
719 				if ( !request_sem.reserve(30000)){
720 
721 					try{
722 						requests_mon.enter();
723 
724 						if ( requests.size() == 0 ){
725 
726 							dangling_requests = null;
727 
728 							request_thread	= null;
729 
730 							break;
731 						}
732 					}finally{
733 
734 						requests_mon.exit();
735 					}
736 				}else{
737 
738 					List<PeerReadRequest>	selected_requests 	= new ArrayList<PeerReadRequest>();
739 					PeerReadRequest			cancelled_request	= null;
740 
741 					try{
742 						requests_mon.enter();
743 
744 							// get an advisory set to process together
745 
746 						int	count = selectRequests( requests );
747 
748 						if ( count <= 0 || count > requests.size()){
749 
750 							Debug.out( "invalid count" );
751 
752 							count	= 1;
753 						}
754 
755 						for (int i=0;i<count;i++){
756 
757 							PeerReadRequest	request = requests.remove(0);
758 
759 							if ( request.isCancelled()){
760 
761 									// if this is the first request then process it, otherwise leave
762 									// for the next-round
763 
764 								if ( i == 0 ){
765 
766 									cancelled_request = request;
767 
768 								}else{
769 
770 									requests.add( 0, request );
771 								}
772 
773 								break;
774 
775 							}else{
776 
777 								selected_requests.add( request );
778 
779 								if ( i > 0 ){
780 
781 										// we've only got the sem for the first request, catch up for subsequent
782 
783 									request_sem.reserve();
784 								}
785 							}
786 						}
787 
788 						dangling_requests = new ArrayList<PeerReadRequest>( selected_requests );
789 
790 					}finally{
791 
792 						requests_mon.exit();
793 					}
794 
795 					if ( cancelled_request != null ){
796 
797 						informCancelled( cancelled_request );
798 
799 					}else{
800 
801 						processRequests( selected_requests );
802 					}
803 				}
804 			}catch( Throwable e ){
805 
806 				e.printStackTrace();
807 			}
808 		}
809 	}
810 
811 		/**
812 		 * Rate handling
813 		 */
814 
815 	public int
readBytes( int max )816 	readBytes(
817 		int		max )
818 	{
819 			// permission to read a bunch of bytes
820 
821 			// we're out of step here due to multiple threads so we have to report what
822 			// has already happened and prepare for what will
823 
824 		int	res = 0;
825 
826 		synchronized( rate_sem ){
827 
828 			if ( rate_bytes_read > 0 ){
829 
830 				res = rate_bytes_read;
831 
832 				if ( res > max ){
833 
834 					res = max;
835 				}
836 
837 				rate_bytes_read -= res;
838 			}
839 
840 			int	rem = max - res;
841 
842 			if ( rem > rate_bytes_permitted ){
843 
844 				if ( rate_bytes_permitted == 0 ){
845 
846 					rate_sem.release();
847 				}
848 
849 				rate_bytes_permitted = rem;
850 			}
851 
852 				// if things are way out then hack them back - most likely a change from unlimited to limited...
853 
854 			if ( rate_bytes_permitted > max*10L ){
855 
856 				rate_bytes_permitted = max;
857 			}
858 		}
859 
860 		return( res );
861 	}
862 
863 	public int
getPermittedBytes()864 	getPermittedBytes()
865 
866 		throws ExternalSeedException
867 	{
868 		synchronized( rate_sem ){
869 
870 			if ( rate_bytes_permitted > 0 ){
871 
872 				return( rate_bytes_permitted );
873 			}
874 		}
875 
876 		if ( !rate_sem.reserve( 1000 )){
877 
878 			return( 1 );	// one byte a sec to check for connection liveness
879 		}
880 
881 		return( rate_bytes_permitted );
882 	}
883 
884 	public void
reportBytesRead( int num )885 	reportBytesRead(
886 		int		num )
887 	{
888 		synchronized( rate_sem ){
889 
890 			rate_bytes_read += num;
891 
892 			rate_bytes_permitted -= num;
893 
894 			if ( rate_bytes_permitted < 0 ){
895 
896 				rate_bytes_permitted = 0;
897 			}
898 		}
899 	}
900 
901 	public int
getPercentDoneOfCurrentIncomingRequest()902 	getPercentDoneOfCurrentIncomingRequest()
903 	{
904 		ExternalSeedReaderRequest	cr = current_request;
905 
906 		if ( cr == null ){
907 
908 			return( -1 );
909 		}
910 
911 		return( cr.getPercentDoneOfCurrentIncomingRequest());
912 	}
913 
914 	public int
getMaximumNumberOfRequests()915 	getMaximumNumberOfRequests()
916 	{
917 		if ( getRequestCount() == 0 ){
918 
919 			return((int)(( getPieceGroupSize() * torrent.getPieceSize() ) / PeerReadRequest.NORMAL_REQUEST_SIZE ));
920 
921 		}else{
922 
923 			return( 0 );
924 		}
925 	}
926 
927 	public void
calculatePriorityOffsets( PeerManager peer_manager, int[] base_priorities )928 	calculatePriorityOffsets(
929 		PeerManager		peer_manager,
930 		int[]			base_priorities )
931 	{
932 		try{
933 			Piece[]	pieces = peer_manager.getPieces();
934 
935 			int	piece_group_size = getPieceGroupSize();
936 
937 			int[]	contiguous_best_pieces = new int[piece_group_size];
938 			int[]	contiguous_highest_pri = new int[piece_group_size];
939 
940 			Arrays.fill( contiguous_highest_pri, -1 );
941 
942 			int	contiguous			= 0;
943 			int	contiguous_best_pri	= -1;
944 
945 			int	max_contiguous	= 0;
946 
947 			int	max_free_reqs		= 0;
948 			int max_free_reqs_piece	= -1;
949 
950 			MutableInteger	mi = new MutableInteger(0);
951 
952 			for (int i=0;i<pieces.length;i++){
953 
954 				mi.setValue( i );
955 
956 				if ( bad_pieces.contains(mi)){
957 
958 					continue;
959 				}
960 
961 				Piece	piece = pieces[i];
962 
963 				if ( piece.isFullyAllocatable()){
964 
965 					contiguous++;
966 
967 					int	base_pri = base_priorities[i];
968 
969 					if ( base_pri > contiguous_best_pri ){
970 
971 						contiguous_best_pri	= base_pri;
972 					}
973 
974 					for (int j=0;j<contiguous && j<contiguous_highest_pri.length;j++){
975 
976 						if ( contiguous_best_pri > contiguous_highest_pri[j] ){
977 
978 							contiguous_highest_pri[j]	= contiguous_best_pri;
979 							contiguous_best_pieces[j]	= i - j;
980 						}
981 
982 						if ( j+1 > max_contiguous ){
983 
984 							max_contiguous	= j+1;
985 						}
986 					}
987 
988 				}else{
989 
990 					contiguous			= 0;
991 					contiguous_best_pri	= -1;
992 
993 					if ( max_contiguous == 0 ){
994 
995 						int	free_reqs = piece.getAllocatableRequestCount();
996 
997 						if ( free_reqs > max_free_reqs ){
998 
999 							max_free_reqs 		= free_reqs;
1000 							max_free_reqs_piece	= i;
1001 						}
1002 					}
1003 				}
1004 			}
1005 
1006 			if ( max_contiguous == 0 ){
1007 
1008 				if ( max_free_reqs_piece >= 0 ){
1009 
1010 					priority_offsets	 = new int[ (int)getTorrent().getPieceCount()];
1011 
1012 					priority_offsets[max_free_reqs_piece] = TOP_PIECE_PRIORITY;
1013 
1014 				}else{
1015 
1016 					priority_offsets	= null;
1017 				}
1018 			}else{
1019 
1020 				priority_offsets	 = new int[ (int)getTorrent().getPieceCount()];
1021 
1022 				int	start_piece = contiguous_best_pieces[max_contiguous-1];
1023 
1024 				for (int i=start_piece;i<start_piece+max_contiguous;i++){
1025 
1026 					priority_offsets[i] = TOP_PIECE_PRIORITY - (i-start_piece);
1027 				}
1028 			}
1029 		}catch( Throwable e ){
1030 
1031 			Debug.printStackTrace(e);
1032 
1033 			priority_offsets	= null;
1034 		}
1035 	}
1036 
1037 	protected abstract int
getPieceGroupSize()1038 	getPieceGroupSize();
1039 
1040 	protected abstract boolean
getRequestCanSpanPieces()1041 	getRequestCanSpanPieces();
1042 
1043 	public int[]
getPriorityOffsets()1044 	getPriorityOffsets()
1045 	{
1046 		return( priority_offsets );
1047 	}
1048 
1049 	protected int
selectRequests( List<PeerReadRequest> requests )1050 	selectRequests(
1051 		List<PeerReadRequest>	requests )
1052 	{
1053 		long	next_start = -1;
1054 
1055 		int	last_piece_number = -1;
1056 
1057 		for (int i=0;i<requests.size();i++){
1058 
1059 			PeerReadRequest	request = (PeerReadRequest)requests.get(i);
1060 
1061 			int	this_piece_number	= request.getPieceNumber();
1062 
1063 			if ( last_piece_number != -1 && last_piece_number != this_piece_number ){
1064 
1065 				if ( !getRequestCanSpanPieces()){
1066 
1067 					return( i );
1068 				}
1069 			}
1070 
1071 			long	this_start = this_piece_number * torrent.getPieceSize() + request.getOffset();
1072 
1073 			if ( next_start != -1 && this_start != next_start ){
1074 
1075 				return(i);
1076 			}
1077 
1078 			next_start	= this_start + request.getLength();
1079 
1080 			last_piece_number	= this_piece_number;
1081 		}
1082 
1083 		return( requests.size());
1084 	}
1085 
1086 	public byte[]
read( int piece_number, int piece_offset, int length, final int timeout )1087    	read(
1088    		int			piece_number,
1089    		int			piece_offset,
1090    		int			length,
1091    		final int	timeout )
1092 
1093    		throws ExternalSeedException
1094    	{
1095    		final byte[] 	result = new byte[ length ];
1096 
1097    		ExternalSeedHTTPDownloaderListener listener =
1098    			new ExternalSeedHTTPDownloaderListener()
1099    			{
1100    				private int		bp;
1101    				private long	start_time = SystemTime.getCurrentTime();
1102 
1103    				public byte[]
1104    	        	getBuffer()
1105 
1106    	        		throws ExternalSeedException
1107    	        	{
1108    					return( result );
1109    	        	}
1110 
1111    	        	public void
1112    	        	setBufferPosition(
1113    	        		int	position )
1114    	        	{
1115    	        		bp = position;
1116    	        	}
1117 
1118    	        	public int
1119    	        	getBufferPosition()
1120    	        	{
1121    	        		return( bp );
1122    	        	}
1123 
1124    	        	public int
1125    	        	getBufferLength()
1126    	        	{
1127    	        		return( result.length );
1128    	        	}
1129 
1130    	        	public int
1131    	        	getPermittedBytes()
1132 
1133    	        		throws ExternalSeedException
1134    	        	{
1135    	        		return( result.length );
1136    	        	}
1137 
1138    	        	public int
1139    	        	getPermittedTime()
1140    	        	{
1141    	        		if ( timeout == 0 ){
1142 
1143    	        			return( 0 );
1144    	        		}
1145 
1146    	        		int	rem = timeout - (int)( SystemTime.getCurrentTime() - start_time );
1147 
1148    	        		if ( rem <= 0 ){
1149 
1150    	        			return( -1 );
1151    	        		}
1152 
1153    	        		return( rem );
1154    	        	}
1155 
1156    	        	public void
1157    	        	reportBytesRead(
1158    	        		int		num )
1159    	        	{
1160    	        	}
1161 
1162    	        	public boolean
1163    	        	isCancelled()
1164    	        	{
1165    	        		return false;
1166    	        	}
1167 
1168    	        	public void
1169    	        	done()
1170    	        	{
1171    	        	}
1172    			};
1173 
1174    		readData( piece_number, piece_offset, length, listener );
1175 
1176    		return( result );
1177    	}
1178 
1179 	protected void
readData( ExternalSeedReaderRequest request )1180 	readData(
1181 		ExternalSeedReaderRequest	request )
1182 
1183 		throws ExternalSeedException
1184 	{
1185 		readData( request.getStartPieceNumber(), request.getStartPieceOffset(), request.getLength(), request );
1186 	}
1187 
1188 	protected abstract void
readData( int piece_number, int piece_offset, int length, ExternalSeedHTTPDownloaderListener listener )1189 	readData(
1190 		int									piece_number,
1191 		int									piece_offset,
1192 		int									length,
1193 		ExternalSeedHTTPDownloaderListener	listener )
1194 
1195 		throws ExternalSeedException;
1196 
1197 	protected void
processRequests( List<PeerReadRequest> requests )1198 	processRequests(
1199 		List<PeerReadRequest>		requests )
1200 	{
1201 		boolean	ok = false;
1202 
1203 		ExternalSeedReaderRequest	request = new ExternalSeedReaderRequest( this, requests );
1204 
1205 		active_read_request = request;
1206 
1207 		try{
1208 			current_request = request;
1209 
1210 			readData( request );
1211 
1212 			ok	= true;
1213 
1214 		}catch( ExternalSeedException 	e ){
1215 
1216 			if ( e.isPermanentFailure()){
1217 
1218 				permanent_fail	= true;
1219 			}
1220 
1221 			status = "Failed: " + Debug.getNestedExceptionMessage(e);
1222 
1223 			request.failed();
1224 
1225 		}catch( Throwable e ){
1226 
1227 			status = "Failed: " + Debug.getNestedExceptionMessage(e);
1228 
1229 			request.failed();
1230 
1231 		}finally{
1232 
1233 			active_read_request = null;
1234 
1235 			if ( ok ){
1236 
1237 				last_failed_read	= 0;
1238 
1239 				consec_failures		= 0;
1240 
1241 			}else{
1242 				last_failed_read	= getSystemTime();
1243 
1244 				consec_failures++;
1245 			}
1246 		}
1247 	}
1248 
1249 	public void
addRequests( List<PeerReadRequest> new_requests )1250 	addRequests(
1251 		List<PeerReadRequest>	new_requests )
1252 	{
1253 		try{
1254 			requests_mon.enter();
1255 
1256 			if ( !active ){
1257 
1258 				Debug.out( "request added when not active!!!!" );
1259 			}
1260 
1261 			for (int i=0;i<new_requests.size();i++){
1262 
1263 				requests.add( new_requests.get(i));
1264 
1265 				request_sem.release();
1266 			}
1267 
1268 			if ( request_thread == null ){
1269 
1270 				plugin.getPluginInterface().getUtilities().createThread(
1271 						"RequestProcessor",
1272 						new Runnable()
1273 						{
1274 							public void
1275 							run()
1276 							{
1277 								processRequests();
1278 							}
1279 						});
1280 			}
1281 
1282 		}finally{
1283 
1284 			requests_mon.exit();
1285 		}
1286 	}
1287 
1288 	public void
cancelRequest( PeerReadRequest request )1289 	cancelRequest(
1290 		PeerReadRequest	request )
1291 	{
1292 		try{
1293 			requests_mon.enter();
1294 
1295 			if ( requests.contains( request ) && !request.isCancelled()){
1296 
1297 				request.cancel();
1298 			}
1299 
1300 			if ( dangling_requests != null && dangling_requests.contains( request ) && !request.isCancelled()){
1301 
1302 				request.cancel();
1303 			}
1304 
1305 		}finally{
1306 
1307 			requests_mon.exit();
1308 		}
1309 	}
1310 
1311 	public void
cancelAllRequests()1312 	cancelAllRequests()
1313 	{
1314 		try{
1315 			requests_mon.enter();
1316 
1317 			for ( PeerReadRequest request: requests ){
1318 
1319 				if ( !request.isCancelled()){
1320 
1321 					request.cancel();
1322 				}
1323 			}
1324 
1325 			if ( dangling_requests != null ){
1326 
1327 				for ( PeerReadRequest request: dangling_requests ){
1328 
1329 					if ( !request.isCancelled()){
1330 
1331 						request.cancel();
1332 					}
1333 				}
1334 			}
1335 
1336 			if ( active_read_request != null ){
1337 
1338 				active_read_request.cancel();
1339 			}
1340 		}finally{
1341 
1342 			requests_mon.exit();
1343 		}
1344 	}
1345 
1346 	public int
getRequestCount()1347 	getRequestCount()
1348 	{
1349 		try{
1350 			requests_mon.enter();
1351 
1352 			return( requests.size());
1353 
1354 		}finally{
1355 
1356 			requests_mon.exit();
1357 		}
1358 	}
1359 
1360 	public List<PeerReadRequest>
getExpiredRequests()1361 	getExpiredRequests()
1362 	{
1363 		List<PeerReadRequest>	res = null;
1364 
1365 		try{
1366 			requests_mon.enter();
1367 
1368 			for (int i=0;i<requests.size();i++){
1369 
1370 				PeerReadRequest	request = (PeerReadRequest)requests.get(i);
1371 
1372 				if ( request.isExpired()){
1373 
1374 					if ( res == null ){
1375 
1376 						res = new ArrayList<PeerReadRequest>();
1377 					}
1378 
1379 					res.add( request );
1380 				}
1381 			}
1382 		}finally{
1383 
1384 			requests_mon.exit();
1385 		}
1386 
1387 		return( res );
1388 	}
1389 
1390 	public List<PeerReadRequest>
getRequests()1391 	getRequests()
1392 	{
1393 		List<PeerReadRequest>	res = null;
1394 
1395 		try{
1396 			requests_mon.enter();
1397 
1398 			res = new ArrayList<PeerReadRequest>( requests );
1399 
1400 		}finally{
1401 
1402 			requests_mon.exit();
1403 		}
1404 
1405 		return( res );
1406 	}
1407 
1408 	public int[]
getOutgoingRequestedPieceNumbers()1409    	getOutgoingRequestedPieceNumbers()
1410 	{
1411 		try{
1412 			requests_mon.enter();
1413 
1414 			int size = requests.size();
1415 
1416 			if ( dangling_requests != null ){
1417 
1418 				size += dangling_requests.size();
1419 			}
1420 
1421 			int[] res = new int[size];
1422 
1423 			int	pos = 0;
1424 
1425 			if ( dangling_requests != null ){
1426 
1427 				for ( PeerReadRequest r: dangling_requests ){
1428 
1429 					int	piece_number = r.getPieceNumber();
1430 
1431 					boolean	hit = false;
1432 
1433 					for ( int i=0;i<pos;i++){
1434 
1435 						if ( piece_number == res[i] ){
1436 
1437 							hit = true;
1438 
1439 							break;
1440 						}
1441 					}
1442 
1443 					if ( !hit ){
1444 
1445 						res[pos++] = piece_number;
1446 					}
1447 				}
1448 			}
1449 
1450 			for ( PeerReadRequest r: requests ){
1451 
1452 				int	piece_number = r.getPieceNumber();
1453 
1454 				boolean	hit = false;
1455 
1456 				for ( int i=0;i<pos;i++){
1457 
1458 					if ( piece_number == res[i] ){
1459 
1460 						hit = true;
1461 
1462 						break;
1463 					}
1464 				}
1465 
1466 				if ( !hit ){
1467 
1468 					res[pos++] = piece_number;
1469 				}
1470 			}
1471 
1472 			if ( pos == res.length ){
1473 
1474 				return( res );
1475 			}
1476 
1477 			int[]	trunc = new int[pos];
1478 
1479 			System.arraycopy( res, 0, trunc, 0, pos );
1480 
1481 			return( trunc );
1482 
1483 		}finally{
1484 
1485 			requests_mon.exit();
1486 		}
1487 	}
1488 
1489    	public int
getOutgoingRequestCount()1490   	getOutgoingRequestCount()
1491    	{
1492 		try{
1493 			requests_mon.enter();
1494 
1495 			int res = requests.size();
1496 
1497 			if ( dangling_requests != null ){
1498 
1499 				res += dangling_requests.size();
1500 			}
1501 
1502 			return( res );
1503 
1504 		}finally{
1505 
1506 			requests_mon.exit();
1507 		}
1508    	}
1509 
1510 
1511 	protected void
informComplete( PeerReadRequest request, byte[] buffer )1512 	informComplete(
1513 		PeerReadRequest		request,
1514 		byte[]				buffer )
1515 	{
1516 		PooledByteBuffer pool_buffer = plugin.getPluginInterface().getUtilities().allocatePooledByteBuffer( buffer );
1517 
1518 		for (int i=0;i<listeners.size();i++){
1519 
1520 			try{
1521 				((ExternalSeedReaderListener)listeners.get(i)).requestComplete( request, pool_buffer );
1522 
1523 			}catch( Throwable e ){
1524 
1525 				e.printStackTrace();
1526 			}
1527 		}
1528 	}
1529 
1530 	protected void
informCancelled( PeerReadRequest request )1531 	informCancelled(
1532 		PeerReadRequest		request )
1533 	{
1534 		for (int i=0;i<listeners.size();i++){
1535 
1536 			try{
1537 				((ExternalSeedReaderListener)listeners.get(i)).requestCancelled( request );
1538 
1539 			}catch( Throwable e ){
1540 
1541 				e.printStackTrace();
1542 			}
1543 		}
1544 	}
1545 
1546 	protected void
informFailed( PeerReadRequest request )1547 	informFailed(
1548 		PeerReadRequest	request )
1549 	{
1550 		for (int i=0;i<listeners.size();i++){
1551 
1552 			try{
1553 				((ExternalSeedReaderListener)listeners.get(i)).requestFailed( request );
1554 
1555 			}catch( Throwable e ){
1556 
1557 				e.printStackTrace();
1558 			}
1559 		}
1560 	}
1561 
1562 	public void
addListener( ExternalSeedReaderListener l )1563 	addListener(
1564 		ExternalSeedReaderListener	l )
1565 	{
1566 		listeners.add( l );
1567 	}
1568 
1569 	public void
removeListener( ExternalSeedReaderListener l )1570 	removeListener(
1571 		ExternalSeedReaderListener	l )
1572 	{
1573 		listeners.remove( l );
1574 	}
1575 
1576 	protected int
getIntParam( Map map, String name, int def )1577 	getIntParam(
1578 		Map			map,
1579 		String		name,
1580 		int			def )
1581 	{
1582 		Object	obj = map.get(name);
1583 
1584 		if ( obj instanceof Long ){
1585 
1586 			return(((Long)obj).intValue());
1587 		}
1588 
1589 		return( def );
1590 	}
1591 
1592 	protected boolean
getBooleanParam( Map map, String name, boolean def )1593 	getBooleanParam(
1594 		Map			map,
1595 		String		name,
1596 		boolean		def )
1597 	{
1598 		return( getIntParam( map, name, def?1:0) != 0 );
1599 	}
1600 
1601 	protected static class
1602 	MutableInteger
1603 	{
1604 		private int	value;
1605 
1606 		protected
MutableInteger( int v )1607 		MutableInteger(
1608 			int		v )
1609 		{
1610 			value = v;
1611 		}
1612 
1613 		protected void
setValue( int v )1614 		setValue(
1615 			int	v )
1616 		{
1617 			value = v;
1618 		}
1619 
1620 		protected int
getValue()1621 		getValue()
1622 		{
1623 			return( value );
1624 		}
1625 
1626 		public int
hashCode()1627 		hashCode()
1628 		{
1629 			return value;
1630 		}
1631 
1632 		public boolean
equals( Object obj )1633 		equals(
1634 			Object obj )
1635 		{
1636 			if (obj instanceof MutableInteger) {
1637 				return value == ((MutableInteger)obj).value;
1638 			}
1639 			return false;
1640 		}
1641 	}
1642 }
1643