Skip to content

Commit

Permalink
Support asynchronous log callback
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Jan 16, 2025
1 parent e5a881f commit 1d99d59
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 109 deletions.
53 changes: 35 additions & 18 deletions bindings/dart/lib/bindings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,62 +6,79 @@ import 'package:path/path.dart';

export 'bindings.g.dart';

final class LogMessage extends Struct {
@Uint8()
external int level;

external Pointer<Uint8> ptr;

@Uint64()
external int len;

@Uint64()
external int cap;
}

/// Callback for `service_start` and `service_stop`.
typedef StatusCallback = Void Function(Pointer<Void>, Uint16);

/// Callback for `log_init`.
typedef LogCallback = Void Function(Uint8, Pointer<Char>);
typedef LogCallback = Void Function(LogMessage);

///
typedef ServiceStart = Pointer<Void> Function(
typedef StartService = Pointer<Void> Function(
Pointer<Char>,
Pointer<Char>,
Pointer<NativeFunction<StatusCallback>>,
Pointer<Void>,
);

typedef _ServiceStartC = Pointer<Void> Function(
typedef _StartServiceC = Pointer<Void> Function(
Pointer<Char>,
Pointer<Char>,
Pointer<NativeFunction<StatusCallback>>,
Pointer<Void>,
);

typedef ServiceStop = void Function(
typedef StopService = void Function(
Pointer<Void>, Pointer<NativeFunction<StatusCallback>>, Pointer<Void>);

typedef _ServiceStopC = Void Function(
typedef _StopServiceC = Void Function(
Pointer<Void>, Pointer<NativeFunction<StatusCallback>>, Pointer<Void>);

typedef LogInit = int Function(
typedef InitLog = int Function(
Pointer<Char>,
Pointer<NativeFunction<LogCallback>>,
Pointer<Char>,
);

typedef _LogInitC = Uint16 Function(
typedef _InitLogC = Uint16 Function(
Pointer<Char>,
Pointer<NativeFunction<LogCallback>>,
Pointer<Char>,
);

typedef ReleaseLogMessage = void Function(LogMessage);
typedef _ReleaseLogMessageC = Void Function(LogMessage);

class Bindings {
Bindings(DynamicLibrary library)
: serviceStart = library
.lookup<NativeFunction<_ServiceStartC>>('service_start')
.asFunction(),
serviceStop = library
.lookup<NativeFunction<_ServiceStopC>>('service_stop')
.asFunction(),
logInit =
library.lookup<NativeFunction<_LogInitC>>('log_init').asFunction();
: startService = library
.lookupFunction<_StartServiceC, StartService>('start_service'),
stopService =
library.lookupFunction<_StopServiceC, StopService>('stop_service'),
initLog = library.lookupFunction<_InitLogC, InitLog>('init_log'),
releaseLogMessage =
library.lookupFunction<_ReleaseLogMessageC, ReleaseLogMessage>(
'release_log_message');

/// Bidings instance that uses the default library.
static Bindings instance = Bindings(_defaultLib());

final ServiceStart serviceStart;
final ServiceStop serviceStop;
final LogInit logInit;
final StartService startService;
final StopService stopService;
final InitLog initLog;
final ReleaseLogMessage releaseLogMessage;
}

DynamicLibrary _defaultLib() {
Expand Down
6 changes: 4 additions & 2 deletions bindings/dart/lib/ouisync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export 'bindings.dart'
PeerSource,
PeerStateKind;
export 'exception.dart';
export 'server.dart' show logInit;
export 'server.dart' show initLog;

part 'local_secret.dart';

Expand Down Expand Up @@ -56,7 +56,8 @@ class Session {
// that one instead. If we do spawn, we are responsible for logging
if (startServer) {
try {
logInit(callback: logger, tag: 'Server');
initLog(callback: logger, tag: 'Server');

server = await Server.start(
configPath: configPath,
debugLabel: debugLabel,
Expand All @@ -67,6 +68,7 @@ class Session {
}

final client = await Client.connect(configPath: configPath);

return Session._(client, server);
}

Expand Down
17 changes: 10 additions & 7 deletions bindings/dart/lib/server.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:ffi';

import 'package:ffi/ffi.dart';
Expand Down Expand Up @@ -29,7 +30,7 @@ class Server {
);

try {
final handle = Bindings.instance.serviceStart(
final handle = Bindings.instance.startService(
configPathPtr.cast(),
debugLabelPtr.cast(),
callback.nativeFunction,
Expand Down Expand Up @@ -70,7 +71,7 @@ class Server {
);

try {
Bindings.instance.serviceStop(
Bindings.instance.stopService(
handle,
callback.nativeFunction,
nullptr,
Expand All @@ -87,7 +88,7 @@ class Server {
}
}

void logInit({
void initLog({
String? file,
Function(LogLevel, String)? callback,
String tag = '',
Expand All @@ -99,17 +100,19 @@ void logInit({

if (callback != null) {
nativeCallback = NativeCallable<LogCallback>.listener(
(int level, Pointer<Char> message) {
(LogMessage message) {
callback(
LogLevel.decode(level),
message.cast<Utf8>().toDartString(),
LogLevel.decode(message.level),
utf8.decode(message.ptr.asTypedList(message.len)),
);

Bindings.instance.releaseLogMessage(message);
},
);
}

try {
Bindings.instance.logInit(
Bindings.instance.initLog(
filePtr.cast(),
nativeCallback?.nativeFunction ?? nullptr,
tagPtr.cast(),
Expand Down
112 changes: 84 additions & 28 deletions service/src/ffi.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
ffi::{c_char, c_void, CStr, CString},
io,
io, mem,
path::Path,
pin::pin,
sync::OnceLock,
Expand All @@ -12,7 +12,7 @@ use tracing::{Instrument, Span};

use self::callback::Callback;
use crate::{
logger::{LogColor, LogFormat, Logger},
logger::{BufferPool, Logger},
protocol::{ErrorCode, LogLevel, ToErrorCode},
Error, Service,
};
Expand All @@ -32,7 +32,7 @@ use crate::{
/// - `debug_label` must be either null or must be safe to pass to [std::ffi::CStr::from_ptr].
/// - `callback_context` must be either null or it must be safe to access from multiple threads.
#[no_mangle]
pub unsafe extern "C" fn service_start(
pub unsafe extern "C" fn start_service(
config_dir: *const c_char,
debug_label: *const c_char,
callback: extern "C" fn(*const c_void, ErrorCode),
Expand Down Expand Up @@ -64,7 +64,7 @@ pub unsafe extern "C" fn service_start(
/// passed to `ouisync_stop`.
/// - `callback_context` must be either null of it must be safe to access from multiple threads.
#[no_mangle]
pub unsafe extern "C" fn service_stop(
pub unsafe extern "C" fn stop_service(
handle: *mut c_void,
callback: extern "C" fn(*const c_void, ErrorCode),
callback_context: *const c_void,
Expand Down Expand Up @@ -171,49 +171,105 @@ fn init(
/// Logs using the platforms' default logging infrastructure. If `file` is not null, additionally
/// logs to that file.
///
/// # Callback
///
/// If `callback` is not null, it is invoked for each log message. After the log message has been
/// processed, it needs to be released by calling `release_log_message`. Failure to do so will
/// cause memory leak. The messages can be processed asynchronously (e.g., in another thread).
///
/// # Safety
///
/// `file` must be either null or it must be safe to pass to [std::ffi::CStr::from_ptr].
/// `tag` must be non-null and safe to pass to [std::ffi::CStr::from_ptr].
#[no_mangle]
pub unsafe extern "C" fn log_init(
pub unsafe extern "C" fn init_log(
file: *const c_char,
callback: Option<extern "C" fn(LogLevel, *const c_char)>,
callback: Option<extern "C" fn(LogMessage)>,
tag: *const c_char,
) -> ErrorCode {
try_log_init(file, callback, tag).to_error_code()
try_init_log(file, callback, tag).to_error_code()
}

static LOGGER: OnceLock<Logger> = OnceLock::new();
#[no_mangle]
pub unsafe extern "C" fn release_log_message(message: LogMessage) {
let message = message.into_message();

if let Some(pool) = LOGGER.get().and_then(|wrapper| wrapper.pool.as_ref()) {
pool.release(message);
}
}

#[repr(C)]
pub struct LogMessage {
level: LogLevel,
ptr: *const u8,
len: usize,
cap: usize,
}

impl LogMessage {
fn new(level: LogLevel, message: Vec<u8>) -> Self {
let ptr = message.as_ptr();
let len = message.len();
let cap = message.capacity();
mem::forget(message);

Self {
level,
ptr,
len,
cap,
}
}

unsafe fn try_log_init(
unsafe fn into_message(self) -> Vec<u8> {
Vec::from_raw_parts(self.ptr as _, self.len, self.cap)
}
}

struct LoggerWrapper {
_logger: Logger,
pool: Option<BufferPool>,
}

static LOGGER: OnceLock<LoggerWrapper> = OnceLock::new();

unsafe fn try_init_log(
file: *const c_char,
callback: Option<extern "C" fn(LogLevel, *const c_char)>,
callback: Option<extern "C" fn(LogMessage)>,
tag: *const c_char,
) -> Result<(), Error> {
let file = if file.is_null() {
None
let builder = Logger::builder().with_tag(CStr::from_ptr(tag).to_str()?);
let builder = if !file.is_null() {
builder.with_file(Path::new(CStr::from_ptr(file).to_str()?))
} else {
Some(Path::new(CStr::from_ptr(file).to_str()?))
builder
};

let callback = callback.map(|callback| {
Box::new(move |level, message: &[u8]| {
callback(LogLevel::from(level), message.as_ptr() as _)
}) as _
});

let tag = CStr::from_ptr(tag).to_str()?.to_owned();
let (builder, pool) = if let Some(callback) = callback {
let pool = BufferPool::default();
let callback = Box::new(move |level, message: &mut Vec<u8>| {
callback(LogMessage::new(LogLevel::from(level), mem::take(message)));
});

let logger = Logger::new(file, callback, tag, LogFormat::Human, LogColor::Always)
.map_err(Error::InitializeLogger)?;
(builder.with_callback(callback, pool.clone()), Some(pool))
} else {
(builder, None)
};

LOGGER.set(logger).map_err(|_| {
Error::InitializeLogger(io::Error::new(
io::ErrorKind::AlreadyExists,
"logger already initialized",
))
})?;
let logger = builder.build().map_err(Error::InitializeLogger)?;

LOGGER
.set(LoggerWrapper {
_logger: logger,
pool,
})
.map_err(|_| {
Error::InitializeLogger(io::Error::new(
io::ErrorKind::AlreadyExists,
"logger already initialized",
))
})?;

Ok(())
}
Expand Down
Loading

0 comments on commit 1d99d59

Please sign in to comment.