1 /* 2 * Created on 15-Dec-2005 3 * Created by Paul Gardner 4 * Copyright (C) Azureus Software, Inc, All Rights Reserved. 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 2 9 * of the License, or (at your option) any later version. 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU General Public License for more details. 14 * You should have received a copy of the GNU General Public License 15 * along with this program; if not, write to the Free Software 16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 17 * 18 */ 19 20 package com.aelitis.azureus.plugins.extseed.impl; 21 22 import java.util.*; 23 24 import org.gudy.azureus2.core3.config.COConfigurationManager; 25 import org.gudy.azureus2.core3.config.ParameterListener; 26 import org.gudy.azureus2.core3.config.impl.TransferSpeedValidator; 27 import org.gudy.azureus2.core3.util.AENetworkClassifier; 28 import org.gudy.azureus2.core3.util.AESemaphore; 29 import org.gudy.azureus2.core3.util.Debug; 30 import org.gudy.azureus2.core3.util.HostNameToIPResolver; 31 import org.gudy.azureus2.core3.util.SystemTime; 32 import org.gudy.azureus2.plugins.PluginInterface; 33 import org.gudy.azureus2.plugins.clientid.ClientIDGenerator; 34 import org.gudy.azureus2.plugins.download.Download; 35 import org.gudy.azureus2.plugins.download.DownloadAnnounceResult; 36 import org.gudy.azureus2.plugins.peers.Peer; 37 import org.gudy.azureus2.plugins.peers.PeerManager; 38 import org.gudy.azureus2.plugins.peers.PeerManagerEvent; 39 import org.gudy.azureus2.plugins.peers.PeerManagerListener2; 40 import org.gudy.azureus2.plugins.peers.PeerReadRequest; 41 import org.gudy.azureus2.plugins.peers.PeerStats; 42 import org.gudy.azureus2.plugins.peers.Piece; 43 import org.gudy.azureus2.plugins.torrent.Torrent; 44 import org.gudy.azureus2.plugins.utils.Monitor; 45 import org.gudy.azureus2.plugins.utils.PooledByteBuffer; 46 import org.gudy.azureus2.plugins.utils.Semaphore; 47 import org.gudy.azureus2.pluginsimpl.local.PluginCoreUtils; 48 49 import com.aelitis.azureus.core.util.CopyOnWriteSet; 50 import com.aelitis.azureus.plugins.extseed.ExternalSeedException; 51 import com.aelitis.azureus.plugins.extseed.ExternalSeedPeer; 52 import com.aelitis.azureus.plugins.extseed.ExternalSeedPlugin; 53 import com.aelitis.azureus.plugins.extseed.ExternalSeedReader; 54 import com.aelitis.azureus.plugins.extseed.ExternalSeedReaderListener; 55 import com.aelitis.azureus.plugins.extseed.util.ExternalSeedHTTPDownloaderListener; 56 57 public abstract class 58 ExternalSeedReaderImpl 59 implements ExternalSeedReader, PeerManagerListener2 60 { 61 public static final int RECONNECT_DEFAULT = 30*1000; 62 public static final int INITIAL_DELAY = 30*1000; 63 public static final int STALLED_DOWNLOAD_SPEED = 20*1024; 64 public static final int STALLED_PEER_SPEED = 5*1024; 65 66 public static final int TOP_PIECE_PRIORITY = 100*1000; 67 68 private static boolean use_avail_to_activate; 69 70 static{ 71 COConfigurationManager.addAndFireParameterListener( 72 "webseed.activation.uses.availability", 73 new ParameterListener() 74 { 75 public void 76 parameterChanged( 77 String name ) 78 { 79 use_avail_to_activate = COConfigurationManager.getBooleanParameter( name ); 80 } 81 }); 82 } 83 84 private ExternalSeedPlugin plugin; 85 private Torrent torrent; 86 87 private final String host; 88 private final String host_net; 89 90 private String ip_use_accessor; 91 92 private String status; 93 94 private boolean active; 95 private boolean permanent_fail; 96 97 private long last_failed_read; 98 private int consec_failures; 99 100 private String user_agent; 101 102 private long peer_manager_change_time; 103 104 private volatile PeerManager current_manager; 105 106 private List<PeerReadRequest> requests = new LinkedList<PeerReadRequest>(); 107 private List<PeerReadRequest> dangling_requests; 108 109 private Thread request_thread; 110 private Semaphore request_sem; 111 private Monitor requests_mon; 112 113 private ExternalSeedReaderRequest active_read_request; 114 115 private int[] priority_offsets; 116 117 private boolean fast_activate; 118 private int min_availability; 119 private int min_download_speed; 120 private int max_peer_speed; 121 private long valid_until; 122 private boolean transient_seed; 123 124 private int reconnect_delay = RECONNECT_DEFAULT; 125 126 private volatile ExternalSeedReaderRequest current_request; 127 128 private List listeners = new ArrayList(); 129 130 private AESemaphore rate_sem = new AESemaphore( "ExternalSeedReaderRequest" ); 131 private int rate_bytes_read; 132 private int rate_bytes_permitted; 133 134 private volatile CopyOnWriteSet<MutableInteger> bad_pieces = new CopyOnWriteSet<MutableInteger>( true ); 135 136 protected ExternalSeedReaderImpl( ExternalSeedPlugin _plugin, Torrent _torrent, String _host, Map _params )137 ExternalSeedReaderImpl( 138 ExternalSeedPlugin _plugin, 139 Torrent _torrent, 140 String _host, 141 Map _params ) 142 { 143 plugin = _plugin; 144 torrent = _torrent; 145 host = _host; 146 147 host_net = AENetworkClassifier.categoriseAddress( host ); 148 149 fast_activate = getBooleanParam( _params, "fast_start", false ); 150 min_availability = getIntParam( _params, "min_avail", 1 ); // default is avail based 151 min_download_speed = getIntParam( _params, "min_speed", 0 ); 152 max_peer_speed = getIntParam( _params, "max_speed", 0 ); 153 valid_until = getIntParam( _params, "valid_ms", 0 ); 154 155 if ( valid_until > 0 ){ 156 157 valid_until += getSystemTime(); 158 } 159 160 transient_seed = getBooleanParam( _params, "transient", false ); 161 162 requests_mon = plugin.getPluginInterface().getUtilities().getMonitor(); 163 request_sem = plugin.getPluginInterface().getUtilities().getSemaphore(); 164 165 PluginInterface pi = plugin.getPluginInterface(); 166 167 user_agent = pi.getAzureusName(); 168 169 try{ 170 Properties props = new Properties(); 171 172 pi.getClientIDManager().getGenerator().generateHTTPProperties( torrent.getHash(), props ); 173 174 String ua = props.getProperty( ClientIDGenerator.PR_USER_AGENT ); 175 176 if ( ua != null ){ 177 178 user_agent = ua; 179 } 180 }catch( Throwable e ){ 181 } 182 183 setActive( null, false ); 184 } 185 186 public String getIP()187 getIP() 188 { 189 synchronized( host ){ 190 191 if ( ip_use_accessor == null ){ 192 193 try{ 194 ip_use_accessor = HostNameToIPResolver.syncResolve( host ).getHostAddress(); 195 196 }catch( Throwable e ){ 197 198 ip_use_accessor = host; 199 200 Debug.out( e ); 201 } 202 } 203 204 return( ip_use_accessor ); 205 } 206 } 207 208 public Torrent getTorrent()209 getTorrent() 210 { 211 return( torrent ); 212 } 213 214 public String getStatus()215 getStatus() 216 { 217 return( status ); 218 } 219 220 public boolean isTransient()221 isTransient() 222 { 223 return( transient_seed ); 224 } 225 226 protected void log( String str )227 log( 228 String str ) 229 { 230 plugin.log( str ); 231 } 232 233 protected String getUserAgent()234 getUserAgent() 235 { 236 return( user_agent ); 237 } 238 protected long getSystemTime()239 getSystemTime() 240 { 241 return( plugin.getPluginInterface().getUtilities().getCurrentSystemTime()); 242 } 243 244 protected int getFailureCount()245 getFailureCount() 246 { 247 return( consec_failures ); 248 } 249 250 protected long getLastFailTime()251 getLastFailTime() 252 { 253 return( last_failed_read ); 254 } 255 256 public boolean isPermanentlyUnavailable()257 isPermanentlyUnavailable() 258 { 259 return( permanent_fail ); 260 } 261 262 protected void setReconnectDelay( int delay, boolean reset_failures )263 setReconnectDelay( 264 int delay, 265 boolean reset_failures ) 266 { 267 reconnect_delay = delay; 268 269 if ( reset_failures ){ 270 271 consec_failures = 0; 272 } 273 } 274 275 public void eventOccurred( PeerManagerEvent event )276 eventOccurred( 277 PeerManagerEvent event ) 278 { 279 if ( event.getType() == PeerManagerEvent.ET_PEER_SENT_BAD_DATA ){ 280 281 if ( event.getPeer().getIp().equals( getIP())){ 282 283 if ( bad_pieces.size() > 128 ){ 284 285 return; 286 } 287 288 bad_pieces.add(new MutableInteger((Integer)event.getData())); 289 } 290 } 291 } 292 293 protected boolean readyToActivate( PeerManager peer_manager, Peer peer, long time_since_start )294 readyToActivate( 295 PeerManager peer_manager, 296 Peer peer, 297 long time_since_start ) 298 { 299 boolean early_days = time_since_start < INITIAL_DELAY; 300 301 try{ 302 Download download = peer_manager.getDownload(); 303 304 // first respect failure count 305 306 int fail_count = getFailureCount(); 307 308 if ( fail_count > 0 ){ 309 310 int delay = reconnect_delay; 311 312 for (int i=1;i<fail_count;i++){ 313 314 delay += delay; 315 316 if ( delay > 30*60*1000 ){ 317 318 break; 319 } 320 } 321 322 long now = getSystemTime(); 323 324 long last_fail = getLastFailTime(); 325 326 if ( last_fail < now && now - last_fail < delay ){ 327 328 return( false ); 329 } 330 } 331 332 // next obvious things like validity and the fact that we're complete 333 334 if ( valid_until > 0 && getSystemTime() > valid_until ){ 335 336 return( false ); 337 } 338 339 if ( download.getState() != Download.ST_DOWNLOADING ){ 340 341 return( false ); 342 } 343 344 // check dnd completeness too 345 346 if ( download.isComplete()){ 347 348 return( false ); 349 } 350 351 if ( !PluginCoreUtils.unwrap( download ).getDownloadState().isNetworkEnabled( host_net )){ 352 353 return( false ); 354 } 355 356 // now the more interesting stuff 357 358 if ( transient_seed ){ 359 360 // kick any existing peers that are running too slowly if the download appears 361 // to be stalled 362 363 Peer[] existing_peers = peer_manager.getPeers( getIP()); 364 365 int existing_peer_count = existing_peers.length; 366 367 int global_limit = TransferSpeedValidator.getGlobalDownloadRateLimitBytesPerSecond(); 368 369 if ( global_limit > 0 ){ 370 371 // if we have a global limit in force and we are near it then no point in 372 // activating 373 374 int current_down = plugin.getGlobalDownloadRateBytesPerSec(); 375 376 if ( global_limit - current_down < 5*1024 ){ 377 378 return( false ); 379 } 380 } 381 382 int download_limit = peer_manager.getDownloadRateLimitBytesPerSecond(); 383 384 if ( global_limit > 0 && global_limit < download_limit ){ 385 386 download_limit = global_limit; 387 } 388 389 if ( ( download_limit == 0 || download_limit > STALLED_DOWNLOAD_SPEED + 5*1024 ) && 390 peer_manager.getStats().getDownloadAverage() < STALLED_DOWNLOAD_SPEED ){ 391 392 for (int i=0;i<existing_peers.length;i++){ 393 394 Peer existing_peer = existing_peers[i]; 395 396 // no point in booting ourselves! 397 398 if ( existing_peer instanceof ExternalSeedPeer ){ 399 400 continue; 401 } 402 403 PeerStats stats = existing_peer.getStats(); 404 405 if ( stats.getTimeSinceConnectionEstablished() > INITIAL_DELAY ){ 406 407 if ( stats.getDownloadAverage() < STALLED_PEER_SPEED ){ 408 409 existing_peer.close( "Replacing slow peer with web-seed", false, false ); 410 411 existing_peer_count--; 412 } 413 } 414 } 415 } 416 417 if ( existing_peer_count == 0 ){ 418 419 // check to see if we have pending connections to the same address 420 421 if ( peer_manager.getPendingPeers( getIP()).length == 0 ){ 422 423 log( getName() + ": activating as transient seed and nothing blocking it" ); 424 425 return( true ); 426 } 427 } 428 } 429 430 // availability and speed based stuff needs a little time before being applied 431 432 if ( !use_avail_to_activate ){ 433 434 log( getName() + ": activating as availability-based activation disabled" ); 435 436 return( true ); 437 } 438 439 if ( fast_activate || !early_days ){ 440 441 if ( min_availability > 0 ){ 442 443 float availability = download.getStats().getAvailability(); 444 445 if ( availability < min_availability){ 446 447 log( getName() + ": activating as availability is poor" ); 448 449 return( true ); 450 } 451 } 452 453 if ( min_download_speed > 0 ){ 454 455 if ( peer_manager.getStats().getDownloadAverage() < min_download_speed ){ 456 457 log( getName() + ": activating as speed is slow" ); 458 459 return( true ); 460 } 461 } 462 } 463 464 // if we have an announce result and there are no seeds, or it failed then go for it 465 466 DownloadAnnounceResult ar = download.getLastAnnounceResult(); 467 468 if ( ar != null ){ 469 470 if ( ar.getResponseType() == DownloadAnnounceResult.RT_ERROR ){ 471 472 log( getName() + ": activating as tracker unavailable" ); 473 474 return( true ); 475 } 476 477 if ( ar.getSeedCount() == 0 ){ 478 479 log( getName() + ": activating as no seeds" ); 480 481 return( true ); 482 } 483 } 484 }catch( Throwable e ){ 485 486 Debug.printStackTrace(e); 487 } 488 489 return( false ); 490 } 491 492 protected boolean readyToDeactivate( PeerManager peer_manager, Peer peer )493 readyToDeactivate( 494 PeerManager peer_manager, 495 Peer peer ) 496 { 497 try{ 498 // obvious stuff first 499 500 if ( valid_until > 0 && getSystemTime() > valid_until ){ 501 502 return( true ); 503 } 504 505 if ( peer_manager.getDownload().getState() == Download.ST_SEEDING ){ 506 507 return( true ); 508 } 509 510 // more interesting stuff 511 512 if ( transient_seed ){ 513 514 return( false ); 515 } 516 517 boolean deactivate = false; 518 String reason = ""; 519 520 if ( use_avail_to_activate ){ 521 522 if ( min_availability > 0 ){ 523 524 float availability = peer_manager.getDownload().getStats().getAvailability(); 525 526 if ( availability >= min_availability + 1 ){ 527 528 reason = "availability is good"; 529 530 deactivate = true; 531 } 532 } 533 534 if ( min_download_speed > 0 ){ 535 536 long my_speed = peer.getStats().getDownloadAverage(); 537 538 long overall_speed = peer_manager.getStats().getDownloadAverage(); 539 540 if ( overall_speed - my_speed > 2 * min_download_speed ){ 541 542 reason += (reason.length()==0?"":", ") + "speed is good"; 543 544 deactivate = true; 545 546 }else{ 547 548 deactivate = false; 549 } 550 } 551 } 552 553 if ( deactivate ){ 554 555 log( getName() + ": deactivating as " + reason ); 556 557 return( true ); 558 } 559 }catch( Throwable e ){ 560 561 Debug.printStackTrace(e); 562 } 563 564 return( false ); 565 } 566 567 public boolean checkActivation( PeerManager peer_manager, Peer peer )568 checkActivation( 569 PeerManager peer_manager, 570 Peer peer ) 571 { 572 long now = getSystemTime(); 573 574 if ( peer_manager == current_manager ){ 575 576 if ( peer_manager_change_time > now ){ 577 578 peer_manager_change_time = now; 579 } 580 581 long time_since_started = now - peer_manager_change_time; 582 583 584 if ( peer_manager != null ){ 585 586 if ( active ){ 587 588 if ( now - peer_manager_change_time > INITIAL_DELAY && readyToDeactivate( peer_manager, peer )){ 589 590 setActive( peer_manager, false ); 591 592 }else{ 593 594 if ( max_peer_speed > 0 ){ 595 596 PeerStats ps = peer.getStats(); 597 598 if ( ps != null && ps.getDownloadRateLimit() != max_peer_speed ){ 599 600 ps.setDownloadRateLimit( max_peer_speed ); 601 } 602 } 603 } 604 }else{ 605 606 if ( !isPermanentlyUnavailable()){ 607 608 if ( readyToActivate( peer_manager, peer, time_since_started )){ 609 610 if ( max_peer_speed > 0 ){ 611 612 PeerStats ps = peer.getStats(); 613 614 if ( ps != null ){ 615 616 ps.setDownloadRateLimit( max_peer_speed ); 617 } 618 } 619 620 setActive( peer_manager, true ); 621 } 622 } 623 } 624 } 625 }else{ 626 627 // if the peer manager's changed then we always go inactive for a period to wait for 628 // download status to stabilise a bit 629 630 peer_manager_change_time = now; 631 632 PeerManager existing_manager = current_manager; 633 634 if ( current_manager != null ){ 635 636 current_manager.removeListener( this ); 637 } 638 639 current_manager = peer_manager; 640 641 if ( current_manager != null ){ 642 643 current_manager.addListener( this ); 644 } 645 646 setActive( existing_manager, false ); 647 } 648 649 return( active ); 650 } 651 652 public void deactivate( String reason )653 deactivate( 654 String reason ) 655 { 656 plugin.log( getName() + ": deactivating (" + reason + ")" ); 657 658 checkActivation( null, null ); 659 } 660 661 protected void setActive( PeerManager _peer_manager, boolean _active )662 setActive( 663 PeerManager _peer_manager, 664 boolean _active ) 665 { 666 try{ 667 requests_mon.enter(); 668 669 active = _active; 670 671 status = active?"Active":"Idle"; 672 673 rate_bytes_permitted = 0; 674 rate_bytes_read = 0; 675 676 setActiveSupport( _peer_manager, _active ); 677 678 }finally{ 679 680 requests_mon.exit(); 681 } 682 } 683 684 protected void setActiveSupport( PeerManager _peer_manager, boolean _active )685 setActiveSupport( 686 PeerManager _peer_manager, 687 boolean _active ) 688 { 689 // overridden if needed 690 } 691 692 public boolean isActive()693 isActive() 694 { 695 return( active ); 696 } 697 698 protected void processRequests()699 processRequests() 700 { 701 try{ 702 requests_mon.enter(); 703 704 if ( request_thread != null ){ 705 706 return; 707 } 708 709 request_thread = Thread.currentThread(); 710 711 }finally{ 712 713 requests_mon.exit(); 714 } 715 716 while( true ){ 717 718 try{ 719 if ( !request_sem.reserve(30000)){ 720 721 try{ 722 requests_mon.enter(); 723 724 if ( requests.size() == 0 ){ 725 726 dangling_requests = null; 727 728 request_thread = null; 729 730 break; 731 } 732 }finally{ 733 734 requests_mon.exit(); 735 } 736 }else{ 737 738 List<PeerReadRequest> selected_requests = new ArrayList<PeerReadRequest>(); 739 PeerReadRequest cancelled_request = null; 740 741 try{ 742 requests_mon.enter(); 743 744 // get an advisory set to process together 745 746 int count = selectRequests( requests ); 747 748 if ( count <= 0 || count > requests.size()){ 749 750 Debug.out( "invalid count" ); 751 752 count = 1; 753 } 754 755 for (int i=0;i<count;i++){ 756 757 PeerReadRequest request = requests.remove(0); 758 759 if ( request.isCancelled()){ 760 761 // if this is the first request then process it, otherwise leave 762 // for the next-round 763 764 if ( i == 0 ){ 765 766 cancelled_request = request; 767 768 }else{ 769 770 requests.add( 0, request ); 771 } 772 773 break; 774 775 }else{ 776 777 selected_requests.add( request ); 778 779 if ( i > 0 ){ 780 781 // we've only got the sem for the first request, catch up for subsequent 782 783 request_sem.reserve(); 784 } 785 } 786 } 787 788 dangling_requests = new ArrayList<PeerReadRequest>( selected_requests ); 789 790 }finally{ 791 792 requests_mon.exit(); 793 } 794 795 if ( cancelled_request != null ){ 796 797 informCancelled( cancelled_request ); 798 799 }else{ 800 801 processRequests( selected_requests ); 802 } 803 } 804 }catch( Throwable e ){ 805 806 e.printStackTrace(); 807 } 808 } 809 } 810 811 /** 812 * Rate handling 813 */ 814 815 public int readBytes( int max )816 readBytes( 817 int max ) 818 { 819 // permission to read a bunch of bytes 820 821 // we're out of step here due to multiple threads so we have to report what 822 // has already happened and prepare for what will 823 824 int res = 0; 825 826 synchronized( rate_sem ){ 827 828 if ( rate_bytes_read > 0 ){ 829 830 res = rate_bytes_read; 831 832 if ( res > max ){ 833 834 res = max; 835 } 836 837 rate_bytes_read -= res; 838 } 839 840 int rem = max - res; 841 842 if ( rem > rate_bytes_permitted ){ 843 844 if ( rate_bytes_permitted == 0 ){ 845 846 rate_sem.release(); 847 } 848 849 rate_bytes_permitted = rem; 850 } 851 852 // if things are way out then hack them back - most likely a change from unlimited to limited... 853 854 if ( rate_bytes_permitted > max*10L ){ 855 856 rate_bytes_permitted = max; 857 } 858 } 859 860 return( res ); 861 } 862 863 public int getPermittedBytes()864 getPermittedBytes() 865 866 throws ExternalSeedException 867 { 868 synchronized( rate_sem ){ 869 870 if ( rate_bytes_permitted > 0 ){ 871 872 return( rate_bytes_permitted ); 873 } 874 } 875 876 if ( !rate_sem.reserve( 1000 )){ 877 878 return( 1 ); // one byte a sec to check for connection liveness 879 } 880 881 return( rate_bytes_permitted ); 882 } 883 884 public void reportBytesRead( int num )885 reportBytesRead( 886 int num ) 887 { 888 synchronized( rate_sem ){ 889 890 rate_bytes_read += num; 891 892 rate_bytes_permitted -= num; 893 894 if ( rate_bytes_permitted < 0 ){ 895 896 rate_bytes_permitted = 0; 897 } 898 } 899 } 900 901 public int getPercentDoneOfCurrentIncomingRequest()902 getPercentDoneOfCurrentIncomingRequest() 903 { 904 ExternalSeedReaderRequest cr = current_request; 905 906 if ( cr == null ){ 907 908 return( -1 ); 909 } 910 911 return( cr.getPercentDoneOfCurrentIncomingRequest()); 912 } 913 914 public int getMaximumNumberOfRequests()915 getMaximumNumberOfRequests() 916 { 917 if ( getRequestCount() == 0 ){ 918 919 return((int)(( getPieceGroupSize() * torrent.getPieceSize() ) / PeerReadRequest.NORMAL_REQUEST_SIZE )); 920 921 }else{ 922 923 return( 0 ); 924 } 925 } 926 927 public void calculatePriorityOffsets( PeerManager peer_manager, int[] base_priorities )928 calculatePriorityOffsets( 929 PeerManager peer_manager, 930 int[] base_priorities ) 931 { 932 try{ 933 Piece[] pieces = peer_manager.getPieces(); 934 935 int piece_group_size = getPieceGroupSize(); 936 937 int[] contiguous_best_pieces = new int[piece_group_size]; 938 int[] contiguous_highest_pri = new int[piece_group_size]; 939 940 Arrays.fill( contiguous_highest_pri, -1 ); 941 942 int contiguous = 0; 943 int contiguous_best_pri = -1; 944 945 int max_contiguous = 0; 946 947 int max_free_reqs = 0; 948 int max_free_reqs_piece = -1; 949 950 MutableInteger mi = new MutableInteger(0); 951 952 for (int i=0;i<pieces.length;i++){ 953 954 mi.setValue( i ); 955 956 if ( bad_pieces.contains(mi)){ 957 958 continue; 959 } 960 961 Piece piece = pieces[i]; 962 963 if ( piece.isFullyAllocatable()){ 964 965 contiguous++; 966 967 int base_pri = base_priorities[i]; 968 969 if ( base_pri > contiguous_best_pri ){ 970 971 contiguous_best_pri = base_pri; 972 } 973 974 for (int j=0;j<contiguous && j<contiguous_highest_pri.length;j++){ 975 976 if ( contiguous_best_pri > contiguous_highest_pri[j] ){ 977 978 contiguous_highest_pri[j] = contiguous_best_pri; 979 contiguous_best_pieces[j] = i - j; 980 } 981 982 if ( j+1 > max_contiguous ){ 983 984 max_contiguous = j+1; 985 } 986 } 987 988 }else{ 989 990 contiguous = 0; 991 contiguous_best_pri = -1; 992 993 if ( max_contiguous == 0 ){ 994 995 int free_reqs = piece.getAllocatableRequestCount(); 996 997 if ( free_reqs > max_free_reqs ){ 998 999 max_free_reqs = free_reqs; 1000 max_free_reqs_piece = i; 1001 } 1002 } 1003 } 1004 } 1005 1006 if ( max_contiguous == 0 ){ 1007 1008 if ( max_free_reqs_piece >= 0 ){ 1009 1010 priority_offsets = new int[ (int)getTorrent().getPieceCount()]; 1011 1012 priority_offsets[max_free_reqs_piece] = TOP_PIECE_PRIORITY; 1013 1014 }else{ 1015 1016 priority_offsets = null; 1017 } 1018 }else{ 1019 1020 priority_offsets = new int[ (int)getTorrent().getPieceCount()]; 1021 1022 int start_piece = contiguous_best_pieces[max_contiguous-1]; 1023 1024 for (int i=start_piece;i<start_piece+max_contiguous;i++){ 1025 1026 priority_offsets[i] = TOP_PIECE_PRIORITY - (i-start_piece); 1027 } 1028 } 1029 }catch( Throwable e ){ 1030 1031 Debug.printStackTrace(e); 1032 1033 priority_offsets = null; 1034 } 1035 } 1036 1037 protected abstract int getPieceGroupSize()1038 getPieceGroupSize(); 1039 1040 protected abstract boolean getRequestCanSpanPieces()1041 getRequestCanSpanPieces(); 1042 1043 public int[] getPriorityOffsets()1044 getPriorityOffsets() 1045 { 1046 return( priority_offsets ); 1047 } 1048 1049 protected int selectRequests( List<PeerReadRequest> requests )1050 selectRequests( 1051 List<PeerReadRequest> requests ) 1052 { 1053 long next_start = -1; 1054 1055 int last_piece_number = -1; 1056 1057 for (int i=0;i<requests.size();i++){ 1058 1059 PeerReadRequest request = (PeerReadRequest)requests.get(i); 1060 1061 int this_piece_number = request.getPieceNumber(); 1062 1063 if ( last_piece_number != -1 && last_piece_number != this_piece_number ){ 1064 1065 if ( !getRequestCanSpanPieces()){ 1066 1067 return( i ); 1068 } 1069 } 1070 1071 long this_start = this_piece_number * torrent.getPieceSize() + request.getOffset(); 1072 1073 if ( next_start != -1 && this_start != next_start ){ 1074 1075 return(i); 1076 } 1077 1078 next_start = this_start + request.getLength(); 1079 1080 last_piece_number = this_piece_number; 1081 } 1082 1083 return( requests.size()); 1084 } 1085 1086 public byte[] read( int piece_number, int piece_offset, int length, final int timeout )1087 read( 1088 int piece_number, 1089 int piece_offset, 1090 int length, 1091 final int timeout ) 1092 1093 throws ExternalSeedException 1094 { 1095 final byte[] result = new byte[ length ]; 1096 1097 ExternalSeedHTTPDownloaderListener listener = 1098 new ExternalSeedHTTPDownloaderListener() 1099 { 1100 private int bp; 1101 private long start_time = SystemTime.getCurrentTime(); 1102 1103 public byte[] 1104 getBuffer() 1105 1106 throws ExternalSeedException 1107 { 1108 return( result ); 1109 } 1110 1111 public void 1112 setBufferPosition( 1113 int position ) 1114 { 1115 bp = position; 1116 } 1117 1118 public int 1119 getBufferPosition() 1120 { 1121 return( bp ); 1122 } 1123 1124 public int 1125 getBufferLength() 1126 { 1127 return( result.length ); 1128 } 1129 1130 public int 1131 getPermittedBytes() 1132 1133 throws ExternalSeedException 1134 { 1135 return( result.length ); 1136 } 1137 1138 public int 1139 getPermittedTime() 1140 { 1141 if ( timeout == 0 ){ 1142 1143 return( 0 ); 1144 } 1145 1146 int rem = timeout - (int)( SystemTime.getCurrentTime() - start_time ); 1147 1148 if ( rem <= 0 ){ 1149 1150 return( -1 ); 1151 } 1152 1153 return( rem ); 1154 } 1155 1156 public void 1157 reportBytesRead( 1158 int num ) 1159 { 1160 } 1161 1162 public boolean 1163 isCancelled() 1164 { 1165 return false; 1166 } 1167 1168 public void 1169 done() 1170 { 1171 } 1172 }; 1173 1174 readData( piece_number, piece_offset, length, listener ); 1175 1176 return( result ); 1177 } 1178 1179 protected void readData( ExternalSeedReaderRequest request )1180 readData( 1181 ExternalSeedReaderRequest request ) 1182 1183 throws ExternalSeedException 1184 { 1185 readData( request.getStartPieceNumber(), request.getStartPieceOffset(), request.getLength(), request ); 1186 } 1187 1188 protected abstract void readData( int piece_number, int piece_offset, int length, ExternalSeedHTTPDownloaderListener listener )1189 readData( 1190 int piece_number, 1191 int piece_offset, 1192 int length, 1193 ExternalSeedHTTPDownloaderListener listener ) 1194 1195 throws ExternalSeedException; 1196 1197 protected void processRequests( List<PeerReadRequest> requests )1198 processRequests( 1199 List<PeerReadRequest> requests ) 1200 { 1201 boolean ok = false; 1202 1203 ExternalSeedReaderRequest request = new ExternalSeedReaderRequest( this, requests ); 1204 1205 active_read_request = request; 1206 1207 try{ 1208 current_request = request; 1209 1210 readData( request ); 1211 1212 ok = true; 1213 1214 }catch( ExternalSeedException e ){ 1215 1216 if ( e.isPermanentFailure()){ 1217 1218 permanent_fail = true; 1219 } 1220 1221 status = "Failed: " + Debug.getNestedExceptionMessage(e); 1222 1223 request.failed(); 1224 1225 }catch( Throwable e ){ 1226 1227 status = "Failed: " + Debug.getNestedExceptionMessage(e); 1228 1229 request.failed(); 1230 1231 }finally{ 1232 1233 active_read_request = null; 1234 1235 if ( ok ){ 1236 1237 last_failed_read = 0; 1238 1239 consec_failures = 0; 1240 1241 }else{ 1242 last_failed_read = getSystemTime(); 1243 1244 consec_failures++; 1245 } 1246 } 1247 } 1248 1249 public void addRequests( List<PeerReadRequest> new_requests )1250 addRequests( 1251 List<PeerReadRequest> new_requests ) 1252 { 1253 try{ 1254 requests_mon.enter(); 1255 1256 if ( !active ){ 1257 1258 Debug.out( "request added when not active!!!!" ); 1259 } 1260 1261 for (int i=0;i<new_requests.size();i++){ 1262 1263 requests.add( new_requests.get(i)); 1264 1265 request_sem.release(); 1266 } 1267 1268 if ( request_thread == null ){ 1269 1270 plugin.getPluginInterface().getUtilities().createThread( 1271 "RequestProcessor", 1272 new Runnable() 1273 { 1274 public void 1275 run() 1276 { 1277 processRequests(); 1278 } 1279 }); 1280 } 1281 1282 }finally{ 1283 1284 requests_mon.exit(); 1285 } 1286 } 1287 1288 public void cancelRequest( PeerReadRequest request )1289 cancelRequest( 1290 PeerReadRequest request ) 1291 { 1292 try{ 1293 requests_mon.enter(); 1294 1295 if ( requests.contains( request ) && !request.isCancelled()){ 1296 1297 request.cancel(); 1298 } 1299 1300 if ( dangling_requests != null && dangling_requests.contains( request ) && !request.isCancelled()){ 1301 1302 request.cancel(); 1303 } 1304 1305 }finally{ 1306 1307 requests_mon.exit(); 1308 } 1309 } 1310 1311 public void cancelAllRequests()1312 cancelAllRequests() 1313 { 1314 try{ 1315 requests_mon.enter(); 1316 1317 for ( PeerReadRequest request: requests ){ 1318 1319 if ( !request.isCancelled()){ 1320 1321 request.cancel(); 1322 } 1323 } 1324 1325 if ( dangling_requests != null ){ 1326 1327 for ( PeerReadRequest request: dangling_requests ){ 1328 1329 if ( !request.isCancelled()){ 1330 1331 request.cancel(); 1332 } 1333 } 1334 } 1335 1336 if ( active_read_request != null ){ 1337 1338 active_read_request.cancel(); 1339 } 1340 }finally{ 1341 1342 requests_mon.exit(); 1343 } 1344 } 1345 1346 public int getRequestCount()1347 getRequestCount() 1348 { 1349 try{ 1350 requests_mon.enter(); 1351 1352 return( requests.size()); 1353 1354 }finally{ 1355 1356 requests_mon.exit(); 1357 } 1358 } 1359 1360 public List<PeerReadRequest> getExpiredRequests()1361 getExpiredRequests() 1362 { 1363 List<PeerReadRequest> res = null; 1364 1365 try{ 1366 requests_mon.enter(); 1367 1368 for (int i=0;i<requests.size();i++){ 1369 1370 PeerReadRequest request = (PeerReadRequest)requests.get(i); 1371 1372 if ( request.isExpired()){ 1373 1374 if ( res == null ){ 1375 1376 res = new ArrayList<PeerReadRequest>(); 1377 } 1378 1379 res.add( request ); 1380 } 1381 } 1382 }finally{ 1383 1384 requests_mon.exit(); 1385 } 1386 1387 return( res ); 1388 } 1389 1390 public List<PeerReadRequest> getRequests()1391 getRequests() 1392 { 1393 List<PeerReadRequest> res = null; 1394 1395 try{ 1396 requests_mon.enter(); 1397 1398 res = new ArrayList<PeerReadRequest>( requests ); 1399 1400 }finally{ 1401 1402 requests_mon.exit(); 1403 } 1404 1405 return( res ); 1406 } 1407 1408 public int[] getOutgoingRequestedPieceNumbers()1409 getOutgoingRequestedPieceNumbers() 1410 { 1411 try{ 1412 requests_mon.enter(); 1413 1414 int size = requests.size(); 1415 1416 if ( dangling_requests != null ){ 1417 1418 size += dangling_requests.size(); 1419 } 1420 1421 int[] res = new int[size]; 1422 1423 int pos = 0; 1424 1425 if ( dangling_requests != null ){ 1426 1427 for ( PeerReadRequest r: dangling_requests ){ 1428 1429 int piece_number = r.getPieceNumber(); 1430 1431 boolean hit = false; 1432 1433 for ( int i=0;i<pos;i++){ 1434 1435 if ( piece_number == res[i] ){ 1436 1437 hit = true; 1438 1439 break; 1440 } 1441 } 1442 1443 if ( !hit ){ 1444 1445 res[pos++] = piece_number; 1446 } 1447 } 1448 } 1449 1450 for ( PeerReadRequest r: requests ){ 1451 1452 int piece_number = r.getPieceNumber(); 1453 1454 boolean hit = false; 1455 1456 for ( int i=0;i<pos;i++){ 1457 1458 if ( piece_number == res[i] ){ 1459 1460 hit = true; 1461 1462 break; 1463 } 1464 } 1465 1466 if ( !hit ){ 1467 1468 res[pos++] = piece_number; 1469 } 1470 } 1471 1472 if ( pos == res.length ){ 1473 1474 return( res ); 1475 } 1476 1477 int[] trunc = new int[pos]; 1478 1479 System.arraycopy( res, 0, trunc, 0, pos ); 1480 1481 return( trunc ); 1482 1483 }finally{ 1484 1485 requests_mon.exit(); 1486 } 1487 } 1488 1489 public int getOutgoingRequestCount()1490 getOutgoingRequestCount() 1491 { 1492 try{ 1493 requests_mon.enter(); 1494 1495 int res = requests.size(); 1496 1497 if ( dangling_requests != null ){ 1498 1499 res += dangling_requests.size(); 1500 } 1501 1502 return( res ); 1503 1504 }finally{ 1505 1506 requests_mon.exit(); 1507 } 1508 } 1509 1510 1511 protected void informComplete( PeerReadRequest request, byte[] buffer )1512 informComplete( 1513 PeerReadRequest request, 1514 byte[] buffer ) 1515 { 1516 PooledByteBuffer pool_buffer = plugin.getPluginInterface().getUtilities().allocatePooledByteBuffer( buffer ); 1517 1518 for (int i=0;i<listeners.size();i++){ 1519 1520 try{ 1521 ((ExternalSeedReaderListener)listeners.get(i)).requestComplete( request, pool_buffer ); 1522 1523 }catch( Throwable e ){ 1524 1525 e.printStackTrace(); 1526 } 1527 } 1528 } 1529 1530 protected void informCancelled( PeerReadRequest request )1531 informCancelled( 1532 PeerReadRequest request ) 1533 { 1534 for (int i=0;i<listeners.size();i++){ 1535 1536 try{ 1537 ((ExternalSeedReaderListener)listeners.get(i)).requestCancelled( request ); 1538 1539 }catch( Throwable e ){ 1540 1541 e.printStackTrace(); 1542 } 1543 } 1544 } 1545 1546 protected void informFailed( PeerReadRequest request )1547 informFailed( 1548 PeerReadRequest request ) 1549 { 1550 for (int i=0;i<listeners.size();i++){ 1551 1552 try{ 1553 ((ExternalSeedReaderListener)listeners.get(i)).requestFailed( request ); 1554 1555 }catch( Throwable e ){ 1556 1557 e.printStackTrace(); 1558 } 1559 } 1560 } 1561 1562 public void addListener( ExternalSeedReaderListener l )1563 addListener( 1564 ExternalSeedReaderListener l ) 1565 { 1566 listeners.add( l ); 1567 } 1568 1569 public void removeListener( ExternalSeedReaderListener l )1570 removeListener( 1571 ExternalSeedReaderListener l ) 1572 { 1573 listeners.remove( l ); 1574 } 1575 1576 protected int getIntParam( Map map, String name, int def )1577 getIntParam( 1578 Map map, 1579 String name, 1580 int def ) 1581 { 1582 Object obj = map.get(name); 1583 1584 if ( obj instanceof Long ){ 1585 1586 return(((Long)obj).intValue()); 1587 } 1588 1589 return( def ); 1590 } 1591 1592 protected boolean getBooleanParam( Map map, String name, boolean def )1593 getBooleanParam( 1594 Map map, 1595 String name, 1596 boolean def ) 1597 { 1598 return( getIntParam( map, name, def?1:0) != 0 ); 1599 } 1600 1601 protected static class 1602 MutableInteger 1603 { 1604 private int value; 1605 1606 protected MutableInteger( int v )1607 MutableInteger( 1608 int v ) 1609 { 1610 value = v; 1611 } 1612 1613 protected void setValue( int v )1614 setValue( 1615 int v ) 1616 { 1617 value = v; 1618 } 1619 1620 protected int getValue()1621 getValue() 1622 { 1623 return( value ); 1624 } 1625 1626 public int hashCode()1627 hashCode() 1628 { 1629 return value; 1630 } 1631 1632 public boolean equals( Object obj )1633 equals( 1634 Object obj ) 1635 { 1636 if (obj instanceof MutableInteger) { 1637 return value == ((MutableInteger)obj).value; 1638 } 1639 return false; 1640 } 1641 } 1642 } 1643