1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 //! Crypto expressions
19 
20 use std::sync::Arc;
21 
22 use md5::Md5;
23 use sha2::{
24     digest::Output as SHA2DigestOutput, Digest as SHA2Digest, Sha224, Sha256, Sha384,
25     Sha512,
26 };
27 
28 use crate::{
29     error::{DataFusionError, Result},
30     scalar::ScalarValue,
31 };
32 use arrow::{
33     array::{Array, BinaryArray, GenericStringArray, StringOffsetSizeTrait},
34     datatypes::DataType,
35 };
36 
37 use super::{string_expressions::unary_string_function, ColumnarValue};
38 
39 /// Computes the md5 of a string.
md5_process(input: &str) -> String40 fn md5_process(input: &str) -> String {
41     let mut digest = Md5::default();
42     digest.update(&input);
43 
44     let mut result = String::new();
45 
46     for byte in &digest.finalize() {
47         result.push_str(&format!("{:02x}", byte));
48     }
49 
50     result
51 }
52 
53 // It's not possible to return &[u8], because trait in trait without short lifetime
sha_process<D: SHA2Digest + Default>(input: &str) -> SHA2DigestOutput<D>54 fn sha_process<D: SHA2Digest + Default>(input: &str) -> SHA2DigestOutput<D> {
55     let mut digest = D::default();
56     digest.update(&input);
57 
58     digest.finalize()
59 }
60 
61 /// # Errors
62 /// This function errors when:
63 /// * the number of arguments is not 1
64 /// * the first argument is not castable to a `GenericStringArray`
unary_binary_function<T, R, F>( args: &[&dyn Array], op: F, name: &str, ) -> Result<BinaryArray> where R: AsRef<[u8]>, T: StringOffsetSizeTrait, F: Fn(&str) -> R,65 fn unary_binary_function<T, R, F>(
66     args: &[&dyn Array],
67     op: F,
68     name: &str,
69 ) -> Result<BinaryArray>
70 where
71     R: AsRef<[u8]>,
72     T: StringOffsetSizeTrait,
73     F: Fn(&str) -> R,
74 {
75     if args.len() != 1 {
76         return Err(DataFusionError::Internal(format!(
77             "{:?} args were supplied but {} takes exactly one argument",
78             args.len(),
79             name,
80         )));
81     }
82 
83     let array = args[0]
84         .as_any()
85         .downcast_ref::<GenericStringArray<T>>()
86         .ok_or_else(|| {
87             DataFusionError::Internal("failed to downcast to string".to_string())
88         })?;
89 
90     // first map is the iterator, second is for the `Option<_>`
91     Ok(array.iter().map(|x| x.map(|x| op(x))).collect())
92 }
93 
handle<F, R>(args: &[ColumnarValue], op: F, name: &str) -> Result<ColumnarValue> where R: AsRef<[u8]>, F: Fn(&str) -> R,94 fn handle<F, R>(args: &[ColumnarValue], op: F, name: &str) -> Result<ColumnarValue>
95 where
96     R: AsRef<[u8]>,
97     F: Fn(&str) -> R,
98 {
99     match &args[0] {
100         ColumnarValue::Array(a) => match a.data_type() {
101             DataType::Utf8 => {
102                 Ok(ColumnarValue::Array(Arc::new(unary_binary_function::<
103                     i32,
104                     _,
105                     _,
106                 >(
107                     &[a.as_ref()], op, name
108                 )?)))
109             }
110             DataType::LargeUtf8 => {
111                 Ok(ColumnarValue::Array(Arc::new(unary_binary_function::<
112                     i64,
113                     _,
114                     _,
115                 >(
116                     &[a.as_ref()], op, name
117                 )?)))
118             }
119             other => Err(DataFusionError::Internal(format!(
120                 "Unsupported data type {:?} for function {}",
121                 other, name,
122             ))),
123         },
124         ColumnarValue::Scalar(scalar) => match scalar {
125             ScalarValue::Utf8(a) => {
126                 let result = a.as_ref().map(|x| (op)(x).as_ref().to_vec());
127                 Ok(ColumnarValue::Scalar(ScalarValue::Binary(result)))
128             }
129             ScalarValue::LargeUtf8(a) => {
130                 let result = a.as_ref().map(|x| (op)(x).as_ref().to_vec());
131                 Ok(ColumnarValue::Scalar(ScalarValue::Binary(result)))
132             }
133             other => Err(DataFusionError::Internal(format!(
134                 "Unsupported data type {:?} for function {}",
135                 other, name,
136             ))),
137         },
138     }
139 }
140 
md5_array<T: StringOffsetSizeTrait>( args: &[&dyn Array], ) -> Result<GenericStringArray<i32>>141 fn md5_array<T: StringOffsetSizeTrait>(
142     args: &[&dyn Array],
143 ) -> Result<GenericStringArray<i32>> {
144     unary_string_function::<T, i32, _, _>(args, md5_process, "md5")
145 }
146 
147 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
md5(args: &[ColumnarValue]) -> Result<ColumnarValue>148 pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
149     match &args[0] {
150         ColumnarValue::Array(a) => match a.data_type() {
151             DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(md5_array::<i32>(&[
152                 a.as_ref()
153             ])?))),
154             DataType::LargeUtf8 => {
155                 Ok(ColumnarValue::Array(Arc::new(md5_array::<i64>(&[
156                     a.as_ref()
157                 ])?)))
158             }
159             other => Err(DataFusionError::Internal(format!(
160                 "Unsupported data type {:?} for function md5",
161                 other,
162             ))),
163         },
164         ColumnarValue::Scalar(scalar) => match scalar {
165             ScalarValue::Utf8(a) => {
166                 let result = a.as_ref().map(|x| md5_process(x));
167                 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(result)))
168             }
169             ScalarValue::LargeUtf8(a) => {
170                 let result = a.as_ref().map(|x| md5_process(x));
171                 Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(result)))
172             }
173             other => Err(DataFusionError::Internal(format!(
174                 "Unsupported data type {:?} for function md5",
175                 other,
176             ))),
177         },
178     }
179 }
180 
181 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
sha224(args: &[ColumnarValue]) -> Result<ColumnarValue>182 pub fn sha224(args: &[ColumnarValue]) -> Result<ColumnarValue> {
183     handle(args, sha_process::<Sha224>, "ssh224")
184 }
185 
186 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
sha256(args: &[ColumnarValue]) -> Result<ColumnarValue>187 pub fn sha256(args: &[ColumnarValue]) -> Result<ColumnarValue> {
188     handle(args, sha_process::<Sha256>, "sha256")
189 }
190 
191 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
sha384(args: &[ColumnarValue]) -> Result<ColumnarValue>192 pub fn sha384(args: &[ColumnarValue]) -> Result<ColumnarValue> {
193     handle(args, sha_process::<Sha384>, "sha384")
194 }
195 
196 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
sha512(args: &[ColumnarValue]) -> Result<ColumnarValue>197 pub fn sha512(args: &[ColumnarValue]) -> Result<ColumnarValue> {
198     handle(args, sha_process::<Sha512>, "sha512")
199 }
200