package org.jgroups.blocks;


import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.Transport;
import org.jgroups.View;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


/**
 * Sends a request to multiple destinations. Alternative implementation when we have few targets: between
 * UnicastRequest with 1 target and GroupRequest with many destination members. Performance is about the same as for
 * GroupRequest, but this class should use less memory as it doesn't create hashmaps. Don't use with many targets as
 * we have to do a linear search through an array of targets to match a response to a request.<p/>
 * MultiRequest is currently not used
 *
 * @author Bela Ban
 * @since 2.9
 */
public class MultiRequest extends Request {
    /** One slot per target; matched to incoming responses by a linear search on the sender address. */
    @GuardedBy("lock")
    private final Rsp[] responses;

    /** Number of responses required for completion when the response mode is GET_N. */
    protected final int expected_mbrs;

    @GuardedBy("lock")
    int num_received, num_not_received, num_suspected;



    /**
     * Creates a request which is sent to a collection of targets via a request correlator.
     *
     * @param m
     *                The message to be sent
     * @param corr
     *                The request correlator to be used. A request correlator
     *                sends requests tagged with a unique ID and notifies the
     *                sender when matching responses are received. The reason
     *                this class uses it instead of a <code>Transport</code>
     *                is that multiple requests/responses might be
     *                sent/received concurrently.
     * @param mbrs
     *                The initial membership. This value reflects the membership
     *                to which the request is sent (and from which potential
     *                responses are expected).
     * @param options The options to be passed to the request
     * @param expected_mbrs The number of responses to wait for in GET_N mode
     */
    public MultiRequest(Message m, RequestCorrelator corr, Collection<Address> mbrs, RequestOptions options, int expected_mbrs) {
        super(m, corr, null, options);
        this.expected_mbrs=expected_mbrs;
        responses=new Rsp[mbrs.size()];
        setTargets(mbrs);
    }

    /**
     * Creates a request which is sent to a single target via a request correlator.
     *
     * @param m The message to be sent
     * @param corr The request correlator to be used
     * @param target The only member a response is expected from
     * @param options The options to be passed to the request
     * @param expected_mbrs The number of responses to wait for in GET_N mode
     */
    public MultiRequest(Message m, RequestCorrelator corr, Address target, RequestOptions options, int expected_mbrs) {
        super(m, corr, null, options);
        this.expected_mbrs=expected_mbrs;
        responses=new Rsp[1];
        setTarget(target);
    }



    /**
     * Creates a request which is sent to a collection of targets via a <code>Transport</code> (no request
     * correlator is used).
     *
     * @param m The message to be sent
     * @param transport The transport used to send the message
     * @param mbrs The members to which the request is sent and from which responses are expected
     * @param options The options to be passed to the request
     * @param expected_mbrs The number of responses to wait for in GET_N mode
     */
    public MultiRequest(Message m, Transport transport, Collection<Address> mbrs, RequestOptions options, int expected_mbrs) {
        super(m, null, transport, options);
        this.expected_mbrs=expected_mbrs;
        // One slot per target: a fixed size of 1 overflowed in setTargets() as soon as mbrs had 2+ members
        responses=new Rsp[mbrs.size()];
        setTargets(mbrs);
    }

    /** Installs <code>mbr</code> as the single expected responder (slot 0). */
    void setTarget(Address mbr) {
        responses[0]=new Rsp(mbr);
        num_not_received++;
    }

    /** Fills the response array with one empty <code>Rsp</code> per member, in iteration order. */
    void setTargets(Collection<Address> mbrs) {
        int index=0;
        for(Address mbr: mbrs) {
            responses[index++]=new Rsp(mbr);
            num_not_received++;
        }
    }

    public boolean getAnycasting() {
        return options.getAnycasting();
    }

    public void setAnycasting(boolean anycasting) {
        options.setAnycasting(anycasting);
    }



    /**
     * Sends the request to the senders recorded in the response array.
     *
     * @throws Exception if sending fails
     */
    public void sendRequest() throws Exception {
        List<Address> targets=new ArrayList<Address>(responses.length);
        for(Rsp rsp: responses)
            targets.add(rsp.getSender());

        sendRequest(targets, req_id, options);
    }

