1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hdfs.protocolPB;
20 
21 import java.io.IOException;
22 import java.util.List;
23 
24 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
25 import org.apache.hadoop.hdfs.protocol.DatanodeID;
26 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
27 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
28 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
29 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
30 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
31 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
32 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
33 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
34 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
35 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
36 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
37 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
38 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
39 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
40 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
41 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
42 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
43 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
44 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
45 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
46 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
47 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
48 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
49 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RollingUpgradeStatusProto;
50 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
51 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
52 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
53 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
54 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
55 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
56 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
57 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
58 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
59 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
60 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
61 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
62 
63 import com.google.common.base.Preconditions;
64 import com.google.protobuf.RpcController;
65 import com.google.protobuf.ServiceException;
66 
67 public class DatanodeProtocolServerSideTranslatorPB implements
68     DatanodeProtocolPB {
69 
70   private final DatanodeProtocol impl;
71   private static final ErrorReportResponseProto
72       VOID_ERROR_REPORT_RESPONSE_PROTO =
73           ErrorReportResponseProto.newBuilder().build();
74   private static final BlockReceivedAndDeletedResponseProto
75       VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE =
76           BlockReceivedAndDeletedResponseProto.newBuilder().build();
77   private static final ReportBadBlocksResponseProto
78       VOID_REPORT_BAD_BLOCK_RESPONSE =
79           ReportBadBlocksResponseProto.newBuilder().build();
80   private static final CommitBlockSynchronizationResponseProto
81       VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
82           CommitBlockSynchronizationResponseProto.newBuilder().build();
83 
DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl)84   public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl) {
85     this.impl = impl;
86   }
87 
88   @Override
registerDatanode( RpcController controller, RegisterDatanodeRequestProto request)89   public RegisterDatanodeResponseProto registerDatanode(
90       RpcController controller, RegisterDatanodeRequestProto request)
91       throws ServiceException {
92     DatanodeRegistration registration = PBHelper.convert(request
93         .getRegistration());
94     DatanodeRegistration registrationResp;
95     try {
96       registrationResp = impl.registerDatanode(registration);
97     } catch (IOException e) {
98       throw new ServiceException(e);
99     }
100     return RegisterDatanodeResponseProto.newBuilder()
101         .setRegistration(PBHelper.convert(registrationResp)).build();
102   }
103 
104   @Override
sendHeartbeat(RpcController controller, HeartbeatRequestProto request)105   public HeartbeatResponseProto sendHeartbeat(RpcController controller,
106       HeartbeatRequestProto request) throws ServiceException {
107     HeartbeatResponse response;
108     try {
109       final StorageReport[] report = PBHelper.convertStorageReports(
110           request.getReportsList());
111       VolumeFailureSummary volumeFailureSummary =
112           request.hasVolumeFailureSummary() ? PBHelper.convertVolumeFailureSummary(
113               request.getVolumeFailureSummary()) : null;
114       response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
115           report, request.getCacheCapacity(), request.getCacheUsed(),
116           request.getXmitsInProgress(),
117           request.getXceiverCount(), request.getFailedVolumes(),
118           volumeFailureSummary);
119     } catch (IOException e) {
120       throw new ServiceException(e);
121     }
122     HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
123         .newBuilder();
124     DatanodeCommand[] cmds = response.getCommands();
125     if (cmds != null) {
126       for (int i = 0; i < cmds.length; i++) {
127         if (cmds[i] != null) {
128           builder.addCmds(PBHelper.convert(cmds[i]));
129         }
130       }
131     }
132     builder.setHaStatus(PBHelper.convert(response.getNameNodeHaState()));
133     RollingUpgradeStatus rollingUpdateStatus = response
134         .getRollingUpdateStatus();
135     if (rollingUpdateStatus != null) {
136       // V2 is always set for newer datanodes.
137       // To be compatible with older datanodes, V1 is set to null
138       //  if the RU was finalized.
139       RollingUpgradeStatusProto rus = PBHelper.convertRollingUpgradeStatus(
140           rollingUpdateStatus);
141       builder.setRollingUpgradeStatusV2(rus);
142       if (!rollingUpdateStatus.isFinalized()) {
143         builder.setRollingUpgradeStatus(rus);
144       }
145     }
146     return builder.build();
147   }
148 
149   @Override
blockReport(RpcController controller, BlockReportRequestProto request)150   public BlockReportResponseProto blockReport(RpcController controller,
151       BlockReportRequestProto request) throws ServiceException {
152     DatanodeCommand cmd = null;
153     StorageBlockReport[] report =
154         new StorageBlockReport[request.getReportsCount()];
155 
156     int index = 0;
157     for (StorageBlockReportProto s : request.getReportsList()) {
158       final BlockListAsLongs blocks;
159       if (s.hasNumberOfBlocks()) { // new style buffer based reports
160         int num = (int)s.getNumberOfBlocks();
161         Preconditions.checkState(s.getBlocksCount() == 0,
162             "cannot send both blocks list and buffers");
163         blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList());
164       } else {
165         blocks = BlockListAsLongs.decodeLongs(s.getBlocksList());
166       }
167       report[index++] = new StorageBlockReport(PBHelper.convert(s.getStorage()),
168           blocks);
169     }
170     try {
171       cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
172           request.getBlockPoolId(), report,
173           request.hasContext() ?
174               PBHelper.convert(request.getContext()) : null);
175     } catch (IOException e) {
176       throw new ServiceException(e);
177     }
178     BlockReportResponseProto.Builder builder =
179         BlockReportResponseProto.newBuilder();
180     if (cmd != null) {
181       builder.setCmd(PBHelper.convert(cmd));
182     }
183     return builder.build();
184   }
185 
186   @Override
cacheReport(RpcController controller, CacheReportRequestProto request)187   public CacheReportResponseProto cacheReport(RpcController controller,
188       CacheReportRequestProto request) throws ServiceException {
189     DatanodeCommand cmd = null;
190     try {
191       cmd = impl.cacheReport(
192           PBHelper.convert(request.getRegistration()),
193           request.getBlockPoolId(),
194           request.getBlocksList());
195     } catch (IOException e) {
196       throw new ServiceException(e);
197     }
198     CacheReportResponseProto.Builder builder =
199         CacheReportResponseProto.newBuilder();
200     if (cmd != null) {
201       builder.setCmd(PBHelper.convert(cmd));
202     }
203     return builder.build();
204   }
205 
206 
207   @Override
blockReceivedAndDeleted( RpcController controller, BlockReceivedAndDeletedRequestProto request)208   public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
209       RpcController controller, BlockReceivedAndDeletedRequestProto request)
210       throws ServiceException {
211     List<StorageReceivedDeletedBlocksProto> sBlocks = request.getBlocksList();
212     StorageReceivedDeletedBlocks[] info =
213         new StorageReceivedDeletedBlocks[sBlocks.size()];
214     for (int i = 0; i < sBlocks.size(); i++) {
215       StorageReceivedDeletedBlocksProto sBlock = sBlocks.get(i);
216       List<ReceivedDeletedBlockInfoProto> list = sBlock.getBlocksList();
217       ReceivedDeletedBlockInfo[] rdBlocks =
218           new ReceivedDeletedBlockInfo[list.size()];
219       for (int j = 0; j < list.size(); j++) {
220         rdBlocks[j] = PBHelper.convert(list.get(j));
221       }
222       if (sBlock.hasStorage()) {
223         info[i] = new StorageReceivedDeletedBlocks(
224             PBHelper.convert(sBlock.getStorage()), rdBlocks);
225       } else {
226         info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageUuid(), rdBlocks);
227       }
228     }
229     try {
230       impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),
231           request.getBlockPoolId(), info);
232     } catch (IOException e) {
233       throw new ServiceException(e);
234     }
235     return VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE;
236   }
237 
238   @Override
errorReport(RpcController controller, ErrorReportRequestProto request)239   public ErrorReportResponseProto errorReport(RpcController controller,
240       ErrorReportRequestProto request) throws ServiceException {
241     try {
242       impl.errorReport(PBHelper.convert(request.getRegistartion()),
243           request.getErrorCode(), request.getMsg());
244     } catch (IOException e) {
245       throw new ServiceException(e);
246     }
247     return VOID_ERROR_REPORT_RESPONSE_PROTO;
248   }
249 
250   @Override
versionRequest(RpcController controller, VersionRequestProto request)251   public VersionResponseProto versionRequest(RpcController controller,
252       VersionRequestProto request) throws ServiceException {
253     NamespaceInfo info;
254     try {
255       info = impl.versionRequest();
256     } catch (IOException e) {
257       throw new ServiceException(e);
258     }
259     return VersionResponseProto.newBuilder()
260         .setInfo(PBHelper.convert(info)).build();
261   }
262 
263   @Override
reportBadBlocks(RpcController controller, ReportBadBlocksRequestProto request)264   public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
265       ReportBadBlocksRequestProto request) throws ServiceException {
266     List<LocatedBlockProto> lbps = request.getBlocksList();
267     LocatedBlock [] blocks = new LocatedBlock [lbps.size()];
268     for(int i=0; i<lbps.size(); i++) {
269       blocks[i] = PBHelper.convert(lbps.get(i));
270     }
271     try {
272       impl.reportBadBlocks(blocks);
273     } catch (IOException e) {
274       throw new ServiceException(e);
275     }
276     return VOID_REPORT_BAD_BLOCK_RESPONSE;
277   }
278 
279   @Override
commitBlockSynchronization( RpcController controller, CommitBlockSynchronizationRequestProto request)280   public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
281       RpcController controller, CommitBlockSynchronizationRequestProto request)
282       throws ServiceException {
283     List<DatanodeIDProto> dnprotos = request.getNewTaragetsList();
284     DatanodeID[] dns = new DatanodeID[dnprotos.size()];
285     for (int i = 0; i < dnprotos.size(); i++) {
286       dns[i] = PBHelper.convert(dnprotos.get(i));
287     }
288     final List<String> sidprotos = request.getNewTargetStoragesList();
289     final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]);
290     try {
291       impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
292           request.getNewGenStamp(), request.getNewLength(),
293           request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs);
294     } catch (IOException e) {
295       throw new ServiceException(e);
296     }
297     return VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO;
298   }
299 }
300