package org.jgroups.protocols.pbcast;

import org.jgroups.*;
import org.jgroups.logging.Log;
import org.jgroups.util.Digest;
import org.jgroups.util.MergeId;

import java.util.Collection;
import java.util.Map;
import java.util.Vector;


/**
 * Abstract base for the implementations that a {@link GMS} instance delegates to.
 * Subclasses must implement {@link #join}, {@link #joinWithStateTransfer} and {@link #leave};
 * every other handler defaults to a no-op and is overridden only where it applies.
 */
public abstract class GmsImpl {
    /** The owning GMS protocol instance. */
    protected final GMS    gms;
    /** Taken from {@code gms.merger} at construction time. */
    protected final Merger merger;
    /** Taken from {@code gms.getLog()} at construction time. */
    protected final Log    log;
    /** Reset to {@code false} by {@link #init()} and {@link #start()}, set to {@code true} by {@link #stop()}. */
    volatile boolean       leaving=false;


    /**
     * Creates an implementation bound to the given GMS instance.
     * @param gms the owning protocol; must not be null, as its merger and log are read here
     */
    protected GmsImpl(GMS gms) {
        this.gms=gms;
        merger=gms.merger;
        log=gms.getLog();
    }

    public abstract void join(Address mbr, boolean useFlushIfPresent);
    public abstract void joinWithStateTransfer(Address local_addr, boolean useFlushIfPresent);

    public abstract void leave(Address mbr);

    // The handlers below are intentionally empty: subclasses override the ones relevant to them.
    public void handleJoinResponse(JoinRsp join_rsp) {}
    public void handleLeaveResponse() {}

    public void suspect(Address mbr) {}
    public void unsuspect(Address mbr) {}

    public void merge(Map<Address,View> views) {} // only processed by coord
    public void handleMergeRequest(Address sender, MergeId merge_id, Collection<? extends Address> mbrs) {} // only processed by coords
    public void handleMergeResponse(MergeData data, MergeId merge_id) {} // only processed by coords
    public void handleMergeView(MergeData data, MergeId merge_id) {} // only processed by coords
    public void handleMergeCancelled(MergeId merge_id) {} // only processed by coords
    public void handleDigestResponse(Address sender, Digest digest) {} // only processed by coords

    public void handleMembershipChange(Collection<Request> requests) {}
    public void handleViewChange(View new_view, Digest digest) {}

    /** Clears the {@code leaving} flag. */
    public void init() throws Exception {leaving=false;}
    /** Clears the {@code leaving} flag. */
    public void start() throws Exception {leaving=false;}
    /** Sets the {@code leaving} flag. */
    public void stop() {leaving=true;}


    /**
     * Sends an out-of-band MERGE_RSP message with {@code merge_rejected} set to {@code sender},
     * passing it to the protocol below GMS.
     * @param sender   destination of the rejection message
     * @param merge_id ID of the merge being rejected; copied into the header
     */
    protected void sendMergeRejectedResponse(Address sender, MergeId merge_id) {
        Message msg=new Message(sender, null, null);
        msg.setFlag(Message.OOB);
        GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP);
        hdr.merge_rejected=true;
        hdr.merge_id=merge_id;
        msg.putHeader(gms.getId(), hdr);
        if(log.isDebugEnabled()) log.debug("merge response=" + hdr);
        gms.getDownProtocol().down(new Event(Event.MSG, msg));
    }


    /**
     * Logs a warning that {@code method_name} should not have been invoked on this concrete class.
     * @param method_name name of the method that was called, without parentheses
     */
    protected void wrongMethod(String method_name) {
        if(log.isWarnEnabled())
            log.warn(method_name + "() should not be invoked on an instance of " + getClass().getName());
    }



    /**
     * Determines whether the local member would be coordinator if {@code new_mbrs} were added to
     * the current membership: a copy of the current members is merged with {@code new_mbrs} and
     * sorted, and the local address is compared with the first element of the result.
     * Another approach would be to keep track of the primary partition and return the first
     * member if we are the primary partition.
     * @param new_mbrs members to merge into the copy of the current membership
     * @return {@code true} if the local address is non-null and equals the first member of the
     *         sorted, merged membership; {@code false} otherwise (including an empty membership)
     */
    protected boolean iWouldBeCoordinator(Vector<Address> new_mbrs) {
        Membership tmp_mbrs=gms.members.copy();
        tmp_mbrs.merge(new_mbrs, null);
        tmp_mbrs.sort();
        if(tmp_mbrs.size() <= 0 || gms.local_addr == null)
            return false;
        return gms.local_addr.equals(tmp_mbrs.elementAt(0));
    }


    /**
     * A membership request, tagged with one of the type constants defined below.
     */
    public static class Request {
        static final int JOIN    = 1;
        static final int LEAVE   = 2;
        static final int SUSPECT = 3;
        static final int MERGE   = 4;
        static final int JOIN_WITH_STATE_TRANSFER    = 6;


        int               type=-1;
        Address           mbr;
        boolean           suspected;
        Map<Address,View> views; // different view on MERGE; stays null when the 3-arg constructor is used
        boolean           useFlushIfPresent;


        /** Creates a request without views; {@code views} stays null and {@code useFlushIfPresent} stays false. */
        Request(int type, Address mbr, boolean suspected) {
            this.type=type;
            this.mbr=mbr;
            this.suspected=suspected;
        }

        /** Creates a request carrying views and an explicit flush preference. */
        Request(int type, Address mbr, boolean suspected, Map<Address,View> views, boolean useFlushPresent) {
            this(type, mbr, suspected);
            this.views=views;
            this.useFlushIfPresent=useFlushPresent;
        }

        /** Creates a request carrying views, with {@code useFlushIfPresent} set to {@code true}. */
        Request(int type, Address mbr, boolean suspected, Map<Address,View> views) {
            this(type, mbr, suspected, views, true);
        }

        public int getType() {
            return type;
        }

        /**
         * Returns a short description of the request based on its type. A MERGE request whose
         * {@code views} map is null is reported as having 0 views rather than failing.
         */
        @Override
        public String toString() {
            switch(type) {
                case JOIN:                     return "JOIN(" + mbr + ")";
                case JOIN_WITH_STATE_TRANSFER: return "JOIN_WITH_STATE_TRANSFER(" + mbr + ")";
                case LEAVE:                    return "LEAVE(" + mbr + ", " + suspected + ")";
                case SUSPECT:                  return "SUSPECT(" + mbr + ")";
                // views is null when the request was built without a views map; guard against NPE
                case MERGE:                    return "MERGE(" + (views == null? 0 : views.size()) + " views)";
            }
            return "<invalid (type=" + type + ")";
        }

        /**
         * Specifies whether this request can be processed together with another request.
         * @param other the other request; may be null
         * @return {@code true} only if both this request and {@code other} are of type JOIN,
         *         LEAVE or SUSPECT; {@code false} if {@code other} is null or either type differs
         */
        public boolean canBeProcessedTogether(Request other) {
            if(other == null)
                return false;
            int other_type=other.type;
            return (type == JOIN || type == LEAVE || type == SUSPECT) &&
              (other_type == JOIN || other_type == LEAVE || other_type == SUSPECT);
        }
    }

}