1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18# ----------------------------------------------------------------------
19# HDFS IO implementation
20
import re
import weakref

from queue import Queue, Empty as QueueEmpty, Full as QueueFull
22
23
# Matches an absolute HDFS URI, e.g. hdfs://host:8020/a/b
# group(1) = host, group(2) = port, group(3) = path remainder
_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
25
26
def have_libhdfs():
    """
    Return True if libhdfs can be located and loaded, False otherwise.

    Any failure (missing library, misconfiguration) is reported as False
    rather than propagated.
    """
    try:
        with nogil:
            check_status(HaveLibHdfs())
    except Exception:
        return False
    return True
34
35
def strip_hdfs_abspath(path):
    """
    Strip a leading ``hdfs://HOST:PORT`` prefix from *path*, if present.

    Paths that are not absolute HDFS URIs are returned unchanged.
    """
    match = _HDFS_PATH_RE.match(path)
    return match.group(3) if match else path
42
43
cdef class HadoopFileSystem:
    """
    libhdfs-backed client for a Hadoop Distributed File System.

    Thin wrapper around the C++ ``CIOHadoopFileSystem`` client.  Instances
    are created through ``connect``; every filesystem operation requires an
    open connection and raises IOError otherwise.
    """
    cdef:
        shared_ptr[CIOHadoopFileSystem] client

    cdef readonly:
        bint is_open        # True between a successful connect and close()
        object host         # namenode host given at connect time (or None)
        object user         # HDFS user name (or None for the default)
        object kerb_ticket  # path to Kerberos ticket cache (or None)
        int port
        dict extra_conf     # extra libhdfs configuration properties

    def _connect(self, host, port, user, kerb_ticket, extra_conf):
        """
        Connect to the HDFS namenode, verifying libhdfs availability first.

        Parameters
        ----------
        host : string or None
            Namenode host; None defers to the libhdfs default configuration
        port : int
        user : string or None
        kerb_ticket : string or None
            Path to a Kerberos ticket cache file
        extra_conf : dict or None
            Additional key/value libhdfs configuration properties
        """
        cdef HdfsConnectionConfig conf

        if host is not None:
            conf.host = tobytes(host)
        self.host = host

        conf.port = port
        self.port = port

        if user is not None:
            conf.user = tobytes(user)
        self.user = user

        if kerb_ticket is not None:
            conf.kerb_ticket = tobytes(kerb_ticket)
        self.kerb_ticket = kerb_ticket

        # Fail fast with a clear error when libhdfs cannot be loaded,
        # before attempting the connection itself
        with nogil:
            check_status(HaveLibHdfs())

        if extra_conf is not None and isinstance(extra_conf, dict):
            conf.extra_conf = {tobytes(k): tobytes(v)
                               for k, v in extra_conf.items()}
        self.extra_conf = extra_conf

        with nogil:
            check_status(CIOHadoopFileSystem.Connect(&conf, &self.client))
        self.is_open = True

    @classmethod
    def connect(cls, *args, **kwargs):
        """
        Construct and connect a new client.  All arguments are forwarded to
        the class constructor.
        """
        return cls(*args, **kwargs)

    def __dealloc__(self):
        if self.is_open:
            self.close()

    def close(self):
        """
        Disconnect from the HDFS cluster
        """
        self._ensure_client()
        with nogil:
            check_status(self.client.get().Disconnect())
        self.is_open = False

    cdef _ensure_client(self):
        # Guard shared by all public methods: a NULL client means the object
        # was never connected; a non-open client means close() was called.
        if self.client.get() == NULL:
            raise IOError('HDFS client improperly initialized')
        elif not self.is_open:
            raise IOError('HDFS client is closed')

    def exists(self, path):
        """
        Returns True if the path is known to the cluster, False if it does not
        (or there is an RPC error)
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        cdef c_bool result
        with nogil:
            result = self.client.get().Exists(c_path)
        return result

    def isdir(self, path):
        """
        Return True if path exists and is a directory, False otherwise
        """
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            # Nonexistent path (or RPC failure) is simply "not a directory"
            return False
        return info.kind == ObjectType_DIRECTORY

    def isfile(self, path):
        """
        Return True if path exists and is a regular file, False otherwise
        """
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            return False
        return info.kind == ObjectType_FILE

    def get_capacity(self):
        """
        Get reported total capacity of file system

        Returns
        -------
        capacity : int
        """
        # BUGFIX: validate the connection like the other methods do instead
        # of dereferencing a potentially NULL client
        self._ensure_client()
        cdef int64_t capacity = 0
        with nogil:
            check_status(self.client.get().GetCapacity(&capacity))
        return capacity

    def get_space_used(self):
        """
        Get space used on file system

        Returns
        -------
        space_used : int
        """
        # BUGFIX: validate the connection (see get_capacity)
        self._ensure_client()
        cdef int64_t space_used = 0
        with nogil:
            check_status(self.client.get().GetUsed(&space_used))
        return space_used

    def df(self):
        """
        Return free space on disk, like the UNIX df command

        Returns
        -------
        space : int
        """
        return self.get_capacity() - self.get_space_used()

    def rename(self, path, new_path):
        """
        Rename (move) a file or directory

        Parameters
        ----------
        path : string
            Existing path
        new_path : string
            Destination path
        """
        # BUGFIX: validate the connection (see get_capacity)
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef c_string c_new_path = tobytes(new_path)
        with nogil:
            check_status(self.client.get().Rename(c_path, c_new_path))

    def info(self, path):
        """
        Return detailed HDFS information for path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        path_info : dict
        """
        cdef HdfsPathInfo info
        self._path_info(path, &info)
        return {
            'path': frombytes(info.name),
            'owner': frombytes(info.owner),
            'group': frombytes(info.group),
            'size': info.size,
            'block_size': info.block_size,
            'last_modified': info.last_modified_time,
            'last_accessed': info.last_access_time,
            'replication': info.replication,
            'permissions': info.permissions,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    def stat(self, path):
        """
        Return basic file system statistics about path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        stat : dict
        """
        # BUGFIX: validate the connection (see get_capacity)
        self._ensure_client()
        cdef FileStatistics info
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Stat(c_path, &info))
        return {
            'size': info.size,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    cdef _path_info(self, path, HdfsPathInfo* info):
        # Shared helper for info/isdir/isfile; raises ArrowIOError when the
        # path does not exist
        self._ensure_client()
        cdef c_string c_path = tobytes(path)

        with nogil:
            check_status(self.client.get()
                         .GetPathInfo(c_path, info))

    def ls(self, path, bint full_info):
        """
        List the contents of a directory

        Parameters
        ----------
        path : string
            Directory to list
        full_info : boolean
            If True, return a dict of metadata per entry; otherwise just the
            entry names

        Returns
        -------
        listing : list
        """
        cdef:
            c_string c_path = tobytes(path)
            vector[HdfsPathInfo] listing
            list results = []
            int i

        self._ensure_client()

        with nogil:
            check_status(self.client.get()
                         .ListDirectory(c_path, &listing))

        cdef const HdfsPathInfo* info
        for i in range(<int> listing.size()):
            info = &listing[i]

            # Try to trim off the hdfs://HOST:PORT piece
            name = strip_hdfs_abspath(frombytes(info.name))

            if full_info:
                kind = ('file' if info.kind == ObjectType_FILE
                        else 'directory')

                results.append({
                    'kind': kind,
                    'name': name,
                    'owner': frombytes(info.owner),
                    'group': frombytes(info.group),
                    'last_modified_time': info.last_modified_time,
                    'last_access_time': info.last_access_time,
                    'size': info.size,
                    'replication': info.replication,
                    'block_size': info.block_size,
                    'permissions': info.permissions
                })
            else:
                results.append(name)

        return results

    def chmod(self, path, mode):
        """
        Change file permissions

        Parameters
        ----------
        path : string
            absolute path to file or directory
        mode : int
            POSIX-like bitmask
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef int c_mode = mode
        with nogil:
            check_status(self.client.get()
                         .Chmod(c_path, c_mode))

    def chown(self, path, owner=None, group=None):
        """
        Change file owner and/or group

        Parameters
        ----------
        path : string
            absolute path to file or directory
        owner : string, default None
            New owner, None for no change
        group : string, default None
            New group, None for no change
        """
        cdef:
            c_string c_path
            c_string c_owner
            c_string c_group
            const char* c_owner_ptr = NULL
            const char* c_group_ptr = NULL

        self._ensure_client()

        c_path = tobytes(path)
        # NULL pointers signal "leave unchanged" to the C++ layer; the
        # c_string locals keep the byte buffers alive across the call
        if owner is not None:
            c_owner = tobytes(owner)
            c_owner_ptr = c_owner.c_str()

        if group is not None:
            c_group = tobytes(group)
            c_group_ptr = c_group.c_str()

        with nogil:
            check_status(self.client.get()
                         .Chown(c_path, c_owner_ptr, c_group_ptr))

    def mkdir(self, path):
        """
        Create indicated directory and any necessary parent directories
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .MakeDirectory(c_path))

    def delete(self, path, bint recursive=False):
        """
        Delete the indicated file or directory

        Parameters
        ----------
        path : string
        recursive : boolean, default False
            If True, also delete child paths for directories
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Delete(c_path, recursive == 1))

    def open(self, path, mode='rb', buffer_size=None, replication=None,
             default_block_size=None):
        """
        Open HDFS file for reading or writing

        Parameters
        ----------
        path : string
        mode : string
            Must be one of 'rb', 'wb', 'ab'
        buffer_size : int, default None
            Read/write buffer size; None or 0 uses the libhdfs default
        replication : int, default None
            Block replication factor for writes; None or 0 uses the default
        default_block_size : int, default None
            HDFS block size for writes; None or 0 uses the default

        Returns
        -------
        handle : HdfsFile

        Raises
        ------
        ValueError
            If mode is not one of the supported values
        """
        self._ensure_client()

        cdef HdfsFile out = HdfsFile()

        if mode not in ('rb', 'wb', 'ab'):
            # BUGFIX: raise ValueError (still an Exception subclass, so
            # existing callers catching Exception keep working)
            raise ValueError("Mode must be 'rb' (read), "
                             "'wb' (write, new file), or 'ab' (append)")

        cdef c_string c_path = tobytes(path)
        cdef c_bool append = False

        # 0 in libhdfs means "use the default"
        cdef int32_t c_buffer_size = buffer_size or 0
        cdef int16_t c_replication = replication or 0
        cdef int64_t c_default_block_size = default_block_size or 0

        cdef shared_ptr[HdfsOutputStream] wr_handle
        cdef shared_ptr[HdfsReadableFile] rd_handle

        if mode in ('wb', 'ab'):
            if mode == 'ab':
                append = True

            with nogil:
                check_status(
                    self.client.get()
                    .OpenWritable(c_path, append, c_buffer_size,
                                  c_replication, c_default_block_size,
                                  &wr_handle))

            out.set_output_stream(<shared_ptr[COutputStream]> wr_handle)
            out.is_writable = True
        else:
            with nogil:
                check_status(self.client.get()
                             .OpenReadable(c_path, &rd_handle))

            out.set_random_access_file(
                <shared_ptr[CRandomAccessFile]> rd_handle)
            out.is_readable = True

        assert not out.closed

        # Pick a sane Python-side buffer size when libhdfs used its default
        if c_buffer_size == 0:
            c_buffer_size = 2 ** 16

        out.mode = mode
        out.buffer_size = c_buffer_size
        # Keep the client alive at least as long as the file (ARROW-404)
        out.parent = _HdfsFileNanny(self, out)
        out.own_file = True

        return out

    def download(self, path, stream, buffer_size=None):
        """
        Copy the contents of an HDFS path into a file-like stream
        """
        with self.open(path, 'rb') as f:
            f.download(stream, buffer_size=buffer_size)

    def upload(self, path, stream, buffer_size=None):
        """
        Upload file-like object to HDFS path
        """
        with self.open(path, 'wb') as f:
            f.upload(stream, buffer_size=buffer_size)
