1 //! Message publication
2 use futures::channel::oneshot;
3 use futures::future::try_join_all;
4 use std::collections::{BTreeMap, HashMap, VecDeque};
5 use std::io::Write;
6 use std::sync::{Arc, Mutex};
7 
8 use crate::client::SerializeMessage;
9 use crate::connection::{Connection, SerialId};
10 use crate::error::{ConnectionError, ProducerError};
11 use crate::executor::Executor;
12 use crate::message::proto::{self, CommandSendReceipt, CompressionType, EncryptionKeys, Schema};
13 use crate::message::BatchedMessage;
14 use crate::{Error, Pulsar};
15 use futures::task::{Context, Poll};
16 use futures::Future;
17 use tokio::macros::support::Pin;
18 
19 type ProducerId = u64;
20 type ProducerName = String;
21 
/// Future handed back by the `send` methods; it resolves to the result that the
/// producer pushes through the wrapped oneshot channel.
///
/// If the sending half of the channel is dropped before a result is delivered,
/// the future resolves to a "producer unexpectedly disconnected" error.
pub struct SendFuture(pub(crate) oneshot::Receiver<Result<CommandSendReceipt, Error>>);
23 
24 impl Future for SendFuture {
25     type Output = Result<CommandSendReceipt, Error>;
26 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>27     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
28         match Pin::new(&mut self.0).poll(cx) {
29             Poll::Ready(Ok(r)) => Poll::Ready(r),
30             Poll::Ready(Err(_)) => Poll::Ready(Err(ProducerError::Custom(
31                 "producer unexpectedly disconnected".into(),
32             )
33             .into())),
34             Poll::Pending => Poll::Pending,
35         }
36     }
37 }
38 
/// Data of a message that will be published on a topic.
///
/// Produced by the [SerializeMessage] trait or by [MessageBuilder].
///
/// Only a subset of the fields of a full message is exposed here: batching,
/// compression and encryption are the producer's responsibility.
#[derive(Debug, Clone, Default)]
pub struct Message {
    /// serialized data
    pub payload: Vec<u8>,
    /// user defined key/value properties attached to the message
    pub properties: HashMap<String, String>,
    /// key used to decide the partition for the message
    pub partition_key: Option<String>,
    /// overrides the namespace's replication
    pub replicate_to: Vec<String>,
    /// timestamp at which the event occurred; typically set by applications.
    /// When omitted, `publish_time` can be used for the purpose of `event_time`.
    pub event_time: Option<u64>,
    /// version of the schema the payload was written with
    // NOTE(review): presumably an opaque version blob defined by the broker - confirm
    pub schema_version: Option<Vec<u8>>,
}
59 
60 /// internal message type carrying options that must be defined
61 /// by the producer
62 #[derive(Debug, Clone, Default)]
63 pub(crate) struct ProducerMessage {
64     pub payload: Vec<u8>,
65     pub properties: HashMap<String, String>,
66     ///key to decide partition for the msg
67     pub partition_key: ::std::option::Option<String>,
68     /// Override namespace's replication
69     pub replicate_to: ::std::vec::Vec<String>,
70     pub compression: ::std::option::Option<i32>,
71     pub uncompressed_size: ::std::option::Option<u32>,
72     /// Removed below checksum field from Metadata as
73     /// it should be part of send-command which keeps checksum of header + payload
74     ///optional sfixed64 checksum = 10;
75     ///differentiate single and batch message metadata
76     pub num_messages_in_batch: ::std::option::Option<i32>,
77     pub event_time: ::std::option::Option<u64>,
78     /// Contains encryption key name, encrypted key and metadata to describe the key
79     pub encryption_keys: ::std::vec::Vec<EncryptionKeys>,
80     /// Algorithm used to encrypt data key
81     pub encryption_algo: ::std::option::Option<String>,
82     /// Additional parameters required by encryption
83     pub encryption_param: ::std::option::Option<Vec<u8>>,
84     pub schema_version: ::std::option::Option<Vec<u8>>,
85 }
86 
87 impl From<Message> for ProducerMessage {
from(m: Message) -> Self88     fn from(m: Message) -> Self {
89         ProducerMessage {
90             payload: m.payload,
91             properties: m.properties,
92             partition_key: m.partition_key,
93             replicate_to: m.replicate_to,
94             event_time: m.event_time,
95             schema_version: m.schema_version,
96             ..Default::default()
97         }
98     }
99 }
100 
/// Configuration options for producers
///
/// All fields are optional; the `Default` value leaves batching and compression
/// disabled.
#[derive(Clone, Default)]
pub struct ProducerOptions {
    // NOTE(review): not read in this file; presumably forwarded to the broker when
    // the producer is created - confirm
    pub encrypted: Option<bool>,
    // NOTE(review): not read in this file; presumably producer metadata forwarded
    // to the broker - confirm
    pub metadata: BTreeMap<String, String>,
    // NOTE(review): not read in this file; presumably the schema registered for the
    // topic on producer creation - confirm
    pub schema: Option<Schema>,
    /// when set, the producer queues messages and sends them as one batched
    /// message once this many are queued (or when `send_batch` is called)
    pub batch_size: Option<u32>,
    /// compression applied to outgoing payloads; creating a producer fails if the
    /// matching cargo feature is not active
    pub compression: Option<proto::CompressionType>,
}
110 
/// Wrapper structure that manages multiple producers at once, creating them as needed
/// ```rust,no_run
/// use pulsar::{Pulsar, TokioExecutor};
///
/// # async fn test() -> Result<(), pulsar::Error> {
/// # let addr = "pulsar://127.0.0.1:6650";
/// # let topic = "topic";
/// # let message = "data".to_owned();
/// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
/// let mut producer = pulsar.producer()
///     .with_name("name")
///     .build_multi_topic();
/// let send_1 = producer.send(topic, &message).await?;
/// let send_2 = producer.send(topic, &message).await?;
/// send_1.await?;
/// send_2.await?;
/// # Ok(())
/// # }
/// ```
pub struct MultiTopicProducer<Exe: Executor> {
    // client used to build a producer the first time a topic is sent to
    client: Pulsar<Exe>,
    // producers created so far, keyed by the topic name passed to `send`
    producers: BTreeMap<String, Producer<Exe>>,
    // options applied to every producer created by this wrapper
    options: ProducerOptions,
    // optional producer name applied to every producer created by this wrapper
    name: Option<String>,
}
136 
137 impl<Exe: Executor> MultiTopicProducer<Exe> {
options(&self) -> &ProducerOptions138     pub fn options(&self) -> &ProducerOptions {
139         &self.options
140     }
141 
topics(&self) -> Vec<String>142     pub fn topics(&self) -> Vec<String> {
143         self.producers.keys().cloned().collect()
144     }
145 
close_producer<S: Into<String>>(&mut self, topic: S) -> Result<(), Error>146     pub async fn close_producer<S: Into<String>>(&mut self, topic: S) -> Result<(), Error> {
147         let partitions = self.client.lookup_partitioned_topic(topic).await?;
148         for (topic, _) in partitions {
149             self.producers.remove(&topic);
150         }
151         Ok(())
152     }
153 
send<T: SerializeMessage + Sized, S: Into<String>>( &mut self, topic: S, message: T, ) -> Result<SendFuture, Error>154     pub async fn send<T: SerializeMessage + Sized, S: Into<String>>(
155         &mut self,
156         topic: S,
157         message: T,
158     ) -> Result<SendFuture, Error> {
159         let message = T::serialize_message(message)?;
160         let topic = topic.into();
161         if !self.producers.contains_key(&topic) {
162             let mut builder = self
163                 .client
164                 .producer()
165                 .with_topic(&topic)
166                 .with_options(self.options.clone());
167             if let Some(name) = &self.name {
168                 builder = builder.with_name(name.clone());
169             }
170             let producer = builder
171                 .build()
172                 .await?;
173             self.producers.insert(topic.clone(), producer);
174         }
175 
176         let producer = self.producers.get_mut(&topic).unwrap();
177         producer.send(message).await
178     }
179 
send_all<'a, 'b, T, S, I>( &mut self, topic: S, messages: I, ) -> Result<Vec<SendFuture>, Error> where 'b: 'a, T: 'b + SerializeMessage + Sized, I: IntoIterator<Item = T>, S: Into<String>,180     pub async fn send_all<'a, 'b, T, S, I>(
181         &mut self,
182         topic: S,
183         messages: I,
184     ) -> Result<Vec<SendFuture>, Error>
185     where
186         'b: 'a,
187         T: 'b + SerializeMessage + Sized,
188         I: IntoIterator<Item = T>,
189         S: Into<String>,
190     {
191         let topic = topic.into();
192         let mut sends = Vec::new();
193         for msg in messages {
194             sends.push(self.send(&topic, msg).await);
195         }
196         // TODO determine whether to keep this approach or go with the partial send, but more mem friendly lazy approach.
197         // serialize all messages before sending to avoid a partial send
198         if sends.iter().all(|s| s.is_ok()) {
199             Ok(sends.into_iter().map(|s| s.unwrap()).collect())
200         } else {
201             Err(ProducerError::PartialSend(sends).into())
202         }
203     }
204 }
205 
/// Publishes messages on one topic; for a topic with several partitions the
/// messages are spread over one internal producer per partition.
pub struct Producer<Exe: Executor> {
    // single-topic or partitioned implementation, chosen when the producer is built
    inner: ProducerInner<Exe>,
}
209 
210 impl<Exe: Executor> Producer<Exe> {
builder(pulsar: &Pulsar<Exe>) -> ProducerBuilder<Exe>211     pub fn builder(pulsar: &Pulsar<Exe>) -> ProducerBuilder<Exe> {
212         ProducerBuilder::new(&pulsar)
213     }
214 
topic(&self) -> &str215     pub fn topic(&self) -> &str {
216         match &self.inner {
217             ProducerInner::Single(p) => p.topic(),
218             ProducerInner::Partitioned(p) => &p.topic,
219         }
220     }
221 
partitions(&self) -> Option<Vec<String>>222     pub fn partitions(&self) -> Option<Vec<String>> {
223         match &self.inner {
224             ProducerInner::Single(_) => None,
225             ProducerInner::Partitioned(p) => {
226                 Some(
227                     p.producers.iter()
228                         .map(|p| p.topic().to_owned())
229                         .collect()
230                 )
231             }
232         }
233     }
234 
options(&self) -> &ProducerOptions235     pub fn options(&self) -> &ProducerOptions {
236         match &self.inner {
237             ProducerInner::Single(p) => p.options(),
238             ProducerInner::Partitioned(p) => &p.options,
239         }
240     }
241 
create_message(&mut self) -> MessageBuilder<(), Exe>242     pub fn create_message(&mut self) -> MessageBuilder<(), Exe> {
243         MessageBuilder::new(self)
244     }
245 
check_connection(&self) -> Result<(), Error>246     pub async fn check_connection(&self) -> Result<(), Error> {
247         match &self.inner {
248             ProducerInner::Single(p) => p.check_connection().await,
249             ProducerInner::Partitioned(p) => {
250                 try_join_all(p.producers.iter().map(|p| p.check_connection()))
251                     .await
252                     .map(drop)
253             }
254         }
255     }
256 
257     /// Sends a message
258     ///
259     /// this function returns a `SendFuture` because the receipt can come long after
260     /// this function was called, for various reasons:
261     /// - the message was sent successfully but Pulsar did not send the receipt yet
262     /// - the producer is batching messages, so this function must return immediately,
263     /// and the receipt will come when the batched messages are actually sent
264     ///
265     /// Usage:
266     ///
267     /// ```rust,no_run
268     /// # async fn run(mut producer: pulsar::Producer<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
269     /// let f1 = producer.send("hello").await?;
270     /// let f2 = producer.send("world").await?;
271     /// let receipt1 = f1.await?;
272     /// let receipt2 = f2.await?;
273     /// # Ok(())
274     /// # }
275     /// ```
send<T: SerializeMessage + Sized>( &mut self, message: T, ) -> Result<SendFuture, Error>276     pub async fn send<T: SerializeMessage + Sized>(
277         &mut self,
278         message: T,
279     ) -> Result<SendFuture, Error> {
280         match &mut self.inner {
281             ProducerInner::Single(p) => p.send(message).await,
282             ProducerInner::Partitioned(p) => p.next().send(message).await,
283         }
284     }
285 
send_all<T, I>(&mut self, messages: I) -> Result<Vec<SendFuture>, Error> where T: SerializeMessage, I: IntoIterator<Item = T>,286     pub async fn send_all<T, I>(&mut self, messages: I) -> Result<Vec<SendFuture>, Error>
287     where
288         T: SerializeMessage,
289         I: IntoIterator<Item = T>,
290     {
291         let producer = match &mut self.inner {
292             ProducerInner::Single(p) => p,
293             ProducerInner::Partitioned(p) => p.next(),
294         };
295         let mut sends = Vec::new();
296         for message in messages {
297             sends.push(producer.send(message).await);
298         }
299         if sends.iter().all(|s| s.is_ok()) {
300             Ok(sends.into_iter().map(|s| s.unwrap()).collect())
301         } else {
302             Err(ProducerError::PartialSend(sends).into())
303         }
304     }
305 
306     /// sends the current batch of messages
send_batch(&mut self) -> Result<(), Error>307     pub async fn send_batch(&mut self) -> Result<(), Error> {
308         match &mut self.inner {
309             ProducerInner::Single(p) => p.send_batch().await,
310             ProducerInner::Partitioned(p) => {
311                 try_join_all(p.producers.iter_mut().map(|p| p.send_batch()))
312                     .await
313                     .map(drop)
314             }
315         }
316     }
317 
send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error>318     pub(crate) async fn send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error> {
319         match &mut self.inner {
320             ProducerInner::Single(p) => p.send_raw(message).await,
321             ProducerInner::Partitioned(p) => p.next().send_raw(message).await,
322         }
323     }
324 }
325 
/// Implementation behind a [Producer], selected by the number of topics the
/// partition lookup returned when the producer was built.
enum ProducerInner<Exe: Executor> {
    /// the lookup returned exactly one topic
    Single(TopicProducer<Exe>),
    /// the lookup returned several partitions, each with its own producer
    Partitioned(PartitionedProducer<Exe>),
}
330 
/// Set of per-partition producers for one partitioned topic.
struct PartitionedProducer<Exe: Executor> {
    // Guaranteed to be non-empty
    // (rotated by `next`, so the front element is the most recently selected one)
    producers: VecDeque<TopicProducer<Exe>>,
    // topic name the producer was built with (not one of the partition names)
    topic: String,
    // options shared by all partition producers
    options: ProducerOptions,
}
337 
338 impl<Exe: Executor> PartitionedProducer<Exe> {
next(&mut self) -> &mut TopicProducer<Exe>339     pub fn next(&mut self) -> &mut TopicProducer<Exe> {
340         self.producers.rotate_left(1);
341         self.producers.front_mut().unwrap()
342     }
343 }
344 
/// a producer is used to publish messages on a topic
struct TopicProducer<Exe: Executor> {
    // client, kept to look up the broker and get a new connection on reconnect
    client: Pulsar<Exe>,
    // connection to the broker currently serving the topic
    connection: Arc<Connection>,
    // producer id, randomly generated when the producer is created
    id: ProducerId,
    // producer name returned by the broker when the producer was created
    name: ProducerName,
    topic: String,
    // source of the sequence ids passed along with every sent message
    message_id: SerialId,
    //putting it in a mutex because we must send multiple messages at once
    // while we might be pushing more messages from elsewhere
    // (`None` when `options.batch_size` is not set, i.e. batching is disabled)
    batch: Option<Mutex<Batch>>,
    // compression applied to every outgoing payload (copied from `options`)
    compression: Option<proto::CompressionType>,
    // dropping this sender wakes the task that closes the producer on the broker
    _drop_signal: oneshot::Sender<()>,
    options: ProducerOptions,
}
360 
361 impl<Exe: Executor> TopicProducer<Exe> {
from_connection<S: Into<String>>( client: Pulsar<Exe>, connection: Arc<Connection>, topic: S, name: Option<String>, options: ProducerOptions, ) -> Result<Self, Error>362     pub(crate) async fn from_connection<S: Into<String>>(
363         client: Pulsar<Exe>,
364         connection: Arc<Connection>,
365         topic: S,
366         name: Option<String>,
367         options: ProducerOptions,
368     ) -> Result<Self, Error> {
369         let topic = topic.into();
370         let producer_id = rand::random();
371         let sequence_ids = SerialId::new();
372 
373         let _ = connection
374             .sender()
375             .lookup_topic(topic.clone(), false)
376             .await?;
377 
378         let topic = topic.clone();
379         let batch_size = options.batch_size;
380         let compression = options.compression;
381 
382         match compression {
383             None | Some(CompressionType::None) => {}
384             Some(CompressionType::Lz4) => {
385                 #[cfg(not(feature = "lz4"))]
386                 return Err(Error::Custom("cannot create a producer with LZ4 compression because the 'lz4' cargo feature is not active".to_string()));
387             }
388             Some(CompressionType::Zlib) => {
389                 #[cfg(not(feature = "flate2"))]
390                 return Err(Error::Custom("cannot create a producer with zlib compression because the 'flate2' cargo feature is not active".to_string()));
391             }
392             Some(CompressionType::Zstd) => {
393                 #[cfg(not(feature = "zstd"))]
394                 return Err(Error::Custom("cannot create a producer with zstd compression because the 'zstd' cargo feature is not active".to_string()));
395             }
396             Some(CompressionType::Snappy) => {
397                 #[cfg(not(feature = "snap"))]
398                 return Err(Error::Custom("cannot create a producer with Snappy compression because the 'snap' cargo feature is not active".to_string()));
399             } //Some() => unimplemented!(),
400         };
401 
402         let success = connection
403             .sender()
404             .create_producer(topic.clone(), producer_id, name, options.clone())
405             .await?;
406 
407         // drop_signal will be dropped when the TopicProducer is dropped, then
408         // drop_receiver will return, and we can close the producer
409         let (_drop_signal, drop_receiver) = oneshot::channel::<()>();
410         let conn = connection.clone();
411         let _ = client.executor.spawn(Box::pin(async move {
412             let _res = drop_receiver.await;
413             let _ = conn.sender().close_producer(producer_id).await;
414         }));
415 
416         Ok(TopicProducer {
417             client,
418             connection,
419             id: producer_id,
420             name: success.producer_name,
421             topic,
422             message_id: sequence_ids,
423             batch: batch_size.map(Batch::new).map(Mutex::new),
424             compression,
425             _drop_signal,
426             options,
427         })
428     }
429 
    /// name of the topic this producer publishes on
    pub fn topic(&self) -> &str {
        &self.topic
    }
433 
    /// options this producer was created with
    pub fn options(&self) -> &ProducerOptions {
        &self.options
    }
437 
check_connection(&self) -> Result<(), Error>438     pub async fn check_connection(&self) -> Result<(), Error> {
439         self.connection.sender().send_ping().await?;
440         Ok(())
441     }
442 
send<T: SerializeMessage + Sized>( &mut self, message: T, ) -> Result<SendFuture, Error>443     pub async fn send<T: SerializeMessage + Sized>(
444         &mut self,
445         message: T,
446     ) -> Result<SendFuture, Error> {
447         match T::serialize_message(message) {
448             Ok(message) => self.send_raw(message.into()).await,
449             Err(e) => Err(e),
450         }
451     }
452 
send_batch(&mut self) -> Result<(), Error>453     pub async fn send_batch(&mut self) -> Result<(), Error> {
454         match self.batch.as_ref() {
455             None => Err(ProducerError::Custom("not a batching producer".to_string()).into()),
456             Some(batch) => {
457                 let mut payload: Vec<u8> = Vec::new();
458                 let mut receipts = Vec::new();
459                 let message_count;
460 
461                 {
462                     let batch = batch.lock().unwrap();
463                     let messages = batch.get_messages();
464                     message_count = messages.len();
465                     for (tx, message) in messages {
466                         receipts.push(tx);
467                         message.serialize(&mut payload);
468                     }
469                 }
470 
471                 if message_count == 0 {
472                     return Ok(());
473                 }
474 
475                 let message = ProducerMessage {
476                     payload,
477                     num_messages_in_batch: Some(message_count as i32),
478                     ..Default::default()
479                 };
480 
481                 trace!("sending a batched message of size {}", message_count);
482                 let send_receipt = self.send_compress(message).await.map_err(|e| Arc::new(e));
483                 for resolver in receipts {
484                     let _ = resolver.send(
485                         send_receipt
486                             .clone()
487                             .map_err(|e| ProducerError::Batch(e).into()),
488                     );
489                 }
490 
491                 Ok(())
492             }
493         }
494     }
495 
send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error>496     pub(crate) async fn send_raw(&mut self, message: ProducerMessage) -> Result<SendFuture, Error> {
497         let (tx, rx) = oneshot::channel();
498         match self.batch.as_ref() {
499             None => {
500                 let receipt = self.send_compress(message).await?;
501                 let _ = tx.send(Ok(receipt));
502                 Ok(SendFuture(rx))
503             }
504             Some(batch) => {
505                 let mut payload: Vec<u8> = Vec::new();
506                 let mut receipts = Vec::new();
507                 let mut counter = 0i32;
508 
509                 {
510                     let batch = batch.lock().unwrap();
511                     batch.push_back((tx, message));
512 
513                     if batch.is_full() {
514                         for (tx, message) in batch.get_messages() {
515                             receipts.push(tx);
516                             message.serialize(&mut payload);
517                             counter += 1;
518                         }
519                     }
520                 }
521 
522                 if counter > 0 {
523                     let message = ProducerMessage {
524                         payload,
525                         num_messages_in_batch: Some(counter),
526                         ..Default::default()
527                     };
528 
529                     let send_receipt = self.send_compress(message).await.map_err(|e| Arc::new(e));
530 
531                     trace!("sending a batched message of size {}", counter);
532                     for tx in receipts.drain(..) {
533                         let _ = tx.send(
534                             send_receipt
535                                 .clone()
536                                 .map_err(|e| ProducerError::Batch(e).into()),
537                         );
538                     }
539                 }
540 
541                 Ok(SendFuture(rx))
542             }
543         }
544     }
545 
send_compress( &mut self, mut message: ProducerMessage, ) -> Result<proto::CommandSendReceipt, Error>546     async fn send_compress(
547         &mut self,
548         mut message: ProducerMessage,
549     ) -> Result<proto::CommandSendReceipt, Error> {
550         let compressed_message = match self.compression {
551             None | Some(CompressionType::None) => message,
552             Some(CompressionType::Lz4) => {
553                 #[cfg(not(feature = "lz4"))]
554                 return unimplemented!();
555 
556                 #[cfg(feature = "lz4")]
557                 {
558                     let v: Vec<u8> = Vec::new();
559                     let mut encoder = lz4::EncoderBuilder::new()
560                         .build(v)
561                         .map_err(ProducerError::Io)?;
562                     encoder
563                         .write(&message.payload[..])
564                         .map_err(ProducerError::Io)?;
565                     let (compressed_payload, result) = encoder.finish();
566 
567                     result.map_err(ProducerError::Io)?;
568                     message.payload = compressed_payload;
569                     message.compression = Some(1);
570                     message
571                 }
572             }
573             Some(CompressionType::Zlib) => {
574                 #[cfg(not(feature = "flate2"))]
575                 return unimplemented!();
576 
577                 #[cfg(feature = "flate2")]
578                 {
579                     let mut e =
580                         flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
581                     e.write_all(&message.payload[..])
582                         .map_err(ProducerError::Io)?;
583                     let compressed_payload = e.finish().map_err(ProducerError::Io)?;
584 
585                     message.payload = compressed_payload;
586                     message.compression = Some(2);
587                     message
588                 }
589             }
590             Some(CompressionType::Zstd) => {
591                 #[cfg(not(feature = "zstd"))]
592                 return unimplemented!();
593 
594                 #[cfg(feature = "zstd")]
595                 {
596                     let compressed_payload =
597                         zstd::encode_all(&message.payload[..], 0).map_err(ProducerError::Io)?;
598                     message.compression = Some(3);
599                     message.payload = compressed_payload;
600                     message
601                 }
602             }
603             Some(CompressionType::Snappy) => {
604                 #[cfg(not(feature = "snap"))]
605                 return unimplemented!();
606 
607                 #[cfg(feature = "snap")]
608                 {
609                     let compressed_payload: Vec<u8> = Vec::new();
610                     let mut encoder = snap::write::FrameEncoder::new(compressed_payload);
611                     encoder
612                         .write(&message.payload[..])
613                         .map_err(ProducerError::Io)?;
614                     let compressed_payload = encoder
615                         .into_inner()
616                         //FIXME
617                         .map_err(|e| {
618                             std::io::Error::new(
619                                 std::io::ErrorKind::Other,
620                                 format!("Snappy compression error: {:?}", e),
621                             )
622                         })
623                         .map_err(ProducerError::Io)?;
624 
625                     message.payload = compressed_payload;
626                     message.compression = Some(4);
627                     message
628                 }
629             }
630         };
631 
632         self.send_inner(compressed_message).await
633     }
634 
send_inner( &mut self, message: ProducerMessage, ) -> Result<proto::CommandSendReceipt, Error>635     async fn send_inner(
636         &mut self,
637         message: ProducerMessage,
638     ) -> Result<proto::CommandSendReceipt, Error> {
639         let msg = message.clone();
640         match self
641             .connection
642             .sender()
643             .send(self.id, self.name.clone(), self.message_id.get(), message)
644             .await
645         {
646             Ok(receipt) => return Ok(receipt),
647             Err(ConnectionError::Disconnected) => {}
648             Err(e) => {
649                 error!("send_inner got error: {:?}", e);
650                 return Err(ProducerError::Connection(e).into());
651             }
652         };
653 
654         error!("send_inner disconnected");
655         self.reconnect().await?;
656 
657         match self
658             .connection
659             .sender()
660             .send(self.id, self.name.clone(), self.message_id.get(), msg)
661             .await
662         {
663             Ok(receipt) => Ok(receipt),
664             Err(e) => {
665                 error!("send_inner got error: {:?}", e);
666                 Err(ProducerError::Connection(e).into())
667             }
668         }
669     }
670 
reconnect(&mut self) -> Result<(), Error>671     async fn reconnect(&mut self) -> Result<(), Error> {
672         debug!("reconnecting producer for topic: {}", self.topic);
673         let broker_address = self.client.lookup_topic(&self.topic).await?;
674         let conn = self.client.manager.get_connection(&broker_address).await?;
675 
676         self.connection = conn;
677 
678         let topic = self.topic.clone();
679         let batch_size = self.options.batch_size;
680 
681         let _ = self
682             .connection
683             .sender()
684             .create_producer(
685                 topic.clone(),
686                 self.id,
687                 Some(self.name.clone()),
688                 self.options.clone(),
689             )
690             .await?;
691 
692         // drop_signal will be dropped when the TopicProducer is dropped, then
693         // drop_receiver will return, and we can close the producer
694         let (_drop_signal, drop_receiver) = oneshot::channel::<()>();
695         let batch = batch_size.map(Batch::new).map(Mutex::new);
696         let conn = self.connection.clone();
697         let producer_id = self.id;
698         let _ = self.client.executor.spawn(Box::pin(async move {
699             let _res = drop_receiver.await;
700             let _ = conn.sender().close_producer(producer_id).await;
701         }));
702 
703         self.batch = batch;
704         self._drop_signal = _drop_signal;
705 
706         Ok(())
707     }
708 }
709 
/// Helper structure to prepare a producer
///
/// generated from [Pulsar::producer]
#[derive(Clone)]
pub struct ProducerBuilder<Exe: Executor> {
    // client the producer(s) will be created from
    pulsar: Pulsar<Exe>,
    // topic to publish on; required by `build`, not used by `build_multi_topic`
    topic: Option<String>,
    // optional producer name
    name: Option<String>,
    // options for the producer; defaults are used when unset
    producer_options: Option<ProducerOptions>,
}
720 
721 impl<Exe: Executor> ProducerBuilder<Exe> {
new(pulsar: &Pulsar<Exe>) -> Self722     pub fn new(pulsar: &Pulsar<Exe>) -> Self {
723         ProducerBuilder {
724             pulsar: pulsar.clone(),
725             topic: None,
726             name: None,
727             producer_options: None,
728         }
729     }
730 
with_topic<S: Into<String>>(mut self, topic: S) -> Self731     pub fn with_topic<S: Into<String>>(mut self, topic: S) -> Self {
732         self.topic = Some(topic.into());
733         self
734     }
735 
with_name<S: Into<String>>(mut self, name: S) -> Self736     pub fn with_name<S: Into<String>>(mut self, name: S) -> Self {
737         self.name = Some(name.into());
738         self
739     }
740 
with_options(mut self, options: ProducerOptions) -> Self741     pub fn with_options(mut self, options: ProducerOptions) -> Self {
742         self.producer_options = Some(options);
743         self
744     }
745 
build(self) -> Result<Producer<Exe>, Error>746     pub async fn build(self) -> Result<Producer<Exe>, Error> {
747         let ProducerBuilder {
748             pulsar,
749             topic,
750             name,
751             producer_options,
752         } = self;
753         let topic = topic.ok_or(Error::Custom(format!("topic not set")))?;
754         let options = producer_options.unwrap_or_default();
755 
756         let producers: Vec<TopicProducer<Exe>> = try_join_all(
757             pulsar
758                 .lookup_partitioned_topic(&topic)
759                 .await?
760                 .into_iter()
761                 .map(|(topic, addr)| {
762                     let name = name.clone();
763                     let options = options.clone();
764                     let pulsar = pulsar.clone();
765                     async move {
766                         let conn = pulsar.manager.get_connection(&addr).await?;
767                         let producer =
768                             TopicProducer::from_connection(pulsar, conn, topic, name, options)
769                                 .await?;
770                         Ok::<_, Error>(producer)
771                     }
772                 }),
773         )
774             .await?;
775 
776         let producer = if producers.len() == 1 {
777             ProducerInner::Single(producers.into_iter().next().unwrap())
778         } else if producers.len() > 1 {
779             let mut producers = VecDeque::from(producers);
780             // write to topic-1 first
781             producers.rotate_right(1);
782             ProducerInner::Partitioned(PartitionedProducer {
783                 producers,
784                 topic,
785                 options,
786             })
787         } else {
788             return Err(Error::Custom(format!(
789                 "Unexpected error: Partition lookup returned no topics for {}",
790                 topic
791             )));
792         };
793         Ok(Producer { inner: producer })
794     }
795 
build_multi_topic(self) -> MultiTopicProducer<Exe>796     pub fn build_multi_topic(self) -> MultiTopicProducer<Exe> {
797         MultiTopicProducer {
798             client: self.pulsar,
799             producers: Default::default(),
800             options: self.producer_options.unwrap_or_default(),
801             name: self.name,
802         }
803     }
804 }
805 
/// queue of messages waiting to be sent together
///
/// each entry pairs a message with the sender used to report the result
/// of its send
struct Batch {
    /// number of queued messages from which the batch counts as full
    pub length: u32,
    // put it in a mutex because the design of Producer requires an immutable TopicProducer,
    // so we cannot have a mutable Batch in a send_raw(&mut self, ...)
    /// queued messages, in insertion order
    pub storage: Mutex<
        VecDeque<(
            oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
            BatchedMessage,
        )>,
    >,
}
817 
818 impl Batch {
new(length: u32) -> Batch819     pub fn new(length: u32) -> Batch {
820         Batch {
821             length,
822             storage: Mutex::new(VecDeque::with_capacity(length as usize)),
823         }
824     }
825 
is_full(&self) -> bool826     pub fn is_full(&self) -> bool {
827         self.storage.lock().unwrap().len() >= self.length as usize
828     }
829 
push_back( &self, msg: ( oneshot::Sender<Result<proto::CommandSendReceipt, Error>>, ProducerMessage, ), )830     pub fn push_back(
831         &self,
832         msg: (
833             oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
834             ProducerMessage,
835         ),
836     ) {
837         let (tx, message) = msg;
838 
839         let properties = message
840             .properties
841             .into_iter()
842             .map(|(key, value)| proto::KeyValue { key, value })
843             .collect();
844 
845         let batched = BatchedMessage {
846             metadata: proto::SingleMessageMetadata {
847                 properties,
848                 partition_key: message.partition_key,
849                 payload_size: message.payload.len() as i32,
850                 ..Default::default()
851             },
852             payload: message.payload,
853         };
854         self.storage.lock().unwrap().push_back((tx, batched))
855     }
856 
get_messages( &self, ) -> Vec<( oneshot::Sender<Result<proto::CommandSendReceipt, Error>>, BatchedMessage, )>857     pub fn get_messages(
858         &self,
859     ) -> Vec<(
860         oneshot::Sender<Result<proto::CommandSendReceipt, Error>>,
861         BatchedMessage,
862     )> {
863         self.storage.lock().unwrap().drain(..).collect()
864     }
865 }
866 
/// Helper structure to prepare a message
///
/// generated with [Producer::create_message]
///
/// properties and the partition key are accumulated here and applied to the
/// serialized content when the message is sent
pub struct MessageBuilder<'a, T, Exe: Executor> {
    /// producer the message will be sent with
    producer: &'a mut Producer<Exe>,
    /// properties that replace those of the serialized message on send
    properties: HashMap<String, String>,
    /// partition key that replaces the one of the serialized message on send
    partition_key: Option<String>,
    /// content to serialize; `()` in a builder created with `new`
    content: T,
}
876 
877 impl<'a, Exe: Executor> MessageBuilder<'a, (), Exe> {
new(producer: &'a mut Producer<Exe>) -> Self878     pub fn new(producer: &'a mut Producer<Exe>) -> Self {
879         MessageBuilder {
880             producer,
881             properties: HashMap::new(),
882             partition_key: None,
883             content: (),
884         }
885     }
886 }
887 
888 impl<'a, T, Exe: Executor> MessageBuilder<'a, T, Exe> {
with_content<C>(self, content: C) -> MessageBuilder<'a, C, Exe>889     pub fn with_content<C>(self, content: C) -> MessageBuilder<'a, C, Exe> {
890         MessageBuilder {
891             producer: self.producer,
892             properties: self.properties,
893             partition_key: self.partition_key,
894             content,
895         }
896     }
897 
with_partition_key<S: Into<String>>(mut self, partition_key: S) -> Self898     pub fn with_partition_key<S: Into<String>>(mut self, partition_key: S) -> Self {
899         self.partition_key = Some(partition_key.into());
900         self
901     }
902 
with_property<S1: Into<String>, S2: Into<String>>(mut self, key: S1, value: S2) -> Self903     pub fn with_property<S1: Into<String>, S2: Into<String>>(mut self, key: S1, value: S2) -> Self {
904         self.properties.insert(key.into(), value.into());
905         self
906     }
907 }
908 
909 impl<'a, T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'a, T, Exe> {
send(self) -> Result<SendFuture, Error>910     pub async fn send(self) -> Result<SendFuture, Error> {
911         let MessageBuilder {
912             producer,
913             properties,
914             partition_key,
915             content,
916         } = self;
917 
918         let mut message = T::serialize_message(content)?;
919         message.properties = properties;
920         message.partition_key = partition_key;
921         producer.send_raw(message.into()).await
922     }
923 }
924