1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper.server;
20 
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.ByteBuffer;
24 import java.util.Collections;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ConcurrentHashMap;
28 import javax.management.JMException;
29 import javax.security.auth.login.AppConfigurationEntry;
30 import javax.security.auth.login.Configuration;
31 import javax.security.auth.login.LoginException;
32 import org.apache.zookeeper.Environment;
33 import org.apache.zookeeper.Login;
34 import org.apache.zookeeper.common.ZKConfig;
35 import org.apache.zookeeper.jmx.MBeanRegistry;
36 import org.apache.zookeeper.server.auth.SaslServerCallbackHandler;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 
40 public abstract class ServerCnxnFactory {
41 
42     public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
43     private static final String ZOOKEEPER_MAX_CONNECTION = "zookeeper.maxCnxns";
44     public static final int ZOOKEEPER_MAX_CONNECTION_DEFAULT = 0;
45 
46     private static final Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class);
47 
48     // Tells whether SSL is enabled on this ServerCnxnFactory
49     protected boolean secure;
50 
51     /**
52      * The buffer will cause the connection to be close when we do a send.
53      */
54     static final ByteBuffer closeConn = ByteBuffer.allocate(0);
55 
56     // total number of connections accepted by the ZooKeeper server
57     protected int maxCnxns;
58 
59     // sessionMap is used by closeSession()
60     final ConcurrentHashMap<Long, ServerCnxn> sessionMap = new ConcurrentHashMap<Long, ServerCnxn>();
61 
62     private static String loginUser = Login.SYSTEM_USER;
63 
addSession(long sessionId, ServerCnxn cnxn)64     public void addSession(long sessionId, ServerCnxn cnxn) {
65         sessionMap.put(sessionId, cnxn);
66     }
67 
removeCnxnFromSessionMap(ServerCnxn cnxn)68     public void removeCnxnFromSessionMap(ServerCnxn cnxn) {
69         long sessionId = cnxn.getSessionId();
70         if (sessionId != 0) {
71             sessionMap.remove(sessionId);
72         }
73     }
74 
75     /**
76      * @return true if the cnxn that contains the sessionId exists in this ServerCnxnFactory
77      *         and it's closed. Otherwise false.
78      */
closeSession(long sessionId, ServerCnxn.DisconnectReason reason)79     public boolean closeSession(long sessionId, ServerCnxn.DisconnectReason reason) {
80         ServerCnxn cnxn = sessionMap.remove(sessionId);
81         if (cnxn != null) {
82             try {
83                 cnxn.close(reason);
84             } catch (Exception e) {
85                 LOG.warn("exception during session close", e);
86             }
87             return true;
88         }
89         return false;
90     }
91 
getLocalPort()92     public abstract int getLocalPort();
93 
getConnections()94     public abstract Iterable<ServerCnxn> getConnections();
95 
getNumAliveConnections()96     public int getNumAliveConnections() {
97         return cnxns.size();
98     }
99 
getZooKeeperServer()100     public final ZooKeeperServer getZooKeeperServer() {
101         return zkServer;
102     }
103 
configure(InetSocketAddress addr, int maxcc)104     public void configure(InetSocketAddress addr, int maxcc) throws IOException {
105         configure(addr, maxcc, -1);
106     }
107 
configure(InetSocketAddress addr, int maxcc, int backlog)108     public void configure(InetSocketAddress addr, int maxcc, int backlog) throws IOException {
109         configure(addr, maxcc, backlog, false);
110     }
111 
configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure)112     public abstract void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException;
113 
reconfigure(InetSocketAddress addr)114     public abstract void reconfigure(InetSocketAddress addr);
115 
116     protected SaslServerCallbackHandler saslServerCallbackHandler;
117     public Login login;
118 
119     /** Maximum number of connections allowed from particular host (ip) */
getMaxClientCnxnsPerHost()120     public abstract int getMaxClientCnxnsPerHost();
121 
122     /** Maximum number of connections allowed from particular host (ip) */
setMaxClientCnxnsPerHost(int max)123     public abstract void setMaxClientCnxnsPerHost(int max);
124 
isSecure()125     public boolean isSecure() {
126         return secure;
127     }
128 
startup(ZooKeeperServer zkServer)129     public void startup(ZooKeeperServer zkServer) throws IOException, InterruptedException {
130         startup(zkServer, true);
131     }
132 
133     // This method is to maintain compatiblity of startup(zks) and enable sharing of zks
134     // when we add secureCnxnFactory.
startup(ZooKeeperServer zkServer, boolean startServer)135     public abstract void startup(ZooKeeperServer zkServer, boolean startServer) throws IOException, InterruptedException;
136 
137     /** The maximum queue length of the ZooKeeper server's socket */
getSocketListenBacklog()138     public abstract int getSocketListenBacklog();
139 
join()140     public abstract void join() throws InterruptedException;
141 
shutdown()142     public abstract void shutdown();
143 
start()144     public abstract void start();
145 
146     protected ZooKeeperServer zkServer;
setZooKeeperServer(ZooKeeperServer zks)147     public final void setZooKeeperServer(ZooKeeperServer zks) {
148         this.zkServer = zks;
149         if (zks != null) {
150             if (secure) {
151                 zks.setSecureServerCnxnFactory(this);
152             } else {
153                 zks.setServerCnxnFactory(this);
154             }
155         }
156     }
157 
closeAll(ServerCnxn.DisconnectReason reason)158     public abstract void closeAll(ServerCnxn.DisconnectReason reason);
159 
createFactory()160     public static ServerCnxnFactory createFactory() throws IOException {
161         String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
162         if (serverCnxnFactoryName == null) {
163             serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
164         }
165         try {
166             ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
167                                                                            .getDeclaredConstructor()
168                                                                            .newInstance();
169             LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
170             return serverCnxnFactory;
171         } catch (Exception e) {
172             IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
173             throw ioe;
174         }
175     }
176 
createFactory(int clientPort, int maxClientCnxns)177     public static ServerCnxnFactory createFactory(int clientPort, int maxClientCnxns) throws IOException {
178         return createFactory(new InetSocketAddress(clientPort), maxClientCnxns, -1);
179     }
180 
createFactory(int clientPort, int maxClientCnxns, int backlog)181     public static ServerCnxnFactory createFactory(int clientPort, int maxClientCnxns, int backlog) throws IOException {
182         return createFactory(new InetSocketAddress(clientPort), maxClientCnxns, backlog);
183     }
184 
createFactory(InetSocketAddress addr, int maxClientCnxns)185     public static ServerCnxnFactory createFactory(InetSocketAddress addr, int maxClientCnxns) throws IOException {
186         return createFactory(addr, maxClientCnxns, -1);
187     }
188 
createFactory(InetSocketAddress addr, int maxClientCnxns, int backlog)189     public static ServerCnxnFactory createFactory(InetSocketAddress addr, int maxClientCnxns, int backlog) throws IOException {
190         ServerCnxnFactory factory = createFactory();
191         factory.configure(addr, maxClientCnxns, backlog);
192         return factory;
193     }
194 
getLocalAddress()195     public abstract InetSocketAddress getLocalAddress();
196 
resetAllConnectionStats()197     public abstract void resetAllConnectionStats();
198 
getAllConnectionInfo(boolean brief)199     public abstract Iterable<Map<String, Object>> getAllConnectionInfo(boolean brief);
200 
201     private final ConcurrentHashMap<ServerCnxn, ConnectionBean> connectionBeans = new ConcurrentHashMap<ServerCnxn, ConnectionBean>();
202 
203     // Connection set is relied on heavily by four letter commands
204     // Construct a ConcurrentHashSet using a ConcurrentHashMap
205     protected final Set<ServerCnxn> cnxns = Collections.newSetFromMap(new ConcurrentHashMap<ServerCnxn, Boolean>());
unregisterConnection(ServerCnxn serverCnxn)206     public void unregisterConnection(ServerCnxn serverCnxn) {
207         ConnectionBean jmxConnectionBean = connectionBeans.remove(serverCnxn);
208         if (jmxConnectionBean != null) {
209             MBeanRegistry.getInstance().unregister(jmxConnectionBean);
210         }
211     }
212 
registerConnection(ServerCnxn serverCnxn)213     public void registerConnection(ServerCnxn serverCnxn) {
214         if (zkServer != null) {
215             ConnectionBean jmxConnectionBean = new ConnectionBean(serverCnxn, zkServer);
216             try {
217                 MBeanRegistry.getInstance().register(jmxConnectionBean, zkServer.jmxServerBean);
218                 connectionBeans.put(serverCnxn, jmxConnectionBean);
219             } catch (JMException e) {
220                 LOG.warn("Could not register connection", e);
221             }
222         }
223 
224     }
225 
226     /**
227      * Initialize the server SASL if specified.
228      *
229      * If the user has specified a "ZooKeeperServer.LOGIN_CONTEXT_NAME_KEY"
230      * or a jaas.conf using "java.security.auth.login.config"
231      * the authentication is required and an exception is raised.
232      * Otherwise no authentication is configured and no exception is raised.
233      *
234      * @throws IOException if jaas.conf is missing or there's an error in it.
235      */
configureSaslLogin()236     protected void configureSaslLogin() throws IOException {
237         String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY, ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
238 
239         // Note that 'Configuration' here refers to javax.security.auth.login.Configuration.
240         AppConfigurationEntry[] entries = null;
241         SecurityException securityException = null;
242         try {
243             entries = Configuration.getConfiguration().getAppConfigurationEntry(serverSection);
244         } catch (SecurityException e) {
245             // handle below: might be harmless if the user doesn't intend to use JAAS authentication.
246             securityException = e;
247         }
248 
249         // No entries in jaas.conf
250         // If there's a configuration exception fetching the jaas section and
251         // the user has required sasl by specifying a LOGIN_CONTEXT_NAME_KEY or a jaas file
252         // we throw an exception otherwise we continue without authentication.
253         if (entries == null) {
254             String jaasFile = System.getProperty(Environment.JAAS_CONF_KEY);
255             String loginContextName = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY);
256             if (securityException != null && (loginContextName != null || jaasFile != null)) {
257                 String errorMessage = "No JAAS configuration section named '" + serverSection + "' was found";
258                 if (jaasFile != null) {
259                     errorMessage += " in '" + jaasFile + "'.";
260                 }
261                 if (loginContextName != null) {
262                     errorMessage += " But " + ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + " was set.";
263                 }
264                 LOG.error(errorMessage);
265                 throw new IOException(errorMessage);
266             }
267             return;
268         }
269 
270         // jaas.conf entry available
271         try {
272             saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
273             login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig());
274             setLoginUser(login.getUserName());
275             login.startThreadIfNeeded();
276         } catch (LoginException e) {
277             throw new IOException("Could not configure server because SASL configuration did not allow the "
278                                   + " ZooKeeper server to authenticate itself properly: "
279                                   + e);
280         }
281     }
282 
setLoginUser(String name)283     private static void setLoginUser(String name) {
284         //Created this method to avoid ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD find bug issue
285         loginUser = name;
286     }
287     /**
288      * User who has started the ZooKeeper server user, it will be the logged-in
289      * user. If no user logged-in then system user
290      */
getUserName()291     public static String getUserName() {
292         return loginUser;
293     }
294 
295     /**
296      * Maximum number of connections allowed in the ZooKeeper system
297      */
getMaxCnxns()298     public int getMaxCnxns() {
299         return maxCnxns;
300     }
301 
initMaxCnxns()302     protected void initMaxCnxns() {
303         maxCnxns = Integer.getInteger(ZOOKEEPER_MAX_CONNECTION, ZOOKEEPER_MAX_CONNECTION_DEFAULT);
304         if (maxCnxns < 0) {
305             maxCnxns = ZOOKEEPER_MAX_CONNECTION_DEFAULT;
306             LOG.warn("maxCnxns should be greater than or equal to 0, using default vlaue {}.",
307                     ZOOKEEPER_MAX_CONNECTION_DEFAULT);
308         } else if (maxCnxns == ZOOKEEPER_MAX_CONNECTION_DEFAULT) {
309             LOG.warn("maxCnxns is not configured, using default value {}.",
310                     ZOOKEEPER_MAX_CONNECTION_DEFAULT);
311         } else {
312             LOG.info("maxCnxns configured value is {}.", maxCnxns);
313         }
314     }
315 
316     /**
317      * Ensure total number of connections are less than the maxCnxns
318      */
limitTotalNumberOfCnxns()319     protected boolean limitTotalNumberOfCnxns() {
320         if (maxCnxns <= 0) {
321             // maxCnxns limit is disabled
322             return false;
323         }
324         int cnxns = getNumAliveConnections();
325         if (cnxns >= maxCnxns) {
326             LOG.error("Too many connections " + cnxns + " - max is " + maxCnxns);
327             return true;
328         }
329         return false;
330     }
331 }
332