1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
20 #include <cstdint>
21 #include <memory>
22 #include <string>
23 #include <unordered_map>
24 #include <vector>
25 
26 #include "arrow/io/interfaces.h"
27 #include "arrow/util/macros.h"
28 #include "arrow/util/visibility.h"
29 
30 namespace arrow {
31 
// Forward declarations of core Arrow types, so this header can name them
// without requiring their full definitions here.
// NOTE(review): MemoryPool is not referenced by any declaration in this
// header -- confirm whether it is still needed before removing it.
class Buffer;
class MemoryPool;
class Status;
35 
36 namespace io {
37 
// Forward declarations: HadoopFileSystem names these classes (as friends and
// in the OpenReadable / OpenWritable signatures) before they are defined
// further down in this header.
class HdfsReadableFile;
class HdfsOutputStream;
40 
/// \brief Scoping wrapper for the enum describing what kind of entry a path is.
///
/// The enum is nested in a struct so that its values are spelled
/// `ObjectType::FILE` / `ObjectType::DIRECTORY` and the type itself is
/// `ObjectType::type`.
///
/// DEPRECATED.  Use the FileSystem API in arrow::fs instead.
struct ObjectType {
  /// Entry kind: a file or a directory.
  enum type { FILE, DIRECTORY };
};
45 
/// \brief Minimal per-path statistics: a size and an entry kind.
///
/// This is the output type of FileSystem::Stat().
///
/// DEPRECATED.  Use the FileSystem API in arrow::fs instead.
struct ARROW_EXPORT FileStatistics {
  /// Size of file, -1 if finding length is unsupported
  /// NOTE(review): presumably expressed in bytes -- confirm against implementations.
  int64_t size;
  /// Whether the path is a file or a directory
  ObjectType::type kind;
};
52 
/// \brief Abstract interface for basic filesystem operations.
///
/// Every operation is pure virtual and reports its outcome through the
/// returned Status.  Results, where there are any, are written through the
/// pointer parameter.
class ARROW_EXPORT FileSystem {
 public:
  // Virtual so that implementations can be destroyed through a FileSystem pointer
  virtual ~FileSystem() = default;

  /// \brief Create a directory
  /// \param path path of the directory to create
  /// \return Status
  virtual Status MakeDirectory(const std::string& path) = 0;

  /// \brief Delete a directory
  /// \param path path of the directory to delete
  /// \return Status
  virtual Status DeleteDirectory(const std::string& path) = 0;

  /// \brief List the entries found under a directory
  /// \param path path of the directory to list
  /// \param listing (out) receives the child entries
  ///   NOTE(review): whether entries are bare names or full paths, and whether
  ///   existing contents of the vector are kept, is implementation-defined -- confirm.
  /// \return Status
  virtual Status GetChildren(const std::string& path,
                             std::vector<std::string>* listing) = 0;

  /// \brief Rename (move) an entry
  /// \param src current path
  /// \param dst new path
  /// \return Status
  virtual Status Rename(const std::string& src, const std::string& dst) = 0;

  /// \brief Retrieve size and kind of a path
  /// \param path path to inspect
  /// \param stat (out) receives the statistics
  /// \return Status
  virtual Status Stat(const std::string& path, FileStatistics* stat) = 0;
};
68 
/// \brief Metadata describing a single HDFS path.
///
/// This is the output type of HadoopFileSystem::GetPathInfo() and the element
/// type of the listing produced by HadoopFileSystem::ListDirectory().
struct HdfsPathInfo {
  // Whether the entry is a file or a directory
  ObjectType::type kind;

  // Entry name, owning user and owning group
  // NOTE(review): `name` may be a full path or URI rather than a bare
  // file name -- confirm against the implementation.
  std::string name;
  std::string owner;
  std::string group;

  // Entry size and HDFS block size
  // NOTE(review): presumably both in bytes -- confirm.
  int64_t size;
  int64_t block_size;

  // Modification and access times in UNIX timestamps (seconds).
  // Stored as 32-bit values, so times after 2038-01-19 cannot be represented.
  int32_t last_modified_time;
  int32_t last_access_time;

  // Replication factor and permission bits of the entry
  // NOTE(review): permissions presumably use the POSIX mode-bit layout -- confirm.
  int16_t replication;
  int16_t permissions;
};
86 
/// \brief Parameters describing how to connect to an HDFS cluster.
///
/// Passed to HadoopFileSystem::Connect().  All members have a well-defined
/// value after default construction, so a caller may fill in only the fields
/// it cares about.
struct HdfsConnectionConfig {
  /// Host (or name service) of the cluster to connect to
  std::string host;
  /// Port to connect to.  Explicitly initialized to 0 so that a
  /// default-constructed config never carries an indeterminate value.
  /// NOTE(review): libhdfs presumably treats port 0 as "use the default
  /// port" -- confirm against the libhdfs builder documentation.
  int port = 0;
  /// User to connect as
  /// NOTE(review): presumably empty means "current user" -- confirm.
  std::string user;
  /// Kerberos ticket setting
  /// NOTE(review): presumably the path to a ticket cache file -- confirm.
  std::string kerb_ticket;
  /// Additional key/value configuration entries
  /// NOTE(review): presumably forwarded verbatim to the HDFS client -- confirm.
  std::unordered_map<std::string, std::string> extra_conf;
};
94 
/// \brief FileSystem implementation backed by an HDFS cluster.
///
/// Instances cannot be constructed directly (the constructor is private);
/// they are obtained through Connect().  The class is non-copyable and keeps
/// its state in a separately defined implementation object (`impl_`).
class ARROW_EXPORT HadoopFileSystem : public FileSystem {
 public:
  ~HadoopFileSystem() override;

  // Connect to an HDFS cluster given a configuration
  //
  // @param config (in): configuration for connecting
  // @param fs (out): the created client
  // @returns Status
  static Status Connect(const HdfsConnectionConfig* config,
                        std::shared_ptr<HadoopFileSystem>* fs);

  // Create directory and all parents
  //
  // @param path (in): absolute HDFS path
  // @returns Status
  Status MakeDirectory(const std::string& path) override;

  // Delete file or directory
  // @param path absolute path to data
  // @param recursive if path is a directory, delete contents as well
  // @returns error status on failure
  Status Delete(const std::string& path, bool recursive = false);

  // Delete a directory (FileSystem interface)
  //
  // NOTE(review): presumably removes the directory together with its
  // contents -- confirm against the implementation.
  // @param path (in): path of the directory
  // @returns Status
  Status DeleteDirectory(const std::string& path) override;

  // Disconnect from cluster
  //
  // @returns Status
  Status Disconnect();

  // Check whether a path exists.  Errors are not distinguishable from a
  // missing path: both yield false.
  //
  // @param path (in): absolute HDFS path
  // @returns bool, true if the path exists, false if not (or on error)
  bool Exists(const std::string& path);

  // Retrieve detailed metadata for a single path
  //
  // @param path (in): absolute HDFS path
  // @param info (out)
  // @returns Status
  Status GetPathInfo(const std::string& path, HdfsPathInfo* info);

  // @param nbytes (out): total capacity of the filesystem
  // @returns Status
  Status GetCapacity(int64_t* nbytes);

  // @param nbytes (out): total bytes used of the filesystem
  // @returns Status
  Status GetUsed(int64_t* nbytes);

  // List the entries under a directory as strings (FileSystem interface).
  // See ListDirectory() for a listing that carries full metadata.
  //
  // @param path (in): directory to list
  // @param listing (out): receives the child entries
  // @returns Status
  Status GetChildren(const std::string& path, std::vector<std::string>* listing) override;

  /// List directory contents
  ///
  /// If path is a relative path, returned values will be absolute paths or URIs
  /// starting from the current working directory.
  ///
  /// \param path directory to list
  /// \param listing (out) receives one HdfsPathInfo per entry
  /// \return Status
  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing);

  /// Return the filesystem's current working directory.
  ///
  /// The working directory is the base path for all relative paths given to
  /// other APIs.
  /// NOTE: this actually returns a URI.
  ///
  /// \param out (out) receives the working directory
  /// \return Status
  Status GetWorkingDirectory(std::string* out);

  /// Change the owner and/or group of a path
  ///
  /// @param path file path to change
  /// @param owner pass null for no change
  /// @param group pass null for no change
  /// @returns Status
  Status Chown(const std::string& path, const char* owner, const char* group);

  /// Change path permissions
  ///
  /// \param path Absolute path in file system
  /// \param mode Mode bitset
  /// \return Status
  Status Chmod(const std::string& path, int mode);

  // Move file or directory from source path to destination path within the
  // current filesystem
  Status Rename(const std::string& src, const std::string& dst) override;

  // Copy from source path to destination path
  //
  // NOTE(review): presumably both paths are on this filesystem -- confirm
  // against the implementation.
  Status Copy(const std::string& src, const std::string& dst);

  // Move from source path to destination path
  //
  // NOTE(review): how this differs from Rename() is not visible from this
  // header -- confirm against the implementation.
  Status Move(const std::string& src, const std::string& dst);

  // Retrieve size and kind of a path (FileSystem interface).
  // See GetPathInfo() for the HDFS-specific metadata.
  Status Stat(const std::string& path, FileStatistics* stat) override;

  // TODO(wesm): SetWorkingDirectory (GetWorkingDirectory is declared above)

  // Open an HDFS file in READ mode. Returns error
  // status if the file is not found.
  //
  // @param path complete file path
  // @param buffer_size read buffer size
  //   NOTE(review): presumably 0 selects the HDFS default -- confirm.
  // @param file (out): the opened file
  Status OpenReadable(const std::string& path, int32_t buffer_size,
                      std::shared_ptr<HdfsReadableFile>* file);

  // Same as above, additionally taking the IOContext to be used by the
  // opened file.
  Status OpenReadable(const std::string& path, int32_t buffer_size,
                      const io::IOContext& io_context,
                      std::shared_ptr<HdfsReadableFile>* file);

  // Overload without an explicit buffer size.
  // NOTE(review): presumably uses a default buffer size -- confirm.
  Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file);

  // Overload without an explicit buffer size, taking the IOContext to be
  // used by the opened file.
  Status OpenReadable(const std::string& path, const io::IOContext& io_context,
                      std::shared_ptr<HdfsReadableFile>* file);

  // Open an HDFS file for writing (FileMode::WRITE options).
  //
  // None of the parameters has a C++ default argument.
  // NOTE(review): "0 by default" presumably means that passing 0 selects the
  // HDFS-configured default, and that the shorter overload below passes 0
  // for these three values -- confirm against the implementation.
  //
  // @param path complete file path
  // @param append
  //   NOTE(review): presumably true appends to an existing file instead of
  //   overwriting it -- confirm.
  // @param buffer_size 0 by default
  // @param replication 0 by default
  // @param default_block_size 0 by default
  // @param file (out): the opened output stream
  Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
                      int16_t replication, int64_t default_block_size,
                      std::shared_ptr<HdfsOutputStream>* file);

  // Overload without explicit buffer size, replication and block size.
  Status OpenWritable(const std::string& path, bool append,
                      std::shared_ptr<HdfsOutputStream>* file);

 private:
  // The file classes are friends so that they can reach this class's
  // private members, including the nested implementation class.
  friend class HdfsReadableFile;
  friend class HdfsOutputStream;

  // Implementation class: only declared here, defined elsewhere, and owned
  // through impl_.
  class ARROW_NO_EXPORT HadoopFileSystemImpl;
  std::unique_ptr<HadoopFileSystemImpl> impl_;

  // Private: use Connect() to obtain an instance
  HadoopFileSystem();
  ARROW_DISALLOW_COPY_AND_ASSIGN(HadoopFileSystem);
};
222 
/// \brief RandomAccessFile implementation for a file stored on HDFS.
///
/// The constructor is private and HadoopFileSystem's implementation class is
/// a friend, so instances are created by the filesystem (see the
/// HadoopFileSystem::OpenReadable() overloads).  The class is non-copyable
/// and keeps its state in a separately defined implementation object.
class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
 public:
  ~HdfsReadableFile() override;

  /// \brief Close the file
  Status Close() override;

  /// \brief Return whether the file is closed
  bool closed() const override;

  // NOTE: If you wish to read a particular range of a file in a multithreaded
  // context, you may prefer to use ReadAt to avoid locking issues

  /// \brief Read up to nbytes into caller-provided memory
  /// \param nbytes maximum number of bytes to read
  /// \param out destination memory
  /// \return the number of bytes read, or an error
  Result<int64_t> Read(int64_t nbytes, void* out) override;
  /// \brief Read up to nbytes and return them as a Buffer
  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
  /// \brief Read up to nbytes starting at the given position into
  /// caller-provided memory
  /// \param position offset in the file to read from
  /// \param nbytes maximum number of bytes to read
  /// \param out destination memory
  /// \return the number of bytes read, or an error
  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
  /// \brief Read up to nbytes starting at the given position and return them
  /// as a Buffer
  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override;

  /// \brief Move the read position to the given offset
  Status Seek(int64_t position) override;
  /// \brief Return the current read position
  Result<int64_t> Tell() const override;
  /// \brief Return the size of the file
  Result<int64_t> GetSize() override;

 private:
  // Private: instances are created by HadoopFileSystemImpl (friend below)
  explicit HdfsReadableFile(const io::IOContext&);

  // Implementation class: only declared here, defined elsewhere, and owned
  // through impl_.
  class ARROW_NO_EXPORT HdfsReadableFileImpl;
  std::unique_ptr<HdfsReadableFileImpl> impl_;

  friend class HadoopFileSystem::HadoopFileSystemImpl;

  ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
};
252 
// Naming this class OutputStream (rather than following the WritableFile
// interface) because it does not support seeking.
//
// The constructor is private and HadoopFileSystem's implementation class is a
// friend, so instances are created by the filesystem (see the
// HadoopFileSystem::OpenWritable() overloads).  The class is non-copyable and
// keeps its state in a separately defined implementation object.
class ARROW_EXPORT HdfsOutputStream : public OutputStream {
 public:
  ~HdfsOutputStream() override;

