Skip to content

Commit

Permalink
dekaf: Update to use journal::Client::append_once
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Jan 7, 2025
1 parent 7cea7b6 commit 6dec534
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 91 deletions.
209 changes: 123 additions & 86 deletions crates/dekaf/src/log_journal.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use crate::{dekaf_shard_template_id, topology::fetch_dekaf_task_auth, App};
use anyhow::bail;
use flow_client::fetch_task_authorization;
use gazette::journal;
use gazette::{
journal,
uuid::{self, Producer},
};
use proto_gazette::message_flags;
use serde_json::json;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::{span, Event, Id};
Expand All @@ -14,67 +18,125 @@ enum LoggingMessage {
Log(ops::Log),
}

struct LogForwarder {
app: Arc<App>,
producer: gazette::uuid::Producer,
}

impl LogForwarder {
fn new(app: Arc<App>, producer: Producer) -> Self {
Self { app, producer }
}

async fn forward_logs(
mut self,
mut logs_rx: tokio::sync::mpsc::Receiver<LoggingMessage>,
) -> anyhow::Result<()> {
let mut log_data = Vec::new();

let (ops_logs_journal_client, ops_logs_journal) = loop {
match logs_rx.recv().await {
Some(LoggingMessage::SetTaskName(name)) => {
let (client, ops_logs) = self.get_journal_client(name).await?;
break (client, ops_logs);
}
Some(LoggingMessage::Log(log)) => {
log_data.append(&mut self.serialize_log(log));
}
None => return Ok(()),
}
};

tracing::warn!(ops_logs_journal, "Got ops logs name!");

let resp = ops_logs_journal_client
.append_once(ops_logs_journal.clone(), log_data)
.await;

tracing::warn!(?resp, "Got append response!");

while let Some(msg) = logs_rx.recv().await {
match msg {
LoggingMessage::SetTaskName(_) => bail!("Already set task name!"),
LoggingMessage::Log(log) => {
let serialized = self.serialize_log(log);
ops_logs_journal_client
.append_once(ops_logs_journal.clone(), serialized)
.await?;
}
}
}

Ok(())
}

fn serialize_log(&mut self, log: ops::Log) -> Vec<u8> {
let uuid = gazette::uuid::build(
self.producer,
gazette::uuid::Clock::from_time(std::time::SystemTime::now()),
uuid::Flags(
(message_flags::MASK | message_flags::OUTSIDE_TXN)
.try_into()
.unwrap(),
),
);

let mut val = serde_json::to_value(log).expect("Log always serializes");

if let Some(obj) = val.as_object_mut() {
obj.insert("_meta".to_string(), json!({ "uuid": uuid }));
}

let mut buf = serde_json::to_vec(&val).expect("Value always serializes");
buf.push(b'\n');

buf
}

async fn get_journal_client(
&self,
task_name: String,
) -> anyhow::Result<(journal::Client, String)> {
let (client, _claims, ops_logs, _ops_stats, _task_spec) = fetch_dekaf_task_auth(
self.app.client_base.clone(),
&task_name,
&self.app.data_plane_fqdn,
&self.app.data_plane_signer,
)
.await?;

let client = fetch_task_authorization(
&client,
&dekaf_shard_template_id(task_name.as_str()),
&self.app.data_plane_fqdn,
&self.app.data_plane_signer,
proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
gazette::broker::LabelSelector {
include: Some(labels::build_set([("name", ops_logs.as_str())])),
exclude: None,
},
)
.await?;

Ok((client, ops_logs))
}
}

#[derive(Clone)]
struct SessionLogger {
/// Buffer log messages before we know what our task name is
tx: tokio::sync::mpsc::Sender<LoggingMessage>,
_handle: Arc<JoinHandle<anyhow::Result<()>>>,
_handle: Arc<JoinHandle<()>>,
}

