# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# ----------------------------------------------------------------------
# HDFS IO implementation

from queue import Queue, Empty as QueueEmpty, Full as QueueFull


# Matches an HDFS URI and captures (host, port, path-within-filesystem).
# NOTE(review): relies on `re` being imported by the including module —
# this file appears to be a .pxi textually included elsewhere; confirm.
_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')


def have_libhdfs():
    """
    Return True if libhdfs can be loaded at runtime, False otherwise.

    Any failure (missing shared library, misconfigured environment) is
    deliberately swallowed: this is a capability probe, not an operation.
    """
    try:
        with nogil:
            check_status(HaveLibHdfs())
        return True
    except Exception:
        return False


def strip_hdfs_abspath(path):
    """
    Strip a leading ``hdfs://HOST:PORT`` prefix from *path*, if present.

    Paths without the scheme prefix are returned unchanged.
    """
    m = _HDFS_PATH_RE.match(path)
    if m:
        return m.group(3)
    else:
        return path


cdef class HadoopFileSystem:
    """
    Connection to an HDFS cluster via libhdfs.

    Wraps the C++ ``arrow::io::HadoopFileSystem`` client.
    """
    cdef:
        shared_ptr[CIOHadoopFileSystem] client

    cdef readonly:
        bint is_open          # True between successful connect and close
        object host
        object user
        object kerb_ticket
        int port
        dict extra_conf

    def _connect(self, host, port, user, kerb_ticket, extra_conf):
        """
        Establish the libhdfs connection and populate readonly attributes.

        Raises
        ------
        IOError
            If libhdfs is unavailable or the connection fails.
        """
        cdef HdfsConnectionConfig conf

        if host is not None:
            conf.host = tobytes(host)
        self.host = host

        conf.port = port
        self.port = port

        if user is not None:
            conf.user = tobytes(user)
        self.user = user

        if kerb_ticket is not None:
            conf.kerb_ticket = tobytes(kerb_ticket)
        self.kerb_ticket = kerb_ticket

        # Fail early with a clear error if libhdfs cannot be loaded.
        with nogil:
            check_status(HaveLibHdfs())

        if extra_conf is not None and isinstance(extra_conf, dict):
            conf.extra_conf = {tobytes(k): tobytes(v)
                               for k, v in extra_conf.items()}
        self.extra_conf = extra_conf

        with nogil:
            check_status(CIOHadoopFileSystem.Connect(&conf, &self.client))
        self.is_open = True

    @classmethod
    def connect(cls, *args, **kwargs):
        """Alternate constructor; forwards all arguments to ``cls``."""
        return cls(*args, **kwargs)

    def __dealloc__(self):
        if self.is_open:
            self.close()

    def close(self):
        """
        Disconnect from the HDFS cluster
        """
        self._ensure_client()
        with nogil:
            check_status(self.client.get().Disconnect())
        self.is_open = False

    cdef _ensure_client(self):
        # Guard against use before connect (NULL shared_ptr -> would be a
        # hard crash in C++) and use after close.
        if self.client.get() == NULL:
            raise IOError('HDFS client improperly initialized')
        elif not self.is_open:
            raise IOError('HDFS client is closed')

    def exists(self, path):
        """
        Returns True if the path is known to the cluster, False if it does not
        (or there is an RPC error)
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        cdef c_bool result
        with nogil:
            result = self.client.get().Exists(c_path)
        return result

    def isdir(self, path):
        """Return True if *path* exists and is a directory."""
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            # Nonexistent path (or RPC failure) is simply "not a directory"
            return False
        return info.kind == ObjectType_DIRECTORY

    def isfile(self, path):
        """Return True if *path* exists and is a regular file."""
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            return False
        return info.kind == ObjectType_FILE

    def get_capacity(self):
        """
        Get reported total capacity of file system

        Returns
        -------
        capacity : int
        """
        # Fix: guard the client like the other methods do; previously a
        # never-connected instance would dereference a NULL client.
        self._ensure_client()
        cdef int64_t capacity = 0
        with nogil:
            check_status(self.client.get().GetCapacity(&capacity))
        return capacity

    def get_space_used(self):
        """
        Get space used on file system

        Returns
        -------
        space_used : int
        """
        # Fix: guard the client like the other methods do.
        self._ensure_client()
        cdef int64_t space_used = 0
        with nogil:
            check_status(self.client.get().GetUsed(&space_used))
        return space_used

    def df(self):
        """
        Return free space on disk, like the UNIX df command

        Returns
        -------
        space : int
        """
        return self.get_capacity() - self.get_space_used()

    def rename(self, path, new_path):
        """Rename (move) *path* to *new_path* within the filesystem."""
        # Fix: guard the client like the other methods do.
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef c_string c_new_path = tobytes(new_path)
        with nogil:
            check_status(self.client.get().Rename(c_path, c_new_path))

    def info(self, path):
        """
        Return detailed HDFS information for path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        path_info : dict
        """
        cdef HdfsPathInfo info
        self._path_info(path, &info)
        return {
            'path': frombytes(info.name),
            'owner': frombytes(info.owner),
            'group': frombytes(info.group),
            'size': info.size,
            'block_size': info.block_size,
            'last_modified': info.last_modified_time,
            'last_accessed': info.last_access_time,
            'replication': info.replication,
            'permissions': info.permissions,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    def stat(self, path):
        """
        Return basic file system statistics about path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        stat : dict
        """
        # Fix: guard the client like the other methods do.
        self._ensure_client()
        cdef FileStatistics info
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Stat(c_path, &info))
        return {
            'size': info.size,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    cdef _path_info(self, path, HdfsPathInfo* info):
        # Internal helper: fills *info* for *path*; raises ArrowIOError if
        # the path does not exist or the RPC fails.
        # Fix: guard the client like the public methods do.
        self._ensure_client()
        cdef c_string c_path = tobytes(path)

        with nogil:
            check_status(self.client.get()
                         .GetPathInfo(c_path, info))

    def ls(self, path, bint full_info):
        """
        List the contents of a directory.

        Parameters
        ----------
        path : string
        full_info : bool
            If True, return one dict of metadata per entry; otherwise return
            bare path names.

        Returns
        -------
        results : list of str or list of dict
        """
        cdef:
            c_string c_path = tobytes(path)
            vector[HdfsPathInfo] listing
            list results = []
            int i

        self._ensure_client()

        with nogil:
            check_status(self.client.get()
                         .ListDirectory(c_path, &listing))

        cdef const HdfsPathInfo* info
        for i in range(<int> listing.size()):
            info = &listing[i]

            # Try to trim off the hdfs://HOST:PORT piece
            name = strip_hdfs_abspath(frombytes(info.name))

            if full_info:
                kind = ('file' if info.kind == ObjectType_FILE
                        else 'directory')

                results.append({
                    'kind': kind,
                    'name': name,
                    'owner': frombytes(info.owner),
                    'group': frombytes(info.group),
                    'last_modified_time': info.last_modified_time,
                    'last_access_time': info.last_access_time,
                    'size': info.size,
                    'replication': info.replication,
                    'block_size': info.block_size,
                    'permissions': info.permissions
                })
            else:
                results.append(name)

        return results

    def chmod(self, path, mode):
        """
        Change file permissions

        Parameters
        ----------
        path : string
            absolute path to file or directory
        mode : int
            POSIX-like bitmask
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef int c_mode = mode
        with nogil:
            check_status(self.client.get()
                         .Chmod(c_path, c_mode))

    def chown(self, path, owner=None, group=None):
        """
        Change file owner and/or group

        Parameters
        ----------
        path : string
            absolute path to file or directory
        owner : string, default None
            New owner, None for no change
        group : string, default None
            New group, None for no change
        """
        cdef:
            c_string c_path
            c_string c_owner
            c_string c_group
            const char* c_owner_ptr = NULL
            const char* c_group_ptr = NULL

        self._ensure_client()

        c_path = tobytes(path)
        # The c_string locals keep the buffers alive so the raw c_str()
        # pointers remain valid for the duration of the Chown call; NULL
        # means "leave unchanged" at the libhdfs level.
        if owner is not None:
            c_owner = tobytes(owner)
            c_owner_ptr = c_owner.c_str()

        if group is not None:
            c_group = tobytes(group)
            c_group_ptr = c_group.c_str()

        with nogil:
            check_status(self.client.get()
                         .Chown(c_path, c_owner_ptr, c_group_ptr))

    def mkdir(self, path):
        """
        Create indicated directory and any necessary parent directories
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .MakeDirectory(c_path))

    def delete(self, path, bint recursive=False):
        """
        Delete the indicated file or directory

        Parameters
        ----------
        path : string
        recursive : boolean, default False
            If True, also delete child paths for directories
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Delete(c_path, recursive == 1))

    def open(self, path, mode='rb', buffer_size=None, replication=None,
             default_block_size=None):
        """
        Open HDFS file for reading or writing

        Parameters
        ----------
        path : string
        mode : string
            Must be one of 'rb', 'wb', 'ab'
        buffer_size : int, default None
            0 or None means use the libhdfs default
        replication : int, default None
            Replication factor for new files; 0 or None for the default
        default_block_size : int, default None
            Block size for new files; 0 or None for the default

        Returns
        -------
        handle : HdfsFile

        Raises
        ------
        ValueError
            If *mode* is not one of 'rb', 'wb', 'ab'.
        """
        self._ensure_client()

        cdef HdfsFile out = HdfsFile()

        if mode not in ('rb', 'wb', 'ab'):
            # Fix: raise ValueError (idiomatic for a bad argument) instead of
            # bare Exception; backward-compatible for `except Exception`.
            raise ValueError("Mode must be 'rb' (read), "
                             "'wb' (write, new file), or 'ab' (append)")

        cdef c_string c_path = tobytes(path)
        cdef c_bool append = False

        # 0 in libhdfs means "use the default"
        cdef int32_t c_buffer_size = buffer_size or 0
        cdef int16_t c_replication = replication or 0
        cdef int64_t c_default_block_size = default_block_size or 0

        cdef shared_ptr[HdfsOutputStream] wr_handle
        cdef shared_ptr[HdfsReadableFile] rd_handle

        if mode in ('wb', 'ab'):
            if mode == 'ab':
                append = True

            with nogil:
                check_status(
                    self.client.get()
                    .OpenWritable(c_path, append, c_buffer_size,
                                  c_replication, c_default_block_size,
                                  &wr_handle))

            out.set_output_stream(<shared_ptr[COutputStream]> wr_handle)
            out.is_writable = True
        else:
            with nogil:
                check_status(self.client.get()
                             .OpenReadable(c_path, &rd_handle))

            out.set_random_access_file(
                <shared_ptr[CRandomAccessFile]> rd_handle)
            out.is_readable = True

        assert not out.closed

        # Report a concrete Python-side buffer size even when libhdfs
        # chose its own default (0 above).
        if c_buffer_size == 0:
            c_buffer_size = 2 ** 16

        out.mode = mode
        out.buffer_size = c_buffer_size
        # Keep the file closed before its client during GC; see
        # _HdfsFileNanny below.
        out.parent = _HdfsFileNanny(self, out)
        out.own_file = True

        return out

    def download(self, path, stream, buffer_size=None):
        """Copy the HDFS file at *path* into the file-like *stream*."""
        with self.open(path, 'rb') as f:
            f.download(stream, buffer_size=buffer_size)

    def upload(self, path, stream, buffer_size=None):
        """
        Upload file-like object to HDFS path
        """
        with self.open(path, 'wb') as f:
            f.upload(stream, buffer_size=buffer_size)


# ARROW-404: Helper class to ensure that files are closed before the
# client. During deallocation of the extension class, the attributes are
# decref'd which can cause the client to get closed first if the file has the
# last remaining reference
cdef class _HdfsFileNanny:
    cdef:
        object client
        object file_handle_ref   # weakref to the HdfsFile, to avoid a cycle

    def __cinit__(self, client, file_handle):
        import weakref
        self.client = client
        self.file_handle_ref = weakref.ref(file_handle)

    def __dealloc__(self):
        fh = self.file_handle_ref()
        if fh:
            fh.close()
        # avoid cyclic GC
        self.file_handle_ref = None
        self.client = None


cdef class HdfsFile(NativeFile):
    """File handle returned by HadoopFileSystem.open."""
    cdef readonly:
        int32_t buffer_size   # Python-visible buffer size (never 0)
        object mode           # 'rb', 'wb' or 'ab'
        object parent         # _HdfsFileNanny keeping close-before-client order

    def __dealloc__(self):
        self.parent = None