1 /*
2  * VersionedMap.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FDBCLIENT_VERSIONEDMAP_H
22 #define FDBCLIENT_VERSIONEDMAP_H
23 #pragma once
24 
#include <limits>

#include "flow/flow.h"
#include "flow/IndexedSet.h"
#include "fdbclient/FDBTypes.h"
#include "flow/IRandom.h"
#include "fdbclient/VersionedMap.actor.h"
30 
31 // PTree is a persistent balanced binary tree implementation. It is based on a treap as a way to guarantee O(1) space for node insertion (rotating is asymptotically cheap),
32 // but the constant factors are very large.
33 //
34 // Each node has three pointers - the first two are its left and right children, respectively, and the third can be set to point to a newer version of the node.
35 // This third pointer allows us to maintain persistence without full path copying, and is employed to achieve O(1) space node insertion.
36 //
37 // PTree also supports efficient finger searches.
38 namespace PTreeImpl {
39 
40 	#pragma warning(disable: 4800)
41 
42 	template<class T>
43 	struct PTree : public ReferenceCounted<PTree<T>>, FastAllocated<PTree<T>>, NonCopyable {
44 		uint32_t priority;
45 		Reference<PTree> pointer[3];
46 		Version lastUpdateVersion;
47 		bool updated;
48 		bool replacedPointer;
49 		T data;
50 
childPTree51 		Reference<PTree> child(bool which, Version at) const {
52 			if (updated && lastUpdateVersion<=at && which == replacedPointer)
53 				return pointer[2];
54 			else
55 				return pointer[which];
56 		}
leftPTree57 		Reference<PTree> left(Version at) const { return child(false, at); }
rightPTree58 		Reference<PTree> right(Version at) const { return child(true, at); }
59 
PTreePTree60 		PTree(const T& data, Version ver) : data(data), lastUpdateVersion(ver), updated(false) {
61 			priority = g_random->randomUInt32();
62 		}
PTreePTree63 		PTree( uint32_t pri, T const& data, Reference<PTree> const& left, Reference<PTree> const& right, Version ver ) : priority(pri), data(data), lastUpdateVersion(ver), updated(false) {
64 			pointer[0] = left; pointer[1] = right;
65 		}
66 	private:
67 		PTree(PTree const&);
68 	};
69 
70 	template<class T>
update(Reference<PTree<T>> const & node,bool which,Reference<PTree<T>> const & ptr,Version at)71 	static Reference<PTree<T>> update( Reference<PTree<T>> const& node, bool which, Reference<PTree<T>> const& ptr, Version at ) {
72 		if (ptr.getPtr() == node->child(which, at).getPtr()/* && node->replacedVersion <= at*/) {
73 			return node;
74 		}
75 		if (node->lastUpdateVersion == at) {
76 			//&& (!node->updated || node->replacedPointer==which)) {
77 			if (node->updated && node->replacedPointer != which) {
78 				// We are going to have to copy this node, but its aux pointer will never be used again
79 				// and should drop its reference count
80 				Reference<PTree<T>> r;
81 				if (which)
82 					r = Reference<PTree<T>>( new PTree<T>( node->priority, node->data, node->child(0, at), ptr, at ) );
83 				else
84 					r = Reference<PTree<T>>( new PTree<T>( node->priority, node->data, ptr, node->child(1, at), at ) );
85 				node->pointer[2].clear();
86 				return r;
87 			} else {
88 				if (node->updated)
89 					node->pointer[2] = ptr;
90 				else
91 					node->pointer[which] = ptr;
92 				return node;
93 			}
94 		}
95 		if ( node->updated ) {
96 			if (which)
97 				return Reference<PTree<T>>( new PTree<T>( node->priority, node->data, node->child(0, at), ptr, at ) );
98 			else
99 				return Reference<PTree<T>>( new PTree<T>( node->priority, node->data, ptr, node->child(1, at), at ) );
100 		} else {
101 			node->lastUpdateVersion = at;
102 			node->replacedPointer = which;
103 			node->pointer[2] = ptr;
104 			node->updated = true;
105 			return node;
106 		}
107 	}
108 
109 	template<class T, class X>
contains(const Reference<PTree<T>> & p,Version at,const X & x)110 	bool contains(const Reference<PTree<T>>& p, Version at, const X& x) {
111 		if (!p) return false;
112 		bool less = x < p->data;
113 		if (!less && !(p->data<x)) return true;  // x == p->data
114 		return contains(p->child(!less, at), at, x);
115 	}
116 
117 	template<class T, class X>
lower_bound(const Reference<PTree<T>> & p,Version at,const X & x,std::vector<const PTree<T> * > & f)118 	void lower_bound(const Reference<PTree<T>>& p, Version at, const X& x, std::vector<const PTree<T>*>& f){
119 		if (!p) {
120 			while (f.size() && !(x < f.back()->data))
121 				f.pop_back();
122 			return;
123 		}
124 		f.push_back(p.getPtr());
125 		bool less = x < p->data;
126 		if (!less && !(p->data<x)) return;  // x == p->data
127 		lower_bound(p->child(!less, at), at, x, f);
128 	}
129 
130 	template<class T, class X>
upper_bound(const Reference<PTree<T>> & p,Version at,const X & x,std::vector<const PTree<T> * > & f)131 	void upper_bound(const Reference<PTree<T>>& p, Version at, const X& x, std::vector<const PTree<T>*>& f){
132 		if (!p) {
133 			while (f.size() && !(x < f.back()->data))
134 				f.pop_back();
135 			return;
136 		}
137 		f.push_back(p.getPtr());
138 		upper_bound(p->child(!(x < p->data), at), at, x, f);
139 	}
140 
141 	template<class T, bool forward>
move(Version at,std::vector<const PTree<T> * > & f)142 	void move(Version at, std::vector<const PTree<T>*>& f){
143 		ASSERT(f.size());
144 		const PTree<T> *n;
145 		n = f.back();
146 		if (n->child(forward, at)){
147 			n = n->child(forward, at).getPtr();
148 			do {
149 				f.push_back(n);
150 				n = n->child(!forward, at).getPtr();
151 			} while (n);
152 		} else {
153 			do {
154 				n = f.back();
155 				f.pop_back();
156 			} while (f.size() && f.back()->child(forward, at).getPtr() == n);
157 		}
158 	}
159 
160 	template<class T, bool forward>
halfMove(Version at,std::vector<const PTree<T> * > & f)161 	int halfMove(Version at, std::vector<const PTree<T>*>& f) {
162 		// Post: f[:return_value] is the finger that would have been returned by move<forward>(at,f), and f[:original_length_of_f] is unmodified
163 		ASSERT(f.size());
164 		const PTree<T> *n;
165 		n = f.back();
166 		if (n->child(forward, at)){
167 			n = n->child(forward, at).getPtr();
168 			do {
169 				f.push_back(n);
170 				n = n->child(!forward, at).getPtr();
171 			} while (n);
172 			return f.size();
173 		} else {
174 			int s = f.size();
175 			do {
176 				n = f[s-1];
177 				--s;
178 			} while (s && f[s-1]->child(forward, at).getPtr() == n);
179 			return s;
180 		}
181 	}
182 
183 	template<class T>
next(Version at,std::vector<const PTree<T> * > & f)184 	void next(Version at, std::vector<const PTree<T>*>& f){
185 		move<T,true>(at, f);
186 	}
187 
188 	template<class T>
previous(Version at,std::vector<const PTree<T> * > & f)189 	void previous(Version at, std::vector<const PTree<T>*>& f){
190 		move<T,false>(at, f);
191 	}
192 
193 	template<class T>
halfNext(Version at,std::vector<const PTree<T> * > & f)194 	int halfNext(Version at, std::vector<const PTree<T>*>& f){
195 		return halfMove<T,true>(at, f);
196 	}
197 
198 	template<class T>
halfPrevious(Version at,std::vector<const PTree<T> * > & f)199 	int halfPrevious(Version at, std::vector<const PTree<T>*>& f){
200 		return halfMove<T,false>(at, f);
201 	}
202 
203 	template<class T>
get(std::vector<const PTree<T> * > & f)204 	T get(std::vector<const PTree<T>*>& f){
205 		ASSERT(f.size());
206 		return f.back()->data;
207 	}
208 
209 	// Modifies p to point to a PTree with x inserted
210 	template<class T>
insert(Reference<PTree<T>> & p,Version at,const T & x)211 	void insert(Reference<PTree<T>>& p, Version at, const T& x) {
212 		if (!p){
213 			p = Reference<PTree<T>>(new PTree<T>(x, at));
214 		} else {
215 			bool direction = !(x < p->data);
216 			Reference<PTree<T>> child = p->child(direction, at);
217 			insert(child, at, x);
218 			p = update(p, direction, child, at);
219 			if (p->child(direction, at)->priority > p->priority)
220 				rotate(p, at, !direction);
221 		}
222 	}
223 
224 	template<class T>
firstNode(const Reference<PTree<T>> & p,Version at)225 	Reference<PTree<T>> firstNode(const Reference<PTree<T>>& p, Version at) {
226 		if (!p) ASSERT(false);
227 		if (!p->left(at)) return p;
228 		return firstNode(p->left(at), at);
229 	}
230 
231 	template<class T>
lastNode(const Reference<PTree<T>> & p,Version at)232 	Reference<PTree<T>> lastNode(const Reference<PTree<T>>& p, Version at) {
233 		if (!p) ASSERT(false);
234 		if (!p->right(at)) return p;
235 		return lastNode(p->right(at), at);
236 	}
237 
238 	template<class T, bool last>
firstOrLastFinger(const Reference<PTree<T>> & p,Version at,std::vector<const PTree<T> * > & f)239 	void firstOrLastFinger(const Reference<PTree<T>>& p, Version at, std::vector<const PTree<T>*>& f) {
240 		if (!p) return;
241 		f.push_back(p.getPtr());
242 		firstOrLastFinger<T, last>(p->child(last, at), at, f);
243 	}
244 
245 	template<class T>
first(const Reference<PTree<T>> & p,Version at,std::vector<const PTree<T> * > & f)246 	void first(const Reference<PTree<T>>& p, Version at, std::vector<const PTree<T>*>& f) {
247 		return firstOrLastFinger<T, false>(p, at, f);
248 	}
249 
250 	template<class T>
last(const Reference<PTree<T>> & p,Version at,std::vector<const PTree<T> * > & f)251 	void last(const Reference<PTree<T>>& p, Version at, std::vector<const PTree<T>*>& f) {
252 		return firstOrLastFinger<T, true>(p, at, f);
253 	}
254 
255 	// modifies p to point to a PTree with the root of p removed
256 	template<class T>
removeRoot(Reference<PTree<T>> & p,Version at)257 	void removeRoot(Reference<PTree<T>>& p, Version at) {
258 		if (!p->right(at))
259 			p = p->left(at);
260 		else if (!p->left(at))
261 			p = p->right(at);
262 		else {
263 			bool direction = p->right(at)->priority < p->left(at)->priority;
264 			rotate(p,at,direction);
265 			Reference<PTree<T>> child = p->child(direction, at);
266 			removeRoot(child, at);
267 			p = update(p, direction, child, at);
268 		}
269 	}
270 
271 	// changes p to point to a PTree with x removed
272 	template<class T, class X>
remove(Reference<PTree<T>> & p,Version at,const X & x)273 	void remove(Reference<PTree<T>>& p, Version at, const X& x) {
274 		if (!p) ASSERT(false); // attempt to remove item not present in PTree
275 		if (x < p->data) {
276 			Reference<PTree<T>> child = p->child(0, at);
277 			remove(child, at, x);
278 			p = update(p, 0, child, at);
279 		} else if (p->data < x) {
280 			Reference<PTree<T>> child = p->child(1, at);
281 			remove(child, at, x);
282 			p = update(p, 1, child, at);
283 		} else
284 			removeRoot(p, at);
285 	}
286 
287 	template<class T, class X>
remove(Reference<PTree<T>> & p,Version at,const X & begin,const X & end)288 	void remove(Reference<PTree<T>>& p, Version at, const X& begin, const X& end) {
289 		if (!p) return;
290 		int beginDir, endDir;
291 		if (begin < p->data) beginDir = -1;
292 		else if (p->data < begin) beginDir = +1;
293 		else beginDir = 0;
294 		if (!(p->data < end)) endDir = -1;
295 		else endDir = +1;
296 
297 		if (beginDir == endDir) {
298 			Reference<PTree<T>> child = p->child(beginDir==+1, at);
299 			remove( child, at, begin, end );
300 			p = update(p, beginDir==+1, child, at);
301 		} else {
302 			if (beginDir==-1) {
303 				Reference<PTree<T>> left = p->child(0, at);
304 				removeBeyond(left, at, begin, 1);
305 				p = update(p, 0, left, at);
306 			}
307 			if (endDir==+1) {
308 				Reference<PTree<T>> right = p->child(1, at);
309 				removeBeyond(right, at, end, 0);
310 				p = update(p, 1, right, at);
311 			}
312 			if (beginDir < endDir)
313 				removeRoot(p, at);
314 		}
315 	}
316 
317 	template <class T, class X>
removeBeyond(Reference<PTree<T>> & p,Version at,const X & pivot,bool dir)318 	void removeBeyond(Reference<PTree<T>>& p, Version at, const X& pivot, bool dir) {
319 		if (!p) return;
320 
321 		if ( (p->data < pivot)^dir ) {
322 			p = p->child(!dir, at);
323 			removeBeyond(p, at, pivot, dir );
324 		} else {
325 			Reference<PTree<T>> child = p->child(dir, at);
326 			removeBeyond(child, at, pivot, dir);
327 			p = update(p, dir, child, at);
328 		}
329 	}
330 
331 	/*template<class T, class X>
332 	void remove(Reference<PTree<T>>& p, Version at, const X& begin, const X& end) {
333 		Reference<PTree<T>> left, center, right;
334 		split(p, begin, left, center, at);
335 		split(center, end, center, right, at);
336 		p = append(left, right, at);
337 	}*/
338 
339 	// inputs a PTree with the root node potentially violating the heap property
340 	// modifies p to point to a valid PTree
341 	template<class T>
demoteRoot(Reference<PTree<T>> & p,Version at)342 	void demoteRoot(Reference<PTree<T>>& p, Version at){
343 		if (!p) ASSERT(false);
344 
345 		uint32_t priority[2];
346 		for (int i=0;i<2;i++)
347 			if (p->child(i, at)) priority[i] = p->child(i, at)->priority;
348 			else priority[i] = 0;
349 
350 		bool higherDirection = priority[1] > priority[0];
351 
352 		if (priority[higherDirection] < p->priority) return;
353 
354 		// else, child(higherDirection) is a greater priority than us and the other child...
355 		rotate(p, at, !higherDirection);
356 		Reference<PTree<T>> child = p->child(!higherDirection, at);
357 		demoteRoot(child, at);
358 		p = update(p, !higherDirection, child, at);
359 	}
360 
361 	template<class T>
append(const Reference<PTree<T>> & left,const Reference<PTree<T>> & right,Version at)362 	Reference<PTree<T>> append(const Reference<PTree<T>>& left, const Reference<PTree<T>>& right, Version at) {
363 		if (!left) return right;
364 		if (!right) return left;
365 
366 		Reference<PTree<T>> r = Reference<PTree<T>>(new PTree<T>(lastNode(left, at)->data, at));
367 		ASSERT( r->data < firstNode(right, at)->data);
368 		Reference<PTree<T>> a = left;
369 		remove(a, at, r->data);
370 
371 		r->pointer[0] = a;
372 		r->pointer[1] = right;
373 		demoteRoot(r, at);
374 		return r;
375 	}
376 
377 	template<class T, class X>
split(Reference<PTree<T>> p,const X & x,Reference<PTree<T>> & left,Reference<PTree<T>> & right,Version at)378 	void split(Reference<PTree<T>> p, const X& x, Reference<PTree<T>>& left, Reference<PTree<T>>& right, Version at) {
379 		if (!p){
380 			left = Reference<PTree<T>>();
381 			right = Reference<PTree<T>>();
382 			return;
383 		}
384 
385 		if (p->data < x){
386 			left = p;
387 			Reference<PTree<T>> lr = left->right(at);
388 			split(lr, x, lr, right, at);
389 			left = update(left, 1, lr, at);
390 		} else {
391 			right = p;
392 			Reference<PTree<T>> rl = right->left(at);
393 			split(rl, x, left, rl, at);
394 			right = update(right, 0, rl, at);
395 		}
396 	}
397 
398 	template<class T>
rotate(Reference<PTree<T>> & p,Version at,bool right)399 	void rotate(Reference<PTree<T>>& p, Version at, bool right){
400 		auto r = p->child(!right, at);
401 
402 		auto n1 = r->child(!right, at);
403 		auto n2 = r->child(right, at);
404 		auto n3 = p->child(right, at);
405 
406 		auto newC = update( p, !right, n2, at );
407 		newC = update( newC, right, n3, at );
408 		p = update( r, !right, n1, at );
409 		p = update( p, right, newC, at );
410 	}
411 
412 	template <class T>
413 	void printTree(const Reference<PTree<T>>& p, Version at, int depth = 0) {
414 		if (p->left(at)) printTree(p->left(at), at, depth+1);
415 		for (int i=0;i<depth;i++)
416 			printf("  ");
417 		printf(":%s\n", describe(p->data).c_str());
418 		if (p->right(at)) printTree(p->right(at), at, depth+1);
419 	}
420 
421 	template <class T>
422 	void printTreeDetails(const Reference<PTree<T>>& p, int depth = 0) {
423 		printf("Node %p (depth %d): %s\n", p.getPtr(), depth, describe(p->data).c_str());
424 		printf("  Left: %p\n", p->pointer[0].getPtr());
425 		printf("  Right: %p\n", p->pointer[1].getPtr());
426 		if (p->pointer[2])
427 			printf("  Version %lld %s: %p\n", p->lastUpdateVersion, p->replacedPointer ? "Right" : "Left", p->pointer[2].getPtr());
428 		for(int i=0; i<3; i++)
429 			if (p->pointer[i]) printTreeDetails(p->pointer[i], depth+1);
430 	}
431 
432 	/*static int depth(const Reference<PTree<int>>& p, Version at) {
433 		if (!p) return 0;
434 		int d1 = depth(p->left(at), at) + 1;
435 		int d2 = depth(p->right(at), at) + 1;
436 		return d1 > d2 ? d1 : d2;
437 	}*/
438 
439 	template <class T>
440 	void validate(const Reference<PTree<T>>& p, Version at, T* min, T* max, int& count, int& height, int depth=0) {
441 		if (!p) { height=0; return; }
442 		ASSERT( (!min || *min <= p->data) && (!max || p->data <= *max) );
443 		for (int i=0;i<2;i++){
444 			if (p->child(i, at))
445 				ASSERT(p->child(i, at)->priority <= p->priority);
446 		}
447 
448 		++count;
449 		int h1, h2;
450 		validate(p->left(at), at, min, &p->data, count, h1, depth+1);
451 		validate(p->right(at), at, &p->data, max, count, h2, depth+1);
452 		height = std::max(h1, h2) + 1;
453 	}
454 
455 	template<class T>
check(const Reference<PTree<T>> & p)456 	void check(const Reference<PTree<T>>& p){
457 		int count=0, height;
458 		validate(p, (T*)0, (T*)0, count, height);
459 		if (count && height > 4.3 * log(double(count))){
460 			//printf("height %d; count %d\n", height, count);
461 			ASSERT(false);
462 		}
463 	}
464 
465 }
466 
467 // VersionedMap provides an interface to a partially persistent tree, allowing you to read the values at a particular version,
468 // create new versions, modify the current version of the tree, and forget versions prior to a specific version.
469 template <class K, class T>
470 class VersionedMap : NonCopyable {
471 //private:
472 public:
473 	typedef PTreeImpl::PTree<MapPair<K,std::pair<T,Version>>> PTreeT;
474 	typedef Reference< PTreeT > Tree;
475 
476 	Version oldestVersion, latestVersion;
477 	std::map<Version, Tree> roots;
478 	Tree *latestRoot;
479 
getRoot(Version v)480 	Tree const& getRoot( Version v ) const {
481 		auto r = roots.upper_bound(v);
482 		--r;
483 		return r->second;
484 	}
485 
486 	static const int overheadPerItem = 128*4;
487 	struct iterator;
488 
VersionedMap()489 	VersionedMap() : oldestVersion(0), latestVersion(0) {
490 		latestRoot = &roots[0];
491 	}
VersionedMap(VersionedMap && v)492 	VersionedMap( VersionedMap&& v ) BOOST_NOEXCEPT : oldestVersion(v.oldestVersion), latestVersion(v.latestVersion), roots(std::move(v.roots)) {
493 		latestRoot = &roots[latestVersion];
494 	}
495 	void operator = (VersionedMap && v) BOOST_NOEXCEPT {
496 		oldestVersion = v.oldestVersion;
497 		latestVersion = v.latestVersion;
498 		roots = std::move(v.roots);
499 		latestRoot = &roots[latestVersion];
500 	}
501 
getLatestVersion()502 	Version getLatestVersion() const { return latestVersion; }
getOldestVersion()503 	Version getOldestVersion() const { return oldestVersion; }
getNextOldestVersion()504 	Version getNextOldestVersion() const { return roots.upper_bound(oldestVersion)->first; }
505 
forgetVersionsBefore(Version newOldestVersion)506 	void forgetVersionsBefore(Version newOldestVersion) {
507 		ASSERT( newOldestVersion <= latestVersion );
508 		roots[newOldestVersion] = getRoot(newOldestVersion);
509 		roots.erase(roots.begin(), roots.lower_bound(newOldestVersion));
510 		oldestVersion = newOldestVersion;
511 	}
512 
513 	Future<Void> forgetVersionsBeforeAsync( Version newOldestVersion, int taskID = 7000 ) {
514 		ASSERT( newOldestVersion <= latestVersion );
515 		roots[newOldestVersion] = getRoot(newOldestVersion);
516 
517 		vector<Tree> toFree;
518 		toFree.reserve(10000);
519 		auto newBegin = roots.lower_bound(newOldestVersion);
520 		Tree *lastRoot = nullptr;
521 		for(auto root = roots.begin(); root != newBegin; ++root) {
522 			if(root->second) {
523 				if(lastRoot != nullptr && root->second == *lastRoot) {
524 					(*lastRoot).clear();
525 				}
526 				if(root->second->isSoleOwner()) {
527 					toFree.push_back(root->second);
528 				}
529 				lastRoot = &root->second;
530 			}
531 		}
532 
533 		roots.erase(roots.begin(), newBegin);
534 		oldestVersion = newOldestVersion;
535 		return deferredCleanupActor(toFree, taskID);
536 	}
537 
538 public:
createNewVersion(Version version)539 	void createNewVersion(Version version) {     // following sets and erases are into the given version, which may now be passed to at().  Must be called in monotonically increasing order.
540 		if (version > latestVersion) {
541 			latestVersion = version;
542 			Tree r = getRoot(version);
543 			latestRoot = &roots[version];
544 			*latestRoot = r;
545 		} else ASSERT( version == latestVersion );
546 	}
547 
548 	// insert() and erase() invalidate atLatest() and all iterators into it
insert(const K & k,const T & t)549 	void insert(const K& k, const T& t) {
550 		insert( k, t, latestVersion );
551 	}
insert(const K & k,const T & t,Version insertAt)552 	void insert(const K& k, const T& t, Version insertAt) {
553 		if (PTreeImpl::contains( *latestRoot, latestVersion, k )) PTreeImpl::remove( *latestRoot, latestVersion, k ); // FIXME: Make PTreeImpl::insert do this automatically  (see also WriteMap.h FIXME)
554 		PTreeImpl::insert( *latestRoot, latestVersion, MapPair<K,std::pair<T,Version>>(k,std::make_pair(t,insertAt)) );
555 	}
erase(const K & begin,const K & end)556 	void erase(const K& begin, const K& end) {
557 		PTreeImpl::remove( *latestRoot, latestVersion, begin, end );
558 	}
erase(const K & key)559 	void erase(const K& key ) {  // key must be present
560 		PTreeImpl::remove( *latestRoot, latestVersion, key );
561 	}
erase(iterator const & item)562 	void erase(iterator const& item) {  // iterator must be in latest version!
563 		// SOMEDAY: Optimize to use item.finger and avoid repeated search
564 		K key = item.key();
565 		erase(key);
566 	}
567 
568 	// for(auto i = vm.at(version).lower_bound(range.begin); i < range.end; ++i)
569 	struct iterator{
iteratoriterator570 		explicit iterator(Tree const& root, Version at) : root(root), at(at) {}
571 
keyiterator572 		K const& key() const { return finger.back()->data.key; }
insertVersioniterator573 		Version insertVersion() const { return finger.back()->data.value.second; }  // Returns the version at which the current item was inserted
574 		operator bool() const { return finger.size()!=0; }
575 		bool operator < (const K& key) const { return this->key() < key; }
576 
577 		T const& operator*() { return finger.back()->data.value.first; }
578 		T const* operator->() { return &finger.back()->data.value.first; }
579 		void operator++() { if (finger.size()) PTreeImpl::next( at, finger ); else PTreeImpl::first(root, at, finger); }
580 		void operator--() { if (finger.size()) PTreeImpl::previous( at, finger ); else PTreeImpl::last(root, at, finger); }
581 		bool operator == ( const iterator& r ) const { if (finger.size() && r.finger.size()) return finger.back() == r.finger.back(); else return finger.size()==r.finger.size(); }
582 		bool operator != ( const iterator& r ) const { if (finger.size() && r.finger.size()) return finger.back() != r.finger.back(); else return finger.size()!=r.finger.size(); }
583 
584 	private:
585 		friend class VersionedMap<K,T>;
586 		Tree root;
587 		Version at;
588 		vector< PTreeT const* > finger;
589 	};
590 
591 	class ViewAtVersion {
592 	public:
ViewAtVersion(Tree const & root,Version at)593 		ViewAtVersion(Tree const& root, Version at) : root(root), at(at) {}
594 
begin()595 		iterator begin() const { iterator i(root,at); PTreeImpl::first( root, at, i.finger ); return i; }
end()596 		iterator end() const { return iterator(root,at); }
597 
598 		// Returns x such that key==*x, or end()
599 		template <class X>
find(const X & key)600 		iterator find(const X &key) const {
601 			iterator i(root,at);
602 			PTreeImpl::lower_bound( root, at, key, i.finger );
603 			if (i && i.key() == key)
604 				return i;
605 			else
606 				return end();
607 		}
608 
609 		// Returns the smallest x such that *x>=key, or end()
610 		template <class X>
lower_bound(const X & key)611 		iterator lower_bound(const X &key) const {
612 			iterator i(root,at);
613 			PTreeImpl::lower_bound( root, at, key, i.finger );
614 			return i;
615 		}
616 
617 		// Returns the smallest x such that *x>key, or end()
618 		template <class X>
upper_bound(const X & key)619 		iterator upper_bound(const X &key) const {
620 			iterator i(root,at);
621 			PTreeImpl::upper_bound( root, at, key, i.finger );
622 			return i;
623 		}
624 
625 		// Returns the largest x such that *x<=key, or end()
626 		template <class X>
lastLessOrEqual(const X & key)627 		iterator lastLessOrEqual( const X &key ) const {
628 			iterator i(root,at);
629 			PTreeImpl::upper_bound( root, at, key, i.finger );
630 			--i;
631 			return i;
632 		}
633 
634 		// Returns the largest x such that *x<key, or end()
635 		template <class X>
lastLess(const X & key)636 		iterator lastLess( const X &key ) const {
637 			iterator i(root,at);
638 			PTreeImpl::lower_bound( root, at, key, i.finger );
639 			--i;
640 			return i;
641 		}
642 
validate()643 		void validate() {
644 			int count=0, height=0;
645 			PTreeImpl::validate<MapPair<K,std::pair<T,Version>>>( root, at, NULL, NULL, count, height );
646 			if ( height > 100 )
647 				TraceEvent(SevWarnAlways, "DiabolicalPTreeSize").detail("Size", count).detail("Height", height);
648 		}
649 	private:
650 		Tree root;
651 		Version at;
652 	};
653 
at(Version v)654 	ViewAtVersion at( Version v ) const { return ViewAtVersion(getRoot(v), v); }
atLatest()655 	ViewAtVersion atLatest() const { return ViewAtVersion(*latestRoot, latestVersion); }
656 
657 	// TODO: getHistory?
658 
659 };
660 
661 #endif
662