1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <algorithm>
19 #include <cstring>
20
21 #include "arrow/compute/kernels/common.h"
22 #include "arrow/scalar.h"
23 #include "arrow/util/bit_block_counter.h"
24 #include "arrow/util/bit_util.h"
25 #include "arrow/util/bitmap_ops.h"
26
27 namespace arrow {
28
29 using internal::BitBlockCount;
30 using internal::BitBlockCounter;
31
32 namespace compute {
33 namespace internal {
34
35 namespace {
36
// Primary template for the fill_null kernel implementation. Left empty;
// the specializations below provide Exec for numeric, boolean, null, and
// binary-like inputs.
template <typename Type, typename Enable = void>
struct FillNullFunctor {};
39
40 // Numeric inputs
41
42 template <typename Type>
43 struct FillNullFunctor<Type, enable_if_t<is_number_type<Type>::value>> {
44 using T = typename TypeTraits<Type>::CType;
45
Execarrow::compute::internal::__anon03f87fc10111::FillNullFunctor46 static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
47 const ArrayData& data = *batch[0].array();
48 const Scalar& fill_value = *batch[1].scalar();
49 ArrayData* output = out->mutable_array();
50
51 // Ensure the kernel is configured properly to have no validity bitmap /
52 // null count 0 unless we explicitly propagate it below.
53 DCHECK(output->buffers[0] == nullptr);
54
55 T value = UnboxScalar<Type>::Unbox(fill_value);
56 if (data.MayHaveNulls() != 0 && fill_value.is_valid) {
57 KERNEL_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> out_buf, ctx,
58 ctx->Allocate(data.length * sizeof(T)));
59
60 const uint8_t* is_valid = data.buffers[0]->data();
61 const T* in_values = data.GetValues<T>(1);
62 T* out_values = reinterpret_cast<T*>(out_buf->mutable_data());
63 int64_t offset = data.offset;
64 BitBlockCounter bit_counter(is_valid, data.offset, data.length);
65 while (offset < data.offset + data.length) {
66 BitBlockCount block = bit_counter.NextWord();
67 if (block.AllSet()) {
68 // Block all not null
69 std::memcpy(out_values, in_values, block.length * sizeof(T));
70 } else if (block.NoneSet()) {
71 // Block all null
72 std::fill(out_values, out_values + block.length, value);
73 } else {
74 for (int64_t i = 0; i < block.length; ++i) {
75 out_values[i] = BitUtil::GetBit(is_valid, offset + i) ? in_values[i] : value;
76 }
77 }
78 offset += block.length;
79 out_values += block.length;
80 in_values += block.length;
81 }
82 output->buffers[1] = out_buf;
83 } else {
84 *output = data;
85 }
86 }
87 };
88
89 // Boolean input
90
91 template <typename Type>
92 struct FillNullFunctor<Type, enable_if_t<is_boolean_type<Type>::value>> {
Execarrow::compute::internal::__anon03f87fc10111::FillNullFunctor93 static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
94 const ArrayData& data = *batch[0].array();
95 const Scalar& fill_value = *batch[1].scalar();
96 ArrayData* output = out->mutable_array();
97
98 bool value = UnboxScalar<BooleanType>::Unbox(fill_value);
99 if (data.MayHaveNulls() != 0 && fill_value.is_valid) {
100 KERNEL_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> out_buf, ctx,
101 ctx->AllocateBitmap(data.length));
102
103 const uint8_t* is_valid = data.buffers[0]->data();
104 const uint8_t* data_bitmap = data.buffers[1]->data();
105 uint8_t* out_bitmap = out_buf->mutable_data();
106
107 int64_t data_offset = data.offset;
108 BitBlockCounter bit_counter(is_valid, data.offset, data.length);
109
110 int64_t out_offset = 0;
111 while (out_offset < data.length) {
112 BitBlockCount block = bit_counter.NextWord();
113 if (block.AllSet()) {
114 // Block all not null
115 ::arrow::internal::CopyBitmap(data_bitmap, data_offset, block.length,
116 out_bitmap, out_offset);
117 } else if (block.NoneSet()) {
118 // Block all null
119 BitUtil::SetBitsTo(out_bitmap, out_offset, block.length, value);
120 } else {
121 for (int64_t i = 0; i < block.length; ++i) {
122 BitUtil::SetBitTo(out_bitmap, out_offset + i,
123 BitUtil::GetBit(is_valid, data_offset + i)
124 ? BitUtil::GetBit(data_bitmap, data_offset + i)
125 : value);
126 }
127 }
128 data_offset += block.length;
129 out_offset += block.length;
130 }
131 output->buffers[1] = out_buf;
132 } else {
133 *output = data;
134 }
135 }
136 };
137
138 // Null input
139
140 template <typename Type>
141 struct FillNullFunctor<Type, enable_if_t<is_null_type<Type>::value>> {
Execarrow::compute::internal::__anon03f87fc10111::FillNullFunctor142 static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
143 // Nothing preallocated, so we assign into the output
144 *out->mutable_array() = *batch[0].array();
145 }
146 };
147
148 // Binary-like input
149
150 template <typename Type>
151 struct FillNullFunctor<Type, enable_if_t<is_base_binary_type<Type>::value>> {
152 using BuilderType = typename TypeTraits<Type>::BuilderType;
153
Execarrow::compute::internal::__anon03f87fc10111::FillNullFunctor154 static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
155 const ArrayData& input = *batch[0].array();
156 const auto& fill_value_scalar =
157 checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
158 util::string_view fill_value(*fill_value_scalar.value);
159 ArrayData* output = out->mutable_array();
160
161 // Ensure the kernel is configured properly to have no validity bitmap /
162 // null count 0 unless we explicitly propagate it below.
163 DCHECK(output->buffers[0] == nullptr);
164
165 const int64_t null_count = input.GetNullCount();
166
167 if (null_count > 0 && fill_value_scalar.is_valid) {
168 BuilderType builder(input.type, ctx->memory_pool());
169 KERNEL_RETURN_IF_ERROR(ctx, builder.ReserveData(input.buffers[2]->size() +
170 fill_value.length() * null_count));
171 KERNEL_RETURN_IF_ERROR(ctx, builder.Resize(input.length));
172
173 KERNEL_RETURN_IF_ERROR(ctx, VisitArrayDataInline<Type>(
174 input,
175 [&](util::string_view s) {
176 builder.UnsafeAppend(s);
177 return Status::OK();
178 },
179 [&]() {
180 builder.UnsafeAppend(fill_value);
181 return Status::OK();
182 }));
183 std::shared_ptr<Array> string_array;
184 KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_array));
185 *output = *string_array->data();
186 // The builder does not match the logical type, due to
187 // GenerateTypeAgnosticVarBinaryBase
188 output->type = input.type;
189 } else {
190 *output = input;
191 }
192 }
193 };
194
AddBasicFillNullKernels(ScalarKernel kernel,ScalarFunction * func)195 void AddBasicFillNullKernels(ScalarKernel kernel, ScalarFunction* func) {
196 auto AddKernels = [&](const std::vector<std::shared_ptr<DataType>>& types) {
197 for (const std::shared_ptr<DataType>& ty : types) {
198 kernel.signature =
199 KernelSignature::Make({InputType::Array(ty), InputType::Scalar(ty)}, ty);
200 kernel.exec = GenerateTypeAgnosticPrimitive<FillNullFunctor>(*ty);
201 DCHECK_OK(func->AddKernel(kernel));
202 }
203 };
204 AddKernels(NumericTypes());
205 AddKernels(TemporalTypes());
206 AddKernels({boolean(), null()});
207 }
208
AddBinaryFillNullKernels(ScalarKernel kernel,ScalarFunction * func)209 void AddBinaryFillNullKernels(ScalarKernel kernel, ScalarFunction* func) {
210 for (const std::shared_ptr<DataType>& ty : BaseBinaryTypes()) {
211 kernel.signature =
212 KernelSignature::Make({InputType::Array(ty), InputType::Scalar(ty)}, ty);
213 kernel.exec = GenerateTypeAgnosticVarBinaryBase<FillNullFunctor>(*ty);
214 DCHECK_OK(func->AddKernel(kernel));
215 }
216 }
217
// User-facing documentation for the "fill_null" function registered below:
// summary, description, and argument names shown by the function registry.
const FunctionDoc fill_null_doc{
    "Replace null elements",
    ("`fill_value` must be a scalar of the same type as `values`.\n"
     "Each non-null value in `values` is emitted as-is.\n"
     "Each null value in `values` is replaced with `fill_value`."),
    {"values", "fill_value"}};
224
225 } // namespace
226
RegisterScalarFillNull(FunctionRegistry * registry)227 void RegisterScalarFillNull(FunctionRegistry* registry) {
228 {
229 ScalarKernel fill_null_base;
230 fill_null_base.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
231 fill_null_base.mem_allocation = MemAllocation::NO_PREALLOCATE;
232 auto fill_null =
233 std::make_shared<ScalarFunction>("fill_null", Arity::Binary(), &fill_null_doc);
234 AddBasicFillNullKernels(fill_null_base, fill_null.get());
235 AddBinaryFillNullKernels(fill_null_base, fill_null.get());
236 DCHECK_OK(registry->AddFunction(fill_null));
237 }
238 }
239
240 } // namespace internal
241 } // namespace compute
242 } // namespace arrow
243