1 /*
2 * MemoryPager.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbserver/MemoryPager.h"
22 #include "fdbserver/Knobs.h"
23
24 #include "flow/Arena.h"
25 #include "flow/UnitTest.h"
26 #include "flow/actorcompiler.h"
27
28 typedef uint8_t* PhysicalPageID;
29 typedef std::vector<std::pair<Version, PhysicalPageID>> PageVersionMap;
30 typedef std::vector<PageVersionMap> LogicalPageTable;
31
32 class MemoryPager;
33
34 class MemoryPage : public IPage, ReferenceCounted<MemoryPage> {
35 public:
36 MemoryPage();
37 MemoryPage(uint8_t *data);
38 virtual ~MemoryPage();
39
addref() const40 virtual void addref() const {
41 ReferenceCounted<MemoryPage>::addref();
42 }
43
delref() const44 virtual void delref() const {
45 ReferenceCounted<MemoryPage>::delref();
46 }
47
48 virtual int size() const;
49 virtual uint8_t const* begin() const;
50 virtual uint8_t* mutate();
51
52 private:
53 friend class MemoryPager;
54 uint8_t *data;
55 bool allocated;
56
57 static const int PAGE_BYTES;
58 };
59
60 class MemoryPagerSnapshot : public IPagerSnapshot, ReferenceCounted<MemoryPagerSnapshot> {
61 public:
MemoryPagerSnapshot(MemoryPager * pager,Version version)62 MemoryPagerSnapshot(MemoryPager *pager, Version version) : pager(pager), version(version) {}
63 virtual Future<Reference<const IPage>> getPhysicalPage(LogicalPageID pageID);
getVersion() const64 virtual Version getVersion() const {
65 return version;
66 }
67
addref()68 virtual void addref() {
69 ReferenceCounted<MemoryPagerSnapshot>::addref();
70 }
71
delref()72 virtual void delref() {
73 ReferenceCounted<MemoryPagerSnapshot>::delref();
74 }
75
76 private:
77 MemoryPager *pager;
78 Version version;
79 };
80
81 class MemoryPager : public IPager, ReferenceCounted<MemoryPager> {
82 public:
83 MemoryPager();
84
85 virtual Reference<IPage> newPageBuffer();
86 virtual int getUsablePageSize();
87
88 virtual Reference<IPagerSnapshot> getReadSnapshot(Version version);
89
90 virtual LogicalPageID allocateLogicalPage();
91 virtual void freeLogicalPage(LogicalPageID pageID, Version version);
92 virtual void writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID);
93 virtual void forgetVersions(Version begin, Version end);
94 virtual Future<Void> commit();
95
getStorageBytes()96 virtual StorageBytes getStorageBytes() {
97 // TODO: Get actual values for used and free memory
98 return StorageBytes();
99 }
100
101 virtual void setLatestVersion(Version version);
102 virtual Future<Version> getLatestVersion();
103
104 virtual Future<Void> getError();
105 virtual Future<Void> onClosed();
106 virtual void dispose();
107 virtual void close();
108
109 virtual Reference<const IPage> getPage(LogicalPageID pageID, Version version);
110
111 private:
112 Version latestVersion;
113 Version committedVersion;
114 Standalone<VectorRef<VectorRef<uint8_t>>> data;
115 LogicalPageTable pageTable;
116
117 Promise<Void> closed;
118
119 std::vector<PhysicalPageID> freeList; // TODO: is this good enough for now?
120
121 PhysicalPageID allocatePage(Reference<IPage> contents);
122 void extendData();
123
124 static const PhysicalPageID INVALID_PAGE;
125 };
126
createMemoryPager()127 IPager * createMemoryPager() {
128 return new MemoryPager();
129 }
130
MemoryPage()131 MemoryPage::MemoryPage() : allocated(true) {
132 data = (uint8_t*)FastAllocator<4096>::allocate();
133 }
134
MemoryPage(uint8_t * data)135 MemoryPage::MemoryPage(uint8_t *data) : data(data), allocated(false) {}
136
~MemoryPage()137 MemoryPage::~MemoryPage() {
138 if(allocated) {
139 FastAllocator<4096>::release(data);
140 }
141 }
142
begin() const143 uint8_t const* MemoryPage::begin() const {
144 return data;
145 }
146
mutate()147 uint8_t* MemoryPage::mutate() {
148 return data;
149 }
150
size() const151 int MemoryPage::size() const {
152 return PAGE_BYTES;
153 }
154
155 const int MemoryPage::PAGE_BYTES = 4096;
156
getPhysicalPage(LogicalPageID pageID)157 Future<Reference<const IPage>> MemoryPagerSnapshot::getPhysicalPage(LogicalPageID pageID) {
158 return pager->getPage(pageID, version);
159 }
160
MemoryPager()161 MemoryPager::MemoryPager() : latestVersion(0), committedVersion(0) {
162 extendData();
163 pageTable.resize(SERVER_KNOBS->PAGER_RESERVED_PAGES);
164 }
165
newPageBuffer()166 Reference<IPage> MemoryPager::newPageBuffer() {
167 return Reference<IPage>(new MemoryPage());
168 }
169
getUsablePageSize()170 int MemoryPager::getUsablePageSize() {
171 return MemoryPage::PAGE_BYTES;
172 }
173
getReadSnapshot(Version version)174 Reference<IPagerSnapshot> MemoryPager::getReadSnapshot(Version version) {
175 ASSERT(version <= latestVersion);
176 return Reference<IPagerSnapshot>(new MemoryPagerSnapshot(this, version));
177 }
178
allocateLogicalPage()179 LogicalPageID MemoryPager::allocateLogicalPage() {
180 ASSERT(pageTable.size() >= SERVER_KNOBS->PAGER_RESERVED_PAGES);
181 pageTable.push_back(PageVersionMap());
182 return pageTable.size() - 1;
183 }
184
freeLogicalPage(LogicalPageID pageID,Version version)185 void MemoryPager::freeLogicalPage(LogicalPageID pageID, Version version) {
186 ASSERT(pageID < pageTable.size());
187
188 PageVersionMap &pageVersionMap = pageTable[pageID];
189 ASSERT(!pageVersionMap.empty());
190
191 auto itr = std::lower_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](std::pair<Version, PhysicalPageID> p, Version v) {
192 return p.first < v;
193 });
194
195 pageVersionMap.erase(itr, pageVersionMap.end());
196 if(pageVersionMap.size() > 0 && pageVersionMap.back().second != INVALID_PAGE) {
197 pageVersionMap.push_back(std::make_pair(version, INVALID_PAGE));
198 }
199 }
200
writePage(LogicalPageID pageID,Reference<IPage> contents,Version updateVersion,LogicalPageID referencePageID)201 void MemoryPager::writePage(LogicalPageID pageID, Reference<IPage> contents, Version updateVersion, LogicalPageID referencePageID) {
202 ASSERT(updateVersion > latestVersion || updateVersion == 0);
203 ASSERT(pageID < pageTable.size());
204
205 if(referencePageID != invalidLogicalPageID) {
206 PageVersionMap &rpv = pageTable[referencePageID];
207 ASSERT(!rpv.empty());
208 updateVersion = rpv.back().first;
209 }
210
211 PageVersionMap &pageVersionMap = pageTable[pageID];
212
213 ASSERT(updateVersion >= committedVersion || updateVersion == 0);
214 PhysicalPageID physicalPageID = allocatePage(contents);
215
216 ASSERT(pageVersionMap.empty() || pageVersionMap.back().second != INVALID_PAGE);
217
218 if(updateVersion == 0) {
219 ASSERT(pageVersionMap.size());
220 updateVersion = pageVersionMap.back().first;
221 pageVersionMap.back().second = physicalPageID;
222 // TODO: what to do with old page?
223 }
224 else {
225 ASSERT(pageVersionMap.empty() || pageVersionMap.back().first < updateVersion);
226 pageVersionMap.push_back(std::make_pair(updateVersion, physicalPageID));
227 }
228
229 }
230
// Intended to discard page versions in the range given by begin and end.
// Currently only validates the range (begin <= end <= latestVersion); nothing
// is discarded yet.
void MemoryPager::forgetVersions(Version begin, Version end) {
	ASSERT(begin <= end);
	ASSERT(end <= latestVersion);
	// TODO
}
236
commit()237 Future<Void> MemoryPager::commit() {
238 ASSERT(committedVersion < latestVersion);
239 committedVersion = latestVersion;
240 return Void();
241 }
242
// Advances the pager's latest version. The new version must be strictly
// greater than the current one.
void MemoryPager::setLatestVersion(Version version) {
	ASSERT(version > latestVersion);
	latestVersion = version;
}
247
getLatestVersion()248 Future<Version> MemoryPager::getLatestVersion() {
249 return latestVersion;
250 }
251
getPage(LogicalPageID pageID,Version version)252 Reference<const IPage> MemoryPager::getPage(LogicalPageID pageID, Version version) {
253 ASSERT(pageID < pageTable.size());
254 PageVersionMap const& pageVersionMap = pageTable[pageID];
255
256 auto itr = std::upper_bound(pageVersionMap.begin(), pageVersionMap.end(), version, [](Version v, std::pair<Version, PhysicalPageID> p) {
257 return v < p.first;
258 });
259
260 if(itr == pageVersionMap.begin()) {
261 return Reference<IPage>(); // TODO: should this be an error?
262 }
263
264 --itr;
265
266 ASSERT(itr->second != INVALID_PAGE);
267 return Reference<const IPage>(new MemoryPage(itr->second)); // TODO: Page memory owned by the pager. Change this?
268 }
269
getError()270 Future<Void> MemoryPager::getError() {
271 return Void();
272 }
273
onClosed()274 Future<Void> MemoryPager::onClosed() {
275 return closed.getFuture();
276 }
277
// Signals onClosed() waiters and then destroys the pager. The object must not
// be used after this call.
void MemoryPager::dispose() {
	// Fulfil the promise first: `closed` is a member and is gone after the delete.
	closed.send(Void());
	delete this;
}
282
close()283 void MemoryPager::close() {
284 dispose();
285 }
286
allocatePage(Reference<IPage> contents)287 PhysicalPageID MemoryPager::allocatePage(Reference<IPage> contents) {
288 if(freeList.size()) {
289 PhysicalPageID pageID = freeList.back();
290 freeList.pop_back();
291
292 memcpy(pageID, contents->begin(), contents->size());
293 return pageID;
294 }
295 else {
296 ASSERT(data.size() && data.back().capacity() - data.back().size() >= contents->size());
297 PhysicalPageID pageID = data.back().end();
298
299 data.back().append(data.arena(), contents->begin(), contents->size());
300 if(data.back().size() == data.back().capacity()) {
301 extendData();
302 }
303 else {
304 ASSERT(data.back().size() <= data.back().capacity() - 4096);
305 }
306
307 return pageID;
308 }
309 }
310
extendData()311 void MemoryPager::extendData() {
312 if(data.size() > 1000) { // TODO: is this an ok way to handle large data size?
313 throw io_error();
314 }
315
316 VectorRef<uint8_t> d;
317 d.reserve(data.arena(), 1 << 22);
318 data.push_back(data.arena(), d);
319 }
320
// TODO: These tests are not specific to MemoryPager; generalize them so they can be run against any IPager implementation.
322
fillPage(Reference<IPage> page,LogicalPageID pageID,Version version)323 void fillPage(Reference<IPage> page, LogicalPageID pageID, Version version) {
324 ASSERT(page->size() > sizeof(LogicalPageID) + sizeof(Version));
325
326 memset(page->mutate(), 0, page->size());
327 memcpy(page->mutate(), (void*)&pageID, sizeof(LogicalPageID));
328 memcpy(page->mutate() + sizeof(LogicalPageID), (void*)&version, sizeof(Version));
329 }
330
validatePage(Reference<const IPage> page,LogicalPageID pageID,Version version)331 bool validatePage(Reference<const IPage> page, LogicalPageID pageID, Version version) {
332 bool valid = true;
333
334 LogicalPageID readPageID = *(LogicalPageID*)page->begin();
335 if(readPageID != pageID) {
336 fprintf(stderr, "Invalid PageID detected: %u (expected %u)\n", readPageID, pageID);
337 valid = false;
338 }
339
340 Version readVersion = *(Version*)(page->begin()+sizeof(LogicalPageID));
341 if(readVersion != version) {
342 fprintf(stderr, "Invalid Version detected on page %u: %lld (expected %lld)\n", pageID, readVersion, version);
343 valid = false;
344 }
345
346 return valid;
347 }
348
writePage(IPager * pager,Reference<IPage> page,LogicalPageID pageID,Version version,bool updateVersion=true)349 void writePage(IPager *pager, Reference<IPage> page, LogicalPageID pageID, Version version, bool updateVersion=true) {
350 fillPage(page, pageID, version);
351 pager->writePage(pageID, page, updateVersion ? version : 0);
352 }
353
commit(IPager * pager)354 ACTOR Future<Void> commit(IPager *pager) {
355 static int commitNum = 1;
356 state int myCommit = commitNum++;
357
358 debug_printf("Commit%d\n", myCommit);
359 wait(pager->commit());
360 debug_printf("FinishedCommit%d\n", myCommit);
361 return Void();
362 }
363
read(IPager * pager,LogicalPageID pageID,Version version,Version expectedVersion=-1)364 ACTOR Future<Void> read(IPager *pager, LogicalPageID pageID, Version version, Version expectedVersion=-1) {
365 static int readNum = 1;
366 state int myRead = readNum++;
367 state Reference<IPagerSnapshot> readSnapshot = pager->getReadSnapshot(version);
368 debug_printf("Read%d\n", myRead);
369 Reference<const IPage> readPage = wait(readSnapshot->getPhysicalPage(pageID));
370 debug_printf("FinishedRead%d\n", myRead);
371 ASSERT(validatePage(readPage, pageID, expectedVersion >= 0 ? expectedVersion : version));
372 return Void();
373 }
374
// End-to-end scenario against an IPager: writes several pages at successive
// versions, reads them back at current and older versions, replaces a page in
// place, frees pages, and forgets old versions. A single page buffer is reused
// for every write; writePage() re-stamps it each time.
ACTOR Future<Void> simplePagerTest(IPager *pager) {
	state Reference<IPage> page = pager->newPageBuffer();

	Version latestVersion = wait(pager->getLatestVersion());
	debug_printf("Got latest version: %lld\n", latestVersion);

	// `version` is the running counter; v1..v6 name the individual versions used below.
	state Version version = latestVersion+1;
	state Version v1 = version;

	state LogicalPageID pageID1 = pager->allocateLogicalPage();

	// v1: first write of page 1.
	writePage(pager, page, pageID1, v1);
	pager->setLatestVersion(v1);
	wait(commit(pager));

	state LogicalPageID pageID2 = pager->allocateLogicalPage();

	state Version v2 = ++version;

	// v2: second version of page 1, first version of page 2.
	writePage(pager, page, pageID1, v2);
	writePage(pager, page, pageID2, v2);
	pager->setLatestVersion(v2);
	wait(commit(pager));

	// Both the current and the previous version of page 1 must be readable.
	wait(read(pager, pageID1, v2));
	wait(read(pager, pageID1, v1));

	// v3: updateVersion=false passes version 0 to the pager while the page
	// contents are stamped with v3. There is no commit after this write.
	state Version v3 = ++version;
	writePage(pager, page, pageID1, v3, false);
	pager->setLatestVersion(v3);

	// Reads at both v2 and v3 are expected to return the v3-stamped contents.
	wait(read(pager, pageID1, v2, v3));
	wait(read(pager, pageID1, v3, v3));

	state LogicalPageID pageID3 = pager->allocateLogicalPage();

	// v4: new version of page 2, first version of page 3.
	state Version v4 = ++version;
	writePage(pager, page, pageID2, v4);
	writePage(pager, page, pageID3, v4);
	pager->setLatestVersion(v4);
	wait(commit(pager));

	wait(read(pager, pageID2, v4, v4));

	// v5: written but never made the latest version; page 2 is freed at v5 below.
	state Version v5 = ++version;
	writePage(pager, page, pageID2, v5);

	state LogicalPageID pageID4 = pager->allocateLogicalPage();
	writePage(pager, page, pageID4, v5);

	state Version v6 = ++version;
	pager->freeLogicalPage(pageID2, v5);
	// Page 3 was first written at v4, so freeing it at v3 predates all of its versions.
	pager->freeLogicalPage(pageID3, v3);
	pager->setLatestVersion(v6);
	wait(commit(pager));

	pager->forgetVersions(0, v4);
	// NOTE(review): the latest version is not advanced between this commit and the
	// previous one (nor before the final commit below); a pager that requires the
	// version to advance between commits would assert here - confirm this is intended.
	wait(commit(pager));

	wait(delay(3.0));

	wait(commit(pager));

	return Void();
}
440
441 /*
442 TEST_CASE("/fdbserver/memorypager/simple") {
443 state IPager *pager = new MemoryPager();
444
445 wait(simplePagerTest(pager));
446
447 Future<Void> closedFuture = pager->onClosed();
448 pager->dispose();
449
450 wait(closedFuture);
451 return Void();
452 }
453 */
454
455 const PhysicalPageID MemoryPager::INVALID_PAGE = nullptr;
456