1 /*
2  * SkipList.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include <stdint.h>
22 #include <memory.h>
23 #include <stdio.h>
24 #include <algorithm>
25 #include <numeric>
26 #include <string>
27 
28 /*
29 #ifdef __GNUG__
30 #include <smmintrin.h>
31 #endif
32 */
33 
34 #include "flow/Platform.h"
35 #include "fdbrpc/fdbrpc.h"
36 #include "fdbrpc/PerfMetric.h"
37 #include "fdbclient/FDBTypes.h"
38 #include "fdbclient/KeyRangeMap.h"
39 #include "fdbclient/SystemData.h"
40 #include "fdbserver/Knobs.h"
41 
// Number of worker threads used by ConflictSet. ConflictSet's constructor static_asserts that this is 0.
#define PARALLEL_THREAD_COUNT 0	// FIXME: When >1, program execution (e.g. random numbers) is/was nondeterministic.  Why?

using std::min;
using std::max;
using std::make_pair;

// Registry passed to every PerfDoubleCounter defined in this file.
static vector<PerfDoubleCounter*> skc;

// Per-thread state of the linear congruential generator behind skfastrand().
static thread_local uint32_t g_seed = 0;
51 
skfastrand()52 static inline int skfastrand() {
53 	g_seed = g_seed * 1664525L + 1013904223L;
54 	return g_seed;
55 }
56 
// Forward declaration; the definition is not in this part of the file. The only
// visible use is inside the commented-out body of workerThread().
void setAffinity(int proc);
58 
// Simple conflict set backed by a KeyRangeMap that stores, for every key range,
// the version at which it was last written or cleared.
class SlowConflictSet {
public:
	// True if any range in readRanges overlaps a range whose stored version is > read_snapshot.
	bool is_conflict( const VectorRef<KeyRangeRef>& readRanges, Version read_snapshot );
	// Records version `now` for every cleared range and every set key.
	void add( const VectorRef<KeyRangeRef>& clearRanges, const VectorRef<KeyValueRef>& setValues, Version now );
	// Records version `now` for the whole key space.
	void clear( Version now );

private:
	// key range -> version most recently recorded for it
	KeyRangeMap<Version> age;
};
68 
is_conflict(const VectorRef<KeyRangeRef> & readRanges,Version read_snapshot)69 bool SlowConflictSet::is_conflict( const VectorRef<KeyRangeRef>& readRanges, Version read_snapshot ) {
70 	for(auto range = readRanges.begin(); range != readRanges.end(); ++range) {
71 		auto intersecting = age.intersectingRanges( *range );
72 		for(auto it = intersecting.begin(); it != intersecting.end(); ++it)
73 			if ( it.value() > read_snapshot )
74 				return true;
75 	}
76 	return false;
77 }
78 
// Overwrites the version of the entire key space (allKeys) with `now`.
void SlowConflictSet::clear( Version now ) {
	age.insert(allKeys, now);
}
82 
add(const VectorRef<KeyRangeRef> & clearRanges,const VectorRef<KeyValueRef> & setValues,Version now)83 void SlowConflictSet::add( const VectorRef<KeyRangeRef>& clearRanges, const VectorRef<KeyValueRef>& setValues, Version now ) {
84 	for(auto c = clearRanges.begin(); c != clearRanges.end(); ++c)
85 		age.insert( *c, now );
86 	for(auto s = setValues.begin(); s != setValues.end(); ++s)
87 		age.insert( s->key, now );
88 }
89 
90 
// Named performance counters; each is constructed with the shared `skc` vector.
// The "D." prefix groups sub-steps under "Detect" and "D.Merge." groups sub-steps of the write merge.
PerfDoubleCounter
	g_buildTest("Build", skc),
	g_add("Add", skc),
	g_add_sort("A.Sort", skc),
	g_detectConflicts("Detect", skc),
	g_sort("D.Sort", skc),
	g_combine("D.Combine", skc),
	g_checkRead("D.CheckRead", skc),
	g_checkBatch("D.CheckIntraBatch", skc),
	g_merge("D.MergeWrite", skc),
	g_merge_launch("D.Merge.Launch", skc),
	g_merge_fork("D.Merge.Fork", skc),
	g_merge_start_var("D.Merge.StartVariance", skc),
	g_merge_end_var("D.Merge.EndVariance", skc),
	g_merge_run_var("D.Merge.RunVariance", skc),
	g_merge_run_shortest("D.Merge.ShortestRun", skc),
	g_merge_run_longest("D.Merge.LongestRun", skc),
	g_merge_run_total("D.Merge.TotalRun", skc),
	g_merge_join("D.Merge.Join", skc),
	g_removeBefore("D.RemoveBefore", skc)
	;
112 
compare(const StringRef & a,const StringRef & b)113 static force_inline int compare( const StringRef& a, const StringRef& b ) {
114 	int c = memcmp( a.begin(), b.begin(), min(a.size(), b.size()) );
115 	if (c<0) return -1;
116 	if (c>0) return +1;
117 	if (a.size() < b.size()) return -1;
118 	if (a.size() == b.size()) return 0;
119 	return +1;
120 }
121 
// A read conflict range [begin, end) together with the version it is checked
// against and the index of the transaction it belongs to (see
// ConflictBatch::addTransaction, which passes tr.read_snapshot and the transaction index).
struct ReadConflictRange {
	StringRef begin, end;
	Version version;
	int transaction;
	ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction )
		: begin(begin), end(end), version(version), transaction(transaction)
	{
	}
	// Orders ranges by their begin key only; end, version and transaction are ignored.
	bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin)<0; }
};
132 
133 
134 struct KeyInfo {
135 	StringRef key;
136 	int* pIndex;
137 	bool nextKey;
138 	bool begin;
139 	bool write;
140 	int transaction;
141 
KeyInfoKeyInfo142 	KeyInfo() {};
KeyInfoKeyInfo143 	KeyInfo( StringRef key, bool nextKey, bool begin, bool write, int transaction, int* pIndex ) : key(key), nextKey(nextKey), begin(begin), write(write), transaction(transaction), pIndex(pIndex) {}
144 };
145 
146 // returns true if done with string
getCharacter(const KeyInfo & ki,int character,int & outputCharacter)147 force_inline bool getCharacter(const KeyInfo& ki, int character, int &outputCharacter){
148 	// normal case
149 	if (character < ki.key.size()){
150 		outputCharacter = 5 + ki.key.begin()[character];
151 		return false;
152 	}
153 
154 	// nextKey append a zero
155 	if (ki.nextKey && character >= ki.key.size()){
156 		if (character == ki.key.size()){
157 			outputCharacter =  5; // extra '0' character
158 			return false;
159 		}
160 		character--;
161 	}
162 
163 	// termination
164 	if (character == ki.key.size()){
165 		outputCharacter = 0;
166 		return false;
167 	}
168 
169 	if (character == ki.key.size()+1) {
170 		// end/begin+read/write relative sorting
171 		outputCharacter = ki.begin*2 + (ki.write ^ ki.begin);
172 		return false;
173 	}
174 
175 	outputCharacter = 0;
176 	return true;
177 }
178 
operator <(const KeyInfo & lhs,const KeyInfo & rhs)179 bool operator < ( const KeyInfo& lhs, const KeyInfo& rhs ) {
180 	int i = min(lhs.key.size(), rhs.key.size());
181 	int c = memcmp( lhs.key.begin(), rhs.key.begin(), i );
182 	if (c!=0) return c<0;
183 
184 	// SOMEDAY: This is probably not very fast.  Slows D.Sort by ~20% relative to previous (incorrect) version.
185 
186 	bool lDone, rDone;
187 	int lc, rc;
188 	while (true) {
189 		lDone = getCharacter(lhs, i, lc);
190 		rDone = getCharacter(rhs, i, rc);
191 		if (lDone && rDone) return false;		// equality
192 		if (lc < rc) return true;
193 		if (lc > rc) return false;
194 		i++;
195 	}
196 }
197 
operator ==(const KeyInfo & lhs,const KeyInfo & rhs)198 bool operator == (const KeyInfo& lhs, const KeyInfo& rhs ) {
199 	return !(lhs<rhs || rhs<lhs);
200 }
201 
swapSort(vector<KeyInfo> & points,int a,int b)202 void swapSort(vector<KeyInfo>& points, int a, int b){
203 	if (points[b] < points[a]){
204 		KeyInfo temp;
205 		temp = points[a];
206 		points[a] = points[b];
207 		points[b] = temp;
208 	}
209 }
210 
smallSort(vector<KeyInfo> & points,int start,int N)211 void smallSort(vector<KeyInfo>& points, int start, int N){
212 	for (int i=1;i<N;i++)
213 		for (int j=i;j>0;j-=2)
214 			swapSort(points, start+j-1, start+j);
215 	for (int i=N-2;i>0;i--)
216 		for (int j=i;j>0;j-=2)
217 			swapSort(points, start+j-1, start+j);
218 }
219 
// One pending unit of work for sortPoints(): the sub-array
// [begin, begin+size) still has to be ordered starting at symbol position `character`.
struct SortTask {
	int begin;
	int size;
	int character;

