1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18# ----------------------------------------------------------------------
19# HDFS IO implementation
20
21# cython: language_level = 3
22
23import re
24
25from pyarrow.lib cimport check_status, _Weakrefable, NativeFile
26from pyarrow.includes.common cimport *
27from pyarrow.includes.libarrow cimport *
28from pyarrow.includes.libarrow_fs cimport *
29from pyarrow.lib import frombytes, tobytes, ArrowIOError
30
31from queue import Queue, Empty as QueueEmpty, Full as QueueFull
32
33
# Absolute HDFS URI: group(1) = host, group(2) = port, group(3) = path part.
_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
35
36
def have_libhdfs():
    """
    Return True if libhdfs (the JNI-based HDFS driver) can be located and
    loaded, False otherwise.

    Any failure (library missing, misconfigured environment, ...) is
    swallowed and reported as False rather than raised.
    """
    try:
        with nogil:
            check_status(HaveLibHdfs())
        return True
    except Exception:
        return False
44
45
def strip_hdfs_abspath(path):
    """
    Strip a leading ``hdfs://HOST:PORT`` scheme/authority from *path*.

    Paths that do not match the absolute-URI form are returned unchanged.
    """
    match = _HDFS_PATH_RE.match(path)
    return match.group(3) if match is not None else path
