1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <cmath>
19 #include <vector>
20
21 #include "arrow/compute/api_aggregate.h"
22 #include "arrow/compute/kernels/common.h"
23 #include "arrow/stl_allocator.h"
24 #include "arrow/util/bit_run_reader.h"
25
26 namespace arrow {
27 namespace compute {
28 namespace internal {
29
30 namespace {
31
32 using arrow::internal::checked_pointer_cast;
33 using arrow::internal::VisitSetBitRunsVoid;
34
35 using QuantileState = internal::OptionsWrapper<QuantileOptions>;
36
37 // output is at some input data point, not interpolated
IsDataPoint(const QuantileOptions & options)38 bool IsDataPoint(const QuantileOptions& options) {
39 // some interpolation methods return exact data point
40 return options.interpolation == QuantileOptions::LOWER ||
41 options.interpolation == QuantileOptions::HIGHER ||
42 options.interpolation == QuantileOptions::NEAREST;
43 }
44
45 // quantile to exact datapoint index (IsDataPoint == true)
QuantileToDataPoint(size_t length,double q,enum QuantileOptions::Interpolation interpolation)46 uint64_t QuantileToDataPoint(size_t length, double q,
47 enum QuantileOptions::Interpolation interpolation) {
48 const double index = (length - 1) * q;
49 uint64_t datapoint_index = static_cast<uint64_t>(index);
50 const double fraction = index - datapoint_index;
51
52 if (interpolation == QuantileOptions::LINEAR ||
53 interpolation == QuantileOptions::MIDPOINT) {
54 DCHECK_EQ(fraction, 0);
55 }
56
57 // convert NEAREST interpolation method to LOWER or HIGHER
58 if (interpolation == QuantileOptions::NEAREST) {
59 if (fraction < 0.5) {
60 interpolation = QuantileOptions::LOWER;
61 } else if (fraction > 0.5) {
62 interpolation = QuantileOptions::HIGHER;
63 } else {
64 // round 0.5 to nearest even number, similar to numpy.around
65 interpolation =
66 (datapoint_index & 1) ? QuantileOptions::HIGHER : QuantileOptions::LOWER;
67 }
68 }
69
70 if (interpolation == QuantileOptions::HIGHER && fraction != 0) {
71 ++datapoint_index;
72 }
73
74 return datapoint_index;
75 }
76
// copy and nth_element approach, large memory footprint
template <typename InType>
struct SortQuantiler {
  using CType = typename InType::c_type;
  using Allocator = arrow::stl::allocator<CType>;

  // Compute all requested quantiles of batch[0] into *out.
  // Non-null (and, for floating point, non-NaN) values are copied into a
  // scratch buffer which is then repeatedly partially partitioned with
  // std::nth_element, one quantile at a time in descending q order.
  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
    const QuantileOptions& options = QuantileState::Get(ctx);

    // copy all chunks to a buffer, ignore nulls and nans
    std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));

    const Datum& datum = batch[0];
    const int64_t in_length = datum.length() - datum.null_count();
    if (in_length > 0) {
      in_buffer.resize(in_length);

      int64_t index = 0;
      for (const auto& array : datum.chunks()) {
        index += CopyArray(in_buffer.data() + index, *array);
      }
      DCHECK_EQ(index, in_length);

      // drop nan
      if (is_floating_type<InType>::value) {
        // (v != v) is true only for NaN
        const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
                                        [](CType v) { return v != v; });
        in_buffer.resize(it - in_buffer.begin());
      }
    }

    // prepare out array
    int64_t out_length = options.q.size();
    if (in_buffer.empty()) {
      out_length = 0;  // input is empty or only contains null and nan, return empty array
    }
    // out type depends on options
    const bool is_datapoint = IsDataPoint(options);
    const std::shared_ptr<DataType> out_type =
        is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
    auto out_data = ArrayData::Make(out_type, out_length, 0);
    out_data->buffers.resize(2, nullptr);

    // calculate quantiles
    if (out_length > 0) {
      const auto out_bit_width = checked_pointer_cast<NumberType>(out_type)->bit_width();
      KERNEL_ASSIGN_OR_RAISE(out_data->buffers[1], ctx,
                             ctx->Allocate(out_length * out_bit_width / 8));

      // find quantiles in descending order
      std::vector<int64_t> q_indices(out_length);
      std::iota(q_indices.begin(), q_indices.end(), 0);
      std::sort(q_indices.begin(), q_indices.end(),
                [&options](int64_t left_index, int64_t right_index) {
                  return options.q[right_index] < options.q[left_index];
                });

      // input array is partitioned around data point at `last_index` (pivot)
      // for next quantile which is smaller, we only consider inputs left of the pivot
      uint64_t last_index = in_buffer.size();
      if (is_datapoint) {
        CType* out_buffer = out_data->template GetMutableValues<CType>(1);
        for (int64_t i = 0; i < out_length; ++i) {
          const int64_t q_index = q_indices[i];
          out_buffer[q_index] = GetQuantileAtDataPoint(
              in_buffer, &last_index, options.q[q_index], options.interpolation);
        }
      } else {
        double* out_buffer = out_data->template GetMutableValues<double>(1);
        for (int64_t i = 0; i < out_length; ++i) {
          const int64_t q_index = q_indices[i];
          out_buffer[q_index] = GetQuantileByInterp(
              in_buffer, &last_index, options.q[q_index], options.interpolation);
        }
      }
    }

    *out = Datum(std::move(out_data));
  }

  // Copy the non-null values of `array` into `buffer` (which must have room
  // for them); returns the number of values copied.
  int64_t CopyArray(CType* buffer, const Array& array) {
    const int64_t n = array.length() - array.null_count();
    if (n > 0) {
      int64_t index = 0;
      const ArrayData& data = *array.data();
      const CType* values = data.GetValues<CType>(1);
      // copy contiguous runs of valid values in bulk
      VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
                          [&](int64_t pos, int64_t len) {
                            memcpy(buffer + index, values + pos, len * sizeof(CType));
                            index += len;
                          });
      DCHECK_EQ(index, n);
    }
    return n;
  }

  // return quantile located exactly at some input data point
  // `in` is partially partitioned by previous calls: the element at *last_index
  // is at its final sorted position, with smaller values to its left.
  CType GetQuantileAtDataPoint(std::vector<CType, Allocator>& in, uint64_t* last_index,
                               double q,
                               enum QuantileOptions::Interpolation interpolation) {
    const uint64_t datapoint_index = QuantileToDataPoint(in.size(), q, interpolation);

    if (datapoint_index != *last_index) {
      // quantiles are processed in descending order, so the new pivot always
      // lies left of the previous one
      DCHECK_LT(datapoint_index, *last_index);
      std::nth_element(in.begin(), in.begin() + datapoint_index,
                       in.begin() + *last_index);
      *last_index = datapoint_index;
    }

    return in[datapoint_index];
  }

  // return quantile interpolated from adjacent input data points
  double GetQuantileByInterp(std::vector<CType, Allocator>& in, uint64_t* last_index,
                             double q,
                             enum QuantileOptions::Interpolation interpolation) {
    const double index = (in.size() - 1) * q;
    const uint64_t lower_index = static_cast<uint64_t>(index);
    const double fraction = index - lower_index;

    if (lower_index != *last_index) {
      DCHECK_LT(lower_index, *last_index);
      // partition around the lower data point of the interpolation pair
      std::nth_element(in.begin(), in.begin() + lower_index, in.begin() + *last_index);
    }

    const double lower_value = static_cast<double>(in[lower_index]);
    if (fraction == 0) {
      *last_index = lower_index;
      return lower_value;
    }

    const uint64_t higher_index = lower_index + 1;
    DCHECK_LT(higher_index, in.size());
    if (lower_index != *last_index && higher_index != *last_index) {
      DCHECK_LT(higher_index, *last_index);
      // higher value must be the minimal value after lower_index
      // (nth_element only guarantees the right partition is >= the pivot,
      // not that it is sorted)
      auto min = std::min_element(in.begin() + higher_index, in.begin() + *last_index);
      std::iter_swap(in.begin() + higher_index, min);
    }
    *last_index = lower_index;

    const double higher_value = static_cast<double>(in[higher_index]);

    if (interpolation == QuantileOptions::LINEAR) {
      // more stable than naive linear interpolation
      return fraction * higher_value + (1 - fraction) * lower_value;
    } else if (interpolation == QuantileOptions::MIDPOINT) {
      return lower_value / 2 + higher_value / 2;
    } else {
      // unreachable: interpolated methods are only LINEAR or MIDPOINT here
      DCHECK(false);
      return NAN;
    }
  }
};
231
// histogram approach with constant memory, only for integers within limited value range
template <typename InType>
struct CountQuantiler {
  using CType = typename InType::c_type;

