1 use crate::{
2 sinks::http::{HttpMethod, HttpSinkConfig},
3 sinks::util::{
4 encoding::{EncodingConfigWithDefault, EncodingConfiguration},
5 service2::TowerRequestConfig,
6 BatchConfig, Compression,
7 },
8 topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
9 };
10 use http::Uri;
11 use indexmap::IndexMap;
12 use serde::{Deserialize, Serialize};
13 use snafu::Snafu;
14
15 #[derive(Debug, Snafu)]
16 enum BuildError {
17 #[snafu(display(
18 "Missing authentication key, must provide either 'license_key' or 'insert_key'"
19 ))]
20 MissingAuthParam,
21 }
22
23 #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
24 #[serde(rename_all = "snake_case")]
25 #[derivative(Default)]
26 pub enum NewRelicLogsRegion {
27 #[derivative(Default)]
28 Us,
29 Eu,
30 }
31
32 #[derive(Deserialize, Serialize, Debug, Derivative, Clone)]
33 #[derivative(Default)]
34 pub struct NewRelicLogsConfig {
35 pub license_key: Option<String>,
36 pub insert_key: Option<String>,
37 pub region: Option<NewRelicLogsRegion>,
38 #[serde(skip_serializing_if = "skip_serializing_if_default", default)]
39 pub encoding: EncodingConfigWithDefault<Encoding>,
40 #[serde(default)]
41 pub compression: Compression,
42 #[serde(default)]
43 pub batch: BatchConfig,
44
45 #[serde(default)]
46 pub request: TowerRequestConfig,
47 }
48
49 inventory::submit! {
50 SinkDescription::new::<NewRelicLogsConfig>("new_relic_logs")
51 }
52
53 #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
54 #[serde(rename_all = "snake_case")]
55 #[derivative(Default)]
56 pub enum Encoding {
57 #[derivative(Default)]
58 Json,
59 }
60
61 impl From<Encoding> for crate::sinks::http::Encoding {
from(v: Encoding) -> crate::sinks::http::Encoding62 fn from(v: Encoding) -> crate::sinks::http::Encoding {
63 match v {
64 Encoding::Json => crate::sinks::http::Encoding::Json,
65 }
66 }
67 }
68
69 // There is another one of these in `util::encoding`, but this one is specialized for New Relic.
70 /// For encodings, answers "Is it possible to skip serializing this value, because it's the
71 /// default?"
skip_serializing_if_default(e: &EncodingConfigWithDefault<Encoding>) -> bool72 pub(crate) fn skip_serializing_if_default(e: &EncodingConfigWithDefault<Encoding>) -> bool {
73 e.codec() == &Encoding::default()
74 }
75
76 #[typetag::serde(name = "new_relic_logs")]
77 impl SinkConfig for NewRelicLogsConfig {
build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)>78 fn build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)> {
79 let http_conf = self.create_config()?;
80 http_conf.build(cx)
81 }
82
input_type(&self) -> DataType83 fn input_type(&self) -> DataType {
84 DataType::Log
85 }
86
sink_type(&self) -> &'static str87 fn sink_type(&self) -> &'static str {
88 "new_relic_logs"
89 }
90 }
91
92 impl NewRelicLogsConfig {
create_config(&self) -> crate::Result<HttpSinkConfig>93 fn create_config(&self) -> crate::Result<HttpSinkConfig> {
94 let mut headers: IndexMap<String, String> = IndexMap::new();
95
96 if let Some(license_key) = &self.license_key {
97 headers.insert("X-License-Key".to_owned(), license_key.clone());
98 } else if let Some(insert_key) = &self.insert_key {
99 headers.insert("X-Insert-Key".to_owned(), insert_key.clone());
100 } else {
101 return Err(Box::new(BuildError::MissingAuthParam));
102 }
103
104 let uri = match self.region.as_ref().unwrap_or(&NewRelicLogsRegion::Us) {
105 NewRelicLogsRegion::Us => Uri::from_static("https://log-api.newrelic.com/log/v1"),
106 NewRelicLogsRegion::Eu => Uri::from_static("https://log-api.eu.newrelic.com/log/v1"),
107 };
108
109 let batch = self.batch.use_size_as_bytes()?;
110 let batch = BatchConfig {
111 // The max request size is 10MiB, so in order to be comfortably
112 // within this we batch up to 5MiB.
113 max_bytes: Some(batch.max_bytes.unwrap_or(bytesize::mib(5u64) as usize)),
114 max_events: None,
115 ..batch
116 };
117
118 let request = TowerRequestConfig {
119 // The default throughput ceiling defaults are relatively
120 // conservative so we crank them up for New Relic.
121 in_flight_limit: Some(self.request.in_flight_limit.unwrap_or(100)),
122 rate_limit_num: Some(self.request.rate_limit_num.unwrap_or(100)),
123 ..self.request
124 };
125
126 Ok(HttpSinkConfig {
127 uri: uri.into(),
128 method: Some(HttpMethod::Post),
129 healthcheck_uri: None,
130 auth: None,
131 headers: Some(headers),
132 compression: self.compression,
133 encoding: self.encoding.clone().without_default(),
134
135 batch,
136 request,
137
138 tls: None,
139 })
140 }
141 }
142
143 #[cfg(test)]
144 mod tests {
145 use super::*;
146 use crate::{
147 event::Event,
148 sinks::util::test::build_test_server,
149 test_util::{next_addr, runtime, shutdown_on_idle},
150 topology::config::SinkConfig,
151 };
152 use bytes05::buf::BufExt;
153 use futures01::{stream, Sink, Stream};
154 use hyper::Method;
155 use serde_json::Value;
156 use std::io::BufRead;
157
158 #[test]
new_relic_logs_check_config_no_auth()159 fn new_relic_logs_check_config_no_auth() {
160 assert_eq!(
161 format!(
162 "{}",
163 NewRelicLogsConfig::default().create_config().unwrap_err()
164 ),
165 "Missing authentication key, must provide either 'license_key' or 'insert_key'"
166 .to_owned(),
167 );
168 }
169
170 #[test]
new_relic_logs_check_config_defaults()171 fn new_relic_logs_check_config_defaults() {
172 let mut nr_config = NewRelicLogsConfig::default();
173 nr_config.license_key = Some("foo".to_owned());
174 let http_config = nr_config.create_config().unwrap();
175
176 assert_eq!(
177 format!("{}", http_config.uri),
178 "https://log-api.newrelic.com/log/v1".to_string()
179 );
180 assert_eq!(http_config.method, Some(HttpMethod::Post));
181 assert_eq!(http_config.encoding.codec(), &Encoding::Json.into());
182 assert_eq!(
183 http_config.batch.max_bytes,
184 Some(bytesize::mib(5u64) as usize)
185 );
186 assert_eq!(http_config.request.in_flight_limit, Some(100));
187 assert_eq!(http_config.request.rate_limit_num, Some(100));
188 assert_eq!(
189 http_config.headers.unwrap()["X-License-Key"],
190 "foo".to_owned()
191 );
192 assert!(http_config.tls.is_none());
193 assert!(http_config.auth.is_none());
194 }
195
196 #[test]
new_relic_logs_check_config_custom()197 fn new_relic_logs_check_config_custom() {
198 let mut nr_config = NewRelicLogsConfig::default();
199 nr_config.insert_key = Some("foo".to_owned());
200 nr_config.region = Some(NewRelicLogsRegion::Eu);
201 nr_config.batch.max_size = Some(bytesize::mib(8u64) as usize);
202 nr_config.request.in_flight_limit = Some(12);
203 nr_config.request.rate_limit_num = Some(24);
204
205 let http_config = nr_config.create_config().unwrap();
206
207 assert_eq!(
208 format!("{}", http_config.uri),
209 "https://log-api.eu.newrelic.com/log/v1".to_string()
210 );
211 assert_eq!(http_config.method, Some(HttpMethod::Post));
212 assert_eq!(http_config.encoding.codec(), &Encoding::Json.into());
213 assert_eq!(
214 http_config.batch.max_bytes,
215 Some(bytesize::mib(8u64) as usize)
216 );
217 assert_eq!(http_config.request.in_flight_limit, Some(12));
218 assert_eq!(http_config.request.rate_limit_num, Some(24));
219 assert_eq!(
220 http_config.headers.unwrap()["X-Insert-Key"],
221 "foo".to_owned()
222 );
223 assert!(http_config.tls.is_none());
224 assert!(http_config.auth.is_none());
225 }
226
227 #[test]
new_relic_logs_check_config_custom_from_toml()228 fn new_relic_logs_check_config_custom_from_toml() {
229 let config = r#"
230 insert_key = "foo"
231 region = "eu"
232
233 [batch]
234 max_size = 8388608
235
236 [request]
237 in_flight_limit = 12
238 rate_limit_num = 24
239 "#;
240 let nr_config: NewRelicLogsConfig = toml::from_str(&config).unwrap();
241
242 let http_config = nr_config.create_config().unwrap();
243
244 assert_eq!(
245 format!("{}", http_config.uri),
246 "https://log-api.eu.newrelic.com/log/v1".to_string()
247 );
248 assert_eq!(http_config.method, Some(HttpMethod::Post));
249 assert_eq!(http_config.encoding.codec(), &Encoding::Json.into());
250 assert_eq!(
251 http_config.batch.max_bytes,
252 Some(bytesize::mib(8u64) as usize)
253 );
254 assert_eq!(http_config.request.in_flight_limit, Some(12));
255 assert_eq!(http_config.request.rate_limit_num, Some(24));
256 assert_eq!(
257 http_config.headers.unwrap()["X-Insert-Key"],
258 "foo".to_owned()
259 );
260 assert!(http_config.tls.is_none());
261 assert!(http_config.auth.is_none());
262 }
263
264 #[test]
new_relic_logs_happy_path()265 fn new_relic_logs_happy_path() {
266 let in_addr = next_addr();
267
268 let mut nr_config = NewRelicLogsConfig::default();
269 nr_config.license_key = Some("foo".to_owned());
270 let mut http_config = nr_config.create_config().unwrap();
271 http_config.uri = format!("http://{}/fake_nr", in_addr)
272 .parse::<http::Uri>()
273 .unwrap()
274 .into();
275
276 let mut rt = runtime();
277
278 let (sink, _healthcheck) = http_config.build(SinkContext::new_test()).unwrap();
279 let (rx, trigger, server) = build_test_server(in_addr, &mut rt);
280
281 let input_lines = (0..100).map(|i| format!("msg {}", i)).collect::<Vec<_>>();
282 let events = stream::iter_ok(input_lines.clone().into_iter().map(Event::from));
283
284 let pump = sink.send_all(events);
285
286 rt.spawn(server);
287
288 let _ = rt.block_on(pump).unwrap();
289 drop(trigger);
290
291 let output_lines = rx
292 .wait()
293 .map(Result::unwrap)
294 .map(|(parts, body)| {
295 assert_eq!(Method::POST, parts.method);
296 assert_eq!("/fake_nr", parts.uri.path());
297 assert_eq!(
298 parts
299 .headers
300 .get("X-License-Key")
301 .and_then(|v| v.to_str().ok()),
302 Some("foo")
303 );
304 body.reader()
305 })
306 .flat_map(BufRead::lines)
307 .map(Result::unwrap)
308 .flat_map(|s| -> Vec<String> {
309 let vals: Vec<Value> = serde_json::from_str(&s).unwrap();
310 vals.iter()
311 .map(|v| v.get("message").unwrap().as_str().unwrap().to_owned())
312 .collect()
313 })
314 .collect::<Vec<_>>();
315
316 shutdown_on_idle(rt);
317
318 assert_eq!(input_lines, output_lines);
319 }
320 }
321