1 package org.jgroups.protocols;
2 
3 import org.jgroups.Address;
4 import org.jgroups.PhysicalAddress;
5 import org.jgroups.annotations.Experimental;
6 import org.jgroups.annotations.ManagedAttribute;
7 import org.jgroups.annotations.Property;
8 import org.jgroups.annotations.Unsupported;
9 import org.jgroups.blocks.BasicConnectionTable;
10 import org.jgroups.blocks.ConnectionTableNIO;
11 
12 import java.net.InetAddress;
13 import java.util.Collection;
14 
15 /**
16  * Transport using NIO
17  * @author Scott Marlow
18  * @author Alex Fu
19  * @author Bela Ban
20  */
21 @Experimental @Unsupported
22 public class TCP_NIO extends BasicTCP implements BasicConnectionTable.Receiver
23 {
24 
25     /*
26     * (non-Javadoc)
27     *
28     * @see org.jgroups.protocols.TCP#getConnectionTable(long, long)
29     */
getConnectionTable(long ri, long cet, InetAddress b_addr, InetAddress bc_addr, int s_port, int e_port)30    protected ConnectionTableNIO getConnectionTable(long ri, long cet,
31                                                    InetAddress b_addr, InetAddress bc_addr,
32                                                    int s_port, int e_port) throws Exception {
33        ConnectionTableNIO retval=null;
34        if (ri == 0 && cet == 0) {
35            retval = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, false );
36        }
37        else {
38            if (ri == 0) {
39                ri = 5000;
40                if(log.isWarnEnabled()) log.warn("reaper_interval was 0, set it to " + ri);
41            }
42            if (cet == 0) {
43                cet = 1000 * 60 * 5;
44                if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + cet);
45            }
46            retval = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, ri, cet, false);
47        }
48        retval.setThreadFactory(getThreadFactory());
49        retval.setProcessorMaxThreads(getProcessorMaxThreads());
50        retval.setProcessorQueueSize(getProcessorQueueSize());
51        retval.setProcessorMinThreads(getProcessorMinThreads());
52        retval.setProcessorKeepAliveTime(getProcessorKeepAliveTime());
53        retval.setProcessorThreads(getProcessorThreads());
54        retval.start();
55        return retval;
56    }
57 
printConnections()58     public String printConnections()     {return ct.toString();}
59 
getPhysicalAddress()60     protected PhysicalAddress getPhysicalAddress() {
61         return ct != null? (PhysicalAddress)ct.getLocalAddress() : null;
62     }
63 
send(Address dest, byte[] data, int offset, int length)64    public void send(Address dest, byte[] data, int offset, int length) throws Exception {
65       ct.send(dest, data, offset, length);
66    }
67 
start()68    public void start() throws Exception {
69        ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,bind_port,bind_port+port_range);
70        ct.setUseSendQueues(use_send_queues);
71        // ct.addConnectionListener(this);
72        ct.setReceiveBufferSize(recv_buf_size);
73        ct.setSendBufferSize(send_buf_size);
74        ct.setSocketConnectionTimeout(sock_conn_timeout);
75        ct.setPeerAddressReadTimeout(peer_addr_read_timeout);
76        ct.setTcpNodelay(tcp_nodelay);
77        ct.setLinger(linger);
78        super.start();
79    }
80 
retainAll(Collection<Address> members)81    public void retainAll(Collection<Address> members) {
82       ct.retainAll(members);
83    }
84 
stop()85    public void stop() {
86        ct.stop();
87        super.stop();
88    }
89 
getReaderThreads()90    public int getReaderThreads() { return reader_threads; }
getWriterThreads()91    public int getWriterThreads() { return writer_threads; }
getProcessorThreads()92    public int getProcessorThreads() { return processor_threads; }
getProcessorMinThreads()93    public int getProcessorMinThreads() { return processor_minThreads;}
getProcessorMaxThreads()94    public int getProcessorMaxThreads() { return processor_maxThreads;}
getProcessorQueueSize()95    public int getProcessorQueueSize() { return processor_queueSize; }
getProcessorKeepAliveTime()96    public long getProcessorKeepAliveTime() { return processor_keepAliveTime; }
97    @ManagedAttribute
getOpenConnections()98    public int getOpenConnections()      {return ct.getNumConnections();}
99 
100 
101    @Property
102    private int reader_threads= 3;
103 
104    @Property
105    private int writer_threads= 3;
106 
107    @Property
108    private int processor_threads= 5;                    // PooledExecutor.createThreads()
109    @Property
110    private int processor_minThreads= 5;                 // PooledExecutor.setMinimumPoolSize()
111    @Property
112    private int processor_maxThreads= 5;                 // PooledExecutor.setMaxThreads()
113    @Property
114    private int processor_queueSize=100;                   // Number of queued requests that can be pending waiting
115                                                             // for a background thread to run the request.
116    @Property
117    private long processor_keepAliveTime= Long.MAX_VALUE; // PooledExecutor.setKeepAliveTime( milliseconds);
118                                                             // negative value used to mean (before 2.5 release) to wait forever,
119                                                             // instead set to Long.MAX_VALUE to keep alive forever
120    private ConnectionTableNIO ct;
121 }