  CType min;                     // lowest value of the histogram range (bin 0)
  std::vector<uint64_t> counts;  // counts[i]: # of values equals i + min

  // indices to adjacent non-empty bins covering current quantile
  struct AdjacentBins {
    int left_index;
    int right_index;
    uint64_t total_count;  // accumulated counts till left_index (inclusive)
  };

  // Prepare a histogram with one bin per value in [min, max].
  // Caller guarantees the value range is small (DCHECK'd below 2^30 bins).
  CountQuantiler(CType min, CType max) {
    uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
    DCHECK_LT(value_range, 1 << 30);
    this->min = min;
    this->counts.resize(value_range);
  }

  // Compute all requested quantiles of batch[0] into *out.
  // Builds the histogram in one pass, then walks it once in ascending q order.
  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
    const QuantileOptions& options = QuantileState::Get(ctx);

    // count values in all chunks, ignore nulls
    const Datum& datum = batch[0];
    const int64_t in_length = datum.length() - datum.null_count();
    if (in_length > 0) {
      for (auto& c : this->counts) c = 0;
      for (const auto& array : datum.chunks()) {
        const ArrayData& data = *array->data();
        const CType* values = data.GetValues<CType>(1);
        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
                            [&](int64_t pos, int64_t len) {
                              for (int64_t i = 0; i < len; ++i) {
                                ++this->counts[values[pos + i] - this->min];
                              }
                            });
      }
    }

    // prepare out array
    int64_t out_length = options.q.size();
    if (in_length == 0) {
      out_length = 0;  // input is empty or only contains null, return empty array
    }
    // out type depends on options
    const bool is_datapoint = IsDataPoint(options);
    const std::shared_ptr<DataType> out_type =
        is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
    auto out_data = ArrayData::Make(out_type, out_length, 0);
    out_data->buffers.resize(2, nullptr);

    // calculate quantiles
    if (out_length > 0) {
      const auto out_bit_width = checked_pointer_cast<NumberType>(out_type)->bit_width();
      KERNEL_ASSIGN_OR_RAISE(out_data->buffers[1], ctx,
                             ctx->Allocate(out_length * out_bit_width / 8));

      // find quantiles in ascending order, so the histogram walk below can
      // resume from the previous quantile's bins instead of restarting
      std::vector<int64_t> q_indices(out_length);
      std::iota(q_indices.begin(), q_indices.end(), 0);
      std::sort(q_indices.begin(), q_indices.end(),
                [&options](int64_t left_index, int64_t right_index) {
                  return options.q[left_index] < options.q[right_index];
                });

      AdjacentBins bins{0, 0, this->counts[0]};
      if (is_datapoint) {
        CType* out_buffer = out_data->template GetMutableValues<CType>(1);
        for (int64_t i = 0; i < out_length; ++i) {
          const int64_t q_index = q_indices[i];
          out_buffer[q_index] = GetQuantileAtDataPoint(
              in_length, &bins, options.q[q_index], options.interpolation);
        }
      } else {
        double* out_buffer = out_data->template GetMutableValues<double>(1);
        for (int64_t i = 0; i < out_length; ++i) {
          const int64_t q_index = q_indices[i];
          out_buffer[q_index] = GetQuantileByInterp(in_length, &bins, options.q[q_index],
                                                    options.interpolation);
        }
      }
    }

    *out = Datum(std::move(out_data));
  }

  // return quantile located exactly at some input data point
  CType GetQuantileAtDataPoint(int64_t in_length, AdjacentBins* bins, double q,
                               enum QuantileOptions::Interpolation interpolation) {
    const uint64_t datapoint_index = QuantileToDataPoint(in_length, q, interpolation);
    // advance to the bin containing the datapoint; quantiles are visited in
    // ascending order, so the walk never needs to move backwards
    while (datapoint_index >= bins->total_count &&
           static_cast<size_t>(bins->left_index) < this->counts.size() - 1) {
      ++bins->left_index;
      bins->total_count += this->counts[bins->left_index];
    }
    DCHECK_LT(datapoint_index, bins->total_count);
    return static_cast<CType>(bins->left_index + this->min);
  }

  // return quantile interpolated from adjacent input data points
  double GetQuantileByInterp(int64_t in_length, AdjacentBins* bins, double q,
                             enum QuantileOptions::Interpolation interpolation) {
    const double index = (in_length - 1) * q;
    const uint64_t index_floor = static_cast<uint64_t>(index);
    const double fraction = index - index_floor;

    // advance to the bin containing index_floor (see GetQuantileAtDataPoint)
    while (index_floor >= bins->total_count &&
           static_cast<size_t>(bins->left_index) < this->counts.size() - 1) {
      ++bins->left_index;
      bins->total_count += this->counts[bins->left_index];
    }
    DCHECK_LT(index_floor, bins->total_count);
    const double lower_value = static_cast<double>(bins->left_index + this->min);

    // quantile lies in this bin, no interpolation needed
    if (index <= bins->total_count - 1) {
      return lower_value;
    }

    // quantile lies across two bins, locate next bin if not already done
    DCHECK_EQ(index_floor, bins->total_count - 1);
    if (bins->right_index <= bins->left_index) {
      bins->right_index = bins->left_index + 1;
      // skip empty bins to find the next value actually present in the input
      while (static_cast<size_t>(bins->right_index) < this->counts.size() - 1 &&
             this->counts[bins->right_index] == 0) {
        ++bins->right_index;
      }
    }
    DCHECK_LT(static_cast<size_t>(bins->right_index), this->counts.size());
    DCHECK_GT(this->counts[bins->right_index], 0);
    const double higher_value = static_cast<double>(bins->right_index + this->min);

    if (interpolation == QuantileOptions::LINEAR) {
      // more stable than naive linear interpolation
      return fraction * higher_value + (1 - fraction) * lower_value;
    } else if (interpolation == QuantileOptions::MIDPOINT) {
      return lower_value / 2 + higher_value / 2;
    } else {
      // unreachable: interpolated methods are only LINEAR or MIDPOINT here
      DCHECK(false);
      return NAN;
    }
  }
};
377
378 // histogram or 'copy & nth_element' approach per value range and size, only for integers
379 template <typename InType>
380 struct CountOrSortQuantiler {
381 using CType = typename InType::c_type;
382
Execarrow::compute::internal::__anona394dc340111::CountOrSortQuantiler383 void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
384 // cross point to benefit from histogram approach
385 // parameters estimated from ad-hoc benchmarks manually
386 static constexpr int kMinArraySize = 65536;
387 static constexpr int kMaxValueRange = 65536;
388
389 const Datum& datum = batch[0];
390 if (datum.length() - datum.null_count() >= kMinArraySize) {
391 CType min = std::numeric_limits<CType>::max();
392 CType max = std::numeric_limits<CType>::min();
393
394 for (const auto& array : datum.chunks()) {
395 const ArrayData& data = *array->data();
396 const CType* values = data.GetValues<CType>(1);
397 VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
398 [&](int64_t pos, int64_t len) {
399 for (int64_t i = 0; i < len; ++i) {
400 min = std::min(min, values[pos + i]);
401 max = std::max(max, values[pos + i]);
402 }
403 });
404 }
405
406 if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
407 CountQuantiler<InType>(min, max).Exec(ctx, batch, out);
408 return;
409 }
410 }
411
412 SortQuantiler<InType>().Exec(ctx, batch, out);
413 }
414 };
415
// ExactQuantiler selects the quantile implementation best suited to the input
// type; specializations below provide the `impl` member.
template <typename InType, typename Enable = void>
struct ExactQuantiler;

