1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.log; 9 10 import java.nio.ByteBuffer; 11 import java.util.concurrent.atomic.AtomicInteger; 12 import java.util.concurrent.locks.LockSupport; 13 14 import com.sleepycat.je.DatabaseException; 15 import com.sleepycat.je.ThreadInterruptedException; 16 import com.sleepycat.je.dbi.EnvironmentImpl; 17 import com.sleepycat.je.latch.Latch; 18 import com.sleepycat.je.latch.LatchFactory; 19 import com.sleepycat.je.utilint.DbLsn; 20 21 /** 22 * LogBuffers hold outgoing, newly written log entries. 23 * Space is allocated via the allocate() method that 24 * returns a LogBufferSegment object. The LogBuffer.writePinCount 25 * is incremented each time space is allocated. Once the 26 * caller copies data into the log buffer, the 27 * pin count is decremented via the free() method. 28 * Readers of a log buffer wait until the pin count 29 * is zero. 30 * 31 * The pin count is incremented under the readLatch. The 32 * pin count is decremented without holding the latch. 33 * Holding the readLatch will prevent the pin count from 34 * being incremented. 35 */ 36 public class LogBuffer implements LogSource { 37 38 private static final String DEBUG_NAME = LogBuffer.class.getName(); 39 40 /* Storage */ 41 private final ByteBuffer buffer; 42 43 /* Information about what log entries are held here. */ 44 private long firstLsn; 45 private long lastLsn; 46 47 /* The read latch serializes access to and modification of the LSN info. */ 48 private Latch readLatch; 49 50 /* 51 * Buffer may be rewritten because an IOException previously occurred. 52 */ 53 private boolean rewriteAllowed; 54 55 private AtomicInteger writePinCount = new AtomicInteger(); 56 private byte[] data; 57 private EnvironmentImpl env; 58 LogBuffer(int capacity, EnvironmentImpl env)59 LogBuffer(int capacity, EnvironmentImpl env) 60 throws DatabaseException { 61 62 data = new byte[capacity]; 63 buffer = ByteBuffer.wrap(data); 64 readLatch = LatchFactory.createExclusiveLatch( 65 env, DEBUG_NAME, false /*collectStats*/); 66 this.env = env; 67 reinit(); 68 } 69 70 /* 71 * Used by LogManager for the case when we have a temporary buffer in hand 72 * and no LogBuffers in the LogBufferPool are large enough to hold the 73 * current entry being written. We just wrap the temporary ByteBuffer 74 * in a LogBuffer and pass it to FileManager. [#12674]. 75 */ LogBuffer(ByteBuffer buffer, long firstLsn)76 LogBuffer(ByteBuffer buffer, long firstLsn) { 77 this.buffer = buffer; 78 this.firstLsn = firstLsn; 79 this.lastLsn = firstLsn; 80 rewriteAllowed = false; 81 } 82 reinit()83 void reinit() 84 throws DatabaseException { 85 86 readLatch.acquireExclusive(); 87 buffer.clear(); 88 firstLsn = DbLsn.NULL_LSN; 89 lastLsn = DbLsn.NULL_LSN; 90 rewriteAllowed = false; 91 writePinCount.set(0); 92 readLatch.release(); 93 } 94 95 /* 96 * Write support 97 */ 98 99 /** 100 * Return first LSN held in this buffer. Assumes the log write latch is 101 * held. 102 */ getFirstLsn()103 public long getFirstLsn() { 104 return firstLsn; 105 } 106 107 /** 108 * This LSN has been written to the log. 109 */ registerLsn(long lsn)110 void registerLsn(long lsn) 111 throws DatabaseException { 112 113 readLatch.acquireExclusive(); 114 try { 115 if (lastLsn != DbLsn.NULL_LSN) { 116 assert (DbLsn.compareTo(lsn, lastLsn) > 0): 117 "lsn=" + lsn + " lastlsn=" + lastLsn; 118 } 119 lastLsn = lsn; 120 if (firstLsn == DbLsn.NULL_LSN) { 121 firstLsn = lsn; 122 } 123 } finally { 124 readLatch.release(); 125 } 126 } 127 128 /** 129 * Check capacity of buffer. Assumes that the log write latch is held. 130 * @return true if this buffer can hold this many more bytes. 131 */ hasRoom(int numBytes)132 boolean hasRoom(int numBytes) { 133 return (numBytes <= (buffer.capacity() - buffer.position())); 134 } 135 136 /** 137 * @return the actual data buffer. 138 */ getDataBuffer()139 public ByteBuffer getDataBuffer() { 140 return buffer; 141 } 142 143 /** 144 * @return capacity in bytes 145 */ getCapacity()146 int getCapacity() { 147 return buffer.capacity(); 148 } 149 150 /* 151 * Read support 152 */ 153 154 /** 155 * Support for reading out of a still-in-memory log. Can be used to 156 * determine if a log entry with a given LSN is contained in this buffer, 157 * or whether an arbitrary LSN location is present in the buffer. 158 * 159 * @return true if this buffer holds the data at this LSN location. If true 160 * is returned, the buffer will be latched for read. Returns false if LSN 161 * is not here, and releases the read latch. 162 */ containsLsn(long lsn)163 boolean containsLsn(long lsn) { 164 assert lsn != DbLsn.NULL_LSN; 165 166 /* 167 * Latch before we look at the LSNs. We do not have to wait 168 * for zero to check the LSN field but need to have the count 169 * zero for a reader to read the buffer. 170 */ 171 waitForZeroAndLatch(); 172 boolean found = false; 173 174 if ((firstLsn != DbLsn.NULL_LSN) && 175 (DbLsn.getFileNumber(firstLsn) == DbLsn.getFileNumber(lsn))) { 176 177 final long fileOffset = DbLsn.getFileOffset(lsn); 178 final int contentSize; 179 if (buffer.position() == 0) { 180 /* Buffer was flipped for reading. */ 181 contentSize = buffer.limit(); 182 } else { 183 /* Buffer is still being written into. */ 184 contentSize = buffer.position(); 185 } 186 final long firstLsnOffset = DbLsn.getFileOffset(firstLsn); 187 final long lastContentOffset = firstLsnOffset + contentSize; 188 189 if ((firstLsnOffset <= fileOffset) && 190 (lastContentOffset > fileOffset)) { 191 found = true; 192 } 193 } 194 195 if (found) { 196 return true; 197 } else { 198 readLatch.release(); 199 return false; 200 } 201 } 202 203 /** 204 * When modifying the buffer, acquire the readLatch. Call release() to 205 * release the latch. Note that containsLsn() acquires the latch for 206 * reading. 207 */ latchForWrite()208 public void latchForWrite() 209 throws DatabaseException { 210 211 readLatch.acquireExclusive(); 212 } 213 214 /* 215 * LogSource support 216 */ 217 218 /** 219 * @see LogSource#release 220 */ release()221 public void release() { 222 readLatch.releaseIfOwner(); 223 } 224 getRewriteAllowed()225 boolean getRewriteAllowed() { 226 return rewriteAllowed; 227 } 228 setRewriteAllowed()229 void setRewriteAllowed() { 230 rewriteAllowed = true; 231 } 232 233 /** 234 * Allocate a segment out of the buffer. 235 * Called with the buffer latched. 236 * @param size of buffer to allocate 237 * @return null if not enough room, otherwise a 238 * LogBufferSegment for the data. 239 */ allocate(int size)240 public LogBufferSegment allocate(int size) { 241 if (hasRoom(size)) { 242 ByteBuffer buf = 243 ByteBuffer.wrap(data, buffer.position(), size); 244 buffer.position(buffer.position() + size); 245 writePinCount.incrementAndGet(); 246 return new LogBufferSegment(this, buf); 247 } 248 return null; 249 } 250 251 /** 252 * Called with the buffer not latched. 253 */ free()254 public void free() { 255 writePinCount.decrementAndGet(); 256 } 257 258 /** 259 * Acquire the buffer latched and with the buffer 260 * pin count equal to zero. 261 */ waitForZeroAndLatch()262 public void waitForZeroAndLatch() { 263 boolean done = false; 264 while (!done) { 265 if (writePinCount.get() > 0) { 266 LockSupport.parkNanos(this, 100); 267 /* 268 * This may be overkill to check if a thread was 269 * interrupted. There should be no interrupt of the 270 * thread pinning and unpinning the buffer. 271 */ 272 if (Thread.interrupted()) { 273 throw new ThreadInterruptedException( 274 env, "Interrupt during read operation"); 275 } 276 } else { 277 readLatch.acquireExclusive(); 278 if (writePinCount.get() == 0) { 279 done = true; 280 } else { 281 readLatch.release(); 282 } 283 } 284 } 285 } 286 287 /** 288 * @see LogSource#getBytes 289 */ getBytes(long fileOffset)290 public ByteBuffer getBytes(long fileOffset) { 291 292 /* 293 * Make a copy of this buffer (doesn't copy data, only buffer state) 294 * and position it to read the requested data. 295 * 296 * Note that we catch Exception here because it is possible that 297 * another thread is modifying the state of buffer simultaneously. 298 * Specifically, this can happen if another thread is writing this log 299 * buffer out and it does (e.g.) a flip operation on it. The actual 300 * mark/pos of the buffer may be caught in an unpredictable state. We 301 * could add another latch to protect this buffer, but that's heavier 302 * weight than we need. So the easiest thing to do is to just retry 303 * the duplicate operation. See [#9822]. 304 */ 305 ByteBuffer copy = null; 306 while (true) { 307 try { 308 copy = buffer.duplicate(); 309 copy.position((int) 310 (fileOffset - DbLsn.getFileOffset(firstLsn))); 311 break; 312 } catch (IllegalArgumentException IAE) { 313 continue; 314 } 315 } 316 return copy; 317 } 318 319 /** 320 * @see LogSource#getBytes 321 */ getBytes(long fileOffset, int numBytes)322 public ByteBuffer getBytes(long fileOffset, int numBytes) 323 throws ChecksumException { 324 325 ByteBuffer copy = getBytes(fileOffset); 326 /* Log Buffer should always hold a whole entry. */ 327 if (copy.remaining() < numBytes) { 328 throw new ChecksumException("copy.remaining=" + copy.remaining() + 329 " numBytes=" + numBytes); 330 } 331 return copy; 332 } 333 334 /** 335 * Entries in write buffers are always the current version. 336 */ getLogVersion()337 public int getLogVersion() { 338 return LogEntryType.LOG_VERSION; 339 } 340 } 341