
Commit

WIP controller automations migrations
psFried committed Jan 7, 2025
1 parent 82d83a3 commit 127dcf1
Showing 7 changed files with 149 additions and 71 deletions.
2 changes: 1 addition & 1 deletion crates/agent/src/controllers/executor.rs
@@ -135,7 +135,7 @@ impl<C: ControlPlane + Send + Sync + 'static> Executor for LiveSpecControllerExe
state: &'s mut Self::State,
inbox: &'s mut std::collections::VecDeque<(models::Id, Option<Self::Receive>)>,
) -> anyhow::Result<Self::Outcome> {
let controller_state = super::fetch_controller_state(task_id, pool).await?;
let controller_state = fetch_controller_state(task_id, pool).await?;
let (status, failures, error, next_run) =
run_controller(state, inbox, &controller_state, &self.control_plane).await;

16 changes: 12 additions & 4 deletions crates/agent/src/controllers/mod.rs
@@ -69,15 +69,18 @@ pub struct ControllerState {
pub live_dependency_hash: Option<String>,
}

/// Returns a struct with all of the initial state that's needed to run a
/// controller. If the `live_specs` row still has a `controller_next_run` set,
/// this will return an error so that automation-driven controllers and
/// handler-driven controllers don't try to update the same row.
pub async fn fetch_controller_state(
controller_task_id: Id,
db: impl sqlx::PgExecutor<'static>,
) -> anyhow::Result<ControllerState> {
use agent_sql::TextJson;
use serde_json::value::RawValue;

// TODO: move to agent-sql?
let job = sqlx::query_as!(
let maybe_job = sqlx::query_as!(
agent_sql::controllers::ControllerJob,
r#"select
ls.id as "live_spec_id: Id",
@@ -102,12 +105,17 @@ pub async fn fetch_controller_state(
join live_specs ls on t.task_id = ls.controller_task_id
left outer join controller_jobs cj on ls.id = cj.live_spec_id
left outer join data_planes dp on ls.data_plane_id = dp.id
where ls.controller_task_id = $1::flowid;"#,
where ls.controller_task_id = $1::flowid
and ls.controller_next_run is null;"#,
controller_task_id as Id,
)
.fetch_one(db)
.fetch_optional(db)
.await
.context("querying for controller job")?;

let Some(job) = maybe_job else {
anyhow::bail!("live_specs row still has legacy controller_next_run set");
};
ControllerState::parse_db_row(&job)
}

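Note: a quick way to see the mixed state that this new guard protects against (rows that have been handed to an automation task but have since had `controller_next_run` set again by a legacy agent) is a query along these lines, a sketch using only the `live_specs` columns referenced by the migrations below:

-- Sketch: live specs in the mixed state that makes fetch_controller_state bail.
select id, catalog_name, controller_task_id, controller_next_run
from public.live_specs
where controller_task_id is not null
and controller_next_run is not null;
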
34 changes: 1 addition & 33 deletions crates/agent/src/handlers.rs
@@ -78,9 +78,6 @@ async fn listen_for_tasks(
let maybe_notification = tokio::select! {
_ = recv_timeout => {
should_poke_connection = true;
// This is just a convenient place to ensure that the controller_jobs handler
// gets invoked periodically, since it needs to handle periodic tasks.
task_tx.send(String::from(CONTROLLER_JOBS_TABLE))?;
continue;
},
notify = listener.try_recv() => notify
@@ -116,39 +113,16 @@ enum Status {
Idle,
}

const PUBLICATIONS_TABLE: &str = "publications";
const CONTROLLER_JOBS_TABLE: &str = "controller_jobs";

/// The purpose of this is to ensure that the controllers handler gets polled
/// promptly after a publication. Without any form of explicit trigger, the
/// controllers handler might otherwise only be polled in response to the
/// periodic trigger in `listen_for_notifications`. But we know that every
/// successful publication will result in at least one controller run, so this
/// notifies the controllers handler after every successful invocation of the
/// publications handler.
struct ControllersHack(tokio::sync::mpsc::UnboundedSender<String>);
impl ControllersHack {
fn notify(&self) {
let _ = self.0.send(String::from(CONTROLLER_JOBS_TABLE));
}
}

struct WrappedHandler {
status: Status,
handler: Box<dyn Handler>,
controllers_hack: Option<ControllersHack>,
}

impl WrappedHandler {
async fn handle_next_job(&mut self, pg_pool: &sqlx::PgPool) -> anyhow::Result<()> {
let allow_background = self.status != Status::PollInteractive;
match self.handler.handle(pg_pool, allow_background).await {
Ok(HandleResult::HadJob) => {
if let Some(hack) = &self.controllers_hack {
hack.notify();
}
Ok(())
}
Ok(HandleResult::HadJob) => Ok(()),
Ok(HandleResult::NoJobs) if self.status == Status::PollInteractive => {
tracing::debug!(handler = %self.handler.name(), "handler completed all interactive jobs");
self.status = Status::PollBackground;
@@ -191,18 +165,12 @@ where
let mut handlers_by_table = handlers
.into_iter()
.map(|h| {
let controllers_hack = if h.table_name() == PUBLICATIONS_TABLE {
Some(ControllersHack(task_tx.clone()))
} else {
None
};
(
h.table_name().to_string(),
WrappedHandler {
// We'll start by assuming every handler might have interactive jobs to handle
status: Status::PollInteractive,
handler: h,
controllers_hack,
},
)
})
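
Note: with ControllersHack removed, the prompt-after-publication behavior is expected to come from the task-based machinery instead. Purely as a hypothetical sketch (not code from this commit), nudging one spec's controller task directly could look like the following, using only the `internal.tasks` columns and `live_specs.controller_task_id` referenced by the migrations below; the catalog name is a made-up placeholder:

-- Hypothetical sketch: wake a single spec's controller task immediately.
-- 'acmeCo/example' is a placeholder catalog name.
update internal.tasks
set wake_at = least(wake_at, now())
where task_id = (
select controller_task_id
from public.live_specs
where catalog_name = 'acmeCo/example'
);
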
47 changes: 47 additions & 0 deletions supabase/migrations/20241219171749_controller_automations-1.sql
@@ -0,0 +1,47 @@
begin;

-- Add the `controller_task_id` column to the `live_specs` table, which will
-- allow new agent versions to publish specs and use automations for controllers
-- of the new specs, while still allowing legacy agents to run using the old
-- `controller_next_run` column and handler.
alter table public.live_specs
add column controller_task_id public.flowid;

comment on column public.live_specs.controller_task_id is 'The task id of the controller task that is responsible for this spec';

create unique index live_specs_controller_task_id_uindex on public.live_specs (controller_task_id);


-- Update the inferred schema trigger function to support both the new and
-- legacy controller notification mechanisms. This allows both the new and
-- legacy agents to coexist and run controllers.
CREATE or replace FUNCTION internal.on_inferred_schema_update() RETURNS trigger
LANGUAGE plpgsql SECURITY DEFINER
AS $$
declare
controller_task_id flowid;
begin

select ls.controller_task_id into controller_task_id
from public.live_specs ls
where ls.catalog_name = new.collection_name and ls.spec_type = 'collection';
if controller_task_id is not null then
perform internal.send_to_task(
controller_task_id,
'00:00:00:00:00:00:00:00'::flowid,
'{"type":"inferred_schema_updated"}'
);
else
-- Legacy controller notification code, to be removed once the rollout is complete.
-- The least function is necessary in order to avoid delaying a controller job in scenarios
-- where there is a backlog of controller runs that are due.
update live_specs set controller_next_run = least(controller_next_run, now())
where catalog_name = new.collection_name and spec_type = 'collection';
end if;

return null;
end;
$$;


commit;
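
While both notification paths coexist, the path a given collection will take can be read directly from `live_specs` (a sketch; `acmeCo/example` is a placeholder name):

-- Sketch: which notification path applies to this collection?
-- A non-null controller_task_id means the new internal.send_to_task path;
-- otherwise the legacy controller_next_run update applies.
select catalog_name, controller_task_id, controller_next_run
from public.live_specs
where catalog_name = 'acmeCo/example' and spec_type = 'collection';
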
33 changes: 0 additions & 33 deletions supabase/migrations/20241219171749_controller_automations.sql

This file was deleted.

25 changes: 25 additions & 0 deletions supabase/migrations/20250106141749_controller-automations-2.sql
@@ -0,0 +1,25 @@
-- Incrementally enable automation-based controllers for existing live specs, while
-- preserving the ability to run legacy controllers for specs that haven't been upgraded yet.
-- This matters because a spec whose controller has been upgraded could subsequently be picked
-- up again by a legacy controller (legacy agents run `notify_dependents`, which sets `controller_next_run`).
begin;

with new_tasks as (
update public.live_specs
set
controller_task_id = gen.tid,
controller_next_run = null
from (
select internal.id_generator () as tid, id as live_spec_id
from public.live_specs
where controller_task_id is null
limit 100
) gen
where live_specs.id = gen.live_spec_id
returning controller_task_id
)
insert into internal.tasks (task_id, task_type, wake_at)
select controller_task_id, 2, now ()
from new_tasks;

commit;
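
Because each application of this statement upgrades at most 100 specs, it is meant to be run repeatedly until nothing remains to migrate. A simple progress check (a sketch):

-- Sketch: how many live specs still lack an automation-based controller task?
select count(*) as remaining
from public.live_specs
where controller_task_id is null;
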
63 changes: 63 additions & 0 deletions supabase/migrations/20250106141751_controller-automations-3.sql
@@ -0,0 +1,63 @@
-- To be run after no more legacy agents are running, to complete the transition to automation-based controllers
-- and remove the old `live_specs.controller_next_run` column.
begin;

-- Generate controller task ids for any live specs that still don't have them.
update public.live_specs
set
controller_task_id = gen.tid
from
(
select
internal.id_generator () as tid,
id as live_spec_id
from
public.live_specs
where
controller_task_id is null
) gen
where
live_specs.id = gen.live_spec_id;

-- Create automation tasks for any live specs that don't yet have them, carrying
-- over the legacy `controller_next_run` as the initial `wake_at`. This runs before
-- the column is dropped below, since it still reads `controller_next_run`.
insert into
internal.tasks (task_id, task_type, wake_at)
select
controller_task_id,
2,
controller_next_run
from
public.live_specs ls
where
not exists (
select 1 from internal.tasks t where t.task_id = ls.controller_task_id
);

alter table public.live_specs
alter column controller_task_id
set
not null;

alter table public.live_specs
drop column controller_next_run;

-- Update the inferred schema trigger function to drop support for legacy controller notifications.
CREATE or replace FUNCTION internal.on_inferred_schema_update() RETURNS trigger
LANGUAGE plpgsql SECURITY DEFINER
AS $$
declare
controller_task_id flowid;
begin

select ls.controller_task_id into controller_task_id
from public.live_specs ls
where ls.catalog_name = new.collection_name and ls.spec_type = 'collection';
if controller_task_id is not null then
perform internal.send_to_task(
controller_task_id,
'00:00:00:00:00:00:00:00'::flowid,
'{"type":"inferred_schema_updated"}'::json
);
end if;

return null;
end;
$$;

commit;
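
After the cutover, every live spec should have a matching row in internal.tasks; a sanity check along these lines (a sketch using only objects referenced above) should return no rows:

-- Sketch: live specs whose controller task is missing from internal.tasks.
select ls.id, ls.catalog_name, ls.controller_task_id
from public.live_specs ls
left join internal.tasks t on t.task_id = ls.controller_task_id
where t.task_id is null;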
