1 package org.jgroups.blocks;
2 
3 
4 import org.jgroups.Address;
5 import org.jgroups.Message;
6 import org.jgroups.Transport;
7 import org.jgroups.View;
8 import org.jgroups.annotations.GuardedBy;
9 import org.jgroups.util.Rsp;
10 
11 import java.util.Collection;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException;
15 
16 
17 /**
18  * Sends a request to a single target destination
19  *
20  * @author Bela Ban
21  */
22 public class UnicastRequest<T> extends Request {
23     protected final Rsp<T>     result;
24     protected final Address    target;
25 
26 
27 
28     /**
29      @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
30      (e.g. if a suspicion service is available; timeouts are not needed).
31      */
UnicastRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options)32     public UnicastRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options) {
33         super(m, corr, null, options);
34         this.target=target;
35         result=new Rsp(target);
36     }
37 
38 
39     /**
40      * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
41      *                       (e.g. if a suspicion service is available; timeouts are not needed).
42      */
UnicastRequest(Message m, Transport transport, Address target, RequestOptions options)43     public UnicastRequest(Message m, Transport transport, Address target, RequestOptions options) {
44         super(m, null, transport, options);
45         this.target=target;
46         result=new Rsp(target);
47     }
48 
49 
sendRequest()50     protected void sendRequest() throws Exception {
51         try {
52             if(log.isTraceEnabled()) log.trace(new StringBuilder("sending request (id=").append(req_id).append(')'));
53             if(corr != null) {
54                 corr.sendUnicastRequest(req_id, target, request_msg, options.getMode() == GET_NONE? null : this);
55             }
56             else {
57                 transport.send(request_msg);
58             }
59         }
60         catch(Exception ex) {
61             if(corr != null)
62                 corr.done(req_id);
63             throw ex;
64         }
65     }
66 
67 
68     /* ---------------------- Interface RspCollector -------------------------- */
69     /**
70      * <b>Callback</b> (called by RequestCorrelator or Transport).
71      * Adds a response to the response table. When all responses have been received,
72      * <code>execute()</code> returns.
73      */
receiveResponse(Object response_value, Address sender)74     public void receiveResponse(Object response_value, Address sender) {
75         RspFilter rsp_filter=options.getRspFilter();
76 
77         lock.lock();
78         try {
79             if(done)
80                 return;
81             if(!result.wasReceived()) {
82                 boolean responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender);
83                 result.setValue((T)response_value);
84                 result.setReceived(responseReceived);
85                 if(log.isTraceEnabled())
86                     log.trace(new StringBuilder("received response for request ").append(req_id)
87                             .append(", sender=").append(sender).append(", val=").append(response_value));
88             }
89             done=rsp_filter == null? responsesComplete() : !rsp_filter.needMoreResponses();
90             if(done && corr != null)
91                 corr.done(req_id);
92         }
93         finally {
94             completed.signalAll(); // wakes up execute()
95             lock.unlock();
96         }
97         checkCompletion(this);
98     }
99 
100 
101     /**
102      * <b>Callback</b> (called by RequestCorrelator or Transport).
103      * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected).
104      * This method would probably be called when getting a suspect message from a failure detector
105      * (where available). It is used to exclude faulty members from the response list.
106      */
suspect(Address suspected_member)107     public void suspect(Address suspected_member) {
108         if(suspected_member == null || !suspected_member.equals(target))
109             return;
110 
111         lock.lock();
112         try {
113             if(done)
114                 return;
115             if(result != null && !result.wasReceived())
116                 result.setSuspected(true);
117             done=true;
118             if(corr != null)
119                 corr.done(req_id);
120             completed.signalAll();
121         }
122         finally {
123             lock.unlock();
124         }
125         checkCompletion(this);
126     }
127 
128 
129     /**
130      * If the target address is not a member of the new view, we'll mark the response as not received and unblock
131      * the caller of execute()
132      */
viewChange(View new_view)133     public void viewChange(View new_view) {
134         Collection<Address> mbrs=new_view != null? new_view.getMembers() : null;
135         if(mbrs == null)
136             return;
137 
138         lock.lock();
139         try {
140             if(!mbrs.contains(target)) {
141                 result.setReceived(false);
142                 done=true;
143                 if(corr != null)
144                     corr.done(req_id);
145                 completed.signalAll();
146             }
147         }
148         finally {
149             lock.unlock();
150         }
151 
152         checkCompletion(this);
153     }
154 
155 
156     /* -------------------- End of Interface RspCollector ----------------------------------- */
157 
getResult()158     public Rsp getResult() {
159         return result;
160     }
161 
162 
get()163     public T get() throws InterruptedException, ExecutionException {
164         lock.lock();
165         try {
166             waitForResults(0);
167             return result.getValue();
168         }
169         finally {
170             lock.unlock();
171         }
172     }
173 
get(long timeout, TimeUnit unit)174     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
175         boolean ok;
176         lock.lock();
177         try {
178             ok=waitForResults(unit.toMillis(timeout));
179         }
180         finally {
181             lock.unlock();
182         }
183         if(!ok)
184             throw new TimeoutException();
185         return result.getValue();
186     }
187 
toString()188     public String toString() {
189         StringBuilder ret=new StringBuilder(128);
190         ret.append(super.toString());
191         ret.append(", target=" + target);
192         return ret.toString();
193     }
194 
195 
196 
197     @GuardedBy("lock")
responsesComplete()198     protected boolean responsesComplete() {
199         return done || options.getMode() == GET_NONE || result.wasReceived() || result.wasSuspected();
200     }
201 
202 
203 
204 }