1 package org.jgroups.protocols; 2 3 import org.jgroups.Event; 4 import org.jgroups.Global; 5 import org.jgroups.Message; 6 import org.jgroups.annotations.LocalAddress; 7 import org.jgroups.annotations.Property; 8 import org.jgroups.annotations.DeprecatedProperty; 9 import org.jgroups.conf.PropertyConverters; 10 import org.jgroups.util.*; 11 12 import java.io.*; 13 import java.net.*; 14 import java.util.*; 15 16 /** 17 * Uses its own IP multicast socket to send and receive discovery requests/responses. Can be used in 18 * conjuntion with a non-UDP transport, e.g. TCP.<p> 19 * The discovery is <em>assymetric</em>: discovery requests are broadcast via the multicast socket, and 20 * received via the multicast socket by everyone in the group. However, the discovery responses are sent 21 * back via the regular transport (e.g. TCP) to the sender (discovery request contained sender's regular address, 22 * e.g. 192.168.0.2:7800). 23 * @author Bela Ban 24 */ 25 @DeprecatedProperty(names="bind_to_all_interfaces") 26 public class MPING extends PING implements Runnable { 27 28 private static final boolean can_bind_to_mcast_addr; // are we running on Linux ? 29 30 31 static { 32 can_bind_to_mcast_addr=Util.checkForFreeBSD() || Util.checkForLinux() || Util.checkForSolaris() || Util.checkForHp(); 33 } 34 35 36 /* ----------------------------------------- Properties -------------------------------------------------- */ 37 38 @LocalAddress 39 @Property(description="Bind address for multicast socket. " + 40 "The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK", 41 systemProperty={Global.BIND_ADDR, Global.BIND_ADDR_OLD}, 42 defaultValueIPv4=Global.NON_LOOPBACK_ADDRESS, defaultValueIPv6=Global.NON_LOOPBACK_ADDRESS) 43 InetAddress bind_addr=null; 44 45 @Property(name="bind_interface", converter=PropertyConverters.BindInterface.class, 46 description="The interface (NIC) which should be used by this transport",dependsUpon="bind_addr") 47 protected String bind_interface_str=null; 48 49 50 @Property(description="Time to live for discovery packets. Default is 8", systemProperty=Global.MPING_IP_TTL) 51 int ip_ttl=8; 52 53 @Property(name="mcast_addr", systemProperty=Global.MPING_MCAST_ADDR, 54 defaultValueIPv4="230.5.6.7", defaultValueIPv6="ff0e::5:6:7") 55 InetAddress mcast_addr=null; 56 57 58 @Property(description="Multicast port for discovery packets. Default is 7555", systemProperty=Global.MPING_MCAST_PORT) 59 int mcast_port=7555; 60 61 @Property(description="If true, the transport should use all available interfaces to receive multicast messages. Default is false") 62 boolean receive_on_all_interfaces=false; 63 64 /** 65 * List<NetworkInterface> of interfaces to receive multicasts on. The 66 * multicast receive socket will listen on all of these interfaces. This is 67 * a comma-separated list of IP addresses or interface names. E.g. 68 * "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to 69 * an interface once. If this property is set, it override 70 * receive_on_all_interfaces. 71 */ 72 @Property(converter=PropertyConverters.NetworkInterfaceList.class, description="List of interfaces to receive multicasts on") 73 List<NetworkInterface> receive_interfaces=null; 74 75 /** 76 * If true, the transport should use all available interfaces to send 77 * multicast messages. This means the same multicast message is sent N 78 * times, so use with care 79 */ 80 @Property(description="Whether send messages are sent on all interfaces. Default is false") 81 boolean send_on_all_interfaces=false; 82 83 /** 84 * List<NetworkInterface> of interfaces to send multicasts on. The 85 * multicast send socket will send the same multicast message on all of 86 * these interfaces. This is a comma-separated list of IP addresses or 87 * interface names. E.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are 88 * discarded. If this property is set, it override send_on_all_interfaces. 89 */ 90 @Property(converter=PropertyConverters.NetworkInterfaceList.class, description="List of interfaces to send multicasts on") 91 List<NetworkInterface> send_interfaces=null; 92 93 94 95 /* --------------------------------------------- Fields ------------------------------------------------------ */ 96 97 98 private MulticastSocket mcast_sock=null; 99 100 /** 101 * If we have multiple mcast send sockets, e.g. send_interfaces or 102 * send_on_all_interfaces enabled 103 */ 104 private MulticastSocket[] mcast_send_sockets=null; 105 106 private volatile Thread receiver=null; 107 108 109 MPING()110 public MPING() { 111 } 112 getBindAddr()113 public InetAddress getBindAddr() { 114 return bind_addr; 115 } 116 setBindAddr(InetAddress bind_addr)117 public void setBindAddr(InetAddress bind_addr) { 118 this.bind_addr=bind_addr; 119 } 120 getReceiveInterfaces()121 public List<NetworkInterface> getReceiveInterfaces() { 122 return receive_interfaces; 123 } 124 getSendInterfaces()125 public List<NetworkInterface> getSendInterfaces() { 126 return send_interfaces; 127 } 128 isReceiveOnAllInterfaces()129 public boolean isReceiveOnAllInterfaces() { 130 return receive_on_all_interfaces; 131 } 132 isSendOnAllInterfaces()133 public boolean isSendOnAllInterfaces() { 134 return send_on_all_interfaces; 135 } 136 getTTL()137 public int getTTL() { 138 return ip_ttl; 139 } 140 setTTL(int ip_ttl)141 public void setTTL(int ip_ttl) { 142 this.ip_ttl=ip_ttl; 143 } 144 getMcastAddr()145 public InetAddress getMcastAddr() { 146 return mcast_addr; 147 } 148 setMcastAddr(InetAddress mcast_addr)149 public void setMcastAddr(InetAddress mcast_addr) { 150 this.mcast_addr=mcast_addr; 151 } 152 setMulticastAddress(String addr)153 public void setMulticastAddress(String addr) throws UnknownHostException { 154 mcast_addr=InetAddress.getByName(addr); 155 } 156 getMcastPort()157 public int getMcastPort() { 158 return mcast_port; 159 } 160 setMcastPort(int mcast_port)161 public void setMcastPort(int mcast_port) { 162 this.mcast_port=mcast_port; 163 } 164 165 166 167 @SuppressWarnings("unchecked") up(Event evt)168 public Object up(Event evt) { 169 if(evt.getType() == Event.CONFIG) { 170 if(bind_addr == null) { 171 Map<String,Object> config=(Map<String,Object>)evt.getArg(); 172 bind_addr=(InetAddress)config.get("bind_addr"); 173 } 174 return up_prot.up(evt); 175 } 176 return super.up(evt); 177 } 178 179 init()180 public void init() throws Exception { 181 super.init(); 182 if(log.isDebugEnabled()) 183 log.debug("bind_addr=" + bind_addr + " mcast_addr=" + mcast_addr + ", mcast_port=" + mcast_port); 184 } 185 start()186 public void start() throws Exception { 187 if(can_bind_to_mcast_addr) // https://jira.jboss.org/jira/browse/JGRP-836 - prevent cross talking on Linux 188 mcast_sock=Util.createMulticastSocket(getSocketFactory(), Global.MPING_MCAST_SOCK, mcast_addr, mcast_port, log); 189 else 190 mcast_sock=getSocketFactory().createMulticastSocket(Global.MPING_MCAST_SOCK, mcast_port); 191 192 mcast_sock.setTimeToLive(ip_ttl); 193 194 if(receive_on_all_interfaces || (receive_interfaces != null && !receive_interfaces.isEmpty())) { 195 List<NetworkInterface> interfaces; 196 if(receive_interfaces != null) 197 interfaces=receive_interfaces; 198 else 199 interfaces=Util.getAllAvailableInterfaces(); 200 bindToInterfaces(interfaces, mcast_sock, mcast_addr); 201 } 202 else { 203 if(bind_addr != null) 204 mcast_sock.setInterface(bind_addr); 205 mcast_sock.joinGroup(mcast_addr); 206 } 207 208 209 // 3b. Create mcast sender socket 210 if(send_on_all_interfaces || (send_interfaces != null && !send_interfaces.isEmpty())) { 211 List interfaces; 212 NetworkInterface intf; 213 if(send_interfaces != null) 214 interfaces=send_interfaces; 215 else 216 interfaces=Util.getAllAvailableInterfaces(); 217 mcast_send_sockets=new MulticastSocket[interfaces.size()]; 218 int index=0; 219 for(Iterator it=interfaces.iterator(); it.hasNext();) { 220 intf=(NetworkInterface)it.next(); 221 mcast_send_sockets[index]=new MulticastSocket(); 222 mcast_send_sockets[index].setNetworkInterface(intf); 223 mcast_send_sockets[index].setTimeToLive(ip_ttl); 224 index++; 225 } 226 } 227 228 229 startReceiver(); 230 super.start(); 231 } 232 233 bindToInterfaces(List<NetworkInterface> interfaces, MulticastSocket s, InetAddress mcast_addr)234 private void bindToInterfaces(List<NetworkInterface> interfaces, MulticastSocket s, InetAddress mcast_addr) throws IOException { 235 SocketAddress tmp_mcast_addr=new InetSocketAddress(mcast_addr, mcast_port); 236 for(Iterator it=interfaces.iterator(); it.hasNext();) { 237 NetworkInterface i=(NetworkInterface)it.next(); 238 for(Enumeration en2=i.getInetAddresses(); en2.hasMoreElements();) { 239 InetAddress addr=(InetAddress)en2.nextElement(); 240 s.joinGroup(tmp_mcast_addr, i); 241 if(log.isTraceEnabled()) 242 log.trace("joined " + tmp_mcast_addr + " on " + i.getName() + " (" + addr + ")"); 243 break; 244 } 245 } 246 } 247 248 249 startReceiver()250 private void startReceiver() { 251 if(receiver == null || !receiver.isAlive()) { 252 receiver=new Thread(Util.getGlobalThreadGroup(), this, "ReceiverThread"); 253 receiver.setDaemon(true); 254 receiver.start(); 255 if(log.isTraceEnabled()) 256 log.trace("receiver thread started"); 257 } 258 } 259 stop()260 public void stop() { 261 Util.close(mcast_sock); 262 mcast_sock=null; 263 receiver=null; 264 super.stop(); 265 } 266 sendMcastDiscoveryRequest(Message msg)267 void sendMcastDiscoveryRequest(Message msg) { 268 Buffer buf; 269 DatagramPacket packet; 270 DataOutputStream out=null; 271 272 try { 273 if(msg.getSrc() == null) 274 msg.setSrc(local_addr); 275 ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(128); 276 out=new DataOutputStream(out_stream); 277 msg.writeTo(out); 278 out.flush(); // flushes contents to out_stream 279 buf=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size()); 280 packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), mcast_addr, mcast_port); 281 discovery_reception.reset(); 282 if(mcast_send_sockets != null) { 283 MulticastSocket s; 284 for(int i=0; i < mcast_send_sockets.length; i++) { 285 s=mcast_send_sockets[i]; 286 try { 287 s.send(packet); 288 } 289 catch(Exception e) { 290 log.error("failed sending packet on socket " + s); 291 } 292 } 293 } 294 else { // DEFAULT path 295 if(mcast_sock != null) 296 mcast_sock.send(packet); 297 } 298 waitForDiscoveryRequestReception(); 299 } 300 catch(IOException ex) { 301 log.error("failed sending discovery request", ex); 302 } 303 finally { 304 Util.close(out); 305 } 306 } 307 308 309 run()310 public void run() { 311 final byte[] receive_buf=new byte[65535]; 312 DatagramPacket packet=new DatagramPacket(receive_buf, receive_buf.length); 313 byte[] data; 314 ByteArrayInputStream inp_stream; 315 DataInputStream inp=null; 316 Message msg; 317 318 while(mcast_sock != null && receiver != null && Thread.currentThread().equals(receiver)) { 319 packet.setData(receive_buf, 0, receive_buf.length); 320 try { 321 mcast_sock.receive(packet); 322 data=packet.getData(); 323 inp_stream=new ExposedByteArrayInputStream(data, 0, data.length); 324 inp=new DataInputStream(inp_stream); 325 msg=new Message(); 326 msg.readFrom(inp); 327 up(new Event(Event.MSG, msg)); 328 } 329 catch(SocketException socketEx) { 330 break; 331 } 332 catch(Throwable ex) { 333 log.error("failed receiving packet (from " + packet.getSocketAddress() + ")", ex); 334 } 335 finally { 336 Util.close(inp); 337 } 338 } 339 if(log.isTraceEnabled()) 340 log.trace("receiver thread terminated"); 341 } 342 343 344 } 345