// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//

#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <stdio.h>
#include <time.h>
#include <algorithm>
#include <iostream>
#include <sstream>
#include "logging/logging.h"
#include "rocksdb/status.h"
#include "util/string_util.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// API to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//
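//
// A minimal usage sketch (the namenode address below is hypothetical):
//
//   Env* hdfs = nullptr;
//   Status s = NewHdfsEnv(&hdfs, "hdfs://namenode:9000/");
//   if (s.ok()) {
//     Options options;
//     options.env = hdfs;
//     // DB::Open(options, ...) now routes all file I/O through HDFS.
//   }
//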

namespace ROCKSDB_NAMESPACE {

namespace {

// Construct an error Status for the given context from an errno value.
static Status IOError(const std::string& context, int err_number) {
  return (err_number == ENOSPC)
             ? Status::NoSpace(context, errnoStr(err_number).c_str())
             : (err_number == ENOENT)
                   ? Status::PathNotFound(context, errnoStr(err_number).c_str())
                   : Status::IOError(context, errnoStr(err_number).c_str());
}
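
// Note: libhdfs generally reports failures through errno; callers capture it
// immediately after a failed call and forward it to IOError() above.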

// Assume that there is one global logger for now. It is not thread-safe,
// but that is fine because the logger is initialized at db-open time.
static Logger* mylog = nullptr;

// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  bool isValid() { return hfile_ != nullptr; }

  // Sequential access: read data at the current offset in the file.
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
                    filename_.c_str(), n);

    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit an error or EOF;
    // hdfsRead() may return fewer bytes than requested.
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }

    return s;
  }

  // Random access: read data from the specified offset in the file.
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    tSize bytes_read =
        hdfsPread(fileSys_, hfile_, offset, static_cast<void*>(scratch),
                  static_cast<tSize>(n));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status.
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // Get the current offset in the file.
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // Seek to the new offset in the file.
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // Returns true if we are at the end of file, false otherwise.
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    return hdfsTell(fileSys_, hfile_) == fileSize();
  }

  // The current size of the file.
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};

// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
                   const EnvOptions& options)
      : WritableFile(options),
        fileSys_(fileSys),
        filename_(fname),
        hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }

  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  using WritableFile::Append;

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() { return hfile_ != nullptr; }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() { return filename_; }

  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    if (ret != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() { return Status::OK(); }

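  // Roughly: hdfsFlush() pushes the client's buffered data toward the
  // datanodes, while hdfsHSync() additionally asks that it be persisted,
  // similar to POSIX fsync(). Calling both gives the strongest durability
  // guarantee libhdfs offers.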
  virtual Status Sync() {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file.
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
        static_cast<tSize>(size)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};

// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread.

  Status HdfsCloseHelper() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
    return Status::OK();
  }

 protected:
  virtual Status CloseImpl() override { return HdfsCloseHelper(); }

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  ~HdfsLogger() override {
    if (!closed_) {
      closed_ = true;
      HdfsCloseHelper();
    }
  }

  using Logger::Logv;
  void Logv(const char* format, va_list ap) override {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack-allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
                    t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message.
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary.
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with a larger buffer.
        } else {
          p = limit - 1;
        }
      }

      // Add a newline if necessary.
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// Open a file for sequential reading.
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  std::unique_ptr<SequentialFile>* result,
                                  const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}

// Open a file for random reading.
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    std::unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// Create a new file for writing.
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                std::unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  Status s;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}

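// Directory handle for HDFS. Directory metadata is managed synchronously by
// the namenode and libhdfs exposes no directory fsync, so Fsync() is a no-op.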
class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

  Status Fsync() override { return Status::OK(); }

  int GetFd() const { return fd_; }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             std::unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name + ".\n");
  }
}

Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}

Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo =
          hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          std::string pathname(pHdfsFileInfo[i].mName);
          size_t pos = pathname.rfind("/");
          if (std::string::npos != pos) {
            result->push_back(pathname.substr(pos + 1));
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates an error.
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed");
        throw HdfsFatalException(
            "hdfsListDirectory call failed with negative error.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}

Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}

Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic. The state might change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) { return DeleteFile(name); }

Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow a rename if the target
// already exists, so we delete the target before attempting the rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
  // There isn't a very good way to atomically check and create a file
  // via libhdfs, so locking is a no-op here.
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }

Status HdfsEnv::NewLogger(const std::string& fname,
                          std::shared_ptr<Logger>* result) {
  // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
  // option is only intended for WAL/flush/compaction writes, so turn it off in
  // the logger.
  EnvOptions options;
  options.strict_bytes_per_sync = false;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h;  // uncomment this for detailed logging
  }
  return Status::OK();
}

Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str());
  if (pFileInfo != nullptr) {
    if (is_dir != nullptr) {
      *is_dir = (pFileInfo->mKind == kObjectKindDirectory);
    }
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(path, errno);
}

// The factory method for creating an HDFS Env.
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// Dummy placeholders used when HDFS is not available.
namespace ROCKSDB_NAMESPACE {
Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
                                  std::unique_ptr<SequentialFile>* /*result*/,
                                  const EnvOptions& /*options*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}
}  // namespace ROCKSDB_NAMESPACE

#endif