1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.datanode;
19 
20 import java.io.File;
21 import java.io.FileDescriptor;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.nio.channels.ClosedChannelException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 
35 import javax.management.NotCompliantMBeanException;
36 import javax.management.ObjectName;
37 import javax.management.StandardMBean;
38 
39 import org.apache.commons.lang.ArrayUtils;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.StorageType;
42 import org.apache.hadoop.hdfs.DFSConfigKeys;
43 import org.apache.hadoop.hdfs.protocol.Block;
44 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
45 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
46 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
47 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
48 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
49 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
50 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
51 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
52 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
53 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
54 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
55 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
56 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
57 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
58 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
59 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
60 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
61 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
62 import org.apache.hadoop.io.IOUtils;
63 import org.apache.hadoop.metrics2.util.MBeans;
64 import org.apache.hadoop.util.DataChecksum;
65 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
66 
67 /**
68  * This class implements a simulated FSDataset.
69  *
70  * Blocks that are created are recorded but their data (plus their CRCs) are
71  *  discarded.
72  * Fixed data is returned when blocks are read; a null CRC meta file is
73  * created for such data.
74  *
 * This FSDataset does not remember any block information across its
 * restarts; it does, however, offer an operation to inject blocks
 * (see TestInjectionForSimulatedStorage for a usage example of
 * injection).
79  *
80  * Note the synchronization is coarse grained - it is at each method.
81  */
82 public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
83   static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
84     @Override
newInstance(DataNode datanode, DataStorage storage, Configuration conf)85     public SimulatedFSDataset newInstance(DataNode datanode,
86         DataStorage storage, Configuration conf) throws IOException {
87       return new SimulatedFSDataset(storage, conf);
88     }
89 
90     @Override
isSimulated()91     public boolean isSimulated() {
92       return true;
93     }
94   }
95 
setFactory(Configuration conf)96   public static void setFactory(Configuration conf) {
97     conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
98         Factory.class.getName());
99   }
100 
101   public static final String CONFIG_PROPERTY_CAPACITY =
102       "dfs.datanode.simulateddatastorage.capacity";
103 
104   public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
105   public static final byte DEFAULT_DATABYTE = 9;
106 
107   public static final String CONFIG_PROPERTY_STATE =
108       "dfs.datanode.simulateddatastorage.state";
109   private static final DatanodeStorage.State DEFAULT_STATE =
110       DatanodeStorage.State.NORMAL;
111 
/**
 * Contents served as the meta file of every simulated block: the two bytes
 * of {@code BlockMetadataHeader.VERSION} (high byte first) followed by the
 * header of a NULL-type checksum.
 */
