1 package org.jgroups.protocols; 2 3 4 import org.jgroups.*; 5 import org.jgroups.annotations.*; 6 import org.jgroups.stack.Protocol; 7 import org.jgroups.util.TimeScheduler; 8 import org.jgroups.util.Util; 9 10 import java.util.*; 11 import java.util.concurrent.Future; 12 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.locks.Lock; 14 import java.util.concurrent.locks.ReentrantLock; 15 16 17 /** 18 * Protocol to discover subgroups; e.g., existing due to a network partition (that healed). Example: group 19 * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send 20 * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time 21 * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the 22 * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done 23 * somewhere above this protocol (typically in the GMS protocol).<p> 24 * This protocol works as follows: 25 * <ul> 26 * <li>If coordinator: periodically retrieve the initial membership (using the FIND_INITIAL_MBRS event provided e.g. 27 * by PING or TCPPING protocols. This list contains {coord,addr} pairs. 28 * <li>If there is more than 1 coordinator: 29 * <ol> 30 * <li>Get all coordinators 31 * <li>Create a MERGE event with the list of coordinators as argument 32 * <li>Send the event up the stack 33 * </ol> 34 * </ul> 35 * 36 * <p> 37 * 38 * Requires: FIND_INITIAL_MBRS event from below<br> 39 * Provides: sends MERGE event with list of coordinators up the stack<br> 40 * @author Bela Ban, Oct 16 2001 41 */ 42 @MBean(description="Protocol to discover subgroups existing due to a network partition") 43 @DeprecatedProperty(names={"use_separate_thread"}) 44 public class MERGE2 extends Protocol { 45 46 47 /* ----------------------------------------- Properties -------------------------------------------------- */ 48 49 @Property(description="Minimum time in msbetween runs to discover other clusters") 50 protected long min_interval=5000; 51 52 @Property(description="Maximum time in ms between runs to discover other clusters") 53 protected long max_interval=20000; 54 55 @Property(description="Number of inconsistent views with only 1 coord after a MERGE event is sent up") 56 protected int inconsistent_view_threshold=1; 57 58 @Property(description="When receiving a multicast message, checks if the sender is member of the cluster. " + 59 "If not, initiates a merge") 60 protected boolean merge_fast=true; 61 62 @Property(description="The delay (in milliseconds) after which a merge fast execution is started") 63 protected long merge_fast_delay=1000; 64 65 /* ---------------------------------------------- JMX -------------------------------------------------------- */ 66 @ManagedAttribute(writable=false, description="whether or not a merge task is currently running " + 67 "(should be the case in a coordinator") isMergeTaskRunning()68 public boolean isMergeTaskRunning() { 69 return task.isRunning(); 70 } 71 72 /* --------------------------------------------- Fields ------------------------------------------------------ */ 73 74 private Address local_addr=null; 75 76 private View view; 77 78 private final Set<Address> members=new HashSet<Address>(); 79 80 private final Set<Address> merge_candidates=new HashSet<Address>(); 81 82 private final FindSubgroupsTask task=new FindSubgroupsTask(); 83 84 private volatile boolean is_coord=false; 85 86 private TimeScheduler timer; 87 88 @ManagedAttribute(description="Number of inconsistent 1-coord views until a MERGE event is sent up the stack") 89 private int num_inconsistent_views=0; 90 91 92 MERGE2()93 public MERGE2() { 94 } 95 96 init()97 public void init() throws Exception { 98 timer=getTransport().getTimer(); 99 if(timer == null) 100 throw new Exception("timer cannot be retrieved"); 101 102 if(min_interval <= 0 || max_interval <= 0) { 103 throw new Exception("min_interval and max_interval have to be > 0"); 104 } 105 106 if(max_interval <= min_interval) { 107 throw new Exception ("max_interval has to be greater than min_interval"); 108 } 109 } 110 getMinInterval()111 public long getMinInterval() { 112 return min_interval; 113 } 114 setMinInterval(long i)115 public void setMinInterval(long i) { 116 min_interval=i; 117 } 118 getMaxInterval()119 public long getMaxInterval() { 120 return max_interval; 121 } 122 setMaxInterval(long l)123 public void setMaxInterval(long l) { 124 max_interval=l; 125 } 126 requiredDownServices()127 public Vector<Integer> requiredDownServices() { 128 Vector<Integer> retval=new Vector<Integer>(1); 129 retval.addElement(new Integer(Event.FIND_INITIAL_MBRS)); 130 return retval; 131 } 132 133 /** Discovers members and detects whether we have multiple coordinator. If so, kicks off a merge */ 134 @ManagedOperation sendMergeSolicitation()135 public void sendMergeSolicitation() { 136 task.findAndNotify(); 137 } 138 startMergeTask()139 @ManagedOperation public void startMergeTask() {task.start();} 140 stopMergeTask()141 @ManagedOperation public void stopMergeTask() {task.stop();} 142 stop()143 public void stop() { 144 is_coord=false; 145 merge_candidates.clear(); 146 task.stop(); 147 } 148 149 down(Event evt)150 public Object down(Event evt) { 151 switch(evt.getType()) { 152 153 case Event.VIEW_CHANGE: 154 Object ret=down_prot.down(evt); 155 view=(View)evt.getArg(); 156 Vector<Address> mbrs=view.getMembers(); 157 if(mbrs == null || mbrs.isEmpty() || local_addr == null) { 158 task.stop(); 159 return ret; 160 } 161 members.clear(); 162 members.addAll(mbrs); 163 merge_candidates.removeAll(members); 164 Address coord=mbrs.elementAt(0); 165 if(coord.equals(local_addr)) { 166 is_coord=true; 167 task.start(); // start task if we became coordinator (doesn't start if already running) 168 } 169 else { 170 // if we were coordinator, but are no longer, stop task. this happens e.g. when we merge and someone 171 // else becomes the new coordinator of the merged group 172 if(is_coord) { 173 is_coord=false; 174 } 175 task.stop(); 176 } 177 return ret; 178 179 case Event.SET_LOCAL_ADDRESS: 180 local_addr=(Address)evt.getArg(); 181 return down_prot.down(evt); 182 183 default: 184 return down_prot.down(evt); // Pass on to the layer below us 185 } 186 } 187 up(Event evt)188 public Object up(Event evt) { 189 switch(evt.getType()) { 190 case Event.MSG: 191 if(!merge_fast) 192 break; 193 Message msg=(Message)evt.getArg(); 194 Address dest=msg.getDest(); 195 boolean multicast=dest == null || dest.isMulticastAddress(); 196 if(!multicast) 197 break; 198 final Address sender=msg.getSrc(); 199 if(!members.contains(sender) && merge_candidates.add(sender)) { 200 timer.schedule(new Runnable() { 201 public void run() { 202 if(!members.contains(sender)) 203 task.findAndNotify(); 204 } 205 }, merge_fast_delay, TimeUnit.MILLISECONDS); 206 } 207 break; 208 } 209 return up_prot.up(evt); 210 } 211 212 /** 213 * Task periodically executing (if role is coordinator). Gets the initial membership and determines 214 * whether there are subgroups (multiple coordinators for the same group). If yes, it sends a MERGE event 215 * with the list of the coordinators up the stack 216 */ 217 private class FindSubgroupsTask { 218 @GuardedBy("this") 219 private Future<?> future; 220 private Lock lock=new ReentrantLock(); 221 start()222 public synchronized void start() { 223 if(future == null || future.isDone() || future.isCancelled()) { 224 future=timer.scheduleWithFixedDelay(new Runnable() { 225 public void run() { 226 findAndNotify(); 227 } 228 }, Math.max(5000L, computeInterval()), computeInterval(), TimeUnit.MILLISECONDS); 229 } 230 } 231 stop()232 public synchronized void stop() { 233 if(future != null) { 234 future.cancel(true); 235 future=null; 236 } 237 } 238 isRunning()239 public synchronized boolean isRunning() { 240 return future != null && !future.isDone() && !future.isCancelled(); 241 } 242 243 findAndNotify()244 public void findAndNotify() { 245 if(lock.tryLock()) { 246 try { 247 _findAndNotify(); 248 } 249 finally { 250 lock.unlock(); 251 } 252 } 253 } 254 _findAndNotify()255 private void _findAndNotify() { 256 List<PingData> discovery_rsps=findAllMembers(); 257 258 if(log.isTraceEnabled()) { 259 StringBuilder sb=new StringBuilder(); 260 sb.append("Discovery results:\n"); 261 for(PingData data: discovery_rsps) 262 sb.append("[" + data.getAddress() + "]: " + data.getView()).append("\n"); 263 log.trace(sb); 264 } 265 266 // Create a map of senders and the views they sent 267 Map<Address,View> views=getViews(discovery_rsps); 268 269 // A list of different views 270 List<View> different_views=detectDifferentViews(views); 271 if(different_views.size() <= 1) { 272 num_inconsistent_views=0; 273 return; 274 } 275 Collection<Address> merge_participants=Util.determineMergeParticipants(views); 276 if(merge_participants.size() == 1) { 277 if(num_inconsistent_views < inconsistent_view_threshold) { 278 if(log.isDebugEnabled()) 279 log.debug("dropping MERGE for inconsistent views " + Util.printViews(different_views) + 280 " as inconsistent view threshold (" + inconsistent_view_threshold + 281 ") has not yet been reached (" + num_inconsistent_views + ")"); 282 num_inconsistent_views++; 283 return; 284 } 285 else 286 num_inconsistent_views=0; 287 } 288 else 289 num_inconsistent_views=0; 290 291 if(log.isDebugEnabled()) { 292 StringBuilder sb=new StringBuilder(); 293 sb.append(local_addr + " found different views : " + Util.printViews(different_views) + 294 "; sending up MERGE event with merge participants " + merge_participants + ".\n"); 295 sb.append("Discovery results:\n"); 296 for(PingData data: discovery_rsps) 297 sb.append("[" + data.getAddress() + "]: " + data.getView()).append("\n"); 298 log.debug(sb.toString()); 299 } 300 Event evt=new Event(Event.MERGE, views); 301 try { 302 up_prot.up(evt); 303 } 304 catch(Throwable t) { 305 log.error("failed sending up MERGE event", t); 306 } 307 } 308 309 310 /** 311 * Returns a random value within [min_interval - max_interval] 312 */ computeInterval()313 long computeInterval() { 314 return min_interval + Util.random(max_interval - min_interval); 315 } 316 317 /** Returns a list of PingData with only the view from members around the cluster */ 318 @SuppressWarnings("unchecked") findAllMembers()319 private List<PingData> findAllMembers() { 320 List<PingData> retval=(List<PingData>)down_prot.down(new Event(Event.FIND_ALL_MBRS)); 321 if(retval == null) return Collections.emptyList(); 322 if(is_coord && local_addr != null) { 323 PingData tmp=new PingData(local_addr, view, true); 324 //let's make sure that we add ourself as a coordinator 325 if(!retval.contains(tmp)) 326 retval.add(tmp); 327 } 328 return retval; 329 } 330 331 332 getViews(List<PingData> initial_mbrs)333 public Map<Address,View> getViews(List<PingData> initial_mbrs) { 334 Map<Address,View> retval=new HashMap<Address,View>(); 335 for(PingData response: initial_mbrs) { 336 if(!response.isServer()) 337 continue; 338 Address sender=response.getAddress(); 339 View view=response.getView(); 340 if(sender == null || view == null) 341 continue; 342 retval.put(sender,view); 343 } 344 return retval; 345 } 346 347 detectDifferentViews(Map<Address,View> map)348 public List<View> detectDifferentViews(Map<Address,View> map) { 349 final List<View> ret=new ArrayList<View>(); 350 for(View view: map.values()) { 351 if(view == null) 352 continue; 353 ViewId vid=view.getVid(); 354 if(!Util.containsViewId(ret, vid)) 355 ret.add(view); 356 } 357 return ret; 358 } 359 360 361 } 362 } 363