1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 //! DateTime expressions
19 use std::sync::Arc;
20
21 use super::ColumnarValue;
22 use crate::{
23 error::{DataFusionError, Result},
24 scalar::{ScalarType, ScalarValue},
25 };
26 use arrow::{
27 array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
28 datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
29 };
30 use arrow::{
31 array::{
32 Date32Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
33 TimestampNanosecondArray, TimestampSecondArray,
34 },
35 compute::kernels::temporal,
36 datatypes::TimeUnit,
37 temporal_conversions::timestamp_ns_to_datetime,
38 };
39 use chrono::prelude::*;
40 use chrono::Duration;
41 use chrono::LocalResult;
42
43 #[inline]
44 /// Accepts a string in RFC3339 / ISO8601 standard format and some
45 /// variants and converts it to a nanosecond precision timestamp.
46 ///
47 /// Implements the `to_timestamp` function to convert a string to a
48 /// timestamp, following the model of spark SQL’s to_`timestamp`.
49 ///
50 /// In addition to RFC3339 / ISO8601 standard timestamps, it also
51 /// accepts strings that use a space ` ` to separate the date and time
52 /// as well as strings that have no explicit timezone offset.
53 ///
54 /// Examples of accepted inputs:
55 /// * `1997-01-31T09:26:56.123Z` # RCF3339
56 /// * `1997-01-31T09:26:56.123-05:00` # RCF3339
57 /// * `1997-01-31 09:26:56.123-05:00` # close to RCF3339 but with a space rather than T
58 /// * `1997-01-31T09:26:56.123` # close to RCF3339 but no timezone offset specified
59 /// * `1997-01-31 09:26:56.123` # close to RCF3339 but uses a space and no timezone offset
60 /// * `1997-01-31 09:26:56` # close to RCF3339, no fractional seconds
61 //
62 /// Internally, this function uses the `chrono` library for the
63 /// datetime parsing
64 ///
65 /// We hope to extend this function in the future with a second
66 /// parameter to specifying the format string.
67 ///
68 /// ## Timestamp Precision
69 ///
70 /// DataFusion uses the maximum precision timestamps supported by
71 /// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This
72 /// means the range of dates that timestamps can represent is ~1677 AD
73 /// to 2262 AM
74 ///
75 ///
76 /// ## Timezone / Offset Handling
77 ///
78 /// By using the Arrow format, DataFusion inherits Arrow’s handling of
79 /// timestamp values. Specifically, the stored numerical values of
80 /// timestamps are stored compared to offset UTC.
81 ///
82 /// This function intertprets strings without an explicit time zone as
83 /// timestamps with offsets of the local time on the machine that ran
84 /// the datafusion query
85 ///
86 /// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as
87 /// it has an explicit timezone specifier (“Z” for Zulu/UTC)
88 ///
89 /// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in
90 /// the timezone of the machine that ran DataFusion. For example, if
91 /// the system timezone is set to Americas/New_York (UTC-5) the
92 /// timestamp will be interpreted as though it were
93 /// `1997-01-31T09:26:56.123-05:00`
string_to_timestamp_nanos(s: &str) -> Result<i64>94 fn string_to_timestamp_nanos(s: &str) -> Result<i64> {
95 // Fast path: RFC3339 timestamp (with a T)
96 // Example: 2020-09-08T13:42:29.190855Z
97 if let Ok(ts) = DateTime::parse_from_rfc3339(s) {
98 return Ok(ts.timestamp_nanos());
99 }
100
101 // Implement quasi-RFC3339 support by trying to parse the
102 // timestamp with various other format specifiers to to support
103 // separating the date and time with a space ' ' rather than 'T' to be
104 // (more) compatible with Apache Spark SQL
105
106 // timezone offset, using ' ' as a separator
107 // Example: 2020-09-08 13:42:29.190855-05:00
108 if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") {
109 return Ok(ts.timestamp_nanos());
110 }
111
112 // with an explicit Z, using ' ' as a separator
113 // Example: 2020-09-08 13:42:29Z
114 if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") {
115 return Ok(ts.timestamp_nanos());
116 }
117
118 // Support timestamps without an explicit timezone offset, again
119 // to be compatible with what Apache Spark SQL does.
120
121 // without a timezone specifier as a local time, using T as a separator
122 // Example: 2020-09-08T13:42:29.190855
123 if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f") {
124 return naive_datetime_to_timestamp(s, ts);
125 }
126
127 // without a timezone specifier as a local time, using T as a
128 // separator, no fractional seconds
129 // Example: 2020-09-08T13:42:29
130 if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
131 return naive_datetime_to_timestamp(s, ts);
132 }
133
134 // without a timezone specifier as a local time, using ' ' as a separator
135 // Example: 2020-09-08 13:42:29.190855
136 if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%f") {
137 return naive_datetime_to_timestamp(s, ts);
138 }
139
140 // without a timezone specifier as a local time, using ' ' as a
141 // separator, no fractional seconds
142 // Example: 2020-09-08 13:42:29
143 if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
144 return naive_datetime_to_timestamp(s, ts);
145 }
146
147 // Note we don't pass along the error message from the underlying
148 // chrono parsing because we tried several different format
149 // strings and we don't know which the user was trying to
150 // match. Ths any of the specific error messages is likely to be
151 // be more confusing than helpful
152 Err(DataFusionError::Execution(format!(
153 "Error parsing '{}' as timestamp",
154 s
155 )))
156 }
157
158 /// Converts the naive datetime (which has no specific timezone) to a
159 /// nanosecond epoch timestamp relative to UTC.
naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64>160 fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64> {
161 let l = Local {};
162
163 match l.from_local_datetime(&datetime) {
164 LocalResult::None => Err(DataFusionError::Execution(format!(
165 "Error parsing '{}' as timestamp: local time representation is invalid",
166 s
167 ))),
168 LocalResult::Single(local_datetime) => {
169 Ok(local_datetime.with_timezone(&Utc).timestamp_nanos())
170 }
171 // Ambiguous times can happen if the timestamp is exactly when
172 // a daylight savings time transition occurs, for example, and
173 // so the datetime could validly be said to be in two
174 // potential offsets. However, since we are about to convert
175 // to UTC anyways, we can pick one arbitrarily
176 LocalResult::Ambiguous(local_datetime, _) => {
177 Ok(local_datetime.with_timezone(&Utc).timestamp_nanos())
178 }
179 }
180 }
181
182 // given a function `op` that maps a `&str` to a Result of an arrow native type,
183 // returns a `PrimitiveArray` after the application
184 // of the function to `args[0]`.
185 /// # Errors
186 /// This function errors iff:
187 /// * the number of arguments is not 1 or
188 /// * the first argument is not castable to a `GenericStringArray` or
189 /// * the function `op` errors
unary_string_to_primitive_function<'a, T, O, F>( args: &[&'a dyn Array], op: F, name: &str, ) -> Result<PrimitiveArray<O>> where O: ArrowPrimitiveType, T: StringOffsetSizeTrait, F: Fn(&'a str) -> Result<O::Native>,190 pub(crate) fn unary_string_to_primitive_function<'a, T, O, F>(
191 args: &[&'a dyn Array],
192 op: F,
193 name: &str,
194 ) -> Result<PrimitiveArray<O>>
195 where
196 O: ArrowPrimitiveType,
197 T: StringOffsetSizeTrait,
198 F: Fn(&'a str) -> Result<O::Native>,
199 {
200 if args.len() != 1 {
201 return Err(DataFusionError::Internal(format!(
202 "{:?} args were supplied but {} takes exactly one argument",
203 args.len(),
204 name,
205 )));
206 }
207
208 let array = args[0]
209 .as_any()
210 .downcast_ref::<GenericStringArray<T>>()
211 .ok_or_else(|| {
212 DataFusionError::Internal("failed to downcast to string".to_string())
213 })?;
214
215 // first map is the iterator, second is for the `Option<_>`
216 array.iter().map(|x| x.map(|x| op(x)).transpose()).collect()
217 }
218
219 // given an function that maps a `&str` to a arrow native type,
220 // returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
221 // depending on the `args`'s variant.
handle<'a, O, F, S>( args: &'a [ColumnarValue], op: F, name: &str, ) -> Result<ColumnarValue> where O: ArrowPrimitiveType, S: ScalarType<O::Native>, F: Fn(&'a str) -> Result<O::Native>,222 fn handle<'a, O, F, S>(
223 args: &'a [ColumnarValue],
224 op: F,
225 name: &str,
226 ) -> Result<ColumnarValue>
227 where
228 O: ArrowPrimitiveType,
229 S: ScalarType<O::Native>,
230 F: Fn(&'a str) -> Result<O::Native>,
231 {
232 match &args[0] {
233 ColumnarValue::Array(a) => match a.data_type() {
234 DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
235 unary_string_to_primitive_function::<i32, O, _>(&[a.as_ref()], op, name)?,
236 ))),
237 DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
238 unary_string_to_primitive_function::<i64, O, _>(&[a.as_ref()], op, name)?,
239 ))),
240 other => Err(DataFusionError::Internal(format!(
241 "Unsupported data type {:?} for function {}",
242 other, name,
243 ))),
244 },
245 ColumnarValue::Scalar(scalar) => match scalar {
246 ScalarValue::Utf8(a) => {
247 let result = a.as_ref().map(|x| (op)(x)).transpose()?;
248 Ok(ColumnarValue::Scalar(S::scalar(result)))
249 }
250 ScalarValue::LargeUtf8(a) => {
251 let result = a.as_ref().map(|x| (op)(x)).transpose()?;
252 Ok(ColumnarValue::Scalar(S::scalar(result)))
253 }
254 other => Err(DataFusionError::Internal(format!(
255 "Unsupported data type {:?} for function {}",
256 other, name
257 ))),
258 },
259 }
260 }
261
262 /// to_timestamp SQL function
to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue>263 pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
264 handle::<TimestampNanosecondType, _, TimestampNanosecondType>(
265 args,
266 string_to_timestamp_nanos,
267 "to_timestamp",
268 )
269 }
270
date_trunc_single(granularity: &str, value: i64) -> Result<i64>271 fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
272 let value = timestamp_ns_to_datetime(value).with_nanosecond(0);
273 let value = match granularity {
274 "second" => value,
275 "minute" => value.and_then(|d| d.with_second(0)),
276 "hour" => value
277 .and_then(|d| d.with_second(0))
278 .and_then(|d| d.with_minute(0)),
279 "day" => value
280 .and_then(|d| d.with_second(0))
281 .and_then(|d| d.with_minute(0))
282 .and_then(|d| d.with_hour(0)),
283 "week" => value
284 .and_then(|d| d.with_second(0))
285 .and_then(|d| d.with_minute(0))
286 .and_then(|d| d.with_hour(0))
287 .map(|d| d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64)),
288 "month" => value
289 .and_then(|d| d.with_second(0))
290 .and_then(|d| d.with_minute(0))
291 .and_then(|d| d.with_hour(0))
292 .and_then(|d| d.with_day0(0)),
293 "year" => value
294 .and_then(|d| d.with_second(0))
295 .and_then(|d| d.with_minute(0))
296 .and_then(|d| d.with_hour(0))
297 .and_then(|d| d.with_day0(0))
298 .and_then(|d| d.with_month0(0)),
299 unsupported => {
300 return Err(DataFusionError::Execution(format!(
301 "Unsupported date_trunc granularity: {}",
302 unsupported
303 )))
304 }
305 };
306 // `with_x(0)` are infalible because `0` are always a valid
307 Ok(value.unwrap().timestamp_nanos())
308 }
309
310 /// date_trunc SQL function
date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue>311 pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
312 let (granularity, array) = (&args[0], &args[1]);
313
314 let granularity =
315 if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = granularity {
316 v
317 } else {
318 return Err(DataFusionError::Execution(
319 "Granularity of `date_trunc` must be non-null scalar Utf8".to_string(),
320 ));
321 };
322
323 let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();
324
325 Ok(match array {
326 ColumnarValue::Scalar(scalar) => {
327 if let ScalarValue::TimeNanosecond(v) = scalar {
328 ColumnarValue::Scalar(ScalarValue::TimeNanosecond((f)(*v)?))
329 } else {
330 return Err(DataFusionError::Execution(
331 "array of `date_trunc` must be non-null scalar Utf8".to_string(),
332 ));
333 }
334 }
335 ColumnarValue::Array(array) => {
336 let array = array
337 .as_any()
338 .downcast_ref::<TimestampNanosecondArray>()
339 .unwrap();
340 let array = array
341 .iter()
342 .map(f)
343 .collect::<Result<TimestampNanosecondArray>>()?;
344
345 ColumnarValue::Array(Arc::new(array))
346 }
347 })
348 }
349
// Downcasts `$ARRAY` to its concrete array type according to its data type
// and applies the temporal kernel `$FN`, evaluating to `Ok(result)`.
//
// Supported inputs: `Date32`, `Date64`, and `Timestamp` of any unit *without*
// a timezone. An error from `$FN` is propagated with `?`, so it returns early
// from the function that invokes the macro. Any other data type evaluates to
// an `Internal` error. Each `unwrap()` follows a match on the same data type,
// so the downcast targets the array type for that data type.
macro_rules! extract_date_part {
    ($ARRAY: expr, $FN:expr) => {
        match $ARRAY.data_type() {
            DataType::Date32 => {
                let typed = $ARRAY.as_any().downcast_ref::<Date32Array>().unwrap();
                Ok($FN(typed)?)
            }
            DataType::Date64 => {
                let typed = $ARRAY.as_any().downcast_ref::<Date64Array>().unwrap();
                Ok($FN(typed)?)
            }
            DataType::Timestamp(TimeUnit::Second, None) => {
                let typed = $ARRAY
                    .as_any()
                    .downcast_ref::<TimestampSecondArray>()
                    .unwrap();
                Ok($FN(typed)?)
            }
            DataType::Timestamp(TimeUnit::Millisecond, None) => {
                let typed = $ARRAY
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap();
                Ok($FN(typed)?)
            }
            DataType::Timestamp(TimeUnit::Microsecond, None) => {
                let typed = $ARRAY
                    .as_any()
                    .downcast_ref::<TimestampMicrosecondArray>()
                    .unwrap();
                Ok($FN(typed)?)
            }
            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                let typed = $ARRAY
                    .as_any()
                    .downcast_ref::<TimestampNanosecondArray>()
                    .unwrap();
                Ok($FN(typed)?)
            }
            datatype => Err(DataFusionError::Internal(format!(
                "Extract does not support datatype {:?}",
                datatype
            ))),
        }
    };
}
398
399 /// DATE_PART SQL function
date_part(args: &[ColumnarValue]) -> Result<ColumnarValue>400 pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
401 if args.len() != 2 {
402 return Err(DataFusionError::Execution(
403 "Expected two arguments in DATE_PART".to_string(),
404 ));
405 }
406 let (date_part, array) = (&args[0], &args[1]);
407
408 let date_part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = date_part {
409 v
410 } else {
411 return Err(DataFusionError::Execution(
412 "First argument of `DATE_PART` must be non-null scalar Utf8".to_string(),
413 ));
414 };
415
416 let is_scalar = matches!(array, ColumnarValue::Scalar(_));
417
418 let array = match array {
419 ColumnarValue::Array(array) => array.clone(),
420 ColumnarValue::Scalar(scalar) => scalar.to_array(),
421 };
422
423 let arr = match date_part.to_lowercase().as_str() {
424 "hour" => extract_date_part!(array, temporal::hour),
425 "year" => extract_date_part!(array, temporal::year),
426 _ => Err(DataFusionError::Execution(format!(
427 "Date part '{}' not supported",
428 date_part
429 ))),
430 }?;
431
432 Ok(if is_scalar {
433 ColumnarValue::Scalar(ScalarValue::try_from_array(
434 &(Arc::new(arr) as ArrayRef),
435 0,
436 )?)
437 } else {
438 ColumnarValue::Array(Arc::new(arr))
439 })
440 }
441
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array, StringBuilder};

    use super::*;

    #[test]
    fn to_timestamp_arrays_and_nulls() -> Result<()> {
        // The array code path must be wired up, and a null input string
        // must produce a null output timestamp.
        let mut strings = StringBuilder::new(2);
        let mut timestamps = TimestampNanosecondArray::builder(2);

        strings.append_value("2020-09-08T13:42:29.190855Z")?;
        timestamps.append_value(1599572549190855000)?;

        strings.append_null()?;
        timestamps.append_null()?;
        let expected = timestamps.finish();

        let input = ColumnarValue::Array(Arc::new(strings.finish()) as ArrayRef);
        let parsed = to_timestamp(&[input])
            .expect("that to_timestamp parsed values without error");
        match parsed {
            ColumnarValue::Array(parsed) => {
                assert_eq!(parsed.len(), 2);
                assert_eq!(&expected as &dyn Array, parsed.as_ref());
            }
            _ => panic!("Expected a columnar array"),
        }
        Ok(())
    }

    #[test]
    fn date_trunc_test() {
        // (input, granularity, expected truncated value)
        let cases = [
            ("2020-09-08T13:42:29.190855Z", "second", "2020-09-08T13:42:29.000000Z"),
            ("2020-09-08T13:42:29.190855Z", "minute", "2020-09-08T13:42:00.000000Z"),
            ("2020-09-08T13:42:29.190855Z", "hour", "2020-09-08T13:00:00.000000Z"),
            ("2020-09-08T13:42:29.190855Z", "day", "2020-09-08T00:00:00.000000Z"),
            ("2020-09-08T13:42:29.190855Z", "week", "2020-09-07T00:00:00.000000Z"),
            ("2020-09-08T13:42:29.190855Z", "month", "2020-09-01T00:00:00.000000Z"),
            ("2020-09-08T13:42:29.190855Z", "year", "2020-01-01T00:00:00.000000Z"),
            // Weeks that straddle a year boundary.
            ("2021-01-01T13:42:29.190855Z", "week", "2020-12-28T00:00:00.000000Z"),
            ("2020-01-01T13:42:29.190855Z", "week", "2019-12-30T00:00:00.000000Z"),
        ];

        for (input, granularity, expected) in cases.iter() {
            let input_nanos = string_to_timestamp_nanos(input).unwrap();
            let expected_nanos = string_to_timestamp_nanos(expected).unwrap();
            let truncated = date_trunc_single(granularity, input_nanos).unwrap();
            assert_eq!(
                truncated, expected_nanos,
                "date_trunc('{}', '{}')",
                granularity, input
            );
        }
    }

    #[test]
    fn to_timestamp_invalid_input_type() -> Result<()> {
        // Passing a non-string array to to_timestamp must produce an error.
        let mut builder = Int64Array::builder(1);
        builder.append_value(1)?;
        let int64array = ColumnarValue::Array(Arc::new(builder.finish()));

        let expected_err =
            "Internal error: Unsupported data type Int64 for function to_timestamp";
        let err = match to_timestamp(&[int64array]) {
            Ok(_) => panic!("Expected error but got success"),
            Err(e) => e,
        };
        assert!(
            err.to_string().contains(expected_err),
            "Can not find expected error '{}'. Actual error '{}'",
            expected_err,
            err
        );
        Ok(())
    }
}
560