1 /*
2  * Created on Jul 29, 2004
3  * Created by Alon Rohter
4  * Copyright (C) Azureus Software, Inc, All Rights Reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17  *
18  */
19 
20 package com.aelitis.azureus.core.networkmanager.impl;
21 
22 
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 import java.util.Map;
26 
27 import org.gudy.azureus2.core3.util.AddressUtils;
28 import org.gudy.azureus2.core3.util.Debug;
29 import org.gudy.azureus2.core3.util.LightHashMap;
30 
31 import com.aelitis.azureus.core.networkmanager.*;
32 import com.aelitis.azureus.core.peermanager.messaging.MessageStreamDecoder;
33 import com.aelitis.azureus.core.peermanager.messaging.MessageStreamEncoder;
34 
35 
36 
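/**
 * NetworkConnection implementation that ties a Transport to an outgoing and an
 * incoming message queue. An instance is created either outbound (not connected
 * until connect() is called) or inbound (wrapping an already-connected transport).
 */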
40 
41 public class
42 NetworkConnectionImpl
43 	extends NetworkConnectionHelper
44 	implements NetworkConnection
45 {
46   private final ConnectionEndpoint	connection_endpoint;
47   private final boolean				is_incoming;
48 
49   private boolean connect_with_crypto;
50   private boolean allow_fallback;
51   private byte[][] shared_secrets;
52 
53   private ConnectionListener connection_listener;
54   private boolean 	is_connected;
55   private byte		is_lan_local	= AddressUtils.LAN_LOCAL_MAYBE;
56 
57   private final OutgoingMessageQueueImpl outgoing_message_queue;
58   private final IncomingMessageQueueImpl incoming_message_queue;
59 
60   private Transport	transport;
61 
62   private volatile ConnectionAttempt	connection_attempt;
63   private volatile boolean				closed;
64 
65   private Map<Object,Object>			user_data;
66 
67   /**
68    * Constructor for new OUTbound connection.
69    * The connection is not yet established upon instantiation; use connect() to do so.
70    * @param _remote_address to connect to
71    * @param encoder default message stream encoder to use for the outgoing queue
72    * @param decoder default message stream decoder to use for the incoming queue
73    */
NetworkConnectionImpl( ConnectionEndpoint _target, MessageStreamEncoder encoder, MessageStreamDecoder decoder, boolean _connect_with_crypto, boolean _allow_fallback, byte[][] _shared_secrets )74   public NetworkConnectionImpl(
75 		  		ConnectionEndpoint _target, MessageStreamEncoder encoder,
76 		  		MessageStreamDecoder decoder, boolean _connect_with_crypto, boolean _allow_fallback,
77 		  		byte[][] _shared_secrets )
78   {
79 	connection_endpoint	= _target;
80 	is_incoming			= false;
81     connect_with_crypto	= _connect_with_crypto;
82     allow_fallback = _allow_fallback;
83     shared_secrets = _shared_secrets;
84 
85 
86     is_connected = false;
87     outgoing_message_queue = new OutgoingMessageQueueImpl( encoder );
88     incoming_message_queue = new IncomingMessageQueueImpl( decoder, this );
89   }
90 
91 
92   /**
93    * Constructor for new INbound connection.
94    * The connection is assumed to be already established, by the given already-connected channel.
95    * @param _remote_channel connected by
96    * @param data_already_read bytestream already read during routing
97    * @param encoder default message stream encoder to use for the outgoing queue
98    * @param decoder default message stream decoder to use for the incoming queue
99    */
NetworkConnectionImpl( Transport _transport, MessageStreamEncoder encoder, MessageStreamDecoder decoder )100   public NetworkConnectionImpl( Transport _transport, MessageStreamEncoder encoder, MessageStreamDecoder decoder ) {
101     transport = _transport;
102     connection_endpoint = transport.getTransportEndpoint().getProtocolEndpoint().getConnectionEndpoint();
103     is_incoming		= true;
104     is_connected 	= true;
105     outgoing_message_queue = new OutgoingMessageQueueImpl( encoder );
106     outgoing_message_queue.setTransport( transport );
107     incoming_message_queue = new IncomingMessageQueueImpl( decoder, this );
108 
109     transport.bindConnection( this );
110   }
111 
112 
113   public ConnectionEndpoint
getEndpoint()114   getEndpoint()
115   {
116 	  return( connection_endpoint );
117   }
118 
119   public boolean
isIncoming()120   isIncoming()
121   {
122 	  return( is_incoming );
123   }
124 
connect( int priority, ConnectionListener listener )125   public void connect( int priority, ConnectionListener listener ) {
126 	  connect( null, priority, listener );
127   }
128 
connect( ByteBuffer initial_outbound_data, int priority, ConnectionListener listener )129   public void connect( ByteBuffer initial_outbound_data, int priority, ConnectionListener listener ) {
130     this.connection_listener = listener;
131 
132     if( is_connected ){
133 
134       connection_listener.connectStarted( -1 );
135 
136       connection_listener.connectSuccess( initial_outbound_data );
137 
138       return;
139     }
140 
141     if ( connection_attempt != null ){
142 
143     	Debug.out( "Connection attempt already active" );
144 
145     	listener.connectFailure( new Throwable( "Connection attempt already active" ));
146 
147     	return;
148     }
149 
150     connection_attempt =
151     	connection_endpoint.connectOutbound(
152     			connect_with_crypto,
153     			allow_fallback,
154     			shared_secrets,
155     			initial_outbound_data,
156     			priority,
157     			new Transport.ConnectListener() {
158 			      public int connectAttemptStarted( int default_connect_timeout ){
159 			        return( connection_listener.connectStarted( default_connect_timeout ));
160 			      }
161 
162 			      public void connectSuccess( Transport	_transport, ByteBuffer remaining_initial_data ) {
163 			        is_connected = true;
164 			        transport	= _transport;
165 			        outgoing_message_queue.setTransport( transport );
166 			        transport.bindConnection( NetworkConnectionImpl.this );
167 			        connection_listener.connectSuccess( remaining_initial_data );
168 			        connection_attempt	= null;
169 			      }
170 
171 			      public void connectFailure( Throwable failure_msg ) {
172 			        is_connected = false;
173 			        connection_listener.connectFailure( failure_msg );
174 			      }
175 
176 			    	public Object
177 					getConnectionProperty(
178 						String property_name)
179 					{
180 			    		return( connection_listener.getConnectionProperty( property_name ));
181 					}
182 			    });
183 
184     if ( closed ){
185 
186     	ConnectionAttempt	ca = connection_attempt;
187 
188     	if ( ca != null ){
189 
190     		ca.abandon();
191     	}
192     }
193   }
194 
195   public Transport
detachTransport()196   detachTransport()
197   {
198 	  Transport	t = transport;
199 
200 	  if ( t != null ){
201 
202 		  t.unbindConnection( this );
203 	  }
204 
205 	  transport = new bogusTransport( transport );
206 
207 	  close( "detached transport" );
208 
209 	  return( t );
210   }
211 
close( String reason )212   public void close( String reason ) {
213   	NetworkManager.getSingleton().stopTransferProcessing( this );
214   	closed	= true;
215     if ( connection_attempt != null ){
216     	connection_attempt.abandon();
217     }
218     if ( transport != null ){
219     	transport.close( "Tidy close" + ( reason==null||reason.length()==0?"":(": " + reason )));
220     }
221     incoming_message_queue.destroy();
222    	outgoing_message_queue.destroy();
223     is_connected = false;
224   }
225 
226 
notifyOfException( Throwable error )227   public void notifyOfException( Throwable error ) {
228     if( connection_listener != null ) {
229       connection_listener.exceptionThrown( error );
230     }
231     else {
232       Debug.out( "notifyOfException():: connection_listener == null for exception: " +error.getMessage() );
233     }
234   }
235 
236 
getOutgoingMessageQueue()237   public OutgoingMessageQueue getOutgoingMessageQueue() {  return outgoing_message_queue;  }
238 
getIncomingMessageQueue()239   public IncomingMessageQueue getIncomingMessageQueue() {  return incoming_message_queue;  }
240 
241 
242   public void
startMessageProcessing()243   startMessageProcessing()
244   {
245   	NetworkManager.getSingleton().startTransferProcessing( this );
246   }
247 
248 
enableEnhancedMessageProcessing( boolean enable, int partition_id )249   public void enableEnhancedMessageProcessing( boolean enable, int partition_id ) {
250     if( enable ) {
251     	NetworkManager.getSingleton().upgradeTransferProcessing( this, partition_id );
252     }else{
253       NetworkManager.getSingleton().downgradeTransferProcessing( this );
254     }
255   }
256 
257 
getTransport()258   public Transport getTransport() {  return transport;  }
259 
getTransportBase()260   public TransportBase getTransportBase() {  return transport;  }
261 
262   public int
getMssSize()263   getMssSize()
264   {
265 	  if ( transport == null ){
266 
267 		  return( NetworkManager.getMinMssSize());
268 
269 	  }else{
270 
271 		  return( transport.getMssSize());
272 	  }
273   }
274 
275 
276   public Object
setUserData( Object key, Object value )277   setUserData(
278   	Object		key,
279   	Object		value )
280   {
281 	synchronized( this ){
282 		if ( user_data == null ){
283 			user_data = new LightHashMap<Object, Object>();
284 		}
285 
286 		return( user_data.put( key, value ));
287 	}
288   }
289 
290   public Object
getUserData( Object key )291   getUserData(
292   	Object		key )
293   {
294 	  synchronized( this ){
295 			if ( user_data == null ){
296 				return( null );
297 			}
298 
299 			return( user_data.get( key ));
300 	  }
301   }
302 
toString()303   public String toString() {
304     return( transport==null?connection_endpoint.getDescription():transport.getDescription() );
305   }
306 
307 
isConnected()308 	public boolean isConnected() {
309 		return is_connected;
310 	}
311 
312 
isLANLocal()313 	public boolean isLANLocal() {
314 		if ( is_lan_local == AddressUtils.LAN_LOCAL_MAYBE ){
315 
316 			is_lan_local = AddressUtils.isLANLocalAddress( connection_endpoint.getNotionalAddress());
317 		}
318 		return( is_lan_local == AddressUtils.LAN_LOCAL_YES );
319 	}
320 
321 	public String
getString()322 	getString()
323 	{
324 		return( "tran=" + (transport==null?"null":transport.getDescription()+",w_ready=" + transport.isReadyForWrite(null)+",r_ready=" + transport.isReadyForRead( null ))+ ",in=" + incoming_message_queue.getPercentDoneOfCurrentMessage() +
325 				",out=" + (outgoing_message_queue==null?0:outgoing_message_queue.getTotalSize()) + ",owner=" + (connection_listener==null?"null":connection_listener.getDescription()));
326 	}
327 
328 	protected static class
329 	bogusTransport
330 		implements Transport
331 	{
332 		private Transport transport;
333 
334 		protected
bogusTransport( Transport _transport )335 		bogusTransport(
336 			Transport	_transport )
337 		{
338 			transport = _transport;
339 		}
340 
341 		public boolean
isReadyForWrite( EventWaiter waiter )342 		isReadyForWrite(
343 			EventWaiter waiter )
344 		{
345 			return( false );
346 		}
347 
348 		public long
isReadyForRead( EventWaiter waiter )349 		isReadyForRead(
350 			EventWaiter waiter )
351 		{
352 			return( Long.MAX_VALUE );
353 		}
354 
355 		public boolean
isTCP()356 		isTCP()
357 		{
358 			return( transport.isTCP());
359 		}
360 
361 		public boolean
isSOCKS()362 		isSOCKS()
363 		{
364 			return( transport.isSOCKS());
365 		}
366 
367 		public String
getDescription()368 		getDescription()
369 		{
370 			return( transport.getDescription());
371 		}
372 
373 		public int
getMssSize()374 		getMssSize()
375 		{
376 			return( transport.getMssSize());
377 		}
378 
379 		public void
setAlreadyRead( ByteBuffer bytes_already_read )380 		setAlreadyRead(
381 			ByteBuffer bytes_already_read )
382 		{
383 			Debug.out( "Bogus Transport Operation" );
384 		}
385 
386 		public TransportEndpoint
getTransportEndpoint()387 		getTransportEndpoint()
388 		{
389 			return( transport.getTransportEndpoint());
390 		}
391 
392 		public TransportStartpoint
getTransportStartpoint()393 		getTransportStartpoint()
394 		{
395 			return( transport.getTransportStartpoint());
396 		}
397 
398 		public boolean
isEncrypted()399 		isEncrypted()
400 		{
401 			return( transport.isEncrypted());
402 		}
403 
404 		public String
getEncryption( boolean verbose)405 		getEncryption( boolean verbose)
406 		{
407 			return( transport.getEncryption( verbose ));
408 		}
409 
getProtocol()410 		public String getProtocol(){ return transport.getProtocol(); }
411 
412 		public void
setReadyForRead()413 		setReadyForRead()
414 		{
415 			Debug.out( "Bogus Transport Operation" );
416 		}
417 
418 		public long
write( ByteBuffer[] buffers, int array_offset, int length )419 		write(
420 			ByteBuffer[] buffers,
421 			int array_offset,
422 			int length )
423 
424 			throws IOException
425 		{
426 			Debug.out( "Bogus Transport Operation" );
427 
428 			throw( new IOException( "Bogus transport!" ));
429 		}
430 
431 		public long
read( ByteBuffer[] buffers, int array_offset, int length )432 		read(
433 			ByteBuffer[] buffers, int array_offset, int length )
434 
435 			throws IOException
436 		{
437 			Debug.out( "Bogus Transport Operation" );
438 
439 			throw( new IOException( "Bogus transport!" ));
440 		}
441 
442 		public void
setTransportMode( int mode )443 		setTransportMode(
444 			int mode )
445 		{
446 			Debug.out( "Bogus Transport Operation" );
447 		}
448 
449 		public int
getTransportMode()450 		getTransportMode()
451 		{
452 			return( transport.getTransportMode());
453 		}
454 
455 		public void
connectOutbound( ByteBuffer initial_data, ConnectListener listener, int priority )456 		connectOutbound(
457 			ByteBuffer			initial_data,
458 			ConnectListener 	listener,
459 			int					priority )
460 		{
461 			Debug.out( "Bogus Transport Operation" );
462 
463 			listener.connectFailure( new Throwable( "Bogus Transport" ));
464 		}
465 
466 		public void
connectedInbound()467 		connectedInbound()
468 		{
469 			Debug.out( "Bogus Transport Operation" );
470 		}
471 
472 		public void
close( String reason )473 		close(
474 			String reason )
475 		{
476 			// we get here after detaching a transport and then closing the peer connection
477 		}
478 
479 		public void
bindConnection( NetworkConnection connection )480 		bindConnection(
481 			NetworkConnection	connection )
482 		{
483 		}
484 
485 		public void
unbindConnection( NetworkConnection connection )486 		unbindConnection(
487 			NetworkConnection	connection )
488 		{
489 		}
490 
491 		public void
setTrace( boolean on )492 		setTrace(
493 			boolean	on )
494 		{
495 		}
496 	}
497 }
498