1 // This file is part of OpenTSDB.
2 // Copyright (C) 2010-2012  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.tsd;
14 
15 import java.io.IOException;
16 import java.nio.channels.ClosedChannelException;
17 import java.util.concurrent.atomic.AtomicInteger;
18 import java.util.concurrent.atomic.AtomicLong;
19 
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelException;
25 import org.jboss.netty.channel.ChannelHandlerContext;
26 import org.jboss.netty.channel.ChannelStateEvent;
27 import org.jboss.netty.channel.ExceptionEvent;
28 import org.jboss.netty.channel.SimpleChannelHandler;
29 import org.jboss.netty.channel.group.DefaultChannelGroup;
30 import org.jboss.netty.handler.codec.embedder.CodecEmbedderException;
31 
32 import net.opentsdb.stats.StatsCollector;
33 
34 /**
35  * Keeps track of all existing connections.
36  */
37 final class ConnectionManager extends SimpleChannelHandler {
38 
39   private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);
40 
41   private static final AtomicLong connections_established = new AtomicLong();
42   private static final AtomicLong connections_rejected = new AtomicLong();
43   private static final AtomicLong exceptions_unknown = new AtomicLong();
44   private static final AtomicLong exceptions_closed = new AtomicLong();
45   private static final AtomicLong exceptions_reset = new AtomicLong();
46   private static final AtomicLong exceptions_timeout = new AtomicLong();
47 
48   /** Max connections can be serviced by tsd, if over limit, tsd will refuse
49    * new connections. */
50   private final int connections_limit;
51 
52   /** A counter used for determining how many channels are open. Something odd
53    * happens with the DefaultChannelGroup in that .size() doesn't return the
54    * actual number of open connections. TODO - find out why. */
55   private final AtomicInteger open_connections = new AtomicInteger();
56 
57   private static final DefaultChannelGroup channels =
58     new DefaultChannelGroup("all-channels");
59 
closeAllConnections()60   static void closeAllConnections() {
61     channels.close().awaitUninterruptibly();
62   }
63 
64   /**
65    * Default Ctor with no concurrent connection limit.
66    */
ConnectionManager()67   public ConnectionManager() {
68     connections_limit = 0;
69   }
70 
71   /**
72    * CTor for setting a limit on concurrent connections.
73    * @param connections_limit The maximum number of concurrent connections allowed.
74    * @since 2.3
75    */
ConnectionManager(final int connections_limit)76   public ConnectionManager(final int connections_limit) {
77     LOG.info("TSD concurrent connection limit set to: " + connections_limit);
78     this.connections_limit = connections_limit;
79   }
80 
81   /**
82    * Collects the stats and metrics tracked by this instance.
83    * @param collector The collector to use.
84    */
collectStats(final StatsCollector collector)85   public static void collectStats(final StatsCollector collector) {
86     collector.record("connectionmgr.connections", channels.size(), "type=open");
87     collector.record("connectionmgr.connections", connections_rejected,
88         "type=rejected");
89     collector.record("connectionmgr.connections", connections_established,
90         "type=total");
91     collector.record("connectionmgr.exceptions", exceptions_closed,
92         "type=closed");
93     collector.record("connectionmgr.exceptions", exceptions_reset,
94         "type=reset");
95     collector.record("connectionmgr.exceptions", exceptions_timeout,
96         "type=timeout");
97     collector.record("connectionmgr.exceptions", exceptions_unknown,
98         "type=unknown");
99   }
100 
101   @Override
channelOpen(final ChannelHandlerContext ctx, final ChannelStateEvent e)102   public void channelOpen(final ChannelHandlerContext ctx,
103                           final ChannelStateEvent e) throws IOException {
104     if (connections_limit > 0) {
105       final int channel_size = open_connections.incrementAndGet();
106       if (channel_size > connections_limit) {
107         throw new ConnectionRefusedException("Channel size (" + channel_size + ") exceeds total "
108             + "connection limit (" + connections_limit + ")");
109         // exceptionCaught will close the connection and increment the counter.
110       }
111     }
112     channels.add(e.getChannel());
113     connections_established.incrementAndGet();
114   }
115 
116   @Override
channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e)117   public void channelClosed(final ChannelHandlerContext ctx,
118                           final ChannelStateEvent e) throws IOException {
119     open_connections.decrementAndGet();
120   }
121 
122   @Override
handleUpstream(final ChannelHandlerContext ctx, final ChannelEvent e)123   public void handleUpstream(final ChannelHandlerContext ctx,
124                              final ChannelEvent e) throws Exception {
125     if (e instanceof ChannelStateEvent) {
126       LOG.info(e.toString());
127     }
128     super.handleUpstream(ctx, e);
129   }
130 
131   @Override
exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e)132   public void exceptionCaught(final ChannelHandlerContext ctx,
133                               final ExceptionEvent e) {
134     final Throwable cause = e.getCause();
135     final Channel chan = ctx.getChannel();
136     if (cause instanceof ClosedChannelException) {
137       exceptions_closed.incrementAndGet();
138       LOG.warn("Attempt to write to closed channel " + chan);
139       return;
140     }
141     if (cause instanceof IOException) {
142       final String message = cause.getMessage();
143       if ("Connection reset by peer".equals(message)) {
144         exceptions_reset.incrementAndGet();
145         return;
146       } else if ("Connection timed out".equals(message)) {
147         exceptions_timeout.incrementAndGet();
148         // Do nothing.  A client disconnecting isn't really our problem.  Oh,
149         // and I'm not kidding you, there's no better way to detect ECONNRESET
150         // in Java.  Like, people have been bitching about errno for years,
151         // and Java managed to do something *far* worse.  That's quite a feat.
152         return;
153       } else if (cause instanceof ConnectionRefusedException) {
154         connections_rejected.incrementAndGet();
155         if (LOG.isDebugEnabled()) {
156           LOG.debug("Refusing connection from " + chan, e.getCause());
157         }
158         chan.close();
159         return;
160       }
161     }
162     if (cause instanceof CodecEmbedderException) {
163     	// payload was not compressed as it was announced to be
164     	LOG.warn("Http codec error : " + cause.getMessage());
165     	e.getChannel().close();
166     	return;
167     }
168     exceptions_unknown.incrementAndGet();
169     LOG.error("Unexpected exception from downstream for " + chan, cause);
170     e.getChannel().close();
171   }
172 
173   /** Simple exception for refusing a connection. */
174   private static class ConnectionRefusedException extends ChannelException {
175 
176     /**
177      * Default ctor with a message.
178      * @param message A descriptive message for the exception.
179      */
ConnectionRefusedException(final String message)180     public ConnectionRefusedException(final String message) {
181       super(message);
182     }
183 
184     private static final long serialVersionUID = 5348377149312597939L;
185   }
186 }
187