1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.shortcircuit; 19 20 import java.io.FileInputStream; 21 import java.io.IOException; 22 import java.nio.MappedByteBuffer; 23 import java.nio.channels.FileChannel; 24 import java.nio.channels.FileChannel.MapMode; 25 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 import org.apache.hadoop.classification.InterfaceAudience; 29 import org.apache.hadoop.hdfs.ExtendedBlockId; 30 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; 31 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; 32 import org.apache.hadoop.io.IOUtils; 33 import org.apache.hadoop.io.nativeio.NativeIO; 34 import org.apache.hadoop.util.Time; 35 36 import com.google.common.annotations.VisibleForTesting; 37 import com.google.common.base.Preconditions; 38 39 /** 40 * A ShortCircuitReplica object contains file descriptors for a block that 41 * we are reading via short-circuit local reads. 42 * 43 * The file descriptors can be shared between multiple threads because 44 * all the operations we perform are stateless-- i.e., we use pread 45 * instead of read, to avoid using the shared position state. 46 */ 47 @InterfaceAudience.Private 48 public class ShortCircuitReplica { 49 public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class); 50 51 /** 52 * Identifies this ShortCircuitReplica object. 53 */ 54 final ExtendedBlockId key; 55 56 /** 57 * The block data input stream. 58 */ 59 private final FileInputStream dataStream; 60 61 /** 62 * The block metadata input stream. 63 * 64 * TODO: make this nullable if the file has no checksums on disk. 65 */ 66 private final FileInputStream metaStream; 67 68 /** 69 * Block metadata header. 70 */ 71 private final BlockMetadataHeader metaHeader; 72 73 /** 74 * The cache we belong to. 75 */ 76 private final ShortCircuitCache cache; 77 78 /** 79 * Monotonic time at which the replica was created. 80 */ 81 private final long creationTimeMs; 82 83 /** 84 * If non-null, the shared memory slot associated with this replica. 85 */ 86 private final Slot slot; 87 88 /** 89 * Current mmap state. 90 * 91 * Protected by the cache lock. 92 */ 93 Object mmapData; 94 95 /** 96 * True if this replica has been purged from the cache; false otherwise. 97 * 98 * Protected by the cache lock. 99 */ 100 boolean purged = false; 101 102 /** 103 * Number of external references to this replica. Replicas are referenced 104 * by the cache, BlockReaderLocal instances, and by ClientMmap instances. 105 * The number starts at 2 because when we create a replica, it is referenced 106 * by both the cache and the requester. 107 * 108 * Protected by the cache lock. 109 */ 110 int refCount = 2; 111 112 /** 113 * The monotonic time in nanoseconds at which the replica became evictable, or 114 * null if it is not evictable. 115 * 116 * Protected by the cache lock. 117 */ 118 private Long evictableTimeNs = null; 119 ShortCircuitReplica(ExtendedBlockId key, FileInputStream dataStream, FileInputStream metaStream, ShortCircuitCache cache, long creationTimeMs, Slot slot)120 public ShortCircuitReplica(ExtendedBlockId key, 121 FileInputStream dataStream, FileInputStream metaStream, 122 ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException { 123 this.key = key; 124 this.dataStream = dataStream; 125 this.metaStream = metaStream; 126 this.metaHeader = 127 BlockMetadataHeader.preadHeader(metaStream.getChannel()); 128 if (metaHeader.getVersion() != 1) { 129 throw new IOException("invalid metadata header version " + 130 metaHeader.getVersion() + ". Can only handle version 1."); 131 } 132 this.cache = cache; 133 this.creationTimeMs = creationTimeMs; 134 this.slot = slot; 135 } 136 137 /** 138 * Decrement the reference count. 139 */ unref()140 public void unref() { 141 cache.unref(this); 142 } 143 144 /** 145 * Check if the replica is stale. 146 * 147 * Must be called with the cache lock held. 148 */ isStale()149 boolean isStale() { 150 if (slot != null) { 151 // Check staleness by looking at the shared memory area we use to 152 // communicate with the DataNode. 153 boolean stale = !slot.isValid(); 154 if (LOG.isTraceEnabled()) { 155 LOG.trace(this + ": checked shared memory segment. isStale=" + stale); 156 } 157 return stale; 158 } else { 159 // Fall back to old, time-based staleness method. 160 long deltaMs = Time.monotonicNow() - creationTimeMs; 161 long staleThresholdMs = cache.getStaleThresholdMs(); 162 if (deltaMs > staleThresholdMs) { 163 if (LOG.isTraceEnabled()) { 164 LOG.trace(this + " is stale because it's " + deltaMs + 165 " ms old, and staleThresholdMs = " + staleThresholdMs); 166 } 167 return true; 168 } else { 169 if (LOG.isTraceEnabled()) { 170 LOG.trace(this + " is not stale because it's only " + deltaMs + 171 " ms old, and staleThresholdMs = " + staleThresholdMs); 172 } 173 return false; 174 } 175 } 176 } 177 178 /** 179 * Try to add a no-checksum anchor to our shared memory slot. 180 * 181 * It is only possible to add this anchor when the block is mlocked on the Datanode. 182 * The DataNode will not munlock the block until the number of no-checksum anchors 183 * for the block reaches zero. 184 * 185 * This method does not require any synchronization. 186 * 187 * @return True if we successfully added a no-checksum anchor. 188 */ addNoChecksumAnchor()189 public boolean addNoChecksumAnchor() { 190 if (slot == null) { 191 return false; 192 } 193 boolean result = slot.addAnchor(); 194 if (LOG.isTraceEnabled()) { 195 if (result) { 196 LOG.trace(this + ": added no-checksum anchor to slot " + slot); 197 } else { 198 LOG.trace(this + ": could not add no-checksum anchor to slot " + slot); 199 } 200 } 201 return result; 202 } 203 204 /** 205 * Remove a no-checksum anchor for our shared memory slot. 206 * 207 * This method does not require any synchronization. 208 */ removeNoChecksumAnchor()209 public void removeNoChecksumAnchor() { 210 if (slot != null) { 211 slot.removeAnchor(); 212 } 213 } 214 215 /** 216 * Check if the replica has an associated mmap that has been fully loaded. 217 * 218 * Must be called with the cache lock held. 219 */ 220 @VisibleForTesting hasMmap()221 public boolean hasMmap() { 222 return ((mmapData != null) && (mmapData instanceof MappedByteBuffer)); 223 } 224 225 /** 226 * Free the mmap associated with this replica. 227 * 228 * Must be called with the cache lock held. 229 */ munmap()230 void munmap() { 231 MappedByteBuffer mmap = (MappedByteBuffer)mmapData; 232 NativeIO.POSIX.munmap(mmap); 233 mmapData = null; 234 } 235 236 /** 237 * Close the replica. 238 * 239 * Must be called after there are no more references to the replica in the 240 * cache or elsewhere. 241 */ close()242 void close() { 243 String suffix = ""; 244 245 Preconditions.checkState(refCount == 0, 246 "tried to close replica with refCount %d: %s", refCount, this); 247 refCount = -1; 248 Preconditions.checkState(purged, 249 "tried to close unpurged replica %s", this); 250 if (hasMmap()) { 251 munmap(); 252 if (LOG.isTraceEnabled()) { 253 suffix += " munmapped."; 254 } 255 } 256 IOUtils.cleanup(LOG, dataStream, metaStream); 257 if (slot != null) { 258 cache.scheduleSlotReleaser(slot); 259 if (LOG.isTraceEnabled()) { 260 suffix += " scheduling " + slot + " for later release."; 261 } 262 } 263 if (LOG.isTraceEnabled()) { 264 LOG.trace("closed " + this + suffix); 265 } 266 } 267 getDataStream()268 public FileInputStream getDataStream() { 269 return dataStream; 270 } 271 getMetaStream()272 public FileInputStream getMetaStream() { 273 return metaStream; 274 } 275 getMetaHeader()276 public BlockMetadataHeader getMetaHeader() { 277 return metaHeader; 278 } 279 getKey()280 public ExtendedBlockId getKey() { 281 return key; 282 } 283 getOrCreateClientMmap(boolean anchor)284 public ClientMmap getOrCreateClientMmap(boolean anchor) { 285 return cache.getOrCreateClientMmap(this, anchor); 286 } 287 loadMmapInternal()288 MappedByteBuffer loadMmapInternal() { 289 try { 290 FileChannel channel = dataStream.getChannel(); 291 MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, 292 Math.min(Integer.MAX_VALUE, channel.size())); 293 if (LOG.isTraceEnabled()) { 294 LOG.trace(this + ": created mmap of size " + channel.size()); 295 } 296 return mmap; 297 } catch (IOException e) { 298 LOG.warn(this + ": mmap error", e); 299 return null; 300 } catch (RuntimeException e) { 301 LOG.warn(this + ": mmap error", e); 302 return null; 303 } 304 } 305 306 /** 307 * Get the evictable time in nanoseconds. 308 * 309 * Note: you must hold the cache lock to call this function. 310 * 311 * @return the evictable time in nanoseconds. 312 */ getEvictableTimeNs()313 public Long getEvictableTimeNs() { 314 return evictableTimeNs; 315 } 316 317 /** 318 * Set the evictable time in nanoseconds. 319 * 320 * Note: you must hold the cache lock to call this function. 321 * 322 * @param evictableTimeNs The evictable time in nanoseconds, or null 323 * to set no evictable time. 324 */ setEvictableTimeNs(Long evictableTimeNs)325 void setEvictableTimeNs(Long evictableTimeNs) { 326 this.evictableTimeNs = evictableTimeNs; 327 } 328 329 @VisibleForTesting getSlot()330 public Slot getSlot() { 331 return slot; 332 } 333 334 /** 335 * Convert the replica to a string for debugging purposes. 336 * Note that we can't take the lock here. 337 */ 338 @Override toString()339 public String toString() { 340 return new StringBuilder().append("ShortCircuitReplica{"). 341 append("key=").append(key). 342 append(", metaHeader.version=").append(metaHeader.getVersion()). 343 append(", metaHeader.checksum=").append(metaHeader.getChecksum()). 344 append(", ident=").append("0x"). 345 append(Integer.toHexString(System.identityHashCode(this))). 346 append(", creationTimeMs=").append(creationTimeMs). 347 append("}").toString(); 348 } 349 } 350