1 //! Error manipulations. 2 3 use std::{error, ffi, fmt}; 4 5 use rdkafka_sys as rdsys; 6 use rdkafka_sys::types::*; 7 8 // Re-export rdkafka error 9 pub use rdsys::types::RDKafkaError; 10 11 /// Kafka result. 12 pub type KafkaResult<T> = Result<T, KafkaError>; 13 14 /// Verify if the value represents an error condition. 15 /// 16 /// Some librdkafka codes are informational, rather than true errors. 17 pub trait IsError { 18 /// Reports whether the value represents an error. is_error(self) -> bool19 fn is_error(self) -> bool; 20 } 21 22 impl IsError for RDKafkaRespErr { is_error(self) -> bool23 fn is_error(self) -> bool { 24 self as i32 != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR as i32 25 } 26 } 27 28 impl IsError for RDKafkaConfRes { is_error(self) -> bool29 fn is_error(self) -> bool { 30 self as i32 != RDKafkaConfRes::RD_KAFKA_CONF_OK as i32 31 } 32 } 33 34 // TODO: consider using macro 35 36 /// Represents all possible Kafka errors. 37 /// 38 /// If applicable, check the underlying [`RDKafkaError`] to get details. 39 #[derive(Clone, PartialEq, Eq)] 40 pub enum KafkaError { 41 /// Creation of admin operation failed. 42 AdminOpCreation(String), 43 /// The admin operation itself failed. 44 AdminOp(RDKafkaError), 45 /// The client was dropped before the operation completed. 46 Canceled, 47 /// Invalid client configuration. 48 ClientConfig(RDKafkaConfRes, String, String, String), 49 /// Client creation failed. 50 ClientCreation(String), 51 /// Consumer commit failed. 52 ConsumerCommit(RDKafkaError), 53 /// Global error. 54 Global(RDKafkaError), 55 /// Group list fetch failed. 56 GroupListFetch(RDKafkaError), 57 /// Message consumption failed. 58 MessageConsumption(RDKafkaError), 59 /// Message production error. 60 MessageProduction(RDKafkaError), 61 /// Metadata fetch error. 62 MetadataFetch(RDKafkaError), 63 /// No message was received. 64 NoMessageReceived, 65 /// Unexpected null pointer 66 Nul(ffi::NulError), 67 /// Offset fetch failed. 68 OffsetFetch(RDKafkaError), 69 /// End of partition reached. 70 PartitionEOF(i32), 71 /// Pause/Resume failed. 72 PauseResume(String), 73 /// Seeking a partition failed. 74 Seek(String), 75 /// Setting partition offset failed. 76 SetPartitionOffset(RDKafkaError), 77 /// Offset store failed. 78 StoreOffset(RDKafkaError), 79 /// Subscription creation failed. 80 Subscription(String), 81 } 82 83 impl fmt::Debug for KafkaError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result84 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 85 match *self { 86 KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err), 87 KafkaError::AdminOpCreation(ref err) => { 88 write!(f, "KafkaError (Admin operation creation error: {})", err) 89 } 90 KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"), 91 KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!( 92 f, 93 "KafkaError (Client config error: {} {} {})", 94 desc, key, value 95 ), 96 KafkaError::ClientCreation(ref err) => { 97 write!(f, "KafkaError (Client creation error: {})", err) 98 } 99 KafkaError::ConsumerCommit(err) => { 100 write!(f, "KafkaError (Consumer commit error: {})", err) 101 } 102 KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err), 103 KafkaError::GroupListFetch(err) => { 104 write!(f, "KafkaError (Group list fetch error: {})", err) 105 } 106 KafkaError::MessageConsumption(err) => { 107 write!(f, "KafkaError (Message consumption error: {})", err) 108 } 109 KafkaError::MessageProduction(err) => { 110 write!(f, "KafkaError (Message production error: {})", err) 111 } 112 KafkaError::MetadataFetch(err) => { 113 write!(f, "KafkaError (Metadata fetch error: {})", err) 114 } 115 KafkaError::NoMessageReceived => { 116 write!(f, "No message received within the given poll interval") 117 } 118 KafkaError::Nul(_) => write!(f, "FFI null error"), 119 KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err), 120 KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n), 121 KafkaError::PauseResume(ref err) => { 122 write!(f, "KafkaError (Pause/resume error: {})", err) 123 } 124 KafkaError::Seek(ref err) => write!(f, "KafkaError (Seek error: {})", err), 125 KafkaError::SetPartitionOffset(err) => { 126 write!(f, "KafkaError (Set partition offset error: {})", err) 127 } 128 KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err), 129 KafkaError::Subscription(ref err) => { 130 write!(f, "KafkaError (Subscription error: {})", err) 131 } 132 } 133 } 134 } 135 136 impl fmt::Display for KafkaError { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result137 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 138 match *self { 139 KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err), 140 KafkaError::AdminOpCreation(ref err) => { 141 write!(f, "Admin operation creation error: {}", err) 142 } 143 KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"), 144 KafkaError::ClientConfig(_, ref desc, ref key, ref value) => { 145 write!(f, "Client config error: {} {} {}", desc, key, value) 146 } 147 KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err), 148 KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err), 149 KafkaError::Global(err) => write!(f, "Global error: {}", err), 150 KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err), 151 KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err), 152 KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err), 153 KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err), 154 KafkaError::NoMessageReceived => { 155 write!(f, "No message received within the given poll interval") 156 } 157 KafkaError::Nul(_) => write!(f, "FFI nul error"), 158 KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err), 159 KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n), 160 KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err), 161 KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err), 162 KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err), 163 KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err), 164 KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err), 165 } 166 } 167 } 168 169 impl error::Error for KafkaError { description(&self) -> &str170 fn description(&self) -> &str { 171 match *self { 172 KafkaError::AdminOp(_) => "Admin operation error", 173 KafkaError::AdminOpCreation(_) => "Admin operation creation error", 174 KafkaError::Canceled => "Client dropped", 175 KafkaError::ClientConfig(_, _, _, _) => "Client config error", 176 KafkaError::ClientCreation(_) => "Client creation error", 177 KafkaError::ConsumerCommit(_) => "Consumer commit error", 178 KafkaError::Global(_) => "Global error", 179 KafkaError::GroupListFetch(_) => "Group list fetch error", 180 KafkaError::MessageConsumption(_) => "Message consumption error", 181 KafkaError::MessageProduction(_) => "Message production error", 182 KafkaError::MetadataFetch(_) => "Meta data fetch error", 183 KafkaError::NoMessageReceived => "No message received within the given poll interval", 184 KafkaError::Nul(_) => "FFI nul error", 185 KafkaError::OffsetFetch(_) => "Offset fetch error", 186 KafkaError::PartitionEOF(_) => "Partition EOF error", 187 KafkaError::PauseResume(_) => "Pause/resume error", 188 KafkaError::Seek(_) => "Seek error", 189 KafkaError::SetPartitionOffset(_) => "Set partition offset error", 190 KafkaError::StoreOffset(_) => "Store offset error", 191 KafkaError::Subscription(_) => "Subscription error", 192 } 193 } 194 195 #[allow(clippy::match_same_arms)] cause(&self) -> Option<&dyn error::Error>196 fn cause(&self) -> Option<&dyn error::Error> { 197 match *self { 198 KafkaError::AdminOp(_) => None, 199 KafkaError::AdminOpCreation(_) => None, 200 KafkaError::Canceled => None, 201 KafkaError::ClientConfig(_, _, _, _) => None, 202 KafkaError::ClientCreation(_) => None, 203 KafkaError::ConsumerCommit(ref err) => Some(err), 204 KafkaError::Global(ref err) => Some(err), 205 KafkaError::GroupListFetch(ref err) => Some(err), 206 KafkaError::MessageConsumption(ref err) => Some(err), 207 KafkaError::MessageProduction(ref err) => Some(err), 208 KafkaError::MetadataFetch(ref err) => Some(err), 209 KafkaError::NoMessageReceived => None, 210 KafkaError::Nul(_) => None, 211 KafkaError::OffsetFetch(ref err) => Some(err), 212 KafkaError::PartitionEOF(_) => None, 213 KafkaError::PauseResume(_) => None, 214 KafkaError::Seek(_) => None, 215 KafkaError::SetPartitionOffset(ref err) => Some(err), 216 KafkaError::StoreOffset(ref err) => Some(err), 217 KafkaError::Subscription(_) => None, 218 } 219 } 220 } 221 222 impl From<ffi::NulError> for KafkaError { from(err: ffi::NulError) -> KafkaError223 fn from(err: ffi::NulError) -> KafkaError { 224 KafkaError::Nul(err) 225 } 226 } 227