  // Close the stream
  Status Close() override;

  // Return whether the stream is closed
  bool closed() const override;

  // Keep the other Write overloads inherited from OutputStream visible;
  // declaring Write below would otherwise hide them.
  using OutputStream::Write;
  // Write nbytes bytes starting at buffer
  Status Write(const void* buffer, int64_t nbytes) override;

  // Flush buffered data
  // NOTE(review): the durability guarantee provided by HDFS here is not
  // visible from this header -- confirm against the implementation.
  Status Flush() override;

  // Return the current write position
  Result<int64_t> Tell() const override;

 private:
  // Implementation class: only declared here, defined elsewhere, and owned
  // through impl_.
  class ARROW_NO_EXPORT HdfsOutputStreamImpl;
  std::unique_ptr<HdfsOutputStreamImpl> impl_;

  friend class HadoopFileSystem::HadoopFileSystemImpl;

  // Private: instances are created by HadoopFileSystemImpl (friend above)
  HdfsOutputStream();

  ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream);
};
280 
/// \brief Report through a Status whether libhdfs is available.
///
/// NOTE(review): presumably returns OK when the libhdfs shared library could
/// be located and loaded, and an error describing the failure otherwise --
/// confirm against the implementation.
Status ARROW_EXPORT HaveLibHdfs();
282 
283 }  // namespace io
284 }  // namespace arrow
285