1 /* 2 * Created on Sep 28, 2004 3 * Created by Alon Rohter 4 * Copyright (C) Azureus Software, Inc, All Rights Reserved. 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 2 9 * of the License, or (at your option) any later version. 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU General Public License for more details. 14 * You should have received a copy of the GNU General Public License 15 * along with this program; if not, write to the Free Software 16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 17 * 18 */ 19 20 package com.aelitis.azureus.core.networkmanager.impl; 21 22 import java.io.IOException; 23 24 import org.gudy.azureus2.core3.util.AEDiagnostics; 25 import org.gudy.azureus2.core3.util.Debug; 26 27 import com.aelitis.azureus.core.networkmanager.EventWaiter; 28 import com.aelitis.azureus.core.networkmanager.NetworkConnectionBase; 29 import com.aelitis.azureus.core.networkmanager.RateHandler; 30 31 32 /** 33 * A fast write entity backed by a single peer connection. 34 */ 35 public class SinglePeerUploader implements RateControlledEntity { 36 37 private final NetworkConnectionBase connection; 38 private final RateHandler rate_handler; 39 SinglePeerUploader( NetworkConnectionBase connection, RateHandler rate_handler )40 public SinglePeerUploader( NetworkConnectionBase connection, RateHandler rate_handler ) { 41 this.connection = connection; 42 this.rate_handler = rate_handler; 43 } 44 45 public RateHandler getRateHandler()46 getRateHandler() 47 { 48 return( rate_handler ); 49 } 50 51 ////////////////RateControlledWriteEntity implementation //////////////////// 52 canProcess(EventWaiter waiter)53 public boolean canProcess(EventWaiter waiter) { 54 if( !connection.getTransportBase().isReadyForWrite(waiter) ) { 55 return false; //underlying transport not ready 56 } 57 if( connection.getOutgoingMessageQueue().getTotalSize() < 1 ) { 58 return false; //no data to send 59 } 60 int[] allowed = rate_handler.getCurrentNumBytesAllowed(); 61 62 if( allowed[0] < 1 && allowed[1] == 0 ) { 63 return false; //not allowed to send any bytes 64 } 65 return true; 66 } 67 doProcessing(EventWaiter waiter, int max_bytes )68 public int doProcessing(EventWaiter waiter, int max_bytes ) { 69 if( !connection.getTransportBase().isReadyForWrite(waiter) ) { 70 //Debug.out("dW:not ready"); happens sometimes, just live with it as non-fatal 71 return 0; 72 } 73 74 int[] allowed = rate_handler.getCurrentNumBytesAllowed(); 75 76 int num_bytes_allowed = allowed[0]; 77 78 boolean protocol_is_free = allowed[1] > 0; 79 80 if ( num_bytes_allowed < 1 ){ 81 82 if ( protocol_is_free ){ 83 84 num_bytes_allowed = 0; // in case negative 85 86 }else{ 87 88 return( 0 ); 89 } 90 } 91 92 if ( max_bytes > 0 && max_bytes < num_bytes_allowed ){ 93 94 num_bytes_allowed = max_bytes; 95 } 96 97 int num_bytes_available = connection.getOutgoingMessageQueue().getTotalSize(); 98 99 if ( num_bytes_available < 1 ){ 100 101 if ( !connection.getOutgoingMessageQueue().isDestroyed()){ 102 103 //Debug.out("dW:not avail"); happens sometimes, just live with it as non-fatal 104 } 105 106 return 0; 107 } 108 109 int num_bytes_to_write = num_bytes_allowed > num_bytes_available ? num_bytes_available : num_bytes_allowed; 110 111 //int mss = NetworkManager.getTcpMssSize(); 112 //if( num_bytes_to_write > mss ) num_bytes_to_write = mss; 113 114 int[] written; 115 116 try { 117 118 written = connection.getOutgoingMessageQueue().deliverToTransport( num_bytes_to_write, protocol_is_free, false ); 119 120 }catch( Throwable e ) { 121 122 written = new int[2]; 123 124 if( AEDiagnostics.TRACE_CONNECTION_DROPS ) { 125 if( e.getMessage() == null ) { 126 Debug.out( "null write exception message: ", e ); 127 } 128 else { 129 if( e.getMessage().indexOf( "An existing connection was forcibly closed by the remote host" ) == -1 && 130 e.getMessage().indexOf( "Connection reset by peer" ) == -1 && 131 e.getMessage().indexOf( "Broken pipe" ) == -1 && 132 e.getMessage().indexOf( "An established connection was aborted by the software in your host machine" ) == -1 ) { 133 134 System.out.println( "SP: write exception [" +connection.getTransportBase().getDescription()+ "]: " +e.getMessage() ); 135 } 136 } 137 } 138 139 if (! (e instanceof IOException )){ 140 141 Debug.printStackTrace(e); 142 } 143 144 connection.notifyOfException( e ); 145 return 0; 146 } 147 148 int data_bytes_written = written[0]; 149 int protocol_bytes_written = written[1]; 150 151 int total_written =data_bytes_written + protocol_bytes_written; 152 153 if ( total_written < 1 ){ 154 155 return 0; 156 } 157 158 rate_handler.bytesProcessed( data_bytes_written, protocol_bytes_written ); 159 160 return( total_written ); 161 } 162 getPriority()163 public int getPriority() { 164 return RateControlledEntity.PRIORITY_NORMAL; 165 } 166 167 public boolean getPriorityBoost()168 getPriorityBoost() 169 { 170 return( connection.getOutgoingMessageQueue().getPriorityBoost()); 171 } 172 173 public long getBytesReadyToWrite()174 getBytesReadyToWrite() 175 { 176 return( connection.getOutgoingMessageQueue().getTotalSize()); 177 } 178 179 public int getConnectionCount( EventWaiter waiter )180 getConnectionCount( EventWaiter waiter ) 181 { 182 return( 1 ); 183 } 184 185 public int getReadyConnectionCount( EventWaiter waiter )186 getReadyConnectionCount( 187 EventWaiter waiter ) 188 { 189 if ( connection.getTransportBase().isReadyForWrite(waiter)){ 190 191 return( 1 ); 192 } 193 194 return( 0 ); 195 } 196 197 public String getString()198 getString() 199 { 200 return( "SPU: bytes_allowed=" + rate_handler.getCurrentNumBytesAllowed() + " " + connection.getString()); 201 } 202 203 } 204