52
53
cdef class HadoopFileSystem(_Weakrefable):
    """
    Client for a Hadoop Distributed File System cluster, backed by the
    libhdfs-based C++ ``CIOHadoopFileSystem``.

    Instances connect in the constructor (via ``_connect``) and expose
    file/directory operations plus ``open()`` for reading and writing
    files on the cluster.
    """
    cdef:
        # Underlying C++ client; NULL until _connect() succeeds.
        shared_ptr[CIOHadoopFileSystem] client

    cdef readonly:
        # True between a successful _connect() and close()/__dealloc__.
        bint is_open
        # Connection parameters retained for introspection.
        object host
        object user
        object kerb_ticket
        int port
        dict extra_conf

    def _connect(self, host, port, user, kerb_ticket, extra_conf):
        """
        Build an HdfsConnectionConfig from the arguments and connect.

        Parameters
        ----------
        host : string or None
            Namenode host; None lets libhdfs use its configured default.
        port : int
            Namenode port.
        user : string or None
            Username to connect as.
        kerb_ticket : string or None
            Path to a Kerberos ticket cache, if using Kerberos auth.
        extra_conf : dict or None
            Extra libhdfs configuration properties (string -> string).

        Raises if libhdfs cannot be loaded or the connection fails.
        """
        cdef HdfsConnectionConfig conf

        if host is not None:
            conf.host = tobytes(host)
        self.host = host

        conf.port = port
        self.port = port

        if user is not None:
            conf.user = tobytes(user)
        self.user = user

        if kerb_ticket is not None:
            conf.kerb_ticket = tobytes(kerb_ticket)
        self.kerb_ticket = kerb_ticket

        # Fail fast with a clear error before attempting to connect.
        with nogil:
            check_status(HaveLibHdfs())

        # Non-dict extra_conf is silently ignored for the C++ config but
        # still stored on the instance as-is.
        if extra_conf is not None and isinstance(extra_conf, dict):
            conf.extra_conf = {tobytes(k): tobytes(v)
                               for k, v in extra_conf.items()}
        self.extra_conf = extra_conf

        with nogil:
            check_status(CIOHadoopFileSystem.Connect(&conf, &self.client))
        self.is_open = True

    @classmethod
    def connect(cls, *args, **kwargs):
        """
        Construct and return a connected instance; equivalent to calling
        the class directly with the same arguments.
        """
        return cls(*args, **kwargs)

    def __dealloc__(self):
        # Disconnect on garbage collection if the user did not close().
        if self.is_open:
            self.close()

    def close(self):
        """
        Disconnect from the HDFS cluster
        """
        self._ensure_client()
        with nogil:
            check_status(self.client.get().Disconnect())
        self.is_open = False

    cdef _ensure_client(self):
        # Guard used by most public methods: the client pointer must have
        # been set by _connect() and not yet closed.
        if self.client.get() == NULL:
            raise IOError('HDFS client improperly initialized')
        elif not self.is_open:
            raise IOError('HDFS client is closed')

    def exists(self, path):
        """
        Returns True if the path is known to the cluster, False if it does not
        (or there is an RPC error)
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        cdef c_bool result
        with nogil:
            result = self.client.get().Exists(c_path)
        return result

    def isdir(self, path):
        """
        Return True if *path* exists and is a directory; False if it does
        not exist or cannot be stat'ed (ArrowIOError is swallowed).
        """
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            return False
        return info.kind == ObjectType_DIRECTORY

    def isfile(self, path):
        """
        Return True if *path* exists and is a regular file; False if it
        does not exist or cannot be stat'ed (ArrowIOError is swallowed).
        """
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            return False
        return info.kind == ObjectType_FILE

    def get_capacity(self):
        """
        Get reported total capacity of file system

        Returns
        -------
        capacity : int
        """
        # NOTE(review): unlike most methods, no _ensure_client() call here —
        # confirm whether that is intentional.
        cdef int64_t capacity = 0
        with nogil:
            check_status(self.client.get().GetCapacity(&capacity))
        return capacity

    def get_space_used(self):
        """
        Get space used on file system

        Returns
        -------
        space_used : int
        """
        # NOTE(review): no _ensure_client() guard here either — see
        # get_capacity().
        cdef int64_t space_used = 0
        with nogil:
            check_status(self.client.get().GetUsed(&space_used))
        return space_used

    def df(self):
        """
        Return free space on disk, like the UNIX df command

        Returns
        -------
        space : int
        """
        return self.get_capacity() - self.get_space_used()

    def rename(self, path, new_path):
        """
        Rename (move) a file or directory from *path* to *new_path*.
        """
        cdef c_string c_path = tobytes(path)
        cdef c_string c_new_path = tobytes(new_path)
        with nogil:
            check_status(self.client.get().Rename(c_path, c_new_path))

    def info(self, path):
        """
        Return detailed HDFS information for path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        path_info : dict
        """
        cdef HdfsPathInfo info
        self._path_info(path, &info)
        return {
            'path': frombytes(info.name),
            'owner': frombytes(info.owner),
            'group': frombytes(info.group),
            'size': info.size,
            'block_size': info.block_size,
            'last_modified': info.last_modified_time,
            'last_accessed': info.last_access_time,
            'replication': info.replication,
            'permissions': info.permissions,
            # Anything that is not a directory is reported as 'file'.
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    def stat(self, path):
        """
        Return basic file system statistics about path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        stat : dict
        """
        cdef FileStatistics info
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Stat(c_path, &info))
        return {
            'size': info.size,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    cdef _path_info(self, path, HdfsPathInfo* info):
        # Fill *info* with metadata for path; failures propagate from
        # check_status (callers such as isdir/isfile catch ArrowIOError).
        cdef c_string c_path = tobytes(path)

        with nogil:
            check_status(self.client.get()
                         .GetPathInfo(c_path, info))

    def ls(self, path, bint full_info):
        """
        List the contents of the indicated directory.

        Parameters
        ----------
        path : string
            Directory to list.
        full_info : boolean
            If True, return one metadata dict per entry; otherwise return
            only the entry names (with any hdfs://HOST:PORT prefix
            stripped).

        Returns
        -------
        results : list of dict or list of string
        """
        cdef:
            c_string c_path = tobytes(path)
            vector[HdfsPathInfo] listing
            list results = []
            int i

        self._ensure_client()

        with nogil:
            check_status(self.client.get()
                         .ListDirectory(c_path, &listing))

        cdef const HdfsPathInfo* info
        for i in range(<int> listing.size()):
            info = &listing[i]

            # Try to trim off the hdfs://HOST:PORT piece
            name = strip_hdfs_abspath(frombytes(info.name))

            if full_info:
                kind = ('file' if info.kind == ObjectType_FILE
                        else 'directory')

                results.append({
                    'kind': kind,
                    'name': name,
                    'owner': frombytes(info.owner),
                    'group': frombytes(info.group),
                    'last_modified_time': info.last_modified_time,
                    'last_access_time': info.last_access_time,
                    'size': info.size,
                    'replication': info.replication,
                    'block_size': info.block_size,
                    'permissions': info.permissions
                })
            else:
                results.append(name)

        return results

    def chmod(self, path, mode):
        """
        Change file permissions

        Parameters
        ----------
        path : string
            absolute path to file or directory
        mode : int
            POSIX-like bitmask
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef int c_mode = mode
        with nogil:
            check_status(self.client.get()
                         .Chmod(c_path, c_mode))

    def chown(self, path, owner=None, group=None):
        """
        Change file owner and/or owning group

        Parameters
        ----------
        path : string
            absolute path to file or directory
        owner : string, default None
            New owner, None for no change
        group : string, default None
            New group, None for no change
        """
        cdef:
            c_string c_path
            c_string c_owner
            c_string c_group
            # NULL pointers signal "leave unchanged" to the C++ API.
            const char* c_owner_ptr = NULL
            const char* c_group_ptr = NULL

        self._ensure_client()

        c_path = tobytes(path)
        # The c_string temporaries keep the pointed-to data alive for the
        # duration of the call below.
        if owner is not None:
            c_owner = tobytes(owner)
            c_owner_ptr = c_owner.c_str()

        if group is not None:
            c_group = tobytes(group)
            c_group_ptr = c_group.c_str()

        with nogil:
            check_status(self.client.get()
                         .Chown(c_path, c_owner_ptr, c_group_ptr))

    def mkdir(self, path):
        """
        Create indicated directory and any necessary parent directories
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .MakeDirectory(c_path))

    def delete(self, path, bint recursive=False):
        """
        Delete the indicated file or directory

        Parameters
        ----------
        path : string
        recursive : boolean, default False
            If True, also delete child paths for directories
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Delete(c_path, recursive == 1))

    def open(self, path, mode='rb', buffer_size=None, replication=None,
             default_block_size=None):
        """
        Open HDFS file for reading or writing

        Parameters
        ----------
        path : string
            HDFS path of the file
        mode : string
            Must be one of 'rb', 'wb', 'ab'
        buffer_size : int, default None
            Read/write buffer size; None or 0 uses the libhdfs default
        replication : int, default None
            Replication factor for new files; None or 0 uses the default
        default_block_size : int, default None
            Block size for new files; None or 0 uses the default

        Returns
        -------
        handle : HdfsFile
        """
        self._ensure_client()

        cdef HdfsFile out = HdfsFile()

        # NOTE(review): raises bare Exception rather than ValueError;
        # callers may depend on the current type.
        if mode not in ('rb', 'wb', 'ab'):
            raise Exception("Mode must be 'rb' (read), "
                            "'wb' (write, new file), or 'ab' (append)")

        cdef c_string c_path = tobytes(path)
        cdef c_bool append = False

        # 0 in libhdfs means "use the default"
        cdef int32_t c_buffer_size = buffer_size or 0
        cdef int16_t c_replication = replication or 0
        cdef int64_t c_default_block_size = default_block_size or 0

        cdef shared_ptr[HdfsOutputStream] wr_handle
        cdef shared_ptr[HdfsReadableFile] rd_handle

        if mode in ('wb', 'ab'):
            if mode == 'ab':
                append = True

            with nogil:
                check_status(
                    self.client.get()
                    .OpenWritable(c_path, append, c_buffer_size,
                                  c_replication, c_default_block_size,
                                  &wr_handle))

            out.set_output_stream(<shared_ptr[COutputStream]> wr_handle)
            out.is_writable = True
        else:
            with nogil:
                check_status(self.client.get()
                             .OpenReadable(c_path, &rd_handle))

            out.set_random_access_file(
                <shared_ptr[CRandomAccessFile]> rd_handle)
            out.is_readable = True

        assert not out.closed

        # Record a concrete buffer size (64 KiB) on the handle when the
        # libhdfs default was requested.
        if c_buffer_size == 0:
            c_buffer_size = 2 ** 16

        out.mode = mode
        out.buffer_size = c_buffer_size
        # The nanny guarantees the file is closed before this client is
        # deallocated (see _HdfsFileNanny / ARROW-404).
        out.parent = _HdfsFileNanny(self, out)
        out.own_file = True

        return out

    def download(self, path, stream, buffer_size=None):
        """
        Download the HDFS file at *path* into the given writable
        file-like *stream*.
        """
        with self.open(path, 'rb') as f:
            f.download(stream, buffer_size=buffer_size)

    def upload(self, path, stream, buffer_size=None):
        """
        Upload file-like object to HDFS path
        """
        with self.open(path, 'wb') as f:
            f.upload(stream, buffer_size=buffer_size)
