1 /* 2 * Created on 22 Jun 2006 3 * Created by Paul Gardner 4 * Copyright (C) Azureus Software, Inc, All Rights Reserved. 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 2 9 * of the License, or (at your option) any later version. 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU General Public License for more details. 14 * You should have received a copy of the GNU General Public License 15 * along with this program; if not, write to the Free Software 16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 17 * 18 */ 19 20 package com.aelitis.azureus.core.networkmanager.impl.udp; 21 22 import java.util.*; 23 import java.io.IOException; 24 import java.net.InetSocketAddress; 25 import java.net.UnknownHostException; 26 import java.nio.ByteBuffer; 27 28 import org.gudy.azureus2.core3.config.COConfigurationManager; 29 import org.gudy.azureus2.core3.config.ParameterListener; 30 import org.gudy.azureus2.core3.logging.LogEvent; 31 import org.gudy.azureus2.core3.logging.LogIDs; 32 import org.gudy.azureus2.core3.logging.Logger; 33 import org.gudy.azureus2.core3.util.AEThread2; 34 import org.gudy.azureus2.core3.util.Debug; 35 import org.gudy.azureus2.core3.util.SystemTime; 36 37 import com.aelitis.azureus.core.networkmanager.ConnectionEndpoint; 38 import com.aelitis.azureus.core.networkmanager.ProtocolEndpoint; 39 import com.aelitis.azureus.core.networkmanager.ProtocolEndpointFactory; 40 import com.aelitis.azureus.core.networkmanager.Transport.ConnectListener; 41 import com.aelitis.azureus.core.networkmanager.impl.IncomingConnectionManager; 42 import com.aelitis.azureus.core.networkmanager.impl.ProtocolDecoder; 43 import com.aelitis.azureus.core.networkmanager.impl.TransportCryptoManager; 44 import com.aelitis.azureus.core.networkmanager.impl.TransportHelperFilter; 45 import com.aelitis.azureus.core.util.bloom.BloomFilter; 46 import com.aelitis.azureus.core.util.bloom.BloomFilterFactory; 47 48 public class 49 UDPConnectionManager 50 implements NetworkGlueListener 51 { 52 private static final LogIDs LOGID = LogIDs.NET; 53 private static final boolean LOOPBACK = false; 54 private static final boolean FORCE_LOG = false; 55 56 private static boolean LOG = false; 57 private static int max_outbound_connections; 58 59 static{ COConfigurationManager.addAndFireParameterListeners( new String[]{ R, R, }, new ParameterListener() { public void parameterChanged( String name ) { LOG = FORCE_LOG || COConfigurationManager.getBooleanParameter( R ); max_outbound_connections = COConfigurationManager.getIntParameter( R, 2048 ); } })60 COConfigurationManager.addAndFireParameterListeners( 61 new String[]{ 62 "Logging Enable UDP Transport", 63 "network.udp.max.connections.outstanding", 64 }, 65 new ParameterListener() 66 { 67 public void 68 parameterChanged( 69 String name ) 70 { 71 LOG = FORCE_LOG || COConfigurationManager.getBooleanParameter( "Logging Enable UDP Transport" ); 72 73 max_outbound_connections = COConfigurationManager.getIntParameter( "network.udp.max.connections.outstanding", 2048 ); 74 } 75 }); 76 } 77 78 public static final int TIMER_TICK_MILLIS = 25; 79 public static final int THREAD_LINGER_ON_IDLE_PERIOD = 30*1000; 80 public static final int DEAD_KEY_RETENTION_PERIOD = 30*1000; 81 82 public static final int STATS_TIME = 60*1000; 83 public static final int STATS_TICKS = STATS_TIME / TIMER_TICK_MILLIS; 84 85 private final Map connection_sets = new HashMap(); 86 private final Map recently_dead_keys = new HashMap(); 87 88 private int next_connection_id; 89 90 91 private IncomingConnectionManager incoming_manager = IncomingConnectionManager.getSingleton(); 92 93 private NetworkGlue network_glue; 94 95 private UDPSelector selector; 96 private ProtocolTimer protocol_timer; 97 private long idle_start; 98 99 private static final int BLOOM_RECREATE = 30*1000; 100 private static final int BLOOM_INCREASE = 1000; 101 private BloomFilter incoming_bloom = BloomFilterFactory.createAddRemove4Bit(BLOOM_INCREASE); 102 private long incoming_bloom_create_time = SystemTime.getCurrentTime(); 103 private long last_incoming; 104 105 106 private int rate_limit_discard_packets; 107 private int rate_limit_discard_bytes; 108 private int setup_discard_packets; 109 private int setup_discard_bytes; 110 111 private volatile int outbound_connection_count; 112 private boolean max_conn_exceeded_logged; 113 114 protected UDPConnectionManager()115 UDPConnectionManager() 116 { 117 if ( LOOPBACK ){ 118 119 network_glue = new NetworkGlueLoopBack( this ); 120 121 }else{ 122 123 network_glue = new NetworkGlueUDP( this ); 124 } 125 } 126 127 public void connectOutbound( final UDPTransport udp_transport, final InetSocketAddress address, byte[][] shared_secrets, ByteBuffer initial_data, final ConnectListener listener )128 connectOutbound( 129 final UDPTransport udp_transport, 130 final InetSocketAddress address, 131 byte[][] shared_secrets, 132 ByteBuffer initial_data, 133 final ConnectListener listener ) 134 { 135 UDPTransportHelper helper = null; 136 137 try{ 138 if ( address.isUnresolved()){ 139 140 listener.connectFailure( new UnknownHostException( address.getHostName())); 141 142 return; 143 } 144 145 int time = listener.connectAttemptStarted( -1 ); 146 147 if ( time != -1 ){ 148 149 Debug.out( "UDP connect time override not supported" ); 150 } 151 152 helper = new UDPTransportHelper( this, address, udp_transport ); 153 154 final UDPTransportHelper f_helper = helper; 155 156 synchronized( this ){ 157 158 outbound_connection_count++; 159 160 if ( outbound_connection_count >= max_outbound_connections ){ 161 162 if ( !max_conn_exceeded_logged ){ 163 164 max_conn_exceeded_logged = true; 165 166 Debug.out( "UDPConnectionManager: max outbound connection limit reached (" + max_outbound_connections + ")" ); 167 } 168 } 169 } 170 171 try{ 172 TransportCryptoManager.getSingleton().manageCrypto( 173 helper, 174 shared_secrets, 175 false, 176 initial_data, 177 new TransportCryptoManager.HandshakeListener() 178 { 179 public void 180 handshakeSuccess( 181 ProtocolDecoder decoder, 182 ByteBuffer remaining_initial_data ) 183 { 184 synchronized( UDPConnectionManager.this ){ 185 186 if ( outbound_connection_count > 0 ){ 187 188 outbound_connection_count--; 189 } 190 } 191 192 TransportHelperFilter filter = decoder.getFilter(); 193 194 try{ 195 udp_transport.setFilter( filter ); 196 197 if ( udp_transport.isClosed()){ 198 199 udp_transport.close( "Already closed" ); 200 201 listener.connectFailure( new Exception( "Connection already closed" )); 202 203 }else{ 204 205 if ( Logger.isEnabled()){ 206 207 Logger.log(new LogEvent(LOGID, "Outgoing UDP stream to " + address + " established, type = " + filter.getName(false))); 208 } 209 210 udp_transport.connectedOutbound(); 211 212 listener.connectSuccess( udp_transport, remaining_initial_data ); 213 } 214 }catch( Throwable e ){ 215 216 Debug.printStackTrace(e); 217 218 udp_transport.close( Debug.getNestedExceptionMessageAndStack(e)); 219 220 listener.connectFailure( e ); 221 } 222 } 223 224 public void 225 handshakeFailure( 226 Throwable failure_msg ) 227 { 228 synchronized( UDPConnectionManager.this ){ 229 230 if ( outbound_connection_count > 0 ){ 231 232 outbound_connection_count--; 233 } 234 } 235 236 f_helper.close( Debug.getNestedExceptionMessageAndStack(failure_msg)); 237 238 listener.connectFailure( failure_msg ); 239 } 240 241 public void 242 gotSecret( 243 byte[] session_secret ) 244 { 245 f_helper.getConnection().setSecret( session_secret ); 246 } 247 248 public int 249 getMaximumPlainHeaderLength() 250 { 251 throw( new RuntimeException()); // this is outgoing 252 } 253 254 public int 255 matchPlainHeader( 256 ByteBuffer buffer ) 257 { 258 throw( new RuntimeException()); // this is outgoing 259 } 260 }); 261 262 }catch( Throwable e ){ 263 264 synchronized( this ){ 265 266 if ( outbound_connection_count > 0 ){ 267 268 outbound_connection_count--; 269 } 270 } 271 272 throw( e ); 273 } 274 275 }catch( Throwable e ){ 276 277 Debug.printStackTrace(e); 278 279 if ( helper != null ){ 280 281 helper.close( Debug.getNestedExceptionMessage( e )); 282 } 283 284 listener.connectFailure( e ); 285 } 286 } 287 288 public int getMaxOutboundPermitted()289 getMaxOutboundPermitted() 290 { 291 return( Math.max( max_outbound_connections - outbound_connection_count, 0 )); 292 } 293 294 protected UDPSelector checkThreadCreation()295 checkThreadCreation() 296 { 297 // called while holding the "connections" monitor 298 299 if ( selector == null ){ 300 301 if (Logger.isEnabled()){ 302 Logger.log(new LogEvent(LOGID, "UDPConnectionManager: activating" )); 303 } 304 305 idle_start = SystemTime.getMonotonousTime(); 306 307 selector = new UDPSelector(this ); 308 309 protocol_timer = new ProtocolTimer(); 310 } 311 312 return( selector ); 313 } 314 315 protected void checkThreadDeath( boolean connections_running )316 checkThreadDeath( 317 boolean connections_running ) 318 { 319 // called while holding the "connections" monitor 320 321 if ( connections_running ){ 322 323 idle_start = 0; 324 325 }else{ 326 327 long now = SystemTime.getMonotonousTime(); 328 329 if ( idle_start == 0 ){ 330 331 idle_start = now; 332 333 }else if ( now - idle_start > THREAD_LINGER_ON_IDLE_PERIOD ){ 334 335 if (Logger.isEnabled()){ 336 Logger.log(new LogEvent(LOGID, "UDPConnectionManager: deactivating" )); 337 } 338 339 selector.destroy(); 340 341 selector = null; 342 343 protocol_timer.destroy(); 344 345 protocol_timer = null; 346 } 347 } 348 } 349 350 protected void poll()351 poll() 352 { 353 synchronized( connection_sets ){ 354 355 Iterator it = connection_sets.values().iterator(); 356 357 while( it.hasNext()){ 358 359 ((UDPConnectionSet)it.next()).poll(); 360 } 361 } 362 } 363 364 public void remove( UDPConnectionSet set, UDPConnection connection )365 remove( 366 UDPConnectionSet set, 367 UDPConnection connection ) 368 { 369 synchronized( connection_sets ){ 370 371 if ( set.remove( connection )){ 372 373 String key = set.getKey(); 374 375 if ( set.hasFailed()){ 376 377 if ( connection_sets.remove( key ) != null ){ 378 379 set.removed(); 380 381 recently_dead_keys.put( key, new Long( SystemTime.getCurrentTime())); 382 383 if (Logger.isEnabled()){ 384 385 Logger.log(new LogEvent(LOGID, "Connection set " + key + " failed")); 386 } 387 } 388 } 389 } 390 } 391 } 392 393 public void failed( UDPConnectionSet set )394 failed( 395 UDPConnectionSet set ) 396 { 397 synchronized( connection_sets ){ 398 399 String key = set.getKey(); 400 401 if ( connection_sets.remove( key ) != null ){ 402 403 set.removed(); 404 405 recently_dead_keys.put( key, new Long( SystemTime.getCurrentTime())); 406 407 if (Logger.isEnabled()){ 408 409 Logger.log(new LogEvent(LOGID, "Connection set " + key + " failed")); 410 } 411 } 412 } 413 } 414 415 protected UDPConnection registerOutgoing( UDPTransportHelper helper )416 registerOutgoing( 417 UDPTransportHelper helper ) 418 419 throws IOException 420 { 421 int local_port = UDPNetworkManager.getSingleton().getUDPListeningPortNumber(); 422 423 InetSocketAddress address = helper.getAddress(); 424 425 String key = local_port + ":" + address.getAddress().getHostAddress() + ":" + address.getPort(); 426 427 synchronized( connection_sets ){ 428 429 UDPSelector current_selector = checkThreadCreation(); 430 431 UDPConnectionSet connection_set = (UDPConnectionSet)connection_sets.get( key ); 432 433 if ( connection_set == null ){ 434 435 timeoutDeadKeys(); 436 437 connection_set = new UDPConnectionSet( this, key, current_selector, local_port, address ); 438 439 if (Logger.isEnabled()){ 440 441 Logger.log(new LogEvent(LOGID, "Created new set - " + connection_set.getName() + ", outgoing")); 442 } 443 444 connection_sets.put( key, connection_set ); 445 } 446 447 UDPConnection connection = new UDPConnection( connection_set, allocationConnectionID(), helper ); 448 449 connection_set.add( connection ); 450 451 return( connection ); 452 } 453 } 454 455 public void receive( int local_port, InetSocketAddress remote_address, byte[] data, int data_length )456 receive( 457 int local_port, 458 InetSocketAddress remote_address, 459 byte[] data, 460 int data_length ) 461 { 462 String key = local_port + ":" + remote_address.getAddress().getHostAddress() + ":" + remote_address.getPort(); 463 464 UDPConnectionSet connection_set; 465 466 synchronized( connection_sets ){ 467 468 UDPSelector current_selector = checkThreadCreation(); 469 470 connection_set = (UDPConnectionSet)connection_sets.get( key ); 471 472 if ( connection_set == null ){ 473 474 timeoutDeadKeys(); 475 476 // check that this at least looks like an initial crypto packet 477 478 if ( data_length >= UDPNetworkManager.MIN_INCOMING_INITIAL_PACKET_SIZE && 479 data_length <= UDPNetworkManager.MAX_INCOMING_INITIAL_PACKET_SIZE ){ 480 481 if ( !rateLimitIncoming( remote_address )){ 482 483 rate_limit_discard_packets++; 484 rate_limit_discard_bytes += data_length; 485 486 return; 487 } 488 489 connection_set = new UDPConnectionSet( this, key, current_selector, local_port, remote_address ); 490 491 if (Logger.isEnabled()){ 492 493 Logger.log(new LogEvent(LOGID, "Created new set - " + connection_set.getName() + ", incoming")); 494 } 495 496 connection_sets.put( key, connection_set ); 497 498 }else{ 499 500 if ( recently_dead_keys.get( key ) == null ){ 501 502 // we can get quite a lot of these if things get out of sync 503 504 // Debug.out( "Incoming UDP packet mismatch for connection establishment: " + key ); 505 } 506 507 setup_discard_packets++; 508 setup_discard_bytes += data_length; 509 510 return; 511 } 512 } 513 } 514 515 try{ 516 //System.out.println( "recv:" + ByteFormatter.encodeString( data, 0, data_length>64?64:data_length ) + (data_length>64?"...":"")); 517 518 connection_set.receive( data, data_length ); 519 520 }catch( IOException e ){ 521 522 connection_set.failed( e ); 523 524 }catch( Throwable e ){ 525 526 Debug.printStackTrace( e ); 527 528 connection_set.failed( e ); 529 } 530 } 531 532 protected boolean rateLimitIncoming( InetSocketAddress s_address )533 rateLimitIncoming( 534 InetSocketAddress s_address ) 535 { 536 long now = SystemTime.getCurrentTime(); 537 538 byte[] address = s_address.getAddress().getAddress(); 539 540 long delay; 541 542 synchronized( this ){ 543 544 int hit_count = incoming_bloom.add( address ); 545 546 // allow up to 10% bloom filter utilisation 547 548 if ( incoming_bloom.getSize() / incoming_bloom.getEntryCount() < 10 ){ 549 550 incoming_bloom = BloomFilterFactory.createAddRemove4Bit(incoming_bloom.getSize() + BLOOM_INCREASE ); 551 552 incoming_bloom_create_time = now; 553 554 Logger.log( new LogEvent(LOGID, "UDP connnection bloom: size increased to " + incoming_bloom.getSize())); 555 556 }else if ( now < incoming_bloom_create_time || now - incoming_bloom_create_time > BLOOM_RECREATE ){ 557 558 incoming_bloom = BloomFilterFactory.createAddRemove4Bit(incoming_bloom.getSize()); 559 560 incoming_bloom_create_time = now; 561 } 562 563 if ( hit_count >= 15 ){ 564 565 Logger.log( new LogEvent(LOGID, "UDP incoming: too many recent connection attempts from " + s_address )); 566 567 return( false ); 568 } 569 570 long since_last = now - last_incoming; 571 572 delay = 100 - since_last; 573 574 last_incoming = now; 575 } 576 577 // limit to 10 a second 578 579 if ( delay > 0 && delay < 100 ){ 580 581 try{ 582 Thread.sleep( delay ); 583 584 }catch( Throwable e ){ 585 } 586 } 587 588 return( true ); 589 } 590 591 public int send( int local_port, InetSocketAddress remote_address, byte[] data )592 send( 593 int local_port, 594 InetSocketAddress remote_address, 595 byte[] data ) 596 597 throws IOException 598 { 599 return( network_glue.send( local_port, remote_address, data )); 600 } 601 602 protected void accept( final int local_port, final InetSocketAddress remote_address, final UDPConnection connection )603 accept( 604 final int local_port, 605 final InetSocketAddress remote_address, 606 final UDPConnection connection ) 607 { 608 final UDPTransportHelper helper = new UDPTransportHelper( this, remote_address, connection ); 609 610 try{ 611 connection.setTransport( helper ); 612 613 TransportCryptoManager.getSingleton().manageCrypto( 614 helper, 615 null, 616 true, 617 null, 618 new TransportCryptoManager.HandshakeListener() 619 { 620 public void 621 handshakeSuccess( 622 ProtocolDecoder decoder, 623 ByteBuffer remaining_initial_data ) 624 { 625 TransportHelperFilter filter = decoder.getFilter(); 626 627 ConnectionEndpoint co_ep = new ConnectionEndpoint( remote_address); 628 629 ProtocolEndpointUDP pe_udp = (ProtocolEndpointUDP)ProtocolEndpointFactory.createEndpoint( ProtocolEndpoint.PROTOCOL_UDP, co_ep, remote_address ); 630 631 UDPTransport transport = new UDPTransport( pe_udp, filter ); 632 633 helper.setTransport( transport ); 634 635 incoming_manager.addConnection( local_port, filter, transport ); 636 } 637 638 public void 639 handshakeFailure( 640 Throwable failure_msg ) 641 { 642 if (Logger.isEnabled()){ 643 Logger.log(new LogEvent(LOGID, "incoming crypto handshake failure: " + Debug.getNestedExceptionMessage( failure_msg ))); 644 } 645 646 connection.close( "handshake failure: " + Debug.getNestedExceptionMessage(failure_msg)); 647 } 648 649 public void 650 gotSecret( 651 byte[] session_secret ) 652 { 653 helper.getConnection().setSecret( session_secret ); 654 } 655 656 public int 657 getMaximumPlainHeaderLength() 658 { 659 return( incoming_manager.getMaxMinMatchBufferSize()); 660 } 661 662 public int 663 matchPlainHeader( 664 ByteBuffer buffer ) 665 { 666 Object[] match_data = incoming_manager.checkForMatch( helper, local_port, buffer, true ); 667 668 if ( match_data == null ){ 669 670 return( TransportCryptoManager.HandshakeListener.MATCH_NONE ); 671 672 }else{ 673 674 // no fallback for UDP 675 676 return( TransportCryptoManager.HandshakeListener.MATCH_CRYPTO_NO_AUTO_FALLBACK ); 677 } 678 } 679 }); 680 681 }catch( Throwable e ){ 682 683 Debug.printStackTrace( e ); 684 685 helper.close( Debug.getNestedExceptionMessage(e)); 686 } 687 } 688 689 protected synchronized int allocationConnectionID()690 allocationConnectionID() 691 { 692 int id = next_connection_id++; 693 694 if ( id < 0 ){ 695 696 id = 0; 697 next_connection_id = 1; 698 } 699 700 return( id ); 701 } 702 703 protected void timeoutDeadKeys()704 timeoutDeadKeys() 705 { 706 Iterator it = recently_dead_keys.values().iterator(); 707 708 long now = SystemTime.getCurrentTime(); 709 710 while( it.hasNext()){ 711 712 long dead_time = ((Long)it.next()).longValue(); 713 714 if ( dead_time > now || now - dead_time > DEAD_KEY_RETENTION_PERIOD ){ 715 716 it.remove(); 717 } 718 } 719 } 720 721 protected class 722 ProtocolTimer 723 { 724 private volatile boolean destroyed; 725 726 protected ProtocolTimer()727 ProtocolTimer() 728 { 729 new AEThread2( "UDPConnectionManager:timer", true ) 730 { 731 private int tick_count; 732 733 public void 734 run() 735 { 736 Thread.currentThread().setPriority( Thread.NORM_PRIORITY + 1 ); 737 738 while( !destroyed ){ 739 740 try{ 741 Thread.sleep( TIMER_TICK_MILLIS ); 742 743 }catch( Throwable e ){ 744 745 } 746 747 tick_count++; 748 749 if ( tick_count % STATS_TICKS == 0 ){ 750 751 logStats(); 752 } 753 754 List failed_sets = null; 755 756 synchronized( connection_sets ){ 757 758 int cs_size = connection_sets.size(); 759 760 checkThreadDeath( cs_size > 0 ); 761 762 if ( cs_size > 0 ){ 763 764 Iterator it = connection_sets.values().iterator(); 765 766 while( it.hasNext()){ 767 768 UDPConnectionSet set = (UDPConnectionSet)it.next(); 769 770 try{ 771 set.timerTick(); 772 773 if ( set.idleLimitExceeded()){ 774 775 if (Logger.isEnabled()){ 776 777 Logger.log(new LogEvent(LOGID, "Idle limit exceeded for " + set.getName() + ", removing" )); 778 } 779 780 recently_dead_keys.put( set.getKey(), new Long( SystemTime.getCurrentTime())); 781 782 it.remove(); 783 784 set.removed(); 785 } 786 }catch( Throwable e ){ 787 788 if ( failed_sets == null ){ 789 790 failed_sets = new ArrayList(); 791 } 792 793 failed_sets.add( new Object[]{ set, e }); 794 } 795 } 796 } 797 } 798 799 if ( failed_sets != null ){ 800 801 for (int i=0;i<failed_sets.size();i++){ 802 803 Object[] entry = (Object[])failed_sets.get(i); 804 805 ((UDPConnectionSet)entry[0]).failed((Throwable)entry[1]); 806 } 807 } 808 } 809 810 logStats(); 811 } 812 }.start(); 813 } 814 815 protected void destroy()816 destroy() 817 { 818 destroyed = true; 819 } 820 } 821 822 protected void logStats()823 logStats() 824 { 825 if (Logger.isEnabled()){ 826 827 long[] nw_stats = network_glue.getStats(); 828 829 String str = "UDPConnection stats: sent=" + nw_stats[0] + "/" + nw_stats[1] + ",received=" + nw_stats[2] + "/" + nw_stats[3]; 830 831 str += ", setup discards=" + setup_discard_packets + "/" + setup_discard_bytes; 832 str += ", rate discards=" + rate_limit_discard_packets + "/" + rate_limit_discard_bytes; 833 834 Logger.log(new LogEvent(LOGID, str )); 835 } 836 } 837 838 protected boolean trace()839 trace() 840 { 841 return( LOG ); 842 } 843 844 protected void trace( String str )845 trace( 846 String str ) 847 { 848 if ( LOG ){ 849 850 if ( FORCE_LOG ){ 851 852 System.out.println( str ); 853 } 854 855 if (Logger.isEnabled()){ 856 857 Logger.log(new LogEvent(LOGID, str )); 858 } 859 } 860 } 861 } 862