1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include <cmath>
19 #include <vector>
20 
21 #include "arrow/compute/api_aggregate.h"
22 #include "arrow/compute/kernels/common.h"
23 #include "arrow/stl_allocator.h"
24 #include "arrow/util/bit_run_reader.h"
25 
26 namespace arrow {
27 namespace compute {
28 namespace internal {
29 
30 namespace {
31 
32 using arrow::internal::checked_pointer_cast;
33 using arrow::internal::VisitSetBitRunsVoid;
34 
35 using QuantileState = internal::OptionsWrapper<QuantileOptions>;
36 
37 // output is at some input data point, not interpolated
IsDataPoint(const QuantileOptions & options)38 bool IsDataPoint(const QuantileOptions& options) {
39   // some interpolation methods return exact data point
40   return options.interpolation == QuantileOptions::LOWER ||
41          options.interpolation == QuantileOptions::HIGHER ||
42          options.interpolation == QuantileOptions::NEAREST;
43 }
44 
45 // quantile to exact datapoint index (IsDataPoint == true)
QuantileToDataPoint(size_t length,double q,enum QuantileOptions::Interpolation interpolation)46 uint64_t QuantileToDataPoint(size_t length, double q,
47                              enum QuantileOptions::Interpolation interpolation) {
48   const double index = (length - 1) * q;
49   uint64_t datapoint_index = static_cast<uint64_t>(index);
50   const double fraction = index - datapoint_index;
51 
52   if (interpolation == QuantileOptions::LINEAR ||
53       interpolation == QuantileOptions::MIDPOINT) {
54     DCHECK_EQ(fraction, 0);
55   }
56 
57   // convert NEAREST interpolation method to LOWER or HIGHER
58   if (interpolation == QuantileOptions::NEAREST) {
59     if (fraction < 0.5) {
60       interpolation = QuantileOptions::LOWER;
61     } else if (fraction > 0.5) {
62       interpolation = QuantileOptions::HIGHER;
63     } else {
64       // round 0.5 to nearest even number, similar to numpy.around
65       interpolation =
66           (datapoint_index & 1) ? QuantileOptions::HIGHER : QuantileOptions::LOWER;
67     }
68   }
69 
70   if (interpolation == QuantileOptions::HIGHER && fraction != 0) {
71     ++datapoint_index;
72   }
73 
74   return datapoint_index;
75 }
76 
77 // copy and nth_element approach, large memory footprint
78 template <typename InType>
79 struct SortQuantiler {
80   using CType = typename InType::c_type;
81   using Allocator = arrow::stl::allocator<CType>;
82 
Execarrow::compute::internal::__anona394dc340111::SortQuantiler83   void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
84     const QuantileOptions& options = QuantileState::Get(ctx);
85 
86     // copy all chunks to a buffer, ignore nulls and nans
87     std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
88 
89     const Datum& datum = batch[0];
90     const int64_t in_length = datum.length() - datum.null_count();
91     if (in_length > 0) {
92       in_buffer.resize(in_length);
93 
94       int64_t index = 0;
95       for (const auto& array : datum.chunks()) {
96         index += CopyArray(in_buffer.data() + index, *array);
97       }
98       DCHECK_EQ(index, in_length);
99 
100       // drop nan
101       if (is_floating_type<InType>::value) {
102         const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
103                                         [](CType v) { return v != v; });
104         in_buffer.resize(it - in_buffer.begin());
105       }
106     }
107 
108     // prepare out array
109     int64_t out_length = options.q.size();
110     if (in_buffer.empty()) {
111       out_length = 0;  // input is empty or only contains null and nan, return empty array
112     }
113     // out type depends on options
114     const bool is_datapoint = IsDataPoint(options);
115     const std::shared_ptr<DataType> out_type =
116         is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
117     auto out_data = ArrayData::Make(out_type, out_length, 0);
118     out_data->buffers.resize(2, nullptr);
119 
120     // calculate quantiles
121     if (out_length > 0) {
122       const auto out_bit_width = checked_pointer_cast<NumberType>(out_type)->bit_width();
123       KERNEL_ASSIGN_OR_RAISE(out_data->buffers[1], ctx,
124                              ctx->Allocate(out_length * out_bit_width / 8));
125 
126       // find quantiles in descending order
127       std::vector<int64_t> q_indices(out_length);
128       std::iota(q_indices.begin(), q_indices.end(), 0);
129       std::sort(q_indices.begin(), q_indices.end(),
130                 [&options](int64_t left_index, int64_t right_index) {
131                   return options.q[right_index] < options.q[left_index];
132                 });
133 
134       // input array is partitioned around data point at `last_index` (pivot)
135       // for next quatile which is smaller, we only consider inputs left of the pivot
136       uint64_t last_index = in_buffer.size();
137       if (is_datapoint) {
138         CType* out_buffer = out_data->template GetMutableValues<CType>(1);
139         for (int64_t i = 0; i < out_length; ++i) {
140           const int64_t q_index = q_indices[i];
141           out_buffer[q_index] = GetQuantileAtDataPoint(
142               in_buffer, &last_index, options.q[q_index], options.interpolation);
143         }
144       } else {
145         double* out_buffer = out_data->template GetMutableValues<double>(1);
146         for (int64_t i = 0; i < out_length; ++i) {
147           const int64_t q_index = q_indices[i];
148           out_buffer[q_index] = GetQuantileByInterp(
149               in_buffer, &last_index, options.q[q_index], options.interpolation);
150         }
151       }
152     }
153 
154     *out = Datum(std::move(out_data));
155   }
156 
CopyArrayarrow::compute::internal::__anona394dc340111::SortQuantiler157   int64_t CopyArray(CType* buffer, const Array& array) {
158     const int64_t n = array.length() - array.null_count();
159     if (n > 0) {
160       int64_t index = 0;
161       const ArrayData& data = *array.data();
162       const CType* values = data.GetValues<CType>(1);
163       VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
164                           [&](int64_t pos, int64_t len) {
165                             memcpy(buffer + index, values + pos, len * sizeof(CType));
166                             index += len;
167                           });
168       DCHECK_EQ(index, n);
169     }
170     return n;
171   }
172 
173   // return quantile located exactly at some input data point
GetQuantileAtDataPointarrow::compute::internal::__anona394dc340111::SortQuantiler174   CType GetQuantileAtDataPoint(std::vector<CType, Allocator>& in, uint64_t* last_index,
175                                double q,
176                                enum QuantileOptions::Interpolation interpolation) {
177     const uint64_t datapoint_index = QuantileToDataPoint(in.size(), q, interpolation);
178 
179     if (datapoint_index != *last_index) {
180       DCHECK_LT(datapoint_index, *last_index);
181       std::nth_element(in.begin(), in.begin() + datapoint_index,
182                        in.begin() + *last_index);
183       *last_index = datapoint_index;
184     }
185 
186     return in[datapoint_index];
187   }
188 
189   // return quantile interpolated from adjacent input data points
GetQuantileByInterparrow::compute::internal::__anona394dc340111::SortQuantiler190   double GetQuantileByInterp(std::vector<CType, Allocator>& in, uint64_t* last_index,
191                              double q,
192                              enum QuantileOptions::Interpolation interpolation) {
193     const double index = (in.size() - 1) * q;
194     const uint64_t lower_index = static_cast<uint64_t>(index);
195     const double fraction = index - lower_index;
196 
197     if (lower_index != *last_index) {
198       DCHECK_LT(lower_index, *last_index);
199       std::nth_element(in.begin(), in.begin() + lower_index, in.begin() + *last_index);
200     }
201 
202     const double lower_value = static_cast<double>(in[lower_index]);
203     if (fraction == 0) {
204       *last_index = lower_index;
205       return lower_value;
206     }
207 
208     const uint64_t higher_index = lower_index + 1;
209     DCHECK_LT(higher_index, in.size());
210     if (lower_index != *last_index && higher_index != *last_index) {
211       DCHECK_LT(higher_index, *last_index);
212       // higher value must be the minimal value after lower_index
213       auto min = std::min_element(in.begin() + higher_index, in.begin() + *last_index);
214       std::iter_swap(in.begin() + higher_index, min);
215     }
216     *last_index = lower_index;
217 
218     const double higher_value = static_cast<double>(in[higher_index]);
219 
220     if (interpolation == QuantileOptions::LINEAR) {
221       // more stable than naive linear interpolation
222       return fraction * higher_value + (1 - fraction) * lower_value;
223     } else if (interpolation == QuantileOptions::MIDPOINT) {
224       return lower_value / 2 + higher_value / 2;
225     } else {
226       DCHECK(false);
227       return NAN;
228     }
229   }
230 };
231 
232 // histogram approach with constant memory, only for integers within limited value range
233 template <typename InType>
234 struct CountQuantiler {
235   using CType = typename InType::c_type;
236 
237   CType min;
238   std::vector<uint64_t> counts;  // counts[i]: # of values equals i + min
239 
240   // indices to adjacent non-empty bins covering current quantile
241   struct AdjacentBins {
242     int left_index;
243     int right_index;
244     uint64_t total_count;  // accumulated counts till left_index (inclusive)
245   };
246 
CountQuantilerarrow::compute::internal::__anona394dc340111::CountQuantiler247   CountQuantiler(CType min, CType max) {
248     uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
249     DCHECK_LT(value_range, 1 << 30);
250     this->min = min;
251     this->counts.resize(value_range);
252   }
253 
Execarrow::compute::internal::__anona394dc340111::CountQuantiler254   void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
255     const QuantileOptions& options = QuantileState::Get(ctx);
256 
257     // count values in all chunks, ignore nulls
258     const Datum& datum = batch[0];
259     const int64_t in_length = datum.length() - datum.null_count();
260     if (in_length > 0) {
261       for (auto& c : this->counts) c = 0;
262       for (const auto& array : datum.chunks()) {
263         const ArrayData& data = *array->data();
264         const CType* values = data.GetValues<CType>(1);
265         VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
266                             [&](int64_t pos, int64_t len) {
267                               for (int64_t i = 0; i < len; ++i) {
268                                 ++this->counts[values[pos + i] - this->min];
269                               }
270                             });
271       }
272     }
273 
274     // prepare out array
275     int64_t out_length = options.q.size();
276     if (in_length == 0) {
277       out_length = 0;  // input is empty or only contains null, return empty array
278     }
279     // out type depends on options
280     const bool is_datapoint = IsDataPoint(options);
281     const std::shared_ptr<DataType> out_type =
282         is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
283     auto out_data = ArrayData::Make(out_type, out_length, 0);
284     out_data->buffers.resize(2, nullptr);
285 
286     // calculate quantiles
287     if (out_length > 0) {
288       const auto out_bit_width = checked_pointer_cast<NumberType>(out_type)->bit_width();
289       KERNEL_ASSIGN_OR_RAISE(out_data->buffers[1], ctx,
290                              ctx->Allocate(out_length * out_bit_width / 8));
291 
292       // find quantiles in ascending order
293       std::vector<int64_t> q_indices(out_length);
294       std::iota(q_indices.begin(), q_indices.end(), 0);
295       std::sort(q_indices.begin(), q_indices.end(),
296                 [&options](int64_t left_index, int64_t right_index) {
297                   return options.q[left_index] < options.q[right_index];
298                 });
299 
300       AdjacentBins bins{0, 0, this->counts[0]};
301       if (is_datapoint) {
302         CType* out_buffer = out_data->template GetMutableValues<CType>(1);
303         for (int64_t i = 0; i < out_length; ++i) {
304           const int64_t q_index = q_indices[i];
305           out_buffer[q_index] = GetQuantileAtDataPoint(
306               in_length, &bins, options.q[q_index], options.interpolation);
307         }
308       } else {
309         double* out_buffer = out_data->template GetMutableValues<double>(1);
310         for (int64_t i = 0; i < out_length; ++i) {
311           const int64_t q_index = q_indices[i];
312           out_buffer[q_index] = GetQuantileByInterp(in_length, &bins, options.q[q_index],
313                                                     options.interpolation);
314         }
315       }
316     }
317 
318     *out = Datum(std::move(out_data));
319   }
320 
321   // return quantile located exactly at some input data point
GetQuantileAtDataPointarrow::compute::internal::__anona394dc340111::CountQuantiler322   CType GetQuantileAtDataPoint(int64_t in_length, AdjacentBins* bins, double q,
323                                enum QuantileOptions::Interpolation interpolation) {
324     const uint64_t datapoint_index = QuantileToDataPoint(in_length, q, interpolation);
325     while (datapoint_index >= bins->total_count &&
326            static_cast<size_t>(bins->left_index) < this->counts.size() - 1) {
327       ++bins->left_index;
328       bins->total_count += this->counts[bins->left_index];
329     }
330     DCHECK_LT(datapoint_index, bins->total_count);
331     return static_cast<CType>(bins->left_index + this->min);
332   }
333 
334   // return quantile interpolated from adjacent input data points
GetQuantileByInterparrow::compute::internal::__anona394dc340111::CountQuantiler335   double GetQuantileByInterp(int64_t in_length, AdjacentBins* bins, double q,
336                              enum QuantileOptions::Interpolation interpolation) {
337     const double index = (in_length - 1) * q;
338     const uint64_t index_floor = static_cast<uint64_t>(index);
339     const double fraction = index - index_floor;
340 
341     while (index_floor >= bins->total_count &&
342            static_cast<size_t>(bins->left_index) < this->counts.size() - 1) {
343       ++bins->left_index;
344       bins->total_count += this->counts[bins->left_index];
345     }
346     DCHECK_LT(index_floor, bins->total_count);
347     const double lower_value = static_cast<double>(bins->left_index + this->min);
348 
349     // quantile lies in this bin, no interpolation needed
350     if (index <= bins->total_count - 1) {
351       return lower_value;
352     }
353 
354     // quantile lies across two bins, locate next bin if not already done
355     DCHECK_EQ(index_floor, bins->total_count - 1);
356     if (bins->right_index <= bins->left_index) {
357       bins->right_index = bins->left_index + 1;
358       while (static_cast<size_t>(bins->right_index) < this->counts.size() - 1 &&
359              this->counts[bins->right_index] == 0) {
360         ++bins->right_index;
361       }
362     }
363     DCHECK_LT(static_cast<size_t>(bins->right_index), this->counts.size());
364     DCHECK_GT(this->counts[bins->right_index], 0);
365     const double higher_value = static_cast<double>(bins->right_index + this->min);
366 
367     if (interpolation == QuantileOptions::LINEAR) {
368       return fraction * higher_value + (1 - fraction) * lower_value;
369     } else if (interpolation == QuantileOptions::MIDPOINT) {
370       return lower_value / 2 + higher_value / 2;
371     } else {
372       DCHECK(false);
373       return NAN;
374     }
375   }
376 };
377 
378 // histogram or 'copy & nth_element' approach per value range and size, only for integers
379 template <typename InType>
380 struct CountOrSortQuantiler {
381   using CType = typename InType::c_type;
382 
Execarrow::compute::internal::__anona394dc340111::CountOrSortQuantiler383   void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
384     // cross point to benefit from histogram approach
385     // parameters estimated from ad-hoc benchmarks manually
386     static constexpr int kMinArraySize = 65536;
387     static constexpr int kMaxValueRange = 65536;
388 
389     const Datum& datum = batch[0];
390     if (datum.length() - datum.null_count() >= kMinArraySize) {
391       CType min = std::numeric_limits<CType>::max();
392       CType max = std::numeric_limits<CType>::min();
393 
394       for (const auto& array : datum.chunks()) {
395         const ArrayData& data = *array->data();
396         const CType* values = data.GetValues<CType>(1);
397         VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
398                             [&](int64_t pos, int64_t len) {
399                               for (int64_t i = 0; i < len; ++i) {
400                                 min = std::min(min, values[pos + i]);
401                                 max = std::max(max, values[pos + i]);
402                               }
403                             });
404       }
405 
406       if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
407         CountQuantiler<InType>(min, max).Exec(ctx, batch, out);
408         return;
409       }
410     }
411 
412     SortQuantiler<InType>().Exec(ctx, batch, out);
413   }
414 };
415 
416 template <typename InType, typename Enable = void>
417 struct ExactQuantiler;
418 
419 template <>
420 struct ExactQuantiler<UInt8Type> {
421   CountQuantiler<UInt8Type> impl;
ExactQuantilerarrow::compute::internal::__anona394dc340111::ExactQuantiler422   ExactQuantiler() : impl(0, 255) {}
423 };
424 
425 template <>
426 struct ExactQuantiler<Int8Type> {
427   CountQuantiler<Int8Type> impl;
ExactQuantilerarrow::compute::internal::__anona394dc340111::ExactQuantiler428   ExactQuantiler() : impl(-128, 127) {}
429 };
430 
431 template <typename InType>
432 struct ExactQuantiler<InType, enable_if_t<(is_integer_type<InType>::value &&
433                                            (sizeof(typename InType::c_type) > 1))>> {
434   CountOrSortQuantiler<InType> impl;
435 };
436 
437 template <typename InType>
438 struct ExactQuantiler<InType, enable_if_t<is_floating_type<InType>::value>> {
439   SortQuantiler<InType> impl;
440 };
441 
442 template <typename _, typename InType>
443 struct QuantileExecutor {
Execarrow::compute::internal::__anona394dc340111::QuantileExecutor444   static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
445     if (ctx->state() == nullptr) {
446       ctx->SetStatus(Status::Invalid("Quantile requires QuantileOptions"));
447       return;
448     }
449 
450     const QuantileOptions& options = QuantileState::Get(ctx);
451     if (options.q.empty()) {
452       ctx->SetStatus(Status::Invalid("Requires quantile argument"));
453       return;
454     }
455     for (double q : options.q) {
456       if (q < 0 || q > 1) {
457         ctx->SetStatus(Status::Invalid("Quantile must be between 0 and 1"));
458         return;
459       }
460     }
461 
462     ExactQuantiler<InType>().impl.Exec(ctx, batch, out);
463   }
464 };
465 
ResolveOutput(KernelContext * ctx,const std::vector<ValueDescr> & args)466 Result<ValueDescr> ResolveOutput(KernelContext* ctx,
467                                  const std::vector<ValueDescr>& args) {
468   const QuantileOptions& options = QuantileState::Get(ctx);
469   if (IsDataPoint(options)) {
470     return ValueDescr::Array(args[0].type);
471   } else {
472     return ValueDescr::Array(float64());
473   }
474 }
475 
AddQuantileKernels(VectorFunction * func)476 void AddQuantileKernels(VectorFunction* func) {
477   VectorKernel base;
478   base.init = QuantileState::Init;
479   base.can_execute_chunkwise = false;
480   base.output_chunked = false;
481 
482   for (const auto& ty : NumericTypes()) {
483     base.signature =
484         KernelSignature::Make({InputType::Array(ty)}, OutputType(ResolveOutput));
485     // output type is determined at runtime, set template argument to nulltype
486     base.exec = GenerateNumeric<QuantileExecutor, NullType>(*ty);
487     DCHECK_OK(func->AddKernel(base));
488   }
489 }
490 
491 const FunctionDoc quantile_doc{
492     "Compute an array of quantiles of a numeric array or chunked array",
493     ("By default, 0.5 quantile (median) is returned.\n"
494      "If quantile lies between two data points, an interpolated value is\n"
495      "returned based on selected interpolation method.\n"
496      "Nulls and NaNs are ignored.\n"
497      "An empty array is returned if there is no valid data point."),
498     {"array"},
499     "QuantileOptions"};
500 
501 }  // namespace
502 
RegisterScalarAggregateQuantile(FunctionRegistry * registry)503 void RegisterScalarAggregateQuantile(FunctionRegistry* registry) {
504   static QuantileOptions default_options;
505   auto func = std::make_shared<VectorFunction>("quantile", Arity::Unary(), &quantile_doc,
506                                                &default_options);
507   AddQuantileKernels(func.get());
508   DCHECK_OK(registry->AddFunction(std::move(func)));
509 }
510 
511 }  // namespace internal
512 }  // namespace compute
513 }  // namespace arrow
514