1 //! Kafka Producer - A higher-level API for sending messages to Kafka
2 //! topics.
3 //!
4 //! This module hosts a multi-topic capable producer for a Kafka
5 //! cluster providing a convenient API for sending messages
6 //! synchronously.
7 //!
8 //! In Kafka, each message is a key/value pair where one or the other
9 //! is optional.  A `Record` represents all the data necessary to
10 //! produce such a message to Kafka using the `Producer`.  It
11 //! specifies the target topic and the target partition the message is
12 //! supposed to be delivered to as well as the key and the value.
13 //!
14 //! # Example
15 //! ```no_run
16 //! use std::fmt::Write;
17 //! use std::time::Duration;
18 //! use kafka::producer::{Producer, Record, RequiredAcks};
19 //!
20 //! let mut producer =
21 //!     Producer::from_hosts(vec!("localhost:9092".to_owned()))
22 //!         .with_ack_timeout(Duration::from_secs(1))
23 //!         .with_required_acks(RequiredAcks::One)
24 //!         .create()
25 //!         .unwrap();
26 //!
27 //! let mut buf = String::with_capacity(2);
28 //! for i in 0..10 {
29 //!   let _ = write!(&mut buf, "{}", i); // some computation of the message data to be sent
30 //!   producer.send(&Record::from_value("my-topic", buf.as_bytes())).unwrap();
31 //!   buf.clear();
32 //! }
33 //! ```
34 //!
35 //! In this example, when the `producer.send(..)` returns
36 //! successfully, we are guaranteed the message is delivered to Kafka
37 //! and persisted by at least one Kafka broker.  However, when sending
38 //! multiple messages just like in this example, it is more efficient
39 //! to send them in batches using `Producer::send_all`.
40 //!
//! Since some of the `Record`'s attributes are optional, convenience
//! methods exist to ease their creation.  In this example, the call
//! to `Record::from_value` creates a key-less, value-only record with
//! an unspecified partition.  The `Record` struct, however, is
//! intended to provide full control over its lifecycle to client
//! code, and, hence, is fully open.  Its current constructor methods
//! are provided for convenience only.
48 //!
//! Besides the target topic, the key, and the value of a `Record`,
//! client code is allowed to specify the topic partition the message
//! is supposed to be delivered to.  If the partition of a `Record` is
//! not specified - more precisely speaking, if it's negative -
//! `Producer` will rely on its underlying `Partitioner` to find a
//! suitable one.  A `Partitioner` implementation can be supplied by
//! client code at the `Producer`'s construction time and defaults to
//! `DefaultPartitioner`.  See its documentation for more information
//! about its strategy for finding a partition.
58 
59 // XXX 1) rethink return values for the send_all() method
60 // XXX 2) Handle recoverable errors behind the scenes through retry attempts
61 
62 use std::collections::HashMap;
63 use std::fmt;
64 use std::hash::{Hasher, BuildHasher, BuildHasherDefault};
65 use std::time::Duration;
66 use client::{self, KafkaClient};
67 use error::{ErrorKind, Result};
68 use ref_slice::ref_slice;
69 use twox_hash::XxHash32;
70 
71 #[cfg(feature = "security")]
72 use client::SecurityConfig;
73 
// Without the `security` feature `client::SecurityConfig` is not imported.
// Alias the name to the unit type so that code mentioning `SecurityConfig`
// (e.g. the `security_config` field of `Builder`) compiles either way.
#[cfg(not(feature = "security"))]
type SecurityConfig = ();
76 use client_internals::KafkaClientInternals;
77 use protocol;
78 
79 // public re-exports
80 pub use client::{Compression, RequiredAcks, ProduceConfirm, ProducePartitionConfirm};
81 
/// The default value for `Builder::with_ack_timeout`, expressed in
/// milliseconds (i.e. 30 seconds).
pub const DEFAULT_ACK_TIMEOUT_MILLIS: u64 = 30 * 1000;
84 
/// The default value for `Builder::with_required_acks`, namely
/// `RequiredAcks::One`.
pub const DEFAULT_REQUIRED_ACKS: RequiredAcks = RequiredAcks::One;
87 
88 // --------------------------------------------------------------------
89 
/// A trait used by `Producer` to obtain the bytes `Record::key` and
/// `Record::value` represent.  This leaves the choice of the types
/// for `key` and `value` with the client.
///
/// Note that `Producer` treats an empty byte slice as an absent
/// key/value: it is turned into `None` before the message is handed
/// to the client.
pub trait AsBytes {
    /// Returns the raw bytes this key or value stands for.
    fn as_bytes(&self) -> &[u8];
}
96 
97 impl AsBytes for () {
as_bytes(&self) -> &[u8]98     fn as_bytes(&self) -> &[u8] {
99         &[]
100     }
101 }
102 
103 // There seems to be some compiler issue with this:
104 // impl<T: AsRef<[u8]>> AsBytes for T {
105 //     fn as_bytes(&self) -> &[u8] { self.as_ref() }
106 // }
107 
108 // for now we provide the impl for some standard library types
109 impl AsBytes for String {
as_bytes(&self) -> &[u8]110     fn as_bytes(&self) -> &[u8] {
111         self.as_ref()
112     }
113 }
114 impl AsBytes for Vec<u8> {
as_bytes(&self) -> &[u8]115     fn as_bytes(&self) -> &[u8] {
116         self.as_ref()
117     }
118 }
119 
120 impl<'a> AsBytes for &'a [u8] {
as_bytes(&self) -> &[u8]121     fn as_bytes(&self) -> &[u8] {
122         self
123     }
124 }
125 impl<'a> AsBytes for &'a str {
as_bytes(&self) -> &[u8]126     fn as_bytes(&self) -> &[u8] {
127         str::as_bytes(self)
128     }
129 }
130 
131 // --------------------------------------------------------------------
132 
/// A structure representing a message to be sent to Kafka through the
/// `Producer` API.  Such a message is basically a key/value pair
/// specifying the target topic and optionally the topic's partition.
///
/// All fields are public, so a record can be assembled directly or
/// through the `from_key_value` / `from_value` convenience
/// constructors.
pub struct Record<'a, K, V> {
    /// Key data of this (message) record.  `Producer` obtains its
    /// bytes through `AsBytes`; an empty byte slice is treated as
    /// "no key".
    pub key: K,

    /// Value data of this (message) record.  `Producer` obtains its
    /// bytes through `AsBytes`; an empty byte slice is treated as
    /// "no value".
    pub value: V,

    /// Name of the topic this message is supposed to be delivered to.
    pub topic: &'a str,

    /// The partition id of the topic to deliver this message to.
    /// This partition may be `< 0` in which case it is considered
    /// "unspecified".  A `Producer` will then typically try to derive
    /// a partition on its own.
    pub partition: i32,
}
152 
153 impl<'a, K, V> Record<'a, K, V> {
154     /// Convenience function to create a new key/value record with an
155     /// "unspecified" partition - this is, a partition set to a negative
156     /// value.
157     #[inline]
from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V>158     pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
159         Record {
160             key: key,
161             value: value,
162             topic: topic,
163             partition: -1,
164         }
165     }
166 
167     /// Convenience method to set the partition.
168     #[inline]
with_partition(mut self, partition: i32) -> Self169     pub fn with_partition(mut self, partition: i32) -> Self {
170         self.partition = partition;
171         self
172     }
173 }
174 
175 impl<'a, V> Record<'a, (), V> {
176     /// Convenience function to create a new value only record with an
177     /// "unspecified" partition - this is, a partition set to a negative
178     /// value.
179     #[inline]
from_value(topic: &'a str, value: V) -> Record<'a, (), V>180     pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
181         Record {
182             key: (),
183             value: value,
184             topic: topic,
185             partition: -1,
186         }
187     }
188 }
189 
190 impl<'a, K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'a, K, V> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result191     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192         write!(
193             f,
194             "Record {{ topic: {}, partition: {}, key: {:?}, value: {:?} }}",
195             self.topic,
196             self.partition,
197             self.key,
198             self.value
199         )
200     }
201 }
202 
203 // --------------------------------------------------------------------
204 
/// The Kafka Producer
///
/// See module level documentation.
///
/// The type parameter `P` is the partitioner consulted for every
/// message sent; it defaults to `DefaultPartitioner`.
pub struct Producer<P = DefaultPartitioner> {
    // ~ the client through which produce requests are issued
    client: KafkaClient,
    // ~ per-topic partition information plus the partitioner
    state: State<P>,
    // ~ settings passed along with every produce request
    config: Config,
}
213 
/// The producer's internal state: the partition information captured
/// from the client's topic metadata when the producer was created,
/// and the partitioner working on it.
struct State<P> {
    /// A list of available partition IDs for each topic.
    partitions: HashMap<String, Partitions>,
    /// The partitioner to decide how to distribute messages
    partitioner: P,
}
220 
/// Producer settings in the form in which they are handed to the
/// client with every produce request.
struct Config {
    /// The maximum time to wait for acknowledgements. See
    /// `KafkaClient::produce_messages`.  Derived from the builder's
    /// `Duration` via `protocol::to_millis_i32`.
    ack_timeout: i32,
    /// The number of acks to request. See
    /// `KafkaClient::produce_messages`.  This is the `RequiredAcks`
    /// value cast to `i16`; `0` makes `Producer::send` skip inspecting
    /// the response.
    required_acks: i16,
}
229 
230 impl Producer {
231     /// Starts building a new producer using the given Kafka client.
from_client(client: KafkaClient) -> Builder<DefaultPartitioner>232     pub fn from_client(client: KafkaClient) -> Builder<DefaultPartitioner> {
233         Builder::new(Some(client), Vec::new())
234     }
235 
236     /// Starts building a producer bootstraping internally a new kafka
237     /// client from the given kafka hosts.
from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner>238     pub fn from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner> {
239         Builder::new(None, hosts)
240     }
241 
242     /// Borrows the underlying kafka client.
client(&self) -> &KafkaClient243     pub fn client(&self) -> &KafkaClient {
244         &self.client
245     }
246 
247     /// Destroys this producer returning the underlying kafka client.
into_client(self) -> KafkaClient248     pub fn into_client(self) -> KafkaClient {
249         self.client
250     }
251 }
252 
253 
254 impl<P: Partitioner> Producer<P> {
255     /// Synchronously send the specified message to Kafka.
send<'a, K, V>(&mut self, rec: &Record<'a, K, V>) -> Result<()> where K: AsBytes, V: AsBytes,256     pub fn send<'a, K, V>(&mut self, rec: &Record<'a, K, V>) -> Result<()>
257     where
258         K: AsBytes,
259         V: AsBytes,
260     {
261         let mut rs = try!(self.send_all(ref_slice(rec)));
262 
263         if self.config.required_acks == 0 {
264             // ~ with no required_acks we get no response and
265             // consider the send-data request blindly as successful
266             Ok(())
267         } else {
268             assert_eq!(1, rs.len());
269             let mut produce_confirm = rs.pop().unwrap();
270 
271             assert_eq!(1, produce_confirm.partition_confirms.len());
272             produce_confirm
273                 .partition_confirms
274                 .pop()
275                 .unwrap()
276                 .offset
277                 .map(|_| ())
278                 .map_err(|err| ErrorKind::Kafka(err).into())
279         }
280     }
281 
282     /// Synchronously send all of the specified messages to Kafka. To validate
283     /// that all of the specified records have been successfully delivered,
284     /// inspection of the offsets on the returned confirms is necessary.
send_all<'a, K, V>(&mut self, recs: &[Record<'a, K, V>]) -> Result<Vec<ProduceConfirm>> where K: AsBytes, V: AsBytes,285     pub fn send_all<'a, K, V>(&mut self, recs: &[Record<'a, K, V>]) -> Result<Vec<ProduceConfirm>>
286     where
287         K: AsBytes,
288         V: AsBytes,
289     {
290         let partitioner = &mut self.state.partitioner;
291         let partitions = &self.state.partitions;
292         let client = &mut self.client;
293         let config = &self.config;
294 
295         client.internal_produce_messages(
296             config.required_acks,
297             config.ack_timeout,
298             recs.into_iter().map(|r| {
299                 let mut m = client::ProduceMessage {
300                     key: to_option(r.key.as_bytes()),
301                     value: to_option(r.value.as_bytes()),
302                     topic: r.topic,
303                     partition: r.partition,
304                 };
305                 partitioner.partition(Topics::new(partitions), &mut m);
306                 m
307             }),
308         )
309     }
310 }
311 
/// Maps an empty byte slice to `None` and any other slice to
/// `Some(slice)`.
fn to_option(data: &[u8]) -> Option<&[u8]> {
    match data.len() {
        0 => None,
        _ => Some(data),
    }
}
315 
316 // --------------------------------------------------------------------
317 
318 impl<P> State<P> {
new(client: &mut KafkaClient, partitioner: P) -> Result<State<P>>319     fn new(client: &mut KafkaClient, partitioner: P) -> Result<State<P>> {
320         let ts = client.topics();
321         let mut ids = HashMap::with_capacity(ts.len());
322         for t in ts {
323             let ps = t.partitions();
324             ids.insert(
325                 t.name().to_owned(),
326                 Partitions {
327                     available_ids: ps.available_ids(),
328                     num_all_partitions: ps.len() as u32,
329                 },
330             );
331         }
332         Ok(State {
333             partitions: ids,
334             partitioner: partitioner,
335         })
336     }
337 }
338 
339 // --------------------------------------------------------------------
340 
/// A Kafka Producer builder easing the process of setting up various
/// configuration settings.
///
/// The type parameter `P` is the partitioner the resulting producer
/// will use; it defaults to `DefaultPartitioner`.
pub struct Builder<P = DefaultPartitioner> {
    // ~ a ready-made client to build the producer upon; if `None` a
    // new client is created from `hosts`
    client: Option<KafkaClient>,
    // ~ the hosts to create a new client from; only consulted when
    // `client` is `None`
    hosts: Vec<String>,
    // ~ applied to the client through `KafkaClient::set_compression`
    compression: Compression,
    // ~ see `Builder::with_ack_timeout`
    ack_timeout: Duration,
    // ~ applied to the client through
    // `KafkaClient::set_connection_idle_timeout`
    conn_idle_timeout: Duration,
    // ~ see `Builder::with_required_acks`
    required_acks: RequiredAcks,
    // ~ the partitioner handed over to the producer
    partitioner: P,
    // ~ only consulted when a new client needs to be created
    security_config: Option<SecurityConfig>,
    // ~ if set, applied to the client through
    // `KafkaClient::set_client_id`
    client_id: Option<String>,
}
354 
355 impl Builder {
new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner>356     fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner> {
357         let mut b = Builder {
358             client: client,
359             hosts: hosts,
360             compression: client::DEFAULT_COMPRESSION,
361             ack_timeout: Duration::from_millis(DEFAULT_ACK_TIMEOUT_MILLIS),
362             conn_idle_timeout: Duration::from_millis(
363                 client::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
364             ),
365             required_acks: DEFAULT_REQUIRED_ACKS,
366             partitioner: DefaultPartitioner::default(),
367             security_config: None,
368             client_id: None,
369         };
370         if let Some(ref c) = b.client {
371             b.compression = c.compression();
372             b.conn_idle_timeout = c.connection_idle_timeout();
373         }
374         b
375     }
376 
377     /// Specifies the security config to use.
378     /// See `KafkaClient::new_secure` for more info.
379     #[cfg(feature = "security")]
with_security(mut self, security: SecurityConfig) -> Self380     pub fn with_security(mut self, security: SecurityConfig) -> Self {
381         self.security_config = Some(security);
382         self
383     }
384 
385     /// Sets the compression algorithm to use when sending out data.
386     ///
387     /// See `KafkaClient::set_compression`.
with_compression(mut self, compression: Compression) -> Self388     pub fn with_compression(mut self, compression: Compression) -> Self {
389         self.compression = compression;
390         self
391     }
392 
393     /// Sets the maximum time the kafka brokers can await the receipt
394     /// of required acknowledgements (which is specified through
395     /// `Builder::with_required_acks`.)  Note that Kafka explicitely
396     /// documents this not to be a hard limit.
with_ack_timeout(mut self, timeout: Duration) -> Self397     pub fn with_ack_timeout(mut self, timeout: Duration) -> Self {
398         self.ack_timeout = timeout;
399         self
400     }
401 
402     /// Specifies the timeout for idle connections.
403     /// See `KafkaClient::set_connection_idle_timeout`.
with_connection_idle_timeout(mut self, timeout: Duration) -> Self404     pub fn with_connection_idle_timeout(mut self, timeout: Duration) -> Self {
405         self.conn_idle_timeout = timeout;
406         self
407     }
408 
409     /// Sets how many acknowledgements the kafka brokers should
410     /// receive before responding to sent messages.
411     ///
412     /// See `RequiredAcks`.
with_required_acks(mut self, acks: RequiredAcks) -> Self413     pub fn with_required_acks(mut self, acks: RequiredAcks) -> Self {
414         self.required_acks = acks;
415         self
416     }
417 
418     /// Specifies a client_id to be sent along every request to Kafka
419     /// brokers. See `KafkaClient::set_client_id`.
with_client_id(mut self, client_id: String) -> Self420     pub fn with_client_id(mut self, client_id: String) -> Self {
421         self.client_id = Some(client_id);
422         self
423     }
424 }
425 
426 impl<P> Builder<P> {
427     /// Sets the partitioner to dispatch when sending messages without
428     /// an explicit partition assignment.
with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q>429     pub fn with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q> {
430         Builder {
431             client: self.client,
432             hosts: self.hosts,
433             compression: self.compression,
434             ack_timeout: self.ack_timeout,
435             conn_idle_timeout: self.conn_idle_timeout,
436             required_acks: self.required_acks,
437             partitioner: partitioner,
438             security_config: None,
439             client_id: None,
440         }
441     }
442 
443     #[cfg(not(feature = "security"))]
new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient444     fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
445         KafkaClient::new(hosts)
446     }
447 
448     #[cfg(feature = "security")]
new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient449     fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
450         if let Some(security) = security {
451             KafkaClient::new_secure(hosts, security)
452         } else {
453             KafkaClient::new(hosts)
454         }
455     }
456 
457     /// Finally creates/builds a new producer based on the so far
458     /// supplied settings.
create(self) -> Result<Producer<P>>459     pub fn create(self) -> Result<Producer<P>> {
460         // ~ create the client if necessary
461         let (mut client, need_metadata) = match self.client {
462             Some(client) => (client, false),
463             None => (Self::new_kafka_client(self.hosts, self.security_config), true),
464         };
465         // ~ apply configuration settings
466         client.set_compression(self.compression);
467         client.set_connection_idle_timeout(self.conn_idle_timeout);
468         if let Some(client_id) = self.client_id {
469             client.set_client_id(client_id);
470         }
471         let producer_config = Config {
472             ack_timeout: try!(protocol::to_millis_i32(self.ack_timeout)),
473             required_acks: self.required_acks as i16,
474         };
475         // ~ load metadata if necessary
476         if need_metadata {
477             try!(client.load_metadata_all());
478         }
479         // ~ create producer state
480         let state = try!(State::new(&mut client, self.partitioner));
481         Ok(Producer {
482             client: client,
483             state: state,
484             config: producer_config,
485         })
486     }
487 }
488 
489 // --------------------------------------------------------------------
490 
/// A description of available topics and their available partitions.
///
/// Intended for use by `Partitioner`s.
pub struct Topics<'a> {
    // ~ borrowed partition information, keyed by topic name
    partitions: &'a HashMap<String, Partitions>,
}
497 
/// Producer relevant partition information of a particular topic.
///
/// Intended for use by `Partitioner`s.
pub struct Partitions {
    // ~ the ids of the partitions considered "available"; see
    // `Partitions::available_ids`
    available_ids: Vec<i32>,
    // ~ the total number of partitions; see `Partitions::num_all`
    num_all_partitions: u32,
}
505 
506 impl Partitions {
507     /// Retrieves the list of the identifiers of currently "available"
508     /// partitions for the given topic.  This list excludes partitions
509     /// which do not have a leader broker assigned.
510     #[inline]
available_ids(&self) -> &[i32]511     pub fn available_ids(&self) -> &[i32] {
512         &self.available_ids
513     }
514 
515     /// Retrieves the number of "available" partitions. This is a
516     /// merely a convenience method. See `Partitions::available_ids`.
517     #[inline]
num_available(&self) -> u32518     pub fn num_available(&self) -> u32 {
519         self.available_ids.len() as u32
520     }
521 
522     /// The total number of partitions of the underlygin topic.  This
523     /// number includes also partitions without a current leader
524     /// assignment.
525     #[inline]
num_all(&self) -> u32526     pub fn num_all(&self) -> u32 {
527         self.num_all_partitions
528     }
529 }
530 
531 impl<'a> Topics<'a> {
new(partitions: &'a HashMap<String, Partitions>) -> Topics<'a>532     fn new(partitions: &'a HashMap<String, Partitions>) -> Topics<'a> {
533         Topics { partitions: partitions }
534     }
535 
536     /// Retrieves informationa about a topic's partitions.
537     #[inline]
partitions(&'a self, topic: &str) -> Option<&'a Partitions>538     pub fn partitions(&'a self, topic: &str) -> Option<&'a Partitions> {
539         self.partitions.get(topic)
540     }
541 }
542 
/// A partitioner is given a chance to choose/redefine a partition for
/// a message to be sent to Kafka.  See also
/// `Record::with_partition`.
///
/// Implementations can be stateful: `Producer` invokes the
/// partitioner once for every message it sends.
pub trait Partitioner {
    /// Supposed to inspect the given message and, if desired,
    /// re-assign the message's target partition.
    ///
    /// `topics` is a description of the currently known topics and
    /// their currently available partitions.
    ///
    /// `msg` is the message whose partition assignment is potentially
    /// to be changed.
    fn partition(&mut self, topics: Topics, msg: &mut client::ProduceMessage);
}
559 
/// The default hasher implementation used by `DefaultPartitioner`.
pub type DefaultHasher = XxHash32;
562 
/// As its name implies `DefaultPartitioner` is the default
/// partitioner for `Producer`.
///
/// For every message it proceeds as follows:
///
/// - If the message contains a non-negative partition value it
/// leaves the message untouched.  This will cause `Producer` to try
/// to send the message to exactly that partition.
///
/// - Otherwise, if the message has an "unspecified" `partition` -
/// this is, it has a negative partition value - and a specified key,
/// `DefaultPartitioner` will compute a hash from the key using the
/// underlying hasher and take `hash % num_all_partitions` to derive
/// the partition to send the message to.  This will consistently
/// cause messages with the same key to be sent to the same partition.
///
/// - Otherwise - a message with an "unspecified" `partition` and no
/// key - `DefaultPartitioner` will "randomly" pick one from the
/// "available" partitions trying to distribute the messages across
/// the multiple partitions.  In particular, it tries to distribute
/// such messages across the "available" partitions in a round robin
/// fashion.  "Available" in this context means partitions with a
/// known leader.
///
/// This behavior may not suffice for every workload.  If your
/// application is dependent on a particular distribution scheme
/// different from the one outlined above, you want to provide your
/// own partitioner to the `Producer` at its initialization time.
///
/// See `Builder::with_partitioner`.
#[derive(Default)]
pub struct DefaultPartitioner<H = BuildHasherDefault<DefaultHasher>> {
    // ~ a hasher builder; used to consistently hash keys
    hash_builder: H,
    // ~ a counter incremented with each key-less message that got a
    // partition assigned, to achieve a different partition assignment
    // for each such message
    cntr: u32,
}
601 
602 impl DefaultPartitioner {
603     /// Creates a new partitioner which will use the given hash
604     /// builder to hash message keys.
with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B>605     pub fn with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B> {
606         DefaultPartitioner {
607             hash_builder: hash_builder.into(),
608             cntr: 0,
609         }
610     }
611 
with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>> where B: Hasher + Default,612     pub fn with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>>
613     where
614         B: Hasher + Default,
615     {
616         DefaultPartitioner {
617             hash_builder: BuildHasherDefault::<B>::default(),
618             cntr: 0,
619         }
620     }
621 }
622 
623 impl<H: BuildHasher> Partitioner for DefaultPartitioner<H> {
624     #[allow(unused_variables)]
partition(&mut self, topics: Topics, rec: &mut client::ProduceMessage)625     fn partition(&mut self, topics: Topics, rec: &mut client::ProduceMessage) {
626         if rec.partition >= 0 {
627             // ~ partition explicitely defined, trust the user
628             return;
629         }
630         let partitions = match topics.partitions(rec.topic) {
631             None => return, // ~ unknown topic, this is not the place to deal with it.
632             Some(partitions) => partitions,
633         };
634         match rec.key {
635             Some(key) => {
636                 let num_partitions = partitions.num_all();
637                 if num_partitions == 0 {
638                     // ~ no partitions at all ... a rather strange
639                     // topic. again, this is not the right place to
640                     // deal with it.
641                     return;
642                 }
643                 let mut h = self.hash_builder.build_hasher();
644                 h.write(key);
645                 // ~ unconditionally dispatch to partitions no matter
646                 // whether they are currently available or not.  this
647                 // guarantees consistency which is the point of
648                 // partitioning by key.  other behaviour - if desired
649                 // - can be implemented in custom, user provided
650                 // partitioners.
651                 let hash = h.finish() as u32;
652                 // if `num_partitions == u32::MAX` this can lead to a
653                 // negative partition ... such a partition count is very
654                 // unlikely though
655                 rec.partition = (hash % num_partitions) as i32;
656             }
657             None => {
658                 // ~ no key available, determine a partition from the
659                 // available ones.
660                 let avail = partitions.available_ids();
661                 if avail.len() > 0 {
662                     rec.partition = avail[self.cntr as usize % avail.len()];
663                     // ~ update internal state so that the next time we choose
664                     // a different partition
665                     self.cntr = self.cntr.wrapping_add(1);
666                 }
667             }
668         }
669     }
670 }
671 
672 // --------------------------------------------------------------------
673 
#[cfg(test)]
mod default_partitioner_tests {
    use std::hash::{Hasher, BuildHasherDefault};
    use std::collections::HashMap;

    use client;
    use super::{DefaultPartitioner, DefaultHasher, Partitioner, Partitions, Topics};

    /// Shorthand for the partition information of a single topic.
    fn partitions(available_ids: Vec<i32>, num_all_partitions: u32) -> Partitions {
        Partitions {
            available_ids,
            num_all_partitions,
        }
    }

    /// Turns `(topic name, partition info)` pairs into the map a
    /// `Topics` instance is created from.
    fn topics_map(topics: Vec<(&str, Partitions)>) -> HashMap<String, Partitions> {
        topics
            .into_iter()
            .map(|(name, info)| (String::from(name), info))
            .collect()
    }

    /// Runs a keyed, value-less message with an unspecified partition
    /// through the partitioner, verifies the assigned partition lies
    /// within the topic's partition range, and returns it.
    fn assert_partitioning<P: Partitioner>(
        topics: &HashMap<String, Partitions>,
        p: &mut P,
        topic: &str,
        key: &str,
    ) -> i32 {
        let mut msg = client::ProduceMessage {
            topic: topic,
            partition: -1,
            key: Some(key.as_bytes()),
            value: None,
        };
        p.partition(Topics::new(topics), &mut msg);
        let num_partitions = topics.get(topic).unwrap().num_all() as i32;
        assert!(msg.partition >= 0 && msg.partition < num_partitions);
        msg.partition
    }

    /// Validate consistent partitioning on a message's key
    #[test]
    fn test_key_partitioning() {
        let h = topics_map(vec![
            ("foo", partitions(vec![0, 1, 4], 5)),
            ("bar", partitions(vec![0, 1], 2)),
        ]);

        let mut p = DefaultPartitioner::<BuildHasherDefault<DefaultHasher>>::default();

        // ~ the same key must always end up in the same partition
        let h1 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        let h2 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        assert_eq!(h1, h2);

        // ~ different keys end up in different partitions (the keys
        // are chosen such that they lead to different partitions)
        let h3 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        let h4 = assert_partitioning(&h, &mut p, "foo", "bar-key");
        assert!(h3 != h4);
    }

    /// A hasher whose hash is simply the first byte of the most
    /// recently written data.
    #[derive(Default)]
    struct MyCustomHasher(u64);

    impl Hasher for MyCustomHasher {
        fn finish(&self) -> u64 {
            let MyCustomHasher(state) = *self;
            state
        }
        fn write(&mut self, bytes: &[u8]) {
            self.0 = u64::from(bytes[0]);
        }
    }

    /// Validate it is possible to register a custom hasher with the
    /// default partitioner
    #[test]
    fn default_partitioner_with_custom_hasher_default() {
        // this must compile
        let mut p = DefaultPartitioner::with_default_hasher::<MyCustomHasher>();

        let h = topics_map(vec![
            ("confirms", partitions(vec![0, 1], 2)),
            ("contents", partitions(vec![0, 1, 9], 10)),
        ]);

        // verify also the partitioner derives the correct partition
        // ... this is hash modulo num_all_partitions. here it is a
        // topic with a total of 2 partitions.
        let p1 = assert_partitioning(&h, &mut p, "confirms", "A" /* ascii: 65 */);
        assert_eq!(1, p1);

        // here it is a topic with a total of 10 partitions
        let p2 = assert_partitioning(&h, &mut p, "contents", "B" /* ascii: 66 */);
        assert_eq!(6, p2);
    }
}
791