impl SessionLogger {
fn new(app: Arc<App>) -> Self {
let (log_tx, mut log_rx) = tokio::sync::mpsc::channel(10000);
fn new(app: Arc<App>, producer: Producer) -> Self {
let (log_tx, log_rx) = tokio::sync::mpsc::channel(10000);

let forwarder = LogForwarder::new(app, producer);
let handle = tokio::spawn(async move {
let mut ops_logs_journal_client = None;
let mut ops_logs_journal = None;
let mut logs_buffer = VecDeque::new();

while let Some(msg) = log_rx.recv().await {
match msg {
LoggingMessage::SetTaskName(name) => {
let (client, _claims, ops_logs, _ops_stats, _task_spec) =
fetch_dekaf_task_auth(
app.client_base.clone(),
&name,
&app.data_plane_fqdn,
&app.data_plane_signer,
)
.await?;

let client = fetch_task_authorization(
&client,
&dekaf_shard_template_id(name.as_str()),
&app.data_plane_fqdn,
&app.data_plane_signer,
proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
gazette::broker::LabelSelector {
include: Some(labels::build_set([("name", ops_logs.as_str())])),
exclude: None,
},
)
.await?;

// Flush buffered logs now that we have a client
while let Some(log) = logs_buffer.pop_front() {
send_to_journal(log, ops_logs.as_str(), &client).await?;
}

ops_logs_journal = Some(ops_logs.to_string());
ops_logs_journal_client = Some(client);
}
LoggingMessage::Log(log) => {
if let (Some(client), Some(journal)) =
(&ops_logs_journal_client, &ops_logs_journal)
{
send_to_journal(log, journal.as_str(), client).await?;
} else {
logs_buffer.push_back(log);
}
}
}
if let Err(e) = forwarder.forward_logs(log_rx).await {
tracing::error!(error = ?e, "Log forwarding errored");
}
Ok(())
});

Self {
Expand All @@ -88,41 +150,14 @@ impl SessionLogger {
}
}

async fn send_to_journal(
message: ops::Log,
journal: &str,
client: &journal::Client,
) -> anyhow::Result<()> {
let mut buf = serde_json::to_vec(&message).expect("Log always serializes");
buf.push(b'\n');

let req = vec![
proto_gazette::broker::AppendRequest {
journal: journal.to_string(),
..Default::default()
},
proto_gazette::broker::AppendRequest {
content: buf,
..Default::default()
},
proto_gazette::broker::AppendRequest {
content: vec![],
..Default::default()
},
];

client.append(futures::stream::iter(req)).await?;

Ok(())
}

pub struct SessionSubscriberLayer {
app: Arc<App>,
producer: uuid::Producer,
}

impl SessionSubscriberLayer {
pub fn new(app: Arc<App>) -> Self {
Self { app }
pub fn new(app: Arc<App>, producer: uuid::Producer) -> Self {
Self { app, producer }
}

pub fn log_from_metadata(&self, metadata: &tracing::Metadata) -> ops::Log {
Expand Down Expand Up @@ -210,6 +245,7 @@ where
let mut visitor = SessionSpanMarker {
logger: None,
app: self.app.clone(),
producer: self.producer,
};

attrs.record(&mut visitor);
Expand All @@ -226,12 +262,13 @@ where
struct SessionSpanMarker {
logger: Option<SessionLogger>,
app: Arc<App>,
producer: uuid::Producer,
}

impl tracing::field::Visit for SessionSpanMarker {
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
if field.name() == "is_session" && self.logger.is_none() && value {
self.logger = Some(SessionLogger::new(self.app.clone()))
self.logger = Some(SessionLogger::new(self.app.clone(), self.producer))
}
}

Expand All @@ -241,7 +278,7 @@ impl tracing::field::Visit for SessionSpanMarker {
if let Some(ref mut logger) = self.logger {
logger.set_task_name(value.to_string())
} else {
let logger = SessionLogger::new(self.app.clone());
let logger = SessionLogger::new(self.app.clone(), self.producer);
logger.set_task_name(value.to_string());
self.logger = Some(logger)
}
Expand Down
23 changes: 18 additions & 5 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use flow_client::{
LOCAL_PG_URL,
};
use futures::{FutureExt, TryStreamExt};
use rand::Rng;
use rustls::pki_types::CertificateDer;
use std::{
fs::File,
Expand Down Expand Up @@ -154,18 +155,30 @@ async fn main() -> anyhow::Result<()> {
client_base: flow_client::Client::new(cli.agent_endpoint, api_key, api_endpoint, None),
});

let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::WARN.into()) // Otherwise it's ERROR.
.from_env_lossy();
let env_filter_builder = || {
EnvFilter::builder()
.with_default_directive(LevelFilter::WARN.into()) // Otherwise it's ERROR.
.from_env_lossy()
};

let fmt_layer = tracing_subscriber::fmt::Layer::default()
.with_writer(std::io::stderr)
.with_filter(env_filter);
.with_filter(env_filter_builder());

// There's probably a neat bit-banging way to do this with i64 and masks, but I'm just not that fancy.
let mut producer_id = rand::thread_rng().gen::<[u8; 6]>();
producer_id[0] |= 0x01;

let registry = tracing_subscriber::registry()
// We attach `ops::tracing::Layer` so we can use the `Log` structs it attaches to spans
.with(ops::tracing::Layer::new(|_| {}, std::time::SystemTime::now))
.with(dekaf::SessionSubscriberLayer::new(app.clone()))
.with(
dekaf::SessionSubscriberLayer::new(
app.clone(),
gazette::uuid::Producer::from_bytes(producer_id),
)
.with_filter(env_filter_builder()),
)
.with(fmt_layer);

registry.init();
Expand Down

0 comments on commit 6dec534

Please sign in to comment.