1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6
7 #include "rocksdb/env.h"
8 #include "hdfs/env_hdfs.h"
9
10 #ifdef USE_HDFS
11 #ifndef ROCKSDB_HDFS_FILE_C
12 #define ROCKSDB_HDFS_FILE_C
13
#include <stdio.h>
#include <sys/time.h>
#include <time.h>

#include <algorithm>
#include <iostream>
#include <limits>
#include <memory>
#include <sstream>

#include "logging/logging.h"
#include "rocksdb/status.h"
#include "util/string_util.h"
23
// Symbolic names for the integer results of libhdfs calls used in this file.
// hdfsExists() result that this file treats as "the path exists".
#define HDFS_EXISTS 0
// hdfsExists() result that this file treats as "the path does not exist".
#define HDFS_DOESNT_EXIST -1
// Presumably the generic libhdfs success code; it is not referenced in the
// visible part of this file.
#define HDFS_SUCCESS 0
27
28 //
29 // This file defines an HDFS environment for rocksdb. It uses the libhdfs
30 // api to access HDFS. All HDFS files created by one instance of rocksdb
31 // will reside on the same HDFS cluster.
32 //
33
34 namespace ROCKSDB_NAMESPACE {
35
36 namespace {
37
38 // Log error message
IOError(const std::string & context,int err_number)39 static Status IOError(const std::string& context, int err_number) {
40 return (err_number == ENOSPC)
41 ? Status::NoSpace(context, strerror(err_number))
42 : (err_number == ENOENT)
43 ? Status::PathNotFound(context, strerror(err_number))
44 : Status::IOError(context, strerror(err_number));
45 }
46
// Assume, for now, that there is a single global logger. Access to it is not
// synchronized; that is acceptable only because the logger is expected to be
// installed at db-open time.
// NOTE(review): the only place in this file that would install a logger here
// (in HdfsEnv::NewLogger) is commented out, so this stays nullptr unless that
// line is re-enabled.
static Logger* mylog = nullptr;
50
51 // Used for reading a file from HDFS. It implements both sequential-read
52 // access methods as well as random read access methods.
53 class HdfsReadableFile : virtual public SequentialFile,
54 virtual public RandomAccessFile {
55 private:
56 hdfsFS fileSys_;
57 std::string filename_;
58 hdfsFile hfile_;
59
60 public:
HdfsReadableFile(hdfsFS fileSys,const std::string & fname)61 HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
62 : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
63 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
64 filename_.c_str());
65 hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
66 ROCKS_LOG_DEBUG(mylog,
67 "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
68 filename_.c_str(), hfile_);
69 }
70
~HdfsReadableFile()71 virtual ~HdfsReadableFile() {
72 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
73 filename_.c_str());
74 hdfsCloseFile(fileSys_, hfile_);
75 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
76 filename_.c_str());
77 hfile_ = nullptr;
78 }
79
isValid()80 bool isValid() {
81 return hfile_ != nullptr;
82 }
83
84 // sequential access, read data at current offset in file
Read(size_t n,Slice * result,char * scratch)85 virtual Status Read(size_t n, Slice* result, char* scratch) {
86 Status s;
87 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
88 filename_.c_str(), n);
89
90 char* buffer = scratch;
91 size_t total_bytes_read = 0;
92 tSize bytes_read = 0;
93 tSize remaining_bytes = (tSize)n;
94
95 // Read a total of n bytes repeatedly until we hit error or eof
96 while (remaining_bytes > 0) {
97 bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
98 if (bytes_read <= 0) {
99 break;
100 }
101 assert(bytes_read <= remaining_bytes);
102
103 total_bytes_read += bytes_read;
104 remaining_bytes -= bytes_read;
105 buffer += bytes_read;
106 }
107 assert(total_bytes_read <= n);
108
109 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
110 filename_.c_str());
111
112 if (bytes_read < 0) {
113 s = IOError(filename_, errno);
114 } else {
115 *result = Slice(scratch, total_bytes_read);
116 }
117
118 return s;
119 }
120
121 // random access, read data from specified offset in file
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const122 virtual Status Read(uint64_t offset, size_t n, Slice* result,
123 char* scratch) const {
124 Status s;
125 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
126 filename_.c_str());
127 ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
128 (void*)scratch, (tSize)n);
129 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
130 filename_.c_str());
131 *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
132 if (bytes_read < 0) {
133 // An error: return a non-ok status
134 s = IOError(filename_, errno);
135 }
136 return s;
137 }
138
Skip(uint64_t n)139 virtual Status Skip(uint64_t n) {
140 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
141 filename_.c_str());
142 // get current offset from file
143 tOffset current = hdfsTell(fileSys_, hfile_);
144 if (current < 0) {
145 return IOError(filename_, errno);
146 }
147 // seek to new offset in file
148 tOffset newoffset = current + n;
149 int val = hdfsSeek(fileSys_, hfile_, newoffset);
150 if (val < 0) {
151 return IOError(filename_, errno);
152 }
153 return Status::OK();
154 }
155
156 private:
157
158 // returns true if we are at the end of file, false otherwise
feof()159 bool feof() {
160 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
161 filename_.c_str());
162 if (hdfsTell(fileSys_, hfile_) == fileSize()) {
163 return true;
164 }
165 return false;
166 }
167
168 // the current size of the file
fileSize()169 tOffset fileSize() {
170 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
171 filename_.c_str());
172 hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
173 tOffset size = 0L;
174 if (pFileInfo != nullptr) {
175 size = pFileInfo->mSize;
176 hdfsFreeFileInfo(pFileInfo, 1);
177 } else {
178 throw HdfsFatalException("fileSize on unknown file " + filename_);
179 }
180 return size;
181 }
182 };
183
184 // Appends to an existing file in HDFS.
185 class HdfsWritableFile: public WritableFile {
186 private:
187 hdfsFS fileSys_;
188 std::string filename_;
189 hdfsFile hfile_;
190
191 public:
HdfsWritableFile(hdfsFS fileSys,const std::string & fname,const EnvOptions & options)192 HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
193 const EnvOptions& options)
194 : WritableFile(options),
195 fileSys_(fileSys),
196 filename_(fname),
197 hfile_(nullptr) {
198 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
199 filename_.c_str());
200 hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
201 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
202 filename_.c_str());
203 assert(hfile_ != nullptr);
204 }
~HdfsWritableFile()205 virtual ~HdfsWritableFile() {
206 if (hfile_ != nullptr) {
207 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
208 filename_.c_str());
209 hdfsCloseFile(fileSys_, hfile_);
210 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
211 filename_.c_str());
212 hfile_ = nullptr;
213 }
214 }
215
216 // If the file was successfully created, then this returns true.
217 // Otherwise returns false.
isValid()218 bool isValid() {
219 return hfile_ != nullptr;
220 }
221
222 // The name of the file, mostly needed for debug logging.
getName()223 const std::string& getName() {
224 return filename_;
225 }
226
Append(const Slice & data)227 virtual Status Append(const Slice& data) {
228 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
229 filename_.c_str());
230 const char* src = data.data();
231 size_t left = data.size();
232 size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
233 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
234 filename_.c_str());
235 if (ret != left) {
236 return IOError(filename_, errno);
237 }
238 return Status::OK();
239 }
240
Flush()241 virtual Status Flush() {
242 return Status::OK();
243 }
244
Sync()245 virtual Status Sync() {
246 Status s;
247 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
248 filename_.c_str());
249 if (hdfsFlush(fileSys_, hfile_) == -1) {
250 return IOError(filename_, errno);
251 }
252 if (hdfsHSync(fileSys_, hfile_) == -1) {
253 return IOError(filename_, errno);
254 }
255 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
256 filename_.c_str());
257 return Status::OK();
258 }
259
260 // This is used by HdfsLogger to write data to the debug log file
Append(const char * src,size_t size)261 virtual Status Append(const char* src, size_t size) {
262 if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
263 static_cast<tSize>(size)) {
264 return IOError(filename_, errno);
265 }
266 return Status::OK();
267 }
268
Close()269 virtual Status Close() {
270 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
271 filename_.c_str());
272 if (hdfsCloseFile(fileSys_, hfile_) != 0) {
273 return IOError(filename_, errno);
274 }
275 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
276 filename_.c_str());
277 hfile_ = nullptr;
278 return Status::OK();
279 }
280 };
281
282 // The object that implements the debug logs to reside in HDFS.
283 class HdfsLogger : public Logger {
284 private:
285 HdfsWritableFile* file_;
286 uint64_t (*gettid_)(); // Return the thread id for the current thread
287
HdfsCloseHelper()288 Status HdfsCloseHelper() {
289 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
290 file_->getName().c_str());
291 if (mylog != nullptr && mylog == this) {
292 mylog = nullptr;
293 }
294 return Status::OK();
295 }
296
297 protected:
CloseImpl()298 virtual Status CloseImpl() override { return HdfsCloseHelper(); }
299
300 public:
HdfsLogger(HdfsWritableFile * f,uint64_t (* gettid)())301 HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
302 : file_(f), gettid_(gettid) {
303 ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
304 file_->getName().c_str());
305 }
306
~HdfsLogger()307 ~HdfsLogger() override {
308 if (!closed_) {
309 closed_ = true;
310 HdfsCloseHelper();
311 }
312 }
313
314 using Logger::Logv;
Logv(const char * format,va_list ap)315 void Logv(const char* format, va_list ap) override {
316 const uint64_t thread_id = (*gettid_)();
317
318 // We try twice: the first time with a fixed-size stack allocated buffer,
319 // and the second time with a much larger dynamically allocated buffer.
320 char buffer[500];
321 for (int iter = 0; iter < 2; iter++) {
322 char* base;
323 int bufsize;
324 if (iter == 0) {
325 bufsize = sizeof(buffer);
326 base = buffer;
327 } else {
328 bufsize = 30000;
329 base = new char[bufsize];
330 }
331 char* p = base;
332 char* limit = base + bufsize;
333
334 struct timeval now_tv;
335 gettimeofday(&now_tv, nullptr);
336 const time_t seconds = now_tv.tv_sec;
337 struct tm t;
338 localtime_r(&seconds, &t);
339 p += snprintf(p, limit - p,
340 "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
341 t.tm_year + 1900,
342 t.tm_mon + 1,
343 t.tm_mday,
344 t.tm_hour,
345 t.tm_min,
346 t.tm_sec,
347 static_cast<int>(now_tv.tv_usec),
348 static_cast<long long unsigned int>(thread_id));
349
350 // Print the message
351 if (p < limit) {
352 va_list backup_ap;
353 va_copy(backup_ap, ap);
354 p += vsnprintf(p, limit - p, format, backup_ap);
355 va_end(backup_ap);
356 }
357
358 // Truncate to available space if necessary
359 if (p >= limit) {
360 if (iter == 0) {
361 continue; // Try again with larger buffer
362 } else {
363 p = limit - 1;
364 }
365 }
366
367 // Add newline if necessary
368 if (p == base || p[-1] != '\n') {
369 *p++ = '\n';
370 }
371
372 assert(p <= limit);
373 file_->Append(base, p-base);
374 file_->Flush();
375 if (base != buffer) {
376 delete[] base;
377 }
378 break;
379 }
380 }
381 };
382
383 } // namespace
384
385 // Finally, the hdfs environment
386
// Presumably the URI scheme prefix that identifies HDFS locations; its use
// is outside the visible part of this file.
const std::string HdfsEnv::kProto = "hdfs://";
// Presumably the separator between path components; its use is outside the
// visible part of this file.
const std::string HdfsEnv::pathsep = "/";
389
390 // open a file for sequential reading
NewSequentialFile(const std::string & fname,std::unique_ptr<SequentialFile> * result,const EnvOptions &)391 Status HdfsEnv::NewSequentialFile(const std::string& fname,
392 std::unique_ptr<SequentialFile>* result,
393 const EnvOptions& /*options*/) {
394 result->reset();
395 HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
396 if (f == nullptr || !f->isValid()) {
397 delete f;
398 *result = nullptr;
399 return IOError(fname, errno);
400 }
401 result->reset(dynamic_cast<SequentialFile*>(f));
402 return Status::OK();
403 }
404
405 // open a file for random reading
NewRandomAccessFile(const std::string & fname,std::unique_ptr<RandomAccessFile> * result,const EnvOptions &)406 Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
407 std::unique_ptr<RandomAccessFile>* result,
408 const EnvOptions& /*options*/) {
409 result->reset();
410 HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
411 if (f == nullptr || !f->isValid()) {
412 delete f;
413 *result = nullptr;
414 return IOError(fname, errno);
415 }
416 result->reset(dynamic_cast<RandomAccessFile*>(f));
417 return Status::OK();
418 }
419
420 // create a new file for writing
NewWritableFile(const std::string & fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)421 Status HdfsEnv::NewWritableFile(const std::string& fname,
422 std::unique_ptr<WritableFile>* result,
423 const EnvOptions& options) {
424 result->reset();
425 Status s;
426 HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
427 if (f == nullptr || !f->isValid()) {
428 delete f;
429 *result = nullptr;
430 return IOError(fname, errno);
431 }
432 result->reset(dynamic_cast<WritableFile*>(f));
433 return Status::OK();
434 }
435
436 class HdfsDirectory : public Directory {
437 public:
HdfsDirectory(int fd)438 explicit HdfsDirectory(int fd) : fd_(fd) {}
~HdfsDirectory()439 ~HdfsDirectory() {}
440
Fsync()441 Status Fsync() override { return Status::OK(); }
442
GetFd() const443 int GetFd() const { return fd_; }
444
445 private:
446 int fd_;
447 };
448
NewDirectory(const std::string & name,std::unique_ptr<Directory> * result)449 Status HdfsEnv::NewDirectory(const std::string& name,
450 std::unique_ptr<Directory>* result) {
451 int value = hdfsExists(fileSys_, name.c_str());
452 switch (value) {
453 case HDFS_EXISTS:
454 result->reset(new HdfsDirectory(0));
455 return Status::OK();
456 default: // fail if the directory doesn't exist
457 ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
458 throw HdfsFatalException("hdfsExists call failed with error " +
459 ToString(value) + " on path " + name +
460 ".\n");
461 }
462 }
463
FileExists(const std::string & fname)464 Status HdfsEnv::FileExists(const std::string& fname) {
465 int value = hdfsExists(fileSys_, fname.c_str());
466 switch (value) {
467 case HDFS_EXISTS:
468 return Status::OK();
469 case HDFS_DOESNT_EXIST:
470 return Status::NotFound();
471 default: // anything else should be an error
472 ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
473 return Status::IOError("hdfsExists call failed with error " +
474 ToString(value) + " on path " + fname + ".\n");
475 }
476 }
477
GetChildren(const std::string & path,std::vector<std::string> * result)478 Status HdfsEnv::GetChildren(const std::string& path,
479 std::vector<std::string>* result) {
480 int value = hdfsExists(fileSys_, path.c_str());
481 switch (value) {
482 case HDFS_EXISTS: { // directory exists
483 int numEntries = 0;
484 hdfsFileInfo* pHdfsFileInfo = 0;
485 pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
486 if (numEntries >= 0) {
487 for(int i = 0; i < numEntries; i++) {
488 std::string pathname(pHdfsFileInfo[i].mName);
489 size_t pos = pathname.rfind("/");
490 if (std::string::npos != pos) {
491 result->push_back(pathname.substr(pos + 1));
492 }
493 }
494 if (pHdfsFileInfo != nullptr) {
495 hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
496 }
497 } else {
498 // numEntries < 0 indicates error
499 ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
500 throw HdfsFatalException(
501 "hdfsListDirectory call failed negative error.\n");
502 }
503 break;
504 }
505 case HDFS_DOESNT_EXIST: // directory does not exist, exit
506 return Status::NotFound();
507 default: // anything else should be an error
508 ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
509 throw HdfsFatalException("hdfsExists call failed with error " +
510 ToString(value) + ".\n");
511 }
512 return Status::OK();
513 }
514
DeleteFile(const std::string & fname)515 Status HdfsEnv::DeleteFile(const std::string& fname) {
516 if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
517 return Status::OK();
518 }
519 return IOError(fname, errno);
520 };
521
CreateDir(const std::string & name)522 Status HdfsEnv::CreateDir(const std::string& name) {
523 if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
524 return Status::OK();
525 }
526 return IOError(name, errno);
527 };
528
CreateDirIfMissing(const std::string & name)529 Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
530 const int value = hdfsExists(fileSys_, name.c_str());
531 // Not atomic. state might change b/w hdfsExists and CreateDir.
532 switch (value) {
533 case HDFS_EXISTS:
534 return Status::OK();
535 case HDFS_DOESNT_EXIST:
536 return CreateDir(name);
537 default: // anything else should be an error
538 ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
539 throw HdfsFatalException("hdfsExists call failed with error " +
540 ToString(value) + ".\n");
541 }
542 };
543
DeleteDir(const std::string & name)544 Status HdfsEnv::DeleteDir(const std::string& name) {
545 return DeleteFile(name);
546 };
547
GetFileSize(const std::string & fname,uint64_t * size)548 Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
549 *size = 0L;
550 hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
551 if (pFileInfo != nullptr) {
552 *size = pFileInfo->mSize;
553 hdfsFreeFileInfo(pFileInfo, 1);
554 return Status::OK();
555 }
556 return IOError(fname, errno);
557 }
558
GetFileModificationTime(const std::string & fname,uint64_t * time)559 Status HdfsEnv::GetFileModificationTime(const std::string& fname,
560 uint64_t* time) {
561 hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
562 if (pFileInfo != nullptr) {
563 *time = static_cast<uint64_t>(pFileInfo->mLastMod);
564 hdfsFreeFileInfo(pFileInfo, 1);
565 return Status::OK();
566 }
567 return IOError(fname, errno);
568
569 }
570
571 // The rename is not atomic. HDFS does not allow a renaming if the
572 // target already exists. So, we delete the target before attempting the
573 // rename.
RenameFile(const std::string & src,const std::string & target)574 Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
575 hdfsDelete(fileSys_, target.c_str(), 1);
576 if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
577 return Status::OK();
578 }
579 return IOError(src, errno);
580 }
581
LockFile(const std::string &,FileLock ** lock)582 Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
583 // there isn's a very good way to atomically check and create
584 // a file via libhdfs
585 *lock = nullptr;
586 return Status::OK();
587 }
588
UnlockFile(FileLock *)589 Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }
590
NewLogger(const std::string & fname,std::shared_ptr<Logger> * result)591 Status HdfsEnv::NewLogger(const std::string& fname,
592 std::shared_ptr<Logger>* result) {
593 // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
594 // option is only intended for WAL/flush/compaction writes, so turn it off in
595 // the logger.
596 EnvOptions options;
597 options.strict_bytes_per_sync = false;
598 HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
599 if (f == nullptr || !f->isValid()) {
600 delete f;
601 *result = nullptr;
602 return IOError(fname, errno);
603 }
604 HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
605 result->reset(h);
606 if (mylog == nullptr) {
607 // mylog = h; // uncomment this for detailed logging
608 }
609 return Status::OK();
610 }
611
612 // The factory method for creating an HDFS Env
NewHdfsEnv(Env ** hdfs_env,const std::string & fsname)613 Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
614 *hdfs_env = new HdfsEnv(fsname);
615 return Status::OK();
616 }
617 } // namespace ROCKSDB_NAMESPACE
618
619 #endif // ROCKSDB_HDFS_FILE_C
620
621 #else // USE_HDFS
622
623 // dummy placeholders used when HDFS is not available
624 namespace ROCKSDB_NAMESPACE {
NewSequentialFile(const std::string &,std::unique_ptr<SequentialFile> *,const EnvOptions &)625 Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
626 std::unique_ptr<SequentialFile>* /*result*/,
627 const EnvOptions& /*options*/) {
628 return Status::NotSupported("Not compiled with hdfs support");
629 }
630
NewHdfsEnv(Env **,const std::string &)631 Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
632 return Status::NotSupported("Not compiled with hdfs support");
633 }
634 } // namespace ROCKSDB_NAMESPACE
635
636 #endif
637