1 //! Utility functions and types.
2
3 use std::ffi::CStr;
4 use std::fmt;
5 use std::future::Future;
6 use std::ops::Deref;
7 use std::os::raw::c_char;
8 use std::os::raw::c_void;
9 use std::ptr;
10 use std::ptr::NonNull;
11 use std::slice;
12 use std::sync::Arc;
13 use std::time::{Duration, SystemTime, UNIX_EPOCH};
14
15 use log::trace;
16
17 use rdkafka_sys as rdsys;
18
19 /// Returns a tuple representing the version of `librdkafka` in hexadecimal and
20 /// string format.
get_rdkafka_version() -> (u16, String)21 pub fn get_rdkafka_version() -> (u16, String) {
22 let version_number = unsafe { rdsys::rd_kafka_version() } as u16;
23 let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
24 (version_number, c_str.to_string_lossy().into_owned())
25 }
26
27 /// Specifies a timeout for a Kafka operation.
28 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
29 pub enum Timeout {
30 /// Time out after the specified duration elapses.
31 After(Duration),
32 /// Block forever.
33 Never,
34 }
35
36 impl Timeout {
37 /// Converts a timeout to Kafka's expected representation.
as_millis(&self) -> i3238 pub(crate) fn as_millis(&self) -> i32 {
39 match self {
40 Timeout::After(d) => d.as_millis() as i32,
41 Timeout::Never => -1,
42 }
43 }
44 }
45
46 impl std::ops::SubAssign for Timeout {
sub_assign(&mut self, other: Self)47 fn sub_assign(&mut self, other: Self) {
48 match (self, other) {
49 (Timeout::After(lhs), Timeout::After(rhs)) => *lhs -= rhs,
50 (Timeout::Never, Timeout::After(_)) => (),
51 _ => panic!("subtraction of Timeout::Never is ill-defined"),
52 }
53 }
54 }
55
56 impl From<Duration> for Timeout {
from(d: Duration) -> Timeout57 fn from(d: Duration) -> Timeout {
58 Timeout::After(d)
59 }
60 }
61
62 impl From<Option<Duration>> for Timeout {
from(v: Option<Duration>) -> Timeout63 fn from(v: Option<Duration>) -> Timeout {
64 match v {
65 None => Timeout::Never,
66 Some(d) => Timeout::After(d),
67 }
68 }
69 }
70
71 /// Converts the given time to the number of milliseconds since the Unix epoch.
millis_to_epoch(time: SystemTime) -> i6472 pub fn millis_to_epoch(time: SystemTime) -> i64 {
73 time.duration_since(UNIX_EPOCH)
74 .unwrap_or_else(|_| Duration::from_secs(0))
75 .as_millis() as i64
76 }
77
78 /// Returns the current time in milliseconds since the Unix epoch.
current_time_millis() -> i6479 pub fn current_time_millis() -> i64 {
80 millis_to_epoch(SystemTime::now())
81 }
82
83 /// Converts a pointer to an array to an optional slice. If the pointer is null,
84 /// returns `None`.
ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]>85 pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
86 if ptr.is_null() {
87 None
88 } else {
89 Some(slice::from_raw_parts::<T>(ptr as *const T, size))
90 }
91 }
92
93 /// Converts a pointer to an array to a slice. If the pointer is null or the
94 /// size is zero, returns a zero-length slice..
ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T]95 pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
96 if ptr.is_null() || size == 0 {
97 &[][..]
98 } else {
99 slice::from_raw_parts::<T>(ptr as *const T, size)
100 }
101 }
102
103 /// Converts Rust data to and from raw pointers.
104 ///
105 /// This conversion is used to pass opaque objects to the C library and vice
106 /// versa.
107 pub trait IntoOpaque: Send + Sync {
108 /// Converts the object into a raw pointer.
as_ptr(&self) -> *mut c_void109 fn as_ptr(&self) -> *mut c_void;
110
111 /// Converts the raw pointer back to the original Rust object.
from_ptr(_: *mut c_void) -> Self112 unsafe fn from_ptr(_: *mut c_void) -> Self;
113 }
114
115 impl IntoOpaque for () {
as_ptr(&self) -> *mut c_void116 fn as_ptr(&self) -> *mut c_void {
117 ptr::null_mut()
118 }
119
from_ptr(_: *mut c_void) -> Self120 unsafe fn from_ptr(_: *mut c_void) -> Self {}
121 }
122
123 impl IntoOpaque for usize {
as_ptr(&self) -> *mut c_void124 fn as_ptr(&self) -> *mut c_void {
125 *self as *mut usize as *mut c_void
126 }
127
from_ptr(ptr: *mut c_void) -> Self128 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
129 ptr as usize
130 }
131 }
132
133 impl<T: Send + Sync> IntoOpaque for Box<T> {
as_ptr(&self) -> *mut c_void134 fn as_ptr(&self) -> *mut c_void {
135 self.as_ref() as *const T as *mut c_void
136 }
137
from_ptr(ptr: *mut c_void) -> Self138 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
139 Box::from_raw(ptr as *mut T)
140 }
141 }
142
143 impl<T: Send + Sync> IntoOpaque for Arc<T> {
as_ptr(&self) -> *mut c_void144 fn as_ptr(&self) -> *mut c_void {
145 self.as_ref() as *const T as *mut c_void
146 }
147
from_ptr(ptr: *mut c_void) -> Self148 unsafe fn from_ptr(ptr: *mut c_void) -> Self {
149 Arc::from_raw(ptr as *mut T)
150 }
151 }
152
153 // TODO: check if the implementation returns a copy of the data and update the documentation
154 /// Converts a byte array representing a C string into a [`String`].
bytes_cstr_to_owned(bytes_cstr: &[c_char]) -> String155 pub unsafe fn bytes_cstr_to_owned(bytes_cstr: &[c_char]) -> String {
156 CStr::from_ptr(bytes_cstr.as_ptr() as *const c_char)
157 .to_string_lossy()
158 .into_owned()
159 }
160
161 /// Converts a C string into a [`String`].
cstr_to_owned(cstr: *const c_char) -> String162 pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
163 CStr::from_ptr(cstr as *const c_char)
164 .to_string_lossy()
165 .into_owned()
166 }
167
168 pub(crate) struct ErrBuf {
169 buf: [c_char; ErrBuf::MAX_ERR_LEN],
170 }
171
172 impl ErrBuf {
173 const MAX_ERR_LEN: usize = 512;
174
new() -> ErrBuf175 pub fn new() -> ErrBuf {
176 ErrBuf {
177 buf: [0; ErrBuf::MAX_ERR_LEN],
178 }
179 }
180
as_mut_ptr(&mut self) -> *mut c_char181 pub fn as_mut_ptr(&mut self) -> *mut c_char {
182 self.buf.as_mut_ptr()
183 }
184
len(&self) -> usize185 pub fn len(&self) -> usize {
186 self.buf.len()
187 }
188
to_string(&self) -> String189 pub fn to_string(&self) -> String {
190 unsafe { bytes_cstr_to_owned(&self.buf) }
191 }
192 }
193
194 impl Default for ErrBuf {
default() -> ErrBuf195 fn default() -> ErrBuf {
196 ErrBuf::new()
197 }
198 }
199
200 pub(crate) trait WrappedCPointer {
201 type Target;
202
ptr(&self) -> *mut Self::Target203 fn ptr(&self) -> *mut Self::Target;
204
is_null(&self) -> bool205 fn is_null(&self) -> bool {
206 self.ptr().is_null()
207 }
208 }
209
210 /// Converts a container into a C array.
211 pub(crate) trait AsCArray<T: WrappedCPointer> {
as_c_array(&self) -> *mut *mut T::Target212 fn as_c_array(&self) -> *mut *mut T::Target;
213 }
214
215 impl<T: WrappedCPointer> AsCArray<T> for Vec<T> {
as_c_array(&self) -> *mut *mut T::Target216 fn as_c_array(&self) -> *mut *mut T::Target {
217 self.as_ptr() as *mut *mut T::Target
218 }
219 }
220
221 pub(crate) struct NativePtr<T>
222 where
223 T: KafkaDrop,
224 {
225 ptr: NonNull<T>,
226 }
227
228 impl<T> Drop for NativePtr<T>
229 where
230 T: KafkaDrop,
231 {
drop(&mut self)232 fn drop(&mut self) {
233 trace!("Destroying {}: {:?}", T::TYPE, self.ptr);
234 unsafe { T::DROP(self.ptr.as_ptr()) }
235 trace!("Destroyed {}: {:?}", T::TYPE, self.ptr);
236 }
237 }
238
239 pub(crate) unsafe trait KafkaDrop {
240 const TYPE: &'static str;
241 const DROP: unsafe extern "C" fn(*mut Self);
242 }
243
244 impl<T> WrappedCPointer for NativePtr<T>
245 where
246 T: KafkaDrop,
247 {
248 type Target = T;
249
ptr(&self) -> *mut T250 fn ptr(&self) -> *mut T {
251 self.ptr.as_ptr()
252 }
253 }
254
255 impl<T> Deref for NativePtr<T>
256 where
257 T: KafkaDrop,
258 {
259 type Target = T;
deref(&self) -> &Self::Target260 fn deref(&self) -> &Self::Target {
261 unsafe { self.ptr.as_ref() }
262 }
263 }
264
265 impl<T> fmt::Debug for NativePtr<T>
266 where
267 T: KafkaDrop,
268 {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result269 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
270 self.ptr.fmt(f)
271 }
272 }
273
274 impl<T> NativePtr<T>
275 where
276 T: KafkaDrop,
277 {
from_ptr(ptr: *mut T) -> Option<Self>278 pub(crate) unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
279 NonNull::new(ptr).map(|ptr| Self { ptr })
280 }
281
ptr(&self) -> *mut T282 pub(crate) fn ptr(&self) -> *mut T {
283 self.ptr.as_ptr()
284 }
285 }
286
287 pub(crate) struct OnDrop<F>(pub F)
288 where
289 F: Fn();
290
291 impl<F> Drop for OnDrop<F>
292 where
293 F: Fn(),
294 {
drop(&mut self)295 fn drop(&mut self) {
296 (self.0)()
297 }
298 }
299
300 /// An abstraction over asynchronous runtimes.
301 ///
302 /// There are several asynchronous runtimes available for Rust. By default
303 /// rust-rdkafka uses Tokio, via the [`TokioRuntime`], but it has pluggable
304 /// support for any runtime that can satisfy this trait.
305 ///
306 /// For an example of using the [smol] runtime with rust-rdkafka, see the
307 /// [smol_runtime] example.
308 ///
309 /// [smol]: https://docs.rs/smol
310 /// [smol_runtime]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/smol_runtime.rs
311 pub trait AsyncRuntime {
312 /// The type of the future returned by
313 /// [`delay_for`](AsyncRuntime::delay_for).
314 type Delay: Future<Output = ()> + Send + Unpin;
315
316 /// Spawns an asynchronous task.
317 ///
318 /// The task should be be polled to completion, unless the runtime exits
319 /// first. With some runtimes this requires an explicit "detach" step.
spawn<T>(task: T) where T: Future<Output = ()> + Send + 'static320 fn spawn<T>(task: T)
321 where
322 T: Future<Output = ()> + Send + 'static;
323
324 /// Constructs a future that will resolve after `duration` has elapsed.
delay_for(duration: Duration) -> Self::Delay325 fn delay_for(duration: Duration) -> Self::Delay;
326 }
327
328 /// An [`AsyncRuntime`] implementation backed by [Tokio](tokio).
329 ///
330 /// This runtime is used by default throughout the crate, unless the `tokio`
331 /// feature is disabled.
332 #[cfg(feature = "tokio")]
333 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
334 pub struct TokioRuntime;
335
336 #[cfg(feature = "tokio")]
337 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
338 impl AsyncRuntime for TokioRuntime {
339 type Delay = tokio::time::Delay;
340
spawn<T>(task: T) where T: Future<Output = ()> + Send + 'static,341 fn spawn<T>(task: T)
342 where
343 T: Future<Output = ()> + Send + 'static,
344 {
345 tokio::spawn(task);
346 }
347
delay_for(duration: Duration) -> Self::Delay348 fn delay_for(duration: Duration) -> Self::Delay {
349 tokio::time::delay_for(duration)
350 }
351 }
352