static final byte[] nullCrcFileData;
static {
  final byte[] checksumHeader = DataChecksum.newDataChecksum(
      DataChecksum.Type.NULL, 16 * 1024).getHeader();
  final byte[] metaBytes = new byte[2 + checksumHeader.length];
  metaBytes[0] = (byte) ((BlockMetadataHeader.VERSION >>> 8) & 0xff);
  metaBytes[1] = (byte) (BlockMetadataHeader.VERSION & 0xff);
  // Append the checksum header right after the two version bytes.
  System.arraycopy(checksumHeader, 0, metaBytes, 2, checksumHeader.length);
  nullCrcFileData = metaBytes;
}
124 
125   // information about a single block
126   private class BInfo implements ReplicaInPipelineInterface {
127     final Block theBlock;
128     private boolean finalized = false; // if not finalized => ongoing creation
129     SimulatedOutputStream oStream = null;
130     private long bytesAcked;
131     private long bytesRcvd;
132     private boolean pinned = false;
BInfo(String bpid, Block b, boolean forWriting)133     BInfo(String bpid, Block b, boolean forWriting) throws IOException {
134       theBlock = new Block(b);
135       if (theBlock.getNumBytes() < 0) {
136         theBlock.setNumBytes(0);
137       }
138       if (!storage.alloc(bpid, theBlock.getNumBytes())) {
139         // expected length - actual length may
140         // be more - we find out at finalize
141         DataNode.LOG.warn("Lack of free storage on a block alloc");
142         throw new IOException("Creating block, no free space available");
143       }
144 
145       if (forWriting) {
146         finalized = false;
147         oStream = new SimulatedOutputStream();
148       } else {
149         finalized = true;
150         oStream = null;
151       }
152     }
153 
154     @Override
getStorageUuid()155     public String getStorageUuid() {
156       return storage.getStorageUuid();
157     }
158 
159     @Override
getGenerationStamp()160     synchronized public long getGenerationStamp() {
161       return theBlock.getGenerationStamp();
162     }
163 
164     @Override
getNumBytes()165     synchronized public long getNumBytes() {
166       if (!finalized) {
167          return bytesRcvd;
168       } else {
169         return theBlock.getNumBytes();
170       }
171     }
172 
173     @Override
setNumBytes(long length)174     synchronized public void setNumBytes(long length) {
175       if (!finalized) {
176          bytesRcvd = length;
177       } else {
178         theBlock.setNumBytes(length);
179       }
180     }
181 
getIStream()182     synchronized SimulatedInputStream getIStream() {
183       if (!finalized) {
184         // throw new IOException("Trying to read an unfinalized block");
185          return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE);
186       } else {
187         return new SimulatedInputStream(theBlock.getNumBytes(), DEFAULT_DATABYTE);
188       }
189     }
190 
finalizeBlock(String bpid, long finalSize)191     synchronized void finalizeBlock(String bpid, long finalSize)
192         throws IOException {
193       if (finalized) {
194         throw new IOException(
195             "Finalizing a block that has already been finalized" +
196             theBlock.getBlockId());
197       }
198       if (oStream == null) {
199         DataNode.LOG.error("Null oStream on unfinalized block - bug");
200         throw new IOException("Unexpected error on finalize");
201       }
202 
203       if (oStream.getLength() != finalSize) {
204         DataNode.LOG.warn("Size passed to finalize (" + finalSize +
205                     ")does not match what was written:" + oStream.getLength());
206         throw new IOException(
207           "Size passed to finalize does not match the amount of data written");
208       }
209       // We had allocated the expected length when block was created;
210       // adjust if necessary
211       long extraLen = finalSize - theBlock.getNumBytes();
212       if (extraLen > 0) {
213         if (!storage.alloc(bpid,extraLen)) {
214           DataNode.LOG.warn("Lack of free storage on a block alloc");
215           throw new IOException("Creating block, no free space available");
216         }
217       } else {
218         storage.free(bpid, -extraLen);
219       }
220       theBlock.setNumBytes(finalSize);
221 
222       finalized = true;
223       oStream = null;
224       return;
225     }
226 
unfinalizeBlock()227     synchronized void unfinalizeBlock() throws IOException {
228       if (!finalized) {
229         throw new IOException("Unfinalized a block that's not finalized "
230             + theBlock);
231       }
232       finalized = false;
233       oStream = new SimulatedOutputStream();
234       long blockLen = theBlock.getNumBytes();
235       oStream.setLength(blockLen);
236       bytesRcvd = blockLen;
237       bytesAcked = blockLen;
238     }
239 
getMetaIStream()240     SimulatedInputStream getMetaIStream() {
241       return new SimulatedInputStream(nullCrcFileData);
242     }
243 
isFinalized()244     synchronized boolean isFinalized() {
245       return finalized;
246     }
247 
248     @Override
createStreams(boolean isCreate, DataChecksum requestedChecksum)249     synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
250         DataChecksum requestedChecksum) throws IOException {
251       if (finalized) {
252         throw new IOException("Trying to write to a finalized replica "
253             + theBlock);
254       } else {
255         SimulatedOutputStream crcStream = new SimulatedOutputStream();
256         return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
257             volume.isTransientStorage());
258       }
259     }
260 
261     @Override
getBlockId()262     synchronized public long getBlockId() {
263       return theBlock.getBlockId();
264     }
265 
266     @Override
getVisibleLength()267     synchronized public long getVisibleLength() {
268       return getBytesAcked();
269     }
270 
271     @Override
getState()272     public ReplicaState getState() {
273       return finalized ? ReplicaState.FINALIZED : ReplicaState.RBW;
274     }
275 
276     @Override
getBytesAcked()277     synchronized public long getBytesAcked() {
278       if (finalized) {
279         return theBlock.getNumBytes();
280       } else {
281         return bytesAcked;
282       }
283     }
284 
285     @Override
setBytesAcked(long bytesAcked)286     synchronized public void setBytesAcked(long bytesAcked) {
287       if (!finalized) {
288         this.bytesAcked = bytesAcked;
289       }
290     }
291 
292     @Override
releaseAllBytesReserved()293     public void releaseAllBytesReserved() {
294     }
295 
296     @Override
getBytesOnDisk()297     synchronized public long getBytesOnDisk() {
298       if (finalized) {
299         return theBlock.getNumBytes();
300       } else {
301         return oStream.getLength();
302       }
303     }
304 
305     @Override
setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum)306     public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
307       oStream.setLength(dataLength);
308     }
309 
310     @Override
getLastChecksumAndDataLen()311     public ChunkChecksum getLastChecksumAndDataLen() {
312       return new ChunkChecksum(oStream.getLength(), null);
313     }
314 
315     @Override
isOnTransientStorage()316     public boolean isOnTransientStorage() {
317       return false;
318     }
319   }
320 
321   /**
322    * Class is used for tracking block pool storage utilization similar
323    * to {@link BlockPoolSlice}
324    */
325   private static class SimulatedBPStorage {
326     private long used;    // in bytes
327 
getUsed()328     long getUsed() {
329       return used;
330     }
331 
alloc(long amount)332     void alloc(long amount) {
333       used += amount;
334     }
335 
free(long amount)336     void free(long amount) {
337       used -= amount;
338     }
339 
SimulatedBPStorage()340     SimulatedBPStorage() {
341       used = 0;
342     }
343   }
344 
345   /**
346    * Class used for tracking datanode level storage utilization similar
347    * to {@link FSVolumeSet}
348    */
349   private static class SimulatedStorage {
350     private final Map<String, SimulatedBPStorage> map =
351       new HashMap<String, SimulatedBPStorage>();
352 
353     private final long capacity;  // in bytes
354     private final DatanodeStorage dnStorage;
355 
getFree()356     synchronized long getFree() {
357       return capacity - getUsed();
358     }
359 
getCapacity()360     long getCapacity() {
361       return capacity;
362     }
363 
getUsed()364     synchronized long getUsed() {
365       long used = 0;
366       for (SimulatedBPStorage bpStorage : map.values()) {
367         used += bpStorage.getUsed();
368       }
369       return used;
370     }
371 
getBlockPoolUsed(String bpid)372     synchronized long getBlockPoolUsed(String bpid) throws IOException {
373       return getBPStorage(bpid).getUsed();
374     }
375 
getNumFailedVolumes()376     int getNumFailedVolumes() {
377       return 0;
378     }
379 
alloc(String bpid, long amount)380     synchronized boolean alloc(String bpid, long amount) throws IOException {
381       if (getFree() >= amount) {
382         getBPStorage(bpid).alloc(amount);
383         return true;
384       }
385       return false;
386     }
387 
free(String bpid, long amount)388     synchronized void free(String bpid, long amount) throws IOException {
389       getBPStorage(bpid).free(amount);
390     }
391 
SimulatedStorage(long cap, DatanodeStorage.State state)392     SimulatedStorage(long cap, DatanodeStorage.State state) {
393       capacity = cap;
394       dnStorage = new DatanodeStorage(
395           "SimulatedStorage-" + DatanodeStorage.generateUuid(),
396           state, StorageType.DEFAULT);
397     }
398 
addBlockPool(String bpid)399     synchronized void addBlockPool(String bpid) {
400       SimulatedBPStorage bpStorage = map.get(bpid);
401       if (bpStorage != null) {
402         return;
403       }
404       map.put(bpid, new SimulatedBPStorage());
405     }
406 
removeBlockPool(String bpid)407     synchronized void removeBlockPool(String bpid) {
408       map.remove(bpid);
409     }
410 
getBPStorage(String bpid)411     private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
412       SimulatedBPStorage bpStorage = map.get(bpid);
413       if (bpStorage == null) {
414         throw new IOException("block pool " + bpid + " not found");
415       }
416       return bpStorage;
417     }
418 
getStorageUuid()419     String getStorageUuid() {
420       return dnStorage.getStorageID();
421     }
422 
getDnStorage()423     DatanodeStorage getDnStorage() {
424       return dnStorage;
425     }
426 
getStorageReport(String bpid)427     synchronized StorageReport getStorageReport(String bpid) {
428       return new StorageReport(dnStorage,
429           false, getCapacity(), getUsed(), getFree(),
430           map.get(bpid).getUsed());
431     }
432   }
433 
434   static class SimulatedVolume implements FsVolumeSpi {
435     private final SimulatedStorage storage;
436 
SimulatedVolume(final SimulatedStorage storage)437     SimulatedVolume(final SimulatedStorage storage) {
438       this.storage = storage;
439     }
440 
441     @Override
obtainReference()442     public FsVolumeReference obtainReference() throws ClosedChannelException {
443       return null;
444     }
445 
446     @Override
getStorageID()447     public String getStorageID() {
448       return storage.getStorageUuid();
449     }
450 
451     @Override
getBlockPoolList()452     public String[] getBlockPoolList() {
453       return new String[0];
454     }
455 
456     @Override
getAvailable()457     public long getAvailable() throws IOException {
458       return storage.getCapacity() - storage.getUsed();
459     }
460 
461     @Override
getBasePath()462     public String getBasePath() {
463       return null;
464     }
465 
466     @Override
getPath(String bpid)467     public String getPath(String bpid) throws IOException {
468       return null;
469     }
470 
471     @Override
getFinalizedDir(String bpid)472     public File getFinalizedDir(String bpid) throws IOException {
473       return null;
474     }
475 
476     @Override
getStorageType()477     public StorageType getStorageType() {
478       return null;
479     }
480 
481     @Override
isTransientStorage()482     public boolean isTransientStorage() {
483       return false;
484     }
485 
486     @Override
reserveSpaceForRbw(long bytesToReserve)487     public void reserveSpaceForRbw(long bytesToReserve) {
488     }
489 
490     @Override
releaseReservedSpace(long bytesToRelease)491     public void releaseReservedSpace(long bytesToRelease) {
492     }
493 
494     @Override
newBlockIterator(String bpid, String name)495     public BlockIterator newBlockIterator(String bpid, String name) {
496       throw new UnsupportedOperationException();
497     }
498 
499     @Override
loadBlockIterator(String bpid, String name)500     public BlockIterator loadBlockIterator(String bpid, String name)
501         throws IOException {
502       throw new UnsupportedOperationException();
503     }
504 
505     @Override
getDataset()506     public FsDatasetSpi getDataset() {
507       throw new UnsupportedOperationException();
508     }
509   }
510 
511   private final Map<String, Map<Block, BInfo>> blockMap
512       = new HashMap<String, Map<Block,BInfo>>();
513   private final SimulatedStorage storage;
514   private final SimulatedVolume volume;
515   private final String datanodeUuid;
516 
SimulatedFSDataset(DataStorage storage, Configuration conf)517   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
518     if (storage != null) {
519       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
520         storage.createStorageID(storage.getStorageDir(i), false);
521       }
522       this.datanodeUuid = storage.getDatanodeUuid();
523     } else {
524       this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid();
525     }
526 
527     registerMBean(datanodeUuid);
528     this.storage = new SimulatedStorage(
529         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
530         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
531     this.volume = new SimulatedVolume(this.storage);
532   }
533 
injectBlocks(String bpid, Iterable<? extends Block> injectBlocks)534   public synchronized void injectBlocks(String bpid,
535       Iterable<? extends Block> injectBlocks) throws IOException {
536     ExtendedBlock blk = new ExtendedBlock();
537     if (injectBlocks != null) {
538       for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
539         if (b == null) {
540           throw new NullPointerException("Null blocks in block list");
541         }
542         blk.set(bpid, b);
543         if (isValidBlock(blk)) {
544           throw new IOException("Block already exists in  block list");
545         }
546       }
547       Map<Block, BInfo> map = blockMap.get(bpid);
548       if (map == null) {
549         map = new HashMap<Block, BInfo>();
550         blockMap.put(bpid, map);
551       }
552 
553       for (Block b: injectBlocks) {
554         BInfo binfo = new BInfo(bpid, b, false);
555         map.put(binfo.theBlock, binfo);
556       }
557     }
558   }
559 
560   /** Get a map for a given block pool Id */
getMap(String bpid)561   private Map<Block, BInfo> getMap(String bpid) throws IOException {
562     final Map<Block, BInfo> map = blockMap.get(bpid);
563     if (map == null) {
564       throw new IOException("Non existent blockpool " + bpid);
565     }
566     return map;
567   }
568 
569   @Override // FsDatasetSpi
finalizeBlock(ExtendedBlock b)570   public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
571     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
572     BInfo binfo = map.get(b.getLocalBlock());
573     if (binfo == null) {
574       throw new IOException("Finalizing a non existing block " + b);
575     }
576     binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes());
577   }
578 
579   @Override // FsDatasetSpi
unfinalizeBlock(ExtendedBlock b)580   public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{
581     if (isValidRbw(b)) {
582       final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
583       map.remove(b.getLocalBlock());
584     }
585   }
586 
getBlockReport(String bpid)587   synchronized BlockListAsLongs getBlockReport(String bpid) {
588     BlockListAsLongs.Builder report = BlockListAsLongs.builder();
589     final Map<Block, BInfo> map = blockMap.get(bpid);
590     if (map != null) {
591       for (BInfo b : map.values()) {
592         if (b.isFinalized()) {
593           report.add(b);
594         }
595       }
596     }
597     return report.build();
598   }
599 
600   @Override
getBlockReports( String bpid)601   public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
602       String bpid) {
603     return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid));
604   }
605 
606   @Override // FsDatasetSpi
getCacheReport(String bpid)607   public List<Long> getCacheReport(String bpid) {
608     return new LinkedList<Long>();
609   }
610 
611   @Override // FSDatasetMBean
getCapacity()612   public long getCapacity() {
613     return storage.getCapacity();
614   }
615 
616   @Override // FSDatasetMBean
getDfsUsed()617   public long getDfsUsed() {
618     return storage.getUsed();
619   }
620 
621   @Override // FSDatasetMBean
getBlockPoolUsed(String bpid)622   public long getBlockPoolUsed(String bpid) throws IOException {
623     return storage.getBlockPoolUsed(bpid);
624   }
625 
626   @Override // FSDatasetMBean
getRemaining()627   public long getRemaining() {
628     return storage.getFree();
629   }
630 
631   @Override // FSDatasetMBean
getNumFailedVolumes()632   public int getNumFailedVolumes() {
633     return storage.getNumFailedVolumes();
634   }
635 
636   @Override // FSDatasetMBean
getFailedStorageLocations()637   public String[] getFailedStorageLocations() {
638     return null;
639   }
640 
641   @Override // FSDatasetMBean
getLastVolumeFailureDate()642   public long getLastVolumeFailureDate() {
643     return 0;
644   }
645 
646   @Override // FSDatasetMBean
getEstimatedCapacityLostTotal()647   public long getEstimatedCapacityLostTotal() {
648     return 0;
649   }
650 
651   @Override // FsDatasetSpi
getVolumeFailureSummary()652   public VolumeFailureSummary getVolumeFailureSummary() {
653     return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0);
654   }
655 
656   @Override // FSDatasetMBean
getCacheUsed()657   public long getCacheUsed() {
658     return 0l;
659   }
660 
661   @Override // FSDatasetMBean
getCacheCapacity()662   public long getCacheCapacity() {
663     return 0l;
664   }
665 
666   @Override // FSDatasetMBean
getNumBlocksCached()667   public long getNumBlocksCached() {
668     return 0l;
669   }
670 
671   @Override
getNumBlocksFailedToCache()672   public long getNumBlocksFailedToCache() {
673     return 0l;
674   }
675 
676   @Override
getNumBlocksFailedToUncache()677   public long getNumBlocksFailedToUncache() {
678     return 0l;
679   }
680 
681   @Override // FsDatasetSpi
getLength(ExtendedBlock b)682   public synchronized long getLength(ExtendedBlock b) throws IOException {
683     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
684     BInfo binfo = map.get(b.getLocalBlock());
685     if (binfo == null) {
686       throw new IOException("Finalizing a non existing block " + b);
687     }
688     return binfo.getNumBytes();
689   }
690 
691   @Override
692   @Deprecated
getReplica(String bpid, long blockId)693   public Replica getReplica(String bpid, long blockId) {
694     final Map<Block, BInfo> map = blockMap.get(bpid);
695     if (map != null) {
696       return map.get(new Block(blockId));
697     }
698     return null;
699   }
700 
701   @Override
getReplicaString(String bpid, long blockId)702   public synchronized String getReplicaString(String bpid, long blockId) {
703     Replica r = null;
704     final Map<Block, BInfo> map = blockMap.get(bpid);
705     if (map != null) {
706       r = map.get(new Block(blockId));
707     }
708     return r == null? "null": r.toString();
709   }
710 
711   @Override // FsDatasetSpi
getStoredBlock(String bpid, long blkid)712   public Block getStoredBlock(String bpid, long blkid) throws IOException {
713     final Map<Block, BInfo> map = blockMap.get(bpid);
714     if (map != null) {
715       BInfo binfo = map.get(new Block(blkid));
716       if (binfo == null) {
717         return null;
718       }
719       return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
720     }
721     return null;
722   }
723 
724   @Override // FsDatasetSpi
invalidate(String bpid, Block[] invalidBlks)725   public synchronized void invalidate(String bpid, Block[] invalidBlks)
726       throws IOException {
727     boolean error = false;
728     if (invalidBlks == null) {
729       return;
730     }
731     final Map<Block, BInfo> map = getMap(bpid);
732     for (Block b: invalidBlks) {
733       if (b == null) {
734         continue;
735       }
736       BInfo binfo = map.get(b);
737       if (binfo == null) {
738         error = true;
739         DataNode.LOG.warn("Invalidate: Missing block");
740         continue;
741       }
742       storage.free(bpid, binfo.getNumBytes());
743       map.remove(b);
744     }
745     if (error) {
746       throw new IOException("Invalidate: Missing blocks.");
747     }
748   }
749 
750   @Override // FSDatasetSpi
cache(String bpid, long[] cacheBlks)751   public void cache(String bpid, long[] cacheBlks) {
752     throw new UnsupportedOperationException(
753         "SimulatedFSDataset does not support cache operation!");
754   }
755 
756   @Override // FSDatasetSpi
uncache(String bpid, long[] uncacheBlks)757   public void uncache(String bpid, long[] uncacheBlks) {
758     throw new UnsupportedOperationException(
759         "SimulatedFSDataset does not support uncache operation!");
760   }
761 
762   @Override // FSDatasetSpi
isCached(String bpid, long blockId)763   public boolean isCached(String bpid, long blockId) {
764     return false;
765   }
766 
getBInfo(final ExtendedBlock b)767   private BInfo getBInfo(final ExtendedBlock b) {
768     final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
769     return map == null? null: map.get(b.getLocalBlock());
770   }
771 
772   @Override // {@link FsDatasetSpi}
contains(ExtendedBlock block)773   public boolean contains(ExtendedBlock block) {
774     return getBInfo(block) != null;
775   }
776 
777   /**
778    * Check if a block is valid.
779    *
780    * @param b           The block to check.
781    * @param minLength   The minimum length that the block must have.  May be 0.
782    * @param state       If this is null, it is ignored.  If it is non-null, we
783    *                        will check that the replica has this state.
784    *
785    * @throws ReplicaNotFoundException          If the replica is not found
786    *
787    * @throws UnexpectedReplicaStateException   If the replica is not in the
788    *                                             expected state.
789    */
790   @Override // {@link FsDatasetSpi}
checkBlock(ExtendedBlock b, long minLength, ReplicaState state)791   public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
792       throws ReplicaNotFoundException, UnexpectedReplicaStateException {
793     final BInfo binfo = getBInfo(b);
794 
795     if (binfo == null) {
796       throw new ReplicaNotFoundException(b);
797     }
798     if ((state == ReplicaState.FINALIZED && !binfo.isFinalized()) ||
799         (state != ReplicaState.FINALIZED && binfo.isFinalized())) {
800       throw new UnexpectedReplicaStateException(b,state);
801     }
802   }
803 
804   @Override // FsDatasetSpi
isValidBlock(ExtendedBlock b)805   public synchronized boolean isValidBlock(ExtendedBlock b) {
806     try {
807       checkBlock(b, 0, ReplicaState.FINALIZED);
808     } catch (IOException e) {
809       return false;
810     }
811     return true;
812   }
813 
814   /* check if a block is created but not finalized */
815   @Override
isValidRbw(ExtendedBlock b)816   public synchronized boolean isValidRbw(ExtendedBlock b) {
817     try {
818       checkBlock(b, 0, ReplicaState.RBW);
819     } catch (IOException e) {
820       return false;
821     }
822     return true;
823   }
824 
825   @Override
toString()826   public String toString() {
827     return getStorageInfo();
828   }
829 
830   @Override // FsDatasetSpi
append( ExtendedBlock b, long newGS, long expectedBlockLen)831   public synchronized ReplicaHandler append(
832       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
833     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
834     BInfo binfo = map.get(b.getLocalBlock());
835     if (binfo == null || !binfo.isFinalized()) {
836       throw new ReplicaNotFoundException("Block " + b
837           + " is not valid, and cannot be appended to.");
838     }
839     binfo.unfinalizeBlock();
840     return new ReplicaHandler(binfo, null);
841   }
842 
843   @Override // FsDatasetSpi
recoverAppend( ExtendedBlock b, long newGS, long expectedBlockLen)844   public synchronized ReplicaHandler recoverAppend(
845       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
846     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
847     BInfo binfo = map.get(b.getLocalBlock());
848     if (binfo == null) {
849       throw new ReplicaNotFoundException("Block " + b
850           + " is not valid, and cannot be appended to.");
851     }
852     if (binfo.isFinalized()) {
853       binfo.unfinalizeBlock();
854     }
855     map.remove(b);
856     binfo.theBlock.setGenerationStamp(newGS);
857     map.put(binfo.theBlock, binfo);
858     return new ReplicaHandler(binfo, null);
859   }
860 
861   @Override // FsDatasetSpi
recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)862   public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
863       throws IOException {
864     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
865     BInfo binfo = map.get(b.getLocalBlock());
866     if (binfo == null) {
867       throw new ReplicaNotFoundException("Block " + b
868           + " is not valid, and cannot be appended to.");
869     }
870     if (!binfo.isFinalized()) {
871       binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes());
872     }
873     map.remove(b.getLocalBlock());
874     binfo.theBlock.setGenerationStamp(newGS);
875     map.put(binfo.theBlock, binfo);
876     return binfo.getStorageUuid();
877   }
878 
879   @Override // FsDatasetSpi
recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)880   public synchronized ReplicaHandler recoverRbw(
881       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
882       throws IOException {
883     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
884     BInfo binfo = map.get(b.getLocalBlock());
885     if ( binfo == null) {
886       throw new ReplicaNotFoundException("Block " + b
887           + " does not exist, and cannot be appended to.");
888     }
889     if (binfo.isFinalized()) {
890       throw new ReplicaAlreadyExistsException("Block " + b
891           + " is valid, and cannot be written to.");
892     }
893     map.remove(b);
894     binfo.theBlock.setGenerationStamp(newGS);
895     map.put(binfo.theBlock, binfo);
896     return new ReplicaHandler(binfo, null);
897   }
898 
  /**
   * Creates a replica for writing by delegating to
   * {@link #createTemporary(StorageType, ExtendedBlock)}.
   *
   * @param storageType passed through to {@code createTemporary}
   * @param b block to create a replica for
   * @param allowLazyPersist ignored by this implementation
   * @return the handler returned by {@code createTemporary}
   * @throws IOException propagated from {@code createTemporary}
   */
  @Override // FsDatasetSpi
  public synchronized ReplicaHandler createRbw(
      StorageType storageType, ExtendedBlock b,
      boolean allowLazyPersist) throws IOException {
    return createTemporary(storageType, b);
  }
905 
906   @Override // FsDatasetSpi
createTemporary( StorageType storageType, ExtendedBlock b)907   public synchronized ReplicaHandler createTemporary(
908       StorageType storageType, ExtendedBlock b) throws IOException {
909     if (isValidBlock(b)) {
910           throw new ReplicaAlreadyExistsException("Block " + b +
911               " is valid, and cannot be written to.");
912       }
913     if (isValidRbw(b)) {
914         throw new ReplicaAlreadyExistsException("Block " + b +
915             " is being written, and cannot be written to.");
916     }
917     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
918     BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
919     map.put(binfo.theBlock, binfo);
920     return new ReplicaHandler(binfo, null);
921   }
922 
getBlockInputStream(ExtendedBlock b )923   synchronized InputStream getBlockInputStream(ExtendedBlock b
924       ) throws IOException {
925     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
926     BInfo binfo = map.get(b.getLocalBlock());
927     if (binfo == null) {
928       throw new IOException("No such Block " + b );
929     }
930 
931     return binfo.getIStream();
932   }
933 
934   @Override // FsDatasetSpi
getBlockInputStream(ExtendedBlock b, long seekOffset)935   public synchronized InputStream getBlockInputStream(ExtendedBlock b,
936       long seekOffset) throws IOException {
937     InputStream result = getBlockInputStream(b);
938     IOUtils.skipFully(result, seekOffset);
939     return result;
940   }
941 
  /**
   * Not supported: every call fails with an {@link IOException} carrying the
   * message "Not supported".
   *
   * @param b ignored
   * @param blkoff ignored
   * @param ckoff ignored
   * @return never returns normally
   * @throws IOException always
   */
  @Override // FsDatasetSpi
  public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
      long ckoff) throws IOException {
    throw new IOException("Not supported");
  }
