1 package org.jgroups.util; 2 3 import org.jgroups.logging.Log; 4 import org.jgroups.logging.LogFactory; 5 import org.jgroups.Address; 6 import org.jgroups.Global; 7 import org.jgroups.annotations.Immutable; 8 9 import java.io.*; 10 import static java.lang.Math.max; 11 import java.util.*; 12 import java.util.concurrent.ConcurrentHashMap; 13 14 15 /** 16 * A message digest, which is used by the PBCAST layer for gossiping (also used by NAKACK for 17 * keeping track of current seqnos for all members). It contains pairs of senders and a range of seqnos 18 * (low and high), where each sender is associated with its highest and lowest seqnos seen so far. That 19 * is, the lowest seqno which was not yet garbage-collected and the highest that was seen so far and is 20 * deliverable (or was already delivered) to the application. A range of [0 - 0] means no messages have 21 * been received yet. 22 * <p> April 3 2001 (bela): Added high_seqnos_seen member. It is used to disseminate 23 * information about the last (highest) message M received from a sender P. Since we might be using a 24 * negative acknowledgment message numbering scheme, we would never know if the last message was 25 * lost. Therefore we periodically gossip and include the last message seqno. Members who haven't seen 26 * it (e.g. because msg was dropped) will request a retransmission. See DESIGN for details. 27 * @author Bela Ban 28 */ 29 public class Digest implements Externalizable, Streamable { 30 31 public static final Digest EMPTY_DIGEST = new Digest(); 32 /** Map<Address, Entry> */ 33 protected final Map<Address,Entry> senders; 34 35 protected static final Log log=LogFactory.getLog(Digest.class); 36 private static final long serialVersionUID=6611464897656359215L; 37 38 39 /** Used for externalization */ Digest()40 public Digest() { 41 senders=createSenders(7); 42 } 43 Digest(int size)44 public Digest(int size) { 45 senders=createSenders(size); 46 } 47 48 /** Creates a new digest from an existing map by copying the keys and values from map */ Digest(Map<Address, Entry> map)49 public Digest(Map<Address, Entry> map) { 50 senders=createSenders(map); 51 } 52 53 Digest(Digest d)54 public Digest(Digest d) { 55 this(d.senders); 56 } 57 58 Digest(Address sender, long low, long highest_delivered, long highest_received)59 public Digest(Address sender, long low, long highest_delivered, long highest_received) { 60 senders=createSenders(1); 61 senders.put(sender, new Entry(low, highest_delivered, highest_received)); 62 } 63 Digest(Address sender, long low, long highest_delivered)64 public Digest(Address sender, long low, long highest_delivered) { 65 senders=createSenders(1); 66 senders.put(sender, new Entry(low, highest_delivered)); 67 } 68 69 /** Returns an unmodifiable map, so modifications will result in exceptions */ getSenders()70 public Map<Address, Entry> getSenders() { 71 return Collections.unmodifiableMap(senders); 72 } 73 equals(Object obj)74 public boolean equals(Object obj) { 75 if(!(obj instanceof Digest)) 76 return false; 77 Digest other=(Digest)obj; 78 return senders.equals(other.senders); 79 } 80 81 contains(Address sender)82 public boolean contains(Address sender) { 83 return senders.containsKey(sender); 84 } 85 86 /** Returns the Entry for the given sender. Note that Entry is immutable */ get(Address sender)87 public Entry get(Address sender) { 88 return senders.get(sender); 89 } 90 91 92 /** 93 * Compares two digests and returns true if the senders are the same, otherwise false. 94 * @param other 95 * @return True if senders are the same, otherwise false. 96 */ sameSenders(Digest other)97 public boolean sameSenders(Digest other) { 98 if(other == null) return false; 99 if(this.senders.size() != other.senders.size()) return false; 100 101 Set<Address> my_senders=senders.keySet(), other_senders=other.senders.keySet(); 102 return my_senders.equals(other_senders); 103 } 104 difference(Digest other)105 public Digest difference(Digest other) { 106 if(other == null) return copy(); 107 108 Digest result=EMPTY_DIGEST; 109 if(this.equals(other)) { 110 return result; 111 } 112 else { 113 //find intersection and compare their entries 114 Map<Address, Entry> resultMap=new ConcurrentHashMap<Address, Entry>(7); 115 Set<Address> intersection=new TreeSet<Address>(this.senders.keySet()); 116 intersection.retainAll(other.senders.keySet()); 117 118 for(Address address : intersection) { 119 Entry e1=this.get(address); 120 Entry e2=other.get(address); 121 if(e1.getHighestDeliveredSeqno() != e2.getHighestDeliveredSeqno()) { 122 long low=Math.min(e1.highest_delivered_seqno, e2.highest_delivered_seqno); 123 long high=max(e1.highest_delivered_seqno, e2.highest_delivered_seqno); 124 Entry r=new Entry(low, high); 125 resultMap.put(address, r); 126 } 127 } 128 129 //any entries left in (this - intersection)? 130 //if yes, add them to result 131 if(intersection.size() != this.senders.keySet().size()) { 132 Set<Address> thisMinusInteresection=new TreeSet<Address>(this.senders.keySet()); 133 thisMinusInteresection.removeAll(intersection); 134 for(Address address : thisMinusInteresection) { 135 resultMap.put(address, new Entry(this.get(address))); 136 } 137 } 138 139 //any entries left in (other - intersection)? 140 //if yes, add them to result 141 if(intersection.size() != other.senders.keySet().size()) { 142 Set<Address> otherMinusInteresection=new TreeSet<Address>(other.senders.keySet()); 143 otherMinusInteresection.removeAll(intersection); 144 for(Address address : otherMinusInteresection) { 145 resultMap.put(address, new Entry(other.get(address))); 146 } 147 } 148 result=new Digest(resultMap); 149 } 150 return result; 151 } 152 highestSequence(Digest other)153 public Digest highestSequence(Digest other) { 154 if(other == null) return copy(); 155 156 Digest result=EMPTY_DIGEST; 157 if(this.equals(other)) { 158 return this; 159 } 160 else { 161 //find intersection and compare their entries 162 Map<Address, Entry> resultMap=new ConcurrentHashMap<Address, Entry>(7); 163 Set<Address> intersection=new TreeSet<Address>(this.senders.keySet()); 164 intersection.retainAll(other.senders.keySet()); 165 166 for(Address address : intersection) { 167 Entry e1=this.get(address); 168 Entry e2=other.get(address); 169 170 long high=max(e1.highest_delivered_seqno, e2.highest_delivered_seqno); 171 Entry r=new Entry(0, high); 172 resultMap.put(address, r); 173 } 174 175 //any entries left in (this - intersection)? 176 //if yes, add them to result 177 if(intersection.size() != this.senders.keySet().size()) { 178 Set<Address> thisMinusInteresection=new TreeSet<Address>(this.senders.keySet()); 179 thisMinusInteresection.removeAll(intersection); 180 for(Address address : thisMinusInteresection) { 181 resultMap.put(address, new Entry(this.get(address))); 182 } 183 } 184 185 //any entries left in (other - intersection)? 186 //if yes, add them to result 187 if(intersection.size() != other.senders.keySet().size()) { 188 Set<Address> otherMinusInteresection=new TreeSet<Address>(other.senders.keySet()); 189 otherMinusInteresection.removeAll(intersection); 190 for(Address address : otherMinusInteresection) { 191 resultMap.put(address, new Entry(other.get(address))); 192 } 193 } 194 result=new Digest(resultMap); 195 } 196 return result; 197 } 198 199 size()200 public int size() { 201 return senders.size(); 202 } 203 204 lowSeqnoAt(Address sender)205 public long lowSeqnoAt(Address sender) { 206 Entry entry=senders.get(sender); 207 if(entry == null) 208 return -1; 209 else 210 return entry.low_seqno; 211 } 212 213 highestDeliveredSeqnoAt(Address sender)214 public long highestDeliveredSeqnoAt(Address sender) { 215 Entry entry=senders.get(sender); 216 if(entry == null) 217 return -1; 218 else 219 return entry.highest_delivered_seqno; 220 } 221 222 highestReceivedSeqnoAt(Address sender)223 public long highestReceivedSeqnoAt(Address sender) { 224 Entry entry=senders.get(sender); 225 if(entry == null) 226 return -1; 227 else 228 return entry.highest_received_seqno; 229 } 230 231 232 /** 233 * Returns true if all senders of the current digest have their seqnos >= the ones from other 234 * @param other 235 * @return 236 */ isGreaterThanOrEqual(Digest other)237 public boolean isGreaterThanOrEqual(Digest other) { 238 if(other == null) 239 return true; 240 Map<Address,Entry> our_map=getSenders(); 241 Address sender; 242 Entry my_entry, their_entry; 243 long my_highest, their_highest; 244 for(Map.Entry<Address,Entry> entry: our_map.entrySet()) { 245 sender=entry.getKey(); 246 my_entry=entry.getValue(); 247 their_entry=other.get(sender); 248 if(their_entry == null) 249 continue; 250 my_highest=my_entry.getHighest(); 251 their_highest=their_entry.getHighest(); 252 if(my_highest < their_highest) 253 return false; 254 } 255 return true; 256 } 257 258 copy()259 public Digest copy() { 260 return new Digest(senders); 261 } 262 263 toString()264 public String toString() { 265 StringBuilder sb=new StringBuilder(); 266 boolean first=true; 267 if(senders.isEmpty()) return "[]"; 268 269 for(Map.Entry<Address,Entry> entry: senders.entrySet()) { 270 Address key=entry.getKey(); 271 Entry val=entry.getValue(); 272 if(!first) 273 sb.append(", "); 274 else 275 first=false; 276 sb.append(key).append(": ").append('[').append(val.low_seqno).append(" : "); 277 sb.append(val.highest_delivered_seqno); 278 if(val.highest_received_seqno >= 0) 279 sb.append(" (").append(val.highest_received_seqno).append(")"); 280 sb.append("]"); 281 } 282 return sb.toString(); 283 } 284 toStringSorted()285 public String toStringSorted() { 286 StringBuilder sb=new StringBuilder(); 287 boolean first=true; 288 if(senders.isEmpty()) return "[]"; 289 290 TreeMap<Address,Entry> copy=new TreeMap<Address,Entry>(senders); 291 for(Map.Entry<Address,Entry> entry: copy.entrySet()) { 292 Address key=entry.getKey(); 293 Entry val=entry.getValue(); 294 if(!first) 295 sb.append(", "); 296 else 297 first=false; 298 sb.append(key).append(": ").append('[').append(val.low_seqno).append(" : "); 299 sb.append(val.highest_delivered_seqno); 300 if(val.highest_received_seqno >= 0) 301 sb.append(" (").append(val.highest_received_seqno).append(")"); 302 sb.append("]"); 303 } 304 return sb.toString(); 305 } 306 307 printHighestDeliveredSeqnos()308 public String printHighestDeliveredSeqnos() { 309 StringBuilder sb=new StringBuilder("["); 310 boolean first=true; 311 312 TreeMap<Address,Entry> copy=new TreeMap<Address,Entry>(senders); 313 for(Map.Entry<Address,Entry> entry: copy.entrySet()) { 314 Address key=entry.getKey(); 315 Entry val=entry.getValue(); 316 if(!first) 317 sb.append(", "); 318 else 319 first=false; 320 sb.append(key).append("#").append(val.highest_delivered_seqno); 321 } 322 sb.append(']'); 323 return sb.toString(); 324 } 325 326 printHighestReceivedSeqnos()327 public String printHighestReceivedSeqnos() { 328 StringBuilder sb=new StringBuilder(); 329 boolean first=true; 330 331 for(Map.Entry<Address,Entry> entry: senders.entrySet()) { 332 Address key=entry.getKey(); 333 Entry val=entry.getValue(); 334 if(!first) 335 sb.append(", "); 336 else { 337 sb.append('['); 338 first=false; 339 } 340 sb.append(key).append("#").append(val.highest_received_seqno); 341 } 342 sb.append(']'); 343 return sb.toString(); 344 } 345 346 writeExternal(ObjectOutput out)347 public void writeExternal(ObjectOutput out) throws IOException { 348 out.writeObject(senders); 349 } 350 readExternal(ObjectInput in)351 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 352 Map<Address, Entry> tmp=(Map<Address, Entry>)in.readObject(); 353 senders.clear(); 354 senders.putAll(tmp); 355 } 356 writeTo(DataOutputStream out)357 public void writeTo(DataOutputStream out) throws IOException { 358 out.writeShort(senders.size()); 359 for(Map.Entry<Address,Entry> entry: senders.entrySet()) { 360 Entry val=entry.getValue(); 361 Util.writeAddress(entry.getKey(), out); 362 out.writeLong(val.low_seqno); 363 out.writeLong(val.highest_delivered_seqno); 364 out.writeLong(val.highest_received_seqno); 365 } 366 } 367 368 readFrom(DataInputStream in)369 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { 370 short size=in.readShort(); 371 Map<Address,Entry> tmp=new HashMap<Address, Entry>(size); 372 Address key; 373 for(int i=0; i < size; i++) { 374 key=Util.readAddress(in); 375 tmp.put(key, new Entry(in.readLong(), in.readLong(), in.readLong())); 376 } 377 senders.clear(); 378 senders.putAll(tmp); 379 } 380 381 serializedSize()382 public long serializedSize() { 383 long retval=Global.SHORT_SIZE; // number of elements in 'senders' 384 if(!senders.isEmpty()) { 385 Address addr=senders.keySet().iterator().next(); 386 int len=Util.size(addr); 387 len+=Entry.SIZE; // 3 longs in one Entry 388 retval+=len * senders.size(); 389 } 390 return retval; 391 } 392 createSenders(int size)393 private static Map<Address, Entry> createSenders(int size) { 394 return new ConcurrentHashMap<Address,Entry>(size); 395 } 396 createSenders(Map<Address, Entry> map)397 private static Map<Address, Entry> createSenders(Map<Address, Entry> map) { 398 return new ConcurrentHashMap<Address,Entry>(map); 399 } 400 401 402 403 /** 404 * Class keeping track of the lowest and highest sequence numbers delivered, and the highest 405 * sequence numbers received, per member. This class is immutable 406 */ 407 @Immutable 408 public static class Entry implements Externalizable, Streamable { 409 private long low_seqno=0; 410 private long highest_delivered_seqno=0; // the highest delivered seqno, e.g. in 1,2,4,5,7 --> 2 411 private long highest_received_seqno=0; //the highest received seqno, e.g. in 1,2,4,5,7 --> 7 412 final static int SIZE=Global.LONG_SIZE * 3; 413 private static final long serialVersionUID=-4468945932249281704L; 414 Entry()415 public Entry() { 416 } 417 Entry(long low_seqno, long highest_delivered_seqno, long highest_received_seqno)418 public Entry(long low_seqno, long highest_delivered_seqno, long highest_received_seqno) { 419 this.low_seqno=low_seqno; 420 this.highest_delivered_seqno=highest_delivered_seqno; 421 this.highest_received_seqno=highest_received_seqno; 422 check(); 423 } 424 425 426 Entry(long low_seqno, long highest_delivered_seqno)427 public Entry(long low_seqno, long highest_delivered_seqno) { 428 this.low_seqno=low_seqno; 429 this.highest_delivered_seqno=highest_delivered_seqno; 430 check(); 431 } 432 Entry(Entry other)433 public Entry(Entry other) { 434 if(other != null) { 435 low_seqno=other.low_seqno; 436 highest_delivered_seqno=other.highest_delivered_seqno; 437 highest_received_seqno=other.highest_received_seqno; 438 check(); 439 } 440 } 441 getLow()442 public final long getLow() {return low_seqno;} getHighestDeliveredSeqno()443 public final long getHighestDeliveredSeqno() {return highest_delivered_seqno;} getHighestReceivedSeqno()444 public final long getHighestReceivedSeqno() {return highest_received_seqno;} 445 446 /** Return the max of the highest delivered or highest received seqno */ getHighest()447 public final long getHighest() {return max(highest_delivered_seqno, highest_received_seqno);} 448 equals(Object obj)449 public boolean equals(Object obj) { 450 if(!(obj instanceof Entry)) 451 return false; 452 Entry other=(Entry)obj; 453 return low_seqno == other.low_seqno && highest_delivered_seqno == other.highest_delivered_seqno && highest_received_seqno == other.highest_received_seqno; 454 } 455 hashCode()456 public int hashCode() { 457 return (int)(low_seqno + highest_delivered_seqno + highest_received_seqno); 458 } 459 toString()460 public String toString() { 461 return new StringBuilder("low=").append(low_seqno).append(", highest delivered=").append(highest_delivered_seqno). 462 append(", highest received=").append(highest_received_seqno).toString(); 463 } 464 writeExternal(ObjectOutput out)465 public void writeExternal(ObjectOutput out) throws IOException { 466 out.writeLong(low_seqno); 467 out.writeLong(highest_delivered_seqno); 468 out.writeLong(highest_received_seqno); 469 } 470 readExternal(ObjectInput in)471 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 472 low_seqno=in.readLong(); 473 highest_delivered_seqno=in.readLong(); 474 highest_received_seqno=in.readLong(); 475 } 476 size()477 public static int size() { 478 return SIZE; 479 } 480 writeTo(DataOutputStream out)481 public void writeTo(DataOutputStream out) throws IOException { 482 out.writeLong(low_seqno); 483 out.writeLong(highest_delivered_seqno); 484 out.writeLong(highest_received_seqno); 485 } 486 readFrom(DataInputStream in)487 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { 488 low_seqno=in.readLong(); 489 highest_delivered_seqno=in.readLong(); 490 highest_received_seqno=in.readLong(); 491 } 492 493 check()494 private void check() { 495 if(low_seqno > highest_delivered_seqno) 496 throw new IllegalArgumentException("low_seqno (" + low_seqno + ") is greater than highest_delivered_seqno (" + highest_delivered_seqno + ")"); 497 } 498 499 500 } 501 } 502