1 package org.jgroups.util;
2 
3 import org.jgroups.logging.Log;
4 import org.jgroups.logging.LogFactory;
5 import org.jgroups.Address;
6 import org.jgroups.Global;
7 import org.jgroups.annotations.Immutable;
8 
9 import java.io.*;
10 import static java.lang.Math.max;
11 import java.util.*;
12 import java.util.concurrent.ConcurrentHashMap;
13 
14 
15 /**
16  * A message digest, which is used by the PBCAST layer for gossiping (also used by NAKACK for
17  * keeping track of current seqnos for all members). It contains pairs of senders and a range of seqnos
18  * (low and high), where each sender is associated with its highest and lowest seqnos seen so far.  That
19  * is, the lowest seqno which was not yet garbage-collected and the highest that was seen so far and is
20  * deliverable (or was already delivered) to the application.  A range of [0 - 0] means no messages have
21  * been received yet.
22  * <p> April 3 2001 (bela): Added high_seqnos_seen member. It is used to disseminate
23  * information about the last (highest) message M received from a sender P. Since we might be using a
24  * negative acknowledgment message numbering scheme, we would never know if the last message was
25  * lost. Therefore we periodically gossip and include the last message seqno. Members who haven't seen
26  * it (e.g. because msg was dropped) will request a retransmission. See DESIGN for details.
27  * @author Bela Ban
28  */
29 public class Digest implements Externalizable, Streamable {
30 
31 	public static final Digest EMPTY_DIGEST = new Digest();
32     /** Map&lt;Address, Entry> */
33     protected final Map<Address,Entry> senders;
34 
35     protected static final Log log=LogFactory.getLog(Digest.class);
36     private static final long serialVersionUID=6611464897656359215L;
37 
38 
39     /** Used for externalization */
Digest()40     public Digest() {
41        senders=createSenders(7);
42     }
43 
Digest(int size)44     public Digest(int size) {
45         senders=createSenders(size);
46     }
47 
48     /** Creates a new digest from an existing map by copying the keys and values from map */
Digest(Map<Address, Entry> map)49     public Digest(Map<Address, Entry> map) {
50         senders=createSenders(map);
51     }
52 
53 
Digest(Digest d)54     public Digest(Digest d) {
55         this(d.senders);
56     }
57 
58 
Digest(Address sender, long low, long highest_delivered, long highest_received)59     public Digest(Address sender, long low, long highest_delivered, long highest_received) {
60         senders=createSenders(1);
61         senders.put(sender, new Entry(low, highest_delivered, highest_received));
62     }
63 
Digest(Address sender, long low, long highest_delivered)64     public Digest(Address sender, long low, long highest_delivered) {
65         senders=createSenders(1);
66         senders.put(sender, new Entry(low, highest_delivered));
67     }
68 
69     /** Returns an unmodifiable map, so modifications will result in exceptions */
getSenders()70     public Map<Address, Entry> getSenders() {
71         return Collections.unmodifiableMap(senders);
72     }
73 
equals(Object obj)74     public boolean equals(Object obj) {
75         if(!(obj instanceof Digest))
76             return false;
77         Digest other=(Digest)obj;
78         return senders.equals(other.senders);
79     }
80 
81 
contains(Address sender)82     public boolean contains(Address sender) {
83         return senders.containsKey(sender);
84     }
85 
86     /** Returns the Entry for the given sender. Note that Entry is immutable */
get(Address sender)87     public Entry get(Address sender) {
88         return senders.get(sender);
89     }
90 
91 
92     /**
93      * Compares two digests and returns true if the senders are the same, otherwise false.
94      * @param other
95      * @return True if senders are the same, otherwise false.
96      */
sameSenders(Digest other)97     public boolean sameSenders(Digest other) {
98         if(other == null) return false;
99         if(this.senders.size() != other.senders.size()) return false;
100 
101         Set<Address> my_senders=senders.keySet(), other_senders=other.senders.keySet();
102         return my_senders.equals(other_senders);
103     }
104 
difference(Digest other)105     public Digest difference(Digest other) {
106         if(other == null) return copy();
107 
108         Digest result=EMPTY_DIGEST;
109         if(this.equals(other)) {
110             return result;
111         }
112         else {
113             //find intersection and compare their entries
114             Map<Address, Entry> resultMap=new ConcurrentHashMap<Address, Entry>(7);
115             Set<Address> intersection=new TreeSet<Address>(this.senders.keySet());
116             intersection.retainAll(other.senders.keySet());
117 
118             for(Address address : intersection) {
119                 Entry e1=this.get(address);
120                 Entry e2=other.get(address);
121                 if(e1.getHighestDeliveredSeqno() != e2.getHighestDeliveredSeqno()) {
122                     long low=Math.min(e1.highest_delivered_seqno, e2.highest_delivered_seqno);
123                     long high=max(e1.highest_delivered_seqno, e2.highest_delivered_seqno);
124                     Entry r=new Entry(low, high);
125                     resultMap.put(address, r);
126                 }
127             }
128 
129             //any entries left in (this - intersection)?
130             //if yes, add them to result
131             if(intersection.size() != this.senders.keySet().size()) {
132                 Set<Address> thisMinusInteresection=new TreeSet<Address>(this.senders.keySet());
133                 thisMinusInteresection.removeAll(intersection);
134                 for(Address address : thisMinusInteresection) {
135                     resultMap.put(address, new Entry(this.get(address)));
136                 }
137             }
138 
139             //any entries left in (other - intersection)?
140             //if yes, add them to result
141             if(intersection.size() != other.senders.keySet().size()) {
142                 Set<Address> otherMinusInteresection=new TreeSet<Address>(other.senders.keySet());
143                 otherMinusInteresection.removeAll(intersection);
144                 for(Address address : otherMinusInteresection) {
145                     resultMap.put(address, new Entry(other.get(address)));
146                 }
147             }
148             result=new Digest(resultMap);
149         }
150         return result;
151     }
152 
highestSequence(Digest other)153     public Digest highestSequence(Digest other) {
154         if(other == null) return copy();
155 
156         Digest result=EMPTY_DIGEST;
157         if(this.equals(other)) {
158             return this;
159         }
160         else {
161             //find intersection and compare their entries
162             Map<Address, Entry> resultMap=new ConcurrentHashMap<Address, Entry>(7);
163             Set<Address> intersection=new TreeSet<Address>(this.senders.keySet());
164             intersection.retainAll(other.senders.keySet());
165 
166             for(Address address : intersection) {
167                 Entry e1=this.get(address);
168                 Entry e2=other.get(address);
169 
170                 long high=max(e1.highest_delivered_seqno, e2.highest_delivered_seqno);
171                 Entry r=new Entry(0, high);
172                 resultMap.put(address, r);
173             }
174 
175             //any entries left in (this - intersection)?
176             //if yes, add them to result
177             if(intersection.size() != this.senders.keySet().size()) {
178                 Set<Address> thisMinusInteresection=new TreeSet<Address>(this.senders.keySet());
179                 thisMinusInteresection.removeAll(intersection);
180                 for(Address address : thisMinusInteresection) {
181                     resultMap.put(address, new Entry(this.get(address)));
182                 }
183             }
184 
185             //any entries left in (other - intersection)?
186             //if yes, add them to result
187             if(intersection.size() != other.senders.keySet().size()) {
188                 Set<Address> otherMinusInteresection=new TreeSet<Address>(other.senders.keySet());
189                 otherMinusInteresection.removeAll(intersection);
190                 for(Address address : otherMinusInteresection) {
191                     resultMap.put(address, new Entry(other.get(address)));
192                 }
193             }
194             result=new Digest(resultMap);
195         }
196         return result;
197     }
198 
199 
size()200     public int size() {
201         return senders.size();
202     }
203 
204 
lowSeqnoAt(Address sender)205     public long lowSeqnoAt(Address sender) {
206         Entry entry=senders.get(sender);
207         if(entry == null)
208             return -1;
209         else
210             return entry.low_seqno;
211     }
212 
213 
highestDeliveredSeqnoAt(Address sender)214     public long highestDeliveredSeqnoAt(Address sender) {
215         Entry entry=senders.get(sender);
216         if(entry == null)
217             return -1;
218         else
219             return entry.highest_delivered_seqno;
220     }
221 
222 
highestReceivedSeqnoAt(Address sender)223     public long highestReceivedSeqnoAt(Address sender) {
224         Entry entry=senders.get(sender);
225         if(entry == null)
226             return -1;
227         else
228             return entry.highest_received_seqno;
229     }
230 
231 
232     /**
233      * Returns true if all senders of the current digest have their seqnos >= the ones from other
234      * @param other
235      * @return
236      */
isGreaterThanOrEqual(Digest other)237     public boolean isGreaterThanOrEqual(Digest other) {
238         if(other == null)
239             return true;
240         Map<Address,Entry> our_map=getSenders();
241         Address sender;
242         Entry my_entry, their_entry;
243         long my_highest, their_highest;
244         for(Map.Entry<Address,Entry> entry: our_map.entrySet()) {
245             sender=entry.getKey();
246             my_entry=entry.getValue();
247             their_entry=other.get(sender);
248             if(their_entry == null)
249                 continue;
250             my_highest=my_entry.getHighest();
251             their_highest=their_entry.getHighest();
252             if(my_highest < their_highest)
253                 return false;
254         }
255         return true;
256     }
257 
258 
copy()259     public Digest copy() {
260         return new Digest(senders);
261     }
262 
263 
toString()264     public String toString() {
265         StringBuilder sb=new StringBuilder();
266         boolean first=true;
267         if(senders.isEmpty()) return "[]";
268 
269         for(Map.Entry<Address,Entry> entry: senders.entrySet()) {
270             Address key=entry.getKey();
271             Entry val=entry.getValue();
272             if(!first)
273                 sb.append(", ");
274             else
275                 first=false;
276             sb.append(key).append(": ").append('[').append(val.low_seqno).append(" : ");
277             sb.append(val.highest_delivered_seqno);
278             if(val.highest_received_seqno >= 0)
279                 sb.append(" (").append(val.highest_received_seqno).append(")");
280             sb.append("]");
281         }
282         return sb.toString();
283     }
284 
toStringSorted()285     public String toStringSorted() {
286         StringBuilder sb=new StringBuilder();
287         boolean first=true;
288         if(senders.isEmpty()) return "[]";
289 
290         TreeMap<Address,Entry> copy=new TreeMap<Address,Entry>(senders);
291         for(Map.Entry<Address,Entry> entry: copy.entrySet()) {
292             Address key=entry.getKey();
293             Entry val=entry.getValue();
294             if(!first)
295                 sb.append(", ");
296             else
297                 first=false;
298             sb.append(key).append(": ").append('[').append(val.low_seqno).append(" : ");
299             sb.append(val.highest_delivered_seqno);
300             if(val.highest_received_seqno >= 0)
301                 sb.append(" (").append(val.highest_received_seqno).append(")");
302             sb.append("]");
303         }
304         return sb.toString();
305     }
306 
307 
printHighestDeliveredSeqnos()308     public String printHighestDeliveredSeqnos() {
309         StringBuilder sb=new StringBuilder("[");
310         boolean first=true;
311 
312         TreeMap<Address,Entry> copy=new TreeMap<Address,Entry>(senders);
313         for(Map.Entry<Address,Entry> entry: copy.entrySet()) {
314             Address key=entry.getKey();
315             Entry val=entry.getValue();
316             if(!first)
317                 sb.append(", ");
318             else
319                 first=false;
320             sb.append(key).append("#").append(val.highest_delivered_seqno);
321         }
322         sb.append(']');
323         return sb.toString();
324     }
325 
326 
printHighestReceivedSeqnos()327     public String printHighestReceivedSeqnos() {
328         StringBuilder sb=new StringBuilder();
329         boolean first=true;
330 
331         for(Map.Entry<Address,Entry> entry: senders.entrySet()) {
332             Address key=entry.getKey();
333             Entry val=entry.getValue();
334             if(!first)
335                 sb.append(", ");
336             else {
337                 sb.append('[');
338                 first=false;
339             }
340             sb.append(key).append("#").append(val.highest_received_seqno);
341         }
342         sb.append(']');
343         return sb.toString();
344     }
345 
346 
writeExternal(ObjectOutput out)347     public void writeExternal(ObjectOutput out) throws IOException {
348         out.writeObject(senders);
349     }
350 
readExternal(ObjectInput in)351     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
352         Map<Address, Entry> tmp=(Map<Address, Entry>)in.readObject();
353         senders.clear();
354         senders.putAll(tmp);
355     }
356 
writeTo(DataOutputStream out)357     public void writeTo(DataOutputStream out) throws IOException {
358         out.writeShort(senders.size());
359         for(Map.Entry<Address,Entry> entry: senders.entrySet()) {
360             Entry val=entry.getValue();
361             Util.writeAddress(entry.getKey(), out);
362             out.writeLong(val.low_seqno);
363             out.writeLong(val.highest_delivered_seqno);
364             out.writeLong(val.highest_received_seqno);
365         }
366     }
367 
368 
readFrom(DataInputStream in)369     public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
370         short size=in.readShort();
371         Map<Address,Entry> tmp=new HashMap<Address, Entry>(size);
372         Address key;
373         for(int i=0; i < size; i++) {
374             key=Util.readAddress(in);
375             tmp.put(key, new Entry(in.readLong(), in.readLong(), in.readLong()));
376         }
377         senders.clear();
378         senders.putAll(tmp);
379     }
380 
381 
serializedSize()382     public long serializedSize() {
383         long retval=Global.SHORT_SIZE; // number of elements in 'senders'
384         if(!senders.isEmpty()) {
385             Address addr=senders.keySet().iterator().next();
386             int len=Util.size(addr);
387             len+=Entry.SIZE; // 3 longs in one Entry
388             retval+=len * senders.size();
389         }
390         return retval;
391     }
392 
createSenders(int size)393     private static Map<Address, Entry> createSenders(int size) {
394         return new ConcurrentHashMap<Address,Entry>(size);
395     }
396 
createSenders(Map<Address, Entry> map)397     private static Map<Address, Entry> createSenders(Map<Address, Entry> map) {
398         return new ConcurrentHashMap<Address,Entry>(map);
399     }
400 
401 
402 
403     /**
404      * Class keeping track of the lowest and highest sequence numbers delivered, and the highest
405      * sequence numbers received, per member. This class is immutable
406      */
407     @Immutable
408     public static class Entry implements Externalizable, Streamable {
409         private long low_seqno=0;
410         private long highest_delivered_seqno=0; // the highest delivered seqno, e.g. in 1,2,4,5,7 --> 2
411         private long highest_received_seqno=0; //the highest received seqno, e.g. in 1,2,4,5,7 --> 7
412         final static int SIZE=Global.LONG_SIZE * 3;
413         private static final long serialVersionUID=-4468945932249281704L;
414 
Entry()415         public Entry() {
416         }
417 
Entry(long low_seqno, long highest_delivered_seqno, long highest_received_seqno)418         public Entry(long low_seqno, long highest_delivered_seqno, long highest_received_seqno) {
419             this.low_seqno=low_seqno;
420             this.highest_delivered_seqno=highest_delivered_seqno;
421             this.highest_received_seqno=highest_received_seqno;
422             check();
423         }
424 
425 
426 
Entry(long low_seqno, long highest_delivered_seqno)427         public Entry(long low_seqno, long highest_delivered_seqno) {
428             this.low_seqno=low_seqno;
429             this.highest_delivered_seqno=highest_delivered_seqno;
430             check();
431         }
432 
Entry(Entry other)433         public Entry(Entry other) {
434             if(other != null) {
435                 low_seqno=other.low_seqno;
436                 highest_delivered_seqno=other.highest_delivered_seqno;
437                 highest_received_seqno=other.highest_received_seqno;
438                 check();
439             }
440         }
441 
getLow()442         public final long getLow() {return low_seqno;}
getHighestDeliveredSeqno()443         public final long getHighestDeliveredSeqno() {return highest_delivered_seqno;}
getHighestReceivedSeqno()444         public final long getHighestReceivedSeqno() {return highest_received_seqno;}
445 
446         /** Return the max of the highest delivered or highest received seqno */
getHighest()447         public final long getHighest() {return max(highest_delivered_seqno, highest_received_seqno);}
448 
equals(Object obj)449         public boolean equals(Object obj) {
450             if(!(obj instanceof Entry))
451                 return false;
452             Entry other=(Entry)obj;
453             return low_seqno == other.low_seqno && highest_delivered_seqno == other.highest_delivered_seqno && highest_received_seqno == other.highest_received_seqno;
454         }
455 
hashCode()456         public int hashCode() {
457             return (int)(low_seqno + highest_delivered_seqno + highest_received_seqno);
458         }
459 
toString()460         public String toString() {
461             return new StringBuilder("low=").append(low_seqno).append(", highest delivered=").append(highest_delivered_seqno).
462                     append(", highest received=").append(highest_received_seqno).toString();
463         }
464 
writeExternal(ObjectOutput out)465         public void writeExternal(ObjectOutput out) throws IOException {
466             out.writeLong(low_seqno);
467             out.writeLong(highest_delivered_seqno);
468             out.writeLong(highest_received_seqno);
469         }
470 
readExternal(ObjectInput in)471         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
472             low_seqno=in.readLong();
473             highest_delivered_seqno=in.readLong();
474             highest_received_seqno=in.readLong();
475         }
476 
size()477         public static int size() {
478             return SIZE;
479         }
480 
writeTo(DataOutputStream out)481         public void writeTo(DataOutputStream out) throws IOException {
482             out.writeLong(low_seqno);
483             out.writeLong(highest_delivered_seqno);
484             out.writeLong(highest_received_seqno);
485         }
486 
readFrom(DataInputStream in)487         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
488             low_seqno=in.readLong();
489             highest_delivered_seqno=in.readLong();
490             highest_received_seqno=in.readLong();
491         }
492 
493 
check()494         private void check() {
495             if(low_seqno > highest_delivered_seqno)
496                 throw new IllegalArgumentException("low_seqno (" + low_seqno + ") is greater than highest_delivered_seqno (" + highest_delivered_seqno + ")");
497         }
498 
499 
500     }
501 }
502