/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.annotations.VisibleForTesting;

import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;


/**
 * Manages the read/write consistency. This provides an interface for readers to determine what
 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
 * the new writes for readers to read (thus forming atomic transactions).
 *
 * <p>Locking: the pending-write queue and the write point are mutated while holding the
 * {@code writeQueue} monitor; read point updates are published, and waiters woken, while holding
 * the {@code readWaiters} monitor.
 */
@InterfaceAudience.Private
public class MultiVersionConcurrencyControl {
  private static final Log LOG = LogFactory.getLog(MultiVersionConcurrencyControl.class);
  static final long NO_WRITE_NUMBER = 0;

  final AtomicLong readPoint = new AtomicLong(0);
  final AtomicLong writePoint = new AtomicLong(0);
  /** Monitor that {@link #waitForRead(WriteEntry)} waits on and {@link #complete} notifies. */
  private final Object readWaiters = new Object();
  /**
   * Represents no value, or not set.
   */
  public static final long NONE = -1;

  // This is the pending queue of writes.
  //
  // TODO(eclark): Should this be an array of fixed size to
  // reduce the number of allocations on the write path?
  // This could be equal to the number of handlers + a small number.
  // TODO: St.Ack 20150903 Sounds good to me.
  private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();

  /**
   * Construct with read point and write point both at zero.
   */
  public MultiVersionConcurrencyControl() {
    super();
  }

  /**
   * Construct and move both the read point and the write point to <code>startPoint</code>.
   * A start point below the initial read point (zero) is ignored and both points stay at zero.
   * @param startPoint Point to set the read and write points to.
   */
  public MultiVersionConcurrencyControl(long startPoint) {
    tryAdvanceTo(startPoint, NONE);
  }

