1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hdfs.protocolPB; 20 21 import java.io.IOException; 22 import java.util.List; 23 24 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; 25 import org.apache.hadoop.hdfs.protocol.DatanodeID; 26 import org.apache.hadoop.hdfs.protocol.LocatedBlock; 27 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; 28 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto; 29 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto; 30 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto; 31 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto; 32 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto; 33 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto; 34 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto; 35 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto; 
36 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto; 37 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto; 38 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto; 39 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto; 40 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; 41 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto; 42 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto; 43 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto; 44 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto; 45 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto; 46 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; 47 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; 48 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; 49 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RollingUpgradeStatusProto; 50 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; 51 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto; 52 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; 53 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; 54 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; 55 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; 56 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; 57 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; 58 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; 59 import 
org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;

import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * Server-side translator bridging the protobuf service interface
 * {@link DatanodeProtocolPB} (what the RPC layer invokes) to the native
 * {@link DatanodeProtocol} implementation. Each method unmarshals the
 * protobuf request via {@code PBHelper}, delegates to {@link #impl}, and
 * marshals the result back, wrapping any {@link IOException} in a
 * {@link ServiceException} as required by the protobuf RPC contract.
 */
public class DatanodeProtocolServerSideTranslatorPB implements
    DatanodeProtocolPB {

  private final DatanodeProtocol impl;

  // Shared singleton replies for RPCs whose response carries no payload.
  private static final ErrorReportResponseProto
      VOID_ERROR_REPORT_RESPONSE_PROTO =
          ErrorReportResponseProto.newBuilder().build();
  private static final BlockReceivedAndDeletedResponseProto
      VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE =
          BlockReceivedAndDeletedResponseProto.newBuilder().build();
  private static final ReportBadBlocksResponseProto
      VOID_REPORT_BAD_BLOCK_RESPONSE =
          ReportBadBlocksResponseProto.newBuilder().build();
  private static final CommitBlockSynchronizationResponseProto
      VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
          CommitBlockSynchronizationResponseProto.newBuilder().build();

  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl) {
    this.impl = impl;
  }

  /** Registers a datanode and returns the (possibly updated) registration. */
  @Override
  public RegisterDatanodeResponseProto registerDatanode(
      RpcController controller, RegisterDatanodeRequestProto request)
      throws ServiceException {
    final DatanodeRegistration reg =
        PBHelper.convert(request.getRegistration());
    final DatanodeRegistration reply;
    try {
      reply = impl.registerDatanode(reg);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return RegisterDatanodeResponseProto.newBuilder()
        .setRegistration(PBHelper.convert(reply))
        .build();
  }

  /**
   * Translates a datanode heartbeat: forwards storage reports, cache usage
   * and volume-failure data to the namenode and returns any commands plus
   * HA / rolling-upgrade status.
   */
  @Override
  public HeartbeatResponseProto sendHeartbeat(RpcController controller,
      HeartbeatRequestProto request) throws ServiceException {
    final HeartbeatResponse resp;
    try {
      final StorageReport[] reports =
          PBHelper.convertStorageReports(request.getReportsList());
      final VolumeFailureSummary failureSummary =
          request.hasVolumeFailureSummary()
              ? PBHelper.convertVolumeFailureSummary(
                  request.getVolumeFailureSummary())
              : null;
      resp = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
          reports, request.getCacheCapacity(), request.getCacheUsed(),
          request.getXmitsInProgress(), request.getXceiverCount(),
          request.getFailedVolumes(), failureSummary);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    final HeartbeatResponseProto.Builder reply =
        HeartbeatResponseProto.newBuilder();
    final DatanodeCommand[] cmds = resp.getCommands();
    if (cmds != null) {
      for (DatanodeCommand cmd : cmds) {
        if (cmd != null) {
          reply.addCmds(PBHelper.convert(cmd));
        }
      }
    }
    reply.setHaStatus(PBHelper.convert(resp.getNameNodeHaState()));
    final RollingUpgradeStatus ruStatus = resp.getRollingUpdateStatus();
    if (ruStatus != null) {
      // V2 is always set for newer datanodes. For compatibility with older
      // datanodes, the V1 field is left unset once the rolling upgrade has
      // been finalized.
      final RollingUpgradeStatusProto rus =
          PBHelper.convertRollingUpgradeStatus(ruStatus);
      reply.setRollingUpgradeStatusV2(rus);
      if (!ruStatus.isFinalized()) {
        reply.setRollingUpgradeStatus(rus);
      }
    }
    return reply.build();
  }

  /**
   * Translates a full block report. Supports both the new buffer-encoded
   * block list and the legacy list-of-longs encoding.
   */
  @Override
  public BlockReportResponseProto blockReport(RpcController controller,
      BlockReportRequestProto request) throws ServiceException {
    final List<StorageBlockReportProto> protos = request.getReportsList();
    final StorageBlockReport[] reports =
        new StorageBlockReport[protos.size()];
    for (int i = 0; i < reports.length; i++) {
      final StorageBlockReportProto p = protos.get(i);
      final BlockListAsLongs blocks;
      if (p.hasNumberOfBlocks()) {
        // New-style report: block data arrives as encoded byte buffers,
        // and the legacy longs list must then be empty.
        final int numBlocks = (int) p.getNumberOfBlocks();
        Preconditions.checkState(p.getBlocksCount() == 0,
            "cannot send both blocks list and buffers");
        blocks = BlockListAsLongs.decodeBuffers(numBlocks,
            p.getBlocksBuffersList());
      } else {
        // Legacy report: a flat list of longs.
        blocks = BlockListAsLongs.decodeLongs(p.getBlocksList());
      }
      reports[i] =
          new StorageBlockReport(PBHelper.convert(p.getStorage()), blocks);
    }
    DatanodeCommand cmd = null;
    try {
      cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
          request.getBlockPoolId(), reports,
          request.hasContext()
              ? PBHelper.convert(request.getContext()) : null);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    final BlockReportResponseProto.Builder reply =
        BlockReportResponseProto.newBuilder();
    if (cmd != null) {
      reply.setCmd(PBHelper.convert(cmd));
    }
    return reply.build();
  }

  /** Translates a cache report; the block list is forwarded unconverted. */
  @Override
  public CacheReportResponseProto cacheReport(RpcController controller,
      CacheReportRequestProto request) throws ServiceException {
    DatanodeCommand cmd = null;
    try {
      cmd = impl.cacheReport(
          PBHelper.convert(request.getRegistration()),
          request.getBlockPoolId(),
          request.getBlocksList());
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    final CacheReportResponseProto.Builder reply =
        CacheReportResponseProto.newBuilder();
    if (cmd != null) {
      reply.setCmd(PBHelper.convert(cmd));
    }
    return reply.build();
  }

  /**
   * Translates an incremental block report (blocks received or deleted
   * since the last report), grouped per storage.
   */
  @Override
  public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
      RpcController controller, BlockReceivedAndDeletedRequestProto request)
      throws ServiceException {
    final List<StorageReceivedDeletedBlocksProto> protos =
        request.getBlocksList();
    final StorageReceivedDeletedBlocks[] reports =
        new StorageReceivedDeletedBlocks[protos.size()];
    int i = 0;
    for (StorageReceivedDeletedBlocksProto proto : protos) {
      final List<ReceivedDeletedBlockInfoProto> blockProtos =
          proto.getBlocksList();
      final ReceivedDeletedBlockInfo[] blocks =
          new ReceivedDeletedBlockInfo[blockProtos.size()];
      int j = 0;
      for (ReceivedDeletedBlockInfoProto b : blockProtos) {
        blocks[j++] = PBHelper.convert(b);
      }
      if (proto.hasStorage()) {
        reports[i] = new StorageReceivedDeletedBlocks(
            PBHelper.convert(proto.getStorage()), blocks);
      } else {
        // Older datanodes identify the storage by UUID string only.
        reports[i] = new StorageReceivedDeletedBlocks(
            proto.getStorageUuid(), blocks);
      }
      i++;
    }
    try {
      impl.blockReceivedAndDeleted(
          PBHelper.convert(request.getRegistration()),
          request.getBlockPoolId(), reports);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE;
  }

  /** Forwards a datanode error report to the namenode. */
  @Override
  public ErrorReportResponseProto errorReport(RpcController controller,
      ErrorReportRequestProto request) throws ServiceException {
    try {
      // NOTE: getRegistartion() is a historical typo in the generated
      // protobuf API and must be used as-is.
      impl.errorReport(PBHelper.convert(request.getRegistartion()),
          request.getErrorCode(), request.getMsg());
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VOID_ERROR_REPORT_RESPONSE_PROTO;
  }

  /** Returns the namenode's namespace/version information. */
  @Override
  public VersionResponseProto versionRequest(RpcController controller,
      VersionRequestProto request) throws ServiceException {
    final NamespaceInfo info;
    try {
      info = impl.versionRequest();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VersionResponseProto.newBuilder()
        .setInfo(PBHelper.convert(info))
        .build();
  }

  /** Reports corrupt block replicas discovered by a datanode. */
  @Override
  public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
      ReportBadBlocksRequestProto request) throws ServiceException {
    final List<LocatedBlockProto> protos = request.getBlocksList();
    final LocatedBlock[] blocks = new LocatedBlock[protos.size()];
    int i = 0;
    for (LocatedBlockProto proto : protos) {
      blocks[i++] = PBHelper.convert(proto);
    }
    try {
      impl.reportBadBlocks(blocks);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VOID_REPORT_BAD_BLOCK_RESPONSE;
  }

  /**
   * Commits the synchronization of a block after pipeline recovery,
   * recording the new generation stamp, length and target replicas.
   */
  @Override
  public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
      RpcController controller, CommitBlockSynchronizationRequestProto request)
      throws ServiceException {
    // NOTE: getNewTaragetsList() is a historical typo in the generated
    // protobuf API and must be used as-is.
    final List<DatanodeIDProto> targetProtos = request.getNewTaragetsList();
    final DatanodeID[] targets = new DatanodeID[targetProtos.size()];
    for (int i = 0; i < targets.length; i++) {
      targets[i] = PBHelper.convert(targetProtos.get(i));
    }
    final List<String> storageProtos = request.getNewTargetStoragesList();
    final String[] storageIDs =
        storageProtos.toArray(new String[storageProtos.size()]);
    try {
      impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
          request.getNewGenStamp(), request.getNewLength(),
          request.getCloseFile(), request.getDeleteBlock(), targets,
          storageIDs);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO;
  }
}