1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.shortcircuit;
19 
20 import java.io.FileInputStream;
21 import java.io.IOException;
22 import java.nio.MappedByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.nio.channels.FileChannel.MapMode;
25 
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.hdfs.ExtendedBlockId;
30 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
31 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
32 import org.apache.hadoop.io.IOUtils;
33 import org.apache.hadoop.io.nativeio.NativeIO;
34 import org.apache.hadoop.util.Time;
35 
36 import com.google.common.annotations.VisibleForTesting;
37 import com.google.common.base.Preconditions;
38 
39 /**
40  * A ShortCircuitReplica object contains file descriptors for a block that
41  * we are reading via short-circuit local reads.
42  *
43  * The file descriptors can be shared between multiple threads because
44  * all the operations we perform are stateless-- i.e., we use pread
45  * instead of read, to avoid using the shared position state.
46  */
47 @InterfaceAudience.Private
48 public class ShortCircuitReplica {
49   public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
50 
51   /**
52    * Identifies this ShortCircuitReplica object.
53    */
54   final ExtendedBlockId key;
55 
56   /**
57    * The block data input stream.
58    */
59   private final FileInputStream dataStream;
60 
61   /**
62    * The block metadata input stream.
63    *
64    * TODO: make this nullable if the file has no checksums on disk.
65    */
66   private final FileInputStream metaStream;
67 
68   /**
69    * Block metadata header.
70    */
71   private final BlockMetadataHeader metaHeader;
72 
73   /**
74    * The cache we belong to.
75    */
76   private final ShortCircuitCache cache;
77 
78   /**
79    * Monotonic time at which the replica was created.
80    */
81   private final long creationTimeMs;
82 
83   /**
84    * If non-null, the shared memory slot associated with this replica.
85    */
86   private final Slot slot;
87 
88   /**
89    * Current mmap state.
90    *
91    * Protected by the cache lock.
92    */
93   Object mmapData;
94 
95   /**
96    * True if this replica has been purged from the cache; false otherwise.
97    *
98    * Protected by the cache lock.
99    */
100   boolean purged = false;
101 
102   /**
103    * Number of external references to this replica.  Replicas are referenced
104    * by the cache, BlockReaderLocal instances, and by ClientMmap instances.
105    * The number starts at 2 because when we create a replica, it is referenced
106    * by both the cache and the requester.
107    *
108    * Protected by the cache lock.
109    */
110   int refCount = 2;
111 
112   /**
113    * The monotonic time in nanoseconds at which the replica became evictable, or
114    * null if it is not evictable.
115    *
116    * Protected by the cache lock.
117    */
118   private Long evictableTimeNs = null;
119 
ShortCircuitReplica(ExtendedBlockId key, FileInputStream dataStream, FileInputStream metaStream, ShortCircuitCache cache, long creationTimeMs, Slot slot)120   public ShortCircuitReplica(ExtendedBlockId key,
121       FileInputStream dataStream, FileInputStream metaStream,
122       ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
123     this.key = key;
124     this.dataStream = dataStream;
125     this.metaStream = metaStream;
126     this.metaHeader =
127           BlockMetadataHeader.preadHeader(metaStream.getChannel());
128     if (metaHeader.getVersion() != 1) {
129       throw new IOException("invalid metadata header version " +
130           metaHeader.getVersion() + ".  Can only handle version 1.");
131     }
132     this.cache = cache;
133     this.creationTimeMs = creationTimeMs;
134     this.slot = slot;
135   }
136 
137   /**
138    * Decrement the reference count.
139    */
unref()140   public void unref() {
141     cache.unref(this);
142   }
143 
144   /**
145    * Check if the replica is stale.
146    *
147    * Must be called with the cache lock held.
148    */
isStale()149   boolean isStale() {
150     if (slot != null) {
151       // Check staleness by looking at the shared memory area we use to
152       // communicate with the DataNode.
153       boolean stale = !slot.isValid();
154       if (LOG.isTraceEnabled()) {
155         LOG.trace(this + ": checked shared memory segment.  isStale=" + stale);
156       }
157       return stale;
158     } else {
159       // Fall back to old, time-based staleness method.
160       long deltaMs = Time.monotonicNow() - creationTimeMs;
161       long staleThresholdMs = cache.getStaleThresholdMs();
162       if (deltaMs > staleThresholdMs) {
163         if (LOG.isTraceEnabled()) {
164           LOG.trace(this + " is stale because it's " + deltaMs +
165               " ms old, and staleThresholdMs = " + staleThresholdMs);
166         }
167         return true;
168       } else {
169         if (LOG.isTraceEnabled()) {
170           LOG.trace(this + " is not stale because it's only " + deltaMs +
171               " ms old, and staleThresholdMs = " + staleThresholdMs);
172         }
173         return false;
174       }
175     }
176   }
177 
178   /**
179    * Try to add a no-checksum anchor to our shared memory slot.
180    *
181    * It is only possible to add this anchor when the block is mlocked on the Datanode.
182    * The DataNode will not munlock the block until the number of no-checksum anchors
183    * for the block reaches zero.
184    *
185    * This method does not require any synchronization.
186    *
187    * @return     True if we successfully added a no-checksum anchor.
188    */
addNoChecksumAnchor()189   public boolean addNoChecksumAnchor() {
190     if (slot == null) {
191       return false;
192     }
193     boolean result = slot.addAnchor();
194     if (LOG.isTraceEnabled()) {
195       if (result) {
196         LOG.trace(this + ": added no-checksum anchor to slot " + slot);
197       } else {
198         LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
199       }
200     }
201     return result;
202   }
203 
204   /**
205    * Remove a no-checksum anchor for our shared memory slot.
206    *
207    * This method does not require any synchronization.
208    */
removeNoChecksumAnchor()209   public void removeNoChecksumAnchor() {
210     if (slot != null) {
211       slot.removeAnchor();
212     }
213   }
214 
215   /**
216    * Check if the replica has an associated mmap that has been fully loaded.
217    *
218    * Must be called with the cache lock held.
219    */
220   @VisibleForTesting
hasMmap()221   public boolean hasMmap() {
222     return ((mmapData != null) && (mmapData instanceof MappedByteBuffer));
223   }
224 
225   /**
226    * Free the mmap associated with this replica.
227    *
228    * Must be called with the cache lock held.
229    */
munmap()230   void munmap() {
231     MappedByteBuffer mmap = (MappedByteBuffer)mmapData;
232     NativeIO.POSIX.munmap(mmap);
233     mmapData = null;
234   }
235 
236   /**
237    * Close the replica.
238    *
239    * Must be called after there are no more references to the replica in the
240    * cache or elsewhere.
241    */
close()242   void close() {
243     String suffix = "";
244 
245     Preconditions.checkState(refCount == 0,
246         "tried to close replica with refCount %d: %s", refCount, this);
247     refCount = -1;
248     Preconditions.checkState(purged,
249         "tried to close unpurged replica %s", this);
250     if (hasMmap()) {
251       munmap();
252       if (LOG.isTraceEnabled()) {
253         suffix += "  munmapped.";
254       }
255     }
256     IOUtils.cleanup(LOG, dataStream, metaStream);
257     if (slot != null) {
258       cache.scheduleSlotReleaser(slot);
259       if (LOG.isTraceEnabled()) {
260         suffix += "  scheduling " + slot + " for later release.";
261       }
262     }
263     if (LOG.isTraceEnabled()) {
264       LOG.trace("closed " + this + suffix);
265     }
266   }
267 
getDataStream()268   public FileInputStream getDataStream() {
269     return dataStream;
270   }
271 
getMetaStream()272   public FileInputStream getMetaStream() {
273     return metaStream;
274   }
275 
getMetaHeader()276   public BlockMetadataHeader getMetaHeader() {
277     return metaHeader;
278   }
279 
getKey()280   public ExtendedBlockId getKey() {
281     return key;
282   }
283 
getOrCreateClientMmap(boolean anchor)284   public ClientMmap getOrCreateClientMmap(boolean anchor) {
285     return cache.getOrCreateClientMmap(this, anchor);
286   }
287 
loadMmapInternal()288   MappedByteBuffer loadMmapInternal() {
289     try {
290       FileChannel channel = dataStream.getChannel();
291       MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0,
292           Math.min(Integer.MAX_VALUE, channel.size()));
293       if (LOG.isTraceEnabled()) {
294         LOG.trace(this + ": created mmap of size " + channel.size());
295       }
296       return mmap;
297     } catch (IOException e) {
298       LOG.warn(this + ": mmap error", e);
299       return null;
300     } catch (RuntimeException e) {
301       LOG.warn(this + ": mmap error", e);
302       return null;
303     }
304   }
305 
306   /**
307    * Get the evictable time in nanoseconds.
308    *
309    * Note: you must hold the cache lock to call this function.
310    *
311    * @return the evictable time in nanoseconds.
312    */
getEvictableTimeNs()313   public Long getEvictableTimeNs() {
314     return evictableTimeNs;
315   }
316 
317   /**
318    * Set the evictable time in nanoseconds.
319    *
320    * Note: you must hold the cache lock to call this function.
321    *
322    * @param evictableTimeNs   The evictable time in nanoseconds, or null
323    *                          to set no evictable time.
324    */
setEvictableTimeNs(Long evictableTimeNs)325   void setEvictableTimeNs(Long evictableTimeNs) {
326     this.evictableTimeNs = evictableTimeNs;
327   }
328 
329   @VisibleForTesting
getSlot()330   public Slot getSlot() {
331     return slot;
332   }
333 
334   /**
335    * Convert the replica to a string for debugging purposes.
336    * Note that we can't take the lock here.
337    */
338   @Override
toString()339   public String toString() {
340     return new StringBuilder().append("ShortCircuitReplica{").
341         append("key=").append(key).
342         append(", metaHeader.version=").append(metaHeader.getVersion()).
343         append(", metaHeader.checksum=").append(metaHeader.getChecksum()).
344         append(", ident=").append("0x").
345           append(Integer.toHexString(System.identityHashCode(this))).
346         append(", creationTimeMs=").append(creationTimeMs).
347         append("}").toString();
348   }
349 }
350