1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.ha;
19 
20 import java.io.IOException;
21 import java.net.InetSocketAddress;
22 
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.classification.InterfaceStability;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.CommonConfigurationKeys;
27 import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService;
28 import org.apache.hadoop.ha.protocolPB.ZKFCProtocolPB;
29 import org.apache.hadoop.ha.protocolPB.ZKFCProtocolServerSideTranslatorPB;
30 import org.apache.hadoop.ipc.ProtobufRpcEngine;
31 import org.apache.hadoop.ipc.RPC;
32 import org.apache.hadoop.ipc.RPC.Server;
33 import org.apache.hadoop.security.AccessControlException;
34 import org.apache.hadoop.security.authorize.PolicyProvider;
35 
36 import com.google.protobuf.BlockingService;
37 
38 @InterfaceAudience.LimitedPrivate("HDFS")
39 @InterfaceStability.Evolving
40 public class ZKFCRpcServer implements ZKFCProtocol {
41 
42   private static final int HANDLER_COUNT = 3;
43   private final ZKFailoverController zkfc;
44   private Server server;
45 
ZKFCRpcServer(Configuration conf, InetSocketAddress bindAddr, ZKFailoverController zkfc, PolicyProvider policy)46   ZKFCRpcServer(Configuration conf,
47       InetSocketAddress bindAddr,
48       ZKFailoverController zkfc,
49       PolicyProvider policy) throws IOException {
50     this.zkfc = zkfc;
51 
52     RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
53         ProtobufRpcEngine.class);
54     ZKFCProtocolServerSideTranslatorPB translator =
55         new ZKFCProtocolServerSideTranslatorPB(this);
56     BlockingService service = ZKFCProtocolService
57         .newReflectiveBlockingService(translator);
58     this.server = new RPC.Builder(conf).setProtocol(ZKFCProtocolPB.class)
59         .setInstance(service).setBindAddress(bindAddr.getHostName())
60         .setPort(bindAddr.getPort()).setNumHandlers(HANDLER_COUNT)
61         .setVerbose(false).build();
62 
63     // set service-level authorization security policy
64     if (conf.getBoolean(
65         CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
66       server.refreshServiceAcl(conf, policy);
67     }
68 
69   }
70 
start()71   void start() {
72     this.server.start();
73   }
74 
getAddress()75   public InetSocketAddress getAddress() {
76     return server.getListenerAddress();
77   }
78 
stopAndJoin()79   void stopAndJoin() throws InterruptedException {
80     this.server.stop();
81     this.server.join();
82   }
83 
84   @Override
cedeActive(int millisToCede)85   public void cedeActive(int millisToCede) throws IOException,
86       AccessControlException {
87     zkfc.checkRpcAdminAccess();
88     zkfc.cedeActive(millisToCede);
89   }
90 
91   @Override
gracefulFailover()92   public void gracefulFailover() throws IOException, AccessControlException {
93     zkfc.checkRpcAdminAccess();
94     zkfc.gracefulFailoverToYou();
95   }
96 
97 }
98