1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 7 #pragma once 8 #include <algorithm> 9 #include <stdio.h> 10 #include <time.h> 11 #include <iostream> 12 #include "port/sys_time.h" 13 #include "rocksdb/env.h" 14 #include "rocksdb/status.h" 15 16 #ifdef USE_HDFS 17 #include <hdfs.h> 18 19 namespace rocksdb { 20 21 // Thrown during execution when there is an issue with the supplied 22 // arguments. 23 class HdfsUsageException : public std::exception { }; 24 25 // A simple exception that indicates something went wrong that is not 26 // recoverable. The intention is for the message to be printed (with 27 // nothing else) and the process terminate. 28 class HdfsFatalException : public std::exception { 29 public: HdfsFatalException(const std::string & s)30 explicit HdfsFatalException(const std::string& s) : what_(s) { } ~HdfsFatalException()31 virtual ~HdfsFatalException() throw() { } what()32 virtual const char* what() const throw() { 33 return what_.c_str(); 34 } 35 private: 36 const std::string what_; 37 }; 38 39 // 40 // The HDFS environment for rocksdb. This class overrides all the 41 // file/dir access methods and delegates the thread-mgmt methods to the 42 // default posix environment. 43 // 44 class HdfsEnv : public Env { 45 46 public: HdfsEnv(const std::string & fsname)47 explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) { 48 posixEnv = Env::Default(); 49 fileSys_ = connectToPath(fsname_); 50 } 51 ~HdfsEnv()52 virtual ~HdfsEnv() { 53 fprintf(stderr, "Destroying HdfsEnv::Default()\n"); 54 hdfsDisconnect(fileSys_); 55 } 56 57 Status NewSequentialFile(const std::string& fname, 58 std::unique_ptr<SequentialFile>* result, 59 const EnvOptions& options) override; 60 61 Status NewRandomAccessFile(const std::string& fname, 62 std::unique_ptr<RandomAccessFile>* result, 63 const EnvOptions& options) override; 64 65 Status NewWritableFile(const std::string& fname, 66 std::unique_ptr<WritableFile>* result, 67 const EnvOptions& options) override; 68 69 Status NewDirectory(const std::string& name, 70 std::unique_ptr<Directory>* result) override; 71 72 Status FileExists(const std::string& fname) override; 73 74 Status GetChildren(const std::string& path, 75 std::vector<std::string>* result) override; 76 77 Status DeleteFile(const std::string& fname) override; 78 79 Status CreateDir(const std::string& name) override; 80 81 Status CreateDirIfMissing(const std::string& name) override; 82 83 Status DeleteDir(const std::string& name) override; 84 85 Status GetFileSize(const std::string& fname, uint64_t* size) override; 86 87 Status GetFileModificationTime(const std::string& fname, 88 uint64_t* file_mtime) override; 89 90 Status RenameFile(const std::string& src, const std::string& target) override; 91 LinkFile(const std::string &,const std::string &)92 Status LinkFile(const std::string& /*src*/, 93 const std::string& /*target*/) override { 94 return Status::NotSupported(); // not supported 95 } 96 97 Status LockFile(const std::string& fname, FileLock** lock) override; 98 99 Status UnlockFile(FileLock* lock) override; 100 101 Status NewLogger(const std::string& fname, 102 std::shared_ptr<Logger>* result) override; 103 104 void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW, 105 void* tag = nullptr, 106 void (*unschedFunction)(void* arg) = 0) override { 107 posixEnv->Schedule(function, arg, pri, tag, unschedFunction); 108 } 109 UnSchedule(void * tag,Priority pri)110 int UnSchedule(void* tag, Priority pri) override { 111 return posixEnv->UnSchedule(tag, pri); 112 } 113 StartThread(void (* function)(void * arg),void * arg)114 void StartThread(void (*function)(void* arg), void* arg) override { 115 posixEnv->StartThread(function, arg); 116 } 117 WaitForJoin()118 void WaitForJoin() override { posixEnv->WaitForJoin(); } 119 120 unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override { 121 return posixEnv->GetThreadPoolQueueLen(pri); 122 } 123 GetTestDirectory(std::string * path)124 Status GetTestDirectory(std::string* path) override { 125 return posixEnv->GetTestDirectory(path); 126 } 127 NowMicros()128 uint64_t NowMicros() override { return posixEnv->NowMicros(); } 129 SleepForMicroseconds(int micros)130 void SleepForMicroseconds(int micros) override { 131 posixEnv->SleepForMicroseconds(micros); 132 } 133 GetHostName(char * name,uint64_t len)134 Status GetHostName(char* name, uint64_t len) override { 135 return posixEnv->GetHostName(name, len); 136 } 137 GetCurrentTime(int64_t * unix_time)138 Status GetCurrentTime(int64_t* unix_time) override { 139 return posixEnv->GetCurrentTime(unix_time); 140 } 141 GetAbsolutePath(const std::string & db_path,std::string * output_path)142 Status GetAbsolutePath(const std::string& db_path, 143 std::string* output_path) override { 144 return posixEnv->GetAbsolutePath(db_path, output_path); 145 } 146 147 void SetBackgroundThreads(int number, Priority pri = LOW) override { 148 posixEnv->SetBackgroundThreads(number, pri); 149 } 150 151 int GetBackgroundThreads(Priority pri = LOW) override { 152 return posixEnv->GetBackgroundThreads(pri); 153 } 154 IncBackgroundThreadsIfNeeded(int number,Priority pri)155 void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { 156 posixEnv->IncBackgroundThreadsIfNeeded(number, pri); 157 } 158 TimeToString(uint64_t number)159 std::string TimeToString(uint64_t number) override { 160 return posixEnv->TimeToString(number); 161 } 162 gettid()163 static uint64_t gettid() { 164 assert(sizeof(pthread_t) <= sizeof(uint64_t)); 165 return (uint64_t)pthread_self(); 166 } 167 GetThreadID()168 uint64_t GetThreadID() const override { return HdfsEnv::gettid(); } 169 170 private: 171 std::string fsname_; // string of the form "hdfs://hostname:port/" 172 hdfsFS fileSys_; // a single FileSystem object for all files 173 Env* posixEnv; // This object is derived from Env, but not from 174 // posixEnv. We have posixnv as an encapsulated 175 // object here so that we can use posix timers, 176 // posix threads, etc. 177 178 static const std::string kProto; 179 static const std::string pathsep; 180 181 /** 182 * If the URI is specified of the form hdfs://server:port/path, 183 * then connect to the specified cluster 184 * else connect to default. 185 */ connectToPath(const std::string & uri)186 hdfsFS connectToPath(const std::string& uri) { 187 if (uri.empty()) { 188 return nullptr; 189 } 190 if (uri.find(kProto) != 0) { 191 // uri doesn't start with hdfs:// -> use default:0, which is special 192 // to libhdfs. 193 return hdfsConnectNewInstance("default", 0); 194 } 195 const std::string hostport = uri.substr(kProto.length()); 196 197 std::vector <std::string> parts; 198 split(hostport, ':', parts); 199 if (parts.size() != 2) { 200 throw HdfsFatalException("Bad uri for hdfs " + uri); 201 } 202 // parts[0] = hosts, parts[1] = port/xxx/yyy 203 std::string host(parts[0]); 204 std::string remaining(parts[1]); 205 206 int rem = static_cast<int>(remaining.find(pathsep)); 207 std::string portStr = (rem == 0 ? remaining : 208 remaining.substr(0, rem)); 209 210 tPort port; 211 port = atoi(portStr.c_str()); 212 if (port == 0) { 213 throw HdfsFatalException("Bad host-port for hdfs " + uri); 214 } 215 hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port); 216 return fs; 217 } 218 split(const std::string & s,char delim,std::vector<std::string> & elems)219 void split(const std::string &s, char delim, 220 std::vector<std::string> &elems) { 221 elems.clear(); 222 size_t prev = 0; 223 size_t pos = s.find(delim); 224 while (pos != std::string::npos) { 225 elems.push_back(s.substr(prev, pos)); 226 prev = pos + 1; 227 pos = s.find(delim, prev); 228 } 229 elems.push_back(s.substr(prev, s.size())); 230 } 231 }; 232 233 } // namespace rocksdb 234 235 #else // USE_HDFS 236 237 238 namespace rocksdb { 239 240 static const Status notsup; 241 242 class HdfsEnv : public Env { 243 244 public: HdfsEnv(const std::string &)245 explicit HdfsEnv(const std::string& /*fsname*/) { 246 fprintf(stderr, "You have not build rocksdb with HDFS support\n"); 247 fprintf(stderr, "Please see hdfs/README for details\n"); 248 abort(); 249 } 250 ~HdfsEnv()251 virtual ~HdfsEnv() { 252 } 253 254 virtual Status NewSequentialFile(const std::string& fname, 255 std::unique_ptr<SequentialFile>* result, 256 const EnvOptions& options) override; 257 NewRandomAccessFile(const std::string &,std::unique_ptr<RandomAccessFile> *,const EnvOptions &)258 virtual Status NewRandomAccessFile( 259 const std::string& /*fname*/, 260 std::unique_ptr<RandomAccessFile>* /*result*/, 261 const EnvOptions& /*options*/) override { 262 return notsup; 263 } 264 NewWritableFile(const std::string &,std::unique_ptr<WritableFile> *,const EnvOptions &)265 virtual Status NewWritableFile(const std::string& /*fname*/, 266 std::unique_ptr<WritableFile>* /*result*/, 267 const EnvOptions& /*options*/) override { 268 return notsup; 269 } 270 NewDirectory(const std::string &,std::unique_ptr<Directory> *)271 virtual Status NewDirectory(const std::string& /*name*/, 272 std::unique_ptr<Directory>* /*result*/) override { 273 return notsup; 274 } 275 FileExists(const std::string &)276 virtual Status FileExists(const std::string& /*fname*/) override { 277 return notsup; 278 } 279 GetChildren(const std::string &,std::vector<std::string> *)280 virtual Status GetChildren(const std::string& /*path*/, 281 std::vector<std::string>* /*result*/) override { 282 return notsup; 283 } 284 DeleteFile(const std::string &)285 virtual Status DeleteFile(const std::string& /*fname*/) override { 286 return notsup; 287 } 288 CreateDir(const std::string &)289 virtual Status CreateDir(const std::string& /*name*/) override { 290 return notsup; 291 } 292 CreateDirIfMissing(const std::string &)293 virtual Status CreateDirIfMissing(const std::string& /*name*/) override { 294 return notsup; 295 } 296 DeleteDir(const std::string &)297 virtual Status DeleteDir(const std::string& /*name*/) override { 298 return notsup; 299 } 300 GetFileSize(const std::string &,uint64_t *)301 virtual Status GetFileSize(const std::string& /*fname*/, 302 uint64_t* /*size*/) override { 303 return notsup; 304 } 305 GetFileModificationTime(const std::string &,uint64_t *)306 virtual Status GetFileModificationTime(const std::string& /*fname*/, 307 uint64_t* /*time*/) override { 308 return notsup; 309 } 310 RenameFile(const std::string &,const std::string &)311 virtual Status RenameFile(const std::string& /*src*/, 312 const std::string& /*target*/) override { 313 return notsup; 314 } 315 LinkFile(const std::string &,const std::string &)316 virtual Status LinkFile(const std::string& /*src*/, 317 const std::string& /*target*/) override { 318 return notsup; 319 } 320 LockFile(const std::string &,FileLock **)321 virtual Status LockFile(const std::string& /*fname*/, 322 FileLock** /*lock*/) override { 323 return notsup; 324 } 325 UnlockFile(FileLock *)326 virtual Status UnlockFile(FileLock* /*lock*/) override { return notsup; } 327 NewLogger(const std::string &,std::shared_ptr<Logger> *)328 virtual Status NewLogger(const std::string& /*fname*/, 329 std::shared_ptr<Logger>* /*result*/) override { 330 return notsup; 331 } 332 333 virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/, 334 Priority /*pri*/ = LOW, void* /*tag*/ = nullptr, 335 void (* /*unschedFunction*/)(void* arg) = 0) override {} 336 UnSchedule(void *,Priority)337 virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; } 338 StartThread(void (*)(void * arg),void *)339 virtual void StartThread(void (* /*function*/)(void* arg), 340 void* /*arg*/) override {} 341 WaitForJoin()342 virtual void WaitForJoin() override {} 343 344 virtual unsigned int GetThreadPoolQueueLen( 345 Priority /*pri*/ = LOW) const override { 346 return 0; 347 } 348 GetTestDirectory(std::string *)349 virtual Status GetTestDirectory(std::string* /*path*/) override { 350 return notsup; 351 } 352 NowMicros()353 virtual uint64_t NowMicros() override { return 0; } 354 SleepForMicroseconds(int)355 virtual void SleepForMicroseconds(int /*micros*/) override {} 356 GetHostName(char *,uint64_t)357 virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override { 358 return notsup; 359 } 360 GetCurrentTime(int64_t *)361 virtual Status GetCurrentTime(int64_t* /*unix_time*/) override { 362 return notsup; 363 } 364 GetAbsolutePath(const std::string &,std::string *)365 virtual Status GetAbsolutePath(const std::string& /*db_path*/, 366 std::string* /*outputpath*/) override { 367 return notsup; 368 } 369 370 virtual void SetBackgroundThreads(int /*number*/, 371 Priority /*pri*/ = LOW) override {} 372 virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override { 373 return 0; 374 } IncBackgroundThreadsIfNeeded(int,Priority)375 virtual void IncBackgroundThreadsIfNeeded(int /*number*/, 376 Priority /*pri*/) override {} TimeToString(uint64_t)377 virtual std::string TimeToString(uint64_t /*number*/) override { return ""; } 378 GetThreadID()379 virtual uint64_t GetThreadID() const override { 380 return 0; 381 } 382 }; 383 } 384 385 #endif // USE_HDFS 386