1 package org.jgroups.blocks; 2 3 4 import org.jgroups.Address; 5 import org.jgroups.Message; 6 import org.jgroups.Transport; 7 import org.jgroups.View; 8 import org.jgroups.annotations.GuardedBy; 9 import org.jgroups.util.Rsp; 10 11 import java.util.Collection; 12 import java.util.concurrent.ExecutionException; 13 import java.util.concurrent.TimeUnit; 14 import java.util.concurrent.TimeoutException; 15 16 17 /** 18 * Sends a request to a single target destination 19 * 20 * @author Bela Ban 21 */ 22 public class UnicastRequest<T> extends Request { 23 protected final Rsp<T> result; 24 protected final Address target; 25 26 27 28 /** 29 @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely 30 (e.g. if a suspicion service is available; timeouts are not needed). 31 */ UnicastRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options)32 public UnicastRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options) { 33 super(m, corr, null, options); 34 this.target=target; 35 result=new Rsp(target); 36 } 37 38 39 /** 40 * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely 41 * (e.g. if a suspicion service is available; timeouts are not needed). 42 */ UnicastRequest(Message m, Transport transport, Address target, RequestOptions options)43 public UnicastRequest(Message m, Transport transport, Address target, RequestOptions options) { 44 super(m, null, transport, options); 45 this.target=target; 46 result=new Rsp(target); 47 } 48 49 sendRequest()50 protected void sendRequest() throws Exception { 51 try { 52 if(log.isTraceEnabled()) log.trace(new StringBuilder("sending request (id=").append(req_id).append(')')); 53 if(corr != null) { 54 corr.sendUnicastRequest(req_id, target, request_msg, options.getMode() == GET_NONE? null : this); 55 } 56 else { 57 transport.send(request_msg); 58 } 59 } 60 catch(Exception ex) { 61 if(corr != null) 62 corr.done(req_id); 63 throw ex; 64 } 65 } 66 67 68 /* ---------------------- Interface RspCollector -------------------------- */ 69 /** 70 * <b>Callback</b> (called by RequestCorrelator or Transport). 71 * Adds a response to the response table. When all responses have been received, 72 * <code>execute()</code> returns. 73 */ receiveResponse(Object response_value, Address sender)74 public void receiveResponse(Object response_value, Address sender) { 75 RspFilter rsp_filter=options.getRspFilter(); 76 77 lock.lock(); 78 try { 79 if(done) 80 return; 81 if(!result.wasReceived()) { 82 boolean responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender); 83 result.setValue((T)response_value); 84 result.setReceived(responseReceived); 85 if(log.isTraceEnabled()) 86 log.trace(new StringBuilder("received response for request ").append(req_id) 87 .append(", sender=").append(sender).append(", val=").append(response_value)); 88 } 89 done=rsp_filter == null? responsesComplete() : !rsp_filter.needMoreResponses(); 90 if(done && corr != null) 91 corr.done(req_id); 92 } 93 finally { 94 completed.signalAll(); // wakes up execute() 95 lock.unlock(); 96 } 97 checkCompletion(this); 98 } 99 100 101 /** 102 * <b>Callback</b> (called by RequestCorrelator or Transport). 103 * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected). 104 * This method would probably be called when getting a suspect message from a failure detector 105 * (where available). It is used to exclude faulty members from the response list. 106 */ suspect(Address suspected_member)107 public void suspect(Address suspected_member) { 108 if(suspected_member == null || !suspected_member.equals(target)) 109 return; 110 111 lock.lock(); 112 try { 113 if(done) 114 return; 115 if(result != null && !result.wasReceived()) 116 result.setSuspected(true); 117 done=true; 118 if(corr != null) 119 corr.done(req_id); 120 completed.signalAll(); 121 } 122 finally { 123 lock.unlock(); 124 } 125 checkCompletion(this); 126 } 127 128 129 /** 130 * If the target address is not a member of the new view, we'll mark the response as not received and unblock 131 * the caller of execute() 132 */ viewChange(View new_view)133 public void viewChange(View new_view) { 134 Collection<Address> mbrs=new_view != null? new_view.getMembers() : null; 135 if(mbrs == null) 136 return; 137 138 lock.lock(); 139 try { 140 if(!mbrs.contains(target)) { 141 result.setReceived(false); 142 done=true; 143 if(corr != null) 144 corr.done(req_id); 145 completed.signalAll(); 146 } 147 } 148 finally { 149 lock.unlock(); 150 } 151 152 checkCompletion(this); 153 } 154 155 156 /* -------------------- End of Interface RspCollector ----------------------------------- */ 157 getResult()158 public Rsp getResult() { 159 return result; 160 } 161 162 get()163 public T get() throws InterruptedException, ExecutionException { 164 lock.lock(); 165 try { 166 waitForResults(0); 167 return result.getValue(); 168 } 169 finally { 170 lock.unlock(); 171 } 172 } 173 get(long timeout, TimeUnit unit)174 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 175 boolean ok; 176 lock.lock(); 177 try { 178 ok=waitForResults(unit.toMillis(timeout)); 179 } 180 finally { 181 lock.unlock(); 182 } 183 if(!ok) 184 throw new TimeoutException(); 185 return result.getValue(); 186 } 187 toString()188 public String toString() { 189 StringBuilder ret=new StringBuilder(128); 190 ret.append(super.toString()); 191 ret.append(", target=" + target); 192 return ret.toString(); 193 } 194 195 196 197 @GuardedBy("lock") responsesComplete()198 protected boolean responsesComplete() { 199 return done || options.getMode() == GET_NONE || result.wasReceived() || result.wasSuspected(); 200 } 201 202 203 204 }