1 package org.jgroups.protocols;
2 
3 
4 import org.jgroups.*;
5 import org.jgroups.annotations.*;
6 import org.jgroups.stack.Protocol;
7 import org.jgroups.util.TimeScheduler;
8 import org.jgroups.util.Util;
9 
10 import java.util.*;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.locks.Lock;
14 import java.util.concurrent.locks.ReentrantLock;
15 
16 
17 /**
18  * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group
19  * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send
20  * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time
21  * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the
22  * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done
23  * somewhere above this protocol (typically in the GMS protocol).<p>
24  * This protocol works as follows:
25  * <ul>
 * <li>If coordinator: periodically retrieve the initial membership (using the FIND_INITIAL_MBRS event provided e.g.
 *     by the PING or TCPPING protocols). This list contains {coord,addr} pairs.
28  * <li>If there is more than 1 coordinator:
29  *     <ol>
30  *     <li>Get all coordinators
31  *     <li>Create a MERGE event with the list of coordinators as argument
32  *     <li>Send the event up the stack
33  *     </ol>
34  * </ul>
35  *
36  * <p>
37  *
38  * Requires: FIND_INITIAL_MBRS event from below<br>
 * Provides: sends a MERGE event up the stack whose argument maps each discovered member to the view it reported<br>
40  * @author Bela Ban, Oct 16 2001
41  */
42 @MBean(description="Protocol to discover subgroups existing due to a network partition")
43 @DeprecatedProperty(names={"use_separate_thread"})
44 public class MERGE2 extends Protocol {
45 
46 
47     /* -----------------------------------------    Properties     -------------------------------------------------- */
48 
49     @Property(description="Minimum time in msbetween runs to discover other clusters")
50     protected long min_interval=5000;
51 
52     @Property(description="Maximum time in ms between runs to discover other clusters")
53     protected long max_interval=20000;
54 
55     @Property(description="Number of inconsistent  views with only 1 coord after a MERGE event is sent up")
56     protected int inconsistent_view_threshold=1;
57 
58     @Property(description="When receiving a multicast message, checks if the sender is member of the cluster. " +
59             "If not, initiates a merge")
60     protected boolean merge_fast=true;
61 
62     @Property(description="The delay (in milliseconds) after which a merge fast execution is started")
63     protected long merge_fast_delay=1000;
64 
65     /* ---------------------------------------------- JMX -------------------------------------------------------- */
66     @ManagedAttribute(writable=false, description="whether or not a merge task is currently running " +
67             "(should be the case in a coordinator")
isMergeTaskRunning()68     public boolean isMergeTaskRunning() {
69         return task.isRunning();
70     }
71 
72     /* --------------------------------------------- Fields ------------------------------------------------------ */
73 
74     private Address local_addr=null;
75 
76     private View view;
77 
78     private final Set<Address> members=new HashSet<Address>();
79 
80     private final Set<Address> merge_candidates=new HashSet<Address>();
81 
82     private final FindSubgroupsTask task=new FindSubgroupsTask();
83 
84     private volatile boolean is_coord=false;
85 
86     private TimeScheduler timer;
87 
88     @ManagedAttribute(description="Number of inconsistent 1-coord views until a MERGE event is sent up the stack")
89     private int num_inconsistent_views=0;
90 
91 
92 
MERGE2()93     public MERGE2() {
94     }
95 
96 
init()97     public void init() throws Exception {
98         timer=getTransport().getTimer();
99         if(timer == null)
100             throw new Exception("timer cannot be retrieved");
101 
102         if(min_interval <= 0 || max_interval <= 0) {
103             throw new Exception("min_interval and max_interval have to be > 0");
104         }
105 
106         if(max_interval <= min_interval) {
107             throw new Exception ("max_interval has to be greater than min_interval");
108         }
109     }
110 
getMinInterval()111     public long getMinInterval() {
112         return min_interval;
113     }
114 
setMinInterval(long i)115     public void setMinInterval(long i) {
116         min_interval=i;
117     }
118 
getMaxInterval()119     public long getMaxInterval() {
120         return max_interval;
121     }
122 
setMaxInterval(long l)123     public void setMaxInterval(long l) {
124         max_interval=l;
125     }
126 
requiredDownServices()127     public Vector<Integer> requiredDownServices() {
128         Vector<Integer> retval=new Vector<Integer>(1);
129         retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
130         return retval;
131     }
132 
133     /** Discovers members and detects whether we have multiple coordinator. If so, kicks off a merge */
134     @ManagedOperation
sendMergeSolicitation()135     public void sendMergeSolicitation() {
136         task.findAndNotify();
137     }
138 
startMergeTask()139     @ManagedOperation public void startMergeTask() {task.start();}
140 
stopMergeTask()141     @ManagedOperation public void stopMergeTask() {task.stop();}
142 
stop()143     public void stop() {
144         is_coord=false;
145         merge_candidates.clear();
146         task.stop();
147     }
148 
149 
down(Event evt)150     public Object down(Event evt) {
151         switch(evt.getType()) {
152 
153             case Event.VIEW_CHANGE:
154                 Object ret=down_prot.down(evt);
155                 view=(View)evt.getArg();
156                 Vector<Address> mbrs=view.getMembers();
157                 if(mbrs == null || mbrs.isEmpty() || local_addr == null) {
158                     task.stop();
159                     return ret;
160                 }
161                 members.clear();
162                 members.addAll(mbrs);
163                 merge_candidates.removeAll(members);
164                 Address coord=mbrs.elementAt(0);
165                 if(coord.equals(local_addr)) {
166                     is_coord=true;
167                     task.start(); // start task if we became coordinator (doesn't start if already running)
168                 }
169                 else {
170                     // if we were coordinator, but are no longer, stop task. this happens e.g. when we merge and someone
171                     // else becomes the new coordinator of the merged group
172                     if(is_coord) {
173                         is_coord=false;
174                     }
175                     task.stop();
176                 }
177                 return ret;
178 
179             case Event.SET_LOCAL_ADDRESS:
180                 local_addr=(Address)evt.getArg();
181                 return down_prot.down(evt);
182 
183             default:
184                 return down_prot.down(evt);          // Pass on to the layer below us
185         }
186     }
187 
up(Event evt)188     public Object up(Event evt) {
189         switch(evt.getType()) {
190             case Event.MSG:
191                 if(!merge_fast)
192                     break;
193                 Message msg=(Message)evt.getArg();
194                 Address dest=msg.getDest();
195                 boolean multicast=dest == null || dest.isMulticastAddress();
196                 if(!multicast)
197                     break;
198                 final Address sender=msg.getSrc();
199                 if(!members.contains(sender) && merge_candidates.add(sender)) {
200                     timer.schedule(new Runnable() {
201                         public void run() {
202                             if(!members.contains(sender))
203                                 task.findAndNotify();
204                         }
205                     }, merge_fast_delay, TimeUnit.MILLISECONDS);
206                 }
207                 break;
208         }
209         return up_prot.up(evt);
210     }
211 
212     /**
213 	 * Task periodically executing (if role is coordinator). Gets the initial membership and determines
214 	 * whether there are subgroups (multiple coordinators for the same group). If yes, it sends a MERGE event
215 	 * with the list of the coordinators up the stack
216 	 */
217     private class FindSubgroupsTask {
218         @GuardedBy("this")
219         private Future<?> future;
220         private Lock      lock=new ReentrantLock();
221 
start()222         public synchronized void start() {
223             if(future == null || future.isDone() || future.isCancelled()) {
224                 future=timer.scheduleWithFixedDelay(new Runnable() {
225                     public void run() {
226                         findAndNotify();
227                     }
228                 },  Math.max(5000L, computeInterval()), computeInterval(), TimeUnit.MILLISECONDS);
229             }
230         }
231 
stop()232         public synchronized void stop() {
233             if(future != null) {
234                 future.cancel(true);
235                 future=null;
236             }
237         }
238 
isRunning()239         public synchronized boolean isRunning() {
240             return future != null && !future.isDone() && !future.isCancelled();
241         }
242 
243 
findAndNotify()244         public void findAndNotify() {
245             if(lock.tryLock()) {
246                 try {
247                     _findAndNotify();
248                 }
249                 finally {
250                     lock.unlock();
251                 }
252             }
253         }
254 
_findAndNotify()255         private void _findAndNotify() {
256             List<PingData> discovery_rsps=findAllMembers();
257 
258             if(log.isTraceEnabled()) {
259                 StringBuilder sb=new StringBuilder();
260                 sb.append("Discovery results:\n");
261                 for(PingData data: discovery_rsps)
262                     sb.append("[" + data.getAddress() + "]: " + data.getView()).append("\n");
263                 log.trace(sb);
264             }
265 
266             // Create a map of senders and the views they sent
267             Map<Address,View> views=getViews(discovery_rsps);
268 
269             // A list of different views
270             List<View> different_views=detectDifferentViews(views);
271             if(different_views.size() <= 1) {
272                 num_inconsistent_views=0;
273                 return;
274             }
275             Collection<Address> merge_participants=Util.determineMergeParticipants(views);
276             if(merge_participants.size() == 1) {
277                 if(num_inconsistent_views < inconsistent_view_threshold) {
278                     if(log.isDebugEnabled())
279                         log.debug("dropping MERGE for inconsistent views " + Util.printViews(different_views) +
280                                 " as inconsistent view threshold (" + inconsistent_view_threshold +
281                                 ") has not yet been reached (" + num_inconsistent_views + ")");
282                     num_inconsistent_views++;
283                     return;
284                 }
285                 else
286                     num_inconsistent_views=0;
287             }
288             else
289                 num_inconsistent_views=0;
290 
291             if(log.isDebugEnabled()) {
292                 StringBuilder sb=new StringBuilder();
293                 sb.append(local_addr + " found different views : " + Util.printViews(different_views) +
294                         "; sending up MERGE event with merge participants " + merge_participants + ".\n");
295                 sb.append("Discovery results:\n");
296                 for(PingData data: discovery_rsps)
297                     sb.append("[" + data.getAddress() + "]: " + data.getView()).append("\n");
298                 log.debug(sb.toString());
299             }
300             Event evt=new Event(Event.MERGE, views);
301             try {
302                 up_prot.up(evt);
303             }
304             catch(Throwable t) {
305                 log.error("failed sending up MERGE event", t);
306             }
307         }
308 
309 
310         /**
311          * Returns a random value within [min_interval - max_interval]
312          */
computeInterval()313         long computeInterval() {
314             return min_interval + Util.random(max_interval - min_interval);
315         }
316 
317         /** Returns a list of PingData with only the view from members around the cluster */
318         @SuppressWarnings("unchecked")
findAllMembers()319         private List<PingData> findAllMembers() {
320             List<PingData> retval=(List<PingData>)down_prot.down(new Event(Event.FIND_ALL_MBRS));
321             if(retval == null) return Collections.emptyList();
322             if(is_coord && local_addr != null) {
323                 PingData tmp=new PingData(local_addr, view, true);
324                 //let's make sure that we add ourself as a coordinator
325                 if(!retval.contains(tmp))
326                     retval.add(tmp);
327             }
328             return retval;
329         }
330 
331 
332 
getViews(List<PingData> initial_mbrs)333         public Map<Address,View> getViews(List<PingData> initial_mbrs) {
334             Map<Address,View> retval=new HashMap<Address,View>();
335             for(PingData response: initial_mbrs) {
336                 if(!response.isServer())
337                     continue;
338                 Address sender=response.getAddress();
339                 View view=response.getView();
340                 if(sender == null || view == null)
341                     continue;
342                 retval.put(sender,view);
343             }
344             return retval;
345         }
346 
347 
detectDifferentViews(Map<Address,View> map)348         public List<View> detectDifferentViews(Map<Address,View> map) {
349             final List<View> ret=new ArrayList<View>();
350             for(View view: map.values()) {
351                 if(view == null)
352                     continue;
353                 ViewId vid=view.getVid();
354                 if(!Util.containsViewId(ret, vid))
355                     ret.add(view);
356             }
357             return ret;
358         }
359 
360 
361     }
362 }
363