1 /* 2 * Created on Jul 29, 2004 3 * Created by Alon Rohter 4 * Copyright (C) Azureus Software, Inc, All Rights Reserved. 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 2 9 * of the License, or (at your option) any later version. 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU General Public License for more details. 14 * You should have received a copy of the GNU General Public License 15 * along with this program; if not, write to the Free Software 16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 17 * 18 */ 19 20 package com.aelitis.azureus.core.networkmanager.impl; 21 22 23 import java.io.IOException; 24 import java.nio.ByteBuffer; 25 import java.util.Map; 26 27 import org.gudy.azureus2.core3.util.AddressUtils; 28 import org.gudy.azureus2.core3.util.Debug; 29 import org.gudy.azureus2.core3.util.LightHashMap; 30 31 import com.aelitis.azureus.core.networkmanager.*; 32 import com.aelitis.azureus.core.peermanager.messaging.MessageStreamDecoder; 33 import com.aelitis.azureus.core.peermanager.messaging.MessageStreamEncoder; 34 35 36 37 /** 38 * 39 */ 40 41 public class 42 NetworkConnectionImpl 43 extends NetworkConnectionHelper 44 implements NetworkConnection 45 { 46 private final ConnectionEndpoint connection_endpoint; 47 private final boolean is_incoming; 48 49 private boolean connect_with_crypto; 50 private boolean allow_fallback; 51 private byte[][] shared_secrets; 52 53 private ConnectionListener connection_listener; 54 private boolean is_connected; 55 private byte is_lan_local = AddressUtils.LAN_LOCAL_MAYBE; 56 57 private final OutgoingMessageQueueImpl outgoing_message_queue; 58 private final IncomingMessageQueueImpl incoming_message_queue; 59 60 private Transport transport; 61 62 private volatile ConnectionAttempt connection_attempt; 63 private volatile boolean closed; 64 65 private Map<Object,Object> user_data; 66 67 /** 68 * Constructor for new OUTbound connection. 69 * The connection is not yet established upon instantiation; use connect() to do so. 70 * @param _remote_address to connect to 71 * @param encoder default message stream encoder to use for the outgoing queue 72 * @param decoder default message stream decoder to use for the incoming queue 73 */ NetworkConnectionImpl( ConnectionEndpoint _target, MessageStreamEncoder encoder, MessageStreamDecoder decoder, boolean _connect_with_crypto, boolean _allow_fallback, byte[][] _shared_secrets )74 public NetworkConnectionImpl( 75 ConnectionEndpoint _target, MessageStreamEncoder encoder, 76 MessageStreamDecoder decoder, boolean _connect_with_crypto, boolean _allow_fallback, 77 byte[][] _shared_secrets ) 78 { 79 connection_endpoint = _target; 80 is_incoming = false; 81 connect_with_crypto = _connect_with_crypto; 82 allow_fallback = _allow_fallback; 83 shared_secrets = _shared_secrets; 84 85 86 is_connected = false; 87 outgoing_message_queue = new OutgoingMessageQueueImpl( encoder ); 88 incoming_message_queue = new IncomingMessageQueueImpl( decoder, this ); 89 } 90 91 92 /** 93 * Constructor for new INbound connection. 94 * The connection is assumed to be already established, by the given already-connected channel. 95 * @param _remote_channel connected by 96 * @param data_already_read bytestream already read during routing 97 * @param encoder default message stream encoder to use for the outgoing queue 98 * @param decoder default message stream decoder to use for the incoming queue 99 */ NetworkConnectionImpl( Transport _transport, MessageStreamEncoder encoder, MessageStreamDecoder decoder )100 public NetworkConnectionImpl( Transport _transport, MessageStreamEncoder encoder, MessageStreamDecoder decoder ) { 101 transport = _transport; 102 connection_endpoint = transport.getTransportEndpoint().getProtocolEndpoint().getConnectionEndpoint(); 103 is_incoming = true; 104 is_connected = true; 105 outgoing_message_queue = new OutgoingMessageQueueImpl( encoder ); 106 outgoing_message_queue.setTransport( transport ); 107 incoming_message_queue = new IncomingMessageQueueImpl( decoder, this ); 108 109 transport.bindConnection( this ); 110 } 111 112 113 public ConnectionEndpoint getEndpoint()114 getEndpoint() 115 { 116 return( connection_endpoint ); 117 } 118 119 public boolean isIncoming()120 isIncoming() 121 { 122 return( is_incoming ); 123 } 124 connect( int priority, ConnectionListener listener )125 public void connect( int priority, ConnectionListener listener ) { 126 connect( null, priority, listener ); 127 } 128 connect( ByteBuffer initial_outbound_data, int priority, ConnectionListener listener )129 public void connect( ByteBuffer initial_outbound_data, int priority, ConnectionListener listener ) { 130 this.connection_listener = listener; 131 132 if( is_connected ){ 133 134 connection_listener.connectStarted( -1 ); 135 136 connection_listener.connectSuccess( initial_outbound_data ); 137 138 return; 139 } 140 141 if ( connection_attempt != null ){ 142 143 Debug.out( "Connection attempt already active" ); 144 145 listener.connectFailure( new Throwable( "Connection attempt already active" )); 146 147 return; 148 } 149 150 connection_attempt = 151 connection_endpoint.connectOutbound( 152 connect_with_crypto, 153 allow_fallback, 154 shared_secrets, 155 initial_outbound_data, 156 priority, 157 new Transport.ConnectListener() { 158 public int connectAttemptStarted( int default_connect_timeout ){ 159 return( connection_listener.connectStarted( default_connect_timeout )); 160 } 161 162 public void connectSuccess( Transport _transport, ByteBuffer remaining_initial_data ) { 163 is_connected = true; 164 transport = _transport; 165 outgoing_message_queue.setTransport( transport ); 166 transport.bindConnection( NetworkConnectionImpl.this ); 167 connection_listener.connectSuccess( remaining_initial_data ); 168 connection_attempt = null; 169 } 170 171 public void connectFailure( Throwable failure_msg ) { 172 is_connected = false; 173 connection_listener.connectFailure( failure_msg ); 174 } 175 176 public Object 177 getConnectionProperty( 178 String property_name) 179 { 180 return( connection_listener.getConnectionProperty( property_name )); 181 } 182 }); 183 184 if ( closed ){ 185 186 ConnectionAttempt ca = connection_attempt; 187 188 if ( ca != null ){ 189 190 ca.abandon(); 191 } 192 } 193 } 194 195 public Transport detachTransport()196 detachTransport() 197 { 198 Transport t = transport; 199 200 if ( t != null ){ 201 202 t.unbindConnection( this ); 203 } 204 205 transport = new bogusTransport( transport ); 206 207 close( "detached transport" ); 208 209 return( t ); 210 } 211 close( String reason )212 public void close( String reason ) { 213 NetworkManager.getSingleton().stopTransferProcessing( this ); 214 closed = true; 215 if ( connection_attempt != null ){ 216 connection_attempt.abandon(); 217 } 218 if ( transport != null ){ 219 transport.close( "Tidy close" + ( reason==null||reason.length()==0?"":(": " + reason ))); 220 } 221 incoming_message_queue.destroy(); 222 outgoing_message_queue.destroy(); 223 is_connected = false; 224 } 225 226 notifyOfException( Throwable error )227 public void notifyOfException( Throwable error ) { 228 if( connection_listener != null ) { 229 connection_listener.exceptionThrown( error ); 230 } 231 else { 232 Debug.out( "notifyOfException():: connection_listener == null for exception: " +error.getMessage() ); 233 } 234 } 235 236 getOutgoingMessageQueue()237 public OutgoingMessageQueue getOutgoingMessageQueue() { return outgoing_message_queue; } 238 getIncomingMessageQueue()239 public IncomingMessageQueue getIncomingMessageQueue() { return incoming_message_queue; } 240 241 242 public void startMessageProcessing()243 startMessageProcessing() 244 { 245 NetworkManager.getSingleton().startTransferProcessing( this ); 246 } 247 248 enableEnhancedMessageProcessing( boolean enable, int partition_id )249 public void enableEnhancedMessageProcessing( boolean enable, int partition_id ) { 250 if( enable ) { 251 NetworkManager.getSingleton().upgradeTransferProcessing( this, partition_id ); 252 }else{ 253 NetworkManager.getSingleton().downgradeTransferProcessing( this ); 254 } 255 } 256 257 getTransport()258 public Transport getTransport() { return transport; } 259 getTransportBase()260 public TransportBase getTransportBase() { return transport; } 261 262 public int getMssSize()263 getMssSize() 264 { 265 if ( transport == null ){ 266 267 return( NetworkManager.getMinMssSize()); 268 269 }else{ 270 271 return( transport.getMssSize()); 272 } 273 } 274 275 276 public Object setUserData( Object key, Object value )277 setUserData( 278 Object key, 279 Object value ) 280 { 281 synchronized( this ){ 282 if ( user_data == null ){ 283 user_data = new LightHashMap<Object, Object>(); 284 } 285 286 return( user_data.put( key, value )); 287 } 288 } 289 290 public Object getUserData( Object key )291 getUserData( 292 Object key ) 293 { 294 synchronized( this ){ 295 if ( user_data == null ){ 296 return( null ); 297 } 298 299 return( user_data.get( key )); 300 } 301 } 302 toString()303 public String toString() { 304 return( transport==null?connection_endpoint.getDescription():transport.getDescription() ); 305 } 306 307 isConnected()308 public boolean isConnected() { 309 return is_connected; 310 } 311 312 isLANLocal()313 public boolean isLANLocal() { 314 if ( is_lan_local == AddressUtils.LAN_LOCAL_MAYBE ){ 315 316 is_lan_local = AddressUtils.isLANLocalAddress( connection_endpoint.getNotionalAddress()); 317 } 318 return( is_lan_local == AddressUtils.LAN_LOCAL_YES ); 319 } 320 321 public String getString()322 getString() 323 { 324 return( "tran=" + (transport==null?"null":transport.getDescription()+",w_ready=" + transport.isReadyForWrite(null)+",r_ready=" + transport.isReadyForRead( null ))+ ",in=" + incoming_message_queue.getPercentDoneOfCurrentMessage() + 325 ",out=" + (outgoing_message_queue==null?0:outgoing_message_queue.getTotalSize()) + ",owner=" + (connection_listener==null?"null":connection_listener.getDescription())); 326 } 327 328 protected static class 329 bogusTransport 330 implements Transport 331 { 332 private Transport transport; 333 334 protected bogusTransport( Transport _transport )335 bogusTransport( 336 Transport _transport ) 337 { 338 transport = _transport; 339 } 340 341 public boolean isReadyForWrite( EventWaiter waiter )342 isReadyForWrite( 343 EventWaiter waiter ) 344 { 345 return( false ); 346 } 347 348 public long isReadyForRead( EventWaiter waiter )349 isReadyForRead( 350 EventWaiter waiter ) 351 { 352 return( Long.MAX_VALUE ); 353 } 354 355 public boolean isTCP()356 isTCP() 357 { 358 return( transport.isTCP()); 359 } 360 361 public boolean isSOCKS()362 isSOCKS() 363 { 364 return( transport.isSOCKS()); 365 } 366 367 public String getDescription()368 getDescription() 369 { 370 return( transport.getDescription()); 371 } 372 373 public int getMssSize()374 getMssSize() 375 { 376 return( transport.getMssSize()); 377 } 378 379 public void setAlreadyRead( ByteBuffer bytes_already_read )380 setAlreadyRead( 381 ByteBuffer bytes_already_read ) 382 { 383 Debug.out( "Bogus Transport Operation" ); 384 } 385 386 public TransportEndpoint getTransportEndpoint()387 getTransportEndpoint() 388 { 389 return( transport.getTransportEndpoint()); 390 } 391 392 public TransportStartpoint getTransportStartpoint()393 getTransportStartpoint() 394 { 395 return( transport.getTransportStartpoint()); 396 } 397 398 public boolean isEncrypted()399 isEncrypted() 400 { 401 return( transport.isEncrypted()); 402 } 403 404 public String getEncryption( boolean verbose)405 getEncryption( boolean verbose) 406 { 407 return( transport.getEncryption( verbose )); 408 } 409 getProtocol()410 public String getProtocol(){ return transport.getProtocol(); } 411 412 public void setReadyForRead()413 setReadyForRead() 414 { 415 Debug.out( "Bogus Transport Operation" ); 416 } 417 418 public long write( ByteBuffer[] buffers, int array_offset, int length )419 write( 420 ByteBuffer[] buffers, 421 int array_offset, 422 int length ) 423 424 throws IOException 425 { 426 Debug.out( "Bogus Transport Operation" ); 427 428 throw( new IOException( "Bogus transport!" )); 429 } 430 431 public long read( ByteBuffer[] buffers, int array_offset, int length )432 read( 433 ByteBuffer[] buffers, int array_offset, int length ) 434 435 throws IOException 436 { 437 Debug.out( "Bogus Transport Operation" ); 438 439 throw( new IOException( "Bogus transport!" )); 440 } 441 442 public void setTransportMode( int mode )443 setTransportMode( 444 int mode ) 445 { 446 Debug.out( "Bogus Transport Operation" ); 447 } 448 449 public int getTransportMode()450 getTransportMode() 451 { 452 return( transport.getTransportMode()); 453 } 454 455 public void connectOutbound( ByteBuffer initial_data, ConnectListener listener, int priority )456 connectOutbound( 457 ByteBuffer initial_data, 458 ConnectListener listener, 459 int priority ) 460 { 461 Debug.out( "Bogus Transport Operation" ); 462 463 listener.connectFailure( new Throwable( "Bogus Transport" )); 464 } 465 466 public void connectedInbound()467 connectedInbound() 468 { 469 Debug.out( "Bogus Transport Operation" ); 470 } 471 472 public void close( String reason )473 close( 474 String reason ) 475 { 476 // we get here after detaching a transport and then closing the peer connection 477 } 478 479 public void bindConnection( NetworkConnection connection )480 bindConnection( 481 NetworkConnection connection ) 482 { 483 } 484 485 public void unbindConnection( NetworkConnection connection )486 unbindConnection( 487 NetworkConnection connection ) 488 { 489 } 490 491 public void setTrace( boolean on )492 setTrace( 493 boolean on ) 494 { 495 } 496 } 497 } 498