1 /**
2  * @file   positive_delta_filter.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file defines class PositiveDeltaFilter.
31  */
32 
#include "tiledb/sm/filter/positive_delta_filter.h"

#include <algorithm>
#include <type_traits>

#include "tiledb/common/logger.h"
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/sm/enums/filter_option.h"
#include "tiledb/sm/enums/filter_type.h"
#include "tiledb/sm/filter/filter_pipeline.h"
#include "tiledb/sm/misc/utils.h"
#include "tiledb/sm/tile/tile.h"
41 
42 using namespace tiledb::common;
43 
44 namespace tiledb {
45 namespace sm {
46 
PositiveDeltaFilter()47 PositiveDeltaFilter::PositiveDeltaFilter()
48     : Filter(FilterType::FILTER_POSITIVE_DELTA) {
49   max_window_size_ = 1024;
50 }
51 
dump(FILE * out) const52 void PositiveDeltaFilter::dump(FILE* out) const {
53   if (out == nullptr)
54     out = stdout;
55   fprintf(out, "PositiveDelta: POSITIVE_DELTA_MAX_WINDOW=%u", max_window_size_);
56 }
57 
run_forward(FilterBuffer * input_metadata,FilterBuffer * input,FilterBuffer * output_metadata,FilterBuffer * output) const58 Status PositiveDeltaFilter::run_forward(
59     FilterBuffer* input_metadata,
60     FilterBuffer* input,
61     FilterBuffer* output_metadata,
62     FilterBuffer* output) const {
63   auto tile_type = pipeline_->current_tile()->type();
64 
65   // If encoding can't work, just return the input unmodified.
66   if (!datatype_is_integer(tile_type)) {
67     RETURN_NOT_OK(output->append_view(input));
68     RETURN_NOT_OK(output_metadata->append_view(input_metadata));
69     return Status::Ok();
70   }
71 
72   switch (tile_type) {
73     case Datatype::INT8:
74       return run_forward<int8_t>(
75           input_metadata, input, output_metadata, output);
76     case Datatype::UINT8:
77       return run_forward<uint8_t>(
78           input_metadata, input, output_metadata, output);
79     case Datatype::INT16:
80       return run_forward<int16_t>(
81           input_metadata, input, output_metadata, output);
82     case Datatype::UINT16:
83       return run_forward<uint16_t>(
84           input_metadata, input, output_metadata, output);
85     case Datatype::INT32:
86       return run_forward<int>(input_metadata, input, output_metadata, output);
87     case Datatype::UINT32:
88       return run_forward<unsigned>(
89           input_metadata, input, output_metadata, output);
90     case Datatype::INT64:
91       return run_forward<int64_t>(
92           input_metadata, input, output_metadata, output);
93     case Datatype::UINT64:
94       return run_forward<uint64_t>(
95           input_metadata, input, output_metadata, output);
96     case Datatype::DATETIME_YEAR:
97     case Datatype::DATETIME_MONTH:
98     case Datatype::DATETIME_WEEK:
99     case Datatype::DATETIME_DAY:
100     case Datatype::DATETIME_HR:
101     case Datatype::DATETIME_MIN:
102     case Datatype::DATETIME_SEC:
103     case Datatype::DATETIME_MS:
104     case Datatype::DATETIME_US:
105     case Datatype::DATETIME_NS:
106     case Datatype::DATETIME_PS:
107     case Datatype::DATETIME_FS:
108     case Datatype::DATETIME_AS:
109     case Datatype::TIME_HR:
110     case Datatype::TIME_MIN:
111     case Datatype::TIME_SEC:
112     case Datatype::TIME_MS:
113     case Datatype::TIME_US:
114     case Datatype::TIME_NS:
115     case Datatype::TIME_PS:
116     case Datatype::TIME_FS:
117     case Datatype::TIME_AS:
118       return run_forward<int64_t>(
119           input_metadata, input, output_metadata, output);
120     default:
121       return LOG_STATUS(
122           Status::FilterError("Cannot filter; Unsupported input type"));
123   }
124 }
125 
126 template <typename T>
run_forward(FilterBuffer * input_metadata,FilterBuffer * input,FilterBuffer * output_metadata,FilterBuffer * output) const127 Status PositiveDeltaFilter::run_forward(
128     FilterBuffer* input_metadata,
129     FilterBuffer* input,
130     FilterBuffer* output_metadata,
131     FilterBuffer* output) const {
132   // Compute the upper bound on the size of the output.
133   std::vector<ConstBuffer> parts = input->buffers();
134   auto num_parts = (uint32_t)parts.size();
135   uint64_t output_size_ub = 0;
136   uint32_t metadata_size = 2 * sizeof(uint32_t);
137   uint32_t total_num_windows = 0;
138   for (unsigned i = 0; i < num_parts; i++) {
139     auto part = &parts[i];
140     auto part_size = static_cast<uint32_t>(part->size());
141     // Compute overhead for the part
142     uint32_t window_size =
143         std::min(part_size, max_window_size_) / sizeof(T) * sizeof(T);
144     uint32_t num_windows =
145         part_size / window_size + uint32_t(bool(part_size % window_size));
146     uint32_t overhead = num_windows * (sizeof(uint32_t) + sizeof(T));
147     output_size_ub += part_size;
148     metadata_size += overhead;
149     total_num_windows += num_windows;
150   }
151 
152   // Allocate space in output buffer for the upper bound.
153   RETURN_NOT_OK(output->prepend_buffer(output_size_ub));
154   Buffer* buffer_ptr = output->buffer_ptr(0);
155   buffer_ptr->reset_offset();
156   assert(buffer_ptr != nullptr);
157 
158   // Forward the existing metadata
159   RETURN_NOT_OK(output_metadata->append_view(input_metadata));
160   // Allocate a buffer for this filter's metadata and write the header.
161   RETURN_NOT_OK(output_metadata->prepend_buffer(metadata_size));
162   RETURN_NOT_OK(output_metadata->write(&total_num_windows, sizeof(uint32_t)));
163 
164   // Compress all parts.
165   for (unsigned i = 0; i < num_parts; i++) {
166     RETURN_NOT_OK(encode_part<T>(&parts[i], output, output_metadata));
167   }
168 
169   return Status::Ok();
170 }
171 
172 template <typename T>
encode_part(ConstBuffer * input,FilterBuffer * output,FilterBuffer * output_metadata) const173 Status PositiveDeltaFilter::encode_part(
174     ConstBuffer* input,
175     FilterBuffer* output,
176     FilterBuffer* output_metadata) const {
177   // Compute window size in bytes as a multiple of the element width
178   auto input_bytes = static_cast<uint32_t>(input->size());
179   uint32_t window_size = std::min(input_bytes, max_window_size_);
180   window_size = window_size / sizeof(T) * sizeof(T);
181 
182   // Compute number of windows
183   uint32_t num_windows =
184       input_bytes / window_size + uint32_t(bool(input_bytes % window_size));
185 
186   // Write each window.
187   for (uint32_t i = 0; i < num_windows; i++) {
188     // Compute the actual size in bytes of the window (may be smaller at the end
189     // of the input if the window size doesn't evenly divide).
190     auto window_nbytes = static_cast<uint32_t>(
191         std::min(window_size, input_bytes - i * window_size));
192     uint32_t window_nelts = window_nbytes / sizeof(T);
193 
194     // Write window header.
195     T window_value_offset = input->value<T>();
196     RETURN_NOT_OK(output_metadata->write(&window_value_offset, sizeof(T)));
197     RETURN_NOT_OK(output_metadata->write(&window_nbytes, sizeof(uint32_t)));
198 
199     if (window_nbytes % sizeof(T) != 0) {
200       // Can't encode; just write the window bytes unmodified.
201       RETURN_NOT_OK(
202           output->write((char*)input->data() + input->offset(), window_nbytes));
203       input->advance_offset(window_nbytes);
204     } else {
205       // Encode and write the relative values to output.
206       T prev_value = input->value<T>();
207       for (uint32_t j = 0; j < window_nelts; j++) {
208         T curr_value = input->value<T>();
209         if (curr_value < prev_value)
210           return LOG_STATUS(Status::FilterError(
211               "Positive delta filter error: delta is not positive."));
212 
213         T delta = curr_value - prev_value;
214         RETURN_NOT_OK(output->write(&delta, sizeof(T)));
215         input->advance_offset(sizeof(T));
216 
217         prev_value = curr_value;
218       }
219     }
220   }
221 
222   return Status::Ok();
223 }
224 
run_reverse(FilterBuffer * input_metadata,FilterBuffer * input,FilterBuffer * output_metadata,FilterBuffer * output,const Config & config) const225 Status PositiveDeltaFilter::run_reverse(
226     FilterBuffer* input_metadata,
227     FilterBuffer* input,
228     FilterBuffer* output_metadata,
229     FilterBuffer* output,
230     const Config& config) const {
231   (void)config;
232 
233   auto tile_type = pipeline_->current_tile()->type();
234 
235   // If encoding wasn't applied, just return the input unmodified.
236   if (!datatype_is_integer(tile_type)) {
237     RETURN_NOT_OK(output->append_view(input));
238     RETURN_NOT_OK(output_metadata->append_view(input_metadata));
239     return Status::Ok();
240   }
241 
242   switch (tile_type) {
243     case Datatype::INT8:
244       return run_reverse<int8_t>(
245           input_metadata, input, output_metadata, output);
246     case Datatype::UINT8:
247       return run_reverse<uint8_t>(
248           input_metadata, input, output_metadata, output);
249     case Datatype::INT16:
250       return run_reverse<int16_t>(
251           input_metadata, input, output_metadata, output);
252     case Datatype::UINT16:
253       return run_reverse<uint16_t>(
254           input_metadata, input, output_metadata, output);
255     case Datatype::INT32:
256       return run_reverse<int>(input_metadata, input, output_metadata, output);
257     case Datatype::UINT32:
258       return run_reverse<unsigned>(
259           input_metadata, input, output_metadata, output);
260     case Datatype::INT64:
261       return run_reverse<int64_t>(
262           input_metadata, input, output_metadata, output);
263     case Datatype::UINT64:
264       return run_reverse<uint64_t>(
265           input_metadata, input, output_metadata, output);
266     case Datatype::DATETIME_YEAR:
267     case Datatype::DATETIME_MONTH:
268     case Datatype::DATETIME_WEEK:
269     case Datatype::DATETIME_DAY:
270     case Datatype::DATETIME_HR:
271     case Datatype::DATETIME_MIN:
272     case Datatype::DATETIME_SEC:
273     case Datatype::DATETIME_MS:
274     case Datatype::DATETIME_US:
275     case Datatype::DATETIME_NS:
276     case Datatype::DATETIME_PS:
277     case Datatype::DATETIME_FS:
278     case Datatype::DATETIME_AS:
279     case Datatype::TIME_HR:
280     case Datatype::TIME_MIN:
281     case Datatype::TIME_SEC:
282     case Datatype::TIME_MS:
283     case Datatype::TIME_US:
284     case Datatype::TIME_NS:
285     case Datatype::TIME_PS:
286     case Datatype::TIME_FS:
287     case Datatype::TIME_AS:
288       return run_reverse<int64_t>(
289           input_metadata, input, output_metadata, output);
290     default:
291       return LOG_STATUS(
292           Status::FilterError("Cannot filter; Unsupported input type"));
293   }
294 }
295 
296 template <typename T>
run_reverse(FilterBuffer * input_metadata,FilterBuffer * input,FilterBuffer * output_metadata,FilterBuffer * output) const297 Status PositiveDeltaFilter::run_reverse(
298     FilterBuffer* input_metadata,
299     FilterBuffer* input,
300     FilterBuffer* output_metadata,
301     FilterBuffer* output) const {
302   auto tile_type = pipeline_->current_tile()->type();
303   auto tile_type_size = datatype_size(tile_type);
304 
305   uint32_t num_windows;
306   RETURN_NOT_OK(input_metadata->read(&num_windows, sizeof(uint32_t)));
307 
308   RETURN_NOT_OK(output->prepend_buffer(input->size()));
309   output->reset_offset();
310 
311   // Read each window
312   for (uint32_t i = 0; i < num_windows; i++) {
313     uint32_t window_nbytes;
314     T window_value_offset;
315     // Read window header
316     RETURN_NOT_OK(input_metadata->read(&window_value_offset, tile_type_size));
317     RETURN_NOT_OK(input_metadata->read(&window_nbytes, sizeof(uint32_t)));
318 
319     if (window_nbytes % sizeof(T) != 0) {
320       // Window was not encoded.
321       RETURN_NOT_OK(output->write(input, window_nbytes));
322       input->advance_offset(window_nbytes);
323     } else {
324       // Read and decode each window value.
325       uint32_t window_nelts = window_nbytes / sizeof(T);
326       T prev_value = window_value_offset;
327       for (uint32_t j = 0; j < window_nelts; j++) {
328         T delta;
329         RETURN_NOT_OK(input->read(&delta, sizeof(T)));
330         T decoded_value = prev_value + delta;
331         RETURN_NOT_OK(output->write(&decoded_value, tile_type_size));
332 
333         prev_value = decoded_value;
334       }
335     }
336   }
337 
338   // Output metadata is a view on the input metadata, skipping what was used by
339   // this filter.
340   auto md_offset = input_metadata->offset();
341   RETURN_NOT_OK(output_metadata->append_view(
342       input_metadata, md_offset, input_metadata->size() - md_offset));
343 
344   return Status::Ok();
345 }
346 
set_option_impl(FilterOption option,const void * value)347 Status PositiveDeltaFilter::set_option_impl(
348     FilterOption option, const void* value) {
349   if (value == nullptr)
350     return LOG_STATUS(Status::FilterError(
351         "Positive delta filter error; invalid option value"));
352 
353   switch (option) {
354     case FilterOption::POSITIVE_DELTA_MAX_WINDOW:
355       max_window_size_ = *(uint32_t*)value;
356       return Status::Ok();
357     default:
358       return LOG_STATUS(
359           Status::FilterError("Positive delta filter error; unknown option"));
360   }
361 }
362 
get_option_impl(FilterOption option,void * value) const363 Status PositiveDeltaFilter::get_option_impl(
364     FilterOption option, void* value) const {
365   switch (option) {
366     case FilterOption::POSITIVE_DELTA_MAX_WINDOW:
367       *(uint32_t*)value = max_window_size_;
368       return Status::Ok();
369     default:
370       return LOG_STATUS(
371           Status::FilterError("Positive delta filter error; unknown option"));
372   }
373 }
374 
max_window_size() const375 uint32_t PositiveDeltaFilter::max_window_size() const {
376   return max_window_size_;
377 }
378 
set_max_window_size(uint32_t max_window_size)379 void PositiveDeltaFilter::set_max_window_size(uint32_t max_window_size) {
380   max_window_size_ = max_window_size;
381 }
382 
clone_impl() const383 PositiveDeltaFilter* PositiveDeltaFilter::clone_impl() const {
384   auto clone = new PositiveDeltaFilter;
385   clone->max_window_size_ = max_window_size_;
386   return clone;
387 }
388 
deserialize_impl(ConstBuffer * buff)389 Status PositiveDeltaFilter::deserialize_impl(ConstBuffer* buff) {
390   RETURN_NOT_OK(buff->read(&max_window_size_, sizeof(uint32_t)));
391   return Status::Ok();
392 }
393 
serialize_impl(Buffer * buff) const394 Status PositiveDeltaFilter::serialize_impl(Buffer* buff) const {
395   RETURN_NOT_OK(buff->write(&max_window_size_, sizeof(uint32_t)));
396   return Status::Ok();
397 }
398 
399 }  // namespace sm
400 }  // namespace tiledb
401