448
449
450# ARROW-404: Helper class to ensure that files are closed before the
451# client. During deallocation of the extension class, the attributes are
452# decref'd which can cause the client to get closed first if the file has the
453# last remaining reference
cdef class _HdfsFileNanny(_Weakrefable):
    """
    Keep the HDFS client alive until its file handle is closed (ARROW-404).

    Attached as ``parent`` to each HdfsFile so that, during interpreter
    shutdown / GC, the file is closed before the client can be torn down.
    """
    cdef:
        # Strong reference: holding the client keeps it alive as long as
        # this nanny (and therefore the file) exists.
        object client
        # Weak reference: must not keep the file itself alive.
        object file_handle_ref

    def __cinit__(self, client, file_handle):
        import weakref
        self.client = client
        self.file_handle_ref = weakref.ref(file_handle)

    def __dealloc__(self):
        # Close the file (if still alive) before releasing our reference
        # to the client.
        fh = self.file_handle_ref()
        if fh:
            fh.close()
        # avoid cyclic GC
        self.file_handle_ref = None
        self.client = None
471
472
cdef class HdfsFile(NativeFile):
    """
    File handle returned by HadoopFileSystem.open; a NativeFile with
    HDFS-specific metadata attached.
    """
    cdef readonly:
        # Buffer size recorded at open time (64 KiB if default was used).
        int32_t buffer_size
        # Mode string the file was opened with: 'rb', 'wb' or 'ab'.
        object mode
        # _HdfsFileNanny keeping the owning client alive (ARROW-404).
        object parent

    def __dealloc__(self):
        # Drop the nanny; its own __dealloc__ closes the file if needed.
        self.parent = None
481