438
439
cdef class _HdfsFileNanny:
    """
    ARROW-404: ensure files are closed before their client.

    During deallocation of an extension class the attributes are decref'd,
    which can cause the client to get closed first if the file has the last
    remaining reference.  Holding a strong reference to the client (and only
    a weak one to the file, avoiding a cycle) enforces the right order.
    """
    cdef:
        object client           # strong ref keeps the client alive
        object file_handle_ref  # weakref.ref to the HdfsFile

    def __cinit__(self, client, file_handle):
        # weakref is imported at module level; a function-local import on
        # every file open was unnecessary overhead
        self.client = client
        self.file_handle_ref = weakref.ref(file_handle)

    def __dealloc__(self):
        fh = self.file_handle_ref()
        if fh:
            fh.close()
        # avoid cyclic GC
        self.file_handle_ref = None
        self.client = None
461
462
cdef class HdfsFile(NativeFile):
    # File handle produced by HadoopFileSystem.open; inherits the actual
    # read/write machinery from NativeFile.
    cdef readonly:
        int32_t buffer_size  # buffer size chosen at open() time
        object mode          # 'rb', 'wb' or 'ab'
        object parent        # _HdfsFileNanny keeping the client alive (ARROW-404)

    def __dealloc__(self):
        # Drop the nanny reference; it closes this file (via its weakref)
        # before releasing the client.
        self.parent = None
471