1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.ha; 19 20 import java.io.IOException; 21 import java.net.InetSocketAddress; 22 23 import org.apache.hadoop.classification.InterfaceAudience; 24 import org.apache.hadoop.classification.InterfaceStability; 25 import org.apache.hadoop.conf.Configuration; 26 import org.apache.hadoop.fs.CommonConfigurationKeys; 27 import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService; 28 import org.apache.hadoop.ha.protocolPB.ZKFCProtocolPB; 29 import org.apache.hadoop.ha.protocolPB.ZKFCProtocolServerSideTranslatorPB; 30 import org.apache.hadoop.ipc.ProtobufRpcEngine; 31 import org.apache.hadoop.ipc.RPC; 32 import org.apache.hadoop.ipc.RPC.Server; 33 import org.apache.hadoop.security.AccessControlException; 34 import org.apache.hadoop.security.authorize.PolicyProvider; 35 36 import com.google.protobuf.BlockingService; 37 38 @InterfaceAudience.LimitedPrivate("HDFS") 39 @InterfaceStability.Evolving 40 public class ZKFCRpcServer implements ZKFCProtocol { 41 42 private static final int HANDLER_COUNT = 3; 43 private final ZKFailoverController zkfc; 44 private Server server; 45 ZKFCRpcServer(Configuration conf, InetSocketAddress bindAddr, ZKFailoverController zkfc, PolicyProvider policy)46 ZKFCRpcServer(Configuration conf, 47 InetSocketAddress bindAddr, 48 ZKFailoverController zkfc, 49 PolicyProvider policy) throws IOException { 50 this.zkfc = zkfc; 51 52 RPC.setProtocolEngine(conf, ZKFCProtocolPB.class, 53 ProtobufRpcEngine.class); 54 ZKFCProtocolServerSideTranslatorPB translator = 55 new ZKFCProtocolServerSideTranslatorPB(this); 56 BlockingService service = ZKFCProtocolService 57 .newReflectiveBlockingService(translator); 58 this.server = new RPC.Builder(conf).setProtocol(ZKFCProtocolPB.class) 59 .setInstance(service).setBindAddress(bindAddr.getHostName()) 60 .setPort(bindAddr.getPort()).setNumHandlers(HANDLER_COUNT) 61 .setVerbose(false).build(); 62 63 // set service-level authorization security policy 64 if (conf.getBoolean( 65 CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { 66 server.refreshServiceAcl(conf, policy); 67 } 68 69 } 70 start()71 void start() { 72 this.server.start(); 73 } 74 getAddress()75 public InetSocketAddress getAddress() { 76 return server.getListenerAddress(); 77 } 78 stopAndJoin()79 void stopAndJoin() throws InterruptedException { 80 this.server.stop(); 81 this.server.join(); 82 } 83 84 @Override cedeActive(int millisToCede)85 public void cedeActive(int millisToCede) throws IOException, 86 AccessControlException { 87 zkfc.checkRpcAdminAccess(); 88 zkfc.cedeActive(millisToCede); 89 } 90 91 @Override gracefulFailover()92 public void gracefulFailover() throws IOException, AccessControlException { 93 zkfc.checkRpcAdminAccess(); 94 zkfc.gracefulFailoverToYou(); 95 } 96 97 } 98