1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include <algorithm>
19 #include <cerrno>
20 #include <cstdint>
21 #include <cstring>
22 #include <limits>
23 #include <memory>
24 #include <mutex>
25 #include <sstream>
26 #include <string>
27 #include <unordered_map>
28 #include <utility>
29 #include <vector>
30 
31 #include "arrow/buffer.h"
32 #include "arrow/io/hdfs.h"
33 #include "arrow/io/hdfs_internal.h"
34 #include "arrow/io/interfaces.h"
35 #include "arrow/memory_pool.h"
36 #include "arrow/result.h"
37 #include "arrow/status.h"
38 #include "arrow/util/io_util.h"
39 #include "arrow/util/logging.h"
40 
41 using std::size_t;
42 
43 namespace arrow {
44 
45 using internal::IOErrorFromErrno;
46 
47 namespace io {
48 
49 namespace {
50 
// Render an errno value as "<code> (<description>)" for error messages.
std::string TranslateErrno(int error_code) {
  std::ostringstream msg;
  msg << error_code << " (" << strerror(error_code) << ")";
  if (error_code == 255) {
    // Unknown error can occur if the host is correct but the port is not
    msg << " Please check that you are connecting to the correct HDFS RPC port";
  }
  return msg.str();
}
60 
61 }  // namespace
62 
// Convert a -1 return value from a libhdfs call into a Status::IOError
// naming the failing operation (WHAT) and carrying a translated errno.
// RETURN_VALUE is evaluated exactly once.
#define CHECK_FAILURE(RETURN_VALUE, WHAT)                                               \
  do {                                                                                  \
    if (RETURN_VALUE == -1) {                                                           \
      return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \
    }                                                                                   \
  } while (0)

// Default read buffer size (64 KiB) used when callers do not specify one.
static constexpr int kDefaultHdfsBufferSize = 1 << 16;
71 
72 // ----------------------------------------------------------------------
73 // File reading
74 
75 class HdfsAnyFileImpl {
76  public:
set_members(const std::string & path,internal::LibHdfsShim * driver,hdfsFS fs,hdfsFile handle)77   void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs,
78                    hdfsFile handle) {
79     path_ = path;
80     driver_ = driver;
81     fs_ = fs;
82     file_ = handle;
83     is_open_ = true;
84   }
85 
Seek(int64_t position)86   Status Seek(int64_t position) {
87     RETURN_NOT_OK(CheckClosed());
88     int ret = driver_->Seek(fs_, file_, position);
89     CHECK_FAILURE(ret, "seek");
90     return Status::OK();
91   }
92 
Tell()93   Result<int64_t> Tell() {
94     RETURN_NOT_OK(CheckClosed());
95     int64_t ret = driver_->Tell(fs_, file_);
96     CHECK_FAILURE(ret, "tell");
97     return ret;
98   }
99 
is_open() const100   bool is_open() const { return is_open_; }
101 
102  protected:
CheckClosed()103   Status CheckClosed() {
104     if (!is_open_) {
105       return Status::Invalid("Operation on closed HDFS file");
106     }
107     return Status::OK();
108   }
109 
110   std::string path_;
111 
112   internal::LibHdfsShim* driver_;
113 
114   // For threadsafety
115   std::mutex lock_;
116 
117   // These are pointers in libhdfs, so OK to copy
118   hdfsFS fs_;
119   hdfsFile file_;
120 
121   bool is_open_;
122 };
123 
124 namespace {
125 
GetPathInfoFailed(const std::string & path)126 Status GetPathInfoFailed(const std::string& path) {
127   std::stringstream ss;
128   ss << "Calling GetPathInfo for " << path << " failed. errno: " << TranslateErrno(errno);
129   return Status::IOError(ss.str());
130 }
131 
132 }  // namespace
133 
134 // Private implementation for read-only files
135 class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
136  public:
HdfsReadableFileImpl(MemoryPool * pool)137   explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {}
138 
Close()139   Status Close() {
140     if (is_open_) {
141       // is_open_ must be set to false in the beginning, because the destructor
142       // attempts to close the stream again, and if the first close fails, then
143       // the error doesn't get propagated properly and the second close
144       // initiated by the destructor raises a segfault
145       is_open_ = false;
146       int ret = driver_->CloseFile(fs_, file_);
147       CHECK_FAILURE(ret, "CloseFile");
148     }
149     return Status::OK();
150   }
151 
closed() const152   bool closed() const { return !is_open_; }
153 
ReadAt(int64_t position,int64_t nbytes,uint8_t * buffer)154   Result<int64_t> ReadAt(int64_t position, int64_t nbytes, uint8_t* buffer) {
155     RETURN_NOT_OK(CheckClosed());
156     if (!driver_->HasPread()) {
157       std::lock_guard<std::mutex> guard(lock_);
158       RETURN_NOT_OK(Seek(position));
159       return Read(nbytes, buffer);
160     }
161 
162     constexpr int64_t kMaxBlockSize = std::numeric_limits<int32_t>::max();
163     int64_t total_bytes = 0;
164     while (nbytes > 0) {
165       const auto block_size = static_cast<tSize>(std::min(kMaxBlockSize, nbytes));
166       tSize ret =
167           driver_->Pread(fs_, file_, static_cast<tOffset>(position), buffer, block_size);
168       CHECK_FAILURE(ret, "read");
169       DCHECK_LE(ret, block_size);
170       if (ret == 0) {
171         break;  // EOF
172       }
173       buffer += ret;
174       total_bytes += ret;
175       position += ret;
176       nbytes -= ret;
177     }
178     return total_bytes;
179   }
180 
ReadAt(int64_t position,int64_t nbytes)181   Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) {
182     RETURN_NOT_OK(CheckClosed());
183 
184     ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
185     ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
186                           ReadAt(position, nbytes, buffer->mutable_data()));
187     if (bytes_read < nbytes) {
188       RETURN_NOT_OK(buffer->Resize(bytes_read));
189       buffer->ZeroPadding();
190     }
191     return std::move(buffer);
192   }
193 
Read(int64_t nbytes,void * buffer)194   Result<int64_t> Read(int64_t nbytes, void* buffer) {
195     RETURN_NOT_OK(CheckClosed());
196 
197     int64_t total_bytes = 0;
198     while (total_bytes < nbytes) {
199       tSize ret = driver_->Read(
200           fs_, file_, reinterpret_cast<uint8_t*>(buffer) + total_bytes,
201           static_cast<tSize>(std::min<int64_t>(buffer_size_, nbytes - total_bytes)));
202       CHECK_FAILURE(ret, "read");
203       total_bytes += ret;
204       if (ret == 0) {
205         break;
206       }
207     }
208     return total_bytes;
209   }
210 
Read(int64_t nbytes)211   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) {
212     RETURN_NOT_OK(CheckClosed());
213 
214     ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
215     ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
216     if (bytes_read < nbytes) {
217       RETURN_NOT_OK(buffer->Resize(bytes_read));
218     }
219     return std::move(buffer);
220   }
221 
GetSize()222   Result<int64_t> GetSize() {
223     RETURN_NOT_OK(CheckClosed());
224 
225     hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str());
226     if (entry == nullptr) {
227       return GetPathInfoFailed(path_);
228     }
229     int64_t size = entry->mSize;
230     driver_->FreeFileInfo(entry, 1);
231     return size;
232   }
233 
set_memory_pool(MemoryPool * pool)234   void set_memory_pool(MemoryPool* pool) { pool_ = pool; }
235 
set_buffer_size(int32_t buffer_size)236   void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; }
237 
238  private:
239   MemoryPool* pool_;
240   int32_t buffer_size_;
241 };
242 
// Public HdfsReadableFile API: thin forwarders to the pimpl.

