1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.protocol.datatransfer;
19 
20 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
21 
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceStability;
28 import org.apache.hadoop.fs.StorageType;
29 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
30 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
31 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
32 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
33 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
34 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
35 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
36 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
37 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
38 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
39 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
40 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
41 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
42 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
43 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
44 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
45 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
46 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
47 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
48 import org.apache.hadoop.security.token.Token;
49 import org.apache.hadoop.util.DataChecksum;
50 
51 import org.apache.htrace.Trace;
52 import org.apache.htrace.Span;
53 
54 import com.google.protobuf.Message;
55 
56 /** Sender */
57 @InterfaceAudience.Private
58 @InterfaceStability.Evolving
59 public class Sender implements DataTransferProtocol {
60   private final DataOutputStream out;
61 
62   /** Create a sender for DataTransferProtocol with a output stream. */
Sender(final DataOutputStream out)63   public Sender(final DataOutputStream out) {
64     this.out = out;
65   }
66 
67   /** Initialize a operation. */
op(final DataOutput out, final Op op )68   private static void op(final DataOutput out, final Op op
69       ) throws IOException {
70     out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
71     op.write(out);
72   }
73 
send(final DataOutputStream out, final Op opcode, final Message proto)74   private static void send(final DataOutputStream out, final Op opcode,
75       final Message proto) throws IOException {
76     if (LOG.isTraceEnabled()) {
77       LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
78           + ": " + proto);
79     }
80     op(out, opcode);
81     proto.writeDelimitedTo(out);
82     out.flush();
83   }
84 
getCachingStrategy(CachingStrategy cachingStrategy)85   static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
86     CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
87     if (cachingStrategy.getReadahead() != null) {
88       builder.setReadahead(cachingStrategy.getReadahead().longValue());
89     }
90     if (cachingStrategy.getDropBehind() != null) {
91       builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
92     }
93     return builder.build();
94   }
95 
96   @Override
readBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final long blockOffset, final long length, final boolean sendChecksum, final CachingStrategy cachingStrategy)97   public void readBlock(final ExtendedBlock blk,
98       final Token<BlockTokenIdentifier> blockToken,
99       final String clientName,
100       final long blockOffset,
101       final long length,
102       final boolean sendChecksum,
103       final CachingStrategy cachingStrategy) throws IOException {
104 
105     OpReadBlockProto proto = OpReadBlockProto.newBuilder()
106       .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
107       .setOffset(blockOffset)
108       .setLen(length)
109       .setSendChecksums(sendChecksum)
110       .setCachingStrategy(getCachingStrategy(cachingStrategy))
111       .build();
112 
113     send(out, Op.READ_BLOCK, proto);
114   }
115 
116 
117   @Override
writeBlock(final ExtendedBlock blk, final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, final long minBytesRcvd, final long maxBytesRcvd, final long latestGenerationStamp, DataChecksum requestedChecksum, final CachingStrategy cachingStrategy, final boolean allowLazyPersist, final boolean pinning, final boolean[] targetPinnings)118   public void writeBlock(final ExtendedBlock blk,
119       final StorageType storageType,
120       final Token<BlockTokenIdentifier> blockToken,
121       final String clientName,
122       final DatanodeInfo[] targets,
123       final StorageType[] targetStorageTypes,
124       final DatanodeInfo source,
125       final BlockConstructionStage stage,
126       final int pipelineSize,
127       final long minBytesRcvd,
128       final long maxBytesRcvd,
129       final long latestGenerationStamp,
130       DataChecksum requestedChecksum,
131       final CachingStrategy cachingStrategy,
132       final boolean allowLazyPersist,
133       final boolean pinning,
134       final boolean[] targetPinnings) throws IOException {
135     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
136         blk, clientName, blockToken);
137 
138     ChecksumProto checksumProto =
139       DataTransferProtoUtil.toProto(requestedChecksum);
140 
141     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
142       .setHeader(header)
143       .setStorageType(PBHelper.convertStorageType(storageType))
144       .addAllTargets(PBHelper.convert(targets, 1))
145       .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
146       .setStage(toProto(stage))
147       .setPipelineSize(pipelineSize)
148       .setMinBytesRcvd(minBytesRcvd)
149       .setMaxBytesRcvd(maxBytesRcvd)
150       .setLatestGenerationStamp(latestGenerationStamp)
151       .setRequestedChecksum(checksumProto)
152       .setCachingStrategy(getCachingStrategy(cachingStrategy))
153       .setAllowLazyPersist(allowLazyPersist)
154       .setPinning(pinning)
155       .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
156 
157     if (source != null) {
158       proto.setSource(PBHelper.convertDatanodeInfo(source));
159     }
160 
161     send(out, Op.WRITE_BLOCK, proto.build());
162   }
163 
164   @Override
transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes)165   public void transferBlock(final ExtendedBlock blk,
166       final Token<BlockTokenIdentifier> blockToken,
167       final String clientName,
168       final DatanodeInfo[] targets,
169       final StorageType[] targetStorageTypes) throws IOException {
170 
171     OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
172       .setHeader(DataTransferProtoUtil.buildClientHeader(
173           blk, clientName, blockToken))
174       .addAllTargets(PBHelper.convert(targets))
175       .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
176       .build();
177 
178     send(out, Op.TRANSFER_BLOCK, proto);
179   }
180 
181   @Override
requestShortCircuitFds(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, SlotId slotId, int maxVersion, boolean supportsReceiptVerification)182   public void requestShortCircuitFds(final ExtendedBlock blk,
183       final Token<BlockTokenIdentifier> blockToken,
184       SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
185         throws IOException {
186     OpRequestShortCircuitAccessProto.Builder builder =
187         OpRequestShortCircuitAccessProto.newBuilder()
188           .setHeader(DataTransferProtoUtil.buildBaseHeader(
189             blk, blockToken)).setMaxVersion(maxVersion);
190     if (slotId != null) {
191       builder.setSlotId(PBHelper.convert(slotId));
192     }
193     builder.setSupportsReceiptVerification(supportsReceiptVerification);
194     OpRequestShortCircuitAccessProto proto = builder.build();
195     send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
196   }
197 
198   @Override
releaseShortCircuitFds(SlotId slotId)199   public void releaseShortCircuitFds(SlotId slotId) throws IOException {
200     ReleaseShortCircuitAccessRequestProto.Builder builder =
201         ReleaseShortCircuitAccessRequestProto.newBuilder().
202         setSlotId(PBHelper.convert(slotId));
203     if (Trace.isTracing()) {
204       Span s = Trace.currentSpan();
205       builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
206           .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
207     }
208     ReleaseShortCircuitAccessRequestProto proto = builder.build();
209     send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
210   }
211 
212   @Override
requestShortCircuitShm(String clientName)213   public void requestShortCircuitShm(String clientName) throws IOException {
214     ShortCircuitShmRequestProto.Builder builder =
215         ShortCircuitShmRequestProto.newBuilder().
216         setClientName(clientName);
217     if (Trace.isTracing()) {
218       Span s = Trace.currentSpan();
219       builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
220           .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
221     }
222     ShortCircuitShmRequestProto proto = builder.build();
223     send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
224   }
225 
226   @Override
replaceBlock(final ExtendedBlock blk, final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo source)227   public void replaceBlock(final ExtendedBlock blk,
228       final StorageType storageType,
229       final Token<BlockTokenIdentifier> blockToken,
230       final String delHint,
231       final DatanodeInfo source) throws IOException {
232     OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
233       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
234       .setStorageType(PBHelper.convertStorageType(storageType))
235       .setDelHint(delHint)
236       .setSource(PBHelper.convertDatanodeInfo(source))
237       .build();
238 
239     send(out, Op.REPLACE_BLOCK, proto);
240   }
241 
242   @Override
copyBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken)243   public void copyBlock(final ExtendedBlock blk,
244       final Token<BlockTokenIdentifier> blockToken) throws IOException {
245     OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
246       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
247       .build();
248 
249     send(out, Op.COPY_BLOCK, proto);
250   }
251 
252   @Override
blockChecksum(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken)253   public void blockChecksum(final ExtendedBlock blk,
254       final Token<BlockTokenIdentifier> blockToken) throws IOException {
255     OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
256       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
257       .build();
258 
259     send(out, Op.BLOCK_CHECKSUM, proto);
260   }
261 }
262