/*
 * Created on Sep 28, 2004
 * Created by Alon Rohter
 * Copyright (C) Azureus Software, Inc, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 */

package com.aelitis.azureus.core.networkmanager.impl;

import java.io.IOException;
import java.util.Arrays;

import org.gudy.azureus2.core3.util.AEDiagnostics;
import org.gudy.azureus2.core3.util.Debug;

import com.aelitis.azureus.core.networkmanager.EventWaiter;
import com.aelitis.azureus.core.networkmanager.NetworkConnectionBase;
import com.aelitis.azureus.core.networkmanager.RateHandler;


32 /**
33  * A fast write entity backed by a single peer connection.
34  */
35 public class SinglePeerUploader implements RateControlledEntity {
36 
37   private final NetworkConnectionBase connection;
38   private final RateHandler rate_handler;
39 
SinglePeerUploader( NetworkConnectionBase connection, RateHandler rate_handler )40   public SinglePeerUploader( NetworkConnectionBase connection, RateHandler rate_handler ) {
41     this.connection = connection;
42     this.rate_handler = rate_handler;
43   }
44 
45 	public RateHandler
getRateHandler()46 	getRateHandler()
47 	{
48 		return( rate_handler );
49 	}
50 
51 ////////////////RateControlledWriteEntity implementation ////////////////////
52 
canProcess(EventWaiter waiter)53   public boolean canProcess(EventWaiter waiter) {
54     if( !connection.getTransportBase().isReadyForWrite(waiter) )  {
55       return false;  //underlying transport not ready
56     }
57     if( connection.getOutgoingMessageQueue().getTotalSize() < 1 ) {
58       return false;  //no data to send
59     }
60     int[] allowed = rate_handler.getCurrentNumBytesAllowed();
61 
62     if( allowed[0] < 1 && allowed[1] == 0 ) {
63       return false;  //not allowed to send any bytes
64     }
65     return true;
66   }
67 
doProcessing(EventWaiter waiter, int max_bytes )68   public int doProcessing(EventWaiter waiter, int max_bytes ) {
69     if( !connection.getTransportBase().isReadyForWrite(waiter) )  {
70       //Debug.out("dW:not ready"); happens sometimes, just live with it as non-fatal
71       return 0;
72     }
73 
74     int[] allowed = rate_handler.getCurrentNumBytesAllowed();
75 
76     int num_bytes_allowed = allowed[0];
77 
78     boolean	protocol_is_free = allowed[1] > 0;
79 
80     if ( num_bytes_allowed < 1 ){
81 
82     	if ( protocol_is_free ){
83 
84     		num_bytes_allowed = 0;	// in case negative
85 
86     	}else{
87 
88     		return( 0 );
89     	}
90     }
91 
92 	if ( max_bytes > 0 && max_bytes < num_bytes_allowed ){
93 
94 		num_bytes_allowed = max_bytes;
95 	}
96 
97     int num_bytes_available = connection.getOutgoingMessageQueue().getTotalSize();
98 
99     if ( num_bytes_available < 1 ){
100 
101     	if ( !connection.getOutgoingMessageQueue().isDestroyed()){
102 
103     		//Debug.out("dW:not avail"); happens sometimes, just live with it as non-fatal
104     	}
105 
106     	return 0;
107     }
108 
109     int num_bytes_to_write = num_bytes_allowed > num_bytes_available ? num_bytes_available : num_bytes_allowed;
110 
111     //int mss = NetworkManager.getTcpMssSize();
112     //if( num_bytes_to_write > mss )  num_bytes_to_write = mss;
113 
114     int[] written;
115 
116     try {
117 
118       written = connection.getOutgoingMessageQueue().deliverToTransport( num_bytes_to_write, protocol_is_free, false );
119 
120     }catch( Throwable e ) {
121 
122     	written = new int[2];
123 
124     	if( AEDiagnostics.TRACE_CONNECTION_DROPS ) {
125     		if( e.getMessage() == null ) {
126     			Debug.out( "null write exception message: ", e );
127     		}
128     		else {
129     			if( e.getMessage().indexOf( "An existing connection was forcibly closed by the remote host" ) == -1 &&
130     					e.getMessage().indexOf( "Connection reset by peer" ) == -1 &&
131     					e.getMessage().indexOf( "Broken pipe" ) == -1 &&
132     					e.getMessage().indexOf( "An established connection was aborted by the software in your host machine" ) == -1 ) {
133 
134     				System.out.println( "SP: write exception [" +connection.getTransportBase().getDescription()+ "]: " +e.getMessage() );
135     			}
136     		}
137     	}
138 
139     	if (! (e instanceof IOException )){
140 
141     		Debug.printStackTrace(e);
142     	}
143 
144     	connection.notifyOfException( e );
145     	return 0;
146     }
147 
148     int data_bytes_written		= written[0];
149     int protocol_bytes_written	= written[1];
150 
151     int total_written =data_bytes_written + protocol_bytes_written;
152 
153     if ( total_written < 1 ){
154 
155     	return 0;
156     }
157 
158     rate_handler.bytesProcessed( data_bytes_written, protocol_bytes_written );
159 
160     return( total_written );
161   }
162 
getPriority()163   public int getPriority() {
164     return RateControlledEntity.PRIORITY_NORMAL;
165   }
166 
167   public boolean
getPriorityBoost()168   getPriorityBoost()
169   {
170 	  return( connection.getOutgoingMessageQueue().getPriorityBoost());
171   }
172 
173   public long
getBytesReadyToWrite()174   getBytesReadyToWrite()
175   {
176 	  return( connection.getOutgoingMessageQueue().getTotalSize());
177   }
178 
179   public int
getConnectionCount( EventWaiter waiter )180   getConnectionCount( EventWaiter waiter )
181   {
182 	  return( 1 );
183   }
184 
185   public int
getReadyConnectionCount( EventWaiter waiter )186   getReadyConnectionCount(
187 	EventWaiter	waiter )
188   {
189 	  if ( connection.getTransportBase().isReadyForWrite(waiter)){
190 
191 		  return( 1 );
192 	  }
193 
194 	  return( 0 );
195   }
196 
197   public String
getString()198   getString()
199   {
200 	  return( "SPU: bytes_allowed=" + rate_handler.getCurrentNumBytesAllowed() + " " + connection.getString());
201   }
202 
203 }
204