1 // This file is part of OpenTSDB. 2 // Copyright (C) 2010-2012 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.tsd; 14 15 import java.io.IOException; 16 import java.nio.channels.ClosedChannelException; 17 import java.util.concurrent.atomic.AtomicInteger; 18 import java.util.concurrent.atomic.AtomicLong; 19 20 import org.slf4j.Logger; 21 import org.slf4j.LoggerFactory; 22 import org.jboss.netty.channel.Channel; 23 import org.jboss.netty.channel.ChannelEvent; 24 import org.jboss.netty.channel.ChannelException; 25 import org.jboss.netty.channel.ChannelHandlerContext; 26 import org.jboss.netty.channel.ChannelStateEvent; 27 import org.jboss.netty.channel.ExceptionEvent; 28 import org.jboss.netty.channel.SimpleChannelHandler; 29 import org.jboss.netty.channel.group.DefaultChannelGroup; 30 import org.jboss.netty.handler.codec.embedder.CodecEmbedderException; 31 32 import net.opentsdb.stats.StatsCollector; 33 34 /** 35 * Keeps track of all existing connections. 36 */ 37 final class ConnectionManager extends SimpleChannelHandler { 38 39 private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class); 40 41 private static final AtomicLong connections_established = new AtomicLong(); 42 private static final AtomicLong connections_rejected = new AtomicLong(); 43 private static final AtomicLong exceptions_unknown = new AtomicLong(); 44 private static final AtomicLong exceptions_closed = new AtomicLong(); 45 private static final AtomicLong exceptions_reset = new AtomicLong(); 46 private static final AtomicLong exceptions_timeout = new AtomicLong(); 47 48 /** Max connections can be serviced by tsd, if over limit, tsd will refuse 49 * new connections. */ 50 private final int connections_limit; 51 52 /** A counter used for determining how many channels are open. Something odd 53 * happens with the DefaultChannelGroup in that .size() doesn't return the 54 * actual number of open connections. TODO - find out why. */ 55 private final AtomicInteger open_connections = new AtomicInteger(); 56 57 private static final DefaultChannelGroup channels = 58 new DefaultChannelGroup("all-channels"); 59 closeAllConnections()60 static void closeAllConnections() { 61 channels.close().awaitUninterruptibly(); 62 } 63 64 /** 65 * Default Ctor with no concurrent connection limit. 66 */ ConnectionManager()67 public ConnectionManager() { 68 connections_limit = 0; 69 } 70 71 /** 72 * CTor for setting a limit on concurrent connections. 73 * @param connections_limit The maximum number of concurrent connections allowed. 74 * @since 2.3 75 */ ConnectionManager(final int connections_limit)76 public ConnectionManager(final int connections_limit) { 77 LOG.info("TSD concurrent connection limit set to: " + connections_limit); 78 this.connections_limit = connections_limit; 79 } 80 81 /** 82 * Collects the stats and metrics tracked by this instance. 83 * @param collector The collector to use. 84 */ collectStats(final StatsCollector collector)85 public static void collectStats(final StatsCollector collector) { 86 collector.record("connectionmgr.connections", channels.size(), "type=open"); 87 collector.record("connectionmgr.connections", connections_rejected, 88 "type=rejected"); 89 collector.record("connectionmgr.connections", connections_established, 90 "type=total"); 91 collector.record("connectionmgr.exceptions", exceptions_closed, 92 "type=closed"); 93 collector.record("connectionmgr.exceptions", exceptions_reset, 94 "type=reset"); 95 collector.record("connectionmgr.exceptions", exceptions_timeout, 96 "type=timeout"); 97 collector.record("connectionmgr.exceptions", exceptions_unknown, 98 "type=unknown"); 99 } 100 101 @Override channelOpen(final ChannelHandlerContext ctx, final ChannelStateEvent e)102 public void channelOpen(final ChannelHandlerContext ctx, 103 final ChannelStateEvent e) throws IOException { 104 if (connections_limit > 0) { 105 final int channel_size = open_connections.incrementAndGet(); 106 if (channel_size > connections_limit) { 107 throw new ConnectionRefusedException("Channel size (" + channel_size + ") exceeds total " 108 + "connection limit (" + connections_limit + ")"); 109 // exceptionCaught will close the connection and increment the counter. 110 } 111 } 112 channels.add(e.getChannel()); 113 connections_established.incrementAndGet(); 114 } 115 116 @Override channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e)117 public void channelClosed(final ChannelHandlerContext ctx, 118 final ChannelStateEvent e) throws IOException { 119 open_connections.decrementAndGet(); 120 } 121 122 @Override handleUpstream(final ChannelHandlerContext ctx, final ChannelEvent e)123 public void handleUpstream(final ChannelHandlerContext ctx, 124 final ChannelEvent e) throws Exception { 125 if (e instanceof ChannelStateEvent) { 126 LOG.info(e.toString()); 127 } 128 super.handleUpstream(ctx, e); 129 } 130 131 @Override exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e)132 public void exceptionCaught(final ChannelHandlerContext ctx, 133 final ExceptionEvent e) { 134 final Throwable cause = e.getCause(); 135 final Channel chan = ctx.getChannel(); 136 if (cause instanceof ClosedChannelException) { 137 exceptions_closed.incrementAndGet(); 138 LOG.warn("Attempt to write to closed channel " + chan); 139 return; 140 } 141 if (cause instanceof IOException) { 142 final String message = cause.getMessage(); 143 if ("Connection reset by peer".equals(message)) { 144 exceptions_reset.incrementAndGet(); 145 return; 146 } else if ("Connection timed out".equals(message)) { 147 exceptions_timeout.incrementAndGet(); 148 // Do nothing. A client disconnecting isn't really our problem. Oh, 149 // and I'm not kidding you, there's no better way to detect ECONNRESET 150 // in Java. Like, people have been bitching about errno for years, 151 // and Java managed to do something *far* worse. That's quite a feat. 152 return; 153 } else if (cause instanceof ConnectionRefusedException) { 154 connections_rejected.incrementAndGet(); 155 if (LOG.isDebugEnabled()) { 156 LOG.debug("Refusing connection from " + chan, e.getCause()); 157 } 158 chan.close(); 159 return; 160 } 161 } 162 if (cause instanceof CodecEmbedderException) { 163 // payload was not compressed as it was announced to be 164 LOG.warn("Http codec error : " + cause.getMessage()); 165 e.getChannel().close(); 166 return; 167 } 168 exceptions_unknown.incrementAndGet(); 169 LOG.error("Unexpected exception from downstream for " + chan, cause); 170 e.getChannel().close(); 171 } 172 173 /** Simple exception for refusing a connection. */ 174 private static class ConnectionRefusedException extends ChannelException { 175 176 /** 177 * Default ctor with a message. 178 * @param message A descriptive message for the exception. 179 */ ConnectionRefusedException(final String message)180 public ConnectionRefusedException(final String message) { 181 super(message); 182 } 183 184 private static final long serialVersionUID = 5348377149312597939L; 185 } 186 } 187