948 
949   @Override // FsDatasetSpi
getMetaDataInputStream(ExtendedBlock b )950   public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
951       ) throws IOException {
952     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
953     BInfo binfo = map.get(b.getLocalBlock());
954     if (binfo == null) {
955       throw new IOException("No such Block " + b );
956     }
957     if (!binfo.finalized) {
958       throw new IOException("Block " + b +
959           " is being written, its meta cannot be read");
960     }
961     final SimulatedInputStream sin = binfo.getMetaIStream();
962     return new LengthInputStream(sin, sin.getLength());
963   }
964 
  /**
   * Performs no check; the simulated data set has nothing to verify.
   *
   * @return always {@code null}
   */
  @Override
  public Set<File> checkDataDir() {
    // nothing to check for simulated data set
    return null;
  }
970 
  /**
   * No-op in this implementation: the body is empty and none of the arguments
   * are used.
   *
   * @param b ignored
   * @param stream ignored
   * @param checksumSize ignored
   * @throws IOException declared by the signature; never thrown here
   */
  @Override // FsDatasetSpi
  public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
                                              ReplicaOutputStreams stream,
                                              int checksumSize)
                                              throws IOException {
    // intentionally empty
  }
977 
978   /**
979    * Simulated input and output streams
980    *
981    */
982   static private class SimulatedInputStream extends java.io.InputStream {
983 
984 
985     byte theRepeatedData = 7;
986     final long length; // bytes
987     int currentPos = 0;
988     byte[] data = null;
989 
990     /**
991      * An input stream of size l with repeated bytes
992      * @param l size of the stream
993      * @param iRepeatedData byte that is repeated in the stream
994      */
SimulatedInputStream(long l, byte iRepeatedData)995     SimulatedInputStream(long l, byte iRepeatedData) {
996       length = l;
997       theRepeatedData = iRepeatedData;
998     }
999 
1000     /**
1001      * An input stream of of the supplied data
1002      * @param iData data to construct the stream
1003      */
SimulatedInputStream(byte[] iData)1004     SimulatedInputStream(byte[] iData) {
1005       data = iData;
1006       length = data.length;
1007     }
1008 
1009     /**
1010      * @return the lenght of the input stream
1011      */
getLength()1012     long getLength() {
1013       return length;
1014     }
1015 
1016     @Override
read()1017     public int read() throws IOException {
1018       if (currentPos >= length)
1019         return -1;
1020       if (data !=null) {
1021         return data[currentPos++];
1022       } else {
1023         currentPos++;
1024         return theRepeatedData;
1025       }
1026     }
1027 
1028     @Override
read(byte[] b)1029     public int read(byte[] b) throws IOException {
1030 
1031       if (b == null) {
1032         throw new NullPointerException();
1033       }
1034       if (b.length == 0) {
1035         return 0;
1036       }
1037       if (currentPos >= length) { // EOF
1038         return -1;
1039       }
1040       int bytesRead = (int) Math.min(b.length, length-currentPos);
1041       if (data != null) {
1042         System.arraycopy(data, currentPos, b, 0, bytesRead);
1043       } else { // all data is zero
1044         for (int i : b) {
1045           b[i] = theRepeatedData;
1046         }
1047       }
1048       currentPos += bytesRead;
1049       return bytesRead;
1050     }
1051   }
1052 
1053   /**
1054    * This class implements an output stream that merely throws its data away, but records its
1055    * length.
1056    *
1057    */
1058   static private class SimulatedOutputStream extends OutputStream {
1059     long length = 0;
1060 
1061     /**
1062      * constructor for Simulated Output Steram
1063      */
SimulatedOutputStream()1064     SimulatedOutputStream() {
1065     }
1066 
1067     /**
1068      *
1069      * @return the length of the data created so far.
1070      */
getLength()1071     long getLength() {
1072       return length;
1073     }
1074 
1075     /**
1076      */
setLength(long length)1077     void setLength(long length) {
1078       this.length = length;
1079     }
1080 
1081     @Override
write(int arg0)1082     public void write(int arg0) throws IOException {
1083       length++;
1084     }
1085 
1086     @Override
write(byte[] b)1087     public void write(byte[] b) throws IOException {
1088       length += b.length;
1089     }
1090 
1091     @Override
write(byte[] b, int off, int len)1092     public void write(byte[] b,
1093               int off,
1094               int len) throws IOException  {
1095       length += len;
1096     }
1097   }
1098 
1099   private ObjectName mbeanName;
1100 
1101 
1102 
1103   /**
1104    * Register the FSDataset MBean using the name
1105    *        "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
1106    *  We use storage id for MBean name since a minicluster within a single
1107    * Java VM may have multiple Simulated Datanodes.
1108    */
registerMBean(final String storageId)1109   void registerMBean(final String storageId) {
1110     // We wrap to bypass standard mbean naming convetion.
1111     // This wraping can be removed in java 6 as it is more flexible in
1112     // package naming for mbeans and their impl.
1113     StandardMBean bean;
1114 
1115     try {
1116       bean = new StandardMBean(this,FSDatasetMBean.class);
1117       mbeanName = MBeans.register("DataNode", "FSDatasetState-"+
1118                                   storageId, bean);
1119     } catch (NotCompliantMBeanException e) {
1120       DataNode.LOG.warn("Error registering FSDatasetState MBean", e);
1121     }
1122 
1123     DataNode.LOG.info("Registered FSDatasetState MBean");
1124   }
1125 
1126   @Override
shutdown()1127   public void shutdown() {
1128     if (mbeanName != null) MBeans.unregister(mbeanName);
1129   }
1130 
  /**
   * Describes this dataset.
   *
   * @return the text "Simulated FSDataset-" followed by {@code datanodeUuid}
   */
  @Override
  public String getStorageInfo() {
    return "Simulated FSDataset-" + datanodeUuid;
  }
