1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.regionserver;
20 
21 import java.util.LinkedList;
22 import java.util.concurrent.atomic.AtomicLong;
23 
24 import com.google.common.annotations.VisibleForTesting;
25 
26 import com.google.common.base.Objects;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.hbase.util.ClassSize;
32 
33 
34 /**
35  * Manages the read/write consistency. This provides an interface for readers to determine what
36  * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
37  * the new writes for readers to read (thus forming atomic transactions).
38  */
39 @InterfaceAudience.Private
40 public class MultiVersionConcurrencyControl {
41   private static final Log LOG = LogFactory.getLog(MultiVersionConcurrencyControl.class);
42   static final long NO_WRITE_NUMBER = 0;
43 
44   final AtomicLong readPoint = new AtomicLong(0);
45   final AtomicLong writePoint = new AtomicLong(0);
46   private final Object readWaiters = new Object();
47   /**
48    * Represents no value, or not set.
49    */
50   public static final long NONE = -1;
51 
52   // This is the pending queue of writes.
53   //
54   // TODO(eclark): Should this be an array of fixed size to
55   // reduce the number of allocations on the write path?
56   // This could be equal to the number of handlers + a small number.
57   // TODO: St.Ack 20150903 Sounds good to me.
58   private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
59 
MultiVersionConcurrencyControl()60   public MultiVersionConcurrencyControl() {
61     super();
62   }
63 
64   /**
65    * Construct and set read point. Write point is uninitialized.
66    */
MultiVersionConcurrencyControl(long startPoint)67   public MultiVersionConcurrencyControl(long startPoint) {
68     tryAdvanceTo(startPoint, NONE);
69   }
70 
71   /**
72    * Step the MVCC forward on to a new read/write basis.
73    * @param newStartPoint
74    */
advanceTo(long newStartPoint)75   public void advanceTo(long newStartPoint) {
76     while (true) {
77       long seqId = this.getWritePoint();
78       if (seqId >= newStartPoint) break;
79       if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
80     }
81   }
82 
83   /**
84    * Step the MVCC forward on to a new read/write basis.
85    * @param newStartPoint Point to move read and write points to.
86    * @param expected If not -1 (#NONE)
87    * @return Returns false if <code>expected</code> is not equal to the
88    * current <code>readPoint</code> or if <code>startPoint</code> is less than current
89    * <code>readPoint</code>
90    */
tryAdvanceTo(long newStartPoint, long expected)91   boolean tryAdvanceTo(long newStartPoint, long expected) {
92     synchronized (writeQueue) {
93       long currentRead = this.readPoint.get();
94       long currentWrite = this.writePoint.get();
95       if (currentRead != currentWrite) {
96         throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
97           ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
98       }
99       if (expected != NONE && expected != currentRead) {
100         return false;
101       }
102 
103       if (newStartPoint < currentRead) {
104         return false;
105       }
106 
107       readPoint.set(newStartPoint);
108       writePoint.set(newStartPoint);
109     }
110     return true;
111   }
112 
113   /**
114    * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
115    * to our queue of ongoing writes. Return this WriteEntry instance.
116    * To complete the write transaction and wait for it to be visible, call
117    * {@link #completeAndWait(WriteEntry)}. If the write failed, call
118    * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
119    * transaction.
120    * @see #complete(WriteEntry)
121    * @see #completeAndWait(WriteEntry)
122    */
begin()123   public WriteEntry begin() {
124     synchronized (writeQueue) {
125       long nextWriteNumber = writePoint.incrementAndGet();
126       WriteEntry e = new WriteEntry(nextWriteNumber);
127       writeQueue.add(e);
128       return e;
129     }
130   }
131 
132   /**
133    * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
134    * to complete.
135    */
await()136   public void await() {
137     // Add a write and then wait on reads to catch up to it.
138     completeAndWait(begin());
139   }
140 
141   /**
142    * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
143    * read point catches up to our write.
144    *
145    * At the end of this call, the global read point is at least as large as the write point
146    * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
147    */
completeAndWait(WriteEntry e)148   public void completeAndWait(WriteEntry e) {
149     complete(e);
150     waitForRead(e);
151   }
152 
153   /**
154    * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
155    * Call this even if the write has FAILED (AFTER backing out the write transaction
156    * changes completely) so we can clean up the outstanding transaction.
157    *
158    * How much is the read point advanced?
159    *
160    * Let S be the set of all write numbers that are completed. Set the read point to the highest
161    * numbered write of S.
162    *
163    * @param writeEntry
164    *
165    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
166    */
complete(WriteEntry writeEntry)167   public boolean complete(WriteEntry writeEntry) {
168     synchronized (writeQueue) {
169       writeEntry.markCompleted();
170 
171       long nextReadValue = NONE;
172       boolean ranOnce = false;
173       while (!writeQueue.isEmpty()) {
174         ranOnce = true;
175         WriteEntry queueFirst = writeQueue.getFirst();
176 
177         if (nextReadValue > 0) {
178           if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
179             throw new RuntimeException("Invariant in complete violated, nextReadValue="
180                 + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
181           }
182         }
183 
184         if (queueFirst.isCompleted()) {
185           nextReadValue = queueFirst.getWriteNumber();
186           writeQueue.removeFirst();
187         } else {
188           break;
189         }
190       }
191 
192       if (!ranOnce) {
193         throw new RuntimeException("There is no first!");
194       }
195 
196       if (nextReadValue > 0) {
197         synchronized (readWaiters) {
198           readPoint.set(nextReadValue);
199           readWaiters.notifyAll();
200         }
201       }
202       return readPoint.get() >= writeEntry.getWriteNumber();
203     }
204   }
205 
206   /**
207    * Wait for the global readPoint to advance up to the passed in write entry number.
208    */
waitForRead(WriteEntry e)209   void waitForRead(WriteEntry e) {
210     boolean interrupted = false;
211     int count = 0;
212     synchronized (readWaiters) {
213       while (readPoint.get() < e.getWriteNumber()) {
214         if (count % 100 == 0 && count > 0) {
215           LOG.warn("STUCK: " + this);
216         }
217         count++;
218         try {
219           readWaiters.wait(10);
220         } catch (InterruptedException ie) {
221           // We were interrupted... finish the loop -- i.e. cleanup --and then
222           // on our way out, reset the interrupt flag.
223           interrupted = true;
224         }
225       }
226     }
227     if (interrupted) {
228       Thread.currentThread().interrupt();
229     }
230   }
231 
232   @VisibleForTesting
toString()233   public String toString() {
234     return Objects.toStringHelper(this)
235         .add("readPoint", readPoint)
236         .add("writePoint", writePoint).toString();
237   }
238 
getReadPoint()239   public long getReadPoint() {
240     return readPoint.get();
241   }
242 
243   @VisibleForTesting
getWritePoint()244   public long getWritePoint() {
245     return writePoint.get();
246   }
247 
248   /**
249    * Write number and whether write has completed given out at start of a write transaction.
250    * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
251    */
252   @InterfaceAudience.Private
253   public static class WriteEntry {
254     private final long writeNumber;
255     private boolean completed = false;
256 
WriteEntry(long writeNumber)257     WriteEntry(long writeNumber) {
258       this.writeNumber = writeNumber;
259     }
260 
markCompleted()261     void markCompleted() {
262       this.completed = true;
263     }
264 
isCompleted()265     boolean isCompleted() {
266       return this.completed;
267     }
268 
getWriteNumber()269     public long getWriteNumber() {
270       return this.writeNumber;
271     }
272 
273     @Override
toString()274     public String toString() {
275       return this.writeNumber + ", " + this.completed;
276     }
277   }
278 
279   public static final long FIXED_SIZE = ClassSize.align(
280       ClassSize.OBJECT +
281       2 * Bytes.SIZEOF_LONG +
282       2 * ClassSize.REFERENCE);
283 }
284