	SortTask(int firstIndex, int elementCount, int symbolPosition)
		: begin(firstIndex), size(elementCount), character(symbolPosition)
	{
	}
};
226 
sortPoints(vector<KeyInfo> & points)227 void sortPoints(vector<KeyInfo>& points){
228 	vector<SortTask> tasks;
229 	vector<KeyInfo> newPoints;
230 	vector<int> counts;
231 
232 	tasks.push_back( SortTask(0, points.size(), 0) );
233 
234 	while (tasks.size()){
235 		SortTask st = tasks.back();
236 		tasks.pop_back();
237 
238 		if (st.size < 10){
239 			//smallSort(points, st.begin, st.size);
240 			std::sort(points.begin() + st.begin, points.begin() + st.begin + st.size );
241 			continue;
242 		}
243 
244 		newPoints.resize(st.size);
245 		counts.assign(256+5, 0);
246 
247 		// get counts
248 		int c;
249 		bool allDone = true;
250 		for (int i=st.begin; i<st.begin+st.size; i++){
251 			allDone &= getCharacter(points[i], st.character, c);
252 			counts[c]++;
253 		}
254 		if (allDone)
255 			continue;
256 
257 		// calculate offsets from counts and build next level of tasks
258 		int total=0;
259 		for(int i=0;i<counts.size();i++){
260 			int temp = counts[i];
261 			if (temp > 1)
262 				tasks.push_back(SortTask(st.begin+total, temp, st.character+1));
263 			counts[i] = total;
264 			total += temp;
265 		}
266 
267 		// put in their places
268 		for (int i=st.begin; i<st.begin+st.size; i++){
269 			getCharacter(points[i], st.character, c);
270 			newPoints[counts[c]++] = points[i];
271 		}
272 
273 		//copy back into original points array
274 		for (int i=0;i<st.size;i++)
275 			points[st.begin+i] = newPoints[i];
276 	}
277 
278 	//cout << endl << "Radix sort done" << endl;
279 }
280 
281 class SkipList : NonCopyable
282 {
283 private:
284 	static const int MaxLevels = 26;
285 
randomLevel()286 	int randomLevel() {
287 		/*int l = 0;
288 		while (g_random->random01() < 0.5 && l < MaxLevels-1) l++;
289 		return l; */
290 
291 		//g_random->randomInt(0, 1<<(MaxLevels-1));
292 		uint32_t i = uint32_t(skfastrand()) >> (32-(MaxLevels-1));
293 		int level = 0;
294 		while (i&1) {
295 			i>>=1;
296 			level++;
297 		}
298 		ASSERT( level < MaxLevels );
299 		return level;
300 	}
301 
302 	/*
303 	struct Node {
304 		int nPointers, valueLength;
305 		Node *pointers[nPointers];
306 		Version maxVersions[nPointers];
307 		char value[valueLength];
308 	};
309 	*/
310 
	// Variable-sized skip list node. Only nPointers and valueLength are real
	// members; the rest lives in the same allocation directly after the struct:
	//   nPointers Node* next pointers, then nPointers Versions, then valueLength value bytes.
	// Nodes must be created with create() and released with destroy().
	struct Node {
		// Highest level this node participates in (levels are 0-based).
		int level() { return nPointers-1; }
		// Start of the value bytes, located after both per-level arrays.
		uint8_t* value() { return end() + nPointers*(sizeof(Node*)+sizeof(Version)); }
		int length() { return valueLength; }
		// Next node at level i; i is not range-checked.
		Node* getNext(int i) { return *((Node**)end() + i); }
		void setNext(int i, Node* n) {
			*((Node**)end() + i) = n;
			#if defined(_DEBUG) || 1
			/*if (n && n->level() < i)
				*(volatile int*)0 = 0;*/
			#endif
		}

		// Per-level version slot, stored after the nPointers next pointers; i is not range-checked.
		Version getMaxVersion(int i) { return ((Version*)(end() + nPointers*sizeof(Node*)))[i]; }
		void setMaxVersion(int i, Version v) { ((Version*)(end() + nPointers*sizeof(Node*)))[i] = v; }

		// Return a node with initialized value but uninitialized pointers
		// (the version slots are left uninitialized as well).
		static Node* create( const StringRef& value, int level ) {
			int nodeSize = sizeof(Node) + value.size() + (level+1)*(sizeof(Node*)+sizeof(Version));

			// The allocator is chosen by size class; destroy() must make the same choice.
			Node* n;
			if (nodeSize <= 64) {
				n = (Node*)FastAllocator<64>::allocate();
				INSTRUMENT_ALLOCATE("SkipListNode64");
			} else if (nodeSize <= 128) {
				n = (Node*)FastAllocator<128>::allocate();
				INSTRUMENT_ALLOCATE("SkipListNode128");
			} else {
				n = (Node*)new char[ nodeSize ];
				INSTRUMENT_ALLOCATE("SkipListNodeLarge");
			}

			// nPointers must be set before value() is called, since value() depends on it.
			n->nPointers = level+1;

			n->valueLength = value.size();
			memcpy(n->value(), value.begin(), value.size());
			return n;
		}

		// pre: level>0, all lower level nodes between this and getNext(level) have correct maxversions
		// Sets this node's version at `level` to the maximum of the level-1 versions of this
		// node and of every node reached via level-1 links before getNext(level).
		void calcVersionForLevel(int level){
			Node *end = getNext(level);
			Version v = getMaxVersion(level-1);
			for(Node *x = getNext(level-1); x != end; x = x->getNext(level-1))
				v = max(v, x->getMaxVersion(level-1));
			setMaxVersion(level, v);
		}

		// Releases the node through the allocator matching its size class (mirrors create()).
		void destroy() {
			int nodeSize = getNodeSize();
			if (nodeSize <= 64) {
				FastAllocator<64>::release(this);
				INSTRUMENT_RELEASE("SkipListNode64");
			} else if (nodeSize <= 128) {
				FastAllocator<128>::release(this);
				INSTRUMENT_RELEASE("SkipListNode128");
			} else {
				delete[] (char*)this;
				INSTRUMENT_RELEASE("SkipListNodeLarge");
			}
		}
	private:
		// Total allocation size; same formula as in create().
		int getNodeSize() { return sizeof(Node) + valueLength + nPointers*(sizeof(Node*)+sizeof(Version)); }
		// First byte after the fixed-size part of the node.
		uint8_t* end() { return (uint8_t*)(this+1); }
		int nPointers,
			valueLength;
	};
378 
less(const uint8_t * a,int aLen,const uint8_t * b,int bLen)379 	static force_inline bool less( const uint8_t* a, int aLen, const uint8_t* b, int bLen ) {
380 		int len = min(aLen, bLen);
381 		for(int i=0; i<len; i++)
382 			if (a[i] < b[i])
383 				return true;
384 			else if (a[i] > b[i])
385 				return false;
386 
387 		/*int c = memcmp(a,b,min(aLen,bLen));
388 		if (c<0) return true;
389 		if (c>0) return false;*/
390 		return aLen < bLen;
391 	}
392 
393 	Node *header;
394 
destroy()395 	void destroy() {
396 		Node *next, *x;
397 		for(x = header; x; x = next) {
398 			next = x->getNext(0);
399 			x->destroy();
400 		}
401 	}
402 public:
403 	struct Finger{
404 		Node *finger[MaxLevels]; // valid for levels >= level
405 		int level;
406 		Node* x;
407 		Node *alreadyChecked;
408 		StringRef value;
409 
FingerSkipList::Finger410 		Finger() : level(MaxLevels), x(NULL), alreadyChecked(NULL) {}
411 
FingerSkipList::Finger412 		Finger( Node* header, const StringRef& ptr ) :
413 			value(ptr), level(MaxLevels),
414 			alreadyChecked(NULL), x(header)
415 		{
416 		}
417 
initSkipList::Finger418 		void init(const StringRef& value, Node *header){
419 			this->value = value;
420 			x = header;
421 			alreadyChecked = NULL;
422 			level = MaxLevels;
423 		}
424 
425 		// pre: !finished()
prefetchSkipList::Finger426 		force_inline void prefetch() {
427 			Node* next = x->getNext(level-1);
428 			_mm_prefetch( (const char*)next, _MM_HINT_T0 );
429 			//if ( (((intptr_t)next) & 64) == 0 )
430 			_mm_prefetch( (const char*)next+64, _MM_HINT_T0 );
431 			//_mm_prefetch( (const char*)next+128, _MM_HINT_T0 );
432 			//_mm_prefetch( (const char*)next+192, _MM_HINT_T0 );
433 			//_mm_prefetch( (const char*)next+256, _MM_HINT_T0 );
434 			//_mm_prefetch( (const char*)next+320, _MM_HINT_T0 );
435 		}
436 
437 		// pre: !finished()
438 		// Returns true if we have advanced to the next level
advanceSkipList::Finger439 		force_inline bool advance() {
440 			Node* next = x->getNext(level-1);
441 
442 			if (next == alreadyChecked || !less(next->value(), next->length(), value.begin(), value.size())) {
443 				alreadyChecked = next;
444 				level--;
445 				finger[level] = x;
446 				return true;
447 			} else {
448 				x = next;
449 				return false;
450 			}
451 		}
452 
453 		// pre: !finished()
nextLevelSkipList::Finger454 		force_inline void nextLevel() {
455 			while (!advance());
456 		}
457 
finishedSkipList::Finger458 		force_inline bool finished(){
459 			return level == 0;
460 		}
461 
foundSkipList::Finger462 		force_inline Node* found() const {
463 			// valid after finished returns true
464 			Node *n = finger[0]->getNext(0);	// or alreadyChecked, but that is more easily invalidated
465 			if (n && n->length() == value.size() && !memcmp(n->value(), value.begin(), value.size()))
466 				return n;
467 			else
468 				return NULL;
469 		}
470 
getValueSkipList::Finger471 		StringRef getValue() const {
472 			Node* n = finger[0]->getNext(0);
473 			return n ? StringRef( n->value(), n->length() ) : StringRef();
474 		}
475 	};
476 
count()477 	int count() {
478 		int count = 0;
479 		Node* x = header->getNext(0);
480 		while (x) {
481 			x = x->getNext(0);
482 			count++;
483 		}
484 		return count;
485 	}
486 
SkipList(Version version=0)487 	explicit SkipList( Version version = 0 ) {
488 		header = Node::create(StringRef(), MaxLevels-1);
489 		for(int l=0; l<MaxLevels; l++) {
490 			header->setNext(l, NULL);
491 			header->setMaxVersion(l, version);
492 		}
493 	}
	// Frees the header and all nodes (see destroy()).
	~SkipList() {
		destroy();
	}
	// Move constructor: takes over other's node chain and leaves other with a NULL header.
	SkipList(SkipList&& other) BOOST_NOEXCEPT
		: header(other.header)
	{
		other.header = NULL;
	}
operator =(SkipList && other)502 	void operator=(SkipList&& other) BOOST_NOEXCEPT {
503 		destroy();
504 		header = other.header;
505 		other.header = NULL;
506 	}
	// Exchanges the contents of the two lists by swapping their header pointers.
	void swap( SkipList& other ) {
		std::swap(header, other.header);
	}
510 
addConflictRanges(const Finger * fingers,int rangeCount,Version version)511 	void addConflictRanges( const Finger* fingers, int rangeCount, Version version ) {
512 		for(int r=rangeCount-1; r>=0; r--) {
513 			const Finger& startF = fingers[r*2];
514 			const Finger& endF = fingers[r*2+1];
515 
516 			if (endF.found()==NULL)
517 				insert(endF, endF.finger[0]->getMaxVersion(0));
518 
519 			remove( startF, endF );
520 			insert( startF, version );
521 		}
522 	}
523 
	// Checks `count` read conflict ranges against this list. CheckMax sets
	// transactionConflictStatus[range.transaction] to true for each range it finds in conflict;
	// entries are never reset to false here.
	// Up to M checks are kept in flight at once and advanced one step at a time in round-robin order.
	void detectConflicts( ReadConflictRange* ranges, int count, bool* transactionConflictStatus ) {
		const int M = 16;
		int nextJob[M];               // circular linked list over the active slots
		CheckMax inProgress[ M ];
		if (!count) return;

		int started = min(M,count);
		for(int i=0; i<started; i++){
			inProgress[i].init( ranges[i], header, transactionConflictStatus );
			nextJob[i] = i+1;
		}
		nextJob[started-1] = 0;       // close the ring

		int prevJob = started-1;
		int job = 0;
		// vtune: 340 parts
		while (true) {
			if (inProgress[job].advance()) {
				// This slot's check has finished.
				if (started == count){
					// No ranges left to start: unlink the slot, or stop if it was the last one.
					if (prevJob == job) break;
					nextJob[prevJob] = nextJob[job];
					job = prevJob;
				}
				else
					// Reuse the slot for the next unstarted range.
					inProgress[job].init( ranges[started++], header, transactionConflictStatus );
			}
			prevJob = job;
			job = nextJob[job];
		}
	}
554 
555 	// Splits the version history represented by this skiplist into separate key ranges
556 	//   delimited by the given array of keys.  This SkipList is left empty.  this->partition
557 	//   is intended to be followed by a call to this->concatenate() recombining the same
558 	//   partitions.  In between, operations on each partition must not touch any keys outside
559 	//   the partition.  Specifically, the partition to the left of 'key' must not have a range
560 	//	 [...,key) inserted, since that would insert an entry at 'key'.
partition(StringRef * begin,int splitCount,SkipList * output)561 	void partition( StringRef* begin, int splitCount, SkipList* output ) {
562 		for(int i=splitCount-1; i>=0; i--) {
563 			Finger f( header, begin[i] );
564 			while (!f.finished())
565 				f.nextLevel();
566 			split(f, output[i+1]);
567 		}
568 		swap(output[0]);
569 	}
570 
	// Joins input[0..count) back into a single list, which ends up in *this;
	// input[0] receives this list's previous contents via swap().
	// NOTE(review): count is not validated; count == 0 would make `count-1` negative — confirm callers pass count >= 1.
	void concatenate( SkipList* input, int count ) {
		// ends[i] holds, for every level, the last node of input[i] at that level.
		vector<Finger> ends( count-1 );
		for(int i=0; i<ends.size(); i++)
			input[i].getEnd( ends[i] );

		for(int l=0; l<MaxLevels; l++) {
			for(int i=ends.size()-1; i>=0; i--) {
				// Link the tail of input[i] to the first node of input[i+1] at this level.
				ends[i].finger[l]->setNext( l, input[i+1].header->getNext(l) );
				// Recompute the tail's version at this level, except when the tail is the header of a
				// partition other than the first.
				if (l && (!i || ends[i].finger[l] != input[i].header))
					ends[i].finger[l]->calcVersionForLevel(l);
				// input[i+1] no longer owns those nodes.
				input[i+1].header->setNext( l, NULL );
			}
		}
		swap(input[0]);
	}
586 
	// Runs `count` searches, leaving results[i] finished and positioned for values[i].
	// `temp` is used as scratch for the job ring and is indexed up to count-1.
	// NOTE(review): values[count-1] is read unconditionally, so count >= 1 is assumed — confirm.
	void find( const StringRef* values, Finger* results, int* temp, int count ) {
		// Relying on the ordering of values, descend until the values aren't all in the
		// same part of the tree

		// vtune: 11 parts
		results[0].init( values[0], header );
		const StringRef& endValue = values[count-1];
		while ( results[0].level > 1 ) {
			results[0].nextLevel();
			Node* ac = results[0].alreadyChecked;
			// Stop sharing the descent once the node after the first finger is less than the last value.
			if (ac && less(ac->value(), ac->length(), endValue.begin(), endValue.size()))
				break;
		}

		// Init all the other fingers to start descending where we stopped
		//   the first one

		// SOMEDAY: this loop showed up on vtune, could be faster?
		// vtune: 8 parts
		int startLevel = results[0].level+1;
		Node *x = startLevel<MaxLevels ? results[0].finger[startLevel] : header;
		for(int i=1; i<count; i++) {
			results[i].level = startLevel;
			results[i].x = x;
			results[i].alreadyChecked = NULL;
			results[i].value = values[i];
			for(int j=startLevel; j<MaxLevels; j++)
				results[i].finger[j] = results[0].finger[j];
		}

		// Circular list of unfinished searches: nextJob[i] is the job to run after job i.
		int* nextJob = temp;
		for (int i=0;i<count-1;i++)
			nextJob[i] = i+1;
		nextJob[count-1] = 0;

		int prevJob = count-1;
		int job = 0;

		// vtune: 225 parts
		// Advance each unfinished search by one step per turn, prefetching its next node.
		while (true) {
			Finger* f = &results[job];
			f->advance();
			if (f->finished()) {
				// Unlink the finished job; stop when it was the only one left.
				if (prevJob == job) break;
				nextJob[prevJob] = nextJob[job];
			}
			else {
				f->prefetch();
				prevJob = job;
			}
			job = nextJob[job];
		}
	}
640 
641 	/*Finger randomFinger() {
642 		// Written, not exactly uniform, not tested
643 		Finger f( header, StringRef() );
644 		Node* begin = header, *end = 0;
645 		for(int lev = MaxLevels-1; lev>=0; lev--) {
646 			int length = 0;
647 			for( Node* x = begin; x != end; x=x->getNext(lev) )
648 				length++;
649 			if (length == 1) { // forced down
650 				f.finger[lev] = begin;
651 			} else {
652 				int c = g_random->randomInt(0, length);
653 				for( Node* x = begin; x != end; x=x->getNext(lev) )
654 					if (!c--) {
655 						f.finger[lev] = begin = x;
656 						end = x->getNext(lev);
657 						break;
658 					}
659 			}
660 		}
661 		f.level = 0;
662 		return f;
663 	}*/
664 
	// Visits up to nodeCount nodes following f.finger[0] and erases each node whose level-0
	// version is below v, provided the node visited immediately before it in this call was
	// also below v. The first node visited is therefore always kept. f is moved past the
	// nodes that are kept. Returns the number of nodes erased.
	int removeBefore( Version v, Finger& f, int nodeCount ) {
		/*Finger f( header, StringRef() );
		for(int i=0; i<MaxLevels; i++)
			f.finger[i] = header;
		f.level = 0;*/
		// f.x, f.alreadyChecked?

		int removedCount = 0;
		bool wasAbove = true;
		while (nodeCount--) {
			Node* x = f.finger[0]->getNext(0);
			if (!x) break;

			// double prefetch gives +25% speed (single threaded)
			Node* next = x->getNext(0);
			_mm_prefetch( (const char*)next, _MM_HINT_T0 );
			//_mm_prefetch( (const char*)next+64, _MM_HINT_T0 );
			// getNext(1) is read even when x has a single level; the result is used only as a prefetch address.
			next = x->getNext(1);
			_mm_prefetch( (const char*)next, _MM_HINT_T0 );
			//_mm_prefetch( (const char*)next+64, _MM_HINT_T0 );

			bool isAbove = x->getMaxVersion(0) >= v;
			if (isAbove || wasAbove) {  // f.nextItem
				// Keep x: it becomes the predecessor at every level it takes part in.
				for(int l=0; l<=x->level(); l++)
					f.finger[l] = x;
			} else {					// f.eraseItem
				removedCount++;
				// Unlink x at each of its levels...
				for(int l=0; l<=x->level(); l++)
					f.finger[l]->setNext(l, x->getNext(l));
				// ...and fold its versions for levels >= 1 into its predecessors.
				for(int i=1; i<=x->level(); i++)
					f.finger[i]->setMaxVersion( i, max(f.finger[i]->getMaxVersion(i), x->getMaxVersion(i)) );
				x->destroy();
			}
			wasAbove = isAbove;
		}

		return removedCount;
	}
703 
704 private:
remove(const Finger & start,const Finger & end)705 	void remove( const Finger& start, const Finger& end ) {
706 		if (start.finger[0] == end.finger[0])
707 			return;
708 
709 		Node *x = start.finger[0]->getNext(0);
710 
711 		// vtune says: this loop is the expensive parts (6 parts)
712 		for(int i=0; i<MaxLevels; i++)
713 			if (start.finger[i] != end.finger[i])
714 				start.finger[i]->setNext(i, end.finger[i]->getNext(i));
715 
716 		while (true) {
717 			Node* next = x->getNext(0);
718 			x->destroy();
719 			if (x == end.finger[0]) break;
720 			x = next;
721 		}
722 	}
723 
724 	//void insert( const std::string& v, Version version ) { insert(StringRef(v), version); }
725 
	// Inserts a new node holding f.value directly after the position described by f,
	// at a randomly drawn level, with `version` as its level-0 version.
	// f.finger[] is used for every level, so f must be a finished finger.
	void insert( const Finger& f, Version version ) {
		int level = randomLevel();
		//cout << std::string((const char*)value,length) << " level: " << level << endl;
		Node *x = Node::create( f.value, level );
		x->setMaxVersion(0, version);
		// Splice x in after its predecessor at each of its levels.
		for(int i=0; i<=level; i++) {
			x->setNext(i, f.finger[i]->getNext(i));
			f.finger[i]->setNext(i, x);
		}
		// vtune says: this loop is the costly part of this function
		// x split each predecessor's span in two; recompute both halves bottom-up, since
		// calcVersionForLevel(i) relies on level i-1 already being correct.
		for(int i=1; i<=level; i++) {
			f.finger[i]->calcVersionForLevel(i);
			x->calcVersionForLevel(i);
		}
		// Above x's top level the predecessors' spans now contain x: raise their version
		// to `version`, stopping at the first level that is already at least that high.
		for(int i=level+1; i<MaxLevels; i++) {
			Version v = f.finger[i]->getMaxVersion(i);
			if (v >= version) break;
			f.finger[i]->setMaxVersion(i, version);
		}
	}
746 
insert(const StringRef & value,Version version)747 	void insert( const StringRef& value, Version version ) {
748 		Finger f(header, value);
749 		while (!f.finished())
750 			f.nextLevel();
751 		// SOMEDAY: equality?
752 		insert( f, version );
753 	}
754 
	// Resumable check of one read conflict range against the list: decides whether any
	// version stored over [start.value, end.value) exceeds `version`, and if so sets *result.
	// advance() does a bounded amount of work per call and is called again until it returns true.
	struct CheckMax {
		Finger start, end;   // searches for the range's begin and end keys
		Version version;     // version the range is checked against
		bool *result;        // set to true when a conflict is found; never written otherwise
		int state;           // 0: start/end fingers still coincide, 1: they have diverged

		void init( const ReadConflictRange& r, Node* header, bool* tCS ) {
			this->start.init( r.begin, header );
			this->end.init( r.end, header );
			this->version = r.version;
			result = &tCS[ r.transaction ];
			this->state = 0;
		}

		// Both helpers return true, meaning "this check is finished".
		bool noConflict() { return true; }
		bool conflict() { *result = true; return true; }

		// Return true if finished
		force_inline bool advance() {
			switch (state) {
			case 0:
				// find where start and end fingers diverge
				while (true) {
					// Yield to the caller after each sideways step of the start finger.
					if (!start.advance()) {
						start.prefetch();
						return false;
					}
					// start dropped a level: bring end down to the same level, starting from start's node.
					end.x = start.x;
					while (!end.advance());

					int l = start.level;
					if (start.finger[l] != end.finger[l])
						break;
					// accept if the range spans the check range, but does not have a greater version
					if (start.finger[l]->getMaxVersion(l) <= version)
						return noConflict();
					if (l==0)
						return conflict();
				}
				state = 1;
				// intentional fall through into case 1
			case 1:
				{
					// check the end side of the pyramid
					Node *e = end.finger[end.level];
					while (e->getMaxVersion(end.level) > version) {
						if (end.finished())
							return conflict();
						end.nextLevel();
						Node *f = end.finger[end.level];
						while (e != f){
							if (e->getMaxVersion(end.level) > version)
								return conflict();
							e = e->getNext(end.level);
						}
					}

					// check the start side of the pyramid
					Node *s = end.finger[start.level];
					while (true){
						Node *nextS = start.finger[start.level]->getNext(start.level);
						Node *p = nextS;
						while (p != s){
							if (p->getMaxVersion(start.level) > version)
								return conflict();
							p = p->getNext(start.level);
						}
						if (start.finger[start.level]->getMaxVersion(start.level) <= version)
							return noConflict();
						s = nextS;
						if (start.finished()) {
							// At level 0: no conflict only if the node after the start finger holds exactly the start key.
							if (nextS->length() == start.value.size() && !memcmp(nextS->value(), start.value.begin(), start.value.size()))
								return noConflict();
							else
								return conflict();
						}
						start.nextLevel();
					}
				}
			default:
				__assume(false);
			}
		}
	};
838 
	// Moves every node after the position of finger f into `right`, which must be empty;
	// this list keeps everything up to and including f.finger[] at each level.
	// Only right's level-0 header version is set here; versions at higher levels are not recomputed.
	void split( const Finger& f, SkipList& right ) {
		ASSERT( !right.header->getNext(0) );  // right must be empty
		right.header->setMaxVersion(0, f.finger[0]->getMaxVersion(0));
		for(int l=0; l<MaxLevels; l++) {
			right.header->setNext(l, f.finger[l]->getNext(l));
			f.finger[l]->setNext(l, NULL);
			/*if (l) {
				// SOMEDAY: Do we actually need these?
				right.header->calcVersionForLevel(l);
				f.finger[l]->calcVersionForLevel(l);
			}*/
		}
 	}
852 
getEnd(Finger & end)853 	void getEnd( Finger& end ) {
854 		Node* node = header;
855 		for(int l=MaxLevels-1; l>=0; l--) {
856 			Node* next;
857 			while ( (next=node->getNext(l)) != NULL )
858 				node = next;
859 			end.finger[l] = node;
860 		}
861 		end.level = 0;
862 		// SOMEDAY: end.x? end.alreadyChecked?
863 		/*end = Finger(header, (const uint8_t*)"\xff\xff\xff\xff\xff\xff", 6);
864 		while (!end.finished())
865 			end.nextLevel();*/
866 	}
867 };
868 
// Type-erased unit of work. Calling operator() runs the work and deletes the
// object (see FAction in action()), so each Action may be invoked only once.
struct Action {
	virtual void operator()() = 0;		// self-destructs
};
// Pointer to a heap-allocated Action.
typedef Action* PAction;
873 
// Wraps a callable in a heap-allocated Action that invokes it once and then deletes itself.
// FAction inherits from F, so F must be a class type (for example a lambda passed as an rvalue).
template <class F>
PAction action( F && f ) {
	struct FAction : Action, F, FastAllocated<FAction> {
		FAction( F&& f ) : F(std::move(f)) {}
		virtual void operator()() { F::operator()(); delete this; }
	};
	return new FAction( std::move(f) );
};
882 
// Not implemented: the body is only ASSERT(false). ConflictSet's constructor would call it once
// per worker, but PARALLEL_THREAD_COUNT is 0, so that loop never runs.
// The commented-out code below is the former thread loop, kept for reference.
void workerThread( PAction* nextAction, Event* nextActionReady, int index, Event* whenFinished ) {
	ASSERT(false);
	/*
	inThread<Void>( [nextAction,nextActionReady,index,whenFinished]()->Void {
		g_seed = index*123; fastrand();
		setAffinity( index );
		while (true) {
			try {
				nextActionReady->block();   // auto-reset
				Action* action = *nextAction;
				*nextAction = 0;
				if (!action) break;

				(*action)();
			} catch (Error& e) {
				fprintf(stderr, "Error in worker thread: %s\n", e.what());
			} catch (...) {
				fprintf(stderr, "Error in worker thread: %s\n", unknown_error().what());
			}
		}
		//cout << "Worker thread finished" << endl;
		whenFinished->set();
		return Void();
	});*/
}
908 
setK(Arena & arena,int i)909 StringRef setK( Arena& arena, int i ) {
910 	char t[ sizeof(i) ];
911 	*(int*)t = i;
912 
913 	const int keySize = 16;
914 
915 	char* ss = new (arena) char[ keySize ];
916 	for(int c=0; c<keySize-sizeof(i); c++)
917 		ss[c] = '.';
918 	for(int c=0; c<sizeof(i); c++)
919 		ss[c+keySize-sizeof(i)] = t[sizeof(i)-1-c];
920 
921 	return StringRef( (const uint8_t*)ss, keySize );
922 }
923 
924 #include "fdbserver/ConflictSet.h"
925 
// State behind the conflict set API: the version history skip list plus bookkeeping
// for optional worker threads (none are created while PARALLEL_THREAD_COUNT is 0).
struct ConflictSet {
	ConflictSet() : oldestVersion(0) {
		static_assert(PARALLEL_THREAD_COUNT == 0, "workerThread() not implemented");
		static_assert(PARALLEL_THREAD_COUNT == 0 || FASTALLOC_THREAD_SAFE, "Thread safe fast allocator required for multithreaded conflict set");
		for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) {
			worker_nextAction.push_back( NULL );
			worker_ready.push_back( new Event );
			worker_finished.push_back( new Event );
		}
		for(int t=0; t<worker_nextAction.size(); t++)
			workerThread( &worker_nextAction[t], worker_ready[t], (t)*2, worker_finished[t] );
	}
	~ConflictSet() {
		// A null action plus a ready signal asks each worker to exit.
		for(int i=0; i<worker_nextAction.size(); i++) {
			worker_nextAction[i] = 0;
			worker_ready[i]->set();
		}
		// Wait for workers to terminate; otherwise can get crashes at shutdown time
		for(int i=0; i<worker_finished.size(); i++)
			worker_finished[i]->block();
		// NOTE(review): the Event objects allocated in the constructor are not deleted here.
	}

	SkipList versionHistory;           // the conflict history itself
	Key removalKey;                    // not used in the visible part of the file
	Version oldestVersion;             // addTransaction marks transactions with read ranges and an older read_snapshot as too old
	vector<PAction> worker_nextAction; // per-worker slot for the next action to run
	vector<Event*> worker_ready;       // per-worker signal set after its slot has been written
	vector<Event*> worker_finished;    // per-worker signal awaited during destruction
};
955 
// Allocates a new ConflictSet; release it with destroyConflictSet().
ConflictSet* newConflictSet() { return new ConflictSet; }
// Replaces cs's version history with an empty skip list whose header carries version v.
// The previous history ends up in the temporary and is freed when the temporary is destroyed.
void clearConflictSet( ConflictSet* cs, Version v ) {
	SkipList(v).swap( cs->versionHistory );
}
// Deletes a ConflictSet obtained from newConflictSet().
void destroyConflictSet(ConflictSet* cs) {
	delete cs;
}
963 
// Starts an empty batch bound to conflict set `cs`; the pointer is stored, not owned.
ConflictBatch::ConflictBatch( ConflictSet* cs )
	: cs(cs), transactionCount(0)
{
}
968 
~ConflictBatch()969 ConflictBatch::~ConflictBatch()
970 {
971 }
972 
// Per-transaction bookkeeping kept by a ConflictBatch.
struct TransactionInfo {
	// One (begin, end) pair per read conflict range.  ConflictBatch::addTransaction() hands the
	// address of each int to a KeyInfo point, and ConflictBatch::checkIntraBatchConflicts()
	// later stores that point's position in `points` through the pointer.
	VectorRef< std::pair<int,int> > readRanges;
	// Same as readRanges, for the transaction's write conflict ranges.
	VectorRef< std::pair<int,int> > writeRanges;
	// Set by addTransaction(): true when the transaction has read conflict ranges and its
	// read snapshot is older than the conflict set's oldestVersion.  Both vectors stay empty then.
	bool tooOld;
};
978 
addTransaction(const CommitTransactionRef & tr)979 void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) {
980 	int t = transactionCount++;
981 
982 	Arena& arena = transactionInfo.arena();
983 	TransactionInfo* info = new (arena) TransactionInfo;
984 
985 	if (tr.read_snapshot < cs->oldestVersion && tr.read_conflict_ranges.size()) {
986 		info->tooOld = true;
987 	} else {
988 		info->tooOld = false;
989 		info->readRanges.resize( arena, tr.read_conflict_ranges.size() );
990 		info->writeRanges.resize( arena, tr.write_conflict_ranges.size() );
991 
992 		vector<KeyInfo> &points = this->points;
993 		for(int r=0; r<tr.read_conflict_ranges.size(); r++) {
994 			const KeyRangeRef& range = tr.read_conflict_ranges[r];
995 			points.push_back( KeyInfo( range.begin, false, true, false, t, &info->readRanges[r].first ) );
996 			//points.back().keyEnd = StringRef(buf,range.second);
997 			points.push_back( KeyInfo( range.end, false, false, false, t, &info->readRanges[r].second ) );
998 			combinedReadConflictRanges.push_back( ReadConflictRange( range.begin, range.end, tr.read_snapshot, t ) );
999 		}
1000 		for(int r=0; r<tr.write_conflict_ranges.size(); r++) {
1001 			const KeyRangeRef& range = tr.write_conflict_ranges[r];
1002 			points.push_back( KeyInfo( range.begin, false, true, true, t, &info->writeRanges[r].first ) );
1003 			points.push_back( KeyInfo( range.end, false, false, true, t, &info->writeRanges[r].second ) );
1004 		}
1005 	}
1006 
1007 	this->transactionInfo.push_back( arena, info );
1008 }
1009 
1010 class MiniConflictSet2 : NonCopyable {
1011 	vector<bool> values;
1012 public:
MiniConflictSet2(int size)1013 	explicit MiniConflictSet2( int size ) {
1014 		values.assign( size, false );
1015 	}
set(int begin,int end)1016 	void set( int begin, int end ) {
1017 		for(int i=begin; i<end; i++)
1018 			values[i] = true;
1019 	}
any(int begin,int end)1020 	bool any( int begin, int end ) {
1021 		for(int i=begin; i<end; i++)
1022 			if (values[i])
1023 				return true;
1024 		return false;
1025 	}
1026 };
1027 
1028 class MiniConflictSet : NonCopyable {
1029 	typedef uint64_t wordType;
1030 	enum { bucketShift = 6, bucketMask=sizeof(wordType)*8-1 };
1031 	vector<wordType> values; // undefined when andValues is true for a range of values
1032 	vector<wordType> orValues;
1033 	vector<wordType> andValues;
1034 	MiniConflictSet2 debug;		// SOMEDAY: Test on big ranges, eliminate this
1035 
bitMask(unsigned int bit)1036 	uint64_t bitMask(unsigned int bit){ // computes results for bit%word
1037 		return (((wordType)1) << ( bit & bucketMask )); // '&' unnecesary?
1038 	}
setNthBit(vector<wordType> & v,const unsigned int bit)1039 	void setNthBit(vector<wordType> &v, const unsigned int bit){
1040 		v[bit>>bucketShift] |= bitMask(bit);
1041 	}
clearNthBit(vector<wordType> & v,const unsigned int bit)1042 	void clearNthBit(vector<wordType> &v, const unsigned int bit){
1043 		v[bit>>bucketShift] &= ~(bitMask(bit));
1044 	}
getNthBit(const vector<wordType> & v,const unsigned int bit)1045 	bool getNthBit(const vector<wordType> &v, const unsigned int bit){
1046 		return (v[bit>>bucketShift] & bitMask(bit)) != 0;
1047 	}
wordsForNBits(unsigned int bits)1048 	int wordsForNBits(unsigned int bits){
1049 		return (bits+((1<<bucketShift)-1))>>bucketShift;
1050 	}
highBits(int b)1051 	wordType highBits(int b){ // bits (b&bucketMask) and higher are 1
1052 		#pragma warning(disable: 4146)
1053 		return -(wordType(1) << b);
1054 		#pragma warning(default: 4146)
1055 	}
lowBits(int b)1056 	wordType lowBits(int b){ // bits lower than b are 1
1057 		return (wordType(1)<<b)-1;
1058 	}
lowBits2(int b)1059 	wordType lowBits2(int b) {
1060 		return (b&bucketMask) ? lowBits(b) : -1;
1061 	}
1062 
setBits(vector<wordType> & v,int bitBegin,int bitEnd,bool fillMiddle)1063 	void setBits(vector<wordType> &v, int bitBegin, int bitEnd, bool fillMiddle){
1064 		if (bitBegin >= bitEnd)	return;
1065 		int beginWord = bitBegin>>bucketShift;
1066 		int lastWord = ((bitEnd+bucketMask) >> bucketShift) - 1;
1067 		if (beginWord == lastWord){
1068 			v[beginWord] |= highBits(bitBegin) & lowBits2(bitEnd);
1069 		} else {
1070 			v[beginWord] |= highBits(bitBegin);
1071 			if (fillMiddle)
1072 				for(int w=beginWord+1;w<lastWord;w++)
1073 					v[w] = wordType(-1);
1074 			v[lastWord] |= lowBits2(bitEnd);
1075 		}
1076 	}
1077 
orBits(vector<wordType> & v,int bitBegin,int bitEnd,bool getMiddle)1078 	bool orBits(vector<wordType> &v, int bitBegin, int bitEnd, bool getMiddle) {
1079 		if (bitBegin >= bitEnd) return false;
1080 		int beginWord = bitBegin >> bucketShift;
1081 		int lastWord = ((bitEnd+bucketMask) >> bucketShift) - 1;
1082 		if (beginWord == lastWord)
1083 			return (v[beginWord] & highBits(bitBegin) & lowBits2(bitEnd)) != 0;
1084 		else {
1085 			if (getMiddle)
1086 				for(int w=beginWord+1; w<lastWord; w++)
1087 					if (v[w])
1088 						return true;
1089 			return ((v[beginWord] & highBits(bitBegin)) | (v[lastWord] & lowBits2(bitEnd))) != 0;
1090 		}
1091 	}
1092 
1093 public:
MiniConflictSet(int size)1094 	explicit MiniConflictSet( int size ) : debug(size) {
1095 		static_assert((1<<bucketShift) == sizeof(wordType)*8, "BucketShift incorrect");
1096 
1097 		values.assign( wordsForNBits(size), false );
1098 		orValues.assign( wordsForNBits(wordsForNBits(size)), false);
1099 		andValues.assign( wordsForNBits(wordsForNBits(size)), false);
1100 	}
1101 
set(int begin,int end)1102 	void set( int begin, int end ) {
1103 		debug.set(begin,end);
1104 		if (begin == end) return;
1105 
1106 		int beginWord = begin>>bucketShift;
1107 		int lastWord = ((end+bucketMask) >> bucketShift) - 1;
1108 
1109 		setBits(values, begin, end, false);
1110 		setBits(andValues, beginWord+1, lastWord, true);
1111 		setBits(orValues, beginWord, lastWord+1, true);
1112 	}
1113 
any(int begin,int end)1114 	bool any(int begin, int end) {
1115 		bool a = orImpl(begin,end);
1116 		bool b = debug.any(begin,end);
1117 		ASSERT( a == b );
1118 		return b;
1119 	}
1120 
orImpl(int begin,int end)1121 	bool orImpl( int begin, int end ) {
1122 		if (begin == end) return false;
1123 		int beginWord = begin>>bucketShift;
1124 		int lastWord = ((end+bucketMask) >> bucketShift) - 1;
1125 
1126 		return orBits( orValues, beginWord+1, lastWord, true ) ||
1127 			getNthBit( andValues, beginWord ) || getNthBit( andValues, lastWord ) ||
1128 			orBits( values, begin, end, false );
1129 	}
1130 };
1131 
1132 
checkIntraBatchConflicts()1133 void ConflictBatch::checkIntraBatchConflicts() {
1134 	int index = 0;
1135 	for(int p=0; p<points.size(); p++)
1136 		*points[p].pIndex = index++;
1137 
1138 	MiniConflictSet mcs( index );
1139 	for(int t=0; t<transactionInfo.size(); t++) {
1140 		const TransactionInfo& tr = *transactionInfo[t];
1141 		if (transactionConflictStatus[t]) continue;
1142 		bool conflict = tr.tooOld;
1143 		for(int i=0; i<tr.readRanges.size(); i++)
1144 			if ( mcs.any( tr.readRanges[i].first, tr.readRanges[i].second ) ) {
1145 				conflict = true;
1146 				break;
1147 			}
1148 		transactionConflictStatus[t] = conflict;
1149 		if (!conflict)
1150 			for(int i=0; i<tr.writeRanges.size(); i++)
1151 				mcs.set( tr.writeRanges[i].first, tr.writeRanges[i].second );
1152 	}
1153 }
1154 
GetTooOldTransactions(vector<int> & tooOldTransactions)1155 void ConflictBatch::GetTooOldTransactions(vector<int>& tooOldTransactions) {
1156 	for (int i = 0; i<transactionInfo.size(); i++) {
1157 		if (transactionInfo[i]->tooOld) {
1158 			tooOldTransactions.push_back(i);
1159 		}
1160 	}
1161 }
1162 
detectConflicts(Version now,Version newOldestVersion,vector<int> & nonConflicting,vector<int> * tooOldTransactions)1163 void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, vector<int>& nonConflicting, vector<int>* tooOldTransactions) {
1164 	double t = timer();
1165 	sortPoints( points );
1166 	//std::sort( combinedReadConflictRanges.begin(), combinedReadConflictRanges.end() );
1167 	g_sort += timer()-t;
1168 
1169 	transactionConflictStatus = new bool[ transactionCount ];
1170 	memset(transactionConflictStatus, 0, transactionCount*sizeof(bool));
1171 
1172 	t = timer();
1173 	checkReadConflictRanges();
1174 	g_checkRead += timer()-t;
1175 
1176 	t = timer();
1177 	checkIntraBatchConflicts();
1178 	g_checkBatch += timer()-t;
1179 
1180 	t = timer();
1181 	combineWriteConflictRanges();
1182 	g_combine += timer()-t;
1183 
1184 	t = timer();
1185 	mergeWriteConflictRanges(now);
1186 	g_merge += timer()-t;
1187 
1188 	for (int i = 0; i < transactionCount; i++)
1189 	{
1190 		if (!transactionConflictStatus[i])
1191 			nonConflicting.push_back( i );
1192 		if (tooOldTransactions && transactionInfo[i]->tooOld)
1193 			tooOldTransactions->push_back(i);
1194 	}
1195 
1196 	delete[] transactionConflictStatus;
1197 
1198 	t = timer();
1199 	if (newOldestVersion > cs->oldestVersion) {
1200 		cs->oldestVersion = newOldestVersion;
1201 		SkipList::Finger finger;
1202 		int temp;
1203 		cs->versionHistory.find( &cs->removalKey, &finger, &temp, 1 );
1204 		cs->versionHistory.removeBefore( cs->oldestVersion, finger, combinedWriteConflictRanges.size()*3 + 10 );
1205 		cs->removalKey = finger.getValue();
1206 	}
1207 	g_removeBefore += timer()-t;
1208 }
1209 
checkReadConflictRanges()1210 void ConflictBatch::checkReadConflictRanges() {
1211 	if (!combinedReadConflictRanges.size())
1212 		return;
1213 
1214 	if (PARALLEL_THREAD_COUNT) {
1215 		Event done[PARALLEL_THREAD_COUNT?PARALLEL_THREAD_COUNT:1];
1216 		for(int t=0; t<PARALLEL_THREAD_COUNT; t++) {
1217 			cs->worker_nextAction[t] = action( [&,t] {
1218 #pragma GCC diagnostic push
1219 DISABLE_ZERO_DIVISION_FLAG
1220 				auto begin = &combinedReadConflictRanges[0] + t*combinedReadConflictRanges.size()/PARALLEL_THREAD_COUNT;
1221 				auto end = &combinedReadConflictRanges[0] + (t+1)*combinedReadConflictRanges.size()/PARALLEL_THREAD_COUNT;
1222 #pragma GCC diagnostic pop
1223 				cs->versionHistory.detectConflicts( begin, end-begin, transactionConflictStatus );
1224 				done[t].set();
1225 			});
1226 			cs->worker_ready[t]->set();
1227 		}
1228 		for(int i=0; i<PARALLEL_THREAD_COUNT; i++)
1229 			done[i].block();
1230 	} else {
1231 		cs->versionHistory.detectConflicts( &combinedReadConflictRanges[0], combinedReadConflictRanges.size(), transactionConflictStatus );
1232 	}
1233 }
1234 
addConflictRanges(Version now,vector<pair<StringRef,StringRef>>::iterator begin,vector<pair<StringRef,StringRef>>::iterator end,SkipList * part)1235 void ConflictBatch::addConflictRanges(Version now, vector< pair<StringRef,StringRef> >::iterator begin, vector< pair<StringRef,StringRef> >::iterator end,SkipList* part) {
1236 	int count = end-begin;
1237 #if 0
1238 	//for(auto w = begin; w != end; ++w)
1239 	for(auto w = end-1; w != begin-1; --w)
1240 		part->addConflictRange( w->first, w->second, now );
1241 #else
1242 	static_assert( sizeof( begin[0] ) == sizeof(StringRef)*2, "Write Conflict Range type not convertible to two StringPtrs" );
1243 	const StringRef* strings = reinterpret_cast<const StringRef*>( &*begin );
1244 	int stringCount = count*2;
1245 
1246 	static const int stripeSize = 16;
1247 	SkipList::Finger fingers[ stripeSize ];
1248 	int temp[ stripeSize ];
1249 	int stripes = (stringCount+stripeSize-1)/stripeSize;
1250 
1251 	int ss = stringCount - (stripes-1)*stripeSize;
1252 	for(int s=stripes-1; s>=0; s--) {
1253 		part->find( &strings[s * stripeSize], &fingers[0], temp, ss );
1254 		part->addConflictRanges( &fingers[0], ss/2, now );
1255 		ss = stripeSize;
1256 	}
1257 #endif
1258 }
1259 
mergeWriteConflictRanges(Version now)1260 void ConflictBatch::mergeWriteConflictRanges(Version now) {
1261 	if (!combinedWriteConflictRanges.size())
1262 		return;
1263 
1264 	if (PARALLEL_THREAD_COUNT) {
1265 		vector<SkipList> parts;
1266 		for (int i = 0; i < PARALLEL_THREAD_COUNT; i++)
1267 			parts.push_back(SkipList());
1268 
1269 		vector<StringRef> splits( parts.size()-1 );
1270 		for(int s=0; s<splits.size(); s++)
1271 			splits[s] = combinedWriteConflictRanges[ (s+1)*combinedWriteConflictRanges.size()/parts.size() ].first;
1272 
1273 		cs->versionHistory.partition( splits.size() ? &splits[0] : NULL, splits.size(), &parts[0] );
1274 		vector<double> tstart(PARALLEL_THREAD_COUNT), tend(PARALLEL_THREAD_COUNT);
1275 		Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1];
1276 		double before = timer();
1277 		for(int t=0; t<parts.size(); t++) {
1278 			cs->worker_nextAction[t] = action( [&,t] {
1279 				tstart[t] = timer();
1280 				auto begin = combinedWriteConflictRanges.begin() + (t*combinedWriteConflictRanges.size()/parts.size());
1281 				auto end = combinedWriteConflictRanges.begin() + ((t+1)*combinedWriteConflictRanges.size()/parts.size());
1282 
1283 				addConflictRanges(now, begin, end, &parts[t]);
1284 
1285 				tend[t] = timer();
1286 				done[t].set();
1287 			});
1288 			cs->worker_ready[t]->set();
1289 		}
1290 		double launch = timer();
1291 		for(int i=0; i<PARALLEL_THREAD_COUNT; i++)
1292 			done[i].block();
1293 		double after = timer();
1294 
1295 		g_merge_launch += launch-before;
1296 		//g_merge_start_var += *std::max_element(tstart.begin(), tstart.end()) - before;
1297 		g_merge_fork += *std::min_element(tstart.begin(), tstart.end()) - before;
1298 		g_merge_start_var += *std::max_element(tstart.begin(), tstart.end()) - *std::min_element(tstart.begin(), tstart.end());
1299 		g_merge_end_var += *std::max_element(tend.begin(), tend.end()) - *std::min_element(tend.begin(), tend.end());
1300 		g_merge_join += after - *std::max_element(tend.begin(), tend.end());
1301 		double run_max = 0, run_min = 1e9;
1302 		for(int i=0; i<tend.size(); i++) {
1303 			run_max = max(run_max, tend[i]-tstart[i]);
1304 			run_min = min(run_min, tend[i]-tstart[i]);
1305 		}
1306 		g_merge_run_var += run_max-run_min;
1307 		g_merge_run_shortest += run_min;
1308 		g_merge_run_longest += run_max;
1309 		g_merge_run_total += std::accumulate(tend.begin(),tend.end(),0.0)-std::accumulate(tstart.begin(),tstart.end(),0.0);
1310 
1311 		cs->versionHistory.concatenate( &parts[0], parts.size() );
1312 	} else {
1313 		addConflictRanges( now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory );
1314 	}
1315 
1316 	//for(auto w = combinedWriteConflictRanges.begin(); w != combinedWriteConflictRanges.end(); ++w)
1317 	//	versionHistory.addConflictRange( w->first.begin(), w->first.size(), w->second.begin(), w->second.size(), now );
1318 }
1319 
combineWriteConflictRanges()1320 void ConflictBatch::combineWriteConflictRanges()
1321 {
1322 	int activeWriteCount = 0;
1323 	for(int i=0; i<points.size(); i++) {
1324 		KeyInfo& point = points[i];
1325 		if (point.write && !transactionConflictStatus[ point.transaction ]) {
1326 			if (point.begin) {
1327  				activeWriteCount++;
1328   				if (activeWriteCount == 1)
1329   					combinedWriteConflictRanges.push_back( make_pair( point.key, KeyRef() ) );
1330 			} else /*if (point.end)*/ {
1331 				activeWriteCount--;
1332 				if (activeWriteCount == 0)
1333 					combinedWriteConflictRanges.back().second = point.key;
1334 			}
1335 		}
1336 	}
1337 }
1338 
1339 //void showNumaStatus();
1340 
1341 /*
1342 bool sse4Less( const uint8_t* a, int aLen, const uint8_t* b, int bLen ) {
1343 	while (true) {
1344 		int res = _mm_cmpestri(*(__m128i*)a, aLen, *(__m128i*)b, bLen, _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_EACH | _SIDD_NEGATIVE_POLARITY | _SIDD_LEAST_SIGNIFICANT );
1345 		printf("%d ", res);
1346 		if (res == 16) {
1347 			if (bLen < 16) return false;
1348 			a += 16; b += 16; aLen -= 16; bLen -= 16;
1349 		}
1350 		if (res == bLen) return false;
1351 		if (res == aLen) return true;
1352 
1353 		return a[res] < b[res];
1354 	}
1355 }
1356 
1357 void tless( const char* a, const char* b ) {
1358 	bool x = sse4Less( (const uint8_t*)a, strlen(a), (const uint8_t*)b, strlen(b) );
1359 	if (x)
1360 		printf("%s < %s\n", a, b);
1361 	else
1362 		printf("%s >= %s\n", a, b);
1363 }
1364 
1365 void sse4Test(){
1366 
1367 	tless("hello", "world");
1368 	tless("a", "a");
1369 	tless("world", "hello");
1370 	tless("world", "worry");
1371 	tless("worry", "world");
1372 	tless("hello", "hello1");
1373 	tless("hello1", "hello");
1374 
1375 	tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld");
1376 	tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello");
1377 	tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworry");
1378 	tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworry", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld");
1379 	tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello1");
1380 	tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello");
1381 
1382 	char *a = "hello1worldthisisalonglonglongstring";
1383 	char *b = "hello";
1384 	__m128i aa = *(__m128i*)a;
1385 	__m128i bb = *(__m128i*)a;
1386 
1387 	int res = _mm_cmpestri(aa, 2, bb, 2, _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_EACH | _SIDD_NEGATIVE_POLARITY | _SIDD_LEAST_SIGNIFICANT );
1388 
1389 	cout << res << endl;
1390 
1391 }
1392 */
1393 
miniConflictSetTest()1394 void miniConflictSetTest() {
1395 	for(int i=0; i<2000000; i++) {
1396 		int size = 64*5;		// Also run 64*64*5 to test multiple words of andValues and orValues
1397 		MiniConflictSet mini(size);
1398 		for(int j=0; j<2; j++) {
1399 			int a = g_random->randomInt(0, size);
1400 			int b = g_random->randomInt(a, size);
1401 			mini.set( a, b );
1402 		}
1403 		for(int j=0; j<4; j++) {
1404 			int a = g_random->randomInt(0, size);
1405 			int b = g_random->randomInt(a, size);
1406 			mini.any( a, b );	// Tests correctness internally
1407 		}
1408 	}
1409 	printf("miniConflictSetTest complete\n");
1410 }
1411 
skipListTest()1412 void skipListTest() {
1413 	printf("Skip list test\n");
1414 
1415 	//sse4Test();
1416 
1417 	//A test case that breaks the old operator<
1418 	//KeyInfo a( LiteralStringRef("hello"), true, false, true, -1 );
1419 	//KeyInfo b( LiteralStringRef("hello\0"), false, false, false, 0 );
1420 
1421 	miniConflictSetTest();
1422 
1423 
1424 	setAffinity(0);
1425 	//showNumaStatus();
1426 
1427 	double start;
1428 
1429 	ConflictSet* cs = newConflictSet();
1430 
1431 	Arena testDataArena;
1432 	VectorRef< VectorRef<KeyRangeRef> > testData;
1433 	testData.resize(testDataArena, 500);
1434 	vector<vector<uint8_t>> success( testData.size() );
1435 	vector<vector<uint8_t>> success2( testData.size() );
1436 	for(int i=0; i<testData.size(); i++) {
1437 		testData[i].resize(testDataArena, 5000);
1438 		success[i].assign( testData[i].size(), false );
1439 		success2[i].assign( testData[i].size(), false );
1440 		for(int j=0; j<testData[i].size(); j++) {
1441 			int key = g_random->randomInt(0, 20000000);
1442 			int key2 = key + 1 + g_random->randomInt(0, 10);
1443 			testData[i][j] = KeyRangeRef(
1444 				setK( testDataArena, key ),
1445 				setK( testDataArena, key2 ) );
1446 		}
1447 	}
1448 	printf("Test data generated (%d)\n", g_random->randomInt(0,100000));
1449 	printf("  %d threads, %d batches, %d/batch\n", PARALLEL_THREAD_COUNT, testData.size(), testData[0].size());
1450 
1451 	printf("Running\n");
1452 
1453 	int readCount = 1, writeCount = 1;
1454 	int cranges = 0, tcount = 0;
1455 
1456 	start = timer();
1457 	vector<vector<int>> nonConflict( testData.size() );
1458 	for(int i=0; i<testData.size(); i++) {
1459 		Arena buf;
1460 		vector<CommitTransactionRef> trs;
1461 		double t = timer();
1462 		for(int j=0; j+readCount+writeCount<=testData[i].size(); j+=readCount+writeCount) {
1463 			CommitTransactionRef tr;
1464 			for(int k=0; k<readCount; k++) {
1465 				KeyRangeRef r( buf, testData[i][j+k] );
1466 				tr.read_conflict_ranges.push_back( buf, r );
1467 			}
1468 			for(int k=0; k<writeCount; k++) {
1469 				KeyRangeRef r( buf, testData[i][j+readCount+k] );
1470 				tr.write_conflict_ranges.push_back( buf, r );
1471 			}
1472 			cranges += tr.read_conflict_ranges.size() + tr.write_conflict_ranges.size();
1473 			tr.read_snapshot = i;
1474 			trs.push_back(tr);
1475 		}
1476 		tcount += trs.size();
1477 		g_buildTest += timer()-t;
1478 
1479 		t = timer();
1480 		ConflictBatch batch( cs );
1481 		for(int j=0; j<trs.size(); j++)
1482 			batch.addTransaction( trs[j] );
1483 		g_add += timer()-t;
1484 
1485 		t = timer();
1486 		batch.detectConflicts( i+50, i, nonConflict[i] );
1487 		g_detectConflicts += timer()-t;
1488 	}
1489 	double elapsed = timer()-start;
1490 	printf("New conflict set: %0.3f sec\n", elapsed);
1491 	printf("                  %0.3f Mtransactions/sec\n", tcount/elapsed/1e6);
1492 	printf("                  %0.3f Mkeys/sec\n", cranges*2/elapsed/1e6);
1493 
1494 	elapsed = g_detectConflicts.getValue();
1495 	printf("Detect only:      %0.3f sec\n", elapsed);
1496 	printf("                  %0.3f Mtransactions/sec\n", tcount/elapsed/1e6);
1497 	printf("                  %0.3f Mkeys/sec\n", cranges*2/elapsed/1e6);
1498 
1499 	elapsed = g_checkRead.getValue() + g_merge.getValue();
1500 	printf("Skiplist only:    %0.3f sec\n", elapsed);
1501 	printf("                  %0.3f Mtransactions/sec\n", tcount/elapsed/1e6);
1502 	printf("                  %0.3f Mkeys/sec\n", cranges*2/elapsed/1e6);
1503 
1504 	printf("Performance counters:\n");
1505 	for(int c=0; c<skc.size(); c++) {
1506 		printf("%20s: %s\n", skc[c]->getMetric().name().c_str(), skc[c]->getMetric().formatted().c_str());
1507 	}
1508 
1509 	//showNumaStatus();
1510 
1511 	printf("%d entries in version history\n", cs->versionHistory.count());
1512 
1513 	/*start = timer();
1514 	vector<vector<int>> nonConflict2( testData.size() );
1515 	SlowConflictSet scs;
1516 	Standalone<VectorRef<KeyRangeRef>> ranges;
1517 	ranges.resize( ranges.arena(), 1 );
1518 
1519 	for(int i=0; i<testData.size(); i++) {
1520 		for(int j=0; j<testData[i].size(); j++) {
1521 			ranges[0] = testData[i][j];
1522 			if (!scs.is_conflict( ranges, i )) {
1523 				nonConflict2[i].push_back( j );
1524 				scs.add( ranges, VectorRef<KeyValueRef>(), i + 50 );
1525 			}
1526 		}
1527 	}
1528 	printf("Old conflict set: %0.3f sec\n", timer()-start);
1529 
1530 	int aminusb=0, bminusa=0, atotal=0;
1531 	for(int i=0; i<testData.size(); i++) {
1532 		vector<bool> a( testData[i].size() ), b( testData[i].size() );
1533 		for(int j=0; j<nonConflict[i].size(); j++)
1534 			a[ nonConflict[i][j] ] = true;
1535 		for(int j=0; j<nonConflict2[i].size(); j++)
1536 			b[ nonConflict2[i][j] ] = true;
1537 		for(int j=0; j<a.size(); j++) {
1538 			if (a[j]) atotal++;
1539 			if (a[j] && !b[j]) aminusb++;
1540 			else if (b[j] && !a[j]) bminusa++;
1541 		}
1542 	}
1543 	printf("%d transactions accepted\n", atotal);
1544 	if (bminusa)
1545 		printf("ERROR: %d transactions unnecessarily rejected!\n", bminusa);
1546 	if (aminusb)
1547 		printf("ERROR: %d transactions incorrectly accepted!\n", aminusb);
1548 		*/
1549 	//for(int i=0; i<testData.size(); i++)
1550 	//	printf("%d %d %d %d\n", i, nonConflict[i].size(), nonConflict2[i].size()-nonConflict[i].size(), nonConflict[i] != nonConflict2[i]);
1551 }
1552