1 use crate::{
2 event::{self, Event, LogEvent, Value},
3 shutdown::ShutdownSignal,
4 tls::{MaybeTlsSettings, TlsConfig},
5 topology::config::{DataType, GlobalOptions, SourceConfig},
6 };
7 use bytes05::{buf::BufExt, Bytes};
8 use chrono::{DateTime, TimeZone, Utc};
9 use flate2::read::GzDecoder;
10 use futures::{
11 compat::{AsyncRead01CompatExt, Future01CompatExt, Stream01CompatExt},
12 FutureExt, TryFutureExt, TryStreamExt,
13 };
14 use futures01::{sync::mpsc, Async, Future, Sink, Stream};
15 use http::StatusCode;
16 use lazy_static::lazy_static;
17 use serde::{de, Deserialize, Serialize};
18 use serde_json::{de::IoRead, json, Deserializer, Value as JsonValue};
19 use snafu::Snafu;
20 use std::{
21 io::Read,
22 net::{Ipv4Addr, SocketAddr},
23 };
24 use string_cache::DefaultAtom as Atom;
25 use tokio_util::compat::FuturesAsyncReadCompatExt;
26 use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply};
27
28 // Event fields unique to splunk_hec source
lazy_static! {
    // Channel GUID, from the `x-splunk-request-channel` header or the event's
    // `channel` field.
    pub static ref CHANNEL: Atom = Atom::from("splunk_channel");
    // Splunk index name, extracted from the event's `index` field.
    pub static ref INDEX: Atom = Atom::from("splunk_index");
    // Splunk source, extracted from the event's `source` field.
    pub static ref SOURCE: Atom = Atom::from("splunk_source");
    // Splunk sourcetype, extracted from the event's `sourcetype` field.
    pub static ref SOURCETYPE: Atom = Atom::from("splunk_sourcetype");
}
35
/// Accepts HTTP requests.
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    /// Local address on which to listen
    #[serde(default = "default_socket_address")]
    address: SocketAddr,
    /// Splunk HEC token; when `None`, requests are accepted without
    /// authorization (see `SplunkSource::authorization`).
    token: Option<String>,
    /// Optional TLS settings for the listener; `None` means plain TCP.
    tls: Option<TlsConfig>,
}
47
48 impl SplunkConfig {
49 #[cfg(test)]
on(address: SocketAddr) -> Self50 pub fn on(address: SocketAddr) -> Self {
51 SplunkConfig {
52 address,
53 ..Self::default()
54 }
55 }
56 }
57
58 impl Default for SplunkConfig {
default() -> Self59 fn default() -> Self {
60 SplunkConfig {
61 address: default_socket_address(),
62 token: None,
63 tls: None,
64 }
65 }
66 }
67
/// Default listen address: all IPv4 interfaces on the standard Splunk HEC
/// port 8088.
fn default_socket_address() -> SocketAddr {
    SocketAddr::from(([0, 0, 0, 0], 8088))
}
71
#[typetag::serde(name = "splunk_hec")]
impl SourceConfig for SplunkConfig {
    /// Build the HEC source: a warp server exposing the Splunk collector
    /// endpoints under `/services/collector/...`, forwarding parsed events
    /// into `out`.
    fn build(
        &self,
        _: &str,
        _: &GlobalOptions,
        shutdown: ShutdownSignal,
        out: mpsc::Sender<Event>,
    ) -> crate::Result<super::Source> {
        let source = SplunkSource::new(self);

        let event_service = source.event_service(out.clone());
        let raw_service = source.raw_service(out.clone());
        let health_service = source.health_service(out);
        let options = SplunkSource::options();

        // All services share the `/services/collector` prefix; `unify()`
        // collapses the `or` branches back to a single response type, and
        // rejections carrying an `ApiError` are turned into HTTP responses
        // by `finish_err`.
        let services = path!("services" / "collector" / ..)
            .and(
                event_service
                    .or(raw_service)
                    .unify()
                    .or(health_service)
                    .unify()
                    .or(options)
                    .unify(),
            )
            .or_else(finish_err);

        // Bind (optionally TLS-wrapped) listener before returning, so config
        // errors surface at build time rather than inside the spawned task.
        let tls = MaybeTlsSettings::from_config(&self.tls, true)?;
        let incoming = tls.bind(&self.address)?.incoming();

        let fut = async move {
            // The stream/io compat shims bridge the futures 0.1 listener to
            // the tokio 0.2 types warp expects; the shutdown future completes
            // when the topology signals a graceful stop.
            let _ = warp::serve(services)
                .serve_incoming_with_graceful_shutdown(
                    incoming.compat().map_ok(|s| s.compat().compat()),
                    shutdown.clone().compat().map(|_| ()),
                )
                .await;
            // We need to drop the last copy of ShutdownSignalToken only after server has shut down.
            drop(shutdown);
            Ok(())
        };
        // Convert the 0.3 future back to the 0.1 future the topology runs.
        Ok(Box::new(fut.boxed().compat()))
    }

    fn output_type(&self) -> DataType {
        DataType::Log
    }

