Skip to content

Commit

Permalink
feat(subs,gate): substantial integration (#844)
Browse files Browse the repository at this point in the history
Supersedes #818
* Backend constructs
* Workflow logic
  * Exceptions/Interrupts + configurable `SUBSTANTIAL_RELAUNCH_MS`
  * Runs (start, stop, healthcheck queries)
* Workflows in typescript for now
* Events (receive, handle)
* Std interrupts (`ctx.sleep(..)`, `ctx.ensure(..)`)

Also removed Substantial syntactic sugar from the Deno/PythonRuntime
SDKs as it might introduce confusion on the inner workings of the
Typegate.

#### Migration notes

None

- [x] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change
  • Loading branch information
michael-0acf4 authored Sep 23, 2024
1 parent 00ffe5a commit c56803c
Show file tree
Hide file tree
Showing 47 changed files with 5,279 additions and 398 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
],
"python.languageServer": "Pylance",
"python.analysis.extraPaths": [
"typegraph/python"
"./src/typegraph/python"
],
"prettier.proseWrap": "never"
}
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"src/typegate/standalone",
"src/typegraph/core",
"src/xtask",
"src/substantial"
]

exclude = [
Expand All @@ -28,6 +29,7 @@ edition = "2021"
# internal crates
mt_deno = { path = "src/mt_deno/" }
common = { path = "src/common/" }
substantial = { path = "src/substantial/" }
metagen = { path = "src/metagen/" }
typegate_engine = { path = "src/typegate/engine" }

Expand Down
26 changes: 23 additions & 3 deletions src/common/src/typegraph/runtimes/substantial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,42 @@ use std::path::PathBuf;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RedisConfig {
pub host: String,
pub port: u16,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum SubstantialBackend {
Fs,
Memory,
Redis(RedisConfig),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SubstantialRuntimeData {
pub endpoint: String,
pub basic_auth_secret: Option<String>,
pub backend: SubstantialBackend,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "lowercase")]
pub enum WorkflowKind {
Python,
Deno,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WorkflowMatData {
pub name: String,
pub file: String,
pub file: PathBuf,
pub kind: WorkflowKind,
pub deps: Vec<PathBuf>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ModuleMatData {
pub entry_point: PathBuf,
pub deps: Vec<PathBuf>,
}
15 changes: 15 additions & 0 deletions src/substantial/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "substantial"
edition.workspace = true
version.workspace = true

[dependencies]
anyhow.workspace = true
chrono = "0.4.38"
serde.workspace = true
serde_json.workspace = true

protobuf = "3.5.1"

[dev-dependencies]
tokio = { workspace = true, features =["full"] }
7 changes: 7 additions & 0 deletions src/substantial/proto-gen.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
set -eux

# https://github.com/protocolbuffers/protobuf/issues/13346

# must be in sync with substantial/Cargo.toml protobuf
cargo install protobuf-codegen
protoc -I . --rust_out=src/protocol protocol/*
50 changes: 50 additions & 0 deletions src/substantial/protocol/events.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
syntax = "proto3";

package substantial.protos.events;

import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

message Start {
google.protobuf.Struct kwargs = 1;
}

message Save {
uint32 id = 1;
string value = 2;
int32 counter = 3;
}

message Sleep {
uint32 id = 1;
google.protobuf.Timestamp start = 2;
google.protobuf.Timestamp end = 3;
}

message Send {
string name = 1;
string value = 2;
}

message Stop {
oneof result {
string ok = 1;
string err = 2;
}
}

message Event {
google.protobuf.Timestamp at = 1;
oneof of {
Start start = 10;
Save save = 11;
Sleep sleep = 12;
Send send = 13;
Stop stop = 14;
}
};

message Records {
string run_id = 1;
repeated Event events = 2;
}
27 changes: 27 additions & 0 deletions src/substantial/protocol/metadata.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
syntax = "proto3";

package substantial.protos.metadata;

import "google/protobuf/timestamp.proto";

message Info {
string message = 1;
}
message Error {
string message = 1;
string stack = 2;
string type = 3;
}

message Metadata {
google.protobuf.Timestamp at = 1;
oneof of {
Info info = 10;
Error error = 11;
}
};

message Records {
string run_id = 1;
repeated Metadata metadata = 2;
}
177 changes: 177 additions & 0 deletions src/substantial/src/backends/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
use super::Backend;
use crate::protocol::{
events::{Event, Records},
metadata::Metadata,
};
use anyhow::{Context, Ok, Result};
use chrono::{DateTime, Utc};
use protobuf::Message;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::{fs, path::Path};

pub struct FsBackend {
root: PathBuf,
}

impl FsBackend {
pub fn new(root: &Path) -> Self {
for d in &["runs", "schedules", "leases"] {
fs::create_dir_all(root.join(d))
.with_context(|| "Failed to create directory")
.unwrap();
}
Self {
root: root.to_owned(),
}
}

pub fn run_path(&self, run_id: &str) -> PathBuf {
self.root.join("runs").join(run_id)
}

pub fn schedule_path(&self, queue: &str) -> PathBuf {
self.root.join("schedules").join(queue)
}
}

impl Backend for FsBackend {
fn read_events(&self, run_id: String) -> Result<Option<Records>> {
let f = self.run_path(&run_id).join("events");
if !f.exists() {
return Ok(None);
}
let file = fs::File::open(f)?;
let content = file.bytes().collect::<Result<Vec<_>, _>>()?;
Ok(Some(Records::parse_from_bytes(&content)?))
}

fn write_events(&self, run_id: String, content: Records) -> Result<()> {
let f = self.run_path(&run_id).join("events");
fs::create_dir_all(f.parent().with_context(|| "Parent")?)
.with_context(|| "Failed to create directory")?;
let mut file = fs::File::create(f).with_context(|| "Failed to create file")?;
file.write_all(&content.write_to_bytes()?)
.with_context(|| "Failed to write to file")?;
Ok(())
}

fn read_all_metadata(&self, run_id: String) -> Result<Vec<Metadata>> {
let mut ret = Vec::new();
let f = self.run_path(&run_id).join("logs");
if f.exists() {
for log in fs::read_dir(f).with_context(|| "Failed to read directory")? {
let log = log.with_context(|| "Failed to read log entry")?.path();
let content = fs::File::open(&log)?
.bytes()
.collect::<Result<Vec<_>, _>>()?;
ret.push(Metadata::parse_from_bytes(&content).unwrap());
}
}
Ok(ret)
}

fn append_metadata(
&self,
run_id: String,
schedule: DateTime<Utc>,
content: String,
) -> Result<()> {
let f = self
.run_path(&run_id)
.join("logs")
.join(schedule.to_rfc3339());
fs::create_dir_all(f.parent().unwrap()).with_context(|| "Failed to create directory")?;
let mut file = fs::File::create(f).with_context(|| "Failed to create log file")?;
file.write_all(content.as_bytes())
.with_context(|| "Failed to write to log file")?;
Ok(())
}

fn add_schedule(
&self,
queue: String,
run_id: String,
schedule: DateTime<Utc>,
content: Option<Event>,
) -> Result<()> {
let q = self.schedule_path(&queue);

if q.exists() {
for sched in fs::read_dir(&q).with_context(|| "Failed to read schedule directory")? {
let sched = sched
.with_context(|| "Failed to read schedule entry")?
.path();
for planned in
fs::read_dir(&sched).with_context(|| "Failed to read planned directory")?
{
let planned = planned
.with_context(|| "Failed to read planned entry")?
.path();
let planned_date =
DateTime::parse_from_rfc3339(sched.file_name().unwrap().to_str().unwrap())
.with_context(|| "Failed to parse date")?
.with_timezone(&Utc); // !
if planned.file_name().unwrap().to_string_lossy() == run_id
&& planned_date <= schedule
&& self
.read_schedule(queue.clone(), run_id.clone(), planned_date)?
.is_none()
{
self.close_schedule(queue.clone(), run_id.clone(), planned_date)?;
}
}
}
}

// ** https://datatracker.ietf.org/doc/html/rfc3339 **
let f1 = q.join(schedule.to_rfc3339()).join(run_id);

fs::create_dir_all(f1.parent().unwrap())
.with_context(|| "Failed to create schedule directory")?;
let mut file = fs::File::create(f1).with_context(|| "Failed to create schedule file")?;
if let Some(content) = content {
file.write_all(&content.write_to_bytes()?)
.with_context(|| "Failed to write schedule file")
} else {
file.write_all(b"") // !
.with_context(|| "Failed to write empty schedule file")
}
}

fn read_schedule(
&self,
queue: String,
run_id: String,
schedule: DateTime<Utc>,
) -> Result<Option<Event>> {
let f = self
.schedule_path(&queue)
.join(schedule.to_rfc3339())
.join(run_id);
if !f.exists() {
return Ok(None);
}

let file = fs::File::open(f).with_context(|| "Failed to open schedule file")?;
let content = file.bytes().collect::<Result<Vec<_>, _>>()?;
Ok(if content.is_empty() {
None
} else {
Some(Event::parse_from_bytes(&content)?)
})
}

fn close_schedule(&self, queue: String, run_id: String, schedule: DateTime<Utc>) -> Result<()> {
let f = self
.schedule_path(&queue)
.join(schedule.to_rfc3339())
.join(run_id);
if f.exists() {
println!("closed {:?}", f);
fs::remove_file(f).with_context(|| "Failed to delete schedule file")?;
}

Ok(())
}
}
Loading

0 comments on commit c56803c

Please sign in to comment.