From 127dcf19c1a3b60a22e6e26707fd7b6c64a6aac4 Mon Sep 17 00:00:00 2001 From: Phil Date: Tue, 7 Jan 2025 17:48:07 -0500 Subject: [PATCH] WIP controller automations migrations --- crates/agent/src/controllers/executor.rs | 2 +- crates/agent/src/controllers/mod.rs | 16 +++-- crates/agent/src/handlers.rs | 34 +--------- ...0241219171749_controller_automations-1.sql | 47 ++++++++++++++ .../20241219171749_controller_automations.sql | 33 ---------- ...0250106141749_controller-automations-2.sql | 25 ++++++++ ...0250106141751_controller-automations-3.sql | 63 +++++++++++++++++++ 7 files changed, 149 insertions(+), 71 deletions(-) create mode 100644 supabase/migrations/20241219171749_controller_automations-1.sql delete mode 100644 supabase/migrations/20241219171749_controller_automations.sql create mode 100644 supabase/migrations/20250106141749_controller-automations-2.sql create mode 100644 supabase/migrations/20250106141751_controller-automations-3.sql diff --git a/crates/agent/src/controllers/executor.rs b/crates/agent/src/controllers/executor.rs index 1228934db3..62ed83ce0b 100644 --- a/crates/agent/src/controllers/executor.rs +++ b/crates/agent/src/controllers/executor.rs @@ -135,7 +135,7 @@ impl Executor for LiveSpecControllerExe state: &'s mut Self::State, inbox: &'s mut std::collections::VecDeque<(models::Id, Option)>, ) -> anyhow::Result { - let controller_state = super::fetch_controller_state(task_id, pool).await?; + let controller_state = fetch_controller_state(task_id, pool).await?; let (status, failures, error, next_run) = run_controller(state, inbox, &controller_state, &self.control_plane).await; diff --git a/crates/agent/src/controllers/mod.rs b/crates/agent/src/controllers/mod.rs index 2bcf0056a9..d7a539d72a 100644 --- a/crates/agent/src/controllers/mod.rs +++ b/crates/agent/src/controllers/mod.rs @@ -69,6 +69,10 @@ pub struct ControllerState { pub live_dependency_hash: Option, } +/// Returns a struct with all of the initial state that's needed to run a +/// 
controller. If the `live_specs` row still has a `controller_next_run` set, +/// this will return an error so that automation-driven controllers and +/// handler-driven controllers don't try to update the same row. pub async fn fetch_controller_state( controller_task_id: Id, db: impl sqlx::PgExecutor<'static>, @@ -76,8 +80,7 @@ pub async fn fetch_controller_state( use agent_sql::TextJson; use serde_json::value::RawValue; - // TODO: move to agent-sql? - let job = sqlx::query_as!( + let maybe_job = sqlx::query_as!( agent_sql::controllers::ControllerJob, r#"select ls.id as "live_spec_id: Id", @@ -102,12 +105,17 @@ pub async fn fetch_controller_state( join live_specs ls on t.task_id = ls.controller_task_id left outer join controller_jobs cj on ls.id = cj.live_spec_id left outer join data_planes dp on ls.data_plane_id = dp.id - where ls.controller_task_id = $1::flowid;"#, + where ls.controller_task_id = $1::flowid + and ls.controller_next_run is null;"#, controller_task_id as Id, ) - .fetch_one(db) + .fetch_optional(db) .await .context("querying for controller job")?; + + let Some(job) = maybe_job else { + anyhow::bail!("live_specs row still has legacy controller_next_run set"); + }; ControllerState::parse_db_row(&job) } diff --git a/crates/agent/src/handlers.rs b/crates/agent/src/handlers.rs index 6da973455f..ce74b0b08b 100644 --- a/crates/agent/src/handlers.rs +++ b/crates/agent/src/handlers.rs @@ -78,9 +78,6 @@ async fn listen_for_tasks( let maybe_notification = tokio::select! { _ = recv_timeout => { should_poke_connection = true; - // This is just a convenient place to ensure that the controller_jobs handler - // gets invoked periodically, since it needs to handle periodic tasks. 
- task_tx.send(String::from(CONTROLLER_JOBS_TABLE))?; continue; }, notify = listener.try_recv() => notify @@ -116,39 +113,16 @@ enum Status { Idle, } -const PUBLICATIONS_TABLE: &str = "publications"; -const CONTROLLER_JOBS_TABLE: &str = "controller_jobs"; - -/// The purpose of this is to ensure that the controllers handler gets polled -/// promptly after a publication. Without any form of explicit trigger, the -/// controllers handler might otherwise only be polled in response to the -/// periodic trigger in `listen_for_notifications`. But we know that every -/// successful publication will result in at least one controller run, so this -/// notifies the controllers handler after every successful invocation of the -/// publications handler. -struct ControllersHack(tokio::sync::mpsc::UnboundedSender); -impl ControllersHack { - fn notify(&self) { - let _ = self.0.send(String::from(CONTROLLER_JOBS_TABLE)); - } -} - struct WrappedHandler { status: Status, handler: Box, - controllers_hack: Option, } impl WrappedHandler { async fn handle_next_job(&mut self, pg_pool: &sqlx::PgPool) -> anyhow::Result<()> { let allow_background = self.status != Status::PollInteractive; match self.handler.handle(pg_pool, allow_background).await { - Ok(HandleResult::HadJob) => { - if let Some(hack) = &self.controllers_hack { - hack.notify(); - } - Ok(()) - } + Ok(HandleResult::HadJob) => Ok(()), Ok(HandleResult::NoJobs) if self.status == Status::PollInteractive => { tracing::debug!(handler = %self.handler.name(), "handler completed all interactive jobs"); self.status = Status::PollBackground; @@ -191,18 +165,12 @@ where let mut handlers_by_table = handlers .into_iter() .map(|h| { - let controllers_hack = if h.table_name() == PUBLICATIONS_TABLE { - Some(ControllersHack(task_tx.clone())) - } else { - None - }; ( h.table_name().to_string(), WrappedHandler { // We'll start by assuming every handler might have interactive jobs to handle status: Status::PollInteractive, handler: h, - 
controllers_hack, }, ) }) diff --git a/supabase/migrations/20241219171749_controller_automations-1.sql b/supabase/migrations/20241219171749_controller_automations-1.sql new file mode 100644 index 0000000000..59c8cdd3d2 --- /dev/null +++ b/supabase/migrations/20241219171749_controller_automations-1.sql @@ -0,0 +1,47 @@ +begin; + +-- Add the `controller_task_id` column to the `live_specs` table, which will +-- allow new agent versions to publish specs and use automations for controllers +-- of the new specs, while still allowing legacy agents to run using the old +-- `controller_next_run` column and handler. +alter table public.live_specs +add column controller_task_id public.flowid; + +comment on column public.live_specs.controller_task_id is 'The task id of the controller task that is responsible for this spec'; + +create unique index live_specs_controller_task_id_uindex on public.live_specs (controller_task_id); + + +-- Update the inferred schema trigger function to support both the new and +-- legacy controller notification mechanisms. This allows both the new and +-- legacy agents to coexist and run controllers. +CREATE or replace FUNCTION internal.on_inferred_schema_update() RETURNS trigger + LANGUAGE plpgsql SECURITY DEFINER + AS $$ +declare + controller_task_id flowid; +begin + + select ls.controller_task_id into controller_task_id + from public.live_specs ls + where ls.catalog_name = new.collection_name and ls.spec_type = 'collection'; + if controller_task_id is not null then + perform internal.send_to_task( + controller_task_id, + '00:00:00:00:00:00:00:00'::flowid, + '{"type":"inferred_schema_updated"}' + ); + else + -- Legacy controller notification code, to be removed once the rollout is complete. + -- The least function is necessary in order to avoid delaying a controller job in scenarios + -- where there is a backlog of controller runs that are due. 
+ update live_specs set controller_next_run = least(controller_next_run, now()) + where catalog_name = new.collection_name and spec_type = 'collection'; + end if; + +return null; +end; +$$; + + +commit; diff --git a/supabase/migrations/20241219171749_controller_automations.sql b/supabase/migrations/20241219171749_controller_automations.sql deleted file mode 100644 index 2dda9ba8ee..0000000000 --- a/supabase/migrations/20241219171749_controller_automations.sql +++ /dev/null @@ -1,33 +0,0 @@ -begin; - -alter table public.live_specs -add column controller_task_id public.flowid; - -comment on column public.live_specs.controller_task_id is 'The task id of the controller task that is responsible for this spec'; - -update public.live_specs -set - controller_task_id = ( - select - internal.id_generator () - ) -where - controller_task_id is null; - -alter table public.live_specs -alter column controller_task_id -set - not null; - -insert into - internal.tasks (task_id, task_type, wake_at) -select - controller_task_id, - 2, - controller_next_run -from - public.live_specs; - -create unique index live_specs_controller_task_id_uindex on public.live_specs (controller_task_id); - -commit; diff --git a/supabase/migrations/20250106141749_controller-automations-2.sql b/supabase/migrations/20250106141749_controller-automations-2.sql new file mode 100644 index 0000000000..d21c7d3f1a --- /dev/null +++ b/supabase/migrations/20250106141749_controller-automations-2.sql @@ -0,0 +1,25 @@ +-- Incrementally enable automation-based controllers for existing live specs, while +-- preserving the ability to run legacy controllers for specs that haven't been upgraded yet. +-- This is important because a controller that has been upgraded could subsequently be downgraded +-- by a legacy controller (due to legacy agents running `notify_dependents`, which sets `controller_next_run`). 
+begin;
+
+with new_tasks as (
+  update public.live_specs
+  set
+    controller_task_id = gen.tid,
+    controller_next_run = null
+  from (
+    select internal.id_generator () as tid, id as live_spec_id
+    from public.live_specs
+    where controller_task_id is null
+    limit 100
+  ) gen
+  where live_specs.id = gen.live_spec_id
+  returning controller_task_id
+)
+insert into internal.tasks (task_id, task_type, wake_at)
+select controller_task_id, 2, now ()
+from new_tasks;
+
+commit;
diff --git a/supabase/migrations/20250106141751_controller-automations-3.sql b/supabase/migrations/20250106141751_controller-automations-3.sql
new file mode 100644
index 0000000000..05a71e851f
--- /dev/null
+++ b/supabase/migrations/20250106141751_controller-automations-3.sql
@@ -0,0 +1,63 @@
+-- To be run after no more legacy agents are running, to complete the transition to automation-based controllers
+-- and remove the old `live_specs.controller_next_run` column.
+begin;
+
+-- Generate controller task ids for any live specs that still don't have them.
+update public.live_specs
+set
+  controller_task_id = gen.tid
+from
+  (
+    select
+      internal.id_generator () as tid,
+      id as live_spec_id
+    from
+      public.live_specs
+    where
+      controller_task_id is null
+  ) gen
+where
+  live_specs.id = gen.live_spec_id;
+
+alter table public.live_specs
+alter column controller_task_id
+set
+  not null;
+
+-- Create automation jobs for any live specs that don't yet have them. This must run
+-- before `controller_next_run` is dropped, because that column seeds `wake_at`.
+insert into
+  internal.tasks (task_id, task_type, wake_at)
+select ls.controller_task_id, 2, ls.controller_next_run
+from public.live_specs ls
+where not exists (
+  select 1 from internal.tasks t where t.task_id = ls.controller_task_id
+);
+
+alter table public.live_specs
+drop column controller_next_run;
+
+-- Update the inferred schema trigger function to drop support for legacy controller notifications.
+CREATE or replace FUNCTION internal.on_inferred_schema_update() RETURNS trigger
+    LANGUAGE plpgsql SECURITY DEFINER
+    AS $$
+declare
+    controller_task_id flowid;
+begin
+    -- Find the controller task of the collection named by the new row, and message it if one exists.
+    select ls.controller_task_id into controller_task_id
+    from public.live_specs ls
+    where ls.catalog_name = new.collection_name and ls.spec_type = 'collection';
+    if controller_task_id is not null then
+        perform internal.send_to_task(
+            controller_task_id,
+            '00:00:00:00:00:00:00:00'::flowid,
+            '{"type":"inferred_schema_updated"}'::json
+        );
+    end if;
+
+return null;
+end;
+$$;
+
+commit;