1 package org.jgroups.blocks; 2 3 import org.jgroups.logging.Log; 4 import org.jgroups.logging.LogFactory; 5 import org.jgroups.Address; 6 import org.jgroups.Global; 7 import org.jgroups.Version; 8 import org.jgroups.stack.IpAddress; 9 import org.jgroups.util.*; 10 11 import java.io.*; 12 import java.net.InetAddress; 13 import java.net.ServerSocket; 14 import java.net.Socket; 15 import java.net.SocketException; 16 import java.util.*; 17 import java.util.Map.Entry; 18 import java.util.concurrent.BlockingQueue; 19 import java.util.concurrent.LinkedBlockingQueue; 20 import java.util.concurrent.atomic.AtomicInteger; 21 import java.util.concurrent.locks.Lock; 22 import java.util.concurrent.locks.ReentrantLock; 23 24 /** 25 * Shared class for TCP connection tables. 26 * @author Scott Marlow 27 */ 28 public abstract class BasicConnectionTable { 29 private ThreadFactory factory; 30 final Map<Address,Connection> conns=new HashMap<Address,Connection>(); // keys: Addresses (peer address), values: Connection 31 Receiver receiver=null; 32 boolean use_send_queues=false; // max number of messages in a send queue 33 int send_queue_size=10000; 34 InetAddress bind_addr=null; 35 Address local_addr=null; // bind_addr + port of srv_sock 36 int srv_port=7800; 37 int recv_buf_size=120000; 38 int send_buf_size=60000; 39 final Vector<ConnectionListener> conn_listeners=new Vector<ConnectionListener>(); // listeners to be notified when a conn is established/torn down 40 Reaper reaper=null; // closes conns that have been idle for more than n secs 41 long reaper_interval=60000; // reap unused conns once a minute 42 long conn_expire_time=300000; // connections can be idle for 5 minutes before they are reaped 43 int sock_conn_timeout=1000; // max time in millis to wait for Socket.connect() to return 44 int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address 45 final ThreadGroup thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "ConnectionTable"); 46 protected final Log log= LogFactory.getLog(getClass()); 47 final byte[] cookie={'b', 'e', 'l', 'a'}; 48 boolean use_reaper=false; // by default we don't reap idle conns 49 static final int backlog=20; // 20 conn requests are queued by ServerSocket (addtl will be discarded) 50 volatile ServerSocket srv_sock=null; 51 boolean tcp_nodelay=false; 52 int linger=-1; 53 protected SocketFactory socket_factory=new DefaultSocketFactory(); 54 55 /** 56 * The address which will be broadcast to the group (the externally visible address which this host should 57 * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to. 58 */ 59 InetAddress external_addr=null; 60 int max_port=0; // maximum port to bind to (if < srv_port, no limit) 61 Thread acceptor=null; // continuously calls srv_sock.accept() 62 boolean running=false; 63 /** Total number of Connections created for this connection table */ 64 static AtomicInteger conn_creations=new AtomicInteger(0); 65 66 final static long MAX_JOIN_TIMEOUT=Global.THREAD_SHUTDOWN_WAIT_TIME; 67 68 69 BasicConnectionTable()70 protected BasicConnectionTable() { 71 factory = new DefaultThreadFactory(new ThreadGroup(Util.getGlobalThreadGroup(),"ConnectionTable"),"Connection Table", false); 72 } 73 setReceiver(Receiver r)74 public final void setReceiver(Receiver r) { 75 receiver=r; 76 } 77 addConnectionListener(ConnectionListener l)78 public void addConnectionListener(ConnectionListener l) { 79 if(l != null && !conn_listeners.contains(l)) 80 conn_listeners.addElement(l); 81 } 82 removeConnectionListener(ConnectionListener l)83 public void removeConnectionListener(ConnectionListener l) { 84 if(l != null) conn_listeners.removeElement(l); 85 } 86 getLocalAddress()87 public Address getLocalAddress() { 88 if(local_addr == null) 89 local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null; 90 return local_addr; 91 } 92 getSendBufferSize()93 public int getSendBufferSize() { 94 return send_buf_size; 95 } 96 setSendBufferSize(int send_buf_size)97 public void setSendBufferSize(int send_buf_size) { 98 this.send_buf_size=send_buf_size; 99 } 100 getReceiveBufferSize()101 public int getReceiveBufferSize() { 102 return recv_buf_size; 103 } 104 setReceiveBufferSize(int recv_buf_size)105 public void setReceiveBufferSize(int recv_buf_size) { 106 this.recv_buf_size=recv_buf_size; 107 } 108 getSocketConnectionTimeout()109 public int getSocketConnectionTimeout() { 110 return sock_conn_timeout; 111 } 112 setSocketConnectionTimeout(int sock_conn_timeout)113 public void setSocketConnectionTimeout(int sock_conn_timeout) { 114 this.sock_conn_timeout=sock_conn_timeout; 115 } 116 getPeerAddressReadTimeout()117 public int getPeerAddressReadTimeout() { 118 return peer_addr_read_timeout; 119 } 120 setPeerAddressReadTimeout(int peer_addr_read_timeout)121 public void setPeerAddressReadTimeout(int peer_addr_read_timeout) { 122 this.peer_addr_read_timeout=peer_addr_read_timeout; 123 } 124 getNumConnections()125 public int getNumConnections() { 126 return conns.size(); 127 } 128 getNumberOfConnectionCreations()129 public static int getNumberOfConnectionCreations() { 130 return conn_creations.intValue(); 131 } 132 getTcpNodelay()133 public boolean getTcpNodelay() { 134 return tcp_nodelay; 135 } 136 setTcpNodelay(boolean tcp_nodelay)137 public void setTcpNodelay(boolean tcp_nodelay) { 138 this.tcp_nodelay=tcp_nodelay; 139 } 140 getLinger()141 public int getLinger() { 142 return linger; 143 } 144 setLinger(int linger)145 public void setLinger(int linger) { 146 this.linger=linger; 147 } 148 setThreadFactory(ThreadFactory factory)149 public void setThreadFactory(ThreadFactory factory){ 150 this.factory = factory; 151 } 152 getThreadFactory()153 public ThreadFactory getThreadFactory(){ 154 return factory; 155 } 156 getSocketFactory()157 public SocketFactory getSocketFactory() { 158 return socket_factory; 159 } 160 setSocketFactory(SocketFactory socket_factory)161 public void setSocketFactory(SocketFactory socket_factory) { 162 this.socket_factory=socket_factory; 163 } 164 getUseSendQueues()165 public boolean getUseSendQueues() {return use_send_queues;} 166 setUseSendQueues(boolean flag)167 public void setUseSendQueues(boolean flag) {this.use_send_queues=flag;} 168 getSendQueueSize()169 public int getSendQueueSize() { 170 return send_queue_size; 171 } 172 setSendQueueSize(int send_queue_size)173 public void setSendQueueSize(int send_queue_size) { 174 this.send_queue_size=send_queue_size; 175 } 176 start()177 public void start() throws Exception { 178 running=true; 179 } 180 stop()181 public void stop() { 182 running=false; 183 184 // 1. Stop the reaper 185 if(reaper != null) 186 reaper.stop(); 187 188 // 2. close the server socket (this also stops the acceptor thread) 189 if(srv_sock != null) { 190 try { 191 ServerSocket tmp=srv_sock; 192 srv_sock=null; 193 socket_factory.close(tmp); 194 if(acceptor != null) 195 Util.interruptAndWaitToDie(acceptor); 196 } 197 catch(Exception e) { 198 } 199 } 200 201 // 3. then close the connections 202 Collection<Connection> connsCopy=null; 203 synchronized(conns) { 204 connsCopy=new LinkedList<Connection>(conns.values()); 205 conns.clear(); 206 } 207 for(Connection conn:connsCopy) { 208 conn.destroy(); 209 } 210 connsCopy.clear(); 211 local_addr=null; 212 } 213 214 /** 215 Remove <code>addr</code>from connection table. This is typically triggered when a member is suspected. 216 */ removeConnection(Address addr)217 public void removeConnection(Address addr) { 218 Connection conn; 219 220 synchronized(conns) { 221 conn=conns.remove(addr); 222 } 223 224 if(conn != null) { 225 try { 226 conn.destroy(); // won't do anything if already destroyed 227 } 228 catch(Exception e) { 229 } 230 } 231 if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString()); 232 } 233 234 /** 235 * Calls the receiver callback. We do not serialize access to this method, and it may be called concurrently 236 * by several Connection handler threads. Therefore the receiver needs to be reentrant. 237 */ receive(Address sender, byte[] data, int offset, int length)238 public void receive(Address sender, byte[] data, int offset, int length) { 239 if(receiver != null) { 240 receiver.receive(sender, data, offset, length); 241 } 242 else 243 if(log.isErrorEnabled()) log.error("receiver is null (not set) !"); 244 } 245 toString()246 public String toString() { 247 StringBuilder ret=new StringBuilder(); 248 Address key; 249 Connection val; 250 Entry<Address,Connection> entry; 251 HashMap<Address,Connection> copy; 252 253 synchronized(conns) { 254 copy=new HashMap<Address,Connection>(conns); 255 } 256 ret.append("local_addr=" + local_addr).append("\n"); 257 ret.append("connections (" + copy.size() + "):\n"); 258 for(Iterator<Entry<Address,Connection>> it=copy.entrySet().iterator(); it.hasNext();) { 259 entry=it.next(); 260 key=entry.getKey(); 261 val=entry.getValue(); 262 ret.append(key + ": " + val + '\n'); 263 } 264 ret.append('\n'); 265 return ret.toString(); 266 } 267 notifyConnectionOpened(Address peer)268 void notifyConnectionOpened(Address peer) { 269 if(peer == null) return; 270 for(int i=0; i < conn_listeners.size(); i++) 271 conn_listeners.elementAt(i).connectionOpened(peer); 272 } 273 notifyConnectionClosed(Address peer)274 void notifyConnectionClosed(Address peer) { 275 if(peer == null) return; 276 for(int i=0; i < conn_listeners.size(); i++) 277 conn_listeners.elementAt(i).connectionClosed(peer); 278 } 279 addConnection(Address peer, Connection c)280 void addConnection(Address peer, Connection c) { 281 synchronized (conns) { 282 conns.put(peer, c); 283 } 284 if(reaper != null && !reaper.isRunning()) 285 reaper.start(); 286 } 287 send(Address dest, byte[] data, int offset, int length)288 public void send(Address dest, byte[] data, int offset, int length) throws Exception { 289 Connection conn; 290 if(dest == null) { 291 if(log.isErrorEnabled()) 292 log.error("destination is null"); 293 return; 294 } 295 296 if(data == null) { 297 log.warn("data is null; discarding packet"); 298 return; 299 } 300 301 if(!running) { 302 if(log.isWarnEnabled()) 303 log.warn("connection table is not running, discarding message to " + dest); 304 return; 305 } 306 307 if(dest.equals(local_addr)) { 308 receive(local_addr, data, offset, length); 309 return; 310 } 311 312 // 1. Try to obtain correct Connection (or create one if not yet existent) 313 try { 314 conn=getConnection(dest); 315 if(conn == null) return; 316 } 317 catch(Throwable ex) { 318 throw new Exception("connection to " + dest + " could not be established", ex); 319 } 320 321 // 2. Send the message using that connection 322 try { 323 conn.send(data, offset, length); 324 } 325 catch(Throwable ex) { 326 if(log.isTraceEnabled()) 327 log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table", ex); 328 removeConnection(dest); 329 } 330 } 331 getConnection(Address dest)332 abstract Connection getConnection(Address dest) throws Exception; 333 334 /** 335 * Removes all connections from ConnectionTable which are not in current_mbrs 336 * @param current_mbrs 337 */ retainAll(Collection<Address> current_mbrs)338 public void retainAll(Collection<Address> current_mbrs) { 339 if(current_mbrs == null) return; 340 HashMap<Address,Connection> copy; 341 synchronized(conns) { 342 copy=new HashMap<Address,Connection>(conns); 343 conns.keySet().retainAll(current_mbrs); 344 } 345 copy.keySet().removeAll(current_mbrs); 346 347 //destroy orphaned connection i.e. connections 348 //to members that are not in current view 349 for(Connection orphanConnection:copy.values()){ 350 if (log.isTraceEnabled()) 351 log.trace("At " + local_addr + " destroying orphan to " 352 + orphanConnection.getPeerAddress()); 353 orphanConnection.destroy(); 354 } 355 copy.clear(); 356 } 357 358 359 360 /** Used for message reception. */ 361 public interface Receiver { receive(Address sender, byte[] data, int offset, int length)362 void receive(Address sender, byte[] data, int offset, int length); 363 } 364 365 /** Used to be notified about connection establishment and teardown. */ 366 public interface ConnectionListener { connectionOpened(Address peer_addr)367 void connectionOpened(Address peer_addr); connectionClosed(Address peer_addr)368 void connectionClosed(Address peer_addr); 369 } 370 371 class Connection implements Runnable { 372 Socket sock=null; // socket to/from peer (result of srv_sock.accept() or new Socket()) 373 String sock_addr=null; // used for Thread.getName() 374 DataOutputStream out=null; // for sending messages 375 DataInputStream in=null; // for receiving messages 376 Thread receiverThread=null; // thread for receiving messages 377 Address peer_addr=null; // address of the 'other end' of the connection 378 final Lock send_lock=new ReentrantLock(); // serialize send() 379 long last_access=System.currentTimeMillis(); // last time a message was sent or received 380 381 /** Bounded queue of data to be sent to the peer of this connection */ 382 BlockingQueue<byte[]> send_queue=null; 383 Sender sender=null; 384 boolean is_running=false; 385 386 getSockAddress()387 private String getSockAddress() { 388 if(sock_addr != null) 389 return sock_addr; 390 if(sock != null) { 391 StringBuilder sb; 392 sb=new StringBuilder(); 393 sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort()); 394 sb.append(" - ").append(sock.getInetAddress().getHostAddress()).append(':').append(sock.getPort()); 395 sock_addr=sb.toString(); 396 } 397 return sock_addr; 398 } 399 400 401 402 Connection(Socket s, Address peer_addr)403 Connection(Socket s, Address peer_addr) { 404 sock=s; 405 this.peer_addr=peer_addr; 406 407 if(use_send_queues) { 408 send_queue=new LinkedBlockingQueue<byte[]>(send_queue_size); 409 sender=new Sender(); 410 } 411 412 try { 413 // out=new DataOutputStream(sock.getOutputStream()); 414 // in=new DataInputStream(sock.getInputStream()); 415 416 // The change to buffered input and output stream yielded a 400% performance gain ! 417 // bela Sept 7 2006 418 out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream())); 419 in=new DataInputStream(new BufferedInputStream(sock.getInputStream())); 420 if(sender != null) 421 sender.start(); 422 conn_creations.incrementAndGet(); 423 } 424 catch(Exception ex) { 425 if(log.isErrorEnabled()) log.error("exception is " + ex); 426 } 427 } 428 429 established()430 boolean established() { 431 return receiverThread != null; 432 } 433 434 setPeerAddress(Address peer_addr)435 void setPeerAddress(Address peer_addr) { 436 this.peer_addr=peer_addr; 437 } 438 getPeerAddress()439 Address getPeerAddress() {return peer_addr;} 440 updateLastAccessed()441 void updateLastAccessed() { 442 last_access=System.currentTimeMillis(); 443 } 444 init()445 void init() { 446 is_running=true; 447 if(receiverThread == null || !receiverThread.isAlive()) { 448 // Roland Kurmann 4/7/2003, put in thread_group 449 receiverThread=getThreadFactory().newThread(thread_group,this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]"); 450 receiverThread.start(); 451 if(log.isTraceEnabled()) 452 log.trace("receiver started: " + receiverThread); 453 } 454 455 } 456 457 /** 458 * Returns true if underlying socket to peer is closed 459 * 460 * @return 461 */ isSocketClosed()462 boolean isSocketClosed() { 463 return !(sock != null && sock.isConnected()); 464 } 465 466 destroy()467 void destroy() { 468 if(log.isTraceEnabled()) log.trace("destroyed " + this); 469 is_running=false; 470 closeSocket(); // should terminate handler as well 471 if(sender != null) 472 sender.stop(); 473 Thread tmp=receiverThread; 474 receiverThread=null; 475 if(tmp != null) { 476 Util.interruptAndWaitToDie(tmp); 477 } 478 479 conn_creations.decrementAndGet(); 480 } 481 482 483 /** 484 * 485 * @param data Guaranteed to be non null 486 * @param offset 487 * @param length 488 */ send(byte[] data, int offset, int length)489 void send(byte[] data, int offset, int length) { 490 if(!is_running) { 491 if(log.isWarnEnabled()) 492 log.warn("Connection is not running, discarding message"); 493 return; 494 } 495 if(use_send_queues) { 496 try { 497 // we need to copy the byte[] buffer here because the original buffer might get changed meanwhile 498 byte[] tmp=new byte[length]; 499 System.arraycopy(data, offset, tmp, 0, length); 500 send_queue.put(tmp); 501 } 502 catch(InterruptedException e) { 503 Thread.currentThread().interrupt(); 504 } 505 } 506 else 507 _send(data, offset, length, true); 508 } 509 510 511 /** 512 * Sends data using the 'out' output stream of the socket 513 * @param data 514 * @param offset 515 * @param length 516 * @param acquire_lock 517 */ _send(byte[] data, int offset, int length, boolean acquire_lock)518 private void _send(byte[] data, int offset, int length, boolean acquire_lock) { 519 if(acquire_lock) 520 send_lock.lock(); 521 522 try { 523 doSend(data, offset, length); 524 updateLastAccessed(); 525 } 526 catch(InterruptedException iex) { 527 Thread.currentThread().interrupt(); // set interrupt flag again 528 } 529 catch(Throwable ex) { 530 if(log.isErrorEnabled()) log.error("failed sending data to " + peer_addr + ": " + ex); 531 } 532 finally { 533 if(acquire_lock) 534 send_lock.unlock(); 535 } 536 } 537 538 doSend(byte[] data, int offset, int length)539 void doSend(byte[] data, int offset, int length) throws Exception { 540 try { 541 // we're using 'double-writes', sending the buffer to the destination in 2 pieces. this would 542 // ensure that, if the peer closed the connection while we were idle, we would get an exception. 543 // this won't happen if we use a single write (see Stevens, ch. 5.13). 544 if(out != null) { 545 out.writeInt(length); // write the length of the data buffer first 546 Util.doubleWrite(data, offset, length, out); 547 out.flush(); // may not be very efficient (but safe) 548 } 549 } 550 catch(Exception ex) { 551 removeConnection(peer_addr); 552 throw ex; 553 } 554 } 555 556 557 /** 558 * Reads the peer's address. First a cookie has to be sent which has to match my own cookie, otherwise 559 * the connection will be refused 560 */ readPeerAddress(Socket client_sock)561 Address readPeerAddress(Socket client_sock) throws Exception { 562 Address client_peer_addr=null; 563 byte[] input_cookie=new byte[cookie.length]; 564 int client_port=client_sock != null? client_sock.getPort() : 0; 565 short version; 566 InetAddress client_addr=client_sock != null? client_sock.getInetAddress() : null; 567 568 int timeout=client_sock.getSoTimeout(); 569 client_sock.setSoTimeout(peer_addr_read_timeout); 570 571 try { 572 573 if(in != null) { 574 initCookie(input_cookie); 575 576 // read the cookie first 577 in.readFully(input_cookie, 0, input_cookie.length); 578 if(!matchCookie(input_cookie)) 579 throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " + 580 client_peer_addr + " does not match own cookie; terminating connection"); 581 // then read the version 582 version=in.readShort(); 583 584 if(Version.isBinaryCompatible(version) == false) { 585 if(log.isWarnEnabled()) 586 log.warn(new StringBuilder("packet from ").append(client_addr).append(':').append(client_port). 587 append(" has different version (").append(Version.print(version)).append(") from ours ("). 588 append(Version.printVersion()).append("). This may cause problems").toString()); 589 } 590 client_peer_addr=new IpAddress(); 591 client_peer_addr.readFrom(in); 592 593 updateLastAccessed(); 594 } 595 return client_peer_addr; 596 } 597 finally { 598 client_sock.setSoTimeout(timeout); 599 } 600 } 601 602 603 /** 604 * Send the cookie first, then the our port number. If the cookie doesn't match the receiver's cookie, 605 * the receiver will reject the connection and close it. 606 */ sendLocalAddress(Address local_addr)607 void sendLocalAddress(Address local_addr) { 608 if(local_addr == null) { 609 if(log.isWarnEnabled()) log.warn("local_addr is null"); 610 return; 611 } 612 if(out != null) { 613 try { 614 // write the cookie 615 out.write(cookie, 0, cookie.length); 616 617 // write the version 618 out.writeShort(Version.version); 619 local_addr.writeTo(out); 620 out.flush(); // needed ? 621 updateLastAccessed(); 622 } 623 catch(Throwable t) { 624 if(log.isErrorEnabled()) log.error("exception is " + t); 625 } 626 } 627 } 628 629 initCookie(byte[] c)630 void initCookie(byte[] c) { 631 if(c != null) 632 for(int i=0; i < c.length; i++) 633 c[i]=0; 634 } 635 matchCookie(byte[] input)636 boolean matchCookie(byte[] input) { 637 if(input == null || input.length < cookie.length) return false; 638 for(int i=0; i < cookie.length; i++) 639 if(cookie[i] != input[i]) return false; 640 return true; 641 } 642 643 printCookie(byte[] c)644 String printCookie(byte[] c) { 645 if(c == null) return ""; 646 return new String(c); 647 } 648 649 run()650 public void run() { 651 while(receiverThread != null && receiverThread.equals(Thread.currentThread()) && is_running) { 652 try { 653 if(in == null) { 654 if(log.isErrorEnabled()) log.error("input stream is null !"); 655 break; 656 } 657 int len=in.readInt(); 658 byte[] buf=new byte[len]; 659 in.readFully(buf, 0, len); 660 updateLastAccessed(); 661 receive(peer_addr, buf, 0, len); // calls receiver.receive(msg) 662 } 663 catch(OutOfMemoryError mem_ex) { 664 if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection"); 665 break; // continue; 666 } 667 catch(IOException io_ex) { 668 //this is very common occurrence, hence log under trace level 669 if(log.isTraceEnabled()) log.trace("Exception while read blocked for data from peer ", io_ex); 670 notifyConnectionClosed(peer_addr); 671 break; 672 } 673 catch(Throwable e) { 674 if(log.isWarnEnabled()) log.warn("Problem encountered while receiving message from peer " + peer_addr, e); 675 } 676 } 677 if(log.isTraceEnabled()) 678 log.trace("ConnectionTable.Connection.Receiver terminated"); 679 receiverThread=null; 680 closeSocket(); 681 // remove(peer_addr); 682 } 683 684 toString()685 public String toString() { 686 StringBuilder ret=new StringBuilder(); 687 InetAddress local=null, remote=null; 688 String local_str, remote_str; 689 690 Socket tmp_sock=sock; 691 if(tmp_sock == null) 692 ret.append("<null socket>"); 693 else { 694 //since the sock variable gets set to null we want to make 695 //make sure we make it through here without a nullpointer exception 696 local=tmp_sock.getLocalAddress(); 697 remote=tmp_sock.getInetAddress(); 698 local_str=local != null ? Util.shortName(local) : "<null>"; 699 remote_str=remote != null ? Util.shortName(remote) : "<null>"; 700 ret.append('<' + local_str + ':' + tmp_sock.getLocalPort() + 701 " --> " + remote_str + ':' + tmp_sock.getPort() + "> (" + 702 ((System.currentTimeMillis() - last_access) / 1000) + " secs old)"); 703 } 704 tmp_sock=null; 705 706 return ret.toString(); 707 } 708 709 closeSocket()710 void closeSocket() { 711 Util.close(sock); // should actually close in/out (so we don't need to close them explicitly) 712 sock=null; 713 Util.close(out); // flushes data 714 // removed 4/22/2003 (request by Roland Kurmann) 715 // out=null; 716 Util.close(in); 717 } 718 719 720 class Sender implements Runnable { 721 Thread senderThread; 722 private boolean is_it_running=false; 723 start()724 void start() { 725 if(senderThread == null || !senderThread.isAlive()) { 726 senderThread=getThreadFactory().newThread(thread_group,this, "ConnectionTable.Connection.Sender local_addr=" + local_addr + " [" + getSockAddress() + "]"); 727 senderThread.setDaemon(true); 728 is_it_running=true; 729 senderThread.start(); 730 if(log.isTraceEnabled()) 731 log.trace("sender thread started: " + senderThread); 732 } 733 } 734 stop()735 void stop() { 736 is_it_running=false; 737 if(send_queue != null) 738 send_queue.clear(); 739 if(senderThread != null) { 740 Thread tmp=senderThread; 741 senderThread=null; 742 Util.interruptAndWaitToDie(tmp); 743 } 744 } 745 isRunning()746 boolean isRunning() { 747 return is_it_running && senderThread != null; 748 } 749 run()750 public void run() { 751 byte[] data; 752 while(senderThread != null && senderThread.equals(Thread.currentThread()) && is_it_running) { 753 try { 754 data=send_queue.take(); 755 if(data == null) 756 continue; 757 // we don't need to serialize access to 'out' as we're the only thread sending messages 758 _send(data, 0, data.length, false); 759 } 760 catch(InterruptedException e) { 761 ; 762 } 763 } 764 is_it_running=false; 765 if(log.isTraceEnabled()) 766 log.trace("ConnectionTable.Connection.Sender thread terminated"); 767 } 768 } 769 770 771 } 772 773 class Reaper implements Runnable { 774 Thread t=null; 775 Reaper()776 Reaper() { 777 ; 778 } 779 780 // return true if we have zero connections haveZeroConnections()781 private boolean haveZeroConnections() { 782 synchronized(conns) { 783 return conns.isEmpty(); 784 } 785 } 786 start()787 public void start() { 788 789 if(haveZeroConnections()) 790 return; 791 if(t != null && !t.isAlive()) 792 t=null; 793 if(t == null) { 794 //RKU 7.4.2003, put in threadgroup 795 t=getThreadFactory().newThread(thread_group, this, "ConnectionTable.ReaperThread"); 796 t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons 797 t.start(); 798 } 799 } 800 stop()801 public void stop() { 802 Thread tmp=t; 803 if(t != null) 804 t=null; 805 if(tmp != null) { 806 Util.interruptAndWaitToDie(tmp); 807 } 808 } 809 810 isRunning()811 public boolean isRunning() { 812 return t != null; 813 } 814 run()815 public void run() { 816 Connection connection; 817 Entry<Address,Connection> entry; 818 long curr_time; 819 820 if(log.isDebugEnabled()) log.debug("connection reaper thread was started. Number of connections=" + 821 conns.size() + ", reaper_interval=" + reaper_interval + ", conn_expire_time=" + 822 conn_expire_time); 823 824 while(!haveZeroConnections() && t != null && t.equals(Thread.currentThread())) { 825 Util.sleep(reaper_interval); 826 if(t == null || !Thread.currentThread().equals(t)) 827 break; 828 synchronized(conns) { 829 curr_time=System.currentTimeMillis(); 830 for(Iterator<Entry<Address,Connection>> it=conns.entrySet().iterator(); it.hasNext();) { 831 entry=it.next(); 832 connection=entry.getValue(); 833 if(log.isTraceEnabled()) log.trace("connection is " + 834 ((curr_time - connection.last_access) / 1000) + " seconds old (curr-time=" + 835 curr_time + ", last_access=" + connection.last_access + ')'); 836 if(connection.last_access + conn_expire_time < curr_time) { 837 if(log.isTraceEnabled()) log.trace("connection " + connection + 838 " has been idle for too long (conn_expire_time=" + conn_expire_time + 839 "), will be removed"); 840 connection.destroy(); 841 it.remove(); 842 } 843 } 844 } 845 } 846 if(log.isDebugEnabled()) log.debug("reaper terminated"); 847 t=null; 848 } 849 } 850 } 851