1135 
  /**
   * Performs no resource check.
   *
   * @return always {@code true}
   */
  @Override
  public boolean hasEnoughResource() {
    return true;
  }
1140 
1141   @Override
initReplicaRecovery(RecoveringBlock rBlock)1142   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
1143   throws IOException {
1144     ExtendedBlock b = rBlock.getBlock();
1145     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
1146     BInfo binfo = map.get(b.getLocalBlock());
1147     if (binfo == null) {
1148       throw new IOException("No such Block " + b );
1149     }
1150 
1151     return new ReplicaRecoveryInfo(binfo.getBlockId(), binfo.getBytesOnDisk(),
1152         binfo.getGenerationStamp(),
1153         binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW);
1154   }
1155 
  /**
   * Does not update any replica; it only returns {@code datanodeUuid}.
   *
   * @param oldBlock ignored
   * @param recoveryId ignored
   * @param newBlockId ignored
   * @param newlength ignored
   * @return {@code datanodeUuid}
   */
  @Override // FsDatasetSpi
  public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
                                        long recoveryId,
                                        long newBlockId,
                                        long newlength) {
    // Caller does not care about the exact Storage UUID returned.
    return datanodeUuid;
  }
1164 
  /**
   * Reports the byte count carried by the supplied block object; the stored
   * replica is not consulted.
   *
   * @param block block whose length is requested
   * @return {@code block.getNumBytes()}
   */
  @Override // FsDatasetSpi
  public long getReplicaVisibleLength(ExtendedBlock block) {
    return block.getNumBytes();
  }
