1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.datanode; 19 20 import java.io.File; 21 import java.io.FileDescriptor; 22 import java.io.IOException; 23 import java.io.InputStream; 24 import java.io.OutputStream; 25 import java.nio.channels.ClosedChannelException; 26 import java.util.ArrayList; 27 import java.util.Collection; 28 import java.util.Collections; 29 import java.util.HashMap; 30 import java.util.LinkedList; 31 import java.util.List; 32 import java.util.Map; 33 import java.util.Set; 34 35 import javax.management.NotCompliantMBeanException; 36 import javax.management.ObjectName; 37 import javax.management.StandardMBean; 38 39 import org.apache.commons.lang.ArrayUtils; 40 import org.apache.hadoop.conf.Configuration; 41 import org.apache.hadoop.fs.StorageType; 42 import org.apache.hadoop.hdfs.DFSConfigKeys; 43 import org.apache.hadoop.hdfs.protocol.Block; 44 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; 45 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; 46 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 47 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; 48 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; 49 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 50 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; 51 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; 52 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; 53 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; 54 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; 55 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; 56 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; 57 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; 58 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; 59 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; 60 import org.apache.hadoop.hdfs.server.protocol.StorageReport; 61 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; 62 import org.apache.hadoop.io.IOUtils; 63 import org.apache.hadoop.metrics2.util.MBeans; 64 import org.apache.hadoop.util.DataChecksum; 65 import org.apache.hadoop.util.DiskChecker.DiskErrorException; 66 67 /** 68 * This class implements a simulated FSDataset. 69 * 70 * Blocks that are created are recorded but their data (plus their CRCs) are 71 * discarded. 72 * Fixed data is returned when blocks are read; a null CRC meta file is 73 * created for such data. 74 * 75 * This FSDataset does not remember any block information across its 76 * restarts; it does however offer an operation to inject blocks 77 * (See the TestInectionForSImulatedStorage() 78 * for a usage example of injection. 79 * 80 * Note the synchronization is coarse grained - it is at each method. 81 */ 82 public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { 83 static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> { 84 @Override newInstance(DataNode datanode, DataStorage storage, Configuration conf)85 public SimulatedFSDataset newInstance(DataNode datanode, 86 DataStorage storage, Configuration conf) throws IOException { 87 return new SimulatedFSDataset(storage, conf); 88 } 89 90 @Override isSimulated()91 public boolean isSimulated() { 92 return true; 93 } 94 } 95 setFactory(Configuration conf)96 public static void setFactory(Configuration conf) { 97 conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, 98 Factory.class.getName()); 99 } 100 101 public static final String CONFIG_PROPERTY_CAPACITY = 102 "dfs.datanode.simulateddatastorage.capacity"; 103 104 public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte 105 public static final byte DEFAULT_DATABYTE = 9; 106 107 public static final String CONFIG_PROPERTY_STATE = 108 "dfs.datanode.simulateddatastorage.state"; 109 private static final DatanodeStorage.State DEFAULT_STATE = 110 DatanodeStorage.State.NORMAL; 111 112 static final byte[] nullCrcFileData; 113 static { 114 DataChecksum checksum = DataChecksum.newDataChecksum( 115 DataChecksum.Type.NULL, 16*1024 ); 116 byte[] nullCrcHeader = checksum.getHeader(); 117 nullCrcFileData = new byte[2 + nullCrcHeader.length]; 118 nullCrcFileData[0] = (byte) ((BlockMetadataHeader.VERSION >>> 8) & 0xff); 119 nullCrcFileData[1] = (byte) (BlockMetadataHeader.VERSION & 0xff); 120 for (int i = 0; i < nullCrcHeader.length; i++) { 121 nullCrcFileData[i+2] = nullCrcHeader[i]; 122 } 123 } 124 125 // information about a single block 126 private class BInfo implements ReplicaInPipelineInterface { 127 final Block theBlock; 128 private boolean finalized = false; // if not finalized => ongoing creation 129 SimulatedOutputStream oStream = null; 130 private long bytesAcked; 131 private long bytesRcvd; 132 private boolean pinned = false; BInfo(String bpid, Block b, boolean forWriting)133 BInfo(String bpid, Block b, boolean forWriting) throws IOException { 134 theBlock = new Block(b); 135 if (theBlock.getNumBytes() < 0) { 136 theBlock.setNumBytes(0); 137 } 138 if (!storage.alloc(bpid, theBlock.getNumBytes())) { 139 // expected length - actual length may 140 // be more - we find out at finalize 141 DataNode.LOG.warn("Lack of free storage on a block alloc"); 142 throw new IOException("Creating block, no free space available"); 143 } 144 145 if (forWriting) { 146 finalized = false; 147 oStream = new SimulatedOutputStream(); 148 } else { 149 finalized = true; 150 oStream = null; 151 } 152 } 153 154 @Override getStorageUuid()155 public String getStorageUuid() { 156 return storage.getStorageUuid(); 157 } 158 159 @Override getGenerationStamp()160 synchronized public long getGenerationStamp() { 161 return theBlock.getGenerationStamp(); 162 } 163 164 @Override getNumBytes()165 synchronized public long getNumBytes() { 166 if (!finalized) { 167 return bytesRcvd; 168 } else { 169 return theBlock.getNumBytes(); 170 } 171 } 172 173 @Override setNumBytes(long length)174 synchronized public void setNumBytes(long length) { 175 if (!finalized) { 176 bytesRcvd = length; 177 } else { 178 theBlock.setNumBytes(length); 179 } 180 } 181 getIStream()182 synchronized SimulatedInputStream getIStream() { 183 if (!finalized) { 184 // throw new IOException("Trying to read an unfinalized block"); 185 return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE); 186 } else { 187 return new SimulatedInputStream(theBlock.getNumBytes(), DEFAULT_DATABYTE); 188 } 189 } 190 finalizeBlock(String bpid, long finalSize)191 synchronized void finalizeBlock(String bpid, long finalSize) 192 throws IOException { 193 if (finalized) { 194 throw new IOException( 195 "Finalizing a block that has already been finalized" + 196 theBlock.getBlockId()); 197 } 198 if (oStream == null) { 199 DataNode.LOG.error("Null oStream on unfinalized block - bug"); 200 throw new IOException("Unexpected error on finalize"); 201 } 202 203 if (oStream.getLength() != finalSize) { 204 DataNode.LOG.warn("Size passed to finalize (" + finalSize + 205 ")does not match what was written:" + oStream.getLength()); 206 throw new IOException( 207 "Size passed to finalize does not match the amount of data written"); 208 } 209 // We had allocated the expected length when block was created; 210 // adjust if necessary 211 long extraLen = finalSize - theBlock.getNumBytes(); 212 if (extraLen > 0) { 213 if (!storage.alloc(bpid,extraLen)) { 214 DataNode.LOG.warn("Lack of free storage on a block alloc"); 215 throw new IOException("Creating block, no free space available"); 216 } 217 } else { 218 storage.free(bpid, -extraLen); 219 } 220 theBlock.setNumBytes(finalSize); 221 222 finalized = true; 223 oStream = null; 224 return; 225 } 226 unfinalizeBlock()227 synchronized void unfinalizeBlock() throws IOException { 228 if (!finalized) { 229 throw new IOException("Unfinalized a block that's not finalized " 230 + theBlock); 231 } 232 finalized = false; 233 oStream = new SimulatedOutputStream(); 234 long blockLen = theBlock.getNumBytes(); 235 oStream.setLength(blockLen); 236 bytesRcvd = blockLen; 237 bytesAcked = blockLen; 238 } 239 getMetaIStream()240 SimulatedInputStream getMetaIStream() { 241 return new SimulatedInputStream(nullCrcFileData); 242 } 243 isFinalized()244 synchronized boolean isFinalized() { 245 return finalized; 246 } 247 248 @Override createStreams(boolean isCreate, DataChecksum requestedChecksum)249 synchronized public ReplicaOutputStreams createStreams(boolean isCreate, 250 DataChecksum requestedChecksum) throws IOException { 251 if (finalized) { 252 throw new IOException("Trying to write to a finalized replica " 253 + theBlock); 254 } else { 255 SimulatedOutputStream crcStream = new SimulatedOutputStream(); 256 return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, 257 volume.isTransientStorage()); 258 } 259 } 260 261 @Override getBlockId()262 synchronized public long getBlockId() { 263 return theBlock.getBlockId(); 264 } 265 266 @Override getVisibleLength()267 synchronized public long getVisibleLength() { 268 return getBytesAcked(); 269 } 270 271 @Override getState()272 public ReplicaState getState() { 273 return finalized ? ReplicaState.FINALIZED : ReplicaState.RBW; 274 } 275 276 @Override getBytesAcked()277 synchronized public long getBytesAcked() { 278 if (finalized) { 279 return theBlock.getNumBytes(); 280 } else { 281 return bytesAcked; 282 } 283 } 284 285 @Override setBytesAcked(long bytesAcked)286 synchronized public void setBytesAcked(long bytesAcked) { 287 if (!finalized) { 288 this.bytesAcked = bytesAcked; 289 } 290 } 291 292 @Override releaseAllBytesReserved()293 public void releaseAllBytesReserved() { 294 } 295 296 @Override getBytesOnDisk()297 synchronized public long getBytesOnDisk() { 298 if (finalized) { 299 return theBlock.getNumBytes(); 300 } else { 301 return oStream.getLength(); 302 } 303 } 304 305 @Override setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum)306 public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) { 307 oStream.setLength(dataLength); 308 } 309 310 @Override getLastChecksumAndDataLen()311 public ChunkChecksum getLastChecksumAndDataLen() { 312 return new ChunkChecksum(oStream.getLength(), null); 313 } 314 315 @Override isOnTransientStorage()316 public boolean isOnTransientStorage() { 317 return false; 318 } 319 } 320 321 /** 322 * Class is used for tracking block pool storage utilization similar 323 * to {@link BlockPoolSlice} 324 */ 325 private static class SimulatedBPStorage { 326 private long used; // in bytes 327 getUsed()328 long getUsed() { 329 return used; 330 } 331 alloc(long amount)332 void alloc(long amount) { 333 used += amount; 334 } 335 free(long amount)336 void free(long amount) { 337 used -= amount; 338 } 339 SimulatedBPStorage()340 SimulatedBPStorage() { 341 used = 0; 342 } 343 } 344 345 /** 346 * Class used for tracking datanode level storage utilization similar 347 * to {@link FSVolumeSet} 348 */ 349 private static class SimulatedStorage { 350 private final Map<String, SimulatedBPStorage> map = 351 new HashMap<String, SimulatedBPStorage>(); 352 353 private final long capacity; // in bytes 354 private final DatanodeStorage dnStorage; 355 getFree()356 synchronized long getFree() { 357 return capacity - getUsed(); 358 } 359 getCapacity()360 long getCapacity() { 361 return capacity; 362 } 363 getUsed()364 synchronized long getUsed() { 365 long used = 0; 366 for (SimulatedBPStorage bpStorage : map.values()) { 367 used += bpStorage.getUsed(); 368 } 369 return used; 370 } 371 getBlockPoolUsed(String bpid)372 synchronized long getBlockPoolUsed(String bpid) throws IOException { 373 return getBPStorage(bpid).getUsed(); 374 } 375 getNumFailedVolumes()376 int getNumFailedVolumes() { 377 return 0; 378 } 379 alloc(String bpid, long amount)380 synchronized boolean alloc(String bpid, long amount) throws IOException { 381 if (getFree() >= amount) { 382 getBPStorage(bpid).alloc(amount); 383 return true; 384 } 385 return false; 386 } 387 free(String bpid, long amount)388 synchronized void free(String bpid, long amount) throws IOException { 389 getBPStorage(bpid).free(amount); 390 } 391 SimulatedStorage(long cap, DatanodeStorage.State state)392 SimulatedStorage(long cap, DatanodeStorage.State state) { 393 capacity = cap; 394 dnStorage = new DatanodeStorage( 395 "SimulatedStorage-" + DatanodeStorage.generateUuid(), 396 state, StorageType.DEFAULT); 397 } 398 addBlockPool(String bpid)399 synchronized void addBlockPool(String bpid) { 400 SimulatedBPStorage bpStorage = map.get(bpid); 401 if (bpStorage != null) { 402 return; 403 } 404 map.put(bpid, new SimulatedBPStorage()); 405 } 406 removeBlockPool(String bpid)407 synchronized void removeBlockPool(String bpid) { 408 map.remove(bpid); 409 } 410 getBPStorage(String bpid)411 private SimulatedBPStorage getBPStorage(String bpid) throws IOException { 412 SimulatedBPStorage bpStorage = map.get(bpid); 413 if (bpStorage == null) { 414 throw new IOException("block pool " + bpid + " not found"); 415 } 416 return bpStorage; 417 } 418 getStorageUuid()419 String getStorageUuid() { 420 return dnStorage.getStorageID(); 421 } 422 getDnStorage()423 DatanodeStorage getDnStorage() { 424 return dnStorage; 425 } 426 getStorageReport(String bpid)427 synchronized StorageReport getStorageReport(String bpid) { 428 return new StorageReport(dnStorage, 429 false, getCapacity(), getUsed(), getFree(), 430 map.get(bpid).getUsed()); 431 } 432 } 433 434 static class SimulatedVolume implements FsVolumeSpi { 435 private final SimulatedStorage storage; 436 SimulatedVolume(final SimulatedStorage storage)437 SimulatedVolume(final SimulatedStorage storage) { 438 this.storage = storage; 439 } 440 441 @Override obtainReference()442 public FsVolumeReference obtainReference() throws ClosedChannelException { 443 return null; 444 } 445 446 @Override getStorageID()447 public String getStorageID() { 448 return storage.getStorageUuid(); 449 } 450 451 @Override getBlockPoolList()452 public String[] getBlockPoolList() { 453 return new String[0]; 454 } 455 456 @Override getAvailable()457 public long getAvailable() throws IOException { 458 return storage.getCapacity() - storage.getUsed(); 459 } 460 461 @Override getBasePath()462 public String getBasePath() { 463 return null; 464 } 465 466 @Override getPath(String bpid)467 public String getPath(String bpid) throws IOException { 468 return null; 469 } 470 471 @Override getFinalizedDir(String bpid)472 public File getFinalizedDir(String bpid) throws IOException { 473 return null; 474 } 475 476 @Override getStorageType()477 public StorageType getStorageType() { 478 return null; 479 } 480 481 @Override isTransientStorage()482 public boolean isTransientStorage() { 483 return false; 484 } 485 486 @Override reserveSpaceForRbw(long bytesToReserve)487 public void reserveSpaceForRbw(long bytesToReserve) { 488 } 489 490 @Override releaseReservedSpace(long bytesToRelease)491 public void releaseReservedSpace(long bytesToRelease) { 492 } 493 494 @Override newBlockIterator(String bpid, String name)495 public BlockIterator newBlockIterator(String bpid, String name) { 496 throw new UnsupportedOperationException(); 497 } 498 499 @Override loadBlockIterator(String bpid, String name)500 public BlockIterator loadBlockIterator(String bpid, String name) 501 throws IOException { 502 throw new UnsupportedOperationException(); 503 } 504 505 @Override getDataset()506 public FsDatasetSpi getDataset() { 507 throw new UnsupportedOperationException(); 508 } 509 } 510 511 private final Map<String, Map<Block, BInfo>> blockMap 512 = new HashMap<String, Map<Block,BInfo>>(); 513 private final SimulatedStorage storage; 514 private final SimulatedVolume volume; 515 private final String datanodeUuid; 516 SimulatedFSDataset(DataStorage storage, Configuration conf)517 public SimulatedFSDataset(DataStorage storage, Configuration conf) { 518 if (storage != null) { 519 for (int i = 0; i < storage.getNumStorageDirs(); ++i) { 520 storage.createStorageID(storage.getStorageDir(i), false); 521 } 522 this.datanodeUuid = storage.getDatanodeUuid(); 523 } else { 524 this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid(); 525 } 526 527 registerMBean(datanodeUuid); 528 this.storage = new SimulatedStorage( 529 conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), 530 conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE)); 531 this.volume = new SimulatedVolume(this.storage); 532 } 533 injectBlocks(String bpid, Iterable<? extends Block> injectBlocks)534 public synchronized void injectBlocks(String bpid, 535 Iterable<? extends Block> injectBlocks) throws IOException { 536 ExtendedBlock blk = new ExtendedBlock(); 537 if (injectBlocks != null) { 538 for (Block b: injectBlocks) { // if any blocks in list is bad, reject list 539 if (b == null) { 540 throw new NullPointerException("Null blocks in block list"); 541 } 542 blk.set(bpid, b); 543 if (isValidBlock(blk)) { 544 throw new IOException("Block already exists in block list"); 545 } 546 } 547 Map<Block, BInfo> map = blockMap.get(bpid); 548 if (map == null) { 549 map = new HashMap<Block, BInfo>(); 550 blockMap.put(bpid, map); 551 } 552 553 for (Block b: injectBlocks) { 554 BInfo binfo = new BInfo(bpid, b, false); 555 map.put(binfo.theBlock, binfo); 556 } 557 } 558 } 559 560 /** Get a map for a given block pool Id */ getMap(String bpid)561 private Map<Block, BInfo> getMap(String bpid) throws IOException { 562 final Map<Block, BInfo> map = blockMap.get(bpid); 563 if (map == null) { 564 throw new IOException("Non existent blockpool " + bpid); 565 } 566 return map; 567 } 568 569 @Override // FsDatasetSpi finalizeBlock(ExtendedBlock b)570 public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { 571 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 572 BInfo binfo = map.get(b.getLocalBlock()); 573 if (binfo == null) { 574 throw new IOException("Finalizing a non existing block " + b); 575 } 576 binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes()); 577 } 578 579 @Override // FsDatasetSpi unfinalizeBlock(ExtendedBlock b)580 public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{ 581 if (isValidRbw(b)) { 582 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 583 map.remove(b.getLocalBlock()); 584 } 585 } 586 getBlockReport(String bpid)587 synchronized BlockListAsLongs getBlockReport(String bpid) { 588 BlockListAsLongs.Builder report = BlockListAsLongs.builder(); 589 final Map<Block, BInfo> map = blockMap.get(bpid); 590 if (map != null) { 591 for (BInfo b : map.values()) { 592 if (b.isFinalized()) { 593 report.add(b); 594 } 595 } 596 } 597 return report.build(); 598 } 599 600 @Override getBlockReports( String bpid)601 public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports( 602 String bpid) { 603 return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid)); 604 } 605 606 @Override // FsDatasetSpi getCacheReport(String bpid)607 public List<Long> getCacheReport(String bpid) { 608 return new LinkedList<Long>(); 609 } 610 611 @Override // FSDatasetMBean getCapacity()612 public long getCapacity() { 613 return storage.getCapacity(); 614 } 615 616 @Override // FSDatasetMBean getDfsUsed()617 public long getDfsUsed() { 618 return storage.getUsed(); 619 } 620 621 @Override // FSDatasetMBean getBlockPoolUsed(String bpid)622 public long getBlockPoolUsed(String bpid) throws IOException { 623 return storage.getBlockPoolUsed(bpid); 624 } 625 626 @Override // FSDatasetMBean getRemaining()627 public long getRemaining() { 628 return storage.getFree(); 629 } 630 631 @Override // FSDatasetMBean getNumFailedVolumes()632 public int getNumFailedVolumes() { 633 return storage.getNumFailedVolumes(); 634 } 635 636 @Override // FSDatasetMBean getFailedStorageLocations()637 public String[] getFailedStorageLocations() { 638 return null; 639 } 640 641 @Override // FSDatasetMBean getLastVolumeFailureDate()642 public long getLastVolumeFailureDate() { 643 return 0; 644 } 645 646 @Override // FSDatasetMBean getEstimatedCapacityLostTotal()647 public long getEstimatedCapacityLostTotal() { 648 return 0; 649 } 650 651 @Override // FsDatasetSpi getVolumeFailureSummary()652 public VolumeFailureSummary getVolumeFailureSummary() { 653 return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0); 654 } 655 656 @Override // FSDatasetMBean getCacheUsed()657 public long getCacheUsed() { 658 return 0l; 659 } 660 661 @Override // FSDatasetMBean getCacheCapacity()662 public long getCacheCapacity() { 663 return 0l; 664 } 665 666 @Override // FSDatasetMBean getNumBlocksCached()667 public long getNumBlocksCached() { 668 return 0l; 669 } 670 671 @Override getNumBlocksFailedToCache()672 public long getNumBlocksFailedToCache() { 673 return 0l; 674 } 675 676 @Override getNumBlocksFailedToUncache()677 public long getNumBlocksFailedToUncache() { 678 return 0l; 679 } 680 681 @Override // FsDatasetSpi getLength(ExtendedBlock b)682 public synchronized long getLength(ExtendedBlock b) throws IOException { 683 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 684 BInfo binfo = map.get(b.getLocalBlock()); 685 if (binfo == null) { 686 throw new IOException("Finalizing a non existing block " + b); 687 } 688 return binfo.getNumBytes(); 689 } 690 691 @Override 692 @Deprecated getReplica(String bpid, long blockId)693 public Replica getReplica(String bpid, long blockId) { 694 final Map<Block, BInfo> map = blockMap.get(bpid); 695 if (map != null) { 696 return map.get(new Block(blockId)); 697 } 698 return null; 699 } 700 701 @Override getReplicaString(String bpid, long blockId)702 public synchronized String getReplicaString(String bpid, long blockId) { 703 Replica r = null; 704 final Map<Block, BInfo> map = blockMap.get(bpid); 705 if (map != null) { 706 r = map.get(new Block(blockId)); 707 } 708 return r == null? "null": r.toString(); 709 } 710 711 @Override // FsDatasetSpi getStoredBlock(String bpid, long blkid)712 public Block getStoredBlock(String bpid, long blkid) throws IOException { 713 final Map<Block, BInfo> map = blockMap.get(bpid); 714 if (map != null) { 715 BInfo binfo = map.get(new Block(blkid)); 716 if (binfo == null) { 717 return null; 718 } 719 return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes()); 720 } 721 return null; 722 } 723 724 @Override // FsDatasetSpi invalidate(String bpid, Block[] invalidBlks)725 public synchronized void invalidate(String bpid, Block[] invalidBlks) 726 throws IOException { 727 boolean error = false; 728 if (invalidBlks == null) { 729 return; 730 } 731 final Map<Block, BInfo> map = getMap(bpid); 732 for (Block b: invalidBlks) { 733 if (b == null) { 734 continue; 735 } 736 BInfo binfo = map.get(b); 737 if (binfo == null) { 738 error = true; 739 DataNode.LOG.warn("Invalidate: Missing block"); 740 continue; 741 } 742 storage.free(bpid, binfo.getNumBytes()); 743 map.remove(b); 744 } 745 if (error) { 746 throw new IOException("Invalidate: Missing blocks."); 747 } 748 } 749 750 @Override // FSDatasetSpi cache(String bpid, long[] cacheBlks)751 public void cache(String bpid, long[] cacheBlks) { 752 throw new UnsupportedOperationException( 753 "SimulatedFSDataset does not support cache operation!"); 754 } 755 756 @Override // FSDatasetSpi uncache(String bpid, long[] uncacheBlks)757 public void uncache(String bpid, long[] uncacheBlks) { 758 throw new UnsupportedOperationException( 759 "SimulatedFSDataset does not support uncache operation!"); 760 } 761 762 @Override // FSDatasetSpi isCached(String bpid, long blockId)763 public boolean isCached(String bpid, long blockId) { 764 return false; 765 } 766 getBInfo(final ExtendedBlock b)767 private BInfo getBInfo(final ExtendedBlock b) { 768 final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId()); 769 return map == null? null: map.get(b.getLocalBlock()); 770 } 771 772 @Override // {@link FsDatasetSpi} contains(ExtendedBlock block)773 public boolean contains(ExtendedBlock block) { 774 return getBInfo(block) != null; 775 } 776 777 /** 778 * Check if a block is valid. 779 * 780 * @param b The block to check. 781 * @param minLength The minimum length that the block must have. May be 0. 782 * @param state If this is null, it is ignored. If it is non-null, we 783 * will check that the replica has this state. 784 * 785 * @throws ReplicaNotFoundException If the replica is not found 786 * 787 * @throws UnexpectedReplicaStateException If the replica is not in the 788 * expected state. 789 */ 790 @Override // {@link FsDatasetSpi} checkBlock(ExtendedBlock b, long minLength, ReplicaState state)791 public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state) 792 throws ReplicaNotFoundException, UnexpectedReplicaStateException { 793 final BInfo binfo = getBInfo(b); 794 795 if (binfo == null) { 796 throw new ReplicaNotFoundException(b); 797 } 798 if ((state == ReplicaState.FINALIZED && !binfo.isFinalized()) || 799 (state != ReplicaState.FINALIZED && binfo.isFinalized())) { 800 throw new UnexpectedReplicaStateException(b,state); 801 } 802 } 803 804 @Override // FsDatasetSpi isValidBlock(ExtendedBlock b)805 public synchronized boolean isValidBlock(ExtendedBlock b) { 806 try { 807 checkBlock(b, 0, ReplicaState.FINALIZED); 808 } catch (IOException e) { 809 return false; 810 } 811 return true; 812 } 813 814 /* check if a block is created but not finalized */ 815 @Override isValidRbw(ExtendedBlock b)816 public synchronized boolean isValidRbw(ExtendedBlock b) { 817 try { 818 checkBlock(b, 0, ReplicaState.RBW); 819 } catch (IOException e) { 820 return false; 821 } 822 return true; 823 } 824 825 @Override toString()826 public String toString() { 827 return getStorageInfo(); 828 } 829 830 @Override // FsDatasetSpi append( ExtendedBlock b, long newGS, long expectedBlockLen)831 public synchronized ReplicaHandler append( 832 ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { 833 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 834 BInfo binfo = map.get(b.getLocalBlock()); 835 if (binfo == null || !binfo.isFinalized()) { 836 throw new ReplicaNotFoundException("Block " + b 837 + " is not valid, and cannot be appended to."); 838 } 839 binfo.unfinalizeBlock(); 840 return new ReplicaHandler(binfo, null); 841 } 842 843 @Override // FsDatasetSpi recoverAppend( ExtendedBlock b, long newGS, long expectedBlockLen)844 public synchronized ReplicaHandler recoverAppend( 845 ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { 846 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 847 BInfo binfo = map.get(b.getLocalBlock()); 848 if (binfo == null) { 849 throw new ReplicaNotFoundException("Block " + b 850 + " is not valid, and cannot be appended to."); 851 } 852 if (binfo.isFinalized()) { 853 binfo.unfinalizeBlock(); 854 } 855 map.remove(b); 856 binfo.theBlock.setGenerationStamp(newGS); 857 map.put(binfo.theBlock, binfo); 858 return new ReplicaHandler(binfo, null); 859 } 860 861 @Override // FsDatasetSpi recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)862 public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) 863 throws IOException { 864 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 865 BInfo binfo = map.get(b.getLocalBlock()); 866 if (binfo == null) { 867 throw new ReplicaNotFoundException("Block " + b 868 + " is not valid, and cannot be appended to."); 869 } 870 if (!binfo.isFinalized()) { 871 binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes()); 872 } 873 map.remove(b.getLocalBlock()); 874 binfo.theBlock.setGenerationStamp(newGS); 875 map.put(binfo.theBlock, binfo); 876 return binfo.getStorageUuid(); 877 } 878 879 @Override // FsDatasetSpi recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)880 public synchronized ReplicaHandler recoverRbw( 881 ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) 882 throws IOException { 883 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 884 BInfo binfo = map.get(b.getLocalBlock()); 885 if ( binfo == null) { 886 throw new ReplicaNotFoundException("Block " + b 887 + " does not exist, and cannot be appended to."); 888 } 889 if (binfo.isFinalized()) { 890 throw new ReplicaAlreadyExistsException("Block " + b 891 + " is valid, and cannot be written to."); 892 } 893 map.remove(b); 894 binfo.theBlock.setGenerationStamp(newGS); 895 map.put(binfo.theBlock, binfo); 896 return new ReplicaHandler(binfo, null); 897 } 898 899 @Override // FsDatasetSpi createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)900 public synchronized ReplicaHandler createRbw( 901 StorageType storageType, ExtendedBlock b, 902 boolean allowLazyPersist) throws IOException { 903 return createTemporary(storageType, b); 904 } 905 906 @Override // FsDatasetSpi createTemporary( StorageType storageType, ExtendedBlock b)907 public synchronized ReplicaHandler createTemporary( 908 StorageType storageType, ExtendedBlock b) throws IOException { 909 if (isValidBlock(b)) { 910 throw new ReplicaAlreadyExistsException("Block " + b + 911 " is valid, and cannot be written to."); 912 } 913 if (isValidRbw(b)) { 914 throw new ReplicaAlreadyExistsException("Block " + b + 915 " is being written, and cannot be written to."); 916 } 917 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 918 BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); 919 map.put(binfo.theBlock, binfo); 920 return new ReplicaHandler(binfo, null); 921 } 922 getBlockInputStream(ExtendedBlock b )923 synchronized InputStream getBlockInputStream(ExtendedBlock b 924 ) throws IOException { 925 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 926 BInfo binfo = map.get(b.getLocalBlock()); 927 if (binfo == null) { 928 throw new IOException("No such Block " + b ); 929 } 930 931 return binfo.getIStream(); 932 } 933 934 @Override // FsDatasetSpi getBlockInputStream(ExtendedBlock b, long seekOffset)935 public synchronized InputStream getBlockInputStream(ExtendedBlock b, 936 long seekOffset) throws IOException { 937 InputStream result = getBlockInputStream(b); 938 IOUtils.skipFully(result, seekOffset); 939 return result; 940 } 941 942 /** Not supported */ 943 @Override // FsDatasetSpi getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff)944 public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, 945 long ckoff) throws IOException { 946 throw new IOException("Not supported"); 947 } 948 949 @Override // FsDatasetSpi getMetaDataInputStream(ExtendedBlock b )950 public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b 951 ) throws IOException { 952 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 953 BInfo binfo = map.get(b.getLocalBlock()); 954 if (binfo == null) { 955 throw new IOException("No such Block " + b ); 956 } 957 if (!binfo.finalized) { 958 throw new IOException("Block " + b + 959 " is being written, its meta cannot be read"); 960 } 961 final SimulatedInputStream sin = binfo.getMetaIStream(); 962 return new LengthInputStream(sin, sin.getLength()); 963 } 964 965 @Override checkDataDir()966 public Set<File> checkDataDir() { 967 // nothing to check for simulated data set 968 return null; 969 } 970 971 @Override // FsDatasetSpi adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams stream, int checksumSize)972 public synchronized void adjustCrcChannelPosition(ExtendedBlock b, 973 ReplicaOutputStreams stream, 974 int checksumSize) 975 throws IOException { 976 } 977 978 /** 979 * Simulated input and output streams 980 * 981 */ 982 static private class SimulatedInputStream extends java.io.InputStream { 983 984 985 byte theRepeatedData = 7; 986 final long length; // bytes 987 int currentPos = 0; 988 byte[] data = null; 989 990 /** 991 * An input stream of size l with repeated bytes 992 * @param l size of the stream 993 * @param iRepeatedData byte that is repeated in the stream 994 */ SimulatedInputStream(long l, byte iRepeatedData)995 SimulatedInputStream(long l, byte iRepeatedData) { 996 length = l; 997 theRepeatedData = iRepeatedData; 998 } 999 1000 /** 1001 * An input stream of of the supplied data 1002 * @param iData data to construct the stream 1003 */ SimulatedInputStream(byte[] iData)1004 SimulatedInputStream(byte[] iData) { 1005 data = iData; 1006 length = data.length; 1007 } 1008 1009 /** 1010 * @return the lenght of the input stream 1011 */ getLength()1012 long getLength() { 1013 return length; 1014 } 1015 1016 @Override read()1017 public int read() throws IOException { 1018 if (currentPos >= length) 1019 return -1; 1020 if (data !=null) { 1021 return data[currentPos++]; 1022 } else { 1023 currentPos++; 1024 return theRepeatedData; 1025 } 1026 } 1027 1028 @Override read(byte[] b)1029 public int read(byte[] b) throws IOException { 1030 1031 if (b == null) { 1032 throw new NullPointerException(); 1033 } 1034 if (b.length == 0) { 1035 return 0; 1036 } 1037 if (currentPos >= length) { // EOF 1038 return -1; 1039 } 1040 int bytesRead = (int) Math.min(b.length, length-currentPos); 1041 if (data != null) { 1042 System.arraycopy(data, currentPos, b, 0, bytesRead); 1043 } else { // all data is zero 1044 for (int i : b) { 1045 b[i] = theRepeatedData; 1046 } 1047 } 1048 currentPos += bytesRead; 1049 return bytesRead; 1050 } 1051 } 1052 1053 /** 1054 * This class implements an output stream that merely throws its data away, but records its 1055 * length. 1056 * 1057 */ 1058 static private class SimulatedOutputStream extends OutputStream { 1059 long length = 0; 1060 1061 /** 1062 * constructor for Simulated Output Steram 1063 */ SimulatedOutputStream()1064 SimulatedOutputStream() { 1065 } 1066 1067 /** 1068 * 1069 * @return the length of the data created so far. 1070 */ getLength()1071 long getLength() { 1072 return length; 1073 } 1074 1075 /** 1076 */ setLength(long length)1077 void setLength(long length) { 1078 this.length = length; 1079 } 1080 1081 @Override write(int arg0)1082 public void write(int arg0) throws IOException { 1083 length++; 1084 } 1085 1086 @Override write(byte[] b)1087 public void write(byte[] b) throws IOException { 1088 length += b.length; 1089 } 1090 1091 @Override write(byte[] b, int off, int len)1092 public void write(byte[] b, 1093 int off, 1094 int len) throws IOException { 1095 length += len; 1096 } 1097 } 1098 1099 private ObjectName mbeanName; 1100 1101 1102 1103 /** 1104 * Register the FSDataset MBean using the name 1105 * "hadoop:service=DataNode,name=FSDatasetState-<storageid>" 1106 * We use storage id for MBean name since a minicluster within a single 1107 * Java VM may have multiple Simulated Datanodes. 1108 */ registerMBean(final String storageId)1109 void registerMBean(final String storageId) { 1110 // We wrap to bypass standard mbean naming convetion. 1111 // This wraping can be removed in java 6 as it is more flexible in 1112 // package naming for mbeans and their impl. 1113 StandardMBean bean; 1114 1115 try { 1116 bean = new StandardMBean(this,FSDatasetMBean.class); 1117 mbeanName = MBeans.register("DataNode", "FSDatasetState-"+ 1118 storageId, bean); 1119 } catch (NotCompliantMBeanException e) { 1120 DataNode.LOG.warn("Error registering FSDatasetState MBean", e); 1121 } 1122 1123 DataNode.LOG.info("Registered FSDatasetState MBean"); 1124 } 1125 1126 @Override shutdown()1127 public void shutdown() { 1128 if (mbeanName != null) MBeans.unregister(mbeanName); 1129 } 1130 1131 @Override getStorageInfo()1132 public String getStorageInfo() { 1133 return "Simulated FSDataset-" + datanodeUuid; 1134 } 1135 1136 @Override hasEnoughResource()1137 public boolean hasEnoughResource() { 1138 return true; 1139 } 1140 1141 @Override initReplicaRecovery(RecoveringBlock rBlock)1142 public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) 1143 throws IOException { 1144 ExtendedBlock b = rBlock.getBlock(); 1145 final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); 1146 BInfo binfo = map.get(b.getLocalBlock()); 1147 if (binfo == null) { 1148 throw new IOException("No such Block " + b ); 1149 } 1150 1151 return new ReplicaRecoveryInfo(binfo.getBlockId(), binfo.getBytesOnDisk(), 1152 binfo.getGenerationStamp(), 1153 binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW); 1154 } 1155 1156 @Override // FsDatasetSpi updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newBlockId, long newlength)1157 public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, 1158 long recoveryId, 1159 long newBlockId, 1160 long newlength) { 1161 // Caller does not care about the exact Storage UUID returned. 1162 return datanodeUuid; 1163 } 1164 1165 @Override // FsDatasetSpi getReplicaVisibleLength(ExtendedBlock block)1166 public long getReplicaVisibleLength(ExtendedBlock block) { 1167 return block.getNumBytes(); 1168 } 1169 1170 @Override // FsDatasetSpi addBlockPool(String bpid, Configuration conf)1171 public void addBlockPool(String bpid, Configuration conf) { 1172 Map<Block, BInfo> map = new HashMap<Block, BInfo>(); 1173 blockMap.put(bpid, map); 1174 storage.addBlockPool(bpid); 1175 } 1176 1177 @Override // FsDatasetSpi shutdownBlockPool(String bpid)1178 public void shutdownBlockPool(String bpid) { 1179 blockMap.remove(bpid); 1180 storage.removeBlockPool(bpid); 1181 } 1182 1183 @Override // FsDatasetSpi deleteBlockPool(String bpid, boolean force)1184 public void deleteBlockPool(String bpid, boolean force) { 1185 return; 1186 } 1187 1188 @Override convertTemporaryToRbw(ExtendedBlock temporary)1189 public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary) 1190 throws IOException { 1191 final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId()); 1192 if (map == null) { 1193 throw new IOException("Block pool not found, temporary=" + temporary); 1194 } 1195 final BInfo r = map.get(temporary.getLocalBlock()); 1196 if (r == null) { 1197 throw new IOException("Block not found, temporary=" + temporary); 1198 } else if (r.isFinalized()) { 1199 throw new IOException("Replica already finalized, temporary=" 1200 + temporary + ", r=" + r); 1201 } 1202 return r; 1203 } 1204 1205 @Override getBlockLocalPathInfo(ExtendedBlock b)1206 public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { 1207 throw new UnsupportedOperationException(); 1208 } 1209 1210 @Override getHdfsBlocksMetadata(String bpid, long[] blockIds)1211 public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) 1212 throws IOException { 1213 throw new UnsupportedOperationException(); 1214 } 1215 1216 @Override enableTrash(String bpid)1217 public void enableTrash(String bpid) { 1218 throw new UnsupportedOperationException(); 1219 } 1220 1221 @Override clearTrash(String bpid)1222 public void clearTrash(String bpid) { 1223 } 1224 1225 @Override trashEnabled(String bpid)1226 public boolean trashEnabled(String bpid) { 1227 return false; 1228 } 1229 1230 @Override setRollingUpgradeMarker(String bpid)1231 public void setRollingUpgradeMarker(String bpid) { 1232 } 1233 1234 @Override clearRollingUpgradeMarker(String bpid)1235 public void clearRollingUpgradeMarker(String bpid) { 1236 } 1237 1238 @Override checkAndUpdate(String bpid, long blockId, File diskFile, File diskMetaFile, FsVolumeSpi vol)1239 public void checkAndUpdate(String bpid, long blockId, File diskFile, 1240 File diskMetaFile, FsVolumeSpi vol) throws IOException { 1241 throw new UnsupportedOperationException(); 1242 } 1243 1244 @Override getVolumes()1245 public List<FsVolumeSpi> getVolumes() { 1246 throw new UnsupportedOperationException(); 1247 } 1248 1249 @Override addVolume( final StorageLocation location, final List<NamespaceInfo> nsInfos)1250 public void addVolume( 1251 final StorageLocation location, 1252 final List<NamespaceInfo> nsInfos) throws IOException { 1253 throw new UnsupportedOperationException(); 1254 } 1255 1256 @Override getStorage(final String storageUuid)1257 public DatanodeStorage getStorage(final String storageUuid) { 1258 return storageUuid.equals(storage.getStorageUuid()) ? 1259 storage.dnStorage : 1260 null; 1261 } 1262 1263 @Override getStorageReports(String bpid)1264 public StorageReport[] getStorageReports(String bpid) { 1265 return new StorageReport[] {storage.getStorageReport(bpid)}; 1266 } 1267 1268 @Override getFinalizedBlocks(String bpid)1269 public List<FinalizedReplica> getFinalizedBlocks(String bpid) { 1270 throw new UnsupportedOperationException(); 1271 } 1272 1273 @Override getFinalizedBlocksOnPersistentStorage(String bpid)1274 public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) { 1275 throw new UnsupportedOperationException(); 1276 } 1277 1278 @Override getVolumeInfoMap()1279 public Map<String, Object> getVolumeInfoMap() { 1280 throw new UnsupportedOperationException(); 1281 } 1282 1283 @Override getVolume(ExtendedBlock b)1284 public FsVolumeSpi getVolume(ExtendedBlock b) { 1285 return volume; 1286 } 1287 1288 @Override removeVolumes(Set<File> volumes, boolean clearFailure)1289 public synchronized void removeVolumes(Set<File> volumes, boolean clearFailure) { 1290 throw new UnsupportedOperationException(); 1291 } 1292 1293 @Override submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags)1294 public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, 1295 FileDescriptor fd, long offset, long nbytes, int flags) { 1296 throw new UnsupportedOperationException(); 1297 } 1298 1299 @Override onCompleteLazyPersist(String bpId, long blockId, long creationTime, File[] savedFiles, FsVolumeSpi targetVolume)1300 public void onCompleteLazyPersist(String bpId, long blockId, 1301 long creationTime, File[] savedFiles, FsVolumeSpi targetVolume) { 1302 throw new UnsupportedOperationException(); 1303 } 1304 1305 @Override onFailLazyPersist(String bpId, long blockId)1306 public void onFailLazyPersist(String bpId, long blockId) { 1307 throw new UnsupportedOperationException(); 1308 } 1309 1310 @Override moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType)1311 public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, 1312 StorageType targetStorageType) throws IOException { 1313 // TODO Auto-generated method stub 1314 return null; 1315 } 1316 1317 @Override setPinning(ExtendedBlock b)1318 public void setPinning(ExtendedBlock b) throws IOException { 1319 blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true; 1320 } 1321 1322 @Override getPinning(ExtendedBlock b)1323 public boolean getPinning(ExtendedBlock b) throws IOException { 1324 return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned; 1325 } 1326 1327 @Override isDeletingBlock(String bpid, long blockId)1328 public boolean isDeletingBlock(String bpid, long blockId) { 1329 throw new UnsupportedOperationException(); 1330 } 1331 } 1332 1333