1 /*
2  * RYWPerformance.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbrpc/ContinuousSample.h"
22 #include "fdbclient/NativeAPI.actor.h"
23 #include "fdbserver/TesterInterface.actor.h"
24 #include "fdbclient/ReadYourWrites.h"
25 #include "fdbserver/workloads/workloads.actor.h"
26 #include "flow/actorcompiler.h"  // This must be the last #include.
27 
28 struct RYWPerformanceWorkload : TestWorkload {
29 	int keyBytes, nodes, ranges;
RYWPerformanceWorkloadRYWPerformanceWorkload30 	RYWPerformanceWorkload(WorkloadContext const& wcx)
31 		: TestWorkload(wcx)
32 	{
33 		nodes = getOption( options, LiteralStringRef("nodes"), 10000 );
34 		ranges = getOption( options, LiteralStringRef("ranges"), 10 );
35 		keyBytes = std::max( getOption( options, LiteralStringRef("keyBytes"), 16 ), 16 );
36 	}
37 
descriptionRYWPerformanceWorkload38 	virtual std::string description() { return "RYWPerformance"; }
39 
setupRYWPerformanceWorkload40 	virtual Future<Void> setup( Database const& cx ) {
41 		if( clientId == 0 )
42 			return _setup( cx, this );
43 		return Void();
44 	}
45 
_setupRYWPerformanceWorkload46 	ACTOR Future<Void> _setup( Database cx, RYWPerformanceWorkload* self ) {
47 		state Transaction tr(cx);
48 
49 		loop {
50 			try {
51 				for(int i = 0; i < self->nodes; i++) tr.set(self->keyForIndex(i), LiteralStringRef("bar"));
52 
53 				wait( tr.commit() );
54 				break;
55 			} catch (Error& e) {
56 				wait( tr.onError(e) );
57 			}
58 		}
59 
60 		return Void();
61 	}
62 
startRYWPerformanceWorkload63 	virtual Future<Void> start( Database const& cx ) {
64 		if( clientId == 0 )
65 			return _start( cx, this );
66 		return Void();
67 	}
68 
fillCacheRYWPerformanceWorkload69 	ACTOR static Future<Void> fillCache( ReadYourWritesTransaction *tr, RYWPerformanceWorkload* self, int type ) {
70 		state int i;
71 		if( type == 0 ) {
72 			for( i = 0; i < self->nodes; i++ ) {
73 				tr->set( self->keyForIndex(i), LiteralStringRef("foo"));
74 			}
75 		} else if( type == 1 ) {
76 			std::vector<Future<Optional<Value>>> gets;
77 			for( i = 0; i < self->nodes; i++ ) {
78 				gets.push_back( tr->get( self->keyForIndex(i) ) );
79 			}
80 			wait( waitForAll(gets) );
81 		} else if( type == 2 ) {
82 			std::vector<Future<Optional<Value>>> gets;
83 			for( i = 0; i < self->nodes; i++ ) {
84 				gets.push_back( tr->get( self->keyForIndex(i) ) );
85 			}
86 			wait( waitForAll(gets) );
87 			for( i = 0; i < self->nodes; i++ ) {
88 				tr->set( self->keyForIndex(i), LiteralStringRef("foo"));
89 			}
90 		} else if( type == 3 ) {
91 			std::vector<Future<Optional<Value>>> gets;
92 			for( i = 0; i < self->nodes; i+=2 ) {
93 				gets.push_back( tr->get( self->keyForIndex(i) ) );
94 			}
95 			wait( waitForAll(gets) );
96 			for( i = 1; i < self->nodes; i+=2 ) {
97 				tr->set( self->keyForIndex(i), LiteralStringRef("foo"));
98 			}
99 		} else if( type == 4 ) {
100 			wait(success( tr->getRange(KeyRangeRef(self->keyForIndex(0),self->keyForIndex(self->nodes)),self->nodes )));
101 		} else if( type == 5 ) {
102 			wait(success( tr->getRange(KeyRangeRef(self->keyForIndex(0),self->keyForIndex(self->nodes)),self->nodes )));
103 			for( i = 0; i < self->nodes; i++ ) {
104 				tr->set( self->keyForIndex(i), LiteralStringRef("foo"));
105 			}
106 		} else if( type == 6 ) {
107 			wait(success( tr->getRange(KeyRangeRef(self->keyForIndex(0),self->keyForIndex(self->nodes)),self->nodes )));
108 			for( i = 0; i < self->nodes; i+= 2 ) {
109 				tr->set( self->keyForIndex(i), LiteralStringRef("foo"));
110 			}
111 		} else if( type == 7 ) {
112 			wait(success( tr->getRange(KeyRangeRef(self->keyForIndex(0),self->keyForIndex(self->nodes)),self->nodes )));
113 			for( i = 0; i < self->nodes; i++ ) {
114 				tr->clear( self->keyForIndex(i) );
115 			}
116 		} else if( type == 8 ) {
117 			wait(success( tr->getRange(KeyRangeRef(self->keyForIndex(0),self->keyForIndex(self->nodes)),self->nodes )));
118 			for( i = 0; i < self->nodes; i += 2 ) {
119 				tr->clear( KeyRangeRef( self->keyForIndex(i), self->keyForIndex(i+1) ) );
120 			}
121 		} else if( type == 9 ) {
122 			std::vector<Future<Standalone<RangeResultRef>>> gets;
123 			for( i = 0; i < self->nodes; i++ ) {
124 				gets.push_back( tr->getRange(KeyRangeRef(self->keyForIndex(i),self->keyForIndex(i+2)),self->nodes ) );
125 			}
126 			wait( waitForAll(gets) );
127 		} else if( type == 10 ) {
128 			std::vector<Future<Standalone<RangeResultRef>>> gets;
129 			for( i = 0; i < self->nodes; i++ ) {
130 				gets.push_back( tr->getRange(KeyRangeRef(self->keyForIndex(i),self->keyForIndex(i+2)),self->nodes ) );
131 			}
132 			wait( waitForAll(gets) );
133 			for( i = 0; i < self->nodes; i++ ) {
134 				tr->set( self->keyForIndex(i), LiteralStringRef("foo"));
135 			}
136 		} else if( type == 11 ) {
137 			std::vector<Future<Standalone<RangeResultRef>>> gets;
138 			for( i = 0; i < self->nodes; i++ ) {
139 				gets.push_back( tr->getRange(KeyRangeRef(self->keyForIndex(i),self->keyForIndex(i+2)),self->nodes ) );
140 			}
141 			wait( waitForAll(gets) );
142 			for( i = 0; i < self->nodes; i+= 2 ) {
143 				tr->set( self->keyForIndex(i), LiteralStringRef("foo"));
144 			}
145 		} else if( type == 12 ) {
146 			std::vector<Future<Standalone<RangeResultRef>>> gets;
147 			for( i = 0; i < self->nodes; i++ ) {
148 				gets.push_back( tr->getRange(KeyRangeRef(self->keyForIndex(i),self->keyForIndex(i+2)),self->nodes ) );
149 			}
150 			wait( waitForAll(gets) );
151 			for( i = 0; i < self->nodes; i++ ) {
152 				tr->clear( self->keyForIndex(i) );
153 			}
154 		} else if( type == 13 ) {
155 			std::vector<Future<Standalone<RangeResultRef>>> gets;
156 			for( i = 0; i < self->nodes; i++ ) {
157 				gets.push_back( tr->getRange(KeyRangeRef(self->keyForIndex(i),self->keyForIndex(i+2)),self->nodes ) );
158 			}
159 			wait( waitForAll(gets) );
160 			for( i = 0; i < self->nodes; i += 2 ) {
161 				tr->clear( KeyRangeRef( self->keyForIndex(i), self->keyForIndex(i+1) ) );
162 			}
163 		}
164 		return Void();
165 	}
166 
test_get_singleRYWPerformanceWorkload167 	ACTOR static Future<Void> test_get_single( Database cx, RYWPerformanceWorkload* self, int cacheType ) {
168 		state int i;
169 		state ReadYourWritesTransaction tr( cx );
170 
171 		loop {
172 			try {
173 				wait( self->fillCache(&tr, self, cacheType) );
174 
175 				state double startTime = timer();
176 
177 				for( i = 0; i < self->nodes; i++ ) {
178 					wait(success( tr.get(self->keyForIndex(self->nodes/2))));
179 				}
180 
181 				fprintf(stderr, "%f", self->nodes / (timer() - startTime));
182 
183 				return Void();
184 			} catch( Error &e ) {
185 				wait( tr.onError(e) );
186 			}
187 		}
188 	}
189 
test_get_many_sequentialRYWPerformanceWorkload190 	ACTOR static Future<Void> test_get_many_sequential( Database cx, RYWPerformanceWorkload* self, int cacheType ) {
191 		state int i;
192 		state ReadYourWritesTransaction tr( cx );
193 
194 		loop {
195 			try {
196 				wait( self->fillCache(&tr, self, cacheType) );
197 
198 				state double startTime = timer();
199 
200 				for( i = 0; i < self->nodes; i++ ) {
201 					wait(success( tr.get(self->keyForIndex(i))));
202 				}
203 
204 				fprintf(stderr, "%f", self->nodes / (timer() - startTime));
205 
206 				return Void();
207 			} catch( Error &e ) {
208 				wait( tr.onError(e) );
209 			}
210 		}
211 	}
212 
test_get_range_basicRYWPerformanceWorkload213 	ACTOR static Future<Void> test_get_range_basic( Database cx, RYWPerformanceWorkload* self, int cacheType ) {
214 		state int i;
215 		state ReadYourWritesTransaction tr( cx );
216 
217 		loop {
218 			try {
219 				wait( self->fillCache(&tr, self, cacheType) );
220 
221 				state double startTime = timer();
222 
223 				for( i = 0; i < self->ranges; i++ ) {
224 					wait(success( tr.getRange(KeyRangeRef(self->keyForIndex(0),self->keyForIndex(self->nodes)),self->nodes )));
225 				}
226 
227 				fprintf(stderr, "%f", self->ranges / (timer() - startTime));
228 
229 				return Void();
230 			} catch( Error &e ) {
231 				wait( tr.onError(e) );
232 			}
233 		}
234 	}
235 
test_interleaved_sets_getsRYWPerformanceWorkload236 	ACTOR static Future<Void> test_interleaved_sets_gets( Database cx, RYWPerformanceWorkload* self, int cacheType ) {
237 		state int i;
238 		state ReadYourWritesTransaction tr( cx );
239 
240 		loop {
241 			try {
242 				wait( self->fillCache(&tr, self, cacheType) );
243 
244 				tr.set( self->keyForIndex(self->nodes/2), self->keyForIndex(self->nodes) );
245 
246 				state double startTime = timer();
247 
248 				for( i = 0; i < self->nodes; i++ ) {
249 					wait(success( tr.get(self->keyForIndex(self->nodes/2)) ));
250 					tr.set( self->keyForIndex(self->nodes/2),  self->keyForIndex(i) );
251 				}
252 
253 				fprintf(stderr, "%f", self->nodes / (timer() - startTime));
254 
255 				return Void();
256 			} catch( Error &e ) {
257 				wait( tr.onError(e) );
258 			}
259 		}
260 	}
261 
_startRYWPerformanceWorkload262 	ACTOR static Future<Void> _start( Database cx, RYWPerformanceWorkload* self ) {
263 		state int i;
264 		fprintf(stderr, "test_get_single, ");
265 		for( i = 0; i < 14; i++ ) {
266 			wait( self->test_get_single( cx, self, i ) );
267 			if( i == 13 ) fprintf(stderr, "\n");
268 			else fprintf(stderr, ", ");
269 		}
270 		fprintf(stderr, "test_get_many_sequential, ");
271 		for( i = 0; i < 14; i++ ) {
272 			wait( self->test_get_many_sequential( cx, self, i ) );
273 			if( i == 13 ) fprintf(stderr, "\n");
274 			else fprintf(stderr, ", ");
275 		}
276 		fprintf(stderr, "test_get_range_basic, ");
277 		for( i = 4; i < 14; i++ ) {
278 			wait( self->test_get_range_basic( cx, self, i ) );
279 			if( i == 13 ) fprintf(stderr, "\n");
280 			else fprintf(stderr, ", ");
281 		}
282 		fprintf(stderr, "test_interleaved_sets_gets, ");
283 		for( i = 0; i < 14; i++ ) {
284 			wait( self->test_interleaved_sets_gets( cx, self, i ) );
285 			if( i == 13 ) fprintf(stderr, "\n");
286 			else fprintf(stderr, ", ");
287 		}
288 		return Void();
289 	}
290 
checkRYWPerformanceWorkload291 	virtual Future<bool> check( Database const& cx ) {
292 		return true;
293 	}
294 
getMetricsRYWPerformanceWorkload295 	virtual void getMetrics( vector<PerfMetric>& m ) {
296 	}
297 
keyForIndexRYWPerformanceWorkload298 	Key keyForIndex( uint64_t index ) {
299 		Key result = makeString( keyBytes );
300 		uint8_t* data = mutateString( result );
301 		memset(data, '.', keyBytes);
302 
303 		double d = double(index) / nodes;
304 		emplaceIndex( data, 0, *(int64_t*)&d );
305 
306 		return result;
307 	}
308 };
309 
// Global factory object for RYWPerformanceWorkload, constructed with the name "RYWPerformance".
// NOTE(review): presumably this registers the workload so test specifications can select it by that
// name - WorkloadFactory is defined elsewhere; confirm there.
WorkloadFactory<RYWPerformanceWorkload> RYWPerformanceWorkloadFactory("RYWPerformance");
311