1169 
1170   @Override // FsDatasetSpi
addBlockPool(String bpid, Configuration conf)1171   public void addBlockPool(String bpid, Configuration conf) {
1172     Map<Block, BInfo> map = new HashMap<Block, BInfo>();
1173     blockMap.put(bpid, map);
1174     storage.addBlockPool(bpid);
1175   }
1176 
  /**
   * Drops a block pool: removes its replica map from {@code blockMap} and
   * then removes the pool from {@code storage}.
   *
   * @param bpid id of the block pool to remove
   */
  @Override // FsDatasetSpi
  public void shutdownBlockPool(String bpid) {
    blockMap.remove(bpid);
    storage.removeBlockPool(bpid);
  }
1182 
1183   @Override // FsDatasetSpi
deleteBlockPool(String bpid, boolean force)1184   public void deleteBlockPool(String bpid, boolean force) {
1185      return;
1186   }
1187 
1188   @Override
convertTemporaryToRbw(ExtendedBlock temporary)1189   public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
1190       throws IOException {
1191     final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
1192     if (map == null) {
1193       throw new IOException("Block pool not found, temporary=" + temporary);
1194     }
1195     final BInfo r = map.get(temporary.getLocalBlock());
1196     if (r == null) {
1197       throw new IOException("Block not found, temporary=" + temporary);
1198     } else if (r.isFinalized()) {
1199       throw new IOException("Replica already finalized, temporary="
1200           + temporary + ", r=" + r);
1201     }
1202     return r;
1203   }
1204 
  /**
   * Not supported by this implementation.
   *
   * @param b ignored
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
    throw new UnsupportedOperationException();
  }
1209 
  /**
   * Not supported by this implementation.
   *
   * @param bpid ignored
   * @param blockIds ignored
   * @return never returns normally
   * @throws UnsupportedOperationException always
   * @throws IOException declared by the signature; never thrown here
   */
  @Override
  public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds)
      throws IOException {
    throw new UnsupportedOperationException();
  }
