Skip to content

Commit

Permalink
Add an ErrBuf helper to manage APIs with error strings
Browse files Browse the repository at this point in the history
  • Loading branch information
benesch committed Aug 6, 2019
1 parent 6a3da62 commit 70f6125
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 14 deletions.
9 changes: 4 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::metadata::Metadata;
use crate::statistics::Statistics;
use crate::util::{bytes_cstr_to_owned, timeout_to_ms};
use crate::util::{ErrBuf, timeout_to_ms};

/// Client-level context
///
Expand Down Expand Up @@ -116,21 +116,20 @@ impl<C: ClientContext> Client<C> {
pub fn new(config: &ClientConfig, native_config: NativeClientConfig, rd_kafka_type: RDKafkaType,
context: C)
-> KafkaResult<Client<C>> {
let errstr = [0i8; 1024];
let mut err_buf = ErrBuf::new();
let mut boxed_context = Box::new(context);
unsafe { rdsys::rd_kafka_conf_set_opaque(native_config.ptr(), (&mut *boxed_context) as *mut C as *mut c_void) };
unsafe { rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>)) };
unsafe { rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>)) };
unsafe { rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::<C>)) };

let client_ptr = unsafe {
rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), errstr.as_ptr() as *mut c_char, errstr.len())
rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), err_buf.as_mut_ptr(), err_buf.len())
};
trace!("Create new librdkafka client {:p}", client_ptr);

if client_ptr.is_null() {
let descr = unsafe { bytes_cstr_to_owned(&errstr) };
return Err(KafkaError::ClientCreation(descr));
return Err(KafkaError::ClientCreation(err_buf.to_string()));
}

unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };
Expand Down
14 changes: 5 additions & 9 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ use crate::rdsys;

use crate::client::ClientContext;
use crate::error::{KafkaError, KafkaResult, IsError};
use crate::util::bytes_cstr_to_owned;
use crate::util::ErrBuf;

use std::collections::HashMap;
use std::ffi::CString;
use std::mem;
use std::os::raw::c_char;

const ERR_LEN: usize = 256;


/// The log levels supported by librdkafka.
Expand Down Expand Up @@ -146,17 +143,16 @@ impl ClientConfig {
/// Returns the native rdkafka-sys configuration.
pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
let conf = unsafe { rdsys::rd_kafka_conf_new() };
let errstr = [0; ERR_LEN];
let mut err_buf = ErrBuf::new();
for (key, value) in &self.conf_map {
let key_c = CString::new(key.to_string())?;
let value_c = CString::new(value.to_string())?;
let ret = unsafe {
rdsys::rd_kafka_conf_set(conf, key_c.as_ptr(), value_c.as_ptr(),
errstr.as_ptr() as *mut c_char, errstr.len())
err_buf.as_mut_ptr(), err_buf.len())
};
if ret.is_error() {
let descr = unsafe { bytes_cstr_to_owned(&errstr) };
return Err(KafkaError::ClientConfig(ret, descr, key.to_string(), value.to_string()));
if ret.is_error() {;
return Err(KafkaError::ClientConfig(ret, err_buf.to_string(), key.to_string(), value.to_string()));
}
}
Ok(unsafe {NativeClientConfig::from_ptr(conf)})
Expand Down
30 changes: 30 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,36 @@ pub unsafe fn cstr_to_owned(cstr: *const i8) -> String {
CStr::from_ptr(cstr as *const c_char).to_string_lossy().into_owned()
}

pub(crate) struct ErrBuf {
buf: [c_char; ErrBuf::MAX_ERR_LEN]
}

impl ErrBuf {
const MAX_ERR_LEN: usize = 512;

pub fn new() -> ErrBuf {
ErrBuf { buf: [0; ErrBuf::MAX_ERR_LEN] }
}

pub fn as_mut_ptr(&mut self) -> *mut i8 {
self.buf.as_mut_ptr()
}

pub fn len(&self) -> usize {
self.buf.len()
}

pub fn to_string(&self) -> String {
unsafe { bytes_cstr_to_owned(&self.buf) }
}
}

impl Default for ErrBuf {
fn default() -> ErrBuf {
ErrBuf::new()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 70f6125

Please sign in to comment.