package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.PhysicalAddress;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.annotations.Unsupported;
import org.jgroups.blocks.BasicConnectionTable;
import org.jgroups.blocks.ConnectionTableNIO;

import java.net.InetAddress;
import java.util.Collection;

/**
 * Transport using NIO. All connection handling is delegated to a
 * {@link ConnectionTableNIO} which is created and configured in {@link #start()}.
 * <p>
 * Methods that may be invoked before {@link #start()} has assigned the connection
 * table (management attributes, {@link #stop()}, {@link #retainAll(Collection)})
 * tolerate a missing table instead of throwing a {@link NullPointerException}.
 *
 * @author Scott Marlow
 * @author Alex Fu
 * @author Bela Ban
 */
@Experimental @Unsupported
public class TCP_NIO extends BasicTCP implements BasicConnectionTable.Receiver
{

    /**
     * Creates, configures and starts the NIO connection table used by this transport.
     * <p>
     * If both {@code ri} and {@code cet} are 0, the table is created through the
     * constructor that takes no reaper settings. Otherwise any of the two values that
     * is 0 is replaced by a default (5000 for {@code ri}, 5 minutes in milliseconds
     * for {@code cet}), a warning is logged, and both values are passed to the
     * constructor.
     *
     * @param ri      reaper interval; 0 is replaced by 5000 unless {@code cet} is 0 too
     * @param cet     connection expiry time; 0 is replaced by 300000 unless {@code ri} is 0 too
     * @param b_addr  first address argument handed to the connection table
     *                ({@code bind_addr} when called from {@link #start()})
     * @param bc_addr second address argument handed to the connection table
     *                ({@code external_addr} when called from {@link #start()})
     * @param s_port  start of the port range
     * @param e_port  end of the port range
     * @return the started connection table
     * @throws Exception if creating or starting the connection table fails
     */
    protected ConnectionTableNIO getConnectionTable(long ri, long cet,
                                                    InetAddress b_addr, InetAddress bc_addr,
                                                    int s_port, int e_port) throws Exception {
        ConnectionTableNIO retval;
        if(ri == 0 && cet == 0) {
            retval=new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, false);
        }
        else {
            if(ri == 0) {
                ri=5000;
                if(log.isWarnEnabled()) {
                    log.warn("reaper_interval was 0, set it to " + ri);
                }
            }
            if(cet == 0) {
                cet=1000 * 60 * 5;
                if(log.isWarnEnabled()) {
                    log.warn("conn_expire_time was 0, set it to " + cet);
                }
            }
            retval=new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, ri, cet, false);
        }

        // Processor settings have to be applied before the table is started.
        // NOTE(review): reader_threads and writer_threads are exposed through getters
        // but are not forwarded to the connection table anywhere in this class -
        // confirm whether the table picks them up some other way.
        retval.setThreadFactory(getThreadFactory());
        retval.setProcessorMaxThreads(getProcessorMaxThreads());
        retval.setProcessorQueueSize(getProcessorQueueSize());
        retval.setProcessorMinThreads(getProcessorMinThreads());
        retval.setProcessorKeepAliveTime(getProcessorKeepAliveTime());
        retval.setProcessorThreads(getProcessorThreads());
        retval.start();
        return retval;
    }

    /**
     * Returns the string representation of the connection table.
     *
     * @return {@code ct.toString()}, or {@code "n/a"} if no connection table has been
     *         created yet
     */
    public String printConnections() {
        return ct != null? ct.toString() : "n/a";
    }

    /**
     * Returns the local address of the connection table.
     *
     * @return the local address cast to {@link PhysicalAddress}, or {@code null} if no
     *         connection table has been created yet
     */
    protected PhysicalAddress getPhysicalAddress() {
        return ct != null? (PhysicalAddress)ct.getLocalAddress() : null;
    }

    /**
     * Sends {@code length} bytes of {@code data}, starting at {@code offset}, to
     * {@code dest} through the connection table.
     *
     * @throws Exception if the connection table fails to send the data
     */
    public void send(Address dest, byte[] data, int offset, int length) throws Exception {
        ct.send(dest, data, offset, length);
    }

    /**
     * Creates the connection table from the configured properties, applies the socket
     * related settings to it and then starts the superclass.
     *
     * @throws Exception if the connection table cannot be created or the superclass
     *                   fails to start
     */
    public void start() throws Exception {
        ct=getConnectionTable(reaper_interval, conn_expire_time, bind_addr, external_addr,
                              bind_port, bind_port + port_range);
        ct.setUseSendQueues(use_send_queues);
        // ct.addConnectionListener(this);
        ct.setReceiveBufferSize(recv_buf_size);
        ct.setSendBufferSize(send_buf_size);
        ct.setSocketConnectionTimeout(sock_conn_timeout);
        ct.setPeerAddressReadTimeout(peer_addr_read_timeout);
        ct.setTcpNodelay(tcp_nodelay);
        ct.setLinger(linger);
        super.start();
    }

    /**
     * Forwards {@code members} to the connection table's {@code retainAll}.
     * Does nothing if no connection table has been created yet.
     */
    public void retainAll(Collection<Address> members) {
        if(ct != null) {
            ct.retainAll(members);
        }
    }

    /**
     * Stops the connection table (if one was created) and then the superclass.
     * The superclass is stopped even if stopping the connection table throws.
     */
    public void stop() {
        try {
            if(ct != null) {
                ct.stop();
            }
        }
        finally {
            super.stop();
        }
    }

    public int getReaderThreads() { return reader_threads; }

    public int getWriterThreads() { return writer_threads; }

    public int getProcessorThreads() { return processor_threads; }

    public int getProcessorMinThreads() { return processor_minThreads; }

    public int getProcessorMaxThreads() { return processor_maxThreads; }

    public int getProcessorQueueSize() { return processor_queueSize; }

    public long getProcessorKeepAliveTime() { return processor_keepAliveTime; }

    /**
     * Returns the number of connections reported by the connection table.
     *
     * @return {@code ct.getNumConnections()}, or 0 if no connection table has been
     *         created yet
     */
    @ManagedAttribute
    public int getOpenConnections() {
        return ct != null? ct.getNumConnections() : 0;
    }


    @Property
    private int reader_threads=3;

    @Property
    private int writer_threads=3;

    @Property
    private int processor_threads=5;                    // PooledExecutor.createThreads()

    @Property
    private int processor_minThreads=5;                 // PooledExecutor.setMinimumPoolSize()

    @Property
    private int processor_maxThreads=5;                 // PooledExecutor.setMaxThreads()

    @Property
    private int processor_queueSize=100;                // Number of queued requests that can be pending waiting
                                                        // for a background thread to run the request.

    @Property
    private long processor_keepAliveTime=Long.MAX_VALUE; // PooledExecutor.setKeepAliveTime(milliseconds);
                                                         // A negative value used to mean "wait forever" before the 2.5 release;
                                                         // Long.MAX_VALUE is used instead to keep threads alive forever.

    /** Connection table backing this transport; assigned in {@link #start()}, null before that. */
    private ConnectionTableNIO ct;
}