1 /*
2  * MemoryPager.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbserver/MemoryPager.h"
22 #include "fdbserver/Knobs.h"
23 
24 #include "flow/Arena.h"
25 #include "flow/UnitTest.h"
26 #include "flow/actorcompiler.h"
27 
28 typedef uint8_t* PhysicalPageID;
29 typedef std::vector<std::pair<Version, PhysicalPageID>> PageVersionMap;
30 typedef std::vector<PageVersionMap> LogicalPageTable;
31 
32 class MemoryPager;
33 
34 class MemoryPage : public IPage, ReferenceCounted<MemoryPage> {
35 public:
36 	MemoryPage();
37 	MemoryPage(uint8_t *data);
38 	virtual ~MemoryPage();
39 
addref() const40 	virtual void addref() const {
41 		ReferenceCounted<MemoryPage>::addref();
42 	}
43 
delref() const44 	virtual void delref() const {
45 		ReferenceCounted<MemoryPage>::delref();
46 	}
47 
48 	virtual int size() const;
49 	virtual uint8_t const* begin() const;
50 	virtual uint8_t* mutate();
51 
52 private:
53 	friend class MemoryPager;
54 	uint8_t *data;
55 	bool allocated;
56 
57 	static const int PAGE_BYTES;
58 };
59 
60 class MemoryPagerSnapshot : public IPagerSnapshot, ReferenceCounted<MemoryPagerSnapshot> {
61 public:
MemoryPagerSnapshot(MemoryPager * pager,Version version)62 	MemoryPagerSnapshot(MemoryPager *pager, Version version) : pager(pager), version(version) {}
63 	virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID);
getVersion() const64 	virtual Version getVersion() const {
65 		return version;
66 	}
67 
addref()68 	virtual void addref() {
69 		ReferenceCounted<MemoryPagerSnapshot>::addref();
70 	}
71 
delref()72 	virtual void delref() {
73 		ReferenceCounted<MemoryPagerSnapshot>::delref();
74 	}
75 
76 private:
77 	MemoryPager *pager;
78 	Version version;
79 };
80 
81 class MemoryPager : public IPager, ReferenceCounted<MemoryPager> {
82 public:
83 	MemoryPager();
84 
85 	virtual Reference<IPage> newPageBuffer();
86 	virtual int getUsablePageSize();
87 
88 	virtual Reference<IPagerSnapshot> getReadSnapshot(Version version);
89 
90 	virtual LogicalPageID allocateLogicalPage();
91 	virtual void freeLogicalPage(LogicalPageID pageID, Version version);
92 	virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID);
93 	virtual void forgetVersions(Version begin, Version end);
94 	virtual Future<Void> commit();
95 
getStorageBytes()96 	virtual StorageBytes getStorageBytes() {
97 		// TODO:  Get actual values for used and free memory
98 		return StorageBytes();
99 	}
100 
101 	virtual void setLatestVersion(Version version);
102 	virtual Future<Version> getLatestVersion();
103 
104 	virtual Future<Void> getError();
105 	virtual Future<Void> onClosed();
106 	virtual void dispose();
107 	virtual void close();
108 
109 	virtual Reference<const IPage> getPage(LogicalPageID pageID, Version version);
110 
111 private:
112 	Version latestVersion;
113 	Version committedVersion;
114 	Standalone<VectorRef<VectorRef<uint8_t>>> data;
115 	LogicalPageTable pageTable;
116 
117 	Promise<Void> closed;
118 
119 	std::vector<PhysicalPageID> freeList; // TODO: is this good enough for now?
120 
121 	PhysicalPageID allocatePage(Reference<IPage> contents);
122 	void extendData();
123 
124 	static const PhysicalPageID INVALID_PAGE;
125 };
126 
createMemoryPager()127 IPager * createMemoryPager() {
128 	return new MemoryPager();
129 }
130 
MemoryPage()131 MemoryPage::MemoryPage() : allocated(true) {
132 	data = (uint8_t*)FastAllocator<4096>::allocate();
133 }
134 
MemoryPage(uint8_t * data)135 MemoryPage::MemoryPage(uint8_t *data) : data(data), allocated(false) {}
136 
~MemoryPage()137 MemoryPage::~MemoryPage() {
138 	if(allocated) {
139 		FastAllocator<4096>::release(data);
140 	}
141 }
142 
begin() const143 uint8_t const* MemoryPage::begin() const {
144 	return data;
145 }
146 
mutate()147 uint8_t* MemoryPage::mutate() {
148 	return data;
149 }
150 
size() const151 int MemoryPage::size() const {
152 	return PAGE_BYTES;
153 }
154 
155 const int MemoryPage::PAGE_BYTES = 4096;
156 
getPhysicalPage(LogicalPageID pageID)157 Future<Reference<const IPage>> MemoryPagerSnapshot::getPhysicalPage(LogicalPageID pageID) {
158 	return pager->getPage(pageID, version);
159 }
160 
MemoryPager()161 MemoryPager::MemoryPager() : latestVersion(0), committedVersion(0) {
162 	extendData();
163 	pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES);
164 }
165 
newPageBuffer()166 Reference<IPage> MemoryPager::newPageBuffer() {
167 	return Reference<IPage>(new MemoryPage());
168 }
169 
getUsablePageSize()170 int MemoryPager::getUsablePageSize() {
171 	return MemoryPage::PAGE_BYTES;
172 }
173 
getReadSnapshot(Version version)174 Reference<IPagerSnapshot> MemoryPager::getReadSnapshot(Version version) {
175 	ASSERT(version <= latestVersion);
176 	return Reference<IPagerSnapshot>(new MemoryPagerSnapshot(this, version));
177 }
178 
allocateLogicalPage()179 LogicalPageID MemoryPager::allocateLogicalPage() {
180 	ASSERT(pageTable.size() >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
181 	pageTable.push_back(PageVersionMap());
182 	return pageTable.size() - 1;
183 }
184 
freeLogicalPage(LogicalPageID pageID,Version version)185 void MemoryPager::freeLogicalPage(LogicalPageID pageID, Version version) {
186 	ASSERT(pageID < pageTable.size());
187 
188 	PageVersionMap &pageVersionMap = pageTable[pageID];
189 	ASSERT(!pageVersionMap.empty());
190 
191 	auto itr = std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair<Version, PhysicalPageID> p, Version v) {
192 		return p.first < v;
193 	});
194 
195 	pageVersionMap.erase(itr, pageVersionMap.end());
196 	if(pageVersionMap.size() > 0 && pageVersionMap.back().second != INVALID_PAGE) {
197 		pageVersionMap.push_back(std::make_pair(version, INVALID_PAGE));
198 	}
199 }
200 
writePage(LogicalPageID pageID,Reference<IPage> contents,Version updateVersion,LogicalPageID referencePageID)201 void MemoryPager::writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID) {
202 	ASSERT(updateVersion > latestVersion || updateVersion == 0);
203 	ASSERT(pageID < pageTable.size());
204 
205 	if(referencePageID != invalidLogicalPageID) {
206 		PageVersionMap &rpv = pageTable[referencePageID];
207 		ASSERT(!rpv.empty());
208 		updateVersion = rpv.back().first;
209 	}
210 
211 	PageVersionMap &pageVersionMap = pageTable[pageID];
212 
213 	ASSERT(updateVersion >= committedVersion || updateVersion == 0);
214 	PhysicalPageID physicalPageID = allocatePage(contents);
215 
216 	ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != INVALID_PAGE);
217 
218 	if(updateVersion == 0) {
219 		ASSERT(pageVersionMap.size());
220 		updateVersion = pageVersionMap.back().first;
221 		pageVersionMap.back().second = physicalPageID;
222 		// TODO: what to do with old page?
223 	}
224 	else {
225 		ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
226 		pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
227 	}
228 
229 }
230 
forgetVersions(Version begin,Version end)231 void MemoryPager::forgetVersions(Version begin, Version end) {
232 	ASSERT(begin <= end);
233 	ASSERT(end <= latestVersion);
234 	// TODO
235 }
236 
commit()237 Future<Void> MemoryPager::commit() {
238 	ASSERT(committedVersion < latestVersion);
239 	committedVersion = latestVersion;
240 	return Void();
241 }
242 
setLatestVersion(Version version)243 void MemoryPager::setLatestVersion(Version version) {
244 	ASSERT(version > latestVersion);
245 	latestVersion = version;
246 }
247 
getLatestVersion()248 Future<Version> MemoryPager::getLatestVersion() {
249 	return latestVersion;
250 }
251 
getPage(LogicalPageID pageID,Version version)252 Reference<const IPage> MemoryPager::getPage(LogicalPageID pageID, Version version) {
253 	ASSERT(pageID < pageTable.size());
254 	PageVersionMap const& pageVersionMap = pageTable[pageID];
255 
256 	auto itr = std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair<Version, PhysicalPageID> p) {
257 		return v < p.first;
258 	});
259 
260 	if(itr == pageVersionMap.begin()) {
261 		return Reference<IPage>(); // TODO: should this be an error?
262 	}
263 
264 	--itr;
265 
266 	ASSERT(itr->second != INVALID_PAGE);
267 	return Reference<const IPage>(new MemoryPage(itr->second)); // TODO: Page memory owned by the pager. Change this?
268 }
269 
getError()270 Future<Void> MemoryPager::getError() {
271 	return Void();
272 }
273 
onClosed()274 Future<Void> MemoryPager::onClosed() {
275 	return closed.getFuture();
276 }
277 
dispose()278 void MemoryPager::dispose() {
279 	closed.send(Void());
280 	delete this;
281 }
282 
close()283 void MemoryPager::close() {
284 	dispose();
285 }
286 
allocatePage(Reference<IPage> contents)287 PhysicalPageID MemoryPager::allocatePage(Reference<IPage> contents) {
288 	if(freeList.size()) {
289 		PhysicalPageID pageID = freeList.back();
290 		freeList.pop_back();
291 
292 		memcpy(pageID, contents->begin(), contents->size());
293 		return pageID;
294 	}
295 	else {
296 		ASSERT(data.size() && data.back().capacity() - data.back().size() >= contents->size());
297 		PhysicalPageID pageID = data.back().end();
298 
299 		data.back().append(data.arena(), contents->begin(), contents->size());
300 		if(data.back().size() == data.back().capacity()) {
301 			extendData();
302 		}
303 		else {
304 			ASSERT(data.back().size() <= data.back().capacity() - 4096);
305 		}
306 
307 		return pageID;
308 	}
309 }
310 
extendData()311 void MemoryPager::extendData() {
312 	if(data.size() > 1000) { // TODO: is this an ok way to handle large data size?
313 		throw io_error();
314 	}
315 
316 	VectorRef<uint8_t> d;
317 	d.reserve(data.arena(), 1 << 22);
318 	data.push_back(data.arena(), d);
319 }
320 
// TODO: These tests are not MemoryPager-specific; we should make them more general.
322 
fillPage(Reference<IPage> page,LogicalPageID pageID,Version version)323 void fillPage(Reference<IPage> page, LogicalPageID pageID, Version version) {
324 	ASSERT(page->size() > sizeof(LogicalPageID) + sizeof(Version));
325 
326 	memset(page->mutate(), 0, page->size());
327 	memcpy(page->mutate(), (void*)&pageID, sizeof(LogicalPageID));
328 	memcpy(page->mutate() + sizeof(LogicalPageID), (void*)&version, sizeof(Version));
329 }
330 
validatePage(Reference<const IPage> page,LogicalPageID pageID,Version version)331 bool validatePage(Reference<const IPage> page, LogicalPageID pageID, Version version) {
332 	bool valid = true;
333 
334 	LogicalPageID readPageID = *(LogicalPageID*)page->begin();
335 	if(readPageID != pageID) {
336 		fprintf(stderr, "Invalid PageID detected: %u (expected %u)\n", readPageID, pageID);
337 		valid = false;
338 	}
339 
340 	Version readVersion = *(Version*)(page->begin()+sizeof(LogicalPageID));
341 	if(readVersion != version) {
342 		fprintf(stderr, "Invalid Version detected on page %u: %lld (expected %lld)\n", pageID, readVersion, version);
343 		valid = false;
344 	}
345 
346 	return valid;
347 }
348 
writePage(IPager * pager,Reference<IPage> page,LogicalPageID pageID,Version version,bool updateVersion=true)349 void writePage(IPager *pager, Reference<IPage> page, LogicalPageID pageID, Version version, bool updateVersion=true) {
350 	fillPage(page, pageID, version);
351 	pager->writePage(pageID, page, updateVersion ? version : 0);
352 }
353 
commit(IPager * pager)354 ACTOR Future<Void> commit(IPager *pager) {
355 	static int commitNum = 1;
356 	state int myCommit = commitNum++;
357 
358 	debug_printf("Commit%d\n", myCommit);
359 	wait(pager->commit());
360 	debug_printf("FinishedCommit%d\n", myCommit);
361 	return Void();
362 }
363 
read(IPager * pager,LogicalPageID pageID,Version version,Version expectedVersion=-1)364 ACTOR Future<Void> read(IPager *pager, LogicalPageID pageID, Version version, Version expectedVersion=-1) {
365 	static int readNum = 1;
366 	state int myRead = readNum++;
367 	state Reference<IPagerSnapshot> readSnapshot = pager->getReadSnapshot(version);
368 	debug_printf("Read%d\n", myRead);
369 	Reference<const IPage> readPage = wait(readSnapshot->getPhysicalPage(pageID));
370 	debug_printf("FinishedRead%d\n", myRead);
371 	ASSERT(validatePage(readPage, pageID, expectedVersion >= 0 ? expectedVersion : version));
372 	return Void();
373 }
374 
simplePagerTest(IPager * pager)375 ACTOR Future<Void> simplePagerTest(IPager *pager) {
376 	state Reference<IPage> page = pager->newPageBuffer();
377 
378 	Version latestVersion = wait(pager->getLatestVersion());
379 	debug_printf("Got latest version: %lld\n", latestVersion);
380 
381 	state Version version = latestVersion+1;
382 	state Version v1 = version;
383 
384 	state LogicalPageID pageID1 = pager->allocateLogicalPage();
385 
386 	writePage(pager, page, pageID1, v1);
387 	pager->setLatestVersion(v1);
388 	wait(commit(pager));
389 
390 	state LogicalPageID pageID2 = pager->allocateLogicalPage();
391 
392 	state Version v2 = ++version;
393 
394 	writePage(pager, page, pageID1, v2);
395 	writePage(pager, page, pageID2, v2);
396 	pager->setLatestVersion(v2);
397 	wait(commit(pager));
398 
399 	wait(read(pager, pageID1, v2));
400 	wait(read(pager, pageID1, v1));
401 
402 	state Version v3 = ++version;
403 	writePage(pager, page, pageID1, v3, false);
404 	pager->setLatestVersion(v3);
405 
406 	wait(read(pager, pageID1, v2, v3));
407 	wait(read(pager, pageID1, v3, v3));
408 
409 	state LogicalPageID pageID3 = pager->allocateLogicalPage();
410 
411 	state Version v4 = ++version;
412 	writePage(pager, page, pageID2, v4);
413 	writePage(pager, page, pageID3, v4);
414 	pager->setLatestVersion(v4);
415 	wait(commit(pager));
416 
417 	wait(read(pager, pageID2, v4, v4));
418 
419 	state Version v5 = ++version;
420 	writePage(pager, page, pageID2, v5);
421 
422 	state LogicalPageID pageID4 = pager->allocateLogicalPage();
423 	writePage(pager, page, pageID4, v5);
424 
425 	state Version v6 = ++version;
426 	pager->freeLogicalPage(pageID2, v5);
427 	pager->freeLogicalPage(pageID3, v3);
428 	pager->setLatestVersion(v6);
429 	wait(commit(pager));
430 
431 	pager->forgetVersions(0, v4);
432 	wait(commit(pager));
433 
434 	wait(delay(3.0));
435 
436 	wait(commit(pager));
437 
438 	return Void();
439 }
440 
441 /*
442 TEST_CASE("/fdbserver/memorypager/simple") {
443 	state IPager *pager = new MemoryPager();
444 
445 	wait(simplePagerTest(pager));
446 
447 	Future<Void> closedFuture = pager->onClosed();
448 	pager->dispose();
449 
450 	wait(closedFuture);
451 	return Void();
452 }
453 */
454 
455 const PhysicalPageID MemoryPager::INVALID_PAGE = nullptr;
456