HdfsReadableFile::HdfsReadableFile(const io::IOContext& io_context) {
  impl_.reset(new HdfsReadableFileImpl(io_context.pool()));
}

// Best-effort close; a failure is DCHECK'd since a destructor cannot
// propagate a Status.
HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(impl_->Close()); }

Status HdfsReadableFile::Close() { return impl_->Close(); }

bool HdfsReadableFile::closed() const { return impl_->closed(); }

Result<int64_t> HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, void* buffer) {
  return impl_->ReadAt(position, nbytes, reinterpret_cast<uint8_t*>(buffer));
}

Result<std::shared_ptr<Buffer>> HdfsReadableFile::ReadAt(int64_t position,
                                                         int64_t nbytes) {
  return impl_->ReadAt(position, nbytes);
}

Result<int64_t> HdfsReadableFile::Read(int64_t nbytes, void* buffer) {
  return impl_->Read(nbytes, buffer);
}

Result<std::shared_ptr<Buffer>> HdfsReadableFile::Read(int64_t nbytes) {
  return impl_->Read(nbytes);
}

Result<int64_t> HdfsReadableFile::GetSize() { return impl_->GetSize(); }

Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); }

Result<int64_t> HdfsReadableFile::Tell() const { return impl_->Tell(); }
275 
276 // ----------------------------------------------------------------------
277 // File writing
278 
279 // Private implementation for writable-only files
280 class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
281  public:
HdfsOutputStreamImpl()282   HdfsOutputStreamImpl() {}
283 
Close()284   Status Close() {
285     if (is_open_) {
286       // is_open_ must be set to false in the beginning, because the destructor
287       // attempts to close the stream again, and if the first close fails, then
288       // the error doesn't get propagated properly and the second close
289       // initiated by the destructor raises a segfault
290       is_open_ = false;
291       RETURN_NOT_OK(FlushInternal());
292       int ret = driver_->CloseFile(fs_, file_);
293       CHECK_FAILURE(ret, "CloseFile");
294     }
295     return Status::OK();
296   }
297 
closed() const298   bool closed() const { return !is_open_; }
299 
Flush()300   Status Flush() {
301     RETURN_NOT_OK(CheckClosed());
302 
303     return FlushInternal();
304   }
305 
Write(const uint8_t * buffer,int64_t nbytes)306   Status Write(const uint8_t* buffer, int64_t nbytes) {
307     RETURN_NOT_OK(CheckClosed());
308 
309     constexpr int64_t kMaxBlockSize = std::numeric_limits<int32_t>::max();
310 
311     std::lock_guard<std::mutex> guard(lock_);
312     while (nbytes > 0) {
313       const auto block_size = static_cast<tSize>(std::min(kMaxBlockSize, nbytes));
314       tSize ret = driver_->Write(fs_, file_, buffer, block_size);
315       CHECK_FAILURE(ret, "Write");
316       DCHECK_LE(ret, block_size);
317       buffer += ret;
318       nbytes -= ret;
319     }
320     return Status::OK();
321   }
322 
323  protected:
FlushInternal()324   Status FlushInternal() {
325     int ret = driver_->Flush(fs_, file_);
326     CHECK_FAILURE(ret, "Flush");
327     return Status::OK();
328   }
329 };
330 
// Public HdfsOutputStream API: thin forwarders to the pimpl.

HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); }

// Best-effort close; a failure is DCHECK'd since a destructor cannot
// propagate a Status.
HdfsOutputStream::~HdfsOutputStream() { DCHECK_OK(impl_->Close()); }

Status HdfsOutputStream::Close() { return impl_->Close(); }

bool HdfsOutputStream::closed() const { return impl_->closed(); }

Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) {
  return impl_->Write(reinterpret_cast<const uint8_t*>(buffer), nbytes);
}

Status HdfsOutputStream::Flush() { return impl_->Flush(); }

Result<int64_t> HdfsOutputStream::Tell() const { return impl_->Tell(); }
346 
347 // ----------------------------------------------------------------------
348 // HDFS client
349 
350 // TODO(wesm): this could throw std::bad_alloc in the course of copying strings
351 // into the path info object
SetPathInfo(const hdfsFileInfo * input,HdfsPathInfo * out)352 static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) {
353   out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY;
354   out->name = std::string(input->mName);
355   out->owner = std::string(input->mOwner);
356   out->group = std::string(input->mGroup);
357 
358   out->last_access_time = static_cast<int32_t>(input->mLastAccess);
359   out->last_modified_time = static_cast<int32_t>(input->mLastMod);
360   out->size = static_cast<int64_t>(input->mSize);
361 
362   out->replication = input->mReplication;
363   out->block_size = input->mBlockSize;
364 
365   out->permissions = input->mPermissions;
366 }
367 
368 // Private implementation
369 class HadoopFileSystem::HadoopFileSystemImpl {
370  public:
HadoopFileSystemImpl()371   HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {}
372 
Connect(const HdfsConnectionConfig * config)373   Status Connect(const HdfsConnectionConfig* config) {
374     RETURN_NOT_OK(ConnectLibHdfs(&driver_));
375 
376     // connect to HDFS with the builder object
377     hdfsBuilder* builder = driver_->NewBuilder();
378     if (!config->host.empty()) {
379       driver_->BuilderSetNameNode(builder, config->host.c_str());
380     }
381     driver_->BuilderSetNameNodePort(builder, static_cast<tPort>(config->port));
382     if (!config->user.empty()) {
383       driver_->BuilderSetUserName(builder, config->user.c_str());
384     }
385     if (!config->kerb_ticket.empty()) {
386       driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
387     }
388 
389     for (const auto& kv : config->extra_conf) {
390       int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str());
391       CHECK_FAILURE(ret, "confsetstr");
392     }
393 
394     driver_->BuilderSetForceNewInstance(builder);
395     fs_ = driver_->BuilderConnect(builder);
396 
397     if (fs_ == nullptr) {
398       return Status::IOError("HDFS connection failed");
399     }
400     namenode_host_ = config->host;
401     port_ = config->port;
402     user_ = config->user;
403     kerb_ticket_ = config->kerb_ticket;
404 
405     return Status::OK();
406   }
407 
MakeDirectory(const std::string & path)408   Status MakeDirectory(const std::string& path) {
409     int ret = driver_->MakeDirectory(fs_, path.c_str());
410     CHECK_FAILURE(ret, "create directory");
411     return Status::OK();
412   }
413 
Delete(const std::string & path,bool recursive)414   Status Delete(const std::string& path, bool recursive) {
415     int ret = driver_->Delete(fs_, path.c_str(), static_cast<int>(recursive));
416     CHECK_FAILURE(ret, "delete");
417     return Status::OK();
418   }
419 
Disconnect()420   Status Disconnect() {
421     int ret = driver_->Disconnect(fs_);
422     CHECK_FAILURE(ret, "hdfsFS::Disconnect");
423     return Status::OK();
424   }
425 
Exists(const std::string & path)426   bool Exists(const std::string& path) {
427     // hdfsExists does not distinguish between RPC failure and the file not
428     // existing
429     int ret = driver_->Exists(fs_, path.c_str());
430     return ret == 0;
431   }
432 
GetCapacity(int64_t * nbytes)433   Status GetCapacity(int64_t* nbytes) {
434     tOffset ret = driver_->GetCapacity(fs_);
435     CHECK_FAILURE(ret, "GetCapacity");
436     *nbytes = ret;
437     return Status::OK();
438   }
439 
GetUsed(int64_t * nbytes)440   Status GetUsed(int64_t* nbytes) {
441     tOffset ret = driver_->GetUsed(fs_);
442     CHECK_FAILURE(ret, "GetUsed");
443     *nbytes = ret;
444     return Status::OK();
445   }
446 
GetWorkingDirectory(std::string * out)447   Status GetWorkingDirectory(std::string* out) {
448     char buffer[2048];
449     if (driver_->GetWorkingDirectory(fs_, buffer, sizeof(buffer) - 1) == nullptr) {
450       return Status::IOError("HDFS GetWorkingDirectory failed, errno: ",
451                              TranslateErrno(errno));
452     }
453     *out = buffer;
454     return Status::OK();
455   }
456 
GetPathInfo(const std::string & path,HdfsPathInfo * info)457   Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
458     hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str());
459 
460     if (entry == nullptr) {
461       return GetPathInfoFailed(path);
462     }
463 
464     SetPathInfo(entry, info);
465     driver_->FreeFileInfo(entry, 1);
466 
467     return Status::OK();
468   }
469 
Stat(const std::string & path,FileStatistics * stat)470   Status Stat(const std::string& path, FileStatistics* stat) {
471     HdfsPathInfo info;
472     RETURN_NOT_OK(GetPathInfo(path, &info));
473 
474     stat->size = info.size;
475     stat->kind = info.kind;
476     return Status::OK();
477   }
478 
GetChildren(const std::string & path,std::vector<std::string> * listing)479   Status GetChildren(const std::string& path, std::vector<std::string>* listing) {
480     std::vector<HdfsPathInfo> detailed_listing;
481     RETURN_NOT_OK(ListDirectory(path, &detailed_listing));
482     for (const auto& info : detailed_listing) {
483       listing->push_back(info.name);
484     }
485     return Status::OK();
486   }
487 
ListDirectory(const std::string & path,std::vector<HdfsPathInfo> * listing)488   Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
489     int num_entries = 0;
490     errno = 0;
491     hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);
492 
493     if (entries == nullptr) {
494       // If the directory is empty, entries is NULL but errno is 0. Non-zero
495       // errno indicates error
496       //
497       // Note: errno is thread-local
498       //
499       // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
500       // errno 2/ENOENT for empty directories. To be more robust to this we
501       // double check this case
502       if ((errno == 0) || (errno == ENOENT && Exists(path))) {
503         num_entries = 0;
504       } else {
505         return Status::IOError("HDFS list directory failed, errno: ",
506                                TranslateErrno(errno));
507       }
508     }
509 
510     // Allocate additional space for elements
511     int vec_offset = static_cast<int>(listing->size());
512     listing->resize(vec_offset + num_entries);
513 
514     for (int i = 0; i < num_entries; ++i) {
515       SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
516     }
517 
518     // Free libhdfs file info
519     driver_->FreeFileInfo(entries, num_entries);
520 
521     return Status::OK();
522   }
523 
OpenReadable(const std::string & path,int32_t buffer_size,const io::IOContext & io_context,std::shared_ptr<HdfsReadableFile> * file)524   Status OpenReadable(const std::string& path, int32_t buffer_size,
525                       const io::IOContext& io_context,
526                       std::shared_ptr<HdfsReadableFile>* file) {
527     errno = 0;
528     hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);
529 
530     if (handle == nullptr) {
531       if (errno) {
532         return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
533       } else {
534         return Status::IOError("Opening HDFS file '", path, "' failed");
535       }
536     }
537 
538     // std::make_shared does not work with private ctors
539     *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile(io_context));
540     (*file)->impl_->set_members(path, driver_, fs_, handle);
541     (*file)->impl_->set_buffer_size(buffer_size);
542 
543     return Status::OK();
544   }
545 
OpenWritable(const std::string & path,bool append,int32_t buffer_size,int16_t replication,int64_t default_block_size,std::shared_ptr<HdfsOutputStream> * file)546   Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
547                       int16_t replication, int64_t default_block_size,
548                       std::shared_ptr<HdfsOutputStream>* file) {
549     int flags = O_WRONLY;
550     if (append) flags |= O_APPEND;
551 
552     errno = 0;
553     hdfsFile handle =
554         driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication,
555                           static_cast<tSize>(default_block_size));
556 
557     if (handle == nullptr) {
558       if (errno) {
559         return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
560       } else {
561         return Status::IOError("Opening HDFS file '", path, "' failed");
562       }
563     }
564 
565     // std::make_shared does not work with private ctors
566     *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
567     (*file)->impl_->set_members(path, driver_, fs_, handle);
568 
569     return Status::OK();
570   }
571 
Rename(const std::string & src,const std::string & dst)572   Status Rename(const std::string& src, const std::string& dst) {
573     int ret = driver_->Rename(fs_, src.c_str(), dst.c_str());
574     CHECK_FAILURE(ret, "Rename");
575     return Status::OK();
576   }
577 
Copy(const std::string & src,const std::string & dst)578   Status Copy(const std::string& src, const std::string& dst) {
579     int ret = driver_->Copy(fs_, src.c_str(), fs_, dst.c_str());
580     CHECK_FAILURE(ret, "Rename");
581     return Status::OK();
582   }
583 
Move(const std::string & src,const std::string & dst)584   Status Move(const std::string& src, const std::string& dst) {
585     int ret = driver_->Move(fs_, src.c_str(), fs_, dst.c_str());
586     CHECK_FAILURE(ret, "Rename");
587     return Status::OK();
588   }
589 
Chmod(const std::string & path,int mode)590   Status Chmod(const std::string& path, int mode) {
591     int ret = driver_->Chmod(fs_, path.c_str(), static_cast<short>(mode));  // NOLINT
592     CHECK_FAILURE(ret, "Chmod");
593     return Status::OK();
594   }
595 
Chown(const std::string & path,const char * owner,const char * group)596   Status Chown(const std::string& path, const char* owner, const char* group) {
597     int ret = driver_->Chown(fs_, path.c_str(), owner, group);
598     CHECK_FAILURE(ret, "Chown");
599     return Status::OK();
600   }
601 
602  private:
603   internal::LibHdfsShim* driver_;
604 
605   std::string namenode_host_;
606   std::string user_;
607   int port_;
608   std::string kerb_ticket_;
609 
610   hdfsFS fs_;
611 };
612 
613 // ----------------------------------------------------------------------
614 // Public API for HDFSClient
615 
HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); }