1215 
  /**
   * Not supported by this implementation.
   *
   * @param bpid ignored
   * @throws UnsupportedOperationException always
   */
  @Override
  public void enableTrash(String bpid) {
    throw new UnsupportedOperationException();
  }
1220 
  /**
   * No-op in this implementation.
   *
   * @param bpid ignored
   */
  @Override
  public void clearTrash(String bpid) {
  }
1224 
  /**
   * Reports trash as disabled for every block pool.
   *
   * @param bpid ignored
   * @return always {@code false}
   */
  @Override
  public boolean trashEnabled(String bpid) {
    return false;
  }
1229 
  /**
   * No-op in this implementation.
   *
   * @param bpid ignored
   */
  @Override
  public void setRollingUpgradeMarker(String bpid) {
  }
1233 
  /**
   * No-op in this implementation.
   *
   * @param bpid ignored
   */
  @Override
  public void clearRollingUpgradeMarker(String bpid) {
  }
1237 
  /**
   * Not supported by this implementation.
   *
   * @param bpid ignored
   * @param blockId ignored
   * @param diskFile ignored
   * @param diskMetaFile ignored
   * @param vol ignored
   * @throws UnsupportedOperationException always
   * @throws IOException declared by the signature; never thrown here
   */
  @Override
  public void checkAndUpdate(String bpid, long blockId, File diskFile,
      File diskMetaFile, FsVolumeSpi vol) throws IOException {
    throw new UnsupportedOperationException();
  }
