1 package org.jgroups.protocols;
2 
3 import org.jgroups.Event;
4 import org.jgroups.Global;
5 import org.jgroups.Message;
6 import org.jgroups.annotations.LocalAddress;
7 import org.jgroups.annotations.Property;
8 import org.jgroups.annotations.DeprecatedProperty;
9 import org.jgroups.conf.PropertyConverters;
10 import org.jgroups.util.*;
11 
12 import java.io.*;
13 import java.net.*;
14 import java.util.*;
15 
16 /**
17  * Uses its own IP multicast socket to send and receive discovery requests/responses. Can be used in
18  * conjuntion with a non-UDP transport, e.g. TCP.<p>
19  * The discovery is <em>assymetric</em>: discovery requests are broadcast via the multicast socket, and
20  * received via the multicast socket by everyone in the group. However, the discovery responses are sent
21  * back via the regular transport (e.g. TCP) to the sender (discovery request contained sender's regular address,
22  * e.g. 192.168.0.2:7800).
23  * @author Bela Ban
24  */
25 @DeprecatedProperty(names="bind_to_all_interfaces")
26 public class MPING extends PING implements Runnable {
27 
28     private static final boolean can_bind_to_mcast_addr; // are we running on Linux ?
29 
30 
31     static {
32         can_bind_to_mcast_addr=Util.checkForFreeBSD() || Util.checkForLinux() || Util.checkForSolaris() || Util.checkForHp();
33     }
34 
35 
36     /* -----------------------------------------    Properties     -------------------------------------------------- */
37 
38     @LocalAddress
39     @Property(description="Bind address for multicast socket. " +
40             "The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK",
41               systemProperty={Global.BIND_ADDR, Global.BIND_ADDR_OLD},
42               defaultValueIPv4=Global.NON_LOOPBACK_ADDRESS, defaultValueIPv6=Global.NON_LOOPBACK_ADDRESS)
43     InetAddress bind_addr=null;
44 
45     @Property(name="bind_interface", converter=PropertyConverters.BindInterface.class,
46     		description="The interface (NIC) which should be used by this transport",dependsUpon="bind_addr")
47     protected String bind_interface_str=null;
48 
49 
50     @Property(description="Time to live for discovery packets. Default is 8", systemProperty=Global.MPING_IP_TTL)
51     int ip_ttl=8;
52 
53     @Property(name="mcast_addr", systemProperty=Global.MPING_MCAST_ADDR,
54               defaultValueIPv4="230.5.6.7", defaultValueIPv6="ff0e::5:6:7")
55     InetAddress mcast_addr=null;
56 
57 
58     @Property(description="Multicast port for discovery packets. Default is 7555", systemProperty=Global.MPING_MCAST_PORT)
59     int mcast_port=7555;
60 
61     @Property(description="If true, the transport should use all available interfaces to receive multicast messages. Default is false")
62     boolean receive_on_all_interfaces=false;
63 
64     /**
65      * List<NetworkInterface> of interfaces to receive multicasts on. The
66      * multicast receive socket will listen on all of these interfaces. This is
67      * a comma-separated list of IP addresses or interface names. E.g.
68      * "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to
69      * an interface once. If this property is set, it override
70      * receive_on_all_interfaces.
71      */
72     @Property(converter=PropertyConverters.NetworkInterfaceList.class, description="List of interfaces to receive multicasts on")
73     List<NetworkInterface> receive_interfaces=null;
74 
75     /**
76      * If true, the transport should use all available interfaces to send
77      * multicast messages. This means the same multicast message is sent N
78      * times, so use with care
79      */
80     @Property(description="Whether send messages are sent on all interfaces. Default is false")
81     boolean send_on_all_interfaces=false;
82 
83     /**
84      * List<NetworkInterface> of interfaces to send multicasts on. The
85      * multicast send socket will send the same multicast message on all of
86      * these interfaces. This is a comma-separated list of IP addresses or
87      * interface names. E.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are
88      * discarded. If this property is set, it override send_on_all_interfaces.
89      */
90     @Property(converter=PropertyConverters.NetworkInterfaceList.class, description="List of interfaces to send multicasts on")
91     List<NetworkInterface> send_interfaces=null;
92 
93 
94 
95     /* --------------------------------------------- Fields ------------------------------------------------------ */
96 
97 
98     private MulticastSocket mcast_sock=null;
99 
100     /**
101      * If we have multiple mcast send sockets, e.g. send_interfaces or
102      * send_on_all_interfaces enabled
103      */
104     private MulticastSocket[] mcast_send_sockets=null;
105 
106     private volatile Thread receiver=null;
107 
108 
109 
MPING()110     public MPING() {
111     }
112 
getBindAddr()113     public InetAddress getBindAddr() {
114         return bind_addr;
115     }
116 
setBindAddr(InetAddress bind_addr)117     public void setBindAddr(InetAddress bind_addr) {
118         this.bind_addr=bind_addr;
119     }
120 
getReceiveInterfaces()121     public List<NetworkInterface> getReceiveInterfaces() {
122         return receive_interfaces;
123     }
124 
getSendInterfaces()125     public List<NetworkInterface> getSendInterfaces() {
126         return send_interfaces;
127     }
128 
isReceiveOnAllInterfaces()129     public boolean isReceiveOnAllInterfaces() {
130         return receive_on_all_interfaces;
131     }
132 
isSendOnAllInterfaces()133     public boolean isSendOnAllInterfaces() {
134         return send_on_all_interfaces;
135     }
136 
getTTL()137     public int getTTL() {
138         return ip_ttl;
139     }
140 
setTTL(int ip_ttl)141     public void setTTL(int ip_ttl) {
142         this.ip_ttl=ip_ttl;
143     }
144 
getMcastAddr()145     public InetAddress getMcastAddr() {
146         return mcast_addr;
147     }
148 
setMcastAddr(InetAddress mcast_addr)149     public void setMcastAddr(InetAddress mcast_addr) {
150         this.mcast_addr=mcast_addr;
151     }
152 
setMulticastAddress(String addr)153     public void setMulticastAddress(String addr) throws UnknownHostException {
154         mcast_addr=InetAddress.getByName(addr);
155     }
156 
getMcastPort()157     public int getMcastPort() {
158         return mcast_port;
159     }
160 
setMcastPort(int mcast_port)161     public void setMcastPort(int mcast_port) {
162         this.mcast_port=mcast_port;
163     }
164 
165 
166 
167     @SuppressWarnings("unchecked")
up(Event evt)168     public Object up(Event evt) {
169         if(evt.getType() == Event.CONFIG) {
170             if(bind_addr == null) {
171                 Map<String,Object> config=(Map<String,Object>)evt.getArg();
172                 bind_addr=(InetAddress)config.get("bind_addr");
173             }
174             return up_prot.up(evt);
175         }
176         return super.up(evt);
177     }
178 
179 
init()180     public void init() throws Exception {
181         super.init();
182         if(log.isDebugEnabled())
183             log.debug("bind_addr=" + bind_addr + " mcast_addr=" + mcast_addr + ", mcast_port=" + mcast_port);
184     }
185 
start()186     public void start() throws Exception {
187         if(can_bind_to_mcast_addr) // https://jira.jboss.org/jira/browse/JGRP-836 - prevent cross talking on Linux
188             mcast_sock=Util.createMulticastSocket(getSocketFactory(), Global.MPING_MCAST_SOCK, mcast_addr, mcast_port, log);
189         else
190             mcast_sock=getSocketFactory().createMulticastSocket(Global.MPING_MCAST_SOCK, mcast_port);
191 
192         mcast_sock.setTimeToLive(ip_ttl);
193 
194         if(receive_on_all_interfaces || (receive_interfaces != null && !receive_interfaces.isEmpty())) {
195             List<NetworkInterface> interfaces;
196             if(receive_interfaces != null)
197                 interfaces=receive_interfaces;
198             else
199                 interfaces=Util.getAllAvailableInterfaces();
200             bindToInterfaces(interfaces, mcast_sock, mcast_addr);
201         }
202         else {
203             if(bind_addr != null)
204                 mcast_sock.setInterface(bind_addr);
205             mcast_sock.joinGroup(mcast_addr);
206         }
207 
208 
209         // 3b. Create mcast sender socket
210         if(send_on_all_interfaces || (send_interfaces != null && !send_interfaces.isEmpty())) {
211             List interfaces;
212             NetworkInterface intf;
213             if(send_interfaces != null)
214                 interfaces=send_interfaces;
215             else
216                 interfaces=Util.getAllAvailableInterfaces();
217             mcast_send_sockets=new MulticastSocket[interfaces.size()];
218             int index=0;
219             for(Iterator it=interfaces.iterator(); it.hasNext();) {
220                 intf=(NetworkInterface)it.next();
221                 mcast_send_sockets[index]=new MulticastSocket();
222                 mcast_send_sockets[index].setNetworkInterface(intf);
223                 mcast_send_sockets[index].setTimeToLive(ip_ttl);
224                 index++;
225             }
226         }
227 
228 
229         startReceiver();
230         super.start();
231     }
232 
233 
bindToInterfaces(List<NetworkInterface> interfaces, MulticastSocket s, InetAddress mcast_addr)234     private void bindToInterfaces(List<NetworkInterface> interfaces, MulticastSocket s, InetAddress mcast_addr) throws IOException {
235         SocketAddress tmp_mcast_addr=new InetSocketAddress(mcast_addr, mcast_port);
236         for(Iterator it=interfaces.iterator(); it.hasNext();) {
237             NetworkInterface i=(NetworkInterface)it.next();
238             for(Enumeration en2=i.getInetAddresses(); en2.hasMoreElements();) {
239                 InetAddress addr=(InetAddress)en2.nextElement();
240                 s.joinGroup(tmp_mcast_addr, i);
241                 if(log.isTraceEnabled())
242                     log.trace("joined " + tmp_mcast_addr + " on " + i.getName() + " (" + addr + ")");
243                 break;
244             }
245         }
246     }
247 
248 
249 
startReceiver()250     private void startReceiver() {
251         if(receiver == null || !receiver.isAlive()) {
252             receiver=new Thread(Util.getGlobalThreadGroup(), this, "ReceiverThread");
253             receiver.setDaemon(true);
254             receiver.start();
255             if(log.isTraceEnabled())
256                 log.trace("receiver thread started");
257         }
258     }
259 
stop()260     public void stop() {
261         Util.close(mcast_sock);
262         mcast_sock=null;
263         receiver=null;
264         super.stop();
265     }
266 
sendMcastDiscoveryRequest(Message msg)267     void sendMcastDiscoveryRequest(Message msg) {
268         Buffer           buf;
269         DatagramPacket   packet;
270         DataOutputStream out=null;
271 
272         try {
273             if(msg.getSrc() == null)
274                 msg.setSrc(local_addr);
275             ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(128);
276             out=new DataOutputStream(out_stream);
277             msg.writeTo(out);
278             out.flush(); // flushes contents to out_stream
279             buf=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
280             packet=new DatagramPacket(buf.getBuf(), buf.getOffset(), buf.getLength(), mcast_addr, mcast_port);
281             discovery_reception.reset();
282             if(mcast_send_sockets != null) {
283                 MulticastSocket s;
284                 for(int i=0; i < mcast_send_sockets.length; i++) {
285                     s=mcast_send_sockets[i];
286                     try {
287                         s.send(packet);
288                     }
289                     catch(Exception e) {
290                         log.error("failed sending packet on socket " + s);
291                     }
292                 }
293             }
294             else { // DEFAULT path
295                 if(mcast_sock != null)
296                     mcast_sock.send(packet);
297             }
298             waitForDiscoveryRequestReception();
299         }
300         catch(IOException ex) {
301             log.error("failed sending discovery request", ex);
302         }
303         finally {
304             Util.close(out);
305         }
306     }
307 
308 
309 
run()310     public void run() {
311         final byte[]         receive_buf=new byte[65535];
312         DatagramPacket       packet=new DatagramPacket(receive_buf, receive_buf.length);
313         byte[]               data;
314         ByteArrayInputStream inp_stream;
315         DataInputStream      inp=null;
316         Message              msg;
317 
318         while(mcast_sock != null && receiver != null && Thread.currentThread().equals(receiver)) {
319             packet.setData(receive_buf, 0, receive_buf.length);
320             try {
321                 mcast_sock.receive(packet);
322                 data=packet.getData();
323                 inp_stream=new ExposedByteArrayInputStream(data, 0, data.length);
324                 inp=new DataInputStream(inp_stream);
325                 msg=new Message();
326                 msg.readFrom(inp);
327                 up(new Event(Event.MSG, msg));
328             }
329             catch(SocketException socketEx) {
330                 break;
331             }
332             catch(Throwable ex) {
333                 log.error("failed receiving packet (from " + packet.getSocketAddress() + ")", ex);
334             }
335             finally {
336                 Util.close(inp);
337             }
338         }
339         if(log.isTraceEnabled())
340             log.trace("receiver thread terminated");
341     }
342 
343 
344 }
345