# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

module Arrow
  # Loads a table from a path-like object or a Buffer.
  #
  # The format is selected by `options[:format]`. Each format is
  # implemented by a (usually private) `load_as_<format>` method, so a
  # format is supported exactly when such a method exists on the loader.
  class TableLoader
    class << self
      # Builds a loader for `input` and immediately runs it.
      #
      # @param input the source to load; see {#initialize}.
      # @param options [Hash] loader options; see {#initialize}.
      # @return whatever the selected `load_as_<format>` method returns.
      # @raise [ArgumentError] if the format is not supported.
      def load(input, options={})
        new(input, options).load
      end
    end

    # @param input the source to load. Objects responding to `to_path`
    #   are converted with it; a Buffer is read in place; anything else
    #   is treated as a path.
    # @param options [Hash] `:format` and `:compression` are filled in
    #   from the input when missing. The hash passed in is never modified.
    def initialize(input, options={})
      input = input.to_path if input.respond_to?(:to_path)
      @input = input
      @options = options
      fill_options
    end

    # Dispatches to the `load_as_<format>` method for `@options[:format]`.
    #
    # @return the result of the selected `load_as_<format>` method.
    # @raise [ArgumentError] if no `load_as_<format>` method exists.
    def load
      format = @options[:format]
      custom_load_method = "load_as_#{format}"
      unless respond_to?(custom_load_method, true)
        # Build the message without mutating a string literal so that
        # this still raises ArgumentError when literals are frozen.
        message = "Arrow::Table load format must be one of [" +
                  available_formats.join(", ") +
                  "]: #{format.inspect}"
        raise ArgumentError, message
      end
      if method(custom_load_method).arity.zero?
        __send__(custom_load_method)
      else
        # For backward compatibility.
        __send__(custom_load_method, @input)
      end
    end

    private
    # Lists the formats that have a `load_as_<format>` method (public or
    # private), excluding the deprecated aliases.
    #
    # @return [Array<String>] format names without the `load_as_` prefix.
    def available_formats
      prefix = "load_as_"
      formats = []
      (methods(true) | private_methods(true)).each do |name|
        name = name.to_s
        formats << name[prefix.size..-1] if name.start_with?(prefix)
      end
      deprecated_formats = ["batch", "stream"]
      formats - deprecated_formats
    end

    # Fills in `:format` and `:compression` when the caller did not give
    # them. For non-Buffer input the values come from what
    # `PathExtension#extract` reports for the input; `:format` falls
    # back to `:arrow` when nothing usable is reported.
    def fill_options
      if @options[:format] && @options.key?(:compression)
        return
      end

      if @input.is_a?(Buffer)
        info = {}
      else
        extension = PathExtension.new(@input)
        info = extension.extract
      end
      format = info[:format]
      # Work on a copy so that the caller's hash is left untouched.
      @options = @options.dup
      if format && respond_to?("load_as_#{format}", true)
        @options[:format] ||= format.to_sym
      else
        @options[:format] ||= :arrow
      end
      unless @options.key?(:compression)
        @options[:compression] = info[:compression]
      end
    end

    # Opens `@input` as an input stream: a BufferInputStream for a
    # Buffer, a MemoryMappedInputStream otherwise.
    def open_input_stream
      if @input.is_a?(Buffer)
        BufferInputStream.new(@input)
      else
        MemoryMappedInputStream.new(@input)
      end
    end

    # Reads every record batch from `reader` and builds a Table from them.
    #
    # @param input the stream `reader` reads from. It is stored on the
    #   table as `@input` (presumably to keep it referenced for as long
    #   as the table lives -- confirm).
    # @param reader an object providing `schema` and `each`.
    # @return [Table]
    def load_raw(input, reader)
      schema = reader.schema
      record_batches = []
      reader.each do |record_batch|
        record_batches << record_batch
      end
      table = Table.new(schema, record_batches)
      table.instance_variable_set(:@input, input)
      table
    end

    # Loads Arrow data, trying the file format first and then the
    # streaming format. A fresh input stream is opened for each attempt.
    #
    # @raise [Arrow::Error] the error of the last attempt if every
    #   reader class rejects the input.
    def load_as_arrow
      input = nil
      reader = nil
      error = nil
      reader_class_candidates = [
        RecordBatchFileReader,
        RecordBatchStreamReader,
      ]
      reader_class_candidates.each do |reader_class_candidate|
        # NOTE(review): the stream opened for a rejected candidate is
        # not explicitly closed here -- confirm whether it should be.
        input = open_input_stream
        begin
          reader = reader_class_candidate.new(input)
        rescue Arrow::Error => error
          # Remember the failure and try the next candidate.
        else
          break
        end
      end
      raise error if reader.nil?
      load_raw(input, reader)
    end

    # @since 1.0.0
    def load_as_arrow_file
      input = open_input_stream
      reader = RecordBatchFileReader.new(input)
      load_raw(input, reader)
    end

    # @deprecated Use `format: :arrow_file` instead.
    def load_as_batch
      load_as_arrow_file
    end

    # @since 1.0.0
    def load_as_arrow_streaming
      input = open_input_stream
      reader = RecordBatchStreamReader.new(input)
      load_raw(input, reader)
    end

    # @deprecated Use `format: :arrow_streaming` instead.
    def load_as_stream
      load_as_arrow_streaming
    end

    # Only defined when Arrow::ORCFileReader is available.
    if Arrow.const_defined?(:ORCFileReader)
      # Loads ORC data. `@options[:field_indexes]`, when given, is
      # passed to the reader before the stripes are read.
      def load_as_orc
        input = open_input_stream
        reader = ORCFileReader.new(input)
        field_indexes = @options[:field_indexes]
        reader.set_field_indexes(field_indexes) if field_indexes
        table = reader.read_stripes
        table.instance_variable_set(:@input, input)
        table
      end
    end

    # Delegates to CSVLoader. A Buffer is passed as a String of its
    # data; any other input is passed as a Pathname.
    #
    # @param options [Hash] options forwarded to CSVLoader as keywords.
    #   `:format` is removed from this hash in place, so callers must
    #   pass a copy of `@options`.
    def csv_load(options)
      options.delete(:format)
      if @input.is_a?(Buffer)
        CSVLoader.load(@input.data.to_s, **options)
      else
        CSVLoader.load(Pathname.new(@input), **options)
      end
    end

    def load_as_csv
      csv_load(@options.dup)
    end

    # Same as CSV loading but with a tab delimiter.
    def load_as_tsv
      options = @options.dup
      options[:delimiter] = "\t"
      csv_load(options)
    end

    def load_as_feather
      input = open_input_stream
      reader = FeatherFileReader.new(input)
      table = reader.read
      table.instance_variable_set(:@input, input)
      table
    end
  end
end