1# Licensed to the Apache Software Foundation (ASF) under one 2# or more contributor license agreements. See the NOTICE file 3# distributed with this work for additional information 4# regarding copyright ownership. The ASF licenses this file 5# to you under the Apache License, Version 2.0 (the 6# "License"); you may not use this file except in compliance 7# with the License. You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, 12# software distributed under the License is distributed on an 13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14# KIND, either express or implied. See the License for the 15# specific language governing permissions and limitations 16# under the License. 17 18# cython: language_level = 3 19 20from pyarrow.lib cimport check_status 21from pyarrow.compat import frombytes, tobytes 22from pyarrow.includes.common cimport * 23from pyarrow.includes.libarrow cimport * 24from pyarrow.includes.libarrow_fs cimport * 25from pyarrow._fs cimport FileSystem 26 27 28cpdef enum S3LogLevel: 29 Off = <int8_t> CS3LogLevel_Off 30 Fatal = <int8_t> CS3LogLevel_Fatal 31 Error = <int8_t> CS3LogLevel_Error 32 Warn = <int8_t> CS3LogLevel_Warn 33 Info = <int8_t> CS3LogLevel_Info 34 Debug = <int8_t> CS3LogLevel_Debug 35 Trace = <int8_t> CS3LogLevel_Trace 36 37 38def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal): 39 cdef CS3GlobalOptions options 40 options.log_level = <CS3LogLevel> log_level 41 check_status(CInitializeS3(options)) 42 43 44def finalize_s3(): 45 check_status(CFinalizeS3()) 46 47 48cdef class S3FileSystem(FileSystem): 49 """S3-backed FileSystem implementation 50 51 If neither access_key nor secret_key are provided then attempts to 52 initialize from AWS environment variables, otherwise both access_key and 53 secret_key must be provided. 54 55 Note: S3 buckets are special and the operations available on them may be 56 limited or more expensive than desired. 57 58 Parameters 59 ---------- 60 access_key: str, default None 61 AWS Access Key ID. Pass None to use the standard AWS environment 62 variables and/or configuration file. 63 secret_key: str, default None 64 AWS Secret Access key. Pass None to use the standard AWS environment 65 variables and/or configuration file. 66 region: str, default 'us-east-1' 67 AWS region to connect to. 68 scheme: str, default 'https' 69 S3 connection transport scheme. 70 endpoint_override: str, default None 71 Override region with a connect string such as "localhost:9000" 72 background_writes: boolean, default True 73 Whether OutputStream writes will be issued in the background, without 74 blocking. 75 """ 76 77 cdef: 78 CS3FileSystem* s3fs 79 80 def __init__(self, access_key=None, secret_key=None, region=None, 81 scheme=None, endpoint_override=None, 82 bint background_writes=True): 83 cdef: 84 CS3Options options 85 shared_ptr[CS3FileSystem] wrapped 86 87 if access_key is not None and secret_key is None: 88 raise ValueError( 89 'In order to initialize with explicit credentials both ' 90 'access_key and secret_key must be provided, ' 91 '`secret_key` is not set.' 92 ) 93 elif access_key is None and secret_key is not None: 94 raise ValueError( 95 'In order to initialize with explicit credentials both ' 96 'access_key and secret_key must be provided, ' 97 '`access_key` is not set.' 98 ) 99 elif access_key is not None or secret_key is not None: 100 options = CS3Options.FromAccessKey( 101 tobytes(access_key), 102 tobytes(secret_key) 103 ) 104 else: 105 options = CS3Options.Defaults() 106 107 if region is not None: 108 options.region = tobytes(region) 109 if scheme is not None: 110 options.scheme = tobytes(scheme) 111 if endpoint_override is not None: 112 options.endpoint_override = tobytes(endpoint_override) 113 if background_writes is not None: 114 options.background_writes = background_writes 115 116 with nogil: 117 wrapped = GetResultValue(CS3FileSystem.Make(options)) 118 119 self.init(<shared_ptr[CFileSystem]> wrapped) 120 121 cdef init(self, const shared_ptr[CFileSystem]& wrapped): 122 FileSystem.init(self, wrapped) 123 self.s3fs = <CS3FileSystem*> wrapped.get() 124 125 def __reduce__(self): 126 cdef CS3Options opts = self.s3fs.options() 127 return ( 128 S3FileSystem, ( 129 frombytes(opts.GetAccessKey()), 130 frombytes(opts.GetSecretKey()), 131 frombytes(opts.region), 132 frombytes(opts.scheme), 133 frombytes(opts.endpoint_override), 134 opts.background_writes 135 ) 136 ) 137