1 use std::convert::TryInto;
2 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3 
4 use clap::{App, Arg};
5 use futures::stream::StreamExt;
6 use hdrhistogram::Histogram;
7 
8 use rdkafka::config::ClientConfig;
9 use rdkafka::consumer::{Consumer, StreamConsumer};
10 use rdkafka::message::Message;
11 use rdkafka::producer::{FutureProducer, FutureRecord};
12 
13 use crate::example_utils::setup_logger;
14 
15 mod example_utils;
16 
17 #[tokio::main]
main()18 async fn main() {
19     let matches = App::new("Roundtrip example")
20         .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
21         .about("Measures latency between producer and consumer")
22         .arg(
23             Arg::with_name("brokers")
24                 .short("b")
25                 .long("brokers")
26                 .help("Broker list in kafka format")
27                 .takes_value(true)
28                 .default_value("localhost:9092"),
29         )
30         .arg(
31             Arg::with_name("topic")
32                 .long("topic")
33                 .help("topic")
34                 .takes_value(true)
35                 .required(true),
36         )
37         .arg(
38             Arg::with_name("log-conf")
39                 .long("log-conf")
40                 .help("Configure the logging format (example: 'rdkafka=trace')")
41                 .takes_value(true),
42         )
43         .get_matches();
44 
45     setup_logger(true, matches.value_of("log-conf"));
46 
47     let brokers = matches.value_of("brokers").unwrap();
48     let topic = matches.value_of("topic").unwrap().to_owned();
49 
50     let producer: FutureProducer = ClientConfig::new()
51         .set("bootstrap.servers", brokers)
52         .set("message.timeout.ms", "5000")
53         .create()
54         .expect("Producer creation error");
55 
56     let consumer: StreamConsumer = ClientConfig::new()
57         .set("bootstrap.servers", brokers)
58         .set("session.timeout.ms", "6000")
59         .set("enable.auto.commit", "false")
60         .set("group.id", "rust-rdkafka-roundtrip-example")
61         .create()
62         .expect("Consumer creation failed");
63     consumer.subscribe(&[&topic]).unwrap();
64 
65     tokio::spawn(async move {
66         let mut i = 0_usize;
67         loop {
68             producer
69                 .send_result(
70                     FutureRecord::to(&topic)
71                         .key(&i.to_string())
72                         .payload("dummy")
73                         .timestamp(now()),
74                 )
75                 .unwrap()
76                 .await
77                 .unwrap()
78                 .unwrap();
79             i += 1;
80         }
81     });
82 
83     let start = Instant::now();
84     let mut stream = consumer.start();
85     let mut latencies = Histogram::<u64>::new(5).unwrap();
86     println!("Warming up for 10s...");
87     while let Some(message) = stream.next().await {
88         let message = message.unwrap();
89         let then = message.timestamp().to_millis().unwrap();
90         if start.elapsed() < Duration::from_secs(10) {
91             // Warming up.
92         } else if start.elapsed() < Duration::from_secs(20) {
93             if latencies.len() == 0 {
94                 println!("Recording for 10s...");
95             }
96             latencies += (now() - then) as u64;
97         } else {
98             break;
99         }
100     }
101 
102     println!("measurements: {}", latencies.len());
103     println!("mean latency: {}ms", latencies.mean());
104     println!("p50 latency:  {}ms", latencies.value_at_quantile(0.50));
105     println!("p90 latency:  {}ms", latencies.value_at_quantile(0.90));
106     println!("p99 latency:  {}ms", latencies.value_at_quantile(0.99));
107 }
108 
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reads earlier than the Unix epoch, or if the
/// millisecond count does not fit in an `i64`.
fn now() -> i64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    // `as_millis` yields a `u128`; narrow it with a checked conversion.
    let millis: u128 = since_epoch.as_millis();
    millis.try_into().unwrap()
}
117