1 package org.jgroups.blocks;
2 
3 
4 import org.jgroups.Address;
5 import org.jgroups.Message;
6 import org.jgroups.Transport;
7 import org.jgroups.View;
8 import org.jgroups.annotations.GuardedBy;
9 import org.jgroups.util.Rsp;
10 import org.jgroups.util.RspList;
11 
12 import java.util.*;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.TimeoutException;
16 
17 
18 /**
19  * Sends a request to multiple destinations. Alternative implementation when we have few targets: between UnicastRequest
20  * with 1 target and GroupRequest with many destination members. Performance is about the same as for GroupRequest, but
21  * this class should use less memory as it doesn't create hashmaps. Don't use with many targets as we have to do
22  * a linear search through an array of targets to match a response to a request.<p/>
23  * MultiRequest is currently not used
24  *
25  * @author Bela Ban
26  * @since 2.9
27  */
28 public class MultiRequest extends Request {
29     @GuardedBy("lock")
30     private final Rsp[] responses;
31 
32     protected final int expected_mbrs;
33 
34     @GuardedBy("lock")
35     int num_received, num_not_received, num_suspected;
36 
37 
38 
39     /**
40      * @param m
41      *                The message to be sent
42      * @param corr
43      *                The request correlator to be used. A request correlator
44      *                sends requests tagged with a unique ID and notifies the
45      *                sender when matching responses are received. The reason
46      *                <code>GroupRequest</code> uses it instead of a
47      *                <code>Transport</code> is that multiple
48      *                requests/responses might be sent/received concurrently.
49      * @param mbrs
50      *                The initial membership. This value reflects the membership
51      *                to which the request is sent (and from which potential
52      *                responses are expected). Is reset by reset().
53      * @param options The options to be passed to the request
54      */
MultiRequest(Message m, RequestCorrelator corr, Collection<Address> mbrs, RequestOptions options, int expected_mbrs)55     public MultiRequest(Message m, RequestCorrelator corr, Collection<Address> mbrs, RequestOptions options, int expected_mbrs) {
56         super(m, corr, null, options);
57         this.expected_mbrs=expected_mbrs;
58         responses=new Rsp[mbrs.size()];
59         setTargets(mbrs);
60     }
61 
MultiRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options, int expected_mbrs)62     public MultiRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options, int expected_mbrs) {
63         super(m, corr, null, options);
64         this.expected_mbrs=expected_mbrs;
65         responses=new Rsp[1];
66         setTarget(target);
67     }
68 
69 
70 
71     /**
72      * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
73      *                       (e.g. if a suspicion service is available; timeouts are not needed).
74      */
MultiRequest(Message m, Transport transport, Collection<Address> mbrs, RequestOptions options, int expected_mbrs)75     public MultiRequest(Message m, Transport transport, Collection<Address> mbrs, RequestOptions options, int expected_mbrs) {
76         super(m, null, transport, options);
77         this.expected_mbrs=expected_mbrs;
78         responses=new Rsp[1];
79         setTargets(mbrs);
80     }
81 
setTarget(Address mbr)82     void setTarget(Address mbr) {
83         responses[0]=new Rsp(mbr);
84         num_not_received++;
85     }
86 
setTargets(Collection<Address> mbrs)87     void setTargets(Collection<Address> mbrs) {
88         int index=0;
89         for(Address mbr: mbrs) {
90             responses[index++]=new Rsp(mbr);
91             num_not_received++;
92         }
93     }
94 
getAnycasting()95     public boolean getAnycasting() {
96         return options.getAnycasting();
97     }
98 
setAnycasting(boolean anycasting)99     public void setAnycasting(boolean anycasting) {
100         options.setAnycasting(anycasting);
101     }
102 
103 
104 
sendRequest()105     public void sendRequest() throws Exception {
106         List<Address> targets=null;
107         targets=new ArrayList<Address>(responses.length);
108         for(Rsp rsp: responses)
109             targets.add(rsp.getSender());
110 
111         sendRequest(targets, req_id, options);
112     }
113 
findResponse(Address target)114     Rsp findResponse(Address target) {
115         for(Rsp rsp: responses) {
116             if(rsp != null && target.equals(rsp.getSender()))
117                 return rsp;
118         }
119         return null;
120     }
121 
122     /* ---------------------- Interface RspCollector -------------------------- */
123     /**
124      * <b>Callback</b> (called by RequestCorrelator or Transport).
125      * Adds a response to the response table. When all responses have been received,
126      * <code>execute()</code> returns.
127      */
receiveResponse(Object response_value, Address sender)128     public void receiveResponse(Object response_value, Address sender) {
129         if(done)
130             return;
131         Rsp rsp=findResponse(sender);
132         if(rsp == null)
133             return;
134 
135         RspFilter rsp_filter=options.getRspFilter();
136         boolean responseReceived=false;
137         if(!rsp.wasReceived()) {
138             if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender)))
139                 rsp.setValue(response_value);
140             rsp.setReceived(responseReceived);
141         }
142 
143         lock.lock();
144         try {
145             if(responseReceived)
146                 num_received++;
147             done=rsp_filter == null? responsesComplete() : !rsp_filter.needMoreResponses();
148             if(responseReceived || done)
149                 completed.signalAll(); // wakes up execute()
150             if(done && corr != null)
151                 corr.done(req_id);
152         }
153         finally {
154             lock.unlock();
155         }
156         if(responseReceived || done)
157             checkCompletion(this);
158     }
159 
160 
161     /**
162      * <b>Callback</b> (called by RequestCorrelator or Transport).
163      * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected).
164      * This method would probably be called when getting a suspect message from a failure detector
165      * (where available). It is used to exclude faulty members from the response list.
166      */
suspect(Address suspected_member)167      public void suspect(Address suspected_member) {
168         if(suspected_member == null)
169             return;
170 
171         boolean changed=false;
172         Rsp rsp=findResponse(suspected_member);
173         if(rsp !=  null) {
174             if(rsp.setSuspected(true)) {
175                 rsp.setValue(null);
176                 changed=true;
177                 lock.lock();
178                 try {
179                     num_suspected++;
180                     completed.signalAll();
181                 }
182                 finally {
183                     lock.unlock();
184                 }
185             }
186         }
187 
188         if(changed)
189             checkCompletion(this);
190     }
191 
192 
193     /**
194      * Any member of 'membership' that is not in the new view is flagged as
195      * SUSPECTED. Any member in the new view that is <em>not</em> in the
196      * membership (ie, the set of responses expected for the current RPC) will
197      * <em>not</em> be added to it. If we did this we might run into the
198      * following problem:
199      * <ul>
200      * <li>Membership is {A,B}
201      * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the
202      * invocation handler)
203      * <li>C joins while A waits for responses from A and B
204      * <li>If this would generate a new view {A,B,C} and if this expanded the
205      * response set to {A,B,C}, A would wait forever on C's response because C
206      * never received the request in the first place, therefore won't send a
207      * response.
208      * </ul>
209      */
viewChange(View new_view)210      public void viewChange(View new_view) {
211         Vector<Address> mbrs=new_view != null? new_view.getMembers() : null;
212         if(mbrs == null)
213             return;
214 
215         boolean changed=false;
216         if(responses == null || responses.length == 0)
217                 return;
218 
219         lock.lock();
220         try {
221             for(Rsp rsp: responses) {
222                 Address mbr=rsp.getSender();
223                 if(!mbrs.contains(mbr)) {
224                     rsp.setValue(null);
225                     if(rsp.setSuspected(true)) {
226                         num_suspected++;
227                         changed=true;
228                     }
229                 }
230             }
231             if(changed)
232                 completed.signalAll();
233         }
234         finally {
235             lock.unlock();
236         }
237         if(changed)
238             checkCompletion(this);
239     }
240 
241 
242     /* -------------------- End of Interface RspCollector ----------------------------------- */
243 
244 
245 
246     /** Returns the results as a RspList */
getResults()247     public RspList getResults() {
248         RspList list=new RspList();
249         for(Rsp rsp: responses)
250             list.put(rsp.getSender(), rsp);
251         return list;
252     }
253 
254 
255 
get()256     public RspList get() throws InterruptedException, ExecutionException {
257         lock.lock();
258         try {
259             waitForResults(0);
260         }
261         finally {
262             lock.unlock();
263         }
264         return getResults();
265     }
266 
get(long timeout, TimeUnit unit)267     public RspList get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
268         boolean ok;
269         lock.lock();
270         try {
271             ok=waitForResults(unit.toMillis(timeout));
272         }
273         finally {
274             lock.unlock();
275         }
276         if(!ok)
277             throw new TimeoutException();
278         return getResults();
279     }
280 
toString()281     public String toString() {
282         StringBuilder ret=new StringBuilder(128);
283         ret.append(super.toString());
284 
285         lock.lock();
286         try {
287             if(!(responses.length == 0)) {
288                 ret.append(", entries:\n");
289                 for(Rsp rsp: responses) {
290                     Address mbr=rsp.getSender();
291                     ret.append(mbr).append(": ").append(rsp).append("\n");
292                 }
293             }
294         }
295         finally {
296             lock.unlock();
297         }
298         return ret.toString();
299     }
300 
301 
302 
303 
304     /* --------------------------------- Private Methods -------------------------------------*/
305 
determineMajority(int i)306     private static int determineMajority(int i) {
307         return i < 2? i : (i / 2) + 1;
308     }
309 
310 
311 
312 
sendRequest(List<Address> targetMembers, long requestId, RequestOptions options)313     private void sendRequest(List<Address> targetMembers, long requestId, RequestOptions options) throws Exception {
314         try {
315             if(log.isTraceEnabled()) log.trace(new StringBuilder("sending request (id=").append(req_id).append(')'));
316             if(corr != null) {
317                 corr.sendRequest(requestId, targetMembers, request_msg, options.getMode() == GET_NONE? null : this, options);
318             }
319             else {
320                 if(options.getAnycasting()) {
321                     for(Address mbr: targetMembers) {
322                         Message copy=request_msg.copy(true);
323                         copy.setDest(mbr);
324                         transport.send(copy);
325                     }
326                 }
327                 else {
328                     transport.send(request_msg);
329                 }
330             }
331         }
332         catch(Exception ex) {
333             if(corr != null)
334                 corr.done(requestId);
335             throw ex;
336         }
337     }
338 
339 
340     @GuardedBy("lock")
responsesComplete()341     protected boolean responsesComplete() {
342         if(done)
343             return true;
344 
345         final int num_total=responses.length;
346 
347         switch(options.getMode()) {
348             case GET_FIRST:
349                 if(num_received > 0)
350                     return true;
351                 if(num_suspected >= num_total)
352                 // e.g. 2 members, and both suspected
353                     return true;
354                 break;
355             case GET_ALL:
356                 return num_received + num_suspected >= num_total;
357             case GET_MAJORITY:
358                 int majority=determineMajority(num_total);
359                 if(num_received + num_suspected >= majority)
360                     return true;
361                 break;
362             case GET_ABS_MAJORITY:
363                 majority=determineMajority(num_total);
364                 if(num_received >= majority)
365                     return true;
366                 break;
367             case GET_N:
368                 if(expected_mbrs >= num_total) {
369                     return responsesComplete();
370                 }
371                 return num_received >= expected_mbrs || num_received + num_not_received < expected_mbrs && num_received + num_suspected >= expected_mbrs;
372             case GET_NONE:
373                 return true;
374             default :
375                 if(log.isErrorEnabled()) log.error("rsp_mode " + options.getMode() + " unknown !");
376                 break;
377         }
378         return false;
379     }
380 
381 
382 
383 }