Skip to content

Commit

Permalink
dekaf: Fix incorrect usage of fetch_task_authorization
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Jan 7, 2025
1 parent 42c6784 commit e90337e
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions crates/dekaf/src/log_journal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{topology::fetch_dekaf_task_auth, App};
use crate::{dekaf_shard_template_id, topology::fetch_dekaf_task_auth, App};
use flow_client::fetch_task_authorization;
use gazette::journal;
use serde_json::json;
Expand All @@ -12,13 +12,12 @@ use tracing_subscriber::layer::{Context, Layer};
enum LoggingMessage {
SetTaskName(String),
Log(ops::Log),
Shutdown,
}

#[derive(Clone)]
struct SessionLogger {
/// Buffer log messages before we know what our task name is
tx: Arc<tokio::sync::mpsc::Sender<LoggingMessage>>,
tx: tokio::sync::mpsc::Sender<LoggingMessage>,
_handle: Arc<JoinHandle<anyhow::Result<()>>>,
}

Expand All @@ -45,7 +44,7 @@ impl SessionLogger {

let client = fetch_task_authorization(
&client,
&ops_logs,
&dekaf_shard_template_id(name.as_str()),
&app.data_plane_fqdn,
&app.data_plane_signer,
proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
Expand Down Expand Up @@ -73,16 +72,13 @@ impl SessionLogger {
logs_buffer.push_back(log);
}
}
LoggingMessage::Shutdown => {
break;
}
}
}
Ok(())
});

Self {
tx: Arc::new(log_tx),
tx: log_tx,
_handle: Arc::new(handle),
}
}
Expand All @@ -100,13 +96,22 @@ async fn send_to_journal(
let mut buf = serde_json::to_vec(&message).expect("Log always serializes");
buf.push(b'\n');

let req = proto_gazette::broker::AppendRequest {
journal: journal.to_string(),
content: buf,
..Default::default()
};

client.append(futures::stream::once(async { req })).await?;
let req = vec![
proto_gazette::broker::AppendRequest {
journal: journal.to_string(),
..Default::default()
},
proto_gazette::broker::AppendRequest {
content: buf,
..Default::default()
},
proto_gazette::broker::AppendRequest {
content: vec![],
..Default::default()
},
];

client.append(futures::stream::iter(req)).await?;

Ok(())
}
Expand Down Expand Up @@ -170,7 +175,7 @@ where
log.spans.push(span.clone());
}
}
logger.tx.try_send(LoggingMessage::Log(log)).unwrap();
logger.tx.try_send(LoggingMessage::Log(log));
}
}

Expand Down Expand Up @@ -215,21 +220,6 @@ where
span.extensions_mut().insert(visitor);
}
}

fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
// When we close out a span containing a SessionSpanMarker, that indicates that
// the session was closed, and we should shut down the log writer.
let span = ctx.span(&id).unwrap();
match span.extensions_mut().get_mut::<SessionSpanMarker>() {
Some(SessionSpanMarker {
logger: Some(ref logger),
..
}) => {
logger.tx.try_send(LoggingMessage::Shutdown).unwrap();
}
_ => {}
};
}
}

#[derive(Clone)]
Expand Down

0 comments on commit e90337e

Please sign in to comment.