1243 
  /**
   * Not supported by this implementation.
   *
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public List<FsVolumeSpi> getVolumes() {
    throw new UnsupportedOperationException();
  }
1248 
  /**
   * Not supported by this implementation.
   *
   * @param location ignored
   * @param nsInfos ignored
   * @throws UnsupportedOperationException always
   * @throws IOException declared by the signature; never thrown here
   */
  @Override
  public void addVolume(
      final StorageLocation location,
      final List<NamespaceInfo> nsInfos) throws IOException {
    throw new UnsupportedOperationException();
  }
1255 
1256   @Override
getStorage(final String storageUuid)1257   public DatanodeStorage getStorage(final String storageUuid) {
1258     return storageUuid.equals(storage.getStorageUuid()) ?
1259         storage.dnStorage :
1260         null;
1261   }
1262 
  /**
   * Returns the report of this dataset's storage for the given block pool.
   *
   * @param bpid block pool id passed to {@code storage.getStorageReport}
   * @return a one-element array holding that report
   */
  @Override
  public StorageReport[] getStorageReports(String bpid) {
    return new StorageReport[] {storage.getStorageReport(bpid)};
  }
1267 
  /**
   * Not supported by this implementation.
   *
   * @param bpid ignored
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
    throw new UnsupportedOperationException();
  }
1272 
  /**
   * Not supported by this implementation.
   *
   * @param bpid ignored
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
    throw new UnsupportedOperationException();
  }
1277 
  /**
   * Not supported by this implementation.
   *
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public Map<String, Object> getVolumeInfoMap() {
    throw new UnsupportedOperationException();
  }
1282 
  /**
   * Returns this dataset's {@code volume} field, whatever block is asked for.
   *
   * @param b ignored
   * @return the {@code volume} field
   */
  @Override
  public FsVolumeSpi getVolume(ExtendedBlock b) {
    return volume;
  }
