1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include <algorithm>
19 #include <cstring>
20 
21 #include "arrow/compute/kernels/common.h"
22 #include "arrow/scalar.h"
23 #include "arrow/util/bit_block_counter.h"
24 #include "arrow/util/bit_util.h"
25 #include "arrow/util/bitmap_ops.h"
26 
27 namespace arrow {
28 
29 using internal::BitBlockCount;
30 using internal::BitBlockCounter;
31 
32 namespace compute {
33 namespace internal {
34 
35 namespace {
36 
// Primary template: deliberately empty. It declares no Exec(); the partial
// specializations that follow (selected through the second, SFINAE-only
// parameter) supply the actual implementations.
template <typename ArrowType, typename SfinaeGuard = void>
struct FillNullFunctor {
};
39 
40 // Numeric inputs
41 
42 template <typename Type>
43 struct FillNullFunctor<Type, enable_if_t<is_number_type<Type>::value>> {
44   using T = typename TypeTraits<Type>::CType;
45 
Execarrow::compute::internal::__anon03f87fc10111::FillNullFunctor46   static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
47     const ArrayData& data = *batch[0].array();
48     const Scalar& fill_value = *batch[1].scalar();
49     ArrayData* output = out->mutable_array();
50 
51     // Ensure the kernel is configured properly to have no validity bitmap /
52     // null count 0 unless we explicitly propagate it below.
53     DCHECK(output->buffers[0] == nullptr);
54 
55     T value = UnboxScalar<Type>::Unbox(fill_value);
56     if (data.MayHaveNulls() != 0 && fill_value.is_valid) {
57       KERNEL_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> out_buf, ctx,
58                              ctx->Allocate(data.length * sizeof(T)));
59 
60       const uint8_t* is_valid = data.buffers[0]->data();
61       const T* in_values = data.GetValues<T>(1);
62       T* out_values = reinterpret_cast<T*>(out_buf->mutable_data());
63       int64_t offset = data.offset;
64       BitBlockCounter bit_counter(is_valid, data.offset, data.length);
65       while (offset < data.offset + data.length) {
66         BitBlockCount block = bit_counter.NextWord();
67         if (block.AllSet()) {
68           // Block all not null
69           std::memcpy(out_values, in_values, block.length * sizeof(T));
70         } else if (block.NoneSet()) {
71           // Block all null
72           std::fill(out_values, out_values + block.length, value);
73         } else {
74           for (int64_t i = 0; i < block.length; ++i) {
75             out_values[i] = BitUtil::GetBit(is_valid, offset + i) ? in_values[i] : value;
76           }
77         }
78         offset += block.length;
79         out_values += block.length;
80         in_values += block.length;
81       }
82       output->buffers[1] = out_buf;
83     } else {
84       *output = data;
85     }
86   }
87 };
88 
89 // Boolean input
90 
91 template <typename Type>
92 struct FillNullFunctor<Type, enable_if_t<is_boolean_type<Type>::value>> {
Execarrow::compute::internal::__anon03f87fc10111::FillNullFunctor93   static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
94     const ArrayData& data = *batch[0].array();
95     const Scalar& fill_value = *batch[1].scalar();
96     ArrayData* output = out->mutable_array();
97 
98     bool value = UnboxScalar<BooleanType>::Unbox(fill_value);
99     if (data.MayHaveNulls() != 0 && fill_value.is_valid) {
100       KERNEL_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> out_buf, ctx,
101                              ctx->AllocateBitmap(data.length));
102 
103       const uint8_t* is_valid = data.buffers[0]->data();
104       const uint8_t* data_bitmap = data.buffers[1]->data();
105       uint8_t* out_bitmap = out_buf->mutable_data();
106 
107       int64_t data_offset = data.offset;
108       BitBlockCounter bit_counter(is_valid, data.offset, data.length);
109 
110       int64_t out_offset = 0;
111       while (out_offset < data.length) {
112         BitBlockCount block = bit_counter.NextWord();
113         if (block.AllSet()) {
114           // Block all not null
115           ::arrow::internal::CopyBitmap(data_bitmap, data_offset, block.length,
116                                         out_bitmap, out_offset);
117         } else if (block.NoneSet()) {
118           // Block all null
119           BitUtil::SetBitsTo(out_bitmap, out_offset, block.length, value);
120         } else {
121           for (int64_t i = 0; i < block.length; ++i) {
122             BitUtil::SetBitTo(out_bitmap, out_offset + i,
123                               BitUtil::GetBit(is_valid, data_offset + i)
124                                   ? BitUtil::GetBit(data_bitmap, data_offset + i)
125                                   : value);
126           }
127         }
128         data_offset += block.length;
129         out_offset += block.length;
130       }
131       output->buffers[1] = out_buf;
132     } else {
133       *output = data;
134     }
135   }
136 };
137 
138 // Null input
139 
140 template <typename Type>
141 struct FillNullFunctor<Type, enable_if_t<is_null_type<Type>::value>> {
Execarrow::compute::internal::__anon03f87fc10111::FillNullFunctor142   static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
143     // Nothing preallocated, so we assign into the output
144     *out->mutable_array() = *batch[0].array();
145   }
146 };
147 
148 // Binary-like input
149 
150 template <typename Type>
151 struct FillNullFunctor<Type, enable_if_t<is_base_binary_type<Type>::value>> {
152   using BuilderType = typename TypeTraits<Type>::BuilderType;
153 
Execarrow::compute::internal::__anon03f87fc10111::FillNullFunctor154   static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
155     const ArrayData& input = *batch[0].array();
156     const auto& fill_value_scalar =
157         checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
158     util::string_view fill_value(*fill_value_scalar.value);
159     ArrayData* output = out->mutable_array();
160 
161     // Ensure the kernel is configured properly to have no validity bitmap /
162     // null count 0 unless we explicitly propagate it below.
163     DCHECK(output->buffers[0] == nullptr);
164 
165     const int64_t null_count = input.GetNullCount();
166 
167     if (null_count > 0 && fill_value_scalar.is_valid) {
168       BuilderType builder(input.type, ctx->memory_pool());
169       KERNEL_RETURN_IF_ERROR(ctx, builder.ReserveData(input.buffers[2]->size() +
170                                                       fill_value.length() * null_count));
171       KERNEL_RETURN_IF_ERROR(ctx, builder.Resize(input.length));
172 
173       KERNEL_RETURN_IF_ERROR(ctx, VisitArrayDataInline<Type>(
174                                       input,
175                                       [&](util::string_view s) {
176                                         builder.UnsafeAppend(s);
177                                         return Status::OK();
178                                       },
179                                       [&]() {
180                                         builder.UnsafeAppend(fill_value);
181                                         return Status::OK();
182                                       }));
183       std::shared_ptr<Array> string_array;
184       KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_array));
185       *output = *string_array->data();
186       // The builder does not match the logical type, due to
187       // GenerateTypeAgnosticVarBinaryBase
188       output->type = input.type;
189     } else {
190       *output = input;
191     }
192   }
193 };
194 
AddBasicFillNullKernels(ScalarKernel kernel,ScalarFunction * func)195 void AddBasicFillNullKernels(ScalarKernel kernel, ScalarFunction* func) {
196   auto AddKernels = [&](const std::vector<std::shared_ptr<DataType>>& types) {
197     for (const std::shared_ptr<DataType>& ty : types) {
198       kernel.signature =
199           KernelSignature::Make({InputType::Array(ty), InputType::Scalar(ty)}, ty);
200       kernel.exec = GenerateTypeAgnosticPrimitive<FillNullFunctor>(*ty);
201       DCHECK_OK(func->AddKernel(kernel));
202     }
203   };
204   AddKernels(NumericTypes());
205   AddKernels(TemporalTypes());
206   AddKernels({boolean(), null()});
207 }
208 
AddBinaryFillNullKernels(ScalarKernel kernel,ScalarFunction * func)209 void AddBinaryFillNullKernels(ScalarKernel kernel, ScalarFunction* func) {
210   for (const std::shared_ptr<DataType>& ty : BaseBinaryTypes()) {
211     kernel.signature =
212         KernelSignature::Make({InputType::Array(ty), InputType::Scalar(ty)}, ty);
213     kernel.exec = GenerateTypeAgnosticVarBinaryBase<FillNullFunctor>(*ty);
214     DCHECK_OK(func->AddKernel(kernel));
215   }
216 }
217 
218 const FunctionDoc fill_null_doc{
219     "Replace null elements",
220     ("`fill_value` must be a scalar of the same type as `values`.\n"
221      "Each non-null value in `values` is emitted as-is.\n"
222      "Each null value in `values` is replaced with `fill_value`."),
223     {"values", "fill_value"}};
224 
225 }  // namespace
226 
RegisterScalarFillNull(FunctionRegistry * registry)227 void RegisterScalarFillNull(FunctionRegistry* registry) {
228   {
229     ScalarKernel fill_null_base;
230     fill_null_base.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
231     fill_null_base.mem_allocation = MemAllocation::NO_PREALLOCATE;
232     auto fill_null =
233         std::make_shared<ScalarFunction>("fill_null", Arity::Binary(), &fill_null_doc);
234     AddBasicFillNullKernels(fill_null_base, fill_null.get());
235     AddBinaryFillNullKernels(fill_null_base, fill_null.get());
236     DCHECK_OK(registry->AddFunction(fill_null));
237   }
238 }
239 
240 }  // namespace internal
241 }  // namespace compute
242 }  // namespace arrow
243