1 package org.jgroups.util; 2 3 import org.jgroups.Address; 4 5 import java.util.Iterator; 6 import java.util.Map; 7 8 /** 9 * A mutable version of Digest (which is immutable 10 * @author Bela Ban 11 */ 12 public class MutableDigest extends Digest { 13 private boolean sealed=false; 14 15 /** Used for externalization */ MutableDigest()16 public MutableDigest() { 17 super(); 18 } 19 MutableDigest(int size)20 public MutableDigest(int size) { 21 super(size); 22 } 23 24 MutableDigest(Map<Address,Entry> map)25 public MutableDigest(Map<Address,Entry> map) { 26 super(map); 27 } 28 29 MutableDigest(Digest digest)30 public MutableDigest(Digest digest) { 31 super(digest.getSenders()); 32 } 33 34 getSenders()35 public Map<Address, Entry> getSenders() { 36 return senders; 37 } 38 add(Address sender, long low_seqno, long highest_delivered_seqno)39 public void add(Address sender, long low_seqno, long highest_delivered_seqno) { 40 checkSealed(); 41 add(sender, low_seqno, highest_delivered_seqno, -1); 42 } 43 44 add(Address sender, long low_seqno, long highest_delivered_seqno, long highest_received_seqno)45 public void add(Address sender, long low_seqno, long highest_delivered_seqno, long highest_received_seqno) { 46 checkSealed(); 47 add(sender, new Digest.Entry(low_seqno, highest_delivered_seqno, highest_received_seqno)); 48 } 49 add(Address sender, Entry entry)50 private void add(Address sender, Entry entry) { 51 if(sender == null || entry == null) { 52 if(log.isErrorEnabled()) 53 log.error("sender (" + sender + ") or entry (" + entry + ")is null, will not add entry"); 54 return; 55 } 56 checkSealed(); 57 senders.put(sender, entry); 58 } 59 60 add(Digest digest)61 public void add(Digest digest) { 62 if(digest != null) { 63 checkSealed(); 64 Map.Entry<Address,Entry> entry; 65 Address key; 66 Entry val; 67 for(Iterator<Map.Entry<Address,Entry>> it=digest.senders.entrySet().iterator(); it.hasNext();) { 68 entry=it.next(); 69 key=entry.getKey(); 70 val=entry.getValue(); 71 add(key, val.getLow(), val.getHighestDeliveredSeqno(), val.getHighestReceivedSeqno()); 72 } 73 } 74 } 75 replace(Digest d)76 public void replace(Digest d) { 77 if(d != null) { 78 clear(); 79 add(d); 80 } 81 } 82 set(Address sender, long low_seqno, long highest_delivered_seqno, long highest_received_seqno)83 public boolean set(Address sender, long low_seqno, long highest_delivered_seqno, long highest_received_seqno) { 84 checkSealed(); 85 Entry entry=senders.put(sender, new Entry(low_seqno, highest_delivered_seqno, highest_received_seqno)); 86 return entry == null; 87 } 88 89 /** 90 * Adds a digest to this digest. This digest must have enough space to add the other digest; otherwise an error 91 * message will be written. For each sender in the other digest, the merge() method will be called. 92 */ merge(Digest digest)93 public void merge(Digest digest) { 94 if(digest == null) { 95 if(log.isErrorEnabled()) log.error("digest to be merged with is null"); 96 return; 97 } 98 checkSealed(); 99 Map.Entry<Address,Entry> entry; 100 Address sender; 101 Entry val; 102 for(Iterator<Map.Entry<Address,Entry>> it=digest.senders.entrySet().iterator(); it.hasNext();) { 103 entry=it.next(); 104 sender=entry.getKey(); 105 val=entry.getValue(); 106 if(val != null) { 107 merge(sender, val.getLow(), val.getHighestDeliveredSeqno(), val.getHighestReceivedSeqno()); 108 } 109 } 110 } 111 112 113 /** 114 * Similar to add(), but if the sender already exists, its seqnos will be modified (no new entry) as follows: 115 * <ol> 116 * <li>this.low_seqno=min(this.low_seqno, low_seqno) 117 * <li>this.highest_delivered_seqno=max(this.highest_delivered_seqno, highest_delivered_seqno) 118 * <li>this.highest_received_seqno=max(this.highest_received_seqno, highest_received_seqno) 119 * </ol> 120 * If the sender doesn not exist, a new entry will be added (provided there is enough space) 121 */ merge(Address sender, long low_seqno, long highest_delivered_seqno, long highest_received_seqno)122 public void merge(Address sender, long low_seqno, long highest_delivered_seqno, long highest_received_seqno) { 123 if(sender == null) { 124 if(log.isErrorEnabled()) log.error("sender == null"); 125 return; 126 } 127 checkSealed(); 128 Entry entry=senders.get(sender); 129 if(entry == null) { 130 add(sender, low_seqno, highest_delivered_seqno, highest_received_seqno); 131 } 132 else { 133 Entry new_entry=new Entry(Math.min(entry.getLow(), low_seqno), 134 Math.max(entry.getHighestDeliveredSeqno(), highest_delivered_seqno), 135 Math.max(entry.getHighestReceivedSeqno(), highest_received_seqno)); 136 senders.put(sender, new_entry); 137 } 138 } 139 140 141 142 /** Increments the sender's highest delivered seqno by 1 */ incrementHighestDeliveredSeqno(Address sender)143 public void incrementHighestDeliveredSeqno(Address sender) { 144 Entry entry=senders.get(sender); 145 if(entry == null) 146 return; 147 checkSealed(); 148 149 long new_highest_delivered=entry.getHighestDeliveredSeqno() +1; 150 151 // highest_received must be >= highest_delivered, but not smaller ! 152 long new_highest_received=Math.max(entry.getHighestReceivedSeqno(), new_highest_delivered); 153 154 Entry new_entry=new Entry(entry.getLow(), new_highest_delivered, new_highest_received); 155 senders.put(sender, new_entry); 156 } 157 158 159 /** 160 * Resets the seqnos for the sender at 'index' to 0. This happens when a member has left the group, 161 * but it is still in the digest. Resetting its seqnos ensures that no-one will request a message 162 * retransmission from the dead member. 163 */ resetAt(Address sender)164 public void resetAt(Address sender) { 165 Entry entry=senders.get(sender); 166 if(entry != null) 167 checkSealed(); 168 senders.put(sender, new Entry()); 169 } 170 171 clear()172 public void clear() { 173 checkSealed(); 174 senders.clear(); 175 } 176 177 178 setHighestDeliveredAndSeenSeqnos(Address sender, long low_seqno, long highest_delivered_seqno, long highest_received_seqno)179 public void setHighestDeliveredAndSeenSeqnos(Address sender, long low_seqno, long highest_delivered_seqno, long highest_received_seqno) { 180 Entry entry=senders.get(sender); 181 if(entry != null) { 182 checkSealed(); 183 Entry new_entry=new Entry(low_seqno, highest_delivered_seqno, highest_received_seqno); 184 senders.put(sender, new_entry); 185 } 186 } 187 188 /** Seals the instance against modifications */ seal()189 public boolean seal() { 190 boolean retval=sealed; 191 sealed=true; 192 return retval; 193 } 194 195 checkSealed()196 private final void checkSealed() { 197 if(sealed) 198 throw new IllegalAccessError("instance has been sealed and cannot be modified"); 199 } 200 201 } 202