Skip to content

Commit

Permalink
feat(cli): watch artifacts (#897)
Browse files Browse the repository at this point in the history
- Solve [MET
710](https://linear.app/metatypedev/issue/MET-710/watch-artifacts).

<!-- 2. Explain WHY the change cannot be made simpler -->

- ...

<!-- 3. Explain HOW users should update their code -->

#### Migration notes

---

- [x] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change
  • Loading branch information
luckasRanarison authored Nov 6, 2024
1 parent 1cbbe57 commit 3c27fca
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 23 deletions.
31 changes: 27 additions & 4 deletions src/meta-cli/src/deploy/actors/task/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use super::action::{
};
use super::command::build_task_command;
use crate::deploy::actors::console::Console;
use crate::deploy::actors::task_manager::TaskRef;
use crate::deploy::actors::task_manager::{self, TaskRef};
use crate::interlude::*;
use crate::secrets::Secrets;
use color_eyre::owo_colors::OwoColorize;
use common::node::Node;
use serde::Deserialize;
use common::typegraph::Typegraph;
use serde::{Deserialize, Deserializer};
use std::{path::Path, sync::Arc};
use tokio::process::Command;

Expand Down Expand Up @@ -113,9 +114,27 @@ pub struct Migration {
pub archive: String,
}

#[derive(Deserialize, Debug, Clone)]
pub struct TypegraphData {
pub name: String,
pub path: PathBuf,
#[serde(deserialize_with = "deserialize_typegraph")]
pub value: Arc<Typegraph>,
}

fn deserialize_typegraph<'de, D>(deserializer: D) -> Result<Arc<Typegraph>, D::Error>
where
D: Deserializer<'de>,
{
let serialized = String::deserialize(deserializer)?;
let typegraph =
serde_json::from_str(&serialized).map_err(|e| serde::de::Error::custom(e.to_string()))?;
Ok(Arc::new(typegraph))
}

#[derive(Deserialize, Debug)]
pub struct DeploySuccess {
pub typegraph: String,
pub typegraph: TypegraphData,
pub messages: Vec<MessageEntry>,
pub migrations: Vec<Migration>,
pub failure: Option<String>,
Expand All @@ -129,7 +148,7 @@ pub struct DeployError {

impl OutputData for DeploySuccess {
fn get_typegraph_name(&self) -> String {
self.typegraph.clone()
self.typegraph.name.clone()
}

fn is_success(&self) -> bool {
Expand Down Expand Up @@ -267,6 +286,10 @@ impl TaskAction for DeployAction {
}))
}
None => {
ctx.task_manager
.do_send(task_manager::message::TypegraphDeployed(
data.typegraph.clone(),
));
ctx.console.info(format!(
"{icon} successfully deployed typegraph {name} from {path}",
icon = "✓".green(),
Expand Down
17 changes: 16 additions & 1 deletion src/meta-cli/src/deploy/actors/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::console::{Console, ConsoleActor};
use super::discovery::DiscoveryActor;
use super::task::action::{TaskAction, TaskActionGenerator};
use super::task::{self, TaskActor, TaskFinishStatus};
use super::watcher::WatcherActor;
use super::watcher::{self, WatcherActor};
use crate::{config::Config, interlude::*};
use colored::OwoColorize;
use futures::channel::oneshot;
Expand All @@ -14,6 +14,7 @@ use signal_handler::set_stop_recipient;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use task::deploy::TypegraphData;

pub mod report;
pub use report::Report;
Expand Down Expand Up @@ -56,6 +57,10 @@ pub mod message {
#[derive(Message)]
#[rtype(result = "()")]
pub struct DiscoveryDone;

#[derive(Message)]
#[rtype(result = "()")]
pub struct TypegraphDeployed(pub TypegraphData);
}

use message::*;
Expand Down Expand Up @@ -526,3 +531,13 @@ impl<A: TaskAction + 'static> Handler<Restart> for TaskManager<A> {
ctx.address().do_send(ForceStop);
}
}

impl<A: TaskAction + 'static> Handler<TypegraphDeployed> for TaskManager<A> {
type Result = ();

fn handle(&mut self, msg: TypegraphDeployed, _ctx: &mut Self::Context) -> Self::Result {
if let Some(addr) = &self.watcher_addr {
addr.do_send(watcher::message::UpdateDependencies(msg.0));
}
}
}
9 changes: 5 additions & 4 deletions src/meta-cli/src/deploy/actors/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use super::task::action::TaskAction;
use super::task_manager::{self, TaskGenerator, TaskManager, TaskReason};
use crate::config::Config;
use crate::deploy::actors::console::ConsoleActor;
use crate::deploy::actors::task::deploy::TypegraphData;
use crate::deploy::push::pusher::RetryManager;
use crate::interlude::*;
use crate::typegraph::dependency_graph::DependencyGraph;
use crate::typegraph::loader::discovery::FileFilter;
use common::typegraph::Typegraph;
use notify_debouncer_mini::notify::{RecommendedWatcher, RecursiveMode};
use notify_debouncer_mini::{new_debouncer, notify, DebounceEventResult, Debouncer};
use pathdiff::diff_paths;
Expand All @@ -31,7 +31,7 @@ pub mod message {

#[derive(Message)]
#[rtype(result = "()")]
pub struct UpdateDependencies(pub Arc<Typegraph>);
pub struct UpdateDependencies(pub TypegraphData);

#[derive(Message)]
#[rtype(result = "()")]
Expand Down Expand Up @@ -152,7 +152,7 @@ impl<A: TaskAction + 'static> Handler<File> for WatcherActor<A> {
let path = msg.0;
if &path == self.config.path.as_ref().unwrap() {
self.console
.warning("metatype configuration filie changed".to_owned());
.warning("metatype configuration file changed".to_owned());
self.console
.warning("reloading all the typegraphs".to_owned());
self.task_manager.do_send(task_manager::message::Restart);
Expand Down Expand Up @@ -202,7 +202,8 @@ impl<A: TaskAction + 'static> Handler<UpdateDependencies> for WatcherActor<A> {
type Result = ();

fn handle(&mut self, msg: UpdateDependencies, _ctx: &mut Self::Context) -> Self::Result {
self.dependency_graph.update_typegraph(&msg.0)
let TypegraphData { path, value, .. } = msg.0;
self.dependency_graph.update_typegraph(path, &value)
}
}

Expand Down
20 changes: 9 additions & 11 deletions src/meta-cli/src/typegraph/dependency_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,29 @@ pub struct DependencyGraph {
}

impl DependencyGraph {
pub fn update_typegraph(&mut self, tg: &Typegraph) {
let path = tg.path.clone().unwrap();
if !self.deps.contains_key(path.as_ref()) {
self.deps.insert(path.to_path_buf(), HashSet::default());
}

let deps = self.deps.get_mut(path.as_ref()).unwrap();
let old_deps = std::mem::replace(deps, tg.deps.iter().cloned().collect());
pub fn update_typegraph(&mut self, path: PathBuf, tg: &Typegraph) {
let parent_dir = path.parent().unwrap();
let artifacts = tg.meta.artifacts.values();
let artifact_paths = artifacts.flat_map(|a| parent_dir.join(&a.path).canonicalize());
let deps = self.deps.entry(path.clone()).or_default();
let old_deps = std::mem::replace(deps, artifact_paths.collect());
let removed_deps = old_deps.difference(deps);
let added_deps = deps.difference(&old_deps);

for removed in removed_deps {
let rdeps = self.reverse_deps.get_mut(removed).unwrap();
rdeps.take(path.as_ref()).unwrap();
rdeps.take(&path).unwrap();
if rdeps.is_empty() {
self.reverse_deps.remove(removed);
}
}

for added in added_deps {
if let Some(set) = self.reverse_deps.get_mut(added) {
set.insert(path.to_path_buf());
set.insert(path.clone());
} else {
self.reverse_deps
.insert(added.clone(), HashSet::from_iter(Some(path.to_path_buf())));
.insert(added.clone(), HashSet::from_iter(Some(path.clone())));
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/typegraph/deno/src/tg_manage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class Manager {
} as TypegraphOutput;

const deployTarget = await rpc.getDeployTarget();
const { response } = await tgDeploy(reusableTgOutput, {
const { response, serialized } = await tgDeploy(reusableTgOutput, {
typegate: {
url: deployTarget.baseUrl,
auth: new BasicAuth(
Expand All @@ -131,7 +131,14 @@ export class Manager {
defaultMigrationAction: deployData.defaultMigrationAction,
});

log.success({ typegraph: this.#typegraph.name, ...response });
log.success({
typegraph: {
name: this.#typegraph.name,
path: env.typegraph_path,
value: serialized,
},
...response,
});
} catch (err: any) {
log.failure({
typegraph: this.#typegraph.name,
Expand Down
11 changes: 10 additions & 1 deletion src/typegraph/python/typegraph/graph/tg_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,16 @@ def deploy(self):

if not isinstance(response, dict):
raise Exception("unexpected")
Log.success({"typegraph": self.typegraph.name, **response})
Log.success(
{
"typegraph": {
"name": self.typegraph.name,
"path": env.typegraph_path,
"value": ret.serialized,
},
**response,
}
)
except Exception as err:
Log.debug(traceback.format_exc())
if isinstance(err, ErrorStack):
Expand Down
3 changes: 3 additions & 0 deletions tests/e2e/cli/artifacts/ops.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function add({ lhs, rhs }: { lhs: number; rhs: number }) {
return lhs + rhs;
}
16 changes: 16 additions & 0 deletions tests/e2e/cli/typegraphs/deps.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Policy, t, typegraph } from "@typegraph/sdk/index.ts";
import { DenoRuntime } from "@typegraph/sdk/runtimes/deno.ts";

await typegraph("deps", (g) => {
const pub = Policy.public();
const deno = new DenoRuntime();

g.expose({
add: deno
.import(t.struct({ lhs: t.integer(), rhs: t.integer() }), t.integer(), {
name: "add",
module: "../deps/ops.ts",
})
.withPolicy(pub),
});
});
76 changes: 76 additions & 0 deletions tests/e2e/cli/watch_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright Metatype OÜ, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

import * as path from "@std/path";
import { Meta } from "test-utils/mod.ts";
import { MetaTest } from "test-utils/test.ts";
import { killProcess, Lines } from "test-utils/process.ts";

const typegraphConfig = `
typegraphs:
typescript:
include: "api/example.ts"`;

async function setupDirectory(t: MetaTest, dir: string) {
await t.shell([
"bash",
"-c",
`
rm -rf ./tmp && mkdir -p tmp/deps
meta new --template deno ${dir}
cp ./e2e/cli/typegraphs/deps.ts ${path.join(dir, "api", "example.ts")}
cp ./e2e/cli/artifacts/ops.ts ${path.join(dir, "deps", "ops.ts")}
echo "${typegraphConfig}" >> ${path.join(dir, "metatype.yaml")}
`,
]);
}

Meta.test({ name: "meta dev: watch artifacts" }, async (t) => {
const targetDir = path.join(t.workingDir, "tmp");

console.log("Preparing test directory...");

await setupDirectory(t, targetDir);

const metadev = new Deno.Command("meta", {
cwd: targetDir,
args: ["dev", `--gate=http://localhost:${t.port}`],
stderr: "piped",
}).spawn();

const stderr = new Lines(metadev.stderr);

await t.should("upload artifact", async () => {
await stderr.readWhile((line) => !line.includes("artifact uploaded"));
});

await t.should("deploy typegraph", async () => {
await stderr.readWhile(
(line) => !line.includes("successfully deployed typegraph"),
);
});

await t.shell(["bash", "-c", "echo '' >> deps/ops.ts"], {
currentDir: targetDir,
});

await t.should("watch modified file", async () => {
await stderr.readWhile((line) => !line.includes("File modified"));
});

await t.should("re-upload artifact", async () => {
await stderr.readWhile((line) => !line.includes("artifact uploaded"));
});

await t.should("re-deploy typegraph", async () => {
await stderr.readWhile(
(line) => !line.includes("successfully deployed typegraph"),
);
});

t.addCleanup(async () => {
await stderr.close();
await killProcess(metadev);
await t.shell(["rm", "-rf", targetDir]);
});
});

0 comments on commit 3c27fca

Please sign in to comment.