Skip to content

Commit

Permalink
chore: simplify API and get rid of type level programming (#205)
Browse files Browse the repository at this point in the history
* experiment

* cleanup

* more cleanup

* naming

* handle_boxed_cmd

* EventSourcedEntity clone

* pub BoxedCmd

* change handle_boxed_cmd return type

* cleanup

* no more handle_boxed_cmd and pub BoxedCmd

---------

Co-authored-by: Johannes Rudolph <[email protected]>
  • Loading branch information
hseeberger and jrudolph authored Mar 20, 2024
1 parent 9716f3f commit 9d3404a
Show file tree
Hide file tree
Showing 15 changed files with 119 additions and 287 deletions.
55 changes: 0 additions & 55 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ bb8-postgres = { version = "0.8" }
bytes = { version = "1.5" }
configured = { version = "0.7" }
error-ext = { version = "0.1" }
frunk = { version = "0.4" }
futures = { version = "0.3" }
humantime-serde = { version = "1.1" }
pin-project = { version = "1.1" }
Expand Down
11 changes: 5 additions & 6 deletions eventsourced-nats/src/evt_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,11 @@ fn evt_stream_max_bytes_default() -> i64 {

#[cfg(test)]
mod tests {
use super::*;
use crate::tests::NATS_VERSION;
use crate::{tests::NATS_VERSION, NatsEvtLog, NatsEvtLogConfig};
use error_ext::BoxError;
use eventsourced::binarize;
use futures::TryStreamExt;
use std::future;
use eventsourced::{binarize, evt_log::EvtLog};
use futures::{StreamExt, TryStreamExt};
use std::{future, num::NonZeroU64};
use testcontainers::{clients::Cli, core::WaitFor};
use testcontainers_modules::testcontainers::GenericImage;
use uuid::Uuid;
Expand All @@ -391,7 +390,7 @@ mod tests {
let container = client.run((nats_image, vec!["-js".to_string()]));
let server_addr = format!("localhost:{}", container.get_host_port_ipv4(4222));

let config = Config {
let config = NatsEvtLogConfig {
server_addr,
setup: true,
..Default::default()
Expand Down
7 changes: 3 additions & 4 deletions eventsourced-nats/src/snapshot_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,9 @@ mod proto {

#[cfg(test)]
mod tests {
use super::*;
use crate::tests::NATS_VERSION;
use crate::{tests::NATS_VERSION, NatsSnapshotStore, NatsSnapshotStoreConfig};
use error_ext::BoxError;
use eventsourced::binarize;
use eventsourced::{binarize, snapshot_store::SnapshotStore};
use testcontainers::{clients::Cli, core::WaitFor};
use testcontainers_modules::testcontainers::GenericImage;
use uuid::Uuid;
Expand All @@ -244,7 +243,7 @@ mod tests {
let container = client.run((nats_image, vec!["-js".to_string()]));
let server_addr = format!("localhost:{}", container.get_host_port_ipv4(4222));

let config = Config {
let config = NatsSnapshotStoreConfig {
server_addr,
setup: true,
..Default::default()
Expand Down
12 changes: 7 additions & 5 deletions eventsourced-postgres/src/evt_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,20 +417,22 @@ const fn id_broadcast_capacity_default() -> NonZeroUsize {

#[cfg(test)]
mod tests {
use super::*;
use eventsourced::binarize;
use std::future;
use crate::{PostgresEvtLog, PostgresEvtLogConfig};
use error_ext::BoxError;
use eventsourced::{binarize, evt_log::EvtLog};
use futures::{StreamExt, TryStreamExt};
use std::{future, num::NonZeroU64};
use testcontainers::clients::Cli;
use testcontainers_modules::postgres::Postgres;
use uuid::Uuid;

#[tokio::test]
async fn test_evt_log() -> Result<(), Box<dyn StdError + Send + Sync>> {
async fn test_evt_log() -> Result<(), BoxError> {
let client = Cli::default();
let container = client.run(Postgres::default().with_host_auth());
let port = container.get_host_port_ipv4(5432);

let config = Config {
let config = PostgresEvtLogConfig {
port,
setup: true,
..Default::default()
Expand Down
6 changes: 3 additions & 3 deletions eventsourced-postgres/src/snapshot_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ fn snapshots_table_default() -> String {

#[cfg(test)]
mod tests {
use super::*;
use crate::{PostgresSnapshotStore, PostgresSnapshotStoreConfig};
use error_ext::BoxError;
use eventsourced::binarize;
use eventsourced::{binarize, snapshot_store::SnapshotStore};
use testcontainers::clients::Cli;
use testcontainers_modules::postgres::Postgres;
use uuid::Uuid;
Expand All @@ -205,7 +205,7 @@ mod tests {
let container = client.run(Postgres::default().with_host_auth());
let port = container.get_host_port_ipv4(5432);

let config = Config {
let config = PostgresSnapshotStoreConfig {
port,
setup: true,
..Default::default()
Expand Down
1 change: 0 additions & 1 deletion eventsourced-projection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@ trait-variant = { workspace = true }
testcontainers = { workspace = true }
testcontainers-modules = { workspace = true, features = [ "postgres" ] }
tokio = { workspace = true, features = [ "macros" ] }
tracing-subscriber = { workspace = true }
uuid = { workspace = true }
129 changes: 34 additions & 95 deletions eventsourced-projection/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,92 +346,23 @@ async fn save_seq_no(
.await?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use crate::postgres::{ErrorStrategy, EvtHandler, Projection};
use error_ext::BoxError;
use futures::{stream, Stream};
use eventsourced::{
binarize::serde_json::to_bytes,
evt_log::{test::TestEvtLog, EvtLog},
};
use sqlx::{
postgres::{PgConnectOptions, PgPoolOptions},
Row,
Postgres, QueryBuilder, Row, Transaction,
};
use std::{iter::once, time::Duration};
use testcontainers::{clients::Cli, RunnableImage};
use testcontainers_modules::postgres::Postgres as TCPostgres;
use uuid::Uuid;

#[derive(Debug, Clone)]
struct TestEvtLog;

impl EvtLog for TestEvtLog {
type Id = Uuid;
type Error = TestEvtLogError;

async fn persist<E, ToBytes, ToBytesError>(
&mut self,
_type_name: &'static str,
_id: &Self::Id,
last_seq_no: Option<NonZeroU64>,
_evt: &E,
_to_bytes: &ToBytes,
) -> Result<NonZeroU64, Self::Error>
where
E: Sync,
ToBytes: Fn(&E) -> Result<Bytes, ToBytesError> + Sync,
ToBytesError: StdError + Send + Sync + 'static,
{
let seq_no = last_seq_no.unwrap_or(NonZeroU64::MIN);
Ok(seq_no)
}

async fn last_seq_no(
&self,
_type_name: &'static str,
_id: &Self::Id,
) -> Result<Option<NonZeroU64>, Self::Error> {
Ok(Some(42.try_into().unwrap()))
}

async fn evts_by_id<E, FromBytes, FromBytesError>(
&self,
_type_name: &'static str,
_id: &Self::Id,
_from_seq_no: NonZeroU64,
_from_bytes: FromBytes,
) -> Result<impl Stream<Item = Result<(NonZeroU64, E), Self::Error>> + Send, Self::Error>
where
E: Send,
FromBytes: Fn(Bytes) -> Result<E, FromBytesError> + Copy + Send + Sync,
FromBytesError: StdError + Send + Sync + 'static,
{
Ok(stream::empty())
}

async fn evts_by_type<E, FromBytes, FromBytesError>(
&self,
_type_name: &'static str,
seq_no: NonZeroU64,
from_bytes: FromBytes,
) -> Result<impl Stream<Item = Result<(NonZeroU64, E), Self::Error>> + Send, Self::Error>
where
E: Send,
FromBytes: Fn(Bytes) -> Result<E, FromBytesError> + Copy + Send,
FromBytesError: StdError + Send + Sync + 'static,
{
let evts = stream::iter(seq_no.get()..=100).map(move |n| {
let evt = n as i64;
let n = NonZeroU64::new(n).unwrap();
let evt = from_bytes(serde_json::to_vec(&evt).unwrap().into()).unwrap();
Ok((n, evt))
});

Ok(evts)
}
}

#[derive(Debug, Error)]
#[error("TestEvtLogError")]
struct TestEvtLogError(#[source] BoxError);
use tokio::time::sleep;

#[derive(Clone)]
struct TestHandler;
Expand All @@ -444,52 +375,60 @@ mod tests {
evt: i32,
tx: &mut Transaction<'static, Postgres>,
) -> Result<(), Self::Error> {
let query = "INSERT INTO test (n) VALUES ($1)";
sqlx::query(query).bind(evt).execute(&mut **tx).await?;
QueryBuilder::new("INSERT INTO test (n) ")
.push_values(once(evt), |mut q, evt| {
q.push_bind(evt);
})
.build()
.execute(&mut **tx)
.await?;
Ok(())
}
}

#[tokio::test]
async fn test() -> Result<(), BoxError> {
// let _ = tracing_subscriber::registry()
// .with(EnvFilter::from_default_env())
// .with(fmt::layer().pretty())
// .try_init()?;

let testcontainers_client = Cli::default();
let container = testcontainers_client
.run(RunnableImage::from(TCPostgres::default()).with_tag("16-alpine"));
let containers = Cli::default();

let container =
containers.run(RunnableImage::from(TCPostgres::default()).with_tag("16-alpine"));
let port = container.get_host_port_ipv4(5432);

let cnn_url = format!("postgresql://postgres:postgres@localhost:{port}");
let cnn_options = cnn_url.parse::<PgConnectOptions>()?;
let pool = PgPoolOptions::new().connect_with(cnn_options).await?;

sqlx::query("CREATE TABLE test (n bigint PRIMARY KEY);")
let mut evt_log = TestEvtLog::<u64>::default();
for n in 1..=100 {
evt_log.persist("test", &0, None, &n, &to_bytes).await?;
}

sqlx::query("CREATE TABLE test (n bigint);")
.execute(&pool)
.await?;

let projection = Projection::new(
"dummy",
"test",
"test-projection".to_string(),
TestEvtLog,
evt_log.clone(),
TestHandler,
ErrorStrategy::Stop,
pool.clone(),
)
.await?;

sqlx::query("INSERT INTO projection VALUES ($1, $2)")
.bind("test-projection")
.bind(10)
QueryBuilder::new("INSERT INTO projection ")
.push_values(once(("test-projection", 10)), |mut q, (name, seq_no)| {
q.push_bind(name).push_bind(seq_no);
})
.build()
.execute(&pool)
.await?;

projection.run().await?;

let mut state = projection.get_state().await?;
let max = Some(NonZeroU64::new(100).unwrap());
let max = Some(100.try_into()?);
while state.seq_no < max {
sleep(Duration::from_millis(100)).await;
state = projection.get_state().await?;
Expand Down
2 changes: 0 additions & 2 deletions eventsourced/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ rustdoc-args = [ "--cfg", "docsrs" ]
[dependencies]
bytes = { workspace = true }
error-ext = { workspace = true }
frunk = { workspace = true }
futures = { workspace = true }
pin-project = { workspace = true }
prost = { workspace = true, optional = true }
Expand All @@ -30,7 +29,6 @@ trait-variant = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
async-stream = { workspace = true }
tokio = { workspace = true, features = [ "macros", "rt-multi-thread", "time" ] }
tracing-test = { workspace = true }
uuid = { workspace = true }
Expand Down
Loading

0 comments on commit 9d3404a

Please sign in to comment.