// Out-of-line so the unique_ptr<HadoopFileSystemImpl> destructor is
// instantiated where the impl type is complete.
HadoopFileSystem::~HadoopFileSystem() {}
619 
Connect(const HdfsConnectionConfig * config,std::shared_ptr<HadoopFileSystem> * fs)620 Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config,
621                                  std::shared_ptr<HadoopFileSystem>* fs) {
622   // ctor is private, make_shared will not work
623   *fs = std::shared_ptr<HadoopFileSystem>(new HadoopFileSystem());
624 
625   RETURN_NOT_OK((*fs)->impl_->Connect(config));
626   return Status::OK();
627 }
628 
// Public HadoopFileSystem API: thin forwarders to the pimpl.

Status HadoopFileSystem::MakeDirectory(const std::string& path) {
  return impl_->MakeDirectory(path);
}

Status HadoopFileSystem::Delete(const std::string& path, bool recursive) {
  return impl_->Delete(path, recursive);
}

// Equivalent to Delete(path, /*recursive=*/true).
Status HadoopFileSystem::DeleteDirectory(const std::string& path) {
  return Delete(path, true);
}

Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); }

bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); }

Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
  return impl_->GetPathInfo(path, info);
}

Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) {
  return impl_->Stat(path, stat);
}

Status HadoopFileSystem::GetCapacity(int64_t* nbytes) {
  return impl_->GetCapacity(nbytes);
}

Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); }

