1 use crate::config::*;
2 use crate::errors::*;
3 use crate::varz::*;
4
5 #[allow(unused_imports)]
6 use futures::prelude::*;
7 use hyper::header::CONTENT_TYPE;
8 use hyper::server::conn::Http;
9 use hyper::service::service_fn;
10 use hyper::{Body, Request, Response, StatusCode};
11 use prometheus::{self, Encoder, TextEncoder};
12 use std::mem;
13 use std::sync::atomic::{AtomicU32, Ordering};
14 use std::sync::Arc;
15 use tokio::net::TcpListener;
16 use tokio::runtime::Handle;
17
/// Hard deadline for serving a single metrics HTTP connection; the spawned
/// connection task is wrapped in `tokio::time::timeout` with this value.
const METRICS_CONNECTION_TIMEOUT_SECS: u64 = 10;
/// Cap on simultaneously served metrics connections; connections accepted
/// while this many are already in flight are dropped immediately.
const METRICS_MAX_CONCURRENT_CONNECTIONS: u32 = 2;
20
handle_client_connection( req: Request<Body>, varz: Varz, path: Arc<String>, ) -> Result<Response<Body>, Error>21 async fn handle_client_connection(
22 req: Request<Body>,
23 varz: Varz,
24 path: Arc<String>,
25 ) -> Result<Response<Body>, Error> {
26 let mut buffer = vec![];
27 if req.uri().path() != path.as_str() {
28 let response = Response::builder()
29 .status(StatusCode::NOT_FOUND)
30 .body(Body::empty())?;
31 return Ok(response);
32 }
33 let StartInstant(start_instant) = varz.start_instant;
34 let uptime = start_instant.elapsed().as_secs();
35 varz.uptime.set(uptime as _);
36 let client_queries = varz.client_queries_udp.get() + varz.client_queries_tcp.get();
37 varz.client_queries.set(client_queries as _);
38 let metric_families = prometheus::gather();
39 let encoder = TextEncoder::new();
40 encoder.encode(&metric_families, &mut buffer)?;
41 let response = Response::builder()
42 .header(CONTENT_TYPE, encoder.format_type())
43 .body(buffer.into())?;
44 Ok(response)
45 }
46
47 #[allow(unreachable_code)]
prometheus_service( varz: Varz, metrics_config: MetricsConfig, runtime_handle: Handle, ) -> Result<(), Error>48 pub async fn prometheus_service(
49 varz: Varz,
50 metrics_config: MetricsConfig,
51 runtime_handle: Handle,
52 ) -> Result<(), Error> {
53 let path = Arc::new(metrics_config.path);
54 let stream = TcpListener::bind(metrics_config.listen_addr).await?;
55 let concurrent_connections = Arc::new(AtomicU32::new(0));
56 loop {
57 let (client, _client_addr) = stream.accept().await?;
58 let count = concurrent_connections.fetch_add(1, Ordering::Relaxed);
59 if count >= METRICS_MAX_CONCURRENT_CONNECTIONS {
60 concurrent_connections.fetch_sub(1, Ordering::Relaxed);
61 mem::drop(client);
62 continue;
63 }
64 let path = path.clone();
65 let varz = varz.clone();
66 let service =
67 service_fn(move |req| handle_client_connection(req, varz.clone(), path.clone()));
68 let connection = Http::new().serve_connection(client, service);
69 let concurrent_connections = concurrent_connections.clone();
70 runtime_handle.spawn(
71 tokio::time::timeout(
72 std::time::Duration::from_secs(METRICS_CONNECTION_TIMEOUT_SECS),
73 connection,
74 )
75 .map(move |_| {
76 concurrent_connections.fetch_sub(1, Ordering::Relaxed);
77 }),
78 );
79 }
80 Ok(())
81 }
82