1 use super::{LogEvent, Value};
2 use std::{
3 collections::BTreeMap,
4 hash::{Hash, Hasher},
5 };
6 use string_cache::DefaultAtom as Atom;
7
// TODO: if we had `Value` implement `Eq` and `Hash`, the implementation here
// would be much easier. The issue is with the `f64` type. We should consider
// using a newtype for `f64` there that would implement `Eq` and `Hash` if it's
// safe, for example `NormalF64`, and guard the values with the
// `val.is_normal() == true` invariant.
// See also: https://internals.rust-lang.org/t/f32-f64-should-implement-hash/5436/32
14
15 /// An event discriminant identifies a distinguishable subset of events.
16 /// Intended for disecting streams of events to substreams, for instance to
17 /// be able to allocate a buffer per substream.
18 /// Implements `PartialEq`, `Eq` and `Hash` to enable use as a `HashMap` key.
19 #[derive(Debug, Clone)]
20 pub struct Discriminant {
21 values: Vec<Option<Value>>,
22 }
23
24 impl Discriminant {
25 /// Create a new Discriminant from the `LogEvent` and an ordered slice of
26 /// fields to include into a discriminant value.
from_log_event(event: &LogEvent, discriminant_fields: &[Atom]) -> Self27 pub fn from_log_event(event: &LogEvent, discriminant_fields: &[Atom]) -> Self {
28 let values: Vec<Option<Value>> = discriminant_fields
29 .iter()
30 .map(|discriminant_field| event.get(discriminant_field).cloned())
31 .collect();
32 Self { values }
33 }
34 }
35
36 impl PartialEq for Discriminant {
eq(&self, other: &Self) -> bool37 fn eq(&self, other: &Self) -> bool {
38 self.values
39 .iter()
40 .zip(other.values.iter())
41 .all(|(this, other)| match (this, other) {
42 (None, None) => true,
43 (Some(this), Some(other)) => value_eq(this, other),
44 _ => false,
45 })
46 }
47 }
48
// Marking `Discriminant` as `Eq` relies on `eq` being reflexive. For float
// payloads, where plain `==` is not reflexive, `value_eq` defers to `f64_eq`,
// which treats NaN as equal to NaN.
impl Eq for Discriminant {}
50
51 // Equality check for for discriminant purposes.
value_eq(this: &Value, other: &Value) -> bool52 fn value_eq(this: &Value, other: &Value) -> bool {
53 match (this, other) {
54 // Trivial.
55 (Value::Bytes(this), Value::Bytes(other)) => this.eq(other),
56 (Value::Boolean(this), Value::Boolean(other)) => this.eq(other),
57 (Value::Integer(this), Value::Integer(other)) => this.eq(other),
58 (Value::Timestamp(this), Value::Timestamp(other)) => this.eq(other),
59 (Value::Null, Value::Null) => true,
60 // Non-trivial.
61 (Value::Float(this), Value::Float(other)) => f64_eq(this, other),
62 (Value::Array(this), Value::Array(other)) => array_eq(this, other),
63 (Value::Map(this), Value::Map(other)) => map_eq(this, other),
64 // Type mismatch.
65 _ => false,
66 }
67 }
68
// Compares two `f64`s for discriminant purposes.
//
// Unlike `==`, any NaN equals any other NaN, and `0.0` and `-0.0` are
// distinct. Apart from NaNs, two floats that are numerically equal and have
// the same sign always share one bit pattern, so comparing bits covers every
// other case.
fn f64_eq(this: &f64, other: &f64) -> bool {
    let both_nan = this.is_nan() && other.is_nan();
    both_nan || this.to_bits() == other.to_bits()
}
84
array_eq(this: &Vec<Value>, other: &Vec<Value>) -> bool85 fn array_eq(this: &Vec<Value>, other: &Vec<Value>) -> bool {
86 if this.len() != other.len() {
87 return false;
88 }
89
90 this.iter()
91 .zip(other.iter())
92 .all(|(first, second)| value_eq(first, second))
93 }
94
map_eq(this: &BTreeMap<String, Value>, other: &BTreeMap<String, Value>) -> bool95 fn map_eq(this: &BTreeMap<String, Value>, other: &BTreeMap<String, Value>) -> bool {
96 if this.len() != other.len() {
97 return false;
98 }
99
100 this.iter()
101 .zip(other.iter())
102 .all(|((key1, value1), (key2, value2))| key1 == key2 && value_eq(value1, value2))
103 }
104
105 impl Hash for Discriminant {
hash<H: Hasher>(&self, state: &mut H)106 fn hash<H: Hasher>(&self, state: &mut H) {
107 for value in &self.values {
108 match value {
109 Some(value) => {
110 state.write_u8(1);
111 hash_value(state, value);
112 }
113 None => state.write_u8(0),
114 }
115 }
116 }
117 }
118
119 // Hashes value for discriminant purposes.
hash_value<H: Hasher>(hasher: &mut H, value: &Value)120 fn hash_value<H: Hasher>(hasher: &mut H, value: &Value) {
121 match value {
122 // Trivial.
123 Value::Bytes(val) => val.hash(hasher),
124 Value::Boolean(val) => val.hash(hasher),
125 Value::Integer(val) => val.hash(hasher),
126 Value::Timestamp(val) => val.hash(hasher),
127 // Non-trivial.
128 Value::Float(val) => hash_f64(hasher, val),
129 Value::Array(val) => hash_array(hasher, val),
130 Value::Map(val) => hash_map(hasher, val),
131 Value::Null => hash_null(hasher),
132 }
133 }
134
// Does f64 hashing that is suitable for discriminant purposes.
//
// `f64_eq` treats every NaN as equal to every other NaN, but NaNs can differ
// in their sign and payload bits. Hashing the raw bytes would give those
// "equal" values different hashes, which breaks the `Hash`/`Eq` contract that
// `HashMap` relies on. So all NaNs are collapsed to one canonical bit pattern
// first. `0.0` and `-0.0` keep distinct bytes, matching `f64_eq`, which
// treats them as unequal.
fn hash_f64<H: Hasher>(hasher: &mut H, value: &f64) {
    let canonical = if value.is_nan() { f64::NAN } else { *value };
    hasher.write(&canonical.to_ne_bytes());
}
139
hash_array<H: Hasher>(hasher: &mut H, array: &Vec<Value>)140 fn hash_array<H: Hasher>(hasher: &mut H, array: &Vec<Value>) {
141 for val in array.iter() {
142 hash_value(hasher, val);
143 }
144 }
145
hash_map<H: Hasher>(hasher: &mut H, map: &BTreeMap<String, Value>)146 fn hash_map<H: Hasher>(hasher: &mut H, map: &BTreeMap<String, Value>) {
147 for (key, val) in map.iter() {
148 hasher.write(key.as_bytes());
149 hash_value(hasher, val);
150 }
151 }
152
// Hashes `Value::Null` as a single zero byte.
fn hash_null<H: Hasher>(hasher: &mut H) {
    0u8.hash(hasher);
}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::LogEvent;
    use std::collections::{hash_map::DefaultHasher, HashMap};

    /// Hashes `hash` with a freshly created `DefaultHasher` and returns the
    /// digest, so tests can compare the hashes of two discriminants.
    fn hash<H: Hash>(hash: H) -> u64 {
        let mut hasher = DefaultHasher::new();
        hash.hash(&mut hasher);
        hasher.finish()
    }

    /// Fields outside the discriminant do not matter, and a discriminant
    /// field missing from both events (`container_id`) still compares equal.
    #[test]
    fn equal() {
        let mut event_1 = LogEvent::default();
        event_1.insert("hostname", "localhost");
        event_1.insert("irrelevant", "not even used");
        let mut event_2 = event_1.clone();
        event_2.insert("irrelevant", "does not matter if it's different");

        let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_eq!(discriminant_1, discriminant_2);
        assert_eq!(hash(discriminant_1), hash(discriminant_2));
    }

    /// A different value in one of the discriminant fields makes both the
    /// discriminants and their hashes differ.
    #[test]
    fn not_equal() {
        let mut event_1 = LogEvent::default();
        event_1.insert("hostname", "localhost");
        event_1.insert("container_id", "abc");
        let mut event_2 = event_1.clone();
        event_2.insert("container_id", "def");

        let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_ne!(discriminant_1, discriminant_2);
        assert_ne!(hash(discriminant_1), hash(discriminant_2));
    }

    /// The order in which fields were inserted into the event does not
    /// affect the discriminant.
    #[test]
    fn field_order() {
        let mut event_1 = LogEvent::default();
        event_1.insert("a", "a");
        event_1.insert("b", "b");
        let mut event_2 = LogEvent::default();
        event_2.insert("b", "b");
        event_2.insert("a", "a");

        let discriminant_fields = vec![Atom::from("a"), Atom::from("b")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_eq!(discriminant_1, discriminant_2);
        assert_eq!(hash(discriminant_1), hash(discriminant_2));
    }

    /// The key insertion order inside a nested map value does not affect the
    /// discriminant.
    #[test]
    fn map_values_key_order() {
        let mut event_1 = LogEvent::default();
        event_1.insert("nested.a", "a");
        event_1.insert("nested.b", "b");
        let mut event_2 = LogEvent::default();
        event_2.insert("nested.b", "b");
        event_2.insert("nested.a", "a");

        let discriminant_fields = vec![Atom::from("nested")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_eq!(discriminant_1, discriminant_2);
        assert_eq!(hash(discriminant_1), hash(discriminant_2));
    }

    /// Setting array elements in a different order yields an equal
    /// discriminant.
    #[test]
    fn array_values_insertion_order() {
        let mut event_1 = LogEvent::default();
        event_1.insert("array[0]", "a");
        event_1.insert("array[1]", "b");
        let mut event_2 = LogEvent::default();
        event_2.insert("array[1]", "b");
        event_2.insert("array[0]", "a");

        let discriminant_fields = vec![Atom::from("array")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_eq!(discriminant_1, discriminant_2);
        assert_eq!(hash(discriminant_1), hash(discriminant_2));
    }

    /// A map-valued discriminant field differs from that field being absent.
    #[test]
    fn map_values_matter_1() {
        let mut event_1 = LogEvent::default();
        event_1.insert("nested.a", "a"); // `nested` is a `Value::Map`
        let event_2 = LogEvent::default(); // empty event

        let discriminant_fields = vec![Atom::from("nested")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_ne!(discriminant_1, discriminant_2);
        assert_ne!(hash(discriminant_1), hash(discriminant_2));
    }

    /// A map-valued discriminant field differs from a plain string value
    /// stored under the same field name.
    #[test]
    fn map_values_matter_2() {
        let mut event_1 = LogEvent::default();
        event_1.insert("nested.a", "a"); // `nested` is a `Value::Map`
        let mut event_2 = LogEvent::default();
        event_2.insert("nested", "x"); // `nested` holds a plain string, not a map

        let discriminant_fields = vec![Atom::from("nested")];

        let discriminant_1 = Discriminant::from_log_event(&event_1, &discriminant_fields);
        let discriminant_2 = Discriminant::from_log_event(&event_2, &discriminant_fields);

        assert_ne!(discriminant_1, discriminant_2);
        assert_ne!(hash(discriminant_1), hash(discriminant_2));
    }

    /// End-to-end use of `Discriminant` as a `HashMap` key. Events from three
    /// streams, two keyed by `hostname`/`container_id` and one with neither
    /// field, are counted per discriminant. Fields outside the discriminant
    /// must not split a stream.
    #[test]
    fn with_hash_map() {
        let mut map: HashMap<Discriminant, usize> = HashMap::new();

        let event_stream_1 = {
            let mut event = LogEvent::default();
            event.insert("hostname", "a.test");
            event.insert("container_id", "abc");
            event
        };

        let event_stream_2 = {
            let mut event = LogEvent::default();
            event.insert("hostname", "b.test");
            event.insert("container_id", "def");
            event
        };

        let event_stream_3 = {
            // no `hostname` or `container_id`
            LogEvent::default()
        };

        let discriminant_fields = vec![Atom::from("hostname"), Atom::from("container_id")];

        // Returns how many events with the same discriminant were processed
        // before this one: the first occurrence inserts 0, and each later
        // occurrence increments the stored count and returns it.
        let mut process_event = |event| {
            let discriminant = Discriminant::from_log_event(&event, &discriminant_fields);
            *map.entry(discriminant).and_modify(|e| *e += 1).or_insert(0)
        };

        {
            let mut event = event_stream_1.clone();
            event.insert("message", "a");
            assert_eq!(process_event(event), 0);
        }

        {
            let mut event = event_stream_1.clone();
            event.insert("message", "b");
            event.insert("irrelevant", "c");
            assert_eq!(process_event(event), 1);
        }

        {
            let mut event = event_stream_2.clone();
            event.insert("message", "d");
            assert_eq!(process_event(event), 0);
        }

        {
            let mut event = event_stream_2.clone();
            event.insert("message", "e");
            event.insert("irrelevant", "d");
            assert_eq!(process_event(event), 1);
        }

        {
            let mut event = event_stream_3.clone();
            event.insert("message", "f");
            assert_eq!(process_event(event), 0);
        }

        {
            let mut event = event_stream_3.clone();
            event.insert("message", "g");
            event.insert("irrelevant", "d");
            assert_eq!(process_event(event), 1);
        }

        // Now assert the number of events processed per discriminant: each
        // stream has already been seen twice, so one more event from each
        // reports 2.
        assert_eq!(process_event(event_stream_1), 2);
        assert_eq!(process_event(event_stream_2), 2);
        assert_eq!(process_event(event_stream_3), 2);
    }
}
363