1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/dataset/partition.h"
19
20 #include <algorithm>
21 #include <memory>
22 #include <utility>
23 #include <vector>
24
25 #include "arrow/array/array_base.h"
26 #include "arrow/array/array_dict.h"
27 #include "arrow/array/array_nested.h"
28 #include "arrow/array/builder_dict.h"
29 #include "arrow/compute/api_aggregate.h"
30 #include "arrow/compute/api_scalar.h"
31 #include "arrow/compute/api_vector.h"
32 #include "arrow/compute/cast.h"
33 #include "arrow/compute/exec/expression_internal.h"
34 #include "arrow/dataset/dataset_internal.h"
35 #include "arrow/filesystem/path_util.h"
36 #include "arrow/scalar.h"
37 #include "arrow/util/int_util_internal.h"
38 #include "arrow/util/logging.h"
39 #include "arrow/util/make_unique.h"
40 #include "arrow/util/string_view.h"
41 #include "arrow/util/uri.h"
42 #include "arrow/util/utf8.h"
43
44 namespace arrow {
45
46 using internal::checked_cast;
47 using internal::checked_pointer_cast;
48 using util::string_view;
49
50 using internal::DictionaryMemoTable;
51
52 namespace dataset {
53
54 namespace {
55 /// Apply UriUnescape, then ensure the results are valid UTF-8.
SafeUriUnescape(util::string_view encoded)56 Result<std::string> SafeUriUnescape(util::string_view encoded) {
57 auto decoded = ::arrow::internal::UriUnescape(encoded);
58 if (!util::ValidateUTF8(decoded)) {
59 return Status::Invalid("Partition segment was not valid UTF-8 after URL decoding: ",
60 encoded);
61 }
62 return decoded;
63 }
64 } // namespace
65
Default()66 std::shared_ptr<Partitioning> Partitioning::Default() {
67 class DefaultPartitioning : public Partitioning {
68 public:
69 DefaultPartitioning() : Partitioning(::arrow::schema({})) {}
70
71 std::string type_name() const override { return "default"; }
72
73 Result<compute::Expression> Parse(const std::string& path) const override {
74 return compute::literal(true);
75 }
76
77 Result<std::string> Format(const compute::Expression& expr) const override {
78 return Status::NotImplemented("formatting paths from ", type_name(),
79 " Partitioning");
80 }
81
82 Result<PartitionedBatches> Partition(
83 const std::shared_ptr<RecordBatch>& batch) const override {
84 return PartitionedBatches{{batch}, {compute::literal(true)}};
85 }
86 };
87
88 return std::make_shared<DefaultPartitioning>();
89 }
90
ApplyGroupings(const ListArray & groupings,const std::shared_ptr<RecordBatch> & batch)91 static Result<RecordBatchVector> ApplyGroupings(
92 const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
93 ARROW_ASSIGN_OR_RAISE(Datum sorted,
94 compute::Take(batch, groupings.data()->child_data[0]));
95
96 const auto& sorted_batch = *sorted.record_batch();
97
98 RecordBatchVector out(static_cast<size_t>(groupings.length()));
99 for (size_t i = 0; i < out.size(); ++i) {
100 out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
101 }
102
103 return out;
104 }
105
Partition(const std::shared_ptr<RecordBatch> & batch) const106 Result<Partitioning::PartitionedBatches> KeyValuePartitioning::Partition(
107 const std::shared_ptr<RecordBatch>& batch) const {
108 std::vector<int> key_indices;
109 int num_keys = 0;
110
111 // assemble vector of indices of fields in batch on which we'll partition
112 for (const auto& partition_field : schema_->fields()) {
113 ARROW_ASSIGN_OR_RAISE(
114 auto match, FieldRef(partition_field->name()).FindOneOrNone(*batch->schema()))
115
116 if (match.empty()) continue;
117 key_indices.push_back(match[0]);
118 ++num_keys;
119 }
120
121 if (key_indices.empty()) {
122 // no fields to group by; return the whole batch
123 return PartitionedBatches{{batch}, {compute::literal(true)}};
124 }
125
126 // assemble an ExecBatch of the key columns
127 compute::ExecBatch key_batch({}, batch->num_rows());
128 for (int i : key_indices) {
129 key_batch.values.emplace_back(batch->column_data(i));
130 }
131
132 ARROW_ASSIGN_OR_RAISE(auto grouper,
133 compute::internal::Grouper::Make(key_batch.GetDescriptors()));
134
135 ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
136
137 auto ids = id_batch.array_as<UInt32Array>();
138 ARROW_ASSIGN_OR_RAISE(auto groupings, compute::internal::Grouper::MakeGroupings(
139 *ids, grouper->num_groups()));
140
141 ARROW_ASSIGN_OR_RAISE(auto uniques, grouper->GetUniques());
142 ArrayVector unique_arrays(num_keys);
143 for (int i = 0; i < num_keys; ++i) {
144 unique_arrays[i] = uniques.values[i].make_array();
145 }
146
147 PartitionedBatches out;
148
149 // assemble partition expressions from the unique keys
150 out.expressions.resize(grouper->num_groups());
151 for (uint32_t group = 0; group < grouper->num_groups(); ++group) {
152 std::vector<compute::Expression> exprs(num_keys);
153
154 for (int i = 0; i < num_keys; ++i) {
155 ARROW_ASSIGN_OR_RAISE(auto val, unique_arrays[i]->GetScalar(group));
156 const auto& name = batch->schema()->field(key_indices[i])->name();
157
158 exprs[i] = val->is_valid ? compute::equal(compute::field_ref(name),
159 compute::literal(std::move(val)))
160 : compute::is_null(compute::field_ref(name));
161 }
162 out.expressions[group] = and_(std::move(exprs));
163 }
164
165 // remove key columns from batch to which we'll be applying the groupings
166 auto rest = batch;
167 std::sort(key_indices.begin(), key_indices.end(), std::greater<int>());
168 for (int i : key_indices) {
169 // indices are in descending order; indices larger than i (which would be invalidated
170 // here) have already been handled
171 ARROW_ASSIGN_OR_RAISE(rest, rest->RemoveColumn(i));
172 }
173 ARROW_ASSIGN_OR_RAISE(out.batches, ApplyGroupings(*groupings, rest));
174
175 return out;
176 }
177
operator <<(std::ostream & os,SegmentEncoding segment_encoding)178 std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding) {
179 switch (segment_encoding) {
180 case SegmentEncoding::None:
181 os << "SegmentEncoding::None";
182 break;
183 case SegmentEncoding::Uri:
184 os << "SegmentEncoding::Uri";
185 break;
186 default:
187 os << "(invalid SegmentEncoding " << static_cast<int8_t>(segment_encoding) << ")";
188 break;
189 }
190 return os;
191 }
192
ConvertKey(const Key & key) const193 Result<compute::Expression> KeyValuePartitioning::ConvertKey(const Key& key) const {
194 ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(*schema_));
195 if (match.empty()) {
196 return compute::literal(true);
197 }
198
199 auto field_index = match[0];
200 auto field = schema_->field(field_index);
201
202 std::shared_ptr<Scalar> converted;
203
204 if (!key.value.has_value()) {
205 return compute::is_null(compute::field_ref(field->name()));
206 } else if (field->type()->id() == Type::DICTIONARY) {
207 if (dictionaries_.empty() || dictionaries_[field_index] == nullptr) {
208 return Status::Invalid("No dictionary provided for dictionary field ",
209 field->ToString());
210 }
211
212 DictionaryScalar::ValueType value;
213 value.dictionary = dictionaries_[field_index];
214
215 const auto& dictionary_type = checked_cast<const DictionaryType&>(*field->type());
216 if (!value.dictionary->type()->Equals(dictionary_type.value_type())) {
217 return Status::TypeError("Dictionary supplied for field ", field->ToString(),
218 " had incorrect type ",
219 value.dictionary->type()->ToString());
220 }
221
222 // look up the partition value in the dictionary
223 ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(value.dictionary->type(), *key.value));
224 ARROW_ASSIGN_OR_RAISE(auto index, compute::IndexIn(converted, value.dictionary));
225 auto to_index_type = compute::CastOptions::Safe(dictionary_type.index_type());
226 ARROW_ASSIGN_OR_RAISE(index, compute::Cast(index, to_index_type));
227 value.index = index.scalar();
228 if (!value.index->is_valid) {
229 return Status::Invalid("Dictionary supplied for field ", field->ToString(),
230 " does not contain '", *key.value, "'");
231 }
232 converted = std::make_shared<DictionaryScalar>(std::move(value), field->type());
233 } else {
234 ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(field->type(), *key.value));
235 }
236
237 return compute::equal(compute::field_ref(field->name()),
238 compute::literal(std::move(converted)));
239 }
240
Parse(const std::string & path) const241 Result<compute::Expression> KeyValuePartitioning::Parse(const std::string& path) const {
242 std::vector<compute::Expression> expressions;
243
244 ARROW_ASSIGN_OR_RAISE(auto parsed, ParseKeys(path));
245 for (const Key& key : parsed) {
246 ARROW_ASSIGN_OR_RAISE(auto expr, ConvertKey(key));
247 if (expr == compute::literal(true)) continue;
248 expressions.push_back(std::move(expr));
249 }
250
251 return and_(std::move(expressions));
252 }
253
Format(const compute::Expression & expr) const254 Result<std::string> KeyValuePartitioning::Format(const compute::Expression& expr) const {
255 ScalarVector values{static_cast<size_t>(schema_->num_fields()), nullptr};
256
257 ARROW_ASSIGN_OR_RAISE(auto known_values, ExtractKnownFieldValues(expr));
258 for (const auto& ref_value : known_values.map) {
259 if (!ref_value.second.is_scalar()) {
260 return Status::Invalid("non-scalar partition key ", ref_value.second.ToString());
261 }
262
263 ARROW_ASSIGN_OR_RAISE(auto match, ref_value.first.FindOneOrNone(*schema_));
264 if (match.empty()) continue;
265
266 auto value = ref_value.second.scalar();
267
268 const auto& field = schema_->field(match[0]);
269 if (!value->type->Equals(field->type())) {
270 if (value->is_valid) {
271 auto maybe_converted = compute::Cast(value, field->type());
272 if (!maybe_converted.ok()) {
273 return Status::TypeError("Error converting scalar ", value->ToString(),
274 " (of type ", *value->type,
275 ") to a partition key for ", field->ToString(), ": ",
276 maybe_converted.status().message());
277 }
278 value = maybe_converted->scalar();
279 } else {
280 value = MakeNullScalar(field->type());
281 }
282 }
283
284 if (value->type->id() == Type::DICTIONARY) {
285 ARROW_ASSIGN_OR_RAISE(
286 value, checked_cast<const DictionaryScalar&>(*value).GetEncodedValue());
287 }
288
289 values[match[0]] = std::move(value);
290 }
291
292 return FormatValues(values);
293 }
294
DirectoryPartitioning(std::shared_ptr<Schema> schema,ArrayVector dictionaries,KeyValuePartitioningOptions options)295 DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr<Schema> schema,
296 ArrayVector dictionaries,
297 KeyValuePartitioningOptions options)
298 : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) {
299 util::InitializeUTF8();
300 }
301
ParseKeys(const std::string & path) const302 Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
303 const std::string& path) const {
304 std::vector<Key> keys;
305
306 int i = 0;
307 for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
308 if (i >= schema_->num_fields()) break;
309
310 switch (options_.segment_encoding) {
311 case SegmentEncoding::None: {
312 if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
313 return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
314 }
315 keys.push_back({schema_->field(i++)->name(), std::move(segment)});
316 break;
317 }
318 case SegmentEncoding::Uri: {
319 ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
320 keys.push_back({schema_->field(i++)->name(), std::move(decoded)});
321 break;
322 }
323 default:
324 return Status::NotImplemented("Unknown segment encoding: ",
325 options_.segment_encoding);
326 }
327 }
328
329 return keys;
330 }
331
NextValid(const ScalarVector & values,int first_null)332 inline util::optional<int> NextValid(const ScalarVector& values, int first_null) {
333 auto it = std::find_if(values.begin() + first_null + 1, values.end(),
334 [](const std::shared_ptr<Scalar>& v) { return v != nullptr; });
335
336 if (it == values.end()) {
337 return util::nullopt;
338 }
339
340 return static_cast<int>(it - values.begin());
341 }
342
FormatValues(const ScalarVector & values) const343 Result<std::string> DirectoryPartitioning::FormatValues(
344 const ScalarVector& values) const {
345 std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
346
347 for (int i = 0; i < schema_->num_fields(); ++i) {
348 if (values[i] != nullptr && values[i]->is_valid) {
349 segments[i] = values[i]->ToString();
350 continue;
351 }
352
353 if (auto illegal_index = NextValid(values, i)) {
354 // XXX maybe we should just ignore keys provided after the first absent one?
355 return Status::Invalid("No partition key for ", schema_->field(i)->name(),
356 " but a key was provided subsequently for ",
357 schema_->field(*illegal_index)->name(), ".");
358 }
359
360 // if all subsequent keys are absent we'll just print the available keys
361 break;
362 }
363
364 return fs::internal::JoinAbstractPath(std::move(segments));
365 }
366
AsPartitioningOptions() const367 KeyValuePartitioningOptions PartitioningFactoryOptions::AsPartitioningOptions() const {
368 KeyValuePartitioningOptions options;
369 options.segment_encoding = segment_encoding;
370 return options;
371 }
372
AsHivePartitioningOptions() const373 HivePartitioningOptions HivePartitioningFactoryOptions::AsHivePartitioningOptions()
374 const {
375 HivePartitioningOptions options;
376 options.segment_encoding = segment_encoding;
377 options.null_fallback = null_fallback;
378 return options;
379 }
380
381 namespace {
382 class KeyValuePartitioningFactory : public PartitioningFactory {
383 protected:
KeyValuePartitioningFactory(PartitioningFactoryOptions options)384 explicit KeyValuePartitioningFactory(PartitioningFactoryOptions options)
385 : options_(std::move(options)) {}
386
GetOrInsertField(const std::string & name)387 int GetOrInsertField(const std::string& name) {
388 auto it_inserted =
389 name_to_index_.emplace(name, static_cast<int>(name_to_index_.size()));
390
391 if (it_inserted.second) {
392 repr_memos_.push_back(MakeMemo());
393 }
394
395 return it_inserted.first->second;
396 }
397
InsertRepr(const std::string & name,util::optional<string_view> repr)398 Status InsertRepr(const std::string& name, util::optional<string_view> repr) {
399 auto field_index = GetOrInsertField(name);
400 if (repr.has_value()) {
401 return InsertRepr(field_index, *repr);
402 } else {
403 return Status::OK();
404 }
405 }
406
InsertRepr(int index,util::string_view repr)407 Status InsertRepr(int index, util::string_view repr) {
408 int dummy;
409 return repr_memos_[index]->GetOrInsert<StringType>(repr, &dummy);
410 }
411
DoInspect()412 Result<std::shared_ptr<Schema>> DoInspect() {
413 dictionaries_.assign(name_to_index_.size(), nullptr);
414
415 std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
416 if (options_.schema) {
417 const auto requested_size = options_.schema->fields().size();
418 const auto inferred_size = fields.size();
419 if (inferred_size != requested_size) {
420 return Status::Invalid("Requested schema has ", requested_size,
421 " fields, but only ", inferred_size, " were detected");
422 }
423 }
424
425 for (const auto& name_index : name_to_index_) {
426 const auto& name = name_index.first;
427 auto index = name_index.second;
428
429 std::shared_ptr<ArrayData> reprs;
430 RETURN_NOT_OK(repr_memos_[index]->GetArrayData(0, &reprs));
431
432 if (reprs->length == 0) {
433 return Status::Invalid("No non-null segments were available for field '", name,
434 "'; couldn't infer type");
435 }
436
437 std::shared_ptr<Field> current_field;
438 std::shared_ptr<Array> dict;
439 if (options_.schema) {
440 // if we have a schema, use the schema type.
441 current_field = options_.schema->field(index);
442 auto cast_target = current_field->type();
443 if (is_dictionary(cast_target->id())) {
444 cast_target = checked_pointer_cast<DictionaryType>(cast_target)->value_type();
445 }
446 auto maybe_dict = compute::Cast(reprs, cast_target);
447 if (!maybe_dict.ok()) {
448 return Status::Invalid("Could not cast segments for partition field ",
449 current_field->name(), " to requested type ",
450 current_field->type()->ToString(),
451 " because: ", maybe_dict.status());
452 }
453 dict = maybe_dict.ValueOrDie().make_array();
454 } else {
455 // try casting to int32, otherwise bail and just use the string reprs
456 dict = compute::Cast(reprs, int32()).ValueOr(reprs).make_array();
457 auto type = dict->type();
458 if (options_.infer_dictionary) {
459 // wrap the inferred type in dictionary()
460 type = dictionary(int32(), std::move(type));
461 }
462 current_field = field(name, std::move(type));
463 }
464 fields[index] = std::move(current_field);
465 dictionaries_[index] = std::move(dict);
466 }
467
468 Reset();
469 return ::arrow::schema(std::move(fields));
470 }
471
FieldNames()472 std::vector<std::string> FieldNames() {
473 std::vector<std::string> names(name_to_index_.size());
474
475 for (auto kv : name_to_index_) {
476 names[kv.second] = kv.first;
477 }
478 return names;
479 }
480
Reset()481 virtual void Reset() {
482 name_to_index_.clear();
483 repr_memos_.clear();
484 }
485
MakeMemo()486 std::unique_ptr<DictionaryMemoTable> MakeMemo() {
487 return ::arrow::internal::make_unique<DictionaryMemoTable>(default_memory_pool(),
488 utf8());
489 }
490
491 PartitioningFactoryOptions options_;
492 ArrayVector dictionaries_;
493 std::unordered_map<std::string, int> name_to_index_;
494 std::vector<std::unique_ptr<DictionaryMemoTable>> repr_memos_;
495 };
496
497 class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
498 public:
DirectoryPartitioningFactory(std::vector<std::string> field_names,PartitioningFactoryOptions options)499 DirectoryPartitioningFactory(std::vector<std::string> field_names,
500 PartitioningFactoryOptions options)
501 : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
502 Reset();
503 util::InitializeUTF8();
504 }
505
type_name() const506 std::string type_name() const override { return "directory"; }
507
Inspect(const std::vector<std::string> & paths)508 Result<std::shared_ptr<Schema>> Inspect(
509 const std::vector<std::string>& paths) override {
510 for (auto path : paths) {
511 size_t field_index = 0;
512 for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
513 if (field_index == field_names_.size()) break;
514
515 switch (options_.segment_encoding) {
516 case SegmentEncoding::None: {
517 if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
518 return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
519 }
520 RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
521 break;
522 }
523 case SegmentEncoding::Uri: {
524 ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
525 RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), decoded));
526 break;
527 }
528 default:
529 return Status::NotImplemented("Unknown segment encoding: ",
530 options_.segment_encoding);
531 }
532 }
533 }
534
535 return DoInspect();
536 }
537
Finish(const std::shared_ptr<Schema> & schema) const538 Result<std::shared_ptr<Partitioning>> Finish(
539 const std::shared_ptr<Schema>& schema) const override {
540 for (FieldRef ref : field_names_) {
541 // ensure all of field_names_ are present in schema
542 RETURN_NOT_OK(ref.FindOne(*schema).status());
543 }
544
545 // drop fields which aren't in field_names_
546 auto out_schema = SchemaFromColumnNames(schema, field_names_);
547
548 return std::make_shared<DirectoryPartitioning>(std::move(out_schema), dictionaries_,
549 options_.AsPartitioningOptions());
550 }
551
552 private:
Reset()553 void Reset() override {
554 KeyValuePartitioningFactory::Reset();
555
556 for (const auto& name : field_names_) {
557 GetOrInsertField(name);
558 }
559 }
560
561 std::vector<std::string> field_names_;
562 };
563
564 } // namespace
565
MakeFactory(std::vector<std::string> field_names,PartitioningFactoryOptions options)566 std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
567 std::vector<std::string> field_names, PartitioningFactoryOptions options) {
568 return std::shared_ptr<PartitioningFactory>(
569 new DirectoryPartitioningFactory(std::move(field_names), options));
570 }
571
ParseKey(const std::string & segment,const HivePartitioningOptions & options)572 Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
573 const std::string& segment, const HivePartitioningOptions& options) {
574 auto name_end = string_view(segment).find_first_of('=');
575 // Not round-trippable
576 if (name_end == string_view::npos) {
577 return util::nullopt;
578 }
579
580 // Static method, so we have no better place for it
581 util::InitializeUTF8();
582
583 auto name = segment.substr(0, name_end);
584 std::string value;
585 switch (options.segment_encoding) {
586 case SegmentEncoding::None: {
587 value = segment.substr(name_end + 1);
588 if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(value))) {
589 return Status::Invalid("Partition segment was not valid UTF-8: ", value);
590 }
591 break;
592 }
593 case SegmentEncoding::Uri: {
594 auto raw_value = util::string_view(segment).substr(name_end + 1);
595 ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
596 break;
597 }
598 default:
599 return Status::NotImplemented("Unknown segment encoding: ",
600 options.segment_encoding);
601 }
602
603 if (value == options.null_fallback) {
604 return Key{std::move(name), util::nullopt};
605 }
606 return Key{std::move(name), std::move(value)};
607 }
608
ParseKeys(const std::string & path) const609 Result<std::vector<KeyValuePartitioning::Key>> HivePartitioning::ParseKeys(
610 const std::string& path) const {
611 std::vector<Key> keys;
612
613 for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
614 ARROW_ASSIGN_OR_RAISE(auto maybe_key, ParseKey(segment, hive_options_));
615 if (auto key = maybe_key) {
616 keys.push_back(std::move(*key));
617 }
618 }
619
620 return keys;
621 }
622
FormatValues(const ScalarVector & values) const623 Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) const {
624 std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
625
626 for (int i = 0; i < schema_->num_fields(); ++i) {
627 const std::string& name = schema_->field(i)->name();
628
629 if (values[i] == nullptr) {
630 segments[i] = "";
631 } else if (!values[i]->is_valid) {
632 // If no key is available just provide a placeholder segment to maintain the
633 // field_index <-> path nesting relation
634 segments[i] = name + "=" + hive_options_.null_fallback;
635 } else {
636 segments[i] = name + "=" + values[i]->ToString();
637 }
638 }
639
640 return fs::internal::JoinAbstractPath(std::move(segments));
641 }
642
643 class HivePartitioningFactory : public KeyValuePartitioningFactory {
644 public:
HivePartitioningFactory(HivePartitioningFactoryOptions options)645 explicit HivePartitioningFactory(HivePartitioningFactoryOptions options)
646 : KeyValuePartitioningFactory(options), options_(std::move(options)) {}
647
type_name() const648 std::string type_name() const override { return "hive"; }
649
Inspect(const std::vector<std::string> & paths)650 Result<std::shared_ptr<Schema>> Inspect(
651 const std::vector<std::string>& paths) override {
652 auto options = options_.AsHivePartitioningOptions();
653 for (auto path : paths) {
654 for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
655 ARROW_ASSIGN_OR_RAISE(auto maybe_key,
656 HivePartitioning::ParseKey(segment, options));
657 if (auto key = maybe_key) {
658 RETURN_NOT_OK(InsertRepr(key->name, key->value));
659 }
660 }
661 }
662
663 field_names_ = FieldNames();
664 return DoInspect();
665 }
666
Finish(const std::shared_ptr<Schema> & schema) const667 Result<std::shared_ptr<Partitioning>> Finish(
668 const std::shared_ptr<Schema>& schema) const override {
669 if (dictionaries_.empty()) {
670 return std::make_shared<HivePartitioning>(schema, dictionaries_);
671 } else {
672 for (FieldRef ref : field_names_) {
673 // ensure all of field_names_ are present in schema
674 RETURN_NOT_OK(ref.FindOne(*schema));
675 }
676
677 // drop fields which aren't in field_names_
678 auto out_schema = SchemaFromColumnNames(schema, field_names_);
679
680 return std::make_shared<HivePartitioning>(std::move(out_schema), dictionaries_,
681 options_.AsHivePartitioningOptions());
682 }
683 }
684
685 private:
686 const HivePartitioningFactoryOptions options_;
687 std::vector<std::string> field_names_;
688 };
689
MakeFactory(HivePartitioningFactoryOptions options)690 std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
691 HivePartitioningFactoryOptions options) {
692 return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory(options));
693 }
694
StripPrefixAndFilename(const std::string & path,const std::string & prefix)695 std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
696 auto maybe_base_less = fs::internal::RemoveAncestor(prefix, path);
697 auto base_less = maybe_base_less ? std::string(*maybe_base_less) : path;
698 auto basename_filename = fs::internal::GetAbstractPathParent(base_less);
699 return basename_filename.first;
700 }
701
StripPrefixAndFilename(const std::vector<std::string> & paths,const std::string & prefix)702 std::vector<std::string> StripPrefixAndFilename(const std::vector<std::string>& paths,
703 const std::string& prefix) {
704 std::vector<std::string> result;
705 result.reserve(paths.size());
706 for (const auto& path : paths) {
707 result.emplace_back(StripPrefixAndFilename(path, prefix));
708 }
709 return result;
710 }
711
StripPrefixAndFilename(const std::vector<fs::FileInfo> & files,const std::string & prefix)712 std::vector<std::string> StripPrefixAndFilename(const std::vector<fs::FileInfo>& files,
713 const std::string& prefix) {
714 std::vector<std::string> result;
715 result.reserve(files.size());
716 for (const auto& info : files) {
717 result.emplace_back(StripPrefixAndFilename(info.path(), prefix));
718 }
719 return result;
720 }
721
GetOrInferSchema(const std::vector<std::string> & paths)722 Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
723 const std::vector<std::string>& paths) {
724 if (auto part = partitioning()) {
725 return part->schema();
726 }
727
728 return factory()->Inspect(paths);
729 }
730
731 } // namespace dataset
732 } // namespace arrow
733