1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 
7 #include "rocksdb/env.h"
8 #include "hdfs/env_hdfs.h"
9 
10 #ifdef USE_HDFS
11 #ifndef ROCKSDB_HDFS_FILE_C
12 #define ROCKSDB_HDFS_FILE_C
13 
14 #include <stdio.h>
15 #include <time.h>
16 #include <algorithm>
17 #include <iostream>
18 #include <sstream>
19 #include "logging/logging.h"
20 #include "rocksdb/status.h"
21 #include "util/string_util.h"
22 
23 #define HDFS_EXISTS 0
24 #define HDFS_DOESNT_EXIST -1
25 #define HDFS_SUCCESS 0
26 
27 //
28 // This file defines an HDFS environment for rocksdb. It uses the libhdfs
29 // api to access HDFS. All HDFS files created by one instance of rocksdb
30 // will reside on the same HDFS cluster.
31 //
32 
33 namespace ROCKSDB_NAMESPACE {
34 
35 namespace {
36 
37 // Log error message
IOError(const std::string & context,int err_number)38 static Status IOError(const std::string& context, int err_number) {
39   return (err_number == ENOSPC)
40              ? Status::NoSpace(context, errnoStr(err_number).c_str())
41              : (err_number == ENOENT)
42                    ? Status::PathNotFound(context, errnoStr(err_number).c_str())
43                    : Status::IOError(context, errnoStr(err_number).c_str());
44 }
45 
46 // assume that there is one global logger for now. It is not thread-safe,
47 // but need not be because the logger is initialized at db-open time.
48 static Logger* mylog = nullptr;
49 
50 // Used for reading a file from HDFS. It implements both sequential-read
51 // access methods as well as random read access methods.
52 class HdfsReadableFile : virtual public SequentialFile,
53                          virtual public RandomAccessFile {
54  private:
55   hdfsFS fileSys_;
56   std::string filename_;
57   hdfsFile hfile_;
58 
59  public:
HdfsReadableFile(hdfsFS fileSys,const std::string & fname)60   HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
61       : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
62     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
63                     filename_.c_str());
64     hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
65     ROCKS_LOG_DEBUG(mylog,
66                     "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
67                     filename_.c_str(), hfile_);
68   }
69 
~HdfsReadableFile()70   virtual ~HdfsReadableFile() {
71     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
72                     filename_.c_str());
73     hdfsCloseFile(fileSys_, hfile_);
74     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
75                     filename_.c_str());
76     hfile_ = nullptr;
77   }
78 
isValid()79   bool isValid() {
80     return hfile_ != nullptr;
81   }
82 
83   // sequential access, read data at current offset in file
Read(size_t n,Slice * result,char * scratch)84   virtual Status Read(size_t n, Slice* result, char* scratch) {
85     Status s;
86     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
87                     filename_.c_str(), n);
88 
89     char* buffer = scratch;
90     size_t total_bytes_read = 0;
91     tSize bytes_read = 0;
92     tSize remaining_bytes = (tSize)n;
93 
94     // Read a total of n bytes repeatedly until we hit error or eof
95     while (remaining_bytes > 0) {
96       bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
97       if (bytes_read <= 0) {
98         break;
99       }
100       assert(bytes_read <= remaining_bytes);
101 
102       total_bytes_read += bytes_read;
103       remaining_bytes -= bytes_read;
104       buffer += bytes_read;
105     }
106     assert(total_bytes_read <= n);
107 
108     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
109                     filename_.c_str());
110 
111     if (bytes_read < 0) {
112       s = IOError(filename_, errno);
113     } else {
114       *result = Slice(scratch, total_bytes_read);
115     }
116 
117     return s;
118   }
119 
120   // random access, read data from specified offset in file
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const121   virtual Status Read(uint64_t offset, size_t n, Slice* result,
122                       char* scratch) const {
123     Status s;
124     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
125                     filename_.c_str());
126     tSize bytes_read =
127         hdfsPread(fileSys_, hfile_, offset, static_cast<void*>(scratch),
128                   static_cast<tSize>(n));
129     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
130                     filename_.c_str());
131     *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
132     if (bytes_read < 0) {
133       // An error: return a non-ok status
134       s = IOError(filename_, errno);
135     }
136     return s;
137   }
138 
Skip(uint64_t n)139   virtual Status Skip(uint64_t n) {
140     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
141                     filename_.c_str());
142     // get current offset from file
143     tOffset current = hdfsTell(fileSys_, hfile_);
144     if (current < 0) {
145       return IOError(filename_, errno);
146     }
147     // seek to new offset in file
148     tOffset newoffset = current + n;
149     int val = hdfsSeek(fileSys_, hfile_, newoffset);
150     if (val < 0) {
151       return IOError(filename_, errno);
152     }
153     return Status::OK();
154   }
155 
156  private:
157 
158   // returns true if we are at the end of file, false otherwise
feof()159   bool feof() {
160     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
161                     filename_.c_str());
162     if (hdfsTell(fileSys_, hfile_) == fileSize()) {
163       return true;
164     }
165     return false;
166   }
167 
168   // the current size of the file
fileSize()169   tOffset fileSize() {
170     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
171                     filename_.c_str());
172     hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
173     tOffset size = 0L;
174     if (pFileInfo != nullptr) {
175       size = pFileInfo->mSize;
176       hdfsFreeFileInfo(pFileInfo, 1);
177     } else {
178       throw HdfsFatalException("fileSize on unknown file " + filename_);
179     }
180     return size;
181   }
182 };
183 
184 // Appends to an existing file in HDFS.
185 class HdfsWritableFile: public WritableFile {
186  private:
187   hdfsFS fileSys_;
188   std::string filename_;
189   hdfsFile hfile_;
190 
191  public:
HdfsWritableFile(hdfsFS fileSys,const std::string & fname,const EnvOptions & options)192   HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
193                    const EnvOptions& options)
194       : WritableFile(options),
195         fileSys_(fileSys),
196         filename_(fname),
197         hfile_(nullptr) {
198     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
199                     filename_.c_str());
200     hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
201     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
202                     filename_.c_str());
203     assert(hfile_ != nullptr);
204   }
~HdfsWritableFile()205   virtual ~HdfsWritableFile() {
206     if (hfile_ != nullptr) {
207       ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
208                       filename_.c_str());
209       hdfsCloseFile(fileSys_, hfile_);
210       ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
211                       filename_.c_str());
212       hfile_ = nullptr;
213     }
214   }
215 
216   using WritableFile::Append;
217 
218   // If the file was successfully created, then this returns true.
219   // Otherwise returns false.
isValid()220   bool isValid() {
221     return hfile_ != nullptr;
222   }
223 
224   // The name of the file, mostly needed for debug logging.
getName()225   const std::string& getName() {
226     return filename_;
227   }
228 
Append(const Slice & data)229   virtual Status Append(const Slice& data) {
230     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
231                     filename_.c_str());
232     const char* src = data.data();
233     size_t left = data.size();
234     size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
235     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
236                     filename_.c_str());
237     if (ret != left) {
238       return IOError(filename_, errno);
239     }
240     return Status::OK();
241   }
242 
Flush()243   virtual Status Flush() {
244     return Status::OK();
245   }
246 
Sync()247   virtual Status Sync() {
248     Status s;
249     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
250                     filename_.c_str());
251     if (hdfsFlush(fileSys_, hfile_) == -1) {
252       return IOError(filename_, errno);
253     }
254     if (hdfsHSync(fileSys_, hfile_) == -1) {
255       return IOError(filename_, errno);
256     }
257     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
258                     filename_.c_str());
259     return Status::OK();
260   }
261 
262   // This is used by HdfsLogger to write data to the debug log file
Append(const char * src,size_t size)263   virtual Status Append(const char* src, size_t size) {
264     if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
265         static_cast<tSize>(size)) {
266       return IOError(filename_, errno);
267     }
268     return Status::OK();
269   }
270 
Close()271   virtual Status Close() {
272     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
273                     filename_.c_str());
274     if (hdfsCloseFile(fileSys_, hfile_) != 0) {
275       return IOError(filename_, errno);
276     }
277     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
278                     filename_.c_str());
279     hfile_ = nullptr;
280     return Status::OK();
281   }
282 };
283 
284 // The object that implements the debug logs to reside in HDFS.
285 class HdfsLogger : public Logger {
286  private:
287   HdfsWritableFile* file_;
288   uint64_t (*gettid_)();  // Return the thread id for the current thread
289 
HdfsCloseHelper()290   Status HdfsCloseHelper() {
291     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
292                     file_->getName().c_str());
293     if (mylog != nullptr && mylog == this) {
294       mylog = nullptr;
295     }
296     return Status::OK();
297   }
298 
299  protected:
CloseImpl()300   virtual Status CloseImpl() override { return HdfsCloseHelper(); }
301 
302  public:
HdfsLogger(HdfsWritableFile * f,uint64_t (* gettid)())303   HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
304       : file_(f), gettid_(gettid) {
305     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
306                     file_->getName().c_str());
307   }
308 
~HdfsLogger()309   ~HdfsLogger() override {
310     if (!closed_) {
311       closed_ = true;
312       HdfsCloseHelper();
313     }
314   }
315 
316   using Logger::Logv;
Logv(const char * format,va_list ap)317   void Logv(const char* format, va_list ap) override {
318     const uint64_t thread_id = (*gettid_)();
319 
320     // We try twice: the first time with a fixed-size stack allocated buffer,
321     // and the second time with a much larger dynamically allocated buffer.
322     char buffer[500];
323     for (int iter = 0; iter < 2; iter++) {
324       char* base;
325       int bufsize;
326       if (iter == 0) {
327         bufsize = sizeof(buffer);
328         base = buffer;
329       } else {
330         bufsize = 30000;
331         base = new char[bufsize];
332       }
333       char* p = base;
334       char* limit = base + bufsize;
335 
336       struct timeval now_tv;
337       gettimeofday(&now_tv, nullptr);
338       const time_t seconds = now_tv.tv_sec;
339       struct tm t;
340       localtime_r(&seconds, &t);
341       p += snprintf(p, limit - p,
342                     "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
343                     t.tm_year + 1900,
344                     t.tm_mon + 1,
345                     t.tm_mday,
346                     t.tm_hour,
347                     t.tm_min,
348                     t.tm_sec,
349                     static_cast<int>(now_tv.tv_usec),
350                     static_cast<long long unsigned int>(thread_id));
351 
352       // Print the message
353       if (p < limit) {
354         va_list backup_ap;
355         va_copy(backup_ap, ap);
356         p += vsnprintf(p, limit - p, format, backup_ap);
357         va_end(backup_ap);
358       }
359 
360       // Truncate to available space if necessary
361       if (p >= limit) {
362         if (iter == 0) {
363           continue;       // Try again with larger buffer
364         } else {
365           p = limit - 1;
366         }
367       }
368 
369       // Add newline if necessary
370       if (p == base || p[-1] != '\n') {
371         *p++ = '\n';
372       }
373 
374       assert(p <= limit);
375       file_->Append(base, p-base);
376       file_->Flush();
377       if (base != buffer) {
378         delete[] base;
379       }
380       break;
381     }
382   }
383 };
384 
385 }  // namespace
386 
387 // Finally, the hdfs environment
388 
389 const std::string HdfsEnv::kProto = "hdfs://";
390 const std::string HdfsEnv::pathsep = "/";
391 
392 // open a file for sequential reading
NewSequentialFile(const std::string & fname,std::unique_ptr<SequentialFile> * result,const EnvOptions &)393 Status HdfsEnv::NewSequentialFile(const std::string& fname,
394                                   std::unique_ptr<SequentialFile>* result,
395                                   const EnvOptions& /*options*/) {
396   result->reset();
397   HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
398   if (f == nullptr || !f->isValid()) {
399     delete f;
400     *result = nullptr;
401     return IOError(fname, errno);
402   }
403   result->reset(dynamic_cast<SequentialFile*>(f));
404   return Status::OK();
405 }
406 
407 // open a file for random reading
NewRandomAccessFile(const std::string & fname,std::unique_ptr<RandomAccessFile> * result,const EnvOptions &)408 Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
409                                     std::unique_ptr<RandomAccessFile>* result,
410                                     const EnvOptions& /*options*/) {
411   result->reset();
412   HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
413   if (f == nullptr || !f->isValid()) {
414     delete f;
415     *result = nullptr;
416     return IOError(fname, errno);
417   }
418   result->reset(dynamic_cast<RandomAccessFile*>(f));
419   return Status::OK();
420 }
421 
422 // create a new file for writing
NewWritableFile(const std::string & fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)423 Status HdfsEnv::NewWritableFile(const std::string& fname,
424                                 std::unique_ptr<WritableFile>* result,
425                                 const EnvOptions& options) {
426   result->reset();
427   Status s;
428   HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
429   if (f == nullptr || !f->isValid()) {
430     delete f;
431     *result = nullptr;
432     return IOError(fname, errno);
433   }
434   result->reset(dynamic_cast<WritableFile*>(f));
435   return Status::OK();
436 }
437 
438 class HdfsDirectory : public Directory {
439  public:
HdfsDirectory(int fd)440   explicit HdfsDirectory(int fd) : fd_(fd) {}
~HdfsDirectory()441   ~HdfsDirectory() {}
442 
Fsync()443   Status Fsync() override { return Status::OK(); }
444 
GetFd() const445   int GetFd() const { return fd_; }
446 
447  private:
448   int fd_;
449 };
450 
NewDirectory(const std::string & name,std::unique_ptr<Directory> * result)451 Status HdfsEnv::NewDirectory(const std::string& name,
452                              std::unique_ptr<Directory>* result) {
453   int value = hdfsExists(fileSys_, name.c_str());
454   switch (value) {
455     case HDFS_EXISTS:
456       result->reset(new HdfsDirectory(0));
457       return Status::OK();
458     default:  // fail if the directory doesn't exist
459       ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
460       throw HdfsFatalException("hdfsExists call failed with error " +
461                                ToString(value) + " on path " + name +
462                                ".\n");
463   }
464 }
465 
FileExists(const std::string & fname)466 Status HdfsEnv::FileExists(const std::string& fname) {
467   int value = hdfsExists(fileSys_, fname.c_str());
468   switch (value) {
469     case HDFS_EXISTS:
470       return Status::OK();
471     case HDFS_DOESNT_EXIST:
472       return Status::NotFound();
473     default:  // anything else should be an error
474       ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
475       return Status::IOError("hdfsExists call failed with error " +
476                              ToString(value) + " on path " + fname + ".\n");
477   }
478 }
479 
GetChildren(const std::string & path,std::vector<std::string> * result)480 Status HdfsEnv::GetChildren(const std::string& path,
481                             std::vector<std::string>* result) {
482   int value = hdfsExists(fileSys_, path.c_str());
483   switch (value) {
484     case HDFS_EXISTS: {  // directory exists
485     int numEntries = 0;
486     hdfsFileInfo* pHdfsFileInfo = 0;
487     pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
488     if (numEntries >= 0) {
489       for(int i = 0; i < numEntries; i++) {
490         std::string pathname(pHdfsFileInfo[i].mName);
491         size_t pos = pathname.rfind("/");
492         if (std::string::npos != pos) {
493           result->push_back(pathname.substr(pos + 1));
494         }
495       }
496       if (pHdfsFileInfo != nullptr) {
497         hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
498       }
499     } else {
500       // numEntries < 0 indicates error
501       ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
502       throw HdfsFatalException(
503           "hdfsListDirectory call failed negative error.\n");
504     }
505     break;
506   }
507   case HDFS_DOESNT_EXIST:  // directory does not exist, exit
508     return Status::NotFound();
509   default:          // anything else should be an error
510     ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
511     throw HdfsFatalException("hdfsExists call failed with error " +
512                              ToString(value) + ".\n");
513   }
514   return Status::OK();
515 }
516 
DeleteFile(const std::string & fname)517 Status HdfsEnv::DeleteFile(const std::string& fname) {
518   if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
519     return Status::OK();
520   }
521   return IOError(fname, errno);
522 };
523 
CreateDir(const std::string & name)524 Status HdfsEnv::CreateDir(const std::string& name) {
525   if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
526     return Status::OK();
527   }
528   return IOError(name, errno);
529 };
530 
CreateDirIfMissing(const std::string & name)531 Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
532   const int value = hdfsExists(fileSys_, name.c_str());
533   //  Not atomic. state might change b/w hdfsExists and CreateDir.
534   switch (value) {
535     case HDFS_EXISTS:
536     return Status::OK();
537     case HDFS_DOESNT_EXIST:
538     return CreateDir(name);
539     default:  // anything else should be an error
540       ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
541       throw HdfsFatalException("hdfsExists call failed with error " +
542                                ToString(value) + ".\n");
543   }
544 };
545 
DeleteDir(const std::string & name)546 Status HdfsEnv::DeleteDir(const std::string& name) {
547   return DeleteFile(name);
548 };
549 
GetFileSize(const std::string & fname,uint64_t * size)550 Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
551   *size = 0L;
552   hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
553   if (pFileInfo != nullptr) {
554     *size = pFileInfo->mSize;
555     hdfsFreeFileInfo(pFileInfo, 1);
556     return Status::OK();
557   }
558   return IOError(fname, errno);
559 }
560 
GetFileModificationTime(const std::string & fname,uint64_t * time)561 Status HdfsEnv::GetFileModificationTime(const std::string& fname,
562                                         uint64_t* time) {
563   hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
564   if (pFileInfo != nullptr) {
565     *time = static_cast<uint64_t>(pFileInfo->mLastMod);
566     hdfsFreeFileInfo(pFileInfo, 1);
567     return Status::OK();
568   }
569   return IOError(fname, errno);
570 
571 }
572 
573 // The rename is not atomic. HDFS does not allow a renaming if the
574 // target already exists. So, we delete the target before attempting the
575 // rename.
RenameFile(const std::string & src,const std::string & target)576 Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
577   hdfsDelete(fileSys_, target.c_str(), 1);
578   if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
579     return Status::OK();
580   }
581   return IOError(src, errno);
582 }
583 
LockFile(const std::string &,FileLock ** lock)584 Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
585   // there isn's a very good way to atomically check and create
586   // a file via libhdfs
587   *lock = nullptr;
588   return Status::OK();
589 }
590 
UnlockFile(FileLock *)591 Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }
592 
NewLogger(const std::string & fname,std::shared_ptr<Logger> * result)593 Status HdfsEnv::NewLogger(const std::string& fname,
594                           std::shared_ptr<Logger>* result) {
595   // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
596   // option is only intended for WAL/flush/compaction writes, so turn it off in
597   // the logger.
598   EnvOptions options;
599   options.strict_bytes_per_sync = false;
600   HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
601   if (f == nullptr || !f->isValid()) {
602     delete f;
603     *result = nullptr;
604     return IOError(fname, errno);
605   }
606   HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
607   result->reset(h);
608   if (mylog == nullptr) {
609     // mylog = h; // uncomment this for detailed logging
610   }
611   return Status::OK();
612 }
613 
IsDirectory(const std::string & path,bool * is_dir)614 Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) {
615   hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str());
616   if (pFileInfo != nullptr) {
617     if (is_dir != nullptr) {
618       *is_dir = (pFileInfo->mKind == kObjectKindDirectory);
619     }
620     hdfsFreeFileInfo(pFileInfo, 1);
621     return Status::OK();
622   }
623   return IOError(path, errno);
624 }
625 
626 // The factory method for creating an HDFS Env
NewHdfsEnv(Env ** hdfs_env,const std::string & fsname)627 Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
628   *hdfs_env = new HdfsEnv(fsname);
629   return Status::OK();
630 }
631 }  // namespace ROCKSDB_NAMESPACE
632 
633 #endif // ROCKSDB_HDFS_FILE_C
634 
635 #else // USE_HDFS
636 
637 // dummy placeholders used when HDFS is not available
638 namespace ROCKSDB_NAMESPACE {
NewSequentialFile(const std::string &,std::unique_ptr<SequentialFile> *,const EnvOptions &)639 Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
640                                   std::unique_ptr<SequentialFile>* /*result*/,
641                                   const EnvOptions& /*options*/) {
642   return Status::NotSupported("Not compiled with hdfs support");
643 }
644 
NewHdfsEnv(Env **,const std::string &)645  Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
646    return Status::NotSupported("Not compiled with hdfs support");
647  }
648  }  // namespace ROCKSDB_NAMESPACE
649 
650 #endif
651