Skip to content

Commit

Permalink
Add migrations to queue article
Browse files Browse the repository at this point in the history
  • Loading branch information
sneakycrow committed Oct 16, 2024
1 parent 34f21d6 commit e6e0d16
Showing 1 changed file with 52 additions and 12 deletions.
64 changes: 52 additions & 12 deletions _drafts/2024-10-15-creating-a-queue-for-video-processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ We're going to use Postgres to store jobs to be processed. And when we query pos
we'll use the `SKIP LOCKED` mechanism to give us concurrency within our workers.
We'll also add some retry mechanisms for retrying failed jobs.

### depdencies
### dependencies

We'll mostly be using sqlx for it's simplicity here, but this should be fairly interchangable with
whatever library you're using.
Expand Down Expand Up @@ -64,12 +64,50 @@ We'll want to create two tables, our main one is for managing the jobs, but we a
table, we want to track where the raw video is located, whether it's been processed, and then standard stuff like an ID and
timestamps.

TODO: Add migration for videos
TODO: Add migration for queue
Tip: You can create a top level folder called `migrations` and then use the `sqlx` cli to run them, with `sqlx migrate run`

For the queue table, which holds the scheduled jobs, we want to track a timestamp that allows us to schedule it, how many
failed attempts (if any) there are, it's current status, and then the message payload. Additionally, the standard ids and
timestamps.

```sql 0001_init_queue.sql
CREATE TABLE queue (
id UUID PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,

scheduled_for TIMESTAMP WITH TIME ZONE NOT NULL,
failed_attempts INT NOT NULL,
status INT NOT NULL,
message JSONB NOT NULL
);
CREATE INDEX index_queue_on_scheduled_for ON queue (scheduled_for);
CREATE INDEX index_queue_on_status ON queue (status);
```

Next, we need a table for storing `videos`. These two migratinos don't need to be in any particular order,
or could be combined into one. I just like to isolate my migrations logic.

For our videos, we want to store the `raw_file_path` where the raw video was uploaded, a column `processed_file_path`
for representing where the processed `m3u8` playlist is, and a `processing_status`which maps to the
current status of processing. Additionally, standard id and timestamp columns.

```sql 0002_init_videos.sql
CREATE TABLE videos (
id TEXT PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,

raw_file_path TEXT NOT NULL,
processed_file_path TEXT,
processing_status TEXT NOT NULL DEFAULT 'pending'
);
```

### db module

This module holds our function for creating a database connection pool
This module holds our function for creating a database connection pool. We'll create a connection for our workers
to use for updating the processing status and job queue in our database.

```rust db.rs
use sqlx::postgres::PgPool;
Expand Down Expand Up @@ -126,18 +164,13 @@ impl std::convert::From<sqlx::migrate::MigrateError> for Error {

### queue module

This is the primary module for this article. We're going to define a trait that's our base queue,
and then implement that trait into a `PostgresQueue`. By having the queue itself be a trait we could
use different configurations, like if we moved to a different backend for managing the queue parts.

Our queue needs to be able to push jobs, pull jobs, fails jobs, delete jobs, and clear the queue. All work together
to manage the queue itself.

Before we define the queue, which will hold jobs, we need to define what a `Job` is. A job should have a unique identifier
and some kind of message with a payload to run it. For now, we only need a `ProcessRawVideo` payload, but we'll have this
be a member of an enum representing all payloads called `Message`.

Our specific message for processing a video will accept a `video_id` and a `path`. Presumably, the path itself
Our specific message for processing a video will accept a `video_id` and a `path`. The path should be a path to the raw video.
We included our path as a column on our `videos` table, but including it here saves us a query. Then we can use the `video_id`
to update the row after we're done processing.

```rust lib.rs
/// The job to be processed, containing the message payload
Expand All @@ -158,6 +191,13 @@ pub enum Message {
}
```

Now, we can define our queue. We're going to define a trait that's our base queue,
and then implement that trait into a `PostgresQueue`. By having the queue itself be a trait we could
use different configurations, like if we moved to a different backend for managing the queue parts.

Our queue needs to be able to push jobs, pull jobs, fails jobs, delete jobs, and clear the queue. All work together
to manage the queue itself.

```rust lib.rs
#[async_trait::async_trait]
pub trait Queue: Send + Sync + Debug {
Expand Down

0 comments on commit e6e0d16

Please sign in to comment.