1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 //! Crypto expressions
19
20 use std::sync::Arc;
21
22 use md5::Md5;
23 use sha2::{
24 digest::Output as SHA2DigestOutput, Digest as SHA2Digest, Sha224, Sha256, Sha384,
25 Sha512,
26 };
27
28 use crate::{
29 error::{DataFusionError, Result},
30 scalar::ScalarValue,
31 };
32 use arrow::{
33 array::{Array, BinaryArray, GenericStringArray, StringOffsetSizeTrait},
34 datatypes::DataType,
35 };
36
37 use super::{string_expressions::unary_string_function, ColumnarValue};
38
39 /// Computes the md5 of a string.
md5_process(input: &str) -> String40 fn md5_process(input: &str) -> String {
41 let mut digest = Md5::default();
42 digest.update(&input);
43
44 let mut result = String::new();
45
46 for byte in &digest.finalize() {
47 result.push_str(&format!("{:02x}", byte));
48 }
49
50 result
51 }
52
53 // It's not possible to return &[u8], because trait in trait without short lifetime
sha_process<D: SHA2Digest + Default>(input: &str) -> SHA2DigestOutput<D>54 fn sha_process<D: SHA2Digest + Default>(input: &str) -> SHA2DigestOutput<D> {
55 let mut digest = D::default();
56 digest.update(&input);
57
58 digest.finalize()
59 }
60
61 /// # Errors
62 /// This function errors when:
63 /// * the number of arguments is not 1
64 /// * the first argument is not castable to a `GenericStringArray`
unary_binary_function<T, R, F>( args: &[&dyn Array], op: F, name: &str, ) -> Result<BinaryArray> where R: AsRef<[u8]>, T: StringOffsetSizeTrait, F: Fn(&str) -> R,65 fn unary_binary_function<T, R, F>(
66 args: &[&dyn Array],
67 op: F,
68 name: &str,
69 ) -> Result<BinaryArray>
70 where
71 R: AsRef<[u8]>,
72 T: StringOffsetSizeTrait,
73 F: Fn(&str) -> R,
74 {
75 if args.len() != 1 {
76 return Err(DataFusionError::Internal(format!(
77 "{:?} args were supplied but {} takes exactly one argument",
78 args.len(),
79 name,
80 )));
81 }
82
83 let array = args[0]
84 .as_any()
85 .downcast_ref::<GenericStringArray<T>>()
86 .ok_or_else(|| {
87 DataFusionError::Internal("failed to downcast to string".to_string())
88 })?;
89
90 // first map is the iterator, second is for the `Option<_>`
91 Ok(array.iter().map(|x| x.map(|x| op(x))).collect())
92 }
93
handle<F, R>(args: &[ColumnarValue], op: F, name: &str) -> Result<ColumnarValue> where R: AsRef<[u8]>, F: Fn(&str) -> R,94 fn handle<F, R>(args: &[ColumnarValue], op: F, name: &str) -> Result<ColumnarValue>
95 where
96 R: AsRef<[u8]>,
97 F: Fn(&str) -> R,
98 {
99 match &args[0] {
100 ColumnarValue::Array(a) => match a.data_type() {
101 DataType::Utf8 => {
102 Ok(ColumnarValue::Array(Arc::new(unary_binary_function::<
103 i32,
104 _,
105 _,
106 >(
107 &[a.as_ref()], op, name
108 )?)))
109 }
110 DataType::LargeUtf8 => {
111 Ok(ColumnarValue::Array(Arc::new(unary_binary_function::<
112 i64,
113 _,
114 _,
115 >(
116 &[a.as_ref()], op, name
117 )?)))
118 }
119 other => Err(DataFusionError::Internal(format!(
120 "Unsupported data type {:?} for function {}",
121 other, name,
122 ))),
123 },
124 ColumnarValue::Scalar(scalar) => match scalar {
125 ScalarValue::Utf8(a) => {
126 let result = a.as_ref().map(|x| (op)(x).as_ref().to_vec());
127 Ok(ColumnarValue::Scalar(ScalarValue::Binary(result)))
128 }
129 ScalarValue::LargeUtf8(a) => {
130 let result = a.as_ref().map(|x| (op)(x).as_ref().to_vec());
131 Ok(ColumnarValue::Scalar(ScalarValue::Binary(result)))
132 }
133 other => Err(DataFusionError::Internal(format!(
134 "Unsupported data type {:?} for function {}",
135 other, name,
136 ))),
137 },
138 }
139 }
140
md5_array<T: StringOffsetSizeTrait>( args: &[&dyn Array], ) -> Result<GenericStringArray<i32>>141 fn md5_array<T: StringOffsetSizeTrait>(
142 args: &[&dyn Array],
143 ) -> Result<GenericStringArray<i32>> {
144 unary_string_function::<T, i32, _, _>(args, md5_process, "md5")
145 }
146
147 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
md5(args: &[ColumnarValue]) -> Result<ColumnarValue>148 pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
149 match &args[0] {
150 ColumnarValue::Array(a) => match a.data_type() {
151 DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(md5_array::<i32>(&[
152 a.as_ref()
153 ])?))),
154 DataType::LargeUtf8 => {
155 Ok(ColumnarValue::Array(Arc::new(md5_array::<i64>(&[
156 a.as_ref()
157 ])?)))
158 }
159 other => Err(DataFusionError::Internal(format!(
160 "Unsupported data type {:?} for function md5",
161 other,
162 ))),
163 },
164 ColumnarValue::Scalar(scalar) => match scalar {
165 ScalarValue::Utf8(a) => {
166 let result = a.as_ref().map(|x| md5_process(x));
167 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(result)))
168 }
169 ScalarValue::LargeUtf8(a) => {
170 let result = a.as_ref().map(|x| md5_process(x));
171 Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(result)))
172 }
173 other => Err(DataFusionError::Internal(format!(
174 "Unsupported data type {:?} for function md5",
175 other,
176 ))),
177 },
178 }
179 }
180
181 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
sha224(args: &[ColumnarValue]) -> Result<ColumnarValue>182 pub fn sha224(args: &[ColumnarValue]) -> Result<ColumnarValue> {
183 handle(args, sha_process::<Sha224>, "ssh224")
184 }
185
186 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
sha256(args: &[ColumnarValue]) -> Result<ColumnarValue>187 pub fn sha256(args: &[ColumnarValue]) -> Result<ColumnarValue> {
188 handle(args, sha_process::<Sha256>, "sha256")
189 }
190
191 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
sha384(args: &[ColumnarValue]) -> Result<ColumnarValue>192 pub fn sha384(args: &[ColumnarValue]) -> Result<ColumnarValue> {
193 handle(args, sha_process::<Sha384>, "sha384")
194 }
195
196 /// crypto function that accepts Utf8 or LargeUtf8 and returns a [`ColumnarValue`]
sha512(args: &[ColumnarValue]) -> Result<ColumnarValue>197 pub fn sha512(args: &[ColumnarValue]) -> Result<ColumnarValue> {
198 handle(args, sha_process::<Sha512>, "sha512")
199 }
200