From 93133d9e3da9fbf28b142d35194e39ba1da8f179 Mon Sep 17 00:00:00 2001 From: Heiko Seeberger Date: Sun, 2 Jun 2024 19:24:00 +0200 Subject: [PATCH] feat: projection (#18) --- Cargo.lock | 191 ++++++++-- Cargo.toml | 4 + sql/create_event_log_uuid.sql | 4 +- sql/create_projection.sql | 2 + src/entity.rs | 248 +++++++------ src/lib.rs | 1 + src/projection.rs | 637 ++++++++++++++++++++++++++++++++++ 7 files changed, 956 insertions(+), 131 deletions(-) create mode 100644 sql/create_projection.sql create mode 100644 src/projection.rs diff --git a/Cargo.lock b/Cargo.lock index 510e3bf..7a6ff0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" dependencies = [ "gimli", ] @@ -60,6 +60,34 @@ dependencies = [ "libc", ] +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -94,9 +122,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "backtrace" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" +checksum = "17c6a35df3749d2e8bb1b7b21a976d82b15548788d2735b9d82f329268f71a11" dependencies = [ "addr2line", "cc", @@ -471,6 +499,8 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" name = "evented" version = "0.0.2" dependencies = [ + "assert_matches", + "async-stream", "error-ext", "futures", "secrecy", @@ -484,6 +514,7 @@ dependencies = [ "time", "tokio", "tracing", + "tracing-test", "trait-variant", "uuid", ] @@ -643,9 +674,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.28.1" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" [[package]] name = "hashbrown" @@ -815,9 +846,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d8d52be92d09acc2e01dddb7fde3ad983fc6489c7db4837e605bc3fca4cb63e" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" dependencies = [ "bytes", "futures-channel", @@ -997,6 +1028,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "md-5" version = "0.10.6" @@ -1049,6 +1089,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1114,9 +1164,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.2" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +checksum = "b8ec7ab813848ba4522158d5517a6093db1ded27575b070f4177b8d12b41db5e" dependencies = [ "memchr", ] @@ -1139,6 +1189,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.3" @@ -1164,25 +1220,25 @@ dependencies = [ [[package]] name = "parse-display" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06af5f9333eb47bd9ba8462d612e37a8328a5cb80b13f0af4de4c3b89f52dee5" +checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" dependencies = [ "parse-display-derive", "regex", - "regex-syntax", + "regex-syntax 0.8.3", ] [[package]] name = "parse-display-derive" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc9252f259500ee570c75adcc4e317fa6f57a1e47747d622e0bf838002a7b790" +checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281" dependencies = [ "proc-macro2", "quote", "regex", - "regex-syntax", + "regex-syntax 0.8.3", "structmeta", "syn 2.0.66", ] @@ -1281,9 +1337,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.84" +version = "1.0.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec96c6a92621310b51366f1e28d05ef11489516e93be030060e5fc12024a49d6" +checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" dependencies = [ "unicode-ident", ] @@ -1364,8 +1420,17 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.6", + "regex-syntax 0.8.3", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1376,9 +1441,15 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.3", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.3" @@ -1653,6 +1724,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signature" version = "2.2.0" @@ -2057,6 +2137,16 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.36" @@ -2223,6 +2313,59 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "tracing-test" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a2c0ff408fe918a94c428a3f2ad04e4afd5c95bbc08fcf868eff750c15728a4" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bc1c4f8e2e73a977812ab339d503e6feeb92700f6d07a6de4d321522d5c08" +dependencies = [ + "lazy_static", + "quote", + "syn 1.0.109", ] [[package]] @@ -2322,6 +2465,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 92a2eba..5d1de87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ repository = "https://github.com/hseeberger/evented" documentation = "https://github.com/hseeberger/evented" [dependencies] +async-stream = { version = "0.3" } error-ext = { version = "0.2" } futures = { version = "0.3" } secrecy = { version = "0.8", features = [ "serde" ] } @@ -19,13 +20,16 @@ serde_json = { version = "1.0" } serde_with = { version = "3.8" } sqlx = { version = "0.7", features = [ "postgres", "runtime-tokio" ] } thiserror = { version = "1.0" } +tokio = { version = "1", features = [ "sync" ] } tracing = { version = "0.1" } trait-variant = { version = "0.1" } [dev-dependencies] +assert_matches = { version = "1.5" } sqlx = { version = "0.7", features = [ "uuid" ] } testcontainers = { version = "0.17" } testcontainers-modules = { version = "0.5", features = [ "postgres" ] } time = { version = "0.3", features = [ "serde-human-readable" ] } tokio = { version = "1", features = [ "macros" ] } +tracing-test = { version = "0.2" } uuid = { version = "1.8", features = [ "serde", "v7" ] } diff --git a/sql/create_event_log_uuid.sql b/sql/create_event_log_uuid.sql index 5d4aa0d..de28227 100644 --- a/sql/create_event_log_uuid.sql +++ b/sql/create_event_log_uuid.sql @@ -3,8 +3,8 @@ CREATE TABLE seq_no bigserial PRIMARY KEY, entity_id uuid NOT NULL, version bigint NOT NULL, - type text NOT NULL, - event bytea NOT NULL, + type_name text NOT NULL, + event jsonb NOT NULL, metadata jsonb NOT NULL, UNIQUE (entity_id, version) ); \ No newline at end of file diff --git a/sql/create_projection.sql b/sql/create_projection.sql new file mode 100644 index 0000000..a696b21 --- /dev/null +++ b/sql/create_projection.sql @@ -0,0 +1,2 @@ +CREATE TABLE + IF NOT EXISTS projection (name text PRIMARY KEY, seq_no bigint); \ No newline at end of file diff --git a/src/entity.rs b/src/entity.rs index 55fbbdf..8f0ae7f 100644 --- a/src/entity.rs +++ b/src/entity.rs @@ -2,6 +2,7 @@ use crate::pool::Pool; use error_ext::BoxError; use futures::{future::ok, Stream, StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; +use serde_json::Value; use sqlx::{postgres::PgRow, Encode, Postgres, Row, Transaction, Type}; use std::{ fmt::{Debug, Display}, @@ -255,27 +256,32 @@ pub enum Error { } #[instrument(skip(pool))] -async fn current_events_by_id<'i, E>( - id: &'i E::Id, - pool: &Pool, -) -> impl Stream> + Send + 'i +async fn current_events_by_id<'a, E>( + id: &'a E::Id, + pool: &'a Pool, +) -> impl Stream> + Send + 'a where E: Entity, { - sqlx::query("SELECT version, event FROM event WHERE entity_id = $1 ORDER BY seq_no ASC") - .bind(id) - .fetch(&**pool) - .map_err(|error| Error::Sqlx("cannot get next event".to_string(), error)) - .map(|row| { - row.and_then(|row| { - let version = (row.get::(0) as u64) - .try_into() - .expect("version greater zero"); - let bytes = row.get::<&[u8], _>(1); - let event = serde_json::from_slice::(bytes).map_err(Error::De)?; - Ok((version, event)) - }) + sqlx::query( + "SELECT version, event + FROM event + WHERE entity_id = $1 + ORDER BY seq_no ASC", + ) + .bind(id) + .fetch(&**pool) + .map_err(|error| Error::Sqlx("cannot get next event".to_string(), error)) + .map(|row| { + row.and_then(|row| { + let version = (row.get::(0) as u64) + .try_into() + .expect("version greater zero"); + let value = row.get::(1); + let event = serde_json::from_value::(value).map_err(Error::De)?; + Ok((version, event)) }) + }) } #[instrument(skip(ewms, listener))] @@ -297,12 +303,16 @@ where .await .map_err(|error| Error::Sqlx("cannot begin transaction".to_string(), error))?; - let version = sqlx::query("SELECT MAX(version) FROM event WHERE entity_id = $1") - .bind(id) - .fetch_one(&mut *tx) - .await - .map_err(|error| Error::Sqlx("cannot select max version".to_string(), error)) - .map(into_version)?; + let version = sqlx::query( + "SELECT MAX(version) + FROM event + WHERE entity_id = $1", + ) + .bind(id) + .fetch_one(&mut *tx) + .await + .map_err(|error| Error::Sqlx("cannot select max version".to_string(), error)) + .map(into_version)?; if version != last_version { return Err(Error::UnexpectedVersion(version, last_version)); @@ -311,17 +321,20 @@ where let mut version = last_version.map(|n| n.get() as i64).unwrap_or_default(); for ewm @ EventWithMetadata { event, metadata } in ewms.iter() { version += 1; - let bytes = serde_json::to_vec(event).map_err(Error::Ser)?; + let bytes = serde_json::to_value(event).map_err(Error::Ser)?; let metadata = serde_json::to_value(metadata).map_err(Error::Ser)?; - sqlx::query("INSERT INTO event (entity_id, version, type, event, metadata) VALUES ($1, $2, $3, $4, $5)") - .bind(id) - .bind(version) - .bind(E::TYPE_NAME) - .bind(&bytes) - .bind(metadata) - .execute(&mut *tx) - .await - .map_err(|error| Error::Sqlx("cannot execute statement".to_string(), error))?; + sqlx::query( + "INSERT INTO event (entity_id, version, type_name, event, metadata) + VALUES ($1, $2, $3, $4, $5)", + ) + .bind(id) + .bind(version) + .bind(E::TYPE_NAME) + .bind(&bytes) + .bind(metadata) + .execute(&mut *tx) + .await + .map_err(|error| Error::Sqlx("cannot execute statement".to_string(), error))?; if let Some(listener) = listener { listener @@ -482,30 +495,41 @@ mod tests { event: Event::Increased { id, inc }, .. } => { - let value = sqlx::query("SELECT value FROM counters WHERE id = $1") - .bind(id) - .fetch_optional(&mut **tx) - .await - .map_err(Box::new)? - .map(|row| row.try_get::(0)) - .transpose()?; + let value = sqlx::query( + "SELECT value + FROM counters + WHERE id = $1", + ) + .bind(id) + .fetch_optional(&mut **tx) + .await + .map_err(Box::new)? + .map(|row| row.try_get::(0)) + .transpose()?; match value { Some(value) => { - sqlx::query("UPDATE counters SET value = $1 WHERE id = $2") - .bind(value + *inc as i64) - .bind(id) - .execute(&mut **tx) - .await - .map_err(Box::new)?; + sqlx::query( + "UPDATE counters + SET value = $1 + WHERE id = $2", + ) + .bind(value + *inc as i64) + .bind(id) + .execute(&mut **tx) + .await + .map_err(Box::new)?; } None => { - sqlx::query("INSERT INTO counters VALUES ($1, $2)") - .bind(id) - .bind(*inc as i64) - .execute(&mut **tx) - .await - .map_err(Box::new)?; + sqlx::query( + "INSERT INTO counters + VALUES ($1, $2)", + ) + .bind(id) + .bind(*inc as i64) + .execute(&mut **tx) + .await + .map_err(Box::new)?; } } Ok(()) @@ -533,46 +557,46 @@ mod tests { sslmode: PgSslMode::Prefer, }; - let pool = Pool::new(config).await.expect("pool can be created"); + let pool = Pool::new(config).await?; let ddl = include_str!("../sql/create_event_log_uuid.sql"); - (&*pool).execute(ddl).await.unwrap(); + (&*pool).execute(ddl).await?; let id = Uuid::from_u128(0); sqlx::query( - "INSERT INTO event (entity_id, version, type, event, metadata) VALUES ($1, $2, $3, $4, $5)", + "INSERT INTO event (entity_id, version, type_name, event, metadata) + VALUES ($1, $2, $3, $4, $5)", ) .bind(&id) .bind(1_i64) - .bind("type") - .bind(serde_json::to_vec(&Event::Increased { id, inc: 40 }).unwrap()) + .bind("test") + .bind(serde_json::to_value(&Event::Increased { id, inc: 40 })?) .bind(Value::Null) .execute(&*pool) - .await - .unwrap(); + .await?; sqlx::query( - "INSERT INTO event (entity_id, version, type, event, metadata) VALUES ($1, $2, $3, $4, $5)", + "INSERT INTO event (entity_id, version, type_name, event, metadata) + VALUES ($1, $2, $3, $4, $5)", ) .bind(&id) .bind(2_i64) - .bind("type") - .bind(serde_json::to_vec(&Event::Decreased { id, dec: 20 }).unwrap()) + .bind("test") + .bind(serde_json::to_value(&Event::Decreased { id, dec: 20 })?) .bind(Value::Null) .execute(&*pool) - .await - .unwrap(); + .await?; sqlx::query( - "INSERT INTO event (entity_id, version, type, event, metadata) VALUES ($1, $2, $3, $4, $5)", + "INSERT INTO event (entity_id, version, type_name, event, metadata) + VALUES ($1, $2, $3, $4, $5)", ) .bind(&id) .bind(3_i64) - .bind("type") - .bind(serde_json::to_vec(&Event::Increased { id, inc: 22 }).unwrap()) + .bind("test") + .bind(serde_json::to_value(&Event::Increased { id, inc: 22 })?) .bind(Value::Null) .execute(&*pool) - .await - .unwrap(); + .await?; - let counter = Counter::default().entity().build(id, pool).await.unwrap(); + let counter = Counter::default().entity().build(id, pool).await?; assert_eq!(counter.entity.0, 42); Ok(()) @@ -597,21 +621,21 @@ mod tests { let pool = Pool::new(config).await.expect("pool can be created"); let ddl = include_str!("../sql/create_event_log_uuid.sql"); - (&*pool).execute(ddl).await.unwrap(); + (&*pool).execute(ddl).await?; let id = Uuid::from_u128(0); - let mut counter = Counter::default().entity().build(id, pool).await.unwrap(); + let mut counter = Counter::default().entity().build(id, pool).await?; assert_eq!(counter.entity, Counter(0)); - let result = counter.handle_command(Decrease(1)).await.unwrap(); + let result = counter.handle_command(Decrease(1)).await?; assert_eq!(result, Err(Underflow)); - let result = counter.handle_command(Increase(40)).await.unwrap(); + let result = counter.handle_command(Increase(40)).await?; assert_eq!(result, Ok(&Counter(40))); - let result = counter.handle_command(Decrease(20)).await.unwrap(); + let result = counter.handle_command(Decrease(20)).await?; assert_eq!(result, Ok(&Counter(20))); - let result = counter.handle_command(Increase(22)).await.unwrap(); + let result = counter.handle_command(Increase(22)).await?; assert_eq!(result, Ok(&Counter(42))); Ok(()) @@ -636,10 +660,12 @@ mod tests { let pool = Pool::new(config).await.expect("pool can be created"); let ddl = include_str!("../sql/create_event_log_uuid.sql"); - (&*pool).execute(ddl).await.unwrap(); + (&*pool).execute(ddl).await?; - let ddl = "CREATE TABLE IF NOT EXISTS counters (id uuid, value bigint, PRIMARY KEY (id));"; - (&*pool).execute(ddl).await.unwrap(); + let ddl = "CREATE TABLE + IF NOT EXISTS + counters (id uuid, value bigint, PRIMARY KEY (id));"; + (&*pool).execute(ddl).await?; let id_0 = Uuid::from_u128(0); let id_1 = Uuid::from_u128(1); @@ -649,47 +675,53 @@ mod tests { .entity() .with_listener(Listener) .build(id_0, pool.clone()) - .await - .unwrap(); + .await?; let mut counter_1 = Counter::default() .entity() .with_listener(Listener) .build(id_1, pool.clone()) - .await - .unwrap(); + .await?; let mut counter_2 = Counter::default() .entity() .with_listener(Listener) .build(id_2, pool.clone()) - .await - .unwrap(); - - let _ = counter_1.handle_command(Increase(1)).await.unwrap(); - let _ = counter_2.handle_command(Increase(1)).await.unwrap(); - let _ = counter_2.handle_command(Increase(1)).await.unwrap(); - - let value = sqlx::query("SELECT value FROM counters WHERE id = $1") - .bind(id_0) - .fetch_optional(&*pool) - .await - .unwrap() - .map(|row| row.get::(0)); + .await?; + + let _ = counter_1.handle_command(Increase(1)).await?; + let _ = counter_2.handle_command(Increase(1)).await?; + let _ = counter_2.handle_command(Increase(1)).await?; + + let value = sqlx::query( + "SELECT value + FROM counters + WHERE id = $1", + ) + .bind(id_0) + .fetch_optional(&*pool) + .await? + .map(|row| row.get::(0)); assert!(value.is_none()); - let value = sqlx::query("SELECT value FROM counters WHERE id = $1") - .bind(id_1) - .fetch_optional(&*pool) - .await - .unwrap() - .map(|row| row.get::(0)); + let value = sqlx::query( + "SELECT value + FROM counters + WHERE id = $1", + ) + .bind(id_1) + .fetch_optional(&*pool) + .await? + .map(|row| row.get::(0)); assert_eq!(value, Some(1)); - let value = sqlx::query("SELECT value FROM counters WHERE id = $1") - .bind(id_2) - .fetch_optional(&*pool) - .await - .unwrap() - .map(|row| row.get::(0)); + let value = sqlx::query( + "SELECT value + FROM counters + WHERE id = $1", + ) + .bind(id_2) + .fetch_optional(&*pool) + .await? + .map(|row| row.get::(0)); assert_eq!(value, Some(2)); Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 8397b67..b06c2d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,3 @@ pub mod entity; pub mod pool; +pub mod projection; diff --git a/src/projection.rs b/src/projection.rs new file mode 100644 index 0000000..2cd913a --- /dev/null +++ b/src/projection.rs @@ -0,0 +1,637 @@ +use crate::pool::Pool; +use async_stream::stream; +use error_ext::{BoxError, StdErrorExt}; +use futures::{Stream, StreamExt, TryStreamExt}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use sqlx::{postgres::PgRow, Postgres, Row, Transaction}; +use std::{ + error::Error as StdError, fmt::Debug, num::NonZeroU64, pin::pin, sync::Arc, time::Duration, +}; +use thiserror::Error; +use tokio::{ + sync::{mpsc, oneshot, RwLock}, + task, + time::sleep, +}; +use tracing::{debug, error, info}; + +/// A projection of events of an event sourced entity to a Postgres database. +#[derive(Debug, Clone)] +pub struct Projection { + name: String, + command_in: mpsc::Sender<(Command, oneshot::Sender)>, +} + +impl Projection { + pub async fn by_type_name( + type_name: &'static str, + name: String, + event_handler: H, + error_strategy: ErrorStrategy, + pool: Pool, + ) -> Self + where + E: for<'de> Deserialize<'de> + Send, + H: EventHandler + Clone + Sync + 'static, + { + let state = Arc::new(RwLock::new(State { + seq_no: None, + running: false, + error: None, + })); + + let (command_in, mut command_out) = mpsc::channel::<(Command, oneshot::Sender)>(1); + + task::spawn({ + let name = name.clone(); + let state = state.clone(); + + async move { + while let Some((command, reply_in)) = command_out.recv().await { + match command { + Command::Run => { + // Do not remove braces, dead-lock is waiting for you! + let running = { state.read().await.running }; + if running { + info!(type_name, name, "projection already running"); + } else { + info!(type_name, name, "running projection"); + + // Do not remove braces, dead-lock is waiting for you! + { + let mut state = state.write().await; + state.running = true; + state.error = None; + } + + run_projection::( + type_name, + name.clone(), + state.clone(), + event_handler.clone(), + pool.clone(), + error_strategy, + ) + .await; + } + + if reply_in.send(state.read().await.clone()).is_err() { + error!(type_name, name, "cannot send state"); + } + } + + Command::Stop => { + // Do not remove braces, dead-lock is waiting for you! + let running = { state.read().await.running }; + if running { + info!(type_name, name, "stopping projection"); + { + let mut state = state.write().await; + state.running = false; + } + } else { + info!(type_name, name, "projection already stopped"); + } + + if reply_in.send(state.read().await.clone()).is_err() { + error!(type_name, name, "cannot send state"); + } + } + + Command::GetState => { + if reply_in.send(state.read().await.clone()).is_err() { + error!(type_name, name, "cannot send state"); + } + } + } + } + } + }); + + Projection { name, command_in } + } + + pub async fn run(&self) -> Result { + self.dispatch_command(Command::Run).await + } + + pub async fn stop(&self) -> Result { + self.dispatch_command(Command::Stop).await + } + + pub async fn get_state(&self) -> Result { + self.dispatch_command(Command::GetState).await + } + + async fn dispatch_command(&self, command: Command) -> Result { + let (reply_in, reply_out) = oneshot::channel(); + self.command_in + .send((command, reply_in)) + .await + .map_err(|_| CommandError::SendCommand(command, self.name.clone()))?; + let state = reply_out + .await + .map_err(|_| CommandError::ReceiveResponse(command, self.name.clone()))?; + Ok(state) + } +} + +#[trait_variant::make(Send)] +pub trait EventHandler { + type Error: StdError + Send + Sync + 'static; + + async fn handle_event( + &self, + event: E, + tx: &mut Transaction<'static, Postgres>, + ) -> Result<(), Self::Error>; +} + +#[derive(Debug, Error, Serialize, Deserialize)] +pub enum CommandError { + /// The command cannot be sent to the spawned projection. + #[error("cannot send command {0:?} to spawned projection {1}")] + SendCommand(Command, String), + + /// A reply for the command cannot be received from the spawned projection. + #[error("cannot receive reply for command {0:?} from spawned projection {1}")] + ReceiveResponse(Command, String), +} + +#[derive(Debug, Clone, Copy)] +pub enum ErrorStrategy { + Retry(Duration), + Stop, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct State { + pub seq_no: Option, + pub running: bool, + pub error: Option, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum Command { + Run, + Stop, + GetState, +} + +#[derive(Debug, Error)] +enum RunError { + #[error("{0}")] + Sqlx(String, #[source] sqlx::Error), + + #[error("cannot deserialize event")] + De(#[source] serde_json::Error), + + #[error(transparent)] + Handler(BoxError), +} + +async fn run_projection( + type_name: &'static str, + name: String, + state: Arc>, + event_handler: H, + pool: Pool, + error_strategy: ErrorStrategy, +) where + E: for<'de> Deserialize<'de> + Send, + H: EventHandler + Sync + 'static, +{ + task::spawn({ + async move { + loop { + let result = + start_projection::(type_name, &name, &event_handler, &pool, &state).await; + match result { + Ok(_) => { + info!(type_name, name, "projection stopped"); + { + let mut state = state.write().await; + state.running = false; + } + break; + } + + Err(error) => { + error!( + error = error.as_chain(), + type_name, name, "projection error" + ); + + match error_strategy { + ErrorStrategy::Retry(delay) => { + info!(type_name, name, ?delay, "projection retrying after error"); + { + let mut state = state.write().await; + state.error = Some(error.to_string()); + } + sleep(delay).await + } + + ErrorStrategy::Stop => { + info!(type_name, name, "projection stopped after error"); + { + let mut state = state.write().await; + state.running = false; + state.error = Some(error.to_string()); + } + break; + } + } + } + } + } + } + }); +} + +async fn start_projection( + type_name: &'static str, + name: &str, + handler: &H, + pool: &Pool, + state: &Arc>, +) -> Result<(), RunError> +where + E: for<'de> Deserialize<'de> + Send, + H: EventHandler, +{ + debug!(type_name, name, "starting projection"); + + // We add 1 here, so we can later query events starting with this seq_no. + let seq_no = load_seq_no(name, pool) + .await? + .map(|n| n.saturating_add(1)) + .unwrap_or(1); + let events = events_by_type::(type_name, seq_no, Duration::from_secs(2), pool).await?; + let mut events = pin!(events); + + while let Some(event) = events.next().await { + if !state.read().await.running { + break; + }; + + let (seq_no, event) = event?; + + let mut tx = pool + .begin() + .await + .map_err(|error| RunError::Sqlx("cannot begin transaction".to_string(), error))?; + handler + .handle_event(event, &mut tx) + .await + .map_err(|error| RunError::Handler(error.into()))?; + debug!(type_name, name, seq_no, "projection handled event"); + save_seq_no(seq_no, name, &mut tx).await?; + tx.commit() + .await + .map_err(|error| RunError::Sqlx("cannot commit transaction".to_string(), error))?; + + state.write().await.seq_no = Some(seq_no); + } + + Ok(()) +} + +async fn load_seq_no(name: &str, pool: &Pool) -> Result, RunError> { + let seq_no = sqlx::query( + "SELECT seq_no + FROM projection + WHERE name=$1", + ) + .bind(name) + .fetch_optional(&**pool) + .await + .map_err(|error| RunError::Sqlx("cannot load seq_no".to_string(), error))? + .map(|row| { + row.try_get::("seq_no") + .map_err(|error| RunError::Sqlx("cannot get seq_no from row".to_string(), error)) + }) + .transpose()?; + Ok(seq_no) +} + +async fn events_by_type<'a, E>( + type_name: &'static str, + seq_no: i64, + poll_interval: Duration, + pool: &'a Pool, +) -> Result> + Send + 'a, RunError> +where + E: for<'de> Deserialize<'de> + Send + 'a, +{ + let last_seq_no = sqlx::query( + "SELECT MAX(seq_no) + FROM event + WHERE type_name = $1", + ) + .bind(type_name) + .fetch_one(&**pool) + .await + .map_err(|error| RunError::Sqlx("cannot select max version".to_string(), error)) + .map(into_seq_no)?; + + debug!(last_seq_no, seq_no, "selected last seq_no"); + + let mut current_seq_no = seq_no; + let events = stream! { + 'outer: loop { + let events = + current_events_by_type::(type_name, current_seq_no, pool) + .await; + + for await event in events { + match event { + Ok(event @ (seq_no, _)) => { + current_seq_no = seq_no.get() as i64 + 1; + yield Ok(event); + } + + Err(error) => { + yield Err(error); + break 'outer; + } + } + } + + // Only sleep if requesting future events. + if current_seq_no >= last_seq_no { + sleep(poll_interval).await; + } + } + }; + + Ok(events) +} + +async fn current_events_by_type( + type_name: &'static str, + seq_no: i64, + pool: &Pool, +) -> impl Stream> + Send +where + E: for<'de> Deserialize<'de>, +{ + sqlx::query( + "SELECT seq_no, event + FROM event + WHERE type_name = $1 AND seq_no >= $2 + ORDER BY seq_no ASC", + ) + .bind(type_name) + .bind(seq_no) + .fetch(&**pool) + .map_err(|error| RunError::Sqlx("cannot get next event".to_string(), error)) + .map(|row| { + row.and_then(|row| { + let seq_no = (row.get::(0) as u64) + .try_into() + .expect("seq_no greater zero"); + let value = row.get::(1); + let event = serde_json::from_value::(value).map_err(RunError::De)?; + Ok((seq_no, event)) + }) + }) +} + +async fn save_seq_no( + seq_no: NonZeroU64, + name: &str, + tx: &mut Transaction<'_, Postgres>, +) -> Result<(), RunError> { + let query = "INSERT INTO projection (name, seq_no) + VALUES ($1, $2) + ON CONFLICT (name) DO + UPDATE SET seq_no = $2"; + sqlx::query(query) + .bind(name) + .bind(seq_no.get() as i64) + .execute(&mut **tx) + .await + .map_err(|error| RunError::Sqlx("cannot save sequence number".to_string(), error))?; + Ok(()) +} + +fn into_seq_no(row: PgRow) -> i64 { + // If there is no seq_no there is one row with a NULL column, hence use `try_get`. + row.try_get::(0).ok().unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use crate::{ + pool::{Config, Pool}, + projection::{ErrorStrategy, EventHandler, Projection, State}, + }; + use assert_matches::assert_matches; + use error_ext::BoxError; + use serde_json::Value; + use sqlx::{postgres::PgSslMode, Executor, Postgres, QueryBuilder, Row, Transaction}; + use std::{iter::once, time::Duration}; + use testcontainers::{runners::AsyncRunner, RunnableImage}; + use testcontainers_modules::postgres::Postgres as TCPostgres; + use tokio::time::sleep; + use tracing_test::traced_test; + use uuid::Uuid; + + #[derive(Clone)] + struct TestHandler; + + impl EventHandler for TestHandler { + type Error = sqlx::Error; + + async fn handle_event( + &self, + event: i32, + tx: &mut Transaction<'static, Postgres>, + ) -> Result<(), Self::Error> { + QueryBuilder::new("INSERT INTO test (n) ") + .push_values(once(event), |mut q, event| { + q.push_bind(event); + }) + .build() + .execute(&mut **tx) + .await?; + Ok(()) + } + } + + #[tokio::test] + #[traced_test] + async fn test() -> Result<(), BoxError> { + let container = RunnableImage::from(TCPostgres::default()) + .with_tag("16-alpine") + .start() + .await?; + let pg_port = container.get_host_port_ipv4(5432).await?; + + let config = Config { + host: "localhost".to_string(), + port: pg_port, + user: "postgres".to_string(), + password: "postgres".to_string().into(), + dbname: "postgres".to_string(), + sslmode: PgSslMode::Prefer, + }; + + let pool = Pool::new(config).await?; + + let ddl = include_str!("../sql/create_event_log_uuid.sql"); + (&*pool).execute(ddl).await?; + let ddl = include_str!("../sql/create_projection.sql"); + (&*pool).execute(ddl).await?; + sqlx::query("CREATE TABLE test (n bigint);") + .execute(&*pool) + .await?; + + let values = (1..=100).map(|n| (Uuid::nil(), n, "test", n, Value::Null)); + QueryBuilder::new("INSERT INTO event (entity_id, version, type_name, event, metadata)") + .push_values( + values, + |mut q, (id, version, type_name, event, metadata)| { + let event = serde_json::to_value(event).unwrap(); + q.push_bind(id) + .push_bind(version) + .push_bind(type_name) + .push_bind(event) + .push_bind(metadata); + }, + ) + .build() + .execute(&*pool) + .await?; + + let projection = Projection::by_type_name( + "test", + "test-projection".to_string(), + TestHandler, + ErrorStrategy::Stop, + pool.clone(), + ) + .await; + + QueryBuilder::new("INSERT INTO projection ") + .push_values(once(("test-projection", 10)), |mut q, (name, seq_no)| { + q.push_bind(name).push_bind(seq_no); + }) + .build() + .execute(&*pool) + .await?; + + projection.run().await?; + + let mut state = projection.get_state().await?; + let max = Some(100.try_into()?); + while state.seq_no < max { + sleep(Duration::from_millis(100)).await; + state = projection.get_state().await?; + } + assert_matches!( + state, + State { + seq_no, + running, + error + } if seq_no == max && running && error.is_none() + ); + + projection.stop().await?; + sleep(Duration::from_millis(100)).await; + let state = projection.get_state().await?; + sleep(Duration::from_millis(100)).await; + let mut state_2 = projection.get_state().await?; + while state_2 != state { + sleep(Duration::from_millis(100)).await; + state_2 = projection.get_state().await?; + } + assert_matches!( + state, + State { + seq_no, + running, + error + } if seq_no == Some(100.try_into().unwrap()) && !running && error.is_none() + ); + + let sum = sqlx::query("SELECT * FROM test;") + .fetch_all(&*pool) + .await? + .into_iter() + .map(|row| row.try_get::(0)) + .try_fold(0i64, |acc, n| n.map(|n| acc + n))?; + assert_eq!(sum, 4_995); // sum(1..100) - sum(1..10) + + projection.run().await?; + let mut state = projection.get_state().await?; + while !state.running { + sleep(Duration::from_millis(100)).await; + state = projection.get_state().await?; + } + sleep(Duration::from_millis(100)).await; + + let values = (101..=200).map(|n| (Uuid::nil(), n, "test", n, Value::Null)); + QueryBuilder::new("INSERT INTO event (entity_id, version, type_name, event, metadata)") + .push_values( + values, + |mut q, (id, version, type_name, event, metadata)| { + let event = serde_json::to_value(event).unwrap(); + q.push_bind(id) + .push_bind(version) + .push_bind(type_name) + .push_bind(event) + .push_bind(metadata); + }, + ) + .build() + .execute(&*pool) + .await?; + + let mut state = projection.get_state().await?; + let max = Some(200.try_into()?); + while state.seq_no < max { + sleep(Duration::from_millis(100)).await; + state = projection.get_state().await?; + } + assert_matches!( + state, + State { + seq_no, + running, + error + } if seq_no == max && running && error.is_none() + ); + + projection.stop().await?; + sleep(Duration::from_millis(100)).await; + let state = projection.get_state().await?; + sleep(Duration::from_millis(100)).await; + let mut state_2 = projection.get_state().await?; + while state_2 != state { + sleep(Duration::from_millis(100)).await; + state_2 = projection.get_state().await?; + } + assert_matches!( + state, + State { + seq_no, + running, + error + } if seq_no == Some(200.try_into().unwrap()) && !running && error.is_none() + ); + + let sum = sqlx::query("SELECT * FROM test") + .fetch_all(&*pool) + .await? + .into_iter() + .map(|row| row.try_get::(0)) + .try_fold(0i64, |acc, n| n.map(|n| acc + n))?; + assert_eq!(sum, 20_045); // sum(1..200) - sum(1..10) + + Ok(()) + } +}