1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.protocol.datatransfer; 19 20 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto; 21 22 import java.io.DataOutput; 23 import java.io.DataOutputStream; 24 import java.io.IOException; 25 26 import org.apache.hadoop.classification.InterfaceAudience; 27 import org.apache.hadoop.classification.InterfaceStability; 28 import org.apache.hadoop.fs.StorageType; 29 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 30 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 31 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; 32 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; 33 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; 34 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; 35 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; 36 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; 37 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; 38 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; 39 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; 40 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; 41 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; 42 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; 43 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; 44 import org.apache.hadoop.hdfs.protocolPB.PBHelper; 45 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 46 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 47 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; 48 import org.apache.hadoop.security.token.Token; 49 import org.apache.hadoop.util.DataChecksum; 50 51 import org.apache.htrace.Trace; 52 import org.apache.htrace.Span; 53 54 import com.google.protobuf.Message; 55 56 /** Sender */ 57 @InterfaceAudience.Private 58 @InterfaceStability.Evolving 59 public class Sender implements DataTransferProtocol { 60 private final DataOutputStream out; 61 62 /** Create a sender for DataTransferProtocol with a output stream. */ Sender(final DataOutputStream out)63 public Sender(final DataOutputStream out) { 64 this.out = out; 65 } 66 67 /** Initialize a operation. */ op(final DataOutput out, final Op op )68 private static void op(final DataOutput out, final Op op 69 ) throws IOException { 70 out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); 71 op.write(out); 72 } 73 send(final DataOutputStream out, final Op opcode, final Message proto)74 private static void send(final DataOutputStream out, final Op opcode, 75 final Message proto) throws IOException { 76 if (LOG.isTraceEnabled()) { 77 LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName() 78 + ": " + proto); 79 } 80 op(out, opcode); 81 proto.writeDelimitedTo(out); 82 out.flush(); 83 } 84 getCachingStrategy(CachingStrategy cachingStrategy)85 static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) { 86 CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder(); 87 if (cachingStrategy.getReadahead() != null) { 88 builder.setReadahead(cachingStrategy.getReadahead().longValue()); 89 } 90 if (cachingStrategy.getDropBehind() != null) { 91 builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue()); 92 } 93 return builder.build(); 94 } 95 96 @Override readBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final long blockOffset, final long length, final boolean sendChecksum, final CachingStrategy cachingStrategy)97 public void readBlock(final ExtendedBlock blk, 98 final Token<BlockTokenIdentifier> blockToken, 99 final String clientName, 100 final long blockOffset, 101 final long length, 102 final boolean sendChecksum, 103 final CachingStrategy cachingStrategy) throws IOException { 104 105 OpReadBlockProto proto = OpReadBlockProto.newBuilder() 106 .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) 107 .setOffset(blockOffset) 108 .setLen(length) 109 .setSendChecksums(sendChecksum) 110 .setCachingStrategy(getCachingStrategy(cachingStrategy)) 111 .build(); 112 113 send(out, Op.READ_BLOCK, proto); 114 } 115 116 117 @Override writeBlock(final ExtendedBlock blk, final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, final long minBytesRcvd, final long maxBytesRcvd, final long latestGenerationStamp, DataChecksum requestedChecksum, final CachingStrategy cachingStrategy, final boolean allowLazyPersist, final boolean pinning, final boolean[] targetPinnings)118 public void writeBlock(final ExtendedBlock blk, 119 final StorageType storageType, 120 final Token<BlockTokenIdentifier> blockToken, 121 final String clientName, 122 final DatanodeInfo[] targets, 123 final StorageType[] targetStorageTypes, 124 final DatanodeInfo source, 125 final BlockConstructionStage stage, 126 final int pipelineSize, 127 final long minBytesRcvd, 128 final long maxBytesRcvd, 129 final long latestGenerationStamp, 130 DataChecksum requestedChecksum, 131 final CachingStrategy cachingStrategy, 132 final boolean allowLazyPersist, 133 final boolean pinning, 134 final boolean[] targetPinnings) throws IOException { 135 ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( 136 blk, clientName, blockToken); 137 138 ChecksumProto checksumProto = 139 DataTransferProtoUtil.toProto(requestedChecksum); 140 141 OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() 142 .setHeader(header) 143 .setStorageType(PBHelper.convertStorageType(storageType)) 144 .addAllTargets(PBHelper.convert(targets, 1)) 145 .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) 146 .setStage(toProto(stage)) 147 .setPipelineSize(pipelineSize) 148 .setMinBytesRcvd(minBytesRcvd) 149 .setMaxBytesRcvd(maxBytesRcvd) 150 .setLatestGenerationStamp(latestGenerationStamp) 151 .setRequestedChecksum(checksumProto) 152 .setCachingStrategy(getCachingStrategy(cachingStrategy)) 153 .setAllowLazyPersist(allowLazyPersist) 154 .setPinning(pinning) 155 .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1)); 156 157 if (source != null) { 158 proto.setSource(PBHelper.convertDatanodeInfo(source)); 159 } 160 161 send(out, Op.WRITE_BLOCK, proto.build()); 162 } 163 164 @Override transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes)165 public void transferBlock(final ExtendedBlock blk, 166 final Token<BlockTokenIdentifier> blockToken, 167 final String clientName, 168 final DatanodeInfo[] targets, 169 final StorageType[] targetStorageTypes) throws IOException { 170 171 OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() 172 .setHeader(DataTransferProtoUtil.buildClientHeader( 173 blk, clientName, blockToken)) 174 .addAllTargets(PBHelper.convert(targets)) 175 .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) 176 .build(); 177 178 send(out, Op.TRANSFER_BLOCK, proto); 179 } 180 181 @Override requestShortCircuitFds(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, SlotId slotId, int maxVersion, boolean supportsReceiptVerification)182 public void requestShortCircuitFds(final ExtendedBlock blk, 183 final Token<BlockTokenIdentifier> blockToken, 184 SlotId slotId, int maxVersion, boolean supportsReceiptVerification) 185 throws IOException { 186 OpRequestShortCircuitAccessProto.Builder builder = 187 OpRequestShortCircuitAccessProto.newBuilder() 188 .setHeader(DataTransferProtoUtil.buildBaseHeader( 189 blk, blockToken)).setMaxVersion(maxVersion); 190 if (slotId != null) { 191 builder.setSlotId(PBHelper.convert(slotId)); 192 } 193 builder.setSupportsReceiptVerification(supportsReceiptVerification); 194 OpRequestShortCircuitAccessProto proto = builder.build(); 195 send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); 196 } 197 198 @Override releaseShortCircuitFds(SlotId slotId)199 public void releaseShortCircuitFds(SlotId slotId) throws IOException { 200 ReleaseShortCircuitAccessRequestProto.Builder builder = 201 ReleaseShortCircuitAccessRequestProto.newBuilder(). 202 setSlotId(PBHelper.convert(slotId)); 203 if (Trace.isTracing()) { 204 Span s = Trace.currentSpan(); 205 builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() 206 .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); 207 } 208 ReleaseShortCircuitAccessRequestProto proto = builder.build(); 209 send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto); 210 } 211 212 @Override requestShortCircuitShm(String clientName)213 public void requestShortCircuitShm(String clientName) throws IOException { 214 ShortCircuitShmRequestProto.Builder builder = 215 ShortCircuitShmRequestProto.newBuilder(). 216 setClientName(clientName); 217 if (Trace.isTracing()) { 218 Span s = Trace.currentSpan(); 219 builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() 220 .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); 221 } 222 ShortCircuitShmRequestProto proto = builder.build(); 223 send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); 224 } 225 226 @Override replaceBlock(final ExtendedBlock blk, final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo source)227 public void replaceBlock(final ExtendedBlock blk, 228 final StorageType storageType, 229 final Token<BlockTokenIdentifier> blockToken, 230 final String delHint, 231 final DatanodeInfo source) throws IOException { 232 OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() 233 .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) 234 .setStorageType(PBHelper.convertStorageType(storageType)) 235 .setDelHint(delHint) 236 .setSource(PBHelper.convertDatanodeInfo(source)) 237 .build(); 238 239 send(out, Op.REPLACE_BLOCK, proto); 240 } 241 242 @Override copyBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken)243 public void copyBlock(final ExtendedBlock blk, 244 final Token<BlockTokenIdentifier> blockToken) throws IOException { 245 OpCopyBlockProto proto = OpCopyBlockProto.newBuilder() 246 .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) 247 .build(); 248 249 send(out, Op.COPY_BLOCK, proto); 250 } 251 252 @Override blockChecksum(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken)253 public void blockChecksum(final ExtendedBlock blk, 254 final Token<BlockTokenIdentifier> blockToken) throws IOException { 255 OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() 256 .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) 257 .build(); 258 259 send(out, Op.BLOCK_CHECKSUM, proto); 260 } 261 } 262