package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.PhysicalAddress;
import org.jgroups.Global;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.blocks.TCPConnectionMap;
import org.jgroups.util.SocketFactory;

import java.net.InetAddress;
import java.util.Collection;

/**
 * TCP based protocol. Creates a server socket, which gives us the local address
 * of this group member. For each accept() on the server socket, a new thread is
 * created that listens on the socket. For each outgoing message m, if m.dest is
 * in the outgoing hash table, the associated socket will be reused to send the
 * message, otherwise a new socket is created and put in the hash table. When a
 * socket connection breaks or a member is removed from the group, the
 * corresponding items in the incoming and outgoing hash tables will be removed
 * as well.
 * <p>
 * This functionality is in TCPConnectionMap, which is used by TCP. TCP sends
 * messages using ct.send() and registers with the connection table to receive
 * all incoming messages.
 *
 * @author Bela Ban
 */
public class TCP extends BasicTCP implements TCPConnectionMap.Receiver {

    /** Connection map; {@code null} until {@link #start()} has created it. */
    private TCPConnectionMap ct=null;

    public TCP() {}


    /**
     * Returns the number of connections reported by the connection map.
     *
     * @return the connection count, or 0 if the connection map has not been created yet
     */
    @ManagedAttribute
    public int getOpenConnections() {
        // Management attributes may be read before start(); don't fail with an NPE then
        return ct != null? ct.getNumConnections() : 0;
    }

    /**
     * Returns the connection map's textual dump of its connections.
     *
     * @return the dump, or an empty string if the connection map has not been created yet
     */
    @ManagedOperation
    public String printConnections() {
        return ct != null? ct.printConnections() : "";
    }

    /**
     * Sets the socket factory on the superclass and, if the connection map
     * already exists, on the connection map as well.
     *
     * @param factory the socket factory to use
     */
    public void setSocketFactory(SocketFactory factory) {
        super.setSocketFactory(factory);
        if(ct != null)
            ct.setSocketFactory(factory);
    }

    /**
     * Sends {@code length} bytes of {@code data}, starting at {@code offset},
     * to {@code dest} through the connection map.
     *
     * @throws Exception if the connection map fails to send
     */
    public void send(Address dest, byte[] data, int offset, int length) throws Exception {
        ct.send(dest, data, offset, length);
    }

    /**
     * Forwards the given member collection to the connection map's
     * {@code retainAll}.
     *
     * @param members the members to pass on
     */
    public void retainAll(Collection<Address> members) {
        ct.retainAll(members);
    }

    /**
     * Creates the connection map from the configured reaper interval, expiry
     * time, addresses and port range, applies the socket related settings to
     * it and then starts the superclass. The connection map itself is started
     * later, in {@link #handleConnect()}.
     *
     * @throws Exception if the connection map cannot be created or the superclass fails to start
     */
    public void start() throws Exception {
        ct=createConnectionMap(reaper_interval,
                               conn_expire_time,
                               bind_addr,
                               external_addr,
                               bind_port,
                               bind_port+port_range
        );
        ct.setReceiveBufferSize(recv_buf_size);
        ct.setSendQueueSize(send_queue_size);
        ct.setUseSendQueues(use_send_queues);
        ct.setSendBufferSize(send_buf_size);
        ct.setSocketConnectionTimeout(sock_conn_timeout);
        ct.setTcpNodelay(tcp_nodelay);
        ct.setLinger(linger);
        ct.setSocketFactory(getSocketFactory());

        // we first start threads in TP (http://jira.jboss.com/jira/browse/JGRP-626)
        super.start();
    }

    /**
     * Stops the connection map (if it was created) and then the superclass.
     */
    public void stop() {
        if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads");
        // ct is null if start() was never called or failed before creating the map;
        // super.stop() must still run in that case
        if(ct != null)
            ct.stop(); //not needed, but just in case
        super.stop();
    }


    /**
     * Starts the connection map on connect. In singleton mode the map is only
     * started when {@code connect_count} is 0, and the superclass is notified
     * afterwards; otherwise the map is started directly.
     *
     * @throws Exception if starting the connection map or the superclass handler fails
     */
    protected void handleConnect() throws Exception {
        if(isSingleton()) {
            if(connect_count == 0) {
                ct.start();
            }
            super.handleConnect();
        }
        else
            ct.start();
    }

    /**
     * Stops the connection map on disconnect. In singleton mode the superclass
     * is notified first and the map is only stopped when {@code connect_count}
     * is 0 afterwards; otherwise the map is stopped directly.
     */
    protected void handleDisconnect() {
        if(isSingleton()) {
            super.handleDisconnect();
            if(connect_count == 0) {
                ct.stop();
            }
        }
        else
            ct.stop();
    }

    /**
     * Creates the connection map used by this transport. Subclasses override
     * this method to initialize a different version of the connection map.
     * <p>
     * If both {@code reaperInterval} and {@code connExpireTime} are 0, the map
     * is created without these two values. Otherwise, whichever of the two is
     * 0 is replaced by a default (5000 for the reaper interval, 1000 * 60 * 5
     * for the expiry time), a warning is logged, and both values are passed to
     * the map.
     *
     * @param reaperInterval  the reaper interval passed to the map; 0 together with
     *                        {@code connExpireTime == 0} selects the constructor without these values
     * @param connExpireTime  the connection expiry time passed to the map
     * @param bindAddress     the bind address passed to the map
     * @param externalAddress the external address passed to the map
     * @param startPort       the first port of the range passed to the map
     * @param endPort         the last port of the range passed to the map
     * @return the newly created connection map
     * @throws Exception if the connection map cannot be created
     */
    protected TCPConnectionMap createConnectionMap(long reaperInterval,
                                                   long connExpireTime,
                                                   InetAddress bindAddress,
                                                   InetAddress externalAddress,
                                                   int startPort,
                                                   int endPort
    ) throws Exception {
        TCPConnectionMap cTable;
        if(reaperInterval == 0 && connExpireTime == 0) {
            cTable=new TCPConnectionMap(Global.TCP_SRV_SOCK,
                                        getThreadFactory(),
                                        getSocketFactory(),
                                        this,
                                        bindAddress,
                                        externalAddress,
                                        startPort,
                                        endPort
            );
        }
        else {
            if(reaperInterval == 0) {
                reaperInterval=5000;
                if(log.isWarnEnabled())
                    log.warn("reaper_interval was 0, set it to " + reaperInterval);
            }
            if(connExpireTime == 0) {
                connExpireTime=1000 * 60 * 5;
                if(log.isWarnEnabled())
                    log.warn("conn_expire_time was 0, set it to " + connExpireTime);
            }
            cTable=new TCPConnectionMap(Global.TCP_SRV_SOCK,
                                        getThreadFactory(),
                                        getSocketFactory(),
                                        this,
                                        bindAddress,
                                        externalAddress,
                                        startPort,
                                        endPort,
                                        reaperInterval,
                                        connExpireTime
            );
        }

        return cTable;
    }

    /**
     * Returns the connection map's local address as a physical address.
     *
     * @return the local address, or {@code null} if the connection map has not been created yet
     */
    protected PhysicalAddress getPhysicalAddress() {
        return ct != null? (PhysicalAddress)ct.getLocalAddress() : null;
    }
}