1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include "converters.hpp"
21 
22 namespace red_arrow {
23   namespace {
24     class RawRecordsBuilder : private Converter, public arrow::ArrayVisitor {
25     public:
RawRecordsBuilder(VALUE records,int n_columns)26       explicit RawRecordsBuilder(VALUE records, int n_columns)
27         : Converter(),
28           records_(records),
29           n_columns_(n_columns) {
30       }
31 
build(const arrow::RecordBatch & record_batch)32       void build(const arrow::RecordBatch& record_batch) {
33         rb::protect([&] {
34           const auto n_rows = record_batch.num_rows();
35           for (int64_t i = 0; i < n_rows; ++i) {
36             auto record = rb_ary_new_capa(n_columns_);
37             rb_ary_push(records_, record);
38           }
39           row_offset_ = 0;
40           for (int i = 0; i < n_columns_; ++i) {
41             const auto array = record_batch.column(i).get();
42             column_index_ = i;
43             check_status(array->Accept(this),
44                          "[record-batch][raw-records]");
45           }
46           return Qnil;
47         });
48       }
49 
build(const arrow::Table & table)50       void build(const arrow::Table& table) {
51         rb::protect([&] {
52           const auto n_rows = table.num_rows();
53           for (int64_t i = 0; i < n_rows; ++i) {
54             auto record = rb_ary_new_capa(n_columns_);
55             rb_ary_push(records_, record);
56           }
57           for (int i = 0; i < n_columns_; ++i) {
58             const auto& chunked_array = table.column(i).get();
59             column_index_ = i;
60             row_offset_ = 0;
61             for (const auto array : chunked_array->chunks()) {
62               check_status(array->Accept(this),
63                            "[table][raw-records]");
64               row_offset_ += array->length();
65             }
66           }
67           return Qnil;
68         });
69       }
70 
71 #define VISIT(TYPE)                                                     \
72       arrow::Status Visit(const arrow::TYPE ## Array& array) override { \
73         convert(array);                                                 \
74         return arrow::Status::OK();                                     \
75       }
76 
77       VISIT(Null)
78       VISIT(Boolean)
79       VISIT(Int8)
80       VISIT(Int16)
81       VISIT(Int32)
82       VISIT(Int64)
83       VISIT(UInt8)
84       VISIT(UInt16)
85       VISIT(UInt32)
86       VISIT(UInt64)
87       // TODO
88       // VISIT(HalfFloat)
89       VISIT(Float)
90       VISIT(Double)
91       VISIT(Binary)
92       VISIT(String)
93       VISIT(FixedSizeBinary)
94       VISIT(Date32)
95       VISIT(Date64)
96       VISIT(Time32)
97       VISIT(Time64)
98       VISIT(Timestamp)
99       // TODO
100       // VISIT(Interval)
101       VISIT(List)
102       VISIT(Struct)
103       VISIT(SparseUnion)
104       VISIT(DenseUnion)
105       VISIT(Dictionary)
106       VISIT(Decimal128)
107       VISIT(Decimal256)
108       // TODO
109       // VISIT(Extension)
110 
111 #undef VISIT
112 
113     private:
114       template <typename ArrayType>
convert(const ArrayType & array)115       void convert(const ArrayType& array) {
116         const auto n = array.length();
117         if (array.null_count() > 0) {
118           for (int64_t i = 0, ii = row_offset_; i < n; ++i, ++ii) {
119             auto value = Qnil;
120             if (!array.IsNull(i)) {
121               value = convert_value(array, i);
122             }
123             auto record = rb_ary_entry(records_, ii);
124             rb_ary_store(record, column_index_, value);
125           }
126         } else {
127           for (int64_t i = 0, ii = row_offset_; i < n; ++i, ++ii) {
128             auto record = rb_ary_entry(records_, ii);
129             rb_ary_store(record, column_index_, convert_value(array, i));
130           }
131         }
132       }
133 
134       // Destination for converted records.
135       VALUE records_;
136 
137       // The current column index.
138       int column_index_;
139 
140       // The current row offset.
141       int64_t row_offset_;
142 
143       // The number of columns.
144       const int n_columns_;
145     };
146   }
147 
148   VALUE
record_batch_raw_records(VALUE rb_record_batch)149   record_batch_raw_records(VALUE rb_record_batch) {
150     auto garrow_record_batch = GARROW_RECORD_BATCH(RVAL2GOBJ(rb_record_batch));
151     auto record_batch = garrow_record_batch_get_raw(garrow_record_batch).get();
152     const auto n_rows = record_batch->num_rows();
153     const auto n_columns = record_batch->num_columns();
154     auto records = rb_ary_new_capa(n_rows);
155 
156     try {
157       RawRecordsBuilder builder(records, n_columns);
158       builder.build(*record_batch);
159     } catch (rb::State& state) {
160       state.jump();
161     }
162 
163     return records;
164   }
165 
166   VALUE
table_raw_records(VALUE rb_table)167   table_raw_records(VALUE rb_table) {
168     auto garrow_table = GARROW_TABLE(RVAL2GOBJ(rb_table));
169     auto table = garrow_table_get_raw(garrow_table).get();
170     const auto n_rows = table->num_rows();
171     const auto n_columns = table->num_columns();
172     auto records = rb_ary_new_capa(n_rows);
173 
174     try {
175       RawRecordsBuilder builder(records, n_columns);
176       builder.build(*table);
177     } catch (rb::State& state) {
178       state.jump();
179     }
180 
181     return records;
182   }
183 }
184