    /**
     * Linear search for the response slot belonging to <code>target</code>.
     *
     * @return the matching <code>Rsp</code>, or null if <code>target</code> is not one of the targets
     */
    Rsp findResponse(Address target) {
        for(Rsp rsp: responses) {
            if(rsp != null && target.equals(rsp.getSender()))
                return rsp;
        }
        return null;
    }

    /* ---------------------- Interface RspCollector -------------------------- */
    /**
     * <b>Callback</b> (called by RequestCorrelator or Transport).
     * Records a response in the slot of its sender. Responses from unknown senders, and responses arriving after
     * the request is done, are ignored. If a response filter is set, it decides whether the value is accepted and
     * whether more responses are needed; otherwise completion is determined by {@link #responsesComplete()}.
     */
    public void receiveResponse(Object response_value, Address sender) {
        if(done)
            return;
        Rsp rsp=findResponse(sender);
        if(rsp == null)
            return;

        RspFilter rsp_filter=options.getRspFilter();
        boolean responseReceived=false;
        if(!rsp.wasReceived()) {
            if((responseReceived=(rsp_filter == null) || rsp_filter.isAcceptable(response_value, sender)))
                rsp.setValue(response_value);
            rsp.setReceived(responseReceived);
        }

        lock.lock();
        try {
            if(responseReceived)
                num_received++;
            done=rsp_filter == null? responsesComplete() : !rsp_filter.needMoreResponses();
            if(responseReceived || done)
                completed.signalAll(); // wake up waiters (presumably blocked in waitForResults())
            if(done && corr != null)
                corr.done(req_id);
        }
        finally {
            lock.unlock();
        }
        if(responseReceived || done)
            checkCompletion(this);
    }


    /**
     * <b>Callback</b> (called by RequestCorrelator or Transport).
     * Reports to this request that a member is considered faulty (suspected).
     * This method would probably be called when getting a suspect message from a failure detector
     * (where available). The member's slot is flagged as suspected and its value is cleared.
     */
    public void suspect(Address suspected_member) {
        if(suspected_member == null)
            return;

        boolean changed=false;
        Rsp rsp=findResponse(suspected_member);
        if(rsp != null) {
            if(rsp.setSuspected(true)) {
                rsp.setValue(null);
                changed=true;
                lock.lock();
                try {
                    num_suspected++;
                    completed.signalAll();
                }
                finally {
                    lock.unlock();
                }
            }
        }

        if(changed)
            checkCompletion(this);
    }


    /**
     * Any member of 'membership' that is not in the new view is flagged as
     * SUSPECTED. Any member in the new view that is <em>not</em> in the
     * membership (ie, the set of responses expected for the current RPC) will
     * <em>not</em> be added to it. If we did this we might run into the
     * following problem:
     * <ul>
     * <li>Membership is {A,B}
     * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the
     * invocation handler)
     * <li>C joins while A waits for responses from A and B
     * <li>If this would generate a new view {A,B,C} and if this expanded the
     * response set to {A,B,C}, A would wait forever on C's response because C
     * never received the request in the first place, therefore won't send a
     * response.
     * </ul>
     */
    public void viewChange(View new_view) {
        Collection<Address> mbrs=new_view != null? new_view.getMembers() : null;
        if(mbrs == null)
            return;

        if(responses.length == 0)
            return;

        boolean changed=false;
        lock.lock();
        try {
            for(Rsp rsp: responses) {
                Address mbr=rsp.getSender();
                if(!mbrs.contains(mbr)) {
                    rsp.setValue(null);
                    if(rsp.setSuspected(true)) {
                        num_suspected++;
                        changed=true;
                    }
                }
            }
            if(changed)
                completed.signalAll();
        }
        finally {
            lock.unlock();
        }
        if(changed)
            checkCompletion(this);
    }


    /* -------------------- End of Interface RspCollector ----------------------------------- */



    /** Returns the results as a RspList, keyed by the sender of each response slot */
    public RspList getResults() {
        RspList list=new RspList();
        for(Rsp rsp: responses)
            list.put(rsp.getSender(), rsp);
        return list;
    }



