1 /**
2 * @file positive_delta_filter.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
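 * This file implements class PositiveDeltaFilter, which stores integer tile
 * data as non-negative deltas between consecutive values, grouped into windows
 * whose first value and byte length are recorded in the filter metadata.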
31 */
32
#include "tiledb/sm/filter/positive_delta_filter.h"

#include <algorithm>
#include <cassert>
#include <cstring>
#include <type_traits>

#include "tiledb/common/logger.h"
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/enums/filter_option.h"
#include "tiledb/sm/enums/filter_type.h"
#include "tiledb/sm/filter/filter_pipeline.h"
#include "tiledb/sm/misc/utils.h"
#include "tiledb/sm/tile/tile.h"
41
42 using namespace tiledb::common;
43
44 namespace tiledb {
45 namespace sm {
46
PositiveDeltaFilter()47 PositiveDeltaFilter::PositiveDeltaFilter()
48 : Filter(FilterType::FILTER_POSITIVE_DELTA) {
49 max_window_size_ = 1024;
50 }
51
dump(FILE * out) const52 void PositiveDeltaFilter::dump(FILE* out) const {
53 if (out == nullptr)
54 out = stdout;
55 fprintf(out, "PositiveDelta: POSITIVE_DELTA_MAX_WINDOW=%u", max_window_size_);
56 }
57
run_forward(FilterBuffer * input_metadata,FilterBuffer * input,FilterBuffer * output_metadata,FilterBuffer * output) const58 Status PositiveDeltaFilter::run_forward(
59 FilterBuffer* input_metadata,
60 FilterBuffer* input,
61 FilterBuffer* output_metadata,
62 FilterBuffer* output) const {
63 auto tile_type = pipeline_->current_tile()->type();
64
65 // If encoding can't work, just return the input unmodified.
66 if (!datatype_is_integer(tile_type)) {
67 RETURN_NOT_OK(output->append_view(input));
68 RETURN_NOT_OK(output_metadata->append_view(input_metadata));
69 return Status::Ok();
70 }
71
72 switch (tile_type) {
73 case Datatype::INT8:
74 return run_forward<int8_t>(
75 input_metadata, input, output_metadata, output);
76 case Datatype::UINT8:
77 return run_forward<uint8_t>(
78 input_metadata, input, output_metadata, output);
79 case Datatype::INT16:
80 return run_forward<int16_t>(
81 input_metadata, input, output_metadata, output);
82 case Datatype::UINT16:
83 return run_forward<uint16_t>(
84 input_metadata, input, output_metadata, output);
85 case Datatype::INT32:
86 return run_forward<int>(input_metadata, input, output_metadata, output);
87 case Datatype::UINT32:
88 return run_forward<unsigned>(
89 input_metadata, input, output_metadata, output);
90 case Datatype::INT64:
91 return run_forward<int64_t>(
92 input_metadata, input, output_metadata, output);
93 case Datatype::UINT64:
94 return run_forward<uint64_t>(
95 input_metadata, input, output_metadata, output);
96 case Datatype::DATETIME_YEAR:
97 case Datatype::DATETIME_MONTH:
98 case Datatype::DATETIME_WEEK:
99 case Datatype::DATETIME_DAY:
100 case Datatype::DATETIME_HR:
101 case Datatype::DATETIME_MIN:
102 case Datatype::DATETIME_SEC:
103 case Datatype::DATETIME_MS:
104 case Datatype::DATETIME_US:
105 case Datatype::DATETIME_NS:
106 case Datatype::DATETIME_PS:
107 case Datatype::DATETIME_FS:
108 case Datatype::DATETIME_AS:
109 case Datatype::TIME_HR:
110 case Datatype::TIME_MIN:
111 case Datatype::TIME_SEC:
112 case Datatype::TIME_MS:
113 case Datatype::TIME_US:
114 case Datatype::TIME_NS:
115 case Datatype::TIME_PS:
116 case Datatype::TIME_FS:
117 case Datatype::TIME_AS:
118 return run_forward<int64_t>(
119 input_metadata, input, output_metadata, output);
120 default:
121 return LOG_STATUS(
122 Status::FilterError("Cannot filter; Unsupported input type"));
123 }
124 }
125
126 template <typename T>
run_forward(FilterBuffer * input_metadata,FilterBuffer * input,FilterBuffer * output_metadata,FilterBuffer * output) const127 Status PositiveDeltaFilter::run_forward(
128 FilterBuffer* input_metadata,
129 FilterBuffer* input,
130 FilterBuffer* output_metadata,
131 FilterBuffer* output) const {
132 // Compute the upper bound on the size of the output.
133 std::vector<ConstBuffer> parts = input->buffers();
134 auto num_parts = (uint32_t)parts.size();
135 uint64_t output_size_ub = 0;
136 uint32_t metadata_size = 2 * sizeof(uint32_t);
137 uint32_t total_num_windows = 0;
138 for (unsigned i = 0; i < num_parts; i++) {
139 auto part = &parts[i];
140 auto part_size = static_cast<uint32_t>(part->size());
141 // Compute overhead for the part
142 uint32_t window_size =
143 std::min(part_size, max_window_size_) / sizeof(T) * sizeof(T);
144 uint32_t num_windows =
145 part_size / window_size + uint32_t(bool(part_size % window_size));
146 uint32_t overhead = num_windows * (sizeof(uint32_t) + sizeof(T));
147 output_size_ub += part_size;
148 metadata_size += overhead;
149 total_num_windows += num_windows;
150 }
151
152 // Allocate space in output buffer for the upper bound.
153 RETURN_NOT_OK(output->prepend_buffer(output_size_ub));
154 Buffer* buffer_ptr = output->buffer_ptr(0);
155 buffer_ptr->reset_offset();
156 assert(buffer_ptr != nullptr);
157
158 // Forward the existing metadata
159 RETURN_NOT_OK(output_metadata->append_view(input_metadata));
160 // Allocate a buffer for this filter's metadata and write the header.
161 RETURN_NOT_OK(output_metadata->prepend_buffer(metadata_size));
162 RETURN_NOT_OK(output_metadata->write(&total_num_windows, sizeof(uint32_t)));
163
164 // Compress all parts.
165 for (unsigned i = 0; i < num_parts; i++) {
166 RETURN_NOT_OK(encode_part<T>(&parts[i], output, output_metadata));
167 }
168
169 return Status::Ok();
170 }
171
172 template <typename T>
encode_part(ConstBuffer * input,FilterBuffer * output,FilterBuffer * output_metadata) const173 Status PositiveDeltaFilter::encode_part(
174 ConstBuffer* input,
175 FilterBuffer* output,
176 FilterBuffer* output_metadata) const {
177 // Compute window size in bytes as a multiple of the element width
178 auto input_bytes = static_cast<uint32_t>(input->size());
179 uint32_t window_size = std::min(input_bytes, max_window_size_);
180 window_size = window_size / sizeof(T) * sizeof(T);
181
182 // Compute number of windows
183 uint32_t num_windows =
184 input_bytes / window_size + uint32_t(bool(input_bytes % window_size));
185
186 // Write each window.
187 for (uint32_t i = 0; i < num_windows; i++) {
188 // Compute the actual size in bytes of the window (may be smaller at the end
189 // of the input if the window size doesn't evenly divide).
190 auto window_nbytes = static_cast<uint32_t>(
191 std::min(window_size, input_bytes - i * window_size));
192 uint32_t window_nelts = window_nbytes / sizeof(T);
193
194 // Write window header.
195 T window_value_offset = input->value<T>();
196 RETURN_NOT_OK(output_metadata->write(&window_value_offset, sizeof(T)));
197 RETURN_NOT_OK(output_metadata->write(&window_nbytes, sizeof(uint32_t)));
198
199 if (window_nbytes % sizeof(T) != 0) {
200 // Can't encode; just write the window bytes unmodified.
201 RETURN_NOT_OK(
202 output->write((char*)input->data() + input->offset(), window_nbytes));
203 input->advance_offset(window_nbytes);
204 } else {
205 // Encode and write the relative values to output.
206 T prev_value = input->value<T>();
207 for (uint32_t j = 0; j < window_nelts; j++) {
208 T curr_value = input->value<T>();
209 if (curr_value < prev_value)
210 return LOG_STATUS(Status::FilterError(
211 "Positive delta filter error: delta is not positive."));
212
213 T delta = curr_value - prev_value;
214 RETURN_NOT_OK(output->write(&delta, sizeof(T)));
215 input->advance_offset(sizeof(T));
216
217 prev_value = curr_value;
218 }
219 }
220 }
221
222 return Status::Ok();
223 }
224
run_reverse(FilterBuffer * input_metadata,FilterBuffer * input,FilterBuffer * output_metadata,FilterBuffer * output,const Config & config) const225 Status PositiveDeltaFilter::run_reverse(
226 FilterBuffer* input_metadata,
227 FilterBuffer* input,
228 FilterBuffer* output_metadata,
229 FilterBuffer* output,
230 const Config& config) const {
231 (void)config;
232
233 auto tile_type = pipeline_->current_tile()->type();
234
235 // If encoding wasn't applied, just return the input unmodified.
236 if (!datatype_is_integer(tile_type)) {
237 RETURN_NOT_OK(output->append_view(input));
238 RETURN_NOT_OK(output_metadata->append_view(input_metadata));
239 return Status::Ok();
240 }
241
242 switch (tile_type) {
243 case Datatype::INT8:
244 return run_reverse<int8_t>(
245 input_metadata, input, output_metadata, output);
246 case Datatype::UINT8:
247 return run_reverse<uint8_t>(
248 input_metadata, input, output_metadata, output);
249 case Datatype::INT16:
250 return run_reverse<int16_t>(
251 input_metadata, input, output_metadata, output);
252 case Datatype::UINT16:
253 return run_reverse<uint16_t>(
254 input_metadata, input, output_metadata, output);
255 case Datatype::INT32:
256 return run_reverse<int>(input_metadata, input, output_metadata, output);
257 case Datatype::UINT32:
258 return run_reverse<unsigned>(
259 input_metadata, input, output_metadata, output);
260 case Datatype::INT64:
261 return run_reverse<int64_t>(
262 input_metadata, input, output_metadata, output);
263 case Datatype::UINT64:
264 return run_reverse<uint64_t>(
265 input_metadata, input, output_metadata, output);
266 case Datatype::DATETIME_YEAR:
267 case Datatype::DATETIME_MONTH:
268 case Datatype::DATETIME_WEEK:
269 case Datatype::DATETIME_DAY:
270 case Datatype::DATETIME_HR:
271 case Datatype::DATETIME_MIN:
272 case Datatype::DATETIME_SEC:
273 case Datatype::DATETIME_MS:
274 case Datatype::DATETIME_US:
275 case Datatype::DATETIME_NS:
276 case Datatype::DATETIME_PS:
277 case Datatype::DATETIME_FS:
278 case Datatype::DATETIME_AS:
279 case Datatype::TIME_HR:
280 case Datatype::TIME_MIN:
281 case Datatype::TIME_SEC:
282 case Datatype::TIME_MS:
283 case Datatype::TIME_US:
284 case Datatype::TIME_NS:
285 case Datatype::TIME_PS:
286 case Datatype::TIME_FS:
287 case Datatype::TIME_AS:
288 return run_reverse<int64_t>(
289 input_metadata, input, output_metadata, output);
290 default:
291 return LOG_STATUS(
292 Status::FilterError("Cannot filter; Unsupported input type"));
293 }
294 }
295
296 template <typename T>
run_reverse(FilterBuffer * input_metadata,FilterBuffer * input,FilterBuffer * output_metadata,FilterBuffer * output) const297 Status PositiveDeltaFilter::run_reverse(
298 FilterBuffer* input_metadata,
299 FilterBuffer* input,
300 FilterBuffer* output_metadata,
301 FilterBuffer* output) const {
302 auto tile_type = pipeline_->current_tile()->type();
303 auto tile_type_size = datatype_size(tile_type);
304
305 uint32_t num_windows;
306 RETURN_NOT_OK(input_metadata->read(&num_windows, sizeof(uint32_t)));
307
308 RETURN_NOT_OK(output->prepend_buffer(input->size()));
309 output->reset_offset();
310
311 // Read each window
312 for (uint32_t i = 0; i < num_windows; i++) {
313 uint32_t window_nbytes;
314 T window_value_offset;
315 // Read window header
316 RETURN_NOT_OK(input_metadata->read(&window_value_offset, tile_type_size));
317 RETURN_NOT_OK(input_metadata->read(&window_nbytes, sizeof(uint32_t)));
318
319 if (window_nbytes % sizeof(T) != 0) {
320 // Window was not encoded.
321 RETURN_NOT_OK(output->write(input, window_nbytes));
322 input->advance_offset(window_nbytes);
323 } else {
324 // Read and decode each window value.
325 uint32_t window_nelts = window_nbytes / sizeof(T);
326 T prev_value = window_value_offset;
327 for (uint32_t j = 0; j < window_nelts; j++) {
328 T delta;
329 RETURN_NOT_OK(input->read(&delta, sizeof(T)));
330 T decoded_value = prev_value + delta;
331 RETURN_NOT_OK(output->write(&decoded_value, tile_type_size));
332
333 prev_value = decoded_value;
334 }
335 }
336 }
337
338 // Output metadata is a view on the input metadata, skipping what was used by
339 // this filter.
340 auto md_offset = input_metadata->offset();
341 RETURN_NOT_OK(output_metadata->append_view(
342 input_metadata, md_offset, input_metadata->size() - md_offset));
343
344 return Status::Ok();
345 }
346
set_option_impl(FilterOption option,const void * value)347 Status PositiveDeltaFilter::set_option_impl(
348 FilterOption option, const void* value) {
349 if (value == nullptr)
350 return LOG_STATUS(Status::FilterError(
351 "Positive delta filter error; invalid option value"));
352
353 switch (option) {
354 case FilterOption::POSITIVE_DELTA_MAX_WINDOW:
355 max_window_size_ = *(uint32_t*)value;
356 return Status::Ok();
357 default:
358 return LOG_STATUS(
359 Status::FilterError("Positive delta filter error; unknown option"));
360 }
361 }
362
get_option_impl(FilterOption option,void * value) const363 Status PositiveDeltaFilter::get_option_impl(
364 FilterOption option, void* value) const {
365 switch (option) {
366 case FilterOption::POSITIVE_DELTA_MAX_WINDOW:
367 *(uint32_t*)value = max_window_size_;
368 return Status::Ok();
369 default:
370 return LOG_STATUS(
371 Status::FilterError("Positive delta filter error; unknown option"));
372 }
373 }
374
max_window_size() const375 uint32_t PositiveDeltaFilter::max_window_size() const {
376 return max_window_size_;
377 }
378
set_max_window_size(uint32_t max_window_size)379 void PositiveDeltaFilter::set_max_window_size(uint32_t max_window_size) {
380 max_window_size_ = max_window_size;
381 }
382
clone_impl() const383 PositiveDeltaFilter* PositiveDeltaFilter::clone_impl() const {
384 auto clone = new PositiveDeltaFilter;
385 clone->max_window_size_ = max_window_size_;
386 return clone;
387 }
388
deserialize_impl(ConstBuffer * buff)389 Status PositiveDeltaFilter::deserialize_impl(ConstBuffer* buff) {
390 RETURN_NOT_OK(buff->read(&max_window_size_, sizeof(uint32_t)));
391 return Status::Ok();
392 }
393
serialize_impl(Buffer * buff) const394 Status PositiveDeltaFilter::serialize_impl(Buffer* buff) const {
395 RETURN_NOT_OK(buff->write(&max_window_size_, sizeof(uint32_t)));
396 return Status::Ok();
397 }
398
399 } // namespace sm
400 } // namespace tiledb
401