    fn source_type(&self) -> &'static str {
        "splunk_hec"
    }
}
125
/// Shared data for responding to requests.
struct SplunkSource {
    /// Expected `Authorization` header value ("Splunk {token}"), pre-rendered
    /// as bytes; `None` disables authorization checks.
    credentials: Option<Bytes>,
}
130
impl SplunkSource {
    /// Build the shared source state, pre-formatting the expected
    /// `Authorization` header value from the configured token.
    fn new(config: &SplunkConfig) -> Self {
        SplunkSource {
            credentials: config
                .token
                .as_ref()
                .map(|token| format!("Splunk {}", token).into()),
        }
    }

    /// POST `/event` (or `/event/1.0`): parse a stream of JSON HEC events
    /// from the request body and forward them into `out`.
    fn event_service(&self, out: mpsc::Sender<Event>) -> BoxedFilter<(Response,)> {
        warp::post()
            .and(path!("event").or(path!("event" / "1.0")))
            .and(self.authorization())
            .and(warp::header::optional::<String>("x-splunk-request-channel"))
            .and(warp::header::optional::<String>("host"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and_then(
                move |_,
                      _,
                      channel: Option<String>,
                      host: Option<String>,
                      gzip: bool,
                      body: Bytes| {
                    let out = out.clone();
                    async move {
                        // Construct event parser over the (possibly gzipped)
                        // body; a dropped `out` maps to ServerShutdown.
                        if gzip {
                            EventStream::new(GzDecoder::new(body.reader()), channel, host)
                                .forward(out.clone().sink_map_err(|_| ApiError::ServerShutdown))
                                .map(|_| ())
                                .compat()
                                .await
                        } else {
                            EventStream::new(body.reader(), channel, host)
                                .forward(out.clone().sink_map_err(|_| ApiError::ServerShutdown))
                                .map(|_| ())
                                .compat()
                                .await
                        }
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }

    /// POST `/raw` (or `/raw/1.0`): treat the whole body as a single event.
    /// Unlike `/event`, the channel header is mandatory here.
    fn raw_service(&self, out: mpsc::Sender<Event>) -> BoxedFilter<(Response,)> {
        warp::post()
            .and(path!("raw" / "1.0").or(path!("raw")))
            .and(self.authorization())
            .and(
                warp::header::optional::<String>("x-splunk-request-channel").and_then(
                    |channel: Option<String>| async {
                        if let Some(channel) = channel {
                            Ok(channel)
                        } else {
                            Err(Rejection::from(ApiError::MissingChannel))
                        }
                    },
                ),
            )
            .and(warp::header::optional::<String>("host"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and_then(
                move |_, _, channel: String, host: Option<String>, gzip: bool, body: Bytes| {
                    let out = out.clone();
                    async move {
                        // Construct event parser: a one-element stream so the
                        // same forward-into-sink machinery can be reused.
                        futures01::stream::once(raw_event(body, gzip, channel, host))
                            .forward(out.clone().sink_map_err(|_| ApiError::ServerShutdown))
                            .map(|_| ())
                            .compat()
                            .await
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }

    /// GET `/health` (or `/health/1.0`): report readiness of the output
    /// channel. Auth failures here answer 400 (BadRequest), not 401.
    fn health_service(&self, out: mpsc::Sender<Event>) -> BoxedFilter<(Response,)> {
        let credentials = self.credentials.clone();
        let authorize =
            warp::header::optional("Authorization").and_then(move |token: Option<String>| {
                let credentials = credentials.clone();
                async move {
                    match (token, credentials) {
                        (_, None) => Ok(()),
                        (Some(token), Some(password)) if token.as_bytes() == password.as_ref() => {
                            Ok(())
                        }
                        _ => Err(Rejection::from(ApiError::BadRequest)),
                    }
                }
            });

        warp::get()
            .and(path!("health" / "1.0").or(path!("health")))
            .and(authorize)
            .and_then(move |_, _| {
                let out = out.clone();
                async move {
                    match out.clone().poll_ready() {
                        Ok(Async::Ready(())) => Ok(warp::reply().into_response()),
                        // Since channel of mpsc::Sender increase by one with each sender, technically
                        // channel will never be full, and this will never be returned.
                        // This behavior doesn't fulfill one of purposes of healthcheck.
                        Ok(Async::NotReady) => Ok(warp::reply::with_status(
                            warp::reply(),
                            StatusCode::SERVICE_UNAVAILABLE,
                        )
                        .into_response()),
                        Err(_) => Err(Rejection::from(ApiError::ServerShutdown)),
                    }
                }
            })
            .boxed()
    }

    /// OPTIONS preflight handler advertising the allowed method per endpoint.
    fn options() -> BoxedFilter<(Response,)> {
        let post = warp::options()
            .and(
                path!("event")
                    .or(path!("event" / "1.0"))
                    .or(path!("raw" / "1.0"))
                    .or(path!("raw")),
            )
            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());

        let get = warp::options()
            .and(path!("health").or(path!("health" / "1.0")))
            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());

        post.or(get).unify().boxed()
    }

    /// Authorize request
    ///
    /// Missing vs. wrong credentials are distinguished so `finish_err` can
    /// return the matching Splunk error body (codes 2 and 3).
    fn authorization(&self) -> BoxedFilter<((),)> {
        let credentials = self.credentials.clone();
        warp::header::optional("Authorization")
            .and_then(move |token: Option<String>| {
                let credentials = credentials.clone();
                async move {
                    match (token, credentials) {
                        // No token configured: accept everything.
                        (_, None) => Ok(()),
                        (Some(token), Some(password)) if token.as_bytes() == password.as_ref() => {
                            Ok(())
                        }
                        (Some(_), Some(_)) => Err(Rejection::from(ApiError::InvalidAuthorization)),
                        (None, Some(_)) => Err(Rejection::from(ApiError::MissingAuthorization)),
                    }
                }
            })
            .boxed()
    }

    /// Is body encoded with gzip
    ///
    /// Any Content-Encoding other than gzip (or absent) is rejected.
    fn gzip(&self) -> BoxedFilter<(bool,)> {
        warp::header::optional::<String>("Content-Encoding")
            .and_then(|encoding: Option<String>| async move {
                match encoding {
                    Some(s) if s.as_bytes() == b"gzip" => Ok(true),
                    Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
                    None => Ok(false),
                }
            })
            .boxed()
    }
}
303
/// Constructs one or more events from json-s coming from reader.
/// If errors, it's done with input.
struct EventStream<R: Read> {
    /// Remaining request with JSON events
    data: R,
    /// Count of sent events
    events: usize,
    /// Optional channel from headers
    channel: Option<Value>,
    /// Default time
    time: Time,
    /// Remaining extracted default values
    extractors: [DefaultExtractor; 4],
}
318
impl<R: Read> EventStream<R> {
    /// Build a parser over `data`, with per-request defaults taken from the
    /// `x-splunk-request-channel` and `host` headers.
    fn new(data: R, channel: Option<String>, host: Option<String>) -> Self {
        EventStream {
            data,
            events: 0,
            channel: channel.map(|value| value.as_bytes().into()),
            time: Time::Now(Utc::now()),
            // Extractors remember the last seen value of these JSON fields
            // and re-apply it to subsequent events in the same request.
            extractors: [
                DefaultExtractor::new_with(
                    "host",
                    &event::log_schema().host_key(),
                    host.map(|value| value.as_bytes().into()),
                ),
                DefaultExtractor::new("index", &INDEX),
                DefaultExtractor::new("source", &SOURCE),
                DefaultExtractor::new("sourcetype", &SOURCETYPE),
            ],
        }
    }

    /// As serde_json::from_reader, but doesn't require that all data has to be consumed,
    /// nor that it has to exist.
    ///
    /// Returns `Ok(None)` at clean end of input; used to pull concatenated
    /// JSON documents off the reader one at a time.
    fn from_reader_take<T>(&mut self) -> Result<Option<T>, serde_json::Error>
    where
        T: de::DeserializeOwned,
    {
        use serde_json::de::Read;
        let mut reader = IoRead::new(&mut self.data);
        // peek() skips leading whitespace; None means the reader is exhausted.
        match reader.peek()? {
            None => Ok(None),
            Some(_) => Deserialize::deserialize(&mut Deserializer::new(reader)).map(Some),
        }
    }
}
353
impl<R: Read> Stream for EventStream<R> {
    type Item = Event;
    type Error = Rejection;
    /// Pull the next JSON document off the reader and turn it into an Event.
    /// Synchronous under the hood (the body is fully buffered), so this never
    /// returns `Async::NotReady` for the data itself.
    fn poll(&mut self) -> Result<Async<Option<Event>>, Rejection> {
        // Parse JSON object
        let mut json = match self.from_reader_take::<JsonValue>() {
            Ok(Some(json)) => json,
            Ok(None) => {
                // End of input: an entirely empty request body is an error.
                return if self.events == 0 {
                    Err(ApiError::NoData.into())
                } else {
                    Ok(Async::Ready(None))
                };
            }
            Err(error) => {
                error!(message = "Malformed request body",%error);
                return Err(ApiError::InvalidDataFormat { event: self.events }.into());
            }
        };

        // Construct Event from parsed json event
        let mut event = Event::new_empty_log();
        let log = event.as_mut_log();

        // Add source type
        log.insert(event::log_schema().source_type_key(), "splunk_hec");

        // Process event field. It is required and must be a non-empty string
        // or a non-empty object.
        match json.get_mut("event") {
            Some(event) => match event.take() {
                JsonValue::String(string) => {
                    if string.is_empty() {
                        return Err(ApiError::EmptyEventField { event: self.events }.into());
                    }
                    log.insert(event::log_schema().message_key().clone(), string);
                }
                JsonValue::Object(mut object) => {
                    if object.is_empty() {
                        return Err(ApiError::EmptyEventField { event: self.events }.into());
                    }

                    // Add 'line' value as 'event::schema().message_key'
                    if let Some(line) = object.remove("line") {
                        match line {
                            // This don't quite fit the meaning of a event::schema().message_key
                            JsonValue::Array(_) | JsonValue::Object(_) => {
                                log.insert("line", line);
                            }
                            _ => {
                                log.insert(event::log_schema().message_key(), line);
                            }
                        }
                    }

                    for (key, value) in object {
                        log.insert(key, value);
                    }
                }
                _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
            },
            None => return Err(ApiError::MissingEventField { event: self.events }.into()),
        }

        // Process channel field; a per-event `channel` overrides the
        // request-level header value.
        if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
            log.insert(CHANNEL.clone(), guid);
        } else if let Some(guid) = self.channel.as_ref() {
            log.insert(CHANNEL.clone(), guid.clone());
        }

        // Process fields field
        if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
            for (key, value) in object {
                log.insert(key, value);
            }
        }

        // Process time field. Accepts a JSON number or a numeric string;
        // a present-but-unparseable value is an error.
        let parsed_time = match json.get_mut("time").map(JsonValue::take) {
            Some(JsonValue::Number(time)) => Some(Some(time)),
            Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
            _ => None,
        };
        match parsed_time {
            None => (),
            Some(Some(t)) => {
                if let Some(t) = t.as_u64() {
                    // Integer timestamps may be sec/ms/ns; disambiguated by range.
                    let time = parse_timestamp(t as i64)
                        .ok_or_else(|| ApiError::InvalidDataFormat { event: self.events })?;

                    self.time = Time::Provided(time);
                } else if let Some(t) = t.as_f64() {
                    // Fractional timestamps are seconds with sub-second part.
                    self.time = Time::Provided(Utc.timestamp(
                        t.floor() as i64,
                        (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
                    ));
                } else {
                    return Err(ApiError::InvalidDataFormat { event: self.events }.into());
                }
            }
            Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
        }

        // Add time field. A provided time sticks for later events in the
        // same request, since self.time is updated above.
        match self.time.clone() {
            Time::Provided(time) => log.insert(event::log_schema().timestamp_key().clone(), time),
            Time::Now(time) => log.insert(event::log_schema().timestamp_key().clone(), time),
        };

        // Extract default extracted fields
        for de in self.extractors.iter_mut() {
            de.extract(log, &mut json);
        }

        self.events += 1;

        Ok(Async::Ready(Some(event)))
    }
}
473
474 /// Parse a `i64` unix timestamp that can either be in seconds, milliseconds or
475 /// nanoseconds.
476 ///
477 /// This attempts to parse timestamps based on what cutoff range they fall into.
478 /// For seconds to be parsed the timestamp must be less than the unix epoch of
479 /// the year `2400`. For this to parse milliseconds the time must be smaller
480 /// than the year `10,000` in unix epcoch milliseconds. If the value is larger
481 /// than both we attempt to parse it as nanoseconds.
482 ///
483 /// Returns `None` if `t` is negative.
parse_timestamp(t: i64) -> Option<DateTime<Utc>>484 fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
485 // Utc.ymd(2400, 1, 1).and_hms(0,0,0).timestamp();
486 const SEC_CUTOFF: i64 = 13569465600;
487 // Utc.ymd(10_000, 1, 1).and_hms(0,0,0).timestamp_millis();
488 const MILLISEC_CUTOFF: i64 = 253402300800000;
489
490 // Timestamps can't be negative!
491 if t < 0 {
492 return None;
493 }
494
495 let ts = if t < SEC_CUTOFF {
496 Utc.timestamp(t, 0)
497 } else if t < MILLISEC_CUTOFF {
498 Utc.timestamp_millis(t)
499 } else {
500 Utc.timestamp_nanos(t)
501 };
502
503 Some(ts)
504 }
505
/// Maintains last known extracted value of field and uses it in the absence of field.
struct DefaultExtractor {
    /// JSON field name to pull the value from.
    field: &'static str,
    /// Log event field the value is written to.
    to_field: &'static Atom,
    /// Last known value, re-applied to events that omit `field`.
    value: Option<Value>,
}
512
513 impl DefaultExtractor {
new(field: &'static str, to_field: &'static Atom) -> Self514 fn new(field: &'static str, to_field: &'static Atom) -> Self {
515 DefaultExtractor {
516 field,
517 to_field,
518 value: None,
519 }
520 }
521
new_with( field: &'static str, to_field: &'static Atom, value: impl Into<Option<Value>>, ) -> Self522 fn new_with(
523 field: &'static str,
524 to_field: &'static Atom,
525 value: impl Into<Option<Value>>,
526 ) -> Self {
527 DefaultExtractor {
528 field,
529 to_field,
530 value: value.into(),
531 }
532 }
533
extract(&mut self, log: &mut LogEvent, value: &mut JsonValue)534 fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
535 // Process json_field
536 if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
537 self.value = Some(new_value.into());
538 }
539
540 // Add data field
541 if let Some(index) = self.value.as_ref() {
542 log.insert(self.to_field.clone(), index.clone());
543 }
544 }
545 }
546
/// For tracking origin of the timestamp
#[derive(Clone, Debug)]
enum Time {
    /// Backup — request carried no usable `time` field; receive time is used.
    Now(DateTime<Utc>),
    /// Provided in the request
    Provided(DateTime<Utc>),
}
555
/// Creates event from raw request
///
/// The entire (optionally gzipped) body becomes the event's message; channel,
/// host, timestamp and source type are attached as metadata.
fn raw_event(
    bytes: Bytes,
    gzip: bool,
    channel: String,
    host: Option<String>,
) -> Result<Event, Rejection> {
    // Process gzip
    let message: Value = if gzip {
        let mut data = Vec::new();
        match GzDecoder::new(bytes.reader()).read_to_end(&mut data) {
            // A body that decompresses to zero bytes counts as no data.
            Ok(0) => return Err(ApiError::NoData.into()),
            Ok(_) => data.into(),
            Err(error) => {
                error!(message = "Malformed request body",%error);
                return Err(ApiError::InvalidDataFormat { event: 0 }.into());
            }
        }
    } else {
        bytes.into()
    };

    // Construct event
    let mut event = Event::new_empty_log();
    let log = event.as_mut_log();

    // Add message
    log.insert(event::log_schema().message_key().clone(), message);

    // Add channel
    log.insert(CHANNEL.clone(), channel.as_bytes());

    // Add host
    if let Some(host) = host {
        log.insert(event::log_schema().host_key().clone(), host.as_bytes());
    }

    // Add timestamp
    log.insert(event::log_schema().timestamp_key().clone(), Utc::now());

    // Add source type
    // NOTE(review): this uses try_insert while the fields above use insert —
    // presumably to avoid clobbering an existing value; confirm intent.
    event
        .as_mut_log()
        .try_insert(event::log_schema().source_type_key(), "splunk_hec");

    Ok(event)
}
603
/// Request-handling failures; each variant maps to an HTTP status and Splunk
/// error body in `finish_err`.
#[derive(Debug, Snafu)]
enum ApiError {
    // 401, Splunk code 2: token configured but no Authorization header.
    MissingAuthorization,
    // 401, Splunk code 3: Authorization header did not match the token.
    InvalidAuthorization,
    // 415: Content-Encoding other than gzip.
    UnsupportedEncoding,
    // 400, Splunk code 10: /raw requires x-splunk-request-channel.
    MissingChannel,
    // 400, Splunk code 5: empty request body.
    NoData,
    // 400, Splunk code 6: unparseable event; `event` is its index in the batch.
    InvalidDataFormat { event: usize },
    // 503, Splunk code 9: output channel closed.
    ServerShutdown,
    // 400, Splunk code 13: `event` field present but blank.
    EmptyEventField { event: usize },
    // 400, Splunk code 12: `event` field missing.
    MissingEventField { event: usize },
    // 400 with empty body (used by the health endpoint's auth check).
    BadRequest,
}
617
// Allow `ApiError` values to flow through warp's rejection machinery so
// `finish_err` can recover them with `rejection.find::<ApiError>()`.
impl From<ApiError> for Rejection {
    fn from(error: ApiError) -> Self {
        warp::reject::custom(error)
    }
}

impl warp::reject::Reject for ApiError {}
625
626 /// Cached bodies for common responses
627 mod splunk_response {
628 use bytes::Bytes;
629 use lazy_static::lazy_static;
630 use serde_json::{json, Value};
631
json_to_bytes(value: Value) -> Bytes632 fn json_to_bytes(value: Value) -> Bytes {
633 serde_json::to_string(&value).unwrap().into()
634 }
635
636 lazy_static! {
637 pub static ref INVALID_AUTHORIZATION: Bytes =
638 json_to_bytes(json!({"text":"Invalid authorization","code":3}));
639 pub static ref MISSING_CREDENTIALS: Bytes =
640 json_to_bytes(json!({"text":"Token is required","code":2}));
641 pub static ref NO_DATA: Bytes = json_to_bytes(json!({"text":"No data","code":5}));
642 pub static ref SUCCESS: Bytes = json_to_bytes(json!({"text":"Success","code":0}));
643 pub static ref SERVER_ERROR: Bytes =
644 json_to_bytes(json!({"text":"Internal server error","code":8}));
645 pub static ref SERVER_SHUTDOWN: Bytes =
646 json_to_bytes(json!({"text":"Server is shuting down","code":9}));
647 pub static ref UNSUPPORTED_MEDIA_TYPE: Bytes =
648 json_to_bytes(json!({"text":"unsupported content encoding"}));
649 pub static ref NO_CHANNEL: Bytes =
650 json_to_bytes(json!({"text":"Data channel is missing","code":10}));
651 }
652 }
653
/// Map a successful handler result to the canonical 200 "Success" response.
fn finish_ok(_: ()) -> Response {
    response_json(StatusCode::OK, splunk_response::SUCCESS.as_ref())
}
657
/// Convert an `ApiError` rejection into the corresponding HTTP response;
/// any other rejection is passed through for warp's default handling.
async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
    if let Some(error) = rejection.find::<ApiError>() {
        Ok((match error {
            ApiError::MissingAuthorization => response_json(
                StatusCode::UNAUTHORIZED,
                splunk_response::MISSING_CREDENTIALS.as_ref(),
            ),
            ApiError::InvalidAuthorization => response_json(
                StatusCode::UNAUTHORIZED,
                splunk_response::INVALID_AUTHORIZATION.as_ref(),
            ),
            ApiError::UnsupportedEncoding => response_json(
                StatusCode::UNSUPPORTED_MEDIA_TYPE,
                splunk_response::UNSUPPORTED_MEDIA_TYPE.as_ref(),
            ),
            ApiError::MissingChannel => response_json(
                StatusCode::BAD_REQUEST,
                splunk_response::NO_CHANNEL.as_ref(),
            ),
            ApiError::NoData => {
                response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA.as_ref())
            }
            ApiError::ServerShutdown => response_json(
                StatusCode::SERVICE_UNAVAILABLE,
                splunk_response::SERVER_SHUTDOWN.as_ref(),
            ),
            // Parse failures report which event in the batch was bad.
            ApiError::InvalidDataFormat { event } => event_error("Invalid data format", 6, *event),
            ApiError::EmptyEventField { event } => {
                event_error("Event field cannot be blank", 13, *event)
            }
            ApiError::MissingEventField { event } => {
                event_error("Event field is required", 12, *event)
            }
            ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
        },))
    } else {
        Err(rejection)
    }
}
697
/// Response without body
fn empty_response(code: StatusCode) -> Response {
    let mut res = Response::default();
    *res.status_mut() = code;
    res
}
704
/// Response with body
///
/// `body` is run through `warp::reply::json`, i.e. serialized with serde.
fn response_json(code: StatusCode, body: impl Serialize) -> Response {
    warp::reply::with_status(warp::reply::json(&body), code).into_response()
}
709
/// Error happened during parsing of events
///
/// Builds a Splunk-style error body carrying the index of the offending
/// event in the batch.
fn event_error(text: &str, code: u16, event: usize) -> Response {
    let body = json!({
        "text":text,
        "code":code,
        "invalid-event-number":event
    });
    // NOTE(review): the value is stringified here and then passed through
    // `response_json`, which serializes it again — the body presumably ends
    // up as a JSON-encoded string rather than a JSON object; confirm this is
    // the intended wire format.
    match serde_json::to_string(&body) {
        Ok(string) => response_json(StatusCode::BAD_REQUEST, string),
        Err(error) => {
            error!("Error encoding json body: {}", error);
            response_json(
                StatusCode::INTERNAL_SERVER_ERROR,
                splunk_response::SERVER_ERROR.clone(),
            )
        }
    }
}
728
#[cfg(feature = "sinks-splunk_hec")]
#[cfg(test)]
mod tests {
    use super::{parse_timestamp, SplunkConfig};
    use crate::runtime::Runtime;
    use crate::test_util::{self, collect_n, runtime};
    use crate::{
        event::{self, Event},
        shutdown::ShutdownSignal,
        sinks::{
            splunk_hec::{Encoding, HecSinkConfig},
            util::{encoding::EncodingConfigWithDefault, Compression},
            Healthcheck, RouterSink,
        },
        topology::config::{GlobalOptions, SinkConfig, SinkContext, SourceConfig},
    };
    use chrono::{TimeZone, Utc};
    use futures::compat::Future01CompatExt;
    use futures01::{stream, sync::mpsc, Sink};
    use std::net::SocketAddr;

    /// Splunk token
    const TOKEN: &str = "token";

    // Capacity of the channel connecting the source under test to assertions.
    const CHANNEL_CAPACITY: usize = 1000;

    /// Spawn a splunk_hec source with the default test token.
    fn source(rt: &mut Runtime) -> (mpsc::Receiver<Event>, SocketAddr) {
        source_with(rt, Some(TOKEN.to_owned()))
    }

    /// Spawn a splunk_hec source on a fresh local address with the given
    /// (possibly absent) token; returns the event receiver and bound address.
    fn source_with(rt: &mut Runtime, token: Option<String>) -> (mpsc::Receiver<Event>, SocketAddr) {
        test_util::trace_init();
        let (sender, recv) = mpsc::channel(CHANNEL_CAPACITY);
        let address = test_util::next_addr();
        rt.spawn(
            SplunkConfig {
                address,
                token,
                tls: None,
            }
            .build(
                "default",
                &GlobalOptions::default(),
                ShutdownSignal::noop(),
                sender,
            )
            .unwrap(),
        );
        (recv, address)
    }

    /// Build a splunk_hec sink pointed at `address`, used to drive the source
    /// end-to-end with real HEC-formatted requests.
    fn sink(
        address: SocketAddr,
        encoding: impl Into<EncodingConfigWithDefault<Encoding>>,
        compression: Compression,
    ) -> (RouterSink, Healthcheck) {
        HecSinkConfig {
            host: format!("http://{}", address),
            token: TOKEN.to_owned(),
            encoding: encoding.into(),
            compression,
            ..HecSinkConfig::default()
        }
        .build(SinkContext::new_test())
        .unwrap()
    }

    /// Spin up a source+sink pair and verify the healthcheck passes.
    fn start(
        encoding: impl Into<EncodingConfigWithDefault<Encoding>>,
        compression: Compression,
    ) -> (Runtime, RouterSink, mpsc::Receiver<Event>) {
        let mut rt = runtime();
        let (source, address) = source(&mut rt);
        let (sink, health) = sink(address, encoding, compression);
        assert!(rt.block_on(health).is_ok());
        (rt, sink, source)
    }

    /// Push `messages` through the sink and collect the same number of events
    /// back out of the source.
    fn channel_n(
        messages: Vec<impl Into<Event> + Send + 'static>,
        sink: RouterSink,
        source: mpsc::Receiver<Event>,
        rt: &mut Runtime,
    ) -> Vec<Event> {
        let n = messages.len();
        assert!(
            n <= CHANNEL_CAPACITY,
            "To much messages for the sink channel"
        );
        let pump = sink.send_all(stream::iter_ok(messages.into_iter().map(Into::into)));
        let _ = rt.block_on(pump).unwrap();
        let events = rt.block_on(collect_n(source, n)).unwrap();

        assert_eq!(n, events.len());

        events
    }

    /// POST `message` to the given API path using the default token; returns
    /// the HTTP status code.
    async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
        send_with(address, api, message, TOKEN).await
    }

    /// POST `message` with an explicit token and channel header.
    async fn send_with(address: SocketAddr, api: &str, message: &str, token: &str) -> u16 {
        reqwest::Client::new()
            .post(&format!("http://{}/{}", address, api))
            .header("Authorization", format!("Splunk {}", token))
            .header("x-splunk-request-channel", "guid")
            .body(message.to_owned())
            .send()
            .await
            .unwrap()
            .status()
            .as_u16()
    }

    #[test]
    fn no_compression_text_event() {
        let message = "gzip_text_event";
        let (mut rt, sink, source) = start(Encoding::Text, Compression::None);

        let event = channel_n(vec![message], sink, source, &mut rt).remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            message.into()
        );
        assert!(event
            .as_log()
            .get(&event::log_schema().timestamp_key())
            .is_some());
        assert_eq!(
            event.as_log()[event::log_schema().source_type_key()],
            "splunk_hec".into()
        );
    }

