1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 //! DateTime expressions
19 use std::sync::Arc;
20 
21 use super::ColumnarValue;
22 use crate::{
23     error::{DataFusionError, Result},
24     scalar::{ScalarType, ScalarValue},
25 };
26 use arrow::{
27     array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
28     datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
29 };
30 use arrow::{
31     array::{
32         Date32Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
33         TimestampNanosecondArray, TimestampSecondArray,
34     },
35     compute::kernels::temporal,
36     datatypes::TimeUnit,
37     temporal_conversions::timestamp_ns_to_datetime,
38 };
39 use chrono::prelude::*;
40 use chrono::Duration;
41 use chrono::LocalResult;
42 
43 #[inline]
44 /// Accepts a string in RFC3339 / ISO8601 standard format and some
45 /// variants and converts it to a nanosecond precision timestamp.
46 ///
47 /// Implements the `to_timestamp` function to convert a string to a
48 /// timestamp, following the model of spark SQL’s to_`timestamp`.
49 ///
50 /// In addition to RFC3339 / ISO8601 standard timestamps, it also
51 /// accepts strings that use a space ` ` to separate the date and time
52 /// as well as strings that have no explicit timezone offset.
53 ///
54 /// Examples of accepted inputs:
55 /// * `1997-01-31T09:26:56.123Z`        # RCF3339
56 /// * `1997-01-31T09:26:56.123-05:00`   # RCF3339
57 /// * `1997-01-31 09:26:56.123-05:00`   # close to RCF3339 but with a space rather than T
58 /// * `1997-01-31T09:26:56.123`         # close to RCF3339 but no timezone offset specified
59 /// * `1997-01-31 09:26:56.123`         # close to RCF3339 but uses a space and no timezone offset
60 /// * `1997-01-31 09:26:56`             # close to RCF3339, no fractional seconds
61 //
62 /// Internally, this function uses the `chrono` library for the
63 /// datetime parsing
64 ///
65 /// We hope to extend this function in the future with a second
66 /// parameter to specifying the format string.
67 ///
68 /// ## Timestamp Precision
69 ///
70 /// DataFusion uses the maximum precision timestamps supported by
71 /// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This
72 /// means the range of dates that timestamps can represent is ~1677 AD
73 /// to 2262 AM
74 ///
75 ///
76 /// ## Timezone / Offset Handling
77 ///
78 /// By using the Arrow format, DataFusion inherits Arrow’s handling of
79 /// timestamp values. Specifically, the stored numerical values of
80 /// timestamps are stored compared to offset UTC.
81 ///
82 /// This function intertprets strings without an explicit time zone as
83 /// timestamps with offsets of the local time on the machine that ran
84 /// the datafusion query
85 ///
86 /// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as
87 /// it has an explicit timezone specifier (“Z” for Zulu/UTC)
88 ///
89 /// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in
90 /// the timezone of the machine that ran DataFusion. For example, if
91 /// the system timezone is set to Americas/New_York (UTC-5) the
92 /// timestamp will be interpreted as though it were
93 /// `1997-01-31T09:26:56.123-05:00`
string_to_timestamp_nanos(s: &str) -> Result<i64>94 fn string_to_timestamp_nanos(s: &str) -> Result<i64> {
95     // Fast path:  RFC3339 timestamp (with a T)
96     // Example: 2020-09-08T13:42:29.190855Z
97     if let Ok(ts) = DateTime::parse_from_rfc3339(s) {
98         return Ok(ts.timestamp_nanos());
99     }
100 
101     // Implement quasi-RFC3339 support by trying to parse the
102     // timestamp with various other format specifiers to to support
103     // separating the date and time with a space ' ' rather than 'T' to be
104     // (more) compatible with Apache Spark SQL
105 
106     // timezone offset, using ' ' as a separator
107     // Example: 2020-09-08 13:42:29.190855-05:00
108     if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") {
109         return Ok(ts.timestamp_nanos());
110     }
111 
112     // with an explicit Z, using ' ' as a separator
113     // Example: 2020-09-08 13:42:29Z
114     if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") {
115         return Ok(ts.timestamp_nanos());
116     }
117 
118     // Support timestamps without an explicit timezone offset, again
119     // to be compatible with what Apache Spark SQL does.
120 
121     // without a timezone specifier as a local time, using T as a separator
122     // Example: 2020-09-08T13:42:29.190855
123     if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f") {
124         return naive_datetime_to_timestamp(s, ts);
125     }
126 
127     // without a timezone specifier as a local time, using T as a
128     // separator, no fractional seconds
129     // Example: 2020-09-08T13:42:29
130     if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
131         return naive_datetime_to_timestamp(s, ts);
132     }
133 
134     // without a timezone specifier as a local time, using ' ' as a separator
135     // Example: 2020-09-08 13:42:29.190855
136     if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%f") {
137         return naive_datetime_to_timestamp(s, ts);
138     }
139 
140     // without a timezone specifier as a local time, using ' ' as a
141     // separator, no fractional seconds
142     // Example: 2020-09-08 13:42:29
143     if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
144         return naive_datetime_to_timestamp(s, ts);
145     }
146 
147     // Note we don't pass along the error message from the underlying
148     // chrono parsing because we tried several different format
149     // strings and we don't know which the user was trying to
150     // match. Ths any of the specific error messages is likely to be
151     // be more confusing than helpful
152     Err(DataFusionError::Execution(format!(
153         "Error parsing '{}' as timestamp",
154         s
155     )))
156 }
157 
158 /// Converts the naive datetime (which has no specific timezone) to a
159 /// nanosecond epoch timestamp relative to UTC.
naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64>160 fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64> {
161     let l = Local {};
162 
163     match l.from_local_datetime(&datetime) {
164         LocalResult::None => Err(DataFusionError::Execution(format!(
165             "Error parsing '{}' as timestamp: local time representation is invalid",
166             s
167         ))),
168         LocalResult::Single(local_datetime) => {
169             Ok(local_datetime.with_timezone(&Utc).timestamp_nanos())
170         }
171         // Ambiguous times can happen if the timestamp is exactly when
172         // a daylight savings time transition occurs, for example, and
173         // so the datetime could validly be said to be in two
174         // potential offsets. However, since we are about to convert
175         // to UTC anyways, we can pick one arbitrarily
176         LocalResult::Ambiguous(local_datetime, _) => {
177             Ok(local_datetime.with_timezone(&Utc).timestamp_nanos())
178         }
179     }
180 }
181 
182 // given a function `op` that maps a `&str` to a Result of an arrow native type,
183 // returns a `PrimitiveArray` after the application
184 // of the function to `args[0]`.
185 /// # Errors
186 /// This function errors iff:
187 /// * the number of arguments is not 1 or
188 /// * the first argument is not castable to a `GenericStringArray` or
189 /// * the function `op` errors
unary_string_to_primitive_function<'a, T, O, F>( args: &[&'a dyn Array], op: F, name: &str, ) -> Result<PrimitiveArray<O>> where O: ArrowPrimitiveType, T: StringOffsetSizeTrait, F: Fn(&'a str) -> Result<O::Native>,190 pub(crate) fn unary_string_to_primitive_function<'a, T, O, F>(
191     args: &[&'a dyn Array],
192     op: F,
193     name: &str,
194 ) -> Result<PrimitiveArray<O>>
195 where
196     O: ArrowPrimitiveType,
197     T: StringOffsetSizeTrait,
198     F: Fn(&'a str) -> Result<O::Native>,
199 {
200     if args.len() != 1 {
201         return Err(DataFusionError::Internal(format!(
202             "{:?} args were supplied but {} takes exactly one argument",
203             args.len(),
204             name,
205         )));
206     }
207 
208     let array = args[0]
209         .as_any()
210         .downcast_ref::<GenericStringArray<T>>()
211         .ok_or_else(|| {
212             DataFusionError::Internal("failed to downcast to string".to_string())
213         })?;
214 
215     // first map is the iterator, second is for the `Option<_>`
216     array.iter().map(|x| x.map(|x| op(x)).transpose()).collect()
217 }
218 
219 // given an function that maps a `&str` to a arrow native type,
220 // returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
221 // depending on the `args`'s variant.
handle<'a, O, F, S>( args: &'a [ColumnarValue], op: F, name: &str, ) -> Result<ColumnarValue> where O: ArrowPrimitiveType, S: ScalarType<O::Native>, F: Fn(&'a str) -> Result<O::Native>,222 fn handle<'a, O, F, S>(
223     args: &'a [ColumnarValue],
224     op: F,
225     name: &str,
226 ) -> Result<ColumnarValue>
227 where
228     O: ArrowPrimitiveType,
229     S: ScalarType<O::Native>,
230     F: Fn(&'a str) -> Result<O::Native>,
231 {
232     match &args[0] {
233         ColumnarValue::Array(a) => match a.data_type() {
234             DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
235                 unary_string_to_primitive_function::<i32, O, _>(&[a.as_ref()], op, name)?,
236             ))),
237             DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
238                 unary_string_to_primitive_function::<i64, O, _>(&[a.as_ref()], op, name)?,
239             ))),
240             other => Err(DataFusionError::Internal(format!(
241                 "Unsupported data type {:?} for function {}",
242                 other, name,
243             ))),
244         },
245         ColumnarValue::Scalar(scalar) => match scalar {
246             ScalarValue::Utf8(a) => {
247                 let result = a.as_ref().map(|x| (op)(x)).transpose()?;
248                 Ok(ColumnarValue::Scalar(S::scalar(result)))
249             }
250             ScalarValue::LargeUtf8(a) => {
251                 let result = a.as_ref().map(|x| (op)(x)).transpose()?;
252                 Ok(ColumnarValue::Scalar(S::scalar(result)))
253             }
254             other => Err(DataFusionError::Internal(format!(
255                 "Unsupported data type {:?} for function {}",
256                 other, name
257             ))),
258         },
259     }
260 }
261 
262 /// to_timestamp SQL function
to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue>263 pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
264     handle::<TimestampNanosecondType, _, TimestampNanosecondType>(
265         args,
266         string_to_timestamp_nanos,
267         "to_timestamp",
268     )
269 }
270 
date_trunc_single(granularity: &str, value: i64) -> Result<i64>271 fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
272     let value = timestamp_ns_to_datetime(value).with_nanosecond(0);
273     let value = match granularity {
274         "second" => value,
275         "minute" => value.and_then(|d| d.with_second(0)),
276         "hour" => value
277             .and_then(|d| d.with_second(0))
278             .and_then(|d| d.with_minute(0)),
279         "day" => value
280             .and_then(|d| d.with_second(0))
281             .and_then(|d| d.with_minute(0))
282             .and_then(|d| d.with_hour(0)),
283         "week" => value
284             .and_then(|d| d.with_second(0))
285             .and_then(|d| d.with_minute(0))
286             .and_then(|d| d.with_hour(0))
287             .map(|d| d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64)),
288         "month" => value
289             .and_then(|d| d.with_second(0))
290             .and_then(|d| d.with_minute(0))
291             .and_then(|d| d.with_hour(0))
292             .and_then(|d| d.with_day0(0)),
293         "year" => value
294             .and_then(|d| d.with_second(0))
295             .and_then(|d| d.with_minute(0))
296             .and_then(|d| d.with_hour(0))
297             .and_then(|d| d.with_day0(0))
298             .and_then(|d| d.with_month0(0)),
299         unsupported => {
300             return Err(DataFusionError::Execution(format!(
301                 "Unsupported date_trunc granularity: {}",
302                 unsupported
303             )))
304         }
305     };
306     // `with_x(0)` are infalible because `0` are always a valid
307     Ok(value.unwrap().timestamp_nanos())
308 }
309 
310 /// date_trunc SQL function
date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue>311 pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
312     let (granularity, array) = (&args[0], &args[1]);
313 
314     let granularity =
315         if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = granularity {
316             v
317         } else {
318             return Err(DataFusionError::Execution(
319                 "Granularity of `date_trunc` must be non-null scalar Utf8".to_string(),
320             ));
321         };
322 
323     let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();
324 
325     Ok(match array {
326         ColumnarValue::Scalar(scalar) => {
327             if let ScalarValue::TimeNanosecond(v) = scalar {
328                 ColumnarValue::Scalar(ScalarValue::TimeNanosecond((f)(*v)?))
329             } else {
330                 return Err(DataFusionError::Execution(
331                     "array of `date_trunc` must be non-null scalar Utf8".to_string(),
332                 ));
333             }
334         }
335         ColumnarValue::Array(array) => {
336             let array = array
337                 .as_any()
338                 .downcast_ref::<TimestampNanosecondArray>()
339                 .unwrap();
340             let array = array
341                 .iter()
342                 .map(f)
343                 .collect::<Result<TimestampNanosecondArray>>()?;
344 
345             ColumnarValue::Array(Arc::new(array))
346         }
347     })
348 }
349 
// Applies the temporal kernel `$FN` to `$ARRAY` after downcasting it to
// the concrete array type matching its `DataType`.
//
// Supported types are `Date32`, `Date64` and timezone-less `Timestamp`
// of any time unit; any other type yields `DataFusionError::Internal`.
// The expansion uses `?`, so it must be used inside a function whose
// error type can be built from the error returned by `$FN`.
macro_rules! extract_date_part {
    // Internal rule: downcast `$ARRAY` to `$ARRAY_TYPE` and run `$FN` on it.
    // The `unwrap` relies on the caller having matched on `data_type()`.
    (@apply $ARRAY:expr, $FN:expr, $ARRAY_TYPE:ty) => {{
        let array = $ARRAY.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
        Ok($FN(array)?)
    }};
    ($ARRAY:expr, $FN:expr) => {
        match $ARRAY.data_type() {
            DataType::Date32 => extract_date_part!(@apply $ARRAY, $FN, Date32Array),
            DataType::Date64 => extract_date_part!(@apply $ARRAY, $FN, Date64Array),
            DataType::Timestamp(TimeUnit::Second, None) => {
                extract_date_part!(@apply $ARRAY, $FN, TimestampSecondArray)
            }
            DataType::Timestamp(TimeUnit::Millisecond, None) => {
                extract_date_part!(@apply $ARRAY, $FN, TimestampMillisecondArray)
            }
            DataType::Timestamp(TimeUnit::Microsecond, None) => {
                extract_date_part!(@apply $ARRAY, $FN, TimestampMicrosecondArray)
            }
            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                extract_date_part!(@apply $ARRAY, $FN, TimestampNanosecondArray)
            }
            // Includes timestamps that carry a timezone.
            datatype => Err(DataFusionError::Internal(format!(
                "Extract does not support datatype {:?}",
                datatype
            ))),
        }
    };
}
398 
399 /// DATE_PART SQL function
date_part(args: &[ColumnarValue]) -> Result<ColumnarValue>400 pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
401     if args.len() != 2 {
402         return Err(DataFusionError::Execution(
403             "Expected two arguments in DATE_PART".to_string(),
404         ));
405     }
406     let (date_part, array) = (&args[0], &args[1]);
407 
408     let date_part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = date_part {
409         v
410     } else {
411         return Err(DataFusionError::Execution(
412             "First argument of `DATE_PART` must be non-null scalar Utf8".to_string(),
413         ));
414     };
415 
416     let is_scalar = matches!(array, ColumnarValue::Scalar(_));
417 
418     let array = match array {
419         ColumnarValue::Array(array) => array.clone(),
420         ColumnarValue::Scalar(scalar) => scalar.to_array(),
421     };
422 
423     let arr = match date_part.to_lowercase().as_str() {
424         "hour" => extract_date_part!(array, temporal::hour),
425         "year" => extract_date_part!(array, temporal::year),
426         _ => Err(DataFusionError::Execution(format!(
427             "Date part '{}' not supported",
428             date_part
429         ))),
430     }?;
431 
432     Ok(if is_scalar {
433         ColumnarValue::Scalar(ScalarValue::try_from_array(
434             &(Arc::new(arr) as ArrayRef),
435             0,
436         )?)
437     } else {
438         ColumnarValue::Array(Arc::new(arr))
439     })
440 }
441 
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array, StringBuilder};

    use super::*;

    /// Ensures that the arrow array implementation of `to_timestamp` is
    /// wired up and handles nulls correctly.
    #[test]
    fn to_timestamp_arrays_and_nulls() -> Result<()> {
        let mut strings = StringBuilder::new(2);
        let mut timestamps = TimestampNanosecondArray::builder(2);

        // Row 0: a valid timestamp string and its expected value.
        strings.append_value("2020-09-08T13:42:29.190855Z")?;
        timestamps.append_value(1599572549190855000)?;

        // Row 1: a null input must produce a null output.
        strings.append_null()?;
        timestamps.append_null()?;

        let expected = timestamps.finish();
        let expected: &dyn Array = &expected;

        let input = ColumnarValue::Array(Arc::new(strings.finish()) as ArrayRef);
        let parsed = to_timestamp(&[input])
            .expect("that to_timestamp parsed values without error");

        match parsed {
            ColumnarValue::Array(parsed_array) => {
                assert_eq!(parsed_array.len(), 2);
                assert_eq!(expected, parsed_array.as_ref());
            }
            _ => panic!("Expected a columnar array"),
        }
        Ok(())
    }

    /// Checks `date_trunc_single` for every supported granularity,
    /// including weeks that span a year boundary.
    #[test]
    fn date_trunc_test() {
        // (input, granularity, expected truncation)
        let cases = [
            (
                "2020-09-08T13:42:29.190855Z",
                "second",
                "2020-09-08T13:42:29.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "minute",
                "2020-09-08T13:42:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "hour",
                "2020-09-08T13:00:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "day",
                "2020-09-08T00:00:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "week",
                "2020-09-07T00:00:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "month",
                "2020-09-01T00:00:00.000000Z",
            ),
            (
                "2020-09-08T13:42:29.190855Z",
                "year",
                "2020-01-01T00:00:00.000000Z",
            ),
            (
                "2021-01-01T13:42:29.190855Z",
                "week",
                "2020-12-28T00:00:00.000000Z",
            ),
            (
                "2020-01-01T13:42:29.190855Z",
                "week",
                "2019-12-30T00:00:00.000000Z",
            ),
        ];

        for (original, granularity, expected) in cases.iter() {
            let input_nanos = string_to_timestamp_nanos(original).unwrap();
            let expected_nanos = string_to_timestamp_nanos(expected).unwrap();
            let truncated = date_trunc_single(granularity, input_nanos).unwrap();
            assert_eq!(truncated, expected_nanos);
        }
    }

    /// Passes the wrong type of input array to `to_timestamp` and checks
    /// that an error is returned.
    #[test]
    fn to_timestamp_invalid_input_type() -> Result<()> {
        let mut builder = Int64Array::builder(1);
        builder.append_value(1)?;
        let int64array = ColumnarValue::Array(Arc::new(builder.finish()));

        let expected_err =
            "Internal error: Unsupported data type Int64 for function to_timestamp";

        let actual_err = match to_timestamp(&[int64array]) {
            Ok(_) => panic!("Expected error but got success"),
            Err(e) => e,
        };
        assert!(
            actual_err.to_string().contains(expected_err),
            "Can not find expected error '{}'. Actual error '{}'",
            expected_err,
            actual_err
        );
        Ok(())
    }
}
560