1 //! Error manipulations.
2 
3 use std::{error, ffi, fmt};
4 
5 use rdkafka_sys as rdsys;
6 use rdkafka_sys::types::*;
7 
8 // Re-export rdkafka error
9 pub use rdsys::types::RDKafkaError;
10 
11 /// Kafka result.
12 pub type KafkaResult<T> = Result<T, KafkaError>;
13 
/// Classifies a status value as either a failure or a non-failure.
///
/// Not every code reported by librdkafka denotes a real failure; some are
/// purely informational. Implementors decide which values count as errors.
pub trait IsError {
    /// Consumes the value and returns `true` when it denotes an error
    /// condition, `false` otherwise.
    fn is_error(self) -> bool;
}
21 
22 impl IsError for RDKafkaRespErr {
is_error(self) -> bool23     fn is_error(self) -> bool {
24         self as i32 != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR as i32
25     }
26 }
27 
28 impl IsError for RDKafkaConfRes {
is_error(self) -> bool29     fn is_error(self) -> bool {
30         self as i32 != RDKafkaConfRes::RD_KAFKA_CONF_OK as i32
31     }
32 }
33 
34 // TODO: consider using macro
35 
36 /// Represents all possible Kafka errors.
37 ///
38 /// If applicable, check the underlying [`RDKafkaError`] to get details.
39 #[derive(Clone, PartialEq, Eq)]
40 pub enum KafkaError {
41     /// Creation of admin operation failed.
42     AdminOpCreation(String),
43     /// The admin operation itself failed.
44     AdminOp(RDKafkaError),
45     /// The client was dropped before the operation completed.
46     Canceled,
47     /// Invalid client configuration.
48     ClientConfig(RDKafkaConfRes, String, String, String),
49     /// Client creation failed.
50     ClientCreation(String),
51     /// Consumer commit failed.
52     ConsumerCommit(RDKafkaError),
53     /// Global error.
54     Global(RDKafkaError),
55     /// Group list fetch failed.
56     GroupListFetch(RDKafkaError),
57     /// Message consumption failed.
58     MessageConsumption(RDKafkaError),
59     /// Message production error.
60     MessageProduction(RDKafkaError),
61     /// Metadata fetch error.
62     MetadataFetch(RDKafkaError),
63     /// No message was received.
64     NoMessageReceived,
65     /// Unexpected null pointer
66     Nul(ffi::NulError),
67     /// Offset fetch failed.
68     OffsetFetch(RDKafkaError),
69     /// End of partition reached.
70     PartitionEOF(i32),
71     /// Pause/Resume failed.
72     PauseResume(String),
73     /// Seeking a partition failed.
74     Seek(String),
75     /// Setting partition offset failed.
76     SetPartitionOffset(RDKafkaError),
77     /// Offset store failed.
78     StoreOffset(RDKafkaError),
79     /// Subscription creation failed.
80     Subscription(String),
81 }
82 
83 impl fmt::Debug for KafkaError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result84     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85         match *self {
86             KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err),
87             KafkaError::AdminOpCreation(ref err) => {
88                 write!(f, "KafkaError (Admin operation creation error: {})", err)
89             }
90             KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
91             KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(
92                 f,
93                 "KafkaError (Client config error: {} {} {})",
94                 desc, key, value
95             ),
96             KafkaError::ClientCreation(ref err) => {
97                 write!(f, "KafkaError (Client creation error: {})", err)
98             }
99             KafkaError::ConsumerCommit(err) => {
100                 write!(f, "KafkaError (Consumer commit error: {})", err)
101             }
102             KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
103             KafkaError::GroupListFetch(err) => {
104                 write!(f, "KafkaError (Group list fetch error: {})", err)
105             }
106             KafkaError::MessageConsumption(err) => {
107                 write!(f, "KafkaError (Message consumption error: {})", err)
108             }
109             KafkaError::MessageProduction(err) => {
110                 write!(f, "KafkaError (Message production error: {})", err)
111             }
112             KafkaError::MetadataFetch(err) => {
113                 write!(f, "KafkaError (Metadata fetch error: {})", err)
114             }
115             KafkaError::NoMessageReceived => {
116                 write!(f, "No message received within the given poll interval")
117             }
118             KafkaError::Nul(_) => write!(f, "FFI null error"),
119             KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
120             KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
121             KafkaError::PauseResume(ref err) => {
122                 write!(f, "KafkaError (Pause/resume error: {})", err)
123             }
124             KafkaError::Seek(ref err) => write!(f, "KafkaError (Seek error: {})", err),
125             KafkaError::SetPartitionOffset(err) => {
126                 write!(f, "KafkaError (Set partition offset error: {})", err)
127             }
128             KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err),
129             KafkaError::Subscription(ref err) => {
130                 write!(f, "KafkaError (Subscription error: {})", err)
131             }
132         }
133     }
134 }
135 
136 impl fmt::Display for KafkaError {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result137     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138         match *self {
139             KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err),
140             KafkaError::AdminOpCreation(ref err) => {
141                 write!(f, "Admin operation creation error: {}", err)
142             }
143             KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
144             KafkaError::ClientConfig(_, ref desc, ref key, ref value) => {
145                 write!(f, "Client config error: {} {} {}", desc, key, value)
146             }
147             KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
148             KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
149             KafkaError::Global(err) => write!(f, "Global error: {}", err),
150             KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
151             KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
152             KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
153             KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
154             KafkaError::NoMessageReceived => {
155                 write!(f, "No message received within the given poll interval")
156             }
157             KafkaError::Nul(_) => write!(f, "FFI nul error"),
158             KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
159             KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
160             KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
161             KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err),
162             KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
163             KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
164             KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
165         }
166     }
167 }
168 
169 impl error::Error for KafkaError {
description(&self) -> &str170     fn description(&self) -> &str {
171         match *self {
172             KafkaError::AdminOp(_) => "Admin operation error",
173             KafkaError::AdminOpCreation(_) => "Admin operation creation error",
174             KafkaError::Canceled => "Client dropped",
175             KafkaError::ClientConfig(_, _, _, _) => "Client config error",
176             KafkaError::ClientCreation(_) => "Client creation error",
177             KafkaError::ConsumerCommit(_) => "Consumer commit error",
178             KafkaError::Global(_) => "Global error",
179             KafkaError::GroupListFetch(_) => "Group list fetch error",
180             KafkaError::MessageConsumption(_) => "Message consumption error",
181             KafkaError::MessageProduction(_) => "Message production error",
182             KafkaError::MetadataFetch(_) => "Meta data fetch error",
183             KafkaError::NoMessageReceived => "No message received within the given poll interval",
184             KafkaError::Nul(_) => "FFI nul error",
185             KafkaError::OffsetFetch(_) => "Offset fetch error",
186             KafkaError::PartitionEOF(_) => "Partition EOF error",
187             KafkaError::PauseResume(_) => "Pause/resume error",
188             KafkaError::Seek(_) => "Seek error",
189             KafkaError::SetPartitionOffset(_) => "Set partition offset error",
190             KafkaError::StoreOffset(_) => "Store offset error",
191             KafkaError::Subscription(_) => "Subscription error",
192         }
193     }
194 
195     #[allow(clippy::match_same_arms)]
cause(&self) -> Option<&dyn error::Error>196     fn cause(&self) -> Option<&dyn error::Error> {
197         match *self {
198             KafkaError::AdminOp(_) => None,
199             KafkaError::AdminOpCreation(_) => None,
200             KafkaError::Canceled => None,
201             KafkaError::ClientConfig(_, _, _, _) => None,
202             KafkaError::ClientCreation(_) => None,
203             KafkaError::ConsumerCommit(ref err) => Some(err),
204             KafkaError::Global(ref err) => Some(err),
205             KafkaError::GroupListFetch(ref err) => Some(err),
206             KafkaError::MessageConsumption(ref err) => Some(err),
207             KafkaError::MessageProduction(ref err) => Some(err),
208             KafkaError::MetadataFetch(ref err) => Some(err),
209             KafkaError::NoMessageReceived => None,
210             KafkaError::Nul(_) => None,
211             KafkaError::OffsetFetch(ref err) => Some(err),
212             KafkaError::PartitionEOF(_) => None,
213             KafkaError::PauseResume(_) => None,
214             KafkaError::Seek(_) => None,
215             KafkaError::SetPartitionOffset(ref err) => Some(err),
216             KafkaError::StoreOffset(ref err) => Some(err),
217             KafkaError::Subscription(_) => None,
218         }
219     }
220 }
221 
222 impl From<ffi::NulError> for KafkaError {
from(err: ffi::NulError) -> KafkaError223     fn from(err: ffi::NulError) -> KafkaError {
224         KafkaError::Nul(err)
225     }
226 }
227