1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.log;
9 
10 import java.nio.ByteBuffer;
11 import java.util.concurrent.atomic.AtomicInteger;
12 import java.util.concurrent.locks.LockSupport;
13 
14 import com.sleepycat.je.DatabaseException;
15 import com.sleepycat.je.ThreadInterruptedException;
16 import com.sleepycat.je.dbi.EnvironmentImpl;
17 import com.sleepycat.je.latch.Latch;
18 import com.sleepycat.je.latch.LatchFactory;
19 import com.sleepycat.je.utilint.DbLsn;
20 
21 /**
22  * LogBuffers hold outgoing, newly written log entries.
23  * Space is allocated via the allocate() method that
24  * returns a LogBufferSegment object. The LogBuffer.writePinCount
25  * is incremented each time space is allocated. Once the
26  * caller copies data into the log buffer, the
27  * pin count is decremented via the free() method.
28  * Readers of a log buffer wait until the pin count
29  * is zero.
30  *
31  * The pin count is incremented under the readLatch. The
32  * pin count is decremented without holding the latch.
33  * Holding the readLatch will prevent the pin count from
34  * being incremented.
35  */
36 public class LogBuffer implements LogSource {
37 
38     private static final String DEBUG_NAME = LogBuffer.class.getName();
39 
40     /* Storage */
41     private final ByteBuffer buffer;
42 
43     /* Information about what log entries are held here. */
44     private long firstLsn;
45     private long lastLsn;
46 
47     /* The read latch serializes access to and modification of the LSN info. */
48     private Latch readLatch;
49 
50     /*
51      * Buffer may be rewritten because an IOException previously occurred.
52      */
53     private boolean rewriteAllowed;
54 
55     private AtomicInteger writePinCount = new AtomicInteger();
56     private byte[] data;
57     private EnvironmentImpl env;
58 
LogBuffer(int capacity, EnvironmentImpl env)59     LogBuffer(int capacity, EnvironmentImpl env)
60         throws DatabaseException {
61 
62         data = new byte[capacity];
63         buffer = ByteBuffer.wrap(data);
64         readLatch = LatchFactory.createExclusiveLatch(
65             env, DEBUG_NAME, false /*collectStats*/);
66         this.env = env;
67         reinit();
68     }
69 
70     /*
71      * Used by LogManager for the case when we have a temporary buffer in hand
72      * and no LogBuffers in the LogBufferPool are large enough to hold the
73      * current entry being written.  We just wrap the temporary ByteBuffer
74      * in a LogBuffer and pass it to FileManager. [#12674].
75      */
LogBuffer(ByteBuffer buffer, long firstLsn)76     LogBuffer(ByteBuffer buffer, long firstLsn) {
77         this.buffer = buffer;
78         this.firstLsn = firstLsn;
79         this.lastLsn = firstLsn;
80         rewriteAllowed = false;
81     }
82 
reinit()83     void reinit()
84         throws DatabaseException {
85 
86         readLatch.acquireExclusive();
87         buffer.clear();
88         firstLsn = DbLsn.NULL_LSN;
89         lastLsn = DbLsn.NULL_LSN;
90         rewriteAllowed = false;
91         writePinCount.set(0);
92         readLatch.release();
93     }
94 
95     /*
96      * Write support
97      */
98 
99     /**
100      * Return first LSN held in this buffer. Assumes the log write latch is
101      * held.
102      */
getFirstLsn()103     public long getFirstLsn() {
104         return firstLsn;
105     }
106 
107     /**
108      * This LSN has been written to the log.
109      */
registerLsn(long lsn)110     void registerLsn(long lsn)
111         throws DatabaseException {
112 
113         readLatch.acquireExclusive();
114         try {
115             if (lastLsn != DbLsn.NULL_LSN) {
116                 assert (DbLsn.compareTo(lsn, lastLsn) > 0):
117                     "lsn=" + lsn + " lastlsn=" + lastLsn;
118             }
119             lastLsn = lsn;
120             if (firstLsn == DbLsn.NULL_LSN) {
121                 firstLsn = lsn;
122             }
123         } finally {
124             readLatch.release();
125         }
126     }
127 
128     /**
129      * Check capacity of buffer. Assumes that the log write latch is held.
130      * @return true if this buffer can hold this many more bytes.
131      */
hasRoom(int numBytes)132     boolean hasRoom(int numBytes) {
133         return (numBytes <= (buffer.capacity() - buffer.position()));
134     }
135 
136     /**
137      * @return the actual data buffer.
138      */
getDataBuffer()139     public ByteBuffer getDataBuffer() {
140         return buffer;
141     }
142 
143     /**
144      * @return capacity in bytes
145      */
getCapacity()146     int getCapacity() {
147         return buffer.capacity();
148     }
149 
150     /*
151      * Read support
152      */
153 
154     /**
155      * Support for reading out of a still-in-memory log.  Can be used to
156      * determine if a log entry with a given LSN is contained in this buffer,
157      * or whether an arbitrary LSN location is present in the buffer.
158      *
159      * @return true if this buffer holds the data at this LSN location. If true
160      * is returned, the buffer will be latched for read. Returns false if LSN
161      * is not here, and releases the read latch.
162      */
containsLsn(long lsn)163     boolean containsLsn(long lsn) {
164         assert lsn != DbLsn.NULL_LSN;
165 
166         /*
167          * Latch before we look at the LSNs. We do not have to wait
168          * for zero to check the LSN field but need to have the count
169          * zero for a reader to read the buffer.
170          */
171         waitForZeroAndLatch();
172         boolean found = false;
173 
174         if ((firstLsn != DbLsn.NULL_LSN) &&
175             (DbLsn.getFileNumber(firstLsn) == DbLsn.getFileNumber(lsn))) {
176 
177             final long fileOffset = DbLsn.getFileOffset(lsn);
178             final int contentSize;
179             if (buffer.position() == 0) {
180                 /* Buffer was flipped for reading. */
181                 contentSize = buffer.limit();
182             } else {
183                 /* Buffer is still being written into. */
184                 contentSize = buffer.position();
185             }
186             final long firstLsnOffset = DbLsn.getFileOffset(firstLsn);
187             final long lastContentOffset = firstLsnOffset + contentSize;
188 
189             if ((firstLsnOffset <= fileOffset) &&
190                 (lastContentOffset > fileOffset)) {
191                 found = true;
192             }
193         }
194 
195         if (found) {
196             return true;
197         } else {
198             readLatch.release();
199             return false;
200         }
201     }
202 
203     /**
204      * When modifying the buffer, acquire the readLatch.  Call release() to
205      * release the latch.  Note that containsLsn() acquires the latch for
206      * reading.
207      */
latchForWrite()208     public void latchForWrite()
209         throws DatabaseException {
210 
211         readLatch.acquireExclusive();
212     }
213 
214     /*
215      * LogSource support
216      */
217 
218     /**
219      * @see LogSource#release
220      */
release()221     public void release() {
222         readLatch.releaseIfOwner();
223     }
224 
getRewriteAllowed()225     boolean getRewriteAllowed() {
226         return rewriteAllowed;
227     }
228 
setRewriteAllowed()229     void setRewriteAllowed() {
230         rewriteAllowed = true;
231     }
232 
233     /**
234      * Allocate a segment out of the buffer.
235      * Called with the buffer latched.
236      * @param size of buffer to allocate
237      * @return null if not enough room, otherwise a
238      *         LogBufferSegment for the data.
239      */
allocate(int size)240     public LogBufferSegment allocate(int size) {
241         if (hasRoom(size)) {
242             ByteBuffer buf =
243                 ByteBuffer.wrap(data, buffer.position(), size);
244             buffer.position(buffer.position() + size);
245             writePinCount.incrementAndGet();
246             return new LogBufferSegment(this, buf);
247         }
248         return null;
249     }
250 
251     /**
252      * Called with the buffer not latched.
253      */
free()254     public void free() {
255         writePinCount.decrementAndGet();
256     }
257 
258     /**
259      * Acquire the buffer latched and with the buffer
260      * pin count equal to zero.
261      */
waitForZeroAndLatch()262     public void waitForZeroAndLatch() {
263         boolean done = false;
264         while (!done) {
265             if (writePinCount.get() > 0) {
266                 LockSupport.parkNanos(this, 100);
267                 /*
268                  * This may be overkill to check if a thread was
269                  * interrupted. There should be no interrupt of the
270                  * thread pinning and unpinning the buffer.
271                  */
272                 if (Thread.interrupted()) {
273                     throw new ThreadInterruptedException(
274                         env, "Interrupt during read operation");
275                 }
276             } else {
277                 readLatch.acquireExclusive();
278                 if (writePinCount.get() == 0) {
279                    done = true;
280                 } else {
281                     readLatch.release();
282                 }
283             }
284         }
285     }
286 
287     /**
288      * @see LogSource#getBytes
289      */
getBytes(long fileOffset)290     public ByteBuffer getBytes(long fileOffset) {
291 
292         /*
293          * Make a copy of this buffer (doesn't copy data, only buffer state)
294          * and position it to read the requested data.
295          *
296          * Note that we catch Exception here because it is possible that
297          * another thread is modifying the state of buffer simultaneously.
298          * Specifically, this can happen if another thread is writing this log
299          * buffer out and it does (e.g.) a flip operation on it.  The actual
300          * mark/pos of the buffer may be caught in an unpredictable state.  We
301          * could add another latch to protect this buffer, but that's heavier
302          * weight than we need.  So the easiest thing to do is to just retry
303          * the duplicate operation.  See [#9822].
304          */
305         ByteBuffer copy = null;
306         while (true) {
307             try {
308                 copy = buffer.duplicate();
309                 copy.position((int)
310                               (fileOffset - DbLsn.getFileOffset(firstLsn)));
311                 break;
312             } catch (IllegalArgumentException IAE) {
313                 continue;
314             }
315         }
316         return copy;
317     }
318 
319     /**
320      * @see LogSource#getBytes
321      */
getBytes(long fileOffset, int numBytes)322     public ByteBuffer getBytes(long fileOffset, int numBytes)
323         throws ChecksumException {
324 
325         ByteBuffer copy = getBytes(fileOffset);
326         /* Log Buffer should always hold a whole entry. */
327         if (copy.remaining() < numBytes) {
328             throw new ChecksumException("copy.remaining=" + copy.remaining() +
329                                         " numBytes=" + numBytes);
330         }
331         return copy;
332     }
333 
334     /**
335      * Entries in write buffers are always the current version.
336      */
getLogVersion()337     public int getLogVersion() {
338         return LogEntryType.LOG_VERSION;
339     }
340 }
341