  /**
   * Step the MVCC forward on to a new read/write basis. Does nothing when the write point is
   * already at or beyond <code>newStartPoint</code>; otherwise retries until either the advance
   * succeeds or the write point has caught up.
   * @param newStartPoint Point to move read and write points to.
   */
  public void advanceTo(long newStartPoint) {
    while (true) {
      long seqId = this.getWritePoint();
      if (seqId >= newStartPoint) {
        break;
      }
      if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) {
        break;
      }
    }
  }

  /**
   * Step the MVCC forward on to a new read/write basis.
   * @param newStartPoint Point to move read and write points to.
   * @param expected If not {@link #NONE} (-1), the advance only happens when this value equals
   *   the current <code>readPoint</code>.
   * @return Returns false if <code>expected</code> is not equal to the
   *   current <code>readPoint</code> or if <code>newStartPoint</code> is less than current
   *   <code>readPoint</code>; true once both points have been set to
   *   <code>newStartPoint</code>.
   * @throws RuntimeException if the read point and write point differ, i.e. there are
   *   transactions in flight.
   */
  boolean tryAdvanceTo(long newStartPoint, long expected) {
    synchronized (writeQueue) {
      long currentRead = this.readPoint.get();
      long currentWrite = this.writePoint.get();
      if (currentRead != currentWrite) {
        throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
          ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
      }
      if (expected != NONE && expected != currentRead) {
        return false;
      }

      if (newStartPoint < currentRead) {
        return false;
      }

      readPoint.set(newStartPoint);
      writePoint.set(newStartPoint);
    }
    return true;
  }

  /**
   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
   * to our queue of ongoing writes. Return this WriteEntry instance.
   * To complete the write transaction and wait for it to be visible, call
   * {@link #completeAndWait(WriteEntry)}. If the write failed, call
   * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
   * transaction.
   * @return the newly queued entry carrying the next write number.
   * @see #complete(WriteEntry)
   * @see #completeAndWait(WriteEntry)
   */
  public WriteEntry begin() {
    synchronized (writeQueue) {
      long nextWriteNumber = writePoint.incrementAndGet();
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
  }

  /**
   * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
   * to complete.
   */
  public void await() {
    // Add a write and then wait on reads to catch up to it.
    completeAndWait(begin());
  }

  /**
   * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
   * read point catches up to our write.
   *
   * At the end of this call, the global read point is at least as large as the write point
   * of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
   * @param e the entry to complete and wait on.
   */
  public void completeAndWait(WriteEntry e) {
    complete(e);
    waitForRead(e);
  }

  /**
   * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
   * Call this even if the write has FAILED (AFTER backing out the write transaction
   * changes completely) so we can clean up the outstanding transaction.
   *
   * How much is the read point advanced?
   *
   * Completed entries are removed from the head of the queue until the first entry that is not
   * yet completed; the read point is set to the write number of the last entry removed.
   *
   * @param writeEntry the entry to mark as completed.
   *
   * @return true if <code>writeEntry</code> is visible to MVCC readers (that is,
   *   readpoint >= writeEntry.writeNumber)
   * @throws RuntimeException if the queue of pending writes is empty, or if consecutive entries
   *   at the head of the queue do not carry consecutive write numbers.
   */
  public boolean complete(WriteEntry writeEntry) {
    synchronized (writeQueue) {
      writeEntry.markCompleted();

      long nextReadValue = NONE;
      boolean ranOnce = false;
      while (!writeQueue.isEmpty()) {
        ranOnce = true;
        WriteEntry queueFirst = writeQueue.getFirst();

        // Once one entry has been drained, the next head must carry the following number.
        if (nextReadValue > 0) {
          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("Invariant in complete violated, nextReadValue="
                + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("There is no first!");
      }

      if (nextReadValue > 0) {
        // Publish the new read point and wake everyone parked in waitForRead.
        synchronized (readWaiters) {
          readPoint.set(nextReadValue);
          readWaiters.notifyAll();
        }
      }
      return readPoint.get() >= writeEntry.getWriteNumber();
    }
  }

  /**
   * Wait for the global readPoint to advance up to the passed in write entry number.
   * Polls in 10ms waits and logs a warning every 100 waits. An interrupt does not end the
   * wait; the thread's interrupt flag is restored once the read point has caught up.
   * @param e the entry whose write number the read point must reach.
   */
  void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    int count = 0;
    synchronized (readWaiters) {
      while (readPoint.get() < e.getWriteNumber()) {
        if (count % 100 == 0 && count > 0) {
          LOG.warn("STUCK: " + this);
        }
        count++;
        try {
          readWaiters.wait(10);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * @return a string showing the current read point and write point.
   */
  @VisibleForTesting
  @Override
  public String toString() {
    return Objects.toStringHelper(this)
        .add("readPoint", readPoint)
        .add("writePoint", writePoint).toString();
  }

  /**
   * @return the current read point.
   */
  public long getReadPoint() {
    return readPoint.get();
  }

  /**
   * @return the current write point.
   */
  @VisibleForTesting
  public long getWritePoint() {
    return writePoint.get();
  }

  /**
   * Write number and whether write has completed given out at start of a write transaction.
   * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
   */
  @InterfaceAudience.Private
  public static class WriteEntry {
    private final long writeNumber;
    // Not volatile: within this file it is only touched from complete(), under the writeQueue
    // monitor, and from toString().
    private boolean completed = false;

    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    }

    void markCompleted() {
      this.completed = true;
    }

    boolean isCompleted() {
      return this.completed;
    }

    public long getWriteNumber() {
      return this.writeNumber;
    }

    @Override
    public String toString() {
      return this.writeNumber + ", " + this.completed;
    }
  }

  // NOTE(review): this accounts for two longs and two references, whereas the class declares
  // four reference fields (readPoint, writePoint, readWaiters, writeQueue) and no long fields.
  // Left unchanged because callers may depend on the value -- confirm before adjusting.
  public static final long FIXED_SIZE = ClassSize.align(
      ClassSize.OBJECT +
      2 * Bytes.SIZEOF_LONG +
      2 * ClassSize.REFERENCE);
}