Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fix simple source #69

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = "0.7.10"
tokio-stream = { version = "0.1.14", features = ["net"] }
serde = { version = "1.0.194", features = ["derive"] }
chrono = "0.4.31"
chrono = "0.4.38"
serde_json = "1.0.111"
futures-util = "0.3.30"
tracing = "0.1.40"
Expand Down
2 changes: 1 addition & 1 deletion examples/simple-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ path = "src/main.rs"
tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { path = "../../" }
chrono = "0.4.30"
chrono = "0.4.38"
uuid = "1.2.0"
74 changes: 27 additions & 47 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,97 +2,77 @@

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let source_handle = simple_source::SimpleSource::new();
let source_handle = simple_source::SimpleSource::new("Hello World!".to_string());
numaflow::source::Server::new(source_handle).start().await
}

pub(crate) mod simple_source {
use std::{collections::HashSet, sync::RwLock};

use chrono::Utc;
use tokio::sync::mpsc::Sender;

use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use std::{
collections::HashMap,
collections::HashSet,
sync::atomic::{AtomicUsize, Ordering},
sync::RwLock,
};
use tokio::{sync::mpsc::Sender, time::Instant};
use tonic::async_trait;
use uuid::Uuid;

/// SimpleSource is a data generator that produces monotonically increasing offsets and data. Its state is shared, so it is protected with locks
/// or atomics to allow concurrent access. Numaflow does not actually require concurrent access, but we are forced to do this because the SDK
/// does not provide a mutable reference as explained in [`numaflow::source::Sourcer`]
/// does not provide a mutable reference as explained in [`Sourcer`]
pub(crate) struct SimpleSource {
read_idx: AtomicUsize,
yet_to_ack: RwLock<HashSet<u32>>,
payload: String,
yet_to_ack: RwLock<HashSet<String>>,
}

impl SimpleSource {
pub fn new() -> Self {
pub(crate) fn new(payload: String) -> Self {
Self {
read_idx: AtomicUsize::new(0),
payload,
yet_to_ack: RwLock::new(HashSet::new()),
}
}
}

#[async_trait]
#[tonic::async_trait]
impl Sourcer for SimpleSource {
async fn read(&self, source_request: SourceReadRequest, transmitter: Sender<Message>) {
async fn read(&self, request: SourceReadRequest, transmitter: Sender<Message>) {
if !self.yet_to_ack.read().unwrap().is_empty() {
return;
}
let start = Instant::now();

for i in 1..=source_request.count {
// if the time elapsed is greater than the timeout, return
if start.elapsed().as_millis() > source_request.timeout.as_millis() {
return;
}

let mut headers = HashMap::new();
headers.insert(String::from("x-txn-id"), String::from(Uuid::new_v4()));

// increment the read_idx which is used as the offset
self.read_idx
.store(self.read_idx.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
let offset = self.read_idx.load(Ordering::Relaxed);
// send the message to the transmitter
let event_time = Utc::now();
let mut message_offsets = Vec::with_capacity(request.count);
for i in 0..request.count {
let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
transmitter
.send(Message {
value: format!("{i} at {offset}").into_bytes(),
value: format!("{}-{}", self.payload, event_time).into_bytes(),
event_time,
offset: Offset {
offset: offset.to_be_bytes().to_vec(),
offset: offset.clone().into_bytes(),
partition_id: 0,
},
event_time: chrono::offset::Utc::now(),
keys: vec![],
headers,
headers: Default::default(),
})
.await
.unwrap();

// add the entry to hashmap to mark the offset as pending to-be-acked
let mut yet_to_ack = self.yet_to_ack.write().expect("lock has been poisoned");
yet_to_ack.insert(offset as u32);
message_offsets.push(offset)
}
self.yet_to_ack.write().unwrap().extend(message_offsets)
}

async fn ack(&self, offsets: Vec<Offset>) {
// remove the offsets from yet_to_ack since we have received an ack for it
for offset in offsets {
let val = u32::from_be_bytes(offset.offset[0..4].try_into().unwrap());
// remove the entry from pending table after acquiring lock
self.yet_to_ack.write().unwrap().remove(&val);
let x = &String::from_utf8(offset.offset).unwrap();
self.yet_to_ack.write().unwrap().remove(x);
}
}

async fn pending(&self) -> usize {
// pending for simple source is zero since we are not reading from any external source
0
self.yet_to_ack.read().unwrap().len()
}

async fn partitions(&self) -> Option<Vec<i32>> {
Some(vec![1])
Some(vec![0])
}
}
}
4 changes: 2 additions & 2 deletions examples/sink-log/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.70 AS build
FROM rust:1.76-bookworm AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand All @@ -11,7 +11,7 @@ WORKDIR /numaflow-rs/examples/sink-log
RUN cargo build --release

# our final base
FROM rust AS sink-log
FROM debian:bookworm AS simple-source

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/examples/sink-log/target/release/server .
Expand Down
Loading