1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18module Arrow
19  class TableLoader
20    class << self
21      def load(input, options={})
22        new(input, options).load
23      end
24    end
25
26    def initialize(input, options={})
27      input = input.to_path if input.respond_to?(:to_path)
28      @input = input
29      @options = options
30      fill_options
31    end
32
33    def load
34      format = @options[:format]
35      custom_load_method = "load_as_#{format}"
36      unless respond_to?(custom_load_method, true)
37        available_formats = []
38        (methods(true) | private_methods(true)).each do |name|
39          match_data = /\Aload_as_/.match(name.to_s)
40          if match_data
41            available_formats << match_data.post_match
42          end
43        end
44        deprecated_formats = ["batch", "stream"]
45        available_formats -= deprecated_formats
46        message = "Arrow::Table load format must be one of ["
47        message << available_formats.join(", ")
48        message << "]: #{format.inspect}"
49        raise ArgumentError, message
50      end
51      if method(custom_load_method).arity.zero?
52        __send__(custom_load_method)
53      else
54        # For backward compatibility.
55        __send__(custom_load_method, @input)
56      end
57    end
58
59    private
60    def fill_options
61      if @options[:format] and @options.key?(:compression)
62        return
63      end
64
65      if @input.is_a?(Buffer)
66        info = {}
67      else
68        extension = PathExtension.new(@input)
69        info = extension.extract
70      end
71      format = info[:format]
72      @options = @options.dup
73      if format and respond_to?("load_as_#{format}", true)
74        @options[:format] ||= format.to_sym
75      else
76        @options[:format] ||= :arrow
77      end
78      unless @options.key?(:compression)
79        @options[:compression] = info[:compression]
80      end
81    end
82
83    def open_input_stream
84      if @input.is_a?(Buffer)
85        BufferInputStream.new(@input)
86      else
87        MemoryMappedInputStream.new(@input)
88      end
89    end
90
91    def load_raw(input, reader)
92      schema = reader.schema
93      record_batches = []
94      reader.each do |record_batch|
95        record_batches << record_batch
96      end
97      table = Table.new(schema, record_batches)
98      table.instance_variable_set(:@input, input)
99      table
100    end
101
102    def load_as_arrow
103      input = nil
104      reader = nil
105      error = nil
106      reader_class_candidates = [
107        RecordBatchFileReader,
108        RecordBatchStreamReader,
109      ]
110      reader_class_candidates.each do |reader_class_candidate|
111        input = open_input_stream
112        begin
113          reader = reader_class_candidate.new(input)
114        rescue Arrow::Error
115          error = $!
116        else
117          break
118        end
119      end
120      raise error if reader.nil?
121      load_raw(input, reader)
122    end
123
124    # @since 1.0.0
125    def load_as_arrow_file
126      input = open_input_stream
127      reader = RecordBatchFileReader.new(input)
128      load_raw(input, reader)
129    end
130
131    # @deprecated Use `format: :arrow_file` instead.
132    def load_as_batch
133      load_as_arrow_file
134    end
135
136    # @since 1.0.0
137    def load_as_arrow_streaming
138      input = open_input_stream
139      reader = RecordBatchStreamReader.new(input)
140      load_raw(input, reader)
141    end
142
143    # @deprecated Use `format: :arrow_streaming` instead.
144    def load_as_stream
145      load_as_arrow_streaming
146    end
147
148    if Arrow.const_defined?(:ORCFileReader)
149      def load_as_orc
150        input = open_input_stream
151        reader = ORCFileReader.new(input)
152        field_indexes = @options[:field_indexes]
153        reader.set_field_indexes(field_indexes) if field_indexes
154        table = reader.read_stripes
155        table.instance_variable_set(:@input, input)
156        table
157      end
158    end
159
160    def csv_load(options)
161      options.delete(:format)
162      if @input.is_a?(Buffer)
163        CSVLoader.load(@input.data.to_s, **options)
164      else
165        CSVLoader.load(Pathname.new(@input), **options)
166      end
167    end
168
169    def load_as_csv
170      csv_load(@options.dup)
171    end
172
173    def load_as_tsv
174      options = @options.dup
175      options[:delimiter] = "\t"
176      csv_load(options.dup)
177    end
178
179    def load_as_feather
180      input = open_input_stream
181      reader = FeatherFileReader.new(input)
182      table = reader.read
183      table.instance_variable_set(:@input, input)
184      table
185    end
186  end
187end
188