// uint8: at most 256 distinct values, always use the histogram approach
template <>
struct ExactQuantiler<UInt8Type> {
  CountQuantiler<UInt8Type> impl;
  ExactQuantiler() : impl(0, 255) {}
};

// int8: at most 256 distinct values, always use the histogram approach
template <>
struct ExactQuantiler<Int8Type> {
  CountQuantiler<Int8Type> impl;
  ExactQuantiler() : impl(-128, 127) {}
};

// wider integers: choose histogram vs copy-and-partition at runtime
template <typename InType>
struct ExactQuantiler<InType, enable_if_t<(is_integer_type<InType>::value &&
                                           (sizeof(typename InType::c_type) > 1))>> {
  CountOrSortQuantiler<InType> impl;
};

// floating point: histogram binning not applicable, always sort
template <typename InType>
struct ExactQuantiler<InType, enable_if_t<is_floating_type<InType>::value>> {
  SortQuantiler<InType> impl;
};
441
442 template <typename _, typename InType>
443 struct QuantileExecutor {
Execarrow::compute::internal::__anona394dc340111::QuantileExecutor444 static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
445 if (ctx->state() == nullptr) {
446 ctx->SetStatus(Status::Invalid("Quantile requires QuantileOptions"));
447 return;
448 }
449
450 const QuantileOptions& options = QuantileState::Get(ctx);
451 if (options.q.empty()) {
452 ctx->SetStatus(Status::Invalid("Requires quantile argument"));
453 return;
454 }
455 for (double q : options.q) {
456 if (q < 0 || q > 1) {
457 ctx->SetStatus(Status::Invalid("Quantile must be between 0 and 1"));
458 return;
459 }
460 }
461
462 ExactQuantiler<InType>().impl.Exec(ctx, batch, out);
463 }
464 };
465
ResolveOutput(KernelContext * ctx,const std::vector<ValueDescr> & args)466 Result<ValueDescr> ResolveOutput(KernelContext* ctx,
467 const std::vector<ValueDescr>& args) {
468 const QuantileOptions& options = QuantileState::Get(ctx);
469 if (IsDataPoint(options)) {
470 return ValueDescr::Array(args[0].type);
471 } else {
472 return ValueDescr::Array(float64());
473 }
474 }
475
AddQuantileKernels(VectorFunction * func)476 void AddQuantileKernels(VectorFunction* func) {
477 VectorKernel base;
478 base.init = QuantileState::Init;
479 base.can_execute_chunkwise = false;
480 base.output_chunked = false;
481
482 for (const auto& ty : NumericTypes()) {
483 base.signature =
484 KernelSignature::Make({InputType::Array(ty)}, OutputType(ResolveOutput));
485 // output type is determined at runtime, set template argument to nulltype
486 base.exec = GenerateNumeric<QuantileExecutor, NullType>(*ty);
487 DCHECK_OK(func->AddKernel(base));
488 }
489 }
490
// User-facing documentation attached to the "quantile" function registration.
const FunctionDoc quantile_doc{
    "Compute an array of quantiles of a numeric array or chunked array",
    ("By default, 0.5 quantile (median) is returned.\n"
     "If quantile lies between two data points, an interpolated value is\n"
     "returned based on selected interpolation method.\n"
     "Nulls and NaNs are ignored.\n"
     "An empty array is returned if there is no valid data point."),
    {"array"},
    "QuantileOptions"};
500
501 } // namespace
502
RegisterScalarAggregateQuantile(FunctionRegistry * registry)503 void RegisterScalarAggregateQuantile(FunctionRegistry* registry) {
504 static QuantileOptions default_options;
505 auto func = std::make_shared<VectorFunction>("quantile", Arity::Unary(), &quantile_doc,
506 &default_options);
507 AddQuantileKernels(func.get());
508 DCHECK_OK(registry->AddFunction(std::move(func)));
509 }
510
511 } // namespace internal
512 } // namespace compute
513 } // namespace arrow
514