1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/dataset/partition.h"
19 
20 #include <algorithm>
21 #include <memory>
22 #include <utility>
23 #include <vector>
24 
25 #include "arrow/array/array_base.h"
26 #include "arrow/array/array_dict.h"
27 #include "arrow/array/array_nested.h"
28 #include "arrow/array/builder_dict.h"
29 #include "arrow/compute/api_aggregate.h"
30 #include "arrow/compute/api_scalar.h"
31 #include "arrow/compute/api_vector.h"
32 #include "arrow/compute/cast.h"
33 #include "arrow/compute/exec/expression_internal.h"
34 #include "arrow/dataset/dataset_internal.h"
35 #include "arrow/filesystem/path_util.h"
36 #include "arrow/scalar.h"
37 #include "arrow/util/int_util_internal.h"
38 #include "arrow/util/logging.h"
39 #include "arrow/util/make_unique.h"
40 #include "arrow/util/string_view.h"
41 #include "arrow/util/uri.h"
42 #include "arrow/util/utf8.h"
43 
44 namespace arrow {
45 
46 using internal::checked_cast;
47 using internal::checked_pointer_cast;
48 using util::string_view;
49 
50 using internal::DictionaryMemoTable;
51 
52 namespace dataset {
53 
54 namespace {
55 /// Apply UriUnescape, then ensure the results are valid UTF-8.
SafeUriUnescape(util::string_view encoded)56 Result<std::string> SafeUriUnescape(util::string_view encoded) {
57   auto decoded = ::arrow::internal::UriUnescape(encoded);
58   if (!util::ValidateUTF8(decoded)) {
59     return Status::Invalid("Partition segment was not valid UTF-8 after URL decoding: ",
60                            encoded);
61   }
62   return decoded;
63 }
64 }  // namespace
65 
Default()66 std::shared_ptr<Partitioning> Partitioning::Default() {
67   class DefaultPartitioning : public Partitioning {
68    public:
69     DefaultPartitioning() : Partitioning(::arrow::schema({})) {}
70 
71     std::string type_name() const override { return "default"; }
72 
73     Result<compute::Expression> Parse(const std::string& path) const override {
74       return compute::literal(true);
75     }
76 
77     Result<std::string> Format(const compute::Expression& expr) const override {
78       return Status::NotImplemented("formatting paths from ", type_name(),
79                                     " Partitioning");
80     }
81 
82     Result<PartitionedBatches> Partition(
83         const std::shared_ptr<RecordBatch>& batch) const override {
84       return PartitionedBatches{{batch}, {compute::literal(true)}};
85     }
86   };
87 
88   return std::make_shared<DefaultPartitioning>();
89 }
90 
ApplyGroupings(const ListArray & groupings,const std::shared_ptr<RecordBatch> & batch)91 static Result<RecordBatchVector> ApplyGroupings(
92     const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
93   ARROW_ASSIGN_OR_RAISE(Datum sorted,
94                         compute::Take(batch, groupings.data()->child_data[0]));
95 
96   const auto& sorted_batch = *sorted.record_batch();
97 
98   RecordBatchVector out(static_cast<size_t>(groupings.length()));
99   for (size_t i = 0; i < out.size(); ++i) {
100     out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
101   }
102 
103   return out;
104 }
105 
Partition(const std::shared_ptr<RecordBatch> & batch) const106 Result<Partitioning::PartitionedBatches> KeyValuePartitioning::Partition(
107     const std::shared_ptr<RecordBatch>& batch) const {
108   std::vector<int> key_indices;
109   int num_keys = 0;
110 
111   // assemble vector of indices of fields in batch on which we'll partition
112   for (const auto& partition_field : schema_->fields()) {
113     ARROW_ASSIGN_OR_RAISE(
114         auto match, FieldRef(partition_field->name()).FindOneOrNone(*batch->schema()))
115 
116     if (match.empty()) continue;
117     key_indices.push_back(match[0]);
118     ++num_keys;
119   }
120 
121   if (key_indices.empty()) {
122     // no fields to group by; return the whole batch
123     return PartitionedBatches{{batch}, {compute::literal(true)}};
124   }
125 
126   // assemble an ExecBatch of the key columns
127   compute::ExecBatch key_batch({}, batch->num_rows());
128   for (int i : key_indices) {
129     key_batch.values.emplace_back(batch->column_data(i));
130   }
131 
132   ARROW_ASSIGN_OR_RAISE(auto grouper,
133                         compute::internal::Grouper::Make(key_batch.GetDescriptors()));
134 
135   ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
136 
137   auto ids = id_batch.array_as<UInt32Array>();
138   ARROW_ASSIGN_OR_RAISE(auto groupings, compute::internal::Grouper::MakeGroupings(
139                                             *ids, grouper->num_groups()));
140 
141   ARROW_ASSIGN_OR_RAISE(auto uniques, grouper->GetUniques());
142   ArrayVector unique_arrays(num_keys);
143   for (int i = 0; i < num_keys; ++i) {
144     unique_arrays[i] = uniques.values[i].make_array();
145   }
146 
147   PartitionedBatches out;
148 
149   // assemble partition expressions from the unique keys
150   out.expressions.resize(grouper->num_groups());
151   for (uint32_t group = 0; group < grouper->num_groups(); ++group) {
152     std::vector<compute::Expression> exprs(num_keys);
153 
154     for (int i = 0; i < num_keys; ++i) {
155       ARROW_ASSIGN_OR_RAISE(auto val, unique_arrays[i]->GetScalar(group));
156       const auto& name = batch->schema()->field(key_indices[i])->name();
157 
158       exprs[i] = val->is_valid ? compute::equal(compute::field_ref(name),
159                                                 compute::literal(std::move(val)))
160                                : compute::is_null(compute::field_ref(name));
161     }
162     out.expressions[group] = and_(std::move(exprs));
163   }
164 
165   // remove key columns from batch to which we'll be applying the groupings
166   auto rest = batch;
167   std::sort(key_indices.begin(), key_indices.end(), std::greater<int>());
168   for (int i : key_indices) {
169     // indices are in descending order; indices larger than i (which would be invalidated
170     // here) have already been handled
171     ARROW_ASSIGN_OR_RAISE(rest, rest->RemoveColumn(i));
172   }
173   ARROW_ASSIGN_OR_RAISE(out.batches, ApplyGroupings(*groupings, rest));
174 
175   return out;
176 }
177 
operator <<(std::ostream & os,SegmentEncoding segment_encoding)178 std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding) {
179   switch (segment_encoding) {
180     case SegmentEncoding::None:
181       os << "SegmentEncoding::None";
182       break;
183     case SegmentEncoding::Uri:
184       os << "SegmentEncoding::Uri";
185       break;
186     default:
187       os << "(invalid SegmentEncoding " << static_cast<int8_t>(segment_encoding) << ")";
188       break;
189   }
190   return os;
191 }
192 
ConvertKey(const Key & key) const193 Result<compute::Expression> KeyValuePartitioning::ConvertKey(const Key& key) const {
194   ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(*schema_));
195   if (match.empty()) {
196     return compute::literal(true);
197   }
198 
199   auto field_index = match[0];
200   auto field = schema_->field(field_index);
201 
202   std::shared_ptr<Scalar> converted;
203 
204   if (!key.value.has_value()) {
205     return compute::is_null(compute::field_ref(field->name()));
206   } else if (field->type()->id() == Type::DICTIONARY) {
207     if (dictionaries_.empty() || dictionaries_[field_index] == nullptr) {
208       return Status::Invalid("No dictionary provided for dictionary field ",
209                              field->ToString());
210     }
211 
212     DictionaryScalar::ValueType value;
213     value.dictionary = dictionaries_[field_index];
214 
215     const auto& dictionary_type = checked_cast<const DictionaryType&>(*field->type());
216     if (!value.dictionary->type()->Equals(dictionary_type.value_type())) {
217       return Status::TypeError("Dictionary supplied for field ", field->ToString(),
218                                " had incorrect type ",
219                                value.dictionary->type()->ToString());
220     }
221 
222     // look up the partition value in the dictionary
223     ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(value.dictionary->type(), *key.value));
224     ARROW_ASSIGN_OR_RAISE(auto index, compute::IndexIn(converted, value.dictionary));
225     auto to_index_type = compute::CastOptions::Safe(dictionary_type.index_type());
226     ARROW_ASSIGN_OR_RAISE(index, compute::Cast(index, to_index_type));
227     value.index = index.scalar();
228     if (!value.index->is_valid) {
229       return Status::Invalid("Dictionary supplied for field ", field->ToString(),
230                              " does not contain '", *key.value, "'");
231     }
232     converted = std::make_shared<DictionaryScalar>(std::move(value), field->type());
233   } else {
234     ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(field->type(), *key.value));
235   }
236 
237   return compute::equal(compute::field_ref(field->name()),
238                         compute::literal(std::move(converted)));
239 }
240 
Parse(const std::string & path) const241 Result<compute::Expression> KeyValuePartitioning::Parse(const std::string& path) const {
242   std::vector<compute::Expression> expressions;
243 
244   ARROW_ASSIGN_OR_RAISE(auto parsed, ParseKeys(path));
245   for (const Key& key : parsed) {
246     ARROW_ASSIGN_OR_RAISE(auto expr, ConvertKey(key));
247     if (expr == compute::literal(true)) continue;
248     expressions.push_back(std::move(expr));
249   }
250 
251   return and_(std::move(expressions));
252 }
253 
Format(const compute::Expression & expr) const254 Result<std::string> KeyValuePartitioning::Format(const compute::Expression& expr) const {
255   ScalarVector values{static_cast<size_t>(schema_->num_fields()), nullptr};
256 
257   ARROW_ASSIGN_OR_RAISE(auto known_values, ExtractKnownFieldValues(expr));
258   for (const auto& ref_value : known_values.map) {
259     if (!ref_value.second.is_scalar()) {
260       return Status::Invalid("non-scalar partition key ", ref_value.second.ToString());
261     }
262 
263     ARROW_ASSIGN_OR_RAISE(auto match, ref_value.first.FindOneOrNone(*schema_));
264     if (match.empty()) continue;
265 
266     auto value = ref_value.second.scalar();
267 
268     const auto& field = schema_->field(match[0]);
269     if (!value->type->Equals(field->type())) {
270       if (value->is_valid) {
271         auto maybe_converted = compute::Cast(value, field->type());
272         if (!maybe_converted.ok()) {
273           return Status::TypeError("Error converting scalar ", value->ToString(),
274                                    " (of type ", *value->type,
275                                    ") to a partition key for ", field->ToString(), ": ",
276                                    maybe_converted.status().message());
277         }
278         value = maybe_converted->scalar();
279       } else {
280         value = MakeNullScalar(field->type());
281       }
282     }
283 
284     if (value->type->id() == Type::DICTIONARY) {
285       ARROW_ASSIGN_OR_RAISE(
286           value, checked_cast<const DictionaryScalar&>(*value).GetEncodedValue());
287     }
288 
289     values[match[0]] = std::move(value);
290   }
291 
292   return FormatValues(values);
293 }
294 
DirectoryPartitioning(std::shared_ptr<Schema> schema,ArrayVector dictionaries,KeyValuePartitioningOptions options)295 DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr<Schema> schema,
296                                              ArrayVector dictionaries,
297                                              KeyValuePartitioningOptions options)
298     : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) {
299   util::InitializeUTF8();
300 }
301 
ParseKeys(const std::string & path) const302 Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
303     const std::string& path) const {
304   std::vector<Key> keys;
305 
306   int i = 0;
307   for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
308     if (i >= schema_->num_fields()) break;
309 
310     switch (options_.segment_encoding) {
311       case SegmentEncoding::None: {
312         if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
313           return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
314         }
315         keys.push_back({schema_->field(i++)->name(), std::move(segment)});
316         break;
317       }
318       case SegmentEncoding::Uri: {
319         ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
320         keys.push_back({schema_->field(i++)->name(), std::move(decoded)});
321         break;
322       }
323       default:
324         return Status::NotImplemented("Unknown segment encoding: ",
325                                       options_.segment_encoding);
326     }
327   }
328 
329   return keys;
330 }
331 
NextValid(const ScalarVector & values,int first_null)332 inline util::optional<int> NextValid(const ScalarVector& values, int first_null) {
333   auto it = std::find_if(values.begin() + first_null + 1, values.end(),
334                          [](const std::shared_ptr<Scalar>& v) { return v != nullptr; });
335 
336   if (it == values.end()) {
337     return util::nullopt;
338   }
339 
340   return static_cast<int>(it - values.begin());
341 }
342 
FormatValues(const ScalarVector & values) const343 Result<std::string> DirectoryPartitioning::FormatValues(
344     const ScalarVector& values) const {
345   std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
346 
347   for (int i = 0; i < schema_->num_fields(); ++i) {
348     if (values[i] != nullptr && values[i]->is_valid) {
349       segments[i] = values[i]->ToString();
350       continue;
351     }
352 
353     if (auto illegal_index = NextValid(values, i)) {
354       // XXX maybe we should just ignore keys provided after the first absent one?
355       return Status::Invalid("No partition key for ", schema_->field(i)->name(),
356                              " but a key was provided subsequently for ",
357                              schema_->field(*illegal_index)->name(), ".");
358     }
359 
360     // if all subsequent keys are absent we'll just print the available keys
361     break;
362   }
363 
364   return fs::internal::JoinAbstractPath(std::move(segments));
365 }
366 
AsPartitioningOptions() const367 KeyValuePartitioningOptions PartitioningFactoryOptions::AsPartitioningOptions() const {
368   KeyValuePartitioningOptions options;
369   options.segment_encoding = segment_encoding;
370   return options;
371 }
372 
AsHivePartitioningOptions() const373 HivePartitioningOptions HivePartitioningFactoryOptions::AsHivePartitioningOptions()
374     const {
375   HivePartitioningOptions options;
376   options.segment_encoding = segment_encoding;
377   options.null_fallback = null_fallback;
378   return options;
379 }
380 
381 namespace {
382 class KeyValuePartitioningFactory : public PartitioningFactory {
383  protected:
KeyValuePartitioningFactory(PartitioningFactoryOptions options)384   explicit KeyValuePartitioningFactory(PartitioningFactoryOptions options)
385       : options_(std::move(options)) {}
386 
GetOrInsertField(const std::string & name)387   int GetOrInsertField(const std::string& name) {
388     auto it_inserted =
389         name_to_index_.emplace(name, static_cast<int>(name_to_index_.size()));
390 
391     if (it_inserted.second) {
392       repr_memos_.push_back(MakeMemo());
393     }
394 
395     return it_inserted.first->second;
396   }
397 
InsertRepr(const std::string & name,util::optional<string_view> repr)398   Status InsertRepr(const std::string& name, util::optional<string_view> repr) {
399     auto field_index = GetOrInsertField(name);
400     if (repr.has_value()) {
401       return InsertRepr(field_index, *repr);
402     } else {
403       return Status::OK();
404     }
405   }
406 
InsertRepr(int index,util::string_view repr)407   Status InsertRepr(int index, util::string_view repr) {
408     int dummy;
409     return repr_memos_[index]->GetOrInsert<StringType>(repr, &dummy);
410   }
411 
DoInspect()412   Result<std::shared_ptr<Schema>> DoInspect() {
413     dictionaries_.assign(name_to_index_.size(), nullptr);
414 
415     std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
416     if (options_.schema) {
417       const auto requested_size = options_.schema->fields().size();
418       const auto inferred_size = fields.size();
419       if (inferred_size != requested_size) {
420         return Status::Invalid("Requested schema has ", requested_size,
421                                " fields, but only ", inferred_size, " were detected");
422       }
423     }
424 
425     for (const auto& name_index : name_to_index_) {
426       const auto& name = name_index.first;
427       auto index = name_index.second;
428 
429       std::shared_ptr<ArrayData> reprs;
430       RETURN_NOT_OK(repr_memos_[index]->GetArrayData(0, &reprs));
431 
432       if (reprs->length == 0) {
433         return Status::Invalid("No non-null segments were available for field '", name,
434                                "'; couldn't infer type");
435       }
436 
437       std::shared_ptr<Field> current_field;
438       std::shared_ptr<Array> dict;
439       if (options_.schema) {
440         // if we have a schema, use the schema type.
441         current_field = options_.schema->field(index);
442         auto cast_target = current_field->type();
443         if (is_dictionary(cast_target->id())) {
444           cast_target = checked_pointer_cast<DictionaryType>(cast_target)->value_type();
445         }
446         auto maybe_dict = compute::Cast(reprs, cast_target);
447         if (!maybe_dict.ok()) {
448           return Status::Invalid("Could not cast segments for partition field ",
449                                  current_field->name(), " to requested type ",
450                                  current_field->type()->ToString(),
451                                  " because: ", maybe_dict.status());
452         }
453         dict = maybe_dict.ValueOrDie().make_array();
454       } else {
455         // try casting to int32, otherwise bail and just use the string reprs
456         dict = compute::Cast(reprs, int32()).ValueOr(reprs).make_array();
457         auto type = dict->type();
458         if (options_.infer_dictionary) {
459           // wrap the inferred type in dictionary()
460           type = dictionary(int32(), std::move(type));
461         }
462         current_field = field(name, std::move(type));
463       }
464       fields[index] = std::move(current_field);
465       dictionaries_[index] = std::move(dict);
466     }
467 
468     Reset();
469     return ::arrow::schema(std::move(fields));
470   }
471 
FieldNames()472   std::vector<std::string> FieldNames() {
473     std::vector<std::string> names(name_to_index_.size());
474 
475     for (auto kv : name_to_index_) {
476       names[kv.second] = kv.first;
477     }
478     return names;
479   }
480 
Reset()481   virtual void Reset() {
482     name_to_index_.clear();
483     repr_memos_.clear();
484   }
485 
MakeMemo()486   std::unique_ptr<DictionaryMemoTable> MakeMemo() {
487     return ::arrow::internal::make_unique<DictionaryMemoTable>(default_memory_pool(),
488                                                                utf8());
489   }
490 
491   PartitioningFactoryOptions options_;
492   ArrayVector dictionaries_;
493   std::unordered_map<std::string, int> name_to_index_;
494   std::vector<std::unique_ptr<DictionaryMemoTable>> repr_memos_;
495 };
496 
497 class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
498  public:
DirectoryPartitioningFactory(std::vector<std::string> field_names,PartitioningFactoryOptions options)499   DirectoryPartitioningFactory(std::vector<std::string> field_names,
500                                PartitioningFactoryOptions options)
501       : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
502     Reset();
503     util::InitializeUTF8();
504   }
505 
type_name() const506   std::string type_name() const override { return "directory"; }
507 
Inspect(const std::vector<std::string> & paths)508   Result<std::shared_ptr<Schema>> Inspect(
509       const std::vector<std::string>& paths) override {
510     for (auto path : paths) {
511       size_t field_index = 0;
512       for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
513         if (field_index == field_names_.size()) break;
514 
515         switch (options_.segment_encoding) {
516           case SegmentEncoding::None: {
517             if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
518               return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
519             }
520             RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
521             break;
522           }
523           case SegmentEncoding::Uri: {
524             ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
525             RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), decoded));
526             break;
527           }
528           default:
529             return Status::NotImplemented("Unknown segment encoding: ",
530                                           options_.segment_encoding);
531         }
532       }
533     }
534 
535     return DoInspect();
536   }
537 
Finish(const std::shared_ptr<Schema> & schema) const538   Result<std::shared_ptr<Partitioning>> Finish(
539       const std::shared_ptr<Schema>& schema) const override {
540     for (FieldRef ref : field_names_) {
541       // ensure all of field_names_ are present in schema
542       RETURN_NOT_OK(ref.FindOne(*schema).status());
543     }
544 
545     // drop fields which aren't in field_names_
546     auto out_schema = SchemaFromColumnNames(schema, field_names_);
547 
548     return std::make_shared<DirectoryPartitioning>(std::move(out_schema), dictionaries_,
549                                                    options_.AsPartitioningOptions());
550   }
551 
552  private:
Reset()553   void Reset() override {
554     KeyValuePartitioningFactory::Reset();
555 
556     for (const auto& name : field_names_) {
557       GetOrInsertField(name);
558     }
559   }
560 
561   std::vector<std::string> field_names_;
562 };
563 
564 }  // namespace
565 
MakeFactory(std::vector<std::string> field_names,PartitioningFactoryOptions options)566 std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
567     std::vector<std::string> field_names, PartitioningFactoryOptions options) {
568   return std::shared_ptr<PartitioningFactory>(
569       new DirectoryPartitioningFactory(std::move(field_names), options));
570 }
571 
ParseKey(const std::string & segment,const HivePartitioningOptions & options)572 Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
573     const std::string& segment, const HivePartitioningOptions& options) {
574   auto name_end = string_view(segment).find_first_of('=');
575   // Not round-trippable
576   if (name_end == string_view::npos) {
577     return util::nullopt;
578   }
579 
580   // Static method, so we have no better place for it
581   util::InitializeUTF8();
582 
583   auto name = segment.substr(0, name_end);
584   std::string value;
585   switch (options.segment_encoding) {
586     case SegmentEncoding::None: {
587       value = segment.substr(name_end + 1);
588       if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(value))) {
589         return Status::Invalid("Partition segment was not valid UTF-8: ", value);
590       }
591       break;
592     }
593     case SegmentEncoding::Uri: {
594       auto raw_value = util::string_view(segment).substr(name_end + 1);
595       ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
596       break;
597     }
598     default:
599       return Status::NotImplemented("Unknown segment encoding: ",
600                                     options.segment_encoding);
601   }
602 
603   if (value == options.null_fallback) {
604     return Key{std::move(name), util::nullopt};
605   }
606   return Key{std::move(name), std::move(value)};
607 }
608 
ParseKeys(const std::string & path) const609 Result<std::vector<KeyValuePartitioning::Key>> HivePartitioning::ParseKeys(
610     const std::string& path) const {
611   std::vector<Key> keys;
612 
613   for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
614     ARROW_ASSIGN_OR_RAISE(auto maybe_key, ParseKey(segment, hive_options_));
615     if (auto key = maybe_key) {
616       keys.push_back(std::move(*key));
617     }
618   }
619 
620   return keys;
621 }
622 
FormatValues(const ScalarVector & values) const623 Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) const {
624   std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
625 
626   for (int i = 0; i < schema_->num_fields(); ++i) {
627     const std::string& name = schema_->field(i)->name();
628 
629     if (values[i] == nullptr) {
630       segments[i] = "";
631     } else if (!values[i]->is_valid) {
632       // If no key is available just provide a placeholder segment to maintain the
633       // field_index <-> path nesting relation
634       segments[i] = name + "=" + hive_options_.null_fallback;
635     } else {
636       segments[i] = name + "=" + values[i]->ToString();
637     }
638   }
639 
640   return fs::internal::JoinAbstractPath(std::move(segments));
641 }
642 
643 class HivePartitioningFactory : public KeyValuePartitioningFactory {
644  public:
HivePartitioningFactory(HivePartitioningFactoryOptions options)645   explicit HivePartitioningFactory(HivePartitioningFactoryOptions options)
646       : KeyValuePartitioningFactory(options), options_(std::move(options)) {}
647 
type_name() const648   std::string type_name() const override { return "hive"; }
649 
Inspect(const std::vector<std::string> & paths)650   Result<std::shared_ptr<Schema>> Inspect(
651       const std::vector<std::string>& paths) override {
652     auto options = options_.AsHivePartitioningOptions();
653     for (auto path : paths) {
654       for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
655         ARROW_ASSIGN_OR_RAISE(auto maybe_key,
656                               HivePartitioning::ParseKey(segment, options));
657         if (auto key = maybe_key) {
658           RETURN_NOT_OK(InsertRepr(key->name, key->value));
659         }
660       }
661     }
662 
663     field_names_ = FieldNames();
664     return DoInspect();
665   }
666 
Finish(const std::shared_ptr<Schema> & schema) const667   Result<std::shared_ptr<Partitioning>> Finish(
668       const std::shared_ptr<Schema>& schema) const override {
669     if (dictionaries_.empty()) {
670       return std::make_shared<HivePartitioning>(schema, dictionaries_);
671     } else {
672       for (FieldRef ref : field_names_) {
673         // ensure all of field_names_ are present in schema
674         RETURN_NOT_OK(ref.FindOne(*schema));
675       }
676 
677       // drop fields which aren't in field_names_
678       auto out_schema = SchemaFromColumnNames(schema, field_names_);
679 
680       return std::make_shared<HivePartitioning>(std::move(out_schema), dictionaries_,
681                                                 options_.AsHivePartitioningOptions());
682     }
683   }
684 
685  private:
686   const HivePartitioningFactoryOptions options_;
687   std::vector<std::string> field_names_;
688 };
689 
MakeFactory(HivePartitioningFactoryOptions options)690 std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
691     HivePartitioningFactoryOptions options) {
692   return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory(options));
693 }
694 
StripPrefixAndFilename(const std::string & path,const std::string & prefix)695 std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
696   auto maybe_base_less = fs::internal::RemoveAncestor(prefix, path);
697   auto base_less = maybe_base_less ? std::string(*maybe_base_less) : path;
698   auto basename_filename = fs::internal::GetAbstractPathParent(base_less);
699   return basename_filename.first;
700 }
701 
StripPrefixAndFilename(const std::vector<std::string> & paths,const std::string & prefix)702 std::vector<std::string> StripPrefixAndFilename(const std::vector<std::string>& paths,
703                                                 const std::string& prefix) {
704   std::vector<std::string> result;
705   result.reserve(paths.size());
706   for (const auto& path : paths) {
707     result.emplace_back(StripPrefixAndFilename(path, prefix));
708   }
709   return result;
710 }
711 
StripPrefixAndFilename(const std::vector<fs::FileInfo> & files,const std::string & prefix)712 std::vector<std::string> StripPrefixAndFilename(const std::vector<fs::FileInfo>& files,
713                                                 const std::string& prefix) {
714   std::vector<std::string> result;
715   result.reserve(files.size());
716   for (const auto& info : files) {
717     result.emplace_back(StripPrefixAndFilename(info.path(), prefix));
718   }
719   return result;
720 }
721 
GetOrInferSchema(const std::vector<std::string> & paths)722 Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
723     const std::vector<std::string>& paths) {
724   if (auto part = partitioning()) {
725     return part->schema();
726   }
727 
728   return factory()->Inspect(paths);
729 }
730 
731 }  // namespace dataset
732 }  // namespace arrow
733