# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# ----------------------------------------------------------------------
# HDFS IO implementation

# cython: language_level = 3

import re

from pyarrow.lib cimport check_status, _Weakrefable, NativeFile
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_fs cimport *
from pyarrow.lib import frombytes, tobytes, ArrowIOError

from queue import Queue, Empty as QueueEmpty, Full as QueueFull


# Matches "hdfs://HOST:PORT/path" and captures host, port and path.
_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')


def have_libhdfs():
    """
    Return True if libhdfs can be located and loaded, False otherwise.

    Deliberately swallows any exception: callers use this as a cheap
    capability probe, not as a diagnostic.
    """
    try:
        with nogil:
            check_status(HaveLibHdfs())
        return True
    except Exception:
        return False


def strip_hdfs_abspath(path):
    """
    Strip a leading "hdfs://HOST:PORT" prefix from *path*, if present.

    Paths without the prefix are returned unchanged.
    """
    m = _HDFS_PATH_RE.match(path)
    if m:
        return m.group(3)
    else:
        return path


cdef class HadoopFileSystem(_Weakrefable):
    """
    Connection to an HDFS cluster through libhdfs.

    Wraps the C++ CIOHadoopFileSystem client.  Use ``connect`` (or the
    constructor) to establish a connection and ``close`` to disconnect.
    """
    cdef:
        shared_ptr[CIOHadoopFileSystem] client

    cdef readonly:
        bint is_open         # True between successful connect and close
        object host
        object user
        object kerb_ticket
        int port
        dict extra_conf      # extra libhdfs configuration key/values

    def _connect(self, host, port, user, kerb_ticket, extra_conf):
        """
        Establish the libhdfs connection and record connection parameters.

        Raises
        ------
        ArrowIOError
            If libhdfs is unavailable or the connection fails.
        """
        cdef HdfsConnectionConfig conf

        if host is not None:
            conf.host = tobytes(host)
        self.host = host

        conf.port = port
        self.port = port

        if user is not None:
            conf.user = tobytes(user)
        self.user = user

        if kerb_ticket is not None:
            conf.kerb_ticket = tobytes(kerb_ticket)
        self.kerb_ticket = kerb_ticket

        # Fail early with a clear error if libhdfs cannot be loaded
        with nogil:
            check_status(HaveLibHdfs())

        if extra_conf is not None and isinstance(extra_conf, dict):
            conf.extra_conf = {tobytes(k): tobytes(v)
                               for k, v in extra_conf.items()}
        self.extra_conf = extra_conf

        with nogil:
            check_status(CIOHadoopFileSystem.Connect(&conf, &self.client))
        self.is_open = True

    @classmethod
    def connect(cls, *args, **kwargs):
        """
        Alternate constructor: forward all arguments to the class.
        """
        return cls(*args, **kwargs)

    def __dealloc__(self):
        if self.is_open:
            self.close()

    def close(self):
        """
        Disconnect from the HDFS cluster
        """
        self._ensure_client()
        with nogil:
            check_status(self.client.get().Disconnect())
        self.is_open = False

    cdef _ensure_client(self):
        # Guard against dereferencing a NULL client pointer under nogil;
        # every public method must call this before touching self.client.
        if self.client.get() == NULL:
            raise IOError('HDFS client improperly initialized')
        elif not self.is_open:
            raise IOError('HDFS client is closed')

    def exists(self, path):
        """
        Returns True if the path is known to the cluster, False if it does not
        (or there is an RPC error)
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        cdef c_bool result
        with nogil:
            result = self.client.get().Exists(c_path)
        return result

    def isdir(self, path):
        """
        Return True if *path* exists and is a directory, False otherwise.
        """
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            # Nonexistent path (or RPC failure) is reported as "not a dir"
            return False
        return info.kind == ObjectType_DIRECTORY

    def isfile(self, path):
        """
        Return True if *path* exists and is a regular file, False otherwise.
        """
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            # Nonexistent path (or RPC failure) is reported as "not a file"
            return False
        return info.kind == ObjectType_FILE

    def get_capacity(self):
        """
        Get reported total capacity of file system

        Returns
        -------
        capacity : int
        """
        # BUGFIX: guard the client like the other public methods do;
        # previously a closed client would be dereferenced under nogil.
        self._ensure_client()

        cdef int64_t capacity = 0
        with nogil:
            check_status(self.client.get().GetCapacity(&capacity))
        return capacity

    def get_space_used(self):
        """
        Get space used on file system

        Returns
        -------
        space_used : int
        """
        # BUGFIX: guard the client (see get_capacity)
        self._ensure_client()

        cdef int64_t space_used = 0
        with nogil:
            check_status(self.client.get().GetUsed(&space_used))
        return space_used

    def df(self):
        """
        Return free space on disk, like the UNIX df command

        Returns
        -------
        space : int
        """
        return self.get_capacity() - self.get_space_used()

    def rename(self, path, new_path):
        """
        Rename (move) *path* to *new_path*.
        """
        # BUGFIX: guard the client (see get_capacity)
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        cdef c_string c_new_path = tobytes(new_path)
        with nogil:
            check_status(self.client.get().Rename(c_path, c_new_path))

    def info(self, path):
        """
        Return detailed HDFS information for path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        path_info : dict
        """
        cdef HdfsPathInfo info
        self._path_info(path, &info)
        return {
            'path': frombytes(info.name),
            'owner': frombytes(info.owner),
            'group': frombytes(info.group),
            'size': info.size,
            'block_size': info.block_size,
            'last_modified': info.last_modified_time,
            'last_accessed': info.last_access_time,
            'replication': info.replication,
            'permissions': info.permissions,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    def stat(self, path):
        """
        Return basic file system statistics about path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        stat : dict
        """
        # BUGFIX: guard the client (see get_capacity)
        self._ensure_client()

        cdef FileStatistics info
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Stat(c_path, &info))
        return {
            'size': info.size,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    cdef _path_info(self, path, HdfsPathInfo* info):
        # Shared helper for info/isdir/isfile; raises ArrowIOError when the
        # path does not exist.
        # BUGFIX: guard the client (see get_capacity)
        self._ensure_client()

        cdef c_string c_path = tobytes(path)

        with nogil:
            check_status(self.client.get()
                         .GetPathInfo(c_path, info))

    def ls(self, path, bint full_info=False):
        """
        List the contents of a directory.

        Parameters
        ----------
        path : string
            Directory to list
        full_info : boolean, default False
            If True, return a dict of metadata per entry instead of just
            the entry name.

        Returns
        -------
        result : list of strings or dicts
        """
        cdef:
            c_string c_path = tobytes(path)
            vector[HdfsPathInfo] listing
            list results = []
            int i

        self._ensure_client()

        with nogil:
            check_status(self.client.get()
                         .ListDirectory(c_path, &listing))

        cdef const HdfsPathInfo* info
        for i in range(<int> listing.size()):
            info = &listing[i]

            # Try to trim off the hdfs://HOST:PORT piece
            name = strip_hdfs_abspath(frombytes(info.name))

            if full_info:
                kind = ('file' if info.kind == ObjectType_FILE
                        else 'directory')

                results.append({
                    'kind': kind,
                    'name': name,
                    'owner': frombytes(info.owner),
                    'group': frombytes(info.group),
                    'last_modified_time': info.last_modified_time,
                    'last_access_time': info.last_access_time,
                    'size': info.size,
                    'replication': info.replication,
                    'block_size': info.block_size,
                    'permissions': info.permissions
                })
            else:
                results.append(name)

        return results

    def chmod(self, path, mode):
        """
        Change file permissions

        Parameters
        ----------
        path : string
            absolute path to file or directory
        mode : int
            POSIX-like bitmask
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef int c_mode = mode
        with nogil:
            check_status(self.client.get()
                         .Chmod(c_path, c_mode))

    def chown(self, path, owner=None, group=None):
        """
        Change file owner and/or group

        Parameters
        ----------
        path : string
            absolute path to file or directory
        owner : string, default None
            New owner, None for no change
        group : string, default None
            New group, None for no change
        """
        cdef:
            c_string c_path
            c_string c_owner
            c_string c_group
            const char* c_owner_ptr = NULL
            const char* c_group_ptr = NULL

        self._ensure_client()

        c_path = tobytes(path)
        # NULL pointer means "leave unchanged" on the libhdfs side; the
        # c_string locals keep the byte buffers alive across the call.
        if owner is not None:
            c_owner = tobytes(owner)
            c_owner_ptr = c_owner.c_str()

        if group is not None:
            c_group = tobytes(group)
            c_group_ptr = c_group.c_str()

        with nogil:
            check_status(self.client.get()
                         .Chown(c_path, c_owner_ptr, c_group_ptr))

    def mkdir(self, path):
        """
        Create indicated directory and any necessary parent directories
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .MakeDirectory(c_path))

    def delete(self, path, bint recursive=False):
        """
        Delete the indicated file or directory

        Parameters
        ----------
        path : string
        recursive : boolean, default False
            If True, also delete child paths for directories
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Delete(c_path, recursive == 1))

    def open(self, path, mode='rb', buffer_size=None, replication=None,
             default_block_size=None):
        """
        Open HDFS file for reading or writing

        Parameters
        ----------
        mode : string
            Must be one of 'rb', 'wb', 'ab'

        Returns
        -------
        handle : HdfsFile

        Raises
        ------
        ValueError
            If *mode* is not one of 'rb', 'wb', 'ab'.
        """
        self._ensure_client()

        cdef HdfsFile out = HdfsFile()

        if mode not in ('rb', 'wb', 'ab'):
            # BUGFIX: raise ValueError (a subclass of Exception, so
            # backward-compatible) instead of a bare Exception.
            raise ValueError("Mode must be 'rb' (read), "
                             "'wb' (write, new file), or 'ab' (append)")

        cdef c_string c_path = tobytes(path)
        cdef c_bool append = False

        # 0 in libhdfs means "use the default"
        cdef int32_t c_buffer_size = buffer_size or 0
        cdef int16_t c_replication = replication or 0
        cdef int64_t c_default_block_size = default_block_size or 0

        cdef shared_ptr[HdfsOutputStream] wr_handle
        cdef shared_ptr[HdfsReadableFile] rd_handle

        if mode in ('wb', 'ab'):
            if mode == 'ab':
                append = True

            with nogil:
                check_status(
                    self.client.get()
                    .OpenWritable(c_path, append, c_buffer_size,
                                  c_replication, c_default_block_size,
                                  &wr_handle))

            out.set_output_stream(<shared_ptr[COutputStream]> wr_handle)
            out.is_writable = True
        else:
            with nogil:
                check_status(self.client.get()
                             .OpenReadable(c_path, &rd_handle))

            out.set_random_access_file(
                <shared_ptr[CRandomAccessFile]> rd_handle)
            out.is_readable = True

        assert not out.closed

        if c_buffer_size == 0:
            c_buffer_size = 2 ** 16

        out.mode = mode
        out.buffer_size = c_buffer_size
        # Keep the client alive at least as long as the file (ARROW-404)
        out.parent = _HdfsFileNanny(self, out)
        out.own_file = True

        return out

    def download(self, path, stream, buffer_size=None):
        """
        Copy the contents of the HDFS file at *path* into *stream*.
        """
        with self.open(path, 'rb') as f:
            f.download(stream, buffer_size=buffer_size)

    def upload(self, path, stream, buffer_size=None):
        """
        Upload file-like object to HDFS path
        """
        with self.open(path, 'wb') as f:
            f.upload(stream, buffer_size=buffer_size)


# ARROW-404: Helper class to ensure that files are closed before the
# client. During deallocation of the extension class, the attributes are
# decref'd which can cause the client to get closed first if the file has the
# last remaining reference
cdef class _HdfsFileNanny(_Weakrefable):
    cdef:
        object client
        object file_handle_ref  # weakref so the nanny itself keeps no cycle

    def __cinit__(self, client, file_handle):
        import weakref
        self.client = client
        self.file_handle_ref = weakref.ref(file_handle)

    def __dealloc__(self):
        fh = self.file_handle_ref()
        if fh:
            fh.close()
        # avoid cyclic GC
        self.file_handle_ref = None
        self.client = None


cdef class HdfsFile(NativeFile):
    """
    A NativeFile backed by an HDFS read or write handle.
    """
    cdef readonly:
        int32_t buffer_size
        object mode     # 'rb', 'wb' or 'ab'
        object parent   # _HdfsFileNanny keeping the client alive

    def __dealloc__(self):
        self.parent = None