    /** Waits for the results with a timeout argument of 0, then returns them. */
    public RspList get() throws InterruptedException, ExecutionException {
        lock.lock();
        try {
            waitForResults(0);
        }
        finally {
            lock.unlock();
        }
        return getResults();
    }

    /**
     * Waits for the results for at most the given time.
     *
     * @throws TimeoutException if <code>waitForResults()</code> reports failure
     */
    public RspList get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        boolean ok;
        lock.lock();
        try {
            ok=waitForResults(unit.toMillis(timeout));
        }
        finally {
            lock.unlock();
        }
        if(!ok)
            throw new TimeoutException();
        return getResults();
    }

    @Override
    public String toString() {
        StringBuilder ret=new StringBuilder(128);
        ret.append(super.toString());

        lock.lock();
        try {
            if(responses.length > 0) {
                ret.append(", entries:\n");
                for(Rsp rsp: responses) {
                    Address mbr=rsp.getSender();
                    ret.append(mbr).append(": ").append(rsp).append("\n");
                }
            }
        }
        finally {
            lock.unlock();
        }
        return ret.toString();
    }




    /* --------------------------------- Private Methods -------------------------------------*/

    /** Returns <code>i</code> itself for i &lt; 2, otherwise <code>i / 2 + 1</code>. */
    private static int determineMajority(int i) {
        return i < 2? i : (i / 2) + 1;
    }



    /**
     * Sends the request either through the correlator (if set) or directly through the transport. With a
     * transport and anycasting enabled, a copy of the message is sent to each target individually. If sending
     * fails and a correlator is used, the request is deregistered from it before the exception is rethrown.
     */
    private void sendRequest(List<Address> targetMembers, long requestId, RequestOptions options) throws Exception {
        try {
            if(log.isTraceEnabled()) log.trace(new StringBuilder("sending request (id=").append(req_id).append(')'));
            if(corr != null) {
                // In GET_NONE mode no response collector is passed to the correlator
                corr.sendRequest(requestId, targetMembers, request_msg, options.getMode() == GET_NONE? null : this, options);
            }
            else {
                if(options.getAnycasting()) {
                    for(Address mbr: targetMembers) {
                        Message copy=request_msg.copy(true);
                        copy.setDest(mbr);
                        transport.send(copy);
                    }
                }
                else {
                    transport.send(request_msg);
                }
            }
        }
        catch(Exception ex) {
            if(corr != null)
                corr.done(requestId);
            throw ex;
        }
    }


    /**
     * Determines, based on the response mode and the received/suspected counters, whether enough responses
     * have been collected.
     *
     * @return true if the request is already done or the condition of the current mode is satisfied
     */
    @GuardedBy("lock")
    protected boolean responsesComplete() {
        if(done)
            return true;

        final int num_total=responses.length;
        final int majority=determineMajority(num_total);

        switch(options.getMode()) {
            case GET_FIRST:
                if(num_received > 0)
                    return true;
                if(num_suspected >= num_total)
                // e.g. 2 members, and both suspected
                    return true;
                break;
            case GET_ALL:
                return num_received + num_suspected >= num_total;
            case GET_MAJORITY:
                if(num_received + num_suspected >= majority)
                    return true;
                break;
            case GET_ABS_MAJORITY:
                if(num_received >= majority)
                    return true;
                break;
            case GET_N:
                if(expected_mbrs >= num_total) {
                    // More responses requested than there are targets: apply the GET_ALL condition. The previous
                    // code called responsesComplete() again without changing any state, recursing endlessly.
                    return num_received + num_suspected >= num_total;
                }
                // NOTE(review): num_not_received is only ever incremented within this class, so the second
                // clause cannot become true here - confirm whether it should be decremented on receive/suspect.
                return num_received >= expected_mbrs
                  || (num_received + num_not_received < expected_mbrs && num_received + num_suspected >= expected_mbrs);
            case GET_NONE:
                return true;
            default :
                if(log.isErrorEnabled()) log.error("rsp_mode " + options.getMode() + " unknown !");
                break;
        }
        return false;
    }



}