1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
#include <assert.h>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#if defined(LEVELDB_PLATFORM_ANDROID)
#include <sys/stat.h>
#endif

#include <algorithm>
#include <deque>
#include <limits>

#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/posix_logger.h"
27
28 namespace leveldb {
29
30 namespace {
31
IOError(const std::string & context,int err_number)32 static Status IOError(const std::string& context, int err_number) {
33 return Status::IOError(context, strerror(err_number));
34 }
35
36 class PosixSequentialFile: public SequentialFile {
37 private:
38 std::string filename_;
39 FILE* file_;
40
41 public:
PosixSequentialFile(const std::string & fname,FILE * f)42 PosixSequentialFile(const std::string& fname, FILE* f)
43 : filename_(fname), file_(f) { }
~PosixSequentialFile()44 virtual ~PosixSequentialFile() { fclose(file_); }
45
Read(size_t n,Slice * result,char * scratch)46 virtual Status Read(size_t n, Slice* result, char* scratch) {
47 Status s;
48 size_t r = fread_unlocked(scratch, 1, n, file_);
49 *result = Slice(scratch, r);
50 if (r < n) {
51 if (feof(file_)) {
52 // We leave status as ok if we hit the end of the file
53 } else {
54 // A partial read with an error: return a non-ok status
55 s = IOError(filename_, errno);
56 }
57 }
58 return s;
59 }
60
Skip(uint64_t n)61 virtual Status Skip(uint64_t n) {
62 if (fseek(file_, n, SEEK_CUR)) {
63 return IOError(filename_, errno);
64 }
65 return Status::OK();
66 }
67 };
68
69 class PosixRandomAccessFile: public RandomAccessFile {
70 private:
71 std::string filename_;
72 int fd_;
73
74 public:
PosixRandomAccessFile(const std::string & fname,int fd)75 PosixRandomAccessFile(const std::string& fname, int fd)
76 : filename_(fname), fd_(fd) { }
~PosixRandomAccessFile()77 virtual ~PosixRandomAccessFile() { close(fd_); }
78
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const79 virtual Status Read(uint64_t offset, size_t n, Slice* result,
80 char* scratch) const {
81 Status s;
82 ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
83 *result = Slice(scratch, (r < 0) ? 0 : r);
84 if (r < 0) {
85 // An error: return a non-ok status
86 s = IOError(filename_, errno);
87 }
88 return s;
89 }
90 };
91
92 // We preallocate up to an extra megabyte and use memcpy to append new
93 // data to the file. This is safe since we either properly close the
94 // file before reading from it, or for log files, the reading code
95 // knows enough to skip zero suffixes.
96 class PosixMmapFile : public WritableFile {
97 private:
98 std::string filename_;
99 int fd_;
100 size_t page_size_;
101 size_t map_size_; // How much extra memory to map at a time
102 char* base_; // The mapped region
103 char* limit_; // Limit of the mapped region
104 char* dst_; // Where to write next (in range [base_,limit_])
105 char* last_sync_; // Where have we synced up to
106 uint64_t file_offset_; // Offset of base_ in file
107
108 // Have we done an munmap of unsynced data?
109 bool pending_sync_;
110
111 // Roundup x to a multiple of y
Roundup(size_t x,size_t y)112 static size_t Roundup(size_t x, size_t y) {
113 return ((x + y - 1) / y) * y;
114 }
115
TruncateToPageBoundary(size_t s)116 size_t TruncateToPageBoundary(size_t s) {
117 s -= (s & (page_size_ - 1));
118 assert((s % page_size_) == 0);
119 return s;
120 }
121
UnmapCurrentRegion()122 bool UnmapCurrentRegion() {
123 bool result = true;
124 if (base_ != NULL) {
125 if (last_sync_ < limit_) {
126 // Defer syncing this data until next Sync() call, if any
127 pending_sync_ = true;
128 }
129 if (munmap(base_, limit_ - base_) != 0) {
130 result = false;
131 }
132 file_offset_ += limit_ - base_;
133 base_ = NULL;
134 limit_ = NULL;
135 last_sync_ = NULL;
136 dst_ = NULL;
137
138 // Increase the amount we map the next time, but capped at 1MB
139 if (map_size_ < (1<<20)) {
140 map_size_ *= 2;
141 }
142 }
143 return result;
144 }
145
MapNewRegion()146 bool MapNewRegion() {
147 assert(base_ == NULL);
148 if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
149 return false;
150 }
151 void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
152 fd_, file_offset_);
153 if (ptr == MAP_FAILED) {
154 return false;
155 }
156 base_ = reinterpret_cast<char*>(ptr);
157 limit_ = base_ + map_size_;
158 dst_ = base_;
159 last_sync_ = base_;
160 return true;
161 }
162
163 public:
PosixMmapFile(const std::string & fname,int fd,size_t page_size)164 PosixMmapFile(const std::string& fname, int fd, size_t page_size)
165 : filename_(fname),
166 fd_(fd),
167 page_size_(page_size),
168 map_size_(Roundup(65536, page_size)),
169 base_(NULL),
170 limit_(NULL),
171 dst_(NULL),
172 last_sync_(NULL),
173 file_offset_(0),
174 pending_sync_(false) {
175 assert((page_size & (page_size - 1)) == 0);
176 }
177
178
~PosixMmapFile()179 ~PosixMmapFile() {
180 if (fd_ >= 0) {
181 PosixMmapFile::Close();
182 }
183 }
184
Append(const Slice & data)185 virtual Status Append(const Slice& data) {
186 const char* src = data.data();
187 size_t left = data.size();
188 while (left > 0) {
189 assert(base_ <= dst_);
190 assert(dst_ <= limit_);
191 size_t avail = limit_ - dst_;
192 if (avail == 0) {
193 if (!UnmapCurrentRegion() ||
194 !MapNewRegion()) {
195 return IOError(filename_, errno);
196 }
197 }
198
199 size_t n = (left <= avail) ? left : avail;
200 memcpy(dst_, src, n);
201 dst_ += n;
202 src += n;
203 left -= n;
204 }
205 return Status::OK();
206 }
207
Close()208 virtual Status Close() {
209 Status s;
210 size_t unused = limit_ - dst_;
211 if (!UnmapCurrentRegion()) {
212 s = IOError(filename_, errno);
213 } else if (unused > 0) {
214 // Trim the extra space at the end of the file
215 if (ftruncate(fd_, file_offset_ - unused) < 0) {
216 s = IOError(filename_, errno);
217 }
218 }
219
220 if (close(fd_) < 0) {
221 if (s.ok()) {
222 s = IOError(filename_, errno);
223 }
224 }
225
226 fd_ = -1;
227 base_ = NULL;
228 limit_ = NULL;
229 return s;
230 }
231
Flush()232 virtual Status Flush() {
233 return Status::OK();
234 }
235
Sync()236 virtual Status Sync() {
237 Status s;
238
239 if (pending_sync_) {
240 // Some unmapped data was not synced
241 pending_sync_ = false;
242 if (fdatasync(fd_) < 0) {
243 s = IOError(filename_, errno);
244 }
245 }
246
247 if (dst_ > last_sync_) {
248 // Find the beginnings of the pages that contain the first and last
249 // bytes to be synced.
250 size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
251 size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
252 last_sync_ = dst_;
253 if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
254 s = IOError(filename_, errno);
255 }
256 }
257
258 return s;
259 }
260 };
261
// Acquires (lock == true) or releases (lock == false) a non-blocking fcntl()
// write lock covering the entire file behind |fd|. Clears errno first and
// returns fcntl()'s result: 0 on success, -1 with errno set on failure.
static int LockOrUnlock(int fd, bool lock) {
  struct flock region;
  memset(&region, 0, sizeof(region));
  // l_start == 0 together with l_len == 0 spans the whole file.
  region.l_whence = SEEK_SET;
  region.l_start = 0;
  region.l_len = 0;
  if (lock) {
    region.l_type = F_WRLCK;
  } else {
    region.l_type = F_UNLCK;
  }
  errno = 0;
  return fcntl(fd, F_SETLK, &region);
}
272
273 class PosixFileLock : public FileLock {
274 public:
275 int fd_;
276 };
277
278 class PosixEnv : public Env {
279 public:
280 PosixEnv();
~PosixEnv()281 virtual ~PosixEnv() {
282 fprintf(stderr, "Destroying Env::Default()\n");
283 exit(1);
284 }
285
NewSequentialFile(const std::string & fname,SequentialFile ** result)286 virtual Status NewSequentialFile(const std::string& fname,
287 SequentialFile** result) {
288 FILE* f = fopen(fname.c_str(), "r");
289 if (f == NULL) {
290 *result = NULL;
291 return IOError(fname, errno);
292 } else {
293 *result = new PosixSequentialFile(fname, f);
294 return Status::OK();
295 }
296 }
297
NewRandomAccessFile(const std::string & fname,RandomAccessFile ** result)298 virtual Status NewRandomAccessFile(const std::string& fname,
299 RandomAccessFile** result) {
300 int fd = open(fname.c_str(), O_RDONLY);
301 if (fd < 0) {
302 *result = NULL;
303 return IOError(fname, errno);
304 }
305 *result = new PosixRandomAccessFile(fname, fd);
306 return Status::OK();
307 }
308
NewWritableFile(const std::string & fname,WritableFile ** result)309 virtual Status NewWritableFile(const std::string& fname,
310 WritableFile** result) {
311 Status s;
312 const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
313 if (fd < 0) {
314 *result = NULL;
315 s = IOError(fname, errno);
316 } else {
317 *result = new PosixMmapFile(fname, fd, page_size_);
318 }
319 return s;
320 }
321
FileExists(const std::string & fname)322 virtual bool FileExists(const std::string& fname) {
323 return access(fname.c_str(), F_OK) == 0;
324 }
325
GetChildren(const std::string & dir,std::vector<std::string> * result)326 virtual Status GetChildren(const std::string& dir,
327 std::vector<std::string>* result) {
328 result->clear();
329 DIR* d = opendir(dir.c_str());
330 if (d == NULL) {
331 return IOError(dir, errno);
332 }
333 struct dirent* entry;
334 while ((entry = readdir(d)) != NULL) {
335 result->push_back(entry->d_name);
336 }
337 closedir(d);
338 return Status::OK();
339 }
340
DeleteFile(const std::string & fname)341 virtual Status DeleteFile(const std::string& fname) {
342 Status result;
343 if (unlink(fname.c_str()) != 0) {
344 result = IOError(fname, errno);
345 }
346 return result;
347 };
348
CreateDir(const std::string & name)349 virtual Status CreateDir(const std::string& name) {
350 Status result;
351 if (mkdir(name.c_str(), 0755) != 0) {
352 result = IOError(name, errno);
353 }
354 return result;
355 };
356
DeleteDir(const std::string & name)357 virtual Status DeleteDir(const std::string& name) {
358 Status result;
359 if (rmdir(name.c_str()) != 0) {
360 result = IOError(name, errno);
361 }
362 return result;
363 };
364
GetFileSize(const std::string & fname,uint64_t * size)365 virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
366 Status s;
367 struct stat sbuf;
368 if (stat(fname.c_str(), &sbuf) != 0) {
369 *size = 0;
370 s = IOError(fname, errno);
371 } else {
372 *size = sbuf.st_size;
373 }
374 return s;
375 }
376
RenameFile(const std::string & src,const std::string & target)377 virtual Status RenameFile(const std::string& src, const std::string& target) {
378 Status result;
379 if (rename(src.c_str(), target.c_str()) != 0) {
380 result = IOError(src, errno);
381 }
382 return result;
383 }
384
LockFile(const std::string & fname,FileLock ** lock)385 virtual Status LockFile(const std::string& fname, FileLock** lock) {
386 *lock = NULL;
387 Status result;
388 int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
389 if (fd < 0) {
390 result = IOError(fname, errno);
391 } else if (LockOrUnlock(fd, true) == -1) {
392 result = IOError("lock " + fname, errno);
393 close(fd);
394 } else {
395 PosixFileLock* my_lock = new PosixFileLock;
396 my_lock->fd_ = fd;
397 *lock = my_lock;
398 }
399 return result;
400 }
401
UnlockFile(FileLock * lock)402 virtual Status UnlockFile(FileLock* lock) {
403 PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
404 Status result;
405 if (LockOrUnlock(my_lock->fd_, false) == -1) {
406 result = IOError("unlock", errno);
407 }
408 close(my_lock->fd_);
409 delete my_lock;
410 return result;
411 }
412
413 virtual void Schedule(void (*function)(void*), void* arg);
414
415 virtual void StartThread(void (*function)(void* arg), void* arg);
416
GetTestDirectory(std::string * result)417 virtual Status GetTestDirectory(std::string* result) {
418 const char* env = getenv("TEST_TMPDIR");
419 if (env && env[0] != '\0') {
420 *result = env;
421 } else {
422 char buf[100];
423 snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
424 *result = buf;
425 }
426 // Directory may already exist
427 CreateDir(*result);
428 return Status::OK();
429 }
430
gettid()431 static uint64_t gettid() {
432 pthread_t tid = pthread_self();
433 uint64_t thread_id = 0;
434 memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
435 return thread_id;
436 }
437
NewLogger(const std::string & fname,Logger ** result)438 virtual Status NewLogger(const std::string& fname, Logger** result) {
439 FILE* f = fopen(fname.c_str(), "w");
440 if (f == NULL) {
441 *result = NULL;
442 return IOError(fname, errno);
443 } else {
444 *result = new PosixLogger(f, &PosixEnv::gettid);
445 return Status::OK();
446 }
447 }
448
NowMicros()449 virtual uint64_t NowMicros() {
450 struct timeval tv;
451 gettimeofday(&tv, NULL);
452 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
453 }
454
SleepForMicroseconds(int micros)455 virtual void SleepForMicroseconds(int micros) {
456 usleep(micros);
457 }
458
459 private:
PthreadCall(const char * label,int result)460 void PthreadCall(const char* label, int result) {
461 if (result != 0) {
462 fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
463 exit(1);
464 }
465 }
466
467 // BGThread() is the body of the background thread
468 void BGThread();
BGThreadWrapper(void * arg)469 static void* BGThreadWrapper(void* arg) {
470 reinterpret_cast<PosixEnv*>(arg)->BGThread();
471 return NULL;
472 }
473
474 size_t page_size_;
475 pthread_mutex_t mu_;
476 pthread_cond_t bgsignal_;
477 pthread_t bgthread_;
478 bool started_bgthread_;
479
480 // Entry per Schedule() call
481 struct BGItem { void* arg; void (*function)(void*); };
482 typedef std::deque<BGItem> BGQueue;
483 BGQueue queue_;
484 };
485
PosixEnv()486 PosixEnv::PosixEnv() : page_size_(getpagesize()),
487 started_bgthread_(false) {
488 PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
489 PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
490 }
491
Schedule(void (* function)(void *),void * arg)492 void PosixEnv::Schedule(void (*function)(void*), void* arg) {
493 PthreadCall("lock", pthread_mutex_lock(&mu_));
494
495 // Start background thread if necessary
496 if (!started_bgthread_) {
497 started_bgthread_ = true;
498 PthreadCall(
499 "create thread",
500 pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
501 }
502
503 // If the queue is currently empty, the background thread may currently be
504 // waiting.
505 if (queue_.empty()) {
506 PthreadCall("signal", pthread_cond_signal(&bgsignal_));
507 }
508
509 // Add to priority queue
510 queue_.push_back(BGItem());
511 queue_.back().function = function;
512 queue_.back().arg = arg;
513
514 PthreadCall("unlock", pthread_mutex_unlock(&mu_));
515 }
516
BGThread()517 void PosixEnv::BGThread() {
518 while (true) {
519 // Wait until there is an item that is ready to run
520 PthreadCall("lock", pthread_mutex_lock(&mu_));
521 while (queue_.empty()) {
522 PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
523 }
524
525 void (*function)(void*) = queue_.front().function;
526 void* arg = queue_.front().arg;
527 queue_.pop_front();
528
529 PthreadCall("unlock", pthread_mutex_unlock(&mu_));
530 (*function)(arg);
531 }
532 }
533
namespace {
// Heap-allocated argument bundle passed to StartThreadWrapper, which takes
// ownership of it.
struct StartThreadState {
  void (*user_function)(void*);
  void* arg;
};
}
// pthread entry point used by PosixEnv::StartThread. |arg| must be a
// heap-allocated StartThreadState; it is deleted here before the user
// function runs.
static void* StartThreadWrapper(void* arg) {
  StartThreadState* state = static_cast<StartThreadState*>(arg);
  void (*user_function)(void*) = state->user_function;
  void* user_arg = state->arg;
  // Free the bundle first so it is not leaked if user_function never
  // returns (e.g. runs forever or calls pthread_exit()).
  delete state;
  user_function(user_arg);
  return NULL;
}
546
StartThread(void (* function)(void * arg),void * arg)547 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
548 pthread_t t;
549 StartThreadState* state = new StartThreadState;
550 state->user_function = function;
551 state->arg = arg;
552 PthreadCall("start thread",
553 pthread_create(&t, NULL, &StartThreadWrapper, state));
554 }
555
556 }
557
558 static pthread_once_t once = PTHREAD_ONCE_INIT;
559 static Env* default_env;
InitDefaultEnv()560 static void InitDefaultEnv() { default_env = new PosixEnv; }
561
Default()562 Env* Env::Default() {
563 pthread_once(&once, InitDefaultEnv);
564 return default_env;
565 }
566
567 }
568