Skip to content

Commit

Permalink
transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzejkop committed Oct 15, 2024
1 parent 9cd87cb commit 90fe013
Showing 1 changed file with 75 additions and 4 deletions.
79 changes: 75 additions & 4 deletions src/database/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,83 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
Ok(Some(root_id as usize))
}

async fn mark_root_as_processed(self, root: &Hash) -> Result<(), Error> {
let mut conn = self.acquire().await?;

let root_id = conn.get_id_by_root(root).await?;

let Some(root_id) = root_id else {
return Err(Error::MissingRoot { root: *root });
};

let root_id = root_id as i64;

sqlx::query(
r#"
UPDATE identities
SET status = $2, mined_at = CURRENT_TIMESTAMP
WHERE id <= $1
AND status <> $2
AND status <> $3;
"#,
)
.bind(root_id)
.bind(<&str>::from(ProcessedStatus::Processed))
.bind(<&str>::from(ProcessedStatus::Mined))
.execute(&mut *conn)
.await?;

sqlx::query(
r#"
UPDATE identities
SET status = $2, mined_at = NULL
WHERE id > $1
"#,
)
.bind(root_id)
.bind(<&str>::from(ProcessedStatus::Pending))
.execute(&mut *conn)
.await?;

Ok(())
}

async fn mark_root_as_mined(self, root: &Hash) -> Result<(), Error> {
let mut conn = self.acquire().await?;

let root_id = conn.get_id_by_root(root).await?;

let Some(root_id) = root_id else {
return Err(Error::MissingRoot { root: *root });
};

let root_id = root_id as i64;

sqlx::query(
r#"
UPDATE identities
SET status = $2
WHERE id <= $1
AND status <> $2
"#,
)
.bind(root_id)
.bind(<&str>::from(ProcessedStatus::Mined))
.execute(&mut *conn)
.await?;

Ok(())
}

async fn mark_all_as_pending(self) -> Result<(), Error> {
let mut conn = self.acquire().await?;

sqlx::query(
r#"
UPDATE identities
SET status = $1, mined_at = NULL
WHERE status <> $1
"#,
UPDATE identities
SET status = $1, mined_at = NULL
WHERE status <> $1
"#,
)
.bind(<&str>::from(ProcessedStatus::Pending))
.execute(&mut *conn)
Expand Down Expand Up @@ -884,3 +952,6 @@ pub trait DbMethods<'c>: Acquire<'c, Database = Postgres> + Sized {
Ok(())
}
}

// Blanket implementation for all types that satisfy the trait bounds
impl<'c, T> DbMethods<'c> for T where T: Acquire<'c, Database = Postgres> + Send + Sync + Sized {}

0 comments on commit 90fe013

Please sign in to comment.