    #[test]
    fn one_simple_text_event() {
        let message = "one_simple_text_event";
        let (mut rt, sink, source) = start(Encoding::Text, Compression::Gzip);

        let event = channel_n(vec![message], sink, source, &mut rt).remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            message.into()
        );
        assert!(event
            .as_log()
            .get(&event::log_schema().timestamp_key())
            .is_some());
        assert_eq!(
            event.as_log()[event::log_schema().source_type_key()],
            "splunk_hec".into()
        );
    }

    #[test]
    fn multiple_simple_text_event() {
        let n = 200;
        let (mut rt, sink, source) = start(Encoding::Text, Compression::None);

        let messages = (0..n)
            .map(|i| format!("multiple_simple_text_event_{}", i))
            .collect::<Vec<_>>();
        let events = channel_n(messages.clone(), sink, source, &mut rt);

        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
            assert_eq!(
                event.as_log()[&event::log_schema().message_key()],
                msg.into()
            );
            assert!(event
                .as_log()
                .get(&event::log_schema().timestamp_key())
                .is_some());
            assert_eq!(
                event.as_log()[event::log_schema().source_type_key()],
                "splunk_hec".into()
            );
        }
    }

    #[test]
    fn one_simple_json_event() {
        let message = "one_simple_json_event";
        let (mut rt, sink, source) = start(Encoding::Json, Compression::Gzip);

        let event = channel_n(vec![message], sink, source, &mut rt).remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            message.into()
        );
        assert!(event
            .as_log()
            .get(&event::log_schema().timestamp_key())
            .is_some());
        assert_eq!(
            event.as_log()[event::log_schema().source_type_key()],
            "splunk_hec".into()
        );
    }

    #[test]
    fn multiple_simple_json_event() {
        let n = 200;
        let (mut rt, sink, source) = start(Encoding::Json, Compression::Gzip);

        let messages = (0..n)
            .map(|i| format!("multiple_simple_json_event{}", i))
            .collect::<Vec<_>>();
        let events = channel_n(messages.clone(), sink, source, &mut rt);

        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
            assert_eq!(
                event.as_log()[&event::log_schema().message_key()],
                msg.into()
            );
            assert!(event
                .as_log()
                .get(&event::log_schema().timestamp_key())
                .is_some());
            assert_eq!(
                event.as_log()[event::log_schema().source_type_key()],
                "splunk_hec".into()
            );
        }
    }

    #[test]
    fn json_event() {
        let (mut rt, sink, source) = start(Encoding::Json, Compression::Gzip);

        let mut event = Event::new_empty_log();
        event.as_mut_log().insert("greeting", "hello");
        event.as_mut_log().insert("name", "bob");

        let pump = sink.send(event);
        let _ = rt.block_on(pump).unwrap();
        let event = rt.block_on(collect_n(source, 1)).unwrap().remove(0);

        assert_eq!(event.as_log()[&"greeting".into()], "hello".into());
        assert_eq!(event.as_log()[&"name".into()], "bob".into());
        assert!(event
            .as_log()
            .get(&event::log_schema().timestamp_key())
            .is_some());
        assert_eq!(
            event.as_log()[event::log_schema().source_type_key()],
            "splunk_hec".into()
        );
    }

    // A scalar `line` field inside `event` should be promoted to the message.
    #[test]
    fn line_to_message() {
        let (mut rt, sink, source) = start(Encoding::Json, Compression::Gzip);

        let mut event = Event::new_empty_log();
        event.as_mut_log().insert("line", "hello");

        let pump = sink.send(event);
        let _ = rt.block_on(pump).unwrap();
        let event = rt.block_on(collect_n(source, 1)).unwrap().remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            "hello".into()
        );
    }

    #[test]
    fn raw() {
        let message = "raw";
        let mut rt = runtime();
        let (source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(200, post(address, "services/collector/raw", message).await);

            let event = collect_n(source, 1).compat().await.unwrap().remove(0);
            assert_eq!(
                event.as_log()[&event::log_schema().message_key()],
                message.into()
            );
            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
            assert!(event
                .as_log()
                .get(&event::log_schema().timestamp_key())
                .is_some());
            assert_eq!(
                event.as_log()[event::log_schema().source_type_key()],
                "splunk_hec".into()
            );
        });
    }

    #[test]
    fn no_data() {
        let mut rt = runtime();
        let (_source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(400, post(address, "services/collector/event", "").await);
        });
    }

    #[test]
    fn invalid_token() {
        let mut rt = runtime();
        let (_source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(
                401,
                send_with(address, "services/collector/event", "", "nope").await
            );
        });
    }

    // With no token configured, the source must accept requests regardless of
    // the Authorization header the sink sends.
    #[test]
    fn no_autorization() {
        let message = "no_autorization";
        let mut rt = runtime();
        let (source, address) = source_with(&mut rt, None);
        let (sink, health) = sink(address, Encoding::Text, Compression::Gzip);
        assert!(rt.block_on(health).is_ok());

        let event = channel_n(vec![message], sink, source, &mut rt).remove(0);

        assert_eq!(
            event.as_log()[&event::log_schema().message_key()],
            message.into()
        );
    }

    // A truncated batch still yields the complete events parsed before the
    // malformed tail, alongside the 400 response.
    #[test]
    fn partial() {
        let message = r#"{"event":"first"}{"event":"second""#;
        let mut rt = runtime();
        let (source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(
                400,
                post(address, "services/collector/event", message).await
            );

            let event = collect_n(source, 1).compat().await.unwrap().remove(0);
            assert_eq!(
                event.as_log()[&event::log_schema().message_key()],
                "first".into()
            );
            assert!(event
                .as_log()
                .get(&event::log_schema().timestamp_key())
                .is_some());
            assert_eq!(
                event.as_log()[event::log_schema().source_type_key()],
                "splunk_hec".into()
            );
        });
    }

    // `source` set on the first event must carry over to the second (which
    // omits it) and be replaced by the third event's own value.
    #[test]
    fn default() {
        let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
        let mut rt = runtime();
        let (source, address) = source(&mut rt);

        rt.block_on_std(async move {
            assert_eq!(
                200,
                post(address, "services/collector/event", message).await
            );

            let events = collect_n(source, 3).compat().await.unwrap();

            assert_eq!(
                events[0].as_log()[&event::log_schema().message_key()],
                "first".into()
            );
            assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());

            assert_eq!(
                events[1].as_log()[&event::log_schema().message_key()],
                "second".into()
            );
            assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());

            assert_eq!(
                events[2].as_log()[&event::log_schema().message_key()],
                "third".into()
            );
            assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
        });
    }

    #[test]
    fn parse_timestamps() {
        let cases = vec![
            Utc::now(),
            Utc.ymd(1971, 11, 7).and_hms(1, 1, 1),
            Utc.ymd(2011, 8, 5).and_hms(1, 1, 1),
            Utc.ymd(2189, 11, 4).and_hms(2, 2, 2),
        ];

        for case in cases {
            let sec = case.timestamp();
            let millis = case.timestamp_millis();
            let nano = case.timestamp_nanos();

            assert_eq!(
                parse_timestamp(sec as i64).unwrap().timestamp(),
                case.timestamp()
            );
            assert_eq!(
                parse_timestamp(millis as i64).unwrap().timestamp_millis(),
                case.timestamp_millis()
            );
            assert_eq!(
                parse_timestamp(nano as i64).unwrap().timestamp_nanos(),
                case.timestamp_nanos()
            );
        }

        assert!(parse_timestamp(-1).is_none());
    }
}
1158