1 
2 package org.jgroups.protocols;
3 
4 import org.jgroups.Address;
5 import org.jgroups.PhysicalAddress;
6 import org.jgroups.Global;
7 import org.jgroups.annotations.ManagedAttribute;
8 import org.jgroups.annotations.ManagedOperation;
9 import org.jgroups.blocks.TCPConnectionMap;
10 import org.jgroups.util.SocketFactory;
11 
12 import java.net.InetAddress;
13 import java.util.Collection;
14 
15 /**
16  * TCP based protocol. Creates a server socket, which gives us the local address
17  * of this group member. For each accept() on the server socket, a new thread is
18  * created that listens on the socket. For each outgoing message m, if m.dest is
19  * in the outgoing hash table, the associated socket will be reused to send
20  * message, otherwise a new socket is created and put in the hash table. When a
21  * socket connection breaks or a member is removed from the group, the
22  * corresponding items in the incoming and outgoing hash tables will be removed
23  * as well.
24  * <p>
25  *
26  * This functionality is in TCPConnectionMap, which is used by TCP. TCP sends
27  * messages using ct.send() and registers with the connection table to receive
28  * all incoming messages.
29  *
30  * @author Bela Ban
31  */
32 public class TCP extends BasicTCP implements TCPConnectionMap.Receiver {
33 
34     private TCPConnectionMap ct=null;
35 
TCP()36     public TCP() {}
37 
38 
39     @ManagedAttribute
getOpenConnections()40     public int getOpenConnections() {
41         return ct.getNumConnections();
42     }
43 
44     @ManagedOperation
printConnections()45     public String printConnections() {
46         return ct.printConnections();
47     }
48 
setSocketFactory(SocketFactory factory)49     public void setSocketFactory(SocketFactory factory) {
50         super.setSocketFactory(factory);
51         if(ct != null)
52             ct.setSocketFactory(factory);
53     }
54 
send(Address dest, byte[] data, int offset, int length)55     public void send(Address dest, byte[] data, int offset, int length) throws Exception {
56         ct.send(dest, data, offset, length);
57     }
58 
retainAll(Collection<Address> members)59     public void retainAll(Collection<Address> members) {
60         ct.retainAll(members);
61     }
62 
start()63     public void start() throws Exception {
64         ct=createConnectionMap(reaper_interval,
65                               conn_expire_time,
66                               bind_addr,
67                               external_addr,
68                               bind_port,
69                               bind_port+port_range
70                               );
71         ct.setReceiveBufferSize(recv_buf_size);
72         ct.setSendQueueSize(send_queue_size);
73         ct.setUseSendQueues(use_send_queues);
74         ct.setSendBufferSize(send_buf_size);
75         ct.setSocketConnectionTimeout(sock_conn_timeout);
76         ct.setTcpNodelay(tcp_nodelay);
77         ct.setLinger(linger);
78         ct.setSocketFactory(getSocketFactory());
79 
80         // we first start threads in TP (http://jira.jboss.com/jira/browse/JGRP-626)
81         super.start();
82     }
83 
stop()84     public void stop() {
85         if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads");
86         ct.stop(); //not needed, but just in case
87         super.stop();
88     }
89 
90 
handleConnect()91     protected void handleConnect() throws Exception {
92         if(isSingleton()) {
93             if(connect_count == 0) {
94                 ct.start();
95             }
96             super.handleConnect();
97         }
98         else
99             ct.start();
100     }
101 
handleDisconnect()102     protected void handleDisconnect() {
103         if(isSingleton()) {
104             super.handleDisconnect();
105             if(connect_count == 0) {
106                 ct.stop();
107             }
108         }
109         else
110             ct.stop();
111     }
112 
113     /**
114      * @param reaperInterval
115      * @param connExpireTime
116      * @param bindAddress
117      * @param startPort
118      * @throws Exception
119      * @return TCPConnectionMap Subclasses override this method to initialize a different version of ConnectionMap
120      */
createConnectionMap(long reaperInterval, long connExpireTime, InetAddress bindAddress, InetAddress externalAddress, int startPort, int endPort )121     protected TCPConnectionMap createConnectionMap(long reaperInterval,
122                                                    long connExpireTime,
123                                                    InetAddress bindAddress,
124                                                    InetAddress externalAddress,
125                                                    int startPort,
126                                                    int endPort
127     ) throws Exception {
128         TCPConnectionMap cTable;
129         if(reaperInterval == 0 && connExpireTime == 0) {
130             cTable=new TCPConnectionMap(Global.TCP_SRV_SOCK,
131                                         getThreadFactory(),
132                                         getSocketFactory(),
133                                         this,
134                                         bindAddress,
135                                         externalAddress,
136                                         startPort,
137                                         endPort
138             );
139         }
140         else {
141             if(reaperInterval == 0) {
142                 reaperInterval=5000;
143                 if(log.isWarnEnabled())
144                     log.warn("reaper_interval was 0, set it to " + reaperInterval);
145             }
146             if(connExpireTime == 0) {
147                 connExpireTime=1000 * 60 * 5;
148                 if(log.isWarnEnabled())
149                     log.warn("conn_expire_time was 0, set it to " + connExpireTime);
150             }
151             cTable=new TCPConnectionMap(Global.TCP_SRV_SOCK,
152                                         getThreadFactory(),
153                                         getSocketFactory(),
154                                         this,
155                                         bindAddress,
156                                         externalAddress,
157                                         startPort,
158                                         endPort,
159                                         reaperInterval,
160                                         connExpireTime
161             );
162         }
163 
164         return cTable;
165     }
166 
getPhysicalAddress()167     protected PhysicalAddress getPhysicalAddress() {
168         return ct != null? (PhysicalAddress)ct.getLocalAddress() : null;
169     }
170 }
171