Status HadoopFileSystem::GetWorkingDirectory(std::string* out) {
  return impl_->GetWorkingDirectory(out);
}

Status HadoopFileSystem::GetChildren(const std::string& path,
                                     std::vector<std::string>* listing) {
  return impl_->GetChildren(path, listing);
}

Status HadoopFileSystem::ListDirectory(const std::string& path,
                                       std::vector<HdfsPathInfo>* listing) {
  return impl_->ListDirectory(path, listing);
}

// Explicit buffer size, default IO context.
Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, io::default_io_context(), file);
}

// Default buffer size and default IO context.
Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, io::default_io_context(), file);
}

Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      const io::IOContext& io_context,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, io_context, file);
}

// Default buffer size, explicit IO context.
Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      const io::IOContext& io_context,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, io_context, file);
}

Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      int32_t buffer_size, int16_t replication,
                                      int64_t default_block_size,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size,
                             file);
}

// Zero buffer_size/replication/block_size select the libhdfs defaults.
Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, 0, 0, 0, file);
}

Status HadoopFileSystem::Chmod(const std::string& path, int mode) {
  return impl_->Chmod(path, mode);
}

Status HadoopFileSystem::Chown(const std::string& path, const char* owner,
                               const char* group) {
  return impl_->Chown(path, owner, group);
}

Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) {
  return impl_->Rename(src, dst);
}

Status HadoopFileSystem::Copy(const std::string& src, const std::string& dst) {
  return impl_->Copy(src, dst);
}

Status HadoopFileSystem::Move(const std::string& src, const std::string& dst) {
  return impl_->Move(src, dst);
}
728 
729 // ----------------------------------------------------------------------
730 // Allow public API users to check whether we are set up correctly
731 
HaveLibHdfs()732 Status HaveLibHdfs() {
733   internal::LibHdfsShim* driver;
734   return internal::ConnectLibHdfs(&driver);
735 }
736 
737 }  // namespace io
738 }  // namespace arrow
739