1 package org.jgroups.blocks;
2 
3 import org.jgroups.logging.Log;
4 import org.jgroups.logging.LogFactory;
5 import org.jgroups.Address;
6 import org.jgroups.Global;
7 import org.jgroups.Version;
8 import org.jgroups.stack.IpAddress;
9 import org.jgroups.util.*;
10 
11 import java.io.*;
12 import java.net.InetAddress;
13 import java.net.ServerSocket;
14 import java.net.Socket;
15 import java.net.SocketException;
16 import java.util.*;
17 import java.util.Map.Entry;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.atomic.AtomicInteger;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23 
24 /**
25  * Shared class for TCP connection tables.
26  * @author Scott Marlow
27  */
28 public abstract class BasicConnectionTable {
    /** Factory used to create the receiver, sender and reaper threads; replaceable via setThreadFactory() */
    private ThreadFactory factory;
    final Map<Address,Connection>  conns=new HashMap<Address,Connection>();         // keys: Addresses (peer address), values: Connection. Accesses synchronize on the map itself
    Receiver              receiver=null;               // callback which receive() hands incoming data to
    boolean               use_send_queues=false;       // if true, a new Connection gets a bounded send queue plus a Sender thread
    int                   send_queue_size=10000;       // max number of messages in a send queue
    InetAddress           bind_addr=null;
    Address               local_addr=null;             // bind_addr + port of srv_sock
    int                   srv_port=7800;
    int                   recv_buf_size=120000;
    int                   send_buf_size=60000;
    final Vector<ConnectionListener>        conn_listeners=new Vector<ConnectionListener>(); // listeners to be notified when a conn is established/torn down
    Reaper                reaper=null;                 // closes conns that have been idle for more than n secs
    long                  reaper_interval=60000;       // reap unused conns once a minute
    long                  conn_expire_time=300000;     // connections can be idle for 5 minutes before they are reaped
    int                   sock_conn_timeout=1000;      // max time in millis to wait for Socket.connect() to return
    int                   peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address
    final ThreadGroup     thread_group=new ThreadGroup(Util.getGlobalThreadGroup(), "ConnectionTable"); // group passed to the thread factory for all threads created in this class
    protected final Log   log= LogFactory.getLog(getClass());
    final byte[]          cookie={'b', 'e', 'l', 'a'}; // sent before the local address when a connection is set up; the reading side must see the same bytes
    boolean               use_reaper=false;            // by default we don't reap idle conns
    static final int      backlog=20;                  // 20 conn requests are queued by ServerSocket (addtl will be discarded)
    volatile ServerSocket srv_sock=null;               // set to null by stop() before the socket is closed
    boolean               tcp_nodelay=false;
    int                   linger=-1;
    protected SocketFactory socket_factory=new DefaultSocketFactory();

   /**
    * The address which will be broadcast to the group (the externally visible address which this host should
    * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
    */
    InetAddress         external_addr=null;
    int                 max_port=0;                   // maximum port to bind to (if < srv_port, no limit)
    Thread              acceptor=null;               // continuously calls srv_sock.accept()
    boolean             running=false;               // set by start(), cleared by stop(); send() discards messages while false
    /**
     * Number of Connections created for this connection table. Incremented when a Connection is successfully
     * constructed and decremented by Connection.destroy(). Static, i.e. shared by all connection tables.
     */
    static AtomicInteger conn_creations=new AtomicInteger(0);

    final static long   MAX_JOIN_TIMEOUT=Global.THREAD_SHUTDOWN_WAIT_TIME;
67 
68 
69 
BasicConnectionTable()70     protected BasicConnectionTable() {
71         factory = new DefaultThreadFactory(new ThreadGroup(Util.getGlobalThreadGroup(),"ConnectionTable"),"Connection Table", false);
72     }
73 
setReceiver(Receiver r)74     public final void setReceiver(Receiver r) {
75         receiver=r;
76     }
77 
addConnectionListener(ConnectionListener l)78     public void addConnectionListener(ConnectionListener l) {
79         if(l != null && !conn_listeners.contains(l))
80             conn_listeners.addElement(l);
81     }
82 
removeConnectionListener(ConnectionListener l)83     public void removeConnectionListener(ConnectionListener l) {
84         if(l != null) conn_listeners.removeElement(l);
85     }
86 
getLocalAddress()87     public Address getLocalAddress() {
88         if(local_addr == null)
89             local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null;
90         return local_addr;
91     }
92 
getSendBufferSize()93     public int getSendBufferSize() {
94         return send_buf_size;
95     }
96 
setSendBufferSize(int send_buf_size)97     public void setSendBufferSize(int send_buf_size) {
98         this.send_buf_size=send_buf_size;
99     }
100 
getReceiveBufferSize()101     public int getReceiveBufferSize() {
102         return recv_buf_size;
103     }
104 
setReceiveBufferSize(int recv_buf_size)105     public void setReceiveBufferSize(int recv_buf_size) {
106         this.recv_buf_size=recv_buf_size;
107     }
108 
getSocketConnectionTimeout()109     public int getSocketConnectionTimeout() {
110         return sock_conn_timeout;
111     }
112 
setSocketConnectionTimeout(int sock_conn_timeout)113     public void setSocketConnectionTimeout(int sock_conn_timeout) {
114         this.sock_conn_timeout=sock_conn_timeout;
115     }
116 
getPeerAddressReadTimeout()117     public int getPeerAddressReadTimeout() {
118         return peer_addr_read_timeout;
119     }
120 
setPeerAddressReadTimeout(int peer_addr_read_timeout)121     public void setPeerAddressReadTimeout(int peer_addr_read_timeout) {
122         this.peer_addr_read_timeout=peer_addr_read_timeout;
123     }
124 
getNumConnections()125     public int getNumConnections() {
126         return conns.size();
127     }
128 
getNumberOfConnectionCreations()129     public static int getNumberOfConnectionCreations() {
130         return conn_creations.intValue();
131     }
132 
getTcpNodelay()133     public boolean getTcpNodelay() {
134         return tcp_nodelay;
135     }
136 
setTcpNodelay(boolean tcp_nodelay)137     public void setTcpNodelay(boolean tcp_nodelay) {
138         this.tcp_nodelay=tcp_nodelay;
139     }
140 
getLinger()141     public int getLinger() {
142         return linger;
143     }
144 
setLinger(int linger)145     public void setLinger(int linger) {
146         this.linger=linger;
147     }
148 
setThreadFactory(ThreadFactory factory)149     public void setThreadFactory(ThreadFactory factory){
150         this.factory = factory;
151     }
152 
getThreadFactory()153     public ThreadFactory getThreadFactory(){
154         return factory;
155     }
156 
getSocketFactory()157     public SocketFactory getSocketFactory() {
158         return socket_factory;
159     }
160 
setSocketFactory(SocketFactory socket_factory)161     public void setSocketFactory(SocketFactory socket_factory) {
162         this.socket_factory=socket_factory;
163     }
164 
getUseSendQueues()165     public boolean getUseSendQueues() {return use_send_queues;}
166 
setUseSendQueues(boolean flag)167     public void setUseSendQueues(boolean flag) {this.use_send_queues=flag;}
168 
getSendQueueSize()169     public int getSendQueueSize() {
170         return send_queue_size;
171     }
172 
setSendQueueSize(int send_queue_size)173     public void setSendQueueSize(int send_queue_size) {
174         this.send_queue_size=send_queue_size;
175     }
176 
start()177     public void start() throws Exception {
178         running=true;
179     }
180 
stop()181     public void stop() {
182         running=false;
183 
184         // 1. Stop the reaper
185         if(reaper != null)
186             reaper.stop();
187 
188         // 2. close the server socket (this also stops the acceptor thread)
189         if(srv_sock != null) {
190             try {
191                 ServerSocket tmp=srv_sock;
192                 srv_sock=null;
193                 socket_factory.close(tmp);
194                 if(acceptor != null)
195                     Util.interruptAndWaitToDie(acceptor);
196             }
197             catch(Exception e) {
198             }
199         }
200 
201         // 3. then close the connections
202         Collection<Connection> connsCopy=null;
203         synchronized(conns) {
204             connsCopy=new LinkedList<Connection>(conns.values());
205             conns.clear();
206         }
207         for(Connection conn:connsCopy) {
208             conn.destroy();
209         }
210         connsCopy.clear();
211         local_addr=null;
212     }
213 
214     /**
215      Remove <code>addr</code>from connection table. This is typically triggered when a member is suspected.
216      */
removeConnection(Address addr)217     public void removeConnection(Address addr) {
218         Connection conn;
219 
220        synchronized(conns) {
221            conn=conns.remove(addr);
222        }
223 
224        if(conn != null) {
225            try {
226                conn.destroy();  // won't do anything if already destroyed
227            }
228            catch(Exception e) {
229            }
230        }
231        if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
232    }
233 
234    /**
235     * Calls the receiver callback. We do not serialize access to this method, and it may be called concurrently
236     * by several Connection handler threads. Therefore the receiver needs to be reentrant.
237     */
receive(Address sender, byte[] data, int offset, int length)238    public void receive(Address sender, byte[] data, int offset, int length) {
239        if(receiver != null) {
240            receiver.receive(sender, data, offset, length);
241        }
242        else
243            if(log.isErrorEnabled()) log.error("receiver is null (not set) !");
244    }
245 
toString()246    public String toString() {
247        StringBuilder ret=new StringBuilder();
248        Address key;
249        Connection val;
250        Entry<Address,Connection> entry;
251        HashMap<Address,Connection> copy;
252 
253        synchronized(conns) {
254            copy=new HashMap<Address,Connection>(conns);
255        }
256        ret.append("local_addr=" + local_addr).append("\n");
257        ret.append("connections (" + copy.size() + "):\n");
258        for(Iterator<Entry<Address,Connection>> it=copy.entrySet().iterator(); it.hasNext();) {
259            entry=it.next();
260            key=entry.getKey();
261            val=entry.getValue();
262            ret.append(key + ": " + val + '\n');
263        }
264        ret.append('\n');
265        return ret.toString();
266    }
267 
notifyConnectionOpened(Address peer)268    void notifyConnectionOpened(Address peer) {
269        if(peer == null) return;
270        for(int i=0; i < conn_listeners.size(); i++)
271            conn_listeners.elementAt(i).connectionOpened(peer);
272    }
273 
notifyConnectionClosed(Address peer)274    void notifyConnectionClosed(Address peer) {
275        if(peer == null) return;
276        for(int i=0; i < conn_listeners.size(); i++)
277            conn_listeners.elementAt(i).connectionClosed(peer);
278    }
279 
addConnection(Address peer, Connection c)280    void addConnection(Address peer, Connection c) {
281        synchronized (conns) {
282            conns.put(peer, c);
283        }
284        if(reaper != null && !reaper.isRunning())
285            reaper.start();
286    }
287 
send(Address dest, byte[] data, int offset, int length)288    public void send(Address dest, byte[] data, int offset, int length) throws Exception {
289        Connection conn;
290        if(dest == null) {
291            if(log.isErrorEnabled())
292                log.error("destination is null");
293            return;
294        }
295 
296        if(data == null) {
297            log.warn("data is null; discarding packet");
298            return;
299        }
300 
301        if(!running) {
302            if(log.isWarnEnabled())
303                log.warn("connection table is not running, discarding message to " + dest);
304            return;
305        }
306 
307        if(dest.equals(local_addr)) {
308            receive(local_addr, data, offset, length);
309            return;
310        }
311 
312        // 1. Try to obtain correct Connection (or create one if not yet existent)
313        try {
314            conn=getConnection(dest);
315            if(conn == null) return;
316        }
317        catch(Throwable ex) {
318            throw new Exception("connection to " + dest + " could not be established", ex);
319        }
320 
321        // 2. Send the message using that connection
322        try {
323            conn.send(data, offset, length);
324        }
325        catch(Throwable ex) {
326            if(log.isTraceEnabled())
327                log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table", ex);
328            removeConnection(dest);
329        }
330    }
331 
   /**
    * Returns the connection to be used for sending to <code>dest</code>. Called by send(), which returns
    * without sending if the result is null and wraps anything thrown here into an Exception.
    * @param dest the destination address
    * @return the connection to <code>dest</code>, or null
    * @throws Exception if no connection could be obtained
    */
   abstract Connection getConnection(Address dest) throws Exception;
333 
334       /**
335        * Removes all connections from ConnectionTable which are not in current_mbrs
336        * @param current_mbrs
337        */
retainAll(Collection<Address> current_mbrs)338       public void retainAll(Collection<Address> current_mbrs) {
339           if(current_mbrs == null) return;
340           HashMap<Address,Connection> copy;
341           synchronized(conns) {
342               copy=new HashMap<Address,Connection>(conns);
343               conns.keySet().retainAll(current_mbrs);
344           }
345           copy.keySet().removeAll(current_mbrs);
346 
347           //destroy orphaned connection i.e. connections
348           //to members that are not in current view
349           for(Connection orphanConnection:copy.values()){
350               if (log.isTraceEnabled())
351                 log.trace("At " + local_addr + " destroying orphan to "
352                         + orphanConnection.getPeerAddress());
353               orphanConnection.destroy();
354           }
355           copy.clear();
356       }
357 
358 
359 
    /** Used for message reception. */
    public interface Receiver {
       /**
        * Called with data received from a peer. May be invoked concurrently by several connection
        * threads, so implementations need to be reentrant.
        * @param sender the address of the peer which sent the data
        * @param data the buffer holding the data
        * @param offset the offset of the data within the buffer
        * @param length the number of bytes of data
        */
       void receive(Address sender, byte[] data, int offset, int length);
   }
364 
   /** Used to be notified about connection establishment and teardown. */
   public interface ConnectionListener {
       /** Called via notifyConnectionOpened() with the (non-null) address of the peer. */
       void connectionOpened(Address peer_addr);
       /** Called via notifyConnectionClosed() with the (non-null) address of the peer. */
       void connectionClosed(Address peer_addr);
   }
370 
371    class Connection implements Runnable {
       Socket           sock=null;                // socket to/from peer (result of srv_sock.accept() or new Socket()); set to null by closeSocket()
       String           sock_addr=null;           // used for Thread.getName(); computed lazily by getSockAddress()
       DataOutputStream out=null;                 // for sending messages
       DataInputStream  in=null;                  // for receiving messages
       Thread           receiverThread=null;      // thread for receiving messages
       Address          peer_addr=null;           // address of the 'other end' of the connection
       final Lock       send_lock=new ReentrantLock();  // serialize send()
       long             last_access=System.currentTimeMillis(); // last time a message was sent or received

       /** Bounded queue of data to be sent to the peer of this connection; only created if use_send_queues is true at construction time */
       BlockingQueue<byte[]> send_queue=null;
       Sender                sender=null;          // takes messages from send_queue and sends them; only created together with send_queue
       boolean               is_running=false;     // set by init(), cleared by destroy(); send() discards messages while false
385 
386 
getSockAddress()387        private String getSockAddress() {
388            if(sock_addr != null)
389                return sock_addr;
390            if(sock != null) {
391                StringBuilder sb;
392                sb=new StringBuilder();
393                sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort());
394                sb.append(" - ").append(sock.getInetAddress().getHostAddress()).append(':').append(sock.getPort());
395                sock_addr=sb.toString();
396            }
397            return sock_addr;
398        }
399 
400 
401 
402 
Connection(Socket s, Address peer_addr)403        Connection(Socket s, Address peer_addr) {
404            sock=s;
405            this.peer_addr=peer_addr;
406 
407            if(use_send_queues) {
408                send_queue=new LinkedBlockingQueue<byte[]>(send_queue_size);
409                sender=new Sender();
410            }
411 
412            try {
413                // out=new DataOutputStream(sock.getOutputStream());
414                // in=new DataInputStream(sock.getInputStream());
415 
416                // The change to buffered input and output stream yielded a 400% performance gain !
417                // bela Sept 7 2006
418                out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
419                in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
420                if(sender != null)
421                    sender.start();
422                conn_creations.incrementAndGet();
423            }
424            catch(Exception ex) {
425                if(log.isErrorEnabled()) log.error("exception is " + ex);
426            }
427        }
428 
429 
established()430        boolean established() {
431            return receiverThread != null;
432        }
433 
434 
setPeerAddress(Address peer_addr)435        void setPeerAddress(Address peer_addr) {
436            this.peer_addr=peer_addr;
437        }
438 
getPeerAddress()439        Address getPeerAddress() {return peer_addr;}
440 
updateLastAccessed()441        void updateLastAccessed() {
442            last_access=System.currentTimeMillis();
443        }
444 
init()445        void init() {
446            is_running=true;
447            if(receiverThread == null || !receiverThread.isAlive()) {
448                // Roland Kurmann 4/7/2003, put in thread_group
449                receiverThread=getThreadFactory().newThread(thread_group,this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]");
450                receiverThread.start();
451                if(log.isTraceEnabled())
452                    log.trace("receiver started: " + receiverThread);
453            }
454 
455        }
456 
457        /**
458          * Returns true if underlying socket to peer is closed
459          *
460          * @return
461          */
isSocketClosed()462         boolean isSocketClosed() {
463             return !(sock != null && sock.isConnected());
464         }
465 
466 
destroy()467        void destroy() {
468        	   if(log.isTraceEnabled()) log.trace("destroyed " + this);
469            is_running=false;
470            closeSocket(); // should terminate handler as well
471            if(sender != null)
472                sender.stop();
473            Thread tmp=receiverThread;
474            receiverThread=null;
475            if(tmp != null) {
476                Util.interruptAndWaitToDie(tmp);
477            }
478 
479            conn_creations.decrementAndGet();
480        }
481 
482 
483        /**
484         *
485         * @param data Guaranteed to be non null
486         * @param offset
487         * @param length
488         */
send(byte[] data, int offset, int length)489        void send(byte[] data, int offset, int length) {
490            if(!is_running) {
491                if(log.isWarnEnabled())
492                    log.warn("Connection is not running, discarding message");
493                return;
494            }
495            if(use_send_queues) {
496                try {
497                    // we need to copy the byte[] buffer here because the original buffer might get changed meanwhile
498                    byte[] tmp=new byte[length];
499                    System.arraycopy(data, offset, tmp, 0, length);
500                    send_queue.put(tmp);
501                }
502                catch(InterruptedException e) {
503                    Thread.currentThread().interrupt();
504                }
505            }
506            else
507                _send(data, offset, length, true);
508        }
509 
510 
511        /**
512         * Sends data using the 'out' output stream of the socket
513         * @param data
514         * @param offset
515         * @param length
516         * @param acquire_lock
517         */
_send(byte[] data, int offset, int length, boolean acquire_lock)518        private void _send(byte[] data, int offset, int length, boolean acquire_lock) {
519            if(acquire_lock)
520                send_lock.lock();
521 
522            try {
523                doSend(data, offset, length);
524                updateLastAccessed();
525            }
526            catch(InterruptedException iex) {
527                Thread.currentThread().interrupt(); // set interrupt flag again
528            }
529            catch(Throwable ex) {
530                if(log.isErrorEnabled()) log.error("failed sending data to " + peer_addr + ": " + ex);
531            }
532            finally {
533                if(acquire_lock)
534                    send_lock.unlock();
535            }
536        }
537 
538 
doSend(byte[] data, int offset, int length)539        void doSend(byte[] data, int offset, int length) throws Exception {
540            try {
541                // we're using 'double-writes', sending the buffer to the destination in 2 pieces. this would
542                // ensure that, if the peer closed the connection while we were idle, we would get an exception.
543                // this won't happen if we use a single write (see Stevens, ch. 5.13).
544                if(out != null) {
545                    out.writeInt(length); // write the length of the data buffer first
546                    Util.doubleWrite(data, offset, length, out);
547                    out.flush();  // may not be very efficient (but safe)
548                }
549            }
550            catch(Exception ex) {
551                removeConnection(peer_addr);
552                throw ex;
553            }
554        }
555 
556 
557        /**
558         * Reads the peer's address. First a cookie has to be sent which has to match my own cookie, otherwise
559         * the connection will be refused
560         */
readPeerAddress(Socket client_sock)561        Address readPeerAddress(Socket client_sock) throws Exception {
562            Address     client_peer_addr=null;
563            byte[]      input_cookie=new byte[cookie.length];
564            int         client_port=client_sock != null? client_sock.getPort() : 0;
565            short       version;
566            InetAddress client_addr=client_sock != null? client_sock.getInetAddress() : null;
567 
568            int timeout=client_sock.getSoTimeout();
569            client_sock.setSoTimeout(peer_addr_read_timeout);
570 
571            try {
572 
573                if(in != null) {
574                    initCookie(input_cookie);
575 
576                    // read the cookie first
577                    in.readFully(input_cookie, 0, input_cookie.length);
578                    if(!matchCookie(input_cookie))
579                        throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " +
580                                client_peer_addr + " does not match own cookie; terminating connection");
581                    // then read the version
582                    version=in.readShort();
583 
584                    if(Version.isBinaryCompatible(version) == false) {
585                        if(log.isWarnEnabled())
586                            log.warn(new StringBuilder("packet from ").append(client_addr).append(':').append(client_port).
587                                    append(" has different version (").append(Version.print(version)).append(") from ours (").
588                                    append(Version.printVersion()).append("). This may cause problems").toString());
589                    }
590                    client_peer_addr=new IpAddress();
591                    client_peer_addr.readFrom(in);
592 
593                    updateLastAccessed();
594                }
595                return client_peer_addr;
596            }
597            finally {
598                client_sock.setSoTimeout(timeout);
599            }
600        }
601 
602 
603        /**
604         * Send the cookie first, then the our port number. If the cookie doesn't match the receiver's cookie,
605         * the receiver will reject the connection and close it.
606         */
sendLocalAddress(Address local_addr)607        void sendLocalAddress(Address local_addr) {
608            if(local_addr == null) {
609                if(log.isWarnEnabled()) log.warn("local_addr is null");
610                return;
611            }
612            if(out != null) {
613                try {
614                    // write the cookie
615                    out.write(cookie, 0, cookie.length);
616 
617                    // write the version
618                    out.writeShort(Version.version);
619                    local_addr.writeTo(out);
620                    out.flush(); // needed ?
621                    updateLastAccessed();
622                }
623                catch(Throwable t) {
624                    if(log.isErrorEnabled()) log.error("exception is " + t);
625                }
626            }
627        }
628 
629 
initCookie(byte[] c)630        void initCookie(byte[] c) {
631            if(c != null)
632                for(int i=0; i < c.length; i++)
633                    c[i]=0;
634        }
635 
matchCookie(byte[] input)636        boolean matchCookie(byte[] input) {
637            if(input == null || input.length < cookie.length) return false;
638            for(int i=0; i < cookie.length; i++)
639                if(cookie[i] != input[i]) return false;
640            return true;
641        }
642 
643 
printCookie(byte[] c)644        String printCookie(byte[] c) {
645            if(c == null) return "";
646            return new String(c);
647        }
648 
649 
run()650        public void run() {
651            while(receiverThread != null && receiverThread.equals(Thread.currentThread()) && is_running) {
652                try {
653                    if(in == null) {
654                        if(log.isErrorEnabled()) log.error("input stream is null !");
655                        break;
656                    }
657                    int len=in.readInt();
658                    byte[] buf=new byte[len];
659                    in.readFully(buf, 0, len);
660                    updateLastAccessed();
661                    receive(peer_addr, buf, 0, len); // calls receiver.receive(msg)
662                }
663                catch(OutOfMemoryError mem_ex) {
664                    if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection");
665                    break; // continue;
666                }
667                catch(IOException io_ex) {
668                    //this is very common occurrence, hence log under trace level
669                    if(log.isTraceEnabled()) log.trace("Exception while read blocked for data from peer ", io_ex);
670                    notifyConnectionClosed(peer_addr);
671                    break;
672                }
673                catch(Throwable e) {
674                    if(log.isWarnEnabled()) log.warn("Problem encountered while receiving message from peer " + peer_addr, e);
675                }
676            }
677            if(log.isTraceEnabled())
678                log.trace("ConnectionTable.Connection.Receiver terminated");
679            receiverThread=null;
680            closeSocket();
681            // remove(peer_addr);
682        }
683 
684 
toString()685        public String toString() {
686            StringBuilder ret=new StringBuilder();
687            InetAddress local=null, remote=null;
688            String local_str, remote_str;
689 
690            Socket tmp_sock=sock;
691            if(tmp_sock == null)
692                ret.append("<null socket>");
693            else {
694                //since the sock variable gets set to null we want to make
695                //make sure we make it through here without a nullpointer exception
696                local=tmp_sock.getLocalAddress();
697                remote=tmp_sock.getInetAddress();
698                local_str=local != null ? Util.shortName(local) : "<null>";
699                remote_str=remote != null ? Util.shortName(remote) : "<null>";
700                ret.append('<' + local_str + ':' + tmp_sock.getLocalPort() +
701                           " --> " + remote_str + ':' + tmp_sock.getPort() + "> (" +
702                           ((System.currentTimeMillis() - last_access) / 1000) + " secs old)");
703            }
704            tmp_sock=null;
705 
706            return ret.toString();
707        }
708 
709 
closeSocket()710        void closeSocket() {
711            Util.close(sock); // should actually close in/out (so we don't need to close them explicitly)
712            sock=null;
713            Util.close(out);  // flushes data
714            // removed 4/22/2003 (request by Roland Kurmann)
715            // out=null;
716            Util.close(in);
717        }
718 
719 
720        class Sender implements Runnable {
721            Thread senderThread;
722            private boolean is_it_running=false;
723 
start()724            void start() {
725                if(senderThread == null || !senderThread.isAlive()) {
726                    senderThread=getThreadFactory().newThread(thread_group,this, "ConnectionTable.Connection.Sender local_addr=" + local_addr + " [" + getSockAddress() + "]");
727                    senderThread.setDaemon(true);
728                    is_it_running=true;
729                    senderThread.start();
730                    if(log.isTraceEnabled())
731                        log.trace("sender thread started: " + senderThread);
732                }
733            }
734 
stop()735            void stop() {
736                is_it_running=false;
737                if(send_queue != null)
738                    send_queue.clear();
739                if(senderThread != null) {
740                    Thread tmp=senderThread;
741                    senderThread=null;
742                    Util.interruptAndWaitToDie(tmp);
743                }
744            }
745 
isRunning()746            boolean isRunning() {
747                return is_it_running && senderThread != null;
748            }
749 
run()750            public void run() {
751                byte[] data;
752                while(senderThread != null && senderThread.equals(Thread.currentThread()) && is_it_running) {
753                    try {
754                        data=send_queue.take();
755                        if(data == null)
756                            continue;
757                        // we don't need to serialize access to 'out' as we're the only thread sending messages
758                        _send(data, 0, data.length, false);
759                    }
760                    catch(InterruptedException e) {
761                        ;
762                    }
763                }
764                is_it_running=false;
765                if(log.isTraceEnabled())
766                    log.trace("ConnectionTable.Connection.Sender thread terminated");
767            }
768        }
769 
770 
771    }
772 
773    class Reaper implements Runnable {
774        Thread t=null;
775 
Reaper()776        Reaper() {
777            ;
778        }
779 
780        // return true if we have zero connections
haveZeroConnections()781        private boolean haveZeroConnections() {
782            synchronized(conns) {
783                return conns.isEmpty();
784            }
785        }
786 
start()787        public void start() {
788 
789            if(haveZeroConnections())
790                return;
791            if(t != null && !t.isAlive())
792                t=null;
793            if(t == null) {
794                //RKU 7.4.2003, put in threadgroup
795                t=getThreadFactory().newThread(thread_group, this, "ConnectionTable.ReaperThread");
796                t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
797                t.start();
798            }
799        }
800 
stop()801        public void stop() {
802            Thread tmp=t;
803            if(t != null)
804                t=null;
805            if(tmp != null) {
806                Util.interruptAndWaitToDie(tmp);
807            }
808        }
809 
810 
isRunning()811        public boolean isRunning() {
812            return t != null;
813        }
814 
run()815        public void run() {
816            Connection connection;
817            Entry<Address,Connection> entry;
818            long curr_time;
819 
820            if(log.isDebugEnabled()) log.debug("connection reaper thread was started. Number of connections=" +
821                    conns.size() + ", reaper_interval=" + reaper_interval + ", conn_expire_time=" +
822                    conn_expire_time);
823 
824            while(!haveZeroConnections() && t != null && t.equals(Thread.currentThread())) {
825                Util.sleep(reaper_interval);
826                if(t == null || !Thread.currentThread().equals(t))
827                    break;
828                synchronized(conns) {
829                    curr_time=System.currentTimeMillis();
830                    for(Iterator<Entry<Address,Connection>> it=conns.entrySet().iterator(); it.hasNext();) {
831                        entry=it.next();
832                        connection=entry.getValue();
833                        if(log.isTraceEnabled()) log.trace("connection is " +
834                                                         ((curr_time - connection.last_access) / 1000) + " seconds old (curr-time=" +
835                                                         curr_time + ", last_access=" + connection.last_access + ')');
836                        if(connection.last_access + conn_expire_time < curr_time) {
837                            if(log.isTraceEnabled()) log.trace("connection " + connection +
838                                                             " has been idle for too long (conn_expire_time=" + conn_expire_time +
839                                                             "), will be removed");
840                            connection.destroy();
841                            it.remove();
842                        }
843                    }
844                }
845            }
846            if(log.isDebugEnabled()) log.debug("reaper terminated");
847            t=null;
848        }
849    }
850 }
851