1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 
7 #pragma once
8 #include <algorithm>
9 #include <stdio.h>
10 #include <time.h>
11 #include <iostream>
12 #include "port/sys_time.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/status.h"
15 
16 #ifdef USE_HDFS
17 #include <hdfs.h>
18 
19 namespace ROCKSDB_NAMESPACE {
20 
// Raised when an HDFS operation is handed arguments it cannot work with.
// It carries no message of its own; what() is the one inherited from
// std::exception.
class HdfsUsageException : public std::exception {
 public:
  HdfsUsageException() noexcept = default;
};
24 
25 // A simple exception that indicates something went wrong that is not
26 // recoverable.  The intention is for the message to be printed (with
27 // nothing else) and the process terminate.
28 class HdfsFatalException : public std::exception {
29 public:
HdfsFatalException(const std::string & s)30   explicit HdfsFatalException(const std::string& s) : what_(s) { }
~HdfsFatalException()31   virtual ~HdfsFatalException() throw() { }
what()32   virtual const char* what() const throw() {
33     return what_.c_str();
34   }
35 private:
36   const std::string what_;
37 };
38 
39 //
40 // The HDFS environment for rocksdb. This class overrides all the
41 // file/dir access methods and delegates the thread-mgmt methods to the
42 // default posix environment.
43 //
44 class HdfsEnv : public Env {
45 
46  public:
HdfsEnv(const std::string & fsname)47   explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) {
48     posixEnv = Env::Default();
49     fileSys_ = connectToPath(fsname_);
50   }
51 
~HdfsEnv()52   virtual ~HdfsEnv() {
53     fprintf(stderr, "Destroying HdfsEnv::Default()\n");
54     hdfsDisconnect(fileSys_);
55   }
56 
57   Status NewSequentialFile(const std::string& fname,
58                            std::unique_ptr<SequentialFile>* result,
59                            const EnvOptions& options) override;
60 
61   Status NewRandomAccessFile(const std::string& fname,
62                              std::unique_ptr<RandomAccessFile>* result,
63                              const EnvOptions& options) override;
64 
65   Status NewWritableFile(const std::string& fname,
66                          std::unique_ptr<WritableFile>* result,
67                          const EnvOptions& options) override;
68 
69   Status NewDirectory(const std::string& name,
70                       std::unique_ptr<Directory>* result) override;
71 
72   Status FileExists(const std::string& fname) override;
73 
74   Status GetChildren(const std::string& path,
75                      std::vector<std::string>* result) override;
76 
77   Status DeleteFile(const std::string& fname) override;
78 
79   Status CreateDir(const std::string& name) override;
80 
81   Status CreateDirIfMissing(const std::string& name) override;
82 
83   Status DeleteDir(const std::string& name) override;
84 
85   Status GetFileSize(const std::string& fname, uint64_t* size) override;
86 
87   Status GetFileModificationTime(const std::string& fname,
88                                  uint64_t* file_mtime) override;
89 
90   Status RenameFile(const std::string& src, const std::string& target) override;
91 
LinkFile(const std::string &,const std::string &)92   Status LinkFile(const std::string& /*src*/,
93                   const std::string& /*target*/) override {
94     return Status::NotSupported(); // not supported
95   }
96 
97   Status LockFile(const std::string& fname, FileLock** lock) override;
98 
99   Status UnlockFile(FileLock* lock) override;
100 
101   Status NewLogger(const std::string& fname,
102                    std::shared_ptr<Logger>* result) override;
103 
104   void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
105                 void* tag = nullptr,
106                 void (*unschedFunction)(void* arg) = 0) override {
107     posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
108   }
109 
UnSchedule(void * tag,Priority pri)110   int UnSchedule(void* tag, Priority pri) override {
111     return posixEnv->UnSchedule(tag, pri);
112   }
113 
StartThread(void (* function)(void * arg),void * arg)114   void StartThread(void (*function)(void* arg), void* arg) override {
115     posixEnv->StartThread(function, arg);
116   }
117 
WaitForJoin()118   void WaitForJoin() override { posixEnv->WaitForJoin(); }
119 
120   unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
121     return posixEnv->GetThreadPoolQueueLen(pri);
122   }
123 
GetTestDirectory(std::string * path)124   Status GetTestDirectory(std::string* path) override {
125     return posixEnv->GetTestDirectory(path);
126   }
127 
NowMicros()128   uint64_t NowMicros() override { return posixEnv->NowMicros(); }
129 
SleepForMicroseconds(int micros)130   void SleepForMicroseconds(int micros) override {
131     posixEnv->SleepForMicroseconds(micros);
132   }
133 
GetHostName(char * name,uint64_t len)134   Status GetHostName(char* name, uint64_t len) override {
135     return posixEnv->GetHostName(name, len);
136   }
137 
GetCurrentTime(int64_t * unix_time)138   Status GetCurrentTime(int64_t* unix_time) override {
139     return posixEnv->GetCurrentTime(unix_time);
140   }
141 
GetAbsolutePath(const std::string & db_path,std::string * output_path)142   Status GetAbsolutePath(const std::string& db_path,
143                          std::string* output_path) override {
144     return posixEnv->GetAbsolutePath(db_path, output_path);
145   }
146 
147   void SetBackgroundThreads(int number, Priority pri = LOW) override {
148     posixEnv->SetBackgroundThreads(number, pri);
149   }
150 
151   int GetBackgroundThreads(Priority pri = LOW) override {
152     return posixEnv->GetBackgroundThreads(pri);
153   }
154 
IncBackgroundThreadsIfNeeded(int number,Priority pri)155   void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
156     posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
157   }
158 
TimeToString(uint64_t number)159   std::string TimeToString(uint64_t number) override {
160     return posixEnv->TimeToString(number);
161   }
162 
gettid()163   static uint64_t gettid() {
164     assert(sizeof(pthread_t) <= sizeof(uint64_t));
165     return (uint64_t)pthread_self();
166   }
167 
GetThreadID()168   uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }
169 
170  private:
171   std::string fsname_;  // string of the form "hdfs://hostname:port/"
172   hdfsFS fileSys_;      //  a single FileSystem object for all files
173   Env*  posixEnv;       // This object is derived from Env, but not from
174                         // posixEnv. We have posixnv as an encapsulated
175                         // object here so that we can use posix timers,
176                         // posix threads, etc.
177 
178   static const std::string kProto;
179   static const std::string pathsep;
180 
181   /**
182    * If the URI is specified of the form hdfs://server:port/path,
183    * then connect to the specified cluster
184    * else connect to default.
185    */
connectToPath(const std::string & uri)186   hdfsFS connectToPath(const std::string& uri) {
187     if (uri.empty()) {
188       return nullptr;
189     }
190     if (uri.find(kProto) != 0) {
191       // uri doesn't start with hdfs:// -> use default:0, which is special
192       // to libhdfs.
193       return hdfsConnectNewInstance("default", 0);
194     }
195     const std::string hostport = uri.substr(kProto.length());
196 
197     std::vector <std::string> parts;
198     split(hostport, ':', parts);
199     if (parts.size() != 2) {
200       throw HdfsFatalException("Bad uri for hdfs " + uri);
201     }
202     // parts[0] = hosts, parts[1] = port/xxx/yyy
203     std::string host(parts[0]);
204     std::string remaining(parts[1]);
205 
206     int rem = static_cast<int>(remaining.find(pathsep));
207     std::string portStr = (rem == 0 ? remaining :
208                            remaining.substr(0, rem));
209 
210     tPort port;
211     port = atoi(portStr.c_str());
212     if (port == 0) {
213       throw HdfsFatalException("Bad host-port for hdfs " + uri);
214     }
215     hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
216     return fs;
217   }
218 
split(const std::string & s,char delim,std::vector<std::string> & elems)219   void split(const std::string &s, char delim,
220              std::vector<std::string> &elems) {
221     elems.clear();
222     size_t prev = 0;
223     size_t pos = s.find(delim);
224     while (pos != std::string::npos) {
225       elems.push_back(s.substr(prev, pos));
226       prev = pos + 1;
227       pos = s.find(delim, prev);
228     }
229     elems.push_back(s.substr(prev, s.size()));
230   }
231 };
232 
233 }  // namespace ROCKSDB_NAMESPACE
234 
235 #else // USE_HDFS
236 
237 namespace ROCKSDB_NAMESPACE {
238 
239 static const Status notsup;
240 
241 class HdfsEnv : public Env {
242 
243  public:
HdfsEnv(const std::string &)244   explicit HdfsEnv(const std::string& /*fsname*/) {
245     fprintf(stderr, "You have not build rocksdb with HDFS support\n");
246     fprintf(stderr, "Please see hdfs/README for details\n");
247     abort();
248   }
249 
~HdfsEnv()250   virtual ~HdfsEnv() {
251   }
252 
253   virtual Status NewSequentialFile(const std::string& fname,
254                                    std::unique_ptr<SequentialFile>* result,
255                                    const EnvOptions& options) override;
256 
NewRandomAccessFile(const std::string &,std::unique_ptr<RandomAccessFile> *,const EnvOptions &)257   virtual Status NewRandomAccessFile(
258       const std::string& /*fname*/,
259       std::unique_ptr<RandomAccessFile>* /*result*/,
260       const EnvOptions& /*options*/) override {
261     return notsup;
262   }
263 
NewWritableFile(const std::string &,std::unique_ptr<WritableFile> *,const EnvOptions &)264   virtual Status NewWritableFile(const std::string& /*fname*/,
265                                  std::unique_ptr<WritableFile>* /*result*/,
266                                  const EnvOptions& /*options*/) override {
267     return notsup;
268   }
269 
NewDirectory(const std::string &,std::unique_ptr<Directory> *)270   virtual Status NewDirectory(const std::string& /*name*/,
271                               std::unique_ptr<Directory>* /*result*/) override {
272     return notsup;
273   }
274 
FileExists(const std::string &)275   virtual Status FileExists(const std::string& /*fname*/) override {
276     return notsup;
277   }
278 
GetChildren(const std::string &,std::vector<std::string> *)279   virtual Status GetChildren(const std::string& /*path*/,
280                              std::vector<std::string>* /*result*/) override {
281     return notsup;
282   }
283 
DeleteFile(const std::string &)284   virtual Status DeleteFile(const std::string& /*fname*/) override {
285     return notsup;
286   }
287 
CreateDir(const std::string &)288   virtual Status CreateDir(const std::string& /*name*/) override {
289     return notsup;
290   }
291 
CreateDirIfMissing(const std::string &)292   virtual Status CreateDirIfMissing(const std::string& /*name*/) override {
293     return notsup;
294   }
295 
DeleteDir(const std::string &)296   virtual Status DeleteDir(const std::string& /*name*/) override {
297     return notsup;
298   }
299 
GetFileSize(const std::string &,uint64_t *)300   virtual Status GetFileSize(const std::string& /*fname*/,
301                              uint64_t* /*size*/) override {
302     return notsup;
303   }
304 
GetFileModificationTime(const std::string &,uint64_t *)305   virtual Status GetFileModificationTime(const std::string& /*fname*/,
306                                          uint64_t* /*time*/) override {
307     return notsup;
308   }
309 
RenameFile(const std::string &,const std::string &)310   virtual Status RenameFile(const std::string& /*src*/,
311                             const std::string& /*target*/) override {
312     return notsup;
313   }
314 
LinkFile(const std::string &,const std::string &)315   virtual Status LinkFile(const std::string& /*src*/,
316                           const std::string& /*target*/) override {
317     return notsup;
318   }
319 
LockFile(const std::string &,FileLock **)320   virtual Status LockFile(const std::string& /*fname*/,
321                           FileLock** /*lock*/) override {
322     return notsup;
323   }
324 
UnlockFile(FileLock *)325   virtual Status UnlockFile(FileLock* /*lock*/) override { return notsup; }
326 
NewLogger(const std::string &,std::shared_ptr<Logger> *)327   virtual Status NewLogger(const std::string& /*fname*/,
328                            std::shared_ptr<Logger>* /*result*/) override {
329     return notsup;
330   }
331 
332   virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,
333                         Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,
334                         void (* /*unschedFunction*/)(void* arg) = 0) override {}
335 
UnSchedule(void *,Priority)336   virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; }
337 
StartThread(void (*)(void * arg),void *)338   virtual void StartThread(void (* /*function*/)(void* arg),
339                            void* /*arg*/) override {}
340 
WaitForJoin()341   virtual void WaitForJoin() override {}
342 
343   virtual unsigned int GetThreadPoolQueueLen(
344       Priority /*pri*/ = LOW) const override {
345     return 0;
346   }
347 
GetTestDirectory(std::string *)348   virtual Status GetTestDirectory(std::string* /*path*/) override {
349     return notsup;
350   }
351 
NowMicros()352   virtual uint64_t NowMicros() override { return 0; }
353 
SleepForMicroseconds(int)354   virtual void SleepForMicroseconds(int /*micros*/) override {}
355 
GetHostName(char *,uint64_t)356   virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override {
357     return notsup;
358   }
359 
GetCurrentTime(int64_t *)360   virtual Status GetCurrentTime(int64_t* /*unix_time*/) override {
361     return notsup;
362   }
363 
GetAbsolutePath(const std::string &,std::string *)364   virtual Status GetAbsolutePath(const std::string& /*db_path*/,
365                                  std::string* /*outputpath*/) override {
366     return notsup;
367   }
368 
369   virtual void SetBackgroundThreads(int /*number*/,
370                                     Priority /*pri*/ = LOW) override {}
371   virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override {
372     return 0;
373   }
IncBackgroundThreadsIfNeeded(int,Priority)374   virtual void IncBackgroundThreadsIfNeeded(int /*number*/,
375                                             Priority /*pri*/) override {}
TimeToString(uint64_t)376   virtual std::string TimeToString(uint64_t /*number*/) override { return ""; }
377 
GetThreadID()378   virtual uint64_t GetThreadID() const override {
379     return 0;
380   }
381 };
382 }  // namespace ROCKSDB_NAMESPACE
383 
384 #endif // USE_HDFS
385