1287 
  /**
   * Not supported by this implementation.
   *
   * @param volumes ignored
   * @param clearFailure ignored
   * @throws UnsupportedOperationException always
   */
  @Override
  public synchronized void removeVolumes(Set<File> volumes, boolean clearFailure) {
    throw new UnsupportedOperationException();
  }
1292 
  /**
   * Not supported by this implementation.
   *
   * @param block ignored
   * @param fd ignored
   * @param offset ignored
   * @param nbytes ignored
   * @param flags ignored
   * @throws UnsupportedOperationException always
   */
  @Override
  public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
      FileDescriptor fd, long offset, long nbytes, int flags) {
    throw new UnsupportedOperationException();
  }
1298 
  /**
   * Not supported by this implementation.
   *
   * @param bpId ignored
   * @param blockId ignored
   * @param creationTime ignored
   * @param savedFiles ignored
   * @param targetVolume ignored
   * @throws UnsupportedOperationException always
   */
  @Override
  public void onCompleteLazyPersist(String bpId, long blockId,
      long creationTime, File[] savedFiles, FsVolumeSpi targetVolume) {
    throw new UnsupportedOperationException();
  }
1304 
  /**
   * Not supported by this implementation.
   *
   * @param bpId ignored
   * @param blockId ignored
   * @throws UnsupportedOperationException always
   */
  @Override
  public void onFailLazyPersist(String bpId, long blockId) {
    throw new UnsupportedOperationException();
  }
1309 
  /**
   * Not implemented: no block is moved.
   *
   * @param block ignored
   * @param targetStorageType ignored
   * @return always {@code null}
   * @throws IOException declared by the signature; never thrown here
   */
  @Override
  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
      StorageType targetStorageType) throws IOException {
    // TODO Auto-generated method stub
    return null;
  }
1316 
1317   @Override
setPinning(ExtendedBlock b)1318   public void setPinning(ExtendedBlock b) throws IOException {
1319     blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
1320   }
1321 
1322   @Override
getPinning(ExtendedBlock b)1323   public boolean getPinning(ExtendedBlock b) throws IOException {
1324     return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
1325   }
1326 
  /**
   * Not supported by this implementation.
   *
   * @param bpid ignored
   * @param blockId ignored
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean isDeletingBlock(String bpid, long blockId) {
    throw new UnsupportedOperationException();
  }
1331 }
1332 
1333