1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 7 #pragma once 8 #include <algorithm> 9 #include <stdio.h> 10 #include <time.h> 11 #include <iostream> 12 #include "port/sys_time.h" 13 #include "rocksdb/env.h" 14 #include "rocksdb/status.h" 15 16 #ifdef USE_HDFS 17 #include <hdfs.h> 18 19 namespace ROCKSDB_NAMESPACE { 20 21 // Thrown during execution when there is an issue with the supplied 22 // arguments. 23 class HdfsUsageException : public std::exception { }; 24 25 // A simple exception that indicates something went wrong that is not 26 // recoverable. The intention is for the message to be printed (with 27 // nothing else) and the process terminate. 28 class HdfsFatalException : public std::exception { 29 public: HdfsFatalException(const std::string & s)30 explicit HdfsFatalException(const std::string& s) : what_(s) { } ~HdfsFatalException()31 virtual ~HdfsFatalException() throw() { } what()32 virtual const char* what() const throw() { 33 return what_.c_str(); 34 } 35 private: 36 const std::string what_; 37 }; 38 39 // 40 // The HDFS environment for rocksdb. This class overrides all the 41 // file/dir access methods and delegates the thread-mgmt methods to the 42 // default posix environment. 43 // 44 class HdfsEnv : public Env { 45 46 public: HdfsEnv(const std::string & fsname)47 explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) { 48 posixEnv = Env::Default(); 49 fileSys_ = connectToPath(fsname_); 50 } 51 ~HdfsEnv()52 virtual ~HdfsEnv() { 53 fprintf(stderr, "Destroying HdfsEnv::Default()\n"); 54 hdfsDisconnect(fileSys_); 55 } 56 57 Status NewSequentialFile(const std::string& fname, 58 std::unique_ptr<SequentialFile>* result, 59 const EnvOptions& options) override; 60 61 Status NewRandomAccessFile(const std::string& fname, 62 std::unique_ptr<RandomAccessFile>* result, 63 const EnvOptions& options) override; 64 65 Status NewWritableFile(const std::string& fname, 66 std::unique_ptr<WritableFile>* result, 67 const EnvOptions& options) override; 68 69 Status NewDirectory(const std::string& name, 70 std::unique_ptr<Directory>* result) override; 71 72 Status FileExists(const std::string& fname) override; 73 74 Status GetChildren(const std::string& path, 75 std::vector<std::string>* result) override; 76 77 Status DeleteFile(const std::string& fname) override; 78 79 Status CreateDir(const std::string& name) override; 80 81 Status CreateDirIfMissing(const std::string& name) override; 82 83 Status DeleteDir(const std::string& name) override; 84 85 Status GetFileSize(const std::string& fname, uint64_t* size) override; 86 87 Status GetFileModificationTime(const std::string& fname, 88 uint64_t* file_mtime) override; 89 90 Status RenameFile(const std::string& src, const std::string& target) override; 91 LinkFile(const std::string &,const std::string &)92 Status LinkFile(const std::string& /*src*/, 93 const std::string& /*target*/) override { 94 return Status::NotSupported(); // not supported 95 } 96 97 Status LockFile(const std::string& fname, FileLock** lock) override; 98 99 Status UnlockFile(FileLock* lock) override; 100 101 Status NewLogger(const std::string& fname, 102 std::shared_ptr<Logger>* result) override; 103 104 void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW, 105 void* tag = nullptr, 106 void (*unschedFunction)(void* arg) = 0) override { 107 posixEnv->Schedule(function, arg, pri, tag, unschedFunction); 108 } 109 UnSchedule(void * tag,Priority pri)110 int UnSchedule(void* tag, Priority pri) override { 111 return posixEnv->UnSchedule(tag, pri); 112 } 113 StartThread(void (* function)(void * arg),void * arg)114 void StartThread(void (*function)(void* arg), void* arg) override { 115 posixEnv->StartThread(function, arg); 116 } 117 WaitForJoin()118 void WaitForJoin() override { posixEnv->WaitForJoin(); } 119 120 unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override { 121 return posixEnv->GetThreadPoolQueueLen(pri); 122 } 123 GetTestDirectory(std::string * path)124 Status GetTestDirectory(std::string* path) override { 125 return posixEnv->GetTestDirectory(path); 126 } 127 NowMicros()128 uint64_t NowMicros() override { return posixEnv->NowMicros(); } 129 SleepForMicroseconds(int micros)130 void SleepForMicroseconds(int micros) override { 131 posixEnv->SleepForMicroseconds(micros); 132 } 133 GetHostName(char * name,uint64_t len)134 Status GetHostName(char* name, uint64_t len) override { 135 return posixEnv->GetHostName(name, len); 136 } 137 GetCurrentTime(int64_t * unix_time)138 Status GetCurrentTime(int64_t* unix_time) override { 139 return posixEnv->GetCurrentTime(unix_time); 140 } 141 GetAbsolutePath(const std::string & db_path,std::string * output_path)142 Status GetAbsolutePath(const std::string& db_path, 143 std::string* output_path) override { 144 return posixEnv->GetAbsolutePath(db_path, output_path); 145 } 146 147 void SetBackgroundThreads(int number, Priority pri = LOW) override { 148 posixEnv->SetBackgroundThreads(number, pri); 149 } 150 151 int GetBackgroundThreads(Priority pri = LOW) override { 152 return posixEnv->GetBackgroundThreads(pri); 153 } 154 IncBackgroundThreadsIfNeeded(int number,Priority pri)155 void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { 156 posixEnv->IncBackgroundThreadsIfNeeded(number, pri); 157 } 158 TimeToString(uint64_t number)159 std::string TimeToString(uint64_t number) override { 160 return posixEnv->TimeToString(number); 161 } 162 gettid()163 static uint64_t gettid() { 164 assert(sizeof(pthread_t) <= sizeof(uint64_t)); 165 return (uint64_t)pthread_self(); 166 } 167 GetThreadID()168 uint64_t GetThreadID() const override { return HdfsEnv::gettid(); } 169 170 private: 171 std::string fsname_; // string of the form "hdfs://hostname:port/" 172 hdfsFS fileSys_; // a single FileSystem object for all files 173 Env* posixEnv; // This object is derived from Env, but not from 174 // posixEnv. We have posixnv as an encapsulated 175 // object here so that we can use posix timers, 176 // posix threads, etc. 177 178 static const std::string kProto; 179 static const std::string pathsep; 180 181 /** 182 * If the URI is specified of the form hdfs://server:port/path, 183 * then connect to the specified cluster 184 * else connect to default. 185 */ connectToPath(const std::string & uri)186 hdfsFS connectToPath(const std::string& uri) { 187 if (uri.empty()) { 188 return nullptr; 189 } 190 if (uri.find(kProto) != 0) { 191 // uri doesn't start with hdfs:// -> use default:0, which is special 192 // to libhdfs. 193 return hdfsConnectNewInstance("default", 0); 194 } 195 const std::string hostport = uri.substr(kProto.length()); 196 197 std::vector <std::string> parts; 198 split(hostport, ':', parts); 199 if (parts.size() != 2) { 200 throw HdfsFatalException("Bad uri for hdfs " + uri); 201 } 202 // parts[0] = hosts, parts[1] = port/xxx/yyy 203 std::string host(parts[0]); 204 std::string remaining(parts[1]); 205 206 int rem = static_cast<int>(remaining.find(pathsep)); 207 std::string portStr = (rem == 0 ? remaining : 208 remaining.substr(0, rem)); 209 210 tPort port; 211 port = atoi(portStr.c_str()); 212 if (port == 0) { 213 throw HdfsFatalException("Bad host-port for hdfs " + uri); 214 } 215 hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port); 216 return fs; 217 } 218 split(const std::string & s,char delim,std::vector<std::string> & elems)219 void split(const std::string &s, char delim, 220 std::vector<std::string> &elems) { 221 elems.clear(); 222 size_t prev = 0; 223 size_t pos = s.find(delim); 224 while (pos != std::string::npos) { 225 elems.push_back(s.substr(prev, pos)); 226 prev = pos + 1; 227 pos = s.find(delim, prev); 228 } 229 elems.push_back(s.substr(prev, s.size())); 230 } 231 }; 232 233 } // namespace ROCKSDB_NAMESPACE 234 235 #else // USE_HDFS 236 237 namespace ROCKSDB_NAMESPACE { 238 239 static const Status notsup; 240 241 class HdfsEnv : public Env { 242 243 public: HdfsEnv(const std::string &)244 explicit HdfsEnv(const std::string& /*fsname*/) { 245 fprintf(stderr, "You have not build rocksdb with HDFS support\n"); 246 fprintf(stderr, "Please see hdfs/README for details\n"); 247 abort(); 248 } 249 ~HdfsEnv()250 virtual ~HdfsEnv() { 251 } 252 253 virtual Status NewSequentialFile(const std::string& fname, 254 std::unique_ptr<SequentialFile>* result, 255 const EnvOptions& options) override; 256 NewRandomAccessFile(const std::string &,std::unique_ptr<RandomAccessFile> *,const EnvOptions &)257 virtual Status NewRandomAccessFile( 258 const std::string& /*fname*/, 259 std::unique_ptr<RandomAccessFile>* /*result*/, 260 const EnvOptions& /*options*/) override { 261 return notsup; 262 } 263 NewWritableFile(const std::string &,std::unique_ptr<WritableFile> *,const EnvOptions &)264 virtual Status NewWritableFile(const std::string& /*fname*/, 265 std::unique_ptr<WritableFile>* /*result*/, 266 const EnvOptions& /*options*/) override { 267 return notsup; 268 } 269 NewDirectory(const std::string &,std::unique_ptr<Directory> *)270 virtual Status NewDirectory(const std::string& /*name*/, 271 std::unique_ptr<Directory>* /*result*/) override { 272 return notsup; 273 } 274 FileExists(const std::string &)275 virtual Status FileExists(const std::string& /*fname*/) override { 276 return notsup; 277 } 278 GetChildren(const std::string &,std::vector<std::string> *)279 virtual Status GetChildren(const std::string& /*path*/, 280 std::vector<std::string>* /*result*/) override { 281 return notsup; 282 } 283 DeleteFile(const std::string &)284 virtual Status DeleteFile(const std::string& /*fname*/) override { 285 return notsup; 286 } 287 CreateDir(const std::string &)288 virtual Status CreateDir(const std::string& /*name*/) override { 289 return notsup; 290 } 291 CreateDirIfMissing(const std::string &)292 virtual Status CreateDirIfMissing(const std::string& /*name*/) override { 293 return notsup; 294 } 295 DeleteDir(const std::string &)296 virtual Status DeleteDir(const std::string& /*name*/) override { 297 return notsup; 298 } 299 GetFileSize(const std::string &,uint64_t *)300 virtual Status GetFileSize(const std::string& /*fname*/, 301 uint64_t* /*size*/) override { 302 return notsup; 303 } 304 GetFileModificationTime(const std::string &,uint64_t *)305 virtual Status GetFileModificationTime(const std::string& /*fname*/, 306 uint64_t* /*time*/) override { 307 return notsup; 308 } 309 RenameFile(const std::string &,const std::string &)310 virtual Status RenameFile(const std::string& /*src*/, 311 const std::string& /*target*/) override { 312 return notsup; 313 } 314 LinkFile(const std::string &,const std::string &)315 virtual Status LinkFile(const std::string& /*src*/, 316 const std::string& /*target*/) override { 317 return notsup; 318 } 319 LockFile(const std::string &,FileLock **)320 virtual Status LockFile(const std::string& /*fname*/, 321 FileLock** /*lock*/) override { 322 return notsup; 323 } 324 UnlockFile(FileLock *)325 virtual Status UnlockFile(FileLock* /*lock*/) override { return notsup; } 326 NewLogger(const std::string &,std::shared_ptr<Logger> *)327 virtual Status NewLogger(const std::string& /*fname*/, 328 std::shared_ptr<Logger>* /*result*/) override { 329 return notsup; 330 } 331 332 virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/, 333 Priority /*pri*/ = LOW, void* /*tag*/ = nullptr, 334 void (* /*unschedFunction*/)(void* arg) = 0) override {} 335 UnSchedule(void *,Priority)336 virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; } 337 StartThread(void (*)(void * arg),void *)338 virtual void StartThread(void (* /*function*/)(void* arg), 339 void* /*arg*/) override {} 340 WaitForJoin()341 virtual void WaitForJoin() override {} 342 343 virtual unsigned int GetThreadPoolQueueLen( 344 Priority /*pri*/ = LOW) const override { 345 return 0; 346 } 347 GetTestDirectory(std::string *)348 virtual Status GetTestDirectory(std::string* /*path*/) override { 349 return notsup; 350 } 351 NowMicros()352 virtual uint64_t NowMicros() override { return 0; } 353 SleepForMicroseconds(int)354 virtual void SleepForMicroseconds(int /*micros*/) override {} 355 GetHostName(char *,uint64_t)356 virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override { 357 return notsup; 358 } 359 GetCurrentTime(int64_t *)360 virtual Status GetCurrentTime(int64_t* /*unix_time*/) override { 361 return notsup; 362 } 363 GetAbsolutePath(const std::string &,std::string *)364 virtual Status GetAbsolutePath(const std::string& /*db_path*/, 365 std::string* /*outputpath*/) override { 366 return notsup; 367 } 368 369 virtual void SetBackgroundThreads(int /*number*/, 370 Priority /*pri*/ = LOW) override {} 371 virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override { 372 return 0; 373 } IncBackgroundThreadsIfNeeded(int,Priority)374 virtual void IncBackgroundThreadsIfNeeded(int /*number*/, 375 Priority /*pri*/) override {} TimeToString(uint64_t)376 virtual std::string TimeToString(uint64_t /*number*/) override { return ""; } 377 GetThreadID()378 virtual uint64_t GetThreadID() const override { 379 return 0; 380 } 381 }; 382 } // namespace ROCKSDB_NAMESPACE 383 384 #endif // USE_HDFS 385